api.py 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011
  1. import datetime
  2. import json
  3. import pickle
  4. import re
  5. import time
  6. import traceback
  7. import uuid
  8. from collections import OrderedDict
  9. from functools import wraps
  10. from inspect import isclass
  11. from huey.constants import EmptyData
  12. from huey.consumer import Consumer
  13. from huey.exceptions import DataStoreGetException
  14. from huey.exceptions import DataStorePutException
  15. from huey.exceptions import DataStoreTimeout
  16. from huey.exceptions import QueueException
  17. from huey.exceptions import QueueReadException
  18. from huey.exceptions import QueueRemoveException
  19. from huey.exceptions import QueueWriteException
  20. from huey.exceptions import ScheduleAddException
  21. from huey.exceptions import ScheduleReadException
  22. from huey.exceptions import TaskException
  23. from huey.exceptions import TaskLockedException
  24. from huey.registry import registry
  25. from huey.registry import TaskRegistry
  26. from huey.utils import Error
  27. from huey.utils import aware_to_utc
  28. from huey.utils import is_aware
  29. from huey.utils import is_naive
  30. from huey.utils import local_to_utc
  31. from huey.utils import make_naive
  32. from huey.utils import wrap_exception
  33. class Huey(object):
  34. """
  35. Huey executes tasks by exposing function decorators that cause the function
  36. call to be enqueued for execution by the consumer.
  37. Typically your application will only need one Huey instance, but you can
  38. have as many as you like -- the only caveat is that one consumer process
  39. must be executed for each Huey instance.
  40. :param name: a name for the task queue.
  41. :param bool result_store: whether to store task results.
  42. :param bool events: whether to enable consumer-sent events.
  43. :param store_none: Flag to indicate whether tasks that return ``None``
  44. should store their results in the result store.
  45. :param always_eager: Useful for testing, this will execute all tasks
  46. immediately, without enqueueing them.
  47. :param store_errors: Flag to indicate whether task errors should be stored.
  48. :param global_registry: Use a global registry for tasks.
  49. Example usage::
  50. from huey import RedisHuey
  51. # Create a huey instance and disable consumer-sent events.
  52. huey = RedisHuey('my-app', events=False)
  53. @huey.task()
  54. def slow_function(some_arg):
  55. # ... do something ...
  56. return some_arg
  57. @huey.periodic_task(crontab(minute='0', hour='3'))
  58. def backup():
  59. # do a backup every day at 3am
  60. return
  61. """
  62. def __init__(self, name='huey', result_store=True, events=True,
  63. store_none=False, always_eager=False, store_errors=True,
  64. blocking=False, global_registry=True, **storage_kwargs):
  65. self.name = name
  66. self.result_store = result_store
  67. self.events = events
  68. self.store_none = store_none
  69. self.always_eager = always_eager
  70. self.store_errors = store_errors
  71. self.blocking = blocking
  72. self.storage = self.get_storage(**storage_kwargs)
  73. self.pre_execute_hooks = OrderedDict()
  74. self.post_execute_hooks = OrderedDict()
  75. self.startup_hooks = OrderedDict()
  76. self._locks = set()
  77. if global_registry:
  78. self.registry = registry
  79. else:
  80. self.registry = TaskRegistry()
  81. def get_storage(self, **kwargs):
  82. raise NotImplementedError('Storage API not implemented in the base '
  83. 'Huey class. Use `RedisHuey` instead.')
  84. def create_consumer(self, **config):
  85. return Consumer(self, **config)
  86. def _normalize_execute_time(self, eta=None, delay=None, convert_utc=True):
  87. if delay and eta:
  88. raise ValueError('Both a delay and an eta cannot be '
  89. 'specified at the same time')
  90. elif delay:
  91. method = (convert_utc and datetime.datetime.utcnow or
  92. datetime.datetime.now)
  93. return method() + datetime.timedelta(seconds=delay)
  94. elif eta:
  95. if is_naive(eta) and convert_utc:
  96. eta = local_to_utc(eta)
  97. elif is_aware(eta) and convert_utc:
  98. eta = aware_to_utc(eta)
  99. elif is_aware(eta) and not convert_utc:
  100. eta = make_naive(eta)
  101. return eta
  102. def task(self, retries=0, retry_delay=0, retries_as_argument=False,
  103. include_task=False, name=None, **task_settings):
  104. def decorator(func):
  105. """
  106. Decorator to execute a function out-of-band via the consumer.
  107. """
  108. return TaskWrapper(
  109. self,
  110. func.func if isinstance(func, TaskWrapper) else func,
  111. retries=retries,
  112. retry_delay=retry_delay,
  113. retries_as_argument=retries_as_argument,
  114. include_task=include_task,
  115. name=name,
  116. **task_settings)
  117. return decorator
  118. # We specify retries and retry_delay as 0 because they become the default
  119. # values as class attributes on the derived PeriodicQueueTask instance.
  120. # Since the values the class is instantiated with will always be `None`,
  121. # we want the fallback behavior to be 0 by default.
  122. def periodic_task(self, validate_datetime, name=None, retries=0,
  123. retry_delay=0, **task_settings):
  124. """
  125. Decorator to execute a function on a specific schedule.
  126. """
  127. def decorator(func):
  128. def method_validate(self, dt):
  129. return validate_datetime(dt)
  130. return TaskWrapper(
  131. self,
  132. func.func if isinstance(func, TaskWrapper) else func,
  133. name=name,
  134. task_base=PeriodicQueueTask,
  135. default_retries=retries,
  136. default_retry_delay=retry_delay,
  137. validate_datetime=method_validate,
  138. **task_settings)
  139. return decorator
  140. def register_pre_execute(self, name, fn):
  141. """
  142. Register a pre-execute hook. The callback will be executed before the
  143. execution of all tasks. Execution of the task can be cancelled by
  144. raising a :py:class:`CancelExecution` exception. Uncaught exceptions
  145. will be logged but will not cause the task itself to be cancelled.
  146. The callback function should accept a single task instance, the return
  147. value is ignored.
  148. :param name: Name for the hook.
  149. :param fn: Callback function that accepts task to be executed.
  150. """
  151. self.pre_execute_hooks[name] = fn
  152. def unregister_pre_execute(self, name):
  153. del self.pre_execute_hooks[name]
  154. def pre_execute(self, name=None):
  155. """
  156. Decorator for registering a pre-execute hook.
  157. """
  158. def decorator(fn):
  159. self.register_pre_execute(name or fn.__name__, fn)
  160. return fn
  161. return decorator
  162. def register_post_execute(self, name, fn):
  163. """
  164. Register a post-execute hook. The callback will be executed after the
  165. execution of all tasks. Uncaught exceptions will be logged but will
  166. have no other effect on the overall operation of the consumer.
  167. The callback function should accept:
  168. * a task instance
  169. * the return value from the execution of the task (which may be None)
  170. * any exception that was raised during the execution of the task (which
  171. will be None for tasks that executed normally).
  172. The return value of the callback itself is ignored.
  173. :param name: Name for the hook.
  174. :param fn: Callback function that accepts task that was executed and
  175. the tasks return value (or None).
  176. """
  177. self.post_execute_hooks[name] = fn
  178. def unregister_post_execute(self, name):
  179. del self.post_execute_hooks[name]
  180. def post_execute(self, name=None):
  181. """
  182. Decorator for registering a post-execute hook.
  183. """
  184. def decorator(fn):
  185. self.register_post_execute(name or fn.__name__, fn)
  186. return fn
  187. return decorator
  188. def register_startup(self, name, fn):
  189. """
  190. Register a startup hook. The callback will be executed whenever a
  191. worker comes online. Uncaught exceptions will be logged but will
  192. have no other effect on the overall operation of the worker.
  193. The callback function must not accept any parameters.
  194. This API is provided to simplify setting up global resources that, for
  195. whatever reason, should not be created as import-time side-effects. For
  196. example, your tasks need to write data into a Postgres database. If you
  197. create the connection at import-time, before the worker processes are
  198. spawned, you'll likely run into errors when attempting to use the
  199. connection from the child (worker) processes. To avoid this problem,
  200. you can register a startup hook which is executed by the worker process
  201. as part of its initialization.
  202. :param name: Name for the hook.
  203. :param fn: Callback function.
  204. """
  205. self.startup_hooks[name] = fn
  206. def unregister_startup(self, name):
  207. del self.startup_hooks[name]
  208. def on_startup(self, name=None):
  209. """
  210. Decorator for registering a startup hook.
  211. """
  212. def decorator(fn):
  213. self.register_startup(name or fn.__name__, fn)
  214. return fn
  215. return decorator
  216. def _wrapped_operation(exc_class):
  217. def decorator(fn):
  218. def inner(*args, **kwargs):
  219. try:
  220. return fn(*args, **kwargs)
  221. except (KeyboardInterrupt, RuntimeError):
  222. raise
  223. except:
  224. wrap_exception(exc_class)
  225. return inner
  226. return decorator
  227. @_wrapped_operation(QueueWriteException)
  228. def _enqueue(self, msg):
  229. self.storage.enqueue(msg)
  230. @_wrapped_operation(QueueReadException)
  231. def _dequeue(self):
  232. return self.storage.dequeue()
  233. @_wrapped_operation(QueueRemoveException)
  234. def _unqueue(self, msg):
  235. return self.queue.unqueue(msg)
  236. @_wrapped_operation(DataStoreGetException)
  237. def _get_data(self, key, peek=False):
  238. if peek:
  239. return self.storage.peek_data(key)
  240. else:
  241. return self.storage.pop_data(key)
  242. @_wrapped_operation(DataStorePutException)
  243. def _put_data(self, key, value):
  244. return self.storage.put_data(key, value)
  245. @_wrapped_operation(DataStorePutException)
  246. def _put_if_empty(self, key, value):
  247. return self.storage.put_if_empty(key, value)
  248. @_wrapped_operation(DataStorePutException)
  249. def _put_error(self, metadata):
  250. self.storage.put_error(metadata)
  251. @_wrapped_operation(DataStoreGetException)
  252. def _get_errors(self, limit=None, offset=0):
  253. return self.storage.get_errors(limit=limit, offset=offset)
  254. @_wrapped_operation(ScheduleAddException)
  255. def _add_to_schedule(self, data, ts):
  256. self.storage.add_to_schedule(data, ts)
  257. @_wrapped_operation(ScheduleReadException)
  258. def _read_schedule(self, ts):
  259. return self.storage.read_schedule(ts)
  260. def emit(self, message):
  261. try:
  262. self.storage.emit(message)
  263. except:
  264. # Events always fail silently since they are treated as a non-
  265. # critical component.
  266. pass
  267. def _execute_always_eager(self, task):
  268. accum = []
  269. failure_exc = None
  270. while task is not None:
  271. for name, callback in self.pre_execute_hooks.items():
  272. callback(task)
  273. try:
  274. result = task.execute()
  275. except Exception as exc:
  276. result = None
  277. failure_exc = task_exc = exc
  278. else:
  279. task_exc = None
  280. accum.append(result)
  281. for name, callback in self.post_execute_hooks.items():
  282. callback(task, result, task_exc)
  283. if task.on_complete:
  284. task = task.on_complete
  285. task.extend_data(result)
  286. else:
  287. task = None
  288. if failure_exc is not None:
  289. raise failure_exc
  290. return accum[0] if len(accum) == 1 else accum
  291. def enqueue(self, task):
  292. if self.always_eager:
  293. return self._execute_always_eager(task)
  294. self._enqueue(self.registry.get_message_for_task(task))
  295. if not self.result_store:
  296. return
  297. if task.on_complete:
  298. q = [task]
  299. result_wrappers = []
  300. while q:
  301. current = q.pop()
  302. result_wrappers.append(TaskResultWrapper(self, current))
  303. if current.on_complete:
  304. q.append(current.on_complete)
  305. return result_wrappers
  306. else:
  307. return TaskResultWrapper(self, task)
  308. def dequeue(self):
  309. message = self._dequeue()
  310. if message:
  311. return self.registry.get_task_for_message(message)
  312. def put(self, key, value):
  313. return self._put_data(key,
  314. pickle.dumps(value, pickle.HIGHEST_PROTOCOL))
  315. def get(self, key, peek=False):
  316. data = self._get_data(key, peek=peek)
  317. if data is EmptyData:
  318. return
  319. else:
  320. return pickle.loads(data)
  321. def put_error(self, metadata):
  322. return self._put_error(pickle.dumps(metadata))
  323. def _format_time(self, dt):
  324. if dt is None:
  325. return None
  326. return time.mktime(dt.timetuple())
  327. def _get_task_metadata(self, task, error=False, include_data=False):
  328. metadata = {
  329. 'id': task.task_id,
  330. 'task': type(task).__name__,
  331. 'retries': task.retries,
  332. 'retry_delay': task.retry_delay,
  333. 'execute_time': self._format_time(task.execute_time)}
  334. if include_data and not isinstance(task, PeriodicQueueTask):
  335. targs, tkwargs = task.get_data()
  336. if tkwargs.get("task") and isinstance(tkwargs["task"], QueueTask):
  337. del(tkwargs['task'])
  338. metadata['data'] = (targs, tkwargs)
  339. return metadata
  340. def emit_status(self, status, error=False, **data):
  341. if self.events:
  342. metadata = {'status': status, 'error': error}
  343. if error:
  344. metadata['traceback'] = traceback.format_exc()
  345. metadata.update(data)
  346. self.emit(json.dumps(metadata))
  347. def emit_task(self, status, task, error=False, **data):
  348. if self.events:
  349. metadata = self._get_task_metadata(task)
  350. metadata.update(data)
  351. self.emit_status(status, error=error, **metadata)
  352. def execute(self, task):
  353. if not isinstance(task, QueueTask):
  354. raise TypeError('Unknown object: %s' % task)
  355. try:
  356. result = task.execute()
  357. except Exception as exc:
  358. if self.store_errors:
  359. metadata = self._get_task_metadata(task, True)
  360. metadata['error'] = repr(exc)
  361. metadata['traceback'] = traceback.format_exc()
  362. self.put(task.task_id, Error(metadata))
  363. self.put_error(metadata)
  364. raise
  365. if self.result_store and not isinstance(task, PeriodicQueueTask):
  366. if result is not None or self.store_none:
  367. self.put(task.task_id, result)
  368. if task.on_complete:
  369. next_task = task.on_complete
  370. next_task.extend_data(result)
  371. self.enqueue(next_task)
  372. return result
  373. def revoke_all(self, task_class, revoke_until=None, revoke_once=False):
  374. self.put('rt:%s' % task_class.__name__, (revoke_until, revoke_once))
  375. def restore_all(self, task_class):
  376. return self._get_data('rt:%s' % task_class.__name__) is not EmptyData
  377. def revoke(self, task, revoke_until=None, revoke_once=False):
  378. self.put(task.revoke_id, (revoke_until, revoke_once))
  379. def restore(self, task):
  380. # Return value indicates whether the task was in fact revoked.
  381. return self._get_data(task.revoke_id) is not EmptyData
  382. def revoke_by_id(self, task_id, revoke_until=None, revoke_once=False):
  383. return self.revoke(QueueTask(task_id=task_id), revoke_until,
  384. revoke_once)
  385. def restore_by_id(self, task_id):
  386. return self.restore(QueueTask(task_id=task_id))
  387. def _check_revoked(self, revoke_id, dt=None, peek=True):
  388. """
  389. Checks if a task is revoked, returns a 2-tuple indicating:
  390. 1. Is task revoked?
  391. 2. Should task be restored?
  392. """
  393. res = self.get(revoke_id, peek=True)
  394. if res is None:
  395. return False, False
  396. revoke_until, revoke_once = res
  397. if revoke_once:
  398. # This task *was* revoked for one run, but now it should be
  399. # restored to normal execution (unless we are just peeking).
  400. return True, not peek
  401. elif revoke_until is not None and revoke_until <= dt:
  402. # Task is no longer revoked and can be restored.
  403. return False, True
  404. else:
  405. # Task is still revoked. Do not restore.
  406. return True, False
  407. def is_revoked(self, task, dt=None, peek=True):
  408. if isclass(task) and issubclass(task, QueueTask):
  409. revoke_id = 'rt:%s' % task.__name__
  410. is_revoked, can_restore = self._check_revoked(revoke_id, dt, peek)
  411. if can_restore:
  412. self.restore_all(task)
  413. return is_revoked
  414. if not isinstance(task, QueueTask):
  415. task = QueueTask(task_id=task)
  416. is_revoked, can_restore = self._check_revoked(task.revoke_id, dt, peek)
  417. if can_restore:
  418. self.restore(task)
  419. if not is_revoked:
  420. is_revoked = self.is_revoked(type(task), dt, peek)
  421. return is_revoked
  422. def add_schedule(self, task):
  423. msg = self.registry.get_message_for_task(task)
  424. ex_time = task.execute_time or datetime.datetime.fromtimestamp(0)
  425. self._add_to_schedule(msg, ex_time)
  426. def read_schedule(self, ts):
  427. return [self.registry.get_task_for_message(m)
  428. for m in self._read_schedule(ts)]
  429. def read_periodic(self, ts):
  430. periodic = self.registry.get_periodic_tasks()
  431. return [task for task in periodic
  432. if task.validate_datetime(ts)]
  433. def ready_to_run(self, cmd, dt=None):
  434. dt = dt or datetime.datetime.utcnow()
  435. return cmd.execute_time is None or cmd.execute_time <= dt
  436. def pending(self, limit=None):
  437. return [self.registry.get_task_for_message(m)
  438. for m in self.storage.enqueued_items(limit)]
  439. def pending_count(self):
  440. return self.storage.queue_size()
  441. def scheduled(self, limit=None):
  442. return [self.registry.get_task_for_message(m)
  443. for m in self.storage.scheduled_items(limit)]
  444. def scheduled_count(self):
  445. return self.storage.schedule_size()
  446. def all_results(self):
  447. return self.storage.result_items()
  448. def result_count(self):
  449. return self.storage.result_store_size()
  450. def errors(self, limit=None, offset=0):
  451. return [
  452. pickle.loads(error)
  453. for error in self.storage.get_errors(limit, offset)]
  454. def __len__(self):
  455. return self.pending_count()
  456. def flush(self):
  457. self.storage.flush_all()
  458. def get_tasks(self):
  459. return sorted(self.registry._registry.keys())
  460. def get_periodic_tasks(self):
  461. return [name for name, task in self.registry._registry.items()
  462. if hasattr(task, 'validate_datetime')]
  463. def get_regular_tasks(self):
  464. periodic = set(self.get_periodic_tasks())
  465. return [task for task in self.get_tasks() if task not in periodic]
  466. def lock_task(self, lock_name):
  467. """
  468. Utilize the Storage key/value APIs to implement simple locking.
  469. This lock is designed to be used to prevent multiple invocations of a
  470. task from running concurrently. Can be used as either a context-manager
  471. or as a task decorator. If using as a decorator, place it directly
  472. above the function declaration.
  473. If a second invocation occurs and the lock cannot be acquired, then a
  474. special exception is raised, which is handled by the consumer. The task
  475. will not be executed and an ``EVENT_LOCKED`` will be emitted. If the
  476. task is configured to be retried, then it will be retried normally, but
  477. the failure to acquire the lock is not considered an error.
  478. Examples:
  479. @huey.periodic_task(crontab(minute='*/5'))
  480. @huey.lock_task('reports-lock')
  481. def generate_report():
  482. # If a report takes longer than 5 minutes to generate, we do
  483. # not want to kick off another until the previous invocation
  484. # has finished.
  485. run_report()
  486. @huey.periodic_task(crontab(minute='0'))
  487. def backup():
  488. # Generate backup of code
  489. do_code_backup()
  490. # Generate database backup. Since this may take longer than an
  491. # hour, we want to ensure that it is not run concurrently.
  492. with huey.lock_task('db-backup'):
  493. do_db_backup()
  494. """
  495. return TaskLock(self, lock_name)
  496. def flush_locks(self):
  497. """
  498. Flush any stale locks (for example, when restarting the consumer).
  499. :return: List of any stale locks that were cleared.
  500. """
  501. flushed = set()
  502. for lock_key in self._locks:
  503. if self._get_data(lock_key) is not EmptyData:
  504. flushed.add(lock_key.split('.lock.', 1)[-1])
  505. return flushed
  506. def result(self, task_id, blocking=False, timeout=None, backoff=1.15,
  507. max_delay=1.0, revoke_on_timeout=False, preserve=False):
  508. """
  509. Retrieve the results of a task, given the task's ID. This
  510. method accepts the same parameters and has the same behavior
  511. as the :py:class:`TaskResultWrapper` object.
  512. """
  513. task_result = TaskResultWrapper(self, QueueTask(task_id=task_id))
  514. return task_result.get(
  515. blocking=blocking,
  516. timeout=timeout,
  517. backoff=backoff,
  518. max_delay=max_delay,
  519. revoke_on_timeout=revoke_on_timeout,
  520. preserve=preserve)
  521. class TaskWrapper(object):
  522. def __init__(self, huey, func, retries=0, retry_delay=0,
  523. retries_as_argument=False, include_task=False, name=None,
  524. task_base=None, **task_settings):
  525. self.huey = huey
  526. self.func = func
  527. self.retries = retries
  528. self.retry_delay = retry_delay
  529. self.retries_as_argument = retries_as_argument
  530. self.include_task = include_task
  531. self.name = name
  532. self.task_settings = task_settings
  533. self.task_class = create_task(
  534. QueueTask if task_base is None else task_base,
  535. func,
  536. retries_as_argument,
  537. name,
  538. include_task,
  539. **task_settings)
  540. self.huey.registry.register(self.task_class)
  541. def is_revoked(self, dt=None, peek=True):
  542. return self.huey.is_revoked(self.task_class, dt, peek)
  543. def revoke(self, revoke_until=None, revoke_once=False):
  544. self.huey.revoke_all(self.task_class, revoke_until, revoke_once)
  545. def restore(self):
  546. return self.huey.restore_all(self.task_class)
  547. def schedule(self, args=None, kwargs=None, eta=None, delay=None,
  548. convert_utc=True, task_id=None):
  549. execute_time = self.huey._normalize_execute_time(
  550. eta=eta, delay=delay, convert_utc=convert_utc)
  551. cmd = self.task_class(
  552. (args or (), kwargs or {}),
  553. execute_time=execute_time,
  554. retries=self.retries,
  555. retry_delay=self.retry_delay,
  556. task_id=task_id)
  557. return self.huey.enqueue(cmd)
  558. def __call__(self, *args, **kwargs):
  559. return self.huey.enqueue(self.s(*args, **kwargs))
  560. def call_local(self, *args, **kwargs):
  561. return self.func(*args, **kwargs)
  562. def s(self, *args, **kwargs):
  563. return self.task_class((args, kwargs), retries=self.retries,
  564. retry_delay=self.retry_delay)
  565. class TaskLock(object):
  566. """
  567. Utilize the Storage key/value APIs to implement simple locking. For more
  568. details see :py:meth:`Huey.lock_task`.
  569. """
  570. def __init__(self, huey, name):
  571. self._huey = huey
  572. self._name = name
  573. self._key = '%s.lock.%s' % (self._huey.name, self._name)
  574. self._huey._locks.add(self._key)
  575. def __call__(self, fn):
  576. @wraps(fn)
  577. def inner(*args, **kwargs):
  578. with self:
  579. return fn(*args, **kwargs)
  580. return inner
  581. def __enter__(self):
  582. if not self._huey._put_if_empty(self._key, '1'):
  583. raise TaskLockedException('unable to set lock: %s' % self._name)
  584. def __exit__(self, exc_type, exc_val, exc_tb):
  585. self._huey._get_data(self._key)
  586. class TaskResultWrapper(object):
  587. """
  588. Wrapper around task result data. When a task is executed, an instance of
  589. ``TaskResultWrapper`` is returned to provide access to the return value.
  590. To retrieve the task's result value, you can simply call the wrapper::
  591. @huey.task()
  592. def my_task(a, b):
  593. return a + b
  594. result = my_task(1, 2)
  595. # After a moment, when the consumer has executed the task and put
  596. # the result in the result storage, we can retrieve the value.
  597. print result() # Prints 3
  598. # If you want to block until the result is ready, you can pass
  599. # blocking=True. We'll also specify a 4 second timeout so we don't
  600. # block forever if the consumer goes down:
  601. result2 = my_task(2, 3)
  602. print result(blocking=True, timeout=4)
  603. """
  604. def __init__(self, huey, task):
  605. self.huey = huey
  606. self.task = task
  607. self._result = EmptyData
  608. def __call__(self, *args, **kwargs):
  609. return self.get(*args, **kwargs)
  610. def _get(self, preserve=False):
  611. task_id = self.task.task_id
  612. if self._result is EmptyData:
  613. res = self.huey._get_data(task_id, peek=preserve)
  614. if res is not EmptyData:
  615. self._result = pickle.loads(res)
  616. return self._result
  617. else:
  618. return res
  619. else:
  620. return self._result
  621. def get_raw_result(self, blocking=False, timeout=None, backoff=1.15,
  622. max_delay=1.0, revoke_on_timeout=False, preserve=False):
  623. if not blocking:
  624. res = self._get(preserve)
  625. if res is not EmptyData:
  626. return res
  627. else:
  628. start = time.time()
  629. delay = .1
  630. while self._result is EmptyData:
  631. if timeout and time.time() - start >= timeout:
  632. if revoke_on_timeout:
  633. self.revoke()
  634. raise DataStoreTimeout
  635. if delay > max_delay:
  636. delay = max_delay
  637. if self._get(preserve) is EmptyData:
  638. time.sleep(delay)
  639. delay *= backoff
  640. return self._result
  641. def get(self, blocking=False, timeout=None, backoff=1.15, max_delay=1.0,
  642. revoke_on_timeout=False, preserve=False):
  643. result = self.get_raw_result(blocking, timeout, backoff, max_delay,
  644. revoke_on_timeout, preserve)
  645. if result is not None and isinstance(result, Error):
  646. raise TaskException(result.metadata)
  647. return result
  648. def is_revoked(self):
  649. return self.huey.is_revoked(self.task, peek=True)
  650. def revoke(self):
  651. self.huey.revoke(self.task)
  652. def restore(self):
  653. return self.huey.restore(self.task)
  654. def reschedule(self, eta=None, delay=None, convert_utc=True):
  655. # Rescheduling works by revoking the currently-scheduled task (nothing
  656. # is done to check if the task has already run, however). Then the
  657. # original task's data is used to enqueue a new task with a new task ID
  658. # and execution_time.
  659. self.revoke()
  660. execute_time = self.huey._normalize_execute_time(
  661. eta=eta, delay=delay, convert_utc=convert_utc)
  662. cmd = self.task.__class__(
  663. self.task.data,
  664. execute_time=execute_time,
  665. retries=self.task.retries,
  666. retry_delay=self.task.retry_delay,
  667. task_id=None)
  668. return self.huey.enqueue(cmd)
  669. def reset(self):
  670. self._result = EmptyData
  671. def with_metaclass(meta, base=object):
  672. return meta("NewBase", (base,), {})
  673. class QueueTask(object):
  674. """
  675. A class that encapsulates the logic necessary to 'do something' given some
  676. arbitrary data. When enqueued with the :class:`Huey`, it will be
  677. stored in a queue for out-of-band execution via the consumer. See also
  678. the :meth:`task` decorator, which can be used to automatically
  679. execute any function out-of-band.
  680. Example::
  681. class SendEmailTask(QueueTask):
  682. def execute(self):
  683. data = self.get_data()
  684. send_email(data['recipient'], data['subject'], data['body'])
  685. huey.enqueue(
  686. SendEmailTask({
  687. 'recipient': 'somebody@spam.com',
  688. 'subject': 'look at this awesome website',
  689. 'body': 'http://youtube.com'
  690. })
  691. )
  692. """
  693. default_retries = 0
  694. default_retry_delay = 0
  695. def __init__(self, data=None, task_id=None, execute_time=None,
  696. retries=None, retry_delay=None, on_complete=None):
  697. self.name = type(self).__name__
  698. self.set_data(data)
  699. self.task_id = task_id or self.create_id()
  700. self.revoke_id = 'r:%s' % self.task_id
  701. self.execute_time = execute_time
  702. self.retries = retries if retries is not None else self.default_retries
  703. self.retry_delay = retry_delay if retry_delay is not None else \
  704. self.default_retry_delay
  705. self.on_complete = on_complete
  706. def __repr__(self):
  707. rep = '%s.%s: %s' % (self.__module__, self.name, self.task_id)
  708. if self.execute_time:
  709. rep += ' @%s' % self.execute_time
  710. if self.retries:
  711. rep += ' %s retries' % self.retries
  712. if self.on_complete:
  713. rep += ' -> %s' % self.on_complete
  714. return rep
  715. def create_id(self):
  716. return str(uuid.uuid4())
  717. def get_data(self):
  718. return self.data
  719. def set_data(self, data):
  720. self.data = data
  721. def extend_data(self, data):
  722. if data is None or data == ():
  723. return
  724. args, kwargs = self.get_data()
  725. if isinstance(data, tuple):
  726. args += data
  727. elif isinstance(data, dict):
  728. kwargs.update(data)
  729. else:
  730. args = args + (data,)
  731. self.set_data((args, kwargs))
  732. def then(self, task, *args, **kwargs):
  733. if self.on_complete:
  734. self.on_complete.then(task, *args, **kwargs)
  735. else:
  736. self.on_complete = task.s(*args, **kwargs)
  737. return self
  738. def execute(self):
  739. """Execute any arbitary code here"""
  740. raise NotImplementedError
  741. def __eq__(self, rhs):
  742. return (
  743. self.task_id == rhs.task_id and
  744. self.execute_time == rhs.execute_time and
  745. type(self) == type(rhs))
