1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- from pymongo import InsertOne, UpdateOne, DeleteOne
- from pymongo.errors import BulkWriteError
- from typing import Dict
class BulkHandler(object):
    """Queue write operations on an unordered bulk builder and run them at once.

    The builder is obtained from ``collection.initialize_unordered_bulk_op()``
    (presumably a PyMongo collection exposing the legacy bulk API -- confirm
    against the caller).
    """

    def __init__(self, collection):
        """Keep ``collection`` and create its unordered bulk builder."""
        self.collection = collection
        self.bulk = collection.initialize_unordered_bulk_op()

    def insert(self, insert_dict):
        """Queue ``insert_dict`` for insertion."""
        self.bulk.insert(insert_dict)

    def update(self, query_dict, update_dict):
        """Queue ``update_dict`` against whatever ``query_dict`` selects."""
        selection = self.bulk.find(query_dict)
        selection.update(update_dict)

    def upsert(self, query_dict, update_dict):
        """Queue ``update_dict`` against ``query_dict`` with upsert enabled."""
        selection = self.bulk.find(query_dict)
        selection.upsert().update(update_dict)

    def delete(self, query_dict):
        """Queue a removal for whatever ``query_dict`` selects."""
        selection = self.bulk.find(query_dict)
        selection.remove()

    def execute(self):
        """Run the queued operations and report the outcome as a dict.

        Returns a dict with two keys:

        * ``'success'`` -- 1 when the batch ran or there was nothing queued,
          0 when any exception was raised.
        * ``'info'`` -- the value returned by the builder's ``execute()``,
          the string ``'no operation to execute'`` for an empty queue, or
          the caught exception object.
        """
        try:
            # NOTE: this peeks at a private attribute of the builder to see
            # whether anything has been queued before calling execute().
            pending = self.bulk._BulkOperationBuilder__bulk.ops
            if len(pending) == 0:
                outcome = 'no operation to execute'
            else:
                outcome = self.bulk.execute()
        except Exception as exc:
            # Failures are reported to the caller instead of being raised.
            return {'success': 0, 'info': exc}
        return {'success': 1, 'info': outcome}
class BulkHandlerEx(object):
    """Collect write requests in a list and submit them via ``bulk_write``.

    Requests are built from the ``InsertOne`` / ``UpdateOne`` / ``DeleteOne``
    classes imported at the top of the file and handed to
    ``collection.bulk_write`` when ``execute`` is called.
    """

    def __init__(self, collection):
        """Keep ``collection`` and start with an empty request queue."""
        self.collection = collection
        self.requests = []

    def insert(self, insert_dict):
        """Queue an ``InsertOne`` for ``insert_dict``."""
        request = InsertOne(insert_dict)
        self.requests.append(request)

    def update(self, query_dict, update_dict):
        """Queue an ``UpdateOne`` with upsert disabled."""
        request = UpdateOne(query_dict, update_dict, upsert=False)
        self.requests.append(request)

    def upsert(self, query_dict, update_dict):
        """Queue an ``UpdateOne`` with upsert enabled."""
        request = UpdateOne(query_dict, update_dict, upsert=True)
        self.requests.append(request)

    def delete(self, query_dict):
        """Queue a ``DeleteOne`` for ``query_dict``."""
        request = DeleteOne(query_dict)
        self.requests.append(request)

    def execute(self, ordered=False):
        """Submit the queued requests and report the outcome as a dict.

        ``ordered`` is forwarded to ``collection.bulk_write``.

        Returns a dict with two keys:

        * ``'success'`` -- 1 when ``bulk_write`` returned or raised
          ``BulkWriteError``, 2 when the queue was empty, 0 for any other
          exception.
        * ``'info'`` -- the result's ``bulk_api_result``, the error's
          ``details``, a placeholder dict with an empty ``'writeErrors'``
          list for the empty queue, or the caught exception object.

        The request queue is left untouched after the call.
        """
        try:
            if len(self.requests) == 0:
                # The placeholder carries a 'writeErrors' list, the same key
                # the rest of this file expects to find in bulk results.
                return {
                    'success': 2,
                    'info': {'writeErrors': list(), 'desc': 'no operation to execute'},
                }
            write_result = self.collection.bulk_write(self.requests, ordered=ordered)
            return {'success': 1, 'info': write_result.bulk_api_result}
        except BulkWriteError as exc:
            # Still reported with success == 1; the error details go in 'info'.
            return {'success': 1, 'info': exc.details}
        except Exception as exc:
            return {'success': 0, 'info': exc}
def format_dot_key(rule_dict, to_dot=False):
    # type: (Dict, bool)->Dict
    """Return a copy of ``rule_dict`` with ``.`` and ``-`` swapped in its keys.

    With ``to_dot`` false (the default) every ``.`` in a key becomes ``-``;
    with ``to_dot`` true every ``-`` becomes ``.``. Values are carried over
    unchanged and the input dict is not modified. If two keys map to the same
    converted key, the one encountered later wins.
    """
    # Choose the replacement direction once, then rebuild the mapping.
    old_char, new_char = ('-', '.') if to_dot else ('.', '-')
    return {key.replace(old_char, new_char): value
            for key, value in rule_dict.items()}
|