123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- from __future__ import absolute_import
- import io
- import time
- from ..codec import (has_gzip, has_snappy, has_lz4,
- gzip_decode, snappy_decode,
- lz4_decode, lz4_decode_old_kafka)
- from .frame import KafkaBytes
- from .struct import Struct
- from .types import (
- Int8, Int32, Int64, Bytes, Schema, AbstractType
- )
- from ..util import crc32, WeakMethod
class Message(Struct):
    """A single Kafka message in wire format v0 or v1 (selected by `magic`).

    Layout is: crc(Int32), magic(Int8), attributes(Int8),
    [timestamp(Int64) -- v1 only,] key(Bytes), value(Bytes).
    The low 3 attribute bits select the compression codec; bit 3 carries the
    timestamp type (v1 only).
    """
    SCHEMAS = [
        # v0: no timestamp field
        Schema(
            ('crc', Int32),
            ('magic', Int8),
            ('attributes', Int8),
            ('key', Bytes),
            ('value', Bytes)),
        # v1: adds a millisecond timestamp between attributes and key
        Schema(
            ('crc', Int32),
            ('magic', Int8),
            ('attributes', Int8),
            ('timestamp', Int64),
            ('key', Bytes),
            ('value', Bytes)),
    ]
    SCHEMA = SCHEMAS[1]
    CODEC_MASK = 0x07    # low 3 bits of attributes select the codec
    CODEC_GZIP = 0x01
    CODEC_SNAPPY = 0x02
    CODEC_LZ4 = 0x03
    TIMESTAMP_TYPE_MASK = 0x08  # bit 3 of attributes: 0 CreateTime, 1 LogAppendTime
    HEADER_SIZE = 22  # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)

    def __init__(self, value, key=None, magic=0, attributes=0, crc=0,
                 timestamp=None):
        """
        Arguments:
            value (bytes or None): message payload
            key (bytes or None): optional message key
            magic (int): message format version, 0 or 1
            attributes (int): codec bits (0-2) plus timestamp-type bit (3)
            crc (int): CRC-32 of the encoded message; recomputed on encode
            timestamp (int or None): epoch milliseconds; only valid for v1.
                Defaults to the current time for v1 messages.
        """
        assert value is None or isinstance(value, bytes), 'value must be bytes'
        assert key is None or isinstance(key, bytes), 'key must be bytes'
        assert magic > 0 or timestamp is None, 'timestamp not supported in v0'

        # Default timestamp to now for v1 messages
        if magic > 0 and timestamp is None:
            timestamp = int(time.time() * 1000)
        self.timestamp = timestamp
        self.crc = crc
        # Cached crc32 of the raw wire bytes; populated by decode() or
        # lazily by validate_crc()
        self._validated_crc = None
        self.magic = magic
        self.attributes = attributes
        self.key = key
        self.value = value
        # Bound weakly so the instance attribute does not hold a strong
        # reference back to the instance itself (presumably to avoid a
        # self-referential cycle -- see util.WeakMethod)
        self.encode = WeakMethod(self._encode_self)

    @property
    def timestamp_type(self):
        """0 for CreateTime; 1 for LogAppendTime; None if unsupported.

        Value is determined by broker; produced messages should always set to 0
        Requires Kafka >= 0.10 / message version >= 1
        """
        if self.magic == 0:
            return None
        elif self.attributes & self.TIMESTAMP_TYPE_MASK:
            return 1
        else:
            return 0

    def _encode_self(self, recalc_crc=True):
        """Serialize this message to wire bytes.

        When recalc_crc is True (the default), the crc field is recomputed
        over everything after the 4-byte crc slot and stored on self.
        """
        version = self.magic
        if version == 1:
            fields = (self.crc, self.magic, self.attributes, self.timestamp, self.key, self.value)
        elif version == 0:
            fields = (self.crc, self.magic, self.attributes, self.key, self.value)
        else:
            raise ValueError('Unrecognized message version: %s' % version)
        message = Message.SCHEMAS[version].encode(fields)
        if not recalc_crc:
            return message
        # CRC covers everything after the 4-byte crc field itself
        self.crc = crc32(message[4:])
        crc_field = self.SCHEMAS[version].fields[0]
        return crc_field.encode(self.crc) + message[4:]

    @classmethod
    def decode(cls, data):
        """Decode a Message from bytes or a file-like buffer.

        If raw bytes are supplied, the CRC of the payload is computed up
        front and cached so validate_crc() need not re-encode.
        """
        _validated_crc = None
        if isinstance(data, bytes):
            _validated_crc = crc32(data[4:])
            data = io.BytesIO(data)
        # Partial decode required to determine message version:
        # crc/magic/attributes are common to both schemas
        base_fields = cls.SCHEMAS[0].fields[0:3]
        crc, magic, attributes = [field.decode(data) for field in base_fields]
        # Decode the remaining fields using the schema selected by magic
        remaining = cls.SCHEMAS[magic].fields[3:]
        fields = [field.decode(data) for field in remaining]
        if magic == 1:
            timestamp = fields[0]
        else:
            timestamp = None
        msg = cls(fields[-1], key=fields[-2],
                  magic=magic, attributes=attributes, crc=crc,
                  timestamp=timestamp)
        msg._validated_crc = _validated_crc
        return msg

    def validate_crc(self):
        """Return True if the stored crc matches the crc of the encoded bytes."""
        if self._validated_crc is None:
            # Not decoded from raw bytes: re-encode (without touching self.crc)
            # to compute the actual checksum
            raw_msg = self._encode_self(recalc_crc=False)
            self._validated_crc = crc32(raw_msg[4:])
        if self.crc == self._validated_crc:
            return True
        return False

    def is_compressed(self):
        """Return True if any codec bit is set in attributes."""
        return self.attributes & self.CODEC_MASK != 0

    def decompress(self):
        """Decompress the value and decode it as an embedded MessageSet.

        Raises AssertionError if the codec is unknown or its library is
        unavailable.
        """
        codec = self.attributes & self.CODEC_MASK
        assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4)
        if codec == self.CODEC_GZIP:
            assert has_gzip(), 'Gzip decompression unsupported'
            raw_bytes = gzip_decode(self.value)
        elif codec == self.CODEC_SNAPPY:
            assert has_snappy(), 'Snappy decompression unsupported'
            raw_bytes = snappy_decode(self.value)
        elif codec == self.CODEC_LZ4:
            assert has_lz4(), 'LZ4 decompression unsupported'
            # v0 used a non-standard lz4 framing -- see codec module
            if self.magic == 0:
                raw_bytes = lz4_decode_old_kafka(self.value)
            else:
                raw_bytes = lz4_decode(self.value)
        else:
            # Unreachable: codec already validated by the assert above
            raise Exception('This should be impossible')

        # The decompressed payload is itself a MessageSet (without an Int32
        # size prefix), hence the explicit bytes_to_read
        return MessageSet.decode(raw_bytes, bytes_to_read=len(raw_bytes))

    def __hash__(self):
        # Hash the stable wire encoding (crc slot left as-is)
        return hash(self._encode_self(recalc_crc=False))
