dh.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. from __future__ import absolute_import, division, print_function
  5. from cryptography import utils
  6. from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
  7. from cryptography.hazmat.primitives import serialization
  8. from cryptography.hazmat.primitives.asymmetric import dh
  9. def _dh_params_dup(dh_cdata, backend):
  10. lib = backend._lib
  11. ffi = backend._ffi
  12. param_cdata = lib.DHparams_dup(dh_cdata)
  13. backend.openssl_assert(param_cdata != ffi.NULL)
  14. param_cdata = ffi.gc(param_cdata, lib.DH_free)
  15. if lib.CRYPTOGRAPHY_IS_LIBRESSL:
  16. # In libressl DHparams_dup don't copy q
  17. q = ffi.new("BIGNUM **")
  18. lib.DH_get0_pqg(dh_cdata, ffi.NULL, q, ffi.NULL)
  19. q_dup = lib.BN_dup(q[0])
  20. res = lib.DH_set0_pqg(param_cdata, ffi.NULL, q_dup, ffi.NULL)
  21. backend.openssl_assert(res == 1)
  22. return param_cdata
  23. def _dh_cdata_to_parameters(dh_cdata, backend):
  24. param_cdata = _dh_params_dup(dh_cdata, backend)
  25. return _DHParameters(backend, param_cdata)
  26. @utils.register_interface(dh.DHParametersWithSerialization)
  27. class _DHParameters(object):
  28. def __init__(self, backend, dh_cdata):
  29. self._backend = backend
  30. self._dh_cdata = dh_cdata
  31. def parameter_numbers(self):
  32. p = self._backend._ffi.new("BIGNUM **")
  33. g = self._backend._ffi.new("BIGNUM **")
  34. q = self._backend._ffi.new("BIGNUM **")
  35. self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
  36. self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
  37. self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
  38. if q[0] == self._backend._ffi.NULL:
  39. q_val = None
  40. else:
  41. q_val = self._backend._bn_to_int(q[0])
  42. return dh.DHParameterNumbers(
  43. p=self._backend._bn_to_int(p[0]),
  44. g=self._backend._bn_to_int(g[0]),
  45. q=q_val,
  46. )
  47. def generate_private_key(self):
  48. return self._backend.generate_dh_private_key(self)
  49. def parameter_bytes(self, encoding, format):
  50. if format is not serialization.ParameterFormat.PKCS3:
  51. raise ValueError("Only PKCS3 serialization is supported")
  52. if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
  53. q = self._backend._ffi.new("BIGNUM **")
  54. self._backend._lib.DH_get0_pqg(
  55. self._dh_cdata,
  56. self._backend._ffi.NULL,
  57. q,
  58. self._backend._ffi.NULL,
  59. )
  60. if q[0] != self._backend._ffi.NULL:
  61. raise UnsupportedAlgorithm(
  62. "DH X9.42 serialization is not supported",
  63. _Reasons.UNSUPPORTED_SERIALIZATION,
  64. )
  65. return self._backend._parameter_bytes(encoding, format, self._dh_cdata)
  66. def _get_dh_num_bits(backend, dh_cdata):
  67. p = backend._ffi.new("BIGNUM **")
  68. backend._lib.DH_get0_pqg(dh_cdata, p, backend._ffi.NULL, backend._ffi.NULL)
  69. backend.openssl_assert(p[0] != backend._ffi.NULL)
  70. return backend._lib.BN_num_bits(p[0])
  71. @utils.register_interface(dh.DHPrivateKeyWithSerialization)
  72. class _DHPrivateKey(object):
  73. def __init__(self, backend, dh_cdata, evp_pkey):
  74. self._backend = backend
  75. self._dh_cdata = dh_cdata
  76. self._evp_pkey = evp_pkey
  77. self._key_size_bytes = self._backend._lib.DH_size(dh_cdata)
  78. @property
  79. def key_size(self):
  80. return _get_dh_num_bits(self._backend, self._dh_cdata)
  81. def private_numbers(self):
  82. p = self._backend._ffi.new("BIGNUM **")
  83. g = self._backend._ffi.new("BIGNUM **")
  84. q = self._backend._ffi.new("BIGNUM **")
  85. self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
  86. self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
  87. self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
  88. if q[0] == self._backend._ffi.NULL:
  89. q_val = None
  90. else:
  91. q_val = self._backend._bn_to_int(q[0])
  92. pub_key = self._backend._ffi.new("BIGNUM **")
  93. priv_key = self._backend._ffi.new("BIGNUM **")
  94. self._backend._lib.DH_get0_key(self._dh_cdata, pub_key, priv_key)
  95. self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
  96. self._backend.openssl_assert(priv_key[0] != self._backend._ffi.NULL)
  97. return dh.DHPrivateNumbers(
  98. public_numbers=dh.DHPublicNumbers(
  99. parameter_numbers=dh.DHParameterNumbers(
  100. p=self._backend._bn_to_int(p[0]),
  101. g=self._backend._bn_to_int(g[0]),
  102. q=q_val,
  103. ),
  104. y=self._backend._bn_to_int(pub_key[0]),
  105. ),
  106. x=self._backend._bn_to_int(priv_key[0]),
  107. )
  108. def exchange(self, peer_public_key):
  109. buf = self._backend._ffi.new("unsigned char[]", self._key_size_bytes)
  110. pub_key = self._backend._ffi.new("BIGNUM **")
  111. self._backend._lib.DH_get0_key(
  112. peer_public_key._dh_cdata, pub_key, self._backend._ffi.NULL
  113. )
  114. self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
  115. res = self._backend._lib.DH_compute_key(
  116. buf, pub_key[0], self._dh_cdata
  117. )
  118. if res == -1:
  119. errors_with_text = self._backend._consume_errors_with_text()
  120. raise ValueError(
  121. "Error computing shared key. Public key is likely invalid "
  122. "for this exchange.",
  123. errors_with_text,
  124. )
  125. else:
  126. self._backend.openssl_assert(res >= 1)
  127. key = self._backend._ffi.buffer(buf)[:res]
  128. pad = self._key_size_bytes - len(key)
  129. if pad > 0:
  130. key = (b"\x00" * pad) + key
  131. return key
  132. def public_key(self):
  133. dh_cdata = _dh_params_dup(self._dh_cdata, self._backend)
  134. pub_key = self._backend._ffi.new("BIGNUM **")
  135. self._backend._lib.DH_get0_key(
  136. self._dh_cdata, pub_key, self._backend._ffi.NULL
  137. )
  138. self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
  139. pub_key_dup = self._backend._lib.BN_dup(pub_key[0])
  140. self._backend.openssl_assert(pub_key_dup != self._backend._ffi.NULL)
  141. res = self._backend._lib.DH_set0_key(
  142. dh_cdata, pub_key_dup, self._backend._ffi.NULL
  143. )
  144. self._backend.openssl_assert(res == 1)
  145. evp_pkey = self._backend._dh_cdata_to_evp_pkey(dh_cdata)
  146. return _DHPublicKey(self._backend, dh_cdata, evp_pkey)
  147. def parameters(self):
  148. return _dh_cdata_to_parameters(self._dh_cdata, self._backend)
  149. def private_bytes(self, encoding, format, encryption_algorithm):
  150. if format is not serialization.PrivateFormat.PKCS8:
  151. raise ValueError(
  152. "DH private keys support only PKCS8 serialization"
  153. )
  154. if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
  155. q = self._backend._ffi.new("BIGNUM **")
  156. self._backend._lib.DH_get0_pqg(
  157. self._dh_cdata,
  158. self._backend._ffi.NULL,
  159. q,
  160. self._backend._ffi.NULL,
  161. )
  162. if q[0] != self._backend._ffi.NULL:
  163. raise UnsupportedAlgorithm(
  164. "DH X9.42 serialization is not supported",
  165. _Reasons.UNSUPPORTED_SERIALIZATION,
  166. )
  167. return self._backend._private_key_bytes(
  168. encoding,
  169. format,
  170. encryption_algorithm,
  171. self,
  172. self._evp_pkey,
  173. self._dh_cdata,
  174. )
  175. @utils.register_interface(dh.DHPublicKeyWithSerialization)
  176. class _DHPublicKey(object):
  177. def __init__(self, backend, dh_cdata, evp_pkey):
  178. self._backend = backend
  179. self._dh_cdata = dh_cdata
  180. self._evp_pkey = evp_pkey
  181. self._key_size_bits = _get_dh_num_bits(self._backend, self._dh_cdata)
  182. @property
  183. def key_size(self):
  184. return self._key_size_bits
  185. def public_numbers(self):
  186. p = self._backend._ffi.new("BIGNUM **")
  187. g = self._backend._ffi.new("BIGNUM **")
  188. q = self._backend._ffi.new("BIGNUM **")
  189. self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
  190. self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
  191. self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
  192. if q[0] == self._backend._ffi.NULL:
  193. q_val = None
  194. else:
  195. q_val = self._backend._bn_to_int(q[0])
  196. pub_key = self._backend._ffi.new("BIGNUM **")
  197. self._backend._lib.DH_get0_key(
  198. self._dh_cdata, pub_key, self._backend._ffi.NULL
  199. )
  200. self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
  201. return dh.DHPublicNumbers(
  202. parameter_numbers=dh.DHParameterNumbers(
  203. p=self._backend._bn_to_int(p[0]),
  204. g=self._backend._bn_to_int(g[0]),
  205. q=q_val,
  206. ),
  207. y=self._backend._bn_to_int(pub_key[0]),
  208. )
  209. def parameters(self):
  210. return _dh_cdata_to_parameters(self._dh_cdata, self._backend)
  211. def public_bytes(self, encoding, format):
  212. if format is not serialization.PublicFormat.SubjectPublicKeyInfo:
  213. raise ValueError(
  214. "DH public keys support only "
  215. "SubjectPublicKeyInfo serialization"
  216. )
  217. if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
  218. q = self._backend._ffi.new("BIGNUM **")
  219. self._backend._lib.DH_get0_pqg(
  220. self._dh_cdata,
  221. self._backend._ffi.NULL,
  222. q,
  223. self._backend._ffi.NULL,
  224. )
  225. if q[0] != self._backend._ffi.NULL:
  226. raise UnsupportedAlgorithm(
  227. "DH X9.42 serialization is not supported",
  228. _Reasons.UNSUPPORTED_SERIALIZATION,
  229. )
  230. return self._backend._public_key_bytes(
  231. encoding, format, self, self._evp_pkey, None
  232. )