hashes.py 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. from __future__ import absolute_import, division, print_function
  5. from cryptography import utils
  6. from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
  7. from cryptography.hazmat.primitives import hashes
  8. @utils.register_interface(hashes.HashContext)
  9. class _HashContext(object):
  10. def __init__(self, backend, algorithm, ctx=None):
  11. self._algorithm = algorithm
  12. self._backend = backend
  13. if ctx is None:
  14. ctx = self._backend._lib.Cryptography_EVP_MD_CTX_new()
  15. ctx = self._backend._ffi.gc(
  16. ctx, self._backend._lib.Cryptography_EVP_MD_CTX_free
  17. )
  18. evp_md = self._backend._evp_md_from_algorithm(algorithm)
  19. if evp_md == self._backend._ffi.NULL:
  20. raise UnsupportedAlgorithm(
  21. "{} is not a supported hash on this backend.".format(
  22. algorithm.name
  23. ),
  24. _Reasons.UNSUPPORTED_HASH,
  25. )
  26. res = self._backend._lib.EVP_DigestInit_ex(
  27. ctx, evp_md, self._backend._ffi.NULL
  28. )
  29. self._backend.openssl_assert(res != 0)
  30. self._ctx = ctx
  31. algorithm = utils.read_only_property("_algorithm")
  32. def copy(self):
  33. copied_ctx = self._backend._lib.Cryptography_EVP_MD_CTX_new()
  34. copied_ctx = self._backend._ffi.gc(
  35. copied_ctx, self._backend._lib.Cryptography_EVP_MD_CTX_free
  36. )
  37. res = self._backend._lib.EVP_MD_CTX_copy_ex(copied_ctx, self._ctx)
  38. self._backend.openssl_assert(res != 0)
  39. return _HashContext(self._backend, self.algorithm, ctx=copied_ctx)
  40. def update(self, data):
  41. data_ptr = self._backend._ffi.from_buffer(data)
  42. res = self._backend._lib.EVP_DigestUpdate(
  43. self._ctx, data_ptr, len(data)
  44. )
  45. self._backend.openssl_assert(res != 0)
  46. def finalize(self):
  47. if isinstance(self.algorithm, hashes.ExtendableOutputFunction):
  48. # extendable output functions use a different finalize
  49. return self._finalize_xof()
  50. else:
  51. buf = self._backend._ffi.new(
  52. "unsigned char[]", self._backend._lib.EVP_MAX_MD_SIZE
  53. )
  54. outlen = self._backend._ffi.new("unsigned int *")
  55. res = self._backend._lib.EVP_DigestFinal_ex(self._ctx, buf, outlen)
  56. self._backend.openssl_assert(res != 0)
  57. self._backend.openssl_assert(
  58. outlen[0] == self.algorithm.digest_size
  59. )
  60. return self._backend._ffi.buffer(buf)[: outlen[0]]
  61. def _finalize_xof(self):
  62. buf = self._backend._ffi.new(
  63. "unsigned char[]", self.algorithm.digest_size
  64. )
  65. res = self._backend._lib.EVP_DigestFinalXOF(
  66. self._ctx, buf, self.algorithm.digest_size
  67. )
  68. self._backend.openssl_assert(res != 0)
  69. return self._backend._ffi.buffer(buf)[: self.algorithm.digest_size]