# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""
Helpers for horizontally partitioned (hot / archived) database tables.

This is a deliberately simple interim implementation; it may later be replaced
by customising the QuerySet directly.
"""
import collections
import datetime
import logging
from collections import namedtuple
from weakref import WeakKeyDictionary

from bson import ObjectId
from cytoolz import merge_with, keyfilter
from dateutil import tz
from django.conf import settings
from mongoengine import ValidationError, DoesNotExist
from typing import TYPE_CHECKING, Iterable, Mapping, List, Optional, Dict, Any

from apilib.monetary import RMB, Money
from apilib.quantity import Quantity
from apps.common.utils import get_start_and_end_by_month, get_start_and_end_by_year, get_date_range
from apps.web.constant import DEALER_CONSUMPTION_AGG_KIND, Const
from apps.web.core.accounting import Accounting
from apps.web.core.db import copy_document_classes
from apps.web.core.exceptions import ReportProxyValueError, ReportProxyTypeError
from apps.web.dealer.define import DEALER_INCOME_SOURCE
from apps.web.dealer.proxy import DealerIncomeProxy
from apps.web.models import ArchivedModelProxyConfig
from apps.web.report.models import DealerDailyStat, GroupDailyStat, DeviceDailyStat, GroupReport, DealerReport, \
    DevReport
from apps.web.user.models import ConsumeRecord
from apps.web.user.models import RechargeRecord

if TYPE_CHECKING:
    from apps.web.core.db import Searchable
    from apps.web.core.db import CustomQuerySet

logger = logging.getLogger(__name__)

# One contiguous time slice that is served by a single database.
# ``archived`` is True when the slice is read from the "<alias>_his" database,
# ``st`` / ``et`` are the slice bounds (used as ``>= st`` and ``< et`` filters).
TimeStream = namedtuple("TimeStream", ["archived", "st", "et"])


def sum_fn(lst):
    """Add up ``lst`` after converting every element to ``Quantity``; the start value is ``Quantity('0')``."""
    return sum(map(lambda _: Quantity(_), lst), Quantity('0'))


# Key predicate for ``keyfilter``: keeps every key except the aggregation ``_id``.
not_aggreate_id = lambda _: _ != '_id'


class LazyCount(object):
    """
    Data descriptor that keeps one cache mapping per owning instance.

    Values are held in a ``WeakKeyDictionary`` keyed by the owning instance.
    Access through the class itself (``instance is None``) is rejected.
    """

    def __init__(self):
        self._cache = WeakKeyDictionary()

    def __set__(self, instance, value):
        # type:(QuerySetProxy, Mapping) -> None
        """
        Store ``value`` as the cache mapping of ``instance``.

        :param instance: owning object
        :param value: mapping to remember for ``instance``
        :raises Exception: when ``instance`` is None
        """
        if instance is None:
            raise Exception("can not use <{}>".format(instance))

        # TODO zjl: pad / complete ``value`` before storing it
        self._cache[instance] = value

    def __get__(self, instance, owner):
        """
        Return the mapping stored for ``instance``, or a new empty dict when nothing was stored.

        :raises Exception: when accessed on the class (``instance`` is None)
        """
        if instance is None:
            raise Exception("can not use <{}> by <{}>".format(instance, owner))

        return self._cache.get(instance, dict())


class QuerySetProxy(object):
    """
    Imitates a CustomQuerySet on top of several querysets; mainly deals with pagination across them.
    """
    lazyCount = LazyCount()

    def __init__(self, *iterable):
        # type:(Iterable[CustomQuerySet]) -> None
        """
        :param iterable: the querysets to chain, in the order they should be consumed
        """
        self._iterable = iterable

    def _get_count(self, queryset):
        """Return the cached ``"count"`` of ``queryset`` if present, otherwise ``queryset.count()``."""
        _count = self.lazyCount.get(queryset, dict()).get("count")
        if _count is None:
            _count = queryset.count()

        return _count

    def paginate(self, pageIndex, pageSize):
        # type:(int, int) -> QuerySetProxy
        """
        Pagination counterpart of CustomQuerySet.paginate across all wrapped querysets.

        The original author's note: avoid count() here where possible, count is slow
        (hence the cached value is preferred, see ``_get_count``).

        :param pageIndex: page number; page 1 starts at offset 0
        :param pageSize: number of records per page
        :return: a new proxy holding only the skip/limit-ed querysets that contribute to the page
        """
        # Offset of the end of the requested page, relative to the current queryset.
        total = pageSize * pageIndex
        new_iterable = list()

        for _queryset in self._iterable:
            _count = self._get_count(_queryset)

            # The page ends inside this queryset: no further queryset is needed.
            if total <= _count:
                skip = max(0, total - pageSize)
                # When a previous queryset already supplied the head of the page,
                # ``total`` is smaller than pageSize and is exactly what is still missing.
                limit = pageSize if total - pageSize > 0 else total
                new_iterable.append(_queryset.skip(skip).limit(limit))
                break

            # The page starts in this queryset and continues in the following one(s).
            elif _count < total < _count + pageSize:
                new_iterable.append(_queryset.skip(max(total - pageSize, 0)).limit(pageSize))
                total = total - _count

            # The page lies entirely behind this queryset.
            else:
                total = total - _count

        return self.__class__(*new_iterable)

    def _sum_and_count(self, queryset, field):
        """
        Return ``(count, sum)`` of ``field`` for one queryset, using the per-proxy cache.

        NOTE(review): the cache is keyed by queryset only, not by ``field`` - confirm that a
        proxy is never asked for the sum of two different fields.
        """
        cache_info = self.lazyCount
        cached = cache_info.get(queryset)

        # An entry written by count() only contains "count"; treating it as a hit
        # would hand back ``None`` as the sum, so require the "sum" key explicitly.
        if cached and "sum" in cached:
            _sum = cached.get("sum")
            _count = cached.get("count")
        else:
            _count, _sum = queryset.sum_and_count(field)
            cache_info.update({queryset: {"sum": _sum, "count": _count}})
            self.lazyCount = cache_info

        return _count, _sum

    def sum_and_count(self, field):
        """
        :param field: the field to aggregate
        :return: ``(total count, total sum as Money)`` over all wrapped querysets
        """
        allCount, allSum = 0, Money(0)

        for _queryset in self._iterable:
            _count, _sum = self._sum_and_count(_queryset, field)
            allSum += Money(_sum)
            allCount += _count

        return allCount, allSum

    def count(self, with_limit_and_skip = False):
        """
        Return the number of records over all wrapped querysets.

        :param with_limit_and_skip: forwarded to each queryset's ``count``; when true the
            cache is bypassed, because cached values are counts taken without limit/skip
        """
        cache_info = self.lazyCount
        allCount = 0

        for _queryset in self._iterable:
            cached = cache_info.get(_queryset) or {}
            _count = None if with_limit_and_skip else cached.get("count")

            if _count is None:
                _count = _queryset.count(with_limit_and_skip)
                if not with_limit_and_skip:
                    # Keep whatever else (e.g. "sum") is already cached for this queryset.
                    cache_info.update({_queryset: dict(cached, count = _count)})
                    self.lazyCount = cache_info

            allCount += _count

        return allCount

    def __iter__(self):
        """
        Yield the records of every wrapped queryset, one queryset after the other.
        """
        for _querySet in self._iterable:
            for _item in _querySet:
                yield _item

    def first(self):
        """Return the first record found when trying the querysets in order, or None."""
        for _queryset in self._iterable:
            record = _queryset.first()
            if record:
                logger.debug('find one record in {}'.format(_queryset._document._meta['db_alias']))
                return record
            else:
                logger.debug('find null record in {}'.format(_queryset._document._meta['db_alias']))

        return None

    def aggregate(self, *pipeline, **kwargs):
        """
        Run the pipeline on every wrapped queryset and concatenate the results.

        When the data spans the hot-data boundary the same group key can appear more than
        once (one row per queryset); the caller has to merge such duplicates.

        :param pipeline: aggregation stages, forwarded unchanged
        :param kwargs: forwarded unchanged
        :return: list of result rows
        """
        rv = []
        for _queryset in self._iterable:
            for item in _queryset.aggregate(*pipeline, **kwargs):
                rv.append(item)
        return rv


class ModelProxy(object):
    """
    Only fetches data; whether data should be fetched at all is decided by business logic elsewhere.

    Subclasses set ``_META_MODEL`` (the document class) and, if needed, ``_DEFAULT_TIME_FIELD``.
    """
    _META_MODEL = None
    _DEFAULT_TIME_FIELD = "dateTimeAdded"

    def __init__(self, st, et, delta = None, query = None, timeField = None):
        """
        :param st: start time, a datetime or a "%Y-%m-%d" string
        :param et: end time, a datetime or a "%Y-%m-%d" string
        :param delta: seconds; ``>= 0`` extends ``et`` and makes slices be visited oldest first,
            ``< 0`` moves ``st`` back and keeps newest-first order; None leaves both untouched
        :param query: queryset used to resolve the db alias; defaults to ``_META_MODEL.objects``
        :param timeField: name of the time field to filter on; defaults to ``_DEFAULT_TIME_FIELD``
        :raises ReportProxyValueError: when the start time ends up after the end time
        """
        # Convert string times to datetime objects so they can be split at the node time.
        self.st = self.str_to_datetime(st)
        self.et = self.str_to_datetime(et)

        self.reverse = True

        if delta is not None:
            if delta >= 0:
                self.et = self.et + datetime.timedelta(seconds = delta)
                self.reverse = False
            else:
                self.reverse = True
                self.st = self.st - datetime.timedelta(seconds = abs(delta))

        if self.st > self.et:
            raise ReportProxyValueError(u"查询的起始时间超过结束时间")

        self.query = query or self.default_query_set()
        self._timeField = timeField or self._DEFAULT_TIME_FIELD

    @classmethod
    def default_query_set(cls):
        """Return ``_META_MODEL.objects``."""
        return cls._META_MODEL.objects

    @classmethod
    def default_hot_data_start_day(cls):
        """Return the configured hot-data start day for this model (or for "default" without a model)."""
        if cls._META_MODEL:
            return ArchivedModelProxyConfig.get_model_hot_data_start_day(cls._META_MODEL.__name__)
        else:
            return ArchivedModelProxyConfig.get_model_hot_data_start_day("default")

    @staticmethod
    def str_to_datetime(s):
        """
        Return ``s`` unchanged if it is a datetime, otherwise parse it as "%Y-%m-%d".

        :raises ReportProxyValueError: when the string does not match the format
        :raises ReportProxyTypeError: when ``s`` is neither a datetime nor a string
        """
        # Re-import datetime locally to avoid the _strptime error seen under multithreading.
        import datetime
        try:
            if isinstance(s, datetime.datetime):
                return s
            else:
                return datetime.datetime.strptime(s, "%Y-%m-%d")
        except ValueError:
            raise ReportProxyValueError(u"错误的时间字符串, 格式为 {}".format("%Y-%m-%d"))
        except TypeError:
            raise ReportProxyTypeError(u"请输入时间字符串")

    @staticmethod
    def adapt_time_field_type(_time):
        """Convert a slice bound to the type stored in the time field; identity here, overridden by subclasses."""
        return _time

    @classmethod
    def node_time(cls):
        """
        Return the node (hot-data start) date as a datetime, capped at the current time.
        """
        import datetime

        start_day = cls.default_hot_data_start_day()
        nodeTime = datetime.datetime.strptime(start_day, "%Y-%m-%d")
        if nodeTime > datetime.datetime.now():
            nodeTime = datetime.datetime.now()

        return nodeTime

    @property
    def db_alias(self):
        """
        The original db alias, read from the query's document ``_origin_meta``.
        """
        defaultDbAlias = self.query._document._origin_meta.get("db_alias")
        return defaultDbAlias

    def get_db_alias(self, timeStream):
        """
        Return the db alias derived from a time slice.

        Non-archived slices use the original alias; archived slices use the alias suffixed
        with the year of the slice start (unless it already ends with that year).

        :param timeStream: a ``TimeStream``
        """
        defaultDbAlias = self.db_alias
        if not timeStream.archived:
            return defaultDbAlias

        # TimeStream has no ``year`` field; the year comes from the slice start,
        # and ``endswith`` needs it as a string.
        year = str(timeStream.st.year)
        if defaultDbAlias.endswith(year):
            return defaultDbAlias
        else:
            return "{}_{}".format(defaultDbAlias, year)

    def split_time(self, st = None, et = None):
        """
        Split ``[st, et]`` at the node time into hot and archived ``TimeStream`` slices.

        Every slice end is ``et`` plus one day, because slices are filtered with ``< et``.

        :param st: start, defaults to ``self.st``
        :param et: end, defaults to ``self.et``
        :return: list of ``TimeStream``
        """
        import datetime

        st = st or self.st
        et = et or self.et

        nodeTime = self.node_time()
        timeStreamList = list()

        # The start already lies in the hot-data area.
        if st >= nodeTime:
            timeStreamList.append(TimeStream(archived = False, st = st, et = et + datetime.timedelta(days = 1)))
            return timeStreamList

        # The node time lies inside the range: hot part plus the (recursively split) cold part.
        elif st < nodeTime < et:
            timeStreamList.append(TimeStream(archived = False, st = nodeTime, et = et + datetime.timedelta(days = 1)))
            return self.split_time(st = st, et = nodeTime - datetime.timedelta(days = 1)) + timeStreamList

        # The whole range lies in the cold-data area (original note: this case
        # would need to be distinguished by year).
        elif et <= nodeTime:
            timeStreamList.append(TimeStream(archived = True, st = st, et = et + datetime.timedelta(days = 1)))
            return timeStreamList

        else:
            return timeStreamList

    def all(self, **filters):
        """
        Build one filtered queryset per time slice and wrap them in a ``QuerySetProxy``.

        Besides ordinary filter keywords the following are consumed here or in
        ``_combine_queryset``: ``timeField``, ``shard_filter``, ``searchKey``, ``only``,
        ``exclude``, ``hint``.

        :return: ``QuerySetProxy``, or ``_META_MODEL.objects.none()`` when no slice was produced
        """
        logger.info("ready to find <{}> data, filters is <{}>".format(self.__class__.__name__, filters))

        timeField = filters.pop("timeField", None) or self._timeField

        timeList = self.split_time()
        timeList.sort(key = lambda x: x.st, reverse = self.reverse)
        logger.debug('hit data time list is: {}'.format(str(timeList)))

        records = list()

        shard_filter = filters.pop('shard_filter', None)

        for timeStream in timeList:
            # The time bounds are overwritten for every slice.
            filters.update({
                "{}__gte".format(timeField): self.adapt_time_field_type(timeStream.st),
                "{}__lt".format(timeField): self.adapt_time_field_type(timeStream.et)
            })

            defaultDbAlias = self.db_alias
            dbAlias = defaultDbAlias if not timeStream.archived else "{}_his".format(defaultDbAlias)

            if dbAlias == defaultDbAlias:
                new_model_cls = self._META_MODEL
                sourceQuerySet = self._META_MODEL.objects  # type: CustomQuerySet
            else:
                # Archived slices are read through a copy of the document class bound to the "_his" alias.
                new_model_cls = copy_document_classes(
                    self._META_MODEL, '{}_his'.format(self._META_MODEL.__name__), dbAlias)
                sourceQuerySet = new_model_cls.objects  # type: CustomQuerySet

            logger.info(
                "dbAlias = {}, model = {}, filters = {}; shard_filter = {}".format(dbAlias, new_model_cls.__name__,
                                                                                 filters, shard_filter))

            shard_key = new_model_cls._meta.get('shard_key', tuple())
            if shard_key:
                if shard_filter:
                    filters.update(shard_filter)

                # Only warn: every shard key field except the last should be constrained.
                for _field in shard_key[0:-1]:
                    if _field not in filters and '{}__in'.format(_field) not in filters:
                        logger.warning(
                            'query in {}, filters = {}, has no shard key.'.format(new_model_cls.__name__, filters))

            records.append(
                self._combine_queryset(sourceQuerySet, **filters).order_by("-{}".format(self._DEFAULT_TIME_FIELD)))

        if len(records) > 0:
            return QuerySetProxy(*records)
        else:
            logger.error("bad time split, start time is <{}>, end time is <{}>".format(self.st, self.et))
            return self._META_MODEL.objects.none()

    @staticmethod
    def _combine_queryset(queryset, **filters):
        # type:(CustomQuerySet, dict) -> CustomQuerySet
        """
        Apply the special keywords to a clone of ``queryset`` and filter by the remaining ones.

        searchKey: search string, passed to ``search``
        only: restrict the loaded fields, e.g. ['_id', "dateTimeAdded"...]
        exclude: fields to leave out
        hint: index to use
        :param filters: special keywords plus ordinary filter keywords
        :return: the resulting queryset
        """
        queryset_new = queryset.clone()

        _searchKey = filters.pop("searchKey", None)
        if _searchKey:
            queryset_new = queryset_new.search(_searchKey)

        _only = filters.pop("only", None)
        if _only:
            queryset_new = queryset_new.only(*_only)

        _exclude = filters.pop('exclude', None)
        if _exclude:
            queryset_new = queryset_new.exclude(*_exclude)

        _hint = filters.pop("hint", None)
        if _hint:
            queryset_new = queryset_new.hint(_hint)

        return queryset_new.filter(**filters)

    @classmethod
    def get_one(cls, startTime = None, endTime = None, **kwargs):
        # type:(str, str, Dict[str, Any]) -> Optional[ConsumeRecord, RechargeRecord, DealerDailyStat, GroupDailyStat, DeviceDailyStat, GroupReport, DealerReport, DevReport]
        """
        Fetch a single record; the time range decides which database(s) are queried.

        With ``id`` the range is derived from the ObjectId generation time (3 days before up to
        60 seconds after). With ``foreign_id`` (removed from the filters) the range runs from
        that id's generation time until now. Otherwise ``startTime`` / ``endTime`` are used.

        :param kwargs: filter keywords, see ``all``
        :param startTime: "%Y-%m-%d" string or datetime
        :param endTime: "%Y-%m-%d" string or datetime
        :return: the first matching record, or None (also on any error, which is logged)
        """
        import datetime

        try:
            delta = None

            if 'id' in kwargs:
                endTime = ObjectId(kwargs.get('id')).generation_time.astimezone(tz.gettz(settings.TIME_ZONE)).replace(
                    tzinfo = None)
                startTime = endTime
                endTime = endTime + datetime.timedelta(seconds = 60)
                delta = -3 * 24 * 60 * 60
            elif 'foreign_id' in kwargs:
                startTime = ObjectId(kwargs.pop('foreign_id')).generation_time.astimezone(
                    tz.gettz(settings.TIME_ZONE)).replace(tzinfo = None)
                endTime = datetime.datetime.now()
                delta = 0
            else:
                if not startTime and not endTime:
                    startTime = Const.QUERY_START_DATE
                    endTime = datetime.datetime.now().strftime('%Y-%m-%d')
                elif not startTime:
                    startTime = Const.QUERY_START_DATE
                else:
                    # NOTE(review): this also replaces an endTime supplied by the caller
                    # (get_data_list uses ``elif not endTime`` here) - confirm the intent.
                    endTime = datetime.datetime.now().strftime('%Y-%m-%d')
                    # NOTE(review): nesting of ``delta = 0`` reconstructed from collapsed
                    # source; verify against the original file.
                    delta = 0

            query_set_proxy = cls(st = startTime, et = endTime, delta = delta).all(**kwargs)  # type: QuerySetProxy
            return query_set_proxy.first()

        except ValidationError:
            logger.error("get one record kwargs=<{}>, time is <{}-{}> error pk".format(kwargs, startTime, endTime))
            record = None
        except DoesNotExist:
            logger.error("get one record kwargs=<{}>, time is <{}-{}> not exist pk".format(kwargs, startTime, endTime))
            record = None
        except Exception as e:
            logger.exception(
                "get one record kwargs=<{}>, time is <{} to {}> error {}".format(kwargs, startTime, endTime, e))
            record = None

        return record

    @classmethod
    def get_data_list(cls, startTime = None, endTime = None, **kwargs):
        # type:(str, str, Dict[str, Any]) -> CustomQuerySet
        """
        Filter-like lookup. Several databases may be involved, so the time range is
        determined first and then split at the node time (see ``all``).

        :param startTime: "%Y-%m-%d" string or datetime; defaults to ``Const.QUERY_START_DATE``
        :param endTime: "%Y-%m-%d" string or datetime; defaults to today
        :param kwargs: filter keywords, see ``all``
        :return: the result of ``all``
        """
        import datetime

        delta = None

        # With an id list, take the earliest and latest ObjectId generation times as range.
        if 'id__in' in kwargs:
            id_list = kwargs.get('id__in')
            _time_list = []
            for _id in id_list:
                _time = ObjectId(_id).generation_time.astimezone(tz.gettz(settings.TIME_ZONE)).replace(tzinfo = None)
                _time_list.append(_time)

            # An empty id list has no min/max; fall through to the default range instead of raising.
            if _time_list:
                startTime = min(_time_list)
                endTime = max(_time_list)
                delta = -3 * 24 * 60 * 60

        if not startTime and not endTime:
            startTime = Const.QUERY_START_DATE
            endTime = datetime.datetime.now().strftime('%Y-%m-%d')
        elif not startTime:
            startTime = Const.QUERY_START_DATE
        elif not endTime:
            endTime = datetime.datetime.now().strftime('%Y-%m-%d')
            # NOTE(review): nesting of ``delta = 0`` reconstructed from collapsed source
            # (at function level it would always discard the id__in delta); verify.
            delta = 0

        return cls(st = startTime, et = endTime, delta = delta).all(**kwargs)

    @classmethod
    def current_db_proxy(cls):
        """Return a proxy covering the node time up to now."""
        return cls(st = cls.node_time(), et = datetime.datetime.now())


class DealerDailyStatsModelProxy(ModelProxy):
    """Proxy for ``DealerDailyStat``; its time field ``date`` is filtered with "%Y-%m-%d" strings."""
    _DEFAULT_TIME_FIELD = "date"
    _META_MODEL = DealerDailyStat

    @staticmethod
    def adapt_time_field_type(_time):
        """Format the slice bound as a "%Y-%m-%d" string."""
        return datetime.datetime.strftime(_time, "%Y-%m-%d")

    @staticmethod
    def income_statistic_key(sourceName):
        """Return the field path ``daily.income.<sourceName>``."""
        return "daily.income.{}".format(sourceName)

    @staticmethod
    def consumption_statistic_key(sourceName):
        """Return the field path ``daily.consumption.<sourceName>``."""
        return "daily.consumption.{}".format(sourceName)

    @classmethod
    def get_stats_as_month(cls, dealerIds, startDay, endDay, project, aggregateMap):
        # type:(List[str], str, str, tuple, dict) -> dict
        """
        Aggregate the dealers' daily stats per month.

        :param dealerIds: dealers to include
        :param startDay: range start
        :param endDay: range end
        :param project: fields passed to ``prepare_group_pipeline``
        :param aggregateMap: extra accumulators merged into the ``$group`` stage
        :return: dict keyed by "YYYY-MM"; rows for the same key are merged with ``sum_fn``
        """
        pipeline = DealerDailyStat.prepare_group_pipeline(project = project)
        groupKey = DealerDailyStat.month_group_key()
        groupKey.update(aggregateMap)

        _group = {
            "$group": groupKey
        }
        pipeline.append(_group)

        rv = {}

        proxies = cls(st = startDay, et = endDay)
        items = proxies.all(dealerId__in = dealerIds).aggregate(*pipeline)

        for item in items:
            key = "{year}-{month:02d}".format(**item.get("_id"))
            # The same key can come from several databases, so merge instead of overwriting.
            value = merge_with(sum_fn, [keyfilter(not_aggreate_id, item),
                                        keyfilter(not_aggreate_id, rv.get(key, {}))])
            rv[key] = value

        return rv

    @classmethod
    def get_stats_as_day(cls, dealerIds, startDay, endDay, project, aggregateMap):
        # type:(List[str], str, str, tuple, dict) -> dict
        """
        Aggregate the dealers' daily stats per day.

        :return: OrderedDict keyed by "YYYY-MM-DD", pre-filled with an empty dict for every
            day of ``get_date_range(startDay, endDay)``
        """
        pipeline = DealerDailyStat.prepare_group_pipeline(project = project, hasDay = True)
        groupKey = DealerDailyStat.day_group_key()
        groupKey.update(aggregateMap)

        _group = {
            "$group": groupKey
        }
        pipeline.append(_group)

        rv = collections.OrderedDict()
        for day in get_date_range(startDay, endDay):
            rv[datetime.datetime.strftime(day, "%Y-%m-%d")] = dict()

        proxies = cls(st = startDay, et = endDay)
        items = proxies.all(dealerId__in = dealerIds).aggregate(*pipeline)

        for item in items:
            key = "{year}-{month:02d}-{day:02d}".format(**item.get("_id"))
            value = merge_with(sum_fn, [keyfilter(not_aggreate_id, item),
                                        keyfilter(not_aggreate_id, rv.get(key, {}))])
            rv[key] = value

        return rv

    @classmethod
    def get_stats_as_dealer(cls, dealerIds, startDay, endDay, project, aggregateMap):
        """
        Aggregate the daily stats per dealer over the whole range.

        :return: dict keyed by ``str(_id)`` of each result row
        """
        pipeline = DealerDailyStat.prepare_group_pipeline(project = project, needTime = False)
        groupKey = DealerDailyStat.dealer_group_key()
        groupKey.update(aggregateMap)

        _group = {
            "$group": groupKey
        }
        pipeline.append(_group)

        query_set_proxy = cls(st = startDay, et = endDay).all(dealerId__in = dealerIds)  # type: QuerySetProxy
        items = query_set_proxy.aggregate(*pipeline)

        rv = {}
        for item in items:
            key = str(item.get('_id'))
            value = merge_with(sum_fn, [keyfilter(not_aggreate_id, item),
                                        keyfilter(not_aggreate_id, rv.get(key, {}))])
            rv[key] = value

        return rv

    @classmethod
    def get_total_income_as_dealer(cls, dealerIds, startDay, endDay):
        """Return ``totalIncome`` (sum of ``daily.totalIncome``) per dealer."""
        return cls.get_stats_as_dealer(
            dealerIds = dealerIds,
            startDay = startDay,
            endDay = endDay,
            project = ('dealerId', 'daily.totalIncome',),
            aggregateMap = {
                'totalIncome': {'$sum': '$daily.totalIncome'}
            })

    @classmethod
    def get_one_month_income(cls, dealerId, monthStr):
        # type:(ObjectId, str) -> RMB
        """
        Total income of a dealer for one month.

        Original author's note: following the earlier logic only online income counts as
        income, i.e. recharge + card recharge + virtual-card recharge + advertising.

        :param dealerId: the dealer
        :param monthStr: month key, also used to look up the result (same form as "YYYY-MM")
        :return: the month's ``totalIncome`` as RMB (zero when the month has no data)
        """
        startDay, endDay = get_start_and_end_by_month(monthStr = monthStr)
        items = cls.get_stats_as_month(dealerIds = [dealerId],
                                       startDay = startDay,
                                       endDay = endDay,
                                       project = ('daily.totalIncome',),
                                       aggregateMap = {"totalIncome": {"$sum": "$daily.totalIncome"}})

        return RMB(items.get(monthStr, {'totalIncome': Quantity(0)}).get('totalIncome'))

    @classmethod
    def get_consume_as_month(cls, dealerIds, startDay, endDay, sources = None):
        """
        Monthly consumption per source.

        :param sources: consumption kinds to sum; defaults to ``DEALER_CONSUMPTION_AGG_KIND.choices()``
        """
        if not sources:
            sources = DEALER_CONSUMPTION_AGG_KIND.choices()

        aggregateMap = {
            format(_sourceName): {"$sum": "$daily.consumption.{}".format(_sourceName)}
            for _sourceName in sources
        }

        return cls.get_stats_as_month(dealerIds = dealerIds,
                                      startDay = startDay,
                                      endDay = endDay,
                                      project = ('daily.consumption',),
                                      aggregateMap = aggregateMap)

    @classmethod
    def get_one_year_consume_as_month(cls, dealerId, yearStr):
        # type:(ObjectId, str) -> dict
        """
        Aggregate one year of a dealer's consumption, grouped by month.

        :param dealerId: the dealer
        :param yearStr: the year, passed to ``get_start_and_end_by_year``
        :return: see ``get_stats_as_month``
        """
        aggregateMap = {format(_sourceName): {"$sum": "$daily.consumption.{}".format(_sourceName)}
                        for _sourceName in DEALER_CONSUMPTION_AGG_KIND.choices()}

        startDay, endDay = get_start_and_end_by_year(yearStr)

        return cls.get_stats_as_month([dealerId], startDay, endDay, ('daily.consumption',), aggregateMap)

    @classmethod
    def get_one_year_income_as_month(cls, dealerId, yearStr):
        # type:(ObjectId, str) -> dict
        """Aggregate one year of a dealer's income (per source plus ``totalIncome``), grouped by month."""
        aggregateMap = {format(_sourceName): {"$sum": "$daily.income.{}".format(_sourceName)}
                        for _sourceName in DEALER_INCOME_SOURCE.choices()}
        aggregateMap.update({"totalIncome": {"$sum": "$daily.totalIncome"}})

        startDay, endDay = get_start_and_end_by_year(yearStr)

        return cls.get_stats_as_month([dealerId], startDay, endDay, ('daily.totalIncome', 'daily.income'),
                                      aggregateMap)

    @classmethod
    def get_days_income_stat(cls, dealerId, monthStr):
        """
        Per-day ``daily`` data of a dealer for one month, without the consumption part.

        :return: OrderedDict keyed by "YYYY-MM-DD" (up to today), empty dict for days without a record
        """
        startDay, endDay = get_start_and_end_by_month(monthStr)
        # Do not look past today; both values are "%Y-%m-%d" strings, so min() compares dates.
        endDay = min(datetime.date.today().strftime("%Y-%m-%d"), endDay)

        statisticRecords = DealerDailyStatsModelProxy.get_data_list(
            startTime = startDay,
            endTime = endDay,
            dealerId = dealerId,
            exclude = ('origin', '_id', 'hourly', 'dealerId', 'daily.consumption'))

        rv = collections.OrderedDict()
        for day in get_date_range(startDay, endDay):
            rv[datetime.datetime.strftime(day, "%Y-%m-%d")] = dict()

        for _stat in statisticRecords:  # type: DealerDailyStat
            rv.update({_stat['date']: _stat.daily})

        return rv

    @classmethod
    def get_days_consume_stat(cls, dealerId, monthStr):
        """
        Per-day ``daily`` data of a dealer for one month, without the income part.

        :return: OrderedDict keyed by "YYYY-MM-DD" (up to today), empty dict for days without a record
        """
        startDay, endDay = get_start_and_end_by_month(monthStr)
        # Do not look past today; both values are "%Y-%m-%d" strings, so min() compares dates.
        endDay = min(datetime.date.today().strftime("%Y-%m-%d"), endDay)

        statisticRecords = DealerDailyStatsModelProxy.get_data_list(
            startTime = startDay,
            endTime = endDay,
            dealerId = dealerId,
            exclude = ('_id', 'dealerId', 'hourly', 'origin', 'daily.income', 'daily.totalIncome'))

        rv = collections.OrderedDict()
        for day in get_date_range(startDay, endDay):
            rv[datetime.datetime.strftime(day, "%Y-%m-%d")] = dict()

        for _stat in statisticRecords:  # type: DealerDailyStat
            rv.update({_stat['date']: _stat.daily})

        return rv


