123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- # -*- coding: utf-8 -*-
- """
- celery.backends.redis
- ~~~~~~~~~~~~~~~~~~~~~
- Redis result store backend.
- """
- from __future__ import absolute_import
- from kombu.utils import cached_property
- from kombu.utils.url import _parse_url
- from celery.exceptions import ImproperlyConfigured
- from .base import KeyValueStoreBackend
- try:
- import redis
- from redis.exceptions import ConnectionError
- except ImportError: # pragma: no cover
- redis = None # noqa
- ConnectionError = None # noqa
- __all__ = ['RedisBackend']
- REDIS_MISSING = """\
- You need to install the redis library in order to use \
- the Redis result store backend."""
- class RedisBackend(KeyValueStoreBackend):
- """Redis task result store."""
- #: redis-py client module.
- redis = redis
- #: default Redis server hostname (`localhost`).
- host = 'localhost'
- #: default Redis server port (6379)
- port = 6379
- #: default Redis db number (0)
- db = 0
- #: default Redis password (:const:`None`)
- password = None
- #: Maximium number of connections in the pool.
- max_connections = None
- supports_autoexpire = True
- supports_native_join = True
- implements_incr = True
- def __init__(self, host=None, port=None, db=None, password=None,
- expires=None, max_connections=None, url=None, **kwargs):
- super(RedisBackend, self).__init__(**kwargs)
- conf = self.app.conf
- if self.redis is None:
- raise ImproperlyConfigured(REDIS_MISSING)
- # For compatibility with the old REDIS_* configuration keys.
- def _get(key):
- for prefix in 'CELERY_REDIS_{0}', 'REDIS_{0}':
- try:
- return conf[prefix.format(key)]
- except KeyError:
- pass
- if host and '://' in host:
- url, host = host, None
- self.url = url
- uhost = uport = upass = udb = None
- if url:
- _, uhost, uport, _, upass, udb, _ = _parse_url(url)
- udb = udb.strip('/') if udb else 0
- self.host = uhost or host or _get('HOST') or self.host
- self.port = int(uport or port or _get('PORT') or self.port)
- self.db = udb or db or _get('DB') or self.db
- self.password = upass or password or _get('PASSWORD') or self.password
- self.expires = self.prepare_expires(expires, type=int)
- self.max_connections = (max_connections
- or _get('MAX_CONNECTIONS')
- or self.max_connections)
- def get(self, key):
- return self.client.get(key)
- def mget(self, keys):
- return self.client.mget(keys)
- def set(self, key, value):
- client = self.client
- if self.expires is not None:
- client.setex(key, value, self.expires)
- else:
- client.set(key, value)
- client.publish(key, value)
- def delete(self, key):
- self.client.delete(key)
- def incr(self, key):
- return self.client.incr(key)
- def expire(self, key, value):
- return self.client.expire(key, value)
- @cached_property
- def client(self):
- pool = self.redis.ConnectionPool(host=self.host, port=self.port,
- db=self.db, password=self.password,
- max_connections=self.max_connections)
- return self.redis.Redis(connection_pool=pool)
- def __reduce__(self, args=(), kwargs={}):
- kwargs.update(
- dict(host=self.host,
- port=self.port,
- db=self.db,
- password=self.password,
- expires=self.expires,
- max_connections=self.max_connections))
- return super(RedisBackend, self).__reduce__(args, kwargs)
|