# -*- coding: utf-8 -*- # !/usr/bin/env python import copy import datetime import logging import uuid from arrow import Arrow from bson.objectid import ObjectId from django.conf import settings from mongoengine.errors import DoesNotExist from mongoengine.errors import NotUniqueError from typing import Tuple, Optional, TYPE_CHECKING, Iterable from apilib.monetary import VirtualCoin, RMB, Ratio from apilib.utils_string import make_title_from_dict from apilib.utils_sys import memcache_lock from apps.web.agent.models import Agent from apps.web.south_intf.shangdong_platform import ShanDongNorther from apps.web.common.transaction import UserConsumeSubType from apps.web.constant import Const, FAULT_LEVEL, DeviceCmdCode, ErrorCode, DeviceErrorCodeDesc, \ START_DEVICE_STATUS, DEALER_CONSUMPTION_AGG_KIND from apps.web.core.networking import MessageSender from apps.web.dealer.define import DEALER_INCOME_SOURCE from apps.web.dealer.models import Dealer from apps.web.device.models import Device, FaultRecord, Group, GroupDict, Part, DeviceType from apps.web.eventer import Event from apps.web.report.ledger import Ledger from apps.web.south_intf.platform import notify_event_to_north from apps.web.user.models import MyUser, RechargeRecord, CardConsumeRecord, CardRechargeRecord, ConsumeRecord, Card, \ CardRechargeOrder, ServiceProgress from apps.web.user.models import RefundMoneyRecord from apps.web.user.transaction_deprecated import refund_money, refund_cash from apps.web.user.utils import freeze_user_balance from apps.web.utils import set_start_key_status if TYPE_CHECKING: from apps.web.core.adapter.base import SmartBox from apps.web.device.models import DeviceDict logger = logging.getLogger(__name__) class WorkEvent(Event): """ 开始工作或者结束工作的事件 """ def __init__(self, smartBox, event_data): # type:(SmartBox,dict)->None super(WorkEvent, self).__init__(smartBox) self.event_data = event_data def do(self, **args): Device.update_dev_control_cache(self.device['devNo'], self.event_data) def 
get_managerialOpenId_by_openId(self, openId): try: user = MyUser.objects.filter(openId = openId, groupId = self.device.groupId).first() if not user: return None else: return user.managerialOpenId except Exception, e: logger.exception('get managerial openid error=%s' % e) return None # 记录充值记录数据。注意需要在卡的余额修改后,然后记录 def record_refund_money_for_card(self, money, cardId, orderNo = None): card = Card.objects.get(id = cardId) groupId = self.device['groupId'] group = Group.get_group(groupId) if group is None: return False # 增加充值记录 payload = { 'orderNo': orderNo or str(uuid.uuid4()), 'devNo': self.device['devNo'], 'logicalCode': self.device['logicalCode'], 'devTypeName': self.device.devTypeName, 'ownerId': self.device['ownerId'], 'groupId': self.device['groupId'], 'groupName': group['groupName'], 'groupNumber': self.device['groupNumber'], 'address': group['address'], 'openId': card.openId, 'nickname': card.nickName, 'money': money, 'coins': VirtualCoin(money.amount), 'result': 'success', 'via': 'refund', 'attachParas': {'cardNo': card.cardNo} } record = RechargeRecord(**payload) try: record.save() except Exception, e: logger.error('cardNo = %s,save recharge record error=%s' % (card.cardNo, e)) return False newRcd = CardRechargeRecord( orderNo = payload.get('orderNo'), cardNo=card.cardNo, cardId = str(card.id), openId = card.openId, ownerId = ObjectId(self.device['ownerId']), money = money, coins = money, preBalance = card.balance - money, balance = card.balance, devNo = self.device['devNo'], devTypeCode = self.device['devType']['code'], logicalCode = self.device['logicalCode'], groupId = self.device['groupId'], address = group['address'], groupNumber = self.device['groupNumber'], groupName = group['groupName'], status = 'success', rechargeType = 'netpay', remarks = u'退币' ) try: newRcd.save() except Exception, e: logger.exception('save card consume rcd error=%s' % e) def refund_money_for_card(self, money, cardId, orderNo = None): # 添加金币 try: 
Card.get_collection().update({'_id': ObjectId(cardId)}, {'$inc': {'balance': money.mongo_amount}}) except Exception, e: logger.error('feedback coins error=%s' % e) return None self.record_refund_money_for_card(money, cardId, orderNo) try: card = Card.objects.get(id = cardId) card.lastMaxBalance = card.balance return card.save() except Exception, e: logger.info('save error=%s ' % e) # 用于比如刷卡回收余额的时候,还需要更新上次消费订单的消费数据 def update_consume_record_refund_money_for_card(self, money, card): try: # 有可能出现一张卡刷使用多个端口,这个时候回收余额只能分到3个消费订单上,因为回收余额的时候,没有上报回收具体哪一个端口的,所以无法对应到端口 cardRcds = CardConsumeRecord.objects.filter(openId = card.openId, cardId = str(card.id), result = 'success').order_by('-dateTimeAdded') for rcd in cardRcds: if rcd.servicedInfo.has_key('refundedMoney'): continue else: break rcd.servicedInfo.update({'refundedMoney': money.mongo_amount}) rcd.save() if not rcd.linkedConsumeRcdOrderNo: return consumeRcd = ConsumeRecord.objects.get(orderNo = rcd.linkedConsumeRcdOrderNo) consumeRcd.servicedInfo.update({'refundedMoney': money.mongo_amount}) consumeRcd.save() except Exception, e: logger.exception('update_consume_record_refund_money_for_card = %s,cardNo=%s' % (e, card.cardNo)) # 通知充电完成 def notify_charging_finished(self, managerialOpenId): if not managerialOpenId: return group = Group.get_group(self.device['groupId']) self.notify_user(managerialOpenId, 'service_complete', **{ 'title': u'您订购的服务已经完成', 'service': u'充电服务(设备编号:%s, 地址:%s)' % (self.device['logicalCode'], group['address']), 'finishTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'remark': u'谢谢您的支持' }) # ID/IC卡扣费。-1:异常错误,0:余额不足 1:正常 def consume_money_for_card(self, card, money): # type:(Card, RMB)->Tuple[int, RMB] # 如果卡被解绑了或者挂失了,直接判断 if card.openId == '' or card.frozen: return 0, RMB(0) if card.balance < money: card.balance = RMB(0) else: card.balance = (card.balance - money) try: card.save() except Exception as e: logger.exception(e) return -1, RMB(0) return 1, card.balance # 无值卡,通知扣费 def 
notify_balance_has_consume_for_card(self, card, money, desc = u''): if not card or not card.managerialOpenId: # 有可能卡没有绑定,就不需要 return self.notify_user(card.managerialOpenId, 'consume_notify', **{ 'title': u'亲,您绑定的卡:%s(名称:%s),正在扣费。%s\\n' % (card.cardNo, card.cardName, desc), 'money': u'%s元\\n' % money, 'serviceType': u'刷卡消费\\n', 'finishTime': datetime.datetime.now().strftime(Const.DATETIME_FMT) }) # 记录卡的消费 def record_consume_for_card(self, card, money, desc = u'', servicedInfo = None, sid = None, attachParas = None): # type: (Card, RMB, unicode, Optional[dict], Optional[str], Optional[dict])->Tuple[str, str] servicedInfo = {} if servicedInfo is None else servicedInfo attachParas = {} if attachParas is None else attachParas group = Group.get_group(self.device['groupId']) address = group['address'] group_number = self.device['groupNumber'] # TODO 这里应该与其他插入消费记录的地方一样,运用model 同时应该取消time字段与其他model一样用dateTimeAdded now = datetime.datetime.now() new_record = { 'orderNo': ConsumeRecord.make_no(card.cardNo, UserConsumeSubType.CARD), 'time': now.strftime("%Y-%m-%d %H:%M:%S"), 'dateTimeAdded': now, 'openId': card.openId, 'ownerId': self.device['ownerId'], 'coin': money.mongo_amount, 'money': money.mongo_amount, 'devNo': self.device['devNo'], 'logicalCode': self.device['logicalCode'], 'groupId': self.device['groupId'], 'address': address, 'groupNumber': group_number, 'groupName': group['groupName'], 'devTypeCode': self.device.devTypeCode, 'devTypeName': self.device.devTypeName, 'isNormal': True, 'status': ConsumeRecord.Status.RUNNING, 'remarks': u'刷卡消费', 'errorDesc': '', 'sequanceNo': '', 'desc': desc, 'attachParas': attachParas, 'servicedInfo': servicedInfo } ConsumeRecord.get_collection().insert_one(new_record) # 刷卡消费也记录一条数据 new_card_record = { 'orderNo': CardConsumeRecord.make_no(), 'openId': card.openId, 'cardId': str(card.id), 'money': money.mongo_amount, 'balance': card.balance.mongo_amount, 'devNo': self.device['devNo'], 'devType': self.device['devType']['name'], 
'logicalCode': self.device['logicalCode'], 'groupId': self.device['groupId'], 'address': address, 'groupNumber': group_number, 'groupName': group['groupName'], 'result': 'success', 'remarks': u'刷卡消费', 'sequanceNo': '', 'dateTimeAdded': datetime.datetime.now(), 'desc': desc, 'servicedInfo': servicedInfo, 'linkedConsumeRcdOrderNo': str(new_record['orderNo']) } if sid is not None: new_card_record.update({'sid': sid}) CardConsumeRecord.get_collection().insert_one(new_card_record) return new_record['orderNo'], new_card_record['orderNo'] def record_consume_for_coin(self, money, desc=u'', servicedInfo=None, remarks=u'投币或者刷卡消费'): # type: (RMB, basestring, Optional[dict], basestring)->str """ 记录硬币的消费 :param money: :param desc: :param remarks: :param servicedInfo: :return: """ servicedInfo = {} if servicedInfo is None else servicedInfo group = Group.get_group(self.device['groupId']) address = group['address'] group_number = self.device['groupNumber'] now = datetime.datetime.now() new_record = { 'orderNo': ConsumeRecord.make_no(self.device.logicalCode, UserConsumeSubType.COIN), 'dateTimeAdded': now, 'openId': '', 'ownerId': self.device['ownerId'], 'coin': money.mongo_amount, 'money': money.mongo_amount, 'devNo': self.device['devNo'], 'logicalCode': self.device['logicalCode'], 'groupId': self.device['groupId'], 'address': address, 'groupNumber': group_number, 'groupName': group['groupName'], 'devTypeCode': self.device.devTypeCode, 'devTypeName': self.device.devTypeName, 'isNormal': True, 'status': ConsumeRecord.Status.RUNNING, 'remarks': remarks, 'errorDesc': '', 'sequanceNo': '', 'desc': desc, 'attachParas': {}, 'servicedInfo': servicedInfo } ConsumeRecord.get_collection().insert_one(new_record) return new_record['orderNo'] def find_card_by_card_no(self, cardNo): dealer = Dealer.objects(id = self.device.ownerId).first() # type: Dealer if not dealer: logger.error('dealer is not found, dealer id = {}'.format(self.device.ownerId)) return None agent = Agent.objects(id = 
dealer.agentId).first() # type: Agent if not agent: logger.error('agent is not found, agentId=%s' % dealer.agentId) return None group = Group.get_group(self.device.groupId) # type: GroupDict if not group: logger.error('group is not found, groupid=%s' % group.groupId) return None try: card = Card.objects.get(cardNo = cardNo, agentId = str(agent.id)) except DoesNotExist: return None except Exception as e: logger.error(e) return None return card def update_card_dealer_and_type(self, cardNo, cardType='ID'): """ :param cardNo: :param cardType: :return: """ dealer = Dealer.objects(id=self.device.ownerId).first() # type: Dealer if not dealer: logger.error('dealer is not found, dealer id = {}'.format(self.device.ownerId)) return None group = Group.get_group(self.device.groupId) # type: GroupDict if not group: logger.error('group is not found, groupId = %s' % group.groupId) return None try: card = Card.objects.get(cardNo=cardNo, dealerId=self.device.ownerId, openId__ne='') except DoesNotExist: logger.error('card is not exist, cardNo=%s' % cardNo) return None except Exception as e: logger.error(e) return None else: # 每次刷卡的时候 刷新一次devNo, 表示最后一次的卡刷的设备 card.devNo = self.device.devNo # 如果沒有綁定信息, 直接綁定本次刷卡的信息. 
后续不再更新 if not card.dealerId or not card.groupId: card.dealerId = str(dealer.id) card.groupId = group.groupId card.cardType = cardType card.save() return card def recharge_id_card(self, card, rechargeType, order): # type: (Card, str, CardRechargeOrder)->None if not order or (order.money == RMB(0) and order.rechargeType != "sendCoin"): return status = Card.get_card_status(str(card.id)) if status == 'busy': return # 锁机制 card.__class__.set_card_status(str(card.id), 'busy') try: # 记录总额度的变化到充值订单中 balance = card.balance + order.coins preBalance = card.balance # 更新充值订单,已经完成 order.update_after_recharge_id_card(self.device, balance, preBalance) CardRechargeRecord.add_record( card=card, group=Group.get_group(order.groupId), order=order, device=self.device ) # 刷新卡里面的余额 card.recharge(order.chargeAmount, order.bestowAmount) card.account_recharge(order.rechargeOrder) except Exception as e: logger.exception(e) return finally: card.__class__.set_card_status(str(card.id), 'idle') def recharge_ic_card(self, card, preBalance, rechargeType, order, need_verify = True): # type:(Card, RMB, str, CardRechargeOrder, bool)->bool """ # rechargeType有两种,一种是用直接覆写overwrite的方式,一种是append追加钱的方式。 # 不同的的设备,充值的方式还不一样.注意:money是实际用户付的钱,coins是给用户充值的钱,比如付10块(money),充15(coins)。 :param card: :param preBalance: :param rechargeType: :param order: :return: """ if not order or order.coins == RMB(0): return False status = Card.get_card_status(str(card.id)) if status == 'busy': return False Card.set_card_status(str(card.id), 'busy') try: # IC卡需要下发到设备,设备写卡,把余额打入卡中 if rechargeType == 'overwrite': sendMoney = preBalance + order.coins else: sendMoney = order.coins # 先判断order最近一次充值是否OK. 
满足三个条件才认为上次充值成功: # 1、操作时间已经超过三天 # 2、操作结果是串口超时, 即result == 1 # 3、当前余额大于最后一次充值操作的充值前余额 if need_verify and len(order.operationLog) > 0: log = order.operationLog[-1] result = log['result'] time_delta = (datetime.datetime.now() - log['time']).total_seconds() last_pre_balance = RMB(log['preBalance']) if (result == ErrorCode.DEVICE_CONN_FAIL or result == ErrorCode.BOARD_UART_TIMEOUT) \ and (time_delta > 3 * 24 * 3600 or preBalance > last_pre_balance): logger.debug('{} recharge verify result is finished.'.format(repr(card))) order.update_after_recharge_ic_card(device = self.device, sendMoney = sendMoney, preBalance = preBalance, result = ErrorCode.SUCCESS, description = u'充值校验结束') CardRechargeRecord.add_record( card = card, group = Group.get_group(order.groupId), order = order, device = self.device) return False try: operation_result, balance = self.deviceAdapter.recharge_card(card.cardNo, sendMoney, orderNo = order.orderNo) order.update_after_recharge_ic_card(device = self.device, sendMoney = sendMoney, preBalance = preBalance, syncBalance = balance, result = operation_result['result'], description = operation_result['description']) if operation_result['result'] != ErrorCode.SUCCESS: return False if not balance: balance = preBalance + order.coins CardRechargeRecord.add_record( card = card, group = Group.get_group(order.groupId), order = order, device = self.device) # 刷新卡里面的余额 card.balance = balance card.lastMaxBalance = balance card.save() return True except Exception as e: order.update_after_recharge_ic_card(device = self.device, sendMoney = sendMoney, preBalance = preBalance, syncBalance = balance, result = ErrorCode.EXCEPTION, description = e.message) return False except Exception as e: logger.exception(e) return False finally: Card.set_card_status(str(card.id), 'idle') def recharge_ic_card_realiable(self, card, preBalance, rechargeType, order, need_verify = True): # type:(Card, RMB, str, CardRechargeOrder, bool)->bool """ # 
rechargeType有两种,一种是用直接覆写overwrite的方式,一种是append追加钱的方式。 # 不同的的设备,充值的方式还不一样.注意:money是实际用户付的钱,coins是给用户充值的钱,比如付10块(money),充15(coins)。 :param card: :param preBalance: :param rechargeType: :param order: :return: """ if not order or order.coins == RMB(0): return False with memcache_lock('{}-{}-{}'.format(self.device.devNo, str(order.id), 'ic_recharge'), value = '1', expire = 300) as acquired: if not acquired: logger.debug('order<{}> is doing.'.format(repr(order))) return try: if order.status == 'finished': logger.debug('order<{}> has been finished.'.format(repr(order))) return # IC卡需要下发到设备,设备写卡,把余额打入卡中 if rechargeType == 'overwrite': sendMoney = preBalance + order.coins else: sendMoney = order.coins if not order.processingLog: order.init_processing_log(device = self.device, sendMoney = sendMoney, preBalance = preBalance) order.save() return self.deviceAdapter.recharge_ic_card_realiable(card.cardNo, sendMoney, order.orderNo) log = copy.deepcopy(order.processingLog) result = log.get('result', None) if result == ErrorCode.IC_RECHARGE_FAIL: if result: order.operationLog.append(log) order.init_processing_log(device = self.device, sendMoney = sendMoney, preBalance = preBalance) order.save() return self.deviceAdapter.recharge_ic_card_realiable(card.cardNo, sendMoney, order.orderNo) else: # 先判断order最近一次充值是否OK. 
满足三个条件才认为上次 # 充值成功: # 1、操作时间已经超过三天 # 2、操作结果是串口超时, 即result == 1 # 3、当前余额大于最后一次充值操作的充值前余额 time_delta = (datetime.datetime.now() - log['time']).total_seconds() last_pre_balance = RMB(log['preBalance']) if (time_delta > 3 * 24 * 3600 or preBalance > last_pre_balance): logger.debug('{} recharge verify result is finished.'.format(repr(card))) log.update({ 'result': ErrorCode.SUCCESS, 'description': u'充值校验结束' }) order.operationLog.append(log) order.processingLog = {} order.status = 'finished' order.save() CardRechargeRecord.add_record( card = card, group = Group.get_group(order.groupId), order = order, device = self.device) else: logger.debug('verify not recharge order<{}>'.format(repr(order))) order.operationLog.append(log) order.init_processing_log(device = self.device, sendMoney = sendMoney, preBalance = preBalance) order.save() return self.deviceAdapter.recharge_ic_card_realiable(card.cardNo, sendMoney, order.orderNo) except Exception as e: logger.exception(e) def update_card_recharge_for_success_event(self, orderId, balance): logger.info('update_card_recharge_for_success_event,orderId=%s' % orderId) order = CardRechargeOrder.objects.get(id = ObjectId(orderId)) if order.status == 'finished': # double check,存在设备ack之前,已经发送了一条事件到路上了,所以需要检查 return card = Card.objects.get(id = order.cardId) group = Group.get_group(self.device['groupId']) RechargeRecord.get_collection().update( {'_id': ObjectId(order.rechargeNo)}, {'$set': { 'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'groupId': self.device['groupId'], 'devNo': self.device['devNo'], 'logicalCode': self.device['logicalCode'], 'ownerId': self.device['ownerId'], 'groupName': group['groupName'], 'groupNumber': self.device['groupNumber'], 'address': group['address'], 'devType': self.device.get('devType', {}).get('name', ''), }}, multi = True ) #: 并且添加一条卡的成功充值的记录 newRcd = CardRechargeRecord( cardId = str(card.id), cardNo = card.cardNo, openId = card.openId, ownerId = ObjectId(self.device['ownerId']), money = 
order.money, coins = order.coins, balance = balance, preBalance = card.balance, devNo = self.device['devNo'], devTypeCode = self.device['devType']['code'], logicalCode = self.device['logicalCode'], groupId = self.device['groupId'], address = group['address'], groupNumber = self.device['groupNumber'], groupName = group['groupName'], status = 'success', rechargeType = 'netpay' ) try: newRcd.save() except Exception as e: logger.exception('save card consume rcd error=%s' % e) # 刷新卡里面的余额 try: card.balance = balance card.lastMaxBalance = balance card.save() except Exception as e: logger.exception(e) try: order.status = 'finished' order.rechargeType = 'netpay' order.save() except Exception as e: logger.exception(e) # 更新收录有值卡 def update_card_balance(self, card, balance): try: card.balance = balance card.save() except Exception as e: logger.exception(e) # 根据消耗的电量计算当前地址下的本次消耗电费 def calc_elec_fee(self, spendElec): group = Group.objects.get(id = self.device['groupId']) return float(group.otherConf.get('elecFee')) * spendElec def calc_service_fee(self, fee, elec_charge, service_charge): elecFee = fee * Ratio(elec_charge) * Ratio(1 / float(elec_charge + service_charge)) serviceFee = (fee - elecFee) return elecFee, serviceFee def time_ratio_pricing(self, value, leftTime, actualNeedTime): return value * Ratio(leftTime) * Ratio(1 / float(actualNeedTime)) def get_backCoins(self, coins, leftTime, actualNeedTime): return self.time_ratio_pricing(value = coins, leftTime = leftTime, actualNeedTime = actualNeedTime) def get_backMoney(self, money, leftTime, actualNeedTime): return self.time_ratio_pricing(value = money, leftTime = leftTime, actualNeedTime = actualNeedTime) def generate_service_complete_title_by_devType(self, devTypeId, templateMap): try: devType = DeviceType.objects(id=devTypeId).first() if devType is None: return '' if devType.finishedMessageTemplateDict == {}: return '' for _ in templateMap.keys(): if _ not in devType.finishedMessageTemplateDict['keys']: 
templateMap.pop(_) return devType.finishedMessageTemplateDict['template'].format(**templateMap) except Exception as e: logger.error(e) return '' def notify_user_service_complete(self, service_name, openid, port, address, reason, finished_time, url = None, extra = None): # type: (basestring, str, str, str, str, str, str, list)->None title_list = [ {u'': u'{}结束,感谢您的使用!'.format(service_name)}, {u'设备编号': self.device.logicalCode} ] if port: title_list.extend([{u'设备端口': port}, {u'设备地址': address}]) else: title_list.extend( [{u'设备地址': address}]) if extra: title_list.extend(extra) if reason: title_list.extend([{u'结束原因': reason}]) dealer = self.device.owner # type: Dealer if dealer.showServicePhone: remark = u'客服联系电话:{}'.format(dealer.service_phone) else: remark = u'我们竭诚为您服务,有任何问题请联系客服!' if not finished_time: finished_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') agent = Agent.objects.filter(id=str(dealer.agentId)).first() if not agent: logger.warning('agent is not exists.'.format(str(dealer.agentId))) return kw = {} # TODO: 微信订阅模板 if agent.customizedUserSubGzhAllowable: if port: service = u"{}服务({}-端口{})".format(self.device.majorDeviceType, self.device.logicalCode, port) if len(service) > 20: service = u'{}服务({})'.format(self.device.majorDeviceType, self.device.logicalCode) if len(service) > 20: service = u'{}服务'.format(self.device.majorDeviceType) else: service = u'{}服务({})'.format(service_name, self.device.logicalCode) if len(service) > 20: service = u'{}服务'.format(self.device.majorDeviceType) kw = { 'service': service, 'finishTime': finished_time, 'remark': remark, } else: kw = { 'title': make_title_from_dict(title_list), 'service': u'{}服务'.format(service_name), 'finishTime': finished_time, 'remark': remark } kw.update({'url': url or settings.SERVER_END_BASE_SSL_URL + '/user/index.html?v=1.0.31#/user/consume'}) self.notify_user(openid,'service_complete', **kw) def notify_to_sd_norther(self, portStr, consumeDict, platform=ShanDongNorther.platform.default): 
# type:(str, dict, int) -> None """ 上报 订单结束信息到山东 省平台 或者济南静态平台(同一份协议 不同服务器) """ try: logicalCode = self.device.logicalCode part = Part.objects.filter(logicalCode=logicalCode, partNo=portStr).first() dealer = self.device.owner if not part or not dealer: raise Exception("no part or no dealer") # 主动上报 能找到唯一的norther norther = ShanDongNorther.get_norther(dealerId=self.device.ownerId, platform=platform).first() # type: ShanDongNorther if not norther: return northerDict = { "connectorId": str(part.id), "startTime": consumeDict.get("startTime", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")), "endTime": consumeDict.get("finishedTime", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")), "totalPower": consumeDict.get("elec", 0), "totalElecMoney": consumeDict.get("spendMoney", 0), } logger.info("northerId:{}, devNo:{}, dealerId:{} norther to sd infoDict = {}".format( str(norther.id), self.device.devNo, self.device.ownerId, northerDict) ) # TODO 这个地方建议不同的实现放在model 内部实现 对外保持norther的notify接口统一 if norther.isSdPlatform: norther.notification_order_info(northerDict) else: norther.notification_order_info_jn(northerDict) except Exception as e: logger.info(e) def refund_net_pay(self, user, lineInfo, refundedMoney, refundCoins, consumeDict, is_cash): # type: (MyUser, dict, RMB, VirtualCoin, dict, bool)->None logger.debug( 'refund for net pay. 
user = {}, lineInfo = {}, refundedMoney = {}, refundCoins = {}, is_cash = {}'.format( repr(user), lineInfo, refundedMoney, refundCoins, is_cash)) refund_recharge_ids = [] if is_cash: if 'rechargeRcdId' in lineInfo: refund_recharge_ids.append(lineInfo['rechargeRcdId']) else: pay_info = lineInfo.get('payInfo', list()) for item in pay_info: if 'rechargeRcdId' in item: refund_recharge_ids.append(item['rechargeRcdId']) if refundCoins <= VirtualCoin(0): # 只能退现金的情况 consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.REFUNDED_CASH: RMB(0).mongo_amount}) if len(refund_recharge_ids) > 0: logger.info('need refund cash, ids = %s' % str(refund_recharge_ids)) if refundedMoney <= RMB(0): consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.REFUNDED_CASH: RMB(0).mongo_amount}) return left_refund = refundedMoney all_refund = RMB(0) for recharge_record_id in refund_recharge_ids[::-1]: recharge_record = RechargeRecord.objects(id = str(recharge_record_id)).first() # type: RechargeRecord if not recharge_record: logger.error('not find recharge record {}'.format(recharge_record_id)) continue if recharge_record.openId != lineInfo['openId']: logger.error('is not my record. {} != {}'.format(recharge_record.openId, lineInfo['openId'])) continue this_refund = min(recharge_record.money, left_refund) if this_refund <= RMB(0): continue logger.debug('try to refund money {} for recharge record {}'.format(this_refund, repr(recharge_record))) try: all_refund += this_refund refund_order = refund_cash( recharge_record, this_refund, VirtualCoin(0), user = user) # type: RefundMoneyRecord if not refund_order: logger.error( 'refund money<{}> failure. 
recharge = {}'.format(this_refund, repr(recharge_record))) except Exception as e: logger.exception(e) left_refund = left_refund - this_refund if left_refund <= RMB(0): break if all_refund > RMB(0): consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.REFUNDED_CASH: all_refund.mongo_amount}) else: if refundCoins > VirtualCoin(0): consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.REFUNDED_COINS: refundCoins.mongo_amount}) refund_money(self.device, refundCoins, lineInfo['openId']) class FaultEvent(Event): def __init__(self, smartBox, event_data): # type:(SmartBox,dict)->None super(FaultEvent, self).__init__(smartBox) self.event_data = event_data def is_notify_dealer(self): if self.dealer: return self.dealer.devFaultPushDealerSwitch else: return False def is_notify_user(self): if self.dealer: return self.dealer.devFaultPushUserSwitch else: return False def record(self, faultCode = '', description = None, title = '', detail = None, level = FAULT_LEVEL.NORMAL, portNo = 0): # 故障记录入库 if self.dealer is not None: dealerId = self.dealer.id else: dealerId = None logicalCode = self.device['logicalCode'] now = datetime.datetime.now() created_within_five_minutes = now - datetime.timedelta(minutes = 5) # 这个应该是防止重复报 try: fault_record = FaultRecord.objects( status="init", faultCode = faultCode, title = title, description = description, logicalCode = logicalCode, portNo = portNo, createdTime__gt = created_within_five_minutes ).first() if not fault_record: group = Group.get_group(self.device['groupId']) fault_record = FaultRecord( logicalCode = logicalCode, imei = self.device['devNo'], portNo = portNo, faultCode = faultCode, title = title, description = description or self.event_data.get('statusInfo', ''), groupName = group.get('groupName', ''), address = group.get('address', '') ) if dealerId is not None: fault_record.dealerId = dealerId if detail is not None: if not isinstance(detail, dict): raise TypeError('detail has to be a dict') fault_record.detail = detail fault_record = 
fault_record.save() else: logger.debug('%r existed within 5 minutes' % (fault_record,)) return fault_record except NotUniqueError as e: logger.error('cannot insert FaultRecord, dev=%s, error=%s' % (self.device['devNo'], e)) def do(self, **args): """不要将所有的信息不加筛选的打入缓存""" group = self.device.group # 通知经销商 if self.is_notify_dealer(): titleDictList = [ {u'告警名称': self.event_data.get("faultName", u"设备告警")}, {u'设备编号': self.device["logicalCode"]}, {u'地址名称': group["groupName"]} ] self.notify_dealer('device_fault', **{ 'title': make_title_from_dict(titleDictList), 'device': u'%s号设备-%s端口\\n' % ( self.device['groupNumber'], self.event_data.get('port')) if self.event_data.has_key( 'port') else u'%s号设备\\n' % self.device['groupNumber'], 'faultType': u'%s\\n' % self.event_data['faultType'] if self.event_data.has_key( 'faultType') else u'设备告警\\n', 'notifyTime': u'%s\\n' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'fault': u'%s(详细情况,建议您到管理后台的告警模块进行查看)\\n' % self.event_data['desc'] if self.event_data.has_key( 'desc') else self.event_data.get('statusInfo', ''), }) # 通知用户 if self.is_notify_user(): if self.device.has_key('openId'): user = MyUser.objects(openId = self.device.get('openId'), groupId = self.device['groupId']).first() self.notify_user(user.managerialOpenId if user else '', 'device_fault', **{ 'title': u'注意!注意!您正在使用的设备发生了故障,设备编码:%s' % self.device['logicalCode'], 'fault': self.event_data['statusInfo'], 'notifyTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) # 写入故障记录 faultRecord = self.record( faultCode = self.event_data.get('faultCode', ''), description = self.event_data.get('desc', None), title = self.event_data.get('faultName', ''), detail = self.event_data.get('detail', None), level = self.event_data.get('level', FAULT_LEVEL.NORMAL), portNo = self.event_data.get('port', 0) ) # 梁溪的告警 if faultRecord: self.north_to_liangxi(faultRecord) notify_event_to_north( self.dealer, self.device, level = Const.EVENT_NORMAL, desc = self.event_data['desc'] if 
self.event_data.has_key('desc') else self.event_data.get('statusInfo', '') ) def north_to_liangxi(self, faultRecord): from taskmanager.mediator import task_caller task_caller( 'send_to_xf_falut', devNo=self.device.devNo, faultId=str(faultRecord.id) ) def time_ratio_pricing(self, value, leftTime, actualNeedTime): return value * Ratio(leftTime) * Ratio(1 / float(actualNeedTime)) def get_backCoins(self, coins, leftTime, actualNeedTime): return self.time_ratio_pricing(value = coins, leftTime = leftTime, actualNeedTime = actualNeedTime) def get_backMoney(self, money, leftTime, actualNeedTime): return self.time_ratio_pricing(value = money, leftTime = leftTime, actualNeedTime = actualNeedTime) class AckEventProcessorIntf(object): def pre_processing(self, device, event_data): # type:(DeviceDict, dict)->dict return event_data class AckEvent(WorkEvent): """ 目前设计为不能重入. 如果执行有异常, 肯定是代码问题 """ def __init__(self, smartBox, event_data, pre_processor = None): # type:(SmartBox, dict, AckEventProcessorIntf)->None super(AckEvent, self).__init__(smartBox, event_data) self.pre_processor = pre_processor def ack_msg(self): if self.event_data['status'] == 'finishing': logger.debug('finishing status is not need to ack.') return payload = { 'order_id': self.event_data['order_id'], 'order_type': self.event_data['order_type'], 'status': self.event_data['status'] } # if response: # payload.update(response) MessageSender.send_no_wait(device = self.device, cmd = DeviceCmdCode.EVENT_ACK, payload = payload) def do_impl(self, **args): raise Exception('must implement do_impl interface.') def do(self, **args): if 'order_id' not in self.event_data or 'order_type' not in self.event_data: logger.error('order id or type is null.') return order_id = self.event_data['order_id'] order_type = self.event_data['order_type'] with memcache_lock('{}-{}-{}'.format(self.device.devNo, order_id, order_type), value = '1', expire = 300) as acquired: if not acquired: logger.debug('ack message<{}-{}-{}> is 
def deal_running_event(self, order):
    # type: (ConsumeRecord)->None
    """
    Process a ``running`` ack for a master order and its optional sub orders.

    Steps, in order:

    1. When the payload carries ``sub`` items, record their ids on the master
       order (``association`` / ``servicedInfo``) and persist it.
    2. Move the master order, then every matching sub order, to the running
       state via :meth:`do_running_order`.
    3. Build the cache entry (start time, status, open id, order numbers,
       port, plus whatever :meth:`merge_order` returns) and write it to the
       port-level or device-level control cache.
    4. Create the service progress record for the master order.

    :param order: the master consume record the event refers to.
    """
    if 'sub' in self.event_data:
        sub_items = self.event_data['sub']
        sub_order_nos = [item['order_id'] for item in sub_items]

        order.association = {'sub': sub_order_nos}
        order.servicedInfo.update({'subOrderNo': ', '.join(sub_order_nos)})
        order.save()

        self.do_running_order(order, self.event_data)

        # Each sub item inherits the master's id and start timestamp so that
        # do_running_order can treat it like a stand-alone result payload.
        sub_order_map = {}
        for item in sub_items:
            item['master'] = self.event_data['order_id']
            item['sts'] = self.event_data['sts']
            sub_order_map[item['order_id']] = item

        sub_orders = list(ConsumeRecord.objects.filter(orderNo__in=list(sub_order_map)))
        for sub_order in sub_orders:
            self.do_running_order(sub_order, sub_order_map[sub_order.orderNo])
    else:
        self.do_running_order(order, self.event_data)
        sub_orders = []

    account_info = self.merge_order(order, sub_orders)

    cache_info = {
        'startTime': Arrow.fromdatetime(order.startTime, tzinfo=settings.TIME_ZONE).format('YYYY-MM-DD HH:mm:ss'),
        'status': Const.DEV_WORK_STATUS_WORKING,
        'isStart': True,
        'openId': order.openId,
        'orderNo': order.orderNo,
        'port': order.used_port
    }
    if sub_orders:
        cache_info.update({'subOrderNos': [sub_order.orderNo for sub_order in sub_orders]})
    cache_info.update(account_info)

    current_cache_info = Device.get_dev_control_cache(self.device.devNo)
    if order.used_port < Const.NO_PORT:
        current_port_info = current_cache_info.get(str(order.used_port))
        # NOTE(review): the second operand reads "orderNo" from the device-level
        # cache, not from the port entry; kept as in the original, but it looks
        # like current_port_info may have been intended -- confirm.
        if (not current_port_info
                or not current_cache_info.get("orderNo")
                or order.orderNo == current_port_info.get('orderNo', None)
                or current_port_info.get('status', 0) == 0):
            Device.update_port_control_cache(self.device.devNo, cache_info, updateType='overwrite')
    else:
        # The original condition lacked an ``or`` between the second and third
        # operands, which turned them into a call on the cached order number
        # and raised TypeError whenever the device cache was non-empty.
        if (not current_cache_info
                or not current_cache_info.get("orderNo")
                or order.orderNo == current_cache_info.get('orderNo', None)
                or current_cache_info.get('status', 0) == 0):
            Device.update_dev_control_cache(self.device.devNo, cache_info)

    ServiceProgress.new_progress_for_order(order=order,
                                           device=self.device,
                                           cache_info=cache_info)
