123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608 |
- # Copyright 2014-present MongoDB, Inc.
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you
- # may not use this file except in compliance with the License. You
- # may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- # implied. See the License for the specific language governing
- # permissions and limitations under the License.
- """Represent a deployment of MongoDB servers."""
- from collections import namedtuple
- from pymongo import common
- from pymongo.errors import ConfigurationError
- from pymongo.read_preferences import ReadPreference
- from pymongo.server_description import ServerDescription
- from pymongo.server_selectors import Selection
- from pymongo.server_type import SERVER_TYPE
- # Enumeration for various kinds of MongoDB cluster topologies.
- TOPOLOGY_TYPE = namedtuple('TopologyType', [
- 'Single', 'ReplicaSetNoPrimary', 'ReplicaSetWithPrimary', 'Sharded',
- 'Unknown', 'LoadBalanced'])(*range(6))
- # Topologies compatible with SRV record polling.
- SRV_POLLING_TOPOLOGIES = (TOPOLOGY_TYPE.Unknown, TOPOLOGY_TYPE.Sharded)
- class TopologyDescription(object):
- def __init__(self,
- topology_type,
- server_descriptions,
- replica_set_name,
- max_set_version,
- max_election_id,
- topology_settings):
- """Representation of a deployment of MongoDB servers.
- :Parameters:
- - `topology_type`: initial type
- - `server_descriptions`: dict of (address, ServerDescription) for
- all seeds
- - `replica_set_name`: replica set name or None
- - `max_set_version`: greatest setVersion seen from a primary, or None
- - `max_election_id`: greatest electionId seen from a primary, or None
- - `topology_settings`: a TopologySettings
- """
- self._topology_type = topology_type
- self._replica_set_name = replica_set_name
- self._server_descriptions = server_descriptions
- self._max_set_version = max_set_version
- self._max_election_id = max_election_id
- # The heartbeat_frequency is used in staleness estimates.
- self._topology_settings = topology_settings
- # Is PyMongo compatible with all servers' wire protocols?
- self._incompatible_err = None
- if self._topology_type != TOPOLOGY_TYPE.LoadBalanced:
- self._init_incompatible_err()
- # Server Discovery And Monitoring Spec: Whenever a client updates the
- # TopologyDescription from a hello response, it MUST set
- # TopologyDescription.logicalSessionTimeoutMinutes to the smallest
- # logicalSessionTimeoutMinutes value among ServerDescriptions of all
- # data-bearing server types. If any have a null
- # logicalSessionTimeoutMinutes, then
- # TopologyDescription.logicalSessionTimeoutMinutes MUST be set to null.
- readable_servers = self.readable_servers
- if not readable_servers:
- self._ls_timeout_minutes = None
- elif any(s.logical_session_timeout_minutes is None
- for s in readable_servers):
- self._ls_timeout_minutes = None
- else:
- self._ls_timeout_minutes = min(s.logical_session_timeout_minutes
- for s in readable_servers)
- def _init_incompatible_err(self):
- """Internal compatibility check for non-load balanced topologies."""
- for s in self._server_descriptions.values():
- if not s.is_server_type_known:
- continue
- # s.min/max_wire_version is the server's wire protocol.
- # MIN/MAX_SUPPORTED_WIRE_VERSION is what PyMongo supports.
- server_too_new = (
- # Server too new.
- s.min_wire_version is not None
- and s.min_wire_version > common.MAX_SUPPORTED_WIRE_VERSION)
- server_too_old = (
- # Server too old.
- s.max_wire_version is not None
- and s.max_wire_version < common.MIN_SUPPORTED_WIRE_VERSION)
- if server_too_new:
- self._incompatible_err = (
- "Server at %s:%d requires wire version %d, but this "
- "version of PyMongo only supports up to %d."
- % (s.address[0], s.address[1],
- s.min_wire_version, common.MAX_SUPPORTED_WIRE_VERSION))
- elif server_too_old:
- self._incompatible_err = (
- "Server at %s:%d reports wire version %d, but this "
- "version of PyMongo requires at least %d (MongoDB %s)."
- % (s.address[0], s.address[1],
- s.max_wire_version,
- common.MIN_SUPPORTED_WIRE_VERSION,
- common.MIN_SUPPORTED_SERVER_VERSION))
- break
- def check_compatible(self):
- """Raise ConfigurationError if any server is incompatible.
- A server is incompatible if its wire protocol version range does not
- overlap with PyMongo's.
- """
- if self._incompatible_err:
- raise ConfigurationError(self._incompatible_err)
- def has_server(self, address):
- return address in self._server_descriptions
- def reset_server(self, address):
- """A copy of this description, with one server marked Unknown."""
- unknown_sd = self._server_descriptions[address].to_unknown()
- return updated_topology_description(self, unknown_sd)
- def reset(self):
- """A copy of this description, with all servers marked Unknown."""
- if self._topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary:
- topology_type = TOPOLOGY_TYPE.ReplicaSetNoPrimary
- else:
- topology_type = self._topology_type
- # The default ServerDescription's type is Unknown.
- sds = dict((address, ServerDescription(address))
- for address in self._server_descriptions)
- return TopologyDescription(
- topology_type,
- sds,
- self._replica_set_name,
- self._max_set_version,
- self._max_election_id,
- self._topology_settings)
- def server_descriptions(self):
- """Dict of (address,
- :class:`~pymongo.server_description.ServerDescription`)."""
- return self._server_descriptions.copy()
- @property
- def topology_type(self):
- """The type of this topology."""
- return self._topology_type
- @property
- def topology_type_name(self):
- """The topology type as a human readable string.
- .. versionadded:: 3.4
- """
- return TOPOLOGY_TYPE._fields[self._topology_type]
- @property
- def replica_set_name(self):
- """The replica set name."""
- return self._replica_set_name
- @property
- def max_set_version(self):
- """Greatest setVersion seen from a primary, or None."""
- return self._max_set_version
- @property
- def max_election_id(self):
- """Greatest electionId seen from a primary, or None."""
- return self._max_election_id
- @property
- def logical_session_timeout_minutes(self):
- """Minimum logical session timeout, or None."""
- return self._ls_timeout_minutes
- @property
- def known_servers(self):
- """List of Servers of types besides Unknown."""
- return [s for s in self._server_descriptions.values()
- if s.is_server_type_known]
- @property
- def has_known_servers(self):
- """Whether there are any Servers of types besides Unknown."""
- return any(s for s in self._server_descriptions.values()
- if s.is_server_type_known)
- @property
- def readable_servers(self):
- """List of readable Servers."""
- return [s for s in self._server_descriptions.values() if s.is_readable]
- @property
- def common_wire_version(self):
- """Minimum of all servers' max wire versions, or None."""
- servers = self.known_servers
- if servers:
- return min(s.max_wire_version for s in self.known_servers)
- return None
- @property
- def heartbeat_frequency(self):
- return self._topology_settings.heartbeat_frequency
- def apply_selector(self, selector, address, custom_selector=None):
- def apply_local_threshold(selection):
- if not selection:
- return []
- settings = self._topology_settings
- # Round trip time in seconds.
- fastest = min(
- s.round_trip_time for s in selection.server_descriptions)
- threshold = settings.local_threshold_ms / 1000.0
- return [s for s in selection.server_descriptions
- if (s.round_trip_time - fastest) <= threshold]
- if getattr(selector, 'min_wire_version', 0):
- common_wv = self.common_wire_version
- if common_wv and common_wv < selector.min_wire_version:
- raise ConfigurationError(
- "%s requires min wire version %d, but topology's min"
- " wire version is %d" % (selector,
- selector.min_wire_version,
- common_wv))
- if self.topology_type in (TOPOLOGY_TYPE.Single,
- TOPOLOGY_TYPE.LoadBalanced):
- # Ignore selectors for standalone and load balancer mode.
- return self.known_servers
- elif address:
- # Ignore selectors when explicit address is requested.
- description = self.server_descriptions().get(address)
- return [description] if description else []
- elif self.topology_type == TOPOLOGY_TYPE.Sharded:
- # Ignore read preference.
- selection = Selection.from_topology_description(self)
- else:
- selection = selector(Selection.from_topology_description(self))
- # Apply custom selector followed by localThresholdMS.
- if custom_selector is not None and selection:
- selection = selection.with_server_descriptions(
- custom_selector(selection.server_descriptions))
- return apply_local_threshold(selection)
- def has_readable_server(self, read_preference=ReadPreference.PRIMARY):
- """Does this topology have any readable servers available matching the
- given read preference?
- :Parameters:
- - `read_preference`: an instance of a read preference from
- :mod:`~pymongo.read_preferences`. Defaults to
- :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`.
- .. note:: When connected directly to a single server this method
- always returns ``True``.
- .. versionadded:: 3.4
- """
- common.validate_read_preference("read_preference", read_preference)
- return any(self.apply_selector(read_preference, None))
- def has_writable_server(self):
- """Does this topology have a writable server available?
- .. note:: When connected directly to a single server this method
- always returns ``True``.
- .. versionadded:: 3.4
- """
- return self.has_readable_server(ReadPreference.PRIMARY)
- def __repr__(self):
- # Sort the servers by address.
- servers = sorted(self._server_descriptions.values(),
- key=lambda sd: sd.address)
- return "<%s id: %s, topology_type: %s, servers: %r>" % (
- self.__class__.__name__, self._topology_settings._topology_id,
- self.topology_type_name, servers)
- # If topology type is Unknown and we receive a hello response, what should
- # the new topology type be?
- _SERVER_TYPE_TO_TOPOLOGY_TYPE = {
- SERVER_TYPE.Mongos: TOPOLOGY_TYPE.Sharded,
- SERVER_TYPE.RSPrimary: TOPOLOGY_TYPE.ReplicaSetWithPrimary,
- SERVER_TYPE.RSSecondary: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
- SERVER_TYPE.RSArbiter: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
- SERVER_TYPE.RSOther: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
- # Note: SERVER_TYPE.LoadBalancer and Unknown are intentionally left out.
- }
- def updated_topology_description(topology_description, server_description):
- """Return an updated copy of a TopologyDescription.
- :Parameters:
- - `topology_description`: the current TopologyDescription
- - `server_description`: a new ServerDescription that resulted from
- a hello call
- Called after attempting (successfully or not) to call hello on the
- server at server_description.address. Does not modify topology_description.
- """
- address = server_description.address
- # These values will be updated, if necessary, to form the new
- # TopologyDescription.
- topology_type = topology_description.topology_type
- set_name = topology_description.replica_set_name
- max_set_version = topology_description.max_set_version
- max_election_id = topology_description.max_election_id
- server_type = server_description.server_type
- # Don't mutate the original dict of server descriptions; copy it.
- sds = topology_description.server_descriptions()
- # Replace this server's description with the new one.
- sds[address] = server_description
- if topology_type == TOPOLOGY_TYPE.Single:
- # Set server type to Unknown if replica set name does not match.
- if (set_name is not None and
- set_name != server_description.replica_set_name):
- error = ConfigurationError(
- "client is configured to connect to a replica set named "
- "'%s' but this node belongs to a set named '%s'" % (
- set_name, server_description.replica_set_name))
- sds[address] = server_description.to_unknown(error=error)
- # Single type never changes.
- return TopologyDescription(
- TOPOLOGY_TYPE.Single,
- sds,
- set_name,
- max_set_version,
- max_election_id,
- topology_description._topology_settings)
- if topology_type == TOPOLOGY_TYPE.Unknown:
- if server_type in (SERVER_TYPE.Standalone, SERVER_TYPE.LoadBalancer):
- if len(topology_description._topology_settings.seeds) == 1:
- topology_type = TOPOLOGY_TYPE.Single
- else:
- # Remove standalone from Topology when given multiple seeds.
- sds.pop(address)
- elif server_type not in (SERVER_TYPE.Unknown, SERVER_TYPE.RSGhost):
- topology_type = _SERVER_TYPE_TO_TOPOLOGY_TYPE[server_type]
- if topology_type == TOPOLOGY_TYPE.Sharded:
- if server_type not in (SERVER_TYPE.Mongos, SERVER_TYPE.Unknown):
- sds.pop(address)
- elif topology_type == TOPOLOGY_TYPE.ReplicaSetNoPrimary:
- if server_type in (SERVER_TYPE.Standalone, SERVER_TYPE.Mongos):
- sds.pop(address)
- elif server_type == SERVER_TYPE.RSPrimary:
- (topology_type,
- set_name,
- max_set_version,
- max_election_id) = _update_rs_from_primary(sds,
- set_name,
- server_description,
- max_set_version,
- max_election_id)
- elif server_type in (
- SERVER_TYPE.RSSecondary,
- SERVER_TYPE.RSArbiter,
- SERVER_TYPE.RSOther):
- topology_type, set_name = _update_rs_no_primary_from_member(
- sds, set_name, server_description)
- elif topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary:
- if server_type in (SERVER_TYPE.Standalone, SERVER_TYPE.Mongos):
- sds.pop(address)
- topology_type = _check_has_primary(sds)
- elif server_type == SERVER_TYPE.RSPrimary:
- (topology_type,
- set_name,
- max_set_version,
- max_election_id) = _update_rs_from_primary(sds,
- set_name,
- server_description,
- max_set_version,
- max_election_id)
- elif server_type in (
- SERVER_TYPE.RSSecondary,
- SERVER_TYPE.RSArbiter,
- SERVER_TYPE.RSOther):
- topology_type = _update_rs_with_primary_from_member(
- sds, set_name, server_description)
- else:
- # Server type is Unknown or RSGhost: did we just lose the primary?
- topology_type = _check_has_primary(sds)
- # Return updated copy.
- return TopologyDescription(topology_type,
- sds,
- set_name,
- max_set_version,
- max_election_id,
- topology_description._topology_settings)
- def _updated_topology_description_srv_polling(topology_description, seedlist):
- """Return an updated copy of a TopologyDescription.
- :Parameters:
- - `topology_description`: the current TopologyDescription
- - `seedlist`: a list of new seeds new ServerDescription that resulted from
- a hello call
- """
- # Create a copy of the server descriptions.
- sds = topology_description.server_descriptions()
- # If seeds haven't changed, don't do anything.
- if set(sds.keys()) == set(seedlist):
- return topology_description
- # Add SDs corresponding to servers recently added to the SRV record.
- for address in seedlist:
- if address not in sds:
- sds[address] = ServerDescription(address)
- # Remove SDs corresponding to servers no longer part of the SRV record.
- for address in list(sds.keys()):
- if address not in seedlist:
- sds.pop(address)
- return TopologyDescription(
- topology_description.topology_type,
- sds,
- topology_description.replica_set_name,
- topology_description.max_set_version,
- topology_description.max_election_id,
- topology_description._topology_settings)
- def _update_rs_from_primary(
- sds,
- replica_set_name,
- server_description,
- max_set_version,
- max_election_id):
- """Update topology description from a primary's hello response.
- Pass in a dict of ServerDescriptions, current replica set name, the
- ServerDescription we are processing, and the TopologyDescription's
- max_set_version and max_election_id if any.
- Returns (new topology type, new replica_set_name, new max_set_version,
- new max_election_id).
- """
- if replica_set_name is None:
- replica_set_name = server_description.replica_set_name
- elif replica_set_name != server_description.replica_set_name:
- # We found a primary but it doesn't have the replica_set_name
- # provided by the user.
- sds.pop(server_description.address)
- return (_check_has_primary(sds),
- replica_set_name,
- max_set_version,
- max_election_id)
- max_election_tuple = max_set_version, max_election_id
- if None not in server_description.election_tuple:
- if (None not in max_election_tuple and
- max_election_tuple > server_description.election_tuple):
- # Stale primary, set to type Unknown.
- sds[server_description.address] = server_description.to_unknown()
- return (_check_has_primary(sds),
- replica_set_name,
- max_set_version,
- max_election_id)
- max_election_id = server_description.election_id
- if (server_description.set_version is not None and
- (max_set_version is None or
- server_description.set_version > max_set_version)):
- max_set_version = server_description.set_version
- # We've heard from the primary. Is it the same primary as before?
- for server in sds.values():
- if (server.server_type is SERVER_TYPE.RSPrimary
- and server.address != server_description.address):
- # Reset old primary's type to Unknown.
- sds[server.address] = server.to_unknown()
- # There can be only one prior primary.
- break
- # Discover new hosts from this primary's response.
- for new_address in server_description.all_hosts:
- if new_address not in sds:
- sds[new_address] = ServerDescription(new_address)
- # Remove hosts not in the response.
- for addr in set(sds) - server_description.all_hosts:
- sds.pop(addr)
- # If the host list differs from the seed list, we may not have a primary
- # after all.
- return (_check_has_primary(sds),
- replica_set_name,
- max_set_version,
- max_election_id)
- def _update_rs_with_primary_from_member(
- sds,
- replica_set_name,
- server_description):
- """RS with known primary. Process a response from a non-primary.
- Pass in a dict of ServerDescriptions, current replica set name, and the
- ServerDescription we are processing.
- Returns new topology type.
- """
- assert replica_set_name is not None
- if replica_set_name != server_description.replica_set_name:
- sds.pop(server_description.address)
- elif (server_description.me and
- server_description.address != server_description.me):
- sds.pop(server_description.address)
- # Had this member been the primary?
- return _check_has_primary(sds)
- def _update_rs_no_primary_from_member(
- sds,
- replica_set_name,
- server_description):
- """RS without known primary. Update from a non-primary's response.
- Pass in a dict of ServerDescriptions, current replica set name, and the
- ServerDescription we are processing.
- Returns (new topology type, new replica_set_name).
- """
- topology_type = TOPOLOGY_TYPE.ReplicaSetNoPrimary
- if replica_set_name is None:
- replica_set_name = server_description.replica_set_name
- elif replica_set_name != server_description.replica_set_name:
- sds.pop(server_description.address)
- return topology_type, replica_set_name
- # This isn't the primary's response, so don't remove any servers
- # it doesn't report. Only add new servers.
- for address in server_description.all_hosts:
- if address not in sds:
- sds[address] = ServerDescription(address)
- if (server_description.me and
- server_description.address != server_description.me):
- sds.pop(server_description.address)
- return topology_type, replica_set_name
- def _check_has_primary(sds):
- """Current topology type is ReplicaSetWithPrimary. Is primary still known?
- Pass in a dict of ServerDescriptions.
- Returns new topology type.
- """
- for s in sds.values():
- if s.server_type == SERVER_TYPE.RSPrimary:
- return TOPOLOGY_TYPE.ReplicaSetWithPrimary
- else:
- return TOPOLOGY_TYPE.ReplicaSetNoPrimary
|