#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Generate reports.
"""
import datetime
import os
import re
import sys

from base import init_env, get_logger

logger = get_logger(__name__)

# NOTE(review): the settings module and init_env() presumably have to be in
# place before the apps.* modules below are imported -- keep this order.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'configs.testing')
init_env(interactive = False)

from apilib.utils_mongo import BulkHandlerEx
from apps import reportCache
from apps.web.report.models import DevReport, GroupReport, DealerReport
from apps.web.device.models import Group, Device, OfflineReportDealers, GroupDict
from apps.web.core.accounting import devCoinTmpl, ownerCoinTmpl, groupCoinTmpl
def groupid_from_key(groupKey):
    """Return the group id embedded in a group key.

    The id is the second ``_``-separated field of ``groupKey``.
    NOTE(review): presumably the keys come from ``groupCoinTmpl`` -- confirm
    that its layout keeps the id in that position.

    :param groupKey: key string containing at least one underscore.
    :return: the text between the first and second underscore (or up to the
        end of the string when there is only one underscore).
    :raises IndexError: if ``groupKey`` contains no underscore.
    """
    return groupKey.split('_')[1]
def dealerid_from_key(dealerKey):
    """Return the dealer id embedded in a dealer key.

    The id is the second ``_``-separated field of ``dealerKey``.
    NOTE(review): presumably the keys come from ``ownerCoinTmpl`` -- confirm
    that its layout keeps the id in that position.

    :param dealerKey: key string containing at least one underscore.
    :return: the text between the first and second underscore (or up to the
        end of the string when there is only one underscore).
    :raises IndexError: if ``dealerKey`` contains no underscore.
    """
    return dealerKey.split('_')[1]
def check_offline_coins(dev_in_db, dealerDict, parterDict, stringDate, update = False):
    """Recompute per-group and per-dealer line-coin totals for one day.

    For every owner in ``dealerDict`` the devices of each of its groups are
    looked up in ``reportCache`` (keys built with ``devCoinTmpl``):

    * a device found in the cache whose value differs from ``dev_in_db`` gets
      its ``DevReport`` day document upserted with the cached value;
    * a device missing from the cache contributes its ``dev_in_db`` value
      (0 when it is absent there too).

    A group total is the sum of its device values and an owner total is the
    sum of its group totals. Each partner in ``parterDict`` is then credited
    the full total of every one of its groups (0 for a group that was not
    computed above); this is added to any total already stored under the same
    dealer key. Group and dealer totals that differ from the values currently
    cached are logged.

    :param dev_in_db: mapping devNo -> line coins already stored in the db.
    :param dealerDict: mapping owner id -> iterable of group ids.
    :param parterDict: mapping partner id -> iterable of group ids.
    :param stringDate: report date string handed to the key templates and
        written to the ``date`` field.
    :param update: when true, write the recomputed group and dealer totals
        back to ``reportCache``.
    :return: ``(groupCoinDict, dealerCoinDict)``, keyed by the
        ``groupCoinTmpl`` / ``ownerCoinTmpl`` keys respectively.
    """

    def cached_coins(cacheKey):
        # A missing or falsy cache entry counts as zero coins.
        cachedValue = reportCache.get(cacheKey)
        return int(cachedValue) if cachedValue else 0

    dealerCoinDict = {}
    groupCoinDict = {}

    for ownerId, groupIds in dealerDict.items():
        ownerCoins = 0
        for groupId in set(groupIds):
            devNos = Device.get_devNos_by_group([groupId])
            devKeys = [devCoinTmpl(devNo, stringDate) for devNo in devNos]
            devCacheDict = reportCache.get_many(devKeys)

            for devNo, devKey in zip(devNos, devKeys):
                dbValue = int(dev_in_db.get(devNo, 0))
                if devKey not in devCacheDict:
                    # Nothing cached for this device: fall back to the db value.
                    devCacheDict[devKey] = dbValue
                    continue

                cacheValue = int(devCacheDict[devKey])
                if dbValue != cacheValue:
                    # The cache wins; bring the stored day report in line with it.
                    print('{} db not equal cache {}. update db.'.format(devNo, cacheValue))
                    DevReport.get_collection().update_one({
                        'devNo': devNo,
                        'type': 'day',
                        'date': stringDate
                    }, {
                        '$set': {
                            'devNo': devNo,
                            'type': 'day',
                            'date': stringDate,
                            'rpt.lineCoins': cacheValue
                        }
                    }, upsert = True)

            allDevCoinsByGroup = sum(int(coins) for coins in devCacheDict.values())

            groupKey = groupCoinTmpl(groupId, stringDate)
            # Read the previous total before it is possibly overwritten below.
            oldValue = cached_coins(groupKey)
            groupCoinDict[groupKey] = allDevCoinsByGroup

            if update:
                reportCache.set(groupKey, str(int(allDevCoinsByGroup)))

            if oldValue != allDevCoinsByGroup:
                logger.info(
                    'not equal in group. ownerId=%s, groupId=%s, oldValue=%s, nowValue=%s' % (
                        ownerId, groupId, oldValue, allDevCoinsByGroup))

            ownerCoins += int(allDevCoinsByGroup)

        dealerCoinDict[ownerCoinTmpl(ownerId, stringDate)] = ownerCoins

    for partnerId, groupIds in parterDict.items():
        partnerCoinsByGroup = 0
        for groupId in set(groupIds):
            groupCoin = groupCoinDict.get(groupCoinTmpl(groupId, stringDate), 0)
            logger.info(
                'allocated as partner. partnerId = %s; groupId = %s; coin = %s' % (partnerId, groupId, groupCoin))
            partnerCoinsByGroup += int(groupCoin)

        # A partner may also be an owner: add to whatever is already recorded.
        partnerKey = ownerCoinTmpl(partnerId, stringDate)
        dealerCoinDict[partnerKey] = dealerCoinDict.get(partnerKey, 0) + partnerCoinsByGroup

    for dealerKey, coin in dealerCoinDict.items():
        logger.info('dealer key is: {}'.format(dealerKey))

        oldValue = cached_coins(dealerKey)

        if update:
            reportCache.set(dealerKey, str(int(coin)))

        if oldValue != coin:
            logger.info(
                'not equal in dealer. ownerKey=%s, oldValue=%s, nowValue=%s' % (
                    dealerKey, oldValue, coin))

    return groupCoinDict, dealerCoinDict
def _upsert_day_line_coins(reportModel, idField, coinDict, idFromKey, stringDate, batchSize = 2000):
    """Bulk-upsert one ``type='day'`` document per entry of ``coinDict``.

    :param reportModel: report class exposing ``get_collection()``.
    :param idField: name of the document field holding the entity id.
    :param coinDict: mapping key -> coins; the entity id is ``idFromKey(key)``.
    :param idFromKey: callable extracting the entity id from a key.
    :param stringDate: value written to (and matched on) the ``date`` field.
    :param batchSize: number of queued requests that triggers a flush.
    """
    bulker = BulkHandlerEx(reportModel.get_collection())  # type: BulkHandlerEx

    for coinKey, coins in coinDict.items():
        entityId = idFromKey(coinKey)
        bulker.upsert(query_dict = {
            idField: entityId,
            'type': 'day',
            'date': stringDate
        }, update_dict = {
            '$set': {
                idField: entityId,
                'type': 'day',
                'date': stringDate,
                'rpt.lineCoins': int(coins)
            }
        })

        # Flush full batches and start over with a fresh handler.
        if len(bulker.requests) >= batchSize:
            bulker.execute()
            bulker = BulkHandlerEx(reportModel.get_collection())  # type: BulkHandlerEx

    # Flush whatever is left from the last, partially filled batch.
    if len(bulker.requests) > 0:
        bulker.execute()


def report_into_db(groupCoinDict, dealerCoinDict, stringDate):
    """Store the day's group and dealer line-coin totals in the report collections.

    Group totals are upserted into the ``GroupReport`` collection under
    ``groupId`` and dealer totals into the ``DealerReport`` collection under
    ``ownerId``; both write ``rpt.lineCoins`` on the ``type='day'`` document
    for ``stringDate``.

    :param groupCoinDict: mapping group key -> coins; the group id is taken
        from the key with ``groupid_from_key``.
    :param dealerCoinDict: mapping dealer key -> coins; the dealer id is taken
        from the key with ``dealerid_from_key``.
    :param stringDate: report date string.
    """
    start_time = datetime.datetime.now()

    logger.info('generating report for date (%s)' % (stringDate,))

    _upsert_day_line_coins(GroupReport, 'groupId', groupCoinDict, groupid_from_key, stringDate)
    _upsert_day_line_coins(DealerReport, 'ownerId', dealerCoinDict, dealerid_from_key, stringDate)

    logger.info('[*]finished insert rpt into database!, time cost=%s' % (datetime.datetime.now() - start_time,))
def get_owner_or_parter(dealerDict, doneDealers, parterDict, dealerId):
    """Collect owner and partner group maps for ``dealerId`` and related dealers.

    Starting at ``dealerId``, every dealer not yet listed in ``doneDealers``
    is processed once:

    * the groups it owns (if any) are stored in ``dealerDict[dealer]``;
    * each partner of those groups gets the group id added to
      ``parterDict[partner]`` and is queued for processing;
    * each group the dealer is a partner of is added to
      ``parterDict[dealer]`` and the group's owner is queued for processing.

    The walk uses an explicit stack rather than recursion, so a long chain of
    related dealers cannot exceed the interpreter's recursion limit. The
    order in which dealers are appended to ``doneDealers`` is therefore not
    the recursive visiting order.

    :param dealerDict: output mapping owner id -> owned group ids (mutated).
    :param doneDealers: list of dealer ids already processed (mutated).
    :param parterDict: output mapping partner id -> set of group ids (mutated).
    :param dealerId: dealer to start from.
    :return: None.
    """

    def add_partner_group(partnerId, groupId):
        # Start a fresh set when the partner has no (or an empty) entry yet.
        groupSet = parterDict.get(partnerId) or set()
        groupSet.add(groupId)
        parterDict[partnerId] = groupSet

    visited = set(doneDealers)  # O(1) membership instead of scanning the list
    pending = [dealerId]

    while pending:
        currentId = pending.pop()
        if currentId in visited:
            continue
        visited.add(currentId)
        doneDealers.append(currentId)

        groupIds = Group.get_group_ids_of_dealer(currentId)
        if groupIds:
            dealerDict[currentId] = groupIds

        dealerGroups = Group.get_groups_by_group_ids(groupIds).values()
        for dealerGroup in dealerGroups:  # type: GroupDict
            groupId = dealerGroup.groupId
            for parter in dealerGroup.partners():
                parter_id = parter['id']
                add_partner_group(parter_id, groupId)
                pending.append(parter_id)

        parterGroupIds = Group.get_group_ids_of_partner(currentId)
        parterGroups = Group.get_groups_by_group_ids(parterGroupIds).values()
        for parterGroup in parterGroups:  # type: GroupDict
            add_partner_group(currentId, parterGroup.groupId)
            pending.append(parterGroup.ownerId)
def main(dateFmtStr):
    """Rebuild and store the line-coin reports for one day.

    Steps: resolve the report date, pick the dealers to process, build the
    owner/partner group maps, load the stored per-device coins for that date,
    recompute the group/dealer totals with ``check_offline_coins`` (without
    writing to the cache) and persist them with ``report_into_db``.

    :param dateFmtStr: report date as ``YYYY-MM-DD``; when falsy, yesterday
        (relative to the current local time) is used.
    :raises ValueError: if ``dateFmtStr`` does not parse as ``%Y-%m-%d``.

    If ``sys.argv`` still holds an argument at position 1 it is used as the
    single dealer id to process; otherwise the dealers come from
    ``OfflineReportDealers.get_rpt_dealIds``.
    """
    if dateFmtStr:
        reportDate = datetime.datetime.strptime(dateFmtStr, "%Y-%m-%d")
    else:
        reportDate = datetime.datetime.now() - datetime.timedelta(days = 1)
    stringDate = reportDate.strftime("%Y-%m-%d")

    startTime = datetime.datetime.now()

    if len(sys.argv) >= 2:
        dealer_id_list = [str(sys.argv[1])]
    else:
        dealer_id_list = OfflineReportDealers.get_rpt_dealIds(stringDate)

    dealerDict = {}
    parterDict = {}
    doneDealers = []

    for dealerId in dealer_id_list:
        logger.info('fetch group map for dealer<id={}>'.format(dealerId))
        get_owner_or_parter(dealerDict, doneDealers, parterDict, dealerId)

    for dealerId, groupIds in dealerDict.items():
        print('owner. {} = {}'.format(dealerId, groupIds))

    for dealerId, groupIds in parterDict.items():
        print('parter. {} = {}'.format(dealerId, groupIds))

    # Report the number of owners rather than a loop variable left over from
    # above, which is undefined when there are no dealers at all.
    logger.info('fetch device map for {} dealers'.format(len(dealerDict)))
    devNos = []
    for groupIds in dealerDict.values():
        devNos.extend(Device.get_devNos_by_group(groupIds))

    dev_in_db = {}
    storedReports = DevReport.get_collection().find(
        {'devNo': {'$in': devNos}, 'type': 'day', 'date': stringDate},
        {'devNo': 1, 'rpt': 1})
    for item in storedReports:
        # Skip documents that carry no line-coin figure.
        if 'rpt' in item and 'lineCoins' in item['rpt']:
            dev_in_db[item['devNo']] = item['rpt']['lineCoins']

    groupCoinDict, dealerCoinDict = check_offline_coins(dev_in_db, dealerDict, parterDict,
                                                        stringDate, update = False)

    for groupKey, coins in groupCoinDict.items():
        logger.info('group {} has coins {}'.format(groupid_from_key(groupKey), coins))

    for dealerKey, coins in dealerCoinDict.items():
        logger.info('dealer {} has coins {}'.format(dealerid_from_key(dealerKey), coins))

    report_into_db(groupCoinDict, dealerCoinDict, stringDate)

    logger.debug('all steps cost {}'.format(datetime.datetime.now() - startTime))
def extract_date_arg(argv):
    """Remove and return the first date-looking argument of ``argv``.

    Arguments after ``argv[0]`` are scanned in order; the first one that
    contains a ``dddd-dd-dd`` digit pattern is removed from ``argv`` in place
    and returned whole (it is not validated as a real date here).

    :param argv: mutable argument list whose first item is the program name.
    :return: the removed argument, or None when no argument matches.
    """
    datePattern = re.compile(r'\d{4}-\d{2}-\d{2}')
    for index, arg in enumerate(argv[1:], 1):
        if datePattern.search(arg):
            return argv.pop(index)
    return None


if __name__ == '__main__':
    # Strip the date from sys.argv first: main() reads the dealer id from
    # sys.argv[1] when an argument is left.
    main(extract_date_arg(sys.argv))