|
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import copy
- import datetime
- import logging
- import uuid
- from arrow import Arrow
- from bson.objectid import ObjectId
- from django.conf import settings
- from mongoengine.errors import DoesNotExist
- from mongoengine.errors import NotUniqueError
- from typing import Tuple, Optional, TYPE_CHECKING, Iterable
- from apilib.monetary import VirtualCoin, RMB, Ratio
- from apilib.utils_string import make_title_from_dict
- from apilib.utils_sys import memcache_lock
- from apps.web.agent.models import Agent
- from apps.web.api.ykt_north.view import custQueryConsume,custquery
- from apps.web.south_intf.shangdong_platform import ShanDongNorther
- from apps.web.common.transaction import UserConsumeSubType
- from apps.web.constant import Const, FAULT_LEVEL, DeviceCmdCode, ErrorCode, DeviceErrorCodeDesc, \
- START_DEVICE_STATUS, DEALER_CONSUMPTION_AGG_KIND
- from apps.web.core.networking import MessageSender
- from apps.web.dealer.define import DEALER_INCOME_SOURCE
- from apps.web.dealer.models import Dealer
- from apps.web.device.models import Device, FaultRecord, Group, GroupDict, Part, DeviceType
- from apps.web.eventer import Event
- from apps.web.report.ledger import Ledger
- from apps.web.south_intf.platform import notify_event_to_north
- from apps.web.user.models import MyUser, RechargeRecord, CardConsumeRecord, CardRechargeRecord, ConsumeRecord, Card, \
- CardRechargeOrder, ServiceProgress
- from apps.web.user.models import RefundMoneyRecord
- from apps.web.user.transaction_deprecated import refund_money, refund_cash
- from apps.web.user.utils import freeze_user_balance
- from apps.web.utils import set_start_key_status
- if TYPE_CHECKING:
- from apps.web.core.adapter.base import SmartBox
- from apps.web.device.models import DeviceDict
- logger = logging.getLogger(__name__)
- class WorkEvent(Event):
- """
- 开始工作或者结束工作的事件
- """
- def __init__(self, smartBox, event_data):
- # type:(SmartBox,dict)->None
- super(WorkEvent, self).__init__(smartBox)
- self.event_data = event_data
- def do(self, **args):
- Device.update_dev_control_cache(self.device['devNo'], self.event_data)
- def get_managerialOpenId_by_openId(self, openId):
- try:
- user = MyUser.objects.filter(openId = openId, groupId = self.device.groupId).first()
- if not user:
- return None
- else:
- return user.managerialOpenId
- except Exception, e:
- logger.exception('get managerial openid error=%s' % e)
- return None
- # 记录充值记录数据。注意需要在卡的余额修改后,然后记录
- def record_refund_money_for_card(self, money, cardId, orderNo = None):
- card = Card.objects.get(id = cardId)
- groupId = self.device['groupId']
- group = Group.get_group(groupId)
- if group is None:
- return False
- # 增加充值记录
- payload = {
- 'orderNo': orderNo or str(uuid.uuid4()),
- 'devNo': self.device['devNo'],
- 'logicalCode': self.device['logicalCode'],
- 'devTypeName': self.device.devTypeName,
- 'ownerId': self.device['ownerId'],
- 'groupId': self.device['groupId'],
- 'groupName': group['groupName'],
- 'groupNumber': self.device['groupNumber'],
- 'address': group['address'],
- 'openId': card.openId,
- 'nickname': card.nickName,
- 'money': money,
- 'coins': VirtualCoin(money.amount),
- 'result': 'success',
- 'via': 'refund',
- 'attachParas': {'cardNo': card.cardNo}
- }
- record = RechargeRecord(**payload)
- try:
- record.save()
- except Exception, e:
- logger.error('cardNo = %s,save recharge record error=%s' % (card.cardNo, e))
- return False
- newRcd = CardRechargeRecord(
- orderNo = payload.get('orderNo'),
- cardNo=card.cardNo,
- cardId = str(card.id),
- openId = card.openId,
- ownerId = ObjectId(self.device['ownerId']),
- money = money,
- coins = money,
- preBalance = card.balance - money,
- balance = card.balance,
- devNo = self.device['devNo'],
- devTypeCode = self.device['devType']['code'],
- logicalCode = self.device['logicalCode'],
- groupId = self.device['groupId'],
- address = group['address'],
- groupNumber = self.device['groupNumber'],
- groupName = group['groupName'],
- status = 'success',
- rechargeType = 'netpay',
- remarks = u'退币'
- )
- try:
- newRcd.save()
- except Exception, e:
- logger.exception('save card consume rcd error=%s' % e)
- def refund_money_for_card(self, money, cardId, orderNo = None):
- # 添加金币
- try:
- Card.get_collection().update({'_id': ObjectId(cardId)}, {'$inc': {'balance': money.mongo_amount}})
- except Exception, e:
- logger.error('feedback coins error=%s' % e)
- return None
- self.record_refund_money_for_card(money, cardId, orderNo)
- try:
- card = Card.objects.get(id = cardId)
- card.lastMaxBalance = card.balance
- return card.save()
- except Exception, e:
- logger.info('save error=%s ' % e)
- # 用于比如刷卡回收余额的时候,还需要更新上次消费订单的消费数据
- def update_consume_record_refund_money_for_card(self, money, card):
- try:
- # 有可能出现一张卡刷使用多个端口,这个时候回收余额只能分到3个消费订单上,因为回收余额的时候,没有上报回收具体哪一个端口的,所以无法对应到端口
- cardRcds = CardConsumeRecord.objects.filter(openId = card.openId, cardId = str(card.id),
- result = 'success').order_by('-dateTimeAdded')
- for rcd in cardRcds:
- if rcd.servicedInfo.has_key('refundedMoney'):
- continue
- else:
- break
- rcd.servicedInfo.update({'refundedMoney': money.mongo_amount})
- rcd.save()
- if not rcd.linkedConsumeRcdOrderNo:
- return
- consumeRcd = ConsumeRecord.objects.get(orderNo = rcd.linkedConsumeRcdOrderNo)
- consumeRcd.servicedInfo.update({'refundedMoney': money.mongo_amount})
- consumeRcd.save()
- except Exception, e:
- logger.exception('update_consume_record_refund_money_for_card = %s,cardNo=%s' % (e, card.cardNo))
- # 通知充电完成
- def notify_charging_finished(self, managerialOpenId):
- if not managerialOpenId:
- return
- group = Group.get_group(self.device['groupId'])
- self.notify_user(managerialOpenId, 'service_complete', **{
- 'title': u'您订购的服务已经完成',
- 'service': u'充电服务(设备编号:%s, 地址:%s)' % (self.device['logicalCode'], group['address']),
- 'finishTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
- 'remark': u'谢谢您的支持'
- })
- # ID/IC卡扣费。-1:异常错误,0:余额不足 1:正常
- def consume_money_for_card(self, card, money):
- # type:(Card, RMB)->Tuple[int, RMB]
- # 如果卡被解绑了或者挂失了,直接判断
- if card.openId == '' or card.frozen:
- return 0, RMB(0)
- if card.balance < money:
- card.balance = RMB(0)
- else:
- card.balance = (card.balance - money)
- try:
- card.save()
- except Exception as e:
- logger.exception(e)
- return -1, RMB(0)
- return 1, card.balance
- # 无值卡,通知扣费
- def notify_balance_has_consume_for_card(self, card, money, desc = u''):
- if not card or not card.managerialOpenId: # 有可能卡没有绑定,就不需要
- return
- self.notify_user(card.managerialOpenId, 'consume_notify', **{
- 'title': u'亲,您绑定的卡:%s(名称:%s),正在扣费。%s\\n' % (card.cardNo, card.cardName, desc),
- 'money': u'%s元\\n' % money,
- 'serviceType': u'刷卡消费\\n',
- 'finishTime': datetime.datetime.now().strftime(Const.DATETIME_FMT)
- })
- # 记录卡的消费
- def record_consume_for_card(self, card, money, desc = u'', servicedInfo = None, sid = None, attachParas = None):
- # type: (Card, RMB, unicode, Optional[dict], Optional[str], Optional[dict])->Tuple[str, str]
- servicedInfo = {} if servicedInfo is None else servicedInfo
- attachParas = {} if attachParas is None else attachParas
- group = Group.get_group(self.device['groupId'])
- address = group['address']
- group_number = self.device['groupNumber']
- # TODO 这里应该与其他插入消费记录的地方一样,运用model 同时应该取消time字段与其他model一样用dateTimeAdded
- now = datetime.datetime.now()
- new_record = {
- 'orderNo': ConsumeRecord.make_no(card.cardNo, UserConsumeSubType.CARD),
- 'time': now.strftime("%Y-%m-%d %H:%M:%S"),
- 'dateTimeAdded': now,
- 'openId': card.openId,
- 'ownerId': self.device['ownerId'],
- 'coin': money.mongo_amount,
- 'money': money.mongo_amount,
- 'devNo': self.device['devNo'],
- 'logicalCode': self.device['logicalCode'],
- 'groupId': self.device['groupId'],
- 'address': address,
- 'groupNumber': group_number,
- 'groupName': group['groupName'],
- 'devTypeCode': self.device.devTypeCode,
- 'devTypeName': self.device.devTypeName,
- 'isNormal': True,
- 'status': ConsumeRecord.Status.RUNNING,
- 'remarks': u'刷卡消费',
- 'errorDesc': '',
- 'sequanceNo': '',
- 'desc': desc,
- 'attachParas': attachParas,
- 'servicedInfo': servicedInfo
- }
- ConsumeRecord.get_collection().insert_one(new_record)
- # 刷卡消费也记录一条数据
- new_card_record = {
- 'orderNo': CardConsumeRecord.make_no(),
- 'openId': card.openId,
- 'cardId': str(card.id),
- 'money': money.mongo_amount,
- 'balance': card.balance.mongo_amount,
- 'devNo': self.device['devNo'],
- 'devType': self.device['devType']['name'],
- 'logicalCode': self.device['logicalCode'],
- 'groupId': self.device['groupId'],
- 'address': address,
- 'groupNumber': group_number,
- 'groupName': group['groupName'],
- 'result': 'success',
- 'remarks': u'刷卡消费',
- 'sequanceNo': '',
- 'dateTimeAdded': datetime.datetime.now(),
- 'desc': desc,
- 'servicedInfo': servicedInfo,
- 'linkedConsumeRcdOrderNo': str(new_record['orderNo'])
- }
- if sid is not None:
- new_card_record.update({'sid': sid})
- CardConsumeRecord.get_collection().insert_one(new_card_record)
- return new_record['orderNo'], new_card_record['orderNo']
- def record_consume_for_coin(self, money, desc=u'', servicedInfo=None, remarks=u'投币或者刷卡消费'):
- # type: (RMB, basestring, Optional[dict], basestring)->str
- """
- 记录硬币的消费
- :param money:
- :param desc:
- :param remarks:
- :param servicedInfo:
- :return:
- """
- servicedInfo = {} if servicedInfo is None else servicedInfo
- group = Group.get_group(self.device['groupId'])
- address = group['address']
- group_number = self.device['groupNumber']
- now = datetime.datetime.now()
- new_record = {
- 'orderNo': ConsumeRecord.make_no(self.device.logicalCode, UserConsumeSubType.COIN),
- 'dateTimeAdded': now,
- 'openId': '',
- 'ownerId': self.device['ownerId'],
- 'coin': money.mongo_amount,
- 'money': money.mongo_amount,
- 'devNo': self.device['devNo'],
- 'logicalCode': self.device['logicalCode'],
- 'groupId': self.device['groupId'],
- 'address': address,
- 'groupNumber': group_number,
- 'groupName': group['groupName'],
- 'devTypeCode': self.device.devTypeCode,
- 'devTypeName': self.device.devTypeName,
- 'isNormal': True,
- 'status': ConsumeRecord.Status.RUNNING,
- 'remarks': remarks,
- 'errorDesc': '',
- 'sequanceNo': '',
- 'desc': desc,
- 'attachParas': {},
- 'servicedInfo': servicedInfo
- }
- ConsumeRecord.get_collection().insert_one(new_record)
- return new_record['orderNo']
- def find_card_by_card_no(self, cardNo):
- dealer = Dealer.objects(id = self.device.ownerId).first() # type: Dealer
- if not dealer:
- logger.error('dealer is not found, dealer id = {}'.format(self.device.ownerId))
- return None
- agent = Agent.objects(id = dealer.agentId).first() # type: Agent
- if not agent:
- logger.error('agent is not found, agentId=%s' % dealer.agentId)
- return None
- group = Group.get_group(self.device.groupId) # type: GroupDict
- if not group:
- logger.error('group is not found, groupid=%s' % group.groupId)
- return None
- try:
- card = Card.objects.get(cardNo = cardNo, agentId = str(agent.id))
- except DoesNotExist:
- return None
- except Exception as e:
- logger.error(e)
- return None
- return card
- def update_card_dealer_and_type(self, cardNo, cardType='ID'):
- """
- :param cardNo:
- :param cardType:
- :return:
- """
- dealer = Dealer.objects(id=self.device.ownerId).first() # type: Dealer
- if not dealer:
- logger.error('dealer is not found, dealer id = {}'.format(self.device.ownerId))
- return None
- group = Group.get_group(self.device.groupId) # type: GroupDict
- if not group:
- logger.error('group is not found, groupId = %s' % group.groupId)
- return None
- try:
- card = Card.objects.get(cardNo=cardNo, dealerId=self.device.ownerId, openId__ne='')
- except DoesNotExist:
- logger.error('card is not exist, cardNo=%s' % cardNo)
- return None
- except Exception as e:
- logger.error(e)
- return None
- else:
- # 每次刷卡的时候 刷新一次devNo, 表示最后一次的卡刷的设备
- card.devNo = self.device.devNo
- # 如果沒有綁定信息, 直接綁定本次刷卡的信息. 后续不再更新
- if not card.dealerId or not card.groupId:
- card.dealerId = str(dealer.id)
- card.groupId = group.groupId
- card.cardType = cardType
- card.save()
- return card
- def recharge_id_card(self, card, rechargeType, order):
- # type: (Card, str, CardRechargeOrder)->None
- if not order or (order.money == RMB(0) and order.rechargeType != "sendCoin"):
- return
- status = Card.get_card_status(str(card.id))
- if status == 'busy':
- return
- # 锁机制
- card.__class__.set_card_status(str(card.id), 'busy')
- try:
- # 记录总额度的变化到充值订单中
- balance = card.balance + order.coins
- preBalance = card.balance
- # 更新充值订单,已经完成
- order.update_after_recharge_id_card(self.device, balance, preBalance)
- CardRechargeRecord.add_record(
- card=card,
- group=Group.get_group(order.groupId),
- order=order,
- device=self.device
- )
- # 刷新卡里面的余额
- card.recharge(order.chargeAmount, order.bestowAmount)
- card.account_recharge(order.rechargeOrder)
- except Exception as e:
- logger.exception(e)
- return
- finally:
- card.__class__.set_card_status(str(card.id), 'idle')
- def recharge_ic_card(self, card, preBalance, rechargeType, order, need_verify = True):
- # type:(Card, RMB, str, CardRechargeOrder, bool)->bool
- """
- # rechargeType有两种,一种是用直接覆写overwrite的方式,一种是append追加钱的方式。
- # 不同的的设备,充值的方式还不一样.注意:money是实际用户付的钱,coins是给用户充值的钱,比如付10块(money),充15(coins)。
- :param card:
- :param preBalance:
- :param rechargeType:
- :param order:
- :return:
- """
- if not order or order.coins == RMB(0):
- return False
- status = Card.get_card_status(str(card.id))
- if status == 'busy':
- return False
- Card.set_card_status(str(card.id), 'busy')
- try:
- # IC卡需要下发到设备,设备写卡,把余额打入卡中
- if rechargeType == 'overwrite':
- sendMoney = preBalance + order.coins
- else:
- sendMoney = order.coins
- # 先判断order最近一次充值是否OK. 满足三个条件才认为上次充值成功:
- # 1、操作时间已经超过三天
- # 2、操作结果是串口超时, 即result == 1
- # 3、当前余额大于最后一次充值操作的充值前余额
- if need_verify and len(order.operationLog) > 0:
- log = order.operationLog[-1]
- result = log['result']
- time_delta = (datetime.datetime.now() - log['time']).total_seconds()
- last_pre_balance = RMB(log['preBalance'])
- if (result == ErrorCode.DEVICE_CONN_FAIL or result == ErrorCode.BOARD_UART_TIMEOUT) \
- and (time_delta > 3 * 24 * 3600 or preBalance > last_pre_balance):
- logger.debug('{} recharge verify result is finished.'.format(repr(card)))
- order.update_after_recharge_ic_card(device = self.device,
- sendMoney = sendMoney,
- preBalance = preBalance,
- result = ErrorCode.SUCCESS,
- description = u'充值校验结束')
- CardRechargeRecord.add_record(
- card = card,
- group = Group.get_group(order.groupId),
- order = order,
- device = self.device)
- return False
- try:
- operation_result, balance = self.deviceAdapter.recharge_card(card.cardNo, sendMoney,
- orderNo = order.orderNo)
- order.update_after_recharge_ic_card(device = self.device,
- sendMoney = sendMoney,
- preBalance = preBalance,
- syncBalance = balance,
- result = operation_result['result'],
- description = operation_result['description'])
- if operation_result['result'] != ErrorCode.SUCCESS:
- return False
- if not balance:
- balance = preBalance + order.coins
- CardRechargeRecord.add_record(
- card = card,
- group = Group.get_group(order.groupId),
- order = order,
- device = self.device)
- # 刷新卡里面的余额
- card.balance = balance
- card.lastMaxBalance = balance
- card.save()
- return True
- except Exception as e:
- order.update_after_recharge_ic_card(device = self.device,
- sendMoney = sendMoney,
- preBalance = preBalance,
- syncBalance = balance,
- result = ErrorCode.EXCEPTION,
- description = e.message)
- return False
- except Exception as e:
- logger.exception(e)
- return False
- finally:
- Card.set_card_status(str(card.id), 'idle')
- def recharge_ic_card_realiable(self, card, preBalance, rechargeType, order, need_verify = True):
- # type:(Card, RMB, str, CardRechargeOrder, bool)->bool
- """
- # rechargeType有两种,一种是用直接覆写overwrite的方式,一种是append追加钱的方式。
- # 不同的的设备,充值的方式还不一样.注意:money是实际用户付的钱,coins是给用户充值的钱,比如付10块(money),充15(coins)。
- :param card:
- :param preBalance:
- :param rechargeType:
- :param order:
- :return:
- """
- if not order or order.coins == RMB(0):
- return False
- with memcache_lock('{}-{}-{}'.format(self.device.devNo, str(order.id), 'ic_recharge'), value = '1',
- expire = 300) as acquired:
- if not acquired:
- logger.debug('order<{}> is doing.'.format(repr(order)))
- return
- try:
- if order.status == 'finished':
- logger.debug('order<{}> has been finished.'.format(repr(order)))
- return
- # IC卡需要下发到设备,设备写卡,把余额打入卡中
- if rechargeType == 'overwrite':
- sendMoney = preBalance + order.coins
- else:
- sendMoney = order.coins
- if not order.processingLog:
- order.init_processing_log(device = self.device,
- sendMoney = sendMoney,
- preBalance = preBalance)
- order.save()
- return self.deviceAdapter.recharge_ic_card_realiable(card.cardNo, sendMoney, order.orderNo)
- log = copy.deepcopy(order.processingLog)
- result = log.get('result', None)
- if result == ErrorCode.IC_RECHARGE_FAIL:
- if result:
- order.operationLog.append(log)
- order.init_processing_log(device = self.device,
- sendMoney = sendMoney,
- preBalance = preBalance)
- order.save()
- return self.deviceAdapter.recharge_ic_card_realiable(card.cardNo, sendMoney, order.orderNo)
- else:
- # 先判断order最近一次充值是否OK. 满足三个条件才认为上次
- # 充值成功:
- # 1、操作时间已经超过三天
- # 2、操作结果是串口超时, 即result == 1
- # 3、当前余额大于最后一次充值操作的充值前余额
- time_delta = (datetime.datetime.now() - log['time']).total_seconds()
- last_pre_balance = RMB(log['preBalance'])
- if (time_delta > 3 * 24 * 3600 or preBalance > last_pre_balance):
- logger.debug('{} recharge verify result is finished.'.format(repr(card)))
- log.update({
- 'result': ErrorCode.SUCCESS,
- 'description': u'充值校验结束'
- })
- order.operationLog.append(log)
- order.processingLog = {}
- order.status = 'finished'
- order.save()
- CardRechargeRecord.add_record(
- card = card,
- group = Group.get_group(order.groupId),
- order = order,
- device = self.device)
- else:
- logger.debug('verify not recharge order<{}>'.format(repr(order)))
- order.operationLog.append(log)
- order.init_processing_log(device = self.device,
- sendMoney = sendMoney,
- preBalance = preBalance)
- order.save()
- return self.deviceAdapter.recharge_ic_card_realiable(card.cardNo, sendMoney, order.orderNo)
- except Exception as e:
- logger.exception(e)
- def update_card_recharge_for_success_event(self, orderId, balance):
- logger.info('update_card_recharge_for_success_event,orderId=%s' % orderId)
- order = CardRechargeOrder.objects.get(id = ObjectId(orderId))
- if order.status == 'finished': # double check,存在设备ack之前,已经发送了一条事件到路上了,所以需要检查
- return
- card = Card.objects.get(id = order.cardId)
- group = Group.get_group(self.device['groupId'])
- RechargeRecord.get_collection().update(
- {'_id': ObjectId(order.rechargeNo)},
- {'$set': {
- 'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
- 'groupId': self.device['groupId'],
- 'devNo': self.device['devNo'],
- 'logicalCode': self.device['logicalCode'],
- 'ownerId': self.device['ownerId'],
- 'groupName': group['groupName'],
- 'groupNumber': self.device['groupNumber'],
- 'address': group['address'],
- 'devType': self.device.get('devType', {}).get('name', ''),
- }},
- multi = True
- )
- #: 并且添加一条卡的成功充值的记录
- newRcd = CardRechargeRecord(
- cardId = str(card.id),
- cardNo = card.cardNo,
- openId = card.openId,
- ownerId = ObjectId(self.device['ownerId']),
- money = order.money,
- coins = order.coins,
- balance = balance,
- preBalance = card.balance,
- devNo = self.device['devNo'],
- devTypeCode = self.device['devType']['code'],
- logicalCode = self.device['logicalCode'],
- groupId = self.device['groupId'],
- address = group['address'],
- groupNumber = self.device['groupNumber'],
- groupName = group['groupName'],
- status = 'success',
- rechargeType = 'netpay'
- )
- try:
- newRcd.save()
- except Exception as e:
- logger.exception('save card consume rcd error=%s' % e)
- # 刷新卡里面的余额
- try:
- card.balance = balance
- card.lastMaxBalance = balance
- card.save()
- except Exception as e:
- logger.exception(e)
- try:
- order.status = 'finished'
- order.rechargeType = 'netpay'
- order.save()
- except Exception as e:
- logger.exception(e)
- # 更新收录有值卡
- def update_card_balance(self, card, balance):
- try:
- card.balance = balance
- card.save()
- except Exception as e:
- logger.exception(e)
- # 根据消耗的电量计算当前地址下的本次消耗电费
- def calc_elec_fee(self, spendElec):
- group = Group.objects.get(id = self.device['groupId'])
- return float(group.otherConf.get('elecFee')) * spendElec
- def calc_service_fee(self, fee, elec_charge, service_charge):
- elecFee = fee * Ratio(elec_charge) * Ratio(1 / float(elec_charge + service_charge))
- serviceFee = (fee - elecFee)
- return elecFee, serviceFee
- def time_ratio_pricing(self, value, leftTime, actualNeedTime):
- return value * Ratio(leftTime) * Ratio(1 / float(actualNeedTime))
- def get_backCoins(self, coins, leftTime, actualNeedTime):
- return self.time_ratio_pricing(value = coins, leftTime = leftTime, actualNeedTime = actualNeedTime)
- def get_backMoney(self, money, leftTime, actualNeedTime):
- return self.time_ratio_pricing(value = money, leftTime = leftTime, actualNeedTime = actualNeedTime)
- def generate_service_complete_title_by_devType(self, devTypeId, templateMap):
- try:
- devType = DeviceType.objects(id=devTypeId).first()
- if devType is None:
- return ''
- if devType.finishedMessageTemplateDict == {}:
- return ''
- for _ in templateMap.keys():
- if _ not in devType.finishedMessageTemplateDict['keys']:
- templateMap.pop(_)
- return devType.finishedMessageTemplateDict['template'].format(**templateMap)
- except Exception as e:
- logger.error(e)
- return ''
- def notify_user_service_complete(self, service_name, openid, port, address, reason, finished_time, url = None, extra = None):
- # type: (basestring, str, str, str, str, str, str, list)->None
- title_list = [
- {u'': u'{}结束,感谢您的使用!'.format(service_name)},
- {u'设备编号': self.device.logicalCode}
- ]
- if port:
- title_list.extend([{u'设备端口': port}, {u'设备地址': address}])
- else:
- title_list.extend(
- [{u'设备地址': address}])
- if extra:
- title_list.extend(extra)
- if reason:
- title_list.extend([{u'结束原因': reason}])
- dealer = self.device.owner # type: Dealer
- if dealer.showServicePhone:
- remark = u'客服联系电话:{}'.format(dealer.service_phone)
- else:
- remark = u'我们竭诚为您服务,有任何问题请联系客服!'
- if not finished_time:
- finished_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- agent = Agent.objects.filter(id=str(dealer.agentId)).first()
- if not agent:
- logger.warning('agent<id={}> is not exists.'.format(str(dealer.agentId)))
- return
- kw = {}
- # TODO: 微信订阅模板
- if agent.customizedUserSubGzhAllowable:
- if port:
- service = u"{}服务({}-端口{})".format(self.device.majorDeviceType, self.device.logicalCode, port)
- if len(service) > 20:
- service = u'{}服务({})'.format(self.device.majorDeviceType, self.device.logicalCode)
- if len(service) > 20:
- service = u'{}服务'.format(self.device.majorDeviceType)
- else:
- service = u'{}服务({})'.format(service_name, self.device.logicalCode)
- if len(service) > 20:
- service = u'{}服务'.format(self.device.majorDeviceType)
- kw = {
- 'service': service,
- 'finishTime': finished_time,
- 'remark': remark,
- }
- else:
- kw = {
- 'title': make_title_from_dict(title_list),
- 'service': u'{}服务'.format(service_name),
- 'finishTime': finished_time,
- 'remark': remark
- }
- kw.update({'url': url or settings.SERVER_END_BASE_SSL_URL + '/user/index.html?v=1.0.31#/user/consume'})
- self.notify_user(openid,'service_complete', **kw)
- def notify_to_sd_norther(self, portStr, consumeDict, platform=ShanDongNorther.platform.default): # type:(str, dict, int) -> None
- """
- 上报 订单结束信息到山东 省平台 或者济南静态平台(同一份协议 不同服务器)
- """
- try:
- logicalCode = self.device.logicalCode
- part = Part.objects.filter(logicalCode=logicalCode, partNo=portStr).first()
- dealer = self.device.owner
- if not part or not dealer:
- raise Exception("no part or no dealer")
- # 主动上报 能找到唯一的norther
- norther = ShanDongNorther.get_norther(dealerId=self.device.ownerId, platform=platform).first() # type: ShanDongNorther
- if not norther:
- return
- northerDict = {
- "connectorId": str(part.id),
- "startTime": consumeDict.get("startTime", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
- "endTime": consumeDict.get("finishedTime", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
- "totalPower": consumeDict.get("elec", 0),
- "totalElecMoney": consumeDict.get("spendMoney", 0),
- }
- logger.info("northerId:{}, devNo:{}, dealerId:{} norther to sd infoDict = {}".format(
- str(norther.id),
- self.device.devNo,
- self.device.ownerId,
- northerDict)
- )
- # TODO 这个地方建议不同的实现放在model 内部实现 对外保持norther的notify接口统一
- if norther.isSdPlatform:
- norther.notification_order_info(northerDict)
- else:
- norther.notification_order_info_jn(northerDict)
- except Exception as e:
- logger.info(e)
- def query_to_ykt_norther(self,cardNo):
- resutl = custquery(cardNo)
- code = resutl['resultCode']
- if code == u'000000':
- cardInfo = {
- "customerId":resutl['result']['customerId'],
- "userName":resutl['result']['userName'],
- "userCode":resutl['result']['userCode'],
- "oddFare":resutl['result']['oddFare'],
- "custStatus":resutl['result']['custStatus'],
- "cardNo":cardNo,
- }
- return{'code':code,'message':resutl['resultMsg'],'cardInfo':cardInfo}
- elif code == u'999999':
- return {'code': code, 'message': resutl['resultMsg'], 'cardInfo': {}}
- elif code == u'555555':
- return {'code': code, 'message': '刷卡异常', 'cardInfo': {}}
- def notify_to_ykt_norther(self,cardSnr,orderNo,tradeFare,tradeDate):
- """
- 刷卡后上报信息到一网通平台
- 如果推送失败会一直推送
- """
- circulation = 3
- # 推送
- while circulation>0:
- result = custQueryConsume(cardSnr,orderNo,tradeFare,tradeDate)
- resultCode = result.get('resultCode')
- if resultCode == "000000":
- circulation = 0
- else:
- circulation -= 1
- return {
- 'resultCode':result.get('resultCode'),
- 'resultMsg':result.get('resultMsg'),
- 'orderNo':result.get('orderNo'),
- 'oddFare':result.get('oddFare')
- }
- def refund_net_pay(self, user, lineInfo, refundedMoney, refundCoins, consumeDict, is_cash):
- # type: (MyUser, dict, RMB, VirtualCoin, dict, bool)->None
- logger.debug(
- 'refund for net pay. user = {}, lineInfo = {}, refundedMoney = {}, refundCoins = {}, is_cash = {}'.format(
- repr(user), lineInfo, refundedMoney, refundCoins, is_cash))
- refund_recharge_ids = []
- if is_cash:
- if 'rechargeRcdId' in lineInfo:
- refund_recharge_ids.append(lineInfo['rechargeRcdId'])
- else:
- pay_info = lineInfo.get('payInfo', list())
- for item in pay_info:
- if 'rechargeRcdId' in item:
- refund_recharge_ids.append(item['rechargeRcdId'])
- if refundCoins <= VirtualCoin(0):
- # 只能退现金的情况
- consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.REFUNDED_CASH: RMB(0).mongo_amount})
- if len(refund_recharge_ids) > 0:
- logger.info('need refund cash, ids = %s' % str(refund_recharge_ids))
- if refundedMoney <= RMB(0):
- consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.REFUNDED_CASH: RMB(0).mongo_amount})
- return
- left_refund = refundedMoney
- all_refund = RMB(0)
- for recharge_record_id in refund_recharge_ids[::-1]:
- recharge_record = RechargeRecord.objects(id = str(recharge_record_id)).first() # type: RechargeRecord
- if not recharge_record:
- logger.error('not find recharge record {}'.format(recharge_record_id))
- continue
- if recharge_record.openId != lineInfo['openId']:
- logger.error('is not my record. {} != {}'.format(recharge_record.openId, lineInfo['openId']))
- continue
- this_refund = min(recharge_record.money, left_refund)
- if this_refund <= RMB(0):
- continue
- logger.debug('try to refund money {} for recharge record {}'.format(this_refund, repr(recharge_record)))
- try:
- all_refund += this_refund
- refund_order = refund_cash(
- recharge_record, this_refund, VirtualCoin(0), user = user) # type: RefundMoneyRecord
- if not refund_order:
- logger.error(
- 'refund money<{}> failure. recharge = {}'.format(this_refund, repr(recharge_record)))
- except Exception as e:
- logger.exception(e)
- left_refund = left_refund - this_refund
- if left_refund <= RMB(0):
- break
- if all_refund > RMB(0):
- consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.REFUNDED_CASH: all_refund.mongo_amount})
- else:
- if refundCoins > VirtualCoin(0):
- consumeDict.update({DEALER_CONSUMPTION_AGG_KIND.REFUNDED_COINS: refundCoins.mongo_amount})
- refund_money(self.device, refundCoins, lineInfo['openId'])
- class FaultEvent(Event):
- def __init__(self, smartBox, event_data):
- # type:(SmartBox,dict)->None
- super(FaultEvent, self).__init__(smartBox)
- self.event_data = event_data
- def is_notify_dealer(self):
- if self.dealer:
- return self.dealer.devFaultPushDealerSwitch
- else:
- return False
- def is_notify_user(self):
- if self.dealer:
- return self.dealer.devFaultPushUserSwitch
- else:
- return False
- def record(self, faultCode = '', description = None, title = '', detail = None, level = FAULT_LEVEL.NORMAL, portNo = 0):
- # 故障记录入库
- if self.dealer is not None:
- dealerId = self.dealer.id
- else:
- dealerId = None
- logicalCode = self.device['logicalCode']
- now = datetime.datetime.now()
- created_within_five_minutes = now - datetime.timedelta(minutes = 5)
- # 这个应该是防止重复报
- try:
- fault_record = FaultRecord.objects(
- status="init",
- faultCode = faultCode,
- title = title,
- description = description,
- logicalCode = logicalCode,
- portNo = portNo,
- createdTime__gt = created_within_five_minutes
- ).first()
- if not fault_record:
- group = Group.get_group(self.device['groupId'])
- fault_record = FaultRecord(
- logicalCode = logicalCode,
- imei = self.device['devNo'],
- portNo = portNo,
- faultCode = faultCode,
- title = title,
- description = description or self.event_data.get('statusInfo', ''),
- groupName = group.get('groupName', ''),
- address = group.get('address', '')
- )
- if dealerId is not None:
- fault_record.dealerId = dealerId
- if detail is not None:
- if not isinstance(detail, dict):
- raise TypeError('detail has to be a dict')
- fault_record.detail = detail
- fault_record = fault_record.save()
- else:
- logger.debug('%r existed within 5 minutes' % (fault_record,))
- return fault_record
- except NotUniqueError as e:
- logger.error('cannot insert FaultRecord, dev=%s, error=%s' % (self.device['devNo'], e))
- def do(self, **args):
- """不要将所有的信息不加筛选的打入缓存"""
- group = self.device.group
- # 通知经销商
- if self.is_notify_dealer():
- titleDictList = [
- {u'告警名称': self.event_data.get("faultName", u"设备告警")},
- {u'设备编号': self.device["logicalCode"]},
- {u'地址名称': group["groupName"]}
- ]
- self.notify_dealer('device_fault', **{
- 'title': make_title_from_dict(titleDictList),
- 'device': u'%s号设备-%s端口\\n' % (
- self.device['groupNumber'], self.event_data.get('port')) if self.event_data.has_key(
- 'port') else u'%s号设备\\n' % self.device['groupNumber'],
- 'faultType': u'%s\\n' % self.event_data['faultType'] if self.event_data.has_key(
- 'faultType') else u'设备告警\\n',
- 'notifyTime': u'%s\\n' % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
- 'fault': u'%s(详细情况,建议您到管理后台的告警模块进行查看)\\n' % self.event_data['desc'] if self.event_data.has_key(
- 'desc') else self.event_data.get('statusInfo', ''),
- })
- # 通知用户
- if self.is_notify_user():
- if self.device.has_key('openId'):
- user = MyUser.objects(openId = self.device.get('openId'), groupId = self.device['groupId']).first()
- self.notify_user(user.managerialOpenId if user else '', 'device_fault',
- **{
- 'title': u'注意!注意!您正在使用的设备发生了故障,设备编码:%s' % self.device['logicalCode'],
- 'fault': self.event_data['statusInfo'],
- 'notifyTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- })
- # 写入故障记录
- faultRecord = self.record(
- faultCode = self.event_data.get('faultCode', ''),
- description = self.event_data.get('desc', None),
- title = self.event_data.get('faultName', ''),
- detail = self.event_data.get('detail', None),
- level = self.event_data.get('level', FAULT_LEVEL.NORMAL),
- portNo = self.event_data.get('port', 0)
- )
- # 梁溪的告警
- if faultRecord:
- self.north_to_liangxi(faultRecord)
- notify_event_to_north(
- self.dealer,
- self.device,
- level = Const.EVENT_NORMAL,
- desc = self.event_data['desc'] if self.event_data.has_key('desc') else self.event_data.get('statusInfo', '')
- )
- def north_to_liangxi(self, faultRecord):
- from taskmanager.mediator import task_caller
- task_caller(
- 'send_to_xf_falut',
- devNo=self.device.devNo,
- faultId=str(faultRecord.id)
- )
- def time_ratio_pricing(self, value, leftTime, actualNeedTime):
- return value * Ratio(leftTime) * Ratio(1 / float(actualNeedTime))
- def get_backCoins(self, coins, leftTime, actualNeedTime):
- return self.time_ratio_pricing(value = coins, leftTime = leftTime, actualNeedTime = actualNeedTime)
- def get_backMoney(self, money, leftTime, actualNeedTime):
- return self.time_ratio_pricing(value = money, leftTime = leftTime, actualNeedTime = actualNeedTime)
- class AckEventProcessorIntf(object):
- def pre_processing(self, device, event_data):
- # type:(DeviceDict, dict)->dict
- return event_data
- class AckEvent(WorkEvent):
- """
- 目前设计为不能重入. 如果执行有异常, 肯定是代码问题
- """
- def __init__(self, smartBox, event_data, pre_processor = None):
- # type:(SmartBox, dict, AckEventProcessorIntf)->None
- super(AckEvent, self).__init__(smartBox, event_data)
- self.pre_processor = pre_processor
- def ack_msg(self):
- if self.event_data['status'] == 'finishing':
- logger.debug('finishing status is not need to ack.')
- return
- payload = {
- 'order_id': self.event_data['order_id'],
- 'order_type': self.event_data['order_type'],
- 'status': self.event_data['status']
- }
- # if response:
- # payload.update(response)
- MessageSender.send_no_wait(device = self.device,
- cmd = DeviceCmdCode.EVENT_ACK,
- payload = payload)
- def do_impl(self, **args):
- raise Exception('must implement do_impl interface.')
- def do(self, **args):
- if 'order_id' not in self.event_data or 'order_type' not in self.event_data:
- logger.error('order id or type is null.')
- return
- order_id = self.event_data['order_id']
- order_type = self.event_data['order_type']
- with memcache_lock('{}-{}-{}'.format(self.device.devNo, order_id, order_type), value = '1',
- expire = 300) as acquired:
- if not acquired:
- logger.debug('ack message<{}-{}-{}> is duplicate.'.format(self.device.devNo, order_id, order_type))
- return
- try:
- self.do_impl(**args)
- except Exception as e:
- import traceback
- logger.warning(traceback.format_exc())
- if 'status' in self.event_data:
- self.ack_msg()
- class ComNetPayAckEvent(AckEvent):
- def post_before_start(self, order=None):
- pass
-
- def post_after_start(self, order=None):
- raise NotImplementedError('must implement post_after_start.')
-
- def post_before_finish(self, order=None):
- pass
-
- def post_after_finish(self, order=None):
- raise NotImplementedError('must implement post_after_finish.')
- def do_finished_event(self, order, sub_orders, merge_order_info):
- raise NotImplementedError('must implement do_finished_event.')
- def do_running_order(self, order, result):
- # type: (ConsumeRecord, dict)->None
- self.post_before_start(order)
- if order.status in ['running', 'finished']:
- logger.debug('order<{}> no need to ack. this has done.'.format(repr(order)))
- return
- if order.status == 'unknown':
- dealer = self.device.owner
- # 设备支持上报告扣款
- if 'device_callback_to_freeze_balance' in dealer.features:
- freeze_user_balance(self.device, Group.get_group(order.groupId), order)
- err_desc = u'设备启动成功,事件上报成功,补充扣款'
- else:
- raise Exception('Invalid operation!')
- else:
- set_start_key_status(start_key=order.startKey,
- state=START_DEVICE_STATUS.FINISHED,
- order_id=str(order.id))
- err_desc = ''
- if 'master' in result:
- order.association = {
- 'master': result['master']
- }
- order.servicedInfo.update({'masterOrderNo': result['master']})
- order.errorDesc = err_desc
- order.isNormal = True
- order.status = 'running'
- order.startTime = datetime.datetime.fromtimestamp(result['sts'])
- order.save()
- self.post_after_start(order=order)
- def deal_running_event(self, order):
- # type: (ConsumeRecord)->None
- if 'sub' in self.event_data:
- order.association = {
- 'sub': [item['order_id'] for item in self.event_data['sub']]
- }
- order.servicedInfo.update(
- {'subOrderNo': '{}'.format(', '.join([item['order_id'] for item in self.event_data['sub']]))})
- order.save()
- self.do_running_order(order, self.event_data)
- sub_order_map = {}
- for item in self.event_data['sub']:
- item['master'] = self.event_data['order_id']
- item['sts'] = self.event_data['sts']
- sub_order_map[item['order_id']] = item
- sub_orders = [_ for _ in ConsumeRecord.objects.filter(orderNo__in = sub_order_map.keys())]
- for sub_order in sub_orders:
- self.do_running_order(sub_order, sub_order_map[sub_order.orderNo])
- else:
- self.do_running_order(order, self.event_data)
- sub_orders = []
- account_info = self.merge_order(order, sub_orders)
- cache_info = {
- 'startTime': Arrow.fromdatetime(order.startTime, tzinfo = settings.TIME_ZONE).format('YYYY-MM-DD HH:mm:ss'),
- 'status': Const.DEV_WORK_STATUS_WORKING,
- 'isStart': True,
- 'openId': order.openId,
- 'orderNo': order.orderNo,
- 'port': order.used_port
- }
- if sub_orders:
- cache_info.update({
- 'subOrderNos': [_.orderNo for _ in sub_orders]
- })
- cache_info.update(account_info)
- current_cache_info = Device.get_dev_control_cache(self.device.devNo)
- if order.used_port < Const.NO_PORT:
- current_port_info = current_cache_info.get(str(order.used_port))
- if not current_port_info or not current_cache_info.get("orderNo") or (order.orderNo == current_port_info.get('orderNo', None)) or current_port_info.get('status', 0) == 0:
- Device.update_port_control_cache(self.device.devNo, cache_info, updateType='overwrite')
- else:
- if not current_cache_info or not current_cache_info.get("orderNo") (order.orderNo == current_cache_info.get('orderNo', None)) or current_cache_info.get('status', 0) == 0:
- Device.update_dev_control_cache(self.device.devNo, cache_info)
- ServiceProgress.new_progress_for_order(order = order,
- device = self.device,
- cache_info = cache_info)
- def do_finished_order(self, order, result):
- self.post_before_finish(order=order)
- if 'sub' in result:
- order.association = {
- 'sub': [item['order_id'] for item in self.event_data['sub']]
- }
- order.servicedInfo.update(
- {'subOrderNo': '{}'.format(', '.join([item['order_id'] for item in self.event_data['sub']]))})
- elif 'master' in result:
- order.association = {
- 'master': result['master']
- }
- order.servicedInfo.update({'masterOrderNo': result['master']})
- if order.status == 'running':
- order.status = 'finished'
- order.finishedTime = datetime.datetime.fromtimestamp(result['fts'])
- order.save()
- else:
- if order.status == 'unknown':
- dealer = self.device.owner
- # 设备支持上报告扣款
- if 'device_callback_to_freeze_balance' in dealer.features:
- freeze_user_balance(self.device, Group.get_group(order.groupId), order)
- err_desc = u'结束事件上报成功,补充扣款'
- else:
- raise Exception('Invalid operation!')
- else:
- # created状态
- set_start_key_status(start_key=order.startKey,
- state=START_DEVICE_STATUS.FINISHED,
- order_id=str(order.id))
- err_desc = ''
- order.isNormal = True
- order.status = 'finished'
- order.errorDesc = err_desc
- order.startTime = datetime.datetime.fromtimestamp(result['sts'])
- order.finishedTime = datetime.datetime.fromtimestamp(result['fts'])
- order.save()
- self.post_after_finish(order=order)
- def deal_finished_event(self, order):
- # type: (ConsumeRecord)->None
- if order.status == 'finished':
- logger.debug('order<{}> has finished.'.format(repr(order)))
- return
- self.do_finished_order(order, self.event_data)
- sub_orders = []
- if 'sub' in self.event_data:
- sub_order_map = {}
- for item in self.event_data['sub']:
- item['status'] = 'finished'
- item['master'] = str(order.orderNo)
- item['sts'] = self.event_data['sts']
- item['fts'] = self.event_data['fts']
- sub_order_map[item['order_id']] = item
- sub_orders = [_ for _ in ConsumeRecord.objects.filter(orderNo__in = sub_order_map.keys())]
- for sub_order in sub_orders:
- self.do_finished_order(sub_order, sub_order_map[sub_order.orderNo])
- current_cache_info = Device.get_dev_control_cache(self.device.devNo)
- if order.used_port != Const.NO_PORT:
- current_port_info = current_cache_info.get(str(order.used_port))
- if current_port_info and (order.orderNo == current_port_info.get('orderNo', None)):
- Device.clear_port_control_cache(self.device.devNo, order.used_port)
- else:
- if current_cache_info and order.orderNo == current_cache_info.get('orderNo', None):
- Device.invalid_device_control_cache(self.device.devNo)
- ServiceProgress.objects(open_id = order.openId,
- device_imei = self.device.devNo,
- port = int(order.used_port),
- consumeOrder__orderNo = order.orderNo).update_one(
- upsert = False,
- **{
- 'isFinished': True,
- 'finished_time': Arrow.fromdatetime(order.finishedTime, tzinfo = settings.TIME_ZONE).timestamp,
- 'expireAt': datetime.datetime.now()
- })
- merge_order_info = self.merge_order(order, sub_orders)
- self.do_finished_event(order, sub_orders, merge_order_info)
- def merge_order(self, master_order, sub_orders):
- # type:(ConsumeRecord, list)->dict
- raise NotImplementedError('must implement merge_order.')
- def do_impl(self, **args):
- order_id = self.event_data['order_id']
- order = ConsumeRecord.objects(ownerId = self.device.ownerId,
- orderNo = order_id).first() # type: ConsumeRecord
- if not order:
- logger.debug('order<no={}> is not exist.'.format(self.event_data['order_id']))
- return
- if order.status in ['end', 'waitPay', 'finished']:
- logger.debug('order<{}> has been finished.'.format(repr(order)))
- return
- if order.used_port != self.event_data['port']:
- logger.error('port is not equal. {} != {}'.format(self.event_data['port'], order.used_port))
- return
- if self.pre_processor:
- self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
- if self.event_data['status'] in ['running', 'finishing']:
- return self.deal_running_event(order)
- if self.event_data['status'] == 'finished':
- return self.deal_finished_event(order)
- class IcStartAckEvent(AckEvent):
- def __init__(self, smartBox, event_data, pre_processor=None):
- super(IcStartAckEvent, self).__init__(smartBox, event_data, pre_processor)
-
- if self.pre_processor:
- self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
- self.card = self.update_card_dealer_and_type(self.event_data.get('cardNo'),
- cardType=self.event_data.get('cardType', 'IC'))
-
- def post_before_start(self, order=None):
- pass
-
- def post_after_start(self, order=None):
- raise NotImplementedError('must implement post_after_start.')
-
- def post_before_finish(self, order=None):
- pass
-
- def post_after_finish(self, order=None):
- raise NotImplementedError('must implement post_after_finish.')
-
- def checkout_order(self, order):
- raise NotImplementedError('must implement checkout_order.')
-
- def get_or_create_order(self):
- # type:()-> tuple
-
- def new_one(order_id):
- # type:( str)->ConsumeRecord
-
- if self.event_data['order_id'] == order_id:
- fee = VirtualCoin(self.event_data['fee'])
- else:
- sub = self.event_data.get('sub', [])
- fee = VirtualCoin(map(lambda x: x['fee'], filter(lambda x: x['order_id'] == order_id, sub))[0])
-
- order = ConsumeRecord.objects(startKey=order_id).first()
- if not order:
- attach_paras = {'chargeIndex': self.event_data['port']}
-
- order = ConsumeRecord.new_one(
- order_no=ConsumeRecord.make_no(self.event_data['cardNo'], UserConsumeSubType.CARD),
- user=MyUser(openId=self.card.openId, nickname=self.card.nickName),
- device=self.device,
- group=Group.get_group(self.device.groupId),
- package=self.event_data.get('package', {}),
- attach_paras=attach_paras,
- pay_info={
- 'via': 'card',
- 'coins': fee.mongo_amount
- },
- start_key=order_id)
- order.servicedInfo = {'cardNo': self.event_data.get('cardNo'), 'chargeIndex': self.event_data['port']}
- order.save()
- card_order = CardConsumeRecord.objects(orderNo=order.orderNo).first()
- if not card_order:
- group = Group.get_group(self.device.groupId) # type:GroupDict
- card_order = CardConsumeRecord(orderNo=order.orderNo,
- openId=self.card.openId,
- cardId=str(self.card.id),
- money=fee.mongo_amount,
- balance=self.card.balance.mongo_amount,
- devNo=self.device.devNo,
- devType=self.device.devTypeName,
- logicalCode=self.device.logicalCode,
- groupId=self.device.groupId,
- address=group.address,
- groupNumber=self.device.groupNumber,
- groupName=group.groupName,
- result='success',
- remarks=u'刷卡消费',
- dateTimeAdded=datetime.datetime.now(),
- linkedConsumeRcdOrderNo=order.orderNo).save()
-
- # 生成订单处理卡消费(预付费还是后付费)
- try:
- self.checkout_order(order)
- except Exception as e:
- order.isNormal = False
- order.errorDesc = '订单建立结算失败! ERROR={}'.format(e)
- order.save()
-
- return order
-
- def new_sub_order(order_id, master_order_id):
- order = new_one(order_id)
-
- order.isNormal = True
- order.status = self.event_data['status']
-
- order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
- if 'fts' in self.event_data:
- order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
-
- if ('master' not in order.association) or (order.association['master'] != master_order_id):
- order.association = {'master': master_order_id}
- order.servicedInfo['masterOrderNo'] = master_order_id
-
- order.save()
-
- return order
-
- master_order = new_one(self.event_data['order_id'])
- master_order.save()
-
- sub_orders = []
- for item in self.event_data.get('sub', []):
- sub_order = new_sub_order(item['order_id'], master_order.orderNo)
- sub_orders.append(sub_order)
-
- sub_order_id_list = [item.orderNo for item in sub_orders]
-
- has_done = True
- if master_order.status == ConsumeRecord.Status.FINISHED:
- logger.debug('master order<{}> has been done.'.format(repr(master_order)))
- has_done = True
-
- if master_order.status == ConsumeRecord.Status.CREATED:
- has_done = False
- master_order.association = {'sub': sub_order_id_list}
- if sub_order_id_list:
- master_order.servicedInfo['subOrderNo'] = ', '.join(sub_order_id_list)
-
- else:
- if master_order.status == ConsumeRecord.Status.RUNNING and \
- self.event_data['status'] == ConsumeRecord.Status.FINISHED:
- has_done = False
-
- if set(master_order.association['sub']) != set(sub_order_id_list):
- master_order.association = {'sub': sub_order_id_list}
- if sub_order_id_list:
- master_order.servicedInfo['subOrderNo'] = ', '.join(sub_order_id_list)
- has_done = False
-
- master_order.isNormal = True
- master_order.status = self.event_data['status']
-
- if not master_order.startTime:
- if 'sts' in self.event_data:
- master_order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
- else:
- master_order.startTime = datetime.datetime.now()
-
- if 'fts' in self.event_data:
- master_order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
-
- master_order.save()
-
- return has_done, master_order, sub_orders
-
- def merge_order(self, master_order, sub_orders):
- # type:(ConsumeRecord, Iterable[ConsumeRecord])->dict
-
- raise NotImplementedError('must implement merge_order.')
-
- def do_finished_event(self, order, merge_order_info):
- # type:(ConsumeRecord, dict)->None
-
- raise NotImplementedError('must implement do_finished_event.')
-
- def deal_running_order(self, master_order, sub_orders):
- # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
-
- self.post_before_start(master_order)
-
- try:
- start_time_str = master_order.startTime.strftime(Const.DATETIME_FMT)
- cache_info = {
- 'port': str(master_order.used_port),
- 'cardId': str(self.card.id),
- 'openId': self.card.openId,
- 'startTime': start_time_str,
- 'isStart': True,
- 'status': Const.DEV_WORK_STATUS_WORKING,
- 'orderNo': master_order.orderNo,
- 'subOrderNos': [item.orderNo for item in sub_orders]
- }
- cache_info.update(self.merge_order(master_order, sub_orders))
-
- current_cache_info = Device.get_dev_control_cache(self.device.devNo)
- if master_order.used_port < Const.NO_PORT:
- current_port_info = current_cache_info.get(str(master_order.used_port), {})
- if not current_cache_info or master_order.orderNo == current_port_info.get('orderNo',
- None) or current_port_info.get(
- 'status', 0) == Const.DEV_WORK_STATUS_IDLE:
- Device.update_port_control_cache(self.device.devNo, cache_info, updateType='overwrite')
- else:
- if start_time_str > current_port_info.get('startTime', '1970-01-01'):
- Device.update_port_control_cache(self.device.devNo, cache_info, updateType='overwrite')
- else:
- pass
- else:
- if not current_cache_info or (
- master_order.orderNo == current_cache_info.get('orderNo', None)) or current_cache_info.get(
- 'status', 0) == Const.DEV_WORK_STATUS_IDLE:
- Device.update_dev_control_cache(self.device.devNo, cache_info)
- else:
- if start_time_str > current_cache_info.get('startTime', '1970-01-01'):
- Device.update_dev_control_cache(self.device.devNo, cache_info)
- else:
- pass
-
- ServiceProgress.new_progress_for_order(order=master_order,
- device=self.device,
- cache_info=cache_info)
- finally:
- self.post_after_start(order=master_order)
-
- def deal_finished_order(self, master_order, sub_orders):
- # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
-
- self.post_before_finish(master_order)
-
- try:
- self.do_finished_event(master_order, self.merge_order(master_order, sub_orders))
-
- current_cache_info = Device.get_dev_control_cache(self.device.devNo)
- if master_order.used_port < Const.NO_PORT:
- current_port_info = current_cache_info.get(str(master_order.used_port))
- if current_port_info and (master_order.orderNo == current_port_info.get('orderNo', '')):
- Device.clear_port_control_cache(devNo=self.device.devNo, port=master_order.used_port)
- else:
- if current_cache_info and (master_order.orderNo == current_cache_info.get('orderNo', '')):
- Device.invalid_device_control_cache(self.device.devNo)
-
- ServiceProgress.objects(open_id=self.card.openId,
- device_imei=self.device.devNo,
- port=int(master_order.used_port)).update_one(
- upsert=False,
- **{
- 'isFinished': True,
- 'finished_time': Arrow.fromdatetime(master_order.finishedTime,
- tzinfo=settings.TIME_ZONE).timestamp,
- 'expireAt': datetime.datetime.now()
- })
- finally:
- self.post_after_finish(order=master_order)
-
- def do_impl(self, **args):
- if 'port' not in self.event_data:
- logger.warn('port is not exist.')
- return
-
- order_id = self.event_data['order_id']
- order_type = self.event_data['order_type']
-
- if not self.card or not self.card.openId or self.card.frozen:
- logger.debug('card<cardNo={}> is not exists.'.format(self.event_data['cardNo']))
- return
-
- has_done, master_order, sub_orders = self.get_or_create_order()
- if has_done:
- logger.debug('order<{}> has been done.'.format(repr(master_order)))
- return
-
- if self.event_data['status'] == 'running':
- self.deal_running_order(master_order, sub_orders)
-
- if self.event_data['status'] == 'finished':
- self.deal_finished_order(master_order, sub_orders)
- class IcRechargeAckEvent(AckEvent):
- def __init__(self, smartBox, event_data, pre_processor = None):
- super(IcRechargeAckEvent, self).__init__(smartBox, event_data, pre_processor)
- if self.pre_processor:
- self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
- def do_impl(self, **args):
- order = CardRechargeOrder.objects(orderNo = self.event_data['order_id']).first() # type: CardRechargeOrder
- if not order:
- logger.error('order<orderNo={}> is not exist.'.format(self.event_data['order_id']))
- return
- if order.status == 'finished':
- logger.debug('order<{}> has be done.'.format(repr(order)))
- return
- if not order.processingLog:
- logger.error('order<{}> has no processing log.'.format(repr(order)))
- return
- cardNo = self.event_data['cardNo']
- if str(cardNo) != order.cardNo:
- logger.error('check card no failure. {} != {}'.format(cardNo, order.cardNo))
- return
- card = Card.objects(cardNo = order.cardNo).first()
- if not card:
- logger.error('not find card<{}>'.format(card.cardNo))
- return
- result = self.event_data['rst']
- if result == ErrorCode.BOARD_UART_TIMEOUT:
- order.processingLog['result'] = result
- order.processingLog.update({
- 'result': self.event_data['rst'],
- 'description': DeviceErrorCodeDesc.get(result)
- })
- order.save()
- else:
- # 刷新卡里面的余额
- balance = VirtualCoin(self.event_data.get('balance'))
- if balance != card.balance:
- card.balance = balance
- card.lastMaxBalance = balance
- card.save()
- if result == ErrorCode.DEVICE_SUCCESS:
- log = copy.deepcopy(order.processingLog)
- log.update({
- 'result': result,
- 'description': DeviceErrorCodeDesc.get(result),
- 'syncBalance': balance.mongo_amount
- })
- order.operationLog.append(log)
- order.processingLog = {}
- order.status = 'finished'
- order.save()
- CardRechargeRecord.add_record(
- card = card,
- group = Group.get_group(order.groupId),
- order = order,
- device = self.device)
- else:
- order.processingLog.update({
- 'result': result,
- 'description': DeviceErrorCodeDesc.get(result),
- 'syncBalance': balance.mongo_amount
- })
- order.save()
- class IdStartAckEvent(AckEvent):
- def __init__(self, smartBox, event_data, pre_processor = None):
- super(IdStartAckEvent, self).__init__(smartBox, event_data, pre_processor)
- if self.pre_processor:
- self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
- self.card = self.update_card_dealer_and_type(self.event_data.get('cardNo'), cardType=self.event_data.get('cardType', 'ID')) # type: Card
- def post_before_start(self, order=None):
- pass
- def post_after_start(self, order=None):
- raise NotImplementedError('must implement post_after_start.')
- def post_before_finish(self, order=None):
- pass
- def post_after_finish(self, order=None):
- raise NotImplementedError('must implement post_after_finish.')
- def checkout_order(self, order):
- raise NotImplementedError('must implement checkout_order.')
- def get_or_create_order(self):
- # type:()-> tuple
- def new_one(order_id):
- # type:( str)->ConsumeRecord
- if self.event_data['order_id'] == order_id:
- fee = VirtualCoin(self.event_data['fee'])
- else:
- sub = self.event_data.get('sub',[])
- fee = VirtualCoin(map(lambda x: x['fee'], filter(lambda x: x['order_id'] == order_id, sub))[0])
- order = ConsumeRecord.objects(startKey = order_id).first()
- if not order:
- attach_paras = {'chargeIndex': self.event_data['port']}
- order = ConsumeRecord.new_one(
- order_no = ConsumeRecord.make_no(self.event_data['cardNo'], UserConsumeSubType.CARD),
- user = MyUser(openId = self.card.openId, nickname = self.card.nickName),
- device = self.device,
- group = Group.get_group(self.device.groupId),
- package=self.event_data.get('package', {}),
- attach_paras = attach_paras,
- pay_info = {
- 'via': 'card',
- 'coins': fee.mongo_amount
- },
- start_key = order_id)
- order.servicedInfo = {'cardNo': self.event_data.get('cardNo'), 'chargeIndex': self.event_data['port']}
- order.save()
- card_order = CardConsumeRecord.objects(orderNo = order.orderNo).first()
- if not card_order:
- group = Group.get_group(self.device.groupId) # type:GroupDict
- card_order = CardConsumeRecord(orderNo = order.orderNo,
- openId = self.card.openId,
- cardId = str(self.card.id),
- money = fee.mongo_amount,
- balance = self.card.balance.mongo_amount,
- devNo = self.device.devNo,
- devType = self.device.devTypeName,
- logicalCode = self.device.logicalCode,
- groupId = self.device.groupId,
- address = group.address,
- groupNumber = self.device.groupNumber,
- groupName = group.groupName,
- result = 'success',
- remarks = u'刷卡消费',
- dateTimeAdded = datetime.datetime.now(),
- linkedConsumeRcdOrderNo = order.orderNo).save()
- # 生成订单处理卡消费(预付费还是后付费)
- try:
- self.checkout_order(order)
- except Exception as e:
- order.isNormal = False
- order.errorDesc = '订单建立结算失败! ERROR={}'.format(e)
- order.save()
- return order
- def new_sub_order(order_id, master_order_id):
- order = new_one(order_id)
- order.isNormal = True
- order.status = self.event_data['status']
- order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
- if 'fts' in self.event_data:
- order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
- if ('master' not in order.association) or (order.association['master'] != master_order_id):
- order.association = {'master': master_order_id}
- order.servicedInfo['masterOrderNo'] = master_order_id
- order.save()
- return order
- master_order = new_one(self.event_data['order_id'])
- master_order.save()
- sub_orders = []
- for item in self.event_data.get('sub', []):
- sub_order = new_sub_order(item['order_id'], master_order.orderNo)
- sub_orders.append(sub_order)
- sub_order_id_list = [item.orderNo for item in sub_orders]
- has_done = True
- if master_order.status == ConsumeRecord.Status.FINISHED:
- logger.debug('master order<{}> has been done.'.format(repr(master_order)))
- has_done = True
- if master_order.status == ConsumeRecord.Status.CREATED:
- has_done = False
- master_order.association = {'sub': sub_order_id_list}
- if sub_order_id_list:
- master_order.servicedInfo['subOrderNo'] = ', '.join(sub_order_id_list)
-
- else:
- if master_order.status == ConsumeRecord.Status.RUNNING and \
- self.event_data['status'] == ConsumeRecord.Status.FINISHED:
- has_done = False
- if set(master_order.association['sub']) != set(sub_order_id_list):
- master_order.association = {'sub': sub_order_id_list}
- if sub_order_id_list:
- master_order.servicedInfo['subOrderNo'] = ', '.join(sub_order_id_list)
-
- has_done = False
- master_order.isNormal = True
- master_order.status = self.event_data['status']
- if not master_order.startTime:
- if 'sts' in self.event_data:
- master_order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
- else:
- master_order.startTime = datetime.datetime.now()
- if 'fts' in self.event_data:
- master_order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
- master_order.save()
- return has_done, master_order, sub_orders
- def merge_order(self, master_order, sub_orders):
- # type:(ConsumeRecord, Iterable[ConsumeRecord])->dict
- raise NotImplementedError('must implement merge_order.')
- def do_finished_event(self, order, merge_order_info):
- # type:(ConsumeRecord, dict)->None
- raise NotImplementedError('must implement do_finished_event.')
- def deal_running_order(self, master_order, sub_orders):
- # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
- self.post_before_start(master_order)
-
- try:
- start_time_str = master_order.startTime.strftime(Const.DATETIME_FMT)
- cache_info = {
- 'port': str(master_order.used_port),
- 'cardId': str(self.card.id),
- 'openId': self.card.openId,
- 'startTime': start_time_str,
- 'isStart': True,
- 'status': Const.DEV_WORK_STATUS_WORKING,
- 'orderNo': master_order.orderNo,
- 'subOrderNos': [item.orderNo for item in sub_orders]
- }
- cache_info.update(self.merge_order(master_order, sub_orders))
- current_cache_info = Device.get_dev_control_cache(self.device.devNo)
- if master_order.used_port < Const.NO_PORT:
- current_port_info = current_cache_info.get(str(master_order.used_port), {})
- if not current_cache_info or master_order.orderNo == current_port_info.get('orderNo', None) or current_port_info.get('status', 0) == Const.DEV_WORK_STATUS_IDLE:
- Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
- else:
- if start_time_str > current_port_info.get('startTime', '1970-01-01'):
- Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
- else:
- pass
- else:
- if not current_cache_info or (master_order.orderNo == current_cache_info.get('orderNo', None)) or current_cache_info.get('status', 0) == Const.DEV_WORK_STATUS_IDLE:
- Device.update_dev_control_cache(self.device.devNo, cache_info)
- else:
- if start_time_str > current_cache_info.get('startTime', '1970-01-01'):
- Device.update_dev_control_cache(self.device.devNo, cache_info)
- else:
- pass
- ServiceProgress.new_progress_for_order(order=master_order,
- device=self.device,
- cache_info = cache_info)
- finally:
- self.post_after_start(order=master_order)
- def deal_finished_order(self, master_order, sub_orders):
- # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
- self.post_before_finish(master_order)
-
- try:
- self.do_finished_event(master_order, self.merge_order(master_order, sub_orders))
- current_cache_info = Device.get_dev_control_cache(self.device.devNo)
- if master_order.used_port < Const.NO_PORT:
- current_port_info = current_cache_info.get(str(master_order.used_port))
- if current_port_info and (master_order.orderNo == current_port_info.get('orderNo', '')):
- Device.clear_port_control_cache(devNo = self.device.devNo, port = master_order.used_port)
- else:
- if current_cache_info and (master_order.orderNo == current_cache_info.get('orderNo', '')):
- Device.invalid_device_control_cache(self.device.devNo)
- ServiceProgress.objects(open_id=self.card.openId,
- device_imei=self.device.devNo,
- port=int(master_order.used_port)).update_one(
- upsert=False,
- **{
- 'isFinished': True,
- 'finished_time': Arrow.fromdatetime(master_order.finishedTime,
- tzinfo=settings.TIME_ZONE).timestamp,
- 'expireAt': datetime.datetime.now()
- })
- finally:
- self.post_after_finish(order=master_order)
- def do_impl(self, **args):
- if 'port' not in self.event_data:
- logger.warn('port is not exist.')
- return
- order_id = self.event_data['order_id']
- order_type = self.event_data['order_type']
- if not self.card or not self.card.openId or self.card.frozen:
- logger.debug('card<cardNo={}> is not exists.'.format(self.event_data['cardNo']))
- return
- has_done, master_order, sub_orders = self.get_or_create_order()
- if has_done:
- logger.debug('order<{}> has been done.'.format(repr(master_order)))
- return
- if self.event_data['status'] == 'running':
- self.deal_running_order(master_order, sub_orders)
- if self.event_data['status'] == 'finished':
- self.deal_finished_order(master_order, sub_orders)
- class VirtualCardStartAckEvent(AckEvent):
- def __init__(self, smartBox, event_data, pre_processor = None):
- super(VirtualCardStartAckEvent, self).__init__(smartBox, event_data, pre_processor)
- if self.pre_processor:
- self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
- self.card, self.virtualCard = self.get_virtual_card_by_card()
- def get_virtual_card_by_card(self):
- cardNo = self.event_data.get("cardNo")
- card = Card.objects.filter(cardNo=cardNo).first()
- if not card or not card.openId or float(card.balance) != 0:
- logger.debug('card<cardNo={}> is not exists.'.format(self.event_data['cardNo']))
- return
- try:
- dealer = Dealer.objects.get(id=card.dealerId)
- agent = Agent.objects.get(id=dealer.agentId)
- features = agent.features
- except Exception as e:
- features = []
- return card, card.bound_virtual_card if "vCardNeedBind" in features else card.related_virtual_card
- def post_after_start(self, order = None):
- raise NotImplementedError('must implement post_after_start.')
- def post_after_finish(self, order = None):
- raise NotImplementedError('must implement post_after_finish.')
- def checkout_order(self, order):
- raise NotImplementedError('must implement checkout_order.')
- def get_or_create_order(self):
- # type:()-> tuple
- def new_one(order_id):
- # type:(str)->ConsumeRecord
- if self.event_data['order_id'] == order_id: # 判断master_order还是sub
- fee = VirtualCoin(self.event_data['fee'])
- else:
- sub = self.event_data.get('sub',[])
- fee = map(lambda x: x['fee'], filter(lambda x: x['order_id'] == order_id, sub))[0]
- order = ConsumeRecord.objects(startKey = order_id).first()
- if not order:
- attach_paras = {'chargeIndex': self.event_data['port']}
- order = ConsumeRecord.new_one(
- order_no = ConsumeRecord.make_no(self.event_data['cardNo'], UserConsumeSubType.VIRTUAL_CARD),
- user = MyUser(openId = self.card.openId, nickname = self.card.nickName),
- device = self.device,
- group = Group.get_group(self.device.groupId),
- package = self.event_data.get('package',{}),
- attach_paras=attach_paras,
- pay_info={
- 'via': 'virtualCard',
- 'itemId': str(self.virtualCard.id),
- 'money': fee.mongo_amount,
- 'coins': fee.mongo_amount,
- 'deduct': []
- },
- start_key=order_id)
- order.servicedInfo = {'cardNo':self.event_data.get('cardNo'),'chargeIndex': self.event_data['port']}
- order.save()
- card_order = CardConsumeRecord.objects(orderNo = order.orderNo).first()
- if not card_order:
- group = Group.get_group(self.device.groupId) # type:GroupDict
- card_order = CardConsumeRecord(orderNo = order.orderNo,
- openId = self.card.openId,
- cardId = str(self.card.id),
- money = VirtualCoin(0).mongo_amount,
- balance = self.card.balance.mongo_amount,
- devNo = self.device.devNo,
- devType = self.device.devTypeName,
- logicalCode = self.device.logicalCode,
- groupId = self.device.groupId,
- address = group.address,
- groupNumber = self.device.groupNumber,
- groupName = group.groupName,
- result = 'success',
- remarks = u'虚拟卡消费',
- dateTimeAdded = datetime.datetime.now(),
- linkedConsumeRcdOrderNo = order.orderNo).save()
- # 生成订单处理卡消费(预付费还是后付费)
- try:
- self.checkout_order(order)
- except Exception as e:
- order.isNormal = False
- order.errorDesc = '订单建立结算失败! ERROR={}'.format(e)
- order.save()
- freeze_user_balance(self.device, group, order, self.virtualCard)
- return order
- def new_sub_order(order_id, master_order_id):
- order = new_one(order_id)
- order.isNormal = True
- order.status = self.event_data['status']
- order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
- if 'fts' in self.event_data:
- order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
- if ('master' not in order.association) or (order.association['master'] != master_order_id):
- order.association = {'master': master_order_id}
- order.save()
- return order
- master_order = new_one(self.event_data['order_id'])
- master_order.save()
- sub_orders = []
- for item in self.event_data.get('sub', []):
- sub_order = new_sub_order(item['order_id'], master_order.orderNo)
- sub_orders.append(sub_order)
- sub_order_id_list = [item.orderNo for item in sub_orders]
- has_done = True
- if master_order.status == ConsumeRecord.Status.FINISHED:
- logger.debug('master order<{}> has been done.'.format(repr(master_order)))
- has_done = True
- if master_order.status == ConsumeRecord.Status.CREATED:
- has_done = False
- master_order.association = {'sub': sub_order_id_list}
- else:
- if master_order.status == ConsumeRecord.Status.RUNNING and \
- self.event_data['status'] == ConsumeRecord.Status.FINISHED:
- has_done = False
- if set(master_order.association['sub']) != set(sub_order_id_list):
- master_order.association = {'sub': sub_order_id_list}
- has_done = False
- master_order.isNormal = True
- master_order.status = self.event_data['status']
- if not master_order.startTime:
- if 'sts' in self.event_data:
- master_order.startTime = datetime.datetime.fromtimestamp(self.event_data['sts'])
- else:
- master_order.startTime = datetime.datetime.now()
- if 'fts' in self.event_data:
- master_order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts'])
- master_order.save()
- return has_done, master_order, sub_orders
- def merge_order(self, master_order, sub_orders):
- # type:(ConsumeRecord, Iterable[ConsumeRecord])->dict
- raise NotImplementedError('must implement merge_order.')
- def do_finished_event(self, order, sub_orders, merge_order_info):
- # type:(ConsumeRecord,list, Iterable[ConsumeRecord])->None
- raise NotImplementedError('must implement do_finished_event.')
- def deal_running_order(self, master_order, sub_orders):
- # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
- try:
- start_time_str = master_order.startTime.strftime(Const.DATETIME_FMT)
- cache_info = {
- 'port': str(master_order.used_port),
- 'cardId': str(self.card.id),
- 'openId': self.card.openId,
- 'startTime': start_time_str,
- 'isStart': True,
- 'status': Const.DEV_WORK_STATUS_WORKING,
- 'orderNo': master_order.orderNo,
- 'subOrderNos': [item.orderNo for item in sub_orders]
- }
- cache_info.update(self.merge_order(master_order, sub_orders))
- current_cache_info = Device.get_dev_control_cache(self.device.devNo)
- if master_order.used_port < Const.NO_PORT:
- current_port_info = current_cache_info.get(str(master_order.used_port), {})
- if not current_cache_info or master_order.orderNo == current_port_info.get('orderNo', None) or current_port_info.get('status', 0) == 0:
- Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
- else:
- if start_time_str > current_port_info.get('startTime', '1970-01-01'):
- Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
- else:
- pass
- else:
- if not current_cache_info:
- Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
- else:
- if master_order.orderNo == current_cache_info.get('orderNo', ''):
- Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
- else:
- if start_time_str > current_cache_info.get('startTime', '1970-01-01'):
- Device.update_port_control_cache(self.device.devNo, cache_info, updateType = 'overwrite')
- else:
- pass
- ServiceProgress.new_progress_for_order(order = master_order,
- device = self.device,
- cache_info = cache_info)
- finally:
- self.post_after_start(order=master_order)
- def deal_finished_order(self, master_order, sub_orders):
- # type: (ConsumeRecord, Iterable[ConsumeRecord])->None
- try:
- self.do_finished_event(master_order, sub_orders, self.merge_order(master_order, sub_orders))
- current_cache_info = Device.get_dev_control_cache(self.device.devNo)
- if master_order.used_port < Const.NO_PORT:
- current_port_info = current_cache_info.get(str(master_order.used_port))
- if current_port_info and (master_order.orderNo == current_port_info.get('orderNo', '')):
- Device.clear_port_control_cache(devNo = self.device.devNo, port = master_order.used_port)
- else:
- if current_cache_info and (master_order.orderNo == current_cache_info.get('orderNo', '')):
- Device.invalid_device_control_cache(self.device.devNo)
- ServiceProgress.objects(open_id = self.card.openId,
- device_imei = self.device.devNo,
- port = int(master_order.used_port),
- consumeOrder__orderNo = master_order.orderNo).update_one(
- upsert = False,
- **{
- 'isFinished': True,
- 'finished_time': Arrow.fromdatetime(master_order.finishedTime,
- tzinfo = settings.TIME_ZONE).timestamp,
- 'expireAt': datetime.datetime.now()
- })
- finally:
- self.post_after_finish(order=master_order)
- def do_impl(self, **args):
- if 'port' not in self.event_data:
- logger.warn('port is not exist.')
- return
- order_id = self.event_data['order_id']
- order_type = self.event_data['order_type']
- cardType = self.event_data.get('cardType','ID')
- card = self.update_card_dealer_and_type(str(self.event_data['cardNo']), cardType) # type: Card
- if not card or not card.openId or card.frozen:
- logger.debug('card<cardNo={}> is not exists.'.format(self.event_data['cardNo']))
- return
- has_done, master_order, sub_orders = self.get_or_create_order()
- if has_done:
- logger.debug('order<{}> has been done.'.format(repr(master_order)))
- return
- if self.event_data['status'] == 'running':
- self.deal_running_order(master_order, sub_orders)
- if self.event_data['status'] == 'finished':
- self.deal_finished_order(master_order, sub_orders)
- class CardRefundAckEvent(AckEvent):
- def __init__(self, smartBox, event_data, pre_processor):
- super(CardRefundAckEvent, self).__init__(smartBox, event_data, pre_processor)
- if self.pre_processor:
- self.event_data = self.pre_processor.pre_processing(self.device, self.event_data)
- def do_impl(self, **args):
- order_no = self.event_data['order_id']
- order = RechargeRecord.objects(orderNo = order_no).first()
- if order:
- logger.debug('refund order<orderNo={}> has been done.'.format(order_no))
- return
- card = self.update_card_dealer_and_type(self.event_data['cardNo'], self.event_data['cardType']) # type: Card
- if not card:
- logger.debug('card<cardNo={}> is not bound.'.format(self.event_data['cardNo']))
- return
- self.refund_money_for_card(RMB(self.event_data['backMoney']), card.id, order_no)
|