test_consumer.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. from __future__ import absolute_import
  2. import errno
  3. import socket
  4. from mock import Mock, patch, call
  5. from nose import SkipTest
  6. from billiard.exceptions import RestartFreqExceeded
  7. from celery.datastructures import LimitedSet
  8. from celery.worker import state as worker_state
  9. from celery.worker.consumer import (
  10. Consumer,
  11. Heart,
  12. Tasks,
  13. Agent,
  14. Mingle,
  15. Gossip,
  16. dump_body,
  17. CLOSE,
  18. )
  19. from celery.tests.case import AppCase
  20. class test_Consumer(AppCase):
  21. def get_consumer(self, no_hub=False, **kwargs):
  22. consumer = Consumer(
  23. on_task_request=Mock(),
  24. init_callback=Mock(),
  25. pool=Mock(),
  26. app=self.app,
  27. timer=Mock(),
  28. controller=Mock(),
  29. hub=None if no_hub else Mock(),
  30. **kwargs
  31. )
  32. consumer.blueprint = Mock()
  33. consumer._restart_state = Mock()
  34. consumer.connection = Mock()
  35. consumer.connection_errors = (socket.error, OSError, )
  36. return consumer
  37. def test_taskbuckets_defaultdict(self):
  38. c = self.get_consumer()
  39. self.assertIsNone(c.task_buckets['fooxasdwx.wewe'])
  40. def test_dump_body_buffer(self):
  41. msg = Mock()
  42. msg.body = 'str'
  43. try:
  44. buf = buffer(msg.body)
  45. except NameError:
  46. raise SkipTest('buffer type not available')
  47. self.assertTrue(dump_body(msg, buf))
  48. def test_sets_heartbeat(self):
  49. c = self.get_consumer(amqheartbeat=10)
  50. self.assertEqual(c.amqheartbeat, 10)
  51. self.app.conf.BROKER_HEARTBEAT = 20
  52. c = self.get_consumer(amqheartbeat=None)
  53. self.assertEqual(c.amqheartbeat, 20)
  54. def test_gevent_bug_disables_connection_timeout(self):
  55. with patch('celery.worker.consumer._detect_environment') as de:
  56. de.return_value = 'gevent'
  57. self.app.conf.BROKER_CONNECTION_TIMEOUT = 33.33
  58. self.get_consumer()
  59. self.assertIsNone(self.app.conf.BROKER_CONNECTION_TIMEOUT)
  60. def test_limit_task(self):
  61. c = self.get_consumer()
  62. with patch('celery.worker.consumer.task_reserved') as reserved:
  63. bucket = Mock()
  64. request = Mock()
  65. bucket.can_consume.return_value = True
  66. c._limit_task(request, bucket, 3)
  67. bucket.can_consume.assert_called_with(3)
  68. reserved.assert_called_with(request)
  69. c.on_task_request.assert_called_with(request)
  70. with patch('celery.worker.consumer.task_reserved') as reserved:
  71. bucket.can_consume.return_value = False
  72. bucket.expected_time.return_value = 3.33
  73. c._limit_task(request, bucket, 4)
  74. bucket.can_consume.assert_called_with(4)
  75. c.timer.call_after.assert_called_with(
  76. 3.33, c._limit_task, (request, bucket, 4),
  77. )
  78. bucket.expected_time.assert_called_with(4)
  79. self.assertFalse(reserved.called)
  80. def test_start_blueprint_raises_EMFILE(self):
  81. c = self.get_consumer()
  82. exc = c.blueprint.start.side_effect = OSError()
  83. exc.errno = errno.EMFILE
  84. with self.assertRaises(OSError):
  85. c.start()
  86. def test_max_restarts_exceeded(self):
  87. c = self.get_consumer()
  88. def se(*args, **kwargs):
  89. c.blueprint.state = CLOSE
  90. raise RestartFreqExceeded()
  91. c._restart_state.step.side_effect = se
  92. c.blueprint.start.side_effect = socket.error()
  93. with patch('celery.worker.consumer.sleep') as sleep:
  94. c.start()
  95. sleep.assert_called_with(1)
  96. def _closer(self, c):
  97. def se(*args, **kwargs):
  98. c.blueprint.state = CLOSE
  99. return se
  100. def test_collects_at_restart(self):
  101. c = self.get_consumer()
  102. c.connection.collect.side_effect = MemoryError()
  103. c.blueprint.start.side_effect = socket.error()
  104. c.blueprint.restart.side_effect = self._closer(c)
  105. c.start()
  106. c.connection.collect.assert_called_with()
  107. def test_register_with_event_loop(self):
  108. c = self.get_consumer()
  109. c.register_with_event_loop(Mock(name='loop'))
  110. def test_on_close_clears_semaphore_timer_and_reqs(self):
  111. with patch('celery.worker.consumer.reserved_requests') as reserved:
  112. c = self.get_consumer()
  113. c.on_close()
  114. c.controller.semaphore.clear.assert_called_with()
  115. c.timer.clear.assert_called_with()
  116. reserved.clear.assert_called_with()
  117. c.pool.flush.assert_called_with()
  118. c.controller = None
  119. c.timer = None
  120. c.pool = None
  121. c.on_close()
  122. def test_connect_error_handler(self):
  123. self.app.connection = Mock()
  124. conn = self.app.connection.return_value = Mock()
  125. c = self.get_consumer()
  126. self.assertTrue(c.connect())
  127. self.assertTrue(conn.ensure_connection.called)
  128. errback = conn.ensure_connection.call_args[0][0]
  129. conn.alt = [(1, 2, 3)]
  130. errback(Mock(), 0)
  131. class test_Heart(AppCase):
  132. def test_start(self):
  133. c = Mock()
  134. c.timer = Mock()
  135. c.event_dispatcher = Mock()
  136. with patch('celery.worker.heartbeat.Heart') as hcls:
  137. h = Heart(c)
  138. self.assertTrue(h.enabled)
  139. self.assertIsNone(c.heart)
  140. h.start(c)
  141. self.assertTrue(c.heart)
  142. hcls.assert_called_with(c.timer, c.event_dispatcher)
  143. c.heart.start.assert_called_with()
  144. class test_Tasks(AppCase):
  145. def test_stop(self):
  146. c = Mock()
  147. tasks = Tasks(c)
  148. self.assertIsNone(c.task_consumer)
  149. self.assertIsNone(c.qos)
  150. c.task_consumer = Mock()
  151. tasks.stop(c)
  152. def test_stop_already_stopped(self):
  153. c = Mock()
  154. tasks = Tasks(c)
  155. tasks.stop(c)
  156. class test_Agent(AppCase):
  157. def test_start(self):
  158. c = Mock()
  159. agent = Agent(c)
  160. agent.instantiate = Mock()
  161. agent.agent_cls = 'foo:Agent'
  162. self.assertIsNotNone(agent.create(c))
  163. agent.instantiate.assert_called_with(agent.agent_cls, c.connection)
  164. class test_Mingle(AppCase):
  165. def test_start_no_replies(self):
  166. c = Mock()
  167. mingle = Mingle(c)
  168. I = c.app.control.inspect.return_value = Mock()
  169. I.hello.return_value = {}
  170. mingle.start(c)
  171. def test_start(self):
  172. try:
  173. c = Mock()
  174. mingle = Mingle(c)
  175. self.assertTrue(mingle.enabled)
  176. Aig = LimitedSet()
  177. Big = LimitedSet()
  178. Aig.add('Aig-1')
  179. Aig.add('Aig-2')
  180. Big.add('Big-1')
  181. I = c.app.control.inspect.return_value = Mock()
  182. I.hello.return_value = {
  183. 'A@example.com': {
  184. 'clock': 312,
  185. 'revoked': Aig._data,
  186. },
  187. 'B@example.com': {
  188. 'clock': 29,
  189. 'revoked': Big._data,
  190. },
  191. 'C@example.com': {
  192. 'error': 'unknown method',
  193. },
  194. }
  195. mingle.start(c)
  196. I.hello.assert_called_with(c.hostname, worker_state.revoked._data)
  197. c.app.clock.adjust.assert_has_calls([
  198. call(312), call(29),
  199. ], any_order=True)
  200. self.assertIn('Aig-1', worker_state.revoked)
  201. self.assertIn('Aig-2', worker_state.revoked)
  202. self.assertIn('Big-1', worker_state.revoked)
  203. finally:
  204. worker_state.revoked.clear()
  205. class test_Gossip(AppCase):
  206. def test_init(self):
  207. c = self.Consumer()
  208. g = Gossip(c)
  209. self.assertTrue(g.enabled)
  210. self.assertIs(c.gossip, g)
  211. def test_election(self):
  212. c = self.Consumer()
  213. g = Gossip(c)
  214. g.start(c)
  215. g.election('id', 'topic', 'action')
  216. self.assertListEqual(g.consensus_replies['id'], [])
  217. g.dispatcher.send.assert_called_with(
  218. 'worker-elect', id='id', topic='topic', cver=1, action='action',
  219. )
  220. def test_call_task(self):
  221. c = self.Consumer()
  222. g = Gossip(c)
  223. g.start(c)
  224. with patch('celery.worker.consumer.signature') as signature:
  225. sig = signature.return_value = Mock()
  226. task = Mock()
  227. g.call_task(task)
  228. signature.assert_called_with(task, app=c.app)
  229. sig.apply_async.assert_called_with()
  230. sig.apply_async.side_effect = MemoryError()
  231. with patch('celery.worker.consumer.error') as error:
  232. g.call_task(task)
  233. self.assertTrue(error.called)
  234. def Event(self, id='id', clock=312,
  235. hostname='foo@example.com', pid=4312,
  236. topic='topic', action='action', cver=1):
  237. return {
  238. 'id': id,
  239. 'clock': clock,
  240. 'hostname': hostname,
  241. 'pid': pid,
  242. 'topic': topic,
  243. 'action': action,
  244. 'cver': cver,
  245. }
  246. def test_on_elect(self):
  247. c = self.Consumer()
  248. g = Gossip(c)
  249. g.start(c)
  250. event = self.Event('id1')
  251. g.on_elect(event)
  252. in_heap = g.consensus_requests['id1']
  253. self.assertTrue(in_heap)
  254. g.dispatcher.send.assert_called_with('worker-elect-ack', id='id1')
  255. event.pop('clock')
  256. with patch('celery.worker.consumer.error') as error:
  257. g.on_elect(event)
  258. self.assertTrue(error.called)
  259. def Consumer(self, hostname='foo@x.com', pid=4312):
  260. c = Mock()
  261. c.hostname = hostname
  262. c.pid = pid
  263. return c
  264. def setup_election(self, g, c):
  265. g.start(c)
  266. g.clock = self.app.clock
  267. self.assertNotIn('idx', g.consensus_replies)
  268. self.assertIsNone(g.on_elect_ack({'id': 'idx'}))
  269. g.state.alive_workers.return_value = [
  270. 'foo@x.com', 'bar@x.com', 'baz@x.com',
  271. ]
  272. g.consensus_replies['id1'] = []
  273. g.consensus_requests['id1'] = []
  274. e1 = self.Event('id1', 1, 'foo@x.com')
  275. e2 = self.Event('id1', 2, 'bar@x.com')
  276. e3 = self.Event('id1', 3, 'baz@x.com')
  277. g.on_elect(e1)
  278. g.on_elect(e2)
  279. g.on_elect(e3)
  280. self.assertEqual(len(g.consensus_requests['id1']), 3)
  281. with patch('celery.worker.consumer.info'):
  282. g.on_elect_ack(e1)
  283. self.assertEqual(len(g.consensus_replies['id1']), 1)
  284. g.on_elect_ack(e2)
  285. self.assertEqual(len(g.consensus_replies['id1']), 2)
  286. g.on_elect_ack(e3)
  287. with self.assertRaises(KeyError):
  288. g.consensus_replies['id1']
  289. def test_on_elect_ack_win(self):
  290. c = self.Consumer(hostname='foo@x.com') # I will win
  291. g = Gossip(c)
  292. handler = g.election_handlers['topic'] = Mock()
  293. self.setup_election(g, c)
  294. handler.assert_called_with('action')
  295. def test_on_elect_ack_lose(self):
  296. c = self.Consumer(hostname='bar@x.com') # I will lose
  297. g = Gossip(c)
  298. handler = g.election_handlers['topic'] = Mock()
  299. self.setup_election(g, c)
  300. self.assertFalse(handler.called)
  301. def test_on_elect_ack_win_but_no_action(self):
  302. c = self.Consumer(hostname='foo@x.com') # I will win
  303. g = Gossip(c)
  304. g.election_handlers = {}
  305. with patch('celery.worker.consumer.error') as error:
  306. self.setup_election(g, c)
  307. self.assertTrue(error.called)
  308. def test_on_node_join(self):
  309. c = self.Consumer()
  310. g = Gossip(c)
  311. with patch('celery.worker.consumer.info') as info:
  312. g.on_node_join(c)
  313. info.assert_called_with('%s joined the party', 'foo@x.com')
  314. def test_on_node_leave(self):
  315. c = self.Consumer()
  316. g = Gossip(c)
  317. with patch('celery.worker.consumer.info') as info:
  318. g.on_node_leave(c)
  319. info.assert_called_with('%s left', 'foo@x.com')
  320. def test_on_node_lost(self):
  321. c = self.Consumer()
  322. g = Gossip(c)
  323. with patch('celery.worker.consumer.warn') as warn:
  324. g.on_node_lost(c)
  325. warn.assert_called_with('%s went missing!', 'foo@x.com')
  326. def test_register_timer(self):
  327. c = self.Consumer()
  328. g = Gossip(c)
  329. g.register_timer()
  330. c.timer.call_repeatedly.assert_called_with(g.interval, g.periodic)
  331. tref = g._tref
  332. g.register_timer()
  333. tref.cancel.assert_called_with()
  334. def test_periodic(self):
  335. c = self.Consumer()
  336. g = Gossip(c)
  337. g.on_node_lost = Mock()
  338. state = g.state = Mock()
  339. worker = Mock()
  340. state.workers = {'foo': worker}
  341. worker.alive = True
  342. worker.hostname = 'foo'
  343. g.periodic()
  344. worker.alive = False
  345. g.periodic()
  346. g.on_node_lost.assert_called_with(worker)
  347. with self.assertRaises(KeyError):
  348. state.workers['foo']
  349. def test_on_message(self):
  350. c = self.Consumer()
  351. g = Gossip(c)
  352. prepare = Mock()
  353. prepare.return_value = 'worker-online', {}
  354. g.update_state = Mock()
  355. worker = Mock()
  356. g.on_node_join = Mock()
  357. g.on_node_leave = Mock()
  358. g.update_state.return_value = worker, 1
  359. message = Mock()
  360. message.delivery_info = {'routing_key': 'worker-online'}
  361. message.headers = {'hostname': 'other'}
  362. handler = g.event_handlers['worker-online'] = Mock()
  363. g.on_message(prepare, message)
  364. handler.assert_called_with(message.payload)
  365. g.event_handlers = {}
  366. g.on_message(prepare, message)
  367. g.on_node_join.assert_called_with(worker)
  368. message.delivery_info = {'routing_key': 'worker-offline'}
  369. prepare.return_value = 'worker-offline', {}
  370. g.on_message(prepare, message)
  371. g.on_node_leave.assert_called_with(worker)
  372. message.delivery_info = {'routing_key': 'worker-baz'}
  373. prepare.return_value = 'worker-baz', {}
  374. g.update_state.return_value = worker, 0
  375. g.on_message(prepare, message)
  376. g.on_node_leave.reset_mock()
  377. message.headers = {'hostname': g.hostname}
  378. g.on_message(prepare, message)
  379. self.assertFalse(g.on_node_leave.called)
  380. g.clock.forward.assert_called_with()