123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
import contextlib
import datetime
import logging
import socket
import time
from decimal import Decimal

import arrow
import simplejson as json
from django.conf import settings
from django.core.cache import cache
from influxdb import InfluxDBClient
from influxdb.resultset import ResultSet

from apilib.numerics import quantize
from apilib.utils_datetime import local2utc
from apps.web.core.sysparas import SysParas

logger = logging.getLogger(__name__)
class FluentedEngine(object):
    """Send device data points as JSON datagrams over UDP.

    Every ``in_*_udp`` method builds one point dict, serialises it with
    ``json.dumps`` and sends it to the host resolved from
    ``settings.FLUENTED_IP`` on a per-kind port. Points whose timestamp is
    more than three days in the past are logged and dropped.
    """

    # Points older than this many seconds (3 days) are ignored.
    _MAX_POINT_AGE_SECONDS = 3 * 24 * 3600

    @classmethod
    def _is_stale(cls, ts):
        """Return True when ``ts`` (epoch seconds) is older than the cut-off."""
        return ts < (int(time.time()) - cls._MAX_POINT_AGE_SECONDS)

    @staticmethod
    def _send(point, ip, port):
        """Serialise ``point`` to JSON and send it as one UDP datagram.

        The socket is always closed, even when ``sendto`` raises, so that
        descriptors do not pile up until garbage collection.
        """
        payload = json.dumps(point)
        # No-op on Python 2 (str is bytes); keeps sendto() working on Python 3.
        if not isinstance(payload, bytes):
            payload = payload.encode('utf-8')
        with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as client:
            client.sendto(payload, (ip, port))

    def in_signal_udp(self, devNo, ts, signal, cmd):
        """Send one signal point for a device.

        :param devNo: device number, sent as ``devno``
        :param ts: point timestamp; compared against ``time.time()`` (epoch seconds)
        :param signal: signal value, sent unchanged
        :param cmd: command identifier, sent as ``str(cmd)``
        :return: None; nothing is sent when ``ts`` is older than three days
        """
        ip = SysParas.get_private_ip(settings.FLUENTED_IP)
        if self._is_stale(ts):
            logger.warning(
                '[in_signal_udp] ignore of ts. devNo = {}, ts = {}, signal = {}, cmd = {}, ip = {}'.format(
                    devNo, ts, signal, cmd, ip))
            return

        logger.debug(
            '[in_signal_udp] devNo = {}, ts = {}, signal = {}, cmd = {}, ip = {}'.format(
                devNo, ts, signal, cmd, ip))

        point = {
            'devno': devNo,
            'time': ts,
            'signal': signal,
            'cmd': str(cmd)
        }
        self._send(point, ip, settings.FLUENTED_SIGNAL_PORT)

    def in_power_udp(self, devNo, port, ts, power, voltage, current, **kwargs):
        """Send one power sample for a device port.

        :param devNo: device number, sent as ``devno``
        :param port: device port, sent as ``str(port)``
        :param ts: point timestamp; compared against ``time.time()`` (epoch seconds)
        :param power: sent as ``power2`` after ``float()`` conversion
        :param voltage: sent as ``voltage2`` (float) unless it is None
        :param current: sent as ``current2`` (float) unless it is None
        :param kwargs: extra key/values merged into the point last, so they
            override the keys above on a name clash
        :return: None; nothing is sent when ``ts`` is older than three days
        """
        ip = SysParas.get_private_ip(settings.FLUENTED_IP)
        if self._is_stale(ts):
            logger.warning(
                '[in_power_udp] ignore of ts. devNo = {}, port = {}, ts = {}, power = {}, voltage = {}, current = {}, ip = {}, kwargs = {}'.format(
                    devNo, port, ts, power, voltage, current, ip, kwargs))
            return

        logger.debug(
            '[in_power_udp] devNo = {}, port = {}, ts = {}, power = {}, voltage = {}, current = {}, ip = {}, kwargs = {}'.format(
                devNo, port, ts, power, voltage, current, ip, kwargs))

        point = {
            'time': ts,
            'devno': devNo,
            'port': str(port),
            'power2': float(power)
        }
        if voltage is not None:
            point['voltage2'] = float(voltage)
        if current is not None:
            point['current2'] = float(current)
        if kwargs:
            point.update(kwargs)
        self._send(point, ip, settings.FLUENTED_POWER_PORT)

    def in_put_coins_udp(self, devNo, ts, coins, mode, port = None):
        """Send one coin-insert record for a device.

        :param devNo: device number, sent as ``devno``
        :param ts: point timestamp; compared against ``time.time()`` (epoch seconds)
        :param coins: coin count, sent as ``int(coins)``
        :param mode: sent unchanged
        :param port: optional device port; only sent (as ``str(port)``) when
            truthy, so ``0`` / ``''`` / ``None`` are all omitted
        :return: None; nothing is sent when ``ts`` is older than three days
        """
        ip = SysParas.get_private_ip(settings.FLUENTED_IP)
        if self._is_stale(ts):
            logger.warning(
                '[in_put_coins_udp] ignore of ts. devNo = {}, ts = {}, coins = {}, mode = {}, port = {}, ip = {}'.format(
                    devNo, ts, coins, mode, port, ip))
            return

        logger.debug(
            '[in_put_coins_udp] devNo = {}, ts = {}, coins = {}, mode = {}, port = {}, ip = {}'.format(
                devNo, ts, coins, mode, port, ip))

        point = {
            'time': ts,
            'devno': devNo,
            'coins': int(coins),
            'mode': mode
        }
        if port:
            point['port'] = str(port)
        self._send(point, ip, settings.FLUENTED_OFFLINE_PORT)
