12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011 |
- import datetime
- import json
- import pickle
- import re
- import time
- import traceback
- import uuid
- from collections import OrderedDict
- from functools import wraps
- from inspect import isclass
- from huey.constants import EmptyData
- from huey.consumer import Consumer
- from huey.exceptions import DataStoreGetException
- from huey.exceptions import DataStorePutException
- from huey.exceptions import DataStoreTimeout
- from huey.exceptions import QueueException
- from huey.exceptions import QueueReadException
- from huey.exceptions import QueueRemoveException
- from huey.exceptions import QueueWriteException
- from huey.exceptions import ScheduleAddException
- from huey.exceptions import ScheduleReadException
- from huey.exceptions import TaskException
- from huey.exceptions import TaskLockedException
- from huey.registry import registry
- from huey.registry import TaskRegistry
- from huey.utils import Error
- from huey.utils import aware_to_utc
- from huey.utils import is_aware
- from huey.utils import is_naive
- from huey.utils import local_to_utc
- from huey.utils import make_naive
- from huey.utils import wrap_exception
class Huey(object):
    """
    Huey executes tasks by exposing function decorators that cause the function
    call to be enqueued for execution by the consumer.

    Typically your application will only need one Huey instance, but you can
    have as many as you like -- the only caveat is that one consumer process
    must be executed for each Huey instance.

    :param name: a name for the task queue.
    :param bool result_store: whether to store task results.
    :param bool events: whether to enable consumer-sent events.
    :param store_none: Flag to indicate whether tasks that return ``None``
        should store their results in the result store.
    :param always_eager: Useful for testing, this will execute all tasks
        immediately, without enqueueing them.
    :param store_errors: Flag to indicate whether task errors should be stored.
    :param blocking: Whether the consumer should block when polling the queue.
    :param global_registry: Use a global registry for tasks.

    Example usage::

        from huey import RedisHuey

        # Create a huey instance and disable consumer-sent events.
        huey = RedisHuey('my-app', events=False)

        @huey.task()
        def slow_function(some_arg):
            # ... do something ...
            return some_arg

        @huey.periodic_task(crontab(minute='0', hour='3'))
        def backup():
            # do a backup every day at 3am
            return
    """
    def __init__(self, name='huey', result_store=True, events=True,
                 store_none=False, always_eager=False, store_errors=True,
                 blocking=False, global_registry=True, **storage_kwargs):
        self.name = name
        self.result_store = result_store
        self.events = events
        self.store_none = store_none
        self.always_eager = always_eager
        self.store_errors = store_errors
        self.blocking = blocking
        self.storage = self.get_storage(**storage_kwargs)
        self.pre_execute_hooks = OrderedDict()
        self.post_execute_hooks = OrderedDict()
        self.startup_hooks = OrderedDict()
        self._locks = set()
        if global_registry:
            self.registry = registry
        else:
            self.registry = TaskRegistry()

    def get_storage(self, **kwargs):
        """Return the storage backend; must be implemented by subclasses."""
        raise NotImplementedError('Storage API not implemented in the base '
                                  'Huey class. Use `RedisHuey` instead.')

    def create_consumer(self, **config):
        """Create a :py:class:`Consumer` bound to this Huey instance."""
        return Consumer(self, **config)

    def _normalize_execute_time(self, eta=None, delay=None, convert_utc=True):
        """
        Resolve an ``eta`` datetime or a ``delay`` in seconds into the
        datetime at which a task should execute. With ``convert_utc``, naive
        and aware datetimes are normalized to UTC; returns ``None`` when
        neither ``eta`` nor ``delay`` is given.
        """
        if delay and eta:
            raise ValueError('Both a delay and an eta cannot be '
                             'specified at the same time')
        elif delay:
            method = (convert_utc and datetime.datetime.utcnow or
                      datetime.datetime.now)
            return method() + datetime.timedelta(seconds=delay)
        elif eta:
            if is_naive(eta) and convert_utc:
                eta = local_to_utc(eta)
            elif is_aware(eta) and convert_utc:
                eta = aware_to_utc(eta)
            elif is_aware(eta) and not convert_utc:
                eta = make_naive(eta)
            return eta

    def task(self, retries=0, retry_delay=0, retries_as_argument=False,
             include_task=False, name=None, **task_settings):
        """
        Decorator to execute a function out-of-band via the consumer.
        """
        def decorator(func):
            # Unwrap an already-wrapped function so it may be re-decorated.
            return TaskWrapper(
                self,
                func.func if isinstance(func, TaskWrapper) else func,
                retries=retries,
                retry_delay=retry_delay,
                retries_as_argument=retries_as_argument,
                include_task=include_task,
                name=name,
                **task_settings)
        return decorator

    # We specify retries and retry_delay as 0 because they become the default
    # values as class attributes on the derived PeriodicQueueTask instance.
    # Since the values the class is instantiated with will always be `None`,
    # we want the fallback behavior to be 0 by default.
    def periodic_task(self, validate_datetime, name=None, retries=0,
                      retry_delay=0, **task_settings):
        """
        Decorator to execute a function on a specific schedule.
        """
        def decorator(func):
            def method_validate(self, dt):
                return validate_datetime(dt)
            return TaskWrapper(
                self,
                func.func if isinstance(func, TaskWrapper) else func,
                name=name,
                task_base=PeriodicQueueTask,
                default_retries=retries,
                default_retry_delay=retry_delay,
                validate_datetime=method_validate,
                **task_settings)
        return decorator

    def register_pre_execute(self, name, fn):
        """
        Register a pre-execute hook. The callback will be executed before the
        execution of all tasks. Execution of the task can be cancelled by
        raising a :py:class:`CancelExecution` exception. Uncaught exceptions
        will be logged but will not cause the task itself to be cancelled.

        The callback function should accept a single task instance, the return
        value is ignored.

        :param name: Name for the hook.
        :param fn: Callback function that accepts task to be executed.
        """
        self.pre_execute_hooks[name] = fn

    def unregister_pre_execute(self, name):
        del self.pre_execute_hooks[name]

    def pre_execute(self, name=None):
        """
        Decorator for registering a pre-execute hook.
        """
        def decorator(fn):
            self.register_pre_execute(name or fn.__name__, fn)
            return fn
        return decorator

    def register_post_execute(self, name, fn):
        """
        Register a post-execute hook. The callback will be executed after the
        execution of all tasks. Uncaught exceptions will be logged but will
        have no other effect on the overall operation of the consumer.

        The callback function should accept:

        * a task instance
        * the return value from the execution of the task (which may be None)
        * any exception that was raised during the execution of the task
          (which will be None for tasks that executed normally).

        The return value of the callback itself is ignored.

        :param name: Name for the hook.
        :param fn: Callback function that accepts task that was executed and
            the tasks return value (or None).
        """
        self.post_execute_hooks[name] = fn

    def unregister_post_execute(self, name):
        del self.post_execute_hooks[name]

    def post_execute(self, name=None):
        """
        Decorator for registering a post-execute hook.
        """
        def decorator(fn):
            self.register_post_execute(name or fn.__name__, fn)
            return fn
        return decorator

    def register_startup(self, name, fn):
        """
        Register a startup hook. The callback will be executed whenever a
        worker comes online. Uncaught exceptions will be logged but will
        have no other effect on the overall operation of the worker.

        The callback function must not accept any parameters.

        This API is provided to simplify setting up global resources that, for
        whatever reason, should not be created as import-time side-effects.
        For example, your tasks need to write data into a Postgres database.
        If you create the connection at import-time, before the worker
        processes are spawned, you'll likely run into errors when attempting
        to use the connection from the child (worker) processes. To avoid this
        problem, you can register a startup hook which is executed by the
        worker process as part of its initialization.

        :param name: Name for the hook.
        :param fn: Callback function.
        """
        self.startup_hooks[name] = fn

    def unregister_startup(self, name):
        del self.startup_hooks[name]

    def on_startup(self, name=None):
        """
        Decorator for registering a startup hook.
        """
        def decorator(fn):
            self.register_startup(name or fn.__name__, fn)
            return fn
        return decorator

    # NOTE: used as a decorator at class-definition time, so it intentionally
    # takes no ``self`` -- it wraps storage-layer failures in the appropriate
    # huey exception type.
    def _wrapped_operation(exc_class):
        def decorator(fn):
            def inner(*args, **kwargs):
                try:
                    return fn(*args, **kwargs)
                except (KeyboardInterrupt, RuntimeError):
                    raise
                except:
                    wrap_exception(exc_class)
            return inner
        return decorator

    @_wrapped_operation(QueueWriteException)
    def _enqueue(self, msg):
        self.storage.enqueue(msg)

    @_wrapped_operation(QueueReadException)
    def _dequeue(self):
        return self.storage.dequeue()

    @_wrapped_operation(QueueRemoveException)
    def _unqueue(self, msg):
        # Bug fix: previously referenced ``self.queue``, an attribute that is
        # never assigned (``__init__`` only sets ``self.storage``), so this
        # always raised AttributeError. Route through the storage API like
        # every other wrapped operation.
        return self.storage.unqueue(msg)

    @_wrapped_operation(DataStoreGetException)
    def _get_data(self, key, peek=False):
        if peek:
            return self.storage.peek_data(key)
        else:
            return self.storage.pop_data(key)

    @_wrapped_operation(DataStorePutException)
    def _put_data(self, key, value):
        return self.storage.put_data(key, value)

    @_wrapped_operation(DataStorePutException)
    def _put_if_empty(self, key, value):
        return self.storage.put_if_empty(key, value)

    @_wrapped_operation(DataStorePutException)
    def _put_error(self, metadata):
        self.storage.put_error(metadata)

    @_wrapped_operation(DataStoreGetException)
    def _get_errors(self, limit=None, offset=0):
        return self.storage.get_errors(limit=limit, offset=offset)

    @_wrapped_operation(ScheduleAddException)
    def _add_to_schedule(self, data, ts):
        self.storage.add_to_schedule(data, ts)

    @_wrapped_operation(ScheduleReadException)
    def _read_schedule(self, ts):
        return self.storage.read_schedule(ts)

    def emit(self, message):
        """Publish a consumer event; failures are always swallowed."""
        try:
            self.storage.emit(message)
        except:
            # Events always fail silently since they are treated as a non-
            # critical component.
            pass

    def _execute_always_eager(self, task):
        # Synchronously run the task (and any chained ``on_complete`` tasks)
        # inline, invoking pre/post hooks; used when ``always_eager`` is set.
        accum = []
        failure_exc = None
        while task is not None:
            for name, callback in self.pre_execute_hooks.items():
                callback(task)
            try:
                result = task.execute()
            except Exception as exc:
                result = None
                failure_exc = task_exc = exc
            else:
                task_exc = None
            accum.append(result)
            for name, callback in self.post_execute_hooks.items():
                callback(task, result, task_exc)
            if task.on_complete:
                task = task.on_complete
                task.extend_data(result)
            else:
                task = None
        if failure_exc is not None:
            raise failure_exc
        return accum[0] if len(accum) == 1 else accum

    def enqueue(self, task):
        """
        Serialize and enqueue a task for execution by the consumer. Returns a
        :py:class:`TaskResultWrapper` (or a list of wrappers for a pipeline of
        chained tasks) when a result store is configured.
        """
        if self.always_eager:
            return self._execute_always_eager(task)

        self._enqueue(self.registry.get_message_for_task(task))

        if not self.result_store:
            return

        if task.on_complete:
            # Return a result wrapper for every task in the chain.
            q = [task]
            result_wrappers = []
            while q:
                current = q.pop()
                result_wrappers.append(TaskResultWrapper(self, current))
                if current.on_complete:
                    q.append(current.on_complete)
            return result_wrappers
        else:
            return TaskResultWrapper(self, task)

    def dequeue(self):
        """Pull the next message from the queue and deserialize it."""
        message = self._dequeue()
        if message:
            return self.registry.get_task_for_message(message)

    def put(self, key, value):
        """Pickle ``value`` and store it under ``key`` in the result store."""
        return self._put_data(key,
                              pickle.dumps(value, pickle.HIGHEST_PROTOCOL))

    def get(self, key, peek=False):
        """Fetch and unpickle the value at ``key``; pops unless ``peek``."""
        data = self._get_data(key, peek=peek)
        if data is EmptyData:
            return
        else:
            return pickle.loads(data)

    def put_error(self, metadata):
        return self._put_error(pickle.dumps(metadata))

    def _format_time(self, dt):
        # Serialize a datetime as a unix timestamp for event metadata.
        if dt is None:
            return None
        return time.mktime(dt.timetuple())

    def _get_task_metadata(self, task, error=False, include_data=False):
        metadata = {
            'id': task.task_id,
            'task': type(task).__name__,
            'retries': task.retries,
            'retry_delay': task.retry_delay,
            'execute_time': self._format_time(task.execute_time)}
        if include_data and not isinstance(task, PeriodicQueueTask):
            targs, tkwargs = task.get_data()
            # A bound task instance is not serializable; strip it.
            if tkwargs.get('task') and isinstance(tkwargs['task'], QueueTask):
                del tkwargs['task']
            metadata['data'] = (targs, tkwargs)
        return metadata

    def emit_status(self, status, error=False, **data):
        """Emit a consumer event with the given status and metadata."""
        if self.events:
            metadata = {'status': status, 'error': error}
            if error:
                metadata['traceback'] = traceback.format_exc()
            metadata.update(data)
            self.emit(json.dumps(metadata))

    def emit_task(self, status, task, error=False, **data):
        """Emit a consumer event describing the given task."""
        if self.events:
            metadata = self._get_task_metadata(task)
            metadata.update(data)
            self.emit_status(status, error=error, **metadata)

    def execute(self, task):
        """
        Execute the given task, storing its result (or error metadata) in the
        result store as configured, and enqueueing any chained task.
        """
        if not isinstance(task, QueueTask):
            raise TypeError('Unknown object: %s' % task)

        try:
            result = task.execute()
        except Exception as exc:
            if self.store_errors:
                metadata = self._get_task_metadata(task, True)
                metadata['error'] = repr(exc)
                metadata['traceback'] = traceback.format_exc()
                self.put(task.task_id, Error(metadata))
                self.put_error(metadata)
            raise

        if self.result_store and not isinstance(task, PeriodicQueueTask):
            if result is not None or self.store_none:
                self.put(task.task_id, result)

        if task.on_complete:
            next_task = task.on_complete
            next_task.extend_data(result)
            self.enqueue(next_task)

        return result

    def revoke_all(self, task_class, revoke_until=None, revoke_once=False):
        """Revoke every instance of the given task class."""
        self.put('rt:%s' % task_class.__name__, (revoke_until, revoke_once))

    def restore_all(self, task_class):
        """Restore a class-wide revocation; True if one was in effect."""
        return self._get_data('rt:%s' % task_class.__name__) is not EmptyData

    def revoke(self, task, revoke_until=None, revoke_once=False):
        self.put(task.revoke_id, (revoke_until, revoke_once))

    def restore(self, task):
        # Return value indicates whether the task was in fact revoked.
        return self._get_data(task.revoke_id) is not EmptyData

    def revoke_by_id(self, task_id, revoke_until=None, revoke_once=False):
        return self.revoke(QueueTask(task_id=task_id), revoke_until,
                           revoke_once)

    def restore_by_id(self, task_id):
        return self.restore(QueueTask(task_id=task_id))

    def _check_revoked(self, revoke_id, dt=None, peek=True):
        """
        Checks if a task is revoked, returns a 2-tuple indicating:

        1. Is task revoked?
        2. Should task be restored?

        NOTE(review): when a timed revocation is in effect, ``dt`` must be a
        datetime (``revoke_until <= dt`` would fail on None) -- callers in the
        consumer pass the current timestamp.
        """
        res = self.get(revoke_id, peek=True)
        if res is None:
            return False, False

        revoke_until, revoke_once = res
        if revoke_once:
            # This task *was* revoked for one run, but now it should be
            # restored to normal execution (unless we are just peeking).
            return True, not peek
        elif revoke_until is not None and revoke_until <= dt:
            # Task is no longer revoked and can be restored.
            return False, True
        else:
            # Task is still revoked. Do not restore.
            return True, False

    def is_revoked(self, task, dt=None, peek=True):
        """
        Check whether ``task`` (a task class, instance, or task-id string) is
        revoked, restoring expired revocations as a side effect unless
        ``peek`` is set.
        """
        if isclass(task) and issubclass(task, QueueTask):
            revoke_id = 'rt:%s' % task.__name__
            is_revoked, can_restore = self._check_revoked(revoke_id, dt, peek)
            if can_restore:
                self.restore_all(task)
            return is_revoked

        if not isinstance(task, QueueTask):
            task = QueueTask(task_id=task)

        is_revoked, can_restore = self._check_revoked(task.revoke_id, dt, peek)
        if can_restore:
            self.restore(task)
        if not is_revoked:
            # Fall back to a class-wide revocation check.
            is_revoked = self.is_revoked(type(task), dt, peek)

        return is_revoked

    def add_schedule(self, task):
        """Add the task to the schedule for later execution."""
        msg = self.registry.get_message_for_task(task)
        ex_time = task.execute_time or datetime.datetime.fromtimestamp(0)
        self._add_to_schedule(msg, ex_time)

    def read_schedule(self, ts):
        """Read tasks from the schedule that are due at or before ``ts``."""
        return [self.registry.get_task_for_message(m)
                for m in self._read_schedule(ts)]

    def read_periodic(self, ts):
        """Return the periodic tasks that should run at datetime ``ts``."""
        periodic = self.registry.get_periodic_tasks()
        return [task for task in periodic
                if task.validate_datetime(ts)]

    def ready_to_run(self, cmd, dt=None):
        """True if the task has no ETA or its ETA has passed."""
        dt = dt or datetime.datetime.utcnow()
        return cmd.execute_time is None or cmd.execute_time <= dt

    def pending(self, limit=None):
        """Return up to ``limit`` tasks currently waiting in the queue."""
        return [self.registry.get_task_for_message(m)
                for m in self.storage.enqueued_items(limit)]

    def pending_count(self):
        return self.storage.queue_size()

    def scheduled(self, limit=None):
        """Return up to ``limit`` tasks currently in the schedule."""
        return [self.registry.get_task_for_message(m)
                for m in self.storage.scheduled_items(limit)]

    def scheduled_count(self):
        return self.storage.schedule_size()

    def all_results(self):
        return self.storage.result_items()

    def result_count(self):
        return self.storage.result_store_size()

    def errors(self, limit=None, offset=0):
        """Return unpickled error metadata from the error store."""
        # Consistency fix: use the exception-wrapping helper instead of
        # calling the storage API directly.
        return [
            pickle.loads(error)
            for error in self._get_errors(limit=limit, offset=offset)]

    def __len__(self):
        return self.pending_count()

    def flush(self):
        """Remove all queue, schedule, and result-store data."""
        self.storage.flush_all()

    def get_tasks(self):
        """Return the sorted names of all registered tasks."""
        return sorted(self.registry._registry.keys())

    def get_periodic_tasks(self):
        return [name for name, task in self.registry._registry.items()
                if hasattr(task, 'validate_datetime')]

    def get_regular_tasks(self):
        periodic = set(self.get_periodic_tasks())
        return [task for task in self.get_tasks() if task not in periodic]

    def lock_task(self, lock_name):
        """
        Utilize the Storage key/value APIs to implement simple locking.

        This lock is designed to be used to prevent multiple invocations of a
        task from running concurrently. Can be used as either a
        context-manager or as a task decorator. If using as a decorator, place
        it directly above the function declaration.

        If a second invocation occurs and the lock cannot be acquired, then a
        special exception is raised, which is handled by the consumer. The
        task will not be executed and an ``EVENT_LOCKED`` will be emitted. If
        the task is configured to be retried, then it will be retried
        normally, but the failure to acquire the lock is not considered an
        error.

        Examples:

            @huey.periodic_task(crontab(minute='*/5'))
            @huey.lock_task('reports-lock')
            def generate_report():
                # If a report takes longer than 5 minutes to generate, we do
                # not want to kick off another until the previous invocation
                # has finished.
                run_report()

            @huey.periodic_task(crontab(minute='0'))
            def backup():
                # Generate backup of code
                do_code_backup()

                # Generate database backup. Since this may take longer than an
                # hour, we want to ensure that it is not run concurrently.
                with huey.lock_task('db-backup'):
                    do_db_backup()
        """
        return TaskLock(self, lock_name)

    def flush_locks(self):
        """
        Flush any stale locks (for example, when restarting the consumer).

        :return: List of any stale locks that were cleared.
        """
        flushed = set()
        for lock_key in self._locks:
            if self._get_data(lock_key) is not EmptyData:
                flushed.add(lock_key.split('.lock.', 1)[-1])
        return flushed

    def result(self, task_id, blocking=False, timeout=None, backoff=1.15,
               max_delay=1.0, revoke_on_timeout=False, preserve=False):
        """
        Retrieve the results of a task, given the task's ID. This
        method accepts the same parameters and has the same behavior
        as the :py:class:`TaskResultWrapper` object.
        """
        task_result = TaskResultWrapper(self, QueueTask(task_id=task_id))
        return task_result.get(
            blocking=blocking,
            timeout=timeout,
            backoff=backoff,
            max_delay=max_delay,
            revoke_on_timeout=revoke_on_timeout,
            preserve=preserve)
