# -*- coding: utf-8 -*- # !/usr/bin/env python import datetime import logging import socket from decimal import Decimal import arrow import simplejson as json from django.conf import settings from influxdb import InfluxDBClient from influxdb.resultset import ResultSet from django.core.cache import cache from apilib.numerics import quantize from apilib.utils_datetime import local2utc from apps.web.core.sysparas import SysParas logger = logging.getLogger(__name__) class FluentedEngine(object): def in_signal_udp(self, devNo, ts, signal, cmd): import time if ts < (int(time.time()) - 3 * 24 * 3600): logger.warn( '[in_signal_udp] ignore of ts. devNo = {}, ts = {}, signal = {}, cmd = {}, ip = {}'.format( devNo, ts, signal, cmd, SysParas.get_private_ip(settings.FLUENTED_IP))) return else: logger.debug( '[in_signal_udp] devNo = {}, ts = {}, signal = {}, cmd = {}, ip = {}'.format( devNo, ts, signal, cmd, SysParas.get_private_ip(settings.FLUENTED_IP))) ip_port = (SysParas.get_private_ip(settings.FLUENTED_IP), settings.FLUENTED_SIGNAL_PORT) point = { 'devno': devNo, 'time': ts, 'signal': signal, 'cmd': str(cmd) } client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) client.sendto(json.dumps(point), ip_port) def in_power_udp(self, devNo, port, ts, power, voltage, current, **kwargs): import time if ts < (int(time.time()) - 3 * 24 * 3600): logger.warn( '[in_power_udp] ignore of ts. devNo = {}, port = {}, ts = {}, power = {}, voltage = {}, current = {}, ip = {}, kwargs = {}'.format( devNo, port, ts, power, voltage, current, SysParas.get_private_ip(settings.FLUENTED_IP), kwargs)) return else: logger.debug( '[in_power_udp] devNo = {}, port = {}, ts = {}, power = {}, voltage = {}, current = {}, ip = {}, kwargs = {}'.format( devNo, port, ts, power, voltage, current, SysParas.get_private_ip(settings.FLUENTED_IP), kwargs)) ip_port = (SysParas.get_private_ip(settings.FLUENTED_IP), settings.FLUENTED_POWER_PORT) point = { 'time': ts, 'devno': devNo, 'port': str(port), 'power2': float(power) } if voltage is not None: point.update({'voltage2': float(voltage)}) if current is not None: point.update({'current2': float(current)}) if kwargs: point.update(kwargs) client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) client.sendto(json.dumps(point), ip_port) def in_put_coins_udp(self, devNo, ts, coins, mode, port = None): import time if ts < (int(time.time()) - 3 * 24 * 3600): logger.warn( '[in_put_coins_udp] ignore of ts. devNo = {}, ts = {}, coins = {}, mode = {}, port = {}, ip = {}'.format( devNo, ts, coins, mode, port, SysParas.get_private_ip(settings.FLUENTED_IP))) return else: logger.debug( '[in_put_coins_udp] devNo = {}, ts = {}, coins = {}, mode = {}, port = {}, ip = {}'.format( devNo, ts, coins, mode, port, SysParas.get_private_ip(settings.FLUENTED_IP))) ip_port = (SysParas.get_private_ip(settings.FLUENTED_IP), settings.FLUENTED_OFFLINE_PORT) point = { 'time': ts, 'devno': devNo, 'coins': int(coins), 'mode': mode } if port: point.update({'port': str(port)}) client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) client.sendto(json.dumps(point), ip_port) class InfluxDBEngine(object): def __init__(self, dbName, measurement, retention_policy = None): self.client = InfluxDBClient( host = settings.INFLUXDB_IP, port = settings.INFLUXDB_PORT, username = settings.INFLUXDB_USER, password = settings.INFLUXDB_PWD, database = dbName, timeout = 5) self.database = dbName self.retention_policy = retention_policy self.measurement = measurement @property def full_qualify_meaurement(self): if not self.retention_policy: return '"{}"'.format(self.measurement) else: return '"{}"."{}"'.format(self.retention_policy, self.measurement) def read_data(self, sqlStr): try: rs = self.client.query(sqlStr) # type: ResultSet return [point for point in rs.get_points()] except Exception, e: logger.exception('append_power,e=%s,str=%s' % (e, sqlStr)) return None class SignalManager(object): inst = None @staticmethod def instence(): if SignalManager.inst is not None: return SignalManager.inst enginer = InfluxDBEngine(settings.INFLUXDB_DB, 'signal') # type: InfluxDBEngine SignalManager.inst = SignalManager(enginer) return SignalManager.inst def __init__(self, enginer): self.enginer = enginer # type: InfluxDBEngine def get(self, devNo, sTime = None, eTime = None): # type: (str, datetime, datetime)->dict sTime = local2utc(sTime) if sTime else None eTime = local2utc(eTime) if eTime else None if sTime and eTime: selectStr = "select time,signal,usage,cmd from signal where devno = '%s' and time >= '%s' and time <= '%s' order by time" % ( devNo, sTime, eTime) elif sTime: selectStr = "select time,signal,usage,cmd from signal where devno = '%s' and time >= '%s' order by time" % ( devNo, sTime) elif eTime: selectStr = "select time,signal,usage,cmd from signal where devno = '%s' and time <= '%s' order by time" % ( devNo, eTime) else: selectStr = "select time,signal,usage,cmd from signal where devno = '%s' order by time" % (devNo) points = self.enginer.read_data(selectStr) if not points: return [] resultList = [] for point in points: resultList.append( { 'time': arrow.get(point['time']).to(settings.TIME_ZONE).format('YYYY-MM-DD HH:mm:ss'), 'signal': point['signal'], 'usage': point['usage'], 'cmd': point['cmd'] }) return resultList class PowerManager(object): inst = None @staticmethod def instence(): if PowerManager.inst is not None: return PowerManager.inst enginer = InfluxDBEngine(settings.INFLUXDB_DB, 'powers') # type: InfluxDBEngine PowerManager.inst = PowerManager(enginer) return PowerManager.inst def __init__(self, enginer): self.enginer = enginer def get(self, devNo, port, sTime, eTime, interval = 300, fields = [{'field': 'power2', 'scale': '1.0', 'precision': '0.01'}, {'field': 'voltage2', 'scale': '1.0', 'precision': '0.01'}, {'field': 'current2', 'scale': '1.0', 'precision': '0.01'}]): def format_date(dateStr): return arrow.get(str(dateStr)).to(settings.TIME_ZONE).naive def format_point(point, fields): rv = { 'time': format_date(point['time']) } for item in fields: field = item['field'] scale = Decimal(item['scale']) precision = item['precision'] if field in point and point[field] is not None: rv[field] = quantize(Decimal(str(point[field])) * scale, places = precision) else: rv[field] = '-' return rv def format_null_point(_time, fields): rv = { 'time': _time } for item in fields: field = item['field'] rv[field] = '-' return rv query_start_time = sTime - datetime.timedelta(seconds = interval) query_end_time = eTime + datetime.timedelta(seconds = interval) selectStr = "select time, {} from {} where devno = '{}' and port = '{}' and time >= '{}' and time <= '{}' order by time".format( ','.join([item['field'] for item in fields]), self.enginer.full_qualify_meaurement, devNo, port, local2utc(query_start_time), local2utc(query_end_time)) points = self.enginer.read_data(selectStr) if points is None: raise Exception(u'查询数据失败') total = len(points) if total == 0: return [] import pandas as pd df = pd.DataFrame(points) min_time = format_date(df['time'].min()) max_time = format_date(df['time'].max()) rv = [] first_section = None if min_time > sTime: left_rv = [] next_time = min_time while True: next_time = (next_time - datetime.timedelta(seconds = interval)) left_rv.append(format_null_point(next_time, fields)) if next_time < sTime: break if len(left_rv) > 0: left_rv.reverse() rv.extend(left_rv) middle_section = None if max_time > min_time: # now_point = now = format_point(points[0], fields) # now = { # 'time': format_date(now_point['time']), # 'power': now_point['power'], # 'voltage': format_field(now_point.get('voltage', '-')), # 'current': format_field(now_point.get('current', '-')) # } rv.append(now) idx = 1 # right_point = points[idx] right = format_point(points[idx], fields) # { # 'time': format_date(right_point['time']), # 'power': right_point['power'], # 'voltage': format_field(right_point.get('voltage', '-')), # 'current': format_field(right_point.get('current', '-')) # } total = len(points) while True: diff = (right['time'] - now['time']).total_seconds() if diff < (interval + interval / 2): rv.append(right) now = right idx = idx + 1 if idx >= total: break else: right = format_point(points[idx], fields) # { # 'time': format_date(points[idx]['time']), # 'power': points[idx]['power'], # 'voltage': format_field(points[idx].get('voltage', '-')), # 'current': format_field(points[idx].get('current', '-')) # } # print('after: now = {}; right = {}'.format(now, right)) else: if diff > 2 * interval: # print('before: now = {}; right = {}'.format(now, right)) last_now = now now = format_null_point((last_now['time'] + datetime.timedelta(seconds = interval)), fields) rv.append(now) # print('after: now = {}; right = {}'.format(now, right)) else: # print('before: now = {}; right = {}'.format(now, right)) rv.append({ 'time': (now['time'] + datetime.timedelta(seconds = diff / 2)), 'power2': now['power2'], 'voltage2': now['voltage2'], 'current2': now['current2'] }) rv.append(right) now = right idx = idx + 1 if idx >= total: break else: right = format_point(points[idx], fields) # print('after: now = {}; right = {}'.format(now, right)) if eTime > max_time: right_rv = [] next_time = max_time while True: next_time = (next_time + datetime.timedelta(seconds = interval)) right_rv.append(format_null_point(next_time, fields)) if next_time >= eTime: break if len(right_rv) > 0: rv.extend(right_rv) return rv def get_raw(self, devNo, port, sTime, eTime): def format_date(dateStr): return arrow.get(str(dateStr)).to(settings.TIME_ZONE).naive selectStr = "select time, power2, voltage2, current2 from %s where devno = '%s' and port = '%s' and time >= '%s' and time <= '%s' order by time" % ( self.enginer.full_qualify_meaurement, devNo, port, local2utc(sTime), local2utc(eTime)) points = self.enginer.read_data(selectStr) rv = [] for item in points: _time = item.pop('time') item['time'] = format_date(_time) rv.append(item) return rv class OfflineCoinsManager(object): inst = None @staticmethod def instence(): if OfflineCoinsManager.inst is not None: return OfflineCoinsManager.inst enginer = InfluxDBEngine(settings.INFLUXDB_DB, 'offline_coins') # type: InfluxDBEngine OfflineCoinsManager.inst = OfflineCoinsManager(enginer) return OfflineCoinsManager.inst def __init__(self, enginer): self.enginer = enginer # type: InfluxDBEngine def get(self, devNo, sTime, eTime): # type: (str, datetime, datetime)->list sTime = local2utc(sTime) eTime = local2utc(eTime) selectStr = "select time,port,coins,mode from offline_coins where devno = '%s' and time >= '%s' and time <= '%s' order by time" % ( devNo, sTime, eTime) points = self.enginer.read_data(selectStr) if not points: return [] rv = [] for point in points: item = { 'dateTimeAdded': arrow.get(point['time']).to(settings.TIME_ZONE).format('YYYY-MM-DD HH:mm:ss'), 'count': int(point['coins']) } if point['port']: item.update({'port': point['port']}) rv.append(item) return rv class OfflineManager(object): """ 离线设备消息通知 """ _CACHE = cache @staticmethod def cache_key(devNo): return "celery_device_offline_{}".format(devNo) @classmethod def add_cache(cls, devNo, tid): cls._CACHE.set(cls.cache_key(devNo), tid) @classmethod def delete_cache(cls, devNo): cls._CACHE.delete(cls.cache_key(devNo)) @classmethod def get_cache(cls, devNo): return cls._CACHE.get(cls.cache_key(devNo)) @staticmethod def prepare_registe_check(devNo): from apps.web.device.models import Device from apps.web.dealer.models import Dealer dev = Device.get_dev(devNo) if not dev: logger.error("[OFFLINE_DEVICE_TASK] registe not dev, devNo is <{}>".format(devNo)) return 0 dealer = Dealer.get_dealer(dev.ownerId) if not dealer: logger.error("[OFFLINE_DEVICE_TASK] registe not dealer, devNo is <{}>".format(devNo)) return 0 # 没有打开开关的经销商直接忽略 if not dealer.get("offlineNotifySwitch", False): logger.error("[OFFLINE_DEVICE_TASK] register not dealer switch! devNo is <{}>".format(devNo)) return 0 offlineNotifyTime = int(dealer.get("offlineNotifyTime", 1) or 1) return offlineNotifyTime @classmethod def registe_device_offline_notify(cls, devNo, check = True): """ 注册设备的离线通知 :param devNo: 设备编号 :param check: 是否无条件塞入队列 :return: """ # 发现任务已经被提交 退出不在注册通知任务 if cls.get_cache(devNo): logger.error("[OFFLINE_DEVICE_TASK] notify has beed registe! devNo is <{}>".format(devNo)) return # celery 异步注册任务 默认小时之后发送通知给经销商 if check: offlineNotifyTime = cls.prepare_registe_check(devNo) else: offlineNotifyTime = 1 if not offlineNotifyTime or offlineNotifyTime <= 0: return from taskmanager.mediator import task_caller result = task_caller( "device_offline_notify", delay = offlineNotifyTime * 3600, devNo = devNo ) if not result: logger.error("[OFFLINE_DEVICE_TASK] task caller not return a async result! devNo is <{}>".format(devNo)) return tid = str(result) logger.info("[OFFLINE_DEVICE_TASK] registe a task, devNo <{}>, tid <{}>".format(devNo, tid)) cls.add_cache(devNo, tid) @classmethod def discard_device_offline_notify(cls, devNo): """ 取消设备离线通知 :param devNo: :return: """ # 查看任务是否已经被注册 没有注册通知的就不进行处理 tid = cls.get_cache(devNo) if not tid: logger.info("[OFFLINE_DEVICE_TASK] not discard a task, devNo <{}>, taskId <{}>".format(devNo, tid)) return from taskmanager.operator import app as celeryApp celeryApp.control.revoke(tid) cls.delete_cache(devNo) logger.info("[OFFLINE_DEVICE_TASK] discard a task done, devNo <{}>, tid <{}>".format(devNo, tid))