# proxy.py
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. """
  4. 数据库水平分表的工具 暂时简易处理
  5. 后续可能会直接修改querySet来代替
  6. """
  7. import collections
  8. import datetime
  9. import logging
  10. from collections import namedtuple
  11. from weakref import WeakKeyDictionary
  12. from bson import ObjectId
  13. from cytoolz import merge_with, keyfilter
  14. from dateutil import tz
  15. from django.conf import settings
  16. from mongoengine import ValidationError, DoesNotExist
  17. from typing import TYPE_CHECKING, Iterable, Mapping, List, Optional, Dict, Any
  18. from apilib.monetary import RMB, Money
  19. from apilib.quantity import Quantity
  20. from apps.common.utils import get_start_and_end_by_month, get_start_and_end_by_year, get_date_range
  21. from apps.web.constant import DEALER_CONSUMPTION_AGG_KIND, Const
  22. from apps.web.core.accounting import Accounting
  23. from apps.web.core.db import copy_document_classes
  24. from apps.web.core.exceptions import ReportProxyValueError, ReportProxyTypeError
  25. from apps.web.dealer.define import DEALER_INCOME_SOURCE
  26. from apps.web.dealer.proxy import DealerIncomeProxy
  27. from apps.web.models import ArchivedModelProxyConfig
  28. from apps.web.report.models import DealerDailyStat, GroupDailyStat, DeviceDailyStat, GroupReport, DealerReport, \
  29. DevReport
  30. from apps.web.user.models import ConsumeRecord
  31. from apps.web.user.models import RechargeRecord
  32. if TYPE_CHECKING:
  33. from apps.web.core.db import Searchable
  34. from apps.web.core.db import CustomQuerySet
  35. logger = logging.getLogger(__name__)
  36. TimeStream = namedtuple("TimeStream", ["archived", "st", "et"])
  37. def sum_fn(lst):
  38. return sum(map(lambda _: Quantity(_), lst), Quantity('0'))
  39. not_aggreate_id = lambda _: _ != '_id'
  40. class LazyCount(object):
  41. def __init__(self):
  42. self._cache = WeakKeyDictionary()
  43. def __set__(self, instance, value): # type:(QuerySetProxy, Mapping) -> None
  44. """"""
  45. if instance is None:
  46. raise Exception("can not use <{}>".format(instance))
  47. # TODO zjl 对value做数据补齐处理
  48. self._cache[instance] = value
  49. def __get__(self, instance, owner):
  50. if instance is None:
  51. raise Exception("can not use <{}> by <{}>".format(instance, owner))
  52. return self._cache.get(instance, dict())
  53. class QuerySetProxy(object):
  54. """
  55. 模拟 CustomQuerySet 主要处理分页的问题
  56. """
  57. lazyCount = LazyCount()
  58. def __init__(self, *iterable): # type:(Iterable[CustomQuerySet]) -> None
  59. """
  60. :param iterable:
  61. """
  62. self._iterable = iterable
  63. def _get_count(self, queryset):
  64. _count = self.lazyCount.get(queryset, dict()).get("count")
  65. if _count is None:
  66. _count = queryset.count()
  67. return _count
  68. def paginate(self, pageIndex, pageSize): # type:(int, int) -> QuerySetProxy
  69. """
  70. to CustomQuerySet paginate
  71. 分页的逻辑 不要使用count count 很慢
  72. :param pageIndex:
  73. :param pageSize:
  74. :return:
  75. """
  76. # 需要的数据总量
  77. total = pageSize * pageIndex
  78. new_iterable = list()
  79. for _queryset in self._iterable:
  80. _count = self._get_count(_queryset)
  81. # 不需要下一个queryset 参与
  82. if total <= _count:
  83. skip = max(0, total - pageSize)
  84. limit = pageSize if total - pageSize > 0 else total
  85. new_iterable.append(_queryset.skip(skip).limit(limit))
  86. break
  87. # 当前queryset 需要参与 后续的 queryset 也需要参与, 跨页
  88. elif _count < total < _count + pageSize:
  89. new_iterable.append(_queryset.skip(max(total - pageSize, 0)).limit(pageSize))
  90. total = total - _count
  91. # 当前的queryset不参与
  92. else:
  93. total = total - _count
  94. return self.__class__(*new_iterable)
  95. def _sum_and_count(self, queryset, field):
  96. cache_info = self.lazyCount
  97. if cache_info.get(queryset):
  98. _sum = cache_info.get(queryset).get("sum")
  99. _count = cache_info.get(queryset).get("count")
  100. else:
  101. _count, _sum = queryset.sum_and_count(field)
  102. cache_info.update({queryset: {"sum": _sum, "count": _count}})
  103. self.lazyCount = cache_info
  104. return _count, _sum
  105. def sum_and_count(self, field):
  106. """
  107. :param field: 需用统计字段field
  108. :return:
  109. """
  110. allCount, allSum = 0, Money(0)
  111. for _queryset in self._iterable:
  112. _count, _sum = self._sum_and_count(_queryset, field)
  113. allSum += Money(_sum)
  114. allCount += _count
  115. return allCount, allSum
  116. def count(self, with_limit_and_skip = False):
  117. cache_info = self.lazyCount
  118. allCount = 0
  119. for _queryset in self._iterable:
  120. if cache_info.get(_queryset):
  121. _count = cache_info.get(_queryset).get("count")
  122. else:
  123. _count = _queryset.count(with_limit_and_skip)
  124. cache_info.update({_queryset: {"count": _count}})
  125. self.lazyCount = cache_info
  126. allCount += _count
  127. return allCount
  128. def __iter__(self):
  129. """
  130. 从每一个 customQuerySet 中 分别取出数据
  131. :return:
  132. """
  133. for _querySet in self._iterable:
  134. for _item in _querySet:
  135. yield _item
  136. def first(self):
  137. for _queryset in self._iterable:
  138. record = _queryset.first()
  139. if record:
  140. logger.debug('find one record in {}'.format(_queryset._document._meta['db_alias']))
  141. return record
  142. else:
  143. logger.debug('find null record in {}'.format(_queryset._document._meta['db_alias']))
  144. return None
  145. def aggregate(self, *pipeline, **kwargs):
  146. """
  147. 在跨hotdata节点数据上, 会有重复, 调用方对重复数据做处理
  148. :param pipeline:
  149. :param kwargs:
  150. :return:
  151. """
  152. rv = []
  153. for _queryset in self._iterable:
  154. for item in _queryset.aggregate(*pipeline, **kwargs):
  155. rv.append(item)
  156. return rv
  157. class ModelProxy(object):
  158. """
  159. 只获取数据,是否应该获取数据有业务逻辑判断
  160. """
  161. _META_MODEL = None
  162. _DEFAULT_TIME_FIELD = "dateTimeAdded"
  163. def __init__(self, st, et, delta = None, query = None, timeField = None):
  164. # 字串时间转换为时间对象 方便节点分割
  165. self.st = self.str_to_datetime(st)
  166. self.et = self.str_to_datetime(et)
  167. self.reverse = True
  168. if delta is not None:
  169. if delta >= 0:
  170. self.et = self.et + datetime.timedelta(seconds = delta)
  171. self.reverse = False
  172. else:
  173. self.reverse = True
  174. self.st = self.st - datetime.timedelta(seconds = abs(delta))
  175. if self.st > self.et:
  176. raise ReportProxyValueError(u"查询的起始时间超过结束时间")
  177. self.query = query or self.default_query_set()
  178. self._timeField = timeField or self._DEFAULT_TIME_FIELD
  179. @classmethod
  180. def default_query_set(cls):
  181. return cls._META_MODEL.objects
  182. @classmethod
  183. def default_hot_data_start_day(cls):
  184. if cls._META_MODEL:
  185. return ArchivedModelProxyConfig.get_model_hot_data_start_day(cls._META_MODEL.__name__)
  186. else:
  187. return ArchivedModelProxyConfig.get_model_hot_data_start_day("default")
  188. @staticmethod
  189. def str_to_datetime(s):
  190. # 重新导入datetime 避免多线程引发 _strptime 错误
  191. import datetime
  192. try:
  193. if isinstance(s, datetime.datetime):
  194. return s
  195. else:
  196. return datetime.datetime.strptime(s, "%Y-%m-%d")
  197. except ValueError:
  198. raise ReportProxyValueError(u"错误的时间字符串, 格式为 {}".format("%Y-%m-%d"))
  199. except TypeError:
  200. raise ReportProxyTypeError(u"请输入时间字符串")
  201. @staticmethod
  202. def adapt_time_field_type(_time):
  203. return _time
  204. @classmethod
  205. def node_time(cls):
  206. """
  207. 获取节点时间的日期
  208. :return:
  209. """
  210. import datetime
  211. start_day = cls.default_hot_data_start_day()
  212. nodeTime = datetime.datetime.strptime(start_day, "%Y-%m-%d")
  213. if nodeTime > datetime.datetime.now():
  214. nodeTime = datetime.datetime.now()
  215. return nodeTime
  216. @property
  217. def db_alias(self):
  218. """
  219. 获取 原始的 db alias
  220. :return:
  221. """
  222. defaultDbAlias = self.query._document._origin_meta.get("db_alias")
  223. return defaultDbAlias
  224. def get_db_alias(self, timeStream):
  225. """
  226. 根据时间片生成的 db alias
  227. :return:
  228. """
  229. defaultDbAlias = self.db_alias
  230. if not timeStream.archived:
  231. return defaultDbAlias
  232. else:
  233. if defaultDbAlias.endswith(timeStream.year):
  234. return defaultDbAlias
  235. else:
  236. return "{}_{}".format(defaultDbAlias, timeStream.st.year)
  237. def split_time(self, st = None, et = None):
  238. import datetime
  239. st = st or self.st
  240. et = et or self.et
  241. nodeTime = self.node_time()
  242. timeStreamList = list()
  243. # 开始的时间在热数据区域
  244. if st >= nodeTime:
  245. timeStreamList.append(TimeStream(archived = False, st = st, et = et + datetime.timedelta(days = 1)))
  246. return timeStreamList
  247. # 时间节点在冷热交替的区域
  248. elif st < nodeTime < et:
  249. timeStreamList.append(TimeStream(archived = False, st = nodeTime, et = et + datetime.timedelta(days = 1)))
  250. return self.split_time(st = st, et = nodeTime - datetime.timedelta(days = 1)) + timeStreamList
  251. # 时间节点都在冷数据区域 这种情况就需要按照年份来区分
  252. elif et <= nodeTime:
  253. timeStreamList.append(TimeStream(archived = True, st = st, et = et + datetime.timedelta(days = 1)))
  254. return timeStreamList
  255. else:
  256. return timeStreamList
  257. def all(self, **filters):
  258. logger.info("ready to find <{}> data, filters is <{}>".format(self.__class__.__name__, filters))
  259. timeField = filters.pop("timeField", None) or self._timeField
  260. timeList = self.split_time()
  261. timeList.sort(key = lambda x: x.st, reverse = self.reverse)
  262. logger.debug('hit data time list is: {}'.format(str(timeList)))
  263. records = list()
  264. shard_filter = filters.pop('shard_filter', None)
  265. for timeStream in timeList:
  266. filters.update({
  267. "{}__gte".format(timeField): self.adapt_time_field_type(timeStream.st),
  268. "{}__lt".format(timeField): self.adapt_time_field_type(timeStream.et)
  269. })
  270. defaultDbAlias = self.db_alias
  271. dbAlias = defaultDbAlias if not timeStream.archived else "{}_his".format(defaultDbAlias)
  272. if dbAlias == defaultDbAlias:
  273. new_model_cls = self._META_MODEL
  274. sourceQuerySet = self._META_MODEL.objects # type: CustomQuerySet
  275. else:
  276. new_model_cls = copy_document_classes(
  277. self._META_MODEL, '{}_his'.format(self._META_MODEL.__name__),
  278. dbAlias)
  279. sourceQuerySet = new_model_cls.objects # type: CustomQuerySet
  280. logger.info(
  281. "dbAlias = {}, model = {}, filters = {}; shard_filter = {}".format(dbAlias, new_model_cls.__name__,
  282. filters, shard_filter))
  283. shard_key = new_model_cls._meta.get('shard_key', tuple())
  284. if shard_key:
  285. if shard_filter:
  286. filters.update(shard_filter)
  287. for _field in shard_key[0:-1]:
  288. if _field not in filters and '{}__in'.format(_field) not in filters:
  289. logger.warning(
  290. 'query in {}, filters = {}, has no shard key.'.format(new_model_cls.__name__, filters))
  291. records.append(
  292. self._combine_queryset(sourceQuerySet, **filters).order_by("-{}".format(self._DEFAULT_TIME_FIELD)))
  293. if len(records) > 0:
  294. return QuerySetProxy(*records)
  295. else:
  296. logger.error("bad time split, start time is <{}>, end time is <{}>".format(self.st, self.et))
  297. return self._META_MODEL.objects.none()
  298. @staticmethod
  299. def _combine_queryset(queryset, **filters):
  300. # type:(CustomQuerySet, dict) -> CustomQuerySet
  301. """
  302. 整合 过滤筛选条件 将only 以及 search等字段处理掉
  303. searchKey: 搜索字符串
  304. only: 查询有限的字段 list ['_id', "dateTimeAdded"...]
  305. hint: 指定索引字段
  306. :param filters:
  307. :return:
  308. """
  309. queryset_new = queryset.clone()
  310. _searchKey = filters.pop("searchKey", None)
  311. if _searchKey:
  312. queryset_new = queryset_new.search(_searchKey)
  313. _only = filters.pop("only", None)
  314. if _only:
  315. queryset_new = queryset_new.only(*_only)
  316. _exclude = filters.pop('exclude', None)
  317. if _exclude:
  318. queryset_new = queryset_new.exclude(*_exclude)
  319. _hint = filters.pop("hint", None)
  320. if _hint:
  321. queryset_new = queryset_new.hint(_hint)
  322. return queryset_new.filter(**filters)
  323. @classmethod
  324. def get_one(cls, startTime = None, endTime = None, **kwargs):
  325. # type:(str, str, Dict[str, Any]) -> Optional[ConsumeRecord, RechargeRecord, DealerDailyStat, GroupDailyStat, DeviceDailyStat, GroupReport, DealerReport, DevReport]
  326. """
  327. 获取 单个记录 时间片用户获取数据的数据库 通过主键获取
  328. :param kwargs:
  329. :param startTime:
  330. :param endTime:
  331. :return:
  332. """
  333. import datetime
  334. try:
  335. delta = None
  336. if 'id' in kwargs:
  337. endTime = ObjectId(kwargs.get('id')).generation_time.astimezone(tz.gettz(settings.TIME_ZONE)).replace(
  338. tzinfo = None)
  339. startTime = endTime
  340. endTime = endTime + datetime.timedelta(seconds = 60)
  341. delta = -3 * 24 * 60 * 60
  342. elif 'foreign_id' in kwargs:
  343. startTime = ObjectId(kwargs.pop('foreign_id')).generation_time.astimezone(
  344. tz.gettz(settings.TIME_ZONE)).replace(tzinfo = None)
  345. endTime = datetime.datetime.now()
  346. delta = 0
  347. else:
  348. if not startTime and not endTime:
  349. startTime = Const.QUERY_START_DATE
  350. endTime = datetime.datetime.now().strftime('%Y-%m-%d')
  351. elif not startTime:
  352. startTime = Const.QUERY_START_DATE
  353. else:
  354. endTime = datetime.datetime.now().strftime('%Y-%m-%d')
  355. delta = 0
  356. query_set_proxy = cls(st = startTime, et = endTime, delta = delta).all(**kwargs) # type: QuerySetProxy
  357. return query_set_proxy.first()
  358. except ValidationError:
  359. logger.error("get one record kwargs=<{}>, time is <{}-{}> error pk".format(kwargs, startTime, endTime))
  360. record = None
  361. except DoesNotExist:
  362. logger.error("get one record kwargs=<{}>, time is <{}-{}> not exist pk".format(kwargs, startTime, endTime))
  363. record = None
  364. except Exception as e:
  365. logger.exception(
  366. "get one record kwargs=<{}>, time is <{} to {}> error {}".format(kwargs, startTime, endTime, e))
  367. record = None
  368. return record
  369. @classmethod
  370. def get_data_list(cls, startTime = None, endTime = None,
  371. **kwargs): # type:(str, str, Dict[str, Any]) -> CustomQuerySet
  372. """
  373. 类似于filter 的实现
  374. 有可能会涉及多个数据库的查询,因此首先判断下节点时间 根据节点时间的不通决定是否需要多个数据库的访问
  375. :param startTime:
  376. :param endTime:
  377. :param kwargs:
  378. :return:
  379. """
  380. import datetime
  381. delta = None
  382. # 如果有id列表, 则通过ID获取最小和最大时间
  383. if 'id__in' in kwargs:
  384. id_list = kwargs.get('id__in')
  385. _time_list = []
  386. for _id in id_list:
  387. _time = ObjectId(_id).generation_time.astimezone(tz.gettz(settings.TIME_ZONE)).replace(tzinfo = None)
  388. _time_list.append(_time)
  389. startTime = min(_time_list)
  390. endTime = max(_time_list)
  391. delta = -3 * 24 * 60 * 60
  392. if not startTime and not endTime:
  393. startTime = Const.QUERY_START_DATE
  394. endTime = datetime.datetime.now().strftime('%Y-%m-%d')
  395. elif not startTime:
  396. startTime = Const.QUERY_START_DATE
  397. elif not endTime:
  398. endTime = datetime.datetime.now().strftime('%Y-%m-%d')
  399. delta = 0
  400. return cls(st = startTime, et = endTime, delta = delta).all(**kwargs)
  401. @classmethod
  402. def current_db_proxy(cls):
  403. return cls(st = cls.node_time(), et = datetime.datetime.now())
  404. class DealerDailyStatsModelProxy(ModelProxy):
  405. _DEFAULT_TIME_FIELD = "date"
  406. _META_MODEL = DealerDailyStat
  407. @staticmethod
  408. def adapt_time_field_type(_time):
  409. return datetime.datetime.strftime(_time, "%Y-%m-%d")
  410. @staticmethod
  411. def income_statistic_key(sourceName):
  412. return "daily.income.{}".format(sourceName)
  413. @staticmethod
  414. def consumption_statistic_key(sourceName):
  415. return "daily.consumption.{}".format(sourceName)
  416. @classmethod
  417. def get_stats_as_month(cls, dealerIds, startDay, endDay, project, aggregateMap):
  418. # type:(List[str], str, str, tuple, dict) -> dict
  419. pipeline = DealerDailyStat.prepare_group_pipeline(project = project)
  420. groupKey = DealerDailyStat.month_group_key()
  421. groupKey.update(aggregateMap)
  422. _group = {
  423. "$group": groupKey
  424. }
  425. pipeline.append(_group)
  426. rv = {}
  427. proxies = cls(st = startDay, et = endDay)
  428. items = proxies.all(dealerId__in = dealerIds).aggregate(*pipeline)
  429. for item in items:
  430. key = "{year}-{month:02d}".format(**item.get("_id"))
  431. value = merge_with(sum_fn, [keyfilter(not_aggreate_id, item), keyfilter(not_aggreate_id, rv.get(key, {}))])
  432. rv[key] = value
  433. return rv
  434. @classmethod
  435. def get_stats_as_day(cls, dealerIds, startDay, endDay, project, aggregateMap):
  436. # type:(List[str], str, str, tuple, dict) -> dict
  437. pipeline = DealerDailyStat.prepare_group_pipeline(project = project, hasDay = True)
  438. groupKey = DealerDailyStat.day_group_key()
  439. groupKey.update(aggregateMap)
  440. _group = {
  441. "$group": groupKey
  442. }
  443. pipeline.append(_group)
  444. rv = collections.OrderedDict()
  445. for day in get_date_range(startDay, endDay):
  446. rv[datetime.datetime.strftime(day, "%Y-%m-%d")] = dict()
  447. proxies = cls(st = startDay, et = endDay)
  448. items = proxies.all(dealerId__in = dealerIds).aggregate(*pipeline)
  449. for item in items:
  450. key = "{year}-{month:02d}-{day:02d}".format(**item.get("_id"))
  451. value = merge_with(sum_fn, [keyfilter(not_aggreate_id, item), keyfilter(not_aggreate_id, rv.get(key, {}))])
  452. rv[key] = value
  453. return rv
  454. @classmethod
  455. def get_stats_as_dealer(cls, dealerIds, startDay, endDay, project, aggregateMap):
  456. pipeline = DealerDailyStat.prepare_group_pipeline(project = project, needTime = False)
  457. groupKey = DealerDailyStat.dealer_group_key()
  458. groupKey.update(aggregateMap)
  459. _group = {
  460. "$group": groupKey
  461. }
  462. pipeline.append(_group)
  463. query_set_proxy = cls(st = startDay, et = endDay).all(dealerId__in = dealerIds) # type: QuerySetProxy
  464. items = query_set_proxy.aggregate(*pipeline)
  465. rv = {}
  466. for item in items:
  467. key = str(item.get('_id'))
  468. value = merge_with(sum_fn, [keyfilter(not_aggreate_id, item), keyfilter(not_aggreate_id, rv.get(key, {}))])
  469. rv[key] = value
  470. return rv
  471. @classmethod
  472. def get_total_income_as_dealer(cls, dealerIds, startDay, endDay):
  473. return cls.get_stats_as_dealer(
  474. dealerIds = dealerIds,
  475. startDay = startDay,
  476. endDay = endDay,
  477. project = ('dealerId', 'daily.totalIncome',),
  478. aggregateMap = {
  479. 'totalIncome': {'$sum': '$daily.totalIncome'}
  480. })
  481. @classmethod
  482. def get_one_month_income(cls, dealerId, monthStr):
  483. # type:(ObjectId, str) -> RMB
  484. """
  485. 获取经销商本月的收益总额 按照之前的逻辑 只有线上的收益才是收益 也就是 充值+充卡+充虚拟卡+广告
  486. :param dealerId:
  487. :param monthStr:
  488. :return:
  489. """
  490. startDay, endDay = get_start_and_end_by_month(monthStr = monthStr)
  491. items = cls.get_stats_as_month(dealerIds = [dealerId],
  492. startDay = startDay,
  493. endDay = endDay,
  494. project = ('daily.totalIncome',),
  495. aggregateMap = {"totalIncome": {"$sum": "$daily.totalIncome"}})
  496. return RMB(items.get(monthStr, {'totalIncome': Quantity(0)}).get('totalIncome'))
  497. @classmethod
  498. def get_consume_as_month(cls, dealerIds, startDay, endDay, sources = None):
  499. if not sources:
  500. sources = DEALER_CONSUMPTION_AGG_KIND.choices()
  501. aggregateMap = {
  502. format(_sourceName): {"$sum": "$daily.consumption.{}".format(_sourceName)} for _sourceName in sources
  503. }
  504. return cls.get_stats_as_month(dealerIds = dealerIds,
  505. startDay = startDay,
  506. endDay = endDay,
  507. project = ('daily.consumption',),
  508. aggregateMap = aggregateMap)
  509. @classmethod
  510. def get_one_year_consume_as_month(cls, dealerId, yearStr):
  511. # type:(ObjectId, str) -> dict
  512. """
  513. 通过月份 分组 聚合一年的收益信息
  514. :param dealerId:
  515. :param yearStr:
  516. :return:
  517. """
  518. aggregateMap = {format(_sourceName): {"$sum": "$daily.consumption.{}".format(_sourceName)} for _sourceName in
  519. DEALER_CONSUMPTION_AGG_KIND.choices()}
  520. startDay, endDay = get_start_and_end_by_year(yearStr)
  521. return cls.get_stats_as_month([dealerId], startDay, endDay, ('daily.consumption',), aggregateMap)
  522. @classmethod
  523. def get_one_year_income_as_month(cls, dealerId, yearStr):
  524. # type:(ObjectId, str) -> dict
  525. aggregateMap = {format(_sourceName): {"$sum": "$daily.income.{}".format(_sourceName)} for _sourceName in
  526. DEALER_INCOME_SOURCE.choices()}
  527. aggregateMap.update({"totalIncome": {"$sum": "$daily.totalIncome"}})
  528. startDay, endDay = get_start_and_end_by_year(yearStr)
  529. return cls.get_stats_as_month([dealerId], startDay, endDay, ('daily.totalIncome', 'daily.income'), aggregateMap)
  530. @classmethod
  531. def get_days_income_stat(cls, dealerId, monthStr):
  532. startDay, endDay = get_start_and_end_by_month(monthStr)
  533. endDay = min(datetime.date.today().strftime("%Y-%m-%d"), endDay)
  534. statisticRecords = DealerDailyStatsModelProxy.get_data_list(
  535. startTime = startDay, endTime = endDay, dealerId = dealerId,
  536. exclude = ('origin', '_id', 'hourly', 'dealerId', 'daily.consumption'))
  537. rv = collections.OrderedDict()
  538. for day in get_date_range(startDay, endDay):
  539. rv[datetime.datetime.strftime(day, "%Y-%m-%d")] = dict()
  540. for _stat in statisticRecords: # type: DealerDailyStat
  541. rv.update({_stat['date']: _stat.daily})
  542. return rv
  543. @classmethod
  544. def get_days_consume_stat(cls, dealerId, monthStr):
  545. startDay, endDay = get_start_and_end_by_month(monthStr)
  546. endDay = min(datetime.date.today().strftime("%Y-%m-%d"), endDay)
  547. statisticRecords = DealerDailyStatsModelProxy.get_data_list(
  548. startTime = startDay, endTime = endDay, dealerId = dealerId,
  549. exclude = ('_id', 'dealerId', 'hourly', 'origin', 'daily.income', 'daily.totalIncome'))
  550. rv = collections.OrderedDict()
  551. for day in get_date_range(startDay, endDay):
  552. rv[datetime.datetime.strftime(day, "%Y-%m-%d")] = dict()
  553. for _stat in statisticRecords: # type: DealerDailyStat
  554. rv.update({_stat['date']: _stat.daily})
  555. return rv
  556. class GroupDailyStatsModelProxy(ModelProxy):
  557. _DEFAULT_TIME_FIELD = "date"
  558. _META_MODEL = GroupDailyStat
  559. @staticmethod
  560. def adapt_time_field_type(_time):
  561. return datetime.datetime.strftime(_time, "%Y-%m-%d")
  562. @classmethod
  563. def get_groups_statistic(cls, groupIds, startDate, endDate, statisticKey):
  564. if not groupIds:
  565. return []
  566. statisticKey.update({"_id": "$groupId"})
  567. resultDict = dict()
  568. for _item in cls(st = startDate, et = endDate).all(
  569. groupId__in = [ObjectId(_id) for _id in groupIds]).aggregate({"$group": statisticKey}):
  570. groupId = _item.pop("_id", None)
  571. resultDict[str(groupId)] = merge_with(sum_fn, [_item, resultDict.get(str(groupId), {})])
  572. return resultDict
  573. @classmethod
  574. def get_groups_income_statistic(cls, groupIds, startDate, endDate, dealerId = None):
  575. # type:(list, str, str, str) -> dict
  576. """
  577. 获取地址组的收益统计
  578. :param groupIds:
  579. :param startDate:
  580. :param endDate:
  581. :param dealerId: TODO 临时加的参数,对于这种map数据的扁平化查询 由于mongo版本暂不支持 unwind,后续改可优化
  582. :return:
  583. """
  584. statisticKey = {
  585. format(_sourceName): {"$sum": "$daily.income.{}".format(_sourceName)}
  586. for _sourceName in DEALER_INCOME_SOURCE.choices()
  587. }
  588. if dealerId:
  589. statisticKey.update({
  590. "dealerActualIncome": {"$sum": "$daily.incomeMap.{}".format(dealerId)}
  591. })
  592. statisticKey.update({"totalIncome": {"$sum": "$daily.totalIncome"}})
  593. return cls.get_groups_statistic(groupIds, startDate, endDate, statisticKey)
  594. @classmethod
  595. def get_groups_consumption_statistic(cls, groupIds, startDate, endDate):
  596. statisticKey = {
  597. format(_sourceName): {"$sum": "$daily.consumption.{}".format(_sourceName)}
  598. for _sourceName in DEALER_CONSUMPTION_AGG_KIND.choices()
  599. }
  600. return cls.get_groups_statistic(groupIds, startDate, endDate, statisticKey)
  601. @classmethod
  602. def get_one_group_income_statistic(cls, groupId, startDate, endDate, dealerId = None):
  603. return cls.get_groups_income_statistic([groupId], startDate, endDate, dealerId).get(groupId)
  604. @classmethod
  605. def get_one_group_consumption_statistic(cls, groupId, startDate, endDate):
  606. return cls.get_groups_consumption_statistic([groupId], startDate, endDate).get(groupId)
  607. class DeviceDailyStatsModelProxy(ModelProxy):
  608. _DEFAULT_TIME_FIELD = "date"
  609. _META_MODEL = DeviceDailyStat
  610. @staticmethod
  611. def adapt_time_field_type(_time):
  612. return datetime.datetime.strftime(_time, "%Y-%m-%d")
  613. @classmethod
  614. def get_devices_statistic(cls, logicalCodes, startDate, endDate, statisticKey):
  615. proxies = cls(st = startDate, et = endDate)
  616. statisticKey.update({"_id": "$logicalCode"})
  617. resultDict = dict()
  618. for _item in proxies.all(logicalCode__in = logicalCodes).aggregate({"$group": statisticKey}):
  619. logicalCode = str(_item.pop("_id", None))
  620. resultDict[str(logicalCode)] = merge_with(sum_fn, [_item, resultDict.get(str(logicalCode), {})])
  621. return resultDict
  622. @classmethod
  623. def get_devices_income_statistic(cls, logicalCodes, startDate, endDate):
  624. """
  625. 获取设备的收益统计
  626. :param logicalCodes:
  627. :param startDate:
  628. :param endDate:
  629. :return:
  630. """
  631. statisticKey = {
  632. format(_sourceName): {"$sum": "$daily.income.{}".format(_sourceName)}
  633. for _sourceName in DEALER_INCOME_SOURCE.choices()
  634. }
  635. return cls.get_devices_statistic(logicalCodes, startDate, endDate, statisticKey)
  636. @classmethod
  637. def get_devices_consumption_statistic(cls, logicalCodes, startDate, endDate):
  638. """
  639. 获取设备的消费统计
  640. :param logicalCodes:
  641. :param startDate:
  642. :param endDate:
  643. :return:
  644. """
  645. statisticKey = {
  646. format(_sourceName): {"$sum": "$daily.consumption.{}".format(_sourceName)}
  647. for _sourceName in DEALER_CONSUMPTION_AGG_KIND.choices()
  648. }
  649. return cls.get_devices_statistic(logicalCodes, startDate, endDate, statisticKey)
  650. class ClientRechargeModelProxy(ModelProxy):
  651. _META_MODEL = RechargeRecord
  652. class ClientConsumeModelProxy(ModelProxy):
  653. _META_MODEL = ConsumeRecord
  654. @classmethod
  655. def get_not_finished_record(cls, ownerId, openId, devTypeCode, **kwargs):
  656. """
  657. 获取用户在该经销商处尚未结束的订单
  658. :param ownerId:
  659. :param openId:
  660. :param devTypeCode:
  661. :return:
  662. """
  663. return cls.get_one(ownerId = ownerId,
  664. openId = openId,
  665. devTypeCode = devTypeCode,
  666. status__ne = "finished",
  667. isNormal = True,
  668. **kwargs)
  669. class ClientDealerIncomeModelProxy(ModelProxy):
  670. _META_MODEL = DealerIncomeProxy
  671. @classmethod
  672. def get_one(cls, startTime = None, endTime = None, **kwargs): # type:(str, str, dict) -> DealerIncomeProxy
  673. if 'ref_id' in kwargs:
  674. return super(ClientDealerIncomeModelProxy, cls).get_one(foreign_id = str(kwargs.get('ref_id')), **kwargs)
  675. else:
  676. return super(ClientDealerIncomeModelProxy, cls).get_one(startTime = startTime, endTime = endTime, **kwargs)
  677. class GroupReportModelProxy(ModelProxy):
  678. _DEFAULT_TIME_FIELD = "date"
  679. _META_MODEL = GroupReport
  680. @staticmethod
  681. def adapt_time_field_type(_time):
  682. return datetime.datetime.strftime(_time, "%Y-%m-%d")
  683. class DealerReportModelProxy(ModelProxy):
  684. _DEFAULT_TIME_FIELD = "date"
  685. _META_MODEL = DealerReport
  686. @staticmethod
  687. def adapt_time_field_type(_time):
  688. return datetime.datetime.strftime(_time, "%Y-%m-%d")
  689. @classmethod
  690. def get_year_by_month(cls, dealerId, yearStr):
  691. # type:(str, str) -> dict
  692. pipeline = cls._META_MODEL.prepare_group_pipeline()
  693. groupKey = cls._META_MODEL.month_group_key()
  694. groupKey.update({'lineCoins': {'$sum': '$rpt.lineCoins'}, 'count': {'$sum': '$rpt.count'}})
  695. _group = {
  696. "$group": groupKey
  697. }
  698. pipeline.append(_group)
  699. rv = {}
  700. startDay, endDay = get_start_and_end_by_year(yearStr)
  701. now_month = datetime.datetime.now().month
  702. proxies = cls(st = startDay, et = endDay)
  703. items = proxies.all(ownerId = dealerId, type = 'day').aggregate(*pipeline)
  704. for item in items:
  705. key = "{year}-{month:02d}".format(**item.get("_id"))
  706. value = {
  707. 'count': item.get('count', 0),
  708. 'lineCoins': item.get('lineCoins', 0)
  709. }
  710. if int(now_month) == int(item.get('_id').get('month')):
  711. todayRpt = Accounting.getOwnerIncome(dealerId, datetime.datetime.now())
  712. value['count'] = value['count'] + todayRpt.get('count', 0)
  713. value['lineCoins'] = value['lineCoins'] + todayRpt.get('lineCoins', 0)
  714. rv[key] = value
  715. return rv
  716. class DevReportModelProxy(ModelProxy):
  717. _DEFAULT_TIME_FIELD = "date"
  718. _META_MODEL = DevReport
  719. @staticmethod
  720. def adapt_time_field_type(_time):
  721. return datetime.datetime.strftime(_time, "%Y-%m-%d")