# -*- coding: utf-8 -*-
# !/usr/bin/env python
import datetime
import logging
import re
import time
from decimal import Decimal

from typing import TYPE_CHECKING

from apilib.monetary import RMB
from apps.web.constant import Const, DeviceCmdCode
from apps.web.core.adapter.base import SmartBox
from apps.web.core.exceptions import ServiceException
from apps.web.core.networking import MessageSender
from apps.web.device.models import Device

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    pass

# Reason code (hex byte at offset 24 of a "05" event frame) -> user-facing text.
# The texts are runtime strings shown to end users; do not edit them casually.
_STOP_REASONS = {
    '00': u'购买的充电时间或者电量已经用完',
    '01': u'系统判断为异常断电(插头被拔或者松动,或者电瓶已经充满),电瓶车充电器种类繁多,可能存在误差',
    '02': u'电池已经充满',
    '03': u'设备或者端口故障。建议您根据已经充电的时间评估是否需要到现场换到其他端口充电',
    '04': u'警告!您的电池功率超过本机最大限制。为了公共安全,不建议您在该充电桩充电',
    '05': u'刷卡退费结束',
    '06': u'可能是插头被拔掉或者未连接充电器。如果不是自己操作,建议您到现场检查是否有人误操作',
    '07': u'远程方式停止充电。如果不是自己操作,建议到现场尽快恢复充电',
    '08': u'远烟雾报警停止',
}


