sqlitedb.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. import operator
  2. import pickle
  3. import threading
  4. from peewee import *
  5. from huey.api import Huey
  6. from huey.constants import EmptyData
  7. from huey.storage import BaseStorage
  8. class BaseModel(Model):
  9. class Meta:
  10. database = SqliteDatabase(None) # Placeholder.
  11. class Task(BaseModel):
  12. queue = CharField()
  13. data = BlobField()
  14. class Schedule(BaseModel):
  15. queue = CharField()
  16. data = BlobField()
  17. timestamp = TimestampField()
  18. class KeyValue(BaseModel):
  19. queue = CharField()
  20. key = CharField()
  21. value = BlobField()
  22. class Meta:
  23. primary_key = CompositeKey('queue', 'key')
  24. class SqliteStorage(BaseStorage):
  25. def __init__(self, name='huey', filename='huey.db', **storage_kwargs):
  26. self.filename = filename
  27. self.database = SqliteDatabase(filename, **storage_kwargs)
  28. super(SqliteStorage, self).__init__(name)
  29. self.initialize_task_table()
  30. def initialize_task_table(self):
  31. Task._meta.database = self.database
  32. Schedule._meta.database = self.database
  33. KeyValue._meta.database = self.database
  34. self.database.create_tables([Task, Schedule, KeyValue], safe=True)
  35. def tasks(self, *columns):
  36. return Task.select(*columns).where(Task.queue == self.name)
  37. def delete(self):
  38. return Task.delete().where(Task.queue == self.name)
  39. def schedule(self, *columns):
  40. return (Schedule
  41. .select(*columns)
  42. .where(Schedule.queue == self.name)
  43. .order_by(Schedule.timestamp))
  44. def kv(self, *columns):
  45. return KeyValue.select(*columns).where(KeyValue.queue == self.name)
  46. def enqueue(self, data):
  47. Task.create(queue=self.name, data=data)
  48. def dequeue(self):
  49. try:
  50. task = (self
  51. .tasks()
  52. .order_by(Task.id)
  53. .limit(1)
  54. .get())
  55. except Task.DoesNotExist:
  56. return
  57. res = self.delete().where(Task.id == task.id).execute()
  58. if res == 1:
  59. return task.data
  60. def unqueue(self, data):
  61. return (self
  62. .delete()
  63. .where(Task.data == data)
  64. .execute())
  65. def queue_size(self):
  66. return self.tasks().count()
  67. def enqueued_items(self, limit=None):
  68. query = self.tasks(Task.data).tuples()
  69. if limit is not None:
  70. query = query.limit(limit)
  71. return map(operator.itemgetter(0), query)
  72. def flush_queue(self):
  73. self.delete().execute()
  74. def add_to_schedule(self, data, ts):
  75. Schedule.create(data=data, timestamp=ts, queue=self.name)
  76. def read_schedule(self, ts):
  77. tasks = (self
  78. .schedule(Schedule.id, Schedule.data)
  79. .where(Schedule.timestamp <= ts)
  80. .tuples())
  81. id_list, data = [], []
  82. for task_id, task_data in tasks:
  83. id_list.append(task_id)
  84. data.append(task_data)
  85. if id_list:
  86. (Schedule
  87. .delete()
  88. .where(Schedule.id << id_list)
  89. .execute())
  90. return data
  91. def schedule_size(self):
  92. return self.schedule().count()
  93. def scheduled_items(self, limit=None):
  94. tasks = (self
  95. .schedule(Schedule.data)
  96. .order_by(Schedule.timestamp)
  97. .tuples())
  98. return map(operator.itemgetter(0), tasks)
  99. def flush_schedule(self):
  100. return Schedule.delete().where(Schedule.queue == self.name).execute()
  101. def put_data(self, key, value):
  102. KeyValue.create(queue=self.name, key=key, value=value)
  103. def peek_data(self, key):
  104. try:
  105. kv = self.kv(KeyValue.value).where(KeyValue.key == key).get()
  106. except KeyValue.DoesNotExist:
  107. return EmptyData
  108. else:
  109. return kv.value
  110. def pop_data(self, key):
  111. try:
  112. kv = self.kv().where(KeyValue.key == key).get()
  113. except KeyValue.DoesNotExist:
  114. return EmptyData
  115. else:
  116. dq = KeyValue.delete().where(
  117. (KeyValue.queue == self.name) &
  118. (KeyValue.key == key))
  119. return kv.value if dq.execute() == 1 else EmptyData
  120. def has_data_for_key(self, key):
  121. return self.kv().where(KeyValue.key == key).exists()
  122. def put_if_empty(self, key, value):
  123. sql = ('INSERT OR ABORT INTO "keyvalue" ("queue", "key", "value") '
  124. 'VALUES (?, ?, ?);')
  125. try:
  126. res = self.database.execute_sql(sql, (self.name, key, value))
  127. except IntegrityError:
  128. return False
  129. else:
  130. return True
  131. def result_store_size(self):
  132. return self.kv().count()
  133. def result_items(self):
  134. query = self.kv(KeyValue.key, KeyValue.value).tuples()
  135. return dict((k, pickle.loads(v)) for k, v in query.iterator())
  136. def flush_results(self):
  137. return KeyValue.delete().where(KeyValue.queue == self.name).execute()
  138. def put_error(self, metadata):
  139. pass
  140. def get_error(self, limit=None, offset=0):
  141. pass
  142. def flush_errors(self):
  143. pass
  144. def emit(self, message):
  145. pass
  146. def __iter__(self):
  147. return self
  148. def next(self):
  149. raise StopIteration
  150. __next__ = next
  151. class SqliteHuey(Huey):
  152. def get_storage(self, filename='huey.db', **sqlite_kwargs):
  153. return SqliteStorage(
  154. name=self.name,
  155. filename=filename,
  156. **sqlite_kwargs)