123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- from __future__ import absolute_import
- import sys
- import logging
- import numbers
- try:
- from urllib.parse import urlparse, urljoin, quote, unquote
- except ImportError:
- from urlparse import urlparse, urljoin
- from urllib import quote, unquote
- try:
- import requests
- logging.getLogger("requests").setLevel(logging.WARNING)
- except ImportError:
- requests = None
- try:
- import redis
- except ImportError:
- redis = None
- class BrokerBase(object):
- def __init__(self, broker_url, *args, **kwargs):
- purl = urlparse(broker_url)
- self.host = purl.hostname
- self.port = purl.port
- self.vhost = purl.path[1:]
- username = purl.username
- password = purl.password
- self.username = unquote(username) if username else username
- self.password = unquote(password) if password else password
- def queues(self, names):
- raise NotImplementedError
- class RabbitMQ(BrokerBase):
- def __init__(self, broker_url, broker_api_url):
- super(RabbitMQ, self).__init__(broker_url)
- self.host = self.host or 'localhost'
- self.port = int(self.port or 5672)
- self.vhost = quote(self.vhost, '') or '/'
- self.username = self.username or 'guest'
- self.password = self.password or 'guest'
- self._broker_api_url = broker_api_url
- if not requests:
- raise ImportError("'python-requests' library is required")
- def queues(self, names):
- if not self._broker_api_url.endswith('/'):
- self._broker_api_url += '/'
- url = urljoin(self._broker_api_url, 'queues/' + self.vhost)
- api_url = urlparse(self._broker_api_url)
- username = unquote(api_url.username or '') or self.username
- password = unquote(api_url.password or '') or self.password
- auth = requests.auth.HTTPBasicAuth(username, password)
- r = requests.get(url, auth=auth)
- if r.status_code == 200:
- try:
- info = r.json()
- except TypeError:
- info = r.json
- return [x for x in info if x['name'] in names]
- else:
- r.raise_for_status()
- class Redis(BrokerBase):
- def __init__(self, broker_url, *args, **kwargs):
- super(Redis, self).__init__(broker_url)
- self.host = self.host or 'localhost'
- self.port = self.port or 6379
- self.vhost = self._prepare_virtual_host(self.vhost)
- if not redis:
- raise ImportError('redis library is required')
- self._redis = redis.Redis(host=self.host, port=self.port,
- db=self.vhost, password=self.password)
- def queues(self, names):
- return [dict(name=x, messages=self._redis.llen(x)) for x in names]
- def _prepare_virtual_host(self, vhost):
- if not isinstance(vhost, numbers.Integral):
- if not vhost or vhost == '/':
- vhost = 0
- elif vhost.startswith('/'):
- vhost = vhost[1:]
- try:
- vhost = int(vhost)
- except ValueError:
- raise ValueError(
- 'Database is int between 0 and limit - 1, not {0}'.format(
- vhost,
- ))
- return vhost
- class Broker(object):
- def __new__(cls, broker_url, *args, **kwargs):
- scheme = urlparse(broker_url).scheme
- if scheme == 'amqp':
- return RabbitMQ(broker_url, *args, **kwargs)
- elif scheme == 'redis':
- return Redis(broker_url, *args, **kwargs)
- else:
- raise NotImplementedError
- if __name__ == "__main__":
- broker_url = sys.argv[1] if len(sys.argv) > 1 else 'amqp://'
- queue_name = sys.argv[2] if len(sys.argv) > 2 else 'celery'
- if len(sys.argv) > 3:
- broker_api_url = sys.argv[3]
- else:
- broker_api_url = 'http://guest:guest@localhost:55672/api/'
- broker = Broker(broker_url, broker_api_url=broker_api_url)
- print(broker.queues([queue_name]))
|