x509.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. # -*- coding: utf-8 -*-
  2. """An X509Adapter for use with the requests library.
  3. This file contains an implementation of the X509Adapter that will
  4. allow users to authenticate a request using an arbitrary
  5. X.509 certificate without needing to convert it to a .pem file
  6. """
  7. from OpenSSL.crypto import PKey, X509
  8. from cryptography import x509
  9. from cryptography.hazmat.primitives.serialization import (load_pem_private_key,
  10. load_der_private_key)
  11. from cryptography.hazmat.primitives.serialization import Encoding
  12. from cryptography.hazmat.backends import default_backend
  13. from datetime import datetime
  14. from requests.adapters import HTTPAdapter
  15. import requests
  16. from .._compat import PyOpenSSLContext
  17. from .. import exceptions as exc
  18. """
  19. The protocol constants are imported from _ssl rather than ssl because only
  20. the constants are needed, and to avoid issues caused by importing ssl on
  21. the 2.7.x line.
  22. """
  23. try:
  24. from _ssl import PROTOCOL_TLS as PROTOCOL
  25. except ImportError:
  26. from _ssl import PROTOCOL_SSLv23 as PROTOCOL
  27. class X509Adapter(HTTPAdapter):
  28. r"""Adapter for use with X.509 certificates.
  29. Provides an interface for Requests sessions to contact HTTPS urls and
  30. authenticate with an X.509 cert by implementing the Transport Adapter
  31. interface. This class will need to be manually instantiated and mounted
  32. to the session
  33. :param pool_connections: The number of urllib3 connection pools to
  34. cache.
  35. :param pool_maxsize: The maximum number of connections to save in the
  36. pool.
  37. :param max_retries: The maximum number of retries each connection
  38. should attempt. Note, this applies only to failed DNS lookups,
  39. socket connections and connection timeouts, never to requests where
  40. data has made it to the server. By default, Requests does not retry
  41. failed connections. If you need granular control over the
  42. conditions under which we retry a request, import urllib3's
  43. ``Retry`` class and pass that instead.
  44. :param pool_block: Whether the connection pool should block for
  45. connections.
  46. :param bytes cert_bytes:
  47. bytes object containing contents of a cryptography.x509Certificate
  48. object using the encoding specified by the ``encoding`` parameter.
  49. :param bytes pk_bytes:
  50. bytes object containing contents of a object that implements
  51. ``cryptography.hazmat.primitives.serialization.PrivateFormat``
  52. using the encoding specified by the ``encoding`` parameter.
  53. :param password:
  54. string or utf8 encoded bytes containing the passphrase used for the
  55. private key. None if unencrypted. Defaults to None.
  56. :param encoding:
  57. Enumeration detailing the encoding method used on the ``cert_bytes``
  58. parameter. Can be either PEM or DER. Defaults to PEM.
  59. :type encoding:
  60. :class: `cryptography.hazmat.primitives.serialization.Encoding`
  61. Usage::
  62. >>> import requests
  63. >>> from requests_toolbelt.adapters.x509 import X509Adapter
  64. >>> s = requests.Session()
  65. >>> a = X509Adapter(max_retries=3,
  66. cert_bytes=b'...', pk_bytes=b'...', encoding='...'
  67. >>> s.mount('https://', a)
  68. """
  69. def __init__(self, *args, **kwargs):
  70. self._check_version()
  71. cert_bytes = kwargs.pop('cert_bytes', None)
  72. pk_bytes = kwargs.pop('pk_bytes', None)
  73. password = kwargs.pop('password', None)
  74. encoding = kwargs.pop('encoding', Encoding.PEM)
  75. password_bytes = None
  76. if cert_bytes is None or not isinstance(cert_bytes, bytes):
  77. raise ValueError('Invalid cert content provided. '
  78. 'You must provide an X.509 cert '
  79. 'formatted as a byte array.')
  80. if pk_bytes is None or not isinstance(pk_bytes, bytes):
  81. raise ValueError('Invalid private key content provided. '
  82. 'You must provide a private key '
  83. 'formatted as a byte array.')
  84. if isinstance(password, bytes):
  85. password_bytes = password
  86. elif password:
  87. password_bytes = password.encode('utf8')
  88. self.ssl_context = create_ssl_context(cert_bytes, pk_bytes,
  89. password_bytes, encoding)
  90. super(X509Adapter, self).__init__(*args, **kwargs)
  91. def init_poolmanager(self, *args, **kwargs):
  92. if self.ssl_context:
  93. kwargs['ssl_context'] = self.ssl_context
  94. return super(X509Adapter, self).init_poolmanager(*args, **kwargs)
  95. def proxy_manager_for(self, *args, **kwargs):
  96. if self.ssl_context:
  97. kwargs['ssl_context'] = self.ssl_context
  98. return super(X509Adapter, self).proxy_manager_for(*args, **kwargs)
  99. def _check_version(self):
  100. if PyOpenSSLContext is None:
  101. raise exc.VersionMismatchError(
  102. "The X509Adapter requires at least Requests 2.12.0 to be "
  103. "installed. Version {0} was found instead.".format(
  104. requests.__version__
  105. )
  106. )
  107. def check_cert_dates(cert):
  108. """Verify that the supplied client cert is not invalid."""
  109. now = datetime.utcnow()
  110. if cert.not_valid_after < now or cert.not_valid_before > now:
  111. raise ValueError('Client certificate expired: Not After: '
  112. '{0:%Y-%m-%d %H:%M:%SZ} '
  113. 'Not Before: {1:%Y-%m-%d %H:%M:%SZ}'
  114. .format(cert.not_valid_after, cert.not_valid_before))
  115. def create_ssl_context(cert_byes, pk_bytes, password=None,
  116. encoding=Encoding.PEM):
  117. """Create an SSL Context with the supplied cert/password.
  118. :param cert_bytes array of bytes containing the cert encoded
  119. using the method supplied in the ``encoding`` parameter
  120. :param pk_bytes array of bytes containing the private key encoded
  121. using the method supplied in the ``encoding`` parameter
  122. :param password array of bytes containing the passphrase to be used
  123. with the supplied private key. None if unencrypted.
  124. Defaults to None.
  125. :param encoding ``cryptography.hazmat.primitives.serialization.Encoding``
  126. details the encoding method used on the ``cert_bytes`` and
  127. ``pk_bytes`` parameters. Can be either PEM or DER.
  128. Defaults to PEM.
  129. """
  130. backend = default_backend()
  131. cert = None
  132. key = None
  133. if encoding == Encoding.PEM:
  134. cert = x509.load_pem_x509_certificate(cert_byes, backend)
  135. key = load_pem_private_key(pk_bytes, password, backend)
  136. elif encoding == Encoding.DER:
  137. cert = x509.load_der_x509_certificate(cert_byes, backend)
  138. key = load_der_private_key(pk_bytes, password, backend)
  139. else:
  140. raise ValueError('Invalid encoding provided: Must be PEM or DER')
  141. if not (cert and key):
  142. raise ValueError('Cert and key could not be parsed from '
  143. 'provided data')
  144. check_cert_dates(cert)
  145. ssl_context = PyOpenSSLContext(PROTOCOL)
  146. ssl_context._ctx.use_certificate(X509.from_cryptography(cert))
  147. ssl_context._ctx.use_privatekey(PKey.from_cryptography_key(key))
  148. return ssl_context