# -*- coding: utf-8 -*- #!/usr/bin/env python import os from dotenv import load_dotenv from parse import parse load_dotenv('.env.dispatch') from apps.web.core.exceptions import FormatError QUERY_PREFIX = 'query' RESULT_PREFIX = 'result' HANDLER_MQTT_HOST = os.environ.get('MQTT_HOSTNAME', 'test.mosquitto.org') HANDLER_MQTT_PORT = int(os.environ.get('MQTT_PORT', 8080)) TOPIC_FMT = '/{type}/{prefix}/{cmdNo}/{devNo}' def get_result_topic(prefix, cmdNo, devNo): return TOPIC_FMT.format(type=RESULT_PREFIX, prefix=prefix, cmdNo=cmdNo, devNo=devNo) def get_query_topic(prefix, cmdNo, devNo): return TOPIC_FMT.format(type=QUERY_PREFIX, prefix=prefix, cmdNo=cmdNo, devNo=devNo) ALL_QUERY_TOPICS = '/{prefix}/#'.format(prefix=QUERY_PREFIX) def parse_topic(string, format_=TOPIC_FMT): return parse(format=format_, string=string).named class BrokerUrl(object): def __init__(self, url): self.url = url try: self.host, self.port = url.split(':') self.port = int(self.port) except (ValueError, AttributeError): raise FormatError('url has to be a string and format has to be host:port')