123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function

from datetime import timedelta
from random import random

from tornado import gen, queues
from tornado.gen import TimeoutError
from tornado.testing import gen_test, AsyncTestCase
from tornado.test.util import unittest, skipBefore35, exec_test
class QueueBasicTest(AsyncTestCase):
    """Basic ``queues.Queue`` checks: repr/str, item order and maxsize."""

    def test_repr_and_str(self):
        # repr() and str() must describe maxsize, waiters and unfinished
        # tasks; only repr() includes the object's id.
        q = queues.Queue(maxsize=1)
        self.assertIn(hex(id(q)), repr(q))
        self.assertNotIn(hex(id(q)), str(q))
        # The queue is empty, so this get() is left waiting.
        q.get()

        for q_str in repr(q), str(q):
            self.assertTrue(q_str.startswith('<Queue'))
            self.assertIn('maxsize=1', q_str)
            self.assertIn('getters[1]', q_str)
            self.assertNotIn('putters', q_str)
            self.assertNotIn('tasks', q_str)

        q.put(None)
        q.put(None)
        # Now the queue is full, this putter blocks.
        q.put(None)

        for q_str in repr(q), str(q):
            self.assertNotIn('getters', q_str)
            self.assertIn('putters[1]', q_str)
            self.assertIn('tasks=2', q_str)

    def test_order(self):
        # Items come back in the order they were put, not sorted.
        q = queues.Queue()
        for i in [1, 3, 2]:
            q.put_nowait(i)

        items = [q.get_nowait() for _ in range(3)]
        self.assertEqual([1, 3, 2], items)

    @gen_test
    def test_maxsize(self):
        # maxsize=None raises TypeError and a negative maxsize raises
        # ValueError; a put() on a full queue stays pending until a get()
        # makes room.
        self.assertRaises(TypeError, queues.Queue, maxsize=None)
        self.assertRaises(ValueError, queues.Queue, maxsize=-1)

        q = queues.Queue(maxsize=2)
        self.assertTrue(q.empty())
        self.assertFalse(q.full())
        self.assertEqual(2, q.maxsize)
        self.assertTrue(q.put(0).done())
        self.assertTrue(q.put(1).done())
        self.assertFalse(q.empty())
        self.assertTrue(q.full())
        put2 = q.put(2)
        self.assertFalse(put2.done())
        self.assertEqual(0, (yield q.get()))  # Make room.
        self.assertTrue(put2.done())
        self.assertFalse(q.empty())
        self.assertTrue(q.full())