class PartialMessage(bytes):
    """Marker for a trailing message truncated by the broker.

    Appended by MessageSet.decode when the buffer ends mid-message,
    signalling that FetchRequest max_bytes may be too small.
    """

    def __repr__(self):
        # Include the raw bytes content verbatim in the repr
        text = 'PartialMessage(%s)' % (self,)
        return text
class MessageSet(AbstractType):
    """Wire codec for a sequence of (offset, message) entries.

    decode() returns a list of (offset, message_size, Message) tuples; a
    truncated trailing entry is reported as (None, None, PartialMessage()).
    """
    ITEM = Schema(
        ('offset', Int64),
        ('message', Bytes)
    )
    HEADER_SIZE = 12  # offset + message_size

    @classmethod
    def encode(cls, items):
        """Encode an iterable of (offset, encoded_message) pairs, or pass
        through an already-encoded buffer unchanged (size prefix included).
        """
        # RecordAccumulator encodes messagesets internally
        if isinstance(items, (io.BytesIO, KafkaBytes)):
            size = Int32.decode(items)
            # rewind and return all the bytes
            items.seek(items.tell() - 4)
            return items.read(size + 4)

        encoded_values = []
        for (offset, message) in items:
            encoded_values.append(Int64.encode(offset))
            encoded_values.append(Bytes.encode(message))
        encoded = b''.join(encoded_values)
        # Bytes.encode prepends the Int32 size of the whole set
        return Bytes.encode(encoded)

    @classmethod
    def decode(cls, data, bytes_to_read=None):
        """Compressed messages should pass in bytes_to_read (via message size)
        otherwise, we decode from data as Int32
        """
        if isinstance(data, bytes):
            data = io.BytesIO(data)
        if bytes_to_read is None:
            bytes_to_read = Int32.decode(data)

        # if FetchRequest max_bytes is smaller than the available message set
        # the server returns partial data for the final message
        # So create an internal buffer to avoid over-reading
        raw = io.BytesIO(data.read(bytes_to_read))

        items = []
        while bytes_to_read:
            try:
                offset = Int64.decode(raw)
                msg_bytes = Bytes.decode(raw)
                # Consumed: 8-byte offset + 4-byte size prefix + message body
                bytes_to_read -= 8 + 4 + len(msg_bytes)
                items.append((offset, len(msg_bytes), Message.decode(msg_bytes)))
            except ValueError:
                # PartialMessage to signal that max_bytes may be too small
                items.append((None, None, PartialMessage()))
                break
        return items

    @classmethod
    def repr(cls, messages):
        """Human-readable form; decodes a buffer non-destructively if given."""
        if isinstance(messages, (KafkaBytes, io.BytesIO)):
            offset = messages.tell()
            decoded = cls.decode(messages)
            # restore the read position so callers can still decode
            messages.seek(offset)
            messages = decoded
        return str([cls.ITEM.repr(m) for m in messages])
|