123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- import os
- import py
- import pytest
- def auto_detect_cpus():
- try:
- from os import sched_getaffinity
- except ImportError:
- if os.environ.get("TRAVIS") == "true":
- # workaround https://bitbucket.org/pypy/pypy/issues/2375
- return 2
- try:
- from os import cpu_count
- except ImportError:
- from multiprocessing import cpu_count
- else:
- def cpu_count():
- return len(sched_getaffinity(0))
- try:
- n = cpu_count()
- except NotImplementedError:
- return 1
- return n if n else 1
- def parse_numprocesses(s):
- if s == "auto":
- return auto_detect_cpus()
- else:
- return int(s)
- def pytest_addoption(parser):
- group = parser.getgroup("xdist", "distributed and subprocess testing")
- group._addoption(
- "-n",
- "--numprocesses",
- dest="numprocesses",
- metavar="numprocesses",
- action="store",
- type=parse_numprocesses,
- help="shortcut for '--dist=load --tx=NUM*popen', "
- "you can use 'auto' here for auto detection CPUs number on "
- "host system",
- )
- group.addoption(
- "--max-worker-restart",
- "--max-slave-restart",
- action="store",
- default=None,
- dest="maxworkerrestart",
- help="maximum number of workers that can be restarted "
- "when crashed (set to zero to disable this feature)\n"
- "'--max-slave-restart' option is deprecated and will be removed in "
- "a future release",
- )
- group.addoption(
- "--dist",
- metavar="distmode",
- action="store",
- choices=["each", "load", "loadscope", "loadfile", "no"],
- dest="dist",
- default="no",
- help=(
- "set mode for distributing tests to exec environments.\n\n"
- "each: send each test to all available environments.\n\n"
- "load: load balance by sending any pending test to any"
- " available environment.\n\n"
- "loadscope: load balance by sending pending groups of tests in"
- " the same scope to any available environment.\n\n"
- "loadfile: load balance by sending test grouped by file"
- " to any available environment.\n\n"
- "(default) no: run tests inprocess, don't distribute."
- ),
- )
- group.addoption(
- "--tx",
- dest="tx",
- action="append",
- default=[],
- metavar="xspec",
- help=(
- "add a test execution environment. some examples: "
- "--tx popen//python=python2.5 --tx socket=192.168.1.102:8888 "
- "--tx ssh=user@codespeak.net//chdir=testcache"
- ),
- )
- group._addoption(
- "-d",
- action="store_true",
- dest="distload",
- default=False,
- help="load-balance tests. shortcut for '--dist=load'",
- )
- group.addoption(
- "--rsyncdir",
- action="append",
- default=[],
- metavar="DIR",
- help="add directory for rsyncing to remote tx nodes.",
- )
- group.addoption(
- "--rsyncignore",
- action="append",
- default=[],
- metavar="GLOB",
- help="add expression for ignores when rsyncing to remote tx nodes.",
- )
- group.addoption(
- "--boxed",
- action="store_true",
- help="backward compatibility alias for pytest-forked --forked",
- )
- parser.addini(
- "rsyncdirs",
- "list of (relative) paths to be rsynced for" " remote distributed testing.",
- type="pathlist",
- )
- parser.addini(
- "rsyncignore",
- "list of (relative) glob-style paths to be ignored " "for rsyncing.",
- type="pathlist",
- )
- parser.addini(
- "looponfailroots",
- type="pathlist",
- help="directories to check for changes",
- default=[py.path.local()],
- )
- # -------------------------------------------------------------------------
- # distributed testing hooks
- # -------------------------------------------------------------------------
- def pytest_addhooks(pluginmanager):
- from xdist import newhooks
- # avoid warnings with pytest-2.8
- method = getattr(pluginmanager, "add_hookspecs", None)
- if method is None:
- method = pluginmanager.addhooks
- method(newhooks)
- # -------------------------------------------------------------------------
- # distributed testing initialization
- # -------------------------------------------------------------------------
- @pytest.mark.trylast
- def pytest_configure(config):
- if config.getoption("dist") != "no" and not config.getvalue("collectonly"):
- from xdist.dsession import DSession
- session = DSession(config)
- config.pluginmanager.register(session, "dsession")
- tr = config.pluginmanager.getplugin("terminalreporter")
- tr.showfspath = False
- if config.getoption("boxed"):
- config.option.forked = True
- @pytest.mark.tryfirst
- def pytest_cmdline_main(config):
- if config.option.numprocesses:
- if config.option.dist == "no":
- config.option.dist = "load"
- config.option.tx = ["popen"] * config.option.numprocesses
- if config.option.distload:
- config.option.dist = "load"
- val = config.getvalue
- if not val("collectonly"):
- usepdb = config.getoption("usepdb") # a core option
- if val("dist") != "no":
- if usepdb:
- raise pytest.UsageError(
- "--pdb is incompatible with distributing tests; try using -n0."
- ) # noqa: E501
- # -------------------------------------------------------------------------
- # fixtures
- # -------------------------------------------------------------------------
- @pytest.fixture(scope="session")
- def worker_id(request):
- """Return the id of the current worker ('gw0', 'gw1', etc) or 'master'
- if running on the master node.
- """
- if hasattr(request.config, "workerinput"):
- return request.config.workerinput["workerid"]
- else:
- return "master"
|