check_launder_money.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. # coding=utf-8
  2. import csv
  3. import sys
  4. import os
  5. import pandas
  6. import datetime
  7. from collections import OrderedDict
  8. from base import init_env
  # The Django settings module name is taken from the first command-line argument.
  if len(sys.argv) < 2:
      # Fail with a readable usage line instead of a bare IndexError.
      sys.exit("usage: {} <DJANGO_SETTINGS_MODULE>".format(sys.argv[0]))
  env = sys.argv[1]
  # setdefault: a DJANGO_SETTINGS_MODULE already present in the environment is kept.
  os.environ.setdefault('DJANGO_SETTINGS_MODULE', env)
  # NOTE(review): presumably bootstraps the project environment; the apps.* model
  # imports are placed after this call -- confirm they depend on it.
  init_env(interactive=False)
  12. from apps.web.user.models import MyUser, RechargeRecord
  13. from apps.web.dealer.models import Dealer
  14. from apps.web.device.models import Group
  15. from apilib.monetary import RMB
  class LaunderChecker(object):
      """Report users whose successful recharges on one day exceed a limit.

      The checker aggregates the successful RechargeRecord documents of a single
      day by openId, keeps the openIds whose total is above ``maxLimit`` and
      writes an Excel workbook: one summary sheet (named "总计") plus one sheet
      per flagged openId listing that user's recharge records for the day.
      """

      # Excel rejects worksheet names longer than this many characters.
      _SHEET_NAME_MAX_LEN = 31
      # Characters Excel does not accept inside a worksheet name.
      _SHEET_NAME_FORBIDDEN = u"[]:*?/\\"

      def __init__(self, days=None, maxLimit=None, filePath=None):
          """
          :param days: how many days back from today the inspected day lies; defaults to 1
          :param maxLimit: recharge total above which a user is flagged; defaults to 500
          :param filePath: directory the ``<date>.xlsx`` workbook is written to;
                           defaults to /var/www/upload/launderCheck
          """
          if days is None:
              days = 1
          self._maxLimit = 500 if maxLimit is None else maxLimit
          self._startTime, self._endTime, self._today = self.get_time(days)

          if filePath is None:
              filePath = "/var/www/upload/launderCheck"
          # Create the output directory up front so a missing directory does not
          # make the run fail only when the workbook is finally written.
          if filePath and not os.path.isdir(filePath):
              os.makedirs(filePath)
          self._writer = pandas.ExcelWriter(
              os.path.join(filePath, "{}.xlsx".format(str(self._today)))
          )

      @staticmethod
      def get_time(days):
          """
          Compute the inspected day and its time window.

          :param days: number of days to go back from today
          :return: tuple ``(start, end, day)`` where ``start``/``end`` are the first
                   and last representable datetimes of ``day``
          """
          day = datetime.date.today() - datetime.timedelta(days=days)
          start = datetime.datetime.combine(day, datetime.time.min)
          end = datetime.datetime.combine(day, datetime.time.max)
          return start, end, day

      @classmethod
      def _safe_sheet_name(cls, name):
          """Turn ``name`` into a worksheet name Excel accepts (non-empty, <= 31 chars)."""
          if name is None or name == "":
              return u"unknown"
          text = u"{}".format(name)
          for forbidden in cls._SHEET_NAME_FORBIDDEN:
              text = text.replace(forbidden, u"_")
          return text[:cls._SHEET_NAME_MAX_LEN]

      @staticmethod
      def get_sum_recharge_by_openId(startTime, endTime):
          """
          Aggregate the successful recharges inside the window by openId.

          :param startTime: start of the window (inclusive)
          :param endTime: end of the window (inclusive)
          :return: generator of dicts with the keys ``openId`` and ``recharge``
                   (the summed ``money`` of that openId)
          """
          matchFilters = {
              "dateTimeAdded": {
                  "$gte": startTime,
                  "$lte": endTime
              },
              "result": RechargeRecord.PayResult.SUCCESS,
          }
          projectFilters = {
              "_id": 1,
              "money": 1,
              "coins": 1,
              "openId": 1
          }
          groupFilters = {
              "_id": "$openId",
              "sum_recharge": {"$sum": "$money"},
          }
          # Rename the group key back to "openId" and the sum to "recharge".
          renameProject = {
              "_id": 0,
              "openId": "$_id",
              "recharge": "$sum_recharge"
          }
          records = RechargeRecord.get_collection().aggregate([
              {"$match": matchFilters},
              {"$project": projectFilters},
              {"$group": groupFilters},
              {"$project": renameProject}
          ])
          for record in records:
              yield record

      def check_recharge(self, record):
          """
          Tell whether an aggregated record exceeds the configured limit.

          :param record: dict produced by get_sum_recharge_by_openId
          :return: True when the ``recharge`` total is strictly above the limit
          """
          return bool(RMB(record.get("recharge", 0)) > RMB(self._maxLimit))

      @staticmethod
      def get_recharge_record(openId, startTime, endTime):
          """
          Collect the recharge records of a suspected user inside the window.

          Every record of the openId in the window is listed; unlike the
          aggregation, this query does not filter on the pay result.

          :param openId: openId of the suspected user
          :param startTime: start of the window (inclusive)
          :param endTime: end of the window (inclusive)
          :return: list of OrderedDict rows, one per recharge record
          """
          user = MyUser.objects.filter(openId=openId).first()
          # The user document may be missing; fall back to the same "-"
          # placeholder used for missing dealer/group fields instead of crashing.
          nickname = user.nickname if user else "-"
          records = RechargeRecord.objects.filter(
              openId=openId, dateTimeAdded__gte=startTime, dateTimeAdded__lte=endTime)
          dataList = []
          for record in records:
              dealer = Dealer.get_dealer(record.ownerId) or {}
              group = Group.get_group(record.groupId) or {}
              dataList.append(OrderedDict([
                  (u"用户名称", nickname),
                  (u"用户ID", openId),
                  (u"花钱地址", group.get("address", "-")),
                  (u"组名称", group.get("groupName", "-")),
                  (u"设备编号", record.devNo),
                  (u"逻辑编号", record.logicalCode),
                  (u"设备类型", record.devType),
                  (u"经销商", dealer.get("nickname", "-")),
                  (u"经销商电话", dealer.get("username", "-")),
                  (u"单笔消费", str(record.money)),
                  (u"消费时间", record.time),
              ]))
          return dataList

      def write_recharge_record(self, dataList, sheetName):
          """
          Write rows into one sheet of the workbook.

          :param dataList: list of row dicts
          :param sheetName: requested sheet name; sanitised to what Excel accepts
          :return: None
          """
          dataForm = pandas.DataFrame(dataList)
          dataForm.to_excel(
              self._writer, sheet_name=self._safe_sheet_name(sheetName), index=False)

      def save(self):
          """Flush the workbook to disk."""
          # close() writes the file on old and new pandas alike, whereas
          # ExcelWriter.save() was deprecated and later removed.
          self._writer.close()

      def run(self):
          """Find the flagged users, write the summary and detail sheets, save."""
          launderOpenIds = []
          launderRecords = []
          for record in self.get_sum_recharge_by_openId(self._startTime, self._endTime):
              if not self.check_recharge(record):
                  continue
              launderOpenIds.append(record.get("openId"))
              launderRecords.append(OrderedDict([
                  ("openId", record.get("openId")),
                  ("recharge", str(record.get("recharge")))
              ]))
              # NOTE(review): the source indentation was lost; this assumes only
              # flagged records are printed -- confirm against the real file.
              print(record)

          self.write_recharge_record(launderRecords, sheetName=u"总计")
          for openId in launderOpenIds:
              dataList = self.get_recharge_record(openId, self._startTime, self._endTime)
              self.write_recharge_record(dataList, sheetName=openId)
          self.save()
          print("OK")
  if __name__ == '__main__':
      # LaunderChecker options:
      #   days     - how many days back from today to inspect; if today is the
      #              10th and the 8th should be checked, days is 2. Default: 1.
      #   maxLimit - upper limit of the recharge amount to check. Default: 500.
      #   filePath - directory the report is stored in.
      #              Default: /var/www/upload/launderCheck/
      LaunderChecker(days=1).run()