# test_storage.py
import datetime
import itertools

from redis.connection import ConnectionPool

from huey.constants import EmptyData
from huey.storage import RedisHuey
from huey.storage import RedisStorage
from huey.tests.base import b
from huey.tests.base import HueyTestCase
  9. class TestRedisStorage(HueyTestCase):
  10. def test_queues(self):
  11. storage = self.huey.storage
  12. storage.flush_queue()
  13. @self.huey.task()
  14. def test_queues_add(k, v):
  15. return k + v
  16. res = test_queues_add('k', 'v')
  17. self.assertEqual(storage.queue_size(), 1)
  18. task = self.huey.dequeue()
  19. self.huey.execute(task)
  20. self.assertEqual(res.get(), 'kv')
  21. res = test_queues_add('\xce', '\xcf')
  22. task = self.huey.dequeue()
  23. self.huey.execute(task)
  24. self.assertEqual(res.get(), '\xce\xcf')
  25. def test_data_stores(self):
  26. storage = self.huey.storage
  27. storage.put_data('k1', 'v1')
  28. storage.put_data('k2', 'v2')
  29. storage.put_data('k3', 'v3')
  30. self.assertEqual(storage.peek_data('k2'), b('v2'))
  31. self.assertEqual(storage.pop_data('k2'), b('v2'))
  32. self.assertEqual(storage.peek_data('k2'), EmptyData)
  33. self.assertEqual(storage.pop_data('k2'), EmptyData)
  34. self.assertEqual(storage.peek_data('k3'), b('v3'))
  35. storage.put_data('k3', 'v3-2')
  36. self.assertEqual(storage.peek_data('k3'), b('v3-2'))
  37. def test_schedules(self):
  38. storage = self.huey.storage
  39. dt1 = datetime.datetime(2013, 1, 1, 0, 0)
  40. dt2 = datetime.datetime(2013, 1, 2, 0, 0)
  41. dt3 = datetime.datetime(2013, 1, 3, 0, 0)
  42. dt4 = datetime.datetime(2013, 1, 4, 0, 0)
  43. # Add to schedule out-of-order to ensure sorting is performed by
  44. # the schedule.
  45. storage.add_to_schedule('s2', dt2)
  46. storage.add_to_schedule('s1', dt1)
  47. storage.add_to_schedule('s4', dt4)
  48. storage.add_to_schedule('s3', dt3)
  49. # Ensure that asking for a timestamp previous to any item in the
  50. # schedule returns empty list.
  51. self.assertEqual(
  52. storage.read_schedule(dt1 - datetime.timedelta(days=1)),
  53. [])
  54. # Ensure the upper boundary is inclusive of whatever timestamp
  55. # is passed in.
  56. self.assertEqual(
  57. storage.read_schedule(dt3),
  58. [b('s1'), b('s2'), b('s3')])
  59. self.assertEqual(storage.read_schedule(dt3), [])
  60. # Ensure the schedule is flushed and an empty schedule returns an
  61. # empty list.
  62. self.assertEqual(storage.read_schedule(dt4), [b('s4')])
  63. self.assertEqual(storage.read_schedule(dt4), [])
  64. def test_events(self):
  65. storage = self.huey.storage
  66. ps = storage.listener()
  67. messages = ['a', 'b', 'c']
  68. for message in messages:
  69. storage.emit(message)
  70. g = ps.listen()
  71. next(g)
  72. self.assertEqual(next(g)['data'], b('a'))
  73. self.assertEqual(next(g)['data'], b('b'))
  74. self.assertEqual(next(g)['data'], b('c'))
  75. def test_event_iterator(self):
  76. i = iter(self.huey.storage)
  77. self.huey.storage.emit('"a"')
  78. self.huey.storage.emit('"b"')
  79. res = next(i)
  80. self.assertEqual(res, 'a')
  81. res = next(i)
  82. self.assertEqual(res, 'b')
  83. def test_conflicting_init_args(self):
  84. options = {
  85. 'host': 'localhost',
  86. 'url': 'redis://localhost',
  87. 'connection_pool': ConnectionPool()
  88. }
  89. combinations = itertools.combinations(options.items(), 2)
  90. for kwargs in (dict(item) for item in combinations):
  91. self.assertRaises(ValueError, lambda: RedisStorage(**kwargs))
  92. def test_init_with_url(self):
  93. s = RedisStorage(url='redis://example.org:1234')
  94. args = s.pool.connection_kwargs
  95. self.assertEqual(args['host'], 'example.org')
  96. self.assertEqual(args['port'], 1234)
  97. def test_init_with_kwargs(self):
  98. s = RedisStorage(host='example.org', port=1234)
  99. args = s.pool.connection_kwargs
  100. self.assertEqual(args['host'], 'example.org')
  101. self.assertEqual(args['port'], 1234)
  102. def test_init_huey(self):
  103. huey = RedisHuey(url='redis://example.org:31337/?db=7')
  104. conn = huey.storage.pool.connection_kwargs
  105. self.assertEqual(conn['host'], 'example.org')
  106. self.assertEqual(conn['port'], 31337)
  107. self.assertEqual(conn['db'], 7)