ocsp.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649
  1. # coding: utf-8
  2. """
  3. ASN.1 type classes for the online certificate status protocol (OCSP). Exports
  4. the following items:
  5. - OCSPRequest()
  6. - OCSPResponse()
  7. Other type classes are defined that help compose the types listed above.
  8. """
  9. from __future__ import unicode_literals, division, absolute_import, print_function
  10. from .algos import DigestAlgorithm, SignedDigestAlgorithm
  11. from .core import (
  12. Boolean,
  13. Choice,
  14. Enumerated,
  15. GeneralizedTime,
  16. IA5String,
  17. Integer,
  18. Null,
  19. ObjectIdentifier,
  20. OctetBitString,
  21. OctetString,
  22. ParsableOctetString,
  23. Sequence,
  24. SequenceOf,
  25. )
  26. from .crl import AuthorityInfoAccessSyntax, CRLReason
  27. from .keys import PublicKeyAlgorithm
  28. from .x509 import Certificate, GeneralName, GeneralNames, Name
  29. # The structures in this file are taken from https://tools.ietf.org/html/rfc6960
class Version(Integer):
    """
    Integer type whose `_map` gives the value 0 the name 'v1'.

    Used as the spec of the `version` field in TBSRequest and ResponseData,
    both of which default to 'v1'.
    """

    _map = {
        0: 'v1'
    }
class CertId(Sequence):
    """
    Sequence made up of a digest algorithm, two octet-string hash fields
    (`issuer_name_hash`, `issuer_key_hash`) and an integer `serial_number`.

    Used as `req_cert` in Request and as `cert_id` in SingleResponse.
    """

    _fields = [
        ('hash_algorithm', DigestAlgorithm),
        ('issuer_name_hash', OctetString),
        ('issuer_key_hash', OctetString),
        ('serial_number', Integer),
    ]
class ServiceLocator(Sequence):
    """
    Sequence pairing an `issuer` Name with a `locator` of type
    AuthorityInfoAccessSyntax.

    Registered in RequestExtension._oid_specs as the value type of the
    'service_locator' extension.
    """

    _fields = [
        ('issuer', Name),
        ('locator', AuthorityInfoAccessSyntax),
    ]
class RequestExtensionId(ObjectIdentifier):
    """
    Object identifier of an extension attached to a single Request.

    `_map` names the one OID known here, 'service_locator'.
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.7': 'service_locator',
    }
class RequestExtension(Sequence):
    """
    One extension on a single Request: an id, a `critical` flag that
    defaults to False, and the extension value carried in an octet string.

    Request._set_extensions() reads `extn_value` through its `.parsed`
    attribute.
    """

    _fields = [
        ('extn_id', RequestExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # NOTE(review): _oid_pair and _oid_specs are interpreted by .core (not
    # shown here); presumably the native value of 'extn_id' selects the spec
    # used to parse 'extn_value' - confirm against core.Sequence.
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'service_locator': ServiceLocator,
    }
class RequestExtensions(SequenceOf):
    """
    SequenceOf RequestExtension; the type of Request's optional
    `single_request_extensions` field.
    """

    _child_spec = RequestExtension
  62. class Request(Sequence):
  63. _fields = [
  64. ('req_cert', CertId),
  65. ('single_request_extensions', RequestExtensions, {'tag_type': 'explicit', 'tag': 0, 'optional': True}),
  66. ]
  67. _processed_extensions = False
  68. _critical_extensions = None
  69. _service_locator_value = None
  70. def _set_extensions(self):
  71. """
  72. Sets common named extensions to private attributes and creates a list
  73. of critical extensions
  74. """
  75. self._critical_extensions = set()
  76. for extension in self['single_request_extensions']:
  77. name = extension['extn_id'].native
  78. attribute_name = '_%s_value' % name
  79. if hasattr(self, attribute_name):
  80. setattr(self, attribute_name, extension['extn_value'].parsed)
  81. if extension['critical'].native:
  82. self._critical_extensions.add(name)
  83. self._processed_extensions = True
  84. @property
  85. def critical_extensions(self):
  86. """
  87. Returns a set of the names (or OID if not a known extension) of the
  88. extensions marked as critical
  89. :return:
  90. A set of unicode strings
  91. """
  92. if not self._processed_extensions:
  93. self._set_extensions()
  94. return self._critical_extensions
  95. @property
  96. def service_locator_value(self):
  97. """
  98. This extension is used when communicating with an OCSP responder that
  99. acts as a proxy for OCSP requests
  100. :return:
  101. None or a ServiceLocator object
  102. """
  103. if self._processed_extensions is False:
  104. self._set_extensions()
  105. return self._service_locator_value
class Requests(SequenceOf):
    """
    SequenceOf Request; the type of TBSRequest's `request_list` field.
    """

    _child_spec = Request
class ResponseType(ObjectIdentifier):
    """
    Object identifier naming a response format.

    `_map` names the one OID known here, 'basic_ocsp_response'. Used as the
    `response_type` field of ResponseBytes and as the child type of
    AcceptableResponses.
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.1': 'basic_ocsp_response',
    }
