codec.py

from __future__ import absolute_import

import gzip
import io
import platform
import struct

from kafka.vendor import six
from kafka.vendor.six.moves import xrange  # pylint: disable=import-error

_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'

try:
    import snappy
except ImportError:
    snappy = None

try:
    import lz4.frame as lz4
except ImportError:
    lz4 = None

try:
    import lz4f
except ImportError:
    lz4f = None

try:
    import xxhash
except ImportError:
    xxhash = None

PYPY = bool(platform.python_implementation() == 'PyPy')


def has_gzip():
    return True


def has_snappy():
    return snappy is not None


def has_lz4():
    if lz4 is not None:
        return True
    if lz4f is not None:
        return True
    return False


def gzip_encode(payload, compresslevel=None):
    if not compresslevel:
        compresslevel = 9

    buf = io.BytesIO()

    # Gzip context manager introduced in python 2.7
    # so old-fashioned way until we decide to not support 2.6
    gzipper = gzip.GzipFile(fileobj=buf, mode="w", compresslevel=compresslevel)
    try:
        gzipper.write(payload)
    finally:
        gzipper.close()

    return buf.getvalue()


def gzip_decode(payload):
    buf = io.BytesIO(payload)

    # Gzip context manager introduced in python 2.7
    # so old-fashioned way until we decide to not support 2.6
    gzipper = gzip.GzipFile(fileobj=buf, mode='r')
    try:
        return gzipper.read()
    finally:
        gzipper.close()
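

# A minimal round-trip sketch (illustrative only; the payload bytes are
# invented for this example, not taken from the original source):
#
#     >>> raw = b'kafka message batch'
#     >>> gzip_decode(gzip_encode(raw, compresslevel=6)) == raw
#     True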


def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024):
    """Encodes the given data with snappy compression.

    If xerial_compatible is set then the stream is encoded in a fashion
    compatible with the xerial snappy library.

    The block size (xerial_blocksize) controls how frequently the blocking
    occurs; 32k is the default in the xerial library.

    The format winds up being:

        +-------------+------------+--------------+------------+--------------+
        |   Header    | Block1 len | Block1 data  | Blockn len | Blockn data  |
        +-------------+------------+--------------+------------+--------------+
        |  16 bytes   |  BE int32  | snappy bytes |  BE int32  | snappy bytes |
        +-------------+------------+--------------+------------+--------------+

    It is important to note that the blocksize is the amount of uncompressed
    data presented to snappy at each block, whereas the blocklen is the number
    of bytes that will be present in the stream; so the length will always be
    <= blocksize.
    """
    if not has_snappy():
        raise NotImplementedError("Snappy codec is not available")

    if not xerial_compatible:
        return snappy.compress(payload)

    out = io.BytesIO()
    for fmt, dat in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER):
        out.write(struct.pack('!' + fmt, dat))

    # Chunk through buffers to avoid creating intermediate slice copies
    if PYPY:
        # on pypy, snappy.compress() on a sliced buffer consumes the entire
        # buffer... likely a python-snappy bug, so just use a slice copy
        chunker = lambda payload, i, size: payload[i:size+i]

    elif six.PY2:
        # Sliced buffer avoids additional copies
        # pylint: disable-msg=undefined-variable
        chunker = lambda payload, i, size: buffer(payload, i, size)

    else:
        # snappy.compress does not like raw memoryviews, so we have to convert
        # via tobytes(), which is a copy... oh well. it's the thought that counts.
        # pylint: disable-msg=undefined-variable
        chunker = lambda payload, i, size: memoryview(payload)[i:size+i].tobytes()

    for chunk in (chunker(payload, i, xerial_blocksize)
                  for i in xrange(0, len(payload), xerial_blocksize)):
        block = snappy.compress(chunk)
        block_size = len(block)
        out.write(struct.pack('!i', block_size))
        out.write(block)

    return out.getvalue()
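

# Framing sketch (illustrative; assumes python-snappy is installed): a payload
# larger than xerial_blocksize is split into independently compressed 32 KB
# blocks behind the 16-byte header written above.
#
#     >>> blob = snappy_encode(b'x' * (64 * 1024))
#     >>> blob[:8]
#     b'\x82SNAPPY\x00'
#     >>> _detect_xerial_stream(blob)
#     True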


def _detect_xerial_stream(payload):
    """Detects if the data given might have been encoded with the blocking mode
    of the xerial snappy library.

    This mode writes a magic header of the format:

        +--------+--------------+------------+---------+--------+
        | Marker | Magic String | Null / Pad | Version | Compat |
        +--------+--------------+------------+---------+--------+
        |  byte  |   c-string   |    byte    |  int32  | int32  |
        +--------+--------------+------------+---------+--------+
        |  -126  |   'SNAPPY'   |     \0     |         |        |
        +--------+--------------+------------+---------+--------+

    The pad appears to be there to ensure that SNAPPY is a valid cstring.
    The version is the version of this format as written by xerial;
    in the wild this is currently 1, and as such we only support v1.

    Compat is there to claim the minimum supported version that
    can read a xerial block stream; presently in the wild this is 1.
    """
    if len(payload) > 16:
        header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
        return header == _XERIAL_V1_HEADER
    return False
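

# For reference (a sketch derived purely from the constants above), the full
# 16-byte xerial header packs as:
#
#     >>> struct.pack('!' + _XERIAL_V1_FORMAT, *_XERIAL_V1_HEADER)
#     b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'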


def snappy_decode(payload):
    if not has_snappy():
        raise NotImplementedError("Snappy codec is not available")

    if _detect_xerial_stream(payload):
        # TODO ? Should become a fileobj ?
        out = io.BytesIO()
        byt = payload[16:]
        length = len(byt)
        cursor = 0

        while cursor < length:
            block_size = struct.unpack_from('!i', byt[cursor:])[0]
            # Skip the block size
            cursor += 4
            end = cursor + block_size
            out.write(snappy.decompress(byt[cursor:end]))
            cursor = end

        out.seek(0)
        return out.read()
    else:
        return snappy.decompress(payload)
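

# Round-trip sketch (illustrative; assumes python-snappy is installed):
#
#     >>> raw = b'xerial-framed payload'
#     >>> snappy_decode(snappy_encode(raw)) == raw
#     True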


if lz4:
    lz4_encode = lz4.compress  # pylint: disable-msg=no-member
elif lz4f:
    lz4_encode = lz4f.compressFrame  # pylint: disable-msg=no-member
else:
    lz4_encode = None


def lz4f_decode(payload):
    """Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
    # pylint: disable-msg=no-member
    ctx = lz4f.createDecompContext()
    data = lz4f.decompressFrame(payload, ctx)
    lz4f.freeDecompContext(ctx)

    # lz4f python module does not expose how much of the payload was
    # actually read if the decompression was only partial.
    if data['next'] != 0:
        raise RuntimeError('lz4f unable to decompress full payload')
    return data['decomp']


if lz4:
    lz4_decode = lz4.decompress  # pylint: disable-msg=no-member
elif lz4f:
    lz4_decode = lz4f_decode
else:
    lz4_decode = None
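

# Round-trip sketch (illustrative; assumes at least one of the lz4 or lz4f
# backends imported above is available, so lz4_encode/lz4_decode are not None):
#
#     >>> raw = b'interoperable framing'
#     >>> lz4_decode(lz4_encode(raw)) == raw
#     True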


def lz4_encode_old_kafka(payload):
    """Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum."""
    assert xxhash is not None
    data = lz4_encode(payload)
    header_size = 7
    flg = data[4]
    if not isinstance(flg, int):
        flg = ord(flg)

    content_size_bit = ((flg >> 3) & 1)
    if content_size_bit:
        # Old kafka does not accept the content-size field
        # so we need to discard it and reset the header flag
        flg -= 8
        data = bytearray(data)
        data[4] = flg
        data = bytes(data)
        payload = data[header_size+8:]
    else:
        payload = data[header_size:]

    # This is the incorrect hc
    hc = xxhash.xxh32(data[0:header_size-1]).digest()[-2:-1]  # pylint: disable-msg=no-member

    return b''.join([
        data[0:header_size-1],
        hc,
        payload
    ])


def lz4_decode_old_kafka(payload):
    assert xxhash is not None
    # Kafka's LZ4 code has a bug in its header checksum implementation
    header_size = 7
    if isinstance(payload[4], int):
        flg = payload[4]
    else:
        flg = ord(payload[4])
    content_size_bit = ((flg >> 3) & 1)
    if content_size_bit:
        header_size += 8

    # This should be the correct hc
    hc = xxhash.xxh32(payload[4:header_size-1]).digest()[-2:-1]  # pylint: disable-msg=no-member

    munged_payload = b''.join([
        payload[0:header_size-1],
        hc,
        payload[header_size:]
    ])

    return lz4_decode(munged_payload)
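

# Round-trip sketch (illustrative; assumes the lz4 backend and xxhash are
# installed): frames for old 0.8/0.9 brokers carry Kafka's buggy descriptor
# checksum, so they must go through the *_old_kafka pair, not plain lz4_decode.
#
#     >>> msg = b'v0 message set'
#     >>> lz4_decode_old_kafka(lz4_encode_old_kafka(msg)) == msg
#     True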