# -*- coding: utf-8 -*-
# !/usr/bin/env python

"""
Unified entry points for publishing Celery tasks.

Task publishing goes through this module so that the task subsystem does not
interfere with the core business flow.
"""

import logging
import datetime
import os
import time

from django.conf import settings
from typing import TYPE_CHECKING, Optional

import simplejson as json
from celery.local import PromiseProxy

from apps.thirdparties.dingding import DingDingRobot
from apps.web.core.utils import async_operation
# noinspection PyUnresolvedReferences
from taskmanager.tasks import (
    report_feedback_to_dealer_via_wechat,
    report_new_payment_to_dealer_via_wechat,
    report_to_user_via_wechat,
    send_msg_to_user_via_wechat,
    report_to_dealer_via_wechat,
    whale_withdraw_order_alert,
    withdraw_error_alert,
    send_topic_command,
    poll_dealer_recharge_record,
    poll_user_recharge_record,
    report_to_user_low_power,
    turn_on_power_huan_dian_gui,
    test_sync,
    check_withdraw_via_bank,
    generate_ad_excel_report,
    generate_business_stats_report_by_dealer,
    export_consume_order_excel_from_db,
    export_API_order_excel_from_db,
    export_on_points_order_excel_from_db,
    export_send_coins_to_card_order_excel_from_db,
    export_charge_order_excel_from_db,
    export_group_stat_excel_from_db,
    export_vcard_info_excel_from_db,
    export_group_user_account_excel_form_db,
    generate_simCharge_excel_report,
    generate_dealerWithDraw_excel_report,
    generate_biz_stats_for_manager,
    import_simcard_excel_to_db,
    export_simcard_excel_from_db,
    export_device_excel_from_db,
    manager_export_charge_order_excel_from_db,
    manager_export_consume_order_excel_from_db,
    manager_export_dealer_info_excel_from_db,
    export_aggregate_dealer_income,
    device_offline_notify,
    batch_set_device_params,
    set_device_params,
    batch_set_server_settings,
    set_server_settings,
    send_to_xf_all_dev_info,
    send_to_xf_falut,
    send_to_xf_fault_handle,
    notify_insurance_order_subscribe,
    notify_insurance_order_cancel,
    push_shanghai_platform_heatbeat,
    export_modify_customer_balance_record_excel_from_db,
)

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from taskmanager import TraceTask

# Every PromiseProxy object in this module's namespace (i.e. the tasks imported
# above), keyed by the name it is bound to here.  The public functions below
# look tasks up in this mapping by ``func_name``.
_task_registry = {k: v for k, v in globals().items() if isinstance(v, PromiseProxy)}  # type: dict


def _resolve_task(func_name):
    """
    Return the task registered under ``func_name``.

    :param func_name: name of a task imported into this module
    :return: the registered task object
    :raises LookupError: if no task is registered under that name
    """
    task = _task_registry.get(func_name)
    # Identity check on purpose: it does not force evaluation of the proxy.
    if task is None:
        raise LookupError('no registered task named %s' % (func_name,))
    return task


def _format_kwargs(kwargs):
    """
    Render task kwargs for a log line.

    Falls back to ``repr`` when the kwargs are not JSON serializable, so that
    building a log message can never be the reason a task is not published.

    :param kwargs: keyword arguments destined for the task
    :return: text representation of ``kwargs``
    """
    try:
        return json.dumps(kwargs)
    except (TypeError, ValueError):
        return repr(kwargs)


def _mark_offline_task_failed(offline_task_id):
    """
    Mark the OfflineTask record with the given id as FAILURE.

    Used when publishing is disabled and the task is therefore never sent.

    :param offline_task_id: id of the OfflineTask record to update
    :return: None
    """
    # Imported lazily, as in the rest of this module.
    from apps.web.core.models import OfflineTask

    offline_task = OfflineTask.objects(id=offline_task_id).first()  # type: Optional[OfflineTask]
    if not offline_task:
        logger.error('no such task id({})'.format(offline_task_id))
        return

    updated = offline_task.update(
        celery_task_id=offline_task_id,
        status='FAILURE',
        finishedTime=datetime.datetime.now())
    if updated:
        logger.debug(u'update offline task(id=%s) success.', offline_task_id)
    else:
        logger.error(u'update offline task(id=%s) failed.', offline_task_id)


def _disable_publishing(func_name):
    """
    Set the DISABLE_CELERY setting and send a DingDing alert.

    ``task_caller`` reads DISABLE_CELERY and skips publishing while it is set.
    Failures here are logged instead of raised, because this runs from a
    ``finally`` clause on the publishing path.

    :param func_name: name of the task whose publish was too slow
    :return: None
    """
    logger.warning('[task_caller] function(name=%s) result=exceed 30 seconds to disable.', func_name)
    try:
        from apps.web.core.models import SystemSettings

        SystemSettings.set_mem_setting("DISABLE_CELERY", True)
        DingDingRobot().send_msg(
            msg=u'[CELERY告警({})] 发布任务时间超过30秒,暂时屏蔽所有发布。请检查RABBITMQ是否正常。'.format(os.environ.get('MY_ENV')))
    except Exception as e:
        logger.exception('[task_caller] disable publishing failed, error=%s', e)


def task_caller(func_name, delay=None, expires=None, offline_task_id=None, **kwargs):
    """
    Task relay that keeps task publishing from affecting the main flow.

    Note that publishing is protected: if RabbitMQ locks up, publishing is
    temporarily stopped, so only tasks that may safely be dropped should be
    submitted through this function.

    :param func_name: name of a task imported into this module
    :param delay: seconds from now before the task should run (falsy: no eta)
    :param expires: seconds from now after which the task expires (falsy: never)
    :param offline_task_id: id of an OfflineTask record tied to this task, if any
    :param kwargs: keyword arguments passed to the task
    :return: None
    """
    if settings.NO_CELERY_TASK:
        logger.info('[task_caller] NO_CELERY_TASK')
        return

    if settings.DEBUG_CELERY_TASK_ROUTINE:
        # Debug routine: hand the task itself to async_operation instead of
        # publishing it.
        try:
            f = _resolve_task(func_name)  # type: TraceTask
            logger.info('[task_caller] function(name=%s)(kwargs=%s)', f.__name__, _format_kwargs(kwargs))
            async_operation(f, **kwargs)
        except Exception as e:
            logger.exception('[task_caller] error=%s', e)
        return

    from apps.web.core.models import SystemSettings

    if SystemSettings.get_system_setting("DISABLE_CELERY", False):
        logger.info('[task_caller] function(name={}) result=no publish.'.format(func_name))
        if offline_task_id:
            _mark_offline_task_failed(offline_task_id)
        return

    publish_method = settings.CELERY_PUBLISH_METHOD
    if publish_method == 'gevent':
        try:
            import gevent
            gevent.spawn(celery_task_impl, func_name, delay, expires, offline_task_id, **kwargs)
        except Exception as e:
            logger.exception(e)
    elif publish_method == 'thread':
        async_operation(celery_task_impl, func_name, delay, expires, offline_task_id, **kwargs)
    else:
        celery_task_impl(func_name, delay, expires, offline_task_id, **kwargs)


def celery_task_impl(func_name, delay=None, expires=None, offline_task_id=None, **kwargs):
    """
    Publish the named task and time how long publishing takes.

    Errors while publishing are logged, not raised.  A publish that takes more
    than 5 seconds is logged as a warning; one that takes more than 30 seconds
    additionally sets DISABLE_CELERY and sends a DingDing alert.

    :param func_name: name of a task imported into this module
    :param delay: seconds from now (UTC) used as the task eta (falsy: no eta)
    :param expires: seconds from now (UTC) used as the expiry (falsy: none)
    :param offline_task_id: if set, sent in the message headers as 'offline_task_id'
    :param kwargs: keyword arguments passed to the task signature
    :return: None
    """
    start = time.time()
    try:
        f = _resolve_task(func_name)  # type: TraceTask
        logger.info('[task_caller] function(name=%s)(kwargs=%s)', f.__name__, _format_kwargs(kwargs))

        now_time = datetime.datetime.utcnow()
        options = {
            'eta': (now_time + datetime.timedelta(seconds=delay)) if delay else None,
            'expires': (now_time + datetime.timedelta(seconds=expires)) if expires else None,
            'ignore_result': True,
        }
        if offline_task_id:
            options['headers'] = {'offline_task_id': offline_task_id}

        result = f.s(**kwargs).apply_async(**options)
        logger.info('[task_caller] function(name=%s) result=%s', f.__name__, result)
    except Exception as e:
        logger.exception('[task_caller] error=%s', e)
    finally:
        over_time = time.time() - start
        if over_time > 5:
            logger.warning('[task_caller] function(name=%s) result=exceed %s seconds.', func_name, over_time)
        if over_time > 30:
            _disable_publishing(func_name)


def task_caller_once(func_name, routing_key, queue, **kwargs):
    """
    Task relay that publishes to an explicit queue, without affecting the main flow.

    Errors while publishing are logged, not raised.

    :param func_name: name of a task imported into this module
    :param routing_key: routing key passed to ``apply_async``
    :param queue: queue passed to ``apply_async``
    :param kwargs: keyword arguments passed to the task signature
    :return: None
    """
    try:
        f = _resolve_task(func_name)  # type: TraceTask
        logger.info('[task_caller_once] function(name=%s)(kwargs=%s)', f.__name__, _format_kwargs(kwargs))
        result = f.s(**kwargs).apply_async(routing_key=routing_key, queue=queue)
        logger.info('[task_caller_once] function(name=%s) result=%s', f.__name__, result)
    except Exception as e:
        # logger.exception keeps the traceback, matching the other publishers.
        logger.exception('[task_caller_once] error=%s', e)