__init__.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import time
  4. import gevent
  5. import pulsar
  6. from gevent.pool import Pool
  7. class PulsarConsumerApp(object):
  8. def __init__(self, logger, service_url, subscription_name, topic):
  9. self.logger = logger
  10. self.service_url = service_url
  11. self.topic = topic
  12. self.subscription_name = subscription_name
  13. self.message_listener = None
  14. def add_message_listener(self, message_listener):
  15. self.message_listener = message_listener
  16. def set_worker_count(self, worker_count):
  17. self.worker_count = worker_count
  18. def start(self):
  19. client = pulsar.Client(
  20. service_url = self.service_url,
  21. operation_timeout_seconds = 30,
  22. io_threads = 1,
  23. concurrent_lookup_requests = 50000,
  24. connection_timeout_ms = 10000)
  25. try:
  26. consumer = client.subscribe(topic = self.topic,
  27. subscription_name = self.subscription_name,
  28. consumer_type = pulsar.ConsumerType.Shared,
  29. schema = pulsar.schema.BytesSchema())
  30. workerPool = Pool(self.worker_count or 1000)
  31. while True:
  32. msg = consumer.receive()
  33. self.logger.debug(msg)
  34. workerPool.start(gevent.spawn(self.message_listener, consumer, msg))
  35. gevent.sleep(0)
  36. workerPool.wait_available()
  37. # workerPool.wait_available()
  38. except Exception as e:
  39. self.logger.exception(e)
  40. try:
  41. client.close()
  42. except:
  43. pass