utils.py 16 KB


  1. # coding=utf-8
  2. import datetime
  3. import json
  4. import logging
  5. import base64
  6. import hmac
  7. import time
  8. import uuid
  9. import typing
  10. from hashlib import sha256
  11. from urlparse import urljoin
  12. import requests
  13. from apps.web.api.zhejiang.constant import EventDeviceCategory
  14. from apps.web.api.zhejiang.models import ZheJiangFireFight
  15. from apps.web.api.zhejiang.exceptions import ZheJiangNotifierException
  16. from apps.web.common.models import District
  17. if typing.TYPE_CHECKING:
  18. from apps.web.device.models import DeviceDict, Part
  19. logger = logging.getLogger(__name__)
  20. def get_client_token(ak, sk, timestamp):
  21. """
  22. 获取客户端的token
  23. """
  24. # 时间戳为毫秒数
  25. timestamp *= 1000
  26. ak = str(ak)
  27. sk = str(sk)
  28. logger.info("[get_client_token] ak = {}-{}, sk = {}-{}, timestamp = {}-{}".format(type(ak), ak, type(sk), sk, type(timestamp), timestamp))
  29. # 获取认证字符串
  30. authStr = base64.b64encode("{}{}".format(ak, timestamp))
  31. # 获取认证摘要字符串 注意要是16进制的字符串
  32. auth = hmac.new(key=sk, msg=authStr, digestmod=sha256).hexdigest()
  33. # 获取clientToken
  34. clientToken = base64.b64encode("{}:{}:{}".format(auth, ak, timestamp))
  35. logger.info("[get_client_token] token = {}".format(clientToken))
  36. return clientToken
  37. def verify_client_token(clientToken, ak, sk):
  38. ak = str(ak)
  39. sk = str(sk)
  40. vAuth, vAk, vt = clientToken.split(":")
  41. if ak != vAk:
  42. return False
  43. authStr = base64.b64encode("{}{}".format(ak, vt))
  44. auth = base64.b64encode(hmac.new(key=sk, msg=authStr, digestmod=sha256).hexdigest())
  45. if auth != vAuth:
  46. return False
  47. # 时间戳的单位是毫秒
  48. if int(time.time() * 1000) > int(vt):
  49. return False
  50. return True
  51. def check_none_value(iterable):
  52. """
  53. 检查必传参数是否有空值 注意和0以及空串区分开
  54. """
  55. return all(filter(lambda x: x is None, iterable))
  56. class ZheJiangNotifier(object):
  57. URL = ""
  58. EXPIRE_TIME = 1000
  59. def __init__(self, ak, sk, pid, **kwargs):
  60. self._ak = ak
  61. self._sk = sk
  62. self._pid = pid
  63. if "url" in kwargs:
  64. self._baseUrl = kwargs["url"]
  65. else:
  66. self._baseUrl = self.URL
  67. if "expire" in kwargs:
  68. self._expire = kwargs["expire"]
  69. else:
  70. self._expire = self.EXPIRE_TIME
  71. def __str__(self):
  72. return "[ZheJiangNotifier]-<{}>".format(self._pid)
  73. def _get_header(self, **kwargs):
  74. """
  75. 附加请求头
  76. """
  77. timeStamp = kwargs.pop("timeStamp", None) or (int(time.time()) + self._expire)
  78. headers = {
  79. "Client-Token": get_client_token(self._ak, self._sk, timeStamp),
  80. "Content-Type": "application/json"
  81. }
  82. for _k, _v in kwargs.items():
  83. headers[_k] = _v
  84. return headers
  85. def _request(self, **kwargs):
  86. headers = kwargs.pop("headers", dict())
  87. headers = self._get_header(**headers)
  88. path = kwargs.pop("path")
  89. url = urljoin(self._baseUrl, path)
  90. # 找出操作数据的类型 1---修改或者新增 2----删除
  91. payload = {
  92. "opt_type": kwargs.pop("opt", 0),
  93. "lists": kwargs["lists"]
  94. }
  95. logger.info("[ZheJiangNotifier _request], notifier = {}, headers = {}, url = {}, push payload = {}".format(self, headers, url, json.dumps(payload, indent=4)))
  96. try:
  97. response = requests.post(url=url, headers=headers, json=payload, verify=False, timeout=10)
  98. except requests.Timeout:
  99. raise ZheJiangNotifierException(u"请求超时")
  100. return self._handle_response(response)
  101. def _handle_response(self, response):
  102. try:
  103. response.raise_for_status()
  104. except requests.RequestException as ree:
  105. raise ZheJiangNotifierException(ree.message)
  106. result = response.json()
  107. logger.info("[ZheJiangNotifier _handle_response] notifier = {} result = {}".format(self, json.dumps(result, indent=4)))
  108. # TODO 根据code的定义 有可能重新发出请求
  109. code = result["code"]
  110. return result
  111. def push_company(self, *company):
  112. """
  113. 推送公司信息 保持最小信息推送原则
  114. """
  115. lists = list()
  116. for _item in company:
  117. data = {
  118. "company_id": _item.pop("companyId", None),
  119. "company_name": _item.pop("companyName", None),
  120. "parent_id": self._pid,
  121. "company_code": _item.pop("companyCode", None),
  122. "address": _item.pop("address", None),
  123. "region_code": _item.pop("regionCode", None),
  124. "company_category": _item.pop("companyCategory", None),
  125. "company_type": _item.pop("companyType", None),
  126. "industry_type": _item.pop("industryType", None),
  127. "fire_manager": _item.pop("fireManager", None),
  128. "fire_manager_tel": _item.pop("fireManagerTel", None),
  129. "fire_liable": _item.pop("fireLiable", None),
  130. "fire_liable_tel": _item.pop("fireLiableTel", None),
  131. "create_time": _item.pop("createTime", None),
  132. "update_time": _item.pop("updateTime", None),
  133. }
  134. if not check_none_value(data.values()):
  135. raise ZheJiangNotifierException(u"推送单位参数错误")
  136. lists.append(data)
  137. logger.info("[ZheJiangNotifier push_company], notifier = {}, push data = {}".format(self, json.dumps(lists, indent=4)))
  138. response = self._request(opt=0, lists=lists, path="fire/company/update")
  139. return response
  140. def push_device(self, *device):
  141. """
  142. 推送设备的信息 推送设备类型固定为 小型充电桩(非汽车)
  143. """
  144. lists = list()
  145. for _item in device:
  146. data = {
  147. "device_id": _item.pop("deviceId"),
  148. "device_name": _item.pop("logicalCode"),
  149. "parentId": self._pid,
  150. "device_code": _item.pop("devNo"),
  151. "location": _item.pop("location"),
  152. "device_manufactory": _item.pop("creator"),
  153. "device_type": "44",
  154. "relation_type": "1",
  155. "relation_id": _item.pop("companyId"),
  156. "create_time": _item.pop("createTime"),
  157. "update_time": _item.pop("updateTime")
  158. }
  159. if not check_none_value(data.values()):
  160. raise ZheJiangNotifierException(u"推送设备参数错误")
  161. lists.append(data)
  162. logger.info("[ZheJiangNotifier push_device], notifier = {}, push data = {}".format(self, json.dumps(lists, indent=4)))
  163. response = self._request(opt=0, lists=lists, path="fire/device/update")
  164. return response
  165. def push_part(self, *part):
  166. """
  167. 推送部件的时候 推送部件类型固定为充电口
  168. """
  169. lists = list()
  170. for _item in part:
  171. data = {
  172. "part_id": _item.pop("partId"),
  173. "part_name": _item.pop("partName"),
  174. "parent_id": self._pid,
  175. "sensor_code": _item.pop("sensorCode"),
  176. "address": _item.pop("address"),
  177. "parts_type": "137",
  178. "relation_type": "1",
  179. "relation_id": _item.pop("companyId"),
  180. "device_id": _item.pop("deviceId"),
  181. "create_time": _item.pop("createTime"),
  182. "update_time": _item.pop("updateTime")
  183. }
  184. if not check_none_value(data.values()):
  185. raise ZheJiangNotifierException(u"推送部件参数错误")
  186. lists.append(data)
  187. logger.info("[ZheJiangNotifier push_part], notifier = {}, push data = {}".format(self, json.dumps(lists, indent=4)))
  188. response = self._request(opt=0, lists=lists, path="fire/part/update")
  189. return response
  190. def push_device_state(self, *state):
  191. """
  192. 推送设备的运行状态 应该是设备上线或者设备离线的时候使用
  193. """
  194. lists = list()
  195. for _item in state:
  196. data = {
  197. "event_id": _item.pop("eventId"),
  198. "device_category": int(_item.pop("deviceCategory")),
  199. "device_id": _item.pop("deviceId"),
  200. "parent_id": self._pid,
  201. "online_status": int(_item.pop("onlineStatus")),
  202. "work_status": int(_item.pop("workStatus")),
  203. "event_time": _item.pop("eventTime")
  204. }
  205. if not check_none_value(data.values()):
  206. raise ZheJiangNotifierException(u"推送设备运行状态错误")
  207. lists.append(data)
  208. logger.info("[ZheJiangNotifier push_device_state], notifier = {}, push data = {}".format(self, json.dumps(lists, indent=4)))
  209. response = self._request(opt=0, lists=lists, path="fire/devicestate/report")
  210. return response
  211. def push_fault(self, **kwargs):
  212. """
  213. 推送故障信息
  214. """
  215. data = {
  216. "event_id": kwargs.pop("eventId"),
  217. "device_category": int(kwargs.pop("deviceCategory")),
  218. "device_id": kwargs.pop("deviceId"),
  219. "parent_id": self._pid,
  220. "fault_type": int(kwargs.pop("faultType")),
  221. "event_time": kwargs.pop("eventTime")
  222. }
  223. logger.info("[ZheJiangNotifier push_fault], notifier = {}, push data = {}".format(self, json.dumps(data, indent=4)))
  224. if not check_none_value(data.values()):
  225. raise ZheJiangNotifierException(u"推送设备故障状态错误,缺少参数")
  226. response = self._request(opt=0, lists=[data], path="fire/fault/report")
  227. return response
  228. def push_fault_handle(self, **kwargs):
  229. data = {
  230. "event_id": kwargs.pop("eventId"),
  231. "device_category": int(kwargs.pop("deviceCategory")),
  232. "device_id": kwargs.pop("deviceId"),
  233. "parent_id": self._pid,
  234. "fault_type": int(kwargs.pop("faultType")),
  235. "happen_time": kwargs.pop("happenTime"),
  236. "process_type": kwargs.pop("processType")
  237. }
  238. if data["process_type"] == "0":
  239. data.update({
  240. "fault_content": kwargs.pop("faultContent"),
  241. "reportperson_name": kwargs.pop("reportpersonName")
  242. })
  243. logger.info("[ZheJiangNotifier push_fault_handle], notifier = {}, push data = {}".format(self, json.dumps(data, indent=4)))
  244. if not check_none_value(data.values()):
  245. raise ZheJiangNotifierException(u"推送设备信息故障操作信息失败,缺少参数")
  246. response = self._request(opt=0, lists=[data], path="fire/faultprocess/report")
  247. return response
  248. def push_alarm(self, **kwargs):
  249. """
  250. 推送火警信息
  251. """
  252. data = {
  253. "event_id": kwargs.pop("eventId"),
  254. "device_category": int(kwargs.pop("deviceCategory")),
  255. "device_id": kwargs.pop("deviceId"),
  256. "parent_id": self._pid,
  257. "alarm_type": kwargs.pop("alarmType"),
  258. "event_time": kwargs.pop("eventTime")
  259. }
  260. logger.info("[ZheJiangNotifier push_alarm], notifier = {}, push data = {}".format(self, json.dumps(data, indent=4)))
  261. if not check_none_value(data.values()):
  262. raise ZheJiangNotifierException(u"推送设备火警状态错误,缺少参数")
  263. response = self._request(opt=0, lists=[data], path="fire/firealarm/report")
  264. return response
  265. def push_alarm_handle(self, **kwargs):
  266. data = {
  267. "event_id": kwargs.pop("eventId"),
  268. "device_category": int(kwargs.pop("deviceCategory")),
  269. "device_id": kwargs.pop("deviceId"),
  270. "parent_id": self._pid,
  271. "alarm_type": int(kwargs.pop("alarmType")),
  272. "check_time": kwargs.pop("checkTime"),
  273. "handle_time": kwargs.pop("handleTime"),
  274. "process_type": kwargs.pop("processType"),
  275. "handle_status": kwargs.pop("handleStatus")
  276. }
  277. logger.info("[ZheJiangNotifier push_alarm_handle], notifier = {}, push data = {}".format(self, json.dumps(data, indent=4)))
  278. if not check_none_value(data.values()):
  279. raise ZheJiangNotifierException(u"推送设备火警信息故障操作信息失败,缺少参数")
  280. response = self._request(opt=0, lists=[data], path="fire/firealarmprocess/report")
  281. return response
  282. class ZheJiangNorther(object):
  283. def __init__(self, fight): # type:(ZheJiangFireFight) -> None
  284. self._firefight = fight
  285. self._notifier = ZheJiangNotifier(
  286. ak=fight.ak, sk=fight.sk, pid=fight.parentId, url=fight.url
  287. )
  288. def _report_company(self):
  289. self._notifier.push_company({
  290. "companyId": str(self._firefight.id),
  291. "companyName": self._firefight.companyName,
  292. "companyCode": self._firefight.companyCode,
  293. "address": self._firefight.address,
  294. "regionCode": self._firefight.regionCode,
  295. "companyCategory": self._firefight.companyCategory,
  296. "companyType": self._firefight.companyType,
  297. "industryType": self._firefight.industryType,
  298. "fireManager": self._firefight.fireManager,
  299. "fireManagerTel": self._firefight.fireManagerTel,
  300. "fireLiable": self._firefight.fireLiable,
  301. "fireLiableTel": self._firefight.fireLiableTel,
  302. "createTime": self._firefight.createTime.strftime("%Y-%m-%d %H:%M:%S"),
  303. "updateTime": self._firefight.updateTime.strftime("%Y-%m-%d %H:%M:%S")
  304. })
  305. def _report_devices(self):
  306. dealer = self._firefight.dealer
  307. devices = dealer.get_own_devices()
  308. dataList = list()
  309. dataStateList = list()
  310. # 逐条推送设备的信息
  311. for _dev in devices: # type: DeviceDict
  312. devObj = _dev.my_obj
  313. dataList.append({
  314. "deviceId": str(devObj.id),
  315. "logicalCode": _dev.logicalCode,
  316. "devNo": _dev.devNo,
  317. "location": District.get_district(devObj.districtId),
  318. "creator": devObj.mf,
  319. "companyId": str(self._firefight.id),
  320. "createTime": _dev.dateTimeBinded,
  321. "updateTime": _dev.lastRegisterTime
  322. })
  323. # 同时推送一次设备的状态
  324. dataStateList.append({
  325. "eventId": "".join(str(uuid.uuid4()).split("-")),
  326. "deviceCategory": EventDeviceCategory.DEVICE,
  327. "deviceId": str(devObj.id),
  328. "onlineStatus": 1,
  329. "workStatus": 1,
  330. "eventTime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  331. })
  332. self._notifier.push_device(*dataList)
  333. self._notifier.push_device_state(*dataStateList)
  334. def _report_parts(self):
  335. dealer = self._firefight.dealer
  336. devices = dealer.get_own_devices()
  337. for _dev in devices: # type: DeviceDict
  338. _parts = _dev.parts
  339. partsList = list()
  340. for _part in _parts: # type: Part
  341. partsList.append({
  342. "partId": str(_part.id),
  343. "partName": _part.partName,
  344. "sensorCode": "",
  345. "address": "",
  346. "companyId": str(self._firefight.id),
  347. "deviceId": _dev.id,
  348. "createTime": _part.dateTimeAdded.strftime("%Y-%m-%d %H:%M:%S"),
  349. "updateTime": _part.dateTimeUpdated.strftime("%Y-%m-%d %H:%M:%S")
  350. })
  351. self._notifier.push_part(*partsList)
  352. def report(self):
  353. self._report_company()
  354. self._report_devices()
  355. self._report_parts()