x509.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. from __future__ import absolute_import, division, print_function
  5. import datetime
  6. import operator
  7. from cryptography import utils, x509
  8. from cryptography.exceptions import UnsupportedAlgorithm
  9. from cryptography.hazmat.backends.openssl.decode_asn1 import (
  10. _asn1_integer_to_int,
  11. _asn1_string_to_bytes,
  12. _decode_x509_name,
  13. _obj2txt,
  14. _parse_asn1_time,
  15. )
  16. from cryptography.hazmat.backends.openssl.encode_asn1 import (
  17. _encode_asn1_int_gc,
  18. _txt2obj_gc,
  19. )
  20. from cryptography.hazmat.primitives import hashes, serialization
  21. from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa
  22. from cryptography.x509.name import _ASN1Type
  23. @utils.register_interface(x509.Certificate)
  24. class _Certificate(object):
  25. def __init__(self, backend, x509_cert):
  26. self._backend = backend
  27. self._x509 = x509_cert
  28. version = self._backend._lib.X509_get_version(self._x509)
  29. if version == 0:
  30. self._version = x509.Version.v1
  31. elif version == 2:
  32. self._version = x509.Version.v3
  33. else:
  34. raise x509.InvalidVersion(
  35. "{} is not a valid X509 version".format(version), version
  36. )
  37. def __repr__(self):
  38. return "<Certificate(subject={}, ...)>".format(self.subject)
  39. def __eq__(self, other):
  40. if not isinstance(other, x509.Certificate):
  41. return NotImplemented
  42. res = self._backend._lib.X509_cmp(self._x509, other._x509)
  43. return res == 0
  44. def __ne__(self, other):
  45. return not self == other
  46. def __hash__(self):
  47. return hash(self.public_bytes(serialization.Encoding.DER))
  48. def __deepcopy__(self, memo):
  49. return self
  50. def fingerprint(self, algorithm):
  51. h = hashes.Hash(algorithm, self._backend)
  52. h.update(self.public_bytes(serialization.Encoding.DER))
  53. return h.finalize()
  54. version = utils.read_only_property("_version")
  55. @property
  56. def serial_number(self):
  57. asn1_int = self._backend._lib.X509_get_serialNumber(self._x509)
  58. self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL)
  59. return _asn1_integer_to_int(self._backend, asn1_int)
  60. def public_key(self):
  61. pkey = self._backend._lib.X509_get_pubkey(self._x509)
  62. if pkey == self._backend._ffi.NULL:
  63. # Remove errors from the stack.
  64. self._backend._consume_errors()
  65. raise ValueError("Certificate public key is of an unknown type")
  66. pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)
  67. return self._backend._evp_pkey_to_public_key(pkey)
  68. @property
  69. def not_valid_before(self):
  70. asn1_time = self._backend._lib.X509_getm_notBefore(self._x509)
  71. return _parse_asn1_time(self._backend, asn1_time)
  72. @property
  73. def not_valid_after(self):
  74. asn1_time = self._backend._lib.X509_getm_notAfter(self._x509)
  75. return _parse_asn1_time(self._backend, asn1_time)
  76. @property
  77. def issuer(self):
  78. issuer = self._backend._lib.X509_get_issuer_name(self._x509)
  79. self._backend.openssl_assert(issuer != self._backend._ffi.NULL)
  80. return _decode_x509_name(self._backend, issuer)
  81. @property
  82. def subject(self):
  83. subject = self._backend._lib.X509_get_subject_name(self._x509)
  84. self._backend.openssl_assert(subject != self._backend._ffi.NULL)
  85. return _decode_x509_name(self._backend, subject)
  86. @property
  87. def signature_hash_algorithm(self):
  88. oid = self.signature_algorithm_oid
  89. try:
  90. return x509._SIG_OIDS_TO_HASH[oid]
  91. except KeyError:
  92. raise UnsupportedAlgorithm(
  93. "Signature algorithm OID:{} not recognized".format(oid)
  94. )
  95. @property
  96. def signature_algorithm_oid(self):
  97. alg = self._backend._ffi.new("X509_ALGOR **")
  98. self._backend._lib.X509_get0_signature(
  99. self._backend._ffi.NULL, alg, self._x509
  100. )
  101. self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
  102. oid = _obj2txt(self._backend, alg[0].algorithm)
  103. return x509.ObjectIdentifier(oid)
  104. @utils.cached_property
  105. def extensions(self):
  106. return self._backend._certificate_extension_parser.parse(self._x509)
  107. @property
  108. def signature(self):
  109. sig = self._backend._ffi.new("ASN1_BIT_STRING **")
  110. self._backend._lib.X509_get0_signature(
  111. sig, self._backend._ffi.NULL, self._x509
  112. )
  113. self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
  114. return _asn1_string_to_bytes(self._backend, sig[0])
  115. @property
  116. def tbs_certificate_bytes(self):
  117. pp = self._backend._ffi.new("unsigned char **")
  118. res = self._backend._lib.i2d_re_X509_tbs(self._x509, pp)
  119. self._backend.openssl_assert(res > 0)
  120. pp = self._backend._ffi.gc(
  121. pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
  122. )
  123. return self._backend._ffi.buffer(pp[0], res)[:]
  124. def public_bytes(self, encoding):
  125. bio = self._backend._create_mem_bio_gc()
  126. if encoding is serialization.Encoding.PEM:
  127. res = self._backend._lib.PEM_write_bio_X509(bio, self._x509)
  128. elif encoding is serialization.Encoding.DER:
  129. res = self._backend._lib.i2d_X509_bio(bio, self._x509)
  130. else:
  131. raise TypeError("encoding must be an item from the Encoding enum")
  132. self._backend.openssl_assert(res == 1)
  133. return self._backend._read_mem_bio(bio)
  134. @utils.register_interface(x509.RevokedCertificate)
  135. class _RevokedCertificate(object):
  136. def __init__(self, backend, crl, x509_revoked):
  137. self._backend = backend
  138. # The X509_REVOKED_value is a X509_REVOKED * that has
  139. # no reference counting. This means when X509_CRL_free is
  140. # called then the CRL and all X509_REVOKED * are freed. Since
  141. # you can retain a reference to a single revoked certificate
  142. # and let the CRL fall out of scope we need to retain a
  143. # private reference to the CRL inside the RevokedCertificate
  144. # object to prevent the gc from being called inappropriately.
  145. self._crl = crl
  146. self._x509_revoked = x509_revoked
  147. @property
  148. def serial_number(self):
  149. asn1_int = self._backend._lib.X509_REVOKED_get0_serialNumber(
  150. self._x509_revoked
  151. )
  152. self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL)
  153. return _asn1_integer_to_int(self._backend, asn1_int)
  154. @property
  155. def revocation_date(self):
  156. return _parse_asn1_time(
  157. self._backend,
  158. self._backend._lib.X509_REVOKED_get0_revocationDate(
  159. self._x509_revoked
  160. ),
  161. )
  162. @utils.cached_property
  163. def extensions(self):
  164. return self._backend._revoked_cert_extension_parser.parse(
  165. self._x509_revoked
  166. )
  167. @utils.register_interface(x509.CertificateRevocationList)
  168. class _CertificateRevocationList(object):
  169. def __init__(self, backend, x509_crl):
  170. self._backend = backend
  171. self._x509_crl = x509_crl
  172. def __eq__(self, other):
  173. if not isinstance(other, x509.CertificateRevocationList):
  174. return NotImplemented
  175. res = self._backend._lib.X509_CRL_cmp(self._x509_crl, other._x509_crl)
  176. return res == 0
  177. def __ne__(self, other):
  178. return not self == other
  179. def fingerprint(self, algorithm):
  180. h = hashes.Hash(algorithm, self._backend)
  181. bio = self._backend._create_mem_bio_gc()
  182. res = self._backend._lib.i2d_X509_CRL_bio(bio, self._x509_crl)
  183. self._backend.openssl_assert(res == 1)
  184. der = self._backend._read_mem_bio(bio)
  185. h.update(der)
  186. return h.finalize()
  187. @utils.cached_property
  188. def _sorted_crl(self):
  189. # X509_CRL_get0_by_serial sorts in place, which breaks a variety of
  190. # things we don't want to break (like iteration and the signature).
  191. # Let's dupe it and sort that instead.
  192. dup = self._backend._lib.X509_CRL_dup(self._x509_crl)
  193. self._backend.openssl_assert(dup != self._backend._ffi.NULL)
  194. dup = self._backend._ffi.gc(dup, self._backend._lib.X509_CRL_free)
  195. return dup
  196. def get_revoked_certificate_by_serial_number(self, serial_number):
  197. revoked = self._backend._ffi.new("X509_REVOKED **")
  198. asn1_int = _encode_asn1_int_gc(self._backend, serial_number)
  199. res = self._backend._lib.X509_CRL_get0_by_serial(
  200. self._sorted_crl, revoked, asn1_int
  201. )
  202. if res == 0:
  203. return None
  204. else:
  205. self._backend.openssl_assert(revoked[0] != self._backend._ffi.NULL)
  206. return _RevokedCertificate(
  207. self._backend, self._sorted_crl, revoked[0]
  208. )
  209. @property
  210. def signature_hash_algorithm(self):
  211. oid = self.signature_algorithm_oid
  212. try:
  213. return x509._SIG_OIDS_TO_HASH[oid]
  214. except KeyError:
  215. raise UnsupportedAlgorithm(
  216. "Signature algorithm OID:{} not recognized".format(oid)
  217. )
  218. @property
  219. def signature_algorithm_oid(self):
  220. alg = self._backend._ffi.new("X509_ALGOR **")
  221. self._backend._lib.X509_CRL_get0_signature(
  222. self._x509_crl, self._backend._ffi.NULL, alg
  223. )
  224. self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
  225. oid = _obj2txt(self._backend, alg[0].algorithm)
  226. return x509.ObjectIdentifier(oid)
  227. @property
  228. def issuer(self):
  229. issuer = self._backend._lib.X509_CRL_get_issuer(self._x509_crl)
  230. self._backend.openssl_assert(issuer != self._backend._ffi.NULL)
  231. return _decode_x509_name(self._backend, issuer)
  232. @property
  233. def next_update(self):
  234. nu = self._backend._lib.X509_CRL_get_nextUpdate(self._x509_crl)
  235. self._backend.openssl_assert(nu != self._backend._ffi.NULL)
  236. return _parse_asn1_time(self._backend, nu)
  237. @property
  238. def last_update(self):
  239. lu = self._backend._lib.X509_CRL_get_lastUpdate(self._x509_crl)
  240. self._backend.openssl_assert(lu != self._backend._ffi.NULL)
  241. return _parse_asn1_time(self._backend, lu)
  242. @property
  243. def signature(self):
  244. sig = self._backend._ffi.new("ASN1_BIT_STRING **")
  245. self._backend._lib.X509_CRL_get0_signature(
  246. self._x509_crl, sig, self._backend._ffi.NULL
  247. )
  248. self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
  249. return _asn1_string_to_bytes(self._backend, sig[0])
  250. @property
  251. def tbs_certlist_bytes(self):
  252. pp = self._backend._ffi.new("unsigned char **")
  253. res = self._backend._lib.i2d_re_X509_CRL_tbs(self._x509_crl, pp)
  254. self._backend.openssl_assert(res > 0)
  255. pp = self._backend._ffi.gc(
  256. pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
  257. )
  258. return self._backend._ffi.buffer(pp[0], res)[:]
  259. def public_bytes(self, encoding):
  260. bio = self._backend._create_mem_bio_gc()
  261. if encoding is serialization.Encoding.PEM:
  262. res = self._backend._lib.PEM_write_bio_X509_CRL(
  263. bio, self._x509_crl
  264. )
  265. elif encoding is serialization.Encoding.DER:
  266. res = self._backend._lib.i2d_X509_CRL_bio(bio, self._x509_crl)
  267. else:
  268. raise TypeError("encoding must be an item from the Encoding enum")
  269. self._backend.openssl_assert(res == 1)
  270. return self._backend._read_mem_bio(bio)
  271. def _revoked_cert(self, idx):
  272. revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl)
  273. r = self._backend._lib.sk_X509_REVOKED_value(revoked, idx)
  274. self._backend.openssl_assert(r != self._backend._ffi.NULL)
  275. return _RevokedCertificate(self._backend, self, r)
  276. def __iter__(self):
  277. for i in range(len(self)):
  278. yield self._revoked_cert(i)
  279. def __getitem__(self, idx):
  280. if isinstance(idx, slice):
  281. start, stop, step = idx.indices(len(self))
  282. return [self._revoked_cert(i) for i in range(start, stop, step)]
  283. else:
  284. idx = operator.index(idx)
  285. if idx < 0:
  286. idx += len(self)
  287. if not 0 <= idx < len(self):
  288. raise IndexError
  289. return self._revoked_cert(idx)
  290. def __len__(self):
  291. revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl)
  292. if revoked == self._backend._ffi.NULL:
  293. return 0
  294. else:
  295. return self._backend._lib.sk_X509_REVOKED_num(revoked)
  296. @utils.cached_property
  297. def extensions(self):
  298. return self._backend._crl_extension_parser.parse(self._x509_crl)
  299. def is_signature_valid(self, public_key):
  300. if not isinstance(
  301. public_key,
  302. (dsa.DSAPublicKey, rsa.RSAPublicKey, ec.EllipticCurvePublicKey),
  303. ):
  304. raise TypeError(
  305. "Expecting one of DSAPublicKey, RSAPublicKey,"
  306. " or EllipticCurvePublicKey."
  307. )
  308. res = self._backend._lib.X509_CRL_verify(
  309. self._x509_crl, public_key._evp_pkey
  310. )
  311. if res != 1:
  312. self._backend._consume_errors()
  313. return False
  314. return True
  315. @utils.register_interface(x509.CertificateSigningRequest)
  316. class _CertificateSigningRequest(object):
  317. def __init__(self, backend, x509_req):
  318. self._backend = backend
  319. self._x509_req = x509_req
  320. def __eq__(self, other):
  321. if not isinstance(other, _CertificateSigningRequest):
  322. return NotImplemented
  323. self_bytes = self.public_bytes(serialization.Encoding.DER)
  324. other_bytes = other.public_bytes(serialization.Encoding.DER)
  325. return self_bytes == other_bytes
  326. def __ne__(self, other):
  327. return not self == other
  328. def __hash__(self):
  329. return hash(self.public_bytes(serialization.Encoding.DER))
  330. def public_key(self):
  331. pkey = self._backend._lib.X509_REQ_get_pubkey(self._x509_req)
  332. self._backend.openssl_assert(pkey != self._backend._ffi.NULL)
  333. pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)
  334. return self._backend._evp_pkey_to_public_key(pkey)
  335. @property
  336. def subject(self):
  337. subject = self._backend._lib.X509_REQ_get_subject_name(self._x509_req)
  338. self._backend.openssl_assert(subject != self._backend._ffi.NULL)
  339. return _decode_x509_name(self._backend, subject)
  340. @property
  341. def signature_hash_algorithm(self):
  342. oid = self.signature_algorithm_oid
  343. try:
  344. return x509._SIG_OIDS_TO_HASH[oid]
  345. except KeyError:
  346. raise UnsupportedAlgorithm(
  347. "Signature algorithm OID:{} not recognized".format(oid)
  348. )
  349. @property
  350. def signature_algorithm_oid(self):
  351. alg = self._backend._ffi.new("X509_ALGOR **")
  352. self._backend._lib.X509_REQ_get0_signature(
  353. self._x509_req, self._backend._ffi.NULL, alg
  354. )
  355. self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
  356. oid = _obj2txt(self._backend, alg[0].algorithm)
  357. return x509.ObjectIdentifier(oid)
  358. @utils.cached_property
  359. def extensions(self):
  360. x509_exts = self._backend._lib.X509_REQ_get_extensions(self._x509_req)
  361. x509_exts = self._backend._ffi.gc(
  362. x509_exts,
  363. lambda x: self._backend._lib.sk_X509_EXTENSION_pop_free(
  364. x,
  365. self._backend._ffi.addressof(
  366. self._backend._lib._original_lib, "X509_EXTENSION_free"
  367. ),
  368. ),
  369. )
  370. return self._backend._csr_extension_parser.parse(x509_exts)
  371. def public_bytes(self, encoding):
  372. bio = self._backend._create_mem_bio_gc()
  373. if encoding is serialization.Encoding.PEM:
  374. res = self._backend._lib.PEM_write_bio_X509_REQ(
  375. bio, self._x509_req
  376. )
  377. elif encoding is serialization.Encoding.DER:
  378. res = self._backend._lib.i2d_X509_REQ_bio(bio, self._x509_req)
  379. else:
  380. raise TypeError("encoding must be an item from the Encoding enum")
  381. self._backend.openssl_assert(res == 1)
  382. return self._backend._read_mem_bio(bio)
  383. @property
  384. def tbs_certrequest_bytes(self):
  385. pp = self._backend._ffi.new("unsigned char **")
  386. res = self._backend._lib.i2d_re_X509_REQ_tbs(self._x509_req, pp)
  387. self._backend.openssl_assert(res > 0)
  388. pp = self._backend._ffi.gc(
  389. pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
  390. )
  391. return self._backend._ffi.buffer(pp[0], res)[:]
  392. @property
  393. def signature(self):
  394. sig = self._backend._ffi.new("ASN1_BIT_STRING **")
  395. self._backend._lib.X509_REQ_get0_signature(
  396. self._x509_req, sig, self._backend._ffi.NULL
  397. )
  398. self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
  399. return _asn1_string_to_bytes(self._backend, sig[0])
  400. @property
  401. def is_signature_valid(self):
  402. pkey = self._backend._lib.X509_REQ_get_pubkey(self._x509_req)
  403. self._backend.openssl_assert(pkey != self._backend._ffi.NULL)
  404. pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)
  405. res = self._backend._lib.X509_REQ_verify(self._x509_req, pkey)
  406. if res != 1:
  407. self._backend._consume_errors()
  408. return False
  409. return True
  410. def get_attribute_for_oid(self, oid):
  411. obj = _txt2obj_gc(self._backend, oid.dotted_string)
  412. pos = self._backend._lib.X509_REQ_get_attr_by_OBJ(
  413. self._x509_req, obj, -1
  414. )
  415. if pos == -1:
  416. raise x509.AttributeNotFound(
  417. "No {} attribute was found".format(oid), oid
  418. )
  419. attr = self._backend._lib.X509_REQ_get_attr(self._x509_req, pos)
  420. self._backend.openssl_assert(attr != self._backend._ffi.NULL)
  421. # We don't support multiple valued attributes for now.
  422. self._backend.openssl_assert(
  423. self._backend._lib.X509_ATTRIBUTE_count(attr) == 1
  424. )
  425. asn1_type = self._backend._lib.X509_ATTRIBUTE_get0_type(attr, 0)
  426. self._backend.openssl_assert(asn1_type != self._backend._ffi.NULL)
  427. # We need this to ensure that our C type cast is safe.
  428. # Also this should always be a sane string type, but we'll see if
  429. # that is true in the real world...
  430. if asn1_type.type not in (
  431. _ASN1Type.UTF8String.value,
  432. _ASN1Type.PrintableString.value,
  433. _ASN1Type.IA5String.value,
  434. ):
  435. raise ValueError(
  436. "OID {} has a disallowed ASN.1 type: {}".format(
  437. oid, asn1_type.type
  438. )
  439. )
  440. data = self._backend._lib.X509_ATTRIBUTE_get0_data(
  441. attr, 0, asn1_type.type, self._backend._ffi.NULL
  442. )
  443. self._backend.openssl_assert(data != self._backend._ffi.NULL)
  444. # This cast is safe iff we assert on the type above to ensure
  445. # that it is always a type of ASN1_STRING
  446. data = self._backend._ffi.cast("ASN1_STRING *", data)
  447. return _asn1_string_to_bytes(self._backend, data)
  448. @utils.register_interface(
  449. x509.certificate_transparency.SignedCertificateTimestamp
  450. )
  451. class _SignedCertificateTimestamp(object):
  452. def __init__(self, backend, sct_list, sct):
  453. self._backend = backend
  454. # Keep the SCT_LIST that this SCT came from alive.
  455. self._sct_list = sct_list
  456. self._sct = sct
  457. @property
  458. def version(self):
  459. version = self._backend._lib.SCT_get_version(self._sct)
  460. assert version == self._backend._lib.SCT_VERSION_V1
  461. return x509.certificate_transparency.Version.v1
  462. @property
  463. def log_id(self):
  464. out = self._backend._ffi.new("unsigned char **")
  465. log_id_length = self._backend._lib.SCT_get0_log_id(self._sct, out)
  466. assert log_id_length >= 0
  467. return self._backend._ffi.buffer(out[0], log_id_length)[:]
  468. @property
  469. def timestamp(self):
  470. timestamp = self._backend._lib.SCT_get_timestamp(self._sct)
  471. milliseconds = timestamp % 1000
  472. return datetime.datetime.utcfromtimestamp(timestamp // 1000).replace(
  473. microsecond=milliseconds * 1000
  474. )
  475. @property
  476. def entry_type(self):
  477. entry_type = self._backend._lib.SCT_get_log_entry_type(self._sct)
  478. # We currently only support loading SCTs from the X.509 extension, so
  479. # we only have precerts.
  480. assert entry_type == self._backend._lib.CT_LOG_ENTRY_TYPE_PRECERT
  481. return x509.certificate_transparency.LogEntryType.PRE_CERTIFICATE
  482. @property
  483. def _signature(self):
  484. ptrptr = self._backend._ffi.new("unsigned char **")
  485. res = self._backend._lib.SCT_get0_signature(self._sct, ptrptr)
  486. self._backend.openssl_assert(res > 0)
  487. self._backend.openssl_assert(ptrptr[0] != self._backend._ffi.NULL)
  488. return self._backend._ffi.buffer(ptrptr[0], res)[:]
  489. def __hash__(self):
  490. return hash(self._signature)
  491. def __eq__(self, other):
  492. if not isinstance(other, _SignedCertificateTimestamp):
  493. return NotImplemented
  494. return self._signature == other._signature
  495. def __ne__(self, other):
  496. return not self == other