# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""Event handlers for a charging-pile device family.

Contains the event builder that maps raw device messages to handler classes,
plus handlers for card-balance queries, host voltage/temperature/smoke faults,
transformer ("interoperator") alerts and periodic port reports, and the
start-ack events for network-pay and online-card orders.
"""

import datetime
import logging

from typing import TYPE_CHECKING

from apilib.monetary import RMB
from apps.web.common.models import District
from apps.web.constant import DeviceCmdCode, CONSUMETYPE, FAULT_CODE, FAULT_LEVEL
from apps.web.core.exceptions import ServiceException
from apps.web.core.networking import MessageSender
from apps.web.device.models import Device, PortReport, Group, Part
from apps.web.eventer.base import FaultEvent, WorkEvent, AckEventProcessorIntf
from apps.web.eventer import EventBuilder
from apps.web.eventer.policy_common import PolicyComNetPayAckEvent, PolicyOnlineCardStartAckEvent, \
    StartAckEventPreProcessor
from apps.web.south_intf.liangxi_fire import LiangXiXiaoFang
from apps.web.south_intf.zhejiang_fire import send_event_to_zhejiang
from apps.web.user.models import CardRechargeOrder, Card

if TYPE_CHECKING:
    from apps.web.device.models import DeviceDict

logger = logging.getLogger(__name__)


class builder(EventBuilder):
    """Select the event handler that matches an incoming device message."""

    def __getEvent__(self, device_event):
        """Return the handler instance for ``device_event``, or None.

        Messages carrying ``order_id`` are order acknowledgements and are
        dispatched on ``order_type``; everything else is dispatched on
        ``event_type`` / ``type`` / the decoded ``cmdCode``.
        """
        if 'order_id' in device_event:
            if device_event['order_type'] == 'com_start':
                return MyComNetPayAckEvent(self.deviceAdapter, device_event)

            if device_event['order_type'] == 'ic_recharge':
                pass

            if device_event['order_type'] == 'id_start':
                return OnlineCardStartAckEvent(self.deviceAdapter, device_event)

            if device_event['order_type'] == 'card_refund':
                pass
        else:
            if 'event_type' in device_event:
                if device_event['event_type'] == 'card':
                    return CardEvent(self.deviceAdapter, device_event)

            if 'data' not in device_event:
                return None

            # 100228: transformer (interoperator) event handling
            if 'type' in device_event:
                if device_event['type'] == 'alert':
                    return InteroperatorAlertEvent(self.deviceAdapter, device_event)
                if device_event['type'] == 'report':
                    return InteroperatorReport(self.deviceAdapter, device_event)
            else:
                event_data = self.deviceAdapter.analyze_event_data(device_event['data'])
                if event_data is None or 'cmdCode' not in event_data:
                    return None

                if 'today_coins' in device_event and 'ts' in device_event:
                    event_data.update({'today_coins': device_event['today_coins']})
                    event_data.update({'ts': device_event['ts']})

                if event_data.get('cmdCode') == '35' or event_data.get('cmdCode') == '41':
                    return ZHIXIA2InductorEvent(self.deviceAdapter, event_data)

        return None


class CardEvent(WorkEvent):
    """Handle card events reported by the device (currently funCode '22')."""

    def do(self):
        """Dispatch on ``funCode``; only '22' (balance query) is handled."""
        if self.event_data['funCode'] == '22':
            self._do_get_balance()

    def _do_get_balance(self):
        """Answer a card balance query with a funCode '23' reply.

        Looks up the card, applies any pending recharge order, and replies
        with the balance and billing rule, or with a zero balance when the
        card is unusable or the balance does not exceed the requested amount.
        """
        cardNo = str(int(self.event_data['card_id'], 16))
        # The reply balance below is documented as jiao and is built with
        # "* 10", so "* 0.1" here presumably converts jiao to yuan -- TODO confirm.
        money = self.event_data['money'] * 0.1
        logger.info('[_do_get_balance] receive cardNo = {}'.format(cardNo))

        card = self.update_card_dealer_and_type(cardNo)  # type: Card

        # 'card_id' is echoed back in its hexadecimal form.
        data = {'funCode': '23', 'card_id': self.event_data['card_id'], 'balance': 0,
                'card_type': self.event_data['card_type']}
        dealer = self.device.owner

        # Unowned card, frozen card, or device without a dealer.
        if not card or not card.openId or card.frozen or not dealer:
            logger.info('[_do_get_balance] receive cardNo = {}, card invalid!'.format(cardNo))
            data.update({'result': 1})
            return self.send_mqtt(data=data)

        # Credit any recharge that has not reached the card yet.
        card_recharge_order = CardRechargeOrder.get_last_to_do_one(str(card.id))
        self.recharge_id_card(
            card=card,
            rechargeType='append',
            order=card_recharge_order
        )
        card.reload()

        # For now there is no "one running order per card" restriction
        # (the ongoingList check was disabled in the original).

        if card.balance <= RMB(money):
            logger.info(
                '[_do_get_balance] receive cardNo = {}, card balance = {} not enough!'.format(cardNo, card.balance))
            data.update({'result': 1})
            return self.send_mqtt(data=data)

        data.update({
            'balance': int(RMB(card.balance) * 10),  # unit: jiao
            'attach_paras': {'openId': card.openId},
        })
        data.update(self.generate_rule())

        # NOTE(review): after the guard above the balance always exceeds
        # ``money``, so the ``result: 2`` branch looks unreachable -- confirm
        # the intended result codes against the device protocol.
        if RMB(card.balance) > RMB(money):
            data.update({'result': 1})
        else:
            data.update({'result': 2})

        self.send_mqtt(data=data)

    def send_mqtt(self, data=None, cmd=DeviceCmdCode.OPERATE_DEV_SYNC):
        """Send ``data`` to the device and return the reply payload.

        The original note says the default command is 210.

        :param data: payload to send.
        :param cmd: device command code.
        :return: ``result['data']`` (or ``'ok'``) on success; None for the
            no-response command codes, and None when ``rst`` is a non-zero
            value other than -1 or 1.
        :raises ServiceException: when the reply carries ``rst`` -1 or 1.
        """
        result = MessageSender.send(self.device, cmd, data)

        if 'rst' in result and result['rst'] != 0:
            if result['rst'] == -1:
                raise ServiceException(
                    {'result': 2, 'description': u'该设备正在玩命找网络,请您稍候再试', 'rst': -1})
            elif result['rst'] == 1:
                raise ServiceException(
                    {'result': 2, 'description': u'该设备忙,无响应,请您稍候再试。也可能是您的设备版本过低,暂时不支持此功能', 'rst': 1})
        else:
            if cmd in [DeviceCmdCode.OPERATE_DEV_NO_RESPONSE, DeviceCmdCode.PASSTHROUGH_OPERATE_DEV_NO_RESPONSE]:
                return
            return result.get('data', 'ok')

    def generate_rule(self):
        """Build the billing-rule part of the card reply from the device policy.

        Reads ``policyTemp['forIdcard']`` and returns a dict with
        ``billing_method``, ``rule_type`` and ``rule``.

        :raises ServiceException: when ``policyType`` is neither 'time' nor 'elec'.
        """
        forIdcard = self.device.policyTemp.get('forIdcard', {})
        policyType = forIdcard.get('policyType', 'time')
        billingMethod = forIdcard.get('billingMethod', 'prepaid')

        # account_type (max, real, min): whether billing uses the maximum
        # power or the real-time power.
        account_type = 'real'
        if 'card_accumulate_max_power' in self.device.owner.features:
            account_type = 'max'

        policy = {'billing_method': billingMethod}

        if billingMethod == CONSUMETYPE.POSTPAID:
            # Postpaid
            if policyType == 'time':
                policy.update({
                    'rule_type': 'TIME_ELEC_MONEY',
                    'rule': {
                        'account_type': account_type,
                        'step': self.deviceAdapter.get_uart_step(is_card=True)
                    }})
            elif policyType == 'elec':
                # Unit electricity price, used to display the amount.
                elecFee = float(forIdcard.get('rule', {}).get('price', 1))
                policy.update({
                    'rule_type': 'ELEC',
                    'rule': {
                        'account_type': account_type,
                        # round() before int(): 1.15 * 100 is 114.999... and
                        # plain int() would truncate it to 114.
                        'elec_fee': int(round(elecFee * 100))
                    }
                })
            else:
                raise ServiceException({'result': 2, 'description': u'套餐单位错误,请联系经销商'})
        else:
            # Prepaid
            money = self.event_data['money'] * 0.1
            if policyType == 'time':
                policy.update({
                    'rule_type': 'MONEY',
                    'rule': {
                        'account_type': account_type,
                        'amount': int(round(money * 100)),
                        'step': self.deviceAdapter.get_uart_step(is_card=True)
                    },
                })
            elif policyType == 'elec':
                # Unit electricity price, used to display the amount.
                elecFee = float(forIdcard.get('rule', {}).get('price', 1))
                policy.update({
                    'rule_type': 'MONEY_BY_ELEC',
                    'rule': {
                        'account_type': account_type,
                        'amount': int(round(money * 100)),
                        'elec_fee': int(round(elecFee * 100))
                    },
                })
            else:
                raise ServiceException({'result': 2, 'description': u'套餐单位错误,请联系经销商'})

        return policy


class ZHIXIA2InductorEvent(FaultEvent):
    """Record host over-voltage, over-temperature and smoke faults."""

    def do(self, **args):
        """Compare the reported readings with the device thresholds and record faults.

        Thresholds come from the device's ``otherConf`` (``voltageThre``,
        default 230; ``temThre``, default 60). Any exception is logged and
        swallowed.
        """
        voltage = self.event_data.get('voltage', -1)
        temperature = self.event_data.get('temperature', -1)
        smokeWarning = self.event_data.get('smokeWarning', False)

        try:
            otherConf = Device.objects.get(devNo=self.device.devNo).otherConf

            maxVoltage = otherConf.get('voltageThre', 230)
            if voltage >= maxVoltage:
                desc = u'当前电压%s伏超过了门限值:%s伏' % (voltage, maxVoltage)
                self.record(faultCode=FAULT_CODE.OVER_VOLTAGE, description=desc, title=u'主机电压过高', detail=None,
                            level=FAULT_LEVEL.CRITICAL)

            maxTemperature = otherConf.get('temThre', 60)
            if temperature > maxTemperature:
                desc = u'当前主机温度%s度超过了门限值:%s度' % (temperature, maxTemperature)
                self.record(faultCode=FAULT_CODE.OVER_TEMPERATURE, description=desc, title=u'主机温度过高', detail=None,
                            level=FAULT_LEVEL.CRITICAL)
                self.send_fault_to_xf()

            if smokeWarning:
                desc = u'当前主机出现冒烟,请第一时间确定是否有起火。此告警十万火急,请迅速联系物业、消防相关部门!'
                self.record(faultCode=FAULT_CODE.SMOKE, description=desc, title=u'主机出现冒烟', detail=None,
                            level=FAULT_LEVEL.FATAL)
                self.send_fault_to_xf()
        except Exception as e:
            logger.error('some error=%s' % e)
            return

    def send_fault_to_xf(self):
        """Queue a fire-department notification task for this fault (best effort).

        The fault code is derived from ``cmdCode``: '41' -> smoke ('02'),
        '35' -> temperature ('01'); other codes are ignored. Failures are
        logged and never propagated.
        """
        try:
            code = self.event_data.get('cmdCode')
            # Wuxi Liangxi district / Yuewantong fire-department integration push.
            if code == '41':
                faultCode = '02'
                faultContent = u'烟雾报警'
            elif code == '35':
                faultCode = '01'
                faultContent = u'温度报警'
            else:
                return

            from taskmanager.mediator import task_caller
            task_caller('send_to_xf_falut', dealerId=self.device.ownerId, devNo=self.device.devNo,
                        faultCode=faultCode, faultContent=faultContent)
        except Exception:
            # Still best-effort, but leave a trace instead of failing silently.
            logger.exception('send fault to xf failed, devNo=%s' % self.device.devNo)


class InteroperatorAlertEvent(FaultEvent):
    """Handle leakage / high-temperature alerts coming from the transformers."""

    def __Analyze_alert_data(self, data):
        """Decode an alert payload into a dict of alert details.

        :param data: alert payload; reads ``cmd``, ``status`` and ``values``.
        :return: dict with ``cmdCode``, ``logicalCode`` and, when present,
            ``electricity`` (leakage) and/or ``temperature`` (list of
            high-temperature alerts); None when ``status`` is missing.
        """
        alertInfo = {'cmdCode': data['cmd'], 'logicalCode': self.device['logicalCode']}
        address = Group.get_group(self.device['groupId'])['address']

        # Validate the payload format.
        if 'status' not in data:
            logger.error('Data arrays have no keywords status')
            return

        # Leakage alert: the first 4 hex digits of ``values`` are the leakage current.
        if '5' in data['status']:
            electricityNum = str(int(data['values'][0:4], 16)) + 'mA'
            alertInfo['electricity'] = {'electricityNum': electricityNum, 'address': address, 'reasonCode': '12',
                                        'reason': u'在{}编号为{}发生漏电,漏电量为{}'
                                            .format(address, self.device['logicalCode'], electricityNum)}

        # High-temperature alert: every status position equal to '6' owns a
        # 4-hex-digit slot in ``values`` at the same (1-based) position.
        if '6' in data['status']:
            temperatureAccess = [index for index, acces in enumerate(data['status'], 1) if acces == '6']
            temperatureAlertList = []
            for i in temperatureAccess:
                temperatureValue = str(int(data['values'][(i - 1) * 4:(i - 1) * 4 + 4], 16))
                temperatureAlertList.append(
                    {'temperatureValue': temperatureValue, 'address': address, 'reasonCode': '11',
                     'reason': u'在{}编号为{}的设备有高温预警,当前温度为{}摄氏度'
                         .format(address, self.device['logicalCode'], temperatureValue)})
            alertInfo['temperature'] = temperatureAlertList

        return alertInfo

    def _notify_dealer_of_fault(self, reason):
        """Send the 'device_fault' notification with ``reason`` to the dealer."""
        group = Group.get_group(self.device['groupId'])
        self.notify_dealer('device_fault', **{
            'title': u'注意!注意!您的设备发生故障',
            'device': u'组号::%s, 二维码编号:%s' % (self.device['groupNumber'], self.device['logicalCode']),
            'location': u'组名称:%s, 地址:%s' % (group['groupName'], group['address']),
            'fault': reason,
            'notifyTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })

    def _push_fault_to_liangxi(self, faultCode, faultContent):
        """Attach district/address info to the device and push the fault to LiangXiXiaoFang."""
        group = Group.get_group(self.device['groupId'])
        districtInfo = District.get_district(group["districtId"])
        self.device.update({"districtInfo": districtInfo, "groupAddr": group["address"]})
        LiangXiXiaoFang.send_to_xf_fault(self.device, faultCode, faultContent)

    def do(self, **args):
        """Forward the alert to the fire platforms, notify the dealer and record it.

        High-temperature alerts take precedence over leakage alerts. Nothing
        is recorded when the device has no owner, lacks either sensor part,
        the payload cannot be decoded, or forwarding fails.
        """
        # Ignore reports from devices that have no dealer.
        if not self.device.ownerId:
            logger.error('This device cant find a dealer')
            return

        # Both the temperature sensor and the current sensor must be registered.
        temperaturePart = Part.objects(logicalCode=self.device['logicalCode'], partType='3001')
        electricityPart = Part.objects(logicalCode=self.device['logicalCode'], partType='3002')
        if not temperaturePart.count() or not electricityPart.count():
            logger.error(
                'There are no transformers in the locigalcode {} equipment'.format(self.device['logicalCode']))
            return

        eventInfo = self.__Analyze_alert_data(self.event_data['data'])
        if eventInfo is None:
            # Malformed payload; the decoder has already logged the reason.
            return

        try:
            # High temperature first.
            if 'temperature' in eventInfo:
                for InfoDetail in eventInfo['temperature']:
                    send_event_to_zhejiang(self.dealer, self.device, InfoDetail, partId=temperaturePart[0].id)
                    # Tell the dealer.
                    self._notify_dealer_of_fault(InfoDetail['reason'])

                # Report the high temperature to the fire department
                # (a per-dealer restriction was disabled in the original).
                self._push_fault_to_liangxi("01", u"设备温度过高")

            # Leakage.
            elif 'electricity' in eventInfo:
                send_event_to_zhejiang(self.dealer, self.device, eventInfo['electricity'],
                                       partId=electricityPart[0].id)
                # Tell the dealer.
                self._notify_dealer_of_fault(eventInfo['electricity']['reason'])

                # Report the leakage to the fire department
                # (a per-dealer restriction was disabled in the original).
                self._push_fault_to_liangxi("04", u"设备发生漏电")
        except Exception:
            # ``except Exception`` rather than a bare except, so interpreter
            # exit signals are not swallowed; keep the traceback for diagnosis.
            logger.exception('Array {} nonspecification'.format(eventInfo))
            return

        self.record(detail=eventInfo)


class InteroperatorReport(WorkEvent):
    """Persist the periodic per-port report sent by the transformers."""

    def do(self, **args):
        """Decode a 'report' message and store it as a ``PortReport``.

        For each of the 10 ports the power reading is converted to a current
        value; when an earlier report exists, a zero/empty power reading
        reuses that report's value, and gaps between the two reports are
        filled with copies of the earlier one. A ``PortReport`` is saved even
        when decoding fails part-way.
        """
        if 'type' not in self.event_data:
            logger.error('Array {} is not format,lose a key named "type"'.format(self.event_data))

        if self.event_data.get('type') == 'report':
            devReportDict = {'logicalCode': 'logicalCode', 'time': self.event_data['time_stamp'], 'portInfo': {}}
            temperature = ''
            voltage = 220
            try:
                # The count tells whether this is the first report for the device.
                reportNum = PortReport.get_collection().find({
                    'logicalCode': self.device['logicalCode']
                }).sort('time', -1).count()

                if reportNum:
                    # Fetch the most recent stored report.
                    reportLast = PortReport.get_collection().find({
                        'logicalCode': self.device['logicalCode']
                    }).sort('time', -1)[0]

                    for ii in range(10):
                        power = self.__saveDate(1, msgDict=self.event_data, ii=ii)
                        # NOTE(review): a reading of 0 is treated as "missing"
                        # and falls back to the previous report -- confirm.
                        if power:
                            electricity = float(power) / voltage / 10
                        else:
                            electricity = reportLast['portInfo'][str(ii + 1)]['electricity']
                        temperatureR = self.__saveDate(2, msgDict=self.event_data, ii=ii, electricity=electricity,
                                                       devReportDict=devReportDict)
                        if temperatureR:
                            temperature = temperatureR

                    # How far apart are this report and the previous one?
                    timeInterval = devReportDict['time'] - reportLast['time']
                    if timeInterval > 2:
                        PortReportNewList = [
                            {"logicalCode": self.device['logicalCode'], "temperature": reportLast['temperature'],
                             'portInfo': reportLast['portInfo'], 'time': reportLast['time'] + (v + 1) * 2}
                            for v in range(int(timeInterval / 2) - 1)]
                        # insert_many rejects an empty list, which happens when
                        # the gap is only slightly larger than 2.
                        if PortReportNewList:
                            PortReport.get_collection().insert_many(PortReportNewList)

                # First report for this device.
                else:
                    for ii in range(10):
                        power = self.__saveDate(1, msgDict=self.event_data, ii=ii)
                        electricity = float(power) / voltage / 10
                        temperatureR = self.__saveDate(2, msgDict=self.event_data, ii=ii, electricity=electricity,
                                                       devReportDict=devReportDict)
                        if temperatureR:
                            temperature = temperatureR

            except Exception as e:
                logger.error('solve dev=%s device report has an error e=%s' % (self.device.devNo, e))

            finally:
                # Save the local ``temperature`` rather than a dict entry that
                # only exists after successful decoding; otherwise a failure
                # above would raise KeyError here and mask the real error.
                newInfo = PortReport(
                    logicalCode=self.device['logicalCode'],
                    temperature=temperature,
                    time=devReportDict['time'],
                    portInfo=devReportDict['portInfo']
                )
                newInfo.save()

    def __saveDate(self, data, msgDict, ii, electricity=None, devReportDict=None):
        """Decode one port's slice of the report.

        :param data: 1 -> return the power reading of port ``ii``;
            2 -> store the port's current/status in ``devReportDict`` and
            return its temperature reading.
        :param msgDict: the report message; reads ``data.power_data`` /
            ``data.temp_data`` as 4-hex-digit slots.
        :param ii: zero-based port index.
        :param electricity: current value to store (mode 2 only).
        :param devReportDict: accumulator whose ``portInfo`` is updated (mode 2 only).
        :return: int power (mode 1); int temperature or '' when the port has
            no temperature slot or reports '0000' (mode 2); None otherwise.
        """
        if data == 1:
            powerData = msgDict['data']['power_data'][0 + 4 * ii:4 + 4 * ii]
            power = int(powerData, 16)
            return power

        if data == 2:
            temperature = ''
            status = 'idle' if electricity == 0 else 'busy'
            devReportDict['portInfo'].update(
                {str(ii + 1): {'electricity': round(electricity, 3), 'status': status}})
            # Only the first four slots are read as temperature values.
            if ii < 4 and msgDict['data']['temp_data'][0 + 4 * ii:4 + 4 * ii] != '0000':
                temperatureNum = msgDict['data']['temp_data'][0 + 4 * ii:4 + 4 * ii]
                temperature = int(temperatureNum, 16)
            return temperature


class AckEventPreProcessor(StartAckEventPreProcessor):
    """Translate order finish reason codes into user-facing text."""

    def analysis_reason(self, reason, fault_code=None):
        """Return the description for ``reason``; unknown codes get a generic text."""
        FINISHED_CHARGE_REASON_MAP = {
            0: u'购买的充电时间或电量用完了。',
            1: u'插头被拔或者松动,或者电瓶已经充满(系统判断为异常断电,电瓶车充电器种类繁多,可能存在误差)',
            2: u'电池已经充满',
            3: u'设备或者端口故障。建议您根据已经充电的时间评估是否需要到现场换到其他端口充电',
            4: u'警告!您的电池功率超过本机最大限制。为了公共安全,不建议您在该充电桩充电',
            5: u'刷卡退费结束',
            6: u'可能是插头被拔掉或者未连接充电器。如果不是自己操作,建议您到现场检查是否有人误操作',
            # 7 (remote stop) is intentionally left unmapped, as in the original.

            # Stop events defined by the server.
            0xC0: u'计费方式无效',
            0xC1: u'订购套餐已用完',
            0xC2: u'订购时间已用完',
            0xC3: u'订购电量已用完',
            0xC4: u'动态计算功率后, 时间已用完',
            0xC5: u'订单异常,设备可能离线超过1小时, 平台结单',
            0xC6: u'系统检测到充电已结束, 平台结单(0X24)',
            0xC7: u'系统检测到充电已结束, 平台结单(0X34)',
            0xC8: u'用户远程停止订单',
            0xC9: u'经销商远程停止订单',
            0xCA: u'系统检测到订单已结束, 平台结单(0xCA)',
            0xCB: u'充电时长已达到最大限制(0xCB)',
            0xCC: u'充电电量已达到最大限制(0xCC)',
            0xCD: u'设备升级中... 关闭当前订单',
        }
        return FINISHED_CHARGE_REASON_MAP.get(reason, '充电结束')


class MyComNetPayAckEvent(PolicyComNetPayAckEvent):
    """Network-pay start acknowledgement using this module's reason texts."""

    def __init__(self, smartBox, event_data):
        # NOTE(review): super() is anchored at PolicyComNetPayAckEvent, so that
        # class's own __init__ is skipped -- confirm this is intentional.
        super(PolicyComNetPayAckEvent, self).__init__(smartBox, event_data, AckEventPreProcessor())


class CardStartAckEventPreProcessor(AckEventProcessorIntf):
    """Placeholder pre-processor; both hooks are no-ops."""

    def analysis_reason(self, reason, fault_code=None):
        pass

    def pre_processing(self, device, event_data):
        # type:(DeviceDict, dict)->dict
        pass


class OnlineCardStartAckEvent(PolicyOnlineCardStartAckEvent):
    """Online-card start acknowledgement using this module's reason texts."""

    def __init__(self, smartBox, event_data):
        # NOTE(review): super() is anchored at PolicyOnlineCardStartAckEvent, so
        # that class's own __init__ is skipped -- confirm this is intentional.
        super(PolicyOnlineCardStartAckEvent, self).__init__(smartBox, event_data, AckEventPreProcessor())