storage.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. import json
  2. import re
  3. import time
  4. try:
  5. from redis import ConnectionPool
  6. from redis import Redis
  7. from redis.exceptions import ConnectionError
  8. except ImportError:
  9. ConnectionPool = Redis = ConnectionError = None
  10. from huey.api import Huey
  11. from huey.constants import EmptyData
  12. class BaseStorage(object):
  13. def __init__(self, name='huey', **storage_kwargs):
  14. self.name = name
  15. def enqueue(self, data):
  16. """
  17. Given an opaque chunk of data, add it to the queue.
  18. :param bytes data: Task data.
  19. :return: No return value.
  20. """
  21. raise NotImplementedError
  22. def dequeue(self):
  23. """
  24. Atomically remove data from the queue. If no data is available, no data
  25. is returned.
  26. :return: Opaque binary task data or None if queue is empty.
  27. """
  28. raise NotImplementedError
  29. def unqueue(self, data):
  30. """
  31. Atomically remove the given data from the queue, if it is present. This
  32. method is used to "delete" a task without executing it. It is not
  33. expected that this method will be used very often. Up to the
  34. implementation whether more than one instance of the task can be
  35. deleted, but typically each chunk of data is unique because it has a
  36. UUID4 task id.
  37. :param bytes data: Task data to remove.
  38. :return: Number of tasks deleted.
  39. """
  40. raise NotImplementedError
  41. def queue_size(self):
  42. """
  43. Return the length of the queue.
  44. :return: Number of tasks.
  45. """
  46. raise NotImplementedError
  47. def enqueued_items(self, limit=None):
  48. """
  49. Non-destructively read the given number of tasks from the queue. If no
  50. limit is specified, all tasks will be read.
  51. :param int limit: Restrict the number of tasks returned.
  52. :return: A list containing opaque binary task data.
  53. """
  54. raise NotImplementedError
  55. def flush_queue(self):
  56. """
  57. Remove all data from the queue.
  58. :return: No return value.
  59. """
  60. raise NotImplementedError
  61. def add_to_schedule(self, data, ts):
  62. """
  63. Add the given task data to the schedule, to be executed at the given
  64. timestamp.
  65. :param bytes data: Task data.
  66. :param datetime ts: Timestamp at which task should be executed.
  67. :return: No return value.
  68. """
  69. raise NotImplementedError
  70. def read_schedule(self, ts):
  71. """
  72. Read all tasks from the schedule that should be executed at or before
  73. the given timestamp. Once read, the tasks are removed from the
  74. schedule.
  75. :param datetime ts: Timestamp
  76. :return: List containing task data for tasks which should be executed
  77. at or before the given timestamp.
  78. """
  79. raise NotImplementedError
  80. def schedule_size(self):
  81. """
  82. :return: The number of tasks currently in the schedule.
  83. """
  84. raise NotImplementedError
  85. def scheduled_items(self, limit=None):
  86. """
  87. Non-destructively read the given number of tasks from the schedule.
  88. :param int limit: Restrict the number of tasks returned.
  89. :return: List of tasks that are in schedule, in order from soonest to
  90. latest.
  91. """
  92. raise NotImplementedError
  93. def flush_schedule(self):
  94. """
  95. Delete all scheduled tasks.
  96. :return: No return value.
  97. """
  98. raise NotImplementedError
  99. def put_data(self, key, value):
  100. """
  101. Store an arbitrary key/value pair.
  102. :param bytes key: lookup key
  103. :param bytes value: value
  104. :return: No return value.
  105. """
  106. raise NotImplementedError
  107. def peek_data(self, key):
  108. """
  109. Non-destructively read the value at the given key, if it exists.
  110. :param bytes key: Key to read.
  111. :return: Associated value, if key exists, or ``EmptyData``.
  112. """
  113. raise NotImplementedError
  114. def pop_data(self, key):
  115. """
  116. Destructively read the value at the given key, if it exists.
  117. :param bytes key: Key to read.
  118. :return: Associated value, if key exists, or ``EmptyData``.
  119. """
  120. raise NotImplementedError
  121. def has_data_for_key(self, key):
  122. """
  123. Return whether there is data for the given key.
  124. :return: Boolean value.
  125. """
  126. raise NotImplementedError
  127. def put_if_empty(self, key, value):
  128. """
  129. Atomically write data only if the key is not already set.
  130. :param bytes key: Key to check/set.
  131. :param bytes value: Arbitrary data.
  132. :return: Boolean whether key/value was set.
  133. """
  134. if self.has_data_for_key(key):
  135. return False
  136. self.put_data(key, value)
  137. return True
  138. def result_store_size(self):
  139. """
  140. :return: Number of key/value pairs in the result store.
  141. """
  142. raise NotImplementedError
  143. def result_items(self):
  144. """
  145. Non-destructively read all the key/value pairs from the data-store.
  146. :return: Dictionary mapping all key/value pairs in the data-store.
  147. """
  148. raise NotImplementedError
  149. def flush_results(self):
  150. """
  151. Delete all key/value pairs from the data-store.
  152. :return: No return value.
  153. """
  154. raise NotImplementedError
  155. def put_error(self, metadata):
  156. """
  157. Log an error in the error store. The ``max_errors`` parameter is used
  158. to prevent the error store from growing without bounds.
  159. :param metadata: Store the metadata in the error store.
  160. :return: No return value.
  161. """
  162. raise NotImplementedError
  163. def get_errors(self, limit=None, offset=0):
  164. """
  165. Non-destructively read error data from the error store.
  166. :param int limit: Restrict number of rows returned.
  167. :param int offset: Start reading at the given offset.
  168. :return: List of error metadata.
  169. """
  170. raise NotImplementedError
  171. def flush_errors(self):
  172. """
  173. Delete all error metadata from the error store.
  174. :return: No return value.
  175. """
  176. raise NotImplementedError
  177. def emit(self, message):
  178. """
  179. Publish a message from the consumer.
  180. """
  181. raise NotImplementedError
  182. def __iter__(self):
  183. """
  184. Successively yield events emitted by the huey consumer(s).
  185. :return: Iterator that yields consumer event metadata.
  186. """
  187. # Iterate over consumer-sent events.
  188. raise NotImplementedError
  189. def flush_all(self):
  190. """
  191. Remove all persistent or semi-persistent data.
  192. :return: No return value.
  193. """
  194. self.flush_queue()
  195. self.flush_schedule()
  196. self.flush_results()
  197. self.flush_errors()
