test_routes.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. from __future__ import absolute_import
  2. from kombu import Exchange
  3. from kombu.utils.functional import maybe_evaluate
  4. from celery.app import routes
  5. from celery.exceptions import QueueNotFound
  6. from celery.utils.functional import LRUCache
  7. from celery.tests.case import AppCase
  8. def Router(app, *args, **kwargs):
  9. return routes.Router(*args, app=app, **kwargs)
  10. def E(app, queues):
  11. def expand(answer):
  12. return Router(app, [], queues).expand_destination(answer)
  13. return expand
  14. def set_queues(app, **queues):
  15. app.conf.CELERY_QUEUES = queues
  16. app.amqp.queues = app.amqp.Queues(queues)
  17. class RouteCase(AppCase):
  18. def setup(self):
  19. self.a_queue = {
  20. 'exchange': 'fooexchange',
  21. 'exchange_type': 'fanout',
  22. 'routing_key': 'xuzzy',
  23. }
  24. self.b_queue = {
  25. 'exchange': 'barexchange',
  26. 'exchange_type': 'topic',
  27. 'routing_key': 'b.b.#',
  28. }
  29. self.d_queue = {
  30. 'exchange': self.app.conf.CELERY_DEFAULT_EXCHANGE,
  31. 'exchange_type': self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE,
  32. 'routing_key': self.app.conf.CELERY_DEFAULT_ROUTING_KEY,
  33. }
  34. @self.app.task(shared=False)
  35. def mytask():
  36. pass
  37. self.mytask = mytask
  38. class test_MapRoute(RouteCase):
  39. def test_route_for_task_expanded_route(self):
  40. set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
  41. expand = E(self.app, self.app.amqp.queues)
  42. route = routes.MapRoute({self.mytask.name: {'queue': 'foo'}})
  43. self.assertEqual(
  44. expand(route.route_for_task(self.mytask.name))['queue'].name,
  45. 'foo',
  46. )
  47. self.assertIsNone(route.route_for_task('celery.awesome'))
  48. def test_route_for_task(self):
  49. set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
  50. expand = E(self.app, self.app.amqp.queues)
  51. route = routes.MapRoute({self.mytask.name: self.b_queue})
  52. self.assertDictContainsSubset(
  53. self.b_queue,
  54. expand(route.route_for_task(self.mytask.name)),
  55. )
  56. self.assertIsNone(route.route_for_task('celery.awesome'))
  57. def test_expand_route_not_found(self):
  58. expand = E(self.app, self.app.amqp.Queues(
  59. self.app.conf.CELERY_QUEUES, False))
  60. route = routes.MapRoute({'a': {'queue': 'x'}})
  61. with self.assertRaises(QueueNotFound):
  62. expand(route.route_for_task('a'))
  63. class test_lookup_route(RouteCase):
  64. def test_init_queues(self):
  65. router = Router(self.app, queues=None)
  66. self.assertDictEqual(router.queues, {})
  67. def test_lookup_takes_first(self):
  68. set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
  69. R = routes.prepare(({self.mytask.name: {'queue': 'bar'}},
  70. {self.mytask.name: {'queue': 'foo'}}))
  71. router = Router(self.app, R, self.app.amqp.queues)
  72. self.assertEqual(router.route({}, self.mytask.name,
  73. args=[1, 2], kwargs={})['queue'].name, 'bar')
  74. def test_expands_queue_in_options(self):
  75. set_queues(self.app)
  76. R = routes.prepare(())
  77. router = Router(
  78. self.app, R, self.app.amqp.queues, create_missing=True,
  79. )
  80. # apply_async forwards all arguments, even exchange=None etc,
  81. # so need to make sure it's merged correctly.
  82. route = router.route(
  83. {'queue': 'testq',
  84. 'exchange': None,
  85. 'routing_key': None,
  86. 'immediate': False},
  87. self.mytask.name,
  88. args=[1, 2], kwargs={},
  89. )
  90. self.assertEqual(route['queue'].name, 'testq')
  91. self.assertEqual(route['queue'].exchange, Exchange('testq'))
  92. self.assertEqual(route['queue'].routing_key, 'testq')
  93. self.assertEqual(route['immediate'], False)
  94. def test_expand_destination_string(self):
  95. set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
  96. x = Router(self.app, {}, self.app.amqp.queues)
  97. dest = x.expand_destination('foo')
  98. self.assertEqual(dest['queue'].name, 'foo')
  99. def test_lookup_paths_traversed(self):
  100. set_queues(
  101. self.app, foo=self.a_queue, bar=self.b_queue,
  102. **{self.app.conf.CELERY_DEFAULT_QUEUE: self.d_queue}
  103. )
  104. R = routes.prepare((
  105. {'celery.xaza': {'queue': 'bar'}},
  106. {self.mytask.name: {'queue': 'foo'}}
  107. ))
  108. router = Router(self.app, R, self.app.amqp.queues)
  109. self.assertEqual(router.route({}, self.mytask.name,
  110. args=[1, 2], kwargs={})['queue'].name, 'foo')
  111. self.assertEqual(
  112. router.route({}, 'celery.poza')['queue'].name,
  113. self.app.conf.CELERY_DEFAULT_QUEUE,
  114. )
  115. class test_prepare(AppCase):
  116. def test_prepare(self):
  117. o = object()
  118. R = [{'foo': 'bar'},
  119. 'celery.utils.functional.LRUCache', o]
  120. p = routes.prepare(R)
  121. self.assertIsInstance(p[0], routes.MapRoute)
  122. self.assertIsInstance(maybe_evaluate(p[1]), LRUCache)
  123. self.assertIs(p[2], o)
  124. self.assertEqual(routes.prepare(o), [o])
  125. def test_prepare_item_is_dict(self):
  126. R = {'foo': 'bar'}
  127. p = routes.prepare(R)
  128. self.assertIsInstance(p[0], routes.MapRoute)