test_builtins.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. from __future__ import absolute_import
  2. from mock import Mock, patch
  3. from celery import group, chord
  4. from celery.app import builtins
  5. from celery.canvas import Signature
  6. from celery.five import range
  7. from celery._state import _task_stack
  8. from celery.tests.case import AppCase
  9. class BuiltinsCase(AppCase):
  10. def setup(self):
  11. @self.app.task(shared=False)
  12. def xsum(x):
  13. return sum(x)
  14. self.xsum = xsum
  15. @self.app.task(shared=False)
  16. def add(x, y):
  17. return x + y
  18. self.add = add
  19. class test_backend_cleanup(BuiltinsCase):
  20. def test_run(self):
  21. self.app.backend.cleanup = Mock()
  22. self.app.backend.cleanup.__name__ = 'cleanup'
  23. cleanup_task = builtins.add_backend_cleanup_task(self.app)
  24. cleanup_task()
  25. self.assertTrue(self.app.backend.cleanup.called)
  26. class test_map(BuiltinsCase):
  27. def test_run(self):
  28. @self.app.task(shared=False)
  29. def map_mul(x):
  30. return x[0] * x[1]
  31. res = self.app.tasks['celery.map'](
  32. map_mul, [(2, 2), (4, 4), (8, 8)],
  33. )
  34. self.assertEqual(res, [4, 16, 64])
  35. class test_starmap(BuiltinsCase):
  36. def test_run(self):
  37. @self.app.task(shared=False)
  38. def smap_mul(x, y):
  39. return x * y
  40. res = self.app.tasks['celery.starmap'](
  41. smap_mul, [(2, 2), (4, 4), (8, 8)],
  42. )
  43. self.assertEqual(res, [4, 16, 64])
  44. class test_chunks(BuiltinsCase):
  45. @patch('celery.canvas.chunks.apply_chunks')
  46. def test_run(self, apply_chunks):
  47. @self.app.task(shared=False)
  48. def chunks_mul(l):
  49. return l
  50. self.app.tasks['celery.chunks'](
  51. chunks_mul, [(2, 2), (4, 4), (8, 8)], 1,
  52. )
  53. self.assertTrue(apply_chunks.called)
  54. class test_group(BuiltinsCase):
  55. def setup(self):
  56. self.task = builtins.add_group_task(self.app)()
  57. super(test_group, self).setup()
  58. def test_apply_async_eager(self):
  59. self.task.apply = Mock()
  60. self.app.conf.CELERY_ALWAYS_EAGER = True
  61. self.task.apply_async()
  62. self.assertTrue(self.task.apply.called)
  63. def test_apply(self):
  64. x = group([self.add.s(4, 4), self.add.s(8, 8)])
  65. x.name = self.task.name
  66. res = x.apply()
  67. self.assertEqual(res.get(), [8, 16])
  68. def test_apply_async(self):
  69. x = group([self.add.s(4, 4), self.add.s(8, 8)])
  70. x.apply_async()
  71. def test_apply_empty(self):
  72. x = group(app=self.app)
  73. x.apply()
  74. res = x.apply_async()
  75. self.assertFalse(res)
  76. self.assertFalse(res.results)
  77. def test_apply_async_with_parent(self):
  78. _task_stack.push(self.add)
  79. try:
  80. self.add.push_request(called_directly=False)
  81. try:
  82. assert not self.add.request.children
  83. x = group([self.add.s(4, 4), self.add.s(8, 8)])
  84. res = x()
  85. self.assertTrue(self.add.request.children)
  86. self.assertIn(res, self.add.request.children)
  87. self.assertEqual(len(self.add.request.children), 1)
  88. finally:
  89. self.add.pop_request()
  90. finally:
  91. _task_stack.pop()
  92. class test_chain(BuiltinsCase):
  93. def setup(self):
  94. BuiltinsCase.setup(self)
  95. self.task = builtins.add_chain_task(self.app)()
  96. def test_apply_async(self):
  97. c = self.add.s(2, 2) | self.add.s(4) | self.add.s(8)
  98. result = c.apply_async()
  99. self.assertTrue(result.parent)
  100. self.assertTrue(result.parent.parent)
  101. self.assertIsNone(result.parent.parent.parent)
  102. def test_group_to_chord(self):
  103. c = (
  104. group(self.add.s(i, i) for i in range(5)) |
  105. self.add.s(10) |
  106. self.add.s(20) |
  107. self.add.s(30)
  108. )
  109. tasks, _ = c.type.prepare_steps((), c.tasks)
  110. self.assertIsInstance(tasks[0], chord)
  111. self.assertTrue(tasks[0].body.options['link'])
  112. self.assertTrue(tasks[0].body.options['link'][0].options['link'])
  113. c2 = self.add.s(2, 2) | group(self.add.s(i, i) for i in range(10))
  114. tasks2, _ = c2.type.prepare_steps((), c2.tasks)
  115. self.assertIsInstance(tasks2[1], group)
  116. def test_apply_options(self):
  117. class static(Signature):
  118. def clone(self, *args, **kwargs):
  119. return self
  120. def s(*args, **kwargs):
  121. return static(self.add, args, kwargs, type=self.add)
  122. c = s(2, 2) | s(4, 4) | s(8, 8)
  123. r1 = c.apply_async(task_id='some_id')
  124. self.assertEqual(r1.id, 'some_id')
  125. c.apply_async(group_id='some_group_id')
  126. self.assertEqual(c.tasks[-1].options['group_id'], 'some_group_id')
  127. c.apply_async(chord='some_chord_id')
  128. self.assertEqual(c.tasks[-1].options['chord'], 'some_chord_id')
  129. c.apply_async(link=[s(32)])
  130. self.assertListEqual(c.tasks[-1].options['link'], [s(32)])
  131. c.apply_async(link_error=[s('error')])
  132. for task in c.tasks:
  133. self.assertListEqual(task.options['link_error'], [s('error')])
  134. class test_chord(BuiltinsCase):
  135. def setup(self):
  136. self.task = builtins.add_chord_task(self.app)()
  137. super(test_chord, self).setup()
  138. def test_apply_async(self):
  139. x = chord([self.add.s(i, i) for i in range(10)], body=self.xsum.s())
  140. r = x.apply_async()
  141. self.assertTrue(r)
  142. self.assertTrue(r.parent)
  143. def test_run_header_not_group(self):
  144. self.task([self.add.s(i, i) for i in range(10)], self.xsum.s())
  145. def test_forward_options(self):
  146. body = self.xsum.s()
  147. x = chord([self.add.s(i, i) for i in range(10)], body=body)
  148. x._type = Mock()
  149. x._type.app.conf.CELERY_ALWAYS_EAGER = False
  150. x.apply_async(group_id='some_group_id')
  151. self.assertTrue(x._type.called)
  152. resbody = x._type.call_args[0][1]
  153. self.assertEqual(resbody.options['group_id'], 'some_group_id')
  154. x2 = chord([self.add.s(i, i) for i in range(10)], body=body)
  155. x2._type = Mock()
  156. x2._type.app.conf.CELERY_ALWAYS_EAGER = False
  157. x2.apply_async(chord='some_chord_id')
  158. self.assertTrue(x2._type.called)
  159. resbody = x2._type.call_args[0][1]
  160. self.assertEqual(resbody.options['chord'], 'some_chord_id')
  161. def test_apply_eager(self):
  162. self.app.conf.CELERY_ALWAYS_EAGER = True
  163. x = chord([self.add.s(i, i) for i in range(10)], body=self.xsum.s())
  164. r = x.apply_async()
  165. self.assertEqual(r.get(), 90)