123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- # -*- coding: utf-8 -*-
- #!/usr/bin/env python
- from urlparse import urlparse
- from celery.utils.log import get_task_logger
- from library.paho.mqtt.publish import _on_publish, _on_connect
- from django.conf import settings
- from apps.web.core.mqtt_client import get_client_id, MqttClient
- from apps.web.device.models import Device
- from apps.web.core.networking import MessageSender
- from apps.dispatch.commands import TopicCommand
- logger = get_task_logger(__name__)
def publish_via_websockets(msgs, host="localhost", port=1883, path='/ws', client_id="", auth=None, tls=None):
    """Publish multiple messages to a broker over WebSockets, then disconnect.

    Creates an ``MqttClient`` with the ``websockets`` transport, installs the
    paho ``_on_connect`` / ``_on_publish`` callbacks with ``msgs`` as the
    client's userdata, connects to the broker and blocks in
    ``loop_forever()`` until the client disconnects.

    msgs : a non-empty list of messages to publish. Each message is either a
           dict or a tuple.

           If a dict, only the topic must be present. Default values will be
           used for any missing arguments. The dict must be of the form:

           msg = {'topic':"<topic>", 'payload':"<payload>", 'qos':<qos>,
                  'retain':<retain>}

           topic must be present and may not be empty.
           If payload is "", None or not present then a zero length payload
           will be published.
           If qos is not present, the default of 0 is used.
           If retain is not present, the default of False is used.

           If a tuple, then it must be of the form:
           ("<topic>", "<payload>", qos, retain)

    host : a string containing the address of the broker to connect to.
           Defaults to localhost.

    port : the port to connect to the broker on. Defaults to 1883.

    path : the WebSocket path on the broker, passed to ``ws_set_options``.
           Defaults to '/ws'.

    client_id : the MQTT client id to use. If "" or None, the Paho library will
                generate a client id automatically.

    auth : a dict containing authentication parameters for the client:
           auth = {'username':"<username>", 'password':"<password>"}
           Username is required, password is optional and will default to None
           if not provided.
           Defaults to None, which indicates no authentication is to be used.

    tls : a dict containing TLS configuration parameters for the client:
          dict = {'ca_certs':"<ca_certs>", 'certfile':"<certfile>",
                  'keyfile':"<keyfile>", 'tls_version':"<tls_version>",
                  'ciphers':"<ciphers>"}
          The dict is expanded into keyword arguments for ``tls_set``.
          Alternatively, tls input can be an SSLContext object, which will be
          processed using the tls_set_context method.
          Defaults to None, which indicates that TLS should not be used.

    Raises ValueError if msgs is not a list or is empty, and KeyError if auth
    is given without a 'username'.
    """
    if not isinstance(msgs, list):
        raise ValueError('msgs must be a list')

    # NOTE(review): per upstream paho, the publish callbacks only disconnect
    # after a publish has completed. With nothing to publish, loop_forever()
    # below would never return, so reject an empty list up front (upstream
    # paho's publish.multiple does the same) -- confirm against the vendored
    # library.paho copy.
    if not msgs:
        raise ValueError('msgs is empty')

    client = MqttClient(client_id=client_id, userdata=msgs, transport='websockets')
    client.on_publish = _on_publish
    client.on_connect = _on_connect
    client.ws_set_options(path=path)

    if auth:
        username = auth.get('username')
        if not username:
            raise KeyError("The 'username' key was not found, this is "
                           "required for auth")
        client.username_pw_set(username, auth.get('password'))

    if tls is not None:
        if isinstance(tls, dict):
            client.tls_set(**tls)
        else:
            # Assume input is SSLContext object
            client.tls_set_context(tls)

    client.connect(host, port)
    # Blocks until the client disconnects.
    client.loop_forever()
def publish_single_via_websockets(topic, payload, host, port, path, qos=0, retain=False, client_id="",
                                  auth=None, tls=None):
    """Publish one message to a broker over WebSockets.

    Wraps the arguments into a single-message list and delegates to
    ``publish_via_websockets``.

    topic, payload, qos, retain : the fields of the message to publish.
    host, port, path : broker address, port and WebSocket path.
    client_id : the MQTT client id to use.
    auth : optional authentication dict, forwarded unchanged.
           Defaults to None (no authentication).
    tls : optional TLS dict or SSLContext, forwarded unchanged.
          Defaults to None (no TLS).

    Returns whatever ``publish_via_websockets`` returns.
    """
    message = {'topic': topic, 'payload': payload, 'qos': qos, 'retain': retain}
    return publish_via_websockets(msgs=[message], host=host, port=port, path=path,
                                  client_id=client_id, auth=auth, tls=tls)
def send_topic_command(cmdNo, devNo, params, prefix='diag'):
    """Send a command to a device and publish the translated result over MQTT.

    Builds a ``TopicCommand`` from the arguments, looks the device up with
    ``Device.get_dev(devNo)`` and sends the command's ``result_payload``
    through ``MessageSender``. The reply is translated by the device's action
    object; if sending or translating raises, the exception is logged and a
    fixed fallback text is published instead. The text is published to the
    command's ``result_topic`` on the broker named by
    ``settings.DEFAULT_DEALER_PUSH_BROKER_URL``.

    cmdNo : command identifier passed to ``TopicCommand`` and ``MessageSender``.
    devNo : device identifier used to look up the device.
    params : command parameters passed to ``TopicCommand``.
    prefix : topic prefix passed to ``TopicCommand``. Defaults to 'diag'.
    """
    topic_cmd = TopicCommand(cmdNo, devNo, params, prefix)
    device = Device.get_dev(devNo)

    # Imported at call time rather than at module level; presumably this
    # avoids a circular import -- TODO confirm.
    from apps.web.core.helpers import ActionDeviceBuilder
    action_device = ActionDeviceBuilder.create_action_device(device)

    # Fallback text; only replaced when both the send and the translation succeed.
    translation = u'未知错误'
    try:
        reply = MessageSender.send(device=device, cmd=cmdNo,
                                   payload=topic_cmd.result_payload)
        translation = action_device.translate_server_cmd(reply)
    except Exception as exc:
        logger.exception(exc)

    result_client_id = get_client_id(prefix='result')

    # urlparse only fills in hostname/port when the netloc is introduced by
    # '//', so prepend it for URLs configured without a scheme.
    raw_url = settings.DEFAULT_DEALER_PUSH_BROKER_URL
    if '//' not in raw_url:
        raw_url = '//' + raw_url
    broker = urlparse(raw_url)

    publish_single_via_websockets(
        topic=topic_cmd.result_topic,
        payload=translation,
        host=broker.hostname,
        port=broker.port,
        path=broker.path,
        client_id=result_client_id)
|