test_mqtt.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. # -*- coding: utf-8 -*-
  2. #!/usr/bin/env python
  3. import os, sys
  4. from os.path import abspath, join
  5. PROJECT_ROOT = abspath(os.path.split(os.path.realpath(__file__))[0] + "/..")
  6. sys.path.insert(0, PROJECT_ROOT)
  7. sys.path.insert(0, join(PROJECT_ROOT, "apps"))
  8. import threading
  9. import uuid
  10. import simplejson as json
  11. from apps.web.constant import Const
  12. from apps.web.core.mqtt_client import MqttClient
  13. class MqttTest(threading.Thread):
  14. def __init__(self, host, port, user = None, password = None):
  15. super(MqttTest, self).__init__()
  16. self.host = str(host)
  17. self.port = int(port)
  18. self.user = user
  19. self.password = password
  20. def __repr__(self):
  21. return 'MqttTest<host = %s, port = %s, user = %s, password = %s>' % (
  22. self.host, self.port, self.user, self.password)
  23. def run(self):
  24. def on_message(mqttc, obj, msg):
  25. print msg.topic, msg.payload
  26. received = json.loads(msg.payload)
  27. if 'sid' not in received:
  28. print 'ERROR: no sid'
  29. return
  30. if 'cmd' not in received:
  31. print "ERROR: no cmd"
  32. if 'IMEI' not in received:
  33. print "ERROR: no imei"
  34. mqttc.publish(msg.topic.replace(Const.DEVICE_TOPIC_PREFIX, Const.SERVER_TOPIC_PREFIX),
  35. json.dumps({'rst': 0, 'IMEI': received['IMEI'], 'cmd': received['cmd'], 'sid': received['sid']}),
  36. qos = Const.MQTT_QOS)
  37. def on_subscribe(mqttc, obj, mid, granted_qos):
  38. print 'on_subscribe'
  39. def on_unsubscribe(client, userdata, mid):
  40. print 'on_unsubscribe'
  41. mqttc = MqttClient(client_id = 'webapp_' + str(uuid.uuid4()))
  42. try:
  43. print 'start mqtt test process'
  44. mqttc.on_message = on_message
  45. mqttc.on_subscribe = on_subscribe
  46. mqttc.on_unsubscribe = on_unsubscribe
  47. if self.user and self.password:
  48. mqttc.username_pw_set(self.user, self.password)
  49. mqttc.connect(self.host, self.port)
  50. mqttc.subscribe(Const.DEVICE_TOPIC_PREFIX + '/#', qos = Const.MQTT_QOS)
  51. mqttc.loop_forever()
  52. except Exception, e:
  53. print str(e)
  54. finally:
  55. mqttc.unsubscribe(Const.DEVICE_TOPIC_PREFIX + '/#')
  56. mqttc.disconnect()
  57. mqttc.close()
  58. tester = MqttTest(host = '127.0.0.1', port = 1883)
  59. tester.start()
  60. tester.join()