subscribe.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. # Copyright (c) 2016 Roger Light <roger@atchoo.org>
  2. #
  3. # All rights reserved. This program and the accompanying materials
  4. # are made available under the terms of the Eclipse Public License v1.0
  5. # and Eclipse Distribution License v1.0 which accompany this distribution.
  6. #
  7. # The Eclipse Public License is available at
  8. # http://www.eclipse.org/legal/epl-v10.html
  9. # and the Eclipse Distribution License is available at
  10. # http://www.eclipse.org/org/documents/edl-v10.php.
  11. #
  12. # Contributors:
  13. # Roger Light - initial API and implementation
  14. """
  15. This module provides some helper functions to allow straightforward subscribing
  16. to topics and retrieving messages. The two functions are simple(), which
  17. returns one or messages matching a set of topics, and callback() which allows
  18. you to pass a callback for processing of messages.
  19. """
  20. from __future__ import absolute_import
  21. from . import client as paho
  22. from .. import mqtt
  23. def _on_connect(client, userdata, flags, rc):
  24. """Internal callback"""
  25. if rc != 0:
  26. raise mqtt.MQTTException(paho.connack_string(rc))
  27. if isinstance(userdata['topics'], list):
  28. for topic in userdata['topics']:
  29. client.subscribe(topic, userdata['qos'])
  30. else:
  31. client.subscribe(userdata['topics'], userdata['qos'])
  32. def _on_message_callback(client, userdata, message):
  33. """Internal callback"""
  34. userdata['callback'](client, userdata['userdata'], message)
  35. def _on_message_simple(client, userdata, message):
  36. """Internal callback"""
  37. if userdata['msg_count'] == 0:
  38. return
  39. # Don't process stale retained messages if 'retained' was false
  40. if message.retain and not userdata['retained']:
  41. return
  42. userdata['msg_count'] = userdata['msg_count'] - 1
  43. if userdata['messages'] is None and userdata['msg_count'] == 0:
  44. userdata['messages'] = message
  45. client.disconnect()
  46. return
  47. userdata['messages'].append(message)
  48. if userdata['msg_count'] == 0:
  49. client.disconnect()
  50. def callback(callback, topics, qos=0, userdata=None, hostname="localhost",
  51. port=1883, client_id="", keepalive=60, will=None, auth=None,
  52. tls=None, protocol=paho.MQTTv311, transport="tcp",
  53. clean_session=True, proxy_args=None):
  54. """Subscribe to a list of topics and process them in a callback function.
  55. This function creates an MQTT client, connects to a broker and subscribes
  56. to a list of topics. Incoming messages are processed by the user provided
  57. callback. This is a blocking function and will never return.
  58. callback : function of the form "on_message(client, userdata, message)" for
  59. processing the messages received.
  60. topics : either a string containing a single topic to subscribe to, or a
  61. list of topics to subscribe to.
  62. qos : the qos to use when subscribing. This is applied to all topics.
  63. userdata : passed to the callback
  64. hostname : a string containing the address of the broker to connect to.
  65. Defaults to localhost.
  66. port : the port to connect to the broker on. Defaults to 1883.
  67. client_id : the MQTT client id to use. If "" or None, the Paho library will
  68. generate a client id automatically.
  69. keepalive : the keepalive timeout value for the client. Defaults to 60
  70. seconds.
  71. will : a dict containing will parameters for the client: will = {'topic':
  72. "<topic>", 'payload':"<payload">, 'qos':<qos>, 'retain':<retain>}.
  73. Topic is required, all other parameters are optional and will
  74. default to None, 0 and False respectively.
  75. Defaults to None, which indicates no will should be used.
  76. auth : a dict containing authentication parameters for the client:
  77. auth = {'username':"<username>", 'password':"<password>"}
  78. Username is required, password is optional and will default to None
  79. if not provided.
  80. Defaults to None, which indicates no authentication is to be used.
  81. tls : a dict containing TLS configuration parameters for the client:
  82. dict = {'ca_certs':"<ca_certs>", 'certfile':"<certfile>",
  83. 'keyfile':"<keyfile>", 'tls_version':"<tls_version>",
  84. 'ciphers':"<ciphers">, 'insecure':"<bool>"}
  85. ca_certs is required, all other parameters are optional and will
  86. default to None if not provided, which results in the client using
  87. the default behaviour - see the paho.mqtt.client documentation.
  88. Alternatively, tls input can be an SSLContext object, which will be
  89. processed using the tls_set_context method.
  90. Defaults to None, which indicates that TLS should not be used.
  91. transport : set to "tcp" to use the default setting of transport which is
  92. raw TCP. Set to "websockets" to use WebSockets as the transport.
  93. clean_session : a boolean that determines the client type. If True,
  94. the broker will remove all information about this client
  95. when it disconnects. If False, the client is a persistent
  96. client and subscription information and queued messages
  97. will be retained when the client disconnects.
  98. Defaults to True.
  99. proxy_args: a dictionary that will be given to the client.
  100. """
  101. if qos < 0 or qos > 2:
  102. raise ValueError('qos must be in the range 0-2')
  103. callback_userdata = {
  104. 'callback':callback,
  105. 'topics':topics,
  106. 'qos':qos,
  107. 'userdata':userdata}
  108. client = paho.Client(client_id=client_id, userdata=callback_userdata,
  109. protocol=protocol, transport=transport,
  110. clean_session=clean_session)
  111. client.on_message = _on_message_callback
  112. client.on_connect = _on_connect
  113. if proxy_args is not None:
  114. client.proxy_set(**proxy_args)
  115. if auth:
  116. username = auth.get('username')
  117. if username:
  118. password = auth.get('password')
  119. client.username_pw_set(username, password)
  120. else:
  121. raise KeyError("The 'username' key was not found, this is "
  122. "required for auth")
  123. if will is not None:
  124. client.will_set(**will)
  125. if tls is not None:
  126. if isinstance(tls, dict):
  127. insecure = tls.pop('insecure', False)
  128. client.tls_set(**tls)
  129. if insecure:
  130. # Must be set *after* the `client.tls_set()` call since it sets
  131. # up the SSL context that `client.tls_insecure_set` alters.
  132. client.tls_insecure_set(insecure)
  133. else:
  134. # Assume input is SSLContext object
  135. client.tls_set_context(tls)
  136. client.connect(hostname, port, keepalive)
  137. client.loop_forever()
  138. def simple(topics, qos=0, msg_count=1, retained=True, hostname="localhost",
  139. port=1883, client_id="", keepalive=60, will=None, auth=None,
  140. tls=None, protocol=paho.MQTTv311, transport="tcp",
  141. clean_session=True, proxy_args=None):
  142. """Subscribe to a list of topics and return msg_count messages.
  143. This function creates an MQTT client, connects to a broker and subscribes
  144. to a list of topics. Once "msg_count" messages have been received, it
  145. disconnects cleanly from the broker and returns the messages.
  146. topics : either a string containing a single topic to subscribe to, or a
  147. list of topics to subscribe to.
  148. qos : the qos to use when subscribing. This is applied to all topics.
  149. msg_count : the number of messages to retrieve from the broker.
  150. if msg_count == 1 then a single MQTTMessage will be returned.
  151. if msg_count > 1 then a list of MQTTMessages will be returned.
  152. retained : If set to True, retained messages will be processed the same as
  153. non-retained messages. If set to False, retained messages will
  154. be ignored. This means that with retained=False and msg_count=1,
  155. the function will return the first message received that does
  156. not have the retained flag set.
  157. hostname : a string containing the address of the broker to connect to.
  158. Defaults to localhost.
  159. port : the port to connect to the broker on. Defaults to 1883.
  160. client_id : the MQTT client id to use. If "" or None, the Paho library will
  161. generate a client id automatically.
  162. keepalive : the keepalive timeout value for the client. Defaults to 60
  163. seconds.
  164. will : a dict containing will parameters for the client: will = {'topic':
  165. "<topic>", 'payload':"<payload">, 'qos':<qos>, 'retain':<retain>}.
  166. Topic is required, all other parameters are optional and will
  167. default to None, 0 and False respectively.
  168. Defaults to None, which indicates no will should be used.
  169. auth : a dict containing authentication parameters for the client:
  170. auth = {'username':"<username>", 'password':"<password>"}
  171. Username is required, password is optional and will default to None
  172. if not provided.
  173. Defaults to None, which indicates no authentication is to be used.
  174. tls : a dict containing TLS configuration parameters for the client:
  175. dict = {'ca_certs':"<ca_certs>", 'certfile':"<certfile>",
  176. 'keyfile':"<keyfile>", 'tls_version':"<tls_version>",
  177. 'ciphers':"<ciphers">, 'insecure':"<bool>"}
  178. ca_certs is required, all other parameters are optional and will
  179. default to None if not provided, which results in the client using
  180. the default behaviour - see the paho.mqtt.client documentation.
  181. Alternatively, tls input can be an SSLContext object, which will be
  182. processed using the tls_set_context method.
  183. Defaults to None, which indicates that TLS should not be used.
  184. transport : set to "tcp" to use the default setting of transport which is
  185. raw TCP. Set to "websockets" to use WebSockets as the transport.
  186. clean_session : a boolean that determines the client type. If True,
  187. the broker will remove all information about this client
  188. when it disconnects. If False, the client is a persistent
  189. client and subscription information and queued messages
  190. will be retained when the client disconnects.
  191. Defaults to True.
  192. proxy_args: a dictionary that will be given to the client.
  193. """
  194. if msg_count < 1:
  195. raise ValueError('msg_count must be > 0')
  196. # Set ourselves up to return a single message if msg_count == 1, or a list
  197. # if > 1.
  198. if msg_count == 1:
  199. messages = None
  200. else:
  201. messages = []
  202. userdata = {'retained':retained, 'msg_count':msg_count, 'messages':messages}
  203. callback(_on_message_simple, topics, qos, userdata, hostname, port,
  204. client_id, keepalive, will, auth, tls, protocol, transport,
  205. clean_session, proxy_args)
  206. return userdata['messages']