# -*- coding: utf-8 -*- # !/usr/bin/env python """ 生成报表 """ import datetime import re import sys from base import init_env, get_logger logger = get_logger(__name__) import os os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'configs.testing') init_env(interactive = False) from apilib.utils_mongo import BulkHandlerEx from apps import reportCache from apps.web.report.models import DevReport, GroupReport, DealerReport from apps.web.device.models import Group, Device, OfflineReportDealers, GroupDict from apps.web.core.accounting import devCoinTmpl, ownerCoinTmpl, groupCoinTmpl groupid_from_key = lambda groupKey: groupKey.split('_')[1] dealerid_from_key = lambda dealerKey: dealerKey.split('_')[1] def check_offline_coins(dev_in_db, dealerDict, parterDict, stringDate, update = False): dealerCoinDict = {} groupCoinDict = {} for ownerId, groupIds in dealerDict.items(): ownerCoins = 0 for groupId in list(set(groupIds)): devNos = Device.get_devNos_by_group([groupId]) keys = [devCoinTmpl(devNo, stringDate) for devNo in devNos] devCacheDict = reportCache.get_many(keys) for devNo in devNos: dbValue = int(dev_in_db.get(devNo, 0)) if devCoinTmpl(devNo, stringDate) in devCacheDict: cacheValue = int(devCacheDict[devCoinTmpl(devNo, stringDate)]) if dbValue != cacheValue: print '{} db not equal cache {}. update db.'.format(devNo, cacheValue) DevReport.get_collection().update_one({ 'devNo': devNo, 'type': 'day', 'date': stringDate }, { '$set': { 'devNo': devNo, 'type': 'day', 'date': stringDate, 'rpt.lineCoins': cacheValue } }, upsert = True) else: pass # print '{} db<{}> equal cache <{}>.'.format(devNo, dbValue, cacheValue) else: devCacheDict[devCoinTmpl(devNo, stringDate)] = dbValue listCoins = [int(coins) for coins in devCacheDict.values()] allDevCoinsByGroup = sum(listCoins) oldValue = reportCache.get(groupCoinTmpl(groupId, stringDate)) if not oldValue: oldValue = 0 else: oldValue = int(oldValue) groupCoinDict[groupCoinTmpl(groupId, stringDate)] = allDevCoinsByGroup if update: reportCache.set(groupCoinTmpl(groupId, stringDate), str(int(allDevCoinsByGroup))) if oldValue != allDevCoinsByGroup: logger.info( 'not equal in group. ownerId=%s, groupId=%s, oldValue=%s, nowValue=%s' % ( ownerId, groupId, oldValue, allDevCoinsByGroup)) else: pass # logger.info( # 'equal in group. ownerId=%s, groupId=%s, oldValue=%s, nowValue=%s' % ( # ownerId, groupId, oldValue, allDevCoinsByGroup)) ownerCoins = ownerCoins + int(allDevCoinsByGroup) dealerCoinDict[ownerCoinTmpl(ownerId, stringDate)] = ownerCoins for partnerId, groupIds in parterDict.items(): partnerCoinsByGroup = 0 for groupId in list(set(groupIds)): if groupCoinTmpl(groupId, stringDate) in groupCoinDict: groupCoin = groupCoinDict[groupCoinTmpl(groupId, stringDate)] else: groupCoin = 0 logger.info( 'allocated as partner. partnerId = %s; groupId = %s; coin = %s' % (partnerId, groupId, groupCoin)) partnerCoinsByGroup = partnerCoinsByGroup + int(groupCoin) if ownerCoinTmpl(partnerId, stringDate) in dealerCoinDict: dealerCoinDict[ownerCoinTmpl(partnerId, stringDate)] = dealerCoinDict[ ownerCoinTmpl(partnerId, stringDate)] + partnerCoinsByGroup else: dealerCoinDict[ownerCoinTmpl(partnerId, stringDate)] = partnerCoinsByGroup for dealerKey, coin in dealerCoinDict.items(): logger.info('dealer key is: {}'.format(dealerKey)) oldValue = reportCache.get(dealerKey) if not oldValue: oldValue = 0 else: oldValue = int(oldValue) if update: reportCache.set(dealerKey, str(int(coin))) if oldValue != coin: logger.info( 'not equal in dealer. ownerKey=%s, oldValue=%s, nowValue=%s' % ( dealerKey, oldValue, coin)) else: pass # logger.info( # 'equal in dealer. ownerKey=%s, oldValue=%s, nowValue=%s' % ( # dealerKey, oldValue, coin)) return groupCoinDict, dealerCoinDict def report_into_db(groupCoinDict, dealerCoinDict, stringDate): start_time = datetime.datetime.now() logger.info('generating report for date (%s)' % (stringDate,)) bulker = BulkHandlerEx(GroupReport.get_collection()) # type: BulkHandlerEx for groupKey, coins in groupCoinDict.items(): groupId = groupid_from_key(groupKey) bulker.upsert(query_dict = { 'groupId': groupId, 'type': 'day', 'date': stringDate }, update_dict = { '$set': { 'groupId': groupId, 'type': 'day', 'date': stringDate, 'rpt.lineCoins': int(coins) } }) if len(bulker.requests) >= 2000: bulker.execute() bulker = BulkHandlerEx(GroupReport.get_collection()) # type: BulkHandlerEx if len(bulker.requests) > 0: bulker.execute() bulker = None bulker = BulkHandlerEx(DealerReport.get_collection()) # type: BulkHandlerEx for dealerKey, coins in dealerCoinDict.iteritems(): dealerId = dealerid_from_key(dealerKey) bulker.upsert(query_dict = { 'ownerId': dealerId, 'type': 'day', 'date': stringDate }, update_dict = { '$set': { 'ownerId': dealerId, 'type': 'day', 'date': stringDate, 'rpt.lineCoins': int(coins) } }) if len(bulker.requests) >= 2000: bulker.execute() bulker = BulkHandlerEx(DealerReport.get_collection()) # type: BulkHandlerEx if len(bulker.requests) > 0: bulker.execute() bulker = None logger.info('[*]finished insert rpt into database!, time cost=%s' % (datetime.datetime.now() - start_time,)) def get_owner_or_parter(dealerDict, doneDealers, parterDict, dealerId): if dealerId in doneDealers: # logger.debug('{} has done.'.format(dealerId)) return groupIds = Group.get_group_ids_of_dealer(dealerId) if groupIds: dealerDict[dealerId] = groupIds doneDealers.append(dealerId) dealerGroups = Group.get_groups_by_group_ids(groupIds).values() for dealerGroup in dealerGroups: # type: GroupDict groupId = dealerGroup.groupId parters = dealerGroup.partners() for parter in parters: parter_id = parter['id'] parterDict[parter_id] = parterDict.get(parter_id) or set() parterDict[parter_id].add(groupId) get_owner_or_parter(dealerDict, doneDealers, parterDict, parter_id) parterGroupIds = Group.get_group_ids_of_partner(dealerId) parterGroups = Group.get_groups_by_group_ids(parterGroupIds).values() for parterGroup in parterGroups: # type: GroupDict groupId = parterGroup.groupId parterDict[dealerId] = parterDict.get(dealerId) or set() parterDict[dealerId].add(groupId) get_owner_or_parter(dealerDict, doneDealers, parterDict, parterGroup.ownerId) def main(dateFmtStr): if dateFmtStr: reportDate = datetime.datetime.strptime(dateFmtStr, "%Y-%m-%d") else: reportDate = datetime.datetime.now() - datetime.timedelta(days = 1) startTime = datetime.datetime.now() if len(sys.argv) >= 2: dealer_id_list = [str(sys.argv[1])] else: dealer_id_list = OfflineReportDealers.get_rpt_dealIds(reportDate.strftime("%Y-%m-%d")) dealerDict = {} parterDict = {} doneDealers = [] for dealerId in dealer_id_list: logger.info('fetch group map for dealer'.format(dealerId)) get_owner_or_parter(dealerDict, doneDealers, parterDict, dealerId) for dealerId, groupIds in dealerDict.iteritems(): print 'owner. {} = {}'.format(dealerId, groupIds) for dealerId, groupIds in parterDict.iteritems(): print 'parter. {} = {}'.format(dealerId, groupIds) logger.info('fetch device map for dealer'.format(dealerId)) devDict = {dealerId: Device.get_devNos_by_group(groupIds) for dealerId, groupIds in dealerDict.items()} devNos = [] for items in devDict.values(): devNos.extend(items) dev_in_db = {} for item in DevReport.get_collection().find( {'devNo': {'$in': devNos}, 'type': 'day', 'date': reportDate.strftime("%Y-%m-%d")}, {'devNo': 1, 'rpt': 1}): if 'rpt' in item and 'lineCoins' in item['rpt']: dev_in_db[item['devNo']] = item['rpt']['lineCoins'] # for devNo, coins in dev_in_db.iteritems(): # print '{} has coins {} in db.'.format(devNo, coins) groupCoinDict, dealerCoinDict = check_offline_coins(dev_in_db, dealerDict, parterDict, reportDate.strftime("%Y-%m-%d"), update = False) for groupKey, coins in groupCoinDict.iteritems(): logger.info('group {} has coins {}'.format(groupid_from_key(groupKey), coins)) for dealerKey, coins in dealerCoinDict.iteritems(): logger.info('dealer {} has coins {}'.format(dealerid_from_key(dealerKey), coins)) report_into_db(groupCoinDict, dealerCoinDict, reportDate.strftime("%Y-%m-%d")) logger.debug('all steps cost {}'.format(datetime.datetime.now() - startTime)) if __name__ == '__main__': dateFmtStr = None index = 1 regex = re.compile(r'\d{4}-\d{2}-\d{2}') for arg in sys.argv[1:]: m = regex.search(arg) if m and m.group(): dateFmtStr = arg sys.argv.pop(index) break index += 1 main(dateFmtStr)