12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- from __future__ import absolute_import
- import collections
- import logging
- from kafka.vendor import six
- from .abstract import AbstractPartitionAssignor
- from ..protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
- log = logging.getLogger(__name__)
- class RangePartitionAssignor(AbstractPartitionAssignor):
- """
- The range assignor works on a per-topic basis. For each topic, we lay out
- the available partitions in numeric order and the consumers in
- lexicographic order. We then divide the number of partitions by the total
- number of consumers to determine the number of partitions to assign to each
- consumer. If it does not evenly divide, then the first few consumers will
- have one extra partition.
- For example, suppose there are two consumers C0 and C1, two topics t0 and
- t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
- t0p2, t1p0, t1p1, and t1p2.
- The assignment will be:
- C0: [t0p0, t0p1, t1p0, t1p1]
- C1: [t0p2, t1p2]
- """
- name = 'range'
- version = 0
- @classmethod
- def assign(cls, cluster, member_metadata):
- consumers_per_topic = collections.defaultdict(list)
- for member, metadata in six.iteritems(member_metadata):
- for topic in metadata.subscription:
- consumers_per_topic[topic].append(member)
- # construct {member_id: {topic: [partition, ...]}}
- assignment = collections.defaultdict(dict)
- for topic, consumers_for_topic in six.iteritems(consumers_per_topic):
- partitions = cluster.partitions_for_topic(topic)
- if partitions is None:
- log.warning('No partition metadata for topic %s', topic)
- continue
- partitions = sorted(list(partitions))
- partitions_for_topic = len(partitions)
- consumers_for_topic.sort()
- partitions_per_consumer = len(partitions) // len(consumers_for_topic)
- consumers_with_extra = len(partitions) % len(consumers_for_topic)
- for i in range(len(consumers_for_topic)):
- start = partitions_per_consumer * i
- start += min(i, consumers_with_extra)
- length = partitions_per_consumer
- if not i + 1 > consumers_with_extra:
- length += 1
- member = consumers_for_topic[i]
- assignment[member][topic] = partitions[start:start+length]
- protocol_assignment = {}
- for member_id in member_metadata:
- protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
- cls.version,
- sorted(assignment[member_id].items()),
- b'')
- return protocol_assignment
- @classmethod
- def metadata(cls, topics):
- return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
- @classmethod
- def on_assignment(cls, assignment):
- pass
|