compound.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. # Copyright 2011 Matt Chaput. All rights reserved.
  2. #
  3. # Redistribution and use in source and binary forms, with or without
  4. # modification, are permitted provided that the following conditions are met:
  5. #
  6. # 1. Redistributions of source code must retain the above copyright notice,
  7. # this list of conditions and the following disclaimer.
  8. #
  9. # 2. Redistributions in binary form must reproduce the above copyright
  10. # notice, this list of conditions and the following disclaimer in the
  11. # documentation and/or other materials provided with the distribution.
  12. #
  13. # THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
  14. # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
  15. # MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
  16. # EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
  17. # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  18. # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
  19. # OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  20. # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  21. # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
  22. # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  23. #
  24. # The views and conclusions contained in the software and documentation are
  25. # those of the authors and should not be interpreted as representing official
  26. # policies, either expressed or implied, of Matt Chaput.
  27. import errno
  28. import os
  29. import sys
  30. from threading import Lock
  31. from shutil import copyfileobj
  32. try:
  33. import mmap
  34. except ImportError:
  35. mmap = None
  36. from whoosh.compat import BytesIO, memoryview_
  37. from whoosh.filedb.structfile import BufferFile, StructFile
  38. from whoosh.filedb.filestore import FileStorage, StorageError
  39. from whoosh.system import emptybytes
  40. from whoosh.util import random_name
  41. class CompoundStorage(FileStorage):
  42. readonly = True
  43. def __init__(self, dbfile, use_mmap=True, basepos=0):
  44. self._file = dbfile
  45. self.is_closed = False
  46. # Seek to the end to get total file size (to check if mmap is OK)
  47. dbfile.seek(0, os.SEEK_END)
  48. filesize = self._file.tell()
  49. dbfile.seek(basepos)
  50. self._diroffset = self._file.read_long()
  51. self._dirlength = self._file.read_int()
  52. self._file.seek(self._diroffset)
  53. self._dir = self._file.read_pickle()
  54. self._options = self._file.read_pickle()
  55. self._locks = {}
  56. self._source = None
  57. use_mmap = (
  58. use_mmap
  59. and hasattr(self._file, "fileno") # check file is a real file
  60. and filesize < sys.maxsize # check fit on 32-bit Python
  61. )
  62. if mmap and use_mmap:
  63. # Try to open the entire segment as a memory-mapped object
  64. try:
  65. fileno = self._file.fileno()
  66. self._source = mmap.mmap(fileno, 0, access=mmap.ACCESS_READ)
  67. except (mmap.error, OSError):
  68. e = sys.exc_info()[1]
  69. # If we got an error because there wasn't enough memory to
  70. # open the map, ignore it and fall through, we'll just use the
  71. # (slower) "sub-file" implementation
  72. if e.errno == errno.ENOMEM:
  73. pass
  74. else:
  75. raise
  76. else:
  77. # If that worked, we can close the file handle we were given
  78. self._file.close()
  79. self._file = None
  80. def __repr__(self):
  81. return "<%s (%s)>" % (self.__class__.__name__, self._name)
  82. def close(self):
  83. if self.is_closed:
  84. raise Exception("Already closed")
  85. self.is_closed = True
  86. if self._source:
  87. try:
  88. self._source.close()
  89. except BufferError:
  90. del self._source
  91. if self._file:
  92. self._file.close()
  93. def range(self, name):
  94. try:
  95. fileinfo = self._dir[name]
  96. except KeyError:
  97. raise NameError("Unknown file %r" % (name,))
  98. return fileinfo["offset"], fileinfo["length"]
  99. def open_file(self, name, *args, **kwargs):
  100. if self.is_closed:
  101. raise StorageError("Storage was closed")
  102. offset, length = self.range(name)
  103. if self._source:
  104. # Create a memoryview/buffer from the mmap
  105. buf = memoryview_(self._source, offset, length)
  106. f = BufferFile(buf, name=name)
  107. elif hasattr(self._file, "subset"):
  108. f = self._file.subset(offset, length, name=name)
  109. else:
  110. f = StructFile(SubFile(self._file, offset, length), name=name)
  111. return f
  112. def list(self):
  113. return list(self._dir.keys())
  114. def file_exists(self, name):
  115. return name in self._dir
  116. def file_length(self, name):
  117. info = self._dir[name]
  118. return info["length"]
  119. def file_modified(self, name):
  120. info = self._dir[name]
  121. return info["modified"]
  122. def lock(self, name):
  123. if name not in self._locks:
  124. self._locks[name] = Lock()
  125. return self._locks[name]
  126. @staticmethod
  127. def assemble(dbfile, store, names, **options):
  128. assert names, names
  129. directory = {}
  130. basepos = dbfile.tell()
  131. dbfile.write_long(0) # Directory position
  132. dbfile.write_int(0) # Directory length
  133. # Copy the files into the compound file
  134. for name in names:
  135. if name.endswith(".toc") or name.endswith(".seg"):
  136. raise Exception(name)
  137. for name in names:
  138. offset = dbfile.tell()
  139. length = store.file_length(name)
  140. modified = store.file_modified(name)
  141. directory[name] = {"offset": offset, "length": length,
  142. "modified": modified}
  143. f = store.open_file(name)
  144. copyfileobj(f, dbfile)
  145. f.close()
  146. CompoundStorage.write_dir(dbfile, basepos, directory, options)
  147. @staticmethod
  148. def write_dir(dbfile, basepos, directory, options=None):
  149. options = options or {}
  150. dirpos = dbfile.tell() # Remember the start of the directory
  151. dbfile.write_pickle(directory) # Write the directory
  152. dbfile.write_pickle(options)
  153. endpos = dbfile.tell() # Remember the end of the directory
  154. dbfile.flush()
  155. dbfile.seek(basepos) # Seek back to the start
  156. dbfile.write_long(dirpos) # Directory position
  157. dbfile.write_int(endpos - dirpos) # Directory length
  158. dbfile.close()
  159. class SubFile(object):
  160. def __init__(self, parentfile, offset, length, name=None):
  161. self._file = parentfile
  162. self._offset = offset
  163. self._length = length
  164. self._end = offset + length
  165. self._pos = 0
  166. self.name = name
  167. self.closed = False
  168. def close(self):
  169. self.closed = True
  170. def subset(self, position, length, name=None):
  171. start = self._offset + position
  172. end = start + length
  173. name = name or self.name
  174. assert self._offset >= start >= self._end
  175. assert self._offset >= end >= self._end
  176. return SubFile(self._file, self._offset + position, length, name=name)
  177. def read(self, size=None):
  178. if size is None:
  179. size = self._length - self._pos
  180. else:
  181. size = min(size, self._length - self._pos)
  182. if size < 0:
  183. size = 0
  184. if size > 0:
  185. self._file.seek(self._offset + self._pos)
  186. self._pos += size
  187. return self._file.read(size)
  188. else:
  189. return emptybytes
  190. def readline(self):
  191. maxsize = self._length - self._pos
  192. self._file.seek(self._offset + self._pos)
  193. data = self._file.readline()
  194. if len(data) > maxsize:
  195. data = data[:maxsize]
  196. self._pos += len(data)
  197. return data
  198. def seek(self, where, whence=0):
  199. if whence == 0: # Absolute
  200. pos = where
  201. elif whence == 1: # Relative
  202. pos = self._pos + where
  203. elif whence == 2: # From end
  204. pos = self._length - where
  205. else:
  206. raise ValueError
  207. self._pos = pos
  208. def tell(self):
  209. return self._pos
  210. class CompoundWriter(object):
  211. def __init__(self, tempstorage, buffersize=32 * 1024):
  212. assert isinstance(buffersize, int)
  213. self._tempstorage = tempstorage
  214. self._tempname = "%s.ctmp" % random_name()
  215. self._temp = tempstorage.create_file(self._tempname, mode="w+b")
  216. self._buffersize = buffersize
  217. self._streams = {}
  218. def create_file(self, name):
  219. ss = self.SubStream(self._temp, self._buffersize)
  220. self._streams[name] = ss
  221. return StructFile(ss)
  222. def _readback(self):
  223. temp = self._temp
  224. for name, substream in self._streams.items():
  225. substream.close()
  226. def gen():
  227. for f, offset, length in substream.blocks:
  228. if f is None:
  229. f = temp
  230. f.seek(offset)
  231. yield f.read(length)
  232. yield (name, gen)
  233. temp.close()
  234. self._tempstorage.delete_file(self._tempname)
  235. def save_as_compound(self, dbfile):
  236. basepos = dbfile.tell()
  237. dbfile.write_long(0) # Directory offset
  238. dbfile.write_int(0) # Directory length
  239. directory = {}
  240. for name, blocks in self._readback():
  241. filestart = dbfile.tell()
  242. for block in blocks():
  243. dbfile.write(block)
  244. directory[name] = {"offset": filestart,
  245. "length": dbfile.tell() - filestart}
  246. CompoundStorage.write_dir(dbfile, basepos, directory)
  247. def save_as_files(self, storage, name_fn):
  248. for name, blocks in self._readback():
  249. f = storage.create_file(name_fn(name))
  250. for block in blocks():
  251. f.write(block)
  252. f.close()
  253. class SubStream(object):
  254. def __init__(self, dbfile, buffersize):
  255. self._dbfile = dbfile
  256. self._buffersize = buffersize
  257. self._buffer = BytesIO()
  258. self.blocks = []
  259. def tell(self):
  260. return sum(b[2] for b in self.blocks) + self._buffer.tell()
  261. def write(self, inbytes):
  262. bio = self._buffer
  263. buflen = bio.tell()
  264. length = buflen + len(inbytes)
  265. if length >= self._buffersize:
  266. offset = self._dbfile.tell()
  267. self._dbfile.write(bio.getvalue()[:buflen])
  268. self._dbfile.write(inbytes)
  269. self.blocks.append((None, offset, length))
  270. self._buffer.seek(0)
  271. else:
  272. bio.write(inbytes)
  273. def close(self):
  274. bio = self._buffer
  275. length = bio.tell()
  276. if length:
  277. self.blocks.append((bio, 0, length))