123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512 |
- from __future__ import absolute_import
- import inspect
- import sys
class KafkaError(RuntimeError):
    """Root of this module's exception hierarchy.

    Two class-level flags are defined here with False defaults and are
    overridden by individual subclasses:

    * ``retriable`` -- presumably tells callers whether retrying the failed
      operation is worthwhile (nothing in this module reads it; confirm
      against callers).
    * ``invalid_metadata`` -- whether metadata should be refreshed on error.
    """
    retriable = False
    invalid_metadata = False

    def __str__(self):
        """Return the class name, followed by ``: <args text>`` if args exist."""
        class_name = self.__class__.__name__
        if self.args:
            # Delegate formatting of the arguments to the built-in exception.
            details = super(KafkaError, self).__str__()
            return '{0}: {1}'.format(class_name, details)
        return class_name
class IllegalStateError(KafkaError):
    """KafkaError subtype with no overrides (inherits the False flag defaults)."""
class IllegalArgumentError(KafkaError):
    """KafkaError subtype with no overrides (inherits the False flag defaults)."""
class NoBrokersAvailable(KafkaError):
    """KafkaError subtype flagged both retriable and metadata-invalidating."""
    invalid_metadata = True
    retriable = True
class NodeNotReadyError(KafkaError):
    """KafkaError subtype flagged as retriable."""
    retriable = True
class CorrelationIdError(KafkaError):
    """KafkaError subtype flagged as retriable."""
    retriable = True
class Cancelled(KafkaError):
    """KafkaError subtype flagged as retriable."""
    retriable = True
class TooManyInFlightRequests(KafkaError):
    """KafkaError subtype flagged as retriable."""
    retriable = True
class StaleMetadata(KafkaError):
    """KafkaError subtype flagged both retriable and metadata-invalidating."""
    invalid_metadata = True
    retriable = True
class UnrecognizedBrokerVersion(KafkaError):
    """KafkaError subtype with no overrides (inherits the False flag defaults)."""
class CommitFailedError(KafkaError):
    """KafkaError subtype with no overrides (inherits the False flag defaults)."""
class AuthenticationMethodNotSupported(KafkaError):
    """KafkaError subtype with no overrides (inherits the False flag defaults)."""
class AuthenticationFailedError(KafkaError):
    """KafkaError subtype that explicitly marks itself as not retriable."""
    # Same value as the KafkaError default; spelled out on purpose.
    retriable = False
class BrokerResponseError(KafkaError):
    """Base class for errors that carry a numeric broker error code.

    Subclasses override ``errno`` (the code), ``message`` (its symbolic name)
    and ``description`` (human-readable text); all three default to None here.
    """
    errno = None
    message = None
    description = None

    def __str__(self):
        """Return the KafkaError text prefixed with ``[Error <errno>]``."""
        base_text = super(BrokerResponseError, self).__str__()
        return '[Error {0}] {1}'.format(self.errno, base_text)
class NoError(BrokerResponseError):
    """Broker error code 0 (NO_ERROR)."""
    errno = 0
    message = 'NO_ERROR'
    description = 'No error--it worked!'
class UnknownError(BrokerResponseError):
    """Broker error code -1 (UNKNOWN)."""
    errno = -1
    message = 'UNKNOWN'
    description = 'An unexpected server error.'
class OffsetOutOfRangeError(BrokerResponseError):
    """Broker error code 1 (OFFSET_OUT_OF_RANGE)."""
    errno = 1
    message = 'OFFSET_OUT_OF_RANGE'
    description = (
        'The requested offset is outside the range of offsets'
        ' maintained by the server for the given topic/partition.'
    )
class InvalidMessageError(BrokerResponseError):
    """Broker error code 2 (INVALID_MESSAGE)."""
    errno = 2
    message = 'INVALID_MESSAGE'
    description = (
        'This message has failed its CRC checksum, exceeds the'
        ' valid size, or is otherwise corrupt.'
    )
class UnknownTopicOrPartitionError(BrokerResponseError):
    """Broker error code 3 (UNKNOWN_TOPIC_OR_PARTITION).

    Flagged both retriable and metadata-invalidating.
    """
    errno = 3
    message = 'UNKNOWN_TOPIC_OR_PARTITION'
    retriable = True
    invalid_metadata = True
    description = (
        'This request is for a topic or partition that does not'
        ' exist on this broker.'
    )
class InvalidFetchRequestError(BrokerResponseError):
    """Broker error code 4 (INVALID_FETCH_SIZE)."""
    errno = 4
    message = 'INVALID_FETCH_SIZE'
    description = 'The message has a negative size.'
