legacy.py

from __future__ import absolute_import

import logging
import struct

from kafka.vendor import six  # pylint: disable=import-error

import kafka.protocol.commit
import kafka.protocol.fetch
import kafka.protocol.message
import kafka.protocol.metadata
import kafka.protocol.offset
import kafka.protocol.produce
import kafka.structs

from kafka.codec import gzip_encode, snappy_encode
from kafka.errors import ProtocolError, UnsupportedCodecError
from kafka.structs import ConsumerMetadataResponse
from kafka.util import (
    crc32, read_short_string, relative_unpack,
    write_int_string, group_by_topic_and_partition)


log = logging.getLogger(__name__)

ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY)
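# The codec type lives in the low two bits of a message's attributes byte,
# so e.g. a gzip-compressed wrapper message carries attributes
# 0x00 | (ATTRIBUTE_CODEC_MASK & CODEC_GZIP) == 0x01 (see create_gzip_message
# below).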


class KafkaProtocol(object):
    """
    Class to encapsulate all of the protocol encoding/decoding.
    This class does not have any state associated with it; it is purely
    for organization.
    """
    PRODUCE_KEY = 0
    FETCH_KEY = 1
    OFFSET_KEY = 2
    METADATA_KEY = 3
    OFFSET_COMMIT_KEY = 8
    OFFSET_FETCH_KEY = 9
    CONSUMER_METADATA_KEY = 10

    ###################
    #   Private API   #
    ###################

    @classmethod
    def _encode_message_header(cls, client_id, correlation_id, request_key,
                               version=0):
        """
        Encode the common request envelope
        """
        return struct.pack('>hhih%ds' % len(client_id),
                           request_key,          # ApiKey
                           version,              # ApiVersion
                           correlation_id,       # CorrelationId
                           len(client_id),       # ClientId size
                           client_id)            # ClientId
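
    # For example (a worked sketch, not part of the original module):
    #   _encode_message_header(b'client', 4, KafkaProtocol.CONSUMER_METADATA_KEY)
    #   == b'\x00\x0a\x00\x00\x00\x00\x00\x04\x00\x06client'
    # i.e. ApiKey (int16) | ApiVersion (int16) | CorrelationId (int32)
    #      | ClientId length (int16) | ClientId bytes.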

    @classmethod
    def _encode_message_set(cls, messages):
        """
        Encode a MessageSet. Unlike other arrays in the protocol,
        MessageSets are not length-prefixed.

        Format
        ======
        MessageSet => [Offset MessageSize Message]
          Offset => int64
          MessageSize => int32
        """
        message_set = []
        for message in messages:
            encoded_message = KafkaProtocol._encode_message(message)
            message_set.append(struct.pack('>qi%ds' % len(encoded_message), 0,
                                           len(encoded_message),
                                           encoded_message))
        return b''.join(message_set)

    @classmethod
    def _encode_message(cls, message):
        """
        Encode a single message.

        The magic number of a message is a format version number.
        The only supported magic number right now is zero.

        Format
        ======
        Message => Crc MagicByte Attributes Key Value
          Crc => int32
          MagicByte => int8
          Attributes => int8
          Key => bytes
          Value => bytes
        """
        if message.magic == 0:
            msg = b''.join([
                struct.pack('>BB', message.magic, message.attributes),
                write_int_string(message.key),
                write_int_string(message.value)
            ])
            crc = crc32(msg)
            msg = struct.pack('>i%ds' % len(msg), crc, msg)
        else:
            raise ProtocolError("Unexpected magic number: %d" % message.magic)
        return msg
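
    # For example, a magic-0 message with a null key encodes as:
    #   Crc (int32) | MagicByte=0 (int8) | Attributes (int8)
    #   | Key length=-1 (int32, null) | Value length (int32) | Value bytes
    # where write_int_string() emits the -1 length for a None key.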

    ##################
    #   Public API   #
    ##################

    @classmethod
    def encode_produce_request(cls, payloads=(), acks=1, timeout=1000):
        """
        Encode a ProduceRequest struct

        Arguments:
            payloads: list of ProduceRequestPayload
            acks: level of acknowledgment required before the broker responds
                1: written to disk by the leader
                0: immediate response, no acknowledgment
                -1: waits for all in-sync replicas to acknowledge
            timeout: Maximum time (in ms) the server will wait for replica acks.
                This is _not_ a socket timeout.

        Returns: ProduceRequest
        """
        if acks not in (1, 0, -1):
            raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)

        topics = []
        for topic, topic_payloads in group_by_topic_and_partition(payloads).items():
            topic_msgs = []
            for partition, payload in topic_payloads.items():
                partition_msgs = []
                for msg in payload.messages:
                    m = kafka.protocol.message.Message(
                        msg.value, key=msg.key,
                        magic=msg.magic, attributes=msg.attributes
                    )
                    partition_msgs.append((0, m.encode()))
                topic_msgs.append((partition, partition_msgs))
            topics.append((topic, topic_msgs))

        return kafka.protocol.produce.ProduceRequest[0](
            required_acks=acks,
            timeout=timeout,
            topics=topics
        )
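
    # A usage sketch (hypothetical names, not part of the original module):
    #
    #   payload = kafka.structs.ProduceRequestPayload(
    #       'my-topic', 0, [create_message(b'hello')])
    #   request = KafkaProtocol.encode_produce_request([payload], acks=1)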

    @classmethod
    def decode_produce_response(cls, response):
        """
        Decode ProduceResponse to ProduceResponsePayload

        Arguments:
            response: ProduceResponse

        Return: list of ProduceResponsePayload
        """
        return [
            kafka.structs.ProduceResponsePayload(topic, partition, error, offset)
            for topic, partitions in response.topics
            for partition, error, offset in partitions
        ]

    @classmethod
    def encode_fetch_request(cls, payloads=(), max_wait_time=100, min_bytes=4096):
        """
        Encodes a FetchRequest struct

        Arguments:
            payloads: list of FetchRequestPayload
            max_wait_time (int, optional): ms to block waiting for min_bytes
                data. Defaults to 100.
            min_bytes (int, optional): minimum bytes required to return before
                max_wait_time. Defaults to 4096.

        Return: FetchRequest
        """
        return kafka.protocol.fetch.FetchRequest[0](
            replica_id=-1,
            max_wait_time=max_wait_time,
            min_bytes=min_bytes,
            topics=[(
                topic,
                [(
                    partition,
                    payload.offset,
                    payload.max_bytes)
                 for partition, payload in topic_payloads.items()])
                for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
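
    # A usage sketch (hypothetical names, not part of the original module):
    #
    #   payload = kafka.structs.FetchRequestPayload('my-topic', 0, 0, 1024 * 1024)
    #   request = KafkaProtocol.encode_fetch_request([payload], max_wait_time=100)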

    @classmethod
    def decode_fetch_response(cls, response):
        """
        Decode FetchResponse struct to FetchResponsePayloads

        Arguments:
            response: FetchResponse
        """
        return [
            kafka.structs.FetchResponsePayload(
                topic, partition, error, highwater_offset, [
                    offset_and_msg
                    for offset_and_msg in cls.decode_message_set(messages)])
            for topic, partitions in response.topics
            for partition, error, highwater_offset, messages in partitions
        ]

    @classmethod
    def decode_message_set(cls, messages):
        for offset, _, message in messages:
            if isinstance(message, kafka.protocol.message.Message) and message.is_compressed():
                inner_messages = message.decompress()
                for (inner_offset, _msg_size, inner_msg) in inner_messages:
                    yield kafka.structs.OffsetAndMessage(inner_offset, inner_msg)
            else:
                yield kafka.structs.OffsetAndMessage(offset, message)
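
    # Note that compressed wrapper messages are flattened here, so callers of
    # decode_fetch_response() see one stream of OffsetAndMessage tuples whether
    # or not the broker returned compressed message sets.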

    @classmethod
    def encode_offset_request(cls, payloads=()):
        return kafka.protocol.offset.OffsetRequest[0](
            replica_id=-1,
            topics=[(
                topic,
                [(
                    partition,
                    payload.time,
                    payload.max_offsets)
                 for partition, payload in six.iteritems(topic_payloads)])
                for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])

    @classmethod
    def decode_offset_response(cls, response):
        """
        Decode OffsetResponse into OffsetResponsePayloads

        Arguments:
            response: OffsetResponse

        Returns: list of OffsetResponsePayloads
        """
        return [
            kafka.structs.OffsetResponsePayload(topic, partition, error, tuple(offsets))
            for topic, partitions in response.topics
            for partition, error, offsets in partitions
        ]

    @classmethod
    def encode_list_offset_request(cls, payloads=()):
        return kafka.protocol.offset.OffsetRequest[1](
            replica_id=-1,
            topics=[(
                topic,
                [(
                    partition,
                    payload.time)
                 for partition, payload in six.iteritems(topic_payloads)])
                for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])

    @classmethod
    def decode_list_offset_response(cls, response):
        """
        Decode OffsetResponse_v2 into ListOffsetResponsePayloads

        Arguments:
            response: OffsetResponse_v2

        Returns: list of ListOffsetResponsePayloads
        """
        return [
            kafka.structs.ListOffsetResponsePayload(topic, partition, error, timestamp, offset)
            for topic, partitions in response.topics
            for partition, error, timestamp, offset in partitions
        ]

    @classmethod
    def encode_metadata_request(cls, topics=(), payloads=None):
        """
        Encode a MetadataRequest

        Arguments:
            topics: list of strings
        """
        if payloads is not None:
            topics = payloads

        return kafka.protocol.metadata.MetadataRequest[0](topics)

    @classmethod
    def decode_metadata_response(cls, response):
        return response

    @classmethod
    def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
        """
        Encode a ConsumerMetadataRequest

        Arguments:
            client_id: string
            correlation_id: int
            payloads: string (consumer group)
        """
        message = []
        message.append(cls._encode_message_header(client_id, correlation_id,
                                                  KafkaProtocol.CONSUMER_METADATA_KEY))
        message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads))

        msg = b''.join(message)
        return write_int_string(msg)

    @classmethod
    def decode_consumer_metadata_response(cls, data):
        """
        Decode bytes to a ConsumerMetadataResponse

        Arguments:
            data: bytes to decode
        """
        ((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0)
        (host, cur) = read_short_string(data, cur)
        ((port,), cur) = relative_unpack('>i', data, cur)

        return ConsumerMetadataResponse(error, nodeId, host, port)
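
    # Wire layout consumed above:
    #   CorrelationId (int32) | ErrorCode (int16) | CoordinatorId (int32)
    #   | CoordinatorHost (int16-prefixed string) | CoordinatorPort (int32)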

    @classmethod
    def encode_offset_commit_request(cls, group, payloads):
        """
        Encode an OffsetCommitRequest struct

        Arguments:
            group: string, the consumer group you are committing offsets for
            payloads: list of OffsetCommitRequestPayload
        """
        return kafka.protocol.commit.OffsetCommitRequest[0](
            consumer_group=group,
            topics=[(
                topic,
                [(
                    partition,
                    payload.offset,
                    payload.metadata)
                 for partition, payload in six.iteritems(topic_payloads)])
                for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])

    @classmethod
    def decode_offset_commit_response(cls, response):
        """
        Decode OffsetCommitResponse to an OffsetCommitResponsePayload

        Arguments:
            response: OffsetCommitResponse
        """
        return [
            kafka.structs.OffsetCommitResponsePayload(topic, partition, error)
            for topic, partitions in response.topics
            for partition, error in partitions
        ]

    @classmethod
    def encode_offset_fetch_request(cls, group, payloads, from_kafka=False):
        """
        Encode an OffsetFetchRequest struct. The request is encoded using
        version 0 if from_kafka is false, indicating a request for Zookeeper
        offsets. It is encoded using version 1 otherwise, indicating a request
        for Kafka offsets.

        Arguments:
            group: string, the consumer group you are fetching offsets for
            payloads: list of OffsetFetchRequestPayload
            from_kafka: bool, default False, set True for Kafka-committed offsets
        """
        version = 1 if from_kafka else 0
        return kafka.protocol.commit.OffsetFetchRequest[version](
            consumer_group=group,
            topics=[(
                topic,
                list(topic_payloads.keys()))
                for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
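
    # A usage sketch (hypothetical names, not part of the original module):
    # fetching Kafka-committed (v1) offsets for one partition:
    #
    #   payload = kafka.structs.OffsetFetchRequestPayload('my-topic', 0)
    #   request = KafkaProtocol.encode_offset_fetch_request(
    #       'my-group', [payload], from_kafka=True)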

    @classmethod
    def decode_offset_fetch_response(cls, response):
        """
        Decode OffsetFetchResponse to OffsetFetchResponsePayloads

        Arguments:
            response: OffsetFetchResponse
        """
        return [
            kafka.structs.OffsetFetchResponsePayload(
                topic, partition, offset, metadata, error
            )
            for topic, partitions in response.topics
            for partition, offset, metadata, error in partitions
        ]


def create_message(payload, key=None):
    """
    Construct a Message

    Arguments:
        payload: bytes, the payload to send to Kafka
        key: bytes, a key used for partition routing (optional)
    """
    return kafka.structs.Message(0, 0, key, payload)


def create_gzip_message(payloads, key=None, compresslevel=None):
    """
    Construct a gzipped Message containing multiple Messages

    The given payloads will be encoded, compressed, and sent as a single atomic
    message to Kafka.

    Arguments:
        payloads: list of (payload, key) tuples to be wrapped and sent to Kafka
        key: bytes, a key used for partition routing (optional)
    """
    message_set = KafkaProtocol._encode_message_set(
        [create_message(payload, pl_key) for payload, pl_key in payloads])

    gzipped = gzip_encode(message_set, compresslevel=compresslevel)
    codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP

    return kafka.structs.Message(0, 0x00 | codec, key, gzipped)


def create_snappy_message(payloads, key=None):
    """
    Construct a snappy-compressed Message containing multiple Messages

    The given payloads will be encoded, compressed, and sent as a single atomic
    message to Kafka.

    Arguments:
        payloads: list of (payload, key) tuples to be wrapped and sent to Kafka
        key: bytes, a key used for partition routing (optional)
    """
    message_set = KafkaProtocol._encode_message_set(
        [create_message(payload, pl_key) for payload, pl_key in payloads])

    snapped = snappy_encode(message_set)
    codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY

    return kafka.structs.Message(0, 0x00 | codec, key, snapped)


def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):
    """Create a message set using the given codec.

    If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise,
    return a list containing a single codec-encoded message.
    """
    if codec == CODEC_NONE:
        return [create_message(m, k) for m, k in messages]
    elif codec == CODEC_GZIP:
        return [create_gzip_message(messages, key, compresslevel)]
    elif codec == CODEC_SNAPPY:
        return [create_snappy_message(messages, key)]
    else:
        raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
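

if __name__ == '__main__':
    # A minimal, self-contained sketch (not part of the original module):
    # message sets can be built and inspected locally, with no broker involved.
    logging.basicConfig(level=logging.INFO)
    for msg in create_message_set([(b'payload-1', None), (b'payload-2', b'key-2')]):
        log.info('magic=%d attributes=0x%02x key=%r value=%r',
                 msg.magic, msg.attributes, msg.key, msg.value)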