# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""Report and statistics documents stored under the ``report`` database alias."""

import datetime
import logging

from bson.decimal128 import Decimal128
from bson.objectid import ObjectId
from cytoolz import get_in
from mongoengine import DictField, StringField, DateTimeField, ObjectIdField, ListField
from typing import Union, TYPE_CHECKING, Optional

from apilib.monetary import RMB
from apilib.quantity import Quantity
from apps.web.constant import Const, DEALER_CONSUMPTION_AGG_KIND, MONTH_DATE_KEY
from apps.web.core.accounting import Accounting
from apps.web.core.db import Searchable
from apps.web.dealer.define import DEALER_INCOME_SOURCE

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from apps.web.user.models import ConsumeRecord
    from apps.web.dealer.proxy import DealerIncomeProxy


class DevReport(Searchable):
    """Report document keyed by device number, report type and date."""

    rpt = DictField(verbose_name="报表数据")
    devNo = StringField(verbose_name="设备编号", default="")
    type = StringField(verbose_name="报表类型", default="")
    date = StringField(verbose_name="日期", default="")
    dateTimeAdded = DateTimeField(verbose_name="时间", default=datetime.datetime.now)

    _shard_key = ('devNo', 'type', 'date')

    _origin_meta = {
        'collection': 'DevReport',
        'db_alias': 'report'
    }

    meta = _origin_meta

    @staticmethod
    def get_rpt(devNos, startDate, endDate):
        """
        Sum ``lineCoins`` and ``count`` per device over a date range.

        Dates are compared as strings against today's ``%Y-%m-%d`` value.
        When today falls inside the range, today's figures are taken from
        ``Accounting.getDevIncome`` and any stored report dated today is
        skipped so it is not counted twice.

        :param devNos: device numbers to report on
        :param startDate: first day of the range (``%Y-%m-%d`` string)
        :param endDate: last day of the range (``%Y-%m-%d`` string)
        :return: dict mapping each devNo to ``{'lineCoins': ..., 'count': ...}``;
                 for a range that is exactly today, the raw result of
                 ``Accounting.getDevIncome`` is returned as is
        """
        todayTime = datetime.datetime.now()
        today = todayTime.strftime("%Y-%m-%d")

        todayRpt = {}
        if startDate <= today <= endDate:
            todayRpt = Accounting.getDevIncome(devNos, todayTime)
            # A range that is only today is answered by today's figures alone.
            if startDate == endDate:
                return todayRpt

        devDict = {}
        filters = {
            'devNo': {'$in': devNos},
            'type': 'day'
        }

        from apps.web.common.proxy import DevReportModelProxy
        rpts = DevReportModelProxy.get_data_list(startTime=startDate, endTime=endDate, **filters)
        for item in rpts:  # type: DevReport
            if item.date == today:
                continue
            lineCoins = item.rpt.get('lineCoins', 0)
            count = item.rpt.get('count', 0)
            if item.devNo in devDict:
                devDict[item.devNo]['lineCoins'] += lineCoins
                devDict[item.devNo]['count'] += count
            else:
                devDict[item.devNo] = {'lineCoins': lineCoins, 'count': count}

        result = {}
        for devNo in devNos:
            if devNo in devDict:
                result[devNo] = devDict[devNo]
            else:
                result[devNo] = {'lineCoins': 0, 'count': 0}
            if devNo in todayRpt:
                result[devNo]['lineCoins'] += todayRpt[devNo]['lineCoins']
                result[devNo]['count'] += todayRpt[devNo]['count']
        return result


