123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- """
- 主要是投掷于celery的一系列任务
- ~~~~~~~~~
- """
- import datetime
- import logging
- from celery import Task
- from typing import Optional
- logger = logging.getLogger(__name__)
- class TraceTask(Task):
- abstract = True
- def __init__(self):
- self.offline_task = False
- def on_success(self, retval, task_id, args, kwargs):
- """Success handler.
- Run by the worker if the task executes successfully.
- :param retval: The return value of the task.
- :param task_id: Unique id of the executed task.
- :param args: Original arguments for the executed task.
- :param kwargs: Original keyword arguments for the executed task.
- The return value of this handler is ignored.
- """
- logger.debug('on_success. task_id = {}, retval = {}'.format(task_id, retval))
- def on_retry(self, exc, task_id, args, kwargs, einfo):
- """Retry handler.
- This is run by the worker when the task is to be retried.
- :param exc: The exception sent to :meth:`retry`.
- :param task_id: Unique id of the retried task.
- :param args: Original arguments for the retried task.
- :param kwargs: Original keyword arguments for the retried task.
- :keyword einfo: :class:`~billiard.einfo.ExceptionInfo`
- instance, containing the traceback.
- The return value of this handler is ignored.
- """
- logger.debug('on_retry. task_id = {}'.format(task_id))
- def on_failure(self, exc, task_id, args, kwargs, einfo):
- """Error handler.
- This is run by the worker when the task fails.
- :param exc: The exception raised by the task.
- :param task_id: Unique id of the failed task.
- :param args: Original arguments for the task that failed.
- :param kwargs: Original keyword arguments for the task
- that failed.
- :keyword einfo: :class:`~billiard.einfo.ExceptionInfo`
- instance, containing the traceback.
- The return value of this handler is ignored.
- """
- logger.error(
- 'on_failure. exc = {}, task_id = {}, args = {}, kwargs = {}, einfo = {}'.format(
- exc, task_id, args, kwargs, einfo))
- def after_return(self, status, retval, task_id, args, kwargs, einfo):
- """Handler called after the task returns.
- :param status: Current task state.
- :param retval: Task return value/exception.
- :param task_id: Unique id of the task.
- :param args: Original arguments for the task that failed.
- :param kwargs: Original keyword arguments for the task
- that failed.
- :keyword einfo: :class:`~billiard.einfo.ExceptionInfo`
- instance, containing the traceback (if any).
- The return value of this handler is ignored.
- """
- logger.debug(
- 'after_return. task_id = {}, status = {}, args = {}, kwargs = {}, exception = {}, retval = {}, headers = {}'.format(
- task_id, status, str(args), str(kwargs), str(einfo), str(retval), self.request.headers))
- if 'offline_task_id' in self.request.headers and self.request.headers['offline_task_id']:
- from apps.web.core.models import OfflineTask
- offline_task_id = str(self.request.headers['offline_task_id'])
- offline_task = OfflineTask.objects(id = offline_task_id).first() # type: Optional[OfflineTask]
- if not offline_task:
- logger.error('no such task id({})'.format(offline_task_id))
- else:
- updated = offline_task.update(celery_task_id = task_id,
- status = status,
- finishedTime = datetime.datetime.now())
- if not updated:
- logger.error(u'update offline task(id=%s) failed.' % (offline_task_id,))
- else:
- logger.debug(u'update offline task(id=%s) success.' % (offline_task_id,))
|