upgrade_dealer_datacenter_20180913.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Goals:
1. Generate the dealers' income proxy models from the income source models
2. Load the caches
"""
import datetime
import itertools
import logging
import os
import sys
from collections import defaultdict
from functools import partial

from mongoengine import Q

PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
sys.path.insert(0, PROJECT_ROOT)

from script.base import init_env

init_env(interactive = True)

import daiquiri
import click
from pymongo.errors import AutoReconnect
from bson.objectid import ObjectId

from apps.web.constant import Const, USER_RECHARGE_TYPE
from apps.web.user.models import ConsumeRecord, RechargeRecord, CardRechargeRecord
from apps.web.dealer.models import Dealer
from apps.web.dealer.proxy import (DealerIncomeProxy,
                                   generate_recharge_title, record_income_proxy)
from apps.web.report.models import GroupDailyStat, GroupMonthlyStat
from apps.web.report.utils import record_income_stats, record_consumption_stats
from apps.web.device.models import Group
from apilib.utils_mongo import BulkHandler
from apps.web.report.ledger import Ledger

flatten = itertools.chain.from_iterable

daiquiri.setup(
    level = logging.INFO,
    outputs = (
        daiquiri.output.Stream(sys.stdout, level = logging.INFO),
        daiquiri.output.TimedRotatingFile(
            'upgrade_dealer_datacenter-everything.log',
            level = logging.DEBUG,
            interval = datetime.timedelta(days = 1))
    )
)

logger = logging.getLogger(__name__)
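
# Example invocations (a sketch; the command name is the function name, with
# underscores possibly converted to dashes depending on the installed click
# version):
#
#   python upgrade_dealer_datacenter_20180913.py upgrade_consume_record -s 2500
#   python upgrade_dealer_datacenter_20180913.py build_income_stats \
#       -b "2018-09-01 00:00:00" -e "2018-10-01 00:00:00" -l 10000 -c yes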


@click.group()
def cli():
    """
    Update the dealer data center
    :return:
    """
    logger.info('cli called')


def generate_bulk(collection):
    return collection.initialize_unordered_bulk_op()


# @retry(retry=retry_if_exception_type(AutoReconnect), stop=stop_after_attempt(7))
def execute_bulk(bulk):
    result = {'success': 0, 'info': 0}
    try:
        if len(bulk._BulkOperationBuilder__bulk.ops) != 0:
            result['info'] = bulk.execute()
            result['success'] = 1
        else:
            result['info'] = 'no operation to execute'
            result['success'] = 1
    except AutoReconnect as e:
        logger.exception(e)
        raise e
    except Exception as e:
        logger.exception(e)
        result['info'] = e
    return result
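
# Note on the error handling above: a pymongo bulk can only be executed once,
# so a failed batch cannot simply be re-run on the same builder. AutoReconnect
# is therefore re-raised to the caller (which would have to rebuild the batch),
# while any other error is swallowed and reported through result['info'].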


def __upgrade_record(clazz, chunk_size):
    items = []
    count = 0
    for record in clazz.get_collection().find(spec = {}, fields = {'_id': 1, 'time': 1, 'dateTimeAdded': 1},
                                              timeout = False):
        count += 1
        if ('time' not in record or not record['time']) and (
                'dateTimeAdded' not in record or not record['dateTimeAdded']):
            logger.info('%s; no time and dateTimeAdded; %s' % (count, record['_id']))
            items.append({
                '_id': record['_id'],
                'dateTimeAdded': datetime.datetime.now()
            })
            continue
        if 'time' not in record or not record['time']:
            logger.info('%s; no time; %s' % (count, record['_id']))
            continue
        if 'dateTimeAdded' not in record or not record['dateTimeAdded']:
            logger.info('%s; no dateTimeAdded; %s' % (count, record['_id']))
            items.append({
                '_id': record['_id'],
                'dateTimeAdded': datetime.datetime.strptime(record['time'], "%Y-%m-%d %H:%M:%S")
            })
            continue
        if record['dateTimeAdded'].strftime('%Y-%m-%d %H:%M:%S') == record['time']:
            logger.info('%s; is equal; %s' % (count, record['_id']))
            continue
        else:
            logger.info('%s; is not equal; %s' % (count, record['_id']))
            items.append({
                '_id': record['_id'],
                'dateTimeAdded': datetime.datetime.strptime(record['time'], "%Y-%m-%d %H:%M:%S")
            })
    bulker = BulkHandler(clazz.get_collection())
    for item in items:
        auto_id = item.pop('_id')
        bulker.update(query_dict = {'_id': auto_id}, update_dict = {'$set': item})
        if len(bulker.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size:
            result = bulker.execute()
            assert bool(result['success']), 'error happened'
            bulker = BulkHandler(clazz.get_collection())
    if len(bulker.bulk._BulkOperationBuilder__bulk.ops) > 0:
        result = bulker.execute()
        assert bool(result['success']), 'error happened'
    logger.info('done; scanned %s records, fixed %s' % (count, len(items)))
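
# Normalization rule used above (illustrative values): `time` is the string
# source of truth. A record with time = '2018-09-13 12:00:00' and a missing or
# disagreeing dateTimeAdded gets dateTimeAdded = datetime(2018, 9, 13, 12, 0);
# records with neither field fall back to datetime.datetime.now().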


@cli.command()
@click.option('-s', '--chunk_size', help = u'bulk chunk size', type = int, default = 2500)
def upgrade_consume_record(chunk_size):
    __upgrade_record(ConsumeRecord, chunk_size)


@cli.command()
@click.option('-s', '--chunk_size', help = u'bulk chunk size', type = int, default = 2500)
def upgrade_recharge_record(chunk_size):
    __upgrade_record(RechargeRecord, chunk_size)


@cli.command()
def update_card_recharge_record():
    """add ownerId to every CardRechargeRecord"""
    CardRechargeRecordCollection = CardRechargeRecord.get_collection()
    card_recharge_record_bulk = BulkHandler(CardRechargeRecordCollection)
    count = 0
    for record in CardRechargeRecordCollection.find({}):
        ownerId = ObjectId(Group.get_group(groupId = record['groupId'])['ownerId'])
        # match on _id; an empty filter would only ever touch the first document
        card_recharge_record_bulk.update({'_id': record['_id']}, {'$set': {'ownerId': ownerId}})
        count += 1
        logger.info('updating CardRechargeRecord(id=%s) ownerId -> %s count=%d' % (record['_id'], ownerId, count))
        if len(card_recharge_record_bulk.bulk._BulkOperationBuilder__bulk.ops) > 2000:
            card_recharge_record_bulk.execute()
            # rebuild the bulk against the same collection (not DealerIncomeProxy)
            card_recharge_record_bulk = BulkHandler(CardRechargeRecordCollection)
    if len(card_recharge_record_bulk.bulk._BulkOperationBuilder__bulk.ops) > 0:
        card_recharge_record_bulk.execute()
    assert CardRechargeRecordCollection.find({'ownerId': {'$exists': True}}).count() \
           == CardRechargeRecordCollection.find({}).count(), u'not all CardRechargeRecords updated'


@cli.command()
@click.option('-b', '--begin', help = u'begin', type = str, default = '')
@click.option('-e', '--end', help = u'end', type = str, default = '')
@click.option('-l', '--limit', help = u'limit size', type = int, default = 10000)
@click.option('-k', '--skip', help = u'skip count', type = int, default = 0)
@click.option('-c', '--check', help = u'check', type = str, default = 'no')
def build_income_stats(begin, end, limit, skip, check):
    logger.info('begin = %s; end = %s; limit = %d; skip = %s; check = %s' % (begin, end, limit, skip, check))
    _filter = None
    if begin:
        _filter = Q(__raw__ = {'dateTimeAdded': {'$gte': datetime.datetime.strptime(begin, "%Y-%m-%d %H:%M:%S")}})
    if end:
        if _filter:
            _filter &= Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}})
        else:
            _filter = Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}})
    if not _filter:
        _filter = Q(__raw__ = {})
    check_done = (check == 'yes')
    while True:
        logger.info('skip = %s' % (skip,))
        proxies = []
        for proxy in DealerIncomeProxy.get_collection().find(spec = _filter.to_query(DealerIncomeProxy),
                                                             timeout = False).sort('_id', 1).limit(limit).skip(skip):
            proxies.append(proxy)
        if len(proxies) == 0:
            break
        for proxy in proxies:
            auto_id = proxy.pop('_id')
            proxy.update({'id': ObjectId(auto_id)})
            proxy = DealerIncomeProxy(**proxy)
            record_income_stats(proxy, check = check_done, allowed = {'group': True})
        skip = skip + limit
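
# Paging note: the skip/limit walk above (and in the commands below) assumes
# the source collection is not being written to while the command runs;
# otherwise documents can be skipped or visited twice as pages shift.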


@cli.command()
@click.option('-b', '--begin', help = u'begin', type = str, default = '')
@click.option('-e', '--end', help = u'end', type = str, default = '')
@click.option('-l', '--limit', help = u'limit size', type = int, default = 10000)
@click.option('-k', '--skip', help = u'skip count', type = int, default = 0)
@click.option('-s', '--chunk_size', help = u'chunk size', type = int, default = 5000)
def bulk_build_income_stats(begin, end, limit, skip, chunk_size):
    logger.info('begin = %s; end = %s; limit = %d; skip = %s; chunk size = %s' % (begin, end, limit, skip, chunk_size))
    _filter = None
    if begin:
        _filter = Q(__raw__ = {'dateTimeAdded': {'$gte': datetime.datetime.strptime(begin, "%Y-%m-%d %H:%M:%S")}})
    if end:
        if _filter:
            _filter &= Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}})
        else:
            _filter = Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}})
    if not _filter:
        _filter = Q(__raw__ = {})
    while True:
        logger.info('skip = %s' % (skip,))
        bulkerGroupDailyStat = BulkHandler(GroupDailyStat.get_collection())
        bulkerGroupMonthlyStat = BulkHandler(GroupMonthlyStat.get_collection())
        proxies = []
        for proxy in DealerIncomeProxy.get_collection().find(spec = _filter.to_query(DealerIncomeProxy),
                                                             timeout = False).sort('_id', 1).limit(limit).skip(skip):
            proxies.append(proxy)
        if len(proxies) == 0:
            break
        for proxy in proxies:
            auto_id = proxy.pop('_id')
            proxy.update({'id': ObjectId(auto_id)})
            proxy = DealerIncomeProxy(**proxy)
            get_income_stats(proxy, bulkerGroupDailyStat, bulkerGroupMonthlyStat)
            if len(bulkerGroupDailyStat.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size:
                result = bulkerGroupDailyStat.execute()
                assert bool(result['success']), 'error happened'
                bulkerGroupDailyStat = BulkHandler(GroupDailyStat.get_collection())
            if len(bulkerGroupMonthlyStat.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size:
                result = bulkerGroupMonthlyStat.execute()
                assert bool(result['success']), 'error happened'
                bulkerGroupMonthlyStat = BulkHandler(GroupMonthlyStat.get_collection())
        if len(bulkerGroupDailyStat.bulk._BulkOperationBuilder__bulk.ops) > 0:
            result = bulkerGroupDailyStat.execute()
            assert bool(result['success']), 'error happened'
        if len(bulkerGroupMonthlyStat.bulk._BulkOperationBuilder__bulk.ops) > 0:
            result = bulkerGroupMonthlyStat.execute()
            assert bool(result['success']), 'error happened'
        skip = skip + limit


@cli.command()
@click.option('-b', '--begin', help = u'begin', type = str, default = '')
@click.option('-e', '--end', help = u'end', type = str, default = '')
@click.option('-l', '--limit', help = u'limit size', type = int, default = 10000)
@click.option('-k', '--skip', help = u'skip count', type = int, default = 0)
@click.option('-c', '--check', help = u'check', type = str, default = 'no')
def build_consume_stats(begin, end, limit, skip, check):
    logger.info('begin = %s; end = %s; limit = %d; skip = %s; check = %s' % (begin, end, limit, skip, check))
    _filter = None
    if begin:
        _filter = Q(__raw__ = {'dateTimeAdded': {'$gte': datetime.datetime.strptime(begin, "%Y-%m-%d %H:%M:%S")}})
    if end:
        if _filter:
            _filter &= Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}})
        else:
            _filter = Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}})
    if not _filter:
        _filter = Q(__raw__ = {})
    check_done = (check == 'yes')
    while True:
        logger.info('skip = %s' % skip)
        records = []
        for record in ConsumeRecord.get_collection().find(spec = _filter.to_query(ConsumeRecord),
                                                          timeout = False).sort('_id', 1).limit(limit).skip(skip):
            records.append(record)
        if len(records) == 0:
            break
        for record in records:
            if not record.get('ownerId'):
                logger.debug('missing ownerId; %s' % record['_id'])
                continue
            auto_id = record.pop('_id')
            record.update({'id': ObjectId(auto_id)})
            record_consumption_stats(record = ConsumeRecord(**record), check = check_done, allowed = {'group': True})
        skip = skip + limit


@cli.command()
@click.option('-b', '--begin', help = u'begin', type = str, default = '')
@click.option('-e', '--end', help = u'end', type = str, default = '')
@click.option('-l', '--limit', help = u'limit size', type = int, default = 10000)
@click.option('-k', '--skip', help = u'skip count', type = int, default = 0)
@click.option('-s', '--chunk_size', help = u'chunk size', type = int, default = 5000)
def bulker_build_consume_stats(begin, end, limit, skip, chunk_size):
    logger.info('begin = %s; end = %s; limit = %d; skip = %s; chunk size = %s' % (begin, end, limit, skip, chunk_size))
    _filter = None
    if begin:
        _filter = Q(__raw__ = {'dateTimeAdded': {'$gte': datetime.datetime.strptime(begin, "%Y-%m-%d %H:%M:%S")}})
    if end:
        if _filter:
            _filter &= Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}})
        else:
            _filter = Q(__raw__ = {'dateTimeAdded': {'$lt': datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")}})
    if not _filter:
        _filter = Q(__raw__ = {})
    while True:
        logger.info('skip = %s' % skip)
        bulkerGroupDailyStat = BulkHandler(GroupDailyStat.get_collection())
        bulkerGroupMonthlyStat = BulkHandler(GroupMonthlyStat.get_collection())
        records = []
        for record in ConsumeRecord.get_collection().find(spec = _filter.to_query(ConsumeRecord),
                                                          timeout = False).sort('_id', 1).limit(limit).skip(skip):
            records.append(record)
        if len(records) == 0:
            break
        for record in records:
            if not record.get('ownerId'):
                logger.debug('missing ownerId; %s' % record['_id'])
                continue
            auto_id = record.pop('_id')
            record.update({'id': ObjectId(auto_id)})
            get_consumption_stats(ConsumeRecord(**record), bulkerGroupDailyStat, bulkerGroupMonthlyStat)
            if len(bulkerGroupDailyStat.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size:
                result = bulkerGroupDailyStat.execute()
                assert bool(result['success']), 'error happened'
                bulkerGroupDailyStat = BulkHandler(GroupDailyStat.get_collection())
            if len(bulkerGroupMonthlyStat.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size:
                result = bulkerGroupMonthlyStat.execute()
                assert bool(result['success']), 'error happened'
                bulkerGroupMonthlyStat = BulkHandler(GroupMonthlyStat.get_collection())
        if len(bulkerGroupDailyStat.bulk._BulkOperationBuilder__bulk.ops) > 0:
            result = bulkerGroupDailyStat.execute()
            assert bool(result['success']), 'error happened'
        if len(bulkerGroupMonthlyStat.bulk._BulkOperationBuilder__bulk.ops) > 0:
            result = bulkerGroupMonthlyStat.execute()
            assert bool(result['success']), 'error happened'
        skip = skip + limit


@cli.command()
@click.option('-s', '--chunk_size', help = u'bulk chunk size', type = int, default = 2500)
@click.option('-l', '--limit', help = u'limit size', type = int, default = 10000)
@click.option('-k', '--skip', help = u'skip count', type = int, default = 0)
def generate_income_proxy(chunk_size, limit, skip):
    """
    Generate income proxies from historical recharge orders, card recharge records and ads
    :return:
    """
    logger.info('chunk_size = %d; limit = %d; skip = %s' % (chunk_size, limit, skip))
    DealerIncomeProxyCollection = DealerIncomeProxy.get_collection()
    income_proxy_bulk = BulkHandler(DealerIncomeProxyCollection)

    def load_maps():
        logger.debug('loading dealerMap...')
        dealerMap = {
            str(dealer['_id']):
                {
                    'agentProfitShare': dealer.get('agentProfitShare', 0.0),
                    'agentId': dealer['agentId']
                }
            for dealer in Dealer.get_collection().find({}, {'agentProfitShare': 1, 'agentId': 1})
        }
        logger.debug('loading groupMap...')
        groupMap = {}
        for group in Group.get_collection().find({}, {'partnerList': 1, 'ownerId': 1}):
            partnerDict = {}
            partnerList = group.get('partnerList', [])
            for partner in partnerList:
                partnerDict[partner['id']] = partner
            groupMap.update({
                str(group['_id']):
                    {
                        'partnerDict': partnerDict,
                        'ownerId': group['ownerId']
                    }
            })
        return dealerMap, groupMap

    #: insert rechargeRecords
    logger.info('loading recharge records')
    dealerMap, groupMap = load_maps()
    while True:
        logger.info('skip = %s' % skip)
        # devType is projected here because the title built below needs it
        records = RechargeRecord.get_collection().find(
            {'via': {'$in': [USER_RECHARGE_TYPE.RECHARGE, USER_RECHARGE_TYPE.RECHARGE_CARD]}, 'result': 'success'},
            {'ownerId': 1, 'groupId': 1, 'logicalCode': 1, 'devNo': 1, 'devType': 1, 'via': 1, 'money': 1,
             'dateTimeAdded': 1, 'time': 1},
            timeout = False).sort('_id', 1).limit(limit).skip(skip)
        if not records or records.count(True) == 0:
            break
        for record in records:  # type: dict
            if record['_id'] and record.get('ownerId'):
                if ('time' not in record or not record['time']) and (
                        'dateTimeAdded' not in record or not record['dateTimeAdded']):
                    logger.info('no time and dateTimeAdded; %s' % (record['_id'],))
                    continue
                if ('dateTimeAdded' not in record) or (not record['dateTimeAdded']):
                    logger.info('no dateTimeAdded; %s' % (record['_id'],))
                    record['dateTimeAdded'] = datetime.datetime.strptime(record['time'], "%Y-%m-%d %H:%M:%S")
                else:
                    if 'time' in record and record['time']:
                        if record['dateTimeAdded'].strftime('%Y-%m-%d %H:%M:%S') != record['time']:
                            logger.info('dateTimeAdded is not the same as time; %s' % (record['_id'],))
                            record['dateTimeAdded'] = datetime.datetime.strptime(record['time'], "%Y-%m-%d %H:%M:%S")
                group = groupMap.get(record['groupId'])
                if not group:
                    logger.info('skipping income_proxy (rechargeRecord id=%s) given group is None' % (record['_id'],))
                    continue
                partners = group['partnerDict'].values()
                dealer = dealerMap.get(record['ownerId'])
                if not dealer:
                    logger.error('dealer does not exist. dealer = %s' % record['ownerId'])
                    continue
                partitionMap = Ledger.get_group_income_partition(dealer['agentId'], record['ownerId'],
                                                                 dealer['agentProfitShare'], group, record['money'])
                doc = {
                    'ref_id': ObjectId(record['_id']),
                    'dealerIds': [ObjectId(record['ownerId'])] + [ObjectId(partner['id']) for partner in partners],
                    'partition': list(flatten(partitionMap.values())),
                    'groupId': ObjectId(record['groupId']),
                    'logicalCode': record['logicalCode'],
                    'title': generate_recharge_title(devType = record.get('devType', ''),
                                                     logicalCode = record['logicalCode']),
                    'source': record['via'],
                    'totalAmount': record['money'],
                    'actualAmountMap': {
                        share['id']: share['money'] for share in partitionMap['owner'] + partitionMap['partner']
                    },
                    'dateTimeAdded': record['dateTimeAdded'],
                    'date': record['dateTimeAdded'].strftime(Const.DATE_FMT),
                    'tags': '2018-10-01',
                    'desc': u'升级'  # "upgrade"
                }
                income_proxy_bulk.upsert({'ref_id': ObjectId(record['_id'])}, {'$set': doc})
                if len(income_proxy_bulk.bulk._BulkOperationBuilder__bulk.ops) >= chunk_size:
                    result = income_proxy_bulk.execute()
                    assert bool(result['success']), 'error happened'
                    income_proxy_bulk = BulkHandler(DealerIncomeProxyCollection)
        if len(income_proxy_bulk.bulk._BulkOperationBuilder__bulk.ops) > 0:
            result = income_proxy_bulk.execute()
            assert bool(result['success']), 'error happened'
            income_proxy_bulk = BulkHandler(DealerIncomeProxyCollection)
        skip = skip + limit
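
# A rough sanity check after a run (illustrative, not part of the migration):
# migrated proxies are tagged with tags = '2018-10-01', so
#   DealerIncomeProxy.get_collection().find({'tags': '2018-10-01'}).count()
# should approach the number of successful recharge records in the scanned
# window; records skipped above (missing group, dealer, or timestamps)
# account for the difference.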


@cli.command()
def post_build_consume_stats():
    records = ConsumeRecord.objects().filter(
        dateTimeAdded__gte = datetime.datetime.strptime('2018-09-27 16:47:48', "%Y-%m-%d %H:%M:%S"))
    for record in records:
        if not record.ownerId:
            logger.info('no dealer id. %s' % str(record.id))
            continue
        record_consumption_stats(record, check = True)


@cli.command()
def post_generate_income_proxy():
    """
    Generate income proxies from historical recharge orders, card recharge records and ads
    :return:
    """

    def load_maps():
        logger.debug('loading dealerMap...')
        dealerMap = {
            str(dealer['_id']):
                {
                    'agentProfitShare': dealer.get('agentProfitShare', 0.0),
                    'agentId': dealer['agentId']
                }
            for dealer in Dealer.get_collection().find({}, {'agentProfitShare': 1, 'agentId': 1})
        }
        logger.debug('loading groupMap...')
        groupMap = {}
        for group in Group.get_collection().find({}, {'partnerList': 1, 'ownerId': 1}):
            partnerDict = {}
            partnerList = group.get('partnerList', [])
            for partner in partnerList:
                partnerDict[partner['id']] = partner
            groupMap.update({
                str(group['_id']):
                    {
                        'partnerDict': partnerDict,
                        'ownerId': group['ownerId']
                    }
            })
        return dealerMap, groupMap

    #: insert rechargeRecords
    logger.debug('loading recharge records')
    dealerMap, groupMap = load_maps()
    records = []
    # get_collection() returns a raw pymongo collection, so the date filter must
    # be a raw query rather than a mongoengine keyword argument
    for record in RechargeRecord.get_collection().find(
            {'dateTimeAdded': {'$gte': datetime.datetime.strptime('2018-09-27 00:00:00', "%Y-%m-%d %H:%M:%S")}}):  # type: dict
        auto_id = record.pop('_id')
        record.update({'id': ObjectId(auto_id)})
        records.append(RechargeRecord(**record))
    for record in records:  # type: RechargeRecord
        logger.debug('to do record is %s' % record.to_dict())
        if not record.ownerId:
            logger.info('no dealer id')
            continue
        group = groupMap.get(record.groupId)
        if not group:
            logger.info('skipping income_proxy (rechargeRecord id=%s) given group is None' % (str(record.id),))
            continue
        dealer = dealerMap.get(record.ownerId)
        if not dealer:
            logger.error('dealer does not exist. dealer = %s' % record.ownerId)
            continue
        partitionMap = Ledger.get_group_income_partition(dealer['agentId'], record.ownerId,
                                                         dealer['agentProfitShare'], group, float(record.money))
        proxy = record_income_proxy(record.via, record, partitionMap, record.dateTimeAdded)
        record_income_stats(proxy, True)


def validate_stats_are_correct():
    pass


def validate_income_proxies_are_correct():
    pass
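
# One possible shape for the validators above (a sketch; assumes, as in
# generate_income_proxy, that every migrated proxy carries tags = '2018-10-01'
# and references its source record through ref_id):
#
# def validate_income_proxies_are_correct():
#     for proxy in DealerIncomeProxy.get_collection().find({'tags': '2018-10-01'}):
#         assert RechargeRecord.get_collection().find_one({'_id': proxy['ref_id']}) is not None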


def get_consumption_stats(record, bulkerGroupDailyStat, bulkerGroupMonthlyStat):
    # type: (ConsumeRecord, BulkHandler, BulkHandler) -> None
    """
    Record a consumption entry into the daily/monthly group stats
    :param record:
    :param bulkerGroupDailyStat:
    :param bulkerGroupMonthlyStat:
    :return:
    """
    try:
        dt = record.dateTimeAdded
        day = dt.day
        hour = dt.hour
        report_monthly_date = dt.strftime('%Y-%m')
        report_daily_date = dt.strftime('%Y-%m-%d')

        def build_query(initial, extra):
            # type: (dict, dict) -> dict
            rv = initial.copy()
            rv.update(extra)
            return rv

        build_daily_query = partial(build_query, {'date': report_daily_date})
        build_monthly_query = partial(build_query, {'date': report_monthly_date})

        def build_daily_update(kind_amount_map):
            kam = kind_amount_map.copy()
            kam.setdefault('coin', record.coin)
            rv = defaultdict(dict)
            try:
                for kind, amount in kam.iteritems():
                    rv['$inc']['daily.consumption.{kind}'.format(kind = kind)] = amount
                    rv['$inc']['hourly.{hour}.consumption.{kind}'.format(hour = hour, kind = kind)] = amount
                rv.update(
                    {
                        '$addToSet': {'origin.consumption': record.id},
                    }
                )
            except Exception as e:
                logger.error(str(kam))
                raise e
            return rv

        def build_monthly_update(kind_amount_map):
            kam = kind_amount_map.copy()
            kam.setdefault('coin', record.coin)
            rv = defaultdict(dict)
            for kind, amount in kam.iteritems():
                rv['$inc']['daily.{day}.consumption.{kind}'.format(day = day, kind = kind)] = amount
                rv['$inc']['monthly.consumption.{kind}'.format(kind = kind)] = amount
            rv.update(
                {
                    '$addToSet': {'origin.consumption': record.id},
                }
            )
            return rv

        #: group
        group_daily_query = build_daily_query({'groupId': ObjectId(record.groupId)})
        group_monthly_query = build_monthly_query({'groupId': ObjectId(record.groupId)})
        group_daily_update = build_daily_update(record.aggInfo)
        group_monthly_update = build_monthly_update(record.aggInfo)
        bulkerGroupDailyStat.upsert(group_daily_query, group_daily_update)
        bulkerGroupMonthlyStat.upsert(group_monthly_query, group_monthly_update)
    except Exception as e:
        logger.exception(e)
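
# Example of the daily update produced above (illustrative values): a record
# with aggInfo = {'coin': 2.0} at 13:05 on 2018-09-13 upserts
#   {'$inc': {'daily.consumption.coin': 2.0,
#             'hourly.13.consumption.coin': 2.0},
#    '$addToSet': {'origin.consumption': record.id}}
# against the GroupDailyStat document matching {'date': '2018-09-13', 'groupId': ...}.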


def get_income_stats(proxy, bulkerGroupDailyStat, bulkerGroupMonthlyStat):
    # type: (DealerIncomeProxy, BulkHandler, BulkHandler) -> None
    """
    Used by the upgrade tool
    :param proxy:
    :param bulkerGroupDailyStat:
    :param bulkerGroupMonthlyStat:
    :return:
    """
    try:
        dt = proxy.dateTimeAdded
        day = dt.day
        hour = dt.hour
        report_monthly_date = dt.strftime('%Y-%m')
        report_daily_date = dt.strftime('%Y-%m-%d')

        def build_query(initial, extra):
            rv = initial.copy()
            rv.update(extra)
            return rv

        build_daily_query = partial(build_query, {'date': report_daily_date})
        build_monthly_query = partial(build_query, {'date': report_monthly_date})

        def build_daily_update(amount):
            return {
                '$inc': {
                    'daily.income.{source}'.format(source = proxy.source): amount,
                    'hourly.{hour}.income.{source}'.format(hour = hour, source = proxy.source): amount,
                    'daily.totalIncome': amount,
                },
                '$addToSet': {'origin.income': proxy.id}
            }

        def build_monthly_update(amount):
            return {
                '$inc': {
                    'daily.{day}.income.{source}'.format(day = day, source = proxy.source): amount,
                    'daily.{day}.totalIncome'.format(day = day): amount,
                    'monthly.income.{source}'.format(source = proxy.source): amount,
                    'monthly.totalIncome': amount,
                },
                '$addToSet': {'origin.income': proxy.id}
            }

        #: group
        group_daily_query = build_daily_query({'groupId': proxy.groupId})
        group_monthly_query = build_monthly_query({'groupId': proxy.groupId})
        group_daily_update = build_daily_update(proxy.totalAmount)
        group_monthly_update = build_monthly_update(proxy.totalAmount)
        bulkerGroupDailyStat.upsert(group_daily_query, group_daily_update)
        bulkerGroupMonthlyStat.upsert(group_monthly_query, group_monthly_update)
    except Exception as e:
        logger.exception(e)


# the entry point must come after all function definitions, since click runs
# the selected command immediately
if __name__ == '__main__':
    cli()