certificate.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.security.certificate
  4. ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  5. X.509 certificates.
  6. """
  7. from __future__ import absolute_import
  8. import glob
  9. import os
  10. from kombu.utils.encoding import bytes_to_str
  11. from celery.exceptions import SecurityError
  12. from celery.five import values
  13. from .utils import crypto, reraise_errors
  14. __all__ = ['Certificate', 'CertStore', 'FSCertStore']
  15. class Certificate(object):
  16. """X.509 certificate."""
  17. def __init__(self, cert):
  18. assert crypto is not None
  19. with reraise_errors('Invalid certificate: {0!r}'):
  20. self._cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
  21. def has_expired(self):
  22. """Check if the certificate has expired."""
  23. return self._cert.has_expired()
  24. def get_serial_number(self):
  25. """Return the serial number in the certificate."""
  26. return self._cert.get_serial_number()
  27. def get_issuer(self):
  28. """Return issuer (CA) as a string"""
  29. return ' '.join(bytes_to_str(x[1]) for x in
  30. self._cert.get_issuer().get_components())
  31. def get_id(self):
  32. """Serial number/issuer pair uniquely identifies a certificate"""
  33. return '{0} {1}'.format(self.get_issuer(), self.get_serial_number())
  34. def verify(self, data, signature, digest):
  35. """Verifies the signature for string containing data."""
  36. with reraise_errors('Bad signature: {0!r}'):
  37. crypto.verify(self._cert, signature, data, digest)
  38. class CertStore(object):
  39. """Base class for certificate stores"""
  40. def __init__(self):
  41. self._certs = {}
  42. def itercerts(self):
  43. """an iterator over the certificates"""
  44. for c in values(self._certs):
  45. yield c
  46. def __getitem__(self, id):
  47. """get certificate by id"""
  48. try:
  49. return self._certs[id]
  50. except KeyError:
  51. raise SecurityError('Unknown certificate: {0!r}'.format(id))
  52. def add_cert(self, cert):
  53. if cert.get_id() in self._certs:
  54. raise SecurityError('Duplicate certificate: {0!r}'.format(id))
  55. self._certs[cert.get_id()] = cert
  56. class FSCertStore(CertStore):
  57. """File system certificate store"""
  58. def __init__(self, path):
  59. CertStore.__init__(self)
  60. if os.path.isdir(path):
  61. path = os.path.join(path, '*')
  62. for p in glob.glob(path):
  63. with open(p) as f:
  64. cert = Certificate(f.read())
  65. if cert.has_expired():
  66. raise SecurityError(
  67. 'Expired certificate: {0!r}'.format(cert.get_id()))
  68. self.add_cert(cert)