make_rpt_into_db.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""
Generate reports.
"""
import datetime
import re
import sys

from base import init_env, get_logger

logger = get_logger(__name__)

import os

# Use the testing settings module only when the caller has not already set
# DJANGO_SETTINGS_MODULE in the environment (setdefault never overwrites).
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'configs.testing')
# NOTE(review): presumably bootstraps the project environment; the project
# imports below appear to be placed after this call on purpose -- confirm
# before reordering any of these lines.
init_env(interactive = False)

from apilib.utils_mongo import BulkHandlerEx
from apps import reportCache
from apps.web.report.models import DevReport, GroupReport, DealerReport
from apps.web.device.models import Group, Device, OfflineReportDealers, GroupDict
from apps.web.core.accounting import devCoinTmpl, ownerCoinTmpl, groupCoinTmpl
  19. groupid_from_key = lambda groupKey: groupKey.split('_')[1]
  20. dealerid_from_key = lambda dealerKey: dealerKey.split('_')[1]
  21. def check_offline_coins(dev_in_db, dealerDict, parterDict, stringDate, update = False):
  22. dealerCoinDict = {}
  23. groupCoinDict = {}
  24. for ownerId, groupIds in dealerDict.items():
  25. ownerCoins = 0
  26. for groupId in list(set(groupIds)):
  27. devNos = Device.get_devNos_by_group([groupId])
  28. keys = [devCoinTmpl(devNo, stringDate) for devNo in devNos]
  29. devCacheDict = reportCache.get_many(keys)
  30. for devNo in devNos:
  31. dbValue = int(dev_in_db.get(devNo, 0))
  32. if devCoinTmpl(devNo, stringDate) in devCacheDict:
  33. cacheValue = int(devCacheDict[devCoinTmpl(devNo, stringDate)])
  34. if dbValue != cacheValue:
  35. print '{} db not equal cache {}. update db.'.format(devNo, cacheValue)
  36. DevReport.get_collection().update_one({
  37. 'devNo': devNo,
  38. 'type': 'day',
  39. 'date': stringDate
  40. }, {
  41. '$set': {
  42. 'devNo': devNo,
  43. 'type': 'day',
  44. 'date': stringDate,
  45. 'rpt.lineCoins': cacheValue
  46. }
  47. }, upsert = True)
  48. else:
  49. pass
  50. # print '{} db<{}> equal cache <{}>.'.format(devNo, dbValue, cacheValue)
  51. else:
  52. devCacheDict[devCoinTmpl(devNo, stringDate)] = dbValue
  53. listCoins = [int(coins) for coins in devCacheDict.values()]
  54. allDevCoinsByGroup = sum(listCoins)
  55. oldValue = reportCache.get(groupCoinTmpl(groupId, stringDate))
  56. if not oldValue:
  57. oldValue = 0
  58. else:
  59. oldValue = int(oldValue)
  60. groupCoinDict[groupCoinTmpl(groupId, stringDate)] = allDevCoinsByGroup
  61. if update:
  62. reportCache.set(groupCoinTmpl(groupId, stringDate), str(int(allDevCoinsByGroup)))
  63. if oldValue != allDevCoinsByGroup:
  64. logger.info(
  65. 'not equal in group. ownerId=%s, groupId=%s, oldValue=%s, nowValue=%s' % (
  66. ownerId, groupId, oldValue, allDevCoinsByGroup))
  67. else:
  68. pass
  69. # logger.info(
  70. # 'equal in group. ownerId=%s, groupId=%s, oldValue=%s, nowValue=%s' % (
  71. # ownerId, groupId, oldValue, allDevCoinsByGroup))
  72. ownerCoins = ownerCoins + int(allDevCoinsByGroup)
  73. dealerCoinDict[ownerCoinTmpl(ownerId, stringDate)] = ownerCoins
  74. for partnerId, groupIds in parterDict.items():
  75. partnerCoinsByGroup = 0
  76. for groupId in list(set(groupIds)):
  77. if groupCoinTmpl(groupId, stringDate) in groupCoinDict:
  78. groupCoin = groupCoinDict[groupCoinTmpl(groupId, stringDate)]
  79. else:
  80. groupCoin = 0
  81. logger.info(
  82. 'allocated as partner. partnerId = %s; groupId = %s; coin = %s' % (partnerId, groupId, groupCoin))
  83. partnerCoinsByGroup = partnerCoinsByGroup + int(groupCoin)
  84. if ownerCoinTmpl(partnerId, stringDate) in dealerCoinDict:
  85. dealerCoinDict[ownerCoinTmpl(partnerId, stringDate)] = dealerCoinDict[
  86. ownerCoinTmpl(partnerId,
  87. stringDate)] + partnerCoinsByGroup
  88. else:
  89. dealerCoinDict[ownerCoinTmpl(partnerId, stringDate)] = partnerCoinsByGroup
  90. for dealerKey, coin in dealerCoinDict.items():
  91. logger.info('dealer key is: {}'.format(dealerKey))
  92. oldValue = reportCache.get(dealerKey)
  93. if not oldValue:
  94. oldValue = 0
  95. else:
  96. oldValue = int(oldValue)
  97. if update:
  98. reportCache.set(dealerKey, str(int(coin)))
  99. if oldValue != coin:
  100. logger.info(
  101. 'not equal in dealer. ownerKey=%s, oldValue=%s, nowValue=%s' % (
  102. dealerKey, oldValue, coin))
  103. else:
  104. pass
  105. # logger.info(
  106. # 'equal in dealer. ownerKey=%s, oldValue=%s, nowValue=%s' % (
  107. # dealerKey, oldValue, coin))
  108. return groupCoinDict, dealerCoinDict
  109. def report_into_db(groupCoinDict, dealerCoinDict, stringDate):
  110. start_time = datetime.datetime.now()
  111. logger.info('generating report for date (%s)' % (stringDate,))
  112. bulker = BulkHandlerEx(GroupReport.get_collection()) # type: BulkHandlerEx
  113. for groupKey, coins in groupCoinDict.items():
  114. groupId = groupid_from_key(groupKey)
  115. bulker.upsert(query_dict = {
  116. 'groupId': groupId,
  117. 'type': 'day',
  118. 'date': stringDate
  119. }, update_dict = {
  120. '$set': {
  121. 'groupId': groupId,
  122. 'type': 'day',
  123. 'date': stringDate,
  124. 'rpt.lineCoins': int(coins)
  125. }
  126. })
  127. if len(bulker.requests) >= 2000:
  128. bulker.execute()
  129. bulker = BulkHandlerEx(GroupReport.get_collection()) # type: BulkHandlerEx
  130. if len(bulker.requests) > 0:
  131. bulker.execute()
  132. bulker = None
  133. bulker = BulkHandlerEx(DealerReport.get_collection()) # type: BulkHandlerEx
  134. for dealerKey, coins in dealerCoinDict.iteritems():
  135. dealerId = dealerid_from_key(dealerKey)
  136. bulker.upsert(query_dict = {
  137. 'ownerId': dealerId,
  138. 'type': 'day',
  139. 'date': stringDate
  140. }, update_dict = {
  141. '$set': {
  142. 'ownerId': dealerId,
  143. 'type': 'day',
  144. 'date': stringDate,
  145. 'rpt.lineCoins': int(coins)
  146. }
  147. })
  148. if len(bulker.requests) >= 2000:
  149. bulker.execute()
  150. bulker = BulkHandlerEx(DealerReport.get_collection()) # type: BulkHandlerEx
  151. if len(bulker.requests) > 0:
  152. bulker.execute()
  153. bulker = None
  154. logger.info('[*]finished insert rpt into database!, time cost=%s' % (datetime.datetime.now() - start_time,))
  155. def get_owner_or_parter(dealerDict, doneDealers, parterDict, dealerId):
  156. if dealerId in doneDealers:
  157. # logger.debug('{} has done.'.format(dealerId))
  158. return
  159. groupIds = Group.get_group_ids_of_dealer(dealerId)
  160. if groupIds:
  161. dealerDict[dealerId] = groupIds
  162. doneDealers.append(dealerId)
  163. dealerGroups = Group.get_groups_by_group_ids(groupIds).values()
  164. for dealerGroup in dealerGroups: # type: GroupDict
  165. groupId = dealerGroup.groupId
  166. parters = dealerGroup.partners()
  167. for parter in parters:
  168. parter_id = parter['id']
  169. parterDict[parter_id] = parterDict.get(parter_id) or set()
  170. parterDict[parter_id].add(groupId)
  171. get_owner_or_parter(dealerDict, doneDealers, parterDict, parter_id)
  172. parterGroupIds = Group.get_group_ids_of_partner(dealerId)
  173. parterGroups = Group.get_groups_by_group_ids(parterGroupIds).values()
  174. for parterGroup in parterGroups: # type: GroupDict
  175. groupId = parterGroup.groupId
  176. parterDict[dealerId] = parterDict.get(dealerId) or set()
  177. parterDict[dealerId].add(groupId)
  178. get_owner_or_parter(dealerDict, doneDealers, parterDict, parterGroup.ownerId)
  179. def main(dateFmtStr):
  180. if dateFmtStr:
  181. reportDate = datetime.datetime.strptime(dateFmtStr, "%Y-%m-%d")
  182. else:
  183. reportDate = datetime.datetime.now() - datetime.timedelta(days = 1)
  184. startTime = datetime.datetime.now()
  185. if len(sys.argv) >= 2:
  186. dealer_id_list = [str(sys.argv[1])]
  187. else:
  188. dealer_id_list = OfflineReportDealers.get_rpt_dealIds(reportDate.strftime("%Y-%m-%d"))
  189. dealerDict = {}
  190. parterDict = {}
  191. doneDealers = []
  192. for dealerId in dealer_id_list:
  193. logger.info('fetch group map for dealer<id={}>'.format(dealerId))
  194. get_owner_or_parter(dealerDict, doneDealers, parterDict, dealerId)
  195. for dealerId, groupIds in dealerDict.iteritems():
  196. print 'owner. {} = {}'.format(dealerId, groupIds)
  197. for dealerId, groupIds in parterDict.iteritems():
  198. print 'parter. {} = {}'.format(dealerId, groupIds)
  199. logger.info('fetch device map for dealer<id={}>'.format(dealerId))
  200. devDict = {dealerId: Device.get_devNos_by_group(groupIds) for dealerId, groupIds in dealerDict.items()}
  201. devNos = []
  202. for items in devDict.values():
  203. devNos.extend(items)
  204. dev_in_db = {}
  205. for item in DevReport.get_collection().find(
  206. {'devNo': {'$in': devNos}, 'type': 'day', 'date': reportDate.strftime("%Y-%m-%d")},
  207. {'devNo': 1, 'rpt': 1}):
  208. if 'rpt' in item and 'lineCoins' in item['rpt']:
  209. dev_in_db[item['devNo']] = item['rpt']['lineCoins']
  210. # for devNo, coins in dev_in_db.iteritems():
  211. # print '{} has coins {} in db.'.format(devNo, coins)
  212. groupCoinDict, dealerCoinDict = check_offline_coins(dev_in_db, dealerDict, parterDict,
  213. reportDate.strftime("%Y-%m-%d"), update = False)
  214. for groupKey, coins in groupCoinDict.iteritems():
  215. logger.info('group {} has coins {}'.format(groupid_from_key(groupKey), coins))
  216. for dealerKey, coins in dealerCoinDict.iteritems():
  217. logger.info('dealer {} has coins {}'.format(dealerid_from_key(dealerKey), coins))
  218. report_into_db(groupCoinDict, dealerCoinDict, reportDate.strftime("%Y-%m-%d"))
  219. logger.debug('all steps cost {}'.format(datetime.datetime.now() - startTime))
  220. if __name__ == '__main__':
  221. dateFmtStr = None
  222. index = 1
  223. regex = re.compile(r'\d{4}-\d{2}-\d{2}')
  224. for arg in sys.argv[1:]:
  225. m = regex.search(arg)
  226. if m and m.group():
  227. dateFmtStr = arg
  228. sys.argv.pop(index)
  229. break
  230. index += 1
  231. main(dateFmtStr)