|
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import json
- from typing import TYPE_CHECKING
- from apps.web.constant import Const, DeviceCmdCode, MQTT_TIMEOUT
- from apps.web.core.adapter.base import SmartBox
- from apps.web.core.device_define.cxjz import SetResponse, CXJZDeviceSettingsValidator, ChargeMode, DEFAULT_REFUND_PROTECTION_TIME, DEFAULT_SERVICE_FEE, DEFAULT_REFUND_CAL_BASE
- from apps.web.core.exceptions import ServiceException
- from apps.web.core.networking import MessageSender
- from apps.web.device.models import Device
- if TYPE_CHECKING:
- from apps.web.user.models import ConsumeRecord
class ChargingCXJZBox(SmartBox):
    """Adapter for CXJZ charging boxes.

    Commands are hex strings sent through ``MessageSender``; replies are hex
    strings that the ``_get_*`` / ``_parse_*`` helpers decode into dicts.
    The code targets Python 2 (it relies on ``unicode`` and ``str.decode``).
    """

    # Consume-type codes reported by the device. Any code that is not listed
    # here is reported as "mobile".
    _CONSUME_TYPES = {
        "AF": "fault",
        "01": "card",
        "02": "coin",
        "03": "test",
    }

    # Order in which the settings are serialised into the "FB" payload.
    # "time1" intentionally appears twice, exactly as in the wire layout used
    # by the original implementation.
    _ALL_SETTINGS_ORDER = (
        "cardCode",
        "chargeMode",
        "maxPower",
        "stopSwitch",
        "coinSwitch",
        "coinUpperLimit",
        "noLoadTime",
        "fullPower",
        "noLoadPower",
        "fullStopTime",
        "time1",
        "cardCoinElec",
        "coinTime",
        "coinMax",
        "reportTime",
        "smokeSwitch",
        "reCheckTime",
        "time1",
        "time2",
        "time3",
        "time4",
        "power1",
        "power2",
        "power3",
        "powerSwitch",
    )

    # ------------------------------------------------------------------
    # Low level helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _convert_consume_type(_type):
        """Translate a two-character consume-type code into its name.

        :param _type: code taken from a device frame, e.g. ``"01"``.
        :return: ``"fault"``, ``"card"``, ``"coin"``, ``"test"`` or, for any
            other code, ``"mobile"``.
        """
        return ChargingCXJZBox._CONSUME_TYPES.get(_type, "mobile")

    @staticmethod
    def _parse_port_status(data):
        """Decode one 16-character port record.

        Layout, e.g. ``0A 00 0000 0000 0000``: port (2 hex chars), consume
        type (2), remaining time (4), remaining electricity (4), power (4).

        :param data: hex string holding a single port record.
        :return: ``{"port", "status"}`` for an idle port (both remaining time
            and remaining electricity are zero), otherwise the full record
            with status ``Const.DEV_WORK_STATUS_WORKING``.
        :raises ValueError: if a numeric field is not valid hexadecimal.
        """
        port = int(data[:2], 16)
        consumeType = data[2:4]
        leftTime = int(data[4:8], 16)
        leftElec = int(data[8:12], 16)
        power = int(data[12:16], 16)

        if not leftElec and not leftTime:
            return {"port": port, "status": Const.DEV_WORK_STATUS_IDLE}

        return {
            "port": port,
            "status": Const.DEV_WORK_STATUS_WORKING,
            "consumeType": ChargingCXJZBox._convert_consume_type(consumeType),
            "leftTime": leftTime,
            "leftElec": leftElec,
            "power": power,
        }

    def _get_other_conf(self):
        """Return the device's ``otherConf`` mapping, or ``{}`` if it is missing.

        Guards the callers against ``None`` so that ``.get`` / ``.update`` do
        not fail on a device that has no ``otherConf`` stored yet.
        """
        otherConf = self.device.get("otherConf")
        return {} if otherConf is None else otherConf

    def _send_data(self, funCode, data, cmd=DeviceCmdCode.OPERATE_DEV_SYNC, timeout=MQTT_TIMEOUT.NORMAL):
        """Send one command frame to the device and return the raw reply.

        :param funCode: two-character function code, e.g. ``"A0"``.
        :param data: hex payload string for that function code.
        :param cmd: transport command code passed to ``MessageSender.send``.
        :param timeout: timeout passed to ``MessageSender.send``.
        :return: the reply object returned by ``MessageSender.send``.
        :raises ServiceException: when the reply carries ``rst == -1`` or
            ``rst == 1``. Any other ``rst`` value is returned to the caller
            unchanged.
        """
        result = MessageSender.send(
            device=self.device,
            cmd=cmd,
            payload={
                "IMEI": self._device["devNo"],
                "funCode": funCode,
                "data": data
            },
            timeout=timeout
        )

        if "rst" in result and result.get("rst") != 0:
            if result.get("rst") == -1:
                raise ServiceException({"result": 2, "description": u"设备网络故障,请重新"})
            if result.get("rst") == 1:
                raise ServiceException({"result": 2, "description": u"充电桩无响应,请稍后再试试"})

        return result

    def _send_setting(self, funCode, payload):
        """Send a synchronous setting command and report the acknowledgement.

        :return: ``True`` when characters ``[2:4]`` of the reply data equal
            ``SetResponse.SUCCESS``, ``False`` otherwise.
        :raises ServiceException: propagated from ``_send_data``.
        """
        data = self._send_data(funCode, payload)["data"]
        return data[2:4] == SetResponse.SUCCESS

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def _get_A0_data(self, data=None):
        """Query the current working status of the whole machine.

        :param data: an already received reply string; when empty or ``None``
            the device is queried with function code ``A0``.
        :return: list of port dicts as produced by ``_parse_port_status``.
        """
        if not data:
            data = self._send_data("A0", "01FFFF")["data"]

        # Strip the leading 10 and trailing 6 characters once, then walk the
        # remainder in 16-character port records.
        body = data[10:-6]
        return [
            self._parse_port_status(body[_index:_index + 16])
            for _index in range(0, len(body), 16)
        ]

    def _get_A1_data(self, port, data=None):
        """Query the running status of a single port.

        :param port: integer port number, encoded as two hex digits.
        :param data: optional already received reply string.
        :return: port dict as produced by ``_parse_port_status``.
        """
        if not data:
            data = self._send_data("A1", "{:02X}FFFF".format(port))["data"]
        return self._parse_port_status(data[10:-6])

    def _get_A2_data(self, data=None):
        """Query the coin-count information.

        :param data: optional already received reply string.
        :return: dict with ``cardCode`` (raw hex string), ``cardCount`` and
            ``coinCount`` (integers).
        """
        if not data:
            data = self._send_data("A2", "01FFFF")["data"]

        body = data[10:-6]
        return {
            "cardCode": body[4:8],
            "cardCount": int(body[8:12], 16),
            "coinCount": int(body[12:16], 16),
        }

    def _get_A3_data(self, data=None):
        """Query the working mode.

        :param data: optional already received reply string.
        :return: dict with ``powerSwitch``, ``chargeMode``, ``maxPower``,
            ``stopSwitch``, ``coinUpperLimit`` and ``coinSwitch``.
        """
        if not data:
            data = self._send_data("A3", "01FFFF")["data"]

        body = data[10:-6]
        return {
            "powerSwitch": bool(int(body[0:2], 16)),
            "chargeMode": int(body[2:4], 16),
            "maxPower": int(body[4:8], 16),
            "stopSwitch": bool(int(body[8:10], 16)),
            "coinUpperLimit": int(body[10:14], 16),
            "coinSwitch": bool(int(body[14:16], 16))
        }

    def _get_A4_data(self, data=None):
        """Query the time mode.

        :param data: optional already received reply string.
        :return: dict with ``fullStopTime``, ``cardCoinElec`` (raw value
            divided by 10.0), ``time1``, ``coinTime`` and ``coinMax``.
        """
        if not data:
            data = self._send_data("A4", "01FFFF")["data"]

        # NOTE(review): this reply is trimmed with [10:-4], not [10:-6] like
        # the other queries; the extra two characters hold ``coinMax``.
        # Confirm against the protocol document.
        body = data[10:-4]
        return {
            "fullStopTime": int(body[0:4], 16),
            "cardCoinElec": int(body[4:8], 16) / 10.0,
            "time1": int(body[8:12], 16),
            "coinTime": int(body[12:16], 16),
            "coinMax": int(body[16:18], 16)
        }

    def _get_A8_data(self, data=None):
        """Query the ``power1..3`` / ``time2..4`` values (function code A8).

        Unlike the other queries, the fields are read at fixed offsets of
        ``data`` itself, without first trimming leading/trailing characters.

        :param data: optional already received reply string.
        :return: dict with ``power1``, ``time2``, ``power2``, ``time3``,
            ``power3`` and ``time4``.
        """
        if not data:
            data = self._send_data("A8", "01FFFF")["data"]
        return {
            "power1": int(data[4:8], 16),
            "time2": int(data[8:12], 16),
            "power2": int(data[12:16], 16),
            "time3": int(data[16:20], 16),
            "power3": int(data[20:24], 16),
            "time4": int(data[24:28], 16)
        }

    def _get_A9_data(self, data=None):
        """Query the no-load settings.

        :param data: optional already received reply string.
        :return: dict with ``reportTime``, ``reCheckTime``, ``noLoadTime``,
            ``fullPower`` and ``noLoadPower``.
        """
        if not data:
            data = self._send_data("A9", "01FFFF")["data"]

        body = data[10:-6]
        return {
            "reportTime": int(body[0:2], 16),
            "reCheckTime": int(body[2:4], 16),
            "noLoadTime": int(body[4:8], 16),
            "fullPower": int(body[8:12], 16),
            "noLoadPower": int(body[12:16], 16)
        }

    def _get_AE_data(self, data=None):
        """Get the temperature and smoke-sensor settings.

        :param data: optional already received reply string.
        :return: dict with ``temperature``, ``smoke``, ``warning`` and
            ``smokeSwitch``.
        """
        if not data:
            data = self._send_data("AE", "01FFFF")["data"]

        body = data[10:-6]
        return {
            "temperature": int(body[4:8], 16),
            "smoke": int(body[8:12], 16),
            "warning": bool(int(body[12:14], 16)),
            "smokeSwitch": bool(int(body[14:16], 16))
        }

    def _get_server_config(self):
        """Read the parameters that are stored on the server side.

        :return: dict with ``refundProtectionTime``, ``serviceFee`` and
            ``refundCalBase``; each falls back to its ``DEFAULT_*`` constant
            when it is not present in the device's ``otherConf``.
        """
        otherConf = self._get_other_conf()
        return {
            "refundProtectionTime": otherConf.get("refundProtectionTime", DEFAULT_REFUND_PROTECTION_TIME),
            "serviceFee": otherConf.get("serviceFee", DEFAULT_SERVICE_FEE),
            "refundCalBase": otherConf.get("refundCalBase", DEFAULT_REFUND_CAL_BASE)
        }

    # ------------------------------------------------------------------
    # Single-value setters
    # ------------------------------------------------------------------
    def _set_AF(self):
        """Send function code ``AF`` (used for the "testDevice" action).

        The reply is not inspected; the method always returns ``True``.
        """
        # NOTE(review): 220 is a raw transport command code; its meaning is
        # not visible in this file - consider a named DeviceCmdCode constant.
        self._send_data("AF", "01FFFF", cmd=220)
        return True

    def _set_B1(self, devCode=None):
        """Set the device number. Not recommended; the pile number defaults to 000000.

        :param devCode: code to write; when empty, the device's logical code
            with every ``"G"`` removed is used.
        :return: always ``True`` (the command is sent without waiting for a
            response).
        """
        if not devCode:
            devCode = self.device.logicalCode.replace("G", "")
        self._send_data("B1", devCode, cmd=DeviceCmdCode.OPERATE_DEV_NO_RESPONSE)
        return True

    def _set_B0(self):
        """Clear the coin-count information. Returns the acknowledgement flag."""
        return self._send_setting("B0", "01FFFF")

    def _set_B2(self, cardCode):
        """Set the device area code, i.e. the card-swipe number."""
        return self._send_setting("B2", "01{}".format(cardCode))

    def _set_B3(self, chargeMode):
        """Set ``chargeMode`` (function code B3)."""
        return self._send_setting("B3", "01{:04X}".format(chargeMode))

    def _set_B4(self, maxPower):
        """Set ``maxPower`` (function code B4)."""
        return self._send_setting("B4", "01{:04X}".format(maxPower))

    def _set_B5(self, stopSwitch):
        """Set ``stopSwitch`` (function code B5)."""
        return self._send_setting("B5", "0100{:02X}".format(stopSwitch))

    def _set_B6(self, coinSwitch):
        """Set ``coinSwitch`` (function code B6)."""
        return self._send_setting("B6", "0100{:02X}".format(coinSwitch))

    def _set_B7(self, coinUpperLimit):
        """Set ``coinUpperLimit`` (function code B7)."""
        return self._send_setting("B7", "01{:04X}".format(coinUpperLimit))

    def _set_B8(self, stopTime):
        """Set ``stopTime`` (function code B8)."""
        return self._send_setting("B8", "01{:04X}".format(stopTime))

    def _set_B9(self, fullPower):
        """Set ``fullPower`` (function code B9)."""
        return self._send_setting("B9", "01{:04X}".format(fullPower))

    def _set_BA(self, noLoadPower):
        """Set ``noLoadPower`` (function code BA)."""
        return self._send_setting("BA", "01{:04X}".format(noLoadPower))

    def _set_C1(self, cardTime):
        """Set ``cardTime`` (function code C1)."""
        return self._send_setting("C1", "01{:04X}".format(cardTime))

    def _set_C2(self, cardCoinElec):
        """Set ``cardCoinElec`` (function code C2)."""
        return self._send_setting("C2", "01{:04X}".format(cardCoinElec))

    def _set_C3(self, coinTime):
        """Set ``coinTime`` (function code C3)."""
        return self._send_setting("C3", "01{:04X}".format(coinTime))

    def _set_C4(self, coinMax):
        """Set ``coinMax`` (function code C4)."""
        return self._send_setting("C4", "01{:04X}".format(coinMax))

    def _set_C5(self, reportTime):
        """Set ``reportTime`` (function code C5)."""
        return self._send_setting("C5", "01{:04X}".format(reportTime))

    def _set_C6(self, smokeSwitch):
        """Set ``smokeSwitch`` (function code C6)."""
        return self._send_setting("C6", "0100{:02X}".format(smokeSwitch))

    def _set_C7(self, reCheckTime):
        """Set ``reCheckTime`` (function code C7)."""
        return self._send_setting("C7", "01{:04X}".format(reCheckTime))

    def _set_C9(self, powerSwitch):
        """Set ``powerSwitch`` (function code C9)."""
        return self._send_setting("C9", "01{:04X}".format(powerSwitch))

    def _set_C8(self, value, slot):
        """Send function code ``C8`` with ``value`` followed by a slot suffix.

        :param value: integer encoded as four hex digits.
        :param slot: two-character suffix selecting the target
            (``A1``-``A4`` for times, ``C1``-``C3`` for powers).
        """
        return self._send_setting("C8", "01{:04X}{}".format(value, slot))

    def _set_C8_A1(self, time1):
        """Set ``time1`` (function code C8, slot A1)."""
        return self._set_C8(time1, "A1")

    def _set_C8_A2(self, time2):
        """Set ``time2`` (function code C8, slot A2)."""
        return self._set_C8(time2, "A2")

    def _set_C8_A3(self, time3):
        """Set ``time3`` (function code C8, slot A3)."""
        return self._set_C8(time3, "A3")

    def _set_C8_A4(self, time4):
        """Set ``time4`` (function code C8, slot A4)."""
        return self._set_C8(time4, "A4")

    def _set_C8_C1(self, power1):
        """Set ``power1`` (function code C8, slot C1)."""
        return self._set_C8(power1, "C1")

    def _set_C8_C2(self, power2):
        """Set ``power2`` (function code C8, slot C2)."""
        return self._set_C8(power2, "C2")

    def _set_C8_C3(self, power3):
        """Set ``power3`` (function code C8, slot C3)."""
        return self._set_C8(power3, "C3")

    def _set_D0(self):
        """Send function code ``D0`` (used for the "lockDevice" action)."""
        return self._send_setting("D0", "01FFFF")

    def _set_D1(self):
        """Send function code ``D1`` (used for the "unLockDevice" action)."""
        return self._send_setting("D1", "01FFFF")

    # ------------------------------------------------------------------
    # Start / stop
    # ------------------------------------------------------------------
    def _time_start(self, port, needTime):
        """Send ``CA`` with ``needTime`` in the first value slot; no response is awaited."""
        return self._send_data(
            "CA",
            "{:02X}{:04X}0000".format(port, needTime),
            cmd=DeviceCmdCode.OPERATE_DEV_NO_RESPONSE
        )

    def _elec_start(self, port, needElec):
        """Send ``CA`` with ``needElec`` in the second value slot; no response is awaited."""
        return self._send_data(
            "CA",
            "{:02X}0000{:04X}".format(port, needElec),
            cmd=DeviceCmdCode.OPERATE_DEV_NO_RESPONSE
        )

    def _power_start(self, port, coins):
        """Send ``CC`` with ``coins`` in the first value slot; no response is awaited."""
        return self._send_data(
            "CC",
            "{:02X}{:04X}0000".format(port, coins),
            cmd=DeviceCmdCode.OPERATE_DEV_NO_RESPONSE
        )

    def _stop(self, port):
        """Send function code ``FD`` for ``port`` and return the raw reply."""
        return self._send_data("FD", "{:02X}".format(port))

    # ------------------------------------------------------------------
    # Bulk settings
    # ------------------------------------------------------------------
    def _get_all_settings(self):
        """Custom command: fetch all parameter settings in one round trip.

        Sends function code ``FA`` (timeout 5), trims the reply with
        ``[10:-6]`` and hands consecutive 32-character sections to the A2,
        A3, A4, A8, A9 and AE decoders.

        :return: one dict merging the results of all six decoders.
        """
        data = self._send_data("FA", "", timeout=5)["data"]
        content = data[10:-6]

        # NOTE: every decoder falls back to querying the device itself when
        # its section is empty, so a short reply triggers extra requests.
        sections = (
            (self._get_A2_data, content[:32]),
            (self._get_A3_data, content[32:64]),
            (self._get_A4_data, content[64:96]),
            (self._get_A8_data, content[96:128]),
            (self._get_A9_data, content[128:160]),
            (self._get_AE_data, content[160:192]),
        )

        result = dict()
        for decode, section in sections:
            result.update(decode(section))
        return result

    def _set_all_settings(self, **kwargs):
        """Custom command: write all parameter settings in one round trip.

        ``cardCode`` is formatted with a minimum width of 4; the values that
        follow are formatted as four-digit upper-case hex, in the order given
        by ``_ALL_SETTINGS_ORDER``.

        :param kwargs: must contain every key named in ``_ALL_SETTINGS_ORDER``.
        :return: the raw reply of ``_send_data``.
        :raises KeyError: if a required setting is missing.
        :raises ValueError: if a value cannot be formatted as hex
            (for example a float).
        """
        template = "{:4}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}{:04X}"
        values = [kwargs[_name] for _name in self._ALL_SETTINGS_ORDER]
        return self._send_data("FB", template.format(*values))

    # ------------------------------------------------------------------
    # Event parsing
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_finished(data):
        """Split a "finished" event frame into its fields.

        All values except ``consumeType`` are returned as the raw hex
        substrings of ``data``; they are not converted to integers.
        """
        return {
            "reason": data[2:4],
            "port": data[10:12],
            "consumeType": ChargingCXJZBox._convert_consume_type(data[12:14]),
            "leftTime": data[14:18],
            "leftElec": data[18:22],
            "power": data[22:26]
        }

    def _parse_fault(self, data):
        """Decode a fault event frame with the AE (temperature/smoke) decoder."""
        return self._get_AE_data(data)

    def _check_package(self, package):
        """Validate a package against the charge mode and derive start values.

        The charge mode is taken from the device's ``otherConf``; when it is
        missing or falsy the device is queried (function code A3).

        :param package: mapping with ``unit`` plus ``coins`` (time mode) or
            ``time`` (electricity mode).
        :return: tuple ``(chargeMode, chargeElec, chargeCoins)``;
            ``chargeElec`` is ``package["time"]`` multiplied by 10 and
            truncated to an integer.
        :raises ServiceException: when the unit does not match the mode or
            the mode is neither ``ChargeMode.TIME`` nor ``ChargeMode.ELEC``.
        """
        chargeMode = self._get_other_conf().get("chargeMode")
        if not chargeMode:
            chargeMode = self._get_A3_data()["chargeMode"]

        chargeElec, chargeCoins = 0, 0

        # Time mode first.
        if chargeMode == ChargeMode.TIME:
            if package["unit"] != u"次":
                raise ServiceException({"result": 2, "description": u"启动失败,套餐单位错误(1001)"})
            chargeCoins = int(package["coins"])

        elif chargeMode == ChargeMode.ELEC:
            if package["unit"] != u"度":
                raise ServiceException({"result": 2, "description": u"启动失败,套餐单位错误(1002)"})
            chargeElec = int(float(package["time"]) * 10)

        else:
            raise ServiceException({"result": 2, "description": u"启动失败,工作模式错误(2001)"})

        return chargeMode, chargeElec, chargeCoins

    def analyze_event_data(self, data):
        """Decode an event frame reported by the device.

        :param data: hex string of the event frame.
        :return: the decoded dict with the function code added as ``cmdCode``.
        """
        funCode = data[2:4]

        # NOTE(review): "AE" is routed to the finished-parser while every
        # other code goes through the AE (temperature/smoke) decoder. This
        # looks inverted - confirm against the protocol before changing it.
        if funCode == "AE":
            result = self._parse_finished(data)
        else:
            result = self._parse_fault(data)

        result["cmdCode"] = funCode
        return result

    # ------------------------------------------------------------------
    # Public adapter interface
    # ------------------------------------------------------------------
    def start_device_realiable(self, order):  # type:(ConsumeRecord)->dict
        """Start charging for ``order`` with function code ``FC``.

        :param order: consume record providing ``package``, ``attachParas``,
            ``orderNo`` and ``openId``.
        :return: the raw reply of ``MessageSender.send`` (timeout 120); its
            ``rst`` field is not checked here.
        :raises ServiceException: propagated from ``_check_package``.
        """
        chargeMode, chargeElec, chargeCoins = self._check_package(order.package)
        port = int(order.attachParas["chargeIndex"])

        payload = {
            "funCode": "FC",
            "order_id": order.orderNo,
            "port": port,
            "open_id": order.openId,
            "charge_mode": chargeMode,
            "coins": chargeCoins,
            # Deprecated parameter: a pure time-mode start is no longer needed.
            "time": 0,
            "elec": chargeElec
        }

        return MessageSender.send(
            device=self.device,
            cmd=DeviceCmdCode.OPERATE_DEV_SYNC,
            payload=payload,
            timeout=120
        )

    def get_dev_setting(self):
        """Return the device settings merged with the server-side settings.

        ``coinTime`` is replaced by ``time1 - coinTime`` before the server
        configuration is merged in.
        """
        settings = self._get_all_settings()
        settings["coinTime"] = settings["time1"] - settings["coinTime"]
        settings.update(self._get_server_config())
        return settings

    def set_device_function(self, request, lastSetConf):
        """Run the one-shot actions named in ``lastSetConf``.

        Recognised keys: ``cleanCoins``, ``testDevice``, ``lockDevice`` and
        ``unLockDevice``. ``request`` is not used.
        """
        if "cleanCoins" in lastSetConf:
            self._set_B0()

        if "testDevice" in lastSetConf:
            self._set_AF()

        if "lockDevice" in lastSetConf:
            self._set_D0()

        if "unLockDevice" in lastSetConf:
            self._set_D1()

    def set_device_function_param(self, request, lastSetConf):
        """Validate the posted settings, persist them and push them to the device.

        :param request: request whose ``POST`` data holds the settings.
        :param lastSetConf: not used.
        :raises ServiceException: when validation fails, or propagated from
            the device communication.
        """
        validator = CXJZDeviceSettingsValidator(request.POST)
        if not validator.is_valid():
            raise ServiceException({"result": 2, "description": json.dumps(validator.str_errors).decode("unicode-escape")})

        data = validator.suit_data()

        # Save a copy to the database first, then push to the device.
        otherConf = self._get_other_conf()
        otherConf.update(data)
        Device.objects.filter(devNo=self.device.devNo).update(otherConf=otherConf)
        Device.invalid_device_cache(self.device.devNo)

        self._set_all_settings(**data)

    def get_port_status_from_dev(self):
        """Query all ports from the device and refresh the control cache.

        :return: dict mapping the port number (as a string) to its status
            dict, without the ``port`` key.
        """
        # Convert the list of records into a dict keyed by the port string.
        portDict = dict()
        for _item in self._get_A0_data():
            portDict[str(_item.pop("port"))] = _item

        allPorts, usedPorts, usePorts = self.get_port_static_info(portDict)
        Device.update_dev_control_cache(
            self.device.devNo,
            {'allPorts': allPorts, 'usedPorts': usedPorts, 'usePorts': usePorts}
        )

        # Merge the fresh per-port data into whatever is already cached.
        devCache = Device.get_dev_control_cache(self.device.devNo)
        for _portStr, _info in portDict.items():
            if _portStr in devCache:
                devCache[_portStr].update(_info)
            else:
                devCache[_portStr] = _info
        Device.update_dev_control_cache(self.device.devNo, devCache)

        return portDict

    def get_port_info(self, port):
        """Return the live status of ``port`` (queried with function code A1)."""
        return self._get_A1_data(int(port))

    def get_port_status(self, force=False):
        """Return the status of every port from the control cache.

        :param force: when true, the cache is refreshed from the device first.
        :return: dict mapping each all-digit cache key to ``{"status": ...}``;
            a missing status defaults to ``Const.DEV_WORK_STATUS_IDLE``.
        """
        if force:
            self.get_port_status_from_dev()

        devCache = Device.get_dev_control_cache(self.device.devNo)

        portStatus = dict()
        for _k, _v in devCache.items():
            # Only all-digit string keys are ports; other keys hold summaries.
            if isinstance(_k, (str, unicode)) and _k.isdigit():
                portStatus[_k] = {"status": _v.get("status", Const.DEV_WORK_STATUS_IDLE)}
        return portStatus

    def active_deactive_port(self, port, active):
        """Stop ``port`` when ``active`` is falsy; do nothing otherwise."""
        if not active:
            self._stop(port)
|