# -*- coding: utf-8 -*-
# !/usr/bin/env python

import logging
from calendar import monthrange
from collections import defaultdict
from functools import partial

import cytoolz
from bson.decimal128 import Decimal128
from bson.objectid import ObjectId
from cytoolz.dicttoolz import get_in, merge_with
from mongoengine.document import Document
from typing import TYPE_CHECKING, Tuple, Optional, Dict, Iterable, Iterator, Union, Callable, List

from apilib.monetary import RMB, VirtualCoin
from apilib.quantity import Quantity
from apps.web.constant import (
    DEALER_CONSUMPTION_AGG_KIND_TRANSLATION, DEALER_CONSUMPTION_AGG_KIND_UNIT,
    DEALER_CONSUMPTION_AGG_KIND_UNIT_PRECISION, DEALER_CONSUMPTION_AGG_KIND, PARTITION_ROLE)
from apps.web.dealer.define import DEALER_INCOME_SOURCE_TRANSLATION
from apps.web.device.models import Group
from apps.web.report.models import DealerDailyStat, GroupDailyStat, DeviceDailyStat, \
    DailyStat

logger = logging.getLogger(__name__)

from apps.web.dealer.proxy import DealerIncomeProxy

if TYPE_CHECKING:
    from apps.web.user.models import ConsumeRecord


def upsert_stat(model, query, params, identifier='', log_error=True):
    # type: (Document, dict, dict, Optional[str], Optional[bool])->bool
    """
    Apply an update to ``model``'s collection with ``upsert=True``.

    :param model: document class whose ``_get_collection()`` is updated
    :param query: filter selecting the statistic document
    :param params: update document (``$inc`` / ``$addToSet`` ...)
    :param identifier: session id, only used in the error log line
    :param log_error: whether a failed update is logged
    :return: ``False`` when the reply reports ``n`` as zero, ``True`` otherwise
    """
    result = model._get_collection().update(query, params, upsert=True)
    if result['n']:
        return True

    if log_error:
        logger.error(
            'session(%s) update failed, model=%r, query=%r, params=%r' % (identifier, model, query, params))
    return False


