ocsp_support.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. # Copyright 2020-present MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you
  4. # may not use this file except in compliance with the License. You
  5. # may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. # implied. See the License for the specific language governing
  13. # permissions and limitations under the License.
  14. """Support for requesting and verifying OCSP responses."""
  15. import logging as _logging
  16. import re as _re
  17. from datetime import datetime as _datetime
  18. from cryptography.exceptions import InvalidSignature as _InvalidSignature
  19. from cryptography.hazmat.backends import default_backend as _default_backend
  20. from cryptography.hazmat.primitives.asymmetric.dsa import (
  21. DSAPublicKey as _DSAPublicKey)
  22. from cryptography.hazmat.primitives.asymmetric.ec import (
  23. ECDSA as _ECDSA,
  24. EllipticCurvePublicKey as _EllipticCurvePublicKey)
  25. from cryptography.hazmat.primitives.asymmetric.padding import (
  26. PKCS1v15 as _PKCS1v15)
  27. from cryptography.hazmat.primitives.asymmetric.rsa import (
  28. RSAPublicKey as _RSAPublicKey)
  29. from cryptography.hazmat.primitives.hashes import (
  30. Hash as _Hash,
  31. SHA1 as _SHA1)
  32. from cryptography.hazmat.primitives.serialization import (
  33. Encoding as _Encoding,
  34. PublicFormat as _PublicFormat)
  35. from cryptography.x509 import (
  36. AuthorityInformationAccess as _AuthorityInformationAccess,
  37. ExtendedKeyUsage as _ExtendedKeyUsage,
  38. ExtensionNotFound as _ExtensionNotFound,
  39. load_pem_x509_certificate as _load_pem_x509_certificate,
  40. TLSFeature as _TLSFeature,
  41. TLSFeatureType as _TLSFeatureType)
  42. from cryptography.x509.oid import (
  43. AuthorityInformationAccessOID as _AuthorityInformationAccessOID,
  44. ExtendedKeyUsageOID as _ExtendedKeyUsageOID)
  45. from cryptography.x509.ocsp import (
  46. load_der_ocsp_response as _load_der_ocsp_response,
  47. OCSPCertStatus as _OCSPCertStatus,
  48. OCSPRequestBuilder as _OCSPRequestBuilder,
  49. OCSPResponseStatus as _OCSPResponseStatus)
  50. from requests import post as _post
  51. from requests.exceptions import RequestException as _RequestException
  52. # Note: the functions in this module generally return 1 or 0. The reason
  53. # is simple. The entry point, ocsp_callback, is registered as a callback
  54. # with OpenSSL through PyOpenSSL. The callback must return 1 (success) or
  55. # 0 (failure).
  56. _LOGGER = _logging.getLogger(__name__)
  57. _CERT_REGEX = _re.compile(
  58. b'-----BEGIN CERTIFICATE[^\r\n]+.+?-----END CERTIFICATE[^\r\n]+',
  59. _re.DOTALL)
  60. def _load_trusted_ca_certs(cafile):
  61. """Parse the tlsCAFile into a list of certificates."""
  62. with open(cafile, 'rb') as f:
  63. data = f.read()
  64. # Load all the certs in the file.
  65. trusted_ca_certs = []
  66. backend = _default_backend()
  67. for cert_data in _re.findall(_CERT_REGEX, data):
  68. trusted_ca_certs.append(
  69. _load_pem_x509_certificate(cert_data, backend))
  70. return trusted_ca_certs
  71. def _get_issuer_cert(cert, chain, trusted_ca_certs):
  72. issuer_name = cert.issuer
  73. for candidate in chain:
  74. if candidate.subject == issuer_name:
  75. return candidate
  76. # Depending on the server's TLS library, the peer's cert chain may not
  77. # include the self signed root CA. In this case we check the user
  78. # provided tlsCAFile (ssl_ca_certs) for the issuer.
  79. # Remove once we use the verified peer cert chain in PYTHON-2147.
  80. if trusted_ca_certs:
  81. for candidate in trusted_ca_certs:
  82. if candidate.subject == issuer_name:
  83. return candidate
  84. return None
  85. def _verify_signature(key, signature, algorithm, data):
  86. # See cryptography.x509.Certificate.public_key
  87. # for the public key types.
  88. try:
  89. if isinstance(key, _RSAPublicKey):
  90. key.verify(signature, data, _PKCS1v15(), algorithm)
  91. elif isinstance(key, _DSAPublicKey):
  92. key.verify(signature, data, algorithm)
  93. elif isinstance(key, _EllipticCurvePublicKey):
  94. key.verify(signature, data, _ECDSA(algorithm))
  95. else:
  96. key.verify(signature, data)
  97. except _InvalidSignature:
  98. return 0
  99. return 1
  100. def _get_extension(cert, klass):
  101. try:
  102. return cert.extensions.get_extension_for_class(klass)
  103. except _ExtensionNotFound:
  104. return None
  105. def _public_key_hash(cert):
  106. public_key = cert.public_key()
  107. # https://tools.ietf.org/html/rfc2560#section-4.2.1
  108. # "KeyHash ::= OCTET STRING -- SHA-1 hash of responder's public key
  109. # (excluding the tag and length fields)"
  110. # https://stackoverflow.com/a/46309453/600498
  111. if isinstance(public_key, _RSAPublicKey):
  112. pbytes = public_key.public_bytes(
  113. _Encoding.DER, _PublicFormat.PKCS1)
  114. elif isinstance(public_key, _EllipticCurvePublicKey):
  115. pbytes = public_key.public_bytes(
  116. _Encoding.X962, _PublicFormat.UncompressedPoint)
  117. else:
  118. pbytes = public_key.public_bytes(
  119. _Encoding.DER, _PublicFormat.SubjectPublicKeyInfo)
  120. digest = _Hash(_SHA1(), backend=_default_backend())
  121. digest.update(pbytes)
  122. return digest.finalize()
  123. def _get_certs_by_key_hash(certificates, issuer, responder_key_hash):
  124. return [
  125. cert for cert in certificates
  126. if _public_key_hash(cert) == responder_key_hash and
  127. cert.issuer == issuer.subject]
  128. def _get_certs_by_name(certificates, issuer, responder_name):
  129. return [
  130. cert for cert in certificates
  131. if cert.subject == responder_name and
  132. cert.issuer == issuer.subject]
  133. def _verify_response_signature(issuer, response):
  134. # Response object will have a responder_name or responder_key_hash
  135. # not both.
  136. name = response.responder_name
  137. rkey_hash = response.responder_key_hash
  138. ikey_hash = response.issuer_key_hash
  139. if name is not None and name == issuer.subject or rkey_hash == ikey_hash:
  140. _LOGGER.debug("Responder is issuer")
  141. # Responder is the issuer
  142. responder_cert = issuer
  143. else:
  144. _LOGGER.debug("Responder is a delegate")
  145. # Responder is a delegate
  146. # https://tools.ietf.org/html/rfc6960#section-2.6
  147. # RFC6960, Section 3.2, Number 3
  148. certs = response.certificates
  149. if response.responder_name is not None:
  150. responder_certs = _get_certs_by_name(certs, issuer, name)
  151. _LOGGER.debug("Using responder name")
  152. else:
  153. responder_certs = _get_certs_by_key_hash(certs, issuer, rkey_hash)
  154. _LOGGER.debug("Using key hash")
  155. if not responder_certs:
  156. _LOGGER.debug("No matching or valid responder certs.")
  157. return 0
  158. # XXX: Can there be more than one? If so, should we try each one
  159. # until we find one that passes signature verification?
  160. responder_cert = responder_certs[0]
  161. # RFC6960, Section 3.2, Number 4
  162. ext = _get_extension(responder_cert, _ExtendedKeyUsage)
  163. if not ext or _ExtendedKeyUsageOID.OCSP_SIGNING not in ext.value:
  164. _LOGGER.debug("Delegate not authorized for OCSP signing")
  165. return 0
  166. if not _verify_signature(
  167. issuer.public_key(),
  168. responder_cert.signature,
  169. responder_cert.signature_hash_algorithm,
  170. responder_cert.tbs_certificate_bytes):
  171. _LOGGER.debug("Delegate signature verification failed")
  172. return 0
  173. # RFC6960, Section 3.2, Number 2
  174. ret = _verify_signature(
  175. responder_cert.public_key(),
  176. response.signature,
  177. response.signature_hash_algorithm,
  178. response.tbs_response_bytes)
  179. if not ret:
  180. _LOGGER.debug("Response signature verification failed")
  181. return ret
  182. def _build_ocsp_request(cert, issuer):
  183. # https://cryptography.io/en/latest/x509/ocsp/#creating-requests
  184. builder = _OCSPRequestBuilder()
  185. builder = builder.add_certificate(cert, issuer, _SHA1())
  186. return builder.build()
  187. def _verify_response(issuer, response):
  188. _LOGGER.debug("Verifying response")
  189. # RFC6960, Section 3.2, Number 2, 3 and 4 happen here.
  190. res = _verify_response_signature(issuer, response)
  191. if not res:
  192. return 0
  193. # Note that we are not using a "tolerence period" as discussed in
  194. # https://tools.ietf.org/rfc/rfc5019.txt?
  195. now = _datetime.utcnow()
  196. # RFC6960, Section 3.2, Number 5
  197. if response.this_update > now:
  198. _LOGGER.debug("thisUpdate is in the future")
  199. return 0
  200. # RFC6960, Section 3.2, Number 6
  201. if response.next_update and response.next_update < now:
  202. _LOGGER.debug("nextUpdate is in the past")
  203. return 0
  204. return 1
  205. def _get_ocsp_response(cert, issuer, uri, ocsp_response_cache):
  206. ocsp_request = _build_ocsp_request(cert, issuer)
  207. try:
  208. ocsp_response = ocsp_response_cache[ocsp_request]
  209. _LOGGER.debug("Using cached OCSP response.")
  210. except KeyError:
  211. try:
  212. response = _post(
  213. uri,
  214. data=ocsp_request.public_bytes(_Encoding.DER),
  215. headers={'Content-Type': 'application/ocsp-request'},
  216. timeout=5)
  217. except _RequestException as exc:
  218. _LOGGER.debug("HTTP request failed: %s", exc)
  219. return None
  220. if response.status_code != 200:
  221. _LOGGER.debug("HTTP request returned %d", response.status_code)
  222. return None
  223. ocsp_response = _load_der_ocsp_response(response.content)
  224. _LOGGER.debug(
  225. "OCSP response status: %r", ocsp_response.response_status)
  226. if ocsp_response.response_status != _OCSPResponseStatus.SUCCESSFUL:
  227. return None
  228. # RFC6960, Section 3.2, Number 1. Only relevant if we need to
  229. # talk to the responder directly.
  230. # Accessing response.serial_number raises if response status is not
  231. # SUCCESSFUL.
  232. if ocsp_response.serial_number != ocsp_request.serial_number:
  233. _LOGGER.debug("Response serial number does not match request")
  234. return None
  235. if not _verify_response(issuer, ocsp_response):
  236. # The response failed verification.
  237. return None
  238. _LOGGER.debug("Caching OCSP response.")
  239. ocsp_response_cache[ocsp_request] = ocsp_response
  240. return ocsp_response
  241. def _ocsp_callback(conn, ocsp_bytes, user_data):
  242. """Callback for use with OpenSSL.SSL.Context.set_ocsp_client_callback."""
  243. cert = conn.get_peer_certificate()
  244. if cert is None:
  245. _LOGGER.debug("No peer cert?")
  246. return 0
  247. cert = cert.to_cryptography()
  248. chain = conn.get_peer_cert_chain()
  249. if not chain:
  250. _LOGGER.debug("No peer cert chain?")
  251. return 0
  252. chain = [cer.to_cryptography() for cer in chain]
  253. issuer = _get_issuer_cert(cert, chain, user_data.trusted_ca_certs)
  254. must_staple = False
  255. # https://tools.ietf.org/html/rfc7633#section-4.2.3.1
  256. ext = _get_extension(cert, _TLSFeature)
  257. if ext is not None:
  258. for feature in ext.value:
  259. if feature == _TLSFeatureType.status_request:
  260. _LOGGER.debug("Peer presented a must-staple cert")
  261. must_staple = True
  262. break
  263. ocsp_response_cache = user_data.ocsp_response_cache
  264. # No stapled OCSP response
  265. if ocsp_bytes == b'':
  266. _LOGGER.debug("Peer did not staple an OCSP response")
  267. if must_staple:
  268. _LOGGER.debug("Must-staple cert with no stapled response, hard fail.")
  269. return 0
  270. if not user_data.check_ocsp_endpoint:
  271. _LOGGER.debug("OCSP endpoint checking is disabled, soft fail.")
  272. # No stapled OCSP response, checking responder URI diabled, soft fail.
  273. return 1
  274. # https://tools.ietf.org/html/rfc6960#section-3.1
  275. ext = _get_extension(cert, _AuthorityInformationAccess)
  276. if ext is None:
  277. _LOGGER.debug("No authority access information, soft fail")
  278. # No stapled OCSP response, no responder URI, soft fail.
  279. return 1
  280. uris = [desc.access_location.value
  281. for desc in ext.value
  282. if desc.access_method == _AuthorityInformationAccessOID.OCSP]
  283. if not uris:
  284. _LOGGER.debug("No OCSP URI, soft fail")
  285. # No responder URI, soft fail.
  286. return 1
  287. if issuer is None:
  288. _LOGGER.debug("No issuer cert?")
  289. return 0
  290. _LOGGER.debug("Requesting OCSP data")
  291. # When requesting data from an OCSP endpoint we only fail on
  292. # successful, valid responses with a certificate status of REVOKED.
  293. for uri in uris:
  294. _LOGGER.debug("Trying %s", uri)
  295. response = _get_ocsp_response(
  296. cert, issuer, uri, ocsp_response_cache)
  297. if response is None:
  298. # The endpoint didn't respond in time, or the response was
  299. # unsuccessful or didn't match the request, or the response
  300. # failed verification.
  301. continue
  302. _LOGGER.debug("OCSP cert status: %r", response.certificate_status)
  303. if response.certificate_status == _OCSPCertStatus.GOOD:
  304. return 1
  305. if response.certificate_status == _OCSPCertStatus.REVOKED:
  306. return 0
  307. # Soft fail if we couldn't get a definitive status.
  308. _LOGGER.debug("No definitive OCSP cert status, soft fail")
  309. return 1
  310. _LOGGER.debug("Peer stapled an OCSP response")
  311. if issuer is None:
  312. _LOGGER.debug("No issuer cert?")
  313. return 0
  314. response = _load_der_ocsp_response(ocsp_bytes)
  315. _LOGGER.debug(
  316. "OCSP response status: %r", response.response_status)
  317. # This happens in _request_ocsp when there is no stapled response so
  318. # we know if we can compare serial numbers for the request and response.
  319. if response.response_status != _OCSPResponseStatus.SUCCESSFUL:
  320. return 0
  321. if not _verify_response(issuer, response):
  322. return 0
  323. # Cache the verified, stapled response.
  324. ocsp_response_cache[_build_ocsp_request(cert, issuer)] = response
  325. _LOGGER.debug("OCSP cert status: %r", response.certificate_status)
  326. if response.certificate_status == _OCSPCertStatus.REVOKED:
  327. return 0
  328. return 1