def deal_finished_event(self, order):
    # type: (ConsumeRecord)->None
    """
    Process a ``finished`` ack for a master order and its optional sub orders.

    The master order and every matching sub order are closed through
    :meth:`do_finished_order`; the control-cache entry that still points at
    this order is cleared; the matching service progress is marked finished;
    finally the merged order info is handed to :meth:`do_finished_event`.

    :param order: the master consume record the event refers to.
    """
    if order.status == 'finished':
        logger.debug('order<{}> has finished.'.format(repr(order)))
        return

    self.do_finished_order(order, self.event_data)

    sub_orders = []
    if 'sub' in self.event_data:
        # Enrich each reported sub item so it can be used as a result payload.
        reported_by_no = {}
        for reported in self.event_data['sub']:
            reported['status'] = 'finished'
            reported['master'] = str(order.orderNo)
            reported['sts'] = self.event_data['sts']
            reported['fts'] = self.event_data['fts']
            reported_by_no[reported['order_id']] = reported

        sub_orders = list(ConsumeRecord.objects.filter(orderNo__in=reported_by_no.keys()))
        for child in sub_orders:
            self.do_finished_order(child, reported_by_no[child.orderNo])

    # Only drop the cache entry when it still belongs to this very order.
    dev_no = self.device.devNo
    cached = Device.get_dev_control_cache(dev_no)
    if order.used_port != Const.NO_PORT:
        port_entry = cached.get(str(order.used_port))
        if port_entry and order.orderNo == port_entry.get('orderNo', None):
            Device.clear_port_control_cache(dev_no, order.used_port)
    elif cached and order.orderNo == cached.get('orderNo', None):
        Device.invalid_device_control_cache(dev_no)

    progress_query = ServiceProgress.objects(open_id=order.openId,
                                             device_imei=self.device.devNo,
                                             port=int(order.used_port),
                                             consumeOrder__orderNo=order.orderNo)
    progress_query.update_one(
        upsert=False,
        isFinished=True,
        finished_time=Arrow.fromdatetime(order.finishedTime, tzinfo=settings.TIME_ZONE).timestamp,
        expireAt=datetime.datetime.now())

    self.do_finished_event(order, sub_orders, self.merge_order(order, sub_orders))
