pool.py 59 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524
  1. # Copyright 2011-present MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you
  4. # may not use this file except in compliance with the License. You
  5. # may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. # implied. See the License for the specific language governing
  13. # permissions and limitations under the License.
  14. import contextlib
  15. import copy
  16. import os
  17. import platform
  18. import socket
  19. import sys
  20. import threading
  21. import collections
  22. import weakref
  23. from pymongo.ssl_support import (
  24. SSLError as _SSLError,
  25. HAS_SNI as _HAVE_SNI,
  26. IPADDR_SAFE as _IPADDR_SAFE)
  27. from bson import DEFAULT_CODEC_OPTIONS
  28. from bson.py3compat import imap, itervalues, _unicode
  29. from bson.son import SON
  30. from pymongo import auth, helpers, thread_util, __version__
  31. from pymongo.client_session import _validate_session_write_concern
  32. from pymongo.common import (MAX_BSON_SIZE,
  33. MAX_IDLE_TIME_SEC,
  34. MAX_MESSAGE_SIZE,
  35. MAX_POOL_SIZE,
  36. MAX_WIRE_VERSION,
  37. MAX_WRITE_BATCH_SIZE,
  38. MIN_POOL_SIZE,
  39. ORDERED_TYPES,
  40. WAIT_QUEUE_TIMEOUT)
  41. from pymongo.errors import (AutoReconnect,
  42. CertificateError,
  43. ConnectionFailure,
  44. ConfigurationError,
  45. InvalidOperation,
  46. DocumentTooLarge,
  47. NetworkTimeout,
  48. NotPrimaryError,
  49. OperationFailure,
  50. PyMongoError)
  51. from pymongo.hello_compat import HelloCompat
  52. from pymongo._ipaddress import is_ip_address
  53. from pymongo.ismaster import IsMaster
  54. from pymongo.monotonic import time as _time
  55. from pymongo.monitoring import (ConnectionCheckOutFailedReason,
  56. ConnectionClosedReason)
  57. from pymongo.network import (command,
  58. receive_message)
  59. from pymongo.read_preferences import ReadPreference
  60. from pymongo.server_api import _add_to_command
  61. from pymongo.server_type import SERVER_TYPE
  62. from pymongo.socket_checker import SocketChecker
  63. # Always use our backport so we always have support for IP address matching
  64. from pymongo.ssl_match_hostname import match_hostname
  65. try:
  66. from fcntl import fcntl, F_GETFD, F_SETFD, FD_CLOEXEC
  67. def _set_non_inheritable_non_atomic(fd):
  68. """Set the close-on-exec flag on the given file descriptor."""
  69. flags = fcntl(fd, F_GETFD)
  70. fcntl(fd, F_SETFD, flags | FD_CLOEXEC)
  71. except ImportError:
  72. # Windows, various platforms we don't claim to support
  73. # (Jython, IronPython, ...), systems that don't provide
  74. # everything we need from fcntl, etc.
  75. def _set_non_inheritable_non_atomic(dummy):
  76. """Dummy function for platforms that don't provide fcntl."""
  77. pass
  78. _MAX_TCP_KEEPIDLE = 120
  79. _MAX_TCP_KEEPINTVL = 10
  80. _MAX_TCP_KEEPCNT = 9
  81. if sys.platform == 'win32':
  82. try:
  83. import _winreg as winreg
  84. except ImportError:
  85. import winreg
  86. def _query(key, name, default):
  87. try:
  88. value, _ = winreg.QueryValueEx(key, name)
  89. # Ensure the value is a number or raise ValueError.
  90. return int(value)
  91. except (OSError, ValueError):
  92. # QueryValueEx raises OSError when the key does not exist (i.e.
  93. # the system is using the Windows default value).
  94. return default
  95. try:
  96. with winreg.OpenKey(
  97. winreg.HKEY_LOCAL_MACHINE,
  98. r"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters") as key:
  99. _WINDOWS_TCP_IDLE_MS = _query(key, "KeepAliveTime", 7200000)
  100. _WINDOWS_TCP_INTERVAL_MS = _query(key, "KeepAliveInterval", 1000)
  101. except OSError:
  102. # We could not check the default values because winreg.OpenKey failed.
  103. # Assume the system is using the default values.
  104. _WINDOWS_TCP_IDLE_MS = 7200000
  105. _WINDOWS_TCP_INTERVAL_MS = 1000
  106. def _set_keepalive_times(sock):
  107. idle_ms = min(_WINDOWS_TCP_IDLE_MS, _MAX_TCP_KEEPIDLE * 1000)
  108. interval_ms = min(_WINDOWS_TCP_INTERVAL_MS,
  109. _MAX_TCP_KEEPINTVL * 1000)
  110. if (idle_ms < _WINDOWS_TCP_IDLE_MS or
  111. interval_ms < _WINDOWS_TCP_INTERVAL_MS):
  112. sock.ioctl(socket.SIO_KEEPALIVE_VALS,
  113. (1, idle_ms, interval_ms))
  114. else:
  115. def _set_tcp_option(sock, tcp_option, max_value):
  116. if hasattr(socket, tcp_option):
  117. sockopt = getattr(socket, tcp_option)
  118. try:
  119. # PYTHON-1350 - NetBSD doesn't implement getsockopt for
  120. # TCP_KEEPIDLE and friends. Don't attempt to set the
  121. # values there.
  122. default = sock.getsockopt(socket.IPPROTO_TCP, sockopt)
  123. if default > max_value:
  124. sock.setsockopt(socket.IPPROTO_TCP, sockopt, max_value)
  125. except socket.error:
  126. pass
  127. def _set_keepalive_times(sock):
  128. _set_tcp_option(sock, 'TCP_KEEPIDLE', _MAX_TCP_KEEPIDLE)
  129. _set_tcp_option(sock, 'TCP_KEEPINTVL', _MAX_TCP_KEEPINTVL)
  130. _set_tcp_option(sock, 'TCP_KEEPCNT', _MAX_TCP_KEEPCNT)
  131. _METADATA = SON([
  132. ('driver', SON([('name', 'PyMongo'), ('version', __version__)])),
  133. ])
  134. if sys.platform.startswith('linux'):
  135. # platform.linux_distribution was deprecated in Python 3.5.
  136. if sys.version_info[:2] < (3, 5):
  137. # Distro name and version (e.g. Ubuntu 16.04 xenial)
  138. _name = ' '.join([part for part in
  139. platform.linux_distribution() if part])
  140. else:
  141. _name = platform.system()
  142. _METADATA['os'] = SON([
  143. ('type', platform.system()),
  144. ('name', _name),
  145. ('architecture', platform.machine()),
  146. # Kernel version (e.g. 4.4.0-17-generic).
  147. ('version', platform.release())
  148. ])
  149. elif sys.platform == 'darwin':
  150. _METADATA['os'] = SON([
  151. ('type', platform.system()),
  152. ('name', platform.system()),
  153. ('architecture', platform.machine()),
  154. # (mac|i|tv)OS(X) version (e.g. 10.11.6) instead of darwin
  155. # kernel version.
  156. ('version', platform.mac_ver()[0])
  157. ])
  158. elif sys.platform == 'win32':
  159. _METADATA['os'] = SON([
  160. ('type', platform.system()),
  161. # "Windows XP", "Windows 7", "Windows 10", etc.
  162. ('name', ' '.join((platform.system(), platform.release()))),
  163. ('architecture', platform.machine()),
  164. # Windows patch level (e.g. 5.1.2600-SP3)
  165. ('version', '-'.join(platform.win32_ver()[1:3]))
  166. ])
  167. elif sys.platform.startswith('java'):
  168. _name, _ver, _arch = platform.java_ver()[-1]
  169. _METADATA['os'] = SON([
  170. # Linux, Windows 7, Mac OS X, etc.
  171. ('type', _name),
  172. ('name', _name),
  173. # x86, x86_64, AMD64, etc.
  174. ('architecture', _arch),
  175. # Linux kernel version, OSX version, etc.
  176. ('version', _ver)
  177. ])
  178. else:
  179. # Get potential alias (e.g. SunOS 5.11 becomes Solaris 2.11)
  180. _aliased = platform.system_alias(
  181. platform.system(), platform.release(), platform.version())
  182. _METADATA['os'] = SON([
  183. ('type', platform.system()),
  184. ('name', ' '.join([part for part in _aliased[:2] if part])),
  185. ('architecture', platform.machine()),
  186. ('version', _aliased[2])
  187. ])
  188. if platform.python_implementation().startswith('PyPy'):
  189. _METADATA['platform'] = ' '.join(
  190. (platform.python_implementation(),
  191. '.'.join(imap(str, sys.pypy_version_info)),
  192. '(Python %s)' % '.'.join(imap(str, sys.version_info))))
  193. elif sys.platform.startswith('java'):
  194. _METADATA['platform'] = ' '.join(
  195. (platform.python_implementation(),
  196. '.'.join(imap(str, sys.version_info)),
  197. '(%s)' % ' '.join((platform.system(), platform.release()))))
  198. else:
  199. _METADATA['platform'] = ' '.join(
  200. (platform.python_implementation(),
  201. '.'.join(imap(str, sys.version_info))))
  202. # If the first getaddrinfo call of this interpreter's life is on a thread,
  203. # while the main thread holds the import lock, getaddrinfo deadlocks trying
  204. # to import the IDNA codec. Import it here, where presumably we're on the
  205. # main thread, to avoid the deadlock. See PYTHON-607.
  206. u'foo'.encode('idna')
  207. # Remove after PYTHON-2712
  208. _MOCK_SERVICE_ID = False
  209. def _raise_connection_failure(address, error, msg_prefix=None):
  210. """Convert a socket.error to ConnectionFailure and raise it."""
  211. host, port = address
  212. # If connecting to a Unix socket, port will be None.
  213. if port is not None:
  214. msg = '%s:%d: %s' % (host, port, error)
  215. else:
  216. msg = '%s: %s' % (host, error)
  217. if msg_prefix:
  218. msg = msg_prefix + msg
  219. if isinstance(error, socket.timeout):
  220. raise NetworkTimeout(msg)
  221. elif isinstance(error, _SSLError) and 'timed out' in str(error):
  222. # CPython 2.7 and PyPy 2.x do not distinguish network
  223. # timeouts from other SSLErrors (https://bugs.python.org/issue10272).
  224. # Luckily, we can work around this limitation because the phrase
  225. # 'timed out' appears in all the timeout related SSLErrors raised
  226. # on the above platforms.
  227. raise NetworkTimeout(msg)
  228. else:
  229. raise AutoReconnect(msg)
  230. class PoolOptions(object):
  231. __slots__ = ('__max_pool_size', '__min_pool_size',
  232. '__max_idle_time_seconds',
  233. '__connect_timeout', '__socket_timeout',
  234. '__wait_queue_timeout', '__wait_queue_multiple',
  235. '__ssl_context', '__ssl_match_hostname', '__socket_keepalive',
  236. '__event_listeners', '__appname', '__driver', '__metadata',
  237. '__compression_settings', '__server_api', '__load_balanced')
  238. def __init__(self, max_pool_size=MAX_POOL_SIZE,
  239. min_pool_size=MIN_POOL_SIZE,
  240. max_idle_time_seconds=MAX_IDLE_TIME_SEC, connect_timeout=None,
  241. socket_timeout=None, wait_queue_timeout=WAIT_QUEUE_TIMEOUT,
  242. wait_queue_multiple=None, ssl_context=None,
  243. ssl_match_hostname=True, socket_keepalive=True,
  244. event_listeners=None, appname=None, driver=None,
  245. compression_settings=None, server_api=None,
  246. load_balanced=None):
  247. self.__max_pool_size = max_pool_size
  248. self.__min_pool_size = min_pool_size
  249. self.__max_idle_time_seconds = max_idle_time_seconds
  250. self.__connect_timeout = connect_timeout
  251. self.__socket_timeout = socket_timeout
  252. self.__wait_queue_timeout = wait_queue_timeout
  253. self.__wait_queue_multiple = wait_queue_multiple
  254. self.__ssl_context = ssl_context
  255. self.__ssl_match_hostname = ssl_match_hostname
  256. self.__socket_keepalive = socket_keepalive
  257. self.__event_listeners = event_listeners
  258. self.__appname = appname
  259. self.__driver = driver
  260. self.__compression_settings = compression_settings
  261. self.__server_api = server_api
  262. self.__load_balanced = load_balanced
  263. self.__metadata = copy.deepcopy(_METADATA)
  264. if appname:
  265. self.__metadata['application'] = {'name': appname}
  266. # Combine the "driver" MongoClient option with PyMongo's info, like:
  267. # {
  268. # 'driver': {
  269. # 'name': 'PyMongo|MyDriver',
  270. # 'version': '3.7.0|1.2.3',
  271. # },
  272. # 'platform': 'CPython 3.6.0|MyPlatform'
  273. # }
  274. if driver:
  275. if driver.name:
  276. self.__metadata['driver']['name'] = "%s|%s" % (
  277. _METADATA['driver']['name'], driver.name)
  278. if driver.version:
  279. self.__metadata['driver']['version'] = "%s|%s" % (
  280. _METADATA['driver']['version'], driver.version)
  281. if driver.platform:
  282. self.__metadata['platform'] = "%s|%s" % (
  283. _METADATA['platform'], driver.platform)
  284. @property
  285. def non_default_options(self):
  286. """The non-default options this pool was created with.
  287. Added for CMAP's :class:`PoolCreatedEvent`.
  288. """
  289. opts = {}
  290. if self.__max_pool_size != MAX_POOL_SIZE:
  291. opts['maxPoolSize'] = self.__max_pool_size
  292. if self.__min_pool_size != MIN_POOL_SIZE:
  293. opts['minPoolSize'] = self.__min_pool_size
  294. if self.__max_idle_time_seconds != MAX_IDLE_TIME_SEC:
  295. opts['maxIdleTimeMS'] = self.__max_idle_time_seconds * 1000
  296. if self.__wait_queue_timeout != WAIT_QUEUE_TIMEOUT:
  297. opts['waitQueueTimeoutMS'] = self.__wait_queue_timeout * 1000
  298. return opts
  299. @property
  300. def max_pool_size(self):
  301. """The maximum allowable number of concurrent connections to each
  302. connected server. Requests to a server will block if there are
  303. `maxPoolSize` outstanding connections to the requested server.
  304. Defaults to 100. Cannot be 0.
  305. When a server's pool has reached `max_pool_size`, operations for that
  306. server block waiting for a socket to be returned to the pool. If
  307. ``waitQueueTimeoutMS`` is set, a blocked operation will raise
  308. :exc:`~pymongo.errors.ConnectionFailure` after a timeout.
  309. By default ``waitQueueTimeoutMS`` is not set.
  310. """
  311. return self.__max_pool_size
  312. @property
  313. def min_pool_size(self):
  314. """The minimum required number of concurrent connections that the pool
  315. will maintain to each connected server. Default is 0.
  316. """
  317. return self.__min_pool_size
  318. @property
  319. def max_idle_time_seconds(self):
  320. """The maximum number of seconds that a connection can remain
  321. idle in the pool before being removed and replaced. Defaults to
  322. `None` (no limit).
  323. """
  324. return self.__max_idle_time_seconds
  325. @property
  326. def connect_timeout(self):
  327. """How long a connection can take to be opened before timing out.
  328. """
  329. return self.__connect_timeout
  330. @property
  331. def socket_timeout(self):
  332. """How long a send or receive on a socket can take before timing out.
  333. """
  334. return self.__socket_timeout
  335. @property
  336. def wait_queue_timeout(self):
  337. """How long a thread will wait for a socket from the pool if the pool
  338. has no free sockets.
  339. """
  340. return self.__wait_queue_timeout
  341. @property
  342. def wait_queue_multiple(self):
  343. """Multiplied by max_pool_size to give the number of threads allowed
  344. to wait for a socket at one time.
  345. """
  346. return self.__wait_queue_multiple
  347. @property
  348. def ssl_context(self):
  349. """An SSLContext instance or None.
  350. """
  351. return self.__ssl_context
  352. @property
  353. def ssl_match_hostname(self):
  354. """Call ssl.match_hostname if cert_reqs is not ssl.CERT_NONE.
  355. """
  356. return self.__ssl_match_hostname
  357. @property
  358. def socket_keepalive(self):
  359. """Whether to send periodic messages to determine if a connection
  360. is closed.
  361. """
  362. return self.__socket_keepalive
  363. @property
  364. def event_listeners(self):
  365. """An instance of pymongo.monitoring._EventListeners.
  366. """
  367. return self.__event_listeners
  368. @property
  369. def appname(self):
  370. """The application name, for sending with hello in server handshake.
  371. """
  372. return self.__appname
  373. @property
  374. def driver(self):
  375. """Driver name and version, for sending with hello in handshake.
  376. """
  377. return self.__driver
  378. @property
  379. def compression_settings(self):
  380. return self.__compression_settings
  381. @property
  382. def metadata(self):
  383. """A dict of metadata about the application, driver, os, and platform.
  384. """
  385. return self.__metadata.copy()
  386. @property
  387. def server_api(self):
  388. """A pymongo.server_api.ServerApi or None.
  389. """
  390. return self.__server_api
  391. @property
  392. def load_balanced(self):
  393. """True if this Pool is configured in load balanced mode.
  394. """
  395. return self.__load_balanced
  396. def _negotiate_creds(all_credentials):
  397. """Return one credential that needs mechanism negotiation, if any.
  398. """
  399. if all_credentials:
  400. for creds in all_credentials.values():
  401. if creds.mechanism == 'DEFAULT' and creds.username:
  402. return creds
  403. return None
  404. def _speculative_context(all_credentials):
  405. """Return the _AuthContext to use for speculative auth, if any.
  406. """
  407. if all_credentials and len(all_credentials) == 1:
  408. creds = next(itervalues(all_credentials))
  409. return auth._AuthContext.from_credentials(creds)
  410. return None
  411. class _CancellationContext(object):
  412. def __init__(self):
  413. self._cancelled = False
  414. def cancel(self):
  415. """Cancel this context."""
  416. self._cancelled = True
  417. @property
  418. def cancelled(self):
  419. """Was cancel called?"""
  420. return self._cancelled
  421. class SocketInfo(object):
  422. """Store a socket with some metadata.
  423. :Parameters:
  424. - `sock`: a raw socket object
  425. - `pool`: a Pool instance
  426. - `address`: the server's (host, port)
  427. - `id`: the id of this socket in its pool
  428. """
  429. def __init__(self, sock, pool, address, id):
  430. self.pool_ref = weakref.ref(pool)
  431. self.sock = sock
  432. self.address = address
  433. self.id = id
  434. self.authset = set()
  435. self.closed = False
  436. self.last_checkin_time = _time()
  437. self.performed_handshake = False
  438. self.is_writable = False
  439. self.max_wire_version = MAX_WIRE_VERSION
  440. self.max_bson_size = MAX_BSON_SIZE
  441. self.max_message_size = MAX_MESSAGE_SIZE
  442. self.max_write_batch_size = MAX_WRITE_BATCH_SIZE
  443. self.supports_sessions = False
  444. self.hello_ok = None
  445. self.is_mongos = False
  446. self.op_msg_enabled = False
  447. self.listeners = pool.opts.event_listeners
  448. self.enabled_for_cmap = pool.enabled_for_cmap
  449. self.compression_settings = pool.opts.compression_settings
  450. self.compression_context = None
  451. self.socket_checker = SocketChecker()
  452. # Support for mechanism negotiation on the initial handshake.
  453. # Maps credential to saslSupportedMechs.
  454. self.negotiated_mechanisms = {}
  455. self.auth_ctx = {}
  456. # The pool's generation changes with each reset() so we can close
  457. # sockets created before the last reset.
  458. self.pool_gen = pool.gen
  459. self.generation = self.pool_gen.get_overall()
  460. self.ready = False
  461. self.cancel_context = None
  462. if not pool.handshake:
  463. # This is a Monitor connection.
  464. self.cancel_context = _CancellationContext()
  465. self.opts = pool.opts
  466. self.more_to_come = False
  467. # For load balancer support.
  468. self.service_id = None
  469. # When executing a transaction in load balancing mode, this flag is
  470. # set to true to indicate that the session now owns the connection.
  471. self.pinned_txn = False
  472. self.pinned_cursor = False
  473. self.active = False
    def pin_txn(self):
        """Mark this connection as pinned to a transaction.

        The flag is set before the assertion runs, so a failed assertion
        still leaves ``pinned_txn`` set to True.
        """
        self.pinned_txn = True
        # A connection is not expected to be pinned to a cursor and a
        # transaction at the same time.
        assert not self.pinned_cursor
    def pin_cursor(self):
        """Mark this connection as pinned to a cursor.

        The flag is set before the assertion runs, so a failed assertion
        still leaves ``pinned_cursor`` set to True.
        """
        self.pinned_cursor = True
        # A connection is not expected to be pinned to a transaction and a
        # cursor at the same time.
        assert not self.pinned_txn
  480. def unpin(self):
  481. pool = self.pool_ref()
  482. if pool:
  483. pool.return_socket(self)
  484. else:
  485. self.close_socket(ConnectionClosedReason.STALE)
  486. def hello_cmd(self):
  487. if self.opts.server_api or self.hello_ok:
  488. return SON([(HelloCompat.CMD, 1)])
  489. else:
  490. return SON([(HelloCompat.LEGACY_CMD, 1), ('helloOk', True)])
  491. def hello(self, all_credentials=None):
  492. return self._hello(None, None, None, all_credentials)
    def _hello(self, cluster_time, topology_version,
               heartbeat_frequency, all_credentials):
        """Send hello on this connection and record what the server reports.

        The first call on a connection performs the handshake (client
        metadata, compression and load balancer flags are added). A later
        call with a non-None `topology_version` sends an awaitable hello.

        :Parameters:
          - `cluster_time`: value for ``$clusterTime``, or None.
          - `topology_version`: value for ``topologyVersion``, or None.
          - `heartbeat_frequency`: used for ``maxAwaitTimeMS`` and to
            extend the socket timeout; only read for an awaitable hello.
          - `all_credentials`: credentials used for mechanism negotiation
            and speculative authentication, or None.

        Returns the reply wrapped in an :class:`IsMaster`. Raises
        ConfigurationError in load balanced mode when the reply carries
        no service id.
        """
        cmd = self.hello_cmd()
        performing_handshake = not self.performed_handshake
        awaitable = False
        if performing_handshake:
            # Marked as done before the command is sent.
            self.performed_handshake = True
            cmd['client'] = self.opts.metadata
            if self.compression_settings:
                cmd['compression'] = self.compression_settings.compressors
            if self.opts.load_balanced:
                cmd['loadBalanced'] = True
        elif topology_version is not None:
            cmd['topologyVersion'] = topology_version
            # heartbeat_frequency is multiplied by 1000 to get milliseconds.
            cmd['maxAwaitTimeMS'] = int(heartbeat_frequency*1000)
            awaitable = True
            # If connect_timeout is None there is no timeout.
            if self.opts.connect_timeout:
                # Give the socket the extra heartbeat_frequency on top of
                # the connect timeout.
                self.sock.settimeout(
                    self.opts.connect_timeout + heartbeat_frequency)

        if self.max_wire_version >= 6 and cluster_time is not None:
            cmd['$clusterTime'] = cluster_time

        # XXX: Simplify in PyMongo 4.0 when all_credentials is always a single
        # unchangeable value per MongoClient.
        creds = _negotiate_creds(all_credentials)
        if creds:
            cmd['saslSupportedMechs'] = creds.source + '.' + creds.username
        auth_ctx = _speculative_context(all_credentials)
        if auth_ctx:
            cmd['speculativeAuthenticate'] = auth_ctx.speculate_command()

        doc = self.command('admin', cmd, publish_events=False,
                           exhaust_allowed=awaitable)
        # PYTHON-2712 will remove this topologyVersion fallback logic.
        if self.opts.load_balanced and _MOCK_SERVICE_ID:
            process_id = doc.get('topologyVersion', {}).get('processId')
            doc.setdefault('serviceId', process_id)
        if not self.opts.load_balanced:
            # Outside load balanced mode any serviceId in the reply is
            # discarded.
            doc.pop('serviceId', None)
        hello = IsMaster(doc, awaitable=awaitable)
        # Replace the defaults set in __init__ with the reported values.
        self.is_writable = hello.is_writable
        self.max_wire_version = hello.max_wire_version
        self.max_bson_size = hello.max_bson_size
        self.max_message_size = hello.max_message_size
        self.max_write_batch_size = hello.max_write_batch_size
        self.supports_sessions = (
            hello.logical_session_timeout_minutes is not None)
        self.hello_ok = hello.hello_ok
        self.is_mongos = hello.server_type == SERVER_TYPE.Mongos
        # The compression context is only chosen during the handshake.
        if performing_handshake and self.compression_settings:
            ctx = self.compression_settings.get_compression_context(
                hello.compressors)
            self.compression_context = ctx

        self.op_msg_enabled = hello.max_wire_version >= 6
        if creds:
            self.negotiated_mechanisms[creds] = hello.sasl_supported_mechs
        if auth_ctx:
            auth_ctx.parse_response(hello)
            # Keep the context only when speculative auth succeeded.
            if auth_ctx.speculate_succeeded():
                self.auth_ctx[auth_ctx.credentials] = auth_ctx
        if self.opts.load_balanced:
            if not hello.service_id:
                raise ConfigurationError(
                    'Driver attempted to initialize in load balancing mode,'
                    ' but the server does not support this mode')
            self.service_id = hello.service_id
            # Switch from the overall generation to the one for this service.
            self.generation = self.pool_gen.get(self.service_id)
        return hello
  560. def _next_reply(self):
  561. reply = self.receive_message(None)
  562. self.more_to_come = reply.more_to_come
  563. unpacked_docs = reply.unpack_response()
  564. response_doc = unpacked_docs[0]
  565. helpers._check_command_response(response_doc, self.max_wire_version)
  566. # Remove after PYTHON-2712.
  567. if not self.opts.load_balanced:
  568. response_doc.pop('serviceId', None)
  569. return response_doc
  570. def command(self, dbname, spec, secondary_ok=False,
  571. read_preference=ReadPreference.PRIMARY,
  572. codec_options=DEFAULT_CODEC_OPTIONS, check=True,
  573. allowable_errors=None, check_keys=False,
  574. read_concern=None,
  575. write_concern=None,
  576. parse_write_concern_error=False,
  577. collation=None,
  578. session=None,
  579. client=None,
  580. retryable_write=False,
  581. publish_events=True,
  582. user_fields=None,
  583. exhaust_allowed=False):
  584. """Execute a command or raise an error.
  585. :Parameters:
  586. - `dbname`: name of the database on which to run the command
  587. - `spec`: a command document as a dict, SON, or mapping object
  588. - `secondary_ok`: whether to set the secondaryOkay wire protocol bit
  589. - `read_preference`: a read preference
  590. - `codec_options`: a CodecOptions instance
  591. - `check`: raise OperationFailure if there are errors
  592. - `allowable_errors`: errors to ignore if `check` is True
  593. - `check_keys`: if True, check `spec` for invalid keys
  594. - `read_concern`: The read concern for this command.
  595. - `write_concern`: The write concern for this command.
  596. - `parse_write_concern_error`: Whether to parse the
  597. ``writeConcernError`` field in the command response.
  598. - `collation`: The collation for this command.
  599. - `session`: optional ClientSession instance.
  600. - `client`: optional MongoClient for gossipping $clusterTime.
  601. - `retryable_write`: True if this command is a retryable write.
  602. - `publish_events`: Should we publish events for this command?
  603. - `user_fields` (optional): Response fields that should be decoded
  604. using the TypeDecoders from codec_options, passed to
  605. bson._decode_all_selective.
  606. """
  607. self.validate_session(client, session)
  608. session = _validate_session_write_concern(session, write_concern)
  609. # Ensure command name remains in first place.
  610. if not isinstance(spec, ORDERED_TYPES):
  611. spec = SON(spec)
  612. if (read_concern and self.max_wire_version < 4
  613. and not read_concern.ok_for_legacy):
  614. raise ConfigurationError(
  615. 'read concern level of %s is not valid '
  616. 'with a max wire version of %d.'
  617. % (read_concern.level, self.max_wire_version))
  618. if not (write_concern is None or write_concern.acknowledged or
  619. collation is None):
  620. raise ConfigurationError(
  621. 'Collation is unsupported for unacknowledged writes.')
  622. if (self.max_wire_version >= 5 and
  623. write_concern and
  624. not write_concern.is_server_default):
  625. spec['writeConcern'] = write_concern.document
  626. elif self.max_wire_version < 5 and collation is not None:
  627. raise ConfigurationError(
  628. 'Must be connected to MongoDB 3.4+ to use a collation.')
  629. self.add_server_api(spec)
  630. if session:
  631. session._apply_to(spec, retryable_write, read_preference,
  632. self)
  633. self.send_cluster_time(spec, session, client)
  634. listeners = self.listeners if publish_events else None
  635. unacknowledged = write_concern and not write_concern.acknowledged
  636. if self.op_msg_enabled:
  637. self._raise_if_not_writable(unacknowledged)
  638. try:
  639. return command(self, dbname, spec, secondary_ok,
  640. self.is_mongos, read_preference, codec_options,
  641. session, client, check, allowable_errors,
  642. self.address, check_keys, listeners,
  643. self.max_bson_size, read_concern,
  644. parse_write_concern_error=parse_write_concern_error,
  645. collation=collation,
  646. compression_ctx=self.compression_context,
  647. use_op_msg=self.op_msg_enabled,
  648. unacknowledged=unacknowledged,
  649. user_fields=user_fields,
  650. exhaust_allowed=exhaust_allowed)
  651. except (OperationFailure, NotPrimaryError):
  652. raise
  653. # Catch socket.error, KeyboardInterrupt, etc. and close ourselves.
  654. except BaseException as error:
  655. self._raise_connection_failure(error)
  def send_message(self, message, max_doc_size):
      """Write an already-encoded BSON message to the socket.

      Raises DocumentTooLarge, before anything is sent, when
      `max_doc_size` exceeds this connection's `max_bson_size` (the check
      is skipped while `max_bson_size` is None).

      Any exception raised by the send is handed to
      `_raise_connection_failure`, which closes the socket.
      """
      limit = self.max_bson_size
      if limit is not None and max_doc_size > limit:
          raise DocumentTooLarge(
              "BSON document too large (%d bytes) - the connected server "
              "supports BSON document sizes up to %d bytes." %
              (max_doc_size, limit))

      try:
          self.sock.sendall(message)
      except BaseException as exc:
          self._raise_connection_failure(exc)
  def receive_message(self, request_id):
      """Read the reply for `request_id` from this connection.

      Delegates to the module-level `receive_message` helper, passing
      this connection's `max_message_size`. Any exception is handed to
      `_raise_connection_failure`, which closes the socket.
      """
      try:
          reply = receive_message(self, request_id, self.max_message_size)
      except BaseException as exc:
          self._raise_connection_failure(exc)
      else:
          return reply
  678. def _raise_if_not_writable(self, unacknowledged):
  679. """Raise NotPrimaryError on unacknowledged write if this socket is not
  680. writable.
  681. """
  682. if unacknowledged and not self.is_writable:
  683. # Write won't succeed, bail as if we'd received a not primary error.
  684. raise NotPrimaryError("not primary", {
  685. "ok": 0, "errmsg": "not primary", "code": 10107})
  def legacy_write(self, request_id, msg, max_doc_size, with_last_error):
      """Send OP_INSERT, etc., optionally returning response as a dict.

      Can raise ConnectionFailure or OperationFailure.

      :Parameters:
        - `request_id`: an int.
        - `msg`: bytes, an OP_INSERT, OP_UPDATE, or OP_DELETE message,
          perhaps with a getlasterror command appended.
        - `max_doc_size`: size in bytes of the largest document in `msg`.
        - `with_last_error`: True if a getlasterror command is appended.

      Returns None when `with_last_error` is false; otherwise the result
      of `helpers._check_gle_response` on the server's reply.
      """
      # Without getlasterror the write is unacknowledged, so make sure
      # this connection is writable before sending anything.
      self._raise_if_not_writable(not with_last_error)
      self.send_message(msg, max_doc_size)
      if not with_last_error:
          return None

      reply = self.receive_message(request_id)
      response = reply.command_response()
      return helpers._check_gle_response(response, self.max_wire_version)
  def write_command(self, request_id, msg):
      """Send "insert" etc. command, returning response as a dict.

      Can raise ConnectionFailure or OperationFailure.

      :Parameters:
        - `request_id`: an int.
        - `msg`: bytes, the command message.
      """
      # Size 0: no document-size check is wanted for command messages.
      self.send_message(msg, 0)
      result = self.receive_message(request_id).command_response()

      # Raises NotPrimaryError or OperationFailure.
      helpers._check_command_response(result, self.max_wire_version)
      return result
  def check_auth(self, all_credentials):
      """Bring this connection's logins in line with `all_credentials`.

      Credentials in `self.authset` that are no longer among the values
      of `all_credentials` are logged out; values not yet in `authset`
      are authenticated. Afterwards the connection is marked ready (once)
      and, if CMAP events are enabled, the ready event is published.

      Can raise ConnectionFailure or OperationFailure.

      :Parameters:
        - `all_credentials`: dict, maps auth source to MongoCredential.
      """
      if all_credentials or self.authset:
          wanted = set(itervalues(all_credentials))
          # Work from a snapshot: self.authset is mutated while we go.
          current = self.authset.copy()
          obsolete = current - wanted
          missing = wanted - current

          # Logout any credentials that no longer exist in the cache.
          for credentials in obsolete:
              auth.logout(credentials.source, self)
              self.authset.discard(credentials)

          for credentials in missing:
              self.authenticate(credentials)

      # CMAP spec says to publish the ready event only after authenticating
      # the connection.
      if self.ready:
          return
      self.ready = True
      if self.enabled_for_cmap:
          self.listeners.publish_connection_ready(self.address, self.id)
  def authenticate(self, credentials):
      """Authenticate this connection and remember the credentials.

      On success `credentials` is added to `authset` and any state kept
      for it in `negotiated_mechanisms` and `auth_ctx` is dropped.

      Can raise ConnectionFailure or OperationFailure.

      :Parameters:
        - `credentials`: A MongoCredential.
      """
      auth.authenticate(credentials, self)
      self.authset.add(credentials)

      # Per-credential handshake state is no longer needed once logged in.
      self.negotiated_mechanisms.pop(credentials, None)
      self.auth_ctx.pop(credentials, None)
  def validate_session(self, client, session):
      """Check that `session` may be used with `client` on this connection.

      A falsy `session` is accepted without checks. Raises
      InvalidOperation if the session was started by a different client
      or its recorded auth set differs from this connection's `authset`.
      """
      if not session:
          return

      if session._client is not client:
          raise InvalidOperation(
              'Can only use session with the MongoClient that'
              ' started it')

      if session._authset != self.authset:
          raise InvalidOperation(
              'Cannot use session after authenticating with different'
              ' credentials')
  def close_socket(self, reason):
      """Close this connection, publishing a closed event when asked to.

      Does nothing if the connection is already closed. The CMAP closed
      event is published only when `reason` is truthy and events are
      enabled for this connection.
      """
      if not self.closed:
          self._close_socket()
          should_publish = reason and self.enabled_for_cmap
          if should_publish:
              self.listeners.publish_connection_closed(
                  self.address, self.id, reason)
  770. def _close_socket(self):
  771. """Close this connection."""
  772. if self.closed:
  773. return
  774. self.closed = True
  775. if self.cancel_context:
  776. self.cancel_context.cancel()
  777. # Note: We catch exceptions to avoid spurious errors on interpreter
  778. # shutdown.
  779. try:
  780. self.sock.close()
  781. except Exception:
  782. pass
  def socket_closed(self):
      """Ask the socket checker whether the underlying socket is closed.

      Returns whatever `self.socket_checker.socket_closed` reports for
      `self.sock`.
      """
      checker = self.socket_checker
      return checker.socket_closed(self.sock)
  def send_cluster_time(self, command, session, client):
      """Add cluster time for MongoDB >= 3.6.

      Only acts when this connection's max wire version is at least 6
      and a client is given; the work itself is delegated to
      `client._send_cluster_time`.
      """
      if not (self.max_wire_version >= 6 and client):
          return
      client._send_cluster_time(command, session)
  def add_server_api(self, command):
      """Add server_api parameters to `command` when they are configured.

      Does nothing when `self.opts.server_api` is falsy.
      """
      server_api = self.opts.server_api
      if server_api:
          _add_to_command(command, server_api)
  def update_last_checkin_time(self):
      """Record the current `_time()` reading as the last check-in time."""
      now = _time()
      self.last_checkin_time = now
  def update_is_writable(self, is_writable):
      """Store `is_writable` as this connection's writable flag.

      The flag is consulted by `_raise_if_not_writable`.
      """
      self.is_writable = is_writable
  def idle_time_seconds(self):
      """Seconds since this socket was last checked into its pool.

      Computed as the current `_time()` reading minus
      `last_checkin_time`.
      """
      now = _time()
      return now - self.last_checkin_time
  def _raise_connection_failure(self, error):
      """Close this connection and propagate `error`.

      Must be called from inside an ``except`` block: I/O style errors
      (IOError, OSError, _SSLError) are passed to the module-level
      `_raise_connection_failure`; anything else is re-raised unchanged
      with a bare ``raise``.
      """
      # Catch *all* exceptions from socket methods and close the socket. In
      # regular Python, socket operations only raise socket.error, even if
      # the underlying cause was a Ctrl-C: a signal raised during socket.recv
      # is expressed as an EINTR error from poll. See internal_select_ex() in
      # socketmodule.c. All error codes from poll become socket.error at
      # first. Eventually in PyEval_EvalFrameEx the interpreter checks for
      # signals and throws KeyboardInterrupt into the current frame on the
      # main thread.
      #
      # But in Gevent and Eventlet, the polling mechanism (epoll, kqueue,
      # ...) is called in Python code, which experiences the signal as a
      # KeyboardInterrupt from the start, rather than as an initial
      # socket.error, so we catch that, close the socket, and reraise it.
      #
      # For a connection that is already ready, pass no reason so no
      # closed event is published here: the connection closed event will
      # be emitted later in return_socket.
      reason = None if self.ready else ConnectionClosedReason.ERROR
      self.close_socket(reason)

      # SSLError from PyOpenSSL inherits directly from Exception.
      io_errors = (IOError, OSError, _SSLError)
      if isinstance(error, io_errors):
          _raise_connection_failure(self.address, error)
      else:
          raise
  827. def __eq__(self, other):
  828. return self.sock == other.sock
  829. def __ne__(self, other):
  830. return not self == other
  831. def __hash__(self):
  832. return hash(self.sock)
  833. def __repr__(self):
  834. return "SocketInfo(%s)%s at %s" % (
  835. repr(self.sock),
  836. self.closed and " CLOSED" or "",
  837. id(self)
  838. )
  def _create_connection(address, options):
      """Given (host, port) and PoolOptions, connect and return a socket object.

      A host ending in '.sock' is treated as a Unix domain socket path.
      Otherwise each address returned by getaddrinfo is tried in order and
      the first socket that connects is returned, configured with
      TCP_NODELAY, `options.connect_timeout` and the keepalive settings.

      Can raise socket.error, or ConnectionFailure when a Unix socket is
      requested on a platform without AF_UNIX. A socket that was created
      but not returned is always closed, whatever exception interrupted
      the setup (including KeyboardInterrupt).

      This is a modified version of create_connection from CPython >= 2.7.
      """
      host, port = address

      # Check if dealing with a unix domain socket
      if host.endswith('.sock'):
          if not hasattr(socket, "AF_UNIX"):
              raise ConnectionFailure("UNIX-sockets are not supported "
                                      "on this system")
          sock = socket.socket(socket.AF_UNIX)
          try:
              # SOCK_CLOEXEC not supported for Unix sockets.
              _set_non_inheritable_non_atomic(sock.fileno())
              sock.connect(host)
              return sock
          except BaseException:
              # Never leak the descriptor, whatever went wrong.
              sock.close()
              raise

      # Don't try IPv6 if we don't support it. Also skip it if host
      # is 'localhost' (::1 is fine). Avoids slow connect issues
      # like PYTHON-356.
      family = socket.AF_INET
      if socket.has_ipv6 and host != 'localhost':
          family = socket.AF_UNSPEC

      err = None
      for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
          af, socktype, proto, dummy, sa = res
          # SOCK_CLOEXEC was new in CPython 3.2, and only available on a
          # limited number of platforms (newer Linux and *BSD). Starting with
          # CPython 3.4 all file descriptors are created non-inheritable.
          # See PEP 446.
          try:
              sock = socket.socket(
                  af, socktype | getattr(socket, 'SOCK_CLOEXEC', 0), proto)
          except socket.error:
              # Can SOCK_CLOEXEC be defined even if the kernel doesn't support
              # it?
              sock = socket.socket(af, socktype, proto)

          try:
              # Fallback when SOCK_CLOEXEC isn't available.
              _set_non_inheritable_non_atomic(sock.fileno())
              try:
                  sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                  sock.settimeout(options.connect_timeout)
                  sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE,
                                  options.socket_keepalive)
                  if options.socket_keepalive:
                      _set_keepalive_times(sock)
                  sock.connect(sa)
                  return sock
              except socket.error as e:
                  # Remember the failure and try the next address.
                  err = e
                  sock.close()
          except BaseException:
              # Anything that is not a per-address socket.error aborts the
              # whole attempt; close the socket so it is not leaked.
              sock.close()
              raise

      if err is not None:
          raise err
      else:
          # This likely means we tried to connect to an IPv6 only
          # host with an OS/kernel or Python interpreter that doesn't
          # support IPv6. The test case is Jython2.5.1 which doesn't
          # support IPv6 at all.
          raise socket.error('getaddrinfo failed')
  def _configured_socket(address, options):
      """Given (host, port) and PoolOptions, return a configured socket.

      Connects with `_create_connection`, wraps the socket with
      `options.ssl_context` when one is set (verifying the hostname
      manually if the context does not do so itself), and finally applies
      `options.socket_timeout`.

      Can raise socket.error, ConnectionFailure, or CertificateError.
      """
      sock = _create_connection(address, options)
      ssl_context = options.ssl_context

      if ssl_context is None:
          # Plain TCP: only the operation timeout is left to set.
          sock.settimeout(options.socket_timeout)
          return sock

      host = address[0]
      try:
          # According to RFC6066, section 3, IPv4 and IPv6 literals are
          # not permitted for SNI hostname.
          # Previous to Python 3.7 wrap_socket would blindly pass
          # IP addresses as SNI hostname.
          # https://bugs.python.org/issue32185
          # We have to pass hostname / ip address to wrap_socket
          # to use SSLContext.check_hostname.
          if _HAVE_SNI and (not is_ip_address(host) or _IPADDR_SAFE):
              sock = ssl_context.wrap_socket(sock, server_hostname=host)
          else:
              sock = ssl_context.wrap_socket(sock)
      except CertificateError:
          sock.close()
          # Raise CertificateError directly like we do after match_hostname
          # below.
          raise
      except (IOError, OSError, _SSLError) as exc:
          sock.close()
          # We raise AutoReconnect for transient and permanent SSL handshake
          # failures alike. Permanent handshake failures, like protocol
          # mismatch, will be turned into ServerSelectionTimeoutErrors later.
          _raise_connection_failure(address, exc, "SSL handshake failed: ")

      # Verify the hostname ourselves only when verification is on, the
      # context is not already checking it, and the options ask for it.
      needs_manual_match = (
          ssl_context.verify_mode and
          not getattr(ssl_context, "check_hostname", False) and
          options.ssl_match_hostname)
      if needs_manual_match:
          try:
              match_hostname(sock.getpeercert(), hostname=host)
          except CertificateError:
              sock.close()
              raise

      sock.settimeout(options.socket_timeout)
      return sock
  class _PoolClosedError(PyMongoError):
      """Internal error raised when a thread tries to check out a
      connection from a pool that has already been closed.
      """
  947. class _PoolGeneration(object):
  948. def __init__(self):
  949. # Maps service_id to generation.
  950. self._generations = collections.defaultdict(int)
  951. # Overall pool generation.
  952. self._generation = 0
  953. def get(self, service_id):
  954. """Get the generation for the given service_id."""
  955. if service_id is None:
  956. return self._generation
  957. return self._generations[service_id]
  958. def get_overall(self):
  959. """Get the Pool's overall generation."""
  960. return self._generation
  961. def inc(self, service_id):
  962. """Increment the generation for the given service_id."""
  963. self._generation += 1
  964. if service_id is None:
  965. for service_id in self._generations:
  966. self._generations[service_id] += 1
  967. else:
  968. self._generations[service_id] += 1
  969. def stale(self, gen, service_id):
  970. """Return if the given generation for a given service_id is stale."""
  971. return gen != self.get(service_id)
  class PoolState(object):
      """Symbolic integer constants: PAUSED, READY and CLOSED."""
      PAUSED, READY, CLOSED = 1, 2, 3
  976. # Do *not* explicitly inherit from object or Jython won't call __del__
  977. # http://bugs.jython.org/issue1057
  978. class Pool:
  def __init__(self, address, options, handshake=True):
      """
      :Parameters:
        - `address`: a (hostname, port) tuple
        - `options`: a PoolOptions instance
        - `handshake`: whether to call hello for each new SocketInfo
      """
      # Check a socket's health with socket_closed() every once in a while.
      # Can override for testing: 0 to always check, None to never check.
      self._check_interval_seconds = 1

      # LIFO pool. Sockets are ordered on idle time. Sockets claimed
      # and returned to pool from the left side. Stale sockets removed
      # from the right side.
      self.sockets = collections.deque()
      self.lock = threading.Lock()
      self.active_sockets = 0
      # Monotonically increasing connection ID required for CMAP Events.
      self.next_connection_id = 1
      self.closed = False
      # Track whether the sockets in this pool are writeable or not.
      self.is_writable = None

      # Keep track of resets, so we notice sockets created before the most
      # recent reset and close them.
      self.gen = _PoolGeneration()
      self.pid = os.getpid()
      self.address = address
      self.opts = options
      self.handshake = handshake

      # Don't publish events in Monitor pools.
      self.enabled_for_cmap = (
          self.handshake and
          self.opts.event_listeners is not None and
          self.opts.event_listeners.enabled_for_cmap)

      # The wait queue is bounded only when both a multiple and a maximum
      # pool size are configured.
      unbounded = (self.opts.wait_queue_multiple is None or
                   self.opts.max_pool_size is None)
      max_waiters = None if unbounded else (
          self.opts.max_pool_size * self.opts.wait_queue_multiple)

      self._socket_semaphore = thread_util.create_semaphore(
          self.opts.max_pool_size, max_waiters)

      if self.enabled_for_cmap:
          self.opts.event_listeners.publish_pool_created(
              self.address, self.opts.non_default_options)

      # Retain references to pinned connections to prevent the CPython GC
      # from thinking that a cursor's pinned connection can be GC'd when the
      # cursor is GC'd (see PYTHON-2751).
      self.__pinned_sockets = set()
      # Number of connections currently pinned to cursors / transactions.
      self.ncursors = 0
      self.ntxns = 0
  1030. def _reset(self, close, service_id=None):
  1031. with self.lock:
  1032. if self.closed:
  1033. return
  1034. self.gen.inc(service_id)
  1035. newpid = os.getpid()
  1036. if self.pid != newpid:
  1037. self.pid = newpid
  1038. self.active_sockets = 0
  1039. if service_id is None:
  1040. sockets, self.sockets = self.sockets, collections.deque()
  1041. else:
  1042. discard = collections.deque()
  1043. keep = collections.deque()
  1044. for sock_info in self.sockets:
  1045. if sock_info.service_id == service_id:
  1046. discard.append(sock_info)
  1047. else:
  1048. keep.append(sock_info)
  1049. sockets = discard
  1050. self.sockets = keep
  1051. if close:
  1052. self.closed = True
  1053. listeners = self.opts.event_listeners
  1054. # CMAP spec says that close() MUST close sockets before publishing the
  1055. # PoolClosedEvent but that reset() SHOULD close sockets *after*
  1056. # publishing the PoolClearedEvent.
  1057. if close:
  1058. for sock_info in sockets:
  1059. sock_info.close_socket(ConnectionClosedReason.POOL_CLOSED)
  1060. if self.enabled_for_cmap:
  1061. listeners.publish_pool_closed(self.address)
  1062. else:
  1063. if self.enabled_for_cmap:
  1064. listeners.publish_pool_cleared(self.address,
  1065. service_id=service_id)
  1066. for sock_info in sockets:
  1067. sock_info.close_socket(ConnectionClosedReason.STALE)
  def update_is_writable(self, is_writable):
      """Updates the is_writable attribute on all sockets currently in the
      Pool.

      The value is also stored on the pool itself, from where
      `return_socket` and `connect` read and write it.
      """
      self.is_writable = is_writable
      with self.lock:
          for sock_info in self.sockets:
              sock_info.update_is_writable(self.is_writable)
  def reset(self, service_id=None):
      """Clear the pool without closing it.

      Thin wrapper around `_reset(close=False, ...)`; `service_id` limits
      the reset to that service's connections.
      """
      self._reset(service_id=service_id, close=False)
  def close(self):
      """Close the pool: a full `_reset` with ``close=True``."""
      should_close = True
      self._reset(close=should_close)
  def stale_generation(self, gen, service_id):
      """Return whether generation `gen` is stale for `service_id`.

      Delegates to the pool's generation tracker (`self.gen.stale`).
      """
      tracker = self.gen
      return tracker.stale(gen, service_id)
  def remove_stale_sockets(self, reference_generation, all_credentials):
      """Removes stale sockets then adds new ones if pool is too small and
      has not been reset. The `reference_generation` argument specifies the
      `generation` at the point in time this operation was requested on the
      pool.

      `all_credentials` is passed on to `connect` for each connection
      opened to reach `min_pool_size`.
      """
      max_idle = self.opts.max_idle_time_seconds
      if max_idle is not None:
          with self.lock:
              # The longest-idle sockets sit at the right end of the deque.
              while (self.sockets and
                     self.sockets[-1].idle_time_seconds() > max_idle):
                  expired = self.sockets.pop()
                  expired.close_socket(ConnectionClosedReason.IDLE)

      while True:
          with self.lock:
              total = len(self.sockets) + self.active_sockets
              if total >= self.opts.min_pool_size:
                  # There are enough sockets in the pool.
                  break

          # We must acquire the semaphore to respect max_pool_size.
          if not self._socket_semaphore.acquire(False):
              break
          try:
              fresh = self.connect(all_credentials)
              with self.lock:
                  # Close connection and return if the pool was reset during
                  # socket creation or while acquiring the pool lock.
                  if self.gen.get_overall() != reference_generation:
                      fresh.close_socket(ConnectionClosedReason.STALE)
                      break
                  self.sockets.appendleft(fresh)
          finally:
              self._socket_semaphore.release()
  def connect(self, all_credentials=None):
      """Connect to Mongo and return a new SocketInfo.

      Allocates the next connection id, opens and configures the socket,
      wraps it in a SocketInfo, then performs the hello handshake (when
      this pool was created with `handshake`) and `check_auth`. If either
      of those fails the new connection is closed before re-raising.

      Can raise ConnectionFailure or CertificateError.

      Note that the pool does not keep a reference to the socket -- you
      must call return_socket() when you're done with it.
      """
      with self.lock:
          conn_id = self.next_connection_id
          self.next_connection_id = conn_id + 1

      listeners = self.opts.event_listeners
      if self.enabled_for_cmap:
          listeners.publish_connection_created(self.address, conn_id)

      try:
          sock = _configured_socket(self.address, self.opts)
      except BaseException as exc:
          if self.enabled_for_cmap:
              listeners.publish_connection_closed(
                  self.address, conn_id, ConnectionClosedReason.ERROR)

          # I/O style failures are handed to the module-level helper;
          # everything else propagates unchanged.
          if isinstance(exc, (IOError, OSError, _SSLError)):
              _raise_connection_failure(self.address, exc)
          raise

      new_conn = SocketInfo(sock, self, self.address, conn_id)
      try:
          if self.handshake:
              new_conn.hello(all_credentials)
              self.is_writable = new_conn.is_writable

          new_conn.check_auth(all_credentials)
      except BaseException:
          new_conn.close_socket(ConnectionClosedReason.ERROR)
          raise

      return new_conn
  @contextlib.contextmanager
  def get_socket(self, all_credentials, handler=None):
      """Check a connection out of the pool for the body of a ``with`` block.

      Yields a :class:`SocketInfo` obtained from :meth:`_get_socket`::

          with pool.get_socket(credentials) as socket_info:
              socket_info.send_message(msg)
              data = socket_info.receive_message(op_code, request_id)

      When the block finishes, an unpinned, still-active connection is
      returned to the pool. A connection pinned to a transaction or a
      cursor is kept referenced by the pool and counted instead; checking
      it back in is then up to its owner.

      Can raise ConnectionFailure or OperationFailure.

      :Parameters:
        - `all_credentials`: dict, maps auth source to MongoCredential.
        - `handler` (optional): A _MongoClientErrorHandler.
      """
      listeners = self.opts.event_listeners
      if self.enabled_for_cmap:
          listeners.publish_connection_check_out_started(self.address)

      checked_out = self._get_socket(all_credentials)

      if self.enabled_for_cmap:
          listeners.publish_connection_checked_out(
              self.address, checked_out.id)
      try:
          yield checked_out
      except:
          # Exception in caller. Ensure the connection gets returned.
          # Note that when pinned is True, the session owns the
          # connection and it is responsible for checking the connection
          # back into the pool.
          # The pinned state is sampled before the handler runs.
          pinned = checked_out.pinned_txn or checked_out.pinned_cursor
          if handler:
              # Perform SDAM error handling rules while the connection is
              # still checked out.
              exc_type, exc_val = sys.exc_info()[:2]
              handler.handle(exc_type, exc_val)
          if not pinned and checked_out.active:
              self.return_socket(checked_out)
          raise
      else:
          if checked_out.pinned_txn:
              with self.lock:
                  self.__pinned_sockets.add(checked_out)
                  self.ntxns += 1
          elif checked_out.pinned_cursor:
              with self.lock:
                  self.__pinned_sockets.add(checked_out)
                  self.ncursors += 1
          elif checked_out.active:
              self.return_socket(checked_out)
  def _get_socket(self, all_credentials):
      """Get or create a SocketInfo. Can raise ConnectionFailure.

      Takes an idle connection from the left end of ``self.sockets`` or,
      when the deque is empty, opens a new one with ``connect``. Pooled
      connections for which ``_perished`` returns true are skipped. The
      chosen connection is passed through ``check_auth`` and marked
      ``active`` before being returned.

      Raises _PoolClosedError when the pool is closed. When the semaphore
      cannot be acquired within ``wait_queue_timeout`` the failure is
      reported through ``_raise_wait_queue_timeout``.

      :Parameters:
        - `all_credentials`: passed to ``connect`` and ``check_auth``.
      """
      # We use the pid here to avoid issues with fork / multiprocessing.
      # See test.test_client:TestClient.test_fork for an example of
      # what could go wrong otherwise
      if self.pid != os.getpid():
          self.reset()

      if self.closed:
          if self.enabled_for_cmap:
              self.opts.event_listeners.publish_connection_check_out_failed(
                  self.address, ConnectionCheckOutFailedReason.POOL_CLOSED)
          raise _PoolClosedError(
              'Attempted to check out a connection from closed connection '
              'pool')

      # Get a free socket or create one.
      # Blocking acquire, bounded by the configured wait queue timeout.
      if not self._socket_semaphore.acquire(
              True, self.opts.wait_queue_timeout):
          self._raise_wait_queue_timeout()

      # We've now acquired the semaphore and must release it on error.
      sock_info = None
      # Tracks whether active_sockets was bumped, so the error path only
      # undoes the increment if it actually happened.
      incremented = False
      try:
          with self.lock:
              self.active_sockets += 1
              incremented = True

          # Loop until we hold a usable connection: either a pooled one
          # that has not perished or a freshly created one.
          while sock_info is None:
              try:
                  with self.lock:
                      sock_info = self.sockets.popleft()
              except IndexError:
                  # The pool is empty.
                  # Can raise ConnectionFailure or CertificateError.
                  sock_info = self.connect(all_credentials)
              else:
                  # _perished closes the connection itself when it returns
                  # True, so just forget it and try the next one.
                  if self._perished(sock_info):
                      sock_info = None
          sock_info.check_auth(all_credentials)
      except BaseException:
          if sock_info:
              # We checked out a socket but authentication failed.
              sock_info.close_socket(ConnectionClosedReason.ERROR)
          # Undo the bookkeeping done above before propagating the error.
          self._socket_semaphore.release()

          if incremented:
              with self.lock:
                  self.active_sockets -= 1

          if self.enabled_for_cmap:
              self.opts.event_listeners.publish_connection_check_out_failed(
                  self.address, ConnectionCheckOutFailedReason.CONN_ERROR)
          raise

      sock_info.active = True
      return sock_info
  def return_socket(self, sock_info):
      """Return the socket to the pool, or if it's closed discard it.

      Clears the connection's ``active`` and pinned flags, drops it from
      the pool's pinned set, publishes the checked-in event (when CMAP
      events are enabled), then either puts it back at the left end of
      ``self.sockets`` or closes it. Finally the semaphore slot is
      released and the pool counters are decremented.

      :Parameters:
        - `sock_info`: The socket to check into the pool.
      """
      # Remember the pinned state before clearing it: it decides which
      # counter is decremented at the end.
      txn = sock_info.pinned_txn
      cursor = sock_info.pinned_cursor
      sock_info.active = False
      sock_info.pinned_txn = False
      sock_info.pinned_cursor = False
      self.__pinned_sockets.discard(sock_info)
      listeners = self.opts.event_listeners
      if self.enabled_for_cmap:
          listeners.publish_connection_checked_in(self.address, sock_info.id)
      # The process id changed since the pool recorded it: reset the pool
      # instead of putting the connection back.
      if self.pid != os.getpid():
          self.reset()
      else:
          if self.closed:
              sock_info.close_socket(ConnectionClosedReason.POOL_CLOSED)
          elif sock_info.closed:
              # CMAP requires the closed event be emitted after the check in.
              if self.enabled_for_cmap:
                  listeners.publish_connection_closed(
                      self.address, sock_info.id,
                      ConnectionClosedReason.ERROR)
          else:
              with self.lock:
                  # Hold the lock to ensure this section does not race with
                  # Pool.reset().
                  if self.stale_generation(sock_info.generation,
                                           sock_info.service_id):
                      sock_info.close_socket(ConnectionClosedReason.STALE)
                  else:
                      sock_info.update_last_checkin_time()
                      sock_info.update_is_writable(self.is_writable)
                      self.sockets.appendleft(sock_info)

      # Release the semaphore slot taken at check-out.
      self._socket_semaphore.release()
      with self.lock:
          if txn:
              self.ntxns -= 1
          elif cursor:
              self.ncursors -= 1
          self.active_sockets -= 1
  1288. def _perished(self, sock_info):
  1289. """Return True and close the connection if it is "perished".
  1290. This side-effecty function checks if this socket has been idle for
  1291. for longer than the max idle time, or if the socket has been closed by
  1292. some external network error, or if the socket's generation is outdated.
  1293. Checking sockets lets us avoid seeing *some*
  1294. :class:`~pymongo.errors.AutoReconnect` exceptions on server
  1295. hiccups, etc. We only check if the socket was closed by an external
  1296. error if it has been > 1 second since the socket was checked into the
  1297. pool, to keep performance reasonable - we can't avoid AutoReconnects
  1298. completely anyway.
  1299. """
  1300. idle_time_seconds = sock_info.idle_time_seconds()
  1301. # If socket is idle, open a new one.
  1302. if (self.opts.max_idle_time_seconds is not None and
  1303. idle_time_seconds > self.opts.max_idle_time_seconds):
  1304. sock_info.close_socket(ConnectionClosedReason.IDLE)
  1305. return True
  1306. if (self._check_interval_seconds is not None and (
  1307. 0 == self._check_interval_seconds or
  1308. idle_time_seconds > self._check_interval_seconds)):
  1309. if sock_info.socket_closed():
  1310. sock_info.close_socket(ConnectionClosedReason.ERROR)
  1311. return True
  1312. if self.stale_generation(sock_info.generation, sock_info.service_id):
  1313. sock_info.close_socket(ConnectionClosedReason.STALE)
  1314. return True
  1315. return False
  def _raise_wait_queue_timeout(self):
      """Report a check-out timeout and raise ConnectionFailure.

      Publishes the check-out-failed event (reason TIMEOUT) when CMAP
      events are enabled. For load-balanced pools the message also breaks
      down how the in-use connections are occupied.
      """
      listeners = self.opts.event_listeners
      if self.enabled_for_cmap:
          listeners.publish_connection_check_out_failed(
              self.address, ConnectionCheckOutFailedReason.TIMEOUT)

      if self.opts.load_balanced:
          # Whatever is active but not pinned to a cursor or transaction.
          other_ops = self.active_sockets - self.ncursors - self.ntxns
          details = (self.opts.max_pool_size, self.ncursors, self.ntxns,
                     other_ops, self.opts.wait_queue_timeout)
          raise ConnectionFailure(
              'Timeout waiting for connection from the connection pool. '
              'maxPoolSize: %s, connections in use by cursors: %s, '
              'connections in use by transactions: %s, connections in use '
              'by other operations: %s, wait_queue_timeout: %s' % details)

      summary = (self.opts.max_pool_size, self.opts.wait_queue_timeout)
      raise ConnectionFailure(
          'Timed out while checking out a connection from connection pool. '
          'maxPoolSize: %s, wait_queue_timeout: %s' % summary)
  1334. def __del__(self):
  1335. # Avoid ResourceWarnings in Python 3
  1336. # Close all sockets without calling reset() or close() because it is
  1337. # not safe to acquire a lock in __del__.
  1338. for sock_info in self.sockets:
  1339. sock_info.close_socket(None)