# -*- coding: utf-8 -*- # !/usr/bin/env python import datetime import time from celery.utils.log import get_task_logger from mongoengine import DoesNotExist from apilib.utils_mongo import BulkHandlerEx from apps.web.constant import Const from apps.web.dealer.models import Dealer from apps.web.dealer.utils import RentOrderServer from apps.web.device.models import Device, Group, DeviceDict, FaultRecord, DeviceRentOrder from apps.web.device.timescale import OfflineManager from apps.web.helpers import get_wechat_manager_mp_proxy from apps.web.south_intf.liangxi_fire import ToXiaoFang from apps.web.user.models import ServiceProgress logger = get_task_logger(__name__) def remove_serviceProgress_periodically(): threeDaysAgo = datetime.datetime.now() - datetime.timedelta(days = 3) threeDaysAgo = int(time.mktime(threeDaysAgo.timetuple())) threeMonthAgo = datetime.datetime.now() - datetime.timedelta(days = 91) threeMonthAgo = int(time.mktime(threeMonthAgo.timetuple())) start_time = int(time.time()) while True: now_time = int(time.time()) if now_time - start_time > 2 * 60 * 60: logger.debug('time is over. wait next.') return items = ServiceProgress.get_collection().find( { '$or': [{'finished_time': {'$lt': threeDaysAgo}}, {'isFinished': True}], 'devTypeCode': { '$ne': Const.DEVICE_TYPE_CODE_HP_GATE }}, {'_id': 1}).limit(2000) bulker = BulkHandlerEx(ServiceProgress.get_collection()) for item in items: bulker.delete(query_dict = {'_id': item['_id']}) count = len(bulker.requests) if count > 0: logger.debug('prepare to delete {} rows.'.format(count)) bulker.execute() bulker = BulkHandlerEx(ServiceProgress.get_collection()) # type: BulkHandlerEx if count < 2000: break while True: now_time = int(time.time()) if now_time - start_time > 2 * 60 * 60: logger.debug('time is over. wait next.') return items = ServiceProgress.get_collection().find( { '$or': [ {'finished_time': {'$lt': threeMonthAgo}}, {'isFinished': True} ], 'finished_time': {'$lt': threeDaysAgo}, 'devTypeCode': Const.DEVICE_TYPE_CODE_HP_GATE }, {'_id': 1}).limit(2000) bulker = BulkHandlerEx(ServiceProgress.get_collection()) # type: BulkHandlerEx for item in items: bulker.delete(query_dict = {'_id': item['_id']}) count = len(bulker.requests) if count > 0: logger.debug('prepare to delete {} rows.'.format(count)) bulker.execute() bulker = BulkHandlerEx(ServiceProgress.get_collection()) # type: BulkHandlerEx if count < 2000: break logger.debug('delete all over.') def send_to_xf_all_dev_info(): logger.info('now to send_to_xf_all') # 找到梁希区的所有设备 groups = Group.objects.filter(districtId = '200275') groupIds = [str(_.id) for _ in groups] for dev in Device.objects.filter(groupId__in = groupIds).only(Device.devNo.name, Device.groupId.name): dev = Device.get_dev(dev.devNo) ToXiaoFang(dev).send_to_xf_ini() def send_to_xf_falut(devNo, faultId): """ :param devNo: :param faultId: :return: """ logger.info("device {} send to liangxi xiaofang fault = {}".format(devNo, faultId)) faultRecord = FaultRecord.objects.get(id = faultId) dev = Device.get_dev(devNo) to_xiao_fang = ToXiaoFang(dev) to_xiao_fang.send_to_xf_fault(faultRecord) def send_to_xf_fault_handle(devNo, faultId): """ :param devNo: :param faultId: :return: """ logger.info("device {} send to liangxi xiaofang fault handle, fault = {}".format(devNo, faultId)) faultRecord = FaultRecord.objects.get(id = faultId) dev = Device.get_dev(devNo) to_xiao_fang = ToXiaoFang(dev) to_xiao_fang.send_to_xf_handle(faultRecord) # 检查设备,以服务器为中心,如果两边数据不一致,就下发配置。 # 注意,这里是每几分钟产生一条任务,所以,每台设备执行完后,需要重新从数据库中检查 # 这里为了答标,不考虑重入,如果拿到标了,需要最终做到设备上 def set_device_deactive_for_langxin(): from apps.web.core.helpers import ActionDeviceBuilder def isBetweenTime(startTime, endTime, thisTime): if endTime >= startTime: return thisTime >= startTime and thisTime <= endTime else: if thisTime >= startTime and thisTime <= '23:59': return True elif thisTime >= '00:00' and thisTime <= endTime: return True else: return False devObjs = Device.get_collection().find({'otherConf.supportForbiddenInNight': 1}) # 先检查 nowDateTime = datetime.datetime.now() nowTime = '%s:%s' % (nowDateTime.hour, nowDateTime.minute) for obj in devObjs: otherConf = obj['otherConf'] if otherConf['forbiddenInNight']: # 禁用期内,如果已经禁用了,直接禁用 if isBetweenTime(otherConf['startTime'], otherConf['endTime'], nowTime): if otherConf.get('curCoinStatus', '') == 'forbidden' and otherConf.get('curCardStatus', '') == 'forbidden': continue else: try: dev = Device.get_dev(obj['devNo']) box = ActionDeviceBuilder.create_action_device(dev) box.set_coin_card_enable({'putCoins': '00', 'icCard': '00'}) otherConf['curCoinStatus'] = '00' otherConf['curCardStatus'] = '00' obj.save() except Exception, e: continue else: if otherConf['putCoins'] != otherConf['curCoinStatus'] or otherConf['icCard'] != otherConf[ 'curCardStatus']: try: dev = Device.get_dev(obj['devNo']) box = ActionDeviceBuilder.create_action_device(dev) box.set_coin_card_enable({'putCoins': otherConf['putCoins'], 'icCard': otherConf['icCard']}) otherConf['curCoinStatus'] = otherConf['putCoins'] otherConf['curCardStatus'] = otherConf['icCard'] obj.save() except Exception, e: continue def turn_on_power_huan_dian_gui(devNo, port): from apps.web.core.helpers import ActionDeviceBuilder dev = Device.get_dev(devNo) box = ActionDeviceBuilder.create_action_device(dev) box._turn_on_power(port) def device_offline_notify(devNo): """ 设备离线通知经销商 :param devNo: :return: """ # 不管成功与否 释放cache OfflineManager.delete_cache(devNo) dev = Device.get_dev(devNo) # type: DeviceDict # 参数检查 if not dev: logger.error("dev is not registe to databse, devNo is <{}>".format(devNo)) return if dev.online: logger.info("dev is online, devNo is <{}>".format(devNo)) return try: dealer = Dealer.objects.get(id = dev.ownerId) except DoesNotExist: logger.error("dealer is not registe to databse, devNo is <{}>, dealer is <{}>".format(devNo, dev.ownerId)) return # 对经销商发起通知 try: wechat_mp_proxy = get_wechat_manager_mp_proxy(dealer) wechat_mp_proxy.notify( dealer.managerialOpenId or "", "abnormal_device_offline", title = u"您当前有设备离线,请尽快排查原因", device = dev.logicalCode, notifyTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") ) except Exception as e: logger.exception( "notify to dealer offline device faild, devNo is <{}>, dealer is <{}>\n, error is {}".format(devNo, dev.ownerId, e)) def gen_daily_rent_order(): """ 生成每日的 日租订单 在此循环中 需要自动激活超过最大时间限制的设备 """ # 找出所有被标记的设备 devices = Device.objects.filter(isRent = True) for _dev in devices: # type: Device if not _dev.dailyRent: logger.error("rent device = {} has not dailyRent!".format(_dev.devNo)) nowTime = datetime.datetime.now() # 没有被激活的设备 if not _dev.dailyRent.active: # 自动激活 if _dev.dailyRent.lastActiveDate < nowTime: _dev.active_rent() _dev.reload() # 激活状态的设备 直接创建订单 if not _dev.dailyRent.active: continue # 订单创建 DeviceRentOrder.create_by_device(_dev) def deduct_rent_order(): """ 设备 日租账单扣款 以经销商的维度 -- 设备 进行扣款 :return: """ devices = Device.objects.filter(isRent = True).only(Device.ownerId.name) # 找出所有持有租用设备的经销商的ID dealerIds = set() for _device in devices: # type: Device dealerIds.add(_device.ownerId) for _dealerId in dealerIds: logger.info("start dealer <{}> rent order".format(_dealerId)) # 获取所有经销商没有支付的订单 时间由远及近 _dealer = Dealer.objects.get(id=_dealerId) _orders = DeviceRentOrder.get_not_paid_by_dealer(_dealer) # 对每一个订单进行支付 for _order in _orders: RentOrderServer(_order, _dealer).execute() # 针对云快充协议。同步设备的时间。下发服务器的时间到设备。 def sync_device_time_for_tcpcar(): from apps.web.core.helpers import ActionDeviceBuilder devObjs = Device.get_collection().find({'devType.code':Const.DEVICE_TYPE_CODE_CAR_NENGPAI}) devList = [] for obj in devObjs: dev = Device.get_dev(obj.devNo) if not dev.online: continue devList.append(dev) for dev in devList: try: box = ActionDeviceBuilder.create_action_device(dev) box.send_current_time() except Exception,e: continue return groupid_from_key = lambda groupKey: groupKey.split('_')[1] dealerid_from_key = lambda dealerKey: dealerKey.split('_')[1] # todo 凌晨2点半执行 def make_rpt_into_db(): from apps.web.report.models import DevReport, GroupReport, DealerReport from apps import reportCache from apps.web.device.models import OfflineReportDealers, GroupDict from apps.web.core.accounting import devCoinTmpl, ownerCoinTmpl, groupCoinTmpl def get_owner_or_parter(dealerDict, doneDealers, parterDict, dealerId): if dealerId in doneDealers: # logger.debug('{} has done.'.format(dealerId)) return groupIds = Group.get_group_ids_of_dealer(dealerId) if groupIds: dealerDict[dealerId] = groupIds doneDealers.append(dealerId) dealerGroups = Group.get_groups_by_group_ids(groupIds).values() for dealerGroup in dealerGroups: # type: GroupDict groupId = dealerGroup.groupId parters = dealerGroup.partners() for parter in parters: parter_id = parter['id'] parterDict[parter_id] = parterDict.get(parter_id) or set() parterDict[parter_id].add(groupId) get_owner_or_parter(dealerDict, doneDealers, parterDict, parter_id) parterGroupIds = Group.get_group_ids_of_partner(dealerId) parterGroups = Group.get_groups_by_group_ids(parterGroupIds).values() for parterGroup in parterGroups: # type: GroupDict groupId = parterGroup.groupId parterDict[dealerId] = parterDict.get(dealerId) or set() parterDict[dealerId].add(groupId) get_owner_or_parter(dealerDict, doneDealers, parterDict, parterGroup.ownerId) def check_offline_coins(dev_in_db, dealerDict, parterDict, stringDate, update=False): dealerCoinDict = {} groupCoinDict = {} for ownerId, groupIds in dealerDict.items(): ownerCoins = 0 for groupId in list(set(groupIds)): devNos = Device.get_devNos_by_group([groupId]) keys = [devCoinTmpl(devNo, stringDate) for devNo in devNos] devCacheDict = reportCache.get_many(keys) for devNo in devNos: dbValue = int(dev_in_db.get(devNo, 0)) if devCoinTmpl(devNo, stringDate) in devCacheDict: cacheValue = int(devCacheDict[devCoinTmpl(devNo, stringDate)]) if dbValue != cacheValue: print '{} db not equal cache {}. update db.'.format(devNo, cacheValue) DevReport.get_collection().update_one({ 'devNo': devNo, 'type': 'day', 'date': stringDate }, { '$set': { 'devNo': devNo, 'type': 'day', 'date': stringDate, 'rpt.lineCoins': cacheValue } }, upsert=True) else: pass # print '{} db<{}> equal cache <{}>.'.format(devNo, dbValue, cacheValue) else: devCacheDict[devCoinTmpl(devNo, stringDate)] = dbValue listCoins = [int(coins) for coins in devCacheDict.values()] allDevCoinsByGroup = sum(listCoins) oldValue = reportCache.get(groupCoinTmpl(groupId, stringDate)) if not oldValue: oldValue = 0 else: oldValue = int(oldValue) groupCoinDict[groupCoinTmpl(groupId, stringDate)] = allDevCoinsByGroup if update: reportCache.set(groupCoinTmpl(groupId, stringDate), str(int(allDevCoinsByGroup))) if oldValue != allDevCoinsByGroup: logger.info( 'not equal in group. ownerId=%s, groupId=%s, oldValue=%s, nowValue=%s' % ( ownerId, groupId, oldValue, allDevCoinsByGroup)) else: pass # logger.info( # 'equal in group. ownerId=%s, groupId=%s, oldValue=%s, nowValue=%s' % ( # ownerId, groupId, oldValue, allDevCoinsByGroup)) ownerCoins = ownerCoins + int(allDevCoinsByGroup) dealerCoinDict[ownerCoinTmpl(ownerId, stringDate)] = ownerCoins for partnerId, groupIds in parterDict.items(): partnerCoinsByGroup = 0 for groupId in list(set(groupIds)): if groupCoinTmpl(groupId, stringDate) in groupCoinDict: groupCoin = groupCoinDict[groupCoinTmpl(groupId, stringDate)] else: groupCoin = 0 logger.info( 'allocated as partner. partnerId = %s; groupId = %s; coin = %s' % (partnerId, groupId, groupCoin)) partnerCoinsByGroup = partnerCoinsByGroup + int(groupCoin) if ownerCoinTmpl(partnerId, stringDate) in dealerCoinDict: dealerCoinDict[ownerCoinTmpl(partnerId, stringDate)] = dealerCoinDict[ ownerCoinTmpl(partnerId, stringDate)] + partnerCoinsByGroup else: dealerCoinDict[ownerCoinTmpl(partnerId, stringDate)] = partnerCoinsByGroup for dealerKey, coin in dealerCoinDict.items(): logger.info('dealer key is: {}'.format(dealerKey)) oldValue = reportCache.get(dealerKey) if not oldValue: oldValue = 0 else: oldValue = int(oldValue) if update: reportCache.set(dealerKey, str(int(coin))) if oldValue != coin: logger.info( 'not equal in dealer. ownerKey=%s, oldValue=%s, nowValue=%s' % ( dealerKey, oldValue, coin)) else: pass # logger.info( # 'equal in dealer. ownerKey=%s, oldValue=%s, nowValue=%s' % ( # dealerKey, oldValue, coin)) return groupCoinDict, dealerCoinDict def report_into_db(groupCoinDict, dealerCoinDict, stringDate): start_time = datetime.datetime.now() logger.info('generating report for date (%s)' % (stringDate,)) bulker = BulkHandlerEx(GroupReport.get_collection()) # type: BulkHandlerEx for groupKey, coins in groupCoinDict.items(): groupId = groupid_from_key(groupKey) bulker.upsert(query_dict={ 'groupId': groupId, 'type': 'day', 'date': stringDate }, update_dict={ '$set': { 'groupId': groupId, 'type': 'day', 'date': stringDate, 'rpt.lineCoins': int(coins) } }) if len(bulker.requests) >= 2000: bulker.execute() bulker = BulkHandlerEx(GroupReport.get_collection()) # type: BulkHandlerEx if len(bulker.requests) > 0: bulker.execute() bulker = None bulker = BulkHandlerEx(DealerReport.get_collection()) # type: BulkHandlerEx for dealerKey, coins in dealerCoinDict.iteritems(): dealerId = dealerid_from_key(dealerKey) bulker.upsert(query_dict={ 'ownerId': dealerId, 'type': 'day', 'date': stringDate }, update_dict={ '$set': { 'ownerId': dealerId, 'type': 'day', 'date': stringDate, 'rpt.lineCoins': int(coins) } }) if len(bulker.requests) >= 2000: bulker.execute() bulker = BulkHandlerEx(DealerReport.get_collection()) # type: BulkHandlerEx if len(bulker.requests) > 0: bulker.execute() bulker = None logger.info('[*]finished insert rpt into database!, time cost=%s' % (datetime.datetime.now() - start_time,)) # 自动脚本不判断,注释 # if dateFmtStr: # reportDate = datetime.datetime.strptime(dateFmtStr, "%Y-%m-%d") # else: # reportDate = datetime.datetime.now() - datetime.timedelta(days = 1) reportDate = datetime.datetime.now() - datetime.timedelta(days=1) startTime = datetime.datetime.now() # 自动脚本不判断,注释 # if len(sys.argv) >= 2: # dealer_id_list = [str(sys.argv[1])] # else: # dealer_id_list = OfflineReportDealers.get_rpt_dealIds(reportDate.strftime("%Y-%m-%d")) dealer_id_list = OfflineReportDealers.get_rpt_dealIds(reportDate.strftime("%Y-%m-%d")) dealerDict = {} parterDict = {} doneDealers = [] dealerId = '' for dealerId in dealer_id_list: logger.info('fetch group map for dealer'.format(dealerId)) get_owner_or_parter(dealerDict, doneDealers, parterDict, dealerId) for dealerId, groupIds in dealerDict.iteritems(): print 'owner. {} = {}'.format(dealerId, groupIds) for dealerId, groupIds in parterDict.iteritems(): print 'parter. {} = {}'.format(dealerId, groupIds) logger.info('fetch device map for dealer'.format(dealerId)) devDict = {dealerId: Device.get_devNos_by_group(groupIds) for dealerId, groupIds in dealerDict.items()} devNos = [] for items in devDict.values(): devNos.extend(items) dev_in_db = {} for item in DevReport.get_collection().find( {'devNo': {'$in': devNos}, 'type': 'day', 'date': reportDate.strftime("%Y-%m-%d")}, {'devNo': 1, 'rpt': 1}): if 'rpt' in item and 'lineCoins' in item['rpt']: dev_in_db[item['devNo']] = item['rpt']['lineCoins'] # for devNo, coins in dev_in_db.iteritems(): # print '{} has coins {} in db.'.format(devNo, coins) groupCoinDict, dealerCoinDict = check_offline_coins(dev_in_db, dealerDict, parterDict, reportDate.strftime("%Y-%m-%d"), update = False) for groupKey, coins in groupCoinDict.iteritems(): logger.info('group {} has coins {}'.format(groupid_from_key(groupKey), coins)) for dealerKey, coins in dealerCoinDict.iteritems(): logger.info('dealer {} has coins {}'.format(dealerid_from_key(dealerKey), coins)) report_into_db(groupCoinDict, dealerCoinDict, reportDate.strftime("%Y-%m-%d")) logger.debug('all steps cost {}'.format(datetime.datetime.now() - startTime)) def report_to_zhejiang_fight(): from apps.web.api.zhejiang.models import ZheJiangFireFight from apps.web.api.zhejiang.utils import ZheJiangNorther fights = list(ZheJiangFireFight.objects.all()) for _fight in fights: ZheJiangNorther(_fight).report()