# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""MQTT monitor worker.

Parses the command-line options, bootstraps the project environment, registers
``worker`` as the receiver of the ``/message`` signal and then runs an
``MqttContext`` configured for the commands listed in ``MONITORED_CMDS``.

Command-line options read by this script:
    -e / --env    settings module suffix (``configs.<env>``); default ``testing``
    -q / --queue  value forwarded to ``MqttContext`` as ``queue``
    -p / --port   MQTT port (required, integer)
    -d / --debug  value forwarded to ``MqttContext`` as ``debug``
"""

from gevent import monkey

# Patch the standard library first, before the remaining modules are imported.
monkey.patch_all()

import datetime
import getopt
import logging
import sys
import time

from typing import TYPE_CHECKING

try:
    options, args = getopt.getopt(
        sys.argv[1:], 'l:q:e:p:d:m:b', ['queue=', 'env=', 'port=', 'debug='])
except getopt.GetoptError as e:
    print(str(e))
    # A malformed command line is a failure: exit with a non-zero status.
    sys.exit(2)

platform_env = 'testing'
queue = False
port = None
debug = ''

for name, value in options:
    if name in ('-e', '--env'):
        platform_env = value
    if name in ('-q', '--queue'):
        queue = value
    if name in ('-p', '--port'):
        port = int(value)
    if name in ('-d', '--debug'):
        debug = value

if not port:
    print('no port parameter.')
    sys.exit(1)

import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'configs.{env}'.format(env=platform_env))

# Two directory levels above the directory containing this file.
PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
sys.path.insert(0, PROJECT_ROOT)

from script.base import init_env

# Initialise the project environment before the Django/app imports below.
init_env(interactive=False)

import simplejson as json
from django.conf import settings

from apps.web.core.sysparas import SysParas
from script.eventer.mqtt_context_co import MqttContext, message_received
from apps.web.device.timescale import OfflineManager, FluentedEngine
from apps.web.device.models import Device, DeviceCmdStatics
from apps.web.constant import Const

if TYPE_CHECKING:
    from apps.web.device.models import DeviceDict

logger = logging.getLogger(__name__)

host = SysParas.get_private_ip(settings.MQTT_HOSTNAME)

# Commands handed to MqttContext; every one uses the same options.
MONITORED_CMDS = ('207', '200', '208', 'log')


def _record_first_sim_activation(dev):
    # type: (DeviceDict)->None
    """Best-effort: set ``simActiveFirstTime`` on a device that has no owner.

    Does nothing when ``dev`` carries a truthy ``ownerId``. Any failure while
    loading or saving the device object is logged and otherwise ignored so the
    caller keeps processing the message.
    """
    # ``has_key`` is kept on purpose: DeviceDict is a project type and its
    # support for the ``in`` operator is not visible from this file.
    if dev.has_key('ownerId') and dev['ownerId']:
        return

    try:
        dev_obj = dev.my_obj
        if not dev_obj.simActiveFirstTime:
            dev_obj.simActiveFirstTime = datetime.datetime.now()
            dev_obj.save()
    except Exception:
        logger.exception(
            'failed to record sim first active time, dev = <{}>'.format(dev.devNo))


@message_received.connect_via('/message')
def worker(sender, cmd, dev, payload):
    # type: (object, str, DeviceDict, str)->None
    """Handle one message delivered through the ``/message`` signal.

    :param sender: signal sender (unused).
    :param cmd: command identifier of the message (``'log'``, ``'200'``,
        ``'207'``, ``'208'``, ...).
    :param dev: device the message was received for.
    :param payload: raw message body; JSON for every command except ``'log'``.

    ``'log'`` messages are only written to the debug log. For every other
    command the payload must be a JSON object whose ``IMEI`` equals
    ``dev.devNo`` and whose ``cmd`` equals ``cmd``; otherwise the message is
    logged and dropped.
    """
    if cmd == 'log':
        logger.debug('log message[{}]: {}'.format(dev.devNo, payload))
        return

    try:
        device_event = json.loads(bytes.decode(payload))
    except ValueError:
        logger.error(
            "can not loads message payload, cmd = <{}>, dev= <{}>, payload = <{}>".format(cmd, dev.devNo, payload))
        return

    # Valid JSON that is not an object (e.g. a list or a number) has no
    # ``get``; reject it instead of failing with AttributeError below.
    if not isinstance(device_event, dict):
        logger.error(
            "message payload is not a json object, cmd = <{}>, dev= <{}>, payload = <{}>".format(
                cmd, dev.devNo, payload))
        return

    if dev.devNo != device_event.get('IMEI', None):
        logger.warning('invalid msg for not same devNo({} != {})'.format(dev.devNo, device_event.get('IMEI')))
        return

    if cmd != str(device_event.get('cmd', '')):
        logger.warning('invalid msg for not same cmd({} != {})'.format(cmd, device_event.get('cmd', None)))
        return

    if cmd == '208':
        FluentedEngine().in_signal_udp(devNo=dev.devNo, ts=int(time.time()), signal=0, cmd=cmd)

        # Prepare to send the device-offline notification.
        OfflineManager.registe_device_offline_notify(devNo=dev.devNo)

    elif cmd in ['207', '200']:
        try:
            signal = int(device_event.get('signal', 10))
        except (TypeError, ValueError, OverflowError):
            logger.warning('invalid signal value <{}>, cmd = <{}>, dev = <{}>'.format(
                device_event.get('signal'), cmd, dev.devNo))
            return

        FluentedEngine().in_signal_udp(devNo=dev.devNo, ts=int(time.time()), signal=signal, cmd=cmd)

        # Cancel the pending device-offline notification.
        OfflineManager.discard_device_offline_notify(devNo=dev.devNo)

        if cmd == '200':
            # Record the SIM card's first activation time here rather than in
            # device management, so that it stays easy to change.
            logger.debug('dev<{}> 200 message.'.format(dev.devNo))
            _record_first_sim_activation(dev)
            DeviceCmdStatics.record(dev.devNo, '200')


with MqttContext(host=host,
                 port=port,
                 user=settings.MQTT_USER,
                 password=settings.MQTT_PSWD,
                 platform_env=platform_env,
                 queue=queue,
                 debug=debug,
                 cmds={
                     monitored_cmd: {'qos': Const.MQTT_QOS, 'checkRegistered': False}
                     for monitored_cmd in MONITORED_CMDS
                 },
                 logger=logger,
                 name='monitor',
                 checkRegistered=False) as context:
    context.start()

logger.debug('exit now.')