123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439 |
- # coding=utf-8
- import datetime
- import json
- import logging
- import base64
- import hmac
- import time
- import uuid
- import typing
- from hashlib import sha256
- from urlparse import urljoin
- import requests
- from apps.web.api.zhejiang.constant import EventDeviceCategory
- from apps.web.api.zhejiang.models import ZheJiangFireFight
- from apps.web.api.zhejiang.exceptions import ZheJiangNotifierException
- from apps.web.common.models import District
- if typing.TYPE_CHECKING:
- from apps.web.device.models import DeviceDict, Part
- logger = logging.getLogger(__name__)
def _to_bytes(value):
    """Return ``value`` as a byte string, UTF-8 encoding it when it is text."""
    if isinstance(value, bytes):
        return value
    return value.encode("utf-8")


def get_client_token(ak, sk, timestamp):
    """
    Build the ``Client-Token`` value used to sign a request.

    The token is ``base64("<auth>:<ak>:<timestamp_ms>")`` where ``auth`` is the
    hex HMAC-SHA256 digest, keyed with ``sk``, of ``base64("<ak><timestamp_ms>")``.

    :param ak: access key; converted with ``str()``.
    :param sk: secret key; converted with ``str()``. It is never logged.
    :param timestamp: time in seconds; it is converted to milliseconds here.
    :return: the token as a native ``str``.
    """
    # The platform expects the timestamp in milliseconds.
    timestamp *= 1000

    ak = str(ak)
    sk = str(sk)

    # The secret key is deliberately left out of the log line.
    logger.info("[get_client_token] ak = %s, timestamp = %s", ak, timestamp)

    # Authentication string: base64 of the access key followed by the timestamp.
    authStr = base64.b64encode(_to_bytes("{}{}".format(ak, timestamp)))

    # Authentication digest; it must be the hexadecimal form.
    auth = hmac.new(key=_to_bytes(sk), msg=authStr, digestmod=sha256).hexdigest()

    clientToken = base64.b64encode(_to_bytes("{}:{}:{}".format(auth, ak, timestamp)))
    # b64encode returns bytes on Python 3; hand back a native str either way.
    if not isinstance(clientToken, str):
        clientToken = clientToken.decode("ascii")

    logger.info("[get_client_token] token = %s", clientToken)
    return clientToken
def verify_client_token(clientToken, ak, sk):
    """
    Check a client token against the given access key / secret key pair.

    Two token spellings are accepted:

    * ``base64("<auth>:<ak>:<timestamp_ms>")`` with ``auth`` being the hex
      HMAC-SHA256 digest, i.e. what ``get_client_token`` produces;
    * the raw ``"<auth>:<ak>:<timestamp_ms>"`` form with ``auth`` being the
      base64 of that hex digest, which this function accepted historically.

    :param clientToken: token to verify (text or bytes).
    :param ak: expected access key; converted with ``str()``.
    :param sk: secret key used to recompute the digest; converted with ``str()``.
    :return: True when the access key matches, the digest is valid and the
        millisecond timestamp has not passed yet; False otherwise, including
        for malformed tokens.
    """
    def as_bytes(value):
        return value if isinstance(value, bytes) else value.encode("utf-8")

    ak = str(ak)
    sk = str(sk)

    try:
        raw = as_bytes(clientToken)
    except (AttributeError, UnicodeError):
        return False

    # The base64 alphabet has no ":", so a token without one is still wrapped.
    if b":" not in raw:
        try:
            raw = base64.b64decode(raw)
        except (TypeError, ValueError):
            return False

    fields = raw.split(b":")
    if len(fields) != 3:
        return False
    vAuth, vAk, vt = fields

    akBytes = as_bytes(ak)
    if akBytes != vAk:
        return False

    authStr = base64.b64encode(akBytes + vt)
    digest = as_bytes(hmac.new(key=as_bytes(sk), msg=authStr, digestmod=sha256).hexdigest())

    # Prefer a constant-time comparison; very old interpreters lack it.
    compare = getattr(hmac, "compare_digest", None) or (lambda a, b: a == b)
    candidates = (digest, base64.b64encode(digest))
    if not any([compare(vAuth, candidate) for candidate in candidates]):
        return False

    # The timestamp is in milliseconds and marks when the token stops being valid.
    try:
        expiresAt = int(vt)
    except ValueError:
        return False
    return int(time.time() * 1000) <= expiresAt
