test_pqueue.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. import os
  2. from queuelib.pqueue import PriorityQueue
  3. from queuelib.queue import (
  4. FifoMemoryQueue, LifoMemoryQueue, FifoDiskQueue, LifoDiskQueue,
  5. FifoSQLiteQueue, LifoSQLiteQueue,
  6. )
  7. from queuelib.tests import QueuelibTestCase
  8. def track_closed(cls):
  9. """Wraps a queue class to track down if close() method was called"""
  10. class TrackingClosed(cls):
  11. def __init__(self, *a, **kw):
  12. super(TrackingClosed, self).__init__(*a, **kw)
  13. self.closed = False
  14. def close(self):
  15. super(TrackingClosed, self).close()
  16. self.closed = True
  17. return TrackingClosed
  18. # hack to prevent py.test from discovering base test class
  19. class base:
  20. class PQueueTestBase(QueuelibTestCase):
  21. def setUp(self):
  22. QueuelibTestCase.setUp(self)
  23. self.q = PriorityQueue(self.qfactory)
  24. def qfactory(self, prio):
  25. raise NotImplementedError
  26. def test_len_nonzero(self):
  27. assert not self.q
  28. self.assertEqual(len(self.q), 0)
  29. self.q.push(b'a', 3)
  30. assert self.q
  31. self.q.push(b'b', 1)
  32. self.q.push(b'c', 2)
  33. self.q.push(b'd', 1)
  34. self.assertEqual(len(self.q), 4)
  35. self.q.pop()
  36. self.q.pop()
  37. self.q.pop()
  38. self.q.pop()
  39. assert not self.q
  40. self.assertEqual(len(self.q), 0)
  41. def test_close(self):
  42. self.q.push(b'a', 3)
  43. self.q.push(b'b', 1)
  44. self.q.push(b'c', 2)
  45. self.q.push(b'd', 1)
  46. iqueues = self.q.queues.values()
  47. self.assertEqual(sorted(self.q.close()), [1, 2, 3])
  48. assert all(q.closed for q in iqueues)
  49. def test_close_return_active(self):
  50. self.q.push(b'b', 1)
  51. self.q.push(b'c', 2)
  52. self.q.push(b'a', 3)
  53. self.q.pop()
  54. self.assertEqual(sorted(self.q.close()), [2, 3])
  55. def test_popped_internal_queues_closed(self):
  56. self.q.push(b'a', 3)
  57. self.q.push(b'b', 1)
  58. self.q.push(b'c', 2)
  59. p1queue = self.q.queues[1]
  60. self.assertEqual(self.q.pop(), b'b')
  61. self.q.close()
  62. assert p1queue.closed
  63. class FifoTestMixin(object):
  64. def test_push_pop_noprio(self):
  65. self.q.push(b'a')
  66. self.q.push(b'b')
  67. self.q.push(b'c')
  68. self.assertEqual(self.q.pop(), b'a')
  69. self.assertEqual(self.q.pop(), b'b')
  70. self.assertEqual(self.q.pop(), b'c')
  71. self.assertEqual(self.q.pop(), None)
  72. def test_push_pop_prio(self):
  73. self.q.push(b'a', 3)
  74. self.q.push(b'b', 1)
  75. self.q.push(b'c', 2)
  76. self.q.push(b'd', 1)
  77. self.assertEqual(self.q.pop(), b'b')
  78. self.assertEqual(self.q.pop(), b'd')
  79. self.assertEqual(self.q.pop(), b'c')
  80. self.assertEqual(self.q.pop(), b'a')
  81. self.assertEqual(self.q.pop(), None)
  82. class LifoTestMixin(object):
  83. def test_push_pop_noprio(self):
  84. self.q.push(b'a')
  85. self.q.push(b'b')
  86. self.q.push(b'c')
  87. self.assertEqual(self.q.pop(), b'c')
  88. self.assertEqual(self.q.pop(), b'b')
  89. self.assertEqual(self.q.pop(), b'a')
  90. self.assertEqual(self.q.pop(), None)
  91. def test_push_pop_prio(self):
  92. self.q.push(b'a', 3)
  93. self.q.push(b'b', 1)
  94. self.q.push(b'c', 2)
  95. self.q.push(b'd', 1)
  96. self.assertEqual(self.q.pop(), b'd')
  97. self.assertEqual(self.q.pop(), b'b')
  98. self.assertEqual(self.q.pop(), b'c')
  99. self.assertEqual(self.q.pop(), b'a')
  100. self.assertEqual(self.q.pop(), None)
  101. class FifoMemoryPriorityQueueTest(FifoTestMixin, base.PQueueTestBase):
  102. def qfactory(self, prio):
  103. return track_closed(FifoMemoryQueue)()
  104. class LifoMemoryPriorityQueueTest(LifoTestMixin, base.PQueueTestBase):
  105. def qfactory(self, prio):
  106. return track_closed(LifoMemoryQueue)()
  107. class DiskTestMixin(object):
  108. def test_nonserializable_object_one(self):
  109. self.assertRaises(TypeError, self.q.push, lambda x: x, 0)
  110. self.assertEqual(self.q.close(), [])
  111. def test_nonserializable_object_many_close(self):
  112. self.q.push(b'a', 3)
  113. self.q.push(b'b', 1)
  114. self.assertRaises(TypeError, self.q.push, lambda x: x, 0)
  115. self.q.push(b'c', 2)
  116. self.assertEqual(self.q.pop(), b'b')
  117. self.assertEqual(sorted(self.q.close()), [2, 3])
  118. def test_nonserializable_object_many_pop(self):
  119. self.q.push(b'a', 3)
  120. self.q.push(b'b', 1)
  121. self.assertRaises(TypeError, self.q.push, lambda x: x, 0)
  122. self.q.push(b'c', 2)
  123. self.assertEqual(self.q.pop(), b'b')
  124. self.assertEqual(self.q.pop(), b'c')
  125. self.assertEqual(self.q.pop(), b'a')
  126. self.assertEqual(self.q.pop(), None)
  127. self.assertEqual(self.q.close(), [])
  128. def test_reopen_with_prio(self):
  129. q1 = PriorityQueue(self.qfactory)
  130. q1.push(b'a', 3)
  131. q1.push(b'b', 1)
  132. q1.push(b'c', 2)
  133. active = q1.close()
  134. q2 = PriorityQueue(self.qfactory, startprios=active)
  135. self.assertEqual(q2.pop(), b'b')
  136. self.assertEqual(q2.pop(), b'c')
  137. self.assertEqual(q2.pop(), b'a')
  138. self.assertEqual(q2.close(), [])
  139. class FifoDiskPriorityQueueTest(FifoTestMixin, DiskTestMixin, base.PQueueTestBase):
  140. def qfactory(self, prio):
  141. path = os.path.join(self.qdir, str(prio))
  142. return track_closed(FifoDiskQueue)(path)
  143. class LifoDiskPriorityQueueTest(LifoTestMixin, DiskTestMixin, base.PQueueTestBase):
  144. def qfactory(self, prio):
  145. path = os.path.join(self.qdir, str(prio))
  146. return track_closed(LifoDiskQueue)(path)
  147. class FifoSQLitePriorityQueueTest(FifoTestMixin, DiskTestMixin, base.PQueueTestBase):
  148. def qfactory(self, prio):
  149. path = os.path.join(self.qdir, str(prio))
  150. return track_closed(FifoSQLiteQueue)(path)
  151. class LifoSQLitePriorityQueueTest(LifoTestMixin, DiskTestMixin, base.PQueueTestBase):
  152. def qfactory(self, prio):
  153. path = os.path.join(self.qdir, str(prio))
  154. return track_closed(LifoSQLiteQueue)(path)