message.py 62 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776
  1. # Copyright 2009-present MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """**DEPRECATED** Tools for creating `messages
  15. <http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol>`_ to be sent to
  16. MongoDB.
  17. .. note:: This module is for internal use and is generally not needed by
  18. application developers.
  19. .. versionchanged:: 3.12
  20. This module is deprecated and will be removed in PyMongo 4.0.
  21. """
  22. import datetime
  23. import random
  24. import struct
  25. import bson
  26. from bson import (CodecOptions,
  27. decode,
  28. encode,
  29. _decode_selective,
  30. _dict_to_bson,
  31. _make_c_string)
  32. from bson.codec_options import DEFAULT_CODEC_OPTIONS
  33. from bson.raw_bson import (_inflate_bson, DEFAULT_RAW_BSON_OPTIONS,
  34. RawBSONDocument)
  35. from bson.py3compat import b, StringIO
  36. from bson.son import SON
  37. try:
  38. from pymongo import _cmessage
  39. _use_c = True
  40. except ImportError:
  41. _use_c = False
  42. from pymongo.errors import (ConfigurationError,
  43. CursorNotFound,
  44. DocumentTooLarge,
  45. ExecutionTimeout,
  46. InvalidOperation,
  47. NotPrimaryError,
  48. OperationFailure,
  49. ProtocolError)
  50. from pymongo.hello_compat import HelloCompat
  51. from pymongo.read_concern import DEFAULT_READ_CONCERN
  52. from pymongo.read_preferences import ReadPreference
  53. from pymongo.write_concern import WriteConcern
# Bounds of a signed 32-bit integer; used when generating request ids.
MAX_INT32 = 2147483647
MIN_INT32 = -2147483648

# Overhead allowed for encoded command documents.
_COMMAND_OVERHEAD = 16382

# Internal codes for the three legacy write operations.
_INSERT = 0
_UPDATE = 1
_DELETE = 2

# Pre-built byte fragments used when assembling wire messages.
_EMPTY = b''
_BSONOBJ = b'\x03'
_ZERO_8 = b'\x00'
_ZERO_16 = b'\x00\x00'
_ZERO_32 = b'\x00\x00\x00\x00'
_ZERO_64 = b'\x00\x00\x00\x00\x00\x00\x00\x00'
# A little-endian int32 0 followed by int32 -1.
_SKIPLIM = b'\x00\x00\x00\x00\xff\xff\xff\xff'

# Per write op: BSON array element header (type 0x04 + field name cstring)
# with trailing placeholder bytes, used when building write command payloads.
_OP_MAP = {
    _INSERT: b'\x04documents\x00\x00\x00\x00\x00',
    _UPDATE: b'\x04updates\x00\x00\x00\x00\x00',
    _DELETE: b'\x04deletes\x00\x00\x00\x00\x00',
}

# Maps each write command name to the field holding its document list.
_FIELD_MAP = {
    'insert': 'documents',
    'update': 'updates',
    'delete': 'deletes'
}

# Template for "db.collection" namespace strings.
_UJOIN = u"%s.%s"

# Codec options that replace (rather than raise on) invalid UTF-8.
_UNICODE_REPLACE_CODEC_OPTIONS = CodecOptions(
    unicode_decode_error_handler='replace')
  81. def _randint():
  82. """Generate a pseudo random 32 bit integer."""
  83. return random.randint(MIN_INT32, MAX_INT32)
  84. def _maybe_add_read_preference(spec, read_preference):
  85. """Add $readPreference to spec when appropriate."""
  86. mode = read_preference.mode
  87. document = read_preference.document
  88. # Only add $readPreference if it's something other than primary to avoid
  89. # problems with mongos versions that don't support read preferences. Also,
  90. # for maximum backwards compatibility, don't add $readPreference for
  91. # secondaryPreferred unless tags or maxStalenessSeconds are in use (setting
  92. # the secondaryOkay bit has the same effect).
  93. if mode and (
  94. mode != ReadPreference.SECONDARY_PREFERRED.mode or
  95. len(document) > 1):
  96. if "$query" not in spec:
  97. spec = SON([("$query", spec)])
  98. spec["$readPreference"] = document
  99. return spec
  100. def _convert_exception(exception):
  101. """Convert an Exception into a failure document for publishing."""
  102. return {'errmsg': str(exception),
  103. 'errtype': exception.__class__.__name__}
  104. def _convert_write_result(operation, command, result):
  105. """Convert a legacy write result to write command format."""
  106. # Based on _merge_legacy from bulk.py
  107. affected = result.get("n", 0)
  108. res = {"ok": 1, "n": affected}
  109. errmsg = result.get("errmsg", result.get("err", ""))
  110. if errmsg:
  111. # The write was successful on at least the primary so don't return.
  112. if result.get("wtimeout"):
  113. res["writeConcernError"] = {"errmsg": errmsg,
  114. "code": 64,
  115. "errInfo": {"wtimeout": True}}
  116. else:
  117. # The write failed.
  118. error = {"index": 0,
  119. "code": result.get("code", 8),
  120. "errmsg": errmsg}
  121. if "errInfo" in result:
  122. error["errInfo"] = result["errInfo"]
  123. res["writeErrors"] = [error]
  124. return res
  125. if operation == "insert":
  126. # GLE result for insert is always 0 in most MongoDB versions.
  127. res["n"] = len(command['documents'])
  128. elif operation == "update":
  129. if "upserted" in result:
  130. res["upserted"] = [{"index": 0, "_id": result["upserted"]}]
  131. # Versions of MongoDB before 2.6 don't return the _id for an
  132. # upsert if _id is not an ObjectId.
  133. elif result.get("updatedExisting") is False and affected == 1:
  134. # If _id is in both the update document *and* the query spec
  135. # the update document _id takes precedence.
  136. update = command['updates'][0]
  137. _id = update["u"].get("_id", update["q"].get("_id"))
  138. res["upserted"] = [{"index": 0, "_id": _id}]
  139. return res
# OP_QUERY wire-protocol flag bits keyed by cursor option name.
_OPTIONS = SON([
    ('tailable', 2),
    ('oplogReplay', 8),
    ('noCursorTimeout', 16),
    ('awaitData', 32),
    ('allowPartialResults', 128)])

# Maps legacy $-modifier names to their find-command field names.
_MODIFIERS = SON([
    ('$query', 'filter'),
    ('$orderby', 'sort'),
    ('$hint', 'hint'),
    ('$comment', 'comment'),
    ('$maxScan', 'maxScan'),
    ('$maxTimeMS', 'maxTimeMS'),
    ('$max', 'max'),
    ('$min', 'min'),
    ('$returnKey', 'returnKey'),
    ('$showRecordId', 'showRecordId'),
    ('$showDiskLoc', 'showRecordId'),  # <= MongoDb 3.0
    ('$snapshot', 'snapshot')])
  159. def _gen_find_command(coll, spec, projection, skip, limit, batch_size, options,
  160. read_concern, collation=None, session=None,
  161. allow_disk_use=None):
  162. """Generate a find command document."""
  163. cmd = SON([('find', coll)])
  164. if '$query' in spec:
  165. cmd.update([(_MODIFIERS[key], val) if key in _MODIFIERS else (key, val)
  166. for key, val in spec.items()])
  167. if '$explain' in cmd:
  168. cmd.pop('$explain')
  169. if '$readPreference' in cmd:
  170. cmd.pop('$readPreference')
  171. else:
  172. cmd['filter'] = spec
  173. if projection:
  174. cmd['projection'] = projection
  175. if skip:
  176. cmd['skip'] = skip
  177. if limit:
  178. cmd['limit'] = abs(limit)
  179. if limit < 0:
  180. cmd['singleBatch'] = True
  181. if batch_size:
  182. cmd['batchSize'] = batch_size
  183. if read_concern.level and not (session and session.in_transaction):
  184. cmd['readConcern'] = read_concern.document
  185. if collation:
  186. cmd['collation'] = collation
  187. if allow_disk_use is not None:
  188. cmd['allowDiskUse'] = allow_disk_use
  189. if options:
  190. cmd.update([(opt, True)
  191. for opt, val in _OPTIONS.items()
  192. if options & val])
  193. return cmd
  194. def _gen_get_more_command(cursor_id, coll, batch_size, max_await_time_ms):
  195. """Generate a getMore command document."""
  196. cmd = SON([('getMore', cursor_id),
  197. ('collection', coll)])
  198. if batch_size:
  199. cmd['batchSize'] = batch_size
  200. if max_await_time_ms is not None:
  201. cmd['maxTimeMS'] = max_await_time_ms
  202. return cmd
class _Query(object):
    """A query operation."""

    __slots__ = ('flags', 'db', 'coll', 'ntoskip', 'spec',
                 'fields', 'codec_options', 'read_preference', 'limit',
                 'batch_size', 'name', 'read_concern', 'collation',
                 'session', 'client', 'allow_disk_use', '_as_command',
                 'exhaust')

    # For compatibility with the _GetMore class.
    sock_mgr = None
    cursor_id = None

    def __init__(self, flags, db, coll, ntoskip, spec, fields,
                 codec_options, read_preference, limit,
                 batch_size, read_concern, collation, session, client,
                 allow_disk_use, exhaust):
        self.flags = flags
        self.db = db
        self.coll = coll
        self.ntoskip = ntoskip
        self.spec = spec
        self.fields = fields
        self.codec_options = codec_options
        self.read_preference = read_preference
        self.read_concern = read_concern
        self.limit = limit
        self.batch_size = batch_size
        self.collation = collation
        self.session = session
        self.client = client
        self.allow_disk_use = allow_disk_use
        self.name = 'find'
        # Caches the (command, dbname) pair built by as_command().
        self._as_command = None
        self.exhaust = exhaust

    def namespace(self):
        # "db.collection" namespace string for this query.
        return _UJOIN % (self.db, self.coll)

    def use_command(self, sock_info):
        """Return True if this operation should use a find command.

        Raises ConfigurationError when the connection's max wire version
        cannot support the requested read concern, collation, or
        allowDiskUse.
        """
        use_find_cmd = False
        if sock_info.max_wire_version >= 4 and not self.exhaust:
            use_find_cmd = True
        elif sock_info.max_wire_version >= 8:
            # OP_MSG supports exhaust on MongoDB 4.2+
            use_find_cmd = True
        elif not self.read_concern.ok_for_legacy:
            raise ConfigurationError(
                'read concern level of %s is not valid '
                'with a max wire version of %d.'
                % (self.read_concern.level,
                   sock_info.max_wire_version))

        if sock_info.max_wire_version < 5 and self.collation is not None:
            raise ConfigurationError(
                'Specifying a collation is unsupported with a max wire '
                'version of %d.' % (sock_info.max_wire_version,))

        if sock_info.max_wire_version < 4 and self.allow_disk_use is not None:
            raise ConfigurationError(
                'Specifying allowDiskUse is unsupported with a max wire '
                'version of %d.' % (sock_info.max_wire_version,))

        sock_info.validate_session(self.client, self.session)

        return use_find_cmd

    def as_command(self, sock_info):
        """Return a find command document for this query."""
        # We use the command twice: on the wire and for command monitoring.
        # Generate it once, for speed and to avoid repeating side-effects.
        if self._as_command is not None:
            return self._as_command

        explain = '$explain' in self.spec
        cmd = _gen_find_command(
            self.coll, self.spec, self.fields, self.ntoskip,
            self.limit, self.batch_size, self.flags, self.read_concern,
            self.collation, self.session, self.allow_disk_use)
        if explain:
            self.name = 'explain'
            cmd = SON([('explain', cmd)])
        session = self.session
        sock_info.add_server_api(cmd)
        if session:
            session._apply_to(cmd, False, self.read_preference, sock_info)
            # Explain does not support readConcern.
            if not explain and not session.in_transaction:
                session._update_read_concern(cmd, sock_info)
        sock_info.send_cluster_time(cmd, session, self.client)
        # Support auto encryption
        client = self.client
        if (client._encrypter and
                not client._encrypter._bypass_auto_encryption):
            cmd = client._encrypter.encrypt(
                self.db, cmd, False, self.codec_options)
        self._as_command = cmd, self.db
        return self._as_command

    def get_message(self, set_secondary_ok, sock_info, use_cmd=False):
        """Get a query message, possibly setting the secondaryOk bit."""
        if set_secondary_ok:
            # Set the secondaryOk bit.
            flags = self.flags | 4
        else:
            flags = self.flags

        ns = self.namespace()
        spec = self.spec

        if use_cmd:
            spec = self.as_command(sock_info)[0]
            if sock_info.op_msg_enabled:
                # Modern servers: send the find command via OP_MSG.
                request_id, msg, size, _ = _op_msg(
                    0, spec, self.db, self.read_preference,
                    set_secondary_ok, False, self.codec_options,
                    ctx=sock_info.compression_context)
                return request_id, msg, size
            # Older servers: send the command as an OP_QUERY on "$cmd".
            ns = _UJOIN % (self.db, "$cmd")
            ntoreturn = -1  # All DB commands return 1 document
        else:
            # OP_QUERY treats ntoreturn of -1 and 1 the same, return
            # one document and close the cursor. We have to use 2 for
            # batch size if 1 is specified.
            ntoreturn = self.batch_size == 1 and 2 or self.batch_size
            if self.limit:
                if ntoreturn:
                    ntoreturn = min(self.limit, ntoreturn)
                else:
                    ntoreturn = self.limit

            if sock_info.is_mongos:
                spec = _maybe_add_read_preference(spec,
                                                  self.read_preference)

        return query(flags, ns, self.ntoskip, ntoreturn,
                     spec, None if use_cmd else self.fields,
                     self.codec_options, ctx=sock_info.compression_context)
class _GetMore(object):
    """A getmore operation."""

    __slots__ = ('db', 'coll', 'ntoreturn', 'cursor_id', 'max_await_time_ms',
                 'codec_options', 'read_preference', 'session', 'client',
                 'sock_mgr', '_as_command', 'exhaust')

    name = 'getMore'

    def __init__(self, db, coll, ntoreturn, cursor_id, codec_options,
                 read_preference, session, client, max_await_time_ms,
                 sock_mgr, exhaust):
        self.db = db
        self.coll = coll
        self.ntoreturn = ntoreturn
        self.cursor_id = cursor_id
        self.codec_options = codec_options
        self.read_preference = read_preference
        self.session = session
        self.client = client
        self.max_await_time_ms = max_await_time_ms
        self.sock_mgr = sock_mgr
        # Caches the (command, dbname) pair built by as_command().
        self._as_command = None
        self.exhaust = exhaust

    def namespace(self):
        # "db.collection" namespace string for this getmore.
        return _UJOIN % (self.db, self.coll)

    def use_command(self, sock_info):
        """Return True if this operation should use a getMore command."""
        use_cmd = False
        if sock_info.max_wire_version >= 4 and not self.exhaust:
            use_cmd = True
        elif sock_info.max_wire_version >= 8:
            # OP_MSG supports exhaust on MongoDB 4.2+
            use_cmd = True

        sock_info.validate_session(self.client, self.session)
        return use_cmd

    def as_command(self, sock_info):
        """Return a getMore command document for this query."""
        # See _Query.as_command for an explanation of this caching.
        if self._as_command is not None:
            return self._as_command

        cmd = _gen_get_more_command(self.cursor_id, self.coll,
                                    self.ntoreturn,
                                    self.max_await_time_ms)

        if self.session:
            self.session._apply_to(cmd, False, self.read_preference, sock_info)
        sock_info.add_server_api(cmd)
        sock_info.send_cluster_time(cmd, self.session, self.client)
        # Support auto encryption
        client = self.client
        if (client._encrypter and
                not client._encrypter._bypass_auto_encryption):
            cmd = client._encrypter.encrypt(
                self.db, cmd, False, self.codec_options)
        self._as_command = cmd, self.db
        return self._as_command

    def get_message(self, dummy0, sock_info, use_cmd=False):
        """Get a getmore message."""
        ns = self.namespace()
        ctx = sock_info.compression_context

        if use_cmd:
            spec = self.as_command(sock_info)[0]
            if sock_info.op_msg_enabled:
                if self.sock_mgr:
                    # An exhaust cursor is in use; allow the server to
                    # stream additional replies.
                    flags = _OpMsg.EXHAUST_ALLOWED
                else:
                    flags = 0
                request_id, msg, size, _ = _op_msg(
                    flags, spec, self.db, None,
                    False, False, self.codec_options,
                    ctx=sock_info.compression_context)
                return request_id, msg, size
            # Older servers: send the command as an OP_QUERY on "$cmd".
            ns = _UJOIN % (self.db, "$cmd")
            return query(0, ns, 0, -1, spec, None, self.codec_options, ctx=ctx)

        return get_more(ns, self.ntoreturn, self.cursor_id, ctx)
  396. class _RawBatchQuery(_Query):
  397. def use_command(self, sock_info):
  398. # Compatibility checks.
  399. super(_RawBatchQuery, self).use_command(sock_info)
  400. if sock_info.max_wire_version >= 8:
  401. # MongoDB 4.2+ supports exhaust over OP_MSG
  402. return True
  403. elif sock_info.op_msg_enabled and not self.exhaust:
  404. return True
  405. return False
  406. class _RawBatchGetMore(_GetMore):
  407. def use_command(self, sock_info):
  408. # Compatibility checks.
  409. super(_RawBatchGetMore, self).use_command(sock_info)
  410. if sock_info.max_wire_version >= 8:
  411. # MongoDB 4.2+ supports exhaust over OP_MSG
  412. return True
  413. elif sock_info.op_msg_enabled and not self.exhaust:
  414. return True
  415. return False
  416. class _CursorAddress(tuple):
  417. """The server address (host, port) of a cursor, with namespace property."""
  418. def __new__(cls, address, namespace):
  419. self = tuple.__new__(cls, address)
  420. self.__namespace = namespace
  421. return self
  422. @property
  423. def namespace(self):
  424. """The namespace this cursor."""
  425. return self.__namespace
  426. def __hash__(self):
  427. # Two _CursorAddress instances with different namespaces
  428. # must not hash the same.
  429. return (self + (self.__namespace,)).__hash__()
  430. def __eq__(self, other):
  431. if isinstance(other, _CursorAddress):
  432. return (tuple(self) == tuple(other)
  433. and self.namespace == other.namespace)
  434. return NotImplemented
  435. def __ne__(self, other):
  436. return not self == other
  437. _pack_compression_header = struct.Struct("<iiiiiiB").pack
  438. _COMPRESSION_HEADER_SIZE = 25
  439. def _compress(operation, data, ctx):
  440. """Takes message data, compresses it, and adds an OP_COMPRESSED header."""
  441. compressed = ctx.compress(data)
  442. request_id = _randint()
  443. header = _pack_compression_header(
  444. _COMPRESSION_HEADER_SIZE + len(compressed), # Total message length
  445. request_id, # Request id
  446. 0, # responseTo
  447. 2012, # operation id
  448. operation, # original operation id
  449. len(data), # uncompressed message length
  450. ctx.compressor_id) # compressor id
  451. return request_id, header + compressed
  452. def __last_error(namespace, args):
  453. """Data to send to do a lastError.
  454. """
  455. cmd = SON([("getlasterror", 1)])
  456. cmd.update(args)
  457. splitns = namespace.split('.', 1)
  458. return query(0, splitns[0] + '.$cmd', 0, -1, cmd,
  459. None, DEFAULT_CODEC_OPTIONS)
  460. _pack_header = struct.Struct("<iiii").pack
  461. def __pack_message(operation, data):
  462. """Takes message data and adds a message header based on the operation.
  463. Returns the resultant message string.
  464. """
  465. rid = _randint()
  466. message = _pack_header(16 + len(data), rid, 0, operation)
  467. return rid, message + data
  468. _pack_int = struct.Struct("<i").pack
  469. def _insert(collection_name, docs, check_keys, flags, opts):
  470. """Get an OP_INSERT message"""
  471. encode = _dict_to_bson # Make local. Uses extensions.
  472. if len(docs) == 1:
  473. encoded = encode(docs[0], check_keys, opts)
  474. return b"".join([
  475. b"\x00\x00\x00\x00", # Flags don't matter for one doc.
  476. _make_c_string(collection_name),
  477. encoded]), len(encoded)
  478. encoded = [encode(doc, check_keys, opts) for doc in docs]
  479. if not encoded:
  480. raise InvalidOperation("cannot do an empty bulk insert")
  481. return b"".join([
  482. _pack_int(flags),
  483. _make_c_string(collection_name),
  484. b"".join(encoded)]), max(map(len, encoded))
  485. def _insert_compressed(
  486. collection_name, docs, check_keys, continue_on_error, opts, ctx):
  487. """Internal compressed unacknowledged insert message helper."""
  488. op_insert, max_bson_size = _insert(
  489. collection_name, docs, check_keys, continue_on_error, opts)
  490. rid, msg = _compress(2002, op_insert, ctx)
  491. return rid, msg, max_bson_size
  492. def _insert_uncompressed(collection_name, docs, check_keys,
  493. safe, last_error_args, continue_on_error, opts):
  494. """Internal insert message helper."""
  495. op_insert, max_bson_size = _insert(
  496. collection_name, docs, check_keys, continue_on_error, opts)
  497. rid, msg = __pack_message(2002, op_insert)
  498. if safe:
  499. rid, gle, _ = __last_error(collection_name, last_error_args)
  500. return rid, msg + gle, max_bson_size
  501. return rid, msg, max_bson_size
  502. if _use_c:
  503. _insert_uncompressed = _cmessage._insert_message
  504. def insert(collection_name, docs, check_keys,
  505. safe, last_error_args, continue_on_error, opts, ctx=None):
  506. """**DEPRECATED** Get an **insert** message.
  507. .. versionchanged:: 3.12
  508. This function is deprecated and will be removed in PyMongo 4.0.
  509. """
  510. if ctx:
  511. return _insert_compressed(
  512. collection_name, docs, check_keys, continue_on_error, opts, ctx)
  513. return _insert_uncompressed(collection_name, docs, check_keys, safe,
  514. last_error_args, continue_on_error, opts)
  515. def _update(collection_name, upsert, multi, spec, doc, check_keys, opts):
  516. """Get an OP_UPDATE message."""
  517. flags = 0
  518. if upsert:
  519. flags += 1
  520. if multi:
  521. flags += 2
  522. encode = _dict_to_bson # Make local. Uses extensions.
  523. encoded_update = encode(doc, check_keys, opts)
  524. return b"".join([
  525. _ZERO_32,
  526. _make_c_string(collection_name),
  527. _pack_int(flags),
  528. encode(spec, False, opts),
  529. encoded_update]), len(encoded_update)
  530. def _update_compressed(
  531. collection_name, upsert, multi, spec, doc, check_keys, opts, ctx):
  532. """Internal compressed unacknowledged update message helper."""
  533. op_update, max_bson_size = _update(
  534. collection_name, upsert, multi, spec, doc, check_keys, opts)
  535. rid, msg = _compress(2001, op_update, ctx)
  536. return rid, msg, max_bson_size
  537. def _update_uncompressed(collection_name, upsert, multi, spec,
  538. doc, safe, last_error_args, check_keys, opts):
  539. """Internal update message helper."""
  540. op_update, max_bson_size = _update(
  541. collection_name, upsert, multi, spec, doc, check_keys, opts)
  542. rid, msg = __pack_message(2001, op_update)
  543. if safe:
  544. rid, gle, _ = __last_error(collection_name, last_error_args)
  545. return rid, msg + gle, max_bson_size
  546. return rid, msg, max_bson_size
  547. if _use_c:
  548. _update_uncompressed = _cmessage._update_message
  549. def update(collection_name, upsert, multi, spec,
  550. doc, safe, last_error_args, check_keys, opts, ctx=None):
  551. """**DEPRECATED** Get an **update** message.
  552. .. versionchanged:: 3.12
  553. This function is deprecated and will be removed in PyMongo 4.0.
  554. """
  555. if ctx:
  556. return _update_compressed(
  557. collection_name, upsert, multi, spec, doc, check_keys, opts, ctx)
  558. return _update_uncompressed(collection_name, upsert, multi, spec,
  559. doc, safe, last_error_args, check_keys, opts)
  560. _pack_op_msg_flags_type = struct.Struct("<IB").pack
  561. _pack_byte = struct.Struct("<B").pack
  562. def _op_msg_no_header(flags, command, identifier, docs, check_keys, opts):
  563. """Get a OP_MSG message.
  564. Note: this method handles multiple documents in a type one payload but
  565. it does not perform batch splitting and the total message size is
  566. only checked *after* generating the entire message.
  567. """
  568. # Encode the command document in payload 0 without checking keys.
  569. encoded = _dict_to_bson(command, False, opts)
  570. flags_type = _pack_op_msg_flags_type(flags, 0)
  571. total_size = len(encoded)
  572. max_doc_size = 0
  573. if identifier:
  574. type_one = _pack_byte(1)
  575. cstring = _make_c_string(identifier)
  576. encoded_docs = [_dict_to_bson(doc, check_keys, opts) for doc in docs]
  577. size = len(cstring) + sum(len(doc) for doc in encoded_docs) + 4
  578. encoded_size = _pack_int(size)
  579. total_size += size
  580. max_doc_size = max(len(doc) for doc in encoded_docs)
  581. data = ([flags_type, encoded, type_one, encoded_size, cstring] +
  582. encoded_docs)
  583. else:
  584. data = [flags_type, encoded]
  585. return b''.join(data), total_size, max_doc_size
  586. def _op_msg_compressed(flags, command, identifier, docs, check_keys, opts,
  587. ctx):
  588. """Internal OP_MSG message helper."""
  589. msg, total_size, max_bson_size = _op_msg_no_header(
  590. flags, command, identifier, docs, check_keys, opts)
  591. rid, msg = _compress(2013, msg, ctx)
  592. return rid, msg, total_size, max_bson_size
  593. def _op_msg_uncompressed(flags, command, identifier, docs, check_keys, opts):
  594. """Internal compressed OP_MSG message helper."""
  595. data, total_size, max_bson_size = _op_msg_no_header(
  596. flags, command, identifier, docs, check_keys, opts)
  597. request_id, op_message = __pack_message(2013, data)
  598. return request_id, op_message, total_size, max_bson_size
  599. if _use_c:
  600. _op_msg_uncompressed = _cmessage._op_msg
def _op_msg(flags, command, dbname, read_preference, secondary_ok, check_keys,
            opts, ctx=None):
    """Get a OP_MSG message.

    Note: temporarily mutates *command* ($db, $readPreference, and the
    document-sequence field) and restores the sequence field on exit.
    """
    command['$db'] = dbname
    # getMore commands do not send $readPreference.
    if read_preference is not None and "$readPreference" not in command:
        if secondary_ok and not read_preference.mode:
            # secondaryOk with mode primary: send primaryPreferred.
            command["$readPreference"] = (
                ReadPreference.PRIMARY_PREFERRED.document)
        else:
            command["$readPreference"] = read_preference.document
    name = next(iter(command))
    try:
        # For a non-write command, or a write command missing its
        # document list, this pop raises KeyError (identifier may be
        # None here) and we fall back to no document sequence.
        identifier = _FIELD_MAP.get(name)
        docs = command.pop(identifier)
    except KeyError:
        identifier = ""
        docs = None
    try:
        if ctx:
            return _op_msg_compressed(
                flags, command, identifier, docs, check_keys, opts, ctx)
        return _op_msg_uncompressed(
            flags, command, identifier, docs, check_keys, opts)
    finally:
        # Add the field back to the command.
        if identifier:
            command[identifier] = docs
  629. def _query(options, collection_name, num_to_skip,
  630. num_to_return, query, field_selector, opts, check_keys):
  631. """Get an OP_QUERY message."""
  632. encoded = _dict_to_bson(query, check_keys, opts)
  633. if field_selector:
  634. efs = _dict_to_bson(field_selector, False, opts)
  635. else:
  636. efs = b""
  637. max_bson_size = max(len(encoded), len(efs))
  638. return b"".join([
  639. _pack_int(options),
  640. _make_c_string(collection_name),
  641. _pack_int(num_to_skip),
  642. _pack_int(num_to_return),
  643. encoded,
  644. efs]), max_bson_size
  645. def _query_compressed(options, collection_name, num_to_skip,
  646. num_to_return, query, field_selector,
  647. opts, check_keys=False, ctx=None):
  648. """Internal compressed query message helper."""
  649. op_query, max_bson_size = _query(
  650. options,
  651. collection_name,
  652. num_to_skip,
  653. num_to_return,
  654. query,
  655. field_selector,
  656. opts,
  657. check_keys)
  658. rid, msg = _compress(2004, op_query, ctx)
  659. return rid, msg, max_bson_size
  660. def _query_uncompressed(options, collection_name, num_to_skip,
  661. num_to_return, query, field_selector, opts, check_keys=False):
  662. """Internal query message helper."""
  663. op_query, max_bson_size = _query(
  664. options,
  665. collection_name,
  666. num_to_skip,
  667. num_to_return,
  668. query,
  669. field_selector,
  670. opts,
  671. check_keys)
  672. rid, msg = __pack_message(2004, op_query)
  673. return rid, msg, max_bson_size
# Prefer the C extension's implementation when it compiled successfully.
if _use_c:
    _query_uncompressed = _cmessage._query_message
  676. def query(options, collection_name, num_to_skip, num_to_return,
  677. query, field_selector, opts, check_keys=False, ctx=None):
  678. """**DEPRECATED** Get a **query** message.
  679. .. versionchanged:: 3.12
  680. This function is deprecated and will be removed in PyMongo 4.0.
  681. """
  682. if ctx:
  683. return _query_compressed(options, collection_name, num_to_skip,
  684. num_to_return, query, field_selector,
  685. opts, check_keys, ctx)
  686. return _query_uncompressed(options, collection_name, num_to_skip,
  687. num_to_return, query, field_selector, opts,
  688. check_keys)
  689. _pack_long_long = struct.Struct("<q").pack
  690. def _get_more(collection_name, num_to_return, cursor_id):
  691. """Get an OP_GET_MORE message."""
  692. return b"".join([
  693. _ZERO_32,
  694. _make_c_string(collection_name),
  695. _pack_int(num_to_return),
  696. _pack_long_long(cursor_id)])
  697. def _get_more_compressed(collection_name, num_to_return, cursor_id, ctx):
  698. """Internal compressed getMore message helper."""
  699. return _compress(
  700. 2005, _get_more(collection_name, num_to_return, cursor_id), ctx)
  701. def _get_more_uncompressed(collection_name, num_to_return, cursor_id):
  702. """Internal getMore message helper."""
  703. return __pack_message(
  704. 2005, _get_more(collection_name, num_to_return, cursor_id))
# Prefer the C extension's implementation when it compiled successfully.
if _use_c:
    _get_more_uncompressed = _cmessage._get_more_message
  707. def get_more(collection_name, num_to_return, cursor_id, ctx=None):
  708. """**DEPRECATED** Get a **getMore** message.
  709. .. versionchanged:: 3.12
  710. This function is deprecated and will be removed in PyMongo 4.0.
  711. """
  712. if ctx:
  713. return _get_more_compressed(
  714. collection_name, num_to_return, cursor_id, ctx)
  715. return _get_more_uncompressed(collection_name, num_to_return, cursor_id)
  716. def _delete(collection_name, spec, opts, flags):
  717. """Get an OP_DELETE message."""
  718. encoded = _dict_to_bson(spec, False, opts) # Uses extensions.
  719. return b"".join([
  720. _ZERO_32,
  721. _make_c_string(collection_name),
  722. _pack_int(flags),
  723. encoded]), len(encoded)
  724. def _delete_compressed(collection_name, spec, opts, flags, ctx):
  725. """Internal compressed unacknowledged delete message helper."""
  726. op_delete, max_bson_size = _delete(collection_name, spec, opts, flags)
  727. rid, msg = _compress(2006, op_delete, ctx)
  728. return rid, msg, max_bson_size
  729. def _delete_uncompressed(
  730. collection_name, spec, safe, last_error_args, opts, flags=0):
  731. """Internal delete message helper."""
  732. op_delete, max_bson_size = _delete(collection_name, spec, opts, flags)
  733. rid, msg = __pack_message(2006, op_delete)
  734. if safe:
  735. rid, gle, _ = __last_error(collection_name, last_error_args)
  736. return rid, msg + gle, max_bson_size
  737. return rid, msg, max_bson_size
  738. def delete(
  739. collection_name, spec, safe, last_error_args, opts, flags=0, ctx=None):
  740. """**DEPRECATED** Get a **delete** message.
  741. `opts` is a CodecOptions. `flags` is a bit vector that may contain
  742. the SingleRemove flag or not:
  743. http://docs.mongodb.org/meta-driver/latest/legacy/mongodb-wire-protocol/#op-delete
  744. .. versionchanged:: 3.12
  745. This function is deprecated and will be removed in PyMongo 4.0.
  746. """
  747. if ctx:
  748. return _delete_compressed(collection_name, spec, opts, flags, ctx)
  749. return _delete_uncompressed(
  750. collection_name, spec, safe, last_error_args, opts, flags)
  751. def kill_cursors(cursor_ids):
  752. """**DEPRECATED** Get a **killCursors** message.
  753. .. versionchanged:: 3.12
  754. This function is deprecated and will be removed in PyMongo 4.0.
  755. """
  756. num_cursors = len(cursor_ids)
  757. pack = struct.Struct("<ii" + ("q" * num_cursors)).pack
  758. op_kill_cursors = pack(0, num_cursors, *cursor_ids)
  759. return __pack_message(2007, op_kill_cursors)
class _BulkWriteContext(object):
    """A wrapper around SocketInfo for use with write splitting functions."""

    __slots__ = ('db_name', 'command', 'sock_info', 'op_id',
                 'name', 'field', 'publish', 'start_time', 'listeners',
                 'session', 'compress', 'op_type', 'codec')

    def __init__(self, database_name, command, sock_info, operation_id,
                 listeners, session, op_type, codec):
        self.db_name = database_name
        self.command = command
        self.sock_info = sock_info
        self.op_id = operation_id
        self.listeners = listeners
        # Only publish APM events when a command listener is registered.
        self.publish = listeners.enabled_for_commands
        # The command name is the first key of the command document.
        self.name = next(iter(command))
        # Name of the document-list field for this command (e.g. 'documents').
        self.field = _FIELD_MAP[self.name]
        self.start_time = datetime.datetime.now() if self.publish else None
        self.session = session
        self.compress = True if sock_info.compression_context else False
        self.op_type = op_type
        self.codec = codec
        sock_info.add_server_api(command)

    def _batch_command(self, docs):
        """Encode the next batch; returns (request_id, msg, docs_sent)."""
        namespace = self.db_name + '.$cmd'
        request_id, msg, to_send = _do_bulk_write_command(
            namespace, self.op_type, self.command, docs, self.check_keys,
            self.codec, self)
        if not to_send:
            raise InvalidOperation("cannot do an empty bulk write")
        return request_id, msg, to_send

    def execute(self, docs, client):
        """Send one acknowledged batch and return (result, docs_sent)."""
        request_id, msg, to_send = self._batch_command(docs)
        result = self.write_command(request_id, msg, to_send)
        client._process_response(result, self.session)
        return result, to_send

    def execute_unack(self, docs, client):
        """Send one unacknowledged batch and return the docs sent."""
        request_id, msg, to_send = self._batch_command(docs)
        # Though this isn't strictly a "legacy" write, the helper
        # handles publishing commands and sending our message
        # without receiving a result. Send 0 for max_doc_size
        # to disable size checking. Size checking is handled while
        # the documents are encoded to BSON.
        self.legacy_write(request_id, msg, 0, False, to_send)
        return to_send

    @property
    def check_keys(self):
        """Should we check keys for this operation type?"""
        return False

    @property
    def max_bson_size(self):
        """A proxy for SockInfo.max_bson_size."""
        return self.sock_info.max_bson_size

    @property
    def max_message_size(self):
        """A proxy for SockInfo.max_message_size."""
        if self.compress:
            # Subtract 16 bytes for the message header.
            return self.sock_info.max_message_size - 16
        return self.sock_info.max_message_size

    @property
    def max_write_batch_size(self):
        """A proxy for SockInfo.max_write_batch_size."""
        return self.sock_info.max_write_batch_size

    @property
    def max_split_size(self):
        """The maximum size of a BSON command before batch splitting."""
        return self.max_bson_size

    def legacy_bulk_insert(
            self, request_id, msg, max_doc_size, acknowledged, docs, compress):
        # When compress is True, msg is a raw OP_INSERT (2002) body that
        # still needs compressing; request_id is assigned by _compress.
        if compress:
            request_id, msg = _compress(
                2002, msg, self.sock_info.compression_context)
        return self.legacy_write(
            request_id, msg, max_doc_size, acknowledged, docs)

    def legacy_write(self, request_id, msg, max_doc_size, acknowledged, docs):
        """A proxy for SocketInfo.legacy_write that handles event publishing.
        """
        # `duration`, `cmd` and `start` are only bound when publishing is
        # enabled; all later uses are guarded by the same flag.
        if self.publish:
            duration = datetime.datetime.now() - self.start_time
            cmd = self._start(request_id, docs)
            start = datetime.datetime.now()
        try:
            result = self.sock_info.legacy_write(
                request_id, msg, max_doc_size, acknowledged)
            if self.publish:
                duration = (datetime.datetime.now() - start) + duration
                if result is not None:
                    reply = _convert_write_result(self.name, cmd, result)
                else:
                    # Comply with APM spec.
                    reply = {'ok': 1}
                self._succeed(request_id, reply, duration)
        except Exception as exc:
            if self.publish:
                duration = (datetime.datetime.now() - start) + duration
                if isinstance(exc, OperationFailure):
                    failure = _convert_write_result(
                        self.name, cmd, exc.details)
                elif isinstance(exc, NotPrimaryError):
                    failure = exc.details
                else:
                    failure = _convert_exception(exc)
                self._fail(request_id, failure, duration)
            raise
        finally:
            # Reset the timer so the next batch's duration excludes this one.
            self.start_time = datetime.datetime.now()
        return result

    def write_command(self, request_id, msg, docs):
        """A proxy for SocketInfo.write_command that handles event publishing.
        """
        if self.publish:
            duration = datetime.datetime.now() - self.start_time
            self._start(request_id, docs)
            start = datetime.datetime.now()
        try:
            reply = self.sock_info.write_command(request_id, msg)
            if self.publish:
                duration = (datetime.datetime.now() - start) + duration
                self._succeed(request_id, reply, duration)
        except Exception as exc:
            if self.publish:
                duration = (datetime.datetime.now() - start) + duration
                if isinstance(exc, (NotPrimaryError, OperationFailure)):
                    failure = exc.details
                else:
                    failure = _convert_exception(exc)
                self._fail(request_id, failure, duration)
            raise
        finally:
            # Reset the timer so the next batch's duration excludes this one.
            self.start_time = datetime.datetime.now()
        return reply

    def _start(self, request_id, docs):
        """Publish a CommandStartedEvent."""
        # Re-attach the document list so listeners see the full command.
        cmd = self.command.copy()
        cmd[self.field] = docs
        self.listeners.publish_command_start(
            cmd, self.db_name,
            request_id, self.sock_info.address, self.op_id,
            self.sock_info.service_id)
        return cmd

    def _succeed(self, request_id, reply, duration):
        """Publish a CommandSucceededEvent."""
        self.listeners.publish_command_success(
            duration, reply, self.name,
            request_id, self.sock_info.address, self.op_id,
            self.sock_info.service_id)

    def _fail(self, request_id, failure, duration):
        """Publish a CommandFailedEvent."""
        self.listeners.publish_command_failure(
            duration, failure, self.name,
            request_id, self.sock_info.address, self.op_id,
            self.sock_info.service_id)
# From the Client Side Encryption spec:
# Because automatic encryption increases the size of commands, the driver
# MUST split bulk writes at a reduced size limit before undergoing automatic
# encryption. The write payload MUST be split at 2MiB (2097152).
_MAX_SPLIT_SIZE_ENC = 2097152  # 2 MiB == 2 * 1024 * 1024 bytes
class _EncryptedBulkWriteContext(_BulkWriteContext):
    """A _BulkWriteContext variant used with automatic client-side
    encryption: batches are decoded back to a command document and sent
    through SocketInfo.command (which applies encryption) instead of
    being written raw.
    """

    __slots__ = ()

    def _batch_command(self, docs):
        """Encode the next batch and return (command_doc, docs_sent)."""
        namespace = self.db_name + '.$cmd'
        msg, to_send = _encode_batched_write_command(
            namespace, self.op_type, self.command, docs, self.check_keys,
            self.codec, self)
        if not to_send:
            raise InvalidOperation("cannot do an empty bulk write")
        # Chop off the OP_QUERY header to get a properly batched write command.
        # The headerless OP_QUERY body is: int32 flags, cstring namespace,
        # int32 skip, int32 limit, then the command document.  index(b"\x00", 4)
        # finds the namespace terminator; +9 skips that NUL plus the two
        # trailing int32s.
        cmd_start = msg.index(b"\x00", 4) + 9
        cmd = _inflate_bson(memoryview(msg)[cmd_start:],
                            DEFAULT_RAW_BSON_OPTIONS)
        return cmd, to_send

    def execute(self, docs, client):
        """Send one acknowledged batch and return (result, docs_sent)."""
        cmd, to_send = self._batch_command(docs)
        result = self.sock_info.command(
            self.db_name, cmd, codec_options=_UNICODE_REPLACE_CODEC_OPTIONS,
            session=self.session, client=client)
        return result, to_send

    def execute_unack(self, docs, client):
        """Send one unacknowledged (w=0) batch and return the docs sent."""
        cmd, to_send = self._batch_command(docs)
        self.sock_info.command(
            self.db_name, cmd, write_concern=WriteConcern(w=0),
            session=self.session, client=client)
        return to_send

    @property
    def max_split_size(self):
        """Reduce the batch splitting size."""
        return _MAX_SPLIT_SIZE_ENC
  946. def _raise_document_too_large(operation, doc_size, max_size):
  947. """Internal helper for raising DocumentTooLarge."""
  948. if operation == "insert":
  949. raise DocumentTooLarge("BSON document too large (%d bytes)"
  950. " - the connected server supports"
  951. " BSON document sizes up to %d"
  952. " bytes." % (doc_size, max_size))
  953. else:
  954. # There's nothing intelligent we can say
  955. # about size for update and delete
  956. raise DocumentTooLarge("%r command document too large" % (operation,))
def _do_batched_insert(collection_name, docs, check_keys,
                       safe, last_error_args, continue_on_error, opts,
                       ctx):
    """Insert `docs` using multiple batches.

    Encodes documents into one or more OP_INSERT (opcode 2002) messages,
    starting a new batch whenever appending a document would reach
    ``ctx.max_message_size``.  A single document larger than
    ``ctx.max_bson_size`` raises DocumentTooLarge (after flushing any
    documents already buffered).
    """
    def _insert_message(insert_message, send_safe):
        """Build the insert message with header and GLE.
        """
        request_id, final_message = __pack_message(2002, insert_message)
        if send_safe:
            # Append a getLastError query; its request id replaces the
            # insert's so the reply can be matched.
            request_id, error_message, _ = __last_error(collection_name,
                                                        last_error_args)
            final_message += error_message
        return request_id, final_message
    # continue_on_error implies acknowledgement of each batch.
    send_safe = safe or not continue_on_error
    last_error = None
    data = StringIO()
    data.write(struct.pack("<i", int(continue_on_error)))
    data.write(_make_c_string(collection_name))
    # Remember where documents start so a batch can be reset in place.
    message_length = begin_loc = data.tell()
    has_docs = False
    to_send = []
    encode = _dict_to_bson  # Make local
    # Compression is only used for unacknowledged batches (no GLE follows).
    compress = ctx.compress and not (safe or send_safe)
    for doc in docs:
        encoded = encode(doc, check_keys, opts)
        encoded_length = len(encoded)
        too_large = (encoded_length > ctx.max_bson_size)
        message_length += encoded_length
        if message_length < ctx.max_message_size and not too_large:
            data.write(encoded)
            to_send.append(doc)
            has_docs = True
            continue
        if has_docs:
            # We have enough data, send this message.
            try:
                if compress:
                    # request_id is assigned later by legacy_bulk_insert's
                    # _compress call.
                    rid, msg = None, data.getvalue()
                else:
                    rid, msg = _insert_message(data.getvalue(), send_safe)
                ctx.legacy_bulk_insert(
                    rid, msg, 0, send_safe, to_send, compress)
            # Exception type could be OperationFailure or a subtype
            # (e.g. DuplicateKeyError)
            except OperationFailure as exc:
                # Like it says, continue on error...
                if continue_on_error:
                    # Store exception details to re-raise after the final
                    # batch.
                    last_error = exc
                # With unacknowledged writes just return at the first error.
                elif not safe:
                    return
                # With acknowledged writes raise immediately.
                else:
                    raise
        if too_large:
            _raise_document_too_large(
                "insert", encoded_length, ctx.max_bson_size)
        # Reset the buffer to hold only the current (unsent) document.
        message_length = begin_loc + encoded_length
        data.seek(begin_loc)
        data.truncate()
        data.write(encoded)
        to_send = [doc]
    if not has_docs:
        raise InvalidOperation("cannot do an empty bulk insert")
    # Final batch: only `safe` governs the GLE here, not send_safe.
    if compress:
        request_id, msg = None, data.getvalue()
    else:
        request_id, msg = _insert_message(data.getvalue(), safe)
    ctx.legacy_bulk_insert(request_id, msg, 0, safe, to_send, compress)
    # Re-raise any exception stored due to continue_on_error
    if last_error is not None:
        raise last_error
# Prefer the C extension's implementation when it compiled successfully.
if _use_c:
    _do_batched_insert = _cmessage._do_batched_insert
# OP_MSG -------------------------------------------------------------

# Maps a write operation code (_INSERT/_UPDATE/_DELETE) to the OP_MSG
# payload type 1 section identifier, NUL-terminated (a C string).
_OP_MSG_MAP = {
    _INSERT: b'documents\x00',
    _UPDATE: b'updates\x00',
    _DELETE: b'deletes\x00',
}
def _batched_op_msg_impl(
        operation, command, docs, check_keys, ack, opts, ctx, buf):
    """Create a batched OP_MSG write.

    Writes the flags, the type 0 (command) section, and as many documents
    as fit into one type 1 section of `buf`.  Returns ``(to_send, length)``
    where `to_send` is the list of documents included and `length` is the
    total bytes written to `buf`.
    """
    max_bson_size = ctx.max_bson_size
    max_write_batch_size = ctx.max_write_batch_size
    max_message_size = ctx.max_message_size
    # Bit 1 (moreToCome) is set for unacknowledged writes.
    flags = b"\x00\x00\x00\x00" if ack else b"\x02\x00\x00\x00"
    # Flags
    buf.write(flags)
    # Type 0 Section
    buf.write(b"\x00")
    buf.write(_dict_to_bson(command, False, opts))
    # Type 1 Section
    buf.write(b"\x01")
    size_location = buf.tell()
    # Save space for size
    buf.write(b"\x00\x00\x00\x00")
    try:
        buf.write(_OP_MSG_MAP[operation])
    except KeyError:
        raise InvalidOperation('Unknown command')
    if operation in (_UPDATE, _DELETE):
        check_keys = False
    to_send = []
    idx = 0
    for doc in docs:
        # Encode the current operation
        value = _dict_to_bson(doc, check_keys, opts)
        doc_length = len(value)
        new_message_size = buf.tell() + doc_length
        # Does first document exceed max_message_size?
        doc_too_large = (idx == 0 and (new_message_size > max_message_size))
        # When OP_MSG is used unacknowledged we have to check
        # document size client side or applications won't be notified.
        # Otherwise we let the server deal with documents that are too large
        # since ordered=False causes those documents to be skipped instead of
        # halting the bulk write operation.
        unacked_doc_too_large = (not ack and (doc_length > max_bson_size))
        if doc_too_large or unacked_doc_too_large:
            # Recover the operation name from _FIELD_MAP's insertion order.
            write_op = list(_FIELD_MAP.keys())[operation]
            _raise_document_too_large(
                write_op, len(value), max_bson_size)
        # We have enough data, return this batch.
        if new_message_size > max_message_size:
            break
        buf.write(value)
        to_send.append(doc)
        idx += 1
        # We have enough documents, return this batch.
        if idx == max_write_batch_size:
            break
    # Write type 1 section size
    length = buf.tell()
    buf.seek(size_location)
    buf.write(_pack_int(length - size_location))
    return to_send, length
  1095. def _encode_batched_op_msg(
  1096. operation, command, docs, check_keys, ack, opts, ctx):
  1097. """Encode the next batched insert, update, or delete operation
  1098. as OP_MSG.
  1099. """
  1100. buf = StringIO()
  1101. to_send, _ = _batched_op_msg_impl(
  1102. operation, command, docs, check_keys, ack, opts, ctx, buf)
  1103. return buf.getvalue(), to_send
# Prefer the C extension's implementation when it compiled successfully.
if _use_c:
    _encode_batched_op_msg = _cmessage._encode_batched_op_msg
  1106. def _batched_op_msg_compressed(
  1107. operation, command, docs, check_keys, ack, opts, ctx):
  1108. """Create the next batched insert, update, or delete operation
  1109. with OP_MSG, compressed.
  1110. """
  1111. data, to_send = _encode_batched_op_msg(
  1112. operation, command, docs, check_keys, ack, opts, ctx)
  1113. request_id, msg = _compress(
  1114. 2013,
  1115. data,
  1116. ctx.sock_info.compression_context)
  1117. return request_id, msg, to_send
def _batched_op_msg(
        operation, command, docs, check_keys, ack, opts, ctx):
    """OP_MSG implementation entry point.

    Returns ``(request_id, message_bytes, docs_sent)``.
    """
    buf = StringIO()
    # Save space for message length and request id
    buf.write(_ZERO_64)
    # responseTo, opCode (0x7dd == 2013, OP_MSG)
    buf.write(b"\x00\x00\x00\x00\xdd\x07\x00\x00")
    to_send, length = _batched_op_msg_impl(
        operation, command, docs, check_keys, ack, opts, ctx, buf)
    # Header - request id and message length
    buf.seek(4)
    request_id = _randint()
    buf.write(_pack_int(request_id))
    buf.seek(0)
    buf.write(_pack_int(length))
    return request_id, buf.getvalue(), to_send
# Prefer the C extension's implementation when it compiled successfully.
if _use_c:
    _batched_op_msg = _cmessage._batched_op_msg
  1137. def _do_batched_op_msg(
  1138. namespace, operation, command, docs, check_keys, opts, ctx):
  1139. """Create the next batched insert, update, or delete operation
  1140. using OP_MSG.
  1141. """
  1142. command['$db'] = namespace.split('.', 1)[0]
  1143. if 'writeConcern' in command:
  1144. ack = bool(command['writeConcern'].get('w', 1))
  1145. else:
  1146. ack = True
  1147. if ctx.sock_info.compression_context:
  1148. return _batched_op_msg_compressed(
  1149. operation, command, docs, check_keys, ack, opts, ctx)
  1150. return _batched_op_msg(
  1151. operation, command, docs, check_keys, ack, opts, ctx)
  1152. # End OP_MSG -----------------------------------------------------
  1153. def _batched_write_command_compressed(
  1154. namespace, operation, command, docs, check_keys, opts, ctx):
  1155. """Create the next batched insert, update, or delete command, compressed.
  1156. """
  1157. data, to_send = _encode_batched_write_command(
  1158. namespace, operation, command, docs, check_keys, opts, ctx)
  1159. request_id, msg = _compress(
  1160. 2004,
  1161. data,
  1162. ctx.sock_info.compression_context)
  1163. return request_id, msg, to_send
  1164. def _encode_batched_write_command(
  1165. namespace, operation, command, docs, check_keys, opts, ctx):
  1166. """Encode the next batched insert, update, or delete command.
  1167. """
  1168. buf = StringIO()
  1169. to_send, _ = _batched_write_command_impl(
  1170. namespace, operation, command, docs, check_keys, opts, ctx, buf)
  1171. return buf.getvalue(), to_send
# Prefer the C extension's implementation when it compiled successfully.
if _use_c:
    _encode_batched_write_command = _cmessage._encode_batched_write_command
def _batched_write_command(
        namespace, operation, command, docs, check_keys, opts, ctx):
    """Create the next batched insert, update, or delete command.

    Returns ``(request_id, message_bytes, docs_sent)``.
    """
    buf = StringIO()
    # Save space for message length and request id
    buf.write(_ZERO_64)
    # responseTo, opCode (0x7d4 == 2004, OP_QUERY)
    buf.write(b"\x00\x00\x00\x00\xd4\x07\x00\x00")
    # Write OP_QUERY write command
    to_send, length = _batched_write_command_impl(
        namespace, operation, command, docs, check_keys, opts, ctx, buf)
    # Header - request id and message length
    buf.seek(4)
    request_id = _randint()
    buf.write(_pack_int(request_id))
    buf.seek(0)
    buf.write(_pack_int(length))
    return request_id, buf.getvalue(), to_send
# Prefer the C extension's implementation when it compiled successfully.
if _use_c:
    _batched_write_command = _cmessage._batched_write_command
  1195. def _do_batched_write_command(
  1196. namespace, operation, command, docs, check_keys, opts, ctx):
  1197. """Batched write commands entry point."""
  1198. if ctx.sock_info.compression_context:
  1199. return _batched_write_command_compressed(
  1200. namespace, operation, command, docs, check_keys, opts, ctx)
  1201. return _batched_write_command(
  1202. namespace, operation, command, docs, check_keys, opts, ctx)
  1203. def _do_bulk_write_command(
  1204. namespace, operation, command, docs, check_keys, opts, ctx):
  1205. """Bulk write commands entry point."""
  1206. if ctx.sock_info.max_wire_version > 5:
  1207. return _do_batched_op_msg(
  1208. namespace, operation, command, docs, check_keys, opts, ctx)
  1209. return _do_batched_write_command(
  1210. namespace, operation, command, docs, check_keys, opts, ctx)
def _batched_write_command_impl(
        namespace, operation, command, docs, check_keys, opts, ctx, buf):
    """Create a batched OP_QUERY write command.

    Writes a headerless OP_QUERY body to `buf` whose query document is the
    write command with as many documents as fit appended to its array
    field.  Returns ``(to_send, length)`` where `to_send` is the list of
    documents included and `length` is the total bytes written to `buf`.
    """
    max_bson_size = ctx.max_bson_size
    max_write_batch_size = ctx.max_write_batch_size
    # Max BSON object size + 16k - 2 bytes for ending NUL bytes.
    # Server guarantees there is enough room: SERVER-10643.
    max_cmd_size = max_bson_size + _COMMAND_OVERHEAD
    max_split_size = ctx.max_split_size
    # No options
    buf.write(_ZERO_32)
    # Namespace as C string
    buf.write(b(namespace))
    buf.write(_ZERO_8)
    # Skip: 0, Limit: -1
    buf.write(_SKIPLIM)
    # Where to write command document length
    command_start = buf.tell()
    buf.write(encode(command))
    # Start of payload: drop the command document's closing NUL so the
    # document array can be spliced onto its end.
    buf.seek(-1, 2)
    # Work around some Jython weirdness.
    buf.truncate()
    try:
        buf.write(_OP_MAP[operation])
    except KeyError:
        raise InvalidOperation('Unknown command')
    if operation in (_UPDATE, _DELETE):
        check_keys = False
    # Where to write list document length
    list_start = buf.tell() - 4
    to_send = []
    idx = 0
    for doc in docs:
        # Encode the current operation; array keys are "0", "1", ...
        key = b(str(idx))
        value = encode(doc, check_keys, opts)
        # Is there enough room to add this document? max_cmd_size accounts for
        # the two trailing null bytes.
        doc_too_large = len(value) > max_cmd_size
        if doc_too_large:
            # Recover the operation name from _FIELD_MAP's insertion order.
            write_op = list(_FIELD_MAP.keys())[operation]
            _raise_document_too_large(
                write_op, len(value), max_bson_size)
        enough_data = (idx >= 1 and
                       (buf.tell() + len(key) + len(value)) >= max_split_size)
        enough_documents = (idx >= max_write_batch_size)
        if enough_data or enough_documents:
            break
        buf.write(_BSONOBJ)
        buf.write(key)
        buf.write(_ZERO_8)
        buf.write(value)
        to_send.append(doc)
        idx += 1
    # Finalize the current OP_QUERY message.
    # Close list and command documents
    buf.write(_ZERO_16)
    # Write document lengths and request id
    length = buf.tell()
    buf.seek(list_start)
    buf.write(_pack_int(length - list_start - 1))
    buf.seek(command_start)
    buf.write(_pack_int(length - command_start))
    return to_send, length
class _OpReply(object):
    """A MongoDB OP_REPLY response message."""

    __slots__ = ("flags", "cursor_id", "number_returned", "documents")

    # Unpacks responseFlags (int32), cursorID (int64), startingFrom
    # (int32), numberReturned (int32).
    UNPACK_FROM = struct.Struct("<iqii").unpack_from
    OP_CODE = 1

    def __init__(self, flags, cursor_id, number_returned, documents):
        self.flags = flags
        self.cursor_id = cursor_id
        self.number_returned = number_returned
        # Raw concatenated BSON bytes; decoded lazily by unpack_response.
        self.documents = documents

    def raw_response(self, cursor_id=None, user_fields=None):
        """Check the response header from the database, without decoding BSON.

        Check the response for errors and unpack.

        Can raise CursorNotFound, NotPrimaryError, ExecutionTimeout, or
        OperationFailure.

        :Parameters:
          - `cursor_id` (optional): cursor_id we sent to get this response -
            used for raising an informative exception when we get cursor id
            not valid at server response.
        """
        # Flag bit 0: CursorNotFound.
        if self.flags & 1:
            # Shouldn't get this response if we aren't doing a getMore
            if cursor_id is None:
                raise ProtocolError("No cursor id for getMore operation")
            # Fake a getMore command response. OP_GET_MORE provides no
            # document.
            msg = "Cursor not found, cursor id: %d" % (cursor_id,)
            errobj = {"ok": 0, "errmsg": msg, "code": 43}
            raise CursorNotFound(msg, 43, errobj)
        # Flag bit 1: QueryFailure; the body is a single error document.
        elif self.flags & 2:
            error_object = bson.BSON(self.documents).decode()
            # Fake the ok field if it doesn't exist.
            error_object.setdefault("ok", 0)
            if error_object["$err"].startswith(HelloCompat.LEGACY_ERROR):
                raise NotPrimaryError(error_object["$err"], error_object)
            elif error_object.get("code") == 50:
                raise ExecutionTimeout(error_object.get("$err"),
                                       error_object.get("code"),
                                       error_object)
            raise OperationFailure("database error: %s" %
                                   error_object.get("$err"),
                                   error_object.get("code"),
                                   error_object)
        if self.documents:
            return [self.documents]
        return []

    def unpack_response(self, cursor_id=None,
                        codec_options=_UNICODE_REPLACE_CODEC_OPTIONS,
                        user_fields=None, legacy_response=False):
        """Unpack a response from the database and decode the BSON document(s).

        Check the response for errors and unpack, returning a dictionary
        containing the response data.

        Can raise CursorNotFound, NotPrimaryError, ExecutionTimeout, or
        OperationFailure.

        :Parameters:
          - `cursor_id` (optional): cursor_id we sent to get this response -
            used for raising an informative exception when we get cursor id
            not valid at server response
          - `codec_options` (optional): an instance of
            :class:`~bson.codec_options.CodecOptions`
        """
        # Raises on error flags before any decoding happens.
        self.raw_response(cursor_id)
        if legacy_response:
            return bson.decode_all(self.documents, codec_options)
        return bson._decode_all_selective(
            self.documents, codec_options, user_fields)

    def command_response(self):
        """Unpack a command response."""
        docs = self.unpack_response()
        assert self.number_returned == 1
        return docs[0]

    def raw_command_response(self):
        """Return the bytes of the command response."""
        # This should never be called on _OpReply.
        raise NotImplementedError

    @property
    def more_to_come(self):
        """Is the moreToCome bit set on this response?"""
        # OP_REPLY has no moreToCome flag; only _OpMsg can report True.
        return False

    @classmethod
    def unpack(cls, msg):
        """Construct an _OpReply from raw bytes."""
        # PYTHON-945: ignore starting_from field.
        flags, cursor_id, _, number_returned = cls.UNPACK_FROM(msg)
        # Convert Python 3 memoryview to bytes. Note we should call
        # memoryview.tobytes() if we start using memoryview in Python 2.7.
        # The 20-byte offset skips the four fields unpacked above.
        documents = bytes(msg[20:])
        return cls(flags, cursor_id, number_returned, documents)
class _OpMsg(object):
    """A MongoDB OP_MSG response message."""

    __slots__ = ("flags", "cursor_id", "number_returned", "payload_document")

    # Unpacks flagBits (uint32), first section kind (byte), and the first
    # section's document length (int32).
    UNPACK_FROM = struct.Struct("<IBi").unpack_from
    OP_CODE = 2013

    # Flag bits.
    CHECKSUM_PRESENT = 1
    MORE_TO_COME = 1 << 1
    EXHAUST_ALLOWED = 1 << 16  # Only present on requests.

    def __init__(self, flags, payload_document):
        self.flags = flags
        self.payload_document = payload_document

    # NOTE(review): mutable default argument; benign here because
    # user_fields is only read, never mutated.
    def raw_response(self, cursor_id=None, user_fields={}):
        """
        cursor_id is ignored
        user_fields is used to determine which fields must not be decoded
        """
        inflated_response = _decode_selective(
            RawBSONDocument(self.payload_document), user_fields,
            DEFAULT_RAW_BSON_OPTIONS)
        return [inflated_response]

    def unpack_response(self, cursor_id=None,
                        codec_options=_UNICODE_REPLACE_CODEC_OPTIONS,
                        user_fields=None, legacy_response=False):
        """Unpack a OP_MSG command response.

        :Parameters:
          - `cursor_id` (optional): Ignored, for compatibility with _OpReply.
          - `codec_options` (optional): an instance of
            :class:`~bson.codec_options.CodecOptions`
        """
        # If _OpMsg is in-use, this cannot be a legacy response.
        assert not legacy_response
        return bson._decode_all_selective(
            self.payload_document, codec_options, user_fields)

    def command_response(self):
        """Unpack a command response."""
        return self.unpack_response()[0]

    def raw_command_response(self):
        """Return the bytes of the command response."""
        return self.payload_document

    @property
    def more_to_come(self):
        """Is the moreToCome bit set on this response?"""
        return self.flags & self.MORE_TO_COME

    @classmethod
    def unpack(cls, msg):
        """Construct an _OpMsg from raw bytes."""
        flags, first_payload_type, first_payload_size = cls.UNPACK_FROM(msg)
        if flags != 0:
            if flags & cls.CHECKSUM_PRESENT:
                raise ProtocolError(
                    "Unsupported OP_MSG flag checksumPresent: "
                    "0x%x" % (flags,))
            # Nonzero XOR means some bit other than MORE_TO_COME is set
            # (checksumPresent was already rejected above).
            if flags ^ cls.MORE_TO_COME:
                raise ProtocolError(
                    "Unsupported OP_MSG flags: 0x%x" % (flags,))
        if first_payload_type != 0:
            raise ProtocolError(
                "Unsupported OP_MSG payload type: "
                "0x%x" % (first_payload_type,))
        # 5 == flagBits (4 bytes) + payload type byte; a reply must be a
        # single type 0 section.
        if len(msg) != first_payload_size + 5:
            raise ProtocolError("Unsupported OP_MSG reply: >1 section")
        # Convert Python 3 memoryview to bytes. Note we should call
        # memoryview.tobytes() if we start using memoryview in Python 2.7.
        payload_document = bytes(msg[5:])
        return cls(flags, payload_document)
# Dispatch table: wire-protocol opcode -> classmethod that parses a raw
# reply of that type (OP_REPLY via _OpReply.unpack, OP_MSG via
# _OpMsg.unpack).
_UNPACK_REPLY = {
    _OpReply.OP_CODE: _OpReply.unpack,
    _OpMsg.OP_CODE: _OpMsg.unpack,
}
def _first_batch(sock_info, db, coll, query, ntoreturn,
                 secondary_ok, codec_options, read_preference, cmd, listeners):
    """Simple query helper for retrieving a first (and possibly only) batch.

    Sends an OP_QUERY built from `query` and reshapes the reply either as
    a cursor-style command result (when `cmd` has a 'cursor' key, e.g.
    listIndexes) or as a single bare document (e.g. fsyncUnlock,
    currentOp), publishing command-monitoring events along the way.

    :Parameters:
      - `sock_info`: the connection to send the message on.
      - `db`: database name.
      - `coll`: collection name.
      - `query`: the query spec document.
      - `ntoreturn`: numberToReturn for the OP_QUERY message.
      - `secondary_ok`: passed to get_message when encoding the query.
      - `codec_options`: codec options used to decode the reply.
      - `cmd`: the command document this query emulates; used only for
        command monitoring (its first key is the command name).
      - `listeners`: event listeners for command monitoring.
    """
    query = _Query(
        0, db, coll, 0, query, None, codec_options,
        read_preference, ntoreturn, 0, DEFAULT_READ_CONCERN, None, None,
        None, None, False)
    # Command name for the monitoring events: the first key of `cmd`.
    name = next(iter(cmd))
    publish = listeners.enabled_for_commands
    if publish:
        start = datetime.datetime.now()
    request_id, msg, max_doc_size = query.get_message(secondary_ok, sock_info)
    if publish:
        # Time spent encoding the message; folded into the durations
        # reported by the success/failure events below.
        encoding_duration = datetime.datetime.now() - start
        listeners.publish_command_start(
            cmd, db, request_id, sock_info.address,
            service_id=sock_info.service_id)
        start = datetime.datetime.now()
    sock_info.send_message(msg, max_doc_size)
    reply = sock_info.receive_message(request_id)
    try:
        docs = reply.unpack_response(None, codec_options)
    except Exception as exc:
        if publish:
            duration = (datetime.datetime.now() - start) + encoding_duration
            # Server errors carry their own details document; anything
            # else is converted to a generic failure document.
            if isinstance(exc, (NotPrimaryError, OperationFailure)):
                failure = exc.details
            else:
                failure = _convert_exception(exc)
            listeners.publish_command_failure(
                duration, failure, name, request_id, sock_info.address,
                service_id=sock_info.service_id)
        raise
    # listIndexes
    if 'cursor' in cmd:
        # Synthesize a cursor-style command reply from the OP_QUERY batch.
        result = {
            u'cursor': {
                u'firstBatch': docs,
                u'id': reply.cursor_id,
                u'ns': u'%s.%s' % (db, coll)
            },
            u'ok': 1.0
        }
    # fsyncUnlock, currentOp
    else:
        result = docs[0] if docs else {}
        result[u'ok'] = 1.0
    if publish:
        duration = (datetime.datetime.now() - start) + encoding_duration
        listeners.publish_command_success(
            duration, result, name, request_id, sock_info.address,
            service_id=sock_info.service_id)
    return result