def get_or_create_order(self):
    # type:()-> tuple
    """
    Load (or create on first sight) the master order and its sub orders.

    Orders are looked up by ``startKey`` (the device-side order id).  A
    missing order is created together with its ``CardConsumeRecord`` and
    then passed to :meth:`checkout_order`; a failing checkout marks the
    order abnormal and stores the error text instead of propagating.

    :return: ``(has_done, master_order, sub_orders)``.  ``has_done`` is
        False when the master order was still in the created state, when a
        running order receives a finished event, or when the reported set
        of sub orders differs from the stored one; otherwise True.
    """

    def new_one(order_id):
        # type:( str)->ConsumeRecord
        # The master fee is on the payload itself, a sub fee on its sub item.
        if self.event_data['order_id'] == order_id:
            fee = VirtualCoin(self.event_data['fee'])
        else:
            # List comprehension instead of map(...)[0], which only works on
            # Python 2; still raises IndexError when the id is not reported.
            matching_fees = [item['fee'] for item in self.event_data.get('sub', [])
                             if item['order_id'] == order_id]
            fee = VirtualCoin(matching_fees[0])

        order = ConsumeRecord.objects(startKey=order_id).first()
        if order:
            return order

        # NOTE(review): everything below is assumed to run only for a newly
        # created order (the original nesting was lost) -- confirm.
        order = ConsumeRecord.new_one(
            order_no=ConsumeRecord.make_no(self.event_data['cardNo'], UserConsumeSubType.CARD),
            user=MyUser(openId=self.card.openId, nickname=self.card.nickName),
            device=self.device,
            group=Group.get_group(self.device.groupId),
            package=self.event_data.get('package', {}),
            attach_paras={'chargeIndex': self.event_data['port']},
            pay_info={
                'via': 'card',
                'coins': fee.mongo_amount
            },
            start_key=order_id)
        order.servicedInfo = {'cardNo': self.event_data.get('cardNo'),
                              'chargeIndex': self.event_data['port']}
        order.save()

        if not CardConsumeRecord.objects(orderNo=order.orderNo).first():
            group = Group.get_group(self.device.groupId)  # type:GroupDict
            CardConsumeRecord(orderNo=order.orderNo,
                              openId=self.card.openId,
                              cardId=str(self.card.id),
                              money=fee.mongo_amount,
                              balance=self.card.balance.mongo_amount,
                              devNo=self.device.devNo,
                              devType=self.device.devTypeName,
                              logicalCode=self.device.logicalCode,
                              groupId=self.device.groupId,
                              address=group.address,
                              groupNumber=self.device.groupNumber,
                              groupName=group.groupName,
                              result='success',
                              remarks=u'刷卡消费',
                              dateTimeAdded=datetime.datetime.now(),
                              linkedConsumeRcdOrderNo=order.orderNo).save()

        # Settle the card consumption for the new order (prepaid or postpaid
        # is decided by the subclass).
        try:
            self.checkout_order(order)
        except Exception as e:
            order.isNormal = False
            order.errorDesc = '订单建立结算失败! ERROR={}'.format(e)
            order.save()

        return order

    def new_sub_order(order_id, master_order_id):
        order = new_one(order_id)
        order.isNormal = True
        order.status = self.event_data['status']
        order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
        if 'fts' in self.event_data:
            order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])

        if ('master' not in order.association) or (order.association['master'] != master_order_id):
            order.association = {'master': master_order_id}
            order.servicedInfo['masterOrderNo'] = master_order_id

        order.save()
        return order

    master_order = new_one(self.event_data['order_id'])
    master_order.save()

    sub_orders = [new_sub_order(item['order_id'], master_order.orderNo)
                  for item in self.event_data.get('sub', [])]
    sub_order_id_list = [item.orderNo for item in sub_orders]

    if master_order.status == ConsumeRecord.Status.CREATED:
        has_done = False
        master_order.association = {'sub': sub_order_id_list}
        if sub_order_id_list:
            master_order.servicedInfo['subOrderNo'] = ', '.join(sub_order_id_list)
    else:
        has_done = True
        if master_order.status == ConsumeRecord.Status.FINISHED:
            logger.debug('master order<{}> has been done.'.format(repr(master_order)))

        if master_order.status == ConsumeRecord.Status.RUNNING and \
                self.event_data['status'] == ConsumeRecord.Status.FINISHED:
            has_done = False

        # .get() instead of ['sub']: a stored order whose association has no
        # 'sub' key used to raise KeyError here.
        if set(master_order.association.get('sub', [])) != set(sub_order_id_list):
            master_order.association = {'sub': sub_order_id_list}
            if sub_order_id_list:
                master_order.servicedInfo['subOrderNo'] = ', '.join(sub_order_id_list)
            has_done = False

    # NOTE(review): the status below is overwritten even when the stored order
    # is already finished, so a late 'running' event would move it back --
    # confirm whether that is intended.
    master_order.isNormal = True
    master_order.status = self.event_data['status']
    if not master_order.startTime:
        if 'sts' in self.event_data:
            master_order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
        else:
            master_order.startTime = datetime.datetime.now()
    if 'fts' in self.event_data:
        master_order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
    master_order.save()

    return has_done, master_order, sub_orders
