from __future__ import absolute_import

import collections
import copy
import logging
import random
import sys
import time

from kafka.vendor import six

import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.message import PartialMessage
from kafka.protocol.offset import (
    OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
)
from kafka.serializer import Deserializer
from kafka.structs import TopicPartition, OffsetAndTimestamp

log = logging.getLogger(__name__)


ConsumerRecord = collections.namedtuple("ConsumerRecord",
    ["topic", "partition", "offset", "timestamp", "timestamp_type",
     "key", "value", "checksum", "serialized_key_size", "serialized_value_size"])
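
# Example: a ConsumerRecord as yielded by the Fetcher might look like the
# following (all values are illustrative, not taken from a real cluster):
#
#   ConsumerRecord(topic='my-topic', partition=0, offset=42,
#                  timestamp=1480395635588, timestamp_type=0,
#                  key=b'key', value=b'value', checksum=1234567890,
#                  serialized_key_size=3, serialized_value_size=5)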

class NoOffsetForPartitionError(Errors.KafkaError):
    pass


class RecordTooLargeError(Errors.KafkaError):
    pass

class Fetcher(six.Iterator):
    DEFAULT_CONFIG = {
        'key_deserializer': None,
        'value_deserializer': None,
        'fetch_min_bytes': 1,
        'fetch_max_wait_ms': 500,
        'fetch_max_bytes': 52428800,
        'max_partition_fetch_bytes': 1048576,
        'max_poll_records': sys.maxsize,
        'check_crcs': True,
        'skip_double_compressed_messages': False,
        'iterator_refetch_records': 1,  # undocumented -- interface may change
        'metric_group_prefix': 'consumer',
        'api_version': (0, 8, 0),
        'retry_backoff_ms': 100
    }
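
    # The defaults above can be overridden by passing keyword arguments to the
    # constructor; keys not present in DEFAULT_CONFIG are silently ignored.
    # A hypothetical sketch (the client/subscriptions/metrics objects are
    # assumed to exist already):
    #
    #   fetcher = Fetcher(client, subscriptions, metrics,
    #                     fetch_min_bytes=1024,
    #                     max_partition_fetch_bytes=2 * 1048576)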

    def __init__(self, client, subscriptions, metrics, **configs):
        """Initialize a Kafka Message Fetcher.

        Keyword Arguments:
            key_deserializer (callable): Any callable that takes a
                raw message key and returns a deserialized key.
            value_deserializer (callable): Any callable that takes a
                raw message value and returns a deserialized value.
            fetch_min_bytes (int): Minimum amount of data the server should
                return for a fetch request, otherwise wait up to
                fetch_max_wait_ms for more data to accumulate. Default: 1.
            fetch_max_wait_ms (int): The maximum amount of time in milliseconds
                the server will block before answering the fetch request if
                there isn't sufficient data to immediately satisfy the
                requirement given by fetch_min_bytes. Default: 500.
            fetch_max_bytes (int): The maximum amount of data the server should
                return for a fetch request. This is not an absolute maximum:
                if the first message in the first non-empty partition of the
                fetch is larger than this value, the message will still be
                returned to ensure that the consumer can make progress. NOTE:
                the consumer performs fetches to multiple brokers in parallel,
                so memory usage will depend on the number of brokers containing
                partitions for the topic.
                Supported Kafka version >= 0.10.1.0. Default: 52428800 (50 MB).
            max_partition_fetch_bytes (int): The maximum amount of data
                per-partition the server will return. The maximum total memory
                used for a request = #partitions * max_partition_fetch_bytes.
                This size must be at least as large as the maximum message size
                the server allows, or else it is possible for the producer to
                send messages larger than the consumer can fetch. If that
                happens, the consumer can get stuck trying to fetch a large
                message on a certain partition. Default: 1048576.
            check_crcs (bool): Automatically check the CRC32 of the records
                consumed. This ensures no on-the-wire or on-disk corruption to
                the messages occurred. This check adds some overhead, so it may
                be disabled in cases seeking extreme performance. Default: True
            skip_double_compressed_messages (bool): A bug in KafkaProducer
                caused some messages to be corrupted via double-compression.
                By default, the fetcher will return the messages as a compressed
                blob of bytes with a single offset, i.e. how the message was
                actually published to the cluster. If you prefer to have the
                fetcher automatically detect corrupt messages and skip them,
                set this option to True. Default: False.
        """
        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]

        self._client = client
        self._subscriptions = subscriptions
        self._records = collections.deque()  # (offset, topic_partition, messages)
        self._unauthorized_topics = set()
        self._offset_out_of_range_partitions = dict()  # {topic_partition: offset}
        self._record_too_large_partitions = dict()  # {topic_partition: offset}
        self._iterator = None
        self._fetch_futures = collections.deque()
        self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])

    def send_fetches(self):
        """Send FetchRequests asynchronously for all assigned partitions.

        Note: noop if there are unconsumed records internal to the fetcher

        Returns:
            List of Futures: each future resolves to a FetchResponse
        """
        futures = []
        for node_id, request in six.iteritems(self._create_fetch_requests()):
            if self._client.ready(node_id):
                log.debug("Sending FetchRequest to node %s", node_id)
                future = self._client.send(node_id, request)
                future.error_on_callbacks = True
                future.add_callback(self._handle_fetch_response, request, time.time())
                future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
                futures.append(future)
        self._fetch_futures.extend(futures)
        self._clean_done_fetch_futures()
        return futures
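
    # Typical driving loop around this method (sketch only; the poll/timeout
    # details are assumptions, not something this class prescribes):
    #
    #   fetcher.send_fetches()            # pipeline FetchRequests to brokers
    #   client.poll(timeout_ms=500)       # let the network client make progress
    #   records, partial = fetcher.fetched_records()
    #   if partial:
    #       fetcher.send_fetches()        # keep the pipeline full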

    def _clean_done_fetch_futures(self):
        while True:
            if not self._fetch_futures:
                break
            if not self._fetch_futures[0].is_done:
                break
            self._fetch_futures.popleft()

    def in_flight_fetches(self):
        """Return True if there are any unprocessed FetchRequests in flight."""
        self._clean_done_fetch_futures()
        return bool(self._fetch_futures)

    def update_fetch_positions(self, partitions):
        """Update the fetch positions for the provided partitions.

        Arguments:
            partitions (list of TopicPartitions): partitions to update

        Raises:
            NoOffsetForPartitionError: if no offset is stored for a given
                partition and no reset policy is available
        """
        # reset the fetch position to the committed position
        for tp in partitions:
            if not self._subscriptions.is_assigned(tp):
                log.warning("partition %s is not assigned - skipping offset"
                            " update", tp)
                continue
            elif self._subscriptions.is_fetchable(tp):
                log.warning("partition %s is still fetchable -- skipping offset"
                            " update", tp)
                continue

            # TODO: If there are several offsets to reset,
            # we could submit offset requests in parallel
            # for now, each call to _reset_offset will block
            if self._subscriptions.is_offset_reset_needed(tp):
                self._reset_offset(tp)
            elif self._subscriptions.assignment[tp].committed is None:
                # there's no committed position, so we need to reset with the
                # default strategy
                self._subscriptions.need_offset_reset(tp)
                self._reset_offset(tp)
            else:
                committed = self._subscriptions.assignment[tp].committed
                log.debug("Resetting offset for partition %s to the committed"
                          " offset %s", tp, committed)
                self._subscriptions.seek(tp, committed)
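
    # The decision above, summarized (descriptive comment only, no new behavior):
    #   reset requested            -> _reset_offset(tp) using the reset strategy
    #   no committed offset stored -> fall back to the default reset strategy
    #   committed offset available -> seek(tp, committed)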

    def get_offsets_by_times(self, timestamps, timeout_ms):
        offsets = self._retrieve_offsets(timestamps, timeout_ms)
        for tp in timestamps:
            if tp not in offsets:
                offsets[tp] = None
            else:
                offset, timestamp = offsets[tp]
                offsets[tp] = OffsetAndTimestamp(offset, timestamp)
        return offsets

    def beginning_offsets(self, partitions, timeout_ms):
        return self.beginning_or_end_offset(
            partitions, OffsetResetStrategy.EARLIEST, timeout_ms)

    def end_offsets(self, partitions, timeout_ms):
        return self.beginning_or_end_offset(
            partitions, OffsetResetStrategy.LATEST, timeout_ms)

    def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
        timestamps = dict([(tp, timestamp) for tp in partitions])
        offsets = self._retrieve_offsets(timestamps, timeout_ms)
        for tp in timestamps:
            offsets[tp] = offsets[tp][0]
        return offsets
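
    # Example of the offset lookup helpers (sketch; topic name, timestamp and
    # returned values are made up):
    #
    #   tp = TopicPartition('my-topic', 0)
    #   fetcher.get_offsets_by_times({tp: 1480395635588}, timeout_ms=5000)
    #   # -> {tp: OffsetAndTimestamp(offset=42, timestamp=1480395635588)}
    #   fetcher.beginning_offsets([tp], timeout_ms=5000)   # -> {tp: 0}
    #   fetcher.end_offsets([tp], timeout_ms=5000)         # -> {tp: 100}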

    def _reset_offset(self, partition):
        """Reset offsets for the given partition using the offset reset strategy.

        Arguments:
            partition (TopicPartition): the partition whose offset needs resetting

        Raises:
            NoOffsetForPartitionError: if no offset reset strategy is defined
        """
        timestamp = self._subscriptions.assignment[partition].reset_strategy
        if timestamp is OffsetResetStrategy.EARLIEST:
            strategy = 'earliest'
        elif timestamp is OffsetResetStrategy.LATEST:
            strategy = 'latest'
        else:
            raise NoOffsetForPartitionError(partition)

        log.debug("Resetting offset for partition %s to %s offset.",
                  partition, strategy)
        offsets = self._retrieve_offsets({partition: timestamp})
        if partition not in offsets:
            raise NoOffsetForPartitionError(partition)
        offset = offsets[partition][0]

        # we might lose the assignment while fetching the offset,
        # so check it is still active
        if self._subscriptions.is_assigned(partition):
            self._subscriptions.seek(partition, offset)

    def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
        """Fetch offset for each partition passed in the ``timestamps`` map.

        Blocks until offsets are obtained, a non-retriable exception is raised
        or ``timeout_ms`` has passed.

        Arguments:
            timestamps: {TopicPartition: int} dict with timestamps to fetch
                offsets by. -1 for the latest available, -2 for the earliest
                available. Otherwise the timestamp is treated as epoch
                milliseconds.

        Returns:
            {TopicPartition: (int, int)}: Mapping of partition to
                retrieved offset and timestamp. If an offset does not exist for
                the provided timestamp, that partition will be missing from
                this mapping.
        """
        if not timestamps:
            return {}

        start_time = time.time()
        remaining_ms = timeout_ms
        while remaining_ms > 0:
            future = self._send_offset_requests(timestamps)
            self._client.poll(future=future, timeout_ms=remaining_ms)

            if future.succeeded():
                return future.value
            if not future.retriable():
                raise future.exception  # pylint: disable-msg=raising-bad-type

            elapsed_ms = (time.time() - start_time) * 1000
            remaining_ms = timeout_ms - elapsed_ms
            if remaining_ms < 0:
                break

            if future.exception.invalid_metadata:
                refresh_future = self._client.cluster.request_update()
                self._client.poll(future=refresh_future, timeout_ms=remaining_ms)
            else:
                time.sleep(self.config['retry_backoff_ms'] / 1000.0)

            elapsed_ms = (time.time() - start_time) * 1000
            remaining_ms = timeout_ms - elapsed_ms

        raise Errors.KafkaTimeoutError(
            "Failed to get offsets by timestamps in %s ms" % timeout_ms)
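
    # The -1 / -2 sentinels map onto the OffsetResetStrategy constants used
    # elsewhere in this module. For example (partitions are illustrative):
    #
    #   timestamps = {
    #       TopicPartition('my-topic', 0): OffsetResetStrategy.LATEST,    # -1
    #       TopicPartition('my-topic', 1): OffsetResetStrategy.EARLIEST,  # -2
    #       TopicPartition('my-topic', 2): 1480395635588,  # epoch milliseconds
    #   }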

    def _raise_if_offset_out_of_range(self):
        """Check FetchResponses for offset out of range.

        Raises:
            OffsetOutOfRangeError: if any partition from previous FetchResponse
                contains OffsetOutOfRangeError and the default_reset_policy is
                None
        """
        if not self._offset_out_of_range_partitions:
            return

        current_out_of_range_partitions = {}

        # filter only the fetchable partitions
        for partition, offset in six.iteritems(self._offset_out_of_range_partitions):
            if not self._subscriptions.is_fetchable(partition):
                log.debug("Ignoring fetched records for %s since it is no"
                          " longer fetchable", partition)
                continue
            position = self._subscriptions.assignment[partition].position
            # ignore partition if the current position != offset in FetchResponse
            # e.g. after seek()
            if position is not None and offset == position:
                current_out_of_range_partitions[partition] = position

        self._offset_out_of_range_partitions.clear()
        if current_out_of_range_partitions:
            raise Errors.OffsetOutOfRangeError(current_out_of_range_partitions)

    def _raise_if_unauthorized_topics(self):
        """Check FetchResponses for topic authorization failures.

        Raises:
            TopicAuthorizationFailedError
        """
        if self._unauthorized_topics:
            topics = set(self._unauthorized_topics)
            self._unauthorized_topics.clear()
            raise Errors.TopicAuthorizationFailedError(topics)

    def _raise_if_record_too_large(self):
        """Check FetchResponses for messages larger than the max per partition.

        Raises:
            RecordTooLargeError: if there is a message larger than fetch size
        """
        if not self._record_too_large_partitions:
            return

        copied_record_too_large_partitions = dict(self._record_too_large_partitions)
        self._record_too_large_partitions.clear()

        raise RecordTooLargeError(
            "There are some messages at [Partition=Offset]: %s "
            " whose size is larger than the fetch size %s"
            " and hence cannot be ever returned."
            " Increase the fetch size, or decrease the maximum message"
            " size the broker will allow.",
            copied_record_too_large_partitions,
            self.config['max_partition_fetch_bytes'])

    def fetched_records(self, max_records=None):
        """Returns previously fetched records and updates consumed offsets.

        Arguments:
            max_records (int): Maximum number of records returned. Defaults
                to max_poll_records configuration.

        Raises:
            OffsetOutOfRangeError: if no offset reset strategy is defined
                for the subscription
            InvalidMessageError: if message crc validation fails (check_crcs
                must be set to True)
            RecordTooLargeError: if a message is larger than the currently
                configured max_partition_fetch_bytes
            TopicAuthorizationFailedError: if consumer is not authorized to
                fetch messages from the topic

        Returns: (records (dict), partial (bool))
            records: {TopicPartition: [messages]}
            partial: True if records returned did not fully drain any pending
                partition requests. This may be useful for choosing when to
                pipeline additional fetch requests.
        """
        if max_records is None:
            max_records = self.config['max_poll_records']
        assert max_records > 0

        if self._subscriptions.needs_partition_assignment:
            return {}, False

        self._raise_if_offset_out_of_range()
        self._raise_if_unauthorized_topics()
        self._raise_if_record_too_large()

        drained = collections.defaultdict(list)
        partial = bool(self._records and max_records)
        while self._records and max_records > 0:
            part = self._records.popleft()
            max_records -= self._append(drained, part, max_records)
            if part.has_more():
                self._records.appendleft(part)
            else:
                partial &= False
        return dict(drained), partial
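
    # Sketch of how the return value is typically consumed (names assumed):
    #
    #   records, partial = fetcher.fetched_records(max_records=500)
    #   for tp, msgs in records.items():
    #       for msg in msgs:          # each msg is a ConsumerRecord
    #           process(msg)
    #   if partial:                   # some partition still has buffered records,
    #       fetcher.send_fetches()    # so it is worth pipelining another fetch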

    def _append(self, drained, part, max_records):
        tp = part.topic_partition
        fetch_offset = part.fetch_offset
        if not self._subscriptions.is_assigned(tp):
            # this can happen when a rebalance happened before
            # fetched records are returned to the consumer's poll call
            log.debug("Not returning fetched records for partition %s"
                      " since it is no longer assigned", tp)
        else:
            # note that the position should always be available
            # as long as the partition is still assigned
            position = self._subscriptions.assignment[tp].position
            if not self._subscriptions.is_fetchable(tp):
                # this can happen when a partition is paused before
                # fetched records are returned to the consumer's poll call
                log.debug("Not returning fetched records for assigned partition"
                          " %s since it is no longer fetchable", tp)
            elif fetch_offset == position:
                part_records = part.take(max_records)
                if not part_records:
                    return 0
                next_offset = part_records[-1].offset + 1

                log.log(0, "Returning fetched records at offset %d for assigned"
                           " partition %s and update position to %s", position,
                        tp, next_offset)

                for record in part_records:
                    # Fetched compressed messages may include additional records
                    if record.offset < fetch_offset:
                        log.debug("Skipping message offset: %s (expecting %s)",
                                  record.offset, fetch_offset)
                        continue
                    drained[tp].append(record)

                self._subscriptions.assignment[tp].position = next_offset
                return len(part_records)
            else:
                # these records aren't next in line based on the last consumed
                # position; ignore them, they must be from an obsolete request
                log.debug("Ignoring fetched records for %s at offset %s since"
                          " the current position is %d", tp, part.fetch_offset,
                          position)

        part.discard()
        return 0

    def _message_generator(self):
        """Iterate over fetched_records"""
        if self._subscriptions.needs_partition_assignment:
            raise StopIteration('Subscription needs partition assignment')

        while self._records:

            # Check on each iteration since this is a generator
            self._raise_if_offset_out_of_range()
            self._raise_if_unauthorized_topics()
            self._raise_if_record_too_large()

            # Send additional FetchRequests when the internal queue is low;
            # this should enable moderate pipelining
            if len(self._records) <= self.config['iterator_refetch_records']:
                self.send_fetches()

            part = self._records.popleft()

            tp = part.topic_partition
            fetch_offset = part.fetch_offset
            if not self._subscriptions.is_assigned(tp):
                # this can happen when a rebalance happened before
                # fetched records are returned
                log.debug("Not returning fetched records for partition %s"
                          " since it is no longer assigned", tp)
                continue

            # note that the position should always be available
            # as long as the partition is still assigned
            position = self._subscriptions.assignment[tp].position
            if not self._subscriptions.is_fetchable(tp):
                # this can happen when a partition is paused before
                # fetched records are returned
                log.debug("Not returning fetched records for assigned partition"
                          " %s since it is no longer fetchable", tp)

            elif fetch_offset == position:
                log.log(0, "Returning fetched records at offset %d for assigned"
                           " partition %s", position, tp)

                # We can ignore any prior signal to drop pending message sets
                # because we are starting from a fresh one where fetch_offset == position
                # i.e., the user seek()'d to this position
                self._subscriptions.assignment[tp].drop_pending_message_set = False

                for msg in part.messages:

                    # Because we are in a generator, it is possible for
                    # subscription state to change between yield calls,
                    # so we need to re-check on each loop.
                    # This should catch assignment changes, pauses
                    # and resets via seek_to_beginning / seek_to_end
                    if not self._subscriptions.is_fetchable(tp):
                        log.debug("Not returning fetched records for partition %s"
                                  " since it is no longer fetchable", tp)
                        break

                    # If there is a seek during message iteration,
                    # we should stop unpacking this message set and
                    # wait for a new fetch response that aligns with the
                    # new seek position
                    elif self._subscriptions.assignment[tp].drop_pending_message_set:
                        log.debug("Skipping remainder of message set for partition %s", tp)
                        self._subscriptions.assignment[tp].drop_pending_message_set = False
                        break

                    # Compressed messagesets may include earlier messages
                    elif msg.offset < self._subscriptions.assignment[tp].position:
                        log.debug("Skipping message offset: %s (expecting %s)",
                                  msg.offset,
                                  self._subscriptions.assignment[tp].position)
                        continue

                    self._subscriptions.assignment[tp].position = msg.offset + 1
                    yield msg

            else:
                # these records aren't next in line based on the last consumed
                # position; ignore them, they must be from an obsolete request
                log.debug("Ignoring fetched records for %s at offset %s since"
                          " the current position is %d", tp, part.fetch_offset,
                          position)

    def _unpack_message_set(self, tp, messages):
        try:
            for offset, size, msg in messages:
                if self.config['check_crcs'] and not msg.validate_crc():
                    raise Errors.InvalidMessageError(msg)
                elif msg.is_compressed():
                    # If relative offsets are used, we need to decompress the
                    # entire message first to compute the absolute offsets.
                    inner_mset = msg.decompress()

                    # There should only ever be a single layer of compression
                    if inner_mset[0][-1].is_compressed():
                        log.warning('MessageSet at %s offset %d appears '
                                    ' double-compressed. This should not'
                                    ' happen -- check your producers!',
                                    tp, offset)
                        if self.config['skip_double_compressed_messages']:
                            log.warning('Skipping double-compressed message at'
                                        ' %s %d', tp, offset)
                            continue

                    if msg.magic > 0:
                        last_offset, _, _ = inner_mset[-1]
                        absolute_base_offset = offset - last_offset
                    else:
                        absolute_base_offset = -1

                    for inner_offset, inner_size, inner_msg in inner_mset:
                        if msg.magic > 0:
                            # When magic value is greater than 0, the timestamp
                            # of a compressed message depends on the
                            # timestamp type of the wrapper message:
                            if msg.timestamp_type == 0:  # CREATE_TIME (0)
                                inner_timestamp = inner_msg.timestamp
                            elif msg.timestamp_type == 1:  # LOG_APPEND_TIME (1)
                                inner_timestamp = msg.timestamp
                            else:
                                raise ValueError('Unknown timestamp type: {0}'.format(msg.timestamp_type))
                        else:
                            inner_timestamp = msg.timestamp

                        if absolute_base_offset >= 0:
                            inner_offset += absolute_base_offset

                        key = self._deserialize(
                            self.config['key_deserializer'],
                            tp.topic, inner_msg.key)
                        value = self._deserialize(
                            self.config['value_deserializer'],
                            tp.topic, inner_msg.value)
                        yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
                                             inner_timestamp, msg.timestamp_type,
                                             key, value, inner_msg.crc,
                                             len(inner_msg.key) if inner_msg.key is not None else -1,
                                             len(inner_msg.value) if inner_msg.value is not None else -1)

                else:
                    key = self._deserialize(
                        self.config['key_deserializer'],
                        tp.topic, msg.key)
                    value = self._deserialize(
                        self.config['value_deserializer'],
                        tp.topic, msg.value)
                    yield ConsumerRecord(tp.topic, tp.partition, offset,
                                         msg.timestamp, msg.timestamp_type,
                                         key, value, msg.crc,
                                         len(msg.key) if msg.key is not None else -1,
                                         len(msg.value) if msg.value is not None else -1)

        # If unpacking raises StopIteration, it is erroneously
        # caught by the generator. We want all exceptions to be raised
        # back to the user. See Issue 545
        except StopIteration as e:
            log.exception('StopIteration raised unpacking messageset: %s', e)
            raise Exception('StopIteration raised unpacking messageset')
        # If unpacking raises AssertionError, it means decompression is unsupported
        # See Issue 1033
        except AssertionError as e:
            log.exception('AssertionError raised unpacking messageset: %s', e)
            raise
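
    # Worked example of the relative-offset arithmetic above (numbers are
    # illustrative): suppose the compressed wrapper message sits at absolute
    # offset 100 and its decompressed inner set carries relative offsets
    # 0, 1, 2. Then last_offset = 2, so absolute_base_offset = 100 - 2 = 98,
    # and the inner messages resolve to absolute offsets 98, 99, 100.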

    def __iter__(self):  # pylint: disable=non-iterator-returned
        return self

    def __next__(self):
        if not self._iterator:
            self._iterator = self._message_generator()
        try:
            return next(self._iterator)
        except StopIteration:
            self._iterator = None
            raise

    def _deserialize(self, f, topic, bytes_):
        if not f:
            return bytes_
        if isinstance(f, Deserializer):
            return f.deserialize(topic, bytes_)
        return f(bytes_)

    def _send_offset_requests(self, timestamps):
        """Fetch offsets for each partition in the timestamps dict. This may
        send requests to multiple nodes, depending on which node is the leader
        for each partition.

        Arguments:
            timestamps (dict): {TopicPartition: int} mapping of partitions to
                the timestamps to fetch offsets for.

        Returns:
            Future: resolves to a mapping of retrieved offsets
        """
        timestamps_by_node = collections.defaultdict(dict)
        for partition, timestamp in six.iteritems(timestamps):
            node_id = self._client.cluster.leader_for_partition(partition)
            if node_id is None:
                self._client.add_topic(partition.topic)
                log.debug("Partition %s is unknown for fetching offset,"
                          " wait for metadata refresh", partition)
                return Future().failure(Errors.StaleMetadata(partition))
            elif node_id == -1:
                log.debug("Leader for partition %s unavailable for fetching "
                          "offset, wait for metadata refresh", partition)
                return Future().failure(
                    Errors.LeaderNotAvailableError(partition))
            else:
                timestamps_by_node[node_id][partition] = timestamp

        # Aggregate results until we have responses from all nodes
        list_offsets_future = Future()
        responses = []
        node_count = len(timestamps_by_node)

        def on_success(value):
            responses.append(value)
            if len(responses) == node_count:
                offsets = {}
                for r in responses:
                    offsets.update(r)
                list_offsets_future.success(offsets)

        def on_fail(err):
            if not list_offsets_future.is_done:
                list_offsets_future.failure(err)

        for node_id, timestamps in six.iteritems(timestamps_by_node):
            _f = self._send_offset_request(node_id, timestamps)
            _f.add_callback(on_success)
            _f.add_errback(on_fail)
        return list_offsets_future

    def _send_offset_request(self, node_id, timestamps):
        by_topic = collections.defaultdict(list)
        for tp, timestamp in six.iteritems(timestamps):
            if self.config['api_version'] >= (0, 10, 1):
                data = (tp.partition, timestamp)
            else:
                data = (tp.partition, timestamp, 1)
            by_topic[tp.topic].append(data)

        if self.config['api_version'] >= (0, 10, 1):
            request = OffsetRequest[1](-1, list(six.iteritems(by_topic)))
        else:
            request = OffsetRequest[0](-1, list(six.iteritems(by_topic)))

        # Client returns a future that only fails on network issues
        # so create a separate future and attach a callback to update it
        # based on response error codes
        future = Future()

        _f = self._client.send(node_id, request)
        _f.add_callback(self._handle_offset_response, future)
        _f.add_errback(lambda e: future.failure(e))
        return future

    def _handle_offset_response(self, future, response):
        """Callback for the response of the list offset call above.

        Arguments:
            future (Future): the future to update based on response
            response (OffsetResponse): response from the server

        Raises:
            AssertionError: if response does not match partition
        """
        timestamp_offset_map = {}
        for topic, part_data in response.topics:
            for partition_info in part_data:
                partition, error_code = partition_info[:2]
                partition = TopicPartition(topic, partition)
                error_type = Errors.for_code(error_code)
                if error_type is Errors.NoError:
                    if response.API_VERSION == 0:
                        offsets = partition_info[2]
                        assert len(offsets) <= 1, 'Expected OffsetResponse with one offset'
                        if not offsets:
                            offset = UNKNOWN_OFFSET
                        else:
                            offset = offsets[0]
                        log.debug("Handling v0 ListOffsetResponse for %s. "
                                  "Fetched offset %s", partition, offset)
                        if offset != UNKNOWN_OFFSET:
                            timestamp_offset_map[partition] = (offset, None)
                    else:
                        timestamp, offset = partition_info[2:]
                        log.debug("Handling ListOffsetResponse for %s. "
                                  "Fetched offset %s, timestamp %s",
                                  partition, offset, timestamp)
                        if offset != UNKNOWN_OFFSET:
                            timestamp_offset_map[partition] = (offset, timestamp)
                elif error_type is Errors.UnsupportedForMessageFormatError:
                    # The message format on the broker side is before 0.10.0,
                    # we simply put None in the response.
                    log.debug("Cannot search by timestamp for partition %s because the"
                              " message format version is before 0.10.0", partition)
                elif error_type is Errors.NotLeaderForPartitionError:
                    log.debug("Attempt to fetch offsets for partition %s failed due"
                              " to obsolete leadership information, retrying.",
                              partition)
                    future.failure(error_type(partition))
                    return
                elif error_type is Errors.UnknownTopicOrPartitionError:
                    log.warn("Received unknown topic or partition error in ListOffset "
                             "request for partition %s. The topic/partition "
                             "may not exist or the user may not have Describe access "
                             "to it.", partition)
                    future.failure(error_type(partition))
                    return
                else:
                    log.warning("Attempt to fetch offsets for partition %s failed due to:"
                                " %s", partition, error_type)
                    future.failure(error_type(partition))
                    return
        if not future.is_done:
            future.success(timestamp_offset_map)

    def _fetchable_partitions(self):
        fetchable = self._subscriptions.fetchable_partitions()
        pending = set([part.topic_partition for part in self._records])
        return fetchable.difference(pending)

    def _create_fetch_requests(self):
        """Create fetch requests for all assigned partitions, grouped by node.

        FetchRequests skipped if no leader, or node has requests in flight

        Returns:
            dict: {node_id: FetchRequest, ...} (version depends on api_version)
        """
        # create the fetch info as a dict of lists of partition info tuples
        # which can be passed to FetchRequest() via .items()
        fetchable = collections.defaultdict(lambda: collections.defaultdict(list))

        for partition in self._fetchable_partitions():
            node_id = self._client.cluster.leader_for_partition(partition)
            position = self._subscriptions.assignment[partition].position

            # fetch if there is a leader and no in-flight requests
            if node_id is None or node_id == -1:
                log.debug("No leader found for partition %s."
                          " Requesting metadata update", partition)
                self._client.cluster.request_update()

            elif self._client.in_flight_request_count(node_id) == 0:
                partition_info = (
                    partition.partition,
                    position,
                    self.config['max_partition_fetch_bytes']
                )
                fetchable[node_id][partition.topic].append(partition_info)
                log.debug("Adding fetch request for partition %s at offset %d",
                          partition, position)

        if self.config['api_version'] >= (0, 10, 1):
            version = 3
        elif self.config['api_version'] >= (0, 10):
            version = 2
        elif self.config['api_version'] == (0, 9):
            version = 1
        else:
            version = 0
        requests = {}
        for node_id, partition_data in six.iteritems(fetchable):
            if version < 3:
                requests[node_id] = FetchRequest[version](
                    -1,  # replica_id
                    self.config['fetch_max_wait_ms'],
                    self.config['fetch_min_bytes'],
                    partition_data.items())
            else:
                # As of version == 3 partitions will be returned in order as
                # they are requested, so to avoid starvation with
                # `fetch_max_bytes` option we need this shuffle
                # NOTE: we do have partition_data in random order due to usage
                # of unordered structures like dicts, but that does not
                # guarantee equal distribution, and starting in Python3.6
                # dicts retain insert order.
                partition_data = list(partition_data.items())
                random.shuffle(partition_data)
                requests[node_id] = FetchRequest[version](
                    -1,  # replica_id
                    self.config['fetch_max_wait_ms'],
                    self.config['fetch_min_bytes'],
                    self.config['fetch_max_bytes'],
                    partition_data)
        return requests
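
    # Shape of the intermediate `fetchable` structure built above (values are
    # illustrative): one entry per leader node, then per topic a list of
    # (partition, fetch_position, max_partition_fetch_bytes) tuples, e.g.
    #
    #   {0: {'my-topic': [(0, 123, 1048576), (2, 456, 1048576)]},
    #    1: {'my-topic': [(1, 789, 1048576)]}}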

    def _handle_fetch_response(self, request, send_time, response):
        """The callback for fetch completion"""
        total_bytes = 0
        total_count = 0
        recv_time = time.time()

        fetch_offsets = {}
        for topic, partitions in request.topics:
            for partition, offset, _ in partitions:
                fetch_offsets[TopicPartition(topic, partition)] = offset

        # randomized ordering should improve balance for short-lived consumers
        random.shuffle(response.topics)
        for topic, partitions in response.topics:
            random.shuffle(partitions)
            for partition, error_code, highwater, messages in partitions:
                tp = TopicPartition(topic, partition)
                error_type = Errors.for_code(error_code)
                if not self._subscriptions.is_fetchable(tp):
                    # this can happen when a rebalance happened or a partition
                    # consumption paused while fetch is still in-flight
                    log.debug("Ignoring fetched records for partition %s"
                              " since it is no longer fetchable", tp)

                elif error_type is Errors.NoError:
                    self._subscriptions.assignment[tp].highwater = highwater

                    # we are interested in this fetch only if the beginning
                    # offset (of the *request*) matches the current consumed position
                    # Note that the *response* may return a messageset that starts
                    # earlier (e.g., compressed messages) or later (e.g., compacted topic)
                    fetch_offset = fetch_offsets[tp]
                    position = self._subscriptions.assignment[tp].position
                    if position is None or position != fetch_offset:
                        log.debug("Discarding fetch response for partition %s"
                                  " since its offset %d does not match the"
                                  " expected offset %d", tp, fetch_offset,
                                  position)
                        continue

                    num_bytes = 0
                    partial = None
                    if messages and isinstance(messages[-1][-1], PartialMessage):
                        partial = messages.pop()

                    if messages:
                        log.debug("Adding fetched record for partition %s with"
                                  " offset %d to buffered record list", tp,
                                  position)
                        unpacked = list(self._unpack_message_set(tp, messages))
                        self._records.append(self.PartitionRecords(fetch_offset, tp, unpacked))
                        last_offset, _, _ = messages[-1]
                        self._sensors.records_fetch_lag.record(highwater - last_offset)
                        num_bytes = sum(msg[1] for msg in messages)
                    elif partial:
                        # we did not read a single message from a non-empty
                        # buffer because that message's size is larger than
                        # fetch size, in this case record this exception
                        self._record_too_large_partitions[tp] = fetch_offset

                    self._sensors.record_topic_fetch_metrics(topic, num_bytes, len(messages))
                    total_bytes += num_bytes
                    total_count += len(messages)
                elif error_type in (Errors.NotLeaderForPartitionError,
                                    Errors.UnknownTopicOrPartitionError):
                    self._client.cluster.request_update()
                elif error_type is Errors.OffsetOutOfRangeError:
                    fetch_offset = fetch_offsets[tp]
                    log.info("Fetch offset %s is out of range for topic-partition %s", fetch_offset, tp)
                    if self._subscriptions.has_default_offset_reset_policy():
                        self._subscriptions.need_offset_reset(tp)
                        log.info("Resetting offset for topic-partition %s", tp)
                    else:
                        self._offset_out_of_range_partitions[tp] = fetch_offset
                elif error_type is Errors.TopicAuthorizationFailedError:
                    log.warn("Not authorized to read from topic %s.", tp.topic)
                    self._unauthorized_topics.add(tp.topic)
                elif error_type is Errors.UnknownError:
                    log.warn("Unknown error fetching data for topic-partition %s", tp)
                else:
                    raise error_type('Unexpected error while fetching data')

        # Because we are currently decompressing messages lazily, the sensors here
        # will get compressed bytes / message set stats when compression is enabled
        self._sensors.bytes_fetched.record(total_bytes)
        self._sensors.records_fetched.record(total_count)
        if response.API_VERSION >= 1:
            self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms)
        self._sensors.fetch_latency.record((recv_time - send_time) * 1000)

    class PartitionRecords(object):
        def __init__(self, fetch_offset, tp, messages):
            self.fetch_offset = fetch_offset
            self.topic_partition = tp
            self.messages = messages
            # Default to the first message so that 'take' and 'has_more' work
            # even if fetch_offset is not found below (e.g., a compacted topic)
            self.message_idx = 0
            # When fetching an offset that is in the middle of a
            # compressed batch, we will get all messages in the batch.
            # But we want to start 'take' at the fetch_offset
            for i, msg in enumerate(messages):
                if msg.offset == fetch_offset:
                    self.message_idx = i
                    break

        def discard(self):
            self.messages = None

        def take(self, n):
            if not self.has_more():
                return []
            next_idx = self.message_idx + n
            res = self.messages[self.message_idx:next_idx]
            self.message_idx = next_idx
            if self.has_more():
                self.fetch_offset = self.messages[self.message_idx].offset
            return res

        def has_more(self):
            return self.messages and self.message_idx < len(self.messages)
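
        # Worked example of 'take' (illustrative offsets): given messages with
        # offsets [10, 11, 12, 13] and fetch_offset=11, message_idx starts at 1.
        # take(2) returns the records at offsets [11, 12], advances message_idx
        # to 3, and sets fetch_offset to 13; a further take(5) returns [13] and
        # exhausts the buffer, so has_more() becomes False.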

class FetchManagerMetrics(object):
    def __init__(self, metrics, prefix):
        self.metrics = metrics
        self.group_name = '%s-fetch-manager-metrics' % prefix

        self.bytes_fetched = metrics.sensor('bytes-fetched')
        self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name,
            'The average number of bytes fetched per request'), Avg())
        self.bytes_fetched.add(metrics.metric_name('fetch-size-max', self.group_name,
            'The maximum number of bytes fetched per request'), Max())
        self.bytes_fetched.add(metrics.metric_name('bytes-consumed-rate', self.group_name,
            'The average number of bytes consumed per second'), Rate())

        self.records_fetched = self.metrics.sensor('records-fetched')
        self.records_fetched.add(metrics.metric_name('records-per-request-avg', self.group_name,
            'The average number of records in each request'), Avg())
        self.records_fetched.add(metrics.metric_name('records-consumed-rate', self.group_name,
            'The average number of records consumed per second'), Rate())

        self.fetch_latency = metrics.sensor('fetch-latency')
        self.fetch_latency.add(metrics.metric_name('fetch-latency-avg', self.group_name,
            'The average time taken for a fetch request.'), Avg())
        self.fetch_latency.add(metrics.metric_name('fetch-latency-max', self.group_name,
            'The max time taken for any fetch request.'), Max())
        self.fetch_latency.add(metrics.metric_name('fetch-rate', self.group_name,
            'The number of fetch requests per second.'), Rate(sampled_stat=Count()))

        self.records_fetch_lag = metrics.sensor('records-lag')
        self.records_fetch_lag.add(metrics.metric_name('records-lag-max', self.group_name,
            'The maximum lag in terms of number of records for any partition in this window'), Max())

        self.fetch_throttle_time_sensor = metrics.sensor('fetch-throttle-time')
        self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-avg', self.group_name,
            'The average throttle time in ms'), Avg())
        self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-max', self.group_name,
            'The maximum throttle time in ms'), Max())

    def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
        # record bytes fetched
        name = '.'.join(['topic', topic, 'bytes-fetched'])
        bytes_fetched = self.metrics.get_sensor(name)
        if not bytes_fetched:
            metric_tags = {'topic': topic.replace('.', '_')}

            bytes_fetched = self.metrics.sensor(name)
            bytes_fetched.add(self.metrics.metric_name('fetch-size-avg',
                    self.group_name,
                    'The average number of bytes fetched per request for topic %s' % topic,
                    metric_tags), Avg())
            bytes_fetched.add(self.metrics.metric_name('fetch-size-max',
                    self.group_name,
                    'The maximum number of bytes fetched per request for topic %s' % topic,
                    metric_tags), Max())
            bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate',
                    self.group_name,
                    'The average number of bytes consumed per second for topic %s' % topic,
                    metric_tags), Rate())
        bytes_fetched.record(num_bytes)

        # record records fetched
        name = '.'.join(['topic', topic, 'records-fetched'])
        records_fetched = self.metrics.get_sensor(name)
        if not records_fetched:
            metric_tags = {'topic': topic.replace('.', '_')}

            records_fetched = self.metrics.sensor(name)
            records_fetched.add(self.metrics.metric_name('records-per-request-avg',
                    self.group_name,
                    'The average number of records in each request for topic %s' % topic,
                    metric_tags), Avg())
            records_fetched.add(self.metrics.metric_name('records-consumed-rate',
                    self.group_name,
                    'The average number of records consumed per second for topic %s' % topic,
                    metric_tags), Rate())
        records_fetched.record(num_records)