test_queue.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. import os
  2. import glob
  3. import pytest
  4. from queuelib.queue import (
  5. FifoMemoryQueue, LifoMemoryQueue, FifoDiskQueue, LifoDiskQueue,
  6. FifoSQLiteQueue, LifoSQLiteQueue,
  7. )
  8. from queuelib.tests import QueuelibTestCase
  9. class BaseQueueTest(object):
  10. def queue(self):
  11. return NotImplementedError()
  12. def test_empty(self):
  13. """Empty queue test"""
  14. q = self.queue()
  15. assert q.pop() is None
  16. def test_single_pushpop(self):
  17. q = self.queue()
  18. q.push(b'a')
  19. assert q.pop() == b'a'
  20. def test_binary_element(self):
  21. elem = (
  22. b'\x80\x02}q\x01(U\x04bodyq\x02U\x00U\t_encodingq\x03U\x05utf-'
  23. b'8q\x04U\x07cookiesq\x05}q\x06U\x04metaq\x07}q\x08U\x07header'
  24. b'sq\t}U\x03urlq\nX\x15\x00\x00\x00file:///tmp/tmphDJYsgU\x0bd'
  25. b'ont_filterq\x0b\x89U\x08priorityq\x0cK\x00U\x08callbackq\rNU'
  26. b'\x06methodq\x0eU\x03GETq\x0fU\x07errbackq\x10Nu.'
  27. )
  28. q = self.queue()
  29. q.push(elem)
  30. assert q.pop() == elem
  31. def test_len(self):
  32. q = self.queue()
  33. self.assertEqual(len(q), 0)
  34. q.push(b'a')
  35. self.assertEqual(len(q), 1)
  36. q.push(b'b')
  37. q.push(b'c')
  38. self.assertEqual(len(q), 3)
  39. q.pop()
  40. q.pop()
  41. q.pop()
  42. self.assertEqual(len(q), 0)
  43. class FifoTestMixin(BaseQueueTest):
  44. def test_push_pop1(self):
  45. """Basic push/pop test"""
  46. q = self.queue()
  47. q.push(b'a')
  48. q.push(b'b')
  49. q.push(b'c')
  50. self.assertEqual(q.pop(), b'a')
  51. self.assertEqual(q.pop(), b'b')
  52. self.assertEqual(q.pop(), b'c')
  53. self.assertEqual(q.pop(), None)
  54. def test_push_pop2(self):
  55. """Test interleaved push and pops"""
  56. q = self.queue()
  57. q.push(b'a')
  58. q.push(b'b')
  59. q.push(b'c')
  60. q.push(b'd')
  61. self.assertEqual(q.pop(), b'a')
  62. self.assertEqual(q.pop(), b'b')
  63. q.push(b'e')
  64. self.assertEqual(q.pop(), b'c')
  65. self.assertEqual(q.pop(), b'd')
  66. self.assertEqual(q.pop(), b'e')
  67. class LifoTestMixin(BaseQueueTest):
  68. def test_push_pop1(self):
  69. """Basic push/pop test"""
  70. q = self.queue()
  71. q.push(b'a')
  72. q.push(b'b')
  73. q.push(b'c')
  74. self.assertEqual(q.pop(), b'c')
  75. self.assertEqual(q.pop(), b'b')
  76. self.assertEqual(q.pop(), b'a')
  77. self.assertEqual(q.pop(), None)
  78. def test_push_pop2(self):
  79. """Test interleaved push and pops"""
  80. q = self.queue()
  81. q.push(b'a')
  82. q.push(b'b')
  83. q.push(b'c')
  84. q.push(b'd')
  85. self.assertEqual(q.pop(), b'd')
  86. self.assertEqual(q.pop(), b'c')
  87. q.push(b'e')
  88. self.assertEqual(q.pop(), b'e')
  89. self.assertEqual(q.pop(), b'b')
  90. self.assertEqual(q.pop(), b'a')
  91. class PersistentTestMixin(object):
  92. chunksize = 100000
  93. @pytest.mark.xfail(reason="Reenable once Scrapy.squeues stop"
  94. " extending from queuelib testsuite")
  95. def test_non_bytes_raises_typeerror(self):
  96. q = self.queue()
  97. self.assertRaises(TypeError, q.push, 0)
  98. self.assertRaises(TypeError, q.push, u'')
  99. self.assertRaises(TypeError, q.push, None)
  100. self.assertRaises(TypeError, q.push, lambda x: x)
  101. def test_text_in_windows(self):
  102. e1 = b'\r\n'
  103. q = self.queue()
  104. q.push(e1)
  105. q.close()
  106. q = self.queue()
  107. e2 = q.pop()
  108. self.assertEqual(e1, e2)
  109. def test_close_open(self):
  110. """Test closing and re-opening keeps state"""
  111. q = self.queue()
  112. q.push(b'a')
  113. q.push(b'b')
  114. q.push(b'c')
  115. q.push(b'd')
  116. q.pop()
  117. q.pop()
  118. q.close()
  119. del q
  120. q = self.queue()
  121. self.assertEqual(len(q), 2)
  122. q.push(b'e')
  123. q.pop()
  124. q.pop()
  125. q.close()
  126. del q
  127. q = self.queue()
  128. assert q.pop() is not None
  129. self.assertEqual(len(q), 0)
  130. def test_cleanup(self):
  131. """Test queue dir is removed if queue is empty"""
  132. q = self.queue()
  133. values = [b'0', b'1', b'2', b'3', b'4']
  134. assert os.path.exists(self.qpath)
  135. for x in values:
  136. q.push(x)
  137. for x in values:
  138. q.pop()
  139. q.close()
  140. assert not os.path.exists(self.qpath)
  141. class FifoMemoryQueueTest(FifoTestMixin, QueuelibTestCase):
  142. def queue(self):
  143. return FifoMemoryQueue()
  144. class LifoMemoryQueueTest(LifoTestMixin, QueuelibTestCase):
  145. def queue(self):
  146. return LifoMemoryQueue()
  147. class FifoDiskQueueTest(FifoTestMixin, PersistentTestMixin, QueuelibTestCase):
  148. def queue(self):
  149. return FifoDiskQueue(self.qpath, chunksize=self.chunksize)
  150. def test_chunks(self):
  151. """Test chunks are created and removed"""
  152. values = [b'0', b'1', b'2', b'3', b'4']
  153. q = self.queue()
  154. for x in values:
  155. q.push(x)
  156. chunks = glob.glob(os.path.join(self.qpath, 'q*'))
  157. self.assertEqual(len(chunks), 5 // self.chunksize + 1)
  158. for x in values:
  159. q.pop()
  160. chunks = glob.glob(os.path.join(self.qpath, 'q*'))
  161. self.assertEqual(len(chunks), 1)
  162. class ChunkSize1FifoDiskQueueTest(FifoDiskQueueTest):
  163. chunksize = 1
  164. class ChunkSize2FifoDiskQueueTest(FifoDiskQueueTest):
  165. chunksize = 2
  166. class ChunkSize3FifoDiskQueueTest(FifoDiskQueueTest):
  167. chunksize = 3
  168. class ChunkSize4FifoDiskQueueTest(FifoDiskQueueTest):
  169. chunksize = 4
  170. class LifoDiskQueueTest(LifoTestMixin, PersistentTestMixin, QueuelibTestCase):
  171. def queue(self):
  172. return LifoDiskQueue(self.qpath)
  173. def test_file_size_shrinks(self):
  174. """Test size of queue file shrinks when popping items"""
  175. q = self.queue()
  176. q.push(b'a')
  177. q.push(b'b')
  178. q.close()
  179. size = os.path.getsize(self.qpath)
  180. q = self.queue()
  181. q.pop()
  182. q.close()
  183. assert os.path.getsize(self.qpath), size
  184. class FifoSQLiteQueueTest(FifoTestMixin, PersistentTestMixin, QueuelibTestCase):
  185. def queue(self):
  186. return FifoSQLiteQueue(self.qpath)
  187. class LifoSQLiteQueueTest(LifoTestMixin, PersistentTestMixin, QueuelibTestCase):
  188. def queue(self):
  189. return LifoSQLiteQueue(self.qpath)