|
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import datetime
- from collections import OrderedDict, namedtuple
- from collections import defaultdict
- from threading import Thread
- import arrow
- import pandas
- import simplejson as json
- from bson.objectid import ObjectId
- from celery.utils.log import get_task_logger
- from django.conf import settings
- from typing import Dict, Optional, TYPE_CHECKING, List
- from apilib.monetary import RMB, Percent
- from apilib.quantity import Quantity
- from apilib.utils_datetime import timestamp_to_dt
- from apilib.utils_datetime import to_datetime
- from apps.web.agent.models import Agent
- from apps.web.api.models import APIStartDeviceRecord
- from apps.web.common.models import OperatorLog, DealerDoneReportRecord
- from apps.web.common.proxy import ClientRechargeModelProxy, ClientConsumeModelProxy
- from apps.web.common.transaction import WITHDRAW_PAY_TYPE
- from apps.web.common.transaction.pay import PayRecordPoller, PayManager
- from apps.web.constant import Const, RechargeRecordVia, USER_RECHARGE_TYPE, AppPlatformType
- from apps.web.constant import DEALER_CONSUMPTION_AGG_KIND_TRANSLATION, MONTH_DATE_KEY
- from apps.web.core.exceptions import InsufficientFundsError
- from apps.web.core.helpers import ActionDeviceBuilder
- from apps.web.core.messages.sms import SMSSender
- from apps.web.core.utils import generate_excel_report, gernerate_excel_report_for_sheet
- from apps.web.dealer.accessors import income_business_stat, consume_business_stat, monthly_business_stat
- from apps.web.dealer.define import DEALER_INCOME_TYPE, DEALER_INCOME_SOURCE
- from apps.web.dealer.models import Dealer, DealerRechargeRecord, UpscoreRecord, UpCardScoreRecord
- from apps.web.dealer.proxy import DealerIncomeProxy, record_income_proxy, DealerGroupStats
- from apps.web.dealer.utils import create_dealer_sim_charge_order
- from apps.web.dealer.transaction import post_pay, post_sim_recharge
- from apps.web.dealer.withdraw import DealerWithdrawService
- from apps.web.device.models import DevStatusRecord, Device, Group, DeviceDict
- from apps.web.device.timescale import SignalManager
- from apps.web.helpers import get_wechat_manager_mp_proxy, get_platform_wallet_pay_gateway
- from apps.web.report.ledger import Ledger, LedgerConsumeOrder
- from apps.web.report.models import DealerReport, DealerDailyStat, \
- DeviceDailyStat, GroupDailyStat, DealerMonthlyStat
- from apps.web.report.models import GroupReport
- from apps.web.report.utils import cum_stats, translate_consumption
- from apps.web.user.models import ConsumeRecord, MyUser, RechargeRecord, Card
- if TYPE_CHECKING:
- from apps.web.core.adapter.base import SmartBox
- logger = get_task_logger(__name__)
def report_feedback_to_dealer_via_wechat(dealerId, nickname, msg, feedbackTime):
    """
    Push a user-feedback notification to a dealer through the WeChat manager account.

    :param dealerId: id of the dealer to notify (converted with ``str`` for the lookup)
    :param nickname: nickname forwarded to the ``feedback`` template
    :param msg: text used as the template title
    :param feedbackTime: value forwarded as the template's ``feedbackTime`` field
    :return: ``None`` when pushing is switched off in settings, an ``{'info': ...}`` dict when
             the dealer is missing or has no managerial open id, otherwise whatever
             ``notify`` returns
    """
    if not settings.SEND_DEALER_WECHAT_PUSH_MESSAGE:
        logger.info('SEND_DEALER_WECHAT_PUSH_MESSAGE is turned off, you are probably in dev env')
        return None

    target = Dealer.objects(id = str(dealerId)).first()  # type: Optional[Dealer]
    if not target:
        return {'info': 'dealer does not exist, id=%s' % (dealerId,)}
    if not target.managerialOpenId:
        return {'info': 'no manager open Id for dealer %s' % (target.username,)}

    template_fields = {
        'title': msg,
        'nickname': nickname,
        'feedbackTime': feedbackTime
    }
    proxy = get_wechat_manager_mp_proxy(target)
    return proxy.notify(target.managerialOpenId, 'feedback', **template_fields)
def report_daily_report_to_dealer_via_wechat(dealer = None):
    # type:(Optional[Dealer])->None
    """
    Push yesterday's income report to dealers through the WeChat manager account.

    Per the original note this is a next-day push (9 o'clock); the WeChat template
    id is OPENTM401155654.

    When ``dealer`` is ``None`` every dealer with ``dailyIncomeReportPushSwitch`` enabled
    is processed, skipping ids already stored in today's ``DealerDoneReportRecord``.
    Ids whose report call finished without raising are written back to that record in
    the ``finally`` clause. When a dealer is passed in, only that dealer is handled and
    nothing is recorded.

    :param dealer: a single dealer to notify, or ``None`` for the batch run
    :return: None
    """
    sent = set()

    def _income(stat, via):
        # Amount recorded under ``stat['income'][via]``, defaulting to zero.
        return RMB(stat.get('income', {}).get(via, 0))

    def _report(dealer):
        # type:(Dealer)->dict
        logger.info('sending daily income report to %r' % (dealer,))

        if not dealer.managerialOpenId:
            return {'info': 'no managerial open Id for %r' % (dealer,)}

        agent = Agent.objects(id = dealer.agentId).first()
        if not agent:
            return {'info': 'no agent found by %r' % (dealer,)}

        if not agent.is_in_domain:
            return {'info': 'is not my agent. agentId = %s' % (str(dealer.agentId),)}

        yesterday = datetime.datetime.now() - datetime.timedelta(days = 1)

        offline_report = DealerReport.objects(ownerId = str(dealer.id),
                                              type = 'day',
                                              date = yesterday.strftime("%Y-%m-%d")).first()  # type: DealerReport

        # getattr with a default so a missing daily-stat document does not raise.
        stat = getattr(DealerDailyStat.objects(
            date = yesterday.date().strftime(Const.DATE_FMT),
            dealerId = ObjectId(dealer.id)).only('daily').first(), 'daily', None)
        if not stat:
            stat = {}

        rpt = None if offline_report is None else offline_report.rpt
        # A report without a 'lineCoins' entry must count as 0 instead of being
        # rendered as "None" in the pushed text.
        offline_coins = 0 if rpt is None else (rpt.get('lineCoins') or 0)

        online_income = (_income(stat, RechargeRecordVia.Balance) +
                         _income(stat, RechargeRecordVia.Redpack) +
                         _income(stat, RechargeRecordVia.Card) +
                         _income(stat, RechargeRecordVia.VirtualCard))
        refund_cash = _income(stat, RechargeRecordVia.RefundCash)

        # The refund part is only mentioned when there actually was a refund.
        if abs(refund_cash) > RMB(0):
            report = u'线下投币(%s)次 在线支付收入(%s)元 现金退款(%s)元' % (
                offline_coins, online_income, abs(refund_cash))
        else:
            report = u'线下投币(%s)次 在线支付收入(%s)元' % (
                offline_coins, online_income)

        wechat_mp_proxy = get_wechat_manager_mp_proxy(agent)
        return wechat_mp_proxy.notify(dealer.managerialOpenId, 'daily_income', **{
            'title': u'您的昨日收益报表',
            'reportTime': yesterday.strftime('%Y-%m-%d'),
            'report': report
        })

    try:
        if dealer is None:
            done_ids = set()
            doneRecord = DealerDoneReportRecord.objects(
                reportName = 'report_daily_report_to_dealer_via_wechat',
                reportDay = datetime.datetime.now().strftime('%Y-%m-%d')).only('dealerIds').first()
            if doneRecord and doneRecord.dealerIds:
                # A set keeps the per-dealer membership test O(1) for large batches.
                done_ids = set(doneRecord.dealerIds)

            dealers = Dealer.objects(dailyIncomeReportPushSwitch = True).only(
                'id', 'managerialOpenId', 'agentId').batch_size(2000).timeout(False)

            for each in dealers:
                if str(each.id) in done_ids:
                    continue
                try:
                    _report(each)
                except Exception as e:
                    # One failing dealer must not stop the rest of the batch.
                    logger.exception(e)
                else:
                    sent.add(str(each.id))
        else:
            _report(dealer)
    finally:
        # Persist progress even when the batch is interrupted half-way.
        if sent:
            DealerDoneReportRecord.update_done_list(
                report_name = 'report_daily_report_to_dealer_via_wechat',
                report_day = datetime.datetime.now().strftime('%Y-%m-%d'),
                done_list = list(sent))
def report_new_payment_to_dealer_via_wechat(record):
    # type:(RechargeRecord)->dict
    """
    Push a "new payment" notification to the dealer after an order completes.

    Template layout (WeChat template id OPENTM207568114)::

        {{first.DATA}}
        customer: {{keyword1.DATA}}
        income:   {{keyword2.DATA}}
        {{remark.DATA}}

    :param record: the recharge record; read by key (``record['ownerId']``) and passed to
                   ``json.dumps`` and ``newPaymentNotifyPayload``
    :return: ``None`` when pushing is switched off in settings, an ``{'info': ...}`` dict when a
             precondition fails, otherwise whatever ``notify`` returns
    """
    if not settings.SEND_DEALER_WECHAT_PUSH_MESSAGE:
        logger.info('SEND_DEALER_WECHAT_PUSH_MESSAGE is turned off, you are probably in dev env')
        return None

    logger.info('sending new payment order record=%s' % (json.dumps(record),))

    dealer = Dealer.objects(id = str(record['ownerId'])).first()  # type: Optional[Dealer]
    if not dealer:
        return {'info': 'dealer does not exist, ownerId=%s' % (record['ownerId'],)}

    if not dealer.newUserPaymentOrderPushSwitch:
        return {'info': 'dealer(%s) hasn\'t switched up yet' % (record['ownerId'],)}

    if not dealer.managerialOpenId:
        return {'info': 'no manager open Id for dealer %s' % (dealer.username,)}

    agent = Agent.objects(id = dealer.agentId).first()  # type: Optional[Agent]
    if not agent:
        return {'info': 'No agent found by id=%s' % (str(dealer.agentId),)}

    _translate = {
        'wechat': u'微信',
        'alipay': u'支付宝',
        'card': u'卡充值'
    }
    # Must be a unicode literal: it is interpolated into a unicode template below, and a
    # non-ASCII byte string there raises UnicodeDecodeError on Python 2.
    _translate_via = {'redpack': u'平台红包'}

    # Imported here as in the original (presumably to avoid an import cycle - TODO confirm).
    from apps.web.dealer.validation import newPaymentNotifyPayload
    payload = newPaymentNotifyPayload(record)

    # Fall back to the raw gateway name rather than showing "None" for an unknown gateway.
    gateway = payload['gateway']
    _info = _translate.get(gateway, gateway)

    # A recognised ``via`` takes precedence over the gateway name.
    via = payload.get('via')
    if via and _translate_via.get(via):
        _info = _translate_via[via]

    wechat_mp_proxy = get_wechat_manager_mp_proxy(agent)
    return wechat_mp_proxy.notify(dealer.managerialOpenId, 'new_payment_order', **{
        'title': u'您有新顾客充值啦',
        'customer': u'昵称(%s)来自(%s)' % (payload['nickname'], _info),
        'income': u'%s元' % payload['money']
    })
