|
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- """
- Recompute offline coin totals and generate daily / monthly reports.
- """
- import datetime
- import re
- import sys
- import threading
- from collections import defaultdict
- from concurrent.futures import ThreadPoolExecutor, Future
- from base import init_env, get_logger
- logger = get_logger(__name__)
- init_env(interactive = False)
- from apps import reportCache
- from apps.web.core.accounting import Accounting
- from apps.web.core.accounting import devCoinTmpl, ownerCoinTmpl, groupCoinTmpl
- from apps.web.report.models import DevReport, GroupReport, DealerReport
- from apps.web.device.models import Group, Device
- from apps.web.dealer.models import Dealer
- # Re-entrant lock guarding updates to the shared dealer coin map
- rlock = threading.RLock()
- # Maximum number of worker threads in the pool
- MAX_WORKER = 50
def accountDealerOrPartner(f):  # type:(Future) -> None
    """
    Done-callback: add one group's coin total to its owner and partners.

    The future's result is the tuple produced by ``accountGroup``:
    ``(ownerId, partnerIds, groupCoins, date, totals)``.

    :param f: completed future object
    :return: None
    """
    ownerId, partnerIds, groupCoins, _date, totals = f.result()

    # Groups whose total is zero or negative do not take part in the sums.
    if groupCoins <= 0:
        return

    # The owner is always credited; every partner (there may be none)
    # is credited with the same amount.
    for accountId in [ownerId] + list(partnerIds or ()):
        with rlock:
            totals[accountId] = totals[accountId] + groupCoins
def accountGroup(gid, sd, dm, update):
    """
    Sum the cached coin counts of every device in one group.

    The fresh sum is compared with the group's cached total and the outcome
    is logged; when ``update`` is true the cached group total is overwritten.

    :param gid: group ID
    :param sd: date string used to build the cache keys
    :param dm: shared dealer coin map; passed through untouched in the result
    :param update: whether to write the new group total back to the cache
    :return: ``(ownerId, partnerIds, groupCoins, sd, dm)`` for use by the
             future's done-callback
    """
    # One cache key per device of the group, fetched in a single round trip.
    deviceKeys = [devCoinTmpl(devNo, sd) for devNo in Device.get_devNos_by_group([gid])]
    cachedCoins = reportCache.get_many(deviceKeys)

    allDevCoinsByGroup = 0
    for rawCoins in cachedCoins.values():
        allDevCoinsByGroup += int(rawCoins)

    # Previous group total; a missing / empty cache entry counts as 0.
    groupKey = groupCoinTmpl(gid, sd)
    oldValue = int(reportCache.get(groupKey) or 0)

    if oldValue == allDevCoinsByGroup:
        logger.info('euqal, groupId=%s, oldValue=%s, nowValue=%s' % (gid, oldValue, allDevCoinsByGroup))
    else:
        logger.info('not equal, groupId=%s, oldValue=%s, nowValue=%s' % (gid, oldValue, allDevCoinsByGroup))

    # Refresh the cached group total only when asked to.
    if update:
        reportCache.set(groupKey, str(int(allDevCoinsByGroup)))

    group = Group.get_group(gid)
    partnerIds = [partner.get("id") for partner in group.partners()]
    return group.ownerId, partnerIds, allDevCoinsByGroup, sd, dm
def check_offline_coins(groupIds, dealerIds, stringDate, update=False):
    """
    Recompute offline coin totals for groups, dealers and partners.

    Device-level coin counts are the source of truth. Each group is summed
    in a worker thread by ``accountGroup``; when a group finishes, the
    ``accountDealerOrPartner`` callback adds its total to the owner and the
    partners in a shared map. After all groups are done, every dealer's new
    total is compared with the cached one, logged, and optionally written
    back.

    # TODO consider pandas join/aggregation instead of the for loops

    :param groupIds: all group IDs to account for
    :param dealerIds: all dealer IDs to check (partners included)
    :param stringDate: date string, normally the current day
    :param update: whether to write the recomputed totals back to the cache
    :return: None
    """
    allGroupIds = groupIds
    dealerCoinMap = defaultdict(int)

    # Leaving the ``with`` block blocks until every submitted task is done.
    # NOTE(review): a failure inside accountGroup only surfaces through
    # ``f.result()`` in the callback; nothing here inspects the futures.
    with ThreadPoolExecutor(max_workers=MAX_WORKER) as executor:
        for _gid in allGroupIds:
            future = executor.submit(accountGroup, _gid, stringDate, dealerCoinMap, update)
            future.add_done_callback(accountDealerOrPartner)

    # TODO once all tasks are done, compare each dealer's coin total with the
    #      previous value and handle the update
    # TODO consider a per-dealer completion trigger so the comparison runs on
    #      the dealer's last addition instead of in a second loop
    dealerAndPartnerIds = dealerIds
    for _did in dealerAndPartnerIds:
        oldKey = ownerCoinTmpl(_did, stringDate)
        # A missing / empty cache entry counts as 0.
        oValue = int(reportCache.get(oldKey) or 0)
        # ``defaultdict.get`` does NOT invoke the default factory, so a dealer
        # that no group credited would yield None without the explicit 0;
        # that produced a bogus "not equal" log and crashed ``int(None)``
        # when ``update`` was set.
        nValue = dealerCoinMap.get(_did, 0)

        if oValue != nValue:
            logger.info('not equal. ownerKey=%s,oldValue=%s,nowValue=%s' % (_did, oValue, nValue))
        else:
            logger.info('ownerKey=%s,nowValue=%s' % (_did, nValue))

        if update:
            reportCache.set(oldKey, str(int(nValue)))