class AcceptableResponses(SequenceOf):
    """
    SequenceOf ResponseType; the value type of the 'acceptable_responses'
    extension in TBSRequestExtension._oid_specs.
    """

    _child_spec = ResponseType
class PreferredSignatureAlgorithm(Sequence):
    """
    Sequence of a signature algorithm (`sig_identifier`) and an optional
    public key algorithm (`cert_identifier`).
    """

    _fields = [
        ('sig_identifier', SignedDigestAlgorithm),
        ('cert_identifier', PublicKeyAlgorithm, {'optional': True}),
    ]
class PreferredSignatureAlgorithms(SequenceOf):
    """
    SequenceOf PreferredSignatureAlgorithm; the value type of the
    'preferred_signature_algorithms' extension in
    TBSRequestExtension._oid_specs.
    """

    _child_spec = PreferredSignatureAlgorithm
class TBSRequestExtensionId(ObjectIdentifier):
    """
    Object identifier of an extension that applies to a whole TBSRequest.

    `_map` names the three OIDs known here: 'nonce', 'acceptable_responses'
    and 'preferred_signature_algorithms'.
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.2': 'nonce',
        '1.3.6.1.5.5.7.48.1.4': 'acceptable_responses',
        '1.3.6.1.5.5.7.48.1.8': 'preferred_signature_algorithms',
    }
class TBSRequestExtension(Sequence):
    """
    One request-level extension: an id, a `critical` flag that defaults to
    False, and the extension value carried in an octet string.

    OCSPRequest._set_extensions() reads `extn_value` through its `.parsed`
    attribute.
    """

    _fields = [
        ('extn_id', TBSRequestExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # NOTE(review): _oid_pair and _oid_specs are interpreted by .core (not
    # shown here); presumably the native value of 'extn_id' selects the spec
    # used to parse 'extn_value' - confirm against core.Sequence.
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'nonce': OctetString,
        'acceptable_responses': AcceptableResponses,
        'preferred_signature_algorithms': PreferredSignatureAlgorithms,
    }
class TBSRequestExtensions(SequenceOf):
    """
    SequenceOf TBSRequestExtension; the type of TBSRequest's optional
    `request_extensions` field.
    """

    _child_spec = TBSRequestExtension
class TBSRequest(Sequence):
    """
    The body of an OCSPRequest (its `tbs_request` field).

    Holds a version that defaults to 'v1', an optional requestor name, the
    list of individual requests and optional request-level extensions. The
    version, requestor name and extensions are explicitly tagged 0, 1 and 2.
    """

    _fields = [
        ('version', Version, {'tag_type': 'explicit', 'tag': 0, 'default': 'v1'}),
        ('requestor_name', GeneralName, {'tag_type': 'explicit', 'tag': 1, 'optional': True}),
        ('request_list', Requests),
        ('request_extensions', TBSRequestExtensions, {'tag_type': 'explicit', 'tag': 2, 'optional': True}),
    ]
class Certificates(SequenceOf):
    """
    SequenceOf x509 Certificate; the type of the optional `certs` field in
    Signature and BasicOCSPResponse.
    """

    _child_spec = Certificate
class Signature(Sequence):
    """
    Sequence of a signature algorithm, the signature bits and an optional,
    explicitly tagged list of certificates.

    Used as the optional `optional_signature` field of OCSPRequest.
    """

    _fields = [
        ('signature_algorithm', SignedDigestAlgorithm),
        ('signature', OctetBitString),
        ('certs', Certificates, {'tag_type': 'explicit', 'tag': 0, 'optional': True}),
    ]
  156. class OCSPRequest(Sequence):
  157. _fields = [
  158. ('tbs_request', TBSRequest),
  159. ('optional_signature', Signature, {'tag_type': 'explicit', 'tag': 0, 'optional': True}),
  160. ]
  161. _processed_extensions = False
  162. _critical_extensions = None
  163. _nonce_value = None
  164. _acceptable_responses_value = None
  165. _preferred_signature_algorithms_value = None
  166. def _set_extensions(self):
  167. """
  168. Sets common named extensions to private attributes and creates a list
  169. of critical extensions
  170. """
  171. self._critical_extensions = set()
  172. for extension in self['tbs_request']['request_extensions']:
  173. name = extension['extn_id'].native
  174. attribute_name = '_%s_value' % name
  175. if hasattr(self, attribute_name):
  176. setattr(self, attribute_name, extension['extn_value'].parsed)
  177. if extension['critical'].native:
  178. self._critical_extensions.add(name)
  179. self._processed_extensions = True
  180. @property
  181. def critical_extensions(self):
  182. """
  183. Returns a set of the names (or OID if not a known extension) of the
  184. extensions marked as critical
  185. :return:
  186. A set of unicode strings
  187. """
  188. if not self._processed_extensions:
  189. self._set_extensions()
  190. return self._critical_extensions
  191. @property
  192. def nonce_value(self):
  193. """
  194. This extension is used to prevent replay attacks by including a unique,
  195. random value with each request/response pair
  196. :return:
  197. None or an OctetString object
  198. """
  199. if self._processed_extensions is False:
  200. self._set_extensions()
  201. return self._nonce_value
  202. @property
  203. def acceptable_responses_value(self):
  204. """
  205. This extension is used to allow the client and server to communicate
  206. with alternative response formats other than just basic_ocsp_response,
  207. although no other formats are defined in the standard.
  208. :return:
  209. None or an AcceptableResponses object
  210. """
  211. if self._processed_extensions is False:
  212. self._set_extensions()
  213. return self._acceptable_responses_value
  214. @property
  215. def preferred_signature_algorithms_value(self):
  216. """
  217. This extension is used by the client to define what signature algorithms
  218. are preferred, including both the hash algorithm and the public key
  219. algorithm, with a level of detail down to even the public key algorithm
  220. parameters, such as curve name.
  221. :return:
  222. None or a PreferredSignatureAlgorithms object
  223. """
  224. if self._processed_extensions is False:
  225. self._set_extensions()
  226. return self._preferred_signature_algorithms_value
class OCSPResponseStatus(Enumerated):
    """
    Enumerated status of an OCSPResponse (its `response_status` field).

    The value 4 has no entry in `_map`; RFC 6960 marks it as not used.
    """

    _map = {
        0: 'successful',
        1: 'malformed_request',
        2: 'internal_error',
        3: 'try_later',
        5: 'sign_required',
        6: 'unauthorized',
    }
class ResponderId(Choice):
    """
    Choice between identifying the responder by a Name (explicit tag 1) or
    by an octet string key value (explicit tag 2).

    Used as the `responder_id` field of ResponseData.
    """

    _alternatives = [
        ('by_name', Name, {'tag_type': 'explicit', 'tag': 1}),
        ('by_key', OctetString, {'tag_type': 'explicit', 'tag': 2}),
    ]
class RevokedInfo(Sequence):
    """
    Sequence of a revocation time and an optional, explicitly tagged
    revocation reason.

    Carried by the 'revoked' alternative of CertStatus.
    """

    _fields = [
        ('revocation_time', GeneralizedTime),
        ('revocation_reason', CRLReason, {'tag_type': 'explicit', 'tag': 0, 'optional': True}),
    ]
class CertStatus(Choice):
    """
    Choice of certificate status: 'good' (Null), 'revoked' (RevokedInfo) or
    'unknown' (Null), implicitly tagged 0, 1 and 2 respectively.

    Used as the `cert_status` field of SingleResponse.
    """

    _alternatives = [
        ('good', Null, {'tag_type': 'implicit', 'tag': 0}),
        ('revoked', RevokedInfo, {'tag_type': 'implicit', 'tag': 1}),
        ('unknown', Null, {'tag_type': 'implicit', 'tag': 2}),
    ]
class CrlId(Sequence):
    """
    Sequence of three optional, explicitly tagged fields describing a CRL:
    a URL, a number and a time.

    Registered in SingleResponseExtension._oid_specs as the value type of
    the 'crl' extension.
    """

    _fields = [
        ('crl_url', IA5String, {'tag_type': 'explicit', 'tag': 0, 'optional': True}),
        ('crl_num', Integer, {'tag_type': 'explicit', 'tag': 1, 'optional': True}),
        ('crl_time', GeneralizedTime, {'tag_type': 'explicit', 'tag': 2, 'optional': True}),
    ]
class SingleResponseExtensionId(ObjectIdentifier):
    """
    Object identifier of an extension attached to a SingleResponse.

    `_map` names two OCSP-specific OIDs ('crl', 'archive_cutoff') and three
    CRL entry extension OIDs ('crl_reason', 'invalidity_date',
    'certificate_issuer').
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.3': 'crl',
        '1.3.6.1.5.5.7.48.1.6': 'archive_cutoff',
        # These are CRLEntryExtension values from
        # https://tools.ietf.org/html/rfc5280
        '2.5.29.21': 'crl_reason',
        '2.5.29.24': 'invalidity_date',
        '2.5.29.29': 'certificate_issuer',
    }
