#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Maintenance script for the ``AdRecord`` collection.

Running the file as a script copies every ``AdRecordProxy`` entry into
``AdRecord`` (see ``move_all_adRecordProxies_to_adRecord``).  The other
functions are helpers that can be enabled by hand in ``main``.
"""
import os
import sys

from bson import json_util as json

#: Two directories above the directory that contains this script.
PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
sys.path.insert(0, PROJECT_ROOT)

from script.base import init_env, setup_logger, measure_time

filename = sys.argv[0]
namespace = __name__
logger = setup_logger(filename=filename, namespace=namespace)

# The environment must be initialised before the application modules below
# are imported, so the import order here is deliberate.
init_env(interactive=True)

from pymongo.collection import Collection
from mongoengine.queryset import QuerySet
from mongoengine.errors import NotUniqueError

from apilib.utils_mongo import BulkHandler
from apps.web.ad.models import AdRecord
from apps.web.ad.utils import AdRecordProxy, timestamp_to_dt
from apps.web.user.models import MyUser
from apps.web.constant import Const

adRecordCollection = AdRecord.get_collection()  # type: Collection
adRecordBulkHandler = BulkHandler(adRecordCollection)

# Maps the textual sex found in the raw records to the project constants.
sex_map = {
    u'男': Const.USER_SEX.MALE,
    u'女': Const.USER_SEX.FEMALE,
    '-': Const.USER_SEX.UNKNOWN,
}


def is_male(_):
    """Return 1 when the given sex code equals ``Const.USER_SEX.MALE``, else 0."""
    return int(_ == Const.USER_SEX.MALE)


def is_female(_):
    """Return 1 when the given sex code equals ``Const.USER_SEX.FEMALE``, else 0."""
    return int(_ == Const.USER_SEX.FEMALE)


@measure_time(logger)
def add_date_to_ad_record():
    """Add a ``createdDate`` (``YYYY-MM-DD``) field derived from ``dateTimeAdded``.

    The pipeline ends with ``$out``, so its result replaces the contents of
    the ``ad_records`` collection.
    """
    adRecordCollection.aggregate([
        {
            "$addFields": {
                "createdDate": {
                    "$dateToString": {
                        "format": "%Y-%m-%d",
                        "date": "$dateTimeAdded",
                    },
                },
            },
        },
        {"$out": "ad_records"},
    ])


def aggregate():
    """Sum ``clicks`` per ``createdDate`` and return the aggregation cursor.

    The original discarded the cursor; returning it is harmless for callers
    that ignore the result.
    """
    return AdRecord.get_collection().aggregate([
        {'$group': {'_id': '$createdDate', 'clicks': {'$sum': '$clicks'}}},
    ])


def _to_int(raw):
    """Convert a numeric value or numeric string to ``int``.

    Returns ``None`` (after logging the failure) when *raw* cannot be
    converted, so one malformed price does not abort the whole run.
    """
    try:
        return int(float(raw))
    except (TypeError, ValueError, OverflowError) as e:
        logger.exception('error happened, e=%s, raw=%s' % (e, raw))
        return None


def _to_flag(raw):
    """Interpret a flag such as ``"1"``/``"0"``, ``1``/``0`` or a bool.

    ``bool("0")`` is ``True`` because any non-empty string is truthy, so the
    value is parsed numerically first.  Values that are not numeric fall
    back to plain truthiness.
    """
    try:
        return bool(int(float(raw)))
    except (TypeError, ValueError, OverflowError):
        return bool(raw)


def clean(data):
    """Normalise a raw record dict in place into ``AdRecord`` keyword arguments.

    :param data: mutable mapping; must contain ``timestamp``, ``agentPrice``,
        ``dealerPrice``, ``price`` and ``converted``, plus either ``sex`` or
        both ``openId`` and ``groupId``.  ``timestamp`` is removed.
    :return: the same ``data`` object, modified.
    :raises KeyError: when a required key is missing.
    """
    created_at = timestamp_to_dt(float(data.pop('timestamp')))

    for price_key in ('agentPrice', 'dealerPrice', 'price'):
        data[price_key] = _to_int(data[price_key])

    data['dateTimeAdded'] = created_at
    data['createdDate'] = created_at.strftime(Const.DATE_FMT)
    data['remark'] = data.get('remark', '') or 'moved from redis'

    converted = _to_flag(data['converted'])
    data['converted'] = converted
    data['features'] = {}
    # ``rewarded`` mirrors ``converted``, as in the original implementation.
    data['rewarded'] = converted

    if 'sex' in data:
        data['sex'] = sex_map.get(data['sex'], 0)
        data['features']['sex'] = {
            'is_male': is_male(data['sex']),
            'is_female': is_female(data['sex']),
        }
        return data

    # No sex on the record itself: fall back to the stored user, if any.
    user = MyUser.objects(openId=data['openId'], groupId=data['groupId']).first()
    if user:
        data['sex'] = user.sex
        data['features']['sex'] = {
            'is_male': int(user.is_male),
            'is_female': int(user.is_female),
        }
    else:
        data['sex'] = 0
        data['features']['sex'] = {'is_male': 0, 'is_female': 0}
    return data


def insert(to_be_added):
    """Insert *to_be_added* through the ``AdRecord`` queryset and log the result.

    :param to_be_added: an ``AdRecord`` document or a list of them.
    """
    queryset = AdRecord.objects()  # type: QuerySet
    result = queryset.insert(to_be_added)
    logger.info('bulk insert finished, result=%s' % (result,))


@measure_time(logger)
def move_all_adRecordProxies_to_adRecord():
    """Copy every ``AdRecordProxy`` entry into ``AdRecord``.

    Records whose ``(openId, adId)`` pair already exists are skipped.  Each
    record that is about to be saved is also appended as one JSON line to
    ``output.json`` in the current working directory.
    """
    with open('output.json', 'w') as f:
        for proxy in AdRecordProxy.iall():
            record = AdRecord(**clean(proxy.to_dict()))

            if AdRecord.objects(openId=record.openId, adId=record.adId).count() > 0:
                logger.info('{0!r} skipped'.format(record))
                continue

            logger.info('{0!r} written'.format(record))
            f.write(record.to_json() + '\n')

            try:
                record.save()
            except NotUniqueError:
                logger.error('{0!r} already exists'.format(record))
            else:
                # Only report success when save() did not raise.
                logger.info('{0!r} saved'.format(record))


# Uses the same ``measure_time(logger)`` form as the other decorated
# functions; the bare ``@measure_time`` would pass the function itself in
# the logger position.
@measure_time(logger)
def modify_old_data():
    """Tidy up and update the old data.

    NOTE(review): there is no implementation yet; the function is a no-op.
    """


def test():
    """Clean one hard-coded sample record and insert it as an ``AdRecord``."""
    # The original kept this string in an unused local; presumably it is the
    # key the sample was stored under: user:oeia6wdy9WJwhLvHpD4Fbvhg-2oo:adId:100172
    sample_redis_data = {
        "dealerPrice": "0.0",
        "sex": u"\u7537",
        "adName": u"\u4e01-\u9ad8\u6cb3\u6897\u738b",
        "agentName": u"\u90ed\u53ec\u65b9",
        "agentPrice": "0.0",
        "agentId": "5abdebc68732d665bd804e66",
        "openId": "oeia6wdy9WJwhLvHpD4Fbvhg-2oo",
        "dealerName": u"\u8d75\u6676\u6676",
        "logicalCode": "11733",
        "groupName": u"\u5546\u573a",
        "adId": "100172",
        "clicks": "1",
        "shows": "1",
        "timestamp": "1531549445.161992",
        "price": "0.0",
        "address": u"\u8fd0\u57ce\u5e02\u95fb\u559c\u53bf\u897f\u6e56\u751f\u6d3b\u5e7f\u573a\u5165\u53e3",
        "nickname": u"\u97e9\u5c0f\u519b",
        "groupId": u"5b07f7468732d6395cd1f72f",
        "remark": "",
        "dealerId": "5b060de18732d672a93061ad",
        "devNo": "868575026226557",
        "devType": u"\u7eb8\u5dfe\u673a",
        "converted": "1",
        "advertiserId": "",
        "managerId": "5abc5f5c4864d0265c654cb0",
    }
    # QuerySet.insert expects Document instances, so the raw dict is cleaned
    # and wrapped in AdRecord first, as the migration function does.
    insert(AdRecord(**clean(sample_redis_data)))


def main():
    """Script entry point: run the proxy-to-AdRecord migration."""
    move_all_adRecordProxies_to_adRecord()
    # Other tasks, enabled by hand when needed:
    # add_date_to_ad_record()
    # test()


if __name__ == '__main__':
    main()