123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import datetime
- import logging
- from enum import Enum
- from typing import Callable, TYPE_CHECKING
- from apps import reportCache
- from apps.web.constant import Const
- from apps.web.device.timescale import FluentedEngine
- from apps.web.utils import set_or_incr_cache
- if TYPE_CHECKING:
- from apps.web.device.models import DeviceDict
- logger = logging.getLogger(__name__)
def dated_key_tmpl(template):
    # type: (str)->Callable
    """
    Build a rendering function for ``template`` with the date pre-filled.

    The returned callable accepts arbitrary keyword arguments, adds (or
    overwrites) the ``nowDate`` field with the current local date formatted
    with ``Const.DATE_FMT``, and returns ``template.format(**fields)``.
    The date is evaluated on every call, not when the renderer is created.

    :param template: ``str.format`` style template
    :return: callable taking keyword arguments and returning the rendered key
    """
    def render(**fields):
        today = datetime.datetime.now().strftime(Const.DATE_FMT)  # type: str
        # A caller-supplied 'nowDate' is deliberately replaced by today's date.
        fields['nowDate'] = today
        return template.format(**fields)

    return render
def build_tmpl(tmpl):
    # type: (Enum)->Callable
    """
    Build a dated key renderer from an enum member.

    :param tmpl: enum member whose ``value`` is the format template
    :return: the callable produced by ``dated_key_tmpl`` for that template
    """
    pattern = tmpl.value
    return dated_key_tmpl(pattern)
# Coin: counters for coins inserted offline (at the machine).
def devCoinTmpl(devNo, nowDate):
    """Return the cache key of a device's offline-coin counter for one day."""
    return 'd_%s_%s_coin' % (devNo, nowDate)


def devno_from_key(devKey):
    """Return the second ``_``-separated field of a device key (the devNo).

    NOTE(review): a devNo that itself contains ``_`` would be truncated.
    """
    return devKey.split('_')[1]


def groupCoinTmpl(groupId, nowDate):
    """Return the cache key of a group's offline-coin counter for one day."""
    return 'g_%s_%s_coin' % (groupId, nowDate)


def groupid_from_key(groupKey):
    """Return the second ``_``-separated field of a group key (the groupId)."""
    return groupKey.split('_')[1]


def ownerCoinTmpl(ownerId, nowDate):
    """Return the cache key of an owner's offline-coin counter for one day."""
    return 'o_%s_%s_coin' % (ownerId, nowDate)


def dealerid_from_key(dealerKey):
    """Return the second ``_``-separated field of an owner ('o_...') key."""
    return dealerKey.split('_')[1]


# CountNetPay: counters for online consumption (quick-pay consumption as well
# as online-card consumption).
def devCountNetPayTmpl(devNo, nowDate):
    """Return the cache key of a device's online-payment counter for one day."""
    return 'd_%s_%s_net_pay_count_coin' % (devNo, nowDate)


def groupCountNetPayTmpl(groupId, nowDate):
    """Return the cache key of a group's online-payment counter for one day."""
    return 'g_%s_%s_net_pay_count_coin' % (groupId, nowDate)


def ownerCountNetPayTmpl(ownerId, nowDate):
    """Return the cache key of an owner's online-payment counter for one day."""
    return 'o_%s_%s_net_pay_count_coin' % (ownerId, nowDate)
