test_security.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. """
  2. Keys and certificates for tests (KEY1 is a private key of CERT1, etc.)
  3. Generated with:
  4. .. code-block:: bash
  5. $ openssl genrsa -des3 -passout pass:test -out key1.key 1024
  6. $ openssl req -new -key key1.key -out cert1.csr -passin pass:test
  7. $ cp key1.key key1.key.org
  8. $ openssl rsa -in key1.key.org -out key1.key -passin pass:test
  9. $ openssl x509 -req -days 365 -in cert1.csr \
  10. -signkey key1.key -out cert1.crt
  11. $ rm key1.key.org cert1.csr
  12. """
  13. from __future__ import absolute_import
  14. from mock import Mock, patch
  15. from kombu.serialization import disable_insecure_serializers
  16. from celery.exceptions import ImproperlyConfigured, SecurityError
  17. from celery.five import builtins
  18. from celery.security.utils import reraise_errors
  19. from kombu.serialization import registry
  20. from .case import SecurityCase
  21. from celery.tests.case import mock_open
  class test_security(SecurityCase):
      """Tests for serializer disabling and ``app.setup_security()``.

      Several tests mutate the process-wide
      ``registry._disabled_content_types`` set, so each one is careful to
      leave it in a known state and ``teardown`` clears it again.
      """

      def teardown(self):
          # Drop every disabled content type so state does not leak between
          # tests.
          registry._disabled_content_types.clear()

      def test_disable_insecure_serializers(self):
          # Exercises disable_insecure_serializers() with an explicit
          # whitelist and with ``allowed=None``.
          try:
              disabled = registry._disabled_content_types
              # NOTE(review): this expects something to already be disabled
              # before the test body runs -- presumably arranged by
              # SecurityCase; confirm against the base class.
              self.assertTrue(disabled)

              # With a whitelist, only content types outside it are disabled.
              disable_insecure_serializers(
                  ['application/json', 'application/x-python-serialize'],
              )
              self.assertIn('application/x-yaml', disabled)
              self.assertNotIn('application/json', disabled)
              self.assertNotIn('application/x-python-serialize', disabled)
              disabled.clear()

              # With allowed=None, all three content types end up disabled.
              disable_insecure_serializers(allowed=None)
              self.assertIn('application/x-yaml', disabled)
              self.assertIn('application/json', disabled)
              self.assertIn('application/x-python-serialize', disabled)
          finally:
              # Always finish with only 'json' allowed, pass or fail.
              disable_insecure_serializers(allowed=['json'])

      def test_setup_security(self):
          # With the 'json' task serializer, setup_security() is expected to
          # disable the pickle content type.
          disabled = registry._disabled_content_types
          self.assertEqual(0, len(disabled))
          try:
              self.app.conf.CELERY_TASK_SERIALIZER = 'json'
              self.app.setup_security()
              self.assertIn('application/x-python-serialize', disabled)
          finally:
              # Clear even when an assertion fails, so a failure here cannot
              # change the outcome of later tests.
              disabled.clear()

      @patch('celery.security.register_auth')
      @patch('celery.security._disable_insecure_serializers')
      def test_setup_registry_complete(self, dis, reg, key='KEY', cert='CERT'):
          # ``dis`` is the mock for _disable_insecure_serializers and ``reg``
          # the mock for register_auth: patch decorators are applied
          # bottom-up, so the innermost one supplies the first argument.
          calls = [0]

          def effect(*args):
              # Fake ``open``: the first opened file reads as 'A', every
              # later one as 'B'.  The counter is bumped in ``finally`` so it
              # advances after the return value has been built.
              try:
                  m = Mock()
                  m.read.return_value = 'B' if calls[0] else 'A'
                  return m
              finally:
                  calls[0] += 1

          self.app.conf.CELERY_TASK_SERIALIZER = 'auth'
          with mock_open(side_effect=effect):
              with patch('celery.security.registry') as registry:
                  store = Mock()
                  self.app.setup_security(['json'], key, cert, store)
                  dis.assert_called_with(['json'])
                  # 'A' and 'B' are the contents of the first and second
                  # files opened (presumably the key, then the certificate).
                  reg.assert_called_with('A', 'B', store, 'sha1', 'json')
                  registry._set_default_serializer.assert_called_with('auth')

      def test_security_conf(self):
          # setup_security() must raise ImproperlyConfigured when the 'auth'
          # serializer is selected without further arguments, and also when
          # importing OpenSSL fails.
          self.app.conf.CELERY_TASK_SERIALIZER = 'auth'
          with self.assertRaises(ImproperlyConfigured):
              self.app.setup_security()

          _import = builtins.__import__

          def import_hook(name, *args, **kwargs):
              # Simulate a missing OpenSSL package; delegate everything else.
              if name == 'OpenSSL':
                  raise ImportError
              return _import(name, *args, **kwargs)

          builtins.__import__ = import_hook
          try:
              with self.assertRaises(ImproperlyConfigured):
                  self.app.setup_security()
          finally:
              # Restore the real import function even if the assertion above
              # fails; otherwise the hook would stay installed and break
              # OpenSSL imports for every test that runs afterwards.
              builtins.__import__ = _import

      def test_reraise_errors(self):
          # An exception type listed in ``errors`` is re-raised as
          # SecurityError.
          with self.assertRaises(SecurityError):
              with reraise_errors(errors=(KeyError, )):
                  raise KeyError('foo')

          # An exception type that is not listed propagates unchanged.
          with self.assertRaises(KeyError):
              with reraise_errors(errors=(ValueError, )):
                  raise KeyError('bar')