test_bootsteps.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. from __future__ import absolute_import
  2. from mock import Mock, patch
  3. from celery import bootsteps
  4. from celery.tests.case import AppCase
  5. class test_StepFormatter(AppCase):
  6. def test_get_prefix(self):
  7. f = bootsteps.StepFormatter()
  8. s = Mock()
  9. s.last = True
  10. self.assertEqual(f._get_prefix(s), f.blueprint_prefix)
  11. s2 = Mock()
  12. s2.last = False
  13. s2.conditional = True
  14. self.assertEqual(f._get_prefix(s2), f.conditional_prefix)
  15. s3 = Mock()
  16. s3.last = s3.conditional = False
  17. self.assertEqual(f._get_prefix(s3), '')
  18. def test_node(self):
  19. f = bootsteps.StepFormatter()
  20. f.draw_node = Mock()
  21. step = Mock()
  22. step.last = False
  23. f.node(step, x=3)
  24. f.draw_node.assert_called_with(step, f.node_scheme, {'x': 3})
  25. step.last = True
  26. f.node(step, x=3)
  27. f.draw_node.assert_called_with(step, f.blueprint_scheme, {'x': 3})
  28. def test_edge(self):
  29. f = bootsteps.StepFormatter()
  30. f.draw_edge = Mock()
  31. a, b = Mock(), Mock()
  32. a.last = True
  33. f.edge(a, b, x=6)
  34. f.draw_edge.assert_called_with(a, b, f.edge_scheme, {
  35. 'x': 6, 'arrowhead': 'none', 'color': 'darkseagreen3',
  36. })
  37. a.last = False
  38. f.edge(a, b, x=6)
  39. f.draw_edge.assert_called_with(a, b, f.edge_scheme, {
  40. 'x': 6,
  41. })
  42. class test_Step(AppCase):
  43. class Def(bootsteps.StartStopStep):
  44. name = 'test_Step.Def'
  45. def setup(self):
  46. self.steps = []
  47. def test_blueprint_name(self, bp='test_blueprint_name'):
  48. class X(bootsteps.Step):
  49. blueprint = bp
  50. name = 'X'
  51. self.assertEqual(X.name, 'X')
  52. class Y(bootsteps.Step):
  53. name = '%s.Y' % bp
  54. self.assertEqual(Y.name, '%s.Y' % bp)
  55. def test_init(self):
  56. self.assertTrue(self.Def(self))
  57. def test_create(self):
  58. self.Def(self).create(self)
  59. def test_include_if(self):
  60. x = self.Def(self)
  61. x.enabled = True
  62. self.assertTrue(x.include_if(self))
  63. x.enabled = False
  64. self.assertFalse(x.include_if(self))
  65. def test_instantiate(self):
  66. self.assertIsInstance(self.Def(self).instantiate(self.Def, self),
  67. self.Def)
  68. def test_include_when_enabled(self):
  69. x = self.Def(self)
  70. x.create = Mock()
  71. x.create.return_value = 'George'
  72. self.assertTrue(x.include(self))
  73. self.assertEqual(x.obj, 'George')
  74. x.create.assert_called_with(self)
  75. def test_include_when_disabled(self):
  76. x = self.Def(self)
  77. x.enabled = False
  78. x.create = Mock()
  79. self.assertFalse(x.include(self))
  80. self.assertFalse(x.create.call_count)
  81. def test_repr(self):
  82. x = self.Def(self)
  83. self.assertTrue(repr(x))
  84. class test_ConsumerStep(AppCase):
  85. def test_interface(self):
  86. step = bootsteps.ConsumerStep(self)
  87. with self.assertRaises(NotImplementedError):
  88. step.get_consumers(self)
  89. def test_start_stop_shutdown(self):
  90. consumer = Mock()
  91. self.connection = Mock()
  92. class Step(bootsteps.ConsumerStep):
  93. def get_consumers(self, c):
  94. return [consumer]
  95. step = Step(self)
  96. self.assertEqual(step.get_consumers(self), [consumer])
  97. step.start(self)
  98. consumer.consume.assert_called_with()
  99. step.stop(self)
  100. consumer.cancel.assert_called_with()
  101. step.shutdown(self)
  102. consumer.channel.close.assert_called_with()
  103. def test_start_no_consumers(self):
  104. self.connection = Mock()
  105. class Step(bootsteps.ConsumerStep):
  106. def get_consumers(self, c):
  107. return ()
  108. step = Step(self)
  109. step.start(self)
  110. class test_StartStopStep(AppCase):
  111. class Def(bootsteps.StartStopStep):
  112. name = 'test_StartStopStep.Def'
  113. def setup(self):
  114. self.steps = []
  115. def test_start__stop(self):
  116. x = self.Def(self)
  117. x.create = Mock()
  118. # include creates the underlying object and sets
  119. # its x.obj attribute to it, as well as appending
  120. # it to the parent.steps list.
  121. x.include(self)
  122. self.assertTrue(self.steps)
  123. self.assertIs(self.steps[0], x)
  124. x.start(self)
  125. x.obj.start.assert_called_with()
  126. x.stop(self)
  127. x.obj.stop.assert_called_with()
  128. x.obj = None
  129. self.assertIsNone(x.start(self))
  130. def test_include_when_disabled(self):
  131. x = self.Def(self)
  132. x.enabled = False
  133. x.include(self)
  134. self.assertFalse(self.steps)
  135. def test_terminate(self):
  136. x = self.Def(self)
  137. x.create = Mock()
  138. x.include(self)
  139. delattr(x.obj, 'terminate')
  140. x.terminate(self)
  141. x.obj.stop.assert_called_with()
  142. class test_Blueprint(AppCase):
  143. class Blueprint(bootsteps.Blueprint):
  144. name = 'test_Blueprint'
  145. def test_steps_added_to_unclaimed(self):
  146. class tnA(bootsteps.Step):
  147. name = 'test_Blueprint.A'
  148. class tnB(bootsteps.Step):
  149. name = 'test_Blueprint.B'
  150. class xxA(bootsteps.Step):
  151. name = 'xx.A'
  152. class Blueprint(self.Blueprint):
  153. default_steps = [tnA, tnB]
  154. blueprint = Blueprint(app=self.app)
  155. self.assertIn(tnA, blueprint._all_steps())
  156. self.assertIn(tnB, blueprint._all_steps())
  157. self.assertNotIn(xxA, blueprint._all_steps())
  158. def test_init(self):
  159. blueprint = self.Blueprint(app=self.app)
  160. self.assertIs(blueprint.app, self.app)
  161. self.assertEqual(blueprint.name, 'test_Blueprint')
  162. def test_close__on_close_is_None(self):
  163. blueprint = self.Blueprint(app=self.app)
  164. blueprint.on_close = None
  165. blueprint.send_all = Mock()
  166. blueprint.close(1)
  167. blueprint.send_all.assert_called_with(
  168. 1, 'close', 'closing', reverse=False,
  169. )
  170. def test_send_all_with_None_steps(self):
  171. parent = Mock()
  172. blueprint = self.Blueprint(app=self.app)
  173. parent.steps = [None, None, None]
  174. blueprint.send_all(parent, 'close', 'Closing', reverse=False)
  175. def test_join_raises_IGNORE_ERRORS(self):
  176. prev, bootsteps.IGNORE_ERRORS = bootsteps.IGNORE_ERRORS, (KeyError, )
  177. try:
  178. blueprint = self.Blueprint(app=self.app)
  179. blueprint.shutdown_complete = Mock()
  180. blueprint.shutdown_complete.wait.side_effect = KeyError('luke')
  181. blueprint.join(timeout=10)
  182. blueprint.shutdown_complete.wait.assert_called_with(timeout=10)
  183. finally:
  184. bootsteps.IGNORE_ERRORS = prev
  185. def test_connect_with(self):
  186. class b1s1(bootsteps.Step):
  187. pass
  188. class b1s2(bootsteps.Step):
  189. last = True
  190. class b2s1(bootsteps.Step):
  191. pass
  192. class b2s2(bootsteps.Step):
  193. last = True
  194. b1 = self.Blueprint([b1s1, b1s2], app=self.app)
  195. b2 = self.Blueprint([b2s1, b2s2], app=self.app)
  196. b1.apply(Mock())
  197. b2.apply(Mock())
  198. b1.connect_with(b2)
  199. self.assertIn(b1s1, b1.graph)
  200. self.assertIn(b2s1, b1.graph)
  201. self.assertIn(b2s2, b1.graph)
  202. self.assertTrue(repr(b1s1))
  203. self.assertTrue(str(b1s1))
  204. def test_topsort_raises_KeyError(self):
  205. class Step(bootsteps.Step):
  206. requires = ('xyxxx.fsdasewe.Unknown', )
  207. b = self.Blueprint([Step], app=self.app)
  208. b.steps = b.claim_steps()
  209. with self.assertRaises(ImportError):
  210. b._finalize_steps(b.steps)
  211. Step.requires = ()
  212. b.steps = b.claim_steps()
  213. b._finalize_steps(b.steps)
  214. with patch('celery.bootsteps.DependencyGraph') as Dep:
  215. g = Dep.return_value = Mock()
  216. g.topsort.side_effect = KeyError('foo')
  217. with self.assertRaises(KeyError):
  218. b._finalize_steps(b.steps)
  219. def test_apply(self):
  220. class MyBlueprint(bootsteps.Blueprint):
  221. name = 'test_apply'
  222. def modules(self):
  223. return ['A', 'B']
  224. class B(bootsteps.Step):
  225. name = 'test_apply.B'
  226. class C(bootsteps.Step):
  227. name = 'test_apply.C'
  228. requires = [B]
  229. class A(bootsteps.Step):
  230. name = 'test_apply.A'
  231. requires = [C]
  232. class D(bootsteps.Step):
  233. name = 'test_apply.D'
  234. last = True
  235. x = MyBlueprint([A, D], app=self.app)
  236. x.apply(self)
  237. self.assertIsInstance(x.order[0], B)
  238. self.assertIsInstance(x.order[1], C)
  239. self.assertIsInstance(x.order[2], A)
  240. self.assertIsInstance(x.order[3], D)
  241. self.assertIn(A, x.types)
  242. self.assertIs(x[A.name], x.order[2])
  243. def test_find_last_but_no_steps(self):
  244. class MyBlueprint(bootsteps.Blueprint):
  245. name = 'qwejwioqjewoqiej'
  246. x = MyBlueprint(app=self.app)
  247. x.apply(self)
  248. self.assertIsNone(x._find_last())