class InfluxDBEngine(object):
    """Query helper around an ``InfluxDBClient`` bound to one database and measurement."""

    def __init__(self, dbName, measurement, retention_policy = None):
        """Create the client from the ``settings.INFLUXDB_*`` values.

        :param dbName: database the client is bound to
        :param measurement: measurement name used by ``full_qualify_meaurement``
        :param retention_policy: optional retention policy name used to
            qualify the measurement
        """
        self.client = InfluxDBClient(
            host = settings.INFLUXDB_IP,
            port = settings.INFLUXDB_PORT,
            username = settings.INFLUXDB_USER,
            password = settings.INFLUXDB_PWD,
            database = dbName,
            timeout = 5)

        self.database = dbName
        self.retention_policy = retention_policy
        self.measurement = measurement

    @property
    def full_qualify_meaurement(self):
        """Return the double-quoted measurement, prefixed by the retention policy when one is set.

        The misspelled name is part of the public interface and is kept as is.
        """
        quoted_measurement = '"{}"'.format(self.measurement)
        if not self.retention_policy:
            return quoted_measurement
        return '"{}".{}'.format(self.retention_policy, quoted_measurement)

    def read_data(self, sqlStr):
        """Run ``sqlStr`` and return the result points as a list of dicts.

        :param sqlStr: query text passed to ``InfluxDBClient.query``
        :return: list of points (possibly empty), or ``None`` when the query
            raised; callers rely on ``None`` to tell failure from "no data"
        """
        try:
            rs = self.client.query(sqlStr)  # type: ResultSet
            return list(rs.get_points())
        except Exception as e:
            # Lazy logging args: formatting problems cannot raise out of the handler.
            logger.exception('read_data,e=%s,str=%s', e, sqlStr)
            return None
class SignalManager(object):
    """Lazily created, process-wide reader for the ``signal`` measurement."""

    inst = None

    @staticmethod
    def instence():
        """Return the shared ``SignalManager``, building it on first use."""
        if SignalManager.inst is None:
            enginer = InfluxDBEngine(settings.INFLUXDB_DB, 'signal')  # type: InfluxDBEngine
            SignalManager.inst = SignalManager(enginer)
        return SignalManager.inst

    def __init__(self, enginer):
        self.enginer = enginer  # type: InfluxDBEngine

    def get(self, devNo, sTime = None, eTime = None):
        # type: (str, datetime, datetime) -> list
        """Return the signal records of a device, oldest first.

        :param devNo: device number to filter on
        :param sTime: optional lower time bound, converted with ``local2utc``
        :param eTime: optional upper time bound, converted with ``local2utc``
        :return: list of dicts with ``time`` (formatted in
            ``settings.TIME_ZONE``), ``signal``, ``usage`` and ``cmd``; an
            empty list when the query fails or matches nothing
        """
        utc_start = local2utc(sTime) if sTime else None
        utc_end = local2utc(eTime) if eTime else None

        # Assemble the WHERE clause from whichever bounds were supplied.
        conditions = ["devno = '%s'" % (devNo,)]
        if utc_start:
            conditions.append("time >= '%s'" % (utc_start,))
        if utc_end:
            conditions.append("time <= '%s'" % (utc_end,))
        selectStr = "select time,signal,usage,cmd from signal where %s order by time" % (
            ' and '.join(conditions),)

        points = self.enginer.read_data(selectStr)
        if not points:
            return []

        return [
            {
                'time': arrow.get(row['time']).to(settings.TIME_ZONE).format('YYYY-MM-DD HH:mm:ss'),
                'signal': row['signal'],
                'usage': row['usage'],
                'cmd': row['cmd']
            }
            for row in points
        ]