def record_consumption_stats(record, check=False, allowed=None):
    # type:(ConsumeRecord, bool, Optional[dict])->None
    """
    Add one consume record to the daily dealer / group / device statistics.

    Historical note from the original author: devices used to record the
    coins once on start-up and once more on finish, which made the user's
    consumption appear twice in the statistics.

    Any exception is logged and swallowed; nothing is raised to the caller.

    :param record: ConsumeRecord to account for
    :param check: when true, a model is skipped if a stat document for this
                  day already lists ``record.id`` under ``origin.consumption``
    :param allowed: ``{'dealer': bool, 'device': bool, 'group': bool}``;
                    a falsy value enables all three
    :return: None
    """

    def records_already_handled(queryDict, model):
        # NOTE: mutates queryDict; callers always pass a freshly built dict.
        queryDict.update({'origin.consumption': record.id})
        return model.get_collection().find(queryDict).count() > 0

    if not allowed:
        allowed = {'dealer': True, 'device': True, 'group': True}

    try:
        RECORD_SESSION_ID = str(record.id)
        _upsert_stat = partial(upsert_stat, identifier=RECORD_SESSION_ID)

        dt = record.dateTimeAdded
        hour = dt.hour
        report_daily_date = dt.strftime('%Y-%m-%d')

        def build_query(initial, extra):
            # type: (dict, dict)->dict
            rv = initial.copy()
            rv.update(extra)
            return rv

        # Every query is scoped to the day the record was added.
        build_daily_query = partial(build_query, {'date': report_daily_date})

        def build_daily_update(kind_amount_map):
            increments = {}
            for kind, amount in kind_amount_map.iteritems():
                value = Decimal128(str(amount))
                increments['daily.consumption.{kind}'.format(kind=kind)] = value
                increments['hourly.{hour}.consumption.{kind}'.format(hour=hour, kind=kind)] = value

            # Also keep track of how many consume records were counted.
            increments['daily.totalConsumptionCount'] = 1

            return {
                '$inc': increments,
                '$addToSet': {'origin.consumption': record.id},
            }

        # The owner plus every partner of the group get the record counted.
        partners = Group.get_group(record.groupId)['partnerDict'].values()
        dealerIds = [ObjectId(record.ownerId)] + [ObjectId(_['id']) for _ in partners]

        # Dealer statistics
        if allowed.get('dealer'):
            dealer_query = build_daily_query({'dealerId': {'$in': dealerIds}})
            if not check or not records_already_handled(dealer_query, DealerDailyStat):
                for dealerId in dealerIds:
                    dealer_daily_query = build_daily_query({'dealerId': ObjectId(dealerId)})
                    dealer_daily_update = build_daily_update(record.aggInfo)
                    _upsert_stat(DealerDailyStat, dealer_daily_query, dealer_daily_update)
            else:
                logger.debug('skipping %r for DealerDailyStat, already recorded' % (record,))

        # Group (address) statistics
        if allowed.get('group'):
            group_query = build_daily_query({'groupId': ObjectId(record.groupId)})
            if not check or not records_already_handled(group_query, GroupDailyStat):
                group_daily_query = build_daily_query({'groupId': ObjectId(record.groupId)})
                group_daily_update = build_daily_update(record.aggInfo)
                _upsert_stat(GroupDailyStat, group_daily_query, group_daily_update)
            else:
                logger.debug('skipping %r for GroupDailyStat, already recorded' % (record,))

        # Device statistics
        if allowed.get('device'):
            device_query = build_daily_query({'logicalCode': record.logicalCode})
            if not check or not records_already_handled(device_query, DeviceDailyStat):
                device_daily_query = build_daily_query({'logicalCode': record.logicalCode})
                device_daily_update = build_daily_update(record.aggInfo)
                _upsert_stat(DeviceDailyStat, device_daily_query, device_daily_update)
            else:
                logger.debug('skipping %r for DeviceDailyStat, already recorded' % (record,))

    except Exception as e:
        logger.error('record_consumption_stats failed, record=%r' % (record,))
        logger.exception(e)


def update_consumption_states(record, refund_coin, allowed=None):
    # type: (ConsumeRecord, VirtualCoin, Optional[dict]) -> None
    """Placeholder: intentionally does nothing."""
    pass