class GroupReport(Searchable):
    """Report document keyed by group id, report type and date."""

    rpt = DictField(verbose_name="报表数据")
    groupId = StringField(verbose_name="地址ID")
    date = StringField(verbose_name="日期", default="")
    type = StringField(verbose_name="报表类型", default="")
    dateTimeAdded = DateTimeField(verbose_name="时间", default=datetime.datetime.now)

    _shard_key = ('groupId', 'type', 'date')

    _origin_meta = {
        'collection': 'GroupReport',
        'db_alias': 'report'
    }

    meta = _origin_meta

    @staticmethod
    def get_rpt(groupIds, startDate, endDate):
        """
        Sum ``lineCoins`` and ``count`` per group over a date range.

        Dates are compared as strings against today's ``%Y-%m-%d`` value.
        When today falls inside the range, today's figures are taken from
        ``Accounting.getGroupIncome`` and any stored report dated today is
        skipped so it is not counted twice.

        :param groupIds: group ids to report on
        :param startDate: first day of the range (``%Y-%m-%d`` string)
        :param endDate: last day of the range (``%Y-%m-%d`` string)
        :return: dict mapping each groupId to ``{'lineCoins': ..., 'count': ...}``;
                 for a range that is exactly today, the raw result of
                 ``Accounting.getGroupIncome`` is returned as is
        """
        todayTime = datetime.datetime.now()
        today = todayTime.strftime("%Y-%m-%d")

        todayRpt = {}
        if startDate <= today <= endDate:
            todayRpt = Accounting.getGroupIncome(groupIds, todayTime)
            # A range that is only today is answered by today's figures alone.
            if startDate == endDate:
                return todayRpt

        groupDict = {}
        filters = {
            'groupId': {'$in': groupIds},
            'type': 'day'
        }

        from apps.web.common.proxy import GroupReportModelProxy
        rpts = GroupReportModelProxy.get_data_list(startTime=startDate, endTime=endDate, **filters)
        for item in rpts:  # type: GroupReport
            if item.date == today:
                continue
            totals = groupDict.get(item.groupId, dict())
            lineCoins = totals.get('lineCoins', 0)
            count = totals.get('count', 0)
            lineCoins += item.rpt.get('lineCoins', 0)
            count += item.rpt.get('count', 0)
            totals.update({'lineCoins': lineCoins, 'count': count})
            groupDict[item.groupId] = totals

        result = {}
        for groupId in groupIds:
            if groupId in groupDict:
                result[groupId] = groupDict[groupId]
            else:
                result[groupId] = {'lineCoins': 0, 'count': 0}
            if groupId in todayRpt:
                result[groupId]['lineCoins'] += todayRpt[groupId]['lineCoins']
                result[groupId]['count'] += todayRpt[groupId]['count']
        return result


class DealerReport(Searchable):
    """Report document keyed by owner id, report type and date."""

    rpt = DictField(verbose_name="报表数据")
    ownerId = StringField(verbose_name="用户ID")
    date = StringField(verbose_name="日期", default="")
    type = StringField(verbose_name="报表类型", default="")
    dateTimeAdded = DateTimeField(verbose_name="时间", default=datetime.datetime.now)

    _shard_key = ('ownerId', 'type', 'date')

    _origin_meta = {
        'collection': 'DealerReport',
        'db_alias': 'report'
    }

    meta = _origin_meta

    @staticmethod
    def prepare_group_pipeline(featureMatch=None):
        # type:(dict) -> list
        """
        Build the leading stages of a MongoDB aggregation pipeline.

        :param featureMatch: optional ``$match`` condition, e.g. {"dealerId": ObjectId()}
        :return: pipeline (list of stages) exposing ``year``, ``month`` and ``rpt``
        """
        pipeline = list()

        if featureMatch:
            pipeline.append({"$match": featureMatch})

        # Convert the string ``date`` field into a date value.
        dateConvertPro = {"$project": {
            "date": {"$dateFromString": {"dateString": "$date"}},
            "rpt": 1
        }}
        pipeline.append(dateConvertPro)

        # Extract year and month from the date in preparation for grouping.
        dateUnpackPro = {"$project": {
            "month": {"$month": "$date"},
            "year": {"$year": "$date"},
            "rpt": 1
        }}
        pipeline.append(dateUnpackPro)

        return pipeline

    @staticmethod
    def month_group_key():
        """Return the ``$group`` ``_id`` spec that groups by year and month."""
        groupKey = {
            "_id": {"year": "$year", "month": "$month"}
        }
        return groupKey

    @staticmethod
    def get_rpts(ownerId, startDate, endDate):
        """
        List per-day ``count`` / ``lineCoins`` entries for one owner.

        When today falls inside the range, today's entry comes from
        ``Accounting.getOwnerIncome`` (with a ``date`` key added) and is placed
        first; stored reports dated today are skipped.

        :param ownerId: owner whose reports are listed
        :param startDate: first day of the range (``%Y-%m-%d`` string)
        :param endDate: last day of the range (``%Y-%m-%d`` string)
        :return: list of dicts, one per day
        """
        todayTime = datetime.datetime.now()
        today = todayTime.strftime("%Y-%m-%d")

        result = []
        if startDate <= today <= endDate:
            todayRpt = Accounting.getOwnerIncome(ownerId, todayTime)
            todayRpt.update({'date': today})
            result.append(todayRpt)
            # Kept inside this branch: ``todayRpt`` only exists when today is
            # in range, so a single historical day must fall through to the
            # stored reports instead of raising NameError.
            if startDate == endDate:
                return [todayRpt]

        filters = {
            'ownerId': ownerId,
            'type': 'day'
        }

        from apps.web.common.proxy import DealerReportModelProxy
        rpts = DealerReportModelProxy.get_data_list(startTime=startDate, endTime=endDate, **filters)
        for item in rpts:  # type: DealerReport
            if item.date == today:
                continue
            result.append({
                'date': item.date,
                'count': item.rpt.get('count', 0),
                'lineCoins': item.rpt.get('lineCoins', 0)
            })
        return result


