123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444 |
- import json
- import re
- import time
- try:
- from redis import ConnectionPool
- from redis import Redis
- from redis.exceptions import ConnectionError
- except ImportError:
- ConnectionPool = Redis = ConnectionError = None
- from huey.api import Huey
- from huey.constants import EmptyData
class BaseStorage(object):
    """
    Interface that every storage backend implements.

    A storage exposes four areas: a task queue, a schedule of tasks to run
    later, a key/value result store, and an error log. It can also publish
    and iterate over consumer events. Apart from ``put_if_empty`` and
    ``flush_all``, every method here raises ``NotImplementedError`` and is
    meant to be overridden by a concrete backend.
    """

    def __init__(self, name='huey', **storage_kwargs):
        """
        :param str name: Name identifying this storage.
        :param storage_kwargs: Extra options; the base class ignores them.
        """
        self.name = name

    # -- Task queue ---------------------------------------------------------

    def enqueue(self, data):
        """
        Append an opaque chunk of task data to the queue.

        :param bytes data: Task data.
        :return: No return value.
        """
        raise NotImplementedError

    def dequeue(self):
        """
        Atomically take one chunk of data off the queue.

        :return: Opaque binary task data, or None if the queue is empty.
        """
        raise NotImplementedError

    def unqueue(self, data):
        """
        Atomically remove the given data from the queue, if present.

        This "deletes" a task without executing it and is not expected to
        be called often. Whether more than one matching entry can be removed
        is up to the implementation; in practice each chunk is unique
        because it carries a UUID4 task id.

        :param bytes data: Task data to remove.
        :return: Number of tasks deleted.
        """
        raise NotImplementedError

    def queue_size(self):
        """
        Report how many tasks are waiting in the queue.

        :return: Number of tasks.
        """
        raise NotImplementedError

    def enqueued_items(self, limit=None):
        """
        Read tasks from the queue without removing them.

        :param int limit: Maximum number of tasks to return; when omitted,
            all tasks are read.
        :return: A list containing opaque binary task data.
        """
        raise NotImplementedError

    def flush_queue(self):
        """
        Discard everything in the queue.

        :return: No return value.
        """
        raise NotImplementedError

    # -- Schedule -----------------------------------------------------------

    def add_to_schedule(self, data, ts):
        """
        Schedule the given task data for execution at the given time.

        :param bytes data: Task data.
        :param datetime ts: Timestamp at which task should be executed.
        :return: No return value.
        """
        raise NotImplementedError

    def read_schedule(self, ts):
        """
        Pop every scheduled task that is due at or before the timestamp.

        Tasks that are read are removed from the schedule.

        :param datetime ts: Timestamp
        :return: List containing task data for tasks which should be
            executed at or before the given timestamp.
        """
        raise NotImplementedError

    def schedule_size(self):
        """
        :return: The number of tasks currently in the schedule.
        """
        raise NotImplementedError

    def scheduled_items(self, limit=None):
        """
        Read tasks from the schedule without removing them.

        :param int limit: Maximum number of tasks to return.
        :return: List of tasks that are in schedule, in order from soonest
            to latest.
        """
        raise NotImplementedError

    def flush_schedule(self):
        """
        Discard every scheduled task.

        :return: No return value.
        """
        raise NotImplementedError

    # -- Result store -------------------------------------------------------

    def put_data(self, key, value):
        """
        Save an arbitrary key/value pair.

        :param bytes key: lookup key
        :param bytes value: value
        :return: No return value.
        """
        raise NotImplementedError

    def peek_data(self, key):
        """
        Read the value stored at the key, leaving it in place.

        :param bytes key: Key to read.
        :return: Associated value, if key exists, or ``EmptyData``.
        """
        raise NotImplementedError

    def pop_data(self, key):
        """
        Read the value stored at the key and remove it.

        :param bytes key: Key to read.
        :return: Associated value, if key exists, or ``EmptyData``.
        """
        raise NotImplementedError

    def has_data_for_key(self, key):
        """
        Tell whether any data is stored for the given key.

        :return: Boolean value.
        """
        raise NotImplementedError

    def put_if_empty(self, key, value):
        """
        Write data only if the key is not already set.

        The contract is that this happens atomically. This default is a
        plain check followed by a write (``has_data_for_key`` then
        ``put_data``), so backends that need real atomicity should override
        it.

        :param bytes key: Key to check/set.
        :param bytes value: Arbitrary data.
        :return: Boolean whether key/value was set.
        """
        if not self.has_data_for_key(key):
            self.put_data(key, value)
            return True
        return False

    def result_store_size(self):
        """
        :return: Number of key/value pairs in the result store.
        """
        raise NotImplementedError

    def result_items(self):
        """
        Read every key/value pair from the data-store, leaving them in place.

        :return: Dictionary mapping all key/value pairs in the data-store.
        """
        raise NotImplementedError

    def flush_results(self):
        """
        Discard every key/value pair in the data-store.

        :return: No return value.
        """
        raise NotImplementedError

    # -- Error store --------------------------------------------------------

    def put_error(self, metadata):
        """
        Record an error in the error store.

        Implementations may use a ``max_errors`` setting to keep the error
        store from growing without bounds.

        :param metadata: Store the metadata in the error store.
        :return: No return value.
        """
        raise NotImplementedError

    def get_errors(self, limit=None, offset=0):
        """
        Read error data from the error store, leaving it in place.

        :param int limit: Restrict number of rows returned.
        :param int offset: Start reading at the given offset.
        :return: List of error metadata.
        """
        raise NotImplementedError

    def flush_errors(self):
        """
        Discard all error metadata in the error store.

        :return: No return value.
        """
        raise NotImplementedError

    # -- Events -------------------------------------------------------------

    def emit(self, message):
        """
        Publish a message from the consumer.
        """
        raise NotImplementedError

    def __iter__(self):
        """
        Successively yield events emitted by the huey consumer(s).

        :return: Iterator that yields consumer event metadata.
        """
        # Backends iterate over consumer-sent events here.
        raise NotImplementedError

    # -- Maintenance --------------------------------------------------------

    def flush_all(self):
        """
        Remove all persistent or semi-persistent data.

        Flushes, in order: the queue, the schedule, the result store and the
        error store.

        :return: No return value.
        """
        flushers = (
            self.flush_queue,
            self.flush_schedule,
            self.flush_results,
            self.flush_errors,
        )
        for flush in flushers:
            flush()
