monitor.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. # Copyright 2014-present MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you
  4. # may not use this file except in compliance with the License. You
  5. # may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. # implied. See the License for the specific language governing
  13. # permissions and limitations under the License.
  14. """Class to monitor a MongoDB server on a background thread."""
  15. import atexit
  16. import threading
  17. import weakref
  18. from bson.py3compat import PY3
  19. from pymongo import common, periodic_executor
  20. from pymongo.errors import (NotPrimaryError,
  21. OperationFailure,
  22. _OperationCancelled)
  23. from pymongo.ismaster import IsMaster
  24. from pymongo.monotonic import time as _time
  25. from pymongo.periodic_executor import _shutdown_executors
  26. from pymongo.read_preferences import MovingAverage
  27. from pymongo.server_description import ServerDescription
  28. from pymongo.srv_resolver import _SrvResolver
  29. def _sanitize(error):
  30. """PYTHON-2433 Clear error traceback info."""
  31. if PY3:
  32. error.__traceback__ = None
  33. error.__context__ = None
  34. error.__cause__ = None
  35. class MonitorBase(object):
  36. def __init__(self, topology, name, interval, min_interval):
  37. """Base class to do periodic work on a background thread.
  38. The the background thread is signaled to stop when the Topology or
  39. this instance is freed.
  40. """
  41. # We strongly reference the executor and it weakly references us via
  42. # this closure. When the monitor is freed, stop the executor soon.
  43. def target():
  44. monitor = self_ref()
  45. if monitor is None:
  46. return False # Stop the executor.
  47. monitor._run()
  48. return True
  49. executor = periodic_executor.PeriodicExecutor(
  50. interval=interval,
  51. min_interval=min_interval,
  52. target=target,
  53. name=name)
  54. self._executor = executor
  55. def _on_topology_gc(dummy=None):
  56. # This prevents GC from waiting 10 seconds for 'hello' to complete
  57. # See test_cleanup_executors_on_client_del.
  58. monitor = self_ref()
  59. if monitor:
  60. monitor.gc_safe_close()
  61. # Avoid cycles. When self or topology is freed, stop executor soon.
  62. self_ref = weakref.ref(self, executor.close)
  63. self._topology = weakref.proxy(topology, _on_topology_gc)
  64. _register(self)
  65. def open(self):
  66. """Start monitoring, or restart after a fork.
  67. Multiple calls have no effect.
  68. """
  69. self._executor.open()
  70. def gc_safe_close(self):
  71. """GC safe close."""
  72. self._executor.close()
  73. def close(self):
  74. """Close and stop monitoring.
  75. open() restarts the monitor after closing.
  76. """
  77. self.gc_safe_close()
  78. def join(self, timeout=None):
  79. """Wait for the monitor to stop."""
  80. self._executor.join(timeout)
  81. def request_check(self):
  82. """If the monitor is sleeping, wake it soon."""
  83. self._executor.wake()
  84. class Monitor(MonitorBase):
  85. def __init__(
  86. self,
  87. server_description,
  88. topology,
  89. pool,
  90. topology_settings):
  91. """Class to monitor a MongoDB server on a background thread.
  92. Pass an initial ServerDescription, a Topology, a Pool, and
  93. TopologySettings.
  94. The Topology is weakly referenced. The Pool must be exclusive to this
  95. Monitor.
  96. """
  97. super(Monitor, self).__init__(
  98. topology,
  99. "pymongo_server_monitor_thread",
  100. topology_settings.heartbeat_frequency,
  101. common.MIN_HEARTBEAT_INTERVAL)
  102. self._server_description = server_description
  103. self._pool = pool
  104. self._settings = topology_settings
  105. self._listeners = self._settings._pool_options.event_listeners
  106. pub = self._listeners is not None
  107. self._publish = pub and self._listeners.enabled_for_server_heartbeat
  108. self._cancel_context = None
  109. self._rtt_monitor = _RttMonitor(
  110. topology, topology_settings, topology._create_pool_for_monitor(
  111. server_description.address))
  112. self.heartbeater = None
  113. def cancel_check(self):
  114. """Cancel any concurrent hello check.
  115. Note: this is called from a weakref.proxy callback and MUST NOT take
  116. any locks.
  117. """
  118. context = self._cancel_context
  119. if context:
  120. # Note: we cannot close the socket because doing so may cause
  121. # concurrent reads/writes to hang until a timeout occurs
  122. # (depending on the platform).
  123. context.cancel()
  124. def _start_rtt_monitor(self):
  125. """Start an _RttMonitor that periodically runs ping."""
  126. # If this monitor is closed directly before (or during) this open()
  127. # call, the _RttMonitor will not be closed. Checking if this monitor
  128. # was closed directly after resolves the race.
  129. self._rtt_monitor.open()
  130. if self._executor._stopped:
  131. self._rtt_monitor.close()
  132. def gc_safe_close(self):
  133. self._executor.close()
  134. self._rtt_monitor.gc_safe_close()
  135. self.cancel_check()
  136. def close(self):
  137. self.gc_safe_close()
  138. self._rtt_monitor.close()
  139. # Increment the generation and maybe close the socket. If the executor
  140. # thread has the socket checked out, it will be closed when checked in.
  141. self._reset_connection()
  142. def _reset_connection(self):
  143. # Clear our pooled connection.
  144. self._pool.reset()
  145. def _run(self):
  146. try:
  147. prev_sd = self._server_description
  148. try:
  149. self._server_description = self._check_server()
  150. except _OperationCancelled as exc:
  151. _sanitize(exc)
  152. # Already closed the connection, wait for the next check.
  153. self._server_description = ServerDescription(
  154. self._server_description.address, error=exc)
  155. if prev_sd.is_server_type_known:
  156. # Immediately retry since we've already waited 500ms to
  157. # discover that we've been cancelled.
  158. self._executor.skip_sleep()
  159. return
  160. # Update the Topology and clear the server pool on error.
  161. self._topology.on_change(self._server_description,
  162. reset_pool=self._server_description.error)
  163. if (self._server_description.is_server_type_known and
  164. self._server_description.topology_version):
  165. self._start_rtt_monitor()
  166. # Immediately check for the next streaming response.
  167. self._executor.skip_sleep()
  168. if self._server_description.error and prev_sd.is_server_type_known:
  169. # Immediately retry on network errors.
  170. self._executor.skip_sleep()
  171. except ReferenceError:
  172. # Topology was garbage-collected.
  173. self.close()
  174. def _check_server(self):
  175. """Call hello or read the next streaming response.
  176. Returns a ServerDescription.
  177. """
  178. start = _time()
  179. try:
  180. try:
  181. return self._check_once()
  182. except (OperationFailure, NotPrimaryError) as exc:
  183. # Update max cluster time even when hello fails.
  184. self._topology.receive_cluster_time(
  185. exc.details.get('$clusterTime'))
  186. raise
  187. except ReferenceError:
  188. raise
  189. except Exception as error:
  190. _sanitize(error)
  191. sd = self._server_description
  192. address = sd.address
  193. duration = _time() - start
  194. if self._publish:
  195. awaited = sd.is_server_type_known and sd.topology_version
  196. self._listeners.publish_server_heartbeat_failed(
  197. address, duration, error, awaited)
  198. self._reset_connection()
  199. if isinstance(error, _OperationCancelled):
  200. raise
  201. self._rtt_monitor.reset()
  202. # Server type defaults to Unknown.
  203. return ServerDescription(address, error=error)
  204. def _check_once(self):
  205. """A single attempt to call hello.
  206. Returns a ServerDescription, or raises an exception.
  207. """
  208. address = self._server_description.address
  209. if self._publish:
  210. self._listeners.publish_server_heartbeat_started(address)
  211. if self._cancel_context and self._cancel_context.cancelled:
  212. self._reset_connection()
  213. with self._pool.get_socket({}) as sock_info:
  214. self._cancel_context = sock_info.cancel_context
  215. response, round_trip_time = self._check_with_socket(sock_info)
  216. if not response.awaitable:
  217. self._rtt_monitor.add_sample(round_trip_time)
  218. sd = ServerDescription(address, response,
  219. self._rtt_monitor.average())
  220. if self._publish:
  221. self._listeners.publish_server_heartbeat_succeeded(
  222. address, round_trip_time, response, response.awaitable)
  223. return sd
  224. def _check_with_socket(self, conn):
  225. """Return (Hello, round_trip_time).
  226. Can raise ConnectionFailure or OperationFailure.
  227. """
  228. cluster_time = self._topology.max_cluster_time()
  229. start = _time()
  230. if conn.more_to_come:
  231. # Read the next streaming hello (MongoDB 4.4+).
  232. response = IsMaster(conn._next_reply(), awaitable=True)
  233. elif (conn.performed_handshake and
  234. self._server_description.topology_version):
  235. # Initiate streaming hello (MongoDB 4.4+).
  236. response = conn._hello(
  237. cluster_time,
  238. self._server_description.topology_version,
  239. self._settings.heartbeat_frequency,
  240. None)
  241. else:
  242. # New connection handshake or polling hello (MongoDB <4.4).
  243. response = conn._hello(cluster_time, None, None, None)
  244. return response, _time() - start
  245. class SrvMonitor(MonitorBase):
  246. def __init__(self, topology, topology_settings):
  247. """Class to poll SRV records on a background thread.
  248. Pass a Topology and a TopologySettings.
  249. The Topology is weakly referenced.
  250. """
  251. super(SrvMonitor, self).__init__(
  252. topology,
  253. "pymongo_srv_polling_thread",
  254. common.MIN_SRV_RESCAN_INTERVAL,
  255. topology_settings.heartbeat_frequency)
  256. self._settings = topology_settings
  257. self._seedlist = self._settings._seeds
  258. self._fqdn = self._settings.fqdn
  259. def _run(self):
  260. seedlist = self._get_seedlist()
  261. if seedlist:
  262. self._seedlist = seedlist
  263. try:
  264. self._topology.on_srv_update(self._seedlist)
  265. except ReferenceError:
  266. # Topology was garbage-collected.
  267. self.close()
  268. def _get_seedlist(self):
  269. """Poll SRV records for a seedlist.
  270. Returns a list of ServerDescriptions.
  271. """
  272. try:
  273. seedlist, ttl = _SrvResolver(self._fqdn).get_hosts_and_min_ttl()
  274. if len(seedlist) == 0:
  275. # As per the spec: this should be treated as a failure.
  276. raise Exception
  277. except Exception:
  278. # As per the spec, upon encountering an error:
  279. # - An error must not be raised
  280. # - SRV records must be rescanned every heartbeatFrequencyMS
  281. # - Topology must be left unchanged
  282. self.request_check()
  283. return None
  284. else:
  285. self._executor.update_interval(
  286. max(ttl, common.MIN_SRV_RESCAN_INTERVAL))
  287. return seedlist
  288. class _RttMonitor(MonitorBase):
  289. def __init__(self, topology, topology_settings, pool):
  290. """Maintain round trip times for a server.
  291. The Topology is weakly referenced.
  292. """
  293. super(_RttMonitor, self).__init__(
  294. topology,
  295. "pymongo_server_rtt_thread",
  296. topology_settings.heartbeat_frequency,
  297. common.MIN_HEARTBEAT_INTERVAL)
  298. self._pool = pool
  299. self._moving_average = MovingAverage()
  300. self._lock = threading.Lock()
  301. def close(self):
  302. self.gc_safe_close()
  303. # Increment the generation and maybe close the socket. If the executor
  304. # thread has the socket checked out, it will be closed when checked in.
  305. self._pool.reset()
  306. def add_sample(self, sample):
  307. """Add a RTT sample."""
  308. with self._lock:
  309. self._moving_average.add_sample(sample)
  310. def average(self):
  311. """Get the calculated average, or None if no samples yet."""
  312. with self._lock:
  313. return self._moving_average.get()
  314. def reset(self):
  315. """Reset the average RTT."""
  316. with self._lock:
  317. return self._moving_average.reset()
  318. def _run(self):
  319. try:
  320. # NOTE: This thread is only run when when using the streaming
  321. # heartbeat protocol (MongoDB 4.4+).
  322. # XXX: Skip check if the server is unknown?
  323. rtt = self._ping()
  324. self.add_sample(rtt)
  325. except ReferenceError:
  326. # Topology was garbage-collected.
  327. self.close()
  328. except Exception:
  329. self._pool.reset()
  330. def _ping(self):
  331. """Run a "hello" command and return the RTT."""
  332. with self._pool.get_socket({}) as sock_info:
  333. if self._executor._stopped:
  334. raise Exception('_RttMonitor closed')
  335. start = _time()
  336. sock_info.hello()
  337. return _time() - start
# Registry of weak references to the monitors passed to _register().
# At exit, monitors are closed to cancel any in-progress streaming checks
# before the executor threads are joined. For an explanation of how this
# works see the comment about _EXECUTORS in periodic_executor.py.
_MONITORS = set()
  342. def _register(monitor):
  343. ref = weakref.ref(monitor, _unregister)
  344. _MONITORS.add(ref)
  345. def _unregister(monitor_ref):
  346. _MONITORS.remove(monitor_ref)
  347. def _shutdown_monitors():
  348. if _MONITORS is None:
  349. return
  350. # Copy the set. Closing monitors removes them.
  351. monitors = list(_MONITORS)
  352. # Close all monitors.
  353. for ref in monitors:
  354. monitor = ref()
  355. if monitor:
  356. monitor.gc_safe_close()
  357. monitor = None
  358. def _shutdown_resources():
  359. # _shutdown_monitors/_shutdown_executors may already be GC'd at shutdown.
  360. shutdown = _shutdown_monitors
  361. if shutdown:
  362. shutdown()
  363. shutdown = _shutdown_executors
  364. if shutdown:
  365. shutdown()
# Run _shutdown_resources (monitors, then executors) at interpreter exit.
atexit.register(_shutdown_resources)