tasks.py 22 KB


  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import datetime
  4. import time
  5. from celery.utils.log import get_task_logger
  6. from mongoengine import DoesNotExist
  7. from apilib.utils_mongo import BulkHandlerEx
  8. from apps.web.constant import Const
  9. from apps.web.dealer.models import Dealer
  10. from apps.web.dealer.utils import RentOrderServer
  11. from apps.web.device.models import Device, Group, DeviceDict, FaultRecord, DeviceRentOrder
  12. from apps.web.device.timescale import OfflineManager
  13. from apps.web.helpers import get_wechat_manager_mp_proxy
  14. from apps.web.south_intf.liangxi_fire import ToXiaoFang
  15. from apps.web.user.models import ServiceProgress
  16. logger = get_task_logger(__name__)
  17. def remove_serviceProgress_periodically():
  18. threeDaysAgo = datetime.datetime.now() - datetime.timedelta(days = 3)
  19. threeDaysAgo = int(time.mktime(threeDaysAgo.timetuple()))
  20. threeMonthAgo = datetime.datetime.now() - datetime.timedelta(days = 91)
  21. threeMonthAgo = int(time.mktime(threeMonthAgo.timetuple()))
  22. start_time = int(time.time())
  23. while True:
  24. now_time = int(time.time())
  25. if now_time - start_time > 2 * 60 * 60:
  26. logger.debug('time is over. wait next.')
  27. return
  28. items = ServiceProgress.get_collection().find(
  29. {
  30. '$or': [{'finished_time': {'$lt': threeDaysAgo}}, {'isFinished': True}],
  31. 'devTypeCode': {
  32. '$ne': Const.DEVICE_TYPE_CODE_HP_GATE
  33. }}, {'_id': 1}).limit(2000)
  34. bulker = BulkHandlerEx(ServiceProgress.get_collection())
  35. for item in items:
  36. bulker.delete(query_dict = {'_id': item['_id']})
  37. count = len(bulker.requests)
  38. if count > 0:
  39. logger.debug('prepare to delete {} rows.'.format(count))
  40. bulker.execute()
  41. bulker = BulkHandlerEx(ServiceProgress.get_collection()) # type: BulkHandlerEx
  42. if count < 2000:
  43. break
  44. while True:
  45. now_time = int(time.time())
  46. if now_time - start_time > 2 * 60 * 60:
  47. logger.debug('time is over. wait next.')
  48. return
  49. items = ServiceProgress.get_collection().find(
  50. {
  51. '$or': [
  52. {'finished_time': {'$lt': threeMonthAgo}},
  53. {'isFinished': True}
  54. ],
  55. 'finished_time': {'$lt': threeDaysAgo},
  56. 'devTypeCode': Const.DEVICE_TYPE_CODE_HP_GATE
  57. }, {'_id': 1}).limit(2000)
  58. bulker = BulkHandlerEx(ServiceProgress.get_collection()) # type: BulkHandlerEx
  59. for item in items:
  60. bulker.delete(query_dict = {'_id': item['_id']})
  61. count = len(bulker.requests)
  62. if count > 0:
  63. logger.debug('prepare to delete {} rows.'.format(count))
  64. bulker.execute()
  65. bulker = BulkHandlerEx(ServiceProgress.get_collection()) # type: BulkHandlerEx
  66. if count < 2000:
  67. break
  68. logger.debug('delete all over.')
  69. def send_to_xf_all_dev_info():
  70. logger.info('now to send_to_xf_all')
  71. # 找到梁希区的所有设备
  72. groups = Group.objects.filter(districtId = '200275')
  73. groupIds = [str(_.id) for _ in groups]
  74. for dev in Device.objects.filter(groupId__in = groupIds).only(Device.devNo.name, Device.groupId.name):
  75. dev = Device.get_dev(dev.devNo)
  76. ToXiaoFang(dev).send_to_xf_ini()
  77. def send_to_xf_falut(devNo, faultId):
  78. """
  79. :param devNo:
  80. :param faultId:
  81. :return:
  82. """
  83. logger.info("device {} send to liangxi xiaofang fault = {}".format(devNo, faultId))
  84. faultRecord = FaultRecord.objects.get(id = faultId)
  85. dev = Device.get_dev(devNo)
  86. to_xiao_fang = ToXiaoFang(dev)
  87. to_xiao_fang.send_to_xf_fault(faultRecord)
  88. def send_to_xf_fault_handle(devNo, faultId):
  89. """
  90. :param devNo:
  91. :param faultId:
  92. :return:
  93. """
  94. logger.info("device {} send to liangxi xiaofang fault handle, fault = {}".format(devNo, faultId))
  95. faultRecord = FaultRecord.objects.get(id = faultId)
  96. dev = Device.get_dev(devNo)
  97. to_xiao_fang = ToXiaoFang(dev)
  98. to_xiao_fang.send_to_xf_handle(faultRecord)
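# Check the devices, treating the server as the source of truth: if the two sides
# disagree, push the configuration down to the device.
# Note: a task is generated every few minutes, so after each device has been processed
# its state has to be checked against the database again.
# This was written for a tender response, so re-entrancy is not considered; if the bid
# is won, this should ultimately be implemented on the device itself.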
  102. def set_device_deactive_for_langxin():
  103. from apps.web.core.helpers import ActionDeviceBuilder
  104. def isBetweenTime(startTime, endTime, thisTime):
  105. if endTime >= startTime:
  106. return thisTime >= startTime and thisTime <= endTime
  107. else:
  108. if thisTime >= startTime and thisTime <= '23:59':
  109. return True
  110. elif thisTime >= '00:00' and thisTime <= endTime:
  111. return True
  112. else:
  113. return False
  114. devObjs = Device.get_collection().find({'otherConf.supportForbiddenInNight': 1})
  115. # 先检查
  116. nowDateTime = datetime.datetime.now()
  117. nowTime = '%s:%s' % (nowDateTime.hour, nowDateTime.minute)
  118. for obj in devObjs:
  119. otherConf = obj['otherConf']
  120. if otherConf['forbiddenInNight']:
  121. # 禁用期内,如果已经禁用了,直接禁用
  122. if isBetweenTime(otherConf['startTime'], otherConf['endTime'], nowTime):
  123. if otherConf.get('curCoinStatus', '') == 'forbidden' and otherConf.get('curCardStatus',
  124. '') == 'forbidden':
  125. continue
  126. else:
  127. try:
  128. dev = Device.get_dev(obj['devNo'])
  129. box = ActionDeviceBuilder.create_action_device(dev)
  130. box.set_coin_card_enable({'putCoins': '00', 'icCard': '00'})
  131. otherConf['curCoinStatus'] = '00'
  132. otherConf['curCardStatus'] = '00'
  133. obj.save()
  134. except Exception, e:
  135. continue
  136. else:
  137. if otherConf['putCoins'] != otherConf['curCoinStatus'] or otherConf['icCard'] != otherConf[
  138. 'curCardStatus']:
  139. try:
  140. dev = Device.get_dev(obj['devNo'])
  141. box = ActionDeviceBuilder.create_action_device(dev)
  142. box.set_coin_card_enable({'putCoins': otherConf['putCoins'], 'icCard': otherConf['icCard']})
  143. otherConf['curCoinStatus'] = otherConf['putCoins']
  144. otherConf['curCardStatus'] = otherConf['icCard']
  145. obj.save()
  146. except Exception, e:
  147. continue
  148. def turn_on_power_huan_dian_gui(devNo, port):
  149. from apps.web.core.helpers import ActionDeviceBuilder
  150. dev = Device.get_dev(devNo)
  151. box = ActionDeviceBuilder.create_action_device(dev)
  152. box._turn_on_power(port)
  153. def device_offline_notify(devNo):
  154. """
  155. 设备离线通知经销商
  156. :param devNo:
  157. :return:
  158. """
  159. # 不管成功与否 释放cache
  160. OfflineManager.delete_cache(devNo)
  161. dev = Device.get_dev(devNo) # type: DeviceDict
  162. # 参数检查
  163. if not dev:
  164. logger.error("dev is not registe to databse, devNo is <{}>".format(devNo))
  165. return
  166. if dev.online:
  167. logger.info("dev is online, devNo is <{}>".format(devNo))
  168. return
  169. try:
  170. dealer = Dealer.objects.get(id = dev.ownerId)
  171. except DoesNotExist:
  172. logger.error("dealer is not registe to databse, devNo is <{}>, dealer is <{}>".format(devNo, dev.ownerId))
  173. return
  174. # 对经销商发起通知
  175. try:
  176. wechat_mp_proxy = get_wechat_manager_mp_proxy(dealer)
  177. wechat_mp_proxy.notify(
  178. dealer.managerialOpenId or "",
  179. "abnormal_device_offline",
  180. title = u"您当前有设备离线,请尽快排查原因",
  181. device = dev.logicalCode,
  182. notifyTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  183. )
  184. except Exception as e:
  185. logger.exception(
  186. "notify to dealer offline device faild, devNo is <{}>, dealer is <{}>\n, error is {}".format(devNo,
  187. dev.ownerId,
  188. e))
  189. def gen_daily_rent_order():
  190. """
  191. 生成每日的 日租订单
  192. 在此循环中 需要自动激活超过最大时间限制的设备
  193. """
  194. # 找出所有被标记的设备
  195. devices = Device.objects.filter(isRent = True)
  196. for _dev in devices: # type: Device
  197. if not _dev.dailyRent:
  198. logger.error("rent device = {} has not dailyRent!".format(_dev.devNo))
  199. nowTime = datetime.datetime.now()
  200. # 没有被激活的设备
  201. if not _dev.dailyRent.active:
  202. # 自动激活
  203. if _dev.dailyRent.lastActiveDate < nowTime:
  204. _dev.active_rent()
  205. _dev.reload()
  206. # 激活状态的设备 直接创建订单
  207. if not _dev.dailyRent.active:
  208. continue
  209. # 订单创建
  210. DeviceRentOrder.create_by_device(_dev)
  211. def deduct_rent_order():
  212. """
  213. 设备 日租账单扣款
  214. 以经销商的维度 -- 设备 进行扣款
  215. :return:
  216. """
  217. devices = Device.objects.filter(isRent = True).only(Device.ownerId.name)
  218. # 找出所有持有租用设备的经销商的ID
  219. dealerIds = set()
  220. for _device in devices: # type: Device
  221. dealerIds.add(_device.ownerId)
  222. for _dealerId in dealerIds:
  223. logger.info("start dealer <{}> rent order".format(_dealerId))
  224. # 获取所有经销商没有支付的订单 时间由远及近
  225. _dealer = Dealer.objects.get(id=_dealerId)
  226. _orders = DeviceRentOrder.get_not_paid_by_dealer(_dealer)
  227. # 对每一个订单进行支付
  228. for _order in _orders:
  229. RentOrderServer(_order, _dealer).execute()
# For the Yunkuaichong (cloud fast-charging) protocol: synchronize the device clocks by
# pushing the server time down to the devices.
  231. def sync_device_time_for_tcpcar():
  232. from apps.web.core.helpers import ActionDeviceBuilder
  233. devObjs = Device.get_collection().find({'devType.code':Const.DEVICE_TYPE_CODE_CAR_NENGPAI})
  234. devList = []
  235. for obj in devObjs:
  236. dev = Device.get_dev(obj.devNo)
  237. if not dev.online:
  238. continue
  239. devList.append(dev)
  240. for dev in devList:
  241. try:
  242. box = ActionDeviceBuilder.create_action_device(dev)
  243. box.send_current_time()
  244. except Exception,e:
  245. continue
  246. return
  247. groupid_from_key = lambda groupKey: groupKey.split('_')[1]
  248. dealerid_from_key = lambda dealerKey: dealerKey.split('_')[1]
# TODO: schedule this to run at 02:30 in the early morning.
  250. def make_rpt_into_db():
  251. from apps.web.report.models import DevReport, GroupReport, DealerReport
  252. from apps import reportCache
  253. from apps.web.device.models import OfflineReportDealers, GroupDict
  254. from apps.web.core.accounting import devCoinTmpl, ownerCoinTmpl, groupCoinTmpl
  255. def get_owner_or_parter(dealerDict, doneDealers, parterDict, dealerId):
  256. if dealerId in doneDealers:
  257. # logger.debug('{} has done.'.format(dealerId))
  258. return
  259. groupIds = Group.get_group_ids_of_dealer(dealerId)
  260. if groupIds:
  261. dealerDict[dealerId] = groupIds
  262. doneDealers.append(dealerId)
  263. dealerGroups = Group.get_groups_by_group_ids(groupIds).values()
  264. for dealerGroup in dealerGroups: # type: GroupDict
  265. groupId = dealerGroup.groupId
  266. parters = dealerGroup.partners()
  267. for parter in parters:
  268. parter_id = parter['id']
  269. parterDict[parter_id] = parterDict.get(parter_id) or set()
  270. parterDict[parter_id].add(groupId)
  271. get_owner_or_parter(dealerDict, doneDealers, parterDict, parter_id)
  272. parterGroupIds = Group.get_group_ids_of_partner(dealerId)
  273. parterGroups = Group.get_groups_by_group_ids(parterGroupIds).values()
  274. for parterGroup in parterGroups: # type: GroupDict
  275. groupId = parterGroup.groupId
  276. parterDict[dealerId] = parterDict.get(dealerId) or set()
  277. parterDict[dealerId].add(groupId)
  278. get_owner_or_parter(dealerDict, doneDealers, parterDict, parterGroup.ownerId)
  279. def check_offline_coins(dev_in_db, dealerDict, parterDict, stringDate, update=False):
  280. dealerCoinDict = {}
  281. groupCoinDict = {}
  282. for ownerId, groupIds in dealerDict.items():
  283. ownerCoins = 0
  284. for groupId in list(set(groupIds)):
  285. devNos = Device.get_devNos_by_group([groupId])
  286. keys = [devCoinTmpl(devNo, stringDate) for devNo in devNos]
  287. devCacheDict = reportCache.get_many(keys)
  288. for devNo in devNos:
  289. dbValue = int(dev_in_db.get(devNo, 0))
  290. if devCoinTmpl(devNo, stringDate) in devCacheDict:
  291. cacheValue = int(devCacheDict[devCoinTmpl(devNo, stringDate)])
  292. if dbValue != cacheValue:
  293. print '{} db not equal cache {}. update db.'.format(devNo, cacheValue)
  294. DevReport.get_collection().update_one({
  295. 'devNo': devNo,
  296. 'type': 'day',
  297. 'date': stringDate
  298. }, {
  299. '$set': {
  300. 'devNo': devNo,
  301. 'type': 'day',
  302. 'date': stringDate,
  303. 'rpt.lineCoins': cacheValue
  304. }
  305. }, upsert=True)
  306. else:
  307. pass
  308. # print '{} db<{}> equal cache <{}>.'.format(devNo, dbValue, cacheValue)
  309. else:
  310. devCacheDict[devCoinTmpl(devNo, stringDate)] = dbValue
  311. listCoins = [int(coins) for coins in devCacheDict.values()]
  312. allDevCoinsByGroup = sum(listCoins)
  313. oldValue = reportCache.get(groupCoinTmpl(groupId, stringDate))
  314. if not oldValue:
  315. oldValue = 0
  316. else:
  317. oldValue = int(oldValue)
  318. groupCoinDict[groupCoinTmpl(groupId, stringDate)] = allDevCoinsByGroup
  319. if update:
  320. reportCache.set(groupCoinTmpl(groupId, stringDate), str(int(allDevCoinsByGroup)))
  321. if oldValue != allDevCoinsByGroup:
  322. logger.info(
  323. 'not equal in group. ownerId=%s, groupId=%s, oldValue=%s, nowValue=%s' % (
  324. ownerId, groupId, oldValue, allDevCoinsByGroup))
  325. else:
  326. pass
  327. # logger.info(
  328. # 'equal in group. ownerId=%s, groupId=%s, oldValue=%s, nowValue=%s' % (
  329. # ownerId, groupId, oldValue, allDevCoinsByGroup))
  330. ownerCoins = ownerCoins + int(allDevCoinsByGroup)
  331. dealerCoinDict[ownerCoinTmpl(ownerId, stringDate)] = ownerCoins
  332. for partnerId, groupIds in parterDict.items():
  333. partnerCoinsByGroup = 0
  334. for groupId in list(set(groupIds)):
  335. if groupCoinTmpl(groupId, stringDate) in groupCoinDict:
  336. groupCoin = groupCoinDict[groupCoinTmpl(groupId, stringDate)]
  337. else:
  338. groupCoin = 0
  339. logger.info(
  340. 'allocated as partner. partnerId = %s; groupId = %s; coin = %s' % (partnerId, groupId, groupCoin))
  341. partnerCoinsByGroup = partnerCoinsByGroup + int(groupCoin)
  342. if ownerCoinTmpl(partnerId, stringDate) in dealerCoinDict:
  343. dealerCoinDict[ownerCoinTmpl(partnerId, stringDate)] = dealerCoinDict[
  344. ownerCoinTmpl(partnerId,
  345. stringDate)] + partnerCoinsByGroup
  346. else:
  347. dealerCoinDict[ownerCoinTmpl(partnerId, stringDate)] = partnerCoinsByGroup
  348. for dealerKey, coin in dealerCoinDict.items():
  349. logger.info('dealer key is: {}'.format(dealerKey))
  350. oldValue = reportCache.get(dealerKey)
  351. if not oldValue:
  352. oldValue = 0
  353. else:
  354. oldValue = int(oldValue)
  355. if update:
  356. reportCache.set(dealerKey, str(int(coin)))
  357. if oldValue != coin:
  358. logger.info(
  359. 'not equal in dealer. ownerKey=%s, oldValue=%s, nowValue=%s' % (
  360. dealerKey, oldValue, coin))
  361. else:
  362. pass
  363. # logger.info(
  364. # 'equal in dealer. ownerKey=%s, oldValue=%s, nowValue=%s' % (
  365. # dealerKey, oldValue, coin))
  366. return groupCoinDict, dealerCoinDict
  367. def report_into_db(groupCoinDict, dealerCoinDict, stringDate):
  368. start_time = datetime.datetime.now()
  369. logger.info('generating report for date (%s)' % (stringDate,))
  370. bulker = BulkHandlerEx(GroupReport.get_collection()) # type: BulkHandlerEx
  371. for groupKey, coins in groupCoinDict.items():
  372. groupId = groupid_from_key(groupKey)
  373. bulker.upsert(query_dict={
  374. 'groupId': groupId,
  375. 'type': 'day',
  376. 'date': stringDate
  377. }, update_dict={
  378. '$set': {
  379. 'groupId': groupId,
  380. 'type': 'day',
  381. 'date': stringDate,
  382. 'rpt.lineCoins': int(coins)
  383. }
  384. })
  385. if len(bulker.requests) >= 2000:
  386. bulker.execute()
  387. bulker = BulkHandlerEx(GroupReport.get_collection()) # type: BulkHandlerEx
  388. if len(bulker.requests) > 0:
  389. bulker.execute()
  390. bulker = None
  391. bulker = BulkHandlerEx(DealerReport.get_collection()) # type: BulkHandlerEx
  392. for dealerKey, coins in dealerCoinDict.iteritems():
  393. dealerId = dealerid_from_key(dealerKey)
  394. bulker.upsert(query_dict={
  395. 'ownerId': dealerId,
  396. 'type': 'day',
  397. 'date': stringDate
  398. }, update_dict={
  399. '$set': {
  400. 'ownerId': dealerId,
  401. 'type': 'day',
  402. 'date': stringDate,
  403. 'rpt.lineCoins': int(coins)
  404. }
  405. })
  406. if len(bulker.requests) >= 2000:
  407. bulker.execute()
  408. bulker = BulkHandlerEx(DealerReport.get_collection()) # type: BulkHandlerEx
  409. if len(bulker.requests) > 0:
  410. bulker.execute()
  411. bulker = None
  412. logger.info('[*]finished insert rpt into database!, time cost=%s' % (datetime.datetime.now() - start_time,))
  413. # 自动脚本不判断,注释
  414. # if dateFmtStr:
  415. # reportDate = datetime.datetime.strptime(dateFmtStr, "%Y-%m-%d")
  416. # else:
  417. # reportDate = datetime.datetime.now() - datetime.timedelta(days = 1)
  418. reportDate = datetime.datetime.now() - datetime.timedelta(days=1)
  419. startTime = datetime.datetime.now()
  420. # 自动脚本不判断,注释
  421. # if len(sys.argv) >= 2:
  422. # dealer_id_list = [str(sys.argv[1])]
  423. # else:
  424. # dealer_id_list = OfflineReportDealers.get_rpt_dealIds(reportDate.strftime("%Y-%m-%d"))
  425. dealer_id_list = OfflineReportDealers.get_rpt_dealIds(reportDate.strftime("%Y-%m-%d"))
  426. dealerDict = {}
  427. parterDict = {}
  428. doneDealers = []
  429. dealerId = ''
  430. for dealerId in dealer_id_list:
  431. logger.info('fetch group map for dealer<id={}>'.format(dealerId))
  432. get_owner_or_parter(dealerDict, doneDealers, parterDict, dealerId)
  433. for dealerId, groupIds in dealerDict.iteritems():
  434. print 'owner. {} = {}'.format(dealerId, groupIds)
  435. for dealerId, groupIds in parterDict.iteritems():
  436. print 'parter. {} = {}'.format(dealerId, groupIds)
  437. logger.info('fetch device map for dealer<id={}>'.format(dealerId))
  438. devDict = {dealerId: Device.get_devNos_by_group(groupIds) for dealerId, groupIds in dealerDict.items()}
  439. devNos = []
  440. for items in devDict.values():
  441. devNos.extend(items)
  442. dev_in_db = {}
  443. for item in DevReport.get_collection().find(
  444. {'devNo': {'$in': devNos}, 'type': 'day', 'date': reportDate.strftime("%Y-%m-%d")},
  445. {'devNo': 1, 'rpt': 1}):
  446. if 'rpt' in item and 'lineCoins' in item['rpt']:
  447. dev_in_db[item['devNo']] = item['rpt']['lineCoins']
  448. # for devNo, coins in dev_in_db.iteritems():
  449. # print '{} has coins {} in db.'.format(devNo, coins)
  450. groupCoinDict, dealerCoinDict = check_offline_coins(dev_in_db, dealerDict, parterDict,
  451. reportDate.strftime("%Y-%m-%d"), update = False)
  452. for groupKey, coins in groupCoinDict.iteritems():
  453. logger.info('group {} has coins {}'.format(groupid_from_key(groupKey), coins))
  454. for dealerKey, coins in dealerCoinDict.iteritems():
  455. logger.info('dealer {} has coins {}'.format(dealerid_from_key(dealerKey), coins))
  456. report_into_db(groupCoinDict, dealerCoinDict, reportDate.strftime("%Y-%m-%d"))
  457. logger.debug('all steps cost {}'.format(datetime.datetime.now() - startTime))
  458. def report_to_zhejiang_fight():
  459. from apps.web.api.zhejiang.models import ZheJiangFireFight
  460. from apps.web.api.zhejiang.utils import ZheJiangNorther
  461. fights = list(ZheJiangFireFight.objects.all())
  462. for _fight in fights:
  463. ZheJiangNorther(_fight).report()