class GroupDailyStatsModelProxy(ModelProxy):
    """Proxy for ``GroupDailyStat``; its time field ``date`` is filtered with "%Y-%m-%d" strings."""
    _DEFAULT_TIME_FIELD = "date"
    _META_MODEL = GroupDailyStat

    @staticmethod
    def adapt_time_field_type(_time):
        """Format the slice bound as a "%Y-%m-%d" string."""
        return datetime.datetime.strftime(_time, "%Y-%m-%d")

    @classmethod
    def get_groups_statistic(cls, groupIds, startDate, endDate, statisticKey):
        """
        Aggregate ``statisticKey`` accumulators per group.

        :param groupIds: group ids (converted to ObjectId for the filter)
        :param startDate: range start
        :param endDate: range end
        :param statisticKey: ``$group`` accumulators; ``_id`` is set to ``$groupId`` on a copy
        :return: dict keyed by ``str(groupId)``; an empty list when ``groupIds`` is empty
        """
        if not groupIds:
            # NOTE(review): a list, not a dict, is returned here - kept for compatibility.
            return []

        # Work on a copy so the caller's dict is not modified.
        groupStage = dict(statisticKey)
        groupStage.update({"_id": "$groupId"})

        resultDict = dict()

        for _item in cls(st = startDate, et = endDate).all(
                groupId__in = [ObjectId(_id) for _id in groupIds]).aggregate({"$group": groupStage}):
            groupId = _item.pop("_id", None)
            # The same group can be returned by several databases, so merge the rows.
            resultDict[str(groupId)] = merge_with(sum_fn, [_item, resultDict.get(str(groupId), {})])

        return resultDict

    @classmethod
    def get_groups_income_statistic(cls, groupIds, startDate, endDate, dealerId = None):
        # type:(list, str, str, str) -> dict
        """
        Income statistics of address groups.

        :param groupIds: group ids
        :param startDate: range start
        :param endDate: range end
        :param dealerId: TODO temporary parameter; adds ``dealerActualIncome`` summed from
            ``daily.incomeMap.<dealerId>``. Original note: flattened queries on such map data
            cannot use unwind on the current mongo version and may be optimised later.
        :return: see ``get_groups_statistic``
        """
        statisticKey = {
            format(_sourceName): {"$sum": "$daily.income.{}".format(_sourceName)}
            for _sourceName in DEALER_INCOME_SOURCE.choices()
        }

        if dealerId:
            statisticKey.update({
                "dealerActualIncome": {"$sum": "$daily.incomeMap.{}".format(dealerId)}
            })

        statisticKey.update({"totalIncome": {"$sum": "$daily.totalIncome"}})

        return cls.get_groups_statistic(groupIds, startDate, endDate, statisticKey)

    @classmethod
    def get_groups_consumption_statistic(cls, groupIds, startDate, endDate):
        """Consumption statistics of address groups, one accumulator per consumption kind."""
        statisticKey = {
            format(_sourceName): {"$sum": "$daily.consumption.{}".format(_sourceName)}
            for _sourceName in DEALER_CONSUMPTION_AGG_KIND.choices()
        }

        return cls.get_groups_statistic(groupIds, startDate, endDate, statisticKey)

    @classmethod
    def get_one_group_income_statistic(cls, groupId, startDate, endDate, dealerId = None):
        """Income statistics of a single group, or None when there is no data for it."""
        # Results are keyed by str(groupId), so look up with the string form.
        return cls.get_groups_income_statistic([groupId], startDate, endDate, dealerId).get(str(groupId))

    @classmethod
    def get_one_group_consumption_statistic(cls, groupId, startDate, endDate):
        """Consumption statistics of a single group, or None when there is no data for it."""
        # Results are keyed by str(groupId), so look up with the string form.
        return cls.get_groups_consumption_statistic([groupId], startDate, endDate).get(str(groupId))


