mqtt_context.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import os
  4. import signal
  5. import sys
  6. import threading
  7. import uuid
  8. from apps.web.constant import Const
  9. from apps.web.core.mqtt_client import MqttClient
  10. from apps.web.device.models import Device, DeviceDict
  11. class MqttContext(object):
  12. def __init__(self, **kwargs):
  13. self.worker = kwargs['worker']
  14. self.cmds = kwargs['cmds']
  15. self.topic_prefix = kwargs.get('topicPrefix', Const.SERVER_TOPIC_PREFIX)
  16. self.host = kwargs['host']
  17. self.port = kwargs['port']
  18. self.user = kwargs['user']
  19. self.password = kwargs['password']
  20. self.logger = kwargs['logger']
  21. self.platform_env = kwargs['platform_env']
  22. self.debug = kwargs['debug']
  23. self.queue = kwargs['queue']
  24. self.name = kwargs['name']
  25. self.checkRegistered = kwargs.get('checkRegistered', True)
  26. self.mqttc = MqttClient(client_id = '{}_{}'.format(self.name, str(uuid.uuid1())))
  27. self.mqttc.on_message = self.on_message
  28. self.mqttc.on_connect = self.on_connect
  29. self.mqttc.on_disconnect = self.on_disconnect
  30. def __handle_exit(self, sig, frame):
  31. self.logger.debug('handle signal. sig = {}, frame = {}'.format(sig, frame))
  32. if not self.alive:
  33. self.logger.debug('has handle this signal.')
  34. return
  35. self.alive = False
  36. mqttc = self.mqttc
  37. self.mqttc = None
  38. mqttc.disconnect()
  39. mqttc.close()
  40. def __set_signals(self):
  41. signal.signal(signal.SIGTERM, self.__handle_exit)
  42. signal.signal(signal.SIGINT, self.__handle_exit)
  43. # signal.signal(signal.SIGKILL, self.__handle_exit)
  44. if hasattr(signal, 'SIGQUIT'):
  45. signal.signal(signal.SIGQUIT, self.__handle_exit)
  46. # if hasattr(signal, 'siginterrupt'):
  47. # signal.siginterrupt(signal.SIGTERM, False)
  48. def __enter__(self):
  49. self.alive = True
  50. self.__set_signals()
  51. return self
  52. def __exit__(self, exc_type, exc_val, exc_tb):
  53. if self.mqttc:
  54. self.mqttc.disconnect()
  55. self.mqttc.close()
  56. sys.exit(0)
  57. def on_connect(self, client, userdata, flags, rc):
  58. self.logger.debug(
  59. '[on_connect] eventer listener<client={}, server={}, env={}, debug={}> started successfully.'.format(
  60. repr(client), '{}:{}/{}'.format(self.host, self.port, 'mqtt'), self.platform_env, self.debug))
  61. for cmdNo, options in self.cmds.iteritems():
  62. if not self.queue:
  63. device_topic = '{}/+/{}'.format(self.topic_prefix, cmdNo)
  64. else:
  65. device_topic = '$queue/{}/+/{}'.format(self.topic_prefix, cmdNo)
  66. client.subscribe(device_topic, qos = options.get('qos', Const.MQTT_QOS))
  67. self.logger.debug('subscribe topic(%s) success' % device_topic)
  68. def on_disconnect(self, mqttc, userdata, rc):
  69. self.logger.debug(
  70. '[on_disconnect] mqttc<{},{}:{}>. userdata: {}; rc: {}'.format(
  71. mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(rc)))
  72. def on_message(self, mqttc, obj, msg):
  73. self.logger.debug('[on_message] {}: {}: {}'.format(os.getpid(), msg.topic, str(msg.payload)))
  74. tokens = msg.topic.split('/')
  75. devNo = tokens[-2]
  76. cmd = tokens[-1]
  77. prefix = '/'.join(tokens[0:len(tokens) - 2])
  78. if (prefix != self.topic_prefix) or (not devNo) or (not cmd):
  79. self.logger.warning('device report invalid msg.')
  80. return
  81. cmd = str(cmd)
  82. devNo = str(devNo)
  83. dev = Device.get_dev(devNo = devNo) # type: DeviceDict
  84. if dev is None:
  85. self.logger.warning('dev<{}> is not existed.'.format(devNo))
  86. return
  87. # 特别的处理, 解决昌源注册错误造成流量问题
  88. if cmd == '100' and dev.driverCode == Const.DEVICE_TYPE_CODE_CHANGING_CY4:
  89. if not dev.is_registered or dev.devTypeCode != Const.DEVICE_TYPE_CODE_CHANGING_CY4:
  90. if 'data' in msg.payload:
  91. from apps.web.core.helpers import ActionDeviceBuilder
  92. adapter = ActionDeviceBuilder.create_action_device(dev, Const.DEVICE_TYPE_CODE_CHANGING_CY4)
  93. event_data = adapter.analyze_event_data(msg.payload['data'])
  94. if event_data.get("cmdCode") in ["D3", "D9", "DA"]:
  95. self.logger.warning(
  96. 'changyuan4 register error for {}. payload = {}'.format(dev, str(msg.payload)))
  97. return adapter.ack_event()
  98. if (not self.debug and dev.debug) or (self.debug and not dev.debug) or (self.debug != dev.debug):
  99. self.logger.debug('script debug<{}> of {} is not equal device debug<{}>. msg = {}'.format(
  100. self.debug, dev.devNo, dev.debug, msg.payload))
  101. return
  102. if self.cmds[cmd].get('checkRegistered', True):
  103. if not dev.is_registered or not dev.devTypeCode:
  104. self.logger.warning('dev<{}> is not registered.'.format(devNo))
  105. return
  106. t = threading.Thread(target = self.worker, args = (cmd, dev, msg.payload,))
  107. t.setDaemon(False)
  108. t.start()
  109. def start(self):
  110. self.logger.debug('start mqtt.')
  111. self.mqttc.username_pw_set(self.user, self.password)
  112. self.mqttc.connect(self.host, self.port, 60)
  113. self.mqttc.loop_forever()