mqtt_client.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import Queue
  4. import time
  5. import logging
  6. import uuid
  7. import socket
  8. from library.paho.mqtt import client
  9. from library.paho.mqtt.client import WebsocketConnectionError, mqtt_cs_disconnecting, MQTT_ERR_SUCCESS, \
  10. mqtt_cs_connected
  11. from apps.web.constant import ErrorCode
  12. import simplejson as json
  13. from apps.web.core.exceptions import MqttSubscribeError, MqttPublishError, MqttConnectError
  14. logger = logging.getLogger(__name__)
  15. def get_client_id(prefix = 'webapp_'):
  16. # type: ()->str
  17. if '_' not in prefix:
  18. prefix = prefix + '_'
  19. return prefix + str(uuid.uuid4())
  20. class MqttClient(client.Client):
  21. def __init__(self, client_id = '', **kwargs):
  22. # type: (str, dict)->None
  23. client_id = client_id if client_id else get_client_id()
  24. super(MqttClient, self).__init__(client_id = client_id, **kwargs)
  25. def __repr__(self):
  26. return '<MQTTClient client_id=%s>' % (self._client_id,)
  27. def close(self):
  28. if self._sock:
  29. self._sock.close()
  30. self._sock = None
  31. if self._sockpairR:
  32. self._sockpairR.close()
  33. self._sockpairR = None
  34. if self._sockpairW:
  35. self._sockpairW.close()
  36. self._sockpairW = None
  37. class MqttSendClientSync(MqttClient):
  38. RECONNECT_WAIT_TIME = 5
  39. WAIT_TIME_PER_LOOP = 1
  40. MAX_CONNECT_TIMEOUT = 15
  41. MAX_SUBSCRIBE_TIMEOUT = 15
  42. MY_SOCKET_TIMEOUT = 15
  43. @staticmethod
  44. def _on_log_impl(mqttc, userdata, level, buf):
  45. logger.debug(
  46. '[MqttSendClientSync:on_log][{}] mqttc<{},{}:{}>. userdata: {}; level: {}; buffer = {}'.format(
  47. mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), level, buf))
  48. @staticmethod
  49. def _on_socket_open_impl(mqttc, userdata, sock):
  50. logger.debug(
  51. '[MqttSendClientSync:on_socket_open][{}] mqttc<{},{}:{}>. userdata: {}; sock: {}'.format(
  52. mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(sock)))
  53. sock.settimeout(MqttSendClientSync.MY_SOCKET_TIMEOUT)
  54. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  55. @staticmethod
  56. def _on_disconnect_impl(mqttc, userdata, rc):
  57. logger.debug(
  58. '[MqttSendClientSync:on_disconnect][{}] mqttc<{},{}:{}>, userdata: {}; rc: {}'.format(
  59. mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(rc)))
  60. @staticmethod
  61. def _on_connect_impl(mqttc, userdata, flags, rc, properties = None):
  62. logger.debug(
  63. '[MqttSendClientSync:on_connect][{}] mqttc<{},{}:{}>, userdata = {}, flags = {}, rc = {}, properties = {}'.format(
  64. mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(flags), repr(rc),
  65. repr(properties)))
  66. subscribe_result, mid = mqttc.subscribe(mqttc.server_topic, qos = mqttc.qos)
  67. if subscribe_result != MQTT_ERR_SUCCESS:
  68. raise MqttSubscribeError('send subscribe failure.')
  69. mqttc.subscribe_count = 0
  70. mqttc.subscribe_time = int(time.time())
  71. @staticmethod
  72. def _on_message_impl(mqttc, obj, msg):
  73. logger.debug(
  74. '[MqttSendClientSync:on_message][{}] mqttc<{},{}:{}>, timestamp: {}; topic: {}; msg: {}'.format(
  75. mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, msg.timestamp, msg.topic, str(msg.payload)))
  76. received = json.loads(bytes.decode(msg.payload))
  77. if 'sid' not in received:
  78. logger.debug('[MqttSendClientSync:on_message][{}] device<IMEI={}> not support sid.'.format(mqttc.sid,
  79. mqttc.sent[
  80. 'IMEI']))
  81. elif received['sid'] != mqttc.sent['sid']:
  82. logger.debug(
  83. '[MqttSendClientSync:on_message][{}] device<IMEI={}> skip sid not belong to me. sid = {}; my sid = {}'.format(
  84. mqttc.sid, mqttc.sent['IMEI'], received['sid'] if 'sid' in received else '', mqttc.sent['sid']))
  85. return
  86. if mqttc.received.empty():
  87. mqttc.received.put(received)
  88. mqttc.end_loop()
  89. @staticmethod
  90. def _on_subscribe_impl(mqttc, obj, mid, granted_qos):
  91. logger.debug(
  92. '[MqttSendClientSync:on_subscribe][{}] mqttc<{},{}:{}>, obj = {}, mid = {}, granted_qos = {}'.format(
  93. mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(obj), repr(mid), repr(granted_qos)))
  94. mqttc.subscribed = True
  95. if mqttc.publish_count == -1:
  96. mqttc.publish(mqttc.smart_box_topic, json.dumps(mqttc.sent), mqttc.qos)
  97. mqttc.round = 0
  98. mqttc.publish_count = 0
  99. mqttc.publish_time = int(time.time())
  100. @staticmethod
  101. def _on_publish_impl(mqttc, userdata, mid):
  102. logger.debug(
  103. '[MqttSendClientSync:on_publish][{}] mqttc<{},{}:{}>, userdata = {}, mid = {}'.format(
  104. mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(mid)))
  105. mqttc.published = True
  106. def __init__(self, client_id, sid, server_topic, smart_box_topic, sent, qos = 0, **kwargs):
  107. # type: (str, str, str, str, str, int, dict)->None
  108. client_id = client_id if client_id else get_client_id()
  109. super(MqttSendClientSync, self).__init__(client_id = client_id, **kwargs)
  110. self.server_topic = server_topic
  111. self.smart_box_topic = smart_box_topic
  112. self.sent = sent
  113. self.sid = sid
  114. self.qos = qos
  115. self._exit = False
  116. self.connect_count = 0
  117. self.connect_time = int(time.time())
  118. self.subscribed = False
  119. self.subscribe_count = 0
  120. self.subscribe_time = None
  121. self.published = False
  122. self.round = 0
  123. self.publish_count = -1
  124. self.publish_time = None
  125. self.received = Queue.Queue(1)
  126. self.on_message = self._on_message_impl
  127. self.on_connect = self._on_connect_impl
  128. self.on_disconnect = self._on_disconnect_impl
  129. self.on_subscribe = self._on_subscribe_impl
  130. self.on_publish = self._on_publish_impl
  131. self.on_log = self._on_log_impl
  132. self.on_socket_open = self._on_socket_open_impl
  133. def send(self, timeout, retry):
  134. max_time_over = (int(time.time()) + (timeout * (retry + 1)) + 5)
  135. while True:
  136. if int(time.time()) >= max_time_over:
  137. logger.debug(
  138. '[MqttSendClientSync:send][{}] mqttc<{},{}:{}>, time is over.'.format(
  139. self.sid, self._client_id, self._host, self._port))
  140. break
  141. if self._state == mqtt_cs_disconnecting:
  142. logger.debug(
  143. '[MqttSendClientSync:send][{}] mqttc<{},{}:{}>, wait for connected.'.format(
  144. self.sid, self._client_id, self._host, self._port))
  145. self._reconnect_wait()
  146. if self._state != mqtt_cs_connected:
  147. try:
  148. self.connect_count = 0
  149. self.connect_time = None
  150. self.subscribed = False
  151. self.subscribe_count = 0
  152. self.subscribe_time = None
  153. self.reconnect()
  154. self.connect_time = int(time.time())
  155. except (socket.error, WebsocketConnectionError):
  156. continue
  157. super(MqttSendClientSync, self).loop(timeout = self.WAIT_TIME_PER_LOOP)
  158. if self._exit is True:
  159. break
  160. if self._state != mqtt_cs_connected:
  161. # if (self.connect_count * self.WAIT_TIME_PER_LOOP) >= self.MAX_CONNECT_TIMEOUT:
  162. if (int(time.time()) - self.connect_time) >= self.MAX_CONNECT_TIMEOUT:
  163. raise MqttConnectError('[{}] mqtt connect timeout.'.format(self.sid))
  164. else:
  165. # time.sleep(self.WAIT_TIME_PER_LOOP)
  166. # max_time_over += self.WAIT_TIME_PER_LOOP
  167. self.connect_count += 1
  168. elif not self.subscribed:
  169. # if (self.subscribe_count * self.WAIT_TIME_PER_LOOP) >= self.MAX_SUBSCRIBE_TIMEOUT:
  170. if (int(time.time()) - self.subscribe_time) >= self.MAX_SUBSCRIBE_TIMEOUT:
  171. # self.subscribe_count = 0
  172. raise MqttSubscribeError('[{}] mqtt subscribe timeout.'.format(self.sid))
  173. else:
  174. # time.sleep(self.WAIT_TIME_PER_LOOP)
  175. # max_time_over += self.WAIT_TIME_PER_LOOP
  176. self.subscribe_count += 1
  177. else:
  178. # if (self.publish_count * self.WAIT_TIME_PER_LOOP) >= timeout:
  179. if (int(time.time()) - self.publish_time) >= timeout:
  180. if self.round >= retry:
  181. logger.debug('[MqttSendClientSync:send][{}] mqttc<{},{}:{}>, exceed max retries({}).'.format(
  182. self.sid, self._client_id, self._host, self._port, retry))
  183. break
  184. else:
  185. logger.debug(
  186. '[MqttSendClientSync:send][{}] mqttc<{},{}:{}>, start another round({}).'.format(
  187. self.sid, self._client_id, self._host, self._port, self.round))
  188. self.publish(self.smart_box_topic, json.dumps(self.sent), self.qos)
  189. self.round += 1
  190. self.publish_count = 0
  191. self.publish_time = int(time.time())
  192. else:
  193. # time.sleep(self.WAIT_TIME_PER_LOOP)
  194. self.publish_count += 1
  195. if not self.published:
  196. raise MqttPublishError('[{}] mqtt publish timeout.'.format(self.sid))
  197. if not self.received.empty():
  198. return self.received.get()
  199. else:
  200. return {
  201. 'cmd': self.sent['cmd'],
  202. 'IMEI': self.sent['IMEI'],
  203. 'rst': ErrorCode.DEVICE_CONN_FAIL
  204. }
  205. def end_loop(self):
  206. self._exit = True
  207. class MqttSendClientAsync(MqttClient):
  208. RECONNECT_WAIT_TIME = 5
  209. WAIT_TIME_PER_LOOP = 1
  210. MAX_CONNECT_TIMEOUT = 15
  211. MY_SOCKET_TIMEOUT = 15
  212. @staticmethod
  213. def _on_log_impl(mqttc, userdata, level, buf):
  214. logger.debug(
  215. '[MqttSendClientAsync:on_log][{}] mqttc<{},{}:{}>. userdata: {}; level: {}; buffer = {}'.format(
  216. mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), level, buf))
  217. @staticmethod
  218. def _on_socket_open_impl(mqttc, userdata, sock):
  219. logger.debug(
  220. '[MqttSendClientAsync:on_socket_open][{}] mqttc<{},{}:{}>. userdata: {}; sock: {}'.format(
  221. mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(sock)))
  222. sock.settimeout(MqttSendClientSync.MY_SOCKET_TIMEOUT)
  223. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  224. @staticmethod
  225. def _on_disconnect_impl(mqttc, userdata, rc):
  226. logger.debug(
  227. '[MqttSendClientAsync:on_disconnect][{}] mqttc<{},{}:{}>. userdata: {}; rc: {}'.format(
  228. mqttc.sid, mqttc._client_id, mqttc._host, mqttc._port, repr(userdata), repr(rc)))
  229. @staticmethod
  230. def _on_connect_impl(mqttc, userdata, flags, rc, properties = None):
  231. logger.debug(
  232. '[MqttSendClientAsync:on_connect][{}] mqttc = {}, userdata = {}, flags = {}, rc = {}, properties = {}'.format(
  233. mqttc.sid, repr(mqttc), repr(userdata), repr(flags), repr(rc), repr(properties)))
  234. @staticmethod
  235. def _on_publish_impl(mqttc, userdata, mid):
  236. logger.debug(
  237. '[MqttSendClientAsync:on_publish][{}] client = {}, userdata = {}, mid = {}'.format(
  238. mqttc.sid, repr(mqttc), repr(userdata), repr(mid)))
  239. mqttc.published = True
  240. mqttc.end_loop()
  241. def __init__(self, client_id, sid, smart_box_topic, sent, qos = 0, **kwargs):
  242. # type: (str, str, str, str, int, dict)->None
  243. client_id = client_id if client_id else get_client_id()
  244. super(MqttSendClientAsync, self).__init__(client_id = client_id, **kwargs)
  245. self.qos = qos
  246. self.smart_box_topic = smart_box_topic
  247. self.sent = sent
  248. self.sid = sid
  249. self._exit = False
  250. self.connect_count = 0
  251. self.connect_time = int(time.time())
  252. self.published = False
  253. self.publish_count = -1
  254. self.publish_time = None
  255. self.round = 0
  256. self.on_connect = self._on_connect_impl
  257. self.on_disconnect = self._on_disconnect_impl
  258. self.on_publish = self._on_publish_impl
  259. self.on_socket_open = self._on_socket_open_impl
  260. self.on_log = self._on_log_impl
  261. def send(self, timeout, retry = 0):
  262. max_time_over = (int(time.time()) + timeout + 5)
  263. while True:
  264. if int(time.time()) >= max_time_over:
  265. logger.debug(
  266. '[MqttSendClientAsync:send][{}] mqttc<{},{}:{}>, time is over.'.format(
  267. self.sid, self._client_id, self._host, self._port))
  268. break
  269. if self._state == mqtt_cs_disconnecting:
  270. logger.debug(
  271. '[MqttSendClientAsync:send][{}] mqttc<{},{}:{}>, wait for connected.'.format(
  272. self.sid, self._client_id, self._host, self._port))
  273. self._reconnect_wait()
  274. if self._state != mqtt_cs_connected:
  275. try:
  276. self.connect_count = 0
  277. self.connect_time = None
  278. self.reconnect()
  279. self.connect_time = int(time.time())
  280. except (socket.error, WebsocketConnectionError):
  281. continue
  282. super(MqttSendClientAsync, self).loop(timeout = 0)
  283. if self._exit is True:
  284. break
  285. if self._state != mqtt_cs_connected:
  286. # if (self.connect_count * self.WAIT_TIME_PER_LOOP) >= self.MAX_CONNECT_TIMEOUT:
  287. if (int(time.time()) - self.connect_time) >= self.MAX_CONNECT_TIMEOUT:
  288. raise MqttConnectError('[{}] mqtt connect timeout.'.format(self.sid))
  289. else:
  290. # time.sleep(self.WAIT_TIME_PER_LOOP)
  291. # max_time_over += self.WAIT_TIME_PER_LOOP
  292. self.connect_count += 1
  293. else:
  294. if self.publish_count == -1:
  295. self.publish(self.smart_box_topic, json.dumps(self.sent), self.qos)
  296. self.publish_count = 0
  297. self.round = 0
  298. self.publish_time = int(time.time())
  299. else:
  300. # if (self.publish_count * self.WAIT_TIME_PER_LOOP) >= timeout:
  301. if (int(time.time()) - self.publish_time) >= timeout:
  302. if self.round >= retry:
  303. logger.debug(
  304. '[MqttSendClientAsync:send][{}] mqttc<{},{}:{}>, exceed max retries({}).'.format(
  305. self.sid, self._client_id, self._host, self._port, retry))
  306. break
  307. else:
  308. logger.debug(
  309. '[MqttSendClientAsync:send][{}] mqttc<{},{}:{}>, start another round({}).'.format(
  310. self.sid, self._client_id, self._host, self._port, self.round))
  311. self.publish(self.smart_box_topic, json.dumps(self.sent), self.qos)
  312. self.round += 1
  313. self.publish_count = 0
  314. self.publish_time = int(time.time())
  315. else:
  316. # time.sleep(self.WAIT_TIME_PER_LOOP)
  317. self.publish_count += 1
  318. if not self.published:
  319. raise MqttPublishError('[{}] mqtt publish timeout.'.format(self.sid))
  320. def end_loop(self):
  321. self._exit = True