decode_asn1.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. from __future__ import absolute_import, division, print_function
  5. import datetime
  6. import ipaddress
  7. import six
  8. from cryptography import x509
  9. from cryptography.hazmat._der import DERReader, INTEGER, NULL, SEQUENCE
  10. from cryptography.x509.extensions import _TLS_FEATURE_TYPE_TO_ENUM
  11. from cryptography.x509.name import _ASN1_TYPE_TO_ENUM
  12. from cryptography.x509.oid import (
  13. CRLEntryExtensionOID,
  14. CertificatePoliciesOID,
  15. ExtensionOID,
  16. OCSPExtensionOID,
  17. )
  18. def _obj2txt(backend, obj):
  19. # Set to 80 on the recommendation of
  20. # https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
  21. #
  22. # But OIDs longer than this occur in real life (e.g. Active
  23. # Directory makes some very long OIDs). So we need to detect
  24. # and properly handle the case where the default buffer is not
  25. # big enough.
  26. #
  27. buf_len = 80
  28. buf = backend._ffi.new("char[]", buf_len)
  29. # 'res' is the number of bytes that *would* be written if the
  30. # buffer is large enough. If 'res' > buf_len - 1, we need to
  31. # alloc a big-enough buffer and go again.
  32. res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
  33. if res > buf_len - 1: # account for terminating null byte
  34. buf_len = res + 1
  35. buf = backend._ffi.new("char[]", buf_len)
  36. res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
  37. backend.openssl_assert(res > 0)
  38. return backend._ffi.buffer(buf, res)[:].decode()
  39. def _decode_x509_name_entry(backend, x509_name_entry):
  40. obj = backend._lib.X509_NAME_ENTRY_get_object(x509_name_entry)
  41. backend.openssl_assert(obj != backend._ffi.NULL)
  42. data = backend._lib.X509_NAME_ENTRY_get_data(x509_name_entry)
  43. backend.openssl_assert(data != backend._ffi.NULL)
  44. value = _asn1_string_to_utf8(backend, data)
  45. oid = _obj2txt(backend, obj)
  46. type = _ASN1_TYPE_TO_ENUM[data.type]
  47. return x509.NameAttribute(x509.ObjectIdentifier(oid), value, type)
  48. def _decode_x509_name(backend, x509_name):
  49. count = backend._lib.X509_NAME_entry_count(x509_name)
  50. attributes = []
  51. prev_set_id = -1
  52. for x in range(count):
  53. entry = backend._lib.X509_NAME_get_entry(x509_name, x)
  54. attribute = _decode_x509_name_entry(backend, entry)
  55. set_id = backend._lib.Cryptography_X509_NAME_ENTRY_set(entry)
  56. if set_id != prev_set_id:
  57. attributes.append({attribute})
  58. else:
  59. # is in the same RDN a previous entry
  60. attributes[-1].add(attribute)
  61. prev_set_id = set_id
  62. return x509.Name(x509.RelativeDistinguishedName(rdn) for rdn in attributes)
  63. def _decode_general_names(backend, gns):
  64. num = backend._lib.sk_GENERAL_NAME_num(gns)
  65. names = []
  66. for i in range(num):
  67. gn = backend._lib.sk_GENERAL_NAME_value(gns, i)
  68. backend.openssl_assert(gn != backend._ffi.NULL)
  69. names.append(_decode_general_name(backend, gn))
  70. return names
  71. def _decode_general_name(backend, gn):
  72. if gn.type == backend._lib.GEN_DNS:
  73. # Convert to bytes and then decode to utf8. We don't use
  74. # asn1_string_to_utf8 here because it doesn't properly convert
  75. # utf8 from ia5strings.
  76. data = _asn1_string_to_bytes(backend, gn.d.dNSName).decode("utf8")
  77. # We don't use the constructor for DNSName so we can bypass validation
  78. # This allows us to create DNSName objects that have unicode chars
  79. # when a certificate (against the RFC) contains them.
  80. return x509.DNSName._init_without_validation(data)
  81. elif gn.type == backend._lib.GEN_URI:
  82. # Convert to bytes and then decode to utf8. We don't use
  83. # asn1_string_to_utf8 here because it doesn't properly convert
  84. # utf8 from ia5strings.
  85. data = _asn1_string_to_bytes(
  86. backend, gn.d.uniformResourceIdentifier
  87. ).decode("utf8")
  88. # We don't use the constructor for URI so we can bypass validation
  89. # This allows us to create URI objects that have unicode chars
  90. # when a certificate (against the RFC) contains them.
  91. return x509.UniformResourceIdentifier._init_without_validation(data)
  92. elif gn.type == backend._lib.GEN_RID:
  93. oid = _obj2txt(backend, gn.d.registeredID)
  94. return x509.RegisteredID(x509.ObjectIdentifier(oid))
  95. elif gn.type == backend._lib.GEN_IPADD:
  96. data = _asn1_string_to_bytes(backend, gn.d.iPAddress)
  97. data_len = len(data)
  98. if data_len == 8 or data_len == 32:
  99. # This is an IPv4 or IPv6 Network and not a single IP. This
  100. # type of data appears in Name Constraints. Unfortunately,
  101. # ipaddress doesn't support packed bytes + netmask. Additionally,
  102. # IPv6Network can only handle CIDR rather than the full 16 byte
  103. # netmask. To handle this we convert the netmask to integer, then
  104. # find the first 0 bit, which will be the prefix. If another 1
  105. # bit is present after that the netmask is invalid.
  106. base = ipaddress.ip_address(data[: data_len // 2])
  107. netmask = ipaddress.ip_address(data[data_len // 2 :])
  108. bits = bin(int(netmask))[2:]
  109. prefix = bits.find("0")
  110. # If no 0 bits are found it is a /32 or /128
  111. if prefix == -1:
  112. prefix = len(bits)
  113. if "1" in bits[prefix:]:
  114. raise ValueError("Invalid netmask")
  115. ip = ipaddress.ip_network(base.exploded + u"/{}".format(prefix))
  116. else:
  117. ip = ipaddress.ip_address(data)
  118. return x509.IPAddress(ip)
  119. elif gn.type == backend._lib.GEN_DIRNAME:
  120. return x509.DirectoryName(
  121. _decode_x509_name(backend, gn.d.directoryName)
  122. )
  123. elif gn.type == backend._lib.GEN_EMAIL:
  124. # Convert to bytes and then decode to utf8. We don't use
  125. # asn1_string_to_utf8 here because it doesn't properly convert
  126. # utf8 from ia5strings.
  127. data = _asn1_string_to_bytes(backend, gn.d.rfc822Name).decode("utf8")
  128. # We don't use the constructor for RFC822Name so we can bypass
  129. # validation. This allows us to create RFC822Name objects that have
  130. # unicode chars when a certificate (against the RFC) contains them.
  131. return x509.RFC822Name._init_without_validation(data)
  132. elif gn.type == backend._lib.GEN_OTHERNAME:
  133. type_id = _obj2txt(backend, gn.d.otherName.type_id)
  134. value = _asn1_to_der(backend, gn.d.otherName.value)
  135. return x509.OtherName(x509.ObjectIdentifier(type_id), value)
  136. else:
  137. # x400Address or ediPartyName
  138. raise x509.UnsupportedGeneralNameType(
  139. "{} is not a supported type".format(
  140. x509._GENERAL_NAMES.get(gn.type, gn.type)
  141. ),
  142. gn.type,
  143. )
  144. def _decode_ocsp_no_check(backend, ext):
  145. return x509.OCSPNoCheck()
  146. def _decode_crl_number(backend, ext):
  147. asn1_int = backend._ffi.cast("ASN1_INTEGER *", ext)
  148. asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free)
  149. return x509.CRLNumber(_asn1_integer_to_int(backend, asn1_int))
  150. def _decode_delta_crl_indicator(backend, ext):
  151. asn1_int = backend._ffi.cast("ASN1_INTEGER *", ext)
  152. asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free)
  153. return x509.DeltaCRLIndicator(_asn1_integer_to_int(backend, asn1_int))
  154. class _X509ExtensionParser(object):
  155. def __init__(self, backend, ext_count, get_ext, handlers):
  156. self.ext_count = ext_count
  157. self.get_ext = get_ext
  158. self.handlers = handlers
  159. self._backend = backend
  160. def parse(self, x509_obj):
  161. extensions = []
  162. seen_oids = set()
  163. for i in range(self.ext_count(x509_obj)):
  164. ext = self.get_ext(x509_obj, i)
  165. self._backend.openssl_assert(ext != self._backend._ffi.NULL)
  166. crit = self._backend._lib.X509_EXTENSION_get_critical(ext)
  167. critical = crit == 1
  168. oid = x509.ObjectIdentifier(
  169. _obj2txt(
  170. self._backend,
  171. self._backend._lib.X509_EXTENSION_get_object(ext),
  172. )
  173. )
  174. if oid in seen_oids:
  175. raise x509.DuplicateExtension(
  176. "Duplicate {} extension found".format(oid), oid
  177. )
  178. # These OIDs are only supported in OpenSSL 1.1.0+ but we want
  179. # to support them in all versions of OpenSSL so we decode them
  180. # ourselves.
  181. if oid == ExtensionOID.TLS_FEATURE:
  182. # The extension contents are a SEQUENCE OF INTEGERs.
  183. data = self._backend._lib.X509_EXTENSION_get_data(ext)
  184. data_bytes = _asn1_string_to_bytes(self._backend, data)
  185. features = DERReader(data_bytes).read_single_element(SEQUENCE)
  186. parsed = []
  187. while not features.is_empty():
  188. parsed.append(features.read_element(INTEGER).as_integer())
  189. # Map the features to their enum value.
  190. value = x509.TLSFeature(
  191. [_TLS_FEATURE_TYPE_TO_ENUM[x] for x in parsed]
  192. )
  193. extensions.append(x509.Extension(oid, critical, value))
  194. seen_oids.add(oid)
  195. continue
  196. elif oid == ExtensionOID.PRECERT_POISON:
  197. data = self._backend._lib.X509_EXTENSION_get_data(ext)
  198. # The contents of the extension must be an ASN.1 NULL.
  199. reader = DERReader(_asn1_string_to_bytes(self._backend, data))
  200. reader.read_single_element(NULL).check_empty()
  201. extensions.append(
  202. x509.Extension(oid, critical, x509.PrecertPoison())
  203. )
  204. seen_oids.add(oid)
  205. continue
  206. try:
  207. handler = self.handlers[oid]
  208. except KeyError:
  209. # Dump the DER payload into an UnrecognizedExtension object
  210. data = self._backend._lib.X509_EXTENSION_get_data(ext)
  211. self._backend.openssl_assert(data != self._backend._ffi.NULL)
  212. der = self._backend._ffi.buffer(data.data, data.length)[:]
  213. unrecognized = x509.UnrecognizedExtension(oid, der)
  214. extensions.append(x509.Extension(oid, critical, unrecognized))
  215. else:
  216. ext_data = self._backend._lib.X509V3_EXT_d2i(ext)
  217. if ext_data == self._backend._ffi.NULL:
  218. self._backend._consume_errors()
  219. raise ValueError(
  220. "The {} extension is invalid and can't be "
  221. "parsed".format(oid)
  222. )
  223. value = handler(self._backend, ext_data)
  224. extensions.append(x509.Extension(oid, critical, value))
  225. seen_oids.add(oid)
  226. return x509.Extensions(extensions)
  227. def _decode_certificate_policies(backend, cp):
  228. cp = backend._ffi.cast("Cryptography_STACK_OF_POLICYINFO *", cp)
  229. cp = backend._ffi.gc(cp, backend._lib.CERTIFICATEPOLICIES_free)
  230. num = backend._lib.sk_POLICYINFO_num(cp)
  231. certificate_policies = []
  232. for i in range(num):
  233. qualifiers = None
  234. pi = backend._lib.sk_POLICYINFO_value(cp, i)
  235. oid = x509.ObjectIdentifier(_obj2txt(backend, pi.policyid))
  236. if pi.qualifiers != backend._ffi.NULL:
  237. qnum = backend._lib.sk_POLICYQUALINFO_num(pi.qualifiers)
  238. qualifiers = []
  239. for j in range(qnum):
  240. pqi = backend._lib.sk_POLICYQUALINFO_value(pi.qualifiers, j)
  241. pqualid = x509.ObjectIdentifier(_obj2txt(backend, pqi.pqualid))
  242. if pqualid == CertificatePoliciesOID.CPS_QUALIFIER:
  243. cpsuri = backend._ffi.buffer(
  244. pqi.d.cpsuri.data, pqi.d.cpsuri.length
  245. )[:].decode("ascii")
  246. qualifiers.append(cpsuri)
  247. else:
  248. assert pqualid == CertificatePoliciesOID.CPS_USER_NOTICE
  249. user_notice = _decode_user_notice(
  250. backend, pqi.d.usernotice
  251. )
  252. qualifiers.append(user_notice)
  253. certificate_policies.append(x509.PolicyInformation(oid, qualifiers))
  254. return x509.CertificatePolicies(certificate_policies)
  255. def _decode_user_notice(backend, un):
  256. explicit_text = None
  257. notice_reference = None
  258. if un.exptext != backend._ffi.NULL:
  259. explicit_text = _asn1_string_to_utf8(backend, un.exptext)
  260. if un.noticeref != backend._ffi.NULL:
  261. organization = _asn1_string_to_utf8(backend, un.noticeref.organization)
  262. num = backend._lib.sk_ASN1_INTEGER_num(un.noticeref.noticenos)
  263. notice_numbers = []
  264. for i in range(num):
  265. asn1_int = backend._lib.sk_ASN1_INTEGER_value(
  266. un.noticeref.noticenos, i
  267. )
  268. notice_num = _asn1_integer_to_int(backend, asn1_int)
  269. notice_numbers.append(notice_num)
  270. notice_reference = x509.NoticeReference(organization, notice_numbers)
  271. return x509.UserNotice(notice_reference, explicit_text)
  272. def _decode_basic_constraints(backend, bc_st):
  273. basic_constraints = backend._ffi.cast("BASIC_CONSTRAINTS *", bc_st)
  274. basic_constraints = backend._ffi.gc(
  275. basic_constraints, backend._lib.BASIC_CONSTRAINTS_free
  276. )
  277. # The byte representation of an ASN.1 boolean true is \xff. OpenSSL
  278. # chooses to just map this to its ordinal value, so true is 255 and
  279. # false is 0.
  280. ca = basic_constraints.ca == 255
  281. path_length = _asn1_integer_to_int_or_none(
  282. backend, basic_constraints.pathlen
  283. )
  284. return x509.BasicConstraints(ca, path_length)
  285. def _decode_subject_key_identifier(backend, asn1_string):
  286. asn1_string = backend._ffi.cast("ASN1_OCTET_STRING *", asn1_string)
  287. asn1_string = backend._ffi.gc(
  288. asn1_string, backend._lib.ASN1_OCTET_STRING_free
  289. )
  290. return x509.SubjectKeyIdentifier(
  291. backend._ffi.buffer(asn1_string.data, asn1_string.length)[:]
  292. )
  293. def _decode_authority_key_identifier(backend, akid):
  294. akid = backend._ffi.cast("AUTHORITY_KEYID *", akid)
  295. akid = backend._ffi.gc(akid, backend._lib.AUTHORITY_KEYID_free)
  296. key_identifier = None
  297. authority_cert_issuer = None
  298. if akid.keyid != backend._ffi.NULL:
  299. key_identifier = backend._ffi.buffer(
  300. akid.keyid.data, akid.keyid.length
  301. )[:]
  302. if akid.issuer != backend._ffi.NULL:
  303. authority_cert_issuer = _decode_general_names(backend, akid.issuer)
  304. authority_cert_serial_number = _asn1_integer_to_int_or_none(
  305. backend, akid.serial
  306. )
  307. return x509.AuthorityKeyIdentifier(
  308. key_identifier, authority_cert_issuer, authority_cert_serial_number
  309. )
  310. def _decode_information_access(backend, ia):
  311. ia = backend._ffi.cast("Cryptography_STACK_OF_ACCESS_DESCRIPTION *", ia)
  312. ia = backend._ffi.gc(
  313. ia,
  314. lambda x: backend._lib.sk_ACCESS_DESCRIPTION_pop_free(
  315. x,
  316. backend._ffi.addressof(
  317. backend._lib._original_lib, "ACCESS_DESCRIPTION_free"
  318. ),
  319. ),
  320. )
  321. num = backend._lib.sk_ACCESS_DESCRIPTION_num(ia)
  322. access_descriptions = []
  323. for i in range(num):
  324. ad = backend._lib.sk_ACCESS_DESCRIPTION_value(ia, i)
  325. backend.openssl_assert(ad.method != backend._ffi.NULL)
  326. oid = x509.ObjectIdentifier(_obj2txt(backend, ad.method))
  327. backend.openssl_assert(ad.location != backend._ffi.NULL)
  328. gn = _decode_general_name(backend, ad.location)
  329. access_descriptions.append(x509.AccessDescription(oid, gn))
  330. return access_descriptions
  331. def _decode_authority_information_access(backend, aia):
  332. access_descriptions = _decode_information_access(backend, aia)
  333. return x509.AuthorityInformationAccess(access_descriptions)
  334. def _decode_subject_information_access(backend, aia):
  335. access_descriptions = _decode_information_access(backend, aia)
  336. return x509.SubjectInformationAccess(access_descriptions)
  337. def _decode_key_usage(backend, bit_string):
  338. bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
  339. bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
  340. get_bit = backend._lib.ASN1_BIT_STRING_get_bit
  341. digital_signature = get_bit(bit_string, 0) == 1
  342. content_commitment = get_bit(bit_string, 1) == 1
  343. key_encipherment = get_bit(bit_string, 2) == 1
  344. data_encipherment = get_bit(bit_string, 3) == 1
  345. key_agreement = get_bit(bit_string, 4) == 1
  346. key_cert_sign = get_bit(bit_string, 5) == 1
  347. crl_sign = get_bit(bit_string, 6) == 1
  348. encipher_only = get_bit(bit_string, 7) == 1
  349. decipher_only = get_bit(bit_string, 8) == 1
  350. return x509.KeyUsage(
  351. digital_signature,
  352. content_commitment,
  353. key_encipherment,
  354. data_encipherment,
  355. key_agreement,
  356. key_cert_sign,
  357. crl_sign,
  358. encipher_only,
  359. decipher_only,
  360. )
  361. def _decode_general_names_extension(backend, gns):
  362. gns = backend._ffi.cast("GENERAL_NAMES *", gns)
  363. gns = backend._ffi.gc(gns, backend._lib.GENERAL_NAMES_free)
  364. general_names = _decode_general_names(backend, gns)
  365. return general_names
  366. def _decode_subject_alt_name(backend, ext):
  367. return x509.SubjectAlternativeName(
  368. _decode_general_names_extension(backend, ext)
  369. )
  370. def _decode_issuer_alt_name(backend, ext):
  371. return x509.IssuerAlternativeName(
  372. _decode_general_names_extension(backend, ext)
  373. )
  374. def _decode_name_constraints(backend, nc):
  375. nc = backend._ffi.cast("NAME_CONSTRAINTS *", nc)
  376. nc = backend._ffi.gc(nc, backend._lib.NAME_CONSTRAINTS_free)
  377. permitted = _decode_general_subtrees(backend, nc.permittedSubtrees)
  378. excluded = _decode_general_subtrees(backend, nc.excludedSubtrees)
  379. return x509.NameConstraints(
  380. permitted_subtrees=permitted, excluded_subtrees=excluded
  381. )
  382. def _decode_general_subtrees(backend, stack_subtrees):
  383. if stack_subtrees == backend._ffi.NULL:
  384. return None
  385. num = backend._lib.sk_GENERAL_SUBTREE_num(stack_subtrees)
  386. subtrees = []
  387. for i in range(num):
  388. obj = backend._lib.sk_GENERAL_SUBTREE_value(stack_subtrees, i)
  389. backend.openssl_assert(obj != backend._ffi.NULL)
  390. name = _decode_general_name(backend, obj.base)
  391. subtrees.append(name)
  392. return subtrees
  393. def _decode_issuing_dist_point(backend, idp):
  394. idp = backend._ffi.cast("ISSUING_DIST_POINT *", idp)
  395. idp = backend._ffi.gc(idp, backend._lib.ISSUING_DIST_POINT_free)
  396. if idp.distpoint != backend._ffi.NULL:
  397. full_name, relative_name = _decode_distpoint(backend, idp.distpoint)
  398. else:
  399. full_name = None
  400. relative_name = None
  401. only_user = idp.onlyuser == 255
  402. only_ca = idp.onlyCA == 255
  403. indirect_crl = idp.indirectCRL == 255
  404. only_attr = idp.onlyattr == 255
  405. if idp.onlysomereasons != backend._ffi.NULL:
  406. only_some_reasons = _decode_reasons(backend, idp.onlysomereasons)
  407. else:
  408. only_some_reasons = None
  409. return x509.IssuingDistributionPoint(
  410. full_name,
  411. relative_name,
  412. only_user,
  413. only_ca,
  414. only_some_reasons,
  415. indirect_crl,
  416. only_attr,
  417. )
  418. def _decode_policy_constraints(backend, pc):
  419. pc = backend._ffi.cast("POLICY_CONSTRAINTS *", pc)
  420. pc = backend._ffi.gc(pc, backend._lib.POLICY_CONSTRAINTS_free)
  421. require_explicit_policy = _asn1_integer_to_int_or_none(
  422. backend, pc.requireExplicitPolicy
  423. )
  424. inhibit_policy_mapping = _asn1_integer_to_int_or_none(
  425. backend, pc.inhibitPolicyMapping
  426. )
  427. return x509.PolicyConstraints(
  428. require_explicit_policy, inhibit_policy_mapping
  429. )
  430. def _decode_extended_key_usage(backend, sk):
  431. sk = backend._ffi.cast("Cryptography_STACK_OF_ASN1_OBJECT *", sk)
  432. sk = backend._ffi.gc(sk, backend._lib.sk_ASN1_OBJECT_free)
  433. num = backend._lib.sk_ASN1_OBJECT_num(sk)
  434. ekus = []
  435. for i in range(num):
  436. obj = backend._lib.sk_ASN1_OBJECT_value(sk, i)
  437. backend.openssl_assert(obj != backend._ffi.NULL)
  438. oid = x509.ObjectIdentifier(_obj2txt(backend, obj))
  439. ekus.append(oid)
  440. return x509.ExtendedKeyUsage(ekus)
  441. _DISTPOINT_TYPE_FULLNAME = 0
  442. _DISTPOINT_TYPE_RELATIVENAME = 1
  443. def _decode_dist_points(backend, cdps):
  444. cdps = backend._ffi.cast("Cryptography_STACK_OF_DIST_POINT *", cdps)
  445. cdps = backend._ffi.gc(cdps, backend._lib.CRL_DIST_POINTS_free)
  446. num = backend._lib.sk_DIST_POINT_num(cdps)
  447. dist_points = []
  448. for i in range(num):
  449. full_name = None
  450. relative_name = None
  451. crl_issuer = None
  452. reasons = None
  453. cdp = backend._lib.sk_DIST_POINT_value(cdps, i)
  454. if cdp.reasons != backend._ffi.NULL:
  455. reasons = _decode_reasons(backend, cdp.reasons)
  456. if cdp.CRLissuer != backend._ffi.NULL:
  457. crl_issuer = _decode_general_names(backend, cdp.CRLissuer)
  458. # Certificates may have a crl_issuer/reasons and no distribution
  459. # point so make sure it's not null.
  460. if cdp.distpoint != backend._ffi.NULL:
  461. full_name, relative_name = _decode_distpoint(
  462. backend, cdp.distpoint
  463. )
  464. dist_points.append(
  465. x509.DistributionPoint(
  466. full_name, relative_name, reasons, crl_issuer
  467. )
  468. )
  469. return dist_points
  470. # ReasonFlags ::= BIT STRING {
  471. # unused (0),
  472. # keyCompromise (1),
  473. # cACompromise (2),
  474. # affiliationChanged (3),
  475. # superseded (4),
  476. # cessationOfOperation (5),
  477. # certificateHold (6),
  478. # privilegeWithdrawn (7),
  479. # aACompromise (8) }
  480. _REASON_BIT_MAPPING = {
  481. 1: x509.ReasonFlags.key_compromise,
  482. 2: x509.ReasonFlags.ca_compromise,
  483. 3: x509.ReasonFlags.affiliation_changed,
  484. 4: x509.ReasonFlags.superseded,
  485. 5: x509.ReasonFlags.cessation_of_operation,
  486. 6: x509.ReasonFlags.certificate_hold,
  487. 7: x509.ReasonFlags.privilege_withdrawn,
  488. 8: x509.ReasonFlags.aa_compromise,
  489. }
  490. def _decode_reasons(backend, reasons):
  491. # We will check each bit from RFC 5280
  492. enum_reasons = []
  493. for bit_position, reason in six.iteritems(_REASON_BIT_MAPPING):
  494. if backend._lib.ASN1_BIT_STRING_get_bit(reasons, bit_position):
  495. enum_reasons.append(reason)
  496. return frozenset(enum_reasons)
  497. def _decode_distpoint(backend, distpoint):
  498. if distpoint.type == _DISTPOINT_TYPE_FULLNAME:
  499. full_name = _decode_general_names(backend, distpoint.name.fullname)
  500. return full_name, None
  501. # OpenSSL code doesn't test for a specific type for
  502. # relativename, everything that isn't fullname is considered
  503. # relativename. Per RFC 5280:
  504. #
  505. # DistributionPointName ::= CHOICE {
  506. # fullName [0] GeneralNames,
  507. # nameRelativeToCRLIssuer [1] RelativeDistinguishedName }
  508. rns = distpoint.name.relativename
  509. rnum = backend._lib.sk_X509_NAME_ENTRY_num(rns)
  510. attributes = set()
  511. for i in range(rnum):
  512. rn = backend._lib.sk_X509_NAME_ENTRY_value(rns, i)
  513. backend.openssl_assert(rn != backend._ffi.NULL)
  514. attributes.add(_decode_x509_name_entry(backend, rn))
  515. relative_name = x509.RelativeDistinguishedName(attributes)
  516. return None, relative_name
  517. def _decode_crl_distribution_points(backend, cdps):
  518. dist_points = _decode_dist_points(backend, cdps)
  519. return x509.CRLDistributionPoints(dist_points)
  520. def _decode_freshest_crl(backend, cdps):
  521. dist_points = _decode_dist_points(backend, cdps)
  522. return x509.FreshestCRL(dist_points)
  523. def _decode_inhibit_any_policy(backend, asn1_int):
  524. asn1_int = backend._ffi.cast("ASN1_INTEGER *", asn1_int)
  525. asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free)
  526. skip_certs = _asn1_integer_to_int(backend, asn1_int)
  527. return x509.InhibitAnyPolicy(skip_certs)
  528. def _decode_scts(backend, asn1_scts):
  529. from cryptography.hazmat.backends.openssl.x509 import (
  530. _SignedCertificateTimestamp,
  531. )
  532. asn1_scts = backend._ffi.cast("Cryptography_STACK_OF_SCT *", asn1_scts)
  533. asn1_scts = backend._ffi.gc(asn1_scts, backend._lib.SCT_LIST_free)
  534. scts = []
  535. for i in range(backend._lib.sk_SCT_num(asn1_scts)):
  536. sct = backend._lib.sk_SCT_value(asn1_scts, i)
  537. scts.append(_SignedCertificateTimestamp(backend, asn1_scts, sct))
  538. return scts
  539. def _decode_precert_signed_certificate_timestamps(backend, asn1_scts):
  540. return x509.PrecertificateSignedCertificateTimestamps(
  541. _decode_scts(backend, asn1_scts)
  542. )
  543. def _decode_signed_certificate_timestamps(backend, asn1_scts):
  544. return x509.SignedCertificateTimestamps(_decode_scts(backend, asn1_scts))
  545. # CRLReason ::= ENUMERATED {
  546. # unspecified (0),
  547. # keyCompromise (1),
  548. # cACompromise (2),
  549. # affiliationChanged (3),
  550. # superseded (4),
  551. # cessationOfOperation (5),
  552. # certificateHold (6),
  553. # -- value 7 is not used
  554. # removeFromCRL (8),
  555. # privilegeWithdrawn (9),
  556. # aACompromise (10) }
  557. _CRL_ENTRY_REASON_CODE_TO_ENUM = {
  558. 0: x509.ReasonFlags.unspecified,
  559. 1: x509.ReasonFlags.key_compromise,
  560. 2: x509.ReasonFlags.ca_compromise,
  561. 3: x509.ReasonFlags.affiliation_changed,
  562. 4: x509.ReasonFlags.superseded,
  563. 5: x509.ReasonFlags.cessation_of_operation,
  564. 6: x509.ReasonFlags.certificate_hold,
  565. 8: x509.ReasonFlags.remove_from_crl,
  566. 9: x509.ReasonFlags.privilege_withdrawn,
  567. 10: x509.ReasonFlags.aa_compromise,
  568. }
  569. _CRL_ENTRY_REASON_ENUM_TO_CODE = {
  570. x509.ReasonFlags.unspecified: 0,
  571. x509.ReasonFlags.key_compromise: 1,
  572. x509.ReasonFlags.ca_compromise: 2,
  573. x509.ReasonFlags.affiliation_changed: 3,
  574. x509.ReasonFlags.superseded: 4,
  575. x509.ReasonFlags.cessation_of_operation: 5,
  576. x509.ReasonFlags.certificate_hold: 6,
  577. x509.ReasonFlags.remove_from_crl: 8,
  578. x509.ReasonFlags.privilege_withdrawn: 9,
  579. x509.ReasonFlags.aa_compromise: 10,
  580. }
  581. def _decode_crl_reason(backend, enum):
  582. enum = backend._ffi.cast("ASN1_ENUMERATED *", enum)
  583. enum = backend._ffi.gc(enum, backend._lib.ASN1_ENUMERATED_free)
  584. code = backend._lib.ASN1_ENUMERATED_get(enum)
  585. try:
  586. return x509.CRLReason(_CRL_ENTRY_REASON_CODE_TO_ENUM[code])
  587. except KeyError:
  588. raise ValueError("Unsupported reason code: {}".format(code))
  589. def _decode_invalidity_date(backend, inv_date):
  590. generalized_time = backend._ffi.cast("ASN1_GENERALIZEDTIME *", inv_date)
  591. generalized_time = backend._ffi.gc(
  592. generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free
  593. )
  594. return x509.InvalidityDate(
  595. _parse_asn1_generalized_time(backend, generalized_time)
  596. )
  597. def _decode_cert_issuer(backend, gns):
  598. gns = backend._ffi.cast("GENERAL_NAMES *", gns)
  599. gns = backend._ffi.gc(gns, backend._lib.GENERAL_NAMES_free)
  600. general_names = _decode_general_names(backend, gns)
  601. return x509.CertificateIssuer(general_names)
  602. def _asn1_to_der(backend, asn1_type):
  603. buf = backend._ffi.new("unsigned char **")
  604. res = backend._lib.i2d_ASN1_TYPE(asn1_type, buf)
  605. backend.openssl_assert(res >= 0)
  606. backend.openssl_assert(buf[0] != backend._ffi.NULL)
  607. buf = backend._ffi.gc(
  608. buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
  609. )
  610. return backend._ffi.buffer(buf[0], res)[:]
  611. def _asn1_integer_to_int(backend, asn1_int):
  612. bn = backend._lib.ASN1_INTEGER_to_BN(asn1_int, backend._ffi.NULL)
  613. backend.openssl_assert(bn != backend._ffi.NULL)
  614. bn = backend._ffi.gc(bn, backend._lib.BN_free)
  615. return backend._bn_to_int(bn)
  616. def _asn1_integer_to_int_or_none(backend, asn1_int):
  617. if asn1_int == backend._ffi.NULL:
  618. return None
  619. else:
  620. return _asn1_integer_to_int(backend, asn1_int)
  621. def _asn1_string_to_bytes(backend, asn1_string):
  622. return backend._ffi.buffer(asn1_string.data, asn1_string.length)[:]
  623. def _asn1_string_to_ascii(backend, asn1_string):
  624. return _asn1_string_to_bytes(backend, asn1_string).decode("ascii")
  625. def _asn1_string_to_utf8(backend, asn1_string):
  626. buf = backend._ffi.new("unsigned char **")
  627. res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string)
  628. if res == -1:
  629. raise ValueError(
  630. "Unsupported ASN1 string type. Type: {}".format(asn1_string.type)
  631. )
  632. backend.openssl_assert(buf[0] != backend._ffi.NULL)
  633. buf = backend._ffi.gc(
  634. buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
  635. )
  636. return backend._ffi.buffer(buf[0], res)[:].decode("utf8")
  637. def _parse_asn1_time(backend, asn1_time):
  638. backend.openssl_assert(asn1_time != backend._ffi.NULL)
  639. generalized_time = backend._lib.ASN1_TIME_to_generalizedtime(
  640. asn1_time, backend._ffi.NULL
  641. )
  642. if generalized_time == backend._ffi.NULL:
  643. raise ValueError(
  644. "Couldn't parse ASN.1 time as generalizedtime {!r}".format(
  645. _asn1_string_to_bytes(backend, asn1_time)
  646. )
  647. )
  648. generalized_time = backend._ffi.gc(
  649. generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free
  650. )
  651. return _parse_asn1_generalized_time(backend, generalized_time)
  652. def _parse_asn1_generalized_time(backend, generalized_time):
  653. time = _asn1_string_to_ascii(
  654. backend, backend._ffi.cast("ASN1_STRING *", generalized_time)
  655. )
  656. return datetime.datetime.strptime(time, "%Y%m%d%H%M%SZ")
  657. def _decode_nonce(backend, nonce):
  658. nonce = backend._ffi.cast("ASN1_OCTET_STRING *", nonce)
  659. nonce = backend._ffi.gc(nonce, backend._lib.ASN1_OCTET_STRING_free)
  660. return x509.OCSPNonce(_asn1_string_to_bytes(backend, nonce))
  661. _EXTENSION_HANDLERS_BASE = {
  662. ExtensionOID.BASIC_CONSTRAINTS: _decode_basic_constraints,
  663. ExtensionOID.SUBJECT_KEY_IDENTIFIER: _decode_subject_key_identifier,
  664. ExtensionOID.KEY_USAGE: _decode_key_usage,
  665. ExtensionOID.SUBJECT_ALTERNATIVE_NAME: _decode_subject_alt_name,
  666. ExtensionOID.EXTENDED_KEY_USAGE: _decode_extended_key_usage,
  667. ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _decode_authority_key_identifier,
  668. ExtensionOID.AUTHORITY_INFORMATION_ACCESS: (
  669. _decode_authority_information_access
  670. ),
  671. ExtensionOID.SUBJECT_INFORMATION_ACCESS: (
  672. _decode_subject_information_access
  673. ),
  674. ExtensionOID.CERTIFICATE_POLICIES: _decode_certificate_policies,
  675. ExtensionOID.CRL_DISTRIBUTION_POINTS: _decode_crl_distribution_points,
  676. ExtensionOID.FRESHEST_CRL: _decode_freshest_crl,
  677. ExtensionOID.OCSP_NO_CHECK: _decode_ocsp_no_check,
  678. ExtensionOID.INHIBIT_ANY_POLICY: _decode_inhibit_any_policy,
  679. ExtensionOID.ISSUER_ALTERNATIVE_NAME: _decode_issuer_alt_name,
  680. ExtensionOID.NAME_CONSTRAINTS: _decode_name_constraints,
  681. ExtensionOID.POLICY_CONSTRAINTS: _decode_policy_constraints,
  682. }
  683. _EXTENSION_HANDLERS_SCT = {
  684. ExtensionOID.PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS: (
  685. _decode_precert_signed_certificate_timestamps
  686. )
  687. }
  688. _REVOKED_EXTENSION_HANDLERS = {
  689. CRLEntryExtensionOID.CRL_REASON: _decode_crl_reason,
  690. CRLEntryExtensionOID.INVALIDITY_DATE: _decode_invalidity_date,
  691. CRLEntryExtensionOID.CERTIFICATE_ISSUER: _decode_cert_issuer,
  692. }
  693. _CRL_EXTENSION_HANDLERS = {
  694. ExtensionOID.CRL_NUMBER: _decode_crl_number,
  695. ExtensionOID.DELTA_CRL_INDICATOR: _decode_delta_crl_indicator,
  696. ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _decode_authority_key_identifier,
  697. ExtensionOID.ISSUER_ALTERNATIVE_NAME: _decode_issuer_alt_name,
  698. ExtensionOID.AUTHORITY_INFORMATION_ACCESS: (
  699. _decode_authority_information_access
  700. ),
  701. ExtensionOID.ISSUING_DISTRIBUTION_POINT: _decode_issuing_dist_point,
  702. ExtensionOID.FRESHEST_CRL: _decode_freshest_crl,
  703. }
  704. _OCSP_REQ_EXTENSION_HANDLERS = {
  705. OCSPExtensionOID.NONCE: _decode_nonce,
  706. }
  707. _OCSP_BASICRESP_EXTENSION_HANDLERS = {
  708. OCSPExtensionOID.NONCE: _decode_nonce,
  709. }
  710. _OCSP_SINGLERESP_EXTENSION_HANDLERS_SCT = {
  711. ExtensionOID.SIGNED_CERTIFICATE_TIMESTAMPS: (
  712. _decode_signed_certificate_timestamps
  713. )
  714. }