# utils.py
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import logging
  4. from calendar import monthrange
  5. from collections import defaultdict
  6. from functools import partial
  7. import cytoolz
  8. from bson.decimal128 import Decimal128
  9. from bson.objectid import ObjectId
  10. from cytoolz.dicttoolz import get_in, merge_with
  11. from mongoengine.document import Document
  12. from typing import TYPE_CHECKING, Tuple, Optional, Dict, Iterable, Iterator, Union, Callable, List
  13. from apilib.monetary import RMB, VirtualCoin
  14. from apilib.quantity import Quantity
  15. from apps.web.constant import (
  16. DEALER_CONSUMPTION_AGG_KIND_TRANSLATION,
  17. DEALER_CONSUMPTION_AGG_KIND_UNIT,
  18. DEALER_CONSUMPTION_AGG_KIND_UNIT_PRECISION,
  19. DEALER_CONSUMPTION_AGG_KIND, PARTITION_ROLE)
  20. from apps.web.dealer.define import DEALER_INCOME_SOURCE_TRANSLATION
  21. from apps.web.device.models import Group
  22. from apps.web.report.models import DealerDailyStat, GroupDailyStat, DeviceDailyStat, \
  23. DailyStat
  24. logger = logging.getLogger(__name__)
  25. from apps.web.dealer.proxy import DealerIncomeProxy
  26. if TYPE_CHECKING:
  27. from apps.web.user.models import ConsumeRecord
  28. def upsert_stat(model, query, params, identifier = '', log_error = True):
  29. # type: (Document, dict, dict, Optional[str], Optional[bool])->bool
  30. """
  31. :param model:
  32. :param query:
  33. :param params:
  34. :param identifier:
  35. :param log_error:
  36. :return:
  37. """
  38. result = model._get_collection().update(query, params, upsert = True)
  39. if not bool(result['n']):
  40. if log_error:
  41. logger.error(
  42. 'session(%s) update failed, model=%r, query=%r, params=%r' % (identifier, model, query, params))
  43. return False
  44. return True
  45. def record_consumption_stats(record, check = False, allowed = None):
  46. # type:(ConsumeRecord, bool, Optional[dict, None])->None
  47. """
  48. 记录消费 之前的设备 启动的时候会记录一次金币 结束的时候又会记录一次 导致用户的消费信息实际上是 记录了两倍
  49. :param record: ConsumeRecord 消费记录
  50. :param check:
  51. :param allowed
  52. :return:
  53. """
  54. def records_already_handled(queryDict, model):
  55. queryDict.update({'origin.consumption': record.id})
  56. return model.get_collection().find(queryDict).count() > 0
  57. if not allowed: allowed = {'dealer': True, 'device': True, 'group': True}
  58. try:
  59. RECORD_SESSION_ID = str(record.id)
  60. _upsert_stat = partial(upsert_stat, identifier = RECORD_SESSION_ID)
  61. dt = record.dateTimeAdded
  62. hour = dt.hour
  63. report_daily_date = dt.strftime('%Y-%m-%d')
  64. def build_query(initial, extra):
  65. # type: (dict, dict)->dict
  66. rv = initial.copy()
  67. rv.update(extra)
  68. return rv
  69. # 构建的查询条件
  70. build_daily_query = partial(build_query, {'date': report_daily_date})
  71. def build_daily_update(kind_amount_map):
  72. kam = kind_amount_map.copy()
  73. # kam.setdefault('coin', record.coin)
  74. rv = defaultdict(dict)
  75. for kind, amount in kam.iteritems():
  76. rv['$inc']['daily.consumption.{kind}'.format(kind = kind)] = Decimal128(str(amount))
  77. rv['$inc']['hourly.{hour}.consumption.{kind}'.format(hour = hour, kind = kind)] = Decimal128(
  78. str(amount))
  79. rv.update(
  80. {
  81. '$addToSet': {'origin.consumption': record.id},
  82. }
  83. )
  84. # 把消费记录的条数也记录下来
  85. rv['$inc']['daily.totalConsumptionCount'] = 1
  86. return rv
  87. dealerIds = [ObjectId(record.ownerId)] + [ObjectId(_['id']) for _ in
  88. Group.get_group(record.groupId)['partnerDict'].values()]
  89. # 统计经销商的数据
  90. if 'dealer' in allowed and allowed['dealer']:
  91. dealer_query = build_daily_query({'dealerId': {'$in': dealerIds}})
  92. if not check or not records_already_handled(dealer_query, DealerDailyStat):
  93. for dealerId in dealerIds:
  94. dealer_daily_query = build_daily_query({'dealerId': ObjectId(dealerId)})
  95. dealer_daily_update = build_daily_update(record.aggInfo)
  96. _upsert_stat(DealerDailyStat, dealer_daily_query, dealer_daily_update)
  97. else:
  98. logger.debug('skipping %r for DealerDailyStat, already recorded' % (record,)) if check else None
  99. # 统计组地址的数据
  100. if 'group' in allowed and allowed['group']:
  101. group_query = build_daily_query({'groupId': ObjectId(record.groupId)})
  102. if not check or not records_already_handled(group_query, GroupDailyStat):
  103. group_daily_query = build_daily_query({'groupId': ObjectId(record.groupId)})
  104. group_daily_update = build_daily_update(record.aggInfo)
  105. _upsert_stat(GroupDailyStat, group_daily_query, group_daily_update)
  106. else:
  107. logger.debug('skipping %r for GroupDailyStat, already recorded' % (record,)) if check else None
  108. # 统计设备的数据
  109. if 'device' in allowed and allowed['device']:
  110. device_query = build_daily_query({'logicalCode': record.logicalCode})
  111. if not check or not records_already_handled(device_query, DeviceDailyStat):
  112. device_daily_query = build_daily_query({'logicalCode': record.logicalCode})
  113. device_daily_update = build_daily_update(record.aggInfo)
  114. _upsert_stat(DeviceDailyStat, device_daily_query, device_daily_update)
  115. else:
  116. logger.debug('skipping %r for DeviceDailyStat, already recorded' % (record,)) if check else None
  117. except Exception as e:
  118. logger.error('record_consumption_stats failed, record=%r' % (record,))
  119. logger.exception(e)
