1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- from gevent import monkey
- monkey.patch_all()
- import getopt
- import sys
- try:
- options, args = getopt.getopt(sys.argv[1:], 'l:w:e:m:b',
- ['log=', 'worker=', 'env=', 'maxLogSize=', 'maxLogCount='])
- except getopt.GetoptError as e:
- print(str(e))
- sys.exit()
- log_file = None
- platform_env = 'testing'
- workerCount = 1000
- maxLogSize = 10
- maxLogCount = 10
- for name, value in options:
- if name in ('-l', '--log'):
- log_file = value
- if name in ('-e', '--env'):
- platform_env = value
- if name in ('-w', '--worker'):
- workerCount = value
- if name in ('-m', '--maxLogSize'):
- maxLogSize = int(value)
- if name in ('-b', '--maxLogCount'):
- maxLogCount = int(value)
- import os
- os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'configs.{env}'.format(env = platform_env))
- PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
- sys.path.insert(0, PROJECT_ROOT)
- from script.base import init_env, get_logger
- init_env(interactive = False)
- logger = get_logger(__name__)
- from script.pulsar import PulsarConsumerApp
- import mqtt_message
- def my_listener(consumer, message):
- try:
- print("====Received message '{}' id='{}'".format(message.data(), message.message_id()))
- data = message.data()
- rawMsg = mqtt_message.RawMessage()
- rawMsg.ParseFromString(data)
- print rawMsg.topic
- print rawMsg.SchemaVersion
- print rawMsg.payload
- except:
- pass
- finally:
- consumer.acknowledge(message)
- app = PulsarConsumerApp(
- logger = logger,
- service_url = 'http://121.43.115.71:8080',
- subscription_name = 'mqtt',
- topic = 'mqtt')
- app.add_message_listener(my_listener)
- app.set_worker_count(1000)
- app.start()
|