# coding=utf-8
import datetime
import json
import logging
import base64
import hmac
import time
import uuid
import typing
from hashlib import sha256
from urlparse import urljoin

import requests

from apps.web.api.zhejiang.constant import EventDeviceCategory
from apps.web.api.zhejiang.models import ZheJiangFireFight
from apps.web.api.zhejiang.exceptions import ZheJiangNotifierException
from apps.web.common.models import District

if typing.TYPE_CHECKING:
    from apps.web.device.models import DeviceDict, Part

logger = logging.getLogger(__name__)

# Timestamp layout used for every create/update/event time pushed by this module.
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def get_client_token(ak, sk, timestamp):
    """
    Build the value of the ``Client-Token`` request header.

    :param ak: access key; converted with ``str``.
    :param sk: secret key; converted with ``str`` and used as the HMAC key.
    :param timestamp: timestamp in seconds; it is multiplied by 1000 before use.
    :return: base64 of ``"<auth>:<ak>:<timestamp in ms>"`` where ``auth`` is the
        hex HMAC-SHA256 of ``base64(ak + timestamp in ms)`` keyed with ``sk``.
    """
    # The token carries the timestamp in milliseconds.
    timestamp *= 1000

    ak = str(ak)
    sk = str(sk)
    # The secret key is deliberately not written to the log.
    logger.info("[get_client_token] ak = {}-{}, timestamp = {}-{}".format(type(ak), ak, type(timestamp), timestamp))

    # String that gets signed.
    authStr = base64.b64encode("{}{}".format(ak, timestamp))
    # Signature; it must be the hexadecimal digest string.
    auth = hmac.new(key=sk, msg=authStr, digestmod=sha256).hexdigest()
    # Final token.
    clientToken = base64.b64encode("{}:{}:{}".format(auth, ak, timestamp))

    logger.info("[get_client_token] token = {}".format(clientToken))
    return clientToken


def verify_client_token(clientToken, ak, sk):
    """
    Check a client token against the given access key / secret key.

    Two token layouts are accepted: the base64-encoded form returned by
    ``get_client_token`` and the already decoded ``"<auth>:<ak>:<ms>"`` form.
    The ``auth`` part may be the hex digest or the base64 of the hex digest
    (the latter is what this function compared against before, so it is still
    accepted).

    :param clientToken: token string to verify.
    :param ak: expected access key.
    :param sk: secret key used to recompute the signature.
    :return: ``True`` when the access key and signature match and the
        millisecond timestamp in the token is not in the past; ``False``
        otherwise, including for malformed tokens.
    """
    if not clientToken:
        return False

    ak = str(ak)
    sk = str(sk)

    token = clientToken
    if ":" not in token:
        # No separator: treat it as the base64 form built by get_client_token.
        try:
            token = base64.b64decode(token)
        except (TypeError, ValueError):
            return False

    fields = token.split(":")
    if len(fields) != 3:
        return False
    vAuth, vAk, vt = fields

    if ak != vAk:
        return False

    authStr = base64.b64encode("{}{}".format(ak, vt))
    digest = hmac.new(key=sk, msg=authStr, digestmod=sha256).hexdigest()
    if vAuth not in (digest, base64.b64encode(digest)):
        return False

    # The timestamp is in milliseconds; a value in the past means the token expired.
    try:
        expireAt = int(vt)
    except ValueError:
        return False
    if int(time.time() * 1000) > expireAt:
        return False

    return True


def check_none_value(iterable):
    """
    Check that no required value is missing.

    Only ``None`` counts as missing; ``0`` and the empty string are accepted.

    :param iterable: values to inspect.
    :return: ``True`` when no element is ``None`` (also for an empty iterable),
        ``False`` as soon as one element is ``None``.
    """
    return all(value is not None for value in iterable)


