caiyi.py 30 KB


  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. """
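Overall flow.

MyWorkEvent (balance query):
1. After a card swipe, a query event is received
--> 2. look the card up locally by card number (create it if missing)
--> 3. check whether the card still has unpaid orders
--> 4. settle (pay) those orders
--> 5. once the orders are settled, take the returned balance and send it back to the module.

MyIDStartAckEvent (charge on start):
1. A report event asking for a charge is received (the washer has already started)
--> 2. create the order (flagged as not normal by default)
--> 3. the washer reports its status and the order status is updated (on a successful
charge the order is marked normal and the payment info is added)
--> 4. acknowledge the report event.

Whitelist: serviceCache.set('yc-card-<card_no_hex>', 1). Whitelisted cards are mapped to the
fixed card 757996699; they skip the balance query and are never charged, but an order is
still generated!
Test card number: 757996699
Log prefix: caiyi_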
  14. """
  15. import datetime
  16. import json
  17. import logging
  18. import time
  19. from functools import wraps
  20. from arrow import Arrow
  21. from django.conf import settings
  22. from mongoengine import Q
  23. from typing import TYPE_CHECKING
  24. from apilib.monetary import VirtualCoin, RMB
  25. from apilib.utils_string import cn
  26. from apps import serviceCache
  27. from apps.web.common.models import TempValues
  28. from apps.web.constant import DeviceCmdCode, ErrorCode, DeviceErrorCodeDesc, START_DEVICE_STATUS, Const
  29. from apps.web.common.transaction import UserConsumeSubType
  30. from apps.web.core.adapter.base import reverse_hex
  31. from apps.web.core.exceptions import ServiceException
  32. from apps.web.core.networking import MessageSender
  33. from apps.web.dealer.models import Dealer
  34. from apps.web.device.models import Group, Device
  35. from apps.web.eventer.base import AckEvent, WorkEvent
  36. from apps.web.eventer import EventBuilder
  37. from apps.web.user.models import ConsumeRecord, MyUser, Card, ServiceProgress
  38. from apps.web.south_intf.yuchuanApi import YuChuanApi
  39. from apps.web.user.utils import freeze_user_balance, clear_frozen_user_balance
  40. from apps.web.utils import set_start_key_status
  41. if TYPE_CHECKING:
  42. pass
  43. logger = logging.getLogger(__name__)
  44. # 学校服务器地址
  45. # url = 'http://119.136.18.14:9002'
  46. # termId = '2'
  47. # 测试服务器地址
  48. # url = 'http://120.237.110.50:7622',
  49. # termId = '104',
  50. # 配置
  51. config_dic = {
  52. # 接口
  53. 'url': 'http://119.136.18.14:9002',
  54. 'systemName': 'schooladmin',
  55. 'termId': '101',
  56. 'bagCode': "1",
  57. 'roomNumb': '12345',
  58. # 中间库
  59. 'HOST': "119.136.18.14",
  60. 'PART': '1433',
  61. 'USER': 'yczn',
  62. 'PASSWORD': 'yczn123',
  63. 'DATABASE': 'q1_base',
  64. 'TABLE': 'USER_CARDINFO'
  65. }
  66. class Tools:
  67. @staticmethod
  68. def pass_same_event(exp = 30):
  69. def warpper(func):
  70. @wraps(func)
  71. def inner(self, *args, **kwargs):
  72. if serviceCache.get("yc_%s" % self.event_data['IMEI']):
  73. logger.info('\33[36m caiyi_相同查询事件上报 pass掉 \33[0m')
  74. return
  75. serviceCache.set("yc_%s" % self.event_data['IMEI'], 1, exp)
  76. try:
  77. func(self, *args, **kwargs)
  78. except Exception as e:
  79. pass
  80. serviceCache.delete("yc_%s" % self.event_data['IMEI'])
  81. return inner
  82. return warpper
  83. class builder(EventBuilder):
  84. def __getEvent__(self, device_event):
  85. if 'funCode' in device_event and device_event['funCode'] == '11':
  86. return MyWorkEvent(self.deviceAdapter, device_event)
  87. else:
  88. if 'order_id' in device_event:
  89. if device_event['order_type'] in ['com_start', 'pulse_start']:
  90. return MyNetPayAckEvent(self.deviceAdapter, device_event)
  91. if device_event['order_type'] == 'id_start':
  92. return MyIDStartAckEvent(self.deviceAdapter, device_event)
  93. class MyWorkEvent(WorkEvent):
  94. # 保存用户到本地,没有余额信息(用card_no占用openId字段为用户以后扫码用户绑定时候直接获取根据卡号获取卡对象)
  95. def save_user_info(self, user_info, devNo):
  96. """
  97. user_info = {"factoryFixId": 2869422464,
  98. "subsidyValue": 0,
  99. "bankCardNo": "",
  100. "creditValue": 11.27,
  101. "orderingValue": 0,
  102. "mainDeputyType": 1,
  103. "cardNo": 101,
  104. "userXm": "测试人员",
  105. "userNumb": "1001",
  106. "userId": "20050814503501000008",
  107. "consumeValue": 11.27,
  108. "cardStatusId": 1,
  109. "waterValue": 10.0,
  110. "cardId": "20050814505701000002",
  111. "repastValue": 0,
  112. "identityNumber": "",
  113. "xfPwd": "MTIzNDU2"}
  114. """
  115. cardNo = str(user_info.get('factoryFixId')) # 物理卡号
  116. cardName = str(user_info.get('userXm')) # 持卡人
  117. campus_user_num = str(user_info.get('userNumb'))
  118. # 存入本地服务器
  119. card = Card(cardNo = cardNo, cardName = cardName, remarks = '宇川一卡通', agentId = self.dealer.agentId,
  120. openId = cardNo, dealerId = str(self.dealer.id), cardType = 'YuChuanCard', isHaveBalance=False, devNo=devNo, nickName=campus_user_num)
  121. card.attachParas = {'campus_user_num': campus_user_num, 'card_dealer': "宇川智能平台一卡通", "unpay_order": []}
  122. card.save()
  123. def get_old_order_by_card(self, card):
  124. # type:# (object)->Card
  125. dealers = Dealer.objects.filter(agentId = self.dealer.agentId)
  126. dealers = [str(dealer.id) for dealer in dealers]
  127. orders = ConsumeRecord.objects.filter(Q(openId__in = [card.openId, card.cardNo]) &
  128. Q(ownerId__in = dealers),
  129. remarks = 'YuChuanCard',
  130. isNormal = False,
  131. status__ne = 'finished')
  132. return orders
  133. # @Tools.pass_same_event(5)
  134. def do(self, **args):
  135. yc = YuChuanApi(**config_dic)
  136. factoryFixId_hex = str(self.event_data['cardNo'])
  137. factoryFixId = str(int(reverse_hex(factoryFixId_hex), 16))
  138. devNo = str(self.event_data.get('IMEI'))
  139. logger.info(
  140. '\33[32m caiyi_received data:{},cardNo:{} \33[0m'.format(json.dumps(self.event_data, ), factoryFixId))
  141. # 测试白名单
  142. test_card = serviceCache.get('yc-card-{}'.format(factoryFixId_hex)) or TempValues.get('yc-card-{}'.format(factoryFixId_hex)) or TempValues.get('yc-all-free')
  143. if test_card or factoryFixId_hex == '667ACCE5':
  144. factoryFixId = '757996699'
  145. logger.debug('\33[32m caiyi_yc_test_card,get_balance \33[0m')
  146. price = self.device['otherConf'].get('cardPrice', 1)
  147. packages = self.device['washConfig']
  148. package = max(packages.values(), key=lambda x: x["coins"])
  149. max_coin = int(package.get("coins"))
  150. each_coin = int(self.device['otherConf'].get('eachCoin', 1))
  151. payload = {'balance': RMB.yuan_to_fen(9999), 'price': int(price * 100), 'funCode': '11',
  152. 'cardNo': factoryFixId_hex,
  153. 'max_coin': max_coin, "driver_type": "aux_uart", "time": 15,'each_coin':each_coin}
  154. MessageSender.send_no_wait(self.device, DeviceCmdCode.OPERATE_DEV_NO_RESPONSE, payload)
  155. logger.info("\33[32m caiyi_yc_test_card_payload:%s\33[0m" % json.dumps(payload, ensure_ascii = False,
  156. encoding = 'utf-8'))
  157. return
  158. balance = RMB(0)
  159. payload = serviceCache.get('yuchuanyikatong_%s_%s' % (self.device.logicalCode, factoryFixId))
  160. if payload:
  161. MessageSender.send_no_wait(self.device, DeviceCmdCode.OPERATE_DEV_NO_RESPONSE, payload)
  162. logger.info(
  163. '\33[32m caiyi_payload(cache):%s\33[0m' % json.dumps(payload, encoding = 'utf-8', ensure_ascii = False))
  164. return
  165. # TODO 清理订单,请求余额
  166. # 先查询卡,如果没有就创建卡信息
  167. card = Card.objects.filter(cardNo = factoryFixId, cardType = 'YuChuanCard').first()
  168. if not card:
  169. user_info = yc.get_user_info(factoryFixId)
  170. # 如果宇川服务器上找不到卡号,则卡不对,不返回数据
  171. if not user_info:
  172. logger.info(
  173. '\33[32m caiyi_The server did not find the user cardNo:{},cardNo_hex:{}\33[0m'.format(factoryFixId,
  174. factoryFixId_hex))
  175. return
  176. self.save_user_info(user_info,devNo)
  177. balance = RMB(user_info.get('consumeValue'))
  178. else: # 如果有卡,查看之前的订单是否都付款成功
  179. unpay_ordet_list = card.attachParas.get('unpay_order')
  180. if unpay_ordet_list:
  181. unpay_orders = ConsumeRecord.objects.filter(id__in = unpay_ordet_list)
  182. userNumb = card.attachParas.get('campus_user_num')
  183. for order in unpay_orders:
  184. data = {
  185. "orderSerial": str(order.orderNo),
  186. "userNumb": str(userNumb),
  187. "factoryFixId": str(factoryFixId),
  188. "consumeValue": str(order.coin),
  189. }
  190. # 需要测试确认扣费成功
  191. res = yc.pay_order(**data)
  192. if not res:
  193. logger.info(
  194. '\33[32m caiyi_Clear order is error,cardNo:{},orderNo:{},msg:{} \33[0m'.format(factoryFixId,
  195. order.orderNo,
  196. json.dumps(
  197. res,
  198. encoding = 'utf-8',
  199. ensure_ascii = False)))
  200. continue
  201. if res['success'] == False:
  202. logger.info(
  203. "\33[32m caiyi_Clear order failed,cardNo:{},orderNo:{},msg:{} \33[0m".format(factoryFixId,
  204. order.orderNo,
  205. json.dumps(res,
  206. encoding = 'utf-8',
  207. ensure_ascii = False)))
  208. continue
  209. if res['success'] == True:
  210. order.paymentInfo['coins'] = str(order.coin)
  211. order.attachParas['coins'] = str(order.coin)
  212. order.attachParas['isCleanOrder'] = True
  213. order.status = 'finished'
  214. order.isNormal = True
  215. order.save()
  216. unpay_ordet_list.remove(str(order.id))
  217. balance = res.get('data').get('consumeValue')
  218. logger.info(
  219. "\33[32m caiyi_Clear order success,coin:{},orderNo:{},msg:{}\33[0m".format(order.coin,
  220. order.orderNo,
  221. json.dumps(res,
  222. encoding = 'utf-8',
  223. ensure_ascii = False)))
  224. # 将清单后的ordet_set塞回去
  225. card.attachParas['unpay_order'] = unpay_ordet_list
  226. card.save()
  227. # 如果还有未付订单,直接返回,无法使用!!!
  228. if unpay_ordet_list:
  229. logger.info(
  230. '\33[32m caiyi_card has unpay orders!!!,cardNo:{},cardNo_hex:{} \33[0m'.format(factoryFixId,
  231. factoryFixId))
  232. return
  233. # 没有欠费订单
  234. else:
  235. user_info = yc.get_user_info(factoryFixId)
  236. if not user_info:
  237. logger.info(
  238. '\33[32m caiyi_The YC_server did not find the user cardNo:{},cardNo_hex:{}\33[0m'.format(
  239. factoryFixId,
  240. factoryFixId_hex))
  241. return
  242. # 更新一次用户学号信息,方便查询
  243. card.attachParas['campus_user_num'] = user_info.get('userNumb')
  244. card.save()
  245. balance = RMB(user_info.get('consumeValue'))
  246. # 添加自定义定价
  247. price = int(self.device['otherConf'].get('cardPrice', 1))
  248. # 判断balance 是否能完成本次支付
  249. if balance < RMB(price):
  250. logger.info(
  251. '\33[32m caiyi_The balance on the card is insufficient. balance:{} price:{} cardNo:{},cardNo_heX:{} \33[0m'.format(
  252. balance, price, factoryFixId, factoryFixId_hex))
  253. return
  254. packages = self.device['washConfig']
  255. package = max(packages.values(), key = lambda x: x["coins"])
  256. max_coin = int(package.get("coins"))
  257. time_value = int(package.get("time", 15))
  258. # 下多少个脉冲
  259. each_coin = int(self.device['otherConf'].get('eachCoin', 1))
  260. payload = {'balance': RMB.yuan_to_fen(balance), 'price': price * 100, 'funCode': '11',
  261. 'cardNo': factoryFixId_hex,
  262. 'max_coin': int(max_coin), "driver_type": "aux_uart", "time": time_value,'each_coin':each_coin}
  263. serviceCache.set('yuchuanyikatong_%s_%s' % (self.device.logicalCode, factoryFixId), payload, 10)
  264. MessageSender.send_no_wait(self.device, DeviceCmdCode.OPERATE_DEV_NO_RESPONSE, payload)
  265. logger.info("\33[32m caiyi_payload:%s\33[0m" % json.dumps(payload, ensure_ascii = False, encoding = 'utf-8'))
  266. class MyIDStartAckEvent(AckEvent):
  267. def get_or_create_order(self, order_id, card_no, rmb, campus_user_num):
  268. packages = self.device['washConfig']
  269. package = packages.get(str(int(rmb)), {})
  270. order = ConsumeRecord.objects(startKey = str(order_id)).first()
  271. if not order:
  272. attach_paras = {
  273. 'card_dealer': 'YuChuanCard',
  274. 'coins': str(VirtualCoin(0).mongo_amount),
  275. 'card_no': card_no,
  276. 'campus_user_num': campus_user_num,
  277. 'isCleanOrder': False}
  278. order = ConsumeRecord.new_one(
  279. order_no = ConsumeRecord.make_no(card_no, UserConsumeSubType.CARD),
  280. user = MyUser(openId = card_no, nickname = ''),
  281. device = self.device,
  282. group = Group.get_group(self.device.groupId),
  283. package = package, # TODO: 补充刷卡单价配置
  284. attach_paras = attach_paras,
  285. pay_info = {
  286. 'via': 'card',
  287. 'coins': str(VirtualCoin(0).mongo_amount),
  288. },
  289. start_key = str(order_id))
  290. order.coin = rmb
  291. order.remarks = 'YuChuanCard'
  292. order.save()
  293. return order
  294. def do_impl(self, **args):
  295. yc = YuChuanApi(**config_dic)
  296. order_id = self.event_data['order_id']
  297. card_no_hex = str(self.event_data['cardNo'])
  298. card_no = str(int(reverse_hex(card_no_hex), 16))
  299. devNo = str(self.event_data.get('IMEI'))
  300. logger.info('\33[32m caiyi_received data:{},cardNo:{} \33[0m'.format(json.dumps(self.event_data, ), card_no))
  301. # 测试白名单
  302. test_card = serviceCache.get('yc-card-{}'.format(card_no_hex)) or TempValues.get('yc-card-{}'.format(card_no_hex)) or TempValues.get('yc-all-free')
  303. if test_card:
  304. card_no = '757996699'
  305. logger.debug('caiyi_yc_test_card,to pay!!!')
  306. status = self.event_data['status']
  307. duration = self.event_data.get('duration', 0) # 本次的使用时间,单位秒 只有状态为running的时候才会出现
  308. rmb = RMB.fen_to_yuan(self.event_data['fee']) # 扣除的费用, 分为单位,转换为元
  309. # 查询该卡片
  310. card = Card.objects.filter(cardNo = card_no, cardType = 'YuChuanCard').first()
  311. if not card:
  312. # 先查询余额的时候已经做过卡片核实,如果查不到该卡片,说明有问题
  313. logger.info('\33[32m caiyi_The server did not find the user cardNo:{},cardNo_hex:{}\33[0m'.format(card_no,
  314. card_no_hex))
  315. raise ServiceException({'result': 2, 'description': u"没有找到该卡片"})
  316. userNumb = card.attachParas.get('campus_user_num')
  317. # 同一卡号20分钟以内的running单直接返回不处理
  318. last_order = serviceCache.get('yc-dev<{}>-card<{}>'.format(self.device.devNo, card_no_hex))
  319. if last_order:
  320. logger.info('\33[32m Repeat orders within 20 minutes \33[0m')
  321. return
  322. order = self.get_or_create_order(order_id, card_no, rmb, userNumb) # type: ConsumeRecord
  323. order.servicedInfo = {
  324. 'CODE': '{}-{}'.format(card.cardName, userNumb),
  325. 'cardNo': card_no
  326. }
  327. # 卡片记录最后一次刷卡的设备
  328. card.devNo = devNo
  329. card.save()
  330. # 白名单订单处理
  331. if test_card:
  332. order.paymentInfo.update({'coins': str(rmb), 'duration': str(int(duration) / 60)})
  333. order.attachParas.update({'coins': str(rmb)})
  334. order.status = self.event_data['status']
  335. order.isNormal = True
  336. order.save()
  337. logger.info(
  338. "\33[32m caiyi_order is finished, cardNo:%s,cardNo_hex:%s,coin:%s,orderNo:%s,\33[0m" % (
  339. card_no, card_no_hex, order.coin, order.orderNo,))
  340. return
  341. if status == 'running' or status == 'finished':
  342. if order.status == 'created':
  343. factoryFixId = str(card_no)
  344. pay_order_model = {
  345. "orderSerial": str(order.orderNo),
  346. "userNumb": userNumb,
  347. "factoryFixId": factoryFixId,
  348. "consumeValue": str(order.coin),
  349. }
  350. n = 0
  351. unpay_ordet_list = card.attachParas.get('unpay_order')
  352. while n < 5:
  353. res = yc.pay_order(**pay_order_model)
  354. if res['success'] == True:
  355. order.paymentInfo.update({'coins': str(rmb), 'duration': str(int(duration) / 60)})
  356. order.attachParas.update({'coins': str(rmb)})
  357. order.status = self.event_data['status']
  358. order.isNormal = True
  359. order.save()
  360. logger.info("\33[32m caiyi_order is finished, coin:%s,orderNo:%s,msg:%s\33[0m" % (
  361. order.coin, order.orderNo,
  362. json.dumps(res,
  363. encoding = 'utf-8',
  364. ensure_ascii = False)
  365. ))
  366. break
  367. if res['success'] == False:
  368. # 保存不在未支付的订单号
  369. if str(order.id) not in unpay_ordet_list:
  370. unpay_ordet_list.append(str(order.id))
  371. card.attachParas['unpay_order'] = unpay_ordet_list
  372. card.save()
  373. logger.info(
  374. "caiyi_pay order fail! devNo:%s,orderNo:%s,coin:%s,msg:%s" % (order.devNo, order.orderNo,
  375. order.coin,
  376. json.dumps(res,
  377. encoding = 'utf-8',
  378. ensure_ascii = False)))
  379. break
  380. time.sleep(1)
  381. n += 1
  382. if n == 5:
  383. unpay_ordet_list.append(str(order.id))
  384. card.attachParas['unpay_order'] = unpay_ordet_list
  385. card.save()
  386. logger.info(
  387. "\33[32m caiyi_pay order fail! cardNo:%s,orderNo:%s,coin:%s\33[0m" % (factoryFixId, order.coin,
  388. order.orderNo))
  389. elif order.status == 'running' or order.status == 'finished':
  390. # 如果该笔订单显示不正常(默认值为False)
  391. if order.isNormal is False:
  392. pay_order_model = {
  393. "orderSerial": str(order.orderNo),
  394. "userNumb": userNumb,
  395. "factoryFixId": str(card_no),
  396. "consumeValue": str(order.coin),
  397. }
  398. res = yc.pay_order(**pay_order_model)
  399. if res['success'] != True:
  400. logger.info("\33[32m caiyi_pay order fail! cardNo:%s,orderNo:%s,coin:%s,msg:%s\33[0m" % (
  401. card_no, order.coin,
  402. order.orderNo,
  403. json.dumps(res, encoding = 'utf-8', ensure_ascii = False)))
  404. return
  405. if res['success'] == True:
  406. order.paymentInfo.update({'coins': str(rmb), 'duration': str(int(duration) / 60)})
  407. order.attachParas.update({'coins': str(rmb)})
  408. order.status = self.event_data['status']
  409. order.isNormal = True
  410. order.save()
  411. logger.info("\33[32m caiyi_order is finished,cardNo:%s,coin:%s,orderNo:%s,msg:%s\33[0m" % (
  412. card_no, order.coin,
  413. order.orderNo,
  414. json.dumps(res, encoding = 'utf-8', ensure_ascii = False),))
  415. # 记录该设备 20分钟内 再次有付款单 不处理
  416. serviceCache.set('yc-dev<{}>-card<{}>'.format(self.device.devNo, card_no_hex), order.orderNo, 60 * 20)
  417. # 该笔订单已经扣费成功了,直接更改状态
  418. else:
  419. if order.status != 'finished':
  420. order.status = status
  421. order.finishedTime = datetime.datetime.now()
  422. order.save()
  423. logger.info("\33[32m caiyi_order is finished,cardNo:%s,coin:%s,orderNo:%s\33[0m" % (
  424. card_no, order.coin, order.orderNo,))
  425. serviceCache.delete('yuchuanyikatong_%s' % (card_no)) # 删除payload
  426. class MyNetPayAckEvent(AckEvent):
  427. def deal_running_event(self, order):
  428. # type: (ConsumeRecord)->None
  429. def do_running_order(order, result):
  430. # type: (ConsumeRecord, dict)->None
  431. if order.status in ['running', 'finished']:
  432. logger.debug('order<{}> no need to ack. this has done.'.format(repr(order)))
  433. return
  434. if order.status == 'timeout':
  435. freeze_user_balance(self.device, Group.get_group(order.groupId), order)
  436. err_desc = u'事件上报成功,补充扣款'
  437. else:
  438. err_desc = ''
  439. order.isNormal = True
  440. order.status = 'running'
  441. order.startTime = datetime.datetime.fromtimestamp(result['sts'])
  442. order.save()
  443. do_running_order(order, self.event_data)
  444. start_time = Arrow.fromdatetime(order.startTime, tzinfo = settings.TIME_ZONE)
  445. cache_info = {
  446. 'startTime': start_time.format('YYYY-MM-DD HH:mm:ss'),
  447. 'status': Const.DEV_WORK_STATUS_WORKING,
  448. 'openId': order.openId,
  449. 'orderNo': order.orderNo,
  450. 'coins': order.coin,
  451. 'money': order.money,
  452. 'estimatedTs': int(start_time.timestamp + order.my_package.estimated_duraion),
  453. 'unit': order.my_package.unit,
  454. 'needKind': order.my_package.need_kind,
  455. 'needValue': order.my_package.need_value
  456. }
  457. Device.update_dev_control_cache(self.device.devNo, cache_info)
  458. ServiceProgress.new_progress_for_order(order = order,
  459. device = self.device,
  460. cache_info = cache_info)
  461. def deal_finished_event(self, order):
  462. # type: (ConsumeRecord)->None
  463. def do_finished_order(order, result):
  464. if order.status == 'finished':
  465. logger.debug('order<{}> no need to ack. this has done.'.format(repr(order)))
  466. return
  467. if order.status == 'running':
  468. order.status = 'finished'
  469. order.finishedTime = datetime.datetime.fromtimestamp(result['fts'])
  470. order.save()
  471. else:
  472. if order.status == 'timeout':
  473. freeze_user_balance(self.device, Group.get_group(order.groupId), order)
  474. err_desc = u'事件上报成功,补充扣款'
  475. else:
  476. err_desc = ''
  477. order.isNormal = True
  478. order.status = 'finished'
  479. order.errorDesc = err_desc
  480. order.startTime = datetime.datetime.fromtimestamp(result['sts'])
  481. order.finishedTime = datetime.datetime.fromtimestamp(result['fts'])
  482. order.save()
  483. clear_frozen_user_balance(self.device, order, round(self.event_data['duration'] / 60.0, 2), 0, RMB(0))
  484. if order.status == 'finished':
  485. logger.debug('order<{}> has finished.'.format(repr(order)))
  486. return
  487. do_finished_order(order, self.event_data)
  488. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  489. if current_cache_info and order.orderNo == current_cache_info.get('orderNo', None):
  490. Device.invalid_device_control_cache(self.device.devNo)
  491. ServiceProgress.objects(open_id = order.openId,
  492. device_imei = self.device.devNo,
  493. port = int(order.used_port),
  494. consumeOrder__orderNo = order.orderNo).update_one(
  495. upsert = False,
  496. **{
  497. 'isFinished': True,
  498. 'finished_time': Arrow.fromdatetime(order.finishedTime, tzinfo = settings.TIME_ZONE).timestamp,
  499. 'expireAt': datetime.datetime.now()
  500. })
  501. def do_impl(self, **args):
  502. order_id = self.event_data['order_id']
  503. order = ConsumeRecord.objects(ownerId = self.device.ownerId,
  504. orderNo = order_id).first() # type: ConsumeRecord
  505. if not order:
  506. logger.debug('order<no={}> is not exist.'.format(self.event_data['order_id']))
  507. return
  508. if order.status == 'finished':
  509. logger.debug('order<{}> has been fished.'.format(repr(order)))
  510. return
  511. if self.event_data['rst'] == ErrorCode.DEVICE_CONN_FAIL:
  512. logger.error('order<{}> timeout.'.format(repr(order)))
  513. if order.status == 'created':
  514. order.isNormal = False
  515. order.status = 'timeout'
  516. order.errorDesc = DeviceErrorCodeDesc.get(ErrorCode.DEVICE_CONN_FAIL)
  517. set_start_key_status(start_key = order.startKey,
  518. state = START_DEVICE_STATUS.TIMEOUT,
  519. reason = cn(DeviceErrorCodeDesc.get(ErrorCode.DEVICE_CONN_FAIL)))
  520. return
  521. if self.event_data['rst'] != ErrorCode.DEVICE_SUCCESS:
  522. logger.error('order<{}> failure.'.format(repr(order)))
  523. if 'errorDesc' in self.event_data:
  524. error_desc = self.event_data['errorDesc']
  525. else:
  526. error_desc = DeviceErrorCodeDesc.get(self.event_data['rst'])
  527. order.status = 'finished'
  528. order.isNormal = False
  529. order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
  530. order.errorDesc = error_desc
  531. order.save()
  532. set_start_key_status(start_key = order.startKey,
  533. state = START_DEVICE_STATUS.FAILURE,
  534. reason = cn(error_desc))
  535. return
  536. set_start_key_status(start_key = order.startKey,
  537. state = START_DEVICE_STATUS.FINISHED,
  538. order_id = str(order.id))
  539. if self.event_data['status'] == 'running':
  540. return self.deal_running_event(order)
  541. if self.event_data['status'] == 'finished':
  542. return self.deal_finished_event(order)