# -*- coding: utf-8 -*- # !/usr/bin/env python import datetime from collections import OrderedDict, namedtuple from collections import defaultdict from threading import Thread import arrow import pandas import simplejson as json from bson.objectid import ObjectId from celery.utils.log import get_task_logger from django.conf import settings from typing import Dict, Optional, TYPE_CHECKING, List from apilib.monetary import RMB, Percent from apilib.quantity import Quantity from apilib.utils_datetime import timestamp_to_dt from apilib.utils_datetime import to_datetime from apps.web.agent.models import Agent from apps.web.api.models import APIStartDeviceRecord from apps.web.common.models import OperatorLog, DealerDoneReportRecord from apps.web.common.proxy import ClientRechargeModelProxy, ClientConsumeModelProxy from apps.web.common.transaction import WITHDRAW_PAY_TYPE from apps.web.common.transaction.pay import PayRecordPoller, PayManager from apps.web.constant import Const, RechargeRecordVia, USER_RECHARGE_TYPE, AppPlatformType from apps.web.constant import DEALER_CONSUMPTION_AGG_KIND_TRANSLATION, MONTH_DATE_KEY from apps.web.core.exceptions import InsufficientFundsError from apps.web.core.helpers import ActionDeviceBuilder from apps.web.core.messages.sms import SMSSender from apps.web.core.utils import generate_excel_report, gernerate_excel_report_for_sheet from apps.web.dealer.accessors import income_business_stat, consume_business_stat, monthly_business_stat from apps.web.dealer.define import DEALER_INCOME_TYPE, DEALER_INCOME_SOURCE from apps.web.dealer.models import Dealer, DealerRechargeRecord, UpscoreRecord, UpCardScoreRecord from apps.web.dealer.proxy import DealerIncomeProxy, record_income_proxy, DealerGroupStats from apps.web.dealer.utils import create_dealer_sim_charge_order from apps.web.dealer.transaction import post_pay, post_sim_recharge from apps.web.dealer.withdraw import DealerWithdrawService from apps.web.device.models import DevStatusRecord, Device, Group, DeviceDict from apps.web.device.timescale import SignalManager from apps.web.helpers import get_wechat_manager_mp_proxy, get_platform_wallet_pay_gateway from apps.web.report.ledger import Ledger, LedgerConsumeOrder from apps.web.report.models import DealerReport, DealerDailyStat, \ DeviceDailyStat, GroupDailyStat, DealerMonthlyStat from apps.web.report.models import GroupReport from apps.web.report.utils import cum_stats, translate_consumption from apps.web.user.models import ConsumeRecord, MyUser, RechargeRecord, Card if TYPE_CHECKING: from apps.web.core.adapter.base import SmartBox logger = get_task_logger(__name__) def report_feedback_to_dealer_via_wechat(dealerId, nickname, msg, feedbackTime): """ TODO :return: """ if not settings.SEND_DEALER_WECHAT_PUSH_MESSAGE: logger.info('SEND_DEALER_WECHAT_PUSH_MESSAGE is turned off, you are probably in dev env') else: dealer = Dealer.objects(id = str(dealerId)).first() # type: Optional[Dealer] if not dealer: return {'info': 'dealer does not exist, id=%s' % (dealerId,)} if not dealer.managerialOpenId: return {'info': 'no manager open Id for dealer %s' % (dealer.username,)} wechat_mp_proxy = get_wechat_manager_mp_proxy(dealer) return wechat_mp_proxy.notify(dealer.managerialOpenId, 'feedback', **{ 'title': msg, 'nickname': nickname, 'feedbackTime': feedbackTime }) def report_daily_report_to_dealer_via_wechat(dealer = None): # type:(Optional[Dealer])->None """ 每日收益次日推送(9点) `模版微信公众号后台ID` : OPENTM401155654 :return: """ sent = set([]) def _report(dealer): # type:(Dealer)->dict logger.info('sending daily income report to %r' % (dealer,)) if not dealer.managerialOpenId: return {'info': 'no managerial open Id for %r' % (dealer,)} agent = Agent.objects(id = dealer.agentId).first() if not agent: return {'info': 'no agent found by %r' % (dealer,)} if not agent.is_in_domain: return {'info': 'is not my agent. agentId = %s' % (str(dealer.agentId),)} yesterday = datetime.datetime.now() - datetime.timedelta(days = 1) offline_report = DealerReport.objects(ownerId = str(dealer.id), type = 'day', date = yesterday.strftime("%Y-%m-%d")).first() # type: DealerReport # getattr 加上默认参数 防止获取不到报错 stat = getattr(DealerDailyStat.objects( date = yesterday.date().strftime(Const.DATE_FMT), dealerId = ObjectId(dealer.id)).only('daily').first(), 'daily', None) if not stat: stat = {} if offline_report is None: offline_coins = 0 else: if offline_report.rpt is None: offline_coins = 0 else: offline_coins = offline_report.rpt.get('lineCoins') income_map = { 'offline_coins': offline_coins, 'online_income': RMB(stat.get('income', {}).get(RechargeRecordVia.Balance, 0)) + RMB(stat.get('income', {}).get(RechargeRecordVia.Redpack, 0)) + RMB(stat.get('income', {}).get(RechargeRecordVia.Card, 0)) + RMB(stat.get('income', {}).get(RechargeRecordVia.VirtualCard, 0)), 'refund_cash': RMB(stat.get('income', {}).get(RechargeRecordVia.RefundCash, 0)) } if abs(income_map['refund_cash']) > RMB(0): report = u'线下投币(%s)次 在线支付收入(%s)元 现金退款(%s)元' % ( income_map['offline_coins'], income_map['online_income'], abs(income_map['refund_cash'])) else: report = u'线下投币(%s)次 在线支付收入(%s)元' % ( income_map['offline_coins'], income_map['online_income']) wechat_mp_proxy = get_wechat_manager_mp_proxy(agent) return wechat_mp_proxy.notify(dealer.managerialOpenId, 'daily_income', **{ 'title': u'您的昨日收益报表', 'reportTime': yesterday.strftime('%Y-%m-%d'), 'report': report }) try: if dealer is None: last_done_list = [] doneRecord = DealerDoneReportRecord.objects( reportName = 'report_daily_report_to_dealer_via_wechat', reportDay = datetime.datetime.now().strftime('%Y-%m-%d')).only('dealerIds').first() if doneRecord and doneRecord.dealerIds: last_done_list = doneRecord.dealerIds dealers = Dealer.objects(dailyIncomeReportPushSwitch = True).only( 'id', 'managerialOpenId', 'agentId').batch_size(2000).timeout(False) for dealer in dealers: if str(dealer.id) in last_done_list: continue try: _report(dealer) except Exception as e: logger.exception(e) else: sent.add(str(dealer.id)) else: _report(dealer) finally: if sent: DealerDoneReportRecord.update_done_list( report_name = 'report_daily_report_to_dealer_via_wechat', report_day = datetime.datetime.now().strftime('%Y-%m-%d'), done_list = list(sent)) def report_new_payment_to_dealer_via_wechat(record): # type:(RechargeRecord)->dict """每笔订单完成后推送 模版元格式 {{first.DATA}} 顾客:{{keyword1.DATA}} 收益:{{keyword2.DATA}} {{remark.DATA}} `模版微信公众号后台ID` : OPENTM207568114 您的顾客消费啦 顾客:小明 收益:10.00元 你的顾客消费啦,为您带来10.00元的收益哦 :param record: """ if not settings.SEND_DEALER_WECHAT_PUSH_MESSAGE: logger.info('SEND_DEALER_WECHAT_PUSH_MESSAGE is turned off, you are probably in dev env') else: logger.info('sending new payment order record=%s' % (json.dumps(record),)) dealer = Dealer.objects(id = str(record['ownerId'])).first() # type: Optional[Dealer] if not dealer: return {'info': 'dealer does not exist, ownerId=%s' % (record['ownerId'],)} if not dealer.newUserPaymentOrderPushSwitch: return {'info': 'dealer(%s) hasn\'t switched up yet' % (record['ownerId'],)} if not dealer.managerialOpenId: return {'info': 'no manager open Id for dealer %s' % (dealer.username,)} agent = Agent.objects(id = dealer.agentId).first() # type: Optional[Agent] if not agent: return {'info': 'No agent found by id=%s' % (str(dealer.agentId),)} _translate = { 'wechat': u'微信', 'alipay': u'支付宝', 'card': u'卡充值' } from apps.web.dealer.validation import newPaymentNotifyPayload payload = newPaymentNotifyPayload(record) _info = _translate.get(payload['gateway']) _translate_via = {'redpack': '平台红包'} if payload.get('via') and _translate_via.get(payload['via']): _info = _translate_via[payload['via']] wechat_mp_proxy = get_wechat_manager_mp_proxy(agent) return wechat_mp_proxy.notify(dealer.managerialOpenId, 'new_payment_order', **{ 'title': u'您有新顾客充值啦', 'customer': u'昵称(%s)来自(%s)' % (payload['nickname'], _info), 'income': u'%s元' % payload['money'] }) def report_device_abnormally_offline_to_dealer_via_wechat(dealer = None, logicalCode = None, offTime = 0): """ 默认为2小时推送设备离线信息 `模版` {{first.DATA}} 设备名称:{{keyword1.DATA}} 时间:{{keyword2.DATA}} {{remark.DATA}} `模版微信公众号后台ID` OPENTM401433345 `示例` 掌聚网能设备离线通知 设备名称:机房UPS 时间:2016-02-19 13:12:45 请立即登录查看详情 :return: """ notified = defaultdict(list) def _report(dealer, logicalCode, offTime): if not dealer.managerialOpenId: return {'info': 'no managerial open Id for dealer %s' % (dealer.phone,)} agent = Agent.objects(id = dealer.agentId).first() # type: Agent if not agent: return {'info': 'No agent found by id=%s' % (str(dealer.agentId),)} if not agent.is_in_domain: return {'info': 'is not my agent. agentId = %s' % (str(dealer.agentId),)} if not agent.supports('notify_dealer_device_offline'): return {'info': 'agent does not support offline device push'} #: 记录操作结果 notified[str(dealer.id)].append((logicalCode, offTime)) wechat_mp_proxy = get_wechat_manager_mp_proxy(agent) return wechat_mp_proxy.notify(dealer.managerialOpenId, 'abnormal_device_offline', **{ 'title': '您有一台设备已离线!', 'device': u'逻辑编码:%s' % logicalCode, 'notifyTime': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") }) if dealer is None: dealers = list(Dealer.objects(offlineNotify = True)) for dealer in dealers: # type: Dealer #: 暂时只通知设备所属经销商而不通知合伙人 now = datetime.datetime.now() devices = dealer.get_own_devices() for device in devices: # type: DeviceDict if not device.online: if device.offTime > 0: offset = (now - datetime.datetime.fromtimestamp(device.offTime // 1000)).total_seconds() #: 如果设备离线超过24小时,则不予推送 if 24 > (offset // 3600) >= 2: logger.info( 'device(logicalCode=%s) is offline(offTime=%s, offset=%s), sending to dealer(phone=%s)' % (device['logicalCode'], device['offTime'], offset, dealer.username,)) _report(dealer, device['logicalCode'], device['offTime']) else: _report(dealer, logicalCode, offTime) return notified def report_offline_device_to_dealer_via_wechat(): notified = defaultdict(list) dealers = Dealer.objects(offlineNotifySwitch = True) for _ in dealers: try: logicalCodes = [] now = datetime.datetime.now() devices = _.get_own_devices() if len(devices) == 0: continue for d in devices: if d.online == 0: offset = (now - datetime.datetime.fromtimestamp(d.offTime // 1000)).total_seconds() if _.offlineNotifyTime != '': if (offset // 3600) > int(_.offlineNotifyTime): logicalCodes.append(d['logicalCode']) if len(logicalCodes) == 0: continue if not _.managerialOpenId: return {'info': 'no managerial open Id for dealer %s' % (_.phone,)} agent = Agent.objects(id = _.agentId).first() # type: Agent if not agent: return {'info': 'No agent found by id=%s' % (str(_.agentId),)} if not agent.is_in_domain: return {'info': 'is not my agent. agentId = %s' % (str(_.agentId),)} wechat_mp_proxy = get_wechat_manager_mp_proxy(agent) logicalCodesStr = '' for l in logicalCodes: logicalCodesStr += l + ' ' wechat_mp_proxy.notify(_.managerialOpenId, 'abnormal_device_offline', **{ 'title': '您有%s台设备已离线!' % len(logicalCodes), 'device': u'逻辑编码:%s' % logicalCodesStr, 'notifyTime': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") }) except Exception as e: logicalCodesStr = '' pass notified[str(_.id)].append(logicalCodesStr) return notified def report_to_dealer_via_wechat(openId, dealerId, templateName, **kwargs): dealer = Dealer.objects(id = str(dealerId)).first() if not dealer: return {'info': 'dealer does not exist, id=%s' % (dealerId,)} wechat_mp_proxy = get_wechat_manager_mp_proxy(dealer) return wechat_mp_proxy.notify(openId, templateName, **kwargs) def send_SIM_expired_messages(send_type): """ 给经销商发送设备SIM卡过期短信 :return: """ logger.debug('send type is {}'.format(send_type)) if settings.DEBUG_CELERY_TASK: expire_notify_devices = [{'logicalCode': 'DUMMY', 'ownerId': '5b9ae99ad89a177846459999'}] else: expire_notify_devices = Device.get_sim_expire_notify_devices() logger.info('%s devices will be expired' % len(expire_notify_devices)) from cytoolz import groupby dealer_device_map = groupby('ownerId', expire_notify_devices) dealer_map = { dealer['_id']: dealer for dealer in Dealer.get_collection() .find({'_id': {'$in': [ObjectId(id_) for id_ in dealer_device_map.keys()]}}, {'username': 1, 'agentId': 1, 'smsVendor': 1}) } # type: Dict[ObjectId, dict] agent_product_map = {} for agent in Agent.objects(id__in = [_['agentId'] for _ in dealer_map.values()]).all(): # type: Agent agent_product_map[str(agent.id)] = agent.product_name if not len(dealer_map): logger.info('there are no devices are expired at the moment') return {'result': 'no devices are expired'} else: logger.info('we will notify %d dealer, devices are %s' % (len(dealer_map), dealer_device_map)) if not (settings.DEBUG_CELERY_TASK or settings.CHECK_DEVICE_SMS_EXPIRE): logger.info('CHECK_DEVICE_SMS_EXPIRE is turned off, you are probably in dev/testing environment') else: smsVendor = SysParas.get_sim_expire_sms_vendor(default = SmsVendorCode.UCPAAS) for id_, dealer in dealer_map.iteritems(): logger.debug('sending SIM expired message to Dealer(phone=%s)' % (dealer['username'],)) if send_type == 'sms': logger.debug('dealerId = {}, mobile = {}, productName = {}'.format( id_, dealer['username'], agent_product_map[dealer['agentId']])) sms_sender = SMSSender(vendor = smsVendor) response = sms_sender.send(phoneNumber = dealer['username'], templateName = "SMS_NOTIFY_EXPIRED_DEVICE_TEMPLATEID", msg = u'您有设备即将在近期过期'.encode('utf-8'), productName = agent_product_map[dealer['agentId']]) if not response['result']: logger.error( 'send sim expired sms to dealer failed(phone={}), error={}'.format( dealer['username'], response['msg'])) else: logger.info('send sim expired sms to dealer(phone={}) success'.format(dealer['username'])) if send_type == 'wechat': dealer = Dealer.objects(id = str(id_)).first() # type: Dealer if not dealer: logger.debug('dealer is not exist.'.format(id_)) continue wechat_mp_proxy = get_wechat_manager_mp_proxy(dealer) wechat_mp_proxy.notify(dealer.managerialOpenId, 'sim_expire_notify', **{ 'title': u'亲爱的用户,您有设备本月流量卡过期,请及时充值。已经充值请忽略本消息。'.format( agent_product_map[dealer['agentId']]), 'num': u'{}台(本月)'.format(len(dealer_device_map.get(str(id_)))), 'type': u'流量卡到期', 'time': arrow.now().replace(day = Const.SIM_CARD_FORBIDDEN_DAY).format('YYYY-MM-DD') }) logger.info( 'sent sim expired message to dealer(phone={})'.format(dealer['username'])) def daily_check_auto_withdraw(): """ 每日检查有自动提现资格经销商,该功能不倡导用 :return: """ if not settings.SUPPORT_DEALER_AUTO_WITHDRAW: logger.info('dealer auto withdraw is not supported, you are probably in testing/dev env') dealers = Dealer.objects(autoWithdrawAllowable = True) logger.info('we are going to pay dealers(%s)' % dealers) for dealer in dealers: for income_type in DEALER_INCOME_TYPE.choices(): for source_key, balance_field in dealer.balance_dict(income_type): balance = balance_field.balance if balance > RMB(10): withdraw_service = DealerWithdrawService(payee = dealer, income_type = income_type, amount = balance, pay_type = WITHDRAW_PAY_TYPE.WECHAT, bank_card_no = None) result = withdraw_service.execute(source_key = source_key, recurrent = True) logger.info('%r withdrawn %s, result = %s' % (dealer, balance, result)) # 写入的数据模型 ExcelSheet = namedtuple("Sheet", ["data", "name"]) def generate_business_stats_report_by_dealer(filePath, queryAttrs): """ 给经销商生成旗下的经营数据报表 :return: """ logger.debug( 'generate_business_stats_report_by_dealer, filePath is <{}>, queryAttrs is <{}>'.format(filePath, queryAttrs)) try: kind = queryAttrs.pop('kind', '') # 经销商加入了一个新的特性,该特性支持按月统计账单 if kind != "monthly_bill": # 前台传递过来的数据是xxxx xx xx 23:59:59,相当于是下一天的闭区间,目前方法所需要的数据是当天的闭区间,所以把时间减一天 queryAttrs['endTime'] = timestamp_to_dt(queryAttrs.pop('dateTimeAdded__lte')) - datetime.timedelta( hours = 23, minutes = 59, seconds = 59) queryAttrs['startTime'] = timestamp_to_dt(queryAttrs.pop('dateTimeAdded__gte')) if kind == "income": records = income_business_stat(**queryAttrs) groupByGroup = list() if records: df = pandas.DataFrame(records) # 地址分类 for _groupName, _v in df.groupby(u"地址"): tempList = list() tempList.append((u"", _groupName)) for _source, __v in _v.groupby(u"支付类型"): tempList.append((_source, __v[u"分得金额"].astype(float).sum())) tempList.append((u"分得总金额", _v[u"分得金额"].astype(float).sum())) tempList.append((u"总金额", _v[u"总金额"].astype(float).sum())) groupByGroup.append(OrderedDict(tempList)) sheets = list() sheets.append(ExcelSheet(data = records, name = u"数据总览")) sheets.append(ExcelSheet(data = groupByGroup, name = u"地址汇总")) gernerate_excel_report_for_sheet(filePath, sheets) elif kind == "consume": records = consume_business_stat(**queryAttrs) generate_excel_report(filePath, records) else: logger.error("invalid kind <{}> to generate_business_stats_report_by_dealer") else: queryAttrs['endTime'] = timestamp_to_dt(queryAttrs.pop('dateTimeAdded__lte')) - datetime.timedelta( hours = 23, minutes = 59, seconds = 59) queryAttrs['startTime'] = timestamp_to_dt(queryAttrs.pop('dateTimeAdded__gte')) records = monthly_business_stat(**queryAttrs) generate_excel_report(filePath, records) except Exception as e: logger.exception(e) # 计算经销商订单的一些统计数据,包括计算出日活设备占总设备的比例,以及各个套餐的使用数目 def calc_dealer_order_data_stats(dealerId, startTime, endTime): logger.info('calc_dealer_order_data_stats dealerId=%s,startTime=%s,endTime=%s' % (dealerId, startTime, endTime)) group_id_list = Group.get_group_ids_of_dealer(dealerId) rcds = ConsumeRecord.get_collection().find( { # 'ownerId': dealerId, 'groupId': { '$in': group_id_list }, 'dateTimeAdded': { '$gte': startTime, '$lte': endTime }, 'isNormal': True }) # 计算套餐统计,以及活跃过的设备清单 dealerDevNoSet = set() groupDict = {} dealerPackageDict = {} for rcd in rcds: dealerDevNoSet.add(rcd['devNo']) groupId = rcd['groupId'] coin = str(rcd['coin']) if not groupDict.has_key(groupId): groupDict[groupId] = {'devNoSet': {rcd['devNo']}} groupDict[groupId]['package'] = {coin: 1} # type:Dict else: groupDict[groupId]['devNoSet'].add(rcd['devNo']) if coin in groupDict[groupId]['package']: groupDict[groupId]['package'][coin] += 1 else: groupDict[groupId]['package'] = {coin: 1} if not dealerPackageDict.has_key(coin): dealerPackageDict[coin] = 1 else: dealerPackageDict[coin] += 1 # 计算实际每组的设备使用率 allDevCount = 0 for groupId, value in groupDict.items(): devCount = len(Device.get_devNos_by_group([groupId])) groupActivedDevCount = len(value['devNoSet']) value['activedDevRatio'] = int(float(groupActivedDevCount) / devCount * 100) if devCount != 0 else 0 allDevCount += devCount value.pop('devNoSet') activedDevRatio = int(float(len(dealerDevNoSet)) / allDevCount * 100) if allDevCount != 0 else 0 return {'activedDevRatio': activedDevRatio, 'groupStats': groupDict, 'package': dealerPackageDict} def get_status_from_event(event): if event['signal'] > 0: return 'online' elif event['usage'] > 0: return 'offline_busy' else: return 'offline' # 因为设备状态记录依赖于事件的触发,有可能某个离线一直离线,不上线;或者设备一直在线,不曾离线,这样devStatusRecord就不会生成, # 但是统计数据又依赖这个数据,所以就需要定时触发,做一次切割,主动生成一次这个数据 def make_dev_status_rcd(devNo, splitTime): def make_status_between_node(startNode, endNode, valueList = []): if startNode['time'] == endNode['time']: return if valueList: changeNode = valueList[-1] else: changeNode = startNode if changeNode['signal'] == 0 and endNode['signal'] > 0: # 离线->在线 status = get_status_from_event(changeNode) elif changeNode['signal'] == 0 and endNode['signal'] == 0: # 离线->离线 status = get_status_from_event(changeNode) elif changeNode['signal'] > 0 and endNode['signal'] == 0: # 在线->离线 status = get_status_from_event(endNode) elif changeNode['signal'] > 0 and endNode['signal'] > 0: # 在线->在线 if (to_datetime(endNode['time']) - to_datetime(changeNode['time'])).total_seconds() > 10 * 60: status = 'offline' if endNode['usage'] == 0 else 'offline_busy' else: status = 'online' # 记录数据 if not valueList: valueList = [startNode, endNode] startTime = to_datetime(startNode['time']) endTime = to_datetime(endNode['time']) DevStatusRecord.get_collection().update({'devNo': devNo, 'startTime': startTime, 'endTime': endTime}, {'$set': { 'devNo': devNo, 'status': status, 'startTime': startTime, 'endTime': endTime, 'duration': (endTime - startTime).total_seconds(), 'valueDict': {'signalUsage': valueList} }}, upsert = True) logger.info('make_dev_status_rcd devNo=%s,splittime%s' % (devNo, splitTime)) # 如果没有没有事件,说明最近从来没有上过线,也没有收到过离线报文,统一按照离线处理。(前提是,初始化的时候,cache中的event需要初始化一个数据进来) # 从来没有上过线,或者某种状态维持了1周,然后cache中丢失了,才会出现event 为None preDay = (splitTime - datetime.timedelta(days = 1)).strftime(Const.DATE_FMT) pre2Day = (splitTime - datetime.timedelta(days = 2)).strftime(Const.DATE_FMT) thisToday = splitTime.strftime(Const.DATE_FMT) startTime = to_datetime(preDay + ' 00:00:00') endTime = to_datetime(preDay + ' 23:59:59') startTimePre2Day = to_datetime(pre2Day + ' 00:00:00') # 用于多取一些数据,方便判断第一个起始时间到第一个节点时间之间的状态 endTimePre2Day = to_datetime(preDay + ' 23:59:59') startTimeToday = to_datetime(thisToday + ' 00:00:00') # 用于多取一些数据,方便判断最后一个时间节点到结束时间之间的状态 endTimeToday = to_datetime(thisToday + ' 23:59:59') events = SignalManager.instence().get(devNo, startTime, startTime) lastEvents = SignalManager.instence().get(devNo, startTimePre2Day, endTimePre2Day) nextEvents = SignalManager.instence().get(devNo, startTimeToday, endTimeToday) # 这种情况要么是掉线时间过久,memcache中的key都被清除掉了;要么是没有初始化进来包括设备离线增加进来,未上线或者监控脚本才启动,没有初始化(第一次启动的时候,数据略有偏差,可以忽略)。 if not events: make_status_between_node( {'time': startTime.strftime(Const.DATETIME_FMT), 'signal': 0, 'usage': 0, 'isSplit': 1}, {'time': endTime.strftime(Const.DATETIME_FMT), 'signal': 0, 'usage': 0, 'isSplit': 1}) else: # 如果上一次事件的时间是昨天以前,就需要切割 count = len(events) preStatusEvent = events[0] valueList = [] # 首先处理第一个节点 if lastEvents and lastEvents[-1]: # make_status_between_node(lastEvents[-1], preStatusEvent) else: # 没有监听的情况,默认认为离线 make_status_between_node({'time': startTime.strftime(Const.DATETIME_FMT), 'signal': 0, 'usage': 0}, preStatusEvent) for ii in range(count - 1): curEvent = events[ii] nextEvent = events[ii + 1] valueList.append(curEvent) if (curEvent['signal'] > 0 and nextEvent['signal'] > 0): # 两个节点都在线,需要判断两个节点的时间间隔,如果超过10分钟,算作离线 if (to_datetime(nextEvent['time']) - to_datetime(curEvent['time'])).total_seconds() > 10 * 60: make_status_between_node(preStatusEvent, curEvent, valueList) # 在线状态 make_status_between_node(curEvent, {'time': nextEvent['time'], 'signal': 0, 'usage': 0}) # 离线状态 valueList = [] preStatusEvent = nextEvent else: if ii == (count - 2): # 如果最后一个,应该主动生成 make_status_between_node(preStatusEvent, nextEvent, valueList) valueList = [] preStatusEvent = nextEvent elif curEvent['signal'] > 0 and nextEvent['signal'] == 0: # 在线-》离线,应该记录一个在线,一个离线 make_status_between_node(preStatusEvent, curEvent, valueList) # 在线状态 make_status_between_node(curEvent, nextEvent) # 离线状态 valueList = [] preStatusEvent = nextEvent elif curEvent['signal'] == 0 and nextEvent['signal'] > 0: # 离线->在线,产生一条离线处理 # 记录离线 make_status_between_node(preStatusEvent, nextEvent, valueList) valueList = [] preStatusEvent = nextEvent elif curEvent['signal'] == 0 and nextEvent['signal'] == 0: if ii == count - 2: # 如果最后一个,应该主动生成 make_status_between_node(preStatusEvent, nextEvent, valueList) # 收尾最后一个事件,直接产生一条记录 if nextEvents and nextEvents[0]: make_status_between_node(events[-1], nextEvents[0], valueList) # 如果跑出来,能够碰到心跳, else: make_status_between_node(events[-1], {'time': endTime, 'signal': 0, 'usage': 0}, valueList) logger.info('finish make_dev_status_rcd devNo=%s' % devNo) # 统计设备状态数据 def calc_dev_status_stat(devNo, startTime, endTime): logger.info('calc dev status devNo=%s,startTime=%s,endTime=%s' % (devNo, startTime, endTime)) def count_time(startTime, endTime, rcd, totalDict): status = rcd['status'] duration = (endTime - startTime).total_seconds() if status == 'online': totalDict['totalOnlineTime'] += duration elif status == 'offline': totalDict['totalOfflineTime'] += duration elif status == 'offline_busy': totalDict['totalOfflineBusyTime'] += duration else: pass queryStartTime = startTime - datetime.timedelta(days = 1) queryEndTime = endTime + datetime.timedelta(days = 1) rcds = DevStatusRecord.objects.filter(devNo = devNo, startTime__gte = queryStartTime, endTime__lte = queryEndTime) totalDict = {'totalOfflineTime': 0, 'totalOfflineBusyTime': 0, 'totalOnlineTime': 0, 'offlineCount': 0, 'offlineBusyCount': 0, 'onlineBusy': 0} # 统计的状态的累计之和,包括总的离线时间、在线时间、离线繁忙时间等 statusList = [] for rcd in rcds: if endTime <= rcd.startTime or startTime >= rcd.endTime: # 两头的不合理,直接去掉 continue elif startTime >= rcd.startTime and endTime <= rcd.endTime: count_time(startTime, endTime, rcd, totalDict) elif startTime <= rcd.startTime: if endTime >= rcd.startTime and endTime <= rcd.endTime: count_time(rcd.startTime, endTime, rcd, totalDict) elif endTime >= endTime: count_time(rcd.startTime, rcd.endTime, rcd, totalDict) else: continue elif startTime >= rcd.startTime and startTime <= rcd.endTime: if endTime <= rcd.endTime: count_time(startTime, endTime, rcd, totalDict) else: count_time(startTime, rcd.endTime, rcd, totalDict) else: continue statusList.extend(rcd.valueDict['signalUsage']) # 计算每个小时统计数据。如果在线变离线、离线变在线,空闲变繁忙,繁忙变空闲。记录的是变化情况 hourDict = {} ii = 0 while ii < len(statusList) - 1: preStatus = statusList[ii] status = statusList[ii + 1] ii += 1 # 如果是人为劈开的,不作为正式数据,直接过滤掉 if status.has_key('isSplit'): continue # 不在时间范围内的,过滤掉 statusTime = to_datetime(status['time']) if statusTime < startTime or statusTime > endTime: continue # 如果状态没有变化,也直接过滤掉 if (preStatus['signal'] > 0 and status['signal'] > 0 and preStatus['usage'] == status['usage']) \ or (preStatus['signal'] == 0 and status['signal'] == 0): continue hour = statusTime.hour if not hourDict.has_key(hour): hourDict[hour] = {'offlineCount': 0, 'offlineBusyCount': 0, 'onlineBusy': 0, 'usageSum': 0, 'usageCount': 0} if status['usage'] == 0 and status['signal'] == 0: hourDict[hour]['offlineCount'] += 1 totalDict['offlineCount'] += 1 elif status['usage'] > 0 and status['signal'] == 0: hourDict[hour]['offlineBusyCount'] += 1 hourDict[hour]['usageSum'] += status['usage'] hourDict[hour]['usageCount'] += 1 totalDict['offlineBusyCount'] += 1 elif status['usage'] > 0 and status['signal'] > 0: hourDict[hour]['onlineBusy'] += 1 hourDict[hour]['usageSum'] += status['usage'] hourDict[hour]['usageCount'] += 1 totalDict['onlineBusy'] += 1 else: pass return {'totalOfflineTime': totalDict['totalOfflineTime'], 'totalOfflineBusyTime': totalDict['totalOfflineBusyTime'], 'totalOnlineTime': totalDict['totalOnlineTime'], 'offlineCount': totalDict['offlineCount'], 'offlineBusyCount': totalDict['offlineBusyCount'], 'onlineBusy': totalDict['onlineBusy'], 'hourly': hourDict} def calc_one_dealer_and_insert_into_db(dealerId, nowTime): logger.info('start calc dealerId=%s info and insert into db now......' % dealerId) def build_daily_update(valueMap): rv = defaultdict(dict) for kind, amount in valueMap.iteritems(): if kind == 'package': for coin, value in amount.items(): rv['$inc']['other.package.{coin}'.format(coin = int(int(float(coin)) * 100))] = int(value) elif kind == 'hourly': for hour, value in amount.items(): for hourKind, hourValue in value.items(): rv['$inc']['other.hourly.{hour}.{hourKind}'.format(hour = hour, hourKind = hourKind)] = int( hourValue) else: rv['$inc']['other.{kind}'.format(kind = kind)] = int(amount) if rv.has_key('activedDevRatio'): rv.pop('activedDevRatio') if rv.has_key('peakUsage'): rv.pop('peakUsage') if rv.has_key('devCount'): rv.pop('devCount') rv.update({'$set': {'devCount': valueMap.get('devCount', 0), 'activedDevRatio': valueMap.get('activedDevRatio', 0), 'peakUsage': valueMap.get('peakUsage', 0)}}) return rv # 月度数据,主要是月度平均活跃度,需要特殊计算 def build_month_update(valueMap, preDay, monthactivedDevRatio): rv = defaultdict(dict) for kind, amount in valueMap.iteritems(): if kind == 'package': for coin, value in amount.items(): rv['$inc']['other.package.{coin}'.format(coin = int(int(float(coin)) * 100))] = int(value) elif kind == 'hourly': for hour, value in amount.items(): for hourKind, hourValue in value.items(): rv['$inc']['other.hourly.{hour}.{hourKind}'.format(hour = hour, hourKind = hourKind)] = int( hourValue) else: rv['$inc']['other.{kind}'.format(kind = kind)] = int(amount) if rv.has_key('activedDevRatio'): # 目前这个算法是有缺陷的,因为每天的设备数目是变化的,准确的计算应该是当月进行计算才对 rv.pop('activedDevRatio') if rv.has_key('devCount'): rv.pop('devCount') activedDevRatio = int(((preDay - 1) * monthactivedDevRatio + valueMap['activedDevRatio']) / preDay) rv.update({'$set': {'devCount': valueMap.get('devCount', 0), 'activedDevRatio': activedDevRatio}}) return rv preDaytime = nowTime - datetime.timedelta(days = 1) preDay = preDaytime.day preStrDay = preDaytime.strftime(Const.DATE_FMT) preMonth = MONTH_DATE_KEY.format(year = preDaytime.year, month = preDaytime.month) startTime = to_datetime(preStrDay + ' 00:00:00') endTime = to_datetime(preStrDay + ' 23:59:59') groupIds = Group.get_group_ids_of_dealer(str(dealerId)) devNoList = Device.get_devNos_by_group(groupIds) devStatDict = {} dealerHourlyDict = {} for devNo in devNoList: make_dev_status_rcd(devNo, nowTime) statInfo = calc_dev_status_stat(devNo, startTime, endTime) devStatDict[devNo] = statInfo statInfo = calc_dealer_order_data_stats(str(dealerId), startTime, endTime) groupStatDict = statInfo['groupStats'] activedDevRatio = statInfo['activedDevRatio'] dealerPackage = statInfo['package'] totalOfflineTime, totalOfflineBusyTime, totalOnlineTime, offlineCount, offlineBusyCount = 0, 0, 0, 0, 0 dealerDevCount = 0 # 进行汇总计算数据,从设备-》组-》经销商 groups = Group.get_groups_by_group_ids(groupIds) for groupId, group in groups.items(): devNos = Device.get_devNos_by_group([groupId]) groupDevCount = 0 if not groupStatDict.has_key(groupId): groupStatDict[groupId] = { 'totalOfflineTime': 0, 'totalOfflineBusyTime': 0, 'totalOnlineTime': 0, 'offlineCount': 0, 'offlineBusyCount': 0, 'activedDevRatio': 0, 'devCount': 0, 'package': {}, 'hourly': {} } else: groupStatDict[groupId].update({ 'totalOfflineTime': 0, 'totalOfflineBusyTime': 0, 'totalOnlineTime': 0, 'offlineCount': 0, 'offlineBusyCount': 0, 'hourly': {} }) for devNo in devNos: groupDevCount += 1 dealerDevCount += 1 devInfo = devStatDict.get(devNo, {}) devTotalOfflineTime = devInfo.get('totalOfflineTime', 0) devTotalOfflineBusyTime = devInfo.get('totalOfflineBusyTime', 0) devTotalOnlineTime = devInfo.get('totalOnlineTime', 0) devOfflineCount = devInfo.get('offlineCount', 0) devOfflineBusyCount = devInfo.get('offlineBusyCount', 0) hourlyDict = devInfo.get('hourly', {}) groupStatDict[groupId]['totalOfflineTime'] += devTotalOfflineTime groupStatDict[groupId]['totalOfflineBusyTime'] += devTotalOfflineBusyTime groupStatDict[groupId]['totalOnlineTime'] += devTotalOnlineTime groupStatDict[groupId]['offlineCount'] += devOfflineCount groupStatDict[groupId]['offlineBusyCount'] += devOfflineBusyCount # 往组和经销商内统计数据,补充小时级别的统计数据 for hour, value in hourlyDict.items(): if hour not in groupStatDict[groupId]['hourly']: groupStatDict[groupId]['hourly'] = {hour: value} else: groupStatDict[groupId]['hourly'][hour]['offlineCount'] += hourlyDict[hour]['offlineCount'] groupStatDict[groupId]['hourly'][hour]['offlineBusyCount'] += hourlyDict[hour]['offlineBusyCount'] groupStatDict[groupId]['hourly'][hour]['usageSum'] += hourlyDict[hour]['usageSum'] groupStatDict[groupId]['hourly'][hour]['usageCount'] += hourlyDict[hour]['usageCount'] # 经销商级别的汇总 if not dealerHourlyDict.has_key(hour): dealerHourlyDict[hour] = value else: dealerHourlyDict[hour]['offlineCount'] += hourlyDict[hour]['offlineCount'] dealerHourlyDict[hour]['offlineBusyCount'] += hourlyDict[hour]['offlineBusyCount'] dealerHourlyDict[hour]['usageSum'] += hourlyDict[hour]['usageSum'] dealerHourlyDict[hour]['usageCount'] += hourlyDict[hour]['usageCount'] totalOfflineTime += devTotalOfflineTime totalOfflineBusyTime += devTotalOfflineBusyTime totalOnlineTime += devTotalOnlineTime offlineCount += devOfflineCount offlineBusyCount += devOfflineBusyCount # 更新地址下的设备总数 groupStatDict[groupId].update({'devCount': groupDevCount}) # 找出地址下的最大繁忙度 groupHourly = groupStatDict[groupId].get('hourly', {}) peakUsage = 0 for hour, info in groupHourly.items(): usage = int(info['usageSum'] / info['usageCount']) if info['usageCount'] > 0 else 0 if usage >= peakUsage: peakUsage = usage groupStatDict[groupId]['peakUsage'] = peakUsage dealerInfo = { 'totalOfflineTime': totalOfflineTime, 'totalOfflineBusyTime': totalOfflineBusyTime, 'totalOnlineTime': totalOnlineTime, 'offlineCount': offlineCount, 'offlineBusyCount': offlineBusyCount, 'devCount': dealerDevCount, 'activedDevRatio': activedDevRatio, 'package': dealerPackage, 'hourly': dealerHourlyDict } # 将数据分别存入设备、地址、经销商的日报表中.设备活跃度,需要好好想下 for groupId in groupIds: for devNo in devNos: logicalCode = Device.get_logicalCode_by_devNo(devNo) devInfo = devStatDict.get(devNo, {}) DeviceDailyStat.get_collection().update({'logicalCode': logicalCode, 'date': preStrDay}, build_daily_update(devInfo), multi = True, upsert = True) groupInfo = groupStatDict.get(groupId, {}) GroupDailyStat.get_collection().update({'groupId': ObjectId(groupId), 'date': preStrDay}, build_daily_update(groupInfo), multi = True, upsert = True) DealerDailyStat.get_collection().update({'dealerId': ObjectId(dealerId), 'date': preStrDay}, build_daily_update(dealerInfo), multi = True, upsert = True) try: mouthActivedDevRatio = \ DealerDailyStat.get_collection().find({'dealerId': ObjectId(dealerId), 'date': preStrDay}, {'activedDevRatio': 1})[0]['activedDevRatio'] except Exception, e: mouthActivedDevRatio = 0 DealerMonthlyStat.get_collection().update({'dealerId': ObjectId(dealerId), 'date': preMonth}, build_month_update(dealerInfo, preDay, mouthActivedDevRatio), multi = True, upsert = True) logger.info('finished calc dealerId=%s info and insert into db now' % dealerId) # 统计本月新增加的用户量 以及所有的用户量 def calc_dealer_user_count(): preDaytime = datetime.datetime.now() - datetime.timedelta(days = 1) preStrDay = preDaytime.strftime(Const.DATE_FMT) preMonth = MONTH_DATE_KEY.format(year = preDaytime.year, month = preDaytime.month) endTime = to_datetime(preStrDay + ' 23:59:59') dealerIds = [str(dealer.id) for dealer in Dealer.objects.all().only('id')] for dealerId in dealerIds: try: groupIds = Group.get_group_ids_of_dealer(str(dealerId)) monthStartTime = to_datetime(preMonth + '-01 00:00:00') userAddedThisMonth = MyUser.get_user_count_by_filter(groupIds, { 'dateTimeAdded': {'$gte': monthStartTime, '$lte': endTime}}) DealerMonthlyStat.get_collection().update({'dealerId': ObjectId(dealerId), 'date': preMonth}, {'$set': {'addedUserCount': userAddedThisMonth}}) logger.info('dealerId=%s,added this month usercount=%s' % (dealerId, userAddedThisMonth)) # 统计所有的用户量 allUserCount = MyUser.get_user_count_by_filter(groupIds) Dealer.get_collection().update({'_id': ObjectId(dealerId)}, {'$set': {'userCount': allUserCount}}) logger.info('dealerId=%s,all user count=%s' % (dealerId, allUserCount)) except Exception, e: logger.error('stat user count e=%s' % e) def calc_dealer_stat_and_insert_into_db_for_dealers(dealerIds): nowTime = datetime.datetime.now() logger.info('calc dealer info now.....,dealers count=%s' % len(dealerIds)) for dealerId in dealerIds: calc_one_dealer_and_insert_into_db(dealerId, nowTime) logger.info('finished all: calc dealer info and insert into db now') def calc_dealer_stat_and_insert_into_db(): logger.info('start all: calc dealer info and insert into db now') dealerIds = [str(dealer.id) for dealer in Dealer.objects.all().only('id')] dealerCount = len(dealerIds) logger.info('calc dealer info now,all dealer count = %s' % dealerCount) pageSize = dealerCount / 3 pageNum = dealerCount / pageSize if dealerCount % pageSize == 0 else dealerCount / pageSize + 1 for pageIndex in range(pageNum): sliceDealerIds = dealerIds[pageIndex * pageSize:(pageIndex + 1) * pageSize] try: t = Thread(target = calc_dealer_stat_and_insert_into_db_for_dealers, kwargs = {'dealerIds': sliceDealerIds}) t.setName('calc_dealer_stat_%s' % pageIndex) t.setDaemon(False) t.start() except Exception as e: logger.exception(e) # {'startTime':startDate,'endTime':endDate,'ownerId':ownerId,'groupId':groupId,'logicalCode':logicalCode} def export_charge_order_excel_from_db(filepath, queryDict): logger.info('start export_charge_order_excel_from_db,filepath=%s,query=%s' % (filepath, queryDict)) startTime = queryDict.get('startTime', None) endTime = queryDict.get('endTime', None) ownerId = queryDict.get('ownerId', '') groupId = queryDict.get('groupId', '') logicalCode = queryDict.get('logicalCode', '') phoneNumber = queryDict.get('phoneNumber') chargeTypeDict = {'recharge': u'充值', 'sendcoin': u'赠币', 'refund': u'退币', 'chargeCard': u'卡充值', 'chargeVirtualCard': u'虚拟卡充值'} groupIdList = Group.get_group_ids_of_dealer(ownerId) filters = { "ownerId": ownerId, "result": "success", } logger.info("ownerId is %s" % ownerId) if logicalCode: filters.update({"logicalCode": logicalCode}) if groupId: filters.update({"groupId": groupId}) if phoneNumber: phoneOwner = MyUser.objects.filter(phoneNumber = phoneNumber).first() if phoneOwner: filters.update({"openId": phoneOwner.openId}) partnerDict = {} for groupId in groupIdList: group = Group.get_group(groupId) partnerDict.update(group.get('partnerDict', {})) userDict = {} records = [] rcdList = [] logger.info("filter is %s" % filters) rechargeRcds = ClientRechargeModelProxy(st = startTime, et = endTime).all(**filters) for rcd in rechargeRcds: if rcd.via in ['refund', 'sendcoin']: continue openId = rcd.openId if not userDict.has_key(openId): try: user = MyUser.objects.filter(openId = openId).only('openId', 'nickname', 'sex', 'groupId', 'country', 'province', 'city', "gateWay", "productAgentId").first() sex = u'未知' if user.sex == 0: sex = u'女' elif user.sex == 1: sex = u'男' else: pass user = {'sex': sex, 'nickname': user.nickname, 'zone': '%s%s%s' % (user.country, user.province, user.city), "phoneNumber": user.phone} userDict[openId] = user except Exception, e: continue else: user = userDict[openId] from apps.web.common.proxy import ClientDealerIncomeModelProxy try: proxyRcd = ClientDealerIncomeModelProxy.get_one(ref_id = rcd.id, groupId = ObjectId(rcd.groupId)) # type: DealerIncomeProxy if not proxyRcd: raise ValueError("rechargeRecord <{}> has not dealer income proxy".format(rcd.id)) except Exception, e: continue # 获取代理商、经销商的收入 amountDict = DealerIncomeProxy.get_agent_partner_allocated_money(ownerId, proxyRcd.partition, partnerDict) userNickname = "{}-{}".format(user.get("nickname", ""), user.get("phoneNumber", "")) if user.get( "phoneNumber") else user.get("nickname") dataList = [ (u'逻辑编码', rcd.logicalCode), (u'IMEI', rcd.devNo), (u'设备类型', rcd.dev_type_name), (u'组名称', rcd.groupName), (u'组内编号', rcd.groupNumber), (u'组地址', rcd.address), (u'用户昵称', userNickname), (u'用户性别', user.get('sex', '')), (u'用户地域', user.get('zone', '')), (u'订单号', rcd.orderNo), (u'下单时间', rcd.to_datetime_str(rcd.dateTimeAdded)), (u'第三方支付单号', rcd.wxOrderNo), (u'支付方式', u'微信' if rcd.gateway == 'wechat' else u'支付宝'), (u'是否快捷支付', 'yes' if rcd.isQuickPay else 'no'), (u'充值金额', str(rcd.money)), (u'充值金币', str(rcd.coins)), (u'充值方式', chargeTypeDict.get(rcd.via, '')), # 卡充值、虚拟卡充值、直接充值等 (u'设备负责人分配收入', str(amountDict.get('ownerAmount'))), (u'代理商分配金额', str(amountDict.get('agentAmount'))) ] # 把所有合伙人的分配都记录下来 for partnerIncome in amountDict.get('partnerDict').values(): dataList.append((u'合伙人分配金额:%s(%s)' % (partnerIncome['nickname'], partnerIncome['username']), str(partnerIncome['money']))) records.append(OrderedDict(dataList)) generate_excel_report(filepath, records) logger.info('finished export_charge_order_excel_from_db') def export_consume_order_excel_from_db(filepath, queryDict): logger.info('start export_consume_order_excel_from_db,filepath=%s,query=%s' % (filepath, queryDict)) startTime = queryDict.get('startTime', None) endTime = queryDict.get('endTime', None) ownerId = queryDict.get('ownerId', '') groupId = queryDict.get('groupId', '') logicalCode = queryDict.get('logicalCode', '') filters = { "isNormal": True } if logicalCode: filters.update({"logicalCode": logicalCode}) elif groupId: filters.update({"groupId": groupId}) else: groupIds = Group.get_group_ids_of_dealer(str(ownerId)) if not groupIds: return generate_excel_report(filepath, []) filters.update({'groupId__in': groupIds}) # 找出所有消费记录可以展示的消费细节信息,比如:消耗电量、使用时长等 servicedKeysTemp = [] rcdList = [] consume_orders = ClientConsumeModelProxy( st = startTime, et = endTime ).all(**filters) # type:List[ConsumeRecord] for consume_order in consume_orders: try: servicedKeysTemp.extend(consume_order.servicedInfo.keys()) rcdList.append(consume_order) except Exception as e: logger.exception('consume record id = {}, exception = {}'.format(str(consume_order.id), e)) servicedKeysTemp = list(set(servicedKeysTemp)) servicedKeys = [] for key in servicedKeysTemp: if key in DEALER_CONSUMPTION_AGG_KIND_TRANSLATION: servicedKeys.append(key) else: continue userDict = {} records = [] for rcd in rcdList: openId = rcd.openId if openId not in userDict: try: user = MyUser.objects.filter(openId = openId).only('nickname', 'phone').first() userDict[openId] = {'nickname': user.nickname, "phoneNumber": user.phone} user = userDict[openId] except Exception, e: continue else: user = userDict[openId] userNickname = "{}-{}".format(user.get("nickname", ""), user.get("phoneNumber", "")) if user.get( "phoneNumber") else user.get("nickname") dataList = [ (u'逻辑编码', rcd.logicalCode), (u'IMEI', rcd.devNo), (u'设备端口', rcd.attachParas.get('port', '-')), (u'设备类型', rcd.dev_type_name), (u'组名称', rcd.groupName), (u'组内编号', rcd.groupNumber), (u'组地址', rcd.address), (u'用户昵称', userNickname), (u'订单号', rcd.orderNo), (u'下单时间', rcd.to_datetime_str(rcd.dateTimeAdded)), (u'花费金币', str(rcd.coin)), (u'消费备注', str(rcd.remarks) if rcd.remarks else u'扫码消费'), (u'端口', str(rcd.used_port) if rcd.used_port != -1 else ''), (u'停止时间', rcd.to_datetime_str(rcd.device_finished_time)), (u'结束原因', rcd.servicedInfo.get('reason', '')) ] # 把消费细节也记录到excel表中 for key in servicedKeys: dataList.append((DEALER_CONSUMPTION_AGG_KIND_TRANSLATION.get(key), str(rcd.servicedInfo.get(key, '-')))) records.append(OrderedDict(dataList)) generate_excel_report(filepath, records) def export_API_order_excel_from_db(filepath, queryDict): logger.info('start export_API_order_excel_from_db, filepath=%s, query=%s' % (filepath, queryDict)) startTime = queryDict.get('startTime', None) endTime = queryDict.get('endTime', None) ownerId = queryDict.get('ownerId', '') groupId = queryDict.get('groupId', '') logicalCode = queryDict.get('logicalCode', '') if logicalCode: dev = Device.get_dev_by_l(logicalCode) devNoList = [dev['devNo']] elif groupId: devNoList = Device.get_devNos_by_group([groupId]) else: devNoList = None if devNoList: rcds = APIStartDeviceRecord.objects(ownerId = ownerId, devNo__in = devNoList, errCode = 0, datetimeAdded__gte = startTime, datetimeAdded__lte = endTime).order_by('-datetimeAdded') else: rcds = APIStartDeviceRecord.objects(ownerId = ownerId, errCode = 0, datetimeAdded__gte = startTime, datetimeAdded__lte = endTime).order_by('-datetimeAdded') records = [] for rcd in rcds: dataList = [ (u'下单时间', rcd.to_datetime_str(rcd.datetimeAdded)), (u'订单编号', rcd.attachParas.get('extOrderNo', '-')), (u'用户ID', rcd['userId']), (u'订购时间', rcd.package.get('time', '-')), (u'退费金额', rcd.servicedInfo.get('backCoins', '-')), (u'消耗电量', rcd.servicedInfo.get('spendElec', '-')), (u'消费金额', str(rcd.package.get('price', '-'))), (u'地址', Group.get_groupName_by_logicalCode(rcd.deviceCode)), (u'设备编号', rcd.deviceCode), ] records.append(OrderedDict(dataList)) generate_excel_report(filepath, records) def export_on_points_order_excel_from_db(filepath, queryDict): logger.info('start export_onPoints_order_excel_from_db, filepath=%s, query=%s' % (filepath, queryDict)) startTime = queryDict.get('startTime', None) endTime = queryDict.get('endTime', None) ownerId = queryDict.get('ownerId', '') groupId = queryDict.get('groupId', '') logicalCode = queryDict.get('logicalCode', '') filters = { "ownerId": ownerId, "time__gte": startTime, "time__lte": endTime } if logicalCode: dev = Device.get_dev_by_l(logicalCode) devNoList = [dev['devNo']] filters.update({"devNo": dev["devNo"]}) elif groupId: devNoList = Device.get_devNos_by_group([groupId]) filters.update({"devNo__in": devNoList}) else: devNoList = None rcds = UpscoreRecord.objects(**filters).order_by("-time") records = [] for rcd in rcds: dataList = [ (u'下分时间', rcd.time), (u'地址', rcd.groupName), (u'设备编号', rcd.logicalCode), (u'上分金额', str(rcd.score)), ] records.append(OrderedDict(dataList)) generate_excel_report(filepath, records) def export_send_coins_to_card_order_excel_from_db(filepath, queryDict): logger.info('start export_send_coins_to_card_order_excel_from_db, filepath=%s, query=%s' % (filepath, queryDict)) startTime = queryDict.get('startTime', None) endTime = queryDict.get('endTime', None) ownerId = queryDict.get('ownerId', '') startDateTime = datetime.datetime.strptime(startTime, '%Y-%m-%d %H:%M:%S') endDateTime = datetime.datetime.strptime(endTime, '%Y-%m-%d %H:%M:%S') filters = { "ownerId": ownerId, "dateTimeAdded__gte": startDateTime, "dateTimeAdded__lte": endDateTime } rcds = UpCardScoreRecord.objects(**filters).order_by('-dateTimeAdded') records = [] for rcd in rcds: if rcd.address != '': groupName = rcd.address else: card = Card.objects.get(cardNo = rcd.cardNo) groupId = card.groupId if groupId: groupName = Group.get_group(groupId).groupName else: groupName = '' dataList = [ (u'实体卡号', rcd.cardNo), (u'上分数量', str(rcd.score)), (u'备注', rcd.remark), (u'绑定地址', groupName), (u'创建时间', rcd.dateTimeAdded.strftime('%Y-%m-%d %H:%M:%S')), ] records.append(OrderedDict(dataList)) # records = [] # for rcd in rcds: # dataList = [ # (u'实体卡号', rcd.cardNo), # (u'上分数量', str(rcd.score)), # (u'备注', rcd.remark), # (u'创建时间', rcd.dateTimeAdded.strftime('%Y-%m-%d %H:%M:%S')), # ] # # records.append(OrderedDict(dataList)) # generate_excel_report(filepath, records) def export_group_stat_excel_from_db(filepath, queryDict): def calc_group_income_stats(groupId, rcds, startDate, endDate): groupResult = GroupReport.get_rpt([groupId], startDate, endDate) offlineCoin = groupResult[groupId].get('lineCoins', 0) orderTotal, payIncome, offlineTime, totalTime, totalActivedRate = 0, RMB(0.0), 0, 0, 0 count = 0 for rcd in rcds: count += 1 daily = rcd.get('daily', {}) other = rcd.get('other', {}) orderTotal += daily.get('totalIncomeCount', 0) payIncome += daily.get('totalIncome', 0) offlineTime += other.get('totalOfflineTime', 0) totalTime = totalTime + other.get('totalOfflineTime', 0) + other.get('totalOnlineTime', 0) + other.get( 'totalOfflineBusyTime', 0) totalActivedRate += rcd.get('activedDevRatio', 0) return { 'orderTotal': orderTotal, 'payIncome': payIncome, 'offlineCoin': offlineCoin, 'dailyActivityRate': int(round(totalActivedRate / count, 2)) if count > 0 else 0, } def calc_group_consume_stats(groupId, rcds): def sum_fn(lst): return sum(map(lambda _: Quantity(_), lst), Quantity('0')) groupConsumptionMap = cum_stats(keys = ['daily', 'consumption'], stats = rcds, translate = translate_consumption, sum_fn = sum_fn) return groupConsumptionMap.get(groupId, None) logger.info('start export_consume_order_excel_from_db,filepath=%s,query=%s' % (filepath, queryDict)) startTime = queryDict.get('startTime', None) endTime = queryDict.get('endTime', None) ownerId = queryDict.get('ownerId', '') from apps.web.agent.models import Agent from apps.web.dealer.models import Dealer dealer = Dealer.objects(id = ownerId).first() # type: Dealer agent = Agent.objects.get(id = dealer.agentId) # type: Agent records = [] groupIds = Group.get_group_ids_of_dealer(ownerId) for groupId in groupIds: group = Group.get_group(groupId) rcds = [rcd for rcd in GroupDailyStat.get_collection().find( {'groupId': ObjectId(groupId), 'date': {'$gte': startTime, '$lte': endTime}}, {'origin': 0, 'hourly': 0, '_id': 0}).sort('groupId')] incomeStats = calc_group_income_stats(groupId, rcds, startTime, endTime) consumeStats = calc_group_consume_stats(groupId, rcds) dataList = [ (u'组名称', group['groupName']), (u'组地址', group['address']), (u'总支付笔数', int(incomeStats.get('orderTotal'))), (u'总支付金额', str(incomeStats.get('payIncome'))), (u'线下投币', str(incomeStats.get('offlineCoin'))), (u'设备日均活跃度', int(incomeStats.get('dailyActivityRate'))), ] # 把消费细节也记录到excel表中 consumeStats = consumeStats if consumeStats else {} for info in consumeStats: if info['source'] not in agent.hide_consume_kinds_dealer: dataList.append((DEALER_CONSUMPTION_AGG_KIND_TRANSLATION.get(info['source']), str(info['value']))) records.append(OrderedDict(dataList)) # 先统计 generate_excel_report(filepath, records) def export_vcard_info_excel_from_db(filepath, queryDict): dealerId = queryDict.get('dealerId') # from apps.web.dealer.models import Dealer from apps.web.user.models import MyUser, UserVirtualCard # dealer = Dealer.objects.filter(id=dealerId) groups = Group.objects.filter(ownerId = dealerId) groupIds = map(lambda x: str(x.id), groups) users = MyUser.objects.filter(groupId__in = groupIds) openIds = map(lambda x: str(x.openId), users) vcards = UserVirtualCard.objects.filter(ownerOpenId__in = openIds, dealerId = dealerId, groupId__in = groupIds) result = [] for vcard in vcards: group = groups.filter(id = vcard.groupId).first() or Group() user = users.filter(openId = vcard.ownerOpenId, groupId = vcard.groupId).first() dataList = [ (u'地址', group.groupName), (u'卡名称', vcard.cardName), (u'卡号', vcard.cardNo), (u'卡主', vcard.nickname), (u'用户ID', str(user.id)), (u'用户电话', user.phone), (u'开卡时间', vcard.startTime.strftime("%Y-%m-%d %H:%M:%S")), (u'过期时间', vcard.expiredTime.strftime("%Y-%m-%d %H:%M:%S")), (u'过期时间', vcard.expiredTime.strftime("%Y-%m-%d %H:%M:%S")), (u'卡可用天数', vcard.periodDays), (u'备注', vcard.userDesc), (u'状态', vcard.status), ] quotaInfo = vcard.quotaInfo if quotaInfo: dataList.append((u'总额度()'.format(quotaInfo.get('quotaUnit')), round(quotaInfo['quota'], 1))) dataList.append((u'已用额度()'.format(quotaInfo.get('quotaUnit')), round(quotaInfo['quotaUsed'], 1))) dataList.append( (u'总剩余额度()'.format(quotaInfo.get('quotaUnit')), round(quotaInfo['quota'] - quotaInfo['quotaUsed'], 1))) dataList.append((u'日额度()'.format(quotaInfo.get('dayQuotaUnit')), round(quotaInfo['dayQuota'], 1))) dataList.append((u'日已用额度()'.format(quotaInfo.get('dayQuotaUnit')), round(quotaInfo['dayUsed'], 1))) dataList.append((u'日剩余额度()'.format(quotaInfo.get('dayQuotaUnit')), round(quotaInfo['dayQuota'] - quotaInfo['dayUsed'], 1))) result.append(OrderedDict(dataList)) generate_excel_report(filepath, result) def export_group_user_account_excel_form_db(filepath, queryDict): def get_spendMoney(consumeRecord): if consumeRecord.aggInfo: coin = float(str(consumeRecord.aggInfo.get('coin',0))) else: coin = 0 return coin from apps.web.user.models import MyUser ownerId = queryDict.get('ownerId', '') startTime = queryDict.get('startTime', None) endTime = queryDict.get('endTime', None) groupIds = Group.get_group_ids_of_dealer(ownerId) sheet = [] usersDict = dict() # type:dict for groupId in groupIds: users = MyUser.objects.filter(groupId=groupId) consumeRecords = ConsumeRecord.objects.filter(groupId=groupId,dateTimeAdded__gte=startTime,dateTimeAdded__lte=endTime) rechargeRecords = RechargeRecord.objects.filter(groupId=groupId,dateTimeAdded__gte=startTime,dateTimeAdded__lte=endTime)# type:List group = Group.get_group(groupId) # 统计充值信息 effectiveRechargeRecords = [] effectiveConsumeRecords = [] for rechargeRecord in rechargeRecords: # 找出有效充值数据 for i in users: if i.openId == rechargeRecord.openId: effectiveRechargeRecords.append( {'nickName': i.nickname, 'openId': i.openId, 'recharge': float(str(rechargeRecord.money)), 'balance': float(str(i.balance))}) for effectiveRechargeRecord in effectiveRechargeRecords: # 将同相同用户的充值账单整理分类,并统计充值金额 if effectiveRechargeRecord['openId'] not in usersDict: usersDict.update({effectiveRechargeRecord['openId']:effectiveRechargeRecord}) else: usersDict[effectiveRechargeRecord['openId']]['recharge'] += effectiveRechargeRecord['recharge'] # 统计消费信息 for consumeRecord in consumeRecords: for i in users: if i.openId == consumeRecord.openId: effectiveConsumeRecords.append( {'nickName': i.nickname, 'openId': i.openId, 'consume': get_spendMoney(consumeRecord),'balance': float(str(i.balance))}) for effectiveConsumeRecord in effectiveConsumeRecords: if effectiveConsumeRecord['openId'] not in usersDict: usersDict.update({effectiveConsumeRecords['openId']: effectiveConsumeRecords}) else: if 'consume' not in usersDict[effectiveConsumeRecord['openId']]: usersDict[effectiveConsumeRecord['openId']].update({ 'consume' : effectiveConsumeRecord['consume'], }) else: usersDict[effectiveConsumeRecord['openId']]['consume'] += effectiveConsumeRecord['consume'] records = [] total_recharged = RMB(0) total_consumed = 0 balance = RMB(0) for openId,info in usersDict.items(): dataList = [ (u'用户',info.get('nickName')), (u"用户总充值",info.get('recharge',0)), (u"用户总消费",info.get('consume',0)), (u"用户账户余额",info.get('balance',0)), ] records.append(OrderedDict(dataList)) total_recharged += info.get('recharge',0) total_consumed += info.get('consume',0) balance += info.get('balance',0) records.append(OrderedDict([ (u'用户', u'总计'), (u"用户总充值", total_recharged), (u"用户总消费", total_consumed), (u"用户账户余额", balance), ])) sheet.append(ExcelSheet(data=records,name=group.groupName)) gernerate_excel_report_for_sheet(filepath, sheet) def poll_dealer_recharge_record(record_id, pay_app_type, interval, total_count): # type: (str, str, int, int)->None poller_cls = PayManager().get_poller(pay_app_type = pay_app_type) poller = poller_cls( record_id = record_id, interval = interval, total_count = total_count, record_cls = DealerRechargeRecord, post_pay = post_pay) # type: PayRecordPoller poller.start() def aggregate_records(records): totalIncome = RMB(0) actualTotalIncome = RMB(0) partDict = dict() for record in records: # type: DealerIncomeProxy totalIncome += RMB(record.totalAmount) actualTotalIncome += RMB(sum(record.actualAmountMap.values(), RMB(0))) for part in record.partition: if part["role"] == "agent": continue if RMB(part["money"]) == RMB(0.00): continue partDict[part["id"]] = partDict.setdefault(part["id"], RMB(0)) + part["money"] ledgerInfo = "" for k, _v in partDict.items(): dealer = Dealer.objects.filter(id = k).only("nickname", "username").first() ledgerInfo += "{}-{}-{}\r\n".format(dealer.nickname, dealer.username, _v) return { "totalIncome": totalIncome, "actualTotalIncome": actualTotalIncome, "ledgerInfo": ledgerInfo } def export_aggregate_dealer_income(filePath, queryDict, aggregateType): s = datetime.datetime.strptime(queryDict.pop("startTime"), "%Y-%m-%d") e = datetime.datetime.strptime(queryDict.pop("endTime"), "%Y-%m-%d") queryDict["dateTimeAdded__gte"] = s queryDict["dateTimeAdded__lt"] = e records = DealerIncomeProxy.objects.filter(**queryDict) if aggregateType == "date": ts = queryDict.get("dateTimeAdded__gte") te = queryDict.get("dateTimeAdded__lt") aggregateRes = {(ts + datetime.timedelta(day)).strftime("%Y-%m-%d"): list() for day in range((te - ts).days)} for record in records: aggregateRes[record.dateTimeAdded.strftime(Const.DATE_FMT)].append(record) elif aggregateType == "group": groupIds = queryDict.get("groupId__in") aggregateRes = {g: list() for g in groupIds} for record in records: aggregateRes[str(record.groupId)].append(record) else: logger.error("undefined aggregate type!") return data = list() for key, items in aggregateRes.items(): tempAggregate = aggregate_records(items) if aggregateType == "group": group = Group.get_group(key) tempAggregate.update({"groupName": group.get("groupName")}) else: tempAggregate.update({"time": key}) data.append(tempAggregate) writeDataList = list() a = RMB(0) b = RMB(0) _k = "groupName" if aggregateType == "group" else "time" for v in sorted(data, key = lambda x: x[_k]): if aggregateType == "group": tempData = [(u"地址名称", v.get("groupName"))] else: tempData = [(u"日期", v.get("time")), ] tempData.extend([ (u"总收益", "{0:.2f}".format(float(v.get("totalIncome")))), (u"总分帐金额", "{0:.2f}".format(float(v.get("actualTotalIncome")))), (u"分账明细", v.get("ledgerInfo")) ]) a += v.get("totalIncome") b += v.get("actualTotalIncome") writeDataList.append(OrderedDict(tempData)) else: if aggregateType == "group": tempData = [(u"地址名称", u"总计")] else: tempData = [(u"日期", u"总计")] tempData.extend([ (u"总收益", "{:.2f}".format(float(a))), (u"总分帐金额", "{:.2f}".format(float(b))), (u"分账明细", u"") ]) writeDataList.append(OrderedDict(tempData)) generate_excel_report(filePath, writeDataList) def dealer_auto_withdraw(): """ 经销商自动提现的脚本 每日走一次 :return: """ # 找出需要 执行脚本的经销商 dealers = Dealer.get_auto_withdraw_dealers() needExecuteDealers = list() # 筛选处 符合条件的 经销商 主要是有金额 for dealer in dealers: # type: Dealer strategy = dealer.auto_withdraw_strategy # 按周提现 if strategy['type'] == 'asWeek': if datetime.datetime.now().isoweekday() == strategy['value']: logger.debug('{} has open auto withdraw switch.'.format(repr(dealer))) needExecuteDealers.append(dealer) # 每天提现 elif strategy['type'] == 'asDay': logger.debug('{} has open auto withdraw switch.'.format(repr(dealer))) needExecuteDealers.append(dealer) # 按月提现 elif strategy['type'] == "asMonth": if datetime.datetime.now().day == strategy['value']: logger.debug('{} has open auto withdraw switch.'.format(repr(dealer))) needExecuteDealers.append(dealer) else: logger.error('{} auto withdraw strategy type <{}> error'.format(repr(dealer), strategy['type'])) for dealer in needExecuteDealers: # type: Dealer try: for incomeType in DEALER_INCOME_TYPE.choices(): withdrawInfoList = Dealer.get_income_balance_list(dealer, incomeType, settings.WITHDRAW_MINIMUM) bank_card_no = None autoWithdrawType = dealer.withdrawOptions.get("autoWithdrawType") if autoWithdrawType == 'wechat': if not dealer.auto_withdraw_bound_open_id: logger.warning( 'dealer of auto withdraw has no bound open id.'.format(str(dealer.id))) continue else: dealer.withdraw_open_id = dealer.auto_withdraw_bound_open_id else: bank_card_no = dealer.auto_withdraw_bank_account if not bank_card_no: logger.warning( 'dealer of auto withdraw has no bound bank account.'.format(str(dealer.id))) continue for sourceKey, balance in withdrawInfoList: try: result = DealerWithdrawService( payee = dealer, income_type = incomeType, amount = balance, pay_type = autoWithdrawType, bank_card_no = bank_card_no).execute(sourceKey, True) logger.info("auto withdraw success, result is <{}>".format(result)) except Exception as e: logger.exception(e) except Exception as e: logger.exception(e) def dealer_auto_charge_sim_card(): """ 经销商自动充值SIM卡 :return: """ def consume_dealer_balance(pay_gateway, incomeType, dealer, cost): source_key = pay_gateway.withdraw_source_key() fundKey = dealer.fund_key(income_type = incomeType, source_key = source_key) filter = { '_id': dealer.id, '{fundKey}.balance'.format(fundKey = fundKey): {'$gte': cost.mongo_amount} } update = { '$inc': {'{fundKey}.balance'.format(fundKey = fundKey): (-cost).mongo_amount} } result = Dealer.get_collection().update_one(filter, update, upsert = False) if result.matched_count == 1 and result.modified_count == 1: return True else: return False devices = Device.get_sim_expire_notify_devices(extra_filter = {'simChargeAuto': True}) logger.info('start auto charge sim card, %s devices will be expired' % len(devices)) from cytoolz import groupby devMap = groupby('ownerId', devices) # TODO 需要平台资金池合并成一个, 不在通过具体的支付网关来确认提现网关 pay_gateway = get_platform_wallet_pay_gateway(AppPlatformType.PLATFORM) for ownerId, devs in devMap.items(): dealer = Dealer.objects(id = ownerId).first() if not dealer: logger.warning('dealer not exists.'.format(ownerId)) continue groupMap = {} devObjs = list(Device.objects(logicalCode__in = [dev['logicalCode'] for dev in devs])) for income_type in DEALER_INCOME_TYPE.choices(): can_used_balance = dealer.sub_balance(income_type, pay_gateway.withdraw_source_key()) if can_used_balance <= RMB(0): continue for devObj in devObjs[:]: # type: Device try: logger.info('start auto charge sim of {}.'.format(devObj)) if ownerId != devObj.ownerId: logger.warning( 'auto charge sim failure of {}. {} not equal {}. may be unregister.'.format( devObj, devObj.ownerId, ownerId)) devObjs.remove(devObj) continue if not devObj.is_expire_in_this_month: continue devItems = [devObj] rcd = create_dealer_sim_charge_order(pay_gateway, dealer, devItems, 'auto', can_used_balance) if not rcd: devObjs.remove(devObj) continue costMoney = RMB(rcd.totalFee / 100.0) result = consume_dealer_balance(pay_gateway, income_type, dealer, costMoney) if result: logger.info('auto charge sim of {} success.'.format(devObj)) rcd.succeed(wxOrderNo = rcd.orderNo) post_sim_recharge(rcd) if devObj.groupId not in groupMap: group = Group.get_group(devObj.groupId) groupMap[devObj.groupId] = group else: group = groupMap[devObj.groupId] income_record = RechargeRecord.issue_from_auto_sim_order(dealer, rcd, devObj, group) record_income_proxy(DEALER_INCOME_SOURCE.AUTO_SIM, income_record, { "owner": [ { "money": (-costMoney).mongo_amount, "role": "owner", "share": Percent("100.0").mongo_amount, "id": str(dealer.id) } ], 'partner': [] }) can_used_balance = can_used_balance - costMoney devObjs.remove(devObj) else: logger.info('auto charge sim of {} failure to cancel it.'.format(devObj)) rcd.cancel() break except Exception, e: logger.exception(e) def auto_charge_sim_card(dealerId): """ 仅手工执行, 对单个经销商进行SIM卡自动充值 :param dealerId: :return: """ def consume_dealer_balance(pay_gateway, incomeType, dealer, cost): source_key = pay_gateway.withdraw_source_key() fundKey = dealer.fund_key(income_type = incomeType, source_key = source_key) filter = { '_id': dealer.id, '{fundKey}.balance'.format(fundKey = fundKey): {'$gte': cost.mongo_amount} } update = { '$inc': {'{fundKey}.balance'.format(fundKey = fundKey): (-cost).mongo_amount} } result = Dealer.get_collection().update_one(filter, update, upsert = False) if result.matched_count == 1 and result.modified_count == 1: logger.debug('dec dealer money = {}, fundKey = {}'.format(str(dealer.id), cost, fundKey)) return True else: return False devices = Device.get_sim_expire_notify_devices(dealer_id = dealerId, extra_filter = {'simChargeAuto': True}) logger.info('start auto charge sim card, %s devices will be expired' % len(devices)) from cytoolz import groupby devMap = groupby('ownerId', devices) # TODO 需要平台资金池合并成一个, 不在通过具体的支付网关来确认提现网关 pay_gateway = get_platform_wallet_pay_gateway(AppPlatformType.PLATFORM) for ownerId, devs in devMap.items(): dealer = Dealer.objects(id = ownerId).first() if not dealer: logger.warning('dealer not exists.'.format(ownerId)) continue groupMap = {} devObjs = list(Device.objects(logicalCode__in = [dev['logicalCode'] for dev in devs])) for income_type in DEALER_INCOME_TYPE.choices(): can_used_balance = dealer.sub_balance(income_type, pay_gateway.withdraw_source_key()) if can_used_balance <= RMB(0): continue for devObj in devObjs[:]: # type: Device try: logger.info('start auto charge sim card,the device is auto charge,devNo=%s' % devObj.devNo) if ownerId != devObj.ownerId: logger.warning( 'ownerId of device not equal {}. may be unregister.'.format( devObj.devNo, devObj.ownerId, ownerId)) devObjs.remove(devObj) continue if not devObj.is_expire_in_this_month: continue devItems = [devObj] rcd = create_dealer_sim_charge_order(pay_gateway, dealer, devItems, 'auto', can_used_balance) if not rcd: devObjs.remove(devObj) continue costMoney = RMB(rcd.totalFee / 100.0) result = consume_dealer_balance(pay_gateway, income_type, dealer, costMoney) if result: rcd.succeed(wxOrderNo = rcd.orderNo) post_sim_recharge(rcd) logger.info('charge sim card success,devNo=%s' % devObj.devNo) if devObj.groupId not in groupMap: group = Group.get_group(devObj.groupId) groupMap[devObj.groupId] = group else: group = groupMap[devObj.groupId] income_record = RechargeRecord.issue_from_auto_sim_order(dealer, rcd, devObj, group) record_income_proxy(DEALER_INCOME_SOURCE.AUTO_SIM, income_record, { "owner": [ { "money": (-costMoney).mongo_amount, "role": "owner", "share": Percent("100.0").mongo_amount, "id": str(dealer.id) } ], 'partner': [] }) can_used_balance = can_used_balance - costMoney devObjs.remove(devObj) else: rcd.cancel() break except Exception, e: logger.exception(e) def batch_set_device_params(logicalCodes, updateConf, lastSetConf, operationId): logger.info( 'start batch_set_device_params, logicalCodes=<{}>, updateConf=<{}>, lastSetConf=<{}>, operationId=<{}>'.format( logicalCodes, updateConf, lastSetConf, operationId)) from taskmanager.mediator import task_caller for logicalCode in logicalCodes: task_caller('set_device_params', logicalCode = logicalCode, updateConf = updateConf, lastSetConf = lastSetConf, operationId = operationId) def set_device_params(logicalCode, updateConf, lastSetConf, operationId): logger.info( 'start set_device_params, logicalCodes=<{}>, updateConf=<{}>, lastSetConf=<{}>, operationId=<{}>'.format( logicalCode, updateConf, lastSetConf, operationId)) dev = Device.get_dev_by_l(logicalCode) operation = OperatorLog.record_dev_setting_changes_log(dev.owner, 'record_someone_set_devSettings', dev['logicalCode'], dev['devType']['code'], { 'updateConfStr': json.dumps(updateConf), 'operationId': operationId, 'status': 'waiting' }) box = ActionDeviceBuilder.create_action_device(dev) # type: SmartBox from apps.web.device.models import RequestBodyDict requestBody = RequestBodyDict({"POST": updateConf}) try: if not lastSetConf: lastSetConf = box.get_dev_setting() box.set_device_function(requestBody, lastSetConf) box.set_device_function_param(requestBody, lastSetConf) except Exception, e: logger.exception('error happened, error=%s' % (e,)) operation.update(content__status = 'fail', content__beforeCacheStr = json.dumps(lastSetConf)) return 'set_device_params logicalCode=<{}> is fail'.format(logicalCode) else: operation.update(content__status = 'success', content__beforeCacheStr = json.dumps(lastSetConf)) return 'set_device_params logicalCode=<{}> is success'.format(logicalCode) def batch_set_server_settings(operator, logicalCodes, payload, ownerId, devTypeCode, operationId): logger.debug( 'batch_set_server_settings, operator={}, logicalCodes={}, payload={}, ownerId={}, devTypeCode={}, operationId={}'.format( operator, logicalCodes, payload, ownerId, devTypeCode, operationId)) from taskmanager.mediator import task_caller for logicalCode in logicalCodes: task_caller('set_server_settings', operator = operator, logicalCode = logicalCode, payload = payload, ownerId = ownerId, devTypeCode = devTypeCode, operationId = operationId) def set_server_settings(operator, logicalCode, payload, ownerId, devTypeCode, operationId): logger.info( 'set_server_settings, operator={}, logicalCode={}, payload={}, ownerId={}, devTypeCode={}, operationId={}'.format( operator, logicalCode, payload, ownerId, devTypeCode, operationId)) device = Device.get_dev_by_l(logicalCode) # type: DeviceDict _operator = namedtuple('Operator', ['id', 'username', 'role'])(**operator) operation = OperatorLog.log_dev_operation( operator = _operator, device = device, operator_name = 'setServerSetting', content = { 'operationId': operationId, 'after': payload, 'status': 'waiting' }) # type: OperatorLog try: if device.ownerId != ownerId: raise Exception(u'无法操作设备') if device.devTypeCode != devTypeCode: raise Exception(u'无法操作该类型设备') before = device.deviceAdapter.get_server_setting() device.deviceAdapter.set_server_setting(payload) operation.update(content__status = 'success', content__before = before) except Exception, e: logger.exception('error happened, error=%s' % (e,)) operation.update(content__status = 'fail') def push_shanghai_platform_heatbeat(): logger.info('start push_shanghai_platform_heatbeat') from apps.web.south_intf.shanghai_urban_data_collection_platform import ShangHaiUrbanDataCollectionPlatform from apps.web.south_intf.shanghai_urban_data_collection_platform import ShangHaiUrbanDataCollectionPlatformModel from apps.web.device.models import Device all_dealer_info = ShangHaiUrbanDataCollectionPlatformModel.all_dealers() for _one_info in all_dealer_info: devs = Device.get_devs_by_ownerId(_one_info['dealerId']) for dev in devs: try: ShangHaiUrbanDataCollectionPlatform(dev, **_one_info).celery_push_heatbeat() except: import traceback logger.error(traceback.format_exc()) return 'success' def export_modify_customer_balance_record_excel_from_db(filepath, queryDict): customerOpenId = queryDict.get("customerOpenId") pageIndex = queryDict.get("pageIndex") pageSize = queryDict.get("pageSize") ownerId = queryDict.get("ownerId") from apps.web.user.models import MyUser user = MyUser.objects(openId=customerOpenId).first() if not user: records = RechargeRecord.objects( ownerId=ownerId, via='sendcoin').order_by('-dateTimeAdded').skip((pageIndex - 1) * pageSize).limit(pageSize) else: records = RechargeRecord.objects( ownerId=ownerId, openId=customerOpenId, via='sendcoin').order_by('-dateTimeAdded').skip((pageIndex - 1) * pageSize).limit(pageSize) dataList = [] result = [] if not user: for r in records: u = MyUser.objects(openId=r.openId).first() if u is None: logger.error('invalid openId, openId=%s' % r.openId) continue dataList.extend([ ('用户名称',u.nickname if u.nickname is not None else ''), ('用户ID', customerOpenId), ('操作人员',r.operator), ('派币金额', r.coins), ('派币地址', r.groupName), ('派币时间', r.dateTimeAdded.strftime("%Y-%m-%d %H:%M:%S")), ('备注', r.desc),] ) result.append(OrderedDict(dataList)) else: for r in records: dataList.extend( [ ('用户名称', user.nickname if user.nickname is not None else ''), ('用户ID', customerOpenId), ('操作人员', r.operator), ('派币金额', r.coins), ('派币地址', r.groupName), ('派币时间', r.dateTimeAdded.strftime("%Y-%m-%d %H:%M:%S")), ('备注', r.desc) ] ) result.append(OrderedDict(dataList)) generate_excel_report(filepath, result) def ledger_consume_order_stats(date=None, statsId=None): # type: (Optional[str, datetime], str) -> None """ 统计经销商的每日分润的收益 """ # 获取时间戳的转换 data = date or datetime.datetime.now() - datetime.timedelta(days=1) if isinstance(date, datetime.datetime): date = date.strftime("%Y-%m-%d") if statsId: query = DealerGroupStats.objects.filter(id=statsId) else: query = DealerGroupStats.objects.filter(date=date) # 处理每一单的分账任务 for _stats in query: try: LedgerConsumeOrder(_stats).execute() except Exception as e: logger.exception("[ledger_consume_order_stats] stats <{}> execute ledger error ={}".format(_stats, e))