mediator.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. """
  4. 为了让任务模块不影响基本业务,需要做统一的处理
  5. """
  6. import logging
  7. import datetime
  8. import os
  9. import time
  10. from django.conf import settings
  11. from typing import TYPE_CHECKING, Optional
  12. import simplejson as json
  13. from celery.local import PromiseProxy
  14. from apps.thirdparties.dingding import DingDingRobot
  15. from apps.web.core.utils import async_operation
  16. # noinspection PyUnresolvedReferences
  17. from taskmanager.tasks import (
  18. report_feedback_to_dealer_via_wechat,
  19. report_new_payment_to_dealer_via_wechat,
  20. report_to_user_via_wechat,
  21. send_msg_to_user_via_wechat,
  22. report_to_dealer_via_wechat,
  23. whale_withdraw_order_alert,
  24. withdraw_error_alert,
  25. send_topic_command,
  26. poll_dealer_recharge_record,
  27. poll_user_recharge_record,
  28. report_to_user_low_power,
  29. turn_on_power_huan_dian_gui,
  30. test_sync,
  31. check_withdraw_via_bank,
  32. generate_ad_excel_report,
  33. generate_business_stats_report_by_dealer,
  34. export_consume_order_excel_from_db,
  35. export_API_order_excel_from_db,
  36. export_on_points_order_excel_from_db,
  37. export_send_coins_to_card_order_excel_from_db,
  38. export_charge_order_excel_from_db,
  39. export_group_stat_excel_from_db,
  40. export_vcard_info_excel_from_db,
  41. export_group_user_account_excel_form_db,
  42. generate_simCharge_excel_report,
  43. generate_dealerWithDraw_excel_report,
  44. generate_biz_stats_for_manager,
  45. import_simcard_excel_to_db,
  46. export_simcard_excel_from_db,
  47. export_device_excel_from_db,
  48. manager_export_charge_order_excel_from_db,
  49. manager_export_consume_order_excel_from_db,
  50. manager_export_dealer_info_excel_from_db,
  51. export_aggregate_dealer_income,
  52. device_offline_notify,
  53. batch_set_device_params,
  54. set_device_params,
  55. batch_set_server_settings,
  56. set_server_settings,
  57. send_to_xf_all_dev_info,
  58. send_to_xf_falut,
  59. send_to_xf_fault_handle,
  60. notify_insurance_order_subscribe,
  61. notify_insurance_order_cancel,
  62. push_shanghai_platform_heatbeat,
  63. export_modify_customer_balance_record_excel_from_db,
  64. )
# Module-level logger shared by every relay function in this file.
logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    # Needed only by the "# type:" comments in this module; typing.TYPE_CHECKING
    # is False at runtime, so this import is skipped when the module executes.
    from taskmanager import TraceTask