class Accounting(object):
    """
    Per-day coin and online-payment counters kept in ``reportCache``.

    Counters exist at three levels (device, group, owner); their keys are
    produced by the module-level ``*CoinTmpl`` / ``*CountNetPayTmpl`` helpers.
    All methods are static; the class is only a namespace.
    """

    # strftime pattern for the day component of the cache keys.
    _DAY_FMT = '%Y-%m-%d'
    # Timeout handed to the cache for offline-coin counters
    # (presumably seconds, i.e. 48 hours -- confirm against the cache backend).
    _COIN_TTL = 48 * 60 * 60
    # Number of compare-and-set attempts made by syncOfflineCoin before giving up.
    _CAS_MAX_ATTEMPTS = 4

    @staticmethod
    def _to_int(value):
        """Coerce a cached counter (int, float or numeric string) to ``int``.

        ``None`` is treated as 0 so that a missing or empty cache entry never
        raises ``TypeError``.
        """
        if value is None:
            return 0
        return int(float(value))

    @staticmethod
    def _add_group_coins(ownerId, groupId, report_day, coins):
        """Add ``coins`` to the group counter, the owner counter and the owner
        counter of every partner listed in the group's ``partnerDict``."""
        ttl = Accounting._COIN_TTL
        set_or_incr_cache(reportCache, groupCoinTmpl(groupId, report_day), coins, ttl)
        set_or_incr_cache(reportCache, ownerCoinTmpl(ownerId, report_day), coins, ttl)

        from apps.web.device.models import Group
        group = Group.get_group(groupId)
        if group and group['partnerDict']:
            # Only the partner ids (the mapping keys) are needed.
            for partnerId in group['partnerDict']:
                set_or_incr_cache(reportCache, ownerCoinTmpl(partnerId, report_day), coins, ttl)

    @staticmethod
    def _collect_income(ids, nowDate, coin_tmpl, count_tmpl):
        """Fetch coin and online-payment counters for many ids at once.

        :param ids: iterable of device numbers or group ids
        :param nowDate: day string used in the cache keys
        :param coin_tmpl: key builder for the offline-coin counter
        :param count_tmpl: key builder for the online-payment counter
        :return: ``{id: {'lineCoins': int, 'count': int}}``; ids without a
                 cached value get zeros
        """
        ids = list(ids)
        if not ids:
            return {}

        coin_keys = [coin_tmpl(item_id, nowDate) for item_id in ids]
        count_keys = [count_tmpl(item_id, nowDate) for item_id in ids]
        coin_values = reportCache.get_many(coin_keys)
        count_values = reportCache.get_many(count_keys)

        result = {}
        for item_id, coin_key, count_key in zip(ids, coin_keys, count_keys):
            result[item_id] = {
                'lineCoins': Accounting._to_int(coin_values.get(coin_key, 0)),
                'count': Accounting._to_int(count_values.get(count_key, 0)),
            }
        return result

    @staticmethod
    def recordNetPayCoinCount(devNo):
        """Increment today's online-payment counter for a device and, when the
        device can be loaded, for its group and its owner.

        :param devNo: device number
        """
        nowDate = datetime.datetime.now().strftime(Accounting._DAY_FMT)

        # Device-level counter for today.
        # NOTE(review): unlike the coin counters, no timeout is passed here.
        set_or_incr_cache(reportCache, devCountNetPayTmpl(devNo, nowDate), 1)

        from apps.web.device.models import Device
        dev = Device.get_dev(devNo)
        if dev is None:
            return

        # Group-level counter for today.
        set_or_incr_cache(reportCache, groupCountNetPayTmpl(dev['groupId'], nowDate), 1)
        # Owner-level counter for today.
        set_or_incr_cache(reportCache, ownerCountNetPayTmpl(dev['ownerId'], nowDate), 1)

    @staticmethod
    def recordOfflineCoin(device, report_ts, coins, mode='uart', port=None):
        # type:(DeviceDict, int, int, str, str) -> None
        """
        Record an offline coin report in the cache and in the daily report.

        The owner is registered via ``OfflineReportDealers.record_dealer``,
        the event is forwarded to ``FluentedEngine().in_put_coins_udp``, the
        device/group/owner/partner cache counters are incremented, and
        ``rpt.lineCoins`` of the device's day report is incremented in the
        database. The database update runs even if a cache update raises.

        :param device: device record (read both by attribute and by key)
        :param report_ts: timestamp of the report; determines the report day
        :param coins: number of coins; coerced with ``int(float(...))``,
                      non-positive values are ignored
        :param mode: forwarded unchanged to ``in_put_coins_udp``
        :param port: forwarded unchanged to ``in_put_coins_udp``
        :return: None
        """
        coins = int(float(coins))
        if coins <= 0:
            return

        report_day = datetime.datetime.fromtimestamp(report_ts).strftime(Accounting._DAY_FMT)

        # Remember that this owner has offline coin statistics.
        from apps.web.device.models import OfflineReportDealers
        OfflineReportDealers.record_dealer(device.ownerId)

        FluentedEngine().in_put_coins_udp(devNo=device.devNo,
                                          ts=report_ts,
                                          coins=coins,
                                          mode=mode,
                                          port=port)

        try:
            set_or_incr_cache(reportCache, devCoinTmpl(device['devNo'], report_day),
                              coins, Accounting._COIN_TTL)

            # NOTE(review): a device without a group updates neither the
            # group nor the owner counters -- confirm this is intended.
            groupId = device.get('groupId', None)
            if groupId:
                Accounting._add_group_coins(device['ownerId'], groupId, report_day, coins)
        finally:
            from apps.web.report.models import DevReport
            DevReport.get_collection().update_one(
                filter={'devNo': device.devNo, 'type': 'day', 'date': report_day},
                update={'$inc': {'rpt.lineCoins': coins}},
                upsert=True)

    @staticmethod
    def syncOfflineCoin(device, report_day, today_coins):
        # type:(DeviceDict, str, int) -> None
        """
        Synchronise the cache with the device's absolute coin total for a day.

        The device counter is replaced by ``today_coins`` using gets/cas
        (up to ``_CAS_MAX_ATTEMPTS`` attempts; on repeated failure the method
        returns without further changes). A positive difference to the
        previously cached value is added to the group/owner/partner counters.
        Whenever the difference is non-zero, ``rpt.lineCoins`` of the device's
        day report is set to ``today_coins`` -- the device's report always wins.

        :param device: device record (read both by attribute and by key)
        :param report_day: day string used in cache keys and the report filter
        :param today_coins: total coins reported by the device for that day;
                            non-positive values are ignored
        :return: None
        """
        if today_coins <= 0:
            return

        device_key = devCoinTmpl(device.devNo, report_day)

        # Remember that this owner has offline coin statistics.
        from apps.web.device.models import OfflineReportDealers
        OfflineReportDealers.record_dealer(device.ownerId)

        for attempt in range(1, Accounting._CAS_MAX_ATTEMPTS + 1):
            cached, version_token = reportCache.gets(device_key)
            old = Accounting._to_int(cached)
            if reportCache.cas(device_key, str(today_coins), version_token,
                               timeout=Accounting._COIN_TTL):
                logger.debug('set coins succeed. diff = %s', today_coins - old)
                break
            logger.debug('memcached cas error<%s>', attempt)
        else:
            # Every attempt lost the race; give up without touching anything else.
            return

        difference_coins = today_coins - old
        logger.debug('different coins is %s', difference_coins)

        try:
            if difference_coins <= 0:
                logger.debug('is equal or less than db. ignore this coin report.')
                return

            groupId = device.get('groupId', None)
            if not groupId:
                return

            Accounting._add_group_coins(device['ownerId'], groupId, report_day, difference_coins)
        finally:
            # When the value changed, persist the offline coin total to the
            # database; the number reported by the device is authoritative.
            from apps.web.report.models import DevReport
            if difference_coins != 0:
                DevReport.get_collection().update_one(
                    filter={'devNo': device.devNo, 'type': 'day', 'date': report_day},
                    update={'$set': {'rpt.lineCoins': today_coins}},
                    upsert=True)

    @staticmethod
    def getOwnerIncome(ownerId, now):
        """Return ``{'lineCoins': int, 'count': int}`` for an owner on the day
        of ``now`` (a datetime-like object providing ``strftime``)."""
        nowDate = now.strftime(Accounting._DAY_FMT)
        coin = reportCache.get(ownerCoinTmpl(ownerId, nowDate), 0)
        count = reportCache.get(ownerCountNetPayTmpl(ownerId, nowDate), 0)
        return {
            'lineCoins': Accounting._to_int(coin),
            'count': Accounting._to_int(count),
        }

    @staticmethod
    def get_dealer_offline_coins(dealerId, date):
        """Return the cached offline-coin total of a dealer for ``date``
        (a day string as used in the cache keys); 0 when nothing is cached."""
        return Accounting._to_int(reportCache.get(ownerCoinTmpl(dealerId, date), 0))

    @staticmethod
    def getGroupIncome(groupIdList, now):
        """Return ``{groupId: {'lineCoins': int, 'count': int}}`` for the day
        of ``now``; groups without cached values get zeros."""
        nowDate = now.strftime(Accounting._DAY_FMT)
        return Accounting._collect_income(groupIdList, nowDate,
                                          groupCoinTmpl, groupCountNetPayTmpl)

    @staticmethod
    def getDevIncome(devNoList, now):
        """Return ``{devNo: {'lineCoins': int, 'count': int}}`` for the day
        of ``now``; devices without cached values get zeros."""
        nowDate = now.strftime(Accounting._DAY_FMT)
        return Accounting._collect_income(devNoList, nowDate,
                                          devCoinTmpl, devCountNetPayTmpl)
|