#: #####
#: stats
#: #####

###: default dicts

# NOTE(review): the dicts below are passed directly as ``DictField`` defaults,
# and every key of the hour/minute/month maps refers to the same
# ``default_agg_map`` object. Whether mongoengine copies these deeply enough to
# keep documents independent should be confirmed.

# `default_agg_map` example
# {'totalIncome': 100
#  'income': { 'recharge': 50, 'ad': 50 },
#  'consumption': { 'elec': 10 }
# }
default_agg_map = {
    'totalIncome': RMB('0.0').mongo_amount,
    'income': {k: RMB('0.0').mongo_amount for k in DEALER_INCOME_SOURCE.choices()},
    'consumption': {k: Quantity('0.0').mongo_amount for k in DEALER_CONSUMPTION_AGG_KIND.choices()}
}

# `default_hour_stats` example
# {
#   '1':
#       {
#           'totalIncome': 100
#           'income': { 'recharge': 50, 'ad': 50 },
#           'consumption': { 'elec': 10 }
#       }
#   ...
# }
#
default_hour_stats = {str(hour): default_agg_map for hour in range(24)}

# `default_minute_stats` example
# { '1:10':
#       {
#           'totalIncome': 100
#           'income': { 'recharge': 50, 'ad': 50 },
#           'consumption': { 'elec': 10 }
#       }
#   ...
# }
default_minute_stats = {'%d:%d' % (hour, minute): default_agg_map
                        for hour in range(24) for minute in range(60)}

# `default_month_stats` example
# {'daily.5'
#       {
#           'totalIncome': 100
#           'income': { 'recharge': 50, 'ad': 50 },
#           'consumption': { 'elec': 10 }
#       }
#   ...
# }
default_month_stats = {'daily.%d' % d: default_agg_map for d in range(1, 32)}


