123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 |
- from __future__ import absolute_import
- from mock import Mock, patch
- from celery import bootsteps
- from celery.tests.case import AppCase
- class test_StepFormatter(AppCase):
- def test_get_prefix(self):
- f = bootsteps.StepFormatter()
- s = Mock()
- s.last = True
- self.assertEqual(f._get_prefix(s), f.blueprint_prefix)
- s2 = Mock()
- s2.last = False
- s2.conditional = True
- self.assertEqual(f._get_prefix(s2), f.conditional_prefix)
- s3 = Mock()
- s3.last = s3.conditional = False
- self.assertEqual(f._get_prefix(s3), '')
- def test_node(self):
- f = bootsteps.StepFormatter()
- f.draw_node = Mock()
- step = Mock()
- step.last = False
- f.node(step, x=3)
- f.draw_node.assert_called_with(step, f.node_scheme, {'x': 3})
- step.last = True
- f.node(step, x=3)
- f.draw_node.assert_called_with(step, f.blueprint_scheme, {'x': 3})
- def test_edge(self):
- f = bootsteps.StepFormatter()
- f.draw_edge = Mock()
- a, b = Mock(), Mock()
- a.last = True
- f.edge(a, b, x=6)
- f.draw_edge.assert_called_with(a, b, f.edge_scheme, {
- 'x': 6, 'arrowhead': 'none', 'color': 'darkseagreen3',
- })
- a.last = False
- f.edge(a, b, x=6)
- f.draw_edge.assert_called_with(a, b, f.edge_scheme, {
- 'x': 6,
- })
- class test_Step(AppCase):
- class Def(bootsteps.StartStopStep):
- name = 'test_Step.Def'
- def setup(self):
- self.steps = []
- def test_blueprint_name(self, bp='test_blueprint_name'):
- class X(bootsteps.Step):
- blueprint = bp
- name = 'X'
- self.assertEqual(X.name, 'X')
- class Y(bootsteps.Step):
- name = '%s.Y' % bp
- self.assertEqual(Y.name, '%s.Y' % bp)
- def test_init(self):
- self.assertTrue(self.Def(self))
- def test_create(self):
- self.Def(self).create(self)
- def test_include_if(self):
- x = self.Def(self)
- x.enabled = True
- self.assertTrue(x.include_if(self))
- x.enabled = False
- self.assertFalse(x.include_if(self))
- def test_instantiate(self):
- self.assertIsInstance(self.Def(self).instantiate(self.Def, self),
- self.Def)
- def test_include_when_enabled(self):
- x = self.Def(self)
- x.create = Mock()
- x.create.return_value = 'George'
- self.assertTrue(x.include(self))
- self.assertEqual(x.obj, 'George')
- x.create.assert_called_with(self)
- def test_include_when_disabled(self):
- x = self.Def(self)
- x.enabled = False
- x.create = Mock()
- self.assertFalse(x.include(self))
- self.assertFalse(x.create.call_count)
- def test_repr(self):
- x = self.Def(self)
- self.assertTrue(repr(x))
- class test_ConsumerStep(AppCase):
- def test_interface(self):
- step = bootsteps.ConsumerStep(self)
- with self.assertRaises(NotImplementedError):
- step.get_consumers(self)
- def test_start_stop_shutdown(self):
- consumer = Mock()
- self.connection = Mock()
- class Step(bootsteps.ConsumerStep):
- def get_consumers(self, c):
- return [consumer]
- step = Step(self)
- self.assertEqual(step.get_consumers(self), [consumer])
- step.start(self)
- consumer.consume.assert_called_with()
- step.stop(self)
- consumer.cancel.assert_called_with()
- step.shutdown(self)
- consumer.channel.close.assert_called_with()
- def test_start_no_consumers(self):
- self.connection = Mock()
- class Step(bootsteps.ConsumerStep):
- def get_consumers(self, c):
- return ()
- step = Step(self)
- step.start(self)
- class test_StartStopStep(AppCase):
- class Def(bootsteps.StartStopStep):
- name = 'test_StartStopStep.Def'
- def setup(self):
- self.steps = []
- def test_start__stop(self):
- x = self.Def(self)
- x.create = Mock()
- # include creates the underlying object and sets
- # its x.obj attribute to it, as well as appending
- # it to the parent.steps list.
- x.include(self)
- self.assertTrue(self.steps)
- self.assertIs(self.steps[0], x)
- x.start(self)
- x.obj.start.assert_called_with()
- x.stop(self)
- x.obj.stop.assert_called_with()
- x.obj = None
- self.assertIsNone(x.start(self))
- def test_include_when_disabled(self):
- x = self.Def(self)
- x.enabled = False
- x.include(self)
- self.assertFalse(self.steps)
- def test_terminate(self):
- x = self.Def(self)
- x.create = Mock()
- x.include(self)
- delattr(x.obj, 'terminate')
- x.terminate(self)
- x.obj.stop.assert_called_with()
- class test_Blueprint(AppCase):
- class Blueprint(bootsteps.Blueprint):
- name = 'test_Blueprint'
- def test_steps_added_to_unclaimed(self):
- class tnA(bootsteps.Step):
- name = 'test_Blueprint.A'
- class tnB(bootsteps.Step):
- name = 'test_Blueprint.B'
- class xxA(bootsteps.Step):
- name = 'xx.A'
- class Blueprint(self.Blueprint):
- default_steps = [tnA, tnB]
- blueprint = Blueprint(app=self.app)
- self.assertIn(tnA, blueprint._all_steps())
- self.assertIn(tnB, blueprint._all_steps())
- self.assertNotIn(xxA, blueprint._all_steps())
- def test_init(self):
- blueprint = self.Blueprint(app=self.app)
- self.assertIs(blueprint.app, self.app)
- self.assertEqual(blueprint.name, 'test_Blueprint')
- def test_close__on_close_is_None(self):
- blueprint = self.Blueprint(app=self.app)
- blueprint.on_close = None
- blueprint.send_all = Mock()
- blueprint.close(1)
- blueprint.send_all.assert_called_with(
- 1, 'close', 'closing', reverse=False,
- )
- def test_send_all_with_None_steps(self):
- parent = Mock()
- blueprint = self.Blueprint(app=self.app)
- parent.steps = [None, None, None]
- blueprint.send_all(parent, 'close', 'Closing', reverse=False)
- def test_join_raises_IGNORE_ERRORS(self):
- prev, bootsteps.IGNORE_ERRORS = bootsteps.IGNORE_ERRORS, (KeyError, )
- try:
- blueprint = self.Blueprint(app=self.app)
- blueprint.shutdown_complete = Mock()
- blueprint.shutdown_complete.wait.side_effect = KeyError('luke')
- blueprint.join(timeout=10)
- blueprint.shutdown_complete.wait.assert_called_with(timeout=10)
- finally:
- bootsteps.IGNORE_ERRORS = prev
- def test_connect_with(self):
- class b1s1(bootsteps.Step):
- pass
- class b1s2(bootsteps.Step):
- last = True
- class b2s1(bootsteps.Step):
- pass
- class b2s2(bootsteps.Step):
- last = True
- b1 = self.Blueprint([b1s1, b1s2], app=self.app)
- b2 = self.Blueprint([b2s1, b2s2], app=self.app)
- b1.apply(Mock())
- b2.apply(Mock())
- b1.connect_with(b2)
- self.assertIn(b1s1, b1.graph)
- self.assertIn(b2s1, b1.graph)
- self.assertIn(b2s2, b1.graph)
- self.assertTrue(repr(b1s1))
- self.assertTrue(str(b1s1))
- def test_topsort_raises_KeyError(self):
- class Step(bootsteps.Step):
- requires = ('xyxxx.fsdasewe.Unknown', )
- b = self.Blueprint([Step], app=self.app)
- b.steps = b.claim_steps()
- with self.assertRaises(ImportError):
- b._finalize_steps(b.steps)
- Step.requires = ()
- b.steps = b.claim_steps()
- b._finalize_steps(b.steps)
- with patch('celery.bootsteps.DependencyGraph') as Dep:
- g = Dep.return_value = Mock()
- g.topsort.side_effect = KeyError('foo')
- with self.assertRaises(KeyError):
- b._finalize_steps(b.steps)
- def test_apply(self):
- class MyBlueprint(bootsteps.Blueprint):
- name = 'test_apply'
- def modules(self):
- return ['A', 'B']
- class B(bootsteps.Step):
- name = 'test_apply.B'
- class C(bootsteps.Step):
- name = 'test_apply.C'
- requires = [B]
- class A(bootsteps.Step):
- name = 'test_apply.A'
- requires = [C]
- class D(bootsteps.Step):
- name = 'test_apply.D'
- last = True
- x = MyBlueprint([A, D], app=self.app)
- x.apply(self)
- self.assertIsInstance(x.order[0], B)
- self.assertIsInstance(x.order[1], C)
- self.assertIsInstance(x.order[2], A)
- self.assertIsInstance(x.order[3], D)
- self.assertIn(A, x.types)
- self.assertIs(x[A.name], x.order[2])
- def test_find_last_but_no_steps(self):
- class MyBlueprint(bootsteps.Blueprint):
- name = 'qwejwioqjewoqiej'
- x = MyBlueprint(app=self.app)
- x.apply(self)
- self.assertIsNone(x._find_last())
|