tasks.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. # -*- coding: utf-8 -*-
  2. #!/usr/bin/env python
  3. from urlparse import urlparse
  4. from celery.utils.log import get_task_logger
  5. from library.paho.mqtt.publish import _on_publish, _on_connect
  6. from django.conf import settings
  7. from apps.web.core.mqtt_client import get_client_id, MqttClient
  8. from apps.web.device.models import Device
  9. from apps.web.core.networking import MessageSender
  10. from apps.dispatch.commands import TopicCommand
  11. logger = get_task_logger(__name__)
  12. def publish_via_websockets(msgs, host="localhost", port=1883, path='/ws', client_id="", auth=None, tls=None):
  13. """Publish multiple messages to a broker, then disconnect cleanly.
  14. This function creates an MQTT client, connects to a broker and publishes a
  15. list of messages. Once the messages have been delivered, it disconnects
  16. cleanly from the broker.
  17. msgs : a list of messages to publish. Each message is either a dict or a
  18. tuple.
  19. If a dict, only the topic must be present. Default values will be
  20. used for any missing arguments. The dict must be of the form:
  21. msg = {'topic':"<topic>", 'payload':"<payload>", 'qos':<qos>,
  22. 'retain':<retain>}
  23. topic must be present and may not be empty.
  24. If payload is "", None or not present then a zero length payload
  25. will be published.
  26. If qos is not present, the default of 0 is used.
  27. If retain is not present, the default of False is used.
  28. If a tuple, then it must be of the form:
  29. ("<topic>", "<payload>", qos, retain)
  30. host : a string containing the address of the broker to connect to.
  31. Defaults to localhost.
  32. port : the port to connect to the broker on. Defaults to 1883.
  33. client_id : the MQTT client id to use. If "" or None, the Paho library will
  34. generate a client id automatically.
  35. keepalive : the keepalive timeout value for the client. Defaults to 60
  36. seconds.
  37. will : a dict containing will parameters for the client: will = {'topic':
  38. "<topic>", 'payload':"<payload">, 'qos':<qos>, 'retain':<retain>}.
  39. Topic is required, all other parameters are optional and will
  40. default to None, 0 and False respectively.
  41. Defaults to None, which indicates no will should be used.
  42. auth : a dict containing authentication parameters for the client:
  43. auth = {'username':"<username>", 'password':"<password>"}
  44. Username is required, password is optional and will default to None
  45. if not provided.
  46. Defaults to None, which indicates no authentication is to be used.
  47. tls : a dict containing TLS configuration parameters for the client:
  48. dict = {'ca_certs':"<ca_certs>", 'certfile':"<certfile>",
  49. 'keyfile':"<keyfile>", 'tls_version':"<tls_version>",
  50. 'ciphers':"<ciphers">}
  51. ca_certs is required, all other parameters are optional and will
  52. default to None if not provided, which results in the client using
  53. the default behaviour - see the paho.mqtt.client documentation.
  54. Alternatively, tls input can be an SSLContext object, which will be
  55. processed using the tls_set_context method.
  56. Defaults to None, which indicates that TLS should not be used.
  57. transport : set to "tcp" to use the default setting of transport which is
  58. raw TCP. Set to "websockets" to use WebSockets as the transport.
  59. """
  60. if not isinstance(msgs, list):
  61. raise ValueError('msgs must be a list')
  62. client = MqttClient(client_id=client_id, userdata=msgs, transport='websockets')
  63. client.on_publish = _on_publish
  64. client.on_connect = _on_connect
  65. client.ws_set_options(path=path)
  66. if auth:
  67. username = auth.get('username')
  68. if username:
  69. password = auth.get('password')
  70. client.username_pw_set(username, password)
  71. else:
  72. raise KeyError("The 'username' key was not found, this is "
  73. "required for auth")
  74. if tls is not None:
  75. if isinstance(tls, dict):
  76. client.tls_set(**tls)
  77. else:
  78. # Assume input is SSLContext object
  79. client.tls_set_context(tls)
  80. client.connect(host, port)
  81. client.loop_forever()
  82. def publish_single_via_websockets(topic, payload, host, port, path, qos=0, retain=False, client_id=""):
  83. msgs = [{'topic': topic, 'payload': payload, 'qos': qos, 'retain': retain}]
  84. return publish_via_websockets(msgs=msgs, host=host, port=port, path=path, client_id=client_id)
  85. def send_topic_command(cmdNo, devNo, params, prefix='diag'):
  86. command = TopicCommand(cmdNo, devNo, params, prefix)
  87. dev = Device.get_dev(devNo)
  88. from apps.web.core.helpers import ActionDeviceBuilder
  89. box = ActionDeviceBuilder.create_action_device(dev)
  90. try:
  91. result = MessageSender.send(device = dev, cmd = cmdNo,
  92. payload = command.result_payload)
  93. translation = box.translate_server_cmd(result)
  94. except Exception as e:
  95. logger.exception(e)
  96. translation = u'未知错误'
  97. client_id = get_client_id(prefix='result')
  98. def decompose_url(url):
  99. if '//' not in url:
  100. url = '//' + url
  101. return urlparse(url)
  102. broker_url = decompose_url(settings.DEFAULT_DEALER_PUSH_BROKER_URL)
  103. publish_single_via_websockets(topic=command.result_topic,
  104. payload=translation,
  105. host=broker_url.hostname,
  106. port=broker_url.port,
  107. path=broker_url.path,
  108. client_id=client_id)