############
## Daily ###
############
class DailyStat(Searchable):
    """Abstract base for per-day statistic documents."""

    date = StringField()
    origin = DictField()
    daily = DictField(default=default_agg_map)
    hourly = DictField(default=default_hour_stats)
    minute = DictField(default=default_minute_stats)

    meta = {
        'abstract': True
    }

    def get(self, key, default=None):
        """Return ``self[key]``, or ``default`` when the lookup raises KeyError."""
        try:
            value = self.__getitem__(key)
        except KeyError:
            return default
        return value

    @classmethod
    def is_allowed(cls, allowed):
        """
        Tell whether statistics for this class are enabled in ``allowed``.

        The lookup key is the class name with ``DailyStat`` removed, lower-cased
        (``DeviceDailyStat`` -> ``device``).

        :param allowed: mapping from lookup key to a truthy/falsy flag
        :return: bool
        """
        supportKey = cls.__name__.replace(DailyStat.__name__, "").lower()
        return bool(allowed.get(supportKey, False))

    @classmethod
    def check_already_record(cls, record):
        # type:(Union[ConsumeRecord, DealerIncomeProxy]) -> bool
        """
        Check whether ``record`` has already been counted.

        The actual test is whether the record id is present under ``origin``.
        The index on these models is date + feature id, so implementations
        narrow the query by the index first and then test for the id in
        ``origin``.

        :param record: consume record or dealer income record
        :return: True when the record is already present
        """
        raise NotImplementedError("not realize!")

    @classmethod
    def update_statistic_data(cls, record):
        # type:(Union[ConsumeRecord, DealerIncomeProxy]) -> bool
        """
        Update the statistics from ``record``.

        What is counted depends on the kind of statistic:
        - device: consumption is counted as consumption, income as income;
        - group: consumption as consumption; income additionally stores each
          dealer's share at that time (one document holding a map);
        - dealer: consumption is stored once for every dealer; for income each
          dealer stores only its own share from the map (n dealers, n
          documents).

        :param record: consume record or dealer income record
        :return: whether the update succeeded
        """
        raise NotImplementedError("not realize")

    @classmethod
    def update_statistic(cls, query, updateOrInsertData):
        # type:(dict, dict) -> bool
        """
        Upsert the document matching ``query`` with ``updateOrInsertData``.

        Any exception raised by the update is logged and reported as a failed
        update rather than propagated.

        :param query: raw MongoDB filter selecting the document
        :param updateOrInsertData: keyword arguments forwarded to ``update``
        :return: the result of ``update``, or False when it raised
        """
        try:
            result = cls.objects.filter(__raw__=query).update(upsert=True, **updateOrInsertData)
        except Exception as e:
            # Best effort: keep the traceback in the log, report failure below.
            logger.exception(e)
            result = False

        if not result:
            logger.info('update failed, model={}, query={}, params={}'.format(cls, query, updateOrInsertData))
        return result

    @staticmethod
    def prepare_group_pipeline(featureMatch=None, project=('daily',), needTime=True, hasDay=False):
        # type:(dict, tuple, bool, bool) -> list
        """
        Build the leading stages of a MongoDB aggregation pipeline.

        :param featureMatch: optional ``$match`` condition, e.g. {"dealerId": ObjectId()}
        :param project: names of the fields to carry through the ``$project``
                        stages; a single name given as a plain string is
                        accepted as well
        :param needTime: when True, convert ``date`` and expose ``year`` and
                         ``month``; when False, only project the fields
        :param hasDay: when True (and ``needTime``), also expose ``day``
        :return: pipeline (list of stages)
        """
        # A bare string would otherwise be iterated character by character.
        if isinstance(project, (type(''), type(u''))):
            project = (project,)

        pipeline = list()

        if featureMatch:
            pipeline.append({"$match": featureMatch})

        _include = {field: 1 for field in project}

        if needTime:
            # Convert the string ``date`` field into a date value.
            dateConvertPro = {"$project": {
                "date": {"$dateFromString": {"dateString": "$date"}}
            }}
            dateConvertPro['$project'].update(_include)
            pipeline.append(dateConvertPro)

            # Extract year and month from the date in preparation for grouping.
            dateUnpackPro = {"$project": {
                "month": {"$month": "$date"},
                "year": {"$year": "$date"}
            }}
            dateUnpackPro['$project'].update(_include)
            if hasDay:
                dateUnpackPro['$project'].update({"day": {"$dayOfMonth": "$date"}})
            pipeline.append(dateUnpackPro)
        else:
            pipeline.append({'$project': _include})

        return pipeline

    @staticmethod
    def month_group_key():
        """Return the ``$group`` ``_id`` spec that groups by year and month."""
        groupKey = {
            "_id": {"year": "$year", "month": "$month"}
        }
        return groupKey

    @staticmethod
    def day_group_key():
        """Return the ``$group`` ``_id`` spec that groups by year, month and day."""
        groupKey = {
            "_id": {"year": "$year", "month": "$month", "day": "$day"}
        }
        return groupKey


