# -*- coding: utf-8 -*- # !/usr/bin/env python import datetime import json import logging import time from arrow import Arrow from django.conf import settings from typing import TYPE_CHECKING from apilib.monetary import RMB, VirtualCoin from apilib.utils_string import make_title_from_dict from apps import serviceCache from apps.web.constant import Const, APP_TYPE, DEALER_CONSUMPTION_AGG_KIND, DeviceCmdCode from apps.web.core.exceptions import ServiceException from apps.web.core.networking import MessageSender from apps.web.device.models import Device from apps.web.eventer.base import FaultEvent, WorkEvent, ComNetPayAckEvent, AckEventProcessorIntf, IdStartAckEvent from apps.web.eventer import EventBuilder from apps.web.helpers import get_wechat_auth_bridge from apps.web.user.models import VCardConsumeRecord, CardRechargeOrder, MyUser, ConsumeRecord, \ RechargeRecord, Card if TYPE_CHECKING: from apps.web.device.models import DeviceDict logger = logging.getLogger(__name__) class builder(EventBuilder): def __getEvent__(self, device_event): # 订单机制事件 if 'order_id' in device_event: if device_event['order_type'] == 'com_start': return MyComNetPayAckEvent(self.deviceAdapter, device_event) if device_event['order_type'] == 'ic_recharge': pass if device_event['order_type'] == 'ic_start': pass if device_event['order_type'] == 'id_start': return OnlineCardStartAckEvent(self.deviceAdapter, device_event) if device_event['order_type'] == 'card_refund': pass else: if 'event_type' in device_event: if device_event['event_type'] == 'card': return CardEvent(self.deviceAdapter, device_event) if device_event['event_type'] == 'fault': return JinQueFaultEvent(self.deviceAdapter, device_event) else: return None if 'sCmd' in device_event: return GeneralEvent(self.deviceAdapter, device_event) class CardEvent(WorkEvent): NORMAL = 'normal' # 正常卡 ILLEGAL = 'illegal' # 非法卡 def do(self): sCmd = self.event_data['sCmd'] if sCmd == 2508: self._do_get_balance() else: pass def _do_get_balance(self): card_no = self.event_data['card_no'] cardNo = str(int(card_no, 16)) logger.info('[_do_get_balance] receive cardNo = {}'.format(cardNo)) card = self.update_card_dealer_and_type(cardNo) # type: Card data = {'funCode': 'card', 'status': 'illegal', 'balance': 0, 'cardNo': cardNo, 'card_no': card_no} # 默认为非法卡 dealer = self.device.owner # 无主卡或者是卡被冻结 if not card or not card.openId or card.frozen or not dealer: logger.info('[_do_get_balance] receive cardNo = {}, card invalid!'.format(cardNo)) data.update({'status': self.ILLEGAL}) return self.send_mqtt(data=data) if self.card_is_busy(): data.update({'status': self.ILLEGAL}) return self.send_mqtt(data=data) # 是否存在没有到账的余额 进行充值 card_recharge_order = CardRechargeOrder.get_last_to_do_one(str(card.id)) self.recharge_id_card( card=card, rechargeType='append', order=card_recharge_order ) card.reload() # ongoingList = getattr(card, 'ongoingList', []) # 有冻结未结束的订单 # # # 先不给做一张卡只能开启一单的限制 # if ongoingList: # logger.info( # '[_do_get_balance] receive cardNo = {}, card balance = {} card wroking!'.format(cardNo, card.balance)) # data.update({'status': self.ILLEGAL}) # return self.send_mqtt(data=data) onceIdcardFee = RMB(self.device.otherConf.get('onceIdcardFee', 5)) balance = RMB(card.balance) amount = min(card.balance, onceIdcardFee) data.update({ 'balance': int(balance * 100), # 单位: 分 'amount': int(amount * 100), # 单位: 分 'status': self.NORMAL, 'attach_paras': {}, }) if self.send_mqtt(data=data): self.set_card_busy() def send_mqtt(self, data=None, cmd=DeviceCmdCode.OPERATE_DEV_SYNC, otherData=None): """ 发送mqtt 指令默认210 返回data """ result = MessageSender.send(self.device, cmd, data) if 'rst' in result and result['rst'] != 0: if result['rst'] == -1: raise ServiceException( {'result': 2, 'description': u'该设备正在玩命找网络,请您稍候再试', 'rst': -1}) elif result['rst'] == 1: raise ServiceException( {'result': 2, 'description': u'该设备忙,无响应,请您稍候再试。也可能是您的设备版本过低,暂时不支持此功能', 'rst': 1}) else: if cmd in [DeviceCmdCode.OPERATE_DEV_NO_RESPONSE, DeviceCmdCode.PASSTHROUGH_OPERATE_DEV_NO_RESPONSE]: return return result.get('data', 'ok') def card_is_busy(self): card_no = self.event_data['card_no'] key = 'jinque_{}_{}'.format(self.device.ownerId, card_no) if serviceCache.get(key): return True else: return False def set_card_busy(self): card_no = self.event_data['card_no'] key = 'jinque_{}_{}'.format(self.device.ownerId, card_no) return serviceCache.set(key, '1', 60 * 3) class GeneralEvent(WorkEvent): SUCCESS_01 = '01' BALANCE_NOT_ENOUGH_02 = '02' INVALID_CARD_03 = '03' FREEZE_CARD_04 = '04' def do(self): if self.event_data['sCmd'] == 2503: self._do_update_status() elif 1513 < self.event_data['sCmd'] < 1523: self._do_update_power() elif self.event_data['sCmd'] == 2504: self._do_update_total_elec() def _do_get_balance(self): cardNo = str(int(self.event_data.get('card_no'), 16)) logger.info('receive cardNo:{}'.format(cardNo)) card = self.update_card_dealer_and_type(cardNo) mqtt_data = { 'funCode': self.event_data.get('cmdCode') } if not card or not card.openId: data = self.event_data.get('session') data += '36' data += self.INVALID_CARD_03 data += self.event_data.get('card_no') data += self.event_data.get('fee') data += '{:0>4X}'.format(0) logger.info('no this card! card_no_hex<{}>, cardNo<{}>'.format(self.event_data.get('card_no'), cardNo)) elif card.frozen: data = self.event_data.get('session') data += '36' data += self.FREEZE_CARD_04 data += self.event_data.get('card_no') data += self.event_data.get('fee') data += '{:0>4X}'.format(0) logger.info('card is frozen card_no_hex<{}>, cardNo<{}>'.format(self.event_data.get('card_no'), cardNo)) else: # 是否存在没有到账的余额 进行充值 card_recharge_order = CardRechargeOrder.get_last_to_do_one(str(card.id)) self.recharge_id_card( card=card, rechargeType='append', order=card_recharge_order ) card.reload() fee = RMB(int(self.event_data.get('fee'), 16)) * 0.1 if card.balance >= RMB(fee): data = self.event_data.get('session') data += '36' data += self.SUCCESS_01 data += self.event_data.get('card_no') data += self.event_data.get('fee') data += '{:0>4X}'.format(int((card.balance - fee) * 10)) else: data = self.event_data.get('session') data += '36' data += self.BALANCE_NOT_ENOUGH_02 data += self.event_data.get('card_no') data += self.event_data.get('fee') data += '{:0>4X}'.format(int((card.balance - fee) * 10)) mqtt_data['data'] = data self.send_mqtt(data=mqtt_data, cmd=DeviceCmdCode.OPERATE_DEV_NO_RESPONSE) def send_mqtt(self, data=None, cmd=DeviceCmdCode.OPERATE_DEV_SYNC): """ 发送mqtt 指令默认210 返回data """ result = MessageSender.send(self.device, cmd, data) if 'rst' in result and result['rst'] != 0: if result['rst'] == -1: raise ServiceException( {'result': 2, 'description': u'该设备正在玩命找网络,请您稍候再试', 'rst': -1}) elif result['rst'] == 1: raise ServiceException( {'result': 2, 'description': u'该设备忙,无响应,请您稍候再试。也可能是您的设备版本过低,暂时不支持此功能', 'rst': 1}) else: if cmd in [DeviceCmdCode.OPERATE_DEV_NO_RESPONSE, DeviceCmdCode.PASSTHROUGH_OPERATE_DEV_NO_RESPONSE]: return return result.get('data', 'ok') def _do_update_status(self): STATUS_MAP = { 'idle': 0, 'busy': 1 } status = self.event_data.get('status', {}) ts = self.event_data.get('ts', 0) now = time.time() if now - ts < 10: ctrInfo = Device.get_dev_control_cache(self.device.devNo) for strPort, status in status.items(): if strPort in ctrInfo: ctrInfo[strPort].update({'status': STATUS_MAP.get(status, 1)}) else: ctrInfo[strPort] = {'status': status} Device.update_dev_control_cache(self.device.devNo, ctrInfo) def _do_update_power(self): powers = self.event_data.get('powers', {}) ts = self.event_data.get('ts', 0) now = time.time() if now - ts < 10: ctrInfo = Device.get_dev_control_cache(self.device.devNo) for strPort, power in powers.items(): if power > 0: if strPort in ctrInfo: ctrInfo[strPort].update({'power': power}) else: ctrInfo[strPort] = {'power': power} Device.update_dev_control_cache(self.device.devNo, ctrInfo) def _do_update_total_elec(self): total_elec = round(self.event_data.get('total_elec', 0) * 0.01, 2) ts = self.event_data.get('ts', 0) now = time.time() if now - ts < 10: Device.get_collection().update_one( filter={'devNo': self.device.devNo}, update={'$set': {'otherConf.total_elec': total_elec}} ) class JinQueFaultEvent(FaultEvent): def do(self, **args): # 将告警的消息打入相应的缓存 port = self.event_data.get('port') code = self.event_data.get('code') now = time.time() ts = self.event_data.get('ts', 0) if now - ts < 10: # 0 表示整机 if not port: port = str(0) else: port = str(port) if code == 0x02: statusInfo = '用户正在使用' return elif code == 0x06: statusInfo = '未设置电价' elif code == 0x11: statusInfo = '没有检测到充电器' elif code == 0x12: statusInfo = '输出控制有故障' else: return warningData = { 'warningStatus': 2, 'warningDesc': statusInfo, 'warningTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), } Device.update_dev_warning_cache(self.device.devNo, {port: warningData}) super(JinQueFaultEvent, self).do() class StartAckEventPreProcessor(AckEventProcessorIntf): def analysis_reason(self, reason, fault_code=None): FINISHED_CHARGE_REASON_MAP = { 0x00: u'充满自停', 0x01: u'订购的时间已用完', 0x04: u'超负荷断电', 0x10: u'订购的金额已用完', 0x11: u'未检测到充电器,或充电器脱落', # 服务器定义的停止事件 0xF1: u'用户远程停止', 0xF2: u'管理员远程停止', 0xF3: u'检测到充电已停止, 系统结单', } return FINISHED_CHARGE_REASON_MAP.get(reason, '充电结束') def pre_processing(self, device, event_data): # type:(DeviceDict, dict)->dict source = json.dumps(event_data, indent=4) event_data['source'] = source if 'duration' in event_data: duration = event_data.pop('duration', 0) event_data['duration'] = round((duration + 59) / 60.0, 1) if 'fts' in event_data and 'sts' in event_data: duration = event_data['fts'] - event_data['sts'] event_data['duration'] = round((duration + 59) / 60.0, 1) if 'elec' in event_data: event_data['elec'] = round(event_data['elec'] / 3600000, 2) if 'amount' in event_data: pass # 刷卡启动. 需要建单, 需要支付金额 if 'cardNo' in event_data: event_data['fee'] = event_data['amount'] * 0.01 if 'status' in event_data and event_data['status'] == 'finished': event_data['reasonDesc'] = self.analysis_reason(event_data.get('reason')) if 'left' in event_data: pass return event_data class MyComNetPayAckEvent(ComNetPayAckEvent): def __init__(self, smartBox, event_data): super(MyComNetPayAckEvent, self).__init__(smartBox, event_data, StartAckEventPreProcessor()) def post_before_start(self, order=None): # 记录处理的源数据报文 uart_source = getattr(order, 'uart_source', []) uart_source.append({ 'rece_running': self.event_data.get('source'), 'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) order.uart_source = uart_source order.save() def post_after_start(self, order=None): pass def post_before_finish(self, order=None): # 记录处理的源数据报文 uart_source = getattr(order, 'uart_source', []) uart_source.append({ 'rece_finished': self.event_data.get('source'), 'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) order.uart_source = uart_source order.save() def post_after_finish(self, order=None): pass def merge_order(self, master_order, sub_orders): # type:(ConsumeRecord, list)->dict start_time = Arrow.fromdatetime(master_order.startTime, tzinfo=settings.TIME_ZONE) portDict = { 'coins': str(master_order.package['coins']), 'money': str(master_order.package['price']), 'start_time': start_time.format(Const.DATETIME_FMT), 'estimatedTs': int(start_time.timestamp + 3600 * 12), 'consumeType': 'mobile' } return portDict def do_finished_event(self, master_order, sub_orders, merge_order_info): # type: (ConsumeRecord, [ConsumeRecord], dict)->None self._do_finished(master_order, sub_orders, merge_order_info) def insert_vCard_consume_record(self, vCard, order, success, consumeTotal, consumeDay): try: if success and consumeDay['count'] > 0: record = VCardConsumeRecord( orderNo=VCardConsumeRecord.make_no(order.logicalCode), openId=order.openId, nickname=order.nickname, cardId=str(vCard.id), dealerId=vCard.dealerId, devNo=order.devNo, devTypeCode = order.devTypeCode, devTypeName = order.dev_type_name, logicalCode=order.logicalCode, groupId=order.groupId, address=order.address, groupNumber=order.groupNumber, groupName=order.groupName, attachParas=order.attachParas, consumeData=consumeTotal, consumeDayData=consumeDay ) record.save() except Exception, e: logger.exception(e) def _do_finished(self, order, sub_orders, merge_order_info): # type: (ConsumeRecord, list, dict)->None duration, left, amount = self.event_data.get('duration', 0), self.event_data.get('left', VirtualCoin( 0)), self.event_data.get('amount', VirtualCoin(0)) coins = VirtualCoin(merge_order_info['coins']) # 做一个保护 left 不能超过 amount left = min(amount, left) auto_refund = self.device.is_auto_refund if amount == 0: refundRatio = 0 else: if auto_refund: refundRatio = left * 1.0 / amount else: refundRatio = 0 backCoins = coins * refundRatio logger.debug('{} auto refund enable switch is {}, coins={} refundRatio={}'.format( repr(self.device), auto_refund, coins, refundRatio)) extra = [] extra.append({u'使用详情': u'{}分钟(端口: {} )'.format(duration, order.used_port)}) user = order.user # type: MyUser if order.paymentInfo['via'] == 'free': extra.append({u'消费金额': u'当前设备免费使用'}) elif order.paymentInfo['via'] in ['netPay', 'coins', 'cash', 'coin']: if backCoins > VirtualCoin(0): extra.append({u'消费明细': u'支付{}元,退费{}元'.format(coins, backCoins)}) else: extra.append({u'消费明细': u'支付{}元'.format(coins)}) order_processing_list = [order] + sub_orders for _order in order_processing_list[::-1]: consumeDict = { 'reason': self.event_data.get('reasonDesc', None), 'chargeIndex': str(order.used_port) } need_back_coins, need_consume_coins, backCoins = self._calc_refund_info(backCoins, _order.coin) refundCash = 'refundRMB_device_event' in self.device.owner.features rechargeRcdId = _order.attachParas.get('linkedRechargeRecordId', '') if rechargeRcdId != '': rechargeRcd = RechargeRecord.objects.filter(id=rechargeRcdId, isQuickPay=True).first() else: rechargeRcd = None if refundCash and rechargeRcd: # 退现金特征 + 有充值订单 # 退现金部分 user.clear_frozen_balance(str(_order.id), _order.paymentInfo['deduct'], back_coins=VirtualCoin(0), consume_coins=VirtualCoin(_order.coin)) refundRMB = rechargeRcd.money * refundRatio self.refund_net_pay(user, {'rechargeRcdId': rechargeRcdId, 'openId': user.openId}, refundRMB, VirtualCoin(0), consumeDict, True) else: # 退金币部分 user.clear_frozen_balance(str(_order.id), _order.paymentInfo['deduct'], back_coins=need_back_coins, consume_coins=need_consume_coins) consumeDict.update({ DEALER_CONSUMPTION_AGG_KIND.COIN: _order.coin.mongo_amount, DEALER_CONSUMPTION_AGG_KIND.REFUNDED_COINS: need_back_coins.mongo_amount, }) if _order.orderNo == order.orderNo: consumeDict.update({ DEALER_CONSUMPTION_AGG_KIND.DURATION: duration, # DEALER_CONSUMPTION_AGG_KIND.ELEC: elec, # DEALER_CONSUMPTION_AGG_KIND.ELECFEE: self.deviceAdapter.calc_elec_fee(elec), }) _order.update_service_info(consumeDict) else: logger.error('not net pay rather user virtual card pay. something is wrong.') return auth_bridge = get_wechat_auth_bridge(source=self.device, app_type=APP_TYPE.WECHAT_USER_MANAGER) self.notify_user_service_complete( service_name='充电', openid=user.get_bound_pay_openid(auth_bridge.bound_openid_key), port='', address=order.address, reason=self.event_data.get('reasonDesc'), finished_time=order.finishedTime.strftime('%Y-%m-%d %H:%M:%S'), extra=extra) # 更新一次缓存 self.deviceAdapter.async_update_portinfo_from_dev() def _calc_refund_info(self, backCoins, orderCoin): if backCoins >= orderCoin: need_back_coins = orderCoin need_consume_coins = VirtualCoin(0) backCoins -= orderCoin else: need_back_coins = backCoins need_consume_coins = orderCoin - need_back_coins backCoins = VirtualCoin(0) return need_back_coins, need_consume_coins, backCoins class CardStartAckEventPreProcessor(AckEventProcessorIntf): def analysis_reason(self, reason, fault_code=None): pass def pre_processing(self, device, event_data): # type:(DeviceDict, dict)->dict pass class OnlineCardStartAckEvent(IdStartAckEvent): def __init__(self, smartBox, event_data): super(OnlineCardStartAckEvent, self).__init__(smartBox, event_data, StartAckEventPreProcessor()) def clear_card_busy(self): card_no = self.event_data['card_no'] key = 'jinque_{}_{}'.format(self.device.ownerId, card_no) return serviceCache.delete(key) def post_before_start(self, order=None): self.clear_card_busy() # 记录处理的源数据报文 uart_source = getattr(order, 'uart_source', []) uart_source.append({ 'rece_running': self.event_data.get('source'), 'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) order.uart_source = uart_source order.save() def post_after_start(self, order=None): self.card.reload() # 通知用户,已经扣费 title = make_title_from_dict([ {u"设备地址": u"{}".format(self.device.group.address)}, {u"设备编号": u"{}-{}".format(self.device["logicalCode"], order.used_port)}, {u"实体卡": u"{}--No:{}".format(self.card.cardName or self.card.nickName, self.card.cardNo)}, {u"本次消费": u"{} 元".format(order.coin)}, {u"卡余额": u"{} 元".format(self.card.balance)}, ]) start_time_stamp = self.event_data.get("sts") start_time = datetime.datetime.fromtimestamp(start_time_stamp) self.notify_user( self.card.managerialOpenId, 'dev_start', **{ 'title': title, 'things': u'刷卡消费', 'remark': u'感谢您的支持!', 'time': start_time.strftime(Const.DATETIME_FMT) } ) def post_before_finish(self, order=None): self.clear_card_busy() # 记录处理的源数据报文 uart_source = getattr(order, 'uart_source', []) uart_source.append({ 'rece_finished': self.event_data.get('source'), 'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) order.uart_source = uart_source order.save() def post_after_finish(self, order=None): pass def merge_order(self, master_order, sub_orders): # type:(ConsumeRecord, list)->dict start_time = Arrow.fromdatetime(master_order.startTime, tzinfo=settings.TIME_ZONE) portDict = { 'coins': str(master_order.coin), 'money': str(master_order.money), 'start_time': start_time.format(Const.DATETIME_FMT), 'estimatedTs': int(start_time.timestamp + 3600 * 12), 'consumeType': 'card' } return portDict def _do_finished(self, order, merge_order_info): # type: (ConsumeRecord, dict)->None duration, left, amount = self.event_data.get('duration', 0), self.event_data.get('left', VirtualCoin( 0)), self.event_data.get('amount', VirtualCoin(0)) coins = VirtualCoin(merge_order_info['coins']) # 做一个保护 left 不能超过 amount left = min(amount, left) auto_refund = self.device.is_auto_refund if amount == 0: refundRatio = 0 else: if auto_refund: refundRatio = left * 1.0 / amount else: refundRatio = 0 _BACK = backCoins = coins * refundRatio logger.debug('{} auto refund enable switch is {}, coins={} refundRatio={}'.format( repr(self.device), auto_refund, coins, refundRatio)) # 分批塞入订单信息 master_info = { 'order_id': self.event_data['order_id'], 'fee': order.coin, } order_processing_list = [master_info] if 'sub' in self.event_data: order_processing_list += self.event_data['sub'] # 订单服务信息与退款处理 for _info in order_processing_list[::-1]: consumeDict = { 'reason': self.event_data.get('reasonDesc', None), } _order = ConsumeRecord.objects.filter(devNo=self.device.devNo, startKey=_info['order_id']).first() if not _order: continue consumeDict.update({ DEALER_CONSUMPTION_AGG_KIND.CONSUME_CARD: _order.coin, }) # 全退 if backCoins >= VirtualCoin(_order.coin): consumeDict.update({ DEALER_CONSUMPTION_AGG_KIND.CONSUME_CARD: _order.coin.mongo_amount, DEALER_CONSUMPTION_AGG_KIND.REFUND_CARD: _order.coin.mongo_amount, }) self.card.clear_frozen_balance(str(_order.id), _order.coin) self.record_refund_money_for_card(_order.coin, str(self.card.id), orderNo=order.orderNo) backCoins -= _order.coin else: # 部分退 consumeDict.update({ DEALER_CONSUMPTION_AGG_KIND.CONSUME_CARD: _order.coin.mongo_amount, DEALER_CONSUMPTION_AGG_KIND.REFUND_CARD: backCoins.mongo_amount, }) self.card.clear_frozen_balance(str(_order.id), backCoins) self.record_refund_money_for_card(backCoins, str(self.card.id), orderNo=order.orderNo) backCoins = VirtualCoin(0) if _order.orderNo == order.orderNo: consumeDict.update({ DEALER_CONSUMPTION_AGG_KIND.DURATION: duration, # DEALER_CONSUMPTION_AGG_KIND.ELEC: elec, # DEALER_CONSUMPTION_AGG_KIND.ELECFEE: self.deviceAdapter.calc_elec_fee(elec), }) _order.update_service_info(consumeDict) self.card.reload() extra = [] extra.append({u'在线卡片': '{}--No:{}'.format(self.card.cardName, self.card.cardNo)}) extra.append({u'使用详情': u'{}分钟(端口: {} )'.format(duration, order.used_port)}) if _BACK > VirtualCoin(0): extra.append({u'消费明细': '支付{}元,退费{}元'.format(coins, _BACK)}) else: extra.append({u'消费明细': '支付{}元'.format(coins)}) extra.append({u'卡片余额': '{}元'.format(self.card.balance)}) self.notify_user_service_complete( service_name='充电', openid=self.card.managerialOpenId, port='', address=order.address, reason=self.event_data.get('reasonDesc'), finished_time=order.finishedTime.strftime('%Y-%m-%d %H:%M:%S'), extra=extra) # 更新一次缓存 self.deviceAdapter.async_update_portinfo_from_dev() def do_finished_event(self, order, merge_order_info): # type:(ConsumeRecord, dict)->None self._do_finished(order, merge_order_info) def checkout_order(self, order): # 在线卡 执行扣费 fee = VirtualCoin(order.coin) self.card.freeze_balance(transaction_id=str(order.id), fee=fee) def _calc_refund_info(self, backCoins, orderCoin): if backCoins >= orderCoin: need_back_coins = orderCoin need_consume_coins = VirtualCoin(0) backCoins -= orderCoin else: need_back_coins = backCoins need_consume_coins = orderCoin - need_back_coins backCoins = VirtualCoin(0) return need_back_coins, need_consume_coins, backCoins