diff options
Diffstat (limited to 'Lib/test/libregrtest')
-rw-r--r-- | Lib/test/libregrtest/__init__.py | 2 | ||||
-rw-r--r-- | Lib/test/libregrtest/cmdline.py | 416 | ||||
-rw-r--r-- | Lib/test/libregrtest/main.py | 712 | ||||
-rw-r--r-- | Lib/test/libregrtest/pgo.py | 55 | ||||
-rw-r--r-- | Lib/test/libregrtest/refleak.py | 286 | ||||
-rw-r--r-- | Lib/test/libregrtest/runtest.py | 340 | ||||
-rw-r--r-- | Lib/test/libregrtest/runtest_mp.py | 464 | ||||
-rw-r--r-- | Lib/test/libregrtest/save_env.py | 303 | ||||
-rw-r--r-- | Lib/test/libregrtest/setup.py | 144 | ||||
-rw-r--r-- | Lib/test/libregrtest/utils.py | 81 | ||||
-rw-r--r-- | Lib/test/libregrtest/win_utils.py | 190 |
11 files changed, 0 insertions, 2993 deletions
diff --git a/Lib/test/libregrtest/__init__.py b/Lib/test/libregrtest/__init__.py deleted file mode 100644 index 5e8dba5..0000000 --- a/Lib/test/libregrtest/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from test.libregrtest.cmdline import _parse_args, RESOURCE_NAMES, ALL_RESOURCES -from test.libregrtest.main import main diff --git a/Lib/test/libregrtest/cmdline.py b/Lib/test/libregrtest/cmdline.py deleted file mode 100644 index c0bb051..0000000 --- a/Lib/test/libregrtest/cmdline.py +++ /dev/null @@ -1,416 +0,0 @@ -import argparse -import os -import sys -from test import support - - -USAGE = """\ -python -m test [options] [test_name1 [test_name2 ...]] -python path/to/Lib/test/regrtest.py [options] [test_name1 [test_name2 ...]] -""" - -DESCRIPTION = """\ -Run Python regression tests. - -If no arguments or options are provided, finds all files matching -the pattern "test_*" in the Lib/test subdirectory and runs -them in alphabetical order (but see -M and -u, below, for exceptions). - -For more rigorous testing, it is useful to use the following -command line: - -python -E -Wd -m test [options] [test_name1 ...] -""" - -EPILOG = """\ -Additional option details: - --r randomizes test execution order. You can use --randseed=int to provide an -int seed value for the randomizer; this is useful for reproducing troublesome -test orders. - --s On the first invocation of regrtest using -s, the first test file found -or the first test file given on the command line is run, and the name of -the next test is recorded in a file named pynexttest. If run from the -Python build directory, pynexttest is located in the 'build' subdirectory, -otherwise it is located in tempfile.gettempdir(). On subsequent runs, -the test in pynexttest is run, and the next test is written to pynexttest. -When the last test has been run, pynexttest is deleted. In this way it -is possible to single step through the test files. This is useful when -doing memory analysis on the Python interpreter, which process tends to -consume too many resources to run the full regression test non-stop. - --S is used to continue running tests after an aborted run. It will -maintain the order a standard run (ie, this assumes -r is not used). -This is useful after the tests have prematurely stopped for some external -reason and you want to start running from where you left off rather -than starting from the beginning. - --f reads the names of tests from the file given as f's argument, one -or more test names per line. Whitespace is ignored. Blank lines and -lines beginning with '#' are ignored. This is especially useful for -whittling down failures involving interactions among tests. - --L causes the leaks(1) command to be run just before exit if it exists. -leaks(1) is available on Mac OS X and presumably on some other -FreeBSD-derived systems. - --R runs each test several times and examines sys.gettotalrefcount() to -see if the test appears to be leaking references. The argument should -be of the form stab:run:fname where 'stab' is the number of times the -test is run to let gettotalrefcount settle down, 'run' is the number -of times further it is run and 'fname' is the name of the file the -reports are written to. These parameters all have defaults (5, 4 and -"reflog.txt" respectively), and the minimal invocation is '-R :'. - --M runs tests that require an exorbitant amount of memory. These tests -typically try to ascertain containers keep working when containing more than -2 billion objects, which only works on 64-bit systems. There are also some -tests that try to exhaust the address space of the process, which only makes -sense on 32-bit systems with at least 2Gb of memory. The passed-in memlimit, -which is a string in the form of '2.5Gb', determines how much memory the -tests will limit themselves to (but they may go slightly over.) The number -shouldn't be more memory than the machine has (including swap memory). You -should also keep in mind that swap memory is generally much, much slower -than RAM, and setting memlimit to all available RAM or higher will heavily -tax the machine. On the other hand, it is no use running these tests with a -limit of less than 2.5Gb, and many require more than 20Gb. Tests that expect -to use more than memlimit memory will be skipped. The big-memory tests -generally run very, very long. - --u is used to specify which special resource intensive tests to run, -such as those requiring large file support or network connectivity. -The argument is a comma-separated list of words indicating the -resources to test. Currently only the following are defined: - - all - Enable all special resources. - - none - Disable all special resources (this is the default). - - audio - Tests that use the audio device. (There are known - cases of broken audio drivers that can crash Python or - even the Linux kernel.) - - curses - Tests that use curses and will modify the terminal's - state and output modes. - - largefile - It is okay to run some test that may create huge - files. These tests can take a long time and may - consume >2 GiB of disk space temporarily. - - network - It is okay to run tests that use external network - resource, e.g. testing SSL support for sockets. - - decimal - Test the decimal module against a large suite that - verifies compliance with standards. - - cpu - Used for certain CPU-heavy tests. - - subprocess Run all tests for the subprocess module. - - urlfetch - It is okay to download files required on testing. - - gui - Run tests that require a running GUI. - - tzdata - Run tests that require timezone data. - -To enable all resources except one, use '-uall,-<resource>'. For -example, to run all the tests except for the gui tests, give the -option '-uall,-gui'. - ---matchfile filters tests using a text file, one pattern per line. -Pattern examples: - -- test method: test_stat_attributes -- test class: FileTests -- test identifier: test_os.FileTests.test_stat_attributes -""" - - -ALL_RESOURCES = ('audio', 'curses', 'largefile', 'network', - 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui') - -# Other resources excluded from --use=all: -# -# - extralagefile (ex: test_zipfile64): really too slow to be enabled -# "by default" -# - tzdata: while needed to validate fully test_datetime, it makes -# test_datetime too slow (15-20 min on some buildbots) and so is disabled by -# default (see bpo-30822). -RESOURCE_NAMES = ALL_RESOURCES + ('extralargefile', 'tzdata') - -class _ArgParser(argparse.ArgumentParser): - - def error(self, message): - super().error(message + "\nPass -h or --help for complete help.") - - -def _create_parser(): - # Set prog to prevent the uninformative "__main__.py" from displaying in - # error messages when using "python -m test ...". - parser = _ArgParser(prog='regrtest.py', - usage=USAGE, - description=DESCRIPTION, - epilog=EPILOG, - add_help=False, - formatter_class=argparse.RawDescriptionHelpFormatter) - - # Arguments with this clause added to its help are described further in - # the epilog's "Additional option details" section. - more_details = ' See the section at bottom for more details.' - - group = parser.add_argument_group('General options') - # We add help explicitly to control what argument group it renders under. - group.add_argument('-h', '--help', action='help', - help='show this help message and exit') - group.add_argument('--timeout', metavar='TIMEOUT', type=float, - help='dump the traceback and exit if a test takes ' - 'more than TIMEOUT seconds; disabled if TIMEOUT ' - 'is negative or equals to zero') - group.add_argument('--wait', action='store_true', - help='wait for user input, e.g., allow a debugger ' - 'to be attached') - group.add_argument('--worker-args', metavar='ARGS') - group.add_argument('-S', '--start', metavar='START', - help='the name of the test at which to start.' + - more_details) - - group = parser.add_argument_group('Verbosity') - group.add_argument('-v', '--verbose', action='count', - help='run tests in verbose mode with output to stdout') - group.add_argument('-w', '--verbose2', action='store_true', - help='re-run failed tests in verbose mode') - group.add_argument('-W', '--verbose3', action='store_true', - help='display test output on failure') - group.add_argument('-q', '--quiet', action='store_true', - help='no output unless one or more tests fail') - group.add_argument('-o', '--slowest', action='store_true', dest='print_slow', - help='print the slowest 10 tests') - group.add_argument('--header', action='store_true', - help='print header with interpreter info') - - group = parser.add_argument_group('Selecting tests') - group.add_argument('-r', '--randomize', action='store_true', - help='randomize test execution order.' + more_details) - group.add_argument('--randseed', metavar='SEED', - dest='random_seed', type=int, - help='pass a random seed to reproduce a previous ' - 'random run') - group.add_argument('-f', '--fromfile', metavar='FILE', - help='read names of tests to run from a file.' + - more_details) - group.add_argument('-x', '--exclude', action='store_true', - help='arguments are tests to *exclude*') - group.add_argument('-s', '--single', action='store_true', - help='single step through a set of tests.' + - more_details) - group.add_argument('-m', '--match', metavar='PAT', - dest='match_tests', action='append', - help='match test cases and methods with glob pattern PAT') - group.add_argument('-i', '--ignore', metavar='PAT', - dest='ignore_tests', action='append', - help='ignore test cases and methods with glob pattern PAT') - group.add_argument('--matchfile', metavar='FILENAME', - dest='match_filename', - help='similar to --match but get patterns from a ' - 'text file, one pattern per line') - group.add_argument('--ignorefile', metavar='FILENAME', - dest='ignore_filename', - help='similar to --matchfile but it receives patterns ' - 'from text file to ignore') - group.add_argument('-G', '--failfast', action='store_true', - help='fail as soon as a test fails (only with -v or -W)') - group.add_argument('-u', '--use', metavar='RES1,RES2,...', - action='append', type=resources_list, - help='specify which special resource intensive tests ' - 'to run.' + more_details) - group.add_argument('-M', '--memlimit', metavar='LIMIT', - help='run very large memory-consuming tests.' + - more_details) - group.add_argument('--testdir', metavar='DIR', - type=relative_filename, - help='execute test files in the specified directory ' - '(instead of the Python stdlib test suite)') - - group = parser.add_argument_group('Special runs') - group.add_argument('-l', '--findleaks', action='store_const', const=2, - default=1, - help='deprecated alias to --fail-env-changed') - group.add_argument('-L', '--runleaks', action='store_true', - help='run the leaks(1) command just before exit.' + - more_details) - group.add_argument('-R', '--huntrleaks', metavar='RUNCOUNTS', - type=huntrleaks, - help='search for reference leaks (needs debug build, ' - 'very slow).' + more_details) - group.add_argument('-j', '--multiprocess', metavar='PROCESSES', - dest='use_mp', type=int, - help='run PROCESSES processes at once') - group.add_argument('-T', '--coverage', action='store_true', - dest='trace', - help='turn on code coverage tracing using the trace ' - 'module') - group.add_argument('-D', '--coverdir', metavar='DIR', - type=relative_filename, - help='directory where coverage files are put') - group.add_argument('-N', '--nocoverdir', - action='store_const', const=None, dest='coverdir', - help='put coverage files alongside modules') - group.add_argument('-t', '--threshold', metavar='THRESHOLD', - type=int, - help='call gc.set_threshold(THRESHOLD)') - group.add_argument('-n', '--nowindows', action='store_true', - help='suppress error message boxes on Windows') - group.add_argument('-F', '--forever', action='store_true', - help='run the specified tests in a loop, until an ' - 'error happens; imply --failfast') - group.add_argument('--list-tests', action='store_true', - help="only write the name of tests that will be run, " - "don't execute them") - group.add_argument('--list-cases', action='store_true', - help='only write the name of test cases that will be run' - ' , don\'t execute them') - group.add_argument('-P', '--pgo', dest='pgo', action='store_true', - help='enable Profile Guided Optimization (PGO) training') - group.add_argument('--pgo-extended', action='store_true', - help='enable extended PGO training (slower training)') - group.add_argument('--fail-env-changed', action='store_true', - help='if a test file alters the environment, mark ' - 'the test as failed') - - group.add_argument('--junit-xml', dest='xmlpath', metavar='FILENAME', - help='writes JUnit-style XML results to the specified ' - 'file') - group.add_argument('--tempdir', metavar='PATH', - help='override the working directory for the test run') - group.add_argument('--cleanup', action='store_true', - help='remove old test_python_* directories') - return parser - - -def relative_filename(string): - # CWD is replaced with a temporary dir before calling main(), so we - # join it with the saved CWD so it ends up where the user expects. - return os.path.join(support.SAVEDCWD, string) - - -def huntrleaks(string): - args = string.split(':') - if len(args) not in (2, 3): - raise argparse.ArgumentTypeError( - 'needs 2 or 3 colon-separated arguments') - nwarmup = int(args[0]) if args[0] else 5 - ntracked = int(args[1]) if args[1] else 4 - fname = args[2] if len(args) > 2 and args[2] else 'reflog.txt' - return nwarmup, ntracked, fname - - -def resources_list(string): - u = [x.lower() for x in string.split(',')] - for r in u: - if r == 'all' or r == 'none': - continue - if r[0] == '-': - r = r[1:] - if r not in RESOURCE_NAMES: - raise argparse.ArgumentTypeError('invalid resource: ' + r) - return u - - -def _parse_args(args, **kwargs): - # Defaults - ns = argparse.Namespace(testdir=None, verbose=0, quiet=False, - exclude=False, single=False, randomize=False, fromfile=None, - findleaks=1, use_resources=None, trace=False, coverdir='coverage', - runleaks=False, huntrleaks=False, verbose2=False, print_slow=False, - random_seed=None, use_mp=None, verbose3=False, forever=False, - header=False, failfast=False, match_tests=None, ignore_tests=None, - pgo=False) - for k, v in kwargs.items(): - if not hasattr(ns, k): - raise TypeError('%r is an invalid keyword argument ' - 'for this function' % k) - setattr(ns, k, v) - if ns.use_resources is None: - ns.use_resources = [] - - parser = _create_parser() - # Issue #14191: argparse doesn't support "intermixed" positional and - # optional arguments. Use parse_known_args() as workaround. - ns.args = parser.parse_known_args(args=args, namespace=ns)[1] - for arg in ns.args: - if arg.startswith('-'): - parser.error("unrecognized arguments: %s" % arg) - sys.exit(1) - - if ns.findleaks > 1: - # --findleaks implies --fail-env-changed - ns.fail_env_changed = True - if ns.single and ns.fromfile: - parser.error("-s and -f don't go together!") - if ns.use_mp is not None and ns.trace: - parser.error("-T and -j don't go together!") - if ns.failfast and not (ns.verbose or ns.verbose3): - parser.error("-G/--failfast needs either -v or -W") - if ns.pgo and (ns.verbose or ns.verbose2 or ns.verbose3): - parser.error("--pgo/-v don't go together!") - if ns.pgo_extended: - ns.pgo = True # pgo_extended implies pgo - - if ns.nowindows: - print("Warning: the --nowindows (-n) option is deprecated. " - "Use -vv to display assertions in stderr.", file=sys.stderr) - - if ns.quiet: - ns.verbose = 0 - if ns.timeout is not None: - if ns.timeout <= 0: - ns.timeout = None - if ns.use_mp is not None: - if ns.use_mp <= 0: - # Use all cores + extras for tests that like to sleep - ns.use_mp = 2 + (os.cpu_count() or 1) - if ns.use: - for a in ns.use: - for r in a: - if r == 'all': - ns.use_resources[:] = ALL_RESOURCES - continue - if r == 'none': - del ns.use_resources[:] - continue - remove = False - if r[0] == '-': - remove = True - r = r[1:] - if remove: - if r in ns.use_resources: - ns.use_resources.remove(r) - elif r not in ns.use_resources: - ns.use_resources.append(r) - if ns.random_seed is not None: - ns.randomize = True - if ns.verbose: - ns.header = True - if ns.huntrleaks and ns.verbose3: - ns.verbose3 = False - print("WARNING: Disable --verbose3 because it's incompatible with " - "--huntrleaks: see http://bugs.python.org/issue27103", - file=sys.stderr) - if ns.match_filename: - if ns.match_tests is None: - ns.match_tests = [] - with open(ns.match_filename) as fp: - for line in fp: - ns.match_tests.append(line.strip()) - if ns.ignore_filename: - if ns.ignore_tests is None: - ns.ignore_tests = [] - with open(ns.ignore_filename) as fp: - for line in fp: - ns.ignore_tests.append(line.strip()) - if ns.forever: - # --forever implies --failfast - ns.failfast = True - - return ns diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py deleted file mode 100644 index 1de51b7..0000000 --- a/Lib/test/libregrtest/main.py +++ /dev/null @@ -1,712 +0,0 @@ -import datetime -import faulthandler -import locale -import os -import platform -import random -import re -import sys -import sysconfig -import tempfile -import time -import unittest -from test.libregrtest.cmdline import _parse_args -from test.libregrtest.runtest import ( - findtests, runtest, get_abs_module, - STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED, - INTERRUPTED, CHILD_ERROR, TEST_DID_NOT_RUN, TIMEOUT, - PROGRESS_MIN_TIME, format_test_result, is_failed) -from test.libregrtest.setup import setup_tests -from test.libregrtest.pgo import setup_pgo_tests -from test.libregrtest.utils import removepy, count, format_duration, printlist -from test import support - - -# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()). -# Used to protect against threading._shutdown() hang. -# Must be smaller than buildbot "1200 seconds without output" limit. -EXIT_TIMEOUT = 120.0 - - -class Regrtest: - """Execute a test suite. - - This also parses command-line options and modifies its behavior - accordingly. - - tests -- a list of strings containing test names (optional) - testdir -- the directory in which to look for tests (optional) - - Users other than the Python test suite will certainly want to - specify testdir; if it's omitted, the directory containing the - Python test suite is searched for. - - If the tests argument is omitted, the tests listed on the - command-line will be used. If that's empty, too, then all *.py - files beginning with test_ will be used. - - The other default arguments (verbose, quiet, exclude, - single, randomize, findleaks, use_resources, trace, coverdir, - print_slow, and random_seed) allow programmers calling main() - directly to set the values that would normally be set by flags - on the command line. - """ - def __init__(self): - # Namespace of command line options - self.ns = None - - # tests - self.tests = [] - self.selected = [] - - # test results - self.good = [] - self.bad = [] - self.skipped = [] - self.resource_denieds = [] - self.environment_changed = [] - self.run_no_tests = [] - self.rerun = [] - self.first_result = None - self.interrupted = False - - # used by --slow - self.test_times = [] - - # used by --coverage, trace.Trace instance - self.tracer = None - - # used to display the progress bar "[ 3/100]" - self.start_time = time.monotonic() - self.test_count = '' - self.test_count_width = 1 - - # used by --single - self.next_single_test = None - self.next_single_filename = None - - # used by --junit-xml - self.testsuite_xml = None - - # misc - self.win_load_tracker = None - self.tmp_dir = None - self.worker_test_name = None - - def get_executed(self): - return (set(self.good) | set(self.bad) | set(self.skipped) - | set(self.resource_denieds) | set(self.environment_changed) - | set(self.run_no_tests)) - - def accumulate_result(self, result, rerun=False): - test_name = result.test_name - ok = result.result - - if ok not in (CHILD_ERROR, INTERRUPTED) and not rerun: - self.test_times.append((result.test_time, test_name)) - - if ok == PASSED: - self.good.append(test_name) - elif ok in (FAILED, CHILD_ERROR): - if not rerun: - self.bad.append(test_name) - elif ok == ENV_CHANGED: - self.environment_changed.append(test_name) - elif ok == SKIPPED: - self.skipped.append(test_name) - elif ok == RESOURCE_DENIED: - self.skipped.append(test_name) - self.resource_denieds.append(test_name) - elif ok == TEST_DID_NOT_RUN: - self.run_no_tests.append(test_name) - elif ok == INTERRUPTED: - self.interrupted = True - elif ok == TIMEOUT: - self.bad.append(test_name) - else: - raise ValueError("invalid test result: %r" % ok) - - if rerun and ok not in {FAILED, CHILD_ERROR, INTERRUPTED}: - self.bad.remove(test_name) - - xml_data = result.xml_data - if xml_data: - import xml.etree.ElementTree as ET - for e in xml_data: - try: - self.testsuite_xml.append(ET.fromstring(e)) - except ET.ParseError: - print(xml_data, file=sys.__stderr__) - raise - - def log(self, line=''): - empty = not line - - # add the system load prefix: "load avg: 1.80 " - load_avg = self.getloadavg() - if load_avg is not None: - line = f"load avg: {load_avg:.2f} {line}" - - # add the timestamp prefix: "0:01:05 " - test_time = time.monotonic() - self.start_time - test_time = datetime.timedelta(seconds=int(test_time)) - line = f"{test_time} {line}" - - if empty: - line = line[:-1] - - print(line, flush=True) - - def display_progress(self, test_index, text): - if self.ns.quiet: - return - - # "[ 51/405/1] test_tcl passed" - line = f"{test_index:{self.test_count_width}}{self.test_count}" - fails = len(self.bad) + len(self.environment_changed) - if fails and not self.ns.pgo: - line = f"{line}/{fails}" - self.log(f"[{line}] {text}") - - def parse_args(self, kwargs): - ns = _parse_args(sys.argv[1:], **kwargs) - - if ns.xmlpath: - support.junit_xml_list = self.testsuite_xml = [] - - worker_args = ns.worker_args - if worker_args is not None: - from test.libregrtest.runtest_mp import parse_worker_args - ns, test_name = parse_worker_args(ns.worker_args) - ns.worker_args = worker_args - self.worker_test_name = test_name - - # Strip .py extensions. - removepy(ns.args) - - if ns.huntrleaks: - warmup, repetitions, _ = ns.huntrleaks - if warmup < 1 or repetitions < 1: - msg = ("Invalid values for the --huntrleaks/-R parameters. The " - "number of warmups and repetitions must be at least 1 " - "each (1:1).") - print(msg, file=sys.stderr, flush=True) - sys.exit(2) - - if ns.tempdir: - ns.tempdir = os.path.expanduser(ns.tempdir) - - self.ns = ns - - def find_tests(self, tests): - self.tests = tests - - if self.ns.single: - self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest') - try: - with open(self.next_single_filename, 'r') as fp: - next_test = fp.read().strip() - self.tests = [next_test] - except OSError: - pass - - if self.ns.fromfile: - self.tests = [] - # regex to match 'test_builtin' in line: - # '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec' - regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b') - with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp: - for line in fp: - line = line.split('#', 1)[0] - line = line.strip() - match = regex.search(line) - if match is not None: - self.tests.append(match.group()) - - removepy(self.tests) - - if self.ns.pgo: - # add default PGO tests if no tests are specified - setup_pgo_tests(self.ns) - - stdtests = STDTESTS[:] - nottests = NOTTESTS.copy() - if self.ns.exclude: - for arg in self.ns.args: - if arg in stdtests: - stdtests.remove(arg) - nottests.add(arg) - self.ns.args = [] - - # if testdir is set, then we are not running the python tests suite, so - # don't add default tests to be executed or skipped (pass empty values) - if self.ns.testdir: - alltests = findtests(self.ns.testdir, list(), set()) - else: - alltests = findtests(self.ns.testdir, stdtests, nottests) - - if not self.ns.fromfile: - self.selected = self.tests or self.ns.args or alltests - else: - self.selected = self.tests - if self.ns.single: - self.selected = self.selected[:1] - try: - pos = alltests.index(self.selected[0]) - self.next_single_test = alltests[pos + 1] - except IndexError: - pass - - # Remove all the selected tests that precede start if it's set. - if self.ns.start: - try: - del self.selected[:self.selected.index(self.ns.start)] - except ValueError: - print("Couldn't find starting test (%s), using all tests" - % self.ns.start, file=sys.stderr) - - if self.ns.randomize: - if self.ns.random_seed is None: - self.ns.random_seed = random.randrange(10000000) - random.seed(self.ns.random_seed) - random.shuffle(self.selected) - - def list_tests(self): - for name in self.selected: - print(name) - - def _list_cases(self, suite): - for test in suite: - if isinstance(test, unittest.loader._FailedTest): - continue - if isinstance(test, unittest.TestSuite): - self._list_cases(test) - elif isinstance(test, unittest.TestCase): - if support.match_test(test): - print(test.id()) - - def list_cases(self): - support.verbose = False - support.set_match_tests(self.ns.match_tests, self.ns.ignore_tests) - - for test_name in self.selected: - abstest = get_abs_module(self.ns, test_name) - try: - suite = unittest.defaultTestLoader.loadTestsFromName(abstest) - self._list_cases(suite) - except unittest.SkipTest: - self.skipped.append(test_name) - - if self.skipped: - print(file=sys.stderr) - print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr) - printlist(self.skipped, file=sys.stderr) - - def rerun_failed_tests(self): - self.ns.verbose = True - self.ns.failfast = False - self.ns.verbose3 = False - - self.first_result = self.get_tests_result() - - self.log() - self.log("Re-running failed tests in verbose mode") - self.rerun = self.bad[:] - for test_name in self.rerun: - self.log(f"Re-running {test_name} in verbose mode") - self.ns.verbose = True - result = runtest(self.ns, test_name) - - self.accumulate_result(result, rerun=True) - - if result.result == INTERRUPTED: - break - - if self.bad: - print(count(len(self.bad), 'test'), "failed again:") - printlist(self.bad) - - self.display_result() - - def display_result(self): - # If running the test suite for PGO then no one cares about results. - if self.ns.pgo: - return - - print() - print("== Tests result: %s ==" % self.get_tests_result()) - - if self.interrupted: - print("Test suite interrupted by signal SIGINT.") - - omitted = set(self.selected) - self.get_executed() - if omitted: - print() - print(count(len(omitted), "test"), "omitted:") - printlist(omitted) - - if self.good and not self.ns.quiet: - print() - if (not self.bad - and not self.skipped - and not self.interrupted - and len(self.good) > 1): - print("All", end=' ') - print(count(len(self.good), "test"), "OK.") - - if self.ns.print_slow: - self.test_times.sort(reverse=True) - print() - print("10 slowest tests:") - for test_time, test in self.test_times[:10]: - print("- %s: %s" % (test, format_duration(test_time))) - - if self.bad: - print() - print(count(len(self.bad), "test"), "failed:") - printlist(self.bad) - - if self.environment_changed: - print() - print("{} altered the execution environment:".format( - count(len(self.environment_changed), "test"))) - printlist(self.environment_changed) - - if self.skipped and not self.ns.quiet: - print() - print(count(len(self.skipped), "test"), "skipped:") - printlist(self.skipped) - - if self.rerun: - print() - print("%s:" % count(len(self.rerun), "re-run test")) - printlist(self.rerun) - - if self.run_no_tests: - print() - print(count(len(self.run_no_tests), "test"), "run no tests:") - printlist(self.run_no_tests) - - def run_tests_sequential(self): - if self.ns.trace: - import trace - self.tracer = trace.Trace(trace=False, count=True) - - save_modules = sys.modules.keys() - - self.log("Run tests sequentially") - - previous_test = None - for test_index, test_name in enumerate(self.tests, 1): - start_time = time.monotonic() - - text = test_name - if previous_test: - text = '%s -- %s' % (text, previous_test) - self.display_progress(test_index, text) - - if self.tracer: - # If we're tracing code coverage, then we don't exit with status - # if on a false return value from main. - cmd = ('result = runtest(self.ns, test_name); ' - 'self.accumulate_result(result)') - ns = dict(locals()) - self.tracer.runctx(cmd, globals=globals(), locals=ns) - result = ns['result'] - else: - result = runtest(self.ns, test_name) - self.accumulate_result(result) - - if result.result == INTERRUPTED: - break - - previous_test = format_test_result(result) - test_time = time.monotonic() - start_time - if test_time >= PROGRESS_MIN_TIME: - previous_test = "%s in %s" % (previous_test, format_duration(test_time)) - elif result.result == PASSED: - # be quiet: say nothing if the test passed shortly - previous_test = None - - # Unload the newly imported modules (best effort finalization) - for module in sys.modules.keys(): - if module not in save_modules and module.startswith("test."): - support.unload(module) - - if self.ns.failfast and is_failed(result, self.ns): - break - - if previous_test: - print(previous_test) - - def _test_forever(self, tests): - while True: - for test_name in tests: - yield test_name - if self.bad: - return - if self.ns.fail_env_changed and self.environment_changed: - return - - def display_header(self): - # Print basic platform information - print("==", platform.python_implementation(), *sys.version.split()) - print("==", platform.platform(aliased=True), - "%s-endian" % sys.byteorder) - print("== cwd:", os.getcwd()) - cpu_count = os.cpu_count() - if cpu_count: - print("== CPU count:", cpu_count) - print("== encodings: locale=%s, FS=%s" - % (locale.getpreferredencoding(False), - sys.getfilesystemencoding())) - - def get_tests_result(self): - result = [] - if self.bad: - result.append("FAILURE") - elif self.ns.fail_env_changed and self.environment_changed: - result.append("ENV CHANGED") - elif not any((self.good, self.bad, self.skipped, self.interrupted, - self.environment_changed)): - result.append("NO TEST RUN") - - if self.interrupted: - result.append("INTERRUPTED") - - if not result: - result.append("SUCCESS") - - result = ', '.join(result) - if self.first_result: - result = '%s then %s' % (self.first_result, result) - return result - - def run_tests(self): - # For a partial run, we do not need to clutter the output. - if (self.ns.header - or not(self.ns.pgo or self.ns.quiet or self.ns.single - or self.tests or self.ns.args)): - self.display_header() - - if self.ns.huntrleaks: - warmup, repetitions, _ = self.ns.huntrleaks - if warmup < 3: - msg = ("WARNING: Running tests with --huntrleaks/-R and less than " - "3 warmup repetitions can give false positives!") - print(msg, file=sys.stdout, flush=True) - - if self.ns.randomize: - print("Using random seed", self.ns.random_seed) - - if self.ns.forever: - self.tests = self._test_forever(list(self.selected)) - self.test_count = '' - self.test_count_width = 3 - else: - self.tests = iter(self.selected) - self.test_count = '/{}'.format(len(self.selected)) - self.test_count_width = len(self.test_count) - 1 - - if self.ns.use_mp: - from test.libregrtest.runtest_mp import run_tests_multiprocess - run_tests_multiprocess(self) - else: - self.run_tests_sequential() - - def finalize(self): - if self.next_single_filename: - if self.next_single_test: - with open(self.next_single_filename, 'w') as fp: - fp.write(self.next_single_test + '\n') - else: - os.unlink(self.next_single_filename) - - if self.tracer: - r = self.tracer.results() - r.write_results(show_missing=True, summary=True, - coverdir=self.ns.coverdir) - - print() - duration = time.monotonic() - self.start_time - print("Total duration: %s" % format_duration(duration)) - print("Tests result: %s" % self.get_tests_result()) - - if self.ns.runleaks: - os.system("leaks %d" % os.getpid()) - - def save_xml_result(self): - if not self.ns.xmlpath and not self.testsuite_xml: - return - - import xml.etree.ElementTree as ET - root = ET.Element("testsuites") - - # Manually count the totals for the overall summary - totals = {'tests': 0, 'errors': 0, 'failures': 0} - for suite in self.testsuite_xml: - root.append(suite) - for k in totals: - try: - totals[k] += int(suite.get(k, 0)) - except ValueError: - pass - - for k, v in totals.items(): - root.set(k, str(v)) - - xmlpath = os.path.join(support.SAVEDCWD, self.ns.xmlpath) - with open(xmlpath, 'wb') as f: - for s in ET.tostringlist(root): - f.write(s) - - def set_temp_dir(self): - if self.ns.tempdir: - self.tmp_dir = self.ns.tempdir - - if not self.tmp_dir: - # When tests are run from the Python build directory, it is best practice - # to keep the test files in a subfolder. This eases the cleanup of leftover - # files using the "make distclean" command. - if sysconfig.is_python_build(): - self.tmp_dir = sysconfig.get_config_var('abs_builddir') - if self.tmp_dir is None: - # bpo-30284: On Windows, only srcdir is available. Using - # abs_builddir mostly matters on UNIX when building Python - # out of the source tree, especially when the source tree - # is read only. - self.tmp_dir = sysconfig.get_config_var('srcdir') - self.tmp_dir = os.path.join(self.tmp_dir, 'build') - else: - self.tmp_dir = tempfile.gettempdir() - - self.tmp_dir = os.path.abspath(self.tmp_dir) - - def create_temp_dir(self): - os.makedirs(self.tmp_dir, exist_ok=True) - - # Define a writable temp dir that will be used as cwd while running - # the tests. The name of the dir includes the pid to allow parallel - # testing (see the -j option). - pid = os.getpid() - if self.worker_test_name is not None: - test_cwd = 'test_python_worker_{}'.format(pid) - else: - test_cwd = 'test_python_{}'.format(pid) - test_cwd = os.path.join(self.tmp_dir, test_cwd) - return test_cwd - - def cleanup(self): - import glob - - path = os.path.join(self.tmp_dir, 'test_python_*') - print("Cleanup %s directory" % self.tmp_dir) - for name in glob.glob(path): - if os.path.isdir(name): - print("Remove directory: %s" % name) - support.rmtree(name) - else: - print("Remove file: %s" % name) - support.unlink(name) - - def main(self, tests=None, **kwargs): - self.parse_args(kwargs) - - self.set_temp_dir() - - if self.ns.cleanup: - self.cleanup() - sys.exit(0) - - test_cwd = self.create_temp_dir() - - try: - # Run the tests in a context manager that temporarily changes the CWD - # to a temporary and writable directory. If it's not possible to - # create or change the CWD, the original CWD will be used. - # The original CWD is available from support.SAVEDCWD. - with support.temp_cwd(test_cwd, quiet=True): - # When using multiprocessing, worker processes will use test_cwd - # as their parent temporary directory. So when the main process - # exit, it removes also subdirectories of worker processes. - self.ns.tempdir = test_cwd - - self._main(tests, kwargs) - except SystemExit as exc: - # bpo-38203: Python can hang at exit in Py_Finalize(), especially - # on threading._shutdown() call: put a timeout - faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True) - - sys.exit(exc.code) - - def getloadavg(self): - if self.win_load_tracker is not None: - return self.win_load_tracker.getloadavg() - - if hasattr(os, 'getloadavg'): - return os.getloadavg()[0] - - return None - - def _main(self, tests, kwargs): - if self.worker_test_name is not None: - from test.libregrtest.runtest_mp import run_tests_worker - run_tests_worker(self.ns, self.worker_test_name) - - if self.ns.wait: - input("Press any key to continue...") - - support.PGO = self.ns.pgo - support.PGO_EXTENDED = self.ns.pgo_extended - - setup_tests(self.ns) - - self.find_tests(tests) - - if self.ns.list_tests: - self.list_tests() - sys.exit(0) - - if self.ns.list_cases: - self.list_cases() - sys.exit(0) - - # If we're on windows and this is the parent runner (not a worker), - # track the load average. - if sys.platform == 'win32' and self.worker_test_name is None: - from test.libregrtest.win_utils import WindowsLoadTracker - - try: - self.win_load_tracker = WindowsLoadTracker() - except FileNotFoundError as error: - # Windows IoT Core and Windows Nano Server do not provide - # typeperf.exe for x64, x86 or ARM - print(f'Failed to create WindowsLoadTracker: {error}') - - try: - self.run_tests() - self.display_result() - - if self.ns.verbose2 and self.bad: - self.rerun_failed_tests() - finally: - if self.win_load_tracker is not None: - self.win_load_tracker.close() - self.win_load_tracker = None - - self.finalize() - - self.save_xml_result() - - if self.bad: - sys.exit(2) - if self.interrupted: - sys.exit(130) - if self.ns.fail_env_changed and self.environment_changed: - sys.exit(3) - sys.exit(0) - - -def main(tests=None, **kwargs): - """Run the Python suite.""" - Regrtest().main(tests=tests, **kwargs) diff --git a/Lib/test/libregrtest/pgo.py b/Lib/test/libregrtest/pgo.py deleted file mode 100644 index 379ff05..0000000 --- a/Lib/test/libregrtest/pgo.py +++ /dev/null @@ -1,55 +0,0 @@ -# Set of tests run by default if --pgo is specified. The tests below were -# chosen based on the following criteria: either they exercise a commonly used -# C extension module or type, or they run some relatively typical Python code. -# Long running tests should be avoided because the PGO instrumented executable -# runs slowly. -PGO_TESTS = [ - 'test_array', - 'test_base64', - 'test_binascii', - 'test_binop', - 'test_bisect', - 'test_bytes', - 'test_bz2', - 'test_cmath', - 'test_codecs', - 'test_collections', - 'test_complex', - 'test_dataclasses', - 'test_datetime', - 'test_decimal', - 'test_difflib', - 'test_embed', - 'test_float', - 'test_fstring', - 'test_functools', - 'test_generators', - 'test_hashlib', - 'test_heapq', - 'test_int', - 'test_itertools', - 'test_json', - 'test_long', - 'test_lzma', - 'test_math', - 'test_memoryview', - 'test_operator', - 'test_ordered_dict', - 'test_pickle', - 'test_pprint', - 'test_re', - 'test_set', - 'test_sqlite', - 'test_statistics', - 'test_struct', - 'test_tabnanny', - 'test_time', - 'test_unicode', - 'test_xml_etree', - 'test_xml_etree_c', -] - -def setup_pgo_tests(ns): - if not ns.args and not ns.pgo_extended: - # run default set of tests for PGO training - ns.args = PGO_TESTS[:] diff --git a/Lib/test/libregrtest/refleak.py b/Lib/test/libregrtest/refleak.py deleted file mode 100644 index 8d22123..0000000 --- a/Lib/test/libregrtest/refleak.py +++ /dev/null @@ -1,286 +0,0 @@ -import os -import re -import sys -import warnings -from inspect import isabstract -from test import support -try: - from _abc import _get_dump -except ImportError: - import weakref - - def _get_dump(cls): - # Reimplement _get_dump() for pure-Python implementation of - # the abc module (Lib/_py_abc.py) - registry_weakrefs = set(weakref.ref(obj) for obj in cls._abc_registry) - return (registry_weakrefs, cls._abc_cache, - cls._abc_negative_cache, cls._abc_negative_cache_version) - - -def dash_R(ns, test_name, test_func): - """Run a test multiple times, looking for reference leaks. - - Returns: - False if the test didn't leak references; True if we detected refleaks. - """ - # This code is hackish and inelegant, but it seems to do the job. - import copyreg - import collections.abc - - if not hasattr(sys, 'gettotalrefcount'): - raise Exception("Tracking reference leaks requires a debug build " - "of Python") - - # Avoid false positives due to various caches - # filling slowly with random data: - warm_caches() - - # Save current values for dash_R_cleanup() to restore. - fs = warnings.filters[:] - ps = copyreg.dispatch_table.copy() - pic = sys.path_importer_cache.copy() - try: - import zipimport - except ImportError: - zdc = None # Run unmodified on platforms without zipimport support - else: - zdc = zipimport._zip_directory_cache.copy() - abcs = {} - for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]: - if not isabstract(abc): - continue - for obj in abc.__subclasses__() + [abc]: - abcs[obj] = _get_dump(obj)[0] - - # bpo-31217: Integer pool to get a single integer object for the same - # value. The pool is used to prevent false alarm when checking for memory - # block leaks. Fill the pool with values in -1000..1000 which are the most - # common (reference, memory block, file descriptor) differences. - int_pool = {value: value for value in range(-1000, 1000)} - def get_pooled_int(value): - return int_pool.setdefault(value, value) - - nwarmup, ntracked, fname = ns.huntrleaks - fname = os.path.join(support.SAVEDCWD, fname) - repcount = nwarmup + ntracked - - # Pre-allocate to ensure that the loop doesn't allocate anything new - rep_range = list(range(repcount)) - rc_deltas = [0] * repcount - alloc_deltas = [0] * repcount - fd_deltas = [0] * repcount - getallocatedblocks = sys.getallocatedblocks - gettotalrefcount = sys.gettotalrefcount - fd_count = support.fd_count - - # initialize variables to make pyflakes quiet - rc_before = alloc_before = fd_before = 0 - - if not ns.quiet: - print("beginning", repcount, "repetitions", file=sys.stderr) - print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr, - flush=True) - - dash_R_cleanup(fs, ps, pic, zdc, abcs) - - for i in rep_range: - test_func() - dash_R_cleanup(fs, ps, pic, zdc, abcs) - - # dash_R_cleanup() ends with collecting cyclic trash: - # read memory statistics immediately after. - alloc_after = getallocatedblocks() - rc_after = gettotalrefcount() - fd_after = fd_count() - - if not ns.quiet: - print('.', end='', file=sys.stderr, flush=True) - - rc_deltas[i] = get_pooled_int(rc_after - rc_before) - alloc_deltas[i] = get_pooled_int(alloc_after - alloc_before) - fd_deltas[i] = get_pooled_int(fd_after - fd_before) - - alloc_before = alloc_after - rc_before = rc_after - fd_before = fd_after - - if not ns.quiet: - print(file=sys.stderr) - - # These checkers return False on success, True on failure - def check_rc_deltas(deltas): - # Checker for reference counters and memomry blocks. - # - # bpo-30776: Try to ignore false positives: - # - # [3, 0, 0] - # [0, 1, 0] - # [8, -8, 1] - # - # Expected leaks: - # - # [5, 5, 6] - # [10, 1, 1] - return all(delta >= 1 for delta in deltas) - - def check_fd_deltas(deltas): - return any(deltas) - - failed = False - for deltas, item_name, checker in [ - (rc_deltas, 'references', check_rc_deltas), - (alloc_deltas, 'memory blocks', check_rc_deltas), - (fd_deltas, 'file descriptors', check_fd_deltas) - ]: - # ignore warmup runs - deltas = deltas[nwarmup:] - if checker(deltas): - msg = '%s leaked %s %s, sum=%s' % ( - test_name, deltas, item_name, sum(deltas)) - print(msg, file=sys.stderr, flush=True) - with open(fname, "a") as refrep: - print(msg, file=refrep) - refrep.flush() - failed = True - return failed - - -def dash_R_cleanup(fs, ps, pic, zdc, abcs): - import copyreg - import collections.abc - - # Restore some original values. - warnings.filters[:] = fs - copyreg.dispatch_table.clear() - copyreg.dispatch_table.update(ps) - sys.path_importer_cache.clear() - sys.path_importer_cache.update(pic) - try: - import zipimport - except ImportError: - pass # Run unmodified on platforms without zipimport support - else: - zipimport._zip_directory_cache.clear() - zipimport._zip_directory_cache.update(zdc) - - # clear type cache - sys._clear_type_cache() - - # Clear ABC registries, restoring previously saved ABC registries. - abs_classes = [getattr(collections.abc, a) for a in collections.abc.__all__] - abs_classes = filter(isabstract, abs_classes) - for abc in abs_classes: - for obj in abc.__subclasses__() + [abc]: - for ref in abcs.get(obj, set()): - if ref() is not None: - obj.register(ref()) - obj._abc_caches_clear() - - clear_caches() - - -def clear_caches(): - # Clear the warnings registry, so they can be displayed again - for mod in sys.modules.values(): - if hasattr(mod, '__warningregistry__'): - del mod.__warningregistry__ - - # Flush standard output, so that buffered data is sent to the OS and - # associated Python objects are reclaimed. - for stream in (sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__): - if stream is not None: - stream.flush() - - # Clear assorted module caches. - # Don't worry about resetting the cache if the module is not loaded - try: - distutils_dir_util = sys.modules['distutils.dir_util'] - except KeyError: - pass - else: - distutils_dir_util._path_created.clear() - re.purge() - - try: - _strptime = sys.modules['_strptime'] - except KeyError: - pass - else: - _strptime._regex_cache.clear() - - try: - urllib_parse = sys.modules['urllib.parse'] - except KeyError: - pass - else: - urllib_parse.clear_cache() - - try: - urllib_request = sys.modules['urllib.request'] - except KeyError: - pass - else: - urllib_request.urlcleanup() - - try: - linecache = sys.modules['linecache'] - except KeyError: - pass - else: - linecache.clearcache() - - try: - mimetypes = sys.modules['mimetypes'] - except KeyError: - pass - else: - mimetypes._default_mime_types() - - try: - filecmp = sys.modules['filecmp'] - except KeyError: - pass - else: - filecmp._cache.clear() - - try: - struct = sys.modules['struct'] - except KeyError: - pass - else: - struct._clearcache() - - try: - doctest = sys.modules['doctest'] - except KeyError: - pass - else: - doctest.master = None - - try: - ctypes = sys.modules['ctypes'] - except KeyError: - pass - else: - ctypes._reset_cache() - - try: - typing = sys.modules['typing'] - except KeyError: - pass - else: - for f in typing._cleanups: - f() - - support.gc_collect() - - -def warm_caches(): - # char cache - s = bytes(range(256)) - for i in range(256): - s[i:i+1] - # unicode cache - [chr(i) for i in range(256)] - # int cache - list(range(-5, 257)) diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py deleted file mode 100644 index 558f209..0000000 --- a/Lib/test/libregrtest/runtest.py +++ /dev/null @@ -1,340 +0,0 @@ -import collections -import faulthandler -import functools -import gc -import importlib -import io -import os -import sys -import time -import traceback -import unittest - -from test import support -from test.libregrtest.refleak import dash_R, clear_caches -from test.libregrtest.save_env import saved_test_environment -from test.libregrtest.utils import format_duration, print_warning - - -# Test result constants. -PASSED = 1 -FAILED = 0 -ENV_CHANGED = -1 -SKIPPED = -2 -RESOURCE_DENIED = -3 -INTERRUPTED = -4 -CHILD_ERROR = -5 # error in a child process -TEST_DID_NOT_RUN = -6 -TIMEOUT = -7 - -_FORMAT_TEST_RESULT = { - PASSED: '%s passed', - FAILED: '%s failed', - ENV_CHANGED: '%s failed (env changed)', - SKIPPED: '%s skipped', - RESOURCE_DENIED: '%s skipped (resource denied)', - INTERRUPTED: '%s interrupted', - CHILD_ERROR: '%s crashed', - TEST_DID_NOT_RUN: '%s run no tests', - TIMEOUT: '%s timed out', -} - -# Minimum duration of a test to display its duration or to mention that -# the test is running in background -PROGRESS_MIN_TIME = 30.0 # seconds - -# small set of tests to determine if we have a basically functioning interpreter -# (i.e. if any of these fail, then anything else is likely to follow) -STDTESTS = [ - 'test_grammar', - 'test_opcodes', - 'test_dict', - 'test_builtin', - 'test_exceptions', - 'test_types', - 'test_unittest', - 'test_doctest', - 'test_doctest2', - 'test_support' -] - -# set of tests that we don't want to be executed when using regrtest -NOTTESTS = set() - - -# used by --findleaks, store for gc.garbage -FOUND_GARBAGE = [] - - -def is_failed(result, ns): - ok = result.result - if ok in (PASSED, RESOURCE_DENIED, SKIPPED, TEST_DID_NOT_RUN): - return False - if ok == ENV_CHANGED: - return ns.fail_env_changed - return True - - -def format_test_result(result): - fmt = _FORMAT_TEST_RESULT.get(result.result, "%s") - text = fmt % result.test_name - if result.result == TIMEOUT: - text = '%s (%s)' % (text, format_duration(result.test_time)) - return text - - -def findtestdir(path=None): - return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir - - -def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS): - """Return a list of all applicable test modules.""" - testdir = findtestdir(testdir) - names = os.listdir(testdir) - tests = [] - others = set(stdtests) | nottests - for name in names: - mod, ext = os.path.splitext(name) - if mod[:5] == "test_" and ext in (".py", "") and mod not in others: - tests.append(mod) - return stdtests + sorted(tests) - - -def get_abs_module(ns, test_name): - if test_name.startswith('test.') or ns.testdir: - return test_name - else: - # Import it from the test package - return 'test.' + test_name - - -TestResult = collections.namedtuple('TestResult', - 'test_name result test_time xml_data') - -def _runtest(ns, test_name): - # Handle faulthandler timeout, capture stdout+stderr, XML serialization - # and measure time. - - output_on_failure = ns.verbose3 - - use_timeout = (ns.timeout is not None) - if use_timeout: - faulthandler.dump_traceback_later(ns.timeout, exit=True) - - start_time = time.perf_counter() - try: - support.set_match_tests(ns.match_tests, ns.ignore_tests) - support.junit_xml_list = xml_list = [] if ns.xmlpath else None - if ns.failfast: - support.failfast = True - - if output_on_failure: - support.verbose = True - - stream = io.StringIO() - orig_stdout = sys.stdout - orig_stderr = sys.stderr - try: - sys.stdout = stream - sys.stderr = stream - result = _runtest_inner(ns, test_name, - display_failure=False) - if result != PASSED: - output = stream.getvalue() - orig_stderr.write(output) - orig_stderr.flush() - finally: - sys.stdout = orig_stdout - sys.stderr = orig_stderr - else: - # Tell tests to be moderately quiet - support.verbose = ns.verbose - - result = _runtest_inner(ns, test_name, - display_failure=not ns.verbose) - - if xml_list: - import xml.etree.ElementTree as ET - xml_data = [ET.tostring(x).decode('us-ascii') for x in xml_list] - else: - xml_data = None - - test_time = time.perf_counter() - start_time - - return TestResult(test_name, result, test_time, xml_data) - finally: - if use_timeout: - faulthandler.cancel_dump_traceback_later() - support.junit_xml_list = None - - -def runtest(ns, test_name): - """Run a single test. - - ns -- regrtest namespace of options - test_name -- the name of the test - - Returns the tuple (result, test_time, xml_data), where result is one - of the constants: - - INTERRUPTED KeyboardInterrupt - RESOURCE_DENIED test skipped because resource denied - SKIPPED test skipped for some other reason - ENV_CHANGED test failed because it changed the execution environment - FAILED test failed - PASSED test passed - EMPTY_TEST_SUITE test ran no subtests. - TIMEOUT test timed out. - - If ns.xmlpath is not None, xml_data is a list containing each - generated testsuite element. - """ - try: - return _runtest(ns, test_name) - except: - if not ns.pgo: - msg = traceback.format_exc() - print(f"test {test_name} crashed -- {msg}", - file=sys.stderr, flush=True) - return TestResult(test_name, FAILED, 0.0, None) - - -def _test_module(the_module): - loader = unittest.TestLoader() - tests = loader.loadTestsFromModule(the_module) - for error in loader.errors: - print(error, file=sys.stderr) - if loader.errors: - raise Exception("errors while loading tests") - support.run_unittest(tests) - - -def _runtest_inner2(ns, test_name): - # Load the test function, run the test function, handle huntrleaks - # and findleaks to detect leaks - - abstest = get_abs_module(ns, test_name) - - # remove the module from sys.module to reload it if it was already imported - support.unload(abstest) - - the_module = importlib.import_module(abstest) - - # If the test has a test_main, that will run the appropriate - # tests. If not, use normal unittest test loading. - test_runner = getattr(the_module, "test_main", None) - if test_runner is None: - test_runner = functools.partial(_test_module, the_module) - - try: - if ns.huntrleaks: - # Return True if the test leaked references - refleak = dash_R(ns, test_name, test_runner) - else: - test_runner() - refleak = False - finally: - cleanup_test_droppings(test_name, ns.verbose) - - support.gc_collect() - - if gc.garbage: - support.environment_altered = True - print_warning(f"{test_name} created {len(gc.garbage)} " - f"uncollectable object(s).") - - # move the uncollectable objects somewhere, - # so we don't see them again - FOUND_GARBAGE.extend(gc.garbage) - gc.garbage.clear() - - support.reap_children() - - return refleak - - -def _runtest_inner(ns, test_name, display_failure=True): - # Detect environment changes, handle exceptions. - - # Reset the environment_altered flag to detect if a test altered - # the environment - support.environment_altered = False - - if ns.pgo: - display_failure = False - - try: - clear_caches() - - with saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo) as environment: - refleak = _runtest_inner2(ns, test_name) - except support.ResourceDenied as msg: - if not ns.quiet and not ns.pgo: - print(f"{test_name} skipped -- {msg}", flush=True) - return RESOURCE_DENIED - except unittest.SkipTest as msg: - if not ns.quiet and not ns.pgo: - print(f"{test_name} skipped -- {msg}", flush=True) - return SKIPPED - except support.TestFailed as exc: - msg = f"test {test_name} failed" - if display_failure: - msg = f"{msg} -- {exc}" - print(msg, file=sys.stderr, flush=True) - return FAILED - except support.TestDidNotRun: - return TEST_DID_NOT_RUN - except KeyboardInterrupt: - print() - return INTERRUPTED - except: - if not ns.pgo: - msg = traceback.format_exc() - print(f"test {test_name} crashed -- {msg}", - file=sys.stderr, flush=True) - return FAILED - - if refleak: - return FAILED - if environment.changed: - return ENV_CHANGED - return PASSED - - -def cleanup_test_droppings(test_name, verbose): - # First kill any dangling references to open files etc. - # This can also issue some ResourceWarnings which would otherwise get - # triggered during the following test run, and possibly produce failures. - support.gc_collect() - - # Try to clean up junk commonly left behind. While tests shouldn't leave - # any files or directories behind, when a test fails that can be tedious - # for it to arrange. The consequences can be especially nasty on Windows, - # since if a test leaves a file open, it cannot be deleted by name (while - # there's nothing we can do about that here either, we can display the - # name of the offending test, which is a real help). - for name in (support.TESTFN,): - if not os.path.exists(name): - continue - - if os.path.isdir(name): - import shutil - kind, nuker = "directory", shutil.rmtree - elif os.path.isfile(name): - kind, nuker = "file", os.unlink - else: - raise RuntimeError(f"os.path says {name!r} exists but is neither " - f"directory nor file") - - if verbose: - print_warning("%r left behind %s %r" % (test_name, kind, name)) - support.environment_altered = True - - try: - import stat - # fix possible permissions problems that might prevent cleanup - os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) - nuker(name) - except Exception as exc: - print_warning(f"{test_name} left behind {kind} {name!r} " - f"and it couldn't be removed: {exc}") diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py deleted file mode 100644 index fc12ea7..0000000 --- a/Lib/test/libregrtest/runtest_mp.py +++ /dev/null @@ -1,464 +0,0 @@ -import collections -import faulthandler -import json -import os -import queue -import signal -import subprocess -import sys -import threading -import time -import traceback -import types -from test import support - -from test.libregrtest.runtest import ( - runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME, - format_test_result, TestResult, is_failed, TIMEOUT) -from test.libregrtest.setup import setup_tests -from test.libregrtest.utils import format_duration, print_warning - - -# Display the running tests if nothing happened last N seconds -PROGRESS_UPDATE = 30.0 # seconds -assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME - -# Kill the main process after 5 minutes. It is supposed to write an update -# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest -# buildbot workers. -MAIN_PROCESS_TIMEOUT = 5 * 60.0 -assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE - -# Time to wait until a worker completes: should be immediate -JOIN_TIMEOUT = 30.0 # seconds - -USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg")) - - -def must_stop(result, ns): - if result.result == INTERRUPTED: - return True - if ns.failfast and is_failed(result, ns): - return True - return False - - -def parse_worker_args(worker_args): - ns_dict, test_name = json.loads(worker_args) - ns = types.SimpleNamespace(**ns_dict) - return (ns, test_name) - - -def run_test_in_subprocess(testname, ns): - ns_dict = vars(ns) - worker_args = (ns_dict, testname) - worker_args = json.dumps(worker_args) - - cmd = [sys.executable, *support.args_from_interpreter_flags(), - '-u', # Unbuffered stdout and stderr - '-m', 'test.regrtest', - '--worker-args', worker_args] - - # Running the child from the same working directory as regrtest's original - # invocation ensures that TEMPDIR for the child is the same when - # sysconfig.is_python_build() is true. See issue 15300. - kw = {} - if USE_PROCESS_GROUP: - kw['start_new_session'] = True - return subprocess.Popen(cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - close_fds=(os.name != 'nt'), - cwd=support.SAVEDCWD, - **kw) - - -def run_tests_worker(ns, test_name): - setup_tests(ns) - - result = runtest(ns, test_name) - - print() # Force a newline (just in case) - - # Serialize TestResult as list in JSON - print(json.dumps(list(result)), flush=True) - sys.exit(0) - - -# We do not use a generator so multiple threads can call next(). -class MultiprocessIterator: - - """A thread-safe iterator over tests for multiprocess mode.""" - - def __init__(self, tests_iter): - self.lock = threading.Lock() - self.tests_iter = tests_iter - - def __iter__(self): - return self - - def __next__(self): - with self.lock: - if self.tests_iter is None: - raise StopIteration - return next(self.tests_iter) - - def stop(self): - with self.lock: - self.tests_iter = None - - -MultiprocessResult = collections.namedtuple('MultiprocessResult', - 'result stdout stderr error_msg') - -class ExitThread(Exception): - pass - - -class TestWorkerProcess(threading.Thread): - def __init__(self, worker_id, runner): - super().__init__() - self.worker_id = worker_id - self.pending = runner.pending - self.output = runner.output - self.ns = runner.ns - self.timeout = runner.worker_timeout - self.regrtest = runner.regrtest - self.current_test_name = None - self.start_time = None - self._popen = None - self._killed = False - self._stopped = False - - def __repr__(self): - info = [f'TestWorkerProcess #{self.worker_id}'] - if self.is_alive(): - info.append("running") - else: - info.append('stopped') - test = self.current_test_name - if test: - info.append(f'test={test}') - popen = self._popen - if popen is not None: - dt = time.monotonic() - self.start_time - info.extend((f'pid={self._popen.pid}', - f'time={format_duration(dt)}')) - return '<%s>' % ' '.join(info) - - def _kill(self): - popen = self._popen - if popen is None: - return - - if self._killed: - return - self._killed = True - - if USE_PROCESS_GROUP: - what = f"{self} process group" - else: - what = f"{self}" - - print(f"Kill {what}", file=sys.stderr, flush=True) - try: - if USE_PROCESS_GROUP: - os.killpg(popen.pid, signal.SIGKILL) - else: - popen.kill() - except ProcessLookupError: - # popen.kill(): the process completed, the TestWorkerProcess thread - # read its exit status, but Popen.send_signal() read the returncode - # just before Popen.wait() set returncode. - pass - except OSError as exc: - print_warning(f"Failed to kill {what}: {exc!r}") - - def stop(self): - # Method called from a different thread to stop this thread - self._stopped = True - self._kill() - - def mp_result_error(self, test_name, error_type, stdout='', stderr='', - err_msg=None): - test_time = time.monotonic() - self.start_time - result = TestResult(test_name, error_type, test_time, None) - return MultiprocessResult(result, stdout, stderr, err_msg) - - def _run_process(self, test_name): - self.start_time = time.monotonic() - - self.current_test_name = test_name - try: - popen = run_test_in_subprocess(test_name, self.ns) - - self._killed = False - self._popen = popen - except: - self.current_test_name = None - raise - - try: - if self._stopped: - # If kill() has been called before self._popen is set, - # self._popen is still running. Call again kill() - # to ensure that the process is killed. - self._kill() - raise ExitThread - - try: - stdout, stderr = popen.communicate(timeout=self.timeout) - retcode = popen.returncode - assert retcode is not None - except subprocess.TimeoutExpired: - if self._stopped: - # kill() has been called: communicate() fails - # on reading closed stdout/stderr - raise ExitThread - - # On timeout, kill the process - self._kill() - - # None means TIMEOUT for the caller - retcode = None - # bpo-38207: Don't attempt to call communicate() again: on it - # can hang until all child processes using stdout and stderr - # pipes completes. - stdout = stderr = '' - except OSError: - if self._stopped: - # kill() has been called: communicate() fails - # on reading closed stdout/stderr - raise ExitThread - raise - else: - stdout = stdout.strip() - stderr = stderr.rstrip() - - return (retcode, stdout, stderr) - except: - self._kill() - raise - finally: - self._wait_completed() - self._popen = None - self.current_test_name = None - - def _runtest(self, test_name): - retcode, stdout, stderr = self._run_process(test_name) - - if retcode is None: - return self.mp_result_error(test_name, TIMEOUT, stdout, stderr) - - err_msg = None - if retcode != 0: - err_msg = "Exit code %s" % retcode - else: - stdout, _, result = stdout.rpartition("\n") - stdout = stdout.rstrip() - if not result: - err_msg = "Failed to parse worker stdout" - else: - try: - # deserialize run_tests_worker() output - result = json.loads(result) - result = TestResult(*result) - except Exception as exc: - err_msg = "Failed to parse worker JSON: %s" % exc - - if err_msg is not None: - return self.mp_result_error(test_name, CHILD_ERROR, - stdout, stderr, err_msg) - - return MultiprocessResult(result, stdout, stderr, err_msg) - - def run(self): - while not self._stopped: - try: - try: - test_name = next(self.pending) - except StopIteration: - break - - mp_result = self._runtest(test_name) - self.output.put((False, mp_result)) - - if must_stop(mp_result.result, self.ns): - break - except ExitThread: - break - except BaseException: - self.output.put((True, traceback.format_exc())) - break - - def _wait_completed(self): - popen = self._popen - - # stdout and stderr must be closed to ensure that communicate() - # does not hang - popen.stdout.close() - popen.stderr.close() - - try: - popen.wait(JOIN_TIMEOUT) - except (subprocess.TimeoutExpired, OSError) as exc: - print_warning(f"Failed to wait for {self} completion " - f"(timeout={format_duration(JOIN_TIMEOUT)}): " - f"{exc!r}") - - def wait_stopped(self, start_time): - # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop() - # which killed the process. Sometimes, killing the process from the - # main thread does not interrupt popen.communicate() in - # TestWorkerProcess thread. This loop with a timeout is a workaround - # for that. - # - # Moreover, if this method fails to join the thread, it is likely - # that Python will hang at exit while calling threading._shutdown() - # which tries again to join the blocked thread. Regrtest.main() - # uses EXIT_TIMEOUT to workaround this second bug. - while True: - # Write a message every second - self.join(1.0) - if not self.is_alive(): - break - dt = time.monotonic() - start_time - self.regrtest.log(f"Waiting for {self} thread " - f"for {format_duration(dt)}") - if dt > JOIN_TIMEOUT: - print_warning(f"Failed to join {self} in {format_duration(dt)}") - break - - -def get_running(workers): - running = [] - for worker in workers: - current_test_name = worker.current_test_name - if not current_test_name: - continue - dt = time.monotonic() - worker.start_time - if dt >= PROGRESS_MIN_TIME: - text = '%s (%s)' % (current_test_name, format_duration(dt)) - running.append(text) - return running - - -class MultiprocessTestRunner: - def __init__(self, regrtest): - self.regrtest = regrtest - self.log = self.regrtest.log - self.ns = regrtest.ns - self.output = queue.Queue() - self.pending = MultiprocessIterator(self.regrtest.tests) - if self.ns.timeout is not None: - self.worker_timeout = self.ns.timeout * 1.5 - else: - self.worker_timeout = None - self.workers = None - - def start_workers(self): - self.workers = [TestWorkerProcess(index, self) - for index in range(1, self.ns.use_mp + 1)] - self.log("Run tests in parallel using %s child processes" - % len(self.workers)) - for worker in self.workers: - worker.start() - - def stop_workers(self): - start_time = time.monotonic() - for worker in self.workers: - worker.stop() - for worker in self.workers: - worker.wait_stopped(start_time) - - def _get_result(self): - if not any(worker.is_alive() for worker in self.workers): - # all worker threads are done: consume pending results - try: - return self.output.get(timeout=0) - except queue.Empty: - return None - - use_faulthandler = (self.ns.timeout is not None) - timeout = PROGRESS_UPDATE - while True: - if use_faulthandler: - faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT, - exit=True) - - # wait for a thread - try: - return self.output.get(timeout=timeout) - except queue.Empty: - pass - - # display progress - running = get_running(self.workers) - if running and not self.ns.pgo: - self.log('running: %s' % ', '.join(running)) - - def display_result(self, mp_result): - result = mp_result.result - - text = format_test_result(result) - if mp_result.error_msg is not None: - # CHILD_ERROR - text += ' (%s)' % mp_result.error_msg - elif (result.test_time >= PROGRESS_MIN_TIME and not self.ns.pgo): - text += ' (%s)' % format_duration(result.test_time) - running = get_running(self.workers) - if running and not self.ns.pgo: - text += ' -- running: %s' % ', '.join(running) - self.regrtest.display_progress(self.test_index, text) - - def _process_result(self, item): - if item[0]: - # Thread got an exception - format_exc = item[1] - print_warning(f"regrtest worker thread failed: {format_exc}") - return True - - self.test_index += 1 - mp_result = item[1] - self.regrtest.accumulate_result(mp_result.result) - self.display_result(mp_result) - - if mp_result.stdout: - print(mp_result.stdout, flush=True) - if mp_result.stderr and not self.ns.pgo: - print(mp_result.stderr, file=sys.stderr, flush=True) - - if must_stop(mp_result.result, self.ns): - return True - - return False - - def run_tests(self): - self.start_workers() - - self.test_index = 0 - try: - while True: - item = self._get_result() - if item is None: - break - - stop = self._process_result(item) - if stop: - break - except KeyboardInterrupt: - print() - self.regrtest.interrupted = True - finally: - if self.ns.timeout is not None: - faulthandler.cancel_dump_traceback_later() - - # Always ensure that all worker processes are no longer - # worker when we exit this function - self.pending.stop() - self.stop_workers() - - -def run_tests_multiprocess(regrtest): - MultiprocessTestRunner(regrtest).run_tests() diff --git a/Lib/test/libregrtest/save_env.py b/Lib/test/libregrtest/save_env.py deleted file mode 100644 index e7c27a6..0000000 --- a/Lib/test/libregrtest/save_env.py +++ /dev/null @@ -1,303 +0,0 @@ -import asyncio -import builtins -import locale -import logging -import os -import shutil -import sys -import sysconfig -import threading -import urllib.request -import warnings -from test import support -from test.libregrtest.utils import print_warning -try: - import _multiprocessing, multiprocessing.process -except ImportError: - multiprocessing = None - - -# Unit tests are supposed to leave the execution environment unchanged -# once they complete. But sometimes tests have bugs, especially when -# tests fail, and the changes to environment go on to mess up other -# tests. This can cause issues with buildbot stability, since tests -# are run in random order and so problems may appear to come and go. -# There are a few things we can save and restore to mitigate this, and -# the following context manager handles this task. - -class saved_test_environment: - """Save bits of the test environment and restore them at block exit. - - with saved_test_environment(testname, verbose, quiet): - #stuff - - Unless quiet is True, a warning is printed to stderr if any of - the saved items was changed by the test. The attribute 'changed' - is initially False, but is set to True if a change is detected. - - If verbose is more than 1, the before and after state of changed - items is also printed. - """ - - changed = False - - def __init__(self, testname, verbose=0, quiet=False, *, pgo=False): - self.testname = testname - self.verbose = verbose - self.quiet = quiet - self.pgo = pgo - - # To add things to save and restore, add a name XXX to the resources list - # and add corresponding get_XXX/restore_XXX functions. get_XXX should - # return the value to be saved and compared against a second call to the - # get function when test execution completes. restore_XXX should accept - # the saved value and restore the resource using it. It will be called if - # and only if a change in the value is detected. - # - # Note: XXX will have any '.' replaced with '_' characters when determining - # the corresponding method names. - - resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr', - 'os.environ', 'sys.path', 'sys.path_hooks', '__import__', - 'warnings.filters', 'asyncore.socket_map', - 'logging._handlers', 'logging._handlerList', 'sys.gettrace', - 'sys.warnoptions', - # multiprocessing.process._cleanup() may release ref - # to a thread, so check processes first. - 'multiprocessing.process._dangling', 'threading._dangling', - 'sysconfig._CONFIG_VARS', 'sysconfig._INSTALL_SCHEMES', - 'files', 'locale', 'warnings.showwarning', - 'shutil_archive_formats', 'shutil_unpack_formats', - 'asyncio.events._event_loop_policy', - 'urllib.requests._url_tempfiles', 'urllib.requests._opener', - ) - - def get_urllib_requests__url_tempfiles(self): - return list(urllib.request._url_tempfiles) - def restore_urllib_requests__url_tempfiles(self, tempfiles): - for filename in tempfiles: - support.unlink(filename) - - def get_urllib_requests__opener(self): - return urllib.request._opener - def restore_urllib_requests__opener(self, opener): - urllib.request._opener = opener - - def get_asyncio_events__event_loop_policy(self): - return support.maybe_get_event_loop_policy() - def restore_asyncio_events__event_loop_policy(self, policy): - asyncio.set_event_loop_policy(policy) - - def get_sys_argv(self): - return id(sys.argv), sys.argv, sys.argv[:] - def restore_sys_argv(self, saved_argv): - sys.argv = saved_argv[1] - sys.argv[:] = saved_argv[2] - - def get_cwd(self): - return os.getcwd() - def restore_cwd(self, saved_cwd): - os.chdir(saved_cwd) - - def get_sys_stdout(self): - return sys.stdout - def restore_sys_stdout(self, saved_stdout): - sys.stdout = saved_stdout - - def get_sys_stderr(self): - return sys.stderr - def restore_sys_stderr(self, saved_stderr): - sys.stderr = saved_stderr - - def get_sys_stdin(self): - return sys.stdin - def restore_sys_stdin(self, saved_stdin): - sys.stdin = saved_stdin - - def get_os_environ(self): - return id(os.environ), os.environ, dict(os.environ) - def restore_os_environ(self, saved_environ): - os.environ = saved_environ[1] - os.environ.clear() - os.environ.update(saved_environ[2]) - - def get_sys_path(self): - return id(sys.path), sys.path, sys.path[:] - def restore_sys_path(self, saved_path): - sys.path = saved_path[1] - sys.path[:] = saved_path[2] - - def get_sys_path_hooks(self): - return id(sys.path_hooks), sys.path_hooks, sys.path_hooks[:] - def restore_sys_path_hooks(self, saved_hooks): - sys.path_hooks = saved_hooks[1] - sys.path_hooks[:] = saved_hooks[2] - - def get_sys_gettrace(self): - return sys.gettrace() - def restore_sys_gettrace(self, trace_fxn): - sys.settrace(trace_fxn) - - def get___import__(self): - return builtins.__import__ - def restore___import__(self, import_): - builtins.__import__ = import_ - - def get_warnings_filters(self): - return id(warnings.filters), warnings.filters, warnings.filters[:] - def restore_warnings_filters(self, saved_filters): - warnings.filters = saved_filters[1] - warnings.filters[:] = saved_filters[2] - - def get_asyncore_socket_map(self): - asyncore = sys.modules.get('asyncore') - # XXX Making a copy keeps objects alive until __exit__ gets called. - return asyncore and asyncore.socket_map.copy() or {} - def restore_asyncore_socket_map(self, saved_map): - asyncore = sys.modules.get('asyncore') - if asyncore is not None: - asyncore.close_all(ignore_all=True) - asyncore.socket_map.update(saved_map) - - def get_shutil_archive_formats(self): - # we could call get_archives_formats() but that only returns the - # registry keys; we want to check the values too (the functions that - # are registered) - return shutil._ARCHIVE_FORMATS, shutil._ARCHIVE_FORMATS.copy() - def restore_shutil_archive_formats(self, saved): - shutil._ARCHIVE_FORMATS = saved[0] - shutil._ARCHIVE_FORMATS.clear() - shutil._ARCHIVE_FORMATS.update(saved[1]) - - def get_shutil_unpack_formats(self): - return shutil._UNPACK_FORMATS, shutil._UNPACK_FORMATS.copy() - def restore_shutil_unpack_formats(self, saved): - shutil._UNPACK_FORMATS = saved[0] - shutil._UNPACK_FORMATS.clear() - shutil._UNPACK_FORMATS.update(saved[1]) - - def get_logging__handlers(self): - # _handlers is a WeakValueDictionary - return id(logging._handlers), logging._handlers, logging._handlers.copy() - def restore_logging__handlers(self, saved_handlers): - # Can't easily revert the logging state - pass - - def get_logging__handlerList(self): - # _handlerList is a list of weakrefs to handlers - return id(logging._handlerList), logging._handlerList, logging._handlerList[:] - def restore_logging__handlerList(self, saved_handlerList): - # Can't easily revert the logging state - pass - - def get_sys_warnoptions(self): - return id(sys.warnoptions), sys.warnoptions, sys.warnoptions[:] - def restore_sys_warnoptions(self, saved_options): - sys.warnoptions = saved_options[1] - sys.warnoptions[:] = saved_options[2] - - # Controlling dangling references to Thread objects can make it easier - # to track reference leaks. - def get_threading__dangling(self): - # This copies the weakrefs without making any strong reference - return threading._dangling.copy() - def restore_threading__dangling(self, saved): - threading._dangling.clear() - threading._dangling.update(saved) - - # Same for Process objects - def get_multiprocessing_process__dangling(self): - if not multiprocessing: - return None - # Unjoined process objects can survive after process exits - multiprocessing.process._cleanup() - # This copies the weakrefs without making any strong reference - return multiprocessing.process._dangling.copy() - def restore_multiprocessing_process__dangling(self, saved): - if not multiprocessing: - return - multiprocessing.process._dangling.clear() - multiprocessing.process._dangling.update(saved) - - def get_sysconfig__CONFIG_VARS(self): - # make sure the dict is initialized - sysconfig.get_config_var('prefix') - return (id(sysconfig._CONFIG_VARS), sysconfig._CONFIG_VARS, - dict(sysconfig._CONFIG_VARS)) - def restore_sysconfig__CONFIG_VARS(self, saved): - sysconfig._CONFIG_VARS = saved[1] - sysconfig._CONFIG_VARS.clear() - sysconfig._CONFIG_VARS.update(saved[2]) - - def get_sysconfig__INSTALL_SCHEMES(self): - return (id(sysconfig._INSTALL_SCHEMES), sysconfig._INSTALL_SCHEMES, - sysconfig._INSTALL_SCHEMES.copy()) - def restore_sysconfig__INSTALL_SCHEMES(self, saved): - sysconfig._INSTALL_SCHEMES = saved[1] - sysconfig._INSTALL_SCHEMES.clear() - sysconfig._INSTALL_SCHEMES.update(saved[2]) - - def get_files(self): - return sorted(fn + ('/' if os.path.isdir(fn) else '') - for fn in os.listdir()) - def restore_files(self, saved_value): - fn = support.TESTFN - if fn not in saved_value and (fn + '/') not in saved_value: - if os.path.isfile(fn): - support.unlink(fn) - elif os.path.isdir(fn): - support.rmtree(fn) - - _lc = [getattr(locale, lc) for lc in dir(locale) - if lc.startswith('LC_')] - def get_locale(self): - pairings = [] - for lc in self._lc: - try: - pairings.append((lc, locale.setlocale(lc, None))) - except (TypeError, ValueError): - continue - return pairings - def restore_locale(self, saved): - for lc, setting in saved: - locale.setlocale(lc, setting) - - def get_warnings_showwarning(self): - return warnings.showwarning - def restore_warnings_showwarning(self, fxn): - warnings.showwarning = fxn - - def resource_info(self): - for name in self.resources: - method_suffix = name.replace('.', '_') - get_name = 'get_' + method_suffix - restore_name = 'restore_' + method_suffix - yield name, getattr(self, get_name), getattr(self, restore_name) - - def __enter__(self): - self.saved_values = dict((name, get()) for name, get, restore - in self.resource_info()) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - saved_values = self.saved_values - del self.saved_values - - # Some resources use weak references - support.gc_collect() - - # Read support.environment_altered, set by support helper functions - self.changed |= support.environment_altered - - for name, get, restore in self.resource_info(): - current = get() - original = saved_values.pop(name) - # Check for changes to the resource's value - if current != original: - self.changed = True - restore(original) - if not self.quiet and not self.pgo: - print_warning(f"{name} was modified by {self.testname}") - print(f" Before: {original}\n After: {current} ", - file=sys.stderr, flush=True) - return False diff --git a/Lib/test/libregrtest/setup.py b/Lib/test/libregrtest/setup.py deleted file mode 100644 index ce81496..0000000 --- a/Lib/test/libregrtest/setup.py +++ /dev/null @@ -1,144 +0,0 @@ -import atexit -import faulthandler -import os -import signal -import sys -import unittest -from test import support -try: - import gc -except ImportError: - gc = None - -from test.libregrtest.utils import setup_unraisable_hook - - -def setup_tests(ns): - try: - stderr_fd = sys.__stderr__.fileno() - except (ValueError, AttributeError): - # Catch ValueError to catch io.UnsupportedOperation on TextIOBase - # and ValueError on a closed stream. - # - # Catch AttributeError for stderr being None. - stderr_fd = None - else: - # Display the Python traceback on fatal errors (e.g. segfault) - faulthandler.enable(all_threads=True, file=stderr_fd) - - # Display the Python traceback on SIGALRM or SIGUSR1 signal - signals = [] - if hasattr(signal, 'SIGALRM'): - signals.append(signal.SIGALRM) - if hasattr(signal, 'SIGUSR1'): - signals.append(signal.SIGUSR1) - for signum in signals: - faulthandler.register(signum, chain=True, file=stderr_fd) - - replace_stdout() - support.record_original_stdout(sys.stdout) - - if ns.testdir: - # Prepend test directory to sys.path, so runtest() will be able - # to locate tests - sys.path.insert(0, os.path.abspath(ns.testdir)) - - # Some times __path__ and __file__ are not absolute (e.g. while running from - # Lib/) and, if we change the CWD to run the tests in a temporary dir, some - # imports might fail. This affects only the modules imported before os.chdir(). - # These modules are searched first in sys.path[0] (so '' -- the CWD) and if - # they are found in the CWD their __file__ and __path__ will be relative (this - # happens before the chdir). All the modules imported after the chdir, are - # not found in the CWD, and since the other paths in sys.path[1:] are absolute - # (site.py absolutize them), the __file__ and __path__ will be absolute too. - # Therefore it is necessary to absolutize manually the __file__ and __path__ of - # the packages to prevent later imports to fail when the CWD is different. - for module in sys.modules.values(): - if hasattr(module, '__path__'): - for index, path in enumerate(module.__path__): - module.__path__[index] = os.path.abspath(path) - if getattr(module, '__file__', None): - module.__file__ = os.path.abspath(module.__file__) - - if ns.huntrleaks: - unittest.BaseTestSuite._cleanup = False - - if ns.memlimit is not None: - support.set_memlimit(ns.memlimit) - - if ns.threshold is not None: - gc.set_threshold(ns.threshold) - - suppress_msvcrt_asserts(ns.verbose and ns.verbose >= 2) - - support.use_resources = ns.use_resources - - if hasattr(sys, 'addaudithook'): - # Add an auditing hook for all tests to ensure PySys_Audit is tested - def _test_audit_hook(name, args): - pass - sys.addaudithook(_test_audit_hook) - - setup_unraisable_hook() - - if ns.timeout is not None: - # For a slow buildbot worker, increase SHORT_TIMEOUT and LONG_TIMEOUT - support.SHORT_TIMEOUT = max(support.SHORT_TIMEOUT, ns.timeout / 40) - support.LONG_TIMEOUT = max(support.LONG_TIMEOUT, ns.timeout / 4) - - # If --timeout is short: reduce timeouts - support.LOOPBACK_TIMEOUT = min(support.LOOPBACK_TIMEOUT, ns.timeout) - support.INTERNET_TIMEOUT = min(support.INTERNET_TIMEOUT, ns.timeout) - support.SHORT_TIMEOUT = min(support.SHORT_TIMEOUT, ns.timeout) - support.LONG_TIMEOUT = min(support.LONG_TIMEOUT, ns.timeout) - - -def suppress_msvcrt_asserts(verbose): - try: - import msvcrt - except ImportError: - return - - msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS| - msvcrt.SEM_NOALIGNMENTFAULTEXCEPT| - msvcrt.SEM_NOGPFAULTERRORBOX| - msvcrt.SEM_NOOPENFILEERRORBOX) - try: - msvcrt.CrtSetReportMode - except AttributeError: - # release build - return - - for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]: - if verbose: - msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE) - msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR) - else: - msvcrt.CrtSetReportMode(m, 0) - - - -def replace_stdout(): - """Set stdout encoder error handler to backslashreplace (as stderr error - handler) to avoid UnicodeEncodeError when printing a traceback""" - stdout = sys.stdout - try: - fd = stdout.fileno() - except ValueError: - # On IDLE, sys.stdout has no file descriptor and is not a TextIOWrapper - # object. Leaving sys.stdout unchanged. - # - # Catch ValueError to catch io.UnsupportedOperation on TextIOBase - # and ValueError on a closed stream. - return - - sys.stdout = open(fd, 'w', - encoding=stdout.encoding, - errors="backslashreplace", - closefd=False, - newline='\n') - - def restore_stdout(): - sys.stdout.close() - sys.stdout = stdout - atexit.register(restore_stdout) diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py deleted file mode 100644 index 40faed8..0000000 --- a/Lib/test/libregrtest/utils.py +++ /dev/null @@ -1,81 +0,0 @@ -import math -import os.path -import sys -import textwrap -from test import support - - -def format_duration(seconds): - ms = math.ceil(seconds * 1e3) - seconds, ms = divmod(ms, 1000) - minutes, seconds = divmod(seconds, 60) - hours, minutes = divmod(minutes, 60) - - parts = [] - if hours: - parts.append('%s hour' % hours) - if minutes: - parts.append('%s min' % minutes) - if seconds: - if parts: - # 2 min 1 sec - parts.append('%s sec' % seconds) - else: - # 1.0 sec - parts.append('%.1f sec' % (seconds + ms / 1000)) - if not parts: - return '%s ms' % ms - - parts = parts[:2] - return ' '.join(parts) - - -def removepy(names): - if not names: - return - for idx, name in enumerate(names): - basename, ext = os.path.splitext(name) - if ext == '.py': - names[idx] = basename - - -def count(n, word): - if n == 1: - return "%d %s" % (n, word) - else: - return "%d %ss" % (n, word) - - -def printlist(x, width=70, indent=4, file=None): - """Print the elements of iterable x to stdout. - - Optional arg width (default 70) is the maximum line length. - Optional arg indent (default 4) is the number of blanks with which to - begin each line. - """ - - blanks = ' ' * indent - # Print the sorted list: 'x' may be a '--random' list or a set() - print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width, - initial_indent=blanks, subsequent_indent=blanks), - file=file) - - -def print_warning(msg): - print(f"Warning -- {msg}", file=sys.stderr, flush=True) - - -orig_unraisablehook = None - - -def regrtest_unraisable_hook(unraisable): - global orig_unraisablehook - support.environment_altered = True - print_warning("Unraisable exception") - orig_unraisablehook(unraisable) - - -def setup_unraisable_hook(): - global orig_unraisablehook - orig_unraisablehook = sys.unraisablehook - sys.unraisablehook = regrtest_unraisable_hook diff --git a/Lib/test/libregrtest/win_utils.py b/Lib/test/libregrtest/win_utils.py deleted file mode 100644 index 028c011..0000000 --- a/Lib/test/libregrtest/win_utils.py +++ /dev/null @@ -1,190 +0,0 @@ -import _winapi -import math -import msvcrt -import os -import subprocess -import uuid -import winreg -from test import support -from test.libregrtest.utils import print_warning - - -# Max size of asynchronous reads -BUFSIZE = 8192 -# Seconds per measurement -SAMPLING_INTERVAL = 1 -# Exponential damping factor to compute exponentially weighted moving average -# on 1 minute (60 seconds) -LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60) -# Initialize the load using the arithmetic mean of the first NVALUE values -# of the Processor Queue Length -NVALUE = 5 -# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names -# of typeperf are registered -COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion" - r"\Perflib\CurrentLanguage") - - -class WindowsLoadTracker(): - """ - This class asynchronously interacts with the `typeperf` command to read - the system load on Windows. Multiprocessing and threads can't be used - here because they interfere with the test suite's cases for those - modules. - """ - - def __init__(self): - self._values = [] - self._load = None - self._buffer = '' - self._popen = None - self.start() - - def start(self): - # Create a named pipe which allows for asynchronous IO in Windows - pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4()) - - open_mode = _winapi.PIPE_ACCESS_INBOUND - open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE - open_mode |= _winapi.FILE_FLAG_OVERLAPPED - - # This is the read end of the pipe, where we will be grabbing output - self.pipe = _winapi.CreateNamedPipe( - pipe_name, open_mode, _winapi.PIPE_WAIT, - 1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL - ) - # The write end of the pipe which is passed to the created process - pipe_write_end = _winapi.CreateFile( - pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL, - _winapi.OPEN_EXISTING, 0, _winapi.NULL - ) - # Open up the handle as a python file object so we can pass it to - # subprocess - command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0) - - # Connect to the read end of the pipe in overlap/async mode - overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True) - overlap.GetOverlappedResult(True) - - # Spawn off the load monitor - counter_name = self._get_counter_name() - command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)] - self._popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD) - - # Close our copy of the write end of the pipe - os.close(command_stdout) - - def _get_counter_name(self): - # accessing the registry to get the counter localization name - with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey: - counters = winreg.QueryValueEx(perfkey, 'Counter')[0] - - # Convert [key1, value1, key2, value2, ...] list - # to {key1: value1, key2: value2, ...} dict - counters = iter(counters) - counters_dict = dict(zip(counters, counters)) - - # System counter has key '2' and Processor Queue Length has key '44' - system = counters_dict['2'] - process_queue_length = counters_dict['44'] - return f'"\\{system}\\{process_queue_length}"' - - def close(self, kill=True): - if self._popen is None: - return - - self._load = None - - if kill: - self._popen.kill() - self._popen.wait() - self._popen = None - - def __del__(self): - self.close() - - def _parse_line(self, line): - # typeperf outputs in a CSV format like this: - # "07/19/2018 01:32:26.605","3.000000" - # (date, process queue length) - tokens = line.split(',') - if len(tokens) != 2: - raise ValueError - - value = tokens[1] - if not value.startswith('"') or not value.endswith('"'): - raise ValueError - value = value[1:-1] - return float(value) - - def _read_lines(self): - overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True) - bytes_read, res = overlapped.GetOverlappedResult(False) - if res != 0: - return () - - output = overlapped.getbuffer() - output = output.decode('oem', 'replace') - output = self._buffer + output - lines = output.splitlines(True) - - # bpo-36670: typeperf only writes a newline *before* writing a value, - # not after. Sometimes, the written line in incomplete (ex: only - # timestamp, without the process queue length). Only pass the last line - # to the parser if it's a valid value, otherwise store it in - # self._buffer. - try: - self._parse_line(lines[-1]) - except ValueError: - self._buffer = lines.pop(-1) - else: - self._buffer = '' - - return lines - - def getloadavg(self): - if self._popen is None: - return None - - returncode = self._popen.poll() - if returncode is not None: - self.close(kill=False) - return None - - try: - lines = self._read_lines() - except BrokenPipeError: - self.close() - return None - - for line in lines: - line = line.rstrip() - - # Ignore the initial header: - # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length" - if 'PDH-CSV' in line: - continue - - # Ignore blank lines - if not line: - continue - - try: - processor_queue_length = self._parse_line(line) - except ValueError: - print_warning("Failed to parse typeperf output: %a" % line) - continue - - # We use an exponentially weighted moving average, imitating the - # load calculation on Unix systems. - # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation - # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average - if self._load is not None: - self._load = (self._load * LOAD_FACTOR_1 - + processor_queue_length * (1.0 - LOAD_FACTOR_1)) - elif len(self._values) < NVALUE: - self._values.append(processor_queue_length) - else: - self._load = sum(self._values) / len(self._values) - - return self._load |