class TaskWrapper(object):
    """
    Wrapper produced by the :py:meth:`Huey.task` and
    :py:meth:`Huey.periodic_task` decorators.

    Dynamically creates (and registers) a task class for the wrapped callable
    and exposes helpers for enqueueing, scheduling and revoking it.
    """
    def __init__(self, huey, func, retries=0, retry_delay=0,
                 retries_as_argument=False, include_task=False, name=None,
                 task_base=None, **task_settings):
        self.huey = huey
        self.func = func
        self.retries = retries
        self.retry_delay = retry_delay
        self.retries_as_argument = retries_as_argument
        self.include_task = include_task
        self.name = name
        self.task_settings = task_settings
        # Build a task subclass whose execute() invokes ``func``, then
        # register it so the consumer can resolve it from queued messages.
        self.task_class = create_task(
            QueueTask if task_base is None else task_base,
            func,
            retries_as_argument,
            name,
            include_task,
            **task_settings)
        self.huey.registry.register(self.task_class)

    def is_revoked(self, dt=None, peek=True):
        # Revocation here applies to the task class as a whole.
        return self.huey.is_revoked(self.task_class, dt, peek)

    def revoke(self, revoke_until=None, revoke_once=False):
        self.huey.revoke_all(self.task_class, revoke_until, revoke_once)

    def restore(self):
        return self.huey.restore_all(self.task_class)

    def schedule(self, args=None, kwargs=None, eta=None, delay=None,
                 convert_utc=True, task_id=None):
        """
        Enqueue the task for execution at a later time, specified either as an
        ``eta`` datetime or a ``delay`` in seconds.
        """
        execute_time = self.huey._normalize_execute_time(
            eta=eta, delay=delay, convert_utc=convert_utc)
        cmd = self.task_class(
            (args or (), kwargs or {}),
            execute_time=execute_time,
            retries=self.retries,
            retry_delay=self.retry_delay,
            task_id=task_id)
        return self.huey.enqueue(cmd)

    def __call__(self, *args, **kwargs):
        # Calling the wrapper enqueues the task rather than running it.
        return self.huey.enqueue(self.s(*args, **kwargs))

    def call_local(self, *args, **kwargs):
        """Invoke the wrapped function directly, bypassing the queue."""
        return self.func(*args, **kwargs)

    def s(self, *args, **kwargs):
        """Create a task instance ("signature") without enqueueing it."""
        return self.task_class((args, kwargs), retries=self.retries,
                               retry_delay=self.retry_delay)