class ZheJiangNotifier(object):
    """
    Thin HTTP client that POSTs company / device / part / event records.

    Every ``push_*`` method builds snake_case payload rows, rejects rows that
    contain ``None`` and sends them through ``_request``.
    """

    # Default base URL, used when no ``url`` keyword is given.
    URL = ""
    # Default offset added to ``int(time.time())`` (seconds) to build the token timestamp.
    EXPIRE_TIME = 1000

    def __init__(self, ak, sk, pid, **kwargs):
        """
        :param ak: access key used to sign requests.
        :param sk: secret key used to sign requests.
        :param pid: value sent as ``parent_id`` in every payload row.
        :param kwargs: optional ``url`` (base URL) and ``expire`` (token offset in seconds).
        """
        self._ak = ak
        self._sk = sk
        self._pid = pid

        self._baseUrl = kwargs.get("url", self.URL)
        self._expire = kwargs.get("expire", self.EXPIRE_TIME)

    def __str__(self):
        return "[ZheJiangNotifier]-<{}>".format(self._pid)

    def _get_header(self, **kwargs):
        """
        Build the request headers.

        :param kwargs: extra headers; an optional ``timeStamp`` entry (seconds) is
            consumed for the token instead of being sent as a header.
        :return: dict with ``Client-Token`` and ``Content-Type`` plus the extra headers.
        """
        timeStamp = kwargs.pop("timeStamp", None) or (int(time.time()) + self._expire)

        headers = {
            "Client-Token": get_client_token(self._ak, self._sk, timeStamp),
            "Content-Type": "application/json"
        }
        # Caller supplied headers win over the defaults.
        headers.update(kwargs)

        return headers

    def _request(self, **kwargs):
        """
        POST one batch of rows.

        :param kwargs: ``path`` (joined onto the base URL) and ``lists`` (payload rows)
            are required; ``opt`` (default 0) and ``headers`` (dict) are optional.
        :return: the decoded JSON body returned by ``_handle_response``.
        :raises ZheJiangNotifierException: when the request times out.
        """
        headers = kwargs.pop("headers", dict())
        headers = self._get_header(**headers)

        path = kwargs.pop("path")
        url = urljoin(self._baseUrl, path)

        # Operation type. The original note documents 1 = add or modify and 2 = delete,
        # yet every caller in this module passes 0 -- NOTE(review): confirm with the API spec.
        payload = {
            "opt_type": kwargs.pop("opt", 0),
            "lists": kwargs["lists"]
        }

        logger.info("[ZheJiangNotifier _request], notifier = {}, headers = {}, url = {}, push payload = {}".format(self, headers, url, json.dumps(payload, indent=4)))

        try:
            # NOTE(review): verify=False turns off TLS certificate verification -- confirm this is intended.
            response = requests.post(url=url, headers=headers, json=payload, verify=False, timeout=10)
        except requests.Timeout:
            raise ZheJiangNotifierException(u"请求超时")

        return self._handle_response(response)

    def _handle_response(self, response):
        """
        Turn an HTTP response into the decoded JSON result.

        :param response: object returned by ``requests.post``.
        :return: the decoded JSON body.
        :raises ZheJiangNotifierException: when ``raise_for_status`` fails.
        """
        try:
            response.raise_for_status()
        except requests.RequestException as ree:
            raise ZheJiangNotifierException(ree.message)

        result = response.json()
        logger.info("[ZheJiangNotifier _handle_response] notifier = {} result = {}".format(self, json.dumps(result, indent=4)))

        # TODO depending on how the codes are defined, the request may need to be re-sent
        code = result["code"]

        return result

    def push_company(self, *company):
        """
        Push company records, sending only the minimum set of fields.

        :param company: dicts with camelCase keys (``companyId``, ``companyName``, ...);
            a missing key is treated as ``None``. The dicts are not modified.
        :return: the decoded JSON response.
        :raises ZheJiangNotifierException: when any field of a record is ``None``.
        """
        lists = list()

        for _item in company:
            data = {
                "company_id": _item.get("companyId"),
                "company_name": _item.get("companyName"),
                "parent_id": self._pid,
                "company_code": _item.get("companyCode"),
                "address": _item.get("address"),
                "region_code": _item.get("regionCode"),
                "company_category": _item.get("companyCategory"),
                "company_type": _item.get("companyType"),
                "industry_type": _item.get("industryType"),
                "fire_manager": _item.get("fireManager"),
                "fire_manager_tel": _item.get("fireManagerTel"),
                "fire_liable": _item.get("fireLiable"),
                "fire_liable_tel": _item.get("fireLiableTel"),
                "create_time": _item.get("createTime"),
                "update_time": _item.get("updateTime"),
            }

            if not check_none_value(data.values()):
                raise ZheJiangNotifierException(u"推送单位参数错误")

            lists.append(data)

        logger.info("[ZheJiangNotifier push_company], notifier = {}, push data = {}".format(self, json.dumps(lists, indent=4)))
        response = self._request(opt=0, lists=lists, path="fire/company/update")
        return response

    def push_device(self, *device):
        """
        Push device records.

        ``device_type`` is always sent as "44" (the original note describes it as
        small, non-automobile charging pile) and ``relation_type`` as "1".

        :param device: dicts with ``deviceId``, ``logicalCode``, ``devNo``, ``location``,
            ``creator``, ``companyId``, ``createTime`` and ``updateTime``. The dicts are
            not modified.
        :return: the decoded JSON response.
        :raises KeyError: when a required key is absent.
        :raises ZheJiangNotifierException: when any field of a record is ``None``.
        """
        lists = list()

        for _item in device:
            data = {
                "device_id": _item["deviceId"],
                "device_name": _item["logicalCode"],
                # Same snake_case key as every other payload built in this class.
                "parent_id": self._pid,
                "device_code": _item["devNo"],
                "location": _item["location"],
                "device_manufactory": _item["creator"],
                "device_type": "44",
                "relation_type": "1",
                "relation_id": _item["companyId"],
                "create_time": _item["createTime"],
                "update_time": _item["updateTime"]
            }

            if not check_none_value(data.values()):
                raise ZheJiangNotifierException(u"推送设备参数错误")

            lists.append(data)

        logger.info("[ZheJiangNotifier push_device], notifier = {}, push data = {}".format(self, json.dumps(lists, indent=4)))
        response = self._request(opt=0, lists=lists, path="fire/device/update")
        return response

    def push_part(self, *part):
        """
        Push part records.

        ``parts_type`` is always sent as "137" (the original note describes it as
        charging port) and ``relation_type`` as "1".

        :param part: dicts with ``partId``, ``partName``, ``sensorCode``, ``address``,
            ``companyId``, ``deviceId``, ``createTime`` and ``updateTime``. The dicts are
            not modified.
        :return: the decoded JSON response.
        :raises KeyError: when a required key is absent.
        :raises ZheJiangNotifierException: when any field of a record is ``None``.
        """
        lists = list()

        for _item in part:
            data = {
                "part_id": _item["partId"],
                "part_name": _item["partName"],
                "parent_id": self._pid,
                "sensor_code": _item["sensorCode"],
                "address": _item["address"],
                "parts_type": "137",
                "relation_type": "1",
                "relation_id": _item["companyId"],
                "device_id": _item["deviceId"],
                "create_time": _item["createTime"],
                "update_time": _item["updateTime"]
            }

            if not check_none_value(data.values()):
                raise ZheJiangNotifierException(u"推送部件参数错误")

            lists.append(data)

        logger.info("[ZheJiangNotifier push_part], notifier = {}, push data = {}".format(self, json.dumps(lists, indent=4)))
        response = self._request(opt=0, lists=lists, path="fire/part/update")
        return response

    def push_device_state(self, *state):
        """
        Push device running-state records (intended for device online / offline events).

        :param state: dicts with ``eventId``, ``deviceCategory``, ``deviceId``,
            ``onlineStatus``, ``workStatus`` and ``eventTime``; the category and both
            statuses are converted with ``int``. The dicts are not modified.
        :return: the decoded JSON response.
        :raises KeyError: when a required key is absent.
        :raises ZheJiangNotifierException: when any field of a record is ``None``.
        """
        lists = list()

        for _item in state:
            data = {
                "event_id": _item["eventId"],
                "device_category": int(_item["deviceCategory"]),
                "device_id": _item["deviceId"],
                "parent_id": self._pid,
                "online_status": int(_item["onlineStatus"]),
                "work_status": int(_item["workStatus"]),
                "event_time": _item["eventTime"]
            }

            if not check_none_value(data.values()):
                raise ZheJiangNotifierException(u"推送设备运行状态错误")

            lists.append(data)

        logger.info("[ZheJiangNotifier push_device_state], notifier = {}, push data = {}".format(self, json.dumps(lists, indent=4)))
        response = self._request(opt=0, lists=lists, path="fire/devicestate/report")
        return response

    def push_fault(self, **kwargs):
        """
        Push one fault event.

        :param kwargs: ``eventId``, ``deviceCategory``, ``deviceId``, ``faultType`` and
            ``eventTime``; category and fault type are converted with ``int``.
        :return: the decoded JSON response.
        :raises KeyError: when a required keyword is absent.
        :raises ZheJiangNotifierException: when any field is ``None``.
        """
        data = {
            "event_id": kwargs["eventId"],
            "device_category": int(kwargs["deviceCategory"]),
            "device_id": kwargs["deviceId"],
            "parent_id": self._pid,
            "fault_type": int(kwargs["faultType"]),
            "event_time": kwargs["eventTime"]
        }

        logger.info("[ZheJiangNotifier push_fault], notifier = {}, push data = {}".format(self, json.dumps(data, indent=4)))

        if not check_none_value(data.values()):
            raise ZheJiangNotifierException(u"推送设备故障状态错误,缺少参数")

        response = self._request(opt=0, lists=[data], path="fire/fault/report")
        return response

    def push_fault_handle(self, **kwargs):
        """
        Push the handling record of one fault.

        :param kwargs: ``eventId``, ``deviceCategory``, ``deviceId``, ``faultType``,
            ``happenTime`` and ``processType``; when ``processType`` is the string "0",
            ``faultContent`` and ``reportpersonName`` are required as well.
        :return: the decoded JSON response.
        :raises KeyError: when a required keyword is absent.
        :raises ZheJiangNotifierException: when any field is ``None``.
        """
        data = {
            "event_id": kwargs["eventId"],
            "device_category": int(kwargs["deviceCategory"]),
            "device_id": kwargs["deviceId"],
            "parent_id": self._pid,
            "fault_type": int(kwargs["faultType"]),
            "happen_time": kwargs["happenTime"],
            "process_type": kwargs["processType"]
        }

        if data["process_type"] == "0":
            data.update({
                "fault_content": kwargs["faultContent"],
                "reportperson_name": kwargs["reportpersonName"]
            })

        logger.info("[ZheJiangNotifier push_fault_handle], notifier = {}, push data = {}".format(self, json.dumps(data, indent=4)))

        if not check_none_value(data.values()):
            raise ZheJiangNotifierException(u"推送设备信息故障操作信息失败,缺少参数")

        response = self._request(opt=0, lists=[data], path="fire/faultprocess/report")
        return response

    def push_alarm(self, **kwargs):
        """
        Push one fire-alarm event.

        :param kwargs: ``eventId``, ``deviceCategory``, ``deviceId``, ``alarmType`` and
            ``eventTime``; only the category is converted with ``int``.
        :return: the decoded JSON response.
        :raises KeyError: when a required keyword is absent.
        :raises ZheJiangNotifierException: when any field is ``None``.
        """
        data = {
            "event_id": kwargs["eventId"],
            "device_category": int(kwargs["deviceCategory"]),
            "device_id": kwargs["deviceId"],
            "parent_id": self._pid,
            "alarm_type": kwargs["alarmType"],
            "event_time": kwargs["eventTime"]
        }

        logger.info("[ZheJiangNotifier push_alarm], notifier = {}, push data = {}".format(self, json.dumps(data, indent=4)))

        if not check_none_value(data.values()):
            raise ZheJiangNotifierException(u"推送设备火警状态错误,缺少参数")

        response = self._request(opt=0, lists=[data], path="fire/firealarm/report")
        return response

    def push_alarm_handle(self, **kwargs):
        """
        Push the handling record of one fire alarm.

        :param kwargs: ``eventId``, ``deviceCategory``, ``deviceId``, ``alarmType``,
            ``checkTime``, ``handleTime``, ``processType`` and ``handleStatus``; category
            and alarm type are converted with ``int``.
        :return: the decoded JSON response.
        :raises KeyError: when a required keyword is absent.
        :raises ZheJiangNotifierException: when any field is ``None``.
        """
        data = {
            "event_id": kwargs["eventId"],
            "device_category": int(kwargs["deviceCategory"]),
            "device_id": kwargs["deviceId"],
            "parent_id": self._pid,
            "alarm_type": int(kwargs["alarmType"]),
            "check_time": kwargs["checkTime"],
            "handle_time": kwargs["handleTime"],
            "process_type": kwargs["processType"],
            "handle_status": kwargs["handleStatus"]
        }

        logger.info("[ZheJiangNotifier push_alarm_handle], notifier = {}, push data = {}".format(self, json.dumps(data, indent=4)))

        if not check_none_value(data.values()):
            raise ZheJiangNotifierException(u"推送设备火警信息故障操作信息失败,缺少参数")

        response = self._request(opt=0, lists=[data], path="fire/firealarmprocess/report")
        return response