class QueueGetTest(AsyncTestCase):
    """Tests for ``Queue.get`` / ``get_nowait``, timeouts and iteration."""

    @gen_test
    def test_blocking_get(self):
        # get() yields an item that is already queued.
        q = queues.Queue()
        q.put_nowait(0)
        self.assertEqual(0, (yield q.get()))

    def test_nonblocking_get(self):
        # get_nowait() returns a queued item without yielding.
        q = queues.Queue()
        q.put_nowait(0)
        self.assertEqual(0, q.get_nowait())

    def test_nonblocking_get_exception(self):
        # get_nowait() on an empty queue raises QueueEmpty.
        q = queues.Queue()
        self.assertRaises(queues.QueueEmpty, q.get_nowait)

    @gen_test
    def test_get_with_putters(self):
        # A get() on a full queue lets the blocked put() finish; the put
        # future resolves to None.
        q = queues.Queue(1)
        q.put_nowait(0)
        put = q.put(1)
        self.assertEqual(0, (yield q.get()))
        self.assertIsNone((yield put))

    @gen_test
    def test_blocking_get_wait(self):
        # The second get() has to wait for the put scheduled 0.01s later,
        # well inside its one-second timeout.
        q = queues.Queue()
        q.put(0)
        self.io_loop.call_later(0.01, q.put, 1)
        self.io_loop.call_later(0.02, q.put, 2)
        self.assertEqual(0, (yield q.get(timeout=timedelta(seconds=1))))
        self.assertEqual(1, (yield q.get(timeout=timedelta(seconds=1))))

    @gen_test
    def test_get_timeout(self):
        # The timed get() fails with TimeoutError; the untimed get() issued
        # after it still receives the next item.
        q = queues.Queue()
        get_timeout = q.get(timeout=timedelta(seconds=0.01))
        get = q.get()
        with self.assertRaises(TimeoutError):
            yield get_timeout

        q.put_nowait(0)
        self.assertEqual(0, (yield get))

    @gen_test
    def test_get_timeout_preempted(self):
        # A get() satisfied before its deadline must not raise later, even
        # after the deadline has passed.
        q = queues.Queue()
        get = q.get(timeout=timedelta(seconds=0.01))
        q.put(0)
        yield gen.sleep(0.02)
        self.assertEqual(0, (yield get))

    @gen_test
    def test_get_clears_timed_out_putters(self):
        # Timed-out putters stay in q._putters until a later queue operation
        # (here get()) discards them.
        q = queues.Queue(1)
        # First putter succeeds, remainder block.
        putters = [q.put(i, timedelta(seconds=0.01)) for i in range(10)]
        put = q.put(10)
        self.assertEqual(10, len(q._putters))
        yield gen.sleep(0.02)
        self.assertEqual(10, len(q._putters))
        self.assertFalse(put.done())  # Final waiter is still active.
        q.put(11)
        self.assertEqual(0, (yield q.get()))  # get() clears the waiters.
        self.assertEqual(1, len(q._putters))
        for putter in putters[1:]:
            self.assertRaises(TimeoutError, putter.result)

    @gen_test
    def test_get_clears_timed_out_getters(self):
        # Timed-out getters stay in q._getters until a later get() discards
        # them.
        q = queues.Queue()
        getters = [q.get(timedelta(seconds=0.01)) for _ in range(10)]
        get = q.get()
        self.assertEqual(11, len(q._getters))
        yield gen.sleep(0.02)
        self.assertEqual(11, len(q._getters))
        self.assertFalse(get.done())  # Final waiter is still active.
        q.get()  # get() clears the waiters.
        self.assertEqual(2, len(q._getters))
        for getter in getters:
            self.assertRaises(TimeoutError, getter.result)

    @skipBefore35
    @gen_test
    def test_async_for(self):
        # A queue can be consumed with ``async for``.
        q = queues.Queue()
        for i in range(5):
            q.put(i)

        # The coroutine is built from a string via exec_test rather than
        # written inline -- presumably so this module still parses on
        # interpreters without async/await syntax (the test itself is
        # skipped before Python 3.5).
        namespace = exec_test(globals(), locals(), """
        async def f():
            results = []
            async for i in q:
                results.append(i)
                if i == 4:
                    return results
        """)
        results = yield namespace['f']()
        self.assertEqual(results, list(range(5)))
