12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748 |
- from __future__ import absolute_import, division
- import abc
- import copy
- import logging
- import time
- import weakref
- from kafka.vendor import six
- from .heartbeat import Heartbeat
- from .. import errors as Errors
- from ..future import Future
- from ..metrics import AnonMeasurable
- from ..metrics.stats import Avg, Count, Max, Rate
- from ..protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
- from ..protocol.group import (HeartbeatRequest, JoinGroupRequest,
- LeaveGroupRequest, SyncGroupRequest)
# Module-level logger used by every class defined in this module.
log = logging.getLogger('kafka.coordinator')
class BaseCoordinator(object):
    """
    BaseCoordinator implements group management for a single group member
    by interacting with a designated Kafka broker (the coordinator). Group
    semantics are provided by extending this class. See ConsumerCoordinator
    for example usage.

    From a high level, Kafka's group management protocol consists of the
    following sequence of actions:

    1. Group Registration: Group members register with the coordinator providing
       their own metadata (such as the set of topics they are interested in).

    2. Group/Leader Selection: The coordinator selects the members of the group
       and chooses one member as the leader.

    3. State Assignment: The leader collects the metadata from all the members
       of the group and assigns state.

    4. Group Stabilization: Each member receives the state assigned by the
       leader and begins processing.

    To leverage this protocol, an implementation must define the format of
    metadata provided by each member for group registration in
    :meth:`.group_protocols` and the format of the state assignment provided by
    the leader in :meth:`._perform_assignment` and which becomes available to
    members in :meth:`._on_join_complete`.

    Note: this class does not use ``abc.ABCMeta`` as its metaclass, so the
    ``@abc.abstractmethod`` markers below are not enforced at instantiation
    time; they document the methods a subclass is expected to override.
    """

    DEFAULT_CONFIG = {
        'group_id': 'kafka-python-default-group',
        'session_timeout_ms': 30000,
        'heartbeat_interval_ms': 3000,
        'retry_backoff_ms': 100,
        'api_version': (0, 9),
        'metric_group_prefix': '',
    }

    def __init__(self, client, metrics, **configs):
        """Initialize coordinator state for a single group member.

        Arguments:
            client: network client used to reach brokers (this class calls
                its ready, send, poll, least_loaded_node, is_disconnected,
                in_flight_request_count and unschedule methods).
            metrics: metrics registry in which the coordinator sensors are
                registered via GroupCoordinatorMetrics.

        Keyword Arguments:
            group_id (str): name of the consumer group to join for dynamic
                partition assignment (if enabled), and to use for fetching and
                committing offsets. Default: 'kafka-python-default-group'
            session_timeout_ms (int): The timeout used to detect failures when
                using Kafka's group management facilities. Default: 30000
            heartbeat_interval_ms (int): The expected time in milliseconds
                between heartbeats to the consumer coordinator when using
                Kafka's group management feature. Heartbeats are used to ensure
                that the consumer's session stays active and to facilitate
                rebalancing when new consumers join or leave the group. The
                value must be set lower than session_timeout_ms, but typically
                should be set no higher than 1/3 of that value. It can be
                adjusted even lower to control the expected time for normal
                rebalances. Default: 3000
            retry_backoff_ms (int): Milliseconds to backoff when retrying on
                errors. Default: 100.
            api_version (tuple): Broker API version. Versions older than
                (0, 8, 2) have no group coordinator, so the least-loaded node
                is used as the "coordinator" instead. Default: (0, 9)
            metric_group_prefix (str): Prefix for the metric group name,
                which becomes '<prefix>-coordinator-metrics'. Default: ''
        """
        self.config = copy.copy(self.DEFAULT_CONFIG)
        # Only keys already present in DEFAULT_CONFIG are honored; any other
        # keyword arguments are silently ignored.
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]

        self._client = client
        self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID
        self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
        self.group_id = self.config['group_id']
        self.coordinator_id = None
        self.rejoin_needed = True
        self.rejoining = False
        self.heartbeat = Heartbeat(**self.config)
        # The task only holds a weak proxy back to this coordinator, so the
        # client's schedule (which holds the task) does not keep it alive.
        self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
        self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
                                               self.config['metric_group_prefix'])

    def __del__(self):
        # Stop heartbeating, but only if __init__ got far enough to create
        # the task.
        if hasattr(self, 'heartbeat_task') and self.heartbeat_task:
            self.heartbeat_task.disable()

    @abc.abstractmethod
    def protocol_type(self):
        """
        Unique identifier for the class of protocols implemented
        (e.g. "consumer" or "connect").

        Returns:
            str: protocol type name
        """
        pass

    @abc.abstractmethod
    def group_protocols(self):
        """Return the list of supported group protocols and metadata.

        This list is submitted by each group member via a JoinGroupRequest.
        The order of the protocols in the list indicates the preference of the
        protocol (the first entry is the most preferred). The coordinator takes
        this preference into account when selecting the generation protocol
        (generally more preferred protocols will be selected as long as all
        members support them and there is no disagreement on the preference).

        Note: metadata must be type bytes or support an encode() method

        Returns:
            list: [(protocol, metadata), ...]
        """
        pass

    @abc.abstractmethod
    def _on_join_prepare(self, generation, member_id):
        """Invoked prior to each group join or rejoin.

        This is typically used to perform any cleanup from the previous
        generation (such as committing offsets for the consumer)

        Arguments:
            generation (int): The previous generation or -1 if there was none
            member_id (str): The identifier of this member in the previous group
                or '' if there was none
        """
        pass

    @abc.abstractmethod
    def _perform_assignment(self, leader_id, protocol, members):
        """Perform assignment for the group.

        This is used by the leader to push state to all the members of the group
        (e.g. to push partition assignments in the case of the new consumer)

        Arguments:
            leader_id (str): The id of the leader (which is this member)
            protocol (str): the chosen group protocol (assignment strategy)
            members (list): [(member_id, metadata_bytes)] from
                JoinGroupResponse. metadata_bytes are associated with the chosen
                group protocol, and the Coordinator subclass is responsible for
                decoding metadata_bytes based on that protocol.

        Returns:
            dict: {member_id: assignment}; assignment must either be bytes
                or have an encode() method to convert to bytes
        """
        pass

    @abc.abstractmethod
    def _on_join_complete(self, generation, member_id, protocol,
                          member_assignment_bytes):
        """Invoked when a group member has successfully joined a group.

        Arguments:
            generation (int): the generation that was joined
            member_id (str): the identifier for the local member in the group
            protocol (str): the protocol selected by the coordinator
            member_assignment_bytes (bytes): the protocol-encoded assignment
                propagated from the group leader. The Coordinator instance is
                responsible for decoding based on the chosen protocol.
        """
        pass

    def coordinator_unknown(self):
        """Check if we know who the coordinator is and have an active connection

        Side-effect: reset coordinator_id to None if connection failed

        Returns:
            bool: True if the coordinator is unknown
        """
        if self.coordinator_id is None:
            return True

        if self._client.is_disconnected(self.coordinator_id):
            self.coordinator_dead('Node Disconnected')
            return True

        return False

    def ensure_coordinator_known(self):
        """Block until the coordinator for this group is known
        (and we have an active connection -- java client uses unsent queue).

        Raises:
            Exception: the failure of a non-retriable group coordinator
                lookup is re-raised.
        """
        while self.coordinator_unknown():

            # Prior to 0.8.2 there was no group coordinator, so we just use
            # the least-loaded node and treat it as the "coordinator".
            if self.config['api_version'] < (0, 8, 2):
                self.coordinator_id = self._client.least_loaded_node()
                if self.coordinator_id is not None:
                    self._client.ready(self.coordinator_id)
                else:
                    # No node available yet: back off instead of spinning
                    # in a tight loop.
                    time.sleep(self.config['retry_backoff_ms'] / 1000)
                continue

            future = self._send_group_coordinator_request()
            self._client.poll(future=future)

            if future.failed():
                if future.retriable():
                    if getattr(future.exception, 'invalid_metadata', False):
                        log.debug('Requesting metadata for group coordinator request: %s', future.exception)
                        metadata_update = self._client.cluster.request_update()
                        self._client.poll(future=metadata_update)
                    else:
                        time.sleep(self.config['retry_backoff_ms'] / 1000)
                else:
                    raise future.exception  # pylint: disable-msg=raising-bad-type

    def need_rejoin(self):
        """Check whether the group should be rejoined (e.g. if metadata changes)

        Returns:
            bool: True if it should, False otherwise
        """
        return self.rejoin_needed

    def ensure_active_group(self):
        """Ensure that the group is active (i.e. joined and synced)

        Raises:
            Exception: a non-retriable join/sync failure is re-raised.
        """
        if not self.need_rejoin():
            return

        # Call _on_join_prepare only once per rejoin cycle, even if this
        # method is re-entered before the join completes.
        if not self.rejoining:
            self._on_join_prepare(self.generation, self.member_id)
            self.rejoining = True

        while self.need_rejoin():
            self.ensure_coordinator_known()

            # ensure that there are no pending requests to the coordinator.
            # This is important in particular to avoid resending a pending
            # JoinGroup request.
            while not self.coordinator_unknown():
                if not self._client.in_flight_request_count(self.coordinator_id):
                    break
                self._client.poll(delayed_tasks=False)
            else:
                # The coordinator was lost while draining; rediscover it.
                continue

            future = self._send_join_group_request()
            self._client.poll(future=future)

            if future.succeeded():
                member_assignment_bytes = future.value
                self._on_join_complete(self.generation, self.member_id,
                                       self.protocol, member_assignment_bytes)
                self.rejoining = False
                self.heartbeat_task.reset()
            else:
                assert future.failed()
                exception = future.exception
                # These errors call for an immediate rejoin without backoff.
                if isinstance(exception, (Errors.UnknownMemberIdError,
                                          Errors.RebalanceInProgressError,
                                          Errors.IllegalGenerationError)):
                    continue
                elif not future.retriable():
                    raise exception  # pylint: disable-msg=raising-bad-type
                time.sleep(self.config['retry_backoff_ms'] / 1000)

    def _send_join_group_request(self):
        """Join the group and return the assignment for the next generation.

        This function handles both JoinGroup and SyncGroup, delegating to
        :meth:`._perform_assignment` if elected leader by the coordinator.

        Returns:
            Future: resolves to the encoded-bytes assignment returned from the
                group leader
        """
        if self.coordinator_unknown():
            e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
            return Future().failure(e)

        elif not self._client.ready(self.coordinator_id, metadata_priority=False):
            e = Errors.NodeNotReadyError(self.coordinator_id)
            return Future().failure(e)

        # send a join group request to the coordinator
        log.info("(Re-)joining group %s", self.group_id)
        request = JoinGroupRequest[0](
            self.group_id,
            self.config['session_timeout_ms'],
            self.member_id,
            self.protocol_type(),
            [(protocol,
              metadata if isinstance(metadata, bytes) else metadata.encode())
             for protocol, metadata in self.group_protocols()])

        # create the request for the coordinator
        log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
        future = Future()
        _f = self._client.send(self.coordinator_id, request)
        _f.add_callback(self._handle_join_group_response, future, time.time())
        _f.add_errback(self._failed_request, self.coordinator_id,
                       request, future)
        return future

    def _failed_request(self, node_id, request, future, error):
        """Errback for a request to node_id: log it, then fail future.

        The coordinator is marked dead unless the error is one raised by the
        client's own pipelining (NodeNotReadyError / TooManyInFlightRequests).
        """
        log.error('Error sending %s to node %s [%s]',
                  request.__class__.__name__, node_id, error)
        # Marking coordinator dead
        # unless the error is caused by internal client pipelining
        if not isinstance(error, (Errors.NodeNotReadyError,
                                  Errors.TooManyInFlightRequests)):
            self.coordinator_dead(error)
        future.failure(error)

    def _handle_join_group_response(self, future, send_time, response):
        """Process a JoinGroup response and continue with SyncGroup on success.

        On success, member_id / generation / protocol are updated and future
        is chained to the leader or follower SyncGroup future. On error,
        future is failed with an exception instance describing the error.
        """
        error_type = Errors.for_code(response.error_code)
        if error_type is Errors.NoError:
            log.debug("Received successful JoinGroup response for group %s: %s",
                      self.group_id, response)
            self.member_id = response.member_id
            self.generation = response.generation_id
            self.rejoin_needed = False
            self.protocol = response.group_protocol
            log.info("Joined group '%s' (generation %s) with member_id %s",
                     self.group_id, self.generation, self.member_id)
            self.sensors.join_latency.record((time.time() - send_time) * 1000)
            if response.leader_id == response.member_id:
                log.info("Elected group leader -- performing partition"
                         " assignments using %s", self.protocol)
                self._on_join_leader(response).chain(future)
            else:
                self._on_join_follower().chain(future)

        elif error_type is Errors.GroupLoadInProgressError:
            log.debug("Attempt to join group %s rejected since coordinator %s"
                      " is loading the group.", self.group_id, self.coordinator_id)
            # backoff and retry
            future.failure(error_type(response))
        elif error_type is Errors.UnknownMemberIdError:
            # reset the member id and retry immediately
            error = error_type(self.member_id)
            self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
            log.debug("Attempt to join group %s failed due to unknown member id",
                      self.group_id)
            future.failure(error)
        elif error_type in (Errors.GroupCoordinatorNotAvailableError,
                            Errors.NotCoordinatorForGroupError):
            # re-discover the coordinator and retry with backoff
            self.coordinator_dead(error_type())
            log.debug("Attempt to join group %s failed due to obsolete "
                      "coordinator information: %s", self.group_id,
                      error_type.__name__)
            future.failure(error_type())
        elif error_type in (Errors.InconsistentGroupProtocolError,
                            Errors.InvalidSessionTimeoutError,
                            Errors.InvalidGroupIdError):
            # log the error and re-throw the exception
            error = error_type(response)
            log.error("Attempt to join group %s failed due to fatal error: %s",
                      self.group_id, error)
            future.failure(error)
        elif error_type is Errors.GroupAuthorizationFailedError:
            future.failure(error_type(self.group_id))
        else:
            # unexpected error, throw the exception
            error = error_type()
            log.error("Unexpected error in join group response: %s", error)
            future.failure(error)

    def _on_join_follower(self):
        """Send the follower's SyncGroup request (with an empty assignment).

        Returns:
            Future: resolves to member assignment encoded-bytes
        """
        # send follower's sync group with an empty assignment
        request = SyncGroupRequest[0](
            self.group_id,
            self.generation,
            self.member_id,
            {})
        log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s",
                  self.group_id, self.coordinator_id, request)
        return self._send_sync_group_request(request)

    def _on_join_leader(self, response):
        """
        Perform leader synchronization and send back the assignment
        for the group via SyncGroupRequest

        Arguments:
            response (JoinResponse): broker response to parse

        Returns:
            Future: resolves to member assignment encoded-bytes; a failed
                future if _perform_assignment raised.
        """
        try:
            group_assignment = self._perform_assignment(response.leader_id,
                                                        response.group_protocol,
                                                        response.members)
        except Exception as e:
            # Surface assignor errors through the future rather than raising
            # from inside the response callback.
            return Future().failure(e)

        request = SyncGroupRequest[0](
            self.group_id,
            self.generation,
            self.member_id,
            [(member_id,
              assignment if isinstance(assignment, bytes) else assignment.encode())
             for member_id, assignment in six.iteritems(group_assignment)])

        log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s",
                  self.group_id, self.coordinator_id, request)
        return self._send_sync_group_request(request)

    def _send_sync_group_request(self, request):
        """Send a SyncGroup request to the coordinator.

        Returns:
            Future: resolves to the member assignment bytes from the response
        """
        if self.coordinator_unknown():
            e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
            return Future().failure(e)

        # We assume that coordinator is ready if we're sending SyncGroup
        # as it typically follows a successful JoinGroup
        # Also note that if client.ready() enforces a metadata priority policy,
        # we can get into an infinite loop if the leader assignment process
        # itself requests a metadata update

        future = Future()
        _f = self._client.send(self.coordinator_id, request)
        _f.add_callback(self._handle_sync_group_response, future, time.time())
        _f.add_errback(self._failed_request, self.coordinator_id,
                       request, future)
        return future

    def _handle_sync_group_response(self, future, send_time, response):
        """Resolve future with the assignment, or fail it and flag a rejoin."""
        error_type = Errors.for_code(response.error_code)
        if error_type is Errors.NoError:
            log.info("Successfully joined group %s with generation %s",
                     self.group_id, self.generation)
            self.sensors.sync_latency.record((time.time() - send_time) * 1000)
            future.success(response.member_assignment)
            return

        # Always rejoin on error
        self.rejoin_needed = True
        if error_type is Errors.GroupAuthorizationFailedError:
            future.failure(error_type(self.group_id))
        elif error_type is Errors.RebalanceInProgressError:
            log.debug("SyncGroup for group %s failed due to coordinator"
                      " rebalance", self.group_id)
            future.failure(error_type(self.group_id))
        elif error_type in (Errors.UnknownMemberIdError,
                            Errors.IllegalGenerationError):
            error = error_type()
            log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
            self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
            future.failure(error)
        elif error_type in (Errors.GroupCoordinatorNotAvailableError,
                            Errors.NotCoordinatorForGroupError):
            error = error_type()
            log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
            self.coordinator_dead(error)
            future.failure(error)
        else:
            error = error_type()
            log.error("Unexpected error from SyncGroup: %s", error)
            future.failure(error)

    def _send_group_coordinator_request(self):
        """Discover the current coordinator for the group.

        Returns:
            Future: resolves to the node id of the coordinator
        """
        node_id = self._client.least_loaded_node()
        if node_id is None:
            return Future().failure(Errors.NoBrokersAvailable())

        elif not self._client.ready(node_id, metadata_priority=False):
            e = Errors.NodeNotReadyError(node_id)
            return Future().failure(e)

        log.debug("Sending group coordinator request for group %s to broker %s",
                  self.group_id, node_id)
        request = GroupCoordinatorRequest[0](self.group_id)
        future = Future()
        _f = self._client.send(node_id, request)
        _f.add_callback(self._handle_group_coordinator_response, future)
        _f.add_errback(self._failed_request, node_id, request, future)
        return future

    def _handle_group_coordinator_response(self, future, response):
        """Record the discovered coordinator and resolve future with its id."""
        log.debug("Received group coordinator response %s", response)
        if not self.coordinator_unknown():
            # We already found the coordinator, so ignore the request
            log.debug("Coordinator already known -- ignoring metadata response")
            future.success(self.coordinator_id)
            return

        error_type = Errors.for_code(response.error_code)
        if error_type is Errors.NoError:
            ok = self._client.cluster.add_group_coordinator(self.group_id, response)
            if not ok:
                # This could happen if coordinator metadata is different
                # than broker metadata
                future.failure(Errors.IllegalStateError())
                return

            self.coordinator_id = response.coordinator_id
            log.info("Discovered coordinator %s for group %s",
                     self.coordinator_id, self.group_id)
            self._client.ready(self.coordinator_id)

            # start sending heartbeats only if we have a valid generation
            if self.generation > 0:
                self.heartbeat_task.reset()
            future.success(self.coordinator_id)
        elif error_type is Errors.GroupCoordinatorNotAvailableError:
            log.debug("Group Coordinator Not Available; retry")
            future.failure(error_type())
        elif error_type is Errors.GroupAuthorizationFailedError:
            error = error_type(self.group_id)
            log.error("Group Coordinator Request failed: %s", error)
            future.failure(error)
        else:
            error = error_type()
            log.error("Unrecognized failure in Group Coordinator Request: %s",
                      error)
            future.failure(error)

    def coordinator_dead(self, error):
        """Mark the current coordinator as dead."""
        if self.coordinator_id is not None:
            log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
                        self.coordinator_id, self.group_id, error)
            self.coordinator_id = None

    def close(self):
        """Close the coordinator, leave the current group,
        and reset local generation / member_id"""
        try:
            self._client.unschedule(self.heartbeat_task)
        except KeyError:
            # heartbeat task was not scheduled
            pass

        if not self.coordinator_unknown() and self.generation > 0:
            # this is a minimal effort attempt to leave the group. we do not
            # attempt any resending if the request fails or times out.
            log.info('Leaving consumer group (%s).', self.group_id)
            request = LeaveGroupRequest[0](self.group_id, self.member_id)
            future = self._client.send(self.coordinator_id, request)
            future.add_callback(self._handle_leave_group_response)
            future.add_errback(log.error, "LeaveGroup request failed: %s")
            self._client.poll(future=future)

        self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID
        self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
        self.rejoin_needed = True

    def _handle_leave_group_response(self, response):
        """Log the outcome of a LeaveGroup request (no retry on failure)."""
        error_type = Errors.for_code(response.error_code)
        if error_type is Errors.NoError:
            log.info("LeaveGroup request succeeded")
        else:
            log.error("LeaveGroup request failed: %s", error_type())

    def _send_heartbeat_request(self):
        """Send a heartbeat request

        Returns:
            Future: resolves to None on success; fails with an exception
                instance otherwise.
        """
        if self.coordinator_unknown():
            e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
            return Future().failure(e)

        elif not self._client.ready(self.coordinator_id, metadata_priority=False):
            e = Errors.NodeNotReadyError(self.coordinator_id)
            return Future().failure(e)

        request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id)
        log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id)  # pylint: disable-msg=no-member
        future = Future()
        _f = self._client.send(self.coordinator_id, request)
        _f.add_callback(self._handle_heartbeat_response, future, time.time())
        _f.add_errback(self._failed_request, self.coordinator_id,
                       request, future)
        return future

    def _handle_heartbeat_response(self, future, send_time, response):
        """Record heartbeat latency and resolve/fail future from the response.

        Every failure branch fails future with an exception *instance*, so
        errbacks see a consistent type.
        """
        self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
        error_type = Errors.for_code(response.error_code)
        if error_type is Errors.NoError:
            log.debug("Received successful heartbeat response for group %s",
                      self.group_id)
            future.success(None)
        elif error_type in (Errors.GroupCoordinatorNotAvailableError,
                            Errors.NotCoordinatorForGroupError):
            log.warning("Heartbeat failed for group %s: coordinator (node %s)"
                        " is either not started or not valid", self.group_id,
                        self.coordinator_id)
            self.coordinator_dead(error_type())
            future.failure(error_type())
        elif error_type is Errors.RebalanceInProgressError:
            log.warning("Heartbeat failed for group %s because it is"
                        " rebalancing", self.group_id)
            self.rejoin_needed = True
            future.failure(error_type())
        elif error_type is Errors.IllegalGenerationError:
            log.warning("Heartbeat failed for group %s: generation id is not"
                        " current.", self.group_id)
            self.rejoin_needed = True
            future.failure(error_type())
        elif error_type is Errors.UnknownMemberIdError:
            log.warning("Heartbeat: local member_id was not recognized;"
                        " this consumer needs to re-join")
            self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
            self.rejoin_needed = True
            # Fail with an instance (not the class), like every other branch.
            future.failure(error_type())
        elif error_type is Errors.GroupAuthorizationFailedError:
            error = error_type(self.group_id)
            log.error("Heartbeat failed: authorization error: %s", error)
            future.failure(error)
        else:
            error = error_type()
            log.error("Heartbeat failed: Unhandled error: %s", error)
            future.failure(error)
