# -*- coding: utf-8 -*- # !/usr/bin/env python import getopt import logging import sys from typing import TYPE_CHECKING try: options, args = getopt.getopt(sys.argv[1:], 'l:q:e:p:d:m:b', ['queue=', 'env=', 'port=', 'debug=']) except getopt.GetoptError as e: print(str(e)) sys.exit() platform_env = 'testing' queue = False port = None debug = '' for name, value in options: if name in ('-e', '--env'): platform_env = value if name in ('-q', '--queue'): queue = value if name in ('-p', '--port'): port = int(value) if name in ('-d', '--debug'): debug = value if not port: print 'no port parameter.' sys.exit(1) import os os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'configs.{env}'.format(env = platform_env)) PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..') sys.path.insert(0, PROJECT_ROOT) from script.base import init_env init_env(interactive = False) from django.conf import settings from apps.web.core.sysparas import SysParas from script.eventer.handlers import UnknowCmdHandler from script.eventer.handlers.shake_hand import ShakeHandHandler from script.eventer.handlers.offline_coins import OfflineCoinsHandler from script.eventer.mqtt_context import MqttContext from apps.web.constant import Const from apps.web.device.models import DeviceDict if TYPE_CHECKING: pass logger = logging.getLogger(__name__) host = SysParas.get_private_ip(settings.MQTT_HOSTNAME) def worker(cmd, dev, payload): # type: (str, DeviceDict, str)->None try: if cmd == '507': ShakeHandHandler(cmd, dev, payload).do() # 添加特性用来过滤大量507, 推送端口的时候需要注意配置特性 if dev.is_registered and dev.owner.supports("supportDelixi"): from apps.web.south_intf.delixi import DelixiNorther DelixiNorther.send_port_status(dev) # 上海城运消息推送 try: if dev.is_registered: from apps.web.south_intf.shanghai_urban_data_collection_platform import \ ShangHaiUrbanDataCollectionPlatformModel, ShangHaiUrbanDataCollectionPlatform dealer_info = ShangHaiUrbanDataCollectionPlatformModel.get_dealer_info(dev.ownerId) if dealer_info: ShangHaiUrbanDataCollectionPlatform(dev, **dealer_info).celery_push_heatbeat() except: pass elif cmd == '505' or cmd == '205': OfflineCoinsHandler(cmd, dev, payload).do() else: UnknowCmdHandler(cmd, dev, payload).do() except Exception as e: logger.exception(e) with MqttContext(host = host, port = port, user = settings.MQTT_USER, password = settings.MQTT_PSWD, worker = worker, platform_env = platform_env, queue = queue, debug = debug, cmds = { '205': { 'qos': Const.MQTT_QOS, 'checkRegistered': True }, '505': { 'qos': Const.MQTT_QOS, 'checkRegistered': True }, '507': { 'qos': Const.MQTT_QOS, 'checkRegistered': False } }, logger = logger, name = 'new_listener') as context: context.start() logger.debug('exit now.')