# -*- coding: utf-8 -*- # !/usr/bin/env python import datetime,time import hashlib import hmac import json import logging import math import random import itsdangerous import requests from django.conf import settings from mongoengine import StringField, DateTimeField, IntField, QuerySet from bson import ObjectId from django.core.cache import cache from apps.web.agent.models import Agent from apps.web.api.utils import AES_CBC_PKCS5padding_encrypt, AES_CBC_PKCS5padding_decrypt from apps.web.common.models import District from apps.web.constant import Const from apps.web.core.db import Searchable from apps.web.dealer.models import Dealer from apps.web.device.models import Device, Group, Part, GroupDict, DeviceDict,SwapGroup,DeviceType from apps.web.user.models import ConsumeRecord,RechargeRecord from apps.web.south_intf.shangdong_platform import GB2260 from apps.web.constant import DeviceOnlineStatus logger = logging.getLogger(__name__) #: 互联互通签约合同信息 class SwapContract(Searchable): source = StringField(verbose_name = "签约渠道方,比如快电、新电途") swapLabel = StringField(verbose_name = "交换信息标记,比如微付乐和快电互通,就是wfl_kd",unique = True) operatorType = StringField(verbose_name = "签约运营商在我们系统中的类型, dealer/agent/manager", default = "manager") operatorInnerId = StringField(verbose_name = "运营商在我们系统中的的ID,比如manager ID") OperatorId = StringField(verbose_name = "运营商ID") #对于新营业执照(三码合一),其组织机构代码为社会信用代码去掉前八位和最后一位后的 中间的数字 OperatorName = StringField(verbose_name = "机构全称") OperatorTell = StringField(verbose_name = "运营商客服电话1") OperatorTel2 = StringField(verbose_name = "运营商客服电话2") OperatorRegAddress = StringField(verbose_name = "运营商注册地址") OperatorNote = StringField(verbose_name = "备注信息") deviceChangedTime = DateTimeField(default = datetime.datetime.now, verbose_name = '设备变化的最新时间') brokerage = IntField(verbose_name = "佣金比例", default = 10) #消息通讯相关字段 # 北向信息 即我们推送到平台所需要的信息时候需要用到的 northToken = StringField(verbose_name=u"token", default="") # 推送信息的时候的身份验证 northTokenExpiredTime = DateTimeField(verbose_name=u"token的过期时间", default=datetime.datetime.now) # 过期重新获取 northPort = StringField(verbose_name=u"推送的IP地址", default="") tokenSecret = StringField(verbose_name=u"tokenSecret", default="98YNJUDJDIE838KLMNXYNXL") # 我方登录北向平台的账号信息:OperatorID + northOperatorSecret密钥 secretFromUs = StringField(verbose_name=u"北向平台机构秘钥", default="") # 获取northToken时候使用,后续通讯需要token sigSecretFromUs = StringField(vebose_name=u"北向平台机构签名秘钥", default="") # 我方主动推送数据的时候, 加、解密 数据使用 dataSecretFromUs = StringField(verbose_name=u"北向数据秘钥", default="") # 我方主动推送数据的时候,加、解密 数据使用 dataSecretIVFromUs = StringField(verbose_name=u"北向数据秘钥向量", default="")# 我方主动推送数据的时候,加、解密 数据使用 # 我们的信息 即其他平台拉取的时候需要用到的 operatorId2Us = StringField(verbose_name=u"账号", default="") # 省平台登录我方服务器获取token时候使用(由我方提供) secret2Us = StringField(verbose_name=u"密码", default="") # 省平台登录我方服务器获取token时候使用(由我方提供) sigSecret2Us = StringField(vebose_name=u"平台机构秘钥", default="") # 省平台拉取我方服务器数据时候 加、解密数据使用 dataSecret2Us = StringField(verbose_name=u"数据秘钥", default="") # 省平台拉取我方服务器数据时候 加、解密数据使用 dataSecretIV2Us = StringField(verbose_name=u"数据秘钥", default="") # 省平台拉取我方服务器数据时候 加、解密数据使用 statusMap = { str(Const.DEV_WORK_STATUS_IDLE):1, str(Const.DEV_WORK_STATUS_WORKING):3, str(Const.DEV_WORK_STATUS_FAULT):255, str(Const.DEV_WORK_STATUS_FORBIDDEN):255, str(Const.DEV_WORK_STATUS_PAUSE):2, str(Const.DEV_WORK_STATUS_CONNECTED):2, str(Const.DEV_WORK_STATUS_FINISHED):2, str(Const.DEV_WORK_STATUS_MAINTENANCE):2, str(Const.DEV_WORK_STATUS_APPOINTMENT):4, str(Const.DEV_WORK_STATUS_FAULT_OVERLOAD):255, str(Const.DEV_WORK_STATUS_OCCUPY):2, str(Const.DEV_WORK_STATUS_ESTOP):2, str(Const.DEV_WORK_STATUS_READY):2, str(Const.DEV_WORK_STATUS_FAULT_RELAY_CONNECT):255 } @property def ipPort(self): return self.northPort def generate_json_token(self, data, expire=None): its = itsdangerous.TimedJSONWebSignatureSerializer(self.tokenSecret, expire) return its.dumps(data) def parse_json_token(self,s, expire=None): its = itsdangerous.TimedJSONWebSignatureSerializer(self.tokenSecret, expire) try: result = its.loads(s) except itsdangerous.BadData: return dict() return result def get_token_data(self): """ 获取token的载数据 身份验证以平台为维度获取 那么token的范围也以 平台为准 即 northOperatorID """ return { "id": str(self.id), } @classmethod def get_norther_by_label(cls,label): return cls.objects(swapLabel = label).first() @classmethod def get_norther(cls, **kwargs): # type:(dict) -> QuerySet """ 获取norther的时候 有两个维度获取方式 第一种,我方主动推送的方式,由于每一个norther的 dealerId 唯一 所以一定能找到唯一的一个norther 第二种,我方被动回复 由于省平台拉取信息是以平台为单位,即AgentOperator 此时的norther不唯一了(有可能同一个平台下 有很多经销商都要对接) """ filters = {} # 通过token的信息找 被动回复 不唯一 kwargs.get("id") and filters.update({"id": ObjectId(kwargs["id"])}) return cls.objects.filter(**filters) def get_sig(self, data, push=False): """ 生成签名字符串 根据使用的场景,确认签名的盐 :param data: 生成签名的数据 iter :param push: 推送还是拉取 :return: """ sigSecret = str(self.sigSecretFromUs) if push else str(self.sigSecret2Us) return hmac.new(sigSecret, data, hashlib.md5).hexdigest().upper() def send_request(self, url, **kwargs): """ 主动发送HTTP请求获取数据 秘钥以及签名 :param url: :param kwargs: :return: """ headers = {"Content-Type": "application/json;charset=utf-8"} token = kwargs.pop("token", None) if token: headers.update({"Authorization": "Bearer {}".format(token)}) timeout = kwargs.pop("timeout", 5) # 主动推送 加密以及向量为 dataSecret 和 dataSecretIV data = AES_CBC_PKCS5padding_encrypt( json.dumps(kwargs), dataSecret=self.dataSecretFromUs, dataSecretIV=self.dataSecretIVFromUs ) data = { "OperatorID": self.OperatorID, "TimeStamp": datetime.datetime.now().strftime("%Y%m%d%H%M%S"), "Seq": "{:0>4}".format(random.randint(1, 1)), "Data": data } sig = self.get_sig(data.get("OperatorID") + data.get("Data") + data.get("TimeStamp") + data.get("Seq"), push=True) data.update({"Sig": sig}) try: response = requests.post(url = url, json = data, headers = headers, timeout = timeout) except requests.Timeout: return dict() except Exception as e: logger.exception(e) return dict() if response.status_code != 200: return dict() # 这个地方的解密 仅仅是为了解密数据 打印日志 try: responseData = response.json().get("Data", "") if responseData: responseData = json.loads( AES_CBC_PKCS5padding_decrypt(responseData) or "{}" ) logger.info("response result:{}".format(response.json())) logger.info("receive responseData:{}".format(responseData)) except Exception,e: return dict() return response.json() def join_url(self, path): """ 拼接url :param path: :return: """ # 版本号的确定 return "{ipPort}/{path}".format( ipPort=self.ipPort, path=path ) def get_token(self): """ 获取平台的token 更新token有效期 :return: """ if self.northToken and self.northTokenExpiredTime > datetime.datetime.now(): return self.northToken url = self.join_url("query_token") data = { "OperatorID": self.OperatorID, "OperatorSecret": self.secretFromUs } result = self.send_request(url, **data) ret = result.get("Ret") if ret != 0: return responseJson = result.get("Data") responseData = json.loads( AES_CBC_PKCS5padding_decrypt(s=responseJson, dataSecret=self.dataSecretFromUs, dataSecretIV=self.dataSecretIVFromUs) or "{}" ) # 防止解析出错 tokenAvailableTime = responseData.get("TokenAvailableTime", 0) token = responseData.get("AccessToken", "") # 数据库更新 self.update( northToken=token, northTokenExpiredTime=datetime.datetime.now()+datetime.timedelta(seconds = tokenAvailableTime) ) return token def notification_station(self, groupId): """ 充电站信息上报 GROUP :param groupId: :return: """ data = self.get_station(groupId, self) url = self.join_url("notification_stationInfo") token = self.get_token() self.send_request(url = url, token = token, StationInfo = [data]) def notification_order_info(self, consumeDict, stopReason = None): """ 当运营商平台完成一次充电时,将订单信息推送至省级平台。 :param consumeDict: :param stopReason: :return: """ # 参数格式化 _time = datetime.datetime.now().strftime("%Y%m%d%H%M%S") totalPower = float("{:.2f}".format(float(consumeDict.get("totalPower")))) totalElecMoney = float("{:.2f}".format(float(consumeDict.get("totalElecMoney")))) seq = self.agentOperatorID + _time + str(random.randint(0, 9999)) logger.info("Shan Dong Norther send order Info, seq is {}".format(seq)) data = { "StartChargeSeq": seq, "ConnectorID": consumeDict.get("connectorId"), "StartTime": consumeDict.get("startTime"), "EndTime": consumeDict.get("endTime"), "TotalPower": totalPower, "TotalElecMoney": totalElecMoney, "TotalSeviceMoney": 0.00, "TotalMoney": totalElecMoney, "StopReason": stopReason or 1 } logger.info('notification_order_info:{}'.format(data)) url = self.join_url("notification_orderInfo") token = self.get_token() self.send_request(url, token = token, **data) def retry_push_notification_order_info(self, retry_push_dict): """ 能源局出现问题需要手动推送时候找到日志然后执行 此函数只能手动执行 """ data = retry_push_dict.copy() url = self.join_url("notification_orderInfo") token = self.get_token() self.send_request(url, token = token, **data) logger.info('retry_push_notification_order_info is ok seq={}'.format(retry_push_dict['StartChargeSeq'])) def alarm_report(self, devNo, code, desc = None, status = None): """ 当充电接口发生异常告警或故障时,运营商企业平台主动推送信息到省级平台。 :param devNo: :param code: :param desc: :param status: :return: """ desc = desc or u"设备故障" status = status or 0 data = { "equipmentID": devNo, "alert_time": str(datetime.datetime.now())[19:], "alert_code": code, "describe": desc, "status": status } logger.info('alarm_report:{}'.format(data)) url = self.join_url("alarm_report") token = self.get_token() self.send_request(url, token = token, AlarmInfos = [data]) def notification_stationStatus(self, connectorId,status): data = { "ConnectorID": str(connectorId), "Status": status, } url = self.join_url("notification_stationStatus") token = self.get_token() self.send_request(url, token = token, ConnectorStatusInfo = data) def get_station(self,swap): """ 获取站点信息 :param groupId: :param norther: :return: """ # 获取设备信息 EquipmentInfos = [] devNos = Device.get_devNos_by_group([swap.groupId]) for devNo in devNos: dev = Device.get_dev(devNo) # type: DeviceDict if (u'交流' not in dev.majorDeviceType) and (u'直流' not in dev.majorDeviceType) : continue devInfo = self.get_equipment(dev) if devInfo: EquipmentInfos.append(devInfo) # 充电站信息 group = Group.get_group(swap.groupId) data = { "StationID": swap.StationID, # 充电站ID 20 "OperatorID": self.OperatorID, # 组织机构代码 9 "EquipmentOwnerID": self.OperatorID, # 设备所属方组织机构代码 9 "StationName": group.get("groupName"), # 充电站名称描述 50 "CountryCode": "CN", # 国家代码 固定 "AreaCode": GB2260.get_code(District.get_area(group.get("districtId"))), # 地区编码 20 "Address": group.get("address"), # 详细地址 50 "StationTel": swap.StationTel, # 站点责任人电话, "ServiceTel": swap.ServiceTel, # 站点服务电话 "StationType": swap.StationType, # 站点类型 "StationStatus": swap.StationStatus, # 站点状态 "ParkNums": swap.ParkNums, # 车位数量 0代表未知 "StationLng": "{:.6f}".format(float(swap.gcjLng)), # 精度(6位小数) "StationLat": "{:.6f}".format(float(swap.gcjLat)), # 维度(6位小数) 'SiteGuide':swap.SiteGuide, "Construction": swap.Construction, # 建设场所 'Pictures':[pic['Url'] for pic in swap.Pictures], 'MatchCars':swap.MatchCars, 'Parkinfo':swap.ParkInfo, 'BusineHours':swap.BusineHours, 'ElectricityFee':swap.ElectricityFee, 'ServiceFee':swap.ServiceFee, "ParkFee": swap.ParkFee, "Payment": swap.Payment, "SupportOrder": 0, # 是否支持预约 'Remark':swap.Remark, "EquipmentInfos": EquipmentInfos, # 充电站信息 } return data def get_equipment(self,dev): # type:(DeviceDict, SwapContract) -> dict ConnectorInfo = [] devType = DeviceType.objects(id = dev['devType']['id']).first() if devType is None: return None for part in dev.parts: ConnectorInfo.append(self.get_connector(devType,part)) EquipmentType = 1 if u'交流' in devType.majorDeviceType: EquipmentType = 2 elif u'交直流' in devType.majorDeviceType: EquipmentType = 3 data = { "EquipmentID": dev.devNo, 'ManufacturerID':devType.extraInfo['ManufacturerID'], "ManufacturerName": devType.extraInfo['ManufacturerName'], "EquipmentModel": devType.extraInfo['ManufacturerName'], "EquipmentType": EquipmentType, "ConnectorInfos": ConnectorInfo, 'Power':devType.extraInfo['portPower'], "EquipmentName": dev.get("logicalCode"), } return data def get_connector(self,devType,part): # type:(Part, SwapContract) -> dict """ 获取部件信息 """ return { "ConnectorID": str(part.id), "ConnectorName": part.partName, "ConnectorType": devType.extraInfo['ConnectorType'], "VoltageUpperLimits": devType.extraInfo['VoltageUpperLimits'], "VoltageLowerLimits": devType.extraInfo['VoltageLowerLimits'], "Current": devType.extraInfo['portCurrent'], "Power": devType.extraInfo['portPower'], "ParkNo": "-", "NationalStandard": 1, } @staticmethod def get_station_state(stationId, startTime, endTime): """ 获取充电站的 一段时间内的统计信息 主要是电量 :param groupId: :param startTime: :param endTime :return: """ swap = SwapGroup.objects(StationID = stationId).first() if not swap: return {} EquipmentStatsInfos = [] devNos = Device.get_devNos_by_group([swap.groupId]) for devNo in devNos: dev = Device.get_dev(devNo) # type: DeviceDict if (u'交流' not in dev.majorDeviceType) and (u'直流' not in dev.majorDeviceType) : continue tempState = SwapContract.get_equipment_state(dev, startTime, endTime) EquipmentStatsInfos.append(tempState) return { "EquipmentStatsInfos": EquipmentStatsInfos } @staticmethod def get_equipment_state(dev, startTime, endTime): """ 获取充电设备的 一段时间内的统计信息 主要是电量 :param devNo: :param startTime: :param endTime: :return: """ ConnectorStatsInfo = [] devElec = 0 filters = { "devNo": dev['devNo'], "finishedTime__gte": startTime, "finishedTime__lte": endTime, } records = ConsumeRecord.objects.filter(**filters).only("servicedInfo","attachParas") portElecDict = {} for item in records: elec = item.servicedInfo.get("elec", 0.0) devElec += elec portNo = str(item.servicedInfo['chargeIndex']) if portNo not in portElecDict: portElecDict[portNo] = 0.0 else: portElecDict[portNo] += elec for part in dev.parts: elec = portElecDict.get(str(part.partNo),0.0) ConnectorStatsInfo.append({"ConnectorID": str(part.id),"ConnectorElectricity": float("{:.1f}".format(float(elec)))}) return { "EquipmentID": dev['devNo'], "EquipmentElectricity": devElec, "ConnectorStatsInfos": ConnectorStatsInfo } @staticmethod def get_connector_status_infos(stationId): """ 获取充电站的当前状态 :param groupId: :return: """ swap = SwapGroup.objects(StationID = stationId).first() if not swap: return [] ConnectorStatusInfos = [] devNos = Device.get_devNos_by_group([swap.groupId]) for devNo in devNos: device = Device.get_dev(devNo) if (u'交流' not in device.majorDeviceType) and (u'直流' not in device.majorDeviceType) : continue parts = Part.objects.filter(logicalCode = device.logicalCode) online = device.get("online", True) for part in parts: # 判断端口当前状态 if not online: status = 0 else: status = SwapContract.statusMap.get(part.status,1) data = { "ConnectorID": str(part.id), "Status": status, } ConnectorStatusInfos.append(data) return ConnectorStatusInfos @staticmethod def get_policy_info(partId): """ 获取 端口的计费信息 :param partId: :return: """ DEFAULT_ELEC_PRICE = 1.500 ELEC_FUNCS = [SwapContract.get_elec_price_by_package, SwapContract.get_elec_price_by_conf, SwapContract.get_elec_price_by_consume] part = Part.objects.filter(id = partId).first() if not part: return devNo = Device.get_devNo_by_logicalCode(part.logicalCode) for func in ELEC_FUNCS: try: elecPrice = func(devNo) except Exception: elecPrice = None if elecPrice: break else: elecPrice = DEFAULT_ELEC_PRICE return { "StartTime": "000000", "ElecPrice": elecPrice, "SevicePrice": None } @staticmethod def get_elec_price_by_package(devNo): """ 通过套餐获取电费 :param devNo: :return: """ device = Device.get_dev(devNo) package = device.get("washConfig", dict()).get("1", dict()) if not package: return price = package.get("price") time = package.get("time") unit = package.get("unit") if unit != u"度": return if not all([price, time]): return try: elecPrice = float("{:.4f}".format(float(price) / float(time))) except ZeroDivisionError: return return elecPrice @staticmethod def get_elec_price_by_conf(devNo): """ 通过设备设置设置电费 :param devNo: :return: """ device = Device.get_dev(devNo) elecPrice = device.get("otherConf", dict()).get("elecPrice") return float("{:.4f}".format(float(elecPrice))) @staticmethod def get_elec_price_by_consume(devNo): """ 通过 最近一次的消费记录获取电费 :param devNo: :return: """ record = ConsumeRecord.objects.filter(devNo = devNo).sort("-id").first() if not record or not record.servicedInfo: return elec = record.servicedInfo.get("elec") spend = record.servicedInfo.get("spend") if not all([elec, spend]): return try: elecPrice = float("{:.4f}".format(float(spend) / float(elec))) except ZeroDivisionError: return return elecPrice @staticmethod def find_need_notify_northers(dev): ownerId = dev['ownerId'] owner = Dealer.objects(id = ownerId).first() if owner is None: return [] agent = Agent.objects(id = owner.agentId).first() if agent is None: return [] northers = SwapContract.objects(operatorInnerId__in = [ownerId,owner.agentId,agent.managerId]) return northers @staticmethod def notify_2_all_northers_port_status(dev,portNo,status): status = SwapContract.statusMap.get(status,1) northers = SwapContract.find_need_notify_northers(dev) part = Part.objects(logicalCode = dev['logicalCode'],partNo = str(portNo)).first() for norther in northers: logger.info('norther label=%s,port=%s,notification_stationStatus=%s' % (norther.swapLabel,portNo,status)) norther.notification_stationStatus(str(part.id),status) @staticmethod def notify_2_all_northers_port_network_status(dev,networkStatus,workStatus=1): if networkStatus == DeviceOnlineStatus.DEV_STATUS_OFFLINE: status = 0# 离线 else: status = SwapContract.statusMap.get(workStatus,1) # 如果是在线,应该以设备实际的工作状态为主 northers = SwapContract.find_need_notify_northers(dev) for norther in northers: ports = Part.objects(logicalCode = dev['logicalCode']) for port in ports: logger.info('norther label=%s,port=%s,notification_stationStatus=%s' % (norther.swapLabel,port,status)) norther.notification_stationStatus(port.id,status) @staticmethod def notify_2_all_northers_order_status(dev,orderStatus): northers = SwapContract.find_need_notify_northers(dev) for norther in northers: logger.info('norther label=%s,notification_stationStatus=%s' % (norther.swapLabel,orderStatus)) url = norther.join_url("notification_equip_charge_status") token = norther.get_token() norther.send_request(url, token = token, **orderStatus) def notification_start_charge_result(self,data): data.update({'StartTime':datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) url = self.join_url("notification_start_charge_result") token = self.get_token() self.send_request(url, token = token, **data) def notification_stop_charge_result(self,data): url = self.join_url("notification_stop_charge_result") token = self.get_token() self.send_request(url, token = token, **data) @staticmethod def notify_2_all_northers_order_info(dev,data): northers = SwapContract.find_need_notify_northers(dev) for norther in northers: url = norther.join_url("notification_charge_order_info") logger.info('norther label=%s,notification_charge_order_info=%s' % (norther.swapLabel,data)) token = norther.get_token() norther.send_request(url, token = token, **data) def check_charge_orders(self,startTime,endTime): rechargeRcds = RechargeRecord.get_collection().find({'time':{'$gte':startTime,'$lte':endTime},'extraInfo.swapSource':self.label}) OrderCount = 0 TotalOrderPower = 0 TotalOrderMoney = 0 ChargeOrders = [] for rcd in rechargeRcds: consumeRcd = ConsumeRecord.objects(orderNo = rcd['extraInfo']['consumeOrderNo']).first() if consumeRcd is None: continue ChargeOrders.append({ 'StartChargeSeq':rcd.wxOrderNo, 'TotalPower':consumeRcd.servicedInfo.get('TotalPower',0), 'TotalMoney':consumeRcd.servicedInfo.get('TotalMoney',0) }) TotalOrderPower += consumeRcd.servicedInfo.get('TotalPower',0) TotalOrderMoney += consumeRcd.servicedInfo.get('TotalMoney',0) OrderCount += 1 result = { 'CheckOrderSeq':self.OperatorID + str(time.time()*1000) + str(random.randint(10000,99999)), 'StartTime':startTime, 'EndTime':endTime, 'OrderCount':OrderCount, 'TotalOrderPower':TotalOrderPower, 'TotalOrderMoney':TotalOrderMoney, 'ChargeOrders':ChargeOrders } url = self.join_url("notification_charge_order_infb") token = self.get_token() result = self.send_request(url, token = token, **result) ret = result.get("Ret") if ret != 0: return {} responseJson = result.get("Data") responseData = json.loads( AES_CBC_PKCS5padding_decrypt(s=responseJson, dataSecret=self.dataSecretFromUs, dataSecretIV=self.dataSecretIVFromUs) or "{}" ) return responseData @staticmethod def update_swap_time_and_num(groupId,ownerId,agentId,managerId,devNum=0): northers = SwapContract.objects(operatorInnerId__in = [ownerId,str(agentId),str(managerId)]) for norther in northers: norther.deviceChangedTime = datetime.datetime.now() try: norther.save() except Exception,e: continue