monitor_co.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. from gevent import monkey
  4. monkey.patch_all()
  5. import datetime
  6. import getopt
  7. import logging
  8. import sys
  9. import time
  10. from typing import TYPE_CHECKING
  11. try:
  12. options, args = getopt.getopt(sys.argv[1:], 'l:q:e:p:d:m:b',
  13. ['queue=', 'env=', 'port=', 'debug='])
  14. except getopt.GetoptError as e:
  15. print(str(e))
  16. sys.exit()
  17. platform_env = 'testing'
  18. queue = False
  19. port = None
  20. debug = ''
  21. for name, value in options:
  22. if name in ('-e', '--env'):
  23. platform_env = value
  24. if name in ('-q', '--queue'):
  25. queue = value
  26. if name in ('-p', '--port'):
  27. port = int(value)
  28. if name in ('-d', '--debug'):
  29. debug = value
  30. if not port:
  31. print 'no port parameter.'
  32. sys.exit(1)
  33. import os
  34. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'configs.{env}'.format(env = platform_env))
  35. PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
  36. sys.path.insert(0, PROJECT_ROOT)
  37. from script.base import init_env
  38. init_env(interactive = False)
  39. import simplejson as json
  40. from django.conf import settings
  41. from apps.web.core.sysparas import SysParas
  42. from script.eventer.mqtt_context_co import MqttContext, message_received
  43. from apps.web.device.timescale import OfflineManager, FluentedEngine
  44. from apps.web.device.models import Device, DeviceCmdStatics
  45. from apps.web.constant import Const
  46. if TYPE_CHECKING:
  47. from apps.web.device.models import DeviceDict
  48. logger = logging.getLogger(__name__)
  49. host = SysParas.get_private_ip(settings.MQTT_HOSTNAME)
  50. @message_received.connect_via('/message')
  51. def worker(sender, cmd, dev, payload):
  52. # type: (object, str, DeviceDict, str)->None
  53. if cmd == 'log':
  54. logger.debug('log message[{}]: {}'.format(dev.devNo, payload))
  55. else:
  56. try:
  57. device_event = json.loads(bytes.decode(payload))
  58. except ValueError:
  59. logger.error(
  60. "can not loads message payload, cmd = <{}>, dev= <{}>, payload = <{}>".format(cmd, dev.devNo, payload))
  61. return
  62. if dev.devNo != device_event.get('IMEI', None):
  63. logger.warning('invalid msg for not same devNo({} != {})'.format(dev.devNo, device_event.get('IMEI')))
  64. return
  65. if cmd != str(device_event.get('cmd', '')):
  66. logger.warning('invalid msg for not same cmd({} != {})'.format(cmd, device_event.get('cmd', None)))
  67. return
  68. if cmd == '208':
  69. FluentedEngine().in_signal_udp(devNo = dev.devNo, ts = int(time.time()), signal = 0, cmd = cmd)
  70. # 准备发起设备离线通知
  71. OfflineManager.registe_device_offline_notify(devNo = dev.devNo)
  72. elif cmd in ['207', '200']:
  73. try:
  74. signal = int(device_event.get('signal', 10))
  75. except Exception:
  76. return
  77. FluentedEngine().in_signal_udp(devNo = dev.devNo, ts = int(time.time()), signal = signal, cmd = cmd)
  78. # 设备离线通知的取消
  79. OfflineManager.discard_device_offline_notify(devNo = dev.devNo)
  80. if cmd == '200': # 此处记录下sim卡的激活时间,不想放到设备管理中做,放这里做方便修改
  81. logger.debug('dev<{}> 200 message.'.format(dev.devNo))
  82. if (not dev.has_key('ownerId')) or (not dev['ownerId']):
  83. try:
  84. devObj = dev.my_obj
  85. if not devObj.simActiveFirstTime:
  86. devObj.simActiveFirstTime = datetime.datetime.now()
  87. devObj.save()
  88. except Exception, e:
  89. pass
  90. DeviceCmdStatics.record(dev.devNo, '200')
  91. else:
  92. pass
  93. with MqttContext(host = host,
  94. port = port,
  95. user = settings.MQTT_USER,
  96. password = settings.MQTT_PSWD,
  97. platform_env = platform_env,
  98. queue = queue,
  99. debug = debug,
  100. cmds = {
  101. '207': {
  102. 'qos': Const.MQTT_QOS,
  103. 'checkRegistered': False
  104. },
  105. '200': {
  106. 'qos': Const.MQTT_QOS,
  107. 'checkRegistered': False
  108. },
  109. '208': {
  110. 'qos': Const.MQTT_QOS,
  111. 'checkRegistered': False
  112. },
  113. 'log': {
  114. 'qos': Const.MQTT_QOS,
  115. 'checkRegistered': False
  116. }
  117. },
  118. logger = logger,
  119. name = 'monitor',
  120. checkRegistered = False) as context:
  121. context.start()
  122. logger.debug('exit now.')