123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- from __future__ import absolute_import
- from kombu import Exchange
- from kombu.utils.functional import maybe_evaluate
- from celery.app import routes
- from celery.exceptions import QueueNotFound
- from celery.utils.functional import LRUCache
- from celery.tests.case import AppCase
- def Router(app, *args, **kwargs):
- return routes.Router(*args, app=app, **kwargs)
- def E(app, queues):
- def expand(answer):
- return Router(app, [], queues).expand_destination(answer)
- return expand
- def set_queues(app, **queues):
- app.conf.CELERY_QUEUES = queues
- app.amqp.queues = app.amqp.Queues(queues)
- class RouteCase(AppCase):
- def setup(self):
- self.a_queue = {
- 'exchange': 'fooexchange',
- 'exchange_type': 'fanout',
- 'routing_key': 'xuzzy',
- }
- self.b_queue = {
- 'exchange': 'barexchange',
- 'exchange_type': 'topic',
- 'routing_key': 'b.b.#',
- }
- self.d_queue = {
- 'exchange': self.app.conf.CELERY_DEFAULT_EXCHANGE,
- 'exchange_type': self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE,
- 'routing_key': self.app.conf.CELERY_DEFAULT_ROUTING_KEY,
- }
- @self.app.task(shared=False)
- def mytask():
- pass
- self.mytask = mytask
- class test_MapRoute(RouteCase):
- def test_route_for_task_expanded_route(self):
- set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
- expand = E(self.app, self.app.amqp.queues)
- route = routes.MapRoute({self.mytask.name: {'queue': 'foo'}})
- self.assertEqual(
- expand(route.route_for_task(self.mytask.name))['queue'].name,
- 'foo',
- )
- self.assertIsNone(route.route_for_task('celery.awesome'))
- def test_route_for_task(self):
- set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
- expand = E(self.app, self.app.amqp.queues)
- route = routes.MapRoute({self.mytask.name: self.b_queue})
- self.assertDictContainsSubset(
- self.b_queue,
- expand(route.route_for_task(self.mytask.name)),
- )
- self.assertIsNone(route.route_for_task('celery.awesome'))
- def test_expand_route_not_found(self):
- expand = E(self.app, self.app.amqp.Queues(
- self.app.conf.CELERY_QUEUES, False))
- route = routes.MapRoute({'a': {'queue': 'x'}})
- with self.assertRaises(QueueNotFound):
- expand(route.route_for_task('a'))
- class test_lookup_route(RouteCase):
- def test_init_queues(self):
- router = Router(self.app, queues=None)
- self.assertDictEqual(router.queues, {})
- def test_lookup_takes_first(self):
- set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
- R = routes.prepare(({self.mytask.name: {'queue': 'bar'}},
- {self.mytask.name: {'queue': 'foo'}}))
- router = Router(self.app, R, self.app.amqp.queues)
- self.assertEqual(router.route({}, self.mytask.name,
- args=[1, 2], kwargs={})['queue'].name, 'bar')
- def test_expands_queue_in_options(self):
- set_queues(self.app)
- R = routes.prepare(())
- router = Router(
- self.app, R, self.app.amqp.queues, create_missing=True,
- )
- # apply_async forwards all arguments, even exchange=None etc,
- # so need to make sure it's merged correctly.
- route = router.route(
- {'queue': 'testq',
- 'exchange': None,
- 'routing_key': None,
- 'immediate': False},
- self.mytask.name,
- args=[1, 2], kwargs={},
- )
- self.assertEqual(route['queue'].name, 'testq')
- self.assertEqual(route['queue'].exchange, Exchange('testq'))
- self.assertEqual(route['queue'].routing_key, 'testq')
- self.assertEqual(route['immediate'], False)
- def test_expand_destination_string(self):
- set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
- x = Router(self.app, {}, self.app.amqp.queues)
- dest = x.expand_destination('foo')
- self.assertEqual(dest['queue'].name, 'foo')
- def test_lookup_paths_traversed(self):
- set_queues(
- self.app, foo=self.a_queue, bar=self.b_queue,
- **{self.app.conf.CELERY_DEFAULT_QUEUE: self.d_queue}
- )
- R = routes.prepare((
- {'celery.xaza': {'queue': 'bar'}},
- {self.mytask.name: {'queue': 'foo'}}
- ))
- router = Router(self.app, R, self.app.amqp.queues)
- self.assertEqual(router.route({}, self.mytask.name,
- args=[1, 2], kwargs={})['queue'].name, 'foo')
- self.assertEqual(
- router.route({}, 'celery.poza')['queue'].name,
- self.app.conf.CELERY_DEFAULT_QUEUE,
- )
- class test_prepare(AppCase):
- def test_prepare(self):
- o = object()
- R = [{'foo': 'bar'},
- 'celery.utils.functional.LRUCache', o]
- p = routes.prepare(R)
- self.assertIsInstance(p[0], routes.MapRoute)
- self.assertIsInstance(maybe_evaluate(p[1]), LRUCache)
- self.assertIs(p[2], o)
- self.assertEqual(routes.prepare(o), [o])
- def test_prepare_item_is_dict(self):
- R = {'foo': 'bar'}
- p = routes.prepare(R)
- self.assertIsInstance(p[0], routes.MapRoute)
|