test_redis.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. from __future__ import absolute_import
  2. from datetime import timedelta
  3. from mock import Mock, patch
  4. from nose import SkipTest
  5. from pickle import loads, dumps
  6. from kombu.utils import cached_property, uuid
  7. from celery import signature
  8. from celery import states
  9. from celery.datastructures import AttributeDict
  10. from celery.exceptions import ImproperlyConfigured
  11. from celery.utils.timeutils import timedelta_seconds
  12. from celery.tests.case import AppCase, depends_on_current_app
  13. class Redis(object):
  14. class Connection(object):
  15. connected = True
  16. def disconnect(self):
  17. self.connected = False
  18. def __init__(self, host=None, port=None, db=None, password=None, **kw):
  19. self.host = host
  20. self.port = port
  21. self.db = db
  22. self.password = password
  23. self.connection = self.Connection()
  24. self.keyspace = {}
  25. self.expiry = {}
  26. def get(self, key):
  27. return self.keyspace.get(key)
  28. def setex(self, key, value, expires):
  29. self.set(key, value)
  30. self.expire(key, expires)
  31. def set(self, key, value):
  32. self.keyspace[key] = value
  33. def expire(self, key, expires):
  34. self.expiry[key] = expires
  35. def delete(self, key):
  36. self.keyspace.pop(key)
  37. def publish(self, key, value):
  38. pass
  39. class redis(object):
  40. Redis = Redis
  41. class ConnectionPool(object):
  42. def __init__(self, **kwargs):
  43. pass
  44. class test_RedisBackend(AppCase):
  45. def get_backend(self):
  46. from celery.backends import redis
  47. class RedisBackend(redis.RedisBackend):
  48. redis = redis
  49. return RedisBackend
  50. def setup(self):
  51. self.Backend = self.get_backend()
  52. class MockBackend(self.Backend):
  53. @cached_property
  54. def client(self):
  55. return Mock()
  56. self.MockBackend = MockBackend
  57. @depends_on_current_app
  58. def test_reduce(self):
  59. try:
  60. from celery.backends.redis import RedisBackend
  61. x = RedisBackend(app=self.app)
  62. self.assertTrue(loads(dumps(x)))
  63. except ImportError:
  64. raise SkipTest('redis not installed')
  65. def test_no_redis(self):
  66. self.MockBackend.redis = None
  67. with self.assertRaises(ImproperlyConfigured):
  68. self.MockBackend(app=self.app)
  69. def test_url(self):
  70. x = self.MockBackend('redis://foobar//1', app=self.app)
  71. self.assertEqual(x.host, 'foobar')
  72. self.assertEqual(x.db, '1')
  73. def test_conf_raises_KeyError(self):
  74. self.app.conf = AttributeDict({
  75. 'CELERY_RESULT_SERIALIZER': 'json',
  76. 'CELERY_MAX_CACHED_RESULTS': 1,
  77. 'CELERY_ACCEPT_CONTENT': ['json'],
  78. 'CELERY_TASK_RESULT_EXPIRES': None,
  79. })
  80. self.MockBackend(app=self.app)
  81. def test_expires_defaults_to_config(self):
  82. self.app.conf.CELERY_TASK_RESULT_EXPIRES = 10
  83. b = self.Backend(expires=None, app=self.app)
  84. self.assertEqual(b.expires, 10)
  85. def test_expires_is_int(self):
  86. b = self.Backend(expires=48, app=self.app)
  87. self.assertEqual(b.expires, 48)
  88. def test_expires_is_None(self):
  89. b = self.Backend(expires=None, app=self.app)
  90. self.assertEqual(b.expires, timedelta_seconds(
  91. self.app.conf.CELERY_TASK_RESULT_EXPIRES))
  92. def test_expires_is_timedelta(self):
  93. b = self.Backend(expires=timedelta(minutes=1), app=self.app)
  94. self.assertEqual(b.expires, 60)
  95. def test_on_chord_apply(self):
  96. self.Backend(app=self.app).on_chord_apply(
  97. 'group_id', {},
  98. result=[self.app.AsyncResult(x) for x in [1, 2, 3]],
  99. )
  100. def test_mget(self):
  101. b = self.MockBackend(app=self.app)
  102. self.assertTrue(b.mget(['a', 'b', 'c']))
  103. b.client.mget.assert_called_with(['a', 'b', 'c'])
  104. def test_set_no_expire(self):
  105. b = self.MockBackend(app=self.app)
  106. b.expires = None
  107. b.set('foo', 'bar')
  108. @patch('celery.result.GroupResult.restore')
  109. def test_on_chord_part_return(self, restore):
  110. b = self.MockBackend(app=self.app)
  111. deps = Mock()
  112. deps.__len__ = Mock()
  113. deps.__len__.return_value = 10
  114. restore.return_value = deps
  115. b.client.incr.return_value = 1
  116. task = Mock()
  117. task.name = 'foobarbaz'
  118. self.app.tasks['foobarbaz'] = task
  119. task.request.chord = signature(task)
  120. task.request.group = 'group_id'
  121. b.on_chord_part_return(task)
  122. self.assertTrue(b.client.incr.call_count)
  123. b.client.incr.return_value = len(deps)
  124. b.on_chord_part_return(task)
  125. deps.join_native.assert_called_with(propagate=True)
  126. deps.delete.assert_called_with()
  127. self.assertTrue(b.client.expire.call_count)
  128. def test_process_cleanup(self):
  129. self.Backend(app=self.app).process_cleanup()
  130. def test_get_set_forget(self):
  131. b = self.Backend(app=self.app)
  132. tid = uuid()
  133. b.store_result(tid, 42, states.SUCCESS)
  134. self.assertEqual(b.get_status(tid), states.SUCCESS)
  135. self.assertEqual(b.get_result(tid), 42)
  136. b.forget(tid)
  137. self.assertEqual(b.get_status(tid), states.PENDING)
  138. def test_set_expires(self):
  139. b = self.Backend(expires=512, app=self.app)
  140. tid = uuid()
  141. key = b.get_key_for_task(tid)
  142. b.store_result(tid, 42, states.SUCCESS)
  143. self.assertEqual(b.client.expiry[key], 512)