test_beat.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. from __future__ import absolute_import
  2. import errno
  3. from datetime import datetime, timedelta
  4. from mock import Mock, call, patch
  5. from nose import SkipTest
  6. from pickle import dumps, loads
  7. from celery import beat
  8. from celery.five import keys, string_t
  9. from celery.schedules import schedule
  10. from celery.utils import uuid
  11. from celery.tests.case import AppCase
  12. class Object(object):
  13. pass
  14. class MockShelve(dict):
  15. closed = False
  16. synced = False
  17. def close(self):
  18. self.closed = True
  19. def sync(self):
  20. self.synced = True
  21. class MockService(object):
  22. started = False
  23. stopped = False
  24. def __init__(self, *args, **kwargs):
  25. pass
  26. def start(self, **kwargs):
  27. self.started = True
  28. def stop(self, **kwargs):
  29. self.stopped = True
  30. class test_ScheduleEntry(AppCase):
  31. Entry = beat.ScheduleEntry
  32. def create_entry(self, **kwargs):
  33. entry = dict(
  34. name='celery.unittest.add',
  35. schedule=timedelta(seconds=10),
  36. args=(2, 2),
  37. options={'routing_key': 'cpu'},
  38. app=self.app,
  39. )
  40. return self.Entry(**dict(entry, **kwargs))
  41. def test_next(self):
  42. entry = self.create_entry(schedule=10)
  43. self.assertTrue(entry.last_run_at)
  44. self.assertIsInstance(entry.last_run_at, datetime)
  45. self.assertEqual(entry.total_run_count, 0)
  46. next_run_at = entry.last_run_at + timedelta(seconds=10)
  47. next_entry = entry.next(next_run_at)
  48. self.assertGreaterEqual(next_entry.last_run_at, next_run_at)
  49. self.assertEqual(next_entry.total_run_count, 1)
  50. def test_is_due(self):
  51. entry = self.create_entry(schedule=timedelta(seconds=10))
  52. self.assertIs(entry.app, self.app)
  53. self.assertIs(entry.schedule.app, self.app)
  54. due1, next_time_to_run1 = entry.is_due()
  55. self.assertFalse(due1)
  56. self.assertGreater(next_time_to_run1, 9)
  57. next_run_at = entry.last_run_at - timedelta(seconds=10)
  58. next_entry = entry.next(next_run_at)
  59. due2, next_time_to_run2 = next_entry.is_due()
  60. self.assertTrue(due2)
  61. self.assertGreater(next_time_to_run2, 9)
  62. def test_repr(self):
  63. entry = self.create_entry()
  64. self.assertIn('<Entry:', repr(entry))
  65. def test_update(self):
  66. entry = self.create_entry()
  67. self.assertEqual(entry.schedule, timedelta(seconds=10))
  68. self.assertTupleEqual(entry.args, (2, 2))
  69. self.assertDictEqual(entry.kwargs, {})
  70. self.assertDictEqual(entry.options, {'routing_key': 'cpu'})
  71. entry2 = self.create_entry(schedule=timedelta(minutes=20),
  72. args=(16, 16),
  73. kwargs={'callback': 'foo.bar.baz'},
  74. options={'routing_key': 'urgent'})
  75. entry.update(entry2)
  76. self.assertEqual(entry.schedule, schedule(timedelta(minutes=20)))
  77. self.assertTupleEqual(entry.args, (16, 16))
  78. self.assertDictEqual(entry.kwargs, {'callback': 'foo.bar.baz'})
  79. self.assertDictEqual(entry.options, {'routing_key': 'urgent'})
  80. class mScheduler(beat.Scheduler):
  81. def __init__(self, *args, **kwargs):
  82. self.sent = []
  83. beat.Scheduler.__init__(self, *args, **kwargs)
  84. def send_task(self, name=None, args=None, kwargs=None, **options):
  85. self.sent.append({'name': name,
  86. 'args': args,
  87. 'kwargs': kwargs,
  88. 'options': options})
  89. return self.app.AsyncResult(uuid())
  90. class mSchedulerSchedulingError(mScheduler):
  91. def send_task(self, *args, **kwargs):
  92. raise beat.SchedulingError('Could not apply task')
  93. class mSchedulerRuntimeError(mScheduler):
  94. def maybe_due(self, *args, **kwargs):
  95. raise RuntimeError('dict modified while itervalues')
  96. class mocked_schedule(schedule):
  97. def __init__(self, is_due, next_run_at):
  98. self._is_due = is_due
  99. self._next_run_at = next_run_at
  100. self.run_every = timedelta(seconds=1)
  101. self.nowfun = datetime.utcnow
  102. def is_due(self, last_run_at):
  103. return self._is_due, self._next_run_at
  104. always_due = mocked_schedule(True, 1)
  105. always_pending = mocked_schedule(False, 1)
  106. class test_Scheduler(AppCase):
  107. def test_custom_schedule_dict(self):
  108. custom = {'foo': 'bar'}
  109. scheduler = mScheduler(app=self.app, schedule=custom, lazy=True)
  110. self.assertIs(scheduler.data, custom)
  111. def test_apply_async_uses_registered_task_instances(self):
  112. @self.app.task(shared=False)
  113. def foo():
  114. pass
  115. foo.apply_async = Mock(name='foo.apply_async')
  116. assert foo.name in foo._get_app().tasks
  117. scheduler = mScheduler(app=self.app)
  118. scheduler.apply_async(scheduler.Entry(task=foo.name, app=self.app))
  119. self.assertTrue(foo.apply_async.called)
  120. def test_apply_async_should_not_sync(self):
  121. @self.app.task(shared=False)
  122. def not_sync():
  123. pass
  124. not_sync.apply_async = Mock()
  125. s = mScheduler(app=self.app)
  126. s._do_sync = Mock()
  127. s.should_sync = Mock()
  128. s.should_sync.return_value = True
  129. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  130. s._do_sync.assert_called_with()
  131. s._do_sync = Mock()
  132. s.should_sync.return_value = False
  133. s.apply_async(s.Entry(task=not_sync.name, app=self.app))
  134. self.assertFalse(s._do_sync.called)
  135. @patch('celery.app.base.Celery.send_task')
  136. def test_send_task(self, send_task):
  137. b = beat.Scheduler(app=self.app)
  138. b.send_task('tasks.add', countdown=10)
  139. send_task.assert_called_with('tasks.add', countdown=10)
  140. def test_info(self):
  141. scheduler = mScheduler(app=self.app)
  142. self.assertIsInstance(scheduler.info, string_t)
  143. def test_maybe_entry(self):
  144. s = mScheduler(app=self.app)
  145. entry = s.Entry(name='add every', task='tasks.add', app=self.app)
  146. self.assertIs(s._maybe_entry(entry.name, entry), entry)
  147. self.assertTrue(s._maybe_entry('add every', {
  148. 'task': 'tasks.add',
  149. }))
  150. def test_set_schedule(self):
  151. s = mScheduler(app=self.app)
  152. s.schedule = {'foo': 'bar'}
  153. self.assertEqual(s.data, {'foo': 'bar'})
  154. @patch('kombu.connection.Connection.ensure_connection')
  155. def test_ensure_connection_error_handler(self, ensure):
  156. s = mScheduler(app=self.app)
  157. self.assertTrue(s._ensure_connected())
  158. self.assertTrue(ensure.called)
  159. callback = ensure.call_args[0][0]
  160. callback(KeyError(), 5)
  161. def test_install_default_entries(self):
  162. self.app.conf.CELERY_TASK_RESULT_EXPIRES = None
  163. self.app.conf.CELERYBEAT_SCHEDULE = {}
  164. s = mScheduler(app=self.app)
  165. s.install_default_entries({})
  166. self.assertNotIn('celery.backend_cleanup', s.data)
  167. self.app.backend.supports_autoexpire = False
  168. self.app.conf.CELERY_TASK_RESULT_EXPIRES = 30
  169. s = mScheduler(app=self.app)
  170. s.install_default_entries({})
  171. self.assertIn('celery.backend_cleanup', s.data)
  172. self.app.backend.supports_autoexpire = True
  173. self.app.conf.CELERY_TASK_RESULT_EXPIRES = 31
  174. s = mScheduler(app=self.app)
  175. s.install_default_entries({})
  176. self.assertNotIn('celery.backend_cleanup', s.data)
  177. def test_due_tick(self):
  178. scheduler = mScheduler(app=self.app)
  179. scheduler.add(name='test_due_tick',
  180. schedule=always_due,
  181. args=(1, 2),
  182. kwargs={'foo': 'bar'})
  183. self.assertEqual(scheduler.tick(), 1)
  184. @patch('celery.beat.error')
  185. def test_due_tick_SchedulingError(self, error):
  186. scheduler = mSchedulerSchedulingError(app=self.app)
  187. scheduler.add(name='test_due_tick_SchedulingError',
  188. schedule=always_due)
  189. self.assertEqual(scheduler.tick(), 1)
  190. self.assertTrue(error.called)
  191. def test_due_tick_RuntimeError(self):
  192. scheduler = mSchedulerRuntimeError(app=self.app)
  193. scheduler.add(name='test_due_tick_RuntimeError',
  194. schedule=always_due)
  195. self.assertEqual(scheduler.tick(), scheduler.max_interval)
  196. def test_pending_tick(self):
  197. scheduler = mScheduler(app=self.app)
  198. scheduler.add(name='test_pending_tick',
  199. schedule=always_pending)
  200. self.assertEqual(scheduler.tick(), 1)
  201. def test_honors_max_interval(self):
  202. scheduler = mScheduler(app=self.app)
  203. maxi = scheduler.max_interval
  204. scheduler.add(name='test_honors_max_interval',
  205. schedule=mocked_schedule(False, maxi * 4))
  206. self.assertEqual(scheduler.tick(), maxi)
  207. def test_ticks(self):
  208. scheduler = mScheduler(app=self.app)
  209. nums = [600, 300, 650, 120, 250, 36]
  210. s = dict(('test_ticks%s' % i,
  211. {'schedule': mocked_schedule(False, j)})
  212. for i, j in enumerate(nums))
  213. scheduler.update_from_dict(s)
  214. self.assertEqual(scheduler.tick(), min(nums))
  215. def test_schedule_no_remain(self):
  216. scheduler = mScheduler(app=self.app)
  217. scheduler.add(name='test_schedule_no_remain',
  218. schedule=mocked_schedule(False, None))
  219. self.assertEqual(scheduler.tick(), scheduler.max_interval)
  220. def test_interface(self):
  221. scheduler = mScheduler(app=self.app)
  222. scheduler.sync()
  223. scheduler.setup_schedule()
  224. scheduler.close()
  225. def test_merge_inplace(self):
  226. a = mScheduler(app=self.app)
  227. b = mScheduler(app=self.app)
  228. a.update_from_dict({'foo': {'schedule': mocked_schedule(True, 10)},
  229. 'bar': {'schedule': mocked_schedule(True, 20)}})
  230. b.update_from_dict({'bar': {'schedule': mocked_schedule(True, 40)},
  231. 'baz': {'schedule': mocked_schedule(True, 10)}})
  232. a.merge_inplace(b.schedule)
  233. self.assertNotIn('foo', a.schedule)
  234. self.assertIn('baz', a.schedule)
  235. self.assertEqual(a.schedule['bar'].schedule._next_run_at, 40)
  236. def create_persistent_scheduler(shelv=None):
  237. if shelv is None:
  238. shelv = MockShelve()
  239. class MockPersistentScheduler(beat.PersistentScheduler):
  240. sh = shelv
  241. persistence = Object()
  242. persistence.open = lambda *a, **kw: shelv
  243. tick_raises_exit = False
  244. shutdown_service = None
  245. def tick(self):
  246. if self.tick_raises_exit:
  247. raise SystemExit()
  248. if self.shutdown_service:
  249. self.shutdown_service._is_shutdown.set()
  250. return 0.0
  251. return MockPersistentScheduler, shelv
  252. class test_PersistentScheduler(AppCase):
  253. @patch('os.remove')
  254. def test_remove_db(self, remove):
  255. s = create_persistent_scheduler()[0](app=self.app,
  256. schedule_filename='schedule')
  257. s._remove_db()
  258. remove.assert_has_calls(
  259. [call('schedule' + suffix) for suffix in s.known_suffixes]
  260. )
  261. err = OSError()
  262. err.errno = errno.ENOENT
  263. remove.side_effect = err
  264. s._remove_db()
  265. err.errno = errno.EPERM
  266. with self.assertRaises(OSError):
  267. s._remove_db()
  268. def test_setup_schedule(self):
  269. s = create_persistent_scheduler()[0](app=self.app,
  270. schedule_filename='schedule')
  271. opens = s.persistence.open = Mock()
  272. s._remove_db = Mock()
  273. def effect(*args, **kwargs):
  274. if opens.call_count > 1:
  275. return s.sh
  276. raise OSError()
  277. opens.side_effect = effect
  278. s.setup_schedule()
  279. s._remove_db.assert_called_with()
  280. s._store = {'__version__': 1}
  281. s.setup_schedule()
  282. s._store.clear = Mock()
  283. op = s.persistence.open = Mock()
  284. op.return_value = s._store
  285. s._store['tz'] = 'FUNKY'
  286. s.setup_schedule()
  287. op.assert_called_with(s.schedule_filename, writeback=True)
  288. s._store.clear.assert_called_with()
  289. s._store['utc_enabled'] = False
  290. s._store.clear = Mock()
  291. s.setup_schedule()
  292. s._store.clear.assert_called_with()
  293. def test_get_schedule(self):
  294. s = create_persistent_scheduler()[0](
  295. schedule_filename='schedule', app=self.app,
  296. )
  297. s._store = {'entries': {}}
  298. s.schedule = {'foo': 'bar'}
  299. self.assertDictEqual(s.schedule, {'foo': 'bar'})
  300. self.assertDictEqual(s._store['entries'], s.schedule)
  301. class test_Service(AppCase):
  302. def get_service(self):
  303. Scheduler, mock_shelve = create_persistent_scheduler()
  304. return beat.Service(app=self.app, scheduler_cls=Scheduler), mock_shelve
  305. def test_pickleable(self):
  306. s = beat.Service(app=self.app, scheduler_cls=Mock)
  307. self.assertTrue(loads(dumps(s)))
  308. def test_start(self):
  309. s, sh = self.get_service()
  310. schedule = s.scheduler.schedule
  311. self.assertIsInstance(schedule, dict)
  312. self.assertIsInstance(s.scheduler, beat.Scheduler)
  313. scheduled = list(schedule.keys())
  314. for task_name in keys(sh['entries']):
  315. self.assertIn(task_name, scheduled)
  316. s.sync()
  317. self.assertTrue(sh.closed)
  318. self.assertTrue(sh.synced)
  319. self.assertTrue(s._is_stopped.isSet())
  320. s.sync()
  321. s.stop(wait=False)
  322. self.assertTrue(s._is_shutdown.isSet())
  323. s.stop(wait=True)
  324. self.assertTrue(s._is_shutdown.isSet())
  325. p = s.scheduler._store
  326. s.scheduler._store = None
  327. try:
  328. s.scheduler.sync()
  329. finally:
  330. s.scheduler._store = p
  331. def test_start_embedded_process(self):
  332. s, sh = self.get_service()
  333. s._is_shutdown.set()
  334. s.start(embedded_process=True)
  335. def test_start_thread(self):
  336. s, sh = self.get_service()
  337. s._is_shutdown.set()
  338. s.start(embedded_process=False)
  339. def test_start_tick_raises_exit_error(self):
  340. s, sh = self.get_service()
  341. s.scheduler.tick_raises_exit = True
  342. s.start()
  343. self.assertTrue(s._is_shutdown.isSet())
  344. def test_start_manages_one_tick_before_shutdown(self):
  345. s, sh = self.get_service()
  346. s.scheduler.shutdown_service = s
  347. s.start()
  348. self.assertTrue(s._is_shutdown.isSet())
  349. class test_EmbeddedService(AppCase):
  350. def test_start_stop_process(self):
  351. try:
  352. import _multiprocessing # noqa
  353. except ImportError:
  354. raise SkipTest('multiprocessing not available')
  355. from billiard.process import Process
  356. s = beat.EmbeddedService(app=self.app)
  357. self.assertIsInstance(s, Process)
  358. self.assertIsInstance(s.service, beat.Service)
  359. s.service = MockService()
  360. class _Popen(object):
  361. terminated = False
  362. def terminate(self):
  363. self.terminated = True
  364. s.run()
  365. self.assertTrue(s.service.started)
  366. s._popen = _Popen()
  367. s.stop()
  368. self.assertTrue(s.service.stopped)
  369. self.assertTrue(s._popen.terminated)
  370. def test_start_stop_threaded(self):
  371. s = beat.EmbeddedService(thread=True, app=self.app)
  372. from threading import Thread
  373. self.assertIsInstance(s, Thread)
  374. self.assertIsInstance(s.service, beat.Service)
  375. s.service = MockService()
  376. s.run()
  377. self.assertTrue(s.service.started)
  378. s.stop()
  379. self.assertTrue(s.service.stopped)
  380. class test_schedule(AppCase):
  381. def test_maybe_make_aware(self):
  382. x = schedule(10, app=self.app)
  383. x.utc_enabled = True
  384. d = x.maybe_make_aware(datetime.utcnow())
  385. self.assertTrue(d.tzinfo)
  386. x.utc_enabled = False
  387. d2 = x.maybe_make_aware(datetime.utcnow())
  388. self.assertIsNone(d2.tzinfo)
  389. def test_to_local(self):
  390. x = schedule(10, app=self.app)
  391. x.utc_enabled = True
  392. d = x.to_local(datetime.utcnow())
  393. self.assertIsNone(d.tzinfo)
  394. x.utc_enabled = False
  395. d = x.to_local(datetime.utcnow())
  396. self.assertTrue(d.tzinfo)