- # Copyright 2007 Matt Chaput. All rights reserved.
- #
- # Redistribution and use in source and binary forms, with or without
- # modification, are permitted provided that the following conditions are met:
- #
- # 1. Redistributions of source code must retain the above copyright notice,
- # this list of conditions and the following disclaimer.
- #
- # 2. Redistributions in binary form must reproduce the above copyright
- # notice, this list of conditions and the following disclaimer in the
- # documentation and/or other materials provided with the distribution.
- #
- # THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
- # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- # MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
- # EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
- # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
- # OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
- # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- #
- # The views and conclusions contained in the software and documentation are
- # those of the authors and should not be interpreted as representing official
- # policies, either expressed or implied, of Matt Chaput.
- from __future__ import with_statement
- import threading, time
- from bisect import bisect_right
- from contextlib import contextmanager
- from whoosh import columns
- from whoosh.compat import abstractmethod, bytes_type
- from whoosh.externalsort import SortingPool
- from whoosh.fields import UnknownFieldError
- from whoosh.index import LockError
- from whoosh.system import emptybytes
- from whoosh.util import fib, random_name
- from whoosh.util.filelock import try_for
- from whoosh.util.text import utf8encode
- # Exceptions
- class IndexingError(Exception):
- pass
- # Document grouping context manager
- @contextmanager
- def groupmanager(writer):
- writer.start_group()
- yield
- writer.end_group()
- # Merge policies
- # A merge policy is a callable that takes the SegmentWriter object and the
- # current segment list (not including the segment being written), and returns
- # an updated segment list (not including the segment being written). An
- # illustrative custom policy appears after the built-in policies below.
- def NO_MERGE(writer, segments):
- """This policy does not merge any existing segments.
- """
- return segments
- def MERGE_SMALL(writer, segments):
- """This policy merges small segments, where "small" is defined using a
- heuristic based on the Fibonacci sequence.
- """
- from whoosh.reading import SegmentReader
- unchanged_segments = []
- segments_to_merge = []
- sorted_segment_list = sorted(segments, key=lambda s: s.doc_count_all())
- total_docs = 0
- merge_point_found = False
- for i, seg in enumerate(sorted_segment_list):
- count = seg.doc_count_all()
- if count > 0:
- total_docs += count
- if merge_point_found: # append the remaining to unchanged
- unchanged_segments.append(seg)
- else: # look for a merge point
- segments_to_merge.append((seg, i)) # merge every segment up to the merge point
- if i > 3 and total_docs < fib(i + 5):
- merge_point_found = True
- if merge_point_found and len(segments_to_merge) > 1:
- for seg, i in segments_to_merge:
- reader = SegmentReader(writer.storage, writer.schema, seg)
- writer.add_reader(reader)
- reader.close()
- return unchanged_segments
- else:
- return segments
- def OPTIMIZE(writer, segments):
- """This policy merges all existing segments.
- """
- from whoosh.reading import SegmentReader
- for seg in segments:
- reader = SegmentReader(writer.storage, writer.schema, seg)
- writer.add_reader(reader)
- reader.close()
- return []
- def CLEAR(writer, segments):
- """This policy DELETES all existing segments and only writes the new
- segment.
- """
- return []
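- # A custom merge policy is any callable with the same ``(writer, segments)``
- # signature as the built-in policies above; it can be passed to
- # ``SegmentWriter.commit()`` via the ``mergetype`` keyword argument. The
- # following sketch is illustrative only (it is not part of the original
- # module): it folds every segment smaller than a hypothetical document-count
- # threshold into the segment being written and leaves larger segments alone.
- def MERGE_TINY_EXAMPLE(writer, segments, threshold=1000):
-     """Illustrative policy: merge segments with fewer than ``threshold`` docs.
-     """
-     from whoosh.reading import SegmentReader
-     unchanged = []
-     for seg in segments:
-         if seg.doc_count_all() < threshold:
-             # Fold the small segment into this writer's pool
-             reader = SegmentReader(writer.storage, writer.schema, seg)
-             writer.add_reader(reader)
-             reader.close()
-         else:
-             unchanged.append(seg)
-     return unchanged
- # Illustrative usage: writer.commit(mergetype=MERGE_TINY_EXAMPLE)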
- # Customized sorting pool for postings
- class PostingPool(SortingPool):
- # Subclass whoosh.externalsort.SortingPool to use knowledge of
- # postings to set run size in bytes instead of items
- namechars = "abcdefghijklmnopqrstuvwxyz0123456789"
- def __init__(self, tempstore, segment, limitmb=128, **kwargs):
- SortingPool.__init__(self, **kwargs)
- self.tempstore = tempstore
- self.segment = segment
- self.limit = limitmb * 1024 * 1024
- self.currentsize = 0
- self.fieldnames = set()
- def _new_run(self):
- path = "%s.run" % random_name()
- f = self.tempstore.create_file(path).raw_file()
- return path, f
- def _open_run(self, path):
- return self.tempstore.open_file(path).raw_file()
- def _remove_run(self, path):
- return self.tempstore.delete_file(path)
- def add(self, item):
- # item = (fieldname, tbytes, docnum, weight, vbytes)
- assert isinstance(item[1], bytes_type), "tbytes=%r" % item[1]
- if item[4] is not None:
- assert isinstance(item[4], bytes_type), "vbytes=%r" % item[4]
- self.fieldnames.add(item[0])
- size = (28 + 4 * 5 # tuple = 28 + 4 * length
- + 21 + len(item[0]) # fieldname = str = 21 + length
- + 26 + len(item[1]) * 2 # text = unicode = 26 + 2 * length
- + 18 # docnum = long = 18
- + 16 # weight = float = 16
- + 21 + len(item[4] or '')) # valuestring
- self.currentsize += size
- if self.currentsize > self.limit:
- self.save()
- self.current.append(item)
- def iter_postings(self):
- # This is just an alias for items() to be consistent with the
- # iter_postings()/add_postings() interface of a lot of other classes
- return self.items()
- def save(self):
- SortingPool.save(self)
- self.currentsize = 0
- # Writer base class
- class IndexWriter(object):
- """High-level object for writing to an index.
- To get a writer for a particular index, call
- :meth:`~whoosh.index.Index.writer` on the Index object.
- >>> writer = myindex.writer()
- You can use this object as a context manager. If an exception is thrown
- from within the context it calls :meth:`~IndexWriter.cancel` to clean up
- temporary files, otherwise it calls :meth:`~IndexWriter.commit` when the
- context exits.
- >>> with myindex.writer() as w:
- ... w.add_document(title="First document", content="Hello there.")
- ... w.add_document(title="Second document", content="This is easy!")
- """
- def __enter__(self):
- return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- if exc_type:
- self.cancel()
- else:
- self.commit()
- def group(self):
- """Returns a context manager that calls
- :meth:`~IndexWriter.start_group` and :meth:`~IndexWriter.end_group` for
- you, allowing you to use a ``with`` statement to group hierarchical
- documents::
- with myindex.writer() as w:
- with w.group():
- w.add_document(kind="class", name="Accumulator")
- w.add_document(kind="method", name="add")
- w.add_document(kind="method", name="get_result")
- w.add_document(kind="method", name="close")
- with w.group():
- w.add_document(kind="class", name="Calculator")
- w.add_document(kind="method", name="add")
- w.add_document(kind="method", name="multiply")
- w.add_document(kind="method", name="get_result")
- w.add_document(kind="method", name="close")
- """
- return groupmanager(self)
- def start_group(self):
- """Start indexing a group of hierarchical documents. The backend should
- ensure that these documents are all added to the same segment::
- with myindex.writer() as w:
- w.start_group()
- w.add_document(kind="class", name="Accumulator")
- w.add_document(kind="method", name="add")
- w.add_document(kind="method", name="get_result")
- w.add_document(kind="method", name="close")
- w.end_group()
- w.start_group()
- w.add_document(kind="class", name="Calculator")
- w.add_document(kind="method", name="add")
- w.add_document(kind="method", name="multiply")
- w.add_document(kind="method", name="get_result")
- w.add_document(kind="method", name="close")
- w.end_group()
- A more convenient way to group documents is to use the
- :meth:`~IndexWriter.group` method and the ``with`` statement.
- """
- pass
- def end_group(self):
- """Finish indexing a group of hierarchical documents. See
- :meth:`~IndexWriter.start_group`.
- """
- pass
- def add_field(self, fieldname, fieldtype, **kwargs):
- """Adds a field to the index's schema.
- :param fieldname: the name of the field to add.
- :param fieldtype: an instantiated :class:`whoosh.fields.FieldType`
- object.
- """
- self.schema.add(fieldname, fieldtype, **kwargs)
- def remove_field(self, fieldname, **kwargs):
- """Removes the named field from the index's schema. Depending on the
- backend implementation, this may or may not actually remove existing
- data for the field from the index. Optimizing the index should always
- clear out existing data for a removed field.
- """
- self.schema.remove(fieldname, **kwargs)
- @abstractmethod
- def reader(self, **kwargs):
- """Returns a reader for the existing index.
- """
- raise NotImplementedError
- def searcher(self, **kwargs):
- from whoosh.searching import Searcher
- return Searcher(self.reader(), **kwargs)
- def delete_by_term(self, fieldname, text, searcher=None):
- """Deletes any documents containing "term" in the "fieldname" field.
- This is useful when you have an indexed field containing a unique ID
- (such as "pathname") for each document.
- :returns: the number of documents deleted.
- """
- from whoosh.query import Term
- q = Term(fieldname, text)
- return self.delete_by_query(q, searcher=searcher)
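- # Illustrative usage (the "path" field name and value are hypothetical; any
- # field used as a unique ID works the same way):
- #     with myindex.writer() as w:
- #         w.delete_by_term("path", u"/a/b/c")
- #         w.add_document(path=u"/a/b/c", content=u"Replacement text")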
- def delete_by_query(self, q, searcher=None):
- """Deletes any documents matching a query object.
- :returns: the number of documents deleted.
- """
- if searcher:
- s = searcher
- else:
- s = self.searcher()
- try:
- count = 0
- for docnum in s.docs_for_query(q, for_deletion=True):
- self.delete_document(docnum)
- count += 1
- finally:
- if not searcher:
- s.close()
- return count
- @abstractmethod
- def delete_document(self, docnum, delete=True):
- """Deletes a document by number.
- """
- raise NotImplementedError
- @abstractmethod
- def add_document(self, **fields):
- """The keyword arguments map field names to the values to index/store::
- w = myindex.writer()
- w.add_document(path=u"/a", title=u"First doc", text=u"Hello")
- w.commit()
- Depending on the field type, some fields may take objects other than
- unicode strings. For example, NUMERIC fields take numbers, and DATETIME
- fields take ``datetime.datetime`` objects::
- from datetime import datetime, timedelta
- from whoosh import index
- from whoosh.fields import *
- schema = Schema(date=DATETIME, size=NUMERIC(float), content=TEXT)
- myindex = index.create_in("indexdir", schema)
- w = myindex.writer()
- w.add_document(date=datetime.now(), size=5.5, content=u"Hello")
- w.commit()
- Instead of a single object (i.e., unicode string, number, or datetime),
- you can supply a list or tuple of objects. For unicode strings, this
- bypasses the field's analyzer. For numbers and dates, this lets you add
- multiple values for the given field::
- date1 = datetime.now()
- date2 = datetime(2005, 12, 25)
- date3 = datetime(1999, 1, 1)
- w.add_document(date=[date1, date2, date3], size=[9.5, 10],
- content=[u"alfa", u"bravo", u"charlie"])
- For fields that are both indexed and stored, you can specify an
- alternate value to store using a keyword argument in the form
- "_stored_<fieldname>". For example, if you have a field named "title"
- and you want to index the text "a b c" but store the text "e f g", use
- keyword arguments like this::
- writer.add_document(title=u"a b c", _stored_title=u"e f g")
- You can boost the weight of all terms in a certain field by specifying
- a ``_<fieldname>_boost`` keyword argument. For example, if you have a
- field named "content", you can double the weight of this document for
- searches in the "content" field like this::
- writer.add_document(content="a b c", _content_boost=2.0)
- You can boost every field at once using the ``_boost`` keyword. For
- example, to boost fields "a" and "b" by 2.0, and field "c" by 3.0::
- writer.add_document(a="alfa", b="bravo", c="charlie",
- _boost=2.0, _c_boost=3.0)
- Note that some scoring algorithms, including Whoosh's default BM25F,
- do not work with term weights less than 1, so you should generally not
- use a boost factor less than 1.
- See also :meth:`IndexWriter.update_document`.
- """
- raise NotImplementedError
- @abstractmethod
- def add_reader(self, reader):
- raise NotImplementedError
- def _doc_boost(self, fields, default=1.0):
- if "_boost" in fields:
- return float(fields["_boost"])
- else:
- return default
- def _field_boost(self, fields, fieldname, default=1.0):
- boostkw = "_%s_boost" % fieldname
- if boostkw in fields:
- return float(fields[boostkw])
- else:
- return default
- def _unique_fields(self, fields):
- # Check which of the supplied fields are unique
- unique_fields = [name for name, field in self.schema.items()
- if name in fields and field.unique]
- return unique_fields
- def update_document(self, **fields):
- """The keyword arguments map field names to the values to index/store.
- This method adds a new document to the index, and automatically deletes
- any documents with the same values in any fields marked "unique" in the
- schema::
- schema = fields.Schema(path=fields.ID(unique=True, stored=True),
- content=fields.TEXT)
- myindex = index.create_in("index", schema)
- w = myindex.writer()
- w.add_document(path=u"/", content=u"Mary had a lamb")
- w.commit()
- w = myindex.writer()
- w.update_document(path=u"/", content=u"Mary had a little lamb")
- w.commit()
- assert myindex.doc_count() == 1
- It is safe to use ``update_document`` in place of ``add_document``; if
- there is no existing document to replace, it simply does an add.
- You cannot currently pass a list or tuple of values to a "unique"
- field.
- Because this method has to search for documents with the same unique
- fields and delete them before adding the new document, it is slower
- than using ``add_document``.
- * Marking more fields "unique" in the schema will make each
- ``update_document`` call slightly slower.
- * When you are updating multiple documents, it is faster to batch
- delete all changed documents and then use ``add_document`` to add
- the replacements instead of using ``update_document`` (see the sketch after
- this method).
- Note that this method will only replace a *committed* document;
- currently it cannot replace documents you've added to the IndexWriter
- but haven't yet committed. For example, if you do this:
- >>> writer.update_document(unique_id=u"1", content=u"Replace me")
- >>> writer.update_document(unique_id=u"1", content=u"Replacement")
- ...this will add two documents with the same value of ``unique_id``,
- instead of the second document replacing the first.
- See :meth:`IndexWriter.add_document` for information on
- ``_stored_<fieldname>``, ``_<fieldname>_boost``, and ``_boost`` keyword
- arguments.
- """
- # Delete the set of documents matching the unique terms
- unique_fields = self._unique_fields(fields)
- if unique_fields:
- with self.searcher() as s:
- uniqueterms = [(name, fields[name]) for name in unique_fields]
- docs = s._find_unique(uniqueterms)
- for docnum in docs:
- self.delete_document(docnum)
- # Add the given fields
- self.add_document(**fields)
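- # Illustrative sketch of the batch delete-then-add pattern mentioned in the
- # docstring above ("changed_docs" and the "path" field are hypothetical):
- #     with myindex.writer() as w:
- #         for doc in changed_docs:
- #             w.delete_by_term("path", doc["path"])
- #         for doc in changed_docs:
- #             w.add_document(**doc)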
- def commit(self):
- """Finishes writing and unlocks the index.
- """
- pass
- def cancel(self):
- """Cancels any documents/deletions added by this object
- and unlocks the index.
- """
- pass
- # Codec-based writer
- class SegmentWriter(IndexWriter):
- def __init__(self, ix, poolclass=None, timeout=0.0, delay=0.1, _lk=True,
- limitmb=128, docbase=0, codec=None, compound=True, **kwargs):
- # Lock the index
- self.writelock = None
- if _lk:
- self.writelock = ix.lock("WRITELOCK")
- if not try_for(self.writelock.acquire, timeout=timeout,
- delay=delay):
- raise LockError
- if codec is None:
- from whoosh.codec import default_codec
- codec = default_codec()
- self.codec = codec
- # Get info from the index
- self.storage = ix.storage
- self.indexname = ix.indexname
- info = ix._read_toc()
- self.generation = info.generation + 1
- self.schema = info.schema
- self.segments = info.segments
- self.docnum = self.docbase = docbase
- self._setup_doc_offsets()
- # Internals
- self._tempstorage = self.storage.temp_storage("%s.tmp" % self.indexname)
- newsegment = codec.new_segment(self.storage, self.indexname)
- self.newsegment = newsegment
- self.compound = compound and newsegment.should_assemble()
- self.is_closed = False
- self._added = False
- self.pool = PostingPool(self._tempstorage, self.newsegment,
- limitmb=limitmb)
- # Set up writers
- self.perdocwriter = codec.per_document_writer(self.storage, newsegment)
- self.fieldwriter = codec.field_writer(self.storage, newsegment)
- self.merge = True
- self.optimize = False
- self.mergetype = None
- def __repr__(self):
- return "<%s %r>" % (self.__class__.__name__, self.newsegment)
- def _check_state(self):
- if self.is_closed:
- raise IndexingError("This writer is closed")
- def _setup_doc_offsets(self):
- self._doc_offsets = []
- base = 0
- for s in self.segments:
- self._doc_offsets.append(base)
- base += s.doc_count_all()
- def _document_segment(self, docnum):
- # Returns the index.Segment object containing the given document
- # number.
- offsets = self._doc_offsets
- if len(offsets) == 1:
- return 0
- return bisect_right(offsets, docnum) - 1
- def _segment_and_docnum(self, docnum):
- # Returns an (index.Segment, segment_docnum) pair for the segment
- # containing the given document number (see the worked example below).
- segmentnum = self._document_segment(docnum)
- offset = self._doc_offsets[segmentnum]
- segment = self.segments[segmentnum]
- return segment, docnum - offset
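- # Worked example with assumed values: three segments of 10, 15, and 5
- # documents give self._doc_offsets == [0, 10, 25]. For a global docnum of 12,
- #     bisect_right([0, 10, 25], 12) - 1  ->  1   (second segment)
- #     12 - offsets[1]                    ->  2   (docnum within that segment)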
- def _process_posts(self, items, startdoc, docmap):
- schema = self.schema
- for fieldname, text, docnum, weight, vbytes in items:
- if fieldname not in schema:
- continue
- if docmap is not None:
- newdoc = docmap[docnum]
- else:
- newdoc = startdoc + docnum
- yield (fieldname, text, newdoc, weight, vbytes)
- def temp_storage(self):
- return self._tempstorage
- def add_field(self, fieldname, fieldspec, **kwargs):
- self._check_state()
- if self._added:
- raise Exception("Can't modify schema after adding data to writer")
- super(SegmentWriter, self).add_field(fieldname, fieldspec, **kwargs)
- def remove_field(self, fieldname):
- self._check_state()
- if self._added:
- raise Exception("Can't modify schema after adding data to writer")
- super(SegmentWriter, self).remove_field(fieldname)
- def has_deletions(self):
- """
- Returns True if the current index has documents that are marked deleted
- but haven't been optimized out of the index yet.
- """
- return any(s.has_deletions() for s in self.segments)
- def delete_document(self, docnum, delete=True):
- self._check_state()
- if docnum >= sum(seg.doc_count_all() for seg in self.segments):
- raise IndexingError("No document ID %r in this index" % docnum)
- segment, segdocnum = self._segment_and_docnum(docnum)
- segment.delete_document(segdocnum, delete=delete)
- def deleted_count(self):
- """
- :returns: the total number of deleted documents in the index.
- """
- return sum(s.deleted_count() for s in self.segments)
- def is_deleted(self, docnum):
- segment, segdocnum = self._segment_and_docnum(docnum)
- return segment.is_deleted(segdocnum)
- def reader(self, reuse=None):
- from whoosh.index import FileIndex
- self._check_state()
- return FileIndex._reader(self.storage, self.schema, self.segments,
- self.generation, reuse=reuse)
- def iter_postings(self):
- return self.pool.iter_postings()
- def add_postings_to_pool(self, reader, startdoc, docmap):
- items = self._process_posts(reader.iter_postings(), startdoc, docmap)
- add_post = self.pool.add
- for item in items:
- add_post(item)
- def write_postings(self, lengths, items, startdoc, docmap):
- items = self._process_posts(items, startdoc, docmap)
- self.fieldwriter.add_postings(self.schema, lengths, items)
- def write_per_doc(self, fieldnames, reader):
- # Very bad hack: reader should be an IndexReader, but may be a
- # PerDocumentReader if this is called from multiproc, where the code
- # tries to be efficient by merging per-doc and terms separately.
- # TODO: fix this!
- schema = self.schema
- if reader.has_deletions():
- docmap = {}
- else:
- docmap = None
- pdw = self.perdocwriter
- # Open all column readers
- cols = {}
- for fieldname in fieldnames:
- fieldobj = schema[fieldname]
- coltype = fieldobj.column_type
- if coltype and reader.has_column(fieldname):
- creader = reader.column_reader(fieldname, coltype)
- if isinstance(creader, columns.TranslatingColumnReader):
- creader = creader.raw_column()
- cols[fieldname] = creader
- for docnum, stored in reader.iter_docs():
- if docmap is not None:
- docmap[docnum] = self.docnum
- pdw.start_doc(self.docnum)
- for fieldname in fieldnames:
- fieldobj = schema[fieldname]
- length = reader.doc_field_length(docnum, fieldname)
- pdw.add_field(fieldname, fieldobj,
- stored.get(fieldname), length)
- if fieldobj.vector and reader.has_vector(docnum, fieldname):
- v = reader.vector(docnum, fieldname, fieldobj.vector)
- pdw.add_vector_matcher(fieldname, fieldobj, v)
- if fieldname in cols:
- cv = cols[fieldname][docnum]
- pdw.add_column_value(fieldname, fieldobj.column_type, cv)
- pdw.finish_doc()
- self.docnum += 1
- return docmap
- def add_reader(self, reader):
- self._check_state()
- basedoc = self.docnum
- ndxnames = set(fname for fname in reader.indexed_field_names()
- if fname in self.schema)
- fieldnames = set(self.schema.names()) | ndxnames
- docmap = self.write_per_doc(fieldnames, reader)
- self.add_postings_to_pool(reader, basedoc, docmap)
- self._added = True
- def _check_fields(self, schema, fieldnames):
- # Check if the caller gave us a bogus field
- for name in fieldnames:
- if name not in schema:
- raise UnknownFieldError("No field named %r in %s"
- % (name, schema))
- def add_document(self, **fields):
- self._check_state()
- perdocwriter = self.perdocwriter
- schema = self.schema
- docnum = self.docnum
- add_post = self.pool.add
- docboost = self._doc_boost(fields)
- fieldnames = sorted([name for name in fields.keys()
- if not name.startswith("_")])
- self._check_fields(schema, fieldnames)
- perdocwriter.start_doc(docnum)
- for fieldname in fieldnames:
- value = fields.get(fieldname)
- if value is None:
- continue
- field = schema[fieldname]
- length = 0
- if field.indexed:
- # TODO: Method for adding progressive field values, ie
- # setting start_pos/start_char?
- fieldboost = self._field_boost(fields, fieldname, docboost)
- # Ask the field to return a list of (text, weight, vbytes)
- # tuples
- items = field.index(value)
- # Only store the length if the field is marked scorable
- scorable = field.scorable
- # Add the terms to the pool
- for tbytes, freq, weight, vbytes in items:
- weight *= fieldboost
- if scorable:
- length += freq
- add_post((fieldname, tbytes, docnum, weight, vbytes))
- if field.separate_spelling():
- spellfield = field.spelling_fieldname(fieldname)
- for word in field.spellable_words(value):
- word = utf8encode(word)[0]
- # item = (fieldname, tbytes, docnum, weight, vbytes)
- add_post((spellfield, word, 0, 1, emptybytes))
- vformat = field.vector
- if vformat:
- analyzer = field.analyzer
- # Call the format's word_values method to get posting values
- vitems = vformat.word_values(value, analyzer, mode="index")
- # Remove unused frequency field from the tuple
- vitems = sorted((text, weight, vbytes)
- for text, _, weight, vbytes in vitems)
- perdocwriter.add_vector_items(fieldname, field, vitems)
- # Allow a custom value for stored field/column
- customval = fields.get("_stored_%s" % fieldname, value)
- # Add the stored value and length for this field to the per-
- # document writer
- sv = customval if field.stored else None
- perdocwriter.add_field(fieldname, field, sv, length)
- column = field.column_type
- if column and customval is not None:
- cv = field.to_column_value(customval)
- perdocwriter.add_column_value(fieldname, column, cv)
- perdocwriter.finish_doc()
- self._added = True
- self.docnum += 1
- def doc_count(self):
- return self.docnum - self.docbase
- def get_segment(self):
- newsegment = self.newsegment
- newsegment.set_doc_count(self.docnum)
- return newsegment
- def per_document_reader(self):
- if not self.perdocwriter.is_closed:
- raise Exception("Per-doc writer is still open")
- return self.codec.per_document_reader(self.storage, self.get_segment())
- # The following methods break out the commit functionality into smaller
- # pieces to allow MpWriter to call them individually
- def _merge_segments(self, mergetype, optimize, merge):
- # The writer supports two ways of setting mergetype/optimize/merge:
- # as attributes or as keyword arguments to commit(). Originally there
- # were just the keyword arguments, but then I added the ability to use
- # the writer as a context manager using "with", so the user no longer
- # explicitly called commit(), hence the attributes
- mergetype = mergetype if mergetype is not None else self.mergetype
- optimize = optimize if optimize is not None else self.optimize
- merge = merge if merge is not None else self.merge
- if mergetype:
- pass
- elif optimize:
- mergetype = OPTIMIZE
- elif not merge:
- mergetype = NO_MERGE
- else:
- mergetype = MERGE_SMALL
- # Call the merge policy function. The policy may choose to merge
- # other segments into this writer's pool
- return mergetype(self, self.segments)
- def _flush_segment(self):
- self.perdocwriter.close()
- if self.codec.length_stats:
- pdr = self.per_document_reader()
- else:
- pdr = None
- postings = self.pool.iter_postings()
- self.fieldwriter.add_postings(self.schema, pdr, postings)
- self.fieldwriter.close()
- if pdr:
- pdr.close()
- def _close_segment(self):
- if not self.perdocwriter.is_closed:
- self.perdocwriter.close()
- if not self.fieldwriter.is_closed:
- self.fieldwriter.close()
- self.pool.cleanup()
- def _assemble_segment(self):
- if self.compound:
- # Assemble the segment files into a compound file
- newsegment = self.get_segment()
- newsegment.create_compound_file(self.storage)
- newsegment.compound = True
- def _partial_segment(self):
- # For use by a parent multiprocessing writer: Closes out the segment
- # but leaves the pool files intact so the parent can access them
- self._check_state()
- self.perdocwriter.close()
- self.fieldwriter.close()
- # Don't call self.pool.cleanup()! We want to grab the pool files.
- return self.get_segment()
- def _finalize_segment(self):
- # Finish writing segment
- self._flush_segment()
- # Close segment files
- self._close_segment()
- # Assemble compound segment if necessary
- self._assemble_segment()
- return self.get_segment()
- def _commit_toc(self, segments):
- from whoosh.index import TOC, clean_files
- # Write a new TOC with the new segment list (and delete old files)
- toc = TOC(self.schema, segments, self.generation)
- toc.write(self.storage, self.indexname)
- # Delete leftover files
- clean_files(self.storage, self.indexname, self.generation, segments)
- def _finish(self):
- self._tempstorage.destroy()
- if self.writelock:
- self.writelock.release()
- self.is_closed = True
- #self.storage.close()
- # Finalization methods
- def commit(self, mergetype=None, optimize=None, merge=None):
- """Finishes writing and saves all additions and changes to disk.
- There are four possible ways to use this method::
- # Merge small segments but leave large segments, trying to
- # balance fast commits with fast searching:
- writer.commit()
- # Merge all segments into a single segment:
- writer.commit(optimize=True)
- # Don't merge any existing segments:
- writer.commit(merge=False)
- # Use a custom merge function
- writer.commit(mergetype=my_merge_function)
- :param mergetype: a custom merge function taking a Writer object and
- segment list as arguments, and returning a new segment list. If you
- supply a ``mergetype`` function, the values of the ``optimize`` and
- ``merge`` arguments are ignored.
- :param optimize: if True, all existing segments are merged with the
- documents you've added to this writer (and the value of the
- ``merge`` argument is ignored).
- :param merge: if False, do not merge small segments.
- """
- self._check_state()
- # Merge old segments if necessary
- finalsegments = self._merge_segments(mergetype, optimize, merge)
- if self._added:
- # Flush the current segment being written and add it to the
- # list of remaining segments returned by the merge policy
- # function
- finalsegments.append(self._finalize_segment())
- else:
- # Close segment files
- self._close_segment()
- # Write TOC
- self._commit_toc(finalsegments)
- # Final cleanup
- self._finish()
- def cancel(self):
- self._check_state()
- self._close_segment()
- self._finish()
- # Writer wrappers
- class AsyncWriter(threading.Thread, IndexWriter):
- """Convenience wrapper for a writer object that might fail due to locking
- (i.e. the ``filedb`` writer). This object will attempt once to obtain the
- underlying writer, and if it's successful, will simply pass method calls on
- to it.
- If this object *can't* obtain a writer immediately, it will *buffer*
- delete, add, and update method calls in memory until you call ``commit()``.
- At that point, this object will start running in a separate thread, trying
- to obtain the writer over and over, and once it obtains it, "replay" all
- the buffered method calls on it.
- In a typical scenario where you're adding a single or a few documents to
- the index as the result of a Web transaction, this lets you just create the
- writer, add, and commit, without having to worry about index locks,
- retries, etc.
- For example, to get an asynchronous writer, instead of this:
- >>> writer = myindex.writer()
- Do this:
- >>> from whoosh.writing import AsyncWriter
- >>> writer = AsyncWriter(myindex)
- """
- def __init__(self, index, delay=0.25, writerargs=None):
- """
- :param index: the :class:`whoosh.index.Index` to write to.
- :param delay: the delay (in seconds) between attempts to instantiate
- the actual writer.
- :param writerargs: an optional dictionary specifying keyword arguments
- to be passed to the index's ``writer()`` method.
- """
- threading.Thread.__init__(self)
- self.running = False
- self.index = index
- self.writerargs = writerargs or {}
- self.delay = delay
- self.events = []
- try:
- self.writer = self.index.writer(**self.writerargs)
- except LockError:
- self.writer = None
- def reader(self):
- return self.index.reader()
- def searcher(self, **kwargs):
- from whoosh.searching import Searcher
- return Searcher(self.reader(), fromindex=self.index, **kwargs)
- def _record(self, method, args, kwargs):
- if self.writer:
- getattr(self.writer, method)(*args, **kwargs)
- else:
- self.events.append((method, args, kwargs))
- def run(self):
- self.running = True
- writer = self.writer
- while writer is None:
- try:
- writer = self.index.writer(**self.writerargs)
- except LockError:
- time.sleep(self.delay)
- for method, args, kwargs in self.events:
- getattr(writer, method)(*args, **kwargs)
- writer.commit(*self.commitargs, **self.commitkwargs)
- def delete_document(self, *args, **kwargs):
- self._record("delete_document", args, kwargs)
- def add_document(self, *args, **kwargs):
- self._record("add_document", args, kwargs)
- def update_document(self, *args, **kwargs):
- self._record("update_document", args, kwargs)
- def add_field(self, *args, **kwargs):
- self._record("add_field", args, kwargs)
- def remove_field(self, *args, **kwargs):
- self._record("remove_field", args, kwargs)
- def delete_by_term(self, *args, **kwargs):
- self._record("delete_by_term", args, kwargs)
- def commit(self, *args, **kwargs):
- if self.writer:
- self.writer.commit(*args, **kwargs)
- else:
- self.commitargs, self.commitkwargs = args, kwargs
- self.start()
- def cancel(self, *args, **kwargs):
- if self.writer:
- self.writer.cancel(*args, **kwargs)
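- # Illustrative AsyncWriter usage in a request handler (document values are
- # hypothetical): calls are applied immediately if the lock was obtained, or
- # buffered and replayed in a background thread on commit() otherwise.
- #     writer = AsyncWriter(myindex)
- #     writer.add_document(path=u"/new", content=u"New page")
- #     writer.commit()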
- # Ex post facto functions
- def add_spelling(ix, fieldnames, commit=True):
- """Adds spelling files to an existing index that was created without
- them, and modifies the schema so the given fields have the ``spelling``
- attribute. Only works on filedb indexes.
- >>> ix = index.open_dir("testindex")
- >>> add_spelling(ix, ["content", "tags"])
- :param ix: a :class:`whoosh.filedb.fileindex.FileIndex` object.
- :param fieldnames: a list of field names to create word graphs for.
- :param commit: if True (the default), the index writer is committed
- (without merging) after the spelling files are created.
- """
- from whoosh.automata import fst
- from whoosh.reading import SegmentReader
- writer = ix.writer()
- storage = writer.storage
- schema = writer.schema
- segments = writer.segments
- for segment in segments:
- ext = segment.codec().FST_EXT
- r = SegmentReader(storage, schema, segment)
- f = segment.create_file(storage, ext)
- gw = fst.GraphWriter(f)
- for fieldname in fieldnames:
- gw.start_field(fieldname)
- for word in r.lexicon(fieldname):
- gw.insert(word)
- gw.finish_field()
- gw.close()
- for fieldname in fieldnames:
- schema[fieldname].spelling = True
- if commit:
- writer.commit(merge=False)
- # Buffered writer class
- class BufferedWriter(IndexWriter):
- """Convenience class that acts like a writer but buffers added documents
- before dumping the buffered documents as a batch into the actual index.
- In scenarios where you are continuously adding single documents very
- rapidly (for example a web application where lots of users are adding
- content simultaneously), using a BufferedWriter is *much* faster than
- opening and committing a writer for each document you add. If you're adding
- batches of documents at a time, you can just use a regular writer.
- (This class may also be useful for batches of ``update_document`` calls. In
- a normal writer, ``update_document`` calls cannot update documents you've
- added *in that writer*. With ``BufferedWriter``, this will work.)
- To use this class, create it from your index and *keep it open*, sharing
- it between threads.
- >>> from whoosh.writing import BufferedWriter
- >>> writer = BufferedWriter(myindex, period=120, limit=20)
- >>> # Then you can use the writer to add and update documents
- >>> writer.add_document(...)
- >>> writer.add_document(...)
- >>> writer.add_document(...)
- >>> # Before the writer goes out of scope, call close() on it
- >>> writer.close()
- .. note::
- This object stores documents in memory and may keep an underlying
- writer open, so you must explicitly call the
- :meth:`~BufferedWriter.close` method on this object before it goes out
- of scope to release the write lock and make sure any uncommitted
- changes are saved.
- You can read/search the combination of the on-disk index and the
- buffered documents in memory by calling ``BufferedWriter.reader()`` or
- ``BufferedWriter.searcher()``. This allows quasi-real-time search, where
- documents are available for searching as soon as they are buffered in
- memory, before they are committed to disk.
- .. tip::
- By using a searcher from the shared writer, multiple *threads* can
- search the buffered documents. Of course, other *processes* will only
- see the documents that have been written to disk. If you want indexed
- documents to become available to other processes as soon as possible,
- you have to use a traditional writer instead of a ``BufferedWriter``.
- You can control how often the ``BufferedWriter`` flushes the in-memory
- index to disk using the ``period`` and ``limit`` arguments. ``period`` is
- the maximum number of seconds between commits. ``limit`` is the maximum
- number of additions to buffer between commits.
- You don't need to call ``commit()`` on the ``BufferedWriter`` manually.
- Doing so will just flush the buffered documents to disk early. You can
- continue to make changes after calling ``commit()``, and you can call
- ``commit()`` multiple times.
- """
- def __init__(self, index, period=60, limit=10, writerargs=None,
- commitargs=None):
- """
- :param index: the :class:`whoosh.index.Index` to write to.
- :param period: the maximum amount of time (in seconds) between commits.
- Set this to ``0`` or ``None`` to not use a timer. Do not set this
- any lower than a few seconds.
- :param limit: the maximum number of documents to buffer before
- committing.
- :param writerargs: dictionary specifying keyword arguments to be passed
- to the index's ``writer()`` method when creating a writer.
- :param commitargs: dictionary specifying keyword arguments to be passed
- to the writer's ``commit()`` method when committing.
- """
- self.index = index
- self.period = period
- self.limit = limit
- self.writerargs = writerargs or {}
- self.commitargs = commitargs or {}
- self.lock = threading.RLock()
- self.writer = self.index.writer(**self.writerargs)
- self._make_ram_index()
- self.bufferedcount = 0
- # Start timer
- if self.period:
- self.timer = threading.Timer(self.period, self.commit)
- self.timer.start()
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.close()
- def _make_ram_index(self):
- from whoosh.codec.memory import MemoryCodec
- self.codec = MemoryCodec()
- def _get_ram_reader(self):
- return self.codec.reader(self.schema)
- @property
- def schema(self):
- return self.writer.schema
- def reader(self, **kwargs):
- from whoosh.reading import MultiReader
- reader = self.writer.reader()
- with self.lock:
- ramreader = self._get_ram_reader()
- # If there are in-memory docs, combine the readers
- if ramreader.doc_count():
- if reader.is_atomic():
- reader = MultiReader([reader, ramreader])
- else:
- reader.add_reader(ramreader)
- return reader
- def searcher(self, **kwargs):
- from whoosh.searching import Searcher
- return Searcher(self.reader(), fromindex=self.index, **kwargs)
- def close(self):
- self.commit(restart=False)
- def commit(self, restart=True):
- if self.period:
- self.timer.cancel()
- with self.lock:
- ramreader = self._get_ram_reader()
- self._make_ram_index()
- if self.bufferedcount:
- self.writer.add_reader(ramreader)
- self.writer.commit(**self.commitargs)
- self.bufferedcount = 0
- if restart:
- self.writer = self.index.writer(**self.writerargs)
- if self.period:
- self.timer = threading.Timer(self.period, self.commit)
- self.timer.start()
- def add_reader(self, reader):
- # Pass through to the underlying on-disk index
- self.writer.add_reader(reader)
- self.commit()
- def add_document(self, **fields):
- with self.lock:
- # Hijack a writer to make the calls into the codec
- with self.codec.writer(self.writer.schema) as w:
- w.add_document(**fields)
- self.bufferedcount += 1
- if self.bufferedcount >= self.limit:
- self.commit()
- def update_document(self, **fields):
- with self.lock:
- IndexWriter.update_document(self, **fields)
- def delete_document(self, docnum, delete=True):
- with self.lock:
- base = self.index.doc_count_all()
- if docnum < base:
- self.writer.delete_document(docnum, delete=delete)
- else:
- ramsegment = self.codec.segment
- ramsegment.delete_document(docnum - base, delete=delete)
- def is_deleted(self, docnum):
- base = self.index.doc_count_all()
- if docnum < base:
- return self.writer.is_deleted(docnum)
- else:
- return self._get_ram_reader().is_deleted(docnum - base)
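- # Illustrative near-real-time search against a shared BufferedWriter (field
- # names and the query are hypothetical):
- #     from whoosh.query import Term
- #     bw = BufferedWriter(myindex, period=30, limit=100)
- #     bw.add_document(title=u"Just added")
- #     with bw.searcher() as s:   # sees buffered documents immediately
- #         results = s.search(Term("title", u"added"))
- #     bw.close()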
- # Backwards compatibility with old name
- BatchWriter = BufferedWriter