# -*- coding: utf-8 -*- # !/usr/bin/env python import time import gevent import pulsar from gevent.pool import Pool class PulsarConsumerApp(object): def __init__(self, logger, service_url, subscription_name, topic): self.logger = logger self.service_url = service_url self.topic = topic self.subscription_name = subscription_name self.message_listener = None def add_message_listener(self, message_listener): self.message_listener = message_listener def set_worker_count(self, worker_count): self.worker_count = worker_count def start(self): client = pulsar.Client( service_url = self.service_url, operation_timeout_seconds = 30, io_threads = 1, concurrent_lookup_requests = 50000, connection_timeout_ms = 10000) try: consumer = client.subscribe(topic = self.topic, subscription_name = self.subscription_name, consumer_type = pulsar.ConsumerType.Shared, schema = pulsar.schema.BytesSchema()) workerPool = Pool(self.worker_count or 1000) while True: msg = consumer.receive() self.logger.debug(msg) workerPool.start(gevent.spawn(self.message_listener, consumer, msg)) gevent.sleep(0) workerPool.wait_available() # workerPool.wait_available() except Exception as e: self.logger.exception(e) try: client.close() except: pass