class PowerManager(object):
    """Lazily created, process-wide reader for the ``powers`` measurement."""

    inst = None

    # Default projection for get(). Kept on the class (and immutable at the
    # top level) instead of being a mutable default argument.
    _DEFAULT_FIELDS = (
        {'field': 'power2', 'scale': '1.0', 'precision': '0.01'},
        {'field': 'voltage2', 'scale': '1.0', 'precision': '0.01'},
        {'field': 'current2', 'scale': '1.0', 'precision': '0.01'},
    )

    @staticmethod
    def instence():
        """Return the shared ``PowerManager``, building it on first use."""
        if PowerManager.inst is not None:
            return PowerManager.inst

        enginer = InfluxDBEngine(settings.INFLUXDB_DB, 'powers')  # type: InfluxDBEngine
        PowerManager.inst = PowerManager(enginer)
        return PowerManager.inst

    def __init__(self, enginer):
        self.enginer = enginer  # type: InfluxDBEngine

    @staticmethod
    def _to_local_naive(value):
        """Parse a timestamp and return it as a naive datetime in ``settings.TIME_ZONE``."""
        return arrow.get(str(value)).to(settings.TIME_ZONE).naive

    @staticmethod
    def _null_point(_time, fields):
        """Build a placeholder sample at ``_time`` with every field set to ``'-'``."""
        rv = {'time': _time}
        for item in fields:
            rv[item['field']] = '-'
        return rv

    @classmethod
    def _format_point(cls, point, fields):
        """Convert one raw point: localise its time and scale/quantize each requested field."""
        rv = {'time': cls._to_local_naive(point['time'])}
        for item in fields:
            field = item['field']
            scale = Decimal(item['scale'])
            precision = item['precision']
            raw = point.get(field)
            if raw is None:
                # Field absent from the row or stored as null.
                rv[field] = '-'
            else:
                rv[field] = quantize(Decimal(str(raw)) * scale, places = precision)
        return rv

    @classmethod
    def _fill_gaps(cls, samples, interval, fields):
        """Return ``samples`` (time-ordered, non-empty) with the gaps between neighbours filled.

        For each pair of neighbours, with ``diff`` the gap in seconds:

        * ``diff < interval + interval / 2``: the pair is contiguous;
        * ``diff > 2 * interval``: placeholder points are inserted every
          ``interval`` seconds until the remaining gap is small enough;
        * otherwise: one midpoint carrying the previous point's values is inserted.
        """
        step = datetime.timedelta(seconds = interval)
        now = samples[0]
        filled = [now]
        for right in samples[1:]:
            while True:
                diff = (right['time'] - now['time']).total_seconds()
                # Expression kept as written (integer division on Python 2 for int intervals).
                if diff < (interval + interval / 2):
                    break
                if diff > 2 * interval:
                    now = cls._null_point(now['time'] + step, fields)
                    filled.append(now)
                    continue
                # Copy every field of the previous point rather than a
                # hard-coded power2/voltage2/current2 trio, so custom
                # ``fields`` do not raise KeyError.
                midpoint = dict(now)
                midpoint['time'] = now['time'] + datetime.timedelta(seconds = diff / 2)
                filled.append(midpoint)
                break
            filled.append(right)
            now = right
        return filled

    def get(self, devNo, port, sTime, eTime, interval = 300, fields = None):
        """Return an evenly padded series of samples for one device port.

        The query window is widened by ``interval`` seconds on both sides.
        The result is padded with ``'-'`` placeholder points before the first
        and after the last sample so it spans ``sTime``..``eTime``, and gaps
        between samples are filled (see ``_fill_gaps``).

        :param devNo: device number to filter on
        :param port: device port to filter on
        :param sTime: start of the range; compared with the naive local sample times
        :param eTime: end of the range; compared with the naive local sample times
        :param interval: expected sampling period in seconds, must be positive
        :param fields: sequence of ``{'field', 'scale', 'precision'}`` dicts;
            defaults to power2 / voltage2 / current2 at scale 1.0, precision 0.01
        :return: list of dicts with ``time`` plus one key per requested field;
            empty list when there is no data
        :raises ValueError: when ``interval`` is not positive (padding would never terminate)
        :raises Exception: when the query itself fails
        """
        if interval <= 0:
            raise ValueError('interval must be a positive number of seconds')
        if fields is None:
            fields = self._DEFAULT_FIELDS

        step = datetime.timedelta(seconds = interval)
        query_start_time = sTime - step
        query_end_time = eTime + step

        selectStr = "select time, {} from {} where devno = '{}' and port = '{}' and time >= '{}' and time <= '{}' order by time".format(
            ','.join([item['field'] for item in fields]), self.enginer.full_qualify_meaurement, devNo, port,
            local2utc(query_start_time),
            local2utc(query_end_time))

        points = self.enginer.read_data(selectStr)
        if points is None:
            raise Exception(u'查询数据失败')
        if not points:
            return []

        samples = [self._format_point(point, fields) for point in points]
        # Compare parsed datetimes rather than raw timestamp strings.
        sample_times = [sample['time'] for sample in samples]
        min_time = min(sample_times)
        max_time = max(sample_times)

        rv = []

        # Leading placeholders: step back from the first sample until sTime is passed.
        if min_time > sTime:
            leading = []
            cursor = min_time
            while True:
                cursor = cursor - step
                leading.append(self._null_point(cursor, fields))
                if cursor < sTime:
                    break
            leading.reverse()
            rv.extend(leading)

        # Always emit the real samples, including the single-sample case.
        rv.extend(self._fill_gaps(samples, interval, fields))

        # Trailing placeholders: step forward from the last sample until eTime is reached.
        if eTime > max_time:
            cursor = max_time
            while True:
                cursor = cursor + step
                rv.append(self._null_point(cursor, fields))
                if cursor >= eTime:
                    break

        return rv

    def get_raw(self, devNo, port, sTime, eTime):
        """Return the unprocessed power2 / voltage2 / current2 rows of one device port.

        :param devNo: device number to filter on
        :param port: device port to filter on
        :param sTime: range start, converted with ``local2utc``
        :param eTime: range end, converted with ``local2utc``
        :return: list of row dicts whose ``time`` is replaced by a naive
            datetime in ``settings.TIME_ZONE``
        :raises Exception: when the query itself fails (same error as ``get``)
        """
        selectStr = "select time, power2, voltage2, current2 from %s where devno = '%s' and port = '%s' and time >= '%s' and time <= '%s' order by time" % (
            self.enginer.full_qualify_meaurement, devNo, port, local2utc(sTime), local2utc(eTime))

        points = self.enginer.read_data(selectStr)
        if points is None:
            # read_data signals failure with None; fail explicitly instead of
            # with a TypeError from iterating None.
            raise Exception(u'查询数据失败')

        rv = []
        for item in points:
            item['time'] = self._to_local_naive(item.pop('time'))
            rv.append(item)
        return rv
