123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- #
- # Minimal huey-like API using gevent and running within the parent process.
- #
- import datetime
- import heapq
- import logging
- import time
- from functools import wraps
- import gevent
- from gevent.event import AsyncResult
- from gevent.event import Event
- from gevent.pool import Pool
- from huey.api import crontab
- logger = logging.getLogger('huey.mini')
- class MiniHuey(object):
- def __init__(self, name='huey', interval=1, pool_size=None):
- self.name = name
- self._interval = interval
- self._last_check = datetime.datetime.now()
- self._periodic_interval = datetime.timedelta(seconds=60)
- self._periodic_tasks = []
- self._scheduled_tasks = []
- self._shutdown = Event()
- self._pool = Pool(pool_size)
- self._run_t = None
- def task(self, validate_func=None):
- if validate_func is not None:
- def periodic_task_wrapper(fn):
- self._periodic_tasks.append((validate_func, fn))
- return fn
- return periodic_task_wrapper
- def decorator(fn):
- @wraps(fn)
- def _inner(*args, **kwargs):
- async_result = AsyncResult()
- self._enqueue(fn, args, kwargs, async_result)
- return async_result
- def _schedule(args=None, kwargs=None, delay=None, eta=None):
- if delay is not None:
- eta = (datetime.datetime.now() +
- datetime.timedelta(seconds=delay))
- if eta is None:
- raise ValueError('Either a delay (in seconds) or an '
- 'eta (datetime) must be specified.')
- async_result = AsyncResult()
- heapq.heappush(self._scheduled_tasks,
- (eta, fn, args, kwargs, async_result))
- return async_result
- _inner.schedule = _schedule
- return _inner
- return decorator
- def start(self):
- if self._run_t is not None:
- raise Exception('Task runner is already running.')
- self._run_t = gevent.spawn(self._run)
- def stop(self):
- if self._run_t is None:
- raise Exception('Task runner does not appear to have started.')
- self._shutdown.set()
- logger.info('shutdown requested.')
- self._run_t.join()
- self._run_t = None
- def _enqueue(self, fn, args=None, kwargs=None, async_result=None):
- logger.info('enqueueing %s' % fn.__name__)
- self._pool.spawn(self._execute, fn, args, kwargs, async_result)
- def _execute(self, fn, args, kwargs, async_result):
- args = args or ()
- kwargs = kwargs or {}
- start = time.time()
- try:
- ret = fn(*args, **kwargs)
- except Exception as exc:
- logger.exception('task %s failed' % fn.__name__)
- raise
- else:
- duration = time.time() - start
- if async_result is not None:
- async_result.set(ret)
- logger.info('executed %s in %0.3fs', fn.__name__, duration)
- def _run(self):
- logger.info('task runner started.')
- while not self._shutdown.is_set():
- start = time.time()
- now = datetime.datetime.now()
- if self._last_check + self._periodic_interval <= now:
- logger.debug('checking periodic task schedule')
- self._last_check = now
- for validate_func, fn in self._periodic_tasks:
- if validate_func(now):
- self._enqueue(fn)
- if self._scheduled_tasks:
- logger.debug('checking scheduled tasks')
- # The 0-th item of a heap is always the smallest.
- while self._scheduled_tasks and \
- self._scheduled_tasks[0][0] <= now:
- eta, fn, args, kwargs, async_result = (
- heapq.heappop(self._scheduled_tasks))
- self._enqueue(fn, args, kwargs, async_result)
- # Wait for most of the remained of the time remaining.
- remaining = self._interval - (time.time() - start)
- if remaining > 0:
- if not self._shutdown.wait(remaining * 0.9):
- gevent.sleep(self._interval - (time.time() - start))
- logger.info('exiting task runner')
|