"""Tests for the Huey queue APIs backed by Redis.

Covers enqueueing/dequeueing, scheduling, revocation, result storage, error
logging, task locks and always-eager execution, using the module-level
``RedisHuey`` instances and tasks defined below.
"""
import datetime

from huey import crontab
from huey import exceptions as huey_exceptions
from huey import RedisHuey
from huey.api import Huey
from huey.api import QueueTask
from huey.api import TaskWrapper
from huey.constants import EmptyData
from huey.exceptions import TaskException
from huey.registry import registry
from huey.storage import RedisStorage
from huey.tests.base import b
from huey.tests.base import BaseTestCase
from huey.utils import local_to_utc


# Instance without result storage or events.
huey = RedisHuey(result_store=False, events=False, blocking=False)
# Instance with default result storage; keeps at most 10 error records.
huey_results = RedisHuey(blocking=False, max_errors=10)
# Instance configured to persist ``None`` return values.
huey_store_none = RedisHuey(store_none=True, blocking=False)

# Global state.
state = {}


@huey.task()
def put_data(key, value):
    """Store ``value`` under ``key`` in the module-level ``state`` dict."""
    state[key] = value


@huey.task(include_task=True)
def put_data_ctx(key, value, task=None):
    """Record the class name of the task instance passed in as ``task``."""
    state['last_task_class'] = type(task).__name__


@huey_results.task(include_task=True)
def error_testing_task_with_ctx(key, value, task=None):
    """Always fail with ZeroDivisionError (used to test error handling)."""
    bad = 1 / 0  # Deliberately raises; the line below is never reached.
    state['last_task_class'] = type(task).__name__


class PutTask(QueueTask):
    """Task class whose data is a ``(key, value)`` pair stored in ``state``."""

    def execute(self):
        k, v = self.data
        state[k] = v


registry.register(PutTask)


class TestException(Exception):
    """Exception raised on purpose by the error-throwing tasks."""
    pass


def _throw_error_task(message=None):
    """Raise ``TestException`` with ``message`` (default ``'bampf'``)."""
    raise TestException(message or 'bampf')


# The same failing function registered on both instances.
throw_error_task = huey.task()(_throw_error_task)
throw_error_task_res = huey_results.task()(_throw_error_task)


@huey_results.task()
def add_values(a, b):
    """Return ``a + b``."""
    return a + b


@huey_results.task()
def add_values2(a, b):
    """Return ``a + b`` (a second, separately registered task)."""
    return a + b


@huey_results.periodic_task(crontab(minute='0'))
def hourly_task2():
    """Periodic task (crontab minute='0') that marks ``state['periodic']``."""
    state['periodic'] = 2


@huey_results.task()
def returns_none():
    """Return ``None`` from a task on ``huey_results``."""
    return None


@huey_store_none.task()
def returns_none2():
    """Return ``None`` from a task on ``huey_store_none``."""
    return None


class BaseQueueTestCase(BaseTestCase):
    """Base class that resets ``state`` and flushes all instances per test."""

    def setUp(self):
        global state
        state = {}
        huey.flush()
        huey_results.flush()
        huey_store_none.flush()
        self.assertEqual(len(huey), 0)

    def tearDown(self):
        huey.flush()
        huey_results.flush()
        huey_store_none.flush()


