test_celery.py 18 KB


  1. from __future__ import absolute_import
  2. import sys
  3. from anyjson import dumps
  4. from datetime import datetime
  5. from mock import Mock, patch
  6. from celery import __main__
  7. from celery.platforms import EX_FAILURE, EX_USAGE, EX_OK
  8. from celery.bin.base import Error
  9. from celery.bin.celery import (
  10. Command,
  11. list_,
  12. call,
  13. purge,
  14. result,
  15. inspect,
  16. control,
  17. status,
  18. migrate,
  19. help,
  20. report,
  21. CeleryCommand,
  22. determine_exit_status,
  23. multi,
  24. main as mainfun,
  25. _RemoteControl,
  26. command,
  27. )
  28. from celery.tests.case import AppCase, WhateverIO, override_stdouts
  29. class test__main__(AppCase):
  30. def test_warn_deprecated(self):
  31. with override_stdouts() as (stdout, _):
  32. __main__._warn_deprecated('YADDA YADDA')
  33. self.assertIn('command is deprecated', stdout.getvalue())
  34. self.assertIn('YADDA YADDA', stdout.getvalue())
  35. def test_main(self):
  36. with patch('celery.__main__.maybe_patch_concurrency') as mpc:
  37. with patch('celery.bin.celery.main') as main:
  38. __main__.main()
  39. mpc.assert_called_with()
  40. main.assert_called_with()
  41. def test_compat_worker(self):
  42. with patch('celery.__main__.maybe_patch_concurrency') as mpc:
  43. with patch('celery.__main__._warn_deprecated') as depr:
  44. with patch('celery.bin.worker.main') as main:
  45. __main__._compat_worker()
  46. mpc.assert_called_with()
  47. depr.assert_called_with('celery worker')
  48. main.assert_called_with()
  49. def test_compat_multi(self):
  50. with patch('celery.__main__.maybe_patch_concurrency') as mpc:
  51. with patch('celery.__main__._warn_deprecated') as depr:
  52. with patch('celery.bin.multi.main') as main:
  53. __main__._compat_multi()
  54. mpc.assert_called_with()
  55. depr.assert_called_with('celery multi')
  56. main.assert_called_with()
  57. def test_compat_beat(self):
  58. with patch('celery.__main__.maybe_patch_concurrency') as mpc:
  59. with patch('celery.__main__._warn_deprecated') as depr:
  60. with patch('celery.bin.beat.main') as main:
  61. __main__._compat_beat()
  62. mpc.assert_called_with()
  63. depr.assert_called_with('celery beat')
  64. main.assert_called_with()
  65. class test_Command(AppCase):
  66. def test_Error_repr(self):
  67. x = Error('something happened')
  68. self.assertIsNotNone(x.status)
  69. self.assertTrue(x.reason)
  70. self.assertTrue(str(x))
  71. def setup(self):
  72. self.out = WhateverIO()
  73. self.err = WhateverIO()
  74. self.cmd = Command(self.app, stdout=self.out, stderr=self.err)
  75. def test_error(self):
  76. self.cmd.out = Mock()
  77. self.cmd.error('FOO')
  78. self.assertTrue(self.cmd.out.called)
  79. def test_out(self):
  80. f = Mock()
  81. self.cmd.out('foo', f)
  82. def test_call(self):
  83. def ok_run():
  84. pass
  85. self.cmd.run = ok_run
  86. self.assertEqual(self.cmd(), EX_OK)
  87. def error_run():
  88. raise Error('error', EX_FAILURE)
  89. self.cmd.run = error_run
  90. self.assertEqual(self.cmd(), EX_FAILURE)
  91. def test_run_from_argv(self):
  92. with self.assertRaises(NotImplementedError):
  93. self.cmd.run_from_argv('prog', ['foo', 'bar'])
  94. def test_pretty_list(self):
  95. self.assertEqual(self.cmd.pretty([])[1], '- empty -')
  96. self.assertIn('bar', self.cmd.pretty(['foo', 'bar'])[1])
  97. def test_pretty_dict(self):
  98. self.assertIn(
  99. 'OK',
  100. str(self.cmd.pretty({'ok': 'the quick brown fox'})[0]),
  101. )
  102. self.assertIn(
  103. 'ERROR',
  104. str(self.cmd.pretty({'error': 'the quick brown fox'})[0]),
  105. )
  106. def test_pretty(self):
  107. self.assertIn('OK', str(self.cmd.pretty('the quick brown')))
  108. self.assertIn('OK', str(self.cmd.pretty(object())))
  109. self.assertIn('OK', str(self.cmd.pretty({'foo': 'bar'})))
  110. class test_list(AppCase):
  111. def test_list_bindings_no_support(self):
  112. l = list_(app=self.app, stderr=WhateverIO())
  113. management = Mock()
  114. management.get_bindings.side_effect = NotImplementedError()
  115. with self.assertRaises(Error):
  116. l.list_bindings(management)
  117. def test_run(self):
  118. l = list_(app=self.app, stderr=WhateverIO())
  119. l.run('bindings')
  120. with self.assertRaises(Error):
  121. l.run(None)
  122. with self.assertRaises(Error):
  123. l.run('foo')
  124. class test_call(AppCase):
  125. def setup(self):
  126. @self.app.task(shared=False)
  127. def add(x, y):
  128. return x + y
  129. self.add = add
  130. @patch('celery.app.base.Celery.send_task')
  131. def test_run(self, send_task):
  132. a = call(app=self.app, stderr=WhateverIO(), stdout=WhateverIO())
  133. a.run(self.add.name)
  134. self.assertTrue(send_task.called)
  135. a.run(self.add.name,
  136. args=dumps([4, 4]),
  137. kwargs=dumps({'x': 2, 'y': 2}))
  138. self.assertEqual(send_task.call_args[1]['args'], [4, 4])
  139. self.assertEqual(send_task.call_args[1]['kwargs'], {'x': 2, 'y': 2})
  140. a.run(self.add.name, expires=10, countdown=10)
  141. self.assertEqual(send_task.call_args[1]['expires'], 10)
  142. self.assertEqual(send_task.call_args[1]['countdown'], 10)
  143. now = datetime.now()
  144. iso = now.isoformat()
  145. a.run(self.add.name, expires=iso)
  146. self.assertEqual(send_task.call_args[1]['expires'], now)
  147. with self.assertRaises(ValueError):
  148. a.run(self.add.name, expires='foobaribazibar')
  149. class test_purge(AppCase):
  150. @patch('celery.app.control.Control.purge')
  151. def test_run(self, purge_):
  152. out = WhateverIO()
  153. a = purge(app=self.app, stdout=out)
  154. purge_.return_value = 0
  155. a.run()
  156. self.assertIn('No messages purged', out.getvalue())
  157. purge_.return_value = 100
  158. a.run()
  159. self.assertIn('100 messages', out.getvalue())
  160. class test_result(AppCase):
  161. def setup(self):
  162. @self.app.task(shared=False)
  163. def add(x, y):
  164. return x + y
  165. self.add = add
  166. def test_run(self):
  167. with patch('celery.result.AsyncResult.get') as get:
  168. out = WhateverIO()
  169. r = result(app=self.app, stdout=out)
  170. get.return_value = 'Jerry'
  171. r.run('id')
  172. self.assertIn('Jerry', out.getvalue())
  173. get.return_value = 'Elaine'
  174. r.run('id', task=self.add.name)
  175. self.assertIn('Elaine', out.getvalue())
  176. with patch('celery.result.AsyncResult.traceback') as tb:
  177. r.run('id', task=self.add.name, traceback=True)
  178. self.assertIn(str(tb), out.getvalue())
  179. class test_status(AppCase):
  180. @patch('celery.bin.celery.inspect')
  181. def test_run(self, inspect_):
  182. out, err = WhateverIO(), WhateverIO()
  183. ins = inspect_.return_value = Mock()
  184. ins.run.return_value = []
  185. s = status(self.app, stdout=out, stderr=err)
  186. with self.assertRaises(Error):
  187. s.run()
  188. ins.run.return_value = ['a', 'b', 'c']
  189. s.run()
  190. self.assertIn('3 nodes online', out.getvalue())
  191. s.run(quiet=True)
  192. class test_migrate(AppCase):
  193. @patch('celery.contrib.migrate.migrate_tasks')
  194. def test_run(self, migrate_tasks):
  195. out = WhateverIO()
  196. m = migrate(app=self.app, stdout=out, stderr=WhateverIO())
  197. with self.assertRaises(TypeError):
  198. m.run()
  199. self.assertFalse(migrate_tasks.called)
  200. m.run('memory://foo', 'memory://bar')
  201. self.assertTrue(migrate_tasks.called)
  202. state = Mock()
  203. state.count = 10
  204. state.strtotal = 30
  205. m.on_migrate_task(state, {'task': 'tasks.add', 'id': 'ID'}, None)
  206. self.assertIn('10/30', out.getvalue())
  207. class test_report(AppCase):
  208. def test_run(self):
  209. out = WhateverIO()
  210. r = report(app=self.app, stdout=out)
  211. self.assertEqual(r.run(), EX_OK)
  212. self.assertTrue(out.getvalue())
  213. class test_help(AppCase):
  214. def test_run(self):
  215. out = WhateverIO()
  216. h = help(app=self.app, stdout=out)
  217. h.parser = Mock()
  218. self.assertEqual(h.run(), EX_USAGE)
  219. self.assertTrue(out.getvalue())
  220. self.assertTrue(h.usage('help'))
  221. h.parser.print_help.assert_called_with()
  222. class test_CeleryCommand(AppCase):
  223. def test_execute_from_commandline(self):
  224. x = CeleryCommand(app=self.app)
  225. x.handle_argv = Mock()
  226. x.handle_argv.return_value = 1
  227. with self.assertRaises(SystemExit):
  228. x.execute_from_commandline()
  229. x.handle_argv.return_value = True
  230. with self.assertRaises(SystemExit):
  231. x.execute_from_commandline()
  232. x.handle_argv.side_effect = KeyboardInterrupt()
  233. with self.assertRaises(SystemExit):
  234. x.execute_from_commandline()
  235. x.respects_app_option = True
  236. with self.assertRaises(SystemExit):
  237. x.execute_from_commandline(['celery', 'multi'])
  238. self.assertFalse(x.respects_app_option)
  239. x.respects_app_option = True
  240. with self.assertRaises(SystemExit):
  241. x.execute_from_commandline(['manage.py', 'celery', 'multi'])
  242. self.assertFalse(x.respects_app_option)
  243. def test_with_pool_option(self):
  244. x = CeleryCommand(app=self.app)
  245. self.assertIsNone(x.with_pool_option(['celery', 'events']))
  246. self.assertTrue(x.with_pool_option(['celery', 'worker']))
  247. self.assertTrue(x.with_pool_option(['manage.py', 'celery', 'worker']))
  248. def test_load_extensions_no_commands(self):
  249. with patch('celery.bin.celery.Extensions') as Ext:
  250. ext = Ext.return_value = Mock(name='Extension')
  251. ext.load.return_value = None
  252. x = CeleryCommand(app=self.app)
  253. x.load_extension_commands()
  254. def test_determine_exit_status(self):
  255. self.assertEqual(determine_exit_status('true'), EX_OK)
  256. self.assertEqual(determine_exit_status(''), EX_FAILURE)
  257. def test_remove_options_at_beginning(self):
  258. x = CeleryCommand(app=self.app)
  259. self.assertEqual(x.remove_options_at_beginning(None), [])
  260. self.assertEqual(x.remove_options_at_beginning(['-c 3', '--foo']), [])
  261. self.assertEqual(x.remove_options_at_beginning(['--foo', '-c 3']), [])
  262. self.assertEqual(x.remove_options_at_beginning(
  263. ['foo', '--foo=1']), ['foo', '--foo=1'])
  264. def test_handle_argv(self):
  265. x = CeleryCommand(app=self.app)
  266. x.execute = Mock()
  267. x.handle_argv('celery', [])
  268. x.execute.assert_called_with('help', ['help'])
  269. x.handle_argv('celery', ['start', 'foo'])
  270. x.execute.assert_called_with('start', ['start', 'foo'])
  271. def test_execute(self):
  272. x = CeleryCommand(app=self.app)
  273. Help = x.commands['help'] = Mock()
  274. help = Help.return_value = Mock()
  275. x.execute('fooox', ['a'])
  276. help.run_from_argv.assert_called_with(x.prog_name, [], command='help')
  277. help.reset()
  278. x.execute('help', ['help'])
  279. help.run_from_argv.assert_called_with(x.prog_name, [], command='help')
  280. Dummy = x.commands['dummy'] = Mock()
  281. dummy = Dummy.return_value = Mock()
  282. dummy.run_from_argv.side_effect = Error('foo', status='EX_FAILURE')
  283. help.reset()
  284. x.execute('dummy', ['dummy'])
  285. dummy.run_from_argv.assert_called_with(
  286. x.prog_name, [], command='dummy',
  287. )
  288. help.run_from_argv.assert_called_with(
  289. x.prog_name, [], command='help',
  290. )
  291. exc = dummy.run_from_argv.side_effect = x.UsageError('foo')
  292. x.on_usage_error = Mock()
  293. x.execute('dummy', ['dummy'])
  294. x.on_usage_error.assert_called_with(exc)
  295. def test_on_usage_error(self):
  296. x = CeleryCommand(app=self.app)
  297. x.error = Mock()
  298. x.on_usage_error(x.UsageError('foo'), command=None)
  299. self.assertTrue(x.error.called)
  300. x.on_usage_error(x.UsageError('foo'), command='dummy')
  301. def test_prepare_prog_name(self):
  302. x = CeleryCommand(app=self.app)
  303. main = Mock(name='__main__')
  304. main.__file__ = '/opt/foo.py'
  305. with patch.dict(sys.modules, __main__=main):
  306. self.assertEqual(x.prepare_prog_name('__main__.py'), '/opt/foo.py')
  307. self.assertEqual(x.prepare_prog_name('celery'), 'celery')
  308. class test_RemoteControl(AppCase):
  309. def test_call_interface(self):
  310. with self.assertRaises(NotImplementedError):
  311. _RemoteControl(app=self.app).call()
  312. class test_inspect(AppCase):
  313. def test_usage(self):
  314. self.assertTrue(inspect(app=self.app).usage('foo'))
  315. def test_command_info(self):
  316. i = inspect(app=self.app)
  317. self.assertTrue(i.get_command_info(
  318. 'ping', help=True, color=i.colored.red,
  319. ))
  320. def test_list_commands_color(self):
  321. i = inspect(app=self.app)
  322. self.assertTrue(i.list_commands(
  323. help=True, color=i.colored.red,
  324. ))
  325. self.assertTrue(i.list_commands(
  326. help=False, color=None,
  327. ))
  328. def test_epilog(self):
  329. self.assertTrue(inspect(app=self.app).epilog)
  330. def test_do_call_method_sql_transport_type(self):
  331. self.app.connection = Mock()
  332. conn = self.app.connection.return_value = Mock(name='Connection')
  333. conn.transport.driver_type = 'sql'
  334. i = inspect(app=self.app)
  335. with self.assertRaises(i.Error):
  336. i.do_call_method(['ping'])
  337. def test_say_directions(self):
  338. i = inspect(self.app)
  339. i.out = Mock()
  340. i.quiet = True
  341. i.say_chat('<-', 'hello out')
  342. self.assertFalse(i.out.called)
  343. i.say_chat('->', 'hello in')
  344. self.assertTrue(i.out.called)
  345. i.quiet = False
  346. i.out.reset_mock()
  347. i.say_chat('<-', 'hello out', 'body')
  348. self.assertTrue(i.out.called)
  349. @patch('celery.app.control.Control.inspect')
  350. def test_run(self, real):
  351. out = WhateverIO()
  352. i = inspect(app=self.app, stdout=out)
  353. with self.assertRaises(Error):
  354. i.run()
  355. with self.assertRaises(Error):
  356. i.run('help')
  357. with self.assertRaises(Error):
  358. i.run('xyzzybaz')
  359. i.run('ping')
  360. self.assertTrue(real.called)
  361. i.run('ping', destination='foo,bar')
  362. self.assertEqual(real.call_args[1]['destination'], ['foo', 'bar'])
  363. self.assertEqual(real.call_args[1]['timeout'], 0.2)
  364. callback = real.call_args[1]['callback']
  365. callback({'foo': {'ok': 'pong'}})
  366. self.assertIn('OK', out.getvalue())
  367. instance = real.return_value = Mock()
  368. instance.ping.return_value = None
  369. with self.assertRaises(Error):
  370. i.run('ping')
  371. out.seek(0)
  372. out.truncate()
  373. i.quiet = True
  374. i.say_chat('<-', 'hello')
  375. self.assertFalse(out.getvalue())
  376. class test_control(AppCase):
  377. def control(self, patch_call, *args, **kwargs):
  378. kwargs.setdefault('app', Mock(name='app'))
  379. c = control(*args, **kwargs)
  380. if patch_call:
  381. c.call = Mock(name='control.call')
  382. return c
  383. def test_call(self):
  384. i = self.control(False)
  385. i.call('foo', 1, kw=2)
  386. i.app.control.foo.assert_called_with(1, kw=2, reply=True)
  387. def test_pool_grow(self):
  388. i = self.control(True)
  389. i.pool_grow('pool_grow', n=2)
  390. i.call.assert_called_with('pool_grow', 2)
  391. def test_pool_shrink(self):
  392. i = self.control(True)
  393. i.pool_shrink('pool_shrink', n=2)
  394. i.call.assert_called_with('pool_shrink', 2)
  395. def test_autoscale(self):
  396. i = self.control(True)
  397. i.autoscale('autoscale', max=3, min=2)
  398. i.call.assert_called_with('autoscale', 3, 2)
  399. def test_rate_limit(self):
  400. i = self.control(True)
  401. i.rate_limit('rate_limit', 'proj.add', '1/s')
  402. i.call.assert_called_with('rate_limit', 'proj.add', '1/s', reply=True)
  403. def test_time_limit(self):
  404. i = self.control(True)
  405. i.time_limit('time_limit', 'proj.add', 10, 30)
  406. i.call.assert_called_with('time_limit', 'proj.add', 10, 30, reply=True)
  407. def test_add_consumer(self):
  408. i = self.control(True)
  409. i.add_consumer(
  410. 'add_consumer', 'queue', 'exchange', 'topic', 'rkey',
  411. durable=True,
  412. )
  413. i.call.assert_called_with(
  414. 'add_consumer', 'queue', 'exchange', 'topic', 'rkey',
  415. durable=True, reply=True,
  416. )
  417. def test_cancel_consumer(self):
  418. i = self.control(True)
  419. i.cancel_consumer('cancel_consumer', 'queue')
  420. i.call.assert_called_with('cancel_consumer', 'queue', reply=True)
  421. class test_multi(AppCase):
  422. def test_get_options(self):
  423. self.assertTupleEqual(multi(app=self.app).get_options(), ())
  424. def test_run_from_argv(self):
  425. with patch('celery.bin.multi.MultiTool') as MultiTool:
  426. m = MultiTool.return_value = Mock()
  427. multi(self.app).run_from_argv('celery', ['arg'], command='multi')
  428. m.execute_from_commandline.assert_called_with(
  429. ['multi', 'arg'], 'celery',
  430. )
  431. class test_main(AppCase):
  432. @patch('celery.bin.celery.CeleryCommand')
  433. def test_main(self, Command):
  434. cmd = Command.return_value = Mock()
  435. mainfun()
  436. cmd.execute_from_commandline.assert_called_with(None)
  437. @patch('celery.bin.celery.CeleryCommand')
  438. def test_main_KeyboardInterrupt(self, Command):
  439. cmd = Command.return_value = Mock()
  440. cmd.execute_from_commandline.side_effect = KeyboardInterrupt()
  441. mainfun()
  442. cmd.execute_from_commandline.assert_called_with(None)
  443. class test_compat(AppCase):
  444. def test_compat_command_decorator(self):
  445. with patch('celery.bin.celery.CeleryCommand') as CC:
  446. self.assertEqual(command(), CC.register_command)
  447. fun = Mock(name='fun')
  448. command(fun)
  449. CC.register_command.assert_called_with(fun)