# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""
Generate reports.

Cross-checks the cached offline coin counters (device -> group -> dealer/partner)
and then writes the daily and month-to-date reports into MongoDB.
"""
import datetime
import re
import sys
import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, Future

from base import init_env, get_logger

logger = get_logger(__name__)

# NOTE(review): init_env() is called before the ``apps`` imports below;
# presumably those imports depend on the initialised environment, so keep
# this ordering.
init_env(interactive=False)

from apps import reportCache
from apps.web.core.accounting import Accounting
from apps.web.core.accounting import devCoinTmpl, ownerCoinTmpl, groupCoinTmpl
from apps.web.report.models import DevReport, GroupReport, DealerReport
from apps.web.device.models import Group, Device
from apps.web.dealer.models import Dealer

# Lock guarding the shared dealer/partner coin map that the future callbacks mutate.
rlock = threading.RLock()

# Maximum number of worker threads used for the per-group accounting.
MAX_WORKER = 50


def accountDealerOrPartner(f):
    # type: (Future) -> None
    """
    Done-callback: add one group's coin total to its owner and partners.

    ``f.result()`` re-raises whatever ``accountGroup`` raised, so a failed
    group contributes nothing to the totals.

    :param f: future whose result is the tuple returned by ``accountGroup``
    :return: None
    """
    # The owner id is always present; the partner list may be empty and the
    # coin total may be zero.
    oid, pids, allDevCoinsByGroup, sd, dm = f.result()

    # Groups without a positive coin total do not take part in the sums.
    if allDevCoinsByGroup <= 0:
        return

    # ``dm`` is shared by every callback, so all updates happen under the lock.
    with rlock:
        dm[oid] += allDevCoinsByGroup
        for _pid in pids or ():
            dm[_pid] += allDevCoinsByGroup


def accountGroup(gid, sd, dm, update):
    """
    Sum the cached per-device coins of one group and compare with the cached
    group total.

    :param gid: group id
    :param sd: date string used to build the cache keys
    :param dm: shared dealer coin map; passed through untouched for the callback
    :param update: when true, overwrite the cached group total with the new sum
    :return: tuple ``(ownerId, partnerIds, groupCoinTotal, sd, dm)``
    """
    # Cache keys of every device that belongs to the group.
    keys = [devCoinTmpl(devNo, sd) for devNo in Device.get_devNos_by_group([gid])]
    cached = reportCache.get_many(keys)
    allDevCoinsByGroup = sum(int(coins) for coins in cached.values())

    # Previously cached group total; a missing/empty value counts as 0.
    groupKey = groupCoinTmpl(gid, sd)
    oldValue = reportCache.get(groupKey)
    oldValue = int(oldValue) if oldValue else 0

    if oldValue != allDevCoinsByGroup:
        logger.info('not equal, groupId=%s, oldValue=%s, nowValue=%s' % (gid, oldValue, allDevCoinsByGroup))
    else:
        logger.info('euqal, groupId=%s, oldValue=%s, nowValue=%s' % (gid, oldValue, allDevCoinsByGroup))

    # Refresh the cached group total when asked to.
    if update:
        reportCache.set(groupKey, str(int(allDevCoinsByGroup)))

    # Hand the owner/partner ids and the total over to the done-callback.
    group = Group.get_group(gid)
    pids = [_partner.get("id") for _partner in group.partners()]
    oid = group.ownerId

    return oid, pids, allDevCoinsByGroup, sd, dm


def check_offline_coins(groupIds, dealerIds, stringDate, update=False):
    """
    Re-derive the offline coin totals from the per-device counters and compare
    them with the cached group and dealer/partner totals.

    Each group is one unit of work. When a unit finishes, its done-callback
    adds the group total to the owner and partners in a shared map; once every
    unit is done the dealer totals are compared (and optionally written back).

    # TODO consider a pandas join/aggregation instead of the for loops

    :param groupIds: all group ids to account
    :param dealerIds: all dealer ids to check (partners included)
    :param stringDate: date string, normally the report date
    :param update: when true, overwrite the cached totals with the new values
    :return: None
    """
    dealerCoinMap = defaultdict(int)

    # Leaving the ``with`` block waits until every submitted task has finished.
    with ThreadPoolExecutor(max_workers=MAX_WORKER) as executor:
        for _gid in groupIds:
            future = executor.submit(accountGroup, _gid, stringDate, dealerCoinMap, update)
            future.add_done_callback(accountDealerOrPartner)

    # TODO a per-dealer "last addition" trigger could replace this extra loop
    for _did in dealerIds:
        oldKey = ownerCoinTmpl(_did, stringDate)
        oValue = reportCache.get(oldKey)
        oValue = int(oValue) if oValue else 0

        # ``defaultdict.get`` does not invoke the default factory: a dealer
        # that received no coins needs an explicit 0 here, otherwise the
        # comparison sees None and ``int(None)`` fails when updating.
        nValue = dealerCoinMap.get(_did, 0)

        if oValue != nValue:
            logger.info('not equal. ownerKey=%s,oldValue=%s,nowValue=%s' % (_did, oValue, nValue))
        else:
            logger.info('ownerKey=%s,nowValue=%s' % (_did, nValue))

        if update:
            reportCache.set(oldKey, str(int(nValue)))


def report_into_db(devDict, startTime, reportDate):
    """
    Write the daily reports for ``reportDate`` and refresh the month-to-date
    summaries for every device, group and dealer.

    :param devDict: mapping ``ownerId -> list of device numbers``
    :param startTime: datetime the run started at; only used for the final log line
    :param reportDate: datetime of the day being reported
    :return: None
    :raises Exception: any database error is logged and re-raised
    """
    stringDate = reportDate.strftime("%Y-%m-%d")

    logger.info('generating report for date (%s)' % (stringDate,))

    #: Daily reports: replace the existing rows for this date with fresh ones.
    for ownerId, devList in devDict.items():
        devRptDict = Accounting.getDevIncome(devList, reportDate)
        ownerGroupIds = Group.get_group_ids_of_dealer(ownerId)
        grpRptDict = Accounting.getGroupIncome(ownerGroupIds, reportDate)
        dealerRpt = Accounting.getOwnerIncome(ownerId, reportDate)

        devRpts = [{'devNo': devNo, 'type': 'day', 'date': stringDate, 'rpt': rpt}
                   for devNo, rpt in devRptDict.items()]
        devNos = [doc['devNo'] for doc in devRpts]
        try:
            DevReport.get_collection().remove({'devNo': {'$in': devNos}, 'date': stringDate, 'type': 'day'})
            if devRpts:
                DevReport.get_collection().insert(devRpts)
        except Exception as e:
            logger.exception('insert dev rpt error=%s,devNos=%s,owner=%s' % (e, devNos, ownerId,))
            raise

        grpRpts = [{'groupId': groupId, 'type': 'day', 'date': stringDate, 'rpt': rpt}
                   for groupId, rpt in grpRptDict.items()]
        reportedGroupIds = [doc['groupId'] for doc in grpRpts]
        try:
            GroupReport.get_collection().remove(
                {'groupId': {'$in': reportedGroupIds}, 'date': stringDate, 'type': 'day'})
            if grpRpts:
                GroupReport.get_collection().insert(grpRpts)
        except Exception as e:
            logger.exception('insert group rpt error=%s,groupIds=%s,owner=%s' % (e, reportedGroupIds, ownerId))
            raise

        try:
            DealerReport.get_collection().remove({'ownerId': ownerId, 'date': stringDate, 'type': 'day'})
            DealerReport.get_collection().insert(
                [{'ownerId': ownerId, 'date': stringDate, 'type': 'day', 'rpt': dealerRpt}])
        except Exception as e:
            logger.exception('insert dealer rpt error=%s,owner=%s' % (e, ownerId))
            raise

    #: Month-to-date: aggregate every daily row of the month and upsert it.
    startDate = reportDate.strftime("%Y-%m") + "-01"
    endDate = stringDate
    dayRange = {'$gte': startDate, '$lte': endDate}

    def monthlySummary(rpts):
        # Only 'lineCoins' and 'count' are accumulated; the income fields are
        # written as 0.
        monthRpt = {'lineCoins': 0, 'todayPayIncome': 0, 'todayAdIncome': 0, 'totalIncome': 0, 'count': 0}
        for rpt in rpts:
            monthRpt['lineCoins'] += rpt['rpt'].get('lineCoins', 0)
            monthRpt['count'] += rpt['rpt'].get('count', 0)
        return monthRpt

    for ownerId, devList in devDict.items():
        for devNo in devList:
            devRpts = DevReport.get_collection().find({'devNo': devNo, 'type': 'day', 'date': dayRange})
            monthRpt = monthlySummary(devRpts)
            try:
                DevReport.get_collection().update(
                    {'devNo': devNo, 'type': 'month', 'date': startDate},
                    {'$set': {'devNo': devNo, 'type': 'month', 'date': startDate, 'rpt': monthRpt}},
                    upsert=True)
            except Exception as e:
                logger.exception(
                    'update dev month report error=%s,devNo=%s,owner=%s,rpt=%s' % (e, devNo, ownerId, monthRpt))
                raise

        for groupId in Group.get_group_ids_of_dealer(ownerId):
            grpRpts = GroupReport.get_collection().find({'groupId': groupId, 'type': 'day', 'date': dayRange})
            monthRpt = monthlySummary(grpRpts)
            try:
                GroupReport.get_collection().update(
                    {'groupId': groupId, 'type': 'month', 'date': startDate},
                    {'$set': {'groupId': groupId, 'type': 'month', 'date': startDate, 'rpt': monthRpt}},
                    upsert=True)
            except Exception as e:
                logger.exception('update group month report error=%s,groupId=%s,owner=%s,rpt=%s'
                                 % (e, groupId, ownerId, monthRpt))
                raise

        dealerRpts = DealerReport.get_collection().find({'ownerId': ownerId, 'type': 'day', 'date': dayRange})
        monthRpt = monthlySummary(dealerRpts)
        try:
            DealerReport.get_collection().update(
                {'ownerId': ownerId, 'type': 'month', 'date': startDate},
                {'$set': {'ownerId': ownerId, 'type': 'month', 'date': startDate, 'rpt': monthRpt}},
                upsert=True)
        except Exception as e:
            logger.exception('update dealer month report error=%s,owner=%s,rpt=%s' % (e, ownerId, monthRpt))
            raise

    logger.info('[*]finished insert rpt into database!, time cost=%s' % (datetime.datetime.now() - startTime,))


def main(dateFmtStr):
    """
    Entry point: pick the dealers, check their offline coins and write reports.

    :param dateFmtStr: report date as ``YYYY-MM-DD``; falsy means yesterday
    :return: None
    """
    # The first remaining command-line argument, if any, restricts the run to
    # one dealer (the script entry removes the date argument from sys.argv).
    paramDealerId = None
    if len(sys.argv) >= 2:
        paramDealerId = str(sys.argv[1])

    startTime = datetime.datetime.now()

    if dateFmtStr:
        reportDate = datetime.datetime.strptime(dateFmtStr, "%Y-%m-%d")
    else:
        reportDate = datetime.datetime.now() - datetime.timedelta(days=1)

    if paramDealerId:
        dealer_id_list = [paramDealerId]
    else:
        # NOTE(review): the date in this query is hard-coded and independent of
        # reportDate -- looks like a leftover from a one-off run; confirm
        # whether it should follow the report date instead.
        items = DealerReport.get_collection().find({'date': '2021-04-12', 'rpt.lineCoins': {'$gt': 0}})
        dealer_id_list = [item['ownerId'] for item in items]

    groupIds = set()
    dealersWithGroupIds = dict()
    for dealerId in dealer_id_list:
        # The original message had no placeholder, so the id was never logged.
        logger.info('fetch group map for dealer %s' % (dealerId,))
        dealersWithGroupIds[dealerId] = Group.get_group_ids_of_dealer(dealerId)
        groupIds |= set(Group.get_group_ids_of_dealer_and_partner(dealerId))

    devDict = {dealerId: Device.get_devNos_by_group(ownGroupIds)
               for dealerId, ownGroupIds in dealersWithGroupIds.items()}

    check_offline_coins(groupIds, dealer_id_list, reportDate.strftime("%Y-%m-%d"))

    report_into_db(devDict, startTime, reportDate)


def generate_test_data():
    """
    Seed the report cache with random per-device coin counts.

    Does nothing unless DJANGO_SETTINGS_MODULE is exactly "configs.testing".

    :return: None
    """
    import os
    import random

    # ``.get`` so that an unset variable is treated as "not the testing
    # environment" instead of raising KeyError.
    if os.environ.get("DJANGO_SETTINGS_MODULE") != "configs.testing":
        return

    dealerIds = [str(_d.id) for _d in Dealer.objects.only("id")]

    groupIds = set()
    for _did in dealerIds:
        groupIds |= set(Group.get_group_ids_of_dealer(_did))

    devNos = Device.get_devNos_by_group(list(groupIds))

    for _devNo in devNos:
        coins = str(random.randint(1, 100))
        reportCache.set(devCoinTmpl(_devNo, "2020-04-21"), coins)


if __name__ == '__main__':
    # generate_test_data()

    # The first argument that contains a YYYY-MM-DD date is taken as the
    # report date and removed from sys.argv, so main() sees the dealer id (if
    # any) at sys.argv[1].
    dateFmtStr = None
    regex = re.compile(r'\d{4}-\d{2}-\d{2}')
    for index, arg in enumerate(sys.argv[1:], 1):
        if regex.search(arg):
            dateFmtStr = arg
            sys.argv.pop(index)
            break

    main(dateFmtStr)