current_cache_info.get(str(master_order.used_port), {}) if not current_cache_info or master_order.orderNo == current_port_info.get('orderNo', None) or current_port_info.get( 'status', 0) == Const.DEV_WORK_STATUS_IDLE: Device.update_port_control_cache(self.device.devNo, cache_info, updateType='overwrite') else: if start_time_str > current_port_info.get('startTime', '1970-01-01'): Device.update_port_control_cache(self.device.devNo, cache_info, updateType='overwrite') else: pass else: if not current_cache_info or ( master_order.orderNo == current_cache_info.get('orderNo', None)) or current_cache_info.get( 'status', 0) == Const.DEV_WORK_STATUS_IDLE: Device.update_dev_control_cache(self.device.devNo, cache_info) else: if start_time_str > current_cache_info.get('startTime', '1970-01-01'): Device.update_dev_control_cache(self.device.devNo, cache_info) else: pass ServiceProgress.new_progress_for_order(order=master_order, device=self.device, cache_info=cache_info) finally: self.post_after_start(order=master_order) def deal_finished_order(self, master_order, sub_orders): # type: (ConsumeRecord, Iterable[ConsumeRecord])->None self.post_before_finish(master_order) try: self.do_finished_event(master_order, self.merge_order(master_order, sub_orders)) current_cache_info = Device.get_dev_control_cache(self.device.devNo) if master_order.used_port < Const.NO_PORT: current_port_info = current_cache_info.get(str(master_order.used_port)) if current_port_info and (master_order.orderNo == current_port_info.get('orderNo', '')): Device.clear_port_control_cache(devNo=self.device.devNo, port=master_order.used_port) else: if current_cache_info and (master_order.orderNo == current_cache_info.get('orderNo', '')): Device.invalid_device_control_cache(self.device.devNo) ServiceProgress.objects(open_id=self.card.openId, device_imei=self.device.devNo, port=int(master_order.used_port)).update_one( upsert=False, **{ 'isFinished': True, 'finished_time': Arrow.fromdatetime(master_order.finishedTime, 
class IcRechargeAckEvent(AckEvent):
    """
    Ack event reporting the outcome of writing a recharge onto an IC card.

    The payload is expected to carry ``order_id``, ``cardNo`` and ``rst``
    (the device result code); ``balance`` is read when present.
    """

    def __init__(self, smartBox, event_data, pre_processor = None):
        super(IcRechargeAckEvent, self).__init__(smartBox, event_data, pre_processor)

        if self.pre_processor:
            self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)

    def do_impl(self, **args):
        """
        Apply the device-reported recharge result to the recharge order.

        Nothing is changed when the order is unknown, already finished, has no
        processing log, the card number does not match, or the card is missing.
        On ``BOARD_UART_TIMEOUT`` only the processing log is updated.  For any
        other result the card balance is synchronised with the reported one;
        on ``DEVICE_SUCCESS`` the order is additionally finished, its
        processing log is moved into the operation log and a recharge record
        is added.
        """
        order_no = self.event_data['order_id']
        order = CardRechargeOrder.objects(orderNo = order_no).first()  # type: CardRechargeOrder
        if not order:
            # The original message had no placeholder, so the id was dropped.
            logger.error('order<{}> is not exist.'.format(order_no))
            return

        if order.status == 'finished':
            logger.debug('order<{}> has be done.'.format(repr(order)))
            return

        if not order.processingLog:
            logger.error('order<{}> has no processing log.'.format(repr(order)))
            return

        cardNo = self.event_data['cardNo']
        if str(cardNo) != order.cardNo:
            logger.error('check card no failure. {} != {}'.format(cardNo, order.cardNo))
            return

        card = Card.objects(cardNo = order.cardNo).first()
        if not card:
            # ``card`` is None on this path; the original read ``card.cardNo``
            # and raised AttributeError instead of logging.
            logger.error('not find card<{}>'.format(order.cardNo))
            return

        result = self.event_data['rst']

        if result == ErrorCode.BOARD_UART_TIMEOUT:
            order.processingLog.update({
                'result': result,
                'description': DeviceErrorCodeDesc.get(result)
            })
            order.save()
            return

        # Refresh the balance stored for the card with the reported one.
        balance = VirtualCoin(self.event_data.get('balance'))
        if balance != card.balance:
            card.balance = balance
            card.lastMaxBalance = balance
            card.save()

        outcome = {
            'result': result,
            'description': DeviceErrorCodeDesc.get(result),
            'syncBalance': balance.mongo_amount
        }

        if result == ErrorCode.DEVICE_SUCCESS:
            log = copy.deepcopy(order.processingLog)
            log.update(outcome)
            order.operationLog.append(log)
            order.processingLog = {}
            order.status = 'finished'
            order.save()

            CardRechargeRecord.add_record(
                card = card,
                group = Group.get_group(order.groupId),
                order = order,
                device = self.device)
        else:
            order.processingLog.update(outcome)
            order.save()