class SingleResponseExtension(Sequence):
    """
    One extension on a SingleResponse: an id, a `critical` flag that
    defaults to False, and the extension value carried in an octet string.

    SingleResponse._set_extensions() reads `extn_value` through its
    `.parsed` attribute.
    """

    _fields = [
        ('extn_id', SingleResponseExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # NOTE(review): _oid_pair and _oid_specs are interpreted by .core (not
    # shown here); presumably the native value of 'extn_id' selects the spec
    # used to parse 'extn_value' - confirm against core.Sequence.
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'crl': CrlId,
        'archive_cutoff': GeneralizedTime,
        'crl_reason': CRLReason,
        'invalidity_date': GeneralizedTime,
        'certificate_issuer': GeneralNames,
    }
class SingleResponseExtensions(SequenceOf):
    """
    SequenceOf SingleResponseExtension; the type of SingleResponse's
    optional `single_extensions` field.
    """

    _child_spec = SingleResponseExtension
  284. class SingleResponse(Sequence):
  285. _fields = [
  286. ('cert_id', CertId),
  287. ('cert_status', CertStatus),
  288. ('this_update', GeneralizedTime),
  289. ('next_update', GeneralizedTime, {'tag_type': 'explicit', 'tag': 0, 'optional': True}),
  290. ('single_extensions', SingleResponseExtensions, {'tag_type': 'explicit', 'tag': 1, 'optional': True}),
  291. ]
  292. _processed_extensions = False
  293. _critical_extensions = None
  294. _crl_value = None
  295. _archive_cutoff_value = None
  296. _crl_reason_value = None
  297. _invalidity_date_value = None
  298. _certificate_issuer_value = None
  299. def _set_extensions(self):
  300. """
  301. Sets common named extensions to private attributes and creates a list
  302. of critical extensions
  303. """
  304. self._critical_extensions = set()
  305. for extension in self['single_extensions']:
  306. name = extension['extn_id'].native
  307. attribute_name = '_%s_value' % name
  308. if hasattr(self, attribute_name):
  309. setattr(self, attribute_name, extension['extn_value'].parsed)
  310. if extension['critical'].native:
  311. self._critical_extensions.add(name)
  312. self._processed_extensions = True
  313. @property
  314. def critical_extensions(self):
  315. """
  316. Returns a set of the names (or OID if not a known extension) of the
  317. extensions marked as critical
  318. :return:
  319. A set of unicode strings
  320. """
  321. if not self._processed_extensions:
  322. self._set_extensions()
  323. return self._critical_extensions
  324. @property
  325. def crl_value(self):
  326. """
  327. This extension is used to locate the CRL that a certificate's revocation
  328. is contained within.
  329. :return:
  330. None or a CrlId object
  331. """
  332. if self._processed_extensions is False:
  333. self._set_extensions()
  334. return self._crl_value
  335. @property
  336. def archive_cutoff_value(self):
  337. """
  338. This extension is used to indicate the date at which an archived
  339. (historical) certificate status entry will no longer be available.
  340. :return:
  341. None or a GeneralizedTime object
  342. """
  343. if self._processed_extensions is False:
  344. self._set_extensions()
  345. return self._archive_cutoff_value
  346. @property
  347. def crl_reason_value(self):
  348. """
  349. This extension indicates the reason that a certificate was revoked.
  350. :return:
  351. None or a CRLReason object
  352. """
  353. if self._processed_extensions is False:
  354. self._set_extensions()
  355. return self._crl_reason_value
  356. @property
  357. def invalidity_date_value(self):
  358. """
  359. This extension indicates the suspected date/time the private key was
  360. compromised or the certificate became invalid. This would usually be
  361. before the revocation date, which is when the CA processed the
  362. revocation.
  363. :return:
  364. None or a GeneralizedTime object
  365. """
  366. if self._processed_extensions is False:
  367. self._set_extensions()
  368. return self._invalidity_date_value
  369. @property
  370. def certificate_issuer_value(self):
  371. """
  372. This extension indicates the issuer of the certificate in question.
  373. :return:
  374. None or an x509.GeneralNames object
  375. """
  376. if self._processed_extensions is False:
  377. self._set_extensions()
  378. return self._certificate_issuer_value
class Responses(SequenceOf):
    """
    SequenceOf SingleResponse; the type of ResponseData's `responses` field.
    """

    _child_spec = SingleResponse
class ResponseDataExtensionId(ObjectIdentifier):
    """
    Object identifier of an extension that applies to a whole ResponseData.

    `_map` names the two OIDs known here, 'nonce' and 'extended_revoke'.
    """

    _map = {
        '1.3.6.1.5.5.7.48.1.2': 'nonce',
        '1.3.6.1.5.5.7.48.1.9': 'extended_revoke',
    }
class ResponseDataExtension(Sequence):
    """
    One response-level extension: an id, a `critical` flag that defaults to
    False, and the extension value carried in an octet string.

    OCSPResponse._set_extensions() reads `extn_value` through its `.parsed`
    attribute.
    """

    _fields = [
        ('extn_id', ResponseDataExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # NOTE(review): _oid_pair and _oid_specs are interpreted by .core (not
    # shown here); presumably the native value of 'extn_id' selects the spec
    # used to parse 'extn_value' - confirm against core.Sequence.
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'nonce': OctetString,
        'extended_revoke': Null,
    }
class ResponseDataExtensions(SequenceOf):
    """
    SequenceOf ResponseDataExtension; the type of ResponseData's optional
    `response_extensions` field.
    """

    _child_spec = ResponseDataExtension
class ResponseData(Sequence):
    """
    The body of a BasicOCSPResponse (its `tbs_response_data` field).

    Holds a version that defaults to 'v1', the responder id, the production
    time, the list of single responses and optional response-level
    extensions.
    """

    _fields = [
        ('version', Version, {'tag_type': 'explicit', 'tag': 0, 'default': 'v1'}),
        ('responder_id', ResponderId),
        ('produced_at', GeneralizedTime),
        ('responses', Responses),
        ('response_extensions', ResponseDataExtensions, {'tag_type': 'explicit', 'tag': 1, 'optional': True}),
    ]
class BasicOCSPResponse(Sequence):
    """
    Sequence of the response data, the signature algorithm, the signature
    bits and an optional, explicitly tagged list of certificates.

    Registered in ResponseBytes._oid_specs as the value type of the
    'basic_ocsp_response' response type.
    """

    _fields = [
        ('tbs_response_data', ResponseData),
        ('signature_algorithm', SignedDigestAlgorithm),
        ('signature', OctetBitString),
        ('certs', Certificates, {'tag_type': 'explicit', 'tag': 0, 'optional': True}),
    ]
class ResponseBytes(Sequence):
    """
    Sequence of a response type OID and the response itself, carried in an
    octet string.

    OCSPResponse reads `response` through its `.parsed` attribute and
    indexes the result with 'tbs_response_data'.
    """

    _fields = [
        ('response_type', ResponseType),
        ('response', ParsableOctetString),
    ]

    # NOTE(review): _oid_pair and _oid_specs are interpreted by .core (not
    # shown here); presumably the native value of 'response_type' selects
    # the spec used to parse 'response' - confirm against core.Sequence.
    _oid_pair = ('response_type', 'response')
    _oid_specs = {
        'basic_ocsp_response': BasicOCSPResponse,
    }
  423. class OCSPResponse(Sequence):
  424. _fields = [
  425. ('response_status', OCSPResponseStatus),
  426. ('response_bytes', ResponseBytes, {'tag_type': 'explicit', 'tag': 0, 'optional': True}),
  427. ]
  428. _processed_extensions = False
  429. _critical_extensions = None
  430. _nonce_value = None
  431. _extended_revoke_value = None
  432. def _set_extensions(self):
  433. """
  434. Sets common named extensions to private attributes and creates a list
  435. of critical extensions
  436. """
  437. self._critical_extensions = set()
  438. for extension in self['response_bytes']['response'].parsed['tbs_response_data']['response_extensions']:
  439. name = extension['extn_id'].native
  440. attribute_name = '_%s_value' % name
  441. if hasattr(self, attribute_name):
  442. setattr(self, attribute_name, extension['extn_value'].parsed)
  443. if extension['critical'].native:
  444. self._critical_extensions.add(name)
  445. self._processed_extensions = True
  446. @property
  447. def critical_extensions(self):
  448. """
  449. Returns a set of the names (or OID if not a known extension) of the
  450. extensions marked as critical
  451. :return:
  452. A set of unicode strings
  453. """
  454. if not self._processed_extensions:
  455. self._set_extensions()
  456. return self._critical_extensions
  457. @property
  458. def nonce_value(self):
  459. """
  460. This extension is used to prevent replay attacks on the request/response
  461. exchange
  462. :return:
  463. None or an OctetString object
  464. """
  465. if self._processed_extensions is False:
  466. self._set_extensions()
  467. return self._nonce_value
  468. @property
  469. def extended_revoke_value(self):
  470. """
  471. This extension is used to signal that the responder will return a
  472. "revoked" status for non-issued certificates.
  473. :return:
  474. None or a Null object (if present)
  475. """
  476. if self._processed_extensions is False:
  477. self._set_extensions()
  478. return self._extended_revoke_value
  479. @property
  480. def basic_ocsp_response(self):
  481. """
  482. A shortcut into the BasicOCSPResponse sequence
  483. :return:
  484. None or an asn1crypto.ocsp.BasicOCSPResponse object
  485. """
  486. return self['response_bytes']['response'].parsed
  487. @property
  488. def response_data(self):
  489. """
  490. A shortcut into the parsed, ResponseData sequence
  491. :return:
  492. None or an asn1crypto.ocsp.ResponseData object
  493. """
  494. return self['response_bytes']['response'].parsed['tbs_response_data']