class TestHueyQueueMetadataAPIs(BaseQueueTestCase):
    """Tests for the introspection APIs: pending, scheduled, all_results."""

    def test_queue_metadata(self):
        """pending() lists queued tasks and reflects a dequeue."""
        put_data('k1', 'v1')
        put_data('k2', 'v2')

        cmd2, cmd1 = huey.pending()
        self.assertEqual(cmd2.data, (('k2', 'v2'), {}))
        self.assertEqual(cmd1.data, (('k1', 'v1'), {}))

        huey.dequeue()
        cmd1, = huey.pending()
        self.assertEqual(cmd1.data, (('k2', 'v2'), {}))

    def test_schedule_metadata(self):
        """scheduled() lists tasks that were added to the schedule."""
        add_values.schedule((1, 2), delay=10)
        add_values.schedule((3, 4), delay=5)
        self.assertEqual(len(huey_results), 2)

        # Dequeue from the instance the tasks were enqueued on: add_values is
        # registered on huey_results, not on huey.
        huey_results.add_schedule(huey_results.dequeue())
        huey_results.add_schedule(huey_results.dequeue())

        cmd2, cmd1 = huey_results.scheduled()
        self.assertEqual(cmd1.data, ((1, 2), {}))
        self.assertEqual(cmd2.data, ((3, 4), {}))

    def test_results_metadata(self):
        """all_results() is keyed by the ids of the executed tasks."""
        add_values(1, 2)
        add_values(3, 4)

        t1 = huey_results.dequeue()
        t2 = huey_results.dequeue()
        self.assertEqual(huey_results.all_results(), {})

        huey_results.execute(t1)
        self.assertEqual(list(huey_results.all_results()), [b(t1.task_id)])

        huey_results.execute(t2)
        self.assertEqual(sorted(huey_results.all_results().keys()),
                         sorted([b(t1.task_id), b(t2.task_id)]))


