builtins.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.builtins
  4. ~~~~~~~~~~~~~~~~~~~
  5. Built-in tasks that are always available in all
  6. app instances. E.g. chord, group and xmap.
  7. """
  8. from __future__ import absolute_import
  9. from collections import deque
  10. from celery._state import get_current_worker_task
  11. from celery.utils import uuid
  12. __all__ = ['shared_task', 'load_shared_tasks']
  13. #: global list of functions defining tasks that should be
  14. #: added to all apps.
  15. _shared_tasks = set()
  16. def shared_task(constructor):
  17. """Decorator that specifies a function that generates a built-in task.
  18. The function will then be called for every new app instance created
  19. (lazily, so more exactly when the task registry for that app is needed).
  20. The function must take a single ``app`` argument.
  21. """
  22. _shared_tasks.add(constructor)
  23. return constructor
  24. def load_shared_tasks(app):
  25. """Create built-in tasks for an app instance."""
  26. constructors = set(_shared_tasks)
  27. for constructor in constructors:
  28. constructor(app)
  29. @shared_task
  30. def add_backend_cleanup_task(app):
  31. """The backend cleanup task can be used to clean up the default result
  32. backend.
  33. If the configured backend requires periodic cleanup this task is also
  34. automatically configured to run every day at midnight (requires
  35. :program:`celery beat` to be running).
  36. """
  37. @app.task(name='celery.backend_cleanup',
  38. shared=False, _force_evaluate=True)
  39. def backend_cleanup():
  40. app.backend.cleanup()
  41. return backend_cleanup
  42. @shared_task
  43. def add_unlock_chord_task(app):
  44. """This task is used by result backends without native chord support.
  45. It joins chords by creating a task chain polling the header for completion.
  46. """
  47. from celery.canvas import signature
  48. from celery.exceptions import ChordError
  49. from celery.result import result_from_tuple
  50. default_propagate = app.conf.CELERY_CHORD_PROPAGATES
  51. @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
  52. default_retry_delay=1, ignore_result=True, _force_evaluate=True)
  53. def unlock_chord(group_id, callback, interval=None, propagate=None,
  54. max_retries=None, result=None,
  55. Result=app.AsyncResult, GroupResult=app.GroupResult,
  56. result_from_tuple=result_from_tuple):
  57. # if propagate is disabled exceptions raised by chord tasks
  58. # will be sent as part of the result list to the chord callback.
  59. # Since 3.1 propagate will be enabled by default, and instead
  60. # the chord callback changes state to FAILURE with the
  61. # exception set to ChordError.
  62. propagate = default_propagate if propagate is None else propagate
  63. if interval is None:
  64. interval = unlock_chord.default_retry_delay
  65. # check if the task group is ready, and if so apply the callback.
  66. deps = GroupResult(
  67. group_id,
  68. [result_from_tuple(r, app=app) for r in result],
  69. )
  70. j = deps.join_native if deps.supports_native_join else deps.join
  71. if deps.ready():
  72. callback = signature(callback, app=app)
  73. try:
  74. ret = j(propagate=propagate)
  75. except Exception as exc:
  76. try:
  77. culprit = next(deps._failed_join_report())
  78. reason = 'Dependency {0.id} raised {1!r}'.format(
  79. culprit, exc,
  80. )
  81. except StopIteration:
  82. reason = repr(exc)
  83. app._tasks[callback.task].backend.fail_from_current_stack(
  84. callback.id, exc=ChordError(reason),
  85. )
  86. else:
  87. try:
  88. callback.delay(ret)
  89. except Exception as exc:
  90. app._tasks[callback.task].backend.fail_from_current_stack(
  91. callback.id,
  92. exc=ChordError('Callback error: {0!r}'.format(exc)),
  93. )
  94. else:
  95. return unlock_chord.retry(countdown=interval,
  96. max_retries=max_retries)
  97. return unlock_chord
  98. @shared_task
  99. def add_map_task(app):
  100. from celery.canvas import signature
  101. @app.task(name='celery.map', shared=False, _force_evaluate=True)
  102. def xmap(task, it):
  103. task = signature(task, app=app).type
  104. return [task(item) for item in it]
  105. return xmap
  106. @shared_task
  107. def add_starmap_task(app):
  108. from celery.canvas import signature
  109. @app.task(name='celery.starmap', shared=False, _force_evaluate=True)
  110. def xstarmap(task, it):
  111. task = signature(task, app=app).type
  112. return [task(*item) for item in it]
  113. return xstarmap
  114. @shared_task
  115. def add_chunk_task(app):
  116. from celery.canvas import chunks as _chunks
  117. @app.task(name='celery.chunks', shared=False, _force_evaluate=True)
  118. def chunks(task, it, n):
  119. return _chunks.apply_chunks(task, it, n)
  120. return chunks
  121. @shared_task
  122. def add_group_task(app):
  123. _app = app
  124. from celery.canvas import maybe_signature, signature
  125. from celery.result import result_from_tuple
  126. class Group(app.Task):
  127. app = _app
  128. name = 'celery.group'
  129. accept_magic_kwargs = False
  130. _decorated = True
  131. def run(self, tasks, result, group_id, partial_args):
  132. app = self.app
  133. result = result_from_tuple(result, app)
  134. # any partial args are added to all tasks in the group
  135. taskit = (signature(task, app=app).clone(partial_args)
  136. for i, task in enumerate(tasks))
  137. if self.request.is_eager or app.conf.CELERY_ALWAYS_EAGER:
  138. return app.GroupResult(
  139. result.id,
  140. [stask.apply(group_id=group_id) for stask in taskit],
  141. )
  142. with app.producer_or_acquire() as pub:
  143. [stask.apply_async(group_id=group_id, publisher=pub,
  144. add_to_parent=False) for stask in taskit]
  145. parent = get_current_worker_task()
  146. if parent:
  147. parent.add_trail(result)
  148. return result
  149. def prepare(self, options, tasks, args, **kwargs):
  150. options['group_id'] = group_id = (
  151. options.setdefault('task_id', uuid()))
  152. def prepare_member(task):
  153. task = maybe_signature(task, app=self.app)
  154. task.options['group_id'] = group_id
  155. return task, task.freeze()
  156. try:
  157. tasks, res = list(zip(
  158. *[prepare_member(task) for task in tasks]
  159. ))
  160. except ValueError: # tasks empty
  161. tasks, res = [], []
  162. return (tasks, self.app.GroupResult(group_id, res), group_id, args)
  163. def apply_async(self, partial_args=(), kwargs={}, **options):
  164. if self.app.conf.CELERY_ALWAYS_EAGER:
  165. return self.apply(partial_args, kwargs, **options)
  166. tasks, result, gid, args = self.prepare(
  167. options, args=partial_args, **kwargs
  168. )
  169. super(Group, self).apply_async((
  170. list(tasks), result.as_tuple(), gid, args), **options
  171. )
  172. return result
  173. def apply(self, args=(), kwargs={}, **options):
  174. return super(Group, self).apply(
  175. self.prepare(options, args=args, **kwargs),
  176. **options).get()
  177. return Group
  178. @shared_task
  179. def add_chain_task(app):
  180. from celery.canvas import Signature, chord, group, maybe_signature
  181. _app = app
  182. class Chain(app.Task):
  183. app = _app
  184. name = 'celery.chain'
  185. accept_magic_kwargs = False
  186. _decorated = True
  187. def prepare_steps(self, args, tasks):
  188. app = self.app
  189. steps = deque(tasks)
  190. next_step = prev_task = prev_res = None
  191. tasks, results = [], []
  192. i = 0
  193. while steps:
  194. # First task get partial args from chain.
  195. task = maybe_signature(steps.popleft(), app=app)
  196. task = task.clone() if i else task.clone(args)
  197. res = task.freeze()
  198. i += 1
  199. if isinstance(task, group) and steps and \
  200. not isinstance(steps[0], group):
  201. # automatically upgrade group(..) | s to chord(group, s)
  202. try:
  203. next_step = steps.popleft()
  204. # for chords we freeze by pretending it's a normal
  205. # task instead of a group.
  206. res = Signature.freeze(next_step)
  207. task = chord(task, body=next_step, task_id=res.task_id)
  208. except IndexError:
  209. pass # no callback, so keep as group
  210. if prev_task:
  211. # link previous task to this task.
  212. prev_task.link(task)
  213. # set the results parent attribute.
  214. if not res.parent:
  215. res.parent = prev_res
  216. if not isinstance(prev_task, chord):
  217. results.append(res)
  218. tasks.append(task)
  219. prev_task, prev_res = task, res
  220. return tasks, results
  221. def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
  222. task_id=None, link=None, link_error=None, **options):
  223. if self.app.conf.CELERY_ALWAYS_EAGER:
  224. return self.apply(args, kwargs, **options)
  225. options.pop('publisher', None)
  226. tasks, results = self.prepare_steps(args, kwargs['tasks'])
  227. result = results[-1]
  228. if group_id:
  229. tasks[-1].set(group_id=group_id)
  230. if chord:
  231. tasks[-1].set(chord=chord)
  232. if task_id:
  233. tasks[-1].set(task_id=task_id)
  234. result = tasks[-1].type.AsyncResult(task_id)
  235. # make sure we can do a link() and link_error() on a chain object.
  236. if link:
  237. tasks[-1].set(link=link)
  238. # and if any task in the chain fails, call the errbacks
  239. if link_error:
  240. for task in tasks:
  241. task.set(link_error=link_error)
  242. tasks[0].apply_async()
  243. return result
  244. def apply(self, args=(), kwargs={}, signature=maybe_signature,
  245. **options):
  246. app = self.app
  247. last, fargs = None, args # fargs passed to first task only
  248. for task in kwargs['tasks']:
  249. res = signature(task, app=app).clone(fargs).apply(
  250. last and (last.get(), ),
  251. )
  252. res.parent, last, fargs = last, res, None
  253. return last
  254. return Chain
  255. @shared_task
  256. def add_chord_task(app):
  257. """Every chord is executed in a dedicated task, so that the chord
  258. can be used as a signature, and this generates the task
  259. responsible for that."""
  260. from celery import group
  261. from celery.canvas import maybe_signature
  262. _app = app
  263. default_propagate = app.conf.CELERY_CHORD_PROPAGATES
  264. class Chord(app.Task):
  265. app = _app
  266. name = 'celery.chord'
  267. accept_magic_kwargs = False
  268. ignore_result = False
  269. _decorated = True
  270. def run(self, header, body, partial_args=(), interval=None,
  271. countdown=1, max_retries=None, propagate=None,
  272. eager=False, **kwargs):
  273. app = self.app
  274. propagate = default_propagate if propagate is None else propagate
  275. group_id = uuid()
  276. AsyncResult = app.AsyncResult
  277. prepare_member = self._prepare_member
  278. # - convert back to group if serialized
  279. tasks = header.tasks if isinstance(header, group) else header
  280. header = group([
  281. maybe_signature(s, app=app).clone() for s in tasks
  282. ])
  283. # - eager applies the group inline
  284. if eager:
  285. return header.apply(args=partial_args, task_id=group_id)
  286. results = [AsyncResult(prepare_member(task, body, group_id))
  287. for task in header.tasks]
  288. # - fallback implementations schedules the chord_unlock task here
  289. app.backend.on_chord_apply(group_id, body,
  290. interval=interval,
  291. countdown=countdown,
  292. max_retries=max_retries,
  293. propagate=propagate,
  294. result=results)
  295. # - call the header group, returning the GroupResult.
  296. final_res = header(*partial_args, task_id=group_id)
  297. return final_res
  298. def _prepare_member(self, task, body, group_id):
  299. opts = task.options
  300. # d.setdefault would work but generating uuid's are expensive
  301. try:
  302. task_id = opts['task_id']
  303. except KeyError:
  304. task_id = opts['task_id'] = uuid()
  305. opts.update(chord=body, group_id=group_id)
  306. return task_id
  307. def apply_async(self, args=(), kwargs={}, task_id=None,
  308. group_id=None, chord=None, **options):
  309. app = self.app
  310. if app.conf.CELERY_ALWAYS_EAGER:
  311. return self.apply(args, kwargs, **options)
  312. header = kwargs.pop('header')
  313. body = kwargs.pop('body')
  314. header, body = (list(maybe_signature(header, app=app)),
  315. maybe_signature(body, app=app))
  316. # forward certain options to body
  317. if chord is not None:
  318. body.options['chord'] = chord
  319. if group_id is not None:
  320. body.options['group_id'] = group_id
  321. [body.link(s) for s in options.pop('link', [])]
  322. [body.link_error(s) for s in options.pop('link_error', [])]
  323. body_result = body.freeze(task_id)
  324. parent = super(Chord, self).apply_async((header, body, args),
  325. kwargs, **options)
  326. body_result.parent = parent
  327. return body_result
  328. def apply(self, args=(), kwargs={}, propagate=True, **options):
  329. body = kwargs['body']
  330. res = super(Chord, self).apply(args, dict(kwargs, eager=True),
  331. **options)
  332. return maybe_signature(body, app=self.app).apply(
  333. args=(res.get(propagate=propagate).get(), ))
  334. return Chord