class DeviceDailyStat(DailyStat):
    """Daily statistics keyed by device logical code and date."""

    logicalCode = StringField()

    _shard_key = ('logicalCode', 'dateTimeAdded')

    _origin_meta = {
        'collection': 'device_daily_stats',
        'indexes': [
            {'fields': ['logicalCode', 'date'], 'unique': True},
        ],
        'db_alias': 'report',
    }

    meta = _origin_meta

    def __repr__(self):
        return '<DeviceDailyStat logicalCode=%s date=%s>' % (self.logicalCode, self.date)

    @classmethod
    def check_already_record(cls, record):
        # type:(Union[ConsumeRecord, DealerIncomeProxy]) -> bool
        """Return True when ``record.id`` is already stored under ``origin`` for its device and day."""
        logicalCode = record.logicalCode
        query = {
            "logicalCode": logicalCode,
            "date": record.dateTimeAdded.strftime("%Y-%m-%d"),
            "origin.{}".format(record.statistic_type): record.id
        }
        return bool(cls.objects(__raw__=query).count())

    @classmethod
    def update_statistic_data(cls, record):
        # type:(Union[ConsumeRecord, DealerIncomeProxy]) -> bool
        """Upsert the device/day document with the update built by ``record.get_statistic_update_info()``."""
        logicalCode = record.logicalCode
        findQuery = {
            "logicalCode": logicalCode,
            "date": record.dateTimeAdded.strftime("%Y-%m-%d"),
        }
        updateOrInsertData = record.get_statistic_update_info()
        return cls.update_statistic(findQuery, updateOrInsertData)


class GroupDailyStat(DailyStat):
    """Daily statistics keyed by group id and date."""

    groupId = ObjectIdField()
    devices = ListField(ObjectIdField())

    _shard_key = ('groupId', 'dateTimeAdded')

    _origin_meta = {
        'collection': 'group_daily_stats',
        'indexes': [
            {'fields': ['groupId', 'date'], 'unique': True},
        ],
        'db_alias': 'report'
    }

    meta = _origin_meta

    def __repr__(self):
        return '<GroupDailyStat groupId=%s date=%s>' % (self.groupId, self.date)

    @classmethod
    def get_today_recharge_count(cls, groupIds):
        """Return the second value of ``sum_and_count("daily.totalIncomeCount")`` over today's documents of ``groupIds``."""
        today = datetime.datetime.now().strftime(Const.DATE_FMT)
        _count, total = cls.objects(groupId__in=groupIds, date=today).sum_and_count("daily.totalIncomeCount")
        return total

    @classmethod
    def check_already_record(cls, record):
        # type:(Union[ConsumeRecord, DealerIncomeProxy]) -> bool
        """Return True when ``record.id`` is already stored under ``origin`` for its group and day."""
        groupId = ObjectId(record.groupId)
        query = {
            "groupId": groupId,
            "date": record.dateTimeAdded.strftime("%Y-%m-%d"),
            "origin.{}".format(record.statistic_type): record.id
        }
        return bool(cls.objects(__raw__=query).count())

    @classmethod
    def update_statistic_data(cls, record):
        # type:(Union[ConsumeRecord, DealerIncomeProxy]) -> bool
        """
        Upsert the group/day document from ``record``.

        For income records the per-dealer amounts from
        ``record.actualAmountMap`` are additionally incremented under
        ``daily.incomeMap``.
        """
        groupId = ObjectId(record.groupId)
        findQuery = {
            "groupId": groupId,
            "date": record.dateTimeAdded.strftime("%Y-%m-%d"),
        }
        updateOrInsertData = record.get_statistic_update_info()

        # When the group statistic is of the income kind, the income share of
        # each dealer is recorded in a map. The stored shape is:
        #
        # {
        #     "daily": {
        #         "consumption": {},
        #         "income": {},
        #         "incomeMap": {
        #             "5fdadcc418e358a57aa8f59a": RMB(1),
        #             "5fdadcc418e358a57aa8f59a": RMB(3),
        #             ...
        #         }
        #     },
        # }
        if record.statistic_type == "income":
            for _dealerId, _income in record.actualAmountMap.items():
                updateOrInsertData['inc__daily__incomeMap__{}'.format(str(_dealerId))] = RMB(_income).mongo_amount

        return cls.update_statistic(findQuery, updateOrInsertData)