class TestHueyQueueAPIs(BaseQueueTestCase):
    """Tests for enqueue/execute, scheduling, revocation and results."""

    def test_enqueue(self):
        """Enqueueing a task instance grows the queue without running it."""
        # initializing the command does not enqueue it
        task = PutTask(('k', 'v'))
        self.assertEqual(len(huey), 0)

        # ok, enqueue it, then check that it was enqueued
        huey.enqueue(task)
        self.assertEqual(len(huey), 1)
        self.assertEqual(state, {})

        # it can be enqueued multiple times
        huey.enqueue(task)
        self.assertEqual(len(huey), 2)

        # no changes to state
        self.assertEqual(state, {})

    def test_enqueue_decorator(self):
        """Calling a decorated task enqueues it instead of running it."""
        put_data('k', 'v')
        self.assertEqual(len(huey), 1)

        put_data('k', 'v')
        self.assertEqual(len(huey), 2)

        # no changes to state
        self.assertEqual(state, {})

    def test_scheduled_time(self):
        """execute_time is None, UTC-converted, or as given (convert_utc)."""
        put_data('k', 'v')
        task = huey.dequeue()
        self.assertEqual(len(huey), 0)
        self.assertEqual(task.execute_time, None)

        dt = datetime.datetime(2011, 1, 1, 0, 1)
        put_data.schedule(args=('k2', 'v2'), eta=dt)
        self.assertEqual(len(huey), 1)
        task = huey.dequeue()
        self.assertEqual(task.execute_time, local_to_utc(dt))

        put_data.schedule(args=('k3', 'v3'), eta=dt, convert_utc=False)
        self.assertEqual(len(huey), 1)
        task = huey.dequeue()
        self.assertEqual(task.execute_time, dt)

    def test_error_raised(self):
        """An exception raised inside a task propagates out of execute()."""
        throw_error_task()
        task = huey.dequeue()
        self.assertRaises(TestException, huey.execute, task)

    def test_error_logging(self):
        """errors() records failures, newest first, capped at max_errors."""
        def call_task():
            # Enqueue, dequeue and run one failing task; return the task.
            throw_error_task_res('nuggie')
            task = huey_results.dequeue()
            self.assertRaises(TestException, huey_results.execute, task)
            return task

        hr = huey_results
        self.assertEqual(len(hr.errors()), 0)

        task = call_task()
        errors = hr.errors()
        self.assertEqual(len(errors), 1)
        error = errors[0]
        self.assertTrue(error['error'].startswith('TestException(\'nuggie\''))
        self.assertEqual(error['id'], task.task_id)

        for i in range(9):
            call_task()
            self.assertEqual(len(hr.errors()), i + 2)

        self.assertEqual(len(hr.errors()), 10)  # Just to be clear.

        # When we run the task again, the queue will have been trimmed
        # (huey_results was created with max_errors=10).
        task = call_task()
        self.assertEqual(len(hr.errors()), 10)

        # The first item in the queue is the most recently executed task.
        most_recent_error = hr.errors()[0]
        self.assertEqual(most_recent_error['id'], task.task_id)

    def test_internal_error(self):
        """
        Verify that exceptions are wrapped with the special "huey" exception
        classes.
        """
        class SpecialException(Exception):
            pass

        class BrokenStorage(RedisStorage):
            # NOTE(review): assumes the storage API is enqueue(data) and
            # dequeue(); previously the two signatures (and messages) were
            # swapped, so the calls would presumably have failed with a
            # TypeError before ever reaching the intended raise.
            def enqueue(self, data):
                raise SpecialException('write error')

            def dequeue(self):
                raise SpecialException('read error')

            def pop_data(self, key):
                raise SpecialException('get error')

            def peek_data(self, key):
                raise SpecialException('get error')

            def put_data(self, key, value):
                raise SpecialException('put error')

            def add_to_schedule(self, data, ts):
                raise SpecialException('add error')

            def read_schedule(self, ts):
                raise SpecialException('read error')

        class BrokenHuey(RedisHuey):
            def get_storage(self, **kwargs):
                return BrokenStorage(self.name)

        task = PutTask(('foo', 'bar'))
        # Named distinctly so it does not shadow the module-level ``huey``.
        broken_huey = BrokenHuey()

        self.assertRaises(
            huey_exceptions.QueueWriteException,
            broken_huey.enqueue,
            task)
        self.assertRaises(
            huey_exceptions.QueueReadException,
            broken_huey.dequeue)
        self.assertRaises(
            huey_exceptions.DataStorePutException,
            broken_huey.revoke,
            task)
        self.assertRaises(
            huey_exceptions.DataStoreGetException,
            broken_huey.restore,
            task)
        self.assertRaises(
            huey_exceptions.ScheduleAddException,
            broken_huey.add_schedule,
            task)
        self.assertRaises(
            huey_exceptions.ScheduleReadException,
            broken_huey.read_schedule,
            1)

    def test_dequeueing(self):
        """dequeue() returns None when empty, otherwise a QueueTask."""
        res = huey.dequeue()  # no error raised if queue is empty
        self.assertEqual(res, None)

        put_data('k', 'v')
        task = huey.dequeue()

        self.assertTrue(isinstance(task, QueueTask))
        self.assertEqual(task.get_data(), (('k', 'v'), {}))

    def test_execution(self):
        """State changes only when a dequeued task is executed."""
        self.assertEqual(state, {})
        put_data('k', 'v')

        task = huey.dequeue()
        self.assertFalse('k' in state)

        huey.execute(task)
        self.assertEqual(state, {'k': 'v'})

        put_data('k', 'X')
        self.assertEqual(state, {'k': 'v'})

        huey.execute(huey.dequeue())
        self.assertEqual(state, {'k': 'X'})

        # The queue is empty now, so dequeue() returns None (see
        # test_dequeueing); executing that is expected to raise TypeError.
        self.assertRaises(TypeError, huey.execute, huey.dequeue())

    def test_self_awareness(self):
        """Tasks declared with include_task=True receive their task object."""
        put_data_ctx('k', 'v')
        task = huey.dequeue()
        huey.execute(task)
        self.assertEqual(state['last_task_class'], 'queue_task_put_data_ctx')

        del state['last_task_class']
        put_data('k', 'x')
        huey.execute(huey.dequeue())
        self.assertFalse('last_task_class' in state)

    def test_self_aware_error_handler(self):
        """Errors from an include_task=True task still propagate."""
        error_testing_task_with_ctx('k', 'v')
        task = huey_results.dequeue()
        self.assertRaises(ZeroDivisionError, huey_results.execute, task)

    def test_call_local(self):
        """call_local() runs the function directly without enqueueing."""
        self.assertEqual(len(huey), 0)
        self.assertEqual(state, {})
        put_data.call_local('nugget', 'green')

        self.assertEqual(len(huey), 0)
        self.assertEqual(state, {'nugget': 'green'})

    def test_reschedule(self):
        """reschedule() enqueues a new task and revokes the previous one."""
        eta = datetime.datetime.utcnow() + datetime.timedelta(seconds=60)
        trw = add_values.schedule((1, 2), eta=eta, convert_utc=False)
        self.assertEqual(trw.task.execute_time, eta)

        # Pull pending task off queue. Quick sanity check that the task result
        # wrapper has the same task_id as the task we just pulled down.
        task = huey_results.dequeue()
        self.assertEqual(trw.task.task_id, task.task_id)
        self.assertEqual(trw.task.execute_time, task.execute_time)

        # Verify the task is not ready to run and add to schedule.
        self.assertFalse(huey_results.ready_to_run(task))
        huey_results.add_schedule(task)

        # Reschedule the task using the result wrapper.
        new_eta = eta - datetime.timedelta(seconds=30)
        trw_r = trw.reschedule(eta=new_eta, convert_utc=False)
        self.assertEqual(trw_r.task.execute_time, new_eta)

        task_r = huey_results.dequeue()
        self.assertEqual(task_r.execute_time, new_eta)
        self.assertFalse(huey_results.ready_to_run(task_r))
        huey_results.add_schedule(task_r)

        self.assertTrue(huey_results.is_revoked(task))
        self.assertFalse(huey_results.is_revoked(task_r))

        # Reschedule without an ETA.
        trw_r.reschedule()
        task_r2 = huey_results.dequeue()
        self.assertTrue(task_r2.execute_time is None)
        self.assertTrue(huey_results.ready_to_run(task_r2))
        self.assertTrue(huey_results.is_revoked(task_r))

    def test_revoke(self):
        """A revoked task instance is skipped wherever it sits in the queue."""
        ac = PutTask(('k', 'v'))
        ac2 = PutTask(('k2', 'v2'))
        ac3 = PutTask(('k3', 'v3'))

        huey_results.enqueue(ac)
        huey_results.enqueue(ac2)
        huey_results.enqueue(ac3)
        huey_results.enqueue(ac2)
        huey_results.enqueue(ac)

        self.assertEqual(len(huey_results), 5)
        huey_results.revoke(ac2)

        while huey_results:
            task = huey_results.dequeue()
            if not huey_results.is_revoked(task):
                huey_results.execute(task)

        self.assertEqual(state, {'k': 'v', 'k3': 'v3'})

    def test_revoke_all(self):
        """Revoking the task wrapper revokes every invocation of that task."""
        r1 = add_values(1, 2)
        r2 = add_values(2, 3)
        r3 = add_values(3, 4)
        r4 = add_values2(4, 5)

        add_values.revoke()
        self.assertFalse(r2.restore())  # No effect, task itself is revoked.
        self.assertTrue(add_values.is_revoked())
        for task_result in (r1, r2, r3):
            self.assertTrue(task_result.is_revoked())

        self.assertFalse(r4.is_revoked())
        self.assertEqual(len(huey_results), 4)

        results = []
        while huey_results:
            task = huey_results.dequeue()
            if not huey_results.is_revoked(task):
                results.append(task.execute())
        # Only add_values2(4, 5) was allowed to run.
        self.assertEqual(results, [9])

        add_values.restore()
        rr_1 = add_values(5, 6)
        rr_2 = add_values(6, 7)
        for task_result in (rr_1, rr_2):
            self.assertFalse(task_result.is_revoked())

        while huey_results:
            task = huey_results.dequeue()
            if not huey_results.is_revoked(task):
                results.append(task.execute())
        self.assertEqual(results, [9, 11, 13])

    def test_revoke_restore_by_id(self):
        """revoke_by_id()/restore_by_id() act on individual task ids."""
        t1 = PutTask(('k1', 'v1'))
        t2 = PutTask(('k2', 'v2'))
        t3 = PutTask(('k3', 'v3'))
        for task in (t1, t2, t3):
            huey_results.enqueue(task)

        huey_results.revoke_by_id(t3.task_id)
        huey_results.revoke_by_id(t2.task_id)
        huey_results.restore_by_id(t3.task_id)

        # Dequeued in enqueue order: t1 (never revoked), t2 (revoked),
        # t3 (revoked, then restored).
        self.assertFalse(huey_results.is_revoked(huey_results.dequeue()))
        self.assertTrue(huey_results.is_revoked(huey_results.dequeue()))
        self.assertFalse(huey_results.is_revoked(huey_results.dequeue()))

    def test_revoke_periodic(self):
        """Exercise revoke / revoke_once / revoke_until on a periodic task."""
        hourly_task2.revoke()
        self.assertTrue(hourly_task2.is_revoked())

        # it is still revoked
        self.assertTrue(hourly_task2.is_revoked())

        self.assertTrue(hourly_task2.restore())
        self.assertFalse(hourly_task2.is_revoked())
        self.assertFalse(hourly_task2.restore())  # It is not revoked.

        hourly_task2.revoke(revoke_once=True)
        # it is revoked once, but we are preserving that state
        self.assertTrue(hourly_task2.is_revoked())
        # is revoked once, but clear state
        self.assertTrue(hourly_task2.is_revoked(peek=False))
        # no longer revoked
        self.assertFalse(hourly_task2.is_revoked())

        d = datetime.datetime
        hourly_task2.revoke(revoke_until=d(2011, 1, 1, 11, 0))
        self.assertTrue(hourly_task2.is_revoked(dt=d(2011, 1, 1, 10, 0)))
        self.assertTrue(hourly_task2.is_revoked(dt=d(2011, 1, 1, 10, 59)))
        self.assertFalse(hourly_task2.is_revoked(dt=d(2011, 1, 1, 11, 0)))

        hourly_task2.restore()
        self.assertFalse(hourly_task2.is_revoked())

    def test_result_store(self):
        """Result wrappers return None until their task has been executed."""
        res = add_values(1, 2)
        res2 = add_values(4, 5)
        res3 = add_values(0, 0)

        # none have been executed as yet
        self.assertEqual(res.get(), None)
        self.assertEqual(res2.get(), None)
        self.assertEqual(res3.get(), None)

        # execute the first task
        huey_results.execute(huey_results.dequeue())
        self.assertEqual(res.get(), 3)
        self.assertEqual(res2.get(), None)
        self.assertEqual(res3.get(), None)

        # We can also call the result object.
        self.assertEqual(res(), 3)
        self.assertEqual(res2(), None)

        # execute the second task
        huey_results.execute(huey_results.dequeue())
        self.assertEqual(res.get(), 3)
        self.assertEqual(res2.get(), 9)
        self.assertEqual(res3.get(), None)

        # execute the 3rd, which returns a zero value
        huey_results.execute(huey_results.dequeue())
        self.assertEqual(res.get(), 3)
        self.assertEqual(res2.get(), 9)
        self.assertEqual(res3.get(), 0)

        # check that it returns None when nothing is present
        res = returns_none()
        self.assertEqual(res.get(), None)

        # execute, it will still return None, but underneath it is EmptyData
        # indicating its actual result was not persisted
        huey_results.execute(huey_results.dequeue())
        self.assertEqual(res.get(), None)
        self.assertEqual(res._result, EmptyData)

        # execute again, this time note that we're pointing at the invoker
        # that *does* accept None as a store-able result
        res = returns_none2()
        self.assertEqual(res.get(), None)

        # it stores None
        huey_store_none.execute(huey_store_none.dequeue())
        self.assertEqual(res.get(), None)
        self.assertEqual(res._result, None)

    def test_huey_result_method(self):
        """result(task_id) returns stored results, including a zero value."""
        res = add_values(1, 2)
        tid = res.task.task_id
        res2 = add_values(0, 0)
        tid2 = res2.task.task_id

        self.assertTrue(huey_results.result(tid) is None)
        self.assertTrue(huey_results.result(tid2) is None)

        # Execute the first task
        huey_results.execute(huey_results.dequeue())
        self.assertEqual(huey_results.result(tid), 3)
        self.assertTrue(huey_results.result(tid2) is None)

        # Execute the second task, which returns a zero value.
        huey_results.execute(huey_results.dequeue())
        self.assertEqual(huey_results.result(tid2), 0)

    def test_huey_result_error_propagation(self):
        """A failed task surfaces as TaskException when its result is read."""
        # Execute a task that raises a TestException error.
        res = throw_error_task_res()
        task = huey_results.dequeue()
        self.assertRaises(TestException, huey_results.execute, task)

        # Verify TaskException raised when resolving TaskResultWrapper.
        self.assertRaises(TaskException, res.get)

        # Execute task again to verify the huey.result() API behavior.
        res = throw_error_task_res()
        tid = res.task.task_id
        task = huey_results.dequeue()
        self.assertRaises(TestException, huey_results.execute, task)

        # Verify error raised when calling .result() with task ID. Use the
        # instance the task ran on; ``huey`` was created without a result
        # store.
        self.assertRaises(TaskException, huey_results.result, tid)

    def test_result_preserve(self):
        """preserve=True keeps the result; a non-preserving read removes it."""
        res = add_values(1, 2)
        tid = res.task.task_id
        huey_results.execute(huey_results.dequeue())

        self.assertEqual(res.get(preserve=True), 3)
        self.assertEqual(huey_results.result(tid, preserve=True), 3)
        self.assertEqual(huey_results.result(tid, preserve=False), 3)
        self.assertEqual(huey_results.result(tid), None)

    def test_task_store(self):
        """read_schedule(ts) returns, and removes, the tasks due at ``ts``."""
        dt1 = datetime.datetime(2011, 1, 1, 0, 0)
        dt2 = datetime.datetime(2035, 1, 1, 0, 0)

        add_values.schedule(args=('k', 'v'), eta=dt1, convert_utc=False)
        task1 = huey_results.dequeue()

        add_values.schedule(args=('k2', 'v2'), eta=dt2, convert_utc=False)
        task2 = huey_results.dequeue()

        add_values('k3', 'v3')
        task3 = huey_results.dequeue()

        # add the command to the schedule
        huey_results.add_schedule(task1)
        self.assertEqual(huey_results.scheduled_count(), 1)

        # add a future-dated command
        huey_results.add_schedule(task2)
        self.assertEqual(huey_results.scheduled_count(), 2)

        huey_results.add_schedule(task3)

        tasks = huey_results.read_schedule(dt1)
        self.assertEqual(tasks, [task3, task1])

        # A second read at the same timestamp finds nothing left.
        tasks = huey_results.read_schedule(dt1)
        self.assertEqual(tasks, [])

        tasks = huey_results.read_schedule(dt2)
        self.assertEqual(tasks, [task2])

    def test_ready_to_run_method(self):
        """ready_to_run() is True for past-dated and unscheduled tasks."""
        dt1 = datetime.datetime(2011, 1, 1, 0, 0)
        dt2 = datetime.datetime(2035, 1, 1, 0, 0)

        add_values.schedule(args=('k', 'v'), eta=dt1)
        task1 = huey_results.dequeue()

        add_values.schedule(args=('k2', 'v2'), eta=dt2)
        task2 = huey_results.dequeue()

        add_values('k3', 'v3')
        task3 = huey_results.dequeue()

        add_values.schedule(args=('k4', 'v4'), task_id='test_task_id')
        task4 = huey_results.dequeue()

        # sanity check what should be run
        self.assertTrue(huey_results.ready_to_run(task1))
        self.assertFalse(huey_results.ready_to_run(task2))
        self.assertTrue(huey_results.ready_to_run(task3))
        self.assertTrue(huey_results.ready_to_run(task4))
        self.assertEqual('test_task_id', task4.task_id)

    def test_task_delay(self):
        """Tasks scheduled with delay= become ready once the delay passes."""
        curr = datetime.datetime.utcnow()
        curr50 = curr + datetime.timedelta(seconds=50)
        curr70 = curr + datetime.timedelta(seconds=70)

        add_values.schedule(args=('k', 'v'), delay=60)
        task1 = huey_results.dequeue()

        add_values.schedule(args=('k2', 'v2'), delay=600)
        task2 = huey_results.dequeue()

        add_values('k3', 'v3')
        task3 = huey_results.dequeue()

        # add the command to the schedule
        huey_results.add_schedule(task1)
        huey_results.add_schedule(task2)
        huey_results.add_schedule(task3)

        # sanity check what should be run
        self.assertFalse(huey_results.ready_to_run(task1))
        self.assertFalse(huey_results.ready_to_run(task2))
        self.assertTrue(huey_results.ready_to_run(task3))

        self.assertFalse(huey_results.ready_to_run(task1, curr50))
        self.assertFalse(huey_results.ready_to_run(task2, curr50))
        self.assertTrue(huey_results.ready_to_run(task3, curr50))

        self.assertTrue(huey_results.ready_to_run(task1, curr70))
        self.assertFalse(huey_results.ready_to_run(task2, curr70))
        self.assertTrue(huey_results.ready_to_run(task3, curr70))

    def test_task_decorators(self):
        """task()/periodic_task() return TaskWrappers around the function."""
        # Named distinctly so it does not shadow the module-level ``huey``.
        local_huey = RedisHuey()

        def test_fn():
            pass

        test_fn_task = local_huey.task()(test_fn)
        test_fn_cron = local_huey.periodic_task(crontab(minute='0'))(test_fn)

        self.assertTrue(isinstance(test_fn_task, TaskWrapper))
        self.assertTrue(test_fn_task.func is test_fn)
        self.assertTrue(isinstance(test_fn_cron, TaskWrapper))
        self.assertTrue(test_fn_cron.func is test_fn)

        # Decorating an existing TaskWrapper still wraps the plain function.
        test_cron_task = local_huey.periodic_task(
            crontab(minute='0'))(test_fn_task)
        self.assertTrue(isinstance(test_cron_task, TaskWrapper))
        self.assertTrue(test_cron_task.func is test_fn)

    def test_flush_locks(self):
        """flush_locks() returns the names of the locks it cleared."""
        with huey.lock_task('lock1'):
            with huey.lock_task('lock2'):
                flushed = huey.flush_locks()

        self.assertEqual(flushed, set(['lock1', 'lock2']))
        self.assertEqual(huey.flush_locks(), set())


# Instance that runs tasks immediately instead of enqueueing them.
eager_huey = RedisHuey(blocking=False, always_eager=True)


@eager_huey.task()
def add(a, b):
    """Return ``a + b``."""
    return a + b


class TestAlwaysEager(BaseQueueTestCase):
    """Tests for tasks registered on the always_eager instance."""

    def test_always_eager(self):
        """Calls return their value directly; pipelines return all results."""
        self.assertEqual(add(1, 3), 4)

        # Test pipelining.
        pipe = add.s(1, 2).then(add, 3).then(add, 4).then(add, 5)
        result = eager_huey.enqueue(pipe)
        self.assertEqual(result, [3, 6, 10, 15])

    def test_always_eager_failure(self):
        """Errors are raised immediately from calls and from pipelines."""
        self.assertRaises(TypeError, add, 1, None)

        pipe = add.s(1, 2).then(add, None).then(add, 4)
        self.assertRaises(TypeError, eager_huey.enqueue, pipe)