from __future__ import absolute_import import warnings from functools import wraps from kombu.pidbox import Mailbox from celery.app import control from celery.utils import uuid from celery.tests.case import AppCase class MockMailbox(Mailbox): sent = [] def _publish(self, command, *args, **kwargs): self.__class__.sent.append(command) def close(self): pass def _collect(self, *args, **kwargs): pass class Control(control.Control): Mailbox = MockMailbox def with_mock_broadcast(fun): @wraps(fun) def _resets(*args, **kwargs): MockMailbox.sent = [] try: return fun(*args, **kwargs) finally: MockMailbox.sent = [] return _resets class test_flatten_reply(AppCase): def test_flatten_reply(self): reply = [ {'foo@example.com': {'hello': 10}}, {'foo@example.com': {'hello': 20}}, {'bar@example.com': {'hello': 30}} ] with warnings.catch_warnings(record=True) as w: nodes = control.flatten_reply(reply) self.assertIn( 'multiple replies', str(w[-1].message), ) self.assertIn('foo@example.com', nodes) self.assertIn('bar@example.com', nodes) class test_inspect(AppCase): def setup(self): self.c = Control(app=self.app) self.prev, self.app.control = self.app.control, self.c self.i = self.c.inspect() def test_prepare_reply(self): self.assertDictEqual(self.i._prepare([{'w1': {'ok': 1}}, {'w2': {'ok': 1}}]), {'w1': {'ok': 1}, 'w2': {'ok': 1}}) i = self.c.inspect(destination='w1') self.assertEqual(i._prepare([{'w1': {'ok': 1}}]), {'ok': 1}) @with_mock_broadcast def test_active(self): self.i.active() self.assertIn('dump_active', MockMailbox.sent) @with_mock_broadcast def test_clock(self): self.i.clock() self.assertIn('clock', MockMailbox.sent) @with_mock_broadcast def test_conf(self): self.i.conf() self.assertIn('dump_conf', MockMailbox.sent) @with_mock_broadcast def test_hello(self): self.i.hello('george@vandelay.com') self.assertIn('hello', MockMailbox.sent) @with_mock_broadcast def test_memsample(self): self.i.memsample() self.assertIn('memsample', MockMailbox.sent) @with_mock_broadcast def test_memdump(self): self.i.memdump() self.assertIn('memdump', MockMailbox.sent) @with_mock_broadcast def test_objgraph(self): self.i.objgraph() self.assertIn('objgraph', MockMailbox.sent) @with_mock_broadcast def test_scheduled(self): self.i.scheduled() self.assertIn('dump_schedule', MockMailbox.sent) @with_mock_broadcast def test_reserved(self): self.i.reserved() self.assertIn('dump_reserved', MockMailbox.sent) @with_mock_broadcast def test_stats(self): self.i.stats() self.assertIn('stats', MockMailbox.sent) @with_mock_broadcast def test_revoked(self): self.i.revoked() self.assertIn('dump_revoked', MockMailbox.sent) @with_mock_broadcast def test_tasks(self): self.i.registered() self.assertIn('dump_tasks', MockMailbox.sent) @with_mock_broadcast def test_ping(self): self.i.ping() self.assertIn('ping', MockMailbox.sent) @with_mock_broadcast def test_active_queues(self): self.i.active_queues() self.assertIn('active_queues', MockMailbox.sent) @with_mock_broadcast def test_report(self): self.i.report() self.assertIn('report', MockMailbox.sent) class test_Broadcast(AppCase): def setup(self): self.control = Control(app=self.app) self.app.control = self.control @self.app.task(shared=False) def mytask(): pass self.mytask = mytask def test_purge(self): self.control.purge() @with_mock_broadcast def test_broadcast(self): self.control.broadcast('foobarbaz', arguments=[]) self.assertIn('foobarbaz', MockMailbox.sent) @with_mock_broadcast def test_broadcast_limit(self): self.control.broadcast( 'foobarbaz1', arguments=[], limit=None, destination=[1, 2, 3], ) self.assertIn('foobarbaz1', MockMailbox.sent) @with_mock_broadcast def test_broadcast_validate(self): with self.assertRaises(ValueError): self.control.broadcast('foobarbaz2', destination='foo') @with_mock_broadcast def test_rate_limit(self): self.control.rate_limit(self.mytask.name, '100/m') self.assertIn('rate_limit', MockMailbox.sent) @with_mock_broadcast def test_time_limit(self): self.control.time_limit(self.mytask.name, soft=10, hard=20) self.assertIn('time_limit', MockMailbox.sent) @with_mock_broadcast def test_add_consumer(self): self.control.add_consumer('foo') self.assertIn('add_consumer', MockMailbox.sent) @with_mock_broadcast def test_cancel_consumer(self): self.control.cancel_consumer('foo') self.assertIn('cancel_consumer', MockMailbox.sent) @with_mock_broadcast def test_enable_events(self): self.control.enable_events() self.assertIn('enable_events', MockMailbox.sent) @with_mock_broadcast def test_disable_events(self): self.control.disable_events() self.assertIn('disable_events', MockMailbox.sent) @with_mock_broadcast def test_revoke(self): self.control.revoke('foozbaaz') self.assertIn('revoke', MockMailbox.sent) @with_mock_broadcast def test_ping(self): self.control.ping() self.assertIn('ping', MockMailbox.sent) @with_mock_broadcast def test_election(self): self.control.election('some_id', 'topic', 'action') self.assertIn('election', MockMailbox.sent) @with_mock_broadcast def test_pool_grow(self): self.control.pool_grow(2) self.assertIn('pool_grow', MockMailbox.sent) @with_mock_broadcast def test_pool_shrink(self): self.control.pool_shrink(2) self.assertIn('pool_shrink', MockMailbox.sent) @with_mock_broadcast def test_revoke_from_result(self): self.app.AsyncResult('foozbazzbar').revoke() self.assertIn('revoke', MockMailbox.sent) @with_mock_broadcast def test_revoke_from_resultset(self): r = self.app.GroupResult(uuid(), [self.app.AsyncResult(x) for x in [uuid() for i in range(10)]]) r.revoke() self.assertIn('revoke', MockMailbox.sent)