# -*- coding: utf-8 -*-
"""
    celery.backends.database
    ~~~~~~~~~~~~~~~~~~~~~~~~

    SQLAlchemy result store backend.

"""
from __future__ import absolute_import

from functools import wraps

from celery import states
from celery.exceptions import ImproperlyConfigured
from celery.five import range
from celery.utils.timeutils import maybe_timedelta

from celery.backends.base import BaseBackend

from .models import Task, TaskSet
from .session import ResultSession

__all__ = ['DatabaseBackend']


def _sqlalchemy_installed():
    try:
        import sqlalchemy
    except ImportError:
        raise ImproperlyConfigured(
            'The database result backend requires SQLAlchemy to be installed.'
            '\nSee http://pypi.python.org/pypi/SQLAlchemy')
    return sqlalchemy
_sqlalchemy_installed()

from sqlalchemy.exc import DatabaseError, OperationalError  # noqa


def retry(fun):

    @wraps(fun)
    def _inner(*args, **kwargs):
        # Retry the wrapped database operation up to ``max_retries`` times
        # (default 3) when SQLAlchemy raises DatabaseError or
        # OperationalError; the final failure is re-raised to the caller.
        max_retries = kwargs.pop('max_retries', 3)

        for retries in range(max_retries):
            try:
                return fun(*args, **kwargs)
            except (DatabaseError, OperationalError):
                if retries + 1 >= max_retries:
                    raise

    return _inner
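
# Usage sketch for @retry (illustrative, not part of the original module):
# every decorated method accepts an optional ``max_retries`` keyword argument
# that is consumed by the wrapper itself, e.g.
#
#   backend._store_result(task_id, result, states.SUCCESS, max_retries=5)
#
# would attempt the database write up to five times before the last
# exception propagates.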


class DatabaseBackend(BaseBackend):
    """The database result backend."""

    # ResultSet.iterate should sleep this much between each poll,
    # so as not to bombard the database with queries.
    subpolling_interval = 0.5

    def __init__(self, dburi=None, expires=None,
                 engine_options=None, url=None, **kwargs):
        # The `url` argument was added later and is used by the app
        # to set the backend by URL (celery.backends.get_backend_by_url).
        super(DatabaseBackend, self).__init__(**kwargs)
        conf = self.app.conf
        self.expires = maybe_timedelta(self.prepare_expires(expires))
        self.dburi = url or dburi or conf.CELERY_RESULT_DBURI
        self.engine_options = dict(
            engine_options or {},
            **conf.CELERY_RESULT_ENGINE_OPTIONS or {})
        self.short_lived_sessions = kwargs.get(
            'short_lived_sessions',
            conf.CELERY_RESULT_DB_SHORT_LIVED_SESSIONS,
        )

        tablenames = conf.CELERY_RESULT_DB_TABLENAMES or {}
        Task.__table__.name = tablenames.get('task', 'celery_taskmeta')
        TaskSet.__table__.name = tablenames.get('group', 'celery_tasksetmeta')

        if not self.dburi:
            raise ImproperlyConfigured(
                'Missing connection string! Do you have '
                'CELERY_RESULT_DBURI set to a real value?')
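
    # Configuration sketch (illustrative values, not defaults): with settings
    # like the ones below, the app resolves the 'db+' URL to this backend via
    # celery.backends.get_backend_by_url and hands the rest of the URI to
    # SQLAlchemy.  The table names shown here are made up for the example.
    #
    #   CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'
    #   CELERY_RESULT_ENGINE_OPTIONS = {'echo': False}
    #   CELERY_RESULT_DB_TABLENAMES = {'task': 'myapp_taskmeta',
    #                                  'group': 'myapp_groupmeta'}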

    def ResultSession(self):
        return ResultSession(
            dburi=self.dburi,
            short_lived_sessions=self.short_lived_sessions,
            **self.engine_options
        )

    @retry
    def _store_result(self, task_id, result, status,
                      traceback=None, max_retries=3, **kwargs):
        """Store return value and status of an executed task."""
        session = self.ResultSession()
        try:
            task = session.query(Task).filter(Task.task_id == task_id).first()
            if not task:
                task = Task(task_id)
                session.add(task)
                session.flush()
            task.result = result
            task.status = status
            task.traceback = traceback
            session.commit()
            return result
        finally:
            session.close()

    @retry
    def _get_task_meta_for(self, task_id):
        """Get task metadata for a task by id."""
        session = self.ResultSession()
        try:
            task = session.query(Task).filter(Task.task_id == task_id).first()
            if task is None:
                task = Task(task_id)
                task.status = states.PENDING
                task.result = None
            return task.to_dict()
        finally:
            session.close()
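
    # Round-trip sketch (illustrative; assumes ``backend`` is an instance of
    # this class and that Task.to_dict() exposes 'status'/'result' fields as
    # defined in .models):
    #
    #   backend._store_result(task_id, 42, states.SUCCESS)
    #   assert backend._get_task_meta_for(task_id)['status'] == states.SUCCESS
    #
    # Unknown ids are not an error: a transient PENDING Task is built and
    # serialized without ever being added to the session.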

    @retry
    def _save_group(self, group_id, result):
        """Store the result of an executed group."""
        session = self.ResultSession()
        try:
            group = TaskSet(group_id, result)
            session.add(group)
            session.flush()
            session.commit()
            return result
        finally:
            session.close()

    @retry
    def _restore_group(self, group_id):
        """Get metadata for group by id."""
        session = self.ResultSession()
        try:
            group = session.query(TaskSet).filter(
                TaskSet.taskset_id == group_id).first()
            if group:
                return group.to_dict()
        finally:
            session.close()

    @retry
    def _delete_group(self, group_id):
        """Delete metadata for group by id."""
        session = self.ResultSession()
        try:
            session.query(TaskSet).filter(
                TaskSet.taskset_id == group_id).delete()
            session.flush()
            session.commit()
        finally:
            session.close()
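
    # Group lifecycle sketch (illustrative; group results live in the TaskSet
    # table, keyed by taskset_id):
    #
    #   backend._save_group(group_id, results)
    #   meta = backend._restore_group(group_id)   # None if unknown
    #   backend._delete_group(group_id)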

    @retry
    def _forget(self, task_id):
        """Forget about result."""
        session = self.ResultSession()
        try:
            session.query(Task).filter(Task.task_id == task_id).delete()
            session.commit()
        finally:
            session.close()

    def cleanup(self):
        """Delete expired metadata."""
        session = self.ResultSession()
        expires = self.expires
        now = self.app.now()
        try:
            session.query(Task).filter(
                Task.date_done < (now - expires)).delete()
            session.query(TaskSet).filter(
                TaskSet.date_done < (now - expires)).delete()
            session.commit()
        finally:
            session.close()
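
    # Note: Celery's built-in ``celery.backend_cleanup`` periodic task calls
    # this method; with celery beat running and a result-expiry setting in
    # effect it prunes rows older than ``self.expires`` (by default once a
    # day).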

    def __reduce__(self, args=(), kwargs={}):
        kwargs.update(
            dict(dburi=self.dburi,
                 expires=self.expires,
                 engine_options=self.engine_options))
        return super(DatabaseBackend, self).__reduce__(args, kwargs)