test_worker.py 23 KB


  1. from __future__ import absolute_import
  2. import logging
  3. import os
  4. import sys
  5. from functools import wraps
  6. from mock import Mock, patch
  7. from nose import SkipTest
  8. from billiard import current_process
  9. from kombu import Exchange, Queue
  10. from celery import platforms
  11. from celery import signals
  12. from celery.app import trace
  13. from celery.apps import worker as cd
  14. from celery.bin.worker import worker, main as worker_main
  15. from celery.exceptions import ImproperlyConfigured, SystemTerminate
  16. from celery.utils.log import ensure_process_aware_logger
  17. from celery.worker import state
  18. from celery.tests.case import (
  19. AppCase,
  20. WhateverIO,
  21. skip_if_pypy,
  22. skip_if_jython,
  23. )
# Runs once at import time, before any test case in this module is defined.
# NOTE(review): presumably makes the logging setup process-aware for the
# tests below -- confirm against celery.utils.log.
ensure_process_aware_logger()
  25. class WorkerAppCase(AppCase):
  26. def tearDown(self):
  27. super(WorkerAppCase, self).tearDown()
  28. trace.reset_worker_optimizations()
  29. def disable_stdouts(fun):
  30. @wraps(fun)
  31. def disable(*args, **kwargs):
  32. prev_out, prev_err = sys.stdout, sys.stderr
  33. prev_rout, prev_rerr = sys.__stdout__, sys.__stderr__
  34. sys.stdout = sys.__stdout__ = WhateverIO()
  35. sys.stderr = sys.__stderr__ = WhateverIO()
  36. try:
  37. return fun(*args, **kwargs)
  38. finally:
  39. sys.stdout = prev_out
  40. sys.stderr = prev_err
  41. sys.__stdout__ = prev_rout
  42. sys.__stderr__ = prev_rerr
  43. return disable
  44. class Worker(cd.Worker):
  45. redirect_stdouts = False
  46. def start(self, *args, **kwargs):
  47. self.on_start()
  48. class test_Worker(WorkerAppCase):
  49. Worker = Worker
  50. @disable_stdouts
  51. def test_queues_string(self):
  52. w = self.app.Worker()
  53. w.setup_queues('foo,bar,baz')
  54. self.assertTrue('foo' in self.app.amqp.queues)
  55. @disable_stdouts
  56. def test_cpu_count(self):
  57. with patch('celery.worker.cpu_count') as cpu_count:
  58. cpu_count.side_effect = NotImplementedError()
  59. w = self.app.Worker(concurrency=None)
  60. self.assertEqual(w.concurrency, 2)
  61. w = self.app.Worker(concurrency=5)
  62. self.assertEqual(w.concurrency, 5)
  63. @disable_stdouts
  64. def test_windows_B_option(self):
  65. self.app.IS_WINDOWS = True
  66. with self.assertRaises(SystemExit):
  67. worker(app=self.app).run(beat=True)
  68. def test_setup_concurrency_very_early(self):
  69. x = worker()
  70. x.run = Mock()
  71. with self.assertRaises(ImportError):
  72. x.execute_from_commandline(['worker', '-P', 'xyzybox'])
  73. def test_run_from_argv_basic(self):
  74. x = worker(app=self.app)
  75. x.run = Mock()
  76. x.maybe_detach = Mock()
  77. def run(*args, **kwargs):
  78. pass
  79. x.run = run
  80. x.run_from_argv('celery', [])
  81. self.assertTrue(x.maybe_detach.called)
  82. def test_maybe_detach(self):
  83. x = worker(app=self.app)
  84. with patch('celery.bin.worker.detached_celeryd') as detached:
  85. x.maybe_detach([])
  86. self.assertFalse(detached.called)
  87. with self.assertRaises(SystemExit):
  88. x.maybe_detach(['--detach'])
  89. self.assertTrue(detached.called)
  90. @disable_stdouts
  91. def test_invalid_loglevel_gives_error(self):
  92. x = worker(app=self.app)
  93. with self.assertRaises(SystemExit):
  94. x.run(loglevel='GRIM_REAPER')
  95. def test_no_loglevel(self):
  96. self.app.Worker = Mock()
  97. worker(app=self.app).run(loglevel=None)
  98. def test_tasklist(self):
  99. worker = self.app.Worker()
  100. self.assertTrue(worker.app.tasks)
  101. self.assertTrue(worker.app.finalized)
  102. self.assertTrue(worker.tasklist(include_builtins=True))
  103. worker.tasklist(include_builtins=False)
  104. def test_extra_info(self):
  105. worker = self.app.Worker()
  106. worker.loglevel = logging.WARNING
  107. self.assertFalse(worker.extra_info())
  108. worker.loglevel = logging.INFO
  109. self.assertTrue(worker.extra_info())
  110. @disable_stdouts
  111. def test_loglevel_string(self):
  112. worker = self.Worker(app=self.app, loglevel='INFO')
  113. self.assertEqual(worker.loglevel, logging.INFO)
  114. @disable_stdouts
  115. def test_run_worker(self):
  116. handlers = {}
  117. class Signals(platforms.Signals):
  118. def __setitem__(self, sig, handler):
  119. handlers[sig] = handler
  120. p = platforms.signals
  121. platforms.signals = Signals()
  122. try:
  123. w = self.Worker(app=self.app)
  124. w._isatty = False
  125. w.on_start()
  126. for sig in 'SIGINT', 'SIGHUP', 'SIGTERM':
  127. self.assertIn(sig, handlers)
  128. handlers.clear()
  129. w = self.Worker(app=self.app)
  130. w._isatty = True
  131. w.on_start()
  132. for sig in 'SIGINT', 'SIGTERM':
  133. self.assertIn(sig, handlers)
  134. self.assertNotIn('SIGHUP', handlers)
  135. finally:
  136. platforms.signals = p
  137. @disable_stdouts
  138. def test_startup_info(self):
  139. worker = self.Worker(app=self.app)
  140. worker.on_start()
  141. self.assertTrue(worker.startup_info())
  142. worker.loglevel = logging.DEBUG
  143. self.assertTrue(worker.startup_info())
  144. worker.loglevel = logging.INFO
  145. self.assertTrue(worker.startup_info())
  146. worker.autoscale = 13, 10
  147. self.assertTrue(worker.startup_info())
  148. prev_loader = self.app.loader
  149. worker = self.Worker(app=self.app, queues='foo,bar,baz,xuzzy,do,re,mi')
  150. self.app.loader = Mock()
  151. self.app.loader.__module__ = 'acme.baked_beans'
  152. self.assertTrue(worker.startup_info())
  153. self.app.loader = Mock()
  154. self.app.loader.__module__ = 'celery.loaders.foo'
  155. self.assertTrue(worker.startup_info())
  156. from celery.loaders.app import AppLoader
  157. self.app.loader = AppLoader(app=self.app)
  158. self.assertTrue(worker.startup_info())
  159. self.app.loader = prev_loader
  160. worker.send_events = True
  161. self.assertTrue(worker.startup_info())
  162. # test when there are too few output lines
  163. # to draft the ascii art onto
  164. prev, cd.ARTLINES = cd.ARTLINES, ['the quick brown fox']
  165. self.assertTrue(worker.startup_info())
  166. @disable_stdouts
  167. def test_run(self):
  168. self.Worker(app=self.app).on_start()
  169. self.Worker(app=self.app, purge=True).on_start()
  170. worker = self.Worker(app=self.app)
  171. worker.on_start()
  172. @disable_stdouts
  173. def test_purge_messages(self):
  174. self.Worker(app=self.app).purge_messages()
  175. @disable_stdouts
  176. def test_init_queues(self):
  177. app = self.app
  178. c = app.conf
  179. app.amqp.queues = app.amqp.Queues({
  180. 'celery': {'exchange': 'celery',
  181. 'routing_key': 'celery'},
  182. 'video': {'exchange': 'video',
  183. 'routing_key': 'video'},
  184. })
  185. worker = self.Worker(app=self.app)
  186. worker.setup_queues(['video'])
  187. self.assertIn('video', app.amqp.queues)
  188. self.assertIn('video', app.amqp.queues.consume_from)
  189. self.assertIn('celery', app.amqp.queues)
  190. self.assertNotIn('celery', app.amqp.queues.consume_from)
  191. c.CELERY_CREATE_MISSING_QUEUES = False
  192. del(app.amqp.queues)
  193. with self.assertRaises(ImproperlyConfigured):
  194. self.Worker(app=self.app).setup_queues(['image'])
  195. del(app.amqp.queues)
  196. c.CELERY_CREATE_MISSING_QUEUES = True
  197. worker = self.Worker(app=self.app)
  198. worker.setup_queues(['image'])
  199. self.assertIn('image', app.amqp.queues.consume_from)
  200. self.assertEqual(
  201. Queue('image', Exchange('image'), routing_key='image'),
  202. app.amqp.queues['image'],
  203. )
  204. @disable_stdouts
  205. def test_autoscale_argument(self):
  206. worker1 = self.Worker(app=self.app, autoscale='10,3')
  207. self.assertListEqual(worker1.autoscale, [10, 3])
  208. worker2 = self.Worker(app=self.app, autoscale='10')
  209. self.assertListEqual(worker2.autoscale, [10, 0])
  210. self.assert_no_logging_side_effect()
  211. def test_include_argument(self):
  212. worker1 = self.Worker(app=self.app, include='os')
  213. self.assertListEqual(worker1.include, ['os'])
  214. worker2 = self.Worker(app=self.app,
  215. include='os,sys')
  216. self.assertListEqual(worker2.include, ['os', 'sys'])
  217. self.Worker(app=self.app, include=['os', 'sys'])
  218. @disable_stdouts
  219. def test_unknown_loglevel(self):
  220. with self.assertRaises(SystemExit):
  221. worker(app=self.app).run(loglevel='ALIEN')
  222. worker1 = self.Worker(app=self.app, loglevel=0xFFFF)
  223. self.assertEqual(worker1.loglevel, 0xFFFF)
  224. @disable_stdouts
  225. def test_warns_if_running_as_privileged_user(self):
  226. app = self.app
  227. if app.IS_WINDOWS:
  228. raise SkipTest('Not applicable on Windows')
  229. def getuid():
  230. return 0
  231. with patch('os.getuid') as getuid:
  232. getuid.return_value = 0
  233. self.app.conf.CELERY_ACCEPT_CONTENT = ['pickle']
  234. with self.assertRaises(RuntimeError):
  235. worker = self.Worker(app=self.app)
  236. worker.on_start()
  237. cd.C_FORCE_ROOT = True
  238. try:
  239. with self.assertWarnsRegex(
  240. RuntimeWarning,
  241. r'absolutely not recommended'):
  242. worker = self.Worker(app=self.app)
  243. worker.on_start()
  244. finally:
  245. cd.C_FORCE_ROOT = False
  246. self.app.conf.CELERY_ACCEPT_CONTENT = ['json']
  247. with self.assertWarnsRegex(
  248. RuntimeWarning,
  249. r'absolutely not recommended'):
  250. worker = self.Worker(app=self.app)
  251. worker.on_start()
  252. @disable_stdouts
  253. def test_redirect_stdouts(self):
  254. self.Worker(app=self.app, redirect_stdouts=False)
  255. with self.assertRaises(AttributeError):
  256. sys.stdout.logger
  257. @disable_stdouts
  258. def test_on_start_custom_logging(self):
  259. self.app.log.redirect_stdouts = Mock()
  260. worker = self.Worker(app=self.app, redirect_stoutds=True)
  261. worker._custom_logging = True
  262. worker.on_start()
  263. self.assertFalse(self.app.log.redirect_stdouts.called)
  264. def test_setup_logging_no_color(self):
  265. worker = self.Worker(
  266. app=self.app, redirect_stdouts=False, no_color=True,
  267. )
  268. prev, self.app.log.setup = self.app.log.setup, Mock()
  269. worker.setup_logging()
  270. self.assertFalse(self.app.log.setup.call_args[1]['colorize'])
  271. @disable_stdouts
  272. def test_startup_info_pool_is_str(self):
  273. worker = self.Worker(app=self.app, redirect_stdouts=False)
  274. worker.pool_cls = 'foo'
  275. worker.startup_info()
  276. def test_redirect_stdouts_already_handled(self):
  277. logging_setup = [False]
  278. @signals.setup_logging.connect
  279. def on_logging_setup(**kwargs):
  280. logging_setup[0] = True
  281. try:
  282. worker = self.Worker(app=self.app, redirect_stdouts=False)
  283. worker.app.log.already_setup = False
  284. worker.setup_logging()
  285. self.assertTrue(logging_setup[0])
  286. with self.assertRaises(AttributeError):
  287. sys.stdout.logger
  288. finally:
  289. signals.setup_logging.disconnect(on_logging_setup)
  290. @disable_stdouts
  291. def test_platform_tweaks_osx(self):
  292. class OSXWorker(Worker):
  293. proxy_workaround_installed = False
  294. def osx_proxy_detection_workaround(self):
  295. self.proxy_workaround_installed = True
  296. worker = OSXWorker(app=self.app, redirect_stdouts=False)
  297. def install_HUP_nosupport(controller):
  298. controller.hup_not_supported_installed = True
  299. class Controller(object):
  300. pass
  301. prev = cd.install_HUP_not_supported_handler
  302. cd.install_HUP_not_supported_handler = install_HUP_nosupport
  303. try:
  304. worker.app.IS_OSX = True
  305. controller = Controller()
  306. worker.install_platform_tweaks(controller)
  307. self.assertTrue(controller.hup_not_supported_installed)
  308. self.assertTrue(worker.proxy_workaround_installed)
  309. finally:
  310. cd.install_HUP_not_supported_handler = prev
  311. @disable_stdouts
  312. def test_general_platform_tweaks(self):
  313. restart_worker_handler_installed = [False]
  314. def install_worker_restart_handler(worker):
  315. restart_worker_handler_installed[0] = True
  316. class Controller(object):
  317. pass
  318. prev = cd.install_worker_restart_handler
  319. cd.install_worker_restart_handler = install_worker_restart_handler
  320. try:
  321. worker = self.Worker(app=self.app)
  322. worker.app.IS_OSX = False
  323. worker.install_platform_tweaks(Controller())
  324. self.assertTrue(restart_worker_handler_installed[0])
  325. finally:
  326. cd.install_worker_restart_handler = prev
  327. @disable_stdouts
  328. def test_on_consumer_ready(self):
  329. worker_ready_sent = [False]
  330. @signals.worker_ready.connect
  331. def on_worker_ready(**kwargs):
  332. worker_ready_sent[0] = True
  333. self.Worker(app=self.app).on_consumer_ready(object())
  334. self.assertTrue(worker_ready_sent[0])
  335. class test_funs(WorkerAppCase):
  336. def test_active_thread_count(self):
  337. self.assertTrue(cd.active_thread_count())
  338. @disable_stdouts
  339. def test_set_process_status(self):
  340. try:
  341. __import__('setproctitle')
  342. except ImportError:
  343. raise SkipTest('setproctitle not installed')
  344. worker = Worker(app=self.app, hostname='xyzza')
  345. prev1, sys.argv = sys.argv, ['Arg0']
  346. try:
  347. st = worker.set_process_status('Running')
  348. self.assertIn('celeryd', st)
  349. self.assertIn('xyzza', st)
  350. self.assertIn('Running', st)
  351. prev2, sys.argv = sys.argv, ['Arg0', 'Arg1']
  352. try:
  353. st = worker.set_process_status('Running')
  354. self.assertIn('celeryd', st)
  355. self.assertIn('xyzza', st)
  356. self.assertIn('Running', st)
  357. self.assertIn('Arg1', st)
  358. finally:
  359. sys.argv = prev2
  360. finally:
  361. sys.argv = prev1
  362. @disable_stdouts
  363. def test_parse_options(self):
  364. cmd = worker()
  365. cmd.app = self.app
  366. opts, args = cmd.parse_options('worker', ['--concurrency=512'])
  367. self.assertEqual(opts.concurrency, 512)
  368. @disable_stdouts
  369. def test_main(self):
  370. p, cd.Worker = cd.Worker, Worker
  371. s, sys.argv = sys.argv, ['worker', '--discard']
  372. try:
  373. worker_main(app=self.app)
  374. finally:
  375. cd.Worker = p
  376. sys.argv = s
  377. class test_signal_handlers(WorkerAppCase):
  378. class _Worker(object):
  379. stopped = False
  380. terminated = False
  381. def stop(self, in_sighandler=False):
  382. self.stopped = True
  383. def terminate(self, in_sighandler=False):
  384. self.terminated = True
  385. def psig(self, fun, *args, **kwargs):
  386. handlers = {}
  387. class Signals(platforms.Signals):
  388. def __setitem__(self, sig, handler):
  389. handlers[sig] = handler
  390. p, platforms.signals = platforms.signals, Signals()
  391. try:
  392. fun(*args, **kwargs)
  393. return handlers
  394. finally:
  395. platforms.signals = p
  396. @disable_stdouts
  397. def test_worker_int_handler(self):
  398. worker = self._Worker()
  399. handlers = self.psig(cd.install_worker_int_handler, worker)
  400. next_handlers = {}
  401. state.should_stop = False
  402. state.should_terminate = False
  403. class Signals(platforms.Signals):
  404. def __setitem__(self, sig, handler):
  405. next_handlers[sig] = handler
  406. with patch('celery.apps.worker.active_thread_count') as c:
  407. c.return_value = 3
  408. p, platforms.signals = platforms.signals, Signals()
  409. try:
  410. handlers['SIGINT']('SIGINT', object())
  411. self.assertTrue(state.should_stop)
  412. finally:
  413. platforms.signals = p
  414. state.should_stop = False
  415. try:
  416. next_handlers['SIGINT']('SIGINT', object())
  417. self.assertTrue(state.should_terminate)
  418. finally:
  419. state.should_terminate = False
  420. with patch('celery.apps.worker.active_thread_count') as c:
  421. c.return_value = 1
  422. p, platforms.signals = platforms.signals, Signals()
  423. try:
  424. with self.assertRaises(SystemExit):
  425. handlers['SIGINT']('SIGINT', object())
  426. finally:
  427. platforms.signals = p
  428. with self.assertRaises(SystemTerminate):
  429. next_handlers['SIGINT']('SIGINT', object())
  430. @disable_stdouts
  431. def test_worker_int_handler_only_stop_MainProcess(self):
  432. try:
  433. import _multiprocessing # noqa
  434. except ImportError:
  435. raise SkipTest('only relevant for multiprocessing')
  436. process = current_process()
  437. name, process.name = process.name, 'OtherProcess'
  438. with patch('celery.apps.worker.active_thread_count') as c:
  439. c.return_value = 3
  440. try:
  441. worker = self._Worker()
  442. handlers = self.psig(cd.install_worker_int_handler, worker)
  443. handlers['SIGINT']('SIGINT', object())
  444. self.assertTrue(state.should_stop)
  445. finally:
  446. process.name = name
  447. state.should_stop = False
  448. with patch('celery.apps.worker.active_thread_count') as c:
  449. c.return_value = 1
  450. try:
  451. worker = self._Worker()
  452. handlers = self.psig(cd.install_worker_int_handler, worker)
  453. with self.assertRaises(SystemExit):
  454. handlers['SIGINT']('SIGINT', object())
  455. finally:
  456. process.name = name
  457. state.should_stop = False
  458. @disable_stdouts
  459. def test_install_HUP_not_supported_handler(self):
  460. worker = self._Worker()
  461. handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
  462. handlers['SIGHUP']('SIGHUP', object())
  463. @disable_stdouts
  464. def test_worker_term_hard_handler_only_stop_MainProcess(self):
  465. try:
  466. import _multiprocessing # noqa
  467. except ImportError:
  468. raise SkipTest('only relevant for multiprocessing')
  469. process = current_process()
  470. name, process.name = process.name, 'OtherProcess'
  471. try:
  472. with patch('celery.apps.worker.active_thread_count') as c:
  473. c.return_value = 3
  474. worker = self._Worker()
  475. handlers = self.psig(
  476. cd.install_worker_term_hard_handler, worker)
  477. try:
  478. handlers['SIGQUIT']('SIGQUIT', object())
  479. self.assertTrue(state.should_terminate)
  480. finally:
  481. state.should_terminate = False
  482. with patch('celery.apps.worker.active_thread_count') as c:
  483. c.return_value = 1
  484. worker = self._Worker()
  485. handlers = self.psig(
  486. cd.install_worker_term_hard_handler, worker)
  487. with self.assertRaises(SystemTerminate):
  488. handlers['SIGQUIT']('SIGQUIT', object())
  489. finally:
  490. process.name = name
  491. @disable_stdouts
  492. def test_worker_term_handler_when_threads(self):
  493. with patch('celery.apps.worker.active_thread_count') as c:
  494. c.return_value = 3
  495. worker = self._Worker()
  496. handlers = self.psig(cd.install_worker_term_handler, worker)
  497. try:
  498. handlers['SIGTERM']('SIGTERM', object())
  499. self.assertTrue(state.should_stop)
  500. finally:
  501. state.should_stop = False
  502. @disable_stdouts
  503. def test_worker_term_handler_when_single_thread(self):
  504. with patch('celery.apps.worker.active_thread_count') as c:
  505. c.return_value = 1
  506. worker = self._Worker()
  507. handlers = self.psig(cd.install_worker_term_handler, worker)
  508. try:
  509. with self.assertRaises(SystemExit):
  510. handlers['SIGTERM']('SIGTERM', object())
  511. finally:
  512. state.should_stop = False
  513. @patch('sys.__stderr__')
  514. @skip_if_pypy
  515. @skip_if_jython
  516. def test_worker_cry_handler(self, stderr):
  517. handlers = self.psig(cd.install_cry_handler)
  518. self.assertIsNone(handlers['SIGUSR1']('SIGUSR1', object()))
  519. self.assertTrue(stderr.write.called)
  520. @disable_stdouts
  521. def test_worker_term_handler_only_stop_MainProcess(self):
  522. try:
  523. import _multiprocessing # noqa
  524. except ImportError:
  525. raise SkipTest('only relevant for multiprocessing')
  526. process = current_process()
  527. name, process.name = process.name, 'OtherProcess'
  528. try:
  529. with patch('celery.apps.worker.active_thread_count') as c:
  530. c.return_value = 3
  531. worker = self._Worker()
  532. handlers = self.psig(cd.install_worker_term_handler, worker)
  533. handlers['SIGTERM']('SIGTERM', object())
  534. self.assertTrue(state.should_stop)
  535. with patch('celery.apps.worker.active_thread_count') as c:
  536. c.return_value = 1
  537. worker = self._Worker()
  538. handlers = self.psig(cd.install_worker_term_handler, worker)
  539. with self.assertRaises(SystemExit):
  540. handlers['SIGTERM']('SIGTERM', object())
  541. finally:
  542. process.name = name
  543. state.should_stop = False
  544. @disable_stdouts
  545. @patch('atexit.register')
  546. @patch('os.close')
  547. def test_worker_restart_handler(self, _close, register):
  548. if getattr(os, 'execv', None) is None:
  549. raise SkipTest('platform does not have excv')
  550. argv = []
  551. def _execv(*args):
  552. argv.extend(args)
  553. execv, os.execv = os.execv, _execv
  554. try:
  555. worker = self._Worker()
  556. handlers = self.psig(cd.install_worker_restart_handler, worker)
  557. handlers['SIGHUP']('SIGHUP', object())
  558. self.assertTrue(state.should_stop)
  559. self.assertTrue(register.called)
  560. callback = register.call_args[0][0]
  561. callback()
  562. self.assertTrue(argv)
  563. finally:
  564. os.execv = execv
  565. state.should_stop = False
  566. @disable_stdouts
  567. def test_worker_term_hard_handler_when_threaded(self):
  568. with patch('celery.apps.worker.active_thread_count') as c:
  569. c.return_value = 3
  570. worker = self._Worker()
  571. handlers = self.psig(cd.install_worker_term_hard_handler, worker)
  572. try:
  573. handlers['SIGQUIT']('SIGQUIT', object())
  574. self.assertTrue(state.should_terminate)
  575. finally:
  576. state.should_terminate = False
  577. @disable_stdouts
  578. def test_worker_term_hard_handler_when_single_threaded(self):
  579. with patch('celery.apps.worker.active_thread_count') as c:
  580. c.return_value = 1
  581. worker = self._Worker()
  582. handlers = self.psig(cd.install_worker_term_hard_handler, worker)
  583. with self.assertRaises(SystemTerminate):
  584. handlers['SIGQUIT']('SIGQUIT', object())