123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440 |
- # Copyright 2014-present MongoDB, Inc.
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you
- # may not use this file except in compliance with the License. You
- # may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- # implied. See the License for the specific language governing
- # permissions and limitations under the License.
- """Class to monitor a MongoDB server on a background thread."""
- import atexit
- import threading
- import weakref
- from bson.py3compat import PY3
- from pymongo import common, periodic_executor
- from pymongo.errors import (NotPrimaryError,
- OperationFailure,
- _OperationCancelled)
- from pymongo.ismaster import IsMaster
- from pymongo.monotonic import time as _time
- from pymongo.periodic_executor import _shutdown_executors
- from pymongo.read_preferences import MovingAverage
- from pymongo.server_description import ServerDescription
- from pymongo.srv_resolver import _SrvResolver
def _sanitize(error):
    """PYTHON-2433: clear the traceback and chaining info on ``error``.

    On Python 3 (``PY3`` is true) this sets ``__traceback__``,
    ``__context__`` and ``__cause__`` to None, in that order. Nothing is
    done on Python 2.
    """
    if not PY3:
        return
    for attr_name in ("__traceback__", "__context__", "__cause__"):
        setattr(error, attr_name, None)
class MonitorBase(object):
    def __init__(self, topology, name, interval, min_interval):
        """Base class to do periodic work on a background thread.

        The background thread is signaled to stop when the Topology or
        this instance is freed.

        ``name``, ``interval`` and ``min_interval`` are forwarded to the
        PeriodicExecutor. The periodic work itself is the ``_run()`` method,
        which this base class calls but does not define.
        """
        def target():
            # Called by the executor. We only hold a weak reference to the
            # monitor here, so the executor (which we reference strongly)
            # never keeps the monitor alive.
            monitor = self_ref()
            if monitor is not None:
                monitor._run()
                return True
            # The monitor was freed: tell the executor to stop.
            return False

        def _on_topology_gc(dummy=None):
            # This prevents GC from waiting 10 seconds for 'hello' to complete
            # See test_cleanup_executors_on_client_del.
            monitor = self_ref()
            if monitor:
                monitor.gc_safe_close()

        self._executor = periodic_executor.PeriodicExecutor(
            interval=interval,
            min_interval=min_interval,
            target=target,
            name=name)

        # Avoid cycles. When self or topology is freed, stop executor soon.
        self_ref = weakref.ref(self, self._executor.close)
        self._topology = weakref.proxy(topology, _on_topology_gc)
        _register(self)

    def open(self):
        """Start monitoring, or restart after a fork.

        Multiple calls have no effect.
        """
        self._executor.open()

    def gc_safe_close(self):
        """Close the executor; the variant used from GC callbacks."""
        self._executor.close()

    def close(self):
        """Close and stop monitoring.

        open() restarts the monitor after closing.
        """
        self.gc_safe_close()

    def join(self, timeout=None):
        """Wait for the monitor to stop, passing ``timeout`` to the executor."""
        self._executor.join(timeout)

    def request_check(self):
        """If the monitor is sleeping, wake it soon."""
        self._executor.wake()