class HeartbeatTask(object):
    """Client-scheduled callable that keeps a coordinator's group session alive.

    On each run it may:

    * skip, when there is no generation or a rejoin is pending;
    * back off and retry, when the coordinator is unknown;
    * mark the coordinator dead, when the session has expired;
    * re-schedule itself for later, when no heartbeat is due yet;
    * or send a heartbeat through the coordinator. The request's
      callback/errback then schedules the next run.

    Arguments:
        coordinator: the owning coordinator (BaseCoordinator passes a
            ``weakref.proxy`` to itself). Its ``heartbeat`` and ``_client``
            attributes are captured once, at construction time.
    """

    def __init__(self, coordinator):
        self._coordinator = coordinator
        self._heartbeat = coordinator.heartbeat
        self._client = coordinator._client
        # True while a heartbeat request is outstanding; reset() will not
        # schedule another run until that request completes.
        self._request_in_flight = False

    def disable(self):
        """Remove this task from the client's schedule (no-op if absent)."""
        try:
            self._client.unschedule(self)
        except KeyError:
            # Not currently scheduled -- nothing to cancel.
            pass

    def reset(self):
        """Restart the session timer and (re)schedule the task to run now.

        If a heartbeat is already in flight the task is only unscheduled; the
        in-flight request's completion handler schedules the next run.
        """
        self._heartbeat.reset_session_timeout()
        self.disable()
        if self._request_in_flight:
            return
        self._client.schedule(self, time.time())

    def __call__(self):
        coordinator = self._coordinator

        if coordinator.generation < 0 or coordinator.need_rejoin():
            # Nothing to keep alive: auto-assignment is not in use, or a
            # rebalance is pending.
            log.info("Skipping heartbeat: no auto-assignment or waiting on rebalance")
            return

        if coordinator.coordinator_unknown():
            log.warning("Coordinator unknown during heartbeat -- will retry")
            self._handle_heartbeat_failure(
                Errors.GroupCoordinatorNotAvailableError())
            return

        if self._heartbeat.session_expired():
            # No successful heartbeat within a full session interval.
            log.error("Heartbeat session expired - marking coordinator dead")
            coordinator.coordinator_dead('Heartbeat session expired')
            return

        if self._heartbeat.should_heartbeat():
            self._heartbeat.sent_heartbeat()
            self._request_in_flight = True
            pending = coordinator._send_heartbeat_request()
            pending.add_callback(self._handle_heartbeat_success)
            pending.add_errback(self._handle_heartbeat_failure)
            return

        # Not due yet: come back when the next heartbeat is due.
        delay = self._heartbeat.ttl()
        log.debug("Heartbeat task unneeded now, retrying in %s", delay)
        self._client.schedule(self, time.time() + delay)

    def _handle_heartbeat_success(self, v):
        """Future callback: record the response and schedule the next beat."""
        log.debug("Received successful heartbeat")
        self._request_in_flight = False
        self._heartbeat.received_heartbeat()
        delay = self._heartbeat.ttl()
        self._client.schedule(self, time.time() + delay)

    def _handle_heartbeat_failure(self, e):
        """Future errback: schedule a retry after ``retry_backoff_ms``."""
        log.warning("Heartbeat failed (%s); retrying", e)
        self._request_in_flight = False
        retry_at = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000
        self._client.schedule(self, retry_at)