class PeriodicQueueTask(QueueTask):
    """Base class for tasks that run on a schedule rather than on demand.

    Subclasses override validate_datetime() to report whether the task is due
    at a given moment (e.g. using a crontab()-style validator).
    """
    def validate_datetime(self, dt):
        """Validate that the task should execute at the given datetime"""
        # Default implementation never matches; subclasses supply the real
        # schedule check.
        return False
  750. def create_task(task_class, func, retries_as_argument=False, task_name=None,
  751. include_task=False, **kwargs):
  752. def execute(self):
  753. args, kwargs = self.data or ((), {})
  754. if retries_as_argument:
  755. kwargs['retries'] = self.retries
  756. if include_task:
  757. kwargs['task'] = self
  758. return func(*args, **kwargs)
  759. attrs = {
  760. 'execute': execute,
  761. '__module__': func.__module__,
  762. '__doc__': func.__doc__}
  763. attrs.update(kwargs)
  764. if not task_name:
  765. task_name = 'queue_task_%s' % (func.__name__)
  766. return type(task_name, (task_class,), attrs)
  767. dash_re = re.compile('(\d+)-(\d+)')
  768. every_re = re.compile('\*\/(\d+)')
  769. def crontab(month='*', day='*', day_of_week='*', hour='*', minute='*'):
  770. """
  771. Convert a "crontab"-style set of parameters into a test function that will
  772. return True when the given datetime matches the parameters set forth in
  773. the crontab.
  774. For day-of-week, 0=Sunday and 6=Saturday.
  775. Acceptable inputs:
  776. * = every distinct value
  777. */n = run every "n" times, i.e. hours='*/4' == 0, 4, 8, 12, 16, 20
  778. m-n = run every time m..n
  779. m,n = run on m and n
  780. """
  781. validation = (
  782. ('m', month, range(1, 13)),
  783. ('d', day, range(1, 32)),
  784. ('w', day_of_week, range(8)), # 0-6, but also 7 for Sunday.
  785. ('H', hour, range(24)),
  786. ('M', minute, range(60))
  787. )
  788. cron_settings = []
  789. for (date_str, value, acceptable) in validation:
  790. settings = set([])
  791. if isinstance(value, int):
  792. value = str(value)
  793. for piece in value.split(','):
  794. if piece == '*':
  795. settings.update(acceptable)
  796. continue
  797. if piece.isdigit():
  798. piece = int(piece)
  799. if piece not in acceptable:
  800. raise ValueError('%d is not a valid input' % piece)
  801. elif date_str == 'w':
  802. piece %= 7
  803. settings.add(piece)
  804. else:
  805. dash_match = dash_re.match(piece)
  806. if dash_match:
  807. lhs, rhs = map(int, dash_match.groups())
  808. if lhs not in acceptable or rhs not in acceptable:
  809. raise ValueError('%s is not a valid input' % piece)
  810. elif date_str == 'w':
  811. lhs %= 7
  812. rhs %= 7
  813. settings.update(range(lhs, rhs + 1))
  814. continue
  815. every_match = every_re.match(piece)
  816. if every_match:
  817. if date_str == 'w':
  818. raise ValueError('Cannot perform this kind of matching'
  819. ' on day-of-week.')
  820. interval = int(every_match.groups()[0])
  821. settings.update(acceptable[::interval])
  822. cron_settings.append(sorted(list(settings)))
  823. def validate_date(dt):
  824. _, m, d, H, M, _, w, _, _ = dt.timetuple()
  825. # fix the weekday to be sunday=0
  826. w = (w + 1) % 7
  827. for (date_piece, selection) in zip([m, d, w, H, M], cron_settings):
  828. if date_piece not in selection:
  829. return False
  830. return True
  831. return validate_date