class ZheJiangNorther(object):
    """
    Reports one ``ZheJiangFireFight`` record, its dealer's devices and their parts
    through a ``ZheJiangNotifier`` built from that record's credentials.
    """

    def __init__(self, fight):
        # type:(ZheJiangFireFight) -> None
        self._firefight = fight
        self._notifier = ZheJiangNotifier(
            ak=fight.ak,
            sk=fight.sk,
            pid=fight.parentId,
            url=fight.url
        )

    def _report_company(self):
        """Push the company described by the fire-fight record."""
        fight = self._firefight
        self._notifier.push_company({
            "companyId": str(fight.id),
            "companyName": fight.companyName,
            "companyCode": fight.companyCode,
            "address": fight.address,
            "regionCode": fight.regionCode,
            "companyCategory": fight.companyCategory,
            "companyType": fight.companyType,
            "industryType": fight.industryType,
            "fireManager": fight.fireManager,
            "fireManagerTel": fight.fireManagerTel,
            "fireLiable": fight.fireLiable,
            "fireLiableTel": fight.fireLiableTel,
            "createTime": fight.createTime.strftime(_TIME_FORMAT),
            "updateTime": fight.updateTime.strftime(_TIME_FORMAT)
        })

    def _report_devices(self):
        """Push every device owned by the dealer, then one state record per device."""
        dealer = self._firefight.dealer
        devices = dealer.get_own_devices()

        dataList = list()
        dataStateList = list()

        for _dev in devices:    # type: DeviceDict
            devObj = _dev.my_obj
            dataList.append({
                "deviceId": str(devObj.id),
                "logicalCode": _dev.logicalCode,
                "devNo": _dev.devNo,
                "location": District.get_district(devObj.districtId),
                "creator": devObj.mf,
                "companyId": str(self._firefight.id),
                # Formatted like updateTime so the payload can be serialised to JSON.
                "createTime": devObj.dateTimeAdded.strftime(_TIME_FORMAT),
                "updateTime": devObj.dateTimeUpdated.strftime(_TIME_FORMAT)
            })

            # A state record is pushed together with each device.
            dataStateList.append({
                "eventId": "".join(str(uuid.uuid4()).split("-")),
                "deviceCategory": EventDeviceCategory.DEVICE,
                "deviceId": str(devObj.id),
                "onlineStatus": 1,
                "workStatus": 1,
                "eventTime": datetime.datetime.now().strftime(_TIME_FORMAT)
            })

        # Nothing to report: do not send empty batches.
        if not dataList:
            return

        self._notifier.push_device(*dataList)
        self._notifier.push_device_state(*dataStateList)

    def _report_parts(self):
        """Push the parts of every device owned by the dealer, one request per device."""
        dealer = self._firefight.dealer
        devices = dealer.get_own_devices()

        for _dev in devices:     # type: DeviceDict
            _parts = _dev.parts
            # Same identifier that _report_devices pushes as the device id.
            deviceId = str(_dev.my_obj.id)

            partsList = list()
            for _part in _parts:    # type: Part
                partsList.append({
                    "partId": str(_part.id),
                    "partName": _part.partName,
                    "sensorCode": "",
                    "address": "",
                    "companyId": str(self._firefight.id),
                    "deviceId": deviceId,
                    "createTime": _part.dateTimeAdded.strftime(_TIME_FORMAT),
                    "updateTime": _part.dateTimeUpdated.strftime(_TIME_FORMAT)
                })

            # A device without parts produces no request.
            if partsList:
                self._notifier.push_part(*partsList)

    def report(self):
        """Push the company, then its devices (with states), then the device parts."""
        self._report_company()
        self._report_devices()
        self._report_parts()