# -*- coding: utf-8 -*- # !/usr/bin/env python """ 上海城运充电桩数据采集公共平台 接口ip与端口:http://211.136.105.235:8081/ljzwsq parentId:XJCLvU7Q2qJFYLkG secret:5cgj1M7YNMDvXlA7 """ import datetime import json import logging import traceback import requests from Crypto.Util.Padding import pad, unpad from Crypto.Cipher import AES from mongoengine import StringField from apps.web.constant import Const from apps.web.core.db import Searchable from apps.web.device.models import DeviceDict, Device SERVER_HOST = 'http://211.136.105.235:8081/ljzwsq' PARENT_ID = 'XJCLvU7Q2qJFYLkG' SECRET = '5cgj1M7YNMDvXlA7' logger = logging.getLogger(__name__) class AesUtil(object): def __init__(self, key=None): self.key = key or SECRET # 初始化密钥 self.length = AES.block_size # 初始化数据块大小 self.aes = AES.new(self.key.encode('utf-8'), AES.MODE_ECB) # 初始化AES,ECB模式的实例 def encrypt(self, encrData): # 加密函数 res = self.aes.encrypt(pad(encrData, self.length)) return res.encode('hex') def decrypt(self, decrData): # 解密函数 res = decrData.decode("hex") try: msg = self.aes.decrypt(res) return unpad(msg, self.length) except Exception, e: return None class ShangHaiUrbanDataCollectionPlatformModel(Searchable): dealerId = StringField(verbose_name='经销商') parentId = StringField(verbose_name='合作方认证 key') serverHost = StringField(verbose_name='接口地址') secret = StringField(verbose_name='密钥') meta = { 'collection': 'shanghai_urban_data_collection_platform' } @classmethod def get_dealer_info(cls, dealerId): obj = cls.objects.filter(dealerId=dealerId).first() if not obj: return {} else: return { 'dealerId': obj.dealerId, 'parentId': obj.parentId, 'serverHost': obj.serverHost, 'secret': obj.secret, } @classmethod def all_dealers(cls): objs = cls.objects.all() return map(lambda obj: { 'dealerId': obj.dealerId, 'parentId': obj.parentId, 'serverHost': obj.serverHost, 'secret': obj.secret, }, objs) class ShangHaiUrbanDataCollectionPlatform: def __init__(self, dev, serverHost, parentId, secret, **kw): self.device = dev # type: DeviceDict self.serverHost = serverHost self.parentId = parentId self.AES = AesUtil(secret) def post(self, url, data): try: kw = { 'parentId': self.parentId, 'data': data } logger.info( 'send to shanghai urban data collection platform url=< {} > data=< {} >'.format(url, json.dumps(data))) url = self.serverHost + url return requests.post(url=url, json=kw, timeout=3).json() except: logger.error(traceback.format_exc()) return {} # 新增充电桩基础信息 def add_dev_info(self): url = '/power/api/PushPowerBasicDataSave' data = { # "powerCode": self.device.logicalCode, "powerName": self.device.devTypeName, "code": self.device.logicalCode, "address": self.device.group.get('address', ''), "longitude": '{:0.3f}'.format(self.device.get('lng', 0.0)), "latitude": '{:0.3f}'.format(self.device.get('lat', 0.0)), "powerNodeList": [{'nodeIndex': '{}'.format(k), 'code': '{}-{}'.format(self.device.logicalCode, k)} for k in self.device.deviceAdapter.get_port_status() if k.isdigit()] } logger.info( 'send to shanghai urban data collection platform url=< {} > source=< {} >'.format(url, json.dumps(data, ensure_ascii=False))) data = self.AES.encrypt(json.dumps(data)) res = self.post(url, data) logger.info(json.dumps(res, ensure_ascii=False)) return res # 更新充电桩基础信息 def update_dev_info(self): url = '/power/api/PushPowerBasicDataUpdate' data = { # "powerCode": self.device.logicalCode, "powerName": self.device.devTypeName, "code": self.device.logicalCode, "longitude": '{:0.3f}'.format(self.device.get('lng', 0.0)), "latitude": '{:0.3f}'.format(self.device.get('lat', 0.0)), "powerNodeList": [{'nodeIndex': '{}'.format(k), 'code': '{}-{}'.format(self.device.logicalCode, k)} for k in self.device.deviceAdapter.get_port_status() if k.isdigit()] } logger.info( 'send to shanghai urban data collection platform url=< {} > source=< {} >'.format(url, json.dumps(data, ensure_ascii=False))) data = self.AES.encrypt(json.dumps(data)) res = self.post(url, data) logger.info(json.dumps(res, ensure_ascii=False)) return res # 充电桩状态心跳 def push_heatbeat(self): url = '/power/api/PowerNodeStatusBeat' powerNodeList = [] ports_info = self.device.deviceAdapter.get_port_status() for k, v in ports_info.items(): if k.isdigit(): powerNodeList.append( { "powerCode": self.device.logicalCode, # 充电桩 "code": '{}-{}'.format(self.device.logicalCode, k), # 端口 "isOnline": "在线" if self.device.online else "离线", "status": "充电中" if v.get('status') == Const.DEV_WORK_STATUS_WORKING else "空闲", "normalStatus": "故障" if v.get('status') == Const.DEV_WORK_STATUS_FAULT else "正常", "powerAlarmDetailList": [ { "powerCode": self.device.logicalCode, # 充电桩 "nodeCode": '{}-{}'.format(self.device.logicalCode, k), # 端口 "alarmType": "端口故障", "alarmContent": "端口故障", "alarmCreateDate": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') } ] if v.get('status') == Const.DEV_WORK_STATUS_FAULT else [] } ) data = {'powerNodeList': powerNodeList} logger.info( 'send to shanghai urban data collection platform url=< {} > source=< {} >'.format(url, json.dumps(data, ensure_ascii=False))) data = self.AES.encrypt(json.dumps(data)) res = self.post(url, data) logger.info(json.dumps(res, ensure_ascii=False)) return res def celery_push_heatbeat(self): res = self.push_heatbeat() if res['success'] == True: return else: if '不存在' in res.get('msg', ''): self.add_dev_info() self.update_dev_info() self.push_heatbeat() else: pass