# test_events.py (8.3 KB)


  1. from __future__ import absolute_import
  2. import socket
  3. from mock import Mock
  4. from celery.events import Event
  5. from celery.tests.case import AppCase
  6. class MockProducer(object):
  7. raise_on_publish = False
  8. def __init__(self, *args, **kwargs):
  9. self.sent = []
  10. def publish(self, msg, *args, **kwargs):
  11. if self.raise_on_publish:
  12. raise KeyError()
  13. self.sent.append(msg)
  14. def close(self):
  15. pass
  16. def has_event(self, kind):
  17. for event in self.sent:
  18. if event['type'] == kind:
  19. return event
  20. return False
  21. class test_Event(AppCase):
  22. def test_constructor(self):
  23. event = Event('world war II')
  24. self.assertEqual(event['type'], 'world war II')
  25. self.assertTrue(event['timestamp'])
  26. class test_EventDispatcher(AppCase):
  27. def test_redis_uses_fanout_exchange(self):
  28. self.app.connection = Mock()
  29. conn = self.app.connection.return_value = Mock()
  30. conn.transport.driver_type = 'redis'
  31. dispatcher = self.app.events.Dispatcher(conn, enabled=False)
  32. self.assertEqual(dispatcher.exchange.type, 'fanout')
  33. def test_others_use_topic_exchange(self):
  34. self.app.connection = Mock()
  35. conn = self.app.connection.return_value = Mock()
  36. conn.transport.driver_type = 'amqp'
  37. dispatcher = self.app.events.Dispatcher(conn, enabled=False)
  38. self.assertEqual(dispatcher.exchange.type, 'topic')
  39. def test_takes_channel_connection(self):
  40. x = self.app.events.Dispatcher(channel=Mock())
  41. self.assertIs(x.connection, x.channel.connection.client)
  42. def test_sql_transports_disabled(self):
  43. conn = Mock()
  44. conn.transport.driver_type = 'sql'
  45. x = self.app.events.Dispatcher(connection=conn)
  46. self.assertFalse(x.enabled)
  47. def test_send(self):
  48. producer = MockProducer()
  49. producer.connection = self.app.connection()
  50. connection = Mock()
  51. connection.transport.driver_type = 'amqp'
  52. eventer = self.app.events.Dispatcher(connection, enabled=False,
  53. buffer_while_offline=False)
  54. eventer.producer = producer
  55. eventer.enabled = True
  56. eventer.send('World War II', ended=True)
  57. self.assertTrue(producer.has_event('World War II'))
  58. eventer.enabled = False
  59. eventer.send('World War III')
  60. self.assertFalse(producer.has_event('World War III'))
  61. evs = ('Event 1', 'Event 2', 'Event 3')
  62. eventer.enabled = True
  63. eventer.producer.raise_on_publish = True
  64. eventer.buffer_while_offline = False
  65. with self.assertRaises(KeyError):
  66. eventer.send('Event X')
  67. eventer.buffer_while_offline = True
  68. for ev in evs:
  69. eventer.send(ev)
  70. eventer.producer.raise_on_publish = False
  71. eventer.flush()
  72. for ev in evs:
  73. self.assertTrue(producer.has_event(ev))
  74. buf = eventer._outbound_buffer = Mock()
  75. buf.popleft.side_effect = IndexError()
  76. eventer.flush()
  77. def test_enter_exit(self):
  78. with self.app.connection() as conn:
  79. d = self.app.events.Dispatcher(conn)
  80. d.close = Mock()
  81. with d as _d:
  82. self.assertTrue(_d)
  83. d.close.assert_called_with()
  84. def test_enable_disable_callbacks(self):
  85. on_enable = Mock()
  86. on_disable = Mock()
  87. with self.app.connection() as conn:
  88. with self.app.events.Dispatcher(conn, enabled=False) as d:
  89. d.on_enabled.add(on_enable)
  90. d.on_disabled.add(on_disable)
  91. d.enable()
  92. on_enable.assert_called_with()
  93. d.disable()
  94. on_disable.assert_called_with()
  95. def test_enabled_disable(self):
  96. connection = self.app.connection()
  97. channel = connection.channel()
  98. try:
  99. dispatcher = self.app.events.Dispatcher(connection,
  100. enabled=True)
  101. dispatcher2 = self.app.events.Dispatcher(connection,
  102. enabled=True,
  103. channel=channel)
  104. self.assertTrue(dispatcher.enabled)
  105. self.assertTrue(dispatcher.producer.channel)
  106. self.assertEqual(dispatcher.producer.serializer,
  107. self.app.conf.CELERY_EVENT_SERIALIZER)
  108. created_channel = dispatcher.producer.channel
  109. dispatcher.disable()
  110. dispatcher.disable() # Disable with no active producer
  111. dispatcher2.disable()
  112. self.assertFalse(dispatcher.enabled)
  113. self.assertIsNone(dispatcher.producer)
  114. self.assertFalse(dispatcher2.channel.closed,
  115. 'does not close manually provided channel')
  116. dispatcher.enable()
  117. self.assertTrue(dispatcher.enabled)
  118. self.assertTrue(dispatcher.producer)
  119. # XXX test compat attribute
  120. self.assertIs(dispatcher.publisher, dispatcher.producer)
  121. prev, dispatcher.publisher = dispatcher.producer, 42
  122. try:
  123. self.assertEqual(dispatcher.producer, 42)
  124. finally:
  125. dispatcher.producer = prev
  126. finally:
  127. channel.close()
  128. connection.close()
  129. self.assertTrue(created_channel.closed)
  130. class test_EventReceiver(AppCase):
  131. def test_process(self):
  132. message = {'type': 'world-war'}
  133. got_event = [False]
  134. def my_handler(event):
  135. got_event[0] = True
  136. connection = Mock()
  137. connection.transport_cls = 'memory'
  138. r = self.app.events.Receiver(
  139. connection,
  140. handlers={'world-war': my_handler},
  141. node_id='celery.tests',
  142. )
  143. r._receive(message, object())
  144. self.assertTrue(got_event[0])
  145. def test_catch_all_event(self):
  146. message = {'type': 'world-war'}
  147. got_event = [False]
  148. def my_handler(event):
  149. got_event[0] = True
  150. connection = Mock()
  151. connection.transport_cls = 'memory'
  152. r = self.app.events.Receiver(connection, node_id='celery.tests')
  153. r.handlers['*'] = my_handler
  154. r._receive(message, object())
  155. self.assertTrue(got_event[0])
  156. def test_itercapture(self):
  157. connection = self.app.connection()
  158. try:
  159. r = self.app.events.Receiver(connection, node_id='celery.tests')
  160. it = r.itercapture(timeout=0.0001, wakeup=False)
  161. with self.assertRaises(socket.timeout):
  162. next(it)
  163. with self.assertRaises(socket.timeout):
  164. r.capture(timeout=0.00001)
  165. finally:
  166. connection.close()
  167. def test_event_from_message_localize_disabled(self):
  168. r = self.app.events.Receiver(Mock(), node_id='celery.tests')
  169. r.adjust_clock = Mock()
  170. ts_adjust = Mock()
  171. r.event_from_message(
  172. {'type': 'worker-online', 'clock': 313},
  173. localize=False,
  174. adjust_timestamp=ts_adjust,
  175. )
  176. self.assertFalse(ts_adjust.called)
  177. r.adjust_clock.assert_called_with(313)
  178. def test_itercapture_limit(self):
  179. connection = self.app.connection()
  180. channel = connection.channel()
  181. try:
  182. events_received = [0]
  183. def handler(event):
  184. events_received[0] += 1
  185. producer = self.app.events.Dispatcher(
  186. connection, enabled=True, channel=channel,
  187. )
  188. r = self.app.events.Receiver(
  189. connection,
  190. handlers={'*': handler},
  191. node_id='celery.tests',
  192. )
  193. evs = ['ev1', 'ev2', 'ev3', 'ev4', 'ev5']
  194. for ev in evs:
  195. producer.send(ev)
  196. it = r.itercapture(limit=4, wakeup=True)
  197. next(it) # skip consumer (see itercapture)
  198. list(it)
  199. self.assertEqual(events_received[0], 4)
  200. finally:
  201. channel.close()
  202. connection.close()
  203. class test_misc(AppCase):
  204. def test_State(self):
  205. state = self.app.events.State()
  206. self.assertDictEqual(dict(state.workers), {})
  207. def test_default_dispatcher(self):
  208. with self.app.events.default_dispatcher() as d:
  209. self.assertTrue(d)
  210. self.assertTrue(d.connection)