123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- from pymongo import InsertOne, UpdateOne, DeleteOne
- from pymongo.errors import BulkWriteError
- from typing import Dict
- class BulkHandler(object):
- def __init__(self, collection):
- self.collection = collection
- self.bulk = collection.initialize_unordered_bulk_op()
- def insert(self, insert_dict):
- self.bulk.insert(insert_dict)
- def update(self, query_dict, update_dict):
- self.bulk.find(query_dict).update(update_dict)
- def upsert(self, query_dict, update_dict):
- self.bulk.find(query_dict).upsert().update(update_dict)
- def delete(self, query_dict):
- self.bulk.find(query_dict).remove()
- def execute(self):
- result = {'success': 0, 'info': 0}
- try:
- if len(self.bulk._BulkOperationBuilder__bulk.ops) != 0:
- result['info'] = self.bulk.execute()
- result['success'] = 1
- else:
- result['info'] = 'no operation to execute'
- result['success'] = 1
- except Exception as e:
- result['info'] = e
- result['success'] = 0
- return result
- class BulkHandlerEx(object):
- def __init__(self, collection):
- self.collection = collection
- self.requests = []
- def insert(self, insert_dict):
- self.requests.append(InsertOne(insert_dict))
- def update(self, query_dict, update_dict):
- self.requests.append(UpdateOne(query_dict, update_dict, upsert = False))
- def upsert(self, query_dict, update_dict):
- self.requests.append(UpdateOne(query_dict, update_dict, upsert = True))
- def delete(self, query_dict):
- self.requests.append(DeleteOne(query_dict))
- def execute(self, ordered = False):
- result = {'success': 0, 'info': 0}
- try:
- if len(self.requests) != 0:
- reuslt = self.collection.bulk_write(self.requests, ordered = ordered)
- result['info'] = reuslt.bulk_api_result
- result['success'] = 1
- else:
- result['info'] = {'writeErrors': list(), 'desc': 'no operation to execute'}
- result['success'] = 2
- except BulkWriteError as e:
- result['info'] = e.details
- result['success'] = 1
- except Exception as e:
- result['info'] = e
- result['success'] = 0
- return result
- def format_dot_key(rule_dict, to_dot = False):
- # type: (Dict, bool)->Dict
- rv = {}
- for k, v in rule_dict.items():
- if to_dot:
- rv[k.replace('-', '.')] = v
- else:
- rv[k.replace('.', '-')] = v
- return rv
- def dict_field_with_money(mydict):
- for key, value in mydict.iteritems():
- if hasattr(value, 'mongo_amount'):
- mydict[key] = value.mongo_amount
- else:
- mydict[key] = value
- return mydict
|