# NOTE (original author): only used from scripts; presumably for
# consolidating historical data.
def record_income_stats(proxy, check=False, allowed=None):
    # type:(DealerIncomeProxy, bool, Optional[dict])->None
    """
    Add one income proxy to the daily dealer / group / device statistics.

    Each dealer in ``proxy.dealerIds`` is credited with its own share taken
    from ``proxy.actualAmountMap``; group and device are credited with
    ``proxy.totalAmount``.

    Any exception (including the type assertion) is logged and swallowed.

    :param proxy: details of how the income was split
    :param check: when true, a model is skipped if a stat document for this
                  day already lists ``proxy.id`` under ``origin.income``
    :param allowed: which models to update; a falsy value enables all three
    :return: None
    """
    if not allowed:
        allowed = {'dealer': True, 'device': True, 'group': True}

    try:
        assert isinstance(proxy, DealerIncomeProxy), 'proxy has to be a DealerIncomeProxy'

        RECORD_SESSION_ID = str(proxy.id)
        _upsert_stat = partial(upsert_stat, identifier=RECORD_SESSION_ID)

        def proxy_already_handled(queryDict, model):
            # NOTE: mutates queryDict; callers always pass a freshly built dict.
            queryDict.update({'origin.income': proxy.id})
            return model.get_collection().find(queryDict).count() > 0

        dt = proxy.dateTimeAdded
        hour = dt.hour
        report_daily_date = dt.strftime('%Y-%m-%d')

        def build_query(initial, extra):
            # type: (dict, dict)->dict
            rv = initial.copy()
            rv.update(extra)
            return rv

        build_daily_query = partial(build_query, {'date': report_daily_date})

        def build_daily_update(amount):
            return {
                '$inc': {
                    'daily.income.{source}'.format(source=proxy.source): amount,
                    'hourly.{hour}.income.{source}'.format(hour=hour, source=proxy.source): amount,
                    'daily.totalIncome': amount,
                    'daily.totalIncomeCount': 1
                },
                '$addToSet': {'origin.income': proxy.id}
            }

        # : dealer
        if allowed.get('dealer'):
            dealerIds_query = build_daily_query({'dealerId': {'$in': proxy.dealerIds}})
            if not check or not proxy_already_handled(dealerIds_query, DealerDailyStat):
                for dealerId in proxy.dealerIds:
                    dealer_daily_query = build_daily_query({'dealerId': dealerId})
                    dealer_daily_update = build_daily_update(proxy.actualAmountMap[str(dealerId)])
                    _upsert_stat(DealerDailyStat, dealer_daily_query, dealer_daily_update)
            else:
                logger.debug('skipping %r for DealerDailyStat, already recorded' % (proxy,))

        # : group
        if allowed.get('group'):
            group_query = build_daily_query({'groupId': proxy.groupId})
            if not check or not proxy_already_handled(group_query, GroupDailyStat):
                group_daily_query = build_daily_query({'groupId': proxy.groupId})
                group_daily_update = build_daily_update(proxy.totalAmount)
                _upsert_stat(GroupDailyStat, group_daily_query, group_daily_update)
            else:
                logger.debug('skipping %r for GroupDailyStat, already recorded' % (proxy,))

        #: device
        if allowed.get('device'):
            device_query = build_daily_query({'logicalCode': proxy.logicalCode})
            if not check or not proxy_already_handled(device_query, DeviceDailyStat):
                device_daily_query = build_daily_query({'logicalCode': proxy.logicalCode})
                device_daily_update = build_daily_update(proxy.totalAmount)
                _upsert_stat(DeviceDailyStat, device_daily_query, device_daily_update)
            else:
                logger.debug('skipping %r for DeviceDailyStat, already recorded' % (proxy,))

    except Exception as e:
        logger.error('record_income_stats failed, proxy=%r' % (proxy,))
        logger.exception(e)


