runner.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. #!/usr/bin/env python3
  2. # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """
  6. Unit test runner, providing new features on top of unittest module:
  7. - colourized output
  8. - parallel run (UNIX only)
  9. - print failures/tracebacks on CTRL+C
  10. - re-run failed tests only (make test-failed)
  11. Invocation examples:
  12. - make test
  13. - make test-failed
  14. Parallel:
  15. - make test-parallel
  16. - make test-process ARGS=--parallel
  17. """
  18. from __future__ import print_function
  19. import atexit
  20. import optparse
  21. import os
  22. import sys
  23. import textwrap
  24. import time
  25. import unittest
  26. try:
  27. import ctypes
  28. except ImportError:
  29. ctypes = None
  30. try:
  31. import concurrencytest # pip install concurrencytest
  32. except ImportError:
  33. concurrencytest = None
  34. import psutil
  35. from psutil._common import hilite
  36. from psutil._common import print_color
  37. from psutil._common import term_supports_colors
  38. from psutil._compat import super
  39. from psutil.tests import CI_TESTING
  40. from psutil.tests import import_module_by_path
  41. from psutil.tests import print_sysinfo
  42. from psutil.tests import reap_children
  43. from psutil.tests import safe_rmpath
  44. VERBOSITY = 2
  45. FAILED_TESTS_FNAME = '.failed-tests.txt'
  46. NWORKERS = psutil.cpu_count() or 1
  47. USE_COLORS = not CI_TESTING and term_supports_colors()
  48. HERE = os.path.abspath(os.path.dirname(__file__))
  49. loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
  50. def cprint(msg, color, bold=False, file=None):
  51. if file is None:
  52. file = sys.stderr if color == 'red' else sys.stdout
  53. if USE_COLORS:
  54. print_color(msg, color, bold=bold, file=file)
  55. else:
  56. print(msg, file=file)
  57. class TestLoader:
  58. testdir = HERE
  59. skip_files = ['test_memleaks.py']
  60. if "WHEELHOUSE_UPLOADER_USERNAME" in os.environ:
  61. skip_files.extend(['test_osx.py', 'test_linux.py', 'test_posix.py'])
  62. def _get_testmods(self):
  63. return [os.path.join(self.testdir, x)
  64. for x in os.listdir(self.testdir)
  65. if x.startswith('test_') and x.endswith('.py') and
  66. x not in self.skip_files]
  67. def _iter_testmod_classes(self):
  68. """Iterate over all test files in this directory and return
  69. all TestCase classes in them.
  70. """
  71. for path in self._get_testmods():
  72. mod = import_module_by_path(path)
  73. for name in dir(mod):
  74. obj = getattr(mod, name)
  75. if isinstance(obj, type) and \
  76. issubclass(obj, unittest.TestCase):
  77. yield obj
  78. def all(self):
  79. suite = unittest.TestSuite()
  80. for obj in self._iter_testmod_classes():
  81. test = loadTestsFromTestCase(obj)
  82. suite.addTest(test)
  83. return suite
  84. def last_failed(self):
  85. # ...from previously failed test run
  86. suite = unittest.TestSuite()
  87. if not os.path.isfile(FAILED_TESTS_FNAME):
  88. return suite
  89. with open(FAILED_TESTS_FNAME, 'rt') as f:
  90. names = f.read().split()
  91. for n in names:
  92. test = unittest.defaultTestLoader.loadTestsFromName(n)
  93. suite.addTest(test)
  94. return suite
  95. def from_name(self, name):
  96. if name.endswith('.py'):
  97. name = os.path.splitext(os.path.basename(name))[0]
  98. return unittest.defaultTestLoader.loadTestsFromName(name)
  99. class ColouredResult(unittest.TextTestResult):
  100. def addSuccess(self, test):
  101. unittest.TestResult.addSuccess(self, test)
  102. cprint("OK", "green")
  103. def addError(self, test, err):
  104. unittest.TestResult.addError(self, test, err)
  105. cprint("ERROR", "red", bold=True)
  106. def addFailure(self, test, err):
  107. unittest.TestResult.addFailure(self, test, err)
  108. cprint("FAIL", "red")
  109. def addSkip(self, test, reason):
  110. unittest.TestResult.addSkip(self, test, reason)
  111. cprint("skipped: %s" % reason.strip(), "brown")
  112. def printErrorList(self, flavour, errors):
  113. flavour = hilite(flavour, "red", bold=flavour == 'ERROR')
  114. super().printErrorList(flavour, errors)
  115. class ColouredTextRunner(unittest.TextTestRunner):
  116. """
  117. A coloured text runner which also prints failed tests on KeyboardInterrupt
  118. and save failed tests in a file so that they can be re-run.
  119. """
  120. resultclass = ColouredResult if USE_COLORS else unittest.TextTestResult
  121. def __init__(self, *args, **kwargs):
  122. super().__init__(*args, **kwargs)
  123. self.failed_tnames = set()
  124. def _makeResult(self):
  125. # Store result instance so that it can be accessed on
  126. # KeyboardInterrupt.
  127. self.result = super()._makeResult()
  128. return self.result
  129. def _write_last_failed(self):
  130. if self.failed_tnames:
  131. with open(FAILED_TESTS_FNAME, 'wt') as f:
  132. for tname in self.failed_tnames:
  133. f.write(tname + '\n')
  134. def _save_result(self, result):
  135. if not result.wasSuccessful():
  136. for t in result.errors + result.failures:
  137. tname = t[0].id()
  138. self.failed_tnames.add(tname)
  139. def _run(self, suite):
  140. try:
  141. result = super().run(suite)
  142. except (KeyboardInterrupt, SystemExit):
  143. result = self.runner.result
  144. result.printErrors()
  145. raise sys.exit(1)
  146. else:
  147. self._save_result(result)
  148. return result
  149. def _exit(self, success):
  150. if success:
  151. cprint("SUCCESS", "green", bold=True)
  152. safe_rmpath(FAILED_TESTS_FNAME)
  153. sys.exit(0)
  154. else:
  155. cprint("FAILED", "red", bold=True)
  156. self._write_last_failed()
  157. sys.exit(1)
  158. def run(self, suite):
  159. result = self._run(suite)
  160. self._exit(result.wasSuccessful())
  161. class ParallelRunner(ColouredTextRunner):
  162. @staticmethod
  163. def _parallelize(suite):
  164. def fdopen(fd, mode, *kwds):
  165. stream = orig_fdopen(fd, mode)
  166. atexit.register(stream.close)
  167. return stream
  168. # Monkey patch concurrencytest lib bug (fdopen() stream not closed).
  169. # https://github.com/cgoldberg/concurrencytest/issues/11
  170. orig_fdopen = os.fdopen
  171. concurrencytest.os.fdopen = fdopen
  172. forker = concurrencytest.fork_for_tests(NWORKERS)
  173. return concurrencytest.ConcurrentTestSuite(suite, forker)
  174. @staticmethod
  175. def _split_suite(suite):
  176. serial = unittest.TestSuite()
  177. parallel = unittest.TestSuite()
  178. for test in suite:
  179. if test.countTestCases() == 0:
  180. continue
  181. elif isinstance(test, unittest.TestSuite):
  182. test_class = test._tests[0].__class__
  183. elif isinstance(test, unittest.TestCase):
  184. test_class = test
  185. else:
  186. raise TypeError("can't recognize type %r" % test)
  187. if getattr(test_class, '_serialrun', False):
  188. serial.addTest(test)
  189. else:
  190. parallel.addTest(test)
  191. return (serial, parallel)
  192. def run(self, suite):
  193. ser_suite, par_suite = self._split_suite(suite)
  194. par_suite = self._parallelize(par_suite)
  195. # run parallel
  196. cprint("starting parallel tests using %s workers" % NWORKERS,
  197. "green", bold=True)
  198. t = time.time()
  199. par = self._run(par_suite)
  200. par_elapsed = time.time() - t
  201. # At this point we should have N zombies (the workers), which
  202. # will disappear with wait().
  203. orphans = psutil.Process().children()
  204. gone, alive = psutil.wait_procs(orphans, timeout=1)
  205. if alive:
  206. cprint("alive processes %s" % alive, "red")
  207. reap_children()
  208. # run serial
  209. t = time.time()
  210. ser = self._run(ser_suite)
  211. ser_elapsed = time.time() - t
  212. # print
  213. if not par.wasSuccessful() and ser_suite.countTestCases() > 0:
  214. par.printErrors() # print them again at the bottom
  215. par_fails, par_errs, par_skips = map(len, (par.failures,
  216. par.errors,
  217. par.skipped))
  218. ser_fails, ser_errs, ser_skips = map(len, (ser.failures,
  219. ser.errors,
  220. ser.skipped))
  221. print(textwrap.dedent("""
  222. +----------+----------+----------+----------+----------+----------+
  223. | | total | failures | errors | skipped | time |
  224. +----------+----------+----------+----------+----------+----------+
  225. | parallel | %3s | %3s | %3s | %3s | %.2fs |
  226. +----------+----------+----------+----------+----------+----------+
  227. | serial | %3s | %3s | %3s | %3s | %.2fs |
  228. +----------+----------+----------+----------+----------+----------+
  229. """ % (par.testsRun, par_fails, par_errs, par_skips, par_elapsed,
  230. ser.testsRun, ser_fails, ser_errs, ser_skips, ser_elapsed)))
  231. print("Ran %s tests in %.3fs using %s workers" % (
  232. par.testsRun + ser.testsRun, par_elapsed + ser_elapsed, NWORKERS))
  233. ok = par.wasSuccessful() and ser.wasSuccessful()
  234. self._exit(ok)
  235. def get_runner(parallel=False):
  236. def warn(msg):
  237. cprint(msg + " Running serial tests instead.", "red")
  238. if parallel:
  239. if psutil.WINDOWS:
  240. warn("Can't run parallel tests on Windows.")
  241. elif concurrencytest is None:
  242. warn("concurrencytest module is not installed.")
  243. elif NWORKERS == 1:
  244. warn("Only 1 CPU available.")
  245. else:
  246. return ParallelRunner(verbosity=VERBOSITY)
  247. return ColouredTextRunner(verbosity=VERBOSITY)
  248. # Used by test_*,py modules.
  249. def run_from_name(name):
  250. suite = TestLoader().from_name(name)
  251. runner = get_runner()
  252. runner.run(suite)
  253. def setup():
  254. if 'PSUTIL_TESTING' not in os.environ:
  255. # This won't work on Windows but set_testing() below will do it.
  256. os.environ['PSUTIL_TESTING'] = '1'
  257. psutil._psplatform.cext.set_testing()
  258. def main():
  259. setup()
  260. usage = "python3 -m psutil.tests [opts] [test-name]"
  261. parser = optparse.OptionParser(usage=usage, description="run unit tests")
  262. parser.add_option("--last-failed",
  263. action="store_true", default=False,
  264. help="only run last failed tests")
  265. parser.add_option("--parallel",
  266. action="store_true", default=False,
  267. help="run tests in parallel")
  268. opts, args = parser.parse_args()
  269. if not opts.last_failed:
  270. safe_rmpath(FAILED_TESTS_FNAME)
  271. # loader
  272. loader = TestLoader()
  273. if args:
  274. if len(args) > 1:
  275. parser.print_usage()
  276. return sys.exit(1)
  277. else:
  278. suite = loader.from_name(args[0])
  279. elif opts.last_failed:
  280. suite = loader.last_failed()
  281. else:
  282. suite = loader.all()
  283. if CI_TESTING:
  284. print_sysinfo()
  285. runner = get_runner(opts.parallel)
  286. runner.run(suite)
  287. if __name__ == '__main__':
  288. main()