listen_device_anqihuandian.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
import os
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import simplejson as json

from base import init_env
  6. init_env(interactive = True)
  7. from apps.web.core.mqtt_client import MqttClient
  8. class LoggerHandler(object):
  9. def __init__(self):
  10. self._executor = ThreadPoolExecutor(max_workers=3)
  11. @property
  12. def executor(self):
  13. return self._executor
  14. def insert(self, msgDict):
  15. if msgDict.get("cmd") not in (210, 100):
  16. return
  17. print("------------------insert")
  18. self.executor.submit(self.log_msg, msgDict)
  19. @staticmethod
  20. def log_msg(msg):
  21. devNo = msg.get("IMEI")
  22. data = msg.get("data") or ""
  23. funCode = msg.get("funCode") or data[8: 10]
  24. topic = "smartBox" if funCode else "server"
  25. dateTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  26. msg = "TOPIC: {}\nDATE: {}\nIMEI: {}\nDATA: {}\nCODE: {}\n".format(topic, dateTime, devNo, data, funCode)
  27. fileName = "./an_qi_log/{}.log".format(devNo)
  28. with open(fileName, "a") as f:
  29. f.write(msg)
  30. loggerHandler = LoggerHandler()
  31. def generate_device_imei():
  32. path = "./devNos.txt"
  33. with open(path) as devFile:
  34. devNos = devFile.readlines()
  35. return devNos
  36. def on_connect(client, *args, **kwargs):
  37. for devNo in generate_device_imei():
  38. device_topic = "+/{}/210".format(devNo.strip())
  39. client.subscribe(device_topic)
  40. print 'subscribe topic(%s) success' % device_topic
  41. device_topic = "+/{}/100".format(devNo.strip())
  42. client.subscribe(device_topic)
  43. print 'subscribe topic(%s) success' % device_topic
  44. def on_message(m, obj, msg):
  45. print("on message++++++++++++++++++++++++++++++")
  46. msgDict = json.loads(bytes.decode(msg.payload))
  47. print msgDict
  48. print type(msgDict)
  49. loggerHandler.insert(msgDict)
  50. mqttClient = MqttClient(client_id='webapp_' + str(uuid.uuid1()))
  51. MQTT_U = "UCQ3FCdN"
  52. MQTT_P = "Wh4tN8eJKZ5GeV8n72BVbUTILr5VJmXf"
  53. try:
  54. mqttClient.on_message = on_message
  55. mqttClient.on_connect = on_connect
  56. mqttClient.username_pw_set(MQTT_U, MQTT_P)
  57. mqttClient.connect("120.27.251.159", 1883, 60)
  58. mqttClient.loop_forever()
  59. finally:
  60. mqttClient.disconnect()
  61. mqttClient.close()