base.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748
  1. from __future__ import absolute_import, division
  2. import abc
  3. import copy
  4. import logging
  5. import time
  6. import weakref
  7. from kafka.vendor import six
  8. from .heartbeat import Heartbeat
  9. from .. import errors as Errors
  10. from ..future import Future
  11. from ..metrics import AnonMeasurable
  12. from ..metrics.stats import Avg, Count, Max, Rate
  13. from ..protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
  14. from ..protocol.group import (HeartbeatRequest, JoinGroupRequest,
  15. LeaveGroupRequest, SyncGroupRequest)
  16. log = logging.getLogger('kafka.coordinator')
  17. class BaseCoordinator(object):
  18. """
  19. BaseCoordinator implements group management for a single group member
  20. by interacting with a designated Kafka broker (the coordinator). Group
  21. semantics are provided by extending this class. See ConsumerCoordinator
  22. for example usage.
  23. From a high level, Kafka's group management protocol consists of the
  24. following sequence of actions:
  25. 1. Group Registration: Group members register with the coordinator providing
  26. their own metadata (such as the set of topics they are interested in).
  27. 2. Group/Leader Selection: The coordinator select the members of the group
  28. and chooses one member as the leader.
  29. 3. State Assignment: The leader collects the metadata from all the members
  30. of the group and assigns state.
  31. 4. Group Stabilization: Each member receives the state assigned by the
  32. leader and begins processing.
  33. To leverage this protocol, an implementation must define the format of
  34. metadata provided by each member for group registration in
  35. :meth:`.group_protocols` and the format of the state assignment provided by
  36. the leader in :meth:`._perform_assignment` and which becomes available to
  37. members in :meth:`._on_join_complete`.
  38. """
  39. DEFAULT_CONFIG = {
  40. 'group_id': 'kafka-python-default-group',
  41. 'session_timeout_ms': 30000,
  42. 'heartbeat_interval_ms': 3000,
  43. 'retry_backoff_ms': 100,
  44. 'api_version': (0, 9),
  45. 'metric_group_prefix': '',
  46. }
  47. def __init__(self, client, metrics, **configs):
  48. """
  49. Keyword Arguments:
  50. group_id (str): name of the consumer group to join for dynamic
  51. partition assignment (if enabled), and to use for fetching and
  52. committing offsets. Default: 'kafka-python-default-group'
  53. session_timeout_ms (int): The timeout used to detect failures when
  54. using Kafka's group managementment facilities. Default: 30000
  55. heartbeat_interval_ms (int): The expected time in milliseconds
  56. between heartbeats to the consumer coordinator when using
  57. Kafka's group management feature. Heartbeats are used to ensure
  58. that the consumer's session stays active and to facilitate
  59. rebalancing when new consumers join or leave the group. The
  60. value must be set lower than session_timeout_ms, but typically
  61. should be set no higher than 1/3 of that value. It can be
  62. adjusted even lower to control the expected time for normal
  63. rebalances. Default: 3000
  64. retry_backoff_ms (int): Milliseconds to backoff when retrying on
  65. errors. Default: 100.
  66. """
  67. self.config = copy.copy(self.DEFAULT_CONFIG)
  68. for key in self.config:
  69. if key in configs:
  70. self.config[key] = configs[key]
  71. self._client = client
  72. self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID
  73. self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
  74. self.group_id = self.config['group_id']
  75. self.coordinator_id = None
  76. self.rejoin_needed = True
  77. self.rejoining = False
  78. self.heartbeat = Heartbeat(**self.config)
  79. self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
  80. self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
  81. self.config['metric_group_prefix'])
  82. def __del__(self):
  83. if hasattr(self, 'heartbeat_task') and self.heartbeat_task:
  84. self.heartbeat_task.disable()
  85. @abc.abstractmethod
  86. def protocol_type(self):
  87. """
  88. Unique identifier for the class of protocols implements
  89. (e.g. "consumer" or "connect").
  90. Returns:
  91. str: protocol type name
  92. """
  93. pass
  94. @abc.abstractmethod
  95. def group_protocols(self):
  96. """Return the list of supported group protocols and metadata.
  97. This list is submitted by each group member via a JoinGroupRequest.
  98. The order of the protocols in the list indicates the preference of the
  99. protocol (the first entry is the most preferred). The coordinator takes
  100. this preference into account when selecting the generation protocol
  101. (generally more preferred protocols will be selected as long as all
  102. members support them and there is no disagreement on the preference).
  103. Note: metadata must be type bytes or support an encode() method
  104. Returns:
  105. list: [(protocol, metadata), ...]
  106. """
  107. pass
  108. @abc.abstractmethod
  109. def _on_join_prepare(self, generation, member_id):
  110. """Invoked prior to each group join or rejoin.
  111. This is typically used to perform any cleanup from the previous
  112. generation (such as committing offsets for the consumer)
  113. Arguments:
  114. generation (int): The previous generation or -1 if there was none
  115. member_id (str): The identifier of this member in the previous group
  116. or '' if there was none
  117. """
  118. pass
  119. @abc.abstractmethod
  120. def _perform_assignment(self, leader_id, protocol, members):
  121. """Perform assignment for the group.
  122. This is used by the leader to push state to all the members of the group
  123. (e.g. to push partition assignments in the case of the new consumer)
  124. Arguments:
  125. leader_id (str): The id of the leader (which is this member)
  126. protocol (str): the chosen group protocol (assignment strategy)
  127. members (list): [(member_id, metadata_bytes)] from
  128. JoinGroupResponse. metadata_bytes are associated with the chosen
  129. group protocol, and the Coordinator subclass is responsible for
  130. decoding metadata_bytes based on that protocol.
  131. Returns:
  132. dict: {member_id: assignment}; assignment must either be bytes
  133. or have an encode() method to convert to bytes
  134. """
  135. pass
  136. @abc.abstractmethod
  137. def _on_join_complete(self, generation, member_id, protocol,
  138. member_assignment_bytes):
  139. """Invoked when a group member has successfully joined a group.
  140. Arguments:
  141. generation (int): the generation that was joined
  142. member_id (str): the identifier for the local member in the group
  143. protocol (str): the protocol selected by the coordinator
  144. member_assignment_bytes (bytes): the protocol-encoded assignment
  145. propagated from the group leader. The Coordinator instance is
  146. responsible for decoding based on the chosen protocol.
  147. """
  148. pass
  149. def coordinator_unknown(self):
  150. """Check if we know who the coordinator is and have an active connection
  151. Side-effect: reset coordinator_id to None if connection failed
  152. Returns:
  153. bool: True if the coordinator is unknown
  154. """
  155. if self.coordinator_id is None:
  156. return True
  157. if self._client.is_disconnected(self.coordinator_id):
  158. self.coordinator_dead('Node Disconnected')
  159. return True
  160. return False
  161. def ensure_coordinator_known(self):
  162. """Block until the coordinator for this group is known
  163. (and we have an active connection -- java client uses unsent queue).
  164. """
  165. while self.coordinator_unknown():
  166. # Prior to 0.8.2 there was no group coordinator
  167. # so we will just pick a node at random and treat
  168. # it as the "coordinator"
  169. if self.config['api_version'] < (0, 8, 2):
  170. self.coordinator_id = self._client.least_loaded_node()
  171. if self.coordinator_id is not None:
  172. self._client.ready(self.coordinator_id)
  173. continue
  174. future = self._send_group_coordinator_request()
  175. self._client.poll(future=future)
  176. if future.failed():
  177. if future.retriable():
  178. if getattr(future.exception, 'invalid_metadata', False):
  179. log.debug('Requesting metadata for group coordinator request: %s', future.exception)
  180. metadata_update = self._client.cluster.request_update()
  181. self._client.poll(future=metadata_update)
  182. else:
  183. time.sleep(self.config['retry_backoff_ms'] / 1000)
  184. else:
  185. raise future.exception # pylint: disable-msg=raising-bad-type
  186. def need_rejoin(self):
  187. """Check whether the group should be rejoined (e.g. if metadata changes)
  188. Returns:
  189. bool: True if it should, False otherwise
  190. """
  191. return self.rejoin_needed
  192. def ensure_active_group(self):
  193. """Ensure that the group is active (i.e. joined and synced)"""
  194. if not self.need_rejoin():
  195. return
  196. if not self.rejoining:
  197. self._on_join_prepare(self.generation, self.member_id)
  198. self.rejoining = True
  199. while self.need_rejoin():
  200. self.ensure_coordinator_known()
  201. # ensure that there are no pending requests to the coordinator.
  202. # This is important in particular to avoid resending a pending
  203. # JoinGroup request.
  204. while not self.coordinator_unknown():
  205. if not self._client.in_flight_request_count(self.coordinator_id):
  206. break
  207. self._client.poll(delayed_tasks=False)
  208. else:
  209. continue
  210. future = self._send_join_group_request()
  211. self._client.poll(future=future)
  212. if future.succeeded():
  213. member_assignment_bytes = future.value
  214. self._on_join_complete(self.generation, self.member_id,
  215. self.protocol, member_assignment_bytes)
  216. self.rejoining = False
  217. self.heartbeat_task.reset()
  218. else:
  219. assert future.failed()
  220. exception = future.exception
  221. if isinstance(exception, (Errors.UnknownMemberIdError,
  222. Errors.RebalanceInProgressError,
  223. Errors.IllegalGenerationError)):
  224. continue
  225. elif not future.retriable():
  226. raise exception # pylint: disable-msg=raising-bad-type
  227. time.sleep(self.config['retry_backoff_ms'] / 1000)
  228. def _send_join_group_request(self):
  229. """Join the group and return the assignment for the next generation.
  230. This function handles both JoinGroup and SyncGroup, delegating to
  231. :meth:`._perform_assignment` if elected leader by the coordinator.
  232. Returns:
  233. Future: resolves to the encoded-bytes assignment returned from the
  234. group leader
  235. """
  236. if self.coordinator_unknown():
  237. e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
  238. return Future().failure(e)
  239. elif not self._client.ready(self.coordinator_id, metadata_priority=False):
  240. e = Errors.NodeNotReadyError(self.coordinator_id)
  241. return Future().failure(e)
  242. # send a join group request to the coordinator
  243. log.info("(Re-)joining group %s", self.group_id)
  244. request = JoinGroupRequest[0](
  245. self.group_id,
  246. self.config['session_timeout_ms'],
  247. self.member_id,
  248. self.protocol_type(),
  249. [(protocol,
  250. metadata if isinstance(metadata, bytes) else metadata.encode())
  251. for protocol, metadata in self.group_protocols()])
  252. # create the request for the coordinator
  253. log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
  254. future = Future()
  255. _f = self._client.send(self.coordinator_id, request)
  256. _f.add_callback(self._handle_join_group_response, future, time.time())
  257. _f.add_errback(self._failed_request, self.coordinator_id,
  258. request, future)
  259. return future
  260. def _failed_request(self, node_id, request, future, error):
  261. log.error('Error sending %s to node %s [%s]',
  262. request.__class__.__name__, node_id, error)
  263. # Marking coordinator dead
  264. # unless the error is caused by internal client pipelining
  265. if not isinstance(error, (Errors.NodeNotReadyError,
  266. Errors.TooManyInFlightRequests)):
  267. self.coordinator_dead(error)
  268. future.failure(error)
  269. def _handle_join_group_response(self, future, send_time, response):
  270. error_type = Errors.for_code(response.error_code)
  271. if error_type is Errors.NoError:
  272. log.debug("Received successful JoinGroup response for group %s: %s",
  273. self.group_id, response)
  274. self.member_id = response.member_id
  275. self.generation = response.generation_id
  276. self.rejoin_needed = False
  277. self.protocol = response.group_protocol
  278. log.info("Joined group '%s' (generation %s) with member_id %s",
  279. self.group_id, self.generation, self.member_id)
  280. self.sensors.join_latency.record((time.time() - send_time) * 1000)
  281. if response.leader_id == response.member_id:
  282. log.info("Elected group leader -- performing partition"
  283. " assignments using %s", self.protocol)
  284. self._on_join_leader(response).chain(future)
  285. else:
  286. self._on_join_follower().chain(future)
  287. elif error_type is Errors.GroupLoadInProgressError:
  288. log.debug("Attempt to join group %s rejected since coordinator %s"
  289. " is loading the group.", self.group_id, self.coordinator_id)
  290. # backoff and retry
  291. future.failure(error_type(response))
  292. elif error_type is Errors.UnknownMemberIdError:
  293. # reset the member id and retry immediately
  294. error = error_type(self.member_id)
  295. self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
  296. log.debug("Attempt to join group %s failed due to unknown member id",
  297. self.group_id)
  298. future.failure(error)
  299. elif error_type in (Errors.GroupCoordinatorNotAvailableError,
  300. Errors.NotCoordinatorForGroupError):
  301. # re-discover the coordinator and retry with backoff
  302. self.coordinator_dead(error_type())
  303. log.debug("Attempt to join group %s failed due to obsolete "
  304. "coordinator information: %s", self.group_id,
  305. error_type.__name__)
  306. future.failure(error_type())
  307. elif error_type in (Errors.InconsistentGroupProtocolError,
  308. Errors.InvalidSessionTimeoutError,
  309. Errors.InvalidGroupIdError):
  310. # log the error and re-throw the exception
  311. error = error_type(response)
  312. log.error("Attempt to join group %s failed due to fatal error: %s",
  313. self.group_id, error)
  314. future.failure(error)
  315. elif error_type is Errors.GroupAuthorizationFailedError:
  316. future.failure(error_type(self.group_id))
  317. else:
  318. # unexpected error, throw the exception
  319. error = error_type()
  320. log.error("Unexpected error in join group response: %s", error)
  321. future.failure(error)
  322. def _on_join_follower(self):
  323. # send follower's sync group with an empty assignment
  324. request = SyncGroupRequest[0](
  325. self.group_id,
  326. self.generation,
  327. self.member_id,
  328. {})
  329. log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s",
  330. self.group_id, self.coordinator_id, request)
  331. return self._send_sync_group_request(request)
  332. def _on_join_leader(self, response):
  333. """
  334. Perform leader synchronization and send back the assignment
  335. for the group via SyncGroupRequest
  336. Arguments:
  337. response (JoinResponse): broker response to parse
  338. Returns:
  339. Future: resolves to member assignment encoded-bytes
  340. """
  341. try:
  342. group_assignment = self._perform_assignment(response.leader_id,
  343. response.group_protocol,
  344. response.members)
  345. except Exception as e:
  346. return Future().failure(e)
  347. request = SyncGroupRequest[0](
  348. self.group_id,
  349. self.generation,
  350. self.member_id,
  351. [(member_id,
  352. assignment if isinstance(assignment, bytes) else assignment.encode())
  353. for member_id, assignment in six.iteritems(group_assignment)])
  354. log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s",
  355. self.group_id, self.coordinator_id, request)
  356. return self._send_sync_group_request(request)
  357. def _send_sync_group_request(self, request):
  358. if self.coordinator_unknown():
  359. e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
  360. return Future().failure(e)
  361. # We assume that coordinator is ready if we're sending SyncGroup
  362. # as it typically follows a successful JoinGroup
  363. # Also note that if client.ready() enforces a metadata priority policy,
  364. # we can get into an infinite loop if the leader assignment process
  365. # itself requests a metadata update
  366. future = Future()
  367. _f = self._client.send(self.coordinator_id, request)
  368. _f.add_callback(self._handle_sync_group_response, future, time.time())
  369. _f.add_errback(self._failed_request, self.coordinator_id,
  370. request, future)
  371. return future
  372. def _handle_sync_group_response(self, future, send_time, response):
  373. error_type = Errors.for_code(response.error_code)
  374. if error_type is Errors.NoError:
  375. log.info("Successfully joined group %s with generation %s",
  376. self.group_id, self.generation)
  377. self.sensors.sync_latency.record((time.time() - send_time) * 1000)
  378. future.success(response.member_assignment)
  379. return
  380. # Always rejoin on error
  381. self.rejoin_needed = True
  382. if error_type is Errors.GroupAuthorizationFailedError:
  383. future.failure(error_type(self.group_id))
  384. elif error_type is Errors.RebalanceInProgressError:
  385. log.debug("SyncGroup for group %s failed due to coordinator"
  386. " rebalance", self.group_id)
  387. future.failure(error_type(self.group_id))
  388. elif error_type in (Errors.UnknownMemberIdError,
  389. Errors.IllegalGenerationError):
  390. error = error_type()
  391. log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
  392. self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
  393. future.failure(error)
  394. elif error_type in (Errors.GroupCoordinatorNotAvailableError,
  395. Errors.NotCoordinatorForGroupError):
  396. error = error_type()
  397. log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
  398. self.coordinator_dead(error)
  399. future.failure(error)
  400. else:
  401. error = error_type()
  402. log.error("Unexpected error from SyncGroup: %s", error)
  403. future.failure(error)
  404. def _send_group_coordinator_request(self):
  405. """Discover the current coordinator for the group.
  406. Returns:
  407. Future: resolves to the node id of the coordinator
  408. """
  409. node_id = self._client.least_loaded_node()
  410. if node_id is None:
  411. return Future().failure(Errors.NoBrokersAvailable())
  412. elif not self._client.ready(node_id, metadata_priority=False):
  413. e = Errors.NodeNotReadyError(node_id)
  414. return Future().failure(e)
  415. log.debug("Sending group coordinator request for group %s to broker %s",
  416. self.group_id, node_id)
  417. request = GroupCoordinatorRequest[0](self.group_id)
  418. future = Future()
  419. _f = self._client.send(node_id, request)
  420. _f.add_callback(self._handle_group_coordinator_response, future)
  421. _f.add_errback(self._failed_request, node_id, request, future)
  422. return future
  423. def _handle_group_coordinator_response(self, future, response):
  424. log.debug("Received group coordinator response %s", response)
  425. if not self.coordinator_unknown():
  426. # We already found the coordinator, so ignore the request
  427. log.debug("Coordinator already known -- ignoring metadata response")
  428. future.success(self.coordinator_id)
  429. return
  430. error_type = Errors.for_code(response.error_code)
  431. if error_type is Errors.NoError:
  432. ok = self._client.cluster.add_group_coordinator(self.group_id, response)
  433. if not ok:
  434. # This could happen if coordinator metadata is different
  435. # than broker metadata
  436. future.failure(Errors.IllegalStateError())
  437. return
  438. self.coordinator_id = response.coordinator_id
  439. log.info("Discovered coordinator %s for group %s",
  440. self.coordinator_id, self.group_id)
  441. self._client.ready(self.coordinator_id)
  442. # start sending heartbeats only if we have a valid generation
  443. if self.generation > 0:
  444. self.heartbeat_task.reset()
  445. future.success(self.coordinator_id)
  446. elif error_type is Errors.GroupCoordinatorNotAvailableError:
  447. log.debug("Group Coordinator Not Available; retry")
  448. future.failure(error_type())
  449. elif error_type is Errors.GroupAuthorizationFailedError:
  450. error = error_type(self.group_id)
  451. log.error("Group Coordinator Request failed: %s", error)
  452. future.failure(error)
  453. else:
  454. error = error_type()
  455. log.error("Unrecognized failure in Group Coordinator Request: %s",
  456. error)
  457. future.failure(error)
  458. def coordinator_dead(self, error):
  459. """Mark the current coordinator as dead."""
  460. if self.coordinator_id is not None:
  461. log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
  462. self.coordinator_id, self.group_id, error)
  463. self.coordinator_id = None
  464. def close(self):
  465. """Close the coordinator, leave the current group,
  466. and reset local generation / member_id"""
  467. try:
  468. self._client.unschedule(self.heartbeat_task)
  469. except KeyError:
  470. pass
  471. if not self.coordinator_unknown() and self.generation > 0:
  472. # this is a minimal effort attempt to leave the group. we do not
  473. # attempt any resending if the request fails or times out.
  474. log.info('Leaving consumer group (%s).', self.group_id)
  475. request = LeaveGroupRequest[0](self.group_id, self.member_id)
  476. future = self._client.send(self.coordinator_id, request)
  477. future.add_callback(self._handle_leave_group_response)
  478. future.add_errback(log.error, "LeaveGroup request failed: %s")
  479. self._client.poll(future=future)
  480. self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID
  481. self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
  482. self.rejoin_needed = True
  483. def _handle_leave_group_response(self, response):
  484. error_type = Errors.for_code(response.error_code)
  485. if error_type is Errors.NoError:
  486. log.info("LeaveGroup request succeeded")
  487. else:
  488. log.error("LeaveGroup request failed: %s", error_type())
  489. def _send_heartbeat_request(self):
  490. """Send a heartbeat request"""
  491. if self.coordinator_unknown():
  492. e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
  493. return Future().failure(e)
  494. elif not self._client.ready(self.coordinator_id, metadata_priority=False):
  495. e = Errors.NodeNotReadyError(self.coordinator_id)
  496. return Future().failure(e)
  497. request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id)
  498. log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
  499. future = Future()
  500. _f = self._client.send(self.coordinator_id, request)
  501. _f.add_callback(self._handle_heartbeat_response, future, time.time())
  502. _f.add_errback(self._failed_request, self.coordinator_id,
  503. request, future)
  504. return future
  505. def _handle_heartbeat_response(self, future, send_time, response):
  506. self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
  507. error_type = Errors.for_code(response.error_code)
  508. if error_type is Errors.NoError:
  509. log.debug("Received successful heartbeat response for group %s",
  510. self.group_id)
  511. future.success(None)
  512. elif error_type in (Errors.GroupCoordinatorNotAvailableError,
  513. Errors.NotCoordinatorForGroupError):
  514. log.warning("Heartbeat failed for group %s: coordinator (node %s)"
  515. " is either not started or not valid", self.group_id,
  516. self.coordinator_id)
  517. self.coordinator_dead(error_type())
  518. future.failure(error_type())
  519. elif error_type is Errors.RebalanceInProgressError:
  520. log.warning("Heartbeat failed for group %s because it is"
  521. " rebalancing", self.group_id)
  522. self.rejoin_needed = True
  523. future.failure(error_type())
  524. elif error_type is Errors.IllegalGenerationError:
  525. log.warning("Heartbeat failed for group %s: generation id is not "
  526. " current.", self.group_id)
  527. self.rejoin_needed = True
  528. future.failure(error_type())
  529. elif error_type is Errors.UnknownMemberIdError:
  530. log.warning("Heartbeat: local member_id was not recognized;"
  531. " this consumer needs to re-join")
  532. self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
  533. self.rejoin_needed = True
  534. future.failure(error_type)
  535. elif error_type is Errors.GroupAuthorizationFailedError:
  536. error = error_type(self.group_id)
  537. log.error("Heartbeat failed: authorization error: %s", error)
  538. future.failure(error)
  539. else:
  540. error = error_type()
  541. log.error("Heartbeat failed: Unhandled error: %s", error)
  542. future.failure(error)
  543. class HeartbeatTask(object):
  544. def __init__(self, coordinator):
  545. self._coordinator = coordinator
  546. self._heartbeat = coordinator.heartbeat
  547. self._client = coordinator._client
  548. self._request_in_flight = False
  549. def disable(self):
  550. try:
  551. self._client.unschedule(self)
  552. except KeyError:
  553. pass
  554. def reset(self):
  555. # start or restart the heartbeat task to be executed at the next chance
  556. self._heartbeat.reset_session_timeout()
  557. try:
  558. self._client.unschedule(self)
  559. except KeyError:
  560. pass
  561. if not self._request_in_flight:
  562. self._client.schedule(self, time.time())
  563. def __call__(self):
  564. if (self._coordinator.generation < 0 or
  565. self._coordinator.need_rejoin()):
  566. # no need to send the heartbeat we're not using auto-assignment
  567. # or if we are awaiting a rebalance
  568. log.info("Skipping heartbeat: no auto-assignment"
  569. " or waiting on rebalance")
  570. return
  571. if self._coordinator.coordinator_unknown():
  572. log.warning("Coordinator unknown during heartbeat -- will retry")
  573. self._handle_heartbeat_failure(Errors.GroupCoordinatorNotAvailableError())
  574. return
  575. if self._heartbeat.session_expired():
  576. # we haven't received a successful heartbeat in one session interval
  577. # so mark the coordinator dead
  578. log.error("Heartbeat session expired - marking coordinator dead")
  579. self._coordinator.coordinator_dead('Heartbeat session expired')
  580. return
  581. if not self._heartbeat.should_heartbeat():
  582. # we don't need to heartbeat now, so reschedule for when we do
  583. ttl = self._heartbeat.ttl()
  584. log.debug("Heartbeat task unneeded now, retrying in %s", ttl)
  585. self._client.schedule(self, time.time() + ttl)
  586. else:
  587. self._heartbeat.sent_heartbeat()
  588. self._request_in_flight = True
  589. future = self._coordinator._send_heartbeat_request()
  590. future.add_callback(self._handle_heartbeat_success)
  591. future.add_errback(self._handle_heartbeat_failure)
  592. def _handle_heartbeat_success(self, v):
  593. log.debug("Received successful heartbeat")
  594. self._request_in_flight = False
  595. self._heartbeat.received_heartbeat()
  596. ttl = self._heartbeat.ttl()
  597. self._client.schedule(self, time.time() + ttl)
  598. def _handle_heartbeat_failure(self, e):
  599. log.warning("Heartbeat failed (%s); retrying", e)
  600. self._request_in_flight = False
  601. etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000
  602. self._client.schedule(self, etd)
  603. class GroupCoordinatorMetrics(object):
  604. def __init__(self, heartbeat, metrics, prefix, tags=None):
  605. self.heartbeat = heartbeat
  606. self.metrics = metrics
  607. self.metric_group_name = prefix + "-coordinator-metrics"
  608. self.heartbeat_latency = metrics.sensor('heartbeat-latency')
  609. self.heartbeat_latency.add(metrics.metric_name(
  610. 'heartbeat-response-time-max', self.metric_group_name,
  611. 'The max time taken to receive a response to a heartbeat request',
  612. tags), Max())
  613. self.heartbeat_latency.add(metrics.metric_name(
  614. 'heartbeat-rate', self.metric_group_name,
  615. 'The average number of heartbeats per second',
  616. tags), Rate(sampled_stat=Count()))
  617. self.join_latency = metrics.sensor('join-latency')
  618. self.join_latency.add(metrics.metric_name(
  619. 'join-time-avg', self.metric_group_name,
  620. 'The average time taken for a group rejoin',
  621. tags), Avg())
  622. self.join_latency.add(metrics.metric_name(
  623. 'join-time-max', self.metric_group_name,
  624. 'The max time taken for a group rejoin',
  625. tags), Max())
  626. self.join_latency.add(metrics.metric_name(
  627. 'join-rate', self.metric_group_name,
  628. 'The number of group joins per second',
  629. tags), Rate(sampled_stat=Count()))
  630. self.sync_latency = metrics.sensor('sync-latency')
  631. self.sync_latency.add(metrics.metric_name(
  632. 'sync-time-avg', self.metric_group_name,
  633. 'The average time taken for a group sync',
  634. tags), Avg())
  635. self.sync_latency.add(metrics.metric_name(
  636. 'sync-time-max', self.metric_group_name,
  637. 'The max time taken for a group sync',
  638. tags), Max())
  639. self.sync_latency.add(metrics.metric_name(
  640. 'sync-rate', self.metric_group_name,
  641. 'The number of group syncs per second',
  642. tags), Rate(sampled_stat=Count()))
  643. metrics.add_metric(metrics.metric_name(
  644. 'last-heartbeat-seconds-ago', self.metric_group_name,
  645. 'The number of seconds since the last controller heartbeat',
  646. tags), AnonMeasurable(
  647. lambda _, now: (now / 1000) - self.heartbeat.last_send))