# fetcher.py

from __future__ import absolute_import

import collections
import copy
import logging
import random
import sys
import time

from kafka.vendor import six

import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.message import PartialMessage
from kafka.protocol.offset import (
    OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
)
from kafka.serializer import Deserializer
from kafka.structs import TopicPartition, OffsetAndTimestamp

log = logging.getLogger(__name__)


ConsumerRecord = collections.namedtuple("ConsumerRecord",
    ["topic", "partition", "offset", "timestamp", "timestamp_type",
     "key", "value", "checksum", "serialized_key_size", "serialized_value_size"])


class NoOffsetForPartitionError(Errors.KafkaError):
    pass


class RecordTooLargeError(Errors.KafkaError):
    pass


class Fetcher(six.Iterator):
    DEFAULT_CONFIG = {
        'key_deserializer': None,
        'value_deserializer': None,
        'fetch_min_bytes': 1,
        'fetch_max_wait_ms': 500,
        'fetch_max_bytes': 52428800,
        'max_partition_fetch_bytes': 1048576,
        'max_poll_records': sys.maxsize,
        'check_crcs': True,
        'skip_double_compressed_messages': False,
        'iterator_refetch_records': 1,  # undocumented -- interface may change
        'metric_group_prefix': 'consumer',
        'api_version': (0, 8, 0),
        'retry_backoff_ms': 100
    }
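
    # How overrides reach this dict (a sketch, not part of the public API):
    # KafkaConsumer passes its own **configs straight through, and __init__
    # below copies only the keys that appear in DEFAULT_CONFIG, e.g.
    #
    #   Fetcher(client, subscriptions, metrics,
    #           fetch_min_bytes=1024, check_crcs=False)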

    def __init__(self, client, subscriptions, metrics, **configs):
        """Initialize a Kafka Message Fetcher.

        Keyword Arguments:
            key_deserializer (callable, optional): Any callable that takes a
                raw message key and returns a deserialized key.
            value_deserializer (callable, optional): Any callable that takes a
                raw message value and returns a deserialized value.
            fetch_min_bytes (int): Minimum amount of data the server should
                return for a fetch request, otherwise wait up to
                fetch_max_wait_ms for more data to accumulate. Default: 1.
            fetch_max_wait_ms (int): The maximum amount of time in milliseconds
                the server will block before answering the fetch request if
                there isn't sufficient data to immediately satisfy the
                requirement given by fetch_min_bytes. Default: 500.
            fetch_max_bytes (int): The maximum amount of data the server should
                return for a fetch request. This is not an absolute maximum; if
                the first message in the first non-empty partition of the fetch
                is larger than this value, the message will still be returned
                to ensure that the consumer can make progress. NOTE: the
                consumer performs fetches to multiple brokers in parallel, so
                memory usage will depend on the number of brokers containing
                partitions for the topic.
                Supported Kafka version >= 0.10.1.0. Default: 52428800 (50 MB).
            max_partition_fetch_bytes (int): The maximum amount of data
                per-partition the server will return. The maximum total memory
                used for a request = #partitions * max_partition_fetch_bytes.
                This size must be at least as large as the maximum message size
                the server allows, or else it is possible for the producer to
                send messages larger than the consumer can fetch. If that
                happens, the consumer can get stuck trying to fetch a large
                message on a certain partition. Default: 1048576.
            check_crcs (bool): Automatically check the CRC32 of the records
                consumed. This ensures no on-the-wire or on-disk corruption to
                the messages occurred. This check adds some overhead, so it may
                be disabled in cases seeking extreme performance. Default: True
            skip_double_compressed_messages (bool): A bug in KafkaProducer
                caused some messages to be corrupted via double-compression.
                By default, the fetcher will return the messages as a compressed
                blob of bytes with a single offset, i.e. how the message was
                actually published to the cluster. If you prefer to have the
                fetcher automatically detect corrupt messages and skip them,
                set this option to True. Default: False.
        """
        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]

        self._client = client
        self._subscriptions = subscriptions
        self._records = collections.deque()  # (offset, topic_partition, messages)
        self._unauthorized_topics = set()
        self._offset_out_of_range_partitions = dict()  # {topic_partition: offset}
        self._record_too_large_partitions = dict()  # {topic_partition: offset}
        self._iterator = None
        self._fetch_futures = collections.deque()
        self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])

    def send_fetches(self):
        """Send FetchRequests asynchronously for all assigned partitions.

        Note: noop if there are unconsumed records internal to the fetcher

        Returns:
            List of Futures: each future resolves to a FetchResponse
        """
        futures = []
        for node_id, request in six.iteritems(self._create_fetch_requests()):
            if self._client.ready(node_id):
                log.debug("Sending FetchRequest to node %s", node_id)
                future = self._client.send(node_id, request)
                future.error_on_callbacks = True
                future.add_callback(self._handle_fetch_response, request, time.time())
                future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
                futures.append(future)
        self._fetch_futures.extend(futures)
        self._clean_done_fetch_futures()
        return futures
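
    # A rough sketch of how this is typically driven (KafkaConsumer does the
    # equivalent internally; `client`, `subscriptions`, and `metrics` below are
    # placeholders for already-constructed collaborators):
    #
    #   fetcher = Fetcher(client, subscriptions, metrics)
    #   fetcher.send_fetches()                 # pipeline FetchRequests
    #   client.poll(timeout_ms=500)            # drive network I/O and callbacks
    #   records, partial = fetcher.fetched_records()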

    def _clean_done_fetch_futures(self):
        while True:
            if not self._fetch_futures:
                break
            if not self._fetch_futures[0].is_done:
                break
            self._fetch_futures.popleft()

    def in_flight_fetches(self):
        """Return True if there are any unprocessed FetchRequests in flight."""
        self._clean_done_fetch_futures()
        return bool(self._fetch_futures)

    def update_fetch_positions(self, partitions):
        """Update the fetch positions for the provided partitions.

        Arguments:
            partitions (list of TopicPartitions): partitions to update

        Raises:
            NoOffsetForPartitionError: if no offset is stored for a given
                partition and no reset policy is available
        """
        # reset the fetch position to the committed position
        for tp in partitions:
            if not self._subscriptions.is_assigned(tp):
                log.warning("partition %s is not assigned - skipping offset"
                            " update", tp)
                continue
            elif self._subscriptions.is_fetchable(tp):
                log.warning("partition %s is still fetchable -- skipping offset"
                            " update", tp)
                continue

            # TODO: If there are several offsets to reset, we could submit
            # offset requests in parallel; for now, each call to _reset_offset
            # will block
            if self._subscriptions.is_offset_reset_needed(tp):
                self._reset_offset(tp)
            elif self._subscriptions.assignment[tp].committed is None:
                # there's no committed position, so we need to reset with the
                # default strategy
                self._subscriptions.need_offset_reset(tp)
                self._reset_offset(tp)
            else:
                committed = self._subscriptions.assignment[tp].committed
                log.debug("Resetting offset for partition %s to the committed"
                          " offset %s", tp, committed)
                self._subscriptions.seek(tp, committed)

    def get_offsets_by_times(self, timestamps, timeout_ms):
        offsets = self._retrieve_offsets(timestamps, timeout_ms)
        for tp in timestamps:
            if tp not in offsets:
                offsets[tp] = None
            else:
                offset, timestamp = offsets[tp]
                offsets[tp] = OffsetAndTimestamp(offset, timestamp)
        return offsets

    def beginning_offsets(self, partitions, timeout_ms):
        return self.beginning_or_end_offset(
            partitions, OffsetResetStrategy.EARLIEST, timeout_ms)

    def end_offsets(self, partitions, timeout_ms):
        return self.beginning_or_end_offset(
            partitions, OffsetResetStrategy.LATEST, timeout_ms)

    def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
        timestamps = dict([(tp, timestamp) for tp in partitions])
        offsets = self._retrieve_offsets(timestamps, timeout_ms)
        for tp in timestamps:
            offsets[tp] = offsets[tp][0]
        return offsets
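
    # Illustrative only: these helpers back the public KafkaConsumer methods
    # offsets_for_times(), beginning_offsets() and end_offsets(). A hedged
    # sketch, assuming `fetcher` is an initialized Fetcher and `tp` is a
    # TopicPartition the consumer is assigned:
    #
    #   earliest = fetcher.beginning_offsets([tp], timeout_ms=5000)  # {tp: first offset}
    #   latest = fetcher.end_offsets([tp], timeout_ms=5000)          # {tp: high watermark}
    #   by_time = fetcher.get_offsets_by_times({tp: 1490000000000}, 5000)
    #   # by_time[tp] is an OffsetAndTimestamp, or None if no such offset exists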

    def _reset_offset(self, partition):
        """Reset offsets for the given partition using the offset reset strategy.

        Arguments:
            partition (TopicPartition): the partition that needs reset offset

        Raises:
            NoOffsetForPartitionError: if no offset reset strategy is defined
        """
        timestamp = self._subscriptions.assignment[partition].reset_strategy
        if timestamp is OffsetResetStrategy.EARLIEST:
            strategy = 'earliest'
        elif timestamp is OffsetResetStrategy.LATEST:
            strategy = 'latest'
        else:
            raise NoOffsetForPartitionError(partition)

        log.debug("Resetting offset for partition %s to %s offset.",
                  partition, strategy)
        offsets = self._retrieve_offsets({partition: timestamp})
        if partition not in offsets:
            raise NoOffsetForPartitionError(partition)
        offset = offsets[partition][0]

        # we might lose the assignment while fetching the offset,
        # so check it is still active
        if self._subscriptions.is_assigned(partition):
            self._subscriptions.seek(partition, offset)

    def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
        """Fetch offset for each partition passed in ``timestamps`` map.

        Blocks until offsets are obtained, a non-retriable exception is raised,
        or ``timeout_ms`` has passed.

        Arguments:
            timestamps: {TopicPartition: int} dict with timestamps to fetch
                offsets by. -1 for the latest available, -2 for the earliest
                available. Otherwise timestamp is treated as epoch milliseconds.

        Returns:
            {TopicPartition: (int, int)}: Mapping of partition to
                retrieved offset and timestamp. If offset does not exist for
                the provided timestamp, that partition will be missing from
                this mapping.
        """
        if not timestamps:
            return {}

        start_time = time.time()
        remaining_ms = timeout_ms
        while remaining_ms > 0:
            future = self._send_offset_requests(timestamps)
            self._client.poll(future=future, timeout_ms=remaining_ms)

            if future.succeeded():
                return future.value
            if not future.retriable():
                raise future.exception  # pylint: disable-msg=raising-bad-type

            elapsed_ms = (time.time() - start_time) * 1000
            remaining_ms = timeout_ms - elapsed_ms
            if remaining_ms < 0:
                break

            if future.exception.invalid_metadata:
                refresh_future = self._client.cluster.request_update()
                self._client.poll(future=refresh_future, timeout_ms=remaining_ms)
            else:
                time.sleep(self.config['retry_backoff_ms'] / 1000.0)

            elapsed_ms = (time.time() - start_time) * 1000
            remaining_ms = timeout_ms - elapsed_ms

        raise Errors.KafkaTimeoutError(
            "Failed to get offsets by timestamps in %s ms" % timeout_ms)

    def _raise_if_offset_out_of_range(self):
        """Check FetchResponses for offset out of range.

        Raises:
            OffsetOutOfRangeError: if any partition from previous FetchResponse
                contains OffsetOutOfRangeError and the default_reset_policy is
                None
        """
        if not self._offset_out_of_range_partitions:
            return

        current_out_of_range_partitions = {}

        # filter only the fetchable partitions
        for partition, offset in six.iteritems(self._offset_out_of_range_partitions):
            if not self._subscriptions.is_fetchable(partition):
                log.debug("Ignoring fetched records for %s since it is no"
                          " longer fetchable", partition)
                continue
            position = self._subscriptions.assignment[partition].position
            # ignore partition if the current position != offset in FetchResponse
            # e.g. after seek()
            if position is not None and offset == position:
                current_out_of_range_partitions[partition] = position

        self._offset_out_of_range_partitions.clear()
        if current_out_of_range_partitions:
            raise Errors.OffsetOutOfRangeError(current_out_of_range_partitions)

    def _raise_if_unauthorized_topics(self):
        """Check FetchResponses for topic authorization failures.

        Raises:
            TopicAuthorizationFailedError
        """
        if self._unauthorized_topics:
            topics = set(self._unauthorized_topics)
            self._unauthorized_topics.clear()
            raise Errors.TopicAuthorizationFailedError(topics)

    def _raise_if_record_too_large(self):
        """Check FetchResponses for messages larger than the max per partition.

        Raises:
            RecordTooLargeError: if there is a message larger than fetch size
        """
        if not self._record_too_large_partitions:
            return

        copied_record_too_large_partitions = dict(self._record_too_large_partitions)
        self._record_too_large_partitions.clear()

        raise RecordTooLargeError(
            "There are some messages at [Partition=Offset]: %s"
            " whose size is larger than the fetch size %s"
            " and hence cannot be ever returned."
            " Increase the fetch size, or decrease the maximum message"
            " size the broker will allow.",
            copied_record_too_large_partitions,
            self.config['max_partition_fetch_bytes'])

    def fetched_records(self, max_records=None):
        """Returns previously fetched records and updates consumed offsets.

        Arguments:
            max_records (int): Maximum number of records returned. Defaults
                to max_poll_records configuration.

        Raises:
            OffsetOutOfRangeError: if no subscription offset_reset_strategy
            InvalidMessageError: if message crc validation fails (check_crcs
                must be set to True)
            RecordTooLargeError: if a message is larger than the currently
                configured max_partition_fetch_bytes
            TopicAuthorizationError: if consumer is not authorized to fetch
                messages from the topic

        Returns: (records (dict), partial (bool))
            records: {TopicPartition: [messages]}
            partial: True if records returned did not fully drain any pending
                partition requests. This may be useful for choosing when to
                pipeline additional fetch requests.
        """
        if max_records is None:
            max_records = self.config['max_poll_records']
        assert max_records > 0

        if self._subscriptions.needs_partition_assignment:
            return {}, False

        self._raise_if_offset_out_of_range()
        self._raise_if_unauthorized_topics()
        self._raise_if_record_too_large()

        drained = collections.defaultdict(list)
        partial = bool(self._records and max_records)
        while self._records and max_records > 0:
            part = self._records.popleft()
            max_records -= self._append(drained, part, max_records)
            if part.has_more():
                self._records.appendleft(part)
            else:
                partial &= False
        return dict(drained), partial
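
    # For illustration: with two assigned partitions and max_records=500, the
    # return value has the shape below (topic name and offsets are made up):
    #
    #   records == {TopicPartition('my-topic', 0): [ConsumerRecord(...), ...],
    #               TopicPartition('my-topic', 1): [ConsumerRecord(...), ...]}
    #   partial == True   # only if some buffered partition data was left undrained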

    def _append(self, drained, part, max_records):
        tp = part.topic_partition
        fetch_offset = part.fetch_offset
        if not self._subscriptions.is_assigned(tp):
            # this can happen when a rebalance happened before
            # fetched records are returned to the consumer's poll call
            log.debug("Not returning fetched records for partition %s"
                      " since it is no longer assigned", tp)
        else:
            # note that the position should always be available
            # as long as the partition is still assigned
            position = self._subscriptions.assignment[tp].position
            if not self._subscriptions.is_fetchable(tp):
                # this can happen when a partition is paused before
                # fetched records are returned to the consumer's poll call
                log.debug("Not returning fetched records for assigned partition"
                          " %s since it is no longer fetchable", tp)

            elif fetch_offset == position:
                part_records = part.take(max_records)
                if not part_records:
                    return 0
                next_offset = part_records[-1].offset + 1

                log.log(0, "Returning fetched records at offset %d for assigned"
                           " partition %s and update position to %s", position,
                           tp, next_offset)

                for record in part_records:
                    # Fetched compressed messages may include additional records
                    if record.offset < fetch_offset:
                        log.debug("Skipping message offset: %s (expecting %s)",
                                  record.offset, fetch_offset)
                        continue
                    drained[tp].append(record)

                self._subscriptions.assignment[tp].position = next_offset
                return len(part_records)

            else:
                # these records aren't next in line based on the last consumed
                # position; ignore them, they must be from an obsolete request
                log.debug("Ignoring fetched records for %s at offset %s since"
                          " the current position is %d", tp, part.fetch_offset,
                          position)

        part.discard()
        return 0

    def _message_generator(self):
        """Iterate over fetched_records"""
        if self._subscriptions.needs_partition_assignment:
            # end iteration; raising StopIteration inside a generator is
            # disallowed by PEP 479 on newer Python versions
            return

        while self._records:

            # Check on each iteration since this is a generator
            self._raise_if_offset_out_of_range()
            self._raise_if_unauthorized_topics()
            self._raise_if_record_too_large()

            # Send additional FetchRequests when the internal queue is low
            # this should enable moderate pipelining
            if len(self._records) <= self.config['iterator_refetch_records']:
                self.send_fetches()

            part = self._records.popleft()

            tp = part.topic_partition
            fetch_offset = part.fetch_offset
            if not self._subscriptions.is_assigned(tp):
                # this can happen when a rebalance happened before
                # fetched records are returned
                log.debug("Not returning fetched records for partition %s"
                          " since it is no longer assigned", tp)
                continue

            # note that the position should always be available
            # as long as the partition is still assigned
            position = self._subscriptions.assignment[tp].position
            if not self._subscriptions.is_fetchable(tp):
                # this can happen when a partition is paused before
                # fetched records are returned
                log.debug("Not returning fetched records for assigned partition"
                          " %s since it is no longer fetchable", tp)

            elif fetch_offset == position:
                log.log(0, "Returning fetched records at offset %d for assigned"
                           " partition %s", position, tp)

                # We can ignore any prior signal to drop pending message sets
                # because we are starting from a fresh one where fetch_offset == position
                # i.e., the user seek()'d to this position
                self._subscriptions.assignment[tp].drop_pending_message_set = False

                for msg in part.messages:

                    # Because we are in a generator, it is possible for
                    # subscription state to change between yield calls
                    # so we need to re-check on each loop
                    # this should catch assignment changes, pauses
                    # and resets via seek_to_beginning / seek_to_end
                    if not self._subscriptions.is_fetchable(tp):
                        log.debug("Not returning fetched records for partition %s"
                                  " since it is no longer fetchable", tp)
                        break

                    # If there is a seek during message iteration,
                    # we should stop unpacking this message set and
                    # wait for a new fetch response that aligns with the
                    # new seek position
                    elif self._subscriptions.assignment[tp].drop_pending_message_set:
                        log.debug("Skipping remainder of message set for partition %s", tp)
                        self._subscriptions.assignment[tp].drop_pending_message_set = False
                        break

                    # Compressed messagesets may include earlier messages
                    elif msg.offset < self._subscriptions.assignment[tp].position:
                        log.debug("Skipping message offset: %s (expecting %s)",
                                  msg.offset,
                                  self._subscriptions.assignment[tp].position)
                        continue

                    self._subscriptions.assignment[tp].position = msg.offset + 1
                    yield msg

            else:
                # these records aren't next in line based on the last consumed
                # position; ignore them, they must be from an obsolete request
                log.debug("Ignoring fetched records for %s at offset %s since"
                          " the current position is %d", tp, part.fetch_offset,
                          position)

    def _unpack_message_set(self, tp, messages):
        try:
            for offset, size, msg in messages:
                if self.config['check_crcs'] and not msg.validate_crc():
                    raise Errors.InvalidMessageError(msg)
                elif msg.is_compressed():
                    # If relative offsets are used, we need to decompress the
                    # entire message first to compute the absolute offset.
                    inner_mset = msg.decompress()

                    # There should only ever be a single layer of compression
                    if inner_mset[0][-1].is_compressed():
                        log.warning('MessageSet at %s offset %d appears '
                                    ' double-compressed. This should not'
                                    ' happen -- check your producers!',
                                    tp, offset)
                        if self.config['skip_double_compressed_messages']:
                            log.warning('Skipping double-compressed message at'
                                        ' %s %d', tp, offset)
                            continue

                    if msg.magic > 0:
                        last_offset, _, _ = inner_mset[-1]
                        absolute_base_offset = offset - last_offset
                    else:
                        absolute_base_offset = -1

                    for inner_offset, inner_size, inner_msg in inner_mset:
                        if msg.magic > 0:
                            # When magic value is greater than 0, the timestamp
                            # of a compressed message depends on the
                            # timestamp type of the wrapper message:
                            if msg.timestamp_type == 0:  # CREATE_TIME (0)
                                inner_timestamp = inner_msg.timestamp
                            elif msg.timestamp_type == 1:  # LOG_APPEND_TIME (1)
                                inner_timestamp = msg.timestamp
                            else:
                                raise ValueError('Unknown timestamp type: {0}'.format(msg.timestamp_type))
                        else:
                            inner_timestamp = msg.timestamp

                        if absolute_base_offset >= 0:
                            inner_offset += absolute_base_offset

                        key = self._deserialize(
                            self.config['key_deserializer'],
                            tp.topic, inner_msg.key)
                        value = self._deserialize(
                            self.config['value_deserializer'],
                            tp.topic, inner_msg.value)
                        yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
                                             inner_timestamp, msg.timestamp_type,
                                             key, value, inner_msg.crc,
                                             len(inner_msg.key) if inner_msg.key is not None else -1,
                                             len(inner_msg.value) if inner_msg.value is not None else -1)

                else:
                    key = self._deserialize(
                        self.config['key_deserializer'],
                        tp.topic, msg.key)
                    value = self._deserialize(
                        self.config['value_deserializer'],
                        tp.topic, msg.value)
                    yield ConsumerRecord(tp.topic, tp.partition, offset,
                                         msg.timestamp, msg.timestamp_type,
                                         key, value, msg.crc,
                                         len(msg.key) if msg.key is not None else -1,
                                         len(msg.value) if msg.value is not None else -1)

        # If unpacking raises StopIteration, it is erroneously
        # caught by the generator. We want all exceptions to be raised
        # back to the user. See Issue 545
        except StopIteration as e:
            log.exception('StopIteration raised unpacking messageset: %s', e)
            raise Exception('StopIteration raised unpacking messageset')

        # If unpacking raises AssertionError, it means decompression unsupported
        # See Issue 1033
        except AssertionError as e:
            log.exception('AssertionError raised unpacking messageset: %s', e)
            raise

    def __iter__(self):  # pylint: disable=non-iterator-returned
        return self

    def __next__(self):
        if not self._iterator:
            self._iterator = self._message_generator()
        try:
            return next(self._iterator)
        except StopIteration:
            self._iterator = None
            raise

    def _deserialize(self, f, topic, bytes_):
        if not f:
            return bytes_
        if isinstance(f, Deserializer):
            return f.deserialize(topic, bytes_)
        return f(bytes_)
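
    # A minimal sketch of the two deserializer forms _deserialize accepts
    # (purely illustrative; JSON payloads are an assumption, not a requirement):
    #
    #   value_deserializer=lambda v: json.loads(v.decode('utf-8'))
    #
    #   class JsonDeserializer(Deserializer):
    #       def deserialize(self, topic, bytes_):
    #           return json.loads(bytes_.decode('utf-8'))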

    def _send_offset_requests(self, timestamps):
        """Fetch offsets for each partition in the timestamps dict. This may
        send requests to multiple nodes, depending on which node is the leader
        for each partition.

        Arguments:
            timestamps (dict): {TopicPartition: int} mapping of fetching
                timestamps.

        Returns:
            Future: resolves to a mapping of retrieved offsets
        """
        timestamps_by_node = collections.defaultdict(dict)
        for partition, timestamp in six.iteritems(timestamps):
            node_id = self._client.cluster.leader_for_partition(partition)
            if node_id is None:
                self._client.add_topic(partition.topic)
                log.debug("Partition %s is unknown for fetching offset,"
                          " wait for metadata refresh", partition)
                return Future().failure(Errors.StaleMetadata(partition))
            elif node_id == -1:
                log.debug("Leader for partition %s unavailable for fetching "
                          "offset, wait for metadata refresh", partition)
                return Future().failure(
                    Errors.LeaderNotAvailableError(partition))
            else:
                timestamps_by_node[node_id][partition] = timestamp

        # Aggregate results until we have responses from all nodes
        list_offsets_future = Future()
        responses = []
        node_count = len(timestamps_by_node)

        def on_success(value):
            responses.append(value)
            if len(responses) == node_count:
                offsets = {}
                for r in responses:
                    offsets.update(r)
                list_offsets_future.success(offsets)

        def on_fail(err):
            if not list_offsets_future.is_done:
                list_offsets_future.failure(err)

        for node_id, timestamps in six.iteritems(timestamps_by_node):
            _f = self._send_offset_request(node_id, timestamps)
            _f.add_callback(on_success)
            _f.add_errback(on_fail)
        return list_offsets_future

    def _send_offset_request(self, node_id, timestamps):
        by_topic = collections.defaultdict(list)
        for tp, timestamp in six.iteritems(timestamps):
            if self.config['api_version'] >= (0, 10, 1):
                data = (tp.partition, timestamp)
            else:
                data = (tp.partition, timestamp, 1)
            by_topic[tp.topic].append(data)

        if self.config['api_version'] >= (0, 10, 1):
            request = OffsetRequest[1](-1, list(six.iteritems(by_topic)))
        else:
            request = OffsetRequest[0](-1, list(six.iteritems(by_topic)))

        # Client returns a future that only fails on network issues,
        # so create a separate future and attach a callback to update it
        # based on response error codes
        future = Future()

        _f = self._client.send(node_id, request)
        _f.add_callback(self._handle_offset_response, future)
        _f.add_errback(lambda e: future.failure(e))
        return future

    def _handle_offset_response(self, future, response):
        """Callback for the response of the list offset call above.

        Arguments:
            future (Future): the future to update based on response
            response (OffsetResponse): response from the server

        Raises:
            AssertionError: if response does not match partition
        """
        timestamp_offset_map = {}
        for topic, part_data in response.topics:
            for partition_info in part_data:
                partition, error_code = partition_info[:2]
                partition = TopicPartition(topic, partition)
                error_type = Errors.for_code(error_code)
                if error_type is Errors.NoError:
                    if response.API_VERSION == 0:
                        offsets = partition_info[2]
                        assert len(offsets) <= 1, 'Expected OffsetResponse with one offset'
                        if not offsets:
                            offset = UNKNOWN_OFFSET
                        else:
                            offset = offsets[0]
                        log.debug("Handling v0 ListOffsetResponse response for %s. "
                                  "Fetched offset %s", partition, offset)
                        if offset != UNKNOWN_OFFSET:
                            timestamp_offset_map[partition] = (offset, None)
                    else:
                        timestamp, offset = partition_info[2:]
                        log.debug("Handling ListOffsetResponse response for %s. "
                                  "Fetched offset %s, timestamp %s",
                                  partition, offset, timestamp)
                        if offset != UNKNOWN_OFFSET:
                            timestamp_offset_map[partition] = (offset, timestamp)
                elif error_type is Errors.UnsupportedForMessageFormatError:
                    # The message format on the broker side is before 0.10.0,
                    # so we simply put None in the response.
                    log.debug("Cannot search by timestamp for partition %s because the"
                              " message format version is before 0.10.0", partition)
                elif error_type is Errors.NotLeaderForPartitionError:
                    log.debug("Attempt to fetch offsets for partition %s failed due"
                              " to obsolete leadership information, retrying.",
                              partition)
                    future.failure(error_type(partition))
                    return
                elif error_type is Errors.UnknownTopicOrPartitionError:
                    log.warning("Received unknown topic or partition error in ListOffset "
                                "request for partition %s. The topic/partition "
                                "may not exist or the user may not have Describe access "
                                "to it.", partition)
                    future.failure(error_type(partition))
                    return
                else:
                    log.warning("Attempt to fetch offsets for partition %s failed due to:"
                                " %s", partition, error_type)
                    future.failure(error_type(partition))
                    return
        if not future.is_done:
            future.success(timestamp_offset_map)

    def _fetchable_partitions(self):
        fetchable = self._subscriptions.fetchable_partitions()
        pending = set([part.topic_partition for part in self._records])
        return fetchable.difference(pending)

    def _create_fetch_requests(self):
        """Create fetch requests for all assigned partitions, grouped by node.

        FetchRequests skipped if no leader, or node has requests in flight

        Returns:
            dict: {node_id: FetchRequest, ...} (version depends on api_version)
        """
        # create the fetch info as a dict of lists of partition info tuples
        # which can be passed to FetchRequest() via .items()
        fetchable = collections.defaultdict(lambda: collections.defaultdict(list))

        for partition in self._fetchable_partitions():
            node_id = self._client.cluster.leader_for_partition(partition)
            position = self._subscriptions.assignment[partition].position

            # fetch if there is a leader and no in-flight requests
            if node_id is None or node_id == -1:
                log.debug("No leader found for partition %s."
                          " Requesting metadata update", partition)
                self._client.cluster.request_update()

            elif self._client.in_flight_request_count(node_id) == 0:
                partition_info = (
                    partition.partition,
                    position,
                    self.config['max_partition_fetch_bytes']
                )
                fetchable[node_id][partition.topic].append(partition_info)
                log.debug("Adding fetch request for partition %s at offset %d",
                          partition, position)

        if self.config['api_version'] >= (0, 10, 1):
            version = 3
        elif self.config['api_version'] >= (0, 10):
            version = 2
        elif self.config['api_version'] == (0, 9):
            version = 1
        else:
            version = 0
        requests = {}
        for node_id, partition_data in six.iteritems(fetchable):
            if version < 3:
                requests[node_id] = FetchRequest[version](
                    -1,  # replica_id
                    self.config['fetch_max_wait_ms'],
                    self.config['fetch_min_bytes'],
                    partition_data.items())
            else:
                # As of version == 3 partitions will be returned in the order
                # they are requested, so to avoid starvation with the
                # `fetch_max_bytes` option we need this shuffle
                # NOTE: we do have partition_data in random order due to usage
                #       of unordered structures like dicts, but that does not
                #       guarantee equal distribution, and starting in Python 3.6
                #       dicts retain insert order.
                partition_data = list(partition_data.items())
                random.shuffle(partition_data)
                requests[node_id] = FetchRequest[version](
                    -1,  # replica_id
                    self.config['fetch_max_wait_ms'],
                    self.config['fetch_min_bytes'],
                    self.config['fetch_max_bytes'],
                    partition_data)
        return requests

    def _handle_fetch_response(self, request, send_time, response):
        """The callback for fetch completion"""
        total_bytes = 0
        total_count = 0
        recv_time = time.time()

        fetch_offsets = {}
        for topic, partitions in request.topics:
            for partition, offset, _ in partitions:
                fetch_offsets[TopicPartition(topic, partition)] = offset

        # randomized ordering should improve balance for short-lived consumers
        random.shuffle(response.topics)
        for topic, partitions in response.topics:
            random.shuffle(partitions)
            for partition, error_code, highwater, messages in partitions:
                tp = TopicPartition(topic, partition)
                error_type = Errors.for_code(error_code)
                if not self._subscriptions.is_fetchable(tp):
                    # this can happen when a rebalance happened or a partition
                    # consumption paused while fetch is still in-flight
                    log.debug("Ignoring fetched records for partition %s"
                              " since it is no longer fetchable", tp)

                elif error_type is Errors.NoError:
                    self._subscriptions.assignment[tp].highwater = highwater

                    # we are interested in this fetch only if the beginning
                    # offset (of the *request*) matches the current consumed position
                    # Note that the *response* may return a messageset that starts
                    # earlier (e.g., compressed messages) or later (e.g., compacted topic)
                    fetch_offset = fetch_offsets[tp]
                    position = self._subscriptions.assignment[tp].position
                    if position is None or position != fetch_offset:
                        log.debug("Discarding fetch response for partition %s"
                                  " since its offset %d does not match the"
                                  " expected offset %d", tp, fetch_offset,
                                  position)
                        continue

                    num_bytes = 0
                    partial = None
                    if messages and isinstance(messages[-1][-1], PartialMessage):
                        partial = messages.pop()

                    if messages:
                        log.debug("Adding fetched record for partition %s with"
                                  " offset %d to buffered record list", tp,
                                  position)
                        unpacked = list(self._unpack_message_set(tp, messages))
                        self._records.append(self.PartitionRecords(fetch_offset, tp, unpacked))
                        last_offset, _, _ = messages[-1]
                        self._sensors.records_fetch_lag.record(highwater - last_offset)
                        num_bytes = sum(msg[1] for msg in messages)
                    elif partial:
                        # we did not read a single message from a non-empty
                        # buffer because that message's size is larger than the
                        # fetch size; in this case record this exception
                        self._record_too_large_partitions[tp] = fetch_offset

                    self._sensors.record_topic_fetch_metrics(topic, num_bytes, len(messages))
                    total_bytes += num_bytes
                    total_count += len(messages)
                elif error_type in (Errors.NotLeaderForPartitionError,
                                    Errors.UnknownTopicOrPartitionError):
                    self._client.cluster.request_update()
                elif error_type is Errors.OffsetOutOfRangeError:
                    fetch_offset = fetch_offsets[tp]
                    log.info("Fetch offset %s is out of range for topic-partition %s", fetch_offset, tp)
                    if self._subscriptions.has_default_offset_reset_policy():
                        self._subscriptions.need_offset_reset(tp)
                        log.info("Resetting offset for topic-partition %s", tp)
                    else:
                        self._offset_out_of_range_partitions[tp] = fetch_offset
                elif error_type is Errors.TopicAuthorizationFailedError:
                    log.warning("Not authorized to read from topic %s.", tp.topic)
                    self._unauthorized_topics.add(tp.topic)
                elif error_type is Errors.UnknownError:
                    log.warning("Unknown error fetching data for topic-partition %s", tp)
                else:
                    raise error_type('Unexpected error while fetching data')

        # Because we are currently decompressing messages lazily, the sensors here
        # will get compressed bytes / message set stats when compression is enabled
        self._sensors.bytes_fetched.record(total_bytes)
        self._sensors.records_fetched.record(total_count)
        if response.API_VERSION >= 1:
            self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms)
        self._sensors.fetch_latency.record((recv_time - send_time) * 1000)

    class PartitionRecords(object):
        def __init__(self, fetch_offset, tp, messages):
            self.fetch_offset = fetch_offset
            self.topic_partition = tp
            self.messages = messages
            # Default to the first message so take()/has_more() never hit an
            # unset attribute if fetch_offset is not found in the loop below
            self.message_idx = 0
            # When fetching an offset that is in the middle of a
            # compressed batch, we will get all messages in the batch.
            # But we want to start 'take' at the fetch_offset
            for i, msg in enumerate(messages):
                if msg.offset == fetch_offset:
                    self.message_idx = i

        def discard(self):
            self.messages = None

        def take(self, n):
            if not self.has_more():
                return []
            next_idx = self.message_idx + n
            res = self.messages[self.message_idx:next_idx]
            self.message_idx = next_idx
            if self.has_more():
                self.fetch_offset = self.messages[self.message_idx].offset
            return res

        def has_more(self):
            return self.messages and self.message_idx < len(self.messages)


class FetchManagerMetrics(object):
    def __init__(self, metrics, prefix):
        self.metrics = metrics
        self.group_name = '%s-fetch-manager-metrics' % prefix

        self.bytes_fetched = metrics.sensor('bytes-fetched')
        self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name,
            'The average number of bytes fetched per request'), Avg())
        self.bytes_fetched.add(metrics.metric_name('fetch-size-max', self.group_name,
            'The maximum number of bytes fetched per request'), Max())
        self.bytes_fetched.add(metrics.metric_name('bytes-consumed-rate', self.group_name,
            'The average number of bytes consumed per second'), Rate())

        self.records_fetched = self.metrics.sensor('records-fetched')
        self.records_fetched.add(metrics.metric_name('records-per-request-avg', self.group_name,
            'The average number of records in each request'), Avg())
        self.records_fetched.add(metrics.metric_name('records-consumed-rate', self.group_name,
            'The average number of records consumed per second'), Rate())

        self.fetch_latency = metrics.sensor('fetch-latency')
        self.fetch_latency.add(metrics.metric_name('fetch-latency-avg', self.group_name,
            'The average time taken for a fetch request.'), Avg())
        self.fetch_latency.add(metrics.metric_name('fetch-latency-max', self.group_name,
            'The max time taken for any fetch request.'), Max())
        self.fetch_latency.add(metrics.metric_name('fetch-rate', self.group_name,
            'The number of fetch requests per second.'), Rate(sampled_stat=Count()))

        self.records_fetch_lag = metrics.sensor('records-lag')
        self.records_fetch_lag.add(metrics.metric_name('records-lag-max', self.group_name,
            'The maximum lag in terms of number of records for any partition in this window'), Max())

        self.fetch_throttle_time_sensor = metrics.sensor('fetch-throttle-time')
        self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-avg', self.group_name,
            'The average throttle time in ms'), Avg())
        self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-max', self.group_name,
            'The maximum throttle time in ms'), Max())

    def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
        # record bytes fetched
        name = '.'.join(['topic', topic, 'bytes-fetched'])
        bytes_fetched = self.metrics.get_sensor(name)
        if not bytes_fetched:
            metric_tags = {'topic': topic.replace('.', '_')}

            bytes_fetched = self.metrics.sensor(name)
            bytes_fetched.add(self.metrics.metric_name('fetch-size-avg',
                self.group_name,
                'The average number of bytes fetched per request for topic %s' % topic,
                metric_tags), Avg())
            bytes_fetched.add(self.metrics.metric_name('fetch-size-max',
                self.group_name,
                'The maximum number of bytes fetched per request for topic %s' % topic,
                metric_tags), Max())
            bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate',
                self.group_name,
                'The average number of bytes consumed per second for topic %s' % topic,
                metric_tags), Rate())
        bytes_fetched.record(num_bytes)

        # record records fetched
        name = '.'.join(['topic', topic, 'records-fetched'])
        records_fetched = self.metrics.get_sensor(name)
        if not records_fetched:
            metric_tags = {'topic': topic.replace('.', '_')}

            records_fetched = self.metrics.sensor(name)
            records_fetched.add(self.metrics.metric_name('records-per-request-avg',
                self.group_name,
                'The average number of records in each request for topic %s' % topic,
                metric_tags), Avg())
            records_fetched.add(self.metrics.metric_name('records-consumed-rate',
                self.group_name,
                'The average number of records consumed per second for topic %s' % topic,
                metric_tags), Rate())
        records_fetched.record(num_records)
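

# A hedged, self-contained usage sketch. The Fetcher is an internal component;
# applications normally exercise it indirectly through KafkaConsumer, which
# constructs a Fetcher and forwards the fetch_* / *_deserializer options shown
# here. The broker address and topic name below are placeholders.
if __name__ == '__main__':
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'example-topic',                      # placeholder topic
        bootstrap_servers='localhost:9092',   # placeholder broker
        auto_offset_reset='earliest',
        fetch_max_wait_ms=500,
        fetch_min_bytes=1,
        max_partition_fetch_bytes=1048576,
        value_deserializer=lambda v: v.decode('utf-8', errors='replace'))

    # Each iteration drains records buffered by the underlying Fetcher
    for record in consumer:
        print(record.topic, record.partition, record.offset, record.value)
        break

    consumer.close()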