#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Manual MQTT echo tester.
#
# Subscribes to every topic under Const.DEVICE_TOPIC_PREFIX and, for each
# well-formed message, publishes an acknowledgement ({'rst': 0, ...}) on the
# matching topic under Const.SERVER_TOPIC_PREFIX.
#
# NOTE: the shebang has to be the very first line to take effect; the coding
# cookie is still honoured on line 2 (PEP 263).
from __future__ import print_function

import os
import sys
from os.path import abspath, join

# Make the project root (parent of this script's directory) and its "apps"
# directory importable before the project-local imports below are resolved.
PROJECT_ROOT = abspath(os.path.split(os.path.realpath(__file__))[0] + "/..")
sys.path.insert(0, PROJECT_ROOT)
sys.path.insert(0, join(PROJECT_ROOT, "apps"))

import threading
import uuid

import simplejson as json

from apps.web.constant import Const
from apps.web.core.mqtt_client import MqttClient


class MqttTest(threading.Thread):
    """Thread that answers device messages with a success acknowledgement.

    Args:
        host: Broker host name or address (stored as ``str``).
        port: Broker TCP port (stored as ``int``).
        user: Optional user name; credentials are only sent to the broker
            when both ``user`` and ``password`` are truthy.
        password: Optional password.
    """

    # Fields every incoming message must carry, paired with the message
    # printed when the field is missing.
    _REQUIRED_FIELDS = (
        ('sid', 'ERROR: no sid'),
        ('cmd', 'ERROR: no cmd'),
        ('IMEI', 'ERROR: no imei'),
    )

    def __init__(self, host, port, user = None, password = None):
        super(MqttTest, self).__init__()
        self.host = str(host)
        self.port = int(port)
        self.user = user
        self.password = password

    def __repr__(self):
        # The password is masked so that logging the tester never leaks it.
        masked_password = '***' if self.password else None
        return '<MqttTest host=%s port=%s user=%s password=%s>' % (
            self.host, self.port, self.user, masked_password)

    def run(self):
        """Connect, subscribe to all device topics and echo acks forever.

        Any exception raised while connecting or looping is printed; the
        client is always unsubscribed, disconnected and closed afterwards.
        """
        required_fields = self._REQUIRED_FIELDS

        # NOTE(review): the callback signatures below follow the paho-mqtt
        # convention; MqttClient presumably wraps a paho client -- confirm.
        def on_message(mqttc, obj, msg):
            print(msg.topic, msg.payload)

            try:
                received = json.loads(msg.payload)
            except ValueError as e:
                # Malformed JSON must not raise out of the callback.
                print('ERROR: invalid payload: %s' % e)
                return
            if not isinstance(received, dict):
                print('ERROR: invalid payload: not a JSON object')
                return

            # Report every missing field, then skip the reply: building the
            # acknowledgement below would otherwise fail with a KeyError.
            missing = [error for key, error in required_fields
                       if key not in received]
            if missing:
                for error in missing:
                    print(error)
                return

            reply_topic = msg.topic.replace(Const.DEVICE_TOPIC_PREFIX,
                                            Const.SERVER_TOPIC_PREFIX)
            reply = {
                'rst': 0,
                'IMEI': received['IMEI'],
                'cmd': received['cmd'],
                'sid': received['sid'],
            }
            mqttc.publish(reply_topic, json.dumps(reply), qos = Const.MQTT_QOS)

        def on_subscribe(mqttc, obj, mid, granted_qos):
            print('on_subscribe')

        def on_unsubscribe(client, userdata, mid):
            print('on_unsubscribe')

        # A random suffix keeps the client id unique per tester instance.
        mqttc = MqttClient(client_id = 'webapp_' + str(uuid.uuid4()))
        device_topics = Const.DEVICE_TOPIC_PREFIX + '/#'

        try:
            print('start mqtt test process')
            mqttc.on_message = on_message
            mqttc.on_subscribe = on_subscribe
            mqttc.on_unsubscribe = on_unsubscribe

            if self.user and self.password:
                mqttc.username_pw_set(self.user, self.password)

            mqttc.connect(self.host, self.port)
            mqttc.subscribe(device_topics, qos = Const.MQTT_QOS)
            mqttc.loop_forever()
        except Exception as e:
            print(str(e))
        finally:
            mqttc.unsubscribe(device_topics)
            mqttc.disconnect()
            mqttc.close()


# Script entry: run a single tester against a local broker and block until
# its thread finishes.
tester = MqttTest(host = '127.0.0.1', port = 1883)
tester.start()
tester.join()