device_manager.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. # -*- coding: utf-8 -*-
  2. #!/usr/bin/env python
  3. import os
  4. import sys
  5. import json
  6. import time
  7. import logging
  8. import datetime
  9. from collections import namedtuple
  10. import daiquiri
  11. import django
  12. from gevent.pool import Pool
  13. #: current_dir - 2
  14. PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
  15. sys.path.insert(0, PROJECT_ROOT)
  16. from common import *
  17. from apps.web.core.mqtt_client import MqttClient
  18. from apps.web.device.utils import device_online_cache_key
  19. django.setup()
  20. from django.core.cache import caches
  21. devmgr_cache = caches['devmgr']
  22. daiquiri.setup(
  23. level=logging.DEBUG,
  24. outputs=(
  25. daiquiri.output.Stream(sys.stdout),
  26. daiquiri.output.File('device_manager-errors.log', level=logging.ERROR),
  27. daiquiri.output.TimedRotatingFile('device_manager-everything.log',
  28. level=logging.DEBUG,
  29. interval=datetime.timedelta(days=1))
  30. )
  31. )
  32. logger = logging.getLogger(__name__)
  33. # We'll use exceptions to notify the connection-handling loop of problems.
  34. class CommandError(Exception): pass
  35. class Disconnect(Exception): pass
  36. Error = namedtuple('Error', ('message',))
  37. class ProtocolHandler(BaseProtocolHandler):
  38. def handle_200(self, payload):
  39. # type: (dict)->None
  40. pass
  41. def handle_201(self, payload):
  42. # type: (dict)->None
  43. logger.debug('Got 201 %s', payload)
  44. def handle_202(self, payload):
  45. # type: (dict)->None
  46. pass
  47. def handle_203(self, payload):
  48. # type: (dict)->None
  49. pass
  50. def handle_204(self, payload):
  51. # type: (dict)->None
  52. pass
  53. def handle_205(self, payload):
  54. # type: (dict)->None
  55. pass
  56. def handle_206(self, payload):
  57. # type: (dict)->None
  58. pass
  59. def handle_207(self, payload):
  60. # type: (dict)->None
  61. #: maintain device's cache online status
  62. imei = payload['IMEI']
  63. info = {
  64. 'online': DEV_ONLINE,
  65. 'status': payload['status'],
  66. 'signal': payload['signal'],
  67. 'updateTime': time.time() * 1000
  68. }
  69. devmgr_cache.set(device_online_cache_key(imei), json.dumps(info))
  70. def handle_208(self, request):
  71. # type: (dict)->None
  72. pass
  73. #: callbacks
  74. def on_connect(client, userdata, flags, rc):
  75. # type: (MqttClient, dict, int, dict)->None
  76. logger.info('connected to (%s, %d)' % (MQTT_HOSTNAME, MQTT_PORT))
  77. logger.info('client: {}'.format(client))
  78. logger.info('userdata: {}'.format(userdata))
  79. logger.info('flags: {}'.format(flags))
  80. logger.info('rc: {}'.format(rc))
  81. def on_message(client, userdata, message):
  82. message = json.loads(bytes.decode(message.payload))
  83. logger.info('client: {}'.format(client))
  84. logger.info('userdata: {}'.format(userdata))
  85. logger.info('message: {}'.format(message))
  86. handler = ProtocolHandler()
  87. handler.handle(message)
  88. class Server(object):
  89. def __init__(self, max_clients=64):
  90. self._pool = Pool(max_clients)
  91. self._kv = {}
  92. self._client = MqttClient(client_id='device_manager') # type: MqttClient
  93. def run(self):
  94. self._client.on_connect = on_connect
  95. self._client.on_message = on_message
  96. self._client.connect(MQTT_HOSTNAME, MQTT_PORT)
  97. for topic in SERVER_TOPICS:
  98. self._client.subscribe(topic=topic, qos=MQTT_QOS)
  99. self._client.loop_forever()
  100. if __name__ == '__main__':
  101. server = Server()
  102. server.run()