Diffstat (limited to 'Lib/test/libregrtest/main.py')
-rw-r--r--  Lib/test/libregrtest/main.py  712
1 file changed, 0 insertions(+), 712 deletions(-)
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
deleted file mode 100644
index 1de51b7..0000000
--- a/Lib/test/libregrtest/main.py
+++ /dev/null
@@ -1,712 +0,0 @@
-import datetime
-import faulthandler
-import locale
-import os
-import platform
-import random
-import re
-import sys
-import sysconfig
-import tempfile
-import time
-import unittest
-from test.libregrtest.cmdline import _parse_args
-from test.libregrtest.runtest import (
-    findtests, runtest, get_abs_module,
-    STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED,
-    INTERRUPTED, CHILD_ERROR, TEST_DID_NOT_RUN, TIMEOUT,
-    PROGRESS_MIN_TIME, format_test_result, is_failed)
-from test.libregrtest.setup import setup_tests
-from test.libregrtest.pgo import setup_pgo_tests
-from test.libregrtest.utils import removepy, count, format_duration, printlist
-from test import support
-
-
-# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
-# Used to protect against threading._shutdown() hang.
-# Must be smaller than buildbot "1200 seconds without output" limit.
-EXIT_TIMEOUT = 120.0
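-
-# A minimal sketch of how this constant is used (mirrors Regrtest.main()
-# below): before re-raising SystemExit, a watchdog is armed so that a hung
-# Py_Finalize() dumps all thread tracebacks and force-exits the process:
-#
-#   faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)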
-
-
-class Regrtest:
- """Execute a test suite.
-
- This also parses command-line options and modifies its behavior
- accordingly.
-
- tests -- a list of strings containing test names (optional)
- testdir -- the directory in which to look for tests (optional)
-
- Users other than the Python test suite will certainly want to
- specify testdir; if it's omitted, the directory containing the
- Python test suite is searched for.
-
- If the tests argument is omitted, the tests listed on the
- command-line will be used. If that's empty, too, then all *.py
- files beginning with test_ will be used.
-
- The other default arguments (verbose, quiet, exclude,
- single, randomize, findleaks, use_resources, trace, coverdir,
- print_slow, and random_seed) allow programmers calling main()
- directly to set the values that would normally be set by flags
- on the command line.
- """
-    def __init__(self):
-        # Namespace of command line options
-        self.ns = None
-
-        # tests
-        self.tests = []
-        self.selected = []
-
-        # test results
-        self.good = []
-        self.bad = []
-        self.skipped = []
-        self.resource_denieds = []
-        self.environment_changed = []
-        self.run_no_tests = []
-        self.rerun = []
-        self.first_result = None
-        self.interrupted = False
-
-        # used by --slow
-        self.test_times = []
-
-        # used by --coverage, trace.Trace instance
-        self.tracer = None
-
-        # used to display the progress bar "[ 3/100]"
-        self.start_time = time.monotonic()
-        self.test_count = ''
-        self.test_count_width = 1
-
-        # used by --single
-        self.next_single_test = None
-        self.next_single_filename = None
-
-        # used by --junit-xml
-        self.testsuite_xml = None
-
-        # misc
-        self.win_load_tracker = None
-        self.tmp_dir = None
-        self.worker_test_name = None
-
-    def get_executed(self):
-        return (set(self.good) | set(self.bad) | set(self.skipped)
-                | set(self.resource_denieds) | set(self.environment_changed)
-                | set(self.run_no_tests))
-
-    def accumulate_result(self, result, rerun=False):
-        test_name = result.test_name
-        ok = result.result
-
-        if ok not in (CHILD_ERROR, INTERRUPTED) and not rerun:
-            self.test_times.append((result.test_time, test_name))
-
-        if ok == PASSED:
-            self.good.append(test_name)
-        elif ok in (FAILED, CHILD_ERROR):
-            if not rerun:
-                self.bad.append(test_name)
-        elif ok == ENV_CHANGED:
-            self.environment_changed.append(test_name)
-        elif ok == SKIPPED:
-            self.skipped.append(test_name)
-        elif ok == RESOURCE_DENIED:
-            self.skipped.append(test_name)
-            self.resource_denieds.append(test_name)
-        elif ok == TEST_DID_NOT_RUN:
-            self.run_no_tests.append(test_name)
-        elif ok == INTERRUPTED:
-            self.interrupted = True
-        elif ok == TIMEOUT:
-            self.bad.append(test_name)
-        else:
-            raise ValueError("invalid test result: %r" % ok)
-
-        if rerun and ok not in {FAILED, CHILD_ERROR, INTERRUPTED}:
-            self.bad.remove(test_name)
-
-        xml_data = result.xml_data
-        if xml_data:
-            import xml.etree.ElementTree as ET
-            for e in xml_data:
-                try:
-                    self.testsuite_xml.append(ET.fromstring(e))
-                except ET.ParseError:
-                    print(xml_data, file=sys.__stderr__)
-                    raise
-
-    def log(self, line=''):
-        empty = not line
-
-        # add the system load prefix: "load avg: 1.80 "
-        load_avg = self.getloadavg()
-        if load_avg is not None:
-            line = f"load avg: {load_avg:.2f} {line}"
-
-        # add the timestamp prefix: "0:01:05 "
-        test_time = time.monotonic() - self.start_time
-        test_time = datetime.timedelta(seconds=int(test_time))
-        line = f"{test_time} {line}"
-
-        if empty:
-            line = line[:-1]
-
-        print(line, flush=True)
-
-    def display_progress(self, test_index, text):
-        if self.ns.quiet:
-            return
-
-        # "[ 51/405/1] test_tcl passed"
-        line = f"{test_index:{self.test_count_width}}{self.test_count}"
-        fails = len(self.bad) + len(self.environment_changed)
-        if fails and not self.ns.pgo:
-            line = f"{line}/{fails}"
-        self.log(f"[{line}] {text}")
-
-    def parse_args(self, kwargs):
-        ns = _parse_args(sys.argv[1:], **kwargs)
-
-        if ns.xmlpath:
-            support.junit_xml_list = self.testsuite_xml = []
-
-        worker_args = ns.worker_args
-        if worker_args is not None:
-            from test.libregrtest.runtest_mp import parse_worker_args
-            ns, test_name = parse_worker_args(ns.worker_args)
-            ns.worker_args = worker_args
-            self.worker_test_name = test_name
-
-        # Strip .py extensions.
-        removepy(ns.args)
-
-        if ns.huntrleaks:
-            warmup, repetitions, _ = ns.huntrleaks
-            if warmup < 1 or repetitions < 1:
-                msg = ("Invalid values for the --huntrleaks/-R parameters. The "
-                       "number of warmups and repetitions must be at least 1 "
-                       "each (1:1).")
-                print(msg, file=sys.stderr, flush=True)
-                sys.exit(2)
-
-        if ns.tempdir:
-            ns.tempdir = os.path.expanduser(ns.tempdir)
-
-        self.ns = ns
-
-    def find_tests(self, tests):
-        self.tests = tests
-
-        if self.ns.single:
-            self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
-            try:
-                with open(self.next_single_filename, 'r') as fp:
-                    next_test = fp.read().strip()
-                    self.tests = [next_test]
-            except OSError:
-                pass
-
-        if self.ns.fromfile:
-            self.tests = []
-            # regex to match 'test_builtin' in line:
-            # '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec'
-            regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
-            with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp:
-                for line in fp:
-                    line = line.split('#', 1)[0]
-                    line = line.strip()
-                    match = regex.search(line)
-                    if match is not None:
-                        self.tests.append(match.group())
-
-        removepy(self.tests)
-
-        if self.ns.pgo:
-            # add default PGO tests if no tests are specified
-            setup_pgo_tests(self.ns)
-
-        stdtests = STDTESTS[:]
-        nottests = NOTTESTS.copy()
-        if self.ns.exclude:
-            for arg in self.ns.args:
-                if arg in stdtests:
-                    stdtests.remove(arg)
-                nottests.add(arg)
-            self.ns.args = []
-
-        # If testdir is set, then we are not running the Python test suite, so
-        # don't add the default tests to be executed or skipped (pass empty values).
-        if self.ns.testdir:
-            alltests = findtests(self.ns.testdir, list(), set())
-        else:
-            alltests = findtests(self.ns.testdir, stdtests, nottests)
-
-        if not self.ns.fromfile:
-            self.selected = self.tests or self.ns.args or alltests
-        else:
-            self.selected = self.tests
-        if self.ns.single:
-            self.selected = self.selected[:1]
-            try:
-                pos = alltests.index(self.selected[0])
-                self.next_single_test = alltests[pos + 1]
-            except IndexError:
-                pass
-
-        # Remove all the selected tests that precede start if it's set.
-        if self.ns.start:
-            try:
-                del self.selected[:self.selected.index(self.ns.start)]
-            except ValueError:
-                print("Couldn't find starting test (%s), using all tests"
-                      % self.ns.start, file=sys.stderr)
-
-        if self.ns.randomize:
-            if self.ns.random_seed is None:
-                self.ns.random_seed = random.randrange(10000000)
-            random.seed(self.ns.random_seed)
-            random.shuffle(self.selected)
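-            # Re-seeding with the same value reproduces the shuffled order;
-            # the seed printed by run_tests() can be passed back on a later
-            # run (illustrative seed; flag name assumed from cmdline.py):
-            #
-            #   ./python -m test -r --randseed 10913900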
-
-    def list_tests(self):
-        for name in self.selected:
-            print(name)
-
-    def _list_cases(self, suite):
-        for test in suite:
-            if isinstance(test, unittest.loader._FailedTest):
-                continue
-            if isinstance(test, unittest.TestSuite):
-                self._list_cases(test)
-            elif isinstance(test, unittest.TestCase):
-                if support.match_test(test):
-                    print(test.id())
-
-    def list_cases(self):
-        support.verbose = False
-        support.set_match_tests(self.ns.match_tests, self.ns.ignore_tests)
-
-        for test_name in self.selected:
-            abstest = get_abs_module(self.ns, test_name)
-            try:
-                suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
-                self._list_cases(suite)
-            except unittest.SkipTest:
-                self.skipped.append(test_name)
-
-        if self.skipped:
-            print(file=sys.stderr)
-            print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr)
-            printlist(self.skipped, file=sys.stderr)
-
-    def rerun_failed_tests(self):
-        self.ns.verbose = True
-        self.ns.failfast = False
-        self.ns.verbose3 = False
-
-        self.first_result = self.get_tests_result()
-
-        self.log()
-        self.log("Re-running failed tests in verbose mode")
-        self.rerun = self.bad[:]
-        for test_name in self.rerun:
-            self.log(f"Re-running {test_name} in verbose mode")
-            self.ns.verbose = True
-            result = runtest(self.ns, test_name)
-
-            self.accumulate_result(result, rerun=True)
-
-            if result.result == INTERRUPTED:
-                break
-
-        if self.bad:
-            print(count(len(self.bad), 'test'), "failed again:")
-            printlist(self.bad)
-
-        self.display_result()
-
-    def display_result(self):
-        # If running the test suite for PGO then no one cares about results.
-        if self.ns.pgo:
-            return
-
-        print()
-        print("== Tests result: %s ==" % self.get_tests_result())
-
-        if self.interrupted:
-            print("Test suite interrupted by signal SIGINT.")
-
-        omitted = set(self.selected) - self.get_executed()
-        if omitted:
-            print()
-            print(count(len(omitted), "test"), "omitted:")
-            printlist(omitted)
-
-        if self.good and not self.ns.quiet:
-            print()
-            if (not self.bad
-                    and not self.skipped
-                    and not self.interrupted
-                    and len(self.good) > 1):
-                print("All", end=' ')
-            print(count(len(self.good), "test"), "OK.")
-
-        if self.ns.print_slow:
-            self.test_times.sort(reverse=True)
-            print()
-            print("10 slowest tests:")
-            for test_time, test in self.test_times[:10]:
-                print("- %s: %s" % (test, format_duration(test_time)))
-
-        if self.bad:
-            print()
-            print(count(len(self.bad), "test"), "failed:")
-            printlist(self.bad)
-
-        if self.environment_changed:
-            print()
-            print("{} altered the execution environment:".format(
-                      count(len(self.environment_changed), "test")))
-            printlist(self.environment_changed)
-
-        if self.skipped and not self.ns.quiet:
-            print()
-            print(count(len(self.skipped), "test"), "skipped:")
-            printlist(self.skipped)
-
-        if self.rerun:
-            print()
-            print("%s:" % count(len(self.rerun), "re-run test"))
-            printlist(self.rerun)
-
-        if self.run_no_tests:
-            print()
-            print(count(len(self.run_no_tests), "test"), "run no tests:")
-            printlist(self.run_no_tests)
-
-    def run_tests_sequential(self):
-        if self.ns.trace:
-            import trace
-            self.tracer = trace.Trace(trace=False, count=True)
-
-        # Snapshot the module names: sys.modules.keys() is a live view, so a
-        # plain reference would also include modules imported by the tests.
-        save_modules = set(sys.modules)
-
- self.log("Run tests sequentially")
-
- previous_test = None
- for test_index, test_name in enumerate(self.tests, 1):
- start_time = time.monotonic()
-
- text = test_name
- if previous_test:
- text = '%s -- %s' % (text, previous_test)
- self.display_progress(test_index, text)
-
-            if self.tracer:
-                # If we're tracing code coverage, then we don't exit with
-                # status on a false return value from main.
-                cmd = ('result = runtest(self.ns, test_name); '
-                       'self.accumulate_result(result)')
-                ns = dict(locals())
-                self.tracer.runctx(cmd, globals=globals(), locals=ns)
-                result = ns['result']
-            else:
-                result = runtest(self.ns, test_name)
-                self.accumulate_result(result)
-
-            if result.result == INTERRUPTED:
-                break
-
-            previous_test = format_test_result(result)
-            test_time = time.monotonic() - start_time
-            if test_time >= PROGRESS_MIN_TIME:
-                previous_test = "%s in %s" % (previous_test, format_duration(test_time))
-            elif result.result == PASSED:
-                # be quiet: say nothing if the test passed quickly
-                previous_test = None
-
-            # Unload the newly imported modules (best effort finalization).
-            # Iterate over a copy since support.unload() mutates sys.modules.
-            for module in list(sys.modules):
-                if module not in save_modules and module.startswith("test."):
-                    support.unload(module)
-
-            if self.ns.failfast and is_failed(result, self.ns):
-                break
-
-        if previous_test:
-            print(previous_test)
-
-    def _test_forever(self, tests):
-        while True:
-            for test_name in tests:
-                yield test_name
-                if self.bad:
-                    return
-                if self.ns.fail_env_changed and self.environment_changed:
-                    return
-
-    def display_header(self):
-        # Print basic platform information
-        print("==", platform.python_implementation(), *sys.version.split())
-        print("==", platform.platform(aliased=True),
-              "%s-endian" % sys.byteorder)
-        print("== cwd:", os.getcwd())
-        cpu_count = os.cpu_count()
-        if cpu_count:
-            print("== CPU count:", cpu_count)
-        print("== encodings: locale=%s, FS=%s"
-              % (locale.getpreferredencoding(False),
-                 sys.getfilesystemencoding()))
-
-    def get_tests_result(self):
-        result = []
-        if self.bad:
-            result.append("FAILURE")
-        elif self.ns.fail_env_changed and self.environment_changed:
-            result.append("ENV CHANGED")
-        elif not any((self.good, self.bad, self.skipped, self.interrupted,
-                      self.environment_changed)):
-            result.append("NO TEST RUN")
-
-        if self.interrupted:
-            result.append("INTERRUPTED")
-
-        if not result:
-            result.append("SUCCESS")
-
-        result = ', '.join(result)
-        if self.first_result:
-            result = '%s then %s' % (self.first_result, result)
-        return result
-
-    def run_tests(self):
-        # For a partial run, we do not need to clutter the output.
-        if (self.ns.header
-                or not (self.ns.pgo or self.ns.quiet or self.ns.single
-                        or self.tests or self.ns.args)):
-            self.display_header()
-
-        if self.ns.huntrleaks:
-            warmup, repetitions, _ = self.ns.huntrleaks
-            if warmup < 3:
-                msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
-                       "3 warmup repetitions can give false positives!")
-                print(msg, file=sys.stdout, flush=True)
-
-        if self.ns.randomize:
-            print("Using random seed", self.ns.random_seed)
-
-        if self.ns.forever:
-            self.tests = self._test_forever(list(self.selected))
-            self.test_count = ''
-            self.test_count_width = 3
-        else:
-            self.tests = iter(self.selected)
-            self.test_count = '/{}'.format(len(self.selected))
-            self.test_count_width = len(self.test_count) - 1
-
-        if self.ns.use_mp:
-            from test.libregrtest.runtest_mp import run_tests_multiprocess
-            run_tests_multiprocess(self)
-        else:
-            self.run_tests_sequential()
-
-    def finalize(self):
-        if self.next_single_filename:
-            if self.next_single_test:
-                with open(self.next_single_filename, 'w') as fp:
-                    fp.write(self.next_single_test + '\n')
-            else:
-                os.unlink(self.next_single_filename)
-
-        if self.tracer:
-            r = self.tracer.results()
-            r.write_results(show_missing=True, summary=True,
-                            coverdir=self.ns.coverdir)
-
-        print()
-        duration = time.monotonic() - self.start_time
-        print("Total duration: %s" % format_duration(duration))
-        print("Tests result: %s" % self.get_tests_result())
-
-        if self.ns.runleaks:
-            os.system("leaks %d" % os.getpid())
-
-    def save_xml_result(self):
-        if not self.ns.xmlpath and not self.testsuite_xml:
-            return
-
-        import xml.etree.ElementTree as ET
-        root = ET.Element("testsuites")
-
-        # Manually count the totals for the overall summary
-        totals = {'tests': 0, 'errors': 0, 'failures': 0}
-        for suite in self.testsuite_xml:
-            root.append(suite)
-            for k in totals:
-                try:
-                    totals[k] += int(suite.get(k, 0))
-                except ValueError:
-                    pass
-
-        for k, v in totals.items():
-            root.set(k, str(v))
-
-        xmlpath = os.path.join(support.SAVEDCWD, self.ns.xmlpath)
-        with open(xmlpath, 'wb') as f:
-            for s in ET.tostringlist(root):
-                f.write(s)
-
-    def set_temp_dir(self):
-        if self.ns.tempdir:
-            self.tmp_dir = self.ns.tempdir
-
-        if not self.tmp_dir:
-            # When tests are run from the Python build directory, it is best
-            # practice to keep the test files in a subfolder. This eases the
-            # cleanup of leftover files using the "make distclean" command.
-            if sysconfig.is_python_build():
-                self.tmp_dir = sysconfig.get_config_var('abs_builddir')
-                if self.tmp_dir is None:
-                    # bpo-30284: On Windows, only srcdir is available. Using
-                    # abs_builddir mostly matters on UNIX when building Python
-                    # out of the source tree, especially when the source tree
-                    # is read only.
-                    self.tmp_dir = sysconfig.get_config_var('srcdir')
-                self.tmp_dir = os.path.join(self.tmp_dir, 'build')
-            else:
-                self.tmp_dir = tempfile.gettempdir()
-
-        self.tmp_dir = os.path.abspath(self.tmp_dir)
-
-    def create_temp_dir(self):
-        os.makedirs(self.tmp_dir, exist_ok=True)
-
-        # Define a writable temp dir that will be used as cwd while running
-        # the tests. The name of the dir includes the pid to allow parallel
-        # testing (see the -j option).
-        pid = os.getpid()
-        if self.worker_test_name is not None:
-            test_cwd = 'test_python_worker_{}'.format(pid)
-        else:
-            test_cwd = 'test_python_{}'.format(pid)
-        test_cwd = os.path.join(self.tmp_dir, test_cwd)
-        return test_cwd
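-
-    # Illustrative resulting paths (hypothetical pids, POSIX tempdir; the
-    # worker dir nests under the main cwd because main() sets ns.tempdir):
-    #   main process cwd:  /tmp/test_python_12345
-    #   -j worker cwd:     /tmp/test_python_12345/test_python_worker_12346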
-
-    def cleanup(self):
-        import glob
-
-        path = os.path.join(self.tmp_dir, 'test_python_*')
-        print("Cleanup %s directory" % self.tmp_dir)
-        for name in glob.glob(path):
-            if os.path.isdir(name):
-                print("Remove directory: %s" % name)
-                support.rmtree(name)
-            else:
-                print("Remove file: %s" % name)
-                support.unlink(name)
-
-    def main(self, tests=None, **kwargs):
-        self.parse_args(kwargs)
-
-        self.set_temp_dir()
-
-        if self.ns.cleanup:
-            self.cleanup()
-            sys.exit(0)
-
-        test_cwd = self.create_temp_dir()
-
-        try:
-            # Run the tests in a context manager that temporarily changes the
-            # CWD to a temporary and writable directory. If it's not possible
-            # to create or change the CWD, the original CWD will be used.
-            # The original CWD is available from support.SAVEDCWD.
-            with support.temp_cwd(test_cwd, quiet=True):
-                # When using multiprocessing, worker processes will use
-                # test_cwd as their parent temporary directory. So when the
-                # main process exits, it also removes the subdirectories of
-                # the worker processes.
-                self.ns.tempdir = test_cwd
-
-                self._main(tests, kwargs)
-        except SystemExit as exc:
-            # bpo-38203: Python can hang at exit in Py_Finalize(), especially
-            # on the threading._shutdown() call: set a timeout.
-            faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
-
-            sys.exit(exc.code)
-
-    def getloadavg(self):
-        if self.win_load_tracker is not None:
-            return self.win_load_tracker.getloadavg()
-
-        if hasattr(os, 'getloadavg'):
-            return os.getloadavg()[0]
-
-        return None
-
-    def _main(self, tests, kwargs):
-        if self.worker_test_name is not None:
-            from test.libregrtest.runtest_mp import run_tests_worker
-            run_tests_worker(self.ns, self.worker_test_name)
-
-        if self.ns.wait:
-            input("Press any key to continue...")
-
-        support.PGO = self.ns.pgo
-        support.PGO_EXTENDED = self.ns.pgo_extended
-
-        setup_tests(self.ns)
-
-        self.find_tests(tests)
-
-        if self.ns.list_tests:
-            self.list_tests()
-            sys.exit(0)
-
-        if self.ns.list_cases:
-            self.list_cases()
-            sys.exit(0)
-
-        # If we're on Windows and this is the parent runner (not a worker),
-        # track the load average.
-        if sys.platform == 'win32' and self.worker_test_name is None:
-            from test.libregrtest.win_utils import WindowsLoadTracker
-
-            try:
-                self.win_load_tracker = WindowsLoadTracker()
-            except FileNotFoundError as error:
-                # Windows IoT Core and Windows Nano Server do not provide
-                # typeperf.exe for x64, x86 or ARM
-                print(f'Failed to create WindowsLoadTracker: {error}')
-
-        try:
-            self.run_tests()
-            self.display_result()
-
-            if self.ns.verbose2 and self.bad:
-                self.rerun_failed_tests()
-        finally:
-            if self.win_load_tracker is not None:
-                self.win_load_tracker.close()
-                self.win_load_tracker = None
-
-        self.finalize()
-
-        self.save_xml_result()
-
-        if self.bad:
-            sys.exit(2)
-        if self.interrupted:
-            sys.exit(130)
-        if self.ns.fail_env_changed and self.environment_changed:
-            sys.exit(3)
-        sys.exit(0)
-
-
-def main(tests=None, **kwargs):
- """Run the Python suite."""
- Regrtest().main(tests=tests, **kwargs)
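-
-
-# A minimal sketch of the equivalent command-line entry point (assumed
-# dispatch: "python -m test" reaches this main() via Lib/test/__main__.py):
-#
-#   ./python -m test -v test_os test_sys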