#!/usr/bin/env python
# -*- coding: utf-8 -*-
- import datetime
- import getopt
- import logging
- import sys
- import time
- from typing import TYPE_CHECKING
# Parse command-line options.
#   -e / --env    name of the settings module under ``configs`` (default 'testing')
#   -q / --queue  value handed to MqttContext as ``queue`` (default False)
#   -p / --port   MQTT port; mandatory
#   -d / --debug  value handed to MqttContext as ``debug`` (default '')
# The short flags -l, -m and -b are accepted by the option string but ignored here.
try:
    options, args = getopt.getopt(sys.argv[1:], 'l:q:e:p:d:m:b',
                                  ['queue=', 'env=', 'port=', 'debug='])
except getopt.GetoptError as e:
    print(str(e))
    # A usage error must not look like a successful run to the caller.
    sys.exit(2)

platform_env = 'testing'
queue = False
port = None
debug = ''

for name, value in options:
    if name in ('-e', '--env'):
        platform_env = value
    elif name in ('-q', '--queue'):
        queue = value
    elif name in ('-p', '--port'):
        try:
            port = int(value)
        except ValueError:
            # Report a bad port the same way as a missing one instead of a traceback.
            print('invalid port parameter: {}'.format(value))
            sys.exit(1)
    elif name in ('-d', '--debug'):
        debug = value

# The port is mandatory; 0 is rejected as well.
if not port:
    print('no port parameter.')
    sys.exit(1)
import os

# Choose the Django settings module from the --env option, unless the
# environment variable is already set by the caller.
_settings_module = 'configs.{env}'.format(env=platform_env)
os.environ.setdefault('DJANGO_SETTINGS_MODULE', _settings_module)

# The project root is two directory levels above the directory containing
# this script; put it first on sys.path so project packages can be imported.
_script_dir = os.path.dirname(os.path.realpath(__file__))
_parent_dir = os.path.abspath(_script_dir + "/..")
PROJECT_ROOT = os.path.join(_parent_dir, '..')
sys.path.insert(0, PROJECT_ROOT)

# Must come after the sys.path change above, since ``script`` is a project package.
from script.base import init_env

# NOTE(review): presumably initialises the Django environment - confirm in script.base.
init_env(interactive=False)
- import simplejson as json
- from django.conf import settings
- from apps.web.core.sysparas import SysParas
- from script.eventer.mqtt_context import MqttContext
- from apps.web.device.timescale import OfflineManager, FluentedEngine
- from apps.web.device.models import Device, DeviceCmdStatics
- from apps.web.constant import Const
- if TYPE_CHECKING:
- from apps.web.device.models import DeviceDict
# Logger named after this module; also handed to MqttContext below.
logger = logging.getLogger(__name__)

# MQTT broker address, resolved from the configured hostname via SysParas.
_mqtt_hostname = settings.MQTT_HOSTNAME
host = SysParas.get_private_ip(_mqtt_hostname)
# Driver codes for which device events are forwarded through the cy4 event API.
_CY4_DRIVER_CODES = ("100242", "100250")


def _supports_cy4_event(dev):
    # type: (DeviceDict) -> bool
    """Return True when events of ``dev`` must be forwarded through the cy4 event API."""
    return bool(
        dev.is_registered
        and dev.owner.supports("cy4EventApi")
        and dev.driverCode in _CY4_DRIVER_CODES
    )


def _notify_cy4(dev, notify_type, signal=None):
    # type: (DeviceDict, str, Optional[int]) -> None
    """Send a cy4 north-bound notification; ``signal`` is included only when given."""
    from apps.web.south_intf.platform import handle_and_notify_event_to_north_cy4_new

    data = {
        "deviceCode": dev.logicalCode,
        "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "notifyType": notify_type,
    }
    if signal is not None:
        data["signal"] = signal
    handle_and_notify_event_to_north_cy4_new(dev, data)


def _push_shanghai_heartbeat(dev):
    # type: (DeviceDict) -> None
    """Best-effort heartbeat push to the Shanghai urban data collection platform."""
    try:
        if not dev.is_registered:
            return
        from apps.web.south_intf.shanghai_urban_data_collection_platform import \
            ShangHaiUrbanDataCollectionPlatformModel, ShangHaiUrbanDataCollectionPlatform

        dealer_info = ShangHaiUrbanDataCollectionPlatformModel.get_dealer_info(dev.ownerId)
        if dealer_info:
            ShangHaiUrbanDataCollectionPlatform(dev, **dealer_info).celery_push_heatbeat()
    except Exception:
        # The push stays best-effort (it must never break message handling),
        # but the failure is logged instead of being silently discarded.
        logger.exception('push shanghai heartbeat failed, dev = <{}>'.format(dev.devNo))