def update_consumption_states(record, refund_coin, allowed = None):
    # type: (ConsumeRecord, VirtualCoin, Optional[dict]) -> None
    # No-op placeholder: consumption stats are currently not adjusted on
    # coin refunds. Kept for interface symmetry with update_income_stats.
    pass
  123. # 这个只在脚本里面有使用 应该是 历史数据的整合
  124. def record_income_stats(proxy, check = False, allowed = None):
  125. # type:(DealerIncomeProxy, bool, Optional[None, dict])->None
  126. """
  127. 记录收益
  128. 针对地址的统计 将收益的每一笔分账都统计到该记录的经销商上
  129. :param proxy: 收益划分的详情
  130. :param check: 是否需要check
  131. :param allowed 允许统计的模型
  132. :return:
  133. """
  134. if not allowed:
  135. allowed = {'dealer': True, 'device': True, 'group': True}
  136. try:
  137. assert isinstance(proxy, DealerIncomeProxy), 'proxy has to be a DealerIncomeProxy'
  138. RECORD_SESSION_ID = str(proxy.id)
  139. _upsert_stat = partial(upsert_stat, identifier = RECORD_SESSION_ID)
  140. def proxy_already_handled(queryDict, model):
  141. queryDict.update({'origin.income': proxy.id})
  142. return model.get_collection().find(queryDict).count() > 0
  143. dt = proxy.dateTimeAdded
  144. hour = dt.hour
  145. report_daily_date = dt.strftime('%Y-%m-%d')
  146. def build_query(initial, extra):
  147. # type: (dict, dict)->dict
  148. rv = initial.copy()
  149. rv.update(extra)
  150. return rv
  151. build_daily_query = partial(build_query, {'date': report_daily_date})
  152. def build_daily_update(amount):
  153. return {
  154. '$inc': {
  155. 'daily.income.{source}'.format(source = proxy.source): amount,
  156. 'hourly.{hour}.income.{source}'.format(hour = hour, source = proxy.source): amount,
  157. 'daily.totalIncome': amount,
  158. 'daily.totalIncomeCount': 1
  159. },
  160. '$addToSet': {'origin.income': proxy.id}
  161. }
  162. if 'dealer' in allowed and allowed['dealer']:
  163. dealerIds_query = build_daily_query({'dealerId': {'$in': proxy.dealerIds}})
  164. if not check or not proxy_already_handled(dealerIds_query, DealerDailyStat):
  165. for dealerId in proxy.dealerIds:
  166. dealer_daily_query = build_daily_query({'dealerId': dealerId})
  167. dealer_daily_update = build_daily_update(proxy.actualAmountMap[str(dealerId)])
  168. _upsert_stat(DealerDailyStat, dealer_daily_query, dealer_daily_update)
  169. else:
  170. logger.debug('skipping %r for DealerDailyStat, already recorded' % (proxy,)) if check else None
  171. # : group
  172. if 'group' in allowed and allowed['group']:
  173. group_query = build_daily_query({'groupId': proxy.groupId})
  174. if not check or not proxy_already_handled(group_query, GroupDailyStat):
  175. group_daily_query = build_daily_query({'groupId': proxy.groupId})
  176. group_daily_update = build_daily_update(proxy.totalAmount)
  177. _upsert_stat(GroupDailyStat, group_daily_query, group_daily_update)
  178. else:
  179. logger.debug('skipping %r for GroupDailyStat, already recorded' % (proxy,)) if check else None
  180. #: device
  181. if 'device' in allowed and allowed['device']:
  182. device_query = build_daily_query({'logicalCode': proxy.logicalCode})
  183. if not check or not proxy_already_handled(device_query, DeviceDailyStat):
  184. device_daily_query = build_daily_query({'logicalCode': proxy.logicalCode})
  185. device_daily_update = build_daily_update(proxy.totalAmount)
  186. _upsert_stat(DeviceDailyStat, device_daily_query, device_daily_update)
  187. else:
  188. logger.debug('skipping %r for DeviceDailyStat, already recorded' % (proxy,)) if check else None
  189. except Exception as e:
  190. logger.error('record_income_stats failed, proxy=%r' % (proxy,))
  191. logger.exception(e)
  192. def update_income_stats(proxy, refund_partition, refund_fee, allowed = None):
  193. # type:(DealerIncomeProxy, dict, RMB, dict)->None
  194. """
  195. 现金退费的时候会使用 主要处理从收益中将现金退款的部分减除掉
  196. :param proxy:
  197. :param refund_partition:
  198. :param refund_fee:
  199. :param allowed:
  200. :return:
  201. """
  202. if not allowed: allowed = {'dealer': True, 'device': True, 'group': True}
  203. try:
  204. assert isinstance(proxy, DealerIncomeProxy), 'proxy has to be a DealerIncomeProxy'
  205. RECORD_SESSION_ID = str(proxy.id)
  206. _upsert_stat = partial(upsert_stat, identifier = RECORD_SESSION_ID)
  207. def proxy_already_handled(queryDict, model):
  208. queryDict.update({'origin.income': proxy.id})
  209. return model.get_collection().find(queryDict).count() > 0
  210. dt = proxy.dateTimeAdded
  211. hour = dt.hour
  212. report_daily_date = dt.strftime('%Y-%m-%d')
  213. def build_query(initial, extra):
  214. # type: (dict, dict)->dict
  215. rv = initial.copy()
  216. rv.update(extra)
  217. return rv
  218. build_daily_query = partial(build_query, {'date': report_daily_date})
  219. def build_daily_update(amount):
  220. refund = (-amount).mongo_amount
  221. return {
  222. '$inc': {
  223. 'daily.income.{source}'.format(source = proxy.source): refund,
  224. 'hourly.{hour}.income.{source}'.format(hour = hour, source = proxy.source): refund,
  225. 'daily.totalIncome': refund
  226. }
  227. }
  228. # def build_monthly_update(amount):
  229. # refund = (-amount).mongo_amount
  230. #
  231. # return {
  232. # '$inc': {
  233. # 'daily.{day}.income.{source}'.format(day = day, source = proxy.source): refund,
  234. # 'daily.{day}.totalIncome'.format(day = day): refund,
  235. # 'monthly.income.{source}'.format(source = proxy.source, ): refund,
  236. # 'monthly.totalIncome': refund
  237. # }
  238. # }
  239. if 'dealer' in allowed and allowed['dealer']:
  240. dealerIds_query = build_daily_query({'dealerId': {'$in': proxy.dealerIds}})
  241. if proxy_already_handled(dealerIds_query, DealerDailyStat):
  242. for item in refund_partition:
  243. if item['role'] == PARTITION_ROLE.AGENT:
  244. continue
  245. dealer_daily_query = build_daily_query({'dealerId': ObjectId(item['id'])})
  246. dealer_daily_update = build_daily_update(item['amount'])
  247. _upsert_stat(DealerDailyStat, dealer_daily_query, dealer_daily_update)
  248. else:
  249. logger.debug('skipping for DealerDailyStat, not already recorded %s' % proxy)
  250. # : group
  251. if 'group' in allowed and allowed['group']:
  252. group_query = build_daily_query({'groupId': proxy.groupId})
  253. if proxy_already_handled(group_query, GroupDailyStat):
  254. group_daily_query = build_daily_query({'groupId': proxy.groupId})
  255. group_daily_update = build_daily_update(refund_fee)
  256. _upsert_stat(GroupDailyStat, group_daily_query, group_daily_update)
  257. else:
  258. logger.debug('skipping for DealerDailyStat, not already recorded %s' % proxy)
  259. #: device
  260. if 'device' in allowed and allowed['device']:
  261. device_query = build_daily_query({'logicalCode': proxy.logicalCode})
  262. if proxy_already_handled(device_query, DeviceDailyStat):
  263. device_daily_query = build_daily_query({'logicalCode': proxy.logicalCode})
  264. device_daily_update = build_daily_update(refund_fee)
  265. _upsert_stat(DeviceDailyStat, device_daily_query, device_daily_update)
  266. else:
  267. logger.debug('skipping for DealerDailyStat, not already recorded %s' % proxy)
  268. except Exception as e:
  269. logger.error('record_income_stats failed, proxy=%r' % (proxy,))
  270. logger.exception(e)
  271. # 同 get_consumption_stats
  272. def consumption_unit_precision(kind):
  273. # type:(str)->str
  274. return DEALER_CONSUMPTION_AGG_KIND_UNIT_PRECISION.get(kind, '0.01')
  275. def to_quantity(kind, value):
  276. # type: (str, Union[Decimal128, float, int, Quantity])->Quantity
  277. return Quantity(value, places = consumption_unit_precision(kind))
  278. def transfer_consumption_item(item):
  279. # type: (tuple)->Tuple[str, Quantity]
  280. key, value = item
  281. return key, to_quantity(kind = key, value = value)
  282. def get_month_range(year, month):
  283. # type: (int, int)->Tuple[str, str]
  284. """
  285. :param year:
  286. :param month:
  287. :return:
  288. """
  289. DATE_FMT_TEXT = '{year}-{month}-{day}'
  290. _, end = monthrange(year = int(year), month = int(month))
  291. return (
  292. # start
  293. DATE_FMT_TEXT.format(year = year, month = month, day = 1),
  294. # end
  295. DATE_FMT_TEXT.format(year = year, month = month, day = end)
  296. )
  297. ##
  298. ## Majorly for views
  299. ##
  300. def translate_consumption(mapping, hides = []):
  301. show_kinds = set(DEALER_CONSUMPTION_AGG_KIND_TRANSLATION.keys())
  302. show_kinds = show_kinds - set(hides)
  303. if not mapping:
  304. return [consumption_map(kind, 0) for kind in show_kinds]
  305. else:
  306. return [consumption_map(kind, amount) for kind, amount in mapping.iteritems() if kind in show_kinds]
  307. def translate_consumption_stats(items, source = None, hides = []):
  308. # type:(Iterator, Optional[None, str], Optional(list))->list
  309. show_kinds = set(DEALER_CONSUMPTION_AGG_KIND.choices())
  310. show_kinds = show_kinds - set(hides)
  311. rv = []
  312. for item_source, item_value in items:
  313. if item_source not in DEALER_CONSUMPTION_AGG_KIND.choices():
  314. logger.error('invalid source <{}>'.format(item_source))
  315. continue
  316. if not source or source == item_source:
  317. rv.append('{kind} {value} {unit}'.format(
  318. kind = DEALER_CONSUMPTION_AGG_KIND_TRANSLATION[item_source],
  319. value = item_value,
  320. unit = DEALER_CONSUMPTION_AGG_KIND_UNIT[item_source]
  321. ))
  322. return rv
  323. def daily_stat_sum_on_field(income_or_consumption, field_name, stats, initial_type):
  324. # type: (str, str, Iterable, Union[type(RMB), type(Quantity)])->Union[RMB, Quantity]
  325. initial = initial_type.initial()
  326. return sum(
  327. (initial_type(
  328. get_in(keys = ['daily', income_or_consumption, field_name], coll = stat, default = initial)) for stat in
  329. stats),
  330. initial
  331. )
  332. def daily_stat_income_sum_on_field(field_name, stats):
  333. # type: (str, Iterable)->RMB
  334. return daily_stat_sum_on_field('income', field_name = field_name, stats = stats, initial_type = RMB)
  335. def daily_stat_consumption_sum_on_field(field_name, stats):
  336. # type: (str, Iterable)->Quantity
  337. return daily_stat_sum_on_field('consumption', field_name = field_name, stats = stats, initial_type = Quantity)
  338. def transform(keys, sub_stats, translate, sum_fn):
  339. # type:(List[str], List[dict], Callable[[dict], list], Callable[[Iterable], int])-> List[dict]
  340. """
  341. 转换器
  342. get_in(list, iter, default_value) ex:
  343. demo = {
  344. "chain": {"wuhan": ["hs", "dh"]}
  345. }
  346. get_in(["chain", "wuhan", 1], demo) -> dh
  347. merge_with(func, iter) ex:
  348. demo1 = {"card": 10, "recharge": 20}
  349. demo2 = {"card": 1, "recharge": 2}
  350. merge_with(sum, [demo1, demo2]) -> {"card": 11, "recharge": 22}
  351. """
  352. return translate(merge_with(sum_fn, [get_in(keys, sub_stat, {}) for sub_stat in sub_stats]))
  353. def cum_stats(keys, stats, translate, sum_fn = sum):
  354. # type:(List[str], Iterable, Callable[[dict]], Callable[[Iterable], int])->Dict[str, list]
  355. """
  356. 将统计出来的记录 分组进行加和
  357. :param keys: 分组key
  358. :param stats: 统计记录
  359. :param translate: 数值转换函数
  360. :param sum_fn: 加和函数
  361. :return:
  362. """
  363. incomeMap = dict()
  364. groupStats = cytoolz.groupby("groupId", stats)
  365. for groupId, sub_stats in groupStats.items():
  366. incomeMap[str(groupId)] = transform(keys, sub_stats, translate, sum_fn)
  367. return incomeMap
  368. def income_map(kind, amount):
  369. return {'name': DEALER_INCOME_SOURCE_TRANSLATION[kind], 'value': RMB(amount), 'source': kind}
  370. def translate_income(mapping):
  371. return [income_map(kind, amount) for kind, amount in mapping.iteritems() if
  372. kind in DEALER_INCOME_SOURCE_TRANSLATION]
  373. def default_income_translation(kinds):
  374. return [income_map(kind, 0) for kind in kinds]
  375. def consumption_map(kind, amount):
  376. return {
  377. 'source': kind,
  378. 'name': DEALER_CONSUMPTION_AGG_KIND_TRANSLATION[kind],
  379. 'value': to_quantity(kind = kind, value = amount),
  380. 'unit': DEALER_CONSUMPTION_AGG_KIND_UNIT[kind]
  381. }
  382. class StatisticRecorder(object):
  383. OBJECT_MODULE = None
  384. def __init__(self, processor, statisticModel):
  385. # type:(CentralDataProcessor, DailyStat) -> None
  386. self._processor = processor
  387. self._statisticModel = statisticModel
  388. @property
  389. def statisticModel(self):
  390. return self._statisticModel
  391. def work(self, check):
  392. # type:(bool) -> bool
  393. """
  394. 插入数据的时候需要注意的是check的判断和[record.id in origin] 的判断不要冲突
  395. :param check: 是否校验数据已经被插入
  396. :return: 是否无异常处理
  397. """
  398. record = self._processor.record
  399. if check and self.statisticModel.check_already_record(record):
  400. logger.info("skip <{}> id <{}> for <{}>, already recorded!".format(record, record.id, self.statisticModel))
  401. return True
  402. return self.statisticModel.update_statistic_data(record)
  403. class CentralDataProcessor(object):
  404. """
  405. 数据中央处理器
  406. 流程 全部的执行顺序为:
  407. 设备 --> 组 --> 经销商 --> ...
  408. """
  409. def __init__(self, record, check = False, allowed = None):
  410. # type:(Optional[ConsumeRecord, DealerIncomeProxy], bool, dict) -> None
  411. """
  412. :param record: 待处理的数据
  413. :param check: 是否需要检查数据已经被处理过
  414. :param allowed: 各个角色是否需要记录相应的数据
  415. """
  416. # TODO zjl record的校验
  417. self._record = record
  418. self._check = check
  419. self._statisticList = list()
  420. # 加载数据处理
  421. allowed = allowed or dict.fromkeys(["device", "group", "dealer"], True)
  422. # 构建每个独立的worker
  423. for _statistic in [DeviceDailyStat, GroupDailyStat, DealerDailyStat]:
  424. if _statistic.is_allowed(allowed):
  425. self._statisticList.append(_statistic)
  426. def process(self):
  427. """
  428. 记录统计值
  429. :return:
  430. """
  431. # TODO zjl 有可能的话 异步进行
  432. # TODO zjl 对于 result 进行处理 异常的订单处理机制尚待完成
  433. for _statistic in self._statisticList:
  434. try:
  435. result = StatisticRecorder(self, _statistic).work(self._check)
  436. except Exception as e:
  437. logger.error("statistic <{}> handle record <{}> id <{}> error {}".format(_statistic, self._record,
  438. self._record.id, e))
  439. result = False
  440. continue
  441. @property
  442. def record(self):
  443. return self._record
  444. def __repr__(self):
  445. return self._record