test_iutils.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Test running processes with the APIs in L{twisted.internet.utils}.
  5. """
  6. from __future__ import division, absolute_import
  7. import warnings, os, stat, sys, signal
  8. from twisted.python.compat import _PY3
  9. from twisted.python.runtime import platform
  10. from twisted.trial import unittest
  11. from twisted.internet import error, reactor, utils, interfaces
  12. from twisted.internet.defer import Deferred
  13. from twisted.python.test.test_util import SuppressedWarningsTests
  14. class ProcessUtilsTests(unittest.TestCase):
  15. """
  16. Test running a process using L{getProcessOutput}, L{getProcessValue}, and
  17. L{getProcessOutputAndValue}.
  18. """
  19. if interfaces.IReactorProcess(reactor, None) is None:
  20. skip = "reactor doesn't implement IReactorProcess"
  21. output = None
  22. value = None
  23. exe = sys.executable
  24. def makeSourceFile(self, sourceLines):
  25. """
  26. Write the given list of lines to a text file and return the absolute
  27. path to it.
  28. """
  29. script = self.mktemp()
  30. with open(script, 'wt') as scriptFile:
  31. scriptFile.write(os.linesep.join(sourceLines) + os.linesep)
  32. return os.path.abspath(script)
  33. def test_output(self):
  34. """
  35. L{getProcessOutput} returns a L{Deferred} which fires with the complete
  36. output of the process it runs after that process exits.
  37. """
  38. scriptFile = self.makeSourceFile([
  39. "import sys",
  40. "for s in b'hello world\\n':",
  41. " if hasattr(sys.stdout, 'buffer'):",
  42. " # Python 3",
  43. " s = bytes([s])",
  44. " sys.stdout.buffer.write(s)",
  45. " else:",
  46. " # Python 2",
  47. " sys.stdout.write(s)",
  48. " sys.stdout.flush()"])
  49. d = utils.getProcessOutput(self.exe, ['-u', scriptFile])
  50. return d.addCallback(self.assertEqual, b"hello world\n")
  51. def test_outputWithErrorIgnored(self):
  52. """
  53. The L{Deferred} returned by L{getProcessOutput} is fired with an
  54. L{IOError} L{Failure} if the child process writes to stderr.
  55. """
  56. # make sure stderr raises an error normally
  57. scriptFile = self.makeSourceFile([
  58. 'import sys',
  59. 'sys.stderr.write("hello world\\n")'
  60. ])
  61. d = utils.getProcessOutput(self.exe, ['-u', scriptFile])
  62. d = self.assertFailure(d, IOError)
  63. def cbFailed(err):
  64. return self.assertFailure(err.processEnded, error.ProcessDone)
  65. d.addCallback(cbFailed)
  66. return d
  67. def test_outputWithErrorCollected(self):
  68. """
  69. If a C{True} value is supplied for the C{errortoo} parameter to
  70. L{getProcessOutput}, the returned L{Deferred} fires with the child's
  71. stderr output as well as its stdout output.
  72. """
  73. scriptFile = self.makeSourceFile([
  74. 'import sys',
  75. # Write the same value to both because ordering isn't guaranteed so
  76. # this simplifies the test.
  77. 'sys.stdout.write("foo")',
  78. 'sys.stdout.flush()',
  79. 'sys.stderr.write("foo")',
  80. 'sys.stderr.flush()'])
  81. d = utils.getProcessOutput(self.exe, ['-u', scriptFile], errortoo=True)
  82. return d.addCallback(self.assertEqual, b"foofoo")
  83. def test_value(self):
  84. """
  85. The L{Deferred} returned by L{getProcessValue} is fired with the exit
  86. status of the child process.
  87. """
  88. scriptFile = self.makeSourceFile(["raise SystemExit(1)"])
  89. d = utils.getProcessValue(self.exe, ['-u', scriptFile])
  90. return d.addCallback(self.assertEqual, 1)
  91. def test_outputAndValue(self):
  92. """
  93. The L{Deferred} returned by L{getProcessOutputAndValue} fires with a
  94. three-tuple, the elements of which give the data written to the child's
  95. stdout, the data written to the child's stderr, and the exit status of
  96. the child.
  97. """
  98. scriptFile = self.makeSourceFile([
  99. "import sys",
  100. "if hasattr(sys.stdout, 'buffer'):",
  101. " # Python 3",
  102. " sys.stdout.buffer.write(b'hello world!\\n')",
  103. " sys.stderr.buffer.write(b'goodbye world!\\n')",
  104. "else:",
  105. " # Python 2",
  106. " sys.stdout.write(b'hello world!\\n')",
  107. " sys.stderr.write(b'goodbye world!\\n')",
  108. "sys.exit(1)"
  109. ])
  110. def gotOutputAndValue(out_err_code):
  111. out, err, code = out_err_code
  112. self.assertEqual(out, b"hello world!\n")
  113. if _PY3:
  114. self.assertEqual(err, b"goodbye world!\n")
  115. else:
  116. self.assertEqual(err, b"goodbye world!" +
  117. os.linesep)
  118. self.assertEqual(code, 1)
  119. d = utils.getProcessOutputAndValue(self.exe, ["-u", scriptFile])
  120. return d.addCallback(gotOutputAndValue)
  121. def test_outputSignal(self):
  122. """
  123. If the child process exits because of a signal, the L{Deferred}
  124. returned by L{getProcessOutputAndValue} fires a L{Failure} of a tuple
  125. containing the child's stdout, stderr, and the signal which caused
  126. it to exit.
  127. """
  128. # Use SIGKILL here because it's guaranteed to be delivered. Using
  129. # SIGHUP might not work in, e.g., a buildbot slave run under the
  130. # 'nohup' command.
  131. scriptFile = self.makeSourceFile([
  132. "import sys, os, signal",
  133. "sys.stdout.write('stdout bytes\\n')",
  134. "sys.stderr.write('stderr bytes\\n')",
  135. "sys.stdout.flush()",
  136. "sys.stderr.flush()",
  137. "os.kill(os.getpid(), signal.SIGKILL)"])
  138. def gotOutputAndValue(out_err_sig):
  139. out, err, sig = out_err_sig
  140. self.assertEqual(out, b"stdout bytes\n")
  141. self.assertEqual(err, b"stderr bytes\n")
  142. self.assertEqual(sig, signal.SIGKILL)
  143. d = utils.getProcessOutputAndValue(self.exe, ['-u', scriptFile])
  144. d = self.assertFailure(d, tuple)
  145. return d.addCallback(gotOutputAndValue)
  146. if platform.isWindows():
  147. test_outputSignal.skip = "Windows doesn't have real signals."
  148. def _pathTest(self, utilFunc, check):
  149. dir = os.path.abspath(self.mktemp())
  150. os.makedirs(dir)
  151. scriptFile = self.makeSourceFile([
  152. "import os, sys",
  153. "sys.stdout.write(os.getcwd())"])
  154. d = utilFunc(self.exe, ['-u', scriptFile], path=dir)
  155. d.addCallback(check, dir.encode(sys.getfilesystemencoding()))
  156. return d
  157. def test_getProcessOutputPath(self):
  158. """
  159. L{getProcessOutput} runs the given command with the working directory
  160. given by the C{path} parameter.
  161. """
  162. return self._pathTest(utils.getProcessOutput, self.assertEqual)
  163. def test_getProcessValuePath(self):
  164. """
  165. L{getProcessValue} runs the given command with the working directory
  166. given by the C{path} parameter.
  167. """
  168. def check(result, ignored):
  169. self.assertEqual(result, 0)
  170. return self._pathTest(utils.getProcessValue, check)
  171. def test_getProcessOutputAndValuePath(self):
  172. """
  173. L{getProcessOutputAndValue} runs the given command with the working
  174. directory given by the C{path} parameter.
  175. """
  176. def check(out_err_status, dir):
  177. out, err, status = out_err_status
  178. self.assertEqual(out, dir)
  179. self.assertEqual(status, 0)
  180. return self._pathTest(utils.getProcessOutputAndValue, check)
  181. def _defaultPathTest(self, utilFunc, check):
  182. # Make another directory to mess around with.
  183. dir = os.path.abspath(self.mktemp())
  184. os.makedirs(dir)
  185. scriptFile = self.makeSourceFile([
  186. "import os, sys, stat",
  187. # Fix the permissions so we can report the working directory.
  188. # On OS X (and maybe elsewhere), os.getcwd() fails with EACCES
  189. # if +x is missing from the working directory.
  190. "os.chmod(%r, stat.S_IXUSR)" % (dir,),
  191. "sys.stdout.write(os.getcwd())"])
  192. # Switch to it, but make sure we switch back
  193. self.addCleanup(os.chdir, os.getcwd())
  194. os.chdir(dir)
  195. # Get rid of all its permissions, but make sure they get cleaned up
  196. # later, because otherwise it might be hard to delete the trial
  197. # temporary directory.
  198. self.addCleanup(
  199. os.chmod, dir, stat.S_IMODE(os.stat('.').st_mode))
  200. os.chmod(dir, 0)
  201. # Pass in -S so that if run using the coverage .pth trick, it won't be
  202. # loaded and cause Coverage to try and get the current working
  203. # directory (see the comments above why this can be a problem) on OSX.
  204. d = utilFunc(self.exe, ['-S', '-u', scriptFile])
  205. d.addCallback(check, dir.encode(sys.getfilesystemencoding()))
  206. return d
  207. def test_getProcessOutputDefaultPath(self):
  208. """
  209. If no value is supplied for the C{path} parameter, L{getProcessOutput}
  210. runs the given command in the same working directory as the parent
  211. process and succeeds even if the current working directory is not
  212. accessible.
  213. """
  214. return self._defaultPathTest(utils.getProcessOutput, self.assertEqual)
  215. def test_getProcessValueDefaultPath(self):
  216. """
  217. If no value is supplied for the C{path} parameter, L{getProcessValue}
  218. runs the given command in the same working directory as the parent
  219. process and succeeds even if the current working directory is not
  220. accessible.
  221. """
  222. def check(result, ignored):
  223. self.assertEqual(result, 0)
  224. return self._defaultPathTest(utils.getProcessValue, check)
  225. def test_getProcessOutputAndValueDefaultPath(self):
  226. """
  227. If no value is supplied for the C{path} parameter,
  228. L{getProcessOutputAndValue} runs the given command in the same working
  229. directory as the parent process and succeeds even if the current
  230. working directory is not accessible.
  231. """
  232. def check(out_err_status, dir):
  233. out, err, status = out_err_status
  234. self.assertEqual(out, dir)
  235. self.assertEqual(status, 0)
  236. return self._defaultPathTest(
  237. utils.getProcessOutputAndValue, check)
  238. class SuppressWarningsTests(unittest.SynchronousTestCase):
  239. """
  240. Tests for L{utils.suppressWarnings}.
  241. """
  242. def test_suppressWarnings(self):
  243. """
  244. L{utils.suppressWarnings} decorates a function so that the given
  245. warnings are suppressed.
  246. """
  247. result = []
  248. def showwarning(self, *a, **kw):
  249. result.append((a, kw))
  250. self.patch(warnings, "showwarning", showwarning)
  251. def f(msg):
  252. warnings.warn(msg)
  253. g = utils.suppressWarnings(f, (('ignore',), dict(message="This is message")))
  254. # Start off with a sanity check - calling the original function
  255. # should emit the warning.
  256. f("Sanity check message")
  257. self.assertEqual(len(result), 1)
  258. # Now that that's out of the way, call the wrapped function, and
  259. # make sure no new warnings show up.
  260. g("This is message")
  261. self.assertEqual(len(result), 1)
  262. # Finally, emit another warning which should not be ignored, and
  263. # make sure it is not.
  264. g("Unignored message")
  265. self.assertEqual(len(result), 2)
  266. class DeferredSuppressedWarningsTests(SuppressedWarningsTests):
  267. """
  268. Tests for L{utils.runWithWarningsSuppressed}, the version that supports
  269. Deferreds.
  270. """
  271. # Override the non-Deferred-supporting function from the base class with
  272. # the function we are testing in this class:
  273. runWithWarningsSuppressed = staticmethod(utils.runWithWarningsSuppressed)
  274. def test_deferredCallback(self):
  275. """
  276. If the function called by L{utils.runWithWarningsSuppressed} returns a
  277. C{Deferred}, the warning filters aren't removed until the Deferred
  278. fires.
  279. """
  280. filters = [(("ignore", ".*foo.*"), {}),
  281. (("ignore", ".*bar.*"), {})]
  282. result = Deferred()
  283. self.runWithWarningsSuppressed(filters, lambda: result)
  284. warnings.warn("ignore foo")
  285. result.callback(3)
  286. warnings.warn("ignore foo 2")
  287. self.assertEqual(
  288. ["ignore foo 2"], [w['message'] for w in self.flushWarnings()])
  289. def test_deferredErrback(self):
  290. """
  291. If the function called by L{utils.runWithWarningsSuppressed} returns a
  292. C{Deferred}, the warning filters aren't removed until the Deferred
  293. fires with an errback.
  294. """
  295. filters = [(("ignore", ".*foo.*"), {}),
  296. (("ignore", ".*bar.*"), {})]
  297. result = Deferred()
  298. d = self.runWithWarningsSuppressed(filters, lambda: result)
  299. warnings.warn("ignore foo")
  300. result.errback(ZeroDivisionError())
  301. d.addErrback(lambda f: f.trap(ZeroDivisionError))
  302. warnings.warn("ignore foo 2")
  303. self.assertEqual(
  304. ["ignore foo 2"], [w['message'] for w in self.flushWarnings()])