bulk.py 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709
  1. # Copyright 2014-present MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """The bulk write operations interface.
  15. .. versionadded:: 2.7
  16. """
  17. import copy
  18. from itertools import islice
  19. from bson.objectid import ObjectId
  20. from bson.raw_bson import RawBSONDocument
  21. from bson.son import SON
  22. from pymongo.client_session import _validate_session_write_concern
  23. from pymongo.common import (validate_is_mapping,
  24. validate_is_document_type,
  25. validate_ok_for_replace,
  26. validate_ok_for_update)
  27. from pymongo.helpers import _RETRYABLE_ERROR_CODES
  28. from pymongo.collation import validate_collation_or_none
  29. from pymongo.errors import (BulkWriteError,
  30. ConfigurationError,
  31. InvalidOperation,
  32. OperationFailure)
  33. from pymongo.message import (_INSERT, _UPDATE, _DELETE,
  34. _do_batched_insert,
  35. _randint,
  36. _BulkWriteContext,
  37. _EncryptedBulkWriteContext)
  38. from pymongo.read_preferences import ReadPreference
  39. from pymongo.write_concern import WriteConcern
# Values for a delete statement's ``limit`` field (see _Bulk.add_delete).
# _DELETE_ALL (0) is used by BulkWriteOperation.remove() and makes the bulk
# non-retryable; _DELETE_ONE (1) is used by BulkWriteOperation.remove_one().
_DELETE_ALL = 0
_DELETE_ONE = 1
# For backwards compatibility. See MongoDB src/mongo/base/error_codes.err
_BAD_VALUE = 2
_UNKNOWN_ERROR = 8
_WRITE_CONCERN_ERROR = 64
# Write command names. _Bulk._execute_command looks them up with
# ``_COMMANDS[run.op_type]``, so this order must match the numeric values of
# _INSERT, _UPDATE and _DELETE imported from pymongo.message (presumably
# 0, 1 and 2 -- _Bulk.gen_unordered relies on the same assumption).
_COMMANDS = ('insert', 'update', 'delete')
# These string literals are used when we create fake server return
# documents client side. We use unicode literals in python 2.x to
# match the actual return values from the server.
# _UOP is the key under which _merge_command attaches the failed operation
# to each write error document.
_UOP = u"op"
  51. class _Run(object):
  52. """Represents a batch of write operations.
  53. """
  54. def __init__(self, op_type):
  55. """Initialize a new Run object.
  56. """
  57. self.op_type = op_type
  58. self.index_map = []
  59. self.ops = []
  60. self.idx_offset = 0
  61. def index(self, idx):
  62. """Get the original index of an operation in this run.
  63. :Parameters:
  64. - `idx`: The Run index that maps to the original index.
  65. """
  66. return self.index_map[idx]
  67. def add(self, original_index, operation):
  68. """Add an operation to this Run instance.
  69. :Parameters:
  70. - `original_index`: The original index of this operation
  71. within a larger bulk operation.
  72. - `operation`: The operation document.
  73. """
  74. self.index_map.append(original_index)
  75. self.ops.append(operation)
  76. def _merge_command(run, full_result, offset, result):
  77. """Merge a write command result into the full bulk result.
  78. """
  79. affected = result.get("n", 0)
  80. if run.op_type == _INSERT:
  81. full_result["nInserted"] += affected
  82. elif run.op_type == _DELETE:
  83. full_result["nRemoved"] += affected
  84. elif run.op_type == _UPDATE:
  85. upserted = result.get("upserted")
  86. if upserted:
  87. n_upserted = len(upserted)
  88. for doc in upserted:
  89. doc["index"] = run.index(doc["index"] + offset)
  90. full_result["upserted"].extend(upserted)
  91. full_result["nUpserted"] += n_upserted
  92. full_result["nMatched"] += (affected - n_upserted)
  93. else:
  94. full_result["nMatched"] += affected
  95. full_result["nModified"] += result["nModified"]
  96. write_errors = result.get("writeErrors")
  97. if write_errors:
  98. for doc in write_errors:
  99. # Leave the server response intact for APM.
  100. replacement = doc.copy()
  101. idx = doc["index"] + offset
  102. replacement["index"] = run.index(idx)
  103. # Add the failed operation to the error document.
  104. replacement[_UOP] = run.ops[idx]
  105. full_result["writeErrors"].append(replacement)
  106. wc_error = result.get("writeConcernError")
  107. if wc_error:
  108. full_result["writeConcernErrors"].append(wc_error)
  109. def _raise_bulk_write_error(full_result):
  110. """Raise a BulkWriteError from the full bulk api result.
  111. """
  112. if full_result["writeErrors"]:
  113. full_result["writeErrors"].sort(
  114. key=lambda error: error["index"])
  115. raise BulkWriteError(full_result)
class _Bulk(object):
    """The private guts of the bulk write API.

    Operations are queued by the ``add_*`` methods as ``(op_type, document)``
    tuples in ``self.ops`` and sent by :meth:`execute`. That method groups
    them into :class:`_Run` batches and chooses the acknowledged
    (write command) or unacknowledged code path from the write concern.
    """
    def __init__(self, collection, ordered, bypass_document_validation):
        """Initialize a _Bulk instance.

        :Parameters:
          - `collection`: the collection the operations are applied to.
          - `ordered`: if ``True``, runs are sent in the order operations
            were added and execution stops after the first write error.
          - `bypass_document_validation`: if ``True``, request
            ``bypassDocumentValidation`` (only sent to servers with wire
            version >= 4; rejected for unacknowledged writes).
        """
        # Work on a copy of the collection whose replies decode into plain
        # dicts, using the 'replace' unicode error handler instead of raising.
        self.collection = collection.with_options(
            codec_options=collection.codec_options._replace(
                unicode_decode_error_handler='replace',
                document_class=dict))
        self.ordered = ordered
        # Queued (op_type, operation document) tuples, in insertion order.
        self.ops = []
        # Set by execute(); a bulk may only be executed once.
        self.executed = False
        self.bypass_doc_val = bypass_document_validation
        # Feature flags recorded by the add_* methods so execution can reject
        # them up front when the server or write concern cannot honor them.
        self.uses_collation = False
        self.uses_array_filters = False
        self.uses_hint = False
        # Cleared when a multi-document update or delete is queued; such a
        # bulk is passed to the client's retry helper as non-retryable.
        self.is_retryable = True
        # Reset to False whenever a command succeeds. NOTE(review): presumably
        # set to True by client._retry_with_session, which receives this
        # object -- confirm there.
        self.retrying = False
        # True once session._start_retryable_write() has been called for the
        # command currently being sent; reset after each successful command.
        self.started_retryable_write = False
        # Extra state so that we know where to pick up on a retry attempt.
        self.current_run = None
    @property
    def bulk_ctx_class(self):
        """The bulk write context class used to send write commands.

        _EncryptedBulkWriteContext when the client has an encrypter and
        automatic encryption is not bypassed, otherwise _BulkWriteContext.
        """
        encrypter = self.collection.database.client._encrypter
        if encrypter and not encrypter._bypass_auto_encryption:
            return _EncryptedBulkWriteContext
        else:
            return _BulkWriteContext
    def add_insert(self, document):
        """Add an insert document to the list of ops.

        :Parameters:
          - `document`: the document to insert. If it is not a
            RawBSONDocument and has no ``_id``, a new ObjectId is stored in
            ``document['_id']``; the caller's document is modified.
        """
        validate_is_document_type("document", document)
        # Generate ObjectId client side.
        if not (isinstance(document, RawBSONDocument) or '_id' in document):
            document['_id'] = ObjectId()
        self.ops.append((_INSERT, document))
    def add_update(self, selector, update, multi=False, upsert=False,
                   collation=None, array_filters=None, hint=None):
        """Create an update document and add it to the list of ops.

        :Parameters:
          - `selector`: the statement's query (``q``).
          - `update`: the statement's update document (``u``), checked with
            validate_ok_for_update.
          - `multi`: the statement's ``multi`` flag; ``True`` makes the whole
            bulk non-retryable.
          - `upsert`: the statement's ``upsert`` flag.
          - `collation`, `array_filters`, `hint` (optional): added to the
            statement only when not ``None``; each sets its ``uses_*`` flag.
        """
        validate_ok_for_update(update)
        cmd = SON([('q', selector), ('u', update),
                   ('multi', multi), ('upsert', upsert)])
        collation = validate_collation_or_none(collation)
        if collation is not None:
            self.uses_collation = True
            cmd['collation'] = collation
        if array_filters is not None:
            self.uses_array_filters = True
            cmd['arrayFilters'] = array_filters
        if hint is not None:
            self.uses_hint = True
            cmd['hint'] = hint
        if multi:
            # A bulk_write containing an update_many is not retryable.
            self.is_retryable = False
        self.ops.append((_UPDATE, cmd))
    def add_replace(self, selector, replacement, upsert=False,
                    collation=None, hint=None):
        """Create a replace document and add it to the list of ops.

        Queued as an update statement with ``multi`` False, so it never
        affects retryability.

        :Parameters:
          - `selector`: the statement's query (``q``).
          - `replacement`: the replacement document (``u``), checked with
            validate_ok_for_replace.
          - `upsert`: the statement's ``upsert`` flag.
          - `collation`, `hint` (optional): added only when not ``None``.
        """
        validate_ok_for_replace(replacement)
        cmd = SON([('q', selector), ('u', replacement),
                   ('multi', False), ('upsert', upsert)])
        collation = validate_collation_or_none(collation)
        if collation is not None:
            self.uses_collation = True
            cmd['collation'] = collation
        if hint is not None:
            self.uses_hint = True
            cmd['hint'] = hint
        self.ops.append((_UPDATE, cmd))
    def add_delete(self, selector, limit, collation=None, hint=None):
        """Create a delete document and add it to the list of ops.

        :Parameters:
          - `selector`: the statement's query (``q``).
          - `limit`: _DELETE_ONE or _DELETE_ALL; _DELETE_ALL makes the whole
            bulk non-retryable.
          - `collation`, `hint` (optional): added only when not ``None``.
        """
        cmd = SON([('q', selector), ('limit', limit)])
        collation = validate_collation_or_none(collation)
        if collation is not None:
            self.uses_collation = True
            cmd['collation'] = collation
        if hint is not None:
            self.uses_hint = True
            cmd['hint'] = hint
        if limit == _DELETE_ALL:
            # A bulk_write containing a delete_many is not retryable.
            self.is_retryable = False
        self.ops.append((_DELETE, cmd))
    def gen_ordered(self):
        """Generate batches of operations, batched by type of
        operation, in the order **provided**.

        Consecutive operations of the same type share one _Run. If no
        operations were queued this yields ``None`` once; execute() rejects
        an empty bulk before calling it.
        """
        run = None
        for idx, (op_type, operation) in enumerate(self.ops):
            if run is None:
                run = _Run(op_type)
            elif run.op_type != op_type:
                # The op type changed: emit the finished run, start a new one.
                yield run
                run = _Run(op_type)
            run.add(idx, operation)
        yield run
    def gen_unordered(self):
        """Generate batches of operations, batched by type of
        operation, in arbitrary order.

        Emits at most three runs (all inserts, then all updates, then all
        deletes); empty runs are skipped.
        """
        # Indexing this list by op_type relies on _INSERT, _UPDATE and
        # _DELETE being 0, 1 and 2 respectively.
        operations = [_Run(_INSERT), _Run(_UPDATE), _Run(_DELETE)]
        for idx, (op_type, operation) in enumerate(self.ops):
            operations[op_type].add(idx, operation)
        for run in operations:
            if run.ops:
                yield run
    def _execute_command(self, generator, write_concern, session,
                         sock_info, op_id, retryable, full_result):
        """Send every remaining run from ``generator`` as write commands.

        Each server reply is merged into ``full_result`` in place. Progress
        is kept in ``self.current_run`` and each run's ``idx_offset``.
        When execute_command's retry helper calls this again with the same
        generator, it resumes at the batch that failed instead of resending
        completed ones.

        :Parameters:
          - `generator`: yields :class:`_Run` batches (gen_ordered or
            gen_unordered).
          - `write_concern`: the :class:`WriteConcern` to send.
          - `session`: a client session or ``None``.
          - `sock_info`: the connection to send on.
          - `op_id`: operation id shared by every command of this bulk.
          - `retryable`: whether this attempt is a retryable write.
          - `full_result`: the accumulated bulk result dict.

        :Raises:
          - ConfigurationError if collation, hint or arrayFilters is used
            with a server whose wire version is too old.
          - BulkWriteError (via _raise_bulk_write_error) when a reply carries
            a writeConcernError whose code is in _RETRYABLE_ERROR_CODES.
        """
        # Reject features the connected server cannot handle before sending.
        if sock_info.max_wire_version < 5:
            if self.uses_collation:
                raise ConfigurationError(
                    'Must be connected to MongoDB 3.4+ to use a collation.')
            if self.uses_hint:
                raise ConfigurationError(
                    'Must be connected to MongoDB 3.4+ to use hint.')
        if sock_info.max_wire_version < 6 and self.uses_array_filters:
            raise ConfigurationError(
                'Must be connected to MongoDB 3.6+ to use arrayFilters.')
        db_name = self.collection.database.name
        client = self.collection.database.client
        listeners = client._event_listeners
        # On the first attempt start at the first run; on a retry continue
        # with the run that was in progress.
        if not self.current_run:
            self.current_run = next(generator)
        run = self.current_run
        # sock_info.command validates the session, but we use
        # sock_info.write_command.
        sock_info.validate_session(client, session)
        while run:
            cmd = SON([(_COMMANDS[run.op_type], self.collection.name),
                       ('ordered', self.ordered)])
            if not write_concern.is_server_default:
                cmd['writeConcern'] = write_concern.document
            if self.bypass_doc_val and sock_info.max_wire_version >= 4:
                cmd['bypassDocumentValidation'] = True
            bwc = self.bulk_ctx_class(
                db_name, cmd, sock_info, op_id, listeners, session,
                run.op_type, self.collection.codec_options)
            # A run may need several commands; idx_offset marks the first
            # operation not yet acknowledged.
            while run.idx_offset < len(run.ops):
                if session:
                    # Start a new retryable write unless one was already
                    # started for this command.
                    if retryable and not self.started_retryable_write:
                        session._start_retryable_write()
                        self.started_retryable_write = True
                    session._apply_to(cmd, retryable, ReadPreference.PRIMARY,
                                      sock_info)
                sock_info.send_cluster_time(cmd, session, client)
                ops = islice(run.ops, run.idx_offset, None)
                # Run as many ops as possible in one command.
                result, to_send = bwc.execute(ops, client)
                # Retryable writeConcernErrors halt the execution of this run.
                wce = result.get('writeConcernError', {})
                if wce.get('code', 0) in _RETRYABLE_ERROR_CODES:
                    # Synthesize the full bulk result without modifying the
                    # current one because this write operation may be retried.
                    # idx_offset is not advanced, so a retry resends this
                    # same batch.
                    full = copy.deepcopy(full_result)
                    _merge_command(run, full, run.idx_offset, result)
                    _raise_bulk_write_error(full)
                _merge_command(run, full_result, run.idx_offset, result)
                # We're no longer in a retry once a command succeeds.
                self.retrying = False
                self.started_retryable_write = False
                # An ordered bulk stops at the first write error.
                if self.ordered and "writeErrors" in result:
                    break
                run.idx_offset += len(to_send)
            # We're supposed to continue if errors are
            # at the write concern level (e.g. wtimeout)
            if self.ordered and full_result['writeErrors']:
                break
            # Reset our state
            self.current_run = run = next(generator, None)
    def execute_command(self, generator, write_concern, session):
        """Execute using write commands.

        Runs _execute_command through client._retry_with_session, using the
        session returned by client._tmp_session(session).

        :Returns:
          The merged bulk result dict.

        :Raises:
          BulkWriteError if any write error or write concern error was
          collected.
        """
        # nModified is only reported for write commands, not legacy ops.
        full_result = {
            "writeErrors": [],
            "writeConcernErrors": [],
            "nInserted": 0,
            "nUpserted": 0,
            "nMatched": 0,
            "nModified": 0,
            "nRemoved": 0,
            "upserted": [],
        }
        # One operation id for every command (and retry) of this bulk.
        op_id = _randint()
        def retryable_bulk(session, sock_info, retryable):
            # Callback for the retry helper; each attempt resumes from the
            # state kept on self (see _execute_command).
            self._execute_command(
                generator, write_concern, session, sock_info, op_id,
                retryable, full_result)
        client = self.collection.database.client
        with client._tmp_session(session) as s:
            client._retry_with_session(
                self.is_retryable, retryable_bulk, s, self)
        if full_result["writeErrors"] or full_result["writeConcernErrors"]:
            _raise_bulk_write_error(full_result)
        return full_result
    def execute_insert_no_results(self, sock_info, run, op_id, acknowledged):
        """Execute insert, returning no results.

        Sends ``run.ops`` with legacy batched OP_INSERT messages through
        _do_batched_insert.

        :Parameters:
          - `sock_info`: the connection to write on.
          - `run`: an insert :class:`_Run`.
          - `op_id`: the operation id for this bulk.
          - `acknowledged`: forwarded to _do_batched_insert. execute_no_results
            passes True only for an ordered bulk with more runs to follow.
        """
        command = SON([('insert', self.collection.name),
                       ('ordered', self.ordered)])
        # w=1 for an ordered bulk, w=0 otherwise.
        concern = {'w': int(self.ordered)}
        command['writeConcern'] = concern
        if self.bypass_doc_val and sock_info.max_wire_version >= 4:
            command['bypassDocumentValidation'] = True
        db = self.collection.database
        bwc = _BulkWriteContext(
            db.name, command, sock_info, op_id, db.client._event_listeners,
            None, _INSERT, self.collection.codec_options)
        # Legacy batched OP_INSERT.
        _do_batched_insert(
            self.collection.full_name, run.ops, True, acknowledged, concern,
            not self.ordered, self.collection.codec_options, bwc)
    def execute_op_msg_no_results(self, sock_info, generator):
        """Execute write commands with OP_MSG and w=0 writeConcern, unordered.

        Each batch is sent with the bulk context's execute_unack.
        ``run.idx_offset`` advances by the number of operations it reports
        as sent. No result is merged or returned.
        """
        db_name = self.collection.database.name
        client = self.collection.database.client
        listeners = client._event_listeners
        op_id = _randint()
        if not self.current_run:
            self.current_run = next(generator)
        run = self.current_run
        while run:
            cmd = SON([(_COMMANDS[run.op_type], self.collection.name),
                       ('ordered', False),
                       ('writeConcern', {'w': 0})])
            bwc = self.bulk_ctx_class(
                db_name, cmd, sock_info, op_id, listeners, None,
                run.op_type, self.collection.codec_options)
            while run.idx_offset < len(run.ops):
                ops = islice(run.ops, run.idx_offset, None)
                # Run as many ops as possible.
                to_send = bwc.execute_unack(ops, client)
                run.idx_offset += len(to_send)
            self.current_run = run = next(generator, None)
    def execute_command_no_results(self, sock_info, generator):
        """Execute write commands with OP_MSG and w=0 WriteConcern, ordered.

        The commands are actually sent acknowledged (see below). The merged
        result is discarded and OperationFailure is suppressed, because the
        application asked for an unacknowledged write.
        """
        full_result = {
            "writeErrors": [],
            "writeConcernErrors": [],
            "nInserted": 0,
            "nUpserted": 0,
            "nMatched": 0,
            "nModified": 0,
            "nRemoved": 0,
            "upserted": [],
        }
        # Ordered bulk writes have to be acknowledged so that we stop
        # processing at the first error, even when the application
        # specified unacknowledged writeConcern.
        write_concern = WriteConcern()
        op_id = _randint()
        try:
            self._execute_command(
                generator, write_concern, None,
                sock_info, op_id, False, full_result)
        except OperationFailure:
            # Deliberately swallowed: errors are not reported for w=0.
            # NOTE(review): presumably this also covers the BulkWriteError
            # raised for retryable writeConcernErrors (an OperationFailure
            # subclass in pymongo.errors) -- confirm.
            pass
    def execute_no_results(self, sock_info, generator):
        """Execute all operations, returning no results (w=0).

        Servers with wire version > 5 use OP_MSG write commands; older
        servers use the legacy insert/update/delete path below.

        :Raises:
          - ConfigurationError if collation, arrayFilters or hint was used.
          - OperationFailure if bypass_document_validation was requested and
            the server's wire version is >= 4.
        """
        if self.uses_collation:
            raise ConfigurationError(
                'Collation is unsupported for unacknowledged writes.')
        if self.uses_array_filters:
            raise ConfigurationError(
                'arrayFilters is unsupported for unacknowledged writes.')
        if self.uses_hint:
            raise ConfigurationError(
                'hint is unsupported for unacknowledged writes.')
        # Cannot have both unacknowledged writes and bypass document validation.
        if self.bypass_doc_val and sock_info.max_wire_version >= 4:
            raise OperationFailure("Cannot set bypass_document_validation with"
                                   " unacknowledged write concern")
        # OP_MSG (wire version > 5).
        if sock_info.max_wire_version > 5:
            if self.ordered:
                return self.execute_command_no_results(sock_info, generator)
            return self.execute_op_msg_no_results(sock_info, generator)
        coll = self.collection
        # If ordered is True we have to send GLE or use write
        # commands so we can abort on the first error.
        write_concern = WriteConcern(w=int(self.ordered))
        op_id = _randint()
        next_run = next(generator)
        while next_run:
            # An ordered bulk write needs to send acknowledged writes to short
            # circuit the next run. However, the final message on the final
            # run can be unacknowledged.
            run = next_run
            next_run = next(generator, None)
            needs_ack = self.ordered and next_run is not None
            try:
                if run.op_type == _INSERT:
                    self.execute_insert_no_results(
                        sock_info, run, op_id, needs_ack)
                elif run.op_type == _UPDATE:
                    for operation in run.ops:
                        doc = operation['u']
                        # Keys are checked only for replacement documents;
                        # a document whose first key starts with '$' is an
                        # update-operator document.
                        check_keys = True
                        if doc and next(iter(doc)).startswith('$'):
                            check_keys = False
                        coll._update(
                            sock_info,
                            operation['q'],
                            doc,
                            operation['upsert'],
                            check_keys,
                            operation['multi'],
                            write_concern=write_concern,
                            op_id=op_id,
                            ordered=self.ordered,
                            bypass_doc_val=self.bypass_doc_val)
                else:
                    for operation in run.ops:
                        # ``not limit`` is True for _DELETE_ALL (0);
                        # presumably coll._delete's "multi" flag -- confirm.
                        coll._delete(sock_info,
                                     operation['q'],
                                     not operation['limit'],
                                     write_concern,
                                     op_id,
                                     self.ordered)
            except OperationFailure:
                # Unacknowledged: errors are not reported, but an ordered
                # bulk must stop at the first one.
                if self.ordered:
                    break
    def execute(self, write_concern, session):
        """Execute operations.

        :Parameters:
          - `write_concern`: the write concern to use; the collection's
            write concern is used when this is falsy (e.g. ``None``).
          - `session`: a client session or ``None``.

        :Returns:
          The merged result dict for acknowledged writes, otherwise ``None``.

        :Raises:
          InvalidOperation if no operations were queued or this bulk was
          already executed.
        """
        if not self.ops:
            raise InvalidOperation('No operations to execute')
        if self.executed:
            raise InvalidOperation('Bulk operations can '
                                   'only be executed once.')
        self.executed = True
        write_concern = write_concern or self.collection.write_concern
        session = _validate_session_write_concern(session, write_concern)
        if self.ordered:
            generator = self.gen_ordered()
        else:
            generator = self.gen_unordered()
        client = self.collection.database.client
        if not write_concern.acknowledged:
            with client._socket_for_writes(session) as sock_info:
                self.execute_no_results(sock_info, generator)
        else:
            return self.execute_command(generator, write_concern, session)
  469. class BulkUpsertOperation(object):
  470. """An interface for adding upsert operations.
  471. """
  472. __slots__ = ('__selector', '__bulk', '__collation')
  473. def __init__(self, selector, bulk, collation):
  474. self.__selector = selector
  475. self.__bulk = bulk
  476. self.__collation = collation
  477. def update_one(self, update):
  478. """Update one document matching the selector.
  479. :Parameters:
  480. - `update` (dict): the update operations to apply
  481. """
  482. self.__bulk.add_update(self.__selector,
  483. update, multi=False, upsert=True,
  484. collation=self.__collation)
  485. def update(self, update):
  486. """Update all documents matching the selector.
  487. :Parameters:
  488. - `update` (dict): the update operations to apply
  489. """
  490. self.__bulk.add_update(self.__selector,
  491. update, multi=True, upsert=True,
  492. collation=self.__collation)
  493. def replace_one(self, replacement):
  494. """Replace one entire document matching the selector criteria.
  495. :Parameters:
  496. - `replacement` (dict): the replacement document
  497. """
  498. self.__bulk.add_replace(self.__selector, replacement, upsert=True,
  499. collation=self.__collation)
  500. class BulkWriteOperation(object):
  501. """An interface for adding update or remove operations.
  502. """
  503. __slots__ = ('__selector', '__bulk', '__collation')
  504. def __init__(self, selector, bulk, collation):
  505. self.__selector = selector
  506. self.__bulk = bulk
  507. self.__collation = collation
  508. def update_one(self, update):
  509. """Update one document matching the selector criteria.
  510. :Parameters:
  511. - `update` (dict): the update operations to apply
  512. """
  513. self.__bulk.add_update(self.__selector, update, multi=False,
  514. collation=self.__collation)
  515. def update(self, update):
  516. """Update all documents matching the selector criteria.
  517. :Parameters:
  518. - `update` (dict): the update operations to apply
  519. """
  520. self.__bulk.add_update(self.__selector, update, multi=True,
  521. collation=self.__collation)
  522. def replace_one(self, replacement):
  523. """Replace one entire document matching the selector criteria.
  524. :Parameters:
  525. - `replacement` (dict): the replacement document
  526. """
  527. self.__bulk.add_replace(self.__selector, replacement,
  528. collation=self.__collation)
  529. def remove_one(self):
  530. """Remove a single document matching the selector criteria.
  531. """
  532. self.__bulk.add_delete(self.__selector, _DELETE_ONE,
  533. collation=self.__collation)
  534. def remove(self):
  535. """Remove all documents matching the selector criteria.
  536. """
  537. self.__bulk.add_delete(self.__selector, _DELETE_ALL,
  538. collation=self.__collation)
  539. def upsert(self):
  540. """Specify that all chained update operations should be
  541. upserts.
  542. :Returns:
  543. - A :class:`BulkUpsertOperation` instance, used to add
  544. update operations to this bulk operation.
  545. """
  546. return BulkUpsertOperation(self.__selector, self.__bulk,
  547. self.__collation)
  548. class BulkOperationBuilder(object):
  549. """**DEPRECATED**: An interface for executing a batch of write operations.
  550. """
  551. __slots__ = '__bulk'
  552. def __init__(self, collection, ordered=True,
  553. bypass_document_validation=False):
  554. """**DEPRECATED**: Initialize a new BulkOperationBuilder instance.
  555. :Parameters:
  556. - `collection`: A :class:`~pymongo.collection.Collection` instance.
  557. - `ordered` (optional): If ``True`` all operations will be executed
  558. serially, in the order provided, and the entire execution will
  559. abort on the first error. If ``False`` operations will be executed
  560. in arbitrary order (possibly in parallel on the server), reporting
  561. any errors that occurred after attempting all operations. Defaults
  562. to ``True``.
  563. - `bypass_document_validation`: (optional) If ``True``, allows the
  564. write to opt-out of document level validation. Default is
  565. ``False``.
  566. .. note:: `bypass_document_validation` requires server version
  567. **>= 3.2**
  568. .. versionchanged:: 3.5
  569. Deprecated. Use :meth:`~pymongo.collection.Collection.bulk_write`
  570. instead.
  571. .. versionchanged:: 3.2
  572. Added bypass_document_validation support
  573. """
  574. self.__bulk = _Bulk(collection, ordered, bypass_document_validation)
  575. def find(self, selector, collation=None):
  576. """Specify selection criteria for bulk operations.
  577. :Parameters:
  578. - `selector` (dict): the selection criteria for update
  579. and remove operations.
  580. - `collation` (optional): An instance of
  581. :class:`~pymongo.collation.Collation`. This option is only
  582. supported on MongoDB 3.4 and above.
  583. :Returns:
  584. - A :class:`BulkWriteOperation` instance, used to add
  585. update and remove operations to this bulk operation.
  586. .. versionchanged:: 3.4
  587. Added the `collation` option.
  588. """
  589. validate_is_mapping("selector", selector)
  590. return BulkWriteOperation(selector, self.__bulk, collation)
  591. def insert(self, document):
  592. """Insert a single document.
  593. :Parameters:
  594. - `document` (dict): the document to insert
  595. .. seealso:: :ref:`writes-and-ids`
  596. """
  597. self.__bulk.add_insert(document)
  598. def execute(self, write_concern=None):
  599. """Execute all provided operations.
  600. :Parameters:
  601. - write_concern (optional): the write concern for this bulk
  602. execution.
  603. """
  604. if write_concern is not None:
  605. write_concern = WriteConcern(**write_concern)
  606. return self.__bulk.execute(write_concern, session=None)