# Name -> task lookup table used by the relay functions to resolve `func_name`.
# It is built by scanning this module's globals for celery PromiseProxy objects,
# so it is a snapshot of the names bound *above* this line -- presumably the
# tasks imported from taskmanager.tasks, which is why those imports look unused
# to static analysis. A task imported below this line would not be registered.
_task_registry = {k: v for k, v in globals().items() if isinstance(v, PromiseProxy)}  # type: dict
  69. def task_caller(func_name, delay = None, expires = None, offline_task_id = None, **kwargs):
  70. """
  71. 任务中继器,为了不影响主线任务运作
  72. 注意做了保护, 如果rabbitmq锁死, 会暂时停止
  73. 所以只能提交可以允许忽略的任务
  74. :param func_name:
  75. :param delay:
  76. :param expires:
  77. :param offline_task_id:
  78. :param kwargs:
  79. :return:
  80. """
  81. if settings.NO_CELERY_TASK:
  82. logger.info('[task_caller] NO_CELERY_TASK')
  83. return
  84. if settings.DEBUG_CELERY_TASK_ROUTINE:
  85. try:
  86. f = _task_registry.get(func_name) # type: TraceTask
  87. logger.info('[task_caller] function(name=%s)(kwargs=%s)' % (f.__name__, json.dumps(kwargs)))
  88. async_operation(f, **kwargs)
  89. except Exception as e:
  90. logger.exception('[task_caller] error=%s' % (e,))
  91. else:
  92. from apps.web.core.models import SystemSettings, OfflineTask
  93. if SystemSettings.get_system_setting("DISABLE_CELERY", False):
  94. logger.info('[task_caller] function(name={}) result=no publish.'.format(func_name))
  95. if offline_task_id:
  96. offline_task = OfflineTask.objects(id = offline_task_id).first() # type: Optional[OfflineTask]
  97. if not offline_task:
  98. logger.error('no such task id({})'.format(offline_task_id))
  99. else:
  100. updated = offline_task.update(celery_task_id = offline_task_id,
  101. status = 'FAILURE',
  102. finishedTime = datetime.datetime.now())
  103. if not updated:
  104. logger.error(u'update offline task(id=%s) failed.' % (offline_task_id,))
  105. else:
  106. logger.debug(u'update offline task(id=%s) success.' % (offline_task_id,))
  107. else:
  108. if settings.CELERY_PUBLISH_METHOD == 'gevent':
  109. try:
  110. import gevent
  111. gevent.spawn(celery_task_impl, func_name, delay, expires, offline_task_id, **kwargs)
  112. except Exception as e:
  113. logger.exception(e)
  114. elif settings.CELERY_PUBLISH_METHOD == 'thread':
  115. async_operation(celery_task_impl, func_name, delay, expires, offline_task_id, **kwargs)
  116. else:
  117. celery_task_impl(func_name, delay, expires, offline_task_id, **kwargs)
  118. def celery_task_impl(func_name, delay = None, expires = None, offline_task_id = None, **kwargs):
  119. start = time.time()
  120. try:
  121. f = _task_registry.get(func_name) # type: TraceTask
  122. logger.info('[task_caller] function(name=%s)(kwargs=%s)' % (f.__name__, json.dumps(kwargs)))
  123. now_time = datetime.datetime.utcnow()
  124. if delay:
  125. eta = now_time + datetime.timedelta(seconds = delay)
  126. else:
  127. eta = None
  128. if expires:
  129. expires = now_time + datetime.timedelta(seconds = expires)
  130. else:
  131. expires = None
  132. if offline_task_id:
  133. headers = {'offline_task_id': offline_task_id}
  134. result = f.s(**kwargs).apply_async(eta = eta, expires = expires, ignore_result = True,
  135. headers = headers)
  136. # from apps.web.core.models import SystemSettings, OfflineTask
  137. # offline_task = OfflineTask.objects(id = offline_task_id).first() # type: Optional[OfflineTask]
  138. # offline_task.update(celery_task_id = result)
  139. else:
  140. result = f.s(**kwargs).apply_async(eta = eta, expires = expires, ignore_result = True)
  141. logger.info('[task_caller] function(name=%s) result=%s' % (f.__name__, result))
  142. except Exception as e:
  143. logger.exception('[task_caller] error=%s' % (e,))
  144. finally:
  145. end = time.time()
  146. over_time = (end - start)
  147. if over_time > 5:
  148. logger.warning('[task_caller] function(name=%s) result=exceed %s seconds.' % (func_name, over_time))
  149. if over_time > 30:
  150. logger.warning('[task_caller] function(name=%s) result=exceed 30 seconds to disable.' % func_name)
  151. from apps.web.core.models import SystemSettings
  152. SystemSettings.set_mem_setting("DISABLE_CELERY", True)
  153. DingDingRobot().send_msg(
  154. msg = u'[CELERY告警({})] 发布任务时间超过30秒,暂时屏蔽所有发布。请检查RABBITMQ是否正常。'.format(os.environ.get('MY_ENV')))
  155. def task_caller_once(func_name, routing_key, queue, **kwargs):
  156. """
  157. 任务中继器,为了不影响主线任务运作
  158. :param queue:
  159. :param routing_key:
  160. :param func_name:
  161. :param kwargs:
  162. :return:
  163. """
  164. try:
  165. f = _task_registry.get(func_name) # type: TraceTask
  166. logger.info('[task_caller_once] function(name=%s)(kwargs=%s)' % (f.__name__, json.dumps(kwargs)))
  167. result = f.s(**kwargs).apply_async(routing_key = routing_key, queue = queue)
  168. logger.info('[task_caller_once] function(name=%s) result=%s' % (f.__name__, result))
  169. except Exception as e:
  170. logger.error('[task_caller_once] error=%s' % (e,))