# coding=utf-8 import csv import sys import os import pandas import datetime from collections import OrderedDict from base import init_env env = sys.argv[1] os.environ.setdefault('DJANGO_SETTINGS_MODULE', env) init_env(interactive=False) from apps.web.user.models import MyUser, RechargeRecord from apps.web.dealer.models import Dealer from apps.web.device.models import Group from apilib.monetary import RMB class LaunderChecker(object): def __init__(self, days=None, maxLimit=None, filePath=None): if days is None: days = 1 if maxLimit is None: self._maxLimit = 500 else: self._maxLimit = maxLimit self._startTime, self._endTime, self._today = self.get_time(days) if filePath is None: filePath = os.path.join("/var/www/upload/launderCheck", "{}.xlsx".format(str(self._today))) else: filePath = os.path.join(filePath, "{}.xlsx".format(str(self._today))) self._writer = pandas.ExcelWriter(filePath) @staticmethod def get_time(days): today = datetime.date.today() - datetime.timedelta(days=days) return datetime.datetime.combine(today, datetime.datetime.min.time()), datetime.datetime.combine(today, datetime.datetime.max.time()), today @staticmethod def get_sum_recharge_by_openId(startTime, endTime): """ 以openId聚合 阶段时间的充值综合 筛选出超过充值上限的人 :param startTime: 阶段时间开始 :param endTime: 阶段时间结束 :return: """ matchFilters = { "dateTimeAdded": { "$gte": startTime, "$lte": endTime }, "result": RechargeRecord.PayResult.SUCCESS, } projectFilters = { "_id": 1, "money": 1, "coins": 1, "openId": 1 } groupFilters = { "_id": "$openId", "sum_recharge": {"$sum": "$money"}, } renameProject = { "_id": 0, "openId": "$_id", "recharge": "$sum_recharge" } records = RechargeRecord.get_collection().aggregate([ {"$match": matchFilters}, {"$project": projectFilters}, {"$group": groupFilters}, {"$project": renameProject} ]) for record in records: yield record def check_recharge(self, record): """ 检验是否最大额超出 :param record: :return: """ flag = False if RMB(record.get("recharge", 0)) > RMB(self._maxLimit): flag = True return flag @staticmethod def get_recharge_record(openId, startTime, endTime): """ 获取 涉嫌用户的 当天充值记录 :param openId: :param startTime: :param endTime: :return: """ user = MyUser.objects.filter(openId=openId).first() records = RechargeRecord.objects.filter(openId=openId, dateTimeAdded__gte=startTime, dateTimeAdded__lte=endTime) dataList = list() for record in records: dealer = Dealer.get_dealer(record.ownerId) if not dealer: dealer = dict() group = Group.get_group(record.groupId) if not group: group = dict() tempData = [ (u"用户名称", user.nickname), (u"用户ID", openId), (u"花钱地址", group.get("address", "-")), (u"组名称", group.get("groupName", "-")), (u"设备编号", record.devNo), (u"逻辑编号", record.logicalCode), (u"设备类型", record.devType), (u"经销商", dealer.get("nickname", "-")), (u"经销商电话", dealer.get("username", "-")), (u"单笔消费", str(record.money)), (u"消费时间", record.time), ] dataList.append(OrderedDict(tempData)) return dataList def write_recharge_record(self, dataList, sheetName): """ 写入数据 :param dataList: :param sheetName: :return: """ dataForm = pandas.DataFrame(dataList) dataForm.to_excel(self._writer, sheet_name=sheetName, index=False) def save(self): self._writer.save() def run(self): launderOpenIds = list() launderRecords = list() for record in self.get_sum_recharge_by_openId(self._startTime, self._endTime): if self.check_recharge(record): launderOpenIds.append(record.get("openId")) data = [ ("openId", record.get("openId")), ("recharge", str(record.get("recharge"))) ] launderRecords.append(OrderedDict(data)) print record self.write_recharge_record(launderRecords, sheetName=u"总计") for openId in launderOpenIds: dataList = self.get_recharge_record(openId, self._startTime, self._endTime) self.write_recharge_record(dataList, sheetName=openId) self.save() print "OK" if __name__ == '__main__': """ days 从今天向前推几天 今天是10号 查8号的, days就是2, 默认是1 maxLimit 检测充值金额的上线,默认是500 filePath 文件存储文件夹 默认是/var/www/upload/launderCheck/ """ LaunderChecker(days=1).run()