123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- """
- 为了让任务模块不影响基本业务,需要做统一的处理
- """
- import logging
- import datetime
- import os
- import time
- from django.conf import settings
- from typing import TYPE_CHECKING, Optional
- import simplejson as json
- from celery.local import PromiseProxy
- from apps.thirdparties.dingding import DingDingRobot
- from apps.web.core.utils import async_operation
- # noinspection PyUnresolvedReferences
- from taskmanager.tasks import (
- report_feedback_to_dealer_via_wechat,
- report_new_payment_to_dealer_via_wechat,
- report_to_user_via_wechat,
- send_msg_to_user_via_wechat,
- report_to_dealer_via_wechat,
- whale_withdraw_order_alert,
- withdraw_error_alert,
- send_topic_command,
- poll_dealer_recharge_record,
- poll_user_recharge_record,
- report_to_user_low_power,
- turn_on_power_huan_dian_gui,
- test_sync,
- check_withdraw_via_bank,
- generate_ad_excel_report,
- generate_business_stats_report_by_dealer,
- export_consume_order_excel_from_db,
- export_API_order_excel_from_db,
- export_on_points_order_excel_from_db,
- export_send_coins_to_card_order_excel_from_db,
- export_charge_order_excel_from_db,
- export_group_stat_excel_from_db,
- export_vcard_info_excel_from_db,
- export_group_user_account_excel_form_db,
- generate_simCharge_excel_report,
- generate_dealerWithDraw_excel_report,
- generate_biz_stats_for_manager,
- import_simcard_excel_to_db,
- export_simcard_excel_from_db,
- export_device_excel_from_db,
- manager_export_charge_order_excel_from_db,
- manager_export_consume_order_excel_from_db,
- manager_export_dealer_info_excel_from_db,
- export_aggregate_dealer_income,
- device_offline_notify,
- batch_set_device_params,
- set_device_params,
- batch_set_server_settings,
- set_server_settings,
- send_to_xf_all_dev_info,
- send_to_xf_falut,
- send_to_xf_fault_handle,
- notify_insurance_order_subscribe,
- notify_insurance_order_cancel,
- push_shanghai_platform_heatbeat,
- export_modify_customer_balance_record_excel_from_db,
- )
- logger = logging.getLogger(__name__)
- if TYPE_CHECKING:
- from taskmanager import TraceTask
- _task_registry = {k: v for k, v in globals().items() if isinstance(v, PromiseProxy)} # type: dict
- def task_caller(func_name, delay = None, expires = None, offline_task_id = None, **kwargs):
- """
- 任务中继器,为了不影响主线任务运作
- 注意做了保护, 如果rabbitmq锁死, 会暂时停止
- 所以只能提交可以允许忽略的任务
- :param func_name:
- :param delay:
- :param expires:
- :param offline_task_id:
- :param kwargs:
- :return:
- """
- if settings.NO_CELERY_TASK:
- logger.info('[task_caller] NO_CELERY_TASK')
- return
- if settings.DEBUG_CELERY_TASK_ROUTINE:
- try:
- f = _task_registry.get(func_name) # type: TraceTask
- logger.info('[task_caller] function(name=%s)(kwargs=%s)' % (f.__name__, json.dumps(kwargs)))
- async_operation(f, **kwargs)
- except Exception as e:
- logger.exception('[task_caller] error=%s' % (e,))
- else:
- from apps.web.core.models import SystemSettings, OfflineTask
- if SystemSettings.get_system_setting("DISABLE_CELERY", False):
- logger.info('[task_caller] function(name={}) result=no publish.'.format(func_name))
- if offline_task_id:
- offline_task = OfflineTask.objects(id = offline_task_id).first() # type: Optional[OfflineTask]
- if not offline_task:
- logger.error('no such task id({})'.format(offline_task_id))
- else:
- updated = offline_task.update(celery_task_id = offline_task_id,
- status = 'FAILURE',
- finishedTime = datetime.datetime.now())
- if not updated:
- logger.error(u'update offline task(id=%s) failed.' % (offline_task_id,))
- else:
- logger.debug(u'update offline task(id=%s) success.' % (offline_task_id,))
- else:
- if settings.CELERY_PUBLISH_METHOD == 'gevent':
- try:
- import gevent
- gevent.spawn(celery_task_impl, func_name, delay, expires, offline_task_id, **kwargs)
- except Exception as e:
- logger.exception(e)
- elif settings.CELERY_PUBLISH_METHOD == 'thread':
- async_operation(celery_task_impl, func_name, delay, expires, offline_task_id, **kwargs)
- else:
- celery_task_impl(func_name, delay, expires, offline_task_id, **kwargs)
- def celery_task_impl(func_name, delay = None, expires = None, offline_task_id = None, **kwargs):
- start = time.time()
- try:
- f = _task_registry.get(func_name) # type: TraceTask
- logger.info('[task_caller] function(name=%s)(kwargs=%s)' % (f.__name__, json.dumps(kwargs)))
- now_time = datetime.datetime.utcnow()
- if delay:
- eta = now_time + datetime.timedelta(seconds = delay)
- else:
- eta = None
- if expires:
- expires = now_time + datetime.timedelta(seconds = expires)
- else:
- expires = None
- if offline_task_id:
- headers = {'offline_task_id': offline_task_id}
- result = f.s(**kwargs).apply_async(eta = eta, expires = expires, ignore_result = True,
- headers = headers)
- # from apps.web.core.models import SystemSettings, OfflineTask
- # offline_task = OfflineTask.objects(id = offline_task_id).first() # type: Optional[OfflineTask]
- # offline_task.update(celery_task_id = result)
- else:
- result = f.s(**kwargs).apply_async(eta = eta, expires = expires, ignore_result = True)
- logger.info('[task_caller] function(name=%s) result=%s' % (f.__name__, result))
- except Exception as e:
- logger.exception('[task_caller] error=%s' % (e,))
- finally:
- end = time.time()
- over_time = (end - start)
- if over_time > 5:
- logger.warning('[task_caller] function(name=%s) result=exceed %s seconds.' % (func_name, over_time))
- if over_time > 30:
- logger.warning('[task_caller] function(name=%s) result=exceed 30 seconds to disable.' % func_name)
- from apps.web.core.models import SystemSettings
- SystemSettings.set_mem_setting("DISABLE_CELERY", True)
- DingDingRobot().send_msg(
- msg = u'[CELERY告警({})] 发布任务时间超过30秒,暂时屏蔽所有发布。请检查RABBITMQ是否正常。'.format(os.environ.get('MY_ENV')))
- def task_caller_once(func_name, routing_key, queue, **kwargs):
- """
- 任务中继器,为了不影响主线任务运作
- :param queue:
- :param routing_key:
- :param func_name:
- :param kwargs:
- :return:
- """
- try:
- f = _task_registry.get(func_name) # type: TraceTask
- logger.info('[task_caller_once] function(name=%s)(kwargs=%s)' % (f.__name__, json.dumps(kwargs)))
- result = f.s(**kwargs).apply_async(routing_key = routing_key, queue = queue)
- logger.info('[task_caller_once] function(name=%s) result=%s' % (f.__name__, result))
- except Exception as e:
- logger.error('[task_caller_once] error=%s' % (e,))
|