123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331 |
- # Copyright 2011 Matt Chaput. All rights reserved.
- #
- # Redistribution and use in source and binary forms, with or without
- # modification, are permitted provided that the following conditions are met:
- #
- # 1. Redistributions of source code must retain the above copyright notice,
- # this list of conditions and the following disclaimer.
- #
- # 2. Redistributions in binary form must reproduce the above copyright
- # notice, this list of conditions and the following disclaimer in the
- # documentation and/or other materials provided with the distribution.
- #
- # THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
- # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- # MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
- # EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
- # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
- # OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
- # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- #
- # The views and conclusions contained in the software and documentation are
- # those of the authors and should not be interpreted as representing official
- # policies, either expressed or implied, of Matt Chaput.
- import errno
- import os
- import sys
- from threading import Lock
- from shutil import copyfileobj
- try:
- import mmap
- except ImportError:
- mmap = None
- from whoosh.compat import BytesIO, memoryview_
- from whoosh.filedb.structfile import BufferFile, StructFile
- from whoosh.filedb.filestore import FileStorage, StorageError
- from whoosh.system import emptybytes
- from whoosh.util import random_name
class CompoundStorage(FileStorage):
    """Presents a single "compound" file as a read-only :class:`FileStorage`.

    The compound format (see :meth:`write_dir`) is: at ``basepos``, a long
    giving the directory offset and an int giving the directory length,
    followed by the concatenated contents of the sub-files, followed by a
    pickled directory dict (``name -> {"offset", "length", "modified"}``)
    and a pickled options dict.
    """

    readonly = True

    def __init__(self, dbfile, use_mmap=True, basepos=0):
        """
        :param dbfile: a file-like object (with ``read_long``/``read_int``/
            ``read_pickle`` methods) containing the compound data.
        :param use_mmap: if True, try to memory-map the whole file instead of
            seeking/reading through the file handle.
        :param basepos: position in ``dbfile`` where the compound data starts.
        """
        self._file = dbfile
        # BUG FIX: __repr__ reads self._name, but it was never assigned;
        # remember the wrapped file's name (if it has one) for display
        self._name = getattr(dbfile, "name", None)
        self.is_closed = False

        # Seek to the end to get total file size (to check if mmap is OK)
        dbfile.seek(0, os.SEEK_END)
        filesize = self._file.tell()
        dbfile.seek(basepos)

        # Read the directory pointer header, then load the pickled directory
        # and options dicts from the end of the file
        self._diroffset = self._file.read_long()
        self._dirlength = self._file.read_int()
        self._file.seek(self._diroffset)
        self._dir = self._file.read_pickle()
        self._options = self._file.read_pickle()
        self._locks = {}
        self._source = None

        use_mmap = (
            use_mmap
            and hasattr(self._file, "fileno")  # check file is a real file
            and filesize < sys.maxsize  # check fit on 32-bit Python
        )
        if mmap and use_mmap:
            # Try to open the entire segment as a memory-mapped object
            try:
                fileno = self._file.fileno()
                self._source = mmap.mmap(fileno, 0, access=mmap.ACCESS_READ)
            except (mmap.error, OSError) as e:
                # If we got an error because there wasn't enough memory to
                # open the map, ignore it and fall through, we'll just use the
                # (slower) "sub-file" implementation
                if e.errno == errno.ENOMEM:
                    pass
                else:
                    raise
            else:
                # If that worked, we can close the file handle we were given
                self._file.close()
                self._file = None

    def __repr__(self):
        return "<%s (%s)>" % (self.__class__.__name__, self._name)

    def close(self):
        """Closes the mmap and/or underlying file. Raises if already closed."""
        if self.is_closed:
            raise Exception("Already closed")
        self.is_closed = True

        if self._source:
            try:
                self._source.close()
            except BufferError:
                # Exported buffers (memoryviews) still reference the mmap;
                # drop our reference and let GC reclaim it later
                del self._source
        if self._file:
            self._file.close()

    def range(self, name):
        """Returns the ``(offset, length)`` of the sub-file ``name``.

        :raises NameError: if ``name`` is not in the directory.
        """
        try:
            fileinfo = self._dir[name]
        except KeyError:
            raise NameError("Unknown file %r" % (name,))
        return fileinfo["offset"], fileinfo["length"]

    def open_file(self, name, *args, **kwargs):
        """Returns a file-like object for reading the sub-file ``name``."""
        if self.is_closed:
            raise StorageError("Storage was closed")

        offset, length = self.range(name)
        if self._source:
            # Create a memoryview/buffer from the mmap
            buf = memoryview_(self._source, offset, length)
            f = BufferFile(buf, name=name)
        elif hasattr(self._file, "subset"):
            f = self._file.subset(offset, length, name=name)
        else:
            f = StructFile(SubFile(self._file, offset, length), name=name)
        return f

    def list(self):
        """Returns a list of the names of the sub-files."""
        return list(self._dir.keys())

    def file_exists(self, name):
        return name in self._dir

    def file_length(self, name):
        info = self._dir[name]
        return info["length"]

    def file_modified(self, name):
        info = self._dir[name]
        return info["modified"]

    def lock(self, name):
        # Compound files are read-only, so an in-process Lock is sufficient
        if name not in self._locks:
            self._locks[name] = Lock()
        return self._locks[name]

    @staticmethod
    def assemble(dbfile, store, names, **options):
        """Copies the files ``names`` from ``store`` into ``dbfile`` as a
        single compound file, then writes the directory and closes ``dbfile``.
        """
        assert names, names

        directory = {}
        basepos = dbfile.tell()
        dbfile.write_long(0)  # Directory position
        dbfile.write_int(0)  # Directory length

        # Refuse to absorb TOC/segment files into a compound file
        for name in names:
            if name.endswith(".toc") or name.endswith(".seg"):
                raise Exception(name)

        # Copy the files into the compound file
        for name in names:
            offset = dbfile.tell()
            length = store.file_length(name)
            modified = store.file_modified(name)
            directory[name] = {"offset": offset, "length": length,
                               "modified": modified}
            f = store.open_file(name)
            copyfileobj(f, dbfile)
            f.close()

        CompoundStorage.write_dir(dbfile, basepos, directory, options)

    @staticmethod
    def write_dir(dbfile, basepos, directory, options=None):
        """Writes the pickled directory and options at the current position,
        backpatches the header at ``basepos``, and closes ``dbfile``.
        """
        options = options or {}

        dirpos = dbfile.tell()  # Remember the start of the directory
        dbfile.write_pickle(directory)  # Write the directory
        dbfile.write_pickle(options)
        endpos = dbfile.tell()  # Remember the end of the directory
        dbfile.flush()
        dbfile.seek(basepos)  # Seek back to the start
        dbfile.write_long(dirpos)  # Directory position
        dbfile.write_int(endpos - dirpos)  # Directory length

        dbfile.close()