class Monitor(MonitorBase):
    def __init__(
            self,
            server_description,
            topology,
            pool,
            topology_settings):
        """Class to monitor a MongoDB server on a background thread.

        Pass an initial ServerDescription, a Topology, a Pool, and
        TopologySettings.

        The Topology is weakly referenced. The Pool must be exclusive to this
        Monitor.
        """
        super(Monitor, self).__init__(
            topology,
            "pymongo_server_monitor_thread",
            topology_settings.heartbeat_frequency,
            common.MIN_HEARTBEAT_INTERVAL)
        self._server_description = server_description
        self._pool = pool
        self._settings = topology_settings
        self._listeners = self._settings._pool_options.event_listeners
        # Heartbeat events are published only when listeners exist and are
        # enabled for server heartbeats.
        pub = self._listeners is not None
        self._publish = pub and self._listeners.enabled_for_server_heartbeat
        # Cancel context of the connection used by the most recent check.
        self._cancel_context = None
        # The RTT monitor uses its own pool, created by the topology.
        self._rtt_monitor = _RttMonitor(
            topology, topology_settings, topology._create_pool_for_monitor(
                server_description.address))
        self.heartbeater = None

    def cancel_check(self):
        """Cancel any concurrent hello check.

        Note: this is called from a weakref.proxy callback and MUST NOT take
        any locks.
        """
        context = self._cancel_context
        if context:
            # Note: we cannot close the socket because doing so may cause
            # concurrent reads/writes to hang until a timeout occurs
            # (depending on the platform).
            context.cancel()

    def _start_rtt_monitor(self):
        """Start an _RttMonitor that periodically runs ping."""
        # If this monitor is closed directly before (or during) this open()
        # call, the _RttMonitor will not be closed. Checking if this monitor
        # was closed directly after resolves the race.
        self._rtt_monitor.open()
        if self._executor._stopped:
            self._rtt_monitor.close()

    def gc_safe_close(self):
        """Stop both executors and cancel any in-progress check."""
        self._executor.close()
        self._rtt_monitor.gc_safe_close()
        self.cancel_check()

    def close(self):
        """Close this monitor and its RTT monitor, then reset the pool."""
        self.gc_safe_close()
        self._rtt_monitor.close()
        # Increment the generation and maybe close the socket. If the executor
        # thread has the socket checked out, it will be closed when checked in.
        self._reset_connection()

    def _reset_connection(self):
        """Clear our pooled connection."""
        self._pool.reset()

    def _run(self):
        """Run one server check and report the result to the Topology.

        Closes the monitor if the weakly referenced Topology is gone.
        """
        try:
            prev_sd = self._server_description
            try:
                self._server_description = self._check_server()
            except _OperationCancelled as exc:
                _sanitize(exc)
                # Already closed the connection, wait for the next check.
                self._server_description = ServerDescription(
                    self._server_description.address, error=exc)
                if prev_sd.is_server_type_known:
                    # Immediately retry since we've already waited 500ms to
                    # discover that we've been cancelled.
                    self._executor.skip_sleep()
                # A cancelled check is not reported to the Topology.
                return

            # Update the Topology and clear the server pool on error.
            self._topology.on_change(self._server_description,
                                     reset_pool=self._server_description.error)

            if (self._server_description.is_server_type_known and
                    self._server_description.topology_version):
                self._start_rtt_monitor()
                # Immediately check for the next streaming response.
                self._executor.skip_sleep()

            if self._server_description.error and prev_sd.is_server_type_known:
                # Immediately retry on network errors.
                self._executor.skip_sleep()
        except ReferenceError:
            # Topology was garbage-collected.
            self.close()

    def _check_server(self):
        """Call hello or read the next streaming response.

        Returns a ServerDescription. On failure the connection pool is reset
        and a ServerDescription carrying the error is returned, except that
        ReferenceError and _OperationCancelled are re-raised to the caller.
        """
        start = _time()
        try:
            try:
                return self._check_once()
            except (OperationFailure, NotPrimaryError) as exc:
                # Update max cluster time even when hello fails.
                # ``details`` is not guaranteed to be a mapping (it may be
                # None or a list when no server document is attached).
                # Calling .get() blindly would raise AttributeError, which
                # the handler below would then report in place of the real
                # failure.
                details = exc.details
                if hasattr(details, 'get'):
                    self._topology.receive_cluster_time(
                        details.get('$clusterTime'))
                raise
        except ReferenceError:
            # Topology was garbage-collected; let _run() handle it.
            raise
        except Exception as error:
            _sanitize(error)
            sd = self._server_description
            address = sd.address
            duration = _time() - start
            if self._publish:
                awaited = sd.is_server_type_known and sd.topology_version
                self._listeners.publish_server_heartbeat_failed(
                    address, duration, error, awaited)
            self._reset_connection()
            if isinstance(error, _OperationCancelled):
                raise
            self._rtt_monitor.reset()
            # Server type defaults to Unknown.
            return ServerDescription(address, error=error)

    def _check_once(self):
        """A single attempt to call hello.

        Returns a ServerDescription, or raises an exception.
        """
        address = self._server_description.address
        if self._publish:
            self._listeners.publish_server_heartbeat_started(address)

        # The previous check was cancelled: reset the pool before checking
        # out a connection for this attempt.
        if self._cancel_context and self._cancel_context.cancelled:
            self._reset_connection()
        with self._pool.get_socket({}) as sock_info:
            self._cancel_context = sock_info.cancel_context
            response, round_trip_time = self._check_with_socket(sock_info)
            # Only non-awaitable responses contribute an RTT sample here.
            if not response.awaitable:
                self._rtt_monitor.add_sample(round_trip_time)

            sd = ServerDescription(address, response,
                                   self._rtt_monitor.average())
            if self._publish:
                self._listeners.publish_server_heartbeat_succeeded(
                    address, round_trip_time, response, response.awaitable)
            return sd

    def _check_with_socket(self, conn):
        """Return (Hello, round_trip_time).

        Can raise ConnectionFailure or OperationFailure.
        """
        cluster_time = self._topology.max_cluster_time()
        start = _time()
        if conn.more_to_come:
            # Read the next streaming hello (MongoDB 4.4+).
            response = IsMaster(conn._next_reply(), awaitable=True)
        elif (conn.performed_handshake and
              self._server_description.topology_version):
            # Initiate streaming hello (MongoDB 4.4+).
            response = conn._hello(
                cluster_time,
                self._server_description.topology_version,
                self._settings.heartbeat_frequency,
                None)
        else:
            # New connection handshake or polling hello (MongoDB <4.4).
            response = conn._hello(cluster_time, None, None, None)
        return response, _time() - start
