new_listener.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import getopt
  4. import logging
  5. import sys
  6. from typing import TYPE_CHECKING
  7. try:
  8. options, args = getopt.getopt(sys.argv[1:], 'l:q:e:p:d:m:b',
  9. ['queue=', 'env=', 'port=', 'debug='])
  10. except getopt.GetoptError as e:
  11. print(str(e))
  12. sys.exit()
  13. platform_env = 'testing'
  14. queue = False
  15. port = None
  16. debug = ''
  17. for name, value in options:
  18. if name in ('-e', '--env'):
  19. platform_env = value
  20. if name in ('-q', '--queue'):
  21. queue = value
  22. if name in ('-p', '--port'):
  23. port = int(value)
  24. if name in ('-d', '--debug'):
  25. debug = value
  26. if not port:
  27. print 'no port parameter.'
  28. sys.exit(1)
  29. import os
  30. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'configs.{env}'.format(env = platform_env))
  31. PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
  32. sys.path.insert(0, PROJECT_ROOT)
  33. from script.base import init_env
  34. init_env(interactive = False)
  35. from django.conf import settings
  36. from apps.web.core.sysparas import SysParas
  37. from script.eventer.handlers import UnknowCmdHandler
  38. from script.eventer.handlers.shake_hand import ShakeHandHandler
  39. from script.eventer.handlers.offline_coins import OfflineCoinsHandler
  40. from script.eventer.mqtt_context import MqttContext
  41. from apps.web.constant import Const
  42. from apps.web.device.models import DeviceDict
  43. if TYPE_CHECKING:
  44. pass
  45. logger = logging.getLogger(__name__)
  46. host = SysParas.get_private_ip(settings.MQTT_HOSTNAME)
  47. def worker(cmd, dev, payload):
  48. # type: (str, DeviceDict, str)->None
  49. try:
  50. if cmd == '507':
  51. ShakeHandHandler(cmd, dev, payload).do()
  52. # 添加特性用来过滤大量507, 推送端口的时候需要注意配置特性
  53. if dev.is_registered and dev.owner.supports("supportDelixi"):
  54. from apps.web.south_intf.delixi import DelixiNorther
  55. DelixiNorther.send_port_status(dev)
  56. # 上海城运消息推送
  57. try:
  58. if dev.is_registered:
  59. from apps.web.south_intf.shanghai_urban_data_collection_platform import \
  60. ShangHaiUrbanDataCollectionPlatformModel, ShangHaiUrbanDataCollectionPlatform
  61. dealer_info = ShangHaiUrbanDataCollectionPlatformModel.get_dealer_info(dev.ownerId)
  62. if dealer_info:
  63. ShangHaiUrbanDataCollectionPlatform(dev, **dealer_info).celery_push_heatbeat()
  64. except:
  65. pass
  66. elif cmd == '505' or cmd == '205':
  67. OfflineCoinsHandler(cmd, dev, payload).do()
  68. else:
  69. UnknowCmdHandler(cmd, dev, payload).do()
  70. except Exception as e:
  71. logger.exception(e)
  72. with MqttContext(host = host,
  73. port = port,
  74. user = settings.MQTT_USER,
  75. password = settings.MQTT_PSWD,
  76. worker = worker,
  77. platform_env = platform_env,
  78. queue = queue,
  79. debug = debug,
  80. cmds = {
  81. '205': {
  82. 'qos': Const.MQTT_QOS,
  83. 'checkRegistered': True
  84. },
  85. '505': {
  86. 'qos': Const.MQTT_QOS,
  87. 'checkRegistered': True
  88. },
  89. '507': {
  90. 'qos': Const.MQTT_QOS,
  91. 'checkRegistered': False
  92. }
  93. },
  94. logger = logger,
  95. name = 'new_listener') as context:
  96. context.start()
  97. logger.debug('exit now.')