externalsort.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. # Copyright 2011 Matt Chaput. All rights reserved.
  2. #
  3. # Redistribution and use in source and binary forms, with or without
  4. # modification, are permitted provided that the following conditions are met:
  5. #
  6. # 1. Redistributions of source code must retain the above copyright notice,
  7. # this list of conditions and the following disclaimer.
  8. #
  9. # 2. Redistributions in binary form must reproduce the above copyright
  10. # notice, this list of conditions and the following disclaimer in the
  11. # documentation and/or other materials provided with the distribution.
  12. #
  13. # THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
  14. # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
  15. # MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
  16. # EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
  17. # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  18. # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
  19. # OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  20. # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  21. # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
  22. # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  23. #
  24. # The views and conclusions contained in the software and documentation are
  25. # those of the authors and should not be interpreted as representing official
  26. # policies, either expressed or implied, of Matt Chaput.
  27. """
  28. This module implements a general external merge sort for Python objects.
  29. """
  30. from __future__ import with_statement
  31. import os, tempfile
  32. from heapq import heapify, heappop, heapreplace
  33. from whoosh.compat import dump, load
  34. ## Python 3.2 had a bug that made marshal.load unusable
  35. #if (hasattr(platform, "python_implementation")
  36. # and platform.python_implementation() == "CPython"
  37. # and platform.python_version() == "3.2.0"):
  38. # # Use pickle instead of marshal on Python 3.2
  39. # from whoosh.compat import dump as dump_pickle
  40. # from whoosh.compat import load
  41. #
  42. # def dump(obj, f):
  43. # dump_pickle(obj, f, -1)
  44. #else:
  45. # from marshal import dump, load
  46. try:
  47. from heapq import merge
  48. def imerge(iterables):
  49. return merge(*iterables)
  50. except ImportError:
  51. def imerge(iterables):
  52. _hpop, _hreplace, _Stop = (heappop, heapreplace, StopIteration)
  53. h = []
  54. h_append = h.append
  55. for itnum, it in enumerate(map(iter, iterables)):
  56. try:
  57. nx = it.next
  58. h_append([nx(), itnum, nx])
  59. except _Stop:
  60. pass
  61. heapify(h)
  62. while 1:
  63. try:
  64. while 1:
  65. v, itnum, nx = s = h[0]
  66. yield v
  67. s[0] = nx()
  68. _hreplace(h, s)
  69. except _Stop:
  70. _hpop(h)
  71. except IndexError:
  72. return
  73. class SortingPool(object):
  74. """This object implements a general K-way external merge sort for Python
  75. objects.
  76. >>> pool = MergePool()
  77. >>> # Add an unlimited number of items in any order
  78. >>> for item in my_items:
  79. ... pool.add(item)
  80. ...
  81. >>> # Get the items back in sorted order
  82. >>> for item in pool.items():
  83. ... print(item)
  84. This class uses the `marshal` module to write the items to temporary files,
  85. so you can only sort marshal-able types (generally: numbers, strings,
  86. tuples, lists, and dicts).
  87. """
  88. def __init__(self, maxsize=1000000, tempdir=None, prefix="",
  89. suffix=".run"):
  90. """
  91. :param maxsize: the maximum number of items to keep in memory at once.
  92. :param tempdir: the path of a directory to use for temporary file
  93. storage. The default is to use the system's temp directory.
  94. :param prefix: a prefix to add to temporary filenames.
  95. :param suffix: a suffix to add to temporary filenames.
  96. """
  97. self.tempdir = tempdir
  98. if maxsize < 1:
  99. raise ValueError("maxsize=%s must be >= 1" % maxsize)
  100. self.maxsize = maxsize
  101. self.prefix = prefix
  102. self.suffix = suffix
  103. # Current run queue
  104. self.current = []
  105. # List of run filenames
  106. self.runs = []
  107. def _new_run(self):
  108. fd, path = tempfile.mkstemp(prefix=self.prefix, suffix=self.suffix,
  109. dir=self.tempdir)
  110. f = os.fdopen(fd, "wb")
  111. return path, f
  112. def _open_run(self, path):
  113. return open(path, "rb")
  114. def _remove_run(self, path):
  115. os.remove(path)
  116. def _read_run(self, path):
  117. f = self._open_run(path)
  118. try:
  119. while True:
  120. yield load(f)
  121. except EOFError:
  122. return
  123. finally:
  124. f.close()
  125. self._remove_run(path)
  126. def _merge_runs(self, paths):
  127. iters = [self._read_run(path) for path in paths]
  128. for item in imerge(iters):
  129. yield item
  130. def add(self, item):
  131. """Adds `item` to the pool to be sorted.
  132. """
  133. if len(self.current) >= self.maxsize:
  134. self.save()
  135. self.current.append(item)
  136. def _write_run(self, f, items):
  137. for item in items:
  138. dump(item, f, 2)
  139. f.close()
  140. def _add_run(self, filename):
  141. self.runs.append(filename)
  142. def save(self):
  143. current = self.current
  144. if current:
  145. current.sort()
  146. path, f = self._new_run()
  147. self._write_run(f, current)
  148. self._add_run(path)
  149. self.current = []
  150. def cleanup(self):
  151. for path in self.runs:
  152. try:
  153. os.remove(path)
  154. except OSError:
  155. pass
  156. def reduce_to(self, target, k):
  157. # Reduce the number of runs to "target" by merging "k" runs at a time
  158. if k < 2:
  159. raise ValueError("k=%s must be > 2" % k)
  160. if target < 1:
  161. raise ValueError("target=%s must be >= 1" % target)
  162. runs = self.runs
  163. while len(runs) > target:
  164. newpath, f = self._new_run()
  165. # Take k runs off the end of the run list
  166. tomerge = []
  167. while runs and len(tomerge) < k:
  168. tomerge.append(runs.pop())
  169. # Merge them into a new run and add it at the start of the list
  170. self._write_run(f, self._merge_runs(tomerge))
  171. runs.insert(0, newpath)
  172. def items(self, maxfiles=128):
  173. """Returns a sorted list or iterator of the items in the pool.
  174. :param maxfiles: maximum number of files to open at once.
  175. """
  176. if maxfiles < 2:
  177. raise ValueError("maxfiles=%s must be >= 2" % maxfiles)
  178. if not self.runs:
  179. # We never wrote a run to disk, so just sort the queue in memory
  180. # and return that
  181. return sorted(self.current)
  182. # Write a new run with the leftover items in the queue
  183. self.save()
  184. # If we have more runs than allowed open files, merge some of the runs
  185. if maxfiles < len(self.runs):
  186. self.reduce_to(maxfiles, maxfiles)
  187. # Take all the runs off the run list and merge them
  188. runs = self.runs
  189. self.runs = [] # Minor detail, makes this object reusable
  190. return self._merge_runs(runs)
  191. def sort(items, maxsize=100000, tempdir=None, maxfiles=128):
  192. """Sorts the given items using an external merge sort.
  193. :param tempdir: the path of a directory to use for temporary file
  194. storage. The default is to use the system's temp directory.
  195. :param maxsize: the maximum number of items to keep in memory at once.
  196. :param maxfiles: maximum number of files to open at once.
  197. """
  198. p = SortingPool(maxsize=maxsize, tempdir=tempdir)
  199. for item in items:
  200. p.add(item)
  201. return p.items(maxfiles=maxfiles)