def new_one(order_id): # type:( str)->ConsumeRecord if self.event_data['order_id'] == order_id: fee = VirtualCoin(self.event_data['fee']) else: sub = self.event_data.get('sub',[]) fee = VirtualCoin(map(lambda x: x['fee'], filter(lambda x: x['order_id'] == order_id, sub))[0]) order = ConsumeRecord.objects(startKey = order_id).first() if not order: attach_paras = {'chargeIndex': self.event_data['port']} order = ConsumeRecord.new_one( order_no = ConsumeRecord.make_no(self.event_data['cardNo'], UserConsumeSubType.CARD), user = MyUser(openId = self.card.openId, nickname = self.card.nickName), device = self.device, group = Group.get_group(self.device.groupId), package=self.event_data.get('package', {}), attach_paras = attach_paras, pay_info = { 'via': 'card', 'coins': fee.mongo_amount }, start_key = order_id) order.servicedInfo = {'cardNo': self.event_data.get('cardNo'), 'chargeIndex': self.event_data['port']} order.save() card_order = CardConsumeRecord.objects(orderNo = order.orderNo).first() if not card_order: group = Group.get_group(self.device.groupId) # type:GroupDict card_order = CardConsumeRecord(orderNo = order.orderNo, openId = self.card.openId, cardId = str(self.card.id), money = fee.mongo_amount, balance = self.card.balance.mongo_amount, devNo = self.device.devNo, devType = self.device.devTypeName, logicalCode = self.device.logicalCode, groupId = self.device.groupId, address = group.address, groupNumber = self.device.groupNumber, groupName = group.groupName, result = 'success', remarks = u'刷卡消费', dateTimeAdded = datetime.datetime.now(), linkedConsumeRcdOrderNo = order.orderNo).save() # 生成订单处理卡消费(预付费还是后付费) try: self.checkout_order(order) except Exception as e: order.isNormal = False order.errorDesc = '订单建立结算失败! 
ERROR={}'.format(e) order.save() return order def new_sub_order(order_id, master_order_id): order = new_one(order_id) order.isNormal = True order.status = self.event_data['status'] order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts']) if 'fts' in self.event_data: order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts']) if ('master' not in order.association) or (order.association['master'] != master_order_id): order.association = {'master': master_order_id} order.servicedInfo['masterOrderNo'] = master_order_id order.save() return order master_order = new_one(self.event_data['order_id']) master_order.save() sub_orders = [] for item in self.event_data.get('sub', []): sub_order = new_sub_order(item['order_id'], master_order.orderNo) sub_orders.append(sub_order) sub_order_id_list = [item.orderNo for item in sub_orders] has_done = True if master_order.status == ConsumeRecord.Status.FINISHED: logger.debug('master order<{}> has been done.'.format(repr(master_order))) has_done = True if master_order.status == ConsumeRecord.Status.CREATED: has_done = False master_order.association = {'sub': sub_order_id_list} if sub_order_id_list: master_order.servicedInfo['subOrderNo'] = ', '.join(sub_order_id_list) else: if master_order.status == ConsumeRecord.Status.RUNNING and \ self.event_data['status'] == ConsumeRecord.Status.FINISHED: has_done = False if set(master_order.association['sub']) != set(sub_order_id_list): master_order.association = {'sub': sub_order_id_list} if sub_order_id_list: master_order.servicedInfo['subOrderNo'] = ', '.join(sub_order_id_list) has_done = False master_order.isNormal = True master_order.status = self.event_data['status'] if not master_order.startTime: if 'sts' in self.event_data: master_order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts']) else: master_order.startTime = datetime.datetime.now() if 'fts' in self.event_data: master_order.finishedTime = 
datetime.datetime.fromtimestamp(self.event_data['fts']) master_order.save() return has_done, master_order, sub_orders def merge_order(self, master_order, sub_orders): # type:(ConsumeRecord, Iterable[ConsumeRecord])->dict raise NotImplementedError('must implement merge_order.') def do_finished_event(self, order, merge_order_info): # type:(ConsumeRecord, dict)->None raise NotImplementedError('must implement do_finished_event.') def deal_running_order(self, master_order, sub_orders): # type: (ConsumeRecord, Iterable[ConsumeRecord])->None self.post_before_start(master_order) try: start_time_str = master_order.startTime.strftime(Const.DATETIME_FMT) cache_info = { 'port': str(master_order.used_port), 'cardId': str(self.card.id), 'openId': self.card.openId, 'startTime': start_time_str, 'isStart': True, 'status': Const.DEV_WORK_STATUS_WORKING, 'orderNo': master_order.orderNo, 'subOrderNos': [item.orderNo for item in sub_orders] } cache_info.update(self.merge_order(master_order, sub_orders)) current_cache_info = Device.get_dev_control_cache(self.device.devNo) if master_order.used_port < Const.NO_PORT: current_port_info = current_cache_info.get(str(master_order.used_port), {}) if not current_cache_info or master_order.orderNo == current_port_info.get('orderNo', None) or current_port_info.get('status', 0) == Const.DEV_WORK_STATUS_IDLE: Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite') else: if start_time_str > current_port_info.get('startTime', '1970-01-01'): Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite') else: pass else: if not current_cache_info or (master_order.orderNo == current_cache_info.get('orderNo', None)) or current_cache_info.get('status', 0) == Const.DEV_WORK_STATUS_IDLE: Device.update_dev_control_cache(self.device.devNo, cache_info) else: if start_time_str > current_cache_info.get('startTime', '1970-01-01'): Device.update_dev_control_cache(self.device.devNo, cache_info) else: pass 
def do_impl(self, **args):
    """
    Dispatch an ID-card start ack to the running / finished handler.

    The event is dropped when the payload has no ``port``, when the card
    resolved in ``__init__`` is missing, has no ``openId`` or is frozen, or
    when :meth:`get_or_create_order` reports the order as already done.
    """
    if 'port' not in self.event_data:
        logger.warning('port is not exist.')
        return

    if not self.card or not self.card.openId or self.card.frozen:
        # .get(): the card number itself may be absent from the payload, and
        # the original message had no placeholder to show it anyway.
        logger.debug('card<{}> is not exists.'.format(self.event_data.get('cardNo')))
        return

    has_done, master_order, sub_orders = self.get_or_create_order()
    if has_done:
        logger.debug('order<{}> has been done.'.format(repr(master_order)))
        return

    if self.event_data['status'] == 'running':
        self.deal_running_order(master_order, sub_orders)

    if self.event_data['status'] == 'finished':
        self.deal_finished_order(master_order, sub_orders)
