# -*- coding: utf-8 -*- # !/usr/bin/env python import datetime import logging from enum import Enum from typing import Callable, TYPE_CHECKING from apps import reportCache from apps.web.constant import Const from apps.web.device.timescale import FluentedEngine from apps.web.utils import set_or_incr_cache if TYPE_CHECKING: from apps.web.device.models import DeviceDict logger = logging.getLogger(__name__) def dated_key_tmpl(template): # type: (str)->Callable """ 生成模板函数,预置日期选择 :param template: :return: """ def inner(**kwargs): date = datetime.datetime.now().strftime(Const.DATE_FMT) # type: str kwargs.update({'nowDate': date}) return template.format(**kwargs) return inner def build_tmpl(tmpl): # type: (Enum)->Callable return dated_key_tmpl(tmpl.value) # Coin: 线下投币个数统计; devCoinTmpl = lambda devNo, nowDate: 'd_%s_%s_coin' % (devNo, nowDate) devno_from_key = lambda devKey: devKey.split('_')[1] groupCoinTmpl = lambda groupId, nowDate: 'g_%s_%s_coin' % (groupId, nowDate) groupid_from_key = lambda groupKey: groupKey.split('_')[1] ownerCoinTmpl = lambda ownerId, nowDate: 'o_%s_%s_coin' % (ownerId, nowDate) dealerid_from_key = lambda dealerKey: dealerKey.split('_')[1] # CountNetPay: 在线消费(包括快捷支付消费以及在线卡消费)统计 devCountNetPayTmpl = lambda devNo, nowDate: 'd_%s_%s_net_pay_count_coin' % (devNo, nowDate) groupCountNetPayTmpl = lambda groupId, nowDate: 'g_%s_%s_net_pay_count_coin' % (groupId, nowDate) ownerCountNetPayTmpl = lambda ownerId, nowDate: 'o_%s_%s_net_pay_count_coin' % (ownerId, nowDate) class Accounting(object): @staticmethod def recordNetPayCoinCount(devNo): now = datetime.datetime.now() nowDate = now.strftime("%Y-%m-%d") # 更新设备当天汇总投币 set_or_incr_cache(reportCache, devCountNetPayTmpl(devNo, nowDate), 1) # 更新汇总当天地址汇总投币 from apps.web.device.models import Device dev = Device.get_dev(devNo) if dev is None: return set_or_incr_cache(reportCache, groupCountNetPayTmpl(dev['groupId'], nowDate), 1) # 更新用户当天汇总投币 set_or_incr_cache(reportCache, ownerCountNetPayTmpl(dev['ownerId'], nowDate), 1) @staticmethod def recordOfflineCoin(device, report_ts, coins, mode = 'uart', port = None): # type:(DeviceDict, int, int, str, str) -> None """ 记录 投币数据到缓存 这个地方注意修正一下 将有金币统计的经销商存入一个地方 :param device: :param report_ts: :param coins: :return: """ coins = int(float(coins)) if coins <= 0: return report_day = datetime.datetime.fromtimestamp(report_ts).strftime('%Y-%m-%d') device_key = devCoinTmpl(device['devNo'], report_day) # 将经销商的ID写入 ownerId = device.ownerId from apps.web.device.models import OfflineReportDealers OfflineReportDealers.record_dealer(ownerId) FluentedEngine().in_put_coins_udp(devNo = device.devNo, ts = report_ts, coins = coins, mode = mode, port = port) try: set_or_incr_cache(reportCache, device_key, coins, 48 * 60 * 60) groupId = device.get('groupId', None) if not groupId: return set_or_incr_cache(reportCache, groupCoinTmpl(groupId, report_day), coins, 48 * 60 * 60) set_or_incr_cache(reportCache, ownerCoinTmpl(device['ownerId'], report_day), coins, 48 * 60 * 60) from apps.web.device.models import Group group = Group.get_group(groupId) if group and group['partnerDict']: for partnerId, partner in group['partnerDict'].items(): set_or_incr_cache(reportCache, ownerCoinTmpl(partnerId, report_day), coins, 48 * 60 * 60) finally: from apps.web.report.models import DevReport DevReport.get_collection().update_one( filter = {'devNo': device.devNo, 'type': 'day', 'date': report_day}, update = {'$inc': {'rpt.lineCoins': coins}}, upsert = True) @staticmethod def syncOfflineCoin(device, report_day, today_coins): # type:(DeviceDict, str, int) -> None """ 记录 投币数据到缓存 这个地方注意休整一下 将有金币统计的经销商存入一个地方 """ if today_coins <= 0: return device_key = devCoinTmpl(device.devNo, report_day) ownerId = device.ownerId from apps.web.device.models import OfflineReportDealers OfflineReportDealers.record_dealer(ownerId) count = 0 while True: old, version_token = reportCache.gets(device_key) if old is None: old = 0 else: old = int(float(old)) if reportCache.cas(device_key, str(today_coins), version_token, timeout = 48 * 60 * 60): logger.debug('set coins succeed. diff = {}'.format((today_coins - old))) break count = count + 1 logger.debug('memcached cas error<{}>'.format(count)) if count > 3: return difference_coins = (today_coins - old) logger.debug('different coins is {}'.format(difference_coins)) try: if difference_coins <= 0: logger.debug('is equal or less than db. ignore this coin report.') return groupId = device.get('groupId', None) if not groupId: return set_or_incr_cache(reportCache, groupCoinTmpl(groupId, report_day), difference_coins, 48 * 60 * 60) set_or_incr_cache(reportCache, ownerCoinTmpl(device['ownerId'], report_day), difference_coins, 48 * 60 * 60) from apps.web.device.models import Group group = Group.get_group(groupId) if group and group['partnerDict']: for partnerId, partner in group['partnerDict'].items(): set_or_incr_cache(reportCache, ownerCoinTmpl(partnerId, report_day), difference_coins, 48 * 60 * 60) finally: # 有数据变更的时候, 把离线投币数更新到数据库. 始终以设备上报的为准 from apps.web.report.models import DevReport if difference_coins != 0: DevReport.get_collection().update_one( filter = {'devNo': device.devNo, 'type': 'day', 'date': report_day}, update = {'$set': {'rpt.lineCoins': today_coins}}, upsert = True) @staticmethod def getOwnerIncome(ownerId, now): nowDate = now.strftime("%Y-%m-%d") newValue = {'lineCoins': 0, 'count': 0} key = ownerCoinTmpl(ownerId, nowDate) coin = reportCache.get(key, 0) if coin is not None: newValue.update({'lineCoins': int(float(coin))}) key = ownerCountNetPayTmpl(ownerId, nowDate) count = reportCache.get(key, 0) if count is not None: newValue.update({'count': int(float(count))}) return {k: v for k, v in newValue.iteritems()} @staticmethod def get_dealer_offline_coins(dealerId, date): return int(float(reportCache.get(ownerCoinTmpl(dealerId, date), 0))) @staticmethod def getGroupIncome(groupIdList, now): nowDate = now.strftime("%Y-%m-%d") keyCoinList, keyCountList = [], [] for groupId in groupIdList: keyCoinList.append(groupCoinTmpl(groupId, nowDate)) keyCountList.append(groupCountNetPayTmpl(groupId, nowDate)) coinValueDict = reportCache.get_many(keyCoinList) countValueDict = reportCache.get_many(keyCountList) resultDict = {} for groupId in groupIdList: newValue = {'lineCoins': 0, 'count': 0} coin = int(float(coinValueDict.get(groupCoinTmpl(groupId, nowDate), 0))) count = int(float(countValueDict.get(groupCountNetPayTmpl(groupId, nowDate), 0))) newValue.update({'lineCoins': int(coin), 'count': count}) resultDict.update({groupId: newValue}) return resultDict @staticmethod def getDevIncome(devNoList, now): nowDate = now.strftime("%Y-%m-%d") keyCoinList, keyCountList = [], [] for devNo in devNoList: keyCoinList.append(devCoinTmpl(devNo, nowDate)) keyCountList.append(devCountNetPayTmpl(devNo, nowDate)) coinValueDict = reportCache.get_many(keyCoinList) countValueDict = reportCache.get_many(keyCountList) resultDict = {} for devNo in devNoList: newValue = {'lineCoins': 0, 'count': 0} coin = int(float(coinValueDict.get(devCoinTmpl(devNo, nowDate), 0))) count = int(float(countValueDict.get(devCountNetPayTmpl(devNo, nowDate), 0))) newValue.update({'lineCoins': int(coin), 'count': count}) resultDict.update({devNo: newValue}) return resultDict