ocsp.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. from __future__ import absolute_import, division, print_function
  5. import functools
  6. from cryptography import utils, x509
  7. from cryptography.exceptions import UnsupportedAlgorithm
  8. from cryptography.hazmat.backends.openssl.decode_asn1 import (
  9. _CRL_ENTRY_REASON_CODE_TO_ENUM,
  10. _asn1_integer_to_int,
  11. _asn1_string_to_bytes,
  12. _decode_x509_name,
  13. _obj2txt,
  14. _parse_asn1_generalized_time,
  15. )
  16. from cryptography.hazmat.backends.openssl.x509 import _Certificate
  17. from cryptography.hazmat.primitives import serialization
  18. from cryptography.x509.ocsp import (
  19. OCSPCertStatus,
  20. OCSPRequest,
  21. OCSPResponse,
  22. OCSPResponseStatus,
  23. _CERT_STATUS_TO_ENUM,
  24. _OIDS_TO_HASH,
  25. _RESPONSE_STATUS_TO_ENUM,
  26. )
  27. def _requires_successful_response(func):
  28. @functools.wraps(func)
  29. def wrapper(self, *args):
  30. if self.response_status != OCSPResponseStatus.SUCCESSFUL:
  31. raise ValueError(
  32. "OCSP response status is not successful so the property "
  33. "has no value"
  34. )
  35. else:
  36. return func(self, *args)
  37. return wrapper
  38. def _issuer_key_hash(backend, cert_id):
  39. key_hash = backend._ffi.new("ASN1_OCTET_STRING **")
  40. res = backend._lib.OCSP_id_get0_info(
  41. backend._ffi.NULL,
  42. backend._ffi.NULL,
  43. key_hash,
  44. backend._ffi.NULL,
  45. cert_id,
  46. )
  47. backend.openssl_assert(res == 1)
  48. backend.openssl_assert(key_hash[0] != backend._ffi.NULL)
  49. return _asn1_string_to_bytes(backend, key_hash[0])
  50. def _issuer_name_hash(backend, cert_id):
  51. name_hash = backend._ffi.new("ASN1_OCTET_STRING **")
  52. res = backend._lib.OCSP_id_get0_info(
  53. name_hash,
  54. backend._ffi.NULL,
  55. backend._ffi.NULL,
  56. backend._ffi.NULL,
  57. cert_id,
  58. )
  59. backend.openssl_assert(res == 1)
  60. backend.openssl_assert(name_hash[0] != backend._ffi.NULL)
  61. return _asn1_string_to_bytes(backend, name_hash[0])
  62. def _serial_number(backend, cert_id):
  63. num = backend._ffi.new("ASN1_INTEGER **")
  64. res = backend._lib.OCSP_id_get0_info(
  65. backend._ffi.NULL, backend._ffi.NULL, backend._ffi.NULL, num, cert_id
  66. )
  67. backend.openssl_assert(res == 1)
  68. backend.openssl_assert(num[0] != backend._ffi.NULL)
  69. return _asn1_integer_to_int(backend, num[0])
  70. def _hash_algorithm(backend, cert_id):
  71. asn1obj = backend._ffi.new("ASN1_OBJECT **")
  72. res = backend._lib.OCSP_id_get0_info(
  73. backend._ffi.NULL,
  74. asn1obj,
  75. backend._ffi.NULL,
  76. backend._ffi.NULL,
  77. cert_id,
  78. )
  79. backend.openssl_assert(res == 1)
  80. backend.openssl_assert(asn1obj[0] != backend._ffi.NULL)
  81. oid = _obj2txt(backend, asn1obj[0])
  82. try:
  83. return _OIDS_TO_HASH[oid]
  84. except KeyError:
  85. raise UnsupportedAlgorithm(
  86. "Signature algorithm OID: {} not recognized".format(oid)
  87. )
  88. @utils.register_interface(OCSPResponse)
  89. class _OCSPResponse(object):
  90. def __init__(self, backend, ocsp_response):
  91. self._backend = backend
  92. self._ocsp_response = ocsp_response
  93. status = self._backend._lib.OCSP_response_status(self._ocsp_response)
  94. self._backend.openssl_assert(status in _RESPONSE_STATUS_TO_ENUM)
  95. self._status = _RESPONSE_STATUS_TO_ENUM[status]
  96. if self._status is OCSPResponseStatus.SUCCESSFUL:
  97. basic = self._backend._lib.OCSP_response_get1_basic(
  98. self._ocsp_response
  99. )
  100. self._backend.openssl_assert(basic != self._backend._ffi.NULL)
  101. self._basic = self._backend._ffi.gc(
  102. basic, self._backend._lib.OCSP_BASICRESP_free
  103. )
  104. num_resp = self._backend._lib.OCSP_resp_count(self._basic)
  105. if num_resp != 1:
  106. raise ValueError(
  107. "OCSP response contains more than one SINGLERESP structure"
  108. ", which this library does not support. "
  109. "{} found".format(num_resp)
  110. )
  111. self._single = self._backend._lib.OCSP_resp_get0(self._basic, 0)
  112. self._backend.openssl_assert(
  113. self._single != self._backend._ffi.NULL
  114. )
  115. self._cert_id = self._backend._lib.OCSP_SINGLERESP_get0_id(
  116. self._single
  117. )
  118. self._backend.openssl_assert(
  119. self._cert_id != self._backend._ffi.NULL
  120. )
  121. response_status = utils.read_only_property("_status")
  122. @property
  123. @_requires_successful_response
  124. def signature_algorithm_oid(self):
  125. alg = self._backend._lib.OCSP_resp_get0_tbs_sigalg(self._basic)
  126. self._backend.openssl_assert(alg != self._backend._ffi.NULL)
  127. oid = _obj2txt(self._backend, alg.algorithm)
  128. return x509.ObjectIdentifier(oid)
  129. @property
  130. @_requires_successful_response
  131. def signature_hash_algorithm(self):
  132. oid = self.signature_algorithm_oid
  133. try:
  134. return x509._SIG_OIDS_TO_HASH[oid]
  135. except KeyError:
  136. raise UnsupportedAlgorithm(
  137. "Signature algorithm OID:{} not recognized".format(oid)
  138. )
  139. @property
  140. @_requires_successful_response
  141. def signature(self):
  142. sig = self._backend._lib.OCSP_resp_get0_signature(self._basic)
  143. self._backend.openssl_assert(sig != self._backend._ffi.NULL)
  144. return _asn1_string_to_bytes(self._backend, sig)
  145. @property
  146. @_requires_successful_response
  147. def tbs_response_bytes(self):
  148. respdata = self._backend._lib.OCSP_resp_get0_respdata(self._basic)
  149. self._backend.openssl_assert(respdata != self._backend._ffi.NULL)
  150. pp = self._backend._ffi.new("unsigned char **")
  151. res = self._backend._lib.i2d_OCSP_RESPDATA(respdata, pp)
  152. self._backend.openssl_assert(pp[0] != self._backend._ffi.NULL)
  153. pp = self._backend._ffi.gc(
  154. pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
  155. )
  156. self._backend.openssl_assert(res > 0)
  157. return self._backend._ffi.buffer(pp[0], res)[:]
  158. @property
  159. @_requires_successful_response
  160. def certificates(self):
  161. sk_x509 = self._backend._lib.OCSP_resp_get0_certs(self._basic)
  162. num = self._backend._lib.sk_X509_num(sk_x509)
  163. certs = []
  164. for i in range(num):
  165. x509 = self._backend._lib.sk_X509_value(sk_x509, i)
  166. self._backend.openssl_assert(x509 != self._backend._ffi.NULL)
  167. cert = _Certificate(self._backend, x509)
  168. # We need to keep the OCSP response that the certificate came from
  169. # alive until the Certificate object itself goes out of scope, so
  170. # we give it a private reference.
  171. cert._ocsp_resp = self
  172. certs.append(cert)
  173. return certs
  174. @property
  175. @_requires_successful_response
  176. def responder_key_hash(self):
  177. _, asn1_string = self._responder_key_name()
  178. if asn1_string == self._backend._ffi.NULL:
  179. return None
  180. else:
  181. return _asn1_string_to_bytes(self._backend, asn1_string)
  182. @property
  183. @_requires_successful_response
  184. def responder_name(self):
  185. x509_name, _ = self._responder_key_name()
  186. if x509_name == self._backend._ffi.NULL:
  187. return None
  188. else:
  189. return _decode_x509_name(self._backend, x509_name)
  190. def _responder_key_name(self):
  191. asn1_string = self._backend._ffi.new("ASN1_OCTET_STRING **")
  192. x509_name = self._backend._ffi.new("X509_NAME **")
  193. res = self._backend._lib.OCSP_resp_get0_id(
  194. self._basic, asn1_string, x509_name
  195. )
  196. self._backend.openssl_assert(res == 1)
  197. return x509_name[0], asn1_string[0]
  198. @property
  199. @_requires_successful_response
  200. def produced_at(self):
  201. produced_at = self._backend._lib.OCSP_resp_get0_produced_at(
  202. self._basic
  203. )
  204. return _parse_asn1_generalized_time(self._backend, produced_at)
  205. @property
  206. @_requires_successful_response
  207. def certificate_status(self):
  208. status = self._backend._lib.OCSP_single_get0_status(
  209. self._single,
  210. self._backend._ffi.NULL,
  211. self._backend._ffi.NULL,
  212. self._backend._ffi.NULL,
  213. self._backend._ffi.NULL,
  214. )
  215. self._backend.openssl_assert(status in _CERT_STATUS_TO_ENUM)
  216. return _CERT_STATUS_TO_ENUM[status]
  217. @property
  218. @_requires_successful_response
  219. def revocation_time(self):
  220. if self.certificate_status is not OCSPCertStatus.REVOKED:
  221. return None
  222. asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
  223. self._backend._lib.OCSP_single_get0_status(
  224. self._single,
  225. self._backend._ffi.NULL,
  226. asn1_time,
  227. self._backend._ffi.NULL,
  228. self._backend._ffi.NULL,
  229. )
  230. self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL)
  231. return _parse_asn1_generalized_time(self._backend, asn1_time[0])
  232. @property
  233. @_requires_successful_response
  234. def revocation_reason(self):
  235. if self.certificate_status is not OCSPCertStatus.REVOKED:
  236. return None
  237. reason_ptr = self._backend._ffi.new("int *")
  238. self._backend._lib.OCSP_single_get0_status(
  239. self._single,
  240. reason_ptr,
  241. self._backend._ffi.NULL,
  242. self._backend._ffi.NULL,
  243. self._backend._ffi.NULL,
  244. )
  245. # If no reason is encoded OpenSSL returns -1
  246. if reason_ptr[0] == -1:
  247. return None
  248. else:
  249. self._backend.openssl_assert(
  250. reason_ptr[0] in _CRL_ENTRY_REASON_CODE_TO_ENUM
  251. )
  252. return _CRL_ENTRY_REASON_CODE_TO_ENUM[reason_ptr[0]]
  253. @property
  254. @_requires_successful_response
  255. def this_update(self):
  256. asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
  257. self._backend._lib.OCSP_single_get0_status(
  258. self._single,
  259. self._backend._ffi.NULL,
  260. self._backend._ffi.NULL,
  261. asn1_time,
  262. self._backend._ffi.NULL,
  263. )
  264. self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL)
  265. return _parse_asn1_generalized_time(self._backend, asn1_time[0])
  266. @property
  267. @_requires_successful_response
  268. def next_update(self):
  269. asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
  270. self._backend._lib.OCSP_single_get0_status(
  271. self._single,
  272. self._backend._ffi.NULL,
  273. self._backend._ffi.NULL,
  274. self._backend._ffi.NULL,
  275. asn1_time,
  276. )
  277. if asn1_time[0] != self._backend._ffi.NULL:
  278. return _parse_asn1_generalized_time(self._backend, asn1_time[0])
  279. else:
  280. return None
  281. @property
  282. @_requires_successful_response
  283. def issuer_key_hash(self):
  284. return _issuer_key_hash(self._backend, self._cert_id)
  285. @property
  286. @_requires_successful_response
  287. def issuer_name_hash(self):
  288. return _issuer_name_hash(self._backend, self._cert_id)
  289. @property
  290. @_requires_successful_response
  291. def hash_algorithm(self):
  292. return _hash_algorithm(self._backend, self._cert_id)
  293. @property
  294. @_requires_successful_response
  295. def serial_number(self):
  296. return _serial_number(self._backend, self._cert_id)
  297. @utils.cached_property
  298. @_requires_successful_response
  299. def extensions(self):
  300. return self._backend._ocsp_basicresp_ext_parser.parse(self._basic)
  301. @utils.cached_property
  302. @_requires_successful_response
  303. def single_extensions(self):
  304. return self._backend._ocsp_singleresp_ext_parser.parse(self._single)
  305. def public_bytes(self, encoding):
  306. if encoding is not serialization.Encoding.DER:
  307. raise ValueError("The only allowed encoding value is Encoding.DER")
  308. bio = self._backend._create_mem_bio_gc()
  309. res = self._backend._lib.i2d_OCSP_RESPONSE_bio(
  310. bio, self._ocsp_response
  311. )
  312. self._backend.openssl_assert(res > 0)
  313. return self._backend._read_mem_bio(bio)
  314. @utils.register_interface(OCSPRequest)
  315. class _OCSPRequest(object):
  316. def __init__(self, backend, ocsp_request):
  317. if backend._lib.OCSP_request_onereq_count(ocsp_request) > 1:
  318. raise NotImplementedError(
  319. "OCSP request contains more than one request"
  320. )
  321. self._backend = backend
  322. self._ocsp_request = ocsp_request
  323. self._request = self._backend._lib.OCSP_request_onereq_get0(
  324. self._ocsp_request, 0
  325. )
  326. self._backend.openssl_assert(self._request != self._backend._ffi.NULL)
  327. self._cert_id = self._backend._lib.OCSP_onereq_get0_id(self._request)
  328. self._backend.openssl_assert(self._cert_id != self._backend._ffi.NULL)
  329. @property
  330. def issuer_key_hash(self):
  331. return _issuer_key_hash(self._backend, self._cert_id)
  332. @property
  333. def issuer_name_hash(self):
  334. return _issuer_name_hash(self._backend, self._cert_id)
  335. @property
  336. def serial_number(self):
  337. return _serial_number(self._backend, self._cert_id)
  338. @property
  339. def hash_algorithm(self):
  340. return _hash_algorithm(self._backend, self._cert_id)
  341. @utils.cached_property
  342. def extensions(self):
  343. return self._backend._ocsp_req_ext_parser.parse(self._ocsp_request)
  344. def public_bytes(self, encoding):
  345. if encoding is not serialization.Encoding.DER:
  346. raise ValueError("The only allowed encoding value is Encoding.DER")
  347. bio = self._backend._create_mem_bio_gc()
  348. res = self._backend._lib.i2d_OCSP_REQUEST_bio(bio, self._ocsp_request)
  349. self._backend.openssl_assert(res > 0)
  350. return self._backend._read_mem_bio(bio)