# -*- coding: utf-8 -*- #!/usr/bin/env python from urlparse import urlparse from celery.utils.log import get_task_logger from library.paho.mqtt.publish import _on_publish, _on_connect from django.conf import settings from apps.web.core.mqtt_client import get_client_id, MqttClient from apps.web.device.models import Device from apps.web.core.networking import MessageSender from apps.dispatch.commands import TopicCommand logger = get_task_logger(__name__) def publish_via_websockets(msgs, host="localhost", port=1883, path='/ws', client_id="", auth=None, tls=None): """Publish multiple messages to a broker, then disconnect cleanly. This function creates an MQTT client, connects to a broker and publishes a list of messages. Once the messages have been delivered, it disconnects cleanly from the broker. msgs : a list of messages to publish. Each message is either a dict or a tuple. If a dict, only the topic must be present. Default values will be used for any missing arguments. The dict must be of the form: msg = {'topic':"", 'payload':"", 'qos':, 'retain':} topic must be present and may not be empty. If payload is "", None or not present then a zero length payload will be published. If qos is not present, the default of 0 is used. If retain is not present, the default of False is used. If a tuple, then it must be of the form: ("", "", qos, retain) host : a string containing the address of the broker to connect to. Defaults to localhost. port : the port to connect to the broker on. Defaults to 1883. client_id : the MQTT client id to use. If "" or None, the Paho library will generate a client id automatically. keepalive : the keepalive timeout value for the client. Defaults to 60 seconds. will : a dict containing will parameters for the client: will = {'topic': "", 'payload':", 'qos':, 'retain':}. Topic is required, all other parameters are optional and will default to None, 0 and False respectively. Defaults to None, which indicates no will should be used. auth : a dict containing authentication parameters for the client: auth = {'username':"", 'password':""} Username is required, password is optional and will default to None if not provided. Defaults to None, which indicates no authentication is to be used. tls : a dict containing TLS configuration parameters for the client: dict = {'ca_certs':"", 'certfile':"", 'keyfile':"", 'tls_version':"", 'ciphers':"} ca_certs is required, all other parameters are optional and will default to None if not provided, which results in the client using the default behaviour - see the paho.mqtt.client documentation. Alternatively, tls input can be an SSLContext object, which will be processed using the tls_set_context method. Defaults to None, which indicates that TLS should not be used. transport : set to "tcp" to use the default setting of transport which is raw TCP. Set to "websockets" to use WebSockets as the transport. """ if not isinstance(msgs, list): raise ValueError('msgs must be a list') client = MqttClient(client_id=client_id, userdata=msgs, transport='websockets') client.on_publish = _on_publish client.on_connect = _on_connect client.ws_set_options(path=path) if auth: username = auth.get('username') if username: password = auth.get('password') client.username_pw_set(username, password) else: raise KeyError("The 'username' key was not found, this is " "required for auth") if tls is not None: if isinstance(tls, dict): client.tls_set(**tls) else: # Assume input is SSLContext object client.tls_set_context(tls) client.connect(host, port) client.loop_forever() def publish_single_via_websockets(topic, payload, host, port, path, qos=0, retain=False, client_id=""): msgs = [{'topic': topic, 'payload': payload, 'qos': qos, 'retain': retain}] return publish_via_websockets(msgs=msgs, host=host, port=port, path=path, client_id=client_id) def send_topic_command(cmdNo, devNo, params, prefix='diag'): command = TopicCommand(cmdNo, devNo, params, prefix) dev = Device.get_dev(devNo) from apps.web.core.helpers import ActionDeviceBuilder box = ActionDeviceBuilder.create_action_device(dev) try: result = MessageSender.send(device = dev, cmd = cmdNo, payload = command.result_payload) translation = box.translate_server_cmd(result) except Exception as e: logger.exception(e) translation = u'未知错误' client_id = get_client_id(prefix='result') def decompose_url(url): if '//' not in url: url = '//' + url return urlparse(url) broker_url = decompose_url(settings.DEFAULT_DEALER_PUSH_BROKER_URL) publish_single_via_websockets(topic=command.result_topic, payload=translation, host=broker_url.hostname, port=broker_url.port, path=broker_url.path, client_id=client_id)