def report_into_db(devDict, startTime, reportDate):
    """
    Write the daily reports for ``reportDate`` and refresh the monthly ones.

    For every dealer the existing day reports (device, group, dealer) for the
    date are removed and re-inserted from ``Accounting``. Afterwards all day
    reports from the first of the month up to ``reportDate`` are summed and
    upserted as the ``type='month'`` report keyed by the first of the month.

    :param devDict: mapping of dealer (owner) ID -> list of device numbers
    :param startTime: datetime the run started at; only used to log elapsed time
    :param reportDate: datetime of the day being reported
    :return: None
    :raises Exception: any error from a database call is logged and re-raised
    """
    stringDate = reportDate.strftime("%Y-%m-%d")
    logger.info('generating report for date (%s)' % (stringDate,))

    #: build the daily reports
    for ownerId, devList in devDict.items():
        devRptDict = Accounting.getDevIncome(devList, reportDate)
        ownerGroupIds = Group.get_group_ids_of_dealer(ownerId)
        grpRptDict = Accounting.getGroupIncome(ownerGroupIds, reportDate)
        dealerRpt = Accounting.getOwnerIncome(ownerId, reportDate)

        devRpts, devNos = [], []
        for devNo, rpt in devRptDict.items():
            devRpts.append({'devNo': devNo, 'type': 'day', 'date': stringDate, 'rpt': rpt})
            devNos.append(devNo)
        try:
            # Remove-then-insert replaces any day report left by a previous run.
            DevReport.get_collection().remove({'devNo': {'$in': devNos}, 'date': stringDate, 'type': 'day'})
            if devRpts:
                DevReport.get_collection().insert(devRpts)
        except Exception as e:
            logger.exception('insert dev rpt error=%s,devNos=%s,owner=%s' % (e, devNos, ownerId,))
            raise

        # Only the groups that actually produced a report are replaced.
        grpRpts, reportedGroupIds = [], []
        for groupId, rpt in grpRptDict.items():
            grpRpts.append({'groupId': groupId, 'type': 'day', 'date': stringDate, 'rpt': rpt})
            reportedGroupIds.append(groupId)
        try:
            GroupReport.get_collection().remove(
                {'groupId': {'$in': reportedGroupIds}, 'date': stringDate, 'type': 'day'})
            if grpRpts:
                GroupReport.get_collection().insert(grpRpts)
        except Exception as e:
            logger.exception('insert group rpt error=%s,groupIds=%s,owner=%s' % (e, reportedGroupIds, ownerId))
            raise

        try:
            DealerReport.get_collection().remove({'ownerId': ownerId, 'date': stringDate, 'type': 'day'})
            DealerReport.get_collection().insert(
                [{'ownerId': ownerId, 'date': stringDate, 'type': 'day', 'rpt': dealerRpt}])
        except Exception as e:
            logger.exception('insert dealer rpt error=%s,owner=%s' % (e, ownerId))
            raise

    #: aggregate the current month's day reports and upsert them into mongodb
    startDate = reportDate.strftime("%Y-%m") + "-01"
    endDate = stringDate

    def monthlySummary(rpts):
        """Sum ``lineCoins`` and ``count`` over day reports; other fields stay 0."""
        monthRpt = {'lineCoins': 0, 'todayPayIncome': 0, 'todayAdIncome': 0, 'totalIncome': 0, 'count': 0}
        for rpt in rpts:
            monthRpt['lineCoins'] += rpt['rpt'].get('lineCoins', 0)
            monthRpt['count'] += rpt['rpt'].get('count', 0)
        return monthRpt

    for ownerId, devList in devDict.items():
        for devNo in devList:
            devRpts = DevReport.get_collection() \
                .find({'devNo': devNo, 'type': 'day', 'date': {'$gte': startDate, '$lte': endDate}})
            monthRpt = monthlySummary(devRpts)
            try:
                DevReport.get_collection().update(
                    {'devNo': devNo, 'type': 'month', 'date': startDate},
                    {'$set': {'devNo': devNo, 'type': 'month', 'date': startDate, 'rpt': monthRpt}},
                    upsert=True)
            except Exception as e:
                logger.exception(
                    'update dev month report error=%s,devNo=%s,owner=%s,rpt=%s' % (e, devNo, ownerId, monthRpt))
                raise

        ownerGroupIds = Group.get_group_ids_of_dealer(ownerId)
        for groupId in ownerGroupIds:
            grpRpts = GroupReport.get_collection() \
                .find({'groupId': groupId, 'type': 'day', 'date': {'$gte': startDate, '$lte': endDate}})
            monthRpt = monthlySummary(grpRpts)
            try:
                GroupReport.get_collection().update(
                    {'groupId': groupId, 'type': 'month', 'date': startDate},
                    {'$set': {'groupId': groupId, 'type': 'month', 'date': startDate, 'rpt': monthRpt}},
                    upsert=True)
            except Exception as e:
                logger.exception('update group month report error=%s,groupId=%s,owner=%s,rpt=%s'
                                 % (e, groupId, ownerId, monthRpt))
                raise

        dealerRpts = DealerReport.get_collection() \
            .find({'ownerId': ownerId, 'type': 'day', 'date': {'$gte': startDate, '$lte': endDate}})
        monthRpt = monthlySummary(dealerRpts)
        try:
            DealerReport.get_collection() \
                .update({'ownerId': ownerId, 'type': 'month', 'date': startDate},
                        {'$set': {'ownerId': ownerId, 'type': 'month', 'date': startDate, 'rpt': monthRpt}},
                        upsert=True)
        except Exception as e:
            logger.exception('update dealer month report error=%s,owner=%s,rpt=%s' % (e, ownerId, monthRpt))
            raise

    logger.info('[*]finished insert rpt into database!, time cost=%s' % (datetime.datetime.now() - startTime,))
