# Copyright 2015-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Internal network layer helper methods."""
import datetime
import errno
import socket
import struct

from bson import _decode_all_selective
from bson.py3compat import PY3

from pymongo import helpers, message
from pymongo.common import MAX_MESSAGE_SIZE
from pymongo.compression_support import decompress, _NO_COMPRESSION
from pymongo.errors import (AutoReconnect,
                            NotPrimaryError,
                            OperationFailure,
                            ProtocolError,
                            NetworkTimeout,
                            _OperationCancelled)
from pymongo.message import _UNPACK_REPLY, _OpMsg
from pymongo.monotonic import time
from pymongo.socket_checker import _errno_from_exception
# Standard wire protocol message header: four little-endian int32s
# (messageLength, requestID, responseTo, opCode) -- see receive_message.
_UNPACK_HEADER = struct.Struct("<iiii").unpack
def command(sock_info, dbname, spec, secondary_ok, is_mongos,
            read_preference, codec_options, session, client, check=True,
            allowable_errors=None, address=None,
            check_keys=False, listeners=None, max_bson_size=None,
            read_concern=None,
            parse_write_concern_error=False,
            collation=None,
            compression_ctx=None,
            use_op_msg=False,
            unacknowledged=False,
            user_fields=None,
            exhaust_allowed=False):
    """Execute a command over the socket, or raise socket.error.

    Encodes `spec` as either OP_MSG or OP_QUERY, sends it, receives and
    decodes the reply, publishes command monitoring events when listeners
    are enabled, and returns the first response document.

    :Parameters:
      - `sock_info`: a pooled connection object wrapping the raw socket
        (also supplies max_wire_version, service_id and more_to_come state)
      - `dbname`: name of the database on which to run the command
      - `spec`: a command document as an ordered dict type, eg SON.
      - `secondary_ok`: whether to set the secondaryOkay wire protocol bit
      - `is_mongos`: are we connected to a mongos?
      - `read_preference`: a read preference
      - `codec_options`: a CodecOptions instance
      - `session`: optional ClientSession instance.
      - `client`: optional MongoClient instance for updating $clusterTime.
      - `check`: raise OperationFailure if there are errors
      - `allowable_errors`: errors to ignore if `check` is True
      - `address`: the (host, port) of `sock_info`
      - `check_keys`: if True, check `spec` for invalid keys
      - `listeners`: An instance of :class:`~pymongo.monitoring.EventListeners`
      - `max_bson_size`: The maximum encoded bson size for this server
      - `read_concern`: The read concern for this command.
      - `parse_write_concern_error`: Whether to parse the ``writeConcernError``
        field in the command response.
      - `collation`: The collation for this command.
      - `compression_ctx`: optional compression Context.
      - `use_op_msg`: True if we should use OP_MSG.
      - `unacknowledged`: True if this is an unacknowledged command.
      - `user_fields` (optional): Response fields that should be decoded
        using the TypeDecoders from codec_options, passed to
        bson._decode_all_selective.
      - `exhaust_allowed`: True if we should enable OP_MSG exhaustAllowed.
    """
    # The command name is the first key of the (ordered) command document.
    name = next(iter(spec))
    ns = dbname + '.$cmd'
    flags = 4 if secondary_ok else 0  # 4 == the secondaryOkay OP_QUERY bit.

    # Publish the original command document, perhaps with lsid and $clusterTime.
    orig = spec
    if is_mongos and not use_op_msg:
        spec = message._maybe_add_read_preference(spec, read_preference)
    if read_concern and not (session and session.in_transaction):
        if read_concern.level:
            spec['readConcern'] = read_concern.document
        if session:
            session._update_read_concern(spec, sock_info)
    if collation is not None:
        spec['collation'] = collation

    publish = listeners is not None and listeners.enabled_for_commands
    if publish:
        # Start timing before encoding so encoding time is included in the
        # published command duration (see encoding_duration below).
        start = datetime.datetime.now()

    # Commands listed in _NO_COMPRESSION must never be sent compressed.
    if compression_ctx and name.lower() in _NO_COMPRESSION:
        compression_ctx = None

    if (client and client._encrypter and
            not client._encrypter._bypass_auto_encryption):
        # Automatic client-side field level encryption rewrites the command;
        # `orig` is rebound too so listeners see the encrypted document.
        spec = orig = client._encrypter.encrypt(
            dbname, spec, check_keys, codec_options)
        # We already checked the keys, no need to do it again.
        check_keys = False

    if use_op_msg:
        flags = _OpMsg.MORE_TO_COME if unacknowledged else 0
        flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0
        request_id, msg, size, max_doc_size = message._op_msg(
            flags, spec, dbname, read_preference, secondary_ok, check_keys,
            codec_options, ctx=compression_ctx)
        # If this is an unacknowledged write then make sure the encoded doc(s)
        # are small enough, otherwise rely on the server to return an error.
        if (unacknowledged and max_bson_size is not None and
                max_doc_size > max_bson_size):
            message._raise_document_too_large(name, size, max_bson_size)
    else:
        request_id, msg, size = message.query(
            flags, ns, 0, -1, spec, None, codec_options, check_keys,
            compression_ctx)
        if (max_bson_size is not None
                and size > max_bson_size + message._COMMAND_OVERHEAD):
            message._raise_document_too_large(
                name, size, max_bson_size + message._COMMAND_OVERHEAD)

    if publish:
        # Encoding time is added back into the success/failure duration.
        encoding_duration = datetime.datetime.now() - start
        listeners.publish_command_start(orig, dbname, request_id, address,
                                        service_id=sock_info.service_id)
        start = datetime.datetime.now()

    try:
        sock_info.sock.sendall(msg)
        if use_op_msg and unacknowledged:
            # Unacknowledged, fake a successful command response.
            reply = None
            response_doc = {"ok": 1}
        else:
            reply = receive_message(sock_info, request_id)
            sock_info.more_to_come = reply.more_to_come
            unpacked_docs = reply.unpack_response(
                codec_options=codec_options, user_fields=user_fields)

            response_doc = unpacked_docs[0]
            if client:
                client._process_response(response_doc, session)
            if check:
                helpers._check_command_response(
                    response_doc, sock_info.max_wire_version, allowable_errors,
                    parse_write_concern_error=parse_write_concern_error)
    except Exception as exc:
        if publish:
            duration = (datetime.datetime.now() - start) + encoding_duration
            # Server-reported failures carry details; others are converted
            # into a failure document for the monitoring event.
            if isinstance(exc, (NotPrimaryError, OperationFailure)):
                failure = exc.details
            else:
                failure = message._convert_exception(exc)
            listeners.publish_command_failure(
                duration, failure, name, request_id, address,
                service_id=sock_info.service_id)
        raise
    if publish:
        duration = (datetime.datetime.now() - start) + encoding_duration
        listeners.publish_command_success(
            duration, response_doc, name, request_id, address,
            service_id=sock_info.service_id)

    if client and client._encrypter and reply:
        # Auto-encryption: decrypt and re-decode the raw server response.
        decrypted = client._encrypter.decrypt(reply.raw_command_response())
        response_doc = _decode_all_selective(decrypted, codec_options,
                                             user_fields)[0]

    return response_doc
