# mqtt_context_co.py
# -*- coding: utf-8 -*-
# !/usr/bin/env python
  3. import os
  4. import signal
  5. import sys
  6. import uuid
  7. import blinker
  8. import gevent
  9. from apps.web.constant import Const
  10. from apps.web.core.mqtt_client import MqttClient
  11. from apps.web.device.models import Device, DeviceDict
  12. message_received = blinker.signal('message-received')
  13. class MqttContext(object):
  14. def __init__(self, **kwargs):
  15. self.cmds = kwargs['cmds']
  16. self.topic_prefix = kwargs.get('topicPrefix', Const.SERVER_TOPIC_PREFIX)
  17. self.host = kwargs['host']
  18. self.port = kwargs['port']
  19. self.user = kwargs['user']
  20. self.password = kwargs['password']
  21. self.logger = kwargs['logger']
  22. self.platform_env = kwargs['platform_env']
  23. self.debug = kwargs['debug']
  24. self.queue = kwargs['queue']
  25. self.name = kwargs['name']
  26. self.checkRegistered = kwargs.get('checkRegistered', True)
  27. self.mqttc = MqttClient(client_id = '{}_{}'.format(self.name, str(uuid.uuid1())))
  28. self.mqttc.on_message = self.on_message
  29. self.mqttc.on_connect = self.on_connect
  30. self.mqttc.on_disconnect = self.on_disconnect
  31. def __handle_exit(self, sig, frame):
  32. self.logger.debug('handle signal. sig = {}, frame = {}'.format(sig, frame))
  33. if not self.alive:
  34. self.logger.debug('has handle this signal.')
  35. return
  36. self.alive = False
  37. mqttc = self.mqttc
  38. self.mqttc = None
  39. mqttc.disconnect()
  40. mqttc.close()
  41. def __set_signals(self):
  42. signal.signal(signal.SIGTERM, self.__handle_exit)
  43. signal.signal(signal.SIGINT, self.__handle_exit)
  44. # signal.signal(signal.SIGKILL, self.__handle_exit)
  45. if hasattr(signal, 'SIGQUIT'):
  46. signal.signal(signal.SIGQUIT, self.__handle_exit)
  47. # if hasattr(signal, 'siginterrupt'):
  48. # signal.siginterrupt(signal.SIGTERM, False)
  49. def __enter__(self):
  50. self.alive = True
  51. self.__set_signals()
  52. return self
  53. def __exit__(self, exc_type, exc_val, exc_tb):
  54. if self.mqttc:
  55. self.mqttc.disconnect()
  56. self.mqttc.close()
  57. sys.exit(0)
  58. def on_connect(self, client, userdata, flags, rc):
  59. self.logger.debug(
  60. '[on_connect] eventer listener<client={}, server={}, env={}, debug={}> started successfully.'.format(
  61. repr(client), '{}:{}/{}'.format(self.host, self.port, 'mqtt'), self.platform_env, self.debug))
  62. for cmdNo, options in self.cmds.iteritems():
  63. if not self.queue:
  64. device_topic = '{}/+/{}'.format(self.topic_prefix, cmdNo)
  65. else:
  66. device_topic = '$queue/{}/+/{}'.format(self.topic_prefix, cmdNo)
  67. client.subscribe(device_topic, qos = options.get('qos', Const.MQTT_QOS))
  68. self.logger.debug('subscribe topic(%s) success' % device_topic)
  69. def on_disconnect(self, mqttc, userdata, rc):
  70. self.logger.debug(
  71. '[on_disconnect] mqttc<{},{}:{}>. userdata: {}; rc: {}'.format(
  72. mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(rc)))
  73. def on_message(self, mqttc, obj, msg):
  74. self.logger.debug('[on_message] {}: {}: {}'.format(os.getpid(), msg.topic, str(msg.payload)))
  75. tokens = msg.topic.split('/')
  76. devNo = tokens[-2]
  77. cmd = tokens[-1]
  78. prefix = '/'.join(tokens[0:len(tokens) - 2])
  79. if (prefix != self.topic_prefix) or (not devNo) or (not cmd):
  80. self.logger.warning('device report invalid msg.')
  81. return
  82. cmd = str(cmd)
  83. devNo = str(devNo)
  84. dev = Device.get_dev(devNo = devNo) # type: DeviceDict
  85. if dev is None:
  86. self.logger.warning('dev<{}> is not existed.'.format(devNo))
  87. return
  88. if (not self.debug and dev.debug) or (self.debug and not dev.debug) or (self.debug != dev.debug):
  89. self.logger.debug('script debug<{}> of {} is not equal device debug<{}>. msg = {}'.format(
  90. self.debug, dev.devNo, dev.debug, msg.payload))
  91. return
  92. if self.cmds[cmd].get('checkRegistered', True):
  93. if not dev.is_registered or not dev.devTypeCode:
  94. self.logger.warning('dev<{}> is not registered.'.format(devNo))
  95. return
  96. gevent.spawn(message_received.send, '/message', dev = dev, cmd = cmd, payload = msg.payload)
  97. def start(self):
  98. self.logger.debug('start mqtt.')
  99. self.mqttc.username_pw_set(self.user, self.password)
  100. self.mqttc.connect(self.host, self.port, 60)
  101. self.mqttc.loop_forever()