class DealerDailyStat(DailyStat):
    """Daily statistics keyed by dealer id and date."""

    dealerId = ObjectIdField()
    groups = ListField(ObjectIdField())

    _shard_key = ('dealerId', 'dateTimeAdded')

    _origin_meta = {
        'collection': 'dealer_daily_stats',
        'indexes': [
            {'fields': ['dealerId', 'date'], 'unique': True},
        ],
        'db_alias': 'report'
    }

    meta = _origin_meta

    @classmethod
    def get_numerical_value(cls, dealerId, date, keys):
        # type:(ObjectId, str, list)->Union[Decimal128, int, float]
        """Return the value at path ``keys`` inside ``daily`` for the dealer/day, or 0 when the lookup fails."""
        return get_in(keys, cls.objects(dealerId=dealerId, date=date).head(default=cls).daily, default=0)

    @classmethod
    def get_recharge(cls, dealerId, date):
        # type:(ObjectId, str)->RMB
        """Return the ``RECHARGE`` income of the dealer on ``date``."""
        return RMB(cls.get_numerical_value(dealerId=dealerId, date=date,
                                           keys=['income', DEALER_INCOME_SOURCE.RECHARGE]))

    @classmethod
    def get_recharge_card(cls, dealerId, date):
        # type:(ObjectId, str)->RMB
        """Return the ``RECHARGE_CARD`` income of the dealer on ``date``."""
        return RMB(cls.get_numerical_value(dealerId=dealerId, date=date,
                                           keys=['income', DEALER_INCOME_SOURCE.RECHARGE_CARD]))

    @classmethod
    def get_recharge_virtual_card(cls, dealerId, date):
        # type:(ObjectId, str)->RMB
        """Return the ``RECHARGE_VIRTUAL_CARD`` income of the dealer on ``date``."""
        return RMB(cls.get_numerical_value(dealerId=dealerId, date=date,
                                           keys=['income', DEALER_INCOME_SOURCE.RECHARGE_VIRTUAL_CARD]))

    @classmethod
    def get_redpack(cls, dealerId, date):
        # type:(ObjectId, str)->RMB
        """Return the ``REDPACK`` income of the dealer on ``date``."""
        return RMB(cls.get_numerical_value(dealerId=dealerId, date=date,
                                           keys=['income', DEALER_INCOME_SOURCE.REDPACK]))

    @classmethod
    def get_refund_cash(cls, dealerId, date):
        # type:(ObjectId, str)->RMB
        """Return the ``REFUND_CASH`` amount of the dealer on ``date``."""
        return RMB(cls.get_numerical_value(dealerId=dealerId, date=date,
                                           keys=["income", DEALER_INCOME_SOURCE.REFUND_CASH]))

    @classmethod
    def get_today_recharge(cls, dealerId):
        # type:(ObjectId)->RMB
        """Return today's ``RECHARGE`` income of the dealer."""
        today = datetime.datetime.now().strftime(Const.DATE_FMT)
        return cls.get_recharge(dealerId=dealerId, date=today)

    @classmethod
    def get_today_income(cls, dealerId):
        # type:(ObjectId)->RMB
        """Return today's recharge + recharge card + virtual card + redpack income of the dealer."""
        today = datetime.datetime.now().strftime(Const.DATE_FMT)
        return cls.get_recharge(dealerId, today) + \
               cls.get_recharge_card(dealerId, today) + \
               cls.get_recharge_virtual_card(dealerId, today) + \
               cls.get_redpack(dealerId, today)

    @classmethod
    def get_today_refund_cash(cls, dealerId):
        # type:(ObjectId)->RMB
        """
        Return today's refund amount.

        :param dealerId:
        :return:
        """
        today = datetime.datetime.now().strftime(Const.DATE_FMT)
        return cls.get_refund_cash(dealerId, today)

    @classmethod
    def get_today(cls, dealerId):
        # type:(ObjectId)->dict
        """Return the ``daily`` value of today's document for the dealer."""
        today = datetime.datetime.now().strftime(Const.DATE_FMT)
        return cls.objects(dealerId=dealerId, date=today).head(default=cls).daily

    @classmethod
    def get_someday_income(cls, dealerId, somedayTime):
        # type:(ObjectId, datetime.datetime)->RMB
        """Return recharge + recharge card + virtual card income of the dealer on the day of ``somedayTime``."""
        someday = somedayTime.strftime(Const.DATE_FMT)
        return cls.get_recharge(dealerId, someday) + \
               cls.get_recharge_card(dealerId, someday) + \
               cls.get_recharge_virtual_card(dealerId, someday)

    # NOTE(review): declared as a staticmethod although its first parameter is
    # ``cls`` (the monthly counterpart is a classmethod), so callers have to
    # pass the class explicitly. Left unchanged to keep the call signature;
    # confirm against callers before switching to @classmethod.
    @staticmethod
    def get_someday_income_and_consume_count(cls, dealerId, somedayTime):
        """
        Return the consumption and income ``totalCount`` for the day of ``somedayTime``.

        :param cls: class whose collection is queried
        :param dealerId: dealer id (converted with ``ObjectId``)
        :param somedayTime: datetime whose day is looked up
        :return: ``{'consumeTotalCount': ..., 'incomeTotalCount': ...}``; both
                 are 0 when no document or no count exists
        """
        somdayDate = somedayTime.strftime(Const.DATE_FMT)
        rcds = cls.get_collection().find({'dealerId': ObjectId(dealerId), 'date': somdayDate}, {'daily': 1})
        # Take the first match by iteration: a pymongo cursor has no len().
        rcd = next(iter(rcds), None)
        if rcd is None:
            return {'consumeTotalCount': 0, 'incomeTotalCount': 0}

        dayly = rcd.get('daily', {})
        return {
            'consumeTotalCount': dayly.get('consumption', {}).get('totalCount', 0),
            'incomeTotalCount': dayly.get('income', {}).get('totalCount', 0)
        }

    @classmethod
    def check_already_record(cls, record):
        # type:(Union[ConsumeRecord, DealerIncomeProxy]) -> bool
        """Return True when ``record.id`` is already stored under ``origin`` for any of its dealers on its day."""
        dealerIds = record.dealerIds
        query = {
            "dealerId": {"$in": dealerIds},
            "date": record.dateTimeAdded.strftime("%Y-%m-%d"),
            "origin.{}".format(record.statistic_type): record.id
        }
        return bool(cls.objects(__raw__=query).count())

    @classmethod
    def update_statistic_data(cls, record):
        # type:(Union[ConsumeRecord, DealerIncomeProxy]) -> bool
        """
        Upsert one dealer/day document per dealer in ``record.dealerIds``.

        For income records each dealer is credited with its own entry of
        ``record.actualAmountMap``; otherwise ``record.coin`` is used.

        :return: True when every per-dealer update succeeded
        """
        date = record.dateTimeAdded.strftime("%Y-%m-%d")

        allUpdated = True
        # Dealers have to be processed one by one.
        for _dealerId in record.dealerIds:
            findQuery = {
                "date": date,
                "dealerId": ObjectId(_dealerId),
            }
            if record.statistic_type == "income":
                amount = record.actualAmountMap[str(_dealerId)]
            else:
                amount = record.coin
            updateOrInsertData = record.get_statistic_update_info(amount=amount)
            if not cls.update_statistic(findQuery, updateOrInsertData):
                allUpdated = False
        return allUpdated

    @staticmethod
    def dealer_group_key():
        """Return the ``$group`` ``_id`` spec that groups by dealer id."""
        return {
            '_id': '$dealerId'
        }

    def __repr__(self):
        return '<DealerDailyStat dealerId=%s date=%s>' % (self.dealerId, self.date)