class OfflineCoinsManager(object):
    """Lazily created, process-wide reader for the ``offline_coins`` measurement."""

    inst = None

    @staticmethod
    def instence():
        """Return the shared ``OfflineCoinsManager``, building it on first use."""
        if OfflineCoinsManager.inst is None:
            enginer = InfluxDBEngine(settings.INFLUXDB_DB, 'offline_coins')  # type: InfluxDBEngine
            OfflineCoinsManager.inst = OfflineCoinsManager(enginer)
        return OfflineCoinsManager.inst

    def __init__(self, enginer):
        self.enginer = enginer  # type: InfluxDBEngine

    def get(self, devNo, sTime, eTime):
        # type: (str, datetime, datetime)->list
        """Return the coin records of a device between two times, oldest first.

        :param devNo: device number to filter on
        :param sTime: range start, converted with ``local2utc``
        :param eTime: range end, converted with ``local2utc``
        :return: list of dicts with ``dateTimeAdded`` (formatted in
            ``settings.TIME_ZONE``), ``count`` and, when the row has a truthy
            port, ``port``; an empty list when the query fails or matches nothing
        """
        utc_start = local2utc(sTime)
        utc_end = local2utc(eTime)
        selectStr = "select time,port,coins,mode from offline_coins where devno = '%s' and time >= '%s' and time <= '%s' order by time" % (
            devNo, utc_start, utc_end)

        rows = self.enginer.read_data(selectStr)
        if not rows:
            return []

        def to_record(row):
            record = {
                'dateTimeAdded': arrow.get(row['time']).to(settings.TIME_ZONE).format('YYYY-MM-DD HH:mm:ss'),
                'count': int(row['coins'])
            }
            if row['port']:
                record['port'] = row['port']
            return record

        return [to_record(row) for row in rows]
