common.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. # -*- coding: utf-8 -*-
  2. #
  3. # SelfTest/Hash/common.py: Common code for Crypto.SelfTest.Hash
  4. #
  5. # Written in 2008 by Dwayne C. Litzenberger <dlitz@dlitz.net>
  6. #
  7. # ===================================================================
  8. # The contents of this file are dedicated to the public domain. To
  9. # the extent that dedication to the public domain is not available,
  10. # everyone is granted a worldwide, perpetual, royalty-free,
  11. # non-exclusive license to exercise all rights associated with the
  12. # contents of this file for any purpose whatsoever.
  13. # No rights are reserved.
  14. #
  15. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  16. # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  17. # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  18. # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  19. # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  20. # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21. # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  22. # SOFTWARE.
  23. # ===================================================================
  24. """Self-testing for PyCrypto hash modules"""
  25. __revision__ = "$Id$"
  26. import sys
  27. import unittest
  28. from binascii import a2b_hex, b2a_hex
  29. from Crypto.Util.py3compat import *
  30. # For compatibility with Python 2.1 and Python 2.2
  31. if sys.hexversion < 0x02030000:
  32. # Python 2.1 doesn't have a dict() function
  33. # Python 2.2 dict() function raises TypeError if you do dict(MD5='blah')
  34. def dict(**kwargs):
  35. return kwargs.copy()
  36. else:
  37. dict = dict
  38. class _NoDefault: pass # sentinel object
  39. def _extract(d, k, default=_NoDefault):
  40. """Get an item from a dictionary, and remove it from the dictionary."""
  41. try:
  42. retval = d[k]
  43. except KeyError:
  44. if default is _NoDefault:
  45. raise
  46. return default
  47. del d[k]
  48. return retval
  49. # Generic cipher test case
  50. class CipherSelfTest(unittest.TestCase):
  51. def __init__(self, module, params):
  52. unittest.TestCase.__init__(self)
  53. self.module = module
  54. # Extract the parameters
  55. params = params.copy()
  56. self.description = _extract(params, 'description')
  57. self.key = b(_extract(params, 'key'))
  58. self.plaintext = b(_extract(params, 'plaintext'))
  59. self.ciphertext = b(_extract(params, 'ciphertext'))
  60. self.module_name = _extract(params, 'module_name', None)
  61. mode = _extract(params, 'mode', None)
  62. self.mode_name = str(mode)
  63. if mode is not None:
  64. # Block cipher
  65. self.mode = getattr(self.module, "MODE_" + mode)
  66. self.iv = _extract(params, 'iv', None)
  67. if self.iv is not None: self.iv = b(self.iv)
  68. # Only relevant for OPENPGP mode
  69. self.encrypted_iv = _extract(params, 'encrypted_iv', None)
  70. if self.encrypted_iv is not None:
  71. self.encrypted_iv = b(self.encrypted_iv)
  72. else:
  73. # Stream cipher
  74. self.mode = None
  75. self.iv = None
  76. self.extra_params = params
  77. def shortDescription(self):
  78. return self.description
  79. def _new(self, do_decryption=0):
  80. params = self.extra_params.copy()
  81. # Handle CTR mode parameters. By default, we use Counter.new(self.module.block_size)
  82. if hasattr(self.module, "MODE_CTR") and self.mode == self.module.MODE_CTR:
  83. from Crypto.Util import Counter
  84. ctr_class = _extract(params, 'ctr_class', Counter.new)
  85. ctr_params = _extract(params, 'ctr_params', {}).copy()
  86. if ctr_params.has_key('prefix'): ctr_params['prefix'] = a2b_hex(b(ctr_params['prefix']))
  87. if ctr_params.has_key('suffix'): ctr_params['suffix'] = a2b_hex(b(ctr_params['suffix']))
  88. if not ctr_params.has_key('nbits'):
  89. ctr_params['nbits'] = 8*(self.module.block_size - len(ctr_params.get('prefix', '')) - len(ctr_params.get('suffix', '')))
  90. params['counter'] = ctr_class(**ctr_params)
  91. if self.mode is None:
  92. # Stream cipher
  93. return self.module.new(a2b_hex(self.key), **params)
  94. elif self.iv is None:
  95. # Block cipher without iv
  96. return self.module.new(a2b_hex(self.key), self.mode, **params)
  97. else:
  98. # Block cipher with iv
  99. if do_decryption and self.mode == self.module.MODE_OPENPGP:
  100. # In PGP mode, the IV to feed for decryption is the *encrypted* one
  101. return self.module.new(a2b_hex(self.key), self.mode, a2b_hex(self.encrypted_iv), **params)
  102. else:
  103. return self.module.new(a2b_hex(self.key), self.mode, a2b_hex(self.iv), **params)
  104. def runTest(self):
  105. plaintext = a2b_hex(self.plaintext)
  106. ciphertext = a2b_hex(self.ciphertext)
  107. ct1 = b2a_hex(self._new().encrypt(plaintext))
  108. pt1 = b2a_hex(self._new(1).decrypt(ciphertext))
  109. ct2 = b2a_hex(self._new().encrypt(plaintext))
  110. pt2 = b2a_hex(self._new(1).decrypt(ciphertext))
  111. if hasattr(self.module, "MODE_OPENPGP") and self.mode == self.module.MODE_OPENPGP:
  112. # In PGP mode, data returned by the first encrypt()
  113. # is prefixed with the encrypted IV.
  114. # Here we check it and then remove it from the ciphertexts.
  115. eilen = len(self.encrypted_iv)
  116. self.assertEqual(self.encrypted_iv, ct1[:eilen])
  117. self.assertEqual(self.encrypted_iv, ct2[:eilen])
  118. ct1 = ct1[eilen:]
  119. ct2 = ct2[eilen:]
  120. self.assertEqual(self.ciphertext, ct1) # encrypt
  121. self.assertEqual(self.ciphertext, ct2) # encrypt (second time)
  122. self.assertEqual(self.plaintext, pt1) # decrypt
  123. self.assertEqual(self.plaintext, pt2) # decrypt (second time)
  124. class CipherStreamingSelfTest(CipherSelfTest):
  125. def shortDescription(self):
  126. desc = self.module_name
  127. if self.mode is not None:
  128. desc += " in %s mode" % (self.mode_name,)
  129. return "%s should behave like a stream cipher" % (desc,)
  130. def runTest(self):
  131. plaintext = a2b_hex(self.plaintext)
  132. ciphertext = a2b_hex(self.ciphertext)
  133. # The cipher should work like a stream cipher
  134. # Test counter mode encryption, 3 bytes at a time
  135. ct3 = []
  136. cipher = self._new()
  137. for i in range(0, len(plaintext), 3):
  138. ct3.append(cipher.encrypt(plaintext[i:i+3]))
  139. ct3 = b2a_hex(b("").join(ct3))
  140. self.assertEqual(self.ciphertext, ct3) # encryption (3 bytes at a time)
  141. # Test counter mode decryption, 3 bytes at a time
  142. pt3 = []
  143. cipher = self._new()
  144. for i in range(0, len(ciphertext), 3):
  145. pt3.append(cipher.encrypt(ciphertext[i:i+3]))
  146. # PY3K: This is meant to be text, do not change to bytes (data)
  147. pt3 = b2a_hex(b("").join(pt3))
  148. self.assertEqual(self.plaintext, pt3) # decryption (3 bytes at a time)
  149. class CTRSegfaultTest(unittest.TestCase):
  150. def __init__(self, module, params):
  151. unittest.TestCase.__init__(self)
  152. self.module = module
  153. self.key = b(params['key'])
  154. self.module_name = params.get('module_name', None)
  155. def shortDescription(self):
  156. return """Regression test: %s.new(key, %s.MODE_CTR) should raise TypeError, not segfault""" % (self.module_name, self.module_name)
  157. def runTest(self):
  158. self.assertRaises(TypeError, self.module.new, a2b_hex(self.key), self.module.MODE_CTR)
  159. class CTRWraparoundTest(unittest.TestCase):
  160. def __init__(self, module, params):
  161. unittest.TestCase.__init__(self)
  162. self.module = module
  163. self.key = b(params['key'])
  164. self.module_name = params.get('module_name', None)
  165. def shortDescription(self):
  166. return """Regression test: %s with MODE_CTR should raise OverflowError on wraparound when shortcut used""" % (self.module_name,)
  167. def runTest(self):
  168. from Crypto.Util import Counter
  169. for disable_shortcut in (0, 1): # (False, True) Test CTR-mode shortcut and PyObject_CallObject code paths
  170. for little_endian in (0, 1): # (False, True) Test both endiannesses
  171. ctr = Counter.new(8*self.module.block_size, initial_value=2L**(8*self.module.block_size)-1, little_endian=little_endian, disable_shortcut=disable_shortcut)
  172. cipher = self.module.new(a2b_hex(self.key), self.module.MODE_CTR, counter=ctr)
  173. block = b("\x00") * self.module.block_size
  174. cipher.encrypt(block)
  175. self.assertRaises(OverflowError, cipher.encrypt, block)
  176. class CFBSegmentSizeTest(unittest.TestCase):
  177. def __init__(self, module, params):
  178. unittest.TestCase.__init__(self)
  179. self.module = module
  180. self.key = b(params['key'])
  181. self.description = params['description']
  182. def shortDescription(self):
  183. return self.description
  184. def runTest(self):
  185. """Regression test: m.new(key, m.MODE_CFB, segment_size=N) should require segment_size to be a multiple of 8 bits"""
  186. for i in range(1, 8):
  187. self.assertRaises(ValueError, self.module.new, a2b_hex(self.key), self.module.MODE_CFB, segment_size=i)
  188. self.module.new(a2b_hex(self.key), self.module.MODE_CFB, "\0"*self.module.block_size, segment_size=8) # should succeed
  189. class RoundtripTest(unittest.TestCase):
  190. def __init__(self, module, params):
  191. from Crypto import Random
  192. unittest.TestCase.__init__(self)
  193. self.module = module
  194. self.iv = Random.get_random_bytes(module.block_size)
  195. self.key = b(params['key'])
  196. self.plaintext = 100 * b(params['plaintext'])
  197. self.module_name = params.get('module_name', None)
  198. def shortDescription(self):
  199. return """%s .decrypt() output of .encrypt() should not be garbled""" % (self.module_name,)
  200. def runTest(self):
  201. for mode in (self.module.MODE_ECB, self.module.MODE_CBC, self.module.MODE_CFB, self.module.MODE_OFB, self.module.MODE_OPENPGP):
  202. encryption_cipher = self.module.new(a2b_hex(self.key), mode, self.iv)
  203. ciphertext = encryption_cipher.encrypt(self.plaintext)
  204. if mode != self.module.MODE_OPENPGP:
  205. decryption_cipher = self.module.new(a2b_hex(self.key), mode, self.iv)
  206. else:
  207. eiv = ciphertext[:self.module.block_size+2]
  208. ciphertext = ciphertext[self.module.block_size+2:]
  209. decryption_cipher = self.module.new(a2b_hex(self.key), mode, eiv)
  210. decrypted_plaintext = decryption_cipher.decrypt(ciphertext)
  211. self.assertEqual(self.plaintext, decrypted_plaintext)
  212. class PGPTest(unittest.TestCase):
  213. def __init__(self, module, params):
  214. unittest.TestCase.__init__(self)
  215. self.module = module
  216. self.key = b(params['key'])
  217. def shortDescription(self):
  218. return "MODE_PGP was implemented incorrectly and insecurely. It's completely banished now."
  219. def runTest(self):
  220. self.assertRaises(ValueError, self.module.new, a2b_hex(self.key),
  221. self.module.MODE_PGP)
  222. class IVLengthTest(unittest.TestCase):
  223. def __init__(self, module, params):
  224. unittest.TestCase.__init__(self)
  225. self.module = module
  226. self.key = b(params['key'])
  227. def shortDescription(self):
  228. return "Check that all modes except MODE_ECB and MODE_CTR require an IV of the proper length"
  229. def runTest(self):
  230. self.assertRaises(ValueError, self.module.new, a2b_hex(self.key),
  231. self.module.MODE_CBC, "")
  232. self.assertRaises(ValueError, self.module.new, a2b_hex(self.key),
  233. self.module.MODE_CFB, "")
  234. self.assertRaises(ValueError, self.module.new, a2b_hex(self.key),
  235. self.module.MODE_OFB, "")
  236. self.assertRaises(ValueError, self.module.new, a2b_hex(self.key),
  237. self.module.MODE_OPENPGP, "")
  238. self.module.new(a2b_hex(self.key), self.module.MODE_ECB, "")
  239. self.module.new(a2b_hex(self.key), self.module.MODE_CTR, "", counter=self._dummy_counter)
  240. def _dummy_counter(self):
  241. return "\0" * self.module.block_size
  242. def make_block_tests(module, module_name, test_data):
  243. tests = []
  244. extra_tests_added = 0
  245. for i in range(len(test_data)):
  246. row = test_data[i]
  247. # Build the "params" dictionary
  248. params = {'mode': 'ECB'}
  249. if len(row) == 3:
  250. (params['plaintext'], params['ciphertext'], params['key']) = row
  251. elif len(row) == 4:
  252. (params['plaintext'], params['ciphertext'], params['key'], params['description']) = row
  253. elif len(row) == 5:
  254. (params['plaintext'], params['ciphertext'], params['key'], params['description'], extra_params) = row
  255. params.update(extra_params)
  256. else:
  257. raise AssertionError("Unsupported tuple size %d" % (len(row),))
  258. # Build the display-name for the test
  259. p2 = params.copy()
  260. p_key = _extract(p2, 'key')
  261. p_plaintext = _extract(p2, 'plaintext')
  262. p_ciphertext = _extract(p2, 'ciphertext')
  263. p_description = _extract(p2, 'description', None)
  264. p_mode = p2.get('mode', 'ECB')
  265. if p_mode == 'ECB':
  266. _extract(p2, 'mode', 'ECB')
  267. if p_description is not None:
  268. description = p_description
  269. elif p_mode == 'ECB' and not p2:
  270. description = "p=%s, k=%s" % (p_plaintext, p_key)
  271. else:
  272. description = "p=%s, k=%s, %r" % (p_plaintext, p_key, p2)
  273. name = "%s #%d: %s" % (module_name, i+1, description)
  274. params['description'] = name
  275. params['module_name'] = module_name
  276. # Add extra test(s) to the test suite before the current test
  277. if not extra_tests_added:
  278. tests += [
  279. CTRSegfaultTest(module, params),
  280. CTRWraparoundTest(module, params),
  281. CFBSegmentSizeTest(module, params),
  282. RoundtripTest(module, params),
  283. PGPTest(module, params),
  284. IVLengthTest(module, params),
  285. ]
  286. extra_tests_added = 1
  287. # Add the current test to the test suite
  288. tests.append(CipherSelfTest(module, params))
  289. # When using CTR mode, test that the interface behaves like a stream cipher
  290. if p_mode == 'CTR':
  291. tests.append(CipherStreamingSelfTest(module, params))
  292. # When using CTR mode, test the non-shortcut code path.
  293. if p_mode == 'CTR' and not params.has_key('ctr_class'):
  294. params2 = params.copy()
  295. params2['description'] += " (shortcut disabled)"
  296. ctr_params2 = params.get('ctr_params', {}).copy()
  297. params2['ctr_params'] = ctr_params2
  298. if not params2['ctr_params'].has_key('disable_shortcut'):
  299. params2['ctr_params']['disable_shortcut'] = 1
  300. tests.append(CipherSelfTest(module, params2))
  301. return tests
  302. def make_stream_tests(module, module_name, test_data):
  303. tests = []
  304. for i in range(len(test_data)):
  305. row = test_data[i]
  306. # Build the "params" dictionary
  307. params = {}
  308. if len(row) == 3:
  309. (params['plaintext'], params['ciphertext'], params['key']) = row
  310. elif len(row) == 4:
  311. (params['plaintext'], params['ciphertext'], params['key'], params['description']) = row
  312. elif len(row) == 5:
  313. (params['plaintext'], params['ciphertext'], params['key'], params['description'], extra_params) = row
  314. params.update(extra_params)
  315. else:
  316. raise AssertionError("Unsupported tuple size %d" % (len(row),))
  317. # Build the display-name for the test
  318. p2 = params.copy()
  319. p_key = _extract(p2, 'key')
  320. p_plaintext = _extract(p2, 'plaintext')
  321. p_ciphertext = _extract(p2, 'ciphertext')
  322. p_description = _extract(p2, 'description', None)
  323. if p_description is not None:
  324. description = p_description
  325. elif not p2:
  326. description = "p=%s, k=%s" % (p_plaintext, p_key)
  327. else:
  328. description = "p=%s, k=%s, %r" % (p_plaintext, p_key, p2)
  329. name = "%s #%d: %s" % (module_name, i+1, description)
  330. params['description'] = name
  331. params['module_name'] = module_name
  332. # Add the test to the test suite
  333. tests.append(CipherSelfTest(module, params))
  334. tests.append(CipherStreamingSelfTest(module, params))
  335. return tests
  336. # vim:set ts=4 sw=4 sts=4 expandtab: