123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import os
- import signal
- import sys
- import uuid
- import blinker
- import gevent
- from apps.web.constant import Const
- from apps.web.core.mqtt_client import MqttClient
- from apps.web.device.models import Device, DeviceDict
- message_received = blinker.signal('message-received')
- class MqttContext(object):
- def __init__(self, **kwargs):
- self.cmds = kwargs['cmds']
- self.topic_prefix = kwargs.get('topicPrefix', Const.SERVER_TOPIC_PREFIX)
- self.host = kwargs['host']
- self.port = kwargs['port']
- self.user = kwargs['user']
- self.password = kwargs['password']
- self.logger = kwargs['logger']
- self.platform_env = kwargs['platform_env']
- self.debug = kwargs['debug']
- self.queue = kwargs['queue']
- self.name = kwargs['name']
- self.checkRegistered = kwargs.get('checkRegistered', True)
- self.mqttc = MqttClient(client_id = '{}_{}'.format(self.name, str(uuid.uuid1())))
- self.mqttc.on_message = self.on_message
- self.mqttc.on_connect = self.on_connect
- self.mqttc.on_disconnect = self.on_disconnect
- def __handle_exit(self, sig, frame):
- self.logger.debug('handle signal. sig = {}, frame = {}'.format(sig, frame))
- if not self.alive:
- self.logger.debug('has handle this signal.')
- return
- self.alive = False
- mqttc = self.mqttc
- self.mqttc = None
- mqttc.disconnect()
- mqttc.close()
- def __set_signals(self):
- signal.signal(signal.SIGTERM, self.__handle_exit)
- signal.signal(signal.SIGINT, self.__handle_exit)
- # signal.signal(signal.SIGKILL, self.__handle_exit)
- if hasattr(signal, 'SIGQUIT'):
- signal.signal(signal.SIGQUIT, self.__handle_exit)
- # if hasattr(signal, 'siginterrupt'):
- # signal.siginterrupt(signal.SIGTERM, False)
- def __enter__(self):
- self.alive = True
- self.__set_signals()
- return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- if self.mqttc:
- self.mqttc.disconnect()
- self.mqttc.close()
- sys.exit(0)
- def on_connect(self, client, userdata, flags, rc):
- self.logger.debug(
- '[on_connect] eventer listener<client={}, server={}, env={}, debug={}> started successfully.'.format(
- repr(client), '{}:{}/{}'.format(self.host, self.port, 'mqtt'), self.platform_env, self.debug))
- for cmdNo, options in self.cmds.iteritems():
- if not self.queue:
- device_topic = '{}/+/{}'.format(self.topic_prefix, cmdNo)
- else:
- device_topic = '$queue/{}/+/{}'.format(self.topic_prefix, cmdNo)
- client.subscribe(device_topic, qos = options.get('qos', Const.MQTT_QOS))
- self.logger.debug('subscribe topic(%s) success' % device_topic)
- def on_disconnect(self, mqttc, userdata, rc):
- self.logger.debug(
- '[on_disconnect] mqttc<{},{}:{}>. userdata: {}; rc: {}'.format(
- mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(rc)))
- def on_message(self, mqttc, obj, msg):
- self.logger.debug('[on_message] {}: {}: {}'.format(os.getpid(), msg.topic, str(msg.payload)))
- tokens = msg.topic.split('/')
- devNo = tokens[-2]
- cmd = tokens[-1]
- prefix = '/'.join(tokens[0:len(tokens) - 2])
- if (prefix != self.topic_prefix) or (not devNo) or (not cmd):
- self.logger.warning('device report invalid msg.')
- return
- cmd = str(cmd)
- devNo = str(devNo)
- dev = Device.get_dev(devNo = devNo) # type: DeviceDict
- if dev is None:
- self.logger.warning('dev<{}> is not existed.'.format(devNo))
- return
- if (not self.debug and dev.debug) or (self.debug and not dev.debug) or (self.debug != dev.debug):
- self.logger.debug('script debug<{}> of {} is not equal device debug<{}>. msg = {}'.format(
- self.debug, dev.devNo, dev.debug, msg.payload))
- return
- if self.cmds[cmd].get('checkRegistered', True):
- if not dev.is_registered or not dev.devTypeCode:
- self.logger.warning('dev<{}> is not registered.'.format(devNo))
- return
- gevent.spawn(message_received.send, '/message', dev = dev, cmd = cmd, payload = msg.payload)
- def start(self):
- self.logger.debug('start mqtt.')
- self.mqttc.username_pw_set(self.user, self.password)
- self.mqttc.connect(self.host, self.port, 60)
- self.mqttc.loop_forever()
|