timescale.py 18 KB


  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import datetime
  4. import logging
  5. import socket
  6. from decimal import Decimal
  7. import arrow
  8. import simplejson as json
  9. from django.conf import settings
  10. from influxdb import InfluxDBClient
  11. from influxdb.resultset import ResultSet
  12. from django.core.cache import cache
  13. from apilib.numerics import quantize
  14. from apilib.utils_datetime import local2utc
  15. from apps.web.core.sysparas import SysParas
  16. logger = logging.getLogger(__name__)
  17. class FluentedEngine(object):
  18. def in_signal_udp(self, devNo, ts, signal, cmd):
  19. import time
  20. if ts < (int(time.time()) - 3 * 24 * 3600):
  21. logger.warn(
  22. '[in_signal_udp] ignore of ts. devNo = {}, ts = {}, signal = {}, cmd = {}, ip = {}'.format(
  23. devNo, ts, signal, cmd, SysParas.get_private_ip(settings.FLUENTED_IP)))
  24. return
  25. else:
  26. logger.debug(
  27. '[in_signal_udp] devNo = {}, ts = {}, signal = {}, cmd = {}, ip = {}'.format(
  28. devNo, ts, signal, cmd, SysParas.get_private_ip(settings.FLUENTED_IP)))
  29. ip_port = (SysParas.get_private_ip(settings.FLUENTED_IP), settings.FLUENTED_SIGNAL_PORT)
  30. point = {
  31. 'devno': devNo,
  32. 'time': ts,
  33. 'signal': signal,
  34. 'cmd': str(cmd)
  35. }
  36. client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  37. client.sendto(json.dumps(point), ip_port)
  38. def in_power_udp(self, devNo, port, ts, power, voltage, current, **kwargs):
  39. import time
  40. if ts < (int(time.time()) - 3 * 24 * 3600):
  41. logger.warn(
  42. '[in_power_udp] ignore of ts. devNo = {}, port = {}, ts = {}, power = {}, voltage = {}, current = {}, ip = {}, kwargs = {}'.format(
  43. devNo, port, ts, power, voltage, current, SysParas.get_private_ip(settings.FLUENTED_IP), kwargs))
  44. return
  45. else:
  46. logger.debug(
  47. '[in_power_udp] devNo = {}, port = {}, ts = {}, power = {}, voltage = {}, current = {}, ip = {}, kwargs = {}'.format(
  48. devNo, port, ts, power, voltage, current, SysParas.get_private_ip(settings.FLUENTED_IP), kwargs))
  49. ip_port = (SysParas.get_private_ip(settings.FLUENTED_IP), settings.FLUENTED_POWER_PORT)
  50. point = {
  51. 'time': ts,
  52. 'devno': devNo,
  53. 'port': str(port),
  54. 'power2': float(power)
  55. }
  56. if voltage is not None:
  57. point.update({'voltage2': float(voltage)})
  58. if current is not None:
  59. point.update({'current2': float(current)})
  60. if kwargs:
  61. point.update(kwargs)
  62. client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  63. client.sendto(json.dumps(point), ip_port)
  64. def in_put_coins_udp(self, devNo, ts, coins, mode, port = None):
  65. import time
  66. if ts < (int(time.time()) - 3 * 24 * 3600):
  67. logger.warn(
  68. '[in_put_coins_udp] ignore of ts. devNo = {}, ts = {}, coins = {}, mode = {}, port = {}, ip = {}'.format(
  69. devNo, ts, coins, mode, port, SysParas.get_private_ip(settings.FLUENTED_IP)))
  70. return
  71. else:
  72. logger.debug(
  73. '[in_put_coins_udp] devNo = {}, ts = {}, coins = {}, mode = {}, port = {}, ip = {}'.format(
  74. devNo, ts, coins, mode, port, SysParas.get_private_ip(settings.FLUENTED_IP)))
  75. ip_port = (SysParas.get_private_ip(settings.FLUENTED_IP), settings.FLUENTED_OFFLINE_PORT)
  76. point = {
  77. 'time': ts,
  78. 'devno': devNo,
  79. 'coins': int(coins),
  80. 'mode': mode
  81. }
  82. if port:
  83. point.update({'port': str(port)})
  84. client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  85. client.sendto(json.dumps(point), ip_port)
  86. class InfluxDBEngine(object):
  87. def __init__(self, dbName, measurement, retention_policy = None):
  88. self.client = InfluxDBClient(
  89. host = settings.INFLUXDB_IP,
  90. port = settings.INFLUXDB_PORT,
  91. username = settings.INFLUXDB_USER,
  92. password = settings.INFLUXDB_PWD,
  93. database = dbName,
  94. timeout = 5)
  95. self.database = dbName
  96. self.retention_policy = retention_policy
  97. self.measurement = measurement
  98. @property
  99. def full_qualify_meaurement(self):
  100. if not self.retention_policy:
  101. return '"{}"'.format(self.measurement)
  102. else:
  103. return '"{}"."{}"'.format(self.retention_policy, self.measurement)
  104. def read_data(self, sqlStr):
  105. try:
  106. rs = self.client.query(sqlStr) # type: ResultSet
  107. return [point for point in rs.get_points()]
  108. except Exception, e:
  109. logger.exception('append_power,e=%s,str=%s' % (e, sqlStr))
  110. return None
  111. class SignalManager(object):
  112. inst = None
  113. @staticmethod
  114. def instence():
  115. if SignalManager.inst is not None:
  116. return SignalManager.inst
  117. enginer = InfluxDBEngine(settings.INFLUXDB_DB, 'signal') # type: InfluxDBEngine
  118. SignalManager.inst = SignalManager(enginer)
  119. return SignalManager.inst
  120. def __init__(self, enginer):
  121. self.enginer = enginer # type: InfluxDBEngine
  122. def get(self, devNo, sTime = None, eTime = None):
  123. # type: (str, datetime, datetime)->dict
  124. sTime = local2utc(sTime) if sTime else None
  125. eTime = local2utc(eTime) if eTime else None
  126. if sTime and eTime:
  127. selectStr = "select time,signal,usage,cmd from signal where devno = '%s' and time >= '%s' and time <= '%s' order by time" % (
  128. devNo, sTime, eTime)
  129. elif sTime:
  130. selectStr = "select time,signal,usage,cmd from signal where devno = '%s' and time >= '%s' order by time" % (
  131. devNo, sTime)
  132. elif eTime:
  133. selectStr = "select time,signal,usage,cmd from signal where devno = '%s' and time <= '%s' order by time" % (
  134. devNo, eTime)
  135. else:
  136. selectStr = "select time,signal,usage,cmd from signal where devno = '%s' order by time" % (devNo)
  137. points = self.enginer.read_data(selectStr)
  138. if not points:
  139. return []
  140. resultList = []
  141. for point in points:
  142. resultList.append(
  143. {
  144. 'time': arrow.get(point['time']).to(settings.TIME_ZONE).format('YYYY-MM-DD HH:mm:ss'),
  145. 'signal': point['signal'],
  146. 'usage': point['usage'],
  147. 'cmd': point['cmd']
  148. })
  149. return resultList
  150. class PowerManager(object):
  151. inst = None
  152. @staticmethod
  153. def instence():
  154. if PowerManager.inst is not None:
  155. return PowerManager.inst
  156. enginer = InfluxDBEngine(settings.INFLUXDB_DB, 'powers') # type: InfluxDBEngine
  157. PowerManager.inst = PowerManager(enginer)
  158. return PowerManager.inst
  159. def __init__(self, enginer):
  160. self.enginer = enginer
  161. def get(self, devNo, port, sTime, eTime, interval = 300,
  162. fields = [{'field': 'power2', 'scale': '1.0', 'precision': '0.01'},
  163. {'field': 'voltage2', 'scale': '1.0', 'precision': '0.01'},
  164. {'field': 'current2', 'scale': '1.0', 'precision': '0.01'}]):
  165. def format_date(dateStr):
  166. return arrow.get(str(dateStr)).to(settings.TIME_ZONE).naive
  167. def format_point(point, fields):
  168. rv = {
  169. 'time': format_date(point['time'])
  170. }
  171. for item in fields:
  172. field = item['field']
  173. scale = Decimal(item['scale'])
  174. precision = item['precision']
  175. if field in point and point[field] is not None:
  176. rv[field] = quantize(Decimal(str(point[field])) * scale, places = precision)
  177. else:
  178. rv[field] = '-'
  179. return rv
  180. def format_null_point(_time, fields):
  181. rv = {
  182. 'time': _time
  183. }
  184. for item in fields:
  185. field = item['field']
  186. rv[field] = '-'
  187. return rv
  188. query_start_time = sTime - datetime.timedelta(seconds = interval)
  189. query_end_time = eTime + datetime.timedelta(seconds = interval)
  190. selectStr = "select time, {} from {} where devno = '{}' and port = '{}' and time >= '{}' and time <= '{}' order by time".format(
  191. ','.join([item['field'] for item in fields]), self.enginer.full_qualify_meaurement, devNo, port,
  192. local2utc(query_start_time),
  193. local2utc(query_end_time))
  194. points = self.enginer.read_data(selectStr)
  195. if points is None:
  196. raise Exception(u'查询数据失败')
  197. total = len(points)
  198. if total == 0:
  199. return []
  200. import pandas as pd
  201. df = pd.DataFrame(points)
  202. min_time = format_date(df['time'].min())
  203. max_time = format_date(df['time'].max())
  204. rv = []
  205. first_section = None
  206. if min_time > sTime:
  207. left_rv = []
  208. next_time = min_time
  209. while True:
  210. next_time = (next_time - datetime.timedelta(seconds = interval))
  211. left_rv.append(format_null_point(next_time, fields))
  212. if next_time < sTime:
  213. break
  214. if len(left_rv) > 0:
  215. left_rv.reverse()
  216. rv.extend(left_rv)
  217. middle_section = None
  218. if max_time > min_time:
  219. # now_point =
  220. now = format_point(points[0], fields)
  221. # now = {
  222. # 'time': format_date(now_point['time']),
  223. # 'power': now_point['power'],
  224. # 'voltage': format_field(now_point.get('voltage', '-')),
  225. # 'current': format_field(now_point.get('current', '-'))
  226. # }
  227. rv.append(now)
  228. idx = 1
  229. # right_point = points[idx]
  230. right = format_point(points[idx], fields)
  231. # {
  232. # 'time': format_date(right_point['time']),
  233. # 'power': right_point['power'],
  234. # 'voltage': format_field(right_point.get('voltage', '-')),
  235. # 'current': format_field(right_point.get('current', '-'))
  236. # }
  237. total = len(points)
  238. while True:
  239. diff = (right['time'] - now['time']).total_seconds()
  240. if diff < (interval + interval / 2):
  241. rv.append(right)
  242. now = right
  243. idx = idx + 1
  244. if idx >= total:
  245. break
  246. else:
  247. right = format_point(points[idx], fields)
  248. # {
  249. # 'time': format_date(points[idx]['time']),
  250. # 'power': points[idx]['power'],
  251. # 'voltage': format_field(points[idx].get('voltage', '-')),
  252. # 'current': format_field(points[idx].get('current', '-'))
  253. # }
  254. # print('after: now = {}; right = {}'.format(now, right))
  255. else:
  256. if diff > 2 * interval:
  257. # print('before: now = {}; right = {}'.format(now, right))
  258. last_now = now
  259. now = format_null_point((last_now['time'] + datetime.timedelta(seconds = interval)), fields)
  260. rv.append(now)
  261. # print('after: now = {}; right = {}'.format(now, right))
  262. else:
  263. # print('before: now = {}; right = {}'.format(now, right))
  264. rv.append({
  265. 'time': (now['time'] + datetime.timedelta(seconds = diff / 2)),
  266. 'power2': now['power2'],
  267. 'voltage2': now['voltage2'],
  268. 'current2': now['current2']
  269. })
  270. rv.append(right)
  271. now = right
  272. idx = idx + 1
  273. if idx >= total:
  274. break
  275. else:
  276. right = format_point(points[idx], fields)
  277. # print('after: now = {}; right = {}'.format(now, right))
  278. if eTime > max_time:
  279. right_rv = []
  280. next_time = max_time
  281. while True:
  282. next_time = (next_time + datetime.timedelta(seconds = interval))
  283. right_rv.append(format_null_point(next_time, fields))
  284. if next_time >= eTime:
  285. break
  286. if len(right_rv) > 0:
  287. rv.extend(right_rv)
  288. return rv
  289. def get_raw(self, devNo, port, sTime, eTime):
  290. def format_date(dateStr):
  291. return arrow.get(str(dateStr)).to(settings.TIME_ZONE).naive
  292. selectStr = "select time, power2, voltage2, current2 from %s where devno = '%s' and port = '%s' and time >= '%s' and time <= '%s' order by time" % (
  293. self.enginer.full_qualify_meaurement, devNo, port, local2utc(sTime), local2utc(eTime))
  294. points = self.enginer.read_data(selectStr)
  295. rv = []
  296. for item in points:
  297. _time = item.pop('time')
  298. item['time'] = format_date(_time)
  299. rv.append(item)
  300. return rv
  301. class OfflineCoinsManager(object):
  302. inst = None
  303. @staticmethod
  304. def instence():
  305. if OfflineCoinsManager.inst is not None:
  306. return OfflineCoinsManager.inst
  307. enginer = InfluxDBEngine(settings.INFLUXDB_DB, 'offline_coins') # type: InfluxDBEngine
  308. OfflineCoinsManager.inst = OfflineCoinsManager(enginer)
  309. return OfflineCoinsManager.inst
  310. def __init__(self, enginer):
  311. self.enginer = enginer # type: InfluxDBEngine
  312. def get(self, devNo, sTime, eTime):
  313. # type: (str, datetime, datetime)->list
  314. sTime = local2utc(sTime)
  315. eTime = local2utc(eTime)
  316. selectStr = "select time,port,coins,mode from offline_coins where devno = '%s' and time >= '%s' and time <= '%s' order by time" % (
  317. devNo, sTime, eTime)
  318. points = self.enginer.read_data(selectStr)
  319. if not points:
  320. return []
  321. rv = []
  322. for point in points:
  323. item = {
  324. 'dateTimeAdded': arrow.get(point['time']).to(settings.TIME_ZONE).format('YYYY-MM-DD HH:mm:ss'),
  325. 'count': int(point['coins'])
  326. }
  327. if point['port']:
  328. item.update({'port': point['port']})
  329. rv.append(item)
  330. return rv
  331. class OfflineManager(object):
  332. """
  333. 离线设备消息通知
  334. """
  335. _CACHE = cache
  336. @staticmethod
  337. def cache_key(devNo):
  338. return "celery_device_offline_{}".format(devNo)
  339. @classmethod
  340. def add_cache(cls, devNo, tid):
  341. cls._CACHE.set(cls.cache_key(devNo), tid)
  342. @classmethod
  343. def delete_cache(cls, devNo):
  344. cls._CACHE.delete(cls.cache_key(devNo))
  345. @classmethod
  346. def get_cache(cls, devNo):
  347. return cls._CACHE.get(cls.cache_key(devNo))
  348. @staticmethod
  349. def prepare_registe_check(devNo):
  350. from apps.web.device.models import Device
  351. from apps.web.dealer.models import Dealer
  352. dev = Device.get_dev(devNo)
  353. if not dev:
  354. logger.error("[OFFLINE_DEVICE_TASK] registe not dev, devNo is <{}>".format(devNo))
  355. return 0
  356. dealer = Dealer.get_dealer(dev.ownerId)
  357. if not dealer:
  358. logger.error("[OFFLINE_DEVICE_TASK] registe not dealer, devNo is <{}>".format(devNo))
  359. return 0
  360. # 没有打开开关的经销商直接忽略
  361. if not dealer.get("offlineNotifySwitch", False):
  362. logger.error("[OFFLINE_DEVICE_TASK] register not dealer switch! devNo is <{}>".format(devNo))
  363. return 0
  364. offlineNotifyTime = int(dealer.get("offlineNotifyTime", 1) or 1)
  365. return offlineNotifyTime
  366. @classmethod
  367. def registe_device_offline_notify(cls, devNo, check = True):
  368. """
  369. 注册设备的离线通知
  370. :param devNo: 设备编号
  371. :param check: 是否无条件塞入队列
  372. :return:
  373. """
  374. # 发现任务已经被提交 退出不在注册通知任务
  375. if cls.get_cache(devNo):
  376. logger.error("[OFFLINE_DEVICE_TASK] notify has beed registe! devNo is <{}>".format(devNo))
  377. return
  378. # celery 异步注册任务 默认小时之后发送通知给经销商
  379. if check:
  380. offlineNotifyTime = cls.prepare_registe_check(devNo)
  381. else:
  382. offlineNotifyTime = 1
  383. if not offlineNotifyTime or offlineNotifyTime <= 0:
  384. return
  385. from taskmanager.mediator import task_caller
  386. result = task_caller(
  387. "device_offline_notify",
  388. delay = offlineNotifyTime * 3600,
  389. devNo = devNo
  390. )
  391. if not result:
  392. logger.error("[OFFLINE_DEVICE_TASK] task caller not return a async result! devNo is <{}>".format(devNo))
  393. return
  394. tid = str(result)
  395. logger.info("[OFFLINE_DEVICE_TASK] registe a task, devNo <{}>, tid <{}>".format(devNo, tid))
  396. cls.add_cache(devNo, tid)
  397. @classmethod
  398. def discard_device_offline_notify(cls, devNo):
  399. """
  400. 取消设备离线通知
  401. :param devNo:
  402. :return:
  403. """
  404. # 查看任务是否已经被注册 没有注册通知的就不进行处理
  405. tid = cls.get_cache(devNo)
  406. if not tid:
  407. logger.info("[OFFLINE_DEVICE_TASK] not discard a task, devNo <{}>, taskId <{}>".format(devNo, tid))
  408. return
  409. from taskmanager.operator import app as celeryApp
  410. celeryApp.control.revoke(tid)
  411. cls.delete_cache(devNo)
  412. logger.info("[OFFLINE_DEVICE_TASK] discard a task done, devNo <{}>, tid <{}>".format(devNo, tid))