base.py 95 KB


  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import copy
  4. import datetime
  5. import logging
  6. import uuid
  7. from arrow import Arrow
  8. from bson.objectid import ObjectId
  9. from django.conf import settings
  10. from mongoengine.errors import DoesNotExist
  11. from mongoengine.errors import NotUniqueError
  12. from typing import Tuple, Optional, TYPE_CHECKING, Iterable
  13. from apilib.monetary import VirtualCoin, RMB, Ratio
  14. from apilib.utils_string import make_title_from_dict
  15. from apilib.utils_sys import memcache_lock
  16. from apps.web.agent.models import Agent
  17. from apps.web.api.ykt_north.view import custQueryConsume,custquery
  18. from apps.web.south_intf.shangdong_platform import ShanDongNorther
  19. from apps.web.common.transaction import UserConsumeSubType
  20. from apps.web.constant import Const, FAULT_LEVEL, DeviceCmdCode, ErrorCode, DeviceErrorCodeDesc, \
  21. START_DEVICE_STATUS, DEALER_CONSUMPTION_AGG_KIND
  22. from apps.web.core.networking import MessageSender
  23. from apps.web.dealer.define import DEALER_INCOME_SOURCE
  24. from apps.web.dealer.models import Dealer
  25. from apps.web.device.models import Device, FaultRecord, Group, GroupDict, Part, DeviceType
  26. from apps.web.eventer import Event
  27. from apps.web.report.ledger import Ledger
  28. from apps.web.south_intf.platform import notify_event_to_north
  29. from apps.web.user.models import MyUser, RechargeRecord, CardConsumeRecord, CardRechargeRecord, ConsumeRecord, Card, \
  30. CardRechargeOrder, ServiceProgress
  31. from apps.web.user.models import RefundMoneyRecord
  32. from apps.web.user.transaction_deprecated import refund_money, refund_cash
  33. from apps.web.user.utils import freeze_user_balance
  34. from apps.web.utils import set_start_key_status
  35. if TYPE_CHECKING:
  36. from apps.web.core.adapter.base import SmartBox
  37. from apps.web.device.models import DeviceDict
  38. logger = logging.getLogger(__name__)
  39. class WorkEvent(Event):
  40. """
  41. 开始工作或者结束工作的事件
  42. """
  def __init__(self, smartBox, event_data):
      # type:(SmartBox,dict)->None
      """Initialise the event and keep the reported payload.

      :param smartBox: forwarded unchanged to the base ``Event`` initialiser.
      :param event_data: payload reported with this event; stored as ``self.event_data``.
      """
      super(WorkEvent, self).__init__(smartBox)
      self.event_data = event_data
  def do(self, **args):
      """Write this event's payload into the device control cache.

      :param args: accepted for interface compatibility; not read here.
      """
      dev_no = self.device['devNo']
      Device.update_dev_control_cache(dev_no, self.event_data)
  def get_managerialOpenId_by_openId(self, openId):
      """Return the managerial openId of a user inside this device's group.

      :param openId: openId of the user to look up.
      :return: ``managerialOpenId`` of the first ``MyUser`` matching ``openId`` and
          the device's ``groupId``; ``None`` when nothing matches or the lookup raises.
      """
      try:
          user = MyUser.objects.filter(openId = openId, groupId = self.device.groupId).first()
          return user.managerialOpenId if user else None
      except Exception as e:
          # Best-effort lookup: failures are logged and reported as "no user".
          logger.exception('get managerial openid error=%s' % e)
          return None
  59. # 记录充值记录数据。注意需要在卡的余额修改后,然后记录
  def record_refund_money_for_card(self, money, cardId, orderNo = None):
      """Record a refund that was credited back to a card.

      Writes one ``RechargeRecord`` (``via='refund'``) and one
      ``CardRechargeRecord``. Call this only after the card balance has been
      increased: ``preBalance`` is derived as ``card.balance - money``.

      :param money: refunded amount; must expose ``amount`` (presumably an RMB value).
      :param cardId: id of the ``Card`` that received the refund.
      :param orderNo: order number to reuse; a random UUID string is generated when omitted.
      :return: ``False`` when the device's group is missing or the
          ``RechargeRecord`` cannot be saved; ``None`` otherwise.
      """
      card = Card.objects.get(id = cardId)
      groupId = self.device['groupId']
      group = Group.get_group(groupId)
      if group is None:
          return False

      # Add the recharge record.
      payload = {
          'orderNo': orderNo or str(uuid.uuid4()),
          'devNo': self.device['devNo'],
          'logicalCode': self.device['logicalCode'],
          'devTypeName': self.device.devTypeName,
          'ownerId': self.device['ownerId'],
          'groupId': self.device['groupId'],
          'groupName': group['groupName'],
          'groupNumber': self.device['groupNumber'],
          'address': group['address'],
          'openId': card.openId,
          'nickname': card.nickName,
          'money': money,
          'coins': VirtualCoin(money.amount),
          'result': 'success',
          'via': 'refund',
          'attachParas': {'cardNo': card.cardNo}
      }
      record = RechargeRecord(**payload)
      try:
          record.save()
      except Exception as e:
          logger.error('cardNo = %s,save recharge record error=%s' % (card.cardNo, e))
          return False

      newRcd = CardRechargeRecord(
          orderNo = payload.get('orderNo'),
          cardNo=card.cardNo,
          cardId = str(card.id),
          openId = card.openId,
          ownerId = ObjectId(self.device['ownerId']),
          money = money,
          coins = money,
          preBalance = card.balance - money,
          balance = card.balance,
          devNo = self.device['devNo'],
          devTypeCode = self.device['devType']['code'],
          logicalCode = self.device['logicalCode'],
          groupId = self.device['groupId'],
          address = group['address'],
          groupNumber = self.device['groupNumber'],
          groupName = group['groupName'],
          status = 'success',
          rechargeType = 'netpay',
          remarks = u'退币'
      )
      try:
          newRcd.save()
      except Exception as e:
          # The card-level record is best-effort; failure is logged only.
          logger.exception('save card consume rcd error=%s' % e)
  def refund_money_for_card(self, money, cardId, orderNo = None):
      """Credit a refund to a card and record it.

      Increments the stored balance with ``$inc``, writes the refund records,
      then reloads the card and sets ``lastMaxBalance`` to the new balance.

      :param money: amount to credit; must expose ``mongo_amount``.
      :param cardId: id of the card to credit.
      :param orderNo: optional order number passed on to the refund records.
      :return: result of ``card.save()`` on success; ``None`` when the balance
          update or the final save fails.
      """
      # Add the coins back to the card.
      try:
          Card.get_collection().update({'_id': ObjectId(cardId)}, {'$inc': {'balance': money.mongo_amount}})
      except Exception as e:
          logger.error('feedback coins error=%s' % e)
          return None

      self.record_refund_money_for_card(money, cardId, orderNo)

      try:
          card = Card.objects.get(id = cardId)
          card.lastMaxBalance = card.balance
          return card.save()
      except Exception as e:
          logger.info('save error=%s ' % e)
  130. # 用于比如刷卡回收余额的时候,还需要更新上次消费订单的消费数据
  def update_consume_record_refund_money_for_card(self, money, card):
      """Attach a refunded amount to the card's latest un-refunded consume order.

      Used when a card swipe reclaims the remaining balance. The device does not
      report which port the refund belongs to, so the refund is attached to the
      newest successful ``CardConsumeRecord`` that has no ``refundedMoney`` yet,
      and mirrored onto its linked ``ConsumeRecord`` when one exists.

      If no such record exists nothing is updated. Previously an empty result
      raised a swallowed ``NameError``, and a fully refunded history had its
      oldest record overwritten.

      :param money: refunded amount; must expose ``mongo_amount``.
      :param card: card whose consume history is updated.
      """
      try:
          cardRcds = CardConsumeRecord.objects.filter(openId = card.openId, cardId = str(card.id),
                                                      result = 'success').order_by('-dateTimeAdded')
          target = None
          for rcd in cardRcds:
              if 'refundedMoney' not in rcd.servicedInfo:
                  target = rcd
                  break

          if target is None:
              logger.info('no card consume record to attach refund, cardNo=%s' % card.cardNo)
              return

          target.servicedInfo.update({'refundedMoney': money.mongo_amount})
          target.save()

          if not target.linkedConsumeRcdOrderNo:
              return

          consumeRcd = ConsumeRecord.objects.get(orderNo = target.linkedConsumeRcdOrderNo)
          consumeRcd.servicedInfo.update({'refundedMoney': money.mongo_amount})
          consumeRcd.save()
      except Exception as e:
          logger.exception('update_consume_record_refund_money_for_card = %s,cardNo=%s' % (e, card.cardNo))
  150. # 通知充电完成
  def notify_charging_finished(self, managerialOpenId):
      """Send the 'service_complete' notification for a finished charge.

      :param managerialOpenId: recipient; nothing is sent when it is falsy.
      """
      if managerialOpenId:
          group = Group.get_group(self.device['groupId'])
          service_text = u'充电服务(设备编号:%s, 地址:%s)' % (self.device['logicalCode'], group['address'])
          finished_at = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
          self.notify_user(
              managerialOpenId,
              'service_complete',
              title = u'您订购的服务已经完成',
              service = service_text,
              finishTime = finished_at,
              remark = u'谢谢您的支持'
          )
  161. # ID/IC卡扣费。-1:异常错误,0:余额不足 1:正常
  def consume_money_for_card(self, card, money):
      # type:(Card, RMB)->Tuple[int, RMB]
      """Deduct ``money`` from an ID/IC card and persist the new balance.

      The balance is clamped to zero when it is lower than ``money``.

      :param card: card to charge.
      :param money: amount to deduct.
      :return: ``(0, RMB(0))`` when the card is unbound (empty ``openId``) or
          frozen, ``(-1, RMB(0))`` when saving fails, otherwise
          ``(1, <balance after deduction>)``.
      """
      # An unbound or frozen card is rejected up front.
      unusable = card.openId == '' or card.frozen
      if unusable:
          return 0, RMB(0)

      card.balance = RMB(0) if card.balance < money else (card.balance - money)

      try:
          card.save()
      except Exception as err:
          logger.exception(err)
          return -1, RMB(0)
      else:
          return 1, card.balance
  177. # 无值卡,通知扣费
  def notify_balance_has_consume_for_card(self, card, money, desc = u''):
      """Send the 'consume_notify' message for a charge made with a card.

      :param card: charged card; nothing is sent when it is falsy or has no
          ``managerialOpenId`` (the card may not be bound).
      :param money: charged amount, rendered into the message.
      :param desc: extra text appended to the title.
      """
      recipient = card.managerialOpenId if card else None
      if not recipient:
          return

      message = {
          'title': u'亲,您绑定的卡:%s(名称:%s),正在扣费。%s\\n' % (card.cardNo, card.cardName, desc),
          'money': u'%s元\\n' % money,
          'serviceType': u'刷卡消费\\n',
          'finishTime': datetime.datetime.now().strftime(Const.DATETIME_FMT)
      }
      self.notify_user(card.managerialOpenId, 'consume_notify', **message)
  187. # 记录卡的消费
  def record_consume_for_card(self, card, money, desc = u'', servicedInfo = None, sid = None, attachParas = None):
      # type: (Card, RMB, unicode, Optional[dict], Optional[str], Optional[dict])->Tuple[str, str]
      """Insert the consume order and the card consume record for a card swipe.

      :param card: card that paid.
      :param money: charged amount; must expose ``mongo_amount``.
      :param desc: free-text description stored on both documents.
      :param servicedInfo: service details stored on both documents (``{}`` when omitted).
      :param sid: optional value stored as ``sid`` on the card record only.
      :param attachParas: extra parameters stored on the consume order (``{}`` when omitted).
      :return: ``(consume order number, card consume record order number)``.
      """
      if servicedInfo is None:
          servicedInfo = {}
      if attachParas is None:
          attachParas = {}

      group = Group.get_group(self.device['groupId'])
      address = group['address']
      group_number = self.device['groupNumber']

      # TODO: build this through the model like other consume-record writers, and
      # drop the ``time`` field in favour of ``dateTimeAdded`` as other models do.
      created_at = datetime.datetime.now()
      consume_doc = {
          'orderNo': ConsumeRecord.make_no(card.cardNo, UserConsumeSubType.CARD),
          'time': created_at.strftime("%Y-%m-%d %H:%M:%S"),
          'dateTimeAdded': created_at,
          'openId': card.openId,
          'ownerId': self.device['ownerId'],
          'coin': money.mongo_amount,
          'money': money.mongo_amount,
          'devNo': self.device['devNo'],
          'logicalCode': self.device['logicalCode'],
          'groupId': self.device['groupId'],
          'address': address,
          'groupNumber': group_number,
          'groupName': group['groupName'],
          'devTypeCode': self.device.devTypeCode,
          'devTypeName': self.device.devTypeName,
          'isNormal': True,
          'status': ConsumeRecord.Status.RUNNING,
          'remarks': u'刷卡消费',
          'errorDesc': '',
          'sequanceNo': '',
          'desc': desc,
          'attachParas': attachParas,
          'servicedInfo': servicedInfo
      }
      ConsumeRecord.get_collection().insert_one(consume_doc)

      # A card swipe also gets its own card-level record, linked to the order above.
      card_doc = {
          'orderNo': CardConsumeRecord.make_no(),
          'openId': card.openId,
          'cardId': str(card.id),
          'money': money.mongo_amount,
          'balance': card.balance.mongo_amount,
          'devNo': self.device['devNo'],
          'devType': self.device['devType']['name'],
          'logicalCode': self.device['logicalCode'],
          'groupId': self.device['groupId'],
          'address': address,
          'groupNumber': group_number,
          'groupName': group['groupName'],
          'result': 'success',
          'remarks': u'刷卡消费',
          'sequanceNo': '',
          'dateTimeAdded': datetime.datetime.now(),
          'desc': desc,
          'servicedInfo': servicedInfo,
          'linkedConsumeRcdOrderNo': str(consume_doc['orderNo'])
      }
      if sid is not None:
          card_doc.update({'sid': sid})
      CardConsumeRecord.get_collection().insert_one(card_doc)

      return consume_doc['orderNo'], card_doc['orderNo']
  def record_consume_for_coin(self, money, desc=u'', servicedInfo=None, remarks=u'投币或者刷卡消费'):
      # type: (RMB, basestring, Optional[dict], basestring)->str
      """Insert a consume order for a coin payment.

      The order is stored with an empty ``openId``.

      :param money: charged amount; must expose ``mongo_amount``.
      :param desc: free-text description stored on the order.
      :param servicedInfo: service details stored on the order (``{}`` when omitted).
      :param remarks: remark stored on the order.
      :return: order number of the inserted consume order.
      """
      if servicedInfo is None:
          servicedInfo = {}

      group = Group.get_group(self.device['groupId'])
      address = group['address']
      group_number = self.device['groupNumber']

      created_at = datetime.datetime.now()
      consume_doc = {
          'orderNo': ConsumeRecord.make_no(self.device.logicalCode, UserConsumeSubType.COIN),
          'dateTimeAdded': created_at,
          'openId': '',
          'ownerId': self.device['ownerId'],
          'coin': money.mongo_amount,
          'money': money.mongo_amount,
          'devNo': self.device['devNo'],
          'logicalCode': self.device['logicalCode'],
          'groupId': self.device['groupId'],
          'address': address,
          'groupNumber': group_number,
          'groupName': group['groupName'],
          'devTypeCode': self.device.devTypeCode,
          'devTypeName': self.device.devTypeName,
          'isNormal': True,
          'status': ConsumeRecord.Status.RUNNING,
          'remarks': remarks,
          'errorDesc': '',
          'sequanceNo': '',
          'desc': desc,
          'attachParas': {},
          'servicedInfo': servicedInfo
      }
      ConsumeRecord.get_collection().insert_one(consume_doc)

      return consume_doc['orderNo']
  def find_card_by_card_no(self, cardNo):
      """Find a card by number under the agent that owns this device's dealer.

      :param cardNo: card number to look up.
      :return: the matching ``Card``, or ``None`` when the dealer, agent, group or
          card cannot be found or the lookup raises.
      """
      dealer = Dealer.objects(id = self.device.ownerId).first()  # type: Dealer
      if not dealer:
          logger.error('dealer is not found, dealer id = {}'.format(self.device.ownerId))
          return None

      agent = Agent.objects(id = dealer.agentId).first()  # type: Agent
      if not agent:
          logger.error('agent is not found, agentId=%s' % dealer.agentId)
          return None

      group = Group.get_group(self.device.groupId)  # type: GroupDict
      if not group:
          # ``group`` is falsy here, so report the id that was looked up instead
          # of dereferencing the missing group.
          logger.error('group is not found, groupid=%s' % self.device.groupId)
          return None

      try:
          card = Card.objects.get(cardNo = cardNo, agentId = str(agent.id))
      except DoesNotExist:
          return None
      except Exception as e:
          logger.error(e)
          return None

      return card
  def update_card_dealer_and_type(self, cardNo, cardType='ID'):
      """Refresh a bound card's last-used device, binding info and card type.

      :param cardNo: card number to look up among this dealer's bound cards
          (``openId`` not empty).
      :param cardType: card type written to the card.
      :return: the saved ``Card``, or ``None`` when the dealer, group or card
          cannot be found or the lookup raises.
      """
      dealer = Dealer.objects(id=self.device.ownerId).first()  # type: Dealer
      if not dealer:
          logger.error('dealer is not found, dealer id = {}'.format(self.device.ownerId))
          return None

      group = Group.get_group(self.device.groupId)  # type: GroupDict
      if not group:
          # ``group`` is falsy here, so report the id that was looked up instead
          # of dereferencing the missing group.
          logger.error('group is not found, groupId = %s' % self.device.groupId)
          return None

      try:
          card = Card.objects.get(cardNo=cardNo, dealerId=self.device.ownerId, openId__ne='')
      except DoesNotExist:
          logger.error('card is not exist, cardNo=%s' % cardNo)
          return None
      except Exception as e:
          logger.error(e)
          return None
      else:
          # Every swipe refreshes devNo so it names the device that last read the card.
          card.devNo = self.device.devNo
          # Without binding info, bind to this swipe's dealer/group; never updated afterwards.
          if not card.dealerId or not card.groupId:
              card.dealerId = str(dealer.id)
              card.groupId = group.groupId
          # NOTE(review): the original nesting of the cardType assignment is not
          # recoverable from the listing; it is applied on every swipe here - confirm.
          card.cardType = cardType
          card.save()
          return card
  def recharge_id_card(self, card, rechargeType, order):
      # type: (Card, str, CardRechargeOrder)->None
      """Apply a paid recharge order to an ID card.

      Nothing happens when there is no order, when the order carries zero money
      and is not a "sendCoin" order, or when the card is marked busy.

      :param card: card to credit.
      :param rechargeType: not read by this method.
      :param order: recharge order to apply.
      """
      if not order:
          return
      if order.money == RMB(0) and order.rechargeType != "sendCoin":
          return

      if Card.get_card_status(str(card.id)) == 'busy':
          return

      # The busy flag acts as the lock; it is always reset to idle below.
      card.__class__.set_card_status(str(card.id), 'busy')
      try:
          # Store the change of the total balance on the recharge order.
          new_balance = card.balance + order.coins
          old_balance = card.balance

          # Mark the recharge order as completed.
          order.update_after_recharge_id_card(self.device, new_balance, old_balance)

          group = Group.get_group(order.groupId)
          CardRechargeRecord.add_record(card=card, group=group, order=order, device=self.device)

          # Refresh the balance held on the card.
          card.recharge(order.chargeAmount, order.bestowAmount)
          card.account_recharge(order.rechargeOrder)
      except Exception as err:
          logger.exception(err)
          return
      finally:
          card.__class__.set_card_status(str(card.id), 'idle')
  def recharge_ic_card(self, card, preBalance, rechargeType, order, need_verify = True):
      # type:(Card, RMB, str, CardRechargeOrder, bool)->bool
      """Send a recharge to the device so it writes the balance onto an IC card.

      ``rechargeType`` is either ``'overwrite'`` (the device is sent
      ``preBalance + order.coins``) or an append mode (the device is sent
      ``order.coins``); devices differ in which one they need. ``money`` on the
      order is what the user paid, ``coins`` is what is credited (for example
      pay 10, credit 15).

      :param card: card being recharged.
      :param preBalance: balance on the card before this recharge.
      :param rechargeType: ``'overwrite'`` or append mode, see above.
      :param order: recharge order to apply.
      :param need_verify: when true, the last operation log entry is checked first
          to decide whether an earlier attempt already succeeded.
      :return: ``True`` only when the device reports success and the card is
          saved; ``False`` in every other case.
      """
      if not order or order.coins == RMB(0):
          return False

      status = Card.get_card_status(str(card.id))
      if status == 'busy':
          return False

      Card.set_card_status(str(card.id), 'busy')
      try:
          # An IC card is written by the device, so the amount is sent down to it.
          if rechargeType == 'overwrite':
              sendMoney = preBalance + order.coins
          else:
              sendMoney = order.coins

          # Decide whether the order's latest attempt actually succeeded. It is
          # treated as successful when that attempt ended in a connection failure
          # or UART timeout AND either more than three days have passed or the
          # current balance exceeds the balance recorded before that attempt.
          if need_verify and len(order.operationLog) > 0:
              log = order.operationLog[-1]
              result = log['result']
              time_delta = (datetime.datetime.now() - log['time']).total_seconds()
              last_pre_balance = RMB(log['preBalance'])
              if (result == ErrorCode.DEVICE_CONN_FAIL or result == ErrorCode.BOARD_UART_TIMEOUT) \
                      and (time_delta > 3 * 24 * 3600 or preBalance > last_pre_balance):
                  logger.debug('{} recharge verify result is finished.'.format(repr(card)))
                  order.update_after_recharge_ic_card(device = self.device,
                                                      sendMoney = sendMoney,
                                                      preBalance = preBalance,
                                                      result = ErrorCode.SUCCESS,
                                                      description = u'充值校验结束')
                  CardRechargeRecord.add_record(
                      card = card,
                      group = Group.get_group(order.groupId),
                      order = order,
                      device = self.device)
                  # NOTE(review): nesting of this return was reconstructed from a
                  # flattened listing - confirm it belongs to the verified branch.
                  return False

          # Bound before the device call so the failure handler can always log it;
          # previously a raising recharge_card left ``balance`` undefined and the
          # failure was never written to the order.
          # NOTE(review): assumes update_after_recharge_ic_card accepts
          # syncBalance=None - confirm.
          balance = None
          try:
              operation_result, balance = self.deviceAdapter.recharge_card(card.cardNo, sendMoney,
                                                                            orderNo = order.orderNo)
              order.update_after_recharge_ic_card(device = self.device,
                                                  sendMoney = sendMoney,
                                                  preBalance = preBalance,
                                                  syncBalance = balance,
                                                  result = operation_result['result'],
                                                  description = operation_result['description'])
              if operation_result['result'] != ErrorCode.SUCCESS:
                  return False

              if not balance:
                  balance = preBalance + order.coins

              CardRechargeRecord.add_record(
                  card = card,
                  group = Group.get_group(order.groupId),
                  order = order,
                  device = self.device)

              # Refresh the balance held on the card.
              card.balance = balance
              card.lastMaxBalance = balance
              card.save()
              return True
          except Exception as e:
              order.update_after_recharge_ic_card(device = self.device,
                                                  sendMoney = sendMoney,
                                                  preBalance = preBalance,
                                                  syncBalance = balance,
                                                  result = ErrorCode.EXCEPTION,
                                                  description = e.message)
              return False
      except Exception as e:
          logger.exception(e)
          return False
      finally:
          Card.set_card_status(str(card.id), 'idle')
  def recharge_ic_card_realiable(self, card, preBalance, rechargeType, order, need_verify = True):
      # type:(Card, RMB, str, CardRechargeOrder, bool)->bool
      """Drive an IC-card recharge through the order's processing log.

      ``rechargeType`` is either ``'overwrite'`` (the device is sent
      ``preBalance + order.coins``) or an append mode (the device is sent
      ``order.coins``); devices differ in which one they need. ``money`` on the
      order is what the user paid, ``coins`` is what is credited (for example
      pay 10, credit 15).

      The whole operation runs under a memcache lock keyed by device number and
      order id. ``need_verify`` is not read by this method.

      NOTE(review): block nesting was reconstructed from a flattened listing -
      confirm against the original file.

      :param card: card being recharged.
      :param preBalance: balance on the card before this recharge.
      :param rechargeType: ``'overwrite'`` or append mode, see above.
      :param order: recharge order to apply.
      :return: ``False`` when there is no order or it credits nothing; the
          device adapter's result when a recharge command is sent; ``None`` on
          every other path (lock not acquired, order already finished, previous
          attempt verified as done, or an exception).
      """
      if not order or order.coins == RMB(0):
          return False

      with memcache_lock('{}-{}-{}'.format(self.device.devNo, str(order.id), 'ic_recharge'), value = '1',
                         expire = 300) as acquired:
          if not acquired:
              logger.debug('order<{}> is doing.'.format(repr(order)))
              return

          try:
              if order.status == 'finished':
                  logger.debug('order<{}> has been finished.'.format(repr(order)))
                  return

              # An IC card is written by the device, so the amount is sent down to it.
              if rechargeType == 'overwrite':
                  sendMoney = preBalance + order.coins
              else:
                  sendMoney = order.coins

              # First attempt: start a processing log and send the command.
              if not order.processingLog:
                  order.init_processing_log(device = self.device,
                                            sendMoney = sendMoney,
                                            preBalance = preBalance)
                  order.save()
                  return self.deviceAdapter.recharge_ic_card_realiable(card.cardNo, sendMoney, order.orderNo)

              log = copy.deepcopy(order.processingLog)
              result = log.get('result', None)
              if result == ErrorCode.IC_RECHARGE_FAIL:
                  # The previous attempt failed: archive it (when its result is
                  # truthy) and try again.
                  if result:
                      order.operationLog.append(log)

                  order.init_processing_log(device = self.device,
                                            sendMoney = sendMoney,
                                            preBalance = preBalance)
                  order.save()
                  return self.deviceAdapter.recharge_ic_card_realiable(card.cardNo, sendMoney, order.orderNo)
              else:
                  # Decide whether the previous attempt actually succeeded: it is
                  # treated as done when more than three days have passed or the
                  # current balance exceeds the balance recorded before it.
                  time_delta = (datetime.datetime.now() - log['time']).total_seconds()
                  last_pre_balance = RMB(log['preBalance'])
                  if (time_delta > 3 * 24 * 3600 or preBalance > last_pre_balance):
                      logger.debug('{} recharge verify result is finished.'.format(repr(card)))
                      log.update({
                          'result': ErrorCode.SUCCESS,
                          'description': u'充值校验结束'
                      })
                      order.operationLog.append(log)
                      order.processingLog = {}
                      order.status = 'finished'
                      order.save()

                      CardRechargeRecord.add_record(
                          card = card,
                          group = Group.get_group(order.groupId),
                          order = order,
                          device = self.device)
                  else:
                      # Not verified as recharged: archive the attempt and resend.
                      logger.debug('verify not recharge order<{}>'.format(repr(order)))
                      order.operationLog.append(log)
                      order.init_processing_log(device = self.device,
                                                sendMoney = sendMoney,
                                                preBalance = preBalance)
                      order.save()
                      return self.deviceAdapter.recharge_ic_card_realiable(card.cardNo, sendMoney, order.orderNo)
          except Exception as e:
              logger.exception(e)
  def update_card_recharge_for_success_event(self, orderId, balance):
      """Finalise a card recharge order after the device reports success.

      Stamps the linked recharge record with this device's location, adds a
      ``CardRechargeRecord``, stores the new balance on the card and marks the
      order finished. Each of the three saves is best-effort and only logged
      on failure.

      :param orderId: id of the ``CardRechargeOrder``.
      :param balance: card balance after the recharge.
      """
      logger.info('update_card_recharge_for_success_event,orderId=%s' % orderId)

      order = CardRechargeOrder.objects.get(id = ObjectId(orderId))
      # Double check: an event may already be in flight before the device ack arrives.
      if order.status == 'finished':
          return

      card = Card.objects.get(id = order.cardId)
      group = Group.get_group(self.device['groupId'])

      location_fields = {
          'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
          'groupId': self.device['groupId'],
          'devNo': self.device['devNo'],
          'logicalCode': self.device['logicalCode'],
          'ownerId': self.device['ownerId'],
          'groupName': group['groupName'],
          'groupNumber': self.device['groupNumber'],
          'address': group['address'],
          'devType': self.device.get('devType', {}).get('name', ''),
      }
      RechargeRecord.get_collection().update(
          {'_id': ObjectId(order.rechargeNo)},
          {'$set': location_fields},
          multi = True
      )

      # Also add a record of the successful card recharge.
      card_record = CardRechargeRecord(
          cardId = str(card.id),
          cardNo = card.cardNo,
          openId = card.openId,
          ownerId = ObjectId(self.device['ownerId']),
          money = order.money,
          coins = order.coins,
          balance = balance,
          preBalance = card.balance,
          devNo = self.device['devNo'],
          devTypeCode = self.device['devType']['code'],
          logicalCode = self.device['logicalCode'],
          groupId = self.device['groupId'],
          address = group['address'],
          groupNumber = self.device['groupNumber'],
          groupName = group['groupName'],
          status = 'success',
          rechargeType = 'netpay'
      )
      try:
          card_record.save()
      except Exception as err:
          logger.exception('save card consume rcd error=%s' % err)

      # Refresh the balance held on the card.
      try:
          card.balance = balance
          card.lastMaxBalance = balance
          card.save()
      except Exception as err:
          logger.exception(err)

      try:
          order.status = 'finished'
          order.rechargeType = 'netpay'
          order.save()
      except Exception as err:
          logger.exception(err)
  589. # 更新收录有值卡
  def update_card_balance(self, card, balance):
      """Store ``balance`` on ``card`` and save it; failures are only logged.

      :param card: card to update.
      :param balance: new balance value.
      """
      try:
          setattr(card, 'balance', balance)
          card.save()
      except Exception as err:
          logger.exception(err)
  596. # 根据消耗的电量计算当前地址下的本次消耗电费
  def calc_elec_fee(self, spendElec):
      """Return the electricity cost of ``spendElec`` at this device's group.

      :param spendElec: consumed electricity.
      :return: ``float(elecFee) * spendElec``, where ``elecFee`` is read from the
          group's ``otherConf``.
      """
      group = Group.objects.get(id = self.device['groupId'])
      unit_price = float(group.otherConf.get('elecFee'))
      return unit_price * spendElec
  def calc_service_fee(self, fee, elec_charge, service_charge):
      """Split ``fee`` into an electricity part and a service part.

      :param fee: total fee to split.
      :param elec_charge: electricity weight.
      :param service_charge: service weight.
      :return: ``(elecFee, serviceFee)``; the electricity part is
          ``fee * elec_charge / (elec_charge + service_charge)`` and the service
          part is the remainder.
      """
      total_charge = float(elec_charge + service_charge)
      elec_part = fee * Ratio(elec_charge) * Ratio(1 / total_charge)
      return elec_part, (fee - elec_part)
  def time_ratio_pricing(self, value, leftTime, actualNeedTime):
      """Scale ``value`` by ``leftTime / actualNeedTime``.

      :param value: amount to scale.
      :param leftTime: numerator of the ratio.
      :param actualNeedTime: denominator of the ratio.
      :return: ``value * Ratio(leftTime) * Ratio(1 / float(actualNeedTime))``.
      """
      inverse_needed = Ratio(1 / float(actualNeedTime))
      return value * Ratio(leftTime) * inverse_needed
  def get_backCoins(self, coins, leftTime, actualNeedTime):
      """Return ``coins`` scaled by ``leftTime / actualNeedTime``.

      Delegates to ``time_ratio_pricing``.
      """
      pricing_args = {'value': coins, 'leftTime': leftTime, 'actualNeedTime': actualNeedTime}
      return self.time_ratio_pricing(**pricing_args)
  def get_backMoney(self, money, leftTime, actualNeedTime):
      """Return ``money`` scaled by ``leftTime / actualNeedTime``.

      Delegates to ``time_ratio_pricing``.
      """
      pricing_args = {'value': money, 'leftTime': leftTime, 'actualNeedTime': actualNeedTime}
      return self.time_ratio_pricing(**pricing_args)
  def generate_service_complete_title_by_devType(self, devTypeId, templateMap):
      """Render the device type's "service finished" message template.

      Only the entries of ``templateMap`` whose key is listed in the template's
      ``keys`` are passed to ``str.format``. The caller's dict is left untouched;
      previously the unlisted keys were popped from it while iterating.

      :param devTypeId: id of the ``DeviceType`` holding the template.
      :param templateMap: values available to the template.
      :return: the rendered text, or ``''`` when the device type is missing, has
          no template, or rendering raises.
      """
      try:
          devType = DeviceType.objects(id=devTypeId).first()
          if devType is None:
              return ''

          template_conf = devType.finishedMessageTemplateDict
          if template_conf == {}:
              return ''

          allowed_keys = template_conf['keys']
          values = dict((key, value) for key, value in templateMap.items() if key in allowed_keys)
          return template_conf['template'].format(**values)
      except Exception as e:
          logger.error(e)
          return ''
  def notify_user_service_complete(self, service_name, openid, port, address, reason, finished_time, url = None, extra = None):
      # type: (basestring, str, str, str, str, str, str, list)->None
      """Send the 'service_complete' notification for a finished service.

      :param service_name: service name shown in the message.
      :param openid: recipient.
      :param port: device port; when falsy the port line is left out.
      :param address: device address shown in the title.
      :param reason: finish reason; appended to the title when truthy.
      :param finished_time: finish time text; the current time is used when falsy.
      :param url: link attached to the message; defaults to the user consume page.
      :param extra: extra title entries inserted before the finish reason.
      """
      title_list = [
          {u'': u'{}结束,感谢您的使用!'.format(service_name)},
          {u'设备编号': self.device.logicalCode}
      ]
      if port:
          title_list.append({u'设备端口': port})
      title_list.append({u'设备地址': address})

      if extra:
          title_list.extend(extra)

      if reason:
          title_list.append({u'结束原因': reason})

      dealer = self.device.owner  # type: Dealer
      remark = (u'客服联系电话:{}'.format(dealer.service_phone)
                if dealer.showServicePhone
                else u'我们竭诚为您服务,有任何问题请联系客服!')

      finished_time = finished_time or datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

      agent = Agent.objects.filter(id=str(dealer.agentId)).first()
      if not agent:
          logger.warning('agent<id={}> is not exists.'.format(str(dealer.agentId)))
          return

      # TODO: WeChat subscription template.
      if agent.customizedUserSubGzhAllowable:
          # The service text is shortened step by step while it exceeds 20 characters.
          if port:
              candidates = [
                  u"{}服务({}-端口{})".format(self.device.majorDeviceType, self.device.logicalCode, port),
                  u'{}服务({})'.format(self.device.majorDeviceType, self.device.logicalCode),
                  u'{}服务'.format(self.device.majorDeviceType),
              ]
          else:
              candidates = [
                  u'{}服务({})'.format(service_name, self.device.logicalCode),
                  u'{}服务'.format(self.device.majorDeviceType),
              ]
          service = candidates[-1]
          for candidate in candidates[:-1]:
              if len(candidate) <= 20:
                  service = candidate
                  break

          kw = {
              'service': service,
              'finishTime': finished_time,
              'remark': remark,
          }
      else:
          kw = {
              'title': make_title_from_dict(title_list),
              'service': u'{}服务'.format(service_name),
              'finishTime': finished_time,
              'remark': remark
          }

      kw['url'] = url or settings.SERVER_END_BASE_SSL_URL + '/user/index.html?v=1.0.31#/user/consume'
      self.notify_user(openid,'service_complete', **kw)
  def notify_to_sd_norther(self, portStr, consumeDict, platform=ShanDongNorther.platform.default):  # type:(str, dict, int) -> None
      """Report a finished order to the Shandong provincial or Jinan static platform.

      Both platforms share one protocol on different servers. Any failure is
      logged at info level and swallowed.

      :param portStr: part number of the port the order ran on.
      :param consumeDict: order data; ``startTime``, ``finishedTime``, ``elec`` and
          ``spendMoney`` are read, each with a default.
      :param platform: which norther platform to look up.
      """
      try:
          part = Part.objects.filter(logicalCode=self.device.logicalCode, partNo=portStr).first()
          dealer = self.device.owner
          if not (part and dealer):
              raise Exception("no part or no dealer")

          # Active reporting: exactly one norther is expected for this dealer.
          norther = ShanDongNorther.get_norther(dealerId=self.device.ownerId, platform=platform).first()  # type: ShanDongNorther
          if not norther:
              return

          time_fmt = "%Y-%m-%d %H:%M:%S"
          start_time = consumeDict.get("startTime", datetime.datetime.now().strftime(time_fmt))
          end_time = consumeDict.get("finishedTime", datetime.datetime.now().strftime(time_fmt))
          northerDict = {
              "connectorId": str(part.id),
              "startTime": start_time,
              "endTime": end_time,
              "totalPower": consumeDict.get("elec", 0),
              "totalElecMoney": consumeDict.get("spendMoney", 0),
          }

          logger.info("northerId:{}, devNo:{}, dealerId:{} norther to sd infoDict = {}".format(
              str(norther.id),
              self.device.devNo,
              self.device.ownerId,
              northerDict)
          )

          # TODO: move the per-platform call into the model so norther exposes one notify interface.
          send = norther.notification_order_info if norther.isSdPlatform else norther.notification_order_info_jn
          send(northerDict)
      except Exception as err:
          logger.info(err)
  def query_to_ykt_norther(self,cardNo):
      """Query card information through ``custquery``.

      :param cardNo: card number to query.
      :return: ``{'code', 'message', 'cardInfo'}`` for result codes ``000000``
          (card info filled in), ``999999`` and ``555555`` (empty card info);
          ``None`` for any other code.
      """
      response = custquery(cardNo)
      code = response['resultCode']

      if code == u'000000':
          detail = response['result']
          cardInfo = {
              "customerId": detail['customerId'],
              "userName": detail['userName'],
              "userCode": detail['userCode'],
              "oddFare": detail['oddFare'],
              "custStatus": detail['custStatus'],
              "cardNo": cardNo,
          }
          return {'code': code, 'message': response['resultMsg'], 'cardInfo': cardInfo}

      if code == u'999999':
          return {'code': code, 'message': response['resultMsg'], 'cardInfo': {}}

      if code == u'555555':
          return {'code': code, 'message': '刷卡异常', 'cardInfo': {}}
  def notify_to_ykt_norther(self,cardSnr,orderNo,tradeFare,tradeDate):
      """Report a card swipe through ``custQueryConsume``.

      The push is attempted at most three times and stops at the first response
      whose ``resultCode`` is ``"000000"``.

      :return: ``resultCode``, ``resultMsg``, ``orderNo`` and ``oddFare`` taken
          from the last response.
      """
      max_attempts = 3
      for _ in range(max_attempts):
          result = custQueryConsume(cardSnr,orderNo,tradeFare,tradeDate)
          if result.get('resultCode') == "000000":
              break

      wanted = ('resultCode', 'resultMsg', 'orderNo', 'oddFare')
      return dict((field, result.get(field)) for field in wanted)
  def refund_net_pay(self, user, lineInfo, refundedMoney, refundCoins, consumeDict, is_cash):
      # type: (MyUser, dict, RMB, VirtualCoin, dict, bool)->None
      """Refund an online payment, as cash against recharge records or as coins.

      When recharge record ids are collected, ``refundedMoney`` is handed back
      through ``refund_cash``, newest recharge record first; otherwise
      ``refundCoins`` is handed back through ``refund_money``. The refunded
      totals are written into ``consumeDict``.

      NOTE(review): block nesting was reconstructed from a flattened listing -
      confirm against the original file.

      :param user: user passed to ``refund_cash``.
      :param lineInfo: order line; ``rechargeRcdId``, ``payInfo`` and ``openId`` are read.
      :param refundedMoney: cash amount to refund.
      :param refundCoins: coin amount to refund.
      :param consumeDict: updated in place with the refunded cash or coins.
      :param is_cash: when true, recharge record ids are collected from ``lineInfo``.
      """
      logger.debug(
          'refund for net pay. user = {}, lineInfo = {}, refundedMoney = {}, refundCoins = {}, is_cash = {}'.format(
              repr(user), lineInfo, refundedMoney, refundCoins, is_cash))

      refund_recharge_ids = []

      if is_cash:
          if 'rechargeRcdId' in lineInfo:
              refund_recharge_ids.append(lineInfo['rechargeRcdId'])
          else:
              pay_info = lineInfo.get('payInfo', list())
              for item in pay_info:
                  if 'rechargeRcdId' in item:
                      refund_recharge_ids.append(item['rechargeRcdId'])

      if refundCoins <= VirtualCoin(0):
          # Only cash can be refunded in this case.
          consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.REFUNDED_CASH: RMB(0).mongo_amount})

      if len(refund_recharge_ids) > 0:
          logger.info('need refund cash, ids = %s' % str(refund_recharge_ids))

          if refundedMoney <= RMB(0):
              consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.REFUNDED_CASH: RMB(0).mongo_amount})
              return

          left_refund = refundedMoney
          all_refund = RMB(0)
          # Walk the recharge records in reverse collection order.
          for recharge_record_id in refund_recharge_ids[::-1]:
              recharge_record = RechargeRecord.objects(id = str(recharge_record_id)).first()  # type: RechargeRecord
              if not recharge_record:
                  logger.error('not find recharge record {}'.format(recharge_record_id))
                  continue

              # Skip records that belong to a different openId than the order line.
              if recharge_record.openId != lineInfo['openId']:
                  logger.error('is not my record. {} != {}'.format(recharge_record.openId, lineInfo['openId']))
                  continue

              this_refund = min(recharge_record.money, left_refund)
              if this_refund <= RMB(0):
                  continue

              logger.debug('try to refund money {} for recharge record {}'.format(this_refund, repr(recharge_record)))

              try:
                  # NOTE(review): the amount is counted before refund_cash runs, so a
                  # failed refund is still included in all_refund - confirm intended.
                  all_refund += this_refund
                  refund_order = refund_cash(
                      recharge_record, this_refund, VirtualCoin(0), user = user)  # type: RefundMoneyRecord
                  if not refund_order:
                      logger.error(
                          'refund money<{}> failure. recharge = {}'.format(this_refund, repr(recharge_record)))
              except Exception as e:
                  logger.exception(e)

              left_refund = left_refund - this_refund
              if left_refund <= RMB(0):
                  break

          if all_refund > RMB(0):
              consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.REFUNDED_CASH: all_refund.mongo_amount})
      else:
          if refundCoins > VirtualCoin(0):
              consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.REFUNDED_COINS: refundCoins.mongo_amount})
              refund_money(self.device, refundCoins, lineInfo['openId'])
  802. class FaultEvent(Event):
  803. def __init__(self, smartBox, event_data):
  804. # type:(SmartBox,dict)->None
  805. super(FaultEvent, self).__init__(smartBox)
  806. self.event_data = event_data
  807. def is_notify_dealer(self):
  808. if self.dealer:
  809. return self.dealer.devFaultPushDealerSwitch
  810. else:
  811. return False
  812. def is_notify_user(self):
  813. if self.dealer:
  814. return self.dealer.devFaultPushUserSwitch
  815. else:
  816. return False
  817. def record(self, faultCode = '', description = None, title = '', detail = None, level = FAULT_LEVEL.NORMAL, portNo = 0):
  818. # 故障记录入库
  819. if self.dealer is not None:
  820. dealerId = self.dealer.id
  821. else:
  822. dealerId = None
  823. logicalCode = self.device['logicalCode']
  824. now = datetime.datetime.now()
  825. created_within_five_minutes = now - datetime.timedelta(minutes = 5)
  826. # 这个应该是防止重复报
  827. try:
  828. fault_record = FaultRecord.objects(
  829. status="init",
  830. faultCode = faultCode,
  831. title = title,
  832. description = description,
  833. logicalCode = logicalCode,
  834. portNo = portNo,
  835. createdTime__gt = created_within_five_minutes
  836. ).first()
  837. if not fault_record:
  838. group = Group.get_group(self.device['groupId'])
  839. fault_record = FaultRecord(
  840. logicalCode = logicalCode,
  841. imei = self.device['devNo'],
  842. portNo = portNo,
  843. faultCode = faultCode,
  844. title = title,
  845. description = description or self.event_data.get('statusInfo', ''),
  846. groupName = group.get('groupName', ''),
  847. address = group.get('address', '')
  848. )
  849. if dealerId is not None:
  850. fault_record.dealerId = dealerId
  851. if detail is not None:
  852. if not isinstance(detail, dict):
  853. raise TypeError('detail has to be a dict')
  854. fault_record.detail = detail
  855. fault_record = fault_record.save()
  856. else:
  857. logger.debug('%r existed within 5 minutes' % (fault_record,))
  858. return fault_record
  859. except NotUniqueError as e:
  860. logger.error('cannot insert FaultRecord, dev=%s, error=%s' % (self.device['devNo'], e))
  861. def do(self, **args):
  862. """不要将所有的信息不加筛选的打入缓存"""
  863. group = self.device.group
  864. # 通知经销商
  865. if self.is_notify_dealer():
  866. titleDictList = [
  867. {u'告警名称': self.event_data.get("faultName", u"设备告警")},
  868. {u'设备编号': self.device["logicalCode"]},
  869. {u'地址名称': group["groupName"]}
  870. ]
  871. self.notify_dealer('device_fault', **{
  872. 'title': make_title_from_dict(titleDictList),
  873. 'device': u'%s号设备-%s端口\\n' % (
  874. self.device['groupNumber'], self.event_data.get('port')) if self.event_data.has_key(
  875. 'port') else u'%s号设备\\n' % self.device['groupNumber'],
  876. 'faultType': u'%s\\n' % self.event_data['faultType'] if self.event_data.has_key(
  877. 'faultType') else u'设备告警\\n',
  878. 'notifyTime': u'%s\\n' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
  879. 'fault': u'%s(详细情况,建议您到管理后台的告警模块进行查看)\\n' % self.event_data['desc'] if self.event_data.has_key(
  880. 'desc') else self.event_data.get('statusInfo', ''),
  881. })
  882. # 通知用户
  883. if self.is_notify_user():
  884. if self.device.has_key('openId'):
  885. user = MyUser.objects(openId = self.device.get('openId'), groupId = self.device['groupId']).first()
  886. self.notify_user(user.managerialOpenId if user else '', 'device_fault',
  887. **{
  888. 'title': u'注意!注意!您正在使用的设备发生了故障,设备编码:%s' % self.device['logicalCode'],
  889. 'fault': self.event_data['statusInfo'],
  890. 'notifyTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  891. })
  892. # 写入故障记录
  893. faultRecord = self.record(
  894. faultCode = self.event_data.get('faultCode', ''),
  895. description = self.event_data.get('desc', None),
  896. title = self.event_data.get('faultName', ''),
  897. detail = self.event_data.get('detail', None),
  898. level = self.event_data.get('level', FAULT_LEVEL.NORMAL),
  899. portNo = self.event_data.get('port', 0)
  900. )
  901. # 梁溪的告警
  902. if faultRecord:
  903. self.north_to_liangxi(faultRecord)
  904. notify_event_to_north(
  905. self.dealer,
  906. self.device,
  907. level = Const.EVENT_NORMAL,
  908. desc = self.event_data['desc'] if self.event_data.has_key('desc') else self.event_data.get('statusInfo', '')
  909. )
  910. def north_to_liangxi(self, faultRecord):
  911. from taskmanager.mediator import task_caller
  912. task_caller(
  913. 'send_to_xf_falut',
  914. devNo=self.device.devNo,
  915. faultId=str(faultRecord.id)
  916. )
  917. def time_ratio_pricing(self, value, leftTime, actualNeedTime):
  918. return value * Ratio(leftTime) * Ratio(1 / float(actualNeedTime))
  919. def get_backCoins(self, coins, leftTime, actualNeedTime):
  920. return self.time_ratio_pricing(value = coins, leftTime = leftTime, actualNeedTime = actualNeedTime)
  921. def get_backMoney(self, money, leftTime, actualNeedTime):
  922. return self.time_ratio_pricing(value = money, leftTime = leftTime, actualNeedTime = actualNeedTime)
  923. class AckEventProcessorIntf(object):
  924. def pre_processing(self, device, event_data):
  925. # type:(DeviceDict, dict)->dict
  926. return event_data
  927. class AckEvent(WorkEvent):
  928. """
  929. 目前设计为不能重入. 如果执行有异常, 肯定是代码问题
  930. """
  931. def __init__(self, smartBox, event_data, pre_processor = None):
  932. # type:(SmartBox, dict, AckEventProcessorIntf)->None
  933. super(AckEvent, self).__init__(smartBox, event_data)
  934. self.pre_processor = pre_processor
  935. def ack_msg(self):
  936. if self.event_data['status'] == 'finishing':
  937. logger.debug('finishing status is not need to ack.')
  938. return
  939. payload = {
  940. 'order_id': self.event_data['order_id'],
  941. 'order_type': self.event_data['order_type'],
  942. 'status': self.event_data['status']
  943. }
  944. # if response:
  945. # payload.update(response)
  946. MessageSender.send_no_wait(device = self.device,
  947. cmd = DeviceCmdCode.EVENT_ACK,
  948. payload = payload)
  949. def do_impl(self, **args):
  950. raise Exception('must implement do_impl interface.')
  951. def do(self, **args):
  952. if 'order_id' not in self.event_data or 'order_type' not in self.event_data:
  953. logger.error('order id or type is null.')
  954. return
  955. order_id = self.event_data['order_id']
  956. order_type = self.event_data['order_type']
  957. with memcache_lock('{}-{}-{}'.format(self.device.devNo, order_id, order_type), value = '1',
  958. expire = 300) as acquired:
  959. if not acquired:
  960. logger.debug('ack message<{}-{}-{}> is duplicate.'.format(self.device.devNo, order_id, order_type))
  961. return
  962. try:
  963. self.do_impl(**args)
  964. except Exception as e:
  965. import traceback
  966. logger.warning(traceback.format_exc())
  967. if 'status' in self.event_data:
  968. self.ack_msg()
  969. class ComNetPayAckEvent(AckEvent):
  970. def post_before_start(self, order=None):
  971. pass
  972. def post_after_start(self, order=None):
  973. raise NotImplementedError('must implement post_after_start.')
  974. def post_before_finish(self, order=None):
  975. pass
  976. def post_after_finish(self, order=None):
  977. raise NotImplementedError('must implement post_after_finish.')
  978. def do_finished_event(self, order, sub_orders, merge_order_info):
  979. raise NotImplementedError('must implement do_finished_event.')
  980. def do_running_order(self, order, result):
  981. # type: (ConsumeRecord, dict)->None
  982. self.post_before_start(order)
  983. if order.status in ['running', 'finished']:
  984. logger.debug('order<{}> no need to ack. this has done.'.format(repr(order)))
  985. return
  986. if order.status == 'unknown':
  987. dealer = self.device.owner
  988. # 设备支持上报告扣款
  989. if 'device_callback_to_freeze_balance' in dealer.features:
  990. freeze_user_balance(self.device, Group.get_group(order.groupId), order)
  991. err_desc = u'设备启动成功,事件上报成功,补充扣款'
  992. else:
  993. raise Exception('Invalid operation!')
  994. else:
  995. set_start_key_status(start_key=order.startKey,
  996. state=START_DEVICE_STATUS.FINISHED,
  997. order_id=str(order.id))
  998. err_desc = ''
  999. if 'master' in result:
  1000. order.association = {
  1001. 'master': result['master']
  1002. }
  1003. order.servicedInfo.update({'masterOrderNo': result['master']})
  1004. order.errorDesc = err_desc
  1005. order.isNormal = True
  1006. order.status = 'running'
  1007. order.startTime = datetime.datetime.fromtimestamp(result['sts'])
  1008. order.save()
  1009. self.post_after_start(order=order)
  1010. def deal_running_event(self, order):
  1011. # type: (ConsumeRecord)->None
  1012. if 'sub' in self.event_data:
  1013. order.association = {
  1014. 'sub': [item['order_id'] for item in self.event_data['sub']]
  1015. }
  1016. order.servicedInfo.update(
  1017. {'subOrderNo': '{}'.format(', '.join([item['order_id'] for item in self.event_data['sub']]))})
  1018. order.save()
  1019. self.do_running_order(order, self.event_data)
  1020. sub_order_map = {}
  1021. for item in self.event_data['sub']:
  1022. item['master'] = self.event_data['order_id']
  1023. item['sts'] = self.event_data['sts']
  1024. sub_order_map[item['order_id']] = item
  1025. sub_orders = [_ for _ in ConsumeRecord.objects.filter(orderNo__in = sub_order_map.keys())]
  1026. for sub_order in sub_orders:
  1027. self.do_running_order(sub_order, sub_order_map[sub_order.orderNo])
  1028. else:
  1029. self.do_running_order(order, self.event_data)
  1030. sub_orders = []
  1031. account_info = self.merge_order(order, sub_orders)
  1032. cache_info = {
  1033. 'startTime': Arrow.fromdatetime(order.startTime, tzinfo = settings.TIME_ZONE).format('YYYY-MM-DD HH:mm:ss'),
  1034. 'status': Const.DEV_WORK_STATUS_WORKING,
  1035. 'isStart': True,
  1036. 'openId': order.openId,
  1037. 'orderNo': order.orderNo,
  1038. 'port': order.used_port
  1039. }
  1040. if sub_orders:
  1041. cache_info.update({
  1042. 'subOrderNos': [_.orderNo for _ in sub_orders]
  1043. })
  1044. cache_info.update(account_info)
  1045. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  1046. if order.used_port < Const.NO_PORT:
  1047. current_port_info = current_cache_info.get(str(order.used_port))
  1048. if not current_port_info or not current_cache_info.get("orderNo") or (order.orderNo == current_port_info.get('orderNo', None)) or current_port_info.get('status', 0) == 0:
  1049. Device.update_port_control_cache(self.device.devNo, cache_info, updateType='overwrite')
  1050. else:
  1051. if not current_cache_info or not current_cache_info.get("orderNo") (order.orderNo == current_cache_info.get('orderNo', None)) or current_cache_info.get('status', 0) == 0:
  1052. Device.update_dev_control_cache(self.device.devNo, cache_info)
  1053. ServiceProgress.new_progress_for_order(order = order,
  1054. device = self.device,
  1055. cache_info = cache_info)
  1056. def do_finished_order(self, order, result):
  1057. self.post_before_finish(order=order)
  1058. if 'sub' in result:
  1059. order.association = {
  1060. 'sub': [item['order_id'] for item in self.event_data['sub']]
  1061. }
  1062. order.servicedInfo.update(
  1063. {'subOrderNo': '{}'.format(', '.join([item['order_id'] for item in self.event_data['sub']]))})
  1064. elif 'master' in result:
  1065. order.association = {
  1066. 'master': result['master']
  1067. }
  1068. order.servicedInfo.update({'masterOrderNo': result['master']})
  1069. if order.status == 'running':
  1070. order.status = 'finished'
  1071. order.finishedTime = datetime.datetime.fromtimestamp(result['fts'])
  1072. order.save()
  1073. else:
  1074. if order.status == 'unknown':
  1075. dealer = self.device.owner
  1076. # 设备支持上报告扣款
  1077. if 'device_callback_to_freeze_balance' in dealer.features:
  1078. freeze_user_balance(self.device, Group.get_group(order.groupId), order)
  1079. err_desc = u'结束事件上报成功,补充扣款'
  1080. else:
  1081. raise Exception('Invalid operation!')
  1082. else:
  1083. # created状态
  1084. set_start_key_status(start_key=order.startKey,
  1085. state=START_DEVICE_STATUS.FINISHED,
  1086. order_id=str(order.id))
  1087. err_desc = ''
  1088. order.isNormal = True
  1089. order.status = 'finished'
  1090. order.errorDesc = err_desc
  1091. order.startTime = datetime.datetime.fromtimestamp(result['sts'])
  1092. order.finishedTime = datetime.datetime.fromtimestamp(result['fts'])
  1093. order.save()
  1094. self.post_after_finish(order=order)
  1095. def deal_finished_event(self, order):
  1096. # type: (ConsumeRecord)->None
  1097. if order.status == 'finished':
  1098. logger.debug('order<{}> has finished.'.format(repr(order)))
  1099. return
  1100. self.do_finished_order(order, self.event_data)
  1101. sub_orders = []
  1102. if 'sub' in self.event_data:
  1103. sub_order_map = {}
  1104. for item in self.event_data['sub']:
  1105. item['status'] = 'finished'
  1106. item['master'] = str(order.orderNo)
  1107. item['sts'] = self.event_data['sts']
  1108. item['fts'] = self.event_data['fts']
  1109. sub_order_map[item['order_id']] = item
  1110. sub_orders = [_ for _ in ConsumeRecord.objects.filter(orderNo__in = sub_order_map.keys())]
  1111. for sub_order in sub_orders:
  1112. self.do_finished_order(sub_order, sub_order_map[sub_order.orderNo])
  1113. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  1114. if order.used_port != Const.NO_PORT:
  1115. current_port_info = current_cache_info.get(str(order.used_port))
  1116. if current_port_info and (order.orderNo == current_port_info.get('orderNo', None)):
  1117. Device.clear_port_control_cache(self.device.devNo, order.used_port)
  1118. else:
  1119. if current_cache_info and order.orderNo == current_cache_info.get('orderNo', None):
  1120. Device.invalid_device_control_cache(self.device.devNo)
  1121. ServiceProgress.objects(open_id = order.openId,
  1122. device_imei = self.device.devNo,
  1123. port = int(order.used_port),
  1124. consumeOrder__orderNo = order.orderNo).update_one(
  1125. upsert = False,
  1126. **{
  1127. 'isFinished': True,
  1128. 'finished_time': Arrow.fromdatetime(order.finishedTime, tzinfo = settings.TIME_ZONE).timestamp,
  1129. 'expireAt': datetime.datetime.now()
  1130. })
  1131. merge_order_info = self.merge_order(order, sub_orders)
  1132. self.do_finished_event(order, sub_orders, merge_order_info)
  1133. def merge_order(self, master_order, sub_orders):
  1134. # type:(ConsumeRecord, list)->dict
  1135. raise NotImplementedError('must implement merge_order.')
  1136. def do_impl(self, **args):
  1137. order_id = self.event_data['order_id']
  1138. order = ConsumeRecord.objects(ownerId = self.device.ownerId,
  1139. orderNo = order_id).first() # type: ConsumeRecord
  1140. if not order:
  1141. logger.debug('order<no={}> is not exist.'.format(self.event_data['order_id']))
  1142. return
  1143. if order.status in ['end', 'waitPay', 'finished']:
  1144. logger.debug('order<{}> has been finished.'.format(repr(order)))
  1145. return
  1146. if order.used_port != self.event_data['port']:
  1147. logger.error('port is not equal. {} != {}'.format(self.event_data['port'], order.used_port))
  1148. return
  1149. if self.pre_processor:
  1150. self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
  1151. if self.event_data['status'] in ['running', 'finishing']:
  1152. return self.deal_running_event(order)
  1153. if self.event_data['status'] == 'finished':
  1154. return self.deal_finished_event(order)
  1155. class IcStartAckEvent(AckEvent):
  1156. def __init__(self, smartBox, event_data, pre_processor=None):
  1157. super(IcStartAckEvent, self).__init__(smartBox, event_data, pre_processor)
  1158. if self.pre_processor:
  1159. self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
  1160. self.card = self.update_card_dealer_and_type(self.event_data.get('cardNo'),
  1161. cardType=self.event_data.get('cardType', 'IC'))
  1162. def post_before_start(self, order=None):
  1163. pass
  1164. def post_after_start(self, order=None):
  1165. raise NotImplementedError('must implement post_after_start.')
  1166. def post_before_finish(self, order=None):
  1167. pass
  1168. def post_after_finish(self, order=None):
  1169. raise NotImplementedError('must implement post_after_finish.')
  1170. def checkout_order(self, order):
  1171. raise NotImplementedError('must implement checkout_order.')
  1172. def get_or_create_order(self):
  1173. # type:()-> tuple
  1174. def new_one(order_id):
  1175. # type:( str)->ConsumeRecord
  1176. if self.event_data['order_id'] == order_id:
  1177. fee = VirtualCoin(self.event_data['fee'])
  1178. else:
  1179. sub = self.event_data.get('sub', [])
  1180. fee = VirtualCoin(map(lambda x: x['fee'], filter(lambda x: x['order_id'] == order_id, sub))[0])
  1181. order = ConsumeRecord.objects(startKey=order_id).first()
  1182. if not order:
  1183. attach_paras = {'chargeIndex': self.event_data['port']}
  1184. order = ConsumeRecord.new_one(
  1185. order_no=ConsumeRecord.make_no(self.event_data['cardNo'], UserConsumeSubType.CARD),
  1186. user=MyUser(openId=self.card.openId, nickname=self.card.nickName),
  1187. device=self.device,
  1188. group=Group.get_group(self.device.groupId),
  1189. package=self.event_data.get('package', {}),
  1190. attach_paras=attach_paras,
  1191. pay_info={
  1192. 'via': 'card',
  1193. 'coins': fee.mongo_amount
  1194. },
  1195. start_key=order_id)
  1196. order.servicedInfo = {'cardNo': self.event_data.get('cardNo'), 'chargeIndex': self.event_data['port']}
  1197. order.save()
  1198. card_order = CardConsumeRecord.objects(orderNo=order.orderNo).first()
  1199. if not card_order:
  1200. group = Group.get_group(self.device.groupId) # type:GroupDict
  1201. card_order = CardConsumeRecord(orderNo=order.orderNo,
  1202. openId=self.card.openId,
  1203. cardId=str(self.card.id),
  1204. money=fee.mongo_amount,
  1205. balance=self.card.balance.mongo_amount,
  1206. devNo=self.device.devNo,
  1207. devType=self.device.devTypeName,
  1208. logicalCode=self.device.logicalCode,
  1209. groupId=self.device.groupId,
  1210. address=group.address,
  1211. groupNumber=self.device.groupNumber,
  1212. groupName=group.groupName,
  1213. result='success',
  1214. remarks=u'刷卡消费',
  1215. dateTimeAdded=datetime.datetime.now(),
  1216. linkedConsumeRcdOrderNo=order.orderNo).save()
  1217. # 生成订单处理卡消费(预付费还是后付费)
  1218. try:
  1219. self.checkout_order(order)
  1220. except Exception as e:
  1221. order.isNormal = False
  1222. order.errorDesc = '订单建立结算失败! ERROR={}'.format(e)
  1223. order.save()
  1224. return order
  1225. def new_sub_order(order_id, master_order_id):
  1226. order = new_one(order_id)
  1227. order.isNormal = True
  1228. order.status = self.event_data['status']
  1229. order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
  1230. if 'fts' in self.event_data:
  1231. order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
  1232. if ('master' not in order.association) or (order.association['master'] != master_order_id):
  1233. order.association = {'master': master_order_id}
  1234. order.servicedInfo['masterOrderNo'] = master_order_id
  1235. order.save()
  1236. return order
  1237. master_order = new_one(self.event_data['order_id'])
  1238. master_order.save()
  1239. sub_orders = []
  1240. for item in self.event_data.get('sub', []):
  1241. sub_order = new_sub_order(item['order_id'], master_order.orderNo)
  1242. sub_orders.append(sub_order)
  1243. sub_order_id_list = [item.orderNo for item in sub_orders]
  1244. has_done = True
  1245. if master_order.status == ConsumeRecord.Status.FINISHED:
  1246. logger.debug('master order<{}> has been done.'.format(repr(master_order)))
  1247. has_done = True
  1248. if master_order.status == ConsumeRecord.Status.CREATED:
  1249. has_done = False
  1250. master_order.association = {'sub': sub_order_id_list}
  1251. if sub_order_id_list:
  1252. master_order.servicedInfo['subOrderNo'] = ', '.join(sub_order_id_list)
  1253. else:
  1254. if master_order.status == ConsumeRecord.Status.RUNNING and \
  1255. self.event_data['status'] == ConsumeRecord.Status.FINISHED:
  1256. has_done = False
  1257. if set(master_order.association['sub']) != set(sub_order_id_list):
  1258. master_order.association = {'sub': sub_order_id_list}
  1259. if sub_order_id_list:
  1260. master_order.servicedInfo['subOrderNo'] = ', '.join(sub_order_id_list)
  1261. has_done = False
  1262. master_order.isNormal = True
  1263. master_order.status = self.event_data['status']
  1264. if not master_order.startTime:
  1265. if 'sts' in self.event_data:
  1266. master_order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
  1267. else:
  1268. master_order.startTime = datetime.datetime.now()
  1269. if 'fts' in self.event_data:
  1270. master_order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
  1271. master_order.save()
  1272. return has_done, master_order, sub_orders
  1273. def merge_order(self, master_order, sub_orders):
  1274. # type:(ConsumeRecord, Iterable[ConsumeRecord])->dict
  1275. raise NotImplementedError('must implement merge_order.')
  1276. def do_finished_event(self, order, merge_order_info):
  1277. # type:(ConsumeRecord, dict)->None
  1278. raise NotImplementedError('must implement do_finished_event.')
  1279. def deal_running_order(self, master_order, sub_orders):
  1280. # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
  1281. self.post_before_start(master_order)
  1282. try:
  1283. start_time_str = master_order.startTime.strftime(Const.DATETIME_FMT)
  1284. cache_info = {
  1285. 'port': str(master_order.used_port),
  1286. 'cardId': str(self.card.id),
  1287. 'openId': self.card.openId,
  1288. 'startTime': start_time_str,
  1289. 'isStart': True,
  1290. 'status': Const.DEV_WORK_STATUS_WORKING,
  1291. 'orderNo': master_order.orderNo,
  1292. 'subOrderNos': [item.orderNo for item in sub_orders]
  1293. }
  1294. cache_info.update(self.merge_order(master_order, sub_orders))
  1295. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  1296. if master_order.used_port < Const.NO_PORT:
  1297. current_port_info = current_cache_info.get(str(master_order.used_port), {})
  1298. if not current_cache_info or master_order.orderNo == current_port_info.get('orderNo',
  1299. None) or current_port_info.get(
  1300. 'status', 0) == Const.DEV_WORK_STATUS_IDLE:
  1301. Device.update_port_control_cache(self.device.devNo, cache_info, updateType='overwrite')
  1302. else:
  1303. if start_time_str > current_port_info.get('startTime', '1970-01-01'):
  1304. Device.update_port_control_cache(self.device.devNo, cache_info, updateType='overwrite')
  1305. else:
  1306. pass
  1307. else:
  1308. if not current_cache_info or (
  1309. master_order.orderNo == current_cache_info.get('orderNo', None)) or current_cache_info.get(
  1310. 'status', 0) == Const.DEV_WORK_STATUS_IDLE:
  1311. Device.update_dev_control_cache(self.device.devNo, cache_info)
  1312. else:
  1313. if start_time_str > current_cache_info.get('startTime', '1970-01-01'):
  1314. Device.update_dev_control_cache(self.device.devNo, cache_info)
  1315. else:
  1316. pass
  1317. ServiceProgress.new_progress_for_order(order=master_order,
  1318. device=self.device,
  1319. cache_info=cache_info)
  1320. finally:
  1321. self.post_after_start(order=master_order)
  1322. def deal_finished_order(self, master_order, sub_orders):
  1323. # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
  1324. self.post_before_finish(master_order)
  1325. try:
  1326. self.do_finished_event(master_order, self.merge_order(master_order, sub_orders))
  1327. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  1328. if master_order.used_port < Const.NO_PORT:
  1329. current_port_info = current_cache_info.get(str(master_order.used_port))
  1330. if current_port_info and (master_order.orderNo == current_port_info.get('orderNo', '')):
  1331. Device.clear_port_control_cache(devNo=self.device.devNo, port=master_order.used_port)
  1332. else:
  1333. if current_cache_info and (master_order.orderNo == current_cache_info.get('orderNo', '')):
  1334. Device.invalid_device_control_cache(self.device.devNo)
  1335. ServiceProgress.objects(open_id=self.card.openId,
  1336. device_imei=self.device.devNo,
  1337. port=int(master_order.used_port)).update_one(
  1338. upsert=False,
  1339. **{
  1340. 'isFinished': True,
  1341. 'finished_time': Arrow.fromdatetime(master_order.finishedTime,
  1342. tzinfo=settings.TIME_ZONE).timestamp,
  1343. 'expireAt': datetime.datetime.now()
  1344. })
  1345. finally:
  1346. self.post_after_finish(order=master_order)
  1347. def do_impl(self, **args):
  1348. if 'port' not in self.event_data:
  1349. logger.warn('port is not exist.')
  1350. return
  1351. order_id = self.event_data['order_id']
  1352. order_type = self.event_data['order_type']
  1353. if not self.card or not self.card.openId or self.card.frozen:
  1354. logger.debug('card<cardNo={}> is not exists.'.format(self.event_data['cardNo']))
  1355. return
  1356. has_done, master_order, sub_orders = self.get_or_create_order()
  1357. if has_done:
  1358. logger.debug('order<{}> has been done.'.format(repr(master_order)))
  1359. return
  1360. if self.event_data['status'] == 'running':
  1361. self.deal_running_order(master_order, sub_orders)
  1362. if self.event_data['status'] == 'finished':
  1363. self.deal_finished_order(master_order, sub_orders)
  1364. class IcRechargeAckEvent(AckEvent):
  1365. def __init__(self, smartBox, event_data, pre_processor = None):
  1366. super(IcRechargeAckEvent, self).__init__(smartBox, event_data, pre_processor)
  1367. if self.pre_processor:
  1368. self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
  1369. def do_impl(self, **args):
  1370. order = CardRechargeOrder.objects(orderNo = self.event_data['order_id']).first() # type: CardRechargeOrder
  1371. if not order:
  1372. logger.error('order<orderNo={}> is not exist.'.format(self.event_data['order_id']))
  1373. return
  1374. if order.status == 'finished':
  1375. logger.debug('order<{}> has be done.'.format(repr(order)))
  1376. return
  1377. if not order.processingLog:
  1378. logger.error('order<{}> has no processing log.'.format(repr(order)))
  1379. return
  1380. cardNo = self.event_data['cardNo']
  1381. if str(cardNo) != order.cardNo:
  1382. logger.error('check card no failure. {} != {}'.format(cardNo, order.cardNo))
  1383. return
  1384. card = Card.objects(cardNo = order.cardNo).first()
  1385. if not card:
  1386. logger.error('not find card<{}>'.format(card.cardNo))
  1387. return
  1388. result = self.event_data['rst']
  1389. if result == ErrorCode.BOARD_UART_TIMEOUT:
  1390. order.processingLog['result'] = result
  1391. order.processingLog.update({
  1392. 'result': self.event_data['rst'],
  1393. 'description': DeviceErrorCodeDesc.get(result)
  1394. })
  1395. order.save()
  1396. else:
  1397. # 刷新卡里面的余额
  1398. balance = VirtualCoin(self.event_data.get('balance'))
  1399. if balance != card.balance:
  1400. card.balance = balance
  1401. card.lastMaxBalance = balance
  1402. card.save()
  1403. if result == ErrorCode.DEVICE_SUCCESS:
  1404. log = copy.deepcopy(order.processingLog)
  1405. log.update({
  1406. 'result': result,
  1407. 'description': DeviceErrorCodeDesc.get(result),
  1408. 'syncBalance': balance.mongo_amount
  1409. })
  1410. order.operationLog.append(log)
  1411. order.processingLog = {}
  1412. order.status = 'finished'
  1413. order.save()
  1414. CardRechargeRecord.add_record(
  1415. card = card,
  1416. group = Group.get_group(order.groupId),
  1417. order = order,
  1418. device = self.device)
  1419. else:
  1420. order.processingLog.update({
  1421. 'result': result,
  1422. 'description': DeviceErrorCodeDesc.get(result),
  1423. 'syncBalance': balance.mongo_amount
  1424. })
  1425. order.save()
  1426. class IdStartAckEvent(AckEvent):
  1427. def __init__(self, smartBox, event_data, pre_processor = None):
  1428. super(IdStartAckEvent, self).__init__(smartBox, event_data, pre_processor)
  1429. if self.pre_processor:
  1430. self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
  1431. self.card = self.update_card_dealer_and_type(self.event_data.get('cardNo'), cardType=self.event_data.get('cardType', 'ID')) # type: Card
  1432. def post_before_start(self, order=None):
  1433. pass
  1434. def post_after_start(self, order=None):
  1435. raise NotImplementedError('must implement post_after_start.')
  1436. def post_before_finish(self, order=None):
  1437. pass
  1438. def post_after_finish(self, order=None):
  1439. raise NotImplementedError('must implement post_after_finish.')
  1440. def checkout_order(self, order):
  1441. raise NotImplementedError('must implement checkout_order.')
  1442. def get_or_create_order(self):
  1443. # type:()-> tuple
  1444. def new_one(order_id):
  1445. # type:( str)->ConsumeRecord
  1446. if self.event_data['order_id'] == order_id:
  1447. fee = VirtualCoin(self.event_data['fee'])
  1448. else:
  1449. sub = self.event_data.get('sub',[])
  1450. fee = VirtualCoin(map(lambda x: x['fee'], filter(lambda x: x['order_id'] == order_id, sub))[0])
  1451. order = ConsumeRecord.objects(startKey = order_id).first()
  1452. if not order:
  1453. attach_paras = {'chargeIndex': self.event_data['port']}
  1454. order = ConsumeRecord.new_one(
  1455. order_no = ConsumeRecord.make_no(self.event_data['cardNo'], UserConsumeSubType.CARD),
  1456. user = MyUser(openId = self.card.openId, nickname = self.card.nickName),
  1457. device = self.device,
  1458. group = Group.get_group(self.device.groupId),
  1459. package=self.event_data.get('package', {}),
  1460. attach_paras = attach_paras,
  1461. pay_info = {
  1462. 'via': 'card',
  1463. 'coins': fee.mongo_amount
  1464. },
  1465. start_key = order_id)
  1466. order.servicedInfo = {'cardNo': self.event_data.get('cardNo'), 'chargeIndex': self.event_data['port']}
  1467. order.save()
  1468. card_order = CardConsumeRecord.objects(orderNo = order.orderNo).first()
  1469. if not card_order:
  1470. group = Group.get_group(self.device.groupId) # type:GroupDict
  1471. card_order = CardConsumeRecord(orderNo = order.orderNo,
  1472. openId = self.card.openId,
  1473. cardId = str(self.card.id),
  1474. money = fee.mongo_amount,
  1475. balance = self.card.balance.mongo_amount,
  1476. devNo = self.device.devNo,
  1477. devType = self.device.devTypeName,
  1478. logicalCode = self.device.logicalCode,
  1479. groupId = self.device.groupId,
  1480. address = group.address,
  1481. groupNumber = self.device.groupNumber,
  1482. groupName = group.groupName,
  1483. result = 'success',
  1484. remarks = u'刷卡消费',
  1485. dateTimeAdded = datetime.datetime.now(),
  1486. linkedConsumeRcdOrderNo = order.orderNo).save()
  1487. # 生成订单处理卡消费(预付费还是后付费)
  1488. try:
  1489. self.checkout_order(order)
  1490. except Exception as e:
  1491. order.isNormal = False
  1492. order.errorDesc = '订单建立结算失败! ERROR={}'.format(e)
  1493. order.save()
  1494. return order
  1495. def new_sub_order(order_id, master_order_id):
  1496. order = new_one(order_id)
  1497. order.isNormal = True
  1498. order.status = self.event_data['status']
  1499. order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
  1500. if 'fts' in self.event_data:
  1501. order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
  1502. if ('master' not in order.association) or (order.association['master'] != master_order_id):
  1503. order.association = {'master': master_order_id}
  1504. order.servicedInfo['masterOrderNo'] = master_order_id
  1505. order.save()
  1506. return order
  1507. master_order = new_one(self.event_data['order_id'])
  1508. master_order.save()
  1509. sub_orders = []
  1510. for item in self.event_data.get('sub', []):
  1511. sub_order = new_sub_order(item['order_id'], master_order.orderNo)
  1512. sub_orders.append(sub_order)
  1513. sub_order_id_list = [item.orderNo for item in sub_orders]
  1514. has_done = True
  1515. if master_order.status == ConsumeRecord.Status.FINISHED:
  1516. logger.debug('master order<{}> has been done.'.format(repr(master_order)))
  1517. has_done = True
  1518. if master_order.status == ConsumeRecord.Status.CREATED:
  1519. has_done = False
  1520. master_order.association = {'sub': sub_order_id_list}
  1521. if sub_order_id_list:
  1522. master_order.servicedInfo['subOrderNo'] = ', '.join(sub_order_id_list)
  1523. else:
  1524. if master_order.status == ConsumeRecord.Status.RUNNING and \
  1525. self.event_data['status'] == ConsumeRecord.Status.FINISHED:
  1526. has_done = False
  1527. if set(master_order.association['sub']) != set(sub_order_id_list):
  1528. master_order.association = {'sub': sub_order_id_list}
  1529. if sub_order_id_list:
  1530. master_order.servicedInfo['subOrderNo'] = ', '.join(sub_order_id_list)
  1531. has_done = False
  1532. master_order.isNormal = True
  1533. master_order.status = self.event_data['status']
  1534. if not master_order.startTime:
  1535. if 'sts' in self.event_data:
  1536. master_order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
  1537. else:
  1538. master_order.startTime = datetime.datetime.now()
  1539. if 'fts' in self.event_data:
  1540. master_order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
  1541. master_order.save()
  1542. return has_done, master_order, sub_orders
  1543. def merge_order(self, master_order, sub_orders):
  1544. # type:(ConsumeRecord, Iterable[ConsumeRecord])->dict
  1545. raise NotImplementedError('must implement merge_order.')
  1546. def do_finished_event(self, order, merge_order_info):
  1547. # type:(ConsumeRecord, dict)->None
  1548. raise NotImplementedError('must implement do_finished_event.')
  1549. def deal_running_order(self, master_order, sub_orders):
  1550. # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
  1551. self.post_before_start(master_order)
  1552. try:
  1553. start_time_str = master_order.startTime.strftime(Const.DATETIME_FMT)
  1554. cache_info = {
  1555. 'port': str(master_order.used_port),
  1556. 'cardId': str(self.card.id),
  1557. 'openId': self.card.openId,
  1558. 'startTime': start_time_str,
  1559. 'isStart': True,
  1560. 'status': Const.DEV_WORK_STATUS_WORKING,
  1561. 'orderNo': master_order.orderNo,
  1562. 'subOrderNos': [item.orderNo for item in sub_orders]
  1563. }
  1564. cache_info.update(self.merge_order(master_order, sub_orders))
  1565. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  1566. if master_order.used_port < Const.NO_PORT:
  1567. current_port_info = current_cache_info.get(str(master_order.used_port), {})
  1568. if not current_cache_info or master_order.orderNo == current_port_info.get('orderNo', None) or current_port_info.get('status', 0) == Const.DEV_WORK_STATUS_IDLE:
  1569. Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
  1570. else:
  1571. if start_time_str > current_port_info.get('startTime', '1970-01-01'):
  1572. Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
  1573. else:
  1574. pass
  1575. else:
  1576. if not current_cache_info or (master_order.orderNo == current_cache_info.get('orderNo', None)) or current_cache_info.get('status', 0) == Const.DEV_WORK_STATUS_IDLE:
  1577. Device.update_dev_control_cache(self.device.devNo, cache_info)
  1578. else:
  1579. if start_time_str > current_cache_info.get('startTime', '1970-01-01'):
  1580. Device.update_dev_control_cache(self.device.devNo, cache_info)
  1581. else:
  1582. pass
  1583. ServiceProgress.new_progress_for_order(order=master_order,
  1584. device=self.device,
  1585. cache_info = cache_info)
  1586. finally:
  1587. self.post_after_start(order=master_order)
  1588. def deal_finished_order(self, master_order, sub_orders):
  1589. # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
  1590. self.post_before_finish(master_order)
  1591. try:
  1592. self.do_finished_event(master_order, self.merge_order(master_order, sub_orders))
  1593. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  1594. if master_order.used_port < Const.NO_PORT:
  1595. current_port_info = current_cache_info.get(str(master_order.used_port))
  1596. if current_port_info and (master_order.orderNo == current_port_info.get('orderNo', '')):
  1597. Device.clear_port_control_cache(devNo = self.device.devNo, port = master_order.used_port)
  1598. else:
  1599. if current_cache_info and (master_order.orderNo == current_cache_info.get('orderNo', '')):
  1600. Device.invalid_device_control_cache(self.device.devNo)
  1601. ServiceProgress.objects(open_id=self.card.openId,
  1602. device_imei=self.device.devNo,
  1603. port=int(master_order.used_port)).update_one(
  1604. upsert=False,
  1605. **{
  1606. 'isFinished': True,
  1607. 'finished_time': Arrow.fromdatetime(master_order.finishedTime,
  1608. tzinfo=settings.TIME_ZONE).timestamp,
  1609. 'expireAt': datetime.datetime.now()
  1610. })
  1611. finally:
  1612. self.post_after_finish(order=master_order)
  1613. def do_impl(self, **args):
  1614. if 'port' not in self.event_data:
  1615. logger.warn('port is not exist.')
  1616. return
  1617. order_id = self.event_data['order_id']
  1618. order_type = self.event_data['order_type']
  1619. if not self.card or not self.card.openId or self.card.frozen:
  1620. logger.debug('card<cardNo={}> is not exists.'.format(self.event_data['cardNo']))
  1621. return
  1622. has_done, master_order, sub_orders = self.get_or_create_order()
  1623. if has_done:
  1624. logger.debug('order<{}> has been done.'.format(repr(master_order)))
  1625. return
  1626. if self.event_data['status'] == 'running':
  1627. self.deal_running_order(master_order, sub_orders)
  1628. if self.event_data['status'] == 'finished':
  1629. self.deal_finished_order(master_order, sub_orders)
  1630. class VirtualCardStartAckEvent(AckEvent):
  1631. def __init__(self, smartBox, event_data, pre_processor = None):
  1632. super(VirtualCardStartAckEvent, self).__init__(smartBox, event_data, pre_processor)
  1633. if self.pre_processor:
  1634. self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
  1635. self.card, self.virtualCard = self.get_virtual_card_by_card()
  1636. def get_virtual_card_by_card(self):
  1637. cardNo = self.event_data.get("cardNo")
  1638. card = Card.objects.filter(cardNo=cardNo).first()
  1639. if not card or not card.openId or float(card.balance) != 0:
  1640. logger.debug('card<cardNo={}> is not exists.'.format(self.event_data['cardNo']))
  1641. return
  1642. try:
  1643. dealer = Dealer.objects.get(id=card.dealerId)
  1644. agent = Agent.objects.get(id=dealer.agentId)
  1645. features = agent.features
  1646. except Exception as e:
  1647. features = []
  1648. return card, card.bound_virtual_card if "vCardNeedBind" in features else card.related_virtual_card
  1649. def post_after_start(self, order = None):
  1650. raise NotImplementedError('must implement post_after_start.')
  1651. def post_after_finish(self, order = None):
  1652. raise NotImplementedError('must implement post_after_finish.')
  1653. def checkout_order(self, order):
  1654. raise NotImplementedError('must implement checkout_order.')
  1655. def get_or_create_order(self):
  1656. # type:()-> tuple
  1657. def new_one(order_id):
  1658. # type:(str)->ConsumeRecord
  1659. if self.event_data['order_id'] == order_id: # 判断master_order还是sub
  1660. fee = VirtualCoin(self.event_data['fee'])
  1661. else:
  1662. sub = self.event_data.get('sub',[])
  1663. fee = map(lambda x: x['fee'], filter(lambda x: x['order_id'] == order_id, sub))[0]
  1664. order = ConsumeRecord.objects(startKey = order_id).first()
  1665. if not order:
  1666. attach_paras = {'chargeIndex': self.event_data['port']}
  1667. order = ConsumeRecord.new_one(
  1668. order_no = ConsumeRecord.make_no(self.event_data['cardNo'], UserConsumeSubType.VIRTUAL_CARD),
  1669. user = MyUser(openId = self.card.openId, nickname = self.card.nickName),
  1670. device = self.device,
  1671. group = Group.get_group(self.device.groupId),
  1672. package = self.event_data.get('package',{}),
  1673. attach_paras=attach_paras,
  1674. pay_info={
  1675. 'via': 'virtualCard',
  1676. 'itemId': str(self.virtualCard.id),
  1677. 'money': fee.mongo_amount,
  1678. 'coins': fee.mongo_amount,
  1679. 'deduct': []
  1680. },
  1681. start_key=order_id)
  1682. order.servicedInfo = {'cardNo':self.event_data.get('cardNo'),'chargeIndex': self.event_data['port']}
  1683. order.save()
  1684. card_order = CardConsumeRecord.objects(orderNo = order.orderNo).first()
  1685. if not card_order:
  1686. group = Group.get_group(self.device.groupId) # type:GroupDict
  1687. card_order = CardConsumeRecord(orderNo = order.orderNo,
  1688. openId = self.card.openId,
  1689. cardId = str(self.card.id),
  1690. money = VirtualCoin(0).mongo_amount,
  1691. balance = self.card.balance.mongo_amount,
  1692. devNo = self.device.devNo,
  1693. devType = self.device.devTypeName,
  1694. logicalCode = self.device.logicalCode,
  1695. groupId = self.device.groupId,
  1696. address = group.address,
  1697. groupNumber = self.device.groupNumber,
  1698. groupName = group.groupName,
  1699. result = 'success',
  1700. remarks = u'虚拟卡消费',
  1701. dateTimeAdded = datetime.datetime.now(),
  1702. linkedConsumeRcdOrderNo = order.orderNo).save()
  1703. # 生成订单处理卡消费(预付费还是后付费)
  1704. try:
  1705. self.checkout_order(order)
  1706. except Exception as e:
  1707. order.isNormal = False
  1708. order.errorDesc = '订单建立结算失败! ERROR={}'.format(e)
  1709. order.save()
  1710. freeze_user_balance(self.device, group, order, self.virtualCard)
  1711. return order
  1712. def new_sub_order(order_id, master_order_id):
  1713. order = new_one(order_id)
  1714. order.isNormal = True
  1715. order.status = self.event_data['status']
  1716. order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
  1717. if 'fts' in self.event_data:
  1718. order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
  1719. if ('master' not in order.association) or (order.association['master'] != master_order_id):
  1720. order.association = {'master': master_order_id}
  1721. order.save()
  1722. return order
  1723. master_order = new_one(self.event_data['order_id'])
  1724. master_order.save()
  1725. sub_orders = []
  1726. for item in self.event_data.get('sub', []):
  1727. sub_order = new_sub_order(item['order_id'], master_order.orderNo)
  1728. sub_orders.append(sub_order)
  1729. sub_order_id_list = [item.orderNo for item in sub_orders]
  1730. has_done = True
  1731. if master_order.status == ConsumeRecord.Status.FINISHED:
  1732. logger.debug('master order<{}> has been done.'.format(repr(master_order)))
  1733. has_done = True
  1734. if master_order.status == ConsumeRecord.Status.CREATED:
  1735. has_done = False
  1736. master_order.association = {'sub': sub_order_id_list}
  1737. else:
  1738. if master_order.status == ConsumeRecord.Status.RUNNING and \
  1739. self.event_data['status'] == ConsumeRecord.Status.FINISHED:
  1740. has_done = False
  1741. if set(master_order.association['sub']) != set(sub_order_id_list):
  1742. master_order.association = {'sub': sub_order_id_list}
  1743. has_done = False
  1744. master_order.isNormal = True
  1745. master_order.status = self.event_data['status']
  1746. if not master_order.startTime:
  1747. if 'sts' in self.event_data:
  1748. master_order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
  1749. else:
  1750. master_order.startTime = datetime.datetime.now()
  1751. if 'fts' in self.event_data:
  1752. master_order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
  1753. master_order.save()
  1754. return has_done, master_order, sub_orders
  1755. def merge_order(self, master_order, sub_orders):
  1756. # type:(ConsumeRecord, Iterable[ConsumeRecord])->dict
  1757. raise NotImplementedError('must implement merge_order.')
  1758. def do_finished_event(self, order, sub_orders, merge_order_info):
  1759. # type:(ConsumeRecord,list, Iterable[ConsumeRecord])->None
  1760. raise NotImplementedError('must implement do_finished_event.')
  1761. def deal_running_order(self, master_order, sub_orders):
  1762. # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
  1763. try:
  1764. start_time_str = master_order.startTime.strftime(Const.DATETIME_FMT)
  1765. cache_info = {
  1766. 'port': str(master_order.used_port),
  1767. 'cardId': str(self.card.id),
  1768. 'openId': self.card.openId,
  1769. 'startTime': start_time_str,
  1770. 'isStart': True,
  1771. 'status': Const.DEV_WORK_STATUS_WORKING,
  1772. 'orderNo': master_order.orderNo,
  1773. 'subOrderNos': [item.orderNo for item in sub_orders]
  1774. }
  1775. cache_info.update(self.merge_order(master_order, sub_orders))
  1776. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  1777. if master_order.used_port < Const.NO_PORT:
  1778. current_port_info = current_cache_info.get(str(master_order.used_port), {})
  1779. if not current_cache_info or master_order.orderNo == current_port_info.get('orderNo', None) or current_port_info.get('status', 0) == 0:
  1780. Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
  1781. else:
  1782. if start_time_str > current_port_info.get('startTime', '1970-01-01'):
  1783. Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
  1784. else:
  1785. pass
  1786. else:
  1787. if not current_cache_info:
  1788. Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
  1789. else:
  1790. if master_order.orderNo == current_cache_info.get('orderNo', ''):
  1791. Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
  1792. else:
  1793. if start_time_str > current_cache_info.get('startTime', '1970-01-01'):
  1794. Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
  1795. else:
  1796. pass
  1797. ServiceProgress.new_progress_for_order(order = master_order,
  1798. device = self.device,
  1799. cache_info = cache_info)
  1800. finally:
  1801. self.post_after_start(order=master_order)
  1802. def deal_finished_order(self, master_order, sub_orders):
  1803. # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
  1804. try:
  1805. self.do_finished_event(master_order, sub_orders, self.merge_order(master_order, sub_orders))
  1806. current_cache_info = Device.get_dev_control_cache(self.device.devNo)
  1807. if master_order.used_port < Const.NO_PORT:
  1808. current_port_info = current_cache_info.get(str(master_order.used_port))
  1809. if current_port_info and (master_order.orderNo == current_port_info.get('orderNo', '')):
  1810. Device.clear_port_control_cache(devNo = self.device.devNo, port = master_order.used_port)
  1811. else:
  1812. if current_cache_info and (master_order.orderNo == current_cache_info.get('orderNo', '')):
  1813. Device.invalid_device_control_cache(self.device.devNo)
  1814. ServiceProgress.objects(open_id = self.card.openId,
  1815. device_imei = self.device.devNo,
  1816. port = int(master_order.used_port),
  1817. consumeOrder__orderNo = master_order.orderNo).update_one(
  1818. upsert = False,
  1819. **{
  1820. 'isFinished': True,
  1821. 'finished_time': Arrow.fromdatetime(master_order.finishedTime,
  1822. tzinfo = settings.TIME_ZONE).timestamp,
  1823. 'expireAt': datetime.datetime.now()
  1824. })
  1825. finally:
  1826. self.post_after_finish(order=master_order)
  1827. def do_impl(self, **args):
  1828. if 'port' not in self.event_data:
  1829. logger.warn('port is not exist.')
  1830. return
  1831. order_id = self.event_data['order_id']
  1832. order_type = self.event_data['order_type']
  1833. cardType = self.event_data.get('cardType','ID')
  1834. card = self.update_card_dealer_and_type(str(self.event_data['cardNo']), cardType) # type: Card
  1835. if not card or not card.openId or card.frozen:
  1836. logger.debug('card<cardNo={}> is not exists.'.format(self.event_data['cardNo']))
  1837. return
  1838. has_done, master_order, sub_orders = self.get_or_create_order()
  1839. if has_done:
  1840. logger.debug('order<{}> has been done.'.format(repr(master_order)))
  1841. return
  1842. if self.event_data['status'] == 'running':
  1843. self.deal_running_order(master_order, sub_orders)
  1844. if self.event_data['status'] == 'finished':
  1845. self.deal_finished_order(master_order, sub_orders)
  1846. class CardRefundAckEvent(AckEvent):
  1847. def __init__(self, smartBox, event_data, pre_processor):
  1848. super(CardRefundAckEvent, self).__init__(smartBox, event_data, pre_processor)
  1849. if self.pre_processor:
  1850. self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
  1851. def do_impl(self, **args):
  1852. order_no = self.event_data['order_id']
  1853. order = RechargeRecord.objects(orderNo = order_no).first()
  1854. if order:
  1855. logger.debug('refund order<orderNo={}> has been done.'.format(order_no))
  1856. return
  1857. card = self.update_card_dealer_and_type(self.event_data['cardNo'], self.event_data['cardType']) # type: Card
  1858. if not card:
  1859. logger.debug('card<cardNo={}> is not bound.'.format(self.event_data['cardNo']))
  1860. return
  1861. self.refund_money_for_card(RMB(self.event_data['backMoney']), card.id, order_no)