def check_none_value(iterable):
    """
    Tell whether every required value has been supplied.

    Only ``None`` counts as missing; ``0`` and the empty string are treated
    as supplied values.

    :param iterable: the values to inspect.
    :return: True when no element is ``None``, False as soon as one is.
    """
    for value in iterable:
        if value is None:
            return False
    return True
class ZheJiangNotifier(object):
    """
    HTTP client that pushes fire-fighting records to the ZheJiang platform.

    Every push is a JSON POST carrying a ``Client-Token`` header derived from
    the access key / secret key pair. ``pid`` is sent as the parent id of
    every record. None of the ``push_*`` methods modify the dictionaries
    handed in by the caller.
    """

    # Base URL used when the constructor receives no ``url`` keyword.
    URL = ""
    # Default token lifetime in seconds, added to the current time.
    EXPIRE_TIME = 1000

    def __init__(self, ak, sk, pid, **kwargs):
        """
        :param ak: access key used to sign requests.
        :param sk: secret key used to sign requests.
        :param pid: parent id attached to every pushed record.
        :param kwargs: optional ``url`` (base URL) and ``expire`` (token
            lifetime in seconds); class defaults apply when absent.
        """
        self._ak = ak
        self._sk = sk
        self._pid = pid
        self._baseUrl = kwargs.get("url", self.URL)
        self._expire = kwargs.get("expire", self.EXPIRE_TIME)

    def __str__(self):
        return "[ZheJiangNotifier]-<{}>".format(self._pid)

    def _get_header(self, **kwargs):
        """
        Build the request headers.

        ``timeStamp`` (seconds) may be passed to pin the token timestamp;
        otherwise "now + expire" is used. Any other keyword is added to the
        headers as-is and may override the defaults.
        """
        timeStamp = kwargs.pop("timeStamp", None) or (int(time.time()) + self._expire)
        headers = {
            "Client-Token": get_client_token(self._ak, self._sk, timeStamp),
            "Content-Type": "application/json",
        }
        headers.update(kwargs)
        return headers

    def _request(self, **kwargs):
        """
        POST one payload to the platform and return the decoded response.

        :param kwargs: ``path`` (required, joined onto the base URL), ``lists``
            (required, the records), ``opt`` (operation type, default 0) and
            ``headers`` (optional extra headers).
        :raises ZheJiangNotifierException: on timeout, HTTP error status or an
            undecodable response body.
        """
        headers = self._get_header(**kwargs.pop("headers", dict()))
        url = urljoin(self._baseUrl, kwargs.pop("path"))

        # Operation type on the data: 1 = create or modify, 2 = delete.
        # NOTE(review): every caller in this class passes opt=0 - confirm
        # against the platform specification.
        payload = {
            "opt_type": kwargs.pop("opt", 0),
            "lists": kwargs["lists"],
        }
        logger.info("[ZheJiangNotifier _request], notifier = {}, headers = {}, url = {}, push payload = {}".format(self, headers, url, json.dumps(payload, indent=4)))

        try:
            # NOTE(review): verify=False disables TLS certificate checking;
            # confirm the endpoint really requires it.
            response = requests.post(url=url, headers=headers, json=payload, verify=False, timeout=10)
        except requests.Timeout:
            raise ZheJiangNotifierException(u"请求超时")
        return self._handle_response(response)

    def _handle_response(self, response):
        """
        Validate the HTTP status and decode the JSON body.

        :raises ZheJiangNotifierException: when the status is an error or the
            body is not valid JSON.
        """
        try:
            response.raise_for_status()
        except requests.RequestException as ree:
            # ``message`` only exists on Python 2 exceptions; fall back to str().
            raise ZheJiangNotifierException(getattr(ree, "message", None) or str(ree))

        try:
            result = response.json()
        except ValueError:
            raise ZheJiangNotifierException(u"响应内容不是合法的JSON")

        logger.info("[ZheJiangNotifier _handle_response] notifier = {} result = {}".format(self, json.dumps(result, indent=4)))
        # TODO: depending on how the "code" field is defined, the request may
        # need to be re-sent.
        return result

    def push_company(self, *company):
        """
        Push company records, sending only the minimum required information.

        :param company: dictionaries keyed in camelCase (``companyId``,
            ``companyName``, ...); a missing key counts as ``None``.
        :raises ZheJiangNotifierException: when any field is ``None``.
        """
        lists = list()
        for _item in company:
            data = {
                "company_id": _item.get("companyId"),
                "company_name": _item.get("companyName"),
                "parent_id": self._pid,
                "company_code": _item.get("companyCode"),
                "address": _item.get("address"),
                "region_code": _item.get("regionCode"),
                "company_category": _item.get("companyCategory"),
                "company_type": _item.get("companyType"),
                "industry_type": _item.get("industryType"),
                "fire_manager": _item.get("fireManager"),
                "fire_manager_tel": _item.get("fireManagerTel"),
                "fire_liable": _item.get("fireLiable"),
                "fire_liable_tel": _item.get("fireLiableTel"),
                "create_time": _item.get("createTime"),
                "update_time": _item.get("updateTime"),
            }
            if not check_none_value(data.values()):
                raise ZheJiangNotifierException(u"推送单位参数错误")
            lists.append(data)

        logger.info("[ZheJiangNotifier push_company], notifier = {}, push data = {}".format(self, json.dumps(lists, indent=4)))
        response = self._request(opt=0, lists=lists, path="fire/company/update")
        return response

    def push_device(self, *device):
        """
        Push device records; the device type is fixed to "small charging
        pile (non-automobile)".

        :param device: dictionaries with ``deviceId``, ``logicalCode``,
            ``devNo``, ``location``, ``creator``, ``companyId``,
            ``createTime`` and ``updateTime``.
        :raises KeyError: when a key is absent.
        :raises ZheJiangNotifierException: when any field is ``None``.
        """
        lists = list()
        for _item in device:
            data = {
                "device_id": _item["deviceId"],
                "device_name": _item["logicalCode"],
                # NOTE(review): every other payload spells this "parent_id";
                # confirm which key the device endpoint expects.
                "parentId": self._pid,
                "device_code": _item["devNo"],
                "location": _item["location"],
                "device_manufactory": _item["creator"],
                "device_type": "44",
                "relation_type": "1",
                "relation_id": _item["companyId"],
                "create_time": _item["createTime"],
                "update_time": _item["updateTime"],
            }
            if not check_none_value(data.values()):
                raise ZheJiangNotifierException(u"推送设备参数错误")
            lists.append(data)

        logger.info("[ZheJiangNotifier push_device], notifier = {}, push data = {}".format(self, json.dumps(lists, indent=4)))
        response = self._request(opt=0, lists=lists, path="fire/device/update")
        return response

    def push_part(self, *part):
        """
        Push part records; the part type is fixed to "charging port".

        :param part: dictionaries with ``partId``, ``partName``,
            ``sensorCode``, ``address``, ``companyId``, ``deviceId``,
            ``createTime`` and ``updateTime``.
        :raises KeyError: when a key is absent.
        :raises ZheJiangNotifierException: when any field is ``None``.
        """
        lists = list()
        for _item in part:
            data = {
                "part_id": _item["partId"],
                "part_name": _item["partName"],
                "parent_id": self._pid,
                "sensor_code": _item["sensorCode"],
                "address": _item["address"],
                "parts_type": "137",
                "relation_type": "1",
                "relation_id": _item["companyId"],
                "device_id": _item["deviceId"],
                "create_time": _item["createTime"],
                "update_time": _item["updateTime"],
            }
            if not check_none_value(data.values()):
                raise ZheJiangNotifierException(u"推送部件参数错误")
            lists.append(data)

        logger.info("[ZheJiangNotifier push_part], notifier = {}, push data = {}".format(self, json.dumps(lists, indent=4)))
        response = self._request(opt=0, lists=lists, path="fire/part/update")
        return response

    def push_device_state(self, *state):
        """
        Push device running states; intended for when a device goes online
        or offline.

        :param state: dictionaries with ``eventId``, ``deviceCategory``,
            ``deviceId``, ``onlineStatus``, ``workStatus`` and ``eventTime``.
            The category and both statuses are converted with ``int()``.
        :raises KeyError: when a key is absent.
        :raises ZheJiangNotifierException: when any field is ``None``.
        """
        lists = list()
        for _item in state:
            data = {
                "event_id": _item["eventId"],
                "device_category": int(_item["deviceCategory"]),
                "device_id": _item["deviceId"],
                "parent_id": self._pid,
                "online_status": int(_item["onlineStatus"]),
                "work_status": int(_item["workStatus"]),
                "event_time": _item["eventTime"],
            }
            if not check_none_value(data.values()):
                raise ZheJiangNotifierException(u"推送设备运行状态错误")
            lists.append(data)

        logger.info("[ZheJiangNotifier push_device_state], notifier = {}, push data = {}".format(self, json.dumps(lists, indent=4)))
        response = self._request(opt=0, lists=lists, path="fire/devicestate/report")
        return response

    def push_fault(self, **kwargs):
        """
        Push a single fault event.

        :param kwargs: ``eventId``, ``deviceCategory``, ``deviceId``,
            ``faultType`` and ``eventTime``.
        :raises KeyError: when a keyword is absent.
        :raises ZheJiangNotifierException: when any field is ``None``.
        """
        data = {
            "event_id": kwargs["eventId"],
            "device_category": int(kwargs["deviceCategory"]),
            "device_id": kwargs["deviceId"],
            "parent_id": self._pid,
            "fault_type": int(kwargs["faultType"]),
            "event_time": kwargs["eventTime"],
        }
        logger.info("[ZheJiangNotifier push_fault], notifier = {}, push data = {}".format(self, json.dumps(data, indent=4)))

        if not check_none_value(data.values()):
            raise ZheJiangNotifierException(u"推送设备故障状态错误,缺少参数")

        response = self._request(opt=0, lists=[data], path="fire/fault/report")
        return response

    def push_fault_handle(self, **kwargs):
        """
        Push the handling record of a fault.

        :param kwargs: ``eventId``, ``deviceCategory``, ``deviceId``,
            ``faultType``, ``happenTime`` and ``processType``; when
            ``processType`` is ``"0"``, ``faultContent`` and
            ``reportpersonName`` are required as well.
        :raises KeyError: when a required keyword is absent.
        :raises ZheJiangNotifierException: when any field is ``None``.
        """
        data = {
            "event_id": kwargs["eventId"],
            "device_category": int(kwargs["deviceCategory"]),
            "device_id": kwargs["deviceId"],
            "parent_id": self._pid,
            "fault_type": int(kwargs["faultType"]),
            "happen_time": kwargs["happenTime"],
            "process_type": kwargs["processType"],
        }
        if data["process_type"] == "0":
            data.update({
                "fault_content": kwargs["faultContent"],
                "reportperson_name": kwargs["reportpersonName"],
            })
        logger.info("[ZheJiangNotifier push_fault_handle], notifier = {}, push data = {}".format(self, json.dumps(data, indent=4)))

        if not check_none_value(data.values()):
            raise ZheJiangNotifierException(u"推送设备信息故障操作信息失败,缺少参数")

        response = self._request(opt=0, lists=[data], path="fire/faultprocess/report")
        return response

    def push_alarm(self, **kwargs):
        """
        Push a single fire alarm event.

        :param kwargs: ``eventId``, ``deviceCategory``, ``deviceId``,
            ``alarmType`` and ``eventTime``.
        :raises KeyError: when a keyword is absent.
        :raises ZheJiangNotifierException: when any field is ``None``.
        """
        data = {
            "event_id": kwargs["eventId"],
            "device_category": int(kwargs["deviceCategory"]),
            "device_id": kwargs["deviceId"],
            "parent_id": self._pid,
            "alarm_type": kwargs["alarmType"],
            "event_time": kwargs["eventTime"],
        }
        logger.info("[ZheJiangNotifier push_alarm], notifier = {}, push data = {}".format(self, json.dumps(data, indent=4)))

        if not check_none_value(data.values()):
            raise ZheJiangNotifierException(u"推送设备火警状态错误,缺少参数")

        response = self._request(opt=0, lists=[data], path="fire/firealarm/report")
        return response

    def push_alarm_handle(self, **kwargs):
        """
        Push the handling record of a fire alarm.

        :param kwargs: ``eventId``, ``deviceCategory``, ``deviceId``,
            ``alarmType``, ``checkTime``, ``handleTime``, ``processType`` and
            ``handleStatus``.
        :raises KeyError: when a keyword is absent.
        :raises ZheJiangNotifierException: when any field is ``None``.
        """
        data = {
            "event_id": kwargs["eventId"],
            "device_category": int(kwargs["deviceCategory"]),
            "device_id": kwargs["deviceId"],
            "parent_id": self._pid,
            "alarm_type": int(kwargs["alarmType"]),
            "check_time": kwargs["checkTime"],
            "handle_time": kwargs["handleTime"],
            "process_type": kwargs["processType"],
            "handle_status": kwargs["handleStatus"],
        }
        logger.info("[ZheJiangNotifier push_alarm_handle], notifier = {}, push data = {}".format(self, json.dumps(data, indent=4)))

        if not check_none_value(data.values()):
            raise ZheJiangNotifierException(u"推送设备火警信息故障操作信息失败,缺少参数")

        response = self._request(opt=0, lists=[data], path="fire/firealarmprocess/report")
        return response
