consumer.py

from __future__ import absolute_import

import copy
import collections
import logging
import time
import weakref

from kafka.vendor import six

from .base import BaseCoordinator
from .assignors.range import RangePartitionAssignor
from .assignors.roundrobin import RoundRobinPartitionAssignor
from .protocol import ConsumerProtocol
from .. import errors as Errors
from ..future import Future
from ..metrics import AnonMeasurable
from ..metrics.stats import Avg, Count, Max, Rate
from ..protocol.commit import OffsetCommitRequest, OffsetFetchRequest
from ..structs import OffsetAndMetadata, TopicPartition
from ..util import WeakMethod

log = logging.getLogger(__name__)


class ConsumerCoordinator(BaseCoordinator):
    """This class manages the coordination process with the consumer coordinator."""
    DEFAULT_CONFIG = {
        'group_id': 'kafka-python-default-group',
        'enable_auto_commit': True,
        'auto_commit_interval_ms': 5000,
        'default_offset_commit_callback': lambda offsets, response: True,
        'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor),
        'session_timeout_ms': 30000,
        'heartbeat_interval_ms': 3000,
        'retry_backoff_ms': 100,
        'api_version': (0, 9),
        'exclude_internal_topics': True,
        'metric_group_prefix': 'consumer'
    }

    def __init__(self, client, subscription, metrics, **configs):
        """Initialize the coordination manager.

        Keyword Arguments:
            group_id (str): name of the consumer group to join for dynamic
                partition assignment (if enabled), and to use for fetching and
                committing offsets. Default: 'kafka-python-default-group'
            enable_auto_commit (bool): If true the consumer's offset will be
                periodically committed in the background. Default: True.
            auto_commit_interval_ms (int): milliseconds between automatic
                offset commits, if enable_auto_commit is True. Default: 5000.
            default_offset_commit_callback (callable): called as
                callback(offsets, response); response will be either an Exception
                or an OffsetCommitResponse struct. This callback can be used to
                trigger custom actions when a commit request completes.
            assignors (list): List of objects to use to distribute partition
                ownership amongst consumer instances when group management is
                used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
            heartbeat_interval_ms (int): The expected time in milliseconds
                between heartbeats to the consumer coordinator when using
                Kafka's group management feature. Heartbeats are used to ensure
                that the consumer's session stays active and to facilitate
                rebalancing when new consumers join or leave the group. The
                value must be set lower than session_timeout_ms, but typically
                should be set no higher than 1/3 of that value. It can be
                adjusted even lower to control the expected time for normal
                rebalances. Default: 3000
            session_timeout_ms (int): The timeout used to detect failures when
                using Kafka's group management facilities. Default: 30000
            retry_backoff_ms (int): Milliseconds to backoff when retrying on
                errors. Default: 100.
            exclude_internal_topics (bool): Whether records from internal topics
                (such as offsets) should be exposed to the consumer. If set to
                True the only way to receive records from an internal topic is
                subscribing to it. Requires 0.10+. Default: True
        """
        super(ConsumerCoordinator, self).__init__(client, metrics, **configs)
        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]

        if self.config['api_version'] >= (0, 9) and self.config['group_id'] is not None:
            assert self.config['assignors'], 'Coordinator requires assignors'

        self._subscription = subscription
        self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster)
        self._assignment_snapshot = None
        self._cluster = client.cluster
        self._cluster.request_update()
        self._cluster.add_listener(WeakMethod(self._handle_metadata_update))

        self._auto_commit_task = None
        if self.config['enable_auto_commit']:
            if self.config['api_version'] < (0, 8, 1):
                log.warning('Broker version (%s) does not support offset'
                            ' commits; disabling auto-commit.',
                            self.config['api_version'])
                self.config['enable_auto_commit'] = False
            elif self.config['group_id'] is None:
                log.warning('group_id is None: disabling auto-commit.')
                self.config['enable_auto_commit'] = False
            else:
                interval = self.config['auto_commit_interval_ms'] / 1000.0
                self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
                self._auto_commit_task.reschedule()

        self.consumer_sensors = ConsumerCoordinatorMetrics(
            metrics, self.config['metric_group_prefix'], self._subscription)
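
    # Illustrative only (not from the original source): the coordinator is
    # normally constructed by KafkaConsumer, but a hand-rolled setup would
    # look roughly like this, assuming `client`, `subscription` and `metrics`
    # objects already exist:
    #
    #     coordinator = ConsumerCoordinator(
    #         client, subscription, metrics,
    #         group_id='my-group',
    #         enable_auto_commit=True,
    #         auto_commit_interval_ms=5000,
    #         api_version=(0, 9))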

    def __del__(self):
        if hasattr(self, '_cluster') and self._cluster:
            self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))

    def protocol_type(self):
        return ConsumerProtocol.PROTOCOL_TYPE

    def group_protocols(self):
        """Returns list of preferred (protocols, metadata)"""
        topics = self._subscription.subscription
        assert topics is not None, 'Consumer has not subscribed to topics'
        metadata_list = []
        for assignor in self.config['assignors']:
            metadata = assignor.metadata(topics)
            group_protocol = (assignor.name, metadata)
            metadata_list.append(group_protocol)
        return metadata_list
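
    # For reference (illustrative, not part of the original source): with the
    # default assignors this returns one (name, metadata) tuple per assignor,
    # e.g. roughly [('range', <member metadata>), ('roundrobin', <member metadata>)].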

    def _handle_metadata_update(self, cluster):
        # if we encounter any unauthorized topics, raise an exception
        if cluster.unauthorized_topics:
            raise Errors.TopicAuthorizationFailedError(cluster.unauthorized_topics)

        if self._subscription.subscribed_pattern:
            topics = []
            for topic in cluster.topics(self.config['exclude_internal_topics']):
                if self._subscription.subscribed_pattern.match(topic):
                    topics.append(topic)

            if set(topics) != self._subscription.subscription:
                self._subscription.change_subscription(topics)
                self._client.set_topics(self._subscription.group_subscription())

        # check if there are any changes to the metadata which should trigger
        # a rebalance
        if self._subscription_metadata_changed(cluster):
            if (self.config['api_version'] >= (0, 9)
                    and self.config['group_id'] is not None):
                self._subscription.mark_for_reassignment()

            # If we haven't got group coordinator support,
            # just assign all partitions locally
            else:
                self._subscription.assign_from_subscribed([
                    TopicPartition(topic, partition)
                    for topic in self._subscription.subscription
                    for partition in self._metadata_snapshot[topic]
                ])

    def _build_metadata_snapshot(self, subscription, cluster):
        metadata_snapshot = {}
        for topic in subscription.group_subscription():
            partitions = cluster.partitions_for_topic(topic) or []
            metadata_snapshot[topic] = set(partitions)
        return metadata_snapshot

    def _subscription_metadata_changed(self, cluster):
        if not self._subscription.partitions_auto_assigned():
            return False

        metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
        if self._metadata_snapshot != metadata_snapshot:
            self._metadata_snapshot = metadata_snapshot
            return True
        return False

    def _lookup_assignor(self, name):
        for assignor in self.config['assignors']:
            if assignor.name == name:
                return assignor
        return None

    def _on_join_complete(self, generation, member_id, protocol,
                          member_assignment_bytes):
        # if we were the assignor, then we need to make sure that there have
        # been no metadata updates since the rebalance began. Otherwise, we
        # won't rebalance again until the next metadata change
        if self._assignment_snapshot is not None and self._assignment_snapshot != self._metadata_snapshot:
            self._subscription.mark_for_reassignment()
            return

        assignor = self._lookup_assignor(protocol)
        assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol

        assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)

        # set the flag to refresh last committed offsets
        self._subscription.needs_fetch_committed_offsets = True

        # update partition assignment
        self._subscription.assign_from_subscribed(assignment.partitions())

        # give the assignor a chance to update internal state
        # based on the received assignment
        assignor.on_assignment(assignment)

        # reschedule the auto commit starting from now
        if self._auto_commit_task:
            self._auto_commit_task.reschedule()

        assigned = set(self._subscription.assigned_partitions())
        log.info("Setting newly assigned partitions %s for group %s",
                 assigned, self.group_id)

        # execute the user's callback after rebalance
        if self._subscription.listener:
            try:
                self._subscription.listener.on_partitions_assigned(assigned)
            except Exception:
                log.exception("User provided listener %s for group %s"
                              " failed on partition assignment: %s",
                              self._subscription.listener, self.group_id,
                              assigned)

    def _perform_assignment(self, leader_id, assignment_strategy, members):
        assignor = self._lookup_assignor(assignment_strategy)
        assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy
        member_metadata = {}
        all_subscribed_topics = set()
        for member_id, metadata_bytes in members:
            metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
            member_metadata[member_id] = metadata
            all_subscribed_topics.update(metadata.subscription)  # pylint: disable-msg=no-member

        # the leader will begin watching for changes to any of the topics
        # the group is interested in, which ensures that all metadata changes
        # will eventually be seen
        # Because assignment typically happens within response callbacks,
        # we cannot block on metadata updates here (no recursion into poll())
        self._subscription.group_subscribe(all_subscribed_topics)
        self._client.set_topics(self._subscription.group_subscription())

        # keep track of the metadata used for assignment so that we can check
        # after rebalance completion whether anything has changed
        self._cluster.request_update()
        self._assignment_snapshot = self._metadata_snapshot

        log.debug("Performing assignment for group %s using strategy %s"
                  " with subscriptions %s", self.group_id, assignor.name,
                  member_metadata)

        assignments = assignor.assign(self._cluster, member_metadata)

        log.debug("Finished assignment for group %s: %s", self.group_id, assignments)

        group_assignment = {}
        for member_id, assignment in six.iteritems(assignments):
            group_assignment[member_id] = assignment
        return group_assignment

    def _on_join_prepare(self, generation, member_id):
        # commit offsets prior to rebalance if auto-commit enabled
        self._maybe_auto_commit_offsets_sync()

        # execute the user's callback before rebalance
        log.info("Revoking previously assigned partitions %s for group %s",
                 self._subscription.assigned_partitions(), self.group_id)
        if self._subscription.listener:
            try:
                revoked = set(self._subscription.assigned_partitions())
                self._subscription.listener.on_partitions_revoked(revoked)
            except Exception:
                log.exception("User provided subscription listener %s"
                              " for group %s failed on_partitions_revoked",
                              self._subscription.listener, self.group_id)

        self._assignment_snapshot = None
        self._subscription.mark_for_reassignment()

    def need_rejoin(self):
        """Check whether the group should be rejoined

        Returns:
            bool: True if consumer should rejoin group, False otherwise
        """
        return (self._subscription.partitions_auto_assigned() and
                (super(ConsumerCoordinator, self).need_rejoin() or
                 self._subscription.needs_partition_assignment))

    def refresh_committed_offsets_if_needed(self):
        """Fetch committed offsets for assigned partitions."""
        if self._subscription.needs_fetch_committed_offsets:
            offsets = self.fetch_committed_offsets(self._subscription.assigned_partitions())
            for partition, offset in six.iteritems(offsets):
                # verify assignment is still active
                if self._subscription.is_assigned(partition):
                    self._subscription.assignment[partition].committed = offset.offset
            self._subscription.needs_fetch_committed_offsets = False

    def fetch_committed_offsets(self, partitions):
        """Fetch the current committed offsets for specified partitions

        Arguments:
            partitions (list of TopicPartition): partitions to fetch

        Returns:
            dict: {TopicPartition: OffsetAndMetadata}
        """
        if not partitions:
            return {}

        while True:
            self.ensure_coordinator_known()

            # contact coordinator to fetch committed offsets
            future = self._send_offset_fetch_request(partitions)
            self._client.poll(future=future)

            if future.succeeded():
                return future.value

            if not future.retriable():
                raise future.exception  # pylint: disable-msg=raising-bad-type

            time.sleep(self.config['retry_backoff_ms'] / 1000.0)
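
    # Illustrative only (not from the original source), assuming the
    # coordinator has already joined its group:
    #
    #     committed = coordinator.fetch_committed_offsets(
    #         [TopicPartition('my-topic', 0)])
    #     # e.g. {TopicPartition('my-topic', 0): OffsetAndMetadata(42, '')}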

    def close(self, autocommit=True):
        """Close the coordinator, leave the current group,
        and reset local generation / member_id.

        Keyword Arguments:
            autocommit (bool): If auto-commit is configured for this consumer,
                this optional flag causes the consumer to attempt to commit any
                pending consumed offsets prior to close. Default: True
        """
        try:
            if autocommit:
                self._maybe_auto_commit_offsets_sync()
        finally:
            super(ConsumerCoordinator, self).close()

    def commit_offsets_async(self, offsets, callback=None):
        """Commit specific offsets asynchronously.

        Arguments:
            offsets (dict {TopicPartition: OffsetAndMetadata}): what to commit
            callback (callable, optional): called as callback(offsets, response);
                response will be either an Exception or an OffsetCommitResponse
                struct. This callback can be used to trigger custom actions when
                a commit request completes.

        Returns:
            Future: indicating whether the commit was successful or not
        """
        assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
        assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
        assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
                       offsets.values()))
        if callback is None:
            callback = self.config['default_offset_commit_callback']
        self._subscription.needs_fetch_committed_offsets = True
        future = self._send_offset_commit_request(offsets)
        future.add_both(callback, offsets)
        return future
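
    # Illustrative only (not from the original source):
    #
    #     offsets = {TopicPartition('my-topic', 0): OffsetAndMetadata(43, '')}
    #     coordinator.commit_offsets_async(
    #         offsets,
    #         callback=lambda offs, resp: log.info('commit result: %s', resp))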

    def commit_offsets_sync(self, offsets):
        """Commit specific offsets synchronously.

        This method will retry until the commit completes successfully or an
        unrecoverable error is encountered.

        Arguments:
            offsets (dict {TopicPartition: OffsetAndMetadata}): what to commit

        Raises error on failure
        """
        assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
        assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
        assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
                       offsets.values()))
        if not offsets:
            return

        while True:
            self.ensure_coordinator_known()

            future = self._send_offset_commit_request(offsets)
            self._client.poll(future=future)

            if future.succeeded():
                return future.value

            if not future.retriable():
                raise future.exception  # pylint: disable-msg=raising-bad-type

            time.sleep(self.config['retry_backoff_ms'] / 1000.0)
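
    # Illustrative only (not from the original source): the synchronous variant
    # blocks until the commit succeeds and raises on unrecoverable errors, e.g.
    #
    #     try:
    #         coordinator.commit_offsets_sync(
    #             {TopicPartition('my-topic', 0): OffsetAndMetadata(43, '')})
    #     except Errors.CommitFailedError:
    #         pass  # group rebalanced underneath us; commit cannot be completed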

    def _maybe_auto_commit_offsets_sync(self):
        if self._auto_commit_task is None:
            return

        try:
            self.commit_offsets_sync(self._subscription.all_consumed_offsets())

        # The three main group membership errors are known and should not
        # require a stacktrace -- just a warning
        except (Errors.UnknownMemberIdError,
                Errors.IllegalGenerationError,
                Errors.RebalanceInProgressError):
            log.warning("Offset commit failed: group membership out of date."
                        " This is likely to cause duplicate message"
                        " delivery.")
        except Exception:
            log.exception("Offset commit failed: This is likely to cause"
                          " duplicate message delivery")

    def _send_offset_commit_request(self, offsets):
        """Commit offsets for the specified list of topics and partitions.

        This is a non-blocking call which returns a request future that can be
        polled in the case of a synchronous commit or ignored in the
        asynchronous case.

        Arguments:
            offsets (dict of {TopicPartition: OffsetAndMetadata}): what should
                be committed

        Returns:
            Future: indicating whether the commit was successful or not
        """
        assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
        assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
        assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
                       offsets.values()))
        if not offsets:
            log.debug('No offsets to commit')
            return Future().success(True)

        elif self.coordinator_unknown():
            return Future().failure(Errors.GroupCoordinatorNotAvailableError)

        node_id = self.coordinator_id

        # create the offset commit request
        offset_data = collections.defaultdict(dict)
        for tp, offset in six.iteritems(offsets):
            offset_data[tp.topic][tp.partition] = offset

        if self.config['api_version'] >= (0, 9):
            request = OffsetCommitRequest[2](
                self.group_id,
                self.generation,
                self.member_id,
                OffsetCommitRequest[2].DEFAULT_RETENTION_TIME,
                [(
                    topic, [(
                        partition,
                        offset.offset,
                        offset.metadata
                    ) for partition, offset in six.iteritems(partitions)]
                ) for topic, partitions in six.iteritems(offset_data)]
            )
        elif self.config['api_version'] >= (0, 8, 2):
            request = OffsetCommitRequest[1](
                self.group_id, -1, '',
                [(
                    topic, [(
                        partition,
                        offset.offset,
                        -1,
                        offset.metadata
                    ) for partition, offset in six.iteritems(partitions)]
                ) for topic, partitions in six.iteritems(offset_data)]
            )
        elif self.config['api_version'] >= (0, 8, 1):
            request = OffsetCommitRequest[0](
                self.group_id,
                [(
                    topic, [(
                        partition,
                        offset.offset,
                        offset.metadata
                    ) for partition, offset in six.iteritems(partitions)]
                ) for topic, partitions in six.iteritems(offset_data)]
            )

        log.debug("Sending offset-commit request with %s for group %s to %s",
                  offsets, self.group_id, node_id)

        future = Future()
        _f = self._client.send(node_id, request)
        _f.add_callback(self._handle_offset_commit_response, offsets, future, time.time())
        _f.add_errback(self._failed_request, node_id, request, future)
        return future

    def _handle_offset_commit_response(self, offsets, future, send_time, response):
        # TODO look at adding request_latency_ms to response (like java kafka)
        self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
        unauthorized_topics = set()

        for topic, partitions in response.topics:
            for partition, error_code in partitions:
                tp = TopicPartition(topic, partition)
                offset = offsets[tp]

                error_type = Errors.for_code(error_code)
                if error_type is Errors.NoError:
                    log.debug("Group %s committed offset %s for partition %s",
                              self.group_id, offset, tp)
                    if self._subscription.is_assigned(tp):
                        self._subscription.assignment[tp].committed = offset.offset
                elif error_type is Errors.GroupAuthorizationFailedError:
                    log.error("Not authorized to commit offsets for group %s",
                              self.group_id)
                    future.failure(error_type(self.group_id))
                    return
                elif error_type is Errors.TopicAuthorizationFailedError:
                    unauthorized_topics.add(topic)
                elif error_type in (Errors.OffsetMetadataTooLargeError,
                                    Errors.InvalidCommitOffsetSizeError):
                    # raise the error to the user
                    log.debug("OffsetCommit for group %s failed on partition %s"
                              " %s", self.group_id, tp, error_type.__name__)
                    future.failure(error_type())
                    return
                elif error_type is Errors.GroupLoadInProgressError:
                    # just retry
                    log.debug("OffsetCommit for group %s failed: %s",
                              self.group_id, error_type.__name__)
                    future.failure(error_type(self.group_id))
                    return
                elif error_type in (Errors.GroupCoordinatorNotAvailableError,
                                    Errors.NotCoordinatorForGroupError,
                                    Errors.RequestTimedOutError):
                    log.debug("OffsetCommit for group %s failed: %s",
                              self.group_id, error_type.__name__)
                    self.coordinator_dead(error_type())
                    future.failure(error_type(self.group_id))
                    return
                elif error_type in (Errors.UnknownMemberIdError,
                                    Errors.IllegalGenerationError,
                                    Errors.RebalanceInProgressError):
                    # need to re-join group
                    error = error_type(self.group_id)
                    log.debug("OffsetCommit for group %s failed: %s",
                              self.group_id, error)
                    self._subscription.mark_for_reassignment()
                    future.failure(Errors.CommitFailedError(
                        "Commit cannot be completed since the group has"
                        " already rebalanced and assigned the partitions to"
                        " another member. This means that the time between"
                        " subsequent calls to poll() was longer than the"
                        " configured session.timeout.ms, which typically"
                        " implies that the poll loop is spending too much time"
                        " on message processing. You can address this either by"
                        " increasing the session timeout or by reducing the"
                        " maximum size of batches returned in poll() with"
                        " max.poll.records."))
                    return
                else:
                    log.error("Group %s failed to commit partition %s at offset"
                              " %s: %s", self.group_id, tp, offset,
                              error_type.__name__)
                    future.failure(error_type())
                    return

        if unauthorized_topics:
            log.error("Not authorized to commit to topics %s for group %s",
                      unauthorized_topics, self.group_id)
            future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics))
        else:
            future.success(True)

    def _send_offset_fetch_request(self, partitions):
        """Fetch the committed offsets for a set of partitions.

        This is a non-blocking call. The returned future can be polled to get
        the actual offsets returned from the broker.

        Arguments:
            partitions (list of TopicPartition): the partitions to fetch

        Returns:
            Future: resolves to dict of offsets: {TopicPartition: int}
        """
        assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
        assert all(map(lambda k: isinstance(k, TopicPartition), partitions))
        if not partitions:
            return Future().success({})

        elif self.coordinator_unknown():
            return Future().failure(Errors.GroupCoordinatorNotAvailableError)

        node_id = self.coordinator_id

        # Verify node is ready
        if not self._client.ready(node_id):
            log.debug("Node %s not ready -- failing offset fetch request",
                      node_id)
            return Future().failure(Errors.NodeNotReadyError)

        log.debug("Group %s fetching committed offsets for partitions: %s",
                  self.group_id, partitions)

        # construct the request
        topic_partitions = collections.defaultdict(set)
        for tp in partitions:
            topic_partitions[tp.topic].add(tp.partition)

        if self.config['api_version'] >= (0, 8, 2):
            request = OffsetFetchRequest[1](
                self.group_id,
                list(topic_partitions.items())
            )
        else:
            request = OffsetFetchRequest[0](
                self.group_id,
                list(topic_partitions.items())
            )

        # send the request with a callback
        future = Future()
        _f = self._client.send(node_id, request)
        _f.add_callback(self._handle_offset_fetch_response, future)
        _f.add_errback(self._failed_request, node_id, request, future)
        return future

    def _handle_offset_fetch_response(self, future, response):
        offsets = {}
        for topic, partitions in response.topics:
            for partition, offset, metadata, error_code in partitions:
                tp = TopicPartition(topic, partition)
                error_type = Errors.for_code(error_code)
                if error_type is not Errors.NoError:
                    error = error_type()
                    log.debug("Group %s failed to fetch offset for partition"
                              " %s: %s", self.group_id, tp, error)
                    if error_type is Errors.GroupLoadInProgressError:
                        # just retry
                        future.failure(error)
                    elif error_type is Errors.NotCoordinatorForGroupError:
                        # re-discover the coordinator and retry
                        self.coordinator_dead(error_type())
                        future.failure(error)
                    elif error_type in (Errors.UnknownMemberIdError,
                                        Errors.IllegalGenerationError):
                        # need to re-join group
                        self._subscription.mark_for_reassignment()
                        future.failure(error)
                    elif error_type is Errors.UnknownTopicOrPartitionError:
                        log.warning("OffsetFetchRequest -- unknown topic %s"
                                    " (have you committed any offsets yet?)",
                                    topic)
                        continue
                    else:
                        log.error("Unknown error fetching offsets for %s: %s",
                                  tp, error)
                        future.failure(error)
                    return
                elif offset >= 0:
                    # record the position with the offset
                    # (-1 indicates no committed offset to fetch)
                    offsets[tp] = OffsetAndMetadata(offset, metadata)
                else:
                    log.debug("Group %s has no committed offset for partition"
                              " %s", self.group_id, tp)
        future.success(offsets)


class AutoCommitTask(object):
    """Scheduled task that periodically auto-commits all consumed offsets
    for the coordinator's group via the client's task scheduler."""

    def __init__(self, coordinator, interval):
        self._coordinator = coordinator
        self._client = coordinator._client
        self._interval = interval

    def reschedule(self, at=None):
        if at is None:
            at = time.time() + self._interval
        self._client.schedule(self, at)

    def __call__(self):
        if self._coordinator.coordinator_unknown():
            log.debug("Cannot auto-commit offsets for group %s because the"
                      " coordinator is unknown", self._coordinator.group_id)
            backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0
            self.reschedule(time.time() + backoff)
            return

        self._coordinator.commit_offsets_async(
            self._coordinator._subscription.all_consumed_offsets(),
            self._handle_commit_response)

    def _handle_commit_response(self, offsets, result):
        if result is True:
            log.debug("Successfully auto-committed offsets for group %s",
                      self._coordinator.group_id)
            next_at = time.time() + self._interval
        elif not isinstance(result, BaseException):
            raise Errors.IllegalStateError(
                'Unrecognized result in _handle_commit_response: %s'
                % result)
        elif hasattr(result, 'retriable') and result.retriable:
            log.debug("Failed to auto-commit offsets for group %s: %s,"
                      " will retry immediately", self._coordinator.group_id,
                      result)
            next_at = time.time()
        else:
            log.warning("Auto offset commit failed for group %s: %s",
                        self._coordinator.group_id, result)
            next_at = time.time() + self._interval

        self.reschedule(next_at)


class ConsumerCoordinatorMetrics(object):
    def __init__(self, metrics, metric_group_prefix, subscription):
        self.metrics = metrics
        self.metric_group_name = '%s-coordinator-metrics' % metric_group_prefix

        self.commit_latency = metrics.sensor('commit-latency')
        self.commit_latency.add(metrics.metric_name(
            'commit-latency-avg', self.metric_group_name,
            'The average time taken for a commit request'), Avg())
        self.commit_latency.add(metrics.metric_name(
            'commit-latency-max', self.metric_group_name,
            'The max time taken for a commit request'), Max())
        self.commit_latency.add(metrics.metric_name(
            'commit-rate', self.metric_group_name,
            'The number of commit calls per second'), Rate(sampled_stat=Count()))

        num_parts = AnonMeasurable(lambda config, now:
                                   len(subscription.assigned_partitions()))
        metrics.add_metric(metrics.metric_name(
            'assigned-partitions', self.metric_group_name,
            'The number of partitions currently assigned to this consumer'),
            num_parts)