class DeviceDailyStatsModelProxy(ModelProxy):
    """Proxy for ``DeviceDailyStat``; its time field ``date`` is filtered with "%Y-%m-%d" strings."""
    _DEFAULT_TIME_FIELD = "date"
    _META_MODEL = DeviceDailyStat

    @staticmethod
    def adapt_time_field_type(_time):
        """Format the slice bound as a "%Y-%m-%d" string."""
        return datetime.datetime.strftime(_time, "%Y-%m-%d")

    @classmethod
    def get_devices_statistic(cls, logicalCodes, startDate, endDate, statisticKey):
        """
        Aggregate ``statisticKey`` accumulators per device.

        :param logicalCodes: logical codes of the devices
        :param startDate: range start
        :param endDate: range end
        :param statisticKey: ``$group`` accumulators; ``_id`` is set to ``$logicalCode`` on a copy
        :return: dict keyed by ``str(logicalCode)``
        """
        proxies = cls(st = startDate, et = endDate)

        # Work on a copy so the caller's dict is not modified.
        groupStage = dict(statisticKey)
        groupStage.update({"_id": "$logicalCode"})

        resultDict = dict()

        for _item in proxies.all(logicalCode__in = logicalCodes).aggregate({"$group": groupStage}):
            logicalCode = str(_item.pop("_id", None))
            # The same device can be returned by several databases, so merge the rows.
            resultDict[str(logicalCode)] = merge_with(sum_fn, [_item, resultDict.get(str(logicalCode), {})])

        return resultDict

    @classmethod
    def get_devices_income_statistic(cls, logicalCodes, startDate, endDate):
        """
        Income statistics of devices.

        :param logicalCodes: logical codes of the devices
        :param startDate: range start
        :param endDate: range end
        :return: see ``get_devices_statistic``
        """
        statisticKey = {
            format(_sourceName): {"$sum": "$daily.income.{}".format(_sourceName)}
            for _sourceName in DEALER_INCOME_SOURCE.choices()
        }

        return cls.get_devices_statistic(logicalCodes, startDate, endDate, statisticKey)

    @classmethod
    def get_devices_consumption_statistic(cls, logicalCodes, startDate, endDate):
        """
        Consumption statistics of devices.

        :param logicalCodes: logical codes of the devices
        :param startDate: range start
        :param endDate: range end
        :return: see ``get_devices_statistic``
        """
        statisticKey = {
            format(_sourceName): {"$sum": "$daily.consumption.{}".format(_sourceName)}
            for _sourceName in DEALER_CONSUMPTION_AGG_KIND.choices()
        }

        return cls.get_devices_statistic(logicalCodes, startDate, endDate, statisticKey)