class CFJBox(SmartBox):
    """Adapter for the CFJ charging box: frame encoding/decoding, start and port status."""

    def __init__(self, device):
        super(CFJBox, self).__init__(device)

    def analyze_event_data(self, data):
        """Parse an event frame reported by the device.

        Only frames that start with ``EE`` and carry command ``05`` are decoded.
        Layout of such a frame (spaces added for readability)::

            EE 0C 05 000000000000 01 0000 00 08
                                  |  |    +-- reason code  [24:26]
                                  |  +------- left time    [20:24]
                                  +---------- port         [18:20]

        :param data: the raw frame as a hex string
        :return: dict with keys ``data``, ``cmd``, ``port``, ``leftTime`` and
                 ``reason``; ``None`` for empty, non-``EE`` or non-``05`` frames
        """
        if not data:
            return None

        data = str(data)
        if data[:2] != "EE":
            return None

        cmd = data[4:6]
        if cmd != "05":
            return None

        reason_code = data[24:26]
        return {
            "data": data,
            "cmd": cmd,
            "port": self.decode_str(data[18:20]),
            "leftTime": self.decode_str(data[20:24]),
            # Unknown codes map to an empty reason so the key is always present.
            "reason": _STOP_REASONS.get(reason_code, u''),
        }

    def _check_package(self, package):
        """Normalise a package according to the device's billing mode.

        The mode is read from ``otherConf.consumeModule`` (default ``1``).
        Mode ``1`` bills by time and converts the amount to minutes; any other
        mode bills by energy and requires the unit to be ``度``.

        :param package: dict with optional ``unit`` (default ``分钟``) and ``time``
        :return: tuple ``(amount, unit, billingType)`` where ``billingType`` is
                 ``"time"`` or ``"elec"``
        :raises ServiceException: when the unit is not valid for the billing mode
        """
        consumeModule = self.device.get("otherConf", dict()).get("consumeModule", 1)
        unit = package.get("unit", u"分钟")
        amount = float(package.get("time", 0))

        if consumeModule == 1:
            # Time billing: everything is converted to minutes.
            if unit == u"小时":
                amount = amount * 60
            elif unit == u"天":
                amount = amount * 24 * 60
            elif unit == u"秒":
                amount = amount / 60
            elif unit != u"分钟":
                raise ServiceException({"result": 2, "description": u"套餐单位错误,请联系经销商"})
            return amount, unit, "time"

        # Energy billing.
        if unit != u"度":
            raise ServiceException({"result": 2, "description": u"套餐单位错误,请联系经销商"})
        return amount, unit, "elec"

    def disable_app_device(self, switch=True):
        # type:(bool) -> None
        """Store the ``disableDevice`` flag in ``otherConf`` and invalidate the device cache."""
        otherConf = self.device.get("otherConf", {})
        otherConf["disableDevice"] = switch
        Device.objects.filter(devNo=self.device["devNo"]).update(otherConf=otherConf)
        Device.invalid_device_cache(self.device["devNo"])

    @staticmethod
    def encode_str(data, length=2, ratio=1.0, base=16):
        # type:(str,int,float,int) -> str
        """Scale a number and render it as a zero-padded hex or decimal string.

        :param data: value to encode (str, int, float or Decimal)
        :param length: minimum number of digits in the output
        :param ratio: multiplier applied before encoding
        :param base: ``16`` for upper-case hex, anything else for decimal
        :return: the encoded string; the scaled value is truncated toward zero
        """
        def _to_decimal(value):
            if isinstance(value, Decimal):
                return value
            if isinstance(value, float):
                # Decimal(0.7) is 0.6999999999999999555..., which would make
                # 0.7 * 10 truncate to 6. repr() yields the shortest decimal
                # text that round-trips, so the intended value is kept.
                return Decimal(repr(value))
            return Decimal(value)

        # int() makes the truncation explicit and keeps "%X" usable on
        # Python 3, where a Decimal operand is rejected.
        scaled = int(_to_decimal(data) * _to_decimal(ratio))
        conversion = "X" if base == 16 else "d"
        return ("%." + str(length) + conversion) % scaled

    @staticmethod
    def decode_str(data, ratio=1.0, base=16):
        # type:(str,float,int) -> str
        """Parse ``data`` as an integer in ``base``, scale it by ``ratio`` and format with ``%.10g``.

        :param ratio: unit conversion factor applied to the parsed integer
        """
        if not isinstance(data, str):
            data = str(data)
        return "%.10g" % (int(data, base) * ratio)

    @staticmethod
    def reverse_hex(data):
        # type:(str) -> str
        """Reverse the order of the two-character groups in ``data``.

        A trailing odd character is dropped because only full pairs are matched.

        :raises TypeError: when ``data`` is not a ``str``
        """
        if not isinstance(data, str):
            raise TypeError
        pairs = re.findall(r".{2}", data)
        return "".join(reversed(pairs))

    def decode_long_hex_to_list(self, data, split=2, ratio=1.0, base=16):
        # type:(str,int,float,int) -> list
        """Split ``data`` into ``split``-sized chunks and decode each with :meth:`decode_str`.

        :return: list of decoded strings
        :raises Exception: when ``len(data)`` is not a multiple of ``split``
        """
        if len(data) % split != 0:
            raise Exception("Invalid data")

        chunks = re.findall(r".{%s}" % split, data)
        # A real list (not a lazy ``map`` object) so callers can index it on Python 3 too.
        return [self.decode_str(chunk, ratio=ratio, base=base) for chunk in chunks]

    @staticmethod
    def check_params_range(params, minData=None, maxData=None, desc=""):
        # type:(str,float,float,str) -> str
        """Validate that ``params`` lies within the optional inclusive bounds.

        A bound is only ignored when it is ``None``; ``0`` is a real bound.

        :param params: value to check
        :param minData: inclusive lower bound, or ``None`` for no lower bound
        :param maxData: inclusive upper bound, or ``None`` for no upper bound
        :param desc: parameter name used as prefix in the error message
        :return: the value formatted with ``%g``
        :raises ServiceException: when ``params`` is ``None`` or out of range
        """
        if params is None:
            raise ServiceException({"result": 2, "description": u"参数错误."})

        def _as_decimal(value):
            return value if isinstance(value, Decimal) else Decimal(value)

        value = _as_decimal(params)
        lower = None if minData is None else _as_decimal(minData)
        upper = None if maxData is None else _as_decimal(maxData)

        if lower is None and upper is None:
            return "%g" % value

        if lower is None:
            if value <= upper:
                return "%g" % value
            raise ServiceException(
                {"result": 2, "description": u"%s超出可选范围,可选最大值为%g" % (desc, upper)})

        if upper is None:
            if lower <= value:
                return "%g" % value
            raise ServiceException(
                {"result": 2, "description": u"%s超出可选范围,可选最小值为%g" % (desc, lower)})

        if lower <= value <= upper:
            return "%g" % value
        raise ServiceException(
            {"result": 2, "description": u"%s参数超出可选范围,可取范围为%g-%g" % (desc, lower, upper)})

    def send_mqtt(self, funCode, data, cmd=DeviceCmdCode.OPERATE_DEV_SYNC):
        """Send a command to the device through ``MessageSender`` and return the payload.

        :param funCode: function code of the command
        :param data: hex payload of the command
        :param cmd: device command code (synchronous operate by default)
        :return: ``result["data"][18:-2]`` as ``str`` on success; ``None`` for the
                 no-response command codes, and also when ``rst`` is a non-zero
                 value other than ``-1`` / ``1``
        :raises ServiceException: when ``rst`` is ``-1`` or ``1``, or when the
                 device answers with data ``"00"``
        """
        if not isinstance(funCode, str):
            funCode = str(funCode)
        if not isinstance(data, str):
            data = str(data)

        result = MessageSender.send(
            self.device, cmd, {"IMEI": self.device["devNo"], "funCode": funCode, "data": data})

        if "rst" in result and result["rst"] != 0:
            if result["rst"] == -1:
                raise ServiceException(
                    {"result": 2, "description": u"该设备正在玩命找网络,请您稍候再试", "rst": -1})
            if result["rst"] == 1:
                raise ServiceException(
                    {"result": 2, "description": u"该设备忙,无响应,请您稍候再试。也可能是您的设备版本过低,暂时不支持此功能", "rst": 1})
            # NOTE(review): other non-zero rst values are neither raised nor
            # decoded; the caller gets None. Confirm this is intended.
            return None

        if cmd in [DeviceCmdCode.OPERATE_DEV_NO_RESPONSE, DeviceCmdCode.PASSTHROUGH_OPERATE_DEV_NO_RESPONSE]:
            return None

        if result.get("data") == "00":
            raise ServiceException({"result": 2, "description": u"设备操作失败.请重试"})
        return str(result["data"][18:-2])

    @staticmethod
    def port_is_busy(port_dict):
        """Tell whether a cached port entry describes a port that is currently working.

        The entry must contain ``billingType`` (``time`` or ``elec``), ``status``,
        ``coins`` and the matching ``needTime`` / ``needElec`` key; otherwise the
        port is reported as not busy.
        """
        if not port_dict:
            return False

        for required in ("billingType", "status", "coins"):
            if required not in port_dict:
                return False

        billing_type = port_dict["billingType"]
        if billing_type not in ["time", "elec"]:
            return False

        amount_key = "needTime" if billing_type == "time" else "needElec"
        if amount_key not in port_dict:
            return False

        return port_dict["status"] == Const.DEV_WORK_STATUS_WORKING

    def do_update_configs(self, updateDict):
        """Merge ``updateDict`` into ``otherConf.deviceConfigs``, save, and invalidate the device cache."""
        dev = Device.objects.get(devNo=self.device.devNo)
        deviceConfigs = dev.otherConf.get("deviceConfigs", {})
        deviceConfigs.update(updateDict)
        dev.otherConf['deviceConfigs'] = deviceConfigs
        dev.save()
        Device.invalid_device_cache(self.device.devNo)

    def start_device(self, package, openId, attachParas):
        # type: (...)-> dict
        """Start charging on the port selected in ``attachParas`` and cache the port state.

        The command payload is ``port(2 hex) + coins*10 (4 decimal digits) +
        amount (4 hex)``; the amount is multiplied by 100 for energy billing.

        :param package: the purchased package (``coins``, ``time``, ``unit``)
        :param openId: id of the user starting the port
        :param attachParas: must contain ``chargeIndex``; may contain ``orderNo``
        :return: the device reply with an added ``finishedTime`` (unix seconds)
        :raises ServiceException: on missing parameters, network failure, or when
                 the device reports a fault / busy port
        """
        if attachParas is None:
            raise ServiceException({"result": 2, "description": u"请您选择合适的充电线路、电池类型信息"})
        if "chargeIndex" not in attachParas:
            raise ServiceException({"result": 2, "description": u"请您选择合适的充电线路"})

        dev = Device.objects.get(devNo=self.device["devNo"])
        refundProtection = dev.otherConf.get("refundProtection", 0)
        refundProtectionTime = dev.otherConf.get("refundProtectionTime", 0)

        portHex = self.encode_str(attachParas["chargeIndex"], 2)
        _time, _unit, billingType = self._check_package(package)
        coins = float(package.get("coins", 0))
        coinsHex = self.encode_str(coins, length=4, ratio=10, base=10)

        # _check_package only returns "elec" when the unit is u"度", so the
        # billing type is a reliable switch. Comparing the unit against the
        # byte literal "度" never matches a unicode unit on Python 2.
        if billingType == "elec":
            unitHex = self.encode_str(_time, length=4, ratio=100)
        else:
            unitHex = self.encode_str(_time, length=4)

        devInfo = MessageSender.send(
            device=self.device,
            cmd=DeviceCmdCode.OPERATE_DEV_SYNC,
            payload={
                "IMEI": self.device["devNo"],
                "funCode": "02",
                "data": portHex + coinsHex + unitHex
            },
            timeout=120
        )

        if "rst" in devInfo and devInfo["rst"] != 0:
            if devInfo["rst"] == -1:
                raise ServiceException(
                    {"result": 2, "description": u"充电桩正在玩命找网络,您的金币还在,重试不需要重新付款,建议您试试旁边其他设备,或者稍后再试哦"})
            elif devInfo["rst"] == 1:
                self.check_serial_port_for_startcmd(attachParas["chargeIndex"])

        # NOTE: the original comment says the reply frame should be acked once
        # received; no ack is sent from this method.
        reply = devInfo["data"][18::]
        usePort = int(attachParas["chargeIndex"])
        resultCode = reply[2:4]

        # "01" means success; "02" / "03" update the cached port status and abort.
        if resultCode == "02":
            newValue = {str(usePort): {"status": Const.DEV_WORK_STATUS_FAULT, "statusInfo": u"充电站故障"}}
            Device.update_dev_control_cache(self.device["devNo"], newValue)
            raise ServiceException({"result": 2, "description": u"充电站故障"})
        elif resultCode == "03":
            newValue = {str(usePort): {"status": Const.DEV_WORK_STATUS_WORKING, "statusInfo": u""}}
            Device.update_dev_control_cache(self.device["devNo"], newValue)
            raise ServiceException({"result": 2, "description": u"该端口正在使用中"})

        portDict = {
            "startTime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "status": Const.DEV_WORK_STATUS_WORKING,
            "billingType": billingType,
            "isStart": True,
            "openId": openId,
            "refunded": False,
            "vCardId": self._vcard_id,
            "refundProtection": refundProtection,
            "refundProtectionTime": refundProtectionTime,
            "doPowerLevel": False,
            "consumeType": "mobile"
        }

        ctrInfo = Device.get_dev_control_cache(self.device["devNo"])
        lastPortInfo = ctrInfo.get(str(usePort), {})

        amountKey = "needTime" if billingType == "time" else "needElec"
        if self.port_is_busy(lastPortInfo) and lastPortInfo["billingType"] == billingType:
            # Top-up of a running port: accumulate on the cached values. The
            # cached coins are written below as str(RMB(...)), so they are
            # coerced back to float before adding (float + str raises).
            # NOTE(review): assumes str(RMB(x)) is a plain decimal string.
            totalCoins = float(coins) + float(lastPortInfo["coins"])
            portDict[amountKey] = _time + lastPortInfo[amountKey]
        else:
            totalCoins = float(coins)
            portDict[amountKey] = _time

        if billingType == "time":
            finishedTime = int(time.time()) + int(portDict["needTime"] * 60)
        else:
            # Energy billing has no known duration; 12 hours is used as the horizon.
            finishedTime = int(time.time()) + 60 * 60 * 12

        portDict["coins"] = str(RMB(totalCoins))
        portDict["finishedTime"] = finishedTime
        if "orderNo" in attachParas:
            portDict["orderNo"] = attachParas["orderNo"]

        Device.update_dev_control_cache(self.device["devNo"], {str(usePort): portDict})

        devInfo["finishedTime"] = finishedTime
        return devInfo

    def get_default_port_nums(self):
        """Return the port count used when the control cache has no ``allPorts``."""
        return 1

    def get_port_status(self, force=False):
        """Return ``{port: {"status": ...}}`` for every port.

        :param force: when true, query the device instead of reading the cache
        """
        if force:
            return self.get_port_status_from_dev()

        ctrInfo = Device.get_dev_control_cache(self.device.devNo)
        if "allPorts" in ctrInfo and ctrInfo["allPorts"] > 0:
            allPorts = ctrInfo["allPorts"]
        else:
            allPorts = self.get_default_port_nums()

        statusDict = {}
        for index in range(1, allPorts + 1):
            portKey = str(index)
            cached = ctrInfo.get(portKey, {})
            if "status" in cached:
                status = cached.get("status")
            elif cached.get("isStart"):
                # No explicit status cached: fall back to the isStart flag.
                status = Const.DEV_WORK_STATUS_WORKING
            else:
                status = Const.DEV_WORK_STATUS_IDLE
            statusDict[portKey] = {"status": status}
        return statusDict

    def get_port_info(self, port):
        # type:(str) -> dict
        """Get the state of a single port. Not implemented: returns ``None``."""
        pass

    def get_port_status_from_dev(self):
        """Query the device (funCode ``01``) for all port states and refresh the control cache.

        The reply starts with a hex port count followed by one status byte per
        port: ``01`` idle, ``02`` working, ``03`` forbidden, ``04`` fault. Any
        other value is treated as fault.

        :return: ``{port: {"status": ...}}`` with 1-based string port keys
        """
        statusByCode = {
            '01': Const.DEV_WORK_STATUS_IDLE,
            '02': Const.DEV_WORK_STATUS_WORKING,
            '03': Const.DEV_WORK_STATUS_FORBIDDEN,
            '04': Const.DEV_WORK_STATUS_FAULT,
        }

        data = self.send_mqtt(funCode="01", data="00")
        portNum = int(data[:2], base=16)
        portData = data[2:]

        result = {}
        for index in range(portNum):
            code = portData[index * 2:index * 2 + 2]
            # Codes outside the table are uniformly reported as fault.
            result[str(index + 1)] = {'status': statusByCode.get(code, Const.DEV_WORK_STATUS_FAULT)}

        allPorts, usedPorts, usePorts = self.get_port_static_info(result)

        ctrInfo = Device.get_dev_control_cache(self.device['devNo'])
        for strPort, info in result.items():
            # ``in`` instead of dict.has_key, which does not exist on Python 3.
            if strPort in ctrInfo:
                ctrInfo[strPort].update({'status': info['status']})
            else:
                ctrInfo[strPort] = info

        ctrInfo.update({
            'allPorts': allPorts,
            'usedPorts': usedPorts,
            'usePorts': usePorts
        })
        Device.update_dev_control_cache(self.device.devNo, ctrInfo)
        return result

    @property
    def isHaveStopEvent(self):
        """Always ``True`` for this adapter."""
        return True

    def test(self, port="1", time="1"):
        # type:(...) -> dict
        """Start a port with zero coins (funCode ``02``) as a device test.

        :param port: port number to start
        :param time: amount encoded into the 4-digit hex field of the command
        :return: ``{"port": ..., "status": ...}`` when the device replies with
                 data, otherwise ``None``
        """
        portHex = self.encode_str(port, 2)
        timeHex = self.encode_str(time, length=4)
        result = self.send_mqtt(funCode="02", data=portHex + "0000" + timeHex)
        if not result:
            return None

        status = "Charging success" if result[2:4] == "01" else "Charging failed"
        return {"port": result[:2], "status": status}