change_stream.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. # Copyright 2017 MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you
  4. # may not use this file except in compliance with the License. You
  5. # may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. # implied. See the License for the specific language governing
  13. # permissions and limitations under the License.
  14. """Watch changes on a collection, a database, or the entire cluster."""
  15. import copy
  16. from bson import _bson_to_dict
  17. from bson.raw_bson import RawBSONDocument
  18. from pymongo import common
  19. from pymongo.aggregation import (_CollectionAggregationCommand,
  20. _DatabaseAggregationCommand)
  21. from pymongo.collation import validate_collation_or_none
  22. from pymongo.command_cursor import CommandCursor
  23. from pymongo.errors import (ConnectionFailure,
  24. CursorNotFound,
  25. InvalidOperation,
  26. OperationFailure,
  27. PyMongoError)
  28. # The change streams spec considers the following server errors from the
  29. # getMore command non-resumable. All other getMore errors are resumable.
  30. _RESUMABLE_GETMORE_ERRORS = frozenset([
  31. 6, # HostUnreachable
  32. 7, # HostNotFound
  33. 89, # NetworkTimeout
  34. 91, # ShutdownInProgress
  35. 189, # PrimarySteppedDown
  36. 262, # ExceededTimeLimit
  37. 9001, # SocketException
  38. 10107, # NotWritablePrimary
  39. 11600, # InterruptedAtShutdown
  40. 11602, # InterruptedDueToReplStateChange
  41. 13435, # NotPrimaryNoSecondaryOk
  42. 13436, # NotPrimaryOrSecondary
  43. 63, # StaleShardVersion
  44. 150, # StaleEpoch
  45. 13388, # StaleConfig
  46. 234, # RetryChangeStream
  47. 133, # FailedToSatisfyReadPreference
  48. 216, # ElectionInProgress
  49. ])
  class ChangeStream(object):
      """Internal abstract base class shared by all change stream cursors.

      Application code should not instantiate this class directly. Obtain a
      change stream from
      :meth:`pymongo.collection.Collection.watch`,
      :meth:`pymongo.database.Database.watch`, or
      :meth:`pymongo.mongo_client.MongoClient.watch` instead.

      Subclasses supply ``_aggregation_command_class`` and ``_client``.

      .. versionadded:: 3.6

      .. mongodoc:: changeStreams
      """

      def __init__(self, target, pipeline, full_document, resume_after,
                   max_await_time_ms, batch_size, collation,
                   start_at_operation_time, session, start_after):
          """Validate the options, remember them, and open the first cursor.

          Raises :exc:`TypeError` when ``pipeline`` is neither ``None`` nor a
          ``list``. The remaining validation is delegated to the ``common``
          and ``collation`` helpers.
          """
          if pipeline is None:
              pipeline = []
          elif not isinstance(pipeline, list):
              raise TypeError("pipeline must be a list")

          common.validate_string_or_none('full_document', full_document)
          validate_collation_or_none(collation)
          common.validate_non_negative_integer_or_none("batchSize", batch_size)

          codec_options = target.codec_options
          self._orig_codec_options = codec_options
          self._decode_custom = bool(codec_options.type_registry._decoder_map)
          if self._decode_custom:
              # Keep the type registry so that we support encoding custom
              # types in the pipeline. Documents come back as raw BSON and
              # are decoded with the original codec options in try_next().
              raw_options = codec_options.with_options(
                  document_class=RawBSONDocument)
              self._target = target.with_options(codec_options=raw_options)
          else:
              self._target = target

          self._pipeline = copy.deepcopy(pipeline)
          self._full_document = full_document
          self._uses_start_after = start_after is not None
          self._uses_resume_after = resume_after is not None
          self._resume_token = copy.deepcopy(start_after or resume_after)
          self._max_await_time_ms = max_await_time_ms
          self._batch_size = batch_size
          self._collation = collation
          self._start_at_operation_time = start_at_operation_time
          self._session = session

          # Initialize cursor.
          self._cursor = self._create_cursor()

      @property
      def _aggregation_command_class(self):
          """The aggregation command class to use; subclasses must override."""
          raise NotImplementedError

      @property
      def _client(self):
          """The client on which this ChangeStream runs its aggregation
          commands; subclasses must override."""
          raise NotImplementedError

      def _change_stream_options(self):
          """Build the options document for the ``$changeStream`` stage."""
          stage_options = {}
          if self._full_document is not None:
              stage_options['fullDocument'] = self._full_document

          token = self.resume_token
          if token is not None:
              key = 'startAfter' if self._uses_start_after else 'resumeAfter'
              stage_options[key] = token

          if self._start_at_operation_time is not None:
              stage_options['startAtOperationTime'] = (
                  self._start_at_operation_time)
          return stage_options

      def _command_options(self):
          """Build the options dict for the aggregation command, leaving out
          any option that was not set."""
          candidates = (
              ("maxAwaitTimeMS", self._max_await_time_ms),
              ("batchSize", self._batch_size),
          )
          return {name: value for name, value in candidates
                  if value is not None}

      def _aggregation_pipeline(self):
          """Return the ``$changeStream`` stage followed by the user's
          pipeline stages."""
          stages = [{'$changeStream': self._change_stream_options()}]
          stages += self._pipeline
          return stages

      def _process_result(
              self, result, session, server, sock_info, secondary_ok):
          """Callback that caches the postBatchResumeToken or the
          startAtOperationTime from an aggregate response whose first batch
          of change documents is empty.

          This is a callback because the wire version of the connection is
          needed to decide whether the operation time should be cached.

          Raises :exc:`OperationFailure` when the operation time is needed
          but ``operationTime`` is absent from the response.
          """
          cursor_doc = result['cursor']
          if cursor_doc['firstBatch']:
              return

          if 'postBatchResumeToken' in cursor_doc:
              self._resume_token = cursor_doc['postBatchResumeToken']
              return

          # Only fall back to operationTime when the caller gave no explicit
          # starting point of any kind.
          no_start_point = (self._start_at_operation_time is None and
                            self._uses_resume_after is False and
                            self._uses_start_after is False)
          if no_start_point and sock_info.max_wire_version >= 7:
              operation_time = result.get("operationTime")
              self._start_at_operation_time = operation_time
              # PYTHON-2181: informative error on missing operationTime.
              if operation_time is None:
                  raise OperationFailure(
                      "Expected field 'operationTime' missing from command "
                      "response : %r" % (result, ))

      def _run_aggregation_cmd(self, session, explicit_session):
          """Run the full aggregation pipeline for this ChangeStream and
          return the resulting CommandCursor."""
          command_class = self._aggregation_command_class
          cmd = command_class(
              self._target, CommandCursor, self._aggregation_pipeline(),
              self._command_options(), explicit_session,
              result_processor=self._process_result)

          retryable_read = self._client._retryable_read
          read_preference = self._target._read_preference_for(session)
          return retryable_read(cmd.get_cursor, read_preference, session)

      def _create_cursor(self):
          """Open a new cursor, using the stored session or a temporary one."""
          with self._client._tmp_session(self._session, close=False) as sess:
              return self._run_aggregation_cmd(
                  session=sess,
                  explicit_session=self._session is not None)

      def _resume(self):
          """Reestablish this change stream after a resumable error."""
          try:
              self._cursor.close()
          except PyMongoError:
              # Best effort: the old cursor is replaced below regardless.
              pass
          self._cursor = self._create_cursor()

      def close(self):
          """Close this ChangeStream."""
          self._cursor.close()

      def __iter__(self):
          return self

      @property
      def resume_token(self):
          """A copy of the cached resume token that will be used to resume
          after the most recently returned change.

          .. versionadded:: 3.9
          """
          return copy.deepcopy(self._resume_token)

      def next(self):
          """Advance the cursor.

          This method blocks until the next change document is returned or
          an unrecoverable error is raised. It is the method used when
          iterating over all changes in the cursor. For example::

              try:
                  resume_token = None
                  pipeline = [{'$match': {'operationType': 'insert'}}]
                  with db.collection.watch(pipeline) as stream:
                      for insert_change in stream:
                          print(insert_change)
                          resume_token = stream.resume_token
              except pymongo.errors.PyMongoError:
                  # The ChangeStream encountered an unrecoverable error or
                  # the resume attempt failed to recreate the cursor.
                  if resume_token is None:
                      # There is no usable resume token because there was a
                      # failure during ChangeStream initialization.
                      logging.error('...')
                  else:
                      # Use the interrupted ChangeStream's resume token to
                      # create a new ChangeStream. The new stream will
                      # continue from the last seen insert change without
                      # missing any events.
                      with db.collection.watch(
                              pipeline, resume_after=resume_token) as stream:
                          for insert_change in stream:
                              print(insert_change)

          Raises :exc:`StopIteration` if this ChangeStream is closed.
          """
          while self.alive:
              change = self.try_next()
              if change is None:
                  continue
              return change
          raise StopIteration

      __next__ = next

      @property
      def alive(self):
          """Does this cursor have the potential to return more data?

          .. note:: Even if :attr:`alive` is ``True``, :meth:`next` can raise
            :exc:`StopIteration` and :meth:`try_next` can return ``None``.

          .. versionadded:: 3.8
          """
          return self._cursor.alive

      def try_next(self):
          """Advance the cursor without blocking indefinitely.

          This method returns the next change document without waiting
          indefinitely for the next change. For example::

              with db.collection.watch() as stream:
                  while stream.alive:
                      change = stream.try_next()
                      # Note that the ChangeStream's resume token may be
                      # updated even when no changes are returned.
                      print("Current resume token: %r" % (
                          stream.resume_token,))
                      if change is not None:
                          print("Change document: %r" % (change,))
                          continue
                      # We end up here when there are no recent changes.
                      # Sleep for a while before trying again to avoid
                      # flooding the server with getMore requests when no
                      # changes are available.
                      time.sleep(10)

          If no change document is cached locally then this method runs a
          single getMore command. If the getMore yields any documents, the
          next document is returned, otherwise, if the getMore returns no
          documents (because there have been no changes) then ``None`` is
          returned.

          :Returns:
            The next change document or ``None`` when no document is
            available after running a single getMore or when the cursor is
            closed.

          Raises :exc:`InvalidOperation` (after closing the stream) when a
          change document has no ``_id`` field to use as a resume token.

          .. versionadded:: 3.8
          """
          # Attempt to get the next change with at most one getMore and at
          # most one resume attempt.
          try:
              change = self._cursor._try_next(True)
          except (ConnectionFailure, CursorNotFound):
              self._resume()
              change = self._cursor._try_next(False)
          except OperationFailure as exc:
              wire_version = exc._max_wire_version
              if wire_version is None:
                  raise
              # Wire version 9+ marks resumable errors with a label; below
              # that the error code is checked against the known set.
              if wire_version >= 9:
                  resumable = exc.has_error_label(
                      "ResumableChangeStreamError")
              else:
                  resumable = exc.code in _RESUMABLE_GETMORE_ERRORS
              if not resumable:
                  raise
              self._resume()
              change = self._cursor._try_next(False)

          if change is None:
              # No changes are available: we have either iterated over all
              # documents in the cursor, OR the most-recently returned batch
              # is empty. In either case, update the cached resume token
              # with the postBatchResumeToken if one was returned, and clear
              # the startAtOperationTime.
              post_batch_token = self._cursor._post_batch_resume_token
              if post_batch_token is not None:
                  self._resume_token = post_batch_token
                  self._start_at_operation_time = None
              return None

          # A change is available; its _id is the resume token.
          try:
              resume_token = change['_id']
          except KeyError:
              self.close()
              raise InvalidOperation(
                  "Cannot provide resume functionality when the resume "
                  "token is missing.")

          # If this is the last change document from the current batch,
          # cache the postBatchResumeToken instead.
          if not self._cursor._has_next():
              post_batch_token = self._cursor._post_batch_resume_token
              if post_batch_token:
                  resume_token = post_batch_token

          # Hereafter, don't use startAfter; instead use resumeAfter.
          self._uses_start_after = False
          self._uses_resume_after = True

          # Cache the resume token and clear startAtOperationTime.
          self._resume_token = resume_token
          self._start_at_operation_time = None

          if not self._decode_custom:
              return change
          # The change arrived as raw BSON; decode it with the caller's
          # original codec options.
          return _bson_to_dict(change.raw, self._orig_codec_options)

      def __enter__(self):
          return self

      def __exit__(self, exc_type, exc_val, exc_tb):
          self.close()
  class CollectionChangeStream(ChangeStream):
      """Change stream that watches changes on a single collection.

      Not meant to be constructed by application developers; use the helper
      method :meth:`pymongo.collection.Collection.watch` instead.

      .. versionadded:: 3.7
      """

      @property
      def _client(self):
          """The client, reached through the target's ``database``."""
          database = self._target.database
          return database.client

      @property
      def _aggregation_command_class(self):
          """The aggregation command class used by this stream."""
          return _CollectionAggregationCommand
  class DatabaseChangeStream(ChangeStream):
      """Change stream that watches changes on all collections in a database.

      Not meant to be constructed by application developers; use the helper
      method :meth:`pymongo.database.Database.watch` instead.

      .. versionadded:: 3.7
      """

      @property
      def _client(self):
          """The client, read from the target's ``client`` attribute."""
          target = self._target
          return target.client

      @property
      def _aggregation_command_class(self):
          """The aggregation command class used by this stream."""
          return _DatabaseAggregationCommand
  class ClusterChangeStream(DatabaseChangeStream):
      """Change stream that watches changes on all collections in the cluster.

      Not meant to be constructed by application developers; use the helper
      method :meth:`pymongo.mongo_client.MongoClient.watch` instead.

      .. versionadded:: 3.7
      """

      def _change_stream_options(self):
          """Return the inherited ``$changeStream`` options with
          ``allChangesForCluster`` set to ``True``."""
          stage_options = super(
              ClusterChangeStream, self)._change_stream_options()
          stage_options.update(allChangesForCluster=True)
          return stage_options
  342. return options