buffer.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. from __future__ import absolute_import, division
  2. import collections
  3. import io
  4. import threading
  5. import time
  6. from ..codec import (has_gzip, has_snappy, has_lz4,
  7. gzip_encode, snappy_encode,
  8. lz4_encode, lz4_encode_old_kafka)
  9. from .. import errors as Errors
  10. from ..metrics.stats import Rate
  11. from ..protocol.types import Int32, Int64
  12. from ..protocol.message import MessageSet, Message
  13. class MessageSetBuffer(object):
  14. """Wrap a buffer for writing MessageSet batches.
  15. Arguments:
  16. buf (IO stream): a buffer for writing data. Typically BytesIO.
  17. batch_size (int): maximum number of bytes to write to the buffer.
  18. Keyword Arguments:
  19. compression_type ('gzip', 'snappy', None): compress messages before
  20. publishing. Default: None.
  21. """
  22. _COMPRESSORS = {
  23. 'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP),
  24. 'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
  25. 'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
  26. 'lz4-old-kafka': (has_lz4, lz4_encode_old_kafka, Message.CODEC_LZ4),
  27. }
  28. def __init__(self, buf, batch_size, compression_type=None, message_version=0):
  29. if compression_type is not None:
  30. assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
  31. # Kafka 0.8/0.9 had a quirky lz4...
  32. if compression_type == 'lz4' and message_version == 0:
  33. compression_type = 'lz4-old-kafka'
  34. checker, encoder, attributes = self._COMPRESSORS[compression_type]
  35. assert checker(), 'Compression Libraries Not Found'
  36. self._compressor = encoder
  37. self._compression_attributes = attributes
  38. else:
  39. self._compressor = None
  40. self._compression_attributes = None
  41. self._message_version = message_version
  42. self._buffer = buf
  43. # Init MessageSetSize to 0 -- update on close
  44. self._buffer.seek(0)
  45. self._buffer.write(Int32.encode(0))
  46. self._batch_size = batch_size
  47. self._closed = False
  48. self._messages = 0
  49. self._bytes_written = 4 # Int32 header is 4 bytes
  50. self._final_size = None
  51. def append(self, offset, message):
  52. """Append a Message to the MessageSet.
  53. Arguments:
  54. offset (int): offset of the message
  55. message (Message or bytes): message struct or encoded bytes
  56. Returns: bytes written
  57. """
  58. if isinstance(message, Message):
  59. encoded = message.encode()
  60. else:
  61. encoded = bytes(message)
  62. msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded
  63. self._buffer.write(msg)
  64. self._messages += 1
  65. self._bytes_written += len(msg)
  66. return len(msg)
  67. def has_room_for(self, key, value):
  68. if self._closed:
  69. return False
  70. if not self._messages:
  71. return True
  72. needed_bytes = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
  73. if key is not None:
  74. needed_bytes += len(key)
  75. if value is not None:
  76. needed_bytes += len(value)
  77. return self._buffer.tell() + needed_bytes < self._batch_size
  78. def is_full(self):
  79. if self._closed:
  80. return True
  81. return self._buffer.tell() >= self._batch_size
  82. def close(self):
  83. # This method may be called multiple times on the same batch
  84. # i.e., on retries
  85. # we need to make sure we only close it out once
  86. # otherwise compressed messages may be double-compressed
  87. # see Issue 718
  88. if not self._closed:
  89. if self._compressor:
  90. # TODO: avoid copies with bytearray / memoryview
  91. uncompressed_size = self._buffer.tell()
  92. self._buffer.seek(4)
  93. msg = Message(self._compressor(self._buffer.read(uncompressed_size - 4)),
  94. attributes=self._compression_attributes,
  95. magic=self._message_version)
  96. encoded = msg.encode()
  97. self._buffer.seek(4)
  98. self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg
  99. self._buffer.write(Int32.encode(len(encoded)))
  100. self._buffer.write(encoded)
  101. # Update the message set size (less the 4 byte header),
  102. # and return with buffer ready for full read()
  103. self._final_size = self._buffer.tell()
  104. self._buffer.seek(0)
  105. self._buffer.write(Int32.encode(self._final_size - 4))
  106. self._buffer.seek(0)
  107. self._closed = True
  108. def size_in_bytes(self):
  109. return self._final_size or self._buffer.tell()
  110. def compression_rate(self):
  111. return self.size_in_bytes() / self._bytes_written
  112. def buffer(self):
  113. return self._buffer
  114. class SimpleBufferPool(object):
  115. """A simple pool of BytesIO objects with a weak memory ceiling."""
  116. def __init__(self, memory, poolable_size, metrics=None, metric_group_prefix='producer-metrics'):
  117. """Create a new buffer pool.
  118. Arguments:
  119. memory (int): maximum memory that this buffer pool can allocate
  120. poolable_size (int): memory size per buffer to cache in the free
  121. list rather than deallocating
  122. """
  123. self._poolable_size = poolable_size
  124. self._lock = threading.RLock()
  125. buffers = int(memory / poolable_size) if poolable_size else 0
  126. self._free = collections.deque([io.BytesIO() for _ in range(buffers)])
  127. self._waiters = collections.deque()
  128. self.wait_time = None
  129. if metrics:
  130. self.wait_time = metrics.sensor('bufferpool-wait-time')
  131. self.wait_time.add(metrics.metric_name(
  132. 'bufferpool-wait-ratio', metric_group_prefix,
  133. 'The fraction of time an appender waits for space allocation.'),
  134. Rate())
  135. def allocate(self, size, max_time_to_block_ms):
  136. """
  137. Allocate a buffer of the given size. This method blocks if there is not
  138. enough memory and the buffer pool is configured with blocking mode.
  139. Arguments:
  140. size (int): The buffer size to allocate in bytes [ignored]
  141. max_time_to_block_ms (int): The maximum time in milliseconds to
  142. block for buffer memory to be available
  143. Returns:
  144. io.BytesIO
  145. """
  146. with self._lock:
  147. # check if we have a free buffer of the right size pooled
  148. if self._free:
  149. return self._free.popleft()
  150. elif self._poolable_size == 0:
  151. return io.BytesIO()
  152. else:
  153. # we are out of buffers and will have to block
  154. buf = None
  155. more_memory = threading.Condition(self._lock)
  156. self._waiters.append(more_memory)
  157. # loop over and over until we have a buffer or have reserved
  158. # enough memory to allocate one
  159. while buf is None:
  160. start_wait = time.time()
  161. more_memory.wait(max_time_to_block_ms / 1000.0)
  162. end_wait = time.time()
  163. if self.wait_time:
  164. self.wait_time.record(end_wait - start_wait)
  165. if self._free:
  166. buf = self._free.popleft()
  167. else:
  168. self._waiters.remove(more_memory)
  169. raise Errors.KafkaTimeoutError(
  170. "Failed to allocate memory within the configured"
  171. " max blocking time")
  172. # remove the condition for this thread to let the next thread
  173. # in line start getting memory
  174. removed = self._waiters.popleft()
  175. assert removed is more_memory, 'Wrong condition'
  176. # signal any additional waiters if there is more memory left
  177. # over for them
  178. if self._free and self._waiters:
  179. self._waiters[0].notify()
  180. # unlock and return the buffer
  181. return buf
  182. def deallocate(self, buf):
  183. """
  184. Return buffers to the pool. If they are of the poolable size add them
  185. to the free list, otherwise just mark the memory as free.
  186. Arguments:
  187. buffer_ (io.BytesIO): The buffer to return
  188. """
  189. with self._lock:
  190. # BytesIO.truncate here makes the pool somewhat pointless
  191. # but we stick with the BufferPool API until migrating to
  192. # bytesarray / memoryview. The buffer we return must not
  193. # expose any prior data on read().
  194. buf.truncate(0)
  195. self._free.append(buf)
  196. if self._waiters:
  197. self._waiters[0].notify()
  198. def queued(self):
  199. """The number of threads blocked waiting on memory."""
  200. with self._lock:
  201. return len(self._waiters)
