remote.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. """
This module is executed in remote subprocesses and helps to
control a remote testing session and relay back information.
It assumes that 'py' is importable and does not have dependencies
on the rest of the xdist code. This means that the xdist plugin
need not be installed in remote environments.
  7. """
  8. import sys
  9. import os
  10. import time
  11. import _pytest.hookspec
  12. import pytest
  13. class WorkerInteractor(object):
  14. def __init__(self, config, channel):
  15. self.config = config
  16. self.workerid = config.workerinput.get("workerid", "?")
  17. self.log = py.log.Producer("worker-%s" % self.workerid)
  18. if not config.option.debug:
  19. py.log.setconsumer(self.log._keywords, None)
  20. self.channel = channel
  21. config.pluginmanager.register(self)
  22. def sendevent(self, name, **kwargs):
  23. self.log("sending", name, kwargs)
  24. self.channel.send((name, kwargs))
  25. def pytest_internalerror(self, excrepr):
  26. for line in str(excrepr).split("\n"):
  27. self.log("IERROR>", line)
  28. def pytest_sessionstart(self, session):
  29. self.session = session
  30. workerinfo = getinfodict()
  31. self.sendevent("workerready", workerinfo=workerinfo)
  32. @pytest.hookimpl(hookwrapper=True)
  33. def pytest_sessionfinish(self, exitstatus):
  34. self.config.workeroutput["exitstatus"] = exitstatus
  35. yield
  36. self.sendevent("workerfinished", workeroutput=self.config.workeroutput)
  37. def pytest_collection(self, session):
  38. self.sendevent("collectionstart")
  39. def pytest_runtestloop(self, session):
  40. self.log("entering main loop")
  41. torun = []
  42. while 1:
  43. try:
  44. name, kwargs = self.channel.receive()
  45. except EOFError:
  46. return True
  47. self.log("received command", name, kwargs)
  48. if name == "runtests":
  49. torun.extend(kwargs["indices"])
  50. elif name == "runtests_all":
  51. torun.extend(range(len(session.items)))
  52. self.log("items to run:", torun)
  53. # only run if we have an item and a next item
  54. while len(torun) >= 2:
  55. self.run_one_test(torun)
  56. if name == "shutdown":
  57. if torun:
  58. self.run_one_test(torun)
  59. break
  60. return True
  61. def run_one_test(self, torun):
  62. items = self.session.items
  63. self.item_index = torun.pop(0)
  64. item = items[self.item_index]
  65. if torun:
  66. nextitem = items[torun[0]]
  67. else:
  68. nextitem = None
  69. start = time.time()
  70. self.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
  71. duration = time.time() - start
  72. self.sendevent(
  73. "runtest_protocol_complete", item_index=self.item_index, duration=duration
  74. )
  75. def pytest_collection_finish(self, session):
  76. self.sendevent(
  77. "collectionfinish",
  78. topdir=str(session.fspath),
  79. ids=[item.nodeid for item in session.items],
  80. )
  81. def pytest_runtest_logstart(self, nodeid, location):
  82. self.sendevent("logstart", nodeid=nodeid, location=location)
  83. # the pytest_runtest_logfinish hook was introduced in pytest 3.4
  84. if hasattr(_pytest.hookspec, "pytest_runtest_logfinish"):
  85. def pytest_runtest_logfinish(self, nodeid, location):
  86. self.sendevent("logfinish", nodeid=nodeid, location=location)
  87. def pytest_runtest_logreport(self, report):
  88. data = serialize_report(report)
  89. data["item_index"] = self.item_index
  90. data["worker_id"] = self.workerid
  91. assert self.session.items[self.item_index].nodeid == report.nodeid
  92. self.sendevent("testreport", data=data)
  93. def pytest_collectreport(self, report):
  94. # master only needs reports that failed, as optimization send only them instead (#330)
  95. if report.failed:
  96. data = serialize_report(report)
  97. self.sendevent("collectreport", data=data)
  98. def pytest_logwarning(self, message, code, nodeid, fslocation):
  99. self.sendevent(
  100. "logwarning",
  101. message=message,
  102. code=code,
  103. nodeid=nodeid,
  104. fslocation=str(fslocation),
  105. )
  106. def serialize_report(rep):
  107. def disassembled_report(rep):
  108. reprtraceback = rep.longrepr.reprtraceback.__dict__.copy()
  109. reprcrash = rep.longrepr.reprcrash.__dict__.copy()
  110. new_entries = []
  111. for entry in reprtraceback["reprentries"]:
  112. entry_data = {"type": type(entry).__name__, "data": entry.__dict__.copy()}
  113. for key, value in entry_data["data"].items():
  114. if hasattr(value, "__dict__"):
  115. entry_data["data"][key] = value.__dict__.copy()
  116. new_entries.append(entry_data)
  117. reprtraceback["reprentries"] = new_entries
  118. return {
  119. "reprcrash": reprcrash,
  120. "reprtraceback": reprtraceback,
  121. "sections": rep.longrepr.sections,
  122. }
  123. import py
  124. d = rep.__dict__.copy()
  125. if hasattr(rep.longrepr, "toterminal"):
  126. if hasattr(rep.longrepr, "reprtraceback") and hasattr(
  127. rep.longrepr, "reprcrash"
  128. ):
  129. d["longrepr"] = disassembled_report(rep)
  130. else:
  131. d["longrepr"] = str(rep.longrepr)
  132. else:
  133. d["longrepr"] = rep.longrepr
  134. for name in d:
  135. if isinstance(d[name], py.path.local):
  136. d[name] = str(d[name])
  137. elif name == "result":
  138. d[name] = None # for now
  139. return d
  140. def getinfodict():
  141. import platform
  142. return dict(
  143. version=sys.version,
  144. version_info=tuple(sys.version_info),
  145. sysplatform=sys.platform,
  146. platform=platform.platform(),
  147. executable=sys.executable,
  148. cwd=os.getcwd(),
  149. )
  150. def remote_initconfig(option_dict, args):
  151. from _pytest.config import Config
  152. option_dict["plugins"].append("no:terminal")
  153. config = Config.fromdictargs(option_dict, args)
  154. config.option.looponfail = False
  155. config.option.usepdb = False
  156. config.option.dist = "no"
  157. config.option.distload = False
  158. config.option.numprocesses = None
  159. config.args = args
  160. return config
  161. if __name__ == "__channelexec__":
  162. channel = channel # noqa
  163. workerinput, args, option_dict = channel.receive()
  164. importpath = os.getcwd()
  165. sys.path.insert(0, importpath) # XXX only for remote situations
  166. os.environ["PYTHONPATH"] = (
  167. importpath + os.pathsep + os.environ.get("PYTHONPATH", "")
  168. )
  169. os.environ["PYTEST_XDIST_WORKER"] = workerinput["workerid"]
  170. os.environ["PYTEST_XDIST_WORKER_COUNT"] = str(workerinput["workercount"])
  171. # os.environ['PYTHONPATH'] = importpath
  172. import py
  173. config = remote_initconfig(option_dict, args)
  174. config.workerinput = workerinput
  175. config.workeroutput = {}
  176. # TODO: deprecated name, backward compatibility only. Remove it in future
  177. config.slaveinput = config.workerinput
  178. config.slaveoutput = config.workeroutput
  179. interactor = WorkerInteractor(config, channel)
  180. config.hook.pytest_cmdline_main(config=config)