# Lua script, run by Redis, that reads every task in the schedule whose score
# is at or before the given timestamp, removes those tasks from the sorted set
# and returns them. It returns nothing when no task is due, or when the number
# of members removed differs from the number read.
#
# NOTE: Lua treats 0 as true, so the emptiness test has to be "#res > 0"; a
# bare "#res" never short-circuits and would issue the removal call even when
# there is nothing to remove.
SCHEDULE_POP_LUA = """\
local key = KEYS[1]
local unix_ts = ARGV[1]
local res = redis.call('zrangebyscore', key, '-inf', unix_ts)
if #res > 0 and redis.call('zremrangebyscore', key, '-inf', unix_ts) == #res then
    return res
end"""
class RedisStorage(BaseStorage):
    """
    Storage backend that keeps all huey data in Redis.

    Keys used, where ``<name>`` is the cleaned storage name:

    * ``huey.redis.<name>`` -- list holding the task queue.
    * ``huey.schedule.<name>`` -- sorted set of scheduled tasks, scored by
      unix timestamp.
    * ``huey.results.<name>`` -- hash holding the result key/value pairs.
    * ``huey.errors.<name>`` -- list holding error metadata, newest first.

    Consumer events are published on the pub/sub channel ``<name>``.
    """
    # Client class used to talk to Redis; subclasses may swap it out.
    redis_client = Redis

    def __init__(self, name='huey', blocking=False, read_timeout=1,
                 max_errors=1000, connection_pool=None, url=None,
                 client_name=None, **connection_params):
        """
        :param str name: Storage name; it is cleaned with ``clean_name``
            before being used in Redis keys.
        :param bool blocking: Use a blocking pop (BRPOP) when dequeueing.
        :param read_timeout: Timeout handed to the blocking pop.
        :param int max_errors: Size at which the error list is trimmed.
        :param connection_pool: Existing redis ``ConnectionPool`` to use.
        :param str url: Redis URL from which to build a connection pool.
        :param str client_name: If given, set as the connection's client
            name.
        :param connection_params: Keyword arguments for a new
            ``ConnectionPool`` when neither ``url`` nor ``connection_pool``
            is given.
        :raises ImportError: If the ``redis`` package is not installed.
        :raises ValueError: If more than one of ``url``,
            ``connection_pool`` and ``connection_params`` is provided.
        """
        if Redis is None:
            raise ImportError('"redis" python module not found, cannot use '
                              'Redis storage backend. Run "pip install redis" '
                              'to install.')

        # Only one way of describing the connection may be supplied.
        if sum(1 for p in (url, connection_pool, connection_params) if p) > 1:
            raise ValueError(
                'The connection configuration is over-determined. '
                'Please specify only one of the the following: '
                '"url", "connection_pool", or "connection_params"')

        if url:
            connection_pool = ConnectionPool.from_url(
                url, decode_components=True)
        elif connection_pool is None:
            connection_pool = ConnectionPool(**connection_params)

        self.pool = connection_pool
        self.conn = self.redis_client(connection_pool=connection_pool)
        self.connection_params = connection_params
        # Script object used by read_schedule to pop due tasks.
        self._pop = self.conn.register_script(SCHEDULE_POP_LUA)

        self.name = self.clean_name(name)
        self.queue_key = 'huey.redis.%s' % self.name
        self.schedule_key = 'huey.schedule.%s' % self.name
        self.result_key = 'huey.results.%s' % self.name
        self.error_key = 'huey.errors.%s' % self.name

        if client_name is not None:
            self.conn.client_setname(client_name)

        self.blocking = blocking
        self.read_timeout = read_timeout
        self.max_errors = max_errors

    @staticmethod
    def _stop_index(limit, offset=0):
        """
        Convert a row count into the inclusive stop index Redis expects.

        LRANGE and ZRANGE treat their stop argument as an inclusive index,
        so reading ``limit`` rows starting at ``offset`` must stop at
        ``offset + limit - 1``.

        :param int limit: Number of rows wanted; ``None``, ``0`` or a
            negative value means "no limit".
        :param int offset: Index of the first row to read.
        :return: Inclusive stop index, ``-1`` meaning "through the end".
        """
        if not limit or limit < 0:
            return -1
        stop = offset + limit - 1
        # A negative offset counts from the tail; if the window would run
        # past the tail, read through the end rather than wrapping around
        # to an index counted from the head.
        if offset < 0 <= stop:
            return -1
        return stop

    def clean_name(self, name):
        """
        Strip every character other than ``a-z`` and ``0-9`` from the name.

        :param str name: Raw storage name.
        :return: Name reduced to lowercase ASCII letters and digits.
        """
        return re.sub('[^a-z0-9]', '', name)

    def convert_ts(self, ts):
        """
        Convert a datetime into a unix timestamp via ``time.mktime``.

        :param datetime ts: Timestamp to convert.
        :return: Float seconds since the epoch.
        """
        return time.mktime(ts.timetuple())

    def enqueue(self, data):
        """Push task data onto the head of the queue list."""
        self.conn.lpush(self.queue_key, data)

    def dequeue(self):
        """
        Pop task data from the tail of the queue list.

        :return: Task data, or None if nothing was available. In blocking
            mode None is also returned on a connection error.
        """
        if self.blocking:
            try:
                # brpop yields a (key, value) pair; a missing result makes
                # the subscript fail, which is handled below.
                return self.conn.brpop(
                    self.queue_key,
                    timeout=self.read_timeout)[1]
            except (ConnectionError, TypeError, IndexError):
                # Unfortunately, there is no way to differentiate a socket
                # timing out and a host being unreachable.
                return None
        else:
            return self.conn.rpop(self.queue_key)

    def unqueue(self, data):
        """
        Remove the given task data from the queue list.

        :return: Result of the ``lrem`` call.
        """
        return self.conn.lrem(self.queue_key, data)

    def queue_size(self):
        """:return: Length of the queue list."""
        return self.conn.llen(self.queue_key)

    def enqueued_items(self, limit=None):
        """
        Read tasks from the queue list without removing them.

        :param int limit: Maximum number of tasks to return; ``None`` or
            ``0`` reads the whole queue.
        :return: List of task data, starting from the head of the list.
        """
        return self.conn.lrange(self.queue_key, 0, self._stop_index(limit))

    def flush_queue(self):
        """Delete the queue list."""
        self.conn.delete(self.queue_key)

    def add_to_schedule(self, data, ts):
        """
        Add task data to the schedule sorted set, scored by timestamp.

        :param bytes data: Task data.
        :param datetime ts: Time at which the task should run.
        """
        self.conn.zadd(self.schedule_key, data, self.convert_ts(ts))

    def read_schedule(self, ts):
        """
        Pop every task scheduled at or before the given timestamp.

        :param datetime ts: Cut-off time.
        :return: List of task data; empty if the script returned nothing.
        """
        unix_ts = self.convert_ts(ts)
        # invoke the redis lua script that will atomically pop off
        # all the tasks older than the given timestamp
        tasks = self._pop(keys=[self.schedule_key], args=[unix_ts])
        return [] if tasks is None else tasks

    def schedule_size(self):
        """:return: Number of members in the schedule sorted set."""
        return self.conn.zcard(self.schedule_key)

    def scheduled_items(self, limit=None):
        """
        Read tasks from the schedule without removing them.

        :param int limit: Maximum number of tasks to return; ``None`` or
            ``0`` reads the whole schedule.
        :return: List of task data in sorted-set order (lowest score first).
        """
        return self.conn.zrange(
            self.schedule_key, 0, self._stop_index(limit), withscores=False)

    def flush_schedule(self):
        """Delete the schedule sorted set."""
        self.conn.delete(self.schedule_key)

    def put_data(self, key, value):
        """Store the value under the key in the results hash."""
        self.conn.hset(self.result_key, key, value)

    def peek_data(self, key):
        """
        Read the value at the key from the results hash, leaving it there.

        :return: The stored value, or ``EmptyData`` if the key is absent.
        """
        # The existence check and the read go out on a single pipeline.
        pipe = self.conn.pipeline()
        pipe.hexists(self.result_key, key)
        pipe.hget(self.result_key, key)
        exists, val = pipe.execute()
        return EmptyData if not exists else val

    def pop_data(self, key):
        """
        Read the value at the key from the results hash and delete it.

        :return: The stored value, or ``EmptyData`` if the key was absent.
        """
        # Check, read and delete go out on a single pipeline.
        pipe = self.conn.pipeline()
        pipe.hexists(self.result_key, key)
        pipe.hget(self.result_key, key)
        pipe.hdel(self.result_key, key)
        exists, val, n = pipe.execute()
        return EmptyData if not exists else val

    def has_data_for_key(self, key):
        """:return: Whether the results hash contains the key."""
        return self.conn.hexists(self.result_key, key)

    def put_if_empty(self, key, value):
        """
        Set the key in the results hash only if it is not already present.

        :return: Result of the ``hsetnx`` call.
        """
        return self.conn.hsetnx(self.result_key, key, value)

    def result_store_size(self):
        """:return: Number of fields in the results hash."""
        return self.conn.hlen(self.result_key)

    def result_items(self):
        """:return: Every key/value pair in the results hash."""
        return self.conn.hgetall(self.result_key)

    def flush_results(self):
        """Delete the results hash."""
        self.conn.delete(self.result_key)

    def put_error(self, metadata):
        """
        Push error metadata onto the head of the error list.

        Once the list grows beyond ``max_errors`` it is trimmed back to
        roughly 90% of that size, dropping the oldest entries.
        """
        self.conn.lpush(self.error_key, metadata)
        if self.conn.llen(self.error_key) > self.max_errors:
            self.conn.ltrim(self.error_key, 0, int(self.max_errors * .9))

    def get_errors(self, limit=None, offset=0):
        """
        Read error metadata from the error list without removing it.

        :param int limit: Maximum number of rows to return; ``None`` reads
            through the end of the list and ``0`` returns no rows.
        :param int offset: Index of the first row to return.
        :return: List of error metadata.
        """
        if limit == 0:
            return []
        return self.conn.lrange(
            self.error_key, offset, self._stop_index(limit, offset))

    def flush_errors(self):
        """Delete the error list."""
        self.conn.delete(self.error_key)

    def emit(self, message):
        """Publish the message on the channel named after this storage."""
        self.conn.publish(self.name, message)

    def listener(self):
        """
        Create a channel for listening to raw event data.

        :return: a Redis pubsub object.
        """
        pubsub = self.conn.pubsub()
        pubsub.subscribe([self.name])
        return pubsub

    def __iter__(self):
        """:return: Iterator over decoded consumer events."""
        return _EventIterator(self.listener())
- class _EventIterator(object):
- def __init__(self, pubsub):
- self.listener = pubsub.listen()
- next(self.listener)
- def next(self):
- return json.loads(next(self.listener)['data'].decode('utf-8'))
- __next__ = next
class RedisHuey(Huey):
    """Huey variant whose storage backend is a ``RedisStorage``."""

    def get_storage(self, read_timeout=1, max_errors=1000,
                    connection_pool=None, url=None, **connection_params):
        """
        Build the Redis storage for this instance.

        ``name`` and ``blocking`` are taken from this object's attributes;
        everything else is forwarded unchanged.

        :param read_timeout: Forwarded to ``RedisStorage``.
        :param int max_errors: Forwarded to ``RedisStorage``.
        :param connection_pool: Forwarded to ``RedisStorage``.
        :param str url: Forwarded to ``RedisStorage``.
        :param connection_params: Extra keyword arguments forwarded to
            ``RedisStorage``.
        :return: A new ``RedisStorage`` instance.
        """
        storage = RedisStorage(
            url=url,
            connection_pool=connection_pool,
            max_errors=max_errors,
            read_timeout=read_timeout,
            blocking=self.blocking,
            name=self.name,
            **connection_params)
        return storage
|