make_rpt_into_db_zjl.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. """
  4. 生成报表
  5. """
  6. import datetime
  7. import re
  8. import sys
  9. import threading
  10. from collections import defaultdict
  11. from concurrent.futures import ThreadPoolExecutor, Future
  12. from base import init_env, get_logger
  13. logger = get_logger(__name__)
  14. init_env(interactive = False)
  15. from apps import reportCache
  16. from apps.web.core.accounting import Accounting
  17. from apps.web.core.accounting import devCoinTmpl, ownerCoinTmpl, groupCoinTmpl
  18. from apps.web.report.models import DevReport, GroupReport, DealerReport
  19. from apps.web.device.models import Group, Device
  20. from apps.web.dealer.models import Dealer
  21. # 导入线程锁
  22. rlock = threading.RLock()
  23. # 设置最大线程数量
  24. MAX_WORKER = 50
  25. def accountDealerOrPartner(f): # type:(Future) -> None
  26. """
  27. 根据统计的 组地址的金币 去加和 地址经销商的相关信息
  28. :param f: future object
  29. :return:
  30. """
  31. # 获取计算的额返回结果
  32. oid, pids, allDevCoinsByGroup, sd, dm = f.result()
  33. # 返回结果中 gid 一定存在 oid 一定存在 pid 不一定存在 allDeviceCoinsByGroup 可能为0
  34. # 地址金币小于等于0 的情况下 直接返还 不参与计算
  35. if not max(allDevCoinsByGroup, 0):
  36. return
  37. with rlock:
  38. # dealerCurCoins = dm[oid]
  39. # dealerCurCoins += allDevCoinsByGroup
  40. dm.update({oid: dm[oid] + allDevCoinsByGroup})
  41. if not pids:
  42. return
  43. # 对于 partner 的加和处理
  44. for _pid in pids:
  45. with rlock:
  46. # partnerCurCoins = dm[_pid]
  47. # partnerCurCoins += allDevCoinsByGroup
  48. dm.update({_pid: dm[_pid] + allDevCoinsByGroup})
  49. def accountGroup(gid, sd, dm, update):
  50. """
  51. 统计组的 金币的消耗详情
  52. :param gid: 组ID
  53. :param sd: 日期
  54. :param dm: 经销商的投币统计
  55. :param update: 是否更新
  56. :return:
  57. """
  58. # 获取所有的缓存键
  59. keys = [devCoinTmpl(devNo, sd) for devNo in Device.get_devNos_by_group([gid])]
  60. valueDict = reportCache.get_many(keys)
  61. # 使用生成器表达式
  62. listCoins = (int(coins) for coins in valueDict.values())
  63. allDevCoinsByGroup = sum(listCoins)
  64. # 取出旧数据 做对比判断
  65. oldValue = reportCache.get(groupCoinTmpl(gid, sd))
  66. if not oldValue:
  67. oldValue = 0
  68. else:
  69. oldValue = int(oldValue)
  70. if oldValue != allDevCoinsByGroup:
  71. logger.info('not equal, groupId=%s, oldValue=%s, nowValue=%s' % (gid, oldValue, allDevCoinsByGroup))
  72. else:
  73. logger.info('euqal, groupId=%s, oldValue=%s, nowValue=%s' % (gid, oldValue, allDevCoinsByGroup))
  74. # 需要更新 缓存组金币统计的情况下 直接刷新缓存
  75. if update:
  76. reportCache.set(groupCoinTmpl(gid, sd), str(int(allDevCoinsByGroup)))
  77. # 返回gid 以及相应的金币的统计值 作为回调使用
  78. group = Group.get_group(gid)
  79. pids = [_partner.get("id") for _partner in group.partners()]
  80. oid = group.ownerId
  81. return oid, pids, allDevCoinsByGroup, sd, dm
  82. def check_offline_coins(groupIds, dealerIds, stringDate, update = False):
  83. """
  84. 统计 线下金币的 以设备的金币统计为准 更新组地址、经销商、合伙人的金币统计
  85. 修改为以下的路线 将map反转 以地址为基本计算单元 当某一个计算单元完成之后,通过callback 执行经销商或者合伙人的数量累加 所有任务执行完成的时候,一次性提交
  86. # TODO 考虑使用pandas 的数据联表统计 替代 for 循环方式
  87. :param groupIds: 所有需要统计的地址
  88. :param dealerIds: 所有需要统计的经销商(包含partner)
  89. :param stringDate: 日期字符串 一般是当日的日期
  90. :param update: 是否更新
  91. :return:
  92. """
  93. # 取出所有的groupId 求并集
  94. allGroupIds = groupIds
  95. dealerCoinMap = defaultdict(int)
  96. # 所有的task执行完成前阻塞
  97. with ThreadPoolExecutor(max_workers = MAX_WORKER) as executor:
  98. for _gid in allGroupIds:
  99. executor.submit(accountGroup, _gid, stringDate, dealerCoinMap, update).add_done_callback(
  100. accountDealerOrPartner)
  101. # TODO 所有的task 都完成之后 最后看下经销商的金币数量和之前的是否是相等 并且处理更新
  102. # TODO 可以考虑为 dealer 设置一个终点触发值 在经销商最后一次加和的时候进行统计 而不是再走一个for循环
  103. dealerAndPartnerIds = dealerIds
  104. for _did in dealerAndPartnerIds:
  105. oldKey = ownerCoinTmpl(_did, stringDate)
  106. oValue = reportCache.get(oldKey)
  107. nValue = dealerCoinMap.get(_did)
  108. if not oValue:
  109. oValue = 0
  110. else:
  111. oValue = int(oValue)
  112. # 数值对比
  113. if oValue != nValue:
  114. logger.info('not equal. ownerKey=%s,oldValue=%s,nowValue=%s' % (_did, oValue, nValue))
  115. else:
  116. logger.info('ownerKey=%s,nowValue=%s' % (_did, nValue))
  117. if update:
  118. reportCache.set(oldKey, str(int(nValue)))
  119. def report_into_db(devDict, startTime, reportDate):
  120. stringDate = reportDate.strftime("%Y-%m-%d")
  121. logger.info('generating report for date (%s)' % (stringDate,))
  122. #: 生成日报表
  123. for ownerId, devList in devDict.items():
  124. devRptDict = Accounting.getDevIncome(devList, reportDate)
  125. groupIds = Group.get_group_ids_of_dealer(ownerId)
  126. grpRptDict = Accounting.getGroupIncome(groupIds, reportDate)
  127. dealerRpt = Accounting.getOwnerIncome(ownerId, reportDate)
  128. devRpts, devNos = [], []
  129. for devNo, rpt in devRptDict.items():
  130. devRpts.append({'devNo': devNo, 'type': 'day', 'date': stringDate, 'rpt': rpt})
  131. devNos.append(devNo)
  132. try:
  133. DevReport.get_collection().remove({'devNo': {'$in': devNos}, 'date': stringDate, 'type': 'day'})
  134. if devRpts:
  135. DevReport.get_collection().insert(devRpts)
  136. except Exception, e:
  137. logger.exception('insert dev rpt error=%s,devNos=%s,owner=%s' % (e, devNos, ownerId,))
  138. raise
  139. grpRpts, groupIds = [], []
  140. for groupId, rpt in grpRptDict.items():
  141. grpRpts.append({'groupId': groupId, 'type': 'day', 'date': stringDate, 'rpt': rpt})
  142. groupIds.append(groupId)
  143. try:
  144. GroupReport.get_collection().remove({'groupId': {'$in': groupIds}, 'date': stringDate, 'type': 'day'})
  145. if grpRpts:
  146. GroupReport.get_collection().insert(grpRpts)
  147. except Exception, e:
  148. logger.exception('insert group rpt error=%s,groupIds=%s,owner=%s' % (e, groupIds, ownerId))
  149. raise
  150. try:
  151. DealerReport.get_collection().remove({'ownerId': ownerId, 'date': stringDate, 'type': 'day'})
  152. DealerReport.get_collection().insert(
  153. [{'ownerId': ownerId, 'date': stringDate, 'type': 'day', 'rpt': dealerRpt}])
  154. except Exception, e:
  155. logger.exception('insert dealer rpt error=%s,owner=%s' % (e, ownerId))
  156. raise
  157. #: 将当月的所有数据汇总,并更新到mongodb
  158. startDate = reportDate.strftime("%Y-%m") + "-01"
  159. endDate = stringDate
  160. def monthlySummary(rpts):
  161. monthRpt = {'lineCoins': 0, 'todayPayIncome': 0, 'todayAdIncome': 0, 'totalIncome': 0, 'count': 0}
  162. for rpt in rpts:
  163. monthRpt['lineCoins'] += rpt['rpt'].get('lineCoins', 0)
  164. monthRpt['count'] += rpt['rpt'].get('count', 0)
  165. return monthRpt
  166. for ownerId, devList in devDict.items():
  167. for devNo in devList:
  168. devRpts = DevReport.get_collection() \
  169. .find({'devNo': devNo, 'type': 'day', 'date': {'$gte': startDate, '$lte': endDate}})
  170. monthRpt = monthlySummary(devRpts)
  171. try:
  172. DevReport.get_collection().update({'devNo': devNo, 'type': 'month', 'date': startDate},
  173. {'$set': {'devNo': devNo, 'type': 'month', 'date': startDate,
  174. 'rpt': monthRpt}}, upsert = True)
  175. except Exception, e:
  176. logger.exception(
  177. 'update dev month report error=%s,devNo=%s,owner=%s,rpt=%s' % (e, devNo, ownerId, monthRpt))
  178. raise
  179. groupIds = Group.get_group_ids_of_dealer(ownerId)
  180. for groupId in groupIds:
  181. grpRpts = GroupReport.get_collection() \
  182. .find({'groupId': groupId, 'type': 'day', 'date': {'$gte': startDate, '$lte': endDate}})
  183. monthRpt = monthlySummary(grpRpts)
  184. try:
  185. GroupReport.get_collection().update({'groupId': groupId, 'type': 'month', 'date': startDate},
  186. {'$set': {'groupId': groupId, 'type': 'month', 'date': startDate,
  187. 'rpt': monthRpt}}, upsert = True)
  188. except Exception, e:
  189. logger.exception('update group month report error=%s,groupId=%s,owner=%s,rpt=%s'
  190. % (e, groupId, ownerId, monthRpt))
  191. raise
  192. dealerRpts = DealerReport.get_collection() \
  193. .find({'ownerId': ownerId, 'type': 'day', 'date': {'$gte': startDate, '$lte': endDate}})
  194. monthRpt = monthlySummary(dealerRpts)
  195. try:
  196. DealerReport.get_collection() \
  197. .update({'ownerId': ownerId, 'type': 'month', 'date': startDate},
  198. {'$set': {'ownerId': ownerId, 'type': 'month', 'date': startDate, 'rpt': monthRpt}},
  199. upsert = True)
  200. except Exception, e:
  201. logger.exception('update dealer month report error=%s,owner=%s,rpt=%s' % (e, ownerId, monthRpt))
  202. raise
  203. logger.info('[*]finished insert rpt into database!, time cost=%s' % (datetime.datetime.now() - startTime,))
  204. def main(dateFmtStr):
  205. paramDealerId = None
  206. if len(sys.argv) >= 2:
  207. paramDealerId = str(sys.argv[1])
  208. startTime = datetime.datetime.now()
  209. if dateFmtStr:
  210. reportDate = datetime.datetime.strptime(dateFmtStr, "%Y-%m-%d")
  211. else:
  212. reportDate = datetime.datetime.now() - datetime.timedelta(days = 1)
  213. dealer_id_list = []
  214. if paramDealerId:
  215. dealer_id_list = [paramDealerId]
  216. else:
  217. items = DealerReport.get_collection().find({'date': '2021-04-12', 'rpt.lineCoins': {'$gt': 0}})
  218. for item in items:
  219. dealer_id_list.append(item['ownerId'])
  220. groupIds = set()
  221. dealersWithGroupIds = dict()
  222. for dealerId in dealer_id_list:
  223. logger.info('fetch group map for dealer<id={}>'.format(dealerId))
  224. dealersWithGroupIds[dealerId] = Group.get_group_ids_of_dealer(dealerId)
  225. groupIds |= set(Group.get_group_ids_of_dealer_and_partner(dealerId))
  226. devDict = {dealerId: Device.get_devNos_by_group(groupIds) for dealerId, groupIds in dealersWithGroupIds.items()}
  227. check_offline_coins(groupIds, dealer_id_list, reportDate.strftime("%Y-%m-%d"))
  228. report_into_db(devDict, startTime, reportDate)
  229. def generate_test_data():
  230. """
  231. 生成测试数据
  232. :return:
  233. """
  234. import os
  235. if os.environ["DJANGO_SETTINGS_MODULE"] != "configs.testing":
  236. return
  237. dealerIds = [str(_d.id) for _d in Dealer.objects.only("id")]
  238. groupIds = set()
  239. for _did in dealerIds:
  240. groupIds |= set(Group.get_group_ids_of_dealer(_did))
  241. devNos = Device.get_devNos_by_group(list(groupIds))
  242. import random
  243. for _devNo in devNos:
  244. coins = str(random.randint(1, 100))
  245. reportCache.set(devCoinTmpl(_devNo, "2020-04-21"), coins)
  246. if __name__ == '__main__':
  247. # generate_test_data()
  248. dateFmtStr = None
  249. index = 1
  250. regex = re.compile(r'\d{4}-\d{2}-\d{2}')
  251. for arg in sys.argv[1:]:
  252. m = regex.search(arg)
  253. if m and m.group():
  254. dateFmtStr = arg
  255. sys.argv.pop(index)
  256. break
  257. index += 1
  258. main(dateFmtStr)