# test_sqlite.py (3.8 KB)
  1. import datetime
  2. import threading
  3. from huey.constants import EmptyData
  4. from huey.consumer import Consumer
  5. from huey.contrib.sqlitedb import SqliteHuey
  6. from huey.contrib.sqlitedb import SqliteStorage
  7. from huey.tests.base import CaptureLogs
  8. from huey.tests.base import HueyTestCase
  9. sqlite_huey = SqliteHuey('/tmp/sqlite-huey.db')
  10. class TestSqliteStorage(HueyTestCase):
  11. def get_huey(self):
  12. return sqlite_huey
  13. def test_enqueue_dequeue_results(self):
  14. @self.huey.task()
  15. def test_queues_add(k, v):
  16. return k + v
  17. db = self.huey.storage
  18. self.assertTrue(isinstance(db, SqliteStorage))
  19. res = test_queues_add(3, 4)
  20. self.assertEqual(db.queue_size(), 1)
  21. task = self.huey.dequeue()
  22. self.huey.execute(task)
  23. self.assertEqual(db.result_store_size(), 1)
  24. self.assertEqual(res.get(), 7)
  25. self.assertEqual(db.queue_size(), 0)
  26. self.assertEqual(db.result_store_size(), 0)
  27. def test_put_if_empty(self):
  28. storage = self.huey.storage
  29. self.assertTrue(storage.put_if_empty('k1', '1'))
  30. self.assertFalse(storage.put_if_empty('k1', '2'))
  31. self.assertEqual(storage.pop_data('k1'), '1')
  32. self.assertTrue(storage.put_if_empty('k1', '3'))
  33. self.assertTrue(storage.put_if_empty('k2', '4'))
  34. self.assertFalse(storage.put_if_empty('k1', 'x'))
  35. self.assertEqual(storage.pop_data('k1'), '3')
  36. self.assertEqual(storage.pop_data('k2'), '4')
  37. def test_schedule(self):
  38. dt1 = datetime.datetime(2013, 1, 1, 0, 0)
  39. dt2 = datetime.datetime(2013, 1, 2, 0, 0)
  40. dt3 = datetime.datetime(2013, 1, 3, 0, 0)
  41. @self.huey.task()
  42. def test_task(k, v):
  43. return k + v
  44. test_task.schedule((1, 2), eta=dt1, convert_utc=False)
  45. test_task.schedule((3, 4), eta=dt3, convert_utc=False)
  46. test_task.schedule((2, 3), eta=dt2, convert_utc=False)
  47. self.assertEqual(len(self.huey), 3)
  48. for i in range(3):
  49. self.huey.add_schedule(self.huey.dequeue())
  50. tasks = self.huey.scheduled()
  51. self.assertEqual(len(tasks), 3)
  52. c1, c2, c3 = tasks
  53. self.assertEqual(c1.data, ((1, 2), {}))
  54. self.assertEqual(c2.data, ((2, 3), {}))
  55. self.assertEqual(c3.data, ((3, 4), {}))
  56. storage = self.huey.storage
  57. self.assertEqual(len(storage.read_schedule(dt2)), 2)
  58. self.assertEqual(len(storage.read_schedule(dt2)), 0)
  59. self.assertEqual(len(storage.read_schedule(dt3)), 1)
  60. self.assertEqual(len(storage.read_schedule(dt3)), 0)
  61. def test_consumer_integration(self):
  62. lock = threading.Lock()
  63. @self.huey.task()
  64. def add_values(a, b):
  65. return a + b
  66. consumer = Consumer(self.huey, max_delay=0.1, workers=2,
  67. worker_type='thread', health_check_interval=0.01)
  68. with CaptureLogs() as capture:
  69. consumer.start()
  70. try:
  71. r1 = add_values(1, 2)
  72. r2 = add_values(2, 3)
  73. r3 = add_values(3, 5)
  74. self.assertEqual(r1.get(blocking=True, timeout=3), 3)
  75. self.assertEqual(r2.get(blocking=True, timeout=3), 5)
  76. self.assertEqual(r3.get(blocking=True, timeout=3), 8)
  77. finally:
  78. consumer.stop()
  79. for _, worker in consumer.worker_threads:
  80. worker.join()
  81. executing = 0
  82. executed = 0
  83. for message in capture.messages[-7:-1]:
  84. if message.startswith('Executing huey.tests.test_'):
  85. executing += 1
  86. elif message.startswith('Executed huey.tests.test_'):
  87. executed += 1
  88. self.assertEqual(executing, 3)
  89. self.assertEqual(executed, 3)
  90. self.assertTrue(capture.messages[-1].startswith('Shutting down'))