from __future__ import absolute_import

from kombu import Exchange
from kombu.utils.functional import maybe_evaluate

from celery.app import routes
from celery.exceptions import QueueNotFound
from celery.utils.functional import LRUCache

from celery.tests.case import AppCase


def Router(app, *args, **kwargs):
    """Build a ``routes.Router`` bound to ``app``.

    Positional and keyword arguments are forwarded unchanged; ``app`` is
    always passed to the router as a keyword argument.
    """
    # ``app`` is a named parameter here, so it can never already be a key
    # of ``kwargs``; adding it is the same as passing ``app=app`` directly.
    kwargs['app'] = app
    return routes.Router(*args, **kwargs)


def E(app, queues):
    """Return a callable that expands a destination against ``queues``.

    A new router (with an empty route list) is created on every call of
    the returned function.
    """
    def _expand(destination):
        router = Router(app, [], queues)
        return router.expand_destination(destination)

    return _expand


def set_queues(app, **queues):
    """Install ``queues`` as both the configured and the live queue set."""
    app.conf.CELERY_QUEUES = queues
    app.amqp.queues = app.amqp.Queues(queues)


class RouteCase(AppCase):
    """Shared fixtures: three queue declarations and a no-op task."""

    def setup(self):
        conf = self.app.conf

        # Two custom queue declarations with distinct exchange types.
        self.a_queue = dict(
            exchange='fooexchange',
            exchange_type='fanout',
            routing_key='xuzzy',
        )
        self.b_queue = dict(
            exchange='barexchange',
            exchange_type='topic',
            routing_key='b.b.#',
        )
        # A declaration mirroring the app's configured defaults.
        self.d_queue = dict(
            exchange=conf.CELERY_DEFAULT_EXCHANGE,
            exchange_type=conf.CELERY_DEFAULT_EXCHANGE_TYPE,
            routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
        )

        @self.app.task(shared=False)
        def mytask():
            pass

        self.mytask = mytask


class test_MapRoute(RouteCase):
    """Tests for ``routes.MapRoute`` combined with destination expansion."""

    def test_route_for_task_expanded_route(self):
        # A ``{'queue': name}`` route expands to the queue of that name.
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        resolve = E(self.app, self.app.amqp.queues)
        task_name = self.mytask.name
        mapping = routes.MapRoute({task_name: {'queue': 'foo'}})

        expanded = resolve(mapping.route_for_task(task_name))
        self.assertEqual(expanded['queue'].name, 'foo')
        # Tasks missing from the map have no route.
        self.assertIsNone(mapping.route_for_task('celery.awesome'))

    def test_route_for_task(self):
        # A route given as raw queue options keeps those options
        # after expansion.
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        resolve = E(self.app, self.app.amqp.queues)
        task_name = self.mytask.name
        mapping = routes.MapRoute({task_name: self.b_queue})

        expanded = resolve(mapping.route_for_task(task_name))
        self.assertDictContainsSubset(self.b_queue, expanded)
        self.assertIsNone(mapping.route_for_task('celery.awesome'))

    def test_expand_route_not_found(self):
        # Expanding a route that names an undeclared queue must raise.
        declared = self.app.amqp.Queues(self.app.conf.CELERY_QUEUES, False)
        resolve = E(self.app, declared)
        mapping = routes.MapRoute({'a': {'queue': 'x'}})

        with self.assertRaises(QueueNotFound):
            resolve(mapping.route_for_task('a'))


class test_lookup_route(RouteCase):
    """Tests for ``Router`` lookup and destination expansion."""

    def test_init_queues(self):
        # ``queues=None`` ends up as an empty mapping on the router.
        router = Router(self.app, queues=None)
        self.assertDictEqual(router.queues, {})

    def test_lookup_takes_first(self):
        # When several routers match, the first one listed wins.
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        task_name = self.mytask.name
        prepared = routes.prepare((
            {task_name: {'queue': 'bar'}},
            {task_name: {'queue': 'foo'}},
        ))
        router = Router(self.app, prepared, self.app.amqp.queues)

        result = router.route({}, task_name, args=[1, 2], kwargs={})
        self.assertEqual(result['queue'].name, 'bar')

    def test_expands_queue_in_options(self):
        set_queues(self.app)
        prepared = routes.prepare(())
        router = Router(
            self.app, prepared, self.app.amqp.queues, create_missing=True,
        )

        # apply_async forwards all arguments, even exchange=None etc,
        # so need to make sure it's merged correctly.
        options = {
            'queue': 'testq',
            'exchange': None,
            'routing_key': None,
            'immediate': False,
        }
        result = router.route(
            options, self.mytask.name, args=[1, 2], kwargs={},
        )

        queue = result['queue']
        self.assertEqual(queue.name, 'testq')
        self.assertEqual(queue.exchange, Exchange('testq'))
        self.assertEqual(queue.routing_key, 'testq')
        self.assertEqual(result['immediate'], False)

    def test_expand_destination_string(self):
        # A bare queue name expands to the matching declared queue.
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        router = Router(self.app, {}, self.app.amqp.queues)

        expanded = router.expand_destination('foo')
        self.assertEqual(expanded['queue'].name, 'foo')

    def test_lookup_paths_traversed(self):
        # Later routers are consulted when earlier ones do not match, and
        # a task no router knows ends up on the default queue.
        default_name = self.app.conf.CELERY_DEFAULT_QUEUE
        default_decl = {default_name: self.d_queue}
        set_queues(
            self.app, foo=self.a_queue, bar=self.b_queue, **default_decl
        )
        prepared = routes.prepare((
            {'celery.xaza': {'queue': 'bar'}},
            {self.mytask.name: {'queue': 'foo'}},
        ))
        router = Router(self.app, prepared, self.app.amqp.queues)

        matched = router.route(
            {}, self.mytask.name, args=[1, 2], kwargs={},
        )
        self.assertEqual(matched['queue'].name, 'foo')

        fallback = router.route({}, 'celery.poza')
        self.assertEqual(
            fallback['queue'].name,
            self.app.conf.CELERY_DEFAULT_QUEUE,
        )


class test_prepare(AppCase):
    """Tests for ``routes.prepare``."""

    def test_prepare(self):
        # Dicts become MapRoute, dotted-path strings evaluate to the named
        # object, and any other object is passed through untouched.
        sentinel = object()
        spec = [
            {'foo': 'bar'},
            'celery.utils.functional.LRUCache',
            sentinel,
        ]
        prepared = routes.prepare(spec)

        self.assertIsInstance(prepared[0], routes.MapRoute)
        self.assertIsInstance(maybe_evaluate(prepared[1]), LRUCache)
        self.assertIs(prepared[2], sentinel)

        # A single non-sequence item is wrapped in a list.
        self.assertEqual(routes.prepare(sentinel), [sentinel])

    def test_prepare_item_is_dict(self):
        # A bare dict is treated as a single MapRoute specification.
        spec = {'foo': 'bar'}
        prepared = routes.prepare(spec)
        self.assertIsInstance(prepared[0], routes.MapRoute)