test_worker.py 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091
  1. from __future__ import absolute_import
  2. import os
  3. import socket
  4. from collections import deque
  5. from datetime import datetime, timedelta
  6. from threading import Event
  7. from amqp import ChannelError
  8. from kombu import Connection
  9. from kombu.common import QoS, ignore_errors
  10. from kombu.transport.base import Message
  11. from mock import Mock, patch
  12. from celery.app.defaults import DEFAULTS
  13. from celery.bootsteps import RUN, CLOSE, StartStopStep
  14. from celery.concurrency.base import BasePool
  15. from celery.datastructures import AttributeDict
  16. from celery.exceptions import SystemTerminate, TaskRevokedError
  17. from celery.five import Empty, range, Queue as FastQueue
  18. from celery.utils import uuid
  19. from celery.worker import components
  20. from celery.worker import consumer
  21. from celery.worker.consumer import Consumer as __Consumer
  22. from celery.worker.job import Request
  23. from celery.utils import worker_direct
  24. from celery.utils.serialization import pickle
  25. from celery.utils.timer2 import Timer
  26. from celery.tests.case import AppCase, restore_logging
  27. def MockStep(step=None):
  28. step = Mock() if step is None else step
  29. step.blueprint = Mock()
  30. step.blueprint.name = 'MockNS'
  31. step.name = 'MockStep(%s)' % (id(step), )
  32. return step
  33. class PlaceHolder(object):
  34. pass
  35. def find_step(obj, typ):
  36. return obj.blueprint.steps[typ.name]
  37. class Consumer(__Consumer):
  38. def __init__(self, *args, **kwargs):
  39. kwargs.setdefault('without_mingle', True) # disable Mingle step
  40. kwargs.setdefault('without_gossip', True) # disable Gossip step
  41. kwargs.setdefault('without_heartbeat', True) # disable Heart step
  42. super(Consumer, self).__init__(*args, **kwargs)
  43. class _MyKombuConsumer(Consumer):
  44. broadcast_consumer = Mock()
  45. task_consumer = Mock()
  46. def __init__(self, *args, **kwargs):
  47. kwargs.setdefault('pool', BasePool(2))
  48. super(_MyKombuConsumer, self).__init__(*args, **kwargs)
  49. def restart_heartbeat(self):
  50. self.heart = None
  51. class MyKombuConsumer(Consumer):
  52. def loop(self, *args, **kwargs):
  53. pass
  54. class MockNode(object):
  55. commands = []
  56. def handle_message(self, body, message):
  57. self.commands.append(body.pop('command', None))
  58. class MockEventDispatcher(object):
  59. sent = []
  60. closed = False
  61. flushed = False
  62. _outbound_buffer = []
  63. def send(self, event, *args, **kwargs):
  64. self.sent.append(event)
  65. def close(self):
  66. self.closed = True
  67. def flush(self):
  68. self.flushed = True
  69. class MockHeart(object):
  70. closed = False
  71. def stop(self):
  72. self.closed = True
  73. def create_message(channel, **data):
  74. data.setdefault('id', uuid())
  75. channel.no_ack_consumers = set()
  76. m = Message(channel, body=pickle.dumps(dict(**data)),
  77. content_type='application/x-python-serialize',
  78. content_encoding='binary',
  79. delivery_info={'consumer_tag': 'mock'})
  80. m.accept = ['application/x-python-serialize']
  81. return m
  82. class test_Consumer(AppCase):
  83. def setup(self):
  84. self.buffer = FastQueue()
  85. self.timer = Timer()
  86. @self.app.task(shared=False)
  87. def foo_task(x, y, z):
  88. return x * y * z
  89. self.foo_task = foo_task
  90. def teardown(self):
  91. self.timer.stop()
  92. def test_info(self):
  93. l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  94. l.task_consumer = Mock()
  95. l.qos = QoS(l.task_consumer.qos, 10)
  96. l.connection = Mock()
  97. l.connection.info.return_value = {'foo': 'bar'}
  98. l.controller = l.app.WorkController()
  99. l.controller.pool = Mock()
  100. l.controller.pool.info.return_value = [Mock(), Mock()]
  101. l.controller.consumer = l
  102. info = l.controller.stats()
  103. self.assertEqual(info['prefetch_count'], 10)
  104. self.assertTrue(info['broker'])
  105. def test_start_when_closed(self):
  106. l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  107. l.blueprint.state = CLOSE
  108. l.start()
  109. def test_connection(self):
  110. l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  111. l.blueprint.start(l)
  112. self.assertIsInstance(l.connection, Connection)
  113. l.blueprint.state = RUN
  114. l.event_dispatcher = None
  115. l.blueprint.restart(l)
  116. self.assertTrue(l.connection)
  117. l.blueprint.state = RUN
  118. l.shutdown()
  119. self.assertIsNone(l.connection)
  120. self.assertIsNone(l.task_consumer)
  121. l.blueprint.start(l)
  122. self.assertIsInstance(l.connection, Connection)
  123. l.blueprint.restart(l)
  124. l.stop()
  125. l.shutdown()
  126. self.assertIsNone(l.connection)
  127. self.assertIsNone(l.task_consumer)
  128. def test_close_connection(self):
  129. l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  130. l.blueprint.state = RUN
  131. step = find_step(l, consumer.Connection)
  132. conn = l.connection = Mock()
  133. step.shutdown(l)
  134. self.assertTrue(conn.close.called)
  135. self.assertIsNone(l.connection)
  136. l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  137. eventer = l.event_dispatcher = Mock()
  138. eventer.enabled = True
  139. heart = l.heart = MockHeart()
  140. l.blueprint.state = RUN
  141. Events = find_step(l, consumer.Events)
  142. Events.shutdown(l)
  143. Heart = find_step(l, consumer.Heart)
  144. Heart.shutdown(l)
  145. self.assertTrue(eventer.close.call_count)
  146. self.assertTrue(heart.closed)
  147. @patch('celery.worker.consumer.warn')
  148. def test_receive_message_unknown(self, warn):
  149. l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  150. l.blueprint.state = RUN
  151. l.steps.pop()
  152. backend = Mock()
  153. m = create_message(backend, unknown={'baz': '!!!'})
  154. l.event_dispatcher = Mock()
  155. l.node = MockNode()
  156. callback = self._get_on_message(l)
  157. callback(m.decode(), m)
  158. self.assertTrue(warn.call_count)
  159. @patch('celery.worker.strategy.to_timestamp')
  160. def test_receive_message_eta_OverflowError(self, to_timestamp):
  161. to_timestamp.side_effect = OverflowError()
  162. l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  163. l.blueprint.state = RUN
  164. l.steps.pop()
  165. m = create_message(Mock(), task=self.foo_task.name,
  166. args=('2, 2'),
  167. kwargs={},
  168. eta=datetime.now().isoformat())
  169. l.event_dispatcher = Mock()
  170. l.node = MockNode()
  171. l.update_strategies()
  172. l.qos = Mock()
  173. callback = self._get_on_message(l)
  174. callback(m.decode(), m)
  175. self.assertTrue(m.acknowledged)
  176. @patch('celery.worker.consumer.error')
  177. def test_receive_message_InvalidTaskError(self, error):
  178. l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  179. l.blueprint.state = RUN
  180. l.event_dispatcher = Mock()
  181. l.steps.pop()
  182. m = create_message(Mock(), task=self.foo_task.name,
  183. args=(1, 2), kwargs='foobarbaz', id=1)
  184. l.update_strategies()
  185. l.event_dispatcher = Mock()
  186. callback = self._get_on_message(l)
  187. callback(m.decode(), m)
  188. self.assertIn('Received invalid task message', error.call_args[0][0])
  189. @patch('celery.worker.consumer.crit')
  190. def test_on_decode_error(self, crit):
  191. l = Consumer(self.buffer.put, timer=self.timer, app=self.app)
  192. class MockMessage(Mock):
  193. content_type = 'application/x-msgpack'
  194. content_encoding = 'binary'
  195. body = 'foobarbaz'
  196. message = MockMessage()
  197. l.on_decode_error(message, KeyError('foo'))
  198. self.assertTrue(message.ack.call_count)
  199. self.assertIn("Can't decode message body", crit.call_args[0][0])
  200. def _get_on_message(self, l):
  201. if l.qos is None:
  202. l.qos = Mock()
  203. l.event_dispatcher = Mock()
  204. l.task_consumer = Mock()
  205. l.connection = Mock()
  206. l.connection.drain_events.side_effect = SystemExit()
  207. with self.assertRaises(SystemExit):
  208. l.loop(*l.loop_args())
  209. self.assertTrue(l.task_consumer.register_callback.called)
  210. return l.task_consumer.register_callback.call_args[0][0]
  211. def test_receieve_message(self):
  212. l = Consumer(self.buffer.put, timer=self.timer, app=self.app)
  213. l.blueprint.state = RUN
  214. l.event_dispatcher = Mock()
  215. m = create_message(Mock(), task=self.foo_task.name,
  216. args=[2, 4, 8], kwargs={})
  217. l.update_strategies()
  218. callback = self._get_on_message(l)
  219. callback(m.decode(), m)
  220. in_bucket = self.buffer.get_nowait()
  221. self.assertIsInstance(in_bucket, Request)
  222. self.assertEqual(in_bucket.name, self.foo_task.name)
  223. self.assertEqual(in_bucket.execute(), 2 * 4 * 8)
  224. self.assertTrue(self.timer.empty())
  225. def test_start_channel_error(self):
  226. class MockConsumer(Consumer):
  227. iterations = 0
  228. def loop(self, *args, **kwargs):
  229. if not self.iterations:
  230. self.iterations = 1
  231. raise KeyError('foo')
  232. raise SyntaxError('bar')
  233. l = MockConsumer(self.buffer.put, timer=self.timer,
  234. send_events=False, pool=BasePool(), app=self.app)
  235. l.channel_errors = (KeyError, )
  236. with self.assertRaises(KeyError):
  237. l.start()
  238. l.timer.stop()
  239. def test_start_connection_error(self):
  240. class MockConsumer(Consumer):
  241. iterations = 0
  242. def loop(self, *args, **kwargs):
  243. if not self.iterations:
  244. self.iterations = 1
  245. raise KeyError('foo')
  246. raise SyntaxError('bar')
  247. l = MockConsumer(self.buffer.put, timer=self.timer,
  248. send_events=False, pool=BasePool(), app=self.app)
  249. l.connection_errors = (KeyError, )
  250. self.assertRaises(SyntaxError, l.start)
  251. l.timer.stop()
  252. def test_loop_ignores_socket_timeout(self):
  253. class Connection(self.app.connection().__class__):
  254. obj = None
  255. def drain_events(self, **kwargs):
  256. self.obj.connection = None
  257. raise socket.timeout(10)
  258. l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  259. l.connection = Connection()
  260. l.task_consumer = Mock()
  261. l.connection.obj = l
  262. l.qos = QoS(l.task_consumer.qos, 10)
  263. l.loop(*l.loop_args())
  264. def test_loop_when_socket_error(self):
  265. class Connection(self.app.connection().__class__):
  266. obj = None
  267. def drain_events(self, **kwargs):
  268. self.obj.connection = None
  269. raise socket.error('foo')
  270. l = Consumer(self.buffer.put, timer=self.timer, app=self.app)
  271. l.blueprint.state = RUN
  272. c = l.connection = Connection()
  273. l.connection.obj = l
  274. l.task_consumer = Mock()
  275. l.qos = QoS(l.task_consumer.qos, 10)
  276. with self.assertRaises(socket.error):
  277. l.loop(*l.loop_args())
  278. l.blueprint.state = CLOSE
  279. l.connection = c
  280. l.loop(*l.loop_args())
  281. def test_loop(self):
  282. class Connection(self.app.connection().__class__):
  283. obj = None
  284. def drain_events(self, **kwargs):
  285. self.obj.connection = None
  286. l = Consumer(self.buffer.put, timer=self.timer, app=self.app)
  287. l.blueprint.state = RUN
  288. l.connection = Connection()
  289. l.connection.obj = l
  290. l.task_consumer = Mock()
  291. l.qos = QoS(l.task_consumer.qos, 10)
  292. l.loop(*l.loop_args())
  293. l.loop(*l.loop_args())
  294. self.assertTrue(l.task_consumer.consume.call_count)
  295. l.task_consumer.qos.assert_called_with(prefetch_count=10)
  296. self.assertEqual(l.qos.value, 10)
  297. l.qos.decrement_eventually()
  298. self.assertEqual(l.qos.value, 9)
  299. l.qos.update()
  300. self.assertEqual(l.qos.value, 9)
  301. l.task_consumer.qos.assert_called_with(prefetch_count=9)
  302. def test_ignore_errors(self):
  303. l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  304. l.connection_errors = (AttributeError, KeyError, )
  305. l.channel_errors = (SyntaxError, )
  306. ignore_errors(l, Mock(side_effect=AttributeError('foo')))
  307. ignore_errors(l, Mock(side_effect=KeyError('foo')))
  308. ignore_errors(l, Mock(side_effect=SyntaxError('foo')))
  309. with self.assertRaises(IndexError):
  310. ignore_errors(l, Mock(side_effect=IndexError('foo')))
  311. def test_apply_eta_task(self):
  312. from celery.worker import state
  313. l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  314. l.qos = QoS(None, 10)
  315. task = object()
  316. qos = l.qos.value
  317. l.apply_eta_task(task)
  318. self.assertIn(task, state.reserved_requests)
  319. self.assertEqual(l.qos.value, qos - 1)
  320. self.assertIs(self.buffer.get_nowait(), task)
  321. def test_receieve_message_eta_isoformat(self):
  322. l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  323. l.blueprint.state = RUN
  324. l.steps.pop()
  325. m = create_message(
  326. Mock(), task=self.foo_task.name,
  327. eta=(datetime.now() + timedelta(days=1)).isoformat(),
  328. args=[2, 4, 8], kwargs={},
  329. )
  330. l.task_consumer = Mock()
  331. l.qos = QoS(l.task_consumer.qos, 1)
  332. current_pcount = l.qos.value
  333. l.event_dispatcher = Mock()
  334. l.enabled = False
  335. l.update_strategies()
  336. callback = self._get_on_message(l)
  337. callback(m.decode(), m)
  338. l.timer.stop()
  339. l.timer.join(1)
  340. items = [entry[2] for entry in self.timer.queue]
  341. found = 0
  342. for item in items:
  343. if item.args[0].name == self.foo_task.name:
  344. found = True
  345. self.assertTrue(found)
  346. self.assertGreater(l.qos.value, current_pcount)
  347. l.timer.stop()
  348. def test_pidbox_callback(self):
  349. l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  350. con = find_step(l, consumer.Control).box
  351. con.node = Mock()
  352. con.reset = Mock()
  353. con.on_message('foo', 'bar')
  354. con.node.handle_message.assert_called_with('foo', 'bar')
  355. con.node = Mock()
  356. con.node.handle_message.side_effect = KeyError('foo')
  357. con.on_message('foo', 'bar')
  358. con.node.handle_message.assert_called_with('foo', 'bar')
  359. con.node = Mock()
  360. con.node.handle_message.side_effect = ValueError('foo')
  361. con.on_message('foo', 'bar')
  362. con.node.handle_message.assert_called_with('foo', 'bar')
  363. self.assertTrue(con.reset.called)
  364. def test_revoke(self):
  365. l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  366. l.blueprint.state = RUN
  367. l.steps.pop()
  368. backend = Mock()
  369. id = uuid()
  370. t = create_message(backend, task=self.foo_task.name, args=[2, 4, 8],
  371. kwargs={}, id=id)
  372. from celery.worker.state import revoked
  373. revoked.add(id)
  374. callback = self._get_on_message(l)
  375. callback(t.decode(), t)
  376. self.assertTrue(self.buffer.empty())
  377. def test_receieve_message_not_registered(self):
  378. l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  379. l.blueprint.state = RUN
  380. l.steps.pop()
  381. backend = Mock()
  382. m = create_message(backend, task='x.X.31x', args=[2, 4, 8], kwargs={})
  383. l.event_dispatcher = Mock()
  384. callback = self._get_on_message(l)
  385. self.assertFalse(callback(m.decode(), m))
  386. with self.assertRaises(Empty):
  387. self.buffer.get_nowait()
  388. self.assertTrue(self.timer.empty())
  389. @patch('celery.worker.consumer.warn')
  390. @patch('celery.worker.consumer.logger')
  391. def test_receieve_message_ack_raises(self, logger, warn):
  392. l = Consumer(self.buffer.put, timer=self.timer, app=self.app)
  393. l.blueprint.state = RUN
  394. backend = Mock()
  395. m = create_message(backend, args=[2, 4, 8], kwargs={})
  396. l.event_dispatcher = Mock()
  397. l.connection_errors = (socket.error, )
  398. m.reject = Mock()
  399. m.reject.side_effect = socket.error('foo')
  400. callback = self._get_on_message(l)
  401. self.assertFalse(callback(m.decode(), m))
  402. self.assertTrue(warn.call_count)
  403. with self.assertRaises(Empty):
  404. self.buffer.get_nowait()
  405. self.assertTrue(self.timer.empty())
  406. m.reject.assert_called_with(requeue=False)
  407. self.assertTrue(logger.critical.call_count)
  408. def test_receive_message_eta(self):
  409. l = _MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  410. l.steps.pop()
  411. l.event_dispatcher = Mock()
  412. l.event_dispatcher._outbound_buffer = deque()
  413. backend = Mock()
  414. m = create_message(
  415. backend, task=self.foo_task.name,
  416. args=[2, 4, 8], kwargs={},
  417. eta=(datetime.now() + timedelta(days=1)).isoformat(),
  418. )
  419. try:
  420. l.blueprint.start(l)
  421. p = l.app.conf.BROKER_CONNECTION_RETRY
  422. l.app.conf.BROKER_CONNECTION_RETRY = False
  423. l.blueprint.start(l)
  424. l.app.conf.BROKER_CONNECTION_RETRY = p
  425. l.blueprint.restart(l)
  426. l.event_dispatcher = Mock()
  427. callback = self._get_on_message(l)
  428. callback(m.decode(), m)
  429. finally:
  430. l.timer.stop()
  431. l.timer.join()
  432. in_hold = l.timer.queue[0]
  433. self.assertEqual(len(in_hold), 3)
  434. eta, priority, entry = in_hold
  435. task = entry.args[0]
  436. self.assertIsInstance(task, Request)
  437. self.assertEqual(task.name, self.foo_task.name)
  438. self.assertEqual(task.execute(), 2 * 4 * 8)
  439. with self.assertRaises(Empty):
  440. self.buffer.get_nowait()
  441. def test_reset_pidbox_node(self):
  442. l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  443. con = find_step(l, consumer.Control).box
  444. con.node = Mock()
  445. chan = con.node.channel = Mock()
  446. l.connection = Mock()
  447. chan.close.side_effect = socket.error('foo')
  448. l.connection_errors = (socket.error, )
  449. con.reset()
  450. chan.close.assert_called_with()
  451. def test_reset_pidbox_node_green(self):
  452. from celery.worker.pidbox import gPidbox
  453. pool = Mock()
  454. pool.is_green = True
  455. l = MyKombuConsumer(self.buffer.put, timer=self.timer, pool=pool,
  456. app=self.app)
  457. con = find_step(l, consumer.Control)
  458. self.assertIsInstance(con.box, gPidbox)
  459. con.start(l)
  460. l.pool.spawn_n.assert_called_with(
  461. con.box.loop, l,
  462. )
  463. def test__green_pidbox_node(self):
  464. pool = Mock()
  465. pool.is_green = True
  466. l = MyKombuConsumer(self.buffer.put, timer=self.timer, pool=pool,
  467. app=self.app)
  468. l.node = Mock()
  469. controller = find_step(l, consumer.Control)
  470. class BConsumer(Mock):
  471. def __enter__(self):
  472. self.consume()
  473. return self
  474. def __exit__(self, *exc_info):
  475. self.cancel()
  476. controller.box.node.listen = BConsumer()
  477. connections = []
  478. class Connection(object):
  479. calls = 0
  480. def __init__(self, obj):
  481. connections.append(self)
  482. self.obj = obj
  483. self.default_channel = self.channel()
  484. self.closed = False
  485. def __enter__(self):
  486. return self
  487. def __exit__(self, *exc_info):
  488. self.close()
  489. def channel(self):
  490. return Mock()
  491. def as_uri(self):
  492. return 'dummy://'
  493. def drain_events(self, **kwargs):
  494. if not self.calls:
  495. self.calls += 1
  496. raise socket.timeout()
  497. self.obj.connection = None
  498. controller.box._node_shutdown.set()
  499. def close(self):
  500. self.closed = True
  501. l.connection = Mock()
  502. l.connect = lambda: Connection(obj=l)
  503. controller = find_step(l, consumer.Control)
  504. controller.box.loop(l)
  505. self.assertTrue(controller.box.node.listen.called)
  506. self.assertTrue(controller.box.consumer)
  507. controller.box.consumer.consume.assert_called_with()
  508. self.assertIsNone(l.connection)
  509. self.assertTrue(connections[0].closed)
  510. @patch('kombu.connection.Connection._establish_connection')
  511. @patch('kombu.utils.sleep')
  512. def test_connect_errback(self, sleep, connect):
  513. l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  514. from kombu.transport.memory import Transport
  515. Transport.connection_errors = (ChannelError, )
  516. def effect():
  517. if connect.call_count > 1:
  518. return
  519. raise ChannelError('error')
  520. connect.side_effect = effect
  521. l.connect()
  522. connect.assert_called_with()
  523. def test_stop_pidbox_node(self):
  524. l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
  525. cont = find_step(l, consumer.Control)
  526. cont._node_stopped = Event()
  527. cont._node_shutdown = Event()
  528. cont._node_stopped.set()
  529. cont.stop(l)
  530. def test_start__loop(self):
  531. class _QoS(object):
  532. prev = 3
  533. value = 4
  534. def update(self):
  535. self.prev = self.value
  536. class _Consumer(MyKombuConsumer):
  537. iterations = 0
  538. def reset_connection(self):
  539. if self.iterations >= 1:
  540. raise KeyError('foo')
  541. init_callback = Mock()
  542. l = _Consumer(self.buffer.put, timer=self.timer,
  543. init_callback=init_callback, app=self.app)
  544. l.task_consumer = Mock()
  545. l.broadcast_consumer = Mock()
  546. l.qos = _QoS()
  547. l.connection = Connection()
  548. l.iterations = 0
  549. def raises_KeyError(*args, **kwargs):
  550. l.iterations += 1
  551. if l.qos.prev != l.qos.value:
  552. l.qos.update()
  553. if l.iterations >= 2:
  554. raise KeyError('foo')
  555. l.loop = raises_KeyError
  556. with self.assertRaises(KeyError):
  557. l.start()
  558. self.assertEqual(l.iterations, 2)
  559. self.assertEqual(l.qos.prev, l.qos.value)
  560. init_callback.reset_mock()
  561. l = _Consumer(self.buffer.put, timer=self.timer, app=self.app,
  562. send_events=False, init_callback=init_callback)
  563. l.qos = _QoS()
  564. l.task_consumer = Mock()
  565. l.broadcast_consumer = Mock()
  566. l.connection = Connection()
  567. l.loop = Mock(side_effect=socket.error('foo'))
  568. with self.assertRaises(socket.error):
  569. l.start()
  570. self.assertTrue(l.loop.call_count)
  571. def test_reset_connection_with_no_node(self):
  572. l = Consumer(self.buffer.put, timer=self.timer, app=self.app)
  573. l.steps.pop()
  574. self.assertEqual(None, l.pool)
  575. l.blueprint.start(l)
  576. class test_WorkController(AppCase):
  577. def setup(self):
  578. self.worker = self.create_worker()
  579. from celery import worker
  580. self._logger = worker.logger
  581. self._comp_logger = components.logger
  582. self.logger = worker.logger = Mock()
  583. self.comp_logger = components.logger = Mock()
  584. @self.app.task(shared=False)
  585. def foo_task(x, y, z):
  586. return x * y * z
  587. self.foo_task = foo_task
  588. def teardown(self):
  589. from celery import worker
  590. worker.logger = self._logger
  591. components.logger = self._comp_logger
  592. def create_worker(self, **kw):
  593. worker = self.app.WorkController(concurrency=1, loglevel=0, **kw)
  594. worker.blueprint.shutdown_complete.set()
  595. return worker
  596. def test_on_consumer_ready(self):
  597. self.worker.on_consumer_ready(Mock())
  598. def test_setup_queues_worker_direct(self):
  599. self.app.conf.CELERY_WORKER_DIRECT = True
  600. self.app.amqp.__dict__['queues'] = Mock()
  601. self.worker.setup_queues({})
  602. self.app.amqp.queues.select_add.assert_called_with(
  603. worker_direct(self.worker.hostname),
  604. )
  605. def test_send_worker_shutdown(self):
  606. with patch('celery.signals.worker_shutdown') as ws:
  607. self.worker._send_worker_shutdown()
  608. ws.send.assert_called_with(sender=self.worker)
  609. def test_process_shutdown_on_worker_shutdown(self):
  610. from celery.concurrency.prefork import process_destructor
  611. from celery.concurrency.asynpool import Worker
  612. with patch('celery.signals.worker_process_shutdown') as ws:
  613. Worker._make_shortcuts = Mock()
  614. with patch('os._exit') as _exit:
  615. worker = Worker(None, None, on_exit=process_destructor)
  616. worker._do_exit(22, 3.1415926)
  617. ws.send.assert_called_with(
  618. sender=None, pid=22, exitcode=3.1415926,
  619. )
  620. _exit.assert_called_with(3.1415926)
  621. def test_process_task_revoked_release_semaphore(self):
  622. self.worker._quick_release = Mock()
  623. req = Mock()
  624. req.execute_using_pool.side_effect = TaskRevokedError
  625. self.worker._process_task(req)
  626. self.worker._quick_release.assert_called_with()
  627. delattr(self.worker, '_quick_release')
  628. self.worker._process_task(req)
  629. def test_shutdown_no_blueprint(self):
  630. self.worker.blueprint = None
  631. self.worker._shutdown()
  632. @patch('celery.platforms.create_pidlock')
  633. def test_use_pidfile(self, create_pidlock):
  634. create_pidlock.return_value = Mock()
  635. worker = self.create_worker(pidfile='pidfilelockfilepid')
  636. worker.steps = []
  637. worker.start()
  638. self.assertTrue(create_pidlock.called)
  639. worker.stop()
  640. self.assertTrue(worker.pidlock.release.called)
  641. @patch('celery.platforms.signals')
  642. @patch('celery.platforms.set_mp_process_title')
  643. def test_process_initializer(self, set_mp_process_title, _signals):
  644. with restore_logging():
  645. from celery import signals
  646. from celery._state import _tls
  647. from celery.concurrency.prefork import (
  648. process_initializer, WORKER_SIGRESET, WORKER_SIGIGNORE,
  649. )
  650. def on_worker_process_init(**kwargs):
  651. on_worker_process_init.called = True
  652. on_worker_process_init.called = False
  653. signals.worker_process_init.connect(on_worker_process_init)
  654. def Loader(*args, **kwargs):
  655. loader = Mock(*args, **kwargs)
  656. loader.conf = {}
  657. loader.override_backends = {}
  658. return loader
  659. with self.Celery(loader=Loader) as app:
  660. app.conf = AttributeDict(DEFAULTS)
  661. process_initializer(app, 'awesome.worker.com')
  662. _signals.ignore.assert_any_call(*WORKER_SIGIGNORE)
  663. _signals.reset.assert_any_call(*WORKER_SIGRESET)
  664. self.assertTrue(app.loader.init_worker.call_count)
  665. self.assertTrue(on_worker_process_init.called)
  666. self.assertIs(_tls.current_app, app)
  667. set_mp_process_title.assert_called_with(
  668. 'celeryd', hostname='awesome.worker.com',
  669. )
  670. with patch('celery.app.trace.setup_worker_optimizations') as S:
  671. os.environ['FORKED_BY_MULTIPROCESSING'] = "1"
  672. try:
  673. process_initializer(app, 'luke.worker.com')
  674. S.assert_called_with(app)
  675. finally:
  676. os.environ.pop('FORKED_BY_MULTIPROCESSING', None)
  677. def test_attrs(self):
  678. worker = self.worker
  679. self.assertIsNotNone(worker.timer)
  680. self.assertIsInstance(worker.timer, Timer)
  681. self.assertIsNotNone(worker.pool)
  682. self.assertIsNotNone(worker.consumer)
  683. self.assertTrue(worker.steps)
  684. def test_with_embedded_beat(self):
  685. worker = self.app.WorkController(concurrency=1, loglevel=0, beat=True)
  686. self.assertTrue(worker.beat)
  687. self.assertIn(worker.beat, [w.obj for w in worker.steps])
  688. def test_with_autoscaler(self):
  689. worker = self.create_worker(
  690. autoscale=[10, 3], send_events=False,
  691. timer_cls='celery.utils.timer2.Timer',
  692. )
  693. self.assertTrue(worker.autoscaler)
  694. def test_dont_stop_or_terminate(self):
  695. worker = self.app.WorkController(concurrency=1, loglevel=0)
  696. worker.stop()
  697. self.assertNotEqual(worker.blueprint.state, CLOSE)
  698. worker.terminate()
  699. self.assertNotEqual(worker.blueprint.state, CLOSE)
  700. sigsafe, worker.pool.signal_safe = worker.pool.signal_safe, False
  701. try:
  702. worker.blueprint.state = RUN
  703. worker.stop(in_sighandler=True)
  704. self.assertNotEqual(worker.blueprint.state, CLOSE)
  705. worker.terminate(in_sighandler=True)
  706. self.assertNotEqual(worker.blueprint.state, CLOSE)
  707. finally:
  708. worker.pool.signal_safe = sigsafe
  709. def test_on_timer_error(self):
  710. worker = self.app.WorkController(concurrency=1, loglevel=0)
  711. try:
  712. raise KeyError('foo')
  713. except KeyError as exc:
  714. components.Timer(worker).on_timer_error(exc)
  715. msg, args = self.comp_logger.error.call_args[0]
  716. self.assertIn('KeyError', msg % args)
  717. def test_on_timer_tick(self):
  718. worker = self.app.WorkController(concurrency=1, loglevel=10)
  719. components.Timer(worker).on_timer_tick(30.0)
  720. xargs = self.comp_logger.debug.call_args[0]
  721. fmt, arg = xargs[0], xargs[1]
  722. self.assertEqual(30.0, arg)
  723. self.assertIn('Next eta %s secs', fmt)
  724. def test_process_task(self):
  725. worker = self.worker
  726. worker.pool = Mock()
  727. backend = Mock()
  728. m = create_message(backend, task=self.foo_task.name, args=[4, 8, 10],
  729. kwargs={})
  730. task = Request(m.decode(), message=m, app=self.app)
  731. worker._process_task(task)
  732. self.assertEqual(worker.pool.apply_async.call_count, 1)
  733. worker.pool.stop()
  734. def test_process_task_raise_base(self):
  735. worker = self.worker
  736. worker.pool = Mock()
  737. worker.pool.apply_async.side_effect = KeyboardInterrupt('Ctrl+C')
  738. backend = Mock()
  739. m = create_message(backend, task=self.foo_task.name, args=[4, 8, 10],
  740. kwargs={})
  741. task = Request(m.decode(), message=m, app=self.app)
  742. worker.steps = []
  743. worker.blueprint.state = RUN
  744. with self.assertRaises(KeyboardInterrupt):
  745. worker._process_task(task)
  746. def test_process_task_raise_SystemTerminate(self):
  747. worker = self.worker
  748. worker.pool = Mock()
  749. worker.pool.apply_async.side_effect = SystemTerminate()
  750. backend = Mock()
  751. m = create_message(backend, task=self.foo_task.name, args=[4, 8, 10],
  752. kwargs={})
  753. task = Request(m.decode(), message=m, app=self.app)
  754. worker.steps = []
  755. worker.blueprint.state = RUN
  756. with self.assertRaises(SystemExit):
  757. worker._process_task(task)
  758. def test_process_task_raise_regular(self):
  759. worker = self.worker
  760. worker.pool = Mock()
  761. worker.pool.apply_async.side_effect = KeyError('some exception')
  762. backend = Mock()
  763. m = create_message(backend, task=self.foo_task.name, args=[4, 8, 10],
  764. kwargs={})
  765. task = Request(m.decode(), message=m, app=self.app)
  766. worker._process_task(task)
  767. worker.pool.stop()
  768. def test_start_catches_base_exceptions(self):
  769. worker1 = self.create_worker()
  770. worker1.blueprint.state = RUN
  771. stc = MockStep()
  772. stc.start.side_effect = SystemTerminate()
  773. worker1.steps = [stc]
  774. worker1.start()
  775. stc.start.assert_called_with(worker1)
  776. self.assertTrue(stc.terminate.call_count)
  777. worker2 = self.create_worker()
  778. worker2.blueprint.state = RUN
  779. sec = MockStep()
  780. sec.start.side_effect = SystemExit()
  781. sec.terminate = None
  782. worker2.steps = [sec]
  783. worker2.start()
  784. self.assertTrue(sec.stop.call_count)
  785. def test_state_db(self):
  786. from celery.worker import state
  787. Persistent = state.Persistent
  788. state.Persistent = Mock()
  789. try:
  790. worker = self.create_worker(state_db='statefilename')
  791. self.assertTrue(worker._persistence)
  792. finally:
  793. state.Persistent = Persistent
  794. def test_process_task_sem(self):
  795. worker = self.worker
  796. worker._quick_acquire = Mock()
  797. req = Mock()
  798. worker._process_task_sem(req)
  799. worker._quick_acquire.assert_called_with(worker._process_task, req)
  800. def test_signal_consumer_close(self):
  801. worker = self.worker
  802. worker.consumer = Mock()
  803. worker.signal_consumer_close()
  804. worker.consumer.close.assert_called_with()
  805. worker.consumer.close.side_effect = AttributeError()
  806. worker.signal_consumer_close()
  807. def test_start__stop(self):
  808. worker = self.worker
  809. worker.blueprint.shutdown_complete.set()
  810. worker.steps = [MockStep(StartStopStep(self)) for _ in range(4)]
  811. worker.blueprint.state = RUN
  812. worker.blueprint.started = 4
  813. for w in worker.steps:
  814. w.start = Mock()
  815. w.close = Mock()
  816. w.stop = Mock()
  817. worker.start()
  818. for w in worker.steps:
  819. self.assertTrue(w.start.call_count)
  820. worker.consumer = Mock()
  821. worker.stop()
  822. for stopstep in worker.steps:
  823. self.assertTrue(stopstep.close.call_count)
  824. self.assertTrue(stopstep.stop.call_count)
  825. # Doesn't close pool if no pool.
  826. worker.start()
  827. worker.pool = None
  828. worker.stop()
  829. # test that stop of None is not attempted
  830. worker.steps[-1] = None
  831. worker.start()
  832. worker.stop()
  833. def test_step_raises(self):
  834. worker = self.worker
  835. step = Mock()
  836. worker.steps = [step]
  837. step.start.side_effect = TypeError()
  838. worker.stop = Mock()
  839. worker.start()
  840. worker.stop.assert_called_with()
  841. def test_state(self):
  842. self.assertTrue(self.worker.state)
  843. def test_start__terminate(self):
  844. worker = self.worker
  845. worker.blueprint.shutdown_complete.set()
  846. worker.blueprint.started = 5
  847. worker.blueprint.state = RUN
  848. worker.steps = [MockStep() for _ in range(5)]
  849. worker.start()
  850. for w in worker.steps[:3]:
  851. self.assertTrue(w.start.call_count)
  852. self.assertTrue(worker.blueprint.started, len(worker.steps))
  853. self.assertEqual(worker.blueprint.state, RUN)
  854. worker.terminate()
  855. for step in worker.steps:
  856. self.assertTrue(step.terminate.call_count)
  857. def test_Queues_pool_no_sem(self):
  858. w = Mock()
  859. w.pool_cls.uses_semaphore = False
  860. components.Queues(w).create(w)
  861. self.assertIs(w.process_task, w._process_task)
  862. def test_Hub_crate(self):
  863. w = Mock()
  864. x = components.Hub(w)
  865. x.create(w)
  866. self.assertTrue(w.timer.max_interval)
  867. def test_Pool_crate_threaded(self):
  868. w = Mock()
  869. w._conninfo.connection_errors = w._conninfo.channel_errors = ()
  870. w.pool_cls = Mock()
  871. w.use_eventloop = False
  872. pool = components.Pool(w)
  873. pool.create(w)
  874. def test_Pool_create(self):
  875. from kombu.async.semaphore import LaxBoundedSemaphore
  876. w = Mock()
  877. w._conninfo.connection_errors = w._conninfo.channel_errors = ()
  878. w.hub = Mock()
  879. PoolImp = Mock()
  880. poolimp = PoolImp.return_value = Mock()
  881. poolimp._pool = [Mock(), Mock()]
  882. poolimp._cache = {}
  883. poolimp._fileno_to_inq = {}
  884. poolimp._fileno_to_outq = {}
  885. from celery.concurrency.prefork import TaskPool as _TaskPool
  886. class MockTaskPool(_TaskPool):
  887. Pool = PoolImp
  888. @property
  889. def timers(self):
  890. return {Mock(): 30}
  891. w.pool_cls = MockTaskPool
  892. w.use_eventloop = True
  893. w.consumer.restart_count = -1
  894. pool = components.Pool(w)
  895. pool.create(w)
  896. pool.register_with_event_loop(w, w.hub)
  897. self.assertIsInstance(w.semaphore, LaxBoundedSemaphore)
  898. P = w.pool
  899. P.start()