test_pipeline.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. from huey import RedisHuey
  2. from huey.tests.base import BaseTestCase
  3. huey = RedisHuey(blocking=False, max_errors=10)
  4. state = {}
  5. @huey.task()
  6. def fib(a, b=1):
  7. a, b = a + b, a
  8. return (a, b)
  9. @huey.task()
  10. def stateful(v1=None, v2=None, v3=None):
  11. state = {
  12. 'v1': v1 + 1 if v1 is not None else 0,
  13. 'v2': v2 + 2 if v2 is not None else 0,
  14. 'v3': v3 + 3 if v3 is not None else 0}
  15. return state
  16. @huey.task()
  17. def add(a, b):
  18. return a + b
  19. @huey.task()
  20. def mul(a, b):
  21. return a * b
  22. class TestPipeline(BaseTestCase):
  23. def assertPipe(self, pipe, expected):
  24. results = huey.enqueue(pipe)
  25. for _ in range(len(results)):
  26. self.assertEqual(len(huey), 1)
  27. huey.execute(huey.dequeue())
  28. # Pipeline is finished.
  29. self.assertEqual(len(huey), 0)
  30. self.assertEqual([result.get() for result in results], expected)
  31. def test_pipeline_tuple(self):
  32. pipe = fib.s(1).then(fib).then(fib).then(fib)
  33. self.assertPipe(pipe, [(2, 1), (3, 2), (5, 3), (8, 5)])
  34. def test_pipeline_dict(self):
  35. pipe = stateful.s().then(stateful).then(stateful)
  36. self.assertPipe(pipe, [
  37. {'v1': 0, 'v2': 0, 'v3': 0},
  38. {'v1': 1, 'v2': 2, 'v3': 3},
  39. {'v1': 2, 'v2': 4, 'v3': 6}])
  40. def test_partial(self):
  41. pipe = add.s(1, 2).then(add, 3).then(add, 4).then(add, 5)
  42. self.assertPipe(pipe, [3, 6, 10, 15])
  43. def test_mixed(self):
  44. pipe = add.s(1, 2).then(mul, 4).then(add, -5).then(mul, 3).then(add, 8)
  45. self.assertPipe(pipe, [3, 12, 7, 21, 29])