topology.py 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819
  1. # Copyright 2014-present MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you
  4. # may not use this file except in compliance with the License. You
  5. # may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. # implied. See the License for the specific language governing
  13. # permissions and limitations under the License.
  14. """Internal class to monitor a topology of one or more servers."""
  15. import os
  16. import random
  17. import threading
  18. import warnings
  19. import weakref
  20. from bson.py3compat import itervalues, PY3
  21. if PY3:
  22. import queue as Queue
  23. else:
  24. import Queue
  25. from pymongo import (common,
  26. helpers,
  27. periodic_executor)
  28. from pymongo.ismaster import IsMaster
  29. from pymongo.pool import PoolOptions
  30. from pymongo.topology_description import (updated_topology_description,
  31. _updated_topology_description_srv_polling,
  32. TopologyDescription,
  33. SRV_POLLING_TOPOLOGIES, TOPOLOGY_TYPE)
  34. from pymongo.errors import (ConnectionFailure,
  35. ConfigurationError,
  36. NetworkTimeout,
  37. NotPrimaryError,
  38. OperationFailure,
  39. ServerSelectionTimeoutError,
  40. WriteError)
  41. from pymongo.monitor import SrvMonitor
  42. from pymongo.monotonic import time as _time
  43. from pymongo.server import Server
  44. from pymongo.server_description import ServerDescription
  45. from pymongo.server_selectors import (any_server_selector,
  46. arbiter_server_selector,
  47. secondary_server_selector,
  48. readable_server_selector,
  49. writable_server_selector,
  50. Selection)
  51. from pymongo.client_session import _ServerSessionPool
  def process_events_queue(queue_ref):
      """Drain and dispatch every event currently in the events queue.

      This is the target run by the topology's events PeriodicExecutor.

      :Parameters:
        - `queue_ref`: a ``weakref.ref`` (or any zero-argument callable) that
          returns the events queue, or ``None`` once the queue has been
          garbage collected. Each queued item is an ``(fn, args)`` pair and is
          dispatched as ``fn(*args)``.

      :Returns:
        ``False`` to tell the executor to stop because the queue is gone,
        otherwise ``True`` to keep it running. An exception raised by a
        listener propagates to the caller; events queued behind it stay in
        the queue.
      """
      q = queue_ref()
      # Test identity, not truthiness. A live queue must never be mistaken for
      # a dead reference: a queue type defining __len__ would be falsy while
      # empty and would wrongly stop the executor.
      if q is None:
          return False  # Cancel PeriodicExecutor.
      while True:
          try:
              fn, args = q.get_nowait()
          except Queue.Empty:
              return True  # Continue PeriodicExecutor.
          fn(*args)
  class Topology(object):
      """Monitor a topology of one or more servers.

      A Topology keeps:

      - the current TopologyDescription;
      - one Server (with its monitor and application connection pool) per
        address in that description;
      - the highest $clusterTime seen;
      - the server session pool.

      It performs server selection and applies Server Discovery And
      Monitoring (SDAM) updates that come from monitors, SRV polling and
      application errors.

      Mutable state is guarded by ``self._lock``. ``self._condition`` is built
      on the same lock and is notified whenever a server description is
      processed, so that server selection can re-check. Methods whose
      docstring says "Hold the lock when calling this" do not acquire it
      themselves.

      Listener events are not published inline. They are put on
      ``self._events`` and drained periodically by a PeriodicExecutor that
      runs ``process_events_queue``.
      """

      def __init__(self, topology_settings):
          """Create the initial, unopened topology from ``topology_settings``.

          No Server objects are created here. They are created when the
          topology is opened (see _ensure_opened) or when a description change
          is processed.

          If topology or server event listeners are enabled, the initial
          "opened" and "description changed" events are queued and the events
          executor is started immediately.
          """
          self._topology_id = topology_settings._topology_id
          self._listeners = topology_settings._pool_options.event_listeners
          pub = self._listeners is not None
          self._publish_server = pub and self._listeners.enabled_for_server
          self._publish_tp = pub and self._listeners.enabled_for_topology

          # Create events queue if there are publishers.
          self._events = None
          self.__events_executor = None

          if self._publish_server or self._publish_tp:
              # Bounded queue: put() blocks once 100 events are pending.
              self._events = Queue.Queue(maxsize=100)

          if self._publish_tp:
              self._events.put((self._listeners.publish_topology_opened,
                                (self._topology_id,)))
          self._settings = topology_settings
          topology_description = TopologyDescription(
              topology_settings.get_topology_type(),
              topology_settings.get_server_descriptions(),
              topology_settings.replica_set_name,
              None,
              None,
              topology_settings)

          self._description = topology_description
          if self._publish_tp:
              # Report the transition from an empty Unknown description to the
              # seeded one.
              initial_td = TopologyDescription(TOPOLOGY_TYPE.Unknown, {}, None,
                                               None, None, self._settings)
              self._events.put((
                  self._listeners.publish_topology_description_changed,
                  (initial_td, self._description, self._topology_id)))

          for seed in topology_settings.seeds:
              if self._publish_server:
                  self._events.put((self._listeners.publish_server_opened,
                                    (seed, self._topology_id)))

          # Store the seed list to help diagnose errors in _error_message().
          self._seed_addresses = list(topology_description.server_descriptions())
          self._opened = False
          self._lock = threading.Lock()
          # Server selection waits on this condition. It shares self._lock and
          # is notified by _process_change().
          self._condition = self._settings.condition_class(self._lock)
          self._servers = {}
          # PID of the process that first called open(); used to detect use
          # after fork.
          self._pid = None
          self._max_cluster_time = None
          self._session_pool = _ServerSessionPool()

          if self._publish_server or self._publish_tp:
              def target():
                  # `weak` is bound below, before executor.open() is called.
                  return process_events_queue(weak)

              executor = periodic_executor.PeriodicExecutor(
                  interval=common.EVENTS_QUEUE_FREQUENCY,
                  min_interval=0.5,
                  target=target,
                  name="pymongo_events_thread")

              # We strongly reference the executor and it weakly references
              # the queue via this closure. When the topology is freed, stop
              # the executor soon.
              weak = weakref.ref(self._events, executor.close)
              self.__events_executor = executor
              executor.open()

          # SRV polling needs an fqdn and is never used when load balanced.
          self._srv_monitor = None
          if (self._settings.fqdn is not None and
                  not self._settings.load_balanced):
              self._srv_monitor = SrvMonitor(self, self._settings)

      def open(self):
          """Start monitoring, or restart after a fork.

          Initial setup happens only once (see _ensure_opened). Later calls
          only make sure each Server is open.

          .. warning:: Topology is shared among multiple threads and is
            protected by mutual exclusion. Using Topology from a process other
            than the one that initialized it will emit a warning and may result
            in deadlock. To prevent this from happening, MongoClient must be
            created after any forking.
          """
          if self._pid is None:
              self._pid = os.getpid()
          else:
              # NOTE(review): self._pid is never updated after the first call,
              # so a forked child warns and resets the session pool on every
              # open() call, not just the first one.
              if os.getpid() != self._pid:
                  warnings.warn(
                      "MongoClient opened before fork. Create MongoClient only "
                      "after forking. See PyMongo's documentation for details: "
                      "https://pymongo.readthedocs.io/en/stable/faq.html#"
                      "is-pymongo-fork-safe")
                  with self._lock:
                      # Reset the session pool to avoid duplicate sessions in
                      # the child process.
                      self._session_pool.reset()

          with self._lock:
              self._ensure_opened()

      def select_servers(self,
                         selector,
                         server_selection_timeout=None,
                         address=None):
          """Return a list of Servers matching selector, or time out.

          :Parameters:
            - `selector`: function that takes a list of Servers and returns
              a subset of them.
            - `server_selection_timeout` (optional): maximum seconds to wait.
              If not provided, the topology settings' server_selection_timeout
              is used.
            - `address`: optional server address to select.

          If nothing matches at first, the topology is opened (via
          _ensure_opened) and all monitors are asked to check their servers.

          Raises :exc:`ServerSelectionTimeoutError` after
          `server_selection_timeout` if no matching servers are found.
          """
          if server_selection_timeout is None:
              server_timeout = self._settings.server_selection_timeout
          else:
              server_timeout = server_selection_timeout

          with self._lock:
              server_descriptions = self._select_servers_loop(
                  selector, server_timeout, address)

              # Map descriptions to Server objects while still holding the lock,
              # so the result is consistent with the selected descriptions.
              return [self.get_server_by_address(sd.address)
                      for sd in server_descriptions]

      def _select_servers_loop(self, selector, timeout, address):
          """select_servers() guts. Hold the lock when calling this.

          Return the matching server descriptions. Raise
          ServerSelectionTimeoutError once ``timeout`` seconds have passed, or
          immediately when ``timeout == 0``. check_compatible() may also raise
          if the description is incompatible.
          """
          now = _time()
          end_time = now + timeout
          server_descriptions = self._description.apply_selector(
              selector, address, custom_selector=self._settings.server_selector)

          while not server_descriptions:
              # No suitable servers.
              if timeout == 0 or now > end_time:
                  raise ServerSelectionTimeoutError(
                      "%s, Timeout: %ss, Topology Description: %r" %
                      (self._error_message(selector), timeout, self.description))

              self._ensure_opened()
              self._request_check_all()

              # Release the lock and wait for the topology description to
              # change, or for a timeout. We won't miss any changes that
              # came after our most recent apply_selector call, since we've
              # held the lock until now.
              self._condition.wait(common.MIN_HEARTBEAT_INTERVAL)
              self._description.check_compatible()
              now = _time()
              server_descriptions = self._description.apply_selector(
                  selector, address,
                  custom_selector=self._settings.server_selector)

          self._description.check_compatible()
          return server_descriptions

      def select_server(self,
                        selector,
                        server_selection_timeout=None,
                        address=None):
          """Like select_servers, but choose a random server if several match."""
          return random.choice(self.select_servers(selector,
                                                   server_selection_timeout,
                                                   address))

      def select_server_by_address(self, address,
                                   server_selection_timeout=None):
          """Return a Server for "address", reconnecting if necessary.

          Selection uses any_server_selector restricted to ``address``. While
          nothing matches, all servers are asked to check immediately (see
          _select_servers_loop). Time out after "server_selection_timeout" if
          the server cannot be reached.

          :Parameters:
            - `address`: A (host, port) pair.
            - `server_selection_timeout` (optional): maximum seconds to wait.
              If not provided, the topology settings' server_selection_timeout
              is used.

          Raises :exc:`ServerSelectionTimeoutError` after
          `server_selection_timeout` if no matching servers are found.
          """
          return self.select_server(any_server_selector,
                                    server_selection_timeout,
                                    address)

      def _process_change(self, server_description, reset_pool=False):
          """Process a new ServerDescription on an opened topology.

          Hold the lock when calling this.

          Ignores descriptions with an older topologyVersion. Otherwise it:

          - queues change events (unless nothing changed);
          - updates the TopologyDescription and the Server objects;
          - records the cluster time;
          - may stop SRV polling;
          - resets the server's pool when ``reset_pool`` is true;
          - wakes server-selection waiters.
          """
          td_old = self._description
          sd_old = td_old._server_descriptions[server_description.address]
          if _is_stale_server_description(sd_old, server_description):
              # This is a stale hello response. Ignore it.
              return

          # Skip publishing events when the new description equals the old one.
          suppress_event = ((self._publish_server or self._publish_tp)
                            and sd_old == server_description)
          if self._publish_server and not suppress_event:
              self._events.put((
                  self._listeners.publish_server_description_changed,
                  (sd_old, server_description,
                   server_description.address, self._topology_id)))

          self._description = updated_topology_description(
              self._description, server_description)

          self._update_servers()
          self._receive_cluster_time_no_lock(server_description.cluster_time)

          if self._publish_tp and not suppress_event:
              self._events.put((
                  self._listeners.publish_topology_description_changed,
                  (td_old, self._description, self._topology_id)))

          # Shutdown SRV polling for unsupported cluster types.
          # This is only applicable if the old topology was Unknown, and the
          # new one is something other than Unknown or Sharded.
          if self._srv_monitor and (td_old.topology_type == TOPOLOGY_TYPE.Unknown
                                    and self._description.topology_type not in
                                    SRV_POLLING_TOPOLOGIES):
              self._srv_monitor.close()

          # Clear the pool from a failed heartbeat.
          if reset_pool:
              server = self._servers.get(server_description.address)
              if server:
                  server.pool.reset()

          # Wake waiters in select_servers().
          self._condition.notify_all()

      def on_change(self, server_description, reset_pool=False):
          """Process a new ServerDescription after a hello call completes."""
          # We do no I/O holding the lock.
          with self._lock:
              # Monitors may continue working on hello calls for some time
              # after a call to Topology.close, so this method may be called at
              # any time. Ensure the topology is open before processing the
              # change.
              # Any monitored server was definitely in the topology description
              # once. Check if it's still in the description or if some state-
              # change removed it. E.g., we got a host list from the primary
              # that didn't include this server.
              if (self._opened and
                      self._description.has_server(server_description.address)):
                  self._process_change(server_description, reset_pool)

      def _process_srv_update(self, seedlist):
          """Process a new seedlist on an opened topology.

          Hold the lock when calling this.
          """
          td_old = self._description
          self._description = _updated_topology_description_srv_polling(
              self._description, seedlist)

          self._update_servers()

          if self._publish_tp:
              self._events.put((
                  self._listeners.publish_topology_description_changed,
                  (td_old, self._description, self._topology_id)))

      def on_srv_update(self, seedlist):
          """Process a new list of nodes obtained from scanning SRV records."""
          # We do no I/O holding the lock.
          with self._lock:
              # Updates arriving after close() are ignored.
              if self._opened:
                  self._process_srv_update(seedlist)

      def get_server_by_address(self, address):
          """Get a Server or None.

          Returns the current version of the server immediately, even if it's
          Unknown or absent from the topology. Outside this class, only use
          this in unittests; select_servers() also uses it internally while
          holding the lock. In driver code, use select_server_by_address,
          since then you're assured a recent view of the server's type and
          wire protocol version.
          """
          return self._servers.get(address)

      def has_server(self, address):
          """Return True if a Server object exists for ``address``.

          Reads ``self._servers`` without acquiring the lock.
          """
          return address in self._servers

      def get_primary(self):
          """Return primary's address or None."""
          # Implemented here in Topology instead of MongoClient, so it can lock.
          with self._lock:
              topology_type = self._description.topology_type
              if topology_type != TOPOLOGY_TYPE.ReplicaSetWithPrimary:
                  return None

              return writable_server_selector(self._new_selection())[0].address

      def _get_replica_set_members(self, selector):
          """Return set of replica set member addresses matching ``selector``.

          Returns an empty set unless the topology is a replica set (with or
          without a primary).
          """
          # Implemented here in Topology instead of MongoClient, so it can lock.
          with self._lock:
              topology_type = self._description.topology_type
              if topology_type not in (TOPOLOGY_TYPE.ReplicaSetWithPrimary,
                                       TOPOLOGY_TYPE.ReplicaSetNoPrimary):
                  return set()

              return set([sd.address for sd in selector(self._new_selection())])

      def get_secondaries(self):
          """Return set of secondary addresses."""
          return self._get_replica_set_members(secondary_server_selector)

      def get_arbiters(self):
          """Return set of arbiter addresses."""
          return self._get_replica_set_members(arbiter_server_selector)

      def max_cluster_time(self):
          """Return a document, the highest seen $clusterTime."""
          return self._max_cluster_time

      def _receive_cluster_time_no_lock(self, cluster_time):
          """Record ``cluster_time`` if it is newer than the highest seen.

          Does not acquire the lock. A falsy ``cluster_time`` is ignored.
          """
          # Driver Sessions Spec: "Whenever a driver receives a cluster time from
          # a server it MUST compare it to the current highest seen cluster time
          # for the deployment. If the new cluster time is higher than the
          # highest seen cluster time it MUST become the new highest seen cluster
          # time. Two cluster times are compared using only the BsonTimestamp
          # value of the clusterTime embedded field."
          if cluster_time:
              # ">" uses bson.timestamp.Timestamp's comparison operator.
              if (not self._max_cluster_time
                  or cluster_time['clusterTime'] >
                      self._max_cluster_time['clusterTime']):
                  self._max_cluster_time = cluster_time

      def receive_cluster_time(self, cluster_time):
          """Locked wrapper around _receive_cluster_time_no_lock()."""
          with self._lock:
              self._receive_cluster_time_no_lock(cluster_time)

      def request_check_all(self, wait_time=5):
          """Wake all monitors, then wait for the next description change.

          Returns when the condition is notified (any processed server
          description) or after ``wait_time`` seconds, whichever comes first.
          """
          with self._lock:
              self._request_check_all()
              self._condition.wait(wait_time)

      def handle_getlasterror(self, address, error_msg):
          """Clear our pool for a server, mark it Unknown, and check it soon."""
          # Synthesize a "not primary" error (code 10107) from the message.
          error = NotPrimaryError(error_msg, {'code': 10107, 'errmsg': error_msg})
          with self._lock:
              server = self._servers.get(address)
              if server:
                  # reset_pool=True clears the server's connection pool.
                  self._process_change(
                      ServerDescription(address, error=error), True)
                  server.request_check()

      def data_bearing_servers(self):
          """Return the descriptions of all data-bearing servers.

          This includes any server that might be selected for an operation:
          known servers for a Single topology, readable servers otherwise.
          Does not acquire the lock; update_pool() calls it while holding it.
          """
          if self._description.topology_type == TOPOLOGY_TYPE.Single:
              return self._description.known_servers
          return self._description.readable_servers

      def update_pool(self, all_credentials):
          """Run stale-socket maintenance on data-bearing servers' pools.

          (server, generation) pairs are snapshotted under the lock. The pool
          maintenance itself runs after the lock is released.
          """
          # Remove any stale sockets and add new sockets if pool is too small.
          servers = []
          with self._lock:
              # Only update pools for data-bearing servers.
              for sd in self.data_bearing_servers():
                  server = self._servers[sd.address]
                  servers.append((server,
                                  server.pool.gen.get_overall()))

          for server, generation in servers:
              server.pool.remove_stale_sockets(generation, all_credentials)

      def close(self):
          """Clear pools and terminate monitors. Topology reopens on demand."""
          with self._lock:
              for server in self._servers.values():
                  server.close()

              # Mark all servers Unknown.
              self._description = self._description.reset()
              for address, sd in self._description.server_descriptions().items():
                  if address in self._servers:
                      self._servers[address].description = sd

              # Stop SRV polling thread.
              if self._srv_monitor:
                  self._srv_monitor.close()

              self._opened = False

          # Publish only after releasing the lock.
          if self._publish_tp:
              self._events.put((self._listeners.publish_topology_closed,
                                (self._topology_id,)))
          if self._publish_server or self._publish_tp:
              self.__events_executor.close()

      @property
      def description(self):
          """The current TopologyDescription (read without taking the lock)."""
          return self._description

      def pop_all_sessions(self):
          """Pop all session ids from the pool."""
          with self._lock:
              return self._session_pool.pop_all()

      def _check_session_support(self):
          """Internal check for session support on non-load balanced clusters.

          Hold the lock when calling this. Return the logical session timeout
          in minutes, or raise ConfigurationError if sessions are unsupported.
          """
          session_timeout = self._description.logical_session_timeout_minutes
          if session_timeout is None:
              # Maybe we need an initial scan? Can raise
              # ServerSelectionTimeoutError.
              if self._description.topology_type == TOPOLOGY_TYPE.Single:
                  if not self._description.has_known_servers:
                      self._select_servers_loop(
                          any_server_selector,
                          self._settings.server_selection_timeout,
                          None)
              elif not self._description.readable_servers:
                  self._select_servers_loop(
                      readable_server_selector,
                      self._settings.server_selection_timeout,
                      None)

              session_timeout = self._description.logical_session_timeout_minutes
              if session_timeout is None:
                  raise ConfigurationError(
                      "Sessions are not supported by this MongoDB deployment")
          return session_timeout

      def get_server_session(self):
          """Start or resume a server session, or raise ConfigurationError."""
          with self._lock:
              # Sessions are always supported in load balanced mode.
              if not self._settings.load_balanced:
                  session_timeout = self._check_session_support()
              else:
                  # Sessions never time out in load balanced mode.
                  session_timeout = float('inf')
              return self._session_pool.get_server_session(session_timeout)

      def return_server_session(self, server_session, lock):
          """Return ``server_session`` to the session pool.

          When ``lock`` is true, the lock is held and the current logical
          session timeout is passed along. Otherwise the pool's no-lock path
          is used.
          """
          if lock:
              with self._lock:
                  self._session_pool.return_server_session(
                      server_session,
                      self._description.logical_session_timeout_minutes)
          else:
              # Called from a __del__ method, can't use a lock.
              self._session_pool.return_server_session_no_lock(server_session)

      def _new_selection(self):
          """A Selection object, initially including all known servers.

          Hold the lock when calling this.
          """
          return Selection.from_topology_description(self._description)

      def _ensure_opened(self):
          """Start monitors, or restart after a fork.

          Hold the lock when calling this.

          The first call after construction or close() does the following:

          - creates the Servers;
          - starts the events executor;
          - starts SRV polling (for SRV-pollable topologies);
          - in load balanced mode, emits the initial server description.

          Every call then opens each Server.
          """
          if not self._opened:
              self._opened = True
              self._update_servers()

              # Start or restart the events publishing thread.
              if self._publish_tp or self._publish_server:
                  self.__events_executor.open()

              # Start the SRV polling thread.
              if self._srv_monitor and (self.description.topology_type in
                                        SRV_POLLING_TOPOLOGIES):
                  self._srv_monitor.open()

              if self._settings.load_balanced:
                  # Emit initial SDAM events for load balancer mode.
                  self._process_change(ServerDescription(
                      self._seed_addresses[0],
                      IsMaster({'ok': 1, 'serviceId': self._topology_id,
                                'maxWireVersion': 13})))

          # Ensure that the monitors are open.
          # NOTE(review): runs on every call; presumably Server.open() is a
          # no-op for an already-running server. Confirm in pymongo.server.
          for server in itervalues(self._servers):
              server.open()

      def _is_stale_error(self, address, err_ctx):
          """Return True if an application error should be ignored.

          An error is ignored when any of these hold:

          - the server is no longer in the topology;
          - the error came from an older pool generation;
          - its topologyVersion is not newer than the server's current one.
          """
          server = self._servers.get(address)
          if server is None:
              # Another thread removed this server from the topology.
              return True

          if server._pool.stale_generation(
                  err_ctx.sock_generation, err_ctx.service_id):
              # This is an outdated error from a previous pool version.
              return True

          # topologyVersion check, ignore error when cur_tv >= error_tv:
          cur_tv = server.description.topology_version
          error = err_ctx.error
          error_tv = None
          if error and hasattr(error, 'details'):
              if isinstance(error.details, dict):
                  error_tv = error.details.get('topologyVersion')

          return _is_stale_error_topology_version(cur_tv, error_tv)

      def _handle_error(self, address, err_ctx):
          """Apply SDAM error handling for an error on ``address``.

          Hold the lock when calling this. Depending on the error type, this
          may mark the server Unknown (never in load balanced mode), reset its
          pool, and/or request an immediate check.
          """
          if self._is_stale_error(address, err_ctx):
              return

          # Safe to index: _is_stale_error() returned True for missing servers.
          server = self._servers[address]
          error = err_ctx.error
          exc_type = type(error)
          service_id = err_ctx.service_id
          # NOTE(review): branch order matters if NetworkTimeout/NotPrimaryError
          # subclass ConnectionFailure. Confirm in pymongo.errors before
          # reordering.
          if (issubclass(exc_type, NetworkTimeout) and
                  err_ctx.completed_handshake):
              # The socket has been closed. Don't reset the server.
              # Server Discovery And Monitoring Spec: "When an application
              # operation fails because of any network error besides a socket
              # timeout...."
              return
          elif issubclass(exc_type, WriteError):
              # Ignore writeErrors.
              return
          elif issubclass(exc_type, NotPrimaryError):
              # As per the SDAM spec if:
              #   - the server sees a "not primary" error, and
              #   - the server is not shutting down, and
              #   - the server version is >= 4.2, then
              # we keep the existing connection pool, but mark the server type
              # as Unknown and request an immediate check of the server.
              # Otherwise, we clear the connection pool, mark the server as
              # Unknown and request an immediate check of the server.
              err_code = error.details.get('code', -1)
              is_shutting_down = err_code in helpers._SHUTDOWN_CODES
              # Mark server Unknown, clear the pool, and request check.
              if not self._settings.load_balanced:
                  self._process_change(ServerDescription(address, error=error))
              if is_shutting_down or (err_ctx.max_wire_version <= 7):
                  # Clear the pool.
                  server.reset(service_id)
              server.request_check()
          elif issubclass(exc_type, ConnectionFailure):
              # "Client MUST replace the server's description with type Unknown
              # ... MUST NOT request an immediate check of the server."
              if not self._settings.load_balanced:
                  self._process_change(ServerDescription(address, error=error))
              # Clear the pool.
              server.reset(service_id)
              # "When a client marks a server Unknown from `Network error when
              # reading or writing`_, clients MUST cancel the hello check on
              # that server and close the current monitoring connection."
              server._monitor.cancel_check()
          elif issubclass(exc_type, OperationFailure):
              # Do not request an immediate check since the server is likely
              # shutting down.
              if error.code in helpers._NOT_MASTER_CODES:
                  if not self._settings.load_balanced:
                      self._process_change(
                          ServerDescription(address, error=error))
                  # Clear the pool.
                  server.reset(service_id)

      def handle_error(self, address, err_ctx):
          """Handle an application error.

          May reset the server to Unknown, clear the pool, and request an
          immediate check depending on the error and the context.
          """
          with self._lock:
              self._handle_error(address, err_ctx)

      def _request_check_all(self):
          """Wake all monitors. Hold the lock when calling this."""
          for server in self._servers.values():
              server.request_check()

      def _update_servers(self):
          """Sync our Servers from TopologyDescription.server_descriptions.

          Hold the lock while calling this. Creates and opens a Server for each
          new address, refreshes the description of existing ones, and closes
          and removes Servers no longer in the description.
          """
          for address, sd in self._description.server_descriptions().items():
              if address not in self._servers:
                  monitor = self._settings.monitor_class(
                      server_description=sd,
                      topology=self,
                      pool=self._create_pool_for_monitor(address),
                      topology_settings=self._settings)

                  # Servers get only a weak reference to the events queue.
                  weak = None
                  if self._publish_server:
                      weak = weakref.ref(self._events)
                  server = Server(
                      server_description=sd,
                      pool=self._create_pool_for_server(address),
                      monitor=monitor,
                      topology_id=self._topology_id,
                      listeners=self._listeners,
                      events=weak)

                  self._servers[address] = server
                  server.open()
              else:
                  # Cache old is_writable value.
                  was_writable = self._servers[address].description.is_writable
                  # Update server description.
                  self._servers[address].description = sd
                  # Update is_writable value of the pool, if it changed.
                  if was_writable != sd.is_writable:
                      self._servers[address].pool.update_is_writable(
                          sd.is_writable)

          # Iterate over a copy because entries are popped inside the loop.
          for address, server in list(self._servers.items()):
              if not self._description.has_server(address):
                  server.close()
                  self._servers.pop(address)

      def _create_pool_for_server(self, address):
          """Create the application connection pool for ``address``."""
          return self._settings.pool_class(address, self._settings.pool_options)

      def _create_pool_for_monitor(self, address):
          """Create the monitor's connection pool for ``address``.

          The pool is created with ``handshake=False``.
          """
          options = self._settings.pool_options

          # According to the Server Discovery And Monitoring Spec, monitors use
          # connect_timeout for both connect_timeout and socket_timeout. The
          # pool only has one socket so maxPoolSize and so on aren't needed.
          monitor_pool_options = PoolOptions(
              connect_timeout=options.connect_timeout,
              socket_timeout=options.connect_timeout,
              ssl_context=options.ssl_context,
              ssl_match_hostname=options.ssl_match_hostname,
              event_listeners=options.event_listeners,
              appname=options.appname,
              driver=options.driver,
              server_api=options.server_api,
          )

          return self._settings.pool_class(address, monitor_pool_options,
                                           handshake=False)

      def _error_message(self, selector):
          """Format an error message if server selection fails.

          Hold the lock when calling this.
          """
          is_replica_set = self._description.topology_type in (
              TOPOLOGY_TYPE.ReplicaSetWithPrimary,
              TOPOLOGY_TYPE.ReplicaSetNoPrimary)

          if is_replica_set:
              server_plural = 'replica set members'
          elif self._description.topology_type == TOPOLOGY_TYPE.Sharded:
              server_plural = 'mongoses'
          else:
              server_plural = 'servers'

          if self._description.known_servers:
              # We've connected, but no servers match the selector.
              if selector is writable_server_selector:
                  if is_replica_set:
                      return 'No primary available for writes'
                  else:
                      return 'No %s available for writes' % server_plural
              else:
                  return 'No %s match selector "%s"' % (server_plural, selector)
          else:
              addresses = list(self._description.server_descriptions())
              servers = list(self._description.server_descriptions().values())
              if not servers:
                  if is_replica_set:
                      # We removed all servers because of the wrong setName?
                      return 'No %s available for replica set name "%s"' % (
                          server_plural, self._settings.replica_set_name)
                  else:
                      return 'No %s available' % server_plural

              # 1 or more servers, all Unknown. Are they unknown for one reason?
              error = servers[0].error
              same = all(server.error == error for server in servers[1:])
              if same:
                  if error is None:
                      # We're still discovering.
                      return 'No %s found yet' % server_plural

                  if (is_replica_set and not
                          set(addresses).intersection(self._seed_addresses)):
                      # We replaced our seeds with new hosts but can't reach any.
                      return (
                          'Could not reach any servers in %s. Replica set is'
                          ' configured with internal hostnames or IPs?' %
                          addresses)

                  return str(error)
              else:
                  return ','.join(str(server.error) for server in servers
                                  if server.error)

      def __repr__(self):
          """Return the class name, "CLOSED " if not opened, and the
          description's repr."""
          msg = ''
          if not self._opened:
              msg = 'CLOSED '
          return '<%s %s%r>' % (self.__class__.__name__, msg, self._description)
  671. class _ErrorContext(object):
  672. """An error with context for SDAM error handling."""
  673. def __init__(self, error, max_wire_version, sock_generation,
  674. completed_handshake, service_id):
  675. self.error = error
  676. self.max_wire_version = max_wire_version
  677. self.sock_generation = sock_generation
  678. self.completed_handshake = completed_handshake
  679. self.service_id = service_id
  680. def _is_stale_error_topology_version(current_tv, error_tv):
  681. """Return True if the error's topologyVersion is <= current."""
  682. if current_tv is None or error_tv is None:
  683. return False
  684. if current_tv['processId'] != error_tv['processId']:
  685. return False
  686. return current_tv['counter'] >= error_tv['counter']
  687. def _is_stale_server_description(current_sd, new_sd):
  688. """Return True if the new topologyVersion is < current."""
  689. current_tv, new_tv = current_sd.topology_version, new_sd.topology_version
  690. if current_tv is None or new_tv is None:
  691. return False
  692. if current_tv['processId'] != new_tv['processId']:
  693. return False
  694. return current_tv['counter'] > new_tv['counter']