# coding=utf-8 """ 暂时不做了 通信方式为socket """ from apilib.utils_string import split_str from apps.web.core.adapter.base import * class IntSettingsIndex(object): MAX_VOLTAGE = 5 class StrSettingsIndex(object): LOGICAL_CODE = 1 STANDARD_TIME = 2 ADMIN_PASSWORD = 3 OPERATOR_PASSWORD = 4 SERVER_PASSWORD = 6 SERVER_PHONE = 8 PAY_QR_CODE = 10 class ControlIndex(object): STOP_CHARGE = 2 CHARGE_MODE = 9 CANCEL = 10 DEVICE_REBOOT = 11 class LiGeCar(SmartBox): """ 里歌汽车充电桩 """ _to_ascii = staticmethod(lambda x: binascii.hexlify(x)) _to_str = staticmethod(lambda x: binascii.unhexlify(x)) def __init__(self, device): super(LiGeCar, self).__init__(device) def _send_data(self, funCode, data, cmd = None, timeout = MQTT_TIMEOUT.NORMAL): if not cmd: cmd = DeviceCmdCode.OPERATE_DEV_SYNC result = MessageSender.send(device = self.device, cmd = cmd, payload = { "IMEI": self.device.devNo, "funCode": funCode, "data": data, }, timeout = timeout) if "rst" in result and result.get("rst") != 0: if result.get("rst") == -1: raise ServiceException({"result": 2, "description": u"网络故障,请重新试试"}) if result.get("rst") == 1: raise ServiceException({"result": 2, "description": u"充电桩无响应,请稍后再试试"}) return result def _query_int_type_settings(self, index, num): """ 查询整形参数 目前只支持查询 :param index: 查询参数起始下标 :param num: 查询参数的个数 :return: """ indexHex = fill_2_hexByte(hex(int(index)), 8, reverse=True) numHex = fill_2_hexByte(hex(int(num)), 2) sendData = "".join(("0000", "0000", "00", indexHex, numHex)) result = self._send_data("0001", sendData) if result.get("data", "")[84:86] != "00": raise ServiceException({"result": 2, "description": u"获取参数失败,请重新试试"}) data = result.get("data", "")[86:-2] # 每个参数4个字节,开始解析数据, 比率换算交由上层函数 dataList = list() for i in xrange(num): tempData = reverse_hex(data[i*8: i*8+8]) dataList.append(int(tempData)) return dataList def _query_str_type_settings(self, index, toStr=True): """ 获取字符串参数 :param index: 查询数据的下标 :return: """ indexHex = fill_2_hexByte(hex(int(index)), 8, reverse=True) sendData = "".join(("0000", "0000", "00", indexHex)) result = self._send_data("0003", sendData) if result.get("data", "")[82:84] != "00": raise ServiceException({"result": 2, "description": u"获取参数失败,请重新试试"}) data = result.get("data", "")[84:-2] if toStr: return self._to_str(data) return data def _set_int_type_settings(self, index, data): """ 设置整形参数 可对于连续参数进行设置 :param index: 设置整形参数的起始下标 :param data: iterable 形式 :return: """ indexHex = fill_2_hexByte(hex(int(index)), 8, reverse=True) numHex = fill_2_hexByte(hex(len(data)), 2) dataLenHex = fill_2_hexByte(hex(len(data)*4), 4, reverse=True) # 设置参数字节数 每个参数4个字节 dataHex = "".join([fill_2_hexByte(hex(int(item)), 8, reverse=True) for item in data]) sendData = "".join(("0000", "0000", "01", indexHex, numHex, dataLenHex, dataHex)) result = self._send_data("0003", sendData) if result.get("data")[84:86] != "00": raise ServiceException({"result": 2, "description": u"设置参数失败,请重新试试"}) def _set_str_type_settings(self, index, data, lens, makeUp=True, toAscii=True): """ 设置字符串的参数 除了 标准时间 以及 桩登陆密码 设置之外,其余所有参数均需要转为ascii码 :param index: 设置参数的索引 :param data: 设置参数的数值 :param lens: 参数的标准长度 单位是字节 :param toAscii: 是否将参数转为ascii码 :param makeUp: 是否需要将参数补齐 :return: """ if toAscii: data = self._to_str(data) if makeUp: data = ("{:<0%s}" % lens*2).format(data) indexHex = fill_2_hexByte(hex(int(index)), 8, reverse=True) dataLen = fill_2_hexByte(hex(len(data)/2), 4, reverse=True) sendData = "".join(("0000", "0000", "01", indexHex, dataLen, data)) result = self._send_data("0003", sendData) if result.get("data")[82:84] != "00": raise ServiceException({"result": 2, "description": u"设置参数失败,请重新试试"}) def _control_device(self, port, index, data): """ 后台服务器下发充电桩控制命令 按照协议规范,其实可以一次下发多条连续的命令,但业务目前不需要, 目前只做单一指令下发设计 :param index: 控制命令下标 :param data: 控制命令数据 :return: """ portHex = fill_2_hexByte(hex(int(port)), 2) indexHex = fill_2_hexByte(hex(int(index)), 8, reverse=True) dataHex = fill_2_hexByte(hex(int(data)), 8, reverse=True) sendData = "".join(("0000", "0000", portHex, indexHex, "01", "0004", dataHex)) result = self._send_data("0005", sendData) if result.get("data")[84:86] != "00": raise ServiceException({"result": 2, "description": u"操作失败,请重新试试"}) def _start_charge(self, port, chargeStrategy, times, openId, orderNo): """ 后台服务器向充电桩开始充电控制命令 目前扫码充电暂不支持 预约充电功能 reservationTime/reservationTimeOut 留作扩展 :param port: 充电端口号 :param chargeStrategy: 充电策略 0:充满为止/1:时间控制充电/2:金额控制充电/3:电量控制充电 :param times: 充电策略参数 时间单位是秒 金额单位是分 电量单位是0.01kw.h :param openId: 用户识别号 :param orderNo: 充电流水号 单号 :return: """ portHex = fill_2_hexByte(hex(int(port)), 2) typeHex = fill_2_hexByte(hex(int(1)), 8, reverse=True) stopPasswordHex = fill_2_hexByte(hex(int(0)), 8, reverse=True) chargeStrategyHex = fill_2_hexByte(hex(int(chargeStrategy)), 8, reverse=True) timesHex = fill_2_hexByte(hex(int(times)), 8, reverse=True) reservationTimeHex = fill_2_hexByte(hex(int(0)), 16, reverse=True) reservationTimeOutHex = fill_2_hexByte(hex(int(0)), 2) openIdAscii = "{:<064}".format(self._to_ascii(openId)) chargeWithoutNetHex = fill_2_hexByte(hex(int(1)), 2) chargeWithoutNetElecHex = "FFFFFFFF" orderNoAscii = "{:<064}".format(self._to_ascii(orderNo)) sendDataList = [ "0000", "0000", portHex, typeHex, stopPasswordHex, chargeStrategyHex, timesHex, reservationTimeHex, reservationTimeOutHex, openIdAscii, chargeWithoutNetHex, chargeWithoutNetElecHex, orderNoAscii ] sendData = "".join(sendDataList) result = self._send_data("0007", sendData) reasonCode = int(result.get("data", "")[74:82], 16) if reasonCode != 0: raise ServiceException({"result": 2, "description": u"启动充电失败,请重新试试{}".format(reasonCode)}) def _parse_device_port_status(self, data): """ 充电桩定期发送此信息上报充电桩当前工作状态信息 工作状态 0/空闲中 1/正准备开始充电 2/充电进行中 3/充电结束 4/启动失败 5/预约状态 6/系统故障 :param data: 原始的报文数据 :return: """ # TODO 无用数据太多 等待模块层处理 data = split_str(data, startIndex=16, lens=[4, 4, 64, 2, 2, 2, 2, 2, 8, 2, 8, 8, 8, 4, 4, 4, 4, 2, 4, 4, 4, 4, 4, 4, 4, 8, 8, 8, 8, 2, 2, 8, 2, 64, 2, 16, 8, 8, 8, 8, 8, 8, 2, 2, 2, 36, 2] ) portStr = data[4] status = data[6] connectStatus = data[9] fee = data[10] def _parse_warning(self, data): """ 解析告警事件 :param data: :return: """ # TODO 对于告警信息的处理 这个地方细分种类太多, 共计255种告警信息 是否需要进行分类处理 def _parse_account_info(self): """ 充电桩充电上传用户帐户查询报文 这个接口主要的功能在于查询用户卡号是否OK :return: """ # 对于卡片而言,目前我们只做了卡号鉴‘ # 对 def get_port_status_from_dev(self): """ 从设备端直接获取端口状态,此款主板没有直接提供,就先从换从缓存中直接读取 :return: """ devCache = Device.get_dev_control_cache(self._device["devNo"]) allPorts = devCache.get("allPorts") if not allPorts: allPorts, portNums = self._query_int_type_settings(3, 2) Device.update_dev_control_cache(self.device.devNo, {"allPorts": allPorts}) portInfo = {} for i in xrange(allPorts): portStr = str(i+1) portTemp = devCache.get(portStr) if not portTemp: portTemp = {"status": Const.DEV_WORK_STATUS_IDLE} portInfo[portStr] = portTemp return portInfo def get_port_status(self, force = False): """ 获取端口状态, 用户扫码或者经销商上分的时候会需要 :param force: :return: """ if force: return self.get_port_status_from_dev() devCache = Device.get_dev_control_cache(self._device['devNo']) statusDict = dict() allPorts = devCache.get('allPorts') if not allPorts: allPorts, portNums = self._query_int_type_settings(3, 2) Device.update_dev_control_cache(self.device.devNo, {"allPorts": allPorts}) for portNum in range(allPorts): tempDict = devCache.get(str(portNum + 1), {}) if "status" in tempDict: statusDict[str(portNum + 1)] = {'status': tempDict.get('status')} elif "isStart" in tempDict: if tempDict['isStart']: statusDict[str(portNum + 1)] = {'status': Const.DEV_WORK_STATUS_WORKING} else: statusDict[str(portNum + 1)] = {'status': Const.DEV_WORK_STATUS_IDLE} else: statusDict[str(portNum + 1)] = {'status': Const.DEV_WORK_STATUS_IDLE} allPorts, usedPorts, usePorts = self.get_port_static_info(statusDict) Device.update_dev_control_cache(self._device['devNo'], {'allPorts': allPorts, 'usedPorts': usedPorts, 'usePorts': usePorts}) return statusDict def start_device(self, package, openId, attachParas): """ 启动设备 :param package: 套餐 :param openId: 用户 :param attachParas: 其余参数 :return: """ chargeIndex = attachParas.get("chargeIndex") if not chargeIndex: raise ServiceException({"result": 2, "description": u"请选择相应的充电端口号之后再启动充电"}) unit = package.get("unit") _time = package.get("time") coins = package.get("coins") def analyze_event_data(self, data): """ 设备上报的事件 :param data: :return: """