accessors.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import datetime
  4. import logging
  5. import pandas as pd
  6. from collections import OrderedDict
  7. from bson import ObjectId
  8. from typing import Union, Iterable, TYPE_CHECKING
  9. from apps.web.common.models import WithdrawRecord
  10. from apps.web.common.proxy import ClientDealerIncomeModelProxy, ClientConsumeModelProxy, ClientRechargeModelProxy
  11. from apps.web.constant import USER_RECHARGE_TYPE
  12. from apps.web.core.exceptions import InvalidParameter
  13. from apps.web.dealer.define import DEALER_INCOME_SOURCE
  14. from apps.web.dealer.proxy import DealerIncomeProxy
  15. from apps.web.device.models import Group
  16. from apps.web.report.utils import translate_consumption
  17. from apps.web.user.models import ConsumeRecord, RechargeRecord, MyUser
  18. if TYPE_CHECKING:
  19. from apps.web.ad.models import AdRecord
  20. logger = logging.getLogger()
  21. def income_business_stat(dealerId, startTime, endTime, source = None, logicalCode = None, groupId = None):
  22. """
  23. 经销商手机端的报表下载
  24. :param startTime: 统计的开始时间 过滤数据
  25. :param endTime: 统计的结束时间 过滤数据
  26. :param source: 具体统计的项目 显示数据
  27. :param dealerId: 统计的经销商 过滤数据
  28. :param logicalCode: 统计的设备 过滤数据
  29. :param groupId: 统计的组 过滤数据
  30. :return:
  31. """
  32. if dealerId is None:
  33. raise InvalidParameter('currently we only support exporting with explicit dealerId')
  34. query = {}
  35. if source:
  36. query = {
  37. 'source': source
  38. }
  39. if logicalCode:
  40. query.update({'logicalCode': logicalCode})
  41. elif groupId:
  42. query.update({'groupId': ObjectId(groupId)})
  43. else:
  44. groupIds = [ObjectId(groupId) for groupId in Group.get_group_ids_of_dealer_and_partner(dealerId)]
  45. query.update({'groupId__in': groupIds})
  46. queryset = ClientDealerIncomeModelProxy(
  47. st = startTime,
  48. et = endTime
  49. ).all(**query) # type: DealerIncomeProxy
  50. records = []
  51. for item in queryset:
  52. try:
  53. detail = item.ref_detail(dealerId) # type: Union[RechargeRecord, AdRecord]
  54. except Exception as e:
  55. logger.exception(e)
  56. continue
  57. if item.source in (
  58. DEALER_INCOME_SOURCE.RECHARGE,
  59. DEALER_INCOME_SOURCE.RECHARGE_CARD,
  60. DEALER_INCOME_SOURCE.RECHARGE_VIRTUAL_CARD,
  61. DEALER_INCOME_SOURCE.RECHARGE_MONTHLY_PACKAGE,
  62. DEALER_INCOME_SOURCE.REDPACK,
  63. DEALER_INCOME_SOURCE.REFUND_CASH,
  64. DEALER_INCOME_SOURCE.AUTO_SIM
  65. ):
  66. recharge_record = detail
  67. record = OrderedDict([
  68. (u"订单号", recharge_record['outTradeNo']),
  69. (u'支付方式', RechargeRecord.gatewayText(recharge_record['gateway'])),
  70. (u'支付类型', RechargeRecord.viaText(recharge_record['via'], recharge_record['isQuickPay'])),
  71. (u"用户昵称", recharge_record['userNickname']),
  72. (u"设备编码", recharge_record['logicalCode']),
  73. (u"设备类型", recharge_record['devTypeName']),
  74. (u"分得金额", str(recharge_record['amount'])),
  75. (u"总金额", str(item.totalAmount)),
  76. (u"购币", str(recharge_record['coins'])),
  77. (u"地址", recharge_record['groupName']),
  78. (u"时间", item.to_datetime_str(item.dateTimeAdded))
  79. ])
  80. user = MyUser.objects(groupId = recharge_record['groupId'], openId = recharge_record['openId']).first()
  81. if not user:
  82. logger.warning('user<openId={}, groupId={}> not exists.'.format(
  83. recharge_record['openId'], recharge_record['groupId']))
  84. elif user.phone:
  85. record.update({u"电话": str(user.phone)})
  86. records.append(record)
  87. return records
  88. def consume_business_stat(dealerId, startTime, endTime, source = None, logicalCode = None, groupId = None):
  89. """
  90. 经销商手机端的报表下载
  91. :param startTime: 统计的开始时间 过滤数据
  92. :param endTime: 统计的结束时间 过滤数据
  93. :param source: 具体统计的项目 显示数据
  94. :param dealerId: 统计的经销商 过滤数据
  95. :param logicalCode: 统计的设备 过滤数据
  96. :param groupId: 统计的组 过滤数据
  97. :return:
  98. """
  99. if dealerId is None:
  100. raise InvalidParameter('currently we only support exporting with explicit dealerId')
  101. from apps.web.agent.models import Agent
  102. from apps.web.dealer.models import Dealer
  103. dealer = Dealer.objects(id = dealerId).first() # type: Dealer
  104. agent = Agent.objects.get(id = dealer.agentId) # type: Agent
  105. query = {}
  106. if source:
  107. query.update({'aggInfo__{}'.format(source): {'$exists': True}})
  108. if logicalCode:
  109. query.update({'logicalCode': logicalCode})
  110. elif groupId:
  111. query.update({'groupId': groupId})
  112. else:
  113. groupIds = Group.get_group_ids_of_dealer_and_partner(dealerId)
  114. query.update({'groupId__in': groupIds})
  115. queryset = ClientConsumeModelProxy(
  116. st = startTime,
  117. et = endTime
  118. ).all(**query) # type: Iterable[ConsumeRecord]
  119. records = []
  120. for item in queryset:
  121. record = OrderedDict([
  122. (u"订单号", item.orderNo),
  123. (u"用户昵称", item.nickname),
  124. (u"设备编码", item.logicalCode),
  125. (u"设备类型", item.dev_type_name),
  126. (u"金额", str(item.money)),
  127. (u"地址", item.groupName),
  128. (u"创建时间", item.to_datetime_str(item.dateTimeAdded)),
  129. (u'端口', str(item.used_port) if item.used_port != -1 else ''),
  130. (u'停止时间', item.to_datetime_str(item.device_finished_time)),
  131. (u'结束原因', item.servicedInfo.get('reason', ''))
  132. ])
  133. if source:
  134. for agg_info in translate_consumption(item.aggInfo, agent.hide_consume_kinds_dealer):
  135. if source == agg_info['source']:
  136. record[u'{}({})'.format(agg_info['name'], agg_info['unit'])] = '%s' % (agg_info['value'],)
  137. records.append(record)
  138. return records
  139. def monthly_business_stat(dealerId, startTime, endTime):
  140. """
  141. 经销商手机端的报表下载
  142. :param startTime: 统计的开始时间 过滤数据
  143. :param endTime: 统计的结束时间 过滤数据
  144. :param dealerId: 统计的经销商 过滤数据
  145. :return:
  146. """
  147. groupIds = [ObjectId(groupId) for groupId in Group.get_group_ids_of_dealer_and_partner(dealerId)]
  148. queryset = ClientDealerIncomeModelProxy(
  149. st = startTime,
  150. et = endTime
  151. ).all(**{
  152. 'groupId__in': groupIds
  153. }) # type: DealerIncomeProxy
  154. querysetPandasDatas = []
  155. records = []
  156. for item in queryset:
  157. try:
  158. detail = item.ref_detail(dealerId) # type: Union[RechargeRecord, AdRecord]
  159. except Exception as e:
  160. logger.exception(e)
  161. continue
  162. recharge_record = detail
  163. querysetPandasData = {"时间": item.to_datetime_str(item.dateTimeAdded).split(" ")[0],
  164. "分得金额": float(str(recharge_record['amount'])),
  165. "总金额": float(str(item.totalAmount)),
  166. "购币": float(str(recharge_record['coins'])),
  167. "用户昵称": recharge_record['userNickname'],
  168. "订单号": recharge_record['outTradeNo']}
  169. querysetPandasDatas.append(querysetPandasData)
  170. records_q = []
  171. if querysetPandasDatas:
  172. for dailyStat in pd.DataFrame(querysetPandasDatas).groupby("时间"):
  173. record_q = {"日期": dailyStat[0],
  174. "分得金额": round(dailyStat[1]['分得金额'].sum(), 2),
  175. "收入": round(dailyStat[1]['总金额'].sum(), 2),
  176. "购币": round(dailyStat[1]['购币'].sum(), 2),
  177. "人次": str(len(dailyStat[1])),
  178. "总用户": dailyStat[1]['用户昵称'].nunique(),
  179. }
  180. records_q.append(record_q)
  181. df_q = pd.DataFrame(records_q).set_index("日期")
  182. else:
  183. df_q = pd.DataFrame([])
  184. consumesetPandasDatas = []
  185. consumeset = ClientConsumeModelProxy(
  186. st = startTime,
  187. et = endTime
  188. ).all(**{
  189. 'groupId__in': groupIds,
  190. 'isNormal': True
  191. })
  192. for item in consumeset:
  193. consumesetPandasData = {
  194. "时间": item.to_datetime_str(item.dateTimeAdded).split(" ")[0],
  195. "消费": float(str(item.coin)),
  196. }
  197. consumesetPandasDatas.append(consumesetPandasData)
  198. records_c = []
  199. if consumesetPandasDatas:
  200. for dailyStat in pd.DataFrame(consumesetPandasDatas).groupby("时间"):
  201. record_c = {"日期": dailyStat[0],
  202. "消费": round(dailyStat[1]['消费'].sum(), 2)
  203. }
  204. records_c.append(record_c)
  205. df_c = pd.DataFrame(records_c).set_index("日期")
  206. else:
  207. df_c = pd.DataFrame()
  208. rechargesetPandasDatas = []
  209. rechargeset = ClientRechargeModelProxy(
  210. st = startTime,
  211. et = endTime
  212. ).all(**{
  213. 'groupId__in': groupIds,
  214. 'via__in': [USER_RECHARGE_TYPE.RECHARGE, USER_RECHARGE_TYPE.RECHARGE_CARD,
  215. USER_RECHARGE_TYPE.RECHARGE_MONTHLY_PACKAGE, USER_RECHARGE_TYPE.RECHARGE_VIRTUAL_CARD],
  216. 'isQuickPay': False
  217. })
  218. for item in rechargeset:
  219. if dealerId == item.ownerId:
  220. consumesetPandasData = {
  221. "时间": item.to_datetime_str(item.dateTimeAdded).split(" ")[0],
  222. "充值": float(str(item.money)),
  223. }
  224. rechargesetPandasDatas.append(consumesetPandasData)
  225. else:
  226. if item.groupId in Group.get_group_ids_of_partner(dealerId):
  227. consumesetPandasData = {
  228. "时间": item.to_datetime_str(item.dateTimeAdded).split(" ")[0],
  229. "充值": float(str(item.money)),
  230. }
  231. rechargesetPandasDatas.append(consumesetPandasData)
  232. records_r = []
  233. if rechargesetPandasDatas:
  234. for dailyStat in pd.DataFrame(rechargesetPandasDatas).groupby("时间"):
  235. record_r = {"日期": dailyStat[0],
  236. "充值": round(dailyStat[1]['充值'].sum(), 2)
  237. }
  238. records_r.append(record_r)
  239. df_r = pd.DataFrame(records_r).set_index("日期")
  240. else:
  241. df_r = pd.DataFrame()
  242. withDrawPandasDatas = []
  243. withDrawset = WithdrawRecord.objects.filter(ownerId = dealerId,
  244. postTime__lt = endTime,
  245. postTime__gte = startTime)
  246. if withDrawset:
  247. for item in withDrawset:
  248. withDrawPandasData = {
  249. "时间": item.to_datetime_str(item.postTime).split(" ")[0],
  250. "提现": float(str(item.amount))
  251. }
  252. withDrawPandasDatas.append(withDrawPandasData)
  253. records_w = []
  254. if withDrawPandasDatas:
  255. for dailyStat in pd.DataFrame(withDrawPandasDatas).groupby("时间"):
  256. record_w = {"日期": dailyStat[0],
  257. "提现": round(dailyStat[1]['提现'].sum(), 2)
  258. }
  259. records_w.append(record_w)
  260. df_w = pd.DataFrame(records_w).set_index("日期")
  261. else:
  262. df_w = pd.DataFrame()
  263. dailyStats_all = df_q.join(df_r, how = "outer").join(df_c, how = "outer").join(df_w, how = "outer").fillna(
  264. 0).groupby("日期")
  265. for dailyStat in dailyStats_all:
  266. record = OrderedDict([(u"日期", dailyStat[0]),
  267. (u"收入", str(round(dailyStat[1].get('收入', pd.Series()).sum(), 2))),
  268. (u"分得金额", str(round(dailyStat[1].get('分得金额', pd.Series()).sum(), 2))),
  269. (u"购币", str(round(dailyStat[1].get('购币', pd.Series()).sum(), 2))),
  270. (u"充值", round(dailyStat[1].get('充值', pd.Series()).sum(), 2),),
  271. (u"消费", round(dailyStat[1].get('消费', pd.Series()).sum(), 2),),
  272. (u"提现", round(dailyStat[1].get("提现", pd.Series()).sum(), 2),),
  273. (u"人次", str(len(dailyStat[1]))),
  274. (u"总用户", dailyStat[1]['总用户'].nunique())])
  275. records.append(record)
  276. return records