class ClientRechargeModelProxy(ModelProxy):
    """Proxy for ``RechargeRecord``."""
    _META_MODEL = RechargeRecord


class ClientConsumeModelProxy(ModelProxy):
    """Proxy for ``ConsumeRecord``."""
    _META_MODEL = ConsumeRecord

    @classmethod
    def get_not_finished_record(cls, ownerId, openId, devTypeCode, **kwargs):
        """
        Get an order of the user at this dealer that has not finished yet.

        :param ownerId: the dealer
        :param openId: the user
        :param devTypeCode: device type code
        :return: a record with ``status != "finished"`` and ``isNormal = True``, or None
        """
        return cls.get_one(ownerId = ownerId,
                           openId = openId,
                           devTypeCode = devTypeCode,
                           status__ne = "finished",
                           isNormal = True,
                           **kwargs)


class ClientDealerIncomeModelProxy(ModelProxy):
    """Proxy for ``DealerIncomeProxy``."""
    _META_MODEL = DealerIncomeProxy

    @classmethod
    def get_one(cls, startTime = None, endTime = None, **kwargs):
        # type:(str, str, dict) -> Searchable
        """
        Like ``ModelProxy.get_one``; with ``ref_id`` the time range is derived from that id
        (it is passed on as ``foreign_id`` and ``startTime`` / ``endTime`` are not used).
        """
        if 'ref_id' in kwargs:
            return super(ClientDealerIncomeModelProxy, cls).get_one(foreign_id = str(kwargs.get('ref_id')), **kwargs)
        else:
            return super(ClientDealerIncomeModelProxy, cls).get_one(startTime = startTime, endTime = endTime,
                                                                    **kwargs)


