test_chord.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. from __future__ import absolute_import
  2. from contextlib import contextmanager
  3. from celery import group
  4. from celery import canvas
  5. from celery import result
  6. from celery.exceptions import ChordError
  7. from celery.five import range
  8. from celery.result import AsyncResult, GroupResult, EagerResult
  9. from celery.tests.case import AppCase, Mock
  10. passthru = lambda x: x
  11. class ChordCase(AppCase):
  12. def setup(self):
  13. @self.app.task(shared=False)
  14. def add(x, y):
  15. return x + y
  16. self.add = add
  17. class TSR(GroupResult):
  18. is_ready = True
  19. value = None
  20. def ready(self):
  21. return self.is_ready
  22. def join(self, propagate=True, **kwargs):
  23. if propagate:
  24. for value in self.value:
  25. if isinstance(value, Exception):
  26. raise value
  27. return self.value
  28. join_native = join
  29. def _failed_join_report(self):
  30. for value in self.value:
  31. if isinstance(value, Exception):
  32. yield EagerResult('some_id', value, 'FAILURE')
  33. class TSRNoReport(TSR):
  34. def _failed_join_report(self):
  35. return iter([])
  36. @contextmanager
  37. def patch_unlock_retry(app):
  38. unlock = app.tasks['celery.chord_unlock']
  39. retry = Mock()
  40. prev, unlock.retry = unlock.retry, retry
  41. try:
  42. yield unlock, retry
  43. finally:
  44. unlock.retry = prev
  45. class test_unlock_chord_task(ChordCase):
  46. def test_unlock_ready(self):
  47. class AlwaysReady(TSR):
  48. is_ready = True
  49. value = [2, 4, 8, 6]
  50. with self._chord_context(AlwaysReady) as (cb, retry, _):
  51. cb.type.apply_async.assert_called_with(
  52. ([2, 4, 8, 6], ), {}, task_id=cb.id,
  53. )
  54. # did not retry
  55. self.assertFalse(retry.call_count)
  56. def test_callback_fails(self):
  57. class AlwaysReady(TSR):
  58. is_ready = True
  59. value = [2, 4, 8, 6]
  60. def setup(callback):
  61. callback.apply_async.side_effect = IOError()
  62. with self._chord_context(AlwaysReady, setup) as (cb, retry, fail):
  63. self.assertTrue(fail.called)
  64. self.assertEqual(
  65. fail.call_args[0][0], cb.id,
  66. )
  67. self.assertIsInstance(
  68. fail.call_args[1]['exc'], ChordError,
  69. )
  70. def test_unlock_ready_failed(self):
  71. class Failed(TSR):
  72. is_ready = True
  73. value = [2, KeyError('foo'), 8, 6]
  74. with self._chord_context(Failed) as (cb, retry, fail_current):
  75. self.assertFalse(cb.type.apply_async.called)
  76. # did not retry
  77. self.assertFalse(retry.call_count)
  78. self.assertTrue(fail_current.called)
  79. self.assertEqual(
  80. fail_current.call_args[0][0], cb.id,
  81. )
  82. self.assertIsInstance(
  83. fail_current.call_args[1]['exc'], ChordError,
  84. )
  85. self.assertIn('some_id', str(fail_current.call_args[1]['exc']))
  86. def test_unlock_ready_failed_no_culprit(self):
  87. class Failed(TSRNoReport):
  88. is_ready = True
  89. value = [2, KeyError('foo'), 8, 6]
  90. with self._chord_context(Failed) as (cb, retry, fail_current):
  91. self.assertTrue(fail_current.called)
  92. self.assertEqual(
  93. fail_current.call_args[0][0], cb.id,
  94. )
  95. self.assertIsInstance(
  96. fail_current.call_args[1]['exc'], ChordError,
  97. )
  98. @contextmanager
  99. def _chord_context(self, ResultCls, setup=None, **kwargs):
  100. @self.app.task(shared=False)
  101. def callback(*args, **kwargs):
  102. pass
  103. self.app.finalize()
  104. pts, result.GroupResult = result.GroupResult, ResultCls
  105. callback.apply_async = Mock()
  106. callback_s = callback.s()
  107. callback_s.id = 'callback_id'
  108. fail_current = self.app.backend.fail_from_current_stack = Mock()
  109. try:
  110. with patch_unlock_retry(self.app) as (unlock, retry):
  111. subtask, canvas.maybe_signature = (
  112. canvas.maybe_signature, passthru,
  113. )
  114. if setup:
  115. setup(callback)
  116. try:
  117. assert self.app.tasks['celery.chord_unlock'] is unlock
  118. unlock(
  119. 'group_id', callback_s,
  120. result=[self.app.AsyncResult(r) for r in ['1', 2, 3]],
  121. GroupResult=ResultCls, **kwargs
  122. )
  123. finally:
  124. canvas.maybe_signature = subtask
  125. yield callback_s, retry, fail_current
  126. finally:
  127. result.GroupResult = pts
  128. def test_when_not_ready(self):
  129. class NeverReady(TSR):
  130. is_ready = False
  131. with self._chord_context(NeverReady, interval=10, max_retries=30) \
  132. as (cb, retry, _):
  133. self.assertFalse(cb.type.apply_async.called)
  134. # did retry
  135. retry.assert_called_with(countdown=10, max_retries=30)
  136. def test_is_in_registry(self):
  137. self.assertIn('celery.chord_unlock', self.app.tasks)
  138. class test_chord(ChordCase):
  139. def test_eager(self):
  140. from celery import chord
  141. @self.app.task(shared=False)
  142. def addX(x, y):
  143. return x + y
  144. @self.app.task(shared=False)
  145. def sumX(n):
  146. return sum(n)
  147. self.app.conf.CELERY_ALWAYS_EAGER = True
  148. x = chord(addX.s(i, i) for i in range(10))
  149. body = sumX.s()
  150. result = x(body)
  151. self.assertEqual(result.get(), sum(i + i for i in range(10)))
  152. def test_apply(self):
  153. self.app.conf.CELERY_ALWAYS_EAGER = False
  154. from celery import chord
  155. m = Mock()
  156. m.app.conf.CELERY_ALWAYS_EAGER = False
  157. m.AsyncResult = AsyncResult
  158. prev, chord._type = chord._type, m
  159. try:
  160. x = chord(self.add.s(i, i) for i in range(10))
  161. body = self.add.s(2)
  162. result = x(body)
  163. self.assertTrue(result.id)
  164. # does not modify original subtask
  165. with self.assertRaises(KeyError):
  166. body.options['task_id']
  167. self.assertTrue(chord._type.called)
  168. finally:
  169. chord._type = prev
  170. class test_Chord_task(ChordCase):
  171. def test_run(self):
  172. self.app.backend = Mock()
  173. self.app.backend.cleanup = Mock()
  174. self.app.backend.cleanup.__name__ = 'cleanup'
  175. Chord = self.app.tasks['celery.chord']
  176. body = dict()
  177. Chord(group(self.add.subtask((i, i)) for i in range(5)), body)
  178. Chord([self.add.subtask((j, j)) for j in range(5)], body)
  179. self.assertEqual(self.app.backend.on_chord_apply.call_count, 2)