class SrvMonitor(MonitorBase):
    def __init__(self, topology, topology_settings):
        """Class to poll SRV records on a background thread.

        Pass a Topology and a TopologySettings.

        The Topology is weakly referenced.
        """
        settings = topology_settings
        super(SrvMonitor, self).__init__(
            topology,
            "pymongo_srv_polling_thread",
            common.MIN_SRV_RESCAN_INTERVAL,
            settings.heartbeat_frequency)
        self._settings = settings
        self._seedlist = settings._seeds
        self._fqdn = settings.fqdn

    def _run(self):
        """Poll once and, on success, hand the new seedlist to the Topology."""
        polled = self._get_seedlist()
        if not polled:
            # Polling failed: keep the current seedlist.
            return
        self._seedlist = polled
        try:
            self._topology.on_srv_update(self._seedlist)
        except ReferenceError:
            # Topology was garbage-collected.
            self.close()

    def _get_seedlist(self):
        """Poll SRV records for a seedlist.

        Returns the resolved seedlist, or None if resolution failed or
        produced no hosts.
        """
        try:
            hosts, min_ttl = _SrvResolver(self._fqdn).get_hosts_and_min_ttl()
            if not len(hosts):
                # As per the spec: this should be treated as a failure.
                raise Exception
        except Exception:
            # As per the spec, upon encountering an error:
            # - An error must not be raised
            # - SRV records must be rescanned every heartbeatFrequencyMS
            # - Topology must be left unchanged
            self.request_check()
            return None

        # Success: rescan after the TTL, but no sooner than the minimum.
        rescan_after = max(min_ttl, common.MIN_SRV_RESCAN_INTERVAL)
        self._executor.update_interval(rescan_after)
        return hosts
class _RttMonitor(MonitorBase):
    def __init__(self, topology, topology_settings, pool):
        """Maintain round trip times for a server.

        The Topology is weakly referenced.
        """
        super(_RttMonitor, self).__init__(
            topology,
            "pymongo_server_rtt_thread",
            topology_settings.heartbeat_frequency,
            common.MIN_HEARTBEAT_INTERVAL)

        self._pool = pool
        # Every access to the moving average goes through this lock.
        self._lock = threading.Lock()
        self._moving_average = MovingAverage()

    def close(self):
        """Stop the executor and reset the pool."""
        self.gc_safe_close()
        # Increment the generation and maybe close the socket. If the executor
        # thread has the socket checked out, it will be closed when checked in.
        self._pool.reset()

    def add_sample(self, sample):
        """Add a RTT sample."""
        with self._lock:
            self._moving_average.add_sample(sample)

    def average(self):
        """Get the calculated average, or None if no samples yet."""
        with self._lock:
            return self._moving_average.get()

    def reset(self):
        """Reset the average RTT."""
        with self._lock:
            return self._moving_average.reset()

    def _run(self):
        """Take one RTT measurement and record it."""
        try:
            # NOTE: This thread is only run when using the streaming
            # heartbeat protocol (MongoDB 4.4+).
            # XXX: Skip check if the server is unknown?
            self.add_sample(self._ping())
        except ReferenceError:
            # Topology was garbage-collected.
            self.close()
        except Exception:
            # Any other failure: discard the pooled connection.
            self._pool.reset()

    def _ping(self):
        """Run a "hello" command and return the RTT."""
        with self._pool.get_socket({}) as sock_info:
            # Don't ping if the monitor was stopped while we were waiting
            # for a connection.
            if self._executor._stopped:
                raise Exception('_RttMonitor closed')
            began = _time()
            sock_info.hello()
            return _time() - began
# Close monitors to cancel any in progress streaming checks before joining
# executor threads. For an explanation of how this works see the comment
# about _EXECUTORS in periodic_executor.py.
# The set holds weakref.ref objects, one per registered monitor, so that
# registration never keeps a monitor alive.
_MONITORS = set()
def _register(monitor):
    """Track ``monitor`` in _MONITORS through a weak reference.

    The weak reference's callback is _unregister, so the entry is removed
    once the monitor has been freed.
    """
    _MONITORS.add(weakref.ref(monitor, _unregister))
def _unregister(monitor_ref):
    """Weakref callback: drop a dead monitor's reference from _MONITORS."""
    _MONITORS.remove(monitor_ref)
def _shutdown_monitors():
    """Close every monitor that is still alive (GC-safe close)."""
    registry = _MONITORS
    if registry is None:
        return

    # Iterate over a copy of the set. Closing monitors removes them.
    for monitor_ref in list(registry):
        live = monitor_ref()
        if live:
            live.gc_safe_close()

    # Drop the strong reference taken inside the loop.
    live = None
def _shutdown_resources():
    """Shut down monitors first, then executors."""
    # _shutdown_monitors/_shutdown_executors may already be GC'd at shutdown.
    for hook in (_shutdown_monitors, _shutdown_executors):
        if hook:
            hook()
- atexit.register(_shutdown_resources)
|