123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import Queue
- import time
- import logging
- import uuid
- import socket
- from library.paho.mqtt import client
- from library.paho.mqtt.client import WebsocketConnectionError, mqtt_cs_disconnecting, MQTT_ERR_SUCCESS, \
- mqtt_cs_connected
- from apps.web.constant import ErrorCode
- import simplejson as json
- from apps.web.core.exceptions import MqttSubscribeError, MqttPublishError, MqttConnectError
- logger = logging.getLogger(__name__)
- def get_client_id(prefix = 'webapp_'):
- # type: ()->str
- if '_' not in prefix:
- prefix = prefix + '_'
- return prefix + str(uuid.uuid4())
- class MqttClient(client.Client):
- def __init__(self, client_id = '', **kwargs):
- # type: (str, dict)->None
- client_id = client_id if client_id else get_client_id()
- super(MqttClient, self).__init__(client_id = client_id, **kwargs)
- def __repr__(self):
- return '<MQTTClient client_id=%s>' % (self._client_id,)
- def close(self):
- if self._sock:
- self._sock.close()
- self._sock = None
- if self._sockpairR:
- self._sockpairR.close()
- self._sockpairR = None
- if self._sockpairW:
- self._sockpairW.close()
- self._sockpairW = None
- class MqttSendClientSync(MqttClient):
- RECONNECT_WAIT_TIME = 5
- WAIT_TIME_PER_LOOP = 1
- MAX_CONNECT_TIMEOUT = 15
- MAX_SUBSCRIBE_TIMEOUT = 15
- MY_SOCKET_TIMEOUT = 15
- @staticmethod
- def _on_log_impl(mqttc, userdata, level, buf):
- logger.debug(
- '[MqttSendClientSync:on_log][{}] mqttc<{},{}:{}>. userdata: {}; level: {}; buffer = {}'.format(
- mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), level, buf))
- @staticmethod
- def _on_socket_open_impl(mqttc, userdata, sock):
- logger.debug(
- '[MqttSendClientSync:on_socket_open][{}] mqttc<{},{}:{}>. userdata: {}; sock: {}'.format(
- mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(sock)))
- sock.settimeout(MqttSendClientSync.MY_SOCKET_TIMEOUT)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- @staticmethod
- def _on_disconnect_impl(mqttc, userdata, rc):
- logger.debug(
- '[MqttSendClientSync:on_disconnect][{}] mqttc<{},{}:{}>, userdata: {}; rc: {}'.format(
- mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(rc)))
- @staticmethod
- def _on_connect_impl(mqttc, userdata, flags, rc, properties = None):
- logger.debug(
- '[MqttSendClientSync:on_connect][{}] mqttc<{},{}:{}>, userdata = {}, flags = {}, rc = {}, properties = {}'.format(
- mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(flags), repr(rc),
- repr(properties)))
- subscribe_result, mid = mqttc.subscribe(mqttc.server_topic, qos = mqttc.qos)
- if subscribe_result != MQTT_ERR_SUCCESS:
- raise MqttSubscribeError('send subscribe failure.')
- mqttc.subscribe_count = 0
- mqttc.subscribe_time = int(time.time())
- @staticmethod
- def _on_message_impl(mqttc, obj, msg):
- logger.debug(
- '[MqttSendClientSync:on_message][{}] mqttc<{},{}:{}>, timestamp: {}; topic: {}; msg: {}'.format(
- mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, msg.timestamp, msg.topic, str(msg.payload)))
- received = json.loads(bytes.decode(msg.payload))
- if 'sid' not in received:
- logger.debug('[MqttSendClientSync:on_message][{}] device<IMEI={}> not support sid.'.format(mqttc.sid,
- mqttc.sent[
- 'IMEI']))
- elif received['sid'] != mqttc.sent['sid']:
- logger.debug(
- '[MqttSendClientSync:on_message][{}] device<IMEI={}> skip sid not belong to me. sid = {}; my sid = {}'.format(
- mqttc.sid, mqttc.sent['IMEI'], received['sid'] if 'sid' in received else '', mqttc.sent['sid']))
- return
- if mqttc.received.empty():
- mqttc.received.put(received)
- mqttc.end_loop()
- @staticmethod
- def _on_subscribe_impl(mqttc, obj, mid, granted_qos):
- logger.debug(
- '[MqttSendClientSync:on_subscribe][{}] mqttc<{},{}:{}>, obj = {}, mid = {}, granted_qos = {}'.format(
- mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(obj), repr(mid), repr(granted_qos)))
- mqttc.subscribed = True
- if mqttc.publish_count == -1:
- mqttc.publish(mqttc.smart_box_topic, json.dumps(mqttc.sent), mqttc.qos)
- mqttc.round = 0
- mqttc.publish_count = 0
- mqttc.publish_time = int(time.time())
- @staticmethod
- def _on_publish_impl(mqttc, userdata, mid):
- logger.debug(
- '[MqttSendClientSync:on_publish][{}] mqttc<{},{}:{}>, userdata = {}, mid = {}'.format(
- mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(mid)))
- mqttc.published = True
- def __init__(self, client_id, sid, server_topic, smart_box_topic, sent, qos = 0, **kwargs):
- # type: (str, str, str, str, str, int, dict)->None
- client_id = client_id if client_id else get_client_id()
- super(MqttSendClientSync, self).__init__(client_id = client_id, **kwargs)
- self.server_topic = server_topic
- self.smart_box_topic = smart_box_topic
- self.sent = sent
- self.sid = sid
- self.qos = qos
- self._exit = False
- self.connect_count = 0
- self.connect_time = int(time.time())
- self.subscribed = False
- self.subscribe_count = 0
- self.subscribe_time = None
- self.published = False
- self.round = 0
- self.publish_count = -1
- self.publish_time = None
- self.received = Queue.Queue(1)
- self.on_message = self._on_message_impl
- self.on_connect = self._on_connect_impl
- self.on_disconnect = self._on_disconnect_impl
- self.on_subscribe = self._on_subscribe_impl
- self.on_publish = self._on_publish_impl
- self.on_log = self._on_log_impl
- self.on_socket_open = self._on_socket_open_impl
- def send(self, timeout, retry):
- max_time_over = (int(time.time()) + (timeout * (retry + 1)) + 5)
- while True:
- if int(time.time()) >= max_time_over:
- logger.debug(
- '[MqttSendClientSync:send][{}] mqttc<{},{}:{}>, time is over.'.format(
- self.sid, self._client_id, self._host, self._port))
- break
- if self._state == mqtt_cs_disconnecting:
- logger.debug(
- '[MqttSendClientSync:send][{}] mqttc<{},{}:{}>, wait for connected.'.format(
- self.sid, self._client_id, self._host, self._port))
- self._reconnect_wait()
- if self._state != mqtt_cs_connected:
- try:
- self.connect_count = 0
- self.connect_time = None
- self.subscribed = False
- self.subscribe_count = 0
- self.subscribe_time = None
- self.reconnect()
- self.connect_time = int(time.time())
- except (socket.error, WebsocketConnectionError):
- continue
- super(MqttSendClientSync, self).loop(timeout = self.WAIT_TIME_PER_LOOP)
- if self._exit is True:
- break
- if self._state != mqtt_cs_connected:
- # if (self.connect_count * self.WAIT_TIME_PER_LOOP) >= self.MAX_CONNECT_TIMEOUT:
- if (int(time.time()) - self.connect_time) >= self.MAX_CONNECT_TIMEOUT:
- raise MqttConnectError('[{}] mqtt connect timeout.'.format(self.sid))
- else:
- # time.sleep(self.WAIT_TIME_PER_LOOP)
- # max_time_over += self.WAIT_TIME_PER_LOOP
- self.connect_count += 1
- elif not self.subscribed:
- # if (self.subscribe_count * self.WAIT_TIME_PER_LOOP) >= self.MAX_SUBSCRIBE_TIMEOUT:
- if (int(time.time()) - self.subscribe_time) >= self.MAX_SUBSCRIBE_TIMEOUT:
- # self.subscribe_count = 0
- raise MqttSubscribeError('[{}] mqtt subscribe timeout.'.format(self.sid))
- else:
- # time.sleep(self.WAIT_TIME_PER_LOOP)
- # max_time_over += self.WAIT_TIME_PER_LOOP
- self.subscribe_count += 1
- else:
- # if (self.publish_count * self.WAIT_TIME_PER_LOOP) >= timeout:
- if (int(time.time()) - self.publish_time) >= timeout:
- if self.round >= retry:
- logger.debug('[MqttSendClientSync:send][{}] mqttc<{},{}:{}>, exceed max retries({}).'.format(
- self.sid, self._client_id, self._host, self._port, retry))
- break
- else:
- logger.debug(
- '[MqttSendClientSync:send][{}] mqttc<{},{}:{}>, start another round({}).'.format(
- self.sid, self._client_id, self._host, self._port, self.round))
- self.publish(self.smart_box_topic, json.dumps(self.sent), self.qos)
- self.round += 1
- self.publish_count = 0
- self.publish_time = int(time.time())
- else:
- # time.sleep(self.WAIT_TIME_PER_LOOP)
- self.publish_count += 1
- if not self.published:
- raise MqttPublishError('[{}] mqtt publish timeout.'.format(self.sid))
- if not self.received.empty():
- return self.received.get()
- else:
- return {
- 'cmd': self.sent['cmd'],
- 'IMEI': self.sent['IMEI'],
- 'rst': ErrorCode.DEVICE_CONN_FAIL
- }
- def end_loop(self):
- self._exit = True
- class MqttSendClientAsync(MqttClient):
- RECONNECT_WAIT_TIME = 5
- WAIT_TIME_PER_LOOP = 1
- MAX_CONNECT_TIMEOUT = 15
- MY_SOCKET_TIMEOUT = 15
- @staticmethod
- def _on_log_impl(mqttc, userdata, level, buf):
- logger.debug(
- '[MqttSendClientAsync:on_log][{}] mqttc<{},{}:{}>. userdata: {}; level: {}; buffer = {}'.format(
- mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), level, buf))
- @staticmethod
- def _on_socket_open_impl(mqttc, userdata, sock):
- logger.debug(
- '[MqttSendClientAsync:on_socket_open][{}] mqttc<{},{}:{}>. userdata: {}; sock: {}'.format(
- mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(sock)))
- sock.settimeout(MqttSendClientSync.MY_SOCKET_TIMEOUT)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- @staticmethod
- def _on_disconnect_impl(mqttc, userdata, rc):
- logger.debug(
- '[MqttSendClientAsync:on_disconnect][{}] mqttc<{},{}:{}>. userdata: {}; rc: {}'.format(
- mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(rc)))
- @staticmethod
- def _on_connect_impl(mqttc, userdata, flags, rc, properties = None):
- logger.debug(
- '[MqttSendClientAsync:on_connect][{}] mqttc = {}, userdata = {}, flags = {}, rc = {}, properties = {}'.format(
- mqttc.sid, repr(mqttc), repr(userdata), repr(flags), repr(rc), repr(properties)))
- @staticmethod
- def _on_publish_impl(mqttc, userdata, mid):
- logger.debug(
- '[MqttSendClientAsync:on_publish][{}] client = {}, userdata = {}, mid = {}'.format(
- mqttc.sid, repr(mqttc), repr(userdata), repr(mid)))
- mqttc.published = True
- mqttc.end_loop()
- def __init__(self, client_id, sid, smart_box_topic, sent, qos = 0, **kwargs):
- # type: (str, str, str, str, int, dict)->None
- client_id = client_id if client_id else get_client_id()
- super(MqttSendClientAsync, self).__init__(client_id = client_id, **kwargs)
- self.qos = qos
- self.smart_box_topic = smart_box_topic
- self.sent = sent
- self.sid = sid
- self._exit = False
- self.connect_count = 0
- self.connect_time = int(time.time())
- self.published = False
- self.publish_count = -1
- self.publish_time = None
- self.round = 0
- self.on_connect = self._on_connect_impl
- self.on_disconnect = self._on_disconnect_impl
- self.on_publish = self._on_publish_impl
- self.on_socket_open = self._on_socket_open_impl
- self.on_log = self._on_log_impl
- def send(self, timeout, retry = 0):
- max_time_over = (int(time.time()) + timeout + 5)
- while True:
- if int(time.time()) >= max_time_over:
- logger.debug(
- '[MqttSendClientAsync:send][{}] mqttc<{},{}:{}>, time is over.'.format(
- self.sid, self._client_id, self._host, self._port))
- break
- if self._state == mqtt_cs_disconnecting:
- logger.debug(
- '[MqttSendClientAsync:send][{}] mqttc<{},{}:{}>, wait for connected.'.format(
- self.sid, self._client_id, self._host, self._port))
- self._reconnect_wait()
- if self._state != mqtt_cs_connected:
- try:
- self.connect_count = 0
- self.connect_time = None
- self.reconnect()
- self.connect_time = int(time.time())
- except (socket.error, WebsocketConnectionError):
- continue
- super(MqttSendClientAsync, self).loop(timeout = 0)
- if self._exit is True:
- break
- if self._state != mqtt_cs_connected:
- # if (self.connect_count * self.WAIT_TIME_PER_LOOP) >= self.MAX_CONNECT_TIMEOUT:
- if (int(time.time()) - self.connect_time) >= self.MAX_CONNECT_TIMEOUT:
- raise MqttConnectError('[{}] mqtt connect timeout.'.format(self.sid))
- else:
- # time.sleep(self.WAIT_TIME_PER_LOOP)
- # max_time_over += self.WAIT_TIME_PER_LOOP
- self.connect_count += 1
- else:
- if self.publish_count == -1:
- self.publish(self.smart_box_topic, json.dumps(self.sent), self.qos)
- self.publish_count = 0
- self.round = 0
- self.publish_time = int(time.time())
- else:
- # if (self.publish_count * self.WAIT_TIME_PER_LOOP) >= timeout:
- if (int(time.time()) - self.publish_time) >= timeout:
- if self.round >= retry:
- logger.debug(
- '[MqttSendClientAsync:send][{}] mqttc<{},{}:{}>, exceed max retries({}).'.format(
- self.sid, self._client_id, self._host, self._port, retry))
- break
- else:
- logger.debug(
- '[MqttSendClientAsync:send][{}] mqttc<{},{}:{}>, start another round({}).'.format(
- self.sid, self._client_id, self._host, self._port, self.round))
- self.publish(self.smart_box_topic, json.dumps(self.sent), self.qos)
- self.round += 1
- self.publish_count = 0
- self.publish_time = int(time.time())
- else:
- # time.sleep(self.WAIT_TIME_PER_LOOP)
- self.publish_count += 1
- if not self.published:
- raise MqttPublishError('[{}] mqtt publish timeout.'.format(self.sid))
- def end_loop(self):
- self._exit = True
|