errors.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512
  1. from __future__ import absolute_import
  2. import inspect
  3. import sys
  4. class KafkaError(RuntimeError):
  5. retriable = False
  6. # whether metadata should be refreshed on error
  7. invalid_metadata = False
  8. def __str__(self):
  9. if not self.args:
  10. return self.__class__.__name__
  11. return '{0}: {1}'.format(self.__class__.__name__,
  12. super(KafkaError, self).__str__())
  13. class IllegalStateError(KafkaError):
  14. pass
  15. class IllegalArgumentError(KafkaError):
  16. pass
  17. class NoBrokersAvailable(KafkaError):
  18. retriable = True
  19. invalid_metadata = True
  20. class NodeNotReadyError(KafkaError):
  21. retriable = True
  22. class CorrelationIdError(KafkaError):
  23. retriable = True
  24. class Cancelled(KafkaError):
  25. retriable = True
  26. class TooManyInFlightRequests(KafkaError):
  27. retriable = True
  28. class StaleMetadata(KafkaError):
  29. retriable = True
  30. invalid_metadata = True
  31. class UnrecognizedBrokerVersion(KafkaError):
  32. pass
  33. class CommitFailedError(KafkaError):
  34. pass
  35. class AuthenticationMethodNotSupported(KafkaError):
  36. pass
  37. class AuthenticationFailedError(KafkaError):
  38. retriable = False
  39. class BrokerResponseError(KafkaError):
  40. errno = None
  41. message = None
  42. description = None
  43. def __str__(self):
  44. """Add errno to standard KafkaError str"""
  45. return '[Error {0}] {1}'.format(
  46. self.errno,
  47. super(BrokerResponseError, self).__str__())
  48. class NoError(BrokerResponseError):
  49. errno = 0
  50. message = 'NO_ERROR'
  51. description = 'No error--it worked!'
  52. class UnknownError(BrokerResponseError):
  53. errno = -1
  54. message = 'UNKNOWN'
  55. description = 'An unexpected server error.'
  56. class OffsetOutOfRangeError(BrokerResponseError):
  57. errno = 1
  58. message = 'OFFSET_OUT_OF_RANGE'
  59. description = ('The requested offset is outside the range of offsets'
  60. ' maintained by the server for the given topic/partition.')
  61. class InvalidMessageError(BrokerResponseError):
  62. errno = 2
  63. message = 'INVALID_MESSAGE'
  64. description = ('This message has failed its CRC checksum, exceeds the'
  65. ' valid size, or is otherwise corrupt.')
  66. class UnknownTopicOrPartitionError(BrokerResponseError):
  67. errno = 3
  68. message = 'UNKNOWN_TOPIC_OR_PARTITION'
  69. description = ('This request is for a topic or partition that does not'
  70. ' exist on this broker.')
  71. retriable = True
  72. invalid_metadata = True
  73. class InvalidFetchRequestError(BrokerResponseError):
  74. errno = 4
  75. message = 'INVALID_FETCH_SIZE'
  76. description = 'The message has a negative size.'
  77. class LeaderNotAvailableError(BrokerResponseError):
  78. errno = 5
  79. message = 'LEADER_NOT_AVAILABLE'
  80. description = ('This error is thrown if we are in the middle of a'
  81. ' leadership election and there is currently no leader for'
  82. ' this partition and hence it is unavailable for writes.')
  83. retriable = True
  84. invalid_metadata = True
  85. class NotLeaderForPartitionError(BrokerResponseError):
  86. errno = 6
  87. message = 'NOT_LEADER_FOR_PARTITION'
  88. description = ('This error is thrown if the client attempts to send'
  89. ' messages to a replica that is not the leader for some'
  90. ' partition. It indicates that the clients metadata is out'
  91. ' of date.')
  92. retriable = True
  93. invalid_metadata = True
  94. class RequestTimedOutError(BrokerResponseError):
  95. errno = 7
  96. message = 'REQUEST_TIMED_OUT'
  97. description = ('This error is thrown if the request exceeds the'
  98. ' user-specified time limit in the request.')
  99. retriable = True
  100. class BrokerNotAvailableError(BrokerResponseError):
  101. errno = 8
  102. message = 'BROKER_NOT_AVAILABLE'
  103. description = ('This is not a client facing error and is used mostly by'
  104. ' tools when a broker is not alive.')
  105. class ReplicaNotAvailableError(BrokerResponseError):
  106. errno = 9
  107. message = 'REPLICA_NOT_AVAILABLE'
  108. description = ('If replica is expected on a broker, but is not (this can be'
  109. ' safely ignored).')
  110. class MessageSizeTooLargeError(BrokerResponseError):
  111. errno = 10
  112. message = 'MESSAGE_SIZE_TOO_LARGE'
  113. description = ('The server has a configurable maximum message size to avoid'
  114. ' unbounded memory allocation. This error is thrown if the'
  115. ' client attempt to produce a message larger than this'
  116. ' maximum.')
  117. class StaleControllerEpochError(BrokerResponseError):
  118. errno = 11
  119. message = 'STALE_CONTROLLER_EPOCH'
  120. description = 'Internal error code for broker-to-broker communication.'
  121. class OffsetMetadataTooLargeError(BrokerResponseError):
  122. errno = 12
  123. message = 'OFFSET_METADATA_TOO_LARGE'
  124. description = ('If you specify a string larger than configured maximum for'
  125. ' offset metadata.')
  126. # TODO is this deprecated? https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
  127. class StaleLeaderEpochCodeError(BrokerResponseError):
  128. errno = 13
  129. message = 'STALE_LEADER_EPOCH_CODE'
  130. class GroupLoadInProgressError(BrokerResponseError):
  131. errno = 14
  132. message = 'OFFSETS_LOAD_IN_PROGRESS'
  133. description = ('The broker returns this error code for an offset fetch'
  134. ' request if it is still loading offsets (after a leader'
  135. ' change for that offsets topic partition), or in response'
  136. ' to group membership requests (such as heartbeats) when'
  137. ' group metadata is being loaded by the coordinator.')
  138. retriable = True
  139. class GroupCoordinatorNotAvailableError(BrokerResponseError):
  140. errno = 15
  141. message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE'
  142. description = ('The broker returns this error code for group coordinator'
  143. ' requests, offset commits, and most group management'
  144. ' requests if the offsets topic has not yet been created, or'
  145. ' if the group coordinator is not active.')
  146. retriable = True
  147. class NotCoordinatorForGroupError(BrokerResponseError):
  148. errno = 16
  149. message = 'NOT_COORDINATOR_FOR_CONSUMER'
  150. description = ('The broker returns this error code if it receives an offset'
  151. ' fetch or commit request for a group that it is not a'
  152. ' coordinator for.')
  153. retriable = True
  154. class InvalidTopicError(BrokerResponseError):
  155. errno = 17
  156. message = 'INVALID_TOPIC'
  157. description = ('For a request which attempts to access an invalid topic'
  158. ' (e.g. one which has an illegal name), or if an attempt'
  159. ' is made to write to an internal topic (such as the'
  160. ' consumer offsets topic).')
  161. class RecordListTooLargeError(BrokerResponseError):
  162. errno = 18
  163. message = 'RECORD_LIST_TOO_LARGE'
  164. description = ('If a message batch in a produce request exceeds the maximum'
  165. ' configured segment size.')
  166. class NotEnoughReplicasError(BrokerResponseError):
  167. errno = 19
  168. message = 'NOT_ENOUGH_REPLICAS'
  169. description = ('Returned from a produce request when the number of in-sync'
  170. ' replicas is lower than the configured minimum and'
  171. ' requiredAcks is -1.')
  172. class NotEnoughReplicasAfterAppendError(BrokerResponseError):
  173. errno = 20
  174. message = 'NOT_ENOUGH_REPLICAS_AFTER_APPEND'
  175. description = ('Returned from a produce request when the message was'
  176. ' written to the log, but with fewer in-sync replicas than'
  177. ' required.')
  178. class InvalidRequiredAcksError(BrokerResponseError):
  179. errno = 21
  180. message = 'INVALID_REQUIRED_ACKS'
  181. description = ('Returned from a produce request if the requested'
  182. ' requiredAcks is invalid (anything other than -1, 1, or 0).')
  183. class IllegalGenerationError(BrokerResponseError):
  184. errno = 22
  185. message = 'ILLEGAL_GENERATION'
  186. description = ('Returned from group membership requests (such as heartbeats)'
  187. ' when the generation id provided in the request is not the'
  188. ' current generation.')
  189. class InconsistentGroupProtocolError(BrokerResponseError):
  190. errno = 23
  191. message = 'INCONSISTENT_GROUP_PROTOCOL'
  192. description = ('Returned in join group when the member provides a protocol'
  193. ' type or set of protocols which is not compatible with the'
  194. ' current group.')
  195. class InvalidGroupIdError(BrokerResponseError):
  196. errno = 24
  197. message = 'INVALID_GROUP_ID'
  198. description = 'Returned in join group when the groupId is empty or null.'
  199. class UnknownMemberIdError(BrokerResponseError):
  200. errno = 25
  201. message = 'UNKNOWN_MEMBER_ID'
  202. description = ('Returned from group requests (offset commits/fetches,'
  203. ' heartbeats, etc) when the memberId is not in the current'
  204. ' generation.')
  205. class InvalidSessionTimeoutError(BrokerResponseError):
  206. errno = 26
  207. message = 'INVALID_SESSION_TIMEOUT'
  208. description = ('Return in join group when the requested session timeout is'
  209. ' outside of the allowed range on the broker')
  210. class RebalanceInProgressError(BrokerResponseError):
  211. errno = 27
  212. message = 'REBALANCE_IN_PROGRESS'
  213. description = ('Returned in heartbeat requests when the coordinator has'
  214. ' begun rebalancing the group. This indicates to the client'
  215. ' that it should rejoin the group.')
  216. class InvalidCommitOffsetSizeError(BrokerResponseError):
  217. errno = 28
  218. message = 'INVALID_COMMIT_OFFSET_SIZE'
  219. description = ('This error indicates that an offset commit was rejected'
  220. ' because of oversize metadata.')
  221. class TopicAuthorizationFailedError(BrokerResponseError):
  222. errno = 29
  223. message = 'TOPIC_AUTHORIZATION_FAILED'
  224. description = ('Returned by the broker when the client is not authorized to'
  225. ' access the requested topic.')
  226. class GroupAuthorizationFailedError(BrokerResponseError):
  227. errno = 30
  228. message = 'GROUP_AUTHORIZATION_FAILED'
  229. description = ('Returned by the broker when the client is not authorized to'
  230. ' access a particular groupId.')
  231. class ClusterAuthorizationFailedError(BrokerResponseError):
  232. errno = 31
  233. message = 'CLUSTER_AUTHORIZATION_FAILED'
  234. description = ('Returned by the broker when the client is not authorized to'
  235. ' use an inter-broker or administrative API.')
  236. class InvalidTimestampError(BrokerResponseError):
  237. errno = 32
  238. message = 'INVALID_TIMESTAMP'
  239. description = 'The timestamp of the message is out of acceptable range.'
  240. class UnsupportedSaslMechanismError(BrokerResponseError):
  241. errno = 33
  242. message = 'UNSUPPORTED_SASL_MECHANISM'
  243. description = 'The broker does not support the requested SASL mechanism.'
  244. class IllegalSaslStateError(BrokerResponseError):
  245. errno = 34
  246. message = 'ILLEGAL_SASL_STATE'
  247. description = 'Request is not valid given the current SASL state.'
  248. class UnsupportedVersionError(BrokerResponseError):
  249. errno = 35
  250. message = 'UNSUPPORTED_VERSION'
  251. description = 'The version of API is not supported.'
  252. class TopicAlreadyExistsError(BrokerResponseError):
  253. errno = 36
  254. message = 'TOPIC_ALREADY_EXISTS'
  255. description = 'Topic with this name already exists.'
  256. class InvalidPartitionsError(BrokerResponseError):
  257. errno = 37
  258. message = 'INVALID_PARTITIONS'
  259. description = 'Number of partitions is invalid.'
  260. class InvalidReplicationFactorError(BrokerResponseError):
  261. errno = 38
  262. message = 'INVALID_REPLICATION_FACTOR'
  263. description = 'Replication-factor is invalid.'
  264. class InvalidReplicationAssignmentError(BrokerResponseError):
  265. errno = 39
  266. message = 'INVALID_REPLICATION_ASSIGNMENT'
  267. description = 'Replication assignment is invalid.'
  268. class InvalidConfigurationError(BrokerResponseError):
  269. errno = 40
  270. message = 'INVALID_CONFIG'
  271. description = 'Configuration is invalid.'
  272. class NotControllerError(BrokerResponseError):
  273. errno = 41
  274. message = 'NOT_CONTROLLER'
  275. description = 'This is not the correct controller for this cluster.'
  276. retriable = True
  277. class InvalidRequestError(BrokerResponseError):
  278. errno = 42
  279. message = 'INVALID_REQUEST'
  280. description = ('This most likely occurs because of a request being'
  281. ' malformed by the client library or the message was'
  282. ' sent to an incompatible broker. See the broker logs'
  283. ' for more details.')
  284. class UnsupportedForMessageFormatError(BrokerResponseError):
  285. errno = 43
  286. message = 'UNSUPPORTED_FOR_MESSAGE_FORMAT'
  287. description = ('The message format version on the broker does not'
  288. ' support this request.')
  289. class PolicyViolationError(BrokerResponseError):
  290. errno = 44
  291. message = 'POLICY_VIOLATION'
  292. description = 'Request parameters do not satisfy the configured policy.'
  293. class KafkaUnavailableError(KafkaError):
  294. pass
  295. class KafkaTimeoutError(KafkaError):
  296. pass
  297. class FailedPayloadsError(KafkaError):
  298. def __init__(self, payload, *args):
  299. super(FailedPayloadsError, self).__init__(*args)
  300. self.payload = payload
  301. class ConnectionError(KafkaError):
  302. retriable = True
  303. invalid_metadata = True
  304. class BufferUnderflowError(KafkaError):
  305. pass
  306. class ChecksumError(KafkaError):
  307. pass
  308. class ConsumerFetchSizeTooSmall(KafkaError):
  309. pass
  310. class ConsumerNoMoreData(KafkaError):
  311. pass
  312. class ConsumerTimeout(KafkaError):
  313. pass
  314. class ProtocolError(KafkaError):
  315. pass
  316. class UnsupportedCodecError(KafkaError):
  317. pass
  318. class KafkaConfigurationError(KafkaError):
  319. pass
  320. class QuotaViolationError(KafkaError):
  321. pass
  322. class AsyncProducerQueueFull(KafkaError):
  323. def __init__(self, failed_msgs, *args):
  324. super(AsyncProducerQueueFull, self).__init__(*args)
  325. self.failed_msgs = failed_msgs
  326. def _iter_broker_errors():
  327. for name, obj in inspect.getmembers(sys.modules[__name__]):
  328. if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError:
  329. yield obj
  330. kafka_errors = dict([(x.errno, x) for x in _iter_broker_errors()])
  331. def for_code(error_code):
  332. return kafka_errors.get(error_code, UnknownError)
  333. def check_error(response):
  334. if isinstance(response, Exception):
  335. raise response
  336. if response.error:
  337. error_class = kafka_errors.get(response.error, UnknownError)
  338. raise error_class(response)
  339. RETRY_BACKOFF_ERROR_TYPES = (
  340. KafkaUnavailableError, LeaderNotAvailableError,
  341. ConnectionError, FailedPayloadsError
  342. )
  343. RETRY_REFRESH_ERROR_TYPES = (
  344. NotLeaderForPartitionError, UnknownTopicOrPartitionError,
  345. LeaderNotAvailableError, ConnectionError
  346. )
  347. RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES