queues.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
# Copyright 2015 The Tornado Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
  14. """Asynchronous queues for coroutines. These classes are very similar
  15. to those provided in the standard library's `asyncio package
  16. <https://docs.python.org/3/library/asyncio-queue.html>`_.
  17. .. warning::
  18. Unlike the standard library's `queue` module, the classes defined here
  19. are *not* thread-safe. To use these queues from another thread,
  20. use `.IOLoop.add_callback` to transfer control to the `.IOLoop` thread
  21. before calling any queue methods.
  22. """
  23. from __future__ import absolute_import, division, print_function
  24. import collections
  25. import heapq
  26. from tornado import gen, ioloop
  27. from tornado.concurrent import Future, future_set_result_unless_cancelled
  28. from tornado.locks import Event
  29. __all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
  30. class QueueEmpty(Exception):
  31. """Raised by `.Queue.get_nowait` when the queue has no items."""
  32. pass
  33. class QueueFull(Exception):
  34. """Raised by `.Queue.put_nowait` when a queue is at its maximum size."""
  35. pass
  36. def _set_timeout(future, timeout):
  37. if timeout:
  38. def on_timeout():
  39. if not future.done():
  40. future.set_exception(gen.TimeoutError())
  41. io_loop = ioloop.IOLoop.current()
  42. timeout_handle = io_loop.add_timeout(timeout, on_timeout)
  43. future.add_done_callback(
  44. lambda _: io_loop.remove_timeout(timeout_handle))
  45. class _QueueIterator(object):
  46. def __init__(self, q):
  47. self.q = q
  48. def __anext__(self):
  49. return self.q.get()
  50. class Queue(object):
  51. """Coordinate producer and consumer coroutines.
  52. If maxsize is 0 (the default) the queue size is unbounded.
  53. .. testcode::
  54. from tornado import gen
  55. from tornado.ioloop import IOLoop
  56. from tornado.queues import Queue
  57. q = Queue(maxsize=2)
  58. async def consumer():
  59. async for item in q:
  60. try:
  61. print('Doing work on %s' % item)
  62. await gen.sleep(0.01)
  63. finally:
  64. q.task_done()
  65. async def producer():
  66. for item in range(5):
  67. await q.put(item)
  68. print('Put %s' % item)
  69. async def main():
  70. # Start consumer without waiting (since it never finishes).
  71. IOLoop.current().spawn_callback(consumer)
  72. await producer() # Wait for producer to put all tasks.
  73. await q.join() # Wait for consumer to finish all tasks.
  74. print('Done')
  75. IOLoop.current().run_sync(main)
  76. .. testoutput::
  77. Put 0
  78. Put 1
  79. Doing work on 0
  80. Put 2
  81. Doing work on 1
  82. Put 3
  83. Doing work on 2
  84. Put 4
  85. Doing work on 3
  86. Doing work on 4
  87. Done
  88. In versions of Python without native coroutines (before 3.5),
  89. ``consumer()`` could be written as::
  90. @gen.coroutine
  91. def consumer():
  92. while True:
  93. item = yield q.get()
  94. try:
  95. print('Doing work on %s' % item)
  96. yield gen.sleep(0.01)
  97. finally:
  98. q.task_done()
  99. .. versionchanged:: 4.3
  100. Added ``async for`` support in Python 3.5.
  101. """
  102. def __init__(self, maxsize=0):
  103. if maxsize is None:
  104. raise TypeError("maxsize can't be None")
  105. if maxsize < 0:
  106. raise ValueError("maxsize can't be negative")
  107. self._maxsize = maxsize
  108. self._init()
  109. self._getters = collections.deque([]) # Futures.
  110. self._putters = collections.deque([]) # Pairs of (item, Future).
  111. self._unfinished_tasks = 0
  112. self._finished = Event()
  113. self._finished.set()
  114. @property
  115. def maxsize(self):
  116. """Number of items allowed in the queue."""
  117. return self._maxsize
  118. def qsize(self):
  119. """Number of items in the queue."""
  120. return len(self._queue)
  121. def empty(self):
  122. return not self._queue
  123. def full(self):
  124. if self.maxsize == 0:
  125. return False
  126. else:
  127. return self.qsize() >= self.maxsize
  128. def put(self, item, timeout=None):
  129. """Put an item into the queue, perhaps waiting until there is room.
  130. Returns a Future, which raises `tornado.util.TimeoutError` after a
  131. timeout.
  132. ``timeout`` may be a number denoting a time (on the same
  133. scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
  134. `datetime.timedelta` object for a deadline relative to the
  135. current time.
  136. """
  137. future = Future()
  138. try:
  139. self.put_nowait(item)
  140. except QueueFull:
  141. self._putters.append((item, future))
  142. _set_timeout(future, timeout)
  143. else:
  144. future.set_result(None)
  145. return future
  146. def put_nowait(self, item):
  147. """Put an item into the queue without blocking.
  148. If no free slot is immediately available, raise `QueueFull`.
  149. """
  150. self._consume_expired()
  151. if self._getters:
  152. assert self.empty(), "queue non-empty, why are getters waiting?"
  153. getter = self._getters.popleft()
  154. self.__put_internal(item)
  155. future_set_result_unless_cancelled(getter, self._get())
  156. elif self.full():
  157. raise QueueFull
  158. else:
  159. self.__put_internal(item)
  160. def get(self, timeout=None):
  161. """Remove and return an item from the queue.
  162. Returns a Future which resolves once an item is available, or raises
  163. `tornado.util.TimeoutError` after a timeout.
  164. ``timeout`` may be a number denoting a time (on the same
  165. scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
  166. `datetime.timedelta` object for a deadline relative to the
  167. current time.
  168. """
  169. future = Future()
  170. try:
  171. future.set_result(self.get_nowait())
  172. except QueueEmpty:
  173. self._getters.append(future)
  174. _set_timeout(future, timeout)
  175. return future
  176. def get_nowait(self):
  177. """Remove and return an item from the queue without blocking.
  178. Return an item if one is immediately available, else raise
  179. `QueueEmpty`.
  180. """
  181. self._consume_expired()
  182. if self._putters:
  183. assert self.full(), "queue not full, why are putters waiting?"
  184. item, putter = self._putters.popleft()
  185. self.__put_internal(item)
  186. future_set_result_unless_cancelled(putter, None)
  187. return self._get()
  188. elif self.qsize():
  189. return self._get()
  190. else:
  191. raise QueueEmpty
  192. def task_done(self):
  193. """Indicate that a formerly enqueued task is complete.
  194. Used by queue consumers. For each `.get` used to fetch a task, a
  195. subsequent call to `.task_done` tells the queue that the processing
  196. on the task is complete.
  197. If a `.join` is blocking, it resumes when all items have been
  198. processed; that is, when every `.put` is matched by a `.task_done`.
  199. Raises `ValueError` if called more times than `.put`.
  200. """
  201. if self._unfinished_tasks <= 0:
  202. raise ValueError('task_done() called too many times')
  203. self._unfinished_tasks -= 1
  204. if self._unfinished_tasks == 0:
  205. self._finished.set()
  206. def join(self, timeout=None):
  207. """Block until all items in the queue are processed.
  208. Returns a Future, which raises `tornado.util.TimeoutError` after a
  209. timeout.
  210. """
  211. return self._finished.wait(timeout)
  212. def __aiter__(self):
  213. return _QueueIterator(self)
  214. # These three are overridable in subclasses.
  215. def _init(self):
  216. self._queue = collections.deque()
  217. def _get(self):
  218. return self._queue.popleft()
  219. def _put(self, item):
  220. self._queue.append(item)
  221. # End of the overridable methods.
  222. def __put_internal(self, item):
  223. self._unfinished_tasks += 1
  224. self._finished.clear()
  225. self._put(item)
  226. def _consume_expired(self):
  227. # Remove timed-out waiters.
  228. while self._putters and self._putters[0][1].done():
  229. self._putters.popleft()
  230. while self._getters and self._getters[0].done():
  231. self._getters.popleft()
  232. def __repr__(self):
  233. return '<%s at %s %s>' % (
  234. type(self).__name__, hex(id(self)), self._format())
  235. def __str__(self):
  236. return '<%s %s>' % (type(self).__name__, self._format())
  237. def _format(self):
  238. result = 'maxsize=%r' % (self.maxsize, )
  239. if getattr(self, '_queue', None):
  240. result += ' queue=%r' % self._queue
  241. if self._getters:
  242. result += ' getters[%s]' % len(self._getters)
  243. if self._putters:
  244. result += ' putters[%s]' % len(self._putters)
  245. if self._unfinished_tasks:
  246. result += ' tasks=%s' % self._unfinished_tasks
  247. return result
  248. class PriorityQueue(Queue):
  249. """A `.Queue` that retrieves entries in priority order, lowest first.
  250. Entries are typically tuples like ``(priority number, data)``.
  251. .. testcode::
  252. from tornado.queues import PriorityQueue
  253. q = PriorityQueue()
  254. q.put((1, 'medium-priority item'))
  255. q.put((0, 'high-priority item'))
  256. q.put((10, 'low-priority item'))
  257. print(q.get_nowait())
  258. print(q.get_nowait())
  259. print(q.get_nowait())
  260. .. testoutput::
  261. (0, 'high-priority item')
  262. (1, 'medium-priority item')
  263. (10, 'low-priority item')
  264. """
  265. def _init(self):
  266. self._queue = []
  267. def _put(self, item):
  268. heapq.heappush(self._queue, item)
  269. def _get(self):
  270. return heapq.heappop(self._queue)
  271. class LifoQueue(Queue):
  272. """A `.Queue` that retrieves the most recently put items first.
  273. .. testcode::
  274. from tornado.queues import LifoQueue
  275. q = LifoQueue()
  276. q.put(3)
  277. q.put(2)
  278. q.put(1)
  279. print(q.get_nowait())
  280. print(q.get_nowait())
  281. print(q.get_nowait())
  282. .. testoutput::
  283. 1
  284. 2
  285. 3
  286. """
  287. def _init(self):
  288. self._queue = []
  289. def _put(self, item):
  290. self._queue.append(item)
  291. def _get(self):
  292. return self._queue.pop()