capture.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780
  1. """
  2. per-test stdout/stderr capturing mechanism.
  3. """
  4. from __future__ import absolute_import, division, print_function
  5. import collections
  6. import contextlib
  7. import sys
  8. import os
  9. import io
  10. from io import UnsupportedOperation
  11. from tempfile import TemporaryFile
  12. import six
  13. import pytest
  14. from _pytest.compat import CaptureIO
  15. patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"}
  16. def pytest_addoption(parser):
  17. group = parser.getgroup("general")
  18. group._addoption(
  19. "--capture",
  20. action="store",
  21. default="fd" if hasattr(os, "dup") else "sys",
  22. metavar="method",
  23. choices=["fd", "sys", "no"],
  24. help="per-test capturing method: one of fd|sys|no.",
  25. )
  26. group._addoption(
  27. "-s",
  28. action="store_const",
  29. const="no",
  30. dest="capture",
  31. help="shortcut for --capture=no.",
  32. )
  33. @pytest.hookimpl(hookwrapper=True)
  34. def pytest_load_initial_conftests(early_config, parser, args):
  35. ns = early_config.known_args_namespace
  36. if ns.capture == "fd":
  37. _py36_windowsconsoleio_workaround(sys.stdout)
  38. _colorama_workaround()
  39. _readline_workaround()
  40. pluginmanager = early_config.pluginmanager
  41. capman = CaptureManager(ns.capture)
  42. pluginmanager.register(capman, "capturemanager")
  43. # make sure that capturemanager is properly reset at final shutdown
  44. early_config.add_cleanup(capman.stop_global_capturing)
  45. # make sure logging does not raise exceptions at the end
  46. def silence_logging_at_shutdown():
  47. if "logging" in sys.modules:
  48. sys.modules["logging"].raiseExceptions = False
  49. early_config.add_cleanup(silence_logging_at_shutdown)
  50. # finally trigger conftest loading but while capturing (issue93)
  51. capman.start_global_capturing()
  52. outcome = yield
  53. capman.suspend_global_capture()
  54. if outcome.excinfo is not None:
  55. out, err = capman.read_global_capture()
  56. sys.stdout.write(out)
  57. sys.stderr.write(err)
  58. class CaptureManager(object):
  59. """
  60. Capture plugin, manages that the appropriate capture method is enabled/disabled during collection and each
  61. test phase (setup, call, teardown). After each of those points, the captured output is obtained and
  62. attached to the collection/runtest report.
  63. There are two levels of capture:
  64. * global: which is enabled by default and can be suppressed by the ``-s`` option. This is always enabled/disabled
  65. during collection and each test phase.
  66. * fixture: when a test function or one of its fixture depend on the ``capsys`` or ``capfd`` fixtures. In this
  67. case special handling is needed to ensure the fixtures take precedence over the global capture.
  68. """
  69. def __init__(self, method):
  70. self._method = method
  71. self._global_capturing = None
  72. self._current_item = None
  73. def _getcapture(self, method):
  74. if method == "fd":
  75. return MultiCapture(out=True, err=True, Capture=FDCapture)
  76. elif method == "sys":
  77. return MultiCapture(out=True, err=True, Capture=SysCapture)
  78. elif method == "no":
  79. return MultiCapture(out=False, err=False, in_=False)
  80. else:
  81. raise ValueError("unknown capturing method: %r" % method)
  82. # Global capturing control
  83. def start_global_capturing(self):
  84. assert self._global_capturing is None
  85. self._global_capturing = self._getcapture(self._method)
  86. self._global_capturing.start_capturing()
  87. def stop_global_capturing(self):
  88. if self._global_capturing is not None:
  89. self._global_capturing.pop_outerr_to_orig()
  90. self._global_capturing.stop_capturing()
  91. self._global_capturing = None
  92. def resume_global_capture(self):
  93. self._global_capturing.resume_capturing()
  94. def suspend_global_capture(self, in_=False):
  95. cap = getattr(self, "_global_capturing", None)
  96. if cap is not None:
  97. cap.suspend_capturing(in_=in_)
  98. def read_global_capture(self):
  99. return self._global_capturing.readouterr()
  100. # Fixture Control (its just forwarding, think about removing this later)
  101. def activate_fixture(self, item):
  102. """If the current item is using ``capsys`` or ``capfd``, activate them so they take precedence over
  103. the global capture.
  104. """
  105. fixture = getattr(item, "_capture_fixture", None)
  106. if fixture is not None:
  107. fixture._start()
  108. def deactivate_fixture(self, item):
  109. """Deactivates the ``capsys`` or ``capfd`` fixture of this item, if any."""
  110. fixture = getattr(item, "_capture_fixture", None)
  111. if fixture is not None:
  112. fixture.close()
  113. def suspend_fixture(self, item):
  114. fixture = getattr(item, "_capture_fixture", None)
  115. if fixture is not None:
  116. fixture._suspend()
  117. def resume_fixture(self, item):
  118. fixture = getattr(item, "_capture_fixture", None)
  119. if fixture is not None:
  120. fixture._resume()
  121. # Helper context managers
  122. @contextlib.contextmanager
  123. def global_and_fixture_disabled(self):
  124. """Context manager to temporarily disables global and current fixture capturing."""
  125. # Need to undo local capsys-et-al if exists before disabling global capture
  126. self.suspend_fixture(self._current_item)
  127. self.suspend_global_capture(in_=False)
  128. try:
  129. yield
  130. finally:
  131. self.resume_global_capture()
  132. self.resume_fixture(self._current_item)
  133. @contextlib.contextmanager
  134. def item_capture(self, when, item):
  135. self.resume_global_capture()
  136. self.activate_fixture(item)
  137. try:
  138. yield
  139. finally:
  140. self.deactivate_fixture(item)
  141. self.suspend_global_capture(in_=False)
  142. out, err = self.read_global_capture()
  143. item.add_report_section(when, "stdout", out)
  144. item.add_report_section(when, "stderr", err)
  145. # Hooks
  146. @pytest.hookimpl(hookwrapper=True)
  147. def pytest_make_collect_report(self, collector):
  148. if isinstance(collector, pytest.File):
  149. self.resume_global_capture()
  150. outcome = yield
  151. self.suspend_global_capture()
  152. out, err = self.read_global_capture()
  153. rep = outcome.get_result()
  154. if out:
  155. rep.sections.append(("Captured stdout", out))
  156. if err:
  157. rep.sections.append(("Captured stderr", err))
  158. else:
  159. yield
  160. @pytest.hookimpl(hookwrapper=True)
  161. def pytest_runtest_protocol(self, item):
  162. self._current_item = item
  163. yield
  164. self._current_item = None
  165. @pytest.hookimpl(hookwrapper=True)
  166. def pytest_runtest_setup(self, item):
  167. with self.item_capture("setup", item):
  168. yield
  169. @pytest.hookimpl(hookwrapper=True)
  170. def pytest_runtest_call(self, item):
  171. with self.item_capture("call", item):
  172. yield
  173. @pytest.hookimpl(hookwrapper=True)
  174. def pytest_runtest_teardown(self, item):
  175. with self.item_capture("teardown", item):
  176. yield
  177. @pytest.hookimpl(tryfirst=True)
  178. def pytest_keyboard_interrupt(self, excinfo):
  179. self.stop_global_capturing()
  180. @pytest.hookimpl(tryfirst=True)
  181. def pytest_internalerror(self, excinfo):
  182. self.stop_global_capturing()
  183. capture_fixtures = {"capfd", "capfdbinary", "capsys", "capsysbinary"}
  184. def _ensure_only_one_capture_fixture(request, name):
  185. fixtures = set(request.fixturenames) & capture_fixtures - {name}
  186. if fixtures:
  187. fixtures = sorted(fixtures)
  188. fixtures = fixtures[0] if len(fixtures) == 1 else fixtures
  189. raise request.raiseerror(
  190. "cannot use {} and {} at the same time".format(fixtures, name)
  191. )
  192. @pytest.fixture
  193. def capsys(request):
  194. """Enable capturing of writes to ``sys.stdout`` and ``sys.stderr`` and make
  195. captured output available via ``capsys.readouterr()`` method calls
  196. which return a ``(out, err)`` namedtuple. ``out`` and ``err`` will be ``text``
  197. objects.
  198. """
  199. _ensure_only_one_capture_fixture(request, "capsys")
  200. with _install_capture_fixture_on_item(request, SysCapture) as fixture:
  201. yield fixture
  202. @pytest.fixture
  203. def capsysbinary(request):
  204. """Enable capturing of writes to ``sys.stdout`` and ``sys.stderr`` and make
  205. captured output available via ``capsys.readouterr()`` method calls
  206. which return a ``(out, err)`` tuple. ``out`` and ``err`` will be ``bytes``
  207. objects.
  208. """
  209. _ensure_only_one_capture_fixture(request, "capsysbinary")
  210. # Currently, the implementation uses the python3 specific `.buffer`
  211. # property of CaptureIO.
  212. if sys.version_info < (3,):
  213. raise request.raiseerror("capsysbinary is only supported on python 3")
  214. with _install_capture_fixture_on_item(request, SysCaptureBinary) as fixture:
  215. yield fixture
  216. @pytest.fixture
  217. def capfd(request):
  218. """Enable capturing of writes to file descriptors ``1`` and ``2`` and make
  219. captured output available via ``capfd.readouterr()`` method calls
  220. which return a ``(out, err)`` tuple. ``out`` and ``err`` will be ``text``
  221. objects.
  222. """
  223. _ensure_only_one_capture_fixture(request, "capfd")
  224. if not hasattr(os, "dup"):
  225. pytest.skip(
  226. "capfd fixture needs os.dup function which is not available in this system"
  227. )
  228. with _install_capture_fixture_on_item(request, FDCapture) as fixture:
  229. yield fixture
  230. @pytest.fixture
  231. def capfdbinary(request):
  232. """Enable capturing of write to file descriptors 1 and 2 and make
  233. captured output available via ``capfdbinary.readouterr`` method calls
  234. which return a ``(out, err)`` tuple. ``out`` and ``err`` will be
  235. ``bytes`` objects.
  236. """
  237. _ensure_only_one_capture_fixture(request, "capfdbinary")
  238. if not hasattr(os, "dup"):
  239. pytest.skip(
  240. "capfdbinary fixture needs os.dup function which is not available in this system"
  241. )
  242. with _install_capture_fixture_on_item(request, FDCaptureBinary) as fixture:
  243. yield fixture
  244. @contextlib.contextmanager
  245. def _install_capture_fixture_on_item(request, capture_class):
  246. """
  247. Context manager which creates a ``CaptureFixture`` instance and "installs" it on
  248. the item/node of the given request. Used by ``capsys`` and ``capfd``.
  249. The CaptureFixture is added as attribute of the item because it needs to accessed
  250. by ``CaptureManager`` during its ``pytest_runtest_*`` hooks.
  251. """
  252. request.node._capture_fixture = fixture = CaptureFixture(capture_class, request)
  253. capmanager = request.config.pluginmanager.getplugin("capturemanager")
  254. # need to active this fixture right away in case it is being used by another fixture (setup phase)
  255. # if this fixture is being used only by a test function (call phase), then we wouldn't need this
  256. # activation, but it doesn't hurt
  257. capmanager.activate_fixture(request.node)
  258. yield fixture
  259. fixture.close()
  260. del request.node._capture_fixture
  261. class CaptureFixture(object):
  262. """
  263. Object returned by :py:func:`capsys`, :py:func:`capsysbinary`, :py:func:`capfd` and :py:func:`capfdbinary`
  264. fixtures.
  265. """
  266. def __init__(self, captureclass, request):
  267. self.captureclass = captureclass
  268. self.request = request
  269. self._capture = None
  270. self._captured_out = self.captureclass.EMPTY_BUFFER
  271. self._captured_err = self.captureclass.EMPTY_BUFFER
  272. def _start(self):
  273. # Start if not started yet
  274. if getattr(self, "_capture", None) is None:
  275. self._capture = MultiCapture(
  276. out=True, err=True, in_=False, Capture=self.captureclass
  277. )
  278. self._capture.start_capturing()
  279. def close(self):
  280. if self._capture is not None:
  281. out, err = self._capture.pop_outerr_to_orig()
  282. self._captured_out += out
  283. self._captured_err += err
  284. self._capture.stop_capturing()
  285. self._capture = None
  286. def readouterr(self):
  287. """Read and return the captured output so far, resetting the internal buffer.
  288. :return: captured content as a namedtuple with ``out`` and ``err`` string attributes
  289. """
  290. captured_out, captured_err = self._captured_out, self._captured_err
  291. if self._capture is not None:
  292. out, err = self._capture.readouterr()
  293. captured_out += out
  294. captured_err += err
  295. self._captured_out = self.captureclass.EMPTY_BUFFER
  296. self._captured_err = self.captureclass.EMPTY_BUFFER
  297. return CaptureResult(captured_out, captured_err)
  298. def _suspend(self):
  299. """Suspends this fixture's own capturing temporarily."""
  300. self._capture.suspend_capturing()
  301. def _resume(self):
  302. """Resumes this fixture's own capturing temporarily."""
  303. self._capture.resume_capturing()
  304. @contextlib.contextmanager
  305. def disabled(self):
  306. """Temporarily disables capture while inside the 'with' block."""
  307. capmanager = self.request.config.pluginmanager.getplugin("capturemanager")
  308. with capmanager.global_and_fixture_disabled():
  309. yield
  310. def safe_text_dupfile(f, mode, default_encoding="UTF8"):
  311. """ return an open text file object that's a duplicate of f on the
  312. FD-level if possible.
  313. """
  314. encoding = getattr(f, "encoding", None)
  315. try:
  316. fd = f.fileno()
  317. except Exception:
  318. if "b" not in getattr(f, "mode", "") and hasattr(f, "encoding"):
  319. # we seem to have a text stream, let's just use it
  320. return f
  321. else:
  322. newfd = os.dup(fd)
  323. if "b" not in mode:
  324. mode += "b"
  325. f = os.fdopen(newfd, mode, 0) # no buffering
  326. return EncodedFile(f, encoding or default_encoding)
  327. class EncodedFile(object):
  328. errors = "strict" # possibly needed by py3 code (issue555)
  329. def __init__(self, buffer, encoding):
  330. self.buffer = buffer
  331. self.encoding = encoding
  332. def write(self, obj):
  333. if isinstance(obj, six.text_type):
  334. obj = obj.encode(self.encoding, "replace")
  335. self.buffer.write(obj)
  336. def writelines(self, linelist):
  337. data = "".join(linelist)
  338. self.write(data)
  339. @property
  340. def name(self):
  341. """Ensure that file.name is a string."""
  342. return repr(self.buffer)
  343. def __getattr__(self, name):
  344. return getattr(object.__getattribute__(self, "buffer"), name)
  345. CaptureResult = collections.namedtuple("CaptureResult", ["out", "err"])
  346. class MultiCapture(object):
  347. out = err = in_ = None
  348. def __init__(self, out=True, err=True, in_=True, Capture=None):
  349. if in_:
  350. self.in_ = Capture(0)
  351. if out:
  352. self.out = Capture(1)
  353. if err:
  354. self.err = Capture(2)
  355. def start_capturing(self):
  356. if self.in_:
  357. self.in_.start()
  358. if self.out:
  359. self.out.start()
  360. if self.err:
  361. self.err.start()
  362. def pop_outerr_to_orig(self):
  363. """ pop current snapshot out/err capture and flush to orig streams. """
  364. out, err = self.readouterr()
  365. if out:
  366. self.out.writeorg(out)
  367. if err:
  368. self.err.writeorg(err)
  369. return out, err
  370. def suspend_capturing(self, in_=False):
  371. if self.out:
  372. self.out.suspend()
  373. if self.err:
  374. self.err.suspend()
  375. if in_ and self.in_:
  376. self.in_.suspend()
  377. self._in_suspended = True
  378. def resume_capturing(self):
  379. if self.out:
  380. self.out.resume()
  381. if self.err:
  382. self.err.resume()
  383. if hasattr(self, "_in_suspended"):
  384. self.in_.resume()
  385. del self._in_suspended
  386. def stop_capturing(self):
  387. """ stop capturing and reset capturing streams """
  388. if hasattr(self, "_reset"):
  389. raise ValueError("was already stopped")
  390. self._reset = True
  391. if self.out:
  392. self.out.done()
  393. if self.err:
  394. self.err.done()
  395. if self.in_:
  396. self.in_.done()
  397. def readouterr(self):
  398. """ return snapshot unicode value of stdout/stderr capturings. """
  399. return CaptureResult(
  400. self.out.snap() if self.out is not None else "",
  401. self.err.snap() if self.err is not None else "",
  402. )
  403. class NoCapture(object):
  404. EMPTY_BUFFER = None
  405. __init__ = start = done = suspend = resume = lambda *args: None
  406. class FDCaptureBinary(object):
  407. """Capture IO to/from a given os-level filedescriptor.
  408. snap() produces `bytes`
  409. """
  410. EMPTY_BUFFER = bytes()
  411. def __init__(self, targetfd, tmpfile=None):
  412. self.targetfd = targetfd
  413. try:
  414. self.targetfd_save = os.dup(self.targetfd)
  415. except OSError:
  416. self.start = lambda: None
  417. self.done = lambda: None
  418. else:
  419. if targetfd == 0:
  420. assert not tmpfile, "cannot set tmpfile with stdin"
  421. tmpfile = open(os.devnull, "r")
  422. self.syscapture = SysCapture(targetfd)
  423. else:
  424. if tmpfile is None:
  425. f = TemporaryFile()
  426. with f:
  427. tmpfile = safe_text_dupfile(f, mode="wb+")
  428. if targetfd in patchsysdict:
  429. self.syscapture = SysCapture(targetfd, tmpfile)
  430. else:
  431. self.syscapture = NoCapture()
  432. self.tmpfile = tmpfile
  433. self.tmpfile_fd = tmpfile.fileno()
  434. def __repr__(self):
  435. return "<FDCapture %s oldfd=%s>" % (self.targetfd, self.targetfd_save)
  436. def start(self):
  437. """ Start capturing on targetfd using memorized tmpfile. """
  438. try:
  439. os.fstat(self.targetfd_save)
  440. except (AttributeError, OSError):
  441. raise ValueError("saved filedescriptor not valid anymore")
  442. os.dup2(self.tmpfile_fd, self.targetfd)
  443. self.syscapture.start()
  444. def snap(self):
  445. self.tmpfile.seek(0)
  446. res = self.tmpfile.read()
  447. self.tmpfile.seek(0)
  448. self.tmpfile.truncate()
  449. return res
  450. def done(self):
  451. """ stop capturing, restore streams, return original capture file,
  452. seeked to position zero. """
  453. targetfd_save = self.__dict__.pop("targetfd_save")
  454. os.dup2(targetfd_save, self.targetfd)
  455. os.close(targetfd_save)
  456. self.syscapture.done()
  457. _attempt_to_close_capture_file(self.tmpfile)
  458. def suspend(self):
  459. self.syscapture.suspend()
  460. os.dup2(self.targetfd_save, self.targetfd)
  461. def resume(self):
  462. self.syscapture.resume()
  463. os.dup2(self.tmpfile_fd, self.targetfd)
  464. def writeorg(self, data):
  465. """ write to original file descriptor. """
  466. if isinstance(data, six.text_type):
  467. data = data.encode("utf8") # XXX use encoding of original stream
  468. os.write(self.targetfd_save, data)
  469. class FDCapture(FDCaptureBinary):
  470. """Capture IO to/from a given os-level filedescriptor.
  471. snap() produces text
  472. """
  473. EMPTY_BUFFER = str()
  474. def snap(self):
  475. res = FDCaptureBinary.snap(self)
  476. enc = getattr(self.tmpfile, "encoding", None)
  477. if enc and isinstance(res, bytes):
  478. res = six.text_type(res, enc, "replace")
  479. return res
  480. class SysCapture(object):
  481. EMPTY_BUFFER = str()
  482. def __init__(self, fd, tmpfile=None):
  483. name = patchsysdict[fd]
  484. self._old = getattr(sys, name)
  485. self.name = name
  486. if tmpfile is None:
  487. if name == "stdin":
  488. tmpfile = DontReadFromInput()
  489. else:
  490. tmpfile = CaptureIO()
  491. self.tmpfile = tmpfile
  492. def start(self):
  493. setattr(sys, self.name, self.tmpfile)
  494. def snap(self):
  495. res = self.tmpfile.getvalue()
  496. self.tmpfile.seek(0)
  497. self.tmpfile.truncate()
  498. return res
  499. def done(self):
  500. setattr(sys, self.name, self._old)
  501. del self._old
  502. _attempt_to_close_capture_file(self.tmpfile)
  503. def suspend(self):
  504. setattr(sys, self.name, self._old)
  505. def resume(self):
  506. setattr(sys, self.name, self.tmpfile)
  507. def writeorg(self, data):
  508. self._old.write(data)
  509. self._old.flush()
  510. class SysCaptureBinary(SysCapture):
  511. EMPTY_BUFFER = bytes()
  512. def snap(self):
  513. res = self.tmpfile.buffer.getvalue()
  514. self.tmpfile.seek(0)
  515. self.tmpfile.truncate()
  516. return res
  517. class DontReadFromInput(six.Iterator):
  518. """Temporary stub class. Ideally when stdin is accessed, the
  519. capturing should be turned off, with possibly all data captured
  520. so far sent to the screen. This should be configurable, though,
  521. because in automated test runs it is better to crash than
  522. hang indefinitely.
  523. """
  524. encoding = None
  525. def read(self, *args):
  526. raise IOError("reading from stdin while output is captured")
  527. readline = read
  528. readlines = read
  529. __next__ = read
  530. def __iter__(self):
  531. return self
  532. def fileno(self):
  533. raise UnsupportedOperation("redirected stdin is pseudofile, " "has no fileno()")
  534. def isatty(self):
  535. return False
  536. def close(self):
  537. pass
  538. @property
  539. def buffer(self):
  540. if sys.version_info >= (3, 0):
  541. return self
  542. else:
  543. raise AttributeError("redirected stdin has no attribute buffer")
  544. def _colorama_workaround():
  545. """
  546. Ensure colorama is imported so that it attaches to the correct stdio
  547. handles on Windows.
  548. colorama uses the terminal on import time. So if something does the
  549. first import of colorama while I/O capture is active, colorama will
  550. fail in various ways.
  551. """
  552. if not sys.platform.startswith("win32"):
  553. return
  554. try:
  555. import colorama # noqa
  556. except ImportError:
  557. pass
  558. def _readline_workaround():
  559. """
  560. Ensure readline is imported so that it attaches to the correct stdio
  561. handles on Windows.
  562. Pdb uses readline support where available--when not running from the Python
  563. prompt, the readline module is not imported until running the pdb REPL. If
  564. running pytest with the --pdb option this means the readline module is not
  565. imported until after I/O capture has been started.
  566. This is a problem for pyreadline, which is often used to implement readline
  567. support on Windows, as it does not attach to the correct handles for stdout
  568. and/or stdin if they have been redirected by the FDCapture mechanism. This
  569. workaround ensures that readline is imported before I/O capture is setup so
  570. that it can attach to the actual stdin/out for the console.
  571. See https://github.com/pytest-dev/pytest/pull/1281
  572. """
  573. if not sys.platform.startswith("win32"):
  574. return
  575. try:
  576. import readline # noqa
  577. except ImportError:
  578. pass
  579. def _py36_windowsconsoleio_workaround(stream):
  580. """
  581. Python 3.6 implemented unicode console handling for Windows. This works
  582. by reading/writing to the raw console handle using
  583. ``{Read,Write}ConsoleW``.
  584. The problem is that we are going to ``dup2`` over the stdio file
  585. descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the
  586. handles used by Python to write to the console. Though there is still some
  587. weirdness and the console handle seems to only be closed randomly and not
  588. on the first call to ``CloseHandle``, or maybe it gets reopened with the
  589. same handle value when we suspend capturing.
  590. The workaround in this case will reopen stdio with a different fd which
  591. also means a different handle by replicating the logic in
  592. "Py_lifecycle.c:initstdio/create_stdio".
  593. :param stream: in practice ``sys.stdout`` or ``sys.stderr``, but given
  594. here as parameter for unittesting purposes.
  595. See https://github.com/pytest-dev/py/issues/103
  596. """
  597. if not sys.platform.startswith("win32") or sys.version_info[:2] < (3, 6):
  598. return
  599. # bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666)
  600. if not hasattr(stream, "buffer"):
  601. return
  602. buffered = hasattr(stream.buffer, "raw")
  603. raw_stdout = stream.buffer.raw if buffered else stream.buffer
  604. if not isinstance(raw_stdout, io._WindowsConsoleIO):
  605. return
  606. def _reopen_stdio(f, mode):
  607. if not buffered and mode[0] == "w":
  608. buffering = 0
  609. else:
  610. buffering = -1
  611. return io.TextIOWrapper(
  612. open(os.dup(f.fileno()), mode, buffering),
  613. f.encoding,
  614. f.errors,
  615. f.newlines,
  616. f.line_buffering,
  617. )
  618. sys.__stdin__ = sys.stdin = _reopen_stdio(sys.stdin, "rb")
  619. sys.__stdout__ = sys.stdout = _reopen_stdio(sys.stdout, "wb")
  620. sys.__stderr__ = sys.stderr = _reopen_stdio(sys.stderr, "wb")
  621. def _attempt_to_close_capture_file(f):
  622. """Suppress IOError when closing the temporary file used for capturing streams in py27 (#2370)"""
  623. if six.PY2:
  624. try:
  625. f.close()
  626. except IOError:
  627. pass
  628. else:
  629. f.close()