# A custom Lua script handed to Redis. It reads every member of the schedule
# sorted set whose score is at or below the given unix timestamp, then removes
# that same score range. The members are returned only when the number removed
# equals the number read; otherwise the script returns nothing.
# NOTE(review): the leading `#res` test looks like a no-op, since Lua treats
# every number (0 included) as true -- confirm before relying on it as a guard.
SCHEDULE_POP_LUA = """\
local key = KEYS[1]
local unix_ts = ARGV[1]
local res = redis.call('zrangebyscore', key, '-inf', unix_ts)
if #res and redis.call('zremrangebyscore', key, '-inf', unix_ts) == #res then
return res
end"""
  208. class RedisStorage(BaseStorage):
  209. redis_client = Redis
  210. def __init__(self, name='huey', blocking=False, read_timeout=1,
  211. max_errors=1000, connection_pool=None, url=None,
  212. client_name=None, **connection_params):
  213. if Redis is None:
  214. raise ImportError('"redis" python module not found, cannot use '
  215. 'Redis storage backend. Run "pip install redis" '
  216. 'to install.')
  217. if sum(1 for p in (url, connection_pool, connection_params) if p) > 1:
  218. raise ValueError(
  219. 'The connection configuration is over-determined. '
  220. 'Please specify only one of the the following: '
  221. '"url", "connection_pool", or "connection_params"')
  222. if url:
  223. connection_pool = ConnectionPool.from_url(
  224. url, decode_components=True)
  225. elif connection_pool is None:
  226. connection_pool = ConnectionPool(**connection_params)
  227. self.pool = connection_pool
  228. self.conn = self.redis_client(connection_pool=connection_pool)
  229. self.connection_params = connection_params
  230. self._pop = self.conn.register_script(SCHEDULE_POP_LUA)
  231. self.name = self.clean_name(name)
  232. self.queue_key = 'huey.redis.%s' % self.name
  233. self.schedule_key = 'huey.schedule.%s' % self.name
  234. self.result_key = 'huey.results.%s' % self.name
  235. self.error_key = 'huey.errors.%s' % self.name
  236. if client_name is not None:
  237. self.conn.client_setname(client_name)
  238. self.blocking = blocking
  239. self.read_timeout = read_timeout
  240. self.max_errors = max_errors
  241. def clean_name(self, name):
  242. return re.sub('[^a-z0-9]', '', name)
  243. def convert_ts(self, ts):
  244. return time.mktime(ts.timetuple())
  245. def enqueue(self, data):
  246. self.conn.lpush(self.queue_key, data)
  247. def dequeue(self):
  248. if self.blocking:
  249. try:
  250. return self.conn.brpop(
  251. self.queue_key,
  252. timeout=self.read_timeout)[1]
  253. except (ConnectionError, TypeError, IndexError):
  254. # Unfortunately, there is no way to differentiate a socket
  255. # timing out and a host being unreachable.
  256. return None
  257. else:
  258. return self.conn.rpop(self.queue_key)
  259. def unqueue(self, data):
  260. return self.conn.lrem(self.queue_key, data)
  261. def queue_size(self):
  262. return self.conn.llen(self.queue_key)
  263. def enqueued_items(self, limit=None):
  264. limit = limit or -1
  265. return self.conn.lrange(self.queue_key, 0, limit)
  266. def flush_queue(self):
  267. self.conn.delete(self.queue_key)
  268. def add_to_schedule(self, data, ts):
  269. self.conn.zadd(self.schedule_key, data, self.convert_ts(ts))
  270. def read_schedule(self, ts):
  271. unix_ts = self.convert_ts(ts)
  272. # invoke the redis lua script that will atomically pop off
  273. # all the tasks older than the given timestamp
  274. tasks = self._pop(keys=[self.schedule_key], args=[unix_ts])
  275. return [] if tasks is None else tasks
  276. def schedule_size(self):
  277. return self.conn.zcard(self.schedule_key)
  278. def scheduled_items(self, limit=None):
  279. limit = limit or -1
  280. return self.conn.zrange(self.schedule_key, 0, limit, withscores=False)
  281. def flush_schedule(self):
  282. self.conn.delete(self.schedule_key)
  283. def put_data(self, key, value):
  284. self.conn.hset(self.result_key, key, value)
  285. def peek_data(self, key):
  286. pipe = self.conn.pipeline()
  287. pipe.hexists(self.result_key, key)
  288. pipe.hget(self.result_key, key)
  289. exists, val = pipe.execute()
  290. return EmptyData if not exists else val
  291. def pop_data(self, key):
  292. pipe = self.conn.pipeline()
  293. pipe.hexists(self.result_key, key)
  294. pipe.hget(self.result_key, key)
  295. pipe.hdel(self.result_key, key)
  296. exists, val, n = pipe.execute()
  297. return EmptyData if not exists else val
  298. def has_data_for_key(self, key):
  299. return self.conn.hexists(self.result_key, key)
  300. def put_if_empty(self, key, value):
  301. return self.conn.hsetnx(self.result_key, key, value)
  302. def result_store_size(self):
  303. return self.conn.hlen(self.result_key)
  304. def result_items(self):
  305. return self.conn.hgetall(self.result_key)
  306. def flush_results(self):
  307. self.conn.delete(self.result_key)
  308. def put_error(self, metadata):
  309. self.conn.lpush(self.error_key, metadata)
  310. if self.conn.llen(self.error_key) > self.max_errors:
  311. self.conn.ltrim(self.error_key, 0, int(self.max_errors * .9))
  312. def get_errors(self, limit=None, offset=0):
  313. if limit is None:
  314. limit = -1
  315. return self.conn.lrange(self.error_key, offset, limit)
  316. def flush_errors(self):
  317. self.conn.delete(self.error_key)
  318. def emit(self, message):
  319. self.conn.publish(self.name, message)
  320. def listener(self):
  321. """
  322. Create a channel for listening to raw event data.
  323. :return: a Redis pubsub object.
  324. """
  325. pubsub = self.conn.pubsub()
  326. pubsub.subscribe([self.name])
  327. return pubsub
  328. def __iter__(self):
  329. return _EventIterator(self.listener())
  330. class _EventIterator(object):
  331. def __init__(self, pubsub):
  332. self.listener = pubsub.listen()
  333. next(self.listener)
  334. def next(self):
  335. return json.loads(next(self.listener)['data'].decode('utf-8'))
  336. __next__ = next
  337. class RedisHuey(Huey):
  338. def get_storage(self, read_timeout=1, max_errors=1000,
  339. connection_pool=None, url=None, **connection_params):
  340. return RedisStorage(
  341. name=self.name,
  342. blocking=self.blocking,
  343. read_timeout=read_timeout,
  344. max_errors=max_errors,
  345. connection_pool=connection_pool,
  346. url=url,
  347. **connection_params)