multiprocess.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. from __future__ import absolute_import
  2. from collections import namedtuple
  3. import logging
  4. from multiprocessing import Process, Manager as MPManager
  5. import time
  6. import warnings
  7. from kafka.vendor.six.moves import queue # pylint: disable=import-error
  8. from ..common import KafkaError
  9. from .base import (
  10. Consumer,
  11. AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
  12. NO_MESSAGES_WAIT_TIME_SECONDS,
  13. FULL_QUEUE_WAIT_TIME_SECONDS,
  14. MAX_BACKOFF_SECONDS,
  15. )
  16. from .simple import SimpleConsumer
  17. log = logging.getLogger(__name__)
  18. Events = namedtuple("Events", ["start", "pause", "exit"])
  19. def _mp_consume(client, group, topic, message_queue, size, events, **consumer_options):
  20. """
  21. A child process worker which consumes messages based on the
  22. notifications given by the controller process
  23. NOTE: Ideally, this should have been a method inside the Consumer
  24. class. However, multiprocessing module has issues in windows. The
  25. functionality breaks unless this function is kept outside of a class
  26. """
  27. # Initial interval for retries in seconds.
  28. interval = 1
  29. while not events.exit.is_set():
  30. try:
  31. # Make the child processes open separate socket connections
  32. client.reinit()
  33. # We will start consumers without auto-commit. Auto-commit will be
  34. # done by the master controller process.
  35. consumer = SimpleConsumer(client, group, topic,
  36. auto_commit=False,
  37. auto_commit_every_n=None,
  38. auto_commit_every_t=None,
  39. **consumer_options)
  40. # Ensure that the consumer provides the partition information
  41. consumer.provide_partition_info()
  42. while True:
  43. # Wait till the controller indicates us to start consumption
  44. events.start.wait()
  45. # If we are asked to quit, do so
  46. if events.exit.is_set():
  47. break
  48. # Consume messages and add them to the queue. If the controller
  49. # indicates a specific number of messages, follow that advice
  50. count = 0
  51. message = consumer.get_message()
  52. if message:
  53. while True:
  54. try:
  55. message_queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
  56. break
  57. except queue.Full:
  58. if events.exit.is_set(): break
  59. count += 1
  60. # We have reached the required size. The controller might have
  61. # more than what he needs. Wait for a while.
  62. # Without this logic, it is possible that we run into a big
  63. # loop consuming all available messages before the controller
  64. # can reset the 'start' event
  65. if count == size.value:
  66. events.pause.wait()
  67. else:
  68. # In case we did not receive any message, give up the CPU for
  69. # a while before we try again
  70. time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
  71. consumer.stop()
  72. except KafkaError as e:
  73. # Retry with exponential backoff
  74. log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval))
  75. time.sleep(interval)
  76. interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS
  77. class MultiProcessConsumer(Consumer):
  78. """
  79. A consumer implementation that consumes partitions for a topic in
  80. parallel using multiple processes
  81. Arguments:
  82. client: a connected SimpleClient
  83. group: a name for this consumer, used for offset storage and must be unique
  84. If you are connecting to a server that does not support offset
  85. commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
  86. topic: the topic to consume
  87. Keyword Arguments:
  88. partitions: An optional list of partitions to consume the data from
  89. auto_commit: default True. Whether or not to auto commit the offsets
  90. auto_commit_every_n: default 100. How many messages to consume
  91. before a commit
  92. auto_commit_every_t: default 5000. How much time (in milliseconds) to
  93. wait before commit
  94. num_procs: Number of processes to start for consuming messages.
  95. The available partitions will be divided among these processes
  96. partitions_per_proc: Number of partitions to be allocated per process
  97. (overrides num_procs)
  98. Auto commit details:
  99. If both auto_commit_every_n and auto_commit_every_t are set, they will
  100. reset one another when one is triggered. These triggers simply call the
  101. commit method on this class. A manual call to commit will also reset
  102. these triggers
  103. """
  104. def __init__(self, client, group, topic,
  105. partitions=None,
  106. auto_commit=True,
  107. auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
  108. auto_commit_every_t=AUTO_COMMIT_INTERVAL,
  109. num_procs=1,
  110. partitions_per_proc=0,
  111. **simple_consumer_options):
  112. warnings.warn('This class has been deprecated and will be removed in a'
  113. ' future release. Use KafkaConsumer instead',
  114. DeprecationWarning)
  115. # Initiate the base consumer class
  116. super(MultiProcessConsumer, self).__init__(
  117. client, group, topic,
  118. partitions=partitions,
  119. auto_commit=auto_commit,
  120. auto_commit_every_n=auto_commit_every_n,
  121. auto_commit_every_t=auto_commit_every_t)
  122. # Variables for managing and controlling the data flow from
  123. # consumer child process to master
  124. manager = MPManager()
  125. self.queue = manager.Queue(1024) # Child consumers dump messages into this
  126. self.events = Events(
  127. start = manager.Event(), # Indicates the consumers to start fetch
  128. exit = manager.Event(), # Requests the consumers to shutdown
  129. pause = manager.Event()) # Requests the consumers to pause fetch
  130. self.size = manager.Value('i', 0) # Indicator of number of messages to fetch
  131. # dict.keys() returns a view in py3 + it's not a thread-safe operation
  132. # http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3
  133. # It's safer to copy dict as it only runs during the init.
  134. partitions = list(self.offsets.copy().keys())
  135. # By default, start one consumer process for all partitions
  136. # The logic below ensures that
  137. # * we do not cross the num_procs limit
  138. # * we have an even distribution of partitions among processes
  139. if partitions_per_proc:
  140. num_procs = len(partitions) / partitions_per_proc
  141. if num_procs * partitions_per_proc < len(partitions):
  142. num_procs += 1
  143. # The final set of chunks
  144. chunks = [partitions[proc::num_procs] for proc in range(num_procs)]
  145. self.procs = []
  146. for chunk in chunks:
  147. options = {'partitions': list(chunk)}
  148. if simple_consumer_options:
  149. simple_consumer_options.pop('partitions', None)
  150. options.update(simple_consumer_options)
  151. args = (client.copy(), self.group, self.topic, self.queue,
  152. self.size, self.events)
  153. proc = Process(target=_mp_consume, args=args, kwargs=options)
  154. proc.daemon = True
  155. proc.start()
  156. self.procs.append(proc)
  157. def __repr__(self):
  158. return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
  159. (self.group, self.topic, len(self.procs))
  160. def stop(self):
  161. # Set exit and start off all waiting consumers
  162. self.events.exit.set()
  163. self.events.pause.set()
  164. self.events.start.set()
  165. for proc in self.procs:
  166. proc.join()
  167. proc.terminate()
  168. super(MultiProcessConsumer, self).stop()
  169. def __iter__(self):
  170. """
  171. Iterator to consume the messages available on this consumer
  172. """
  173. # Trigger the consumer procs to start off.
  174. # We will iterate till there are no more messages available
  175. self.size.value = 0
  176. self.events.pause.set()
  177. while True:
  178. self.events.start.set()
  179. try:
  180. # We will block for a small while so that the consumers get
  181. # a chance to run and put some messages in the queue
  182. # TODO: This is a hack and will make the consumer block for
  183. # at least one second. Need to find a better way of doing this
  184. partition, message = self.queue.get(block=True, timeout=1)
  185. except queue.Empty:
  186. break
  187. # Count, check and commit messages if necessary
  188. self.offsets[partition] = message.offset + 1
  189. self.events.start.clear()
  190. self.count_since_commit += 1
  191. self._auto_commit()
  192. yield message
  193. self.events.start.clear()
  194. def get_messages(self, count=1, block=True, timeout=10):
  195. """
  196. Fetch the specified number of messages
  197. Keyword Arguments:
  198. count: Indicates the maximum number of messages to be fetched
  199. block: If True, the API will block till all messages are fetched.
  200. If block is a positive integer the API will block until that
  201. many messages are fetched.
  202. timeout: When blocking is requested the function will block for
  203. the specified time (in seconds) until count messages is
  204. fetched. If None, it will block forever.
  205. """
  206. messages = []
  207. # Give a size hint to the consumers. Each consumer process will fetch
  208. # a maximum of "count" messages. This will fetch more messages than
  209. # necessary, but these will not be committed to kafka. Also, the extra
  210. # messages can be provided in subsequent runs
  211. self.size.value = count
  212. self.events.pause.clear()
  213. if timeout is not None:
  214. max_time = time.time() + timeout
  215. new_offsets = {}
  216. while count > 0 and (timeout is None or timeout > 0):
  217. # Trigger consumption only if the queue is empty
  218. # By doing this, we will ensure that consumers do not
  219. # go into overdrive and keep consuming thousands of
  220. # messages when the user might need only a few
  221. if self.queue.empty():
  222. self.events.start.set()
  223. block_next_call = block is True or block > len(messages)
  224. try:
  225. partition, message = self.queue.get(block_next_call,
  226. timeout)
  227. except queue.Empty:
  228. break
  229. _msg = (partition, message) if self.partition_info else message
  230. messages.append(_msg)
  231. new_offsets[partition] = message.offset + 1
  232. count -= 1
  233. if timeout is not None:
  234. timeout = max_time - time.time()
  235. self.size.value = 0
  236. self.events.start.clear()
  237. self.events.pause.set()
  238. # Update and commit offsets if necessary
  239. self.offsets.update(new_offsets)
  240. self.count_since_commit += len(messages)
  241. self._auto_commit()
  242. return messages