job.py 20 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.job
  4. ~~~~~~~~~~~~~~~~~
  5. This module defines the :class:`Request` class,
  6. which specifies how tasks are executed.
  7. """
  8. from __future__ import absolute_import, unicode_literals
  9. import logging
  10. import socket
  11. import sys
  12. from billiard.einfo import ExceptionInfo
  13. from datetime import datetime
  14. from weakref import ref
  15. from kombu.utils import kwdict, reprcall
  16. from kombu.utils.encoding import safe_repr, safe_str
  17. from celery import signals
  18. from celery.app.trace import trace_task, trace_task_ret
  19. from celery.exceptions import (
  20. Ignore, TaskRevokedError, InvalidTaskError,
  21. SoftTimeLimitExceeded, TimeLimitExceeded,
  22. WorkerLostError, Terminated, Retry, Reject,
  23. )
  24. from celery.five import items, monotonic, string_t
  25. from celery.platforms import signals as _signals
  26. from celery.utils import fun_takes_kwargs
  27. from celery.utils.functional import noop
  28. from celery.utils.log import get_logger
  29. from celery.utils.serialization import get_pickled_exception
  30. from celery.utils.text import truncate
  31. from celery.utils.timeutils import maybe_iso8601, timezone, maybe_make_aware
  32. from . import state
__all__ = ['Request']

# True when the interpreter exposes ``sys.pypy_version_info``; the Request
# class only declares ``__slots__`` when this is false.
IS_PYPY = hasattr(sys, 'pypy_version_info')

# Module logger plus short aliases for its level-specific methods.
logger = get_logger(__name__)
debug, info, warn, error = (logger.debug, logger.info,
                            logger.warning, logger.error)

# Cached "is this level enabled?" flags; both are recomputed by
# ``__optimize__()`` so that hot paths can skip building log arguments.
_does_info = False
_does_debug = False
  40. def __optimize__():
  41. # this is also called by celery.app.trace.setup_worker_optimizations
  42. global _does_debug
  43. global _does_info
  44. _does_debug = logger.isEnabledFor(logging.DEBUG)
  45. _does_info = logger.isEnabledFor(logging.INFO)
# Prime the cached log-level flags once at import time.
__optimize__()

# Localize: bind frequently used attributes to module-level names.
tz_utc = timezone.utc
tz_or_local = timezone.tz_or_local
send_revoked = signals.task_revoked.send

task_accepted = state.task_accepted
task_ready = state.task_ready
revoked_tasks = state.revoked

# When true, Request.__init__ passes the task kwargs through ``kwdict``.
# NOTE(review): ``sys.version_info`` is a longer tuple than ``(2, 6)``, so on
# any 2.6.x interpreter this comparison evaluates to False -- confirm which
# Python versions were meant to be covered.
NEEDS_KWDICT = sys.version_info <= (2, 6)
#: Use when no message object passed to :class:`Request`.
#: These keys are merged into the request body by ``Request.__init__`` so
#: that ``headers``, ``reply_to``, ``correlation_id`` and ``delivery_info``
#: are always present in ``request_dict``.
DEFAULT_FIELDS = {
    'headers': None,
    'reply_to': None,
    'correlation_id': None,
    'delivery_info': {
        'exchange': None,
        'routing_key': None,
        'priority': 0,
        'redelivered': False,
    },
}
  67. class Request(object):
  68. """A request for task execution."""
  69. if not IS_PYPY: # pragma: no cover
  70. __slots__ = (
  71. 'app', 'name', 'id', 'args', 'kwargs', 'on_ack',
  72. 'hostname', 'eventer', 'connection_errors', 'task', 'eta',
  73. 'expires', 'request_dict', 'acknowledged', 'on_reject',
  74. 'utc', 'time_start', 'worker_pid', '_already_revoked',
  75. '_terminate_on_ack', '_apply_result',
  76. '_tzlocal', '__weakref__', '__dict__',
  77. )
  78. #: Format string used to log task success.
  79. success_msg = """\
  80. Task %(name)s[%(id)s] succeeded in %(runtime)ss: %(return_value)s
  81. """
  82. #: Format string used to log task failure.
  83. error_msg = """\
  84. Task %(name)s[%(id)s] raised exception: %(exc)s
  85. """
  86. #: Format string used to log internal error.
  87. internal_error_msg = """\
  88. Task %(name)s[%(id)s] INTERNAL ERROR: %(exc)s
  89. """
  90. ignored_msg = """\
  91. Task %(name)s[%(id)s] ignored
  92. """
  93. rejected_msg = """\
  94. Task %(name)s[%(id)s] %(exc)s
  95. """
  96. #: Format string used to log task retry.
  97. retry_msg = """Task %(name)s[%(id)s] retry: %(exc)s"""
  98. def __init__(self, body, on_ack=noop,
  99. hostname=None, eventer=None, app=None,
  100. connection_errors=None, request_dict=None,
  101. message=None, task=None, on_reject=noop, **opts):
  102. self.app = app
  103. name = self.name = body['task']
  104. self.id = body['id']
  105. self.args = body.get('args', [])
  106. self.kwargs = body.get('kwargs', {})
  107. try:
  108. self.kwargs.items
  109. except AttributeError:
  110. raise InvalidTaskError(
  111. 'Task keyword arguments is not a mapping')
  112. if NEEDS_KWDICT:
  113. self.kwargs = kwdict(self.kwargs)
  114. eta = body.get('eta')
  115. expires = body.get('expires')
  116. utc = self.utc = body.get('utc', False)
  117. self.on_ack = on_ack
  118. self.on_reject = on_reject
  119. self.hostname = hostname or socket.gethostname()
  120. self.eventer = eventer
  121. self.connection_errors = connection_errors or ()
  122. self.task = task or self.app.tasks[name]
  123. self.acknowledged = self._already_revoked = False
  124. self.time_start = self.worker_pid = self._terminate_on_ack = None
  125. self._apply_result = None
  126. self._tzlocal = None
  127. # timezone means the message is timezone-aware, and the only timezone
  128. # supported at this point is UTC.
  129. if eta is not None:
  130. try:
  131. self.eta = maybe_iso8601(eta)
  132. except (AttributeError, ValueError, TypeError) as exc:
  133. raise InvalidTaskError(
  134. 'invalid eta value {0!r}: {1}'.format(eta, exc))
  135. if utc:
  136. self.eta = maybe_make_aware(self.eta, self.tzlocal)
  137. else:
  138. self.eta = None
  139. if expires is not None:
  140. try:
  141. self.expires = maybe_iso8601(expires)
  142. except (AttributeError, ValueError, TypeError) as exc:
  143. raise InvalidTaskError(
  144. 'invalid expires value {0!r}: {1}'.format(expires, exc))
  145. if utc:
  146. self.expires = maybe_make_aware(self.expires, self.tzlocal)
  147. else:
  148. self.expires = None
  149. if message:
  150. delivery_info = message.delivery_info or {}
  151. properties = message.properties or {}
  152. body.update({
  153. 'headers': message.headers,
  154. 'reply_to': properties.get('reply_to'),
  155. 'correlation_id': properties.get('correlation_id'),
  156. 'delivery_info': {
  157. 'exchange': delivery_info.get('exchange'),
  158. 'routing_key': delivery_info.get('routing_key'),
  159. 'priority': delivery_info.get('priority'),
  160. 'redelivered': delivery_info.get('redelivered'),
  161. }
  162. })
  163. else:
  164. body.update(DEFAULT_FIELDS)
  165. self.request_dict = body
  166. @property
  167. def delivery_info(self):
  168. return self.request_dict['delivery_info']
  169. def extend_with_default_kwargs(self):
  170. """Extend the tasks keyword arguments with standard task arguments.
  171. Currently these are `logfile`, `loglevel`, `task_id`,
  172. `task_name`, `task_retries`, and `delivery_info`.
  173. See :meth:`celery.task.base.Task.run` for more information.
  174. Magic keyword arguments are deprecated and will be removed
  175. in version 4.0.
  176. """
  177. kwargs = dict(self.kwargs)
  178. default_kwargs = {'logfile': None, # deprecated
  179. 'loglevel': None, # deprecated
  180. 'task_id': self.id,
  181. 'task_name': self.name,
  182. 'task_retries': self.request_dict.get('retries', 0),
  183. 'task_is_eager': False,
  184. 'delivery_info': self.delivery_info}
  185. fun = self.task.run
  186. supported_keys = fun_takes_kwargs(fun, default_kwargs)
  187. extend_with = dict((key, val) for key, val in items(default_kwargs)
  188. if key in supported_keys)
  189. kwargs.update(extend_with)
  190. return kwargs
  191. def execute_using_pool(self, pool, **kwargs):
  192. """Used by the worker to send this task to the pool.
  193. :param pool: A :class:`celery.concurrency.base.TaskPool` instance.
  194. :raises celery.exceptions.TaskRevokedError: if the task was revoked
  195. and ignored.
  196. """
  197. uuid = self.id
  198. task = self.task
  199. if self.revoked():
  200. raise TaskRevokedError(uuid)
  201. hostname = self.hostname
  202. kwargs = self.kwargs
  203. if task.accept_magic_kwargs:
  204. kwargs = self.extend_with_default_kwargs()
  205. request = self.request_dict
  206. request.update({'hostname': hostname, 'is_eager': False,
  207. 'delivery_info': self.delivery_info,
  208. 'group': self.request_dict.get('taskset')})
  209. timeout, soft_timeout = request.get('timelimit', (None, None))
  210. timeout = timeout or task.time_limit
  211. soft_timeout = soft_timeout or task.soft_time_limit
  212. result = pool.apply_async(
  213. trace_task_ret,
  214. args=(self.name, uuid, self.args, kwargs, request),
  215. accept_callback=self.on_accepted,
  216. timeout_callback=self.on_timeout,
  217. callback=self.on_success,
  218. error_callback=self.on_failure,
  219. soft_timeout=soft_timeout,
  220. timeout=timeout,
  221. correlation_id=uuid,
  222. )
  223. # cannot create weakref to None
  224. self._apply_result = ref(result) if result is not None else result
  225. return result
  226. def execute(self, loglevel=None, logfile=None):
  227. """Execute the task in a :func:`~celery.app.trace.trace_task`.
  228. :keyword loglevel: The loglevel used by the task.
  229. :keyword logfile: The logfile used by the task.
  230. """
  231. if self.revoked():
  232. return
  233. # acknowledge task as being processed.
  234. if not self.task.acks_late:
  235. self.acknowledge()
  236. kwargs = self.kwargs
  237. if self.task.accept_magic_kwargs:
  238. kwargs = self.extend_with_default_kwargs()
  239. request = self.request_dict
  240. request.update({'loglevel': loglevel, 'logfile': logfile,
  241. 'hostname': self.hostname, 'is_eager': False,
  242. 'delivery_info': self.delivery_info})
  243. retval = trace_task(self.task, self.id, self.args, kwargs, request,
  244. **{'hostname': self.hostname,
  245. 'loader': self.app.loader})
  246. self.acknowledge()
  247. return retval
  248. def maybe_expire(self):
  249. """If expired, mark the task as revoked."""
  250. if self.expires:
  251. now = datetime.now(tz_or_local(self.tzlocal) if self.utc else None)
  252. if now > self.expires:
  253. revoked_tasks.add(self.id)
  254. return True
  255. def terminate(self, pool, signal=None):
  256. signal = _signals.signum(signal or 'TERM')
  257. if self.time_start:
  258. pool.terminate_job(self.worker_pid, signal)
  259. self._announce_revoked('terminated', True, signal, False)
  260. else:
  261. self._terminate_on_ack = pool, signal
  262. if self._apply_result is not None:
  263. obj = self._apply_result() # is a weakref
  264. if obj is not None:
  265. obj.terminate(signal)
  266. def _announce_revoked(self, reason, terminated, signum, expired):
  267. task_ready(self)
  268. self.send_event('task-revoked',
  269. terminated=terminated, signum=signum, expired=expired)
  270. if self.store_errors:
  271. self.task.backend.mark_as_revoked(self.id, reason, request=self)
  272. self.acknowledge()
  273. self._already_revoked = True
  274. send_revoked(self.task, request=self,
  275. terminated=terminated, signum=signum, expired=expired)
  276. def revoked(self):
  277. """If revoked, skip task and mark state."""
  278. expired = False
  279. if self._already_revoked:
  280. return True
  281. if self.expires:
  282. expired = self.maybe_expire()
  283. if self.id in revoked_tasks:
  284. info('Discarding revoked task: %s[%s]', self.name, self.id)
  285. self._announce_revoked(
  286. 'expired' if expired else 'revoked', False, None, expired,
  287. )
  288. return True
  289. return False
  290. def send_event(self, type, **fields):
  291. if self.eventer and self.eventer.enabled:
  292. self.eventer.send(type, uuid=self.id, **fields)
  293. def on_accepted(self, pid, time_accepted):
  294. """Handler called when task is accepted by worker pool."""
  295. self.worker_pid = pid
  296. self.time_start = time_accepted
  297. task_accepted(self)
  298. if not self.task.acks_late:
  299. self.acknowledge()
  300. self.send_event('task-started')
  301. if _does_debug:
  302. debug('Task accepted: %s[%s] pid:%r', self.name, self.id, pid)
  303. if self._terminate_on_ack is not None:
  304. self.terminate(*self._terminate_on_ack)
  305. def on_timeout(self, soft, timeout):
  306. """Handler called if the task times out."""
  307. task_ready(self)
  308. if soft:
  309. warn('Soft time limit (%ss) exceeded for %s[%s]',
  310. timeout, self.name, self.id)
  311. exc = SoftTimeLimitExceeded(timeout)
  312. else:
  313. error('Hard time limit (%ss) exceeded for %s[%s]',
  314. timeout, self.name, self.id)
  315. exc = TimeLimitExceeded(timeout)
  316. if self.store_errors:
  317. self.task.backend.mark_as_failure(self.id, exc, request=self)
  318. def on_success(self, ret_value, now=None, nowfun=monotonic):
  319. """Handler called if the task was successfully processed."""
  320. if isinstance(ret_value, ExceptionInfo):
  321. if isinstance(ret_value.exception, (
  322. SystemExit, KeyboardInterrupt)):
  323. raise ret_value.exception
  324. return self.on_failure(ret_value)
  325. task_ready(self)
  326. if self.task.acks_late:
  327. self.acknowledge()
  328. if self.eventer and self.eventer.enabled:
  329. now = nowfun()
  330. runtime = self.time_start and (now - self.time_start) or 0
  331. self.send_event('task-succeeded',
  332. result=safe_repr(ret_value), runtime=runtime)
  333. if _does_info:
  334. now = now or nowfun()
  335. runtime = self.time_start and (now - self.time_start) or 0
  336. info(self.success_msg.strip(), {
  337. 'id': self.id, 'name': self.name,
  338. 'return_value': self.repr_result(ret_value),
  339. 'runtime': runtime})
  340. def on_retry(self, exc_info):
  341. """Handler called if the task should be retried."""
  342. if self.task.acks_late:
  343. self.acknowledge()
  344. self.send_event('task-retried',
  345. exception=safe_repr(exc_info.exception.exc),
  346. traceback=safe_str(exc_info.traceback))
  347. if _does_info:
  348. info(self.retry_msg.strip(),
  349. {'id': self.id, 'name': self.name,
  350. 'exc': exc_info.exception})
  351. def on_failure(self, exc_info):
  352. """Handler called if the task raised an exception."""
  353. task_ready(self)
  354. send_failed_event = True
  355. if not exc_info.internal:
  356. exc = exc_info.exception
  357. if isinstance(exc, Retry):
  358. return self.on_retry(exc_info)
  359. # These are special cases where the process would not have had
  360. # time to write the result.
  361. if self.store_errors:
  362. if isinstance(exc, WorkerLostError):
  363. self.task.backend.mark_as_failure(
  364. self.id, exc, request=self,
  365. )
  366. elif isinstance(exc, Terminated):
  367. self._announce_revoked('terminated', True, str(exc), False)
  368. send_failed_event = False # already sent revoked event
  369. # (acks_late) acknowledge after result stored.
  370. if self.task.acks_late:
  371. self.acknowledge()
  372. self._log_error(exc_info, send_failed_event=send_failed_event)
  373. def _log_error(self, einfo, send_failed_event=True):
  374. einfo.exception = get_pickled_exception(einfo.exception)
  375. exception, traceback, exc_info, internal, sargs, skwargs = (
  376. safe_repr(einfo.exception),
  377. safe_str(einfo.traceback),
  378. einfo.exc_info,
  379. einfo.internal,
  380. safe_repr(self.args),
  381. safe_repr(self.kwargs),
  382. )
  383. format = self.error_msg
  384. description = 'raised exception'
  385. severity = logging.ERROR
  386. if send_failed_event:
  387. self.send_event(
  388. 'task-failed', exception=exception, traceback=traceback,
  389. )
  390. if internal:
  391. if isinstance(einfo.exception, MemoryError):
  392. raise MemoryError('Process got: %s' % (einfo.exception, ))
  393. elif isinstance(einfo.exception, Reject):
  394. format = self.rejected_msg
  395. description = 'rejected'
  396. severity = logging.WARN
  397. exc_info = einfo
  398. self.reject(requeue=einfo.exception.requeue)
  399. elif isinstance(einfo.exception, Ignore):
  400. format = self.ignored_msg
  401. description = 'ignored'
  402. severity = logging.INFO
  403. exc_info = None
  404. self.acknowledge()
  405. else:
  406. format = self.internal_error_msg
  407. description = 'INTERNAL ERROR'
  408. severity = logging.CRITICAL
  409. context = {
  410. 'hostname': self.hostname,
  411. 'id': self.id,
  412. 'name': self.name,
  413. 'exc': exception,
  414. 'traceback': traceback,
  415. 'args': sargs,
  416. 'kwargs': skwargs,
  417. 'description': description,
  418. }
  419. logger.log(severity, format.strip(), context,
  420. exc_info=exc_info,
  421. extra={'data': {'id': self.id,
  422. 'name': self.name,
  423. 'args': sargs,
  424. 'kwargs': skwargs,
  425. 'hostname': self.hostname,
  426. 'internal': internal}})
  427. self.task.send_error_email(context, einfo.exception)
  428. def acknowledge(self):
  429. """Acknowledge task."""
  430. if not self.acknowledged:
  431. self.on_ack(logger, self.connection_errors)
  432. self.acknowledged = True
  433. def reject(self, requeue=False):
  434. if not self.acknowledged:
  435. self.on_reject(logger, self.connection_errors, requeue)
  436. self.acknowledged = True
  437. def repr_result(self, result, maxlen=46):
  438. # 46 is the length needed to fit
  439. # 'the quick brown fox jumps over the lazy dog' :)
  440. if not isinstance(result, string_t):
  441. result = safe_repr(result)
  442. return truncate(result) if len(result) > maxlen else result
  443. def info(self, safe=False):
  444. return {'id': self.id,
  445. 'name': self.name,
  446. 'args': self.args if safe else safe_repr(self.args),
  447. 'kwargs': self.kwargs if safe else safe_repr(self.kwargs),
  448. 'hostname': self.hostname,
  449. 'time_start': self.time_start,
  450. 'acknowledged': self.acknowledged,
  451. 'delivery_info': self.delivery_info,
  452. 'worker_pid': self.worker_pid}
  453. def __str__(self):
  454. return '{0.name}[{0.id}]{1}{2}'.format(self,
  455. ' eta:[{0}]'.format(self.eta) if self.eta else '',
  456. ' expires:[{0}]'.format(self.expires) if self.expires else '')
  457. shortinfo = __str__
  458. def __repr__(self):
  459. return '<{0} {1}: {2}>'.format(
  460. type(self).__name__, self.id,
  461. reprcall(self.name, self.args, self.kwargs))
  462. @property
  463. def tzlocal(self):
  464. if self._tzlocal is None:
  465. self._tzlocal = self.app.conf.CELERY_TIMEZONE
  466. return self._tzlocal
  467. @property
  468. def store_errors(self):
  469. return (not self.task.ignore_result
  470. or self.task.store_errors_even_if_ignored)
  471. @property
  472. def task_id(self):
  473. # XXX compat
  474. return self.id
  475. @task_id.setter # noqa
  476. def task_id(self, value):
  477. self.id = value
  478. @property
  479. def task_name(self):
  480. # XXX compat
  481. return self.name
  482. @task_name.setter # noqa
  483. def task_name(self, value):
  484. self.name = value
  485. @property
  486. def reply_to(self):
  487. # used by rpc backend when failures reported by parent process
  488. return self.request_dict['reply_to']
  489. @property
  490. def correlation_id(self):
  491. # used similarly to reply_to
  492. return self.request_dict['correlation_id']