123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444 |
- from __future__ import absolute_import
- try:
- from itertools import zip_longest as izip_longest, repeat # pylint: disable=E0611
- except ImportError:
- from itertools import izip_longest as izip_longest, repeat # pylint: disable=E0611
- import logging
- import sys
- import time
- import warnings
- from kafka.vendor import six
- from kafka.vendor.six.moves import queue # pylint: disable=import-error
- from .base import (
- Consumer,
- FETCH_DEFAULT_BLOCK_TIMEOUT,
- AUTO_COMMIT_MSG_COUNT,
- AUTO_COMMIT_INTERVAL,
- FETCH_MIN_BYTES,
- FETCH_BUFFER_SIZE_BYTES,
- MAX_FETCH_BUFFER_SIZE_BYTES,
- FETCH_MAX_WAIT_TIME,
- ITER_TIMEOUT_SECONDS,
- NO_MESSAGES_WAIT_TIME_SECONDS
- )
- from ..common import (
- FetchRequestPayload, KafkaError, OffsetRequestPayload,
- ConsumerFetchSizeTooSmall,
- UnknownTopicOrPartitionError, NotLeaderForPartitionError,
- OffsetOutOfRangeError, FailedPayloadsError, check_error
- )
- from kafka.protocol.message import PartialMessage
- log = logging.getLogger(__name__)
- class FetchContext(object):
- """
- Class for managing the state of a consumer during fetch
- """
- def __init__(self, consumer, block, timeout):
- warnings.warn('deprecated - this class will be removed in a future'
- ' release', DeprecationWarning)
- self.consumer = consumer
- self.block = block
- if block:
- if not timeout:
- timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
- self.timeout = timeout * 1000
- def __enter__(self):
- """Set fetch values based on blocking status"""
- self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
- self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
- if self.block:
- self.consumer.fetch_max_wait_time = self.timeout
- self.consumer.fetch_min_bytes = 1
- else:
- self.consumer.fetch_min_bytes = 0
- def __exit__(self, type, value, traceback):
- """Reset values"""
- self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
- self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
- class SimpleConsumer(Consumer):
- """
- A simple consumer implementation that consumes all/specified partitions
- for a topic
- Arguments:
- client: a connected SimpleClient
- group: a name for this consumer, used for offset storage and must be unique
- If you are connecting to a server that does not support offset
- commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
- topic: the topic to consume
- Keyword Arguments:
- partitions: An optional list of partitions to consume the data from
- auto_commit: default True. Whether or not to auto commit the offsets
- auto_commit_every_n: default 100. How many messages to consume
- before a commit
- auto_commit_every_t: default 5000. How much time (in milliseconds) to
- wait before commit
- fetch_size_bytes: number of bytes to request in a FetchRequest
- buffer_size: default 4K. Initial number of bytes to tell kafka we
- have available. This will double as needed.
- max_buffer_size: default 16K. Max number of bytes to tell kafka we have
- available. None means no limit.
- iter_timeout: default None. How much time (in seconds) to wait for a
- message in the iterator before exiting. None means no
- timeout, so it will wait forever.
- auto_offset_reset: default largest. Reset partition offsets upon
- OffsetOutOfRangeError. Valid values are largest and smallest.
- Otherwise, do not reset the offsets and raise OffsetOutOfRangeError.
- Auto commit details:
- If both auto_commit_every_n and auto_commit_every_t are set, they will
- reset one another when one is triggered. These triggers simply call the
- commit method on this class. A manual call to commit will also reset
- these triggers
- """
- def __init__(self, client, group, topic, auto_commit=True, partitions=None,
- auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
- auto_commit_every_t=AUTO_COMMIT_INTERVAL,
- fetch_size_bytes=FETCH_MIN_BYTES,
- buffer_size=FETCH_BUFFER_SIZE_BYTES,
- max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
- iter_timeout=None,
- auto_offset_reset='largest'):
- warnings.warn('deprecated - this class will be removed in a future'
- ' release. Use KafkaConsumer instead.',
- DeprecationWarning)
- super(SimpleConsumer, self).__init__(
- client, group, topic,
- partitions=partitions,
- auto_commit=auto_commit,
- auto_commit_every_n=auto_commit_every_n,
- auto_commit_every_t=auto_commit_every_t)
- if max_buffer_size is not None and buffer_size > max_buffer_size:
- raise ValueError('buffer_size (%d) is greater than '
- 'max_buffer_size (%d)' %
- (buffer_size, max_buffer_size))
- self.buffer_size = buffer_size
- self.max_buffer_size = max_buffer_size
- self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
- self.fetch_min_bytes = fetch_size_bytes
- self.fetch_offsets = self.offsets.copy()
- self.iter_timeout = iter_timeout
- self.auto_offset_reset = auto_offset_reset
- self.queue = queue.Queue()
- def __repr__(self):
- return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
- (self.group, self.topic, str(self.offsets.keys()))
- def reset_partition_offset(self, partition):
- """Update offsets using auto_offset_reset policy (smallest|largest)
- Arguments:
- partition (int): the partition for which offsets should be updated
- Returns: Updated offset on success, None on failure
- """
- LATEST = -1
- EARLIEST = -2
- if self.auto_offset_reset == 'largest':
- reqs = [OffsetRequestPayload(self.topic, partition, LATEST, 1)]
- elif self.auto_offset_reset == 'smallest':
- reqs = [OffsetRequestPayload(self.topic, partition, EARLIEST, 1)]
- else:
- # Let's raise an reasonable exception type if user calls
- # outside of an exception context
- if sys.exc_info() == (None, None, None):
- raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
- 'valid auto_offset_reset setting '
- '(largest|smallest)')
- # Otherwise we should re-raise the upstream exception
- # b/c it typically includes additional data about
- # the request that triggered it, and we do not want to drop that
- raise # pylint: disable=E0704
- # send_offset_request
- log.info('Resetting topic-partition offset to %s for %s:%d',
- self.auto_offset_reset, self.topic, partition)
- try:
- (resp, ) = self.client.send_offset_request(reqs)
- except KafkaError as e:
- log.error('%s sending offset request for %s:%d',
- e.__class__.__name__, self.topic, partition)
- else:
- self.offsets[partition] = resp.offsets[0]
- self.fetch_offsets[partition] = resp.offsets[0]
- return resp.offsets[0]
- def seek(self, offset, whence=None, partition=None):
- """
- Alter the current offset in the consumer, similar to fseek
- Arguments:
- offset: how much to modify the offset
- whence: where to modify it from, default is None
- * None is an absolute offset
- * 0 is relative to the earliest available offset (head)
- * 1 is relative to the current offset
- * 2 is relative to the latest known offset (tail)
- partition: modify which partition, default is None.
- If partition is None, would modify all partitions.
- """
- if whence is None: # set an absolute offset
- if partition is None:
- for tmp_partition in self.offsets:
- self.offsets[tmp_partition] = offset
- else:
- self.offsets[partition] = offset
- elif whence == 1: # relative to current position
- if partition is None:
- for tmp_partition, _offset in self.offsets.items():
- self.offsets[tmp_partition] = _offset + offset
- else:
- self.offsets[partition] += offset
- elif whence in (0, 2): # relative to beginning or end
- reqs = []
- deltas = {}
- if partition is None:
- # divide the request offset by number of partitions,
- # distribute the remained evenly
- (delta, rem) = divmod(offset, len(self.offsets))
- for tmp_partition, r in izip_longest(self.offsets.keys(),
- repeat(1, rem),
- fillvalue=0):
- deltas[tmp_partition] = delta + r
- for tmp_partition in self.offsets.keys():
- if whence == 0:
- reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -2, 1))
- elif whence == 2:
- reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -1, 1))
- else:
- pass
- else:
- deltas[partition] = offset
- if whence == 0:
- reqs.append(OffsetRequestPayload(self.topic, partition, -2, 1))
- elif whence == 2:
- reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1))
- else:
- pass
- resps = self.client.send_offset_request(reqs)
- for resp in resps:
- self.offsets[resp.partition] = \
- resp.offsets[0] + deltas[resp.partition]
- else:
- raise ValueError('Unexpected value for `whence`, %d' % whence)
- # Reset queue and fetch offsets since they are invalid
- self.fetch_offsets = self.offsets.copy()
- self.count_since_commit += 1
- if self.auto_commit:
- self.commit()
- self.queue = queue.Queue()
- def get_messages(self, count=1, block=True, timeout=0.1):
- """
- Fetch the specified number of messages
- Keyword Arguments:
- count: Indicates the maximum number of messages to be fetched
- block: If True, the API will block till all messages are fetched.
- If block is a positive integer the API will block until that
- many messages are fetched.
- timeout: When blocking is requested the function will block for
- the specified time (in seconds) until count messages is
- fetched. If None, it will block forever.
- """
- messages = []
- if timeout is not None:
- timeout += time.time()
- new_offsets = {}
- log.debug('getting %d messages', count)
- while len(messages) < count:
- block_time = timeout - time.time()
- log.debug('calling _get_message block=%s timeout=%s', block, block_time)
- block_next_call = block is True or block > len(messages)
- result = self._get_message(block_next_call, block_time,
- get_partition_info=True,
- update_offset=False)
- log.debug('got %s from _get_messages', result)
- if not result:
- if block_next_call and (timeout is None or time.time() <= timeout):
- continue
- break
- partition, message = result
- _msg = (partition, message) if self.partition_info else message
- messages.append(_msg)
- new_offsets[partition] = message.offset + 1
- # Update and commit offsets if necessary
- self.offsets.update(new_offsets)
- self.count_since_commit += len(messages)
- self._auto_commit()
- log.debug('got %d messages: %s', len(messages), messages)
- return messages
- def get_message(self, block=True, timeout=0.1, get_partition_info=None):
- return self._get_message(block, timeout, get_partition_info)
- def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
- update_offset=True):
- """
- If no messages can be fetched, returns None.
- If get_partition_info is None, it defaults to self.partition_info
- If get_partition_info is True, returns (partition, message)
- If get_partition_info is False, returns message
- """
- start_at = time.time()
- while self.queue.empty():
- # We're out of messages, go grab some more.
- log.debug('internal queue empty, fetching more messages')
- with FetchContext(self, block, timeout):
- self._fetch()
- if not block or time.time() > (start_at + timeout):
- break
- try:
- partition, message = self.queue.get_nowait()
- if update_offset:
- # Update partition offset
- self.offsets[partition] = message.offset + 1
- # Count, check and commit messages if necessary
- self.count_since_commit += 1
- self._auto_commit()
- if get_partition_info is None:
- get_partition_info = self.partition_info
- if get_partition_info:
- return partition, message
- else:
- return message
- except queue.Empty:
- log.debug('internal queue empty after fetch - returning None')
- return None
- def __iter__(self):
- if self.iter_timeout is None:
- timeout = ITER_TIMEOUT_SECONDS
- else:
- timeout = self.iter_timeout
- while True:
- message = self.get_message(True, timeout)
- if message:
- yield message
- elif self.iter_timeout is None:
- # We did not receive any message yet but we don't have a
- # timeout, so give up the CPU for a while before trying again
- time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
- else:
- # Timed out waiting for a message
- break
- def _fetch(self):
- # Create fetch request payloads for all the partitions
- partitions = dict((p, self.buffer_size)
- for p in self.fetch_offsets.keys())
- while partitions:
- requests = []
- for partition, buffer_size in six.iteritems(partitions):
- requests.append(FetchRequestPayload(self.topic, partition,
- self.fetch_offsets[partition],
- buffer_size))
- # Send request
- responses = self.client.send_fetch_request(
- requests,
- max_wait_time=int(self.fetch_max_wait_time),
- min_bytes=self.fetch_min_bytes,
- fail_on_error=False
- )
- retry_partitions = {}
- for resp in responses:
- try:
- check_error(resp)
- except UnknownTopicOrPartitionError:
- log.error('UnknownTopicOrPartitionError for %s:%d',
- resp.topic, resp.partition)
- self.client.reset_topic_metadata(resp.topic)
- raise
- except NotLeaderForPartitionError:
- log.error('NotLeaderForPartitionError for %s:%d',
- resp.topic, resp.partition)
- self.client.reset_topic_metadata(resp.topic)
- continue
- except OffsetOutOfRangeError:
- log.warning('OffsetOutOfRangeError for %s:%d. '
- 'Resetting partition offset...',
- resp.topic, resp.partition)
- self.reset_partition_offset(resp.partition)
- # Retry this partition
- retry_partitions[resp.partition] = partitions[resp.partition]
- continue
- except FailedPayloadsError as e:
- log.warning('FailedPayloadsError for %s:%d',
- e.payload.topic, e.payload.partition)
- # Retry this partition
- retry_partitions[e.payload.partition] = partitions[e.payload.partition]
- continue
- partition = resp.partition
- buffer_size = partitions[partition]
- # Check for partial message
- if resp.messages and isinstance(resp.messages[-1].message, PartialMessage):
- # If buffer is at max and all we got was a partial message
- # raise ConsumerFetchSizeTooSmall
- if (self.max_buffer_size is not None and
- buffer_size == self.max_buffer_size and
- len(resp.messages) == 1):
- log.error('Max fetch size %d too small', self.max_buffer_size)
- raise ConsumerFetchSizeTooSmall()
- if self.max_buffer_size is None:
- buffer_size *= 2
- else:
- buffer_size = min(buffer_size * 2, self.max_buffer_size)
- log.warning('Fetch size too small, increase to %d (2x) '
- 'and retry', buffer_size)
- retry_partitions[partition] = buffer_size
- resp.messages.pop()
- for message in resp.messages:
- if message.offset < self.fetch_offsets[partition]:
- log.debug('Skipping message %s because its offset is less than the consumer offset',
- message)
- continue
- # Put the message in our queue
- self.queue.put((partition, message))
- self.fetch_offsets[partition] = message.offset + 1
- partitions = retry_partitions
|