message.py

from __future__ import absolute_import

import io
import time

from ..codec import (has_gzip, has_snappy, has_lz4,
                     gzip_decode, snappy_decode,
                     lz4_decode, lz4_decode_old_kafka)
from .frame import KafkaBytes
from .struct import Struct
from .types import (
    Int8, Int32, Int64, Bytes, Schema, AbstractType
)
from ..util import crc32, WeakMethod


class Message(Struct):
    SCHEMAS = [
        Schema(
            ('crc', Int32),
            ('magic', Int8),
            ('attributes', Int8),
            ('key', Bytes),
            ('value', Bytes)),
        Schema(
            ('crc', Int32),
            ('magic', Int8),
            ('attributes', Int8),
            ('timestamp', Int64),
            ('key', Bytes),
            ('value', Bytes)),
    ]
    SCHEMA = SCHEMAS[1]
    CODEC_MASK = 0x07
    CODEC_GZIP = 0x01
    CODEC_SNAPPY = 0x02
    CODEC_LZ4 = 0x03
    TIMESTAMP_TYPE_MASK = 0x08
    HEADER_SIZE = 22  # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)
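
    # Bit layout of the attributes byte (message format v0/v1): the low
    # three bits (CODEC_MASK) select the compression codec, and bit 3
    # (TIMESTAMP_TYPE_MASK) carries the timestamp type for v1 messages.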

    def __init__(self, value, key=None, magic=0, attributes=0, crc=0,
                 timestamp=None):
        assert value is None or isinstance(value, bytes), 'value must be bytes'
        assert key is None or isinstance(key, bytes), 'key must be bytes'
        assert magic > 0 or timestamp is None, 'timestamp not supported in v0'

        # Default timestamp to now for v1 messages
        if magic > 0 and timestamp is None:
            timestamp = int(time.time() * 1000)
        self.timestamp = timestamp
        self.crc = crc
        self._validated_crc = None
        self.magic = magic
        self.attributes = attributes
        self.key = key
        self.value = value
        self.encode = WeakMethod(self._encode_self)

    @property
    def timestamp_type(self):
        """0 for CreateTime; 1 for LogAppendTime; None if unsupported.

        Value is determined by the broker; produced messages should always
        set this to 0. Requires Kafka >= 0.10 / message version >= 1.
        """
        if self.magic == 0:
            return None
        elif self.attributes & self.TIMESTAMP_TYPE_MASK:
            return 1
        else:
            return 0

    def _encode_self(self, recalc_crc=True):
        version = self.magic
        if version == 1:
            fields = (self.crc, self.magic, self.attributes, self.timestamp, self.key, self.value)
        elif version == 0:
            fields = (self.crc, self.magic, self.attributes, self.key, self.value)
        else:
            raise ValueError('Unrecognized message version: %s' % version)
        message = Message.SCHEMAS[version].encode(fields)
        if not recalc_crc:
            return message
        self.crc = crc32(message[4:])
        crc_field = self.SCHEMAS[version].fields[0]
        return crc_field.encode(self.crc) + message[4:]

    @classmethod
    def decode(cls, data):
        _validated_crc = None
        if isinstance(data, bytes):
            _validated_crc = crc32(data[4:])
            data = io.BytesIO(data)
        # Partial decode required to determine message version
        base_fields = cls.SCHEMAS[0].fields[0:3]
        crc, magic, attributes = [field.decode(data) for field in base_fields]
        remaining = cls.SCHEMAS[magic].fields[3:]
        fields = [field.decode(data) for field in remaining]
        if magic == 1:
            timestamp = fields[0]
        else:
            timestamp = None
        msg = cls(fields[-1], key=fields[-2],
                  magic=magic, attributes=attributes, crc=crc,
                  timestamp=timestamp)
        msg._validated_crc = _validated_crc
        return msg

    def validate_crc(self):
        if self._validated_crc is None:
            raw_msg = self._encode_self(recalc_crc=False)
            self._validated_crc = crc32(raw_msg[4:])
        if self.crc == self._validated_crc:
            return True
        return False
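
    # Round-trip sketch (illustrative, not part of the original module):
    # encode() recomputes the crc over everything after the 4-byte crc
    # field, and decode() records the crc of the bytes it was given, so
    #   msg = Message(b'value', key=b'key', magic=1)
    #   Message.decode(msg.encode()).validate_crc()  # -> True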

    def is_compressed(self):
        return self.attributes & self.CODEC_MASK != 0

    def decompress(self):
        codec = self.attributes & self.CODEC_MASK
        assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4)
        if codec == self.CODEC_GZIP:
            assert has_gzip(), 'Gzip decompression unsupported'
            raw_bytes = gzip_decode(self.value)
        elif codec == self.CODEC_SNAPPY:
            assert has_snappy(), 'Snappy decompression unsupported'
            raw_bytes = snappy_decode(self.value)
        elif codec == self.CODEC_LZ4:
            assert has_lz4(), 'LZ4 decompression unsupported'
            if self.magic == 0:
                raw_bytes = lz4_decode_old_kafka(self.value)
            else:
                raw_bytes = lz4_decode(self.value)
        else:
            raise Exception('This should be impossible')
        return MessageSet.decode(raw_bytes, bytes_to_read=len(raw_bytes))
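
    # Note: for compressed messages the value field wraps an entire inner
    # MessageSet, so decompress() returns a list of
    # (offset, message_size, Message) tuples rather than raw bytes.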

    def __hash__(self):
        return hash(self._encode_self(recalc_crc=False))


class PartialMessage(bytes):
    def __repr__(self):
        return 'PartialMessage(%s)' % self


class MessageSet(AbstractType):
    ITEM = Schema(
        ('offset', Int64),
        ('message', Bytes)
    )
    HEADER_SIZE = 12  # offset + message_size

    @classmethod
    def encode(cls, items):
        # RecordAccumulator encodes messagesets internally
        if isinstance(items, (io.BytesIO, KafkaBytes)):
            size = Int32.decode(items)
            # rewind and return all the bytes
            items.seek(items.tell() - 4)
            return items.read(size + 4)

        encoded_values = []
        for (offset, message) in items:
            encoded_values.append(Int64.encode(offset))
            encoded_values.append(Bytes.encode(message))
        encoded = b''.join(encoded_values)
        return Bytes.encode(encoded)
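
    # Wire layout sketch (illustrative): encode([(0, raw_msg_bytes)])
    # produces a 4-byte total size followed by repeating
    # [offset(8) message_size(4) message] entries, matching ITEM above.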

    @classmethod
    def decode(cls, data, bytes_to_read=None):
        """Compressed messages should pass in bytes_to_read (via message size);
        otherwise, the size is decoded from data as an Int32.
        """
        if isinstance(data, bytes):
            data = io.BytesIO(data)
        if bytes_to_read is None:
            bytes_to_read = Int32.decode(data)

        # If FetchRequest max_bytes is smaller than the available message set,
        # the server returns partial data for the final message,
        # so create an internal buffer to avoid over-reading
        raw = io.BytesIO(data.read(bytes_to_read))

        items = []
        while bytes_to_read:
            try:
                offset = Int64.decode(raw)
                msg_bytes = Bytes.decode(raw)
                bytes_to_read -= 8 + 4 + len(msg_bytes)
                items.append((offset, len(msg_bytes), Message.decode(msg_bytes)))
            except ValueError:
                # PartialMessage to signal that max_bytes may be too small
                items.append((None, None, PartialMessage()))
                break
        return items

    @classmethod
    def repr(cls, messages):
        if isinstance(messages, (KafkaBytes, io.BytesIO)):
            offset = messages.tell()
            decoded = cls.decode(messages)
            messages.seek(offset)
            messages = decoded
        return str([cls.ITEM.repr(m) for m in messages])
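

# Usage sketch (illustrative; assumes the kafka-python package layout, where
# this module lives at kafka.protocol.message):
#
#   from kafka.protocol.message import Message, MessageSet
#
#   msg = Message(b'payload', key=b'key', magic=1)  # v1; timestamp defaults to now
#   raw = msg.encode()                              # recalculates the crc
#   assert Message.decode(raw).validate_crc()
#
#   wire = MessageSet.encode([(0, raw)])            # length-prefixed message set
#   [(offset, size, decoded)] = MessageSet.decode(wire)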