#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Debug consumer: print MQTT raw messages received through Pulsar.

The script subscribes to the ``mqtt`` topic with a ``PulsarConsumerApp``,
decodes every payload as ``mqtt_message.RawMessage`` and prints its
``topic``, ``SchemaVersion`` and ``payload`` fields.

Command-line options:
    -l, --log          log file path
    -w, --worker       worker count handed to the consumer app (default 1000)
    -e, --env          settings module suffix, ``configs.<env>``
                       (default ``testing``)
    -m, --maxLogSize   integer, default 10
    -b, --maxLogCount  integer, default 10
"""
# Patch before any other import, as the original ordering did, so modules
# imported below see the gevent-patched standard library.
from gevent import monkey
monkey.patch_all()

import getopt
import sys

try:
    # Every short option takes a value, so each letter needs a trailing
    # colon; the original spec omitted it for ``-b``, which made
    # ``-b <n>`` parse as an empty string.
    options, args = getopt.getopt(
        sys.argv[1:],
        'l:w:e:m:b:',
        ['log=', 'worker=', 'env=', 'maxLogSize=', 'maxLogCount='])
except getopt.GetoptError as e:
    print(str(e))
    # A usage error must not report success to the calling shell.
    sys.exit(2)

log_file = None
platform_env = 'testing'
workerCount = 1000
maxLogSize = 10
maxLogCount = 10

# NOTE(review): log_file, maxLogSize and maxLogCount are parsed here but
# nothing in this script consumes them -- confirm whether init_env or
# get_logger is expected to receive them.
try:
    for name, value in options:
        if name in ('-l', '--log'):
            log_file = value
        elif name in ('-e', '--env'):
            platform_env = value
        elif name in ('-w', '--worker'):
            # Keep the worker count numeric, like its default.
            workerCount = int(value)
        elif name in ('-m', '--maxLogSize'):
            maxLogSize = int(value)
        elif name in ('-b', '--maxLogCount'):
            maxLogCount = int(value)
except ValueError as e:
    print(str(e))
    sys.exit(2)

import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE',
                      'configs.{env}'.format(env=platform_env))

# Two directory levels above the directory that contains this file.
PROJECT_ROOT = os.path.join(
    os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."),
    '..')
sys.path.insert(0, PROJECT_ROOT)

from script.base import init_env, get_logger

init_env(interactive=False)

logger = get_logger(__name__)

from script.pulsar import PulsarConsumerApp
import mqtt_message


def my_listener(consumer, message):
    """Print one received message and acknowledge it.

    :param consumer: consumer the message arrived on; used to acknowledge.
    :param message: received message exposing ``data()`` and
        ``message_id()``.

    The message is acknowledged even when decoding or printing fails, so
    a malformed payload is not processed again by this handler; the
    failure is logged instead of being silently dropped.
    """
    try:
        data = message.data()
        print("====Received message '{}' id='{}'".format(
            data, message.message_id()))

        rawMsg = mqtt_message.RawMessage()
        rawMsg.ParseFromString(data)

        # Single-argument print() behaves the same on Python 2 and 3.
        print(rawMsg.topic)
        print(rawMsg.SchemaVersion)
        print(rawMsg.payload)
    except Exception:
        # Best-effort handler: record the failure and keep consuming.
        # Catching Exception (not a bare except) lets KeyboardInterrupt
        # and greenlet termination propagate.
        logger.exception("failed to handle message")
    finally:
        consumer.acknowledge(message)


# NOTE(review): the service URL is hard-coded; presumably it should come
# from the selected environment's settings -- confirm.
app = PulsarConsumerApp(
    logger=logger,
    service_url='http://121.43.115.71:8080',
    subscription_name='mqtt',
    topic='mqtt')

app.add_message_listener(my_listener)
# Honour -w/--worker; the default remains 1000 when the option is absent.
app.set_worker_count(workerCount)
app.start()