runner.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. """ basic collect and runtest protocol implementations """
  2. from __future__ import absolute_import, division, print_function
  3. import bdb
  4. import os
  5. import sys
  6. from time import time
  7. import six
  8. from _pytest._code.code import ExceptionInfo
  9. from _pytest.outcomes import skip, Skipped, TEST_OUTCOME
  10. from .reports import TestReport, CollectReport, CollectErrorRepr
  11. #
  12. # pytest plugin hooks
  13. def pytest_addoption(parser):
  14. group = parser.getgroup("terminal reporting", "reporting", after="general")
  15. group.addoption(
  16. "--durations",
  17. action="store",
  18. type=int,
  19. default=None,
  20. metavar="N",
  21. help="show N slowest setup/test durations (N=0 for all).",
  22. ),
  23. def pytest_terminal_summary(terminalreporter):
  24. durations = terminalreporter.config.option.durations
  25. if durations is None:
  26. return
  27. tr = terminalreporter
  28. dlist = []
  29. for replist in tr.stats.values():
  30. for rep in replist:
  31. if hasattr(rep, "duration"):
  32. dlist.append(rep)
  33. if not dlist:
  34. return
  35. dlist.sort(key=lambda x: x.duration)
  36. dlist.reverse()
  37. if not durations:
  38. tr.write_sep("=", "slowest test durations")
  39. else:
  40. tr.write_sep("=", "slowest %s test durations" % durations)
  41. dlist = dlist[:durations]
  42. for rep in dlist:
  43. nodeid = rep.nodeid.replace("::()::", "::")
  44. tr.write_line("%02.2fs %-8s %s" % (rep.duration, rep.when, nodeid))
  45. def pytest_sessionstart(session):
  46. session._setupstate = SetupState()
  47. def pytest_sessionfinish(session):
  48. session._setupstate.teardown_all()
  49. def pytest_runtest_protocol(item, nextitem):
  50. item.ihook.pytest_runtest_logstart(nodeid=item.nodeid, location=item.location)
  51. runtestprotocol(item, nextitem=nextitem)
  52. item.ihook.pytest_runtest_logfinish(nodeid=item.nodeid, location=item.location)
  53. return True
  54. def runtestprotocol(item, log=True, nextitem=None):
  55. hasrequest = hasattr(item, "_request")
  56. if hasrequest and not item._request:
  57. item._initrequest()
  58. rep = call_and_report(item, "setup", log)
  59. reports = [rep]
  60. if rep.passed:
  61. if item.config.option.setupshow:
  62. show_test_item(item)
  63. if not item.config.option.setuponly:
  64. reports.append(call_and_report(item, "call", log))
  65. reports.append(call_and_report(item, "teardown", log, nextitem=nextitem))
  66. # after all teardown hooks have been called
  67. # want funcargs and request info to go away
  68. if hasrequest:
  69. item._request = False
  70. item.funcargs = None
  71. return reports
  72. def show_test_item(item):
  73. """Show test function, parameters and the fixtures of the test item."""
  74. tw = item.config.get_terminal_writer()
  75. tw.line()
  76. tw.write(" " * 8)
  77. tw.write(item._nodeid)
  78. used_fixtures = sorted(item._fixtureinfo.name2fixturedefs.keys())
  79. if used_fixtures:
  80. tw.write(" (fixtures used: {})".format(", ".join(used_fixtures)))
  81. def pytest_runtest_setup(item):
  82. _update_current_test_var(item, "setup")
  83. item.session._setupstate.prepare(item)
  84. def pytest_runtest_call(item):
  85. _update_current_test_var(item, "call")
  86. sys.last_type, sys.last_value, sys.last_traceback = (None, None, None)
  87. try:
  88. item.runtest()
  89. except Exception:
  90. # Store trace info to allow postmortem debugging
  91. type, value, tb = sys.exc_info()
  92. tb = tb.tb_next # Skip *this* frame
  93. sys.last_type = type
  94. sys.last_value = value
  95. sys.last_traceback = tb
  96. del type, value, tb # Get rid of these in this frame
  97. raise
  98. def pytest_runtest_teardown(item, nextitem):
  99. _update_current_test_var(item, "teardown")
  100. item.session._setupstate.teardown_exact(item, nextitem)
  101. _update_current_test_var(item, None)
  102. def _update_current_test_var(item, when):
  103. """
  104. Update PYTEST_CURRENT_TEST to reflect the current item and stage.
  105. If ``when`` is None, delete PYTEST_CURRENT_TEST from the environment.
  106. """
  107. var_name = "PYTEST_CURRENT_TEST"
  108. if when:
  109. value = "{} ({})".format(item.nodeid, when)
  110. # don't allow null bytes on environment variables (see #2644, #2957)
  111. value = value.replace("\x00", "(null)")
  112. os.environ[var_name] = value
  113. else:
  114. os.environ.pop(var_name)
  115. def pytest_report_teststatus(report):
  116. if report.when in ("setup", "teardown"):
  117. if report.failed:
  118. # category, shortletter, verbose-word
  119. return "error", "E", "ERROR"
  120. elif report.skipped:
  121. return "skipped", "s", "SKIPPED"
  122. else:
  123. return "", "", ""
  124. #
  125. # Implementation
  126. def call_and_report(item, when, log=True, **kwds):
  127. call = call_runtest_hook(item, when, **kwds)
  128. hook = item.ihook
  129. report = hook.pytest_runtest_makereport(item=item, call=call)
  130. if log:
  131. hook.pytest_runtest_logreport(report=report)
  132. if check_interactive_exception(call, report):
  133. hook.pytest_exception_interact(node=item, call=call, report=report)
  134. return report
  135. def check_interactive_exception(call, report):
  136. return call.excinfo and not (
  137. hasattr(report, "wasxfail")
  138. or call.excinfo.errisinstance(skip.Exception)
  139. or call.excinfo.errisinstance(bdb.BdbQuit)
  140. )
  141. def call_runtest_hook(item, when, **kwds):
  142. hookname = "pytest_runtest_" + when
  143. ihook = getattr(item.ihook, hookname)
  144. return CallInfo(
  145. lambda: ihook(item=item, **kwds),
  146. when=when,
  147. treat_keyboard_interrupt_as_exception=item.config.getvalue("usepdb"),
  148. )
  149. class CallInfo(object):
  150. """ Result/Exception info a function invocation. """
  151. #: None or ExceptionInfo object.
  152. excinfo = None
  153. def __init__(self, func, when, treat_keyboard_interrupt_as_exception=False):
  154. #: context of invocation: one of "setup", "call",
  155. #: "teardown", "memocollect"
  156. self.when = when
  157. self.start = time()
  158. try:
  159. self.result = func()
  160. except KeyboardInterrupt:
  161. if treat_keyboard_interrupt_as_exception:
  162. self.excinfo = ExceptionInfo()
  163. else:
  164. self.stop = time()
  165. raise
  166. except: # noqa
  167. self.excinfo = ExceptionInfo()
  168. self.stop = time()
  169. def __repr__(self):
  170. if self.excinfo:
  171. status = "exception: %s" % str(self.excinfo.value)
  172. else:
  173. status = "result: %r" % (self.result,)
  174. return "<CallInfo when=%r %s>" % (self.when, status)
  175. def pytest_runtest_makereport(item, call):
  176. when = call.when
  177. duration = call.stop - call.start
  178. keywords = {x: 1 for x in item.keywords}
  179. excinfo = call.excinfo
  180. sections = []
  181. if not call.excinfo:
  182. outcome = "passed"
  183. longrepr = None
  184. else:
  185. if not isinstance(excinfo, ExceptionInfo):
  186. outcome = "failed"
  187. longrepr = excinfo
  188. elif excinfo.errisinstance(skip.Exception):
  189. outcome = "skipped"
  190. r = excinfo._getreprcrash()
  191. longrepr = (str(r.path), r.lineno, r.message)
  192. else:
  193. outcome = "failed"
  194. if call.when == "call":
  195. longrepr = item.repr_failure(excinfo)
  196. else: # exception in setup or teardown
  197. longrepr = item._repr_failure_py(
  198. excinfo, style=item.config.option.tbstyle
  199. )
  200. for rwhen, key, content in item._report_sections:
  201. sections.append(("Captured %s %s" % (key, rwhen), content))
  202. return TestReport(
  203. item.nodeid,
  204. item.location,
  205. keywords,
  206. outcome,
  207. longrepr,
  208. when,
  209. sections,
  210. duration,
  211. user_properties=item.user_properties,
  212. )
  213. def pytest_make_collect_report(collector):
  214. call = CallInfo(lambda: list(collector.collect()), "collect")
  215. longrepr = None
  216. if not call.excinfo:
  217. outcome = "passed"
  218. else:
  219. from _pytest import nose
  220. skip_exceptions = (Skipped,) + nose.get_skip_exceptions()
  221. if call.excinfo.errisinstance(skip_exceptions):
  222. outcome = "skipped"
  223. r = collector._repr_failure_py(call.excinfo, "line").reprcrash
  224. longrepr = (str(r.path), r.lineno, r.message)
  225. else:
  226. outcome = "failed"
  227. errorinfo = collector.repr_failure(call.excinfo)
  228. if not hasattr(errorinfo, "toterminal"):
  229. errorinfo = CollectErrorRepr(errorinfo)
  230. longrepr = errorinfo
  231. rep = CollectReport(
  232. collector.nodeid, outcome, longrepr, getattr(call, "result", None)
  233. )
  234. rep.call = call # see collect_one_node
  235. return rep
  236. class SetupState(object):
  237. """ shared state for setting up/tearing down test items or collectors. """
  238. def __init__(self):
  239. self.stack = []
  240. self._finalizers = {}
  241. def addfinalizer(self, finalizer, colitem):
  242. """ attach a finalizer to the given colitem.
  243. if colitem is None, this will add a finalizer that
  244. is called at the end of teardown_all().
  245. """
  246. assert colitem and not isinstance(colitem, tuple)
  247. assert callable(finalizer)
  248. # assert colitem in self.stack # some unit tests don't setup stack :/
  249. self._finalizers.setdefault(colitem, []).append(finalizer)
  250. def _pop_and_teardown(self):
  251. colitem = self.stack.pop()
  252. self._teardown_with_finalization(colitem)
  253. def _callfinalizers(self, colitem):
  254. finalizers = self._finalizers.pop(colitem, None)
  255. exc = None
  256. while finalizers:
  257. fin = finalizers.pop()
  258. try:
  259. fin()
  260. except TEST_OUTCOME:
  261. # XXX Only first exception will be seen by user,
  262. # ideally all should be reported.
  263. if exc is None:
  264. exc = sys.exc_info()
  265. if exc:
  266. six.reraise(*exc)
  267. def _teardown_with_finalization(self, colitem):
  268. self._callfinalizers(colitem)
  269. if hasattr(colitem, "teardown"):
  270. colitem.teardown()
  271. for colitem in self._finalizers:
  272. assert (
  273. colitem is None or colitem in self.stack or isinstance(colitem, tuple)
  274. )
  275. def teardown_all(self):
  276. while self.stack:
  277. self._pop_and_teardown()
  278. for key in list(self._finalizers):
  279. self._teardown_with_finalization(key)
  280. assert not self._finalizers
  281. def teardown_exact(self, item, nextitem):
  282. needed_collectors = nextitem and nextitem.listchain() or []
  283. self._teardown_towards(needed_collectors)
  284. def _teardown_towards(self, needed_collectors):
  285. exc = None
  286. while self.stack:
  287. if self.stack == needed_collectors[: len(self.stack)]:
  288. break
  289. try:
  290. self._pop_and_teardown()
  291. except TEST_OUTCOME:
  292. # XXX Only first exception will be seen by user,
  293. # ideally all should be reported.
  294. if exc is None:
  295. exc = sys.exc_info()
  296. if exc:
  297. six.reraise(*exc)
  298. def prepare(self, colitem):
  299. """ setup objects along the collector chain to the test-method
  300. and teardown previously setup objects."""
  301. needed_collectors = colitem.listchain()
  302. self._teardown_towards(needed_collectors)
  303. # check if the last collection node has raised an error
  304. for col in self.stack:
  305. if hasattr(col, "_prepare_exc"):
  306. six.reraise(*col._prepare_exc)
  307. for col in needed_collectors[len(self.stack) :]:
  308. self.stack.append(col)
  309. try:
  310. col.setup()
  311. except TEST_OUTCOME:
  312. col._prepare_exc = sys.exc_info()
  313. raise
  314. def collect_one_node(collector):
  315. ihook = collector.ihook
  316. ihook.pytest_collectstart(collector=collector)
  317. rep = ihook.pytest_make_collect_report(collector=collector)
  318. call = rep.__dict__.pop("call", None)
  319. if call and check_interactive_exception(call, rep):
  320. ihook.pytest_exception_interact(node=collector, call=call, report=rep)
  321. return rep