# NOTE: The triple-quoted string below is inert. It is an unfinished port of
# the Java producer's BufferPool, kept for reference only, and is never
# executed. Both of its fresh-allocation paths still raise NotImplementedError,
# with the Java ``ByteBuffer.allocate`` calls left commented out.
# NOTE(review): if this is ever revived, three issues need attention:
# - It reads ``buffer_.capacity`` and ``self._free.pop().capacity``, but
#   io.BytesIO has no ``capacity`` attribute.
# - Its timeout path raises without removing ``more_memory`` from
#   ``self._waiters``.
# - ``start_wait``/``end_wait`` are computed but never recorded.
'''
class BufferPool(object):
    """
    A pool of ByteBuffers kept under a given memory limit. This class is fairly
    specific to the needs of the producer. In particular it has the following
    properties:

    * There is a special "poolable size" and buffers of this size are kept in a
      free list and recycled
    * It is fair. That is all memory is given to the longest waiting thread
      until it has sufficient memory. This prevents starvation or deadlock when
      a thread asks for a large chunk of memory and needs to block until
      multiple buffers are deallocated.
    """
    def __init__(self, memory, poolable_size):
        """Create a new buffer pool.

        Arguments:
            memory (int): maximum memory that this buffer pool can allocate
            poolable_size (int): memory size per buffer to cache in the free
                list rather than deallocating
        """
        self._poolable_size = poolable_size
        self._lock = threading.RLock()
        self._free = collections.deque()
        self._waiters = collections.deque()
        self._total_memory = memory
        self._available_memory = memory
        #self.metrics = metrics;
        #self.waitTime = this.metrics.sensor("bufferpool-wait-time");
        #MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation.");
        #this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));

    def allocate(self, size, max_time_to_block_ms):
        """
        Allocate a buffer of the given size. This method blocks if there is not
        enough memory and the buffer pool is configured with blocking mode.

        Arguments:
            size (int): The buffer size to allocate in bytes
            max_time_to_block_ms (int): The maximum time in milliseconds to
                block for buffer memory to be available

        Returns:
            buffer

        Raises:
            InterruptedException If the thread is interrupted while blocked
            IllegalArgumentException if size is larger than the total memory
                controlled by the pool (and hence we would block forever)
        """
        assert size <= self._total_memory, (
            "Attempt to allocate %d bytes, but there is a hard limit of %d on"
            " memory allocations." % (size, self._total_memory))

        with self._lock:
            # check if we have a free buffer of the right size pooled
            if (size == self._poolable_size and len(self._free) > 0):
                return self._free.popleft()

            # now check if the request is immediately satisfiable with the
            # memory on hand or if we need to block
            free_list_size = len(self._free) * self._poolable_size
            if self._available_memory + free_list_size >= size:
                # we have enough unallocated or pooled memory to immediately
                # satisfy the request
                self._free_up(size)
                self._available_memory -= size
                raise NotImplementedError()
                #return ByteBuffer.allocate(size)
            else:
                # we are out of memory and will have to block
                accumulated = 0
                buf = None
                more_memory = threading.Condition(self._lock)
                self._waiters.append(more_memory)
                # loop over and over until we have a buffer or have reserved
                # enough memory to allocate one
                while (accumulated < size):
                    start_wait = time.time()
                    if not more_memory.wait(max_time_to_block_ms / 1000.0):
                        raise Errors.KafkaTimeoutError(
                            "Failed to allocate memory within the configured"
                            " max blocking time")
                    end_wait = time.time()
                    #this.waitTime.record(endWait - startWait, time.milliseconds());

                    # check if we can satisfy this request from the free list,
                    # otherwise allocate memory
                    if (accumulated == 0
                        and size == self._poolable_size
                        and self._free):

                        # just grab a buffer from the free list
                        buf = self._free.popleft()
                        accumulated = size
                    else:
                        # we'll need to allocate memory, but we may only get
                        # part of what we need on this iteration
                        self._free_up(size - accumulated)
                        got = min(size - accumulated, self._available_memory)
                        self._available_memory -= got
                        accumulated += got

                # remove the condition for this thread to let the next thread
                # in line start getting memory
                removed = self._waiters.popleft()
                assert removed is more_memory, 'Wrong condition'

                # signal any additional waiters if there is more memory left
                # over for them
                if (self._available_memory > 0 or len(self._free) > 0):
                    if len(self._waiters) > 0:
                        self._waiters[0].notify()

                # unlock and return the buffer
                if buf is None:
                    raise NotImplementedError()
                    #return ByteBuffer.allocate(size)
                else:
                    return buf

    def _free_up(self, size):
        """
        Attempt to ensure we have at least the requested number of bytes of
        memory for allocation by deallocating pooled buffers (if needed)
        """
        while self._free and self._available_memory < size:
            self._available_memory += self._free.pop().capacity

    def deallocate(self, buffer_, size=None):
        """
        Return buffers to the pool. If they are of the poolable size add them
        to the free list, otherwise just mark the memory as free.

        Arguments:
            buffer (io.BytesIO): The buffer to return
            size (int): The size of the buffer to mark as deallocated, note
                that this maybe smaller than buffer.capacity since the buffer
                may re-allocate itself during in-place compression
        """
        with self._lock:
            if size is None:
                size = buffer_.capacity
            if (size == self._poolable_size and size == buffer_.capacity):
                buffer_.seek(0)
                buffer_.truncate()
                self._free.append(buffer_)
            else:
                self._available_memory += size

            if self._waiters:
                more_mem = self._waiters[0]
                more_mem.notify()

    def available_memory(self):
        """The total free memory both unallocated and in the free list."""
        with self._lock:
            return self._available_memory + len(self._free) * self._poolable_size

    def unallocated_memory(self):
        """Get the unallocated memory (not in the free list or in use)."""
        with self._lock:
            return self._available_memory

    def queued(self):
        """The number of threads blocked waiting on memory."""
        with self._lock:
            return len(self._waiters)

    def poolable_size(self):
        """The buffer size that will be retained in the free list after use."""
        return self._poolable_size

    def total_memory(self):
        """The total memory managed by this pool."""
        return self._total_memory
'''