# 9-byte OP_COMPRESSED sub-header: int32 original opcode, an int32 that is
# ignored by receive_message, and a uint8 compressor id.
_UNPACK_COMPRESSION_HEADER = struct.Struct("<iiB").unpack
  164. def receive_message(sock_info, request_id, max_message_size=MAX_MESSAGE_SIZE):
  165. """Receive a raw BSON message or raise socket.error."""
  166. timeout = sock_info.sock.gettimeout()
  167. if timeout:
  168. deadline = time() + timeout
  169. else:
  170. deadline = None
  171. # Ignore the response's request id.
  172. length, _, response_to, op_code = _UNPACK_HEADER(
  173. _receive_data_on_socket(sock_info, 16, deadline))
  174. # No request_id for exhaust cursor "getMore".
  175. if request_id is not None:
  176. if request_id != response_to:
  177. raise ProtocolError("Got response id %r but expected "
  178. "%r" % (response_to, request_id))
  179. if length <= 16:
  180. raise ProtocolError("Message length (%r) not longer than standard "
  181. "message header size (16)" % (length,))
  182. if length > max_message_size:
  183. raise ProtocolError("Message length (%r) is larger than server max "
  184. "message size (%r)" % (length, max_message_size))
  185. if op_code == 2012:
  186. op_code, _, compressor_id = _UNPACK_COMPRESSION_HEADER(
  187. _receive_data_on_socket(sock_info, 9, deadline))
  188. data = decompress(
  189. _receive_data_on_socket(sock_info, length - 25, deadline),
  190. compressor_id)
  191. else:
  192. data = _receive_data_on_socket(sock_info, length - 16, deadline)
  193. try:
  194. unpack_reply = _UNPACK_REPLY[op_code]
  195. except KeyError:
  196. raise ProtocolError("Got opcode %r but expected "
  197. "%r" % (op_code, _UNPACK_REPLY.keys()))
  198. return unpack_reply(data)
