# -*- coding: utf-8 -*-
# !/usr/bin/env python
import datetime
import logging

import pandas as pd
from collections import OrderedDict

from bson import ObjectId
from typing import Union, Iterable, TYPE_CHECKING

from apps.web.common.models import WithdrawRecord
from apps.web.common.proxy import ClientDealerIncomeModelProxy, ClientConsumeModelProxy, ClientRechargeModelProxy
from apps.web.constant import USER_RECHARGE_TYPE
from apps.web.core.exceptions import InvalidParameter
from apps.web.dealer.define import DEALER_INCOME_SOURCE
from apps.web.dealer.proxy import DealerIncomeProxy
from apps.web.device.models import Group
from apps.web.report.utils import translate_consumption
from apps.web.user.models import ConsumeRecord, RechargeRecord, MyUser

if TYPE_CHECKING:
    from apps.web.ad.models import AdRecord

logger = logging.getLogger()


def income_business_stat(dealerId, startTime, endTime, source = None, logicalCode = None, groupId = None):
    """
    Build the income rows of the report downloaded from the dealer mobile client.

    Income entries between ``startTime`` and ``endTime`` are queried through
    ``ClientDealerIncomeModelProxy``. Exactly one scope filter is applied, in
    this priority: ``logicalCode``, then ``groupId``, otherwise every group
    returned by ``Group.get_group_ids_of_dealer_and_partner(dealerId)``.
    Only entries whose source is one of the recharge-like sources listed in
    the function body produce a row; entries whose detail record cannot be
    resolved are logged and skipped.

    :param dealerId: dealer the report is built for (required)
    :param startTime: start of the statistics window (filters data)
    :param endTime: end of the statistics window (filters data)
    :param source: income source to restrict the query to (optional)
    :param logicalCode: device to restrict the query to (optional)
    :param groupId: group to restrict the query to (optional)
    :return: list of ``OrderedDict`` rows, one per exported income entry
    :raises InvalidParameter: if ``dealerId`` is None
    """
    if dealerId is None:
        raise InvalidParameter('currently we only support exporting with explicit dealerId')

    query = {}
    if source:
        query = {
            'source': source
        }

    if logicalCode:
        query.update({'logicalCode': logicalCode})
    elif groupId:
        query.update({'groupId': ObjectId(groupId)})
    else:
        # Use a distinct loop name: in Python 2 a list-comprehension variable
        # leaks into the function scope and would overwrite the parameter.
        groupIds = [ObjectId(gid) for gid in Group.get_group_ids_of_dealer_and_partner(dealerId)]
        query.update({'groupId__in': groupIds})

    queryset = ClientDealerIncomeModelProxy(
        st = startTime,
        et = endTime
    ).all(**query)  # type: DealerIncomeProxy

    # Sources whose detail record is read like a recharge record below.
    recharge_like_sources = (
        DEALER_INCOME_SOURCE.RECHARGE,
        DEALER_INCOME_SOURCE.RECHARGE_CARD,
        DEALER_INCOME_SOURCE.RECHARGE_VIRTUAL_CARD,
        DEALER_INCOME_SOURCE.RECHARGE_MONTHLY_PACKAGE,
        DEALER_INCOME_SOURCE.REDPACK,
        DEALER_INCOME_SOURCE.REFUND_CASH,
        DEALER_INCOME_SOURCE.AUTO_SIM
    )

    records = []
    for item in queryset:
        try:
            detail = item.ref_detail(dealerId)  # type: Union[RechargeRecord, AdRecord]
        except Exception as e:
            # Best effort: one unresolvable entry must not abort the export.
            logger.exception(e)
            continue

        if item.source in recharge_like_sources:
            recharge_record = detail
            record = OrderedDict([
                (u"订单号", recharge_record['outTradeNo']),
                (u'支付方式', RechargeRecord.gatewayText(recharge_record['gateway'])),
                (u'支付类型', RechargeRecord.viaText(recharge_record['via'], recharge_record['isQuickPay'])),
                (u"用户昵称", recharge_record['userNickname']),
                (u"设备编码", recharge_record['logicalCode']),
                (u"设备类型", recharge_record['devTypeName']),
                (u"分得金额", str(recharge_record['amount'])),
                (u"总金额", str(item.totalAmount)),
                (u"购币", str(recharge_record['coins'])),
                (u"地址", recharge_record['groupName']),
                (u"时间", item.to_datetime_str(item.dateTimeAdded))
            ])

            user = MyUser.objects(groupId = recharge_record['groupId'], openId = recharge_record['openId']).first()
            if not user:
                # The previous message had no placeholders, so the ids passed
                # to .format() never reached the log.
                logger.warning('user not exists. openId = %s, groupId = %s',
                               recharge_record['openId'], recharge_record['groupId'])
            elif user.phone:
                record.update({u"电话": str(user.phone)})

            records.append(record)

    return records


