| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 | 
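#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Merge duplicate users: group MyUser documents that share the same
managerialOpenId and write one merged record per group.
"""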
 
- import os
 
- import sys
 
#: Project root: two directory levels above the directory holding this script.
#: Built with os.path.join and normalised with abspath so sys.path gets a
#: clean absolute path instead of one ending in a literal "..".
PROJECT_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', '..'))

# Make project packages (script.*, apilib.*, apps.*) importable; this must
# run before any of the project imports below.
sys.path.insert(0, PROJECT_ROOT)

from script.base import init_env, setup_logger

# NOTE(review): presumably initialises the application environment that the
# later project imports (e.g. apps.web.user.models) depend on -- keep this
# call before those imports. Confirm against script.base.
init_env()
 
- import click
 
- from pymongo.collection import Collection
 
- from apilib.utils import Timer
 
- from apps.web.user.models import MyUser
 
def get_pipeline(agg_collection_name):
    """Build the aggregation pipeline that merges duplicate MyUser documents.

    Documents are grouped by ``managerialOpenId``: every distinct, non-empty,
    non-null value yields one merged document. The result is written to
    ``agg_collection_name`` through a ``$out`` stage (MongoDB's ``$out``
    replaces an existing collection of that name).

    :param agg_collection_name: name of the output collection for ``$out``.
    :return: list of pipeline stage dicts, ready for ``Collection.aggregate``.
    """
    group_stage = {
        "_id": "$managerialOpenId",
        # Per-group account figures of every merged user.
        "groups": {
            "$push": {
                "balance": "$balance",
                "groupId": "$groupId",
                "total_recharged": "$total_recharged",
                "total_consumed": "$total_consumed",
            }
        },
        # _ids of the original documents folded into this merged record.
        "origin": {"$addToSet": "$_id"},

        #: Profile fields: because of the descending last_login $sort below,
        #: $first takes the value from the most recently logged-in user.
        "sex": {"$first": "$sex"},
        "phoneOS": {"$first": "$phoneOS"},
        "last_login": {"$first": "$last_login"},
        "avatar": {"$first": "$avatar"},
        "nickname": {"$first": "$nickname"},
        "gateway": {"$first": "$gateway"},
        "dateTimeAdded": {"$first": "$dateTimeAdded"},
        "locations": {"$first": "$locations"},
        "agentId": {"$first": "$agentId"},
        "city": {"$first": "$city"},
        "province": {"$first": "$province"},
        "country": {"$first": "$country"},
        "extra": {"$first": "$extra"},

        #: Identifiers kept as sets so that differing values across the
        #: merged users can be observed.
        "authAppId": {"$addToSet": "$authAppId"},
        "managerialAppId": {"$addToSet": "$managerialAppId"},
        "managerialOpenId": {"$addToSet": "$managerialOpenId"},
        "payAppId": {"$addToSet": "$payAppId"},
        "payOpenId": {"$addToSet": "$payOpenId"},
    }

    # Keep every grouped field as-is ...
    project_stage = dict.fromkeys(group_stage, 1)
    # ... except the identifier sets, from which blank ("") and null entries
    # are removed. managerialOpenId needs no cleanup: $match already
    # excludes blank/null values for it.
    for field in ("authAppId", "managerialAppId", "payAppId", "payOpenId"):
        project_stage[field] = {"$setDifference": ["$" + field, ["", None]]}

    pipeline = [
        # $nin with None excludes documents whose managerialOpenId is missing
        # or null as well as "". A plain {"$exists": 1} lets null through,
        # which would lump every such user into one _id: null group.
        {"$match": {"managerialOpenId": {"$nin": ["", None]}}},
        # Newest login first, so $first in the $group picks the latest values.
        {"$sort": {"last_login": -1}},
        {"$group": group_stage},
        {"$project": project_stage},
        {"$out": agg_collection_name},
    ]
    return pipeline
 
@click.command()
@click.option('-ac', '--agg_collection_name', default='merged_myusers')
def plain_agg(agg_collection_name):
    """Run the user-merge aggregation over the MyUser collection.

    The pipeline comes from get_pipeline(); its final $out stage writes the
    merged documents to ``agg_collection_name``. The pipeline, the cursor
    contents and the elapsed wall time are echoed to stdout.
    """
    stopwatch = Timer()  # type: Timer
    with stopwatch:
        stages = get_pipeline(agg_collection_name)
        users = MyUser.get_collection()  # type: Collection
        print('pipeline is: %s' % (stages,))
        # allowDiskUse lets the $sort/$group stages spill to disk on large data.
        cursor = users.aggregate(stages, allowDiskUse=True)
        print(list(cursor))
    click.echo('time spent {} secs'.format(stopwatch.elapsed))
 
if __name__ == '__main__':
    # plain_agg is a click command: click parses the CLI options
    # (-ac / --agg_collection_name) from sys.argv before invoking it.
    plain_agg()
 
 
  |