# -*- coding: utf-8 -*- # !/usr/bin/env python import Queue import time import logging import uuid import socket from library.paho.mqtt import client from library.paho.mqtt.client import WebsocketConnectionError, mqtt_cs_disconnecting, MQTT_ERR_SUCCESS, \ mqtt_cs_connected from apps.web.constant import ErrorCode import simplejson as json from apps.web.core.exceptions import MqttSubscribeError, MqttPublishError, MqttConnectError logger = logging.getLogger(__name__) def get_client_id(prefix = 'webapp_'): # type: ()->str if '_' not in prefix: prefix = prefix + '_' return prefix + str(uuid.uuid4()) class MqttClient(client.Client): def __init__(self, client_id = '', **kwargs): # type: (str, dict)->None client_id = client_id if client_id else get_client_id() super(MqttClient, self).__init__(client_id = client_id, **kwargs) def __repr__(self): return '' % (self._client_id,) def close(self): if self._sock: self._sock.close() self._sock = None if self._sockpairR: self._sockpairR.close() self._sockpairR = None if self._sockpairW: self._sockpairW.close() self._sockpairW = None class MqttSendClientSync(MqttClient): RECONNECT_WAIT_TIME = 5 WAIT_TIME_PER_LOOP = 1 MAX_CONNECT_TIMEOUT = 15 MAX_SUBSCRIBE_TIMEOUT = 15 MY_SOCKET_TIMEOUT = 15 @staticmethod def _on_log_impl(mqttc, userdata, level, buf): logger.debug( '[MqttSendClientSync:on_log][{}] mqttc<{},{}:{}>. userdata: {}; level: {}; buffer = {}'.format( mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), level, buf)) @staticmethod def _on_socket_open_impl(mqttc, userdata, sock): logger.debug( '[MqttSendClientSync:on_socket_open][{}] mqttc<{},{}:{}>. userdata: {}; sock: {}'.format( mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(sock))) sock.settimeout(MqttSendClientSync.MY_SOCKET_TIMEOUT) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @staticmethod def _on_disconnect_impl(mqttc, userdata, rc): logger.debug( '[MqttSendClientSync:on_disconnect][{}] mqttc<{},{}:{}>, userdata: {}; rc: {}'.format( mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(rc))) @staticmethod def _on_connect_impl(mqttc, userdata, flags, rc, properties = None): logger.debug( '[MqttSendClientSync:on_connect][{}] mqttc<{},{}:{}>, userdata = {}, flags = {}, rc = {}, properties = {}'.format( mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(flags), repr(rc), repr(properties))) subscribe_result, mid = mqttc.subscribe(mqttc.server_topic, qos = mqttc.qos) if subscribe_result != MQTT_ERR_SUCCESS: raise MqttSubscribeError('send subscribe failure.') mqttc.subscribe_count = 0 mqttc.subscribe_time = int(time.time()) @staticmethod def _on_message_impl(mqttc, obj, msg): logger.debug( '[MqttSendClientSync:on_message][{}] mqttc<{},{}:{}>, timestamp: {}; topic: {}; msg: {}'.format( mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, msg.timestamp, msg.topic, str(msg.payload))) received = json.loads(bytes.decode(msg.payload)) if 'sid' not in received: logger.debug('[MqttSendClientSync:on_message][{}] device not support sid.'.format(mqttc.sid, mqttc.sent[ 'IMEI'])) elif received['sid'] != mqttc.sent['sid']: logger.debug( '[MqttSendClientSync:on_message][{}] device skip sid not belong to me. sid = {}; my sid = {}'.format( mqttc.sid, mqttc.sent['IMEI'], received['sid'] if 'sid' in received else '', mqttc.sent['sid'])) return if mqttc.received.empty(): mqttc.received.put(received) mqttc.end_loop() @staticmethod def _on_subscribe_impl(mqttc, obj, mid, granted_qos): logger.debug( '[MqttSendClientSync:on_subscribe][{}] mqttc<{},{}:{}>, obj = {}, mid = {}, granted_qos = {}'.format( mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(obj), repr(mid), repr(granted_qos))) mqttc.subscribed = True if mqttc.publish_count == -1: mqttc.publish(mqttc.smart_box_topic, json.dumps(mqttc.sent), mqttc.qos) mqttc.round = 0 mqttc.publish_count = 0 mqttc.publish_time = int(time.time()) @staticmethod def _on_publish_impl(mqttc, userdata, mid): logger.debug( '[MqttSendClientSync:on_publish][{}] mqttc<{},{}:{}>, userdata = {}, mid = {}'.format( mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(mid))) mqttc.published = True def __init__(self, client_id, sid, server_topic, smart_box_topic, sent, qos = 0, **kwargs): # type: (str, str, str, str, str, int, dict)->None client_id = client_id if client_id else get_client_id() super(MqttSendClientSync, self).__init__(client_id = client_id, **kwargs) self.server_topic = server_topic self.smart_box_topic = smart_box_topic self.sent = sent self.sid = sid self.qos = qos self._exit = False self.connect_count = 0 self.connect_time = int(time.time()) self.subscribed = False self.subscribe_count = 0 self.subscribe_time = None self.published = False self.round = 0 self.publish_count = -1 self.publish_time = None self.received = Queue.Queue(1) self.on_message = self._on_message_impl self.on_connect = self._on_connect_impl self.on_disconnect = self._on_disconnect_impl self.on_subscribe = self._on_subscribe_impl self.on_publish = self._on_publish_impl self.on_log = self._on_log_impl self.on_socket_open = self._on_socket_open_impl def send(self, timeout, retry): max_time_over = (int(time.time()) + (timeout * (retry + 1)) + 5) while True: if int(time.time()) >= max_time_over: logger.debug( '[MqttSendClientSync:send][{}] mqttc<{},{}:{}>, time is over.'.format( self.sid, self._client_id, self._host, self._port)) break if self._state == mqtt_cs_disconnecting: logger.debug( '[MqttSendClientSync:send][{}] mqttc<{},{}:{}>, wait for connected.'.format( self.sid, self._client_id, self._host, self._port)) self._reconnect_wait() if self._state != mqtt_cs_connected: try: self.connect_count = 0 self.connect_time = None self.subscribed = False self.subscribe_count = 0 self.subscribe_time = None self.reconnect() self.connect_time = int(time.time()) except (socket.error, WebsocketConnectionError): continue super(MqttSendClientSync, self).loop(timeout = self.WAIT_TIME_PER_LOOP) if self._exit is True: break if self._state != mqtt_cs_connected: # if (self.connect_count * self.WAIT_TIME_PER_LOOP) >= self.MAX_CONNECT_TIMEOUT: if (int(time.time()) - self.connect_time) >= self.MAX_CONNECT_TIMEOUT: raise MqttConnectError('[{}] mqtt connect timeout.'.format(self.sid)) else: # time.sleep(self.WAIT_TIME_PER_LOOP) # max_time_over += self.WAIT_TIME_PER_LOOP self.connect_count += 1 elif not self.subscribed: # if (self.subscribe_count * self.WAIT_TIME_PER_LOOP) >= self.MAX_SUBSCRIBE_TIMEOUT: if (int(time.time()) - self.subscribe_time) >= self.MAX_SUBSCRIBE_TIMEOUT: # self.subscribe_count = 0 raise MqttSubscribeError('[{}] mqtt subscribe timeout.'.format(self.sid)) else: # time.sleep(self.WAIT_TIME_PER_LOOP) # max_time_over += self.WAIT_TIME_PER_LOOP self.subscribe_count += 1 else: # if (self.publish_count * self.WAIT_TIME_PER_LOOP) >= timeout: if (int(time.time()) - self.publish_time) >= timeout: if self.round >= retry: logger.debug('[MqttSendClientSync:send][{}] mqttc<{},{}:{}>, exceed max retries({}).'.format( self.sid, self._client_id, self._host, self._port, retry)) break else: logger.debug( '[MqttSendClientSync:send][{}] mqttc<{},{}:{}>, start another round({}).'.format( self.sid, self._client_id, self._host, self._port, self.round)) self.publish(self.smart_box_topic, json.dumps(self.sent), self.qos) self.round += 1 self.publish_count = 0 self.publish_time = int(time.time()) else: # time.sleep(self.WAIT_TIME_PER_LOOP) self.publish_count += 1 if not self.published: raise MqttPublishError('[{}] mqtt publish timeout.'.format(self.sid)) if not self.received.empty(): return self.received.get() else: return { 'cmd': self.sent['cmd'], 'IMEI': self.sent['IMEI'], 'rst': ErrorCode.DEVICE_CONN_FAIL } def end_loop(self): self._exit = True class MqttSendClientAsync(MqttClient): RECONNECT_WAIT_TIME = 5 WAIT_TIME_PER_LOOP = 1 MAX_CONNECT_TIMEOUT = 15 MY_SOCKET_TIMEOUT = 15 @staticmethod def _on_log_impl(mqttc, userdata, level, buf): logger.debug( '[MqttSendClientAsync:on_log][{}] mqttc<{},{}:{}>. userdata: {}; level: {}; buffer = {}'.format( mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), level, buf)) @staticmethod def _on_socket_open_impl(mqttc, userdata, sock): logger.debug( '[MqttSendClientAsync:on_socket_open][{}] mqttc<{},{}:{}>. userdata: {}; sock: {}'.format( mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(sock))) sock.settimeout(MqttSendClientSync.MY_SOCKET_TIMEOUT) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @staticmethod def _on_disconnect_impl(mqttc, userdata, rc): logger.debug( '[MqttSendClientAsync:on_disconnect][{}] mqttc<{},{}:{}>. userdata: {}; rc: {}'.format( mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(rc))) @staticmethod def _on_connect_impl(mqttc, userdata, flags, rc, properties = None): logger.debug( '[MqttSendClientAsync:on_connect][{}] mqttc = {}, userdata = {}, flags = {}, rc = {}, properties = {}'.format( mqttc.sid, repr(mqttc), repr(userdata), repr(flags), repr(rc), repr(properties))) @staticmethod def _on_publish_impl(mqttc, userdata, mid): logger.debug( '[MqttSendClientAsync:on_publish][{}] client = {}, userdata = {}, mid = {}'.format( mqttc.sid, repr(mqttc), repr(userdata), repr(mid))) mqttc.published = True mqttc.end_loop() def __init__(self, client_id, sid, smart_box_topic, sent, qos = 0, **kwargs): # type: (str, str, str, str, int, dict)->None client_id = client_id if client_id else get_client_id() super(MqttSendClientAsync, self).__init__(client_id = client_id, **kwargs) self.qos = qos self.smart_box_topic = smart_box_topic self.sent = sent self.sid = sid self._exit = False self.connect_count = 0 self.connect_time = int(time.time()) self.published = False self.publish_count = -1 self.publish_time = None self.round = 0 self.on_connect = self._on_connect_impl self.on_disconnect = self._on_disconnect_impl self.on_publish = self._on_publish_impl self.on_socket_open = self._on_socket_open_impl self.on_log = self._on_log_impl def send(self, timeout, retry = 0): max_time_over = (int(time.time()) + timeout + 5) while True: if int(time.time()) >= max_time_over: logger.debug( '[MqttSendClientAsync:send][{}] mqttc<{},{}:{}>, time is over.'.format( self.sid, self._client_id, self._host, self._port)) break if self._state == mqtt_cs_disconnecting: logger.debug( '[MqttSendClientAsync:send][{}] mqttc<{},{}:{}>, wait for connected.'.format( self.sid, self._client_id, self._host, self._port)) self._reconnect_wait() if self._state != mqtt_cs_connected: try: self.connect_count = 0 self.connect_time = None self.reconnect() self.connect_time = int(time.time()) except (socket.error, WebsocketConnectionError): continue super(MqttSendClientAsync, self).loop(timeout = 0) if self._exit is True: break if self._state != mqtt_cs_connected: # if (self.connect_count * self.WAIT_TIME_PER_LOOP) >= self.MAX_CONNECT_TIMEOUT: if (int(time.time()) - self.connect_time) >= self.MAX_CONNECT_TIMEOUT: raise MqttConnectError('[{}] mqtt connect timeout.'.format(self.sid)) else: # time.sleep(self.WAIT_TIME_PER_LOOP) # max_time_over += self.WAIT_TIME_PER_LOOP self.connect_count += 1 else: if self.publish_count == -1: self.publish(self.smart_box_topic, json.dumps(self.sent), self.qos) self.publish_count = 0 self.round = 0 self.publish_time = int(time.time()) else: # if (self.publish_count * self.WAIT_TIME_PER_LOOP) >= timeout: if (int(time.time()) - self.publish_time) >= timeout: if self.round >= retry: logger.debug( '[MqttSendClientAsync:send][{}] mqttc<{},{}:{}>, exceed max retries({}).'.format( self.sid, self._client_id, self._host, self._port, retry)) break else: logger.debug( '[MqttSendClientAsync:send][{}] mqttc<{},{}:{}>, start another round({}).'.format( self.sid, self._client_id, self._host, self._port, self.round)) self.publish(self.smart_box_topic, json.dumps(self.sent), self.qos) self.round += 1 self.publish_count = 0 self.publish_time = int(time.time()) else: # time.sleep(self.WAIT_TIME_PER_LOOP) self.publish_count += 1 if not self.published: raise MqttPublishError('[{}] mqtt publish timeout.'.format(self.sid)) def end_loop(self): self._exit = True