def consume_business_stat(dealerId, startTime, endTime, source = None, logicalCode = None, groupId = None):
    """
    Build the consumption rows of the report downloaded from the dealer mobile client.

    Consume records between ``startTime`` and ``endTime`` are queried through
    ``ClientConsumeModelProxy``. Exactly one scope filter is applied, in this
    priority: ``logicalCode``, then ``groupId``, otherwise every group
    returned by ``Group.get_group_ids_of_dealer_and_partner(dealerId)``.
    When ``source`` is given, only records carrying that key in ``aggInfo``
    are queried, and each row gets extra "name(unit)" columns for the matching
    entries returned by ``translate_consumption``.

    :param dealerId: dealer the report is built for (required)
    :param startTime: start of the statistics window (filters data)
    :param endTime: end of the statistics window (filters data)
    :param source: consumption item to report on (optional)
    :param logicalCode: device to restrict the query to (optional)
    :param groupId: group to restrict the query to (optional)
    :return: list of ``OrderedDict`` rows, one per consume record
    :raises InvalidParameter: if ``dealerId`` is None or matches no dealer
    """
    if dealerId is None:
        raise InvalidParameter('currently we only support exporting with explicit dealerId')

    # Imported here rather than at module level, as in the original code.
    from apps.web.agent.models import Agent
    from apps.web.dealer.models import Dealer

    dealer = Dealer.objects(id = dealerId).first()  # type: Dealer
    if dealer is None:
        # .first() returns None for an unknown id; fail with a clear error
        # instead of an AttributeError on dealer.agentId.
        raise InvalidParameter('dealer not found: {}'.format(dealerId))
    agent = Agent.objects.get(id = dealer.agentId)  # type: Agent

    query = {}
    if source:
        query.update({'aggInfo__{}'.format(source): {'$exists': True}})

    if logicalCode:
        query.update({'logicalCode': logicalCode})
    elif groupId:
        query.update({'groupId': groupId})
    else:
        groupIds = Group.get_group_ids_of_dealer_and_partner(dealerId)
        query.update({'groupId__in': groupIds})

    queryset = ClientConsumeModelProxy(
        st = startTime,
        et = endTime
    ).all(**query)  # type: Iterable[ConsumeRecord]

    records = []
    for item in queryset:
        record = OrderedDict([
            (u"订单号", item.orderNo),
            (u"用户昵称", item.nickname),
            (u"设备编码", item.logicalCode),
            (u"设备类型", item.dev_type_name),
            (u"金额", str(item.money)),
            (u"地址", item.groupName),
            (u"创建时间", item.to_datetime_str(item.dateTimeAdded)),
            # A used_port of -1 is exported as an empty cell.
            (u'端口', str(item.used_port) if item.used_port != -1 else ''),
            (u'停止时间', item.to_datetime_str(item.device_finished_time)),
            (u'结束原因', item.servicedInfo.get('reason', ''))
        ])

        if source:
            for agg_info in translate_consumption(item.aggInfo, agent.hide_consume_kinds_dealer):
                if source == agg_info['source']:
                    record[u'{}({})'.format(agg_info['name'], agg_info['unit'])] = '%s' % (agg_info['value'],)

        records.append(record)

    return records


def _day_of(item, moment):
    """Return the date part (text before the first space) of ``item.to_datetime_str(moment)``."""
    return item.to_datetime_str(moment).split(" ")[0]


def _daily_sum_frame(rows, column):
    """Sum ``column`` per "时间" day; return a frame indexed by "日期" (empty frame for no rows)."""
    if not rows:
        return pd.DataFrame()

    daily = []
    for day, frame in pd.DataFrame(rows).groupby("时间"):
        daily.append({"日期": day, column: round(frame[column].sum(), 2)})
    return pd.DataFrame(daily).set_index("日期")


def _daily_income_frame(rows):
    """Aggregate income rows per "时间" day: amount sums, visit count and distinct nicknames."""
    if not rows:
        return pd.DataFrame()

    daily = []
    for day, frame in pd.DataFrame(rows).groupby("时间"):
        daily.append({
            "日期": day,
            "分得金额": round(frame['分得金额'].sum(), 2),
            "收入": round(frame['总金额'].sum(), 2),
            "购币": round(frame['购币'].sum(), 2),
            # Kept numeric so the values survive the outer join / fillna.
            "人次": len(frame),
            "总用户": frame['用户昵称'].nunique(),
        })
    return pd.DataFrame(daily).set_index("日期")


def _column_total(frame, column):
    """Return the sum of ``column`` in ``frame``, or 0.0 when the column is absent."""
    if column in frame:
        return frame[column].sum()
    return 0.0


