#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Merge duplicated users, grouping them together.
"""

import os
import sys

#: Two directory levels above the directory that contains this file.
PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
# The project root must be on sys.path before any project import below.
sys.path.insert(0, PROJECT_ROOT)

from script.base import init_env, setup_logger

# The remaining imports are deliberately placed after init_env().
init_env()

import click
from pymongo.collection import Collection

from apilib.utils import Timer
from apps.web.user.models import MyUser


def get_pipeline(agg_collection_name):
    """Build the aggregation pipeline that merges users by ``managerialOpenId``.

    The pipeline stages are, in order:

    1. ``$match``   - keep documents whose ``managerialOpenId`` exists and is
       not the empty string.
    2. ``$sort``    - order by ``last_login`` descending.
    3. ``$group``   - one output document per ``managerialOpenId``.
    4. ``$project`` - keep every grouped field and strip ``""`` from the
       app-id / open-id sets.
    5. ``$out``     - write the result into ``agg_collection_name``.

    :param agg_collection_name: name of the collection given to ``$out``.
    :return: the pipeline as a list of stage dicts.
    """
    group_stage = {
        "_id": "$managerialOpenId",
        # Per-source-document balance figures, one entry per merged document.
        "groups": {
            "$push": {
                "balance": "$balance",
                "groupId": "$groupId",
                "total_recharged": "$total_recharged",
                "total_consumed": "$total_consumed",
            }
        },
        # The _id values of the documents that were merged into this group.
        "origin": {
            "$addToSet": "$_id"
        },

        #: Take the latest one: the preceding $sort orders by last_login
        #: descending, so $first picks the value from the most recent login.
        "sex": {"$first": "$sex"},
        "phoneOS": {"$first": "$phoneOS"},
        "last_login": {"$first": "$last_login"},
        "avatar": {"$first": "$avatar"},
        "nickname": {"$first": "$nickname"},
        "gateway": {"$first": "$gateway"},
        "dateTimeAdded": {"$first": "$dateTimeAdded"},
        "locations": {"$first": "$locations"},
        "agentId": {"$first": "$agentId"},
        "city": {"$first": "$city"},
        "province": {"$first": "$province"},
        "country": {"$first": "$country"},
        "extra": {"$first": "$extra"},

        #: To observe: every distinct value seen in the group is collected.
        "authAppId": {"$addToSet": "$authAppId"},
        "managerialAppId": {"$addToSet": "$managerialAppId"},
        "managerialOpenId": {"$addToSet": "$managerialOpenId"},
        "payAppId": {"$addToSet": "$payAppId"},
        "payOpenId": {"$addToSet": "$payOpenId"},
    }

    # Keep every grouped field as-is by default ...
    project_stage = dict.fromkeys(group_stage, 1)
    # ... but drop the empty string from these sets. managerialOpenId is not
    # listed because the $match stage already excludes "" for that field.
    for field in ("authAppId", "managerialAppId", "payAppId", "payOpenId"):
        project_stage[field] = {"$setDifference": ["$" + field, [""]]}

    return [
        {"$match": {"managerialOpenId": {"$ne": "", "$exists": 1}}},
        {"$sort": {"last_login": -1}},
        {"$group": group_stage},
        {"$project": project_stage},
        {"$out": agg_collection_name},
    ]


@click.command()
@click.option('-ac', '--agg_collection_name', default='merged_myusers')
def plain_agg(agg_collection_name):
    """Run the user-merge aggregation and write it to AGG_COLLECTION_NAME."""
    t = Timer()  # type: Timer
    with t:
        collection = MyUser.get_collection()  # type: Collection
        pipeline = get_pipeline(agg_collection_name)
        print('pipeline is: %s' % (pipeline,))
        # allowDiskUse lets the server spill the sort/group stages to disk.
        result = collection.aggregate(pipeline, allowDiskUse=True)
        # Print whatever the cursor yields; a pipeline ending in $out
        # normally yields no documents.
        print(list(result))
    # NOTE(review): assumes Timer.elapsed is meant to be read after the
    # ``with`` block has exited - confirm against apilib.utils.Timer.
    click.echo('time spent {} secs'.format(t.elapsed))


if __name__ == '__main__':
    plain_agg()