queues_test.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  2. # not use this file except in compliance with the License. You may obtain
  3. # a copy of the License at
  4. #
  5. # http://www.apache.org/licenses/LICENSE-2.0
  6. #
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  9. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  10. # License for the specific language governing permissions and limitations
  11. # under the License.
  12. from __future__ import absolute_import, division, print_function
  13. from datetime import timedelta
  14. from random import random
  15. from tornado import gen, queues
  16. from tornado.gen import TimeoutError
  17. from tornado.testing import gen_test, AsyncTestCase
  18. from tornado.test.util import unittest, skipBefore35, exec_test
  19. class QueueBasicTest(AsyncTestCase):
  20. def test_repr_and_str(self):
  21. q = queues.Queue(maxsize=1)
  22. self.assertIn(hex(id(q)), repr(q))
  23. self.assertNotIn(hex(id(q)), str(q))
  24. q.get()
  25. for q_str in repr(q), str(q):
  26. self.assertTrue(q_str.startswith('<Queue'))
  27. self.assertIn('maxsize=1', q_str)
  28. self.assertIn('getters[1]', q_str)
  29. self.assertNotIn('putters', q_str)
  30. self.assertNotIn('tasks', q_str)
  31. q.put(None)
  32. q.put(None)
  33. # Now the queue is full, this putter blocks.
  34. q.put(None)
  35. for q_str in repr(q), str(q):
  36. self.assertNotIn('getters', q_str)
  37. self.assertIn('putters[1]', q_str)
  38. self.assertIn('tasks=2', q_str)
  39. def test_order(self):
  40. q = queues.Queue()
  41. for i in [1, 3, 2]:
  42. q.put_nowait(i)
  43. items = [q.get_nowait() for _ in range(3)]
  44. self.assertEqual([1, 3, 2], items)
  45. @gen_test
  46. def test_maxsize(self):
  47. self.assertRaises(TypeError, queues.Queue, maxsize=None)
  48. self.assertRaises(ValueError, queues.Queue, maxsize=-1)
  49. q = queues.Queue(maxsize=2)
  50. self.assertTrue(q.empty())
  51. self.assertFalse(q.full())
  52. self.assertEqual(2, q.maxsize)
  53. self.assertTrue(q.put(0).done())
  54. self.assertTrue(q.put(1).done())
  55. self.assertFalse(q.empty())
  56. self.assertTrue(q.full())
  57. put2 = q.put(2)
  58. self.assertFalse(put2.done())
  59. self.assertEqual(0, (yield q.get())) # Make room.
  60. self.assertTrue(put2.done())
  61. self.assertFalse(q.empty())
  62. self.assertTrue(q.full())
  63. class QueueGetTest(AsyncTestCase):
  64. @gen_test
  65. def test_blocking_get(self):
  66. q = queues.Queue()
  67. q.put_nowait(0)
  68. self.assertEqual(0, (yield q.get()))
  69. def test_nonblocking_get(self):
  70. q = queues.Queue()
  71. q.put_nowait(0)
  72. self.assertEqual(0, q.get_nowait())
  73. def test_nonblocking_get_exception(self):
  74. q = queues.Queue()
  75. self.assertRaises(queues.QueueEmpty, q.get_nowait)
  76. @gen_test
  77. def test_get_with_putters(self):
  78. q = queues.Queue(1)
  79. q.put_nowait(0)
  80. put = q.put(1)
  81. self.assertEqual(0, (yield q.get()))
  82. self.assertIsNone((yield put))
  83. @gen_test
  84. def test_blocking_get_wait(self):
  85. q = queues.Queue()
  86. q.put(0)
  87. self.io_loop.call_later(0.01, q.put, 1)
  88. self.io_loop.call_later(0.02, q.put, 2)
  89. self.assertEqual(0, (yield q.get(timeout=timedelta(seconds=1))))
  90. self.assertEqual(1, (yield q.get(timeout=timedelta(seconds=1))))
  91. @gen_test
  92. def test_get_timeout(self):
  93. q = queues.Queue()
  94. get_timeout = q.get(timeout=timedelta(seconds=0.01))
  95. get = q.get()
  96. with self.assertRaises(TimeoutError):
  97. yield get_timeout
  98. q.put_nowait(0)
  99. self.assertEqual(0, (yield get))
  100. @gen_test
  101. def test_get_timeout_preempted(self):
  102. q = queues.Queue()
  103. get = q.get(timeout=timedelta(seconds=0.01))
  104. q.put(0)
  105. yield gen.sleep(0.02)
  106. self.assertEqual(0, (yield get))
  107. @gen_test
  108. def test_get_clears_timed_out_putters(self):
  109. q = queues.Queue(1)
  110. # First putter succeeds, remainder block.
  111. putters = [q.put(i, timedelta(seconds=0.01)) for i in range(10)]
  112. put = q.put(10)
  113. self.assertEqual(10, len(q._putters))
  114. yield gen.sleep(0.02)
  115. self.assertEqual(10, len(q._putters))
  116. self.assertFalse(put.done()) # Final waiter is still active.
  117. q.put(11)
  118. self.assertEqual(0, (yield q.get())) # get() clears the waiters.
  119. self.assertEqual(1, len(q._putters))
  120. for putter in putters[1:]:
  121. self.assertRaises(TimeoutError, putter.result)
  122. @gen_test
  123. def test_get_clears_timed_out_getters(self):
  124. q = queues.Queue()
  125. getters = [q.get(timedelta(seconds=0.01)) for _ in range(10)]
  126. get = q.get()
  127. self.assertEqual(11, len(q._getters))
  128. yield gen.sleep(0.02)
  129. self.assertEqual(11, len(q._getters))
  130. self.assertFalse(get.done()) # Final waiter is still active.
  131. q.get() # get() clears the waiters.
  132. self.assertEqual(2, len(q._getters))
  133. for getter in getters:
  134. self.assertRaises(TimeoutError, getter.result)
  135. @skipBefore35
  136. @gen_test
  137. def test_async_for(self):
  138. q = queues.Queue()
  139. for i in range(5):
  140. q.put(i)
  141. namespace = exec_test(globals(), locals(), """
  142. async def f():
  143. results = []
  144. async for i in q:
  145. results.append(i)
  146. if i == 4:
  147. return results
  148. """)
  149. results = yield namespace['f']()
  150. self.assertEqual(results, list(range(5)))
  151. class QueuePutTest(AsyncTestCase):
  152. @gen_test
  153. def test_blocking_put(self):
  154. q = queues.Queue()
  155. q.put(0)
  156. self.assertEqual(0, q.get_nowait())
  157. def test_nonblocking_put_exception(self):
  158. q = queues.Queue(1)
  159. q.put(0)
  160. self.assertRaises(queues.QueueFull, q.put_nowait, 1)
  161. @gen_test
  162. def test_put_with_getters(self):
  163. q = queues.Queue()
  164. get0 = q.get()
  165. get1 = q.get()
  166. yield q.put(0)
  167. self.assertEqual(0, (yield get0))
  168. yield q.put(1)
  169. self.assertEqual(1, (yield get1))
  170. @gen_test
  171. def test_nonblocking_put_with_getters(self):
  172. q = queues.Queue()
  173. get0 = q.get()
  174. get1 = q.get()
  175. q.put_nowait(0)
  176. # put_nowait does *not* immediately unblock getters.
  177. yield gen.moment
  178. self.assertEqual(0, (yield get0))
  179. q.put_nowait(1)
  180. yield gen.moment
  181. self.assertEqual(1, (yield get1))
  182. @gen_test
  183. def test_blocking_put_wait(self):
  184. q = queues.Queue(1)
  185. q.put_nowait(0)
  186. self.io_loop.call_later(0.01, q.get)
  187. self.io_loop.call_later(0.02, q.get)
  188. futures = [q.put(0), q.put(1)]
  189. self.assertFalse(any(f.done() for f in futures))
  190. yield futures
  191. @gen_test
  192. def test_put_timeout(self):
  193. q = queues.Queue(1)
  194. q.put_nowait(0) # Now it's full.
  195. put_timeout = q.put(1, timeout=timedelta(seconds=0.01))
  196. put = q.put(2)
  197. with self.assertRaises(TimeoutError):
  198. yield put_timeout
  199. self.assertEqual(0, q.get_nowait())
  200. # 1 was never put in the queue.
  201. self.assertEqual(2, (yield q.get()))
  202. # Final get() unblocked this putter.
  203. yield put
  204. @gen_test
  205. def test_put_timeout_preempted(self):
  206. q = queues.Queue(1)
  207. q.put_nowait(0)
  208. put = q.put(1, timeout=timedelta(seconds=0.01))
  209. q.get()
  210. yield gen.sleep(0.02)
  211. yield put # No TimeoutError.
  212. @gen_test
  213. def test_put_clears_timed_out_putters(self):
  214. q = queues.Queue(1)
  215. # First putter succeeds, remainder block.
  216. putters = [q.put(i, timedelta(seconds=0.01)) for i in range(10)]
  217. put = q.put(10)
  218. self.assertEqual(10, len(q._putters))
  219. yield gen.sleep(0.02)
  220. self.assertEqual(10, len(q._putters))
  221. self.assertFalse(put.done()) # Final waiter is still active.
  222. q.put(11) # put() clears the waiters.
  223. self.assertEqual(2, len(q._putters))
  224. for putter in putters[1:]:
  225. self.assertRaises(TimeoutError, putter.result)
  226. @gen_test
  227. def test_put_clears_timed_out_getters(self):
  228. q = queues.Queue()
  229. getters = [q.get(timedelta(seconds=0.01)) for _ in range(10)]
  230. get = q.get()
  231. q.get()
  232. self.assertEqual(12, len(q._getters))
  233. yield gen.sleep(0.02)
  234. self.assertEqual(12, len(q._getters))
  235. self.assertFalse(get.done()) # Final waiters still active.
  236. q.put(0) # put() clears the waiters.
  237. self.assertEqual(1, len(q._getters))
  238. self.assertEqual(0, (yield get))
  239. for getter in getters:
  240. self.assertRaises(TimeoutError, getter.result)
  241. @gen_test
  242. def test_float_maxsize(self):
  243. # Non-int maxsize must round down: http://bugs.python.org/issue21723
  244. q = queues.Queue(maxsize=1.3)
  245. self.assertTrue(q.empty())
  246. self.assertFalse(q.full())
  247. q.put_nowait(0)
  248. q.put_nowait(1)
  249. self.assertFalse(q.empty())
  250. self.assertTrue(q.full())
  251. self.assertRaises(queues.QueueFull, q.put_nowait, 2)
  252. self.assertEqual(0, q.get_nowait())
  253. self.assertFalse(q.empty())
  254. self.assertFalse(q.full())
  255. yield q.put(2)
  256. put = q.put(3)
  257. self.assertFalse(put.done())
  258. self.assertEqual(1, (yield q.get()))
  259. yield put
  260. self.assertTrue(q.full())
  261. class QueueJoinTest(AsyncTestCase):
  262. queue_class = queues.Queue
  263. def test_task_done_underflow(self):
  264. q = self.queue_class()
  265. self.assertRaises(ValueError, q.task_done)
  266. @gen_test
  267. def test_task_done(self):
  268. q = self.queue_class()
  269. for i in range(100):
  270. q.put_nowait(i)
  271. self.accumulator = 0
  272. @gen.coroutine
  273. def worker():
  274. while True:
  275. item = yield q.get()
  276. self.accumulator += item
  277. q.task_done()
  278. yield gen.sleep(random() * 0.01)
  279. # Two coroutines share work.
  280. worker()
  281. worker()
  282. yield q.join()
  283. self.assertEqual(sum(range(100)), self.accumulator)
  284. @gen_test
  285. def test_task_done_delay(self):
  286. # Verify it is task_done(), not get(), that unblocks join().
  287. q = self.queue_class()
  288. q.put_nowait(0)
  289. join = q.join()
  290. self.assertFalse(join.done())
  291. yield q.get()
  292. self.assertFalse(join.done())
  293. yield gen.moment
  294. self.assertFalse(join.done())
  295. q.task_done()
  296. self.assertTrue(join.done())
  297. @gen_test
  298. def test_join_empty_queue(self):
  299. q = self.queue_class()
  300. yield q.join()
  301. yield q.join()
  302. @gen_test
  303. def test_join_timeout(self):
  304. q = self.queue_class()
  305. q.put(0)
  306. with self.assertRaises(TimeoutError):
  307. yield q.join(timeout=timedelta(seconds=0.01))
  308. class PriorityQueueJoinTest(QueueJoinTest):
  309. queue_class = queues.PriorityQueue
  310. @gen_test
  311. def test_order(self):
  312. q = self.queue_class(maxsize=2)
  313. q.put_nowait((1, 'a'))
  314. q.put_nowait((0, 'b'))
  315. self.assertTrue(q.full())
  316. q.put((3, 'c'))
  317. q.put((2, 'd'))
  318. self.assertEqual((0, 'b'), q.get_nowait())
  319. self.assertEqual((1, 'a'), (yield q.get()))
  320. self.assertEqual((2, 'd'), q.get_nowait())
  321. self.assertEqual((3, 'c'), (yield q.get()))
  322. self.assertTrue(q.empty())
  323. class LifoQueueJoinTest(QueueJoinTest):
  324. queue_class = queues.LifoQueue
  325. @gen_test
  326. def test_order(self):
  327. q = self.queue_class(maxsize=2)
  328. q.put_nowait(1)
  329. q.put_nowait(0)
  330. self.assertTrue(q.full())
  331. q.put(3)
  332. q.put(2)
  333. self.assertEqual(3, q.get_nowait())
  334. self.assertEqual(2, (yield q.get()))
  335. self.assertEqual(0, q.get_nowait())
  336. self.assertEqual(1, (yield q.get()))
  337. self.assertTrue(q.empty())
  338. class ProducerConsumerTest(AsyncTestCase):
  339. @gen_test
  340. def test_producer_consumer(self):
  341. q = queues.Queue(maxsize=3)
  342. history = []
  343. # We don't yield between get() and task_done(), so get() must wait for
  344. # the next tick. Otherwise we'd immediately call task_done and unblock
  345. # join() before q.put() resumes, and we'd only process the first four
  346. # items.
  347. @gen.coroutine
  348. def consumer():
  349. while True:
  350. history.append((yield q.get()))
  351. q.task_done()
  352. @gen.coroutine
  353. def producer():
  354. for item in range(10):
  355. yield q.put(item)
  356. consumer()
  357. yield producer()
  358. yield q.join()
  359. self.assertEqual(list(range(10)), history)
  360. if __name__ == '__main__':
  361. unittest.main()