upgrade_merge_user.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Merge duplicate users by group.
"""
import os
import sys

#: Two directory levels above the directory that contains this script
#: (the path is left un-normalized: "<parent of script dir>/..").
PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
# Put that directory first on sys.path before the imports below are attempted.
sys.path.insert(0, PROJECT_ROOT)

from script.base import init_env, setup_logger

# NOTE(review): init_env() is called before the remaining imports; presumably
# they depend on the environment it prepares -- confirm before reordering.
init_env()

import click
from pymongo.collection import Collection
from apilib.utils import Timer
from apps.web.user.models import MyUser
  17. def get_pipeline(agg_collection_name):
  18. group_stage = {
  19. "_id": "$managerialOpenId",
  20. "groups": {
  21. "$push":
  22. {
  23. "balance": "$balance",
  24. "groupId": "$groupId",
  25. "total_recharged": "$total_recharged",
  26. "total_consumed": "$total_consumed",
  27. }
  28. },
  29. "origin": {
  30. "$addToSet": "$_id"
  31. },
  32. #: take the latest one
  33. "sex": {"$first": "$sex"},
  34. "phoneOS": {"$first": "$phoneOS"},
  35. "last_login": {"$first": "$last_login"},
  36. "avatar": {"$first": "$avatar"},
  37. "nickname": {"$first": "$nickname"},
  38. "gateway": {"$first": "$gateway"},
  39. "dateTimeAdded": {"$first": "$dateTimeAdded"},
  40. "locations": {"$first": "$locations"},
  41. "agentId": {"$first": "$agentId"},
  42. "city": {"$first": "$city"},
  43. "province": {"$first": "$province"},
  44. "country": {"$first": "$country"},
  45. "extra": {"$first": "$extra"},
  46. #: To observe
  47. "authAppId": {"$addToSet": "$authAppId"},
  48. "managerialAppId": {"$addToSet": "$managerialAppId"},
  49. "managerialOpenId": {"$addToSet": "$managerialOpenId"},
  50. "payAppId": {"$addToSet": "$payAppId"},
  51. "payOpenId": {"$addToSet": "$payOpenId"}
  52. }
  53. project_stage = dict.fromkeys(group_stage.keys(), 1)
  54. project_stage['authAppId'] = {"$setDifference":["$authAppId", [""]]}
  55. project_stage['managerialAppId'] = {"$setDifference":["$managerialAppId", [""]]}
  56. project_stage['payAppId'] = {"$setDifference":["$payAppId", [""]]}
  57. project_stage['payOpenId'] = {"$setDifference":["$payOpenId", [""]]}
  58. pipeline = [
  59. {"$match": {"managerialOpenId": {"$ne": "", "$exists": 1}}},
  60. {"$sort": {"last_login": -1}},
  61. {"$group": group_stage},
  62. {"$project": project_stage},
  63. {"$out": agg_collection_name}
  64. ]
  65. return pipeline
  66. @click.command()
  67. @click.option('-ac', '--agg_collection_name', default='merged_myusers')
  68. def plain_agg(agg_collection_name):
  69. t = Timer() # type: Timer
  70. with t:
  71. collection = MyUser.get_collection() # type: Collection
  72. pipeline = get_pipeline(agg_collection_name)
  73. print('pipeline is: %s' % (pipeline,))
  74. result = collection.aggregate(pipeline, allowDiskUse=True)
  75. print(list(result))
  76. click.echo('time spent {} secs'.format(t.elapsed))
# Invoke the click command only when this file is executed as a script.
if __name__ == '__main__':
    plain_agg()