test__UserFriendlyRNG.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. # -*- coding: utf-8 -*-
  2. # Self-tests for the user-friendly Crypto.Random interface
  3. #
  4. # Written in 2013 by Dwayne C. Litzenberger <dlitz@dlitz.net>
  5. #
  6. # ===================================================================
  7. # The contents of this file are dedicated to the public domain. To
  8. # the extent that dedication to the public domain is not available,
  9. # everyone is granted a worldwide, perpetual, royalty-free,
  10. # non-exclusive license to exercise all rights associated with the
  11. # contents of this file for any purpose whatsoever.
  12. # No rights are reserved.
  13. #
  14. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  15. # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  16. # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  17. # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  18. # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  19. # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  20. # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  21. # SOFTWARE.
  22. # ===================================================================
  23. """Self-test suite for generic Crypto.Random stuff """
  24. from __future__ import nested_scopes
  25. __revision__ = "$Id$"
  26. import binascii
  27. import pprint
  28. import unittest
  29. import os
  30. import time
  31. import sys
  32. if sys.version_info[0] == 2 and sys.version_info[1] == 1:
  33. from Crypto.Util.py21compat import *
  34. from Crypto.Util.py3compat import *
  35. try:
  36. import multiprocessing
  37. except ImportError:
  38. multiprocessing = None
  39. import Crypto.Random._UserFriendlyRNG
  40. import Crypto.Random.random
  class RNGForkTest(unittest.TestCase):
      """Regression test for CVE-2013-1445 using raw ``os.fork()``.

      Forks several children, has each one call ``Crypto.Random.atfork()``
      and emit 16 random bytes over a pipe, and fails if any two children
      produced the same bytes.
      """

      def _get_reseed_count(self):
          """
          Get `FortunaAccumulator.reseed_count`, the global count of the
          number of times that the PRNG has been reseeded.

          The singleton's lock is held while the counter is read.
          """
          rng_singleton = Crypto.Random._UserFriendlyRNG._get_singleton()
          rng_singleton._lock.acquire()
          try:
              return rng_singleton._fa.reseed_count
          finally:
              rng_singleton._lock.release()

      def runTest(self):
          """Fork 10 children and check that their RNG output is unique.

          Raises AssertionError if two children return the same bytes, or
          if any child terminates with a non-zero wait status.
          """
          # Regression test for CVE-2013-1445.  We had a bug where, under the
          # right conditions, two processes might see the same random sequence.

          if sys.platform.startswith('win'):  # windows can't fork
              assert not hasattr(os, 'fork')  # ... right?
              return

          # Wait 150 ms so that we don't trigger the rate-limit prematurely.
          time.sleep(0.15)

          reseed_count_before = self._get_reseed_count()

          # One or both of these calls together should trigger a reseed right here.
          Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
          Crypto.Random.get_random_bytes(1)

          reseed_count_after = self._get_reseed_count()
          # sanity check: test should reseed parent before forking
          self.assertNotEqual(reseed_count_before, reseed_count_after)

          rfiles = []
          child_pids = []
          child_statuses = []
          try:
              for i in range(10):
                  rfd, wfd = os.pipe()
                  pid = os.fork()
                  if pid == 0:
                      # child
                      # Whatever happens, the child must leave through
                      # os._exit(); otherwise an exception here would unwind
                      # into the parent's test runner and the child would
                      # carry on as a second copy of the test process.
                      exit_status = 1
                      try:
                          os.close(rfd)
                          f = os.fdopen(wfd, "wb")

                          Crypto.Random.atfork()

                          data = Crypto.Random.get_random_bytes(16)

                          f.write(data)
                          f.close()
                          exit_status = 0
                      finally:
                          os._exit(exit_status)
                  # parent
                  child_pids.append(pid)
                  os.close(wfd)
                  rfiles.append(os.fdopen(rfd, "rb"))

              results = []
              # Dict used as a set of distinct outputs (kept as a dict so the
              # test still runs on very old Python versions).
              results_dict = {}
              for f in rfiles:
                  data = binascii.hexlify(f.read())
                  results.append(data)
                  results_dict[data] = 1
          finally:
              for f in rfiles:
                  f.close()
              # Reap every child so that none is left behind as a zombie.
              for pid in child_pids:
                  child_statuses.append(os.waitpid(pid, 0)[1])

          failed_statuses = [status for status in child_statuses if status != 0]
          if failed_statuses:
              raise AssertionError(
                  "forked child terminated abnormally (wait status): %r" %
                  (failed_statuses,))

          if len(results) != len(results_dict.keys()):
              raise AssertionError("RNG output duplicated across fork():\n%s" %
                                   (pprint.pformat(results)))
  # Worker entry point for RNGMultiprocessingForkTest
  def _task_main(q):
      """Draw two 16-byte random samples 100 ms apart and report them on *q*.

      Each sample is put on the queue as hex, first sample first, followed
      by a final ``None``.  RNGMultiprocessingForkTest creates the queues
      with a maximum size of 1, so each later ``put`` only completes after
      the parent has taken the previous item.
      """
      first_sample = Crypto.Random.get_random_bytes(16)
      time.sleep(0.1)     # wait 100 ms
      second_sample = Crypto.Random.get_random_bytes(16)
      for sample in (first_sample, second_sample):
          q.put(binascii.hexlify(sample))
      # Wait for acknowledgment
      q.put(None)
  class RNGMultiprocessingForkTest(unittest.TestCase):
      """Regression test for CVE-2013-1445 using ``multiprocessing.Pool``."""

      def runTest(self):
          """Run `_task_main` in 5 pool workers and check the output is unique.

          Raises AssertionError if two workers return the same first sample,
          or if any (first, second) pair is repeated.
          """
          # Another regression test for CVE-2013-1445.  This is basically the
          # same as RNGForkTest, but less compatible with old versions of Python,
          # and a little easier to read.

          n_procs = 5
          manager = multiprocessing.Manager()
          try:
              queues = [manager.Queue(1) for i in range(n_procs)]

              # Reseed the pool
              time.sleep(0.15)
              Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
              Crypto.Random.get_random_bytes(1)

              # Start the child processes
              pool = multiprocessing.Pool(processes=n_procs,
                                          initializer=Crypto.Random.atfork)
              try:
                  map_result = pool.map_async(_task_main, queues)

                  # Get the results, ensuring that no pool processes are reused.
                  # Queue.get()'s first positional parameter is `block`, so the
                  # 30-second timeout has to be passed as the second argument;
                  # get(30) would block forever if a worker got stuck.
                  aa = [q.get(True, 30) for q in queues]
                  bb = [q.get(True, 30) for q in queues]
                  res = list(zip(aa, bb))

                  # Shut down the pool
                  map_result.get(30)
                  pool.close()
                  pool.join()
              finally:
                  # Harmless after a clean join(); on failure it stops workers
                  # that may still be blocked on their queue.
                  pool.terminate()
          finally:
              # Stop the manager's server process instead of leaving it running.
              manager.shutdown()

          # Check that the results are unique
          if len(set(aa)) != len(aa) or len(set(res)) != len(res):
              raise AssertionError("RNG output duplicated across fork():\n%s" %
                                   (pprint.pformat(res),))
  def get_tests(config={}):
      """Return the list of test-case instances defined in this module.

      *config* is accepted but not used here.  RNGMultiprocessingForkTest
      is included only when the `multiprocessing` module could be imported.
      """
      selected = [RNGForkTest()]
      if multiprocessing is not None:
          selected.append(RNGMultiprocessingForkTest())
      return selected
  if __name__ == '__main__':
      def suite():
          """Build the suite that unittest.main() looks up by name."""
          return unittest.TestSuite(get_tests())

      unittest.main(defaultTest='suite')
  136. # vim:set ts=4 sw=4 sts=4 expandtab: