# base.py

from __future__ import absolute_import

import atexit
import logging
import time

try:
    from queue import Empty, Full, Queue  # pylint: disable=import-error
except ImportError:
    from Queue import Empty, Full, Queue  # pylint: disable=import-error
from collections import defaultdict
from threading import Thread, Event

from kafka.vendor import six

from kafka.structs import (
    ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)
from kafka.errors import (
    kafka_errors, UnsupportedCodecError, FailedPayloadsError,
    RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
    RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set

log = logging.getLogger('kafka.producer')

BATCH_SEND_DEFAULT_INTERVAL = 20
BATCH_SEND_MSG_COUNT = 20

# unlimited
ASYNC_QUEUE_MAXSIZE = 0
ASYNC_QUEUE_PUT_TIMEOUT = 0
# unlimited retries by default
ASYNC_RETRY_LIMIT = None
ASYNC_RETRY_BACKOFF_MS = 100
ASYNC_RETRY_ON_TIMEOUTS = True
ASYNC_LOG_MESSAGES_ON_ERROR = True
STOP_ASYNC_PRODUCER = -1
ASYNC_STOP_TIMEOUT_SECS = 30

SYNC_FAIL_ON_ERROR_DEFAULT = True


def _send_upstream(queue, client, codec, batch_time, batch_size,
                   req_acks, ack_timeout, retry_options, stop_event,
                   log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
                   stop_timeout=ASYNC_STOP_TIMEOUT_SECS,
                   codec_compresslevel=None):
    """Private method to manage producing messages asynchronously

    Listens on the queue for a specified number of messages or until
    a specified timeout and then sends messages to the brokers in grouped
    requests (one per broker).

    Messages placed on the queue should be tuples that conform to this format:
        ((topic, partition), message, key)

    Currently does not mark messages with task_done. Do not attempt to
    :meth:`join`!

    Arguments:
        queue (threading.Queue): the queue from which to get messages
        client (kafka.SimpleClient): instance to use for communicating
            with brokers
        codec (kafka.protocol.ALL_CODECS): compression codec to use
        batch_time (int): interval in seconds to send message batches
        batch_size (int): count of messages that will trigger an immediate send
        req_acks: required acks to use with ProduceRequests. see server protocol
        ack_timeout: timeout to wait for required acks. see server protocol
        retry_options (RetryOptions): settings for retry limits, backoff etc
        stop_event (threading.Event): event to monitor for shutdown signal.
            when this event is 'set', the producer will stop sending messages.
        log_messages_on_error (bool, optional): log stringified message-contents
            on any produce error, otherwise only log a hash() of the contents,
            defaults to True.
        stop_timeout (int or float, optional): number of seconds to continue
            retrying messages after stop_event is set, defaults to 30.
    """
    request_tries = {}

    while not stop_event.is_set():
        try:
            client.reinit()
        except Exception as e:
            log.warn('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms)
            time.sleep(float(retry_options.backoff_ms) / 1000)
        else:
            break

    stop_at = None
    while not (stop_event.is_set() and queue.empty() and not request_tries):

        # Handle stop_timeout
        if stop_event.is_set():
            if not stop_at:
                stop_at = stop_timeout + time.time()
            if time.time() > stop_at:
                log.debug('Async producer stopping due to stop_timeout')
                break

        timeout = batch_time
        count = batch_size
        send_at = time.time() + timeout
        msgset = defaultdict(list)

        # Merging messages will require a bit more work to manage correctly
        # for now, don't look for new batches if we have old ones to retry
        if request_tries:
            count = 0
            log.debug('Skipping new batch collection to handle retries')
        else:
            log.debug('Batching size: %s, timeout: %s', count, timeout)

        # Keep fetching till we gather enough messages or a
        # timeout is reached
        while count > 0 and timeout >= 0:
            try:
                topic_partition, msg, key = queue.get(timeout=timeout)
            except Empty:
                break

            # Check if the controller has requested us to stop
            if topic_partition == STOP_ASYNC_PRODUCER:
                stop_event.set()
                break

            # Adjust the timeout to match the remaining period
            count -= 1
            timeout = send_at - time.time()
            msgset[topic_partition].append((msg, key))

        # Send collected requests upstream
        for topic_partition, msg in msgset.items():
            messages = create_message_set(msg, codec, key, codec_compresslevel)
            req = ProduceRequestPayload(
                topic_partition.topic,
                topic_partition.partition,
                tuple(messages))
            request_tries[req] = 0

        if not request_tries:
            continue

        reqs_to_retry, error_cls = [], None
        retry_state = {
            'do_backoff': False,
            'do_refresh': False
        }

        def _handle_error(error_cls, request):
            if issubclass(error_cls, RETRY_ERROR_TYPES) or (retry_options.retry_on_timeouts and issubclass(error_cls, RequestTimedOutError)):
                reqs_to_retry.append(request)
            if issubclass(error_cls, RETRY_BACKOFF_ERROR_TYPES):
                retry_state['do_backoff'] |= True
            if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES):
                retry_state['do_refresh'] |= True

        requests = list(request_tries.keys())
        log.debug('Sending: %s', requests)
        responses = client.send_produce_request(requests,
                                                acks=req_acks,
                                                timeout=ack_timeout,
                                                fail_on_error=False)
        log.debug('Received: %s', responses)
        for i, response in enumerate(responses):
            error_cls = None
            if isinstance(response, FailedPayloadsError):
                error_cls = response.__class__
                orig_req = response.payload

            elif isinstance(response, ProduceResponsePayload) and response.error:
                error_cls = kafka_errors.get(response.error, UnknownError)
                orig_req = requests[i]

            if error_cls:
                _handle_error(error_cls, orig_req)
                log.error('%s sending ProduceRequestPayload (#%d of %d) '
                          'to %s:%d with msgs %s',
                          error_cls.__name__, (i + 1), len(requests),
                          orig_req.topic, orig_req.partition,
                          orig_req.messages if log_messages_on_error
                          else hash(orig_req.messages))

        if not reqs_to_retry:
            request_tries = {}
            continue

        # doing backoff before next retry
        if retry_state['do_backoff'] and retry_options.backoff_ms:
            log.warn('Async producer backoff for %s(ms) before retrying', retry_options.backoff_ms)
            time.sleep(float(retry_options.backoff_ms) / 1000)

        # refresh topic metadata before next retry
        if retry_state['do_refresh']:
            log.warn('Async producer forcing metadata refresh before retrying')
            try:
                client.load_metadata_for_topics()
            except Exception:
                log.exception("Async producer couldn't reload topic metadata.")

        # Apply retry limit, dropping messages that are over
        request_tries = dict(
            (key, count + 1)
            for (key, count) in request_tries.items()
            if key in reqs_to_retry
            and (retry_options.limit is None
                 or (count < retry_options.limit))
        )
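        # Illustrative note (not part of the original source): each surviving
        # entry keeps its prior attempt count plus one. For example, with
        # retry_options.limit == 3, a request whose count has already reached 3
        # is dropped above rather than retried a fourth time.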
        # Log messages we are going to retry
        for orig_req in request_tries.keys():
            log.info('Retrying ProduceRequestPayload to %s:%d with msgs %s',
                     orig_req.topic, orig_req.partition,
                     orig_req.messages if log_messages_on_error
                     else hash(orig_req.messages))

    if request_tries or not queue.empty():
        log.error('Stopped producer with %d unsent messages', len(request_tries) + queue.qsize())


class Producer(object):
    """
    Base class to be used by producers

    Arguments:
        client (kafka.SimpleClient): instance to use for broker
            communications. If async=True, the background thread will use
            :meth:`client.copy`, which is expected to return a thread-safe
            object.
        codec (kafka.protocol.ALL_CODECS): compression codec to use.
        req_acks (int, optional): A value indicating the acknowledgements that
            the server must receive before responding to the request,
            defaults to 1 (local ack).
        ack_timeout (int, optional): millisecond timeout to wait for the
            configured req_acks, defaults to 1000.
        sync_fail_on_error (bool, optional): whether sync producer should
            raise exceptions (True), or just return errors (False),
            defaults to True.
        async (bool, optional): send message using a background thread,
            defaults to False.
        batch_send_every_n (int, optional): If async is True, messages are
            sent in batches of this size, defaults to 20.
        batch_send_every_t (int or float, optional): If async is True,
            messages are sent immediately after this timeout in seconds, even
            if fewer than batch_send_every_n messages are pending, defaults to 20.
        async_retry_limit (int, optional): number of retries for failed messages
            or None for unlimited, defaults to None / unlimited.
        async_retry_backoff_ms (int, optional): milliseconds to backoff on
            failed messages, defaults to 100.
        async_retry_on_timeouts (bool, optional): whether to retry on
            RequestTimedOutError, defaults to True.
        async_queue_maxsize (int, optional): limit to the size of the
            internal message queue in number of messages (not size), defaults
            to 0 (no limit).
        async_queue_put_timeout (int or float, optional): timeout seconds
            for queue.put in send_messages for async producers -- will only
            apply if async_queue_maxsize > 0 and the queue is Full,
            defaults to 0 (fail immediately on full queue).
        async_log_messages_on_error (bool, optional): set to False and the
            async producer will only log hash() contents on failed produce
            requests, defaults to True (log full messages). Hash logging
            will not allow you to identify the specific message that failed,
            but it will allow you to match failures with retries.
        async_stop_timeout (int or float, optional): seconds to continue
            attempting to send queued messages after :meth:`producer.stop`,
            defaults to 30.

    Deprecated Arguments:
        batch_send (bool, optional): If True, messages are sent by a background
            thread in batches, defaults to False. Deprecated, use 'async'
    """
    ACK_NOT_REQUIRED = 0            # No ack is required
    ACK_AFTER_LOCAL_WRITE = 1       # Send response after it is written to log
    ACK_AFTER_CLUSTER_COMMIT = -1   # Send response after data is committed
    DEFAULT_ACK_TIMEOUT = 1000

    def __init__(self, client,
                 req_acks=ACK_AFTER_LOCAL_WRITE,
                 ack_timeout=DEFAULT_ACK_TIMEOUT,
                 codec=None,
                 codec_compresslevel=None,
                 sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
                 async=False,
                 batch_send=False,  # deprecated, use async
                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
                 async_retry_limit=ASYNC_RETRY_LIMIT,
                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
                 async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
                 async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
                 async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
                 async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):

        if async:
            assert batch_send_every_n > 0
            assert batch_send_every_t > 0
            assert async_queue_maxsize >= 0

        self.client = client
        self.async = async
        self.req_acks = req_acks
        self.ack_timeout = ack_timeout
        self.stopped = False

        if codec is None:
            codec = CODEC_NONE
        elif codec not in ALL_CODECS:
            raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)

        self.codec = codec
        self.codec_compresslevel = codec_compresslevel

        if self.async:
            # Messages are sent through this queue
            self.queue = Queue(async_queue_maxsize)
            self.async_queue_put_timeout = async_queue_put_timeout
            async_retry_options = RetryOptions(
                limit=async_retry_limit,
                backoff_ms=async_retry_backoff_ms,
                retry_on_timeouts=async_retry_on_timeouts)
            self.thread_stop_event = Event()
            self.thread = Thread(
                target=_send_upstream,
                args=(self.queue, self.client.copy(), self.codec,
                      batch_send_every_t, batch_send_every_n,
                      self.req_acks, self.ack_timeout,
                      async_retry_options, self.thread_stop_event),
                kwargs={'log_messages_on_error': async_log_messages_on_error,
                        'stop_timeout': async_stop_timeout,
                        'codec_compresslevel': self.codec_compresslevel}
            )

            # Thread will die if main thread exits
            self.thread.daemon = True
            self.thread.start()

            def cleanup(obj):
                if not obj.stopped:
                    obj.stop()
            self._cleanup_func = cleanup
            atexit.register(cleanup, self)
        else:
            self.sync_fail_on_error = sync_fail_on_error

    def send_messages(self, topic, partition, *msg):
  298. """Helper method to send produce requests.
  299. Note that msg type *must* be encoded to bytes by user. Passing unicode
  300. message will not work, for example you should encode before calling
  301. send_messages via something like `unicode_message.encode('utf-8')`
  302. All messages will set the message 'key' to None.

        Arguments:
            topic (str): name of topic for produce request
            partition (int): partition number for produce request
            *msg (bytes): one or more message payloads

        Returns:
            list of ProduceResponsePayloads returned by the server; an empty
                list when the producer is async.

        Raises:
            FailedPayloadsError: low-level connection error, can be caused by
                networking failures, or a malformed request.
            ConnectionError:
            KafkaUnavailableError: all known brokers are down when attempting
                to refresh metadata.
            LeaderNotAvailableError: topic or partition is initializing or
                a broker failed and leadership election is in progress.
            NotLeaderForPartitionError: metadata is out of sync; the broker
                that the request was sent to is not the leader for the topic
                or partition.
            UnknownTopicOrPartitionError: the topic or partition has not
                been created yet and auto-creation is not available.
            AsyncProducerQueueFull: in async mode, if too many messages are
                unsent and remain in the internal queue.
        """
        return self._send_messages(topic, partition, *msg)

    def _send_messages(self, topic, partition, *msg, **kwargs):
        key = kwargs.pop('key', None)

        # Guarantee that msg is actually a list or tuple (should always be true)
        if not isinstance(msg, (list, tuple)):
            raise TypeError("msg is not a list or tuple!")

        for m in msg:
            # The protocol allows to have key & payload with null values both,
            # (https://goo.gl/o694yN) but having (null,null) pair doesn't make sense.
            if m is None:
                if key is None:
                    raise TypeError("key and payload can't be null in one")

            # Raise TypeError if any non-null message is not encoded as bytes
            elif not isinstance(m, six.binary_type):
                raise TypeError("all produce message payloads must be null or type bytes")

        # Raise TypeError if the key is not encoded as bytes
        if key is not None and not isinstance(key, six.binary_type):
            raise TypeError("the key must be type bytes")

        if self.async:
            for idx, m in enumerate(msg):
                try:
                    item = (TopicPartition(topic, partition), m, key)
                    if self.async_queue_put_timeout == 0:
                        self.queue.put_nowait(item)
                    else:
                        self.queue.put(item, True, self.async_queue_put_timeout)
                except Full:
                    raise AsyncProducerQueueFull(
                        msg[idx:],
                        'Producer async queue overfilled. '
                        'Current queue size %d.' % self.queue.qsize())
            resp = []
        else:
            messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel)
            req = ProduceRequestPayload(topic, partition, messages)
            try:
                resp = self.client.send_produce_request(
                    [req], acks=self.req_acks, timeout=self.ack_timeout,
                    fail_on_error=self.sync_fail_on_error
                )
            except Exception:
                log.exception("Unable to send messages")
                raise
        return resp

    def stop(self, timeout=None):
        """
        Stop the producer (async mode). Blocks until async thread completes.
        """
        if timeout is not None:
            log.warning('timeout argument to stop() is deprecated - '
                        'it will be removed in future release')

        if not self.async:
            log.warning('producer.stop() called, but producer is not async')
            return

        if self.stopped:
            log.warning('producer.stop() called, but producer is already stopped')
            return

        if self.async:
            self.queue.put((STOP_ASYNC_PRODUCER, None, None))
            self.thread_stop_event.set()
            self.thread.join()

        if hasattr(self, '_cleanup_func'):
            # Remove cleanup handler now that we've stopped

            # py3 supports unregistering
            if hasattr(atexit, 'unregister'):
                atexit.unregister(self._cleanup_func)  # pylint: disable=no-member

            # py2 requires removing from private attribute...
            else:

                # ValueError on list.remove() if the exithandler no longer exists
                # but that is fine here
                try:
                    atexit._exithandlers.remove(  # pylint: disable=no-member
                        (self._cleanup_func, (self,), {}))
                except ValueError:
                    pass

            del self._cleanup_func

        self.stopped = True

    def __del__(self):
        if self.async and not self.stopped:
            self.stop()