123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import logging
- from calendar import monthrange
- from collections import defaultdict
- from functools import partial
- import cytoolz
- from bson.decimal128 import Decimal128
- from bson.objectid import ObjectId
- from cytoolz.dicttoolz import get_in, merge_with
- from mongoengine.document import Document
- from typing import TYPE_CHECKING, Tuple, Optional, Dict, Iterable, Iterator, Union, Callable, List
- from apilib.monetary import RMB, VirtualCoin
- from apilib.quantity import Quantity
- from apps.web.constant import (
- from apps.web.dealer.define import DEALER_INCOME_SOURCE_TRANSLATION
- from apps.web.device.models import Group
- from apps.web.report.models import DealerDailyStat, GroupDailyStat, DeviceDailyStat, \
- DailyStat
- logger = logging.getLogger(__name__)
- from apps.web.dealer.proxy import DealerIncomeProxy
- from apps.web.user.models import ConsumeRecord
def upsert_stat(model, query, params, identifier = '', log_error = True):
    # type: (Document, dict, dict, Optional[str], Optional[bool])->bool
    """
    Run an upserting update against the collection behind ``model``.

    :param model: object exposing ``_get_collection()``; the update is sent to that collection
    :param query: filter selecting the document to update
    :param params: update document, passed through to the collection unchanged
    :param identifier: label written into the error log line to identify the record being handled
    :param log_error: when truthy, log an error if the write reports no affected document
    :return: True when the ``n`` entry of the write result is non-zero, otherwise False
    """
    collection = model._get_collection()
    outcome = collection.update(query, params, upsert = True)

    if outcome['n']:
        return True

    if log_error:
        logger.error(
            'session(%s) update failed, model=%r, query=%r, params=%r' % (identifier, model, query, params))
    return False
def record_consumption_stats(record, check = False, allowed = None):
    # type:(ConsumeRecord, bool, Optional[dict])->None
    """
    Add one consumption record to the daily/hourly statistics of its dealers, group and device.

    Note carried over from the original author: devices used to record coins once at start and
    once more at finish, which made the user's consumption effectively count twice.

    Every exception raised while recording is logged and swallowed; nothing is re-raised.

    :param record: ConsumeRecord to account for; ``id``, ``dateTimeAdded``, ``ownerId``,
                   ``groupId``, ``logicalCode`` and ``aggInfo`` are read from it
    :param check: when True, a model is skipped if one of its stat documents for that day already
                  lists ``record.id`` under ``origin.consumption``
    :param allowed: mapping whose truthy 'dealer' / 'group' / 'device' entries select the models
                    to update; all three are updated when it is empty or None
    :return: None
    """
    if not allowed:
        allowed = {'dealer': True, 'device': True, 'group': True}

    try:
        _upsert_stat = partial(upsert_stat, identifier = str(record.id))

        dt = record.dateTimeAdded
        hour = dt.hour
        report_daily_date = dt.strftime('%Y-%m-%d')

        def build_daily_query(extra):
            # type: (dict)->dict
            # Filter for the stat document of the record's day; ``extra`` is never mutated.
            query = {'date': report_daily_date}
            query.update(extra)
            return query

        def already_handled(model, extra):
            # type: (Document, dict)->bool
            # True when a matching stat document already references this record.
            query = build_daily_query(extra)
            query['origin.consumption'] = record.id
            return model.get_collection().find(query).count() > 0

        def build_daily_update():
            # type: ()->dict
            # A fresh update document is built for every upsert call.
            increments = {}
            for kind, amount in record.aggInfo.iteritems():
                value = Decimal128(str(amount))
                increments['daily.consumption.{kind}'.format(kind = kind)] = value
                increments['hourly.{hour}.consumption.{kind}'.format(hour = hour, kind = kind)] = value
            # also count how many consumption records went into the day
            increments['daily.totalConsumptionCount'] = 1
            return {
                '$inc': increments,
                '$addToSet': {'origin.consumption': record.id},
            }

        # The owner plus every partner of the group; resolved up front, as before, so a failing
        # group lookup aborts the whole recording regardless of ``allowed``.
        dealerIds = [ObjectId(record.ownerId)] + [
            ObjectId(_['id']) for _ in Group.get_group(record.groupId)['partnerDict'].values()]

        # dealer statistics
        if allowed.get('dealer'):
            if check and already_handled(DealerDailyStat, {'dealerId': {'$in': dealerIds}}):
                logger.debug('skipping %r for DealerDailyStat, already recorded' % (record,))
            else:
                for dealerId in dealerIds:
                    _upsert_stat(DealerDailyStat, build_daily_query({'dealerId': dealerId}), build_daily_update())

        # group (address) statistics
        if allowed.get('group'):
            group_filter = {'groupId': ObjectId(record.groupId)}
            if check and already_handled(GroupDailyStat, group_filter):
                logger.debug('skipping %r for GroupDailyStat, already recorded' % (record,))
            else:
                _upsert_stat(GroupDailyStat, build_daily_query(group_filter), build_daily_update())

        # device statistics
        if allowed.get('device'):
            device_filter = {'logicalCode': record.logicalCode}
            if check and already_handled(DeviceDailyStat, device_filter):
                logger.debug('skipping %r for DeviceDailyStat, already recorded' % (record,))
            else:
                _upsert_stat(DeviceDailyStat, build_daily_query(device_filter), build_daily_update())

    except Exception as e:
        logger.error('record_consumption_stats failed, record=%r' % (record,))
        logger.exception(e)
def update_consumption_states(record, refund_coin, allowed = None):
    # type: (ConsumeRecord, VirtualCoin, Optional[dict]) -> None
    """
    Placeholder with no behaviour: the arguments are accepted and ignored.

    :param record: unused
    :param refund_coin: unused
    :param allowed: unused
    :return: None
    """
    return None
- # 这个只在脚本里面有使用 应该是 历史数据的整合
def record_income_stats(proxy, check = False, allowed = None):
    # type:(DealerIncomeProxy, bool, Optional[dict])->None
    """
    Add one income record to the daily/hourly income statistics of its dealers, group and device.

    Dealer documents are incremented by each dealer's own share (``proxy.actualAmountMap``),
    while the group and device documents are incremented by ``proxy.totalAmount``.

    Every exception raised while recording (including a wrong ``proxy`` type) is logged and
    swallowed; nothing is re-raised.

    :param proxy: DealerIncomeProxy describing how the income was split
    :param check: when True, a model is skipped if one of its stat documents for that day already
                  lists ``proxy.id`` under ``origin.income``
    :param allowed: mapping whose truthy 'dealer' / 'group' / 'device' entries select the models
                    to update; all three are updated when it is empty or None
    :return: None
    """
    if not allowed:
        allowed = {'dealer': True, 'device': True, 'group': True}

    try:
        # Explicit check instead of ``assert`` so validation survives ``python -O``.
        if not isinstance(proxy, DealerIncomeProxy):
            raise TypeError('proxy has to be a DealerIncomeProxy')

        _upsert_stat = partial(upsert_stat, identifier = str(proxy.id))

        dt = proxy.dateTimeAdded
        hour = dt.hour
        report_daily_date = dt.strftime('%Y-%m-%d')

        def build_daily_query(extra):
            # type: (dict)->dict
            # Filter for the stat document of the proxy's day; ``extra`` is never mutated.
            query = {'date': report_daily_date}
            query.update(extra)
            return query

        def already_handled(model, extra):
            # type: (Document, dict)->bool
            # True when a matching stat document already references this proxy.
            query = build_daily_query(extra)
            query['origin.income'] = proxy.id
            return model.get_collection().find(query).count() > 0

        def build_daily_update(amount):
            return {
                '$inc': {
                    'daily.income.{source}'.format(source = proxy.source): amount,
                    'hourly.{hour}.income.{source}'.format(hour = hour, source = proxy.source): amount,
                    'daily.totalIncome': amount,
                    'daily.totalIncomeCount': 1
                },
                '$addToSet': {'origin.income': proxy.id}
            }

        # dealer
        if allowed.get('dealer'):
            if check and already_handled(DealerDailyStat, {'dealerId': {'$in': proxy.dealerIds}}):
                logger.debug('skipping %r for DealerDailyStat, already recorded' % (proxy,))
            else:
                for dealerId in proxy.dealerIds:
                    _upsert_stat(
                        DealerDailyStat,
                        build_daily_query({'dealerId': dealerId}),
                        build_daily_update(proxy.actualAmountMap[str(dealerId)]))

        # group
        if allowed.get('group'):
            group_filter = {'groupId': proxy.groupId}
            if check and already_handled(GroupDailyStat, group_filter):
                logger.debug('skipping %r for GroupDailyStat, already recorded' % (proxy,))
            else:
                _upsert_stat(GroupDailyStat, build_daily_query(group_filter), build_daily_update(proxy.totalAmount))

        # device
        if allowed.get('device'):
            device_filter = {'logicalCode': proxy.logicalCode}
            if check and already_handled(DeviceDailyStat, device_filter):
                logger.debug('skipping %r for DeviceDailyStat, already recorded' % (proxy,))
            else:
                _upsert_stat(DeviceDailyStat, build_daily_query(device_filter), build_daily_update(proxy.totalAmount))

    except Exception as e:
        logger.error('record_income_stats failed, proxy=%r' % (proxy,))
        logger.exception(e)
def update_income_stats(proxy, refund_partition, refund_fee, allowed = None):
    # type:(DealerIncomeProxy, Iterable[dict], RMB, Optional[dict])->None
    """
    Subtract a cash refund from income statistics that were already recorded for ``proxy``.

    Used when cash is refunded. A model is only touched when one of its stat documents for the
    proxy's day already lists ``proxy.id`` under ``origin.income``; otherwise it is skipped with a
    debug log. Every exception is logged and swallowed; nothing is re-raised.

    :param proxy: DealerIncomeProxy whose income is being reduced
    :param refund_partition: iterable of dicts with 'role', 'id' and 'amount'; entries whose role is
                             ``PARTITION_ROLE.AGENT`` are skipped, each of the others reduces the
                             dealer document of ``item['id']`` by ``item['amount']``
    :param refund_fee: total refunded amount, subtracted from the group and device documents
    :param allowed: mapping whose truthy 'dealer' / 'group' / 'device' entries select the models
                    to update; all three are updated when it is empty or None
    :return: None
    """
    if not allowed:
        allowed = {'dealer': True, 'device': True, 'group': True}

    try:
        # Explicit check instead of ``assert`` so validation survives ``python -O``.
        if not isinstance(proxy, DealerIncomeProxy):
            raise TypeError('proxy has to be a DealerIncomeProxy')

        _upsert_stat = partial(upsert_stat, identifier = str(proxy.id))

        dt = proxy.dateTimeAdded
        hour = dt.hour
        report_daily_date = dt.strftime('%Y-%m-%d')

        def build_daily_query(extra):
            # type: (dict)->dict
            # Filter for the stat document of the proxy's day; ``extra`` is never mutated.
            query = {'date': report_daily_date}
            query.update(extra)
            return query

        def already_handled(model, extra):
            # type: (Document, dict)->bool
            # True when a matching stat document already references this proxy.
            query = build_daily_query(extra)
            query['origin.income'] = proxy.id
            return model.get_collection().find(query).count() > 0

        def build_daily_update(amount):
            # The refund is applied as a negative increment on the same counters income used.
            refund = (-amount).mongo_amount
            return {
                '$inc': {
                    'daily.income.{source}'.format(source = proxy.source): refund,
                    'hourly.{hour}.income.{source}'.format(hour = hour, source = proxy.source): refund,
                    'daily.totalIncome': refund
                }
            }

        # dealer
        if allowed.get('dealer'):
            if already_handled(DealerDailyStat, {'dealerId': {'$in': proxy.dealerIds}}):
                for item in refund_partition:
                    if item['role'] == PARTITION_ROLE.AGENT:
                        continue
                    _upsert_stat(
                        DealerDailyStat,
                        build_daily_query({'dealerId': ObjectId(item['id'])}),
                        build_daily_update(item['amount']))
            else:
                logger.debug('skipping for DealerDailyStat, not already recorded %s' % (proxy,))

        # group
        if allowed.get('group'):
            group_filter = {'groupId': proxy.groupId}
            if already_handled(GroupDailyStat, group_filter):
                _upsert_stat(GroupDailyStat, build_daily_query(group_filter), build_daily_update(refund_fee))
            else:
                logger.debug('skipping for GroupDailyStat, not already recorded %s' % (proxy,))

        # device
        if allowed.get('device'):
            device_filter = {'logicalCode': proxy.logicalCode}
            if already_handled(DeviceDailyStat, device_filter):
                _upsert_stat(DeviceDailyStat, build_daily_query(device_filter), build_daily_update(refund_fee))
            else:
                logger.debug('skipping for DeviceDailyStat, not already recorded %s' % (proxy,))

    except Exception as e:
        logger.error('update_income_stats failed, proxy=%r' % (proxy,))
        logger.exception(e)
- # 同 get_consumption_stats
- def consumption_unit_precision(kind):
- # type:(str)->str
def to_quantity(kind, value):
    # type: (str, Union[Decimal128, float, int, Quantity])->Quantity
    """
    Wrap ``value`` in a Quantity whose ``places`` is whatever ``consumption_unit_precision``
    returns for ``kind``.

    :param kind: consumption kind used to look up the precision
    :param value: raw amount to wrap
    :return: the resulting Quantity
    """
    places = consumption_unit_precision(kind)
    return Quantity(value, places = places)
def transfer_consumption_item(item):
    # type: (tuple)->Tuple[str, Quantity]
    """
    Convert a ``(kind, raw_value)`` pair into ``(kind, Quantity)``.

    :param item: two-element pair of consumption kind and raw amount
    :return: the same kind paired with the amount converted by ``to_quantity``
    """
    kind, raw_value = item
    converted = to_quantity(kind, raw_value)
    return kind, converted
def get_month_range(year, month):
    # type: (int, int)->Tuple[str, str]
    """
    Return the first and last day of a month as ``'{year}-{month}-{day}'`` strings.

    ``year`` and ``month`` are inserted into the strings exactly as given (no zero padding is
    applied, and the day is rendered as a plain integer), e.g. ``(2020, 2)`` gives
    ``('2020-2-1', '2020-2-29')``.

    :param year: year, as an int or something ``int()`` accepts
    :param month: month number, as an int or something ``int()`` accepts
    :return: ``(start, end)`` date strings
    """
    last_day = monthrange(int(year), int(month))[1]

    def render(day):
        return '{year}-{month}-{day}'.format(year = year, month = month, day = day)

    return render(1), render(last_day)
- ##
- ## Majorly for views
- ##
def translate_consumption(mapping, hides = ()):
    """
    Turn a ``{kind: amount}`` mapping into a list of ``consumption_map`` entries.

    :param mapping: consumption amounts keyed by kind; when empty or None, one zero-valued entry
                    is produced for every shown kind instead
    :param hides: kinds to leave out of the result
    :return: list of ``consumption_map(kind, amount)`` results
    """
    # NOTE(review): ``show_kinds`` was read here without ever being assigned (UnboundLocalError on
    # every call). It is now initialised the same way translate_consumption_stats does; confirm
    # that DEALER_CONSUMPTION_AGG_KIND.choices() is the intended starting set.
    show_kinds = set(DEALER_CONSUMPTION_AGG_KIND.choices()) - set(hides)

    if not mapping:
        return [consumption_map(kind, 0) for kind in show_kinds]

    return [consumption_map(kind, amount) for kind, amount in mapping.iteritems() if kind in show_kinds]
def translate_consumption_stats(items, source = None, hides = []):
    # type:(Iterator, Optional[None, str], Optional(list))->list
    """
    Build one display string per ``(source, value)`` pair in ``items``.

    Pairs whose source is not in ``DEALER_CONSUMPTION_AGG_KIND.choices()`` are logged as errors
    and skipped. When ``source`` is falsy every remaining pair is rendered; otherwise only the
    pairs whose source equals ``source``.

    :param items: iterable of ``(item_source, item_value)`` pairs
    :param source: optional single source to restrict the output to
    :param hides: kinds removed from ``show_kinds`` (see the review note below)
    :return: list of formatted strings
    """
    # NOTE(review): show_kinds is computed from ``hides`` but never read in the code visible here.
    show_kinds = set(DEALER_CONSUMPTION_AGG_KIND.choices())
    show_kinds = show_kinds - set(hides)
    rv = []
    for item_source, item_value in items:
        if item_source not in DEALER_CONSUMPTION_AGG_KIND.choices():
            logger.error('invalid source <{}>'.format(item_source))
            continue
        if not source or source == item_source:
            # NOTE(review): the template names {kind} and {unit} but only ``value`` is supplied,
            # so as written this raises KeyError; the kind/unit arguments look to be missing.
            rv.append('{kind} {value} {unit}'.format(
                value = item_value,
            ))
    return rv
def daily_stat_sum_on_field(income_or_consumption, field_name, stats, initial_type):
    # type: (str, str, Iterable, Union[type(RMB), type(Quantity)])->Union[RMB, Quantity]
    """
    Sum ``stat['daily'][income_or_consumption][field_name]`` over all ``stats``.

    Each looked-up value is wrapped in ``initial_type`` before being added; a stat lacking the
    path contributes ``initial_type.initial()``.

    :param income_or_consumption: second path segment under ``'daily'``
    :param field_name: third path segment under ``'daily'``
    :param stats: iterable of stat mappings
    :param initial_type: value type providing ``initial()`` and supporting ``+``
    :return: accumulated total, starting from ``initial_type.initial()``
    """
    zero = initial_type.initial()
    path = ['daily', income_or_consumption, field_name]

    total = zero
    for stat in stats:
        total = total + initial_type(get_in(path, stat, zero))
    return total
def daily_stat_income_sum_on_field(field_name, stats):
    # type: (str, Iterable)->RMB
    """
    Sum the ``daily.income.<field_name>`` values of ``stats`` as RMB.

    :param field_name: income field to total
    :param stats: iterable of stat mappings
    :return: RMB total
    """
    return daily_stat_sum_on_field('income', field_name, stats, RMB)
def daily_stat_consumption_sum_on_field(field_name, stats):
    # type: (str, Iterable)->Quantity
    """
    Sum the ``daily.consumption.<field_name>`` values of ``stats`` as a Quantity.

    :param field_name: consumption field to total
    :param stats: iterable of stat mappings
    :return: Quantity total
    """
    return daily_stat_sum_on_field('consumption', field_name, stats, Quantity)
def transform(keys, sub_stats, translate, sum_fn):
    # type:(List[str], List[dict], Callable[[dict], list], Callable[[Iterable], int])-> List[dict]
    """
    Converter: pull the sub-mapping at ``keys`` out of every stat, merge them key by key with
    ``sum_fn`` and hand the merged mapping to ``translate``.

    get_in(keys, coll, default) example:
        demo = {"chain": {"wuhan": ["hs", "dh"]}}
        get_in(["chain", "wuhan", 1], demo) -> "dh"

    merge_with(func, dicts) example:
        demo1 = {"card": 10, "recharge": 20}
        demo2 = {"card": 1, "recharge": 2}
        merge_with(sum, [demo1, demo2]) -> {"card": 11, "recharge": 22}

    :param keys: path of the sub-mapping inside each stat
    :param sub_stats: stats to combine; a stat lacking the path contributes ``{}``
    :param translate: function applied to the merged mapping
    :param sum_fn: function combining the list of values collected for one key
    :return: whatever ``translate`` returns
    """
    extracted = [get_in(keys, entry, {}) for entry in sub_stats]
    merged = merge_with(sum_fn, extracted)
    return translate(merged)
def cum_stats(keys, stats, translate, sum_fn = sum):
    # type:(List[str], Iterable, Callable[[dict]], Callable[[Iterable], int])->Dict[str, list]
    """
    Group the stat records by ``groupId`` and total each group with ``transform``.

    :param keys: path of the sub-mapping to total inside each stat
    :param stats: stat records
    :param translate: value conversion function applied to each group's merged mapping
    :param sum_fn: summing function used when merging
    :return: dict mapping ``str(groupId)`` to that group's translated totals
    """
    grouped = cytoolz.groupby("groupId", stats)
    return {
        str(group_id): transform(keys, members, translate, sum_fn)
        for group_id, members in grouped.items()
    }
def income_map(kind, amount):
    """
    Describe one income source as a dict with its translated name, RMB value and source key.

    :param kind: income source key; must be present in DEALER_INCOME_SOURCE_TRANSLATION
    :param amount: raw amount, wrapped in RMB
    :return: ``{'name': ..., 'value': RMB, 'source': kind}``
    """
    label = DEALER_INCOME_SOURCE_TRANSLATION[kind]
    return dict(name = label, value = RMB(amount), source = kind)
- def translate_income(mapping):
- return [income_map(kind, amount) for kind, amount in mapping.iteritems() if
def default_income_translation(kinds):
    """
    Build a zero-valued ``income_map`` entry for every kind.

    :param kinds: iterable of income source keys
    :return: list of ``income_map(kind, 0)`` results, in iteration order
    """
    entries = []
    for kind in kinds:
        entries.append(income_map(kind, 0))
    return entries
def consumption_map(kind, amount):
    """
    Describe one consumption kind as a dict with its source key and Quantity value.

    :param kind: consumption kind
    :param amount: raw amount, converted with ``to_quantity``
    :return: ``{'source': kind, 'value': Quantity}``
    """
    quantity = to_quantity(kind, amount)
    return {'source': kind, 'value': quantity}
class StatisticRecorder(object):
    """Applies the record held by a processor to a single statistic model."""

    def __init__(self, processor, statisticModel):
        # type:(CentralDataProcessor, DailyStat) -> None
        """
        :param processor: object exposing the record to handle as ``record``
        :param statisticModel: model providing ``check_already_record`` and ``update_statistic_data``
        """
        self._statisticModel = statisticModel
        self._processor = processor

    @property
    def statisticModel(self):
        """The statistic model this recorder writes to."""
        return self._statisticModel

    def work(self, check):
        # type:(bool) -> bool
        """
        Record the processor's record on the statistic model.

        When inserting data, take care that the ``check`` test and the model's own
        [record.id in origin] test do not conflict.

        :param check: whether to first verify that the record has already been inserted
        :return: True when the record was skipped as already recorded, otherwise the result of
                 the model's ``update_statistic_data``
        """
        record = self._processor.record
        model = self.statisticModel

        if not check or not model.check_already_record(record):
            return model.update_statistic_data(record)

        logger.info("skip <{}> id <{}> for <{}>, already recorded!".format(record, record.id, model))
        return True
class CentralDataProcessor(object):
    """
    Central data processor.

    Statistic models are always handled in this order:
        device --> group --> dealer --> ...
    """

    def __init__(self, record, check = False, allowed = None):
        # type:(Union[ConsumeRecord, DealerIncomeProxy], bool, Optional[dict]) -> None
        """
        :param record: the data to process
        :param check: whether to verify that the data has already been processed
        :param allowed: which roles should have this data recorded; defaults to device, group
                        and dealer all enabled
        """
        # TODO zjl validate record
        self._record = record
        self._check = check

        allowed = allowed or dict.fromkeys(["device", "group", "dealer"], True)

        # one entry per statistic model that accepts this ``allowed`` configuration
        self._statisticList = [
            _statistic
            for _statistic in (DeviceDailyStat, GroupDailyStat, DealerDailyStat)
            if _statistic.is_allowed(allowed)
        ]

    def process(self):
        """
        Record the statistics on every enabled model.

        A failure on one model is logged with its traceback and does not stop the remaining ones.

        :return: None
        """
        # TODO zjl run asynchronously if possible
        # TODO zjl act on the per-model result; handling of failed orders is still to be done
        for _statistic in self._statisticList:
            try:
                StatisticRecorder(self, _statistic).work(self._check)
            except Exception as e:
                logger.exception("statistic <{}> handle record <{}> id <{}> error {}".format(
                    _statistic, self._record, self._record.id, e))

    @property
    def record(self):
        """The record being processed."""
        return self._record

    def __repr__(self):
        # __repr__ must return a string; returning the record object itself made repr() raise
        # TypeError.
        return '<{} record={!r} check={!r}>'.format(type(self).__name__, self._record, self._check)