# -*- coding: utf-8 -*- # !/usr/bin/env python import json from typing import TYPE_CHECKING from apps.web.constant import Const, DeviceCmdCode, MQTT_TIMEOUT from apps.web.core.adapter.base import SmartBox from apps.web.core.device_define.cxjz import SetResponse, CXJZDeviceSettingsValidator, ChargeMode, DEFAULT_REFUND_PROTECTION_TIME, DEFAULT_SERVICE_FEE, DEFAULT_REFUND_CAL_BASE from apps.web.core.exceptions import ServiceException from apps.web.core.networking import MessageSender from apps.web.device.models import Device if TYPE_CHECKING: from apps.web.user.models import ConsumeRecord class ChargingCXJZBox(SmartBox): @staticmethod def _convert_consume_type(_type): if _type == "AF": return "fault" elif _type == "01": return "card" elif _type == "02": return "coin" elif _type == "03": return "test" else: return "mobile" @staticmethod def _parse_port_status(data): """ 0A 00 0000 0000 0000 """ port, consumeType, leftTime, leftElec, power = int(data[: 2], 16), data[2: 4], int(data[4: 8], 16), int(data[8: 12], 16), int(data[12: 16], 16) if not leftElec and not leftTime: return {"port": port, "status": Const.DEV_WORK_STATUS_IDLE } else: return {"port": port,"status": Const.DEV_WORK_STATUS_WORKING, "consumeType": ChargingCXJZBox._convert_consume_type(consumeType), "leftTime": leftTime, "leftElec": leftElec, "power": power} def _send_data(self, funCode, data, cmd=DeviceCmdCode.OPERATE_DEV_SYNC, timeout=MQTT_TIMEOUT.NORMAL): result = MessageSender.send( device = self.device, cmd = cmd, payload = { "IMEI": self._device["devNo"], "funCode": funCode, "data": data }, timeout = timeout ) if "rst" in result and result.get("rst") != 0: if result.get("rst") == -1: raise ServiceException({"result": 2, "description": u"设备网络故障,请重新"}) if result.get("rst") == 1: raise ServiceException({"result": 2, "description": u"充电桩无响应,请稍后再试试"}) return result def _get_A0_data(self, data=None): """ 查询整机当前的工作状态 """ if not data: data = self._send_data("A0", "01FFFF")["data"] result = list() for _index in range(0, len(data[10: -6]), 16): content = data[10: -6][_index: _index+16] result.append(self._parse_port_status(content)) return result def _get_A1_data(self, port, data=None): """ 查询单个端口的运行状态 """ if not data: data = self._send_data("A1", "{:02X}FFFF".format(port))["data"] return self._parse_port_status(data[10: -6]) def _get_A2_data(self, data=None): """ 查询计币信息 """ if not data: data = self._send_data("A2", "01FFFF")["data"] return { "cardCode": data[10: -6][4: 8], "cardCount": int(data[10: -6][8: 12], 16), "coinCount": int(data[10: -6][12: 16], 16), } def _get_A3_data(self, data=None): """ 查询工作模式 """ if not data: data = self._send_data("A3", "01FFFF")["data"] return { "powerSwitch": bool(int(data[10: -6][0: 2], 16)), "chargeMode": int(data[10: -6][2: 4], 16), "maxPower": int(data[10: -6][4: 8], 16), "stopSwitch": bool(int(data[10: -6][8: 10], 16)), "coinUpperLimit": int(data[10: -6][10: 14], 16), "coinSwitch": bool(int(data[10: -6][14: 16], 16)) } def _get_A4_data(self, data=None): """ 查询时间模式 """ if not data: data = self._send_data("A4", "01FFFF")["data"] return { "fullStopTime": int(data[10: -4][0: 4], 16), "cardCoinElec": int(data[10: -4][4: 8], 16) / 10.0, "time1": int(data[10: -4][8: 12], 16), "coinTime": int(data[10: -4][12: 16], 16), "coinMax": int(data[10: -4][16: 18], 16) } def _get_A8_data(self, data=None): if not data: data = self._send_data("A8", "01FFFF")["data"] return { "power1": int(data[4: 8], 16), "time2": int(data[8: 12], 16), "power2": int(data[12: 16], 16), "time3": int(data[16: 20], 16), "power3": int(data[20: 24], 16), "time4": int(data[24: 28], 16) } def _get_A9_data(self, data=None): """ 查询空载设置 """ if not data: data = self._send_data("A9", "01FFFF")["data"] return { "reportTime": int(data[10: -6][0: 2], 16), "reCheckTime": int(data[10: -6][2: 4], 16), "noLoadTime": int(data[10: -6][4: 8], 16), "fullPower": int(data[10: -6][8: 12], 16), "noLoadPower": int(data[10: -6][12: 16], 16) } def _get_AE_data(self, data=None): """ 获取温度烟感设置 """ if not data: data = self._send_data("AE", "01FFFF")["data"] return { "temperature": int(data[10: -6][4: 8], 16), "smoke": int(data[10: -6][8: 12], 16), "warning": bool(int(data[10: -6][12: 14], 16)), "smokeSwitch": bool(int(data[10: -6][14: 16], 16)) } def _get_server_config(self): """ 读取服务器的参数配置 """ otherConf = self.device.get("otherConf") return { "refundProtectionTime": otherConf.get("refundProtectionTime", DEFAULT_REFUND_PROTECTION_TIME), "serviceFee": otherConf.get("serviceFee", DEFAULT_SERVICE_FEE), "refundCalBase": otherConf.get("refundCalBase", DEFAULT_REFUND_CAL_BASE) } def _set_AF(self): self._send_data("AF", "01FFFF", cmd=220) return True def _set_B1(self, devCode=None): """ 设置设备编号 不建议使用 桩编号默认000000 """ if not devCode: devCode = self.device.logicalCode.replace("G", "") self._send_data("B1", devCode, cmd=DeviceCmdCode.OPERATE_DEV_NO_RESPONSE) return True def _set_B0(self): """ 清除计币信息 """ data = self._send_data("B0", "01FFFF")["data"] return data[2: 4] == SetResponse.SUCCESS def _set_B2(self, cardCode): """ 设置设备区号 即刷卡编号 """ data = self._send_data("B2", "01{}".format(cardCode))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_B3(self, chargeMode): data = self._send_data("B3", "01{:04X}".format(chargeMode))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_B4(self, maxPower): data = self._send_data("B4", "01{:04X}".format(maxPower))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_B5(self, stopSwitch): data = self._send_data("B5", "0100{:02X}".format(stopSwitch))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_B6(self, coinSwitch): data = self._send_data("B6", "0100{:02X}".format(coinSwitch))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_B7(self, coinUpperLimit): data = self._send_data("B7", "01{:04X}".format(coinUpperLimit))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_B8(self, stopTime): data = self._send_data("B8", "01{:04X}".format(stopTime))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_B9(self, fullPower): data = self._send_data("B9", "01{:04X}".format(fullPower))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_BA(self, noLoadPower): data = self._send_data("BA", "01{:04X}".format(noLoadPower))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_C1(self, cardTime): data = self._send_data("C1", "01{:04X}".format(cardTime))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_C2(self, cardCoinElec): data = self._send_data("C2", "01{:04X}".format(cardCoinElec))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_C3(self, coinTime): data = self._send_data("C3", "01{:04X}".format(coinTime))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_C4(self, coinMax): data = self._send_data("C4", "01{:04X}".format(coinMax))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_C5(self, reportTime): data = self._send_data("C5", "01{:04X}".format(reportTime))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_C6(self, smokeSwitch): data = self._send_data("C6", "0100{:02X}".format(smokeSwitch))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_C7(self, reCheckTime): data = self._send_data("C7", "01{:04X}".format(reCheckTime))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_C9(self, powerSwitch): data = self._send_data("C9", "01{:04X}".format(powerSwitch))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_C8_A1(self, time1): data = self._send_data("C8", "01{:04X}A1".format(time1))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_C8_A2(self, time2): data = self._send_data("C8", "01{:04X}A2".format(time2))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_C8_A3(self, time3): data = self._send_data("C8", "01{:04X}A3".format(time3))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_C8_A4(self, time4): data = self._send_data("C8", "01{:04X}A4".format(time4))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_C8_C1(self, power1): data = self._send_data("C8", "01{:04X}C1".format(power1))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_C8_C2(self, power2): data = self._send_data("C8", "01{:04X}C2".format(power2))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_C8_C3(self, power3): data = self._send_data("C8", "01{:04X}C3".format(power3))["data"] return data[2: 4] == SetResponse.SUCCESS def _set_D0(self): data = self._send_data("D0", "01FFFF")["data"] return data[2: 4] == SetResponse.SUCCESS def _set_D1(self): data = self._send_data("D1", "01FFFF")["data"] return data[2: 4] == SetResponse.SUCCESS def _time_start(self, port, needTime): return self._send_data("CA", "{:02X}{:04X}0000".format(port, needTime), cmd=DeviceCmdCode.OPERATE_DEV_NO_RESPONSE) def _elec_start(self, port, needElec): return self._send_data("CA", "{:02X}0000{:04X}".format(port, needElec), cmd=DeviceCmdCode.OPERATE_DEV_NO_RESPONSE) def _power_start(self, port, coins): return self._send_data("CC", "{:02X}{:04X}0000".format(port, coins), cmd=DeviceCmdCode.OPERATE_DEV_NO_RESPONSE) def _stop(self, port): return self._send_data("FD", "{:02X}".format(port)) def _get_all_settings(self): """ 自定义指令 获取所有的参数设置 """ data = self._send_data("FA", "", timeout=5)["data"] content = data[10: -6] result = dict() result.update(self._get_A2_data(content[: 32])) result.update(self._get_A3_data(content[32: 64])) result.update(self._get_A4_data(content[64: 96])) result.update(self._get_A8_data(content[96: 128])) result.update(self._get_A9_data(content[128: 160])) result.update(self._get_AE_data(content[160: 192])) return result def _set_all_settings(self, **kwargs): """ 自定义指令 设置所有的参数设置 """ result = self._send_data("FB", "{:4}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}".format( kwargs["cardCode"], kwargs["chargeMode"], kwargs["maxPower"], kwargs["stopSwitch"], kwargs["coinSwitch"], kwargs["coinUpperLimit"], kwargs["noLoadTime"], kwargs["fullPower"], kwargs["noLoadPower"], kwargs["fullStopTime"], kwargs["time1"], kwargs["cardCoinElec"], kwargs["coinTime"], kwargs["coinMax"], kwargs["reportTime"], kwargs["smokeSwitch"], kwargs["reCheckTime"], kwargs["time1"], kwargs["time2"], kwargs["time3"], kwargs["time4"], kwargs["power1"], kwargs["power2"], kwargs["power3"], kwargs["powerSwitch"], )) return result @staticmethod def _parse_finished(data): return { "reason": data[2: 4], "port": data[10: 12], "consumeType": ChargingCXJZBox._convert_consume_type(data[12: 14]), "leftTime": data[14: 18], "leftElec": data[18: 22], "power": data[22: 26] } def _parse_fault(self, data): return self._get_AE_data(data) def _check_package(self, package): otherConf = self.device.get("otherConf") chargeMode = otherConf.get("chargeMode") if not chargeMode: chargeMode = self._get_A3_data()["chargeMode"] chargeElec, chargeCoins = 0, 0 # 首先是时间模式下 if chargeMode == ChargeMode.TIME: if package["unit"] != u"次": raise ServiceException({"result": 2, "description": u"启动失败,套餐单位错误(1001)"}) chargeCoins = int(package["coins"]) elif chargeMode == ChargeMode.ELEC: if package["unit"] != u"度": raise ServiceException({"result": 2, "description": u"启动失败,套餐单位错误(1002)"}) chargeElec = int(float(package["time"]) * 10) else: raise ServiceException({"result": 2, "description": u"启动失败,工作模式错误(2001)"}) return chargeMode, chargeElec, chargeCoins def analyze_event_data(self, data): funCode = data[2: 4] if funCode == "AE": result = self._parse_finished(data) else: result = self._parse_fault(data) result["cmdCode"] = funCode return result def start_device_realiable(self, order): # type:(ConsumeRecord)->dict chargeMode, chargeElec, chargeCoins = self._check_package(order.package) port = int(order.attachParas["chargeIndex"]) data = { "funCode": "FC", "order_id": order.orderNo, "port": port, "open_id": order.openId, "charge_mode": chargeMode, "coins": chargeCoins, "time": 0, # 该参数已经废弃 不再需要纯时间模式的启动 "elec": chargeElec } result = MessageSender.send( device = self.device, cmd = DeviceCmdCode.OPERATE_DEV_SYNC, payload = data, timeout = 120 ) return result def get_dev_setting(self): data = self._get_all_settings() data["coinTime"] = data["time1"] - data["coinTime"] serverData = self._get_server_config() data.update(serverData) return data def set_device_function(self, request, lastSetConf): if "cleanCoins" in lastSetConf: self._set_B0() if "testDevice" in lastSetConf: self._set_AF() if "lockDevice" in lastSetConf: self._set_D0() if "unLockDevice" in lastSetConf: self._set_D1() def set_device_function_param(self, request, lastSetConf): validator = CXJZDeviceSettingsValidator(request.POST) if not validator.is_valid(): raise ServiceException({"result": 2, "description": json.dumps(validator.str_errors).decode("unicode-escape")}) data = validator.suit_data() # 先保存一份数据库 otherConf = self.device.get("otherConf") otherConf.update(data) Device.objects.filter(devNo=self.device.devNo).update(otherConf=otherConf) Device.invalid_device_cache(self.device.devNo) self._set_all_settings(**data) def get_port_status_from_dev(self): result = self._get_A0_data() portDict = dict() # 做一次数据类型转换 for _item in result: _port = str(_item.pop("port")) portDict[_port] = _item allPorts, usedPorts, usePorts = self.get_port_static_info(portDict) Device.update_dev_control_cache(self.device.devNo, {'allPorts': allPorts, 'usedPorts': usedPorts, 'usePorts': usePorts}) devCache = Device.get_dev_control_cache(self.device.devNo) for _portStr, _info in portDict.items(): if _portStr in devCache: devCache[_portStr].update(_info) else: devCache[_portStr] = _info Device.update_dev_control_cache(self.device.devNo, devCache) return portDict def get_port_info(self, port): return self._get_A1_data(int(port)) def get_port_status(self, force = False): if force: self.get_port_status_from_dev() devCache = Device.get_dev_control_cache(self.device.devNo) portStatus = dict() for _k, _v in devCache.items(): if isinstance(_k, (str, unicode)) and _k.isdigit(): portStatus[_k] = {"status": _v.get("status", Const.DEV_WORK_STATUS_IDLE)} return portStatus def active_deactive_port(self, port, active): if not active: self._stop(port)