# -*- coding: utf-8 -*-
# !/usr/bin/env python

"""
A collection of tasks that are dispatched to celery.
~~~~~~~~~
"""

import datetime
import logging

from celery import Task
from typing import Optional

logger = logging.getLogger(__name__)


class TraceTask(Task):
    """Abstract Celery task base that logs every lifecycle event.

    In addition to logging, :meth:`after_return` looks for an
    ``offline_task_id`` entry in the request headers and, when present,
    writes the final celery task id, status and finish time to the
    matching ``OfflineTask`` document.
    """

    abstract = True

    def __init__(self):
        super(TraceTask, self).__init__()
        # NOTE(review): not read anywhere in this class; kept because code
        # outside this file may rely on the attribute existing.
        self.offline_task = False

    def on_success(self, retval, task_id, args, kwargs):
        """Success handler.

        Run by the worker if the task executes successfully.

        :param retval: The return value of the task.
        :param task_id: Unique id of the executed task.
        :param args: Original arguments for the executed task.
        :param kwargs: Original keyword arguments for the executed task.

        The return value of this handler is ignored.
        """
        # Lazy logger arguments: nothing is formatted unless DEBUG is on.
        logger.debug('on_success. task_id = %s, retval = %s', task_id, retval)

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Retry handler.

        This is run by the worker when the task is to be retried.

        :param exc: The exception sent to :meth:`retry`.
        :param task_id: Unique id of the retried task.
        :param args: Original arguments for the retried task.
        :param kwargs: Original keyword arguments for the retried task.

        :keyword einfo: :class:`~billiard.einfo.ExceptionInfo`
                        instance, containing the traceback.

        The return value of this handler is ignored.
        """
        logger.debug('on_retry. task_id = %s', task_id)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Error handler.

        This is run by the worker when the task fails.

        :param exc: The exception raised by the task.
        :param task_id: Unique id of the failed task.
        :param args: Original arguments for the task that failed.
        :param kwargs: Original keyword arguments for the task that failed.

        :keyword einfo: :class:`~billiard.einfo.ExceptionInfo`
                        instance, containing the traceback.

        The return value of this handler is ignored.
        """
        logger.error(
            'on_failure. exc = %s, task_id = %s, args = %s, kwargs = %s, einfo = %s',
            exc, task_id, args, kwargs, einfo)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        """Handler called after the task returns.

        Logs the outcome and, if the request headers carry a truthy
        ``offline_task_id``, records the outcome on that offline task.

        :param status: Current task state.
        :param retval: Task return value/exception.
        :param task_id: Unique id of the task.
        :param args: Original arguments for the task that failed.
        :param kwargs: Original keyword arguments for the task that failed.

        :keyword einfo: :class:`~billiard.einfo.ExceptionInfo`
                        instance, containing the traceback (if any).

        The return value of this handler is ignored.
        """
        raw_headers = getattr(self.request, 'headers', None)

        logger.debug(
            'after_return. task_id = %s, status = %s, args = %s, kwargs = %s, '
            'exception = %s, retval = %s, headers = %s',
            task_id, status, args, kwargs, einfo, retval, raw_headers)

        # The request context may have no headers at all (None); a bare
        # ``in`` test on None would raise TypeError inside the handler.
        headers = raw_headers or {}
        offline_task_id = headers.get('offline_task_id')
        if not offline_task_id:
            return

        self._update_offline_task(str(offline_task_id), task_id, status)

    def _update_offline_task(self, offline_task_id, task_id, status):
        """Store the celery task id, status and finish time on an OfflineTask.

        :param offline_task_id: String id of the ``OfflineTask`` document.
        :param task_id: Unique id of the celery task that just returned.
        :param status: Final state reported for that celery task.
        """
        # Imported here, as in the original code, rather than at module level.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from apps.web.core.models import OfflineTask

        offline_task = OfflineTask.objects(id=offline_task_id).first()  # type: Optional[OfflineTask]
        if not offline_task:
            logger.error('no such task id(%s)', offline_task_id)
            return

        updated = offline_task.update(
            celery_task_id=task_id,
            status=status,
            finishedTime=datetime.datetime.now())

        if not updated:
            logger.error(u'update offline task(id=%s) failed.', offline_task_id)
        else:
            logger.debug(u'update offline task(id=%s) success.', offline_task_id)