keyed.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. from __future__ import absolute_import
  2. import logging
  3. import warnings
  4. from .base import Producer
  5. from ..partitioner import HashedPartitioner
  6. log = logging.getLogger(__name__)
  7. class KeyedProducer(Producer):
  8. """
  9. A producer which distributes messages to partitions based on the key
  10. See Producer class for Arguments
  11. Additional Arguments:
  12. partitioner: A partitioner class that will be used to get the partition
  13. to send the message to. Must be derived from Partitioner.
  14. Defaults to HashedPartitioner.
  15. """
  16. def __init__(self, *args, **kwargs):
  17. self.partitioner_class = kwargs.pop('partitioner', HashedPartitioner)
  18. self.partitioners = {}
  19. super(KeyedProducer, self).__init__(*args, **kwargs)
  20. def _next_partition(self, topic, key):
  21. if topic not in self.partitioners:
  22. if not self.client.has_metadata_for_topic(topic):
  23. self.client.load_metadata_for_topics(topic, ignore_leadernotavailable=True)
  24. self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
  25. partitioner = self.partitioners[topic]
  26. return partitioner.partition(key)
  27. def send_messages(self, topic, key, *msg):
  28. partition = self._next_partition(topic, key)
  29. return self._send_messages(topic, partition, *msg, key=key)
  30. # DEPRECATED
  31. def send(self, topic, key, msg):
  32. warnings.warn("KeyedProducer.send is deprecated in favor of send_messages", DeprecationWarning)
  33. return self.send_messages(topic, key, msg)
  34. def __repr__(self):
  35. return '<KeyedProducer batch=%s>' % self.async