tasks.py 13 KB


  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import datetime
  4. import requests
  5. from celery.utils.log import get_task_logger
  6. from django.conf import settings
  7. from mongoengine import DoesNotExist
  8. from typing import Optional, TYPE_CHECKING
  9. from apps.web.agent.models import Agent
  10. from apps.web.common.transaction.pay import PayManager, RefundManager
  11. from apps.web.constant import Const
  12. from apps.web.core.bridge import WechatClientProxy
  13. from apps.web.core.helpers import ActionDeviceBuilder
  14. from apps.web.core.models import WechatUserManagerApp, WechatUserSubscribeManagerApp
  15. from apps.web.core.payment import PaymentGateway
  16. from apps.web.dealer.models import Dealer, VirtualCard
  17. from apps.web.device.models import Group, Device
  18. from apps.web.helpers import get_wechat_user_manager_mp_proxy, get_wechat_user_sub_manager_mp_proxy, \
  19. get_wechat_user_messager_app
  20. from apps.web.user.models import RechargeRecord, UserVirtualCard, RefundMoneyRecord
  21. from apps.web.promotion.models import InsuranceOrder, Insurance
  22. from apps.web.user.transaction import post_pay
  23. from apps.web.user.transaction_deprecated import refund_post_pay
  24. from apps.web.utils import concat_front_end_url
  25. if TYPE_CHECKING:
  26. from apps.web.common.transaction.pay import PayRecordPoller
  27. from apps.web.user.models import MyUser, ConsumeRecord
  28. from apps.web.core.payment import PaymentGatewayT
  29. logger = get_task_logger('user.tasks')
  30. def report_to_user_via_wechat(openId, dealerId, templateName, url = None, **kwargs):
  31. if not settings.WECHAT_PUSH_ENABLED:
  32. logger.info('WECHAT_PUSH_ENABLED is false, you are probably in a dev/testing env')
  33. else:
  34. dealer = Dealer.objects(id = str(dealerId)).first() # type: Optional[Dealer]
  35. if not dealer:
  36. return {'info': 'dealer does not exist, id=%s' % (dealerId,)}
  37. agent = Agent.objects(id = str(dealer.agentId)).first() # type: Optional[Agent]
  38. if not agent:
  39. return {'info': 'agent does not exist, id=%s' % (dealer.agentId,)}
  40. try:
  41. wechat_mp_proxy = get_wechat_user_sub_manager_mp_proxy(agent)
  42. wechat_mp_proxy.publish(openId = openId, templateName = templateName, url = url, **kwargs)
  43. except Exception as e:
  44. logger.info('wechat_mp_proxy.publish is error=<{}>'.format(e))
  45. try:
  46. wechat_mp_proxy = get_wechat_user_manager_mp_proxy(agent)
  47. wechat_mp_proxy.notify(openId = openId, templateName = templateName, url = url, **kwargs)
  48. except Exception as e:
  49. logger.info('wechat_mp_proxy.notify is error=<{}>'.format(e))
  50. def send_msg_to_user_via_wechat(productId, openId, templateName, url = None, **kwargs):
  51. if not settings.WECHAT_PUSH_ENABLED:
  52. logger.info('WECHAT_PUSH_ENABLED is false, you are probably in a dev/testing env')
  53. else:
  54. try:
  55. product_agent = Agent.objects(id = productId).first()
  56. if not product_agent:
  57. logger.warning('user<id={}> has no product agent id.')
  58. return
  59. app = get_wechat_user_messager_app(product_agent)
  60. if templateName not in app.templateIdMap:
  61. logger.warning('is not support {}'.format(templateName))
  62. return
  63. template = app.templateIdMap[templateName]
  64. payload = kwargs.get(app.__class__.__name__, None)
  65. if not payload:
  66. logger.warning('not support subscribe message.')
  67. return
  68. if isinstance(app, WechatUserManagerApp):
  69. WechatClientProxy(app).notify_msg(openId, template, url, **payload)
  70. elif isinstance(app, WechatUserSubscribeManagerApp):
  71. WechatClientProxy(app).publish_msg(openId, template, url, **payload)
  72. else:
  73. logger.warning('no support app type.')
  74. except Exception as e:
  75. logger.exception(e)
  76. def poll_user_recharge_record(pay_app_type, record_id, interval, total_count):
  77. # type: (str, str, int, int)->None
  78. poller_cls = PayManager().get_poller(pay_app_type = pay_app_type)
  79. poller = poller_cls(
  80. record_id = record_id,
  81. interval = interval,
  82. total_count = total_count,
  83. record_cls = RechargeRecord,
  84. post_pay = post_pay) # type: PayRecordPoller
  85. poller.start()
  86. def test_sync():
  87. headers = {'Content-Type': 'application/json'}
  88. with requests.sessions.Session() as session:
  89. res = session.request(
  90. url = 'http://develop.5tao5ai.com/user/test',
  91. method = 'get',
  92. headers = headers,
  93. timeout = 15)
  94. return 'ok'
  95. def report_to_user_low_power(devNo, line, power, managerialOpenId, dealerId):
  96. """
  97. 低电量用户告警
  98. :param devNo:
  99. :param line:
  100. :param power:
  101. :param managerialOpenId:
  102. :param dealerId:
  103. :return:
  104. """
  105. dev = Device.get_dev(devNo)
  106. box = ActionDeviceBuilder.create_action_device(dev)
  107. result = box.get_port_info(line)
  108. group = Group.get_group(dev.get("groupId"))
  109. if result.get("power", 999) < int(power):
  110. report_to_user_via_wechat(
  111. openId = managerialOpenId or "",
  112. dealerId = dealerId,
  113. templateName = "device_fault",
  114. device = u"{}-{}".format(dev.logicalCode, line),
  115. title = u"充电设备低功率告警",
  116. level = u"一般",
  117. address = u"{}".format(group.get("groupName")),
  118. fault = u"您使用 <{}> 设备 <{}> 号充电端口进行充电,当前充电功率 <{}> 过低,可能会影响正常充电。".format(dev.logicalCode, line, power),
  119. notifyTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  120. )
  121. def notify_virtual_card_expired():
  122. """
  123. 虚拟卡卷到期提醒
  124. 提醒方式是 过期前三天 过期后三天 每天18:00提醒一次
  125. 霍珀的 虚拟卡 名称叫 电子月票 使用特性 is_huopo 来标识
  126. :return:
  127. """
  128. agentIdList = list()
  129. huoPoAgentIdList = list()
  130. # 找到需要通知的经销商
  131. agents = Agent.objects.only("id", "features")
  132. for agent in agents:
  133. if agent.features and "notify_virtual_card_expired" in agent.features:
  134. if "is_huopo" not in agent.features:
  135. agentIdList.append(str(agent.id))
  136. else:
  137. huoPoAgentIdList.append(str(agent.id))
  138. dealerIdList = [str(dealer.id) for dealer in Dealer.objects.filter(agentId__in = agentIdList).only("id", "agentId")]
  139. huoPoDealerIdList = [str(dealer.id) for dealer in
  140. Dealer.objects.filter(agentId__in = huoPoAgentIdList).only("id", "agentId")]
  141. # 先找母卡 母卡判断母卡是否停售或者删除
  142. cardTypes = VirtualCard.objects.filter(
  143. ownerId__in = dealerIdList + huoPoDealerIdList,
  144. status = 1
  145. )
  146. vCards = list()
  147. nowTime = datetime.datetime.now()
  148. for cardType in cardTypes:
  149. leftTime = nowTime - datetime.timedelta(days = min(cardType.needRenewMax, Const.VCARD_NEED_NOTIFY_TIME))
  150. rightTime = nowTime + datetime.timedelta(days = min(cardType.needRenewMin, Const.VCARD_NEED_NOTIFY_TIME))
  151. vCards.extend(
  152. UserVirtualCard.objects.filter(
  153. cardTypeId = str(cardType.id),
  154. expiredTime__gt = leftTime,
  155. expiredTime__lt = rightTime
  156. )
  157. )
  158. vCardInfoList = []
  159. for vCard in vCards:
  160. if vCard.dealerId in dealerIdList:
  161. cardName = u"优惠卡卷"
  162. else:
  163. cardName = u"电子月票"
  164. # 即将过期的
  165. if vCard.expiredTime > nowTime:
  166. title = u"您的{}<{}>即将过期,为避免影响您的正常使用,请及时续费充值".format(cardName, vCard.cardName)
  167. # 已经过期的
  168. else:
  169. title = u"您的{}<{}>已经过期,如需继续使用,请及时续费充值".format(cardName, vCard.cardName)
  170. expiredTime = vCard.expiredTime.strftime("%Y-%m-%d %H:%M:%S")
  171. vCardInfo = {
  172. "openId" : vCard.managerialOpenId,
  173. "dealerId" : vCard.dealerId,
  174. "cardNo" : vCard.cardNo,
  175. "cardName":cardName,
  176. "title" : title,
  177. "expiredTime" : expiredTime
  178. }
  179. vCardInfoList.append(vCardInfo)
  180. for vCardInfo in vCardInfoList:
  181. report_to_user_via_wechat(
  182. openId=vCardInfo.get("openId"),
  183. dealerId=vCardInfo.get("dealerId"),
  184. templateName="service_expired",
  185. title=vCardInfo.get("title"),
  186. cardNo=vCardInfo.get("cardNo"),
  187. expiredTime=vCardInfo.get("expiredTime"),
  188. remark=u"您可以打开公众号的个人中心,选择{},然后选择您的卡票进行续费".format(vCardInfo.get("cardName"))
  189. )
  190. def notify_insurance_order_subscribe(orderId): # type:(InsuranceOrder) -> None
  191. """
  192. 通知用户投保成功的通知
  193. {{first.DATA}}
  194. 保单名称:{{keyword1.DATA}}
  195. 保单种类:{{keyword2.DATA}}
  196. 保单单号:{{keyword3.DATA}}
  197. 支付保费:{{keyword4.DATA}}
  198. 保单生效时间:{{keyword5.DATA}}
  199. {{remark.DATA}}
  200. :param orderId: 用户保险单的 ID
  201. :return:
  202. """
  203. try:
  204. order = InsuranceOrder.objects.get(id = orderId)
  205. except DoesNotExist:
  206. logger.error("can not find insurance order <id={}> to report user!".format(orderId))
  207. return
  208. # 防止任务重入以及任务过于延时 做拦截
  209. if not order.effective:
  210. logger.warning(
  211. "insurance order <id={}> is not effective <effective={}>, need not to report user!".format(orderId,
  212. order.effective))
  213. return
  214. if order.endTime < datetime.datetime.now():
  215. logger.warning(
  216. "insurance order <id={}> is expired <endTime={}>, need not to report user!".format(orderId, order.endTime))
  217. return
  218. insurance = order.insurance # type: Insurance
  219. user = order.user # type: MyUser
  220. title = u"已加入保险保障,本服务由【中国平安】承保"
  221. url = concat_front_end_url(uri = '/user/index.html#/insurance/compensate')
  222. report_to_user_via_wechat(
  223. openId = user.managerialOpenId,
  224. dealerId = order.ownerId,
  225. templateName = "insurance_sub",
  226. title = title,
  227. name = insurance.name,
  228. category = insurance.categoryName,
  229. orderNo = order.orderNo,
  230. money = order.money,
  231. validTime = u"{}至{}".format(
  232. order.startTime.strftime("%Y-%m-%d %H:%M:%S"),
  233. order.endTime.strftime("%Y-%m-%d %H:%M:%S")
  234. ),
  235. remark = u"点击查看理赔条款>>>",
  236. url = url
  237. )
  238. def notify_insurance_order_cancel(orderId):
  239. """
  240. 通知用户保单取消的通知
  241. {{first.DATA}}
  242. 保单号:{{keyword1.DATA}}
  243. 产品名称:{{keyword2.DATA}}
  244. 被保险人:{{keyword3.DATA}}
  245. 保障期间:{{keyword4.DATA}}
  246. 退款金额:{{keyword5.DATA}}
  247. {{remark.DATA}}
  248. :param orderId: 用户保险单的ID
  249. :return:
  250. """
  251. try:
  252. order = InsuranceOrder.objects.get(id = orderId)
  253. except DoesNotExist:
  254. logger.error("can not find insurance order <id={}> to report user!".format(orderId))
  255. return
  256. # 防止被错误调用 判断一次状态
  257. if order.effective:
  258. logger.warning(
  259. "insurance order <id={}> is effective <effective={}>, need not to report user cancel!".format(orderId,
  260. order.effective))
  261. return
  262. insurance = order.insurance # type: Insurance
  263. user = order.user # type: MyUser
  264. title = u"退保成功通知"
  265. report_to_user_via_wechat(
  266. openId = user.managerialOpenId,
  267. dealerId = order.ownerId,
  268. templateName = "insurance_cancel",
  269. title = title,
  270. orderNo = order.orderNo,
  271. name = insurance.name,
  272. user = u"微信昵称-{}".format(user.nickname),
  273. cancelTime = u"于 {} 退保".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
  274. money = order.money,
  275. remark = u"感谢您的使用"
  276. )
  277. def pull_refund_order():
  278. """
  279. 拉取 状态不定的 退款订单
  280. """
  281. refundOrders = RefundMoneyRecord.objects.filter(
  282. status__in = [RefundMoneyRecord.Status.PROCESSING],
  283. datetimeAdded__gt = datetime.datetime.now() - datetime.timedelta(days = 2)
  284. )
  285. for refundOrder in refundOrders:
  286. rechargeOrder = RechargeRecord.objects.get(id = refundOrder.rechargeObjId) # type: RechargeRecord
  287. payOrder = rechargeOrder.payOrder
  288. payGateway = PaymentGateway.clone_from_order(payOrder) # type: PaymentGatewayT
  289. try:
  290. puller = RefundManager().get_poller(payGateway.pay_app_type)
  291. puller(refundOrder).pull(payGateway, payOrder, refund_post_pay)
  292. except Exception as e:
  293. logger.exception(e)
  294. def pull_consume_order_timeout(orderNo):
  295. """
  296. 拉取消费订单是否会被超时
  297. """
  298. order = get_consume_order(orderNo) # type: ConsumeRecord
  299. if order.status == order.Status.WAIT_CONF:
  300. order.to_failure(desc=u"订单支付超时")