class LeaderNotAvailableError(BrokerResponseError):
    """Broker error code 5 (LEADER_NOT_AVAILABLE).

    Flagged both retriable and metadata-invalidating.
    """
    errno = 5
    message = 'LEADER_NOT_AVAILABLE'
    retriable = True
    invalid_metadata = True
    description = (
        'This error is thrown if we are in the middle of a'
        ' leadership election and there is currently no leader for'
        ' this partition and hence it is unavailable for writes.'
    )
class NotLeaderForPartitionError(BrokerResponseError):
    """Broker error code 6 (NOT_LEADER_FOR_PARTITION).

    Flagged both retriable and metadata-invalidating.
    """
    errno = 6
    message = 'NOT_LEADER_FOR_PARTITION'
    retriable = True
    invalid_metadata = True
    description = (
        'This error is thrown if the client attempts to send'
        ' messages to a replica that is not the leader for some'
        ' partition. It indicates that the clients metadata is out'
        ' of date.'
    )
class RequestTimedOutError(BrokerResponseError):
    """Broker error code 7 (REQUEST_TIMED_OUT); flagged as retriable."""
    errno = 7
    message = 'REQUEST_TIMED_OUT'
    retriable = True
    description = (
        'This error is thrown if the request exceeds the'
        ' user-specified time limit in the request.'
    )
class BrokerNotAvailableError(BrokerResponseError):
    """Broker error code 8 (BROKER_NOT_AVAILABLE)."""
    errno = 8
    message = 'BROKER_NOT_AVAILABLE'
    description = (
        'This is not a client facing error and is used mostly by'
        ' tools when a broker is not alive.'
    )
class ReplicaNotAvailableError(BrokerResponseError):
    """Broker error code 9 (REPLICA_NOT_AVAILABLE)."""
    errno = 9
    message = 'REPLICA_NOT_AVAILABLE'
    description = (
        'If replica is expected on a broker, but is not (this can be'
        ' safely ignored).'
    )
class MessageSizeTooLargeError(BrokerResponseError):
    """Broker error code 10 (MESSAGE_SIZE_TOO_LARGE)."""
    errno = 10
    message = 'MESSAGE_SIZE_TOO_LARGE'
    description = (
        'The server has a configurable maximum message size to avoid'
        ' unbounded memory allocation. This error is thrown if the'
        ' client attempt to produce a message larger than this'
        ' maximum.'
    )
class StaleControllerEpochError(BrokerResponseError):
    """Broker error code 11 (STALE_CONTROLLER_EPOCH)."""
    errno = 11
    message = 'STALE_CONTROLLER_EPOCH'
    description = 'Internal error code for broker-to-broker communication.'
class OffsetMetadataTooLargeError(BrokerResponseError):
    """Broker error code 12 (OFFSET_METADATA_TOO_LARGE)."""
    errno = 12
    message = 'OFFSET_METADATA_TOO_LARGE'
    description = (
        'If you specify a string larger than configured maximum for'
        ' offset metadata.'
    )
- # TODO is this deprecated? https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
class StaleLeaderEpochCodeError(BrokerResponseError):
    """Broker error code 13 (STALE_LEADER_EPOCH_CODE).

    No ``description`` is set, so it stays at the inherited None.
    """
    errno = 13
    message = 'STALE_LEADER_EPOCH_CODE'
class GroupLoadInProgressError(BrokerResponseError):
    """Broker error code 14 (OFFSETS_LOAD_IN_PROGRESS); flagged as retriable."""
    errno = 14
    message = 'OFFSETS_LOAD_IN_PROGRESS'
    retriable = True
    description = (
        'The broker returns this error code for an offset fetch'
        ' request if it is still loading offsets (after a leader'
        ' change for that offsets topic partition), or in response'
        ' to group membership requests (such as heartbeats) when'
        ' group metadata is being loaded by the coordinator.'
    )
class GroupCoordinatorNotAvailableError(BrokerResponseError):
    """Broker error code 15 (CONSUMER_COORDINATOR_NOT_AVAILABLE).

    Flagged as retriable.
    """
    errno = 15
    message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE'
    retriable = True
    description = (
        'The broker returns this error code for group coordinator'
        ' requests, offset commits, and most group management'
        ' requests if the offsets topic has not yet been created, or'
        ' if the group coordinator is not active.'
    )