class QueuePutTest(AsyncTestCase):
    """Tests for ``Queue.put`` / ``put_nowait``, timeouts and maxsize."""

    @gen_test
    def test_blocking_put(self):
        # With room available, put() enqueues the item before returning.
        q = queues.Queue()
        q.put(0)
        self.assertEqual(0, q.get_nowait())

    def test_nonblocking_put_exception(self):
        # put_nowait() on a full queue raises QueueFull.
        q = queues.Queue(1)
        q.put(0)
        self.assertRaises(queues.QueueFull, q.put_nowait, 1)

    @gen_test
    def test_put_with_getters(self):
        # Items are delivered to waiting getters in the order they called
        # get().
        q = queues.Queue()
        get0 = q.get()
        get1 = q.get()
        yield q.put(0)
        self.assertEqual(0, (yield get0))
        yield q.put(1)
        self.assertEqual(1, (yield get1))

    @gen_test
    def test_nonblocking_put_with_getters(self):
        q = queues.Queue()
        get0 = q.get()
        get1 = q.get()
        q.put_nowait(0)
        # put_nowait does *not* immediately unblock getters.
        yield gen.moment
        self.assertEqual(0, (yield get0))
        q.put_nowait(1)
        yield gen.moment
        self.assertEqual(1, (yield get1))

    @gen_test
    def test_blocking_put_wait(self):
        # Both puts block on the full queue and complete once the scheduled
        # get() calls make room.
        q = queues.Queue(1)
        q.put_nowait(0)
        self.io_loop.call_later(0.01, q.get)
        self.io_loop.call_later(0.02, q.get)
        futures = [q.put(0), q.put(1)]
        self.assertFalse(any(f.done() for f in futures))
        yield futures

    @gen_test
    def test_put_timeout(self):
        # A put() that times out never adds its item; a later untimed put()
        # is unaffected.
        q = queues.Queue(1)
        q.put_nowait(0)  # Now it's full.
        put_timeout = q.put(1, timeout=timedelta(seconds=0.01))
        put = q.put(2)
        with self.assertRaises(TimeoutError):
            yield put_timeout

        self.assertEqual(0, q.get_nowait())
        # 1 was never put in the queue.
        self.assertEqual(2, (yield q.get()))

        # Final get() unblocked this putter.
        yield put

    @gen_test
    def test_put_timeout_preempted(self):
        # A put() that got room before its deadline must not raise later.
        q = queues.Queue(1)
        q.put_nowait(0)
        put = q.put(1, timeout=timedelta(seconds=0.01))
        q.get()
        yield gen.sleep(0.02)
        yield put  # No TimeoutError.

    @gen_test
    def test_put_clears_timed_out_putters(self):
        # Timed-out putters stay in q._putters until a later put() discards
        # them.
        q = queues.Queue(1)
        # First putter succeeds, remainder block.
        putters = [q.put(i, timedelta(seconds=0.01)) for i in range(10)]
        put = q.put(10)
        self.assertEqual(10, len(q._putters))
        yield gen.sleep(0.02)
        self.assertEqual(10, len(q._putters))
        self.assertFalse(put.done())  # Final waiter is still active.
        q.put(11)  # put() clears the waiters.
        self.assertEqual(2, len(q._putters))
        for putter in putters[1:]:
            self.assertRaises(TimeoutError, putter.result)

    @gen_test
    def test_put_clears_timed_out_getters(self):
        # Timed-out getters stay in q._getters until a later put() discards
        # them.
        q = queues.Queue()
        getters = [q.get(timedelta(seconds=0.01)) for _ in range(10)]
        get = q.get()
        q.get()
        self.assertEqual(12, len(q._getters))
        yield gen.sleep(0.02)
        self.assertEqual(12, len(q._getters))
        self.assertFalse(get.done())  # Final waiters still active.
        q.put(0)  # put() clears the waiters.
        self.assertEqual(1, len(q._getters))
        self.assertEqual(0, (yield get))
        for getter in getters:
            self.assertRaises(TimeoutError, getter.result)

    @gen_test
    def test_float_maxsize(self):
        # A non-int maxsize must be accepted
        # (http://bugs.python.org/issue21723). As asserted below, a queue
        # with maxsize=1.3 takes two items before it reports full.
        q = queues.Queue(maxsize=1.3)
        self.assertTrue(q.empty())
        self.assertFalse(q.full())
        q.put_nowait(0)
        q.put_nowait(1)
        self.assertFalse(q.empty())
        self.assertTrue(q.full())
        self.assertRaises(queues.QueueFull, q.put_nowait, 2)
        self.assertEqual(0, q.get_nowait())
        self.assertFalse(q.empty())
        self.assertFalse(q.full())

        yield q.put(2)
        put = q.put(3)
        self.assertFalse(put.done())
        self.assertEqual(1, (yield q.get()))
        yield put
        self.assertTrue(q.full())
