#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Helpers for horizontal (time-based) table sharding. This is a simple interim
solution; it may later be replaced by modifying the QuerySet directly.
"""
import collections
import datetime
import logging
from collections import namedtuple
from weakref import WeakKeyDictionary

from bson import ObjectId
from cytoolz import merge_with, keyfilter
from dateutil import tz
from django.conf import settings
from mongoengine import ValidationError, DoesNotExist
from typing import TYPE_CHECKING, Iterable, Mapping, List, Optional, Union, Dict, Any

from apilib.monetary import RMB, Money
from apilib.quantity import Quantity
from apps.common.utils import get_start_and_end_by_month, get_start_and_end_by_year, get_date_range
from apps.web.constant import DEALER_CONSUMPTION_AGG_KIND, Const
from apps.web.core.accounting import Accounting
from apps.web.core.db import copy_document_classes
from apps.web.core.exceptions import ReportProxyValueError, ReportProxyTypeError
from apps.web.dealer.define import DEALER_INCOME_SOURCE
from apps.web.dealer.proxy import DealerIncomeProxy
from apps.web.models import ArchivedModelProxyConfig
from apps.web.report.models import DealerDailyStat, GroupDailyStat, DeviceDailyStat, GroupReport, DealerReport, \
    DevReport
from apps.web.user.models import ConsumeRecord
from apps.web.user.models import RechargeRecord

if TYPE_CHECKING:
    from apps.web.core.db import Searchable
    from apps.web.core.db import CustomQuerySet

logger = logging.getLogger(__name__)

TimeStream = namedtuple("TimeStream", ["archived", "st", "et"])


def sum_fn(lst):
    """Sum an iterable of values as Quantity, starting from zero."""
    return sum(map(lambda _: Quantity(_), lst), Quantity('0'))


# Predicate for keyfilter: keep every aggregation key except the group "_id".
not_aggregate_id = lambda _: _ != '_id'


class LazyCount(object):
    """Data descriptor caching count/sum metadata per QuerySetProxy instance."""

    def __init__(self):
        self._cache = WeakKeyDictionary()

    def __set__(self, instance, value):  # type:(QuerySetProxy, Mapping) -> None
        if instance is None:
            raise Exception("cannot use <{}>".format(instance))
        # TODO zjl backfill/normalize the cached value
        self._cache[instance] = value

    def __get__(self, instance, owner):
        if instance is None:
            raise Exception("cannot use <{}> by <{}>".format(instance, owner))
        return self._cache.get(instance, dict())


class QuerySetProxy(object):
    """
    Mimics CustomQuerySet; mainly handles pagination across several querysets.
    """
    lazyCount = LazyCount()

    def __init__(self, *iterable):  # type:(Iterable[CustomQuerySet]) -> None
        """
        :param iterable: one queryset per underlying database/time slice
        """
        self._iterable = iterable

    def _get_count(self, queryset):
        _count = self.lazyCount.get(queryset, dict()).get("count")
        if _count is None:
            _count = queryset.count()
        return _count

    def paginate(self, pageIndex, pageSize):  # type:(int, int) -> QuerySetProxy
        """
        Same contract as CustomQuerySet.paginate.
        The pagination logic avoids extra count() calls where it can: count is slow.
        :param pageIndex:
        :param pageSize:
        :return:
        """
        # Total number of records up to and including the requested page.
        total = pageSize * pageIndex
        new_iterable = list()
        for _queryset in self._iterable:
            _count = self._get_count(_queryset)
            # The page ends inside this queryset; later querysets are not needed.
            if total <= _count:
                skip = max(0, total - pageSize)
                limit = pageSize if total - pageSize > 0 else total
                new_iterable.append(_queryset.skip(skip).limit(limit))
                break
            # The page starts in this queryset and spills into the following one(s).
            elif _count < total < _count + pageSize:
                new_iterable.append(_queryset.skip(max(total - pageSize, 0)).limit(pageSize))
                total = total - _count
            # This queryset lies entirely before the requested page.
            else:
                total = total - _count
        return self.__class__(*new_iterable)
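
    # Usage sketch (illustrative; the querysets and counts below are made up).
    # Given a proxy over a hot queryset holding 30 records followed by an
    # archived one, page 2 with pageSize 20 takes records 21-30 from the first
    # queryset and records 1-10 from the second:
    #
    #   proxy = QuerySetProxy(hot_qs, archived_qs)
    #   page2 = proxy.paginate(pageIndex = 2, pageSize = 20)
    #   items = list(page2)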

    def _sum_and_count(self, queryset, field):
        cache_info = self.lazyCount
        if cache_info.get(queryset):
            _sum = cache_info.get(queryset).get("sum")
            _count = cache_info.get(queryset).get("count")
        else:
            _count, _sum = queryset.sum_and_count(field)
            cache_info.update({queryset: {"sum": _sum, "count": _count}})
            self.lazyCount = cache_info
        return _count, _sum

    def sum_and_count(self, field):
        """
        :param field: the field to sum over
        :return:
        """
        allCount, allSum = 0, Money(0)
        for _queryset in self._iterable:
            _count, _sum = self._sum_and_count(_queryset, field)
            allSum += Money(_sum)
            allCount += _count
        return allCount, allSum

    def count(self, with_limit_and_skip = False):
        cache_info = self.lazyCount
        allCount = 0
        for _queryset in self._iterable:
            if cache_info.get(_queryset):
                _count = cache_info.get(_queryset).get("count")
            else:
                _count = _queryset.count(with_limit_and_skip)
                cache_info.update({_queryset: {"count": _count}})
                self.lazyCount = cache_info
            allCount += _count
        return allCount

    def __iter__(self):
        """
        Yield records from each underlying queryset in turn.
        :return:
        """
        for _querySet in self._iterable:
            for _item in _querySet:
                yield _item

    def first(self):
        for _queryset in self._iterable:
            record = _queryset.first()
            if record:
                logger.debug('find one record in {}'.format(_queryset._document._meta['db_alias']))
                return record
            else:
                logger.debug('find null record in {}'.format(_queryset._document._meta['db_alias']))
        return None

    def aggregate(self, *pipeline, **kwargs):
        """
        Results that span hot-data node boundaries may contain duplicates;
        callers are responsible for merging them.
        :param pipeline:
        :param kwargs:
        :return:
        """
        rv = []
        for _queryset in self._iterable:
            for item in _queryset.aggregate(*pipeline, **kwargs):
                rv.append(item)
        return rv
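
    # Dedup sketch (illustrative): when a $group pipeline spans the hot/archived
    # boundary the same group key can appear twice, so callers re-merge, e.g.:
    #
    #   merged = {}
    #   for item in proxy.aggregate({"$group": {"_id": "$dealerId", "n": {"$sum": 1}}}):
    #       key = str(item["_id"])
    #       merged[key] = merge_with(sum_fn, [keyfilter(not_aggregate_id, item),
    #                                         merged.get(key, {})])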


class ModelProxy(object):
    """
    Only fetches data; whether the data should be fetched at all is a business
    decision left to the caller.
    """
    _META_MODEL = None
    _DEFAULT_TIME_FIELD = "dateTimeAdded"

    def __init__(self, st, et, delta = None, query = None, timeField = None):
        # Convert time strings to datetime objects so the node split can work on them.
        self.st = self.str_to_datetime(st)
        self.et = self.str_to_datetime(et)
        self.reverse = True
        if delta is not None:
            if delta >= 0:
                self.et = self.et + datetime.timedelta(seconds = delta)
                self.reverse = False
            else:
                self.reverse = True
                self.st = self.st - datetime.timedelta(seconds = abs(delta))
        if self.st > self.et:
            raise ReportProxyValueError(u"query start time is later than its end time")
        self.query = query or self.default_query_set()
        self._timeField = timeField or self._DEFAULT_TIME_FIELD

    @classmethod
    def default_query_set(cls):
        return cls._META_MODEL.objects

    @classmethod
    def default_hot_data_start_day(cls):
        if cls._META_MODEL:
            return ArchivedModelProxyConfig.get_model_hot_data_start_day(cls._META_MODEL.__name__)
        else:
            return ArchivedModelProxyConfig.get_model_hot_data_start_day("default")

    @staticmethod
    def str_to_datetime(s):
        # Re-import datetime locally to avoid the _strptime import race under multi-threading.
        import datetime
        try:
            if isinstance(s, datetime.datetime):
                return s
            else:
                return datetime.datetime.strptime(s, "%Y-%m-%d")
        except ValueError:
            raise ReportProxyValueError(u"bad time string, expected format {}".format("%Y-%m-%d"))
        except TypeError:
            raise ReportProxyTypeError(u"a time string is required")

    @staticmethod
    def adapt_time_field_type(_time):
        return _time

    @classmethod
    def node_time(cls):
        """
        Return the date of the hot/cold node boundary.
        :return:
        """
        import datetime
        start_day = cls.default_hot_data_start_day()
        nodeTime = datetime.datetime.strptime(start_day, "%Y-%m-%d")
        if nodeTime > datetime.datetime.now():
            nodeTime = datetime.datetime.now()
        return nodeTime

    @property
    def db_alias(self):
        """
        Return the original db alias.
        :return:
        """
        defaultDbAlias = self.query._document._origin_meta.get("db_alias")
        return defaultDbAlias

    def get_db_alias(self, timeStream):
        """
        Build the db alias for a time slice.
        :return:
        """
        defaultDbAlias = self.db_alias
        if not timeStream.archived:
            return defaultDbAlias
        else:
            if defaultDbAlias.endswith(str(timeStream.st.year)):
                return defaultDbAlias
            else:
                return "{}_{}".format(defaultDbAlias, timeStream.st.year)

    def split_time(self, st = None, et = None):
        import datetime
        st = st or self.st
        et = et or self.et
        nodeTime = self.node_time()
        timeStreamList = list()
        # The whole range lies in the hot-data region.
        if st >= nodeTime:
            timeStreamList.append(TimeStream(archived = False, st = st, et = et + datetime.timedelta(days = 1)))
            return timeStreamList
        # The range straddles the hot/cold boundary.
        elif st < nodeTime < et:
            timeStreamList.append(TimeStream(archived = False, st = nodeTime, et = et + datetime.timedelta(days = 1)))
            return self.split_time(st = st, et = nodeTime - datetime.timedelta(days = 1)) + timeStreamList
        # The whole range lies in the cold-data region; such slices split by year.
        elif et <= nodeTime:
            timeStreamList.append(TimeStream(archived = True, st = st, et = et + datetime.timedelta(days = 1)))
            return timeStreamList
        else:
            return timeStreamList
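
    # Worked example (sketch; dates are made up and the node day is assumed to
    # be 2021-01-01). split_time(st = 2020-12-20, et = 2021-01-10) yields:
    #
    #   [TimeStream(archived = True,  st = 2020-12-20, et = 2021-01-01),  # cold slice
    #    TimeStream(archived = False, st = 2021-01-01, et = 2021-01-11)]  # hot slice
    #
    # Each et is pushed one day past the requested end so that the
    # "{field}__lt" filter in all() still covers the whole final day.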

    def all(self, **filters):
        logger.info("ready to find <{}> data, filters is <{}>".format(self.__class__.__name__, filters))
        timeField = filters.pop("timeField", None) or self._timeField
        timeList = self.split_time()
        timeList.sort(key = lambda x: x.st, reverse = self.reverse)
        logger.debug('hit data time list is: {}'.format(str(timeList)))
        records = list()
        shard_filter = filters.pop('shard_filter', None)
        for timeStream in timeList:
            filters.update({
                "{}__gte".format(timeField): self.adapt_time_field_type(timeStream.st),
                "{}__lt".format(timeField): self.adapt_time_field_type(timeStream.et)
            })
            defaultDbAlias = self.db_alias
            dbAlias = defaultDbAlias if not timeStream.archived else "{}_his".format(defaultDbAlias)
            if dbAlias == defaultDbAlias:
                new_model_cls = self._META_MODEL
                sourceQuerySet = self._META_MODEL.objects  # type: CustomQuerySet
            else:
                new_model_cls = copy_document_classes(
                    self._META_MODEL, '{}_his'.format(self._META_MODEL.__name__),
                    dbAlias)
                sourceQuerySet = new_model_cls.objects  # type: CustomQuerySet
            logger.info(
                "dbAlias = {}, model = {}, filters = {}; shard_filter = {}".format(dbAlias, new_model_cls.__name__,
                                                                                   filters, shard_filter))
            shard_key = new_model_cls._meta.get('shard_key', tuple())
            if shard_key:
                if shard_filter:
                    filters.update(shard_filter)
                for _field in shard_key[0:-1]:
                    if _field not in filters and '{}__in'.format(_field) not in filters:
                        logger.warning(
                            'query in {}, filters = {}, has no shard key.'.format(new_model_cls.__name__, filters))
            records.append(
                self._combine_queryset(sourceQuerySet, **filters).order_by("-{}".format(self._DEFAULT_TIME_FIELD)))
        if len(records) > 0:
            return QuerySetProxy(*records)
        else:
            logger.error("bad time split, start time is <{}>, end time is <{}>".format(self.st, self.et))
            return self._META_MODEL.objects.none()

    @staticmethod
    def _combine_queryset(queryset, **filters):
        # type:(CustomQuerySet, dict) -> CustomQuerySet
        """
        Fold the filter conditions into the queryset, consuming the pseudo-filters
        (only, searchKey, etc.) before the rest reaches .filter().
        searchKey: search string
        only: restrict the returned fields, a list, e.g. ['_id', "dateTimeAdded", ...]
        hint: force a specific index
        :param filters:
        :return:
        """
        queryset_new = queryset.clone()
        _searchKey = filters.pop("searchKey", None)
        if _searchKey:
            queryset_new = queryset_new.search(_searchKey)
        _only = filters.pop("only", None)
        if _only:
            queryset_new = queryset_new.only(*_only)
        _exclude = filters.pop('exclude', None)
        if _exclude:
            queryset_new = queryset_new.exclude(*_exclude)
        _hint = filters.pop("hint", None)
        if _hint:
            queryset_new = queryset_new.hint(_hint)
        return queryset_new.filter(**filters)
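
    # Filter sketch (values are made up): the pseudo-filters are stripped off
    # before the remaining kwargs reach .filter():
    #
    #   proxy.all(dealerId = '5d0...',
    #             searchKey = '138...',                 # -> queryset.search(...)
    #             only = ['_id', 'dateTimeAdded'],      # -> queryset.only(...)
    #             exclude = ['origin'],                 # -> queryset.exclude(...)
    #             hint = 'dealerId_1')                  # -> queryset.hint(...)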

    @classmethod
    def get_one(cls, startTime = None, endTime = None, **kwargs):
        # type:(str, str, Dict[str, Any]) -> Optional[Union[ConsumeRecord, RechargeRecord, DealerDailyStat, GroupDailyStat, DeviceDailyStat, GroupReport, DealerReport, DevReport]]
        """
        Fetch a single record by primary key; the time slice decides which
        database is queried.
        :param kwargs:
        :param startTime:
        :param endTime:
        :return:
        """
        import datetime
        try:
            delta = None
            if 'id' in kwargs:
                endTime = ObjectId(kwargs.get('id')).generation_time.astimezone(tz.gettz(settings.TIME_ZONE)).replace(
                    tzinfo = None)
                startTime = endTime
                endTime = endTime + datetime.timedelta(seconds = 60)
                delta = -3 * 24 * 60 * 60
            elif 'foreign_id' in kwargs:
                startTime = ObjectId(kwargs.pop('foreign_id')).generation_time.astimezone(
                    tz.gettz(settings.TIME_ZONE)).replace(tzinfo = None)
                endTime = datetime.datetime.now()
                delta = 0
            else:
                if not startTime and not endTime:
                    startTime = Const.QUERY_START_DATE
                    endTime = datetime.datetime.now().strftime('%Y-%m-%d')
                elif not startTime:
                    startTime = Const.QUERY_START_DATE
                else:
                    endTime = datetime.datetime.now().strftime('%Y-%m-%d')
                delta = 0
            query_set_proxy = cls(st = startTime, et = endTime, delta = delta).all(**kwargs)  # type: QuerySetProxy
            return query_set_proxy.first()
        except ValidationError:
            logger.error("get one record kwargs=<{}>, time is <{}-{}> error pk".format(kwargs, startTime, endTime))
            record = None
        except DoesNotExist:
            logger.error("get one record kwargs=<{}>, time is <{}-{}> not exist pk".format(kwargs, startTime, endTime))
            record = None
        except Exception as e:
            logger.exception(
                "get one record kwargs=<{}>, time is <{} to {}> error {}".format(kwargs, startTime, endTime, e))
            record = None
        return record
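
    # Usage sketch (ids and filter fields are made up). With an 'id', the search
    # window is derived from the ObjectId's generation time, padded three days
    # backwards:
    #
    #   record = ClientConsumeModelProxy.get_one(id = '5f1e...')
    #   record = ClientRechargeModelProxy.get_one(startTime = '2021-01-01',
    #                                             openId = 'oX9...')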

    @classmethod
    def get_data_list(cls, startTime = None, endTime = None,
                      **kwargs):  # type:(str, str, Dict[str, Any]) -> CustomQuerySet
        """
        A filter()-like interface.
        The query may touch several databases, so first look at the node time; it
        decides whether more than one database has to be visited.
        :param startTime:
        :param endTime:
        :param kwargs:
        :return:
        """
        import datetime
        delta = None
        # With a list of ids, derive the minimum and maximum times from the ids.
        if 'id__in' in kwargs:
            id_list = kwargs.get('id__in')
            _time_list = []
            for _id in id_list:
                _time = ObjectId(_id).generation_time.astimezone(tz.gettz(settings.TIME_ZONE)).replace(tzinfo = None)
                _time_list.append(_time)
            startTime = min(_time_list)
            endTime = max(_time_list)
            delta = -3 * 24 * 60 * 60
        if not startTime and not endTime:
            startTime = Const.QUERY_START_DATE
            endTime = datetime.datetime.now().strftime('%Y-%m-%d')
        elif not startTime:
            startTime = Const.QUERY_START_DATE
        elif not endTime:
            endTime = datetime.datetime.now().strftime('%Y-%m-%d')
            delta = 0
        return cls(st = startTime, et = endTime, delta = delta).all(**kwargs)

    @classmethod
    def current_db_proxy(cls):
        return cls(st = cls.node_time(), et = datetime.datetime.now())


class DealerDailyStatsModelProxy(ModelProxy):
    _DEFAULT_TIME_FIELD = "date"
    _META_MODEL = DealerDailyStat

    @staticmethod
    def adapt_time_field_type(_time):
        return datetime.datetime.strftime(_time, "%Y-%m-%d")

    @staticmethod
    def income_statistic_key(sourceName):
        return "daily.income.{}".format(sourceName)

    @staticmethod
    def consumption_statistic_key(sourceName):
        return "daily.consumption.{}".format(sourceName)

    @classmethod
    def get_stats_as_month(cls, dealerIds, startDay, endDay, project, aggregateMap):
        # type:(List[str], str, str, tuple, dict) -> dict
        pipeline = DealerDailyStat.prepare_group_pipeline(project = project)
        groupKey = DealerDailyStat.month_group_key()
        groupKey.update(aggregateMap)
        _group = {
            "$group": groupKey
        }
        pipeline.append(_group)
        rv = {}
        proxies = cls(st = startDay, et = endDay)
        items = proxies.all(dealerId__in = dealerIds).aggregate(*pipeline)
        for item in items:
            key = "{year}-{month:02d}".format(**item.get("_id"))
            value = merge_with(sum_fn, [keyfilter(not_aggregate_id, item), keyfilter(not_aggregate_id, rv.get(key, {}))])
            rv[key] = value
        return rv
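
    # Result sketch (numbers made up): keys are "YYYY-MM" strings, values map
    # each aggregated field to its sum, re-merged across hot/cold duplicates:
    #
    #   {'2021-01': {'totalIncome': Quantity('1234.56')},
    #    '2021-02': {'totalIncome': Quantity('987.00')}}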

    @classmethod
    def get_stats_as_day(cls, dealerIds, startDay, endDay, project, aggregateMap):
        # type:(List[str], str, str, tuple, dict) -> dict
        pipeline = DealerDailyStat.prepare_group_pipeline(project = project, hasDay = True)
        groupKey = DealerDailyStat.day_group_key()
        groupKey.update(aggregateMap)
        _group = {
            "$group": groupKey
        }
        pipeline.append(_group)
        rv = collections.OrderedDict()
        for day in get_date_range(startDay, endDay):
            rv[datetime.datetime.strftime(day, "%Y-%m-%d")] = dict()
        proxies = cls(st = startDay, et = endDay)
        items = proxies.all(dealerId__in = dealerIds).aggregate(*pipeline)
        for item in items:
            key = "{year}-{month:02d}-{day:02d}".format(**item.get("_id"))
            value = merge_with(sum_fn, [keyfilter(not_aggregate_id, item), keyfilter(not_aggregate_id, rv.get(key, {}))])
            rv[key] = value
        return rv

    @classmethod
    def get_stats_as_dealer(cls, dealerIds, startDay, endDay, project, aggregateMap):
        pipeline = DealerDailyStat.prepare_group_pipeline(project = project, needTime = False)
        groupKey = DealerDailyStat.dealer_group_key()
        groupKey.update(aggregateMap)
        _group = {
            "$group": groupKey
        }
        pipeline.append(_group)
        query_set_proxy = cls(st = startDay, et = endDay).all(dealerId__in = dealerIds)  # type: QuerySetProxy
        items = query_set_proxy.aggregate(*pipeline)
        rv = {}
        for item in items:
            key = str(item.get('_id'))
            value = merge_with(sum_fn, [keyfilter(not_aggregate_id, item), keyfilter(not_aggregate_id, rv.get(key, {}))])
            rv[key] = value
        return rv

    @classmethod
    def get_total_income_as_dealer(cls, dealerIds, startDay, endDay):
        return cls.get_stats_as_dealer(
            dealerIds = dealerIds,
            startDay = startDay,
            endDay = endDay,
            project = ('dealerId', 'daily.totalIncome',),
            aggregateMap = {
                'totalIncome': {'$sum': '$daily.totalIncome'}
            })

    @classmethod
    def get_one_month_income(cls, dealerId, monthStr):
        # type:(ObjectId, str) -> RMB
        """
        Total income for the dealer in the given month. Per the existing logic only
        online income counts, i.e. recharge + card + virtual card + advertising.
        :param dealerId:
        :param monthStr:
        :return:
        """
        startDay, endDay = get_start_and_end_by_month(monthStr = monthStr)
        items = cls.get_stats_as_month(dealerIds = [dealerId],
                                       startDay = startDay,
                                       endDay = endDay,
                                       project = ('daily.totalIncome',),
                                       aggregateMap = {"totalIncome": {"$sum": "$daily.totalIncome"}})
        return RMB(items.get(monthStr, {'totalIncome': Quantity(0)}).get('totalIncome'))
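
    # Usage sketch (the id is made up):
    #
    #   income = DealerDailyStatsModelProxy.get_one_month_income(dealerId = '5d0...',
    #                                                            monthStr = '2021-01')
    #   assert isinstance(income, RMB)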

    @classmethod
    def get_consume_as_month(cls, dealerIds, startDay, endDay, sources = None):
        if not sources:
            sources = DEALER_CONSUMPTION_AGG_KIND.choices()
        aggregateMap = {
            format(_sourceName): {"$sum": "$daily.consumption.{}".format(_sourceName)} for _sourceName in sources
        }
        return cls.get_stats_as_month(dealerIds = dealerIds,
                                      startDay = startDay,
                                      endDay = endDay,
                                      project = ('daily.consumption',),
                                      aggregateMap = aggregateMap)

    @classmethod
    def get_one_year_consume_as_month(cls, dealerId, yearStr):
        # type:(ObjectId, str) -> dict
        """
        Aggregate one year of consumption, grouped by month.
        :param dealerId:
        :param yearStr:
        :return:
        """
        aggregateMap = {format(_sourceName): {"$sum": "$daily.consumption.{}".format(_sourceName)} for _sourceName in
                        DEALER_CONSUMPTION_AGG_KIND.choices()}
        startDay, endDay = get_start_and_end_by_year(yearStr)
        return cls.get_stats_as_month([dealerId], startDay, endDay, ('daily.consumption',), aggregateMap)

    @classmethod
    def get_one_year_income_as_month(cls, dealerId, yearStr):
        # type:(ObjectId, str) -> dict
        aggregateMap = {format(_sourceName): {"$sum": "$daily.income.{}".format(_sourceName)} for _sourceName in
                        DEALER_INCOME_SOURCE.choices()}
        aggregateMap.update({"totalIncome": {"$sum": "$daily.totalIncome"}})
        startDay, endDay = get_start_and_end_by_year(yearStr)
        return cls.get_stats_as_month([dealerId], startDay, endDay, ('daily.totalIncome', 'daily.income'), aggregateMap)

    @classmethod
    def get_days_income_stat(cls, dealerId, monthStr):
        startDay, endDay = get_start_and_end_by_month(monthStr)
        endDay = min(datetime.date.today().strftime("%Y-%m-%d"), endDay)
        statisticRecords = DealerDailyStatsModelProxy.get_data_list(
            startTime = startDay, endTime = endDay, dealerId = dealerId,
            exclude = ('origin', '_id', 'hourly', 'dealerId', 'daily.consumption'))
        rv = collections.OrderedDict()
        for day in get_date_range(startDay, endDay):
            rv[datetime.datetime.strftime(day, "%Y-%m-%d")] = dict()
        for _stat in statisticRecords:  # type: DealerDailyStat
            rv.update({_stat['date']: _stat.daily})
        return rv

    @classmethod
    def get_days_consume_stat(cls, dealerId, monthStr):
        startDay, endDay = get_start_and_end_by_month(monthStr)
        endDay = min(datetime.date.today().strftime("%Y-%m-%d"), endDay)
        statisticRecords = DealerDailyStatsModelProxy.get_data_list(
            startTime = startDay, endTime = endDay, dealerId = dealerId,
            exclude = ('_id', 'dealerId', 'hourly', 'origin', 'daily.income', 'daily.totalIncome'))
        rv = collections.OrderedDict()
        for day in get_date_range(startDay, endDay):
            rv[datetime.datetime.strftime(day, "%Y-%m-%d")] = dict()
        for _stat in statisticRecords:  # type: DealerDailyStat
            rv.update({_stat['date']: _stat.daily})
        return rv


class GroupDailyStatsModelProxy(ModelProxy):
    _DEFAULT_TIME_FIELD = "date"
    _META_MODEL = GroupDailyStat

    @staticmethod
    def adapt_time_field_type(_time):
        return datetime.datetime.strftime(_time, "%Y-%m-%d")

    @classmethod
    def get_groups_statistic(cls, groupIds, startDate, endDate, statisticKey):
        if not groupIds:
            return []
        statisticKey.update({"_id": "$groupId"})
        resultDict = dict()
        for _item in cls(st = startDate, et = endDate).all(
                groupId__in = [ObjectId(_id) for _id in groupIds]).aggregate({"$group": statisticKey}):
            groupId = _item.pop("_id", None)
            resultDict[str(groupId)] = merge_with(sum_fn, [_item, resultDict.get(str(groupId), {})])
        return resultDict

    @classmethod
    def get_groups_income_statistic(cls, groupIds, startDate, endDate, dealerId = None):
        # type:(list, str, str, str) -> dict
        """
        Income statistics per address group.
        :param groupIds:
        :param startDate:
        :param endDate:
        :param dealerId: TODO temporary parameter; flattening this map-style data
                         would need $unwind, which our MongoDB version does not yet
                         support. Can be optimised later.
        :return:
        """
        statisticKey = {
            format(_sourceName): {"$sum": "$daily.income.{}".format(_sourceName)}
            for _sourceName in DEALER_INCOME_SOURCE.choices()
        }
        if dealerId:
            statisticKey.update({
                "dealerActualIncome": {"$sum": "$daily.incomeMap.{}".format(dealerId)}
            })
        statisticKey.update({"totalIncome": {"$sum": "$daily.totalIncome"}})
        return cls.get_groups_statistic(groupIds, startDate, endDate, statisticKey)
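
    # Result sketch (ids and source names are made up): one entry per group id,
    # mapping each income source to its sum, plus 'totalIncome' (and
    # 'dealerActualIncome' when dealerId is given):
    #
    #   {'5e9...': {'recharge': Quantity('100'), 'totalIncome': Quantity('130'),
    #               'dealerActualIncome': Quantity('90')}}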

    @classmethod
    def get_groups_consumption_statistic(cls, groupIds, startDate, endDate):
        statisticKey = {
            format(_sourceName): {"$sum": "$daily.consumption.{}".format(_sourceName)}
            for _sourceName in DEALER_CONSUMPTION_AGG_KIND.choices()
        }
        return cls.get_groups_statistic(groupIds, startDate, endDate, statisticKey)

    @classmethod
    def get_one_group_income_statistic(cls, groupId, startDate, endDate, dealerId = None):
        return cls.get_groups_income_statistic([groupId], startDate, endDate, dealerId).get(groupId)

    @classmethod
    def get_one_group_consumption_statistic(cls, groupId, startDate, endDate):
        return cls.get_groups_consumption_statistic([groupId], startDate, endDate).get(groupId)


class DeviceDailyStatsModelProxy(ModelProxy):
    _DEFAULT_TIME_FIELD = "date"
    _META_MODEL = DeviceDailyStat

    @staticmethod
    def adapt_time_field_type(_time):
        return datetime.datetime.strftime(_time, "%Y-%m-%d")

    @classmethod
    def get_devices_statistic(cls, logicalCodes, startDate, endDate, statisticKey):
        proxies = cls(st = startDate, et = endDate)
        statisticKey.update({"_id": "$logicalCode"})
        resultDict = dict()
        for _item in proxies.all(logicalCode__in = logicalCodes).aggregate({"$group": statisticKey}):
            logicalCode = str(_item.pop("_id", None))
            resultDict[str(logicalCode)] = merge_with(sum_fn, [_item, resultDict.get(str(logicalCode), {})])
        return resultDict

    @classmethod
    def get_devices_income_statistic(cls, logicalCodes, startDate, endDate):
        """
        Income statistics per device.
        :param logicalCodes:
        :param startDate:
        :param endDate:
        :return:
        """
        statisticKey = {
            format(_sourceName): {"$sum": "$daily.income.{}".format(_sourceName)}
            for _sourceName in DEALER_INCOME_SOURCE.choices()
        }
        return cls.get_devices_statistic(logicalCodes, startDate, endDate, statisticKey)

    @classmethod
    def get_devices_consumption_statistic(cls, logicalCodes, startDate, endDate):
        """
        Consumption statistics per device.
        :param logicalCodes:
        :param startDate:
        :param endDate:
        :return:
        """
        statisticKey = {
            format(_sourceName): {"$sum": "$daily.consumption.{}".format(_sourceName)}
            for _sourceName in DEALER_CONSUMPTION_AGG_KIND.choices()
        }
        return cls.get_devices_statistic(logicalCodes, startDate, endDate, statisticKey)


class ClientRechargeModelProxy(ModelProxy):
    _META_MODEL = RechargeRecord


class ClientConsumeModelProxy(ModelProxy):
    _META_MODEL = ConsumeRecord

    @classmethod
    def get_not_finished_record(cls, ownerId, openId, devTypeCode, **kwargs):
        """
        Fetch the user's order with this dealer that has not finished yet.
        :param ownerId:
        :param openId:
        :param devTypeCode:
        :return:
        """
        return cls.get_one(ownerId = ownerId,
                           openId = openId,
                           devTypeCode = devTypeCode,
                           status__ne = "finished",
                           isNormal = True,
                           **kwargs)
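
    # Usage sketch (values are made up): look up an in-flight order for a user
    # at a given dealer and device type:
    #
    #   record = ClientConsumeModelProxy.get_not_finished_record(
    #       ownerId = '5d0...', openId = 'oX9...', devTypeCode = '100001')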


class ClientDealerIncomeModelProxy(ModelProxy):
    _META_MODEL = DealerIncomeProxy

    @classmethod
    def get_one(cls, startTime = None, endTime = None, **kwargs):  # type:(str, str, dict) -> Optional[DealerIncomeProxy]
        if 'ref_id' in kwargs:
            return super(ClientDealerIncomeModelProxy, cls).get_one(foreign_id = str(kwargs.get('ref_id')), **kwargs)
        else:
            return super(ClientDealerIncomeModelProxy, cls).get_one(startTime = startTime, endTime = endTime, **kwargs)


class GroupReportModelProxy(ModelProxy):
    _DEFAULT_TIME_FIELD = "date"
    _META_MODEL = GroupReport

    @staticmethod
    def adapt_time_field_type(_time):
        return datetime.datetime.strftime(_time, "%Y-%m-%d")


class DealerReportModelProxy(ModelProxy):
    _DEFAULT_TIME_FIELD = "date"
    _META_MODEL = DealerReport

    @staticmethod
    def adapt_time_field_type(_time):
        return datetime.datetime.strftime(_time, "%Y-%m-%d")

    @classmethod
    def get_year_by_month(cls, dealerId, yearStr):
        # type:(str, str) -> dict
        pipeline = cls._META_MODEL.prepare_group_pipeline()
        groupKey = cls._META_MODEL.month_group_key()
        groupKey.update({'lineCoins': {'$sum': '$rpt.lineCoins'}, 'count': {'$sum': '$rpt.count'}})
        _group = {
            "$group": groupKey
        }
        pipeline.append(_group)
        rv = {}
        startDay, endDay = get_start_and_end_by_year(yearStr)
        now_month = datetime.datetime.now().month
        proxies = cls(st = startDay, et = endDay)
        items = proxies.all(ownerId = dealerId, type = 'day').aggregate(*pipeline)
        for item in items:
            key = "{year}-{month:02d}".format(**item.get("_id"))
            value = {
                'count': item.get('count', 0),
                'lineCoins': item.get('lineCoins', 0)
            }
            # For the current month, fold in today's live figures, which have
            # not been written to the daily reports yet.
            if int(now_month) == int(item.get('_id').get('month')):
                todayRpt = Accounting.getOwnerIncome(dealerId, datetime.datetime.now())
                value['count'] = value['count'] + todayRpt.get('count', 0)
                value['lineCoins'] = value['lineCoins'] + todayRpt.get('lineCoins', 0)
            rv[key] = value
        return rv
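
    # Result sketch (numbers made up): one entry per month of the year that has
    # report data; the current month also includes today's live figures:
    #
    #   {'2021-01': {'count': 42, 'lineCoins': 315.5},
    #    '2021-02': {'count': 17, 'lineCoins': 96.0}}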


class DevReportModelProxy(ModelProxy):
    _DEFAULT_TIME_FIELD = "date"
    _META_MODEL = DevReport

    @staticmethod
    def adapt_time_field_type(_time):
        return datetime.datetime.strftime(_time, "%Y-%m-%d")
|