test_loaders.py 9.3 KB


  1. from __future__ import absolute_import
  2. import os
  3. import sys
  4. import warnings
  5. from mock import Mock, patch
  6. from celery import loaders
  7. from celery.exceptions import (
  8. NotConfigured,
  9. CPendingDeprecationWarning,
  10. )
  11. from celery.loaders import base
  12. from celery.loaders import default
  13. from celery.loaders.app import AppLoader
  14. from celery.utils.imports import NotAPackage
  15. from celery.utils.mail import SendmailWarning
  16. from celery.tests.case import (
  17. AppCase, Case, depends_on_current_app, with_environ,
  18. )
  19. class DummyLoader(base.BaseLoader):
  20. def read_configuration(self):
  21. return {'foo': 'bar', 'CELERY_IMPORTS': ('os', 'sys')}
  22. class test_loaders(AppCase):
  23. def test_get_loader_cls(self):
  24. self.assertEqual(loaders.get_loader_cls('default'),
  25. default.Loader)
  26. @depends_on_current_app
  27. def test_current_loader(self):
  28. with self.assertWarnsRegex(
  29. CPendingDeprecationWarning,
  30. r'deprecation'):
  31. self.assertIs(loaders.current_loader(), self.app.loader)
  32. @depends_on_current_app
  33. def test_load_settings(self):
  34. with self.assertWarnsRegex(
  35. CPendingDeprecationWarning,
  36. r'deprecation'):
  37. self.assertIs(loaders.load_settings(), self.app.conf)
  38. class test_LoaderBase(AppCase):
  39. message_options = {'subject': 'Subject',
  40. 'body': 'Body',
  41. 'sender': 'x@x.com',
  42. 'to': 'y@x.com'}
  43. server_options = {'host': 'smtp.x.com',
  44. 'port': 1234,
  45. 'user': 'x',
  46. 'password': 'qwerty',
  47. 'timeout': 3}
  48. def setup(self):
  49. self.loader = DummyLoader(app=self.app)
  50. def test_handlers_pass(self):
  51. self.loader.on_task_init('foo.task', 'feedface-cafebabe')
  52. self.loader.on_worker_init()
  53. def test_now(self):
  54. self.assertTrue(self.loader.now(utc=True))
  55. self.assertTrue(self.loader.now(utc=False))
  56. def test_read_configuration_no_env(self):
  57. self.assertDictEqual(
  58. base.BaseLoader(app=self.app).read_configuration(
  59. 'FOO_X_S_WE_WQ_Q_WE'),
  60. {},
  61. )
  62. def test_autodiscovery(self):
  63. with patch('celery.loaders.base.autodiscover_tasks') as auto:
  64. auto.return_value = [Mock()]
  65. auto.return_value[0].__name__ = 'moo'
  66. self.loader.autodiscover_tasks(['A', 'B'])
  67. self.assertIn('moo', self.loader.task_modules)
  68. self.loader.task_modules.discard('moo')
  69. def test_import_task_module(self):
  70. self.assertEqual(sys, self.loader.import_task_module('sys'))
  71. def test_init_worker_process(self):
  72. self.loader.on_worker_process_init()
  73. m = self.loader.on_worker_process_init = Mock()
  74. self.loader.init_worker_process()
  75. m.assert_called_with()
  76. def test_config_from_object_module(self):
  77. self.loader.import_from_cwd = Mock()
  78. self.loader.config_from_object('module_name')
  79. self.loader.import_from_cwd.assert_called_with('module_name')
  80. def test_conf_property(self):
  81. self.assertEqual(self.loader.conf['foo'], 'bar')
  82. self.assertEqual(self.loader._conf['foo'], 'bar')
  83. self.assertEqual(self.loader.conf['foo'], 'bar')
  84. def test_import_default_modules(self):
  85. modnames = lambda l: [m.__name__ for m in l]
  86. self.app.conf.CELERY_IMPORTS = ('os', 'sys')
  87. self.assertEqual(
  88. sorted(modnames(self.loader.import_default_modules())),
  89. sorted(modnames([os, sys])),
  90. )
  91. def test_import_from_cwd_custom_imp(self):
  92. def imp(module, package=None):
  93. imp.called = True
  94. imp.called = False
  95. self.loader.import_from_cwd('foo', imp=imp)
  96. self.assertTrue(imp.called)
  97. @patch('celery.utils.mail.Mailer._send')
  98. def test_mail_admins_errors(self, send):
  99. send.side_effect = KeyError()
  100. opts = dict(self.message_options, **self.server_options)
  101. with self.assertWarnsRegex(SendmailWarning, r'KeyError'):
  102. self.loader.mail_admins(fail_silently=True, **opts)
  103. with self.assertRaises(KeyError):
  104. self.loader.mail_admins(fail_silently=False, **opts)
  105. @patch('celery.utils.mail.Mailer._send')
  106. def test_mail_admins(self, send):
  107. opts = dict(self.message_options, **self.server_options)
  108. self.loader.mail_admins(**opts)
  109. self.assertTrue(send.call_args)
  110. message = send.call_args[0][0]
  111. self.assertEqual(message.to, [self.message_options['to']])
  112. self.assertEqual(message.subject, self.message_options['subject'])
  113. self.assertEqual(message.sender, self.message_options['sender'])
  114. self.assertEqual(message.body, self.message_options['body'])
  115. def test_mail_attribute(self):
  116. from celery.utils import mail
  117. loader = base.BaseLoader(app=self.app)
  118. self.assertIs(loader.mail, mail)
  119. def test_cmdline_config_ValueError(self):
  120. with self.assertRaises(ValueError):
  121. self.loader.cmdline_config_parser(['broker.port=foobar'])
  122. class test_DefaultLoader(AppCase):
  123. @patch('celery.loaders.base.find_module')
  124. def test_read_configuration_not_a_package(self, find_module):
  125. find_module.side_effect = NotAPackage()
  126. l = default.Loader(app=self.app)
  127. with self.assertRaises(NotAPackage):
  128. l.read_configuration(fail_silently=False)
  129. @patch('celery.loaders.base.find_module')
  130. @with_environ('CELERY_CONFIG_MODULE', 'celeryconfig.py')
  131. def test_read_configuration_py_in_name(self, find_module):
  132. find_module.side_effect = NotAPackage()
  133. l = default.Loader(app=self.app)
  134. with self.assertRaises(NotAPackage):
  135. l.read_configuration(fail_silently=False)
  136. @patch('celery.loaders.base.find_module')
  137. def test_read_configuration_importerror(self, find_module):
  138. default.C_WNOCONF = True
  139. find_module.side_effect = ImportError()
  140. l = default.Loader(app=self.app)
  141. with self.assertWarnsRegex(NotConfigured, r'make sure it exists'):
  142. l.read_configuration(fail_silently=True)
  143. default.C_WNOCONF = False
  144. l.read_configuration(fail_silently=True)
  145. def test_read_configuration(self):
  146. from types import ModuleType
  147. class ConfigModule(ModuleType):
  148. pass
  149. configname = os.environ.get('CELERY_CONFIG_MODULE') or 'celeryconfig'
  150. celeryconfig = ConfigModule(configname)
  151. celeryconfig.CELERY_IMPORTS = ('os', 'sys')
  152. prevconfig = sys.modules.get(configname)
  153. sys.modules[configname] = celeryconfig
  154. try:
  155. l = default.Loader(app=self.app)
  156. l.find_module = Mock(name='find_module')
  157. settings = l.read_configuration(fail_silently=False)
  158. self.assertTupleEqual(settings.CELERY_IMPORTS, ('os', 'sys'))
  159. settings = l.read_configuration(fail_silently=False)
  160. self.assertTupleEqual(settings.CELERY_IMPORTS, ('os', 'sys'))
  161. l.on_worker_init()
  162. finally:
  163. if prevconfig:
  164. sys.modules[configname] = prevconfig
  165. def test_import_from_cwd(self):
  166. l = default.Loader(app=self.app)
  167. old_path = list(sys.path)
  168. try:
  169. sys.path.remove(os.getcwd())
  170. except ValueError:
  171. pass
  172. celery = sys.modules.pop('celery', None)
  173. try:
  174. self.assertTrue(l.import_from_cwd('celery'))
  175. sys.modules.pop('celery', None)
  176. sys.path.insert(0, os.getcwd())
  177. self.assertTrue(l.import_from_cwd('celery'))
  178. finally:
  179. sys.path = old_path
  180. sys.modules['celery'] = celery
  181. def test_unconfigured_settings(self):
  182. context_executed = [False]
  183. class _Loader(default.Loader):
  184. def find_module(self, name):
  185. raise ImportError(name)
  186. with warnings.catch_warnings(record=True):
  187. l = _Loader(app=self.app)
  188. self.assertFalse(l.configured)
  189. context_executed[0] = True
  190. self.assertTrue(context_executed[0])
  191. class test_AppLoader(AppCase):
  192. def setup(self):
  193. self.loader = AppLoader(app=self.app)
  194. def test_on_worker_init(self):
  195. self.app.conf.CELERY_IMPORTS = ('subprocess', )
  196. sys.modules.pop('subprocess', None)
  197. self.loader.init_worker()
  198. self.assertIn('subprocess', sys.modules)
  199. class test_autodiscovery(Case):
  200. def test_autodiscover_tasks(self):
  201. base._RACE_PROTECTION = True
  202. try:
  203. base.autodiscover_tasks(['foo'])
  204. finally:
  205. base._RACE_PROTECTION = False
  206. with patch('celery.loaders.base.find_related_module') as frm:
  207. base.autodiscover_tasks(['foo'])
  208. self.assertTrue(frm.called)
  209. def test_find_related_module(self):
  210. with patch('importlib.import_module') as imp:
  211. with patch('imp.find_module') as find:
  212. imp.return_value = Mock()
  213. imp.return_value.__path__ = 'foo'
  214. base.find_related_module(base, 'tasks')
  215. imp.side_effect = AttributeError()
  216. base.find_related_module(base, 'tasks')
  217. imp.side_effect = None
  218. find.side_effect = ImportError()
  219. base.find_related_module(base, 'tasks')