x963kdf.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
  4. from __future__ import absolute_import, division, print_function
  5. import struct
  6. from cryptography import utils
  7. from cryptography.exceptions import (
  8. AlreadyFinalized,
  9. InvalidKey,
  10. UnsupportedAlgorithm,
  11. _Reasons,
  12. )
  13. from cryptography.hazmat.backends import _get_backend
  14. from cryptography.hazmat.backends.interfaces import HashBackend
  15. from cryptography.hazmat.primitives import constant_time, hashes
  16. from cryptography.hazmat.primitives.kdf import KeyDerivationFunction
  17. def _int_to_u32be(n):
  18. return struct.pack(">I", n)
  19. @utils.register_interface(KeyDerivationFunction)
  20. class X963KDF(object):
  21. def __init__(self, algorithm, length, sharedinfo, backend=None):
  22. backend = _get_backend(backend)
  23. max_len = algorithm.digest_size * (2 ** 32 - 1)
  24. if length > max_len:
  25. raise ValueError(
  26. "Can not derive keys larger than {} bits.".format(max_len)
  27. )
  28. if sharedinfo is not None:
  29. utils._check_bytes("sharedinfo", sharedinfo)
  30. self._algorithm = algorithm
  31. self._length = length
  32. self._sharedinfo = sharedinfo
  33. if not isinstance(backend, HashBackend):
  34. raise UnsupportedAlgorithm(
  35. "Backend object does not implement HashBackend.",
  36. _Reasons.BACKEND_MISSING_INTERFACE,
  37. )
  38. self._backend = backend
  39. self._used = False
  40. def derive(self, key_material):
  41. if self._used:
  42. raise AlreadyFinalized
  43. self._used = True
  44. utils._check_byteslike("key_material", key_material)
  45. output = [b""]
  46. outlen = 0
  47. counter = 1
  48. while self._length > outlen:
  49. h = hashes.Hash(self._algorithm, self._backend)
  50. h.update(key_material)
  51. h.update(_int_to_u32be(counter))
  52. if self._sharedinfo is not None:
  53. h.update(self._sharedinfo)
  54. output.append(h.finalize())
  55. outlen += len(output[-1])
  56. counter += 1
  57. return b"".join(output)[: self._length]
  58. def verify(self, key_material, expected_key):
  59. if not constant_time.bytes_eq(self.derive(key_material), expected_key):
  60. raise InvalidKey