command_cursor.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. # Copyright 2014-present MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """CommandCursor class to iterate over command results."""
  15. from collections import deque
  16. from bson import _convert_raw_document_lists_to_streams
  17. from bson.py3compat import integer_types
  18. from pymongo.cursor import _SocketManager, _CURSOR_CLOSED_ERRORS
  19. from pymongo.errors import (ConnectionFailure,
  20. InvalidOperation,
  21. OperationFailure)
  22. from pymongo.message import (_CursorAddress,
  23. _GetMore,
  24. _RawBatchGetMore)
  25. from pymongo.response import PinnedResponse
  26. class CommandCursor(object):
  27. """A cursor / iterator over command cursors."""
  28. _getmore_class = _GetMore
  29. def __init__(self, collection, cursor_info, address, retrieved=0,
  30. batch_size=0, max_await_time_ms=None, session=None,
  31. explicit_session=False):
  32. """Create a new command cursor.
  33. The parameter 'retrieved' is unused.
  34. """
  35. self.__sock_mgr = None
  36. self.__collection = collection
  37. self.__id = cursor_info['id']
  38. self.__data = deque(cursor_info['firstBatch'])
  39. self.__postbatchresumetoken = cursor_info.get('postBatchResumeToken')
  40. self.__address = address
  41. self.__batch_size = batch_size
  42. self.__max_await_time_ms = max_await_time_ms
  43. self.__session = session
  44. self.__explicit_session = explicit_session
  45. self.__killed = (self.__id == 0)
  46. if self.__killed:
  47. self.__end_session(True)
  48. if "ns" in cursor_info:
  49. self.__ns = cursor_info["ns"]
  50. else:
  51. self.__ns = collection.full_name
  52. self.batch_size(batch_size)
  53. if (not isinstance(max_await_time_ms, integer_types)
  54. and max_await_time_ms is not None):
  55. raise TypeError("max_await_time_ms must be an integer or None")
  56. def __del__(self):
  57. self.__die()
  58. def __die(self, synchronous=False):
  59. """Closes this cursor.
  60. """
  61. already_killed = self.__killed
  62. self.__killed = True
  63. if self.__id and not already_killed:
  64. cursor_id = self.__id
  65. address = _CursorAddress(
  66. self.__address, self.__ns)
  67. else:
  68. # Skip killCursors.
  69. cursor_id = 0
  70. address = None
  71. self.__collection.database.client._cleanup_cursor(
  72. synchronous,
  73. cursor_id,
  74. address,
  75. self.__sock_mgr,
  76. self.__session,
  77. self.__explicit_session)
  78. if not self.__explicit_session:
  79. self.__session = None
  80. self.__sock_mgr = None
  81. def __end_session(self, synchronous):
  82. if self.__session and not self.__explicit_session:
  83. self.__session._end_session(lock=synchronous)
  84. self.__session = None
  85. def close(self):
  86. """Explicitly close / kill this cursor.
  87. """
  88. self.__die(True)
  89. def batch_size(self, batch_size):
  90. """Limits the number of documents returned in one batch. Each batch
  91. requires a round trip to the server. It can be adjusted to optimize
  92. performance and limit data transfer.
  93. .. note:: batch_size can not override MongoDB's internal limits on the
  94. amount of data it will return to the client in a single batch (i.e
  95. if you set batch size to 1,000,000,000, MongoDB will currently only
  96. return 4-16MB of results per batch).
  97. Raises :exc:`TypeError` if `batch_size` is not an integer.
  98. Raises :exc:`ValueError` if `batch_size` is less than ``0``.
  99. :Parameters:
  100. - `batch_size`: The size of each batch of results requested.
  101. """
  102. if not isinstance(batch_size, integer_types):
  103. raise TypeError("batch_size must be an integer")
  104. if batch_size < 0:
  105. raise ValueError("batch_size must be >= 0")
  106. self.__batch_size = batch_size == 1 and 2 or batch_size
  107. return self
  108. def _has_next(self):
  109. """Returns `True` if the cursor has documents remaining from the
  110. previous batch."""
  111. return len(self.__data) > 0
  112. @property
  113. def _post_batch_resume_token(self):
  114. """Retrieve the postBatchResumeToken from the response to a
  115. changeStream aggregate or getMore."""
  116. return self.__postbatchresumetoken
  117. def _maybe_pin_connection(self, sock_info):
  118. client = self.__collection.database.client
  119. if not client._should_pin_cursor(self.__session):
  120. return
  121. if not self.__sock_mgr:
  122. sock_info.pin_cursor()
  123. sock_mgr = _SocketManager(sock_info, False)
  124. # Ensure the connection gets returned when the entire result is
  125. # returned in the first batch.
  126. if self.__id == 0:
  127. sock_mgr.close()
  128. else:
  129. self.__sock_mgr = sock_mgr
  130. def __send_message(self, operation):
  131. """Send a getmore message and handle the response.
  132. """
  133. client = self.__collection.database.client
  134. try:
  135. response = client._run_operation(
  136. operation, self._unpack_response, address=self.__address)
  137. except OperationFailure as exc:
  138. if exc.code in _CURSOR_CLOSED_ERRORS:
  139. # Don't send killCursors because the cursor is already closed.
  140. self.__killed = True
  141. # Return the session and pinned connection, if necessary.
  142. self.close()
  143. raise
  144. except ConnectionFailure:
  145. # Don't send killCursors because the cursor is already closed.
  146. self.__killed = True
  147. # Return the session and pinned connection, if necessary.
  148. self.close()
  149. raise
  150. except Exception:
  151. self.close()
  152. raise
  153. if isinstance(response, PinnedResponse):
  154. if not self.__sock_mgr:
  155. self.__sock_mgr = _SocketManager(response.socket_info,
  156. response.more_to_come)
  157. if response.from_command:
  158. cursor = response.docs[0]['cursor']
  159. documents = cursor['nextBatch']
  160. self.__postbatchresumetoken = cursor.get('postBatchResumeToken')
  161. self.__id = cursor['id']
  162. else:
  163. documents = response.docs
  164. self.__id = response.data.cursor_id
  165. if self.__id == 0:
  166. self.close()
  167. self.__data = deque(documents)
  168. def _unpack_response(self, response, cursor_id, codec_options,
  169. user_fields=None, legacy_response=False):
  170. return response.unpack_response(cursor_id, codec_options, user_fields,
  171. legacy_response)
  172. def _refresh(self):
  173. """Refreshes the cursor with more data from the server.
  174. Returns the length of self.__data after refresh. Will exit early if
  175. self.__data is already non-empty. Raises OperationFailure when the
  176. cursor cannot be refreshed due to an error on the query.
  177. """
  178. if len(self.__data) or self.__killed:
  179. return len(self.__data)
  180. if self.__id: # Get More
  181. dbname, collname = self.__ns.split('.', 1)
  182. read_pref = self.__collection._read_preference_for(self.session)
  183. self.__send_message(
  184. self._getmore_class(dbname,
  185. collname,
  186. self.__batch_size,
  187. self.__id,
  188. self.__collection.codec_options,
  189. read_pref,
  190. self.__session,
  191. self.__collection.database.client,
  192. self.__max_await_time_ms,
  193. self.__sock_mgr, False))
  194. else: # Cursor id is zero nothing else to return
  195. self.__die(True)
  196. return len(self.__data)
  197. @property
  198. def alive(self):
  199. """Does this cursor have the potential to return more data?
  200. Even if :attr:`alive` is ``True``, :meth:`next` can raise
  201. :exc:`StopIteration`. Best to use a for loop::
  202. for doc in collection.aggregate(pipeline):
  203. print(doc)
  204. .. note:: :attr:`alive` can be True while iterating a cursor from
  205. a failed server. In this case :attr:`alive` will return False after
  206. :meth:`next` fails to retrieve the next batch of results from the
  207. server.
  208. """
  209. return bool(len(self.__data) or (not self.__killed))
  210. @property
  211. def cursor_id(self):
  212. """Returns the id of the cursor."""
  213. return self.__id
  214. @property
  215. def address(self):
  216. """The (host, port) of the server used, or None.
  217. .. versionadded:: 3.0
  218. """
  219. return self.__address
  220. @property
  221. def session(self):
  222. """The cursor's :class:`~pymongo.client_session.ClientSession`, or None.
  223. .. versionadded:: 3.6
  224. """
  225. if self.__explicit_session:
  226. return self.__session
  227. def __iter__(self):
  228. return self
  229. def next(self):
  230. """Advance the cursor."""
  231. # Block until a document is returnable.
  232. while self.alive:
  233. doc = self._try_next(True)
  234. if doc is not None:
  235. return doc
  236. raise StopIteration
  237. __next__ = next
  238. def _try_next(self, get_more_allowed):
  239. """Advance the cursor blocking for at most one getMore command."""
  240. if not len(self.__data) and not self.__killed and get_more_allowed:
  241. self._refresh()
  242. if len(self.__data):
  243. coll = self.__collection
  244. return coll.database._fix_outgoing(self.__data.popleft(), coll)
  245. else:
  246. return None
  247. def __enter__(self):
  248. return self
  249. def __exit__(self, exc_type, exc_val, exc_tb):
  250. self.close()
  251. class RawBatchCommandCursor(CommandCursor):
  252. _getmore_class = _RawBatchGetMore
  253. def __init__(self, collection, cursor_info, address, retrieved=0,
  254. batch_size=0, max_await_time_ms=None, session=None,
  255. explicit_session=False):
  256. """Create a new cursor / iterator over raw batches of BSON data.
  257. Should not be called directly by application developers -
  258. see :meth:`~pymongo.collection.Collection.aggregate_raw_batches`
  259. instead.
  260. .. mongodoc:: cursors
  261. """
  262. assert not cursor_info.get('firstBatch')
  263. super(RawBatchCommandCursor, self).__init__(
  264. collection, cursor_info, address, retrieved, batch_size,
  265. max_await_time_ms, session, explicit_session)
  266. def _unpack_response(self, response, cursor_id, codec_options,
  267. user_fields=None, legacy_response=False):
  268. raw_response = response.raw_response(
  269. cursor_id, user_fields=user_fields)
  270. if not legacy_response:
  271. # OP_MSG returns firstBatch/nextBatch documents as a BSON array
  272. # Re-assemble the array of documents into a document stream
  273. _convert_raw_document_lists_to_streams(raw_response[0])
  274. return raw_response
  275. def __getitem__(self, index):
  276. raise InvalidOperation("Cannot call __getitem__ on RawBatchCursor")