# -*- coding: utf-8 -*-
# !/usr/bin/env python
- import os
- import signal
- import sys
- import threading
- import uuid
- from apps.web.constant import Const
- from apps.web.core.mqtt_client import MqttClient
- from apps.web.device.models import Device, DeviceDict
class MqttContext(object):
    """Context manager that runs an MQTT listener for device-reported commands.

    On a successful connect the listener subscribes to
    ``<topic_prefix>/+/<cmdNo>`` for every entry of ``cmds``. Each incoming
    message is validated (topic shape, device existence, debug flag,
    registration) and then handed to ``worker`` on a new non-daemon thread.

    Entering the context installs SIGTERM/SIGINT (and SIGQUIT where
    available) handlers that disconnect the client. Leaving the context
    disconnects the client and terminates the process with ``sys.exit(0)``.
    """

    def __init__(self, **kwargs):
        """Store the listener configuration and build the MQTT client.

        Required keyword arguments: ``worker``, ``cmds``, ``host``, ``port``,
        ``user``, ``password``, ``logger``, ``platform_env``, ``debug``,
        ``queue`` and ``name``.

        Optional keyword arguments: ``topicPrefix`` (defaults to
        ``Const.SERVER_TOPIC_PREFIX``) and ``checkRegistered`` (defaults to
        ``True``).

        Raises:
            KeyError: if a required keyword argument is missing.
        """
        # Callable invoked as worker(cmd, dev, payload) for each accepted message.
        self.worker = kwargs['worker']
        # Mapping of command number -> options dict ('qos', 'checkRegistered').
        self.cmds = kwargs['cmds']
        self.topic_prefix = kwargs.get('topicPrefix', Const.SERVER_TOPIC_PREFIX)
        self.host = kwargs['host']
        self.port = kwargs['port']
        self.user = kwargs['user']
        self.password = kwargs['password']
        self.logger = kwargs['logger']
        self.platform_env = kwargs['platform_env']
        self.debug = kwargs['debug']
        # When truthy, topics are subscribed with a '$queue/' prefix.
        self.queue = kwargs['queue']
        self.name = kwargs['name']
        # NOTE(review): stored but not consulted by on_message, which reads the
        # per-command 'checkRegistered' option instead - confirm the intent.
        self.checkRegistered = kwargs.get('checkRegistered', True)

        # Defined up front so the attribute exists even before __enter__ runs.
        self.alive = False

        # uuid1 suffix keeps the client id unique across listener instances.
        self.mqttc = MqttClient(client_id = '{}_{}'.format(self.name, str(uuid.uuid1())))
        self.mqttc.on_message = self.on_message
        self.mqttc.on_connect = self.on_connect
        self.mqttc.on_disconnect = self.on_disconnect

    def __handle_exit(self, sig, frame):
        """Signal handler: disconnect and close the client exactly once."""
        self.logger.debug('handle signal. sig = {}, frame = {}'.format(sig, frame))

        if not self.alive:
            self.logger.debug('has handle this signal.')
            return

        self.alive = False

        # Detach the client before closing it so __exit__ does not close it again.
        mqttc = self.mqttc
        self.mqttc = None
        if mqttc is not None:
            mqttc.disconnect()
            mqttc.close()

    def __set_signals(self):
        """Route termination signals to ``__handle_exit``."""
        signal.signal(signal.SIGTERM, self.__handle_exit)
        signal.signal(signal.SIGINT, self.__handle_exit)

        # SIGQUIT does not exist on every platform.
        if hasattr(signal, 'SIGQUIT'):
            signal.signal(signal.SIGQUIT, self.__handle_exit)

    def __enter__(self):
        """Mark the listener alive, install the signal handlers, return self."""
        self.alive = True
        self.__set_signals()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Disconnect the client (if still attached) and exit the process.

        Always raises ``SystemExit(0)``, which replaces any exception that was
        propagating out of the ``with`` body; that exception is logged first
        so it is not lost silently.
        """
        if exc_type is not None:
            self.logger.warning(
                'exit with exception. type = {}, value = {}'.format(repr(exc_type), repr(exc_val)))

        # Flip the flag and detach the client first, so a signal arriving while
        # we are shutting down finds nothing left to close.
        self.alive = False
        mqttc = self.mqttc
        self.mqttc = None
        if mqttc:
            mqttc.disconnect()
            mqttc.close()

        sys.exit(0)

    def on_connect(self, client, userdata, flags, rc):
        """Subscribe to one topic per configured command after a successful connect.

        Args:
            client: the connected MQTT client; ``subscribe`` is called on it.
            userdata: unused.
            flags: unused.
            rc: connection result code; any non-zero value is treated as a
                refused connection and nothing is subscribed.
        """
        if rc != 0:
            self.logger.warning(
                '[on_connect] connect to {}:{} failed. rc: {}'.format(self.host, self.port, repr(rc)))
            return

        self.logger.debug(
            '[on_connect] eventer listener<client={}, server={}, env={}, debug={}> started successfully.'.format(
                repr(client), '{}:{}/{}'.format(self.host, self.port, 'mqtt'), self.platform_env, self.debug))

        # items() behaves the same on Python 2 and 3 (iteritems() is 2-only).
        for cmdNo, options in self.cmds.items():
            if not self.queue:
                device_topic = '{}/+/{}'.format(self.topic_prefix, cmdNo)
            else:
                device_topic = '$queue/{}/+/{}'.format(self.topic_prefix, cmdNo)

            client.subscribe(device_topic, qos = options.get('qos', Const.MQTT_QOS))
            self.logger.debug('subscribe topic(%s) success' % device_topic)

    def on_disconnect(self, mqttc, userdata, rc):
        """Log the client identity, user data and result code on disconnect."""
        self.logger.debug(
            '[on_disconnect] mqttc<{},{}:{}>. userdata: {}; rc: {}'.format(
                mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(rc)))

    def on_message(self, mqttc, obj, msg):
        """Validate a device message and dispatch it to the worker thread.

        The topic is expected to be ``<topic_prefix>/<devNo>/<cmd>``. The
        message is dropped (with a log entry) when the topic is malformed,
        the device is unknown, the device debug flag differs from this
        listener's, the command is not configured, or the command requires a
        registered device and the device is not registered.

        Args:
            mqttc: unused.
            obj: unused.
            msg: message object exposing ``topic`` and ``payload``.
        """
        self.logger.debug('[on_message] {}: {}: {}'.format(os.getpid(), msg.topic, str(msg.payload)))

        tokens = msg.topic.split('/')
        # A topic without any '/' has no device/command segments to read.
        if len(tokens) < 2:
            self.logger.warning('device report invalid msg.')
            return

        devNo = tokens[-2]
        cmd = tokens[-1]
        prefix = '/'.join(tokens[:-2])

        if (prefix != self.topic_prefix) or (not devNo) or (not cmd):
            self.logger.warning('device report invalid msg.')
            return

        cmd = str(cmd)
        devNo = str(devNo)

        dev = Device.get_dev(devNo = devNo)  # type: DeviceDict
        if dev is None:
            self.logger.warning('dev<{}> is not existed.'.format(devNo))
            return

        # Special handling: works around the traffic problem caused by
        # Changyuan registration errors.
        if cmd == '100' and dev.driverCode == Const.DEVICE_TYPE_CODE_CHANGING_CY4:
            if not dev.is_registered or dev.devTypeCode != Const.DEVICE_TYPE_CODE_CHANGING_CY4:
                if 'data' in msg.payload:
                    # Imported locally, as in the original code.
                    from apps.web.core.helpers import ActionDeviceBuilder

                    adapter = ActionDeviceBuilder.create_action_device(dev, Const.DEVICE_TYPE_CODE_CHANGING_CY4)
                    event_data = adapter.analyze_event_data(msg.payload['data'])
                    if event_data.get("cmdCode") in ["D3", "D9", "DA"]:
                        self.logger.warning(
                            'changyuan4 register error for {}. payload = {}'.format(dev, str(msg.payload)))
                        return adapter.ack_event()

        # Only handle devices whose debug flag matches this listener's, both
        # by truthiness and by value.
        if bool(self.debug) != bool(dev.debug) or self.debug != dev.debug:
            self.logger.debug('script debug<{}> of {} is not equal device debug<{}>. msg = {}'.format(
                self.debug, dev.devNo, dev.debug, msg.payload))
            return

        # An unconfigured command is dropped instead of raising KeyError
        # inside the MQTT callback.
        options = self.cmds.get(cmd)
        if options is None:
            self.logger.warning('cmd<{}> of dev<{}> is not configured.'.format(cmd, devNo))
            return

        if options.get('checkRegistered', True):
            if not dev.is_registered or not dev.devTypeCode:
                self.logger.warning('dev<{}> is not registered.'.format(devNo))
                return

        # Non-daemon thread: the interpreter waits for in-flight work on exit.
        t = threading.Thread(target = self.worker, args = (cmd, dev, msg.payload,))
        t.daemon = False
        t.start()

    def start(self):
        """Authenticate, connect with a 60-second keepalive and block in the network loop."""
        self.logger.debug('start mqtt.')

        self.mqttc.username_pw_set(self.user, self.password)
        self.mqttc.connect(self.host, self.port, 60)
        self.mqttc.loop_forever()
|