def worker(cmd, dev, payload):
    # type: (str, DeviceDict, str)->None
    """Handle one MQTT message received for a device.

    ``cmd`` is the command the message arrived on, ``dev`` the device it is
    attributed to and ``payload`` the raw message body.

    * 'log' messages are only written to the debug log.
    * Any other payload must decode to a JSON object whose 'IMEI' equals
      ``dev.devNo`` and whose 'cmd' equals ``cmd``; otherwise the message is
      logged and dropped.
    * '208' records a signal of 0, then triggers the Shanghai push and the
      cy4 'offline' notification.
    * '200' / '207' record the reported signal (default 10), trigger the cy4
      'online' / 'heartbeat' notification and then the Shanghai push; '200'
      is additionally counted in DeviceCmdStatics.

    Returns None in all cases.
    """
    if cmd == 'log':
        logger.debug('log message[{}]: {}'.format(dev.devNo, payload))
        return

    try:
        device_event = json.loads(bytes.decode(payload))
    except ValueError:
        logger.error(
            "can not loads message payload, cmd = <{}>, dev= <{}>, payload = <{}>".format(cmd, dev.devNo, payload))
        return

    # Valid JSON that is not an object (list, number, ...) has no fields to check.
    if not isinstance(device_event, dict):
        logger.warning(
            'invalid msg for not a json object, cmd = <{}>, dev = <{}>, payload = <{}>'.format(
                cmd, dev.devNo, payload))
        return

    if dev.devNo != device_event.get('IMEI', None):
        logger.warning('invalid msg for not same devNo({} != {})'.format(dev.devNo, device_event.get('IMEI')))
        return

    if cmd != str(device_event.get('cmd', '')):
        logger.warning('invalid msg for not same cmd({} != {})'.format(cmd, device_event.get('cmd', None)))
        return

    if cmd == '208':
        FluentedEngine().in_signal_udp(devNo=dev.devNo, ts=int(time.time()), signal=0, cmd=cmd)

        # Scheduling of the device-offline notification is currently disabled:
        # OfflineManager.registe_device_offline_notify(devNo = dev.devNo)

        # Shanghai urban-operations platform message push.
        _push_shanghai_heartbeat(dev)

        if _supports_cy4_event(dev):
            _notify_cy4(dev, "offline")

    elif cmd in ('207', '200'):
        try:
            signal = int(device_event.get('signal', 10))
        except (TypeError, ValueError, OverflowError):
            logger.warning('invalid signal value, cmd = <{}>, dev = <{}>, signal = <{}>'.format(
                cmd, dev.devNo, device_event.get('signal')))
            return

        FluentedEngine().in_signal_udp(devNo=dev.devNo, ts=int(time.time()), signal=signal, cmd=cmd)

        # Cancelling of the device-offline notification is currently disabled:
        # OfflineManager.discard_device_offline_notify(devNo = dev.devNo)

        if cmd == '200':
            # The SIM-card activation time used to be recorded here (kept out of
            # device management so it is easy to change); that code is disabled.
            logger.debug('dev<{}> 200 message.'.format(dev.devNo))
            DeviceCmdStatics.record(dev.devNo, '200')
            notify_type = "online"
        else:
            notify_type = "heartbeat"

        if _supports_cy4_event(dev):
            _notify_cy4(dev, notify_type, signal)

        # Shanghai urban-operations platform message push.
        # NOTE(review): assumed to run for both '200' and '207', mirroring the
        # '208' branch - confirm against the original indentation.
        _push_shanghai_heartbeat(dev)
# Every subscribed command uses the same per-command options, so the mapping
# is generated rather than spelled out entry by entry.
_MONITORED_CMDS = ('207', '200', '208', 'log')
_cmd_options = {
    cmd_name: {'qos': Const.MQTT_QOS, 'checkRegistered': False}
    for cmd_name in _MONITORED_CMDS
}

# Start the 'monitor' MQTT context; messages are dispatched to ``worker``.
with MqttContext(
        host=host,
        port=port,
        user=settings.MQTT_USER,
        password=settings.MQTT_PSWD,
        worker=worker,
        platform_env=platform_env,
        queue=queue,
        debug=debug,
        cmds=_cmd_options,
        logger=logger,
        name='monitor',
        checkRegistered=False) as context:
    context.start()

# Reached once the context has been left.
logger.debug('exit now.')
|