monitor.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import datetime
  4. import getopt
  5. import logging
  6. import sys
  7. import time
  8. from typing import TYPE_CHECKING
  9. try:
  10. options, args = getopt.getopt(sys.argv[1:], 'l:q:e:p:d:m:b',
  11. ['queue=', 'env=', 'port=', 'debug='])
  12. except getopt.GetoptError as e:
  13. print(str(e))
  14. sys.exit()
  15. platform_env = 'testing'
  16. queue = False
  17. port = None
  18. debug = ''
  19. for name, value in options:
  20. if name in ('-e', '--env'):
  21. platform_env = value
  22. if name in ('-q', '--queue'):
  23. queue = value
  24. if name in ('-p', '--port'):
  25. port = int(value)
  26. if name in ('-d', '--debug'):
  27. debug = value
  28. if not port:
  29. print 'no port parameter.'
  30. sys.exit(1)
  31. import os
  32. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'configs.{env}'.format(env = platform_env))
  33. PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
  34. sys.path.insert(0, PROJECT_ROOT)
  35. from script.base import init_env
  36. init_env(interactive = False)
  37. import simplejson as json
  38. from django.conf import settings
  39. from apps.web.core.sysparas import SysParas
  40. from script.eventer.mqtt_context import MqttContext
  41. from apps.web.device.timescale import OfflineManager, FluentedEngine
  42. from apps.web.device.models import Device, DeviceCmdStatics
  43. from apps.web.constant import Const
  44. if TYPE_CHECKING:
  45. from apps.web.device.models import DeviceDict
  46. logger = logging.getLogger(__name__)
  47. host = SysParas.get_private_ip(settings.MQTT_HOSTNAME)
  48. def worker(cmd, dev, payload):
  49. # type: (str, DeviceDict, str)->None
  50. if cmd == 'log':
  51. logger.debug('log message[{}]: {}'.format(dev.devNo, payload))
  52. else:
  53. try:
  54. device_event = json.loads(bytes.decode(payload))
  55. except ValueError:
  56. logger.error(
  57. "can not loads message payload, cmd = <{}>, dev= <{}>, payload = <{}>".format(cmd, dev.devNo, payload))
  58. return
  59. if dev.devNo != device_event.get('IMEI', None):
  60. logger.warning('invalid msg for not same devNo({} != {})'.format(dev.devNo, device_event.get('IMEI')))
  61. return
  62. if cmd != str(device_event.get('cmd', '')):
  63. logger.warning('invalid msg for not same cmd({} != {})'.format(cmd, device_event.get('cmd', None)))
  64. return
  65. if cmd == '208':
  66. FluentedEngine().in_signal_udp(devNo = dev.devNo, ts = int(time.time()), signal = 0, cmd = cmd)
  67. # 准备发起设备离线通知
  68. # OfflineManager.registe_device_offline_notify(devNo = dev.devNo)
  69. # 上海城运消息推送
  70. try:
  71. if dev.is_registered:
  72. from apps.web.south_intf.shanghai_urban_data_collection_platform import \
  73. ShangHaiUrbanDataCollectionPlatformModel, ShangHaiUrbanDataCollectionPlatform
  74. dealer_info = ShangHaiUrbanDataCollectionPlatformModel.get_dealer_info(dev.ownerId)
  75. if dealer_info:
  76. ShangHaiUrbanDataCollectionPlatform(dev, **dealer_info).celery_push_heatbeat()
  77. except:
  78. pass
  79. if dev.is_registered and dev.owner.supports("cy4EventApi") and dev.driverCode == "100242" or \
  80. dev.is_registered and dev.owner.supports("cy4EventApi") and dev.driverCode == "100250":
  81. from apps.web.south_intf.platform import handle_and_notify_event_to_north_cy4_new
  82. dataDict = {
  83. "deviceCode":dev.logicalCode,
  84. "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
  85. "notifyType": "offline"
  86. }
  87. handle_and_notify_event_to_north_cy4_new(dev, dataDict)
  88. elif cmd in ['207', '200']:
  89. try:
  90. signal = int(device_event.get('signal', 10))
  91. except Exception:
  92. return
  93. FluentedEngine().in_signal_udp(devNo = dev.devNo, ts = int(time.time()), signal = signal, cmd = cmd)
  94. # 设备离线通知的取消
  95. # OfflineManager.discard_device_offline_notify(devNo = dev.devNo)
  96. if cmd == '200': # 此处记录下sim卡的激活时间,不想放到设备管理中做,放这里做方便修改
  97. logger.debug('dev<{}> 200 message.'.format(dev.devNo))
  98. # if (not dev.has_key('ownerId')) or (not dev['ownerId']):
  99. # try:
  100. # devObj = dev.my_obj
  101. # if not devObj.simActiveFirstTime:
  102. # devObj.simActiveFirstTime = datetime.datetime.now()
  103. # if device_event.has_key('driverCode'):
  104. # devObj.driverCode = device_event.get('driverCode')
  105. # devObj.save()
  106. # except Exception, e:
  107. # pass
  108. DeviceCmdStatics.record(dev.devNo, '200')
  109. if dev.is_registered and dev.owner.supports("cy4EventApi") and dev.driverCode == "100242" or\
  110. dev.is_registered and dev.owner.supports("cy4EventApi") and dev.driverCode == "100250":
  111. from apps.web.south_intf.platform import handle_and_notify_event_to_north_cy4_new
  112. dataDict = {
  113. "signal": signal,
  114. "deviceCode": dev.logicalCode,
  115. "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
  116. "notifyType": "online"}
  117. handle_and_notify_event_to_north_cy4_new(dev,dataDict)
  118. elif cmd == '207':
  119. if dev.is_registered and dev.owner.supports("cy4EventApi") and dev.driverCode == "100242" or\
  120. dev.is_registered and dev.owner.supports("cy4EventApi") and dev.driverCode == "100250":
  121. from apps.web.south_intf.platform import handle_and_notify_event_to_north_cy4_new
  122. dataDict = {
  123. "signal": signal,
  124. "deviceCode": dev.logicalCode,
  125. "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
  126. "notifyType": "heartbeat"
  127. }
  128. handle_and_notify_event_to_north_cy4_new(dev,dataDict)
  129. # 上海城运消息推送
  130. try:
  131. if dev.is_registered:
  132. from apps.web.south_intf.shanghai_urban_data_collection_platform import \
  133. ShangHaiUrbanDataCollectionPlatformModel, ShangHaiUrbanDataCollectionPlatform
  134. dealer_info = ShangHaiUrbanDataCollectionPlatformModel.get_dealer_info(dev.ownerId)
  135. if dealer_info:
  136. ShangHaiUrbanDataCollectionPlatform(dev, **dealer_info).celery_push_heatbeat()
  137. except:
  138. pass
  139. else:
  140. pass
  141. with MqttContext(host = host,
  142. port = port,
  143. user = settings.MQTT_USER,
  144. password = settings.MQTT_PSWD,
  145. worker = worker,
  146. platform_env = platform_env,
  147. queue = queue,
  148. debug = debug,
  149. cmds = {
  150. '207': {
  151. 'qos': Const.MQTT_QOS,
  152. 'checkRegistered': False
  153. },
  154. '200': {
  155. 'qos': Const.MQTT_QOS,
  156. 'checkRegistered': False
  157. },
  158. '208': {
  159. 'qos': Const.MQTT_QOS,
  160. 'checkRegistered': False
  161. },
  162. 'log': {
  163. 'qos': Const.MQTT_QOS,
  164. 'checkRegistered': False
  165. }
  166. },
  167. logger = logger,
  168. name = 'monitor',
  169. checkRegistered = False) as context:
  170. context.start()
  171. logger.debug('exit now.')