test_app.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660
  1. from __future__ import absolute_import
  2. import os
  3. import itertools
  4. from copy import deepcopy
  5. from mock import Mock, patch
  6. from pickle import loads, dumps
  7. from kombu import Exchange
  8. from celery import shared_task, current_app
  9. from celery import app as _app
  10. from celery import _state
  11. from celery.app import base as _appbase
  12. from celery.app import defaults
  13. from celery.exceptions import ImproperlyConfigured
  14. from celery.five import items
  15. from celery.loaders.base import BaseLoader
  16. from celery.platforms import pyimplementation
  17. from celery.utils.serialization import pickle
  18. from celery.tests.case import (
  19. CELERY_TEST_CONFIG,
  20. AppCase,
  21. depends_on_current_app,
  22. mask_modules,
  23. platform_pyimp,
  24. sys_platform,
  25. pypy_version,
  26. with_environ,
  27. )
  28. from celery.utils import uuid
  29. from celery.utils.mail import ErrorMail
  30. THIS_IS_A_KEY = 'this is a value'
  31. class ObjectConfig(object):
  32. FOO = 1
  33. BAR = 2
  34. object_config = ObjectConfig()
  35. dict_config = dict(FOO=10, BAR=20)
  36. class Object(object):
  37. def __init__(self, **kwargs):
  38. for key, value in items(kwargs):
  39. setattr(self, key, value)
  40. def _get_test_config():
  41. return deepcopy(CELERY_TEST_CONFIG)
  42. test_config = _get_test_config()
  43. class test_module(AppCase):
  44. def test_default_app(self):
  45. self.assertEqual(_app.default_app, _state.default_app)
  46. def test_bugreport(self):
  47. self.assertTrue(_app.bugreport(app=self.app))
  48. class test_App(AppCase):
  49. def setup(self):
  50. self.app.add_defaults(test_config)
  51. def test_task(self):
  52. with self.Celery('foozibari') as app:
  53. def fun():
  54. pass
  55. fun.__module__ = '__main__'
  56. task = app.task(fun)
  57. self.assertEqual(task.name, app.main + '.fun')
  58. def test_with_config_source(self):
  59. with self.Celery(config_source=ObjectConfig) as app:
  60. self.assertEqual(app.conf.FOO, 1)
  61. self.assertEqual(app.conf.BAR, 2)
  62. @depends_on_current_app
  63. def test_task_windows_execv(self):
  64. prev, _appbase._EXECV = _appbase._EXECV, True
  65. try:
  66. @self.app.task(shared=False)
  67. def foo():
  68. pass
  69. self.assertTrue(foo._get_current_object()) # is proxy
  70. finally:
  71. _appbase._EXECV = prev
  72. assert not _appbase._EXECV
  73. def test_task_takes_no_args(self):
  74. with self.assertRaises(TypeError):
  75. @self.app.task(1)
  76. def foo():
  77. pass
  78. def test_add_defaults(self):
  79. self.assertFalse(self.app.configured)
  80. _conf = {'FOO': 300}
  81. conf = lambda: _conf
  82. self.app.add_defaults(conf)
  83. self.assertIn(conf, self.app._pending_defaults)
  84. self.assertFalse(self.app.configured)
  85. self.assertEqual(self.app.conf.FOO, 300)
  86. self.assertTrue(self.app.configured)
  87. self.assertFalse(self.app._pending_defaults)
  88. # defaults not pickled
  89. appr = loads(dumps(self.app))
  90. with self.assertRaises(AttributeError):
  91. appr.conf.FOO
  92. # add more defaults after configured
  93. conf2 = {'FOO': 'BAR'}
  94. self.app.add_defaults(conf2)
  95. self.assertEqual(self.app.conf.FOO, 'BAR')
  96. self.assertIn(_conf, self.app.conf.defaults)
  97. self.assertIn(conf2, self.app.conf.defaults)
  98. def test_connection_or_acquire(self):
  99. with self.app.connection_or_acquire(block=True):
  100. self.assertTrue(self.app.pool._dirty)
  101. with self.app.connection_or_acquire(pool=False):
  102. self.assertFalse(self.app.pool._dirty)
  103. def test_maybe_close_pool(self):
  104. cpool = self.app._pool = Mock()
  105. ppool = self.app.amqp._producer_pool = Mock()
  106. self.app._maybe_close_pool()
  107. cpool.force_close_all.assert_called_with()
  108. ppool.force_close_all.assert_called_with()
  109. self.assertIsNone(self.app._pool)
  110. self.assertIsNone(self.app.amqp._producer_pool)
  111. self.app._pool = Mock()
  112. self.app._maybe_close_pool()
  113. self.app._maybe_close_pool()
  114. def test_using_v1_reduce(self):
  115. self.app._using_v1_reduce = True
  116. self.assertTrue(loads(dumps(self.app)))
  117. def test_autodiscover_tasks(self):
  118. self.app.conf.CELERY_FORCE_BILLIARD_LOGGING = True
  119. with patch('celery.app.base.ensure_process_aware_logger') as ep:
  120. self.app.loader.autodiscover_tasks = Mock()
  121. self.app.autodiscover_tasks(['proj.A', 'proj.B'])
  122. ep.assert_called_with()
  123. self.app.loader.autodiscover_tasks.assert_called_with(
  124. ['proj.A', 'proj.B'], 'tasks',
  125. )
  126. with patch('celery.app.base.ensure_process_aware_logger') as ep:
  127. self.app.conf.CELERY_FORCE_BILLIARD_LOGGING = False
  128. self.app.autodiscover_tasks(['proj.A', 'proj.B'])
  129. self.assertFalse(ep.called)
  130. @with_environ('CELERY_BROKER_URL', '')
  131. def test_with_broker(self):
  132. with self.Celery(broker='foo://baribaz') as app:
  133. self.assertEqual(app.conf.BROKER_URL, 'foo://baribaz')
  134. def test_repr(self):
  135. self.assertTrue(repr(self.app))
  136. def test_custom_task_registry(self):
  137. with self.Celery(tasks=self.app.tasks) as app2:
  138. self.assertIs(app2.tasks, self.app.tasks)
  139. def test_include_argument(self):
  140. with self.Celery(include=('foo', 'bar.foo')) as app:
  141. self.assertEqual(app.conf.CELERY_IMPORTS, ('foo', 'bar.foo'))
  142. def test_set_as_current(self):
  143. current = _state._tls.current_app
  144. try:
  145. app = self.Celery(set_as_current=True)
  146. self.assertIs(_state._tls.current_app, app)
  147. finally:
  148. _state._tls.current_app = current
  149. def test_current_task(self):
  150. @self.app.task
  151. def foo(shared=False):
  152. pass
  153. _state._task_stack.push(foo)
  154. try:
  155. self.assertEqual(self.app.current_task.name, foo.name)
  156. finally:
  157. _state._task_stack.pop()
  158. def test_task_not_shared(self):
  159. with patch('celery.app.base.shared_task') as sh:
  160. @self.app.task(shared=False)
  161. def foo():
  162. pass
  163. self.assertFalse(sh.called)
  164. def test_task_compat_with_filter(self):
  165. with self.Celery(accept_magic_kwargs=True) as app:
  166. check = Mock()
  167. def filter(task):
  168. check(task)
  169. return task
  170. @app.task(filter=filter, shared=False)
  171. def foo():
  172. pass
  173. check.assert_called_with(foo)
  174. def test_task_with_filter(self):
  175. with self.Celery(accept_magic_kwargs=False) as app:
  176. check = Mock()
  177. def filter(task):
  178. check(task)
  179. return task
  180. assert not _appbase._EXECV
  181. @app.task(filter=filter, shared=False)
  182. def foo():
  183. pass
  184. check.assert_called_with(foo)
  185. def test_task_sets_main_name_MP_MAIN_FILE(self):
  186. from celery import utils as _utils
  187. _utils.MP_MAIN_FILE = __file__
  188. try:
  189. with self.Celery('xuzzy') as app:
  190. @app.task
  191. def foo():
  192. pass
  193. self.assertEqual(foo.name, 'xuzzy.foo')
  194. finally:
  195. _utils.MP_MAIN_FILE = None
  196. def test_annotate_decorator(self):
  197. from celery.app.task import Task
  198. class adX(Task):
  199. abstract = True
  200. def run(self, y, z, x):
  201. return y, z, x
  202. check = Mock()
  203. def deco(fun):
  204. def _inner(*args, **kwargs):
  205. check(*args, **kwargs)
  206. return fun(*args, **kwargs)
  207. return _inner
  208. self.app.conf.CELERY_ANNOTATIONS = {
  209. adX.name: {'@__call__': deco}
  210. }
  211. adX.bind(self.app)
  212. self.assertIs(adX.app, self.app)
  213. i = adX()
  214. i(2, 4, x=3)
  215. check.assert_called_with(i, 2, 4, x=3)
  216. i.annotate()
  217. i.annotate()
  218. def test_apply_async_has__self__(self):
  219. @self.app.task(__self__='hello', shared=False)
  220. def aawsX():
  221. pass
  222. with patch('celery.app.amqp.TaskProducer.publish_task') as dt:
  223. aawsX.apply_async((4, 5))
  224. args = dt.call_args[0][1]
  225. self.assertEqual(args, ('hello', 4, 5))
  226. def test_apply_async_adds_children(self):
  227. from celery._state import _task_stack
  228. @self.app.task(shared=False)
  229. def a3cX1(self):
  230. pass
  231. @self.app.task(shared=False)
  232. def a3cX2(self):
  233. pass
  234. _task_stack.push(a3cX1)
  235. try:
  236. a3cX1.push_request(called_directly=False)
  237. try:
  238. res = a3cX2.apply_async(add_to_parent=True)
  239. self.assertIn(res, a3cX1.request.children)
  240. finally:
  241. a3cX1.pop_request()
  242. finally:
  243. _task_stack.pop()
  244. def test_pickle_app(self):
  245. changes = dict(THE_FOO_BAR='bars',
  246. THE_MII_MAR='jars')
  247. self.app.conf.update(changes)
  248. saved = pickle.dumps(self.app)
  249. self.assertLess(len(saved), 2048)
  250. restored = pickle.loads(saved)
  251. self.assertDictContainsSubset(changes, restored.conf)
  252. def test_worker_main(self):
  253. from celery.bin import worker as worker_bin
  254. class worker(worker_bin.worker):
  255. def execute_from_commandline(self, argv):
  256. return argv
  257. prev, worker_bin.worker = worker_bin.worker, worker
  258. try:
  259. ret = self.app.worker_main(argv=['--version'])
  260. self.assertListEqual(ret, ['--version'])
  261. finally:
  262. worker_bin.worker = prev
  263. def test_config_from_envvar(self):
  264. os.environ['CELERYTEST_CONFIG_OBJECT'] = 'celery.tests.app.test_app'
  265. self.app.config_from_envvar('CELERYTEST_CONFIG_OBJECT')
  266. self.assertEqual(self.app.conf.THIS_IS_A_KEY, 'this is a value')
  267. def test_config_from_object(self):
  268. class Object(object):
  269. LEAVE_FOR_WORK = True
  270. MOMENT_TO_STOP = True
  271. CALL_ME_BACK = 123456789
  272. WANT_ME_TO = False
  273. UNDERSTAND_ME = True
  274. self.app.config_from_object(Object())
  275. self.assertTrue(self.app.conf.LEAVE_FOR_WORK)
  276. self.assertTrue(self.app.conf.MOMENT_TO_STOP)
  277. self.assertEqual(self.app.conf.CALL_ME_BACK, 123456789)
  278. self.assertFalse(self.app.conf.WANT_ME_TO)
  279. self.assertTrue(self.app.conf.UNDERSTAND_ME)
  280. def test_config_from_cmdline(self):
  281. cmdline = ['.always_eager=no',
  282. '.result_backend=/dev/null',
  283. 'celeryd.prefetch_multiplier=368',
  284. '.foobarstring=(string)300',
  285. '.foobarint=(int)300',
  286. '.result_engine_options=(dict){"foo": "bar"}']
  287. self.app.config_from_cmdline(cmdline, namespace='celery')
  288. self.assertFalse(self.app.conf.CELERY_ALWAYS_EAGER)
  289. self.assertEqual(self.app.conf.CELERY_RESULT_BACKEND, '/dev/null')
  290. self.assertEqual(self.app.conf.CELERYD_PREFETCH_MULTIPLIER, 368)
  291. self.assertEqual(self.app.conf.CELERY_FOOBARSTRING, '300')
  292. self.assertEqual(self.app.conf.CELERY_FOOBARINT, 300)
  293. self.assertDictEqual(self.app.conf.CELERY_RESULT_ENGINE_OPTIONS,
  294. {'foo': 'bar'})
  295. def test_compat_setting_CELERY_BACKEND(self):
  296. self.app.config_from_object(Object(CELERY_BACKEND='set_by_us'))
  297. self.assertEqual(self.app.conf.CELERY_RESULT_BACKEND, 'set_by_us')
  298. def test_setting_BROKER_TRANSPORT_OPTIONS(self):
  299. _args = {'foo': 'bar', 'spam': 'baz'}
  300. self.app.config_from_object(Object())
  301. self.assertEqual(self.app.conf.BROKER_TRANSPORT_OPTIONS, {})
  302. self.app.config_from_object(Object(BROKER_TRANSPORT_OPTIONS=_args))
  303. self.assertEqual(self.app.conf.BROKER_TRANSPORT_OPTIONS, _args)
  304. def test_Windows_log_color_disabled(self):
  305. self.app.IS_WINDOWS = True
  306. self.assertFalse(self.app.log.supports_color(True))
  307. def test_compat_setting_CARROT_BACKEND(self):
  308. self.app.config_from_object(Object(CARROT_BACKEND='set_by_us'))
  309. self.assertEqual(self.app.conf.BROKER_TRANSPORT, 'set_by_us')
  310. def test_WorkController(self):
  311. x = self.app.WorkController
  312. self.assertIs(x.app, self.app)
  313. def test_Worker(self):
  314. x = self.app.Worker
  315. self.assertIs(x.app, self.app)
  316. @depends_on_current_app
  317. def test_AsyncResult(self):
  318. x = self.app.AsyncResult('1')
  319. self.assertIs(x.app, self.app)
  320. r = loads(dumps(x))
  321. # not set as current, so ends up as default app after reduce
  322. self.assertIs(r.app, current_app._get_current_object())
  323. def test_get_active_apps(self):
  324. self.assertTrue(list(_state._get_active_apps()))
  325. app1 = self.Celery()
  326. appid = id(app1)
  327. self.assertIn(app1, _state._get_active_apps())
  328. app1.close()
  329. del(app1)
  330. # weakref removed from list when app goes out of scope.
  331. with self.assertRaises(StopIteration):
  332. next(app for app in _state._get_active_apps() if id(app) == appid)
  333. def test_config_from_envvar_more(self, key='CELERY_HARNESS_CFG1'):
  334. self.assertFalse(self.app.config_from_envvar('HDSAJIHWIQHEWQU',
  335. silent=True))
  336. with self.assertRaises(ImproperlyConfigured):
  337. self.app.config_from_envvar('HDSAJIHWIQHEWQU', silent=False)
  338. os.environ[key] = __name__ + '.object_config'
  339. self.assertTrue(self.app.config_from_envvar(key))
  340. self.assertEqual(self.app.conf['FOO'], 1)
  341. self.assertEqual(self.app.conf['BAR'], 2)
  342. os.environ[key] = 'unknown_asdwqe.asdwqewqe'
  343. with self.assertRaises(ImportError):
  344. self.app.config_from_envvar(key, silent=False)
  345. self.assertFalse(self.app.config_from_envvar(key, silent=True))
  346. os.environ[key] = __name__ + '.dict_config'
  347. self.assertTrue(self.app.config_from_envvar(key))
  348. self.assertEqual(self.app.conf['FOO'], 10)
  349. self.assertEqual(self.app.conf['BAR'], 20)
  350. @patch('celery.bin.celery.CeleryCommand.execute_from_commandline')
  351. def test_start(self, execute):
  352. self.app.start()
  353. self.assertTrue(execute.called)
  354. def test_mail_admins(self):
  355. class Loader(BaseLoader):
  356. def mail_admins(*args, **kwargs):
  357. return args, kwargs
  358. self.app.loader = Loader(app=self.app)
  359. self.app.conf.ADMINS = None
  360. self.assertFalse(self.app.mail_admins('Subject', 'Body'))
  361. self.app.conf.ADMINS = [('George Costanza', 'george@vandelay.com')]
  362. self.assertTrue(self.app.mail_admins('Subject', 'Body'))
  363. def test_amqp_get_broker_info(self):
  364. self.assertDictContainsSubset(
  365. {'hostname': 'localhost',
  366. 'userid': 'guest',
  367. 'password': 'guest',
  368. 'virtual_host': '/'},
  369. self.app.connection('pyamqp://').info(),
  370. )
  371. self.app.conf.BROKER_PORT = 1978
  372. self.app.conf.BROKER_VHOST = 'foo'
  373. self.assertDictContainsSubset(
  374. {'port': 1978, 'virtual_host': 'foo'},
  375. self.app.connection('pyamqp://:1978/foo').info(),
  376. )
  377. conn = self.app.connection('pyamqp:////value')
  378. self.assertDictContainsSubset({'virtual_host': '/value'},
  379. conn.info())
  380. def test_amqp_failover_strategy_selection(self):
  381. # Test passing in a string and make sure the string
  382. # gets there untouched
  383. self.app.conf.BROKER_FAILOVER_STRATEGY = 'foo-bar'
  384. self.assertEquals(
  385. self.app.connection('amqp:////value').failover_strategy,
  386. 'foo-bar',
  387. )
  388. # Try passing in None
  389. self.app.conf.BROKER_FAILOVER_STRATEGY = None
  390. self.assertEquals(
  391. self.app.connection('amqp:////value').failover_strategy,
  392. itertools.cycle,
  393. )
  394. # Test passing in a method
  395. def my_failover_strategy(it):
  396. yield True
  397. self.app.conf.BROKER_FAILOVER_STRATEGY = my_failover_strategy
  398. self.assertEquals(
  399. self.app.connection('amqp:////value').failover_strategy,
  400. my_failover_strategy,
  401. )
  402. def test_BROKER_BACKEND_alias(self):
  403. self.assertEqual(self.app.conf.BROKER_BACKEND,
  404. self.app.conf.BROKER_TRANSPORT)
  405. def test_after_fork(self):
  406. p = self.app._pool = Mock()
  407. self.app._after_fork(self.app)
  408. p.force_close_all.assert_called_with()
  409. self.assertIsNone(self.app._pool)
  410. self.app._after_fork(self.app)
  411. def test_pool_no_multiprocessing(self):
  412. with mask_modules('multiprocessing.util'):
  413. pool = self.app.pool
  414. self.assertIs(pool, self.app._pool)
  415. def test_bugreport(self):
  416. self.assertTrue(self.app.bugreport())
  417. def test_send_task_sent_event(self):
  418. class Dispatcher(object):
  419. sent = []
  420. def publish(self, type, fields, *args, **kwargs):
  421. self.sent.append((type, fields))
  422. conn = self.app.connection()
  423. chan = conn.channel()
  424. try:
  425. for e in ('foo_exchange', 'moo_exchange', 'bar_exchange'):
  426. chan.exchange_declare(e, 'direct', durable=True)
  427. chan.queue_declare(e, durable=True)
  428. chan.queue_bind(e, e, e)
  429. finally:
  430. chan.close()
  431. assert conn.transport_cls == 'memory'
  432. prod = self.app.amqp.TaskProducer(
  433. conn, exchange=Exchange('foo_exchange'),
  434. send_sent_event=True,
  435. )
  436. dispatcher = Dispatcher()
  437. self.assertTrue(prod.publish_task('footask', (), {},
  438. exchange='moo_exchange',
  439. routing_key='moo_exchange',
  440. event_dispatcher=dispatcher))
  441. self.assertTrue(dispatcher.sent)
  442. self.assertEqual(dispatcher.sent[0][0], 'task-sent')
  443. self.assertTrue(prod.publish_task('footask', (), {},
  444. event_dispatcher=dispatcher,
  445. exchange='bar_exchange',
  446. routing_key='bar_exchange'))
  447. def test_error_mail_sender(self):
  448. x = ErrorMail.subject % {'name': 'task_name',
  449. 'id': uuid(),
  450. 'exc': 'FOOBARBAZ',
  451. 'hostname': 'lana'}
  452. self.assertTrue(x)
  453. def test_error_mail_disabled(self):
  454. task = Mock()
  455. x = ErrorMail(task)
  456. x.should_send = Mock()
  457. x.should_send.return_value = False
  458. x.send(Mock(), Mock())
  459. self.assertFalse(task.app.mail_admins.called)
  460. class test_defaults(AppCase):
  461. def test_str_to_bool(self):
  462. for s in ('false', 'no', '0'):
  463. self.assertFalse(defaults.strtobool(s))
  464. for s in ('true', 'yes', '1'):
  465. self.assertTrue(defaults.strtobool(s))
  466. with self.assertRaises(TypeError):
  467. defaults.strtobool('unsure')
  468. class test_debugging_utils(AppCase):
  469. def test_enable_disable_trace(self):
  470. try:
  471. _app.enable_trace()
  472. self.assertEqual(_app.app_or_default, _app._app_or_default_trace)
  473. _app.disable_trace()
  474. self.assertEqual(_app.app_or_default, _app._app_or_default)
  475. finally:
  476. _app.disable_trace()
  477. class test_pyimplementation(AppCase):
  478. def test_platform_python_implementation(self):
  479. with platform_pyimp(lambda: 'Xython'):
  480. self.assertEqual(pyimplementation(), 'Xython')
  481. def test_platform_jython(self):
  482. with platform_pyimp():
  483. with sys_platform('java 1.6.51'):
  484. self.assertIn('Jython', pyimplementation())
  485. def test_platform_pypy(self):
  486. with platform_pyimp():
  487. with sys_platform('darwin'):
  488. with pypy_version((1, 4, 3)):
  489. self.assertIn('PyPy', pyimplementation())
  490. with pypy_version((1, 4, 3, 'a4')):
  491. self.assertIn('PyPy', pyimplementation())
  492. def test_platform_fallback(self):
  493. with platform_pyimp():
  494. with sys_platform('darwin'):
  495. with pypy_version():
  496. self.assertEqual('CPython', pyimplementation())
  497. class test_shared_task(AppCase):
  498. def test_registers_to_all_apps(self):
  499. with self.Celery('xproj', set_as_current=True) as xproj:
  500. xproj.finalize()
  501. @shared_task
  502. def foo():
  503. return 42
  504. @shared_task()
  505. def bar():
  506. return 84
  507. self.assertIs(foo.app, xproj)
  508. self.assertIs(bar.app, xproj)
  509. self.assertTrue(foo._get_current_object())
  510. with self.Celery('yproj', set_as_current=True) as yproj:
  511. self.assertIs(foo.app, yproj)
  512. self.assertIs(bar.app, yproj)
  513. @shared_task()
  514. def baz():
  515. return 168
  516. self.assertIs(baz.app, yproj)