range.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. from __future__ import absolute_import
  2. import collections
  3. import logging
  4. from kafka.vendor import six
  5. from .abstract import AbstractPartitionAssignor
  6. from ..protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
  7. log = logging.getLogger(__name__)
  8. class RangePartitionAssignor(AbstractPartitionAssignor):
  9. """
  10. The range assignor works on a per-topic basis. For each topic, we lay out
  11. the available partitions in numeric order and the consumers in
  12. lexicographic order. We then divide the number of partitions by the total
  13. number of consumers to determine the number of partitions to assign to each
  14. consumer. If it does not evenly divide, then the first few consumers will
  15. have one extra partition.
  16. For example, suppose there are two consumers C0 and C1, two topics t0 and
  17. t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
  18. t0p2, t1p0, t1p1, and t1p2.
  19. The assignment will be:
  20. C0: [t0p0, t0p1, t1p0, t1p1]
  21. C1: [t0p2, t1p2]
  22. """
  23. name = 'range'
  24. version = 0
  25. @classmethod
  26. def assign(cls, cluster, member_metadata):
  27. consumers_per_topic = collections.defaultdict(list)
  28. for member, metadata in six.iteritems(member_metadata):
  29. for topic in metadata.subscription:
  30. consumers_per_topic[topic].append(member)
  31. # construct {member_id: {topic: [partition, ...]}}
  32. assignment = collections.defaultdict(dict)
  33. for topic, consumers_for_topic in six.iteritems(consumers_per_topic):
  34. partitions = cluster.partitions_for_topic(topic)
  35. if partitions is None:
  36. log.warning('No partition metadata for topic %s', topic)
  37. continue
  38. partitions = sorted(list(partitions))
  39. partitions_for_topic = len(partitions)
  40. consumers_for_topic.sort()
  41. partitions_per_consumer = len(partitions) // len(consumers_for_topic)
  42. consumers_with_extra = len(partitions) % len(consumers_for_topic)
  43. for i in range(len(consumers_for_topic)):
  44. start = partitions_per_consumer * i
  45. start += min(i, consumers_with_extra)
  46. length = partitions_per_consumer
  47. if not i + 1 > consumers_with_extra:
  48. length += 1
  49. member = consumers_for_topic[i]
  50. assignment[member][topic] = partitions[start:start+length]
  51. protocol_assignment = {}
  52. for member_id in member_metadata:
  53. protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
  54. cls.version,
  55. sorted(assignment[member_id].items()),
  56. b'')
  57. return protocol_assignment
  58. @classmethod
  59. def metadata(cls, topics):
  60. return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
  61. @classmethod
  62. def on_assignment(cls, assignment):
  63. pass