class OfflineManager(object):
    """
    Offline-device notifications.

    Schedules a delayed ``device_offline_notify`` task per device and
    remembers its task id in the cache so it can be revoked later.
    """
    _CACHE = cache

    @staticmethod
    def cache_key(devNo):
        """Return the cache key under which the pending task id of ``devNo`` is stored."""
        return "celery_device_offline_{}".format(devNo)

    @classmethod
    def add_cache(cls, devNo, tid):
        """Remember ``tid`` as the pending notification task of ``devNo``."""
        # NOTE(review): no timeout is passed, so the cache backend's default
        # expiry applies; confirm it outlives the task delay.
        key = cls.cache_key(devNo)
        cls._CACHE.set(key, tid)

    @classmethod
    def delete_cache(cls, devNo):
        """Forget the pending notification task of ``devNo``."""
        key = cls.cache_key(devNo)
        cls._CACHE.delete(key)

    @classmethod
    def get_cache(cls, devNo):
        """Return the cached task id of ``devNo``, or whatever the cache yields on a miss."""
        key = cls.cache_key(devNo)
        return cls._CACHE.get(key)

    @staticmethod
    def prepare_registe_check(devNo):
        """Decide how many hours to wait before notifying the dealer.

        :param devNo: device number
        :return: ``0`` when the device or its dealer cannot be found or the
            dealer's ``offlineNotifySwitch`` is off; otherwise the dealer's
            ``offlineNotifyTime`` as an int (falling back to 1)
        """
        from apps.web.device.models import Device
        from apps.web.dealer.models import Dealer

        device = Device.get_dev(devNo)
        if not device:
            logger.error("[OFFLINE_DEVICE_TASK] registe not dev, devNo is <{}>".format(devNo))
            return 0

        owner = Dealer.get_dealer(device.ownerId)
        if not owner:
            logger.error("[OFFLINE_DEVICE_TASK] registe not dealer, devNo is <{}>".format(devNo))
            return 0

        # Dealers that have not enabled the switch are ignored outright.
        switch_on = owner.get("offlineNotifySwitch", False)
        if not switch_on:
            logger.error("[OFFLINE_DEVICE_TASK] register not dealer switch! devNo is <{}>".format(devNo))
            return 0

        hours = owner.get("offlineNotifyTime", 1) or 1
        return int(hours)

    @classmethod
    def registe_device_offline_notify(cls, devNo, check = True):
        """
        Register the offline notification of a device.

        :param devNo: device number
        :param check: when True the delay comes from ``prepare_registe_check``
            (which may veto the registration); when False the task is queued
            unconditionally with a one-hour delay
        :return: None
        """
        # A task has already been submitted: do not register another one.
        if cls.get_cache(devNo):
            logger.error("[OFFLINE_DEVICE_TASK] notify has beed registe! devNo is <{}>".format(devNo))
            return

        # The task is registered asynchronously; the dealer is notified after
        # the delay below, expressed in hours.
        offlineNotifyTime = cls.prepare_registe_check(devNo) if check else 1
        if not (offlineNotifyTime and offlineNotifyTime > 0):
            return

        from taskmanager.mediator import task_caller
        result = task_caller(
            "device_offline_notify",
            delay = offlineNotifyTime * 3600,
            devNo = devNo
        )
        if not result:
            logger.error("[OFFLINE_DEVICE_TASK] task caller not return a async result! devNo is <{}>".format(devNo))
            return

        tid = str(result)
        logger.info("[OFFLINE_DEVICE_TASK] registe a task, devNo <{}>, tid <{}>".format(devNo, tid))
        cls.add_cache(devNo, tid)

    @classmethod
    def discard_device_offline_notify(cls, devNo):
        """
        Cancel the offline notification of a device.

        :param devNo: device number
        :return: None
        """
        # Only act when a task was registered; otherwise there is nothing to revoke.
        tid = cls.get_cache(devNo)
        if tid:
            from taskmanager.operator import app as celeryApp
            celeryApp.control.revoke(tid)
            cls.delete_cache(devNo)
            logger.info("[OFFLINE_DEVICE_TASK] discard a task done, devNo <{}>, tid <{}>".format(devNo, tid))
            return

        logger.info("[OFFLINE_DEVICE_TASK] not discard a task, devNo <{}>, taskId <{}>".format(devNo, tid))
|