def update_income_stats(proxy, refund_partition, refund_fee, allowed=None):
    # type:(DealerIncomeProxy, dict, RMB, dict)->None
    """
    Subtract a cash refund from the already recorded daily income.

    Used when cash is refunded. A model is only touched when a stat document
    for the proxy's day already lists ``proxy.id`` under ``origin.income``.
    Dealers are debited per ``refund_partition`` item (items whose role is
    ``PARTITION_ROLE.AGENT`` are skipped); group and device are debited by
    ``refund_fee``.

    Any exception (including the type assertion) is logged and swallowed.

    :param proxy: the income proxy being refunded
    :param refund_partition: iterable of items with ``role``, ``id``, ``amount``
    :param refund_fee: total refunded amount
    :param allowed: which models to update; a falsy value enables all three
    :return: None
    """
    if not allowed:
        allowed = {'dealer': True, 'device': True, 'group': True}

    try:
        assert isinstance(proxy, DealerIncomeProxy), 'proxy has to be a DealerIncomeProxy'

        RECORD_SESSION_ID = str(proxy.id)
        _upsert_stat = partial(upsert_stat, identifier=RECORD_SESSION_ID)

        def proxy_already_handled(queryDict, model):
            # NOTE: mutates queryDict; callers always pass a freshly built dict.
            queryDict.update({'origin.income': proxy.id})
            return model.get_collection().find(queryDict).count() > 0

        dt = proxy.dateTimeAdded
        hour = dt.hour
        report_daily_date = dt.strftime('%Y-%m-%d')

        def build_query(initial, extra):
            # type: (dict, dict)->dict
            rv = initial.copy()
            rv.update(extra)
            return rv

        build_daily_query = partial(build_query, {'date': report_daily_date})

        def build_daily_update(amount):
            # Negate first so that $inc subtracts the refunded amount.
            refund = (-amount).mongo_amount
            return {
                '$inc': {
                    'daily.income.{source}'.format(source=proxy.source): refund,
                    'hourly.{hour}.income.{source}'.format(hour=hour, source=proxy.source): refund,
                    'daily.totalIncome': refund
                }
            }

        # : dealer
        if allowed.get('dealer'):
            dealerIds_query = build_daily_query({'dealerId': {'$in': proxy.dealerIds}})
            if proxy_already_handled(dealerIds_query, DealerDailyStat):
                for item in refund_partition:
                    if item['role'] == PARTITION_ROLE.AGENT:
                        continue
                    dealer_daily_query = build_daily_query({'dealerId': ObjectId(item['id'])})
                    dealer_daily_update = build_daily_update(item['amount'])
                    _upsert_stat(DealerDailyStat, dealer_daily_query, dealer_daily_update)
            else:
                logger.debug('skipping for DealerDailyStat, not already recorded %s' % proxy)

        # : group
        if allowed.get('group'):
            group_query = build_daily_query({'groupId': proxy.groupId})
            if proxy_already_handled(group_query, GroupDailyStat):
                group_daily_query = build_daily_query({'groupId': proxy.groupId})
                group_daily_update = build_daily_update(refund_fee)
                _upsert_stat(GroupDailyStat, group_daily_query, group_daily_update)
            else:
                # Fixed: this branch used to name DealerDailyStat.
                logger.debug('skipping for GroupDailyStat, not already recorded %s' % proxy)

        #: device
        if allowed.get('device'):
            device_query = build_daily_query({'logicalCode': proxy.logicalCode})
            if proxy_already_handled(device_query, DeviceDailyStat):
                device_daily_query = build_daily_query({'logicalCode': proxy.logicalCode})
                device_daily_update = build_daily_update(refund_fee)
                _upsert_stat(DeviceDailyStat, device_daily_query, device_daily_update)
            else:
                # Fixed: this branch used to name DealerDailyStat.
                logger.debug('skipping for DeviceDailyStat, not already recorded %s' % proxy)

    except Exception as e:
        # Fixed: the message used to name record_income_stats.
        logger.error('update_income_stats failed, proxy=%r' % (proxy,))
        logger.exception(e)


# Same as get_consumption_stats (original author's note).
def consumption_unit_precision(kind):
    # type:(str)->str
    """Return the configured precision for ``kind``, ``'0.01'`` if none."""
    return DEALER_CONSUMPTION_AGG_KIND_UNIT_PRECISION.get(kind, '0.01')


def to_quantity(kind, value):
    # type: (str, Union[Decimal128, float, int, Quantity])->Quantity
    """Wrap ``value`` in a Quantity using the precision configured for ``kind``."""
    return Quantity(value, places=consumption_unit_precision(kind))


def transfer_consumption_item(item):
    # type: (tuple)->Tuple[str, Quantity]
    """Turn a ``(kind, value)`` pair into ``(kind, Quantity)``."""
    key, value = item
    return key, to_quantity(kind=key, value=value)


def get_month_range(year, month):
    # type: (int, int)->Tuple[str, str]
    """
    Return the first and last day of a month as ``YYYY-MM-DD`` strings.

    The values are zero-padded so they match the ``'%Y-%m-%d'`` date keys
    used by the daily statistics in this module.

    :param year: year, anything ``int()`` accepts
    :param month: month 1-12, anything ``int()`` accepts
    :return: ``(first_day, last_day)``
    """
    year, month = int(year), int(month)
    _, last_day = monthrange(year, month)
    date_fmt = '{year:04d}-{month:02d}-{day:02d}'
    return (
        date_fmt.format(year=year, month=month, day=1),
        date_fmt.format(year=year, month=month, day=last_day)
    )


##
## Majorly for views
##
def translate_consumption(mapping, hides=None):
    """
    Convert a ``{kind: amount}`` mapping into a list of consumption dicts.

    Only kinds present in DEALER_CONSUMPTION_AGG_KIND_TRANSLATION and not in
    ``hides`` are emitted. A falsy ``mapping`` yields a zero entry for every
    visible kind.

    :param mapping: ``{kind: amount}`` or a falsy value
    :param hides: kinds to leave out; ``None`` hides nothing
    :return: list of dicts built by ``consumption_map``
    """
    show_kinds = set(DEALER_CONSUMPTION_AGG_KIND_TRANSLATION.keys()) - set(hides or ())

    if not mapping:
        return [consumption_map(kind, 0) for kind in show_kinds]

    return [consumption_map(kind, amount)
            for kind, amount in mapping.iteritems()
            if kind in show_kinds]


def translate_consumption_stats(items, source=None, hides=None):
    # type:(Iterator, Optional[str], Optional[list])->list
    """
    Render ``(kind, value)`` pairs as ``'<name> <value> <unit>'`` strings.

    Kinds outside ``DEALER_CONSUMPTION_AGG_KIND.choices()`` are logged as
    errors and skipped. Kinds listed in ``hides`` are skipped silently
    (previously ``hides`` was accepted but had no effect).

    :param items: iterable of ``(kind, value)`` pairs
    :param source: when given, only this kind is rendered
    :param hides: kinds to leave out; ``None`` hides nothing
    :return: list of formatted strings
    """
    valid_kinds = DEALER_CONSUMPTION_AGG_KIND.choices()
    show_kinds = set(valid_kinds) - set(hides or ())

    rv = []
    for item_source, item_value in items:
        if item_source not in valid_kinds:
            logger.error('invalid source <{}>'.format(item_source))
            continue

        if item_source not in show_kinds:
            continue

        if not source or source == item_source:
            rv.append('{kind} {value} {unit}'.format(
                kind=DEALER_CONSUMPTION_AGG_KIND_TRANSLATION[item_source],
                value=item_value,
                unit=DEALER_CONSUMPTION_AGG_KIND_UNIT[item_source]
            ))
    return rv


def daily_stat_sum_on_field(income_or_consumption, field_name, stats, initial_type):
    # type: (str, str, Iterable, Union[type(RMB), type(Quantity)])->Union[RMB, Quantity]
    """
    Sum ``stat['daily'][income_or_consumption][field_name]`` over ``stats``.

    Missing paths count as ``initial_type.initial()``, which is also the
    start value of the sum.
    """
    initial = initial_type.initial()
    path = ['daily', income_or_consumption, field_name]
    return sum(
        (initial_type(get_in(keys=path, coll=stat, default=initial)) for stat in stats),
        initial
    )


def daily_stat_income_sum_on_field(field_name, stats):
    # type: (str, Iterable)->RMB
    """Sum ``daily.income.<field_name>`` over ``stats`` as RMB."""
    return daily_stat_sum_on_field('income', field_name=field_name, stats=stats, initial_type=RMB)


def daily_stat_consumption_sum_on_field(field_name, stats):
    # type: (str, Iterable)->Quantity
    """Sum ``daily.consumption.<field_name>`` over ``stats`` as Quantity."""
    return daily_stat_sum_on_field('consumption', field_name=field_name, stats=stats, initial_type=Quantity)


def transform(keys, sub_stats, translate, sum_fn):
    # type:(List[str], List[dict], Callable[[dict], list], Callable[[Iterable], int])-> List[dict]
    """
    Converter: extract the sub-dict at ``keys`` from every stat, merge them
    with ``sum_fn`` and pass the merged dict to ``translate``.

    get_in(list, iter, default_value)
        ex:
            demo = {
                "chain": {"wuhan": ["hs", "dh"]}
            }
            get_in(["chain", "wuhan", 1], demo) -> dh

    merge_with(func, iter)
        ex:
            demo1 = {"card": 10, "recharge": 20}
            demo2 = {"card": 1, "recharge": 2}
            merge_with(sum, [demo1, demo2]) -> {"card": 11, "recharge": 22}
    """
    return translate(merge_with(sum_fn, [get_in(keys, sub_stat, {}) for sub_stat in sub_stats]))


def cum_stats(keys, stats, translate, sum_fn=sum):
    # type:(List[str], Iterable, Callable[[dict], list], Callable[[Iterable], int])->Dict[str, list]
    """
    Group stat records by their ``groupId`` and sum each group.

    :param keys: path of the sub-dict to sum inside every record
    :param stats: stat records
    :param translate: converts the merged dict into the output value
    :param sum_fn: function used to add up the values of one kind
    :return: ``{str(groupId): translate(merged)}``
    """
    incomeMap = dict()
    groupStats = cytoolz.groupby("groupId", stats)
    for groupId, sub_stats in groupStats.items():
        incomeMap[str(groupId)] = transform(keys, sub_stats, translate, sum_fn)
    return incomeMap


def income_map(kind, amount):
    """Build the view dict for one income source."""
    return {'name': DEALER_INCOME_SOURCE_TRANSLATION[kind], 'value': RMB(amount), 'source': kind}


def translate_income(mapping):
    """Convert ``{source: amount}`` into view dicts, dropping unknown sources."""
    return [income_map(kind, amount) for kind, amount in mapping.iteritems() if
            kind in DEALER_INCOME_SOURCE_TRANSLATION]


def default_income_translation(kinds):
    """Return a zero-valued view dict for every source in ``kinds``."""
    return [income_map(kind, 0) for kind in kinds]


def consumption_map(kind, amount):
    """Build the view dict for one consumption kind."""
    return {
        'source': kind,
        'name': DEALER_CONSUMPTION_AGG_KIND_TRANSLATION[kind],
        'value': to_quantity(kind=kind, value=amount),
        'unit': DEALER_CONSUMPTION_AGG_KIND_UNIT[kind]
    }


class StatisticRecorder(object):
    """Feeds the processor's record into a single statistic model."""

    OBJECT_MODULE = None

    def __init__(self, processor, statisticModel):
        # type:(CentralDataProcessor, DailyStat) -> None
        self._processor = processor
        self._statisticModel = statisticModel

    @property
    def statisticModel(self):
        return self._statisticModel

    def work(self, check):
        # type:(bool) -> bool
        """
        Record the processor's record in the statistic model.

        When inserting, take care that the ``check`` test and the
        ``[record.id in origin]`` test do not conflict.

        :param check: whether to verify the record was already inserted
        :return: ``True`` when skipped as already recorded, otherwise the
                 result of ``update_statistic_data``
        """
        record = self._processor.record

        if check and self.statisticModel.check_already_record(record):
            logger.info("skip <{}> id <{}> for <{}>, already recorded!".format(record, record.id, self.statisticModel))
            return True

        return self.statisticModel.update_statistic_data(record)


class CentralDataProcessor(object):
    """
    Central data processor.

    Flow: the statistics are processed in the order
        device --> group --> dealer --> ...
    """

    def __init__(self, record, check=False, allowed=None):
        # type:(Union[ConsumeRecord, DealerIncomeProxy], bool, dict) -> None
        """
        :param record: the data to process
        :param check: whether to check that the data was already processed
        :param allowed: per role, whether the data should be recorded;
                        a falsy value enables device, group and dealer
        """
        # TODO zjl: validate record
        self._record = record
        self._check = check
        self._statisticList = list()

        # Load the data handlers
        allowed = allowed or dict.fromkeys(["device", "group", "dealer"], True)

        # Collect every statistic model that is enabled
        for _statistic in [DeviceDailyStat, GroupDailyStat, DealerDailyStat]:
            if _statistic.is_allowed(allowed):
                self._statisticList.append(_statistic)

    def process(self):
        """
        Record the statistic values for every enabled model.

        A failure in one model is logged with its traceback and does not
        stop the remaining models.

        :return: None
        """
        # TODO zjl: run asynchronously if possible
        # TODO zjl: handle the result of work(); the mechanism for handling
        #           abnormal orders is still to be done
        for _statistic in self._statisticList:
            try:
                StatisticRecorder(self, _statistic).work(self._check)
            except Exception as e:
                logger.exception(
                    "statistic <{}> handle record <{}> id <{}> error {}".format(
                        _statistic, self._record, self._record.id, e))

    @property
    def record(self):
        return self._record

    def __repr__(self):
        # __repr__ must return a string; returning the record object itself
        # made repr(processor) raise TypeError.
        return repr(self._record)