class NotCoordinatorForGroupError(BrokerResponseError):
    """Broker error code 16 (NOT_COORDINATOR_FOR_CONSUMER); flagged retriable."""
    errno = 16
    message = 'NOT_COORDINATOR_FOR_CONSUMER'
    retriable = True
    description = (
        'The broker returns this error code if it receives an offset'
        ' fetch or commit request for a group that it is not a'
        ' coordinator for.'
    )
class InvalidTopicError(BrokerResponseError):
    """Broker error code 17 (INVALID_TOPIC)."""
    errno = 17
    message = 'INVALID_TOPIC'
    description = (
        'For a request which attempts to access an invalid topic'
        ' (e.g. one which has an illegal name), or if an attempt'
        ' is made to write to an internal topic (such as the'
        ' consumer offsets topic).'
    )
class RecordListTooLargeError(BrokerResponseError):
    """Broker error code 18 (RECORD_LIST_TOO_LARGE)."""
    errno = 18
    message = 'RECORD_LIST_TOO_LARGE'
    description = (
        'If a message batch in a produce request exceeds the maximum'
        ' configured segment size.'
    )
class NotEnoughReplicasError(BrokerResponseError):
    """Broker error code 19 (NOT_ENOUGH_REPLICAS)."""
    errno = 19
    message = 'NOT_ENOUGH_REPLICAS'
    description = (
        'Returned from a produce request when the number of in-sync'
        ' replicas is lower than the configured minimum and'
        ' requiredAcks is -1.'
    )
class NotEnoughReplicasAfterAppendError(BrokerResponseError):
    """Broker error code 20 (NOT_ENOUGH_REPLICAS_AFTER_APPEND)."""
    errno = 20
    message = 'NOT_ENOUGH_REPLICAS_AFTER_APPEND'
    description = (
        'Returned from a produce request when the message was'
        ' written to the log, but with fewer in-sync replicas than'
        ' required.'
    )
class InvalidRequiredAcksError(BrokerResponseError):
    """Broker error code 21 (INVALID_REQUIRED_ACKS)."""
    errno = 21
    message = 'INVALID_REQUIRED_ACKS'
    description = (
        'Returned from a produce request if the requested'
        ' requiredAcks is invalid (anything other than -1, 1, or 0).'
    )
class IllegalGenerationError(BrokerResponseError):
    """Broker error code 22 (ILLEGAL_GENERATION)."""
    errno = 22
    message = 'ILLEGAL_GENERATION'
    description = (
        'Returned from group membership requests (such as heartbeats)'
        ' when the generation id provided in the request is not the'
        ' current generation.'
    )
class InconsistentGroupProtocolError(BrokerResponseError):
    """Broker error code 23 (INCONSISTENT_GROUP_PROTOCOL)."""
    errno = 23
    message = 'INCONSISTENT_GROUP_PROTOCOL'
    description = (
        'Returned in join group when the member provides a protocol'
        ' type or set of protocols which is not compatible with the'
        ' current group.'
    )
class InvalidGroupIdError(BrokerResponseError):
    """Broker error code 24 (INVALID_GROUP_ID)."""
    errno = 24
    message = 'INVALID_GROUP_ID'
    description = 'Returned in join group when the groupId is empty or null.'
class UnknownMemberIdError(BrokerResponseError):
    """Broker error code 25 (UNKNOWN_MEMBER_ID)."""
    errno = 25
    message = 'UNKNOWN_MEMBER_ID'
    description = (
        'Returned from group requests (offset commits/fetches,'
        ' heartbeats, etc) when the memberId is not in the current'
        ' generation.'
    )
class InvalidSessionTimeoutError(BrokerResponseError):
    """Broker error code 26 (INVALID_SESSION_TIMEOUT)."""
    errno = 26
    message = 'INVALID_SESSION_TIMEOUT'
    description = (
        'Return in join group when the requested session timeout is'
        ' outside of the allowed range on the broker'
    )
class RebalanceInProgressError(BrokerResponseError):
    """Broker error code 27 (REBALANCE_IN_PROGRESS)."""
    errno = 27
    message = 'REBALANCE_IN_PROGRESS'
    description = (
        'Returned in heartbeat requests when the coordinator has'
        ' begun rebalancing the group. This indicates to the client'
        ' that it should rejoin the group.'
    )
class InvalidCommitOffsetSizeError(BrokerResponseError):
    """Broker error code 28 (INVALID_COMMIT_OFFSET_SIZE)."""
    errno = 28
    message = 'INVALID_COMMIT_OFFSET_SIZE'
    description = (
        'This error indicates that an offset commit was rejected'
        ' because of oversize metadata.'
    )