class SubFile(object):
    """A read-only file-like object exposing a byte range of a parent file
    as if it were an independent file. Maintains its own position; every read
    seeks the parent first, so multiple SubFiles may share one parent.
    """

    def __init__(self, parentfile, offset, length, name=None):
        """
        :param parentfile: the underlying file object to read from.
        :param offset: start of this sub-file's range in the parent.
        :param length: length in bytes of this sub-file.
        :param name: optional display name for this sub-file.
        """
        self._file = parentfile
        self._offset = offset
        self._length = length
        self._end = offset + length
        self._pos = 0  # Position relative to the start of the sub-file

        self.name = name
        self.closed = False

    def close(self):
        # Note: does NOT close the (shared) parent file
        self.closed = True

    def subset(self, position, length, name=None):
        """Returns a new SubFile for a sub-range of this sub-file."""
        start = self._offset + position
        end = start + length
        name = name or self.name
        # BUG FIX: the bounds checks were inverted (offset >= start >= end),
        # which raised AssertionError for every valid sub-range; the new
        # range must lie inside [self._offset, self._end]
        assert self._offset <= start <= self._end
        assert self._offset <= end <= self._end

        return SubFile(self._file, self._offset + position, length, name=name)

    def read(self, size=None):
        """Reads up to ``size`` bytes (or the rest of the sub-file if None),
        clamped so reads never cross the end of the sub-file's range.
        """
        if size is None:
            size = self._length - self._pos
        else:
            size = min(size, self._length - self._pos)
        if size < 0:
            size = 0

        if size > 0:
            self._file.seek(self._offset + self._pos)
            self._pos += size
            return self._file.read(size)
        else:
            return emptybytes

    def readline(self):
        """Reads one line, truncated at the end of the sub-file's range."""
        maxsize = self._length - self._pos
        self._file.seek(self._offset + self._pos)
        data = self._file.readline()
        if len(data) > maxsize:
            data = data[:maxsize]
        self._pos += len(data)
        return data

    def seek(self, where, whence=0):
        if whence == 0:  # Absolute
            pos = where
        elif whence == 1:  # Relative
            pos = self._pos + where
        elif whence == 2:  # From end
            pos = self._length - where
        else:
            raise ValueError

        self._pos = pos

    def tell(self):
        return self._pos
