monitoring.py 55 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647
  1. # Copyright 2015-present MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you
  4. # may not use this file except in compliance with the License. You
  5. # may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. # implied. See the License for the specific language governing
  13. # permissions and limitations under the License.
  14. """Tools to monitor driver events.
  15. .. versionadded:: 3.1
  16. .. attention:: Starting in PyMongo 3.11, the monitoring classes outlined below
  17. are included in the PyMongo distribution under the
  18. :mod:`~pymongo.event_loggers` submodule.
  19. Use :func:`register` to register global listeners for specific events.
  20. Listeners must inherit from one of the abstract classes below and implement
  21. the correct functions for that class.
  22. For example, a simple command logger might be implemented like this::
  23. import logging
  24. from pymongo import monitoring
  25. class CommandLogger(monitoring.CommandListener):
  26. def started(self, event):
  27. logging.info("Command {0.command_name} with request id "
  28. "{0.request_id} started on server "
  29. "{0.connection_id}".format(event))
  30. def succeeded(self, event):
  31. logging.info("Command {0.command_name} with request id "
  32. "{0.request_id} on server {0.connection_id} "
  33. "succeeded in {0.duration_micros} "
  34. "microseconds".format(event))
  35. def failed(self, event):
  36. logging.info("Command {0.command_name} with request id "
  37. "{0.request_id} on server {0.connection_id} "
  38. "failed in {0.duration_micros} "
  39. "microseconds".format(event))
  40. monitoring.register(CommandLogger())
  41. Server discovery and monitoring events are also available. For example::
  42. class ServerLogger(monitoring.ServerListener):
  43. def opened(self, event):
  44. logging.info("Server {0.server_address} added to topology "
  45. "{0.topology_id}".format(event))
  46. def description_changed(self, event):
  47. previous_server_type = event.previous_description.server_type
  48. new_server_type = event.new_description.server_type
  49. if new_server_type != previous_server_type:
  50. # server_type_name was added in PyMongo 3.4
  51. logging.info(
  52. "Server {0.server_address} changed type from "
  53. "{0.previous_description.server_type_name} to "
  54. "{0.new_description.server_type_name}".format(event))
  55. def closed(self, event):
  56. logging.warning("Server {0.server_address} removed from topology "
  57. "{0.topology_id}".format(event))
  58. class HeartbeatLogger(monitoring.ServerHeartbeatListener):
  59. def started(self, event):
  60. logging.info("Heartbeat sent to server "
  61. "{0.connection_id}".format(event))
  62. def succeeded(self, event):
  63. # The reply.document attribute was added in PyMongo 3.4.
  64. logging.info("Heartbeat to server {0.connection_id} "
  65. "succeeded with reply "
  66. "{0.reply.document}".format(event))
  67. def failed(self, event):
  68. logging.warning("Heartbeat to server {0.connection_id} "
  69. "failed with error {0.reply}".format(event))
  70. class TopologyLogger(monitoring.TopologyListener):
  71. def opened(self, event):
  72. logging.info("Topology with id {0.topology_id} "
  73. "opened".format(event))
  74. def description_changed(self, event):
  75. logging.info("Topology description updated for "
  76. "topology id {0.topology_id}".format(event))
  77. previous_topology_type = event.previous_description.topology_type
  78. new_topology_type = event.new_description.topology_type
  79. if new_topology_type != previous_topology_type:
  80. # topology_type_name was added in PyMongo 3.4
  81. logging.info(
  82. "Topology {0.topology_id} changed type from "
  83. "{0.previous_description.topology_type_name} to "
  84. "{0.new_description.topology_type_name}".format(event))
  85. # The has_writable_server and has_readable_server methods
  86. # were added in PyMongo 3.4.
  87. if not event.new_description.has_writable_server():
  88. logging.warning("No writable servers available.")
  89. if not event.new_description.has_readable_server():
  90. logging.warning("No readable servers available.")
  91. def closed(self, event):
  92. logging.info("Topology with id {0.topology_id} "
  93. "closed".format(event))
  94. Connection monitoring and pooling events are also available. For example::
  95. class ConnectionPoolLogger(ConnectionPoolListener):
  96. def pool_created(self, event):
  97. logging.info("[pool {0.address}] pool created".format(event))
  98. def pool_cleared(self, event):
  99. logging.info("[pool {0.address}] pool cleared".format(event))
  100. def pool_closed(self, event):
  101. logging.info("[pool {0.address}] pool closed".format(event))
  102. def connection_created(self, event):
  103. logging.info("[pool {0.address}][conn #{0.connection_id}] "
  104. "connection created".format(event))
  105. def connection_ready(self, event):
  106. logging.info("[pool {0.address}][conn #{0.connection_id}] "
  107. "connection setup succeeded".format(event))
  108. def connection_closed(self, event):
  109. logging.info("[pool {0.address}][conn #{0.connection_id}] "
  110. "connection closed, reason: "
  111. "{0.reason}".format(event))
  112. def connection_check_out_started(self, event):
  113. logging.info("[pool {0.address}] connection check out "
  114. "started".format(event))
  115. def connection_check_out_failed(self, event):
  116. logging.info("[pool {0.address}] connection check out "
  117. "failed, reason: {0.reason}".format(event))
  118. def connection_checked_out(self, event):
  119. logging.info("[pool {0.address}][conn #{0.connection_id}] "
  120. "connection checked out of pool".format(event))
  121. def connection_checked_in(self, event):
  122. logging.info("[pool {0.address}][conn #{0.connection_id}] "
  123. "connection checked into pool".format(event))
  124. Event listeners can also be registered per instance of
  125. :class:`~pymongo.mongo_client.MongoClient`::
  126. client = MongoClient(event_listeners=[CommandLogger()])
  127. Note that previously registered global listeners are automatically included
  128. when configuring per client event listeners. Registering a new global listener
  129. will not add that listener to existing client instances.
  130. .. note:: Events are delivered **synchronously**. Application threads block
  131. waiting for event handlers (e.g. :meth:`~CommandListener.started`) to
  132. return. Care must be taken to ensure that your event handlers are efficient
  133. enough to not adversely affect overall application performance.
  134. .. warning:: The command documents published through this API are *not* copies.
  135. If you intend to modify them in any way you must copy them in your event
  136. handler first.
  137. """
  138. from collections import namedtuple
  139. from bson.py3compat import abc
  140. from pymongo.hello_compat import HelloCompat
  141. from pymongo.helpers import _handle_exception
  142. _Listeners = namedtuple('Listeners',
  143. ('command_listeners', 'server_listeners',
  144. 'server_heartbeat_listeners', 'topology_listeners',
  145. 'cmap_listeners'))
  146. _LISTENERS = _Listeners([], [], [], [], [])
  147. class _EventListener(object):
  148. """Abstract base class for all event listeners."""
  149. class CommandListener(_EventListener):
  150. """Abstract base class for command listeners.
  151. Handles `CommandStartedEvent`, `CommandSucceededEvent`,
  152. and `CommandFailedEvent`.
  153. """
  154. def started(self, event):
  155. """Abstract method to handle a `CommandStartedEvent`.
  156. :Parameters:
  157. - `event`: An instance of :class:`CommandStartedEvent`.
  158. """
  159. raise NotImplementedError
  160. def succeeded(self, event):
  161. """Abstract method to handle a `CommandSucceededEvent`.
  162. :Parameters:
  163. - `event`: An instance of :class:`CommandSucceededEvent`.
  164. """
  165. raise NotImplementedError
  166. def failed(self, event):
  167. """Abstract method to handle a `CommandFailedEvent`.
  168. :Parameters:
  169. - `event`: An instance of :class:`CommandFailedEvent`.
  170. """
  171. raise NotImplementedError
  172. class ConnectionPoolListener(_EventListener):
  173. """Abstract base class for connection pool listeners.
  174. Handles all of the connection pool events defined in the Connection
  175. Monitoring and Pooling Specification:
  176. :class:`PoolCreatedEvent`, :class:`PoolClearedEvent`,
  177. :class:`PoolClosedEvent`, :class:`ConnectionCreatedEvent`,
  178. :class:`ConnectionReadyEvent`, :class:`ConnectionClosedEvent`,
  179. :class:`ConnectionCheckOutStartedEvent`,
  180. :class:`ConnectionCheckOutFailedEvent`,
  181. :class:`ConnectionCheckedOutEvent`,
  182. and :class:`ConnectionCheckedInEvent`.
  183. .. versionadded:: 3.9
  184. """
  185. def pool_created(self, event):
  186. """Abstract method to handle a :class:`PoolCreatedEvent`.
  187. Emitted when a Connection Pool is created.
  188. :Parameters:
  189. - `event`: An instance of :class:`PoolCreatedEvent`.
  190. """
  191. raise NotImplementedError
  192. def pool_cleared(self, event):
  193. """Abstract method to handle a `PoolClearedEvent`.
  194. Emitted when a Connection Pool is cleared.
  195. :Parameters:
  196. - `event`: An instance of :class:`PoolClearedEvent`.
  197. """
  198. raise NotImplementedError
  199. def pool_closed(self, event):
  200. """Abstract method to handle a `PoolClosedEvent`.
  201. Emitted when a Connection Pool is closed.
  202. :Parameters:
  203. - `event`: An instance of :class:`PoolClosedEvent`.
  204. """
  205. raise NotImplementedError
  206. def connection_created(self, event):
  207. """Abstract method to handle a :class:`ConnectionCreatedEvent`.
  208. Emitted when a Connection Pool creates a Connection object.
  209. :Parameters:
  210. - `event`: An instance of :class:`ConnectionCreatedEvent`.
  211. """
  212. raise NotImplementedError
  213. def connection_ready(self, event):
  214. """Abstract method to handle a :class:`ConnectionReadyEvent`.
  215. Emitted when a Connection has finished its setup, and is now ready to
  216. use.
  217. :Parameters:
  218. - `event`: An instance of :class:`ConnectionReadyEvent`.
  219. """
  220. raise NotImplementedError
  221. def connection_closed(self, event):
  222. """Abstract method to handle a :class:`ConnectionClosedEvent`.
  223. Emitted when a Connection Pool closes a Connection.
  224. :Parameters:
  225. - `event`: An instance of :class:`ConnectionClosedEvent`.
  226. """
  227. raise NotImplementedError
  228. def connection_check_out_started(self, event):
  229. """Abstract method to handle a :class:`ConnectionCheckOutStartedEvent`.
  230. Emitted when the driver starts attempting to check out a connection.
  231. :Parameters:
  232. - `event`: An instance of :class:`ConnectionCheckOutStartedEvent`.
  233. """
  234. raise NotImplementedError
  235. def connection_check_out_failed(self, event):
  236. """Abstract method to handle a :class:`ConnectionCheckOutFailedEvent`.
  237. Emitted when the driver's attempt to check out a connection fails.
  238. :Parameters:
  239. - `event`: An instance of :class:`ConnectionCheckOutFailedEvent`.
  240. """
  241. raise NotImplementedError
  242. def connection_checked_out(self, event):
  243. """Abstract method to handle a :class:`ConnectionCheckedOutEvent`.
  244. Emitted when the driver successfully checks out a Connection.
  245. :Parameters:
  246. - `event`: An instance of :class:`ConnectionCheckedOutEvent`.
  247. """
  248. raise NotImplementedError
  249. def connection_checked_in(self, event):
  250. """Abstract method to handle a :class:`ConnectionCheckedInEvent`.
  251. Emitted when the driver checks in a Connection back to the Connection
  252. Pool.
  253. :Parameters:
  254. - `event`: An instance of :class:`ConnectionCheckedInEvent`.
  255. """
  256. raise NotImplementedError
  257. class ServerHeartbeatListener(_EventListener):
  258. """Abstract base class for server heartbeat listeners.
  259. Handles `ServerHeartbeatStartedEvent`, `ServerHeartbeatSucceededEvent`,
  260. and `ServerHeartbeatFailedEvent`.
  261. .. versionadded:: 3.3
  262. """
  263. def started(self, event):
  264. """Abstract method to handle a `ServerHeartbeatStartedEvent`.
  265. :Parameters:
  266. - `event`: An instance of :class:`ServerHeartbeatStartedEvent`.
  267. """
  268. raise NotImplementedError
  269. def succeeded(self, event):
  270. """Abstract method to handle a `ServerHeartbeatSucceededEvent`.
  271. :Parameters:
  272. - `event`: An instance of :class:`ServerHeartbeatSucceededEvent`.
  273. """
  274. raise NotImplementedError
  275. def failed(self, event):
  276. """Abstract method to handle a `ServerHeartbeatFailedEvent`.
  277. :Parameters:
  278. - `event`: An instance of :class:`ServerHeartbeatFailedEvent`.
  279. """
  280. raise NotImplementedError
  281. class TopologyListener(_EventListener):
  282. """Abstract base class for topology monitoring listeners.
  283. Handles `TopologyOpenedEvent`, `TopologyDescriptionChangedEvent`, and
  284. `TopologyClosedEvent`.
  285. .. versionadded:: 3.3
  286. """
  287. def opened(self, event):
  288. """Abstract method to handle a `TopologyOpenedEvent`.
  289. :Parameters:
  290. - `event`: An instance of :class:`TopologyOpenedEvent`.
  291. """
  292. raise NotImplementedError
  293. def description_changed(self, event):
  294. """Abstract method to handle a `TopologyDescriptionChangedEvent`.
  295. :Parameters:
  296. - `event`: An instance of :class:`TopologyDescriptionChangedEvent`.
  297. """
  298. raise NotImplementedError
  299. def closed(self, event):
  300. """Abstract method to handle a `TopologyClosedEvent`.
  301. :Parameters:
  302. - `event`: An instance of :class:`TopologyClosedEvent`.
  303. """
  304. raise NotImplementedError
  305. class ServerListener(_EventListener):
  306. """Abstract base class for server listeners.
  307. Handles `ServerOpeningEvent`, `ServerDescriptionChangedEvent`, and
  308. `ServerClosedEvent`.
  309. .. versionadded:: 3.3
  310. """
  311. def opened(self, event):
  312. """Abstract method to handle a `ServerOpeningEvent`.
  313. :Parameters:
  314. - `event`: An instance of :class:`ServerOpeningEvent`.
  315. """
  316. raise NotImplementedError
  317. def description_changed(self, event):
  318. """Abstract method to handle a `ServerDescriptionChangedEvent`.
  319. :Parameters:
  320. - `event`: An instance of :class:`ServerDescriptionChangedEvent`.
  321. """
  322. raise NotImplementedError
  323. def closed(self, event):
  324. """Abstract method to handle a `ServerClosedEvent`.
  325. :Parameters:
  326. - `event`: An instance of :class:`ServerClosedEvent`.
  327. """
  328. raise NotImplementedError
  329. def _to_micros(dur):
  330. """Convert duration 'dur' to microseconds."""
  331. return int(dur.total_seconds() * 10e5)
  332. def _validate_event_listeners(option, listeners):
  333. """Validate event listeners"""
  334. if not isinstance(listeners, abc.Sequence):
  335. raise TypeError("%s must be a list or tuple" % (option,))
  336. for listener in listeners:
  337. if not isinstance(listener, _EventListener):
  338. raise TypeError("Listeners for %s must be either a "
  339. "CommandListener, ServerHeartbeatListener, "
  340. "ServerListener, TopologyListener, or "
  341. "ConnectionPoolListener." % (option,))
  342. return listeners
  343. def register(listener):
  344. """Register a global event listener.
  345. :Parameters:
  346. - `listener`: A subclasses of :class:`CommandListener`,
  347. :class:`ServerHeartbeatListener`, :class:`ServerListener`,
  348. :class:`TopologyListener`, or :class:`ConnectionPoolListener`.
  349. """
  350. if not isinstance(listener, _EventListener):
  351. raise TypeError("Listeners for %s must be either a "
  352. "CommandListener, ServerHeartbeatListener, "
  353. "ServerListener, TopologyListener, or "
  354. "ConnectionPoolListener." % (listener,))
  355. if isinstance(listener, CommandListener):
  356. _LISTENERS.command_listeners.append(listener)
  357. if isinstance(listener, ServerHeartbeatListener):
  358. _LISTENERS.server_heartbeat_listeners.append(listener)
  359. if isinstance(listener, ServerListener):
  360. _LISTENERS.server_listeners.append(listener)
  361. if isinstance(listener, TopologyListener):
  362. _LISTENERS.topology_listeners.append(listener)
  363. if isinstance(listener, ConnectionPoolListener):
  364. _LISTENERS.cmap_listeners.append(listener)
  365. # Note - to avoid bugs from forgetting which if these is all lowercase and
  366. # which are camelCase, and at the same time avoid having to add a test for
  367. # every command, use all lowercase here and test against command_name.lower().
  368. _SENSITIVE_COMMANDS = set(
  369. ["authenticate", "saslstart", "saslcontinue", "getnonce", "createuser",
  370. "updateuser", "copydbgetnonce", "copydbsaslstart", "copydb"])
  371. # The "hello" command is also deemed sensitive when attempting speculative
  372. # authentication.
  373. def _is_speculative_authenticate(command_name, doc):
  374. if (command_name.lower() in ('hello', HelloCompat.LEGACY_CMD) and
  375. 'speculativeAuthenticate' in doc):
  376. return True
  377. return False
  378. class _CommandEvent(object):
  379. """Base class for command events."""
  380. __slots__ = ("__cmd_name", "__rqst_id", "__conn_id", "__op_id",
  381. "__service_id")
  382. def __init__(self, command_name, request_id, connection_id, operation_id,
  383. service_id=None):
  384. self.__cmd_name = command_name
  385. self.__rqst_id = request_id
  386. self.__conn_id = connection_id
  387. self.__op_id = operation_id
  388. self.__service_id = service_id
  389. @property
  390. def command_name(self):
  391. """The command name."""
  392. return self.__cmd_name
  393. @property
  394. def request_id(self):
  395. """The request id for this operation."""
  396. return self.__rqst_id
  397. @property
  398. def connection_id(self):
  399. """The address (host, port) of the server this command was sent to."""
  400. return self.__conn_id
  401. @property
  402. def service_id(self):
  403. """The service_id this command was sent to, or ``None``.
  404. .. versionadded:: 3.12
  405. """
  406. return self.__service_id
  407. @property
  408. def operation_id(self):
  409. """An id for this series of events or None."""
  410. return self.__op_id
  411. class CommandStartedEvent(_CommandEvent):
  412. """Event published when a command starts.
  413. :Parameters:
  414. - `command`: The command document.
  415. - `database_name`: The name of the database this command was run against.
  416. - `request_id`: The request id for this operation.
  417. - `connection_id`: The address (host, port) of the server this command
  418. was sent to.
  419. - `operation_id`: An optional identifier for a series of related events.
  420. - `service_id`: The service_id this command was sent to, or ``None``.
  421. """
  422. __slots__ = ("__cmd", "__db")
  423. def __init__(self, command, database_name, request_id, connection_id,
  424. operation_id, service_id=None):
  425. if not command:
  426. raise ValueError("%r is not a valid command" % (command,))
  427. # Command name must be first key.
  428. command_name = next(iter(command))
  429. super(CommandStartedEvent, self).__init__(
  430. command_name, request_id, connection_id, operation_id,
  431. service_id=service_id)
  432. cmd_name, cmd_doc = command_name.lower(), command[command_name]
  433. if (cmd_name in _SENSITIVE_COMMANDS or
  434. _is_speculative_authenticate(cmd_name, command)):
  435. self.__cmd = {}
  436. else:
  437. self.__cmd = command
  438. self.__db = database_name
  439. @property
  440. def command(self):
  441. """The command document."""
  442. return self.__cmd
  443. @property
  444. def database_name(self):
  445. """The name of the database this command was run against."""
  446. return self.__db
  447. def __repr__(self):
  448. return (
  449. "<%s %s db: %r, command: %r, operation_id: %s, "
  450. "service_id: %s>") % (
  451. self.__class__.__name__, self.connection_id,
  452. self.database_name, self.command_name, self.operation_id,
  453. self.service_id)
  454. class CommandSucceededEvent(_CommandEvent):
  455. """Event published when a command succeeds.
  456. :Parameters:
  457. - `duration`: The command duration as a datetime.timedelta.
  458. - `reply`: The server reply document.
  459. - `command_name`: The command name.
  460. - `request_id`: The request id for this operation.
  461. - `connection_id`: The address (host, port) of the server this command
  462. was sent to.
  463. - `operation_id`: An optional identifier for a series of related events.
  464. - `service_id`: The service_id this command was sent to, or ``None``.
  465. """
  466. __slots__ = ("__duration_micros", "__reply")
  467. def __init__(self, duration, reply, command_name,
  468. request_id, connection_id, operation_id, service_id=None):
  469. super(CommandSucceededEvent, self).__init__(
  470. command_name, request_id, connection_id, operation_id,
  471. service_id=service_id)
  472. self.__duration_micros = _to_micros(duration)
  473. cmd_name = command_name.lower()
  474. if (cmd_name in _SENSITIVE_COMMANDS or
  475. _is_speculative_authenticate(cmd_name, reply)):
  476. self.__reply = {}
  477. else:
  478. self.__reply = reply
  479. @property
  480. def duration_micros(self):
  481. """The duration of this operation in microseconds."""
  482. return self.__duration_micros
  483. @property
  484. def reply(self):
  485. """The server failure document for this operation."""
  486. return self.__reply
  487. def __repr__(self):
  488. return (
  489. "<%s %s command: %r, operation_id: %s, duration_micros: %s, "
  490. "service_id: %s>") % (
  491. self.__class__.__name__, self.connection_id,
  492. self.command_name, self.operation_id, self.duration_micros,
  493. self.service_id)
  494. class CommandFailedEvent(_CommandEvent):
  495. """Event published when a command fails.
  496. :Parameters:
  497. - `duration`: The command duration as a datetime.timedelta.
  498. - `failure`: The server reply document.
  499. - `command_name`: The command name.
  500. - `request_id`: The request id for this operation.
  501. - `connection_id`: The address (host, port) of the server this command
  502. was sent to.
  503. - `operation_id`: An optional identifier for a series of related events.
  504. - `service_id`: The service_id this command was sent to, or ``None``.
  505. """
  506. __slots__ = ("__duration_micros", "__failure")
  507. def __init__(self, duration, failure, command_name, request_id,
  508. connection_id, operation_id, service_id=None):
  509. super(CommandFailedEvent, self).__init__(
  510. command_name, request_id, connection_id, operation_id,
  511. service_id=service_id)
  512. self.__duration_micros = _to_micros(duration)
  513. self.__failure = failure
  514. @property
  515. def duration_micros(self):
  516. """The duration of this operation in microseconds."""
  517. return self.__duration_micros
  518. @property
  519. def failure(self):
  520. """The server failure document for this operation."""
  521. return self.__failure
  522. def __repr__(self):
  523. return (
  524. "<%s %s command: %r, operation_id: %s, duration_micros: %s, "
  525. "failure: %r, service_id: %s>") % (
  526. self.__class__.__name__, self.connection_id, self.command_name,
  527. self.operation_id, self.duration_micros, self.failure,
  528. self.service_id)
  529. class _PoolEvent(object):
  530. """Base class for pool events."""
  531. __slots__ = ("__address",)
  532. def __init__(self, address):
  533. self.__address = address
  534. @property
  535. def address(self):
  536. """The address (host, port) pair of the server the pool is attempting
  537. to connect to.
  538. """
  539. return self.__address
  540. def __repr__(self):
  541. return '%s(%r)' % (self.__class__.__name__, self.__address)
  542. class PoolCreatedEvent(_PoolEvent):
  543. """Published when a Connection Pool is created.
  544. :Parameters:
  545. - `address`: The address (host, port) pair of the server this Pool is
  546. attempting to connect to.
  547. .. versionadded:: 3.9
  548. """
  549. __slots__ = ("__options",)
  550. def __init__(self, address, options):
  551. super(PoolCreatedEvent, self).__init__(address)
  552. self.__options = options
  553. @property
  554. def options(self):
  555. """Any non-default pool options that were set on this Connection Pool.
  556. """
  557. return self.__options
  558. def __repr__(self):
  559. return '%s(%r, %r)' % (
  560. self.__class__.__name__, self.address, self.__options)
  561. class PoolClearedEvent(_PoolEvent):
  562. """Published when a Connection Pool is cleared.
  563. :Parameters:
  564. - `address`: The address (host, port) pair of the server this Pool is
  565. attempting to connect to.
  566. - `service_id`: The service_id this command was sent to, or ``None``.
  567. .. versionadded:: 3.9
  568. """
  569. __slots__ = ("__service_id",)
  570. def __init__(self, address, service_id=None):
  571. super(PoolClearedEvent, self).__init__(address)
  572. self.__service_id = service_id
  573. @property
  574. def service_id(self):
  575. """Connections with this service_id are cleared.
  576. When service_id is ``None``, all connections in the pool are cleared.
  577. .. versionadded:: 3.12
  578. """
  579. return self.__service_id
  580. def __repr__(self):
  581. return '%s(%r, %r)' % (
  582. self.__class__.__name__, self.address, self.__service_id)
  583. class PoolClosedEvent(_PoolEvent):
  584. """Published when a Connection Pool is closed.
  585. :Parameters:
  586. - `address`: The address (host, port) pair of the server this Pool is
  587. attempting to connect to.
  588. .. versionadded:: 3.9
  589. """
  590. __slots__ = ()
  591. class ConnectionClosedReason(object):
  592. """An enum that defines values for `reason` on a
  593. :class:`ConnectionClosedEvent`.
  594. .. versionadded:: 3.9
  595. """
  596. STALE = 'stale'
  597. """The pool was cleared, making the connection no longer valid."""
  598. IDLE = 'idle'
  599. """The connection became stale by being idle for too long (maxIdleTimeMS).
  600. """
  601. ERROR = 'error'
  602. """The connection experienced an error, making it no longer valid."""
  603. POOL_CLOSED = 'poolClosed'
  604. """The pool was closed, making the connection no longer valid."""
  605. class ConnectionCheckOutFailedReason(object):
  606. """An enum that defines values for `reason` on a
  607. :class:`ConnectionCheckOutFailedEvent`.
  608. .. versionadded:: 3.9
  609. """
  610. TIMEOUT = 'timeout'
  611. """The connection check out attempt exceeded the specified timeout."""
  612. POOL_CLOSED = 'poolClosed'
  613. """The pool was previously closed, and cannot provide new connections."""
  614. CONN_ERROR = 'connectionError'
  615. """The connection check out attempt experienced an error while setting up
  616. a new connection.
  617. """
  618. class _ConnectionEvent(object):
  619. """Private base class for some connection events."""
  620. __slots__ = ("__address", "__connection_id")
  621. def __init__(self, address, connection_id):
  622. self.__address = address
  623. self.__connection_id = connection_id
  624. @property
  625. def address(self):
  626. """The address (host, port) pair of the server this connection is
  627. attempting to connect to.
  628. """
  629. return self.__address
  630. @property
  631. def connection_id(self):
  632. """The ID of the Connection."""
  633. return self.__connection_id
  634. def __repr__(self):
  635. return '%s(%r, %r)' % (
  636. self.__class__.__name__, self.__address, self.__connection_id)
  637. class ConnectionCreatedEvent(_ConnectionEvent):
  638. """Published when a Connection Pool creates a Connection object.
  639. NOTE: This connection is not ready for use until the
  640. :class:`ConnectionReadyEvent` is published.
  641. :Parameters:
  642. - `address`: The address (host, port) pair of the server this
  643. Connection is attempting to connect to.
  644. - `connection_id`: The integer ID of the Connection in this Pool.
  645. .. versionadded:: 3.9
  646. """
  647. __slots__ = ()
  648. class ConnectionReadyEvent(_ConnectionEvent):
  649. """Published when a Connection has finished its setup, and is ready to use.
  650. :Parameters:
  651. - `address`: The address (host, port) pair of the server this
  652. Connection is attempting to connect to.
  653. - `connection_id`: The integer ID of the Connection in this Pool.
  654. .. versionadded:: 3.9
  655. """
  656. __slots__ = ()
  657. class ConnectionClosedEvent(_ConnectionEvent):
  658. """Published when a Connection is closed.
  659. :Parameters:
  660. - `address`: The address (host, port) pair of the server this
  661. Connection is attempting to connect to.
  662. - `connection_id`: The integer ID of the Connection in this Pool.
  663. - `reason`: A reason explaining why this connection was closed.
  664. .. versionadded:: 3.9
  665. """
  666. __slots__ = ("__reason",)
  667. def __init__(self, address, connection_id, reason):
  668. super(ConnectionClosedEvent, self).__init__(address, connection_id)
  669. self.__reason = reason
  670. @property
  671. def reason(self):
  672. """A reason explaining why this connection was closed.
  673. The reason must be one of the strings from the
  674. :class:`ConnectionClosedReason` enum.
  675. """
  676. return self.__reason
  677. def __repr__(self):
  678. return '%s(%r, %r, %r)' % (
  679. self.__class__.__name__, self.address, self.connection_id,
  680. self.__reason)
  681. class ConnectionCheckOutStartedEvent(object):
  682. """Published when the driver starts attempting to check out a connection.
  683. :Parameters:
  684. - `address`: The address (host, port) pair of the server this
  685. Connection is attempting to connect to.
  686. .. versionadded:: 3.9
  687. """
  688. __slots__ = ("__address",)
  689. def __init__(self, address):
  690. self.__address = address
  691. @property
  692. def address(self):
  693. """The address (host, port) pair of the server this connection is
  694. attempting to connect to.
  695. """
  696. return self.__address
  697. def __repr__(self):
  698. return '%s(%r)' % (self.__class__.__name__, self.__address)
  699. class ConnectionCheckOutFailedEvent(object):
  700. """Published when the driver's attempt to check out a connection fails.
  701. :Parameters:
  702. - `address`: The address (host, port) pair of the server this
  703. Connection is attempting to connect to.
  704. - `reason`: A reason explaining why connection check out failed.
  705. .. versionadded:: 3.9
  706. """
  707. __slots__ = ("__address", "__reason")
  708. def __init__(self, address, reason):
  709. self.__address = address
  710. self.__reason = reason
  711. @property
  712. def address(self):
  713. """The address (host, port) pair of the server this connection is
  714. attempting to connect to.
  715. """
  716. return self.__address
  717. @property
  718. def reason(self):
  719. """A reason explaining why connection check out failed.
  720. The reason must be one of the strings from the
  721. :class:`ConnectionCheckOutFailedReason` enum.
  722. """
  723. return self.__reason
  724. def __repr__(self):
  725. return '%s(%r, %r)' % (
  726. self.__class__.__name__, self.__address, self.__reason)
  727. class ConnectionCheckedOutEvent(_ConnectionEvent):
  728. """Published when the driver successfully checks out a Connection.
  729. :Parameters:
  730. - `address`: The address (host, port) pair of the server this
  731. Connection is attempting to connect to.
  732. - `connection_id`: The integer ID of the Connection in this Pool.
  733. .. versionadded:: 3.9
  734. """
  735. __slots__ = ()
  736. class ConnectionCheckedInEvent(_ConnectionEvent):
  737. """Published when the driver checks in a Connection into the Pool.
  738. :Parameters:
  739. - `address`: The address (host, port) pair of the server this
  740. Connection is attempting to connect to.
  741. - `connection_id`: The integer ID of the Connection in this Pool.
  742. .. versionadded:: 3.9
  743. """
  744. __slots__ = ()
  745. class _ServerEvent(object):
  746. """Base class for server events."""
  747. __slots__ = ("__server_address", "__topology_id")
  748. def __init__(self, server_address, topology_id):
  749. self.__server_address = server_address
  750. self.__topology_id = topology_id
  751. @property
  752. def server_address(self):
  753. """The address (host, port) pair of the server"""
  754. return self.__server_address
  755. @property
  756. def topology_id(self):
  757. """A unique identifier for the topology this server is a part of."""
  758. return self.__topology_id
  759. def __repr__(self):
  760. return "<%s %s topology_id: %s>" % (
  761. self.__class__.__name__, self.server_address, self.topology_id)
  762. class ServerDescriptionChangedEvent(_ServerEvent):
  763. """Published when server description changes.
  764. .. versionadded:: 3.3
  765. """
  766. __slots__ = ('__previous_description', '__new_description')
  767. def __init__(self, previous_description, new_description, *args):
  768. super(ServerDescriptionChangedEvent, self).__init__(*args)
  769. self.__previous_description = previous_description
  770. self.__new_description = new_description
  771. @property
  772. def previous_description(self):
  773. """The previous
  774. :class:`~pymongo.server_description.ServerDescription`."""
  775. return self.__previous_description
  776. @property
  777. def new_description(self):
  778. """The new
  779. :class:`~pymongo.server_description.ServerDescription`."""
  780. return self.__new_description
  781. def __repr__(self):
  782. return "<%s %s changed from: %s, to: %s>" % (
  783. self.__class__.__name__, self.server_address,
  784. self.previous_description, self.new_description)
  785. class ServerOpeningEvent(_ServerEvent):
  786. """Published when server is initialized.
  787. .. versionadded:: 3.3
  788. """
  789. __slots__ = ()
  790. class ServerClosedEvent(_ServerEvent):
  791. """Published when server is closed.
  792. .. versionadded:: 3.3
  793. """
  794. __slots__ = ()
  795. class TopologyEvent(object):
  796. """Base class for topology description events."""
  797. __slots__ = ('__topology_id')
  798. def __init__(self, topology_id):
  799. self.__topology_id = topology_id
  800. @property
  801. def topology_id(self):
  802. """A unique identifier for the topology this server is a part of."""
  803. return self.__topology_id
  804. def __repr__(self):
  805. return "<%s topology_id: %s>" % (
  806. self.__class__.__name__, self.topology_id)
  807. class TopologyDescriptionChangedEvent(TopologyEvent):
  808. """Published when the topology description changes.
  809. .. versionadded:: 3.3
  810. """
  811. __slots__ = ('__previous_description', '__new_description')
  812. def __init__(self, previous_description, new_description, *args):
  813. super(TopologyDescriptionChangedEvent, self).__init__(*args)
  814. self.__previous_description = previous_description
  815. self.__new_description = new_description
  816. @property
  817. def previous_description(self):
  818. """The previous
  819. :class:`~pymongo.topology_description.TopologyDescription`."""
  820. return self.__previous_description
  821. @property
  822. def new_description(self):
  823. """The new
  824. :class:`~pymongo.topology_description.TopologyDescription`."""
  825. return self.__new_description
  826. def __repr__(self):
  827. return "<%s topology_id: %s changed from: %s, to: %s>" % (
  828. self.__class__.__name__, self.topology_id,
  829. self.previous_description, self.new_description)
  830. class TopologyOpenedEvent(TopologyEvent):
  831. """Published when the topology is initialized.
  832. .. versionadded:: 3.3
  833. """
  834. __slots__ = ()
  835. class TopologyClosedEvent(TopologyEvent):
  836. """Published when the topology is closed.
  837. .. versionadded:: 3.3
  838. """
  839. __slots__ = ()
  840. class _ServerHeartbeatEvent(object):
  841. """Base class for server heartbeat events."""
  842. __slots__ = ('__connection_id')
  843. def __init__(self, connection_id):
  844. self.__connection_id = connection_id
  845. @property
  846. def connection_id(self):
  847. """The address (host, port) of the server this heartbeat was sent
  848. to."""
  849. return self.__connection_id
  850. def __repr__(self):
  851. return "<%s %s>" % (self.__class__.__name__, self.connection_id)
  852. class ServerHeartbeatStartedEvent(_ServerHeartbeatEvent):
  853. """Published when a heartbeat is started.
  854. .. versionadded:: 3.3
  855. """
  856. __slots__ = ()
  857. class ServerHeartbeatSucceededEvent(_ServerHeartbeatEvent):
  858. """Fired when the server heartbeat succeeds.
  859. .. versionadded:: 3.3
  860. """
  861. __slots__ = ('__duration', '__reply', '__awaited')
  862. def __init__(self, duration, reply, connection_id, awaited=False):
  863. super(ServerHeartbeatSucceededEvent, self).__init__(connection_id)
  864. self.__duration = duration
  865. self.__reply = reply
  866. self.__awaited = awaited
  867. @property
  868. def duration(self):
  869. """The duration of this heartbeat in microseconds."""
  870. return self.__duration
  871. @property
  872. def reply(self):
  873. """An instance of :class:`~pymongo.ismaster.IsMaster`.
  874. .. warning:: :class:`~pymongo.ismaster.IsMaster` is deprecated.
  875. Starting with PyMongo 4.0 this attribute will return an instance
  876. of :class:`~pymongo.hello.Hello`, which provides the same API.
  877. """
  878. return self.__reply
  879. @property
  880. def awaited(self):
  881. """Whether the heartbeat was awaited.
  882. If true, then :meth:`duration` reflects the sum of the round trip time
  883. to the server and the time that the server waited before sending a
  884. response.
  885. """
  886. return self.__awaited
  887. def __repr__(self):
  888. return "<%s %s duration: %s, awaited: %s, reply: %s>" % (
  889. self.__class__.__name__, self.connection_id,
  890. self.duration, self.awaited, self.reply)
  891. class ServerHeartbeatFailedEvent(_ServerHeartbeatEvent):
  892. """Fired when the server heartbeat fails, either with an "ok: 0"
  893. or a socket exception.
  894. .. versionadded:: 3.3
  895. """
  896. __slots__ = ('__duration', '__reply', '__awaited')
  897. def __init__(self, duration, reply, connection_id, awaited=False):
  898. super(ServerHeartbeatFailedEvent, self).__init__(connection_id)
  899. self.__duration = duration
  900. self.__reply = reply
  901. self.__awaited = awaited
  902. @property
  903. def duration(self):
  904. """The duration of this heartbeat in microseconds."""
  905. return self.__duration
  906. @property
  907. def reply(self):
  908. """A subclass of :exc:`Exception`."""
  909. return self.__reply
  910. @property
  911. def awaited(self):
  912. """Whether the heartbeat was awaited.
  913. If true, then :meth:`duration` reflects the sum of the round trip time
  914. to the server and the time that the server waited before sending a
  915. response.
  916. """
  917. return self.__awaited
  918. def __repr__(self):
  919. return "<%s %s duration: %s, awaited: %s, reply: %r>" % (
  920. self.__class__.__name__, self.connection_id,
  921. self.duration, self.awaited, self.reply)
  922. class _EventListeners(object):
  923. """Configure event listeners for a client instance.
  924. Any event listeners registered globally are included by default.
  925. :Parameters:
  926. - `listeners`: A list of event listeners.
  927. """
  928. def __init__(self, listeners):
  929. self.__command_listeners = _LISTENERS.command_listeners[:]
  930. self.__server_listeners = _LISTENERS.server_listeners[:]
  931. lst = _LISTENERS.server_heartbeat_listeners
  932. self.__server_heartbeat_listeners = lst[:]
  933. self.__topology_listeners = _LISTENERS.topology_listeners[:]
  934. self.__cmap_listeners = _LISTENERS.cmap_listeners[:]
  935. if listeners is not None:
  936. for lst in listeners:
  937. if isinstance(lst, CommandListener):
  938. self.__command_listeners.append(lst)
  939. if isinstance(lst, ServerListener):
  940. self.__server_listeners.append(lst)
  941. if isinstance(lst, ServerHeartbeatListener):
  942. self.__server_heartbeat_listeners.append(lst)
  943. if isinstance(lst, TopologyListener):
  944. self.__topology_listeners.append(lst)
  945. if isinstance(lst, ConnectionPoolListener):
  946. self.__cmap_listeners.append(lst)
  947. self.__enabled_for_commands = bool(self.__command_listeners)
  948. self.__enabled_for_server = bool(self.__server_listeners)
  949. self.__enabled_for_server_heartbeat = bool(
  950. self.__server_heartbeat_listeners)
  951. self.__enabled_for_topology = bool(self.__topology_listeners)
  952. self.__enabled_for_cmap = bool(self.__cmap_listeners)
  953. @property
  954. def enabled_for_commands(self):
  955. """Are any CommandListener instances registered?"""
  956. return self.__enabled_for_commands
  957. @property
  958. def enabled_for_server(self):
  959. """Are any ServerListener instances registered?"""
  960. return self.__enabled_for_server
  961. @property
  962. def enabled_for_server_heartbeat(self):
  963. """Are any ServerHeartbeatListener instances registered?"""
  964. return self.__enabled_for_server_heartbeat
  965. @property
  966. def enabled_for_topology(self):
  967. """Are any TopologyListener instances registered?"""
  968. return self.__enabled_for_topology
  969. @property
  970. def enabled_for_cmap(self):
  971. """Are any ConnectionPoolListener instances registered?"""
  972. return self.__enabled_for_cmap
  973. def event_listeners(self):
  974. """List of registered event listeners."""
  975. return (self.__command_listeners[:],
  976. self.__server_heartbeat_listeners[:],
  977. self.__server_listeners[:],
  978. self.__topology_listeners[:])
  979. def publish_command_start(self, command, database_name,
  980. request_id, connection_id, op_id=None,
  981. service_id=None):
  982. """Publish a CommandStartedEvent to all command listeners.
  983. :Parameters:
  984. - `command`: The command document.
  985. - `database_name`: The name of the database this command was run
  986. against.
  987. - `request_id`: The request id for this operation.
  988. - `connection_id`: The address (host, port) of the server this
  989. command was sent to.
  990. - `op_id`: The (optional) operation id for this operation.
  991. - `service_id`: The service_id this command was sent to, or ``None``.
  992. """
  993. if op_id is None:
  994. op_id = request_id
  995. event = CommandStartedEvent(
  996. command, database_name, request_id, connection_id, op_id,
  997. service_id=service_id)
  998. for subscriber in self.__command_listeners:
  999. try:
  1000. subscriber.started(event)
  1001. except Exception:
  1002. _handle_exception()
  1003. def publish_command_success(self, duration, reply, command_name,
  1004. request_id, connection_id, op_id=None,
  1005. service_id=None):
  1006. """Publish a CommandSucceededEvent to all command listeners.
  1007. :Parameters:
  1008. - `duration`: The command duration as a datetime.timedelta.
  1009. - `reply`: The server reply document.
  1010. - `command_name`: The command name.
  1011. - `request_id`: The request id for this operation.
  1012. - `connection_id`: The address (host, port) of the server this
  1013. command was sent to.
  1014. - `op_id`: The (optional) operation id for this operation.
  1015. - `service_id`: The service_id this command was sent to, or ``None``.
  1016. """
  1017. if op_id is None:
  1018. op_id = request_id
  1019. event = CommandSucceededEvent(
  1020. duration, reply, command_name, request_id, connection_id, op_id,
  1021. service_id)
  1022. for subscriber in self.__command_listeners:
  1023. try:
  1024. subscriber.succeeded(event)
  1025. except Exception:
  1026. _handle_exception()
  1027. def publish_command_failure(self, duration, failure, command_name,
  1028. request_id, connection_id, op_id=None,
  1029. service_id=None):
  1030. """Publish a CommandFailedEvent to all command listeners.
  1031. :Parameters:
  1032. - `duration`: The command duration as a datetime.timedelta.
  1033. - `failure`: The server reply document or failure description
  1034. document.
  1035. - `command_name`: The command name.
  1036. - `request_id`: The request id for this operation.
  1037. - `connection_id`: The address (host, port) of the server this
  1038. command was sent to.
  1039. - `op_id`: The (optional) operation id for this operation.
  1040. - `service_id`: The service_id this command was sent to, or ``None``.
  1041. """
  1042. if op_id is None:
  1043. op_id = request_id
  1044. event = CommandFailedEvent(
  1045. duration, failure, command_name, request_id, connection_id, op_id,
  1046. service_id=service_id)
  1047. for subscriber in self.__command_listeners:
  1048. try:
  1049. subscriber.failed(event)
  1050. except Exception:
  1051. _handle_exception()
  1052. def publish_server_heartbeat_started(self, connection_id):
  1053. """Publish a ServerHeartbeatStartedEvent to all server heartbeat
  1054. listeners.
  1055. :Parameters:
  1056. - `connection_id`: The address (host, port) pair of the connection.
  1057. """
  1058. event = ServerHeartbeatStartedEvent(connection_id)
  1059. for subscriber in self.__server_heartbeat_listeners:
  1060. try:
  1061. subscriber.started(event)
  1062. except Exception:
  1063. _handle_exception()
  1064. def publish_server_heartbeat_succeeded(self, connection_id, duration,
  1065. reply, awaited):
  1066. """Publish a ServerHeartbeatSucceededEvent to all server heartbeat
  1067. listeners.
  1068. :Parameters:
  1069. - `connection_id`: The address (host, port) pair of the connection.
  1070. - `duration`: The execution time of the event in the highest possible
  1071. resolution for the platform.
  1072. - `reply`: The command reply.
  1073. - `awaited`: True if the response was awaited.
  1074. """
  1075. event = ServerHeartbeatSucceededEvent(duration, reply, connection_id,
  1076. awaited)
  1077. for subscriber in self.__server_heartbeat_listeners:
  1078. try:
  1079. subscriber.succeeded(event)
  1080. except Exception:
  1081. _handle_exception()
  1082. def publish_server_heartbeat_failed(self, connection_id, duration, reply,
  1083. awaited):
  1084. """Publish a ServerHeartbeatFailedEvent to all server heartbeat
  1085. listeners.
  1086. :Parameters:
  1087. - `connection_id`: The address (host, port) pair of the connection.
  1088. - `duration`: The execution time of the event in the highest possible
  1089. resolution for the platform.
  1090. - `reply`: The command reply.
  1091. - `awaited`: True if the response was awaited.
  1092. """
  1093. event = ServerHeartbeatFailedEvent(duration, reply, connection_id,
  1094. awaited)
  1095. for subscriber in self.__server_heartbeat_listeners:
  1096. try:
  1097. subscriber.failed(event)
  1098. except Exception:
  1099. _handle_exception()
  1100. def publish_server_opened(self, server_address, topology_id):
  1101. """Publish a ServerOpeningEvent to all server listeners.
  1102. :Parameters:
  1103. - `server_address`: The address (host, port) pair of the server.
  1104. - `topology_id`: A unique identifier for the topology this server
  1105. is a part of.
  1106. """
  1107. event = ServerOpeningEvent(server_address, topology_id)
  1108. for subscriber in self.__server_listeners:
  1109. try:
  1110. subscriber.opened(event)
  1111. except Exception:
  1112. _handle_exception()
  1113. def publish_server_closed(self, server_address, topology_id):
  1114. """Publish a ServerClosedEvent to all server listeners.
  1115. :Parameters:
  1116. - `server_address`: The address (host, port) pair of the server.
  1117. - `topology_id`: A unique identifier for the topology this server
  1118. is a part of.
  1119. """
  1120. event = ServerClosedEvent(server_address, topology_id)
  1121. for subscriber in self.__server_listeners:
  1122. try:
  1123. subscriber.closed(event)
  1124. except Exception:
  1125. _handle_exception()
  1126. def publish_server_description_changed(self, previous_description,
  1127. new_description, server_address,
  1128. topology_id):
  1129. """Publish a ServerDescriptionChangedEvent to all server listeners.
  1130. :Parameters:
  1131. - `previous_description`: The previous server description.
  1132. - `server_address`: The address (host, port) pair of the server.
  1133. - `new_description`: The new server description.
  1134. - `topology_id`: A unique identifier for the topology this server
  1135. is a part of.
  1136. """
  1137. event = ServerDescriptionChangedEvent(previous_description,
  1138. new_description, server_address,
  1139. topology_id)
  1140. for subscriber in self.__server_listeners:
  1141. try:
  1142. subscriber.description_changed(event)
  1143. except Exception:
  1144. _handle_exception()
  1145. def publish_topology_opened(self, topology_id):
  1146. """Publish a TopologyOpenedEvent to all topology listeners.
  1147. :Parameters:
  1148. - `topology_id`: A unique identifier for the topology this server
  1149. is a part of.
  1150. """
  1151. event = TopologyOpenedEvent(topology_id)
  1152. for subscriber in self.__topology_listeners:
  1153. try:
  1154. subscriber.opened(event)
  1155. except Exception:
  1156. _handle_exception()
  1157. def publish_topology_closed(self, topology_id):
  1158. """Publish a TopologyClosedEvent to all topology listeners.
  1159. :Parameters:
  1160. - `topology_id`: A unique identifier for the topology this server
  1161. is a part of.
  1162. """
  1163. event = TopologyClosedEvent(topology_id)
  1164. for subscriber in self.__topology_listeners:
  1165. try:
  1166. subscriber.closed(event)
  1167. except Exception:
  1168. _handle_exception()
  1169. def publish_topology_description_changed(self, previous_description,
  1170. new_description, topology_id):
  1171. """Publish a TopologyDescriptionChangedEvent to all topology listeners.
  1172. :Parameters:
  1173. - `previous_description`: The previous topology description.
  1174. - `new_description`: The new topology description.
  1175. - `topology_id`: A unique identifier for the topology this server
  1176. is a part of.
  1177. """
  1178. event = TopologyDescriptionChangedEvent(previous_description,
  1179. new_description, topology_id)
  1180. for subscriber in self.__topology_listeners:
  1181. try:
  1182. subscriber.description_changed(event)
  1183. except Exception:
  1184. _handle_exception()
  1185. def publish_pool_created(self, address, options):
  1186. """Publish a :class:`PoolCreatedEvent` to all pool listeners.
  1187. """
  1188. event = PoolCreatedEvent(address, options)
  1189. for subscriber in self.__cmap_listeners:
  1190. try:
  1191. subscriber.pool_created(event)
  1192. except Exception:
  1193. _handle_exception()
  1194. def publish_pool_cleared(self, address, service_id):
  1195. """Publish a :class:`PoolClearedEvent` to all pool listeners.
  1196. """
  1197. event = PoolClearedEvent(address, service_id)
  1198. for subscriber in self.__cmap_listeners:
  1199. try:
  1200. subscriber.pool_cleared(event)
  1201. except Exception:
  1202. _handle_exception()
  1203. def publish_pool_closed(self, address):
  1204. """Publish a :class:`PoolClosedEvent` to all pool listeners.
  1205. """
  1206. event = PoolClosedEvent(address)
  1207. for subscriber in self.__cmap_listeners:
  1208. try:
  1209. subscriber.pool_closed(event)
  1210. except Exception:
  1211. _handle_exception()
  1212. def publish_connection_created(self, address, connection_id):
  1213. """Publish a :class:`ConnectionCreatedEvent` to all connection
  1214. listeners.
  1215. """
  1216. event = ConnectionCreatedEvent(address, connection_id)
  1217. for subscriber in self.__cmap_listeners:
  1218. try:
  1219. subscriber.connection_created(event)
  1220. except Exception:
  1221. _handle_exception()
  1222. def publish_connection_ready(self, address, connection_id):
  1223. """Publish a :class:`ConnectionReadyEvent` to all connection listeners.
  1224. """
  1225. event = ConnectionReadyEvent(address, connection_id)
  1226. for subscriber in self.__cmap_listeners:
  1227. try:
  1228. subscriber.connection_ready(event)
  1229. except Exception:
  1230. _handle_exception()
  1231. def publish_connection_closed(self, address, connection_id, reason):
  1232. """Publish a :class:`ConnectionClosedEvent` to all connection
  1233. listeners.
  1234. """
  1235. event = ConnectionClosedEvent(address, connection_id, reason)
  1236. for subscriber in self.__cmap_listeners:
  1237. try:
  1238. subscriber.connection_closed(event)
  1239. except Exception:
  1240. _handle_exception()
  1241. def publish_connection_check_out_started(self, address):
  1242. """Publish a :class:`ConnectionCheckOutStartedEvent` to all connection
  1243. listeners.
  1244. """
  1245. event = ConnectionCheckOutStartedEvent(address)
  1246. for subscriber in self.__cmap_listeners:
  1247. try:
  1248. subscriber.connection_check_out_started(event)
  1249. except Exception:
  1250. _handle_exception()
  1251. def publish_connection_check_out_failed(self, address, reason):
  1252. """Publish a :class:`ConnectionCheckOutFailedEvent` to all connection
  1253. listeners.
  1254. """
  1255. event = ConnectionCheckOutFailedEvent(address, reason)
  1256. for subscriber in self.__cmap_listeners:
  1257. try:
  1258. subscriber.connection_check_out_started(event)
  1259. except Exception:
  1260. _handle_exception()
  1261. def publish_connection_checked_out(self, address, connection_id):
  1262. """Publish a :class:`ConnectionCheckedOutEvent` to all connection
  1263. listeners.
  1264. """
  1265. event = ConnectionCheckedOutEvent(address, connection_id)
  1266. for subscriber in self.__cmap_listeners:
  1267. try:
  1268. subscriber.connection_checked_out(event)
  1269. except Exception:
  1270. _handle_exception()
  1271. def publish_connection_checked_in(self, address, connection_id):
  1272. """Publish a :class:`ConnectionCheckedInEvent` to all connection
  1273. listeners.
  1274. """
  1275. event = ConnectionCheckedInEvent(address, connection_id)
  1276. for subscriber in self.__cmap_listeners:
  1277. try:
  1278. subscriber.connection_checked_in(event)
  1279. except Exception:
  1280. _handle_exception()