class GroupCoordinatorMetrics(object):
    """Register the group-coordinator sensors and metrics on a registry.

    Creates three sensors, exposed as ``heartbeat_latency``, ``join_latency``
    and ``sync_latency``, with max/avg/rate statistics attached. It also adds
    a gauge reporting the seconds since ``heartbeat.last_send``. All metrics
    live in the group '<prefix>-coordinator-metrics'.

    Arguments:
        heartbeat: object whose ``last_send`` attribute feeds the
            'last-heartbeat-seconds-ago' gauge.
        metrics: metrics registry providing ``sensor``, ``metric_name`` and
            ``add_metric``.
        prefix (str): metric group name prefix.
        tags: optional tags passed to every ``metric_name`` call.
    """

    def __init__(self, heartbeat, metrics, prefix, tags=None):
        self.heartbeat = heartbeat
        self.metrics = metrics
        self.metric_group_name = prefix + "-coordinator-metrics"
        group_name = self.metric_group_name

        def name_for(metric, description):
            return metrics.metric_name(metric, group_name, description, tags)

        def count_rate():
            return Rate(sampled_stat=Count())

        # (attribute, sensor name, ((metric name, description, stat factory), ...))
        layout = (
            ('heartbeat_latency', 'heartbeat-latency', (
                ('heartbeat-response-time-max',
                 'The max time taken to receive a response to a heartbeat request',
                 Max),
                ('heartbeat-rate',
                 'The average number of heartbeats per second',
                 count_rate),
            )),
            ('join_latency', 'join-latency', (
                ('join-time-avg', 'The average time taken for a group rejoin', Avg),
                ('join-time-max', 'The max time taken for a group rejoin', Max),
                ('join-rate', 'The number of group joins per second', count_rate),
            )),
            ('sync_latency', 'sync-latency', (
                ('sync-time-avg', 'The average time taken for a group sync', Avg),
                ('sync-time-max', 'The max time taken for a group sync', Max),
                ('sync-rate', 'The number of group syncs per second', count_rate),
            )),
        )

        for attr, sensor_name, stat_specs in layout:
            sensor = metrics.sensor(sensor_name)
            setattr(self, attr, sensor)
            for metric, description, make_stat in stat_specs:
                sensor.add(name_for(metric, description), make_stat())

        # Gauge: ``now`` is in milliseconds, while ``last_send`` is compared
        # in seconds.
        metrics.add_metric(
            name_for('last-heartbeat-seconds-ago',
                     'The number of seconds since the last controller heartbeat'),
            AnonMeasurable(
                lambda _, now: (now / 1000) - self.heartbeat.last_send))
|