class TopicAuthorizationFailedError(BrokerResponseError):
    """Broker error code 29 (TOPIC_AUTHORIZATION_FAILED)."""
    errno = 29
    message = 'TOPIC_AUTHORIZATION_FAILED'
    description = (
        'Returned by the broker when the client is not authorized to'
        ' access the requested topic.'
    )
class GroupAuthorizationFailedError(BrokerResponseError):
    """Broker error code 30 (GROUP_AUTHORIZATION_FAILED)."""
    errno = 30
    message = 'GROUP_AUTHORIZATION_FAILED'
    description = (
        'Returned by the broker when the client is not authorized to'
        ' access a particular groupId.'
    )
class ClusterAuthorizationFailedError(BrokerResponseError):
    """Broker error code 31 (CLUSTER_AUTHORIZATION_FAILED)."""
    errno = 31
    message = 'CLUSTER_AUTHORIZATION_FAILED'
    description = (
        'Returned by the broker when the client is not authorized to'
        ' use an inter-broker or administrative API.'
    )
class InvalidTimestampError(BrokerResponseError):
    """Broker error code 32 (INVALID_TIMESTAMP)."""
    errno = 32
    message = 'INVALID_TIMESTAMP'
    description = 'The timestamp of the message is out of acceptable range.'
class UnsupportedSaslMechanismError(BrokerResponseError):
    """Broker error code 33 (UNSUPPORTED_SASL_MECHANISM)."""
    errno = 33
    message = 'UNSUPPORTED_SASL_MECHANISM'
    description = 'The broker does not support the requested SASL mechanism.'
class IllegalSaslStateError(BrokerResponseError):
    """Broker error code 34 (ILLEGAL_SASL_STATE)."""
    errno = 34
    message = 'ILLEGAL_SASL_STATE'
    description = 'Request is not valid given the current SASL state.'
class UnsupportedVersionError(BrokerResponseError):
    """Broker error code 35 (UNSUPPORTED_VERSION)."""
    errno = 35
    message = 'UNSUPPORTED_VERSION'
    description = 'The version of API is not supported.'
class TopicAlreadyExistsError(BrokerResponseError):
    """Broker error code 36 (TOPIC_ALREADY_EXISTS)."""
    errno = 36
    message = 'TOPIC_ALREADY_EXISTS'
    description = 'Topic with this name already exists.'
class InvalidPartitionsError(BrokerResponseError):
    """Broker error code 37 (INVALID_PARTITIONS)."""
    errno = 37
    message = 'INVALID_PARTITIONS'
    description = 'Number of partitions is invalid.'
class InvalidReplicationFactorError(BrokerResponseError):
    """Broker error code 38 (INVALID_REPLICATION_FACTOR)."""
    errno = 38
    message = 'INVALID_REPLICATION_FACTOR'
    description = 'Replication-factor is invalid.'
class InvalidReplicationAssignmentError(BrokerResponseError):
    """Broker error code 39 (INVALID_REPLICATION_ASSIGNMENT)."""
    errno = 39
    message = 'INVALID_REPLICATION_ASSIGNMENT'
    description = 'Replication assignment is invalid.'
class InvalidConfigurationError(BrokerResponseError):
    """Broker error code 40 (INVALID_CONFIG)."""
    errno = 40
    message = 'INVALID_CONFIG'
    description = 'Configuration is invalid.'
class NotControllerError(BrokerResponseError):
    """Broker error code 41 (NOT_CONTROLLER); flagged as retriable."""
    errno = 41
    message = 'NOT_CONTROLLER'
    retriable = True
    description = 'This is not the correct controller for this cluster.'
class InvalidRequestError(BrokerResponseError):
    """Broker error code 42 (INVALID_REQUEST)."""
    errno = 42
    message = 'INVALID_REQUEST'
    description = (
        'This most likely occurs because of a request being'
        ' malformed by the client library or the message was'
        ' sent to an incompatible broker. See the broker logs'
        ' for more details.'
    )
class UnsupportedForMessageFormatError(BrokerResponseError):
    """Broker error code 43 (UNSUPPORTED_FOR_MESSAGE_FORMAT)."""
    errno = 43
    message = 'UNSUPPORTED_FOR_MESSAGE_FORMAT'
    description = (
        'The message format version on the broker does not'
        ' support this request.'
    )
class PolicyViolationError(BrokerResponseError):
    """Broker error code 44 (POLICY_VIOLATION)."""
    errno = 44
    message = 'POLICY_VIOLATION'
    description = 'Request parameters do not satisfy the configured policy.'
