# new_listener.py
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import getopt
  4. import logging
  5. import sys
  6. from typing import TYPE_CHECKING
  7. try:
  8. options, args = getopt.getopt(sys.argv[1:], 'l:q:e:p:d:m:b',
  9. ['queue=', 'env=', 'port=', 'debug='])
  10. except getopt.GetoptError as e:
  11. print(str(e))
  12. sys.exit()
  13. platform_env = 'testing'
  14. queue = False
  15. port = None
  16. debug = ''
  17. for name, value in options:
  18. if name in ('-e', '--env'):
  19. platform_env = value
  20. if name in ('-q', '--queue'):
  21. queue = value
  22. if name in ('-p', '--port'):
  23. port = int(value)
  24. if name in ('-d', '--debug'):
  25. debug = value
  26. if not port:
  27. print 'no port parameter.'
  28. sys.exit(1)
  29. import os
  30. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'configs.{env}'.format(env = platform_env))
  31. PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
  32. sys.path.insert(0, PROJECT_ROOT)
  33. from script.base import init_env
  34. init_env(interactive = False)
  35. from django.conf import settings
  36. from apps.web.core.sysparas import SysParas
  37. from script.eventer.handlers import UnknowCmdHandler
  38. from script.eventer.handlers.shake_hand import ShakeHandHandler
  39. from script.eventer.handlers.offline_coins import OfflineCoinsHandler
  40. from script.eventer.mqtt_context import MqttContext
  41. from apps.web.constant import Const
  42. from apps.web.device.models import DeviceDict
  43. if TYPE_CHECKING:
  44. pass
  45. logger = logging.getLogger(__name__)
  46. host = SysParas.get_private_ip(settings.MQTT_HOSTNAME)
  47. def worker(cmd, dev, payload):
  48. # type: (str, DeviceDict, str)->None
  49. try:
  50. if cmd == '507':
  51. ShakeHandHandler(cmd, dev, payload).do()
  52. # 添加特性用来过滤大量507, 推送端口的时候需要注意配置特性
  53. if dev.is_registered and dev.owner.supports("supportDelixi"):
  54. from apps.web.south_intf.delixi import DelixiNorther
  55. DelixiNorther.send_port_status(dev)
  56. if dev.is_registered and dev.owner.supports("supportFengTu"):
  57. from apps.web.api.ft_north.utils import devHeartbeatReport
  58. devHeartbeatReport(dev.devNo)
  59. if dev.is_registered and dev.owner.supports("supportBeiJingFengTai"):
  60. from apps.web.south_intf.bj_north.api import post_charging_meta_info_list
  61. post_charging_meta_info_list(dev.devNo)
  62. # 上海城运消息推送
  63. try:
  64. if dev.is_registered:
  65. from apps.web.south_intf.shanghai_urban_data_collection_platform import \
  66. ShangHaiUrbanDataCollectionPlatformModel, ShangHaiUrbanDataCollectionPlatform
  67. dealer_info = ShangHaiUrbanDataCollectionPlatformModel.get_dealer_info(dev.ownerId)
  68. if dealer_info:
  69. ShangHaiUrbanDataCollectionPlatform(dev, **dealer_info).celery_push_heatbeat()
  70. except:
  71. pass
  72. elif cmd == '505' or cmd == '205':
  73. OfflineCoinsHandler(cmd, dev, payload).do()
  74. else:
  75. UnknowCmdHandler(cmd, dev, payload).do()
  76. except Exception as e:
  77. logger.exception(e)
  78. with MqttContext(host = host,
  79. port = port,
  80. user = settings.MQTT_USER,
  81. password = settings.MQTT_PSWD,
  82. worker = worker,
  83. platform_env = platform_env,
  84. queue = queue,
  85. debug = debug,
  86. cmds = {
  87. '205': {
  88. 'qos': Const.MQTT_QOS,
  89. 'checkRegistered': True
  90. },
  91. '505': {
  92. 'qos': Const.MQTT_QOS,
  93. 'checkRegistered': True
  94. },
  95. '507': {
  96. 'qos': Const.MQTT_QOS,
  97. 'checkRegistered': False
  98. }
  99. },
  100. logger = logger,
  101. name = 'new_listener') as context:
  102. context.start()
  103. logger.debug('exit now.')