test_cache.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. from __future__ import absolute_import
  2. import sys
  3. import types
  4. from contextlib import contextmanager
  5. from kombu.utils.encoding import str_to_bytes
  6. from mock import Mock, patch
  7. from celery import signature
  8. from celery import states
  9. from celery.backends.cache import CacheBackend, DummyClient
  10. from celery.exceptions import ImproperlyConfigured
  11. from celery.five import items, string, text_t
  12. from celery.utils import uuid
  13. from celery.tests.case import AppCase, mask_modules, reset_modules
  14. class SomeClass(object):
  15. def __init__(self, data):
  16. self.data = data
  17. class test_CacheBackend(AppCase):
  18. def setup(self):
  19. self.tb = CacheBackend(backend='memory://', app=self.app)
  20. self.tid = uuid()
  21. def test_no_backend(self):
  22. self.app.conf.CELERY_CACHE_BACKEND = None
  23. with self.assertRaises(ImproperlyConfigured):
  24. CacheBackend(backend=None, app=self.app)
  25. def test_mark_as_done(self):
  26. self.assertEqual(self.tb.get_status(self.tid), states.PENDING)
  27. self.assertIsNone(self.tb.get_result(self.tid))
  28. self.tb.mark_as_done(self.tid, 42)
  29. self.assertEqual(self.tb.get_status(self.tid), states.SUCCESS)
  30. self.assertEqual(self.tb.get_result(self.tid), 42)
  31. def test_is_pickled(self):
  32. result = {'foo': 'baz', 'bar': SomeClass(12345)}
  33. self.tb.mark_as_done(self.tid, result)
  34. # is serialized properly.
  35. rindb = self.tb.get_result(self.tid)
  36. self.assertEqual(rindb.get('foo'), 'baz')
  37. self.assertEqual(rindb.get('bar').data, 12345)
  38. def test_mark_as_failure(self):
  39. try:
  40. raise KeyError('foo')
  41. except KeyError as exception:
  42. self.tb.mark_as_failure(self.tid, exception)
  43. self.assertEqual(self.tb.get_status(self.tid), states.FAILURE)
  44. self.assertIsInstance(self.tb.get_result(self.tid), KeyError)
  45. def test_on_chord_apply(self):
  46. tb = CacheBackend(backend='memory://', app=self.app)
  47. gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
  48. tb.on_chord_apply(gid, {}, result=res)
  49. @patch('celery.result.GroupResult.restore')
  50. def test_on_chord_part_return(self, restore):
  51. tb = CacheBackend(backend='memory://', app=self.app)
  52. deps = Mock()
  53. deps.__len__ = Mock()
  54. deps.__len__.return_value = 2
  55. restore.return_value = deps
  56. task = Mock()
  57. task.name = 'foobarbaz'
  58. self.app.tasks['foobarbaz'] = task
  59. task.request.chord = signature(task)
  60. gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
  61. task.request.group = gid
  62. tb.on_chord_apply(gid, {}, result=res)
  63. self.assertFalse(deps.join_native.called)
  64. tb.on_chord_part_return(task)
  65. self.assertFalse(deps.join_native.called)
  66. tb.on_chord_part_return(task)
  67. deps.join_native.assert_called_with(propagate=True)
  68. deps.delete.assert_called_with()
  69. def test_mget(self):
  70. self.tb.set('foo', 1)
  71. self.tb.set('bar', 2)
  72. self.assertDictEqual(self.tb.mget(['foo', 'bar']),
  73. {'foo': 1, 'bar': 2})
  74. def test_forget(self):
  75. self.tb.mark_as_done(self.tid, {'foo': 'bar'})
  76. x = self.app.AsyncResult(self.tid, backend=self.tb)
  77. x.forget()
  78. self.assertIsNone(x.result)
  79. def test_process_cleanup(self):
  80. self.tb.process_cleanup()
  81. def test_expires_as_int(self):
  82. tb = CacheBackend(backend='memory://', expires=10, app=self.app)
  83. self.assertEqual(tb.expires, 10)
  84. def test_unknown_backend_raises_ImproperlyConfigured(self):
  85. with self.assertRaises(ImproperlyConfigured):
  86. CacheBackend(backend='unknown://', app=self.app)
  87. class MyMemcachedStringEncodingError(Exception):
  88. pass
  89. class MemcachedClient(DummyClient):
  90. def set(self, key, value, *args, **kwargs):
  91. if isinstance(key, text_t):
  92. raise MyMemcachedStringEncodingError(
  93. 'Keys must be bytes, not string. Convert your '
  94. 'strings using mystring.encode(charset)!')
  95. return super(MemcachedClient, self).set(key, value, *args, **kwargs)
  96. class MockCacheMixin(object):
  97. @contextmanager
  98. def mock_memcache(self):
  99. memcache = types.ModuleType('memcache')
  100. memcache.Client = MemcachedClient
  101. memcache.Client.__module__ = memcache.__name__
  102. prev, sys.modules['memcache'] = sys.modules.get('memcache'), memcache
  103. try:
  104. yield True
  105. finally:
  106. if prev is not None:
  107. sys.modules['memcache'] = prev
  108. @contextmanager
  109. def mock_pylibmc(self):
  110. pylibmc = types.ModuleType('pylibmc')
  111. pylibmc.Client = MemcachedClient
  112. pylibmc.Client.__module__ = pylibmc.__name__
  113. prev = sys.modules.get('pylibmc')
  114. sys.modules['pylibmc'] = pylibmc
  115. try:
  116. yield True
  117. finally:
  118. if prev is not None:
  119. sys.modules['pylibmc'] = prev
  120. class test_get_best_memcache(AppCase, MockCacheMixin):
  121. def test_pylibmc(self):
  122. with self.mock_pylibmc():
  123. with reset_modules('celery.backends.cache'):
  124. from celery.backends import cache
  125. cache._imp = [None]
  126. self.assertEqual(cache.get_best_memcache().__module__,
  127. 'pylibmc')
  128. def test_memcache(self):
  129. with self.mock_memcache():
  130. with reset_modules('celery.backends.cache'):
  131. with mask_modules('pylibmc'):
  132. from celery.backends import cache
  133. cache._imp = [None]
  134. self.assertEqual(cache.get_best_memcache().__module__,
  135. 'memcache')
  136. def test_no_implementations(self):
  137. with mask_modules('pylibmc', 'memcache'):
  138. with reset_modules('celery.backends.cache'):
  139. from celery.backends import cache
  140. cache._imp = [None]
  141. with self.assertRaises(ImproperlyConfigured):
  142. cache.get_best_memcache()
  143. def test_cached(self):
  144. with self.mock_pylibmc():
  145. with reset_modules('celery.backends.cache'):
  146. from celery.backends import cache
  147. cache._imp = [None]
  148. cache.get_best_memcache(behaviors={'foo': 'bar'})
  149. self.assertTrue(cache._imp[0])
  150. cache.get_best_memcache()
  151. def test_backends(self):
  152. from celery.backends.cache import backends
  153. for name, fun in items(backends):
  154. self.assertTrue(fun())
  155. class test_memcache_key(AppCase, MockCacheMixin):
  156. def test_memcache_unicode_key(self):
  157. with self.mock_memcache():
  158. with reset_modules('celery.backends.cache'):
  159. with mask_modules('pylibmc'):
  160. from celery.backends import cache
  161. cache._imp = [None]
  162. task_id, result = string(uuid()), 42
  163. b = cache.CacheBackend(backend='memcache', app=self.app)
  164. b.store_result(task_id, result, status=states.SUCCESS)
  165. self.assertEqual(b.get_result(task_id), result)
  166. def test_memcache_bytes_key(self):
  167. with self.mock_memcache():
  168. with reset_modules('celery.backends.cache'):
  169. with mask_modules('pylibmc'):
  170. from celery.backends import cache
  171. cache._imp = [None]
  172. task_id, result = str_to_bytes(uuid()), 42
  173. b = cache.CacheBackend(backend='memcache', app=self.app)
  174. b.store_result(task_id, result, status=states.SUCCESS)
  175. self.assertEqual(b.get_result(task_id), result)
  176. def test_pylibmc_unicode_key(self):
  177. with reset_modules('celery.backends.cache'):
  178. with self.mock_pylibmc():
  179. from celery.backends import cache
  180. cache._imp = [None]
  181. task_id, result = string(uuid()), 42
  182. b = cache.CacheBackend(backend='memcache', app=self.app)
  183. b.store_result(task_id, result, status=states.SUCCESS)
  184. self.assertEqual(b.get_result(task_id), result)
  185. def test_pylibmc_bytes_key(self):
  186. with reset_modules('celery.backends.cache'):
  187. with self.mock_pylibmc():
  188. from celery.backends import cache
  189. cache._imp = [None]
  190. task_id, result = str_to_bytes(uuid()), 42
  191. b = cache.CacheBackend(backend='memcache', app=self.app)
  192. b.store_result(task_id, result, status=states.SUCCESS)
  193. self.assertEqual(b.get_result(task_id), result)