class ZheJiangNorther(object):
    """
    Reports one fire-fight registration to the ZheJiang platform: the
    company, then the dealer's devices with their state, then device parts.
    """

    # Timestamp layout used for the time fields formatted in this class.
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, fight):    # type:(ZheJiangFireFight) -> None
        self._firefight = fight
        self._notifier = ZheJiangNotifier(
            ak=fight.ak, sk=fight.sk, pid=fight.parentId, url=fight.url
        )

    def _report_company(self):
        """Push the company described by the fire-fight record."""
        fight = self._firefight
        self._notifier.push_company({
            "companyId": str(fight.id),
            "companyName": fight.companyName,
            "companyCode": fight.companyCode,
            "address": fight.address,
            "regionCode": fight.regionCode,
            "companyCategory": fight.companyCategory,
            "companyType": fight.companyType,
            "industryType": fight.industryType,
            "fireManager": fight.fireManager,
            "fireManagerTel": fight.fireManagerTel,
            "fireLiable": fight.fireLiable,
            "fireLiableTel": fight.fireLiableTel,
            "createTime": fight.createTime.strftime(self._TIME_FORMAT),
            "updateTime": fight.updateTime.strftime(self._TIME_FORMAT),
        })

    def _report_devices(self):
        """
        Push every device owned by the dealer, followed by one state record
        per device. Nothing is sent when the dealer owns no devices.
        """
        dealer = self._firefight.dealer
        devices = dealer.get_own_devices()

        dataList = list()
        dataStateList = list()

        for _dev in devices:    # type: DeviceDict
            devObj = _dev.my_obj
            dataList.append({
                "deviceId": str(devObj.id),
                "logicalCode": _dev.logicalCode,
                "devNo": _dev.devNo,
                "location": District.get_district(devObj.districtId),
                "creator": devObj.mf,
                "companyId": str(self._firefight.id),
                # NOTE(review): unlike the company and part timestamps these
                # two are passed through unformatted - confirm they are
                # already strings.
                "createTime": _dev.dateTimeBinded,
                "updateTime": _dev.lastRegisterTime,
            })

            # A state record is pushed alongside every device; it is always
            # reported as online_status 1 / work_status 1.
            dataStateList.append({
                "eventId": uuid.uuid4().hex,
                "deviceCategory": EventDeviceCategory.DEVICE,
                "deviceId": str(devObj.id),
                "onlineStatus": 1,
                "workStatus": 1,
                "eventTime": datetime.datetime.now().strftime(self._TIME_FORMAT),
            })

        # Avoid an HTTP round trip that would carry an empty list.
        if not dataList:
            return

        self._notifier.push_device(*dataList)
        self._notifier.push_device_state(*dataStateList)

    def _report_parts(self):
        """
        Push the parts of every device owned by the dealer, one request per
        device. Devices without parts are skipped.
        """
        dealer = self._firefight.dealer
        devices = dealer.get_own_devices()

        for _dev in devices:    # type: DeviceDict
            partsList = list()
            for _part in _dev.parts:     # type: Part
                partsList.append({
                    "partId": str(_part.id),
                    "partName": _part.partName,
                    "sensorCode": "",
                    "address": "",
                    "companyId": str(self._firefight.id),
                    # NOTE(review): devices are pushed with str(my_obj.id);
                    # confirm _dev.id yields the same identifier.
                    "deviceId": _dev.id,
                    "createTime": _part.dateTimeAdded.strftime(self._TIME_FORMAT),
                    "updateTime": _part.dateTimeUpdated.strftime(self._TIME_FORMAT),
                })

            if not partsList:
                continue
            self._notifier.push_part(*partsList)

    def report(self):
        """Run the full report: company, then devices, then parts."""
        self._report_company()
        self._report_devices()
        self._report_parts()
|