# server.py
# Copyright 2014-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
  14. """Communicate with one MongoDB server in a topology."""
  15. from datetime import datetime
  16. from bson import _decode_all_selective
  17. from pymongo.errors import NotPrimaryError, OperationFailure
  18. from pymongo.helpers import _check_command_response
  19. from pymongo.message import _convert_exception, _OpMsg
  20. from pymongo.response import Response, PinnedResponse
  21. from pymongo.server_type import SERVER_TYPE
  22. _CURSOR_DOC_FIELDS = {'cursor': {'firstBatch': 1, 'nextBatch': 1}}
  23. class Server(object):
  24. def __init__(self, server_description, pool, monitor, topology_id=None,
  25. listeners=None, events=None):
  26. """Represent one MongoDB server."""
  27. self._description = server_description
  28. self._pool = pool
  29. self._monitor = monitor
  30. self._topology_id = topology_id
  31. self._publish = listeners is not None and listeners.enabled_for_server
  32. self._listener = listeners
  33. self._events = None
  34. if self._publish:
  35. self._events = events()
  36. def open(self):
  37. """Start monitoring, or restart after a fork.
  38. Multiple calls have no effect.
  39. """
  40. if not self._pool.opts.load_balanced:
  41. self._monitor.open()
  42. def reset(self, service_id=None):
  43. """Clear the connection pool."""
  44. self.pool.reset(service_id)
  45. def close(self):
  46. """Clear the connection pool and stop the monitor.
  47. Reconnect with open().
  48. """
  49. if self._publish:
  50. self._events.put((self._listener.publish_server_closed,
  51. (self._description.address, self._topology_id)))
  52. self._monitor.close()
  53. self._pool.reset()
  54. def request_check(self):
  55. """Check the server's state soon."""
  56. self._monitor.request_check()
  57. def run_operation(
  58. self, sock_info, operation,
  59. set_secondary_okay, listeners, unpack_res):
  60. """Run a _Query or _GetMore operation and return a Response object.
  61. This method is used only to run _Query/_GetMore operations from
  62. cursors.
  63. Can raise ConnectionFailure, OperationFailure, etc.
  64. :Parameters:
  65. - `operation`: A _Query or _GetMore object.
  66. - `set_secondary_okay`: Pass to operation.get_message.
  67. - `all_credentials`: dict, maps auth source to MongoCredential.
  68. - `listeners`: Instance of _EventListeners or None.
  69. - `unpack_res`: A callable that decodes the wire protocol response.
  70. """
  71. duration = None
  72. publish = listeners.enabled_for_commands
  73. if publish:
  74. start = datetime.now()
  75. use_cmd = operation.use_command(sock_info)
  76. more_to_come = (operation.sock_mgr
  77. and operation.sock_mgr.more_to_come)
  78. if more_to_come:
  79. request_id = 0
  80. else:
  81. message = operation.get_message(
  82. set_secondary_okay, sock_info, use_cmd)
  83. request_id, data, max_doc_size = self._split_message(message)
  84. if publish:
  85. cmd, dbn = operation.as_command(sock_info)
  86. listeners.publish_command_start(
  87. cmd, dbn, request_id, sock_info.address,
  88. service_id=sock_info.service_id)
  89. start = datetime.now()
  90. try:
  91. if more_to_come:
  92. reply = sock_info.receive_message(None)
  93. else:
  94. sock_info.send_message(data, max_doc_size)
  95. reply = sock_info.receive_message(request_id)
  96. # Unpack and check for command errors.
  97. if use_cmd:
  98. user_fields = _CURSOR_DOC_FIELDS
  99. legacy_response = False
  100. else:
  101. user_fields = None
  102. legacy_response = True
  103. docs = unpack_res(reply, operation.cursor_id,
  104. operation.codec_options,
  105. legacy_response=legacy_response,
  106. user_fields=user_fields)
  107. if use_cmd:
  108. first = docs[0]
  109. operation.client._process_response(first, operation.session)
  110. _check_command_response(first, sock_info.max_wire_version)
  111. except Exception as exc:
  112. if publish:
  113. duration = datetime.now() - start
  114. if isinstance(exc, (NotPrimaryError, OperationFailure)):
  115. failure = exc.details
  116. else:
  117. failure = _convert_exception(exc)
  118. listeners.publish_command_failure(
  119. duration, failure, operation.name,
  120. request_id, sock_info.address,
  121. service_id=sock_info.service_id)
  122. raise
  123. if publish:
  124. duration = datetime.now() - start
  125. # Must publish in find / getMore / explain command response
  126. # format.
  127. if use_cmd:
  128. res = docs[0]
  129. elif operation.name == "explain":
  130. res = docs[0] if docs else {}
  131. else:
  132. res = {"cursor": {"id": reply.cursor_id,
  133. "ns": operation.namespace()},
  134. "ok": 1}
  135. if operation.name == "find":
  136. res["cursor"]["firstBatch"] = docs
  137. else:
  138. res["cursor"]["nextBatch"] = docs
  139. listeners.publish_command_success(
  140. duration, res, operation.name, request_id,
  141. sock_info.address, service_id=sock_info.service_id)
  142. # Decrypt response.
  143. client = operation.client
  144. if client and client._encrypter:
  145. if use_cmd:
  146. decrypted = client._encrypter.decrypt(
  147. reply.raw_command_response())
  148. docs = _decode_all_selective(
  149. decrypted, operation.codec_options, user_fields)
  150. if client._should_pin_cursor(operation.session) or operation.exhaust:
  151. sock_info.pin_cursor()
  152. if isinstance(reply, _OpMsg):
  153. # In OP_MSG, the server keeps sending only if the
  154. # more_to_come flag is set.
  155. more_to_come = reply.more_to_come
  156. else:
  157. # In OP_REPLY, the server keeps sending until cursor_id is 0.
  158. more_to_come = bool(operation.exhaust and reply.cursor_id)
  159. if operation.sock_mgr:
  160. operation.sock_mgr.update_exhaust(more_to_come)
  161. response = PinnedResponse(
  162. data=reply,
  163. address=self._description.address,
  164. socket_info=sock_info,
  165. duration=duration,
  166. request_id=request_id,
  167. from_command=use_cmd,
  168. docs=docs,
  169. more_to_come=more_to_come)
  170. else:
  171. response = Response(
  172. data=reply,
  173. address=self._description.address,
  174. duration=duration,
  175. request_id=request_id,
  176. from_command=use_cmd,
  177. docs=docs)
  178. return response
  179. def get_socket(self, all_credentials, handler=None):
  180. return self.pool.get_socket(all_credentials, handler)
  181. @property
  182. def description(self):
  183. return self._description
  184. @description.setter
  185. def description(self, server_description):
  186. assert server_description.address == self._description.address
  187. self._description = server_description
  188. @property
  189. def pool(self):
  190. return self._pool
  191. def _split_message(self, message):
  192. """Return request_id, data, max_doc_size.
  193. :Parameters:
  194. - `message`: (request_id, data, max_doc_size) or (request_id, data)
  195. """
  196. if len(message) == 3:
  197. return message
  198. else:
  199. # get_more and kill_cursors messages don't include BSON documents.
  200. request_id, data = message
  201. return request_id, data, 0
  202. def __repr__(self):
  203. return '<%s %r>' % (self.__class__.__name__, self._description)