123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import time
- import gevent
- import pulsar
- from gevent.pool import Pool
- class PulsarConsumerApp(object):
- def __init__(self, logger, service_url, subscription_name, topic):
- self.logger = logger
- self.service_url = service_url
- self.topic = topic
- self.subscription_name = subscription_name
- self.message_listener = None
- def add_message_listener(self, message_listener):
- self.message_listener = message_listener
- def set_worker_count(self, worker_count):
- self.worker_count = worker_count
- def start(self):
- client = pulsar.Client(
- service_url = self.service_url,
- operation_timeout_seconds = 30,
- io_threads = 1,
- concurrent_lookup_requests = 50000,
- connection_timeout_ms = 10000)
- try:
- consumer = client.subscribe(topic = self.topic,
- subscription_name = self.subscription_name,
- consumer_type = pulsar.ConsumerType.Shared,
- schema = pulsar.schema.BytesSchema())
- workerPool = Pool(self.worker_count or 1000)
- while True:
- msg = consumer.receive()
- self.logger.debug(msg)
- workerPool.start(gevent.spawn(self.message_listener, consumer, msg))
- gevent.sleep(0)
- workerPool.wait_available()
- # workerPool.wait_available()
- except Exception as e:
- self.logger.exception(e)
- try:
- client.close()
- except:
- pass
|