__init__.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. """
  4. 主要是投掷于celery的一系列任务
  5. ~~~~~~~~~
  6. """
  7. import datetime
  8. import logging
  9. from celery import Task
  10. from typing import Optional
  11. logger = logging.getLogger(__name__)
  class TraceTask(Task):
      """Abstract Celery task base that logs every lifecycle hook.

      Besides logging, :meth:`after_return` writes the final state of the run
      back to an ``OfflineTask`` record when the task request headers carry an
      ``offline_task_id`` entry.
      """

      abstract = True

      def __init__(self):
          # Chain to the parent initialiser so cooperative bases/mixins are
          # initialised as well (explicit form keeps Python 2 compatibility).
          super(TraceTask, self).__init__()
          # Initialised to False; this class itself never reads the flag.
          self.offline_task = False

      def on_success(self, retval, task_id, args, kwargs):
          """Success handler.

          Run by the worker if the task executes successfully.

          :param retval: The return value of the task.
          :param task_id: Unique id of the executed task.
          :param args: Original arguments for the executed task.
          :param kwargs: Original keyword arguments for the executed task.

          The return value of this handler is ignored.
          """
          logger.debug('on_success. task_id = {}, retval = {}'.format(task_id, retval))

      def on_retry(self, exc, task_id, args, kwargs, einfo):
          """Retry handler.

          This is run by the worker when the task is to be retried.

          :param exc: The exception sent to :meth:`retry`.
          :param task_id: Unique id of the retried task.
          :param args: Original arguments for the retried task.
          :param kwargs: Original keyword arguments for the retried task.
          :param einfo: :class:`~billiard.einfo.ExceptionInfo` instance,
              containing the traceback.

          The return value of this handler is ignored.
          """
          logger.debug('on_retry. task_id = {}'.format(task_id))

      def on_failure(self, exc, task_id, args, kwargs, einfo):
          """Error handler.

          This is run by the worker when the task fails.

          :param exc: The exception raised by the task.
          :param task_id: Unique id of the failed task.
          :param args: Original arguments for the task that failed.
          :param kwargs: Original keyword arguments for the task that failed.
          :param einfo: :class:`~billiard.einfo.ExceptionInfo` instance,
              containing the traceback.

          The return value of this handler is ignored.
          """
          logger.error(
              'on_failure. exc = {}, task_id = {}, args = {}, kwargs = {}, einfo = {}'.format(
                  exc, task_id, args, kwargs, einfo))

      def after_return(self, status, retval, task_id, args, kwargs, einfo):
          """Handler called after the task returns.

          Logs the outcome and, when the request headers contain a truthy
          ``offline_task_id``, updates the matching ``OfflineTask`` record with
          the Celery task id, the final status and the finish time.

          :param status: Current task state.
          :param retval: Task return value/exception.
          :param task_id: Unique id of the task.
          :param args: Original arguments for the task.
          :param kwargs: Original keyword arguments for the task.
          :param einfo: :class:`~billiard.einfo.ExceptionInfo` instance,
              containing the traceback (if any).

          The return value of this handler is ignored.
          """
          raw_headers = self.request.headers
          logger.debug(
              'after_return. task_id = {}, status = {}, args = {}, kwargs = {}, exception = {}, retval = {}, headers = {}'.format(
                  task_id, status, str(args), str(kwargs), str(einfo), str(retval), raw_headers))

          # The request may carry no headers at all (``None``), e.g. for eager
          # or direct invocations; treat that the same as "no offline task"
          # instead of failing on the membership test.
          offline_task_id = (raw_headers or {}).get('offline_task_id')
          if not offline_task_id:
              return

          # Imported here rather than at module level, as in the original
          # (presumably to avoid an import cycle -- confirm before moving).
          from apps.web.core.models import OfflineTask

          offline_task_id = str(offline_task_id)
          offline_task = OfflineTask.objects(id=offline_task_id).first()  # type: Optional[OfflineTask]
          if not offline_task:
              logger.error('no such task id({})'.format(offline_task_id))
              return

          # NOTE: datetime.now() yields a naive local timestamp.
          updated = offline_task.update(celery_task_id=task_id,
                                        status=status,
                                        finishedTime=datetime.datetime.now())
          if not updated:
              logger.error(u'update offline task(id=%s) failed.' % (offline_task_id,))
          else:
              logger.debug(u'update offline task(id=%s) success.' % (offline_task_id,))