test_disttrial.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for L{twisted.trial._dist.disttrial}.
  5. """
  6. import os
  7. import sys
  8. from twisted.internet.protocol import ProcessProtocol
  9. from twisted.internet.defer import fail, succeed
  10. from twisted.internet.task import Cooperator, deferLater
  11. from twisted.internet.main import CONNECTION_DONE
  12. from twisted.internet import reactor
  13. from twisted.python.compat import NativeStringIO as StringIO
  14. from twisted.python.failure import Failure
  15. from twisted.python.lockfile import FilesystemLock
  16. from twisted.test.test_cooperator import FakeScheduler
  17. from twisted.trial.unittest import TestCase
  18. from twisted.trial.reporter import Reporter, TreeReporter
  19. from twisted.trial.reporter import UncleanWarningsReporterWrapper
  20. from twisted.trial.runner import TrialSuite, ErrorHolder
  21. from twisted.trial._dist.disttrial import DistTrialRunner
  22. from twisted.trial._dist.distreporter import DistReporter
  23. from twisted.trial._dist.worker import LocalWorker
  24. class FakeTransport(object):
  25. """
  26. A simple fake process transport.
  27. """
  28. def writeToChild(self, fd, data):
  29. """
  30. Ignore write calls.
  31. """
  32. class FakeReactor(object):
  33. """
  34. A simple fake reactor for testing purposes.
  35. """
  36. spawnCount = 0
  37. stopCount = 0
  38. runCount = 0
  39. def spawnProcess(self, worker, *args, **kwargs):
  40. worker.makeConnection(FakeTransport())
  41. self.spawnCount += 1
  42. def stop(self):
  43. self.stopCount += 1
  44. def run(self):
  45. self.runCount += 1
  46. def addSystemEventTrigger(self, *args, **kw):
  47. pass
  48. class EternalTerminationPredicateFactory(object):
  49. """
  50. A rigged terminationPredicateFactory for which time never pass.
  51. """
  52. def __call__(self):
  53. """
  54. See: L{task._Timer}
  55. """
  56. return False
  57. class DistTrialRunnerTests(TestCase):
  58. """
  59. Tests for L{DistTrialRunner}.
  60. """
  61. def setUp(self):
  62. """
  63. Create a runner for testing.
  64. """
  65. self.runner = DistTrialRunner(TreeReporter, 4, [],
  66. workingDirectory=self.mktemp())
  67. self.runner._stream = StringIO()
  68. def getFakeSchedulerAndEternalCooperator(self):
  69. """
  70. Helper to create fake scheduler and cooperator in tests.
  71. The cooperator has a termination timer which will never inform
  72. the scheduler that the task needs to be terminated.
  73. @return: L{tuple} of (scheduler, cooperator)
  74. """
  75. scheduler = FakeScheduler()
  76. cooperator = Cooperator(
  77. scheduler=scheduler,
  78. terminationPredicateFactory=EternalTerminationPredicateFactory,
  79. )
  80. return scheduler, cooperator
  81. def test_writeResults(self):
  82. """
  83. L{DistTrialRunner.writeResults} writes to the stream specified in the
  84. init.
  85. """
  86. stringIO = StringIO()
  87. result = DistReporter(Reporter(stringIO))
  88. self.runner.writeResults(result)
  89. self.assertTrue(stringIO.tell() > 0)
  90. def test_createLocalWorkers(self):
  91. """
  92. C{createLocalWorkers} iterates the list of protocols and create one
  93. L{LocalWorker} for each.
  94. """
  95. protocols = [object() for x in range(4)]
  96. workers = self.runner.createLocalWorkers(protocols, "path")
  97. for s in workers:
  98. self.assertIsInstance(s, LocalWorker)
  99. self.assertEqual(4, len(workers))
  100. def test_launchWorkerProcesses(self):
  101. """
  102. Given a C{spawnProcess} function, C{launchWorkerProcess} launches a
  103. python process with an existing path as its argument.
  104. """
  105. protocols = [ProcessProtocol() for i in range(4)]
  106. arguments = []
  107. environment = {}
  108. def fakeSpawnProcess(processProtocol, executable, args=(), env={},
  109. path=None, uid=None, gid=None, usePTY=0,
  110. childFDs=None):
  111. arguments.append(executable)
  112. arguments.extend(args)
  113. environment.update(env)
  114. self.runner.launchWorkerProcesses(
  115. fakeSpawnProcess, protocols, ["foo"])
  116. self.assertEqual(arguments[0], arguments[1])
  117. self.assertTrue(os.path.exists(arguments[2]))
  118. self.assertEqual("foo", arguments[3])
  119. self.assertEqual(os.pathsep.join(sys.path),
  120. environment["TRIAL_PYTHONPATH"])
  121. def test_run(self):
  122. """
  123. C{run} starts the reactor exactly once and spawns each of the workers
  124. exactly once.
  125. """
  126. fakeReactor = FakeReactor()
  127. suite = TrialSuite()
  128. for i in range(10):
  129. suite.addTest(TestCase())
  130. self.runner.run(suite, fakeReactor)
  131. self.assertEqual(fakeReactor.runCount, 1)
  132. self.assertEqual(fakeReactor.spawnCount, self.runner._workerNumber)
  133. def test_runUsedDirectory(self):
  134. """
  135. L{DistTrialRunner} checks if the test directory is already locked, and
  136. if it is generates a name based on it.
  137. """
  138. class FakeReactorWithLock(FakeReactor):
  139. def spawnProcess(oself, worker, *args, **kwargs):
  140. self.assertEqual(os.path.abspath(worker._logDirectory),
  141. os.path.abspath(
  142. os.path.join(workingDirectory + "-1",
  143. str(oself.spawnCount))))
  144. localLock = FilesystemLock(workingDirectory + "-1.lock")
  145. self.assertFalse(localLock.lock())
  146. oself.spawnCount += 1
  147. worker.makeConnection(FakeTransport())
  148. worker._ampProtocol.run = lambda *args: succeed(None)
  149. newDirectory = self.mktemp()
  150. os.mkdir(newDirectory)
  151. workingDirectory = os.path.join(newDirectory, "_trial_temp")
  152. lock = FilesystemLock(workingDirectory + ".lock")
  153. lock.lock()
  154. self.addCleanup(lock.unlock)
  155. self.runner._workingDirectory = workingDirectory
  156. fakeReactor = FakeReactorWithLock()
  157. suite = TrialSuite()
  158. for i in range(10):
  159. suite.addTest(TestCase())
  160. self.runner.run(suite, fakeReactor)
  161. def test_minimalWorker(self):
  162. """
  163. L{DistTrialRunner} doesn't try to start more workers than the number of
  164. tests.
  165. """
  166. fakeReactor = FakeReactor()
  167. self.runner.run(TestCase(), fakeReactor)
  168. self.assertEqual(fakeReactor.runCount, 1)
  169. self.assertEqual(fakeReactor.spawnCount, 1)
  170. def test_runUncleanWarnings(self):
  171. """
  172. Running with the C{unclean-warnings} option makes L{DistTrialRunner}
  173. uses the L{UncleanWarningsReporterWrapper}.
  174. """
  175. fakeReactor = FakeReactor()
  176. self.runner._uncleanWarnings = True
  177. result = self.runner.run(TestCase(), fakeReactor)
  178. self.assertIsInstance(result, DistReporter)
  179. self.assertIsInstance(result.original,
  180. UncleanWarningsReporterWrapper)
  181. def test_runWithoutTest(self):
  182. """
  183. When the suite contains no test, L{DistTrialRunner} takes a shortcut
  184. path without launching any process or starting the reactor.
  185. """
  186. fakeReactor = object()
  187. suite = TrialSuite()
  188. result = self.runner.run(suite, fakeReactor)
  189. self.assertIsInstance(result, DistReporter)
  190. output = self.runner._stream.getvalue()
  191. self.assertIn("Running 0 test", output)
  192. self.assertIn("PASSED", output)
  193. def test_runWithoutTestButWithAnError(self):
  194. """
  195. Even if there is no test, the suite can contain an error (most likely,
  196. an import error): this should make the run fail, and the error should
  197. be printed.
  198. """
  199. fakeReactor = object()
  200. error = ErrorHolder("an error", Failure(RuntimeError("foo bar")))
  201. result = self.runner.run(error, fakeReactor)
  202. self.assertIsInstance(result, DistReporter)
  203. output = self.runner._stream.getvalue()
  204. self.assertIn("Running 0 test", output)
  205. self.assertIn("foo bar", output)
  206. self.assertIn("an error", output)
  207. self.assertIn("errors=1", output)
  208. self.assertIn("FAILED", output)
  209. def test_runUnexpectedError(self):
  210. """
  211. If for some reasons we can't connect to the worker process, the test
  212. suite catches and fails.
  213. """
  214. class FakeReactorWithFail(FakeReactor):
  215. def spawnProcess(self, worker, *args, **kwargs):
  216. worker.makeConnection(FakeTransport())
  217. self.spawnCount += 1
  218. worker._ampProtocol.run = self.failingRun
  219. def failingRun(self, case, result):
  220. return fail(RuntimeError("oops"))
  221. scheduler, cooperator = self.getFakeSchedulerAndEternalCooperator()
  222. fakeReactor = FakeReactorWithFail()
  223. result = self.runner.run(TestCase(), fakeReactor,
  224. cooperator.cooperate)
  225. self.assertEqual(fakeReactor.runCount, 1)
  226. self.assertEqual(fakeReactor.spawnCount, 1)
  227. scheduler.pump()
  228. self.assertEqual(1, len(result.original.failures))
  229. def test_runStopAfterTests(self):
  230. """
  231. L{DistTrialRunner} calls C{reactor.stop} and unlocks the test directory
  232. once the tests have run.
  233. """
  234. functions = []
  235. class FakeReactorWithSuccess(FakeReactor):
  236. def spawnProcess(self, worker, *args, **kwargs):
  237. worker.makeConnection(FakeTransport())
  238. self.spawnCount += 1
  239. worker._ampProtocol.run = self.succeedingRun
  240. def succeedingRun(self, case, result):
  241. return succeed(None)
  242. def addSystemEventTrigger(oself, phase, event, function):
  243. self.assertEqual('before', phase)
  244. self.assertEqual('shutdown', event)
  245. functions.append(function)
  246. workingDirectory = self.runner._workingDirectory
  247. fakeReactor = FakeReactorWithSuccess()
  248. self.runner.run(TestCase(), fakeReactor)
  249. def check():
  250. localLock = FilesystemLock(workingDirectory + ".lock")
  251. self.assertTrue(localLock.lock())
  252. self.assertEqual(1, fakeReactor.stopCount)
  253. # We don't wait for the process deferreds here, so nothing is
  254. # returned by the function before shutdown
  255. self.assertIdentical(None, functions[0]())
  256. return deferLater(reactor, 0, check)
  257. def test_runWaitForProcessesDeferreds(self):
  258. """
  259. L{DistTrialRunner} waits for the worker processes to stop when the
  260. reactor is stopping, and then unlocks the test directory, not trying to
  261. stop the reactor again.
  262. """
  263. functions = []
  264. workers = []
  265. class FakeReactorWithEvent(FakeReactor):
  266. def spawnProcess(self, worker, *args, **kwargs):
  267. worker.makeConnection(FakeTransport())
  268. workers.append(worker)
  269. def addSystemEventTrigger(oself, phase, event, function):
  270. self.assertEqual('before', phase)
  271. self.assertEqual('shutdown', event)
  272. functions.append(function)
  273. workingDirectory = self.runner._workingDirectory
  274. fakeReactor = FakeReactorWithEvent()
  275. self.runner.run(TestCase(), fakeReactor)
  276. def check(ign):
  277. # Let the AMP deferreds fire
  278. return deferLater(reactor, 0, realCheck)
  279. def realCheck():
  280. localLock = FilesystemLock(workingDirectory + ".lock")
  281. self.assertTrue(localLock.lock())
  282. # Stop is not called, as it ought to have been called before
  283. self.assertEqual(0, fakeReactor.stopCount)
  284. workers[0].processEnded(Failure(CONNECTION_DONE))
  285. return functions[0]().addCallback(check)
  286. def test_runUntilFailure(self):
  287. """
  288. L{DistTrialRunner} can run in C{untilFailure} mode where it will run
  289. the given tests until they fail.
  290. """
  291. called = []
  292. class FakeReactorWithSuccess(FakeReactor):
  293. def spawnProcess(self, worker, *args, **kwargs):
  294. worker.makeConnection(FakeTransport())
  295. self.spawnCount += 1
  296. worker._ampProtocol.run = self.succeedingRun
  297. def succeedingRun(self, case, result):
  298. called.append(None)
  299. if len(called) == 5:
  300. return fail(RuntimeError("oops"))
  301. return succeed(None)
  302. fakeReactor = FakeReactorWithSuccess()
  303. scheduler, cooperator = self.getFakeSchedulerAndEternalCooperator()
  304. result = self.runner.run(
  305. TestCase(), fakeReactor, cooperate=cooperator.cooperate,
  306. untilFailure=True)
  307. scheduler.pump()
  308. self.assertEqual(5, len(called))
  309. self.assertFalse(result.wasSuccessful())
  310. output = self.runner._stream.getvalue()
  311. self.assertIn("PASSED", output)
  312. self.assertIn("FAIL", output)