publish.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. # Copyright (c) 2014 Roger Light <roger@atchoo.org>
  2. #
  3. # All rights reserved. This program and the accompanying materials
  4. # are made available under the terms of the Eclipse Public License v1.0
  5. # and Eclipse Distribution License v1.0 which accompany this distribution.
  6. #
  7. # The Eclipse Public License is available at
  8. # http://www.eclipse.org/legal/epl-v10.html
  9. # and the Eclipse Distribution License is available at
  10. # http://www.eclipse.org/org/documents/edl-v10.php.
  11. #
  12. # Contributors:
  13. # Roger Light - initial API and implementation
  14. """
This module provides helper functions for publishing messages in a one-shot
manner. They are useful when you have one or more messages to publish to a
broker and nothing else is required afterwards: the helpers connect, publish
and then disconnect.
  19. """
  20. from __future__ import absolute_import
  21. import collections
  22. try:
  23. from collections.abc import Iterable
  24. except ImportError:
  25. from collections import Iterable
  26. from . import client as paho
  27. from .. import mqtt
  28. def _do_publish(client):
  29. """Internal function"""
  30. message = client._userdata.popleft()
  31. if isinstance(message, dict):
  32. client.publish(**message)
  33. elif isinstance(message, (tuple, list)):
  34. client.publish(*message)
  35. else:
  36. raise TypeError('message must be a dict, tuple, or list')
  37. def _on_connect(client, userdata, flags, rc):
  38. """Internal callback"""
  39. #pylint: disable=invalid-name, unused-argument
  40. if rc == 0:
  41. if len(userdata) > 0:
  42. _do_publish(client)
  43. else:
  44. raise mqtt.MQTTException(paho.connack_string(rc))
  45. def _on_publish(client, userdata, mid):
  46. """Internal callback"""
  47. #pylint: disable=unused-argument
  48. if len(userdata) == 0:
  49. client.disconnect()
  50. else:
  51. _do_publish(client)
  52. def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
  53. will=None, auth=None, tls=None, protocol=paho.MQTTv311,
  54. transport="tcp", proxy_args=None):
  55. """Publish multiple messages to a broker, then disconnect cleanly.
  56. This function creates an MQTT client, connects to a broker and publishes a
  57. list of messages. Once the messages have been delivered, it disconnects
  58. cleanly from the broker.
  59. msgs : a list of messages to publish. Each message is either a dict or a
  60. tuple.
  61. If a dict, only the topic must be present. Default values will be
  62. used for any missing arguments. The dict must be of the form:
  63. msg = {'topic':"<topic>", 'payload':"<payload>", 'qos':<qos>,
  64. 'retain':<retain>}
  65. topic must be present and may not be empty.
  66. If payload is "", None or not present then a zero length payload
  67. will be published.
  68. If qos is not present, the default of 0 is used.
  69. If retain is not present, the default of False is used.
  70. If a tuple, then it must be of the form:
  71. ("<topic>", "<payload>", qos, retain)
  72. hostname : a string containing the address of the broker to connect to.
  73. Defaults to localhost.
  74. port : the port to connect to the broker on. Defaults to 1883.
  75. client_id : the MQTT client id to use. If "" or None, the Paho library will
  76. generate a client id automatically.
  77. keepalive : the keepalive timeout value for the client. Defaults to 60
  78. seconds.
  79. will : a dict containing will parameters for the client: will = {'topic':
  80. "<topic>", 'payload':"<payload">, 'qos':<qos>, 'retain':<retain>}.
  81. Topic is required, all other parameters are optional and will
  82. default to None, 0 and False respectively.
  83. Defaults to None, which indicates no will should be used.
  84. auth : a dict containing authentication parameters for the client:
  85. auth = {'username':"<username>", 'password':"<password>"}
  86. Username is required, password is optional and will default to None
  87. if not provided.
  88. Defaults to None, which indicates no authentication is to be used.
  89. tls : a dict containing TLS configuration parameters for the client:
  90. dict = {'ca_certs':"<ca_certs>", 'certfile':"<certfile>",
  91. 'keyfile':"<keyfile>", 'tls_version':"<tls_version>",
  92. 'ciphers':"<ciphers">, 'insecure':"<bool>"}
  93. ca_certs is required, all other parameters are optional and will
  94. default to None if not provided, which results in the client using
  95. the default behaviour - see the paho.mqtt.client documentation.
  96. Alternatively, tls input can be an SSLContext object, which will be
  97. processed using the tls_set_context method.
  98. Defaults to None, which indicates that TLS should not be used.
  99. transport : set to "tcp" to use the default setting of transport which is
  100. raw TCP. Set to "websockets" to use WebSockets as the transport.
  101. proxy_args: a dictionary that will be given to the client.
  102. """
  103. if not isinstance(msgs, Iterable):
  104. raise TypeError('msgs must be an iterable')
  105. client = paho.Client(client_id=client_id, userdata=collections.deque(msgs),
  106. protocol=protocol, transport=transport)
  107. client.on_publish = _on_publish
  108. client.on_connect = _on_connect
  109. if proxy_args is not None:
  110. client.proxy_set(**proxy_args)
  111. if auth:
  112. username = auth.get('username')
  113. if username:
  114. password = auth.get('password')
  115. client.username_pw_set(username, password)
  116. else:
  117. raise KeyError("The 'username' key was not found, this is "
  118. "required for auth")
  119. if will is not None:
  120. client.will_set(**will)
  121. if tls is not None:
  122. if isinstance(tls, dict):
  123. insecure = tls.pop('insecure', False)
  124. client.tls_set(**tls)
  125. if insecure:
  126. # Must be set *after* the `client.tls_set()` call since it sets
  127. # up the SSL context that `client.tls_insecure_set` alters.
  128. client.tls_insecure_set(insecure)
  129. else:
  130. # Assume input is SSLContext object
  131. client.tls_set_context(tls)
  132. client.connect(hostname, port, keepalive)
  133. client.loop_forever()
  134. def single(topic, payload=None, qos=0, retain=False, hostname="localhost",
  135. port=1883, client_id="", keepalive=60, will=None, auth=None,
  136. tls=None, protocol=paho.MQTTv311, transport="tcp", proxy_args=None):
  137. """Publish a single message to a broker, then disconnect cleanly.
  138. This function creates an MQTT client, connects to a broker and publishes a
  139. single message. Once the message has been delivered, it disconnects cleanly
  140. from the broker.
  141. topic : the only required argument must be the topic string to which the
  142. payload will be published.
  143. payload : the payload to be published. If "" or None, a zero length payload
  144. will be published.
  145. qos : the qos to use when publishing, default to 0.
  146. retain : set the message to be retained (True) or not (False).
  147. hostname : a string containing the address of the broker to connect to.
  148. Defaults to localhost.
  149. port : the port to connect to the broker on. Defaults to 1883.
  150. client_id : the MQTT client id to use. If "" or None, the Paho library will
  151. generate a client id automatically.
  152. keepalive : the keepalive timeout value for the client. Defaults to 60
  153. seconds.
  154. will : a dict containing will parameters for the client: will = {'topic':
  155. "<topic>", 'payload':"<payload">, 'qos':<qos>, 'retain':<retain>}.
  156. Topic is required, all other parameters are optional and will
  157. default to None, 0 and False respectively.
  158. Defaults to None, which indicates no will should be used.
  159. auth : a dict containing authentication parameters for the client:
  160. auth = {'username':"<username>", 'password':"<password>"}
  161. Username is required, password is optional and will default to None
  162. if not provided.
  163. Defaults to None, which indicates no authentication is to be used.
  164. tls : a dict containing TLS configuration parameters for the client:
  165. dict = {'ca_certs':"<ca_certs>", 'certfile':"<certfile>",
  166. 'keyfile':"<keyfile>", 'tls_version':"<tls_version>",
  167. 'ciphers':"<ciphers">, 'insecure':"<bool>"}
  168. ca_certs is required, all other parameters are optional and will
  169. default to None if not provided, which results in the client using
  170. the default behaviour - see the paho.mqtt.client documentation.
  171. Defaults to None, which indicates that TLS should not be used.
  172. Alternatively, tls input can be an SSLContext object, which will be
  173. processed using the tls_set_context method.
  174. transport : set to "tcp" to use the default setting of transport which is
  175. raw TCP. Set to "websockets" to use WebSockets as the transport.
  176. proxy_args: a dictionary that will be given to the client.
  177. """
  178. msg = {'topic':topic, 'payload':payload, 'qos':qos, 'retain':retain}
  179. multiple([msg], hostname, port, client_id, keepalive, will, auth, tls,
  180. protocol, transport, proxy_args)