read_preferences.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. # Copyright 2012-present MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Utilities for choosing which member of a replica set to read from."""
  15. from bson.py3compat import abc, integer_types
  16. from pymongo import max_staleness_selectors
  17. from pymongo.errors import ConfigurationError
  18. from pymongo.server_selectors import (member_with_tags_server_selector,
  19. secondary_with_tags_server_selector)
  20. _PRIMARY = 0
  21. _PRIMARY_PREFERRED = 1
  22. _SECONDARY = 2
  23. _SECONDARY_PREFERRED = 3
  24. _NEAREST = 4
  25. _MONGOS_MODES = (
  26. 'primary',
  27. 'primaryPreferred',
  28. 'secondary',
  29. 'secondaryPreferred',
  30. 'nearest',
  31. )
  32. def _validate_tag_sets(tag_sets):
  33. """Validate tag sets for a MongoReplicaSetClient.
  34. """
  35. if tag_sets is None:
  36. return tag_sets
  37. if not isinstance(tag_sets, list):
  38. raise TypeError((
  39. "Tag sets %r invalid, must be a list") % (tag_sets,))
  40. if len(tag_sets) == 0:
  41. raise ValueError((
  42. "Tag sets %r invalid, must be None or contain at least one set of"
  43. " tags") % (tag_sets,))
  44. for tags in tag_sets:
  45. if not isinstance(tags, abc.Mapping):
  46. raise TypeError(
  47. "Tag set %r invalid, must be an instance of dict, "
  48. "bson.son.SON or other type that inherits from "
  49. "collection.Mapping" % (tags,))
  50. return tag_sets
  51. def _invalid_max_staleness_msg(max_staleness):
  52. return ("maxStalenessSeconds must be a positive integer, not %s" %
  53. max_staleness)
  54. # Some duplication with common.py to avoid import cycle.
  55. def _validate_max_staleness(max_staleness):
  56. """Validate max_staleness."""
  57. if max_staleness == -1:
  58. return -1
  59. if not isinstance(max_staleness, integer_types):
  60. raise TypeError(_invalid_max_staleness_msg(max_staleness))
  61. if max_staleness <= 0:
  62. raise ValueError(_invalid_max_staleness_msg(max_staleness))
  63. return max_staleness
  64. def _validate_hedge(hedge):
  65. """Validate hedge."""
  66. if hedge is None:
  67. return None
  68. if not isinstance(hedge, dict):
  69. raise TypeError("hedge must be a dictionary, not %r" % (hedge,))
  70. return hedge
  71. class _ServerMode(object):
  72. """Base class for all read preferences.
  73. """
  74. __slots__ = ("__mongos_mode", "__mode", "__tag_sets", "__max_staleness",
  75. "__hedge")
  76. def __init__(self, mode, tag_sets=None, max_staleness=-1, hedge=None):
  77. self.__mongos_mode = _MONGOS_MODES[mode]
  78. self.__mode = mode
  79. self.__tag_sets = _validate_tag_sets(tag_sets)
  80. self.__max_staleness = _validate_max_staleness(max_staleness)
  81. self.__hedge = _validate_hedge(hedge)
  82. @property
  83. def name(self):
  84. """The name of this read preference.
  85. """
  86. return self.__class__.__name__
  87. @property
  88. def mongos_mode(self):
  89. """The mongos mode of this read preference.
  90. """
  91. return self.__mongos_mode
  92. @property
  93. def document(self):
  94. """Read preference as a document.
  95. """
  96. doc = {'mode': self.__mongos_mode}
  97. if self.__tag_sets not in (None, [{}]):
  98. doc['tags'] = self.__tag_sets
  99. if self.__max_staleness != -1:
  100. doc['maxStalenessSeconds'] = self.__max_staleness
  101. if self.__hedge not in (None, {}):
  102. doc['hedge'] = self.__hedge
  103. return doc
  104. @property
  105. def mode(self):
  106. """The mode of this read preference instance.
  107. """
  108. return self.__mode
  109. @property
  110. def tag_sets(self):
  111. """Set ``tag_sets`` to a list of dictionaries like [{'dc': 'ny'}] to
  112. read only from members whose ``dc`` tag has the value ``"ny"``.
  113. To specify a priority-order for tag sets, provide a list of
  114. tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
  115. set, ``{}``, means "read from any member that matches the mode,
  116. ignoring tags." MongoReplicaSetClient tries each set of tags in turn
  117. until it finds a set of tags with at least one matching member.
  118. .. seealso:: `Data-Center Awareness
  119. <http://www.mongodb.org/display/DOCS/Data+Center+Awareness>`_
  120. """
  121. return list(self.__tag_sets) if self.__tag_sets else [{}]
  122. @property
  123. def max_staleness(self):
  124. """The maximum estimated length of time (in seconds) a replica set
  125. secondary can fall behind the primary in replication before it will
  126. no longer be selected for operations, or -1 for no maximum."""
  127. return self.__max_staleness
  128. @property
  129. def hedge(self):
  130. """The read preference ``hedge`` parameter.
  131. A dictionary that configures how the server will perform hedged reads.
  132. It consists of the following keys:
  133. - ``enabled``: Enables or disables hedged reads in sharded clusters.
  134. Hedged reads are automatically enabled in MongoDB 4.4+ when using a
  135. ``nearest`` read preference. To explicitly enable hedged reads, set
  136. the ``enabled`` key to ``true``::
  137. >>> Nearest(hedge={'enabled': True})
  138. To explicitly disable hedged reads, set the ``enabled`` key to
  139. ``False``::
  140. >>> Nearest(hedge={'enabled': False})
  141. .. versionadded:: 3.11
  142. """
  143. return self.__hedge
  144. @property
  145. def min_wire_version(self):
  146. """The wire protocol version the server must support.
  147. Some read preferences impose version requirements on all servers (e.g.
  148. maxStalenessSeconds requires MongoDB 3.4 / maxWireVersion 5).
  149. All servers' maxWireVersion must be at least this read preference's
  150. `min_wire_version`, or the driver raises
  151. :exc:`~pymongo.errors.ConfigurationError`.
  152. """
  153. return 0 if self.__max_staleness == -1 else 5
  154. def __repr__(self):
  155. return "%s(tag_sets=%r, max_staleness=%r, hedge=%r)" % (
  156. self.name, self.__tag_sets, self.__max_staleness, self.__hedge)
  157. def __eq__(self, other):
  158. if isinstance(other, _ServerMode):
  159. return (self.mode == other.mode and
  160. self.tag_sets == other.tag_sets and
  161. self.max_staleness == other.max_staleness and
  162. self.hedge == other.hedge)
  163. return NotImplemented
  164. def __ne__(self, other):
  165. return not self == other
  166. def __getstate__(self):
  167. """Return value of object for pickling.
  168. Needed explicitly because __slots__() defined.
  169. """
  170. return {'mode': self.__mode,
  171. 'tag_sets': self.__tag_sets,
  172. 'max_staleness': self.__max_staleness,
  173. 'hedge': self.__hedge}
  174. def __setstate__(self, value):
  175. """Restore from pickling."""
  176. self.__mode = value['mode']
  177. self.__mongos_mode = _MONGOS_MODES[self.__mode]
  178. self.__tag_sets = _validate_tag_sets(value['tag_sets'])
  179. self.__max_staleness = _validate_max_staleness(value['max_staleness'])
  180. self.__hedge = _validate_hedge(value['hedge'])
  181. class Primary(_ServerMode):
  182. """Primary read preference.
  183. * When directly connected to one mongod queries are allowed if the server
  184. is standalone or a replica set primary.
  185. * When connected to a mongos queries are sent to the primary of a shard.
  186. * When connected to a replica set queries are sent to the primary of
  187. the replica set.
  188. """
  189. __slots__ = ()
  190. def __init__(self):
  191. super(Primary, self).__init__(_PRIMARY)
  192. def __call__(self, selection):
  193. """Apply this read preference to a Selection."""
  194. return selection.primary_selection
  195. def __repr__(self):
  196. return "Primary()"
  197. def __eq__(self, other):
  198. if isinstance(other, _ServerMode):
  199. return other.mode == _PRIMARY
  200. return NotImplemented
  201. class PrimaryPreferred(_ServerMode):
  202. """PrimaryPreferred read preference.
  203. * When directly connected to one mongod queries are allowed to standalone
  204. servers, to a replica set primary, or to replica set secondaries.
  205. * When connected to a mongos queries are sent to the primary of a shard if
  206. available, otherwise a shard secondary.
  207. * When connected to a replica set queries are sent to the primary if
  208. available, otherwise a secondary.
  209. .. note:: When a :class:`~pymongo.mongo_client.MongoClient` is first
  210. created reads will be routed to an available secondary until the
  211. primary of the replica set is discovered.
  212. :Parameters:
  213. - `tag_sets`: The :attr:`~tag_sets` to use if the primary is not
  214. available.
  215. - `max_staleness`: (integer, in seconds) The maximum estimated
  216. length of time a replica set secondary can fall behind the primary in
  217. replication before it will no longer be selected for operations.
  218. Default -1, meaning no maximum. If it is set, it must be at least
  219. 90 seconds.
  220. - `hedge`: The :attr:`~hedge` to use if the primary is not available.
  221. .. versionchanged:: 3.11
  222. Added ``hedge`` parameter.
  223. """
  224. __slots__ = ()
  225. def __init__(self, tag_sets=None, max_staleness=-1, hedge=None):
  226. super(PrimaryPreferred, self).__init__(
  227. _PRIMARY_PREFERRED, tag_sets, max_staleness, hedge)
  228. def __call__(self, selection):
  229. """Apply this read preference to Selection."""
  230. if selection.primary:
  231. return selection.primary_selection
  232. else:
  233. return secondary_with_tags_server_selector(
  234. self.tag_sets,
  235. max_staleness_selectors.select(
  236. self.max_staleness, selection))
  237. class Secondary(_ServerMode):
  238. """Secondary read preference.
  239. * When directly connected to one mongod queries are allowed to standalone
  240. servers, to a replica set primary, or to replica set secondaries.
  241. * When connected to a mongos queries are distributed among shard
  242. secondaries. An error is raised if no secondaries are available.
  243. * When connected to a replica set queries are distributed among
  244. secondaries. An error is raised if no secondaries are available.
  245. :Parameters:
  246. - `tag_sets`: The :attr:`~tag_sets` for this read preference.
  247. - `max_staleness`: (integer, in seconds) The maximum estimated
  248. length of time a replica set secondary can fall behind the primary in
  249. replication before it will no longer be selected for operations.
  250. Default -1, meaning no maximum. If it is set, it must be at least
  251. 90 seconds.
  252. - `hedge`: The :attr:`~hedge` for this read preference.
  253. .. versionchanged:: 3.11
  254. Added ``hedge`` parameter.
  255. """
  256. __slots__ = ()
  257. def __init__(self, tag_sets=None, max_staleness=-1, hedge=None):
  258. super(Secondary, self).__init__(
  259. _SECONDARY, tag_sets, max_staleness, hedge)
  260. def __call__(self, selection):
  261. """Apply this read preference to Selection."""
  262. return secondary_with_tags_server_selector(
  263. self.tag_sets,
  264. max_staleness_selectors.select(
  265. self.max_staleness, selection))
  266. class SecondaryPreferred(_ServerMode):
  267. """SecondaryPreferred read preference.
  268. * When directly connected to one mongod queries are allowed to standalone
  269. servers, to a replica set primary, or to replica set secondaries.
  270. * When connected to a mongos queries are distributed among shard
  271. secondaries, or the shard primary if no secondary is available.
  272. * When connected to a replica set queries are distributed among
  273. secondaries, or the primary if no secondary is available.
  274. .. note:: When a :class:`~pymongo.mongo_client.MongoClient` is first
  275. created reads will be routed to the primary of the replica set until
  276. an available secondary is discovered.
  277. :Parameters:
  278. - `tag_sets`: The :attr:`~tag_sets` for this read preference.
  279. - `max_staleness`: (integer, in seconds) The maximum estimated
  280. length of time a replica set secondary can fall behind the primary in
  281. replication before it will no longer be selected for operations.
  282. Default -1, meaning no maximum. If it is set, it must be at least
  283. 90 seconds.
  284. - `hedge`: The :attr:`~hedge` for this read preference.
  285. .. versionchanged:: 3.11
  286. Added ``hedge`` parameter.
  287. """
  288. __slots__ = ()
  289. def __init__(self, tag_sets=None, max_staleness=-1, hedge=None):
  290. super(SecondaryPreferred, self).__init__(
  291. _SECONDARY_PREFERRED, tag_sets, max_staleness, hedge)
  292. def __call__(self, selection):
  293. """Apply this read preference to Selection."""
  294. secondaries = secondary_with_tags_server_selector(
  295. self.tag_sets,
  296. max_staleness_selectors.select(
  297. self.max_staleness, selection))
  298. if secondaries:
  299. return secondaries
  300. else:
  301. return selection.primary_selection
  302. class Nearest(_ServerMode):
  303. """Nearest read preference.
  304. * When directly connected to one mongod queries are allowed to standalone
  305. servers, to a replica set primary, or to replica set secondaries.
  306. * When connected to a mongos queries are distributed among all members of
  307. a shard.
  308. * When connected to a replica set queries are distributed among all
  309. members.
  310. :Parameters:
  311. - `tag_sets`: The :attr:`~tag_sets` for this read preference.
  312. - `max_staleness`: (integer, in seconds) The maximum estimated
  313. length of time a replica set secondary can fall behind the primary in
  314. replication before it will no longer be selected for operations.
  315. Default -1, meaning no maximum. If it is set, it must be at least
  316. 90 seconds.
  317. - `hedge`: The :attr:`~hedge` for this read preference.
  318. .. versionchanged:: 3.11
  319. Added ``hedge`` parameter.
  320. """
  321. __slots__ = ()
  322. def __init__(self, tag_sets=None, max_staleness=-1, hedge=None):
  323. super(Nearest, self).__init__(
  324. _NEAREST, tag_sets, max_staleness, hedge)
  325. def __call__(self, selection):
  326. """Apply this read preference to Selection."""
  327. return member_with_tags_server_selector(
  328. self.tag_sets,
  329. max_staleness_selectors.select(
  330. self.max_staleness, selection))
  331. _ALL_READ_PREFERENCES = (Primary, PrimaryPreferred,
  332. Secondary, SecondaryPreferred, Nearest)
  333. def make_read_preference(mode, tag_sets, max_staleness=-1):
  334. if mode == _PRIMARY:
  335. if tag_sets not in (None, [{}]):
  336. raise ConfigurationError("Read preference primary "
  337. "cannot be combined with tags")
  338. if max_staleness != -1:
  339. raise ConfigurationError("Read preference primary cannot be "
  340. "combined with maxStalenessSeconds")
  341. return Primary()
  342. return _ALL_READ_PREFERENCES[mode](tag_sets, max_staleness)
  343. _MODES = (
  344. 'PRIMARY',
  345. 'PRIMARY_PREFERRED',
  346. 'SECONDARY',
  347. 'SECONDARY_PREFERRED',
  348. 'NEAREST',
  349. )
  350. class ReadPreference(object):
  351. """An enum that defines the read preference modes supported by PyMongo.
  352. See :doc:`/examples/high_availability` for code examples.
  353. A read preference is used in three cases:
  354. :class:`~pymongo.mongo_client.MongoClient` connected to a single mongod:
  355. - ``PRIMARY``: Queries are allowed if the server is standalone or a replica
  356. set primary.
  357. - All other modes allow queries to standalone servers, to a replica set
  358. primary, or to replica set secondaries.
  359. :class:`~pymongo.mongo_client.MongoClient` initialized with the
  360. ``replicaSet`` option:
  361. - ``PRIMARY``: Read from the primary. This is the default, and provides the
  362. strongest consistency. If no primary is available, raise
  363. :class:`~pymongo.errors.AutoReconnect`.
  364. - ``PRIMARY_PREFERRED``: Read from the primary if available, or if there is
  365. none, read from a secondary.
  366. - ``SECONDARY``: Read from a secondary. If no secondary is available,
  367. raise :class:`~pymongo.errors.AutoReconnect`.
  368. - ``SECONDARY_PREFERRED``: Read from a secondary if available, otherwise
  369. from the primary.
  370. - ``NEAREST``: Read from any member.
  371. :class:`~pymongo.mongo_client.MongoClient` connected to a mongos, with a
  372. sharded cluster of replica sets:
  373. - ``PRIMARY``: Read from the primary of the shard, or raise
  374. :class:`~pymongo.errors.OperationFailure` if there is none.
  375. This is the default.
  376. - ``PRIMARY_PREFERRED``: Read from the primary of the shard, or if there is
  377. none, read from a secondary of the shard.
  378. - ``SECONDARY``: Read from a secondary of the shard, or raise
  379. :class:`~pymongo.errors.OperationFailure` if there is none.
  380. - ``SECONDARY_PREFERRED``: Read from a secondary of the shard if available,
  381. otherwise from the shard primary.
  382. - ``NEAREST``: Read from any shard member.
  383. """
  384. PRIMARY = Primary()
  385. PRIMARY_PREFERRED = PrimaryPreferred()
  386. SECONDARY = Secondary()
  387. SECONDARY_PREFERRED = SecondaryPreferred()
  388. NEAREST = Nearest()
  389. def read_pref_mode_from_name(name):
  390. """Get the read preference mode from mongos/uri name.
  391. """
  392. return _MONGOS_MODES.index(name)
  393. class MovingAverage(object):
  394. """Tracks an exponentially-weighted moving average."""
  395. def __init__(self):
  396. self.average = None
  397. def add_sample(self, sample):
  398. if sample < 0:
  399. # Likely system time change while waiting for hello response
  400. # and not using time.monotonic. Ignore it, the next one will
  401. # probably be valid.
  402. return
  403. if self.average is None:
  404. self.average = sample
  405. else:
  406. # The Server Selection Spec requires an exponentially weighted
  407. # average with alpha = 0.2.
  408. self.average = 0.8 * self.average + 0.2 * sample
  409. def get(self):
  410. """Get the calculated average, or None if no samples yet."""
  411. return self.average
  412. def reset(self):
  413. self.average = None