utils_mongo.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. from pymongo import InsertOne, UpdateOne, DeleteOne
  4. from pymongo.errors import BulkWriteError
  5. from typing import Dict
  6. class BulkHandler(object):
  7. def __init__(self, collection):
  8. self.collection = collection
  9. self.bulk = collection.initialize_unordered_bulk_op()
  10. def insert(self, insert_dict):
  11. self.bulk.insert(insert_dict)
  12. def update(self, query_dict, update_dict):
  13. self.bulk.find(query_dict).update(update_dict)
  14. def upsert(self, query_dict, update_dict):
  15. self.bulk.find(query_dict).upsert().update(update_dict)
  16. def delete(self, query_dict):
  17. self.bulk.find(query_dict).remove()
  18. def execute(self):
  19. result = {'success': 0, 'info': 0}
  20. try:
  21. if len(self.bulk._BulkOperationBuilder__bulk.ops) != 0:
  22. result['info'] = self.bulk.execute()
  23. result['success'] = 1
  24. else:
  25. result['info'] = 'no operation to execute'
  26. result['success'] = 1
  27. except Exception as e:
  28. result['info'] = e
  29. result['success'] = 0
  30. return result
  31. class BulkHandlerEx(object):
  32. def __init__(self, collection):
  33. self.collection = collection
  34. self.requests = []
  35. def insert(self, insert_dict):
  36. self.requests.append(InsertOne(insert_dict))
  37. def update(self, query_dict, update_dict):
  38. self.requests.append(UpdateOne(query_dict, update_dict, upsert = False))
  39. def upsert(self, query_dict, update_dict):
  40. self.requests.append(UpdateOne(query_dict, update_dict, upsert = True))
  41. def delete(self, query_dict):
  42. self.requests.append(DeleteOne(query_dict))
  43. def execute(self, ordered = False):
  44. result = {'success': 0, 'info': 0}
  45. try:
  46. if len(self.requests) != 0:
  47. reuslt = self.collection.bulk_write(self.requests, ordered = ordered)
  48. result['info'] = reuslt.bulk_api_result
  49. result['success'] = 1
  50. else:
  51. result['info'] = {'writeErrors': list(), 'desc': 'no operation to execute'}
  52. result['success'] = 2
  53. except BulkWriteError as e:
  54. result['info'] = e.details
  55. result['success'] = 1
  56. except Exception as e:
  57. result['info'] = e
  58. result['success'] = 0
  59. return result
  60. def format_dot_key(rule_dict, to_dot = False):
  61. # type: (Dict, bool)->Dict
  62. rv = {}
  63. for k, v in rule_dict.items():
  64. if to_dot:
  65. rv[k.replace('-', '.')] = v
  66. else:
  67. rv[k.replace('.', '-')] = v
  68. return rv
  69. def dict_field_with_money(mydict):
  70. for key, value in mydict.iteritems():
  71. if hasattr(value, 'mongo_amount'):
  72. mydict[key] = value.mongo_amount
  73. else:
  74. mydict[key] = value
  75. return mydict