test_amqp.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. from __future__ import absolute_import
  2. from kombu import Exchange, Queue
  3. from mock import Mock
  4. from celery.app.amqp import Queues, TaskPublisher
  5. from celery.five import keys
  6. from celery.tests.case import AppCase
  class test_TaskProducer(AppCase):
      """Tests for the producer built by ``self.app.amqp.TaskProducer``."""

      def test__exit__(self):
          # Leaving the ``with`` block must call ``release()`` with no
          # arguments on the producer.
          producer = self.app.amqp.TaskProducer(self.app.connection())
          producer.release = Mock()
          with producer:
              pass
          producer.release.assert_called_with()

      def test_declare(self):
          # Smoke test: ``declare()`` is called once with the exchange named
          # 'foo' and once with the name cleared; nothing else is asserted.
          producer = self.app.amqp.TaskProducer(self.app.connection())
          for exchange_name in ('foo', None):
              producer.exchange.name = exchange_name
              producer.declare()

      def test_retry_policy(self):
          # Smoke test: publish with a custom ``retry_policy`` mapping.
          producer = self.app.amqp.TaskProducer(Mock())
          producer.channel.connection.client.declared_entities = set()
          policy = {'frobulate': 32.4}
          producer.publish_task('tasks.add', (2, 2), {}, retry_policy=policy)

      def test_publish_no_retry(self):
          # With ``retry=False`` the connection's ``ensure`` mock must not
          # have been called.
          producer = self.app.amqp.TaskProducer(Mock())
          producer.channel.connection.client.declared_entities = set()
          producer.publish_task(
              'tasks.add', (2, 2), {}, retry=False, chord=123)
          ensure_calls = producer.connection.ensure.call_count
          self.assertFalse(ensure_calls)

      def test_publish_custom_queue(self):
          # Publishing to a registered queue name must forward that queue's
          # exchange name and routing key as keyword arguments to publish().
          producer = self.app.amqp.TaskProducer(Mock())
          custom_queue = Queue('xxx', Exchange('yyy'), 'zzz')
          self.app.amqp.queues['some_queue'] = custom_queue
          producer.channel.connection.client.declared_entities = set()
          producer.publish = Mock()
          producer.publish_task(
              'tasks.add', (8, 8), {}, retry=False, queue='some_queue')
          # call_args[1] holds the keyword arguments of the last call.
          publish_kwargs = producer.publish.call_args[1]
          self.assertEqual(publish_kwargs['exchange'], 'yyy')
          self.assertEqual(publish_kwargs['routing_key'], 'zzz')

      def test_event_dispatcher(self):
          # The dispatcher must exist and must report itself as not enabled.
          producer = self.app.amqp.TaskProducer(Mock())
          self.assertTrue(producer.event_dispatcher)
          self.assertFalse(producer.event_dispatcher.enabled)
  class test_TaskConsumer(AppCase):
      """Tests for the consumer built by ``self.app.amqp.TaskConsumer``."""

      def test_accept_content(self):
          # The same accepted-content set is expected whether it comes from
          # CELERY_ACCEPT_CONTENT or from an explicit ``accept=['json']``.
          expected = set(['application/json'])
          with self.app.pool.acquire(block=True) as connection:
              self.app.conf.CELERY_ACCEPT_CONTENT = ['application/json']

              from_config = self.app.amqp.TaskConsumer(connection)
              self.assertEqual(from_config.accept, expected)

              from_argument = self.app.amqp.TaskConsumer(
                  connection, accept=['json'])
              self.assertEqual(from_argument.accept, expected)
  class test_compat_TaskPublisher(AppCase):
      """Tests for how ``TaskPublisher`` handles its ``exchange`` argument."""

      def test_compat_exchange_is_string(self):
          # A string exchange name must end up as an Exchange instance whose
          # type is 'direct' unless ``exchange_type`` says otherwise.
          publisher = TaskPublisher(app=self.app, exchange='foo')
          self.assertIsInstance(publisher.exchange, Exchange)
          self.assertEqual(publisher.exchange.name, 'foo')
          self.assertEqual(publisher.exchange.type, 'direct')

          topic_publisher = TaskPublisher(
              app=self.app, exchange='foo', exchange_type='topic')
          self.assertEqual(topic_publisher.exchange.type, 'topic')

      def test_compat_exchange_is_Exchange(self):
          # An Exchange instance passed in keeps its name.
          given = Exchange('foo')
          publisher = TaskPublisher(app=self.app, exchange=given)
          self.assertEqual(publisher.exchange.name, 'foo')
  class test_PublisherPool(AppCase):
      """Tests for ``self.app.amqp.producer_pool`` under two pool limits."""

      def _rebuild_producer_pool(self, limit):
          # Shared setup for both tests: set BROKER_POOL_LIMIT to ``limit``,
          # discard the app's cached ``_pool`` and the amqp ``_producer_pool``,
          # then return ``self.app.amqp.producer_pool``.
          # NOTE(review): presumably producer_pool is rebuilt lazily from the
          # new limit once the cached values are cleared -- confirm.
          self.app.conf.BROKER_POOL_LIMIT = limit
          try:
              delattr(self.app, '_pool')
          except AttributeError:
              # No cached pool on the app yet; nothing to discard.
              pass
          self.app.amqp._producer_pool = None
          return self.app.amqp.producer_pool

      def test_setup_nolimit(self):
          # With no limit the producer pool mirrors the connection pool's
          # limit and its resource queue is empty/falsy.
          pool = self._rebuild_producer_pool(None)
          self.assertEqual(pool.limit, self.app.pool.limit)
          self.assertFalse(pool._resource.queue)

          # Acquire/release must work repeatedly.
          r1 = pool.acquire()
          r2 = pool.acquire()
          r1.release()
          r2.release()
          r1 = pool.acquire()
          r2 = pool.acquire()
          # Release the second pair too, as test_setup does, so the test
          # does not leave acquired producers behind.
          r1.release()
          r2.release()

      def test_setup(self):
          # With a limit of 2 the resource queue is populated.
          pool = self._rebuild_producer_pool(2)
          self.assertEqual(pool.limit, self.app.pool.limit)
          self.assertTrue(pool._resource.queue)

          p1 = r1 = pool.acquire()
          p2 = r2 = pool.acquire()
          r1.release()
          r2.release()
          r1 = pool.acquire()
          r2 = pool.acquire()
          # The second round must hand back the same two objects, with the
          # one released last (p2) acquired first.
          self.assertIs(p2, r1)
          self.assertIs(p1, r2)
          r1.release()
          r2.release()
  class test_Queues(AppCase):
      """Tests for the ``Queues`` mapping from ``celery.app.amqp``."""

      def test_queues_format(self):
          # With nothing selected for consumption, format() is the empty
          # string.
          self.app.amqp.queues._consume_from = {}
          formatted = self.app.amqp.queues.format()
          self.assertEqual(formatted, '')

      def test_with_defaults(self):
          # Constructing with None compares equal to an empty dict.
          empty = Queues(None)
          self.assertEqual(empty, {})

      def test_add(self):
          # add() by name stores a Queue carrying the given routing key.
          registry = Queues()
          registry.add('foo', exchange='ex', routing_key='rk')
          self.assertIn('foo', registry)
          added = registry['foo']
          self.assertIsInstance(added, Queue)
          self.assertEqual(added.routing_key, 'rk')

      def test_with_ha_policy(self):
          # ha_policy=None: queue_arguments are left exactly as given.
          no_policy = Queues(ha_policy=None, create_missing=False)
          no_policy.add('xyz')
          self.assertIsNone(no_policy['xyz'].queue_arguments)
          no_policy.add('xyx', queue_arguments={'x-foo': 'bar'})
          self.assertEqual(
              no_policy['xyx'].queue_arguments, {'x-foo': 'bar'})

          # ha_policy='all': 'x-ha-policy' is added, alongside any existing
          # queue arguments.
          policy_all = Queues(ha_policy='all', create_missing=False)
          policy_all.add(Queue('foo'))
          self.assertEqual(
              policy_all['foo'].queue_arguments, {'x-ha-policy': 'all'})
          with_arguments = Queue('xyx2', queue_arguments={'x-foo': 'bari'})
          policy_all.add(with_arguments)
          expected_merged = {
              'x-ha-policy': 'all',
              'x-foo': 'bari',
          }
          self.assertEqual(
              policy_all['xyx2'].queue_arguments, expected_merged)

          # ha_policy as a list: policy becomes 'nodes' and the list is
          # passed along as 'x-ha-policy-params'.
          policy_nodes = Queues(
              ha_policy=['A', 'B', 'C'], create_missing=False)
          policy_nodes.add(Queue('foo'))
          expected_nodes = {
              'x-ha-policy': 'nodes',
              'x-ha-policy-params': ['A', 'B', 'C'],
          }
          self.assertEqual(
              policy_nodes['foo'].queue_arguments, expected_nodes)

      def test_select_add(self):
          # select_add() extends the set of names already selected.
          registry = Queues()
          registry.select(['foo', 'bar'])
          registry.select_add('baz')
          selected = keys(registry._consume_from)
          self.assertItemsEqual(selected, ['foo', 'bar', 'baz'])

      def test_deselect(self):
          # deselect() removes a name from the selected set.
          registry = Queues()
          registry.select(['foo', 'bar'])
          registry.deselect('bar')
          selected = keys(registry._consume_from)
          self.assertItemsEqual(selected, ['foo'])

      def test_with_ha_policy_compat(self):
          # The policy is also applied when a queue is added by name.
          registry = Queues(ha_policy='all')
          registry.add('bar')
          self.assertEqual(
              registry['bar'].queue_arguments, {'x-ha-policy': 'all'})

      def test_add_default_exchange(self):
          # A Queue added without an exchange gets the default exchange.
          fallback = Exchange('fff', 'fanout')
          registry = Queues(default_exchange=fallback)
          registry.add(Queue('foo'))
          self.assertEqual(registry['foo'].exchange, fallback)

      def test_alias(self):
          # A queue is reachable under both its alias and its name.
          registry = Queues()
          registry.add(Queue('foo', alias='barfoo'))
          self.assertIs(registry['barfoo'], registry['foo'])