123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- """
- This module is executed in remote subprocesses and helps to
- control a remote testing session and relay back information.
- It assumes that 'py' is importable and does not have dependencies
- on the rest of the xdist code. This means that the xdist-plugin
- needs not to be installed in remote environments.
- """
- import sys
- import os
- import time
- import _pytest.hookspec
- import pytest
- class WorkerInteractor(object):
- def __init__(self, config, channel):
- self.config = config
- self.workerid = config.workerinput.get("workerid", "?")
- self.log = py.log.Producer("worker-%s" % self.workerid)
- if not config.option.debug:
- py.log.setconsumer(self.log._keywords, None)
- self.channel = channel
- config.pluginmanager.register(self)
- def sendevent(self, name, **kwargs):
- self.log("sending", name, kwargs)
- self.channel.send((name, kwargs))
- def pytest_internalerror(self, excrepr):
- for line in str(excrepr).split("\n"):
- self.log("IERROR>", line)
- def pytest_sessionstart(self, session):
- self.session = session
- workerinfo = getinfodict()
- self.sendevent("workerready", workerinfo=workerinfo)
- @pytest.hookimpl(hookwrapper=True)
- def pytest_sessionfinish(self, exitstatus):
- self.config.workeroutput["exitstatus"] = exitstatus
- yield
- self.sendevent("workerfinished", workeroutput=self.config.workeroutput)
- def pytest_collection(self, session):
- self.sendevent("collectionstart")
- def pytest_runtestloop(self, session):
- self.log("entering main loop")
- torun = []
- while 1:
- try:
- name, kwargs = self.channel.receive()
- except EOFError:
- return True
- self.log("received command", name, kwargs)
- if name == "runtests":
- torun.extend(kwargs["indices"])
- elif name == "runtests_all":
- torun.extend(range(len(session.items)))
- self.log("items to run:", torun)
- # only run if we have an item and a next item
- while len(torun) >= 2:
- self.run_one_test(torun)
- if name == "shutdown":
- if torun:
- self.run_one_test(torun)
- break
- return True
- def run_one_test(self, torun):
- items = self.session.items
- self.item_index = torun.pop(0)
- item = items[self.item_index]
- if torun:
- nextitem = items[torun[0]]
- else:
- nextitem = None
- start = time.time()
- self.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
- duration = time.time() - start
- self.sendevent(
- "runtest_protocol_complete", item_index=self.item_index, duration=duration
- )
- def pytest_collection_finish(self, session):
- self.sendevent(
- "collectionfinish",
- topdir=str(session.fspath),
- ids=[item.nodeid for item in session.items],
- )
- def pytest_runtest_logstart(self, nodeid, location):
- self.sendevent("logstart", nodeid=nodeid, location=location)
- # the pytest_runtest_logfinish hook was introduced in pytest 3.4
- if hasattr(_pytest.hookspec, "pytest_runtest_logfinish"):
- def pytest_runtest_logfinish(self, nodeid, location):
- self.sendevent("logfinish", nodeid=nodeid, location=location)
- def pytest_runtest_logreport(self, report):
- data = serialize_report(report)
- data["item_index"] = self.item_index
- data["worker_id"] = self.workerid
- assert self.session.items[self.item_index].nodeid == report.nodeid
- self.sendevent("testreport", data=data)
- def pytest_collectreport(self, report):
- # master only needs reports that failed, as optimization send only them instead (#330)
- if report.failed:
- data = serialize_report(report)
- self.sendevent("collectreport", data=data)
- def pytest_logwarning(self, message, code, nodeid, fslocation):
- self.sendevent(
- "logwarning",
- message=message,
- code=code,
- nodeid=nodeid,
- fslocation=str(fslocation),
- )
- def serialize_report(rep):
- def disassembled_report(rep):
- reprtraceback = rep.longrepr.reprtraceback.__dict__.copy()
- reprcrash = rep.longrepr.reprcrash.__dict__.copy()
- new_entries = []
- for entry in reprtraceback["reprentries"]:
- entry_data = {"type": type(entry).__name__, "data": entry.__dict__.copy()}
- for key, value in entry_data["data"].items():
- if hasattr(value, "__dict__"):
- entry_data["data"][key] = value.__dict__.copy()
- new_entries.append(entry_data)
- reprtraceback["reprentries"] = new_entries
- return {
- "reprcrash": reprcrash,
- "reprtraceback": reprtraceback,
- "sections": rep.longrepr.sections,
- }
- import py
- d = rep.__dict__.copy()
- if hasattr(rep.longrepr, "toterminal"):
- if hasattr(rep.longrepr, "reprtraceback") and hasattr(
- rep.longrepr, "reprcrash"
- ):
- d["longrepr"] = disassembled_report(rep)
- else:
- d["longrepr"] = str(rep.longrepr)
- else:
- d["longrepr"] = rep.longrepr
- for name in d:
- if isinstance(d[name], py.path.local):
- d[name] = str(d[name])
- elif name == "result":
- d[name] = None # for now
- return d
- def getinfodict():
- import platform
- return dict(
- version=sys.version,
- version_info=tuple(sys.version_info),
- sysplatform=sys.platform,
- platform=platform.platform(),
- executable=sys.executable,
- cwd=os.getcwd(),
- )
- def remote_initconfig(option_dict, args):
- from _pytest.config import Config
- option_dict["plugins"].append("no:terminal")
- config = Config.fromdictargs(option_dict, args)
- config.option.looponfail = False
- config.option.usepdb = False
- config.option.dist = "no"
- config.option.distload = False
- config.option.numprocesses = None
- config.args = args
- return config
- if __name__ == "__channelexec__":
- channel = channel # noqa
- workerinput, args, option_dict = channel.receive()
- importpath = os.getcwd()
- sys.path.insert(0, importpath) # XXX only for remote situations
- os.environ["PYTHONPATH"] = (
- importpath + os.pathsep + os.environ.get("PYTHONPATH", "")
- )
- os.environ["PYTEST_XDIST_WORKER"] = workerinput["workerid"]
- os.environ["PYTEST_XDIST_WORKER_COUNT"] = str(workerinput["workercount"])
- # os.environ['PYTHONPATH'] = importpath
- import py
- config = remote_initconfig(option_dict, args)
- config.workerinput = workerinput
- config.workeroutput = {}
- # TODO: deprecated name, backward compatibility only. Remove it in future
- config.slaveinput = config.workerinput
- config.slaveoutput = config.workeroutput
- interactor = WorkerInteractor(config, channel)
- config.hook.pytest_cmdline_main(config=config)
|