import uuid from datetime import datetime import simplejson as json from concurrent.futures import ThreadPoolExecutor from base import init_env init_env(interactive = True) from apps.web.core.mqtt_client import MqttClient class LoggerHandler(object): def __init__(self): self._executor = ThreadPoolExecutor(max_workers=3) @property def executor(self): return self._executor def insert(self, msgDict): if msgDict.get("cmd") not in (210, 100): return print("------------------insert") self.executor.submit(self.log_msg, msgDict) @staticmethod def log_msg(msg): devNo = msg.get("IMEI") data = msg.get("data") or "" funCode = msg.get("funCode") or data[8: 10] topic = "smartBox" if funCode else "server" dateTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S") msg = "TOPIC: {}\nDATE: {}\nIMEI: {}\nDATA: {}\nCODE: {}\n".format(topic, dateTime, devNo, data, funCode) fileName = "./an_qi_log/{}.log".format(devNo) with open(fileName, "a") as f: f.write(msg) loggerHandler = LoggerHandler() def generate_device_imei(): path = "./devNos.txt" with open(path) as devFile: devNos = devFile.readlines() return devNos def on_connect(client, *args, **kwargs): for devNo in generate_device_imei(): device_topic = "+/{}/210".format(devNo.strip()) client.subscribe(device_topic) print 'subscribe topic(%s) success' % device_topic device_topic = "+/{}/100".format(devNo.strip()) client.subscribe(device_topic) print 'subscribe topic(%s) success' % device_topic def on_message(m, obj, msg): print("on message++++++++++++++++++++++++++++++") msgDict = json.loads(bytes.decode(msg.payload)) print msgDict print type(msgDict) loggerHandler.insert(msgDict) mqttClient = MqttClient(client_id='webapp_' + str(uuid.uuid1())) MQTT_U = "UCQ3FCdN" MQTT_P = "Wh4tN8eJKZ5GeV8n72BVbUTILr5VJmXf" try: mqttClient.on_message = on_message mqttClient.on_connect = on_connect mqttClient.username_pw_set(MQTT_U, MQTT_P) mqttClient.connect("120.27.251.159", 1883, 60) mqttClient.loop_forever() finally: mqttClient.disconnect() mqttClient.close()