CMAC.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. # -*- coding: utf-8 -*-
  2. #
  3. # Hash/CMAC.py - Implements the CMAC algorithm
  4. #
  5. # ===================================================================
  6. # The contents of this file are dedicated to the public domain. To
  7. # the extent that dedication to the public domain is not available,
  8. # everyone is granted a worldwide, perpetual, royalty-free,
  9. # non-exclusive license to exercise all rights associated with the
  10. # contents of this file for any purpose whatsoever.
  11. # No rights are reserved.
  12. #
  13. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  14. # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  15. # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  16. # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  17. # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  18. # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  19. # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  20. # SOFTWARE.
  21. # ===================================================================
  22. from Crypto.Util.py3compat import b, bchr, bord, tobytes
  23. from binascii import unhexlify
  24. from Crypto.Hash import BLAKE2s
  25. from Crypto.Util.strxor import strxor
  26. from Crypto.Util.number import long_to_bytes, bytes_to_long
  27. from Crypto.Random import get_random_bytes
  28. # The size of the authentication tag produced by the MAC.
  29. digest_size = None
  30. def _shift_bytes(bs, xor_lsb=0):
  31. num = (bytes_to_long(bs) << 1) ^ xor_lsb
  32. return long_to_bytes(num, len(bs))[-len(bs):]
  33. class CMAC(object):
  34. """A CMAC hash object.
  35. Do not instantiate directly. Use the :func:`new` function.
  36. :ivar digest_size: the size in bytes of the resulting MAC tag
  37. :vartype digest_size: integer
  38. """
  39. digest_size = None
  40. def __init__(self, key, msg=None, ciphermod=None, cipher_params=None):
  41. if ciphermod is None:
  42. raise TypeError("ciphermod must be specified (try AES)")
  43. self._key = key
  44. self._factory = ciphermod
  45. if cipher_params is None:
  46. self._cipher_params = {}
  47. else:
  48. self._cipher_params = dict(cipher_params)
  49. # Section 5.3 of NIST SP 800 38B and Appendix B
  50. if ciphermod.block_size == 8:
  51. const_Rb = 0x1B
  52. self._max_size = 8 * (2 ** 21)
  53. elif ciphermod.block_size == 16:
  54. const_Rb = 0x87
  55. self._max_size = 16 * (2 ** 48)
  56. else:
  57. raise TypeError("CMAC requires a cipher with a block size"
  58. "of 8 or 16 bytes, not %d" %
  59. (ciphermod.block_size,))
  60. # Size of the final MAC tag, in bytes
  61. self.digest_size = ciphermod.block_size
  62. self._mac_tag = None
  63. # Compute sub-keys
  64. zero_block = bchr(0) * ciphermod.block_size
  65. cipher = ciphermod.new(key,
  66. ciphermod.MODE_ECB,
  67. **self._cipher_params)
  68. l = cipher.encrypt(zero_block)
  69. if bord(l[0]) & 0x80:
  70. self._k1 = _shift_bytes(l, const_Rb)
  71. else:
  72. self._k1 = _shift_bytes(l)
  73. if bord(self._k1[0]) & 0x80:
  74. self._k2 = _shift_bytes(self._k1, const_Rb)
  75. else:
  76. self._k2 = _shift_bytes(self._k1)
  77. # Initialize CBC cipher with zero IV
  78. self._cbc = ciphermod.new(key,
  79. ciphermod.MODE_CBC,
  80. zero_block,
  81. **self._cipher_params)
  82. # Cache for outstanding data to authenticate
  83. self._cache = b("")
  84. # Last two pieces of ciphertext produced
  85. self._last_ct = self._last_pt = zero_block
  86. self._before_last_ct = None
  87. # Counter for total message size
  88. self._data_size = 0
  89. if msg:
  90. self.update(msg)
  91. def update(self, msg):
  92. """Authenticate the next chunk of message.
  93. Args:
  94. data (byte string): The next chunk of data
  95. """
  96. self._data_size += len(msg)
  97. if len(self._cache) > 0:
  98. filler = min(self.digest_size - len(self._cache), len(msg))
  99. self._cache += msg[:filler]
  100. if len(self._cache) < self.digest_size:
  101. return self
  102. msg = msg[filler:]
  103. self._update(self._cache)
  104. self._cache = b("")
  105. update_len, remain = divmod(len(msg), self.digest_size)
  106. update_len *= self.digest_size
  107. if remain > 0:
  108. self._update(msg[:update_len])
  109. self._cache = msg[update_len:]
  110. else:
  111. self._update(msg)
  112. self._cache = b("")
  113. return self
  114. def _update(self, data_block):
  115. """Update a block aligned to the block boundary"""
  116. if len(data_block) == 0:
  117. return
  118. assert len(data_block) % self.digest_size == 0
  119. ct = self._cbc.encrypt(data_block)
  120. if len(data_block) == self.digest_size:
  121. self._before_last_ct = self._last_ct
  122. else:
  123. self._before_last_ct = ct[-self.digest_size * 2:-self.digest_size]
  124. self._last_ct = ct[-self.digest_size:]
  125. self._last_pt = data_block[-self.digest_size:]
  126. def copy(self):
  127. """Return a copy ("clone") of the CMAC object.
  128. The copy will have the same internal state as the original CMAC
  129. object.
  130. This can be used to efficiently compute the MAC tag of byte
  131. strings that share a common initial substring.
  132. :return: An :class:`CMAC`
  133. """
  134. obj = CMAC(self._key,
  135. ciphermod=self._factory,
  136. cipher_params=self._cipher_params)
  137. obj._cbc = self._factory.new(self._key,
  138. self._factory.MODE_CBC,
  139. self._last_ct,
  140. **self._cipher_params)
  141. for m in ['_mac_tag', '_last_ct', '_before_last_ct', '_cache',
  142. '_data_size', '_max_size']:
  143. setattr(obj, m, getattr(self, m))
  144. return obj
  145. def digest(self):
  146. """Return the **binary** (non-printable) MAC tag of the message
  147. that has been authenticated so far.
  148. :return: The MAC tag, computed over the data processed so far.
  149. Binary form.
  150. :rtype: byte string
  151. """
  152. if self._mac_tag is not None:
  153. return self._mac_tag
  154. if self._data_size > self._max_size:
  155. raise ValueError("MAC is unsafe for this message")
  156. if len(self._cache) == 0 and self._before_last_ct is not None:
  157. # Last block was full
  158. pt = strxor(strxor(self._before_last_ct, self._k1), self._last_pt)
  159. else:
  160. # Last block is partial (or message length is zero)
  161. ext = self._cache + bchr(0x80) +\
  162. bchr(0) * (self.digest_size - len(self._cache) - 1)
  163. pt = strxor(strxor(self._last_ct, self._k2), ext)
  164. cipher = self._factory.new(self._key,
  165. self._factory.MODE_ECB,
  166. **self._cipher_params)
  167. self._mac_tag = cipher.encrypt(pt)
  168. return self._mac_tag
  169. def hexdigest(self):
  170. """Return the **printable** MAC tag of the message authenticated so far.
  171. :return: The MAC tag, computed over the data processed so far.
  172. Hexadecimal encoded.
  173. :rtype: string
  174. """
  175. return "".join(["%02x" % bord(x)
  176. for x in tuple(self.digest())])
  177. def verify(self, mac_tag):
  178. """Verify that a given **binary** MAC (computed by another party)
  179. is valid.
  180. Args:
  181. mac_tag (byte string): the expected MAC of the message.
  182. Raises:
  183. ValueError: if the MAC does not match. It means that the message
  184. has been tampered with or that the MAC key is incorrect.
  185. """
  186. secret = get_random_bytes(16)
  187. mac1 = BLAKE2s.new(digest_bits=160, key=secret, data=mac_tag)
  188. mac2 = BLAKE2s.new(digest_bits=160, key=secret, data=self.digest())
  189. if mac1.digest() != mac2.digest():
  190. raise ValueError("MAC check failed")
  191. def hexverify(self, hex_mac_tag):
  192. """Return the **printable** MAC tag of the message authenticated so far.
  193. :return: The MAC tag, computed over the data processed so far.
  194. Hexadecimal encoded.
  195. :rtype: string
  196. """
  197. self.verify(unhexlify(tobytes(hex_mac_tag)))
  198. def new(key, msg=None, ciphermod=None, cipher_params=None):
  199. """Create a new MAC object.
  200. Args:
  201. key (byte string):
  202. key for the CMAC object.
  203. The key must be valid for the underlying cipher algorithm.
  204. For instance, it must be 16 bytes long for AES-128.
  205. ciphermod (module):
  206. A cipher module from :mod:`Crypto.Cipher`.
  207. The cipher's block size has to be 128 bits,
  208. like :mod:`Crypto.Cipher.AES`, to reduce the probability
  209. of collisions.
  210. msg (byte string):
  211. Optional. The very first chunk of the message to authenticate.
  212. It is equivalent to an early call to `CMAC.update`. Optional.
  213. cipher_params (dict):
  214. Optional. A set of parameters to use when instantiating a cipher
  215. object.
  216. Returns:
  217. A :class:`CMAC` object
  218. """
  219. return CMAC(key, msg, ciphermod, cipher_params)