kehang.py 19 KB


  1. # coding=utf-8
  2. import datetime
  3. import logging
  4. from arrow import Arrow
  5. from django.conf import settings
  6. from typing import TYPE_CHECKING, Iterable
  7. from apilib.monetary import VirtualCoin, RMB
  8. from apps.web.common.models import TempValues
  9. from apps.web.constant import START_DEVICE_STATUS, DEALER_CONSUMPTION_AGG_KIND, Const
  10. from apps.web.core.device_define.kehang import BillingType, REASON_MAP, CARD_STATUS, CARD_OPT
  11. from apps.web.device.models import Device, Group
  12. from apps.web.eventer import EventBuilder
  13. from apps.web.eventer.base import ComNetPayAckEvent, IdStartAckEvent, AckEventProcessorIntf, WorkEvent
  14. from apps.web.user.models import Card, ServiceProgress, CardRechargeOrder, CardRechargeRecord
  15. from apps.web.user.utils import clear_frozen_user_balance
  16. from apps.web.utils import set_start_key_status
  17. if TYPE_CHECKING:
  18. from apps.web.user.models import ConsumeRecord
  19. logger = logging.getLogger(__name__)
  20. class builder(EventBuilder):
  21. def __getEvent__(self, device_event):
  22. if 'order_id' in device_event:
  23. if device_event["order_type"] == "com_start":
  24. return MyComNetPayAckEvent(self.deviceAdapter, device_event)
  25. elif device_event["order_type"] == "id_start":
  26. return IDPayAckEvent(self.deviceAdapter, device_event)
  27. event_data = self.deviceAdapter.analyze_event_data(device_event['data'])
  28. if event_data is None or 'cmdCode' not in event_data:
  29. return None
  30. if event_data["cmdCode"] in ["22", "11", "12", "16"]:
  31. return KeHangWorkEvent(self.deviceAdapter, event_data)
  32. class KeHangWorkEvent(WorkEvent):
  33. def _do_query_card(self):
  34. logger.info("device <{}> do query id card balance, event = {}".format(self.device.devNo, self.event_data))
  35. cardNo = self.event_data["cardNo"]
  36. card = self.update_card_dealer_and_type(cardNo) # type: Card
  37. # 非法卡 (经销商绑卡会有 openId = any......)
  38. if not card or not card.openId:
  39. logger.info("device <{}> receive query card, error card = <{}>".format(self.device.devNo, cardNo))
  40. return self.deviceAdapter._response_card(int(cardNo), CARD_STATUS.ILLEGAL, float(VirtualCoin(0)), openId="")
  41. # 冻结卡
  42. if card.frozen:
  43. logger.info("device <{}> receive query card, frozen card = <{}>".format(self.device.devNo, cardNo))
  44. return self.deviceAdapter._response_card(int(cardNo), CARD_STATUS.FROZEN, float(VirtualCoin(card.balance)) * 10, openId=str(card.id))
  45. # 正常的卡余额回复 不需要管余额是否足够 交由模块判断
  46. logger.info("device <{}> receive query card success, card = <{}>, balance = <{}>".format(self.device.devNo, cardNo, card.balance))
  47. return self.deviceAdapter._response_card(int(cardNo), CARD_STATUS.SUCCESS, float(VirtualCoin(card.balance)) * 10, openId=str(card.id))
  48. def _do_ic_card(self):
  49. logger.info("device <{}> do card start event, event = {}".format(self.device.devNo, self.event_data))
  50. cardNo = self.event_data["cardNo"]
  51. card = self.update_card_dealer_and_type(cardNo, "IC")
  52. # 卡不存在的地方 直接退出
  53. if not card:
  54. return
  55. # 卡存在的情况下 分为退费 和 扣费两种
  56. if self.event_data["opt"] == CARD_OPT.DEDUCT:
  57. orderNo, cardOrderNo = self.record_consume_for_card(card, RMB(self.event_data["cost"]), desc=u"离线卡启动")
  58. consumeDict = {
  59. "orderNo": "orderNo",
  60. "cardOrderNo": cardOrderNo
  61. }
  62. ServiceProgress.register_card_service(
  63. self.device,
  64. int(self.event_data["port"]),
  65. card,
  66. consumeDict
  67. )
  68. Device.update_dev_control_cache(self.device.devNo, {str(self.event_data["port"]): {"status": Const.DEV_WORK_STATUS_WORKING}})
  69. else:
  70. self.record_refund_money_for_card(RMB(self.event_data["cost"]), str(card.id))
  71. Device.clear_port_control_cache(self.device.devNo, str(self.event_data["port"]))
  72. def _do_ic_card_sync_balance(self):
  73. logger.info("device <{}> do ic card sync balance, event = {}".format(self.device.devNo, self.event_data))
  74. cardNo = self.event_data["cardNo"]
  75. cardType = self.event_data["type"]
  76. card = self.update_card_dealer_and_type(cardNo, "IC")
  77. if not card:
  78. logger.info("device <{}> do ic card <{}> sync balance, card not find".format(self.device.devNo, cardNo))
  79. return
  80. money, coins, orderNos, cardOrderNos = CardRechargeOrder.get_to_do_list(str(card.id))
  81. if not orderNos:
  82. logger.info("device <{}> do ic card <{}> sync balance, charge orders not find".format(self.device.devNo, cardNo))
  83. return
  84. # 订单号保存到缓存里
  85. TempValues.set('%s-%s' % (self.device.devNo, cardNo), value=cardOrderNos)
  86. asyncMoney = (RMB(self.event_data["balance"]) + RMB(coins)) * 10
  87. try:
  88. CardRechargeOrder.update_card_order_has_finished(str(card.id))
  89. except Exception as e:
  90. logger.exception('%s' % e)
  91. else:
  92. self.deviceAdapter._response_sync_card_balance(cardNo, asyncMoney)
  93. def _do_ic_card_sync_response(self):
  94. logger.info("device <{}> do ic card sync balance response, event = {}".format(self.device.devNo, self.event_data))
  95. cardNo = self.event_data["cardNo"]
  96. card = self.update_card_dealer_and_type(cardNo, "IC")
  97. balance = self.event_data["balance"]
  98. success = self.event_data["success"]
  99. if not success:
  100. logger.info("device <{}> do ic card sync balance response not success, event = {}".format(self.device.devNo, self.event_data))
  101. return
  102. cardOrderNos = TempValues.get('%s-%s' % (self.device.devNo, cardNo))
  103. cardOrders = CardRechargeOrder.objects.filter(orderNo__in=cardOrderNos)
  104. for _order in cardOrders: # type: CardRechargeOrder
  105. _order.update_after_recharge_ic_card(
  106. device=self.device,
  107. sendMoney=RMB(_order.coins),
  108. preBalance=card.balance
  109. )
  110. preBalance = card.balance
  111. card.balance = card.balance + _order.coins
  112. # 创建充值记录
  113. CardRechargeRecord.add_record(
  114. card=card,
  115. group=Group.get_group(_order.groupId),
  116. order=_order,
  117. device=self.device
  118. )
  119. self.update_card_balance(card, RMB(balance))
  120. TempValues.remove('%s-%s' % (self.device.devNo, cardNo))
  121. def do(self, **args):
  122. if self.event_data["cmdCode"] == "22":
  123. return self._do_query_card()
  124. if self.event_data["cmdCode"] == "11":
  125. return self._do_ic_card()
  126. if self.event_data["cmdCode"] == "12":
  127. return self._do_ic_card_sync_balance()
  128. if self.event_data["cmdCode"] == "16":
  129. return self._do_ic_card_sync_response()
  130. class MyComNetPayAckEvent(ComNetPayAckEvent):
  131. def do_running_order(self, order, result): # type: (ConsumeRecord, dict) -> None
  132. """
  133. 处理运行订单
  134. :param order: 用户服务器订单
  135. :param result: device_event 设备侧订单
  136. :return:
  137. """
  138. # 订单消息已经被回复过
  139. if order.status in ["running", "finished"]:
  140. logger.debug('order<{}> no need to deal. this has done.'.format(repr(order)))
  141. return
  142. # 启动设备的时候 设备实际启动成功 但是订单串口超时 不知道订单的明确状态 后面启动时间又重新上报
  143. if order.status == "unknown":
  144. errorDesc = u"设备信号恢复,订单正常运行"
  145. logger.info("order <{}> timeout to running")
  146. # 正常运行的订单
  147. else:
  148. errorDesc = u""
  149. if 'master' in result:
  150. order.association = {
  151. 'master': result['master']
  152. }
  153. order.servicedInfo.update({'masterOrderNo': result['master']})
  154. order.errorDesc = errorDesc
  155. order.isNormal = True
  156. order.status = 'running'
  157. order.startTime = datetime.datetime.fromtimestamp(result['sts'])
  158. order.save()
  159. set_start_key_status(start_key=order.startKey, state=START_DEVICE_STATUS.FINISHED, order_id=str(order.id))
  160. def do_finished_order(self, order, result): # type: (ConsumeRecord, dict) -> None
  161. """
  162. 处理结束运行订单
  163. :param order: 用户服务器订单
  164. :param result: device_event 设备侧订单
  165. :return:
  166. """
  167. portCache = Device.get_port_control_cache(self.device.devNo, str(order.used_port))
  168. self.event_data["portCache"] = portCache
  169. # 子单归并主单
  170. if 'sub' in result:
  171. order.association = {
  172. 'sub': [item['order_id'] for item in self.event_data['sub']]
  173. }
  174. order.servicedInfo.update(
  175. {'subOrderNo': '{}'.format(', '.join([item['order_id'] for item in self.event_data['sub']]))})
  176. # 主单归并自身
  177. elif 'master' in result:
  178. order.association = {
  179. 'master': result['master']
  180. }
  181. order.servicedInfo.update({'masterOrderNo': result['master']})
  182. # 此时理论上服务器订单状态有三种可能(finished在上层已经被排除)
  183. # 正常的状态 相当于订单由运行状态 即将切换为finished状态
  184. if order.status == "running":
  185. order.isNormal = True
  186. order.status = "finished"
  187. order.errorDesc = u""
  188. order.finishedTime = datetime.datetime.fromtimestamp(result['fts'])
  189. # 非正常状态 相当于订单最开始串口超时 然后直接变为结束
  190. elif order.status == "unknown":
  191. order.isNormal = True
  192. order.status = "finished"
  193. order.errorDesc = u"设备信号恢复,订单正常结束(0001)"
  194. order.startTime = datetime.datetime.fromtimestamp(result['sts'])
  195. order.finishedTime = datetime.datetime.fromtimestamp(result['fts'])
  196. # 正常状态 相当于订单启动失败或者是中间running单没有上来
  197. elif order.status == "created":
  198. order.isNormal = True
  199. order.status = "finished"
  200. order.errorDesc = u"设备信号恢复,订单正常结束(0002)"
  201. order.startTime = datetime.datetime.fromtimestamp(result['sts'])
  202. order.finishedTime = datetime.datetime.fromtimestamp(result['fts'])
  203. else:
  204. logger.warning('order<{}> status = <{}> to finished. no deal with'.format(repr(order), order.status))
  205. order.save()
  206. set_start_key_status(start_key=order.startKey, state=START_DEVICE_STATUS.FINISHED, order_id=str(order.id))
  207. def do_finished_event(self, order, sub_orders, merge_order_info): # type:(ConsumeRecord, list, dict) -> None
  208. """
  209. 订单的状态已经完成 进一步事件 扣费等等
  210. :param order: 处理完毕的订单(主订单)
  211. :param sub_orders: 子订单
  212. :param merge_order_info: 合并单的信息
  213. :return:
  214. """
  215. order.reload()
  216. # 解析event的参数 这个left是转换后的时间
  217. """
  218. 比如1元240分钟,充电后变为120分钟,一分钟后119,然后,过一会进入下一档,80分钟,这时候如果停止,我会把80分钟转换为230多分钟给你
  219. """
  220. left = self.event_data["left"]
  221. # 获取端口的缓存
  222. portCache = self.event_data["portCache"]
  223. need = portCache["needValue"]
  224. coins = portCache["coins"]
  225. billingType = portCache["billingType"]
  226. # need 和 left 单位始终相同 所以不会造成退费金额出错
  227. backCoins = VirtualCoin(coins) * ((left * 1.0) / need)
  228. if not self.device.is_auto_refund:
  229. backCoins = VirtualCoin(0)
  230. else:
  231. backCoins = min(VirtualCoin(coins), backCoins)
  232. # 算电量 算时间
  233. if billingType == BillingType.TIME:
  234. duration = need - left
  235. spendElec = 0
  236. extra = [
  237. {u"本次订购时长": "{}分钟".format(need)},
  238. {u"本次实际使用时长": "{}分钟".format(need-left)},
  239. {u"消费金额": "{}(金币)".format((VirtualCoin(coins) - VirtualCoin(backCoins)).amount)}
  240. ]
  241. else:
  242. # 电量模式下 时间有可能不太准 考虑断电的情况
  243. duration = (self.event_data["fts"] - self.event_data["sts"]) / 60
  244. spendElec = (need - left) / 100.0
  245. extra = [
  246. {u"本次订购电量": "{} 度".format(need/100.0)},
  247. {u"消费金额": "{}(金币)".format((VirtualCoin(coins) - VirtualCoin(backCoins)).amount)}
  248. ]
  249. clear_frozen_user_balance(self.device, order, duration, spendElec=spendElec, backCoins=backCoins, user=order.user)
  250. # 组织消费信息
  251. consumeDict = {
  252. "reason": self._get_finish_reason(),
  253. "billingType": u"时间计费" if billingType == BillingType.TIME else u"电量计费",
  254. "leftTime": left,
  255. DEALER_CONSUMPTION_AGG_KIND.DURATION: duration,
  256. DEALER_CONSUMPTION_AGG_KIND.SPEND_MONEY: (VirtualCoin(coins) - VirtualCoin(backCoins)).mongo_amount,
  257. DEALER_CONSUMPTION_AGG_KIND.REFUNDED_COINS: str(backCoins)
  258. }
  259. spendElec and consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.ELEC: spendElec})
  260. spendElec and consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.ELECFEE: self.calc_elec_fee(spendElec)})
  261. order.update_service_info(consumeDict)
  262. self.notify_user_service_complete(
  263. service_name='充电',
  264. openid=order.user.managerialOpenId,
  265. port=str(order.used_port),
  266. address=order.address,
  267. reason=consumeDict["reason"],
  268. finished_time=order.finishedTime.strftime('%Y-%m-%d %H:%M:%S'),
  269. extra=extra
  270. )
  271. def merge_order(self, master_order, sub_orders): # type:(ConsumeRecord, list)->dict
  272. """
  273. 主板暂时不支持续充 如果续充的话 时间不准
  274. :param master_order:
  275. :param sub_orders:
  276. :return:
  277. """
  278. start_time = Arrow.fromdatetime(master_order.startTime, tzinfo=settings.TIME_ZONE)
  279. need = self.event_data["charge_param"]
  280. billingType = self.event_data.get("billing_type", BillingType.TIME)
  281. coins = master_order.package["coins"]
  282. price = master_order.package["price"]
  283. portCache = {
  284. "openId": master_order.openId,
  285. "consumeType": "mobile",
  286. "needKind": "needTime" if billingType == BillingType.TIME else "needElec",
  287. "estimatedTs": int(start_time.timestamp + 720 * 60 * 60),
  288. "needValue": need,
  289. "billingType": billingType
  290. }
  291. for _sub in sub_orders:
  292. coins += _sub.package["coins"]
  293. price += _sub.package["price"]
  294. portCache["coins"] = coins
  295. portCache["price"] = price
  296. if billingType == BillingType.TIME:
  297. master_order.update_service_info({'needTime': need})
  298. else:
  299. master_order.update_service_info({'needElec': need})
  300. return portCache
  301. def _get_finish_reason(self):
  302. if "reason" in self.event_data:
  303. return REASON_MAP.get(self.event_data["reason"], u"未知原因")
  304. return u"未知原因"
  305. class IDPayProcessor(AckEventProcessorIntf):
  306. def pre_processing(self, device, event_data):
  307. event_data["cardNo"] = str(event_data.pop("card", ""))
  308. event_data["fee"] = event_data["cost"] / 10.0
  309. if "sub" in event_data and isinstance(event_data["sub"], list):
  310. new_sub = list()
  311. for _item in event_data["sub"]:
  312. _item["cardNo"] = _item.pop("card")
  313. _item["fee"] = _item["cost"] / 10.0
  314. new_sub.append(_item)
  315. event_data["sub"] = new_sub
  316. return event_data
  317. class IDPayAckEvent(IdStartAckEvent):
  318. """
  319. 端口
  320. 查询卡
  321. """
  322. def __init__(self, smartBox, event_data, pre_processor=None):
  323. super(IDPayAckEvent, self).__init__(smartBox, event_data, IDPayProcessor())
  324. def _get_finish_reason(self):
  325. if "reason" in self.event_data:
  326. return REASON_MAP.get(self.event_data["reason"], u"未知原因")
  327. return u"未知原因"
  328. def post_after_start(self, order=None):
  329. pass
  330. def post_after_finish(self, order=None):
  331. pass
  332. def checkout_order(self, order):
  333. fee = VirtualCoin(order.coin)
  334. self.card.freeze_balance(transaction_id=str(order.id), fee=fee)
  335. def do_finished_event(self, order, merge_order_info): # type:(ConsumeRecord, dict)->None
  336. left = self.event_data["left"]
  337. billingType = self.event_data["billing_type"]
  338. if self.device.is_auto_refund:
  339. backCoins = min(VirtualCoin(self.event_data["refund"]), VirtualCoin(merge_order_info["coins"]))
  340. else:
  341. backCoins = VirtualCoin(0)
  342. consumeCoins = VirtualCoin(merge_order_info["coins"]) - backCoins
  343. self.card.clear_frozen_balance(str(order.id), backCoins)
  344. self.card.reload()
  345. extra = [
  346. {u"消费金额": "{}(金币)".format(consumeCoins.amount)},
  347. {u"卡内余额": "{} (金币)".format(self.card.balance.mongo_amount)}
  348. ]
  349. consumeDict = {
  350. 'reason': self._get_finish_reason(),
  351. 'chargeIndex': str(order.used_port),
  352. 'cardNo': self.event_data['cardNo'],
  353. "left": left,
  354. "billingType": billingType
  355. }
  356. if backCoins > VirtualCoin(0):
  357. self.record_refund_money_for_card(backCoins, str(self.card.id), orderNo=order.orderNo)
  358. consumeDict.update({
  359. DEALER_CONSUMPTION_AGG_KIND.REFUNDED_COINS: backCoins.mongo_amount
  360. })
  361. order.update_service_info(consumeDict)
  362. self.notify_user_service_complete(
  363. service_name='充电',
  364. openid=self.card.managerialOpenId,
  365. port=str(order.used_port),
  366. address=order.address,
  367. reason=self.event_data.get('reasonDesc'),
  368. finished_time=order.finishedTime.strftime('%Y-%m-%d %H:%M:%S'),
  369. extra=extra
  370. )
  371. def merge_order(self, master_order, sub_orders): # type:(ConsumeRecord, Iterable[ConsumeRecord])->dict
  372. """
  373. 刷卡的退费是由主板决定的 金币以及钱只是为了展示
  374. """
  375. start_time = Arrow.fromdatetime(master_order.startTime, tzinfo=settings.TIME_ZONE)
  376. billingType = self.event_data.get("billing_type", BillingType.TIME)
  377. coins = master_order.coin
  378. price = master_order.money
  379. portCache = {
  380. "openId": master_order.openId,
  381. "consumeType": "card",
  382. "estimatedTs": int(start_time.timestamp + 720 * 60 * 60),
  383. "billingType": billingType
  384. }
  385. for _sub in sub_orders:
  386. coins += _sub.coin
  387. price += _sub.money
  388. portCache["coins"] = str(coins)
  389. portCache["price"] = str(price)
  390. return portCache