from __future__ import absolute_import import errno from datetime import datetime, timedelta from mock import Mock, call, patch from nose import SkipTest from pickle import dumps, loads from celery import beat from celery.five import keys, string_t from celery.schedules import schedule from celery.utils import uuid from celery.tests.case import AppCase class Object(object): pass class MockShelve(dict): closed = False synced = False def close(self): self.closed = True def sync(self): self.synced = True class MockService(object): started = False stopped = False def __init__(self, *args, **kwargs): pass def start(self, **kwargs): self.started = True def stop(self, **kwargs): self.stopped = True class test_ScheduleEntry(AppCase): Entry = beat.ScheduleEntry def create_entry(self, **kwargs): entry = dict( name='celery.unittest.add', schedule=timedelta(seconds=10), args=(2, 2), options={'routing_key': 'cpu'}, app=self.app, ) return self.Entry(**dict(entry, **kwargs)) def test_next(self): entry = self.create_entry(schedule=10) self.assertTrue(entry.last_run_at) self.assertIsInstance(entry.last_run_at, datetime) self.assertEqual(entry.total_run_count, 0) next_run_at = entry.last_run_at + timedelta(seconds=10) next_entry = entry.next(next_run_at) self.assertGreaterEqual(next_entry.last_run_at, next_run_at) self.assertEqual(next_entry.total_run_count, 1) def test_is_due(self): entry = self.create_entry(schedule=timedelta(seconds=10)) self.assertIs(entry.app, self.app) self.assertIs(entry.schedule.app, self.app) due1, next_time_to_run1 = entry.is_due() self.assertFalse(due1) self.assertGreater(next_time_to_run1, 9) next_run_at = entry.last_run_at - timedelta(seconds=10) next_entry = entry.next(next_run_at) due2, next_time_to_run2 = next_entry.is_due() self.assertTrue(due2) self.assertGreater(next_time_to_run2, 9) def test_repr(self): entry = self.create_entry() self.assertIn(' 1: return s.sh raise OSError() opens.side_effect = effect s.setup_schedule() s._remove_db.assert_called_with() s._store = {'__version__': 1} s.setup_schedule() s._store.clear = Mock() op = s.persistence.open = Mock() op.return_value = s._store s._store['tz'] = 'FUNKY' s.setup_schedule() op.assert_called_with(s.schedule_filename, writeback=True) s._store.clear.assert_called_with() s._store['utc_enabled'] = False s._store.clear = Mock() s.setup_schedule() s._store.clear.assert_called_with() def test_get_schedule(self): s = create_persistent_scheduler()[0]( schedule_filename='schedule', app=self.app, ) s._store = {'entries': {}} s.schedule = {'foo': 'bar'} self.assertDictEqual(s.schedule, {'foo': 'bar'}) self.assertDictEqual(s._store['entries'], s.schedule) class test_Service(AppCase): def get_service(self): Scheduler, mock_shelve = create_persistent_scheduler() return beat.Service(app=self.app, scheduler_cls=Scheduler), mock_shelve def test_pickleable(self): s = beat.Service(app=self.app, scheduler_cls=Mock) self.assertTrue(loads(dumps(s))) def test_start(self): s, sh = self.get_service() schedule = s.scheduler.schedule self.assertIsInstance(schedule, dict) self.assertIsInstance(s.scheduler, beat.Scheduler) scheduled = list(schedule.keys()) for task_name in keys(sh['entries']): self.assertIn(task_name, scheduled) s.sync() self.assertTrue(sh.closed) self.assertTrue(sh.synced) self.assertTrue(s._is_stopped.isSet()) s.sync() s.stop(wait=False) self.assertTrue(s._is_shutdown.isSet()) s.stop(wait=True) self.assertTrue(s._is_shutdown.isSet()) p = s.scheduler._store s.scheduler._store = None try: s.scheduler.sync() finally: s.scheduler._store = p def test_start_embedded_process(self): s, sh = self.get_service() s._is_shutdown.set() s.start(embedded_process=True) def test_start_thread(self): s, sh = self.get_service() s._is_shutdown.set() s.start(embedded_process=False) def test_start_tick_raises_exit_error(self): s, sh = self.get_service() s.scheduler.tick_raises_exit = True s.start() self.assertTrue(s._is_shutdown.isSet()) def test_start_manages_one_tick_before_shutdown(self): s, sh = self.get_service() s.scheduler.shutdown_service = s s.start() self.assertTrue(s._is_shutdown.isSet()) class test_EmbeddedService(AppCase): def test_start_stop_process(self): try: import _multiprocessing # noqa except ImportError: raise SkipTest('multiprocessing not available') from billiard.process import Process s = beat.EmbeddedService(app=self.app) self.assertIsInstance(s, Process) self.assertIsInstance(s.service, beat.Service) s.service = MockService() class _Popen(object): terminated = False def terminate(self): self.terminated = True s.run() self.assertTrue(s.service.started) s._popen = _Popen() s.stop() self.assertTrue(s.service.stopped) self.assertTrue(s._popen.terminated) def test_start_stop_threaded(self): s = beat.EmbeddedService(thread=True, app=self.app) from threading import Thread self.assertIsInstance(s, Thread) self.assertIsInstance(s.service, beat.Service) s.service = MockService() s.run() self.assertTrue(s.service.started) s.stop() self.assertTrue(s.service.stopped) class test_schedule(AppCase): def test_maybe_make_aware(self): x = schedule(10, app=self.app) x.utc_enabled = True d = x.maybe_make_aware(datetime.utcnow()) self.assertTrue(d.tzinfo) x.utc_enabled = False d2 = x.maybe_make_aware(datetime.utcnow()) self.assertIsNone(d2.tzinfo) def test_to_local(self): x = schedule(10, app=self.app) x.utc_enabled = True d = x.to_local(datetime.utcnow()) self.assertIsNone(d.tzinfo) x.utc_enabled = False d = x.to_local(datetime.utcnow()) self.assertTrue(d.tzinfo)