mongo_client.py 106 KB


  1. # Copyright 2009-present MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you
  4. # may not use this file except in compliance with the License. You
  5. # may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. # implied. See the License for the specific language governing
  13. # permissions and limitations under the License.
  14. """Tools for connecting to MongoDB.
  15. .. seealso:: :doc:`/examples/high_availability` for examples of connecting
  16. to replica sets or sets of mongos servers.
  17. To get a :class:`~pymongo.database.Database` instance from a
  18. :class:`MongoClient` use either dictionary-style or attribute-style
  19. access:
  20. .. doctest::
  21. >>> from pymongo import MongoClient
  22. >>> c = MongoClient()
  23. >>> c.test_database
  24. Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), u'test_database')
  25. >>> c['test-database']
  26. Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), u'test-database')
  27. """
  28. import contextlib
  29. import datetime
  30. import threading
  31. import warnings
  32. import weakref
  33. from collections import defaultdict
  34. from bson.codec_options import DEFAULT_CODEC_OPTIONS
  35. from bson.py3compat import (integer_types,
  36. string_type)
  37. from bson.son import SON
  38. from pymongo import (common,
  39. database,
  40. helpers,
  41. message,
  42. periodic_executor,
  43. uri_parser,
  44. client_session)
  45. from pymongo.change_stream import ClusterChangeStream
  46. from pymongo.client_options import ClientOptions
  47. from pymongo.command_cursor import CommandCursor
  48. from pymongo.cursor_manager import CursorManager
  49. from pymongo.errors import (AutoReconnect,
  50. BulkWriteError,
  51. ConfigurationError,
  52. ConnectionFailure,
  53. InvalidOperation,
  54. NotPrimaryError,
  55. OperationFailure,
  56. PyMongoError,
  57. ServerSelectionTimeoutError)
  58. from pymongo.pool import ConnectionClosedReason
  59. from pymongo.read_preferences import ReadPreference
  60. from pymongo.server_selectors import (writable_preferred_server_selector,
  61. writable_server_selector)
  62. from pymongo.server_type import SERVER_TYPE
  63. from pymongo.topology import (Topology,
  64. _ErrorContext)
  65. from pymongo.topology_description import TOPOLOGY_TYPE
  66. from pymongo.settings import TopologySettings
  67. from pymongo.uri_parser import (_handle_option_deprecations,
  68. _handle_security_options,
  69. _normalize_options,
  70. _check_options)
  71. from pymongo.write_concern import DEFAULT_WRITE_CONCERN
  72. class MongoClient(common.BaseObject):
  73. """
  74. A client-side representation of a MongoDB cluster.
  75. Instances can represent either a standalone MongoDB server, a replica
  76. set, or a sharded cluster. Instances of this class are responsible for
  77. maintaining up-to-date state of the cluster, and possibly cache
  78. resources related to this, including background threads for monitoring,
  79. and connection pools.
  80. """
  81. HOST = "localhost"
  82. PORT = 27017
  83. # Define order to retrieve options from ClientOptions for __repr__.
  84. # No host/port; these are retrieved from TopologySettings.
  85. _constructor_args = ('document_class', 'tz_aware', 'connect')
  86. def __init__(
  87. self,
  88. host=None,
  89. port=None,
  90. document_class=dict,
  91. tz_aware=None,
  92. connect=None,
  93. type_registry=None,
  94. **kwargs):
  95. """Client for a MongoDB instance, a replica set, or a set of mongoses.
  96. The client object is thread-safe and has connection-pooling built in.
  97. If an operation fails because of a network error,
  98. :class:`~pymongo.errors.ConnectionFailure` is raised and the client
  99. reconnects in the background. Application code should handle this
  100. exception (recognizing that the operation failed) and then continue to
  101. execute.
  102. The `host` parameter can be a full `mongodb URI
  103. <http://dochub.mongodb.org/core/connections>`_, in addition to
  104. a simple hostname. It can also be a list of hostnames or
  105. URIs. Any port specified in the host string(s) will override
  106. the `port` parameter. If multiple mongodb URIs containing
  107. database or auth information are passed, the last database,
  108. username, and password present will be used. For username and
  109. passwords reserved characters like ':', '/', '+' and '@' must be
  110. percent encoded following RFC 2396::
  111. try:
  112. # Python 3.x
  113. from urllib.parse import quote_plus
  114. except ImportError:
  115. # Python 2.x
  116. from urllib import quote_plus
  117. uri = "mongodb://%s:%s@%s" % (
  118. quote_plus(user), quote_plus(password), host)
  119. client = MongoClient(uri)
  120. Unix domain sockets are also supported. The socket path must be percent
  121. encoded in the URI::
  122. uri = "mongodb://%s:%s@%s" % (
  123. quote_plus(user), quote_plus(password), quote_plus(socket_path))
  124. client = MongoClient(uri)
  125. But not when passed as a simple hostname::
  126. client = MongoClient('/tmp/mongodb-27017.sock')
  127. Starting with version 3.6, PyMongo supports mongodb+srv:// URIs. The
  128. URI must include one, and only one, hostname. The hostname will be
  129. resolved to one or more DNS `SRV records
  130. <https://en.wikipedia.org/wiki/SRV_record>`_ which will be used
  131. as the seed list for connecting to the MongoDB deployment. When using
  132. SRV URIs, the `authSource` and `replicaSet` configuration options can
  133. be specified using `TXT records
  134. <https://en.wikipedia.org/wiki/TXT_record>`_. See the
  135. `Initial DNS Seedlist Discovery spec
  136. <https://github.com/mongodb/specifications/blob/master/source/
  137. initial-dns-seedlist-discovery/initial-dns-seedlist-discovery.rst>`_
  138. for more details. Note that the use of SRV URIs implicitly enables
  139. TLS support. Pass tls=false in the URI to override.
  140. .. note:: MongoClient creation will block waiting for answers from
  141. DNS when mongodb+srv:// URIs are used.
  142. .. note:: Starting with version 3.0 the :class:`MongoClient`
  143. constructor no longer blocks while connecting to the server or
  144. servers, and it no longer raises
  145. :class:`~pymongo.errors.ConnectionFailure` if they are
  146. unavailable, nor :class:`~pymongo.errors.ConfigurationError`
  147. if the user's credentials are wrong. Instead, the constructor
  148. returns immediately and launches the connection process on
  149. background threads. You can check if the server is available
  150. like this::
  151. from pymongo.errors import ConnectionFailure
  152. client = MongoClient()
  153. try:
  154. # The ping command is cheap and does not require auth.
  155. client.admin.command('ping')
  156. except ConnectionFailure:
  157. print("Server not available")
  158. .. warning:: When using PyMongo in a multiprocessing context, please
  159. read :ref:`multiprocessing` first.
  160. .. note:: Many of the following options can be passed using a MongoDB
  161. URI or keyword parameters. If the same option is passed in a URI and
  162. as a keyword parameter the keyword parameter takes precedence.
  163. :Parameters:
  164. - `host` (optional): hostname or IP address or Unix domain socket
  165. path of a single mongod or mongos instance to connect to, or a
  166. mongodb URI, or a list of hostnames / mongodb URIs. If `host` is
  167. an IPv6 literal it must be enclosed in '[' and ']' characters
  168. following the RFC2732 URL syntax (e.g. '[::1]' for localhost).
  169. Multihomed and round robin DNS addresses are **not** supported.
  170. - `port` (optional): port number on which to connect
  171. - `document_class` (optional): default class to use for
  172. documents returned from queries on this client
  173. - `tz_aware` (optional): if ``True``,
  174. :class:`~datetime.datetime` instances returned as values
  175. in a document by this :class:`MongoClient` will be timezone
  176. aware (otherwise they will be naive)
  177. - `connect` (optional): if ``True`` (the default), immediately
  178. begin connecting to MongoDB in the background. Otherwise connect
  179. on the first operation.
  180. - `type_registry` (optional): instance of
  181. :class:`~bson.codec_options.TypeRegistry` to enable encoding
  182. and decoding of custom types.
  183. | **Other optional parameters can be passed as keyword arguments:**
  184. - `directConnection` (optional): if ``True``, forces this client to
  185. connect directly to the specified MongoDB host as a standalone.
  186. If ``False``, the client connects to the entire replica set of
  187. which the given MongoDB host(s) is a part. If this is ``True``
  188. and a mongodb+srv:// URI or a URI containing multiple seeds is
  189. provided, an exception will be raised.
  190. - `maxPoolSize` (optional): The maximum allowable number of
  191. concurrent connections to each connected server. Requests to a
  192. server will block if there are `maxPoolSize` outstanding
  193. connections to the requested server. Defaults to 100. Cannot be 0.
  194. - `minPoolSize` (optional): The minimum required number of concurrent
  195. connections that the pool will maintain to each connected server.
  196. Default is 0.
  197. - `maxIdleTimeMS` (optional): The maximum number of milliseconds that
  198. a connection can remain idle in the pool before being removed and
  199. replaced. Defaults to `None` (no limit).
  200. - `socketTimeoutMS`: (integer or None) Controls how long (in
  201. milliseconds) the driver will wait for a response after sending an
  202. ordinary (non-monitoring) database operation before concluding that
  203. a network error has occurred. ``0`` or ``None`` means no timeout.
  204. Defaults to ``None`` (no timeout).
  205. - `connectTimeoutMS`: (integer or None) Controls how long (in
  206. milliseconds) the driver will wait during server monitoring when
  207. connecting a new socket to a server before concluding the server
  208. is unavailable. ``0`` or ``None`` means no timeout.
  209. Defaults to ``20000`` (20 seconds).
  210. - `server_selector`: (callable or None) Optional, user-provided
  211. function that augments server selection rules. The function should
  212. accept as an argument a list of
  213. :class:`~pymongo.server_description.ServerDescription` objects and
  214. return a list of server descriptions that should be considered
  215. suitable for the desired operation.
  216. - `serverSelectionTimeoutMS`: (integer) Controls how long (in
  217. milliseconds) the driver will wait to find an available,
  218. appropriate server to carry out a database operation; while it is
  219. waiting, multiple server monitoring operations may be carried out,
  220. each controlled by `connectTimeoutMS`. Defaults to ``30000`` (30
  221. seconds).
  222. - `waitQueueTimeoutMS`: (integer or None) How long (in milliseconds)
  223. a thread will wait for a socket from the pool if the pool has no
  224. free sockets. Defaults to ``None`` (no timeout).
  225. - `waitQueueMultiple`: (integer or None) Multiplied by maxPoolSize
  226. to give the number of threads allowed to wait for a socket at one
  227. time. Defaults to ``None`` (no limit).
  228. - `heartbeatFrequencyMS`: (optional) The number of milliseconds
  229. between periodic server checks, or None to accept the default
  230. frequency of 10 seconds.
  231. - `appname`: (string or None) The name of the application that
  232. created this MongoClient instance. MongoDB 3.4 and newer will
  233. print this value in the server log upon establishing each
  234. connection. It is also recorded in the slow query log and
  235. profile collections.
  236. - `driver`: (pair or None) A driver implemented on top of PyMongo can
  237. pass a :class:`~pymongo.driver_info.DriverInfo` to add its name,
  238. version, and platform to the message printed in the server log when
  239. establishing a connection.
  240. - `event_listeners`: a list or tuple of event listeners. See
  241. :mod:`~pymongo.monitoring` for details.
  242. - `retryWrites`: (boolean) Whether supported write operations
  243. executed within this MongoClient will be retried once after a
  244. network error on MongoDB 3.6+. Defaults to ``True``.
  245. The supported write operations are:
  246. - :meth:`~pymongo.collection.Collection.bulk_write`, as long as
  247. :class:`~pymongo.operations.UpdateMany` or
  248. :class:`~pymongo.operations.DeleteMany` are not included.
  249. - :meth:`~pymongo.collection.Collection.delete_one`
  250. - :meth:`~pymongo.collection.Collection.insert_one`
  251. - :meth:`~pymongo.collection.Collection.insert_many`
  252. - :meth:`~pymongo.collection.Collection.replace_one`
  253. - :meth:`~pymongo.collection.Collection.update_one`
  254. - :meth:`~pymongo.collection.Collection.find_one_and_delete`
  255. - :meth:`~pymongo.collection.Collection.find_one_and_replace`
  256. - :meth:`~pymongo.collection.Collection.find_one_and_update`
  257. Unsupported write operations include, but are not limited to,
  258. :meth:`~pymongo.collection.Collection.aggregate` using the ``$out``
  259. pipeline operator and any operation with an unacknowledged write
  260. concern (e.g. {w: 0}). See
  261. https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.rst
  262. - `retryReads`: (boolean) Whether supported read operations
  263. executed within this MongoClient will be retried once after a
  264. network error on MongoDB 3.6+. Defaults to ``True``.
  265. The supported read operations are:
  266. :meth:`~pymongo.collection.Collection.find`,
  267. :meth:`~pymongo.collection.Collection.find_one`,
  268. :meth:`~pymongo.collection.Collection.aggregate` without ``$out``,
  269. :meth:`~pymongo.collection.Collection.distinct`,
  270. :meth:`~pymongo.collection.Collection.count`,
  271. :meth:`~pymongo.collection.Collection.estimated_document_count`,
  272. :meth:`~pymongo.collection.Collection.count_documents`,
  273. :meth:`pymongo.collection.Collection.watch`,
  274. :meth:`~pymongo.collection.Collection.list_indexes`,
  275. :meth:`pymongo.database.Database.watch`,
  276. :meth:`~pymongo.database.Database.list_collections`,
  277. :meth:`pymongo.mongo_client.MongoClient.watch`,
  278. and :meth:`~pymongo.mongo_client.MongoClient.list_databases`.
  279. Unsupported read operations include, but are not limited to:
  280. :meth:`~pymongo.collection.Collection.map_reduce`,
  281. :meth:`~pymongo.collection.Collection.inline_map_reduce`,
  282. :meth:`~pymongo.database.Database.command`,
  283. and any getMore operation on a cursor.
  284. Enabling retryable reads makes applications more resilient to
  285. transient errors such as network failures, database upgrades, and
  286. replica set failovers. For an exact definition of which errors
  287. trigger a retry, see the `retryable reads specification
  288. <https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.rst>`_.
  289. - `socketKeepAlive`: (boolean) **DEPRECATED** Whether to send
  290. periodic keep-alive packets on connected sockets. Defaults to
  291. ``True``. Disabling it is not recommended, see
  292. https://docs.mongodb.com/manual/faq/diagnostics/#does-tcp-keepalive-time-affect-mongodb-deployments
  293. - `compressors`: Comma separated list of compressors for wire
  294. protocol compression. The list is used to negotiate a compressor
  295. with the server. Currently supported options are "snappy", "zlib"
  296. and "zstd". Support for snappy requires the
  297. `python-snappy <https://pypi.org/project/python-snappy/>`_ package.
  298. zlib support requires the Python standard library zlib module. zstd
  299. requires the `zstandard <https://pypi.org/project/zstandard/>`_
  300. package. By default no compression is used. Compression support
  301. must also be enabled on the server. MongoDB 3.4+ supports snappy
  302. compression. MongoDB 3.6 adds support for zlib. MongoDB 4.2 adds
  303. support for zstd.
  304. - `zlibCompressionLevel`: (int) The zlib compression level to use
  305. when zlib is used as the wire protocol compressor. Supported values
  306. are -1 through 9. -1 tells the zlib library to use its default
  307. compression level (usually 6). 0 means no compression. 1 is best
  308. speed. 9 is best compression. Defaults to -1.
  309. - `uuidRepresentation`: The BSON representation to use when encoding
  310. from and decoding to instances of :class:`~uuid.UUID`. Valid
  311. values are `pythonLegacy` (the default), `javaLegacy`,
  312. `csharpLegacy`, `standard` and `unspecified`. New applications
  313. should consider setting this to `standard` for cross language
  314. compatibility. See :ref:`handling-uuid-data-example` for details.
  315. - `unicode_decode_error_handler`: The error handler to apply when
  316. a Unicode-related error occurs during BSON decoding that would
  317. otherwise raise :exc:`UnicodeDecodeError`. Valid options include
  318. 'strict', 'replace', and 'ignore'. Defaults to 'strict'.
  319. | **Write Concern options:**
  320. | (Only set if passed. No default values.)
  321. - `w`: (integer or string) If this is a replica set, write operations
  322. will block until they have been replicated to the specified number
  323. or tagged set of servers. `w=<int>` always includes the replica set
  324. primary (e.g. w=3 means write to the primary and wait until
  325. replicated to **two** secondaries). Passing w=0 **disables write
  326. acknowledgement** and all other write concern options.
  327. - `wTimeoutMS`: (integer) Used in conjunction with `w`. Specify a value
  328. in milliseconds to control how long to wait for write propagation
  329. to complete. If replication does not complete in the given
  330. timeframe, a timeout exception is raised. Passing wTimeoutMS=0
  331. will cause **write operations to wait indefinitely**.
  332. - `journal`: If ``True`` block until write operations have been
  333. committed to the journal. Cannot be used in combination with
  334. `fsync`. Prior to MongoDB 2.6 this option was ignored if the server
  335. was running without journaling. Starting with MongoDB 2.6 write
  336. operations will fail with an exception if this option is used when
  337. the server is running without journaling.
  338. - `fsync`: If ``True`` and the server is running without journaling,
  339. blocks until the server has synced all data files to disk. If the
  340. server is running with journaling, this acts the same as the `j`
  341. option, blocking until write operations have been committed to the
  342. journal. Cannot be used in combination with `j`.
  343. | **Replica set keyword arguments for connecting with a replica set
  344. - either directly or via a mongos:**
  345. - `replicaSet`: (string or None) The name of the replica set to
  346. connect to. The driver will verify that all servers it connects to
  347. match this name. Implies that the hosts specified are a seed list
  348. and the driver should attempt to find all members of the set.
  349. Defaults to ``None``.
  350. | **Read Preference:**
  351. - `readPreference`: The replica set read preference for this client.
  352. One of ``primary``, ``primaryPreferred``, ``secondary``,
  353. ``secondaryPreferred``, or ``nearest``. Defaults to ``primary``.
  354. - `readPreferenceTags`: Specifies a tag set as a comma-separated list
  355. of colon-separated key-value pairs. For example ``dc:ny,rack:1``.
  356. Defaults to ``None``.
  357. - `maxStalenessSeconds`: (integer) The maximum estimated
  358. length of time a replica set secondary can fall behind the primary
  359. in replication before it will no longer be selected for operations.
  360. Defaults to ``-1``, meaning no maximum. If maxStalenessSeconds
  361. is set, it must be a positive integer greater than or equal to
  362. 90 seconds.
  363. .. seealso:: :doc:`/examples/server_selection`
  364. | **Authentication:**
  365. - `username`: A string.
  366. - `password`: A string.
  367. Although username and password must be percent-escaped in a MongoDB
  368. URI, they must not be percent-escaped when passed as parameters. In
  369. this example, both the space and slash special characters are passed
  370. as-is::
  371. MongoClient(username="user name", password="pass/word")
  372. - `authSource`: The database to authenticate on. Defaults to the
  373. database specified in the URI, if provided, or to "admin".
  374. - `authMechanism`: See :data:`~pymongo.auth.MECHANISMS` for options.
  375. If no mechanism is specified, PyMongo automatically uses MONGODB-CR
  376. when connected to a pre-3.0 version of MongoDB, SCRAM-SHA-1 when
  377. connected to MongoDB 3.0 through 3.6, and negotiates the mechanism
  378. to use (SCRAM-SHA-1 or SCRAM-SHA-256) when connected to MongoDB
  379. 4.0+.
  380. - `authMechanismProperties`: Used to specify authentication mechanism
  381. specific options. To specify the service name for GSSAPI
  382. authentication pass authMechanismProperties='SERVICE_NAME:<service
  383. name>'.
  384. To specify the session token for MONGODB-AWS authentication pass
  385. ``authMechanismProperties='AWS_SESSION_TOKEN:<session token>'``.
  386. .. seealso:: :doc:`/examples/authentication`
  387. | **TLS/SSL configuration:**
  388. - `tls`: (boolean) If ``True``, create the connection to the server
  389. using transport layer security. Defaults to ``False``.
  390. - `tlsInsecure`: (boolean) Specify whether TLS constraints should be
  391. relaxed as much as possible. Setting ``tlsInsecure=True`` implies
  392. ``tlsAllowInvalidCertificates=True`` and
  393. ``tlsAllowInvalidHostnames=True``. Defaults to ``False``. Think
  394. very carefully before setting this to ``True`` as it dramatically
  395. reduces the security of TLS.
  396. - `tlsAllowInvalidCertificates`: (boolean) If ``True``, continues
  397. the TLS handshake regardless of the outcome of the certificate
  398. verification process. If this is ``False``, and a value is not
  399. provided for ``tlsCAFile``, PyMongo will attempt to load system
  400. provided CA certificates. If the python version in use does not
  401. support loading system CA certificates then the ``tlsCAFile``
  402. parameter must point to a file of CA certificates.
  403. ``tlsAllowInvalidCertificates=False`` implies ``tls=True``.
  404. Defaults to ``False``. Think very carefully before setting this
  405. to ``True`` as that could make your application vulnerable to
  406. man-in-the-middle attacks.
  407. - `tlsAllowInvalidHostnames`: (boolean) If ``True``, disables TLS
  408. hostname verification. ``tlsAllowInvalidHostnames=False`` implies
  409. ``tls=True``. Defaults to ``False``. Think very carefully before
  410. setting this to ``True`` as that could make your application
  411. vulnerable to man-in-the-middle attacks.
  412. - `tlsCAFile`: A file containing a single or a bundle of
  413. "certification authority" certificates, which are used to validate
  414. certificates passed from the other end of the connection.
  415. Implies ``tls=True``. Defaults to ``None``.
  416. - `tlsCertificateKeyFile`: A file containing the client certificate
  417. and private key. Implies ``tls=True``. Defaults to ``None``.
  418. - `tlsCRLFile`: A file containing a PEM or DER formatted
  419. certificate revocation list. Only supported by python 2.7.9+
  420. (pypy 2.5.1+). Implies ``tls=True``. Defaults to ``None``.
  421. - `tlsCertificateKeyFilePassword`: The password or passphrase for
  422. decrypting the private key in ``tlsCertificateKeyFile``. Only
  423. necessary if the private key is encrypted. Only supported by
  424. python 2.7.9+ (pypy 2.5.1+) and 3.3+. Defaults to ``None``.
  425. - `tlsDisableOCSPEndpointCheck`: (boolean) If ``True``, disables
  426. certificate revocation status checking via the OCSP responder
  427. specified on the server certificate. Defaults to ``False``.
  428. - `ssl`: (boolean) Alias for ``tls``.
  429. | **Read Concern options:**
  430. | (If not set explicitly, this will use the server default)
  431. - `readConcernLevel`: (string) The read concern level specifies the
  432. level of isolation for read operations. For example, a read
  433. operation using a read concern level of ``majority`` will only
  434. return data that has been written to a majority of nodes. If the
  435. level is left unspecified, the server default will be used.
  436. | **Client side encryption options:**
  437. | (If not set explicitly, client side encryption will not be enabled.)
  438. - `auto_encryption_opts`: A
  439. :class:`~pymongo.encryption_options.AutoEncryptionOpts` which
  440. configures this client to automatically encrypt collection commands
  441. and automatically decrypt results. See
  442. :ref:`automatic-client-side-encryption` for an example.
  443. If a :class:`MongoClient` is configured with
  444. ``auto_encryption_opts`` and a non-None ``maxPoolSize``, a
  445. separate internal ``MongoClient`` is created if any of the
  446. following are true:
  447. - A ``key_vault_client`` is not passed to
  448. :class:`~pymongo.encryption_options.AutoEncryptionOpts`
  449. - ``bypass_auto_encryption=False`` is passed to
  450. :class:`~pymongo.encryption_options.AutoEncryptionOpts`
  451. | **Versioned API options:**
  452. | (If not set explicitly, Versioned API will not be enabled.)
  453. - `server_api`: A
  454. :class:`~pymongo.server_api.ServerApi` which configures this
  455. client to use Versioned API. See :ref:`versioned-api-ref` for
  456. details.
  457. .. mongodoc:: connections
  458. .. versionchanged:: 3.12
  459. Added the ``server_api`` keyword argument.
  460. The following keyword arguments were deprecated:
  461. - ``ssl_certfile`` and ``ssl_keyfile`` were deprecated in favor
  462. of ``tlsCertificateKeyFile``.
  463. .. versionchanged:: 3.11
  464. Added the following keyword arguments and URI options:
  465. - ``tlsDisableOCSPEndpointCheck``
  466. - ``directConnection``
  467. .. versionchanged:: 3.9
  468. Added the ``retryReads`` keyword argument and URI option.
  469. Added the ``tlsInsecure`` keyword argument and URI option.
  470. The following keyword arguments and URI options were deprecated:
  471. - ``wTimeout`` was deprecated in favor of ``wTimeoutMS``.
  472. - ``j`` was deprecated in favor of ``journal``.
  473. - ``ssl_cert_reqs`` was deprecated in favor of
  474. ``tlsAllowInvalidCertificates``.
  475. - ``ssl_match_hostname`` was deprecated in favor of
  476. ``tlsAllowInvalidHostnames``.
  477. - ``ssl_ca_certs`` was deprecated in favor of ``tlsCAFile``.
  478. - ``ssl_certfile`` was deprecated in favor of
  479. ``tlsCertificateKeyFile``.
  480. - ``ssl_crlfile`` was deprecated in favor of ``tlsCRLFile``.
  481. - ``ssl_pem_passphrase`` was deprecated in favor of
  482. ``tlsCertificateKeyFilePassword``.
  483. .. versionchanged:: 3.9
  484. ``retryWrites`` now defaults to ``True``.
  485. .. versionchanged:: 3.8
  486. Added the ``server_selector`` keyword argument.
  487. Added the ``type_registry`` keyword argument.
  488. .. versionchanged:: 3.7
  489. Added the ``driver`` keyword argument.
  490. .. versionchanged:: 3.6
  491. Added support for mongodb+srv:// URIs.
  492. Added the ``retryWrites`` keyword argument and URI option.
  493. .. versionchanged:: 3.5
  494. Add ``username`` and ``password`` options. Document the
  495. ``authSource``, ``authMechanism``, and ``authMechanismProperties``
  496. options.
  497. Deprecated the ``socketKeepAlive`` keyword argument and URI option.
  498. ``socketKeepAlive`` now defaults to ``True``.
  499. .. versionchanged:: 3.0
  500. :class:`~pymongo.mongo_client.MongoClient` is now the one and only
  501. client class for a standalone server, mongos, or replica set.
  502. It includes the functionality that had been split into
  503. :class:`~pymongo.mongo_client.MongoReplicaSetClient`: it can connect
  504. to a replica set, discover all its members, and monitor the set for
  505. stepdowns, elections, and reconfigs.
  506. The :class:`~pymongo.mongo_client.MongoClient` constructor no
  507. longer blocks while connecting to the server or servers, and it no
  508. longer raises :class:`~pymongo.errors.ConnectionFailure` if they
  509. are unavailable, nor :class:`~pymongo.errors.ConfigurationError`
  510. if the user's credentials are wrong. Instead, the constructor
  511. returns immediately and launches the connection process on
  512. background threads.
  513. Therefore the ``alive`` method is removed since it no longer
  514. provides meaningful information; even if the client is disconnected,
  515. it may discover a server in time to fulfill the next operation.
  516. In PyMongo 2.x, :class:`~pymongo.MongoClient` accepted a list of
  517. standalone MongoDB servers and used the first it could connect to::
  518. MongoClient(['host1.com:27017', 'host2.com:27017'])
  519. A list of multiple standalones is no longer supported; if multiple
  520. servers are listed they must be members of the same replica set, or
  521. mongoses in the same sharded cluster.
  522. The behavior for a list of mongoses is changed from "high
  523. availability" to "load balancing". Before, the client connected to
  524. the lowest-latency mongos in the list, and used it until a network
  525. error prompted it to re-evaluate all mongoses' latencies and
  526. reconnect to one of them. In PyMongo 3, the client monitors its
  527. network latency to all the mongoses continuously, and distributes
  528. operations evenly among those with the lowest latency. See
  529. :ref:`mongos-load-balancing` for more information.
  530. The ``connect`` option is added.
  531. The ``start_request``, ``in_request``, and ``end_request`` methods
  532. are removed, as well as the ``auto_start_request`` option.
  533. The ``copy_database`` method is removed, see the
  534. :doc:`copy_database examples </examples/copydb>` for alternatives.
  535. The :meth:`MongoClient.disconnect` method is removed; it was a
  536. synonym for :meth:`~pymongo.MongoClient.close`.
  537. :class:`~pymongo.mongo_client.MongoClient` no longer returns an
  538. instance of :class:`~pymongo.database.Database` for attribute names
  539. with leading underscores. You must use dict-style lookups instead::
  540. client['__my_database__']
  541. Not::
  542. client.__my_database__
  543. """
  544. self.__init_kwargs = {'host': host,
  545. 'port': port,
  546. 'document_class': document_class,
  547. 'tz_aware': tz_aware,
  548. 'connect': connect,
  549. 'type_registry': type_registry}
  550. self.__init_kwargs.update(kwargs)
  551. if host is None:
  552. host = self.HOST
  553. if isinstance(host, string_type):
  554. host = [host]
  555. if port is None:
  556. port = self.PORT
  557. if not isinstance(port, int):
  558. raise TypeError("port must be an instance of int")
  559. # _pool_class, _monitor_class, and _condition_class are for deep
  560. # customization of PyMongo, e.g. Motor.
  561. pool_class = kwargs.pop('_pool_class', None)
  562. monitor_class = kwargs.pop('_monitor_class', None)
  563. condition_class = kwargs.pop('_condition_class', None)
  564. # Parse options passed as kwargs.
  565. keyword_opts = common._CaseInsensitiveDictionary(kwargs)
  566. keyword_opts['document_class'] = document_class
  567. seeds = set()
  568. username = None
  569. password = None
  570. dbase = None
  571. opts = common._CaseInsensitiveDictionary()
  572. fqdn = None
  573. for entity in host:
  574. # A hostname can only include a-z, 0-9, '-' and '.'. If we find a '/'
  575. # it must be a URI,
  576. # https://en.wikipedia.org/wiki/Hostname#Restrictions_on_valid_host_names
  577. if "/" in entity:
  578. # Determine connection timeout from kwargs.
  579. timeout = keyword_opts.get("connecttimeoutms")
  580. if timeout is not None:
  581. timeout = common.validate_timeout_or_none_or_zero(
  582. keyword_opts.cased_key("connecttimeoutms"), timeout)
  583. res = uri_parser.parse_uri(
  584. entity, port, validate=True, warn=True, normalize=False,
  585. connect_timeout=timeout)
  586. seeds.update(res["nodelist"])
  587. username = res["username"] or username
  588. password = res["password"] or password
  589. dbase = res["database"] or dbase
  590. opts = res["options"]
  591. fqdn = res["fqdn"]
  592. else:
  593. seeds.update(uri_parser.split_hosts(entity, port))
  594. if not seeds:
  595. raise ConfigurationError("need to specify at least one host")
  596. # Add options with named keyword arguments to the parsed kwarg options.
  597. if type_registry is not None:
  598. keyword_opts['type_registry'] = type_registry
  599. if tz_aware is None:
  600. tz_aware = opts.get('tz_aware', False)
  601. if connect is None:
  602. connect = opts.get('connect', True)
  603. keyword_opts['tz_aware'] = tz_aware
  604. keyword_opts['connect'] = connect
  605. # Handle deprecated options in kwarg options.
  606. keyword_opts = _handle_option_deprecations(keyword_opts)
  607. # Validate kwarg options.
  608. keyword_opts = common._CaseInsensitiveDictionary(dict(common.validate(
  609. keyword_opts.cased_key(k), v) for k, v in keyword_opts.items()))
  610. # Override connection string options with kwarg options.
  611. opts.update(keyword_opts)
  612. # Handle security-option conflicts in combined options.
  613. opts = _handle_security_options(opts)
  614. # Normalize combined options.
  615. opts = _normalize_options(opts)
  616. _check_options(seeds, opts)
  617. # Username and password passed as kwargs override user info in URI.
  618. username = opts.get("username", username)
  619. password = opts.get("password", password)
  620. if 'socketkeepalive' in opts:
  621. warnings.warn(
  622. "The socketKeepAlive option is deprecated. It now"
  623. "defaults to true and disabling it is not recommended, see "
  624. "https://docs.mongodb.com/manual/faq/diagnostics/"
  625. "#does-tcp-keepalive-time-affect-mongodb-deployments",
  626. DeprecationWarning, stacklevel=2)
  627. self.__options = options = ClientOptions(
  628. username, password, dbase, opts)
  629. self.__default_database_name = dbase
  630. self.__lock = threading.Lock()
  631. self.__cursor_manager = None
  632. self.__kill_cursors_queue = []
  633. self._event_listeners = options.pool_options.event_listeners
  634. # Cache of existing indexes used by ensure_index ops.
  635. self.__index_cache = {}
  636. self.__index_cache_lock = threading.Lock()
  637. super(MongoClient, self).__init__(options.codec_options,
  638. options.read_preference,
  639. options.write_concern,
  640. options.read_concern)
  641. self.__all_credentials = {}
  642. creds = options.credentials
  643. if creds:
  644. self._cache_credentials(creds.source, creds)
  645. self._topology_settings = TopologySettings(
  646. seeds=seeds,
  647. replica_set_name=options.replica_set_name,
  648. pool_class=pool_class,
  649. pool_options=options.pool_options,
  650. monitor_class=monitor_class,
  651. condition_class=condition_class,
  652. local_threshold_ms=options.local_threshold_ms,
  653. server_selection_timeout=options.server_selection_timeout,
  654. server_selector=options.server_selector,
  655. heartbeat_frequency=options.heartbeat_frequency,
  656. fqdn=fqdn,
  657. direct_connection=options.direct_connection,
  658. load_balanced=options.load_balanced,
  659. )
  660. self._topology = Topology(self._topology_settings)
  661. def target():
  662. client = self_ref()
  663. if client is None:
  664. return False # Stop the executor.
  665. MongoClient._process_periodic_tasks(client)
  666. return True
  667. executor = periodic_executor.PeriodicExecutor(
  668. interval=common.KILL_CURSOR_FREQUENCY,
  669. min_interval=0.5,
  670. target=target,
  671. name="pymongo_kill_cursors_thread")
  672. # We strongly reference the executor and it weakly references us via
  673. # this closure. When the client is freed, stop the executor soon.
  674. self_ref = weakref.ref(self, executor.close)
  675. self._kill_cursors_executor = executor
  676. if connect:
  677. self._get_topology()
  678. self._encrypter = None
  679. if self.__options.auto_encryption_opts:
  680. from pymongo.encryption import _Encrypter
  681. self._encrypter = _Encrypter(
  682. self, self.__options.auto_encryption_opts)
  def _duplicate(self, **kwargs):
      """Create another client from this client's constructor arguments.

      The arguments saved by ``__init__`` are copied, and any keyword
      arguments given here override the saved values.

      :Returns:
        A new :class:`MongoClient` instance.
      """
      merged = dict(self.__init_kwargs)
      merged.update(kwargs)
      return MongoClient(**merged)
  687. def _cache_credentials(self, source, credentials, connect=False):
  688. """Save a set of authentication credentials.
  689. The credentials are used to login a socket whenever one is created.
  690. If `connect` is True, verify the credentials on the server first.
  691. """
  692. # Don't let other threads affect this call's data.
  693. all_credentials = self.__all_credentials.copy()
  694. if source in all_credentials:
  695. # Nothing to do if we already have these credentials.
  696. if credentials == all_credentials[source]:
  697. return
  698. raise OperationFailure('Another user is already authenticated '
  699. 'to this database. You must logout first.')
  700. if connect:
  701. server = self._get_topology().select_server(
  702. writable_preferred_server_selector)
  703. # get_socket() logs out of the database if logged in with old
  704. # credentials, and logs in with new ones.
  705. with server.get_socket(all_credentials) as sock_info:
  706. sock_info.authenticate(credentials)
  707. # If several threads run _cache_credentials at once, last one wins.
  708. self.__all_credentials[source] = credentials
  709. def _purge_credentials(self, source):
  710. """Purge credentials from the authentication cache."""
  711. self.__all_credentials.pop(source, None)
  712. def _cached(self, dbname, coll, index):
  713. """Test if `index` is cached."""
  714. cache = self.__index_cache
  715. now = datetime.datetime.utcnow()
  716. with self.__index_cache_lock:
  717. return (dbname in cache and
  718. coll in cache[dbname] and
  719. index in cache[dbname][coll] and
  720. now < cache[dbname][coll][index])
  721. def _cache_index(self, dbname, collection, index, cache_for):
  722. """Add an index to the index cache for ensure_index operations."""
  723. now = datetime.datetime.utcnow()
  724. expire = datetime.timedelta(seconds=cache_for) + now
  725. with self.__index_cache_lock:
  726. if dbname not in self.__index_cache:
  727. self.__index_cache[dbname] = {}
  728. self.__index_cache[dbname][collection] = {}
  729. self.__index_cache[dbname][collection][index] = expire
  730. elif collection not in self.__index_cache[dbname]:
  731. self.__index_cache[dbname][collection] = {}
  732. self.__index_cache[dbname][collection][index] = expire
  733. else:
  734. self.__index_cache[dbname][collection][index] = expire
  735. def _purge_index(self, database_name,
  736. collection_name=None, index_name=None):
  737. """Purge an index from the index cache.
  738. If `index_name` is None purge an entire collection.
  739. If `collection_name` is None purge an entire database.
  740. """
  741. with self.__index_cache_lock:
  742. if not database_name in self.__index_cache:
  743. return
  744. if collection_name is None:
  745. del self.__index_cache[database_name]
  746. return
  747. if not collection_name in self.__index_cache[database_name]:
  748. return
  749. if index_name is None:
  750. del self.__index_cache[database_name][collection_name]
  751. return
  752. if index_name in self.__index_cache[database_name][collection_name]:
  753. del self.__index_cache[database_name][collection_name][index_name]
  def _server_property(self, attr_name):
      """Return attribute `attr_name` of the current server's description.

      The server is chosen with the writable server selector.

      If the client is not connected, this will block until a connection is
      established or raise ServerSelectionTimeoutError if no server is
      available.

      Not threadsafe if used multiple times in a single method, since
      the server may change. In such cases, store a local reference to a
      ServerDescription first, then use its properties.
      """
      selected = self._topology.select_server(writable_server_selector)
      description = selected.description
      return getattr(description, attr_name)
  def watch(self, pipeline=None, full_document=None, resume_after=None,
            max_await_time_ms=None, batch_size=None, collation=None,
            start_at_operation_time=None, session=None, start_after=None):
      """Watch changes on this cluster.

      Performs an aggregation with an implicit initial ``$changeStream``
      stage and returns a
      :class:`~pymongo.change_stream.ClusterChangeStream` cursor which
      iterates over changes on all databases on this cluster.

      Introduced in MongoDB 4.0.

      .. code-block:: python

         with client.watch() as stream:
             for change in stream:
                 print(change)

      The :class:`~pymongo.change_stream.ClusterChangeStream` iterable
      blocks until the next change document is returned or an error is
      raised. If the
      :meth:`~pymongo.change_stream.ClusterChangeStream.next` method
      encounters a network error when retrieving a batch from the server,
      it will automatically attempt to recreate the cursor such that no
      change events are missed. Any error encountered during the resume
      attempt indicates there may be an outage and will be raised.

      .. code-block:: python

         try:
             with client.watch(
                     [{'$match': {'operationType': 'insert'}}]) as stream:
                 for insert_change in stream:
                     print(insert_change)
         except pymongo.errors.PyMongoError:
             # The ChangeStream encountered an unrecoverable error or the
             # resume attempt failed to recreate the cursor.
             logging.error('...')

      For a precise description of the resume process see the
      `change streams specification`_.

      :Parameters:
        - `pipeline` (optional): A list of aggregation pipeline stages to
          append to an initial ``$changeStream`` stage. Not all
          pipeline stages are valid after a ``$changeStream`` stage, see the
          MongoDB documentation on change streams for the supported stages.
        - `full_document` (optional): The fullDocument to pass as an option
          to the ``$changeStream`` stage. Allowed values: 'updateLookup'.
          When set to 'updateLookup', the change notification for partial
          updates will include both a delta describing the changes to the
          document, as well as a copy of the entire document that was
          changed from some time after the change occurred.
        - `resume_after` (optional): A resume token. If provided, the
          change stream will start returning changes that occur directly
          after the operation specified in the resume token. A resume token
          is the _id value of a change document.
        - `max_await_time_ms` (optional): The maximum time in milliseconds
          for the server to wait for changes before responding to a getMore
          operation.
        - `batch_size` (optional): The maximum number of documents to return
          per batch.
        - `collation` (optional): The :class:`~pymongo.collation.Collation`
          to use for the aggregation.
        - `start_at_operation_time` (optional): If provided, the resulting
          change stream will only return changes that occurred at or after
          the specified :class:`~bson.timestamp.Timestamp`. Requires
          MongoDB >= 4.0.
        - `session` (optional): a
          :class:`~pymongo.client_session.ClientSession`.
        - `start_after` (optional): The same as `resume_after` except that
          `start_after` can resume notifications after an invalidate event.
          This option and `resume_after` are mutually exclusive.

      :Returns:
        A :class:`~pymongo.change_stream.ClusterChangeStream` cursor.

      .. versionchanged:: 3.9
         Added the ``start_after`` parameter.

      .. versionadded:: 3.7

      .. mongodoc:: changeStreams

      .. _change streams specification:
          https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst
      """
      # The stream is opened against the ``admin`` database of this client.
      stream = ClusterChangeStream(
          self.admin, pipeline, full_document, resume_after,
          max_await_time_ms, batch_size, collation,
          start_at_operation_time, session, start_after)
      return stream
  843. @property
  844. def event_listeners(self):
  845. """The event listeners registered for this client.
  846. See :mod:`~pymongo.monitoring` for details.
  847. """
  848. return self._event_listeners.event_listeners
  849. @property
  850. def topology_description(self):
  851. """The description of the connected MongoDB deployment.
  852. >>> client.topology_description
  853. <TopologyDescription id: 605a7b04e76489833a7c6113, topology_type: ReplicaSetWithPrimary, servers: [<ServerDescription ('localhost', 27017) server_type: RSPrimary, rtt: 0.0007973677999995488>, <ServerDescription ('localhost', 27018) server_type: RSSecondary, rtt: 0.0005540556000003249>, <ServerDescription ('localhost', 27019) server_type: RSSecondary, rtt: 0.0010367483999999649>]>
  854. >>> client.topology_description.topology_type_name
  855. 'ReplicaSetWithPrimary'
  856. Note that the description is periodically updated in the background
  857. but the returned object itself is immutable. Access this property again
  858. to get a more recent
  859. :class:`~pymongo.topology_description.TopologyDescription`.
  860. :Returns:
  861. An instance of
  862. :class:`~pymongo.topology_description.TopologyDescription`.
  863. .. versionadded:: 3.12
  864. """
  865. return self._topology.description
  @property
  def address(self):
      """(host, port) of the current standalone, primary, or mongos, or None.

      Accessing :attr:`address` raises :exc:`~.errors.InvalidOperation` if
      the client is load-balancing among mongoses, since there is no single
      address. Use :attr:`nodes` instead.

      If the client is not connected, this will block until a connection is
      established or raise ServerSelectionTimeoutError if no server is
      available.

      .. versionadded:: 3.0
      """
      kind = self._topology._description.topology_type
      if kind == TOPOLOGY_TYPE.Sharded:
          raise InvalidOperation(
              'Cannot use "address" property when load balancing among'
              ' mongoses, use "nodes" instead.')

      # Only these topology types are asked for a server address; every
      # other type yields None.
      types_with_address = (TOPOLOGY_TYPE.ReplicaSetWithPrimary,
                            TOPOLOGY_TYPE.Single,
                            TOPOLOGY_TYPE.LoadBalanced)
      if kind in types_with_address:
          return self._server_property('address')
      return None
  887. @property
  888. def primary(self):
  889. """The (host, port) of the current primary of the replica set.
  890. Returns ``None`` if this client is not connected to a replica set,
  891. there is no primary, or this client was created without the
  892. `replicaSet` option.
  893. .. versionadded:: 3.0
  894. MongoClient gained this property in version 3.0 when
  895. MongoReplicaSetClient's functionality was merged in.
  896. """
  897. return self._topology.get_primary()
  898. @property
  899. def secondaries(self):
  900. """The secondary members known to this client.
  901. A sequence of (host, port) pairs. Empty if this client is not
  902. connected to a replica set, there are no visible secondaries, or this
  903. client was created without the `replicaSet` option.
  904. .. versionadded:: 3.0
  905. MongoClient gained this property in version 3.0 when
  906. MongoReplicaSetClient's functionality was merged in.
  907. """
  908. return self._topology.get_secondaries()
  909. @property
  910. def arbiters(self):
  911. """Arbiters in the replica set.
  912. A sequence of (host, port) pairs. Empty if this client is not
  913. connected to a replica set, there are no arbiters, or this client was
  914. created without the `replicaSet` option.
  915. """
  916. return self._topology.get_arbiters()
  917. @property
  918. def is_primary(self):
  919. """If this client is connected to a server that can accept writes.
  920. True if the current server is a standalone, mongos, or the primary of
  921. a replica set. If the client is not connected, this will block until a
  922. connection is established or raise ServerSelectionTimeoutError if no
  923. server is available.
  924. """
  925. return self._server_property('is_writable')
  @property
  def is_mongos(self):
      """If this client is connected to mongos.

      If the client is not connected, this will block until a connection
      is established or raise ServerSelectionTimeoutError if no server is
      available.
      """
      server_type = self._server_property('server_type')
      return server_type == SERVER_TYPE.Mongos
  933. @property
  934. def max_pool_size(self):
  935. """The maximum allowable number of concurrent connections to each
  936. connected server. Requests to a server will block if there are
  937. `maxPoolSize` outstanding connections to the requested server.
  938. Defaults to 100. Cannot be 0.
  939. When a server's pool has reached `max_pool_size`, operations for that
  940. server block waiting for a socket to be returned to the pool. If
  941. ``waitQueueTimeoutMS`` is set, a blocked operation will raise
  942. :exc:`~pymongo.errors.ConnectionFailure` after a timeout.
  943. By default ``waitQueueTimeoutMS`` is not set.
  944. """
  945. return self.__options.pool_options.max_pool_size
  946. @property
  947. def min_pool_size(self):
  948. """The minimum required number of concurrent connections that the pool
  949. will maintain to each connected server. Default is 0.
  950. """
  951. return self.__options.pool_options.min_pool_size
  952. @property
  953. def max_idle_time_ms(self):
  954. """The maximum number of milliseconds that a connection can remain
  955. idle in the pool before being removed and replaced. Defaults to
  956. `None` (no limit).
  957. """
  958. seconds = self.__options.pool_options.max_idle_time_seconds
  959. if seconds is None:
  960. return None
  961. return 1000 * seconds
  962. @property
  963. def nodes(self):
  964. """Set of all currently connected servers.
  965. .. warning:: When connected to a replica set the value of :attr:`nodes`
  966. can change over time as :class:`MongoClient`'s view of the replica
  967. set changes. :attr:`nodes` can also be an empty set when
  968. :class:`MongoClient` is first instantiated and hasn't yet connected
  969. to any servers, or a network partition causes it to lose connection
  970. to all servers.
  971. """
  972. description = self._topology.description
  973. return frozenset(s.address for s in description.known_servers)
  974. @property
  975. def max_bson_size(self):
  976. """The largest BSON object the connected server accepts in bytes.
  977. If the client is not connected, this will block until a connection is
  978. established or raise ServerSelectionTimeoutError if no server is
  979. available.
  980. """
  981. return self._server_property('max_bson_size')
  982. @property
  983. def max_message_size(self):
  984. """The largest message the connected server accepts in bytes.
  985. If the client is not connected, this will block until a connection is
  986. established or raise ServerSelectionTimeoutError if no server is
  987. available.
  988. """
  989. return self._server_property('max_message_size')
  990. @property
  991. def max_write_batch_size(self):
  992. """The maxWriteBatchSize reported by the server.
  993. If the client is not connected, this will block until a connection is
  994. established or raise ServerSelectionTimeoutError if no server is
  995. available.
  996. Returns a default value when connected to server versions prior to
  997. MongoDB 2.6.
  998. """
  999. return self._server_property('max_write_batch_size')
  1000. @property
  1001. def local_threshold_ms(self):
  1002. """The local threshold for this instance."""
  1003. return self.__options.local_threshold_ms
  1004. @property
  1005. def server_selection_timeout(self):
  1006. """The server selection timeout for this instance in seconds."""
  1007. return self.__options.server_selection_timeout
  1008. @property
  1009. def retry_writes(self):
  1010. """If this instance should retry supported write operations."""
  1011. return self.__options.retry_writes
  1012. @property
  1013. def retry_reads(self):
  1014. """If this instance should retry supported write operations."""
  1015. return self.__options.retry_reads
  def _is_writable(self):
      """Attempt to connect to a writable server, or return False.

      :Returns:
        The ``is_writable`` flag of the selected server's description, or
        False when server selection raises ConnectionFailure.
      """
      # _get_topology() starts the monitors if necessary.
      opened_topology = self._get_topology()
      try:
          candidate = opened_topology.select_server(writable_server_selector)

          # When directly connected to a secondary, arbiter, etc.,
          # select_server returns it, whatever the selector. Check
          # again if the server is writable.
          description = candidate.description
          return description.is_writable
      except ConnectionFailure:
          return False
  def _end_sessions(self, session_ids):
      """Send endSessions command(s) with the given session ids.

      The ids are sent in batches of at most ``common._MAX_END_SESSIONS``.
      Nothing is sent when the socket does not support sessions, and any
      :exc:`PyMongoError` is swallowed.
      """
      try:
          # Use SocketInfo.command directly to avoid implicitly creating
          # another session.
          with self._socket_for_reads(
                  ReadPreference.PRIMARY_PREFERRED,
                  None) as (sock_info, secondary_ok):
              if not sock_info.supports_sessions:
                  return

              batch_size = common._MAX_END_SESSIONS
              for start in range(0, len(session_ids), batch_size):
                  batch = session_ids[start:start + batch_size]
                  spec = SON([('endSessions', batch)])
                  sock_info.command(
                      'admin', spec, secondary_ok, client=self)
      except PyMongoError:
          # Drivers MUST ignore any errors returned by the endSessions
          # command.
          pass
  def close(self):
      """Cleanup client resources and disconnect from MongoDB.

      On MongoDB >= 3.6, end all server sessions created by this client by
      sending one or more endSessions commands.

      Close all sockets in the connection pools and stop the monitor threads.
      If this instance is used again it will be automatically re-opened and
      the threads restarted unless auto encryption is enabled. A client
      enabled with auto encryption cannot be used again after being closed;
      any attempt will raise :exc:`~.errors.InvalidOperation`.

      .. versionchanged:: 3.6
         End all server sessions created by this client.
      """
      # Take the session ids held by the topology and, if there are any,
      # end them on the server before the topology is closed.
      session_ids = self._topology.pop_all_sessions()
      if session_ids:
          self._end_sessions(session_ids)
      # Stop the periodic task thread and then send pending killCursor
      # requests before closing the topology.
      self._kill_cursors_executor.close()
      self._process_kill_cursors()
      self._topology.close()
      # The encrypter is only set when auto encryption options were given.
      if self._encrypter:
          # TODO: PYTHON-1921 Encrypted MongoClients cannot be re-opened.
          self._encrypter.close()
  def set_cursor_manager(self, manager_class):
      """DEPRECATED - Set this client's cursor manager.

      Raises :class:`TypeError` if `manager_class` is not a subclass of
      :class:`~pymongo.cursor_manager.CursorManager`. A cursor manager
      handles closing cursors. Different managers can implement different
      policies in terms of when to actually kill a cursor that has
      been closed.

      :Parameters:
        - `manager_class`: cursor manager to use

      .. versionchanged:: 3.3
         Deprecated, for real this time.

      .. versionchanged:: 3.0
         Undeprecated.
      """
      warnings.warn("set_cursor_manager is Deprecated",
                    DeprecationWarning, stacklevel=2)

      # The class is instantiated with this client, and the resulting
      # instance is what gets type-checked and stored.
      candidate = manager_class(self)
      if isinstance(candidate, CursorManager):
          self.__cursor_manager = candidate
          return

      raise TypeError("manager_class must be a subclass of "
                      "CursorManager")
  1093. def _get_topology(self):
  1094. """Get the internal :class:`~pymongo.topology.Topology` object.
  1095. If this client was created with "connect=False", calling _get_topology
  1096. launches the connection process in the background.
  1097. """
  1098. self._topology.open()
  1099. with self.__lock:
  1100. self._kill_cursors_executor.open()
  1101. return self._topology
  @contextlib.contextmanager
  def _get_socket(self, server, session):
      """Context manager that yields a socket for `server`.

      When `session` is in a transaction and already has a pinned
      connection, that connection is yielded. Otherwise a socket is checked
      out of `server` with this client's cached credentials.

      :Parameters:
        - `server`: the server to get a socket from.
        - `session`: the session for the operation, or a false value.

      Raises :exc:`ConfigurationError` if this client has an encrypter
      that does not bypass auto encryption and the socket's max wire
      version is below 8.
      """
      in_txn = session and session.in_transaction
      # Everything, including the caller's ``with`` body, runs inside the
      # error handler's context.
      with _MongoClientErrorHandler(self, server, session) as err_handler:
          # Reuse the pinned connection, if it exists.
          if in_txn and session._pinned_connection:
              yield session._pinned_connection
              return
          with server.get_socket(
                  self.__all_credentials, handler=err_handler) as sock_info:
              # Pin this session to the selected server or connection.
              if (in_txn and server.description.server_type in (
                      SERVER_TYPE.Mongos, SERVER_TYPE.LoadBalancer)):
                  session._pin(server, sock_info)
              # Tell the error handler which socket is in use.
              err_handler.contribute_socket(sock_info)
              # Wire version 8 is what the error message below equates
              # with MongoDB 4.2.
              if (self._encrypter and
                      not self._encrypter._bypass_auto_encryption and
                      sock_info.max_wire_version < 8):
                  raise ConfigurationError(
                      'Auto-encryption requires a minimum MongoDB version '
                      'of 4.2')
              yield sock_info
  def _select_server(self, server_selector, session, address=None):
      """Select a server to run an operation on this client.

      :Parameters:
        - `server_selector`: The server selector to use if the session is
          not pinned and no address is given.
        - `session`: The ClientSession for the next operation, or None. May
          be pinned to a mongos server address.
        - `address` (optional): Address when sending a message
          to a specific server, used for getMore.

      Raises :exc:`AutoReconnect` if a specific address is wanted but the
      topology has no server for it. Any :exc:`PyMongoError` raised while
      `session` is in a transaction is labelled
      "TransientTransactionError" and the session is unpinned before the
      error is re-raised.
      """
      try:
          topology = self._get_topology()
          if session and not session.in_transaction:
              session._transaction.reset()

          # An explicit address wins over the session's pinned address.
          target = address or (session and session._pinned_address)
          if not target:
              return topology.select_server(server_selector)

          # We're running a getMore or this session is pinned to a mongos.
          server = topology.select_server_by_address(target)
          if not server:
              raise AutoReconnect('server %s:%d no longer available'
                                  % target)
          return server
      except PyMongoError as exc:
          # Server selection errors in a transaction are transient.
          if session and session.in_transaction:
              exc._add_error_label("TransientTransactionError")
              session._unpin()
          raise
  def _socket_for_writes(self, session):
      """Return a socket context manager for a writable server.

      The server is chosen with the writable server selector and the
      result of :meth:`_get_socket` for it is returned.
      """
      writable_server = self._select_server(writable_server_selector, session)
      return self._get_socket(writable_server, session)
  @contextlib.contextmanager
  def _secondaryok_for_server(self, read_preference, server, session):
      """Context manager yielding ``(sock_info, secondary_ok)`` for `server`.

      :Parameters:
        - `read_preference`: the read preference; must not be None.
        - `server`: the already selected server to get a socket from.
        - `session`: the session for the operation, or None.
      """
      assert read_preference is not None, "read_preference must not be None"
      # Server Selection Spec: "secondaryOK must be sent to mongods with
      # topology type Single. If the server type is Mongos, follow the
      # rules for passing read preference to mongos, even for topology
      # type Single."
      # Thread safe: if the type is single it cannot change.
      topology = self._get_topology()
      is_single = (
          topology.description.topology_type == TOPOLOGY_TYPE.Single)

      with self._get_socket(server, session) as sock_info:
          if is_single and not sock_info.is_mongos:
              secondary_ok = True
          else:
              secondary_ok = read_preference != ReadPreference.PRIMARY
          yield sock_info, secondary_ok
  @contextlib.contextmanager
  def _socket_for_reads(self, read_preference, session):
      """Context manager yielding ``(sock_info, secondary_ok)`` for a read.

      A server matching `read_preference` is selected and a socket for it
      is yielded together with the secondary_ok flag.

      :Parameters:
        - `read_preference`: the read preference; must not be None.
        - `session`: the session for the operation, or None.
      """
      assert read_preference is not None, "read_preference must not be None"
      # Server Selection Spec: "secondaryOK must be sent to mongods with
      # topology type Single. If the server type is Mongos, follow the
      # rules for passing read preference to mongos, even for topology
      # type Single."
      # Thread safe: if the type is single it cannot change.
      topology = self._get_topology()
      is_single = (
          topology.description.topology_type == TOPOLOGY_TYPE.Single)
      selected = self._select_server(read_preference, session)

      with self._get_socket(selected, session) as sock_info:
          if is_single and not sock_info.is_mongos:
              secondary_ok = True
          else:
              secondary_ok = read_preference != ReadPreference.PRIMARY
          yield sock_info, secondary_ok
  1188. def _should_pin_cursor(self, session):
  1189. return (self.__options.load_balanced and
  1190. not (session and session.in_transaction))
  def _run_operation(self, operation, unpack_res, address=None):
      """Run a _Query/_GetMore operation and return a Response.

      When the operation carries a socket manager, the operation runs on
      that manager's socket while holding its lock. Otherwise it goes
      through :meth:`_retryable_read`, and only ``message._Query``
      operations are marked retryable.

      :Parameters:
        - `operation`: a _Query or _GetMore object.
        - `unpack_res`: A callable that decodes the wire protocol response.
        - `address` (optional): Optional address when sending a message
          to a specific server, used for getMore.
      """
      if operation.sock_mgr:
          server = self._select_server(
              operation.read_preference, operation.session, address=address)

          with operation.sock_mgr.lock:
              with _MongoClientErrorHandler(
                      self, server, operation.session) as err_handler:
                  err_handler.contribute_socket(operation.sock_mgr.sock)
                  return server.run_operation(
                      operation.sock_mgr.sock, operation, True,
                      self._event_listeners, unpack_res)

      def _run_on_socket(session, server, sock_info, secondary_ok):
          # `session` is part of the callback signature but is not used.
          return server.run_operation(
              sock_info, operation, secondary_ok, self._event_listeners,
              unpack_res)

      is_query = isinstance(operation, message._Query)
      return self._retryable_read(
          _run_on_socket, operation.read_preference, operation.session,
          address=address, retryable=is_query)
  1216. def _retry_with_session(self, retryable, func, session, bulk):
  1217. """Execute an operation with at most one consecutive retries
  1218. Returns func()'s return value on success. On error retries the same
  1219. command once.
  1220. Re-raises any exception thrown by func().
  1221. """
  1222. retryable = (retryable and self.retry_writes
  1223. and session and not session.in_transaction)
  1224. return self._retry_internal(retryable, func, session, bulk)
  def _retry_internal(self, retryable, func, session, bulk):
      """Internal retryable write helper.

      Selects a writable server, gets a socket and calls
      ``func(session, sock_info, retryable)``, returning its result. A
      failed attempt is retried once when `retryable` is truthy and the
      error carries the "RetryableWriteError" label.

      :Parameters:
        - `retryable`: whether the operation may be retried.
        - `func`: callable invoked as ``func(session, sock_info, retryable)``.
        - `session`: the session for the operation, or None.
        - `bulk`: optional object whose ``retrying`` and
          ``started_retryable_write`` attributes track retry state; when
          it is falsy a local flag is used instead.
      """
      max_wire_version = 0
      last_error = None
      retrying = False

      def is_retrying():
          # The retry flag lives on `bulk` when one is given, otherwise in
          # the enclosing function's local.
          return bulk.retrying if bulk else retrying
      # Increment the transaction id up front to ensure any retry attempt
      # will use the proper txnNumber, even if server or socket selection
      # fails before the command can be sent.
      if retryable and session and not session.in_transaction:
          session._start_retryable_write()
          if bulk:
              bulk.started_retryable_write = True

      # Each pass is one attempt; the loop is left by returning func()'s
      # result or by raising.
      while True:
          try:
              server = self._select_server(writable_server_selector, session)
              supports_session = (
                  session is not None and
                  server.description.retryable_writes_supported)
              with self._get_socket(server, session) as sock_info:
                  # Remembered for labelling errors in the handler below.
                  max_wire_version = sock_info.max_wire_version
                  if retryable and not supports_session:
                      if is_retrying():
                          # A retry is not possible because this server does
                          # not support sessions; raise the last error.
                          raise last_error
                      # First attempt: run the operation, but without
                      # retry support.
                      retryable = False
                  return func(session, sock_info, retryable)
          except ServerSelectionTimeoutError:
              if is_retrying():
                  # The application may think the write was never attempted
                  # if we raise ServerSelectionTimeoutError on the retry
                  # attempt. Raise the original exception instead.
                  raise last_error
              # A ServerSelectionTimeoutError error indicates that there may
              # be a persistent outage. Attempting to retry in this case will
              # most likely be a waste of time.
              raise
          except PyMongoError as exc:
              if not retryable:
                  raise
              # Add the RetryableWriteError label, if applicable.
              _add_retryable_write_error(exc, max_wire_version)
              retryable_error = exc.has_error_label("RetryableWriteError")
              if retryable_error:
                  # NOTE(review): assumes `session` is not None whenever
                  # `retryable` is truthy - confirm against callers.
                  session._unpin()
              # Give up on the second failure or on a non-retryable error.
              if is_retrying() or not retryable_error:
                  raise
              # Record that the next pass is the retry and keep the error
              # so it can be raised if the retry cannot be attempted.
              if bulk:
                  bulk.retrying = True
              else:
                  retrying = True
              last_error = exc
  def _retryable_read(self, func, read_pref, session, address=None,
                      retryable=True):
      """Run a read with ``func``, retrying at most once.

      :Parameters:
        - `func`: callable invoked as
          ``func(session, server, sock_info, secondary_ok)``; its return
          value is returned to the caller.
        - `read_pref`: read preference used to select the server.
        - `session`: the session to use, or ``None``.
        - `address` (optional): forwarded to server selection.
        - `retryable` (optional): whether a retry may be attempted at all.

      A retry is only attempted when ``retryable`` is true, this client has
      ``retry_reads`` enabled, the session (if any) is not in a
      transaction, and the selected server supports retryable reads. Only
      ``ConnectionFailure`` and ``OperationFailure`` with a code listed in
      ``helpers._RETRYABLE_ERROR_CODES`` are retried. If the retry attempt
      cannot be made, the error from the first attempt is raised instead.
      """
      retryable = (retryable and
                   self.retry_reads
                   and not (session and session.in_transaction))
      last_error = None
      retrying = False

      while True:
          try:
              server = self._select_server(
                  read_pref, session, address=address)
              if not server.description.retryable_reads_supported:
                  retryable = False
              with self._secondaryok_for_server(
                      read_pref, server, session) as (
                          sock_info, secondary_ok):
                  if retrying and not retryable:
                      # A retry is not possible because this server does
                      # not support retryable reads, raise the last error.
                      raise last_error
                  return func(session, server, sock_info, secondary_ok)
          except ServerSelectionTimeoutError:
              if retrying:
                  # The application may think the read was never attempted
                  # if we raise ServerSelectionTimeoutError on the retry
                  # attempt. Raise the original exception instead.
                  raise last_error
              # A ServerSelectionTimeoutError error indicates that there may
              # be a persistent outage. Attempting to retry in this case will
              # most likely be a waste of time.
              raise
          except ConnectionFailure as exc:
              # Network errors are retried once when retries are allowed.
              if not retryable or retrying:
                  raise
              retrying = True
              last_error = exc
          except OperationFailure as exc:
              if not retryable or retrying:
                  raise
              # Only server errors with a known retryable code are retried.
              if exc.code not in helpers._RETRYABLE_ERROR_CODES:
                  raise
              retrying = True
              last_error = exc
  1327. def _retryable_write(self, retryable, func, session):
  1328. """Internal retryable write helper."""
  1329. with self._tmp_session(session) as s:
  1330. return self._retry_with_session(retryable, func, s, None)
  1331. def _handle_getlasterror(self, address, error_msg):
  1332. """Clear our pool for a server, mark it Unknown, and check it soon."""
  1333. self._topology.handle_getlasterror(address, error_msg)
  1334. def __eq__(self, other):
  1335. if isinstance(other, self.__class__):
  1336. return self.address == other.address
  1337. return NotImplemented
  1338. def __ne__(self, other):
  1339. return not self == other
  1340. def __hash__(self):
  1341. return hash(self.address)
  1342. def _repr_helper(self):
  1343. def option_repr(option, value):
  1344. """Fix options whose __repr__ isn't usable in a constructor."""
  1345. if option == 'document_class':
  1346. if value is dict:
  1347. return 'document_class=dict'
  1348. else:
  1349. return 'document_class=%s.%s' % (value.__module__,
  1350. value.__name__)
  1351. if option in common.TIMEOUT_OPTIONS and value is not None:
  1352. return "%s=%s" % (option, int(value * 1000))
  1353. return '%s=%r' % (option, value)
  1354. # Host first...
  1355. options = ['host=%r' % [
  1356. '%s:%d' % (host, port) if port is not None else host
  1357. for host, port in self._topology_settings.seeds]]
  1358. # ... then everything in self._constructor_args...
  1359. options.extend(
  1360. option_repr(key, self.__options._options[key])
  1361. for key in self._constructor_args)
  1362. # ... then everything else.
  1363. options.extend(
  1364. option_repr(key, self.__options._options[key])
  1365. for key in self.__options._options
  1366. if key not in set(self._constructor_args)
  1367. and key != 'username' and key != 'password')
  1368. return ', '.join(options)
  1369. def __repr__(self):
  1370. return ("MongoClient(%s)" % (self._repr_helper(),))
  1371. def __getattr__(self, name):
  1372. """Get a database by name.
  1373. Raises :class:`~pymongo.errors.InvalidName` if an invalid
  1374. database name is used.
  1375. :Parameters:
  1376. - `name`: the name of the database to get
  1377. """
  1378. if name.startswith('_'):
  1379. raise AttributeError(
  1380. "MongoClient has no attribute %r. To access the %s"
  1381. " database, use client[%r]." % (name, name, name))
  1382. return self.__getitem__(name)
  def __getitem__(self, name):
      """Get a database by dictionary-style access.

      Raises :class:`~pymongo.errors.InvalidName` if an invalid
      database name is used.

      :Parameters:
        - `name`: the name of the database to get
      """
      db = database.Database(self, name)
      return db
  def close_cursor(self, cursor_id, address=None):
      """DEPRECATED - Schedule a cursor to be killed soon.

      Emits a :class:`DeprecationWarning`, then raises :class:`TypeError`
      if `cursor_id` is not an instance of ``(int, long)``. What closing
      the cursor actually means depends on this client's cursor manager.

      This method may be called from a :class:`~pymongo.cursor.Cursor`
      destructor during garbage collection, so it isn't safe to take a
      lock or do network I/O. Instead, the cursor is scheduled to be
      closed soon on a background thread.

      :Parameters:
        - `cursor_id`: id of cursor to close
        - `address` (optional): (host, port) pair of the cursor's server.
          If it is not provided, the client attempts to close the cursor on
          the primary or standalone, or a mongos server.

      .. versionchanged:: 3.7
         Deprecated.

      .. versionchanged:: 3.0
         Added ``address`` parameter.
      """
      warnings.warn(
          "close_cursor is deprecated.", DeprecationWarning, stacklevel=2)
      if isinstance(cursor_id, integer_types):
          self._close_cursor_soon(cursor_id, address)
          return
      raise TypeError("cursor_id must be an instance of (int, long)")
  1417. def _cleanup_cursor(self, locks_allowed, cursor_id, address, sock_mgr,
  1418. session, explicit_session):
  1419. """Cleanup a cursor from cursor.close() or __del__.
  1420. This method handles cleanup for Cursors/CommandCursors including any
  1421. pinned connection or implicit session attached at the time the cursor
  1422. was closed or garbage collected.
  1423. :Parameters:
  1424. - `locks_allowed`: True if we are allowed to acquire locks.
  1425. - `cursor_id`: The cursor id which may be 0.
  1426. - `address`: The _CursorAddress.
  1427. - `sock_mgr`: The _SocketManager for the pinned connection or None.
  1428. - `session`: The cursor's session.
  1429. - `explicit_session`: True if the session was passed explicitly.
  1430. """
  1431. if locks_allowed:
  1432. if cursor_id:
  1433. if sock_mgr and sock_mgr.more_to_come:
  1434. # If this is an exhaust cursor and we haven't completely
  1435. # exhausted the result set we *must* close the socket
  1436. # to stop the server from sending more data.
  1437. sock_mgr.sock.close_socket(
  1438. ConnectionClosedReason.ERROR)
  1439. else:
  1440. self._close_cursor_now(
  1441. cursor_id, address, session=session,
  1442. sock_mgr=sock_mgr)
  1443. if sock_mgr:
  1444. sock_mgr.close()
  1445. else:
  1446. # The cursor will be closed later in a different session.
  1447. if cursor_id or sock_mgr:
  1448. self._close_cursor_soon(cursor_id, address, sock_mgr)
  1449. if session and not explicit_session:
  1450. session._end_session(lock=locks_allowed)
  1451. def _close_cursor_soon(self, cursor_id, address, sock_mgr=None):
  1452. """Request that a cursor and/or connection be cleaned up soon
  1453. What closing the cursor actually means depends on this client's
  1454. cursor manager. If there is none, the cursor is closed asynchronously
  1455. on a background thread.
  1456. """
  1457. if self.__cursor_manager is not None:
  1458. self.__cursor_manager.close(cursor_id, address)
  1459. else:
  1460. self.__kill_cursors_queue.append((address, cursor_id, sock_mgr))
  def _close_cursor_now(self, cursor_id, address=None, session=None,
                        sock_mgr=None):
      """Kill the cursor with the given id right away.

      Raises :class:`TypeError` if `cursor_id` is not an integer. When
      this client has a cursor manager, closing is delegated to it.
      Otherwise the cursor is killed synchronously on the current thread:
      on the pinned connection (while holding its lock) when ``sock_mgr``
      is given, or on a connection selected for ``address``. If that
      attempt raises a PyMongoError, the cursor is queued to be closed
      later instead.
      """
      if not isinstance(cursor_id, integer_types):
          raise TypeError("cursor_id must be an instance of (int, long)")

      manager = self.__cursor_manager
      if manager is not None:
          manager.close(cursor_id, address)
          return

      try:
          if not sock_mgr:
              self._kill_cursors(
                  [cursor_id], address, self._get_topology(), session)
          else:
              with sock_mgr.lock:
                  # Cursor is pinned to LB outside of a transaction.
                  self._kill_cursor_impl(
                      [cursor_id], address, session, sock_mgr.sock)
      except PyMongoError:
          # Make another attempt to kill the cursor later.
          self._close_cursor_soon(cursor_id, address)
  def kill_cursors(self, cursor_ids, address=None):
      """DEPRECATED - Schedule the cursors with the given ids to be killed.

      Emits a :class:`DeprecationWarning`, then raises :class:`TypeError`
      if `cursor_ids` is not an instance of ``list``.

      :Parameters:
        - `cursor_ids`: list of cursor ids to kill
        - `address` (optional): (host, port) pair of the cursor's server.
          If it is not provided, the client attempts to close the cursor on
          the primary or standalone, or a mongos server.

      .. versionchanged:: 3.3
         Deprecated.

      .. versionchanged:: 3.0
         Now accepts an `address` argument. Schedules the cursors to be
         closed on a background thread instead of sending the message
         immediately.
      """
      warnings.warn(
          "kill_cursors is deprecated.", DeprecationWarning, stacklevel=2)
      if not isinstance(cursor_ids, list):
          raise TypeError("cursor_ids must be a list")

      # Each append is "atomic", so no lock is needed.
      enqueue = self.__kill_cursors_queue.append
      for cursor_id in cursor_ids:
          enqueue((address, cursor_id, None))
  1510. def _kill_cursors(self, cursor_ids, address, topology, session):
  1511. """Send a kill cursors message with the given ids."""
  1512. if address:
  1513. # address could be a tuple or _CursorAddress, but
  1514. # select_server_by_address needs (host, port).
  1515. server = topology.select_server_by_address(tuple(address))
  1516. else:
  1517. # Application called close_cursor() with no address.
  1518. server = topology.select_server(writable_server_selector)
  1519. with self._get_socket(server, session) as sock_info:
  1520. self._kill_cursor_impl(cursor_ids, address, session, sock_info)
  def _kill_cursor_impl(self, cursor_ids, address, session, sock_info):
      """Kill ``cursor_ids`` over the connection ``sock_info``.

      When ``address`` carries a ``namespace`` attribute and the connection
      reports wire version 4 or newer, a ``killCursors`` command is run in
      that namespace's database. Otherwise a legacy kill-cursors message is
      built with ``message.kill_cursors`` and sent on the connection;
      command-monitoring events for it are published by hand, including a
      fabricated success reply.

      :Parameters:
        - `cursor_ids`: list of cursor ids to kill.
        - `address`: a ``_CursorAddress``, a plain ``(host, port)`` tuple,
          or ``None`` when the caller did not name a server.
        - `session`: session passed along with the ``killCursors`` command.
        - `sock_info`: the connection to send on.

      Re-raises any exception raised while sending the legacy message,
      after publishing a command-failure event when monitoring is enabled.
      """
      listeners = self._event_listeners
      publish = listeners.enabled_for_commands

      try:
          namespace = address.namespace
          db, coll = namespace.split('.', 1)
      except AttributeError:
          # A plain tuple (or None) has no namespace to run a command in.
          namespace = None
          db = coll = "OP_KILL_CURSORS"

      spec = SON([('killCursors', coll), ('cursors', cursor_ids)])
      if sock_info.max_wire_version >= 4 and namespace is not None:
          sock_info.command(db, spec, session=session, client=self)
          return

      if publish:
          # address could be a tuple or _CursorAddress. We always want to
          # publish a plain tuple to match the rest of the monitoring API.
          # Without an address, tuple(None) would raise TypeError before
          # the message is ever sent, so report the connection's own
          # address instead.
          # NOTE(review): relies on sock_info exposing ``address`` - confirm.
          if address is not None:
              event_address = tuple(address)
          else:
              event_address = sock_info.address
          start = datetime.datetime.now()
      request_id, msg = message.kill_cursors(cursor_ids)
      if publish:
          # Time spent building the message counts towards the duration.
          duration = datetime.datetime.now() - start
          listeners.publish_command_start(
              spec, db, request_id, event_address,
              service_id=sock_info.service_id)
          start = datetime.datetime.now()

      try:
          sock_info.send_message(msg, 0)
      except Exception as exc:
          if publish:
              dur = ((datetime.datetime.now() - start) + duration)
              listeners.publish_command_failure(
                  dur, message._convert_exception(exc),
                  'killCursors', request_id,
                  event_address, service_id=sock_info.service_id)
          raise

      if publish:
          duration = ((datetime.datetime.now() - start) + duration)
          # OP_KILL_CURSORS returns no reply, fake one.
          reply = {'cursorsUnknown': cursor_ids, 'ok': 1}
          listeners.publish_command_success(
              duration, reply, 'killCursors', request_id,
              event_address, service_id=sock_info.service_id)
  1564. def _process_kill_cursors(self):
  1565. """Process any pending kill cursors requests."""
  1566. address_to_cursor_ids = defaultdict(list)
  1567. pinned_cursors = []
  1568. # Other threads or the GC may append to the queue concurrently.
  1569. while True:
  1570. try:
  1571. address, cursor_id, sock_mgr = self.__kill_cursors_queue.pop()
  1572. except IndexError:
  1573. break
  1574. if sock_mgr:
  1575. pinned_cursors.append((address, cursor_id, sock_mgr))
  1576. else:
  1577. address_to_cursor_ids[address].append(cursor_id)
  1578. for address, cursor_id, sock_mgr in pinned_cursors:
  1579. try:
  1580. self._cleanup_cursor(True, cursor_id, address, sock_mgr,
  1581. None, False)
  1582. except Exception:
  1583. helpers._handle_exception()
  1584. # Don't re-open topology if it's closed and there's no pending cursors.
  1585. if address_to_cursor_ids:
  1586. topology = self._get_topology()
  1587. for address, cursor_ids in address_to_cursor_ids.items():
  1588. try:
  1589. self._kill_cursors(
  1590. cursor_ids, address, topology, session=None)
  1591. except Exception:
  1592. helpers._handle_exception()
  1593. # This method is run periodically by a background thread.
  1594. def _process_periodic_tasks(self):
  1595. """Process any pending kill cursors requests and
  1596. maintain connection pool parameters."""
  1597. self._process_kill_cursors()
  1598. try:
  1599. self._topology.update_pool(self.__all_credentials)
  1600. except Exception:
  1601. helpers._handle_exception()
  def __start_session(self, implicit, **kwargs):
      """Create a ClientSession backed by a server session from the pool.

      ``implicit`` is passed through to the ClientSession; ``kwargs`` are
      the :class:`~pymongo.client_session.SessionOptions` arguments.
      Raises InvalidOperation when more than one distinct credential is
      registered on this client.
      """
      # Driver Sessions Spec: "If startSession is called when multiple users
      # are authenticated drivers MUST raise an error with the error message
      # 'Cannot call startSession when multiple users are authenticated.'"
      credentials = set(self.__all_credentials.values())
      if len(credentials) > 1:
          raise InvalidOperation("Cannot call start_session when"
                                 " multiple users are authenticated")

      # Raises ConfigurationError if sessions are not supported.
      server_session = self._get_server_session()
      options = client_session.SessionOptions(**kwargs)
      return client_session.ClientSession(
          self, server_session, options, credentials, implicit)
  def start_session(self,
                    causal_consistency=None,
                    default_transaction_options=None,
                    snapshot=False):
      """Start an explicit logical session.

      The parameters are the same as those of
      :class:`~pymongo.client_session.SessionOptions`. See the
      :mod:`~pymongo.client_session` module for details and examples.

      Requires MongoDB 3.6. It is an error to call :meth:`start_session`
      if this client has been authenticated to multiple databases using the
      deprecated method :meth:`~pymongo.database.Database.authenticate`.

      A :class:`~pymongo.client_session.ClientSession` may only be used with
      the MongoClient that started it. :class:`ClientSession` instances are
      **not thread-safe or fork-safe**. They can only be used by one thread
      or process at a time. A single :class:`ClientSession` cannot be used
      to run multiple operations concurrently.

      :Returns:
        An instance of :class:`~pymongo.client_session.ClientSession`.

      .. versionadded:: 3.6
      """
      session_options = dict(
          causal_consistency=causal_consistency,
          default_transaction_options=default_transaction_options,
          snapshot=snapshot)
      # False: a session started through this method is not implicit.
      return self.__start_session(False, **session_options)
  1640. def _get_server_session(self):
  1641. """Internal: start or resume a _ServerSession."""
  1642. return self._topology.get_server_session()
  1643. def _return_server_session(self, server_session, lock):
  1644. """Internal: return a _ServerSession to the pool."""
  1645. return self._topology.return_server_session(server_session, lock)
  1646. def _ensure_session(self, session=None):
  1647. """If provided session is None, lend a temporary session."""
  1648. if session:
  1649. return session
  1650. try:
  1651. # Don't make implicit sessions causally consistent. Applications
  1652. # should always opt-in.
  1653. return self.__start_session(True, causal_consistency=False)
  1654. except (ConfigurationError, InvalidOperation):
  1655. # Sessions not supported, or multiple users authenticated.
  1656. return None
  1657. @contextlib.contextmanager
  1658. def _tmp_session(self, session, close=True):
  1659. """If provided session is None, lend a temporary session."""
  1660. if session:
  1661. # Don't call end_session.
  1662. yield session
  1663. return
  1664. s = self._ensure_session(session)
  1665. if s:
  1666. try:
  1667. yield s
  1668. except Exception as exc:
  1669. if isinstance(exc, ConnectionFailure):
  1670. s._server_session.mark_dirty()
  1671. # Always call end_session on error.
  1672. s.end_session()
  1673. raise
  1674. finally:
  1675. # Call end_session when we exit this scope.
  1676. if close:
  1677. s.end_session()
  1678. else:
  1679. yield None
  1680. def _send_cluster_time(self, command, session):
  1681. topology_time = self._topology.max_cluster_time()
  1682. session_time = session.cluster_time if session else None
  1683. if topology_time and session_time:
  1684. if topology_time['clusterTime'] > session_time['clusterTime']:
  1685. cluster_time = topology_time
  1686. else:
  1687. cluster_time = session_time
  1688. else:
  1689. cluster_time = topology_time or session_time
  1690. if cluster_time:
  1691. command['$clusterTime'] = cluster_time
  1692. def _process_response(self, reply, session):
  1693. self._topology.receive_cluster_time(reply.get('$clusterTime'))
  1694. if session is not None:
  1695. session._process_response(reply)
  def server_info(self, session=None):
      """Get information about the MongoDB server we're connected to.

      Runs the ``buildinfo`` command against the ``admin`` database with a
      primary read preference and returns its result.

      :Parameters:
        - `session` (optional): a
          :class:`~pymongo.client_session.ClientSession`.

      .. versionchanged:: 3.6
         Added ``session`` parameter.
      """
      admin_db = self.admin
      return admin_db.command(
          "buildinfo",
          read_preference=ReadPreference.PRIMARY,
          session=session)
  def list_databases(self, session=None, **kwargs):
      """Get a cursor over the databases of the connected server.

      :Parameters:
        - `session` (optional): a
          :class:`~pymongo.client_session.ClientSession`.
        - `**kwargs` (optional): Optional parameters of the
          `listDatabases command
          <https://docs.mongodb.com/manual/reference/command/listDatabases/>`_
          can be passed as keyword arguments to this method. The supported
          options differ by server version.

      :Returns:
        An instance of :class:`~pymongo.command_cursor.CommandCursor`.

      .. versionadded:: 3.6
      """
      command = SON([("listDatabases", 1)])
      command.update(kwargs)
      admin_db = self._database_default_options("admin")
      reply = admin_db._retryable_read_command(command, session=session)
      # listDatabases doesn't return a cursor (yet), so wrap the reply's
      # "databases" list in a cursor-shaped document.
      fake_cursor = {
          "id": 0,
          "firstBatch": reply["databases"],
          "ns": "admin.$cmd",
      }
      return CommandCursor(admin_db["$cmd"], fake_cursor, None)
  def list_database_names(self, session=None):
      """Get a list of the names of all databases on the connected server.

      Calls :meth:`list_databases` with ``nameOnly=True`` and collects the
      ``name`` field of every returned document.

      :Parameters:
        - `session` (optional): a
          :class:`~pymongo.client_session.ClientSession`.

      .. versionadded:: 3.6
      """
      cursor = self.list_databases(session, nameOnly=True)
      return [entry["name"] for entry in cursor]
  def database_names(self, session=None):
      """**DEPRECATED**: Get a list of the names of all databases on the
      connected server.

      Emits a :class:`DeprecationWarning` and returns the result of
      :meth:`list_database_names`.

      :Parameters:
        - `session` (optional): a
          :class:`~pymongo.client_session.ClientSession`.

      .. versionchanged:: 3.7
         Deprecated. Use :meth:`list_database_names` instead.

      .. versionchanged:: 3.6
         Added ``session`` parameter.
      """
      warnings.warn(
          "database_names is deprecated. Use list_database_names "
          "instead.",
          DeprecationWarning,
          stacklevel=2)
      names = self.list_database_names(session)
      return names
  def drop_database(self, name_or_database, session=None):
      """Drop a database.

      Raises :class:`TypeError` if `name_or_database` is not an instance of
      :class:`basestring` (:class:`str` in python 3) or
      :class:`~pymongo.database.Database`.

      :Parameters:
        - `name_or_database`: the name of a database to drop, or a
          :class:`~pymongo.database.Database` instance representing the
          database to drop
        - `session` (optional): a
          :class:`~pymongo.client_session.ClientSession`.

      .. versionchanged:: 3.6
         Added ``session`` parameter.

      .. note:: The :attr:`~pymongo.mongo_client.MongoClient.write_concern` of
         this client is automatically applied to this operation when using
         MongoDB >= 3.4.

      .. versionchanged:: 3.4
         Apply this client's write concern automatically to this operation
         when connected to MongoDB >= 3.4.
      """
      target = name_or_database
      if isinstance(target, database.Database):
          # A Database instance was given; only its name is needed.
          target = target.name

      if not isinstance(target, string_type):
          raise TypeError("name_or_database must be an instance "
                          "of %s or a Database" % (string_type.__name__,))

      self._purge_index(target)

      with self._socket_for_writes(session) as sock_info:
          self[target]._command(
              sock_info,
              "dropDatabase",
              read_preference=ReadPreference.PRIMARY,
              write_concern=self._write_concern_for(session),
              parse_write_concern_error=True,
              session=session)
  def get_default_database(self, default=None, codec_options=None,
                           read_preference=None, write_concern=None,
                           read_concern=None):
      """Get the database named in the MongoDB connection URI.

      >>> uri = 'mongodb://host/my_database'
      >>> client = MongoClient(uri)
      >>> db = client.get_default_database()
      >>> assert db.name == 'my_database'
      >>> db = client.get_database()
      >>> assert db.name == 'my_database'

      Useful in scripts where you want to choose which database to use
      based only on the URI in a configuration file. A name from the URI
      takes precedence over `default`. Raises ConfigurationError when the
      URI named no database and no `default` was given.

      :Parameters:
        - `default` (optional): the database name to use if no database name
          was provided in the URI.
        - `codec_options` (optional): An instance of
          :class:`~bson.codec_options.CodecOptions`. If ``None`` (the
          default) the :attr:`codec_options` of this :class:`MongoClient` is
          used.
        - `read_preference` (optional): The read preference to use. If
          ``None`` (the default) the :attr:`read_preference` of this
          :class:`MongoClient` is used. See :mod:`~pymongo.read_preferences`
          for options.
        - `write_concern` (optional): An instance of
          :class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the
          default) the :attr:`write_concern` of this :class:`MongoClient` is
          used.
        - `read_concern` (optional): An instance of
          :class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the
          default) the :attr:`read_concern` of this :class:`MongoClient` is
          used.

      .. versionchanged:: 3.8
         Undeprecated. Added the ``default``, ``codec_options``,
         ``read_preference``, ``write_concern`` and ``read_concern``
         parameters.

      .. versionchanged:: 3.5
         Deprecated, use :meth:`get_database` instead.
      """
      configured = self.__default_database_name
      if configured is None and default is None:
          raise ConfigurationError(
              'No default database name defined or provided.')

      chosen = configured or default
      return database.Database(
          self, chosen, codec_options,
          read_preference, write_concern, read_concern)
  def get_database(self, name=None, codec_options=None, read_preference=None,
                   write_concern=None, read_concern=None):
      """Get a :class:`~pymongo.database.Database` with the given name and
      options.

      Useful for creating a :class:`~pymongo.database.Database` with
      different codec options, read preference, and/or write concern from
      this :class:`MongoClient`.

        >>> client.read_preference
        Primary()
        >>> db1 = client.test
        >>> db1.read_preference
        Primary()
        >>> from pymongo import ReadPreference
        >>> db2 = client.get_database(
        ...     'test', read_preference=ReadPreference.SECONDARY)
        >>> db2.read_preference
        Secondary(tag_sets=None)

      Raises ConfigurationError when `name` is ``None`` and the connection
      URI named no database.

      :Parameters:
        - `name` (optional): The name of the database - a string. If ``None``
          (the default) the database named in the MongoDB connection URI is
          returned.
        - `codec_options` (optional): An instance of
          :class:`~bson.codec_options.CodecOptions`. If ``None`` (the
          default) the :attr:`codec_options` of this :class:`MongoClient` is
          used.
        - `read_preference` (optional): The read preference to use. If
          ``None`` (the default) the :attr:`read_preference` of this
          :class:`MongoClient` is used. See :mod:`~pymongo.read_preferences`
          for options.
        - `write_concern` (optional): An instance of
          :class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the
          default) the :attr:`write_concern` of this :class:`MongoClient` is
          used.
        - `read_concern` (optional): An instance of
          :class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the
          default) the :attr:`read_concern` of this :class:`MongoClient` is
          used.

      .. versionchanged:: 3.5
         The `name` parameter is now optional, defaulting to the database
         named in the MongoDB connection URI.
      """
      if name is None:
          # Fall back to the database named in the connection URI.
          name = self.__default_database_name
          if name is None:
              raise ConfigurationError('No default database defined')

      return database.Database(
          self, name, codec_options,
          read_preference, write_concern, read_concern)
  def _database_default_options(self, name):
      """Get the named Database with default settings.

      The database is created with the default codec options, a primary
      read preference and the default write concern, regardless of this
      client's own settings for those three options.
      """
      defaults = dict(
          codec_options=DEFAULT_CODEC_OPTIONS,
          read_preference=ReadPreference.PRIMARY,
          write_concern=DEFAULT_WRITE_CONCERN)
      return self.get_database(name, **defaults)
  @property
  def is_locked(self):
      """**DEPRECATED**: Is this server locked? While locked, all write
      operations are blocked, although read operations may still be allowed.
      Use :meth:`unlock` to unlock.

      Deprecated. Users of MongoDB version 3.2 or newer can run the
      `currentOp command`_ directly with
      :meth:`~pymongo.database.Database.command`::

          is_locked = client.admin.command('currentOp').get('fsyncLock')

      Users of MongoDB version 2.6 and 3.0 can query the "inprog" virtual
      collection::

          is_locked = client.admin["$cmd.sys.inprog"].find_one().get('fsyncLock')

      .. versionchanged:: 3.11
         Deprecated.

      .. _currentOp command: https://docs.mongodb.com/manual/reference/command/currentOp/
      """
      warnings.warn(
          "is_locked is deprecated. See the documentation for "
          "more information.",
          DeprecationWarning,
          stacklevel=2)
      admin_db = self._database_default_options('admin')
      current_ops = admin_db._current_op()
      # A missing 'fsyncLock' field is reported as "not locked".
      return bool(current_ops.get('fsyncLock', 0))
  def fsync(self, **kwargs):
      """**DEPRECATED**: Flush all pending writes to datafiles.

      Emits a :class:`DeprecationWarning` and runs the ``fsync`` command
      against the ``admin`` database with a primary read preference.

      Optional parameters can be passed as keyword arguments:
        - `lock`: If True lock the server to disallow writes.
        - `async`: If True don't block while synchronizing.
        - `session` (optional): a
          :class:`~pymongo.client_session.ClientSession`.

      .. note:: Starting with Python 3.7 `async` is a reserved keyword.
        The async option to the fsync command can be passed using a
        dictionary instead::

          options = {'async': True}
          client.fsync(**options)

      Deprecated. Run the `fsync command`_ directly with
      :meth:`~pymongo.database.Database.command` instead. For example::

          client.admin.command('fsync', lock=True)

      .. versionchanged:: 3.11
         Deprecated.

      .. versionchanged:: 3.6
         Added ``session`` parameter.

      .. warning:: `async` and `lock` can not be used together.

      .. warning:: MongoDB does not support the `async` option
                   on Windows and will raise an exception on that
                   platform.

      .. _fsync command: https://docs.mongodb.com/manual/reference/command/fsync/
      """
      warnings.warn(
          "fsync is deprecated. Use "
          "client.admin.command('fsync') instead.",
          DeprecationWarning,
          stacklevel=2)
      admin_db = self.admin
      admin_db.command(
          "fsync", read_preference=ReadPreference.PRIMARY, **kwargs)
  def unlock(self, session=None):
      """**DEPRECATED**: Unlock a previously locked server.

      :Parameters:
        - `session` (optional): a
          :class:`~pymongo.client_session.ClientSession`.

      Deprecated. Users of MongoDB version 3.2 or newer can run the
      `fsyncUnlock command`_ directly with
      :meth:`~pymongo.database.Database.command`::

           client.admin.command('fsyncUnlock')

      Users of MongoDB version 2.6 and 3.0 can query the "unlock" virtual
      collection::

          client.admin["$cmd.sys.unlock"].find_one()

      .. versionchanged:: 3.11
         Deprecated.

      .. versionchanged:: 3.6
         Added ``session`` parameter.

      .. _fsyncUnlock command: https://docs.mongodb.com/manual/reference/command/fsyncUnlock/
      """
      warnings.warn(
          "unlock is deprecated. Use "
          "client.admin.command('fsyncUnlock') instead. For "
          "MongoDB 2.6 and 3.0, see the documentation for "
          "more information.",
          DeprecationWarning,
          stacklevel=2)
      unlock_cmd = SON([("fsyncUnlock", 1)])
      with self._socket_for_writes(session) as sock_info:
          if sock_info.max_wire_version < 4:
              # Below wire version 4, go through the "$cmd.sys.unlock"
              # collection instead of running the command.
              message._first_batch(sock_info, "admin", "$cmd.sys.unlock",
                                   {}, -1, True, self.codec_options,
                                   ReadPreference.PRIMARY, unlock_cmd,
                                   self._event_listeners)
              return

          try:
              with self._tmp_session(session) as tmp_session:
                  sock_info.command(
                      "admin", unlock_cmd, session=tmp_session, client=self)
          except OperationFailure as exc:
              # Ignore "DB not locked" to replicate old behavior
              if exc.code != 125:
                  raise
  1976. def __enter__(self):
  1977. return self
  1978. def __exit__(self, exc_type, exc_val, exc_tb):
  1979. self.close()
  1980. def __iter__(self):
  1981. return self
  1982. def __next__(self):
  1983. raise TypeError("'MongoClient' object is not iterable")
  1984. next = __next__
  def _retryable_error_doc(exc):
      """Return the server response from PyMongo exception or None.

      - For a ``BulkWriteError``: the last entry of
        ``exc.details['writeConcernErrors']``, or ``None`` when that list
        is empty.
      - For a ``NotPrimaryError`` or ``OperationFailure``: ``exc.details``.
      - For anything else: ``None``.
      """
      if isinstance(exc, BulkWriteError):
          # Check the last writeConcernError to determine if this
          # BulkWriteError is retryable.
          write_concern_errors = exc.details['writeConcernErrors']
          if write_concern_errors:
              return write_concern_errors[-1]
          return None
      if isinstance(exc, (NotPrimaryError, OperationFailure)):
          return exc.details
      return None
  def _add_retryable_write_error(exc, max_wire_version):
      """Attach retry-related error labels to ``exc`` in place.

      :Parameters:
        - `exc`: the exception to inspect and label.
        - `max_wire_version`: when 9 or higher, every label listed under
          ``errorLabels`` in the server response is copied onto ``exc``;
          otherwise "RetryableWriteError" is added when the response code
          is in ``helpers._RETRYABLE_ERROR_CODES``.

      Raises ``OperationFailure`` with an actionable message when the
      response code is 20 and the exception text starts with
      "Transaction numbers".
      """
      response = _retryable_error_doc(exc)
      if response:
          error_code = response.get('code', 0)
          # retryWrites on MMAPv1 should raise an actionable error.
          retry_writes_unsupported = (
              error_code == 20 and
              str(exc).startswith("Transaction numbers"))
          if retry_writes_unsupported:
              errmsg = (
                  "This MongoDB deployment does not support "
                  "retryable writes. Please add retryWrites=false "
                  "to your connection string.")
              raise OperationFailure(errmsg, error_code, exc.details)
          if max_wire_version >= 9:
              # In MongoDB 4.4+, the server reports the error labels.
              for label in response.get('errorLabels', []):
                  exc._add_error_label(label)
          elif error_code in helpers._RETRYABLE_ERROR_CODES:
              exc._add_error_label("RetryableWriteError")

      # Connection errors are always retryable except NotPrimaryError which is
      # handled above.
      is_connection_failure = isinstance(exc, ConnectionFailure)
      if is_connection_failure and not isinstance(exc, NotPrimaryError):
          exc._add_error_label("RetryableWriteError")
  2020. class _MongoClientErrorHandler(object):
  2021. """Handle errors raised when executing an operation."""
  2022. __slots__ = ('client', 'server_address', 'session', 'max_wire_version',
  2023. 'sock_generation', 'completed_handshake', 'service_id',
  2024. 'handled')
  2025. def __init__(self, client, server, session):
  2026. self.client = client
  2027. self.server_address = server.description.address
  2028. self.session = session
  2029. self.max_wire_version = common.MIN_WIRE_VERSION
  2030. # XXX: When get_socket fails, this generation could be out of date:
  2031. # "Note that when a network error occurs before the handshake
  2032. # completes then the error's generation number is the generation
  2033. # of the pool at the time the connection attempt was started."
  2034. self.sock_generation = server.pool.gen.get_overall()
  2035. self.completed_handshake = False
  2036. self.service_id = None
  2037. self.handled = False
  2038. def contribute_socket(self, sock_info):
  2039. """Provide socket information to the error handler."""
  2040. self.max_wire_version = sock_info.max_wire_version
  2041. self.sock_generation = sock_info.generation
  2042. self.service_id = sock_info.service_id
  2043. self.completed_handshake = True
  2044. def handle(self, exc_type, exc_val):
  2045. if self.handled or exc_type is None:
  2046. return
  2047. self.handled = True
  2048. if self.session:
  2049. if issubclass(exc_type, ConnectionFailure):
  2050. if self.session.in_transaction:
  2051. exc_val._add_error_label("TransientTransactionError")
  2052. self.session._server_session.mark_dirty()
  2053. if issubclass(exc_type, PyMongoError):
  2054. if (exc_val.has_error_label("TransientTransactionError") or
  2055. exc_val.has_error_label("RetryableWriteError")):
  2056. self.session._unpin()
  2057. err_ctx = _ErrorContext(
  2058. exc_val, self.max_wire_version, self.sock_generation,
  2059. self.completed_handshake, self.service_id)
  2060. self.client._topology.handle_error(self.server_address, err_ctx)
  2061. def __enter__(self):
  2062. return self
  2063. def __exit__(self, exc_type, exc_val, exc_tb):
  2064. return self.handle(exc_type, exc_val)