# broker.py (3.9 KB)
  1. from __future__ import absolute_import
  2. import sys
  3. import logging
  4. import numbers
  5. try:
  6. from urllib.parse import urlparse, urljoin, quote, unquote
  7. except ImportError:
  8. from urlparse import urlparse, urljoin
  9. from urllib import quote, unquote
  10. try:
  11. import requests
  12. logging.getLogger("requests").setLevel(logging.WARNING)
  13. except ImportError:
  14. requests = None
  15. try:
  16. import redis
  17. except ImportError:
  18. redis = None
  19. class BrokerBase(object):
  20. def __init__(self, broker_url, *args, **kwargs):
  21. purl = urlparse(broker_url)
  22. self.host = purl.hostname
  23. self.port = purl.port
  24. self.vhost = purl.path[1:]
  25. username = purl.username
  26. password = purl.password
  27. self.username = unquote(username) if username else username
  28. self.password = unquote(password) if password else password
  29. def queues(self, names):
  30. raise NotImplementedError
  31. class RabbitMQ(BrokerBase):
  32. def __init__(self, broker_url, broker_api_url):
  33. super(RabbitMQ, self).__init__(broker_url)
  34. self.host = self.host or 'localhost'
  35. self.port = int(self.port or 5672)
  36. self.vhost = quote(self.vhost, '') or '/'
  37. self.username = self.username or 'guest'
  38. self.password = self.password or 'guest'
  39. self._broker_api_url = broker_api_url
  40. if not requests:
  41. raise ImportError("'python-requests' library is required")
  42. def queues(self, names):
  43. if not self._broker_api_url.endswith('/'):
  44. self._broker_api_url += '/'
  45. url = urljoin(self._broker_api_url, 'queues/' + self.vhost)
  46. api_url = urlparse(self._broker_api_url)
  47. username = unquote(api_url.username or '') or self.username
  48. password = unquote(api_url.password or '') or self.password
  49. auth = requests.auth.HTTPBasicAuth(username, password)
  50. r = requests.get(url, auth=auth)
  51. if r.status_code == 200:
  52. try:
  53. info = r.json()
  54. except TypeError:
  55. info = r.json
  56. return [x for x in info if x['name'] in names]
  57. else:
  58. r.raise_for_status()
  59. class Redis(BrokerBase):
  60. def __init__(self, broker_url, *args, **kwargs):
  61. super(Redis, self).__init__(broker_url)
  62. self.host = self.host or 'localhost'
  63. self.port = self.port or 6379
  64. self.vhost = self._prepare_virtual_host(self.vhost)
  65. if not redis:
  66. raise ImportError('redis library is required')
  67. self._redis = redis.Redis(host=self.host, port=self.port,
  68. db=self.vhost, password=self.password)
  69. def queues(self, names):
  70. return [dict(name=x, messages=self._redis.llen(x)) for x in names]
  71. def _prepare_virtual_host(self, vhost):
  72. if not isinstance(vhost, numbers.Integral):
  73. if not vhost or vhost == '/':
  74. vhost = 0
  75. elif vhost.startswith('/'):
  76. vhost = vhost[1:]
  77. try:
  78. vhost = int(vhost)
  79. except ValueError:
  80. raise ValueError(
  81. 'Database is int between 0 and limit - 1, not {0}'.format(
  82. vhost,
  83. ))
  84. return vhost
  85. class Broker(object):
  86. def __new__(cls, broker_url, *args, **kwargs):
  87. scheme = urlparse(broker_url).scheme
  88. if scheme == 'amqp':
  89. return RabbitMQ(broker_url, *args, **kwargs)
  90. elif scheme == 'redis':
  91. return Redis(broker_url, *args, **kwargs)
  92. else:
  93. raise NotImplementedError
  94. if __name__ == "__main__":
  95. broker_url = sys.argv[1] if len(sys.argv) > 1 else 'amqp://'
  96. queue_name = sys.argv[2] if len(sys.argv) > 2 else 'celery'
  97. if len(sys.argv) > 3:
  98. broker_api_url = sys.argv[3]
  99. else:
  100. broker_api_url = 'http://guest:guest@localhost:55672/api/'
  101. broker = Broker(broker_url, broker_api_url=broker_api_url)
  102. print(broker.queues([queue_name]))