minimal.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. #
  2. # Minimal huey-like API using gevent and running within the parent process.
  3. #
  4. import datetime
  5. import heapq
  6. import logging
  7. import time
  8. from functools import wraps
  9. import gevent
  10. from gevent.event import AsyncResult
  11. from gevent.event import Event
  12. from gevent.pool import Pool
  13. from huey.api import crontab
  14. logger = logging.getLogger('huey.mini')
  15. class MiniHuey(object):
  16. def __init__(self, name='huey', interval=1, pool_size=None):
  17. self.name = name
  18. self._interval = interval
  19. self._last_check = datetime.datetime.now()
  20. self._periodic_interval = datetime.timedelta(seconds=60)
  21. self._periodic_tasks = []
  22. self._scheduled_tasks = []
  23. self._shutdown = Event()
  24. self._pool = Pool(pool_size)
  25. self._run_t = None
  26. def task(self, validate_func=None):
  27. if validate_func is not None:
  28. def periodic_task_wrapper(fn):
  29. self._periodic_tasks.append((validate_func, fn))
  30. return fn
  31. return periodic_task_wrapper
  32. def decorator(fn):
  33. @wraps(fn)
  34. def _inner(*args, **kwargs):
  35. async_result = AsyncResult()
  36. self._enqueue(fn, args, kwargs, async_result)
  37. return async_result
  38. def _schedule(args=None, kwargs=None, delay=None, eta=None):
  39. if delay is not None:
  40. eta = (datetime.datetime.now() +
  41. datetime.timedelta(seconds=delay))
  42. if eta is None:
  43. raise ValueError('Either a delay (in seconds) or an '
  44. 'eta (datetime) must be specified.')
  45. async_result = AsyncResult()
  46. heapq.heappush(self._scheduled_tasks,
  47. (eta, fn, args, kwargs, async_result))
  48. return async_result
  49. _inner.schedule = _schedule
  50. return _inner
  51. return decorator
  52. def start(self):
  53. if self._run_t is not None:
  54. raise Exception('Task runner is already running.')
  55. self._run_t = gevent.spawn(self._run)
  56. def stop(self):
  57. if self._run_t is None:
  58. raise Exception('Task runner does not appear to have started.')
  59. self._shutdown.set()
  60. logger.info('shutdown requested.')
  61. self._run_t.join()
  62. self._run_t = None
  63. def _enqueue(self, fn, args=None, kwargs=None, async_result=None):
  64. logger.info('enqueueing %s' % fn.__name__)
  65. self._pool.spawn(self._execute, fn, args, kwargs, async_result)
  66. def _execute(self, fn, args, kwargs, async_result):
  67. args = args or ()
  68. kwargs = kwargs or {}
  69. start = time.time()
  70. try:
  71. ret = fn(*args, **kwargs)
  72. except Exception as exc:
  73. logger.exception('task %s failed' % fn.__name__)
  74. raise
  75. else:
  76. duration = time.time() - start
  77. if async_result is not None:
  78. async_result.set(ret)
  79. logger.info('executed %s in %0.3fs', fn.__name__, duration)
  80. def _run(self):
  81. logger.info('task runner started.')
  82. while not self._shutdown.is_set():
  83. start = time.time()
  84. now = datetime.datetime.now()
  85. if self._last_check + self._periodic_interval <= now:
  86. logger.debug('checking periodic task schedule')
  87. self._last_check = now
  88. for validate_func, fn in self._periodic_tasks:
  89. if validate_func(now):
  90. self._enqueue(fn)
  91. if self._scheduled_tasks:
  92. logger.debug('checking scheduled tasks')
  93. # The 0-th item of a heap is always the smallest.
  94. while self._scheduled_tasks and \
  95. self._scheduled_tasks[0][0] <= now:
  96. eta, fn, args, kwargs, async_result = (
  97. heapq.heappop(self._scheduled_tasks))
  98. self._enqueue(fn, args, kwargs, async_result)
  99. # Wait for most of the remained of the time remaining.
  100. remaining = self._interval - (time.time() - start)
  101. if remaining > 0:
  102. if not self._shutdown.wait(remaining * 0.9):
  103. gevent.sleep(self._interval - (time.time() - start))
  104. logger.info('exiting task runner')