# cluster.py

from __future__ import absolute_import

import collections
import copy
import logging
import threading
import time

from kafka.vendor import six

from . import errors as Errors
from .future import Future
from .structs import BrokerMetadata, PartitionMetadata, TopicPartition

log = logging.getLogger(__name__)


class ClusterMetadata(object):
    """
    A class to manage kafka cluster metadata.

    This class does not perform any IO. It simply updates internal state
    given API responses (MetadataResponse, GroupCoordinatorResponse).

    Keyword Arguments:
        retry_backoff_ms (int): Milliseconds to backoff when retrying on
            errors. Default: 100.
        metadata_max_age_ms (int): The period of time in milliseconds after
            which we force a refresh of metadata even if we haven't seen any
            partition leadership changes to proactively discover any new
            brokers or partitions. Default: 300000
    """
    DEFAULT_CONFIG = {
        'retry_backoff_ms': 100,
        'metadata_max_age_ms': 300000,
    }

    def __init__(self, **configs):
        self._brokers = {}  # node_id -> BrokerMetadata
        self._partitions = {}  # topic -> partition -> PartitionMetadata
        self._broker_partitions = collections.defaultdict(set)  # node_id -> {TopicPartition...}
        self._groups = {}  # group_name -> node_id
        self._last_refresh_ms = 0
        self._last_successful_refresh_ms = 0
        self._need_update = False
        self._future = None
        self._listeners = set()
        self._lock = threading.Lock()
        self.need_all_topic_metadata = False
        self.unauthorized_topics = set()
        self.internal_topics = set()
        self.controller = None

        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]

    def brokers(self):
        """Get all BrokerMetadata

        Returns:
            set: {BrokerMetadata, ...}
        """
        return set(self._brokers.values())

    def broker_metadata(self, broker_id):
        """Get BrokerMetadata

        Arguments:
            broker_id (int): node_id for a broker to check

        Returns:
            BrokerMetadata or None if not found
        """
        return self._brokers.get(broker_id)

    def partitions_for_topic(self, topic):
        """Return set of all partitions for topic (whether available or not)

        Arguments:
            topic (str): topic to check for partitions

        Returns:
            set: {partition (int), ...}
        """
        if topic not in self._partitions:
            return None
        return set(self._partitions[topic].keys())

    def available_partitions_for_topic(self, topic):
        """Return set of partitions with known leaders

        Arguments:
            topic (str): topic to check for partitions

        Returns:
            set: {partition (int), ...}
        """
        if topic not in self._partitions:
            return None
        return set([partition for partition, metadata
                    in six.iteritems(self._partitions[topic])
                    if metadata.leader != -1])
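    # Illustrative sketch (not part of the original module): the two lookups
    # above differ only in that available_partitions_for_topic() drops
    # partitions whose leader is currently unknown (leader == -1). With a
    # hypothetical topic 'my-topic' whose partition 1 has no leader:
    #
    #   cluster.partitions_for_topic('my-topic')            # {0, 1, 2}
    #   cluster.available_partitions_for_topic('my-topic')  # {0, 2}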
    def leader_for_partition(self, partition):
        """Return node_id of leader, -1 unavailable, None if unknown."""
        if partition.topic not in self._partitions:
            return None
        elif partition.partition not in self._partitions[partition.topic]:
            return None
        return self._partitions[partition.topic][partition.partition].leader

    def partitions_for_broker(self, broker_id):
        """Return TopicPartitions for which the broker is a leader.

        Arguments:
            broker_id (int): node id for a broker

        Returns:
            set: {TopicPartition, ...}
        """
        return self._broker_partitions.get(broker_id)

    def coordinator_for_group(self, group):
        """Return node_id of group coordinator.

        Arguments:
            group (str): name of consumer group

        Returns:
            int: node_id for group coordinator
        """
        return self._groups.get(group)

    def ttl(self):
        """Milliseconds until metadata should be refreshed"""
        now = time.time() * 1000
        if self._need_update:
            ttl = 0
        else:
            metadata_age = now - self._last_successful_refresh_ms
            ttl = self.config['metadata_max_age_ms'] - metadata_age

        retry_age = now - self._last_refresh_ms
        next_retry = self.config['retry_backoff_ms'] - retry_age

        return max(ttl, next_retry, 0)
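    # Illustrative sketch (not part of the original module): a caller that owns
    # the network layer could use ttl() to schedule the next MetadataRequest.
    # send_metadata_request() below is a hypothetical helper that performs the
    # I/O and eventually feeds update_metadata() or failed_update():
    #
    #   while running:
    #       wait_ms = cluster.ttl()
    #       if wait_ms == 0:
    #           send_metadata_request()
    #           wait_ms = cluster.refresh_backoff()
    #       time.sleep(wait_ms / 1000.0)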
    def refresh_backoff(self):
        """Return milliseconds to wait before attempting to retry after failure"""
        return self.config['retry_backoff_ms']

    def request_update(self):
        """Flags metadata for update, return Future()

        Actual update must be handled separately. This method will only
        change the reported ttl()

        Returns:
            kafka.future.Future (value will be the cluster object after update)
        """
        with self._lock:
            self._need_update = True
            if not self._future or self._future.is_done:
                self._future = Future()
            return self._future
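    # Illustrative sketch (not part of the original module): request_update()
    # only flags the need for a refresh and returns a Future. The same Future
    # is resolved later, by update_metadata() on success (its value is this
    # cluster object) or by failed_update() on error:
    #
    #   future = cluster.request_update()
    #   ...  # some other component performs the MetadataRequest I/O
    #   if future.is_done:
    #       pass  # refreshed (or failed); inspect the future for the outcome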
    def topics(self, exclude_internal_topics=True):
        """Get set of known topics.

        Arguments:
            exclude_internal_topics (bool): Whether records from internal topics
                (such as offsets) should be exposed to the consumer. If set to
                True the only way to receive records from an internal topic is
                subscribing to it. Default True

        Returns:
            set: {topic (str), ...}
        """
        topics = set(self._partitions.keys())
        if exclude_internal_topics:
            return topics - self.internal_topics
        else:
            return topics

    def failed_update(self, exception):
        """Update cluster state given a failed MetadataRequest."""
        f = None
        with self._lock:
            if self._future:
                f = self._future
                self._future = None
        if f:
            f.failure(exception)
        self._last_refresh_ms = time.time() * 1000

    def update_metadata(self, metadata):
        """Update cluster state given a MetadataResponse.

        Arguments:
            metadata (MetadataResponse): broker response to a metadata request

        Returns: None
        """
        # In the common case where we ask for a single topic and get back an
        # error, we should fail the future
        if len(metadata.topics) == 1 and metadata.topics[0][0] != 0:
            error_code, topic = metadata.topics[0][:2]
            error = Errors.for_code(error_code)(topic)
            return self.failed_update(error)

        if not metadata.brokers:
            log.warning("No broker metadata found in MetadataResponse")

        _new_brokers = {}
        for broker in metadata.brokers:
            if metadata.API_VERSION == 0:
                node_id, host, port = broker
                rack = None
            else:
                node_id, host, port, rack = broker
            _new_brokers.update({
                node_id: BrokerMetadata(node_id, host, port, rack)
            })

        if metadata.API_VERSION == 0:
            _new_controller = None
        else:
            _new_controller = _new_brokers.get(metadata.controller_id)

        _new_partitions = {}
        _new_broker_partitions = collections.defaultdict(set)
        _new_unauthorized_topics = set()
        _new_internal_topics = set()

        for topic_data in metadata.topics:
            if metadata.API_VERSION == 0:
                error_code, topic, partitions = topic_data
                is_internal = False
            else:
                error_code, topic, is_internal, partitions = topic_data
            if is_internal:
                _new_internal_topics.add(topic)
            error_type = Errors.for_code(error_code)
            if error_type is Errors.NoError:
                _new_partitions[topic] = {}
                for p_error, partition, leader, replicas, isr in partitions:
                    _new_partitions[topic][partition] = PartitionMetadata(
                        topic=topic, partition=partition, leader=leader,
                        replicas=replicas, isr=isr, error=p_error)
                    if leader != -1:
                        _new_broker_partitions[leader].add(
                            TopicPartition(topic, partition))
            elif error_type is Errors.LeaderNotAvailableError:
                log.warning("Topic %s is not available during auto-create"
                            " initialization", topic)
            elif error_type is Errors.UnknownTopicOrPartitionError:
                log.error("Topic %s not found in cluster metadata", topic)
            elif error_type is Errors.TopicAuthorizationFailedError:
                log.error("Topic %s is not authorized for this client", topic)
                _new_unauthorized_topics.add(topic)
            elif error_type is Errors.InvalidTopicError:
                log.error("'%s' is not a valid topic name", topic)
            else:
                log.error("Error fetching metadata for topic %s: %s",
                          topic, error_type)

        with self._lock:
            self._brokers = _new_brokers
            self.controller = _new_controller
            self._partitions = _new_partitions
            self._broker_partitions = _new_broker_partitions
            self.unauthorized_topics = _new_unauthorized_topics
            self.internal_topics = _new_internal_topics
            f = None
            if self._future:
                f = self._future
            self._future = None
            self._need_update = False

        now = time.time() * 1000
        self._last_refresh_ms = now
        self._last_successful_refresh_ms = now

        if f:
            f.success(self)
        log.debug("Updated cluster metadata to %s", self)

        for listener in self._listeners:
            listener(self)

    def add_listener(self, listener):
        """Add a callback function to be called on each metadata update"""
        self._listeners.add(listener)

    def remove_listener(self, listener):
        """Remove a previously added listener callback"""
        self._listeners.remove(listener)
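    # Illustrative sketch (not part of the original module): listeners are
    # plain callables invoked with this cluster object at the end of every
    # successful update_metadata() call:
    #
    #   def on_metadata_change(cluster_metadata):
    #       log.info("cluster metadata changed: %s", cluster_metadata)
    #
    #   cluster.add_listener(on_metadata_change)
    #   ...
    #   cluster.remove_listener(on_metadata_change)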
    def add_group_coordinator(self, group, response):
        """Update with metadata for a group coordinator

        Arguments:
            group (str): name of group from GroupCoordinatorRequest
            response (GroupCoordinatorResponse): broker response

        Returns:
            bool: True if metadata is updated, False on error
        """
        log.debug("Updating coordinator for %s: %s", group, response)
        error_type = Errors.for_code(response.error_code)
        if error_type is not Errors.NoError:
            log.error("GroupCoordinatorResponse error: %s", error_type)
            self._groups[group] = -1
            return False

        node_id = response.coordinator_id
        coordinator = BrokerMetadata(
            response.coordinator_id,
            response.host,
            response.port,
            None)

        # Assume that group coordinators are just brokers
        # (this is true now, but could diverge in future)
        if node_id not in self._brokers:
            self._brokers[node_id] = coordinator

        # If this happens, either brokers have moved without
        # changing IDs, or our assumption above is wrong
        else:
            node = self._brokers[node_id]
            if coordinator.host != node.host or coordinator.port != node.port:
                log.error("GroupCoordinator metadata conflicts with existing"
                          " broker metadata. Coordinator: %s, Broker: %s",
                          coordinator, node)
                self._groups[group] = node_id
                return False

        log.info("Group coordinator for %s is %s", group, coordinator)
        self._groups[group] = node_id
        return True
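    # Illustrative sketch (not part of the original module): the response
    # fields consumed above are error_code, coordinator_id, host and port.
    # After a successful call, coordinator_for_group() returns the
    # coordinator's node_id; on error the group maps to -1. 'my-group' is a
    # hypothetical group name:
    #
    #   if cluster.add_group_coordinator('my-group', response):
    #       node_id = cluster.coordinator_for_group('my-group')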
    def with_partitions(self, partitions_to_add):
        """Returns a copy of cluster metadata with partitions added"""
        new_metadata = ClusterMetadata(**self.config)
        new_metadata._brokers = copy.deepcopy(self._brokers)
        new_metadata._partitions = copy.deepcopy(self._partitions)
        new_metadata._broker_partitions = copy.deepcopy(self._broker_partitions)
        new_metadata._groups = copy.deepcopy(self._groups)
        new_metadata.internal_topics = copy.deepcopy(self.internal_topics)
        new_metadata.unauthorized_topics = copy.deepcopy(self.unauthorized_topics)

        for partition in partitions_to_add:
            new_metadata._partitions[partition.topic][partition.partition] = partition

            if partition.leader is not None and partition.leader != -1:
                new_metadata._broker_partitions[partition.leader].add(
                    TopicPartition(partition.topic, partition.partition))

        return new_metadata
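    # Illustrative sketch (not part of the original module): with_partitions()
    # expects PartitionMetadata entries for topics already present in the
    # copied metadata (an unknown topic would raise KeyError on the nested
    # dict). 'known-topic' is hypothetical and assumed to exist already:
    #
    #   extra = PartitionMetadata(topic='known-topic', partition=9, leader=0,
    #                             replicas=[0], isr=[0], error=0)
    #   bigger_cluster = cluster.with_partitions([extra])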
    def __str__(self):
        return 'ClusterMetadata(brokers: %d, topics: %d, groups: %d)' % \
               (len(self._brokers), len(self._partitions), len(self._groups))
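

# Hedged, self-contained demonstration (not part of the original module). It
# builds a minimal object that mimics the tuple layout of an API_VERSION 0
# MetadataResponse exactly as update_metadata() parses it, then walks through a
# request/update cycle. _FakeMetadataV0 and 'demo-topic' are invented for this
# sketch only; run it with `python -m kafka.cluster` if the module is installed
# as kafka/cluster.py.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)

    class _FakeMetadataV0(object):
        API_VERSION = 0
        # brokers: (node_id, host, port) tuples for API_VERSION 0
        brokers = [(0, 'localhost', 9092)]
        # topics: (error_code, topic, partitions), each partition being
        # (error_code, partition, leader, replicas, isr)
        topics = [(0, 'demo-topic', [(0, 0, 0, [0], [0])])]

    cluster = ClusterMetadata()
    future = cluster.request_update()
    cluster.update_metadata(_FakeMetadataV0())

    print(cluster)                                     # brokers: 1, topics: 1, groups: 0
    print(cluster.partitions_for_topic('demo-topic'))  # {0}
    print(cluster.leader_for_partition(TopicPartition('demo-topic', 0)))  # 0
    print(future.is_done)                              # True; per request_update(), its
                                                       # value is this cluster object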