class TaskLock(object):
    """
    Simple lock built on the Storage key/value APIs. Usable as a context
    manager or as a decorator; see :py:meth:`Huey.lock_task` for details.
    """
    def __init__(self, huey, name):
        self._huey = huey
        self._name = name
        self._key = '%s.lock.%s' % (huey.name, name)
        # Track the key so Huey.flush_locks() can clear stale locks.
        huey._locks.add(self._key)

    def __call__(self, fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            # Hold the lock for the duration of the wrapped call.
            with self:
                return fn(*args, **kwargs)
        return wrapper

    def __enter__(self):
        acquired = self._huey._put_if_empty(self._key, '1')
        if not acquired:
            raise TaskLockedException('unable to set lock: %s' % self._name)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Popping the key's data releases the lock.
        self._huey._get_data(self._key)
class TaskResultWrapper(object):
    """
    Wrapper around task result data. When a task is executed, an instance of
    ``TaskResultWrapper`` is returned to provide access to the return value.

    To retrieve the task's result value, you can simply call the wrapper::

        @huey.task()
        def my_task(a, b):
            return a + b

        result = my_task(1, 2)

        # After a moment, when the consumer has executed the task and put
        # the result in the result storage, we can retrieve the value.
        print result()  # Prints 3

        # If you want to block until the result is ready, you can pass
        # blocking=True. We'll also specify a 4 second timeout so we don't
        # block forever if the consumer goes down:
        result2 = my_task(2, 3)
        print result(blocking=True, timeout=4)
    """
    def __init__(self, huey, task):
        self.huey = huey
        self.task = task
        # Local cache of the fetched result; EmptyData = not retrieved yet.
        self._result = EmptyData

    def __call__(self, *args, **kwargs):
        return self.get(*args, **kwargs)

    def _get(self, preserve=False):
        # Fetch (and cache) the pickled result from the result store. When
        # ``preserve`` is True the value is peeked rather than popped, leaving
        # it in the store for subsequent reads.
        task_id = self.task.task_id
        if self._result is EmptyData:
            res = self.huey._get_data(task_id, peek=preserve)

            if res is not EmptyData:
                self._result = pickle.loads(res)
                return self._result
            else:
                # Result not ready; returns EmptyData.
                return res
        else:
            return self._result

    def get_raw_result(self, blocking=False, timeout=None, backoff=1.15,
                       max_delay=1.0, revoke_on_timeout=False, preserve=False):
        """
        Return the task's result. Non-blocking: returns immediately (``None``
        when not ready). Blocking: poll with exponential ``backoff`` (capped
        at ``max_delay`` seconds) until the result arrives or ``timeout``
        elapses, then raise :py:class:`DataStoreTimeout`, optionally revoking
        the task first.
        """
        if not blocking:
            res = self._get(preserve)
            if res is not EmptyData:
                return res
        else:
            start = time.time()
            delay = .1
            while self._result is EmptyData:
                if timeout and time.time() - start >= timeout:
                    if revoke_on_timeout:
                        self.revoke()
                    raise DataStoreTimeout
                if delay > max_delay:
                    delay = max_delay
                if self._get(preserve) is EmptyData:
                    time.sleep(delay)
                    delay *= backoff
            return self._result

    def get(self, blocking=False, timeout=None, backoff=1.15, max_delay=1.0,
            revoke_on_timeout=False, preserve=False):
        """
        Like :py:meth:`get_raw_result`, but if the stored value is an
        :py:class:`Error` (the task failed), re-raise it as a
        :py:class:`TaskException`.
        """
        result = self.get_raw_result(blocking, timeout, backoff, max_delay,
                                     revoke_on_timeout, preserve)
        if result is not None and isinstance(result, Error):
            raise TaskException(result.metadata)
        return result

    def is_revoked(self):
        return self.huey.is_revoked(self.task, peek=True)

    def revoke(self):
        self.huey.revoke(self.task)

    def restore(self):
        return self.huey.restore(self.task)

    def reschedule(self, eta=None, delay=None, convert_utc=True):
        """
        Revoke the current task and enqueue a copy of it with a new task ID,
        to run at the given ``eta`` or after ``delay`` seconds.
        """
        # Rescheduling works by revoking the currently-scheduled task (nothing
        # is done to check if the task has already run, however). Then the
        # original task's data is used to enqueue a new task with a new task
        # ID and execution_time.
        self.revoke()
        execute_time = self.huey._normalize_execute_time(
            eta=eta, delay=delay, convert_utc=convert_utc)
        cmd = self.task.__class__(
            self.task.data,
            execute_time=execute_time,
            retries=self.task.retries,
            retry_delay=self.task.retry_delay,
            task_id=None)
        return self.huey.enqueue(cmd)

    def reset(self):
        """Clear the locally-cached result so it will be re-fetched."""
        self._result = EmptyData
def with_metaclass(meta, base=object):
    """Build a throwaway base class using *meta* (py2/py3 compatibility)."""
    namespace = {}
    return meta("NewBase", (base,), namespace)
class QueueTask(object):
    """
    Encapsulates the logic necessary to 'do something' given some arbitrary
    data. When enqueued with the :class:`Huey`, it will be stored in a queue
    for out-of-band execution via the consumer. See also the :meth:`task`
    decorator, which can be used to automatically execute any function
    out-of-band.

    Example::

        class SendEmailTask(QueueTask):
            def execute(self):
                data = self.get_data()
                send_email(data['recipient'], data['subject'], data['body'])

        huey.enqueue(
            SendEmailTask({
                'recipient': 'somebody@spam.com',
                'subject': 'look at this awesome website',
                'body': 'http://youtube.com'
            })
        )
    """
    # Class-level fallbacks, overridden via create_task(**kwargs).
    default_retries = 0
    default_retry_delay = 0

    def __init__(self, data=None, task_id=None, execute_time=None,
                 retries=None, retry_delay=None, on_complete=None):
        self.name = type(self).__name__
        self.set_data(data)
        self.task_id = task_id or self.create_id()
        self.revoke_id = 'r:%s' % self.task_id
        self.execute_time = execute_time
        if retries is None:
            retries = self.default_retries
        self.retries = retries
        if retry_delay is None:
            retry_delay = self.default_retry_delay
        self.retry_delay = retry_delay
        self.on_complete = on_complete

    def __repr__(self):
        pieces = ['%s.%s: %s' % (self.__module__, self.name, self.task_id)]
        if self.execute_time:
            pieces.append(' @%s' % self.execute_time)
        if self.retries:
            pieces.append(' %s retries' % self.retries)
        if self.on_complete:
            pieces.append(' -> %s' % self.on_complete)
        return ''.join(pieces)

    def create_id(self):
        """Generate a unique identifier for this task instance."""
        return str(uuid.uuid4())

    def get_data(self):
        return self.data

    def set_data(self, data):
        self.data = data

    def extend_data(self, data):
        """Fold *data* (tuple, dict, or scalar) into the stored call args."""
        if data is None or data == ():
            return

        args, kwargs = self.get_data()
        if isinstance(data, tuple):
            args = args + data
        elif isinstance(data, dict):
            kwargs.update(data)
        else:
            args = args + (data,)
        self.set_data((args, kwargs))

    def then(self, task, *args, **kwargs):
        """Chain *task* (a TaskWrapper) to run after this task completes."""
        if self.on_complete:
            # Append to the end of an existing chain.
            self.on_complete.then(task, *args, **kwargs)
        else:
            self.on_complete = task.s(*args, **kwargs)
        return self

    def execute(self):
        """Run the task's work; subclasses must override."""
        raise NotImplementedError

    def __eq__(self, rhs):
        return (
            self.task_id == rhs.task_id and
            self.execute_time == rhs.execute_time and
            type(self) == type(rhs))
class PeriodicQueueTask(QueueTask):
    """
    Base class for tasks that run on a schedule rather than on demand.
    Subclasses are generated by :py:meth:`Huey.periodic_task`; the consumer
    calls ``validate_datetime`` to decide whether the task is due.
    """
    def validate_datetime(self, dt):
        """Validate that the task should execute at the given datetime"""
        return False
def create_task(task_class, func, retries_as_argument=False, task_name=None,
                include_task=False, **kwargs):
    """
    Dynamically build a subclass of *task_class* whose ``execute`` method
    calls *func* with the task's stored args/kwargs. Optionally injects the
    current retry count and/or the task instance itself as keyword arguments.
    Extra ``kwargs`` become attributes on the generated class.
    """
    def execute(self):
        call_args, call_kwargs = self.data or ((), {})
        if retries_as_argument:
            call_kwargs['retries'] = self.retries
        if include_task:
            call_kwargs['task'] = self
        return func(*call_args, **call_kwargs)

    attrs = {
        'execute': execute,
        '__module__': func.__module__,
        '__doc__': func.__doc__}
    # Caller-supplied settings intentionally take precedence.
    attrs.update(kwargs)

    klass_name = task_name or 'queue_task_%s' % (func.__name__)
    return type(klass_name, (task_class,), attrs)
# Raw strings: the original non-raw patterns relied on '\d'/'\*' being passed
# through, which raises DeprecationWarning/SyntaxWarning on modern Pythons.
dash_re = re.compile(r'(\d+)-(\d+)')
every_re = re.compile(r'\*\/(\d+)')


def crontab(month='*', day='*', day_of_week='*', hour='*', minute='*'):
    """
    Convert a "crontab"-style set of parameters into a test function that will
    return True when the given datetime matches the parameters set forth in
    the crontab.

    For day-of-week, 0=Sunday and 6=Saturday.

    Acceptable inputs:
    * = every distinct value
    */n = run every "n" times, i.e. hours='*/4' == 0, 4, 8, 12, 16, 20
    m-n = run every time m..n
    m,n = run on m and n

    :raises ValueError: when a value is outside the field's valid range.
    :return: a function accepting a ``datetime`` and returning True/False.
    """
    validation = (
        ('m', month, range(1, 13)),
        ('d', day, range(1, 32)),
        ('w', day_of_week, range(8)),  # 0-6, but also 7 for Sunday.
        ('H', hour, range(24)),
        ('M', minute, range(60))
    )
    cron_settings = []

    for (date_str, value, acceptable) in validation:
        settings = set([])

        if isinstance(value, int):
            value = str(value)

        for piece in value.split(','):
            if piece == '*':
                settings.update(acceptable)
                continue
            if piece.isdigit():
                piece = int(piece)
                if piece not in acceptable:
                    raise ValueError('%d is not a valid input' % piece)
                elif date_str == 'w':
                    piece %= 7
                settings.add(piece)
            else:
                dash_match = dash_re.match(piece)
                if dash_match:
                    lhs, rhs = map(int, dash_match.groups())
                    if lhs not in acceptable or rhs not in acceptable:
                        raise ValueError('%s is not a valid input' % piece)
                    elif date_str == 'w':
                        lhs %= 7
                        rhs %= 7
                        if lhs > rhs:
                            # Bug fix: a range that wraps the week boundary,
                            # e.g. '6-7' (Saturday-Sunday), mods to lhs=6,
                            # rhs=0 and the plain range() below was empty, so
                            # the spec silently matched nothing. Split the
                            # wrap-around into two runs instead.
                            settings.update(range(lhs, 7))
                            settings.update(range(0, rhs + 1))
                            continue
                    settings.update(range(lhs, rhs + 1))
                    continue

                every_match = every_re.match(piece)
                if every_match:
                    if date_str == 'w':
                        raise ValueError('Cannot perform this kind of matching'
                                         ' on day-of-week.')
                    interval = int(every_match.groups()[0])
                    settings.update(acceptable[::interval])

        cron_settings.append(sorted(list(settings)))

    def validate_date(dt):
        # Compare the datetime's (month, day, weekday, hour, minute) against
        # the precomputed acceptable values for each field.
        _, m, d, H, M, _, w, _, _ = dt.timetuple()

        # fix the weekday to be sunday=0
        w = (w + 1) % 7

        for (date_piece, selection) in zip([m, d, w, H, M], cron_settings):
            if date_piece not in selection:
                return False
        return True

    return validate_date
|