def monthly_business_stat(dealerId, startTime, endTime):
    """
    Build the per-day summary of the report downloaded from the dealer mobile client.

    Four data sets inside the time window are aggregated per calendar day and
    outer-joined on the day: dealer income, user recharges, consumption and
    the dealer's withdrawals. Days missing from a data set get 0 for its
    columns. An empty list is returned when none of the data sets has rows.

    :param dealerId: dealer the report is built for (required)
    :param startTime: start of the statistics window (filters data)
    :param endTime: end of the statistics window (filters data)
    :return: list of ``OrderedDict`` rows, one per day, ordered by day
    :raises InvalidParameter: if ``dealerId`` is None
    """
    if dealerId is None:
        raise InvalidParameter('currently we only support exporting with explicit dealerId')

    groupIds = [ObjectId(gid) for gid in Group.get_group_ids_of_dealer_and_partner(dealerId)]

    # --- dealer income -----------------------------------------------------
    queryset = ClientDealerIncomeModelProxy(
        st = startTime,
        et = endTime
    ).all(**{
        'groupId__in': groupIds
    })  # type: DealerIncomeProxy

    income_rows = []
    for item in queryset:
        try:
            detail = item.ref_detail(dealerId)  # type: Union[RechargeRecord, AdRecord]
        except Exception as e:
            # Best effort: one unresolvable entry must not abort the report.
            logger.exception(e)
            continue

        # NOTE(review): unlike income_business_stat there is no source filter
        # here, so every detail is read like a recharge record - confirm that
        # all detail types expose 'amount', 'coins' and 'userNickname'.
        income_rows.append({
            "时间": _day_of(item, item.dateTimeAdded),
            # float(str(...)) mirrors the original conversion of these values.
            "分得金额": float(str(detail['amount'])),
            "总金额": float(str(item.totalAmount)),
            "购币": float(str(detail['coins'])),
            "用户昵称": detail['userNickname'],
        })
    df_q = _daily_income_frame(income_rows)

    # --- consumption -------------------------------------------------------
    consumeset = ClientConsumeModelProxy(
        st = startTime,
        et = endTime
    ).all(**{
        'groupId__in': groupIds,
        'isNormal': True
    })
    consume_rows = []
    for item in consumeset:
        consume_rows.append({
            "时间": _day_of(item, item.dateTimeAdded),
            "消费": float(str(item.coin)),
        })
    df_c = _daily_sum_frame(consume_rows, "消费")

    # --- recharges ---------------------------------------------------------
    rechargeset = ClientRechargeModelProxy(
        st = startTime,
        et = endTime
    ).all(**{
        'groupId__in': groupIds,
        'via__in': [USER_RECHARGE_TYPE.RECHARGE, USER_RECHARGE_TYPE.RECHARGE_CARD,
                    USER_RECHARGE_TYPE.RECHARGE_MONTHLY_PACKAGE, USER_RECHARGE_TYPE.RECHARGE_VIRTUAL_CARD],
        'isQuickPay': False
    })
    recharge_rows = []
    partner_group_ids = None
    for item in rechargeset:
        if dealerId != item.ownerId:
            # Not the dealer's own record: count it only if its group is one
            # of the dealer's partner groups. The lookup is done once, on
            # first need, instead of once per record.
            if partner_group_ids is None:
                partner_group_ids = Group.get_group_ids_of_partner(dealerId)
            if item.groupId not in partner_group_ids:
                continue
        recharge_rows.append({
            "时间": _day_of(item, item.dateTimeAdded),
            "充值": float(str(item.money)),
        })
    df_r = _daily_sum_frame(recharge_rows, "充值")

    # --- withdrawals -------------------------------------------------------
    withDrawset = WithdrawRecord.objects.filter(ownerId = dealerId,
                                                postTime__lt = endTime,
                                                postTime__gte = startTime)
    withdraw_rows = []
    for item in withDrawset:
        withdraw_rows.append({
            "时间": _day_of(item, item.postTime),
            "提现": float(str(item.amount)),
        })
    df_w = _daily_sum_frame(withdraw_rows, "提现")

    # --- merge -------------------------------------------------------------
    # Join only frames that have data: an empty DataFrame has no "日期" index,
    # and with nothing to report the old groupby("日期") raised KeyError.
    frames = [frame for frame in (df_q, df_r, df_c, df_w) if not frame.empty]
    if not frames:
        return []

    merged = frames[0]
    for frame in frames[1:]:
        merged = merged.join(frame, how = "outer")
    merged = merged.fillna(0)

    records = []
    # Group on the index level so the result does not depend on its name.
    for day, daily in merged.groupby(level = 0):
        records.append(OrderedDict([
            (u"日期", day),
            (u"收入", str(round(_column_total(daily, '收入'), 2))),
            (u"分得金额", str(round(_column_total(daily, '分得金额'), 2))),
            (u"购币", str(round(_column_total(daily, '购币'), 2))),
            (u"充值", round(_column_total(daily, '充值'), 2)),
            (u"消费", round(_column_total(daily, '消费'), 2)),
            (u"提现", round(_column_total(daily, "提现"), 2)),
            # Each day is a single merged row, so len()/nunique() on it always
            # gave 1; report the per-day counts computed from the income rows.
            (u"人次", str(int(_column_total(daily, '人次')))),
            (u"总用户", int(_column_total(daily, '总用户'))),
        ]))

    return records