123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import datetime
- import time
- from celery.utils.log import get_task_logger
- from mongoengine import DoesNotExist
- from apilib.utils_mongo import BulkHandlerEx
- from apps.web.constant import Const
- from apps.web.dealer.models import Dealer
- from apps.web.dealer.utils import RentOrderServer
- from apps.web.device.models import Device, Group, DeviceDict, FaultRecord, DeviceRentOrder
- from apps.web.device.timescale import OfflineManager
- from apps.web.helpers import get_wechat_manager_mp_proxy
- from apps.web.south_intf.liangxi_fire import ToXiaoFang
- from apps.web.user.models import ServiceProgress
- logger = get_task_logger(__name__)
- def remove_serviceProgress_periodically():
- threeDaysAgo = datetime.datetime.now() - datetime.timedelta(days = 3)
- threeDaysAgo = int(time.mktime(threeDaysAgo.timetuple()))
- threeMonthAgo = datetime.datetime.now() - datetime.timedelta(days = 91)
- threeMonthAgo = int(time.mktime(threeMonthAgo.timetuple()))
- start_time = int(time.time())
- while True:
- now_time = int(time.time())
- if now_time - start_time > 2 * 60 * 60:
- logger.debug('time is over. wait next.')
- return
- items = ServiceProgress.get_collection().find(
- {
- '$or': [{'finished_time': {'$lt': threeDaysAgo}}, {'isFinished': True}],
- 'devTypeCode': {
- '$ne': Const.DEVICE_TYPE_CODE_HP_GATE
- }}, {'_id': 1}).limit(2000)
- bulker = BulkHandlerEx(ServiceProgress.get_collection())
- for item in items:
- bulker.delete(query_dict = {'_id': item['_id']})
- count = len(bulker.requests)
- if count > 0:
- logger.debug('prepare to delete {} rows.'.format(count))
- bulker.execute()
- bulker = BulkHandlerEx(ServiceProgress.get_collection()) # type: BulkHandlerEx
- if count < 2000:
- break
- while True:
- now_time = int(time.time())
- if now_time - start_time > 2 * 60 * 60:
- logger.debug('time is over. wait next.')
- return
- items = ServiceProgress.get_collection().find(
- {
- '$or': [
- {'finished_time': {'$lt': threeMonthAgo}},
- {'isFinished': True}
- ],
- 'finished_time': {'$lt': threeDaysAgo},
- 'devTypeCode': Const.DEVICE_TYPE_CODE_HP_GATE
- }, {'_id': 1}).limit(2000)
- bulker = BulkHandlerEx(ServiceProgress.get_collection()) # type: BulkHandlerEx
- for item in items:
- bulker.delete(query_dict = {'_id': item['_id']})
- count = len(bulker.requests)
- if count > 0:
- logger.debug('prepare to delete {} rows.'.format(count))
- bulker.execute()
- bulker = BulkHandlerEx(ServiceProgress.get_collection()) # type: BulkHandlerEx
- if count < 2000:
- break
- logger.debug('delete all over.')
- def send_to_xf_all_dev_info():
- logger.info('now to send_to_xf_all')
- # 找到梁希区的所有设备
- groups = Group.objects.filter(districtId = '200275')
- groupIds = [str(_.id) for _ in groups]
- for dev in Device.objects.filter(groupId__in = groupIds).only(Device.devNo.name, Device.groupId.name):
- dev = Device.get_dev(dev.devNo)
- ToXiaoFang(dev).send_to_xf_ini()
- def send_to_xf_falut(devNo, faultId):
- """
- :param devNo:
- :param faultId:
- :return:
- """
- logger.info("device {} send to liangxi xiaofang fault = {}".format(devNo, faultId))
- faultRecord = FaultRecord.objects.get(id = faultId)
- dev = Device.get_dev(devNo)
- to_xiao_fang = ToXiaoFang(dev)
- to_xiao_fang.send_to_xf_fault(faultRecord)
- def send_to_xf_fault_handle(devNo, faultId):
- """
- :param devNo:
- :param faultId:
- :return:
- """
- logger.info("device {} send to liangxi xiaofang fault handle, fault = {}".format(devNo, faultId))
- faultRecord = FaultRecord.objects.get(id = faultId)
- dev = Device.get_dev(devNo)
- to_xiao_fang = ToXiaoFang(dev)
- to_xiao_fang.send_to_xf_handle(faultRecord)
- # 检查设备,以服务器为中心,如果两边数据不一致,就下发配置。
- # 注意,这里是每几分钟产生一条任务,所以,每台设备执行完后,需要重新从数据库中检查
- # 这里为了答标,不考虑重入,如果拿到标了,需要最终做到设备上
- def set_device_deactive_for_langxin():
- from apps.web.core.helpers import ActionDeviceBuilder
- def isBetweenTime(startTime, endTime, thisTime):
- if endTime >= startTime:
- return thisTime >= startTime and thisTime <= endTime
- else:
- if thisTime >= startTime and thisTime <= '23:59':
- return True
- elif thisTime >= '00:00' and thisTime <= endTime:
- return True
- else:
- return False
- devObjs = Device.get_collection().find({'otherConf.supportForbiddenInNight': 1})
- # 先检查
- nowDateTime = datetime.datetime.now()
- nowTime = '%s:%s' % (nowDateTime.hour, nowDateTime.minute)
- for obj in devObjs:
- otherConf = obj['otherConf']
- if otherConf['forbiddenInNight']:
- # 禁用期内,如果已经禁用了,直接禁用
- if isBetweenTime(otherConf['startTime'], otherConf['endTime'], nowTime):
- if otherConf.get('curCoinStatus', '') == 'forbidden' and otherConf.get('curCardStatus',
- '') == 'forbidden':
- continue
- else:
- try:
- dev = Device.get_dev(obj['devNo'])
- box = ActionDeviceBuilder.create_action_device(dev)
- box.set_coin_card_enable({'putCoins': '00', 'icCard': '00'})
- otherConf['curCoinStatus'] = '00'
- otherConf['curCardStatus'] = '00'
- obj.save()
- except Exception, e:
- continue
- else:
- if otherConf['putCoins'] != otherConf['curCoinStatus'] or otherConf['icCard'] != otherConf[
- 'curCardStatus']:
- try:
- dev = Device.get_dev(obj['devNo'])
- box = ActionDeviceBuilder.create_action_device(dev)
- box.set_coin_card_enable({'putCoins': otherConf['putCoins'], 'icCard': otherConf['icCard']})
- otherConf['curCoinStatus'] = otherConf['putCoins']
- otherConf['curCardStatus'] = otherConf['icCard']
- obj.save()
- except Exception, e:
- continue
- def turn_on_power_huan_dian_gui(devNo, port):
- from apps.web.core.helpers import ActionDeviceBuilder
- dev = Device.get_dev(devNo)
- box = ActionDeviceBuilder.create_action_device(dev)
- box._turn_on_power(port)
- def device_offline_notify(devNo):
- """
- 设备离线通知经销商
- :param devNo:
- :return:
- """
- # 不管成功与否 释放cache
- OfflineManager.delete_cache(devNo)
- dev = Device.get_dev(devNo) # type: DeviceDict
- # 参数检查
- if not dev:
- logger.error("dev is not registe to databse, devNo is <{}>".format(devNo))
- return
- if dev.online:
- logger.info("dev is online, devNo is <{}>".format(devNo))
- return
- try:
- dealer = Dealer.objects.get(id = dev.ownerId)
- except DoesNotExist:
- logger.error("dealer is not registe to databse, devNo is <{}>, dealer is <{}>".format(devNo, dev.ownerId))
- return
- # 对经销商发起通知
- try:
- wechat_mp_proxy = get_wechat_manager_mp_proxy(dealer)
- wechat_mp_proxy.notify(
- dealer.managerialOpenId or "",
- "abnormal_device_offline",
- title = u"您当前有设备离线,请尽快排查原因",
- device = dev.logicalCode,
- notifyTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- )
- except Exception as e:
- logger.exception(
- "notify to dealer offline device faild, devNo is <{}>, dealer is <{}>\n, error is {}".format(devNo,
- dev.ownerId,
- e))
- def gen_daily_rent_order():
- """
- 生成每日的 日租订单
- 在此循环中 需要自动激活超过最大时间限制的设备
- """
- # 找出所有被标记的设备
- devices = Device.objects.filter(isRent = True)
- for _dev in devices: # type: Device
- if not _dev.dailyRent:
- logger.error("rent device = {} has not dailyRent!".format(_dev.devNo))
- nowTime = datetime.datetime.now()
- # 没有被激活的设备
- if not _dev.dailyRent.active:
- # 自动激活
- if _dev.dailyRent.lastActiveDate < nowTime:
- _dev.active_rent()
- _dev.reload()
- # 激活状态的设备 直接创建订单
- if not _dev.dailyRent.active:
- continue
- # 订单创建
- DeviceRentOrder.create_by_device(_dev)
- def deduct_rent_order():
- """
- 设备 日租账单扣款
- 以经销商的维度 -- 设备 进行扣款
- :return:
- """
- devices = Device.objects.filter(isRent = True).only(Device.ownerId.name)
- # 找出所有持有租用设备的经销商的ID
- dealerIds = set()
- for _device in devices: # type: Device
- dealerIds.add(_device.ownerId)
- for _dealerId in dealerIds:
- logger.info("start dealer <{}> rent order".format(_dealerId))
- # 获取所有经销商没有支付的订单 时间由远及近
- _dealer = Dealer.objects.get(id=_dealerId)
- _orders = DeviceRentOrder.get_not_paid_by_dealer(_dealer)
- # 对每一个订单进行支付
- for _order in _orders:
- RentOrderServer(_order, _dealer).execute()
- # 针对云快充协议。同步设备的时间。下发服务器的时间到设备。
- def sync_device_time_for_tcpcar():
- from apps.web.core.helpers import ActionDeviceBuilder
- devObjs = Device.get_collection().find({'devType.code':Const.DEVICE_TYPE_CODE_CAR_NENGPAI})
- devList = []
- for obj in devObjs:
- dev = Device.get_dev(obj.devNo)
- if not dev.online:
- continue
- devList.append(dev)
-
- for dev in devList:
- try:
- box = ActionDeviceBuilder.create_action_device(dev)
- box.send_current_time()
- except Exception,e:
- continue
- return
- groupid_from_key = lambda groupKey: groupKey.split('_')[1]
- dealerid_from_key = lambda dealerKey: dealerKey.split('_')[1]
- # todo 凌晨2点半执行
- def make_rpt_into_db():
- from apps.web.report.models import DevReport, GroupReport, DealerReport
- from apps import reportCache
- from apps.web.device.models import OfflineReportDealers, GroupDict
- from apps.web.core.accounting import devCoinTmpl, ownerCoinTmpl, groupCoinTmpl
- def get_owner_or_parter(dealerDict, doneDealers, parterDict, dealerId):
- if dealerId in doneDealers:
- # logger.debug('{} has done.'.format(dealerId))
- return
- groupIds = Group.get_group_ids_of_dealer(dealerId)
- if groupIds:
- dealerDict[dealerId] = groupIds
- doneDealers.append(dealerId)
- dealerGroups = Group.get_groups_by_group_ids(groupIds).values()
- for dealerGroup in dealerGroups: # type: GroupDict
- groupId = dealerGroup.groupId
- parters = dealerGroup.partners()
- for parter in parters:
- parter_id = parter['id']
- parterDict[parter_id] = parterDict.get(parter_id) or set()
- parterDict[parter_id].add(groupId)
- get_owner_or_parter(dealerDict, doneDealers, parterDict, parter_id)
- parterGroupIds = Group.get_group_ids_of_partner(dealerId)
- parterGroups = Group.get_groups_by_group_ids(parterGroupIds).values()
- for parterGroup in parterGroups: # type: GroupDict
- groupId = parterGroup.groupId
- parterDict[dealerId] = parterDict.get(dealerId) or set()
- parterDict[dealerId].add(groupId)
- get_owner_or_parter(dealerDict, doneDealers, parterDict, parterGroup.ownerId)
- def check_offline_coins(dev_in_db, dealerDict, parterDict, stringDate, update=False):
- dealerCoinDict = {}
- groupCoinDict = {}
- for ownerId, groupIds in dealerDict.items():
- ownerCoins = 0
- for groupId in list(set(groupIds)):
- devNos = Device.get_devNos_by_group([groupId])
- keys = [devCoinTmpl(devNo, stringDate) for devNo in devNos]
- devCacheDict = reportCache.get_many(keys)
- for devNo in devNos:
- dbValue = int(dev_in_db.get(devNo, 0))
- if devCoinTmpl(devNo, stringDate) in devCacheDict:
- cacheValue = int(devCacheDict[devCoinTmpl(devNo, stringDate)])
- if dbValue != cacheValue:
- print '{} db not equal cache {}. update db.'.format(devNo, cacheValue)
- DevReport.get_collection().update_one({
- 'devNo': devNo,
- 'type': 'day',
- 'date': stringDate
- }, {
- '$set': {
- 'devNo': devNo,
- 'type': 'day',
- 'date': stringDate,
- 'rpt.lineCoins': cacheValue
- }
- }, upsert=True)
- else:
- pass
- # print '{} db<{}> equal cache <{}>.'.format(devNo, dbValue, cacheValue)
- else:
- devCacheDict[devCoinTmpl(devNo, stringDate)] = dbValue
- listCoins = [int(coins) for coins in devCacheDict.values()]
- allDevCoinsByGroup = sum(listCoins)
- oldValue = reportCache.get(groupCoinTmpl(groupId, stringDate))
- if not oldValue:
- oldValue = 0
- else:
- oldValue = int(oldValue)
- groupCoinDict[groupCoinTmpl(groupId, stringDate)] = allDevCoinsByGroup
- if update:
- reportCache.set(groupCoinTmpl(groupId, stringDate), str(int(allDevCoinsByGroup)))
- if oldValue != allDevCoinsByGroup:
- logger.info(
- 'not equal in group. ownerId=%s, groupId=%s, oldValue=%s, nowValue=%s' % (
- ownerId, groupId, oldValue, allDevCoinsByGroup))
- else:
- pass
- # logger.info(
- # 'equal in group. ownerId=%s, groupId=%s, oldValue=%s, nowValue=%s' % (
- # ownerId, groupId, oldValue, allDevCoinsByGroup))
- ownerCoins = ownerCoins + int(allDevCoinsByGroup)
- dealerCoinDict[ownerCoinTmpl(ownerId, stringDate)] = ownerCoins
- for partnerId, groupIds in parterDict.items():
- partnerCoinsByGroup = 0
- for groupId in list(set(groupIds)):
- if groupCoinTmpl(groupId, stringDate) in groupCoinDict:
- groupCoin = groupCoinDict[groupCoinTmpl(groupId, stringDate)]
- else:
- groupCoin = 0
- logger.info(
- 'allocated as partner. partnerId = %s; groupId = %s; coin = %s' % (partnerId, groupId, groupCoin))
- partnerCoinsByGroup = partnerCoinsByGroup + int(groupCoin)
- if ownerCoinTmpl(partnerId, stringDate) in dealerCoinDict:
- dealerCoinDict[ownerCoinTmpl(partnerId, stringDate)] = dealerCoinDict[
- ownerCoinTmpl(partnerId,
- stringDate)] + partnerCoinsByGroup
- else:
- dealerCoinDict[ownerCoinTmpl(partnerId, stringDate)] = partnerCoinsByGroup
- for dealerKey, coin in dealerCoinDict.items():
- logger.info('dealer key is: {}'.format(dealerKey))
- oldValue = reportCache.get(dealerKey)
- if not oldValue:
- oldValue = 0
- else:
- oldValue = int(oldValue)
- if update:
- reportCache.set(dealerKey, str(int(coin)))
- if oldValue != coin:
- logger.info(
- 'not equal in dealer. ownerKey=%s, oldValue=%s, nowValue=%s' % (
- dealerKey, oldValue, coin))
- else:
- pass
- # logger.info(
- # 'equal in dealer. ownerKey=%s, oldValue=%s, nowValue=%s' % (
- # dealerKey, oldValue, coin))
- return groupCoinDict, dealerCoinDict
- def report_into_db(groupCoinDict, dealerCoinDict, stringDate):
- start_time = datetime.datetime.now()
- logger.info('generating report for date (%s)' % (stringDate,))
- bulker = BulkHandlerEx(GroupReport.get_collection()) # type: BulkHandlerEx
- for groupKey, coins in groupCoinDict.items():
- groupId = groupid_from_key(groupKey)
- bulker.upsert(query_dict={
- 'groupId': groupId,
- 'type': 'day',
- 'date': stringDate
- }, update_dict={
- '$set': {
- 'groupId': groupId,
- 'type': 'day',
- 'date': stringDate,
- 'rpt.lineCoins': int(coins)
- }
- })
- if len(bulker.requests) >= 2000:
- bulker.execute()
- bulker = BulkHandlerEx(GroupReport.get_collection()) # type: BulkHandlerEx
- if len(bulker.requests) > 0:
- bulker.execute()
- bulker = None
- bulker = BulkHandlerEx(DealerReport.get_collection()) # type: BulkHandlerEx
- for dealerKey, coins in dealerCoinDict.iteritems():
- dealerId = dealerid_from_key(dealerKey)
- bulker.upsert(query_dict={
- 'ownerId': dealerId,
- 'type': 'day',
- 'date': stringDate
- }, update_dict={
- '$set': {
- 'ownerId': dealerId,
- 'type': 'day',
- 'date': stringDate,
- 'rpt.lineCoins': int(coins)
- }
- })
- if len(bulker.requests) >= 2000:
- bulker.execute()
- bulker = BulkHandlerEx(DealerReport.get_collection()) # type: BulkHandlerEx
- if len(bulker.requests) > 0:
- bulker.execute()
- bulker = None
- logger.info('[*]finished insert rpt into database!, time cost=%s' % (datetime.datetime.now() - start_time,))
- # 自动脚本不判断,注释
- # if dateFmtStr:
- # reportDate = datetime.datetime.strptime(dateFmtStr, "%Y-%m-%d")
- # else:
- # reportDate = datetime.datetime.now() - datetime.timedelta(days = 1)
- reportDate = datetime.datetime.now() - datetime.timedelta(days=1)
- startTime = datetime.datetime.now()
- # 自动脚本不判断,注释
- # if len(sys.argv) >= 2:
- # dealer_id_list = [str(sys.argv[1])]
- # else:
- # dealer_id_list = OfflineReportDealers.get_rpt_dealIds(reportDate.strftime("%Y-%m-%d"))
- dealer_id_list = OfflineReportDealers.get_rpt_dealIds(reportDate.strftime("%Y-%m-%d"))
- dealerDict = {}
- parterDict = {}
- doneDealers = []
- dealerId = ''
- for dealerId in dealer_id_list:
- logger.info('fetch group map for dealer<id={}>'.format(dealerId))
- get_owner_or_parter(dealerDict, doneDealers, parterDict, dealerId)
- for dealerId, groupIds in dealerDict.iteritems():
- print 'owner. {} = {}'.format(dealerId, groupIds)
- for dealerId, groupIds in parterDict.iteritems():
- print 'parter. {} = {}'.format(dealerId, groupIds)
- logger.info('fetch device map for dealer<id={}>'.format(dealerId))
- devDict = {dealerId: Device.get_devNos_by_group(groupIds) for dealerId, groupIds in dealerDict.items()}
- devNos = []
- for items in devDict.values():
- devNos.extend(items)
- dev_in_db = {}
- for item in DevReport.get_collection().find(
- {'devNo': {'$in': devNos}, 'type': 'day', 'date': reportDate.strftime("%Y-%m-%d")},
- {'devNo': 1, 'rpt': 1}):
- if 'rpt' in item and 'lineCoins' in item['rpt']:
- dev_in_db[item['devNo']] = item['rpt']['lineCoins']
- # for devNo, coins in dev_in_db.iteritems():
- # print '{} has coins {} in db.'.format(devNo, coins)
- groupCoinDict, dealerCoinDict = check_offline_coins(dev_in_db, dealerDict, parterDict,
- reportDate.strftime("%Y-%m-%d"), update = False)
- for groupKey, coins in groupCoinDict.iteritems():
- logger.info('group {} has coins {}'.format(groupid_from_key(groupKey), coins))
- for dealerKey, coins in dealerCoinDict.iteritems():
- logger.info('dealer {} has coins {}'.format(dealerid_from_key(dealerKey), coins))
- report_into_db(groupCoinDict, dealerCoinDict, reportDate.strftime("%Y-%m-%d"))
- logger.debug('all steps cost {}'.format(datetime.datetime.now() - startTime))
- def report_to_zhejiang_fight():
- from apps.web.api.zhejiang.models import ZheJiangFireFight
- from apps.web.api.zhejiang.utils import ZheJiangNorther
- fights = list(ZheJiangFireFight.objects.all())
- for _fight in fights:
- ZheJiangNorther(_fight).report()
|