dsession.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. import py
  2. import pytest
  3. from xdist.workermanage import NodeManager
  4. from xdist.scheduler import (
  5. EachScheduling,
  6. LoadScheduling,
  7. LoadScopeScheduling,
  8. LoadFileScheduling,
  9. )
  10. from six.moves.queue import Empty, Queue
  11. class Interrupted(KeyboardInterrupt):
  12. """ signals an immediate interruption. """
  13. class DSession(object):
  14. """A pytest plugin which runs a distributed test session
  15. At the beginning of the test session this creates a NodeManager
  16. instance which creates and starts all nodes. Nodes then emit
  17. events processed in the pytest_runtestloop hook using the worker_*
  18. methods.
  19. Once a node is started it will automatically start running the
  20. pytest mainloop with some custom hooks. This means a node
  21. automatically starts collecting tests. Once tests are collected
  22. it will wait for instructions.
  23. """
  24. def __init__(self, config):
  25. self.config = config
  26. self.log = py.log.Producer("dsession")
  27. if not config.option.debug:
  28. py.log.setconsumer(self.log._keywords, None)
  29. self.nodemanager = None
  30. self.sched = None
  31. self.shuttingdown = False
  32. self.countfailures = 0
  33. self.maxfail = config.getvalue("maxfail")
  34. self.queue = Queue()
  35. self._session = None
  36. self._failed_collection_errors = {}
  37. self._active_nodes = set()
  38. self._failed_nodes_count = 0
  39. self._max_worker_restart = self.config.option.maxworkerrestart
  40. if self._max_worker_restart is not None:
  41. self._max_worker_restart = int(self._max_worker_restart)
  42. try:
  43. self.terminal = config.pluginmanager.getplugin("terminalreporter")
  44. except KeyError:
  45. self.terminal = None
  46. else:
  47. self.trdist = TerminalDistReporter(config)
  48. config.pluginmanager.register(self.trdist, "terminaldistreporter")
  49. @property
  50. def session_finished(self):
  51. """Return True if the distributed session has finished
  52. This means all nodes have executed all test items. This is
  53. used by pytest_runtestloop to break out of its loop.
  54. """
  55. return bool(self.shuttingdown and not self._active_nodes)
  56. def report_line(self, line):
  57. if self.terminal and self.config.option.verbose >= 0:
  58. self.terminal.write_line(line)
  59. @pytest.mark.trylast
  60. def pytest_sessionstart(self, session):
  61. """Creates and starts the nodes.
  62. The nodes are setup to put their events onto self.queue. As
  63. soon as nodes start they will emit the worker_workerready event.
  64. """
  65. self.nodemanager = NodeManager(self.config)
  66. nodes = self.nodemanager.setup_nodes(putevent=self.queue.put)
  67. self._active_nodes.update(nodes)
  68. self._session = session
  69. def pytest_sessionfinish(self, session):
  70. """Shutdown all nodes."""
  71. nm = getattr(self, "nodemanager", None) # if not fully initialized
  72. if nm is not None:
  73. nm.teardown_nodes()
  74. self._session = None
  75. def pytest_collection(self):
  76. # prohibit collection of test items in master process
  77. return True
  78. @pytest.mark.trylast
  79. def pytest_xdist_make_scheduler(self, config, log):
  80. dist = config.getvalue("dist")
  81. schedulers = {
  82. "each": EachScheduling,
  83. "load": LoadScheduling,
  84. "loadscope": LoadScopeScheduling,
  85. "loadfile": LoadFileScheduling,
  86. }
  87. return schedulers[dist](config, log)
  88. def pytest_runtestloop(self):
  89. self.sched = self.config.hook.pytest_xdist_make_scheduler(
  90. config=self.config, log=self.log
  91. )
  92. assert self.sched is not None
  93. self.shouldstop = False
  94. while not self.session_finished:
  95. self.loop_once()
  96. if self.shouldstop:
  97. self.triggershutdown()
  98. raise Interrupted(str(self.shouldstop))
  99. return True
  100. def loop_once(self):
  101. """Process one callback from one of the workers."""
  102. while 1:
  103. if not self._active_nodes:
  104. # If everything has died stop looping
  105. self.triggershutdown()
  106. raise RuntimeError("Unexpectedly no active workers available")
  107. try:
  108. eventcall = self.queue.get(timeout=2.0)
  109. break
  110. except Empty:
  111. continue
  112. callname, kwargs = eventcall
  113. assert callname, kwargs
  114. method = "worker_" + callname
  115. call = getattr(self, method)
  116. self.log("calling method", method, kwargs)
  117. call(**kwargs)
  118. if self.sched.tests_finished:
  119. self.triggershutdown()
  120. #
  121. # callbacks for processing events from workers
  122. #
  123. def worker_workerready(self, node, workerinfo):
  124. """Emitted when a node first starts up.
  125. This adds the node to the scheduler, nodes continue with
  126. collection without any further input.
  127. """
  128. node.workerinfo = workerinfo
  129. node.workerinfo["id"] = node.gateway.id
  130. node.workerinfo["spec"] = node.gateway.spec
  131. # TODO: (#234 task) needs this for pytest. Remove when refactor in pytest repo
  132. node.slaveinfo = node.workerinfo
  133. self.config.hook.pytest_testnodeready(node=node)
  134. if self.shuttingdown:
  135. node.shutdown()
  136. else:
  137. self.sched.add_node(node)
  138. def worker_workerfinished(self, node):
  139. """Emitted when node executes its pytest_sessionfinish hook.
  140. Removes the node from the scheduler.
  141. The node might not be in the scheduler if it had not emitted
  142. workerready before shutdown was triggered.
  143. """
  144. self.config.hook.pytest_testnodedown(node=node, error=None)
  145. if node.workeroutput["exitstatus"] == 2: # keyboard-interrupt
  146. self.shouldstop = "%s received keyboard-interrupt" % (node,)
  147. self.worker_errordown(node, "keyboard-interrupt")
  148. return
  149. if node in self.sched.nodes:
  150. crashitem = self.sched.remove_node(node)
  151. assert not crashitem, (crashitem, node)
  152. self._active_nodes.remove(node)
  153. def worker_errordown(self, node, error):
  154. """Emitted by the WorkerController when a node dies."""
  155. self.config.hook.pytest_testnodedown(node=node, error=error)
  156. try:
  157. crashitem = self.sched.remove_node(node)
  158. except KeyError:
  159. pass
  160. else:
  161. if crashitem:
  162. self.handle_crashitem(crashitem, node)
  163. self._failed_nodes_count += 1
  164. maximum_reached = (
  165. self._max_worker_restart is not None
  166. and self._failed_nodes_count > self._max_worker_restart
  167. )
  168. if maximum_reached:
  169. if self._max_worker_restart == 0:
  170. msg = "Worker restarting disabled"
  171. else:
  172. msg = "Maximum crashed workers reached: %d" % self._max_worker_restart
  173. self.report_line(msg)
  174. else:
  175. self.report_line("Replacing crashed worker %s" % node.gateway.id)
  176. self._clone_node(node)
  177. self._active_nodes.remove(node)
  178. def worker_collectionfinish(self, node, ids):
  179. """worker has finished test collection.
  180. This adds the collection for this node to the scheduler. If
  181. the scheduler indicates collection is finished (i.e. all
  182. initial nodes have submitted their collections), then tells the
  183. scheduler to schedule the collected items. When initiating
  184. scheduling the first time it logs which scheduler is in use.
  185. """
  186. if self.shuttingdown:
  187. return
  188. self.config.hook.pytest_xdist_node_collection_finished(node=node, ids=ids)
  189. # tell session which items were effectively collected otherwise
  190. # the master node will finish the session with EXIT_NOTESTSCOLLECTED
  191. self._session.testscollected = len(ids)
  192. self.sched.add_node_collection(node, ids)
  193. if self.terminal:
  194. self.trdist.setstatus(node.gateway.spec, "[%d]" % (len(ids)))
  195. if self.sched.collection_is_completed:
  196. if self.terminal and not self.sched.has_pending:
  197. self.trdist.ensure_show_status()
  198. self.terminal.write_line("")
  199. self.terminal.write_line(
  200. "scheduling tests via %s" % (self.sched.__class__.__name__)
  201. )
  202. self.sched.schedule()
  203. def worker_logstart(self, node, nodeid, location):
  204. """Emitted when a node calls the pytest_runtest_logstart hook."""
  205. self.config.hook.pytest_runtest_logstart(nodeid=nodeid, location=location)
  206. def worker_logfinish(self, node, nodeid, location):
  207. """Emitted when a node calls the pytest_runtest_logfinish hook."""
  208. self.config.hook.pytest_runtest_logfinish(nodeid=nodeid, location=location)
  209. def worker_testreport(self, node, rep):
  210. """Emitted when a node calls the pytest_runtest_logreport hook."""
  211. rep.node = node
  212. self.config.hook.pytest_runtest_logreport(report=rep)
  213. self._handlefailures(rep)
  214. def worker_runtest_protocol_complete(self, node, item_index, duration):
  215. """
  216. Emitted when a node fires the 'runtest_protocol_complete' event,
  217. signalling that a test has completed the runtestprotocol and should be
  218. removed from the pending list in the scheduler.
  219. """
  220. self.sched.mark_test_complete(node, item_index, duration)
  221. def worker_collectreport(self, node, rep):
  222. """Emitted when a node calls the pytest_collectreport hook.
  223. Because we only need the report when there's a failure, as optimization
  224. we only expect to receive failed reports from workers (#330).
  225. """
  226. assert rep.failed
  227. self._failed_worker_collectreport(node, rep)
  228. def worker_logwarning(self, message, code, nodeid, fslocation):
  229. """Emitted when a node calls the pytest_logwarning hook."""
  230. kwargs = dict(message=message, code=code, nodeid=nodeid, fslocation=fslocation)
  231. self.config.hook.pytest_logwarning.call_historic(kwargs=kwargs)
  232. def _clone_node(self, node):
  233. """Return new node based on an existing one.
  234. This is normally for when a node dies, this will copy the spec
  235. of the existing node and create a new one with a new id. The
  236. new node will have been setup so it will start calling the
  237. "worker_*" hooks and do work soon.
  238. """
  239. spec = node.gateway.spec
  240. spec.id = None
  241. self.nodemanager.group.allocate_id(spec)
  242. node = self.nodemanager.setup_node(spec, self.queue.put)
  243. self._active_nodes.add(node)
  244. return node
  245. def _failed_worker_collectreport(self, node, rep):
  246. # Check we haven't already seen this report (from
  247. # another worker).
  248. if rep.longrepr not in self._failed_collection_errors:
  249. self._failed_collection_errors[rep.longrepr] = True
  250. self.config.hook.pytest_collectreport(report=rep)
  251. self._handlefailures(rep)
  252. def _handlefailures(self, rep):
  253. if rep.failed:
  254. self.countfailures += 1
  255. if self.maxfail and self.countfailures >= self.maxfail:
  256. self.shouldstop = "stopping after %d failures" % (self.countfailures)
  257. def triggershutdown(self):
  258. self.log("triggering shutdown")
  259. self.shuttingdown = True
  260. for node in self.sched.nodes:
  261. node.shutdown()
  262. def handle_crashitem(self, nodeid, worker):
  263. # XXX get more reporting info by recording pytest_runtest_logstart?
  264. # XXX count no of failures and retry N times
  265. runner = self.config.pluginmanager.getplugin("runner")
  266. fspath = nodeid.split("::")[0]
  267. msg = "Worker %r crashed while running %r" % (worker.gateway.id, nodeid)
  268. rep = runner.TestReport(
  269. nodeid, (fspath, None, fspath), (), "failed", msg, "???"
  270. )
  271. rep.node = worker
  272. self.config.hook.pytest_runtest_logreport(report=rep)
  273. class TerminalDistReporter(object):
  274. def __init__(self, config):
  275. self.config = config
  276. self.tr = config.pluginmanager.getplugin("terminalreporter")
  277. self._status = {}
  278. self._lastlen = 0
  279. self._isatty = getattr(self.tr, "isatty", self.tr.hasmarkup)
  280. def write_line(self, msg):
  281. self.tr.write_line(msg)
  282. def ensure_show_status(self):
  283. if not self._isatty:
  284. self.write_line(self.getstatus())
  285. def setstatus(self, spec, status, show=True):
  286. self._status[spec.id] = status
  287. if show and self._isatty:
  288. self.rewrite(self.getstatus())
  289. def getstatus(self):
  290. parts = ["%s %s" % (spec.id, self._status[spec.id]) for spec in self._specs]
  291. return " / ".join(parts)
  292. def rewrite(self, line, newline=False):
  293. pline = line + " " * max(self._lastlen - len(line), 0)
  294. if newline:
  295. self._lastlen = 0
  296. pline += "\n"
  297. else:
  298. self._lastlen = len(line)
  299. self.tr.rewrite(pline, bold=True)
  300. def pytest_xdist_setupnodes(self, specs):
  301. self._specs = specs
  302. for spec in specs:
  303. self.setstatus(spec, "I", show=False)
  304. self.setstatus(spec, "I", show=True)
  305. self.ensure_show_status()
  306. def pytest_xdist_newgateway(self, gateway):
  307. if self.config.option.verbose > 0:
  308. rinfo = gateway._rinfo()
  309. version = "%s.%s.%s" % rinfo.version_info[:3]
  310. self.rewrite(
  311. "[%s] %s Python %s cwd: %s"
  312. % (gateway.id, rinfo.platform, version, rinfo.cwd),
  313. newline=True,
  314. )
  315. self.setstatus(gateway.spec, "C")
  316. def pytest_testnodeready(self, node):
  317. if self.config.option.verbose > 0:
  318. d = node.workerinfo
  319. infoline = "[%s] Python %s" % (d["id"], d["version"].replace("\n", " -- "))
  320. self.rewrite(infoline, newline=True)
  321. self.setstatus(node.gateway.spec, "ok")
  322. def pytest_testnodedown(self, node, error):
  323. if not error:
  324. return
  325. self.write_line("[%s] node down: %s" % (node.gateway.id, error))
  326. # def pytest_xdist_rsyncstart(self, source, gateways):
  327. # targets = ",".join([gw.id for gw in gateways])
  328. # msg = "[%s] rsyncing: %s" %(targets, source)
  329. # self.write_line(msg)
  330. # def pytest_xdist_rsyncfinish(self, source, gateways):
  331. # targets = ", ".join(["[%s]" % gw.id for gw in gateways])
  332. # self.write_line("rsyncfinish: %s -> %s" %(source, targets))