############
## Month ###
############
class MonthlyStat(Searchable):
    """Abstract base for per-month statistic documents."""

    date = StringField()
    origin = DictField()
    daily = DictField(default=default_month_stats)
    monthly = DictField(default=default_agg_map)

    meta = {
        'abstract': True
    }


class DeviceMonthlyStat(MonthlyStat):
    """Monthly statistics keyed by device logical code and date."""

    logicalCode = StringField()

    _shard_key = ('logicalCode', 'dateTimeAdded')

    _origin_meta = {
        'collection': 'device_monthly_stats',
        'indexes': [
            {'fields': ['logicalCode', 'date'], 'unique': True},
        ],
        'db_alias': 'report'
    }

    meta = _origin_meta

    def __repr__(self):
        return '<DeviceMonthlyStat logicalCode=%s date=%s>' % (self.logicalCode, self.date)


class GroupMonthlyStat(MonthlyStat):
    """Monthly statistics keyed by group id and date."""

    groupId = ObjectIdField()
    devices = ListField(ObjectIdField())

    _shard_key = ('groupId', 'dateTimeAdded')

    _origin_meta = {
        'collection': 'group_monthly_stats',
        'indexes': [
            {'fields': ['groupId', 'date'], 'unique': True},
        ],
        'db_alias': 'report'
    }

    meta = _origin_meta

    def __repr__(self):
        return '<GroupMonthlyStat groupId=%s date=%s>' % (self.groupId, self.date)


