# -*- coding: utf-8 -*- # !/usr/bin/env python """ Goals: 1. 从收益来源模型里生成经销商的收益代理模型 2. 加载缓存 """ import datetime import itertools import logging import os import sys from collections import defaultdict from functools import partial from mongoengine import Q PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..') sys.path.insert(0, PROJECT_ROOT) from script.base import init_env init_env(interactive = True) import daiquiri import click from pymongo.errors import AutoReconnect from bson.objectid import ObjectId from apps.web.constant import Const, USER_RECHARGE_TYPE from apps.web.user.models import ConsumeRecord, RechargeRecord, CardRechargeRecord from apps.web.dealer.models import Dealer from apps.web.dealer.proxy import (DealerIncomeProxy, generate_recharge_title, record_income_proxy) from apps.web.report.models import GroupDailyStat, GroupMonthlyStat from apps.web.report.utils import record_income_stats, record_consumption_stats, logger from apps.web.device.models import Group from apilib.utils_mongo import BulkHandler from apps.web.report.ledger import Ledger flatten = itertools.chain.from_iterable daiquiri.setup( level = logging.INFO, outputs = ( daiquiri.output.Stream(sys.stdout, level = logging.INFO), daiquiri.output.TimedRotatingFile( 'upgrade_dealer_datacenter-everything.log', level = logging.DEBUG, interval = datetime.timedelta(days = 1)) ) ) logger = logging.getLogger(__name__) @click.group() def cli(): """ 更新经销商数据中心 :return: """ logger.info('cli called') def generate_bulk(collection): return collection.initialize_unordered_bulk_op() # @retry(retry=retry_if_exception_type(AutoReconnect), stop=stop_after_attempt(7)) def execute_bulk(bulk): result = {'success': 0, 'info': 0} try: if len(bulk._BulkOperationBuilder__bulk.ops) != 0: result['info'] = bulk.execute() result['success'] = 1 else: result['info'] = 'no operation to execute' result['success'] = 1 except AutoReconnect as e: logger.exception(e) raise e except Exception as e: logger.exception(e) result['info'] = e return result def __upgrade_record(clazz, chunk_size): items = [] count = 0 for record in clazz.get_collection().find(spec = {}, fields = {'_id': 1, 'time': 1, 'dateTimeAdded': 1}, timeout = False): count += 1 if ('time' not in record or not record['time']) and ( 'dateTimeAdded' not in record or not record['dateTimeAdded']): logger.info('%s; no time and dateTimeAdded; %s' % (count, record['_id'])) items.append({ '_id': record['_id'], 'dateTimeAdded': datetime.datetime.now() }) continue if 'time' not in record or not record['time']: logger.info('%s; no time; %s' % (count, record['_id'])) continue if 'dateTimeAdded' not in record or not record['dateTimeAdded']: logger.info('%s; no dateTimeAdded; %s' % (count, record['_id'])) items.append({ '_id': record['_id'], 'dateTimeAdded': datetime.datetime.strptime(record['time'], "%Y-%m-%d %H:%M:%S") }) continue if record['dateTimeAdded'].strftime('%Y-%m-%d %H:%M:%S') == record['time']: logger.info('%s; is equal; %s' % (count, record['_id'])) continue else: logger.info('%s; is not equal; %s' % (count, record['_id'])) items.append({ '_id': record['_id'], 'dateTimeAdded': datetime.datetime.strptime(record['time'], "%Y-%m-%d %H:%M:%S") }) bulker = BulkHandler(clazz.get_collection()) for item in items: auto_id = item.pop('_id') bulker.update(query_dict = {'_id': auto_id}, update_dict = {'$set': item}) if len(bulker.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size: result = bulker.execute() assert bool(result['success']), 'error happened' bulker = BulkHandler(clazz.get_collection()) if len(bulker.bulk._BulkOperationBuilder__bulk.ops) > 0: result = bulker.execute() assert bool(result['success']), 'error happened' bulker = BulkHandler(clazz.get_collection()) logger.info('hello') @cli.command() @click.option('-s', '--chunk_size', help = u'bulk chunk size', type = int, default = 2500) def upgrade_consume_record(chunk_size): __upgrade_record(ConsumeRecord, chunk_size) @cli.command() @click.option('-s', '--chunk_size', help = u'bulk chunk size', type = int, default = 2500) def upgrade_recharge_record(chunk_size): __upgrade_record(RechargeRecord, chunk_size) @cli.command() def update_card_recharge_record(): """add dealerId to every CardRechargeRecord""" CardRechargeRecordCollection = CardRechargeRecord.get_collection() card_recharge_record_bulk = BulkHandler(CardRechargeRecordCollection) count = 0 for record in CardRechargeRecordCollection.find({}): ownerId = ObjectId(Group.get_group(groupId = record['groupId'])['ownerId']) card_recharge_record_bulk.update({}, {'$set': {'ownerId': ownerId}}) count += 1 logger.info('updating CardRechargeRecord(id=%s) ownerId -> %s count=%d' % (record['_id'], ownerId, count)) if len(card_recharge_record_bulk.bulk._BulkOperationBuilder__bulk.ops) > 2000: card_recharge_record_bulk.execute() card_recharge_record_bulk = BulkHandler(DealerIncomeProxy.get_collection()) if len(card_recharge_record_bulk.bulk._BulkOperationBuilder__bulk.ops) > 0: card_recharge_record_bulk.execute() assert CardRechargeRecordCollection.find({'ownerId': {'$exists': True}}).count() \ == CardRechargeRecordCollection.find({}).count(), u'not all update_CardRechargeRecord updated' @cli.command() @click.option('-b', '--begin', help = u'begin', type = str, default = '') @click.option('-e', '--end', help = u'end', type = str, default = '') @click.option('-l', '--limit', help = u'limit size', type = int, default = 10000) @click.option('-k', '--skip', help = u'skip count', type = int, default = 0) @click.option('-c', '--check', help = u'check', type = str, default = 'no') def build_income_stats(begin, end, limit, skip, check): logger.info('begin = %s; end = %s; limit = %d; skip = %s; check = %s' % (begin, end, limit, skip, check)) _filter = None if begin: _filter = Q(__raw__ = {'dateTimeAdded': {'$gte': datetime.datetime.strptime(begin, "%Y-%m-%d %H:%M:%S")}}) if end: if _filter: _filter &= Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}}) else: _filter = Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}}) if not _filter: _filter = Q(__raw__ = {}) check_done = False if check == 'yes': check_done = True while True: logger.info('skip = %s' % (skip,)) proxies = [] for proxy in DealerIncomeProxy.get_collection().find(spec = _filter.to_query(DealerIncomeProxy), timeout = False).sort('_id', 1).limit( limit).skip(skip): proxies.append(proxy) if len(proxies) == 0: break for proxy in proxies: auto_id = proxy.pop('_id') proxy.update({'id': ObjectId(auto_id)}) proxy = DealerIncomeProxy(**proxy) record_income_stats(proxy, check = check_done, allowed = {'group': True}) skip = skip + limit @cli.command() @click.option('-b', '--begin', help = u'begin', type = str, default = '') @click.option('-e', '--end', help = u'end', type = str, default = '') @click.option('-l', '--limit', help = u'limit size', type = int, default = 10000) @click.option('-k', '--skip', help = u'skip count', type = int, default = 0) @click.option('-s', '--chunk_size', help = u'chunk size', type = int, default = 5000) def bulk_build_income_stats(begin, end, limit, skip, chunk_size): logger.info('begin = %s; end = %s; limit = %d; skip = %s; chunk size = %s' % (begin, end, limit, skip, chunk_size)) _filter = None if begin: _filter = Q(__raw__ = {'dateTimeAdded': {'$gte': datetime.datetime.strptime(begin, "%Y-%m-%d %H:%M:%S")}}) if end: if _filter: _filter &= Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}}) else: _filter = Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}}) if not _filter: _filter = Q(__raw__ = {}) while True: logger.info('skip = %s' % (skip,)) bulkerGroupDailyStat = BulkHandler(GroupDailyStat.get_collection()) bulkerGroupMonthlyStat = BulkHandler(GroupMonthlyStat.get_collection()) proxies = [] for proxy in DealerIncomeProxy.get_collection().find(spec = _filter.to_query(DealerIncomeProxy), timeout = False).sort('_id', 1).limit( limit).skip(skip): proxies.append(proxy) if len(proxies) == 0: break for proxy in proxies: auto_id = proxy.pop('_id') proxy.update({'id': ObjectId(auto_id)}) proxy = DealerIncomeProxy(**proxy) get_income_stats(proxy, bulkerGroupDailyStat, bulkerGroupMonthlyStat) if len(bulkerGroupDailyStat.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size: result = bulkerGroupDailyStat.execute() assert bool(result['success']), 'error happened' bulkerGroupDailyStat = BulkHandler(GroupDailyStat.get_collection()) if len(bulkerGroupMonthlyStat.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size: result = bulkerGroupMonthlyStat.execute() assert bool(result['success']), 'error happened' bulkerGroupMonthlyStat = BulkHandler(GroupMonthlyStat.get_collection()) if len(bulkerGroupDailyStat.bulk._BulkOperationBuilder__bulk.ops) > 0: result = bulkerGroupDailyStat.execute() assert bool(result['success']), 'error happened' bulkerGroupDailyStat = BulkHandler(GroupDailyStat.get_collection()) if len(bulkerGroupMonthlyStat.bulk._BulkOperationBuilder__bulk.ops) > 0: result = bulkerGroupMonthlyStat.execute() assert bool(result['success']), 'error happened' bulkerGroupMonthlyStat = BulkHandler(GroupMonthlyStat.get_collection()) skip = skip + limit @cli.command() @click.option('-b', '--begin', help = u'begin', type = str, default = '') @click.option('-e', '--end', help = u'end', type = str, default = '') @click.option('-l', '--limit', help = u'limit size', type = int, default = 10000) @click.option('-k', '--skip', help = u'skip count', type = int, default = 0) @click.option('-c', '--check', help = u'check', type = str, default = 'no') def build_consume_stats(begin, end, limit, skip, check): logger.info('begin = %s; end = %s; limit = %d; skip = %s; check = %s' % (begin, end, limit, skip, check)) _filter = None if begin: _filter = Q(__raw__ = {'dateTimeAdded': {'$gte': datetime.datetime.strptime(begin, "%Y-%m-%d %H:%M:%S")}}) if end: if _filter: _filter &= Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}}) else: _filter = Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}}) if not _filter: _filter = Q(__raw__ = {}) check_done = False if check == 'yes': check_done = True while True: logger.info('skip = %s' % skip) records = [] for record in ConsumeRecord.get_collection().find(spec = _filter.to_query(ConsumeRecord), timeout = False).sort('_id', 1).limit(limit).skip(skip): records.append(record) if len(records) == 0: break for record in records: if 'ownerId' not in record or not record.get('ownerId'): logger.debug('not find id or ownerId. %s' % record['_id']) continue auto_id = record.pop('_id') record.update({'id': ObjectId(auto_id)}) record_consumption_stats(record = ConsumeRecord(**record), check = check_done, allowed = {'group': True}) skip = skip + limit @cli.command() @click.option('-b', '--begin', help = u'begin', type = str, default = '') @click.option('-e', '--end', help = u'end', type = str, default = '') @click.option('-l', '--limit', help = u'limit size', type = int, default = 10000) @click.option('-k', '--skip', help = u'skip count', type = int, default = 0) @click.option('-s', '--chunk_size', help = u'chunk size', type = int, default = 5000) def bulker_build_consume_stats(begin, end, limit, skip, chunk_size): logger.info('begin = %s; end = %s; limit = %d; skip = %s; chunk size = %s' % (begin, end, limit, skip, chunk_size)) _filter = None if begin: _filter = Q(__raw__ = {'dateTimeAdded': {'$gte': datetime.datetime.strptime(begin, "%Y-%m-%d %H:%M:%S")}}) if end: if _filter: _filter &= Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}}) else: _filter = Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}}) if not _filter: _filter = Q(__raw__ = {}) while True: logger.info('skip = %s' % skip) bulkerGroupDailyStat = BulkHandler(GroupDailyStat.get_collection()) bulkerGroupMonthlyStat = BulkHandler(GroupMonthlyStat.get_collection()) records = [] for record in ConsumeRecord.get_collection().find(spec = _filter.to_query(ConsumeRecord), timeout = False).sort('_id', 1).limit(limit).skip(skip): records.append(record) if len(records) == 0: break for record in records: if 'ownerId' not in record or not record.get('ownerId'): logger.debug('not find id or ownerId. %s' % record['_id']) continue auto_id = record.pop('_id') record.update({'id': ObjectId(auto_id)}) get_consumption_stats(ConsumeRecord(**record), bulkerGroupDailyStat, bulkerGroupMonthlyStat) if len(bulkerGroupDailyStat.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size: result = bulkerGroupDailyStat.execute() assert bool(result['success']), 'error happened' bulkerGroupDailyStat = BulkHandler(GroupDailyStat.get_collection()) if len(bulkerGroupMonthlyStat.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size: result = bulkerGroupMonthlyStat.execute() assert bool(result['success']), 'error happened' bulkerGroupMonthlyStat = BulkHandler(GroupMonthlyStat.get_collection()) if len(bulkerGroupDailyStat.bulk._BulkOperationBuilder__bulk.ops) > 0: result = bulkerGroupDailyStat.execute() assert bool(result['success']), 'error happened' bulkerGroupDailyStat = BulkHandler(GroupDailyStat.get_collection()) if len(bulkerGroupMonthlyStat.bulk._BulkOperationBuilder__bulk.ops) > 0: result = bulkerGroupMonthlyStat.execute() assert bool(result['success']), 'error happened' bulkerGroupMonthlyStat = BulkHandler(GroupMonthlyStat.get_collection()) skip = skip + limit @cli.command() @click.option('-s', '--chunk_size', help = u'bulk chunk size', type = int, default = 2500) @click.option('-l', '--limit', help = u'limit size', type = int, default = 10000) @click.option('-k', '--skip', help = u'skip count', type = int, default = 0) def generate_income_proxy(chunk_size, limit, skip): """ 从以往的充值订单,卡充值记录,广告来生成收益代理 :return: """ logger.info('chunk_size = %d; limit = %d; skip = %s' % (chunk_size, limit, skip)) DealerIncomeProxyCollection = DealerIncomeProxy.get_collection() income_proxy_bulk = BulkHandler(DealerIncomeProxyCollection) def load_maps(): logger.debug('loading dealerMap...') dealerMap = { str(dealer['_id']): { 'agentProfitShare': dealer.get('agentProfitShare', 0.0), 'agentId': dealer['agentId'] } for dealer in Dealer.get_collection().find({}, {'agentProfitShare': 1, 'agentId': 1}) } logger.debug('loading groupMap...') groupMap = {} for group in Group.get_collection().find({}, {'partnerList': 1, 'ownerId': 1}): partnerDict = {} partnerList = group.get('partnerDict', []) for partner in partnerList: partnerDict[partner['id']] = partner groupMap.update({ str(group['_id']): { 'partnerDict': partnerDict, 'ownerId': group['ownerId'] } }) return dealerMap, groupMap # : insert rechargeRecords logger.info('loading recharge records') dealerMap, groupMap = load_maps() while True: logger.info('skip = %s' % skip) records = RechargeRecord.get_collection().find( {'via': {'$in': [USER_RECHARGE_TYPE.RECHARGE, USER_RECHARGE_TYPE.RECHARGE_CARD]}, 'result': 'success'}, {'ownerId': 1, 'groupId': 1, 'logicalCode': 1, 'devNo': 1, 'via': 1, 'money': 1, 'dateTimeAdded': 1, 'time': 1}, timeout = False).sort('_id', 1).limit( limit).skip(skip) if not records or records.count(True) == 0: break for record in records: # type: dict if record['_id'] and record.get('ownerId'): if ('time' not in record or not record['time']) and ( 'dateTimeAdded' not in record or not record['dateTimeAdded']): logger.info('no time and dateTimeAdded; %s' % (record['_id'], )) continue if ('dateTimeAdded' not in record) or (not record['dateTimeAdded']): logger.info('no dateTimeAdded; %s' % (record['_id'])) record['dateTimeAdded'] = datetime.datetime.strptime(record['time'], "%Y-%m-%d %H:%M:%S") else: if 'time' in record and record['time']: if record['dateTimeAdded'].strftime('%Y-%m-%d %H:%M:%S') != record['time']: logger.info('dateTimeAdded is not same with time; %s' % (record['_id'])) record['dateTimeAdded'] = datetime.datetime.strptime(record['time'], "%Y-%m-%d %H:%M:%S") group = groupMap.get(record['groupId']) if not group: logger.info('skipping income_proxy (rechargeRecord id=%s) given group is None' % (record['_id'])) continue partners = group['partnerDict'].values() dealer = dealerMap.get(record['ownerId']) if not dealer: logger.error('dealer not exist. dealer = %s' % record['ownerId']) continue partitionMap = Ledger.get_group_income_partition(dealer['agentId'], record['ownerId'], dealer['agentProfitShare'], group, record['money']) doc = { 'ref_id': ObjectId(record['_id']), 'dealerIds': [ObjectId(record['ownerId'])] + [ObjectId(partner['id']) for partner in partners], 'partition': list(flatten(partitionMap.values())), 'groupId': ObjectId(record['groupId']), 'logicalCode': record['logicalCode'], 'title': generate_recharge_title(devType = record.get('devType', ''), logicalCode = record['logicalCode']), 'source': record['via'], 'totalAmount': record['money'], 'actualAmountMap': { dealer['id']: dealer['money'] for dealer in partitionMap['owner'] + partitionMap['partner'] }, 'dateTimeAdded': record['dateTimeAdded'], 'date': record['dateTimeAdded'].strftime(Const.DATE_FMT), 'tags': '2018-10-01', 'desc': u'升级' } income_proxy_bulk.upsert({'ref_id': ObjectId(record['_id'])}, {'$set': doc}) if len(income_proxy_bulk.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size: result = income_proxy_bulk.execute() assert bool(result['success']), 'error happened' income_proxy_bulk = BulkHandler(DealerIncomeProxyCollection) if len(income_proxy_bulk.bulk._BulkOperationBuilder__bulk.ops) > 0: result = income_proxy_bulk.execute() assert bool(result['success']), 'error happened' income_proxy_bulk = BulkHandler(DealerIncomeProxyCollection) skip = skip + limit @cli.command() def post_build_consume_stats(): records = ConsumeRecord.objects().filter( dateTimeAdded__gte = datetime.datetime.strptime('2018-09-27 16:47:48', "%Y-%m-%d %H:%M:%S")) for record in records: if not record.ownerId: logger.info('no dealer id. %s' % str(record.id)) record_consumption_stats(record, check = True) @cli.command() def post_generate_income_proxy(): """ 从以往的充值订单,卡充值记录,广告来生成收益代理 :return: """ def load_maps(): logger.debug('loading dealerMap...') dealerMap = { str(dealer['_id']): { 'agentProfitShare': dealer.get('agentProfitShare', 0.0), 'agentId': dealer['agentId'] } for dealer in Dealer.get_collection().find({}, {'agentProfitShare': 1, 'agentId': 1}) } logger.debug('loading groupMap...') groupMap = {} for group in Group.get_collection().find({}, {'partnerList': 1, 'ownerId': 1}): partnerDict = {} partnerList = group.get('partnerDict', []) for partner in partnerList: partnerDict[partner['id']] = partner groupMap.update({ str(group['_id']): { 'partnerDict': partnerDict, 'ownerId': group['ownerId'] } }) return dealerMap, groupMap # : insert rechargeRecords logger.debug('loading recharge records') dealerMap, groupMap = load_maps() records = [] for record in RechargeRecord.get_collection().find( dateTimeAdded__gte = datetime.datetime.strptime('2018-09-27 00:00:00', "%Y-%m-%d %H:%M:%S")): # type: dict auto_id = record.pop('_id') record.update({'id': ObjectId(auto_id)}) records.append(record) for record in records: # type: RechargeRecord logger.debug('to do record is %s' % record.to_dict()) if not record.ownerId: logger.info('no dealer id') group = groupMap.get(record.groupId) if not group: logger.info('skipping income_proxy (rechargeRecord id=%s) given group is None' % (str(record.id))) continue dealer = dealerMap.get(record.ownerId) if not dealer: logger.error('dealer not exist. dealer = %s' % record.ownerId) continue partitionMap = Ledger.get_group_income_partition(dealer['agentId'], record.ownerId, dealer['agentProfitShare'], group, float(record.money)) proxy = record_income_proxy(record.via, record, partitionMap, record.dateTimeAdded) record_income_stats(proxy, True) def validate_stats_are_correct(): pass def validate_income_proxies_are_correct(): pass if __name__ == '__main__': cli() def get_consumption_stats(record, bulkerGroupDailyStat, bulkerGroupMonthlyStat): # type:(ConsumeRecord)->None """ 记录消费 :param record: :param bulkerGroupDailyStat: :param bulkerGroupMonthlyStat: :return: """ try: dt = record.dateTimeAdded day = dt.day hour = dt.hour report_monthly_date = dt.strftime('%Y-%m') report_daily_date = dt.strftime('%Y-%m-%d') def build_query(initial, extra): # type: (dict, dict)->dict rv = initial.copy() rv.update(extra) return rv build_daily_query = partial(build_query, {'date': report_daily_date}) build_monthly_query = partial(build_query, {'date': report_monthly_date}) def build_daily_update(kind_amount_map): kam = kind_amount_map.copy() kam.setdefault('coin', record.coin) rv = defaultdict(dict) try: for kind, amount in kam.iteritems(): rv['$inc']['daily.consumption.{kind}'.format(kind=kind)] = amount rv['$inc']['hourly.{hour}.consumption.{kind}'.format(hour=hour, kind=kind)] = amount rv.update( { '$addToSet': {'origin.consumption': record.id}, } ) except Exception as e: logger.error(str(kam)) raise e return rv def build_monthly_update(kind_amount_map): kam = kind_amount_map.copy() kam.setdefault('coin', record.coin) rv = defaultdict(dict) for kind, amount in kam.iteritems(): rv['$inc']['daily.{day}.consumption.{kind}'.format(day=day, kind=kind)] = amount rv['$inc']['monthly.consumption.{kind}'.format(kind=kind)] = amount rv.update( { '$addToSet': {'origin.consumption': record.id}, } ) return rv #: group group_daily_query = build_daily_query({'groupId': ObjectId(record.groupId)}) group_monthly_query = build_monthly_query({'groupId': ObjectId(record.groupId)}) group_daily_update = build_daily_update(record.aggInfo) group_monthly_update = build_monthly_update(record.aggInfo) bulkerGroupDailyStat.upsert(group_daily_query, group_daily_update) bulkerGroupMonthlyStat.upsert(group_monthly_query, group_monthly_update) except Exception as e: logger.exception(e) def get_income_stats(proxy, bulkerGroupDailyStat, bulkerGroupMonthlyStat): # type:(DealerIncomeProxy)->None """ 升级工具用 :param proxy: :param bulkerGroupDailyStat: :param bulkerGroupMonthlyStat: :return: """ try: dt = proxy.dateTimeAdded day = dt.day hour = dt.hour report_monthly_date = dt.strftime('%Y-%m') report_daily_date = dt.strftime('%Y-%m-%d') def build_query(initial, extra): rv = initial.copy() rv.update(extra) return rv build_daily_query = partial(build_query, {'date': report_daily_date}) build_monthly_query = partial(build_query, {'date': report_monthly_date}) def build_daily_update(amount): return { '$inc': { 'daily.income.{source}'.format(source = proxy.source): amount, 'hourly.{hour}.income.{source}'.format(hour = hour, source = proxy.source): amount, 'daily.totalIncome': amount, }, '$addToSet': {'origin.income': proxy.id} } def build_monthly_update(amount): return { '$inc': { 'daily.{day}.income.{source}'.format(day = day, source = proxy.source): amount, 'daily.{day}.totalIncome'.format(day = day): amount, 'monthly.income.{source}'.format(source = proxy.source, ): amount, 'monthly.totalIncome': amount, }, '$addToSet': {'origin.income': proxy.id} } # : group group_daily_query = build_daily_query({'groupId': proxy.groupId}) group_monthly_query = build_monthly_query({'groupId': proxy.groupId}) group_daily_update = build_daily_update(proxy.totalAmount) group_monthly_update = build_monthly_update(proxy.totalAmount) bulkerGroupDailyStat.upsert(group_daily_query, group_daily_update) bulkerGroupMonthlyStat.upsert(group_monthly_query, group_monthly_update) except Exception as e: logger.exception(e)