mediator.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. """
  4. 为了让任务模块不影响基本业务,需要做统一的处理
  5. """
  6. import logging
  7. import datetime
  8. import os
  9. import time
  10. from django.conf import settings
  11. from typing import TYPE_CHECKING, Optional
  12. import simplejson as json
  13. from celery.local import PromiseProxy
  14. from apps.thirdparties.dingding import DingDingRobot
  15. from apps.web.core.utils import async_operation
  16. # noinspection PyUnresolvedReferences
  17. from taskmanager.tasks import (
  18. report_feedback_to_dealer_via_wechat,
  19. report_new_payment_to_dealer_via_wechat,
  20. report_to_user_via_wechat,
  21. send_msg_to_user_via_wechat,
  22. report_to_dealer_via_wechat,
  23. whale_withdraw_order_alert,
  24. withdraw_error_alert,
  25. send_topic_command,
  26. poll_dealer_recharge_record,
  27. poll_user_recharge_record,
  28. report_to_user_low_power,
  29. turn_on_power_huan_dian_gui,
  30. test_sync,
  31. check_wechat_withdraw_via_bank,
  32. generate_ad_excel_report,
  33. generate_business_stats_report_by_dealer,
  34. export_consume_order_excel_from_db,
  35. export_API_order_excel_from_db,
  36. export_on_points_order_excel_from_db,
  37. export_send_coins_to_card_order_excel_from_db,
  38. export_charge_order_excel_from_db,
  39. export_group_stat_excel_from_db,
  40. export_vcard_info_excel_from_db,
  41. export_group_user_account_excel_form_db,
  42. generate_simCharge_excel_report,
  43. generate_dealerWithDraw_excel_report,
  44. generate_biz_stats_for_manager,
  45. import_simcard_excel_to_db,
  46. export_simcard_excel_from_db,
  47. export_device_excel_from_db,
  48. manager_export_charge_order_excel_from_db,
  49. manager_export_consume_order_excel_from_db,
  50. manager_export_dealer_info_excel_from_db,
  51. export_aggregate_dealer_income,
  52. query_merchant_status,
  53. device_offline_notify,
  54. batch_set_device_params,
  55. set_device_params,
  56. batch_set_server_settings,
  57. set_server_settings,
  58. send_to_xf_all_dev_info,
  59. send_to_xf_falut,
  60. send_to_xf_fault_handle,
  61. notify_insurance_order_subscribe,
  62. notify_insurance_order_cancel,
  63. push_shanghai_platform_heatbeat,
  64. export_modify_customer_balance_record_excel_from_db,
  65. pull_consume_order_timeout
  66. )
  67. logger = logging.getLogger(__name__)
  68. if TYPE_CHECKING:
  69. from taskmanager import TraceTask
  70. _task_registry = {k: v for k, v in globals().items() if isinstance(v, PromiseProxy)} # type: dict
  71. def task_caller(func_name, delay=None, expires=None, offline_task_id=None, **kwargs):
  72. """
  73. 任务中继器,为了不影响主线任务运作
  74. 注意做了保护, 如果rabbitmq锁死, 会暂时停止
  75. 所以只能提交可以允许忽略的任务
  76. :param func_name:
  77. :param delay:
  78. :param expires:
  79. :param offline_task_id:
  80. :param kwargs:
  81. :return:
  82. """
  83. if settings.NO_CELERY_TASK:
  84. logger.info('[task_caller] NO_CELERY_TASK')
  85. return None
  86. result = None
  87. if settings.DEBUG_CELERY_TASK_ROUTINE:
  88. try:
  89. f = _task_registry.get(func_name) # type: TraceTask
  90. logger.info('[task_caller] function(name=%s)(kwargs=%s)' % (f.__name__, json.dumps(kwargs)))
  91. async_operation(f, **kwargs)
  92. except Exception as e:
  93. logger.exception('[task_caller] error=%s' % (e,))
  94. else:
  95. from apps.web.core.models import SystemSettings, OfflineTask
  96. if SystemSettings.get_system_setting("DISABLE_CELERY", False):
  97. logger.info('[task_caller] function(name={}) result=no publish.'.format(func_name))
  98. if offline_task_id:
  99. offline_task = OfflineTask.objects(id=offline_task_id).first() # type: Optional[OfflineTask]
  100. if not offline_task:
  101. logger.error('no such task id({})'.format(offline_task_id))
  102. else:
  103. updated = offline_task.update(celery_task_id=offline_task_id,
  104. status='FAILURE',
  105. finishedTime=datetime.datetime.now())
  106. if not updated:
  107. logger.error(u'update offline task(id=%s) failed.' % (offline_task_id,))
  108. else:
  109. logger.debug(u'update offline task(id=%s) success.' % (offline_task_id,))
  110. else:
  111. start = time.time()
  112. try:
  113. f = _task_registry.get(func_name) # type: TraceTask
  114. logger.info('[task_caller] function(name=%s)(kwargs=%s)' % (f.__name__, json.dumps(kwargs)))
  115. now_time = datetime.datetime.utcnow()
  116. if delay:
  117. eta = now_time + datetime.timedelta(seconds=delay)
  118. else:
  119. eta = None
  120. if expires:
  121. expires = now_time + datetime.timedelta(seconds=expires)
  122. else:
  123. expires = None
  124. if offline_task_id:
  125. headers = {'offline_task_id': offline_task_id}
  126. result = f.s(**kwargs).apply_async(eta=eta, expires=expires, ignore_result=True,
  127. headers=headers)
  128. else:
  129. result = f.s(**kwargs).apply_async(eta=eta, expires=expires, ignore_result=True)
  130. logger.info('[task_caller] function(name=%s) result=%s' % (f.__name__, result))
  131. except Exception as e:
  132. logger.exception('[task_caller] error=%s' % (e,))
  133. finally:
  134. end = time.time()
  135. over_time = (end - start)
  136. if over_time > 5:
  137. logger.warning('[task_caller] function(name=%s) result=exceed %s seconds.' % (func_name, over_time))
  138. if over_time > 30:
  139. logger.warning('[task_caller] function(name=%s) result=exceed 30 seconds to disable.' % func_name)
  140. SystemSettings.set_mem_setting("DISABLE_CELERY", True)
  141. DingDingRobot().send_msg(
  142. msg=u'[CELERY告警({})] 发布任务时间超过30秒,暂时屏蔽所有发布。请检查RABBITMQ是否正常。'.format(os.environ.get('MY_ENV')))
  143. return result
  144. def task_caller_once(func_name, routing_key, queue, **kwargs):
  145. """
  146. 任务中继器,为了不影响主线任务运作
  147. :param queue:
  148. :param routing_key:
  149. :param func_name:
  150. :param kwargs:
  151. :return:
  152. """
  153. try:
  154. f = _task_registry.get(func_name) # type: TraceTask
  155. logger.info('[task_caller_once] function(name=%s)(kwargs=%s)' % (f.__name__, json.dumps(kwargs)))
  156. result = f.s(**kwargs).apply_async(routing_key=routing_key, queue=queue)
  157. logger.info('[task_caller_once] function(name=%s) result=%s' % (f.__name__, result))
  158. except Exception as e:
  159. logger.error('[task_caller_once] error=%s' % (e,))