# -*- coding: utf-8 -*- # !/usr/bin/env python import datetime import logging import re from mongoengine import DoesNotExist from typing import TYPE_CHECKING from apps.web.constant import Const from apps.web.core.exceptions import ServiceException from apps.web.core.utils import delay_async_operation from apps.web.device.models import Group, Device, Battery from apps.web.eventer import EventBuilder from apps.web.eventer.base import WorkEvent from apps.web.user.models import ServiceProgress, ConsumeRecord, MyUser logger = logging.getLogger(__name__) if TYPE_CHECKING: from apps.web.core.adapter.ywt_huandiangui import ChargeCabinet class builder(EventBuilder): def __getEvent__(self, device_event): return HuanDianGui(self.deviceAdapter, device_event) class HuanDianGui(WorkEvent): def __init__(self, smartBox, event_data): super(HuanDianGui, self).__init__(smartBox, event_data) self.deviceAdapter = smartBox # type:ChargeCabinet # 登记设备 def register_dev(self): server, port, proto = self.device.network_address devObj = Device.objects.get(devNo=self.device['devNo']) # 获取sim卡,卡号 msgInfo = self.get_msg_info() devObj.iccid = msgInfo['iccid'] devObj.softVer = msgInfo['driverVersion'] devObj.driverCode = Const.DEVICE_TYPE_CODE_YWT_HUANDIANGUI # 驱动必须赋值 devObj.save() Device.invalid_device_cache(self.device['devNo']) Device.update_online_cache(self.device['devNo'], True, 32) # 设备没有返回信号量,所以就用32 def handle_heartbeat(self): data = self.event_data.get('data', '') logicalCode = data[14:30] temperature = int(data[30:32], 16) water_sensor = int(data[32:34], 16) smoke_sensor = int(data[34:36], 16) Device.update_dev_control_cache(self.device.devNo, { 'temperature': temperature, 'water_sensor': water_sensor, 'smoke_sensor': smoke_sensor, }) Device.update_online_cache(self.device['devNo'], True, 32) # 设备没有返回信号量,所以就用32 # 刷新端口详细信息 def update_order_info(self): pass def do(self, **args): data = self.event_data['data'] funCode = data[12:14] if funCode == '80': # 注册设备 logger.info('heartbaet...') self.handle_heartbeat() elif funCode == '81': # 注册设备 self.register_dev() elif funCode == '85': logger.info('door status change') self.hand_door_change() def get_msg_info(self): pass data = self.event_data.get('data') result = {} # devNo = data[14:30] driverVersion = data[30:34] iccid = data[34:74] result['iccid'] = self.fromHex(iccid) result['driverVersion'] = driverVersion return result def fromHex(self, send_str): return reduce(lambda a, b: a + b, map(lambda _: chr(int(_, 16)), re.findall('..', send_str))) def hand_door_change(self): data = self.event_data.get('data') addr = int(data[10:12], 16) # 实际上是addr state_of_lock = int(data[16:18], 16) == 1 and 'lock' or 'open' self.event_data['addr'] = format(addr, 'd') self.event_data['state_of_lock'] = state_of_lock battery_info = {} try: # 检查电池是否已经放入了 battery_info = self.deviceAdapter._battery_info(addr) except: pass self.event_data['battery_id'] = battery_info.get("battery_id") if state_of_lock == 'open': # 开 门 self.hand_opened() elif state_of_lock == 'lock': # 关门 self.hand_closed() def hand_closed(self): """ 处理柜门被关闭的事件 完整的换电记录也会有两次关门事件上报 第一次上报的关门 是客户放入电池的关门 此次事件相对重要 需要反复校验电池是否存在 第二次上报的关门 是作为服务结束的标志 从事件中标志第一次还是第二次关门的有两种条件 第一个是记录是否完整 第二个是是否有电池 :return: """ devCache = Device.get_dev_control_cache(self.device.devNo) currentUseInfo = devCache.get("currentUseInfo", dict()) addr = self.event_data.get("addr") battery_id = self.event_data.get("battery_id") # 开启放入电池的电池编号 try: self.deviceAdapter._start_charging(addr) except Exception as e: pass # 首先检验 currentUseInfo 中的订单信息,如果没有订单信息 则是无效信息 这个地方直接将orderNo弹出 避免再次检测到有效事件 orderNo = currentUseInfo.get("orderNo", "") if not orderNo: return try: ConsumeRecord.objects.get(orderNo=orderNo) except DoesNotExist: logger.error("orderNo does not exists, orderNo is <{}>".format(orderNo)) # 没有订单的关门事件 不做任何处理 保留一切信息 return # 首先检验当前使用信息中是否有 chargeIndex2 如果说已经存在chargeIndex2的情况下 那就一定是第二次上报的关门 chargeIndex1 = currentUseInfo.get("chargeIndex1") chargeIndex2 = currentUseInfo.get("chargeIndex2") # 不应该出现的情况 if not chargeIndex1 and not chargeIndex2: # 将下一个用户的扫码开启端口设置为None 下次扫码的时候再次获取 currentUseInfo.pop("orderNo", None) Device.clear_port_control_cache(self.device.devNo, "currentUseInfo") Device.update_dev_control_cache(self.device.devNo, {"currentUseInfo": currentUseInfo}) logger.error("currentUseInfo chargeIndex info missing, curInfo is {}".format(currentUseInfo)) return # 第一次的关门事件 # 2020-9-27 安琦新增需求 用户放入电池不是上一次取走的电池 直接给经销商告警 elif chargeIndex1 and not chargeIndex2: # 上报的信息不是本订单的信息,直接忽略 if addr != chargeIndex1: self.delay_clear_current_info() return logger.info("<{}> order first closed".format(orderNo)) # 再次获取之后还是没有找到电池的SN说明电池没有被放入, 这个地方不应该在进行下一步的操作,直接将此次的行为关闭掉 if not battery_id: consumeDict = { "chargeIndex2": "-1", "oldBatteryImei": "-1", "newBatteryImei": "-1", "chargeIndex1": chargeIndex1 } ServiceProgress.update_progress_and_consume_rcd( self.device["ownerId"], { "open_id": currentUseInfo.get("openId"), "device_imei": self.device["devNo"], "isFinished": False }, consumeDict ) # 通知用户服务结束 不清除使用信息 需要经销商介入 consumeDict.update({"openId": currentUseInfo.get("openId")}) self.notify_to_user(consumeDict) currentUseInfo.pop("orderNo", None) Device.clear_port_control_cache(self.device.devNo, "currentUseInfo") Device.update_dev_control_cache(self.device.devNo, {"currentUseInfo": currentUseInfo}) # 由于用户没有放入电池 这个地方下一个开启的, nextPort不变 延时后清除正在使用的信息 self.delay_clear_current_info() return # 获取到了相应的电池的sn,继续下一步服务 进行柜门的开启, 开启的柜门在用户开启空门的时候已经确定下来了,在currentUseInfo字段里 else: # 获取上一次的消费记录里面的电池编号 openId = currentUseInfo.get("openId", "") last_battery_id = Battery.get_user_last_battery_sn(openId, self.device.ownerId) if last_battery_id and last_battery_id != battery_id: self.warning_to_dealer_diff_battery(battery_id, last_battery_id, openId) battery = Battery.get_one(self.device.ownerId, battery_id) if battery and battery.disable: consumeDict = { "chargeIndex2": "-1", "oldBatteryImei": battery_id, "newBatteryImei": "-1", "chargeIndex1": chargeIndex1 } ServiceProgress.update_progress_and_consume_rcd( self.device["ownerId"], { "open_id": currentUseInfo.get("openId"), "device_imei": self.device["devNo"], "isFinished": False }, consumeDict ) # 通知用户服务结束 不清除使用信息 需要经销商介入 consumeDict.update({"openId": currentUseInfo.get("openId")}) self.notify_to_user(consumeDict) currentUseInfo.pop("orderNo", None) Device.clear_port_control_cache(self.device.devNo, "currentUseInfo") Device.update_dev_control_cache(self.device.devNo, {"currentUseInfo": currentUseInfo}) # 将下一个空仓 置空 self.update_next_port(None) self.delay_clear_current_info() # TODO 告知经销商 警告电池已经被锁定 logger.info("disable battery <{}> has been locked!".format(battery.batterySn)) self.warning_to_dealer_disable_battery(battery_id, openId) return chargeIndex2 = currentUseInfo.pop("toOpenChargeIndex") battery_id2 = currentUseInfo.pop("toOpenBatteryId") try: self.deviceAdapter._open_door(chargeIndex2) except ServiceException: # 有可能出现的是模块和设备交互失败导致开门失败 或者网络通信不好,这个时候直接通知用户联系经销商处理 consumeDict = { "chargeIndex2": "-1", "oldBatteryImei": battery_id, "newBatteryImei": "-1", "chargeIndex1": chargeIndex1 } ServiceProgress.update_progress_and_consume_rcd( self.device["ownerId"], { "open_id": currentUseInfo.get("openId"), "device_imei": self.device["devNo"], "isFinished": False }, consumeDict ) # 通知用户服务结束 但不清除用户的使用信息 此时一定需要经销商介入处理 consumeDict.update({"openId": currentUseInfo.get("openId")}) self.notify_to_user(consumeDict) currentUseInfo.pop("orderNo", None) Device.clear_port_control_cache(self.device.devNo, "currentUseInfo") Device.update_dev_control_cache(self.device.devNo, {"currentUseInfo": currentUseInfo}) self.delay_clear_current_info() # 将下一个空仓 置空 self.update_next_port(None) return currentUseInfo.update({ "chargeIndex2": chargeIndex2, "oldBatteryImei": battery_id, "newBatteryImei": battery_id2 }) Device.update_dev_control_cache(self.device.devNo, {"currentUseInfo": currentUseInfo}) self.update_next_port(chargeIndex2) # 这个地方更新电池的信息 logger.error("the first battery id is {}".format(battery_id)) battery = Battery.get_one(self.device.ownerId, battery_id) if battery: battery.update_dev_info(self.device.devNo, addr) return # 不应该出现的情况 elif not chargeIndex1 and chargeIndex2: currentUseInfo.pop("orderNo", None) Device.clear_port_control_cache(self.device.devNo, "currentUseInfo") Device.update_dev_control_cache(self.device.devNo, {"currentUseInfo": currentUseInfo}) logger.error("chargeIndex1 missing but chargeIndex2 exists, curInfo is {}".format(currentUseInfo)) return # 第二次的关门事件 else: # 上报的信息不是本订单的信息,直接忽略 if addr != chargeIndex2: return # 第二次关门的时候也需要检测里面是否有电池信息,是为了nextPort 防止nextPort开启有电池的门, 但是不再需要反复检测 consumeDict = { "chargeIndex2": currentUseInfo.get("chargeIndex2"), "oldBatteryImei": currentUseInfo.get("oldBatteryImei"), "newBatteryImei": currentUseInfo.get("newBatteryImei"), "chargeIndex1": currentUseInfo.get("chargeIndex1") } ServiceProgress.update_progress_and_consume_rcd( self.device["ownerId"], { "open_id": currentUseInfo.get("openId"), "device_imei": self.device["devNo"], "isFinished": False }, consumeDict ) consumeDict.update({"openId": currentUseInfo.get("openId")}) self.notify_to_user(consumeDict) # 如果用户有放入电池 这个地方就是正常的结束 清空使用信息,下一个用户继续使用 否则不清空,等待经销商介入 if not battery_id: Device.clear_port_control_cache(self.device.devNo, "currentUseInfo") # 将电池信息更新 battery = Battery.get_one(dealerId=self.device.ownerId, batterySn=currentUseInfo.get("newBatteryImei")) if battery: battery.update_user_info(currentUseInfo.get("openId")) else: # 保留一段时间的信息 currentUseInfo.pop("orderNo", None) Device.clear_port_control_cache(self.device.devNo, "currentUseInfo") Device.update_dev_control_cache(self.device.devNo, {"currentUseInfo": currentUseInfo}) self.delay_clear_current_info() # 电池还在柜子里面 battery = Battery.get_one(dealerId=self.device.ownerId, batterySn=battery_id) if battery: battery.update_dev_info(self.device.devNo, addr) self.update_next_port(addr) # 更新下一个开门的端口 logger.info("<{}> order second closed".format(orderNo)) def hand_opened(self): """ 通过非指令手段开门,锁控板才会上传开门指令 :return: """ portStr = self.event_data.get("portStr") batterySn = self.event_data.get("batteryImei", "") self.warning_to_dealer(portStr, batterySn) # 出现这种非法开门之后,直接更新一个无效的currentUseInfo 将设备锁死 # currentUseInfo = {"openId": "invalid user"} # if batterySn: # currentUseInfo.update({"chargeIndex2": portStr, "newBatteryImei": batterySn}) # else: # currentUseInfo.update({"chargeIndex1": portStr}) # Device.clear_port_control_cache(self._devNo, "currentUseInfo") # Device.update_dev_control_cache(self._devNo, {"currentUseInfo": currentUseInfo}) return def delay_clear_current_info(self): """ 延时清除客户的使用信息,一般发生在换电错误的时候去处理 :return: """ seconds = int(self.device.get("otherConf", dict()).get("delayTime", 1)) * 60 delay_async_operation(HuanDianGui.delay_func, seconds, self.device.devNo) @staticmethod def delay_func(devNo): """ 延时函数 :param devNo: :return: """ Device.clear_port_control_cache(devNo, "currentUseInfo") def notify_to_user(self, currentUseInfo): """ 根据用户的换电行为提醒用户 :param currentUseInfo: :return: """ openId = currentUseInfo.get("openId") if not openId: return groupId = self.device.get("groupId") group = Group.get_group(groupId) user = MyUser.objects.filter(openId=openId, groupId=groupId).first() if not group or not user: return chargeIndex1 = currentUseInfo.get("chargeIndex1") chargeIndex2 = currentUseInfo.get("chargeIndex2") batterySn1 = currentUseInfo.get("oldBatteryImei") batterySn2 = currentUseInfo.get("newBatteryImei") numChecker = lambda x: False if x == "-1" else True # 整个流程走完了 if numChecker(chargeIndex1) and numChecker(chargeIndex2) and numChecker(batterySn1) and numChecker(batterySn2): title = u"\\n\\n设备编号:\\t\\t{logicalCode}\\n\\n服务地址:\\t\\t{group}\\n\\n放入:\\t\\t{p1}端口-{oldB}电池\\n\\n取出:\\t\\t{p2}端口-{newB}电池".format( logicalCode=self.device.get("logicalCode", ""), group=group.get("groupName", ""), p1=chargeIndex1, oldB=batterySn1, p2=chargeIndex2, newB=batterySn2, ) # 放入了电池但是开启有电池门的时候故障了 elif numChecker(chargeIndex1) and not numChecker(chargeIndex2) and numChecker(batterySn1) and not numChecker( batterySn2): title = u"\\n\\n设备编号:\\t\\t{logicalCode}\\n\\n服务地址:\\t\\t{group}\\n\\n放入:\\t\\t{p1}端口-{oldB}电池".format( logicalCode=self.device.get("logicalCode", ""), group=group.get("groupName", ""), p1=chargeIndex1, oldB=batterySn1, ) # 空柜门没有放入电池 elif numChecker(chargeIndex1) and not numChecker(chargeIndex2) and not numChecker( batterySn1) and not numChecker(batterySn2): title = u"\\n\\n设备编号:\\t\\t{logicalCode}\\n\\n服务地址:\\t\\t{group}\\n\\n放入:\\t\\t{p1}端口-未放入电池".format( logicalCode=self.device.get("logicalCode", ""), group=group.get("groupName", ""), p1=chargeIndex1, ) # 非法的情况 else: logger.error("invalid notify to user, curInfo is <{}>".format(currentUseInfo)) return self.notify_user( managerialOpenId=user.managerialOpenId if user else "", templateName="service_complete", title=title, service=u"换电柜服务", remark=u'谢谢您的支持', finishTime=datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d %H:%M:%S") ) def update_next_port(self, nextPort): """ 更新下一个开启的端口 :param nextPort: :return: """ otherConf = self.device.get("otherConf", dict()) otherConf.update({"nextPort": nextPort}) Device.objects.filter(devNo=self._devNo).update(otherConf=otherConf) Device.invalid_device_cache(self._devNo) def warning_to_dealer(self, portStr, batterySn): """ 向经销商告警 有非法柜门打开事件 :return: """ group = Group.get_group(self.device.get("groupId", "")) if batterySn: fault = u"{} 号柜门 ,内有电池, 电池编号为:{}".format(portStr, batterySn) else: fault = u"{} 号柜门, 内无电池".format(portStr) self.notify_dealer( templateName = "device_fault", title = "您的设备柜门被手动打开,如非设备人员正常检修,建议您去现场看看", device = u"{groupNumber}组-{logicalCode}".format(groupNumber=self.device["groupNumber"], logicalCode=self.device["logicalCode"]), location = u"{address}-{groupName}".format(address = group["address"], groupName = group["groupName"]), fault = fault, notifyTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") ) def warning_to_dealer_diff_battery(self, batterySn, lastBatterySn, openId): """ 经销告警 新旧电池不一样 :param batterySn: :param lastBatterySn: :param openId: :return: """ user = MyUser.objects.filter(openId=openId, groupId=self.device.groupId).first() activeInfo = MyUser.get_active_info(openId, agentId=user.agentId) phoneNumber = activeInfo.get("phoneNumber") group = Group.get_group(self.device.get("groupId", "")) fault = "当前放入电池编号 {}, 上次取出电池 {}, 用户 {} 联系方式 {}".format(batterySn, lastBatterySn, user.nickname, phoneNumber) self.notify_dealer( templateName="device_fault", title="用户电池编号出现异常", device=u"{groupNumber}组-{logicalCode}".format(groupNumber=self.device["groupNumber"], logicalCode=self.device["logicalCode"]), location=u"{address}-{groupName}".format(address=group["address"], groupName=group["groupName"]), fault=fault, notifyTime=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") ) def warning_to_dealer_disable_battery(self, batterySn, openId): """ 通知经销商 被锁定的电池已经入柜子 请及时的解锁 :param batterySn: :param openId: :return: """ user = MyUser.objects.filter(openId=openId, groupId=self.device.groupId).first() activeInfo = MyUser.get_active_info(openId, agentId=user.agentId) phoneNumber = activeInfo.get("phoneNumber") group = Group.get_group(self.device.get("groupId", "")) fault = "当前放入电池编号 {}, 用户 {} 联系方式 {}".format(batterySn, user.nickname, phoneNumber) self.notify_dealer( templateName="device_fault", title="禁用电池已被锁定", device=u"{groupNumber}组-{logicalCode}".format(groupNumber=self.device["groupNumber"], logicalCode=self.device["logicalCode"]), location=u"{address}-{groupName}".format(address=group["address"], groupName=group["groupName"]), fault=fault, notifyTime=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") ) def update_next_port(self, nextPort): """ 更新下一个开启的端口 :param nextPort: :return: """ otherConf = self.device.get("otherConf", dict()) otherConf.update({"nextPort": nextPort}) Device.objects.filter(devNo=self.device.devNo).update(otherConf=otherConf) Device.invalid_device_cache(self.device.devNo)