multiproc.py

# Copyright 2011 Matt Chaput. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
#    this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
# EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of Matt Chaput.

from __future__ import with_statement
import os
from multiprocessing import Process, Queue, cpu_count

from whoosh.compat import queue, xrange, iteritems, pickle
from whoosh.codec import base
from whoosh.writing import PostingPool, SegmentWriter
from whoosh.externalsort import imerge
from whoosh.util import random_name


def finish_subsegment(writer, k=64):
    # Tell the pool to finish up the current file
    writer.pool.save()

    # Tell the pool to merge any and all runs in the pool until there
    # is only one run remaining. "k" is an optional parameter passed
    # from the parent which sets the maximum number of files to open
    # while reducing.
    writer.pool.reduce_to(1, k)

    # The filename of the single remaining run
    runname = writer.pool.runs[0]
    # The indexed field names
    fieldnames = writer.pool.fieldnames
    # The segment object (parent can use this to re-open the files created
    # by the sub-writer)
    segment = writer._partial_segment()

    return runname, fieldnames, segment


# Multiprocessing Writer

class SubWriterTask(Process):
    # This is a Process object that takes "jobs" off a job Queue, processes
    # them, and when it's done, puts a summary of its work on a results Queue

    def __init__(self, storage, indexname, jobqueue, resultqueue, kwargs,
                 multisegment):
        Process.__init__(self)
        self.storage = storage
        self.indexname = indexname
        self.jobqueue = jobqueue
        self.resultqueue = resultqueue
        self.kwargs = kwargs
        self.multisegment = multisegment
        self.running = True

    def run(self):
        # This is the main loop of the process. OK, so the way this works is
        # kind of brittle and stupid, but I had to figure out how to use the
        # multiprocessing module, work around bugs, and address performance
        # issues, so there is at least some reasoning behind some of this

        # The "parent" task farms individual documents out to the subtasks for
        # indexing. You could pickle the actual documents and put them in the
        # queue, but that is not very performant. Instead, we assume the tasks
        # share a filesystem and use that to pass the information around. The
        # parent task writes a certain number of documents to a file, then puts
        # the filename on the "job queue". A subtask gets the filename off the
        # queue and reads through the file processing the documents.

        jobqueue = self.jobqueue
        resultqueue = self.resultqueue
        multisegment = self.multisegment

        # Open a placeholder object representing the index
        ix = self.storage.open_index(self.indexname)
        # Open a writer for the index. The _lk=False parameter means to not try
        # to lock the index (the parent object that started me takes care of
        # locking the index)
        writer = self.writer = SegmentWriter(ix, _lk=False, **self.kwargs)

        # If the parent task calls cancel() on me, it will set self.running to
        # False, so I'll notice the next time through the loop
        while self.running:
            # Take an object off the job queue
            jobinfo = jobqueue.get()

            # If the object is None, it means the parent task wants me to
            # finish up
            if jobinfo is None:
                break

            # The object from the queue is a tuple of (filename,
            # number_of_docs_in_file). Pass those two pieces of information as
            # arguments to _process_file().
            self._process_file(*jobinfo)
            # jobqueue.task_done()

        if not self.running:
            # I was cancelled, so I'll cancel my underlying writer
            writer.cancel()
        else:
            if multisegment:
                # Actually finish the segment and return it with no run
                runname = None
                fieldnames = writer.pool.fieldnames
                segment = writer._finalize_segment()
            else:
                # Merge all runs in the writer's pool into one run, close the
                # segment, and return the run name and the segment
                k = self.kwargs.get("k", 64)
                runname, fieldnames, segment = finish_subsegment(writer, k)

            # Put the results (the run filename and the segment object) on the
            # result queue
            resultqueue.put((runname, fieldnames, segment), timeout=5)

    def _process_file(self, filename, doc_count):
        # This method processes a "job file" written out by the parent task. A
        # job file is a series of pickled (code, arguments) tuples. Currently
        # the only command code is 0=add_document

        writer = self.writer
        tempstorage = writer.temp_storage()

        load = pickle.load
        with tempstorage.open_file(filename).raw_file() as f:
            for _ in xrange(doc_count):
                # Load the next pickled tuple from the file
                code, args = load(f)
                assert code == 0
                writer.add_document(**args)
        # Remove the job file
        tempstorage.delete_file(filename)

    def cancel(self):
        self.running = False


class MpWriter(SegmentWriter):
    def __init__(self, ix, procs=None, batchsize=100, subargs=None,
                 multisegment=False, **kwargs):
        # This is the "main" writer that will aggregate the results created by
        # the sub-tasks
        SegmentWriter.__init__(self, ix, **kwargs)

        self.procs = procs or cpu_count()
        # The maximum number of documents in each job file submitted to the
        # sub-tasks
        self.batchsize = batchsize
        # You can use keyword arguments or the "subargs" argument to pass
        # keyword arguments to the sub-writers
        self.subargs = subargs if subargs else kwargs
        # If multisegment is True, don't merge the segments created by the
        # sub-writers, just add them directly to the TOC
        self.multisegment = multisegment

        # A list to hold the sub-task Process objects
        self.tasks = []
        # A queue to pass the filenames of job files to the sub-tasks
        self.jobqueue = Queue(self.procs * 4)
        # A queue to get back the final results of the sub-tasks
        self.resultqueue = Queue()
        # A buffer for documents before they are flushed to a job file
        self.docbuffer = []

        self._grouping = 0
        self._added_sub = False

    def _new_task(self):
        task = SubWriterTask(self.storage, self.indexname,
                             self.jobqueue, self.resultqueue, self.subargs,
                             self.multisegment)
        self.tasks.append(task)
        task.start()
        return task

    def _enqueue(self):
        # Flush the documents stored in self.docbuffer to a file and put the
        # filename on the job queue
        docbuffer = self.docbuffer
        dump = pickle.dump
        length = len(docbuffer)

        filename = "%s.doclist" % random_name()
        with self.temp_storage().create_file(filename).raw_file() as f:
            for item in docbuffer:
                dump(item, f, 2)

        if len(self.tasks) < self.procs:
            self._new_task()
        jobinfo = (filename, length)
        self.jobqueue.put(jobinfo)

        self.docbuffer = []

    def cancel(self):
        try:
            for task in self.tasks:
                task.cancel()
        finally:
            SegmentWriter.cancel(self)

    def start_group(self):
        self._grouping += 1

    def end_group(self):
        if not self._grouping:
            raise Exception("Unbalanced end_group")
        self._grouping -= 1

    def add_document(self, **fields):
        # Add the document to the docbuffer
        self.docbuffer.append((0, fields))
        # If the buffer is full, flush it to the job queue
        if not self._grouping and len(self.docbuffer) >= self.batchsize:
            self._enqueue()
        self._added_sub = True

    def _read_and_renumber_run(self, path, offset):
        # Note that SortingPool._read_run() automatically deletes the run file
        # when it's finished
        gen = self.pool._read_run(path)
        # If offset is 0, just return the items unchanged
        if not offset:
            return gen
        else:
            # Otherwise, add the offset to each docnum
            return ((fname, text, docnum + offset, weight, value)
                    for fname, text, docnum, weight, value in gen)

    def commit(self, mergetype=None, optimize=None, merge=None):
        if self._added_sub:
            # If documents have been added to sub-writers, use the parallel
            # merge commit code
            self._commit(mergetype, optimize, merge)
        else:
            # Otherwise, just do a regular-old commit
            SegmentWriter.commit(self, mergetype=mergetype, optimize=optimize,
                                 merge=merge)

    def _commit(self, mergetype, optimize, merge):
        # Index the remaining documents in the doc buffer
        if self.docbuffer:
            self._enqueue()

        # Tell the tasks to finish
        for task in self.tasks:
            self.jobqueue.put(None)

        # Merge existing segments
        finalsegments = self._merge_segments(mergetype, optimize, merge)

        # Wait for the subtasks to finish
        for task in self.tasks:
            task.join()

        # Pull a (run_file_name, fieldnames, segment) tuple off the result
        # queue for each sub-task, representing the final results of the task
        results = []
        for _ in self.tasks:
            try:
                results.append(self.resultqueue.get(timeout=1))
            except queue.Empty:
                pass

        if self.multisegment:
            # If we're not merging the segments, we don't care about the
            # runname and fieldnames in the results... just pull out the
            # segments and add them to the list of final segments
            finalsegments += [s for _, _, s in results]
            if self._added:
                finalsegments.append(self._finalize_segment())
            else:
                self._close_segment()
            assert self.perdocwriter.is_closed
        else:
            # Merge the posting sources from the sub-writers and my
            # postings into this writer
            self._merge_subsegments(results, mergetype)
            self._close_segment()
            self._assemble_segment()
            finalsegments.append(self.get_segment())
            assert self.perdocwriter.is_closed

        self._commit_toc(finalsegments)
        self._finish()

    def _merge_subsegments(self, results, mergetype):
        schema = self.schema
        schemanames = set(schema.names())
        storage = self.storage
        codec = self.codec
        sources = []

        # If information was added to this writer the conventional way (e.g.
        # through add_reader or merging segments), add it as an extra source
        if self._added:
            sources.append(self.pool.iter_postings())

        pdrs = []
        for runname, fieldnames, segment in results:
            fieldnames = set(fieldnames) | schemanames
            pdr = codec.per_document_reader(storage, segment)
            pdrs.append(pdr)
            basedoc = self.docnum
            docmap = self.write_per_doc(fieldnames, pdr)
            assert docmap is None

            items = self._read_and_renumber_run(runname, basedoc)
            sources.append(items)

        # Create a MultiPerDocumentReader combining the per-document readers
        # from this writer and the subtask segments
        self.perdocwriter.close()
        pdrs.insert(0, self.per_document_reader())
        mpdr = base.MultiPerDocumentReader(pdrs)

        try:
            # Merge the iterators into the field writer
            self.fieldwriter.add_postings(schema, mpdr, imerge(sources))
        finally:
            mpdr.close()
        self._added = True


class SerialMpWriter(MpWriter):
    # A non-parallel version of the MpWriter for testing purposes

    def __init__(self, ix, procs=None, batchsize=100, subargs=None, **kwargs):
        SegmentWriter.__init__(self, ix, **kwargs)

        self.procs = procs or cpu_count()
        self.batchsize = batchsize
        self.subargs = subargs if subargs else kwargs
        self.tasks = [SegmentWriter(ix, _lk=False, **self.subargs)
                      for _ in xrange(self.procs)]
        self.pointer = 0
        self._added_sub = False

    def add_document(self, **fields):
        self.tasks[self.pointer].add_document(**fields)
        self.pointer = (self.pointer + 1) % len(self.tasks)
        self._added_sub = True

    def _commit(self, mergetype, optimize, merge):
        # Merge existing segments
        finalsegments = self._merge_segments(mergetype, optimize, merge)

        # Collect a (run_file_name, fieldnames, segment) tuple from each
        # sub-writer, representing its final results (there is no result
        # queue in this serial version)
        results = []
        for writer in self.tasks:
            results.append(finish_subsegment(writer))

        self._merge_subsegments(results, mergetype)
        self._close_segment()
        self._assemble_segment()
        finalsegments.append(self.get_segment())

        self._commit_toc(finalsegments)
        self._finish()


# For compatibility with old multiproc module

class MultiSegmentWriter(MpWriter):
    def __init__(self, *args, **kwargs):
        MpWriter.__init__(self, *args, **kwargs)
        self.multisegment = True
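

# Usage sketch (added for illustration; not part of the original module).
# It shows the intended pattern: create an index, construct an MpWriter with
# a number of sub-processes and a batch size, add documents, then commit.
# The schema fields and the "mp_index" directory name are arbitrary examples.
if __name__ == "__main__":
    from whoosh import fields, index

    dirname = "mp_index"
    if not os.path.exists(dirname):
        os.mkdir(dirname)
    schema = fields.Schema(path=fields.ID(stored=True, unique=True),
                           content=fields.TEXT)
    ix = index.create_in(dirname, schema)

    # procs controls how many SubWriterTask processes are spawned; batchsize
    # is the number of buffered documents written to each job file
    writer = MpWriter(ix, procs=2, batchsize=50)
    for i in xrange(200):
        writer.add_document(path=u"/doc/%d" % i,
                            content=u"document number %d" % i)
    writer.commit()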