roundrobin.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. from __future__ import absolute_import
  2. import collections
  3. import itertools
  4. import logging
  5. from kafka.vendor import six
  6. from .abstract import AbstractPartitionAssignor
  7. from ...common import TopicPartition
  8. from ..protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
  9. log = logging.getLogger(__name__)
  10. class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
  11. """
  12. The roundrobin assignor lays out all the available partitions and all the
  13. available consumers. It then proceeds to do a roundrobin assignment from
  14. partition to consumer. If the subscriptions of all consumer instances are
  15. identical, then the partitions will be uniformly distributed. (i.e., the
  16. partition ownership counts will be within a delta of exactly one across all
  17. consumers.)
  18. For example, suppose there are two consumers C0 and C1, two topics t0 and
  19. t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
  20. t0p2, t1p0, t1p1, and t1p2.
  21. The assignment will be:
  22. C0: [t0p0, t0p2, t1p1]
  23. C1: [t0p1, t1p0, t1p2]
  24. When subscriptions differ across consumer instances, the assignment process
  25. still considers each consumer instance in round robin fashion but skips
  26. over an instance if it is not subscribed to the topic. Unlike the case when
  27. subscriptions are identical, this can result in imbalanced assignments.
  28. For example, suppose we have three consumers C0, C1, C2, and three topics
  29. t0, t1, t2, with unbalanced partitions t0p0, t1p0, t1p1, t2p0, t2p1, t2p2,
  30. where C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is
  31. subscribed to t0, t1, t2.
  32. The assignment will be:
  33. C0: [t0p0]
  34. C1: [t1p0]
  35. C2: [t1p1, t2p0, t2p1, t2p2]
  36. """
  37. name = 'roundrobin'
  38. version = 0
  39. @classmethod
  40. def assign(cls, cluster, member_metadata):
  41. all_topics = set()
  42. for metadata in six.itervalues(member_metadata):
  43. all_topics.update(metadata.subscription)
  44. all_topic_partitions = []
  45. for topic in all_topics:
  46. partitions = cluster.partitions_for_topic(topic)
  47. if partitions is None:
  48. log.warning('No partition metadata for topic %s', topic)
  49. continue
  50. for partition in partitions:
  51. all_topic_partitions.append(TopicPartition(topic, partition))
  52. all_topic_partitions.sort()
  53. # construct {member_id: {topic: [partition, ...]}}
  54. assignment = collections.defaultdict(lambda: collections.defaultdict(list))
  55. member_iter = itertools.cycle(sorted(member_metadata.keys()))
  56. for partition in all_topic_partitions:
  57. member_id = next(member_iter)
  58. # Because we constructed all_topic_partitions from the set of
  59. # member subscribed topics, we should be safe assuming that
  60. # each topic in all_topic_partitions is in at least one member
  61. # subscription; otherwise this could yield an infinite loop
  62. while partition.topic not in member_metadata[member_id].subscription:
  63. member_id = next(member_iter)
  64. assignment[member_id][partition.topic].append(partition.partition)
  65. protocol_assignment = {}
  66. for member_id in member_metadata:
  67. protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
  68. cls.version,
  69. sorted(assignment[member_id].items()),
  70. b'')
  71. return protocol_assignment
  72. @classmethod
  73. def metadata(cls, topics):
  74. return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
  75. @classmethod
  76. def on_assignment(cls, assignment):
  77. pass