class KafkaUnavailableError(KafkaError):
    """KafkaError subtype with no overrides (inherits the False flag defaults)."""
class KafkaTimeoutError(KafkaError):
    """KafkaError subtype with no overrides (inherits the False flag defaults)."""
class FailedPayloadsError(KafkaError):
    """KafkaError that remembers the payload it was raised for.

    Arguments:
        payload: kept on the instance as ``payload``. It is not forwarded to
            the base exception, so it does not appear in ``args`` or ``str()``.
        *args: passed through unchanged to KafkaError.
    """

    def __init__(self, payload, *args):
        self.payload = payload
        super(FailedPayloadsError, self).__init__(*args)
class ConnectionError(KafkaError):
    """KafkaError subtype flagged both retriable and metadata-invalidating.

    NOTE: on Python 3 this name hides the built-in ConnectionError for any
    code in this module that refers to it after this definition.
    """
    invalid_metadata = True
    retriable = True
class BufferUnderflowError(KafkaError):
    """KafkaError subtype with no overrides (inherits the False flag defaults)."""
class ChecksumError(KafkaError):
    """KafkaError subtype with no overrides (inherits the False flag defaults)."""
class ConsumerFetchSizeTooSmall(KafkaError):
    """KafkaError subtype with no overrides (inherits the False flag defaults)."""
class ConsumerNoMoreData(KafkaError):
    """KafkaError subtype with no overrides (inherits the False flag defaults)."""
class ConsumerTimeout(KafkaError):
    """KafkaError subtype with no overrides (inherits the False flag defaults)."""
class ProtocolError(KafkaError):
    """KafkaError subtype with no overrides (inherits the False flag defaults)."""
class UnsupportedCodecError(KafkaError):
    """KafkaError subtype with no overrides (inherits the False flag defaults)."""
class KafkaConfigurationError(KafkaError):
    """KafkaError subtype with no overrides (inherits the False flag defaults)."""
class QuotaViolationError(KafkaError):
    """KafkaError subtype with no overrides (inherits the False flag defaults)."""
class AsyncProducerQueueFull(KafkaError):
    """KafkaError that remembers the messages that could not be handled.

    Arguments:
        failed_msgs: kept on the instance as ``failed_msgs``. It is not
            forwarded to the base exception, so it does not appear in
            ``args`` or ``str()``.
        *args: passed through unchanged to KafkaError.
    """

    def __init__(self, failed_msgs, *args):
        self.failed_msgs = failed_msgs
        super(AsyncProducerQueueFull, self).__init__(*args)
def _iter_broker_errors():
    """Yield each proper subclass of BrokerResponseError in this module.

    Every class reachable as an attribute of this module is examined;
    BrokerResponseError itself is skipped.
    """
    this_module = sys.modules[__name__]
    for _, candidate in inspect.getmembers(this_module, inspect.isclass):
        if candidate is BrokerResponseError:
            continue
        if issubclass(candidate, BrokerResponseError):
            yield candidate
# Lookup table: numeric broker error code (errno) -> exception class.
kafka_errors = dict(
    (error_class.errno, error_class) for error_class in _iter_broker_errors()
)
def for_code(error_code):
    """Return the exception class registered for ``error_code``.

    Codes that are absent from ``kafka_errors`` resolve to UnknownError.
    """
    error_class = kafka_errors.get(error_code, UnknownError)
    return error_class
def check_error(response):
    """Raise the exception that ``response`` represents, if any.

    Arguments:
        response: either an Exception instance, which is re-raised as is, or
            an object exposing an ``error`` attribute holding an error code.

    Raises:
        The ``response`` itself when it is an Exception; otherwise, when
        ``response.error`` is truthy, the class mapped to that code in
        ``kafka_errors`` (UnknownError for unmapped codes), constructed with
        ``response`` as its sole argument.
    """
    if isinstance(response, Exception):
        raise response

    code = response.error
    if not code:
        # A falsy code (e.g. 0) means there is nothing to report.
        return

    raise kafka_errors.get(code, UnknownError)(response)
# Exception classes grouped under the "backoff" retry category.
RETRY_BACKOFF_ERROR_TYPES = (
    KafkaUnavailableError,
    LeaderNotAvailableError,
    ConnectionError,
    FailedPayloadsError,
)

# Exception classes grouped under the "refresh" retry category.
RETRY_REFRESH_ERROR_TYPES = (
    NotLeaderForPartitionError,
    UnknownTopicOrPartitionError,
    LeaderNotAvailableError,
    ConnectionError,
)

# Concatenation of both groups; classes listed in both appear twice.
RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES
|