#!/usr/bin/env python
# -*- coding: utf-8 -*-
- """
- Goals:
- 1. 从收益来源模型里生成经销商的收益代理模型
- 2. 加载缓存
- """
import datetime
import itertools
import logging
import os
import sys
from collections import defaultdict
from functools import partial

from mongoengine import Q

PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
sys.path.insert(0, PROJECT_ROOT)

from script.base import init_env

init_env(interactive = True)

import daiquiri
import click
from pymongo.errors import AutoReconnect
from bson.objectid import ObjectId

from apps.web.constant import Const, USER_RECHARGE_TYPE
from apps.web.user.models import ConsumeRecord, RechargeRecord, CardRechargeRecord
from apps.web.dealer.models import Dealer
from apps.web.dealer.proxy import (DealerIncomeProxy,
                                   generate_recharge_title, record_income_proxy)
from apps.web.report.models import GroupDailyStat, GroupMonthlyStat
# 'logger' is deliberately not imported from report.utils: it would be
# shadowed by the module-level logger defined below.
from apps.web.report.utils import record_income_stats, record_consumption_stats
from apps.web.device.models import Group
from apilib.utils_mongo import BulkHandler
from apps.web.report.ledger import Ledger

flatten = itertools.chain.from_iterable

daiquiri.setup(
    level = logging.INFO,
    outputs = (
        daiquiri.output.Stream(sys.stdout, level = logging.INFO),
        daiquiri.output.TimedRotatingFile(
            'upgrade_dealer_datacenter-everything.log',
            level = logging.DEBUG,
            interval = datetime.timedelta(days = 1))
    )
)

logger = logging.getLogger(__name__)


@click.group()
def cli():
    """
    Update the dealer data center
    :return:
    """
    logger.info('cli called')


def generate_bulk(collection):
    return collection.initialize_unordered_bulk_op()
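

# NOTE: initialize_unordered_bulk_op() is the legacy PyMongo bulk-write API
# (removed in PyMongo 4.0), so this script assumes a pre-4.0 driver. That is
# also why code below reaches into the name-mangled private attribute
# _BulkOperationBuilder__bulk to count queued operations.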


# @retry(retry=retry_if_exception_type(AutoReconnect), stop=stop_after_attempt(7))
def execute_bulk(bulk):
    result = {'success': 0, 'info': 0}
    try:
        # Skip the round trip to the server when nothing has been queued.
        if len(bulk._BulkOperationBuilder__bulk.ops) != 0:
            result['info'] = bulk.execute()
            result['success'] = 1
        else:
            result['info'] = 'no operation to execute'
            result['success'] = 1
    except AutoReconnect as e:
        logger.exception(e)
        raise e
    except Exception as e:
        logger.exception(e)
        result['info'] = e
    return result
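

# Usage sketch for the two helpers above (model name is hypothetical,
# for illustration only):
#   bulk = generate_bulk(SomeModel.get_collection())
#   bulk.find({'_id': some_id}).update({'$set': {'field': 'value'}})
#   result = execute_bulk(bulk)  # {'success': 1, 'info': <bulk result>}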


def __upgrade_record(clazz, chunk_size):
    """Backfill or repair 'dateTimeAdded' from the legacy string field 'time'."""
    items = []
    count = 0
    for record in clazz.get_collection().find(spec = {}, fields = {'_id': 1, 'time': 1, 'dateTimeAdded': 1},
                                              timeout = False):
        count += 1
        if ('time' not in record or not record['time']) and (
                'dateTimeAdded' not in record or not record['dateTimeAdded']):
            logger.info('%s; no time and dateTimeAdded; %s' % (count, record['_id']))
            items.append({
                '_id': record['_id'],
                'dateTimeAdded': datetime.datetime.now()
            })
            continue
        if 'time' not in record or not record['time']:
            logger.info('%s; no time; %s' % (count, record['_id']))
            continue
        if 'dateTimeAdded' not in record or not record['dateTimeAdded']:
            logger.info('%s; no dateTimeAdded; %s' % (count, record['_id']))
            items.append({
                '_id': record['_id'],
                'dateTimeAdded': datetime.datetime.strptime(record['time'], "%Y-%m-%d %H:%M:%S")
            })
            continue
        if record['dateTimeAdded'].strftime('%Y-%m-%d %H:%M:%S') == record['time']:
            logger.info('%s; is equal; %s' % (count, record['_id']))
            continue
        else:
            logger.info('%s; is not equal; %s' % (count, record['_id']))
            items.append({
                '_id': record['_id'],
                'dateTimeAdded': datetime.datetime.strptime(record['time'], "%Y-%m-%d %H:%M:%S")
            })
    bulker = BulkHandler(clazz.get_collection())
    for item in items:
        auto_id = item.pop('_id')
        bulker.update(query_dict = {'_id': auto_id}, update_dict = {'$set': item})
        if len(bulker.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size:
            result = bulker.execute()
            assert bool(result['success']), 'error happened'
            bulker = BulkHandler(clazz.get_collection())
    if len(bulker.bulk._BulkOperationBuilder__bulk.ops) > 0:
        result = bulker.execute()
        assert bool(result['success']), 'error happened'
    logger.info('scanned %d records, repaired %d' % (count, len(items)))
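

# The two commands below are thin CLI wrappers around __upgrade_record.
# Invocation sketch (assuming this file is upgrade_dealer_datacenter.py;
# depending on the click version the command name may use dashes instead
# of underscores):
#   python upgrade_dealer_datacenter.py upgrade_consume_record -s 1000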


@cli.command()
@click.option('-s', '--chunk_size', help = u'bulk chunk size', type = int, default = 2500)
def upgrade_consume_record(chunk_size):
    __upgrade_record(ConsumeRecord, chunk_size)


@cli.command()
@click.option('-s', '--chunk_size', help = u'bulk chunk size', type = int, default = 2500)
def upgrade_recharge_record(chunk_size):
    __upgrade_record(RechargeRecord, chunk_size)


@cli.command()
def update_card_recharge_record():
    """Add ownerId to every CardRechargeRecord."""
    CardRechargeRecordCollection = CardRechargeRecord.get_collection()
    card_recharge_record_bulk = BulkHandler(CardRechargeRecordCollection)
    count = 0
    for record in CardRechargeRecordCollection.find({}):
        ownerId = ObjectId(Group.get_group(groupId = record['groupId'])['ownerId'])
        # Target the current record; an empty query would update every document.
        card_recharge_record_bulk.update({'_id': record['_id']}, {'$set': {'ownerId': ownerId}})
        count += 1
        logger.info('updating CardRechargeRecord(id=%s) ownerId -> %s count=%d' % (record['_id'], ownerId, count))
        if len(card_recharge_record_bulk.bulk._BulkOperationBuilder__bulk.ops) > 2000:
            card_recharge_record_bulk.execute()
            card_recharge_record_bulk = BulkHandler(CardRechargeRecordCollection)
    if len(card_recharge_record_bulk.bulk._BulkOperationBuilder__bulk.ops) > 0:
        card_recharge_record_bulk.execute()
    assert CardRechargeRecordCollection.find({'ownerId': {'$exists': True}}).count() \
           == CardRechargeRecordCollection.find({}).count(), u'not all CardRechargeRecord documents updated'


@cli.command()
@click.option('-b', '--begin', help = u'begin datetime (YYYY-MM-DD HH:MM:SS)', type = str, default = '')
@click.option('-e', '--end', help = u'end datetime (YYYY-MM-DD HH:MM:SS)', type = str, default = '')
@click.option('-l', '--limit', help = u'limit size', type = int, default = 10000)
@click.option('-k', '--skip', help = u'skip count', type = int, default = 0)
@click.option('-c', '--check', help = u'check (yes/no)', type = str, default = 'no')
def build_income_stats(begin, end, limit, skip, check):
    logger.info('begin = %s; end = %s; limit = %d; skip = %s; check = %s' % (begin, end, limit, skip, check))
    _filter = None
    if begin:
        _filter = Q(__raw__ = {'dateTimeAdded': {'$gte': datetime.datetime.strptime(begin, "%Y-%m-%d %H:%M:%S")}})
    if end:
        end_q = Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}})
        _filter = (_filter & end_q) if _filter else end_q
    if not _filter:
        _filter = Q(__raw__ = {})
    check_done = (check == 'yes')
    while True:
        logger.info('skip = %s' % (skip,))
        proxies = []
        for proxy in DealerIncomeProxy.get_collection().find(spec = _filter.to_query(DealerIncomeProxy),
                                                             timeout = False).sort('_id', 1).limit(limit).skip(skip):
            proxies.append(proxy)
        if len(proxies) == 0:
            break
        for proxy in proxies:
            auto_id = proxy.pop('_id')
            proxy.update({'id': ObjectId(auto_id)})
            proxy = DealerIncomeProxy(**proxy)
            record_income_stats(proxy, check = check_done, allowed = {'group': True})
        skip = skip + limit
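

# Pagination note: each pass re-sorts by _id and walks forward with
# skip/limit, so the scan order is stable, but skip gets progressively more
# expensive on large collections; a range query on _id would scale better.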


@cli.command()
@click.option('-b', '--begin', help = u'begin datetime (YYYY-MM-DD HH:MM:SS)', type = str, default = '')
@click.option('-e', '--end', help = u'end datetime (YYYY-MM-DD HH:MM:SS)', type = str, default = '')
@click.option('-l', '--limit', help = u'limit size', type = int, default = 10000)
@click.option('-k', '--skip', help = u'skip count', type = int, default = 0)
@click.option('-s', '--chunk_size', help = u'chunk size', type = int, default = 5000)
def bulk_build_income_stats(begin, end, limit, skip, chunk_size):
    logger.info('begin = %s; end = %s; limit = %d; skip = %s; chunk size = %s' % (begin, end, limit, skip, chunk_size))
    _filter = None
    if begin:
        _filter = Q(__raw__ = {'dateTimeAdded': {'$gte': datetime.datetime.strptime(begin, "%Y-%m-%d %H:%M:%S")}})
    if end:
        end_q = Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}})
        _filter = (_filter & end_q) if _filter else end_q
    if not _filter:
        _filter = Q(__raw__ = {})
    while True:
        logger.info('skip = %s' % (skip,))
        bulkerGroupDailyStat = BulkHandler(GroupDailyStat.get_collection())
        bulkerGroupMonthlyStat = BulkHandler(GroupMonthlyStat.get_collection())
        proxies = []
        for proxy in DealerIncomeProxy.get_collection().find(spec = _filter.to_query(DealerIncomeProxy),
                                                             timeout = False).sort('_id', 1).limit(limit).skip(skip):
            proxies.append(proxy)
        if len(proxies) == 0:
            break
        for proxy in proxies:
            auto_id = proxy.pop('_id')
            proxy.update({'id': ObjectId(auto_id)})
            proxy = DealerIncomeProxy(**proxy)
            get_income_stats(proxy, bulkerGroupDailyStat, bulkerGroupMonthlyStat)
            if len(bulkerGroupDailyStat.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size:
                result = bulkerGroupDailyStat.execute()
                assert bool(result['success']), 'error happened'
                bulkerGroupDailyStat = BulkHandler(GroupDailyStat.get_collection())
            if len(bulkerGroupMonthlyStat.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size:
                result = bulkerGroupMonthlyStat.execute()
                assert bool(result['success']), 'error happened'
                bulkerGroupMonthlyStat = BulkHandler(GroupMonthlyStat.get_collection())
        if len(bulkerGroupDailyStat.bulk._BulkOperationBuilder__bulk.ops) > 0:
            result = bulkerGroupDailyStat.execute()
            assert bool(result['success']), 'error happened'
        if len(bulkerGroupMonthlyStat.bulk._BulkOperationBuilder__bulk.ops) > 0:
            result = bulkerGroupMonthlyStat.execute()
            assert bool(result['success']), 'error happened'
        skip = skip + limit
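

# Flush pattern used above: once a BulkHandler has queued >= chunk_size
# operations it is executed and replaced with a fresh handler, which keeps
# each bulk write bounded in size and memory.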


@cli.command()
@click.option('-b', '--begin', help = u'begin datetime (YYYY-MM-DD HH:MM:SS)', type = str, default = '')
@click.option('-e', '--end', help = u'end datetime (YYYY-MM-DD HH:MM:SS)', type = str, default = '')
@click.option('-l', '--limit', help = u'limit size', type = int, default = 10000)
@click.option('-k', '--skip', help = u'skip count', type = int, default = 0)
@click.option('-c', '--check', help = u'check (yes/no)', type = str, default = 'no')
def build_consume_stats(begin, end, limit, skip, check):
    logger.info('begin = %s; end = %s; limit = %d; skip = %s; check = %s' % (begin, end, limit, skip, check))
    _filter = None
    if begin:
        _filter = Q(__raw__ = {'dateTimeAdded': {'$gte': datetime.datetime.strptime(begin, "%Y-%m-%d %H:%M:%S")}})
    if end:
        end_q = Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}})
        _filter = (_filter & end_q) if _filter else end_q
    if not _filter:
        _filter = Q(__raw__ = {})
    check_done = (check == 'yes')
    while True:
        logger.info('skip = %s' % skip)
        records = []
        for record in ConsumeRecord.get_collection().find(spec = _filter.to_query(ConsumeRecord),
                                                          timeout = False).sort('_id', 1).limit(limit).skip(skip):
            records.append(record)
        if len(records) == 0:
            break
        for record in records:
            if not record.get('ownerId'):
                logger.debug('ownerId missing; skipping record %s' % record['_id'])
                continue
            auto_id = record.pop('_id')
            record.update({'id': ObjectId(auto_id)})
            record_consumption_stats(record = ConsumeRecord(**record), check = check_done, allowed = {'group': True})
        skip = skip + limit


@cli.command()
@click.option('-b', '--begin', help = u'begin datetime (YYYY-MM-DD HH:MM:SS)', type = str, default = '')
@click.option('-e', '--end', help = u'end datetime (YYYY-MM-DD HH:MM:SS)', type = str, default = '')
@click.option('-l', '--limit', help = u'limit size', type = int, default = 10000)
@click.option('-k', '--skip', help = u'skip count', type = int, default = 0)
@click.option('-s', '--chunk_size', help = u'chunk size', type = int, default = 5000)
def bulker_build_consume_stats(begin, end, limit, skip, chunk_size):
    logger.info('begin = %s; end = %s; limit = %d; skip = %s; chunk size = %s' % (begin, end, limit, skip, chunk_size))
    _filter = None
    if begin:
        _filter = Q(__raw__ = {'dateTimeAdded': {'$gte': datetime.datetime.strptime(begin, "%Y-%m-%d %H:%M:%S")}})
    if end:
        end_q = Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}})
        _filter = (_filter & end_q) if _filter else end_q
    if not _filter:
        _filter = Q(__raw__ = {})
    while True:
        logger.info('skip = %s' % skip)
        bulkerGroupDailyStat = BulkHandler(GroupDailyStat.get_collection())
        bulkerGroupMonthlyStat = BulkHandler(GroupMonthlyStat.get_collection())
        records = []
        for record in ConsumeRecord.get_collection().find(spec = _filter.to_query(ConsumeRecord),
                                                          timeout = False).sort('_id', 1).limit(limit).skip(skip):
            records.append(record)
        if len(records) == 0:
            break
        for record in records:
            if not record.get('ownerId'):
                logger.debug('ownerId missing; skipping record %s' % record['_id'])
                continue
            auto_id = record.pop('_id')
            record.update({'id': ObjectId(auto_id)})
            get_consumption_stats(ConsumeRecord(**record), bulkerGroupDailyStat, bulkerGroupMonthlyStat)
            if len(bulkerGroupDailyStat.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size:
                result = bulkerGroupDailyStat.execute()
                assert bool(result['success']), 'error happened'
                bulkerGroupDailyStat = BulkHandler(GroupDailyStat.get_collection())
            if len(bulkerGroupMonthlyStat.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size:
                result = bulkerGroupMonthlyStat.execute()
                assert bool(result['success']), 'error happened'
                bulkerGroupMonthlyStat = BulkHandler(GroupMonthlyStat.get_collection())
        if len(bulkerGroupDailyStat.bulk._BulkOperationBuilder__bulk.ops) > 0:
            result = bulkerGroupDailyStat.execute()
            assert bool(result['success']), 'error happened'
        if len(bulkerGroupMonthlyStat.bulk._BulkOperationBuilder__bulk.ops) > 0:
            result = bulkerGroupMonthlyStat.execute()
            assert bool(result['success']), 'error happened'
        skip = skip + limit


@cli.command()
@click.option('-s', '--chunk_size', help = u'bulk chunk size', type = int, default = 2500)
@click.option('-l', '--limit', help = u'limit size', type = int, default = 10000)
@click.option('-k', '--skip', help = u'skip count', type = int, default = 0)
def generate_income_proxy(chunk_size, limit, skip):
    """
    Generate income proxies from historical recharge orders, card recharge records and ads
    :return:
    """
    logger.info('chunk_size = %d; limit = %d; skip = %s' % (chunk_size, limit, skip))
    DealerIncomeProxyCollection = DealerIncomeProxy.get_collection()
    income_proxy_bulk = BulkHandler(DealerIncomeProxyCollection)

    def load_maps():
        logger.debug('loading dealerMap...')
        dealerMap = {
            str(dealer['_id']): {
                'agentProfitShare': dealer.get('agentProfitShare', 0.0),
                'agentId': dealer['agentId']
            }
            for dealer in Dealer.get_collection().find({}, {'agentProfitShare': 1, 'agentId': 1})
        }
        logger.debug('loading groupMap...')
        groupMap = {}
        for group in Group.get_collection().find({}, {'partnerList': 1, 'ownerId': 1}):
            partnerDict = {}
            # The projection fetches 'partnerList'; reading 'partnerDict' here
            # would always yield the default empty list.
            partnerList = group.get('partnerList', [])
            for partner in partnerList:
                partnerDict[partner['id']] = partner
            groupMap.update({
                str(group['_id']): {
                    'partnerDict': partnerDict,
                    'ownerId': group['ownerId']
                }
            })
        return dealerMap, groupMap
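
    # Shape sketch of the maps built above (field names come from the
    # projections; the values shown are illustrative assumptions):
    #   dealerMap = {'<dealerId>': {'agentProfitShare': 0.6, 'agentId': '<agentId>'}}
    #   groupMap  = {'<groupId>': {'partnerDict': {'<partnerId>': {...}}, 'ownerId': '<dealerId>'}}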
    # : insert rechargeRecords
    logger.info('loading recharge records')
    dealerMap, groupMap = load_maps()
    while True:
        logger.info('skip = %s' % skip)
        records = RechargeRecord.get_collection().find(
            {'via': {'$in': [USER_RECHARGE_TYPE.RECHARGE, USER_RECHARGE_TYPE.RECHARGE_CARD]}, 'result': 'success'},
            {'ownerId': 1, 'groupId': 1, 'logicalCode': 1, 'devNo': 1, 'devType': 1, 'via': 1, 'money': 1,
             'dateTimeAdded': 1, 'time': 1},
            timeout = False).sort('_id', 1).limit(limit).skip(skip)
        if not records or records.count(True) == 0:
            break
        for record in records:  # type: dict
            if record['_id'] and record.get('ownerId'):
                if ('time' not in record or not record['time']) and (
                        'dateTimeAdded' not in record or not record['dateTimeAdded']):
                    logger.info('no time and dateTimeAdded; %s' % (record['_id'],))
                    continue
                if ('dateTimeAdded' not in record) or (not record['dateTimeAdded']):
                    logger.info('no dateTimeAdded; %s' % (record['_id'],))
                    record['dateTimeAdded'] = datetime.datetime.strptime(record['time'], "%Y-%m-%d %H:%M:%S")
                else:
                    if 'time' in record and record['time']:
                        if record['dateTimeAdded'].strftime('%Y-%m-%d %H:%M:%S') != record['time']:
                            logger.info('dateTimeAdded differs from time; %s' % (record['_id'],))
                            record['dateTimeAdded'] = datetime.datetime.strptime(record['time'], "%Y-%m-%d %H:%M:%S")
                group = groupMap.get(record['groupId'])
                if not group:
                    logger.info('skipping income_proxy (rechargeRecord id=%s) given group is None' % (record['_id'],))
                    continue
                partners = group['partnerDict'].values()
                dealer = dealerMap.get(record['ownerId'])
                if not dealer:
                    logger.error('dealer does not exist. dealer = %s' % record['ownerId'])
                    continue
                partitionMap = Ledger.get_group_income_partition(dealer['agentId'], record['ownerId'],
                                                                 dealer['agentProfitShare'], group, record['money'])
                doc = {
                    'ref_id': ObjectId(record['_id']),
                    'dealerIds': [ObjectId(record['ownerId'])] + [ObjectId(partner['id']) for partner in partners],
                    'partition': list(flatten(partitionMap.values())),
                    'groupId': ObjectId(record['groupId']),
                    'logicalCode': record['logicalCode'],
                    'title': generate_recharge_title(devType = record.get('devType', ''),
                                                     logicalCode = record['logicalCode']),
                    'source': record['via'],
                    'totalAmount': record['money'],
                    # 'part' rather than 'dealer' to avoid shadowing the outer variable
                    'actualAmountMap': {
                        part['id']: part['money'] for part in partitionMap['owner'] + partitionMap['partner']
                    },
                    'dateTimeAdded': record['dateTimeAdded'],
                    'date': record['dateTimeAdded'].strftime(Const.DATE_FMT),
                    'tags': '2018-10-01',
                    'desc': u'升级'
                }
                income_proxy_bulk.upsert({'ref_id': ObjectId(record['_id'])}, {'$set': doc})
                if len(income_proxy_bulk.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size:
                    result = income_proxy_bulk.execute()
                    assert bool(result['success']), 'error happened'
                    income_proxy_bulk = BulkHandler(DealerIncomeProxyCollection)
        if len(income_proxy_bulk.bulk._BulkOperationBuilder__bulk.ops) > 0:
            result = income_proxy_bulk.execute()
            assert bool(result['success']), 'error happened'
            income_proxy_bulk = BulkHandler(DealerIncomeProxyCollection)
        skip = skip + limit
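

# Because the upsert above keys on ref_id, re-running this command is
# idempotent: an existing proxy for the same recharge record is overwritten
# rather than duplicated.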


@cli.command()
def post_build_consume_stats():
    records = ConsumeRecord.objects().filter(
        dateTimeAdded__gte = datetime.datetime.strptime('2018-09-27 16:47:48', "%Y-%m-%d %H:%M:%S"))
    for record in records:
        if not record.ownerId:
            logger.info('no dealer id. %s' % str(record.id))
            continue
        record_consumption_stats(record, check = True)


@cli.command()
def post_generate_income_proxy():
    """
    Generate income proxies from historical recharge orders, card recharge records and ads
    :return:
    """

    def load_maps():
        logger.debug('loading dealerMap...')
        dealerMap = {
            str(dealer['_id']): {
                'agentProfitShare': dealer.get('agentProfitShare', 0.0),
                'agentId': dealer['agentId']
            }
            for dealer in Dealer.get_collection().find({}, {'agentProfitShare': 1, 'agentId': 1})
        }
        logger.debug('loading groupMap...')
        groupMap = {}
        for group in Group.get_collection().find({}, {'partnerList': 1, 'ownerId': 1}):
            partnerDict = {}
            # As above: the projection fetches 'partnerList', not 'partnerDict'.
            partnerList = group.get('partnerList', [])
            for partner in partnerList:
                partnerDict[partner['id']] = partner
            groupMap.update({
                str(group['_id']): {
                    'partnerDict': partnerDict,
                    'ownerId': group['ownerId']
                }
            })
        return dealerMap, groupMap

    # : insert rechargeRecords
    logger.debug('loading recharge records')
    dealerMap, groupMap = load_maps()
    records = []
    # get_collection() returns a raw pymongo collection, so the filter must be
    # a plain query document, not mongoengine keyword syntax.
    for record in RechargeRecord.get_collection().find(
            {'dateTimeAdded': {'$gte': datetime.datetime.strptime('2018-09-27 00:00:00', "%Y-%m-%d %H:%M:%S")}}):
        auto_id = record.pop('_id')
        record.update({'id': ObjectId(auto_id)})
        # Wrap the raw dict so the loop below can use attribute access.
        records.append(RechargeRecord(**record))
    for record in records:  # type: RechargeRecord
        logger.debug('record to process: %s' % record.to_dict())
        if not record.ownerId:
            logger.info('no dealer id. %s' % str(record.id))
            continue
        group = groupMap.get(record.groupId)
        if not group:
            logger.info('skipping income_proxy (rechargeRecord id=%s) given group is None' % (str(record.id),))
            continue
        dealer = dealerMap.get(record.ownerId)
        if not dealer:
            logger.error('dealer does not exist. dealer = %s' % record.ownerId)
            continue
        partitionMap = Ledger.get_group_income_partition(dealer['agentId'], record.ownerId,
                                                         dealer['agentProfitShare'], group, float(record.money))
        proxy = record_income_proxy(record.via, record, partitionMap, record.dateTimeAdded)
        record_income_stats(proxy, True)


def validate_stats_are_correct():
    pass


def validate_income_proxies_are_correct():
    pass
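

# NOTE: the two validators above are unimplemented placeholders; a plausible
# implementation would cross-check aggregate sums in GroupDailyStat /
# GroupMonthlyStat against the DealerIncomeProxy and ConsumeRecord sources.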


# NOTE: get_consumption_stats / get_income_stats below must be defined before
# cli() runs, so the __main__ guard lives at the bottom of the module.


def get_consumption_stats(record, bulkerGroupDailyStat, bulkerGroupMonthlyStat):
    # type: (ConsumeRecord, BulkHandler, BulkHandler) -> None
    """
    Record consumption stats
    :param record:
    :param bulkerGroupDailyStat:
    :param bulkerGroupMonthlyStat:
    :return:
    """
    try:
        dt = record.dateTimeAdded
        day = dt.day
        hour = dt.hour
        report_monthly_date = dt.strftime('%Y-%m')
        report_daily_date = dt.strftime('%Y-%m-%d')

        def build_query(initial, extra):
            # type: (dict, dict) -> dict
            rv = initial.copy()
            rv.update(extra)
            return rv

        build_daily_query = partial(build_query, {'date': report_daily_date})
        build_monthly_query = partial(build_query, {'date': report_monthly_date})

        def build_daily_update(kind_amount_map):
            kam = kind_amount_map.copy()
            kam.setdefault('coin', record.coin)
            rv = defaultdict(dict)
            try:
                for kind, amount in kam.iteritems():
                    rv['$inc']['daily.consumption.{kind}'.format(kind = kind)] = amount
                    rv['$inc']['hourly.{hour}.consumption.{kind}'.format(hour = hour, kind = kind)] = amount
                rv.update(
                    {
                        '$addToSet': {'origin.consumption': record.id},
                    }
                )
            except Exception as e:
                logger.error(str(kam))
                raise e
            return rv

        def build_monthly_update(kind_amount_map):
            kam = kind_amount_map.copy()
            kam.setdefault('coin', record.coin)
            rv = defaultdict(dict)
            for kind, amount in kam.iteritems():
                rv['$inc']['daily.{day}.consumption.{kind}'.format(day = day, kind = kind)] = amount
                rv['$inc']['monthly.consumption.{kind}'.format(kind = kind)] = amount
            rv.update(
                {
                    '$addToSet': {'origin.consumption': record.id},
                }
            )
            return rv

        #: group
        group_daily_query = build_daily_query({'groupId': ObjectId(record.groupId)})
        group_monthly_query = build_monthly_query({'groupId': ObjectId(record.groupId)})
        group_daily_update = build_daily_update(record.aggInfo)
        group_monthly_update = build_monthly_update(record.aggInfo)
        bulkerGroupDailyStat.upsert(group_daily_query, group_daily_update)
        bulkerGroupMonthlyStat.upsert(group_monthly_query, group_monthly_update)
    except Exception as e:
        logger.exception(e)
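

# Sketch of one daily upsert produced above (values assumed for illustration):
#   query:  {'date': '2018-10-01', 'groupId': ObjectId(...)}
#   update: {'$inc': {'daily.consumption.coin': 2,
#                     'hourly.14.consumption.coin': 2},
#            '$addToSet': {'origin.consumption': ObjectId(...)}}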


def get_income_stats(proxy, bulkerGroupDailyStat, bulkerGroupMonthlyStat):
    # type: (DealerIncomeProxy, BulkHandler, BulkHandler) -> None
    """
    Used by the upgrade tool
    :param proxy:
    :param bulkerGroupDailyStat:
    :param bulkerGroupMonthlyStat:
    :return:
    """
    try:
        dt = proxy.dateTimeAdded
        day = dt.day
        hour = dt.hour
        report_monthly_date = dt.strftime('%Y-%m')
        report_daily_date = dt.strftime('%Y-%m-%d')

        def build_query(initial, extra):
            rv = initial.copy()
            rv.update(extra)
            return rv

        build_daily_query = partial(build_query, {'date': report_daily_date})
        build_monthly_query = partial(build_query, {'date': report_monthly_date})

        def build_daily_update(amount):
            return {
                '$inc': {
                    'daily.income.{source}'.format(source = proxy.source): amount,
                    'hourly.{hour}.income.{source}'.format(hour = hour, source = proxy.source): amount,
                    'daily.totalIncome': amount,
                },
                '$addToSet': {'origin.income': proxy.id}
            }

        def build_monthly_update(amount):
            return {
                '$inc': {
                    'daily.{day}.income.{source}'.format(day = day, source = proxy.source): amount,
                    'daily.{day}.totalIncome'.format(day = day): amount,
                    'monthly.income.{source}'.format(source = proxy.source): amount,
                    'monthly.totalIncome': amount,
                },
                '$addToSet': {'origin.income': proxy.id}
            }

        # : group
        group_daily_query = build_daily_query({'groupId': proxy.groupId})
        group_monthly_query = build_monthly_query({'groupId': proxy.groupId})
        group_daily_update = build_daily_update(proxy.totalAmount)
        group_monthly_update = build_monthly_update(proxy.totalAmount)
        bulkerGroupDailyStat.upsert(group_daily_query, group_daily_update)
        bulkerGroupMonthlyStat.upsert(group_monthly_query, group_monthly_update)
    except Exception as e:
        logger.exception(e)


if __name__ == '__main__':
    cli()