tasks.py 93 KB


  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import datetime
  4. from collections import OrderedDict, namedtuple
  5. from collections import defaultdict
  6. from threading import Thread
  7. import arrow
  8. import pandas
  9. import simplejson as json
  10. from bson.objectid import ObjectId
  11. from celery.utils.log import get_task_logger
  12. from django.conf import settings
  13. from typing import Dict, Optional, TYPE_CHECKING, List
  14. from apilib.monetary import RMB, Percent
  15. from apilib.quantity import Quantity
  16. from apilib.utils_datetime import timestamp_to_dt
  17. from apilib.utils_datetime import to_datetime
  18. from apps.web.agent.models import Agent
  19. from apps.web.api.models import APIStartDeviceRecord
  20. from apps.web.common.models import OperatorLog, DealerDoneReportRecord
  21. from apps.web.common.proxy import ClientRechargeModelProxy, ClientConsumeModelProxy
  22. from apps.web.common.transaction import WITHDRAW_PAY_TYPE
  23. from apps.web.common.transaction.pay import PayRecordPoller, PayManager
  24. from apps.web.constant import Const, RechargeRecordVia, AppPlatformType
  25. from apps.web.constant import DEALER_CONSUMPTION_AGG_KIND_TRANSLATION, MONTH_DATE_KEY
  26. from apps.web.core.helpers import ActionDeviceBuilder
  27. from apps.web.core.messages import SmsVendorCode
  28. from apps.web.core.messages.sms import SMSSender
  29. from apps.web.core.sysparas import SysParas
  30. from apps.web.core.utils import generate_excel_report, gernerate_excel_report_for_sheet
  31. from apps.web.dealer.accessors import income_business_stat, consume_business_stat, monthly_business_stat
  32. from apps.web.dealer.define import DEALER_INCOME_TYPE, DEALER_INCOME_SOURCE
  33. from apps.web.dealer.models import Dealer, DealerRechargeRecord, UpscoreRecord, UpCardScoreRecord
  34. from apps.web.dealer.proxy import DealerIncomeProxy, record_income_proxy, DealerGroupStats
  35. from apps.web.dealer.utils import create_dealer_sim_charge_order
  36. from apps.web.dealer.transaction import post_pay, post_sim_recharge
  37. from apps.web.dealer.withdraw import DealerWithdrawService
  38. from apps.web.device.models import DevStatusRecord, Device, Group, DeviceDict
  39. from apps.web.device.timescale import SignalManager
  40. from apps.web.helpers import get_wechat_manager_mp_proxy, get_platform_wallet_pay_gateway
  41. from apps.web.report.ledger import LedgerConsumeOrder
  42. from apps.web.report.models import DealerReport, DealerDailyStat, DeviceDailyStat, GroupDailyStat, DealerMonthlyStat
  43. from apps.web.report.models import GroupReport
  44. from apps.web.report.utils import cum_stats, translate_consumption
  45. from apps.web.user.models import ConsumeRecord, MyUser, RechargeRecord, Card
  46. if TYPE_CHECKING:
  47. from apps.web.core.adapter.base import SmartBox
  48. logger = get_task_logger(__name__)
  49. def report_feedback_to_dealer_via_wechat(dealerId, nickname, msg, feedbackTime):
  50. """
  51. TODO
  52. :return:
  53. """
  54. if not settings.SEND_DEALER_WECHAT_PUSH_MESSAGE:
  55. logger.info('SEND_DEALER_WECHAT_PUSH_MESSAGE is turned off, you are probably in dev env')
  56. else:
  57. dealer = Dealer.objects(id = str(dealerId)).first() # type: Optional[Dealer]
  58. if not dealer:
  59. return {'info': 'dealer does not exist, id=%s' % (dealerId,)}
  60. if not dealer.managerialOpenId:
  61. return {'info': 'no manager open Id for dealer %s' % (dealer.username,)}
  62. wechat_mp_proxy = get_wechat_manager_mp_proxy(dealer)
  63. return wechat_mp_proxy.notify(dealer.managerialOpenId, 'feedback', **{
  64. 'title': msg,
  65. 'nickname': nickname,
  66. 'feedbackTime': feedbackTime
  67. })
  68. def report_daily_report_to_dealer_via_wechat(dealer = None):
  69. # type:(Optional[Dealer])->None
  70. """
  71. 每日收益次日推送(9点)
  72. `模版微信公众号后台ID` : OPENTM401155654
  73. :return:
  74. """
  75. sent = set([])
  76. def _report(dealer):
  77. # type:(Dealer)->dict
  78. logger.info('sending daily income report to %r' % (dealer,))
  79. if not dealer.managerialOpenId:
  80. return {'info': 'no managerial open Id for %r' % (dealer,)}
  81. agent = Agent.objects(id = dealer.agentId).first()
  82. if not agent:
  83. return {'info': 'no agent found by %r' % (dealer,)}
  84. if not agent.is_in_domain:
  85. return {'info': 'is not my agent. agentId = %s' % (str(dealer.agentId),)}
  86. yesterday = datetime.datetime.now() - datetime.timedelta(days = 1)
  87. offline_report = DealerReport.objects(ownerId = str(dealer.id),
  88. type = 'day',
  89. date = yesterday.strftime("%Y-%m-%d")).first() # type: DealerReport
  90. # getattr 加上默认参数 防止获取不到报错
  91. stat = getattr(DealerDailyStat.objects(
  92. date = yesterday.date().strftime(Const.DATE_FMT),
  93. dealerId = ObjectId(dealer.id)).only('daily').first(), 'daily', None)
  94. if not stat:
  95. stat = {}
  96. if offline_report is None:
  97. offline_coins = 0
  98. else:
  99. if offline_report.rpt is None:
  100. offline_coins = 0
  101. else:
  102. offline_coins = offline_report.rpt.get('lineCoins')
  103. income_map = {
  104. 'offline_coins': offline_coins,
  105. 'online_income': RMB(stat.get('income', {}).get(RechargeRecordVia.Balance, 0)) +
  106. RMB(stat.get('income', {}).get(RechargeRecordVia.Redpack, 0)) +
  107. RMB(stat.get('income', {}).get(RechargeRecordVia.Card, 0)) +
  108. RMB(stat.get('income', {}).get(RechargeRecordVia.VirtualCard, 0)),
  109. 'refund_cash': RMB(stat.get('income', {}).get(RechargeRecordVia.RefundCash, 0))
  110. }
  111. if abs(income_map['refund_cash']) > RMB(0):
  112. report = u'线下投币(%s)次 在线支付收入(%s)元 现金退款(%s)元' % (
  113. income_map['offline_coins'], income_map['online_income'], abs(income_map['refund_cash']))
  114. else:
  115. report = u'线下投币(%s)次 在线支付收入(%s)元' % (
  116. income_map['offline_coins'], income_map['online_income'])
  117. wechat_mp_proxy = get_wechat_manager_mp_proxy(agent)
  118. return wechat_mp_proxy.notify(dealer.managerialOpenId, 'daily_income', **{
  119. 'title': u'您的昨日收益报表',
  120. 'reportTime': yesterday.strftime('%Y-%m-%d'),
  121. 'report': report
  122. })
  123. try:
  124. if dealer is None:
  125. last_done_list = []
  126. doneRecord = DealerDoneReportRecord.objects(
  127. reportName = 'report_daily_report_to_dealer_via_wechat',
  128. reportDay = datetime.datetime.now().strftime('%Y-%m-%d')).only('dealerIds').first()
  129. if doneRecord and doneRecord.dealerIds:
  130. last_done_list = doneRecord.dealerIds
  131. dealers = Dealer.objects(dailyIncomeReportPushSwitch = True).only(
  132. 'id', 'managerialOpenId', 'agentId').batch_size(2000).timeout(False)
  133. for dealer in dealers:
  134. if str(dealer.id) in last_done_list:
  135. continue
  136. try:
  137. _report(dealer)
  138. except Exception as e:
  139. logger.exception(e)
  140. else:
  141. sent.add(str(dealer.id))
  142. else:
  143. _report(dealer)
  144. finally:
  145. if sent:
  146. DealerDoneReportRecord.update_done_list(
  147. report_name = 'report_daily_report_to_dealer_via_wechat',
  148. report_day = datetime.datetime.now().strftime('%Y-%m-%d'),
  149. done_list = list(sent))
  150. def report_new_payment_to_dealer_via_wechat(record):
  151. # type:(RechargeRecord)->dict
  152. """每笔订单完成后推送
  153. 模版元格式
  154. {{first.DATA}}
  155. 顾客:{{keyword1.DATA}}
  156. 收益:{{keyword2.DATA}}
  157. {{remark.DATA}}
  158. `模版微信公众号后台ID` : OPENTM207568114
  159. 您的顾客消费啦
  160. 顾客:小明
  161. 收益:10.00元
  162. 你的顾客消费啦,为您带来10.00元的收益哦
  163. :param record:
  164. """
  165. if not settings.SEND_DEALER_WECHAT_PUSH_MESSAGE:
  166. logger.info('SEND_DEALER_WECHAT_PUSH_MESSAGE is turned off, you are probably in dev env')
  167. else:
  168. logger.info('sending new payment order record=%s' % (json.dumps(record),))
  169. dealer = Dealer.objects(id = str(record['ownerId'])).first() # type: Optional[Dealer]
  170. if not dealer:
  171. return {'info': 'dealer does not exist, ownerId=%s' % (record['ownerId'],)}
  172. if not dealer.newUserPaymentOrderPushSwitch:
  173. return {'info': 'dealer(%s) hasn\'t switched up yet' % (record['ownerId'],)}
  174. if not dealer.managerialOpenId:
  175. return {'info': 'no manager open Id for dealer %s' % (dealer.username,)}
  176. agent = Agent.objects(id = dealer.agentId).first() # type: Optional[Agent]
  177. if not agent:
  178. return {'info': 'No agent found by id=%s' % (str(dealer.agentId),)}
  179. _translate = {
  180. 'wechat': u'微信',
  181. 'alipay': u'支付宝',
  182. 'card': u'卡充值'
  183. }
  184. from apps.web.dealer.validation import newPaymentNotifyPayload
  185. payload = newPaymentNotifyPayload(record)
  186. _info = _translate.get(payload['gateway'])
  187. _translate_via = {'redpack': '平台红包'}
  188. if payload.get('via') and _translate_via.get(payload['via']):
  189. _info = _translate_via[payload['via']]
  190. wechat_mp_proxy = get_wechat_manager_mp_proxy(agent)
  191. return wechat_mp_proxy.notify(dealer.managerialOpenId, 'new_payment_order', **{
  192. 'title': u'您有新顾客充值啦',
  193. 'customer': u'昵称(%s)来自(%s)' % (payload['nickname'], _info),
  194. 'income': u'%s元' % payload['money']
  195. })
  196. def report_device_abnormally_offline_to_dealer_via_wechat(dealer = None, logicalCode = None, offTime = 0):
  197. """
  198. 默认为2小时推送设备离线信息
  199. `模版`
  200. {{first.DATA}}
  201. 设备名称:{{keyword1.DATA}}
  202. 时间:{{keyword2.DATA}}
  203. {{remark.DATA}}
  204. `模版微信公众号后台ID` OPENTM401433345
  205. `示例`
  206. 掌聚网能设备离线通知
  207. 设备名称:机房UPS
  208. 时间:2016-02-19 13:12:45
  209. 请立即登录查看详情
  210. :return:
  211. """
  212. notified = defaultdict(list)
  213. def _report(dealer, logicalCode, offTime):
  214. if not dealer.managerialOpenId:
  215. return {'info': 'no managerial open Id for dealer %s' % (dealer.phone,)}
  216. agent = Agent.objects(id = dealer.agentId).first() # type: Agent
  217. if not agent:
  218. return {'info': 'No agent found by id=%s' % (str(dealer.agentId),)}
  219. if not agent.is_in_domain:
  220. return {'info': 'is not my agent. agentId = %s' % (str(dealer.agentId),)}
  221. if not agent.supports('notify_dealer_device_offline'):
  222. return {'info': 'agent does not support offline device push'}
  223. #: 记录操作结果
  224. notified[str(dealer.id)].append((logicalCode, offTime))
  225. wechat_mp_proxy = get_wechat_manager_mp_proxy(agent)
  226. return wechat_mp_proxy.notify(dealer.managerialOpenId, 'abnormal_device_offline', **{
  227. 'title': '您有一台设备已离线!',
  228. 'device': u'逻辑编码:%s' % logicalCode,
  229. 'notifyTime': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  230. })
  231. if dealer is None:
  232. dealers = list(Dealer.objects(offlineNotify = True))
  233. for dealer in dealers: # type: Dealer
  234. #: 暂时只通知设备所属经销商而不通知合伙人
  235. now = datetime.datetime.now()
  236. devices = dealer.get_own_devices()
  237. for device in devices: # type: DeviceDict
  238. if not device.online:
  239. if device.offTime > 0:
  240. offset = (now - datetime.datetime.fromtimestamp(device.offTime // 1000)).total_seconds()
  241. #: 如果设备离线超过24小时,则不予推送
  242. if 24 > (offset // 3600) >= 2:
  243. logger.info(
  244. 'device(logicalCode=%s) is offline(offTime=%s, offset=%s), sending to dealer(phone=%s)'
  245. % (device['logicalCode'], device['offTime'], offset, dealer.username,))
  246. _report(dealer, device['logicalCode'], device['offTime'])
  247. else:
  248. _report(dealer, logicalCode, offTime)
  249. return notified
  250. def report_offline_device_to_dealer_via_wechat():
  251. notified = defaultdict(list)
  252. dealers = Dealer.objects(offlineNotifySwitch = True)
  253. for _ in dealers:
  254. try:
  255. logicalCodes = []
  256. now = datetime.datetime.now()
  257. devices = _.get_own_devices()
  258. if len(devices) == 0:
  259. continue
  260. for d in devices:
  261. if d.online == 0:
  262. offset = (now - datetime.datetime.fromtimestamp(d.offTime // 1000)).total_seconds()
  263. if _.offlineNotifyTime != '':
  264. if (offset // 3600) > int(_.offlineNotifyTime):
  265. logicalCodes.append(d['logicalCode'])
  266. if len(logicalCodes) == 0:
  267. continue
  268. if not _.managerialOpenId:
  269. return {'info': 'no managerial open Id for dealer %s' % (_.phone,)}
  270. agent = Agent.objects(id = _.agentId).first() # type: Agent
  271. if not agent:
  272. return {'info': 'No agent found by id=%s' % (str(_.agentId),)}
  273. if not agent.is_in_domain:
  274. return {'info': 'is not my agent. agentId = %s' % (str(_.agentId),)}
  275. wechat_mp_proxy = get_wechat_manager_mp_proxy(agent)
  276. logicalCodesStr = ''
  277. for l in logicalCodes:
  278. logicalCodesStr += l + ' '
  279. wechat_mp_proxy.notify(_.managerialOpenId, 'abnormal_device_offline', **{
  280. 'title': '您有%s台设备已离线!' % len(logicalCodes),
  281. 'device': u'逻辑编码:%s' % logicalCodesStr,
  282. 'notifyTime': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  283. })
  284. except Exception as e:
  285. logicalCodesStr = ''
  286. pass
  287. notified[str(_.id)].append(logicalCodesStr)
  288. return notified
  289. def report_to_dealer_via_wechat(openId, dealerId, templateName, **kwargs):
  290. dealer = Dealer.objects(id = str(dealerId)).first()
  291. if not dealer:
  292. return {'info': 'dealer does not exist, id=%s' % (dealerId,)}
  293. wechat_mp_proxy = get_wechat_manager_mp_proxy(dealer)
  294. return wechat_mp_proxy.notify(openId, templateName, **kwargs)
  295. def send_SIM_expired_messages(send_type):
  296. """
  297. 给经销商发送设备SIM卡过期短信
  298. :return:
  299. """
  300. logger.debug('send type is {}'.format(send_type))
  301. if settings.DEBUG_CELERY_TASK:
  302. expire_notify_devices = [{'logicalCode': 'DUMMY', 'ownerId': '5b9ae99ad89a177846459999'}]
  303. else:
  304. expire_notify_devices = Device.get_sim_expire_notify_devices()
  305. logger.info('%s devices will be expired' % len(expire_notify_devices))
  306. from cytoolz import groupby
  307. dealer_device_map = groupby('ownerId', expire_notify_devices)
  308. dealer_map = {
  309. dealer['_id']: dealer
  310. for dealer in Dealer.get_collection()
  311. .find({'_id': {'$in': [ObjectId(id_) for id_ in dealer_device_map.keys()]}},
  312. {'username': 1, 'agentId': 1, 'smsVendor': 1})
  313. } # type: Dict[ObjectId, dict]
  314. agent_product_map = {}
  315. for agent in Agent.objects(id__in = [_['agentId'] for _ in dealer_map.values()]).all(): # type: Agent
  316. agent_product_map[str(agent.id)] = agent.product_name
  317. if not len(dealer_map):
  318. logger.info('there are no devices are expired at the moment')
  319. return {'result': 'no devices are expired'}
  320. else:
  321. logger.info('we will notify %d dealer, devices are %s' % (len(dealer_map), dealer_device_map))
  322. if not (settings.DEBUG_CELERY_TASK or settings.CHECK_DEVICE_SMS_EXPIRE):
  323. logger.info('CHECK_DEVICE_SMS_EXPIRE is turned off, you are probably in dev/testing environment')
  324. else:
  325. smsVendor = SysParas.get_sim_expire_sms_vendor(default = SmsVendorCode.UCPAAS)
  326. for id_, dealer in dealer_map.iteritems():
  327. logger.debug('sending SIM expired message to Dealer(phone=%s)' % (dealer['username'],))
  328. if send_type == 'sms':
  329. logger.debug('dealerId = {}, mobile = {}, productName = {}'.format(
  330. id_, dealer['username'], agent_product_map[dealer['agentId']]))
  331. sms_sender = SMSSender(vendor = smsVendor)
  332. response = sms_sender.send(phoneNumber = dealer['username'],
  333. templateName = "SMS_NOTIFY_EXPIRED_DEVICE_TEMPLATEID",
  334. msg = u'您有设备即将在近期过期'.encode('utf-8'),
  335. productName = agent_product_map[dealer['agentId']])
  336. if not response['result']:
  337. logger.error(
  338. 'send sim expired sms to dealer failed(phone={}), error={}'.format(
  339. dealer['username'], response['msg']))
  340. else:
  341. logger.info('send sim expired sms to dealer(phone={}) success'.format(dealer['username']))
  342. if send_type == 'wechat':
  343. dealer = Dealer.objects(id = str(id_)).first() # type: Dealer
  344. if not dealer:
  345. logger.debug('dealer<id={}> is not exist.'.format(id_))
  346. continue
  347. wechat_mp_proxy = get_wechat_manager_mp_proxy(dealer)
  348. wechat_mp_proxy.notify(dealer.managerialOpenId, 'sim_expire_notify', **{
  349. 'title': u'亲爱的用户,您有设备本月流量卡过期,请及时充值。已经充值请忽略本消息。'.format(
  350. agent_product_map[dealer['agentId']]),
  351. 'num': u'{}台(本月)'.format(len(dealer_device_map.get(str(id_)))),
  352. 'type': u'流量卡到期',
  353. 'time': arrow.now().replace(day = Const.SIM_CARD_FORBIDDEN_DAY).format('YYYY-MM-DD')
  354. })
  355. logger.info(
  356. 'sent sim expired message to dealer(phone={})'.format(dealer['username']))
  357. def daily_check_auto_withdraw():
  358. """
  359. 每日检查有自动提现资格经销商,该功能不倡导用
  360. :return:
  361. """
  362. if not settings.SUPPORT_DEALER_AUTO_WITHDRAW:
  363. logger.info('dealer auto withdraw is not supported, you are probably in testing/dev env')
  364. dealers = Dealer.objects(autoWithdrawAllowable = True)
  365. logger.info('we are going to pay dealers(%s)' % dealers)
  366. for dealer in dealers:
  367. for income_type in DEALER_INCOME_TYPE.choices():
  368. for source_key, balance_field in dealer.balance_dict(income_type):
  369. balance = balance_field.balance
  370. if balance > RMB(10):
  371. withdraw_service = DealerWithdrawService(payee = dealer,
  372. income_type = income_type,
  373. amount = balance,
  374. pay_type = WITHDRAW_PAY_TYPE.WECHAT,
  375. bank_card_no = None)
  376. result = withdraw_service.execute(source_key = source_key, recurrent = True)
  377. logger.info('%r withdrawn %s, result = %s' % (dealer, balance, result))
  378. # 写入的数据模型
  379. ExcelSheet = namedtuple("Sheet", ["data", "name"])
  380. def generate_business_stats_report_by_dealer(filePath, queryAttrs):
  381. """
  382. 给经销商生成旗下的经营数据报表
  383. :return:
  384. """
  385. logger.debug(
  386. 'generate_business_stats_report_by_dealer, filePath is <{}>, queryAttrs is <{}>'.format(filePath, queryAttrs))
  387. try:
  388. kind = queryAttrs.pop('kind', '')
  389. # 经销商加入了一个新的特性,该特性支持按月统计账单
  390. if kind != "monthly_bill":
  391. # 前台传递过来的数据是xxxx xx xx 23:59:59,相当于是下一天的闭区间,目前方法所需要的数据是当天的闭区间,所以把时间减一天
  392. queryAttrs['endTime'] = timestamp_to_dt(queryAttrs.pop('dateTimeAdded__lte')) - datetime.timedelta(
  393. hours = 23, minutes = 59, seconds = 59)
  394. queryAttrs['startTime'] = timestamp_to_dt(queryAttrs.pop('dateTimeAdded__gte'))
  395. if kind == "income":
  396. records = income_business_stat(**queryAttrs)
  397. groupByGroup = list()
  398. if records:
  399. df = pandas.DataFrame(records)
  400. # 地址分类
  401. for _groupName, _v in df.groupby(u"地址"):
  402. tempList = list()
  403. tempList.append((u"", _groupName))
  404. for _source, __v in _v.groupby(u"支付类型"):
  405. tempList.append((_source, __v[u"分得金额"].astype(float).sum()))
  406. tempList.append((u"分得总金额", _v[u"分得金额"].astype(float).sum()))
  407. tempList.append((u"总金额", _v[u"总金额"].astype(float).sum()))
  408. groupByGroup.append(OrderedDict(tempList))
  409. sheets = list()
  410. sheets.append(ExcelSheet(data = records, name = u"数据总览"))
  411. sheets.append(ExcelSheet(data = groupByGroup, name = u"地址汇总"))
  412. gernerate_excel_report_for_sheet(filePath, sheets)
  413. elif kind == "consume":
  414. records = consume_business_stat(**queryAttrs)
  415. generate_excel_report(filePath, records)
  416. else:
  417. logger.error("invalid kind <{}> to generate_business_stats_report_by_dealer")
  418. else:
  419. queryAttrs['endTime'] = timestamp_to_dt(queryAttrs.pop('dateTimeAdded__lte')) - datetime.timedelta(
  420. hours = 23, minutes = 59, seconds = 59)
  421. queryAttrs['startTime'] = timestamp_to_dt(queryAttrs.pop('dateTimeAdded__gte'))
  422. records = monthly_business_stat(**queryAttrs)
  423. generate_excel_report(filePath, records)
  424. except Exception as e:
  425. logger.exception(e)
  426. # 计算经销商订单的一些统计数据,包括计算出日活设备占总设备的比例,以及各个套餐的使用数目
  427. def calc_dealer_order_data_stats(dealerId, startTime, endTime):
  428. logger.info('calc_dealer_order_data_stats dealerId=%s,startTime=%s,endTime=%s' % (dealerId, startTime, endTime))
  429. group_id_list = Group.get_group_ids_of_dealer(dealerId)
  430. rcds = ConsumeRecord.get_collection().find(
  431. {
  432. # 'ownerId': dealerId,
  433. 'groupId': {
  434. '$in': group_id_list
  435. },
  436. 'dateTimeAdded': {
  437. '$gte': startTime, '$lte': endTime
  438. },
  439. 'isNormal': True
  440. })
  441. # 计算套餐统计,以及活跃过的设备清单
  442. dealerDevNoSet = set()
  443. groupDict = {}
  444. dealerPackageDict = {}
  445. for rcd in rcds:
  446. dealerDevNoSet.add(rcd['devNo'])
  447. groupId = rcd['groupId']
  448. coin = str(rcd['coin'])
  449. if not groupDict.has_key(groupId):
  450. groupDict[groupId] = {'devNoSet': {rcd['devNo']}}
  451. groupDict[groupId]['package'] = {coin: 1} # type:Dict
  452. else:
  453. groupDict[groupId]['devNoSet'].add(rcd['devNo'])
  454. if coin in groupDict[groupId]['package']:
  455. groupDict[groupId]['package'][coin] += 1
  456. else:
  457. groupDict[groupId]['package'] = {coin: 1}
  458. if not dealerPackageDict.has_key(coin):
  459. dealerPackageDict[coin] = 1
  460. else:
  461. dealerPackageDict[coin] += 1
  462. # 计算实际每组的设备使用率
  463. allDevCount = 0
  464. for groupId, value in groupDict.items():
  465. devCount = len(Device.get_devNos_by_group([groupId]))
  466. groupActivedDevCount = len(value['devNoSet'])
  467. value['activedDevRatio'] = int(float(groupActivedDevCount) / devCount * 100) if devCount != 0 else 0
  468. allDevCount += devCount
  469. value.pop('devNoSet')
  470. activedDevRatio = int(float(len(dealerDevNoSet)) / allDevCount * 100) if allDevCount != 0 else 0
  471. return {'activedDevRatio': activedDevRatio, 'groupStats': groupDict, 'package': dealerPackageDict}
  472. def get_status_from_event(event):
  473. if event['signal'] > 0:
  474. return 'online'
  475. elif event['usage'] > 0:
  476. return 'offline_busy'
  477. else:
  478. return 'offline'
  479. # 因为设备状态记录依赖于事件的触发,有可能某个离线一直离线,不上线;或者设备一直在线,不曾离线,这样devStatusRecord就不会生成,
  480. # 但是统计数据又依赖这个数据,所以就需要定时触发,做一次切割,主动生成一次这个数据
  481. def make_dev_status_rcd(devNo, splitTime):
  482. def make_status_between_node(startNode, endNode, valueList = []):
  483. if startNode['time'] == endNode['time']:
  484. return
  485. if valueList:
  486. changeNode = valueList[-1]
  487. else:
  488. changeNode = startNode
  489. if changeNode['signal'] == 0 and endNode['signal'] > 0: # 离线->在线
  490. status = get_status_from_event(changeNode)
  491. elif changeNode['signal'] == 0 and endNode['signal'] == 0: # 离线->离线
  492. status = get_status_from_event(changeNode)
  493. elif changeNode['signal'] > 0 and endNode['signal'] == 0: # 在线->离线
  494. status = get_status_from_event(endNode)
  495. elif changeNode['signal'] > 0 and endNode['signal'] > 0: # 在线->在线
  496. if (to_datetime(endNode['time']) - to_datetime(changeNode['time'])).total_seconds() > 10 * 60:
  497. status = 'offline' if endNode['usage'] == 0 else 'offline_busy'
  498. else:
  499. status = 'online'
  500. # 记录数据
  501. if not valueList:
  502. valueList = [startNode, endNode]
  503. startTime = to_datetime(startNode['time'])
  504. endTime = to_datetime(endNode['time'])
  505. DevStatusRecord.get_collection().update({'devNo': devNo, 'startTime': startTime, 'endTime': endTime}, {'$set': {
  506. 'devNo': devNo,
  507. 'status': status,
  508. 'startTime': startTime,
  509. 'endTime': endTime,
  510. 'duration': (endTime - startTime).total_seconds(),
  511. 'valueDict': {'signalUsage': valueList}
  512. }}, upsert = True)
  513. logger.info('make_dev_status_rcd devNo=%s,splittime%s' % (devNo, splitTime))
  514. # 如果没有没有事件,说明最近从来没有上过线,也没有收到过离线报文,统一按照离线处理。(前提是,初始化的时候,cache中的event需要初始化一个数据进来)
  515. # 从来没有上过线,或者某种状态维持了1周,然后cache中丢失了,才会出现event 为None
  516. preDay = (splitTime - datetime.timedelta(days = 1)).strftime(Const.DATE_FMT)
  517. pre2Day = (splitTime - datetime.timedelta(days = 2)).strftime(Const.DATE_FMT)
  518. thisToday = splitTime.strftime(Const.DATE_FMT)
  519. startTime = to_datetime(preDay + ' 00:00:00')
  520. endTime = to_datetime(preDay + ' 23:59:59')
  521. startTimePre2Day = to_datetime(pre2Day + ' 00:00:00') # 用于多取一些数据,方便判断第一个起始时间到第一个节点时间之间的状态
  522. endTimePre2Day = to_datetime(preDay + ' 23:59:59')
  523. startTimeToday = to_datetime(thisToday + ' 00:00:00') # 用于多取一些数据,方便判断最后一个时间节点到结束时间之间的状态
  524. endTimeToday = to_datetime(thisToday + ' 23:59:59')
  525. events = SignalManager.instence().get(devNo, startTime, startTime)
  526. lastEvents = SignalManager.instence().get(devNo, startTimePre2Day, endTimePre2Day)
  527. nextEvents = SignalManager.instence().get(devNo, startTimeToday, endTimeToday)
  528. # 这种情况要么是掉线时间过久,memcache中的key都被清除掉了;要么是没有初始化进来包括设备离线增加进来,未上线或者监控脚本才启动,没有初始化(第一次启动的时候,数据略有偏差,可以忽略)。
  529. if not events:
  530. make_status_between_node(
  531. {'time': startTime.strftime(Const.DATETIME_FMT), 'signal': 0, 'usage': 0, 'isSplit': 1},
  532. {'time': endTime.strftime(Const.DATETIME_FMT), 'signal': 0, 'usage': 0, 'isSplit': 1})
  533. else: # 如果上一次事件的时间是昨天以前,就需要切割
  534. count = len(events)
  535. preStatusEvent = events[0]
  536. valueList = []
  537. # 首先处理第一个节点
  538. if lastEvents and lastEvents[-1]: #
  539. make_status_between_node(lastEvents[-1], preStatusEvent)
  540. else: # 没有监听的情况,默认认为离线
  541. make_status_between_node({'time': startTime.strftime(Const.DATETIME_FMT), 'signal': 0, 'usage': 0},
  542. preStatusEvent)
  543. for ii in range(count - 1):
  544. curEvent = events[ii]
  545. nextEvent = events[ii + 1]
  546. valueList.append(curEvent)
  547. if (curEvent['signal'] > 0 and nextEvent['signal'] > 0): # 两个节点都在线,需要判断两个节点的时间间隔,如果超过10分钟,算作离线
  548. if (to_datetime(nextEvent['time']) - to_datetime(curEvent['time'])).total_seconds() > 10 * 60:
  549. make_status_between_node(preStatusEvent, curEvent, valueList) # 在线状态
  550. make_status_between_node(curEvent, {'time': nextEvent['time'], 'signal': 0, 'usage': 0}) # 离线状态
  551. valueList = []
  552. preStatusEvent = nextEvent
  553. else:
  554. if ii == (count - 2): # 如果最后一个,应该主动生成
  555. make_status_between_node(preStatusEvent, nextEvent, valueList)
  556. valueList = []
  557. preStatusEvent = nextEvent
  558. elif curEvent['signal'] > 0 and nextEvent['signal'] == 0: # 在线-》离线,应该记录一个在线,一个离线
  559. make_status_between_node(preStatusEvent, curEvent, valueList) # 在线状态
  560. make_status_between_node(curEvent, nextEvent) # 离线状态
  561. valueList = []
  562. preStatusEvent = nextEvent
  563. elif curEvent['signal'] == 0 and nextEvent['signal'] > 0: # 离线->在线,产生一条离线处理
  564. # 记录离线
  565. make_status_between_node(preStatusEvent, nextEvent, valueList)
  566. valueList = []
  567. preStatusEvent = nextEvent
  568. elif curEvent['signal'] == 0 and nextEvent['signal'] == 0:
  569. if ii == count - 2: # 如果最后一个,应该主动生成
  570. make_status_between_node(preStatusEvent, nextEvent, valueList)
  571. # 收尾最后一个事件,直接产生一条记录
  572. if nextEvents and nextEvents[0]:
  573. make_status_between_node(events[-1], nextEvents[0], valueList) # 如果跑出来,能够碰到心跳,
  574. else:
  575. make_status_between_node(events[-1], {'time': endTime, 'signal': 0, 'usage': 0}, valueList)
  576. logger.info('finish make_dev_status_rcd devNo=%s' % devNo)
  577. # 统计设备状态数据
  578. def calc_dev_status_stat(devNo, startTime, endTime):
  579. logger.info('calc dev status devNo=%s,startTime=%s,endTime=%s' % (devNo, startTime, endTime))
  580. def count_time(startTime, endTime, rcd, totalDict):
  581. status = rcd['status']
  582. duration = (endTime - startTime).total_seconds()
  583. if status == 'online':
  584. totalDict['totalOnlineTime'] += duration
  585. elif status == 'offline':
  586. totalDict['totalOfflineTime'] += duration
  587. elif status == 'offline_busy':
  588. totalDict['totalOfflineBusyTime'] += duration
  589. else:
  590. pass
  591. queryStartTime = startTime - datetime.timedelta(days = 1)
  592. queryEndTime = endTime + datetime.timedelta(days = 1)
  593. rcds = DevStatusRecord.objects.filter(devNo = devNo, startTime__gte = queryStartTime, endTime__lte = queryEndTime)
  594. totalDict = {'totalOfflineTime': 0, 'totalOfflineBusyTime': 0, 'totalOnlineTime': 0, 'offlineCount': 0,
  595. 'offlineBusyCount': 0, 'onlineBusy': 0}
  596. # 统计的状态的累计之和,包括总的离线时间、在线时间、离线繁忙时间等
  597. statusList = []
  598. for rcd in rcds:
  599. if endTime <= rcd.startTime or startTime >= rcd.endTime: # 两头的不合理,直接去掉
  600. continue
  601. elif startTime >= rcd.startTime and endTime <= rcd.endTime:
  602. count_time(startTime, endTime, rcd, totalDict)
  603. elif startTime <= rcd.startTime:
  604. if endTime >= rcd.startTime and endTime <= rcd.endTime:
  605. count_time(rcd.startTime, endTime, rcd, totalDict)
  606. elif endTime >= endTime:
  607. count_time(rcd.startTime, rcd.endTime, rcd, totalDict)
  608. else:
  609. continue
  610. elif startTime >= rcd.startTime and startTime <= rcd.endTime:
  611. if endTime <= rcd.endTime:
  612. count_time(startTime, endTime, rcd, totalDict)
  613. else:
  614. count_time(startTime, rcd.endTime, rcd, totalDict)
  615. else:
  616. continue
  617. statusList.extend(rcd.valueDict['signalUsage'])
  618. # 计算每个小时统计数据。如果在线变离线、离线变在线,空闲变繁忙,繁忙变空闲。记录的是变化情况
  619. hourDict = {}
  620. ii = 0
  621. while ii < len(statusList) - 1:
  622. preStatus = statusList[ii]
  623. status = statusList[ii + 1]
  624. ii += 1
  625. # 如果是人为劈开的,不作为正式数据,直接过滤掉
  626. if status.has_key('isSplit'):
  627. continue
  628. # 不在时间范围内的,过滤掉
  629. statusTime = to_datetime(status['time'])
  630. if statusTime < startTime or statusTime > endTime:
  631. continue
  632. # 如果状态没有变化,也直接过滤掉
  633. if (preStatus['signal'] > 0 and status['signal'] > 0 and preStatus['usage'] == status['usage']) \
  634. or (preStatus['signal'] == 0 and status['signal'] == 0):
  635. continue
  636. hour = statusTime.hour
  637. if not hourDict.has_key(hour):
  638. hourDict[hour] = {'offlineCount': 0, 'offlineBusyCount': 0, 'onlineBusy': 0, 'usageSum': 0, 'usageCount': 0}
  639. if status['usage'] == 0 and status['signal'] == 0:
  640. hourDict[hour]['offlineCount'] += 1
  641. totalDict['offlineCount'] += 1
  642. elif status['usage'] > 0 and status['signal'] == 0:
  643. hourDict[hour]['offlineBusyCount'] += 1
  644. hourDict[hour]['usageSum'] += status['usage']
  645. hourDict[hour]['usageCount'] += 1
  646. totalDict['offlineBusyCount'] += 1
  647. elif status['usage'] > 0 and status['signal'] > 0:
  648. hourDict[hour]['onlineBusy'] += 1
  649. hourDict[hour]['usageSum'] += status['usage']
  650. hourDict[hour]['usageCount'] += 1
  651. totalDict['onlineBusy'] += 1
  652. else:
  653. pass
  654. return {'totalOfflineTime': totalDict['totalOfflineTime'],
  655. 'totalOfflineBusyTime': totalDict['totalOfflineBusyTime'],
  656. 'totalOnlineTime': totalDict['totalOnlineTime'],
  657. 'offlineCount': totalDict['offlineCount'],
  658. 'offlineBusyCount': totalDict['offlineBusyCount'],
  659. 'onlineBusy': totalDict['onlineBusy'],
  660. 'hourly': hourDict}
  661. def calc_one_dealer_and_insert_into_db(dealerId, nowTime):
  662. logger.info('start calc dealerId=%s info and insert into db now......' % dealerId)
  663. def build_daily_update(valueMap):
  664. rv = defaultdict(dict)
  665. for kind, amount in valueMap.iteritems():
  666. if kind == 'package':
  667. for coin, value in amount.items():
  668. rv['$inc']['other.package.{coin}'.format(coin = int(int(float(coin)) * 100))] = int(value)
  669. elif kind == 'hourly':
  670. for hour, value in amount.items():
  671. for hourKind, hourValue in value.items():
  672. rv['$inc']['other.hourly.{hour}.{hourKind}'.format(hour = hour, hourKind = hourKind)] = int(
  673. hourValue)
  674. else:
  675. rv['$inc']['other.{kind}'.format(kind = kind)] = int(amount)
  676. if rv.has_key('activedDevRatio'):
  677. rv.pop('activedDevRatio')
  678. if rv.has_key('peakUsage'):
  679. rv.pop('peakUsage')
  680. if rv.has_key('devCount'):
  681. rv.pop('devCount')
  682. rv.update({'$set': {'devCount': valueMap.get('devCount', 0),
  683. 'activedDevRatio': valueMap.get('activedDevRatio', 0),
  684. 'peakUsage': valueMap.get('peakUsage', 0)}})
  685. return rv
  686. # 月度数据,主要是月度平均活跃度,需要特殊计算
  687. def build_month_update(valueMap, preDay, monthactivedDevRatio):
  688. rv = defaultdict(dict)
  689. for kind, amount in valueMap.iteritems():
  690. if kind == 'package':
  691. for coin, value in amount.items():
  692. rv['$inc']['other.package.{coin}'.format(coin = int(int(float(coin)) * 100))] = int(value)
  693. elif kind == 'hourly':
  694. for hour, value in amount.items():
  695. for hourKind, hourValue in value.items():
  696. rv['$inc']['other.hourly.{hour}.{hourKind}'.format(hour = hour, hourKind = hourKind)] = int(
  697. hourValue)
  698. else:
  699. rv['$inc']['other.{kind}'.format(kind = kind)] = int(amount)
  700. if rv.has_key('activedDevRatio'): # 目前这个算法是有缺陷的,因为每天的设备数目是变化的,准确的计算应该是当月进行计算才对
  701. rv.pop('activedDevRatio')
  702. if rv.has_key('devCount'):
  703. rv.pop('devCount')
  704. activedDevRatio = int(((preDay - 1) * monthactivedDevRatio + valueMap['activedDevRatio']) / preDay)
  705. rv.update({'$set': {'devCount': valueMap.get('devCount', 0),
  706. 'activedDevRatio': activedDevRatio}})
  707. return rv
  708. preDaytime = nowTime - datetime.timedelta(days = 1)
  709. preDay = preDaytime.day
  710. preStrDay = preDaytime.strftime(Const.DATE_FMT)
  711. preMonth = MONTH_DATE_KEY.format(year = preDaytime.year, month = preDaytime.month)
  712. startTime = to_datetime(preStrDay + ' 00:00:00')
  713. endTime = to_datetime(preStrDay + ' 23:59:59')
  714. groupIds = Group.get_group_ids_of_dealer(str(dealerId))
  715. devNoList = Device.get_devNos_by_group(groupIds)
  716. devStatDict = {}
  717. dealerHourlyDict = {}
  718. for devNo in devNoList:
  719. make_dev_status_rcd(devNo, nowTime)
  720. statInfo = calc_dev_status_stat(devNo, startTime, endTime)
  721. devStatDict[devNo] = statInfo
  722. statInfo = calc_dealer_order_data_stats(str(dealerId), startTime, endTime)
  723. groupStatDict = statInfo['groupStats']
  724. activedDevRatio = statInfo['activedDevRatio']
  725. dealerPackage = statInfo['package']
  726. totalOfflineTime, totalOfflineBusyTime, totalOnlineTime, offlineCount, offlineBusyCount = 0, 0, 0, 0, 0
  727. dealerDevCount = 0
  728. # 进行汇总计算数据,从设备-》组-》经销商
  729. groups = Group.get_groups_by_group_ids(groupIds)
  730. for groupId, group in groups.items():
  731. devNos = Device.get_devNos_by_group([groupId])
  732. groupDevCount = 0
  733. if not groupStatDict.has_key(groupId):
  734. groupStatDict[groupId] = {
  735. 'totalOfflineTime': 0,
  736. 'totalOfflineBusyTime': 0,
  737. 'totalOnlineTime': 0,
  738. 'offlineCount': 0,
  739. 'offlineBusyCount': 0,
  740. 'activedDevRatio': 0,
  741. 'devCount': 0,
  742. 'package': {},
  743. 'hourly': {}
  744. }
  745. else:
  746. groupStatDict[groupId].update({
  747. 'totalOfflineTime': 0,
  748. 'totalOfflineBusyTime': 0,
  749. 'totalOnlineTime': 0,
  750. 'offlineCount': 0,
  751. 'offlineBusyCount': 0,
  752. 'hourly': {}
  753. })
  754. for devNo in devNos:
  755. groupDevCount += 1
  756. dealerDevCount += 1
  757. devInfo = devStatDict.get(devNo, {})
  758. devTotalOfflineTime = devInfo.get('totalOfflineTime', 0)
  759. devTotalOfflineBusyTime = devInfo.get('totalOfflineBusyTime', 0)
  760. devTotalOnlineTime = devInfo.get('totalOnlineTime', 0)
  761. devOfflineCount = devInfo.get('offlineCount', 0)
  762. devOfflineBusyCount = devInfo.get('offlineBusyCount', 0)
  763. hourlyDict = devInfo.get('hourly', {})
  764. groupStatDict[groupId]['totalOfflineTime'] += devTotalOfflineTime
  765. groupStatDict[groupId]['totalOfflineBusyTime'] += devTotalOfflineBusyTime
  766. groupStatDict[groupId]['totalOnlineTime'] += devTotalOnlineTime
  767. groupStatDict[groupId]['offlineCount'] += devOfflineCount
  768. groupStatDict[groupId]['offlineBusyCount'] += devOfflineBusyCount
  769. # 往组和经销商内统计数据,补充小时级别的统计数据
  770. for hour, value in hourlyDict.items():
  771. if hour not in groupStatDict[groupId]['hourly']:
  772. groupStatDict[groupId]['hourly'] = {hour: value}
  773. else:
  774. groupStatDict[groupId]['hourly'][hour]['offlineCount'] += hourlyDict[hour]['offlineCount']
  775. groupStatDict[groupId]['hourly'][hour]['offlineBusyCount'] += hourlyDict[hour]['offlineBusyCount']
  776. groupStatDict[groupId]['hourly'][hour]['usageSum'] += hourlyDict[hour]['usageSum']
  777. groupStatDict[groupId]['hourly'][hour]['usageCount'] += hourlyDict[hour]['usageCount']
  778. # 经销商级别的汇总
  779. if not dealerHourlyDict.has_key(hour):
  780. dealerHourlyDict[hour] = value
  781. else:
  782. dealerHourlyDict[hour]['offlineCount'] += hourlyDict[hour]['offlineCount']
  783. dealerHourlyDict[hour]['offlineBusyCount'] += hourlyDict[hour]['offlineBusyCount']
  784. dealerHourlyDict[hour]['usageSum'] += hourlyDict[hour]['usageSum']
  785. dealerHourlyDict[hour]['usageCount'] += hourlyDict[hour]['usageCount']
  786. totalOfflineTime += devTotalOfflineTime
  787. totalOfflineBusyTime += devTotalOfflineBusyTime
  788. totalOnlineTime += devTotalOnlineTime
  789. offlineCount += devOfflineCount
  790. offlineBusyCount += devOfflineBusyCount
  791. # 更新地址下的设备总数
  792. groupStatDict[groupId].update({'devCount': groupDevCount})
  793. # 找出地址下的最大繁忙度
  794. groupHourly = groupStatDict[groupId].get('hourly', {})
  795. peakUsage = 0
  796. for hour, info in groupHourly.items():
  797. usage = int(info['usageSum'] / info['usageCount']) if info['usageCount'] > 0 else 0
  798. if usage >= peakUsage:
  799. peakUsage = usage
  800. groupStatDict[groupId]['peakUsage'] = peakUsage
  801. dealerInfo = {
  802. 'totalOfflineTime': totalOfflineTime,
  803. 'totalOfflineBusyTime': totalOfflineBusyTime,
  804. 'totalOnlineTime': totalOnlineTime,
  805. 'offlineCount': offlineCount,
  806. 'offlineBusyCount': offlineBusyCount,
  807. 'devCount': dealerDevCount,
  808. 'activedDevRatio': activedDevRatio,
  809. 'package': dealerPackage,
  810. 'hourly': dealerHourlyDict
  811. }
  812. # 将数据分别存入设备、地址、经销商的日报表中.设备活跃度,需要好好想下
  813. for groupId in groupIds:
  814. for devNo in devNos:
  815. logicalCode = Device.get_logicalCode_by_devNo(devNo)
  816. devInfo = devStatDict.get(devNo, {})
  817. DeviceDailyStat.get_collection().update({'logicalCode': logicalCode, 'date': preStrDay},
  818. build_daily_update(devInfo), multi = True, upsert = True)
  819. groupInfo = groupStatDict.get(groupId, {})
  820. GroupDailyStat.get_collection().update({'groupId': ObjectId(groupId), 'date': preStrDay},
  821. build_daily_update(groupInfo), multi = True, upsert = True)
  822. DealerDailyStat.get_collection().update({'dealerId': ObjectId(dealerId), 'date': preStrDay},
  823. build_daily_update(dealerInfo), multi = True, upsert = True)
  824. try:
  825. mouthActivedDevRatio = \
  826. DealerDailyStat.get_collection().find({'dealerId': ObjectId(dealerId), 'date': preStrDay},
  827. {'activedDevRatio': 1})[0]['activedDevRatio']
  828. except Exception, e:
  829. mouthActivedDevRatio = 0
  830. DealerMonthlyStat.get_collection().update({'dealerId': ObjectId(dealerId), 'date': preMonth},
  831. build_month_update(dealerInfo, preDay, mouthActivedDevRatio),
  832. multi = True, upsert = True)
  833. logger.info('finished calc dealerId=%s info and insert into db now' % dealerId)
  834. # 统计本月新增加的用户量 以及所有的用户量
  835. def calc_dealer_user_count():
  836. preDaytime = datetime.datetime.now() - datetime.timedelta(days = 1)
  837. preStrDay = preDaytime.strftime(Const.DATE_FMT)
  838. preMonth = MONTH_DATE_KEY.format(year = preDaytime.year, month = preDaytime.month)
  839. endTime = to_datetime(preStrDay + ' 23:59:59')
  840. dealerIds = [str(dealer.id) for dealer in Dealer.objects.all().only('id')]
  841. for dealerId in dealerIds:
  842. try:
  843. groupIds = Group.get_group_ids_of_dealer(str(dealerId))
  844. monthStartTime = to_datetime(preMonth + '-01 00:00:00')
  845. userAddedThisMonth = MyUser.get_user_count_by_filter(groupIds, {
  846. 'dateTimeAdded': {'$gte': monthStartTime, '$lte': endTime}})
  847. DealerMonthlyStat.get_collection().update({'dealerId': ObjectId(dealerId), 'date': preMonth},
  848. {'$set': {'addedUserCount': userAddedThisMonth}})
  849. logger.info('dealerId=%s,added this month usercount=%s' % (dealerId, userAddedThisMonth))
  850. # 统计所有的用户量
  851. allUserCount = MyUser.get_user_count_by_filter(groupIds)
  852. Dealer.get_collection().update({'_id': ObjectId(dealerId)}, {'$set': {'userCount': allUserCount}})
  853. logger.info('dealerId=%s,all user count=%s' % (dealerId, allUserCount))
  854. except Exception, e:
  855. logger.error('stat user count e=%s' % e)
  856. def calc_dealer_stat_and_insert_into_db_for_dealers(dealerIds):
  857. nowTime = datetime.datetime.now()
  858. logger.info('calc dealer info now.....,dealers count=%s' % len(dealerIds))
  859. for dealerId in dealerIds:
  860. calc_one_dealer_and_insert_into_db(dealerId, nowTime)
  861. logger.info('finished all: calc dealer info and insert into db now')
  862. def calc_dealer_stat_and_insert_into_db():
  863. logger.info('start all: calc dealer info and insert into db now')
  864. dealerIds = [str(dealer.id) for dealer in Dealer.objects.all().only('id')]
  865. dealerCount = len(dealerIds)
  866. logger.info('calc dealer info now,all dealer count = %s' % dealerCount)
  867. pageSize = dealerCount / 3
  868. pageNum = dealerCount / pageSize if dealerCount % pageSize == 0 else dealerCount / pageSize + 1
  869. for pageIndex in range(pageNum):
  870. sliceDealerIds = dealerIds[pageIndex * pageSize:(pageIndex + 1) * pageSize]
  871. try:
  872. t = Thread(target = calc_dealer_stat_and_insert_into_db_for_dealers, kwargs = {'dealerIds': sliceDealerIds})
  873. t.setName('calc_dealer_stat_%s' % pageIndex)
  874. t.setDaemon(False)
  875. t.start()
  876. except Exception as e:
  877. logger.exception(e)
  878. # {'startTime':startDate,'endTime':endDate,'ownerId':ownerId,'groupId':groupId,'logicalCode':logicalCode}
  879. def export_charge_order_excel_from_db(filepath, queryDict):
  880. logger.info('start export_charge_order_excel_from_db,filepath=%s,query=%s' % (filepath, queryDict))
  881. startTime = queryDict.get('startTime', None)
  882. endTime = queryDict.get('endTime', None)
  883. ownerId = queryDict.get('ownerId', '')
  884. groupId = queryDict.get('groupId', '')
  885. logicalCode = queryDict.get('logicalCode', '')
  886. phoneNumber = queryDict.get('phoneNumber')
  887. chargeTypeDict = {'recharge': u'充值', 'sendcoin': u'赠币', 'refund': u'退币', 'chargeCard': u'卡充值',
  888. 'chargeVirtualCard': u'虚拟卡充值'}
  889. groupIdList = Group.get_group_ids_of_dealer(ownerId)
  890. filters = {
  891. "ownerId": ownerId,
  892. "result": "success",
  893. }
  894. logger.info("ownerId is %s" % ownerId)
  895. if logicalCode:
  896. filters.update({"logicalCode": logicalCode})
  897. if groupId:
  898. filters.update({"groupId": groupId})
  899. if phoneNumber:
  900. phoneOwner = MyUser.objects.filter(phoneNumber = phoneNumber).first()
  901. if phoneOwner:
  902. filters.update({"openId": phoneOwner.openId})
  903. partnerDict = {}
  904. for groupId in groupIdList:
  905. group = Group.get_group(groupId)
  906. partnerDict.update(group.get('partnerDict', {}))
  907. userDict = {}
  908. records = []
  909. rcdList = []
  910. logger.info("filter is %s" % filters)
  911. rechargeRcds = ClientRechargeModelProxy(st = startTime, et = endTime).all(**filters)
  912. for rcd in rechargeRcds:
  913. if rcd.via in ['refund', 'sendcoin']:
  914. continue
  915. openId = rcd.openId
  916. if not userDict.has_key(openId):
  917. try:
  918. user = MyUser.objects.filter(openId = openId).only('openId', 'nickname', 'sex', 'groupId', 'country',
  919. 'province', 'city', "gateWay",
  920. "productAgentId").first()
  921. sex = u'未知'
  922. if user.sex == 0:
  923. sex = u'女'
  924. elif user.sex == 1:
  925. sex = u'男'
  926. else:
  927. pass
  928. user = {'sex': sex, 'nickname': user.nickname,
  929. 'zone': '%s%s%s' % (user.country, user.province, user.city), "phoneNumber": user.phone}
  930. userDict[openId] = user
  931. except Exception, e:
  932. continue
  933. else:
  934. user = userDict[openId]
  935. from apps.web.common.proxy import ClientDealerIncomeModelProxy
  936. try:
  937. proxyRcd = ClientDealerIncomeModelProxy.get_one(ref_id = rcd.id,
  938. groupId = ObjectId(rcd.groupId)) # type: DealerIncomeProxy
  939. if not proxyRcd:
  940. raise ValueError("rechargeRecord <{}> has not dealer income proxy".format(rcd.id))
  941. except Exception, e:
  942. continue
  943. # 获取代理商、经销商的收入
  944. amountDict = DealerIncomeProxy.get_agent_partner_allocated_money(ownerId, proxyRcd.partition, partnerDict)
  945. userNickname = "{}-{}".format(user.get("nickname", ""), user.get("phoneNumber", "")) if user.get(
  946. "phoneNumber") else user.get("nickname")
  947. dataList = [
  948. (u'逻辑编码', rcd.logicalCode),
  949. (u'IMEI', rcd.devNo),
  950. (u'设备类型', rcd.dev_type_name),
  951. (u'组名称', rcd.groupName),
  952. (u'组内编号', rcd.groupNumber),
  953. (u'组地址', rcd.address),
  954. (u'用户昵称', userNickname),
  955. (u'用户性别', user.get('sex', '')),
  956. (u'用户地域', user.get('zone', '')),
  957. (u'订单号', rcd.orderNo),
  958. (u'下单时间', rcd.to_datetime_str(rcd.dateTimeAdded)),
  959. (u'第三方支付单号', rcd.wxOrderNo),
  960. (u'支付方式', u'微信' if rcd.gateway == 'wechat' else u'支付宝'),
  961. (u'是否快捷支付', 'yes' if rcd.isQuickPay else 'no'),
  962. (u'充值金额', str(rcd.money)),
  963. (u'充值金币', str(rcd.coins)),
  964. (u'充值方式', chargeTypeDict.get(rcd.via, '')), # 卡充值、虚拟卡充值、直接充值等
  965. (u'设备负责人分配收入', str(amountDict.get('ownerAmount'))),
  966. (u'代理商分配金额', str(amountDict.get('agentAmount')))
  967. ]
  968. # 把所有合伙人的分配都记录下来
  969. for partnerIncome in amountDict.get('partnerDict').values():
  970. dataList.append((u'合伙人分配金额:%s(%s)' % (partnerIncome['nickname'], partnerIncome['username']),
  971. str(partnerIncome['money'])))
  972. records.append(OrderedDict(dataList))
  973. generate_excel_report(filepath, records)
  974. logger.info('finished export_charge_order_excel_from_db')
  975. def export_consume_order_excel_from_db(filepath, queryDict):
  976. logger.info('start export_consume_order_excel_from_db,filepath=%s,query=%s' % (filepath, queryDict))
  977. startTime = queryDict.get('startTime', None)
  978. endTime = queryDict.get('endTime', None)
  979. ownerId = queryDict.get('ownerId', '')
  980. groupId = queryDict.get('groupId', '')
  981. logicalCode = queryDict.get('logicalCode', '')
  982. filters = {
  983. "isNormal": True
  984. }
  985. if logicalCode:
  986. filters.update({"logicalCode": logicalCode})
  987. elif groupId:
  988. filters.update({"groupId": groupId})
  989. else:
  990. groupIds = Group.get_group_ids_of_dealer(str(ownerId))
  991. if not groupIds:
  992. return generate_excel_report(filepath, [])
  993. filters.update({'groupId__in': groupIds})
  994. # 找出所有消费记录可以展示的消费细节信息,比如:消耗电量、使用时长等
  995. servicedKeysTemp = []
  996. rcdList = []
  997. consume_orders = ClientConsumeModelProxy(
  998. st = startTime,
  999. et = endTime
  1000. ).all(**filters) # type:List[ConsumeRecord]
  1001. for consume_order in consume_orders:
  1002. try:
  1003. servicedKeysTemp.extend(consume_order.servicedInfo.keys())
  1004. rcdList.append(consume_order)
  1005. except Exception as e:
  1006. logger.exception('consume record id = {}, exception = {}'.format(str(consume_order.id), e))
  1007. servicedKeysTemp = list(set(servicedKeysTemp))
  1008. servicedKeys = []
  1009. for key in servicedKeysTemp:
  1010. if key in DEALER_CONSUMPTION_AGG_KIND_TRANSLATION:
  1011. servicedKeys.append(key)
  1012. else:
  1013. continue
  1014. userDict = {}
  1015. records = []
  1016. for rcd in rcdList:
  1017. openId = rcd.openId
  1018. if openId not in userDict:
  1019. try:
  1020. user = MyUser.objects.filter(openId = openId).only('nickname', 'phone').first()
  1021. userDict[openId] = {'nickname': user.nickname, "phoneNumber": user.phone}
  1022. user = userDict[openId]
  1023. except Exception, e:
  1024. continue
  1025. else:
  1026. user = userDict[openId]
  1027. userNickname = "{}-{}".format(user.get("nickname", ""), user.get("phoneNumber", "")) if user.get(
  1028. "phoneNumber") else user.get("nickname")
  1029. dataList = [
  1030. (u'逻辑编码', rcd.logicalCode),
  1031. (u'IMEI', rcd.devNo),
  1032. (u'设备端口', rcd.attachParas.get('port', '-')),
  1033. (u'设备类型', rcd.dev_type_name),
  1034. (u'组名称', rcd.groupName),
  1035. (u'组内编号', rcd.groupNumber),
  1036. (u'组地址', rcd.address),
  1037. (u'用户昵称', userNickname),
  1038. (u'订单号', rcd.orderNo),
  1039. (u'下单时间', rcd.to_datetime_str(rcd.dateTimeAdded)),
  1040. (u'花费金币', str(rcd.coin)),
  1041. (u'消费备注', str(rcd.remarks) if rcd.remarks else u'扫码消费'),
  1042. (u'端口', str(rcd.used_port) if rcd.used_port != -1 else ''),
  1043. (u'停止时间', rcd.to_datetime_str(rcd.device_finished_time)),
  1044. (u'结束原因', rcd.servicedInfo.get('reason', ''))
  1045. ]
  1046. # 把消费细节也记录到excel表中
  1047. for key in servicedKeys:
  1048. dataList.append((DEALER_CONSUMPTION_AGG_KIND_TRANSLATION.get(key), str(rcd.servicedInfo.get(key, '-'))))
  1049. records.append(OrderedDict(dataList))
  1050. generate_excel_report(filepath, records)
  1051. def export_API_order_excel_from_db(filepath, queryDict):
  1052. logger.info('start export_API_order_excel_from_db, filepath=%s, query=%s' % (filepath, queryDict))
  1053. startTime = queryDict.get('startTime', None)
  1054. endTime = queryDict.get('endTime', None)
  1055. ownerId = queryDict.get('ownerId', '')
  1056. groupId = queryDict.get('groupId', '')
  1057. logicalCode = queryDict.get('logicalCode', '')
  1058. if logicalCode:
  1059. dev = Device.get_dev_by_l(logicalCode)
  1060. devNoList = [dev['devNo']]
  1061. elif groupId:
  1062. devNoList = Device.get_devNos_by_group([groupId])
  1063. else:
  1064. devNoList = None
  1065. if devNoList:
  1066. rcds = APIStartDeviceRecord.objects(ownerId = ownerId, devNo__in = devNoList,
  1067. errCode = 0, datetimeAdded__gte = startTime,
  1068. datetimeAdded__lte = endTime).order_by('-datetimeAdded')
  1069. else:
  1070. rcds = APIStartDeviceRecord.objects(ownerId = ownerId, errCode = 0,
  1071. datetimeAdded__gte = startTime,
  1072. datetimeAdded__lte = endTime).order_by('-datetimeAdded')
  1073. records = []
  1074. for rcd in rcds:
  1075. dataList = [
  1076. (u'下单时间', rcd.to_datetime_str(rcd.datetimeAdded)),
  1077. (u'订单编号', rcd.attachParas.get('extOrderNo', '-')),
  1078. (u'用户ID', rcd['userId']),
  1079. (u'订购时间', rcd.package.get('time', '-')),
  1080. (u'退费金额', rcd.servicedInfo.get('backCoins', '-')),
  1081. (u'消耗电量', rcd.servicedInfo.get('spendElec', '-')),
  1082. (u'消费金额', str(rcd.package.get('price', '-'))),
  1083. (u'地址', Group.get_groupName_by_logicalCode(rcd.deviceCode)),
  1084. (u'设备编号', rcd.deviceCode),
  1085. ]
  1086. records.append(OrderedDict(dataList))
  1087. generate_excel_report(filepath, records)
  1088. def export_on_points_order_excel_from_db(filepath, queryDict):
  1089. logger.info('start export_onPoints_order_excel_from_db, filepath=%s, query=%s' % (filepath, queryDict))
  1090. startTime = queryDict.get('startTime', None)
  1091. endTime = queryDict.get('endTime', None)
  1092. ownerId = queryDict.get('ownerId', '')
  1093. groupId = queryDict.get('groupId', '')
  1094. logicalCode = queryDict.get('logicalCode', '')
  1095. filters = {
  1096. "ownerId": ownerId,
  1097. "time__gte": startTime,
  1098. "time__lte": endTime
  1099. }
  1100. if logicalCode:
  1101. dev = Device.get_dev_by_l(logicalCode)
  1102. devNoList = [dev['devNo']]
  1103. filters.update({"devNo": dev["devNo"]})
  1104. elif groupId:
  1105. devNoList = Device.get_devNos_by_group([groupId])
  1106. filters.update({"devNo__in": devNoList})
  1107. else:
  1108. devNoList = None
  1109. rcds = UpscoreRecord.objects(**filters).order_by("-time")
  1110. records = []
  1111. for rcd in rcds:
  1112. dataList = [
  1113. (u'下分时间', rcd.time),
  1114. (u'地址', rcd.groupName),
  1115. (u'设备编号', rcd.logicalCode),
  1116. (u'上分金额', str(rcd.score)),
  1117. ]
  1118. records.append(OrderedDict(dataList))
  1119. generate_excel_report(filepath, records)
  1120. def export_send_coins_to_card_order_excel_from_db(filepath, queryDict):
  1121. logger.info('start export_send_coins_to_card_order_excel_from_db, filepath=%s, query=%s' % (filepath, queryDict))
  1122. startTime = queryDict.get('startTime', None)
  1123. endTime = queryDict.get('endTime', None)
  1124. ownerId = queryDict.get('ownerId', '')
  1125. startDateTime = datetime.datetime.strptime(startTime, '%Y-%m-%d %H:%M:%S')
  1126. endDateTime = datetime.datetime.strptime(endTime, '%Y-%m-%d %H:%M:%S')
  1127. filters = {
  1128. "ownerId": ownerId,
  1129. "dateTimeAdded__gte": startDateTime,
  1130. "dateTimeAdded__lte": endDateTime
  1131. }
  1132. rcds = UpCardScoreRecord.objects(**filters).order_by('-dateTimeAdded')
  1133. records = []
  1134. for rcd in rcds:
  1135. if rcd.address != '':
  1136. groupName = rcd.address
  1137. else:
  1138. card = Card.objects.get(cardNo = rcd.cardNo)
  1139. groupId = card.groupId
  1140. if groupId:
  1141. groupName = Group.get_group(groupId).groupName
  1142. else:
  1143. groupName = ''
  1144. dataList = [
  1145. (u'实体卡号', rcd.cardNo),
  1146. (u'上分数量', str(rcd.score)),
  1147. (u'备注', rcd.remark),
  1148. (u'绑定地址', groupName),
  1149. (u'创建时间', rcd.dateTimeAdded.strftime('%Y-%m-%d %H:%M:%S')),
  1150. ]
  1151. records.append(OrderedDict(dataList))
  1152. # records = []
  1153. # for rcd in rcds:
  1154. # dataList = [
  1155. # (u'实体卡号', rcd.cardNo),
  1156. # (u'上分数量', str(rcd.score)),
  1157. # (u'备注', rcd.remark),
  1158. # (u'创建时间', rcd.dateTimeAdded.strftime('%Y-%m-%d %H:%M:%S')),
  1159. # ]
  1160. #
  1161. # records.append(OrderedDict(dataList))
  1162. #
  1163. generate_excel_report(filepath, records)
  1164. def export_group_stat_excel_from_db(filepath, queryDict):
  1165. def calc_group_income_stats(groupId, rcds, startDate, endDate):
  1166. groupResult = GroupReport.get_rpt([groupId], startDate, endDate)
  1167. offlineCoin = groupResult[groupId].get('lineCoins', 0)
  1168. orderTotal, payIncome, offlineTime, totalTime, totalActivedRate = 0, RMB(0.0), 0, 0, 0
  1169. count = 0
  1170. for rcd in rcds:
  1171. count += 1
  1172. daily = rcd.get('daily', {})
  1173. other = rcd.get('other', {})
  1174. orderTotal += daily.get('totalIncomeCount', 0)
  1175. payIncome += daily.get('totalIncome', 0)
  1176. offlineTime += other.get('totalOfflineTime', 0)
  1177. totalTime = totalTime + other.get('totalOfflineTime', 0) + other.get('totalOnlineTime', 0) + other.get(
  1178. 'totalOfflineBusyTime', 0)
  1179. totalActivedRate += rcd.get('activedDevRatio', 0)
  1180. return {
  1181. 'orderTotal': orderTotal,
  1182. 'payIncome': payIncome,
  1183. 'offlineCoin': offlineCoin,
  1184. 'dailyActivityRate': int(round(totalActivedRate / count, 2)) if count > 0 else 0,
  1185. }
  1186. def calc_group_consume_stats(groupId, rcds):
  1187. def sum_fn(lst):
  1188. return sum(map(lambda _: Quantity(_), lst), Quantity('0'))
  1189. groupConsumptionMap = cum_stats(keys = ['daily', 'consumption'],
  1190. stats = rcds,
  1191. translate = translate_consumption,
  1192. sum_fn = sum_fn)
  1193. return groupConsumptionMap.get(groupId, None)
  1194. logger.info('start export_consume_order_excel_from_db,filepath=%s,query=%s' % (filepath, queryDict))
  1195. startTime = queryDict.get('startTime', None)
  1196. endTime = queryDict.get('endTime', None)
  1197. ownerId = queryDict.get('ownerId', '')
  1198. from apps.web.agent.models import Agent
  1199. from apps.web.dealer.models import Dealer
  1200. dealer = Dealer.objects(id = ownerId).first() # type: Dealer
  1201. agent = Agent.objects.get(id = dealer.agentId) # type: Agent
  1202. records = []
  1203. groupIds = Group.get_group_ids_of_dealer(ownerId)
  1204. for groupId in groupIds:
  1205. group = Group.get_group(groupId)
  1206. rcds = [rcd for rcd in GroupDailyStat.get_collection().find(
  1207. {'groupId': ObjectId(groupId), 'date': {'$gte': startTime, '$lte': endTime}},
  1208. {'origin': 0, 'hourly': 0, '_id': 0}).sort('groupId')]
  1209. incomeStats = calc_group_income_stats(groupId, rcds, startTime, endTime)
  1210. consumeStats = calc_group_consume_stats(groupId, rcds)
  1211. dataList = [
  1212. (u'组名称', group['groupName']),
  1213. (u'组地址', group['address']),
  1214. (u'总支付笔数', int(incomeStats.get('orderTotal'))),
  1215. (u'总支付金额', str(incomeStats.get('payIncome'))),
  1216. (u'线下投币', str(incomeStats.get('offlineCoin'))),
  1217. (u'设备日均活跃度', int(incomeStats.get('dailyActivityRate'))),
  1218. ]
  1219. # 把消费细节也记录到excel表中
  1220. consumeStats = consumeStats if consumeStats else {}
  1221. for info in consumeStats:
  1222. if info['source'] not in agent.hide_consume_kinds_dealer:
  1223. dataList.append((DEALER_CONSUMPTION_AGG_KIND_TRANSLATION.get(info['source']), str(info['value'])))
  1224. records.append(OrderedDict(dataList))
  1225. # 先统计
  1226. generate_excel_report(filepath, records)
  1227. def export_vcard_info_excel_from_db(filepath, queryDict):
  1228. dealerId = queryDict.get('dealerId')
  1229. # from apps.web.dealer.models import Dealer
  1230. from apps.web.user.models import MyUser, UserVirtualCard
  1231. # dealer = Dealer.objects.filter(id=dealerId)
  1232. groups = Group.objects.filter(ownerId = dealerId)
  1233. groupIds = map(lambda x: str(x.id), groups)
  1234. users = MyUser.objects.filter(groupId__in = groupIds)
  1235. openIds = map(lambda x: str(x.openId), users)
  1236. vcards = UserVirtualCard.objects.filter(ownerOpenId__in = openIds, dealerId = dealerId, groupId__in = groupIds)
  1237. result = []
  1238. for vcard in vcards:
  1239. group = groups.filter(id = vcard.groupId).first() or Group()
  1240. user = users.filter(openId = vcard.ownerOpenId, groupId = vcard.groupId).first()
  1241. dataList = [
  1242. (u'地址', group.groupName),
  1243. (u'卡名称', vcard.cardName),
  1244. (u'卡号', vcard.cardNo),
  1245. (u'卡主', vcard.nickname),
  1246. (u'用户ID', str(user.id)),
  1247. (u'用户电话', user.phone),
  1248. (u'开卡时间', vcard.startTime.strftime("%Y-%m-%d %H:%M:%S")),
  1249. (u'过期时间', vcard.expiredTime.strftime("%Y-%m-%d %H:%M:%S")),
  1250. (u'过期时间', vcard.expiredTime.strftime("%Y-%m-%d %H:%M:%S")),
  1251. (u'卡可用天数', vcard.periodDays),
  1252. (u'备注', vcard.userDesc),
  1253. (u'状态', vcard.status),
  1254. ]
  1255. quotaInfo = vcard.quotaInfo
  1256. if quotaInfo:
  1257. dataList.append((u'总额度()'.format(quotaInfo.get('quotaUnit')), round(quotaInfo['quota'], 1)))
  1258. dataList.append((u'已用额度()'.format(quotaInfo.get('quotaUnit')), round(quotaInfo['quotaUsed'], 1)))
  1259. dataList.append(
  1260. (u'总剩余额度()'.format(quotaInfo.get('quotaUnit')), round(quotaInfo['quota'] - quotaInfo['quotaUsed'], 1)))
  1261. dataList.append((u'日额度()'.format(quotaInfo.get('dayQuotaUnit')), round(quotaInfo['dayQuota'], 1)))
  1262. dataList.append((u'日已用额度()'.format(quotaInfo.get('dayQuotaUnit')), round(quotaInfo['dayUsed'], 1)))
  1263. dataList.append((u'日剩余额度()'.format(quotaInfo.get('dayQuotaUnit')),
  1264. round(quotaInfo['dayQuota'] - quotaInfo['dayUsed'], 1)))
  1265. result.append(OrderedDict(dataList))
  1266. generate_excel_report(filepath, result)
  1267. def export_group_user_account_excel_form_db(filepath, queryDict):
  1268. def get_spendMoney(consumeRecord):
  1269. if consumeRecord.aggInfo:
  1270. coin = float(str(consumeRecord.aggInfo.get('coin',0)))
  1271. else:
  1272. coin = 0
  1273. return coin
  1274. from apps.web.user.models import MyUser
  1275. ownerId = queryDict.get('ownerId', '')
  1276. startTime = queryDict.get('startTime', None)
  1277. endTime = queryDict.get('endTime', None)
  1278. groupIds = Group.get_group_ids_of_dealer(ownerId)
  1279. sheet = []
  1280. usersDict = dict() # type:dict
  1281. for groupId in groupIds:
  1282. users = MyUser.objects.filter(groupId=groupId)
  1283. consumeRecords = ConsumeRecord.objects.filter(groupId=groupId,dateTimeAdded__gte=startTime,dateTimeAdded__lte=endTime)
  1284. rechargeRecords = RechargeRecord.objects.filter(groupId=groupId,dateTimeAdded__gte=startTime,dateTimeAdded__lte=endTime)# type:List
  1285. group = Group.get_group(groupId)
  1286. # 统计充值信息
  1287. effectiveRechargeRecords = []
  1288. effectiveConsumeRecords = []
  1289. for rechargeRecord in rechargeRecords:
  1290. # 找出有效充值数据
  1291. for i in users:
  1292. if i.openId == rechargeRecord.openId:
  1293. effectiveRechargeRecords.append(
  1294. {'nickName': i.nickname, 'openId': i.openId, 'recharge': float(str(rechargeRecord.money)),
  1295. 'balance': float(str(i.balance))})
  1296. for effectiveRechargeRecord in effectiveRechargeRecords:
  1297. # 将同相同用户的充值账单整理分类,并统计充值金额
  1298. if effectiveRechargeRecord['openId'] not in usersDict:
  1299. usersDict.update({effectiveRechargeRecord['openId']:effectiveRechargeRecord})
  1300. else:
  1301. usersDict[effectiveRechargeRecord['openId']]['recharge'] += effectiveRechargeRecord['recharge']
  1302. # 统计消费信息
  1303. for consumeRecord in consumeRecords:
  1304. for i in users:
  1305. if i.openId == consumeRecord.openId:
  1306. effectiveConsumeRecords.append(
  1307. {'nickName': i.nickname, 'openId': i.openId, 'consume': get_spendMoney(consumeRecord),'balance': float(str(i.balance))})
  1308. for effectiveConsumeRecord in effectiveConsumeRecords:
  1309. if effectiveConsumeRecord['openId'] not in usersDict:
  1310. usersDict.update({effectiveConsumeRecords['openId']: effectiveConsumeRecords})
  1311. else:
  1312. if 'consume' not in usersDict[effectiveConsumeRecord['openId']]:
  1313. usersDict[effectiveConsumeRecord['openId']].update({
  1314. 'consume' : effectiveConsumeRecord['consume'],
  1315. })
  1316. else:
  1317. usersDict[effectiveConsumeRecord['openId']]['consume'] += effectiveConsumeRecord['consume']
  1318. records = []
  1319. total_recharged = RMB(0)
  1320. total_consumed = 0
  1321. balance = RMB(0)
  1322. for openId,info in usersDict.items():
  1323. dataList = [
  1324. (u'用户',info.get('nickName')),
  1325. (u"用户总充值",info.get('recharge',0)),
  1326. (u"用户总消费",info.get('consume',0)),
  1327. (u"用户账户余额",info.get('balance',0)),
  1328. ]
  1329. records.append(OrderedDict(dataList))
  1330. total_recharged += info.get('recharge',0)
  1331. total_consumed += info.get('consume',0)
  1332. balance += info.get('balance',0)
  1333. records.append(OrderedDict([
  1334. (u'用户', u'总计'),
  1335. (u"用户总充值", total_recharged),
  1336. (u"用户总消费", total_consumed),
  1337. (u"用户账户余额", balance),
  1338. ]))
  1339. sheet.append(ExcelSheet(data=records,name=group.groupName))
  1340. gernerate_excel_report_for_sheet(filepath, sheet)
  1341. def poll_dealer_recharge_record(record_id, pay_app_type, interval, total_count):
  1342. # type: (str, str, int, int)->None
  1343. poller_cls = PayManager().get_poller(pay_app_type = pay_app_type)
  1344. poller = poller_cls(
  1345. record_id = record_id,
  1346. interval = interval,
  1347. total_count = total_count,
  1348. record_cls = DealerRechargeRecord,
  1349. post_pay = post_pay) # type: PayRecordPoller
  1350. poller.start()
  1351. def aggregate_records(records):
  1352. totalIncome = RMB(0)
  1353. actualTotalIncome = RMB(0)
  1354. partDict = dict()
  1355. for record in records: # type: DealerIncomeProxy
  1356. totalIncome += RMB(record.totalAmount)
  1357. actualTotalIncome += RMB(sum(record.actualAmountMap.values(), RMB(0)))
  1358. for part in record.partition:
  1359. if part["role"] == "agent":
  1360. continue
  1361. if RMB(part["money"]) == RMB(0.00):
  1362. continue
  1363. partDict[part["id"]] = partDict.setdefault(part["id"], RMB(0)) + part["money"]
  1364. ledgerInfo = ""
  1365. for k, _v in partDict.items():
  1366. dealer = Dealer.objects.filter(id = k).only("nickname", "username").first()
  1367. ledgerInfo += "{}-{}-{}\r\n".format(dealer.nickname, dealer.username, _v)
  1368. return {
  1369. "totalIncome": totalIncome,
  1370. "actualTotalIncome": actualTotalIncome,
  1371. "ledgerInfo": ledgerInfo
  1372. }
  1373. def export_aggregate_dealer_income(filePath, queryDict, aggregateType):
  1374. s = datetime.datetime.strptime(queryDict.pop("startTime"), "%Y-%m-%d")
  1375. e = datetime.datetime.strptime(queryDict.pop("endTime"), "%Y-%m-%d")
  1376. queryDict["dateTimeAdded__gte"] = s
  1377. queryDict["dateTimeAdded__lt"] = e
  1378. records = DealerIncomeProxy.objects.filter(**queryDict)
  1379. if aggregateType == "date":
  1380. ts = queryDict.get("dateTimeAdded__gte")
  1381. te = queryDict.get("dateTimeAdded__lt")
  1382. aggregateRes = {(ts + datetime.timedelta(day)).strftime("%Y-%m-%d"): list() for day in range((te - ts).days)}
  1383. for record in records:
  1384. aggregateRes[record.dateTimeAdded.strftime(Const.DATE_FMT)].append(record)
  1385. elif aggregateType == "group":
  1386. groupIds = queryDict.get("groupId__in")
  1387. aggregateRes = {g: list() for g in groupIds}
  1388. for record in records:
  1389. aggregateRes[str(record.groupId)].append(record)
  1390. else:
  1391. logger.error("undefined aggregate type!")
  1392. return
  1393. data = list()
  1394. for key, items in aggregateRes.items():
  1395. tempAggregate = aggregate_records(items)
  1396. if aggregateType == "group":
  1397. group = Group.get_group(key)
  1398. tempAggregate.update({"groupName": group.get("groupName")})
  1399. else:
  1400. tempAggregate.update({"time": key})
  1401. data.append(tempAggregate)
  1402. writeDataList = list()
  1403. a = RMB(0)
  1404. b = RMB(0)
  1405. _k = "groupName" if aggregateType == "group" else "time"
  1406. for v in sorted(data, key = lambda x: x[_k]):
  1407. if aggregateType == "group":
  1408. tempData = [(u"地址名称", v.get("groupName"))]
  1409. else:
  1410. tempData = [(u"日期", v.get("time")), ]
  1411. tempData.extend([
  1412. (u"总收益", "{0:.2f}".format(float(v.get("totalIncome")))),
  1413. (u"总分帐金额", "{0:.2f}".format(float(v.get("actualTotalIncome")))),
  1414. (u"分账明细", v.get("ledgerInfo"))
  1415. ])
  1416. a += v.get("totalIncome")
  1417. b += v.get("actualTotalIncome")
  1418. writeDataList.append(OrderedDict(tempData))
  1419. else:
  1420. if aggregateType == "group":
  1421. tempData = [(u"地址名称", u"总计")]
  1422. else:
  1423. tempData = [(u"日期", u"总计")]
  1424. tempData.extend([
  1425. (u"总收益", "{:.2f}".format(float(a))),
  1426. (u"总分帐金额", "{:.2f}".format(float(b))),
  1427. (u"分账明细", u"")
  1428. ])
  1429. writeDataList.append(OrderedDict(tempData))
  1430. generate_excel_report(filePath, writeDataList)
  1431. def dealer_auto_withdraw():
  1432. """
  1433. 经销商自动提现的脚本 每日走一次
  1434. :return:
  1435. """
  1436. # 找出需要 执行脚本的经销商
  1437. dealers = Dealer.get_auto_withdraw_dealers()
  1438. needExecuteDealers = list()
  1439. # 筛选处 符合条件的 经销商 主要是有金额
  1440. for dealer in dealers: # type: Dealer
  1441. strategy = dealer.auto_withdraw_strategy
  1442. # 按周提现
  1443. if strategy['type'] == 'asWeek':
  1444. if datetime.datetime.now().isoweekday() == strategy['value']:
  1445. logger.debug('{} has open auto withdraw switch.'.format(repr(dealer)))
  1446. needExecuteDealers.append(dealer)
  1447. # 每天提现
  1448. elif strategy['type'] == 'asDay':
  1449. logger.debug('{} has open auto withdraw switch.'.format(repr(dealer)))
  1450. needExecuteDealers.append(dealer)
  1451. # 按月提现
  1452. elif strategy['type'] == "asMonth":
  1453. if datetime.datetime.now().day == strategy['value']:
  1454. logger.debug('{} has open auto withdraw switch.'.format(repr(dealer)))
  1455. needExecuteDealers.append(dealer)
  1456. else:
  1457. logger.error('{} auto withdraw strategy type <{}> error'.format(repr(dealer), strategy['type']))
  1458. for dealer in needExecuteDealers: # type: Dealer
  1459. try:
  1460. for incomeType in DEALER_INCOME_TYPE.choices():
  1461. withdrawInfoList = Dealer.get_income_balance_list(dealer, incomeType, settings.WITHDRAW_MINIMUM)
  1462. bank_card_no = None
  1463. autoWithdrawType = dealer.withdrawOptions.get("autoWithdrawType")
  1464. if autoWithdrawType == 'wechat':
  1465. if not dealer.auto_withdraw_bound_open_id:
  1466. logger.warning(
  1467. 'dealer<id={}> of auto withdraw<type=wechat> has no bound open id.'.format(str(dealer.id)))
  1468. continue
  1469. else:
  1470. dealer.withdraw_open_id = dealer.auto_withdraw_bound_open_id
  1471. else:
  1472. bank_card_no = dealer.auto_withdraw_bank_account
  1473. if not bank_card_no:
  1474. logger.warning(
  1475. 'dealer<id={}> of auto withdraw<type=wechat> has no bound bank account.'.format(str(dealer.id)))
  1476. continue
  1477. for sourceKey, balance in withdrawInfoList:
  1478. try:
  1479. result = DealerWithdrawService(
  1480. payee = dealer,
  1481. income_type = incomeType,
  1482. amount = balance,
  1483. pay_type = autoWithdrawType,
  1484. bank_card_no = bank_card_no).execute(sourceKey, True)
  1485. logger.info("auto withdraw success, result is <{}>".format(result))
  1486. except Exception as e:
  1487. logger.exception(e)
  1488. except Exception as e:
  1489. logger.exception(e)
  1490. def dealer_auto_charge_sim_card():
  1491. """
  1492. 经销商自动充值SIM卡
  1493. :return:
  1494. """
  1495. def consume_dealer_balance(pay_gateway, incomeType, dealer, cost):
  1496. source_key = pay_gateway.withdraw_source_key()
  1497. fundKey = dealer.fund_key(income_type = incomeType, source_key = source_key)
  1498. filter = {
  1499. '_id': dealer.id,
  1500. '{fundKey}.balance'.format(fundKey = fundKey): {'$gte': cost.mongo_amount}
  1501. }
  1502. update = {
  1503. '$inc': {'{fundKey}.balance'.format(fundKey = fundKey): (-cost).mongo_amount}
  1504. }
  1505. result = Dealer.get_collection().update_one(filter, update, upsert = False)
  1506. if result.matched_count == 1 and result.modified_count == 1:
  1507. return True
  1508. else:
  1509. return False
  1510. devices = Device.get_sim_expire_notify_devices(extra_filter = {'simChargeAuto': True})
  1511. logger.info('start auto charge sim card, %s devices will be expired' % len(devices))
  1512. from cytoolz import groupby
  1513. devMap = groupby('ownerId', devices)
  1514. # TODO 需要平台资金池合并成一个, 不在通过具体的支付网关来确认提现网关
  1515. pay_gateway = get_platform_wallet_pay_gateway(AppPlatformType.PLATFORM)
  1516. for ownerId, devs in devMap.items():
  1517. dealer = Dealer.objects(id = ownerId).first()
  1518. if not dealer:
  1519. logger.warning('dealer<id={}> not exists.'.format(ownerId))
  1520. continue
  1521. groupMap = {}
  1522. devObjs = list(Device.objects(logicalCode__in = [dev['logicalCode'] for dev in devs]))
  1523. for income_type in DEALER_INCOME_TYPE.choices():
  1524. can_used_balance = dealer.sub_balance(income_type, pay_gateway.withdraw_source_key())
  1525. if can_used_balance <= RMB(0):
  1526. continue
  1527. for devObj in devObjs[:]: # type: Device
  1528. try:
  1529. logger.info('start auto charge sim of {}.'.format(devObj))
  1530. if ownerId != devObj.ownerId:
  1531. logger.warning(
  1532. 'auto charge sim failure of {}. {} not equal {}. may be unregister.'.format(
  1533. devObj, devObj.ownerId, ownerId))
  1534. devObjs.remove(devObj)
  1535. continue
  1536. if not devObj.is_expire_in_this_month:
  1537. continue
  1538. devItems = [devObj]
  1539. rcd = create_dealer_sim_charge_order(pay_gateway, dealer, devItems, 'auto', can_used_balance)
  1540. if not rcd:
  1541. devObjs.remove(devObj)
  1542. continue
  1543. costMoney = RMB(rcd.totalFee / 100.0)
  1544. result = consume_dealer_balance(pay_gateway, income_type, dealer, costMoney)
  1545. if result:
  1546. logger.info('auto charge sim of {} success.'.format(devObj))
  1547. rcd.succeed(wxOrderNo = rcd.orderNo)
  1548. post_sim_recharge(rcd)
  1549. if devObj.groupId not in groupMap:
  1550. group = Group.get_group(devObj.groupId)
  1551. groupMap[devObj.groupId] = group
  1552. else:
  1553. group = groupMap[devObj.groupId]
  1554. income_record = RechargeRecord.issue_from_auto_sim_order(dealer, rcd, devObj, group)
  1555. record_income_proxy(DEALER_INCOME_SOURCE.AUTO_SIM, income_record, {
  1556. "owner": [
  1557. {
  1558. "money": (-costMoney).mongo_amount,
  1559. "role": "owner",
  1560. "share": Percent("100.0").mongo_amount,
  1561. "id": str(dealer.id)
  1562. }
  1563. ],
  1564. 'partner': []
  1565. })
  1566. can_used_balance = can_used_balance - costMoney
  1567. devObjs.remove(devObj)
  1568. else:
  1569. logger.info('auto charge sim of {} failure to cancel it.'.format(devObj))
  1570. rcd.cancel()
  1571. break
  1572. except Exception, e:
  1573. logger.exception(e)
  1574. def auto_charge_sim_card(dealerId):
  1575. """
  1576. 仅手工执行, 对单个经销商进行SIM卡自动充值
  1577. :param dealerId:
  1578. :return:
  1579. """
  1580. def consume_dealer_balance(pay_gateway, incomeType, dealer, cost):
  1581. source_key = pay_gateway.withdraw_source_key()
  1582. fundKey = dealer.fund_key(income_type = incomeType, source_key = source_key)
  1583. filter = {
  1584. '_id': dealer.id,
  1585. '{fundKey}.balance'.format(fundKey = fundKey): {'$gte': cost.mongo_amount}
  1586. }
  1587. update = {
  1588. '$inc': {'{fundKey}.balance'.format(fundKey = fundKey): (-cost).mongo_amount}
  1589. }
  1590. result = Dealer.get_collection().update_one(filter, update, upsert = False)
  1591. if result.matched_count == 1 and result.modified_count == 1:
  1592. logger.debug('dec dealer<id={}> money = {}, fundKey = {}'.format(str(dealer.id), cost, fundKey))
  1593. return True
  1594. else:
  1595. return False
  1596. devices = Device.get_sim_expire_notify_devices(dealer_id = dealerId, extra_filter = {'simChargeAuto': True})
  1597. logger.info('start auto charge sim card, %s devices will be expired' % len(devices))
  1598. from cytoolz import groupby
  1599. devMap = groupby('ownerId', devices)
  1600. # TODO 需要平台资金池合并成一个, 不在通过具体的支付网关来确认提现网关
  1601. pay_gateway = get_platform_wallet_pay_gateway(AppPlatformType.PLATFORM)
  1602. for ownerId, devs in devMap.items():
  1603. dealer = Dealer.objects(id = ownerId).first()
  1604. if not dealer:
  1605. logger.warning('dealer<id={}> not exists.'.format(ownerId))
  1606. continue
  1607. groupMap = {}
  1608. devObjs = list(Device.objects(logicalCode__in = [dev['logicalCode'] for dev in devs]))
  1609. for income_type in DEALER_INCOME_TYPE.choices():
  1610. can_used_balance = dealer.sub_balance(income_type, pay_gateway.withdraw_source_key())
  1611. if can_used_balance <= RMB(0):
  1612. continue
  1613. for devObj in devObjs[:]: # type: Device
  1614. try:
  1615. logger.info('start auto charge sim card,the device is auto charge,devNo=%s' % devObj.devNo)
  1616. if ownerId != devObj.ownerId:
  1617. logger.warning(
  1618. 'ownerId of device<devNo={},ownerId={}> not equal {}. may be unregister.'.format(
  1619. devObj.devNo, devObj.ownerId, ownerId))
  1620. devObjs.remove(devObj)
  1621. continue
  1622. if not devObj.is_expire_in_this_month:
  1623. continue
  1624. devItems = [devObj]
  1625. rcd = create_dealer_sim_charge_order(pay_gateway, dealer, devItems, 'auto', can_used_balance)
  1626. if not rcd:
  1627. devObjs.remove(devObj)
  1628. continue
  1629. costMoney = RMB(rcd.totalFee / 100.0)
  1630. result = consume_dealer_balance(pay_gateway, income_type, dealer, costMoney)
  1631. if result:
  1632. rcd.succeed(wxOrderNo = rcd.orderNo)
  1633. post_sim_recharge(rcd)
  1634. logger.info('charge sim card success,devNo=%s' % devObj.devNo)
  1635. if devObj.groupId not in groupMap:
  1636. group = Group.get_group(devObj.groupId)
  1637. groupMap[devObj.groupId] = group
  1638. else:
  1639. group = groupMap[devObj.groupId]
  1640. income_record = RechargeRecord.issue_from_auto_sim_order(dealer, rcd, devObj, group)
  1641. record_income_proxy(DEALER_INCOME_SOURCE.AUTO_SIM, income_record, {
  1642. "owner": [
  1643. {
  1644. "money": (-costMoney).mongo_amount,
  1645. "role": "owner",
  1646. "share": Percent("100.0").mongo_amount,
  1647. "id": str(dealer.id)
  1648. }
  1649. ],
  1650. 'partner': []
  1651. })
  1652. can_used_balance = can_used_balance - costMoney
  1653. devObjs.remove(devObj)
  1654. else:
  1655. rcd.cancel()
  1656. break
  1657. except Exception, e:
  1658. logger.exception(e)
  1659. def batch_set_device_params(logicalCodes, updateConf, lastSetConf, operationId):
  1660. logger.info(
  1661. 'start batch_set_device_params, logicalCodes=<{}>, updateConf=<{}>, lastSetConf=<{}>, operationId=<{}>'.format(
  1662. logicalCodes, updateConf, lastSetConf, operationId))
  1663. from taskmanager.mediator import task_caller
  1664. for logicalCode in logicalCodes:
  1665. task_caller('set_device_params', logicalCode = logicalCode,
  1666. updateConf = updateConf, lastSetConf = lastSetConf, operationId = operationId)
  1667. def set_device_params(logicalCode, updateConf, lastSetConf, operationId):
  1668. logger.info(
  1669. 'start set_device_params, logicalCodes=<{}>, updateConf=<{}>, lastSetConf=<{}>, operationId=<{}>'.format(
  1670. logicalCode, updateConf, lastSetConf, operationId))
  1671. dev = Device.get_dev_by_l(logicalCode)
  1672. operation = OperatorLog.record_dev_setting_changes_log(dev.owner, 'record_someone_set_devSettings',
  1673. dev['logicalCode'], dev['devType']['code'],
  1674. {
  1675. 'updateConfStr': json.dumps(updateConf),
  1676. 'operationId': operationId,
  1677. 'status': 'waiting'
  1678. })
  1679. box = ActionDeviceBuilder.create_action_device(dev) # type: SmartBox
  1680. from apps.web.device.models import RequestBodyDict
  1681. requestBody = RequestBodyDict({"POST": updateConf})
  1682. try:
  1683. if not lastSetConf:
  1684. lastSetConf = box.get_dev_setting()
  1685. box.set_device_function(requestBody, lastSetConf)
  1686. box.set_device_function_param(requestBody, lastSetConf)
  1687. except Exception, e:
  1688. logger.exception('error happened, error=%s' % (e,))
  1689. operation.update(content__status = 'fail', content__beforeCacheStr = json.dumps(lastSetConf))
  1690. return 'set_device_params logicalCode=<{}> is fail'.format(logicalCode)
  1691. else:
  1692. operation.update(content__status = 'success', content__beforeCacheStr = json.dumps(lastSetConf))
  1693. return 'set_device_params logicalCode=<{}> is success'.format(logicalCode)
  1694. def batch_set_server_settings(operator, logicalCodes, payload, ownerId, devTypeCode, operationId):
  1695. logger.debug(
  1696. 'batch_set_server_settings, operator={}, logicalCodes={}, payload={}, ownerId={}, devTypeCode={}, operationId={}'.format(
  1697. operator, logicalCodes, payload, ownerId, devTypeCode, operationId))
  1698. from taskmanager.mediator import task_caller
  1699. for logicalCode in logicalCodes:
  1700. task_caller('set_server_settings', operator = operator, logicalCode = logicalCode, payload = payload,
  1701. ownerId = ownerId,
  1702. devTypeCode = devTypeCode, operationId = operationId)
  1703. def set_server_settings(operator, logicalCode, payload, ownerId, devTypeCode, operationId):
  1704. logger.info(
  1705. 'set_server_settings, operator={}, logicalCode={}, payload={}, ownerId={}, devTypeCode={}, operationId={}'.format(
  1706. operator, logicalCode, payload, ownerId, devTypeCode, operationId))
  1707. device = Device.get_dev_by_l(logicalCode) # type: DeviceDict
  1708. _operator = namedtuple('Operator', ['id', 'username', 'role'])(**operator)
  1709. operation = OperatorLog.log_dev_operation(
  1710. operator = _operator,
  1711. device = device,
  1712. operator_name = 'setServerSetting',
  1713. content = {
  1714. 'operationId': operationId,
  1715. 'after': payload,
  1716. 'status': 'waiting'
  1717. }) # type: OperatorLog
  1718. try:
  1719. if device.ownerId != ownerId:
  1720. raise Exception(u'无法操作设备')
  1721. if device.devTypeCode != devTypeCode:
  1722. raise Exception(u'无法操作该类型设备')
  1723. before = device.deviceAdapter.get_server_setting()
  1724. device.deviceAdapter.set_server_setting(payload)
  1725. operation.update(content__status = 'success', content__before = before)
  1726. except Exception, e:
  1727. logger.exception('error happened, error=%s' % (e,))
  1728. operation.update(content__status = 'fail')
  1729. def push_shanghai_platform_heatbeat():
  1730. logger.info('start push_shanghai_platform_heatbeat')
  1731. from apps.web.south_intf.shanghai_urban_data_collection_platform import ShangHaiUrbanDataCollectionPlatform
  1732. from apps.web.south_intf.shanghai_urban_data_collection_platform import ShangHaiUrbanDataCollectionPlatformModel
  1733. from apps.web.device.models import Device
  1734. all_dealer_info = ShangHaiUrbanDataCollectionPlatformModel.all_dealers()
  1735. for _one_info in all_dealer_info:
  1736. devs = Device.get_devs_by_ownerId(_one_info['dealerId'])
  1737. for dev in devs:
  1738. try:
  1739. ShangHaiUrbanDataCollectionPlatform(dev, **_one_info).celery_push_heatbeat()
  1740. except:
  1741. import traceback
  1742. logger.error(traceback.format_exc())
  1743. return 'success'
  1744. def export_modify_customer_balance_record_excel_from_db(filepath, queryDict):
  1745. customerOpenId = queryDict.get("customerOpenId")
  1746. pageIndex = queryDict.get("pageIndex")
  1747. pageSize = queryDict.get("pageSize")
  1748. ownerId = queryDict.get("ownerId")
  1749. from apps.web.user.models import MyUser
  1750. user = MyUser.objects(openId=customerOpenId).first()
  1751. if not user:
  1752. records = RechargeRecord.objects(
  1753. ownerId=ownerId,
  1754. via='sendcoin').order_by('-dateTimeAdded').skip((pageIndex - 1) * pageSize).limit(pageSize)
  1755. else:
  1756. records = RechargeRecord.objects(
  1757. ownerId=ownerId,
  1758. openId=customerOpenId,
  1759. via='sendcoin').order_by('-dateTimeAdded').skip((pageIndex - 1) * pageSize).limit(pageSize)
  1760. dataList = []
  1761. result = []
  1762. if not user:
  1763. for r in records:
  1764. u = MyUser.objects(openId=r.openId).first()
  1765. if u is None:
  1766. logger.error('invalid openId, openId=%s' % r.openId)
  1767. continue
  1768. dataList.extend([
  1769. ('用户名称',u.nickname if u.nickname is not None else ''),
  1770. ('用户ID', customerOpenId),
  1771. ('操作人员',r.operator),
  1772. ('派币金额', r.coins),
  1773. ('派币地址', r.groupName),
  1774. ('派币时间', r.dateTimeAdded.strftime("%Y-%m-%d %H:%M:%S")),
  1775. ('备注', r.desc),]
  1776. )
  1777. result.append(OrderedDict(dataList))
  1778. else:
  1779. for r in records:
  1780. dataList.extend(
  1781. [
  1782. ('用户名称', user.nickname if user.nickname is not None else ''),
  1783. ('用户ID', customerOpenId),
  1784. ('操作人员', r.operator),
  1785. ('派币金额', r.coins),
  1786. ('派币地址', r.groupName),
  1787. ('派币时间', r.dateTimeAdded.strftime("%Y-%m-%d %H:%M:%S")),
  1788. ('备注', r.desc)
  1789. ]
  1790. )
  1791. result.append(OrderedDict(dataList))
  1792. generate_excel_report(filepath, result)
  1793. def ledger_consume_order_stats(date=None, statsId=None): # type: (Optional[str, datetime], str) -> None
  1794. """
  1795. 统计经销商的每日分润的收益
  1796. """
  1797. # 获取时间戳的转换
  1798. date = date or datetime.datetime.now() - datetime.timedelta(days=1)
  1799. if isinstance(date, datetime.datetime):
  1800. date = date.strftime("%Y-%m-%d")
  1801. if statsId:
  1802. query = DealerGroupStats.objects.filter(id=statsId)
  1803. else:
  1804. query = DealerGroupStats.objects.filter(date=date)
  1805. # 处理每一单的分账任务
  1806. for _stats in query:
  1807. try:
  1808. LedgerConsumeOrder(_stats).execute()
  1809. except Exception as e:
  1810. logger.exception("[ledger_consume_order_stats] stats <{}> execute ledger error ={}".format(_stats, e))