def report_device_abnormally_offline_to_dealer_via_wechat(dealer = None, logicalCode = None, offTime = 0):
    """
    Notify dealers through WeChat about devices that went offline.

    Template (WeChat template id OPENTM401433345)::

        {{first.DATA}}
        device name: {{keyword1.DATA}}
        time:        {{keyword2.DATA}}
        {{remark.DATA}}

    Without a ``dealer`` argument, every dealer with ``offlineNotify`` enabled is scanned and a
    message is sent for each own device that is offline, has a positive ``offTime`` and has been
    offline for at least 2 but fewer than 24 whole hours. With a ``dealer`` argument a single
    message is sent for the given ``logicalCode`` / ``offTime``.

    :param dealer: dealer to notify directly, or ``None`` to scan all opted-in dealers
    :param logicalCode: logical code reported in the direct case
    :param offTime: offline timestamp reported in the direct case
    :return: ``defaultdict(list)`` mapping dealer id to the ``(logicalCode, offTime)`` pairs that
             reached the push step
    """
    notified = defaultdict(list)

    def _report(target, code, off_time):
        if not target.managerialOpenId:
            return {'info': 'no managerial open Id for dealer %s' % (target.phone,)}

        agent = Agent.objects(id = target.agentId).first()  # type: Agent
        if not agent:
            return {'info': 'No agent found by id=%s' % (str(target.agentId),)}
        if not agent.is_in_domain:
            return {'info': 'is not my agent. agentId = %s' % (str(target.agentId),)}
        if not agent.supports('notify_dealer_device_offline'):
            return {'info': 'agent does not support offline device push'}

        # Record the attempt before pushing.
        notified[str(target.id)].append((code, off_time))

        proxy = get_wechat_manager_mp_proxy(agent)
        message = {
            'title': '您有一台设备已离线!',
            'device': u'逻辑编码:%s' % code,
            'notifyTime': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        return proxy.notify(target.managerialOpenId, 'abnormal_device_offline', **message)

    if dealer is not None:
        _report(dealer, logicalCode, offTime)
        return notified

    # For now only the owning dealer is notified, not the partners.
    for owner in list(Dealer.objects(offlineNotify = True)):  # type: Dealer
        now = datetime.datetime.now()
        for device in owner.get_own_devices():  # type: DeviceDict
            if device.online or not device.offTime > 0:
                continue

            went_offline_at = datetime.datetime.fromtimestamp(device.offTime // 1000)
            offset = (now - went_offline_at).total_seconds()

            # Devices offline for 24 hours or more are no longer pushed.
            if not (24 > (offset // 3600) >= 2):
                continue

            logger.info(
                'device(logicalCode=%s) is offline(offTime=%s, offset=%s), sending to dealer(phone=%s)'
                % (device['logicalCode'], device['offTime'], offset, owner.username,))
            _report(owner, device['logicalCode'], device['offTime'])

    return notified
def report_offline_device_to_dealer_via_wechat():
    """
    Send each opted-in dealer one WeChat message listing all of their offline devices.

    A device is listed when ``online == 0`` and it has been offline for more whole hours than
    the dealer's ``offlineNotifyTime`` (dealers with an empty ``offlineNotifyTime`` list nothing).

    Dealers that cannot be notified (no managerial open id, no agent, agent outside the domain)
    are logged and skipped; they no longer abort the run for every remaining dealer.

    :return: ``defaultdict(list)`` mapping dealer id to the space-separated logical codes that
             were pushed; an empty string is recorded when processing that dealer raised
    """
    notified = defaultdict(list)

    for dealer in Dealer.objects(offlineNotifySwitch = True):
        try:
            now = datetime.datetime.now()
            devices = dealer.get_own_devices()
            if len(devices) == 0:
                continue

            offline_codes = []
            for device in devices:
                if device.online == 0:
                    offset = (now - datetime.datetime.fromtimestamp(device.offTime // 1000)).total_seconds()
                    if dealer.offlineNotifyTime != '':
                        if (offset // 3600) > int(dealer.offlineNotifyTime):
                            offline_codes.append(device['logicalCode'])

            if len(offline_codes) == 0:
                continue

            if not dealer.managerialOpenId:
                logger.info('no managerial open Id for dealer %s' % (dealer.phone,))
                continue

            agent = Agent.objects(id = dealer.agentId).first()  # type: Agent
            if not agent:
                logger.info('No agent found by id=%s' % (str(dealer.agentId),))
                continue

            if not agent.is_in_domain:
                logger.info('is not my agent. agentId = %s' % (str(dealer.agentId),))
                continue

            wechat_mp_proxy = get_wechat_manager_mp_proxy(agent)

            # Same text as the former ``+=`` loop produced (every code followed by a space).
            codes_text = ' '.join(offline_codes) + ' '

            wechat_mp_proxy.notify(dealer.managerialOpenId, 'abnormal_device_offline', **{
                'title': '您有%s台设备已离线!' % len(offline_codes),
                'device': u'逻辑编码:%s' % codes_text,
                'notifyTime': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            })
        except Exception as e:
            # Best effort per dealer: log the failure instead of hiding it, then carry on.
            logger.exception(e)
            codes_text = ''

        notified[str(dealer.id)].append(codes_text)

    return notified
def report_to_dealer_via_wechat(openId, dealerId, templateName, **kwargs):
    """
    Send an arbitrary template message through the WeChat manager proxy resolved for a dealer.

    :param openId: open id the message is delivered to
    :param dealerId: id of the dealer used to resolve the proxy
    :param templateName: name of the message template
    :param kwargs: template fields, forwarded unchanged to ``notify``
    :return: an ``{'info': ...}`` dict when the dealer does not exist, otherwise whatever
             ``notify`` returns
    """
    owner = Dealer.objects(id = str(dealerId)).first()
    if owner:
        return get_wechat_manager_mp_proxy(owner).notify(openId, templateName, **kwargs)
    return {'info': 'dealer does not exist, id=%s' % (dealerId,)}
def send_SIM_expired_messages(send_type):
    """
    Notify dealers that SIM cards of their devices are about to expire.

    :param send_type: ``'sms'`` to send a text message, ``'wechat'`` to send a WeChat template
                      message; any other value sends nothing
    :return: ``{'result': 'no devices are expired'}`` when no dealer has to be notified,
             otherwise ``None``
    """
    logger.debug('send type is {}'.format(send_type))

    if settings.DEBUG_CELERY_TASK:
        expire_notify_devices = [{'logicalCode': 'DUMMY', 'ownerId': '5b9ae99ad89a177846459999'}]
    else:
        expire_notify_devices = Device.get_sim_expire_notify_devices()

    logger.info('%s devices will be expired' % len(expire_notify_devices))

    from cytoolz import groupby
    dealer_device_map = groupby('ownerId', expire_notify_devices)

    dealer_map = {
        doc['_id']: doc
        for doc in Dealer.get_collection().find(
            {'_id': {'$in': [ObjectId(id_) for id_ in dealer_device_map.keys()]}},
            {'username': 1, 'agentId': 1, 'smsVendor': 1})
    }  # type: Dict[ObjectId, dict]

    if not dealer_map:
        logger.info('there are no devices are expired at the moment')
        return {'result': 'no devices are expired'}

    logger.info('we will notify %d dealer, devices are %s' % (len(dealer_map), dealer_device_map))

    # Product names are kept as text and only encoded where the SMS call needs bytes; formatting
    # an encoded non-ASCII name into a unicode template raises UnicodeDecodeError on Python 2.
    agent_product_map = {
        agent['_id']: agent.get('productName') or u''
        for agent in Agent.get_collection().find(
            {'_id': {'$in': [ObjectId(doc['agentId']) for doc in dealer_map.values() if doc.get('agentId')]}},
            {'productName': 1})
    }

    if not (settings.DEBUG_CELERY_TASK or settings.CHECK_DEVICE_SMS_EXPIRE):
        logger.info('CHECK_DEVICE_SMS_EXPIRE is turned off, you are probably in dev/testing environment')
        return None

    for id_, dealer_doc in dealer_map.items():
        logger.debug('sending SIM expired message to Dealer(phone=%s)' % (dealer_doc['username'],))

        # A dealer without agent (or whose agent is gone) gets an empty product name
        # instead of aborting the whole run with a KeyError.
        agent_id = dealer_doc.get('agentId')
        product_name = agent_product_map.get(ObjectId(agent_id), u'') if agent_id else u''

        if send_type == 'sms':
            sms_sender = SMSSender()
            response = sms_sender.send(phoneNumber = dealer_doc['username'],
                                       templateName = "SMS_NOTIFY_EXPIRED_DEVICE_TEMPLATEID",
                                       msg = u'您有设备即将在近期过期'.encode('utf-8'),
                                       productName = product_name.encode('utf-8'))
            if not response['result']:
                logger.error(
                    'send sim expired sms to dealer failed(phone={}), error={}'.format(
                        dealer_doc['username'], response['msg']))
            else:
                logger.info('send sim expired sms to dealer(phone={}) success'.format(dealer_doc['username']))

        elif send_type == 'wechat':
            dealer = Dealer.objects(id = str(id_)).first()  # type: Dealer
            if not dealer:
                logger.debug('dealer<id={}> is not exist.'.format(id_))
                continue

            # Same precondition the other push helpers in this module check.
            if not dealer.managerialOpenId:
                logger.debug('no managerial open Id for dealer<id={}>'.format(id_))
                continue

            wechat_mp_proxy = get_wechat_manager_mp_proxy(dealer)
            wechat_mp_proxy.notify(dealer.managerialOpenId, 'sim_expire_notify', **{
                'title': u'亲爱的{}用户,您有设备本月流量卡过期,请及时充值。已经充值请忽略本消息。'.format(product_name),
                'num': u'{}台(本月)'.format(len(dealer_device_map.get(str(id_), []))),
                'type': u'流量卡到期',
                'time': arrow.now().replace(day = Const.SIM_CARD_FORBIDDEN_DAY).format('YYYY-MM-DD')
            })

            logger.info(
                'sent sim expired message to dealer(phone={})'.format(dealer_doc['username']))
def daily_check_auto_withdraw():
    """
    Daily job that withdraws the balance of dealers who are allowed to auto-withdraw.

    The original note marks this feature as discouraged. For every dealer with
    ``autoWithdrawAllowable`` set, each balance above ``RMB(10)`` is withdrawn through
    ``DealerWithdrawService`` with the WeChat pay type.

    Nothing is paid out when ``settings.SUPPORT_DEALER_AUTO_WITHDRAW`` is off.

    :return: None
    """
    if not settings.SUPPORT_DEALER_AUTO_WITHDRAW:
        logger.info('dealer auto withdraw is not supported, you are probably in testing/dev env')
        # Stop here: previously the payout loop below still ran after this message.
        return

    dealers = Dealer.objects(autoWithdrawAllowable = True)
    logger.info('we are going to pay dealers(%s)' % dealers)

    for dealer in dealers:
        for income_type in DEALER_INCOME_TYPE.choices():
            for source_key, balance_field in dealer.balance_dict(income_type):
                balance = balance_field.balance
                if balance > RMB(10):
                    withdraw_service = DealerWithdrawService(payee = dealer,
                                                             income_type = income_type,
                                                             amount = balance,
                                                             pay_type = WITHDRAW_PAY_TYPE.WECHAT,
                                                             bank_card_no = None)
                    result = withdraw_service.execute(source_key = source_key, recurrent = True)
                    logger.info('%r withdrawn %s, result = %s' % (dealer, balance, result))
# Data model for one worksheet to be written: its records ("data") and its title ("name").
ExcelSheet = namedtuple("Sheet", ("data", "name"))
def generate_business_stats_report_by_dealer(filePath, queryAttrs):
    """
    Generate the business-statistics Excel report for a dealer.

    ``queryAttrs`` is modified in place: ``kind``, ``dateTimeAdded__lte`` and
    ``dateTimeAdded__gte`` are popped and replaced by ``endTime`` / ``startTime``; the remaining
    entries are forwarded as keyword arguments to the stat accessor for the requested kind.

    Supported kinds: ``"income"`` (an overview sheet plus a per-address summary sheet),
    ``"consume"`` and ``"monthly_bill"``. Any other kind is logged as an error. Exceptions
    are logged and not re-raised.

    :param filePath: destination path of the Excel file
    :param queryAttrs: dict with the report kind, the time range and extra query filters
    :return: None
    """
    logger.debug(
        'generate_business_stats_report_by_dealer, filePath is <{}>, queryAttrs is <{}>'.format(filePath, queryAttrs))

    try:
        kind = queryAttrs.pop('kind', '')

        # The front end sends the upper bound as "<day> 23:59:59"; subtracting 23:59:59 turns it
        # into 00:00:00 of that day (per the original note, the accessors expect the day itself
        # as the closed upper bound). The same conversion applies to every report kind.
        upper_bound = timestamp_to_dt(queryAttrs.pop('dateTimeAdded__lte'))
        queryAttrs['endTime'] = upper_bound - datetime.timedelta(hours = 23, minutes = 59, seconds = 59)
        queryAttrs['startTime'] = timestamp_to_dt(queryAttrs.pop('dateTimeAdded__gte'))

        if kind == "monthly_bill":
            # Monthly billing statistics.
            records = monthly_business_stat(**queryAttrs)
            generate_excel_report(filePath, records)

        elif kind == "income":
            records = income_business_stat(**queryAttrs)

            address_summary = []
            if records:
                frame = pandas.DataFrame(records)
                # One summary row per address: amount per payment type, then the totals.
                for address, address_rows in frame.groupby(u"地址"):
                    row = OrderedDict()
                    row[u""] = address
                    for pay_type, pay_rows in address_rows.groupby(u"支付类型"):
                        row[pay_type] = pay_rows[u"分得金额"].astype(float).sum()
                    row[u"分得总金额"] = address_rows[u"分得金额"].astype(float).sum()
                    row[u"总金额"] = address_rows[u"总金额"].astype(float).sum()
                    address_summary.append(row)

            sheets = [
                ExcelSheet(data = records, name = u"数据总览"),
                ExcelSheet(data = address_summary, name = u"地址汇总"),
            ]
            gernerate_excel_report_for_sheet(filePath, sheets)

        elif kind == "consume":
            records = consume_business_stat(**queryAttrs)
            generate_excel_report(filePath, records)

        else:
            # The placeholder was previously never filled in.
            logger.error("invalid kind <{}> to generate_business_stats_report_by_dealer".format(kind))

    except Exception as e:
        logger.exception(e)
# Order statistics for a dealer: the share of devices that were active in the period
# and how many times each package was used.
def calc_dealer_order_data_stats(dealerId, startTime, endTime):
    """
    Aggregate the dealer's normal consume records between ``startTime`` and ``endTime``.

    Records are selected by the dealer's group ids (not by ``ownerId``) and must have
    ``isNormal`` set.

    :param dealerId: dealer id used to look up the dealer's groups
    :param startTime: inclusive lower bound on ``dateTimeAdded``
    :param endTime: inclusive upper bound on ``dateTimeAdded``
    :return: dict with
             ``activedDevRatio`` - percentage (int) of devices seen in the records relative to the
             device count of the groups that have records,
             ``groupStats`` - per group id ``{'package': {coin: count}, 'activedDevRatio': int}``,
             ``package`` - ``{coin: count}`` over all records
    """
    logger.info('calc_dealer_order_data_stats dealerId=%s,startTime=%s,endTime=%s' % (dealerId, startTime, endTime))

    group_id_list = Group.get_group_ids_of_dealer(dealerId)
    rcds = ConsumeRecord.get_collection().find(
        {
            'groupId': {
                '$in': group_id_list
            },
            'dateTimeAdded': {
                '$gte': startTime, '$lte': endTime
            },
            'isNormal': True
        })

    # Count package usage and collect the devices that were active.
    dealerDevNoSet = set()
    groupDict = {}
    dealerPackageDict = {}
    for rcd in rcds:
        devNo = rcd['devNo']
        groupId = rcd['groupId']
        coin = str(rcd['coin'])

        dealerDevNoSet.add(devNo)

        groupStat = groupDict.setdefault(groupId, {'devNoSet': set(), 'package': {}})
        groupStat['devNoSet'].add(devNo)
        # Increment only this coin's counter. The previous code replaced the whole package
        # dict whenever a new coin value appeared, dropping the counts gathered so far.
        groupStat['package'][coin] = groupStat['package'].get(coin, 0) + 1

        dealerPackageDict[coin] = dealerPackageDict.get(coin, 0) + 1

    # Usage ratio per group.
    allDevCount = 0
    for groupId, value in groupDict.items():
        devCount = len(Device.get_devNos_by_group([groupId]))
        groupActivedDevCount = len(value.pop('devNoSet'))
        value['activedDevRatio'] = int(float(groupActivedDevCount) / devCount * 100) if devCount != 0 else 0
        allDevCount += devCount

    activedDevRatio = int(float(len(dealerDevNoSet)) / allDevCount * 100) if allDevCount != 0 else 0

    return {'activedDevRatio': activedDevRatio, 'groupStats': groupDict, 'package': dealerPackageDict}
def get_status_from_event(event):
    """
    Map a signal/usage event to a device status name.

    :param event: mapping with a ``signal`` value and, when the signal is not positive,
                  a ``usage`` value
    :return: ``'online'`` for a positive signal, otherwise ``'offline_busy'`` when usage is
             positive and ``'offline'`` when it is not
    """
    if event['signal'] > 0:
        return 'online'
    return 'offline_busy' if event['usage'] > 0 else 'offline'
# Device status records are only produced when events arrive. A device that stays offline
# (or online) without any transition would never get a DevStatusRecord, yet the statistics
# depend on those records, so this is triggered periodically to cut the timeline and write
# the records explicitly.
def make_dev_status_rcd(devNo, splitTime):
    """
    Write the ``DevStatusRecord`` documents of one device for the day before ``splitTime``.

    The signal events returned by ``SignalManager`` are walked pairwise and one record is
    upserted per stretch of constant status (keyed on devNo / startTime / endTime).

    :param devNo: device number
    :param splitTime: datetime of the cut; the processed day is the one before it
    :return: None
    """

    def make_status_between_node(startNode, endNode, valueList = None):
        # Upsert one status record covering startNode..endNode. ``valueList`` holds the events
        # gathered for the stretch; when it is empty the two nodes themselves are stored.
        if startNode['time'] == endNode['time']:
            return

        if valueList:
            changeNode = valueList[-1]
        else:
            changeNode = startNode

        if changeNode['signal'] == 0 and endNode['signal'] > 0:  # offline -> online
            status = get_status_from_event(changeNode)
        elif changeNode['signal'] == 0 and endNode['signal'] == 0:  # offline -> offline
            status = get_status_from_event(changeNode)
        elif changeNode['signal'] > 0 and endNode['signal'] == 0:  # online -> offline
            status = get_status_from_event(endNode)
        elif changeNode['signal'] > 0 and endNode['signal'] > 0:  # online -> online
            # Two online samples more than 10 minutes apart count as an offline stretch.
            if (to_datetime(endNode['time']) - to_datetime(changeNode['time'])).total_seconds() > 10 * 60:
                status = 'offline' if endNode['usage'] == 0 else 'offline_busy'
            else:
                status = 'online'

        # Store the record.
        if not valueList:
            valueList = [startNode, endNode]

        startTime = to_datetime(startNode['time'])
        endTime = to_datetime(endNode['time'])
        DevStatusRecord.get_collection().update({'devNo': devNo, 'startTime': startTime, 'endTime': endTime}, {'$set': {
            'devNo': devNo,
            'status': status,
            'startTime': startTime,
            'endTime': endTime,
            'duration': (endTime - startTime).total_seconds(),
            'valueDict': {'signalUsage': valueList}
        }}, upsert = True)

    logger.info('make_dev_status_rcd devNo=%s,splittime%s' % (devNo, splitTime))

    # Original note: no events means the device has not been online recently and no offline
    # message was received either, so it is treated as offline (this presumes an initial event
    # is seeded into the cache). Events are missing only when the device was never online or a
    # state lasted about a week and was evicted from the cache.
    preDay = (splitTime - datetime.timedelta(days = 1)).strftime(Const.DATE_FMT)
    pre2Day = (splitTime - datetime.timedelta(days = 2)).strftime(Const.DATE_FMT)
    thisToday = splitTime.strftime(Const.DATE_FMT)

    startTime = to_datetime(preDay + ' 00:00:00')
    endTime = to_datetime(preDay + ' 23:59:59')

    # Extra data before the day, to decide the status between the day start and the first node.
    startTimePre2Day = to_datetime(pre2Day + ' 00:00:00')
    endTimePre2Day = to_datetime(preDay + ' 23:59:59')

    # Extra data after the day, to decide the status between the last node and the day end.
    startTimeToday = to_datetime(thisToday + ' 00:00:00')
    endTimeToday = to_datetime(thisToday + ' 23:59:59')

    # NOTE(review): the range passed here is (startTime, startTime), not (startTime, endTime);
    # left unchanged because the semantics of SignalManager.get are not visible - please confirm.
    events = SignalManager.instence().get(devNo, startTime, startTime)
    lastEvents = SignalManager.instence().get(devNo, startTimePre2Day, endTimePre2Day)
    nextEvents = SignalManager.instence().get(devNo, startTimeToday, endTimeToday)

    # Original note: events are missing either because the device has been offline so long that
    # the cached keys expired, or because nothing was initialised yet (the first run may be
    # slightly off, which is acceptable).
    if not events:
        make_status_between_node(
            {'time': startTime.strftime(Const.DATETIME_FMT), 'signal': 0, 'usage': 0, 'isSplit': 1},
            {'time': endTime.strftime(Const.DATETIME_FMT), 'signal': 0, 'usage': 0, 'isSplit': 1})
    else:
        count = len(events)
        preStatusEvent = events[0]
        valueList = []

        # Lead-in: from the last earlier event (or the day start) to the first event.
        if lastEvents and lastEvents[-1]:
            make_status_between_node(lastEvents[-1], preStatusEvent)
        else:  # nothing was monitored before: assume offline
            make_status_between_node({'time': startTime.strftime(Const.DATETIME_FMT), 'signal': 0, 'usage': 0},
                                     preStatusEvent)

        for ii in range(count - 1):
            curEvent = events[ii]
            nextEvent = events[ii + 1]
            valueList.append(curEvent)

            if (curEvent['signal'] > 0 and nextEvent['signal'] > 0):
                # Both online: a gap of more than 10 minutes between them counts as offline.
                if (to_datetime(nextEvent['time']) - to_datetime(curEvent['time'])).total_seconds() > 10 * 60:
                    make_status_between_node(preStatusEvent, curEvent, valueList)  # online stretch
                    make_status_between_node(curEvent, {'time': nextEvent['time'], 'signal': 0, 'usage': 0})  # offline gap
                    valueList = []
                    preStatusEvent = nextEvent
                else:
                    if ii == (count - 2):  # last pair: flush explicitly
                        make_status_between_node(preStatusEvent, nextEvent, valueList)
                        valueList = []
                        preStatusEvent = nextEvent

            elif curEvent['signal'] > 0 and nextEvent['signal'] == 0:
                # online -> offline: one online record and one offline record
                make_status_between_node(preStatusEvent, curEvent, valueList)
                make_status_between_node(curEvent, nextEvent)
                valueList = []
                preStatusEvent = nextEvent

            elif curEvent['signal'] == 0 and nextEvent['signal'] > 0:
                # offline -> online: one offline record
                make_status_between_node(preStatusEvent, nextEvent, valueList)
                valueList = []
                preStatusEvent = nextEvent

            elif curEvent['signal'] == 0 and nextEvent['signal'] == 0:
                if ii == count - 2:  # last pair: flush explicitly
                    make_status_between_node(preStatusEvent, nextEvent, valueList)

        # Tail: from the last event of the day to the next day's first event or the day end.
        if nextEvents and nextEvents[0]:
            make_status_between_node(events[-1], nextEvents[0], valueList)
        else:
            # Use the same string format as every other synthetic node (previously a raw
            # datetime was stored here).
            make_status_between_node(events[-1],
                                     {'time': endTime.strftime(Const.DATETIME_FMT), 'signal': 0, 'usage': 0},
                                     valueList)

    logger.info('finish make_dev_status_rcd devNo=%s' % devNo)
# Device status statistics.
def calc_dev_status_stat(devNo, startTime, endTime):
    """
    Summarise the status records of one device inside ``[startTime, endTime]``.

    Status records are fetched with one extra day on each side and every record is clipped to
    the requested window before its duration is added to the total of its status. The raw
    signal/usage samples stored in the records are then scanned pairwise to count status
    changes, in total and per hour of day.

    :param devNo: device number
    :param startTime: start of the window (datetime)
    :param endTime: end of the window (datetime)
    :return: dict with ``totalOfflineTime`` / ``totalOfflineBusyTime`` / ``totalOnlineTime``
             (seconds), ``offlineCount`` / ``offlineBusyCount`` / ``onlineBusy`` (number of
             changes) and ``hourly`` (``{hour: {'offlineCount', 'offlineBusyCount',
             'onlineBusy', 'usageSum', 'usageCount'}}``)
    """
    logger.info('calc dev status devNo=%s,startTime=%s,endTime=%s' % (devNo, startTime, endTime))

    # Which total each record status contributes to; other statuses are ignored.
    durationKeys = {
        'online': 'totalOnlineTime',
        'offline': 'totalOfflineTime',
        'offline_busy': 'totalOfflineBusyTime',
    }

    queryStartTime = startTime - datetime.timedelta(days = 1)
    queryEndTime = endTime + datetime.timedelta(days = 1)
    rcds = DevStatusRecord.objects.filter(devNo = devNo, startTime__gte = queryStartTime, endTime__lte = queryEndTime)

    totalDict = {'totalOfflineTime': 0, 'totalOfflineBusyTime': 0, 'totalOnlineTime': 0, 'offlineCount': 0,
                 'offlineBusyCount': 0, 'onlineBusy': 0}

    # Accumulated time per status: total offline, online and offline-busy time.
    statusList = []
    for rcd in rcds:
        # Records entirely outside the window do not count.
        if endTime <= rcd.startTime or startTime >= rcd.endTime:
            continue

        # Clip the record to the window. This gives the same intervals as the former
        # case-by-case comparison (which contained a tautological ``endTime >= endTime``).
        overlapStart = max(startTime, rcd.startTime)
        overlapEnd = min(endTime, rcd.endTime)

        totalKey = durationKeys.get(rcd['status'])
        if totalKey is not None:
            totalDict[totalKey] += (overlapEnd - overlapStart).total_seconds()

        statusList.extend(rcd.valueDict['signalUsage'])

    # Hourly statistics: only changes are recorded (online <-> offline, idle <-> busy).
    hourDict = {}
    for preStatus, status in zip(statusList, statusList[1:]):
        # Nodes created artificially by the daily cut are not real data.
        if 'isSplit' in status:
            continue

        # Outside the requested window.
        statusTime = to_datetime(status['time'])
        if statusTime < startTime or statusTime > endTime:
            continue

        # No status change between the two samples.
        if (preStatus['signal'] > 0 and status['signal'] > 0 and preStatus['usage'] == status['usage']) \
                or (preStatus['signal'] == 0 and status['signal'] == 0):
            continue

        hour = statusTime.hour
        if hour not in hourDict:
            hourDict[hour] = {'offlineCount': 0, 'offlineBusyCount': 0, 'onlineBusy': 0, 'usageSum': 0, 'usageCount': 0}
        hourStat = hourDict[hour]

        if status['usage'] == 0 and status['signal'] == 0:
            hourStat['offlineCount'] += 1
            totalDict['offlineCount'] += 1
        elif status['usage'] > 0 and status['signal'] == 0:
            hourStat['offlineBusyCount'] += 1
            hourStat['usageSum'] += status['usage']
            hourStat['usageCount'] += 1
            totalDict['offlineBusyCount'] += 1
        elif status['usage'] > 0 and status['signal'] > 0:
            hourStat['onlineBusy'] += 1
            hourStat['usageSum'] += status['usage']
            hourStat['usageCount'] += 1
            totalDict['onlineBusy'] += 1

    return {'totalOfflineTime': totalDict['totalOfflineTime'],
            'totalOfflineBusyTime': totalDict['totalOfflineBusyTime'],
            'totalOnlineTime': totalDict['totalOnlineTime'],
            'offlineCount': totalDict['offlineCount'],
            'offlineBusyCount': totalDict['offlineBusyCount'],
            'onlineBusy': totalDict['onlineBusy'],
            'hourly': hourDict}
- def calc_one_dealer_and_insert_into_db(dealerId, nowTime):
- logger.info('start calc dealerId=%s info and insert into db now......' % dealerId)
- def build_daily_update(valueMap):
- rv = defaultdict(dict)
- for kind, amount in valueMap.iteritems():
- if kind == 'package':
- for coin, value in amount.items():
- rv['$inc']['other.package.{coin}'.format(coin = int(int(float(coin)) * 100))] = int(value)
- elif kind == 'hourly':
- for hour, value in amount.items():
- for hourKind, hourValue in value.items():
- rv['$inc']['other.hourly.{hour}.{hourKind}'.format(hour = hour, hourKind = hourKind)] = int(
- hourValue)
- else:
- rv['$inc']['other.{kind}'.format(kind = kind)] = int(amount)
- if rv.has_key('activedDevRatio'):
- rv.pop('activedDevRatio')
- if rv.has_key('peakUsage'):
- rv.pop('peakUsage')
- if rv.has_key('devCount'):
- rv.pop('devCount')
- rv.update({'$set': {'devCount': valueMap.get('devCount', 0),
- 'activedDevRatio': valueMap.get('activedDevRatio', 0),
- 'peakUsage': valueMap.get('peakUsage', 0)}})
- return rv
- # 月度数据,主要是月度平均活跃度,需要特殊计算
- def build_month_update(valueMap, preDay, monthactivedDevRatio):
- rv = defaultdict(dict)
- for kind, amount in valueMap.iteritems():
- if kind == 'package':
- for coin, value in amount.items():
- rv['$inc']['other.package.{coin}'.format(coin = int(int(float(coin)) * 100))] = int(value)
- elif kind == 'hourly':
- for hour, value in amount.items():
- for hourKind, hourValue in value.items():
- rv['$inc']['other.hourly.{hour}.{hourKind}'.format(hour = hour, hourKind = hourKind)] = int(
- hourValue)
- else:
- rv['$inc']['other.{kind}'.format(kind = kind)] = int(amount)
- if rv.has_key('activedDevRatio'): # 目前这个算法是有缺陷的,因为每天的设备数目是变化的,准确的计算应该是当月进行计算才对
- rv.pop('activedDevRatio')
- if rv.has_key('devCount'):
- rv.pop('devCount')
- activedDevRatio = int(((preDay - 1) * monthactivedDevRatio + valueMap['activedDevRatio']) / preDay)
- rv.update({'$set': {'devCount': valueMap.get('devCount', 0),
- 'activedDevRatio': activedDevRatio}})
- return rv
- preDaytime = nowTime - datetime.timedelta(days = 1)
- preDay = preDaytime.day
- preStrDay = preDaytime.strftime(Const.DATE_FMT)
- preMonth = MONTH_DATE_KEY.format(year = preDaytime.year, month = preDaytime.month)
- startTime = to_datetime(preStrDay + ' 00:00:00')
- endTime = to_datetime(preStrDay + ' 23:59:59')
- groupIds = Group.get_group_ids_of_dealer(str(dealerId))
- devNoList = Device.get_devNos_by_group(groupIds)
- devStatDict = {}
- dealerHourlyDict = {}
- for devNo in devNoList:
- make_dev_status_rcd(devNo, nowTime)
- statInfo = calc_dev_status_stat(devNo, startTime, endTime)
- devStatDict[devNo] = statInfo
- statInfo = calc_dealer_order_data_stats(str(dealerId), startTime, endTime)
- groupStatDict = statInfo['groupStats']
- activedDevRatio = statInfo['activedDevRatio']
- dealerPackage = statInfo['package']
- totalOfflineTime, totalOfflineBusyTime, totalOnlineTime, offlineCount, offlineBusyCount = 0, 0, 0, 0, 0
- dealerDevCount = 0
- # 进行汇总计算数据,从设备-》组-》经销商
- groups = Group.get_groups_by_group_ids(groupIds)
- for groupId, group in groups.items():
- devNos = Device.get_devNos_by_group([groupId])
- groupDevCount = 0
- if not groupStatDict.has_key(groupId):
- groupStatDict[groupId] = {
- 'totalOfflineTime': 0,
- 'totalOfflineBusyTime': 0,
- 'totalOnlineTime': 0,
- 'offlineCount': 0,
- 'offlineBusyCount': 0,
- 'activedDevRatio': 0,
- 'devCount': 0,
- 'package': {},
- 'hourly': {}
- }
- else:
- groupStatDict[groupId].update({
- 'totalOfflineTime': 0,
- 'totalOfflineBusyTime': 0,
- 'totalOnlineTime': 0,
- 'offlineCount': 0,
- 'offlineBusyCount': 0,
- 'hourly': {}
- })
- for devNo in devNos:
- groupDevCount += 1
- dealerDevCount += 1
- devInfo = devStatDict.get(devNo, {})
- devTotalOfflineTime = devInfo.get('totalOfflineTime', 0)
- devTotalOfflineBusyTime = devInfo.get('totalOfflineBusyTime', 0)
- devTotalOnlineTime = devInfo.get('totalOnlineTime', 0)
- devOfflineCount = devInfo.get('offlineCount', 0)
- devOfflineBusyCount = devInfo.get('offlineBusyCount', 0)
- hourlyDict = devInfo.get('hourly', {})
- groupStatDict[groupId]['totalOfflineTime'] += devTotalOfflineTime
- groupStatDict[groupId]['totalOfflineBusyTime'] += devTotalOfflineBusyTime
- groupStatDict[groupId]['totalOnlineTime'] += devTotalOnlineTime
- groupStatDict[groupId]['offlineCount'] += devOfflineCount
- groupStatDict[groupId]['offlineBusyCount'] += devOfflineBusyCount
- # 往组和经销商内统计数据,补充小时级别的统计数据
- for hour, value in hourlyDict.items():
- if hour not in groupStatDict[groupId]['hourly']:
- groupStatDict[groupId]['hourly'] = {hour: value}
- else:
- groupStatDict[groupId]['hourly'][hour]['offlineCount'] += hourlyDict[hour]['offlineCount']
- groupStatDict[groupId]['hourly'][hour]['offlineBusyCount'] += hourlyDict[hour]['offlineBusyCount']
- groupStatDict[groupId]['hourly'][hour]['usageSum'] += hourlyDict[hour]['usageSum']
- groupStatDict[groupId]['hourly'][hour]['usageCount'] += hourlyDict[hour]['usageCount']
- # 经销商级别的汇总
- if not dealerHourlyDict.has_key(hour):
- dealerHourlyDict[hour] = value
- else:
- dealerHourlyDict[hour]['offlineCount'] += hourlyDict[hour]['offlineCount']
- dealerHourlyDict[hour]['offlineBusyCount'] += hourlyDict[hour]['offlineBusyCount']
- dealerHourlyDict[hour]['usageSum'] += hourlyDict[hour]['usageSum']
- dealerHourlyDict[hour]['usageCount'] += hourlyDict[hour]['usageCount']
- totalOfflineTime += devTotalOfflineTime
- totalOfflineBusyTime += devTotalOfflineBusyTime
- totalOnlineTime += devTotalOnlineTime
- offlineCount += devOfflineCount
- offlineBusyCount += devOfflineBusyCount
- # 更新地址下的设备总数
- groupStatDict[groupId].update({'devCount': groupDevCount})
- # 找出地址下的最大繁忙度
- groupHourly = groupStatDict[groupId].get('hourly', {})
- peakUsage = 0
- for hour, info in groupHourly.items():
- usage = int(info['usageSum'] / info['usageCount']) if info['usageCount'] > 0 else 0
- if usage >= peakUsage:
- peakUsage = usage
- groupStatDict[groupId]['peakUsage'] = peakUsage
- dealerInfo = {
- 'totalOfflineTime': totalOfflineTime,
- 'totalOfflineBusyTime': totalOfflineBusyTime,
- 'totalOnlineTime': totalOnlineTime,
- 'offlineCount': offlineCount,
- 'offlineBusyCount': offlineBusyCount,
- 'devCount': dealerDevCount,
- 'activedDevRatio': activedDevRatio,
- 'package': dealerPackage,
- 'hourly': dealerHourlyDict
- }
- # 将数据分别存入设备、地址、经销商的日报表中.设备活跃度,需要好好想下
- for groupId in groupIds:
- for devNo in devNos:
- logicalCode = Device.get_logicalCode_by_devNo(devNo)
- devInfo = devStatDict.get(devNo, {})
- DeviceDailyStat.get_collection().update({'logicalCode': logicalCode, 'date': preStrDay},
- build_daily_update(devInfo), multi = True, upsert = True)
- groupInfo = groupStatDict.get(groupId, {})
- GroupDailyStat.get_collection().update({'groupId': ObjectId(groupId), 'date': preStrDay},
- build_daily_update(groupInfo), multi = True, upsert = True)
- DealerDailyStat.get_collection().update({'dealerId': ObjectId(dealerId), 'date': preStrDay},
- build_daily_update(dealerInfo), multi = True, upsert = True)
- try:
- mouthActivedDevRatio = \
- DealerDailyStat.get_collection().find({'dealerId': ObjectId(dealerId), 'date': preStrDay},
- {'activedDevRatio': 1})[0]['activedDevRatio']
- except Exception, e:
- mouthActivedDevRatio = 0
- DealerMonthlyStat.get_collection().update({'dealerId': ObjectId(dealerId), 'date': preMonth},
- build_month_update(dealerInfo, preDay, mouthActivedDevRatio),
- multi = True, upsert = True)
- logger.info('finished calc dealerId=%s info and insert into db now' % dealerId)
def calc_dealer_user_count():
    """
    Refresh the user counters of every dealer, based on yesterday's date.

    For each dealer this counts the users added between the first day of
    yesterday's month and 23:59:59 of yesterday and stores the number as
    ``addedUserCount`` on the matching ``DealerMonthlyStat`` document, then
    stores the overall user count as ``userCount`` on the dealer document.
    Neither update passes an upsert flag. An error for one dealer is logged
    and the loop moves on to the next dealer.
    """
    yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
    preStrDay = yesterday.strftime(Const.DATE_FMT)
    monthKey = MONTH_DATE_KEY.format(year=yesterday.year, month=yesterday.month)
    windowEnd = to_datetime(preStrDay + ' 23:59:59')

    for dealerId in [str(dealer.id) for dealer in Dealer.objects.all().only('id')]:
        try:
            groupIds = Group.get_group_ids_of_dealer(str(dealerId))

            # Users added during the month that yesterday belongs to.
            windowStart = to_datetime(monthKey + '-01 00:00:00')
            addedFilter = {'dateTimeAdded': {'$gte': windowStart, '$lte': windowEnd}}
            addedCount = MyUser.get_user_count_by_filter(groupIds, addedFilter)
            DealerMonthlyStat.get_collection().update(
                {'dealerId': ObjectId(dealerId), 'date': monthKey},
                {'$set': {'addedUserCount': addedCount}})
            logger.info('dealerId=%s,added this month usercount=%s' % (dealerId, addedCount))

            # Overall user count.
            totalCount = MyUser.get_user_count_by_filter(groupIds)
            Dealer.get_collection().update(
                {'_id': ObjectId(dealerId)},
                {'$set': {'userCount': totalCount}})
            logger.info('dealerId=%s,all user count=%s' % (dealerId, totalCount))
        except Exception as e:
            logger.error('stat user count e=%s' % e)
def calc_dealer_stat_and_insert_into_db_for_dealers(dealerIds):
    """
    Run the per-dealer statistics calculation for one batch of dealers.

    Every dealer in the batch is processed with the same timestamp, taken once
    when the batch starts.

    :param dealerIds: sized iterable of dealer ids to process
    """
    batchStartedAt = datetime.datetime.now()
    dealerTotal = len(dealerIds)
    logger.info('calc dealer info now.....,dealers count=%s' % dealerTotal)

    for oneDealerId in dealerIds:
        calc_one_dealer_and_insert_into_db(oneDealerId, batchStartedAt)

    logger.info('finished all: calc dealer info and insert into db now')
def calc_dealer_stat_and_insert_into_db():
    """
    Calculate statistics for all dealers, spreading the work over threads.

    The dealer ids are cut into consecutive slices of
    ``max(dealerCount // 3, 1)`` ids and every slice is handed to its own
    non-daemon thread running
    ``calc_dealer_stat_and_insert_into_db_for_dealers``. The threads are only
    started here, not joined. A failure to start one thread is logged and the
    remaining slices are still dispatched.
    """
    logger.info('start all: calc dealer info and insert into db now')
    dealerIds = [str(dealer.id) for dealer in Dealer.objects.all().only('id')]
    dealerCount = len(dealerIds)
    logger.info('calc dealer info now,all dealer count = %s' % dealerCount)

    if not dealerIds:
        return

    # Clamp to 1: with fewer than three dealers ``dealerCount // 3`` is 0,
    # which is not a usable slice size (it used to end in ZeroDivisionError).
    pageSize = max(dealerCount // 3, 1)

    for pageIndex, offset in enumerate(range(0, dealerCount, pageSize)):
        sliceDealerIds = dealerIds[offset:offset + pageSize]
        try:
            t = Thread(target=calc_dealer_stat_and_insert_into_db_for_dealers,
                       kwargs={'dealerIds': sliceDealerIds})
            t.name = 'calc_dealer_stat_%s' % pageIndex
            t.daemon = False
            t.start()
        except Exception as e:
            logger.exception(e)
def export_charge_order_excel_from_db(filepath, queryDict):
    """
    Export a dealer's successful recharge orders to an Excel report.

    ``queryDict`` looks like ``{'startTime': startDate, 'endTime': endDate,
    'ownerId': ownerId, 'groupId': groupId, 'logicalCode': logicalCode}``. An
    optional ``phoneNumber`` narrows the export to the user found by that
    number; when no user matches, the restriction is not applied.

    Records whose ``via`` is ``refund`` or ``sendcoin`` are left out. A record
    is also skipped, with a log entry, when its user cannot be loaded or when
    no dealer income proxy can be found for it.

    :param filepath: passed through to ``generate_excel_report``
    :param queryDict: query parameters as described above
    """
    from apps.web.common.proxy import ClientDealerIncomeModelProxy

    logger.info('start export_charge_order_excel_from_db,filepath=%s,query=%s' % (filepath, queryDict))
    startTime = queryDict.get('startTime', None)
    endTime = queryDict.get('endTime', None)
    ownerId = queryDict.get('ownerId', '')
    groupId = queryDict.get('groupId', '')
    logicalCode = queryDict.get('logicalCode', '')
    phoneNumber = queryDict.get('phoneNumber')

    chargeTypeDict = {'recharge': u'充值', 'sendcoin': u'赠币', 'refund': u'退币', 'chargeCard': u'卡充值',
                      'chargeVirtualCard': u'虚拟卡充值'}
    sexLabels = {0: u'女', 1: u'男'}

    filters = {
        "ownerId": ownerId,
        "result": "success",
    }
    logger.info("ownerId is %s" % ownerId)
    if logicalCode:
        filters["logicalCode"] = logicalCode
    if groupId:
        filters["groupId"] = groupId
    if phoneNumber:
        # NOTE(review): the user attribute read further down is ``phone``;
        # confirm that ``phoneNumber`` is a valid MyUser query field.
        phoneOwner = MyUser.objects.filter(phoneNumber=phoneNumber).first()
        if phoneOwner:
            filters["openId"] = phoneOwner.openId

    # Merge the partner definitions of all of the dealer's groups.
    partnerDict = {}
    for dealerGroupId in Group.get_group_ids_of_dealer(ownerId):
        partnerDict.update(Group.get_group(dealerGroupId).get('partnerDict', {}))

    userDict = {}
    records = []
    logger.info("filter is %s" % filters)
    rechargeRcds = ClientRechargeModelProxy(st=startTime, et=endTime).all(**filters)
    for rcd in rechargeRcds:
        if rcd.via in ['refund', 'sendcoin']:
            continue

        openId = rcd.openId
        user = userDict.get(openId)
        if user is None:
            try:
                # ``phone`` has to be part of only(): fields that are not
                # requested are not loaded from the database.
                userObj = MyUser.objects.filter(openId=openId).only(
                    'openId', 'nickname', 'sex', 'groupId', 'country', 'province', 'city',
                    "gateWay", "productAgentId", 'phone').first()
                user = {
                    'sex': sexLabels.get(userObj.sex, u'未知'),
                    'nickname': userObj.nickname,
                    'zone': '%s%s%s' % (userObj.country, userObj.province, userObj.city),
                    "phoneNumber": userObj.phone,
                }
                userDict[openId] = user
            except Exception as e:
                logger.warning('skip recharge record <%s>: cannot load user <%s>: %s', rcd.id, openId, e)
                continue

        try:
            proxyRcd = ClientDealerIncomeModelProxy.get_one(
                ref_id=rcd.id, groupId=ObjectId(rcd.groupId))  # type: DealerIncomeProxy
        except Exception as e:
            logger.warning('skip recharge record <%s>: cannot load dealer income proxy: %s', rcd.id, e)
            continue
        if not proxyRcd:
            logger.warning('skip recharge record <%s>: has not dealer income proxy', rcd.id)
            continue

        # Income allocated to the agent, the device owner and the partners.
        amountDict = DealerIncomeProxy.get_agent_partner_allocated_money(ownerId, proxyRcd.partition, partnerDict)

        if user.get("phoneNumber"):
            userNickname = "{}-{}".format(user.get("nickname", ""), user.get("phoneNumber", ""))
        else:
            userNickname = user.get("nickname")

        dataList = [
            (u'逻辑编码', rcd.logicalCode),
            (u'IMEI', rcd.devNo),
            (u'设备类型', rcd.dev_type_name),
            (u'组名称', rcd.groupName),
            (u'组内编号', rcd.groupNumber),
            (u'组地址', rcd.address),
            (u'用户昵称', userNickname),
            (u'用户性别', user.get('sex', '')),
            (u'用户地域', user.get('zone', '')),
            (u'订单号', rcd.orderNo),
            (u'下单时间', rcd.to_datetime_str(rcd.dateTimeAdded)),
            (u'第三方支付单号', rcd.wxOrderNo),
            (u'支付方式', u'微信' if rcd.gateway == 'wechat' else u'支付宝'),
            (u'是否快捷支付', 'yes' if rcd.isQuickPay else 'no'),
            (u'充值金额', str(rcd.money)),
            (u'充值金币', str(rcd.coins)),
            (u'充值方式', chargeTypeDict.get(rcd.via, '')),  # card / virtual card / direct recharge ...
            (u'设备负责人分配收入', str(amountDict.get('ownerAmount'))),
            (u'代理商分配金额', str(amountDict.get('agentAmount')))
        ]

        # One extra column per partner that received a share.
        for partnerIncome in amountDict.get('partnerDict').values():
            dataList.append((u'合伙人分配金额:%s(%s)' % (partnerIncome['nickname'], partnerIncome['username']),
                             str(partnerIncome['money'])))

        records.append(OrderedDict(dataList))

    generate_excel_report(filepath, records)
    logger.info('finished export_charge_order_excel_from_db')
def export_consume_order_excel_from_db(filepath, queryDict):
    """
    Export normal consume orders to an Excel report.

    The orders are selected by ``logicalCode`` when given, otherwise by
    ``groupId`` when given, otherwise by all groups of the dealer ``ownerId``
    (an empty report is generated when the dealer has no group). Besides the
    fixed columns, one column is added for every ``servicedInfo`` key that has
    an entry in ``DEALER_CONSUMPTION_AGG_KIND_TRANSLATION``. Orders whose user
    cannot be loaded are skipped.

    :param filepath: passed through to ``generate_excel_report``
    :param queryDict: dict read for ``startTime``, ``endTime``, ``ownerId``,
        ``groupId`` and ``logicalCode``
    """
    logger.info('start export_consume_order_excel_from_db,filepath=%s,query=%s' % (filepath, queryDict))
    startTime = queryDict.get('startTime', None)
    endTime = queryDict.get('endTime', None)
    ownerId = queryDict.get('ownerId', '')
    groupId = queryDict.get('groupId', '')
    logicalCode = queryDict.get('logicalCode', '')

    filters = {"isNormal": True}
    if logicalCode:
        filters["logicalCode"] = logicalCode
    elif groupId:
        filters["groupId"] = groupId
    else:
        groupIds = Group.get_group_ids_of_dealer(str(ownerId))
        if not groupIds:
            return generate_excel_report(filepath, [])
        filters['groupId__in'] = groupIds

    # First pass: collect every detail key (consumed power, duration, ...)
    # that occurs in any order, keeping only the orders that could be read.
    seenDetailKeys = []
    usableOrders = []
    orderProxy = ClientConsumeModelProxy(st=startTime, et=endTime)
    for consume_order in orderProxy.all(**filters):  # type: ConsumeRecord
        try:
            seenDetailKeys.extend(consume_order.servicedInfo.keys())
            usableOrders.append(consume_order)
        except Exception as e:
            logger.exception('consume record id = {}, exception = {}'.format(str(consume_order.id), e))

    servicedKeys = [key for key in list(set(seenDetailKeys))
                    if key in DEALER_CONSUMPTION_AGG_KIND_TRANSLATION]

    # Second pass: one report row per usable order.
    userDict = {}
    records = []
    for rcd in usableOrders:
        openId = rcd.openId
        user = userDict.get(openId)
        if user is None:
            try:
                userObj = MyUser.objects.filter(openId=openId).only('nickname', 'phone').first()
                user = {'nickname': userObj.nickname, "phoneNumber": userObj.phone}
                userDict[openId] = user
            except Exception as e:
                continue

        if user.get("phoneNumber"):
            userNickname = "{}-{}".format(user.get("nickname", ""), user.get("phoneNumber", ""))
        else:
            userNickname = user.get("nickname")

        dataList = [
            (u'逻辑编码', rcd.logicalCode),
            (u'IMEI', rcd.devNo),
            (u'设备端口', rcd.attachParas.get('port', '-')),
            (u'设备类型', rcd.dev_type_name),
            (u'组名称', rcd.groupName),
            (u'组内编号', rcd.groupNumber),
            (u'组地址', rcd.address),
            (u'用户昵称', userNickname),
            (u'订单号', rcd.orderNo),
            (u'下单时间', rcd.to_datetime_str(rcd.dateTimeAdded)),
            (u'花费金币', str(rcd.coin)),
            (u'消费备注', str(rcd.remarks) if rcd.remarks else u'扫码消费'),
            (u'端口', str(rcd.used_port) if rcd.used_port != -1 else ''),
            (u'停止时间', rcd.to_datetime_str(rcd.device_finished_time)),
            (u'结束原因', rcd.servicedInfo.get('reason', ''))
        ]
        # Append the consumption detail columns.
        dataList.extend(
            (DEALER_CONSUMPTION_AGG_KIND_TRANSLATION.get(key), str(rcd.servicedInfo.get(key, '-')))
            for key in servicedKeys)
        records.append(OrderedDict(dataList))

    generate_excel_report(filepath, records)
def export_API_order_excel_from_db(filepath, queryDict):
    """
    Export successful (``errCode == 0``) API start-device orders to Excel.

    The export can be narrowed to one device (``logicalCode``) or to the
    devices of one group (``groupId``). When such a restriction is requested
    but matches no device, an empty report is written instead of falling back
    to all of the owner's records.

    :param filepath: passed through to ``generate_excel_report``
    :param queryDict: dict read for ``startTime``, ``endTime``, ``ownerId``,
        ``groupId`` and ``logicalCode``
    """
    logger.info('start export_API_order_excel_from_db, filepath=%s, query=%s' % (filepath, queryDict))
    startTime = queryDict.get('startTime', None)
    endTime = queryDict.get('endTime', None)
    ownerId = queryDict.get('ownerId', '')
    groupId = queryDict.get('groupId', '')
    logicalCode = queryDict.get('logicalCode', '')

    # ``None`` means "no device restriction"; a list (possibly empty) means
    # "only these devices".
    if logicalCode:
        dev = Device.get_dev_by_l(logicalCode)
        devNoList = [dev['devNo']] if dev else []
    elif groupId:
        devNoList = Device.get_devNos_by_group([groupId]) or []
    else:
        devNoList = None

    if devNoList is not None and not devNoList:
        generate_excel_report(filepath, [])
        return

    query = {
        'ownerId': ownerId,
        'errCode': 0,
        'datetimeAdded__gte': startTime,
        'datetimeAdded__lte': endTime,
    }
    if devNoList is not None:
        query['devNo__in'] = devNoList
    rcds = APIStartDeviceRecord.objects(**query).order_by('-datetimeAdded')

    records = []
    for rcd in rcds:
        dataList = [
            (u'下单时间', rcd.to_datetime_str(rcd.datetimeAdded)),
            (u'订单编号', rcd.attachParas.get('extOrderNo', '-')),
            (u'用户ID', rcd['userId']),
            (u'订购时间', rcd.package.get('time', '-')),
            (u'退费金额', rcd.servicedInfo.get('backCoins', '-')),
            (u'消耗电量', rcd.servicedInfo.get('spendElec', '-')),
            (u'消费金额', str(rcd.package.get('price', '-'))),
            (u'地址', Group.get_groupName_by_logicalCode(rcd.deviceCode)),
            (u'设备编号', rcd.deviceCode),
        ]
        records.append(OrderedDict(dataList))

    generate_excel_report(filepath, records)
def export_on_points_order_excel_from_db(filepath, queryDict):
    """
    Export ``UpscoreRecord`` entries of a dealer to an Excel report.

    Records are selected by owner and by the ``time`` window; they can be
    narrowed to one device (``logicalCode``) or to the devices of one group
    (``groupId``). Rows are ordered by ``time`` descending.

    :param filepath: passed through to ``generate_excel_report``
    :param queryDict: dict read for ``startTime``, ``endTime``, ``ownerId``,
        ``groupId`` and ``logicalCode``
    """
    logger.info('start export_onPoints_order_excel_from_db, filepath=%s, query=%s' % (filepath, queryDict))
    startTime = queryDict.get('startTime', None)
    endTime = queryDict.get('endTime', None)
    ownerId = queryDict.get('ownerId', '')
    groupId = queryDict.get('groupId', '')
    logicalCode = queryDict.get('logicalCode', '')

    filters = {
        "ownerId": ownerId,
        "time__gte": startTime,
        "time__lte": endTime
    }
    if logicalCode:
        device = Device.get_dev_by_l(logicalCode)
        filters["devNo"] = device["devNo"]
    elif groupId:
        filters["devNo__in"] = Device.get_devNos_by_group([groupId])

    records = [
        OrderedDict([
            (u'下分时间', rcd.time),
            (u'地址', rcd.groupName),
            (u'设备编号', rcd.logicalCode),
            (u'上分金额', str(rcd.score)),
        ])
        for rcd in UpscoreRecord.objects(**filters).order_by("-time")
    ]

    generate_excel_report(filepath, records)
def export_send_coins_to_card_order_excel_from_db(filepath, queryDict):
    """
    Export a dealer's card score top-up records (``UpCardScoreRecord``).

    The "bound address" column uses the address stored on the record; when the
    record has none, the name of the group the card is bound to is looked up.
    If the card or its group cannot be found the column is left empty rather
    than aborting the whole export.

    :param filepath: passed through to ``generate_excel_report``
    :param queryDict: dict read for ``startTime`` and ``endTime`` (strings in
        ``%Y-%m-%d %H:%M:%S`` format) and ``ownerId``
    """
    logger.info('start export_send_coins_to_card_order_excel_from_db, filepath=%s, query=%s' % (filepath, queryDict))
    startTime = queryDict.get('startTime', None)
    endTime = queryDict.get('endTime', None)
    ownerId = queryDict.get('ownerId', '')

    startDateTime = datetime.datetime.strptime(startTime, '%Y-%m-%d %H:%M:%S')
    endDateTime = datetime.datetime.strptime(endTime, '%Y-%m-%d %H:%M:%S')
    filters = {
        "ownerId": ownerId,
        "dateTimeAdded__gte": startDateTime,
        "dateTimeAdded__lte": endDateTime
    }
    rcds = UpCardScoreRecord.objects(**filters).order_by('-dateTimeAdded')

    records = []
    for rcd in rcds:
        groupName = rcd.address
        if not groupName:
            groupName = ''
            # first() instead of get(): a deleted or duplicated card must not
            # raise and abort the export.
            card = Card.objects.filter(cardNo=rcd.cardNo).first()
            groupId = card.groupId if card else None
            if groupId:
                group = Group.get_group(groupId)
                if group:
                    groupName = group.groupName

        dataList = [
            (u'实体卡号', rcd.cardNo),
            (u'上分数量', str(rcd.score)),
            (u'备注', rcd.remark),
            (u'绑定地址', groupName),
            (u'创建时间', rcd.dateTimeAdded.strftime('%Y-%m-%d %H:%M:%S')),
        ]
        records.append(OrderedDict(dataList))

    generate_excel_report(filepath, records)
def export_group_stat_excel_from_db(filepath, queryDict):
    """
    Export one summary row per group of a dealer to an Excel report.

    Each row holds the group's name and address, the number and amount of
    payments, the offline coins and the average ``activedDevRatio``, all taken
    from the ``GroupDailyStat`` documents whose ``date`` lies between
    ``startTime`` and ``endTime``. Consumption kinds listed in the agent's
    ``hide_consume_kinds_dealer`` are not exported.

    :param filepath: passed through to ``generate_excel_report``
    :param queryDict: dict read for ``startTime``, ``endTime`` and ``ownerId``
    """

    def calc_group_income_stats(groupId, rcds, startDate, endDate):
        """Sum up order count and income of ``rcds`` and average the activity ratio."""
        groupResult = GroupReport.get_rpt([groupId], startDate, endDate)
        offlineCoin = groupResult[groupId].get('lineCoins', 0)

        orderTotal, payIncome, totalActivedRate = 0, RMB(0.0), 0
        count = 0
        for rcd in rcds:
            count += 1
            daily = rcd.get('daily', {})
            orderTotal += daily.get('totalIncomeCount', 0)
            payIncome += daily.get('totalIncome', 0)
            totalActivedRate += rcd.get('activedDevRatio', 0)

        return {
            'orderTotal': orderTotal,
            'payIncome': payIncome,
            'offlineCoin': offlineCoin,
            'dailyActivityRate': int(round(totalActivedRate / count, 2)) if count > 0 else 0,
        }

    def calc_group_consume_stats(groupId, rcds):
        """Accumulate the ``daily.consumption`` stats of ``rcds`` for ``groupId``."""

        def sum_fn(lst):
            return sum(map(lambda _: Quantity(_), lst), Quantity('0'))

        groupConsumptionMap = cum_stats(keys=['daily', 'consumption'],
                                        stats=rcds,
                                        translate=translate_consumption,
                                        sum_fn=sum_fn)
        return groupConsumptionMap.get(groupId, None)

    logger.info('start export_group_stat_excel_from_db,filepath=%s,query=%s' % (filepath, queryDict))
    startTime = queryDict.get('startTime', None)
    endTime = queryDict.get('endTime', None)
    ownerId = queryDict.get('ownerId', '')

    dealer = Dealer.objects(id=ownerId).first()  # type: Dealer
    agent = Agent.objects.get(id=dealer.agentId)  # type: Agent

    records = []
    for groupId in Group.get_group_ids_of_dealer(ownerId):
        group = Group.get_group(groupId)
        rcds = list(GroupDailyStat.get_collection().find(
            {'groupId': ObjectId(groupId), 'date': {'$gte': startTime, '$lte': endTime}},
            {'origin': 0, 'hourly': 0, '_id': 0}).sort('groupId'))

        incomeStats = calc_group_income_stats(groupId, rcds, startTime, endTime)
        consumeStats = calc_group_consume_stats(groupId, rcds) or []

        dataList = [
            (u'组名称', group['groupName']),
            (u'组地址', group['address']),
            (u'总支付笔数', int(incomeStats.get('orderTotal'))),
            (u'总支付金额', str(incomeStats.get('payIncome'))),
            (u'线下投币', str(incomeStats.get('offlineCoin'))),
            (u'设备日均活跃度', int(incomeStats.get('dailyActivityRate'))),
        ]

        # Append the consumption detail columns the agent allows dealers to see.
        for info in consumeStats:
            if info['source'] not in agent.hide_consume_kinds_dealer:
                dataList.append((DEALER_CONSUMPTION_AGG_KIND_TRANSLATION.get(info['source']), str(info['value'])))

        records.append(OrderedDict(dataList))

    generate_excel_report(filepath, records)
def export_vcard_info_excel_from_db(filepath, queryDict):
    """
    Export the virtual cards sold in a dealer's groups to an Excel report.

    Only cards whose owner is a user of one of the dealer's groups are
    exported. When a card carries ``quotaInfo``, six quota columns are added
    whose headers include the quota unit, e.g. ``总额度(<unit>)``.

    NOTE(review): cards with different units now produce different column
    headers; confirm ``generate_excel_report`` copes with rows whose keys
    differ (rows without ``quotaInfo`` already lack these columns).

    :param filepath: passed through to ``generate_excel_report``
    :param queryDict: dict read for ``dealerId``
    """
    from apps.web.user.models import MyUser, UserVirtualCard

    dealerId = queryDict.get('dealerId')

    groups = Group.objects.filter(ownerId=dealerId)
    groupIds = [str(group.id) for group in groups]
    users = MyUser.objects.filter(groupId__in=groupIds)
    openIds = [str(user.openId) for user in users]
    vcards = UserVirtualCard.objects.filter(ownerOpenId__in=openIds, dealerId=dealerId, groupId__in=groupIds)

    result = []
    for vcard in vcards:
        group = groups.filter(id=vcard.groupId).first() or Group()
        # The owner may be registered under another group than the card's, in
        # which case no user is found here; export empty user columns then.
        user = users.filter(openId=vcard.ownerOpenId, groupId=vcard.groupId).first()

        dataList = [
            (u'地址', group.groupName),
            (u'卡名称', vcard.cardName),
            (u'卡号', vcard.cardNo),
            (u'卡主', vcard.nickname),
            (u'用户ID', str(user.id) if user else ''),
            (u'用户电话', user.phone if user else ''),
            (u'开卡时间', vcard.startTime.strftime("%Y-%m-%d %H:%M:%S")),
            (u'过期时间', vcard.expiredTime.strftime("%Y-%m-%d %H:%M:%S")),
            (u'卡可用天数', vcard.periodDays),
            (u'备注', vcard.userDesc),
            (u'状态', vcard.status),
        ]

        quotaInfo = vcard.quotaInfo
        if quotaInfo:
            quotaUnit = quotaInfo.get('quotaUnit') or u''
            dayQuotaUnit = quotaInfo.get('dayQuotaUnit') or u''
            dataList.extend([
                (u'总额度({})'.format(quotaUnit), round(quotaInfo['quota'], 1)),
                (u'已用额度({})'.format(quotaUnit), round(quotaInfo['quotaUsed'], 1)),
                (u'总剩余额度({})'.format(quotaUnit), round(quotaInfo['quota'] - quotaInfo['quotaUsed'], 1)),
                (u'日额度({})'.format(dayQuotaUnit), round(quotaInfo['dayQuota'], 1)),
                (u'日已用额度({})'.format(dayQuotaUnit), round(quotaInfo['dayUsed'], 1)),
                (u'日剩余额度({})'.format(dayQuotaUnit), round(quotaInfo['dayQuota'] - quotaInfo['dayUsed'], 1)),
            ])

        result.append(OrderedDict(dataList))

    generate_excel_report(filepath, result)
def export_group_user_account_excel_form_db(filepath, queryDict):
    """
    Export per-user recharge/consume totals, one Excel sheet per group.

    For every group of the dealer, the recharge and consume records created
    between ``startTime`` and ``endTime`` are summed per user of that group.
    Records whose ``openId`` does not belong to a user of the group are
    ignored. Each sheet ends with a totals row.

    :param filepath: passed through to ``gernerate_excel_report_for_sheet``
    :param queryDict: dict read for ``ownerId``, ``startTime`` and ``endTime``
    """
    from apps.web.user.models import MyUser

    def get_spendMoney(consumeRecord):
        """Return the ``coin`` amount of the record's ``aggInfo`` as float, 0 without ``aggInfo``."""
        if consumeRecord.aggInfo:
            return float(str(consumeRecord.aggInfo.get('coin', 0)))
        return 0

    ownerId = queryDict.get('ownerId', '')
    startTime = queryDict.get('startTime', None)
    endTime = queryDict.get('endTime', None)

    sheet = []
    for groupId in Group.get_group_ids_of_dealer(ownerId):
        group = Group.get_group(groupId)
        consumeRecords = ConsumeRecord.objects.filter(
            groupId=groupId, dateTimeAdded__gte=startTime, dateTimeAdded__lte=endTime)
        rechargeRecords = RechargeRecord.objects.filter(
            groupId=groupId, dateTimeAdded__gte=startTime, dateTimeAdded__lte=endTime)

        # Index the group's users once instead of scanning them per record.
        usersByOpenId = {}
        for user in MyUser.objects.filter(groupId=groupId):
            usersByOpenId.setdefault(user.openId, user)

        # Totals are kept per group so a sheet only lists its own users.
        usersDict = OrderedDict()

        def account_of(user):
            """Return (creating it on first use) the running totals of ``user``."""
            info = usersDict.get(user.openId)
            if info is None:
                info = {'nickName': user.nickname, 'openId': user.openId,
                        'recharge': 0, 'consume': 0, 'balance': float(str(user.balance))}
                usersDict[user.openId] = info
            return info

        # Recharge totals.
        for rechargeRecord in rechargeRecords:
            user = usersByOpenId.get(rechargeRecord.openId)
            if user is not None:
                account_of(user)['recharge'] += float(str(rechargeRecord.money))

        # Consume totals.
        for consumeRecord in consumeRecords:
            user = usersByOpenId.get(consumeRecord.openId)
            if user is not None:
                account_of(user)['consume'] += get_spendMoney(consumeRecord)

        records = []
        total_recharged = RMB(0)
        total_consumed = 0
        balance = RMB(0)
        for info in usersDict.values():
            records.append(OrderedDict([
                (u'用户', info.get('nickName')),
                (u"用户总充值", info.get('recharge', 0)),
                (u"用户总消费", info.get('consume', 0)),
                (u"用户账户余额", info.get('balance', 0)),
            ]))
            total_recharged += info.get('recharge', 0)
            total_consumed += info.get('consume', 0)
            balance += info.get('balance', 0)

        records.append(OrderedDict([
            (u'用户', u'总计'),
            (u"用户总充值", total_recharged),
            (u"用户总消费", total_consumed),
            (u"用户账户余额", balance),
        ]))
        sheet.append(ExcelSheet(data=records, name=group.groupName))

    gernerate_excel_report_for_sheet(filepath, sheet)
def poll_dealer_recharge_record(record_id, pay_app_type, interval, total_count):
    # type: (str, str, int, int)->None
    """
    Start a poller for a dealer recharge record.

    The poller class is chosen by ``PayManager`` from ``pay_app_type``; it is
    created for ``DealerRechargeRecord`` with ``post_pay`` as its ``post_pay``
    argument and then started.

    :param record_id: id of the recharge record to poll
    :param pay_app_type: pay app type used to select the poller class
    :param interval: forwarded to the poller as ``interval``
    :param total_count: forwarded to the poller as ``total_count``
    """
    poller_options = {
        'record_id': record_id,
        'interval': interval,
        'total_count': total_count,
        'record_cls': DealerRechargeRecord,
        'post_pay': post_pay,
    }
    poller_cls = PayManager().get_poller(pay_app_type=pay_app_type)
    poller = poller_cls(**poller_options)  # type: PayRecordPoller
    poller.start()
def aggregate_records(records):
    """
    Sum up a set of dealer income records.

    :param records: iterable of ``DealerIncomeProxy`` records
    :return: dict with ``totalIncome`` (sum of ``totalAmount``),
        ``actualTotalIncome`` (sum of all ``actualAmountMap`` values) and
        ``ledgerInfo``, a text with one ``nickname-username-amount`` line per
        non-agent participant that received a non-zero share
    """
    totalIncome = RMB(0)
    actualTotalIncome = RMB(0)
    partDict = dict()

    for record in records:  # type: DealerIncomeProxy
        totalIncome += RMB(record.totalAmount)
        actualTotalIncome += RMB(sum(record.actualAmountMap.values(), RMB(0)))

        for part in record.partition:
            # Agent shares and zero shares are not listed in the ledger.
            if part["role"] == "agent":
                continue
            if RMB(part["money"]) == RMB(0.00):
                continue
            partDict[part["id"]] = partDict.get(part["id"], RMB(0)) + part["money"]

    ledgerLines = []
    for dealerId, amount in partDict.items():
        dealer = Dealer.objects.filter(id=dealerId).only("nickname", "username").first()
        if dealer is None:
            # The participant no longer exists: keep the amount in the ledger,
            # labelled by its id, instead of failing the whole aggregation.
            logger.warning('dealer<id=%s> in income partition not exists.', dealerId)
            nickname, username = dealerId, ""
        else:
            nickname, username = dealer.nickname, dealer.username
        ledgerLines.append("{}-{}-{}\r\n".format(nickname, username, amount))

    return {
        "totalIncome": totalIncome,
        "actualTotalIncome": actualTotalIncome,
        "ledgerInfo": "".join(ledgerLines)
    }
def export_aggregate_dealer_income(filePath, queryDict, aggregateType):
    """
    Export dealer income aggregated per day or per group to an Excel report.

    ``queryDict`` must contain ``startTime`` and ``endTime`` as ``%Y-%m-%d``
    strings (the end date is exclusive); all remaining entries are passed as
    filters to ``DealerIncomeProxy.objects.filter``. The dict passed in is not
    modified. The report holds one row per day / group, sorted by that key,
    followed by a totals row.

    :param filePath: passed through to ``generate_excel_report``
    :param queryDict: query parameters as described above; for ``"group"``
        aggregation ``groupId__in`` lists the groups to report
    :param aggregateType: ``"date"`` or ``"group"``; any other value is logged
        as an error and nothing is exported
    """
    # Work on a copy so the caller's dict keeps its startTime/endTime entries.
    query = dict(queryDict)
    rangeStart = datetime.datetime.strptime(query.pop("startTime"), "%Y-%m-%d")
    rangeEnd = datetime.datetime.strptime(query.pop("endTime"), "%Y-%m-%d")
    query["dateTimeAdded__gte"] = rangeStart
    query["dateTimeAdded__lt"] = rangeEnd

    records = DealerIncomeProxy.objects.filter(**query)

    # Pre-create every bucket so days / groups without income still get a row.
    if aggregateType == "date":
        aggregateRes = {(rangeStart + datetime.timedelta(day)).strftime("%Y-%m-%d"): list()
                        for day in range((rangeEnd - rangeStart).days)}
        for record in records:
            aggregateRes.setdefault(record.dateTimeAdded.strftime(Const.DATE_FMT), list()).append(record)
    elif aggregateType == "group":
        aggregateRes = {str(g): list() for g in (query.get("groupId__in") or [])}
        for record in records:
            aggregateRes.setdefault(str(record.groupId), list()).append(record)
    else:
        logger.error("undefined aggregate type!")
        return

    byGroup = aggregateType == "group"
    sortKey = "groupName" if byGroup else "time"

    data = list()
    for key, items in aggregateRes.items():
        tempAggregate = aggregate_records(items)
        if byGroup:
            group = Group.get_group(key)
            tempAggregate.update({"groupName": group.get("groupName")})
        else:
            tempAggregate.update({"time": key})
        data.append(tempAggregate)

    writeDataList = list()
    grandTotalIncome = RMB(0)
    grandActualIncome = RMB(0)
    for v in sorted(data, key=lambda x: x[sortKey]):
        if byGroup:
            tempData = [(u"地址名称", v.get("groupName"))]
        else:
            tempData = [(u"日期", v.get("time"))]
        tempData.extend([
            (u"总收益", "{0:.2f}".format(float(v.get("totalIncome")))),
            (u"总分帐金额", "{0:.2f}".format(float(v.get("actualTotalIncome")))),
            (u"分账明细", v.get("ledgerInfo"))
        ])
        grandTotalIncome += v.get("totalIncome")
        grandActualIncome += v.get("actualTotalIncome")
        writeDataList.append(OrderedDict(tempData))

    # Totals row, always written last.
    totalRow = [(u"地址名称", u"总计")] if byGroup else [(u"日期", u"总计")]
    totalRow.extend([
        (u"总收益", "{:.2f}".format(float(grandTotalIncome))),
        (u"总分帐金额", "{:.2f}".format(float(grandActualIncome))),
        (u"分账明细", u"")
    ])
    writeDataList.append(OrderedDict(totalRow))

    generate_excel_report(filePath, writeDataList)
def dealer_auto_withdraw():
    """
    Create the automatic withdrawals of dealers; meant to run once a day.

    A dealer returned by ``Dealer.get_auto_withdraw_dealers`` is due when its
    ``auto_withdraw_strategy`` is ``asDay``, or ``asWeek`` with ``value`` equal
    to today's ISO weekday, or ``asMonth`` with ``value`` equal to today's day
    of month. For every due dealer one withdrawal is created per income type
    and per entry of ``Dealer.get_income_balance_list``. Errors are logged per
    dealer / per withdrawal and never stop the remaining work.

    :return: None
    """
    # One snapshot so every dealer is judged against the same date.
    now = datetime.datetime.now()

    # Select the dealers whose strategy is due today.
    needExecuteDealers = list()
    for dealer in Dealer.get_auto_withdraw_dealers():  # type: Dealer
        try:
            strategy = dealer.auto_withdraw_strategy
            strategyType = strategy['type']
            if strategyType == 'asDay':
                isDue = True
            elif strategyType == 'asWeek':
                isDue = now.isoweekday() == strategy['value']
            elif strategyType == "asMonth":
                isDue = now.day == strategy['value']
            else:
                logger.error('{} auto withdraw strategy type <{}> error'.format(repr(dealer), strategyType))
                continue
        except Exception as e:
            # A malformed strategy must not block the other dealers.
            logger.exception(e)
            continue

        if isDue:
            logger.debug('{} has open auto withdraw switch.'.format(repr(dealer)))
            needExecuteDealers.append(dealer)

    # Create the withdraw orders.
    for dealer in needExecuteDealers:  # type: Dealer
        try:
            for incomeType in DEALER_INCOME_TYPE.choices():
                withdrawInfoList = Dealer.get_income_balance_list(dealer, incomeType, settings.WITHDRAW_MINIMUM)

                autoWithdrawType = dealer.withdrawOptions.get("autoWithdrawType")
                if autoWithdrawType == 'wechat':
                    dealer.withdraw_open_id = dealer.auto_withdraw_bound_open_id

                # One withdrawal per income source.
                for sourceKey, balance in withdrawInfoList:
                    try:
                        result = DealerWithdrawService(payee=dealer,
                                                       income_type=incomeType,
                                                       amount=balance,
                                                       pay_type=autoWithdrawType).execute(sourceKey, True)
                        logger.info("auto withdraw success, result is <{}>".format(result))
                    except Exception as e:
                        logger.exception(e)
        except Exception as e:
            logger.exception(e)
def dealer_auto_charge_sim_card():
    """
    Automatically recharge dealers' SIM cards out of their balances.

    Devices come from ``Device.get_sim_expire_notify_devices`` restricted to
    ``simChargeAuto`` devices and are handled per owner. For each income type
    with a positive balance, every remaining device that expires this month
    gets its own SIM charge order; the order succeeds only if the cost could be
    deducted from the dealer's balance, otherwise it is cancelled and the next
    income type is tried.

    :return: None
    """

    def consume_dealer_balance(pay_gateway, incomeType, dealer, cost):
        """Deduct ``cost`` from the dealer's balance if it is large enough; return whether it was deducted."""
        source_key = pay_gateway.withdraw_source_key()
        balanceField = '{fundKey}.balance'.format(
            fundKey=dealer.fund_key(income_type=incomeType, source_key=source_key))
        # The balance condition is part of the filter, so check and deduction
        # happen in one update_one call.
        condition = {'_id': dealer.id, balanceField: {'$gte': cost.mongo_amount}}
        deduction = {'$inc': {balanceField: (-cost).mongo_amount}}
        outcome = Dealer.get_collection().update_one(condition, deduction, upsert=False)
        return outcome.matched_count == 1 and outcome.modified_count == 1

    devices = Device.get_sim_expire_notify_devices(extra_filter={'simChargeAuto': True})
    logger.info('start auto charge sim card, %s devices will be expired' % len(devices))

    from cytoolz import groupby
    devMap = groupby('ownerId', devices)

    # TODO: the platform fund pools need to be merged into one, so that the
    # withdraw gateway is no longer determined through a specific pay gateway.
    pay_gateway = get_platform_wallet_pay_gateway(AppPlatformType.PLATFORM)

    for ownerId, devs in devMap.items():
        dealer = Dealer.objects(id=ownerId).first()
        if not dealer:
            logger.warning('dealer<id={}> not exists.'.format(ownerId))
            continue

        groupMap = {}
        pendingDevs = list(Device.objects(logicalCode__in=[dev['logicalCode'] for dev in devs]))

        for income_type in DEALER_INCOME_TYPE.choices():
            can_used_balance = dealer.sub_balance(income_type, pay_gateway.withdraw_source_key())
            if can_used_balance <= RMB(0):
                continue

            # Iterate over a copy: handled devices are removed from the list.
            for devObj in pendingDevs[:]:  # type: Device
                try:
                    logger.info('start auto charge sim of {}.'.format(devObj))

                    if ownerId != devObj.ownerId:
                        logger.warning(
                            'auto charge sim failure of {}. {} not equal {}. may be unregister.'.format(
                                devObj, devObj.ownerId, ownerId))
                        pendingDevs.remove(devObj)
                        continue

                    if not devObj.is_expire_in_this_month:
                        continue

                    rcd = create_dealer_sim_charge_order(pay_gateway, dealer, [devObj], 'auto', can_used_balance)
                    if not rcd:
                        pendingDevs.remove(devObj)
                        continue

                    costMoney = RMB(rcd.totalFee / 100.0)
                    if not consume_dealer_balance(pay_gateway, income_type, dealer, costMoney):
                        logger.info('auto charge sim of {} failure to cancel it.'.format(devObj))
                        rcd.cancel()
                        break

                    logger.info('auto charge sim of {} success.'.format(devObj))
                    rcd.succeed(rcd.orderNo)
                    post_sim_recharge(rcd)

                    # Groups are looked up once per owner and then reused.
                    try:
                        group = groupMap[devObj.groupId]
                    except KeyError:
                        group = Group.get_group(devObj.groupId)
                        groupMap[devObj.groupId] = group

                    income_record = RechargeRecord.issue_from_auto_sim_order(dealer, rcd, devObj, group)
                    record_income_proxy(DEALER_INCOME_SOURCE.AUTO_SIM, income_record, {
                        "owner": [
                            {
                                "money": (-costMoney).mongo_amount,
                                "role": "owner",
                                "share": Percent("100.0").mongo_amount,
                                "id": str(dealer.id)
                            }
                        ],
                        'partner': []
                    })

                    can_used_balance = can_used_balance - costMoney
                    pendingDevs.remove(devObj)
                except InsufficientFundsError:
                    logger.debug('auto charge sim of {} failure because no enough balance.'.format(devObj))
                    break
                except Exception as e:
                    logger.exception(e)
def auto_charge_sim_card(dealerId):
    """
    Manual use only: auto-recharge the SIM cards of a single dealer.

    Loads the devices returned by ``Device.get_sim_expire_notify_devices`` for
    ``dealerId`` (restricted to ``simChargeAuto`` devices) and groups them by
    ``ownerId``. For every income type of the owning dealer that has a positive
    sub-balance, a SIM recharge order is created for each device that expires
    this month, paid from that sub-balance, and a matching negative income
    record is written.

    :param dealerId: id of the dealer whose devices are recharged.
    :return: None
    """

    def consume_dealer_balance(pay_gateway, incomeType, dealer, cost):
        """
        Deduct ``cost`` from the dealer's sub-balance with one conditional update.

        :return: True when exactly one dealer document matched and was modified.
        """
        source_key = pay_gateway.withdraw_source_key()
        fundKey = dealer.fund_key(income_type = incomeType, source_key = source_key)
        balance_field = '{fundKey}.balance'.format(fundKey = fundKey)

        # The $gte condition keeps the update from matching when the balance
        # does not cover the cost.
        query = {
            '_id': dealer.id,
            balance_field: {'$gte': cost.mongo_amount}
        }
        update = {
            '$inc': {balance_field: (-cost).mongo_amount}
        }

        result = Dealer.get_collection().update_one(query, update, upsert = False)
        return result.matched_count == 1 and result.modified_count == 1

    devices = Device.get_sim_expire_notify_devices(dealer_id = dealerId, extra_filter = {'simChargeAuto': True})
    logger.info('start auto charge sim card, %s devices will be expired' % len(devices))

    from cytoolz import groupby
    devMap = groupby('ownerId', devices)

    # TODO: the platform fund pools should be merged into one, so the withdraw
    # gateway no longer has to be resolved through a concrete pay gateway.
    pay_gateway = get_platform_wallet_pay_gateway(AppPlatformType.PLATFORM)

    for ownerId, devs in devMap.items():
        dealer = Dealer.objects(id = ownerId).first()
        if not dealer:
            logger.warning('dealer<id={}> not exists.'.format(ownerId))
            continue

        # Per-dealer cache so each group is fetched only once.
        groupMap = {}
        devObjs = list(Device.objects(logicalCode__in = [dev['logicalCode'] for dev in devs]))

        for income_type in DEALER_INCOME_TYPE.choices():
            can_used_balance = dealer.sub_balance(income_type, pay_gateway.withdraw_source_key())
            if can_used_balance <= RMB(0):
                continue

            # Iterate over a copy: handled devices are removed from devObjs so
            # they are not processed again for the next income type.
            for devObj in devObjs[:]:  # type: Device
                try:
                    logger.info('start auto charge sim card,the device is auto charge,devNo=%s' % devObj.devNo)

                    if ownerId != devObj.ownerId:
                        logger.warning(
                            'ownerId of device<devNo={},ownerId={}> not equal {}. may be unregister.'.format(
                                devObj.devNo, devObj.ownerId, ownerId))
                        devObjs.remove(devObj)
                        continue

                    if not devObj.is_expire_in_this_month:
                        continue

                    rcd = create_dealer_sim_charge_order(pay_gateway, dealer, [devObj], 'auto', can_used_balance)
                    if not rcd:
                        devObjs.remove(devObj)
                        continue

                    # totalFee is divided by 100 to obtain the RMB amount.
                    costMoney = RMB(rcd.totalFee / 100.0)

                    if not consume_dealer_balance(pay_gateway, income_type, dealer, costMoney):
                        # Deduction failed: cancel the order and stop using
                        # this income type; the next one is tried afterwards.
                        rcd.cancel()
                        break

                    rcd.succeed(rcd.orderNo)
                    post_sim_recharge(rcd)
                    logger.info('charge sim card success,devNo=%s' % devObj.devNo)

                    if devObj.groupId not in groupMap:
                        groupMap[devObj.groupId] = Group.get_group(devObj.groupId)
                    group = groupMap[devObj.groupId]

                    income_record = RechargeRecord.issue_from_auto_sim_order(dealer, rcd, devObj, group)
                    record_income_proxy(DEALER_INCOME_SOURCE.AUTO_SIM, income_record, {
                        "owner": [
                            {
                                "money": (-costMoney).mongo_amount,
                                "role": "owner",
                                # Serialise the share like "money": as a mongo
                                # amount rather than a raw Percent object.
                                "share": Percent("100.0").mongo_amount,
                                "id": str(dealer.id)
                            }
                        ],
                        'partner': []
                    })

                    can_used_balance = can_used_balance - costMoney
                    devObjs.remove(devObj)
                except Exception as e:
                    # Best effort: log the failure and carry on with the next device.
                    logger.exception(e)
def batch_set_device_params(logicalCodes, updateConf, lastSetConf, operationId):
    """
    Dispatch one ``set_device_params`` task per logical code via ``task_caller``.

    :param logicalCodes: iterable of device logical codes.
    :param updateConf: configuration passed to every task as ``updateConf``.
    :param lastSetConf: configuration passed to every task as ``lastSetConf``.
    :param operationId: identifier passed to every task as ``operationId``.
    :return: None
    """
    start_message = (
        'start batch_set_device_params, logicalCodes=<{}>, updateConf=<{}>, lastSetConf=<{}>, operationId=<{}>'
    ).format(logicalCodes, updateConf, lastSetConf, operationId)
    logger.info(start_message)

    from taskmanager.mediator import task_caller

    # Arguments that are identical for every dispatched task.
    shared_kwargs = dict(updateConf=updateConf, lastSetConf=lastSetConf, operationId=operationId)
    for code in logicalCodes:
        task_caller('set_device_params', logicalCode=code, **shared_kwargs)
def set_device_params(logicalCode, updateConf, lastSetConf, operationId):
    """
    Apply ``updateConf`` to one device and record the outcome in the operator log.

    An operator-log entry is created with status ``'waiting'`` before the device
    is touched, then updated to ``'success'`` or ``'fail'`` together with the
    JSON dump of the configuration used as the "before" state.

    :param logicalCode: logical code of the device to configure.
    :param updateConf: new configuration; wrapped as ``{"POST": updateConf}``.
    :param lastSetConf: previous configuration; when falsy it is read from the
        device with ``get_dev_setting()``.
    :param operationId: identifier stored in the operator-log entry.
    :return: a human-readable result string.
    """
    logger.info(
        'start set_device_params, logicalCodes=<{}>, updateConf=<{}>, lastSetConf=<{}>, operationId=<{}>'.format(
            logicalCode, updateConf, lastSetConf, operationId))

    device = Device.get_dev_by_l(logicalCode)

    audit_content = {
        'updateConfStr': json.dumps(updateConf),
        'operationId': operationId,
        'status': 'waiting'
    }
    audit = OperatorLog.record_dev_setting_changes_log(
        device.owner,
        'record_someone_set_devSettings',
        device['logicalCode'],
        device['devType']['code'],
        audit_content)

    box = ActionDeviceBuilder.create_action_device(device)  # type: SmartBox

    from apps.web.device.models import RequestBodyDict
    request_body = RequestBodyDict({"POST": updateConf})

    try:
        if not lastSetConf:
            lastSetConf = box.get_dev_setting()
        box.set_device_function(request_body, lastSetConf)
        box.set_device_function_param(request_body, lastSetConf)
    except Exception as e:
        logger.exception('error happened, error=%s' % (e,))
        status, report = 'fail', 'set_device_params logicalCode=<{}> is fail'
    else:
        status, report = 'success', 'set_device_params logicalCode=<{}> is success'

    # The audit entry is updated the same way on both paths; only the status differs.
    audit.update(content__status=status, content__beforeCacheStr=json.dumps(lastSetConf))
    return report.format(logicalCode)
def batch_set_server_settings(operator, logicalCodes, payload, ownerId, devTypeCode, operationId):
    """
    Dispatch one ``set_server_settings`` task per logical code via ``task_caller``.

    :param operator: operator description forwarded to every task.
    :param logicalCodes: iterable of device logical codes.
    :param payload: settings forwarded to every task.
    :param ownerId: owner id forwarded to every task.
    :param devTypeCode: device type code forwarded to every task.
    :param operationId: identifier forwarded to every task.
    :return: None
    """
    debug_message = (
        'batch_set_server_settings, operator={}, logicalCodes={}, payload={}, ownerId={}, devTypeCode={}, operationId={}'
    ).format(operator, logicalCodes, payload, ownerId, devTypeCode, operationId)
    logger.debug(debug_message)

    from taskmanager.mediator import task_caller

    # Arguments that are identical for every dispatched task.
    shared_kwargs = dict(
        operator=operator,
        payload=payload,
        ownerId=ownerId,
        devTypeCode=devTypeCode,
        operationId=operationId)
    for code in logicalCodes:
        task_caller('set_server_settings', logicalCode=code, **shared_kwargs)
def set_server_settings(operator, logicalCode, payload, ownerId, devTypeCode, operationId):
    """
    Apply server-side settings to one device and record the outcome.

    An operation-log entry is created with status ``'waiting'``. The device
    must belong to ``ownerId`` and have type ``devTypeCode``; otherwise the
    operation is marked ``'fail'``. On success the previous server setting is
    stored as ``before`` and the status becomes ``'success'``. Errors are
    logged and never propagate out of the ``try`` block.

    :param operator: mapping with the keys ``id``, ``username`` and ``role``.
    :param logicalCode: logical code of the device to configure.
    :param payload: settings passed to ``deviceAdapter.set_server_setting``.
    :param ownerId: expected owner id of the device.
    :param devTypeCode: expected device type code of the device.
    :param operationId: identifier stored in the operation-log entry.
    :return: None
    """
    logger.info(
        'set_server_settings, operator={}, logicalCode={}, payload={}, ownerId={}, devTypeCode={}, operationId={}'.format(
            operator, logicalCode, payload, ownerId, devTypeCode, operationId))

    device = Device.get_dev_by_l(logicalCode)  # type: DeviceDict

    # The log API receives the operator as an object with attributes, so wrap the mapping.
    _operator = namedtuple('Operator', ['id', 'username', 'role'])(**operator)

    operation = OperatorLog.log_dev_operation(
        operator = _operator,
        device = device,
        operator_name = 'setServerSetting',
        content = {
            'operationId': operationId,
            'after': payload,
            'status': 'waiting'
        })  # type: OperatorLog

    try:
        if device.ownerId != ownerId:
            raise Exception(u'无法操作设备')

        if device.devTypeCode != devTypeCode:
            raise Exception(u'无法操作该类型设备')

        before = device.deviceAdapter.get_server_setting()
        device.deviceAdapter.set_server_setting(payload)
        operation.update(content__status = 'success', content__before = before)
    except Exception as e:
        # The exceptions raised above carry non-ASCII unicode messages. Under
        # Python 2, eagerly formatting them into a byte string raises
        # UnicodeEncodeError inside this handler, which would skip the status
        # update below. A unicode template with deferred arguments avoids that.
        logger.exception(u'error happened, error=%s', e)
        operation.update(content__status = 'fail')
def push_shanghai_platform_heatbeat():
    """
    Push a heartbeat for every device of every dealer registered with the
    Shanghai urban data collection platform.

    A failure for one device is logged and does not stop the remaining pushes.

    :return: the string ``'success'``.
    """
    import traceback

    logger.info('start push_shanghai_platform_heatbeat')

    from apps.web.south_intf.shanghai_urban_data_collection_platform import ShangHaiUrbanDataCollectionPlatform
    from apps.web.south_intf.shanghai_urban_data_collection_platform import ShangHaiUrbanDataCollectionPlatformModel
    from apps.web.device.models import Device

    all_dealer_info = ShangHaiUrbanDataCollectionPlatformModel.all_dealers()
    for _one_info in all_dealer_info:
        devs = Device.get_devs_by_ownerId(_one_info['dealerId'])
        for dev in devs:
            try:
                ShangHaiUrbanDataCollectionPlatform(dev, **_one_info).celery_push_heatbeat()
            except Exception:
                # Only ordinary errors are swallowed; SystemExit and
                # KeyboardInterrupt must still be able to stop the loop.
                logger.error(traceback.format_exc())

    return 'success'
def export_modify_customer_balance_record_excel_from_db(filepath, queryDict):
    """
    Export one page of 'sendcoin' recharge records to an Excel report.

    When ``customerOpenId`` matches a user, only that user's records are
    exported. Otherwise all 'sendcoin' records of the owner are exported and
    each row's user is looked up from the record's own ``openId``; records
    whose user cannot be found are logged and skipped.

    :param filepath: destination handed to ``generate_excel_report``.
    :param queryDict: mapping with ``customerOpenId``, ``pageIndex``,
        ``pageSize`` and ``ownerId``; ``pageIndex``/``pageSize`` are used in
        arithmetic and therefore must be numbers.
    :return: None
    """
    from apps.web.user.models import MyUser

    customerOpenId = queryDict.get("customerOpenId")
    pageIndex = queryDict.get("pageIndex")
    pageSize = queryDict.get("pageSize")
    ownerId = queryDict.get("ownerId")

    user = MyUser.objects(openId=customerOpenId).first()
    singleUser = bool(user)

    # Restrict to the requested customer only when that customer exists.
    filters = {'ownerId': ownerId, 'via': 'sendcoin'}
    if singleUser:
        filters['openId'] = customerOpenId

    records = RechargeRecord.objects(**filters).order_by('-dateTimeAdded').skip(
        (pageIndex - 1) * pageSize).limit(pageSize)

    result = []
    for r in records:
        if singleUser:
            rowUser, rowOpenId = user, customerOpenId
        else:
            rowUser = MyUser.objects(openId=r.openId).first()
            if rowUser is None:
                logger.error('invalid openId, openId=%s' % r.openId)
                continue
            # No customer filter matched, so the id column must come from the
            # record itself, not from the unmatched filter value.
            rowOpenId = r.openId

        # A fresh list per record: each row is built only from its own columns.
        result.append(OrderedDict([
            ('用户名称', rowUser.nickname if rowUser.nickname is not None else ''),
            ('用户ID', rowOpenId),
            ('操作人员', r.operator),
            ('派币金额', r.coins),
            ('派币地址', r.groupName),
            ('派币时间', r.dateTimeAdded.strftime("%Y-%m-%d %H:%M:%S")),
            ('备注', r.desc),
        ]))

    generate_excel_report(filepath, result)
def ledger_consume_order_stats(date=None, statsId=None):
    """
    Run the consume-order ledger for dealer group statistics.

    :param date: day to process, either a ``"%Y-%m-%d"`` string or a
        ``datetime.datetime``; when falsy it defaults to yesterday. Only used
        when ``statsId`` is not given.
    :param statsId: id of a single ``DealerGroupStats`` record; when given,
        only that record is processed.
    :return: None
    """
    # Default to yesterday. The result must be bound to ``date`` itself,
    # because that is the name the query below filters on.
    date = date or datetime.datetime.now() - datetime.timedelta(days=1)
    if isinstance(date, datetime.datetime):
        date = date.strftime("%Y-%m-%d")

    if statsId:
        query = DealerGroupStats.objects.filter(id=statsId)
    else:
        query = DealerGroupStats.objects.filter(date=date)

    # Run the ledger for every selected record; one failure must not stop the rest.
    for _stats in query:
        try:
            LedgerConsumeOrder(_stats).execute()
        except Exception as e:
            logger.exception("[ledger_consume_order_stats] stats <{}> execute ledger error ={}".format(_stats, e))
|