class DealerMonthlyStat(MonthlyStat):
    """Monthly statistics keyed by dealer id and date."""

    dealerId = ObjectIdField()
    groups = ListField(ObjectIdField())

    _shard_key = ('dealerId', 'dateTimeAdded')

    _origin_meta = {
        'collection': 'dealer_monthly_stats',
        'indexes': [
            {'fields': ['dealerId', 'date'], 'unique': True},
        ],
        'db_alias': 'report'
    }

    meta = _origin_meta

    @classmethod
    def get_numerical_value(cls, dealerId, date, keys):
        # type:(ObjectId, str, list)->Union[Decimal128, int, float]
        """Return the value at path ``keys`` inside ``monthly`` for the dealer/month, or 0 when the lookup fails."""
        assert isinstance(dealerId, ObjectId), 'dealerId has to be a ObjectId'
        return get_in(keys, cls.objects(dealerId=dealerId, date=date).head(default=cls).monthly, default=0)

    @classmethod
    def get_recharge(cls, dealerId, date):
        # type:(ObjectId, str)->RMB
        """Return the ``RECHARGE`` income of the dealer for month key ``date``."""
        return RMB(cls.get_numerical_value(dealerId=dealerId, date=date,
                                           keys=['income', DEALER_INCOME_SOURCE.RECHARGE]))

    @classmethod
    def get_recharge_card(cls, dealerId, date):
        # type:(ObjectId, str)->RMB
        """Return the ``RECHARGE_CARD`` income of the dealer for month key ``date``."""
        return RMB(cls.get_numerical_value(dealerId=dealerId, date=date,
                                           keys=['income', DEALER_INCOME_SOURCE.RECHARGE_CARD]))

    @classmethod
    def get_recharge_virtual_card(cls, dealerId, date):
        # type:(ObjectId, str)->RMB
        """Return the ``RECHARGE_VIRTUAL_CARD`` income of the dealer for month key ``date``."""
        return RMB(cls.get_numerical_value(dealerId=dealerId, date=date,
                                           keys=['income', DEALER_INCOME_SOURCE.RECHARGE_VIRTUAL_CARD]))

    @classmethod
    def get_this_month_income(cls, dealerId):
        # type:(ObjectId)->RMB
        """Return recharge + recharge card + virtual card income of the dealer for the current month."""
        now = datetime.datetime.now()
        date = MONTH_DATE_KEY.format(year=now.year, month=now.month)
        return cls.get_recharge(dealerId=dealerId, date=date) + \
               cls.get_recharge_card(dealerId=dealerId, date=date) + \
               cls.get_recharge_virtual_card(dealerId=dealerId, date=date)

    @classmethod
    def get_someday_income_and_consume_count(cls, dealerId, somedayTime):
        """
        Return the consumption and income ``totalCount`` for the month of ``somedayTime``.

        :param dealerId: dealer id (converted with ``ObjectId``)
        :param somedayTime: datetime whose year/month select the document
        :return: ``{'consumeTotalCount': ..., 'incomeTotalCount': ...}``; both
                 are 0 when no document or no count exists
        """
        date = MONTH_DATE_KEY.format(year=somedayTime.year, month=somedayTime.month)
        rcds = cls.get_collection().find({'dealerId': ObjectId(dealerId), 'date': date}, {'monthly': 1})
        # Take the first match by iteration: a pymongo cursor has no len().
        rcd = next(iter(rcds), None)
        if rcd is None:
            return {'consumeTotalCount': 0, 'incomeTotalCount': 0}

        monthly = rcd.get('monthly') or {}
        return {
            'consumeTotalCount': monthly.get('consumption', {}).get('totalCount', 0),
            'incomeTotalCount': monthly.get('income', {}).get('totalCount', 0)
        }

    def __repr__(self):
        return '<DealerMonthlyStat dealerId=%s date=%s>' % (self.dealerId, self.date)