redis.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.backends.redis
  4. ~~~~~~~~~~~~~~~~~~~~~
  5. Redis result store backend.
  6. """
  7. from __future__ import absolute_import
  8. from kombu.utils import cached_property
  9. from kombu.utils.url import _parse_url
  10. from celery.exceptions import ImproperlyConfigured
  11. from .base import KeyValueStoreBackend
  # redis-py is an optional dependency: if the import fails, both names are
  # bound to None so this module can still be imported; RedisBackend checks
  # for the None module when it is instantiated.
  try:
      import redis
      from redis.exceptions import ConnectionError
  except ImportError:  # pragma: no cover
      redis = None  # noqa
      ConnectionError = None  # noqa

  __all__ = ['RedisBackend']
  # Message used for the ImproperlyConfigured error raised when the redis
  # client library could not be imported.
  REDIS_MISSING = (
      'You need to install the redis library in order to use '
      'the Redis result store backend.'
  )
  class RedisBackend(KeyValueStoreBackend):
      """Redis task result store.

      Each connection setting is resolved independently and the first truthy
      candidate wins, in this order: component parsed from the URL, the
      constructor argument, the ``CELERY_REDIS_*`` / ``REDIS_*`` configuration
      key, and finally the class-level default declared below.
      """

      #: redis-py client module (:const:`None` when it could not be imported).
      redis = redis

      #: default Redis server hostname (`localhost`).
      host = 'localhost'

      #: default Redis server port (6379)
      port = 6379

      #: default Redis db number (0)
      db = 0

      #: default Redis password (:const:`None`)
      password = None

      #: Maximum number of connections in the pool.
      max_connections = None

      #: Feature flags declared by this backend.
      supports_autoexpire = True
      supports_native_join = True
      implements_incr = True

      def __init__(self, host=None, port=None, db=None, password=None,
                   expires=None, max_connections=None, url=None, **kwargs):
          """Resolve the connection settings used by :attr:`client`.

          :param host: Server hostname.  A value containing ``://`` is
              treated as a URL and used in place of ``url``.
          :param port: Server port; the resolved value is coerced to ``int``.
          :param db: Database identifier.
          :param password: Server password.
          :param expires: Expiry setting, normalized through
              ``prepare_expires(expires, type=int)``.
          :param max_connections: Connection pool size limit.
          :param url: Connection URL; its host, port, password and path
              (used as the db) take precedence over the other arguments.
          :param kwargs: Forwarded unchanged to the base class initializer.
          :raises ImproperlyConfigured: If the redis library is not
              installed.
          """
          super(RedisBackend, self).__init__(**kwargs)
          conf = self.app.conf
          if self.redis is None:
              raise ImproperlyConfigured(REDIS_MISSING)

          # For compatibility with the old REDIS_* configuration keys.
          def _get(key):
              for prefix in 'CELERY_REDIS_{0}', 'REDIS_{0}':
                  try:
                      return conf[prefix.format(key)]
                  except KeyError:
                      pass
              # Neither spelling is configured.
              return None

          # A URL may be supplied through the ``host`` argument.
          if host and '://' in host:
              url, host = host, None
          self.url = url

          uhost = uport = upass = udb = None
          if url:
              _, uhost, uport, _, upass, udb, _ = _parse_url(url)
              # The URL path names the database; drop the slashes around it.
              udb = udb.strip('/') if udb else 0

          # NOTE(review): the ``or`` chains skip falsy values, so an explicit
          # ``db=0`` falls through to the configured/default value -- confirm
          # this precedence is intended before changing it.
          self.host = uhost or host or _get('HOST') or self.host
          self.port = int(uport or port or _get('PORT') or self.port)
          self.db = udb or db or _get('DB') or self.db
          self.password = (upass or password or _get('PASSWORD')
                           or self.password)
          self.expires = self.prepare_expires(expires, type=int)
          self.max_connections = (max_connections
                                  or _get('MAX_CONNECTIONS')
                                  or self.max_connections)

      def get(self, key):
          """Return the client's ``get`` result for ``key``."""
          return self.client.get(key)

      def mget(self, keys):
          """Return the client's ``mget`` result for ``keys``."""
          return self.client.mget(keys)

      def set(self, key, value):
          """Store ``value`` under ``key`` and publish it on channel ``key``.

          When :attr:`expires` is not :const:`None` the value is written with
          ``setex`` so it carries that expiry; otherwise a plain ``set`` is
          used.
          """
          client = self.client
          if self.expires is not None:
              # NOTE(review): (name, value, time) is the argument order of the
              # legacy redis-py ``Redis`` client; other client classes and
              # versions may expect (name, time, value) -- confirm against
              # the redis-py version in use.
              client.setex(key, value, self.expires)
          else:
              client.set(key, value)
          client.publish(key, value)

      def delete(self, key):
          """Delete ``key`` through the client."""
          self.client.delete(key)

      def incr(self, key):
          """Return the client's ``incr`` result for ``key``."""
          return self.client.incr(key)

      def expire(self, key, value):
          """Return the client's ``expire`` result for ``key`` and ``value``."""
          return self.client.expire(key, value)

      @cached_property
      def client(self):
          """Client bound to a connection pool built from the settings.

          Computed on first access and cached by ``cached_property``.
          """
          pool = self.redis.ConnectionPool(host=self.host, port=self.port,
                                           db=self.db, password=self.password,
                                           max_connections=self.max_connections)
          return self.redis.Redis(connection_pool=pool)

      def __reduce__(self, args=(), kwargs=None):
          """Pickle support: add the resolved settings to the reduce kwargs.

          :param args: Positional arguments passed on to the base class
              ``__reduce__``.
          :param kwargs: Optional extra keyword arguments; the resolved
              connection settings override same-named entries.
          :returns: Whatever the base class ``__reduce__`` returns.
          """
          # Work on a private copy so neither a dict supplied by the caller
          # nor a default shared between calls is ever mutated.
          kwargs = dict(kwargs) if kwargs else {}
          kwargs.update(
              host=self.host,
              port=self.port,
              db=self.db,
              password=self.password,
              expires=self.expires,
              max_connections=self.max_connections,
          )
          return super(RedisBackend, self).__reduce__(args, kwargs)