class QueueJoinTest(AsyncTestCase):
    """Tests for ``join()`` and ``task_done()``.

    Every test builds its queue from ``queue_class``, so a subclass can
    rerun the same tests against another queue type by overriding it.
    """

    queue_class = queues.Queue

    def test_task_done_underflow(self):
        # task_done() with no unfinished tasks raises ValueError.
        q = self.queue_class()
        self.assertRaises(ValueError, q.task_done)

    @gen_test
    def test_task_done(self):
        # join() resolves only after task_done() was called for every item.
        q = self.queue_class()
        for i in range(100):
            q.put_nowait(i)

        self.accumulator = 0

        @gen.coroutine
        def worker():
            # Loops forever; the test finishes once join() resolves.
            while True:
                item = yield q.get()
                self.accumulator += item
                q.task_done()
                yield gen.sleep(random() * 0.01)

        # Two coroutines share work.
        worker()
        worker()
        yield q.join()
        self.assertEqual(sum(range(100)), self.accumulator)

    @gen_test
    def test_task_done_delay(self):
        # Verify it is task_done(), not get(), that unblocks join().
        q = self.queue_class()
        q.put_nowait(0)
        join = q.join()
        self.assertFalse(join.done())
        yield q.get()
        self.assertFalse(join.done())
        yield gen.moment
        self.assertFalse(join.done())
        q.task_done()
        self.assertTrue(join.done())

    @gen_test
    def test_join_empty_queue(self):
        # join() on a queue with no unfinished tasks completes, repeatedly.
        q = self.queue_class()
        yield q.join()
        yield q.join()

    @gen_test
    def test_join_timeout(self):
        # join() with a timeout raises TimeoutError while a task is still
        # unfinished.
        q = self.queue_class()
        q.put(0)
        with self.assertRaises(TimeoutError):
            yield q.join(timeout=timedelta(seconds=0.01))
class PriorityQueueJoinTest(QueueJoinTest):
    """Reruns the join tests with ``PriorityQueue`` and checks its order."""

    queue_class = queues.PriorityQueue

    @gen_test
    def test_order(self):
        # Entries come out lowest first, including the two whose put() was
        # still pending when the queue was full.
        q = self.queue_class(maxsize=2)
        q.put_nowait((1, 'a'))
        q.put_nowait((0, 'b'))
        self.assertTrue(q.full())
        q.put((3, 'c'))
        q.put((2, 'd'))
        self.assertEqual((0, 'b'), q.get_nowait())
        self.assertEqual((1, 'a'), (yield q.get()))
        self.assertEqual((2, 'd'), q.get_nowait())
        self.assertEqual((3, 'c'), (yield q.get()))
        self.assertTrue(q.empty())
class LifoQueueJoinTest(QueueJoinTest):
    """Reruns the join tests with ``LifoQueue`` and checks its order."""

    queue_class = queues.LifoQueue

    @gen_test
    def test_order(self):
        # The item from each pending put() is returned by the next get,
        # ahead of older items; the original two then come out newest
        # first.
        q = self.queue_class(maxsize=2)
        q.put_nowait(1)
        q.put_nowait(0)
        self.assertTrue(q.full())
        q.put(3)
        q.put(2)
        self.assertEqual(3, q.get_nowait())
        self.assertEqual(2, (yield q.get()))
        self.assertEqual(0, q.get_nowait())
        self.assertEqual(1, (yield q.get()))
        self.assertTrue(q.empty())
class ProducerConsumerTest(AsyncTestCase):
    """End-to-end producer/consumer run over a bounded queue."""

    @gen_test
    def test_producer_consumer(self):
        # Ten items pushed through a queue of maxsize 3 must all reach the
        # consumer, in order.
        q = queues.Queue(maxsize=3)
        history = []

        # We don't yield between get() and task_done(), so get() must wait for
        # the next tick. Otherwise we'd immediately call task_done and unblock
        # join() before q.put() resumes, and we'd only process the first four
        # items.
        @gen.coroutine
        def consumer():
            while True:
                history.append((yield q.get()))
                q.task_done()

        @gen.coroutine
        def producer():
            for item in range(10):
                yield q.put(item)

        consumer()
        yield producer()
        yield q.join()
        self.assertEqual(list(range(10)), history)
if __name__ == '__main__':
    # Run this module's tests when it is executed as a script.
    unittest.main()
|