class GroupReportModelProxy(ModelProxy):
    """Proxy for ``GroupReport``; its time field ``date`` is filtered with "%Y-%m-%d" strings."""
    _DEFAULT_TIME_FIELD = "date"
    _META_MODEL = GroupReport

    @staticmethod
    def adapt_time_field_type(_time):
        """Format the slice bound as a "%Y-%m-%d" string."""
        return datetime.datetime.strftime(_time, "%Y-%m-%d")


class DealerReportModelProxy(ModelProxy):
    """Proxy for ``DealerReport``; its time field ``date`` is filtered with "%Y-%m-%d" strings."""
    _DEFAULT_TIME_FIELD = "date"
    _META_MODEL = DealerReport

    @staticmethod
    def adapt_time_field_type(_time):
        """Format the slice bound as a "%Y-%m-%d" string."""
        return datetime.datetime.strftime(_time, "%Y-%m-%d")

    @classmethod
    def get_year_by_month(cls, dealerId, yearStr):
        # type:(str, str) -> dict
        """
        Monthly ``count`` / ``lineCoins`` of a dealer's day reports for one year.

        For the current month of the current year, today's figures from
        ``Accounting.getOwnerIncome`` are added on top of the aggregated ones.

        :param dealerId: the dealer (``ownerId`` of the reports)
        :param yearStr: the year, passed to ``get_start_and_end_by_year``
        :return: dict keyed by "YYYY-MM" with ``count`` and ``lineCoins``
        """
        pipeline = cls._META_MODEL.prepare_group_pipeline()
        groupKey = cls._META_MODEL.month_group_key()
        groupKey.update({'lineCoins': {'$sum': '$rpt.lineCoins'}, 'count': {'$sum': '$rpt.count'}})

        _group = {
            "$group": groupKey
        }
        pipeline.append(_group)

        rv = {}

        startDay, endDay = get_start_and_end_by_year(yearStr)
        now = datetime.datetime.now()

        proxies = cls(st = startDay, et = endDay)
        items = proxies.all(ownerId = dealerId, type = 'day').aggregate(*pipeline)

        for item in items:
            itemId = item.get("_id")
            key = "{year}-{month:02d}".format(**itemId)
            value = {
                'count': item.get('count', 0),
                'lineCoins': item.get('lineCoins', 0)
            }

            # Today's figures belong to the current month of the current year only;
            # comparing the month alone would also add them to the same month of other years.
            if int(now.year) == int(itemId.get('year')) and int(now.month) == int(itemId.get('month')):
                todayRpt = Accounting.getOwnerIncome(dealerId, now)
                value['count'] = value['count'] + todayRpt.get('count', 0)
                value['lineCoins'] = value['lineCoins'] + todayRpt.get('lineCoins', 0)

            rv[key] = value

        return rv


class DevReportModelProxy(ModelProxy):
    """Proxy for ``DevReport``; its time field ``date`` is filtered with "%Y-%m-%d" strings."""
    _DEFAULT_TIME_FIELD = "date"
    _META_MODEL = DevReport

    @staticmethod
    def adapt_time_field_type(_time):
        """Format the slice bound as a "%Y-%m-%d" string."""
        return datetime.datetime.strftime(_time, "%Y-%m-%d")