base.py

from __future__ import absolute_import

import atexit
import logging
import numbers
from threading import Lock
import warnings

from kafka.errors import (
    UnknownTopicOrPartitionError, check_error, KafkaError)
from kafka.structs import (
    OffsetRequestPayload, OffsetCommitRequestPayload,
    OffsetFetchRequestPayload)
from kafka.util import ReentrantTimer

log = logging.getLogger('kafka.consumer')

AUTO_COMMIT_MSG_COUNT = 100   # messages between auto-commits
AUTO_COMMIT_INTERVAL = 5000   # ms between auto-commits

FETCH_DEFAULT_BLOCK_TIMEOUT = 1
FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
FETCH_BUFFER_SIZE_BYTES = 4096
MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8

ITER_TIMEOUT_SECONDS = 60
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
FULL_QUEUE_WAIT_TIME_SECONDS = 0.1
MAX_BACKOFF_SECONDS = 60


class Consumer(object):
    """
    Base class to be used by other consumers. Not to be used directly.

    This base class provides logic for

    * initialization and fetching metadata of partitions
    * Auto-commit logic
    * APIs for fetching pending message count
    """

    def __init__(self, client, group, topic, partitions=None, auto_commit=True,
                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                 auto_commit_every_t=AUTO_COMMIT_INTERVAL):

        warnings.warn('deprecated -- this class will be removed in a future'
                      ' release. Use KafkaConsumer instead.',
                      DeprecationWarning)
        self.client = client
        self.topic = topic
        self.group = group
        self.client.load_metadata_for_topics(topic,
                                             ignore_leadernotavailable=True)
        self.offsets = {}

        if partitions is None:
            partitions = self.client.get_partition_ids_for_topic(topic)
        else:
            assert all(isinstance(x, numbers.Integral) for x in partitions)

        # Variables for handling offset commits
        self.commit_lock = Lock()
        self.commit_timer = None
        self.count_since_commit = 0
        self.auto_commit = auto_commit
        self.auto_commit_every_n = auto_commit_every_n
        self.auto_commit_every_t = auto_commit_every_t

        # Set up the auto-commit timer
        if auto_commit is True and auto_commit_every_t is not None:
            self.commit_timer = ReentrantTimer(auto_commit_every_t,
                                               self.commit)
            self.commit_timer.start()

        # Set initial offsets
        if self.group is not None:
            self.fetch_last_known_offsets(partitions)
        else:
            for partition in partitions:
                self.offsets[partition] = 0

        # Register a cleanup handler
        def cleanup(obj):
            obj.stop()
        self._cleanup_func = cleanup
        atexit.register(cleanup, self)

        self.partition_info = False     # Do not return partition info in msgs
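
    # A minimal usage sketch (not part of the original module): concrete
    # subclasses such as SimpleConsumer are built on this __init__. The
    # broker address below is an assumed placeholder.
    #
    #   from kafka import SimpleClient, SimpleConsumer
    #   client = SimpleClient('localhost:9092')
    #   consumer = SimpleConsumer(client, 'my-group', 'my-topic',
    #                             auto_commit=True,
    #                             auto_commit_every_n=100,   # msgs per commit
    #                             auto_commit_every_t=5000)  # ms per commit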

    def provide_partition_info(self):
        """
        Indicates that partition info must be returned by the consumer
        """
        self.partition_info = True
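
    # Sketch of the effect (assuming the subclass's iterator honors
    # self.partition_info, as SimpleConsumer's does): messages are yielded
    # as (partition, message) tuples instead of bare messages.
    #
    #   consumer.provide_partition_info()
    #   for partition, message in consumer:
    #       ...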

    def fetch_last_known_offsets(self, partitions=None):
        if self.group is None:
            raise ValueError('Consumer.group must not be None')

        if partitions is None:
            partitions = self.client.get_partition_ids_for_topic(self.topic)

        responses = self.client.send_offset_fetch_request(
            self.group,
            [OffsetFetchRequestPayload(self.topic, p) for p in partitions],
            fail_on_error=False
        )

        for resp in responses:
            try:
                check_error(resp)
            # API spec says server won't set an error here
            # but 0.8.1.1 does actually...
            except UnknownTopicOrPartitionError:
                pass

            # -1 offset signals no commit is currently stored
            if resp.offset == -1:
                self.offsets[resp.partition] = 0

            # Otherwise we committed the stored offset
            # and need to fetch the next one
            else:
                self.offsets[resp.partition] = resp.offset
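
    # Worked example of the bookkeeping above (values assumed): a stored
    # commit of 42 leaves self.offsets[partition] == 42, i.e. offset 42 is
    # the next message to fetch; a response offset of -1 (nothing committed
    # yet for this group) resets the position to 0.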

    def commit(self, partitions=None):
        """Commit stored offsets to Kafka via OffsetCommitRequest (v0)

        Keyword Arguments:
            partitions (list): list of partitions to commit, default is to
                commit all of them

        Returns: True on success, False on failure
        """
        # short circuit if nothing happened. This check is kept outside
        # the lock to avoid un-necessarily acquiring the lock just to
        # check the state
        if self.count_since_commit == 0:
            return

        with self.commit_lock:
            # Do this check again, just in case the state has changed
            # while we were acquiring the lock
            if self.count_since_commit == 0:
                return

            reqs = []
            if partitions is None:  # commit all partitions
                partitions = list(self.offsets.keys())

            log.debug('Committing new offsets for %s, partitions %s',
                      self.topic, partitions)
            for partition in partitions:
                offset = self.offsets[partition]
                log.debug('Commit offset %d in SimpleConsumer: '
                          'group=%s, topic=%s, partition=%s',
                          offset, self.group, self.topic, partition)

                reqs.append(OffsetCommitRequestPayload(self.topic, partition,
                                                       offset, None))

            try:
                self.client.send_offset_commit_request(self.group, reqs)
            except KafkaError as e:
                log.error('%s saving offsets: %s', e.__class__.__name__, e)
                return False
            else:
                self.count_since_commit = 0
                return True
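
    # Sketch of a manual-commit loop (auto_commit=False assumed; `process`
    # is a hypothetical handler):
    #
    #   consumer = SimpleConsumer(client, 'my-group', 'my-topic',
    #                             auto_commit=False)
    #   for message in consumer:
    #       process(message)
    #   consumer.commit()   # True on success, False on failure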

    def _auto_commit(self):
        """
        Commit if the configured per-message-count threshold has been reached
        """
        # Check if we are supposed to do an auto-commit
        if not self.auto_commit or self.auto_commit_every_n is None:
            return

        if self.count_since_commit >= self.auto_commit_every_n:
            self.commit()

    def stop(self):
        if self.commit_timer is not None:
            self.commit_timer.stop()
            self.commit()

        if hasattr(self, '_cleanup_func'):
            # Remove cleanup handler now that we've stopped

            # py3 supports unregistering
            if hasattr(atexit, 'unregister'):
                atexit.unregister(self._cleanup_func)  # pylint: disable=no-member

            # py2 requires removing from private attribute...
            else:
                # ValueError on list.remove() if the exithandler no longer
                # exists is fine here
                try:
                    atexit._exithandlers.remove(  # pylint: disable=no-member
                        (self._cleanup_func, (self,), {}))
                except ValueError:
                    pass

            del self._cleanup_func
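
    # Sketch: pairing the iterator with stop() in a finally block cancels
    # the auto-commit timer and flushes a final commit on shutdown:
    #
    #   try:
    #       for message in consumer:
    #           process(message)   # hypothetical handler
    #   finally:
    #       consumer.stop()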

    def pending(self, partitions=None):
        """
        Gets the pending message count

        Keyword Arguments:
            partitions (list): list of partitions to check for, default is to
                check all
        """
        if partitions is None:
            partitions = self.offsets.keys()

        total = 0
        reqs = []

        for partition in partitions:
            reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1))

        resps = self.client.send_offset_request(reqs)
        for resp in resps:
            partition = resp.partition
            pending = resp.offsets[0]
            offset = self.offsets[partition]
            total += pending - offset

        return total
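
# A minimal lag-check sketch (assumes a consumer constructed as above):
#
#   lag = consumer.pending()        # messages remaining, all partitions
#   lag_p0 = consumer.pending([0])  # just partition 0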