1234567891011121314151617181920212223242526272829303132333435363738394041424344 |
- # -*- coding: utf-8 -*-
- #!/usr/bin/env python
- import os
- from dotenv import load_dotenv
- from parse import parse
- load_dotenv('.env.dispatch')
- from apps.web.core.exceptions import FormatError
# Values substituted into the ``{type}`` field of TOPIC_FMT.
QUERY_PREFIX = 'query'
RESULT_PREFIX = 'result'

# Broker endpoint, read from the environment with fallbacks when unset.
HANDLER_MQTT_HOST = os.getenv('MQTT_HOSTNAME', 'test.mosquitto.org')
HANDLER_MQTT_PORT = int(os.getenv('MQTT_PORT', 8080))

# Template used both to build topics and to parse them back into fields.
TOPIC_FMT = '/{type}/{prefix}/{cmdNo}/{devNo}'
def get_result_topic(prefix, cmdNo, devNo):
    """Build the result topic for the given prefix, command and device.

    :param prefix: value for the ``{prefix}`` field of ``TOPIC_FMT``.
    :param cmdNo: value for the ``{cmdNo}`` field.
    :param devNo: value for the ``{devNo}`` field.
    :returns: ``TOPIC_FMT`` rendered with ``RESULT_PREFIX`` as its type.
    """
    fields = {
        'type': RESULT_PREFIX,
        'prefix': prefix,
        'cmdNo': cmdNo,
        'devNo': devNo,
    }
    return TOPIC_FMT.format(**fields)
def get_query_topic(prefix, cmdNo, devNo):
    """Build the query topic for the given prefix, command and device.

    :param prefix: value for the ``{prefix}`` field of ``TOPIC_FMT``.
    :param cmdNo: value for the ``{cmdNo}`` field.
    :param devNo: value for the ``{devNo}`` field.
    :returns: ``TOPIC_FMT`` rendered with ``QUERY_PREFIX`` as its type.
    """
    fields = {
        'type': QUERY_PREFIX,
        'prefix': prefix,
        'cmdNo': cmdNo,
        'devNo': devNo,
    }
    return TOPIC_FMT.format(**fields)
# Topic filter ending in the ``#`` wildcard, rooted at the query prefix.
ALL_QUERY_TOPICS = '/{prefix}/#'.format(
    prefix=QUERY_PREFIX
)
def parse_topic(string, format_=TOPIC_FMT):
    """Extract the named fields from a topic string.

    :param string: topic to decompose.
    :param format_: ``parse``-style template to match against; defaults to
        ``TOPIC_FMT`` (fields ``type``, ``prefix``, ``cmdNo``, ``devNo``).
    :returns: the named fields captured by the template.
    :raises FormatError: if ``string`` does not match ``format_``.
    """
    matched = parse(format=format_, string=string)
    if matched is None:
        # parse() reports "no match" by returning None; fail with a clear
        # error instead of an AttributeError on ``None.named``.
        raise FormatError(
            'topic {topic!r} does not match format {fmt!r}'.format(
                topic=string, fmt=format_
            )
        )
    return matched.named
class BrokerUrl(object):
    """Broker address supplied as ``host:port`` and split into its parts.

    Attributes set on success: ``url`` (the original string), ``host``
    (text before the colon) and ``port`` (integer after the colon).
    """

    def __init__(self, url):
        """Parse *url* into host and port.

        :param url: string of the form ``host:port``.
        :raises FormatError: if *url* is not a string with exactly one
            colon, or the port part is not an integer.
        """
        self.url = url
        try:
            # Unpacking fails with ValueError unless there is exactly one
            # colon; non-string input fails with AttributeError on split.
            host_part, port_part = url.split(':')
            port_number = int(port_part)
        except (ValueError, AttributeError):
            raise FormatError('url has to be a string and format has to be host:port')
        self.host = host_part
        self.port = port_number
|