def main(dateFmtStr):
    """
    Entry point: pick the dealers, re-check offline coins, write the reports.

    :param dateFmtStr: report date as ``YYYY-MM-DD``; when falsy, yesterday
                       is used
    :return: None
    """
    # An optional first command-line argument restricts the run to one dealer.
    paramDealerId = str(sys.argv[1]) if len(sys.argv) >= 2 else None

    startTime = datetime.datetime.now()
    if dateFmtStr:
        reportDate = datetime.datetime.strptime(dateFmtStr, "%Y-%m-%d")
    else:
        reportDate = datetime.datetime.now() - datetime.timedelta(days = 1)

    if paramDealerId:
        dealer_id_list = [paramDealerId]
    else:
        # NOTE(review): the date in this query is hard-coded and independent
        # of ``reportDate`` -- confirm whether that is intentional.
        cursor = DealerReport.get_collection().find({'date': '2021-04-12', 'rpt.lineCoins': {'$gt': 0}})
        dealer_id_list = [item['ownerId'] for item in cursor]

    # ``groupIds`` is the union over all dealers (partner groups included);
    # ``dealersWithGroupIds`` keeps each dealer's own groups separately.
    groupIds = set()
    dealersWithGroupIds = {}
    for dealerId in dealer_id_list:
        logger.info('fetch group map for dealer<id={}>'.format(dealerId))
        dealersWithGroupIds[dealerId] = Group.get_group_ids_of_dealer(dealerId)
        groupIds.update(Group.get_group_ids_of_dealer_and_partner(dealerId))

    devDict = {}
    for dealerId, ownGroupIds in dealersWithGroupIds.items():
        devDict[dealerId] = Device.get_devNos_by_group(ownGroupIds)

    check_offline_coins(groupIds, dealer_id_list, reportDate.strftime("%Y-%m-%d"))
    report_into_db(devDict, startTime, reportDate)
def generate_test_data(stringDate="2020-04-21"):
    """
    Seed the report cache with random per-device coin counts (testing only).

    Does nothing unless the ``DJANGO_SETTINGS_MODULE`` environment variable
    is exactly ``configs.testing``; an unset variable is treated the same as
    any other non-testing value instead of raising ``KeyError``.

    :param stringDate: date string used in the device cache keys
    :return: None
    """
    import os
    import random

    if os.environ.get("DJANGO_SETTINGS_MODULE") != "configs.testing":
        return

    dealerIds = [str(_d.id) for _d in Dealer.objects.only("id")]
    groupIds = set()
    for _did in dealerIds:
        groupIds |= set(Group.get_group_ids_of_dealer(_did))

    devNos = Device.get_devNos_by_group(list(groupIds))
    for _devNo in devNos:
        # Each device gets a random count in [1, 100], stored as a string.
        coins = str(random.randint(1, 100))
        reportCache.set(devCoinTmpl(_devNo, stringDate), coins)
if __name__ == '__main__':
    # generate_test_data()
    # The first argument containing a YYYY-MM-DD pattern is taken as the
    # report date and removed from sys.argv, so that main() only sees the
    # remaining arguments (it reads the dealer ID from sys.argv[1]).
    datePattern = re.compile(r'\d{4}-\d{2}-\d{2}')
    dateFmtStr = None
    for position, argument in enumerate(sys.argv[1:], 1):
        if datePattern.search(argument):
            dateFmtStr = argument
            sys.argv.pop(position)
            break
    main(dateFmtStr)
|