def get_virtual_card_by_card(self):
    """
    Resolve the physical card named in the event and its virtual card.

    :return: ``(card, virtual_card)``.  ``(None, None)`` is returned when no
        card with the reported number exists, the card has no ``openId``, or
        its balance is not zero.  The virtual card is
        ``card.bound_virtual_card`` when the agent of the card's dealer has
        the ``vCardNeedBind`` feature, otherwise ``card.related_virtual_card``.
    """
    cardNo = self.event_data.get("cardNo")
    card = Card.objects.filter(cardNo=cardNo).first()
    if not card or not card.openId or float(card.balance) != 0:
        logger.debug('card<{}> is not exists.'.format(cardNo))
        # __init__ unpacks two values from this method; the original bare
        # ``return`` made that unpacking fail with TypeError.
        return None, None

    try:
        dealer = Dealer.objects.get(id=card.dealerId)
        agent = Agent.objects.get(id=dealer.agentId)
        features = agent.features
    except Exception as e:
        # Best effort: without dealer/agent the feature list counts as empty.
        logger.warning('get agent features of card<{}> failure. error = {}'.format(cardNo, e))
        features = []

    if "vCardNeedBind" in features:
        return card, card.bound_virtual_card
    return card, card.related_virtual_card
fee.mongo_amount, 'coins': fee.mongo_amount, 'deduct': [] }, start_key=order_id) order.servicedInfo = {'cardNo':self.event_data.get('cardNo'),'chargeIndex': self.event_data['port']} order.save() card_order = CardConsumeRecord.objects(orderNo = order.orderNo).first() if not card_order: group = Group.get_group(self.device.groupId) # type:GroupDict card_order = CardConsumeRecord(orderNo = order.orderNo, openId = self.card.openId, cardId = str(self.card.id), money = VirtualCoin(0).mongo_amount, balance = self.card.balance.mongo_amount, devNo = self.device.devNo, devType = self.device.devTypeName, logicalCode = self.device.logicalCode, groupId = self.device.groupId, address = group.address, groupNumber = self.device.groupNumber, groupName = group.groupName, result = 'success', remarks = u'虚拟卡消费', dateTimeAdded = datetime.datetime.now(), linkedConsumeRcdOrderNo = order.orderNo).save() # 生成订单处理卡消费(预付费还是后付费) try: self.checkout_order(order) except Exception as e: order.isNormal = False order.errorDesc = '订单建立结算失败! 
ERROR={}'.format(e) order.save() freeze_user_balance(self.device, group, order, self.virtualCard) return order def new_sub_order(order_id, master_order_id): order = new_one(order_id) order.isNormal = True order.status = self.event_data['status'] order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts']) if 'fts' in self.event_data: order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts']) if ('master' not in order.association) or (order.association['master'] != master_order_id): order.association = {'master': master_order_id} order.save() return order master_order = new_one(self.event_data['order_id']) master_order.save() sub_orders = [] for item in self.event_data.get('sub', []): sub_order = new_sub_order(item['order_id'], master_order.orderNo) sub_orders.append(sub_order) sub_order_id_list = [item.orderNo for item in sub_orders] has_done = True if master_order.status == ConsumeRecord.Status.FINISHED: logger.debug('master order<{}> has been done.'.format(repr(master_order))) has_done = True if master_order.status == ConsumeRecord.Status.CREATED: has_done = False master_order.association = {'sub': sub_order_id_list} else: if master_order.status == ConsumeRecord.Status.RUNNING and \ self.event_data['status'] == ConsumeRecord.Status.FINISHED: has_done = False if set(master_order.association['sub']) != set(sub_order_id_list): master_order.association = {'sub': sub_order_id_list} has_done = False master_order.isNormal = True master_order.status = self.event_data['status'] if not master_order.startTime: if 'sts' in self.event_data: master_order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts']) else: master_order.startTime = datetime.datetime.now() if 'fts' in self.event_data: master_order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts']) master_order.save() return has_done, master_order, sub_orders def merge_order(self, master_order, sub_orders): # type:(ConsumeRecord, 
Iterable[ConsumeRecord])->dict raise NotImplementedError('must implement merge_order.') def do_finished_event(self, order, sub_orders, merge_order_info): # type:(ConsumeRecord,list, Iterable[ConsumeRecord])->None raise NotImplementedError('must implement do_finished_event.') def deal_running_order(self, master_order, sub_orders): # type: (ConsumeRecord, Iterable[ConsumeRecord])->None try: start_time_str = master_order.startTime.strftime(Const.DATETIME_FMT) cache_info = { 'port': str(master_order.used_port), 'cardId': str(self.card.id), 'openId': self.card.openId, 'startTime': start_time_str, 'isStart': True, 'status': Const.DEV_WORK_STATUS_WORKING, 'orderNo': master_order.orderNo, 'subOrderNos': [item.orderNo for item in sub_orders] } cache_info.update(self.merge_order(master_order, sub_orders)) current_cache_info = Device.get_dev_control_cache(self.device.devNo) if master_order.used_port < Const.NO_PORT: current_port_info = current_cache_info.get(str(master_order.used_port), {}) if not current_cache_info or master_order.orderNo == current_port_info.get('orderNo', None) or current_port_info.get('status', 0) == 0: Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite') else: if start_time_str > current_port_info.get('startTime', '1970-01-01'): Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite') else: pass else: if not current_cache_info: Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite') else: if master_order.orderNo == current_cache_info.get('orderNo', ''): Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite') else: if start_time_str > current_cache_info.get('startTime', '1970-01-01'): Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite') else: pass ServiceProgress.new_progress_for_order(order = master_order, device = self.device, cache_info = cache_info) finally: 
class CardRefundAckEvent(AckEvent):
    """
    Ack event reporting that money has to be refunded to a card.

    The payload is expected to carry ``order_id``, ``cardNo``, ``cardType``
    and ``backMoney``.
    """

    def __init__(self, smartBox, event_data, pre_processor=None):
        # ``pre_processor`` now defaults to None like in the sibling ack
        # events; existing callers that pass it are unaffected.
        super(CardRefundAckEvent, self).__init__(smartBox, event_data, pre_processor)

        if self.pre_processor:
            self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)

    def do_impl(self, **args):
        """
        Refund ``backMoney`` to the card unless the refund already exists.

        A ``RechargeRecord`` with the event's order number means the refund
        has been handled before, so nothing is done.  Nothing is done either
        when ``update_card_dealer_and_type`` yields no card.
        """
        order_no = self.event_data['order_id']

        order = RechargeRecord.objects(orderNo = order_no).first()
        if order:
            # The original messages had no placeholder, so the ids were dropped.
            logger.debug('refund order<{}> has been done.'.format(order_no))
            return

        card = self.update_card_dealer_and_type(self.event_data['cardNo'], self.event_data['cardType'])  # type: Card
        if not card:
            logger.debug('card<{}> is not bound.'.format(self.event_data['cardNo']))
            return

        self.refund_money_for_card(RMB(self.event_data['backMoney']), card.id, order_no)