# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""MQTT listener context.

Connects an ``MqttClient`` to a broker, subscribes to one topic per configured
command, validates each incoming device message and hands accepted messages to
the ``message-received`` blinker signal from a gevent greenlet.
"""

import os
import signal
import sys
import uuid

import blinker
import gevent

from apps.web.constant import Const
from apps.web.core.mqtt_client import MqttClient
from apps.web.device.models import Device, DeviceDict

# Sent (sender = '/message') with ``dev``, ``cmd`` and ``payload`` keyword
# arguments for every message that passes the checks in ``on_message``.
message_received = blinker.signal('message-received')


class MqttContext(object):
    """Context manager owning one MQTT client and its subscriptions.

    Required keyword arguments: ``cmds``, ``host``, ``port``, ``user``,
    ``password``, ``logger``, ``platform_env``, ``debug``, ``queue``, ``name``.
    Optional: ``topicPrefix`` (defaults to ``Const.SERVER_TOPIC_PREFIX``) and
    ``checkRegistered`` (defaults to ``True``).

    ``cmds`` maps a command identifier (the last topic segment) to an options
    dict; the options read here are ``qos`` and ``checkRegistered``.

    Entering the context installs SIGTERM/SIGINT (and SIGQUIT where available)
    handlers that disconnect the client. Leaving the context always terminates
    the process with ``sys.exit(0)``.
    """

    def __init__(self, **kwargs):
        """Store the configuration and create the (not yet connected) client.

        Raises:
            KeyError: if a required keyword argument is missing.
        """
        self.cmds = kwargs['cmds']
        self.topic_prefix = kwargs.get('topicPrefix', Const.SERVER_TOPIC_PREFIX)

        self.host = kwargs['host']
        self.port = kwargs['port']
        self.user = kwargs['user']
        self.password = kwargs['password']

        self.logger = kwargs['logger']
        self.platform_env = kwargs['platform_env']
        self.debug = kwargs['debug']
        self.queue = kwargs['queue']
        self.name = kwargs['name']

        # NOTE(review): stored but never read in this class; on_message only
        # consults the per-command 'checkRegistered' option. Confirm whether
        # this was meant to be the fallback for that option.
        self.checkRegistered = kwargs.get('checkRegistered', True)

        # Defined up front so the signal handler can always read it; it is
        # switched to True by __enter__.
        self.alive = False

        # uuid1 suffix keeps the client id unique per instance.
        self.mqttc = MqttClient(client_id='{}_{}'.format(self.name, str(uuid.uuid1())))
        self.mqttc.on_message = self.on_message
        self.mqttc.on_connect = self.on_connect
        self.mqttc.on_disconnect = self.on_disconnect

    def __release_client(self):
        """Detach the client from this context, then disconnect and close it.

        The attribute is cleared before the calls so a second invocation
        (signal handler followed by ``__exit__``) is a no-op.
        """
        mqttc = self.mqttc
        self.mqttc = None
        if mqttc is None:
            return
        try:
            mqttc.disconnect()
        finally:
            # Still release the client if disconnect() raises.
            mqttc.close()

    def __handle_exit(self, sig, frame):
        """Signal handler: shut the client down once, ignore repeated signals."""
        self.logger.debug('handle signal. sig = {}, frame = {}'.format(sig, frame))

        if not self.alive:
            self.logger.debug('has handle this signal.')
            return

        self.alive = False
        self.__release_client()

    def __set_signals(self):
        """Route termination signals to ``__handle_exit``."""
        signal.signal(signal.SIGTERM, self.__handle_exit)
        signal.signal(signal.SIGINT, self.__handle_exit)

        # SIGQUIT does not exist on every platform.
        if hasattr(signal, 'SIGQUIT'):
            signal.signal(signal.SIGQUIT, self.__handle_exit)

    def __enter__(self):
        """Mark the context alive, install the signal handlers, return self."""
        self.alive = True
        self.__set_signals()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the client and terminate the process with status 0.

        ``sys.exit(0)`` is raised unconditionally, so an exception escaping the
        ``with`` body is replaced by ``SystemExit``; it is logged first so the
        failure is not lost.
        """
        if exc_type is not None:
            self.logger.warning(
                'exit mqtt context with exception. type = {}, value = {}'.format(
                    exc_type, repr(exc_val)))

        self.__release_client()
        sys.exit(0)

    def on_connect(self, client, userdata, flags, rc):
        """Connect callback: subscribe to one topic per configured command.

        Topics are ``<prefix>/+/<cmd>``, or ``$queue/<prefix>/+/<cmd>`` when
        ``self.queue`` is truthy.
        """
        self.logger.debug(
            '[on_connect] eventer listener started successfully. '
            'client = {}, broker = {}, platform_env = {}, debug = {}, rc = {}'.format(
                repr(client),
                '{}:{}/{}'.format(self.host, self.port, 'mqtt'),
                self.platform_env,
                self.debug,
                repr(rc)))

        topic_format = '$queue/{}/+/{}' if self.queue else '{}/+/{}'

        for cmdNo, options in self.cmds.items():
            device_topic = topic_format.format(self.topic_prefix, cmdNo)
            client.subscribe(device_topic, qos=options.get('qos', Const.MQTT_QOS))
            # NOTE(review): logged without inspecting the subscribe() result.
            self.logger.debug('subscribe topic(%s) success' % device_topic)

    def on_disconnect(self, mqttc, userdata, rc):
        """Disconnect callback: log which client disconnected and why."""
        self.logger.debug(
            '[on_disconnect] mqttc<{},{}:{}>. userdata: {}; rc: {}'.format(
                mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(rc)))

    def on_message(self, mqttc, obj, msg):
        """Message callback: validate a device message and dispatch it.

        The topic is expected to be ``<prefix>/<devNo>/<cmd>``. The message is
        dropped (with a log line) when the topic is malformed, the command is
        not configured, the device is unknown, the device's debug flag differs
        from this listener's, or the command requires a registered device and
        the device is not registered. Otherwise ``message_received`` is sent
        from a new greenlet.
        """
        self.logger.debug('[on_message] {}: {}: {}'.format(os.getpid(), msg.topic, str(msg.payload)))

        tokens = msg.topic.split('/')
        if len(tokens) < 2:
            # No room for both a device number and a command.
            self.logger.warning('device report invalid msg.')
            return

        devNo, cmd = tokens[-2], tokens[-1]
        prefix = '/'.join(tokens[:-2])

        if prefix != self.topic_prefix or not devNo or not cmd:
            self.logger.warning('device report invalid msg.')
            return

        cmd = str(cmd)
        devNo = str(devNo)

        options = self.cmds.get(cmd)
        if options is None:
            # Avoid a KeyError inside the client callback for an unknown command.
            self.logger.warning('cmd<{}> of dev<{}> is not supported.'.format(cmd, devNo))
            return

        dev = Device.get_dev(devNo=devNo)  # type: DeviceDict
        if dev is None:
            self.logger.warning('dev<{}> is not existed.'.format(devNo))
            return

        # Debug listeners only serve debug devices and vice versa.
        if self.debug != dev.debug:
            self.logger.debug('script debug<{}> of {} is not equal device debug<{}>. msg = {}'.format(
                self.debug, dev.devNo, dev.debug, msg.payload))
            return

        if options.get('checkRegistered', True):
            if not dev.is_registered or not dev.devTypeCode:
                self.logger.warning('dev<{}> is not registered.'.format(devNo))
                return

        # Hand off to a greenlet so receivers do not run inside this callback.
        gevent.spawn(message_received.send, '/message', dev=dev, cmd=cmd, payload=msg.payload)

    def start(self):
        """Authenticate, connect (keepalive 60) and block in the client loop."""
        self.logger.debug('start mqtt.')

        self.mqttc.username_pw_set(self.user, self.password)
        self.mqttc.connect(self.host, self.port, 60)

        self.mqttc.loop_forever()