topology_description.py 23 KB


  1. # Copyright 2014-present MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you
  4. # may not use this file except in compliance with the License. You
  5. # may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. # implied. See the License for the specific language governing
  13. # permissions and limitations under the License.
  14. """Represent a deployment of MongoDB servers."""
  15. from collections import namedtuple
  16. from pymongo import common
  17. from pymongo.errors import ConfigurationError
  18. from pymongo.read_preferences import ReadPreference
  19. from pymongo.server_description import ServerDescription
  20. from pymongo.server_selectors import Selection
  21. from pymongo.server_type import SERVER_TYPE
  # Enumeration for various kinds of MongoDB cluster topologies.
  # Each member's value is its position in the field tuple, so
  # TOPOLOGY_TYPE._fields[value] yields the member's name. The values are
  # derived from the tuple's length so the two cannot drift apart when a
  # topology type is added.
  _TOPOLOGY_TYPE_NAMES = (
      'Single', 'ReplicaSetNoPrimary', 'ReplicaSetWithPrimary', 'Sharded',
      'Unknown', 'LoadBalanced')
  TOPOLOGY_TYPE = namedtuple('TopologyType', _TOPOLOGY_TYPE_NAMES)(
      *range(len(_TOPOLOGY_TYPE_NAMES)))

  # Topologies compatible with SRV record polling.
  SRV_POLLING_TOPOLOGIES = (TOPOLOGY_TYPE.Unknown, TOPOLOGY_TYPE.Sharded)
  class TopologyDescription(object):
      def __init__(self,
                   topology_type,
                   server_descriptions,
                   replica_set_name,
                   max_set_version,
                   max_election_id,
                   topology_settings):
          """Representation of a deployment of MongoDB servers.

          :Parameters:
            - `topology_type`: initial type
            - `server_descriptions`: dict of (address, ServerDescription) for
              all seeds
            - `replica_set_name`: replica set name or None
            - `max_set_version`: greatest setVersion seen from a primary, or
              None
            - `max_election_id`: greatest electionId seen from a primary, or
              None
            - `topology_settings`: a TopologySettings
          """
          self._topology_type = topology_type
          self._replica_set_name = replica_set_name
          self._server_descriptions = server_descriptions
          self._max_set_version = max_set_version
          self._max_election_id = max_election_id

          # The heartbeat_frequency is used in staleness estimates.
          self._topology_settings = topology_settings

          # Is PyMongo compatible with all servers' wire protocols?
          self._incompatible_err = None
          if self._topology_type != TOPOLOGY_TYPE.LoadBalanced:
              self._init_incompatible_err()

          # Server Discovery And Monitoring Spec: Whenever a client updates the
          # TopologyDescription from a hello response, it MUST set
          # TopologyDescription.logicalSessionTimeoutMinutes to the smallest
          # logicalSessionTimeoutMinutes value among ServerDescriptions of all
          # data-bearing server types. If any have a null
          # logicalSessionTimeoutMinutes, then
          # TopologyDescription.logicalSessionTimeoutMinutes MUST be set to
          # null.
          timeouts = [s.logical_session_timeout_minutes
                      for s in self.readable_servers]
          if not timeouts or any(t is None for t in timeouts):
              # No readable server, or at least one without a timeout.
              self._ls_timeout_minutes = None
          else:
              self._ls_timeout_minutes = min(timeouts)

      def _init_incompatible_err(self):
          """Internal compatibility check for non-load balanced topologies.

          Records a message in ``self._incompatible_err`` for the first server
          of known type whose wire version range does not overlap PyMongo's.
          """
          for s in self._server_descriptions.values():
              if not s.is_server_type_known:
                  continue

              # s.min/max_wire_version is the server's wire protocol.
              # MIN/MAX_SUPPORTED_WIRE_VERSION is what PyMongo supports.
              server_too_new = (
                  s.min_wire_version is not None
                  and s.min_wire_version > common.MAX_SUPPORTED_WIRE_VERSION)
              server_too_old = (
                  s.max_wire_version is not None
                  and s.max_wire_version < common.MIN_SUPPORTED_WIRE_VERSION)

              if server_too_new:
                  self._incompatible_err = (
                      "Server at %s:%d requires wire version %d, but this "
                      "version of PyMongo only supports up to %d."
                      % (s.address[0], s.address[1],
                         s.min_wire_version,
                         common.MAX_SUPPORTED_WIRE_VERSION))
                  # One incompatible server is enough; stop scanning.
                  break

              if server_too_old:
                  self._incompatible_err = (
                      "Server at %s:%d reports wire version %d, but this "
                      "version of PyMongo requires at least %d (MongoDB %s)."
                      % (s.address[0], s.address[1],
                         s.max_wire_version,
                         common.MIN_SUPPORTED_WIRE_VERSION,
                         common.MIN_SUPPORTED_SERVER_VERSION))
                  break

      def check_compatible(self):
          """Raise ConfigurationError if any server is incompatible.

          A server is incompatible if its wire protocol version range does not
          overlap with PyMongo's.
          """
          if self._incompatible_err:
              raise ConfigurationError(self._incompatible_err)

      def has_server(self, address):
          """Whether `address` is one of this topology's servers."""
          return address in self._server_descriptions

      def reset_server(self, address):
          """A copy of this description, with one server marked Unknown.

          Raises KeyError if `address` is not part of this topology.
          """
          unknown_sd = self._server_descriptions[address].to_unknown()
          return updated_topology_description(self, unknown_sd)

      def reset(self):
          """A copy of this description, with all servers marked Unknown."""
          if self._topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary:
              # With every server Unknown there can be no known primary.
              topology_type = TOPOLOGY_TYPE.ReplicaSetNoPrimary
          else:
              topology_type = self._topology_type

          # The default ServerDescription's type is Unknown.
          sds = {address: ServerDescription(address)
                 for address in self._server_descriptions}

          return TopologyDescription(
              topology_type,
              sds,
              self._replica_set_name,
              self._max_set_version,
              self._max_election_id,
              self._topology_settings)

      def server_descriptions(self):
          """Dict of (address,
          :class:`~pymongo.server_description.ServerDescription`).

          A shallow copy is returned, so callers may add or remove entries
          without affecting this description.
          """
          return self._server_descriptions.copy()

      @property
      def topology_type(self):
          """The type of this topology."""
          return self._topology_type

      @property
      def topology_type_name(self):
          """The topology type as a human readable string.

          .. versionadded:: 3.4
          """
          return TOPOLOGY_TYPE._fields[self._topology_type]

      @property
      def replica_set_name(self):
          """The replica set name."""
          return self._replica_set_name

      @property
      def max_set_version(self):
          """Greatest setVersion seen from a primary, or None."""
          return self._max_set_version

      @property
      def max_election_id(self):
          """Greatest electionId seen from a primary, or None."""
          return self._max_election_id

      @property
      def logical_session_timeout_minutes(self):
          """Minimum logical session timeout, or None."""
          return self._ls_timeout_minutes

      @property
      def known_servers(self):
          """List of Servers of types besides Unknown."""
          return [s for s in self._server_descriptions.values()
                  if s.is_server_type_known]

      @property
      def has_known_servers(self):
          """Whether there are any Servers of types besides Unknown."""
          return any(s.is_server_type_known
                     for s in self._server_descriptions.values())

      @property
      def readable_servers(self):
          """List of readable Servers."""
          return [s for s in self._server_descriptions.values()
                  if s.is_readable]

      @property
      def common_wire_version(self):
          """Minimum of all servers' max wire versions, or None."""
          servers = self.known_servers
          if servers:
              return min(s.max_wire_version for s in servers)
          return None

      @property
      def heartbeat_frequency(self):
          """The heartbeat_frequency of this topology's settings."""
          return self._topology_settings.heartbeat_frequency

      def apply_selector(self, selector, address, custom_selector=None):
          """Return the list of server descriptions chosen by `selector`.

          :Parameters:
            - `selector`: a callable that takes and returns a Selection. If it
              has a truthy ``min_wire_version`` attribute, that version is
              checked against :attr:`common_wire_version` first.
            - `address`: if truthy, the selector is ignored and only the
              description stored for this address (if any) is returned.
            - `custom_selector` (optional): a callable applied to the selected
              server descriptions after `selector`; its result replaces them.

          For Single and LoadBalanced topologies all known servers are
          returned and the selectors are ignored. For Sharded topologies
          `selector` is ignored but `custom_selector` still applies. In the
          remaining cases the result is narrowed to servers whose round trip
          time is within ``local_threshold_ms`` of the fastest one.

          Raises ConfigurationError if the topology's common wire version is
          lower than the selector's ``min_wire_version``.
          """
          def apply_local_threshold(selection):
              if not selection:
                  return []

              settings = self._topology_settings

              # Round trip time in seconds.
              fastest = min(
                  s.round_trip_time for s in selection.server_descriptions)
              threshold = settings.local_threshold_ms / 1000.0
              return [s for s in selection.server_descriptions
                      if (s.round_trip_time - fastest) <= threshold]

          if getattr(selector, 'min_wire_version', 0):
              common_wv = self.common_wire_version
              if common_wv and common_wv < selector.min_wire_version:
                  raise ConfigurationError(
                      "%s requires min wire version %d, but topology's min"
                      " wire version is %d" % (selector,
                                               selector.min_wire_version,
                                               common_wv))

          if self.topology_type in (TOPOLOGY_TYPE.Single,
                                    TOPOLOGY_TYPE.LoadBalanced):
              # Ignore selectors for standalone and load balancer mode.
              return self.known_servers

          if address:
              # Ignore selectors when explicit address is requested. Read the
              # dict directly; there is no need to copy it for one lookup.
              description = self._server_descriptions.get(address)
              return [description] if description else []

          if self.topology_type == TOPOLOGY_TYPE.Sharded:
              # Ignore read preference.
              selection = Selection.from_topology_description(self)
          else:
              selection = selector(Selection.from_topology_description(self))

          # Apply custom selector followed by localThresholdMS.
          if custom_selector is not None and selection:
              selection = selection.with_server_descriptions(
                  custom_selector(selection.server_descriptions))
          return apply_local_threshold(selection)

      def has_readable_server(self, read_preference=ReadPreference.PRIMARY):
          """Does this topology have any readable servers available matching
          the given read preference?

          :Parameters:
            - `read_preference`: an instance of a read preference from
              :mod:`~pymongo.read_preferences`. Defaults to
              :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`.

          .. note:: When connected directly to a single server the read
             preference is ignored and this method returns ``True`` whenever
             that server's type is known.

          .. versionadded:: 3.4
          """
          common.validate_read_preference("read_preference", read_preference)
          return any(self.apply_selector(read_preference, None))

      def has_writable_server(self):
          """Does this topology have a writable server available?

          .. note:: When connected directly to a single server this method
             returns ``True`` whenever that server's type is known.

          .. versionadded:: 3.4
          """
          return self.has_readable_server(ReadPreference.PRIMARY)

      def __repr__(self):
          # Sort the servers by address.
          servers = sorted(self._server_descriptions.values(),
                           key=lambda sd: sd.address)
          return "<%s id: %s, topology_type: %s, servers: %r>" % (
              self.__class__.__name__, self._topology_settings._topology_id,
              self.topology_type_name, servers)
  # Lookup used while the topology type is still Unknown: given the server
  # type reported by a hello response, which topology type do we switch to?
  _SERVER_TYPE_TO_TOPOLOGY_TYPE = {
      SERVER_TYPE.Mongos: TOPOLOGY_TYPE.Sharded,
      SERVER_TYPE.RSPrimary: TOPOLOGY_TYPE.ReplicaSetWithPrimary,
  }
  # Every non-primary replica set member implies a set without a known
  # primary.
  _SERVER_TYPE_TO_TOPOLOGY_TYPE.update(dict.fromkeys(
      (SERVER_TYPE.RSSecondary, SERVER_TYPE.RSArbiter, SERVER_TYPE.RSOther),
      TOPOLOGY_TYPE.ReplicaSetNoPrimary))
  # Note: SERVER_TYPE.LoadBalancer and Unknown are intentionally left out.
  def updated_topology_description(topology_description, server_description):
      """Build the TopologyDescription that results from one server check.

      :Parameters:
        - `topology_description`: the current TopologyDescription
        - `server_description`: the ServerDescription produced by the latest
          attempt (successful or not) to call hello on the server at
          server_description.address

      The given `topology_description` is not modified; a new
      TopologyDescription is returned.
      """
      settings = topology_description._topology_settings
      address = server_description.address
      server_type = server_description.server_type

      # Start from the current values; the branches below adjust them.
      topology_type = topology_description.topology_type
      set_name = topology_description.replica_set_name
      max_set_version = topology_description.max_set_version
      max_election_id = topology_description.max_election_id

      # server_descriptions() hands back a copy, so the original dict is
      # safe. Install the fresh description for this address.
      sds = topology_description.server_descriptions()
      sds[address] = server_description

      if topology_type == TOPOLOGY_TYPE.Single:
          # A node from the wrong replica set is downgraded to Unknown.
          if set_name is not None:
              reported_name = server_description.replica_set_name
              if set_name != reported_name:
                  mismatch = ConfigurationError(
                      "client is configured to connect to a replica set named "
                      "'%s' but this node belongs to a set named '%s'" % (
                          set_name, reported_name))
                  sds[address] = server_description.to_unknown(error=mismatch)
          # A Single topology stays Single.
          return TopologyDescription(
              TOPOLOGY_TYPE.Single,
              sds,
              set_name,
              max_set_version,
              max_election_id,
              settings)

      if topology_type == TOPOLOGY_TYPE.Unknown:
          if server_type in (SERVER_TYPE.Standalone, SERVER_TYPE.LoadBalancer):
              if len(settings.seeds) == 1:
                  topology_type = TOPOLOGY_TYPE.Single
              else:
                  # With several seeds a standalone cannot be part of the
                  # deployment; drop it.
                  del sds[address]
          elif server_type not in (SERVER_TYPE.Unknown, SERVER_TYPE.RSGhost):
              topology_type = _SERVER_TYPE_TO_TOPOLOGY_TYPE[server_type]

      non_members = (SERVER_TYPE.Standalone, SERVER_TYPE.Mongos)
      other_members = (SERVER_TYPE.RSSecondary,
                       SERVER_TYPE.RSArbiter,
                       SERVER_TYPE.RSOther)

      if topology_type == TOPOLOGY_TYPE.Sharded:
          if server_type not in (SERVER_TYPE.Mongos, SERVER_TYPE.Unknown):
              del sds[address]

      elif topology_type in (TOPOLOGY_TYPE.ReplicaSetNoPrimary,
                             TOPOLOGY_TYPE.ReplicaSetWithPrimary):
          had_primary = topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary

          if server_type in non_members:
              del sds[address]
              if had_primary:
                  topology_type = _check_has_primary(sds)

          elif server_type == SERVER_TYPE.RSPrimary:
              rs_state = _update_rs_from_primary(sds,
                                                 set_name,
                                                 server_description,
                                                 max_set_version,
                                                 max_election_id)
              topology_type, set_name, max_set_version, max_election_id = (
                  rs_state)

          elif server_type in other_members:
              if had_primary:
                  topology_type = _update_rs_with_primary_from_member(
                      sds, set_name, server_description)
              else:
                  topology_type, set_name = _update_rs_no_primary_from_member(
                      sds, set_name, server_description)

          elif had_primary:
              # Server type is Unknown or RSGhost: did we just lose the
              # primary?
              topology_type = _check_has_primary(sds)

      # Assemble the updated copy.
      return TopologyDescription(topology_type,
                                 sds,
                                 set_name,
                                 max_set_version,
                                 max_election_id,
                                 settings)
  def _updated_topology_description_srv_polling(topology_description, seedlist):
      """Return a TopologyDescription whose servers match a polled seed list.

      :Parameters:
        - `topology_description`: the current TopologyDescription
        - `seedlist`: iterable of the server addresses currently listed in
          the SRV record

      If the topology's addresses already equal the seeds,
      `topology_description` itself is returned. Otherwise a new
      TopologyDescription is returned in which addresses missing from the
      seed list are dropped and newly listed addresses get a default
      ServerDescription; every other field is carried over unchanged.
      `topology_description` is not modified.
      """
      # Materialize once: the seeds are iterated and tested for membership
      # several times, and a set makes each membership test O(1).
      seeds = list(seedlist)
      seed_set = set(seeds)

      # Create a copy of the server descriptions.
      sds = topology_description.server_descriptions()

      # If seeds haven't changed, don't do anything.
      if set(sds) == seed_set:
          return topology_description

      # Add SDs corresponding to servers recently added to the SRV record.
      for address in seeds:
          if address not in sds:
              sds[address] = ServerDescription(address)

      # Remove SDs corresponding to servers no longer part of the SRV record.
      for address in list(sds):
          if address not in seed_set:
              del sds[address]

      return TopologyDescription(
          topology_description.topology_type,
          sds,
          topology_description.replica_set_name,
          topology_description.max_set_version,
          topology_description.max_election_id,
          topology_description._topology_settings)
  def _update_rs_from_primary(
          sds,
          replica_set_name,
          server_description,
          max_set_version,
          max_election_id):
      """Fold a primary's hello response into the replica set state.

      `sds` is the dict of ServerDescriptions (modified in place),
      `replica_set_name` the set name known so far (or None),
      `server_description` the primary's description being processed, and
      `max_set_version` / `max_election_id` the TopologyDescription's current
      maxima, if any.

      Returns (new topology type, new replica_set_name, new max_set_version,
      new max_election_id).
      """
      primary_address = server_description.address
      reported_name = server_description.replica_set_name

      if replica_set_name is None:
          replica_set_name = reported_name
      elif replica_set_name != reported_name:
          # This primary belongs to a different set than the one we were
          # told to use; forget about it.
          del sds[primary_address]
          return (_check_has_primary(sds),
                  replica_set_name,
                  max_set_version,
                  max_election_id)

      known_election = (max_set_version, max_election_id)
      reported_election = server_description.election_tuple
      if None not in reported_election:
          if (None not in known_election and
                  known_election > reported_election):
              # A newer election has already been seen: this primary is
              # stale, so downgrade it to Unknown.
              sds[primary_address] = server_description.to_unknown()
              return (_check_has_primary(sds),
                      replica_set_name,
                      max_set_version,
                      max_election_id)

          max_election_id = server_description.election_id

      reported_version = server_description.set_version
      if reported_version is not None:
          if max_set_version is None or reported_version > max_set_version:
              max_set_version = reported_version

      # If some other server was recorded as primary, it no longer is. At
      # most one such entry can exist, so the first match is enough.
      previous_primary = next(
          (sd for sd in sds.values()
           if sd.server_type is SERVER_TYPE.RSPrimary
           and sd.address != primary_address),
          None)
      if previous_primary is not None:
          sds[previous_primary.address] = previous_primary.to_unknown()

      # The primary's host list is authoritative: add members we did not
      # know about ...
      reported_hosts = server_description.all_hosts
      for host in reported_hosts:
          if host not in sds:
              sds[host] = ServerDescription(host)

      # ... and drop the ones it does not mention.
      for stale_address in set(sds) - reported_hosts:
          del sds[stale_address]

      # The primary itself may have been dropped if its host list differs
      # from the seed list, so re-check before reporting the type.
      return (_check_has_primary(sds),
              replica_set_name,
              max_set_version,
              max_election_id)
  def _update_rs_with_primary_from_member(
          sds,
          replica_set_name,
          server_description):
      """Handle a non-primary's response while a primary is known.

      `sds` is the dict of ServerDescriptions (modified in place),
      `replica_set_name` the current replica set name, and
      `server_description` the description being processed.

      Returns new topology type.
      """
      assert replica_set_name is not None

      member_address = server_description.address
      wrong_set = replica_set_name != server_description.replica_set_name
      # Drop the member if it is in another set, or if it reports a
      # different address for itself than the one we used to reach it.
      if wrong_set or (server_description.me and
                       member_address != server_description.me):
          del sds[member_address]

      # Had this member been the primary?
      return _check_has_primary(sds)
  def _update_rs_no_primary_from_member(
          sds,
          replica_set_name,
          server_description):
      """Handle a non-primary's response while no primary is known.

      `sds` is the dict of ServerDescriptions (modified in place),
      `replica_set_name` the current replica set name (or None), and
      `server_description` the description being processed.

      Returns (new topology type, new replica_set_name).
      """
      no_primary = TOPOLOGY_TYPE.ReplicaSetNoPrimary
      member_address = server_description.address
      reported_name = server_description.replica_set_name

      if replica_set_name is None:
          replica_set_name = reported_name
      elif replica_set_name != reported_name:
          # Member of some other replica set: forget it.
          del sds[member_address]
          return no_primary, replica_set_name

      # Only a primary's host list is authoritative, so servers this member
      # fails to mention are kept; servers it does mention are added.
      for host in server_description.all_hosts:
          if host not in sds:
              sds[host] = ServerDescription(host)

      reported_me = server_description.me
      if reported_me and member_address != reported_me:
          del sds[member_address]

      return no_primary, replica_set_name
  def _check_has_primary(sds):
      """Decide whether a replica set topology still has a known primary.

      `sds` is a dict of ServerDescriptions.

      Returns TOPOLOGY_TYPE.ReplicaSetWithPrimary if any description has type
      RSPrimary, otherwise TOPOLOGY_TYPE.ReplicaSetNoPrimary.
      """
      primary_known = any(
          sd.server_type == SERVER_TYPE.RSPrimary for sd in sds.values())
      if primary_known:
          return TOPOLOGY_TYPE.ReplicaSetWithPrimary
      return TOPOLOGY_TYPE.ReplicaSetNoPrimary