#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Merge duplicated users into groups.
"""
import os
import sys

#: PROJECT_ROOT is two directory levels above the directory containing this script.
PROJECT_ROOT = os.path.join(os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/.."), '..')
#: Prepended to sys.path before the project imports below; presumably this is
#: what lets ``script``, ``apilib`` and ``apps`` resolve when run as a script.
sys.path.insert(0, PROJECT_ROOT)

from script.base import init_env, setup_logger

#: Called before the remaining imports, as in the original ordering
#: (presumably the application modules need the environment initialised first).
init_env()

import click
from pymongo.collection import Collection

from apilib.utils import Timer
from apps.web.user.models import MyUser
def get_pipeline(agg_collection_name):
    """Build the aggregation pipeline that merges user records by ``managerialOpenId``.

    Stages, in order:

    1. ``$match``   - keep records whose ``managerialOpenId`` is present and is
       neither the empty string nor null.
    2. ``$sort``    - order by ``last_login`` descending, so ``$first`` in the
       next stage picks the value from the most recently logged-in record.
    3. ``$group``   - one output document per ``managerialOpenId``.
    4. ``$project`` - keep every grouped field, stripping ``""`` from the
       collected app-id / open-id sets.
    5. ``$out``     - write the result to ``agg_collection_name``.

    :param agg_collection_name: name of the collection the ``$out`` stage writes to.
    :return: the pipeline as a list of stage dicts.
    """
    #: values kept per original record (pushed into the ``groups`` array)
    per_record_fields = ("balance", "groupId", "total_recharged", "total_consumed")
    #: take the latest one (first after the descending ``last_login`` sort)
    latest_fields = (
        "sex", "phoneOS", "last_login", "avatar", "nickname", "gateway",
        "dateTimeAdded", "locations", "agentId", "city", "province",
        "country", "extra",
    )
    #: To observe: every distinct value seen within the group is collected
    observed_fields = (
        "authAppId", "managerialAppId", "managerialOpenId", "payAppId", "payOpenId",
    )
    #: collected sets from which the empty string is removed in ``$project``
    #: (``managerialOpenId`` cannot contain "" because ``$match`` excludes it)
    blank_stripped_fields = ("authAppId", "managerialAppId", "payAppId", "payOpenId")

    group_stage = {
        "_id": "$managerialOpenId",
        "groups": {"$push": dict((name, "$" + name) for name in per_record_fields)},
        #: ids of the original records merged into this group
        "origin": {"$addToSet": "$_id"},
    }
    for name in latest_fields:
        group_stage[name] = {"$first": "$" + name}
    for name in observed_fields:
        group_stage[name] = {"$addToSet": "$" + name}

    project_stage = dict.fromkeys(group_stage, 1)
    for name in blank_stripped_fields:
        project_stage[name] = {"$setDifference": ["$" + name, [""]]}

    return [
        #: ``$exists`` alone also matches null-valued fields, which would merge
        #: every such user into a single ``_id: null`` group; ``$nin`` rules
        #: out both "" and null.
        {"$match": {"managerialOpenId": {"$nin": ["", None], "$exists": 1}}},
        {"$sort": {"last_login": -1}},
        {"$group": group_stage},
        {"$project": project_stage},
        {"$out": agg_collection_name},
    ]
@click.command()
@click.option('-ac', '--agg_collection_name', default='merged_myusers')
def plain_agg(agg_collection_name):
    """Run the merge aggregation on the ``MyUser`` collection and report the time taken.

    :param agg_collection_name: collection name passed to ``get_pipeline`` as the
        pipeline's ``$out`` target (CLI option ``-ac`` / ``--agg_collection_name``).
    """
    timer = Timer()  # type: Timer
    with timer:
        user_collection = MyUser.get_collection()  # type: Collection
        stages = get_pipeline(agg_collection_name)
        print('pipeline is: %s' % (stages,))
        cursor = user_collection.aggregate(stages, allowDiskUse=True)
        #: drain the cursor and print whatever the aggregation returned
        print(list(cursor))
    #: NOTE(review): reported after the timed block has exited; the original
    #: indentation was lost, so confirm this placement.
    click.echo('time spent {} secs'.format(timer.elapsed))
#: Script entry point: invoke the click command when executed directly.
if '__main__' == __name__:
    plain_agg()