ocsp.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
  4. from __future__ import absolute_import, division, print_function
  5. import abc
  6. import datetime
  7. from enum import Enum
  8. import six
  9. from cryptography import x509
  10. from cryptography.hazmat.primitives import hashes
  11. from cryptography.x509.base import (
  12. _EARLIEST_UTC_TIME,
  13. _convert_to_naive_utc_time,
  14. _reject_duplicate_extension,
  15. )
  16. _OIDS_TO_HASH = {
  17. "1.3.14.3.2.26": hashes.SHA1(),
  18. "2.16.840.1.101.3.4.2.4": hashes.SHA224(),
  19. "2.16.840.1.101.3.4.2.1": hashes.SHA256(),
  20. "2.16.840.1.101.3.4.2.2": hashes.SHA384(),
  21. "2.16.840.1.101.3.4.2.3": hashes.SHA512(),
  22. }
  23. class OCSPResponderEncoding(Enum):
  24. HASH = "By Hash"
  25. NAME = "By Name"
  26. class OCSPResponseStatus(Enum):
  27. SUCCESSFUL = 0
  28. MALFORMED_REQUEST = 1
  29. INTERNAL_ERROR = 2
  30. TRY_LATER = 3
  31. SIG_REQUIRED = 5
  32. UNAUTHORIZED = 6
  33. _RESPONSE_STATUS_TO_ENUM = {x.value: x for x in OCSPResponseStatus}
  34. _ALLOWED_HASHES = (
  35. hashes.SHA1,
  36. hashes.SHA224,
  37. hashes.SHA256,
  38. hashes.SHA384,
  39. hashes.SHA512,
  40. )
  41. def _verify_algorithm(algorithm):
  42. if not isinstance(algorithm, _ALLOWED_HASHES):
  43. raise ValueError(
  44. "Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512"
  45. )
  46. class OCSPCertStatus(Enum):
  47. GOOD = 0
  48. REVOKED = 1
  49. UNKNOWN = 2
  50. _CERT_STATUS_TO_ENUM = {x.value: x for x in OCSPCertStatus}
  51. def load_der_ocsp_request(data):
  52. from cryptography.hazmat.backends.openssl.backend import backend
  53. return backend.load_der_ocsp_request(data)
  54. def load_der_ocsp_response(data):
  55. from cryptography.hazmat.backends.openssl.backend import backend
  56. return backend.load_der_ocsp_response(data)
  57. class OCSPRequestBuilder(object):
  58. def __init__(self, request=None, extensions=[]):
  59. self._request = request
  60. self._extensions = extensions
  61. def add_certificate(self, cert, issuer, algorithm):
  62. if self._request is not None:
  63. raise ValueError("Only one certificate can be added to a request")
  64. _verify_algorithm(algorithm)
  65. if not isinstance(cert, x509.Certificate) or not isinstance(
  66. issuer, x509.Certificate
  67. ):
  68. raise TypeError("cert and issuer must be a Certificate")
  69. return OCSPRequestBuilder((cert, issuer, algorithm), self._extensions)
  70. def add_extension(self, extension, critical):
  71. if not isinstance(extension, x509.ExtensionType):
  72. raise TypeError("extension must be an ExtensionType")
  73. extension = x509.Extension(extension.oid, critical, extension)
  74. _reject_duplicate_extension(extension, self._extensions)
  75. return OCSPRequestBuilder(
  76. self._request, self._extensions + [extension]
  77. )
  78. def build(self):
  79. from cryptography.hazmat.backends.openssl.backend import backend
  80. if self._request is None:
  81. raise ValueError("You must add a certificate before building")
  82. return backend.create_ocsp_request(self)
  83. class _SingleResponse(object):
  84. def __init__(
  85. self,
  86. cert,
  87. issuer,
  88. algorithm,
  89. cert_status,
  90. this_update,
  91. next_update,
  92. revocation_time,
  93. revocation_reason,
  94. ):
  95. if not isinstance(cert, x509.Certificate) or not isinstance(
  96. issuer, x509.Certificate
  97. ):
  98. raise TypeError("cert and issuer must be a Certificate")
  99. _verify_algorithm(algorithm)
  100. if not isinstance(this_update, datetime.datetime):
  101. raise TypeError("this_update must be a datetime object")
  102. if next_update is not None and not isinstance(
  103. next_update, datetime.datetime
  104. ):
  105. raise TypeError("next_update must be a datetime object or None")
  106. self._cert = cert
  107. self._issuer = issuer
  108. self._algorithm = algorithm
  109. self._this_update = this_update
  110. self._next_update = next_update
  111. if not isinstance(cert_status, OCSPCertStatus):
  112. raise TypeError(
  113. "cert_status must be an item from the OCSPCertStatus enum"
  114. )
  115. if cert_status is not OCSPCertStatus.REVOKED:
  116. if revocation_time is not None:
  117. raise ValueError(
  118. "revocation_time can only be provided if the certificate "
  119. "is revoked"
  120. )
  121. if revocation_reason is not None:
  122. raise ValueError(
  123. "revocation_reason can only be provided if the certificate"
  124. " is revoked"
  125. )
  126. else:
  127. if not isinstance(revocation_time, datetime.datetime):
  128. raise TypeError("revocation_time must be a datetime object")
  129. revocation_time = _convert_to_naive_utc_time(revocation_time)
  130. if revocation_time < _EARLIEST_UTC_TIME:
  131. raise ValueError(
  132. "The revocation_time must be on or after"
  133. " 1950 January 1."
  134. )
  135. if revocation_reason is not None and not isinstance(
  136. revocation_reason, x509.ReasonFlags
  137. ):
  138. raise TypeError(
  139. "revocation_reason must be an item from the ReasonFlags "
  140. "enum or None"
  141. )
  142. self._cert_status = cert_status
  143. self._revocation_time = revocation_time
  144. self._revocation_reason = revocation_reason
  145. class OCSPResponseBuilder(object):
  146. def __init__(
  147. self, response=None, responder_id=None, certs=None, extensions=[]
  148. ):
  149. self._response = response
  150. self._responder_id = responder_id
  151. self._certs = certs
  152. self._extensions = extensions
  153. def add_response(
  154. self,
  155. cert,
  156. issuer,
  157. algorithm,
  158. cert_status,
  159. this_update,
  160. next_update,
  161. revocation_time,
  162. revocation_reason,
  163. ):
  164. if self._response is not None:
  165. raise ValueError("Only one response per OCSPResponse.")
  166. singleresp = _SingleResponse(
  167. cert,
  168. issuer,
  169. algorithm,
  170. cert_status,
  171. this_update,
  172. next_update,
  173. revocation_time,
  174. revocation_reason,
  175. )
  176. return OCSPResponseBuilder(
  177. singleresp,
  178. self._responder_id,
  179. self._certs,
  180. self._extensions,
  181. )
  182. def responder_id(self, encoding, responder_cert):
  183. if self._responder_id is not None:
  184. raise ValueError("responder_id can only be set once")
  185. if not isinstance(responder_cert, x509.Certificate):
  186. raise TypeError("responder_cert must be a Certificate")
  187. if not isinstance(encoding, OCSPResponderEncoding):
  188. raise TypeError(
  189. "encoding must be an element from OCSPResponderEncoding"
  190. )
  191. return OCSPResponseBuilder(
  192. self._response,
  193. (responder_cert, encoding),
  194. self._certs,
  195. self._extensions,
  196. )
  197. def certificates(self, certs):
  198. if self._certs is not None:
  199. raise ValueError("certificates may only be set once")
  200. certs = list(certs)
  201. if len(certs) == 0:
  202. raise ValueError("certs must not be an empty list")
  203. if not all(isinstance(x, x509.Certificate) for x in certs):
  204. raise TypeError("certs must be a list of Certificates")
  205. return OCSPResponseBuilder(
  206. self._response,
  207. self._responder_id,
  208. certs,
  209. self._extensions,
  210. )
  211. def add_extension(self, extension, critical):
  212. if not isinstance(extension, x509.ExtensionType):
  213. raise TypeError("extension must be an ExtensionType")
  214. extension = x509.Extension(extension.oid, critical, extension)
  215. _reject_duplicate_extension(extension, self._extensions)
  216. return OCSPResponseBuilder(
  217. self._response,
  218. self._responder_id,
  219. self._certs,
  220. self._extensions + [extension],
  221. )
  222. def sign(self, private_key, algorithm):
  223. from cryptography.hazmat.backends.openssl.backend import backend
  224. if self._response is None:
  225. raise ValueError("You must add a response before signing")
  226. if self._responder_id is None:
  227. raise ValueError("You must add a responder_id before signing")
  228. return backend.create_ocsp_response(
  229. OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm
  230. )
  231. @classmethod
  232. def build_unsuccessful(cls, response_status):
  233. from cryptography.hazmat.backends.openssl.backend import backend
  234. if not isinstance(response_status, OCSPResponseStatus):
  235. raise TypeError(
  236. "response_status must be an item from OCSPResponseStatus"
  237. )
  238. if response_status is OCSPResponseStatus.SUCCESSFUL:
  239. raise ValueError("response_status cannot be SUCCESSFUL")
  240. return backend.create_ocsp_response(response_status, None, None, None)
  241. @six.add_metaclass(abc.ABCMeta)
  242. class OCSPRequest(object):
  243. @abc.abstractproperty
  244. def issuer_key_hash(self):
  245. """
  246. The hash of the issuer public key
  247. """
  248. @abc.abstractproperty
  249. def issuer_name_hash(self):
  250. """
  251. The hash of the issuer name
  252. """
  253. @abc.abstractproperty
  254. def hash_algorithm(self):
  255. """
  256. The hash algorithm used in the issuer name and key hashes
  257. """
  258. @abc.abstractproperty
  259. def serial_number(self):
  260. """
  261. The serial number of the cert whose status is being checked
  262. """
  263. @abc.abstractmethod
  264. def public_bytes(self, encoding):
  265. """
  266. Serializes the request to DER
  267. """
  268. @abc.abstractproperty
  269. def extensions(self):
  270. """
  271. The list of request extensions. Not single request extensions.
  272. """
  273. @six.add_metaclass(abc.ABCMeta)
  274. class OCSPResponse(object):
  275. @abc.abstractproperty
  276. def response_status(self):
  277. """
  278. The status of the response. This is a value from the OCSPResponseStatus
  279. enumeration
  280. """
  281. @abc.abstractproperty
  282. def signature_algorithm_oid(self):
  283. """
  284. The ObjectIdentifier of the signature algorithm
  285. """
  286. @abc.abstractproperty
  287. def signature_hash_algorithm(self):
  288. """
  289. Returns a HashAlgorithm corresponding to the type of the digest signed
  290. """
  291. @abc.abstractproperty
  292. def signature(self):
  293. """
  294. The signature bytes
  295. """
  296. @abc.abstractproperty
  297. def tbs_response_bytes(self):
  298. """
  299. The tbsResponseData bytes
  300. """
  301. @abc.abstractproperty
  302. def certificates(self):
  303. """
  304. A list of certificates used to help build a chain to verify the OCSP
  305. response. This situation occurs when the OCSP responder uses a delegate
  306. certificate.
  307. """
  308. @abc.abstractproperty
  309. def responder_key_hash(self):
  310. """
  311. The responder's key hash or None
  312. """
  313. @abc.abstractproperty
  314. def responder_name(self):
  315. """
  316. The responder's Name or None
  317. """
  318. @abc.abstractproperty
  319. def produced_at(self):
  320. """
  321. The time the response was produced
  322. """
  323. @abc.abstractproperty
  324. def certificate_status(self):
  325. """
  326. The status of the certificate (an element from the OCSPCertStatus enum)
  327. """
  328. @abc.abstractproperty
  329. def revocation_time(self):
  330. """
  331. The date of when the certificate was revoked or None if not
  332. revoked.
  333. """
  334. @abc.abstractproperty
  335. def revocation_reason(self):
  336. """
  337. The reason the certificate was revoked or None if not specified or
  338. not revoked.
  339. """
  340. @abc.abstractproperty
  341. def this_update(self):
  342. """
  343. The most recent time at which the status being indicated is known by
  344. the responder to have been correct
  345. """
  346. @abc.abstractproperty
  347. def next_update(self):
  348. """
  349. The time when newer information will be available
  350. """
  351. @abc.abstractproperty
  352. def issuer_key_hash(self):
  353. """
  354. The hash of the issuer public key
  355. """
  356. @abc.abstractproperty
  357. def issuer_name_hash(self):
  358. """
  359. The hash of the issuer name
  360. """
  361. @abc.abstractproperty
  362. def hash_algorithm(self):
  363. """
  364. The hash algorithm used in the issuer name and key hashes
  365. """
  366. @abc.abstractproperty
  367. def serial_number(self):
  368. """
  369. The serial number of the cert whose status is being checked
  370. """
  371. @abc.abstractproperty
  372. def extensions(self):
  373. """
  374. The list of response extensions. Not single response extensions.
  375. """
  376. @abc.abstractproperty
  377. def single_extensions(self):
  378. """
  379. The list of single response extensions. Not response extensions.
  380. """