utils_mongo.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. from pymongo import InsertOne, UpdateOne, DeleteOne
  4. from pymongo.errors import BulkWriteError
  5. from typing import Dict
  6. class BulkHandler(object):
  7. def __init__(self, collection):
  8. self.collection = collection
  9. self.bulk = collection.initialize_unordered_bulk_op()
  10. def insert(self, insert_dict):
  11. self.bulk.insert(insert_dict)
  12. def update(self, query_dict, update_dict):
  13. self.bulk.find(query_dict).update(update_dict)
  14. def upsert(self, query_dict, update_dict):
  15. self.bulk.find(query_dict).upsert().update(update_dict)
  16. def delete(self, query_dict):
  17. self.bulk.find(query_dict).remove()
  18. def execute(self):
  19. result = {'success': 0, 'info': 0}
  20. try:
  21. if len(self.bulk._BulkOperationBuilder__bulk.ops) != 0:
  22. result['info'] = self.bulk.execute()
  23. result['success'] = 1
  24. else:
  25. result['info'] = 'no operation to execute'
  26. result['success'] = 1
  27. except Exception as e:
  28. result['info'] = e
  29. result['success'] = 0
  30. return result
  31. class BulkHandlerEx(object):
  32. def __init__(self, collection):
  33. self.collection = collection
  34. self.requests = []
  35. def insert(self, insert_dict):
  36. self.requests.append(InsertOne(insert_dict))
  37. def update(self, query_dict, update_dict):
  38. self.requests.append(UpdateOne(query_dict, update_dict, upsert = False))
  39. def upsert(self, query_dict, update_dict):
  40. self.requests.append(UpdateOne(query_dict, update_dict, upsert = True))
  41. def delete(self, query_dict):
  42. self.requests.append(DeleteOne(query_dict))
  43. def execute(self, ordered = False):
  44. result = {'success': 0, 'info': 0}
  45. try:
  46. if len(self.requests) != 0:
  47. reuslt = self.collection.bulk_write(self.requests, ordered = ordered)
  48. result['info'] = reuslt.bulk_api_result
  49. result['success'] = 1
  50. else:
  51. result['info'] = {'writeErrors': list(), 'desc': 'no operation to execute'}
  52. result['success'] = 2
  53. except BulkWriteError as e:
  54. result['info'] = e.details
  55. result['success'] = 1
  56. except Exception as e:
  57. result['info'] = e
  58. result['success'] = 0
  59. return result
  60. def format_dot_key(rule_dict, to_dot = False):
  61. # type: (Dict, bool)->Dict
  62. rv = {}
  63. for k, v in rule_dict.items():
  64. if to_dot:
  65. rv[k.replace('-', '.')] = v
  66. else:
  67. rv[k.replace('.', '-')] = v
  68. return rv