protocol.py 1.0 KB

123456789101112131415161718192021222324252627282930313233
  1. from __future__ import absolute_import
  2. from kafka.protocol.struct import Struct
  3. from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
  4. from kafka.structs import TopicPartition
  class ConsumerProtocolMemberMetadata(Struct):
      """Struct describing one member's metadata.

      Fields, in schema order:
          version: Int16.
          subscription: Array of UTF-8 encoded strings.
          user_data: Bytes.
      """

      SCHEMA = Schema(
          ('version', Int16),
          ('subscription', Array(String('utf-8'))),
          ('user_data', Bytes),
      )
  class ConsumerProtocolMemberAssignment(Struct):
      """Struct describing the assignment handed to one member.

      Fields, in schema order:
          version: Int16.
          assignment: Array of (topic, partitions) entries, where topic is a
              UTF-8 encoded string and partitions is an Array of Int32.
          user_data: Bytes.
      """

      SCHEMA = Schema(
          ('version', Int16),
          ('assignment', Array(
              ('topic', String('utf-8')),
              ('partitions', Array(Int32)),
          )),
          ('user_data', Bytes),
      )

      def partitions(self):
          """Flatten ``self.assignment`` into a list of TopicPartition.

          Returns:
              A list holding one ``TopicPartition(topic, partition)`` for
              every partition of every (topic, partitions) entry, in the
              order the entries and their partitions appear.
          """
          flattened = []
          # `assignment` is not declared on the class body, hence the pragma.
          for topic_name, partition_ids in self.assignment:  # pylint: disable=no-member
              for partition_id in partition_ids:
                  flattened.append(TopicPartition(topic_name, partition_id))
          return flattened
  class ConsumerProtocol(object):
      """Constants grouping the consumer protocol's names and struct types."""

      # Protocol type name.
      PROTOCOL_TYPE = 'consumer'

      # Names of the assignment strategies.
      ASSIGNMENT_STRATEGIES = (
          'range',
          'roundrobin',
      )

      # Struct classes for member metadata and member assignment.
      METADATA = ConsumerProtocolMemberMetadata
      ASSIGNMENT = ConsumerProtocolMemberAssignment