timescale.py 17 KB


  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import datetime
  4. import logging
  5. import socket
  6. import arrow
  7. import simplejson as json
  8. from django.conf import settings
  9. from influxdb import InfluxDBClient
  10. from influxdb.resultset import ResultSet
  11. from django.core.cache import cache
  12. from apilib.utils_datetime import local2utc
  13. from apps.web.core.sysparas import SysParas
  14. logger = logging.getLogger(__name__)
  15. class FluentedEngine(object):
  16. def in_signal_udp(self, devNo, ts, signal, cmd):
  17. logger.debug(
  18. '[in_signal_udp] devNo = {}, ts = {}, signal = {}, cmd = {}, ip = {}'.format(
  19. devNo, ts, signal, cmd, SysParas.get_private_ip(settings.FLUENTED_IP)))
  20. ip_port = (SysParas.get_private_ip(settings.FLUENTED_IP), settings.FLUENTED_SIGNAL_PORT)
  21. point = {
  22. 'devno': devNo,
  23. 'time': ts,
  24. 'signal': signal,
  25. 'cmd': str(cmd)
  26. }
  27. client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  28. client.sendto(json.dumps(point), ip_port)
  29. def in_power_udp(self, devNo, port, ts, power, voltage, current):
  30. logger.debug(
  31. '[in_power_udp] devNo = {}, port = {}, ts = {}, power = {}, voltage = {}, current = {}, ip = {}'.format(
  32. devNo, port, ts, power, voltage, current, SysParas.get_private_ip(settings.FLUENTED_IP)))
  33. ip_port = (SysParas.get_private_ip(settings.FLUENTED_IP), settings.FLUENTED_POWER_PORT)
  34. point = {
  35. 'time': ts,
  36. 'devno': devNo,
  37. 'port': str(port),
  38. 'power2': float(power)
  39. }
  40. if voltage is not None:
  41. point.update({'voltage2': float(voltage)})
  42. if current is not None:
  43. point.update({'current2': float(current)})
  44. client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  45. client.sendto(json.dumps(point), ip_port)
  46. def in_put_coins_udp(self, devNo, ts, coins, mode, port = None):
  47. logger.debug(
  48. '[in_put_coins_udp] devNo = {}, ts = {}, coins = {}, mode = {}, port = {}, ip = {}'.format(
  49. devNo, ts, coins, mode, port, SysParas.get_private_ip(settings.FLUENTED_IP)))
  50. ip_port = (SysParas.get_private_ip(settings.FLUENTED_IP), settings.FLUENTED_OFFLINE_PORT)
  51. point = {
  52. 'time': ts,
  53. 'devno': devNo,
  54. 'coins': int(coins),
  55. 'mode': mode
  56. }
  57. if port:
  58. point.update({'port': str(port)})
  59. client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  60. client.sendto(json.dumps(point), ip_port)
  61. class InfluxDBEngine(object):
  62. def __init__(self, dbName, measurement, retention_policy = None):
  63. self.client = InfluxDBClient(
  64. host = settings.INFLUXDB_IP,
  65. port = settings.INFLUXDB_PORT,
  66. username = settings.INFLUXDB_USER,
  67. password = settings.INFLUXDB_PWD,
  68. database = dbName,
  69. timeout = 5)
  70. self.database = dbName
  71. self.retention_policy = retention_policy
  72. self.measurement = measurement
  73. @property
  74. def full_qualify_meaurement(self):
  75. if not self.retention_policy:
  76. return '"{}"'.format(self.measurement)
  77. else:
  78. return '"{}"."{}"'.format(self.retention_policy, self.measurement)
  79. def read_data(self, sqlStr):
  80. try:
  81. rs = self.client.query(sqlStr) # type: ResultSet
  82. return [point for point in rs.get_points()]
  83. except Exception, e:
  84. logger.exception('append_power,e=%s,str=%s' % (e, sqlStr))
  85. return None
  86. class SignalManager(object):
  87. inst = None
  88. @staticmethod
  89. def instence():
  90. if SignalManager.inst is not None:
  91. return SignalManager.inst
  92. enginer = InfluxDBEngine(settings.INFLUXDB_DB, 'signal') # type: InfluxDBEngine
  93. SignalManager.inst = SignalManager(enginer)
  94. return SignalManager.inst
  95. def __init__(self, enginer):
  96. self.enginer = enginer # type: InfluxDBEngine
  97. def get(self, devNo, sTime = None, eTime = None):
  98. # type: (str, datetime, datetime)->dict
  99. sTime = local2utc(sTime) if sTime else None
  100. eTime = local2utc(eTime) if eTime else None
  101. if sTime and eTime:
  102. selectStr = "select time,signal,usage,cmd from signal where devno = '%s' and time >= '%s' and time <= '%s' order by time" % (
  103. devNo, sTime, eTime)
  104. elif sTime:
  105. selectStr = "select time,signal,usage,cmd from signal where devno = '%s' and time >= '%s' order by time" % (
  106. devNo, sTime)
  107. elif eTime:
  108. selectStr = "select time,signal,usage,cmd from signal where devno = '%s' and time <= '%s' order by time" % (
  109. devNo, eTime)
  110. else:
  111. selectStr = "select time,signal,usage,cmd from signal where devno = '%s' order by time" % (devNo)
  112. points = self.enginer.read_data(selectStr)
  113. if not points:
  114. return []
  115. resultList = []
  116. for point in points:
  117. resultList.append(
  118. {
  119. 'time': arrow.get(point['time']).to(settings.TIME_ZONE).format('YYYY-MM-DD HH:mm:ss'),
  120. 'signal': point['signal'],
  121. 'usage': point['usage'],
  122. 'cmd': point['cmd']
  123. })
  124. return resultList
  125. class PowerManager(object):
  126. inst = None
  127. @staticmethod
  128. def instence():
  129. if PowerManager.inst is not None:
  130. return PowerManager.inst
  131. enginer = InfluxDBEngine(settings.INFLUXDB_DB, 'powers') # type: InfluxDBEngine
  132. PowerManager.inst = PowerManager(enginer)
  133. return PowerManager.inst
  134. def __init__(self, enginer):
  135. self.enginer = enginer
  136. def get(self, devNo, port, sTime, eTime, interval = 300):
  137. def format_date(dateStr):
  138. return arrow.get(str(dateStr)).to(settings.TIME_ZONE).naive
  139. def format_point(point):
  140. rv = {
  141. 'time': format_date(point['time'])
  142. }
  143. if 'power2' in point and point['power2'] is not None:
  144. rv['power'] = round(point['power2'], 2)
  145. else:
  146. rv['power'] = round(point['power'], 2)
  147. if 'voltage2' in point and point['voltage2'] is not None and point['voltage2'] != '-':
  148. rv['voltage'] = round(point['voltage2'], 2)
  149. elif 'voltage' in point and point['voltage'] is not None and point['voltage'] != '-':
  150. rv['voltage'] = round(point['voltage'], 2)
  151. else:
  152. rv['voltage'] = '-'
  153. if 'current2' in point and point['current2'] is not None and point['current2'] != '-':
  154. rv['current'] = round(point['current2'], 2)
  155. elif 'current' in point and point['current'] is not None and point['current'] != '-':
  156. rv['current'] = round(point['current'], 2)
  157. else:
  158. rv['current'] = '-'
  159. return rv
  160. query_start_time = sTime - datetime.timedelta(seconds = interval)
  161. query_end_time = eTime + datetime.timedelta(seconds = interval)
  162. selectStr = "select time, power, power2, voltage, voltage2, current, current2 from %s where devno = '%s' and port = '%s' and time >= '%s' and time <= '%s' order by time" % (
  163. self.enginer.full_qualify_meaurement, devNo, port, local2utc(query_start_time), local2utc(query_end_time))
  164. points = self.enginer.read_data(selectStr)
  165. if points is None:
  166. raise Exception(u'查询数据失败')
  167. total = len(points)
  168. if total == 0:
  169. return []
  170. import pandas as pd
  171. df = pd.DataFrame(points)
  172. min_time = format_date(df['time'].min())
  173. max_time = format_date(df['time'].max())
  174. rv = []
  175. first_section = None
  176. if min_time > sTime:
  177. left_rv = []
  178. next_time = min_time
  179. while True:
  180. next_time = (next_time - datetime.timedelta(seconds = interval))
  181. left_rv.append({
  182. 'time': next_time,
  183. 'power': '-',
  184. 'voltage': '-',
  185. 'current': '-'
  186. })
  187. if next_time < sTime:
  188. break
  189. if len(left_rv) > 0:
  190. left_rv.reverse()
  191. rv.extend(left_rv)
  192. middle_section = None
  193. if max_time > min_time:
  194. # now_point =
  195. now = format_point(points[0])
  196. # now = {
  197. # 'time': format_date(now_point['time']),
  198. # 'power': now_point['power'],
  199. # 'voltage': format_field(now_point.get('voltage', '-')),
  200. # 'current': format_field(now_point.get('current', '-'))
  201. # }
  202. rv.append(now)
  203. idx = 1
  204. # right_point = points[idx]
  205. right = format_point(points[idx])
  206. # {
  207. # 'time': format_date(right_point['time']),
  208. # 'power': right_point['power'],
  209. # 'voltage': format_field(right_point.get('voltage', '-')),
  210. # 'current': format_field(right_point.get('current', '-'))
  211. # }
  212. total = len(points)
  213. while True:
  214. diff = (right['time'] - now['time']).total_seconds()
  215. if diff < (interval + interval / 2):
  216. rv.append(right)
  217. now = right
  218. idx = idx + 1
  219. if idx >= total:
  220. break
  221. else:
  222. right = format_point(points[idx])
  223. # {
  224. # 'time': format_date(points[idx]['time']),
  225. # 'power': points[idx]['power'],
  226. # 'voltage': format_field(points[idx].get('voltage', '-')),
  227. # 'current': format_field(points[idx].get('current', '-'))
  228. # }
  229. # print('after: now = {}; right = {}'.format(now, right))
  230. else:
  231. if diff > 2 * interval:
  232. # print('before: now = {}; right = {}'.format(now, right))
  233. last_now = now
  234. now = {
  235. 'time': (last_now['time'] + datetime.timedelta(seconds = interval)),
  236. 'power': '-',
  237. 'voltage': '-',
  238. 'current': '-'
  239. }
  240. rv.append(now)
  241. # print('after: now = {}; right = {}'.format(now, right))
  242. else:
  243. # print('before: now = {}; right = {}'.format(now, right))
  244. rv.append({
  245. 'time': (now['time'] + datetime.timedelta(seconds = diff / 2)),
  246. 'power': now['power'],
  247. 'voltage': now['voltage'],
  248. 'current': now['current']
  249. })
  250. rv.append(right)
  251. now = right
  252. idx = idx + 1
  253. if idx >= total:
  254. break
  255. else:
  256. right = format_point(points[idx])
  257. # print('after: now = {}; right = {}'.format(now, right))
  258. if eTime > max_time:
  259. right_rv = []
  260. next_time = max_time
  261. while True:
  262. next_time = (next_time + datetime.timedelta(seconds = interval))
  263. right_rv.append({
  264. 'time': next_time,
  265. 'power': '-',
  266. 'voltage': '-',
  267. 'current': '-'
  268. })
  269. if next_time >= eTime:
  270. break
  271. if len(right_rv) > 0:
  272. rv.extend(right_rv)
  273. return rv
  274. def get_raw(self, devNo, port, sTime, eTime):
  275. def format_date(dateStr):
  276. return arrow.get(str(dateStr)).to(settings.TIME_ZONE).naive
  277. selectStr = "select time, power, power2, voltage, voltage2, current, current2 from %s where devno = '%s' and port = '%s' and time >= '%s' and time <= '%s' order by time" % (
  278. self.enginer.full_qualify_meaurement, devNo, port, local2utc(sTime), local2utc(eTime))
  279. points = self.enginer.read_data(selectStr)
  280. rv = []
  281. for item in points:
  282. _time = item.pop('time')
  283. item['time'] = format_date(_time)
  284. rv.append(item)
  285. return rv
  286. class OfflineCoinsManager(object):
  287. inst = None
  288. @staticmethod
  289. def instence():
  290. if OfflineCoinsManager.inst is not None:
  291. return OfflineCoinsManager.inst
  292. enginer = InfluxDBEngine(settings.INFLUXDB_DB, 'offline_coins') # type: InfluxDBEngine
  293. OfflineCoinsManager.inst = OfflineCoinsManager(enginer)
  294. return OfflineCoinsManager.inst
  295. def __init__(self, enginer):
  296. self.enginer = enginer # type: InfluxDBEngine
  297. def get(self, devNo, sTime, eTime):
  298. # type: (str, datetime, datetime)->list
  299. sTime = local2utc(sTime)
  300. eTime = local2utc(eTime)
  301. selectStr = "select time,port,coins,mode from offline_coins where devno = '%s' and time >= '%s' and time <= '%s' order by time" % (
  302. devNo, sTime, eTime)
  303. points = self.enginer.read_data(selectStr)
  304. if not points:
  305. return []
  306. rv = []
  307. for point in points:
  308. item = {
  309. 'dateTimeAdded': arrow.get(point['time']).to(settings.TIME_ZONE).format('YYYY-MM-DD HH:mm:ss'),
  310. 'count': int(point['coins'])
  311. }
  312. if point['port']:
  313. item.update({'port': point['port']})
  314. rv.append(item)
  315. return rv
  316. class OfflineManager(object):
  317. """
  318. 离线设备消息通知
  319. """
  320. _CACHE = cache
  321. @staticmethod
  322. def cache_key(devNo):
  323. return "celery_device_offline_{}".format(devNo)
  324. @classmethod
  325. def add_cache(cls, devNo, tid):
  326. cls._CACHE.set(cls.cache_key(devNo), tid)
  327. @classmethod
  328. def delete_cache(cls, devNo):
  329. cls._CACHE.delete(cls.cache_key(devNo))
  330. @classmethod
  331. def get_cache(cls, devNo):
  332. return cls._CACHE.get(cls.cache_key(devNo))
  333. @staticmethod
  334. def prepare_registe_check(devNo):
  335. from apps.web.device.models import Device
  336. from apps.web.dealer.models import Dealer
  337. dev = Device.get_dev(devNo)
  338. if not dev:
  339. logger.error("[OFFLINE_DEVICE_TASK] registe not dev, devNo is <{}>".format(devNo))
  340. return 0
  341. dealer = Dealer.get_dealer(dev.ownerId)
  342. if not dealer:
  343. logger.error("[OFFLINE_DEVICE_TASK] registe not dealer, devNo is <{}>".format(devNo))
  344. return 0
  345. # 没有打开开关的经销商直接忽略
  346. if not dealer.get("offlineNotifySwitch", False):
  347. logger.error("[OFFLINE_DEVICE_TASK] register not dealer switch! devNo is <{}>".format(devNo))
  348. return 0
  349. offlineNotifyTime = int(dealer.get("offlineNotifyTime", 1) or 1)
  350. return offlineNotifyTime
  351. @classmethod
  352. def registe_device_offline_notify(cls, devNo, check = True):
  353. """
  354. 注册设备的离线通知
  355. :param devNo: 设备编号
  356. :param check: 是否无条件塞入队列
  357. :return:
  358. """
  359. # 发现任务已经被提交 退出不在注册通知任务
  360. if cls.get_cache(devNo):
  361. logger.error("[OFFLINE_DEVICE_TASK] notify has beed registe! devNo is <{}>".format(devNo))
  362. return
  363. # celery 异步注册任务 默认小时之后发送通知给经销商
  364. if check:
  365. offlineNotifyTime = cls.prepare_registe_check(devNo)
  366. else:
  367. offlineNotifyTime = 1
  368. if not offlineNotifyTime:
  369. return
  370. from taskmanager.mediator import task_caller
  371. result = task_caller(
  372. "device_offline_notify",
  373. delay = offlineNotifyTime * 3600,
  374. devNo = devNo
  375. )
  376. if not result:
  377. logger.error("[OFFLINE_DEVICE_TASK] task caller not return a async result! devNo is <{}>".format(devNo))
  378. return
  379. tid = str(result)
  380. logger.info("[OFFLINE_DEVICE_TASK] registe a task, devNo <{}>, tid <{}>".format(devNo, tid))
  381. cls.add_cache(devNo, tid)
  382. @classmethod
  383. def discard_device_offline_notify(cls, devNo):
  384. """
  385. 取消设备离线通知
  386. :param devNo:
  387. :return:
  388. """
  389. # 查看任务是否已经被注册 没有注册通知的就不进行处理
  390. tid = cls.get_cache(devNo)
  391. if not tid:
  392. logger.info("[OFFLINE_DEVICE_TASK] not discard a task, devNo <{}>, taskId <{}>".format(devNo, tid))
  393. return
  394. from taskmanager.operator import app as celeryApp
  395. celeryApp.control.revoke(tid)
  396. cls.delete_cache(devNo)
  397. logger.info("[OFFLINE_DEVICE_TASK] discard a task done, devNo <{}>, tid <{}>".format(devNo, tid))