#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Device manager daemon.

Connects to the MQTT broker at (MQTT_HOSTNAME, MQTT_PORT), subscribes to every
topic in SERVER_TOPICS and hands each decoded JSON message to
``ProtocolHandler.handle``.
"""

import os
import sys
import json
import time
import logging
import datetime
from collections import namedtuple

import daiquiri
import django
from gevent.pool import Pool

#: current_dir - 2
PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
# Must happen before the project-local imports below so they can be resolved.
sys.path.insert(0, PROJECT_ROOT)

from common import *
from apps.web.core.mqtt_client import MqttClient
from apps.web.device.utils import device_online_cache_key

# The cache framework is imported only after django.setup() has run.
django.setup()

from django.core.cache import caches

devmgr_cache = caches['devmgr']

daiquiri.setup(
    level=logging.DEBUG,
    outputs=(
        daiquiri.output.Stream(sys.stdout),
        daiquiri.output.File('device_manager-errors.log', level=logging.ERROR),
        daiquiri.output.TimedRotatingFile('device_manager-everything.log',
                                          level=logging.DEBUG,
                                          interval=datetime.timedelta(days=1)),
    )
)

logger = logging.getLogger(__name__)


# We'll use exceptions to notify the connection-handling loop of problems.
class CommandError(Exception):
    """Raised to signal a command-level problem (not raised in this module)."""
    pass


class Disconnect(Exception):
    """Raised to signal a disconnect (not raised in this module)."""
    pass


Error = namedtuple('Error', ('message',))


class ProtocolHandler(BaseProtocolHandler):
    """Per-message-code handlers for incoming device messages.

    NOTE(review): BaseProtocolHandler presumably comes from ``common`` and is
    expected to route ``handle(message)`` to the ``handle_<code>`` methods --
    confirm against its definition. Only 201 and 207 do any work here.
    """

    def handle_200(self, payload):
        # type: (dict)->None
        pass

    def handle_201(self, payload):
        # type: (dict)->None
        logger.debug('Got 201 %s', payload)

    def handle_202(self, payload):
        # type: (dict)->None
        pass

    def handle_203(self, payload):
        # type: (dict)->None
        pass

    def handle_204(self, payload):
        # type: (dict)->None
        pass

    def handle_205(self, payload):
        # type: (dict)->None
        pass

    def handle_206(self, payload):
        # type: (dict)->None
        pass

    def handle_207(self, payload):
        # type: (dict)->None
        """Record the reporting device as online in the ``devmgr`` cache.

        Reads ``IMEI``, ``status`` and ``signal`` from *payload* (a missing
        key raises KeyError) and stores them as a JSON string, together with
        the current time in milliseconds, under the device's cache key.
        """
        #: maintain device's cache online status
        imei = payload['IMEI']
        info = {
            'online': DEV_ONLINE,
            'status': payload['status'],
            'signal': payload['signal'],
            'updateTime': time.time() * 1000
        }
        devmgr_cache.set(device_online_cache_key(imei), json.dumps(info))

    def handle_208(self, request):
        # type: (dict)->None
        pass


#: callbacks
def on_connect(client, userdata, flags, rc):
    # type: (MqttClient, dict, dict, int)->None
    """Log the connection result and (re)subscribe to the server topics.

    Subscribing here rather than once at start-up means the subscriptions are
    renewed every time the client reconnects to the broker.
    """
    logger.info('connected to (%s, %d)', MQTT_HOSTNAME, MQTT_PORT)
    logger.info('client: %s', client)
    logger.info('userdata: %s', userdata)
    logger.info('flags: %s', flags)
    logger.info('rc: %s', rc)

    # A non-zero result code means the broker refused the connection, so
    # there is no session to subscribe on.
    if rc != 0:
        logger.error('broker refused connection (rc=%s); not subscribing', rc)
        return

    for topic in SERVER_TOPICS:
        client.subscribe(topic=topic, qos=MQTT_QOS)


def on_message(client, userdata, message):
    """Decode a JSON message payload and dispatch it to ProtocolHandler.

    Payloads that are not valid UTF-8 encoded JSON are logged and dropped so
    that one malformed message cannot take down the network loop.
    """
    try:
        payload = json.loads(message.payload.decode('utf-8'))
    except ValueError:
        # Covers both UnicodeDecodeError and JSON parse errors.
        logger.exception('dropping undecodable message payload: %r', message.payload)
        return

    logger.info('client: %s', client)
    logger.info('userdata: %s', userdata)
    logger.info('message: %s', payload)

    handler = ProtocolHandler()
    handler.handle(payload)


class Server(object):
    """Owns the MQTT client and runs its blocking network loop."""

    def __init__(self, max_clients=64):
        # _pool and _kv are not used anywhere in this module; kept because
        # other code may rely on these attributes.
        self._pool = Pool(max_clients)
        self._kv = {}
        self._client = MqttClient(client_id='device_manager')  # type: MqttClient

    def run(self):
        """Install the callbacks, connect to the broker and loop forever.

        Topic subscription is performed by ``on_connect`` once the broker has
        accepted the connection.
        """
        self._client.on_connect = on_connect
        self._client.on_message = on_message

        self._client.connect(MQTT_HOSTNAME, MQTT_PORT)
        self._client.loop_forever()


if __name__ == '__main__':
    server = Server()
    server.run()