class CompoundWriter(object):
    """Buffers writes to multiple named "virtual" files, spooling large
    streams to a single shared temp file, then replays them either into one
    compound file (:meth:`save_as_compound`) or into separate files
    (:meth:`save_as_files`).
    """

    def __init__(self, tempstorage, buffersize=32 * 1024):
        """
        :param tempstorage: storage object used to create/delete the shared
            temp file.
        :param buffersize: per-stream in-memory buffer size in bytes; writes
            that would exceed it are spooled to the temp file.
        """
        assert isinstance(buffersize, int)
        self._tempstorage = tempstorage
        self._tempname = "%s.ctmp" % random_name()
        self._temp = tempstorage.create_file(self._tempname, mode="w+b")
        self._buffersize = buffersize
        self._streams = {}

    def create_file(self, name):
        """Returns a StructFile wrapping a new buffered substream for
        ``name``.
        """
        ss = self.SubStream(self._temp, self._buffersize)
        self._streams[name] = ss
        return StructFile(ss)

    def _readback(self):
        # Yields (name, gen) pairs where gen() generates the bytes blocks of
        # that stream; closes and deletes the temp file when exhausted
        temp = self._temp
        for name, substream in self._streams.items():
            substream.close()

            # BUG FIX: bind the stream as a default argument so the generator
            # doesn't late-bind to the loop variable; otherwise a caller that
            # collects the generators before consuming them would read the
            # last stream's blocks for every name
            def gen(substream=substream):
                for f, offset, length in substream.blocks:
                    if f is None:
                        # A None file means the block was spooled to temp
                        f = temp
                    f.seek(offset)
                    yield f.read(length)

            yield (name, gen)
        temp.close()
        self._tempstorage.delete_file(self._tempname)

    def save_as_compound(self, dbfile):
        """Writes all streams into ``dbfile`` in compound-file format (see
        ``CompoundStorage.write_dir``); closes ``dbfile``.
        """
        basepos = dbfile.tell()
        dbfile.write_long(0)  # Directory offset
        dbfile.write_int(0)  # Directory length

        directory = {}
        for name, blocks in self._readback():
            filestart = dbfile.tell()
            for block in blocks():
                dbfile.write(block)
            directory[name] = {"offset": filestart,
                               "length": dbfile.tell() - filestart}

        CompoundStorage.write_dir(dbfile, basepos, directory)

    def save_as_files(self, storage, name_fn):
        """Writes each stream to its own file in ``storage``; ``name_fn``
        maps a stream name to the file name to create.
        """
        for name, blocks in self._readback():
            f = storage.create_file(name_fn(name))
            for block in blocks():
                f.write(block)
            f.close()

    class SubStream(object):
        """A write-only stream that buffers small writes in memory and spools
        anything past ``buffersize`` to the shared ``dbfile``. ``blocks`` is a
        list of ``(file_or_None, offset, length)`` records; None means the
        block lives in the shared file.
        """

        def __init__(self, dbfile, buffersize):
            self._dbfile = dbfile
            self._buffersize = buffersize
            self._buffer = BytesIO()
            self.blocks = []

        def tell(self):
            # Total bytes written = flushed blocks plus the pending buffer
            return sum(b[2] for b in self.blocks) + self._buffer.tell()

        def write(self, inbytes):
            bio = self._buffer
            buflen = bio.tell()
            length = buflen + len(inbytes)
            if length >= self._buffersize:
                # Flush the pending buffer and this write to the shared file
                # as one block, then rewind the buffer for reuse
                offset = self._dbfile.tell()
                self._dbfile.write(bio.getvalue()[:buflen])
                self._dbfile.write(inbytes)

                self.blocks.append((None, offset, length))
                self._buffer.seek(0)
            else:
                bio.write(inbytes)

        def close(self):
            # Record any remaining buffered bytes as an in-memory block
            bio = self._buffer
            length = bio.tell()
            if length:
                self.blocks.append((bio, 0, length))
|