123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397 |
- """Timeout for tests to stop hanging testruns
- This plugin will dump the stack and terminate the test. This can be
- useful when running tests on a continuous integration server.
- If the platform supports SIGALRM this is used to raise an exception in
- the test, otherwise os._exit(1) is used.
- """
- import os
- import signal
- import sys
- import threading
- import traceback
- from collections import namedtuple
- from distutils.version import StrictVersion
- import py
- import pytest
- HAVE_SIGALRM = hasattr(signal, 'SIGALRM')
- if HAVE_SIGALRM:
- DEFAULT_METHOD = 'signal'
- else:
- DEFAULT_METHOD = 'thread'
- TIMEOUT_DESC = """
- Timeout in seconds before dumping the stacks. Default is 0 which
- means no timeout.
- """.strip()
- METHOD_DESC = """
- Timeout mechanism to use. 'signal' uses SIGALRM if available,
- 'thread' uses a timer thread. The default is to use 'signal' and fall
- back to 'thread'.
- """.strip()
- FUNC_ONLY_DESC = """
- When set to True, defers the timeout evaluation to only the test
- function body, ignoring the time it takes when evaluating any fixtures
- used in the test.
- """.strip()
- Settings = namedtuple('Settings', ['timeout', 'method', 'func_only'])
- @pytest.hookimpl
- def pytest_addoption(parser):
- """Add options to control the timeout plugin"""
- group = parser.getgroup(
- 'timeout', 'Interrupt test run and dump stacks of all threads after '
- 'a test times out')
- group.addoption('--timeout',
- type=float,
- help=TIMEOUT_DESC)
- group.addoption('--timeout_method',
- action='store',
- choices=['signal', 'thread'],
- help='Deprecated, use --timeout-method')
- group.addoption('--timeout-method',
- dest='timeout_method',
- action='store',
- choices=['signal', 'thread'],
- help=METHOD_DESC)
- parser.addini('timeout', TIMEOUT_DESC)
- parser.addini('timeout_method', METHOD_DESC)
- parser.addini('timeout_func_only', FUNC_ONLY_DESC, type='bool')
- @pytest.hookimpl
- def pytest_configure(config):
- # Register the marker so it shows up in --markers output.
- config.addinivalue_line(
- 'markers',
- 'timeout(timeout, method=None, func_only=False): Set a timeout, timeout '
- 'method and func_only evaluation on just one test item. The first '
- 'argument, *timeout*, is the timeout in seconds while the keyword, '
- '*method*, takes the same values as the --timeout_method option. The '
- '*func_only* keyword, when set to True, defers the timeout evaluation '
- 'to only the test function body, ignoring the time it takes when '
- 'evaluating any fixrures used in the test.')
- settings = get_env_settings(config)
- config._env_timeout = settings.timeout
- config._env_timeout_method = settings.method
- config._env_timeout_func_only = settings.func_only
- @pytest.hookimpl(hookwrapper=True)
- def pytest_runtest_protocol(item):
- func_only = get_func_only_setting(item)
- if func_only is False:
- timeout_setup(item)
- outcome = yield
- if func_only is False:
- timeout_teardown(item)
- @pytest.hookimpl(hookwrapper=True)
- def pytest_runtest_call(item):
- func_only = get_func_only_setting(item)
- if func_only is True:
- timeout_setup(item)
- outcome = yield
- if func_only is True:
- timeout_teardown(item)
- @pytest.hookimpl(tryfirst=True)
- def pytest_report_header(config):
- if config._env_timeout:
- return ["timeout: %ss\ntimeout method: %s\ntimeout func_only: %s" %
- (config._env_timeout,
- config._env_timeout_method,
- config._env_timeout_func_only)]
- @pytest.hookimpl(tryfirst=True)
- def pytest_exception_interact(node):
- timeout_teardown(node)
- @pytest.hookimpl
- def pytest_enter_pdb():
- # Since pdb.set_trace happens outside of any pytest control, we don't have
- # any pytest ``item`` here, so we cannot use timeout_teardown. Thus, we
- # need another way to signify that the timeout should not be performed.
- global SUPPRESS_TIMEOUT
- SUPPRESS_TIMEOUT = True
- SUPPRESS_TIMEOUT = False
- def timeout_setup(item):
- """Setup up a timeout trigger and handler"""
- params = get_params(item)
- if params.timeout is None or params.timeout <= 0:
- return
- if params.method == 'signal':
- def handler(signum, frame):
- __tracebackhide__ = True
- timeout_sigalrm(item, params.timeout)
- def cancel():
- signal.setitimer(signal.ITIMER_REAL, 0)
- signal.signal(signal.SIGALRM, signal.SIG_DFL)
- item.cancel_timeout = cancel
- signal.signal(signal.SIGALRM, handler)
- signal.setitimer(signal.ITIMER_REAL, params.timeout)
- elif params.method == 'thread':
- timer = threading.Timer(params.timeout, timeout_timer,
- (item, params.timeout))
- def cancel():
- timer.cancel()
- timer.join()
- item.cancel_timeout = cancel
- timer.start()
- def timeout_teardown(item):
- """Cancel the timeout trigger if it was set"""
- # When skipping is raised from a pytest_runtest_setup function
- # (as is the case when using the pytest.mark.skipif marker) we
- # may be called without our setup counterpart having been
- # called.
- cancel = getattr(item, 'cancel_timeout', None)
- if cancel:
- cancel()
- def get_env_settings(config):
- timeout = config.getvalue('timeout')
- if timeout is None:
- timeout = _validate_timeout(
- os.environ.get('PYTEST_TIMEOUT'),
- 'PYTEST_TIMEOUT environment variable')
- if timeout is None:
- ini = config.getini('timeout')
- if ini:
- timeout = _validate_timeout(ini, 'config file')
- method = config.getvalue('timeout_method')
- if method is None:
- ini = config.getini('timeout_method')
- if ini:
- method = _validate_method(ini, 'config file')
- if method is None:
- method = DEFAULT_METHOD
- func_only = config.getini('timeout_func_only')
- if func_only == []:
- # No value set
- func_only = None
- if func_only is not None:
- func_only = _validate_func_only(func_only, 'config file')
- return Settings(timeout, method, func_only or False)
- def get_func_only_setting(item):
- """Return the func_only setting for an item"""
- func_only = None
- marker = item.get_closest_marker('timeout')
- if marker:
- settings = get_params(item, marker=marker)
- func_only = _validate_func_only(settings.func_only, 'marker')
- if func_only is None:
- func_only = item.config._env_timeout_func_only
- if func_only is None:
- func_only = False
- return func_only
- def get_params(item, marker=None):
- """Return (timeout, method) for an item"""
- timeout = method = func_only = None
- if not marker:
- marker = item.get_closest_marker('timeout')
- if marker is not None:
- settings = _parse_marker(item.get_closest_marker(name='timeout'))
- timeout = _validate_timeout(settings.timeout, 'marker')
- method = _validate_method(settings.method, 'marker')
- func_only = _validate_func_only(settings.func_only, 'marker')
- if timeout is None:
- timeout = item.config._env_timeout
- if method is None:
- method = item.config._env_timeout_method
- if func_only is None:
- func_only = item.config._env_timeout_func_only
- return Settings(timeout, method, func_only)
- def _parse_marker(marker):
- """Return (timeout, method) tuple from marker
- Either could be None. The values are not interpreted, so
- could still be bogus and even the wrong type.
- """
- if not marker.args and not marker.kwargs:
- raise TypeError('Timeout marker must have at least one argument')
- timeout = method = func_only = NOTSET = object()
- for kw, val in marker.kwargs.items():
- if kw == 'timeout':
- timeout = val
- elif kw == 'method':
- method = val
- elif kw == 'func_only':
- func_only = val
- else:
- raise TypeError(
- 'Invalid keyword argument for timeout marker: %s' % kw)
- if len(marker.args) >= 1 and timeout is not NOTSET:
- raise TypeError(
- 'Multiple values for timeout argument of timeout marker')
- elif len(marker.args) >= 1:
- timeout = marker.args[0]
- if len(marker.args) >= 2 and method is not NOTSET:
- raise TypeError(
- 'Multiple values for method argument of timeout marker')
- elif len(marker.args) >= 2:
- method = marker.args[1]
- if len(marker.args) > 2:
- raise TypeError('Too many arguments for timeout marker')
- if timeout is NOTSET:
- timeout = None
- if method is NOTSET:
- method = None
- if func_only is NOTSET:
- func_only = None
- return Settings(timeout, method, func_only)
- def _validate_timeout(timeout, where):
- """Helper for get_params()"""
- if timeout is None:
- return None
- try:
- return float(timeout)
- except ValueError:
- raise ValueError('Invalid timeout %s from %s' % (timeout, where))
- def _validate_method(method, where):
- """Helper for get_params()"""
- if method is None:
- return None
- if method not in ['signal', 'thread']:
- raise ValueError('Invalid method %s from %s' % (method, where))
- return method
- def _validate_func_only(func_only, where):
- """Helper for get_params()"""
- if func_only is None:
- return False
- if not isinstance(func_only, bool):
- raise ValueError('Invalid func_only value %s from %s' % (func_only, where))
- return func_only
- def timeout_sigalrm(item, timeout):
- """Dump stack of threads and raise an exception
- This will output the stacks of any threads other then the
- current to stderr and then raise an AssertionError, thus
- terminating the test.
- """
- if SUPPRESS_TIMEOUT:
- return
- __tracebackhide__ = True
- nthreads = len(threading.enumerate())
- if nthreads > 1:
- write_title('Timeout', sep='+')
- dump_stacks()
- if nthreads > 1:
- write_title('Timeout', sep='+')
- pytest.fail('Timeout >%ss' % timeout)
- def timeout_timer(item, timeout):
- """Dump stack of threads and call os._exit()
- This disables the capturemanager and dumps stdout and stderr.
- Then the stacks are dumped and os._exit(1) is called.
- """
- if SUPPRESS_TIMEOUT:
- return
- try:
- capman = item.config.pluginmanager.getplugin('capturemanager')
- if capman:
- pytest_version = StrictVersion(pytest.__version__)
- if pytest_version >= StrictVersion('3.7.3'):
- capman.suspend_global_capture(item)
- stdout, stderr = capman.read_global_capture()
- else:
- stdout, stderr = capman.suspend_global_capture(item)
- else:
- stdout, stderr = None, None
- write_title('Timeout', sep='+')
- caplog = item.config.pluginmanager.getplugin('_capturelog')
- if caplog and hasattr(item, 'capturelog_handler'):
- log = item.capturelog_handler.stream.getvalue()
- if log:
- write_title('Captured log')
- write(log)
- if stdout:
- write_title('Captured stdout')
- write(stdout)
- if stderr:
- write_title('Captured stderr')
- write(stderr)
- dump_stacks()
- write_title('Timeout', sep='+')
- except Exception:
- traceback.print_exc()
- finally:
- sys.stdout.flush()
- sys.stderr.flush()
- os._exit(1)
- def dump_stacks():
- """Dump the stacks of all threads except the current thread"""
- current_ident = threading.current_thread().ident
- for thread_ident, frame in sys._current_frames().items():
- if thread_ident == current_ident:
- continue
- for t in threading.enumerate():
- if t.ident == thread_ident:
- thread_name = t.name
- break
- else:
- thread_name = '<unknown>'
- write_title('Stack of %s (%s)' % (thread_name, thread_ident))
- write(''.join(traceback.format_stack(frame)))
- def write_title(title, stream=None, sep='~'):
- """Write a section title
- If *stream* is None sys.stderr will be used, *sep* is used to
- draw the line.
- """
- if stream is None:
- stream = sys.stderr
- width = py.io.get_terminal_width()
- fill = int((width - len(title) - 2) / 2)
- line = ' '.join([sep * fill, title, sep * fill])
- if len(line) < width:
- line += sep * (width - len(line))
- stream.write('\n' + line + '\n')
- def write(text, stream=None):
- """Write text to stream
- Pretty stupid really, only here for symetry with .write_title().
- """
- if stream is None:
- stream = sys.stderr
- stream.write(text)
|