mqtt_consumer.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. from gevent import monkey
  4. monkey.patch_all()
  5. import getopt
  6. import sys
  7. try:
  8. options, args = getopt.getopt(sys.argv[1:], 'l:w:e:m:b',
  9. ['log=', 'worker=', 'env=', 'maxLogSize=', 'maxLogCount='])
  10. except getopt.GetoptError as e:
  11. print(str(e))
  12. sys.exit()
  13. log_file = None
  14. platform_env = 'testing'
  15. workerCount = 1000
  16. maxLogSize = 10
  17. maxLogCount = 10
  18. for name, value in options:
  19. if name in ('-l', '--log'):
  20. log_file = value
  21. if name in ('-e', '--env'):
  22. platform_env = value
  23. if name in ('-w', '--worker'):
  24. workerCount = value
  25. if name in ('-m', '--maxLogSize'):
  26. maxLogSize = int(value)
  27. if name in ('-b', '--maxLogCount'):
  28. maxLogCount = int(value)
  29. import os
  30. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'configs.{env}'.format(env = platform_env))
  31. PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
  32. sys.path.insert(0, PROJECT_ROOT)
  33. from script.base import init_env, get_logger
  34. init_env(interactive = False)
  35. logger = get_logger(__name__)
  36. from script.pulsar import PulsarConsumerApp
  37. import mqtt_message
  38. def my_listener(consumer, message):
  39. try:
  40. print("====Received message '{}' id='{}'".format(message.data(), message.message_id()))
  41. data = message.data()
  42. rawMsg = mqtt_message.RawMessage()
  43. rawMsg.ParseFromString(data)
  44. print rawMsg.topic
  45. print rawMsg.SchemaVersion
  46. print rawMsg.payload
  47. except:
  48. pass
  49. finally:
  50. consumer.acknowledge(message)
  51. app = PulsarConsumerApp(
  52. logger = logger,
  53. service_url = 'http://121.43.115.71:8080',
  54. subscription_name = 'mqtt',
  55. topic = 'mqtt')
  56. app.add_message_listener(my_listener)
  57. app.set_worker_count(1000)
  58. app.start()