accounting.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import datetime
  4. import logging
  5. from enum import Enum
  6. from typing import Callable, TYPE_CHECKING
  7. from apps import reportCache
  8. from apps.web.constant import Const
  9. from apps.web.device.timescale import FluentedEngine
  10. from apps.web.utils import set_or_incr_cache
  11. if TYPE_CHECKING:
  12. from apps.web.device.models import DeviceDict
  13. logger = logging.getLogger(__name__)
  14. def dated_key_tmpl(template):
  15. # type: (str)->Callable
  16. """
  17. 生成模板函数,预置日期选择
  18. :param template:
  19. :return:
  20. """
  21. def inner(**kwargs):
  22. date = datetime.datetime.now().strftime(Const.DATE_FMT) # type: str
  23. kwargs.update({'nowDate': date})
  24. return template.format(**kwargs)
  25. return inner
  26. def build_tmpl(tmpl):
  27. # type: (Enum)->Callable
  28. return dated_key_tmpl(tmpl.value)
  29. # Coin: 线下投币个数统计;
  30. devCoinTmpl = lambda devNo, nowDate: 'd_%s_%s_coin' % (devNo, nowDate)
  31. devno_from_key = lambda devKey: devKey.split('_')[1]
  32. groupCoinTmpl = lambda groupId, nowDate: 'g_%s_%s_coin' % (groupId, nowDate)
  33. groupid_from_key = lambda groupKey: groupKey.split('_')[1]
  34. ownerCoinTmpl = lambda ownerId, nowDate: 'o_%s_%s_coin' % (ownerId, nowDate)
  35. dealerid_from_key = lambda dealerKey: dealerKey.split('_')[1]
  36. # CountNetPay: 在线消费(包括快捷支付消费以及在线卡消费)统计
  37. devCountNetPayTmpl = lambda devNo, nowDate: 'd_%s_%s_net_pay_count_coin' % (devNo, nowDate)
  38. groupCountNetPayTmpl = lambda groupId, nowDate: 'g_%s_%s_net_pay_count_coin' % (groupId, nowDate)
  39. ownerCountNetPayTmpl = lambda ownerId, nowDate: 'o_%s_%s_net_pay_count_coin' % (ownerId, nowDate)
  40. class Accounting(object):
  41. @staticmethod
  42. def recordNetPayCoinCount(devNo):
  43. now = datetime.datetime.now()
  44. nowDate = now.strftime("%Y-%m-%d")
  45. # 更新设备当天汇总投币
  46. set_or_incr_cache(reportCache, devCountNetPayTmpl(devNo, nowDate), 1)
  47. # 更新汇总当天地址汇总投币
  48. from apps.web.device.models import Device
  49. dev = Device.get_dev(devNo)
  50. if dev is None:
  51. return
  52. set_or_incr_cache(reportCache, groupCountNetPayTmpl(dev['groupId'], nowDate), 1)
  53. # 更新用户当天汇总投币
  54. set_or_incr_cache(reportCache, ownerCountNetPayTmpl(dev['ownerId'], nowDate), 1)
  55. @staticmethod
  56. def recordOfflineCoin(device, report_ts, coins, mode = 'uart', port = None):
  57. # type:(DeviceDict, int, int, str, str) -> None
  58. """
  59. 记录 投币数据到缓存 这个地方注意修正一下 将有金币统计的经销商存入一个地方
  60. :param device:
  61. :param report_ts:
  62. :param coins:
  63. :return:
  64. """
  65. coins = int(float(coins))
  66. if coins <= 0:
  67. return
  68. report_day = datetime.datetime.fromtimestamp(report_ts).strftime('%Y-%m-%d')
  69. device_key = devCoinTmpl(device['devNo'], report_day)
  70. # 将经销商的ID写入
  71. ownerId = device.ownerId
  72. from apps.web.device.models import OfflineReportDealers
  73. OfflineReportDealers.record_dealer(ownerId)
  74. FluentedEngine().in_put_coins_udp(devNo = device.devNo,
  75. ts = report_ts,
  76. coins = coins,
  77. mode = mode,
  78. port = port)
  79. try:
  80. set_or_incr_cache(reportCache, device_key, coins, 48 * 60 * 60)
  81. groupId = device.get('groupId', None)
  82. if not groupId:
  83. return
  84. set_or_incr_cache(reportCache, groupCoinTmpl(groupId, report_day), coins, 48 * 60 * 60)
  85. set_or_incr_cache(reportCache, ownerCoinTmpl(device['ownerId'], report_day), coins, 48 * 60 * 60)
  86. from apps.web.device.models import Group
  87. group = Group.get_group(groupId)
  88. if group and group['partnerDict']:
  89. for partnerId, partner in group['partnerDict'].items():
  90. set_or_incr_cache(reportCache, ownerCoinTmpl(partnerId, report_day), coins, 48 * 60 * 60)
  91. finally:
  92. from apps.web.report.models import DevReport
  93. DevReport.get_collection().update_one(
  94. filter = {'devNo': device.devNo, 'type': 'day', 'date': report_day},
  95. update = {'$inc': {'rpt.lineCoins': coins}},
  96. upsert = True)
  97. @staticmethod
  98. def syncOfflineCoin(device, report_day, today_coins): # type:(DeviceDict, str, int) -> None
  99. """
  100. 记录 投币数据到缓存 这个地方注意休整一下 将有金币统计的经销商存入一个地方
  101. """
  102. if today_coins <= 0:
  103. return
  104. device_key = devCoinTmpl(device.devNo, report_day)
  105. ownerId = device.ownerId
  106. from apps.web.device.models import OfflineReportDealers
  107. OfflineReportDealers.record_dealer(ownerId)
  108. count = 0
  109. while True:
  110. old, version_token = reportCache.gets(device_key)
  111. if old is None:
  112. old = 0
  113. else:
  114. old = int(float(old))
  115. if reportCache.cas(device_key, str(today_coins), version_token, timeout = 48 * 60 * 60):
  116. logger.debug('set coins succeed. diff = {}'.format((today_coins - old)))
  117. break
  118. count = count + 1
  119. logger.debug('memcached cas error<{}>'.format(count))
  120. if count > 3:
  121. return
  122. difference_coins = (today_coins - old)
  123. logger.debug('different coins is {}'.format(difference_coins))
  124. try:
  125. if difference_coins <= 0:
  126. logger.debug('is equal or less than db. ignore this coin report.')
  127. return
  128. groupId = device.get('groupId', None)
  129. if not groupId:
  130. return
  131. set_or_incr_cache(reportCache, groupCoinTmpl(groupId, report_day), difference_coins, 48 * 60 * 60)
  132. set_or_incr_cache(reportCache, ownerCoinTmpl(device['ownerId'], report_day), difference_coins, 48 * 60 * 60)
  133. from apps.web.device.models import Group
  134. group = Group.get_group(groupId)
  135. if group and group['partnerDict']:
  136. for partnerId, partner in group['partnerDict'].items():
  137. set_or_incr_cache(reportCache, ownerCoinTmpl(partnerId, report_day), difference_coins, 48 * 60 * 60)
  138. finally:
  139. # 有数据变更的时候, 把离线投币数更新到数据库. 始终以设备上报的为准
  140. from apps.web.report.models import DevReport
  141. if difference_coins != 0:
  142. DevReport.get_collection().update_one(
  143. filter = {'devNo': device.devNo, 'type': 'day', 'date': report_day},
  144. update = {'$set': {'rpt.lineCoins': today_coins}},
  145. upsert = True)
  146. @staticmethod
  147. def getOwnerIncome(ownerId, now):
  148. nowDate = now.strftime("%Y-%m-%d")
  149. newValue = {'lineCoins': 0, 'count': 0}
  150. key = ownerCoinTmpl(ownerId, nowDate)
  151. coin = reportCache.get(key, 0)
  152. if coin is not None:
  153. newValue.update({'lineCoins': int(float(coin))})
  154. key = ownerCountNetPayTmpl(ownerId, nowDate)
  155. count = reportCache.get(key, 0)
  156. if count is not None:
  157. newValue.update({'count': int(float(count))})
  158. return {k: v for k, v in newValue.iteritems()}
  159. @staticmethod
  160. def get_dealer_offline_coins(dealerId, date):
  161. return int(float(reportCache.get(ownerCoinTmpl(dealerId, date), 0)))
  162. @staticmethod
  163. def getGroupIncome(groupIdList, now):
  164. nowDate = now.strftime("%Y-%m-%d")
  165. keyCoinList, keyCountList = [], []
  166. for groupId in groupIdList:
  167. keyCoinList.append(groupCoinTmpl(groupId, nowDate))
  168. keyCountList.append(groupCountNetPayTmpl(groupId, nowDate))
  169. coinValueDict = reportCache.get_many(keyCoinList)
  170. countValueDict = reportCache.get_many(keyCountList)
  171. resultDict = {}
  172. for groupId in groupIdList:
  173. newValue = {'lineCoins': 0, 'count': 0}
  174. coin = int(float(coinValueDict.get(groupCoinTmpl(groupId, nowDate), 0)))
  175. count = int(float(countValueDict.get(groupCountNetPayTmpl(groupId, nowDate), 0)))
  176. newValue.update({'lineCoins': int(coin), 'count': count})
  177. resultDict.update({groupId: newValue})
  178. return resultDict
  179. @staticmethod
  180. def getDevIncome(devNoList, now):
  181. nowDate = now.strftime("%Y-%m-%d")
  182. keyCoinList, keyCountList = [], []
  183. for devNo in devNoList:
  184. keyCoinList.append(devCoinTmpl(devNo, nowDate))
  185. keyCountList.append(devCountNetPayTmpl(devNo, nowDate))
  186. coinValueDict = reportCache.get_many(keyCoinList)
  187. countValueDict = reportCache.get_many(keyCountList)
  188. resultDict = {}
  189. for devNo in devNoList:
  190. newValue = {'lineCoins': 0, 'count': 0}
  191. coin = int(float(coinValueDict.get(devCoinTmpl(devNo, nowDate), 0)))
  192. count = int(float(countValueDict.get(devCountNetPayTmpl(devNo, nowDate), 0)))
  193. newValue.update({'lineCoins': int(coin), 'count': count})
  194. resultDict.update({devNo: newValue})
  195. return resultDict