base.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.task.base
  4. ~~~~~~~~~~~~~~~~
  5. The task implementation has been moved to :mod:`celery.app.task`.
  6. This contains the backward compatible Task class used in the old API,
  7. and shouldn't be used in new applications.
  8. """
  9. from __future__ import absolute_import
  10. from kombu import Exchange
  11. from celery import current_app
  12. from celery.app.task import Context, TaskType, Task as BaseTask # noqa
  13. from celery.five import class_property, reclassmethod
  14. from celery.schedules import maybe_schedule
  15. from celery.utils.log import get_task_logger
  16. __all__ = ['Task', 'PeriodicTask', 'task']
  17. #: list of methods that must be classmethods in the old API.
  18. _COMPAT_CLASSMETHODS = (
  19. 'delay', 'apply_async', 'retry', 'apply', 'subtask_from_request',
  20. 'AsyncResult', 'subtask', '_get_request', '_get_exec_options',
  21. )
  22. class Task(BaseTask):
  23. """Deprecated Task base class.
  24. Modern applications should use :class:`celery.Task` instead.
  25. """
  26. abstract = True
  27. __bound__ = False
  28. __v2_compat__ = True
  29. #- Deprecated compat. attributes -:
  30. queue = None
  31. routing_key = None
  32. exchange = None
  33. exchange_type = None
  34. delivery_mode = None
  35. mandatory = False # XXX deprecated
  36. immediate = False # XXX deprecated
  37. priority = None
  38. type = 'regular'
  39. disable_error_emails = False
  40. accept_magic_kwargs = False
  41. from_config = BaseTask.from_config + (
  42. ('exchange_type', 'CELERY_DEFAULT_EXCHANGE_TYPE'),
  43. ('delivery_mode', 'CELERY_DEFAULT_DELIVERY_MODE'),
  44. )
  45. # In old Celery the @task decorator didn't exist, so one would create
  46. # classes instead and use them directly (e.g. MyTask.apply_async()).
  47. # the use of classmethods was a hack so that it was not necessary
  48. # to instantiate the class before using it, but it has only
  49. # given us pain (like all magic).
  50. for name in _COMPAT_CLASSMETHODS:
  51. locals()[name] = reclassmethod(getattr(BaseTask, name))
  52. @class_property
  53. def request(cls):
  54. return cls._get_request()
  55. @classmethod
  56. def get_logger(self, **kwargs):
  57. return get_task_logger(self.name)
  58. @classmethod
  59. def establish_connection(self):
  60. """Deprecated method used to get a broker connection.
  61. Should be replaced with :meth:`@Celery.connection`
  62. instead, or by acquiring connections from the connection pool:
  63. .. code-block:: python
  64. # using the connection pool
  65. with celery.pool.acquire(block=True) as conn:
  66. ...
  67. # establish fresh connection
  68. with celery.connection() as conn:
  69. ...
  70. """
  71. return self._get_app().connection()
  72. def get_publisher(self, connection=None, exchange=None,
  73. exchange_type=None, **options):
  74. """Deprecated method to get the task publisher (now called producer).
  75. Should be replaced with :class:`@amqp.TaskProducer`:
  76. .. code-block:: python
  77. with celery.connection() as conn:
  78. with celery.amqp.TaskProducer(conn) as prod:
  79. my_task.apply_async(producer=prod)
  80. """
  81. exchange = self.exchange if exchange is None else exchange
  82. if exchange_type is None:
  83. exchange_type = self.exchange_type
  84. connection = connection or self.establish_connection()
  85. return self._get_app().amqp.TaskProducer(
  86. connection,
  87. exchange=exchange and Exchange(exchange, exchange_type),
  88. routing_key=self.routing_key, **options
  89. )
  90. @classmethod
  91. def get_consumer(self, connection=None, queues=None, **kwargs):
  92. """Deprecated method used to get consumer for the queue
  93. this task is sent to.
  94. Should be replaced with :class:`@amqp.TaskConsumer` instead:
  95. """
  96. Q = self._get_app().amqp
  97. connection = connection or self.establish_connection()
  98. if queues is None:
  99. queues = Q.queues[self.queue] if self.queue else Q.default_queue
  100. return Q.TaskConsumer(connection, queues, **kwargs)
  101. class PeriodicTask(Task):
  102. """A periodic task is a task that adds itself to the
  103. :setting:`CELERYBEAT_SCHEDULE` setting."""
  104. abstract = True
  105. ignore_result = True
  106. relative = False
  107. options = None
  108. compat = True
  109. def __init__(self):
  110. if not hasattr(self, 'run_every'):
  111. raise NotImplementedError(
  112. 'Periodic tasks must have a run_every attribute')
  113. self.run_every = maybe_schedule(self.run_every, self.relative)
  114. super(PeriodicTask, self).__init__()
  115. @classmethod
  116. def on_bound(cls, app):
  117. app.conf.CELERYBEAT_SCHEDULE[cls.name] = {
  118. 'task': cls.name,
  119. 'schedule': cls.run_every,
  120. 'args': (),
  121. 'kwargs': {},
  122. 'options': cls.options or {},
  123. 'relative': cls.relative,
  124. }
  125. def task(*args, **kwargs):
  126. """Decorator to create a task class out of any callable.
  127. **Examples**
  128. .. code-block:: python
  129. @task()
  130. def refresh_feed(url):
  131. return Feed.objects.get(url=url).refresh()
  132. With setting extra options and using retry.
  133. .. code-block:: python
  134. @task(max_retries=10)
  135. def refresh_feed(url):
  136. try:
  137. return Feed.objects.get(url=url).refresh()
  138. except socket.error as exc:
  139. refresh_feed.retry(exc=exc)
  140. Calling the resulting task:
  141. >>> refresh_feed('http://example.com/rss') # Regular
  142. <Feed: http://example.com/rss>
  143. >>> refresh_feed.delay('http://example.com/rss') # Async
  144. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  145. """
  146. return current_app.task(*args, **dict({'accept_magic_kwargs': False,
  147. 'base': Task}, **kwargs))
  148. def periodic_task(*args, **options):
  149. """Decorator to create a task class out of any callable.
  150. .. admonition:: Examples
  151. .. code-block:: python
  152. @task()
  153. def refresh_feed(url):
  154. return Feed.objects.get(url=url).refresh()
  155. With setting extra options and using retry.
  156. .. code-block:: python
  157. from celery.task import current
  158. @task(exchange='feeds')
  159. def refresh_feed(url):
  160. try:
  161. return Feed.objects.get(url=url).refresh()
  162. except socket.error as exc:
  163. current.retry(exc=exc)
  164. Calling the resulting task:
  165. >>> refresh_feed('http://example.com/rss') # Regular
  166. <Feed: http://example.com/rss>
  167. >>> refresh_feed.delay('http://example.com/rss') # Async
  168. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  169. """
  170. return task(**dict({'base': PeriodicTask}, **options))