common.py 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. # -*- coding: utf-8 -*-
  2. #!/usr/bin/env python
  3. import os
  4. from dotenv import load_dotenv
  5. from parse import parse
  6. load_dotenv('.env.dispatch')
  7. from apps.web.core.exceptions import FormatError
  8. QUERY_PREFIX = 'query'
  9. RESULT_PREFIX = 'result'
  10. HANDLER_MQTT_HOST = os.environ.get('MQTT_HOSTNAME', 'test.mosquitto.org')
  11. HANDLER_MQTT_PORT = int(os.environ.get('MQTT_PORT', 8080))
  12. TOPIC_FMT = '/{type}/{prefix}/{cmdNo}/{devNo}'
  13. def get_result_topic(prefix, cmdNo, devNo):
  14. return TOPIC_FMT.format(type=RESULT_PREFIX, prefix=prefix, cmdNo=cmdNo, devNo=devNo)
  15. def get_query_topic(prefix, cmdNo, devNo):
  16. return TOPIC_FMT.format(type=QUERY_PREFIX, prefix=prefix, cmdNo=cmdNo, devNo=devNo)
  17. ALL_QUERY_TOPICS = '/{prefix}/#'.format(prefix=QUERY_PREFIX)
  18. def parse_topic(string, format_=TOPIC_FMT):
  19. return parse(format=format_, string=string).named
  20. class BrokerUrl(object):
  21. def __init__(self, url):
  22. self.url = url
  23. try:
  24. self.host, self.port = url.split(':')
  25. self.port = int(self.port)
  26. except (ValueError, AttributeError):
  27. raise FormatError('url has to be a string and format has to be host:port')