base.py 94 KB


  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import copy
  4. import datetime
  5. import logging
  6. import uuid
  7. from arrow import Arrow
  8. from bson.objectid import ObjectId
  9. from django.conf import settings
  10. from mongoengine.errors import DoesNotExist
  11. from mongoengine.errors import NotUniqueError
  12. from typing import Tuple, Optional, TYPE_CHECKING, Iterable
  13. from apilib.monetary import VirtualCoin, RMB, Ratio
  14. from apilib.utils_string import make_title_from_dict
  15. from apilib.utils_sys import memcache_lock
  16. from apps.web.agent.models import Agent
  17. from apps.web.south_intf.shangdong_platform import ShanDongNorther
  18. from apps.web.common.transaction import UserConsumeSubType
  19. from apps.web.constant import Const, FAULT_LEVEL, DeviceCmdCode, ErrorCode, DeviceErrorCodeDesc, \
  20. START_DEVICE_STATUS, DEALER_CONSUMPTION_AGG_KIND
  21. from apps.web.core.networking import MessageSender
  22. from apps.web.dealer.define import DEALER_INCOME_SOURCE
  23. from apps.web.dealer.models import Dealer
  24. from apps.web.device.models import Device, FaultRecord, Group, GroupDict, Part, DeviceType
  25. from apps.web.eventer import Event
  26. from apps.web.report.ledger import Ledger
  27. from apps.web.south_intf.platform import notify_event_to_north
  28. from apps.web.user.models import MyUser, RechargeRecord, CardConsumeRecord, CardRechargeRecord, ConsumeRecord, Card, \
  29. CardRechargeOrder, ServiceProgress
  30. from apps.web.user.models import RefundMoneyRecord
  31. from apps.web.user.transaction_deprecated import refund_money, refund_cash
  32. from apps.web.user.utils import freeze_user_balance
  33. from apps.web.utils import set_start_key_status
  34. if TYPE_CHECKING:
  35. from apps.web.core.adapter.base import SmartBox
  36. from apps.web.device.models import DeviceDict
  37. logger = logging.getLogger(__name__)
  38. class WorkEvent(Event):
  39. """
  40. 开始工作或者结束工作的事件
  41. """
  42. def __init__(self, smartBox, event_data):
  43. # type:(SmartBox,dict)->None
  44. super(WorkEvent, self).__init__(smartBox)
  45. self.event_data = event_data
  46. def do(self, **args):
  47. Device.update_dev_control_cache(self.device['devNo'], self.event_data)
  48. def get_managerialOpenId_by_openId(self, openId):
  49. try:
  50. user = MyUser.objects.filter(openId = openId, groupId = self.device.groupId).first()
  51. if not user:
  52. return None
  53. else:
  54. return user.managerialOpenId
  55. except Exception, e:
  56. logger.exception('get managerial openid error=%s' % e)
  57. return None
  58. # 记录充值记录数据。注意需要在卡的余额修改后,然后记录
  59. def record_refund_money_for_card(self, money, cardId, orderNo = None):
  60. card = Card.objects.get(id = cardId)
  61. groupId = self.device['groupId']
  62. group = Group.get_group(groupId)
  63. if group is None:
  64. return False
  65. # 增加充值记录
  66. payload = {
  67. 'orderNo': orderNo or str(uuid.uuid4()),
  68. 'devNo': self.device['devNo'],
  69. 'logicalCode': self.device['logicalCode'],
  70. 'devTypeName': self.device.devTypeName,
  71. 'ownerId': self.device['ownerId'],
  72. 'groupId': self.device['groupId'],
  73. 'groupName': group['groupName'],
  74. 'groupNumber': self.device['groupNumber'],
  75. 'address': group['address'],
  76. 'openId': card.openId,
  77. 'nickname': card.nickName,
  78. 'money': money,
  79. 'coins': VirtualCoin(money.amount),
  80. 'result': 'success',
  81. 'via': 'refund',
  82. 'attachParas': {'cardNo': card.cardNo}
  83. }
  84. record = RechargeRecord(**payload)
  85. try:
  86. record.save()
  87. except Exception, e:
  88. logger.error('cardNo = %s,save recharge record error=%s' % (card.cardNo, e))
  89. return False
  90. newRcd = CardRechargeRecord(
  91. orderNo = payload.get('orderNo'),
  92. cardNo=card.cardNo,
  93. cardId = str(card.id),
  94. openId = card.openId,
  95. ownerId = ObjectId(self.device['ownerId']),
  96. money = money,
  97. coins = money,
  98. preBalance = card.balance - money,
  99. balance = card.balance,
  100. devNo = self.device['devNo'],
  101. devTypeCode = self.device['devType']['code'],
  102. logicalCode = self.device['logicalCode'],
  103. groupId = self.device['groupId'],
  104. address = group['address'],
  105. groupNumber = self.device['groupNumber'],
  106. groupName = group['groupName'],
  107. status = 'success',
  108. rechargeType = 'netpay',
  109. remarks = u'退币'
  110. )
  111. try:
  112. newRcd.save()
  113. except Exception, e:
  114. logger.exception('save card consume rcd error=%s' % e)
  115. def refund_money_for_card(self, money, cardId, orderNo = None):
  116. # 添加金币
  117. try:
  118. Card.get_collection().update({'_id': ObjectId(cardId)}, {'$inc': {'balance': money.mongo_amount}})
  119. except Exception, e:
  120. logger.error('feedback coins error=%s' % e)
  121. return None
  122. self.record_refund_money_for_card(money, cardId, orderNo)
  123. try:
  124. card = Card.objects.get(id = cardId)
  125. card.lastMaxBalance = card.balance
  126. return card.save()
  127. except Exception, e:
  128. logger.info('save error=%s ' % e)
  129. # 用于比如刷卡回收余额的时候,还需要更新上次消费订单的消费数据
  130. def update_consume_record_refund_money_for_card(self, money, card):
  131. try:
  132. # 有可能出现一张卡刷使用多个端口,这个时候回收余额只能分到3个消费订单上,因为回收余额的时候,没有上报回收具体哪一个端口的,所以无法对应到端口
  133. cardRcds = CardConsumeRecord.objects.filter(openId = card.openId, cardId = str(card.id),
  134. result = 'success').order_by('-dateTimeAdded')
  135. for rcd in cardRcds:
  136. if rcd.servicedInfo.has_key('refundedMoney'):
  137. continue
  138. else:
  139. break
  140. rcd.servicedInfo.update({'refundedMoney': money.mongo_amount})
  141. rcd.save()
  142. if not rcd.linkedConsumeRcdOrderNo:
  143. return
  144. consumeRcd = ConsumeRecord.objects.get(orderNo = rcd.linkedConsumeRcdOrderNo)
  145. consumeRcd.servicedInfo.update({'refundedMoney': money.mongo_amount})
  146. consumeRcd.save()
  147. except Exception, e:
  148. logger.exception('update_consume_record_refund_money_for_card = %s,cardNo=%s' % (e, card.cardNo))
  149. # 通知充电完成
  150. def notify_charging_finished(self, managerialOpenId):
  151. if not managerialOpenId:
  152. return
  153. group = Group.get_group(self.device['groupId'])
  154. self.notify_user(managerialOpenId, 'service_complete', **{
  155. 'title': u'您订购的服务已经完成',
  156. 'service': u'充电服务(设备编号:%s, 地址:%s)' % (self.device['logicalCode'], group['address']),
  157. 'finishTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
  158. 'remark': u'谢谢您的支持'
  159. })
  160. # ID/IC卡扣费。-1:异常错误,0:余额不足 1:正常
  161. def consume_money_for_card(self, card, money):
  162. # type:(Card, RMB)->Tuple[int, RMB]
  163. # 如果卡被解绑了或者挂失了,直接判断
  164. if card.openId == '' or card.frozen:
  165. return 0, RMB(0)
  166. if card.balance < money:
  167. card.balance = RMB(0)
  168. else:
  169. card.balance = (card.balance - money)
  170. try:
  171. card.save()
  172. except Exception as e:
  173. logger.exception(e)
  174. return -1, RMB(0)
  175. return 1, card.balance
  176. # 无值卡,通知扣费
  177. def notify_balance_has_consume_for_card(self, card, money, desc = u''):
  178. if not card or not card.managerialOpenId: # 有可能卡没有绑定,就不需要
  179. return
  180. self.notify_user(card.managerialOpenId, 'consume_notify', **{
  181. 'title': u'亲,您绑定的卡:%s(名称:%s),正在扣费。%s\\n' % (card.cardNo, card.cardName, desc),
  182. 'money': u'%s元\\n' % money,
  183. 'serviceType': u'刷卡消费\\n',
  184. 'finishTime': datetime.datetime.now().strftime(Const.DATETIME_FMT)
  185. })
  186. # 记录卡的消费
  187. def record_consume_for_card(self, card, money, desc = u'', servicedInfo = None, sid = None, attachParas = None):
  188. # type: (Card, RMB, unicode, Optional[dict], Optional[str], Optional[dict])->Tuple[str, str]
  189. servicedInfo = {} if servicedInfo is None else servicedInfo
  190. attachParas = {} if attachParas is None else attachParas
  191. group = Group.get_group(self.device['groupId'])
  192. address = group['address']
  193. group_number = self.device['groupNumber']
  194. # TODO 这里应该与其他插入消费记录的地方一样,运用model 同时应该取消time字段与其他model一样用dateTimeAdded
  195. now = datetime.datetime.now()
  196. new_record = {
  197. 'orderNo': ConsumeRecord.make_no(card.cardNo, UserConsumeSubType.CARD),
  198. 'time': now.strftime("%Y-%m-%d %H:%M:%S"),
  199. 'dateTimeAdded': now,
  200. 'openId': card.openId,
  201. 'ownerId': self.device['ownerId'],
  202. 'coin': money.mongo_amount,
  203. 'money': money.mongo_amount,
  204. 'devNo': self.device['devNo'],
  205. 'logicalCode': self.device['logicalCode'],
  206. 'groupId': self.device['groupId'],
  207. 'address': address,
  208. 'groupNumber': group_number,
  209. 'groupName': group['groupName'],
  210. 'devTypeCode': self.device.devTypeCode,
  211. 'devTypeName': self.device.devTypeName,
  212. 'isNormal': True,
  213. 'status': ConsumeRecord.Status.RUNNING,
  214. 'remarks': u'刷卡消费',
  215. 'errorDesc': '',
  216. 'sequanceNo': '',
  217. 'desc': desc,
  218. 'attachParas': attachParas,
  219. 'servicedInfo': servicedInfo
  220. }
  221. ConsumeRecord.get_collection().insert_one(new_record)
  222. # 刷卡消费也记录一条数据
  223. new_card_record = {
  224. 'orderNo': CardConsumeRecord.make_no(),
  225. 'openId': card.openId,
  226. 'cardId': str(card.id),
  227. 'money': money.mongo_amount,
  228. 'balance': card.balance.mongo_amount,
  229. 'devNo': self.device['devNo'],
  230. 'devType': self.device['devType']['name'],
  231. 'logicalCode': self.device['logicalCode'],
  232. 'groupId': self.device['groupId'],
  233. 'address': address,
  234. 'groupNumber': group_number,
  235. 'groupName': group['groupName'],
  236. 'result': 'success',
  237. 'remarks': u'刷卡消费',
  238. 'sequanceNo': '',
  239. 'dateTimeAdded': datetime.datetime.now(),
  240. 'desc': desc,
  241. 'servicedInfo': servicedInfo,
  242. 'linkedConsumeRcdOrderNo': str(new_record['orderNo'])
  243. }
  244. if sid is not None:
  245. new_card_record.update({'sid': sid})
  246. CardConsumeRecord.get_collection().insert_one(new_card_record)
  247. return new_record['orderNo'], new_card_record['orderNo']
  248. def record_consume_for_coin(self, money, desc=u'', servicedInfo=None, remarks=u'投币或者刷卡消费'):
  249. # type: (RMB, basestring, Optional[dict], basestring)->str
  250. """
  251. 记录硬币的消费
  252. :param money:
  253. :param desc:
  254. :param remarks:
  255. :param servicedInfo:
  256. :return:
  257. """
  258. servicedInfo = {} if servicedInfo is None else servicedInfo
  259. group = Group.get_group(self.device['groupId'])
  260. address = group['address']
  261. group_number = self.device['groupNumber']
  262. now = datetime.datetime.now()
  263. new_record = {
  264. 'orderNo': ConsumeRecord.make_no(self.device.logicalCode, UserConsumeSubType.COIN),
  265. 'dateTimeAdded': now,
  266. 'openId': '',
  267. 'ownerId': self.device['ownerId'],
  268. 'coin': money.mongo_amount,
  269. 'money': money.mongo_amount,
  270. 'devNo': self.device['devNo'],
  271. 'logicalCode': self.device['logicalCode'],
  272. 'groupId': self.device['groupId'],
  273. 'address': address,
  274. 'groupNumber': group_number,
  275. 'groupName': group['groupName'],
  276. 'devTypeCode': self.device.devTypeCode,
  277. 'devTypeName': self.device.devTypeName,
  278. 'isNormal': True,
  279. 'status': ConsumeRecord.Status.RUNNING,
  280. 'remarks': remarks,
  281. 'errorDesc': '',
  282. 'sequanceNo': '',
  283. 'desc': desc,
  284. 'attachParas': {},
  285. 'servicedInfo': servicedInfo
  286. }
  287. ConsumeRecord.get_collection().insert_one(new_record)
  288. return new_record['orderNo']
  289. def find_card_by_card_no(self, cardNo):
  290. dealer = Dealer.objects(id = self.device.ownerId).first() # type: Dealer
  291. if not dealer:
  292. logger.error('dealer is not found, dealer id = {}'.format(self.device.ownerId))
  293. return None
  294. agent = Agent.objects(id = dealer.agentId).first() # type: Agent
  295. if not agent:
  296. logger.error('agent is not found, agentId=%s' % dealer.agentId)
  297. return None
  298. group = Group.get_group(self.device.groupId) # type: GroupDict
  299. if not group:
  300. logger.error('group is not found, groupid=%s' % group.groupId)
  301. return None
  302. try:
  303. card = Card.objects.get(cardNo = cardNo, agentId = str(agent.id))
  304. except DoesNotExist:
  305. return None
  306. except Exception as e:
  307. logger.error(e)
  308. return None
  309. return card
  310. def update_card_dealer_and_type(self, cardNo, cardType='ID'):
  311. """
  312. :param cardNo:
  313. :param cardType:
  314. :return:
  315. """
  316. dealer = Dealer.objects(id=self.device.ownerId).first() # type: Dealer
  317. if not dealer:
  318. logger.error('dealer is not found, dealer id = {}'.format(self.device.ownerId))
  319. return None
  320. group = Group.get_group(self.device.groupId) # type: GroupDict
  321. if not group:
  322. logger.error('group is not found, groupId = %s' % group.groupId)
  323. return None
  324. try:
  325. card = Card.objects.get(cardNo=cardNo, dealerId=self.device.ownerId, openId__ne='')
  326. except DoesNotExist:
  327. logger.error('card is not exist, cardNo=%s' % cardNo)
  328. return None
  329. except Exception as e:
  330. logger.error(e)
  331. return None
  332. else:
  333. # 每次刷卡的时候 刷新一次devNo, 表示最后一次的卡刷的设备
  334. card.devNo = self.device.devNo
  335. # 如果沒有綁定信息, 直接綁定本次刷卡的信息. 后续不再更新
  336. if not card.dealerId or not card.groupId:
  337. card.dealerId = str(dealer.id)
  338. card.groupId = group.groupId
  339. card.cardType = cardType
  340. card.save()
  341. return card
  342. def recharge_id_card(self, card, rechargeType, order):
  343. # type: (Card, str, CardRechargeOrder)->None
  344. if not order or (order.money == RMB(0) and order.rechargeType != "sendCoin"):
  345. return
  346. status = Card.get_card_status(str(card.id))
  347. if status == 'busy':
  348. return
  349. # 锁机制
  350. card.__class__.set_card_status(str(card.id), 'busy')
  351. try:
  352. # 记录总额度的变化到充值订单中
  353. balance = card.balance + order.coins
  354. preBalance = card.balance
  355. # 更新充值订单,已经完成
  356. order.update_after_recharge_id_card(self.device, balance, preBalance)
  357. CardRechargeRecord.add_record(
  358. card=card,
  359. group=Group.get_group(order.groupId),
  360. order=order,
  361. device=self.device
  362. )
  363. # 刷新卡里面的余额
  364. card.recharge(order.chargeAmount, order.bestowAmount)
  365. card.account_recharge(order.rechargeOrder)
  366. except Exception as e:
  367. logger.exception(e)
  368. return
  369. finally:
  370. card.__class__.set_card_status(str(card.id), 'idle')
  371. def recharge_ic_card(self, card, preBalance, rechargeType, order, need_verify = True):
  372. # type:(Card, RMB, str, CardRechargeOrder, bool)->bool
  373. """
  374. # rechargeType有两种,一种是用直接覆写overwrite的方式,一种是append追加钱的方式。
  375. # 不同的的设备,充值的方式还不一样.注意:money是实际用户付的钱,coins是给用户充值的钱,比如付10块(money),充15(coins)。
  376. :param card:
  377. :param preBalance:
  378. :param rechargeType:
  379. :param order:
  380. :return:
  381. """
  382. if not order or order.coins == RMB(0):
  383. return False
  384. status = Card.get_card_status(str(card.id))
  385. if status == 'busy':
  386. return False
  387. Card.set_card_status(str(card.id), 'busy')
  388. try:
  389. # IC卡需要下发到设备,设备写卡,把余额打入卡中
  390. if rechargeType == 'overwrite':
  391. sendMoney = preBalance + order.coins
  392. else:
  393. sendMoney = order.coins
  394. # 先判断order最近一次充值是否OK. 满足三个条件才认为上次充值成功:
  395. # 1、操作时间已经超过三天
  396. # 2、操作结果是串口超时, 即result == 1
  397. # 3、当前余额大于最后一次充值操作的充值前余额
  398. if need_verify and len(order.operationLog) > 0:
  399. log = order.operationLog[-1]
  400. result = log['result']
  401. time_delta = (datetime.datetime.now() - log['time']).total_seconds()
  402. last_pre_balance = RMB(log['preBalance'])
  403. if (result == ErrorCode.DEVICE_CONN_FAIL or result == ErrorCode.BOARD_UART_TIMEOUT) \
  404. and (time_delta > 3 * 24 * 3600 or preBalance > last_pre_balance):
  405. logger.debug('{} recharge verify result is finished.'.format(repr(card)))
  406. order.update_after_recharge_ic_card(device = self.device,
  407. sendMoney = sendMoney,
  408. preBalance = preBalance,
  409. result = ErrorCode.SUCCESS,
  410. description = u'充值校验结束')
  411. CardRechargeRecord.add_record(
  412. card = card,
  413. group = Group.get_group(order.groupId),
  414. order = order,
  415. device = self.device)
  416. return False
  417. try:
  418. operation_result, balance = self.deviceAdapter.recharge_card(card.cardNo, sendMoney,
  419. orderNo = order.orderNo)
  420. order.update_after_recharge_ic_card(device = self.device,
  421. sendMoney = sendMoney,
  422. preBalance = preBalance,
  423. syncBalance = balance,
  424. result = operation_result['result'],
  425. description = operation_result['description'])
  426. if operation_result['result'] != ErrorCode.SUCCESS:
  427. return False
  428. if not balance:
  429. balance = preBalance + order.coins
  430. CardRechargeRecord.add_record(
  431. card = card,
  432. group = Group.get_group(order.groupId),
  433. order = order,
  434. device = self.device)
  435. # 刷新卡里面的余额
  436. card.balance = balance
  437. card.lastMaxBalance = balance
  438. card.save()
  439. return True
  440. except Exception as e:
  441. order.update_after_recharge_ic_card(device = self.device,
  442. sendMoney = sendMoney,
  443. preBalance = preBalance,
  444. syncBalance = balance,
  445. result = ErrorCode.EXCEPTION,
  446. description = e.message)
  447. return False
  448. except Exception as e:
  449. logger.exception(e)
  450. return False
  451. finally:
  452. Card.set_card_status(str(card.id), 'idle')
  453. def recharge_ic_card_realiable(self, card, preBalance, rechargeType, order, need_verify = True):
  454. # type:(Card, RMB, str, CardRechargeOrder, bool)->bool
  455. """
  456. # rechargeType有两种,一种是用直接覆写overwrite的方式,一种是append追加钱的方式。
  457. # 不同的的设备,充值的方式还不一样.注意:money是实际用户付的钱,coins是给用户充值的钱,比如付10块(money),充15(coins)。
  458. :param card:
  459. :param preBalance:
  460. :param rechargeType:
  461. :param order:
  462. :return:
  463. """
  464. if not order or order.coins == RMB(0):
  465. return False
  466. with memcache_lock('{}-{}-{}'.format(self.device.devNo, str(order.id), 'ic_recharge'), value = '1',
  467. expire = 300) as acquired:
  468. if not acquired:
  469. logger.debug('order<{}> is doing.'.format(repr(order)))
  470. return
  471. try:
  472. if order.status == 'finished':
  473. logger.debug('order<{}> has been finished.'.format(repr(order)))
  474. return
  475. # IC卡需要下发到设备,设备写卡,把余额打入卡中
  476. if rechargeType == 'overwrite':
  477. sendMoney = preBalance + order.coins
  478. else:
  479. sendMoney = order.coins
  480. if not order.processingLog:
  481. order.init_processing_log(device = self.device,
  482. sendMoney = sendMoney,
  483. preBalance = preBalance)
  484. order.save()
  485. return self.deviceAdapter.recharge_ic_card_realiable(card.cardNo, sendMoney, order.orderNo)
  486. log = copy.deepcopy(order.processingLog)
  487. result = log.get('result', None)
  488. if result == ErrorCode.IC_RECHARGE_FAIL:
  489. if result:
  490. order.operationLog.append(log)
  491. order.init_processing_log(device = self.device,
  492. sendMoney = sendMoney,
  493. preBalance = preBalance)
  494. order.save()
  495. return self.deviceAdapter.recharge_ic_card_realiable(card.cardNo, sendMoney, order.orderNo)
  496. else:
  497. # 先判断order最近一次充值是否OK. 满足三个条件才认为上次
  498. # 充值成功:
  499. # 1、操作时间已经超过三天
  500. # 2、操作结果是串口超时, 即result == 1
  501. # 3、当前余额大于最后一次充值操作的充值前余额
  502. time_delta = (datetime.datetime.now() - log['time']).total_seconds()
  503. last_pre_balance = RMB(log['preBalance'])
  504. if (time_delta > 3 * 24 * 3600 or preBalance > last_pre_balance):
  505. logger.debug('{} recharge verify result is finished.'.format(repr(card)))
  506. log.update({
  507. 'result': ErrorCode.SUCCESS,
  508. 'description': u'充值校验结束'
  509. })
  510. order.operationLog.append(log)
  511. order.processingLog = {}
  512. order.status = 'finished'
  513. order.save()
  514. CardRechargeRecord.add_record(
  515. card = card,
  516. group = Group.get_group(order.groupId),
  517. order = order,
  518. device = self.device)
  519. else:
  520. logger.debug('verify not recharge order<{}>'.format(repr(order)))
  521. order.operationLog.append(log)
  522. order.init_processing_log(device = self.device,
  523. sendMoney = sendMoney,
  524. preBalance = preBalance)
  525. order.save()
  526. return self.deviceAdapter.recharge_ic_card_realiable(card.cardNo, sendMoney, order.orderNo)
  527. except Exception as e:
  528. logger.exception(e)
  529. def update_card_recharge_for_success_event(self, orderId, balance):
  530. logger.info('update_card_recharge_for_success_event,orderId=%s' % orderId)
  531. order = CardRechargeOrder.objects.get(id = ObjectId(orderId))
  532. if order.status == 'finished': # double check,存在设备ack之前,已经发送了一条事件到路上了,所以需要检查
  533. return
  534. card = Card.objects.get(id = order.cardId)
  535. group = Group.get_group(self.device['groupId'])
  536. RechargeRecord.get_collection().update(
  537. {'_id': ObjectId(order.rechargeNo)},
  538. {'$set': {
  539. 'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
  540. 'groupId': self.device['groupId'],
  541. 'devNo': self.device['devNo'],
  542. 'logicalCode': self.device['logicalCode'],
  543. 'ownerId': self.device['ownerId'],
  544. 'groupName': group['groupName'],
  545. 'groupNumber': self.device['groupNumber'],
  546. 'address': group['address'],
  547. 'devType': self.device.get('devType', {}).get('name', ''),
  548. }},
  549. multi = True
  550. )
  551. #: 并且添加一条卡的成功充值的记录
  552. newRcd = CardRechargeRecord(
  553. cardId = str(card.id),
  554. cardNo = card.cardNo,
  555. openId = card.openId,
  556. ownerId = ObjectId(self.device['ownerId']),
  557. money = order.money,
  558. coins = order.coins,
  559. balance = balance,
  560. preBalance = card.balance,
  561. devNo = self.device['devNo'],
  562. devTypeCode = self.device['devType']['code'],
  563. logicalCode = self.device['logicalCode'],
  564. groupId = self.device['groupId'],
  565. address = group['address'],
  566. groupNumber = self.device['groupNumber'],
  567. groupName = group['groupName'],
  568. status = 'success',
  569. rechargeType = 'netpay'
  570. )
  571. try:
  572. newRcd.save()
  573. except Exception as e:
  574. logger.exception('save card consume rcd error=%s' % e)
  575. # 刷新卡里面的余额
  576. try:
  577. card.balance = balance
  578. card.lastMaxBalance = balance
  579. card.save()
  580. except Exception as e:
  581. logger.exception(e)
  582. try:
  583. order.status = 'finished'
  584. order.rechargeType = 'netpay'
  585. order.save()
  586. except Exception as e:
  587. logger.exception(e)
  588. # 更新收录有值卡
  589. def update_card_balance(self, card, balance):
  590. try:
  591. card.balance = balance
  592. card.save()
  593. except Exception as e:
  594. logger.exception(e)
  595. # 根据消耗的电量计算当前地址下的本次消耗电费
  596. def calc_elec_fee(self, spendElec):
  597. group = Group.objects.get(id = self.device['groupId'])
  598. return float(group.otherConf.get('elecFee')) * spendElec
  599. def calc_service_fee(self, fee, elec_charge, service_charge):
  600. elecFee = fee * Ratio(elec_charge) * Ratio(1 / float(elec_charge + service_charge))
  601. serviceFee = (fee - elecFee)
  602. return elecFee, serviceFee
  603. def time_ratio_pricing(self, value, leftTime, actualNeedTime):
  604. return value * Ratio(leftTime) * Ratio(1 / float(actualNeedTime))
  605. def get_backCoins(self, coins, leftTime, actualNeedTime):
  606. return self.time_ratio_pricing(value = coins, leftTime = leftTime, actualNeedTime = actualNeedTime)
  607. def get_backMoney(self, money, leftTime, actualNeedTime):
  608. return self.time_ratio_pricing(value = money, leftTime = leftTime, actualNeedTime = actualNeedTime)
  609. def generate_service_complete_title_by_devType(self, devTypeId, templateMap):
  610. try:
  611. devType = DeviceType.objects(id=devTypeId).first()
  612. if devType is None:
  613. return ''
  614. if devType.finishedMessageTemplateDict == {}:
  615. return ''
  616. for _ in templateMap.keys():
  617. if _ not in devType.finishedMessageTemplateDict['keys']:
  618. templateMap.pop(_)
  619. return devType.finishedMessageTemplateDict['template'].format(**templateMap)
  620. except Exception as e:
  621. logger.error(e)
  622. return ''
  623. def notify_user_service_complete(self, service_name, openid, port, address, reason, finished_time, url = None, extra = None):
  624. # type: (basestring, str, str, str, str, str, str, list)->None
  625. title_list = [
  626. {u'': u'{}结束,感谢您的使用!'.format(service_name)},
  627. {u'设备编号': self.device.logicalCode}
  628. ]
  629. if port:
  630. title_list.extend([{u'设备端口': port}, {u'设备地址': address}])
  631. else:
  632. title_list.extend(
  633. [{u'设备地址': address}])
  634. if extra:
  635. title_list.extend(extra)
  636. if reason:
  637. title_list.extend([{u'结束原因': reason}])
  638. dealer = self.device.owner # type: Dealer
  639. if dealer.showServicePhone:
  640. remark = u'客服联系电话:{}'.format(dealer.service_phone)
  641. else:
  642. remark = u'我们竭诚为您服务,有任何问题请联系客服!'
  643. if not finished_time:
  644. finished_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  645. agent = Agent.objects.filter(id=str(dealer.agentId)).first()
  646. if not agent:
  647. logger.warning('agent<id={}> is not exists.'.format(str(dealer.agentId)))
  648. return
  649. kw = {}
  650. # TODO: 微信订阅模板
  651. if agent.customizedUserSubGzhAllowable:
  652. if port:
  653. service = u"{}服务({}-端口{})".format(self.device.majorDeviceType, self.device.logicalCode, port)
  654. if len(service) > 20:
  655. service = u'{}服务({})'.format(self.device.majorDeviceType, self.device.logicalCode)
  656. if len(service) > 20:
  657. service = u'{}服务'.format(self.device.majorDeviceType)
  658. else:
  659. service = u'{}服务({})'.format(service_name, self.device.logicalCode)
  660. if len(service) > 20:
  661. service = u'{}服务'.format(self.device.majorDeviceType)
  662. kw = {
  663. 'service': service,
  664. 'finishTime': finished_time,
  665. 'remark': remark,
  666. }
  667. else:
  668. kw = {
  669. 'title': make_title_from_dict(title_list),
  670. 'service': u'{}服务'.format(service_name),
  671. 'finishTime': finished_time,
  672. 'remark': remark
  673. }
  674. kw.update({'url': url or settings.SERVER_END_BASE_SSL_URL + '/user/index.html?v=1.0.31#/user/consume'})
  675. self.notify_user(openid,'service_complete', **kw)
  676. def notify_to_sd_norther(self, portStr, consumeDict, platform=ShanDongNorther.platform.default): # type:(str, dict, int) -> None
  677. """
  678. 上报 订单结束信息到山东 省平台 或者济南静态平台(同一份协议 不同服务器)
  679. """
  680. try:
  681. logicalCode = self.device.logicalCode
  682. part = Part.objects.filter(logicalCode=logicalCode, partNo=portStr).first()
  683. dealer = self.device.owner
  684. if not part or not dealer:
  685. raise Exception("no part or no dealer")
  686. # 主动上报 能找到唯一的norther
  687. norther = ShanDongNorther.get_norther(dealerId=self.device.ownerId, platform=platform).first() # type: ShanDongNorther
  688. if not norther:
  689. return
  690. northerDict = {
  691. "connectorId": str(part.id),
  692. "startTime": consumeDict.get("startTime", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
  693. "endTime": consumeDict.get("finishedTime", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
  694. "totalPower": consumeDict.get("elec", 0),
  695. "totalElecMoney": consumeDict.get("spendMoney", 0),
  696. }
  697. logger.info("northerId:{}, devNo:{}, dealerId:{} norther to sd infoDict = {}".format(
  698. str(norther.id),
  699. self.device.devNo,
  700. self.device.ownerId,
  701. northerDict)
  702. )
  703. # TODO 这个地方建议不同的实现放在model 内部实现 对外保持norther的notify接口统一
  704. if norther.isSdPlatform:
  705. norther.notification_order_info(northerDict)
  706. else:
  707. norther.notification_order_info_jn(northerDict)
  708. except Exception as e:
  709. logger.info(e)
  710. def refund_net_pay(self, user, lineInfo, refundedMoney, refundCoins, consumeDict, is_cash):
  711. # type: (MyUser, dict, RMB, VirtualCoin, dict, bool)->None
  712. logger.debug(
  713. 'refund for net pay. user = {}, lineInfo = {}, refundedMoney = {}, refundCoins = {}, is_cash = {}'.format(
  714. repr(user), lineInfo, refundedMoney, refundCoins, is_cash))
  715. refund_recharge_ids = []
  716. if is_cash:
  717. if 'rechargeRcdId' in lineInfo:
  718. refund_recharge_ids.append(lineInfo['rechargeRcdId'])
  719. else:
  720. pay_info = lineInfo.get('payInfo', list())
  721. for item in pay_info:
  722. if 'rechargeRcdId' in item:
  723. refund_recharge_ids.append(item['rechargeRcdId'])
  724. if refundCoins <= VirtualCoin(0):
  725. # 只能退现金的情况
  726. consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.REFUNDED_CASH: RMB(0).mongo_amount})
  727. if len(refund_recharge_ids) > 0:
  728. logger.info('need refund cash, ids = %s' % str(refund_recharge_ids))
  729. if refundedMoney <= RMB(0):
  730. consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.REFUNDED_CASH: RMB(0).mongo_amount})
  731. return
  732. left_refund = refundedMoney
  733. all_refund = RMB(0)
  734. for recharge_record_id in refund_recharge_ids[::-1]:
  735. recharge_record = RechargeRecord.objects(id = str(recharge_record_id)).first() # type: RechargeRecord
  736. if not recharge_record:
  737. logger.error('not find recharge record {}'.format(recharge_record_id))
  738. continue
  739. if recharge_record.openId != lineInfo['openId']:
  740. logger.error('is not my record. {} != {}'.format(recharge_record.openId, lineInfo['openId']))
  741. continue
  742. this_refund = min(recharge_record.money, left_refund)
  743. if this_refund <= RMB(0):
  744. continue
  745. logger.debug('try to refund money {} for recharge record {}'.format(this_refund, repr(recharge_record)))
  746. try:
  747. all_refund += this_refund
  748. refund_order = refund_cash(
  749. recharge_record, this_refund, VirtualCoin(0), user = user) # type: RefundMoneyRecord
  750. if not refund_order:
  751. logger.error(
  752. 'refund money<{}> failure. recharge = {}'.format(this_refund, repr(recharge_record)))
  753. except Exception as e:
  754. logger.exception(e)
  755. left_refund = left_refund - this_refund
  756. if left_refund <= RMB(0):
  757. break
  758. if all_refund > RMB(0):
  759. consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.REFUNDED_CASH: all_refund.mongo_amount})
  760. else:
  761. if refundCoins > VirtualCoin(0):
  762. consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.REFUNDED_COINS: refundCoins.mongo_amount})
  763. refund_money(self.device, refundCoins, lineInfo['openId'])
  764. class FaultEvent(Event):
  765. def __init__(self, smartBox, event_data):
  766. # type:(SmartBox,dict)->None
  767. super(FaultEvent, self).__init__(smartBox)
  768. self.event_data = event_data
  769. def is_notify_dealer(self):
  770. if self.dealer:
  771. return self.dealer.devFaultPushDealerSwitch
  772. else:
  773. return False
  774. def is_notify_user(self):
  775. if self.dealer:
  776. return self.dealer.devFaultPushUserSwitch
  777. else:
  778. return False
  779. def record(self, faultCode = '', description = None, title = '', detail = None, level = FAULT_LEVEL.NORMAL, portNo = 0):
  780. # 故障记录入库
  781. if self.dealer is not None:
  782. dealerId = self.dealer.id
  783. else:
  784. dealerId = None
  785. logicalCode = self.device['logicalCode']
  786. now = datetime.datetime.now()
  787. created_within_five_minutes = now - datetime.timedelta(minutes = 5)
  788. # 这个应该是防止重复报
  789. try:
  790. fault_record = FaultRecord.objects(
  791. status="init",
  792. faultCode = faultCode,
  793. title = title,
  794. description = description,
  795. logicalCode = logicalCode,
  796. portNo = portNo,
  797. createdTime__gt = created_within_five_minutes
  798. ).first()
  799. if not fault_record:
  800. group = Group.get_group(self.device['groupId'])
  801. fault_record = FaultRecord(
  802. logicalCode = logicalCode,
  803. imei = self.device['devNo'],
  804. portNo = portNo,
  805. faultCode = faultCode,
  806. title = title,
  807. description = description or self.event_data.get('statusInfo', ''),
  808. groupName = group.get('groupName', ''),
  809. address = group.get('address', '')
  810. )
  811. if dealerId is not None:
  812. fault_record.dealerId = dealerId
  813. if detail is not None:
  814. if not isinstance(detail, dict):
  815. raise TypeError('detail has to be a dict')
  816. fault_record.detail = detail
  817. fault_record = fault_record.save()
  818. else:
  819. logger.debug('%r existed within 5 minutes' % (fault_record,))
  820. return fault_record
  821. except NotUniqueError as e:
  822. logger.error('cannot insert FaultRecord, dev=%s, error=%s' % (self.device['devNo'], e))
  823. def do(self, **args):
  824. """不要将所有的信息不加筛选的打入缓存"""
  825. group = self.device.group
  826. # 通知经销商
  827. if self.is_notify_dealer():
  828. titleDictList = [
  829. {u'告警名称': self.event_data.get("faultName", u"设备告警")},
  830. {u'设备编号': self.device["logicalCode"]},
  831. {u'地址名称': group["groupName"]}
  832. ]
  833. self.notify_dealer('device_fault', **{
  834. 'title': make_title_from_dict(titleDictList),
  835. 'device': u'%s号设备-%s端口\\n' % (
  836. self.device['groupNumber'], self.event_data.get('port')) if self.event_data.has_key(
  837. 'port') else u'%s号设备\\n' % self.device['groupNumber'],
  838. 'faultType': u'%s\\n' % self.event_data['faultType'] if self.event_data.has_key(
  839. 'faultType') else u'设备告警\\n',
  840. 'notifyTime': u'%s\\n' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
  841. 'fault': u'%s(详细情况,建议您到管理后台的告警模块进行查看)\\n' % self.event_data['desc'] if self.event_data.has_key(
  842. 'desc') else self.event_data.get('statusInfo', ''),
  843. })
  844. # 通知用户
  845. if self.is_notify_user():
  846. if self.device.has_key('openId'):
  847. user = MyUser.objects(openId = self.device.get('openId'), groupId = self.device['groupId']).first()
  848. self.notify_user(user.managerialOpenId if user else '', 'device_fault',
  849. **{
  850. 'title': u'注意!注意!您正在使用的设备发生了故障,设备编码:%s' % self.device['logicalCode'],
  851. 'fault': self.event_data['statusInfo'],
  852. 'notifyTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  853. })
  854. # 写入故障记录
  855. faultRecord = self.record(
  856. faultCode = self.event_data.get('faultCode', ''),
  857. description = self.event_data.get('desc', None),
  858. title = self.event_data.get('faultName', ''),
  859. detail = self.event_data.get('detail', None),
  860. level = self.event_data.get('level', FAULT_LEVEL.NORMAL),
  861. portNo = self.event_data.get('port', 0)
  862. )
  863. # 梁溪的告警
  864. if faultRecord:
  865. self.north_to_liangxi(faultRecord)
  866. notify_event_to_north(
  867. self.dealer,
  868. self.device,
  869. level = Const.EVENT_NORMAL,
  870. desc = self.event_data['desc'] if self.event_data.has_key('desc') else self.event_data.get('statusInfo', '')
  871. )
  872. def north_to_liangxi(self, faultRecord):
  873. from taskmanager.mediator import task_caller
  874. task_caller(
  875. 'send_to_xf_falut',
  876. devNo=self.device.devNo,
  877. faultId=str(faultRecord.id)
  878. )
  879. def time_ratio_pricing(self, value, leftTime, actualNeedTime):
  880. return value * Ratio(leftTime) * Ratio(1 / float(actualNeedTime))
  881. def get_backCoins(self, coins, leftTime, actualNeedTime):
  882. return self.time_ratio_pricing(value = coins, leftTime = leftTime, actualNeedTime = actualNeedTime)
  883. def get_backMoney(self, money, leftTime, actualNeedTime):
  884. return self.time_ratio_pricing(value = money, leftTime = leftTime, actualNeedTime = actualNeedTime)
  885. class AckEventProcessorIntf(object):
  886. def pre_processing(self, device, event_data):
  887. # type:(DeviceDict, dict)->dict
  888. return event_data
  889. class AckEvent(WorkEvent):
  890. """
  891. 目前设计为不能重入. 如果执行有异常, 肯定是代码问题
  892. """
  893. def __init__(self, smartBox, event_data, pre_processor = None):
  894. # type:(SmartBox, dict, AckEventProcessorIntf)->None
  895. super(AckEvent, self).__init__(smartBox, event_data)
  896. self.pre_processor = pre_processor
  897. def ack_msg(self):
  898. if self.event_data['status'] == 'finishing':
  899. logger.debug('finishing status is not need to ack.')
  900. return
  901. payload = {
  902. 'order_id': self.event_data['order_id'],
  903. 'order_type': self.event_data['order_type'],
  904. 'status': self.event_data['status']
  905. }
  906. # if response:
  907. # payload.update(response)
  908. MessageSender.send_no_wait(device = self.device,
  909. cmd = DeviceCmdCode.EVENT_ACK,
  910. payload = payload)
  911. def do_impl(self, **args):
  912. raise Exception('must implement do_impl interface.')
  913. def do(self, **args):
  914. if 'order_id' not in self.event_data or 'order_type' not in self.event_data:
  915. logger.error('order id or type is null.')
  916. return
  917. order_id = self.event_data['order_id']
  918. order_type = self.event_data['order_type']
  919. with memcache_lock('{}-{}-{}'.format(self.device.devNo, order_id, order_type), value = '1',
  920. expire = 300) as acquired:
  921. if not acquired:
  922. logger.debug('ack message<{}-{}-{}> is duplicate.'.format(self.device.devNo, order_id, order_type))
  923. return
  924. try:
  925. self.do_impl(**args)
  926. except Exception as e:
  927. import traceback
  928. logger.warning(traceback.format_exc())
  929. if 'status' in self.event_data:
  930. self.ack_msg()
  931. class ComNetPayAckEvent(AckEvent):
  932. def post_before_start(self, order=None):
  933. pass
  934. def post_after_start(self, order=None):
  935. raise NotImplementedError('must implement post_after_start.')
  936. def post_before_finish(self, order=None):
  937. pass
  938. def post_after_finish(self, order=None):
  939. raise NotImplementedError('must implement post_after_finish.')
  940. def do_finished_event(self, order, sub_orders, merge_order_info):
  941. raise NotImplementedError('must implement do_finished_event.')
  942. def do_running_order(self, order, result):
  943. # type: (ConsumeRecord, dict)->None
  944. self.post_before_start(order)
  945. if order.status in ['running', 'finished']:
  946. logger.debug('order<{}> no need to ack. this has done.'.format(repr(order)))
  947. return
  948. if order.status == 'unknown':
  949. dealer = self.device.owner
  950. # 设备支持上报告扣款
  951. if 'device_callback_to_freeze_balance' in dealer.features:
  952. freeze_user_balance(self.device, Group.get_group(order.groupId), order)
  953. err_desc = u'设备启动成功,事件上报成功,补充扣款'
  954. else:
  955. raise Exception('Invalid operation!')
  956. else:
  957. set_start_key_status(start_key=order.startKey,
  958. state=START_DEVICE_STATUS.FINISHED,
  959. order_id=str(order.id))
  960. err_desc = ''
  961. if 'master' in result:
  962. order.association = {
  963. 'master': result['master']
  964. }
  965. order.servicedInfo.update({'masterOrderNo': result['master']})
  966. order.errorDesc = err_desc
  967. order.isNormal = True
  968. order.status = 'running'
  969. order.startTime = datetime.datetime.fromtimestamp(result['sts'])
  970. order.save()
  971. self.post_after_start(order=order)
  972. def deal_running_event(self, order):
  973. # type: (ConsumeRecord)->None
  974. if 'sub' in self.event_data:
  975. order.association = {
  976. 'sub': [item['order_id'] for item in self.event_data['sub']]
  977. }
  978. order.servicedInfo.update(
  979. {'subOrderNo': '{}'.format(', '.join([item['order_id'] for item in self.event_data['sub']]))})
  980. order.save()
  981. self.do_running_order(order, self.event_data)
  982. sub_order_map = {}
  983. for item in self.event_data['sub']:
  984. item['master'] = self.event_data['order_id']
  985. item['sts'] = self.event_data['sts']
  986. sub_order_map[item['order_id']] = item
  987. sub_orders = [_ for _ in ConsumeRecord.objects.filter(orderNo__in = sub_order_map.keys())]
  988. for sub_order in sub_orders:
  989. self.do_running_order(sub_order, sub_order_map[sub_order.orderNo])
  990. else:
  991. self.do_running_order(order, self.event_data)
  992. sub_orders = []
  993. account_info = self.merge_order(order, sub_orders)
  994. cache_info = {
  995. 'startTime': Arrow.fromdatetime(order.startTime, tzinfo = settings.TIME_ZONE).format('YYYY-MM-DD HH:mm:ss'),
  996. 'status': Const.DEV_WORK_STATUS_WORKING,
  997. 'isStart': True,
  998. 'openId': order.openId,
  999. 'orderNo': order.orderNo,
  1000. 'port': order.used_port
  1001. }
  1002. if sub_orders:
  1003. cache_info.update({
  1004. 'subOrderNos': [_.orderNo for _ in sub_orders]
  1005. })
  1006. cache_info.update(account_info)
  1007. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  1008. if order.used_port < Const.NO_PORT:
  1009. current_port_info = current_cache_info.get(str(order.used_port))
  1010. if not current_port_info or not current_cache_info.get("orderNo") or (order.orderNo == current_port_info.get('orderNo', None)) or current_port_info.get('status', 0) == 0:
  1011. Device.update_port_control_cache(self.device.devNo, cache_info, updateType='overwrite')
  1012. else:
  1013. if not current_cache_info or not current_cache_info.get("orderNo") (order.orderNo == current_cache_info.get('orderNo', None)) or current_cache_info.get('status', 0) == 0:
  1014. Device.update_dev_control_cache(self.device.devNo, cache_info)
  1015. ServiceProgress.new_progress_for_order(order = order,
  1016. device = self.device,
  1017. cache_info = cache_info)
  1018. def do_finished_order(self, order, result):
  1019. self.post_before_finish(order=order)
  1020. if 'sub' in result:
  1021. order.association = {
  1022. 'sub': [item['order_id'] for item in self.event_data['sub']]
  1023. }
  1024. order.servicedInfo.update(
  1025. {'subOrderNo': '{}'.format(', '.join([item['order_id'] for item in self.event_data['sub']]))})
  1026. elif 'master' in result:
  1027. order.association = {
  1028. 'master': result['master']
  1029. }
  1030. order.servicedInfo.update({'masterOrderNo': result['master']})
  1031. if order.status == 'running':
  1032. order.status = 'finished'
  1033. order.finishedTime = datetime.datetime.fromtimestamp(result['fts'])
  1034. order.save()
  1035. else:
  1036. if order.status == 'unknown':
  1037. dealer = self.device.owner
  1038. # 设备支持上报告扣款
  1039. if 'device_callback_to_freeze_balance' in dealer.features:
  1040. freeze_user_balance(self.device, Group.get_group(order.groupId), order)
  1041. err_desc = u'结束事件上报成功,补充扣款'
  1042. else:
  1043. raise Exception('Invalid operation!')
  1044. else:
  1045. # created状态
  1046. set_start_key_status(start_key=order.startKey,
  1047. state=START_DEVICE_STATUS.FINISHED,
  1048. order_id=str(order.id))
  1049. err_desc = ''
  1050. order.isNormal = True
  1051. order.status = 'finished'
  1052. order.errorDesc = err_desc
  1053. order.startTime = datetime.datetime.fromtimestamp(result['sts'])
  1054. order.finishedTime = datetime.datetime.fromtimestamp(result['fts'])
  1055. order.save()
  1056. self.post_after_finish(order=order)
  1057. def deal_finished_event(self, order):
  1058. # type: (ConsumeRecord)->None
  1059. if order.status == 'finished':
  1060. logger.debug('order<{}> has finished.'.format(repr(order)))
  1061. return
  1062. self.do_finished_order(order, self.event_data)
  1063. sub_orders = []
  1064. if 'sub' in self.event_data:
  1065. sub_order_map = {}
  1066. for item in self.event_data['sub']:
  1067. item['status'] = 'finished'
  1068. item['master'] = str(order.orderNo)
  1069. item['sts'] = self.event_data['sts']
  1070. item['fts'] = self.event_data['fts']
  1071. sub_order_map[item['order_id']] = item
  1072. sub_orders = [_ for _ in ConsumeRecord.objects.filter(orderNo__in = sub_order_map.keys())]
  1073. for sub_order in sub_orders:
  1074. self.do_finished_order(sub_order, sub_order_map[sub_order.orderNo])
  1075. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  1076. if order.used_port != Const.NO_PORT:
  1077. current_port_info = current_cache_info.get(str(order.used_port))
  1078. if current_port_info and (order.orderNo == current_port_info.get('orderNo', None)):
  1079. Device.clear_port_control_cache(self.device.devNo, order.used_port)
  1080. else:
  1081. if current_cache_info and order.orderNo == current_cache_info.get('orderNo', None):
  1082. Device.invalid_device_control_cache(self.device.devNo)
  1083. ServiceProgress.objects(open_id = order.openId,
  1084. device_imei = self.device.devNo,
  1085. port = int(order.used_port),
  1086. consumeOrder__orderNo = order.orderNo).update_one(
  1087. upsert = False,
  1088. **{
  1089. 'isFinished': True,
  1090. 'finished_time': Arrow.fromdatetime(order.finishedTime, tzinfo = settings.TIME_ZONE).timestamp,
  1091. 'expireAt': datetime.datetime.now()
  1092. })
  1093. merge_order_info = self.merge_order(order, sub_orders)
  1094. self.do_finished_event(order, sub_orders, merge_order_info)
  1095. def merge_order(self, master_order, sub_orders):
  1096. # type:(ConsumeRecord, list)->dict
  1097. raise NotImplementedError('must implement merge_order.')
  1098. def do_impl(self, **args):
  1099. order_id = self.event_data['order_id']
  1100. order = ConsumeRecord.objects(ownerId = self.device.ownerId,
  1101. orderNo = order_id).first() # type: ConsumeRecord
  1102. if not order:
  1103. logger.debug('order<no={}> is not exist.'.format(self.event_data['order_id']))
  1104. return
  1105. if order.status in ['end', 'waitPay', 'finished']:
  1106. logger.debug('order<{}> has been finished.'.format(repr(order)))
  1107. return
  1108. if order.used_port != self.event_data['port']:
  1109. logger.error('port is not equal. {} != {}'.format(self.event_data['port'], order.used_port))
  1110. return
  1111. if self.pre_processor:
  1112. self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
  1113. if self.event_data['status'] in ['running', 'finishing']:
  1114. return self.deal_running_event(order)
  1115. if self.event_data['status'] == 'finished':
  1116. return self.deal_finished_event(order)
  1117. class IcStartAckEvent(AckEvent):
  1118. def __init__(self, smartBox, event_data, pre_processor=None):
  1119. super(IcStartAckEvent, self).__init__(smartBox, event_data, pre_processor)
  1120. if self.pre_processor:
  1121. self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
  1122. self.card = self.update_card_dealer_and_type(self.event_data.get('cardNo'),
  1123. cardType=self.event_data.get('cardType', 'IC'))
  1124. def post_before_start(self, order=None):
  1125. pass
  1126. def post_after_start(self, order=None):
  1127. raise NotImplementedError('must implement post_after_start.')
  1128. def post_before_finish(self, order=None):
  1129. pass
  1130. def post_after_finish(self, order=None):
  1131. raise NotImplementedError('must implement post_after_finish.')
  1132. def checkout_order(self, order):
  1133. raise NotImplementedError('must implement checkout_order.')
  1134. def get_or_create_order(self):
  1135. # type:()-> tuple
  1136. def new_one(order_id):
  1137. # type:( str)->ConsumeRecord
  1138. if self.event_data['order_id'] == order_id:
  1139. fee = VirtualCoin(self.event_data['fee'])
  1140. else:
  1141. sub = self.event_data.get('sub', [])
  1142. fee = VirtualCoin(map(lambda x: x['fee'], filter(lambda x: x['order_id'] == order_id, sub))[0])
  1143. order = ConsumeRecord.objects(startKey=order_id).first()
  1144. if not order:
  1145. attach_paras = {'chargeIndex': self.event_data['port']}
  1146. order = ConsumeRecord.new_one(
  1147. order_no=ConsumeRecord.make_no(self.event_data['cardNo'], UserConsumeSubType.CARD),
  1148. user=MyUser(openId=self.card.openId, nickname=self.card.nickName),
  1149. device=self.device,
  1150. group=Group.get_group(self.device.groupId),
  1151. package=self.event_data.get('package', {}),
  1152. attach_paras=attach_paras,
  1153. pay_info={
  1154. 'via': 'card',
  1155. 'coins': fee.mongo_amount
  1156. },
  1157. start_key=order_id)
  1158. order.servicedInfo = {'cardNo': self.event_data.get('cardNo'), 'chargeIndex': self.event_data['port']}
  1159. order.save()
  1160. card_order = CardConsumeRecord.objects(orderNo=order.orderNo).first()
  1161. if not card_order:
  1162. group = Group.get_group(self.device.groupId) # type:GroupDict
  1163. card_order = CardConsumeRecord(orderNo=order.orderNo,
  1164. openId=self.card.openId,
  1165. cardId=str(self.card.id),
  1166. money=fee.mongo_amount,
  1167. balance=self.card.balance.mongo_amount,
  1168. devNo=self.device.devNo,
  1169. devType=self.device.devTypeName,
  1170. logicalCode=self.device.logicalCode,
  1171. groupId=self.device.groupId,
  1172. address=group.address,
  1173. groupNumber=self.device.groupNumber,
  1174. groupName=group.groupName,
  1175. result='success',
  1176. remarks=u'刷卡消费',
  1177. dateTimeAdded=datetime.datetime.now(),
  1178. linkedConsumeRcdOrderNo=order.orderNo).save()
  1179. # 生成订单处理卡消费(预付费还是后付费)
  1180. try:
  1181. self.checkout_order(order)
  1182. except Exception as e:
  1183. order.isNormal = False
  1184. order.errorDesc = '订单建立结算失败! ERROR={}'.format(e)
  1185. order.save()
  1186. return order
  1187. def new_sub_order(order_id, master_order_id):
  1188. order = new_one(order_id)
  1189. order.isNormal = True
  1190. order.status = self.event_data['status']
  1191. order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
  1192. if 'fts' in self.event_data:
  1193. order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
  1194. if ('master' not in order.association) or (order.association['master'] != master_order_id):
  1195. order.association = {'master': master_order_id}
  1196. order.servicedInfo['masterOrderNo'] = master_order_id
  1197. order.save()
  1198. return order
  1199. master_order = new_one(self.event_data['order_id'])
  1200. master_order.save()
  1201. sub_orders = []
  1202. for item in self.event_data.get('sub', []):
  1203. sub_order = new_sub_order(item['order_id'], master_order.orderNo)
  1204. sub_orders.append(sub_order)
  1205. sub_order_id_list = [item.orderNo for item in sub_orders]
  1206. has_done = True
  1207. if master_order.status == ConsumeRecord.Status.FINISHED:
  1208. logger.debug('master order<{}> has been done.'.format(repr(master_order)))
  1209. has_done = True
  1210. if master_order.status == ConsumeRecord.Status.CREATED:
  1211. has_done = False
  1212. master_order.association = {'sub': sub_order_id_list}
  1213. if sub_order_id_list:
  1214. master_order.servicedInfo['subOrderNo'] = ', '.join(sub_order_id_list)
  1215. else:
  1216. if master_order.status == ConsumeRecord.Status.RUNNING and \
  1217. self.event_data['status'] == ConsumeRecord.Status.FINISHED:
  1218. has_done = False
  1219. if set(master_order.association['sub']) != set(sub_order_id_list):
  1220. master_order.association = {'sub': sub_order_id_list}
  1221. if sub_order_id_list:
  1222. master_order.servicedInfo['subOrderNo'] = ', '.join(sub_order_id_list)
  1223. has_done = False
  1224. master_order.isNormal = True
  1225. master_order.status = self.event_data['status']
  1226. if not master_order.startTime:
  1227. if 'sts' in self.event_data:
  1228. master_order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
  1229. else:
  1230. master_order.startTime = datetime.datetime.now()
  1231. if 'fts' in self.event_data:
  1232. master_order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
  1233. master_order.save()
  1234. return has_done, master_order, sub_orders
  1235. def merge_order(self, master_order, sub_orders):
  1236. # type:(ConsumeRecord, Iterable[ConsumeRecord])->dict
  1237. raise NotImplementedError('must implement merge_order.')
  1238. def do_finished_event(self, order, merge_order_info):
  1239. # type:(ConsumeRecord, dict)->None
  1240. raise NotImplementedError('must implement do_finished_event.')
  1241. def deal_running_order(self, master_order, sub_orders):
  1242. # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
  1243. self.post_before_start(master_order)
  1244. try:
  1245. start_time_str = master_order.startTime.strftime(Const.DATETIME_FMT)
  1246. cache_info = {
  1247. 'port': str(master_order.used_port),
  1248. 'cardId': str(self.card.id),
  1249. 'openId': self.card.openId,
  1250. 'startTime': start_time_str,
  1251. 'isStart': True,
  1252. 'status': Const.DEV_WORK_STATUS_WORKING,
  1253. 'orderNo': master_order.orderNo,
  1254. 'subOrderNos': [item.orderNo for item in sub_orders]
  1255. }
  1256. cache_info.update(self.merge_order(master_order, sub_orders))
  1257. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  1258. if master_order.used_port < Const.NO_PORT:
  1259. current_port_info = current_cache_info.get(str(master_order.used_port), {})
  1260. if not current_cache_info or master_order.orderNo == current_port_info.get('orderNo',
  1261. None) or current_port_info.get(
  1262. 'status', 0) == Const.DEV_WORK_STATUS_IDLE:
  1263. Device.update_port_control_cache(self.device.devNo, cache_info, updateType='overwrite')
  1264. else:
  1265. if start_time_str > current_port_info.get('startTime', '1970-01-01'):
  1266. Device.update_port_control_cache(self.device.devNo, cache_info, updateType='overwrite')
  1267. else:
  1268. pass
  1269. else:
  1270. if not current_cache_info or (
  1271. master_order.orderNo == current_cache_info.get('orderNo', None)) or current_cache_info.get(
  1272. 'status', 0) == Const.DEV_WORK_STATUS_IDLE:
  1273. Device.update_dev_control_cache(self.device.devNo, cache_info)
  1274. else:
  1275. if start_time_str > current_cache_info.get('startTime', '1970-01-01'):
  1276. Device.update_dev_control_cache(self.device.devNo, cache_info)
  1277. else:
  1278. pass
  1279. ServiceProgress.new_progress_for_order(order=master_order,
  1280. device=self.device,
  1281. cache_info=cache_info)
  1282. finally:
  1283. self.post_after_start(order=master_order)
  1284. def deal_finished_order(self, master_order, sub_orders):
  1285. # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
  1286. self.post_before_finish(master_order)
  1287. try:
  1288. self.do_finished_event(master_order, self.merge_order(master_order, sub_orders))
  1289. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  1290. if master_order.used_port < Const.NO_PORT:
  1291. current_port_info = current_cache_info.get(str(master_order.used_port))
  1292. if current_port_info and (master_order.orderNo == current_port_info.get('orderNo', '')):
  1293. Device.clear_port_control_cache(devNo=self.device.devNo, port=master_order.used_port)
  1294. else:
  1295. if current_cache_info and (master_order.orderNo == current_cache_info.get('orderNo', '')):
  1296. Device.invalid_device_control_cache(self.device.devNo)
  1297. ServiceProgress.objects(open_id=self.card.openId,
  1298. device_imei=self.device.devNo,
  1299. port=int(master_order.used_port)).update_one(
  1300. upsert=False,
  1301. **{
  1302. 'isFinished': True,
  1303. 'finished_time': Arrow.fromdatetime(master_order.finishedTime,
  1304. tzinfo=settings.TIME_ZONE).timestamp,
  1305. 'expireAt': datetime.datetime.now()
  1306. })
  1307. finally:
  1308. self.post_after_finish(order=master_order)
  1309. def do_impl(self, **args):
  1310. if 'port' not in self.event_data:
  1311. logger.warn('port is not exist.')
  1312. return
  1313. order_id = self.event_data['order_id']
  1314. order_type = self.event_data['order_type']
  1315. if not self.card or not self.card.openId or self.card.frozen:
  1316. logger.debug('card<cardNo={}> is not exists.'.format(self.event_data['cardNo']))
  1317. return
  1318. has_done, master_order, sub_orders = self.get_or_create_order()
  1319. if has_done:
  1320. logger.debug('order<{}> has been done.'.format(repr(master_order)))
  1321. return
  1322. if self.event_data['status'] == 'running':
  1323. self.deal_running_order(master_order, sub_orders)
  1324. if self.event_data['status'] == 'finished':
  1325. self.deal_finished_order(master_order, sub_orders)
  1326. class IcRechargeAckEvent(AckEvent):
  1327. def __init__(self, smartBox, event_data, pre_processor = None):
  1328. super(IcRechargeAckEvent, self).__init__(smartBox, event_data, pre_processor)
  1329. if self.pre_processor:
  1330. self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
  1331. def do_impl(self, **args):
  1332. order = CardRechargeOrder.objects(orderNo = self.event_data['order_id']).first() # type: CardRechargeOrder
  1333. if not order:
  1334. logger.error('order<orderNo={}> is not exist.'.format(self.event_data['order_id']))
  1335. return
  1336. if order.status == 'finished':
  1337. logger.debug('order<{}> has be done.'.format(repr(order)))
  1338. return
  1339. if not order.processingLog:
  1340. logger.error('order<{}> has no processing log.'.format(repr(order)))
  1341. return
  1342. cardNo = self.event_data['cardNo']
  1343. if str(cardNo) != order.cardNo:
  1344. logger.error('check card no failure. {} != {}'.format(cardNo, order.cardNo))
  1345. return
  1346. card = Card.objects(cardNo = order.cardNo).first()
  1347. if not card:
  1348. logger.error('not find card<{}>'.format(card.cardNo))
  1349. return
  1350. result = self.event_data['rst']
  1351. if result == ErrorCode.BOARD_UART_TIMEOUT:
  1352. order.processingLog['result'] = result
  1353. order.processingLog.update({
  1354. 'result': self.event_data['rst'],
  1355. 'description': DeviceErrorCodeDesc.get(result)
  1356. })
  1357. order.save()
  1358. else:
  1359. # 刷新卡里面的余额
  1360. balance = VirtualCoin(self.event_data.get('balance'))
  1361. if balance != card.balance:
  1362. card.balance = balance
  1363. card.lastMaxBalance = balance
  1364. card.save()
  1365. if result == ErrorCode.DEVICE_SUCCESS:
  1366. log = copy.deepcopy(order.processingLog)
  1367. log.update({
  1368. 'result': result,
  1369. 'description': DeviceErrorCodeDesc.get(result),
  1370. 'syncBalance': balance.mongo_amount
  1371. })
  1372. order.operationLog.append(log)
  1373. order.processingLog = {}
  1374. order.status = 'finished'
  1375. order.save()
  1376. CardRechargeRecord.add_record(
  1377. card = card,
  1378. group = Group.get_group(order.groupId),
  1379. order = order,
  1380. device = self.device)
  1381. else:
  1382. order.processingLog.update({
  1383. 'result': result,
  1384. 'description': DeviceErrorCodeDesc.get(result),
  1385. 'syncBalance': balance.mongo_amount
  1386. })
  1387. order.save()
  1388. class IdStartAckEvent(AckEvent):
  1389. def __init__(self, smartBox, event_data, pre_processor = None):
  1390. super(IdStartAckEvent, self).__init__(smartBox, event_data, pre_processor)
  1391. if self.pre_processor:
  1392. self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
  1393. self.card = self.update_card_dealer_and_type(self.event_data.get('cardNo'), cardType=self.event_data.get('cardType', 'ID')) # type: Card
  1394. def post_before_start(self, order=None):
  1395. pass
  1396. def post_after_start(self, order=None):
  1397. raise NotImplementedError('must implement post_after_start.')
  1398. def post_before_finish(self, order=None):
  1399. pass
  1400. def post_after_finish(self, order=None):
  1401. raise NotImplementedError('must implement post_after_finish.')
  1402. def checkout_order(self, order):
  1403. raise NotImplementedError('must implement checkout_order.')
  1404. def get_or_create_order(self):
  1405. # type:()-> tuple
  1406. def new_one(order_id):
  1407. # type:( str)->ConsumeRecord
  1408. if self.event_data['order_id'] == order_id:
  1409. fee = VirtualCoin(self.event_data['fee'])
  1410. else:
  1411. sub = self.event_data.get('sub',[])
  1412. fee = VirtualCoin(map(lambda x: x['fee'], filter(lambda x: x['order_id'] == order_id, sub))[0])
  1413. order = ConsumeRecord.objects(startKey = order_id).first()
  1414. if not order:
  1415. attach_paras = {'chargeIndex': self.event_data['port']}
  1416. order = ConsumeRecord.new_one(
  1417. order_no = ConsumeRecord.make_no(self.event_data['cardNo'], UserConsumeSubType.CARD),
  1418. user = MyUser(openId = self.card.openId, nickname = self.card.nickName),
  1419. device = self.device,
  1420. group = Group.get_group(self.device.groupId),
  1421. package=self.event_data.get('package', {}),
  1422. attach_paras = attach_paras,
  1423. pay_info = {
  1424. 'via': 'card',
  1425. 'coins': fee.mongo_amount
  1426. },
  1427. start_key = order_id)
  1428. order.servicedInfo = {'cardNo': self.event_data.get('cardNo'), 'chargeIndex': self.event_data['port']}
  1429. order.save()
  1430. card_order = CardConsumeRecord.objects(orderNo = order.orderNo).first()
  1431. if not card_order:
  1432. group = Group.get_group(self.device.groupId) # type:GroupDict
  1433. card_order = CardConsumeRecord(orderNo = order.orderNo,
  1434. openId = self.card.openId,
  1435. cardId = str(self.card.id),
  1436. money = fee.mongo_amount,
  1437. balance = self.card.balance.mongo_amount,
  1438. devNo = self.device.devNo,
  1439. devType = self.device.devTypeName,
  1440. logicalCode = self.device.logicalCode,
  1441. groupId = self.device.groupId,
  1442. address = group.address,
  1443. groupNumber = self.device.groupNumber,
  1444. groupName = group.groupName,
  1445. result = 'success',
  1446. remarks = u'刷卡消费',
  1447. dateTimeAdded = datetime.datetime.now(),
  1448. linkedConsumeRcdOrderNo = order.orderNo).save()
  1449. # 生成订单处理卡消费(预付费还是后付费)
  1450. try:
  1451. self.checkout_order(order)
  1452. except Exception as e:
  1453. order.isNormal = False
  1454. order.errorDesc = '订单建立结算失败! ERROR={}'.format(e)
  1455. order.save()
  1456. return order
  1457. def new_sub_order(order_id, master_order_id):
  1458. order = new_one(order_id)
  1459. order.isNormal = True
  1460. order.status = self.event_data['status']
  1461. order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
  1462. if 'fts' in self.event_data:
  1463. order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
  1464. if ('master' not in order.association) or (order.association['master'] != master_order_id):
  1465. order.association = {'master': master_order_id}
  1466. order.servicedInfo['masterOrderNo'] = master_order_id
  1467. order.save()
  1468. return order
  1469. master_order = new_one(self.event_data['order_id'])
  1470. master_order.save()
  1471. sub_orders = []
  1472. for item in self.event_data.get('sub', []):
  1473. sub_order = new_sub_order(item['order_id'], master_order.orderNo)
  1474. sub_orders.append(sub_order)
  1475. sub_order_id_list = [item.orderNo for item in sub_orders]
  1476. has_done = True
  1477. if master_order.status == ConsumeRecord.Status.FINISHED:
  1478. logger.debug('master order<{}> has been done.'.format(repr(master_order)))
  1479. has_done = True
  1480. if master_order.status == ConsumeRecord.Status.CREATED:
  1481. has_done = False
  1482. master_order.association = {'sub': sub_order_id_list}
  1483. if sub_order_id_list:
  1484. master_order.servicedInfo['subOrderNo'] = ', '.join(sub_order_id_list)
  1485. else:
  1486. if master_order.status == ConsumeRecord.Status.RUNNING and \
  1487. self.event_data['status'] == ConsumeRecord.Status.FINISHED:
  1488. has_done = False
  1489. if set(master_order.association['sub']) != set(sub_order_id_list):
  1490. master_order.association = {'sub': sub_order_id_list}
  1491. if sub_order_id_list:
  1492. master_order.servicedInfo['subOrderNo'] = ', '.join(sub_order_id_list)
  1493. has_done = False
  1494. master_order.isNormal = True
  1495. master_order.status = self.event_data['status']
  1496. if not master_order.startTime:
  1497. if 'sts' in self.event_data:
  1498. master_order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
  1499. else:
  1500. master_order.startTime = datetime.datetime.now()
  1501. if 'fts' in self.event_data:
  1502. master_order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
  1503. master_order.save()
  1504. return has_done, master_order, sub_orders
  1505. def merge_order(self, master_order, sub_orders):
  1506. # type:(ConsumeRecord, Iterable[ConsumeRecord])->dict
  1507. raise NotImplementedError('must implement merge_order.')
  1508. def do_finished_event(self, order, merge_order_info):
  1509. # type:(ConsumeRecord, dict)->None
  1510. raise NotImplementedError('must implement do_finished_event.')
  1511. def deal_running_order(self, master_order, sub_orders):
  1512. # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
  1513. self.post_before_start(master_order)
  1514. try:
  1515. start_time_str = master_order.startTime.strftime(Const.DATETIME_FMT)
  1516. cache_info = {
  1517. 'port': str(master_order.used_port),
  1518. 'cardId': str(self.card.id),
  1519. 'openId': self.card.openId,
  1520. 'startTime': start_time_str,
  1521. 'isStart': True,
  1522. 'status': Const.DEV_WORK_STATUS_WORKING,
  1523. 'orderNo': master_order.orderNo,
  1524. 'subOrderNos': [item.orderNo for item in sub_orders]
  1525. }
  1526. cache_info.update(self.merge_order(master_order, sub_orders))
  1527. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  1528. if master_order.used_port < Const.NO_PORT:
  1529. current_port_info = current_cache_info.get(str(master_order.used_port), {})
  1530. if not current_cache_info or master_order.orderNo == current_port_info.get('orderNo', None) or current_port_info.get('status', 0) == Const.DEV_WORK_STATUS_IDLE:
  1531. Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
  1532. else:
  1533. if start_time_str > current_port_info.get('startTime', '1970-01-01'):
  1534. Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
  1535. else:
  1536. pass
  1537. else:
  1538. if not current_cache_info or (master_order.orderNo == current_cache_info.get('orderNo', None)) or current_cache_info.get('status', 0) == Const.DEV_WORK_STATUS_IDLE:
  1539. Device.update_dev_control_cache(self.device.devNo, cache_info)
  1540. else:
  1541. if start_time_str > current_cache_info.get('startTime', '1970-01-01'):
  1542. Device.update_dev_control_cache(self.device.devNo, cache_info)
  1543. else:
  1544. pass
  1545. ServiceProgress.new_progress_for_order(order=master_order,
  1546. device=self.device,
  1547. cache_info = cache_info)
  1548. finally:
  1549. self.post_after_start(order=master_order)
  1550. def deal_finished_order(self, master_order, sub_orders):
  1551. # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
  1552. self.post_before_finish(master_order)
  1553. try:
  1554. self.do_finished_event(master_order, self.merge_order(master_order, sub_orders))
  1555. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  1556. if master_order.used_port < Const.NO_PORT:
  1557. current_port_info = current_cache_info.get(str(master_order.used_port))
  1558. if current_port_info and (master_order.orderNo == current_port_info.get('orderNo', '')):
  1559. Device.clear_port_control_cache(devNo = self.device.devNo, port = master_order.used_port)
  1560. else:
  1561. if current_cache_info and (master_order.orderNo == current_cache_info.get('orderNo', '')):
  1562. Device.invalid_device_control_cache(self.device.devNo)
  1563. ServiceProgress.objects(open_id=self.card.openId,
  1564. device_imei=self.device.devNo,
  1565. port=int(master_order.used_port)).update_one(
  1566. upsert=False,
  1567. **{
  1568. 'isFinished': True,
  1569. 'finished_time': Arrow.fromdatetime(master_order.finishedTime,
  1570. tzinfo=settings.TIME_ZONE).timestamp,
  1571. 'expireAt': datetime.datetime.now()
  1572. })
  1573. finally:
  1574. self.post_after_finish(order=master_order)
  1575. def do_impl(self, **args):
  1576. if 'port' not in self.event_data:
  1577. logger.warn('port is not exist.')
  1578. return
  1579. order_id = self.event_data['order_id']
  1580. order_type = self.event_data['order_type']
  1581. if not self.card or not self.card.openId or self.card.frozen:
  1582. logger.debug('card<cardNo={}> is not exists.'.format(self.event_data['cardNo']))
  1583. return
  1584. has_done, master_order, sub_orders = self.get_or_create_order()
  1585. if has_done:
  1586. logger.debug('order<{}> has been done.'.format(repr(master_order)))
  1587. return
  1588. if self.event_data['status'] == 'running':
  1589. self.deal_running_order(master_order, sub_orders)
  1590. if self.event_data['status'] == 'finished':
  1591. self.deal_finished_order(master_order, sub_orders)
  1592. class VirtualCardStartAckEvent(AckEvent):
  1593. def __init__(self, smartBox, event_data, pre_processor = None):
  1594. super(VirtualCardStartAckEvent, self).__init__(smartBox, event_data, pre_processor)
  1595. if self.pre_processor:
  1596. self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
  1597. self.card, self.virtualCard = self.get_virtual_card_by_card()
  1598. def get_virtual_card_by_card(self):
  1599. cardNo = self.event_data.get("cardNo")
  1600. card = Card.objects.filter(cardNo=cardNo).first()
  1601. if not card or not card.openId or float(card.balance) != 0:
  1602. logger.debug('card<cardNo={}> is not exists.'.format(self.event_data['cardNo']))
  1603. return
  1604. try:
  1605. dealer = Dealer.objects.get(id=card.dealerId)
  1606. agent = Agent.objects.get(id=dealer.agentId)
  1607. features = agent.features
  1608. except Exception as e:
  1609. features = []
  1610. return card, card.bound_virtual_card if "vCardNeedBind" in features else card.related_virtual_card
  1611. def post_after_start(self, order = None):
  1612. raise NotImplementedError('must implement post_after_start.')
  1613. def post_after_finish(self, order = None):
  1614. raise NotImplementedError('must implement post_after_finish.')
  1615. def checkout_order(self, order):
  1616. raise NotImplementedError('must implement checkout_order.')
  1617. def get_or_create_order(self):
  1618. # type:()-> tuple
  1619. def new_one(order_id):
  1620. # type:(str)->ConsumeRecord
  1621. if self.event_data['order_id'] == order_id: # 判断master_order还是sub
  1622. fee = VirtualCoin(self.event_data['fee'])
  1623. else:
  1624. sub = self.event_data.get('sub',[])
  1625. fee = map(lambda x: x['fee'], filter(lambda x: x['order_id'] == order_id, sub))[0]
  1626. order = ConsumeRecord.objects(startKey = order_id).first()
  1627. if not order:
  1628. attach_paras = {'chargeIndex': self.event_data['port']}
  1629. order = ConsumeRecord.new_one(
  1630. order_no = ConsumeRecord.make_no(self.event_data['cardNo'], UserConsumeSubType.VIRTUAL_CARD),
  1631. user = MyUser(openId = self.card.openId, nickname = self.card.nickName),
  1632. device = self.device,
  1633. group = Group.get_group(self.device.groupId),
  1634. package = self.event_data.get('package',{}),
  1635. attach_paras=attach_paras,
  1636. pay_info={
  1637. 'via': 'virtualCard',
  1638. 'itemId': str(self.virtualCard.id),
  1639. 'money': fee.mongo_amount,
  1640. 'coins': fee.mongo_amount,
  1641. 'deduct': []
  1642. },
  1643. start_key=order_id)
  1644. order.servicedInfo = {'cardNo':self.event_data.get('cardNo'),'chargeIndex': self.event_data['port']}
  1645. order.save()
  1646. card_order = CardConsumeRecord.objects(orderNo = order.orderNo).first()
  1647. if not card_order:
  1648. group = Group.get_group(self.device.groupId) # type:GroupDict
  1649. card_order = CardConsumeRecord(orderNo = order.orderNo,
  1650. openId = self.card.openId,
  1651. cardId = str(self.card.id),
  1652. money = VirtualCoin(0).mongo_amount,
  1653. balance = self.card.balance.mongo_amount,
  1654. devNo = self.device.devNo,
  1655. devType = self.device.devTypeName,
  1656. logicalCode = self.device.logicalCode,
  1657. groupId = self.device.groupId,
  1658. address = group.address,
  1659. groupNumber = self.device.groupNumber,
  1660. groupName = group.groupName,
  1661. result = 'success',
  1662. remarks = u'虚拟卡消费',
  1663. dateTimeAdded = datetime.datetime.now(),
  1664. linkedConsumeRcdOrderNo = order.orderNo).save()
  1665. # 生成订单处理卡消费(预付费还是后付费)
  1666. try:
  1667. self.checkout_order(order)
  1668. except Exception as e:
  1669. order.isNormal = False
  1670. order.errorDesc = '订单建立结算失败! ERROR={}'.format(e)
  1671. order.save()
  1672. freeze_user_balance(self.device, group, order, self.virtualCard)
  1673. return order
  1674. def new_sub_order(order_id, master_order_id):
  1675. order = new_one(order_id)
  1676. order.isNormal = True
  1677. order.status = self.event_data['status']
  1678. order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
  1679. if 'fts' in self.event_data:
  1680. order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
  1681. if ('master' not in order.association) or (order.association['master'] != master_order_id):
  1682. order.association = {'master': master_order_id}
  1683. order.save()
  1684. return order
  1685. master_order = new_one(self.event_data['order_id'])
  1686. master_order.save()
  1687. sub_orders = []
  1688. for item in self.event_data.get('sub', []):
  1689. sub_order = new_sub_order(item['order_id'], master_order.orderNo)
  1690. sub_orders.append(sub_order)
  1691. sub_order_id_list = [item.orderNo for item in sub_orders]
  1692. has_done = True
  1693. if master_order.status == ConsumeRecord.Status.FINISHED:
  1694. logger.debug('master order<{}> has been done.'.format(repr(master_order)))
  1695. has_done = True
  1696. if master_order.status == ConsumeRecord.Status.CREATED:
  1697. has_done = False
  1698. master_order.association = {'sub': sub_order_id_list}
  1699. else:
  1700. if master_order.status == ConsumeRecord.Status.RUNNING and \
  1701. self.event_data['status'] == ConsumeRecord.Status.FINISHED:
  1702. has_done = False
  1703. if set(master_order.association['sub']) != set(sub_order_id_list):
  1704. master_order.association = {'sub': sub_order_id_list}
  1705. has_done = False
  1706. master_order.isNormal = True
  1707. master_order.status = self.event_data['status']
  1708. if not master_order.startTime:
  1709. if 'sts' in self.event_data:
  1710. master_order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
  1711. else:
  1712. master_order.startTime = datetime.datetime.now()
  1713. if 'fts' in self.event_data:
  1714. master_order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
  1715. master_order.save()
  1716. return has_done, master_order, sub_orders
  1717. def merge_order(self, master_order, sub_orders):
  1718. # type:(ConsumeRecord, Iterable[ConsumeRecord])->dict
  1719. raise NotImplementedError('must implement merge_order.')
  1720. def do_finished_event(self, order, sub_orders, merge_order_info):
  1721. # type:(ConsumeRecord,list, Iterable[ConsumeRecord])->None
  1722. raise NotImplementedError('must implement do_finished_event.')
  1723. def deal_running_order(self, master_order, sub_orders):
  1724. # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
  1725. try:
  1726. start_time_str = master_order.startTime.strftime(Const.DATETIME_FMT)
  1727. cache_info = {
  1728. 'port': str(master_order.used_port),
  1729. 'cardId': str(self.card.id),
  1730. 'openId': self.card.openId,
  1731. 'startTime': start_time_str,
  1732. 'isStart': True,
  1733. 'status': Const.DEV_WORK_STATUS_WORKING,
  1734. 'orderNo': master_order.orderNo,
  1735. 'subOrderNos': [item.orderNo for item in sub_orders]
  1736. }
  1737. cache_info.update(self.merge_order(master_order, sub_orders))
  1738. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  1739. if master_order.used_port < Const.NO_PORT:
  1740. current_port_info = current_cache_info.get(str(master_order.used_port), {})
  1741. if not current_cache_info or master_order.orderNo == current_port_info.get('orderNo', None) or current_port_info.get('status', 0) == 0:
  1742. Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
  1743. else:
  1744. if start_time_str > current_port_info.get('startTime', '1970-01-01'):
  1745. Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
  1746. else:
  1747. pass
  1748. else:
  1749. if not current_cache_info:
  1750. Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
  1751. else:
  1752. if master_order.orderNo == current_cache_info.get('orderNo', ''):
  1753. Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
  1754. else:
  1755. if start_time_str > current_cache_info.get('startTime', '1970-01-01'):
  1756. Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
  1757. else:
  1758. pass
  1759. ServiceProgress.new_progress_for_order(order = master_order,
  1760. device = self.device,
  1761. cache_info = cache_info)
  1762. finally:
  1763. self.post_after_start(order=master_order)
  1764. def deal_finished_order(self, master_order, sub_orders):
  1765. # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
  1766. try:
  1767. self.do_finished_event(master_order, sub_orders, self.merge_order(master_order, sub_orders))
  1768. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  1769. if master_order.used_port < Const.NO_PORT:
  1770. current_port_info = current_cache_info.get(str(master_order.used_port))
  1771. if current_port_info and (master_order.orderNo == current_port_info.get('orderNo', '')):
  1772. Device.clear_port_control_cache(devNo = self.device.devNo, port = master_order.used_port)
  1773. else:
  1774. if current_cache_info and (master_order.orderNo == current_cache_info.get('orderNo', '')):
  1775. Device.invalid_device_control_cache(self.device.devNo)
  1776. ServiceProgress.objects(open_id = self.card.openId,
  1777. device_imei = self.device.devNo,
  1778. port = int(master_order.used_port),
  1779. consumeOrder__orderNo = master_order.orderNo).update_one(
  1780. upsert = False,
  1781. **{
  1782. 'isFinished': True,
  1783. 'finished_time': Arrow.fromdatetime(master_order.finishedTime,
  1784. tzinfo = settings.TIME_ZONE).timestamp,
  1785. 'expireAt': datetime.datetime.now()
  1786. })
  1787. finally:
  1788. self.post_after_finish(order=master_order)
  1789. def do_impl(self, **args):
  1790. if 'port' not in self.event_data:
  1791. logger.warn('port is not exist.')
  1792. return
  1793. order_id = self.event_data['order_id']
  1794. order_type = self.event_data['order_type']
  1795. cardType = self.event_data.get('cardType','ID')
  1796. card = self.update_card_dealer_and_type(str(self.event_data['cardNo']), cardType) # type: Card
  1797. if not card or not card.openId or card.frozen:
  1798. logger.debug('card<cardNo={}> is not exists.'.format(self.event_data['cardNo']))
  1799. return
  1800. has_done, master_order, sub_orders = self.get_or_create_order()
  1801. if has_done:
  1802. logger.debug('order<{}> has been done.'.format(repr(master_order)))
  1803. return
  1804. if self.event_data['status'] == 'running':
  1805. self.deal_running_order(master_order, sub_orders)
  1806. if self.event_data['status'] == 'finished':
  1807. self.deal_finished_order(master_order, sub_orders)
  1808. class CardRefundAckEvent(AckEvent):
  1809. def __init__(self, smartBox, event_data, pre_processor):
  1810. super(CardRefundAckEvent, self).__init__(smartBox, event_data, pre_processor)
  1811. if self.pre_processor:
  1812. self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
  1813. def do_impl(self, **args):
  1814. order_no = self.event_data['order_id']
  1815. order = RechargeRecord.objects(orderNo = order_no).first()
  1816. if order:
  1817. logger.debug('refund order<orderNo={}> has been done.'.format(order_no))
  1818. return
  1819. card = self.update_card_dealer_and_type(self.event_data['cardNo'], self.event_data['cardType']) # type: Card
  1820. if not card:
  1821. logger.debug('card<cardNo={}> is not bound.'.format(self.event_data['cardNo']))
  1822. return
  1823. self.refund_money_for_card(RMB(self.event_data['backMoney']), card.id, order_no)