# Seconds wait_for_read polls the socket between cancellation checks.
_POLL_TIMEOUT = 0.5
  200. def wait_for_read(sock_info, deadline):
  201. """Block until at least one byte is read, or a timeout, or a cancel."""
  202. context = sock_info.cancel_context
  203. # Only Monitor connections can be cancelled.
  204. if context:
  205. sock = sock_info.sock
  206. while True:
  207. # SSLSocket can have buffered data which won't be caught by select.
  208. if hasattr(sock, 'pending') and sock.pending() > 0:
  209. readable = True
  210. else:
  211. # Wait up to 500ms for the socket to become readable and then
  212. # check for cancellation.
  213. if deadline:
  214. timeout = max(min(deadline - time(), _POLL_TIMEOUT), 0.001)
  215. else:
  216. timeout = _POLL_TIMEOUT
  217. readable = sock_info.socket_checker.select(
  218. sock, read=True, timeout=timeout)
  219. if context.cancelled:
  220. raise _OperationCancelled('hello cancelled')
  221. if readable:
  222. return
  223. if deadline and time() > deadline:
  224. raise socket.timeout("timed out")
  225. # memoryview was introduced in Python 2.7 but we only use it on Python 3
  226. # because before 2.7.4 the struct module did not support memoryview:
  227. # https://bugs.python.org/issue10212.
  228. # In Jython, using slice assignment on a memoryview results in a
  229. # NullPointerException.
  230. if not PY3:
  231. def _receive_data_on_socket(sock_info, length, deadline):
  232. buf = bytearray(length)
  233. i = 0
  234. while length:
  235. try:
  236. wait_for_read(sock_info, deadline)
  237. chunk = sock_info.sock.recv(length)
  238. except (IOError, OSError) as exc:
  239. if _errno_from_exception(exc) == errno.EINTR:
  240. continue
  241. raise
  242. if chunk == b"":
  243. raise AutoReconnect("connection closed")
  244. buf[i:i + len(chunk)] = chunk
  245. i += len(chunk)
  246. length -= len(chunk)
  247. return bytes(buf)
  248. else:
  249. def _receive_data_on_socket(sock_info, length, deadline):
  250. buf = bytearray(length)
  251. mv = memoryview(buf)
  252. bytes_read = 0
  253. while bytes_read < length:
  254. try:
  255. wait_for_read(sock_info, deadline)
  256. chunk_length = sock_info.sock.recv_into(mv[bytes_read:])
  257. except (IOError, OSError) as exc:
  258. if _errno_from_exception(exc) == errno.EINTR:
  259. continue
  260. raise
  261. if chunk_length == 0:
  262. raise AutoReconnect("connection closed")
  263. bytes_read += chunk_length
  264. return mv