upgrade_adRecord_20180723.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
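#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Upgrade script for AdRecord data.

When run directly it copies every AdRecordProxy entry into the AdRecord
collection, skipping entries whose (openId, adId) pair already exists.
"""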
  5. import os
  6. import sys
  7. from bson import json_util as json
  8. #: current_dir - 2
  9. PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
  10. sys.path.insert(0, PROJECT_ROOT)
  11. from script.base import init_env, setup_logger, measure_time
# Path this script was invoked with; handed to setup_logger below.
filename = sys.argv[0]
# Module name ('__main__' when the script is run directly).
namespace = __name__
# Module-wide logger used by every function in this script.
logger = setup_logger(filename=filename, namespace=namespace)
# NOTE(review): presumably must run before the apps.* imports that follow,
# which is why those imports are placed after this call — confirm.
init_env(interactive=True)
  16. from pymongo.collection import Collection
  17. from mongoengine.queryset import QuerySet
  18. from mongoengine.errors import NotUniqueError
  19. from apilib.utils_mongo import BulkHandler
  20. from apps.web.ad.models import AdRecord
  21. from apps.web.ad.utils import AdRecordProxy, timestamp_to_dt
  22. from apps.web.user.models import MyUser
  23. from apps.web.constant import Const
  24. adRecordCollection = AdRecord.get_collection() # type: Collection
  25. adRecordBulkHandler = BulkHandler(adRecordCollection)
  26. sex_map = {u'男': Const.USER_SEX.MALE, u'女': Const.USER_SEX.FEMALE, '-': Const.USER_SEX.UNKNOWN}
  27. is_male = lambda _ : int(_ == Const.USER_SEX.MALE)
  28. is_female = lambda _ : int(_ == Const.USER_SEX.FEMALE)
  29. @measure_time(logger)
  30. def add_date_to_ad_record():
  31. # adRecordBulkHandler.update({})
  32. adRecordCollection.aggregate([
  33. { "$addFields":
  34. {
  35. "createdDate": { "$dateToString": { "format": "%Y-%m-%d", "date": "$dateTimeAdded" }}
  36. }
  37. },
  38. { "$out": "ad_records" }
  39. ])
  40. def aggregate():
  41. AdRecord.get_collection().aggregate([{'$group': {'_id': '$createdDate', 'clicks': {'$sum': '$clicks'}}}])
  42. def clean(data):
  43. dateTimeAdded = timestamp_to_dt(float(data.pop('timestamp')))
  44. def to_int(raw):
  45. try:
  46. return int(float(raw))
  47. except Exception, e:
  48. logger.exception('error happened, e=%s, raw=%s' % (e, raw))
  49. data['agentPrice'] = to_int(data['agentPrice'])
  50. data['dealerPrice'] = to_int(data['dealerPrice'])
  51. data['price'] = to_int(data['price'])
  52. data['dateTimeAdded'] = dateTimeAdded
  53. data['createdDate'] = dateTimeAdded.strftime(Const.DATE_FMT)
  54. data['remark'] = data.get('remark', '') or 'moved from redis'
  55. data['converted'] = bool(data['converted'])
  56. data['features'] = {}
  57. data['rewarded'] = bool(data['converted'])
  58. if 'sex' in data:
  59. data['sex'] = sex_map.get(data['sex'], 0)
  60. data['features']['sex'] = {'is_male': is_male(data['sex']), 'is_female': is_female(data['sex'])}
  61. else:
  62. user = MyUser.objects(openId=data['openId'], groupId=data['groupId']).first()
  63. if user:
  64. data['sex'] = user.sex
  65. maleFans = int(user.is_male)
  66. femaleFans = int(user.is_female)
  67. data['features']['sex'] = {'is_male': maleFans, 'is_female': femaleFans}
  68. else:
  69. data['sex'] = 0
  70. data['features']['sex'] = {'is_male': 0, 'is_female': 0}
  71. return data
  72. def insert(to_be_added):
  73. queryset = AdRecord.objects() # type: QuerySet
  74. result = queryset.insert(to_be_added)
  75. logger.info('bulk insert finished, result=%s' % (result,))
  76. @measure_time(logger)
  77. def move_all_adRecordProxies_to_adRecord():
  78. with open('output.json', 'w') as f:
  79. records = ( AdRecord(**clean(_.to_dict())) for _ in AdRecordProxy.iall())
  80. for record in records:
  81. if AdRecord.objects(openId=record.openId, adId=record.adId).count() > 0:
  82. logger.info('{0!r} skipped'.format(record))
  83. continue
  84. logger.info('{0!r} written'.format(record))
  85. f.write(record.to_json() + '\n')
  86. try:
  87. record.save()
  88. except NotUniqueError:
  89. logger.error('{0!r} already exists'.format(record))
  90. logger.info('{0!r} saved'.format(record))
  91. @measure_time
  92. def modify_old_data():
  93. """将老数据整理更新"""
  94. def test():
  95. _ = "user:oeia6wdy9WJwhLvHpD4Fbvhg-2oo:adId:100172"
  96. sample_redis_data = {
  97. "dealerPrice":"0.0",
  98. "sex": u"\u7537",
  99. "adName": u"\u4e01-\u9ad8\u6cb3\u6897\u738b",
  100. "agentName": u"\u90ed\u53ec\u65b9",
  101. "agentPrice": "0.0",
  102. "agentId": "5abdebc68732d665bd804e66",
  103. "openId": "oeia6wdy9WJwhLvHpD4Fbvhg-2oo",
  104. "dealerName": u"\u8d75\u6676\u6676",
  105. "logicalCode": "11733",
  106. "groupName": u"\u5546\u573a",
  107. "adId": "100172",
  108. "clicks": "1",
  109. "shows": "1",
  110. "timestamp": "1531549445.161992",
  111. "price":"0.0",
  112. "address": u"\u8fd0\u57ce\u5e02\u95fb\u559c\u53bf\u897f\u6e56\u751f\u6d3b\u5e7f\u573a\u5165\u53e3",
  113. "nickname": u"\u97e9\u5c0f\u519b",
  114. "groupId": u"5b07f7468732d6395cd1f72f",
  115. "remark": "",
  116. "dealerId": "5b060de18732d672a93061ad",
  117. "devNo": "868575026226557",
  118. "devType": u"\u7eb8\u5dfe\u673a",
  119. "converted": "1",
  120. "advertiserId": "",
  121. "managerId": "5abc5f5c4864d0265c654cb0"
  122. }
  123. insert(sample_redis_data)
  124. def main():
  125. move_all_adRecordProxies_to_adRecord()
  126. #add_date_to_ad_record()
  127. #test()
  128. if __name__ == '__main__':
  129. main()