test_rpc.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. from __future__ import absolute_import
  2. from mock import patch
  3. from celery.backends.rpc import RPCBackend
  4. from celery._state import _task_stack
  5. from celery.tests.case import AppCase, Mock
  6. class test_RPCBackend(AppCase):
  7. def setup(self):
  8. self.b = RPCBackend(app=self.app)
  9. def test_oid(self):
  10. oid = self.b.oid
  11. oid2 = self.b.oid
  12. self.assertEqual(oid, oid2)
  13. self.assertEqual(oid, self.app.oid)
  14. def test_interface(self):
  15. self.b.on_reply_declare('task_id')
  16. def test_destination_for(self):
  17. req = Mock(name='request')
  18. req.reply_to = 'reply_to'
  19. req.correlation_id = 'corid'
  20. self.assertTupleEqual(
  21. self.b.destination_for('task_id', req),
  22. ('reply_to', 'corid'),
  23. )
  24. task = Mock()
  25. _task_stack.push(task)
  26. try:
  27. task.request.reply_to = 'reply_to'
  28. task.request.correlation_id = 'corid'
  29. self.assertTupleEqual(
  30. self.b.destination_for('task_id', None),
  31. ('reply_to', 'corid'),
  32. )
  33. finally:
  34. _task_stack.pop()
  35. with self.assertRaises(RuntimeError):
  36. self.b.destination_for('task_id', None)
  37. def test_binding(self):
  38. queue = self.b.binding
  39. self.assertEqual(queue.name, self.b.oid)
  40. self.assertEqual(queue.exchange, self.b.exchange)
  41. self.assertEqual(queue.routing_key, self.b.oid)
  42. self.assertFalse(queue.durable)
  43. self.assertFalse(queue.auto_delete)
  44. def test_many_bindings(self):
  45. self.assertListEqual(
  46. self.b._many_bindings(['a', 'b']),
  47. [self.b.binding],
  48. )
  49. def test_create_binding(self):
  50. self.assertEqual(self.b._create_binding('id'), self.b.binding)
  51. def test_on_task_call(self):
  52. with patch('celery.backends.rpc.maybe_declare') as md:
  53. with self.app.amqp.producer_pool.acquire() as prod:
  54. self.b.on_task_call(prod, 'task_id'),
  55. md.assert_called_with(
  56. self.b.binding(prod.channel),
  57. retry=True,
  58. )
  59. def test_create_exchange(self):
  60. ex = self.b._create_exchange('name')
  61. self.assertIsInstance(ex, self.b.Exchange)
  62. self.assertEqual(ex.name, '')