summaryrefslogtreecommitdiffstats
path: root/Lib/test/libregrtest
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/libregrtest')
-rw-r--r--Lib/test/libregrtest/__init__.py2
-rw-r--r--Lib/test/libregrtest/cmdline.py416
-rw-r--r--Lib/test/libregrtest/main.py712
-rw-r--r--Lib/test/libregrtest/pgo.py55
-rw-r--r--Lib/test/libregrtest/refleak.py286
-rw-r--r--Lib/test/libregrtest/runtest.py340
-rw-r--r--Lib/test/libregrtest/runtest_mp.py464
-rw-r--r--Lib/test/libregrtest/save_env.py303
-rw-r--r--Lib/test/libregrtest/setup.py144
-rw-r--r--Lib/test/libregrtest/utils.py81
-rw-r--r--Lib/test/libregrtest/win_utils.py190
11 files changed, 0 insertions, 2993 deletions
diff --git a/Lib/test/libregrtest/__init__.py b/Lib/test/libregrtest/__init__.py
deleted file mode 100644
index 5e8dba5..0000000
--- a/Lib/test/libregrtest/__init__.py
+++ /dev/null
@@ -1,2 +0,0 @@
-from test.libregrtest.cmdline import _parse_args, RESOURCE_NAMES, ALL_RESOURCES
-from test.libregrtest.main import main
diff --git a/Lib/test/libregrtest/cmdline.py b/Lib/test/libregrtest/cmdline.py
deleted file mode 100644
index c0bb051..0000000
--- a/Lib/test/libregrtest/cmdline.py
+++ /dev/null
@@ -1,416 +0,0 @@
-import argparse
-import os
-import sys
-from test import support
-
-
-USAGE = """\
-python -m test [options] [test_name1 [test_name2 ...]]
-python path/to/Lib/test/regrtest.py [options] [test_name1 [test_name2 ...]]
-"""
-
-DESCRIPTION = """\
-Run Python regression tests.
-
-If no arguments or options are provided, finds all files matching
-the pattern "test_*" in the Lib/test subdirectory and runs
-them in alphabetical order (but see -M and -u, below, for exceptions).
-
-For more rigorous testing, it is useful to use the following
-command line:
-
-python -E -Wd -m test [options] [test_name1 ...]
-"""
-
-EPILOG = """\
-Additional option details:
-
--r randomizes test execution order. You can use --randseed=int to provide an
-int seed value for the randomizer; this is useful for reproducing troublesome
-test orders.
-
--s On the first invocation of regrtest using -s, the first test file found
-or the first test file given on the command line is run, and the name of
-the next test is recorded in a file named pynexttest. If run from the
-Python build directory, pynexttest is located in the 'build' subdirectory,
-otherwise it is located in tempfile.gettempdir(). On subsequent runs,
-the test in pynexttest is run, and the next test is written to pynexttest.
-When the last test has been run, pynexttest is deleted. In this way it
-is possible to single step through the test files. This is useful when
-doing memory analysis on the Python interpreter, which process tends to
-consume too many resources to run the full regression test non-stop.
-
--S is used to continue running tests after an aborted run. It will
-maintain the order a standard run (ie, this assumes -r is not used).
-This is useful after the tests have prematurely stopped for some external
-reason and you want to start running from where you left off rather
-than starting from the beginning.
-
--f reads the names of tests from the file given as f's argument, one
-or more test names per line. Whitespace is ignored. Blank lines and
-lines beginning with '#' are ignored. This is especially useful for
-whittling down failures involving interactions among tests.
-
--L causes the leaks(1) command to be run just before exit if it exists.
-leaks(1) is available on Mac OS X and presumably on some other
-FreeBSD-derived systems.
-
--R runs each test several times and examines sys.gettotalrefcount() to
-see if the test appears to be leaking references. The argument should
-be of the form stab:run:fname where 'stab' is the number of times the
-test is run to let gettotalrefcount settle down, 'run' is the number
-of times further it is run and 'fname' is the name of the file the
-reports are written to. These parameters all have defaults (5, 4 and
-"reflog.txt" respectively), and the minimal invocation is '-R :'.
-
--M runs tests that require an exorbitant amount of memory. These tests
-typically try to ascertain containers keep working when containing more than
-2 billion objects, which only works on 64-bit systems. There are also some
-tests that try to exhaust the address space of the process, which only makes
-sense on 32-bit systems with at least 2Gb of memory. The passed-in memlimit,
-which is a string in the form of '2.5Gb', determines how much memory the
-tests will limit themselves to (but they may go slightly over.) The number
-shouldn't be more memory than the machine has (including swap memory). You
-should also keep in mind that swap memory is generally much, much slower
-than RAM, and setting memlimit to all available RAM or higher will heavily
-tax the machine. On the other hand, it is no use running these tests with a
-limit of less than 2.5Gb, and many require more than 20Gb. Tests that expect
-to use more than memlimit memory will be skipped. The big-memory tests
-generally run very, very long.
-
--u is used to specify which special resource intensive tests to run,
-such as those requiring large file support or network connectivity.
-The argument is a comma-separated list of words indicating the
-resources to test. Currently only the following are defined:
-
- all - Enable all special resources.
-
- none - Disable all special resources (this is the default).
-
- audio - Tests that use the audio device. (There are known
- cases of broken audio drivers that can crash Python or
- even the Linux kernel.)
-
- curses - Tests that use curses and will modify the terminal's
- state and output modes.
-
- largefile - It is okay to run some test that may create huge
- files. These tests can take a long time and may
- consume >2 GiB of disk space temporarily.
-
- network - It is okay to run tests that use external network
- resource, e.g. testing SSL support for sockets.
-
- decimal - Test the decimal module against a large suite that
- verifies compliance with standards.
-
- cpu - Used for certain CPU-heavy tests.
-
- subprocess Run all tests for the subprocess module.
-
- urlfetch - It is okay to download files required on testing.
-
- gui - Run tests that require a running GUI.
-
- tzdata - Run tests that require timezone data.
-
-To enable all resources except one, use '-uall,-<resource>'. For
-example, to run all the tests except for the gui tests, give the
-option '-uall,-gui'.
-
---matchfile filters tests using a text file, one pattern per line.
-Pattern examples:
-
-- test method: test_stat_attributes
-- test class: FileTests
-- test identifier: test_os.FileTests.test_stat_attributes
-"""
-
-
-ALL_RESOURCES = ('audio', 'curses', 'largefile', 'network',
- 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui')
-
-# Other resources excluded from --use=all:
-#
-# - extralagefile (ex: test_zipfile64): really too slow to be enabled
-# "by default"
-# - tzdata: while needed to validate fully test_datetime, it makes
-# test_datetime too slow (15-20 min on some buildbots) and so is disabled by
-# default (see bpo-30822).
-RESOURCE_NAMES = ALL_RESOURCES + ('extralargefile', 'tzdata')
-
-class _ArgParser(argparse.ArgumentParser):
-
- def error(self, message):
- super().error(message + "\nPass -h or --help for complete help.")
-
-
-def _create_parser():
- # Set prog to prevent the uninformative "__main__.py" from displaying in
- # error messages when using "python -m test ...".
- parser = _ArgParser(prog='regrtest.py',
- usage=USAGE,
- description=DESCRIPTION,
- epilog=EPILOG,
- add_help=False,
- formatter_class=argparse.RawDescriptionHelpFormatter)
-
- # Arguments with this clause added to its help are described further in
- # the epilog's "Additional option details" section.
- more_details = ' See the section at bottom for more details.'
-
- group = parser.add_argument_group('General options')
- # We add help explicitly to control what argument group it renders under.
- group.add_argument('-h', '--help', action='help',
- help='show this help message and exit')
- group.add_argument('--timeout', metavar='TIMEOUT', type=float,
- help='dump the traceback and exit if a test takes '
- 'more than TIMEOUT seconds; disabled if TIMEOUT '
- 'is negative or equals to zero')
- group.add_argument('--wait', action='store_true',
- help='wait for user input, e.g., allow a debugger '
- 'to be attached')
- group.add_argument('--worker-args', metavar='ARGS')
- group.add_argument('-S', '--start', metavar='START',
- help='the name of the test at which to start.' +
- more_details)
-
- group = parser.add_argument_group('Verbosity')
- group.add_argument('-v', '--verbose', action='count',
- help='run tests in verbose mode with output to stdout')
- group.add_argument('-w', '--verbose2', action='store_true',
- help='re-run failed tests in verbose mode')
- group.add_argument('-W', '--verbose3', action='store_true',
- help='display test output on failure')
- group.add_argument('-q', '--quiet', action='store_true',
- help='no output unless one or more tests fail')
- group.add_argument('-o', '--slowest', action='store_true', dest='print_slow',
- help='print the slowest 10 tests')
- group.add_argument('--header', action='store_true',
- help='print header with interpreter info')
-
- group = parser.add_argument_group('Selecting tests')
- group.add_argument('-r', '--randomize', action='store_true',
- help='randomize test execution order.' + more_details)
- group.add_argument('--randseed', metavar='SEED',
- dest='random_seed', type=int,
- help='pass a random seed to reproduce a previous '
- 'random run')
- group.add_argument('-f', '--fromfile', metavar='FILE',
- help='read names of tests to run from a file.' +
- more_details)
- group.add_argument('-x', '--exclude', action='store_true',
- help='arguments are tests to *exclude*')
- group.add_argument('-s', '--single', action='store_true',
- help='single step through a set of tests.' +
- more_details)
- group.add_argument('-m', '--match', metavar='PAT',
- dest='match_tests', action='append',
- help='match test cases and methods with glob pattern PAT')
- group.add_argument('-i', '--ignore', metavar='PAT',
- dest='ignore_tests', action='append',
- help='ignore test cases and methods with glob pattern PAT')
- group.add_argument('--matchfile', metavar='FILENAME',
- dest='match_filename',
- help='similar to --match but get patterns from a '
- 'text file, one pattern per line')
- group.add_argument('--ignorefile', metavar='FILENAME',
- dest='ignore_filename',
- help='similar to --matchfile but it receives patterns '
- 'from text file to ignore')
- group.add_argument('-G', '--failfast', action='store_true',
- help='fail as soon as a test fails (only with -v or -W)')
- group.add_argument('-u', '--use', metavar='RES1,RES2,...',
- action='append', type=resources_list,
- help='specify which special resource intensive tests '
- 'to run.' + more_details)
- group.add_argument('-M', '--memlimit', metavar='LIMIT',
- help='run very large memory-consuming tests.' +
- more_details)
- group.add_argument('--testdir', metavar='DIR',
- type=relative_filename,
- help='execute test files in the specified directory '
- '(instead of the Python stdlib test suite)')
-
- group = parser.add_argument_group('Special runs')
- group.add_argument('-l', '--findleaks', action='store_const', const=2,
- default=1,
- help='deprecated alias to --fail-env-changed')
- group.add_argument('-L', '--runleaks', action='store_true',
- help='run the leaks(1) command just before exit.' +
- more_details)
- group.add_argument('-R', '--huntrleaks', metavar='RUNCOUNTS',
- type=huntrleaks,
- help='search for reference leaks (needs debug build, '
- 'very slow).' + more_details)
- group.add_argument('-j', '--multiprocess', metavar='PROCESSES',
- dest='use_mp', type=int,
- help='run PROCESSES processes at once')
- group.add_argument('-T', '--coverage', action='store_true',
- dest='trace',
- help='turn on code coverage tracing using the trace '
- 'module')
- group.add_argument('-D', '--coverdir', metavar='DIR',
- type=relative_filename,
- help='directory where coverage files are put')
- group.add_argument('-N', '--nocoverdir',
- action='store_const', const=None, dest='coverdir',
- help='put coverage files alongside modules')
- group.add_argument('-t', '--threshold', metavar='THRESHOLD',
- type=int,
- help='call gc.set_threshold(THRESHOLD)')
- group.add_argument('-n', '--nowindows', action='store_true',
- help='suppress error message boxes on Windows')
- group.add_argument('-F', '--forever', action='store_true',
- help='run the specified tests in a loop, until an '
- 'error happens; imply --failfast')
- group.add_argument('--list-tests', action='store_true',
- help="only write the name of tests that will be run, "
- "don't execute them")
- group.add_argument('--list-cases', action='store_true',
- help='only write the name of test cases that will be run'
- ' , don\'t execute them')
- group.add_argument('-P', '--pgo', dest='pgo', action='store_true',
- help='enable Profile Guided Optimization (PGO) training')
- group.add_argument('--pgo-extended', action='store_true',
- help='enable extended PGO training (slower training)')
- group.add_argument('--fail-env-changed', action='store_true',
- help='if a test file alters the environment, mark '
- 'the test as failed')
-
- group.add_argument('--junit-xml', dest='xmlpath', metavar='FILENAME',
- help='writes JUnit-style XML results to the specified '
- 'file')
- group.add_argument('--tempdir', metavar='PATH',
- help='override the working directory for the test run')
- group.add_argument('--cleanup', action='store_true',
- help='remove old test_python_* directories')
- return parser
-
-
-def relative_filename(string):
- # CWD is replaced with a temporary dir before calling main(), so we
- # join it with the saved CWD so it ends up where the user expects.
- return os.path.join(support.SAVEDCWD, string)
-
-
-def huntrleaks(string):
- args = string.split(':')
- if len(args) not in (2, 3):
- raise argparse.ArgumentTypeError(
- 'needs 2 or 3 colon-separated arguments')
- nwarmup = int(args[0]) if args[0] else 5
- ntracked = int(args[1]) if args[1] else 4
- fname = args[2] if len(args) > 2 and args[2] else 'reflog.txt'
- return nwarmup, ntracked, fname
-
-
-def resources_list(string):
- u = [x.lower() for x in string.split(',')]
- for r in u:
- if r == 'all' or r == 'none':
- continue
- if r[0] == '-':
- r = r[1:]
- if r not in RESOURCE_NAMES:
- raise argparse.ArgumentTypeError('invalid resource: ' + r)
- return u
-
-
-def _parse_args(args, **kwargs):
- # Defaults
- ns = argparse.Namespace(testdir=None, verbose=0, quiet=False,
- exclude=False, single=False, randomize=False, fromfile=None,
- findleaks=1, use_resources=None, trace=False, coverdir='coverage',
- runleaks=False, huntrleaks=False, verbose2=False, print_slow=False,
- random_seed=None, use_mp=None, verbose3=False, forever=False,
- header=False, failfast=False, match_tests=None, ignore_tests=None,
- pgo=False)
- for k, v in kwargs.items():
- if not hasattr(ns, k):
- raise TypeError('%r is an invalid keyword argument '
- 'for this function' % k)
- setattr(ns, k, v)
- if ns.use_resources is None:
- ns.use_resources = []
-
- parser = _create_parser()
- # Issue #14191: argparse doesn't support "intermixed" positional and
- # optional arguments. Use parse_known_args() as workaround.
- ns.args = parser.parse_known_args(args=args, namespace=ns)[1]
- for arg in ns.args:
- if arg.startswith('-'):
- parser.error("unrecognized arguments: %s" % arg)
- sys.exit(1)
-
- if ns.findleaks > 1:
- # --findleaks implies --fail-env-changed
- ns.fail_env_changed = True
- if ns.single and ns.fromfile:
- parser.error("-s and -f don't go together!")
- if ns.use_mp is not None and ns.trace:
- parser.error("-T and -j don't go together!")
- if ns.failfast and not (ns.verbose or ns.verbose3):
- parser.error("-G/--failfast needs either -v or -W")
- if ns.pgo and (ns.verbose or ns.verbose2 or ns.verbose3):
- parser.error("--pgo/-v don't go together!")
- if ns.pgo_extended:
- ns.pgo = True # pgo_extended implies pgo
-
- if ns.nowindows:
- print("Warning: the --nowindows (-n) option is deprecated. "
- "Use -vv to display assertions in stderr.", file=sys.stderr)
-
- if ns.quiet:
- ns.verbose = 0
- if ns.timeout is not None:
- if ns.timeout <= 0:
- ns.timeout = None
- if ns.use_mp is not None:
- if ns.use_mp <= 0:
- # Use all cores + extras for tests that like to sleep
- ns.use_mp = 2 + (os.cpu_count() or 1)
- if ns.use:
- for a in ns.use:
- for r in a:
- if r == 'all':
- ns.use_resources[:] = ALL_RESOURCES
- continue
- if r == 'none':
- del ns.use_resources[:]
- continue
- remove = False
- if r[0] == '-':
- remove = True
- r = r[1:]
- if remove:
- if r in ns.use_resources:
- ns.use_resources.remove(r)
- elif r not in ns.use_resources:
- ns.use_resources.append(r)
- if ns.random_seed is not None:
- ns.randomize = True
- if ns.verbose:
- ns.header = True
- if ns.huntrleaks and ns.verbose3:
- ns.verbose3 = False
- print("WARNING: Disable --verbose3 because it's incompatible with "
- "--huntrleaks: see http://bugs.python.org/issue27103",
- file=sys.stderr)
- if ns.match_filename:
- if ns.match_tests is None:
- ns.match_tests = []
- with open(ns.match_filename) as fp:
- for line in fp:
- ns.match_tests.append(line.strip())
- if ns.ignore_filename:
- if ns.ignore_tests is None:
- ns.ignore_tests = []
- with open(ns.ignore_filename) as fp:
- for line in fp:
- ns.ignore_tests.append(line.strip())
- if ns.forever:
- # --forever implies --failfast
- ns.failfast = True
-
- return ns
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
deleted file mode 100644
index 1de51b7..0000000
--- a/Lib/test/libregrtest/main.py
+++ /dev/null
@@ -1,712 +0,0 @@
-import datetime
-import faulthandler
-import locale
-import os
-import platform
-import random
-import re
-import sys
-import sysconfig
-import tempfile
-import time
-import unittest
-from test.libregrtest.cmdline import _parse_args
-from test.libregrtest.runtest import (
- findtests, runtest, get_abs_module,
- STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED,
- INTERRUPTED, CHILD_ERROR, TEST_DID_NOT_RUN, TIMEOUT,
- PROGRESS_MIN_TIME, format_test_result, is_failed)
-from test.libregrtest.setup import setup_tests
-from test.libregrtest.pgo import setup_pgo_tests
-from test.libregrtest.utils import removepy, count, format_duration, printlist
-from test import support
-
-
-# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
-# Used to protect against threading._shutdown() hang.
-# Must be smaller than buildbot "1200 seconds without output" limit.
-EXIT_TIMEOUT = 120.0
-
-
-class Regrtest:
- """Execute a test suite.
-
- This also parses command-line options and modifies its behavior
- accordingly.
-
- tests -- a list of strings containing test names (optional)
- testdir -- the directory in which to look for tests (optional)
-
- Users other than the Python test suite will certainly want to
- specify testdir; if it's omitted, the directory containing the
- Python test suite is searched for.
-
- If the tests argument is omitted, the tests listed on the
- command-line will be used. If that's empty, too, then all *.py
- files beginning with test_ will be used.
-
- The other default arguments (verbose, quiet, exclude,
- single, randomize, findleaks, use_resources, trace, coverdir,
- print_slow, and random_seed) allow programmers calling main()
- directly to set the values that would normally be set by flags
- on the command line.
- """
- def __init__(self):
- # Namespace of command line options
- self.ns = None
-
- # tests
- self.tests = []
- self.selected = []
-
- # test results
- self.good = []
- self.bad = []
- self.skipped = []
- self.resource_denieds = []
- self.environment_changed = []
- self.run_no_tests = []
- self.rerun = []
- self.first_result = None
- self.interrupted = False
-
- # used by --slow
- self.test_times = []
-
- # used by --coverage, trace.Trace instance
- self.tracer = None
-
- # used to display the progress bar "[ 3/100]"
- self.start_time = time.monotonic()
- self.test_count = ''
- self.test_count_width = 1
-
- # used by --single
- self.next_single_test = None
- self.next_single_filename = None
-
- # used by --junit-xml
- self.testsuite_xml = None
-
- # misc
- self.win_load_tracker = None
- self.tmp_dir = None
- self.worker_test_name = None
-
- def get_executed(self):
- return (set(self.good) | set(self.bad) | set(self.skipped)
- | set(self.resource_denieds) | set(self.environment_changed)
- | set(self.run_no_tests))
-
- def accumulate_result(self, result, rerun=False):
- test_name = result.test_name
- ok = result.result
-
- if ok not in (CHILD_ERROR, INTERRUPTED) and not rerun:
- self.test_times.append((result.test_time, test_name))
-
- if ok == PASSED:
- self.good.append(test_name)
- elif ok in (FAILED, CHILD_ERROR):
- if not rerun:
- self.bad.append(test_name)
- elif ok == ENV_CHANGED:
- self.environment_changed.append(test_name)
- elif ok == SKIPPED:
- self.skipped.append(test_name)
- elif ok == RESOURCE_DENIED:
- self.skipped.append(test_name)
- self.resource_denieds.append(test_name)
- elif ok == TEST_DID_NOT_RUN:
- self.run_no_tests.append(test_name)
- elif ok == INTERRUPTED:
- self.interrupted = True
- elif ok == TIMEOUT:
- self.bad.append(test_name)
- else:
- raise ValueError("invalid test result: %r" % ok)
-
- if rerun and ok not in {FAILED, CHILD_ERROR, INTERRUPTED}:
- self.bad.remove(test_name)
-
- xml_data = result.xml_data
- if xml_data:
- import xml.etree.ElementTree as ET
- for e in xml_data:
- try:
- self.testsuite_xml.append(ET.fromstring(e))
- except ET.ParseError:
- print(xml_data, file=sys.__stderr__)
- raise
-
- def log(self, line=''):
- empty = not line
-
- # add the system load prefix: "load avg: 1.80 "
- load_avg = self.getloadavg()
- if load_avg is not None:
- line = f"load avg: {load_avg:.2f} {line}"
-
- # add the timestamp prefix: "0:01:05 "
- test_time = time.monotonic() - self.start_time
- test_time = datetime.timedelta(seconds=int(test_time))
- line = f"{test_time} {line}"
-
- if empty:
- line = line[:-1]
-
- print(line, flush=True)
-
- def display_progress(self, test_index, text):
- if self.ns.quiet:
- return
-
- # "[ 51/405/1] test_tcl passed"
- line = f"{test_index:{self.test_count_width}}{self.test_count}"
- fails = len(self.bad) + len(self.environment_changed)
- if fails and not self.ns.pgo:
- line = f"{line}/{fails}"
- self.log(f"[{line}] {text}")
-
- def parse_args(self, kwargs):
- ns = _parse_args(sys.argv[1:], **kwargs)
-
- if ns.xmlpath:
- support.junit_xml_list = self.testsuite_xml = []
-
- worker_args = ns.worker_args
- if worker_args is not None:
- from test.libregrtest.runtest_mp import parse_worker_args
- ns, test_name = parse_worker_args(ns.worker_args)
- ns.worker_args = worker_args
- self.worker_test_name = test_name
-
- # Strip .py extensions.
- removepy(ns.args)
-
- if ns.huntrleaks:
- warmup, repetitions, _ = ns.huntrleaks
- if warmup < 1 or repetitions < 1:
- msg = ("Invalid values for the --huntrleaks/-R parameters. The "
- "number of warmups and repetitions must be at least 1 "
- "each (1:1).")
- print(msg, file=sys.stderr, flush=True)
- sys.exit(2)
-
- if ns.tempdir:
- ns.tempdir = os.path.expanduser(ns.tempdir)
-
- self.ns = ns
-
- def find_tests(self, tests):
- self.tests = tests
-
- if self.ns.single:
- self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
- try:
- with open(self.next_single_filename, 'r') as fp:
- next_test = fp.read().strip()
- self.tests = [next_test]
- except OSError:
- pass
-
- if self.ns.fromfile:
- self.tests = []
- # regex to match 'test_builtin' in line:
- # '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec'
- regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
- with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp:
- for line in fp:
- line = line.split('#', 1)[0]
- line = line.strip()
- match = regex.search(line)
- if match is not None:
- self.tests.append(match.group())
-
- removepy(self.tests)
-
- if self.ns.pgo:
- # add default PGO tests if no tests are specified
- setup_pgo_tests(self.ns)
-
- stdtests = STDTESTS[:]
- nottests = NOTTESTS.copy()
- if self.ns.exclude:
- for arg in self.ns.args:
- if arg in stdtests:
- stdtests.remove(arg)
- nottests.add(arg)
- self.ns.args = []
-
- # if testdir is set, then we are not running the python tests suite, so
- # don't add default tests to be executed or skipped (pass empty values)
- if self.ns.testdir:
- alltests = findtests(self.ns.testdir, list(), set())
- else:
- alltests = findtests(self.ns.testdir, stdtests, nottests)
-
- if not self.ns.fromfile:
- self.selected = self.tests or self.ns.args or alltests
- else:
- self.selected = self.tests
- if self.ns.single:
- self.selected = self.selected[:1]
- try:
- pos = alltests.index(self.selected[0])
- self.next_single_test = alltests[pos + 1]
- except IndexError:
- pass
-
- # Remove all the selected tests that precede start if it's set.
- if self.ns.start:
- try:
- del self.selected[:self.selected.index(self.ns.start)]
- except ValueError:
- print("Couldn't find starting test (%s), using all tests"
- % self.ns.start, file=sys.stderr)
-
- if self.ns.randomize:
- if self.ns.random_seed is None:
- self.ns.random_seed = random.randrange(10000000)
- random.seed(self.ns.random_seed)
- random.shuffle(self.selected)
-
- def list_tests(self):
- for name in self.selected:
- print(name)
-
- def _list_cases(self, suite):
- for test in suite:
- if isinstance(test, unittest.loader._FailedTest):
- continue
- if isinstance(test, unittest.TestSuite):
- self._list_cases(test)
- elif isinstance(test, unittest.TestCase):
- if support.match_test(test):
- print(test.id())
-
- def list_cases(self):
- support.verbose = False
- support.set_match_tests(self.ns.match_tests, self.ns.ignore_tests)
-
- for test_name in self.selected:
- abstest = get_abs_module(self.ns, test_name)
- try:
- suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
- self._list_cases(suite)
- except unittest.SkipTest:
- self.skipped.append(test_name)
-
- if self.skipped:
- print(file=sys.stderr)
- print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr)
- printlist(self.skipped, file=sys.stderr)
-
- def rerun_failed_tests(self):
- self.ns.verbose = True
- self.ns.failfast = False
- self.ns.verbose3 = False
-
- self.first_result = self.get_tests_result()
-
- self.log()
- self.log("Re-running failed tests in verbose mode")
- self.rerun = self.bad[:]
- for test_name in self.rerun:
- self.log(f"Re-running {test_name} in verbose mode")
- self.ns.verbose = True
- result = runtest(self.ns, test_name)
-
- self.accumulate_result(result, rerun=True)
-
- if result.result == INTERRUPTED:
- break
-
- if self.bad:
- print(count(len(self.bad), 'test'), "failed again:")
- printlist(self.bad)
-
- self.display_result()
-
- def display_result(self):
- # If running the test suite for PGO then no one cares about results.
- if self.ns.pgo:
- return
-
- print()
- print("== Tests result: %s ==" % self.get_tests_result())
-
- if self.interrupted:
- print("Test suite interrupted by signal SIGINT.")
-
- omitted = set(self.selected) - self.get_executed()
- if omitted:
- print()
- print(count(len(omitted), "test"), "omitted:")
- printlist(omitted)
-
- if self.good and not self.ns.quiet:
- print()
- if (not self.bad
- and not self.skipped
- and not self.interrupted
- and len(self.good) > 1):
- print("All", end=' ')
- print(count(len(self.good), "test"), "OK.")
-
- if self.ns.print_slow:
- self.test_times.sort(reverse=True)
- print()
- print("10 slowest tests:")
- for test_time, test in self.test_times[:10]:
- print("- %s: %s" % (test, format_duration(test_time)))
-
- if self.bad:
- print()
- print(count(len(self.bad), "test"), "failed:")
- printlist(self.bad)
-
- if self.environment_changed:
- print()
- print("{} altered the execution environment:".format(
- count(len(self.environment_changed), "test")))
- printlist(self.environment_changed)
-
- if self.skipped and not self.ns.quiet:
- print()
- print(count(len(self.skipped), "test"), "skipped:")
- printlist(self.skipped)
-
- if self.rerun:
- print()
- print("%s:" % count(len(self.rerun), "re-run test"))
- printlist(self.rerun)
-
- if self.run_no_tests:
- print()
- print(count(len(self.run_no_tests), "test"), "run no tests:")
- printlist(self.run_no_tests)
-
- def run_tests_sequential(self):
- if self.ns.trace:
- import trace
- self.tracer = trace.Trace(trace=False, count=True)
-
- save_modules = sys.modules.keys()
-
- self.log("Run tests sequentially")
-
- previous_test = None
- for test_index, test_name in enumerate(self.tests, 1):
- start_time = time.monotonic()
-
- text = test_name
- if previous_test:
- text = '%s -- %s' % (text, previous_test)
- self.display_progress(test_index, text)
-
- if self.tracer:
- # If we're tracing code coverage, then we don't exit with status
- # if on a false return value from main.
- cmd = ('result = runtest(self.ns, test_name); '
- 'self.accumulate_result(result)')
- ns = dict(locals())
- self.tracer.runctx(cmd, globals=globals(), locals=ns)
- result = ns['result']
- else:
- result = runtest(self.ns, test_name)
- self.accumulate_result(result)
-
- if result.result == INTERRUPTED:
- break
-
- previous_test = format_test_result(result)
- test_time = time.monotonic() - start_time
- if test_time >= PROGRESS_MIN_TIME:
- previous_test = "%s in %s" % (previous_test, format_duration(test_time))
- elif result.result == PASSED:
- # be quiet: say nothing if the test passed shortly
- previous_test = None
-
- # Unload the newly imported modules (best effort finalization)
- for module in sys.modules.keys():
- if module not in save_modules and module.startswith("test."):
- support.unload(module)
-
- if self.ns.failfast and is_failed(result, self.ns):
- break
-
- if previous_test:
- print(previous_test)
-
- def _test_forever(self, tests):
- while True:
- for test_name in tests:
- yield test_name
- if self.bad:
- return
- if self.ns.fail_env_changed and self.environment_changed:
- return
-
- def display_header(self):
- # Print basic platform information
- print("==", platform.python_implementation(), *sys.version.split())
- print("==", platform.platform(aliased=True),
- "%s-endian" % sys.byteorder)
- print("== cwd:", os.getcwd())
- cpu_count = os.cpu_count()
- if cpu_count:
- print("== CPU count:", cpu_count)
- print("== encodings: locale=%s, FS=%s"
- % (locale.getpreferredencoding(False),
- sys.getfilesystemencoding()))
-
- def get_tests_result(self):
- result = []
- if self.bad:
- result.append("FAILURE")
- elif self.ns.fail_env_changed and self.environment_changed:
- result.append("ENV CHANGED")
- elif not any((self.good, self.bad, self.skipped, self.interrupted,
- self.environment_changed)):
- result.append("NO TEST RUN")
-
- if self.interrupted:
- result.append("INTERRUPTED")
-
- if not result:
- result.append("SUCCESS")
-
- result = ', '.join(result)
- if self.first_result:
- result = '%s then %s' % (self.first_result, result)
- return result
-
- def run_tests(self):
- # For a partial run, we do not need to clutter the output.
- if (self.ns.header
- or not(self.ns.pgo or self.ns.quiet or self.ns.single
- or self.tests or self.ns.args)):
- self.display_header()
-
- if self.ns.huntrleaks:
- warmup, repetitions, _ = self.ns.huntrleaks
- if warmup < 3:
- msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
- "3 warmup repetitions can give false positives!")
- print(msg, file=sys.stdout, flush=True)
-
- if self.ns.randomize:
- print("Using random seed", self.ns.random_seed)
-
- if self.ns.forever:
- self.tests = self._test_forever(list(self.selected))
- self.test_count = ''
- self.test_count_width = 3
- else:
- self.tests = iter(self.selected)
- self.test_count = '/{}'.format(len(self.selected))
- self.test_count_width = len(self.test_count) - 1
-
- if self.ns.use_mp:
- from test.libregrtest.runtest_mp import run_tests_multiprocess
- run_tests_multiprocess(self)
- else:
- self.run_tests_sequential()
-
- def finalize(self):
- if self.next_single_filename:
- if self.next_single_test:
- with open(self.next_single_filename, 'w') as fp:
- fp.write(self.next_single_test + '\n')
- else:
- os.unlink(self.next_single_filename)
-
- if self.tracer:
- r = self.tracer.results()
- r.write_results(show_missing=True, summary=True,
- coverdir=self.ns.coverdir)
-
- print()
- duration = time.monotonic() - self.start_time
- print("Total duration: %s" % format_duration(duration))
- print("Tests result: %s" % self.get_tests_result())
-
- if self.ns.runleaks:
- os.system("leaks %d" % os.getpid())
-
- def save_xml_result(self):
- if not self.ns.xmlpath and not self.testsuite_xml:
- return
-
- import xml.etree.ElementTree as ET
- root = ET.Element("testsuites")
-
- # Manually count the totals for the overall summary
- totals = {'tests': 0, 'errors': 0, 'failures': 0}
- for suite in self.testsuite_xml:
- root.append(suite)
- for k in totals:
- try:
- totals[k] += int(suite.get(k, 0))
- except ValueError:
- pass
-
- for k, v in totals.items():
- root.set(k, str(v))
-
- xmlpath = os.path.join(support.SAVEDCWD, self.ns.xmlpath)
- with open(xmlpath, 'wb') as f:
- for s in ET.tostringlist(root):
- f.write(s)
-
- def set_temp_dir(self):
- if self.ns.tempdir:
- self.tmp_dir = self.ns.tempdir
-
- if not self.tmp_dir:
- # When tests are run from the Python build directory, it is best practice
- # to keep the test files in a subfolder. This eases the cleanup of leftover
- # files using the "make distclean" command.
- if sysconfig.is_python_build():
- self.tmp_dir = sysconfig.get_config_var('abs_builddir')
- if self.tmp_dir is None:
- # bpo-30284: On Windows, only srcdir is available. Using
- # abs_builddir mostly matters on UNIX when building Python
- # out of the source tree, especially when the source tree
- # is read only.
- self.tmp_dir = sysconfig.get_config_var('srcdir')
- self.tmp_dir = os.path.join(self.tmp_dir, 'build')
- else:
- self.tmp_dir = tempfile.gettempdir()
-
- self.tmp_dir = os.path.abspath(self.tmp_dir)
-
- def create_temp_dir(self):
- os.makedirs(self.tmp_dir, exist_ok=True)
-
- # Define a writable temp dir that will be used as cwd while running
- # the tests. The name of the dir includes the pid to allow parallel
- # testing (see the -j option).
- pid = os.getpid()
- if self.worker_test_name is not None:
- test_cwd = 'test_python_worker_{}'.format(pid)
- else:
- test_cwd = 'test_python_{}'.format(pid)
- test_cwd = os.path.join(self.tmp_dir, test_cwd)
- return test_cwd
-
- def cleanup(self):
- import glob
-
- path = os.path.join(self.tmp_dir, 'test_python_*')
- print("Cleanup %s directory" % self.tmp_dir)
- for name in glob.glob(path):
- if os.path.isdir(name):
- print("Remove directory: %s" % name)
- support.rmtree(name)
- else:
- print("Remove file: %s" % name)
- support.unlink(name)
-
- def main(self, tests=None, **kwargs):
- self.parse_args(kwargs)
-
- self.set_temp_dir()
-
- if self.ns.cleanup:
- self.cleanup()
- sys.exit(0)
-
- test_cwd = self.create_temp_dir()
-
- try:
- # Run the tests in a context manager that temporarily changes the CWD
- # to a temporary and writable directory. If it's not possible to
- # create or change the CWD, the original CWD will be used.
- # The original CWD is available from support.SAVEDCWD.
- with support.temp_cwd(test_cwd, quiet=True):
- # When using multiprocessing, worker processes will use test_cwd
- # as their parent temporary directory. So when the main process
- # exit, it removes also subdirectories of worker processes.
- self.ns.tempdir = test_cwd
-
- self._main(tests, kwargs)
- except SystemExit as exc:
- # bpo-38203: Python can hang at exit in Py_Finalize(), especially
- # on threading._shutdown() call: put a timeout
- faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
-
- sys.exit(exc.code)
-
- def getloadavg(self):
- if self.win_load_tracker is not None:
- return self.win_load_tracker.getloadavg()
-
- if hasattr(os, 'getloadavg'):
- return os.getloadavg()[0]
-
- return None
-
- def _main(self, tests, kwargs):
- if self.worker_test_name is not None:
- from test.libregrtest.runtest_mp import run_tests_worker
- run_tests_worker(self.ns, self.worker_test_name)
-
- if self.ns.wait:
- input("Press any key to continue...")
-
- support.PGO = self.ns.pgo
- support.PGO_EXTENDED = self.ns.pgo_extended
-
- setup_tests(self.ns)
-
- self.find_tests(tests)
-
- if self.ns.list_tests:
- self.list_tests()
- sys.exit(0)
-
- if self.ns.list_cases:
- self.list_cases()
- sys.exit(0)
-
- # If we're on windows and this is the parent runner (not a worker),
- # track the load average.
- if sys.platform == 'win32' and self.worker_test_name is None:
- from test.libregrtest.win_utils import WindowsLoadTracker
-
- try:
- self.win_load_tracker = WindowsLoadTracker()
- except FileNotFoundError as error:
- # Windows IoT Core and Windows Nano Server do not provide
- # typeperf.exe for x64, x86 or ARM
- print(f'Failed to create WindowsLoadTracker: {error}')
-
- try:
- self.run_tests()
- self.display_result()
-
- if self.ns.verbose2 and self.bad:
- self.rerun_failed_tests()
- finally:
- if self.win_load_tracker is not None:
- self.win_load_tracker.close()
- self.win_load_tracker = None
-
- self.finalize()
-
- self.save_xml_result()
-
- if self.bad:
- sys.exit(2)
- if self.interrupted:
- sys.exit(130)
- if self.ns.fail_env_changed and self.environment_changed:
- sys.exit(3)
- sys.exit(0)
-
-
-def main(tests=None, **kwargs):
- """Run the Python suite."""
- Regrtest().main(tests=tests, **kwargs)
diff --git a/Lib/test/libregrtest/pgo.py b/Lib/test/libregrtest/pgo.py
deleted file mode 100644
index 379ff05..0000000
--- a/Lib/test/libregrtest/pgo.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# Set of tests run by default if --pgo is specified. The tests below were
-# chosen based on the following criteria: either they exercise a commonly used
-# C extension module or type, or they run some relatively typical Python code.
-# Long running tests should be avoided because the PGO instrumented executable
-# runs slowly.
-PGO_TESTS = [
- 'test_array',
- 'test_base64',
- 'test_binascii',
- 'test_binop',
- 'test_bisect',
- 'test_bytes',
- 'test_bz2',
- 'test_cmath',
- 'test_codecs',
- 'test_collections',
- 'test_complex',
- 'test_dataclasses',
- 'test_datetime',
- 'test_decimal',
- 'test_difflib',
- 'test_embed',
- 'test_float',
- 'test_fstring',
- 'test_functools',
- 'test_generators',
- 'test_hashlib',
- 'test_heapq',
- 'test_int',
- 'test_itertools',
- 'test_json',
- 'test_long',
- 'test_lzma',
- 'test_math',
- 'test_memoryview',
- 'test_operator',
- 'test_ordered_dict',
- 'test_pickle',
- 'test_pprint',
- 'test_re',
- 'test_set',
- 'test_sqlite',
- 'test_statistics',
- 'test_struct',
- 'test_tabnanny',
- 'test_time',
- 'test_unicode',
- 'test_xml_etree',
- 'test_xml_etree_c',
-]
-
-def setup_pgo_tests(ns):
- if not ns.args and not ns.pgo_extended:
- # run default set of tests for PGO training
- ns.args = PGO_TESTS[:]
diff --git a/Lib/test/libregrtest/refleak.py b/Lib/test/libregrtest/refleak.py
deleted file mode 100644
index 8d22123..0000000
--- a/Lib/test/libregrtest/refleak.py
+++ /dev/null
@@ -1,286 +0,0 @@
-import os
-import re
-import sys
-import warnings
-from inspect import isabstract
-from test import support
-try:
- from _abc import _get_dump
-except ImportError:
- import weakref
-
- def _get_dump(cls):
- # Reimplement _get_dump() for pure-Python implementation of
- # the abc module (Lib/_py_abc.py)
- registry_weakrefs = set(weakref.ref(obj) for obj in cls._abc_registry)
- return (registry_weakrefs, cls._abc_cache,
- cls._abc_negative_cache, cls._abc_negative_cache_version)
-
-
-def dash_R(ns, test_name, test_func):
- """Run a test multiple times, looking for reference leaks.
-
- Returns:
- False if the test didn't leak references; True if we detected refleaks.
- """
- # This code is hackish and inelegant, but it seems to do the job.
- import copyreg
- import collections.abc
-
- if not hasattr(sys, 'gettotalrefcount'):
- raise Exception("Tracking reference leaks requires a debug build "
- "of Python")
-
- # Avoid false positives due to various caches
- # filling slowly with random data:
- warm_caches()
-
- # Save current values for dash_R_cleanup() to restore.
- fs = warnings.filters[:]
- ps = copyreg.dispatch_table.copy()
- pic = sys.path_importer_cache.copy()
- try:
- import zipimport
- except ImportError:
- zdc = None # Run unmodified on platforms without zipimport support
- else:
- zdc = zipimport._zip_directory_cache.copy()
- abcs = {}
- for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]:
- if not isabstract(abc):
- continue
- for obj in abc.__subclasses__() + [abc]:
- abcs[obj] = _get_dump(obj)[0]
-
- # bpo-31217: Integer pool to get a single integer object for the same
- # value. The pool is used to prevent false alarm when checking for memory
- # block leaks. Fill the pool with values in -1000..1000 which are the most
- # common (reference, memory block, file descriptor) differences.
- int_pool = {value: value for value in range(-1000, 1000)}
- def get_pooled_int(value):
- return int_pool.setdefault(value, value)
-
- nwarmup, ntracked, fname = ns.huntrleaks
- fname = os.path.join(support.SAVEDCWD, fname)
- repcount = nwarmup + ntracked
-
- # Pre-allocate to ensure that the loop doesn't allocate anything new
- rep_range = list(range(repcount))
- rc_deltas = [0] * repcount
- alloc_deltas = [0] * repcount
- fd_deltas = [0] * repcount
- getallocatedblocks = sys.getallocatedblocks
- gettotalrefcount = sys.gettotalrefcount
- fd_count = support.fd_count
-
- # initialize variables to make pyflakes quiet
- rc_before = alloc_before = fd_before = 0
-
- if not ns.quiet:
- print("beginning", repcount, "repetitions", file=sys.stderr)
- print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr,
- flush=True)
-
- dash_R_cleanup(fs, ps, pic, zdc, abcs)
-
- for i in rep_range:
- test_func()
- dash_R_cleanup(fs, ps, pic, zdc, abcs)
-
- # dash_R_cleanup() ends with collecting cyclic trash:
- # read memory statistics immediately after.
- alloc_after = getallocatedblocks()
- rc_after = gettotalrefcount()
- fd_after = fd_count()
-
- if not ns.quiet:
- print('.', end='', file=sys.stderr, flush=True)
-
- rc_deltas[i] = get_pooled_int(rc_after - rc_before)
- alloc_deltas[i] = get_pooled_int(alloc_after - alloc_before)
- fd_deltas[i] = get_pooled_int(fd_after - fd_before)
-
- alloc_before = alloc_after
- rc_before = rc_after
- fd_before = fd_after
-
- if not ns.quiet:
- print(file=sys.stderr)
-
- # These checkers return False on success, True on failure
- def check_rc_deltas(deltas):
- # Checker for reference counters and memomry blocks.
- #
- # bpo-30776: Try to ignore false positives:
- #
- # [3, 0, 0]
- # [0, 1, 0]
- # [8, -8, 1]
- #
- # Expected leaks:
- #
- # [5, 5, 6]
- # [10, 1, 1]
- return all(delta >= 1 for delta in deltas)
-
- def check_fd_deltas(deltas):
- return any(deltas)
-
- failed = False
- for deltas, item_name, checker in [
- (rc_deltas, 'references', check_rc_deltas),
- (alloc_deltas, 'memory blocks', check_rc_deltas),
- (fd_deltas, 'file descriptors', check_fd_deltas)
- ]:
- # ignore warmup runs
- deltas = deltas[nwarmup:]
- if checker(deltas):
- msg = '%s leaked %s %s, sum=%s' % (
- test_name, deltas, item_name, sum(deltas))
- print(msg, file=sys.stderr, flush=True)
- with open(fname, "a") as refrep:
- print(msg, file=refrep)
- refrep.flush()
- failed = True
- return failed
-
-
-def dash_R_cleanup(fs, ps, pic, zdc, abcs):
- import copyreg
- import collections.abc
-
- # Restore some original values.
- warnings.filters[:] = fs
- copyreg.dispatch_table.clear()
- copyreg.dispatch_table.update(ps)
- sys.path_importer_cache.clear()
- sys.path_importer_cache.update(pic)
- try:
- import zipimport
- except ImportError:
- pass # Run unmodified on platforms without zipimport support
- else:
- zipimport._zip_directory_cache.clear()
- zipimport._zip_directory_cache.update(zdc)
-
- # clear type cache
- sys._clear_type_cache()
-
- # Clear ABC registries, restoring previously saved ABC registries.
- abs_classes = [getattr(collections.abc, a) for a in collections.abc.__all__]
- abs_classes = filter(isabstract, abs_classes)
- for abc in abs_classes:
- for obj in abc.__subclasses__() + [abc]:
- for ref in abcs.get(obj, set()):
- if ref() is not None:
- obj.register(ref())
- obj._abc_caches_clear()
-
- clear_caches()
-
-
-def clear_caches():
- # Clear the warnings registry, so they can be displayed again
- for mod in sys.modules.values():
- if hasattr(mod, '__warningregistry__'):
- del mod.__warningregistry__
-
- # Flush standard output, so that buffered data is sent to the OS and
- # associated Python objects are reclaimed.
- for stream in (sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__):
- if stream is not None:
- stream.flush()
-
- # Clear assorted module caches.
- # Don't worry about resetting the cache if the module is not loaded
- try:
- distutils_dir_util = sys.modules['distutils.dir_util']
- except KeyError:
- pass
- else:
- distutils_dir_util._path_created.clear()
- re.purge()
-
- try:
- _strptime = sys.modules['_strptime']
- except KeyError:
- pass
- else:
- _strptime._regex_cache.clear()
-
- try:
- urllib_parse = sys.modules['urllib.parse']
- except KeyError:
- pass
- else:
- urllib_parse.clear_cache()
-
- try:
- urllib_request = sys.modules['urllib.request']
- except KeyError:
- pass
- else:
- urllib_request.urlcleanup()
-
- try:
- linecache = sys.modules['linecache']
- except KeyError:
- pass
- else:
- linecache.clearcache()
-
- try:
- mimetypes = sys.modules['mimetypes']
- except KeyError:
- pass
- else:
- mimetypes._default_mime_types()
-
- try:
- filecmp = sys.modules['filecmp']
- except KeyError:
- pass
- else:
- filecmp._cache.clear()
-
- try:
- struct = sys.modules['struct']
- except KeyError:
- pass
- else:
- struct._clearcache()
-
- try:
- doctest = sys.modules['doctest']
- except KeyError:
- pass
- else:
- doctest.master = None
-
- try:
- ctypes = sys.modules['ctypes']
- except KeyError:
- pass
- else:
- ctypes._reset_cache()
-
- try:
- typing = sys.modules['typing']
- except KeyError:
- pass
- else:
- for f in typing._cleanups:
- f()
-
- support.gc_collect()
-
-
-def warm_caches():
- # char cache
- s = bytes(range(256))
- for i in range(256):
- s[i:i+1]
- # unicode cache
- [chr(i) for i in range(256)]
- # int cache
- list(range(-5, 257))
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
deleted file mode 100644
index 558f209..0000000
--- a/Lib/test/libregrtest/runtest.py
+++ /dev/null
@@ -1,340 +0,0 @@
-import collections
-import faulthandler
-import functools
-import gc
-import importlib
-import io
-import os
-import sys
-import time
-import traceback
-import unittest
-
-from test import support
-from test.libregrtest.refleak import dash_R, clear_caches
-from test.libregrtest.save_env import saved_test_environment
-from test.libregrtest.utils import format_duration, print_warning
-
-
-# Test result constants.
-PASSED = 1
-FAILED = 0
-ENV_CHANGED = -1
-SKIPPED = -2
-RESOURCE_DENIED = -3
-INTERRUPTED = -4
-CHILD_ERROR = -5 # error in a child process
-TEST_DID_NOT_RUN = -6
-TIMEOUT = -7
-
-_FORMAT_TEST_RESULT = {
- PASSED: '%s passed',
- FAILED: '%s failed',
- ENV_CHANGED: '%s failed (env changed)',
- SKIPPED: '%s skipped',
- RESOURCE_DENIED: '%s skipped (resource denied)',
- INTERRUPTED: '%s interrupted',
- CHILD_ERROR: '%s crashed',
- TEST_DID_NOT_RUN: '%s run no tests',
- TIMEOUT: '%s timed out',
-}
-
-# Minimum duration of a test to display its duration or to mention that
-# the test is running in background
-PROGRESS_MIN_TIME = 30.0 # seconds
-
-# small set of tests to determine if we have a basically functioning interpreter
-# (i.e. if any of these fail, then anything else is likely to follow)
-STDTESTS = [
- 'test_grammar',
- 'test_opcodes',
- 'test_dict',
- 'test_builtin',
- 'test_exceptions',
- 'test_types',
- 'test_unittest',
- 'test_doctest',
- 'test_doctest2',
- 'test_support'
-]
-
-# set of tests that we don't want to be executed when using regrtest
-NOTTESTS = set()
-
-
-# used by --findleaks, store for gc.garbage
-FOUND_GARBAGE = []
-
-
-def is_failed(result, ns):
- ok = result.result
- if ok in (PASSED, RESOURCE_DENIED, SKIPPED, TEST_DID_NOT_RUN):
- return False
- if ok == ENV_CHANGED:
- return ns.fail_env_changed
- return True
-
-
-def format_test_result(result):
- fmt = _FORMAT_TEST_RESULT.get(result.result, "%s")
- text = fmt % result.test_name
- if result.result == TIMEOUT:
- text = '%s (%s)' % (text, format_duration(result.test_time))
- return text
-
-
-def findtestdir(path=None):
- return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
-
-
-def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
- """Return a list of all applicable test modules."""
- testdir = findtestdir(testdir)
- names = os.listdir(testdir)
- tests = []
- others = set(stdtests) | nottests
- for name in names:
- mod, ext = os.path.splitext(name)
- if mod[:5] == "test_" and ext in (".py", "") and mod not in others:
- tests.append(mod)
- return stdtests + sorted(tests)
-
-
-def get_abs_module(ns, test_name):
- if test_name.startswith('test.') or ns.testdir:
- return test_name
- else:
- # Import it from the test package
- return 'test.' + test_name
-
-
-TestResult = collections.namedtuple('TestResult',
- 'test_name result test_time xml_data')
-
-def _runtest(ns, test_name):
- # Handle faulthandler timeout, capture stdout+stderr, XML serialization
- # and measure time.
-
- output_on_failure = ns.verbose3
-
- use_timeout = (ns.timeout is not None)
- if use_timeout:
- faulthandler.dump_traceback_later(ns.timeout, exit=True)
-
- start_time = time.perf_counter()
- try:
- support.set_match_tests(ns.match_tests, ns.ignore_tests)
- support.junit_xml_list = xml_list = [] if ns.xmlpath else None
- if ns.failfast:
- support.failfast = True
-
- if output_on_failure:
- support.verbose = True
-
- stream = io.StringIO()
- orig_stdout = sys.stdout
- orig_stderr = sys.stderr
- try:
- sys.stdout = stream
- sys.stderr = stream
- result = _runtest_inner(ns, test_name,
- display_failure=False)
- if result != PASSED:
- output = stream.getvalue()
- orig_stderr.write(output)
- orig_stderr.flush()
- finally:
- sys.stdout = orig_stdout
- sys.stderr = orig_stderr
- else:
- # Tell tests to be moderately quiet
- support.verbose = ns.verbose
-
- result = _runtest_inner(ns, test_name,
- display_failure=not ns.verbose)
-
- if xml_list:
- import xml.etree.ElementTree as ET
- xml_data = [ET.tostring(x).decode('us-ascii') for x in xml_list]
- else:
- xml_data = None
-
- test_time = time.perf_counter() - start_time
-
- return TestResult(test_name, result, test_time, xml_data)
- finally:
- if use_timeout:
- faulthandler.cancel_dump_traceback_later()
- support.junit_xml_list = None
-
-
-def runtest(ns, test_name):
- """Run a single test.
-
- ns -- regrtest namespace of options
- test_name -- the name of the test
-
- Returns the tuple (result, test_time, xml_data), where result is one
- of the constants:
-
- INTERRUPTED KeyboardInterrupt
- RESOURCE_DENIED test skipped because resource denied
- SKIPPED test skipped for some other reason
- ENV_CHANGED test failed because it changed the execution environment
- FAILED test failed
- PASSED test passed
- EMPTY_TEST_SUITE test ran no subtests.
- TIMEOUT test timed out.
-
- If ns.xmlpath is not None, xml_data is a list containing each
- generated testsuite element.
- """
- try:
- return _runtest(ns, test_name)
- except:
- if not ns.pgo:
- msg = traceback.format_exc()
- print(f"test {test_name} crashed -- {msg}",
- file=sys.stderr, flush=True)
- return TestResult(test_name, FAILED, 0.0, None)
-
-
-def _test_module(the_module):
- loader = unittest.TestLoader()
- tests = loader.loadTestsFromModule(the_module)
- for error in loader.errors:
- print(error, file=sys.stderr)
- if loader.errors:
- raise Exception("errors while loading tests")
- support.run_unittest(tests)
-
-
-def _runtest_inner2(ns, test_name):
- # Load the test function, run the test function, handle huntrleaks
- # and findleaks to detect leaks
-
- abstest = get_abs_module(ns, test_name)
-
- # remove the module from sys.module to reload it if it was already imported
- support.unload(abstest)
-
- the_module = importlib.import_module(abstest)
-
- # If the test has a test_main, that will run the appropriate
- # tests. If not, use normal unittest test loading.
- test_runner = getattr(the_module, "test_main", None)
- if test_runner is None:
- test_runner = functools.partial(_test_module, the_module)
-
- try:
- if ns.huntrleaks:
- # Return True if the test leaked references
- refleak = dash_R(ns, test_name, test_runner)
- else:
- test_runner()
- refleak = False
- finally:
- cleanup_test_droppings(test_name, ns.verbose)
-
- support.gc_collect()
-
- if gc.garbage:
- support.environment_altered = True
- print_warning(f"{test_name} created {len(gc.garbage)} "
- f"uncollectable object(s).")
-
- # move the uncollectable objects somewhere,
- # so we don't see them again
- FOUND_GARBAGE.extend(gc.garbage)
- gc.garbage.clear()
-
- support.reap_children()
-
- return refleak
-
-
-def _runtest_inner(ns, test_name, display_failure=True):
- # Detect environment changes, handle exceptions.
-
- # Reset the environment_altered flag to detect if a test altered
- # the environment
- support.environment_altered = False
-
- if ns.pgo:
- display_failure = False
-
- try:
- clear_caches()
-
- with saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo) as environment:
- refleak = _runtest_inner2(ns, test_name)
- except support.ResourceDenied as msg:
- if not ns.quiet and not ns.pgo:
- print(f"{test_name} skipped -- {msg}", flush=True)
- return RESOURCE_DENIED
- except unittest.SkipTest as msg:
- if not ns.quiet and not ns.pgo:
- print(f"{test_name} skipped -- {msg}", flush=True)
- return SKIPPED
- except support.TestFailed as exc:
- msg = f"test {test_name} failed"
- if display_failure:
- msg = f"{msg} -- {exc}"
- print(msg, file=sys.stderr, flush=True)
- return FAILED
- except support.TestDidNotRun:
- return TEST_DID_NOT_RUN
- except KeyboardInterrupt:
- print()
- return INTERRUPTED
- except:
- if not ns.pgo:
- msg = traceback.format_exc()
- print(f"test {test_name} crashed -- {msg}",
- file=sys.stderr, flush=True)
- return FAILED
-
- if refleak:
- return FAILED
- if environment.changed:
- return ENV_CHANGED
- return PASSED
-
-
-def cleanup_test_droppings(test_name, verbose):
- # First kill any dangling references to open files etc.
- # This can also issue some ResourceWarnings which would otherwise get
- # triggered during the following test run, and possibly produce failures.
- support.gc_collect()
-
- # Try to clean up junk commonly left behind. While tests shouldn't leave
- # any files or directories behind, when a test fails that can be tedious
- # for it to arrange. The consequences can be especially nasty on Windows,
- # since if a test leaves a file open, it cannot be deleted by name (while
- # there's nothing we can do about that here either, we can display the
- # name of the offending test, which is a real help).
- for name in (support.TESTFN,):
- if not os.path.exists(name):
- continue
-
- if os.path.isdir(name):
- import shutil
- kind, nuker = "directory", shutil.rmtree
- elif os.path.isfile(name):
- kind, nuker = "file", os.unlink
- else:
- raise RuntimeError(f"os.path says {name!r} exists but is neither "
- f"directory nor file")
-
- if verbose:
- print_warning("%r left behind %s %r" % (test_name, kind, name))
- support.environment_altered = True
-
- try:
- import stat
- # fix possible permissions problems that might prevent cleanup
- os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
- nuker(name)
- except Exception as exc:
- print_warning(f"{test_name} left behind {kind} {name!r} "
- f"and it couldn't be removed: {exc}")
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
deleted file mode 100644
index fc12ea7..0000000
--- a/Lib/test/libregrtest/runtest_mp.py
+++ /dev/null
@@ -1,464 +0,0 @@
-import collections
-import faulthandler
-import json
-import os
-import queue
-import signal
-import subprocess
-import sys
-import threading
-import time
-import traceback
-import types
-from test import support
-
-from test.libregrtest.runtest import (
- runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
- format_test_result, TestResult, is_failed, TIMEOUT)
-from test.libregrtest.setup import setup_tests
-from test.libregrtest.utils import format_duration, print_warning
-
-
-# Display the running tests if nothing happened last N seconds
-PROGRESS_UPDATE = 30.0 # seconds
-assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME
-
-# Kill the main process after 5 minutes. It is supposed to write an update
-# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest
-# buildbot workers.
-MAIN_PROCESS_TIMEOUT = 5 * 60.0
-assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE
-
-# Time to wait until a worker completes: should be immediate
-JOIN_TIMEOUT = 30.0 # seconds
-
-USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
-
-
-def must_stop(result, ns):
- if result.result == INTERRUPTED:
- return True
- if ns.failfast and is_failed(result, ns):
- return True
- return False
-
-
-def parse_worker_args(worker_args):
- ns_dict, test_name = json.loads(worker_args)
- ns = types.SimpleNamespace(**ns_dict)
- return (ns, test_name)
-
-
-def run_test_in_subprocess(testname, ns):
- ns_dict = vars(ns)
- worker_args = (ns_dict, testname)
- worker_args = json.dumps(worker_args)
-
- cmd = [sys.executable, *support.args_from_interpreter_flags(),
- '-u', # Unbuffered stdout and stderr
- '-m', 'test.regrtest',
- '--worker-args', worker_args]
-
- # Running the child from the same working directory as regrtest's original
- # invocation ensures that TEMPDIR for the child is the same when
- # sysconfig.is_python_build() is true. See issue 15300.
- kw = {}
- if USE_PROCESS_GROUP:
- kw['start_new_session'] = True
- return subprocess.Popen(cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- universal_newlines=True,
- close_fds=(os.name != 'nt'),
- cwd=support.SAVEDCWD,
- **kw)
-
-
-def run_tests_worker(ns, test_name):
- setup_tests(ns)
-
- result = runtest(ns, test_name)
-
- print() # Force a newline (just in case)
-
- # Serialize TestResult as list in JSON
- print(json.dumps(list(result)), flush=True)
- sys.exit(0)
-
-
-# We do not use a generator so multiple threads can call next().
-class MultiprocessIterator:
-
- """A thread-safe iterator over tests for multiprocess mode."""
-
- def __init__(self, tests_iter):
- self.lock = threading.Lock()
- self.tests_iter = tests_iter
-
- def __iter__(self):
- return self
-
- def __next__(self):
- with self.lock:
- if self.tests_iter is None:
- raise StopIteration
- return next(self.tests_iter)
-
- def stop(self):
- with self.lock:
- self.tests_iter = None
-
-
-MultiprocessResult = collections.namedtuple('MultiprocessResult',
- 'result stdout stderr error_msg')
-
-class ExitThread(Exception):
- pass
-
-
-class TestWorkerProcess(threading.Thread):
- def __init__(self, worker_id, runner):
- super().__init__()
- self.worker_id = worker_id
- self.pending = runner.pending
- self.output = runner.output
- self.ns = runner.ns
- self.timeout = runner.worker_timeout
- self.regrtest = runner.regrtest
- self.current_test_name = None
- self.start_time = None
- self._popen = None
- self._killed = False
- self._stopped = False
-
- def __repr__(self):
- info = [f'TestWorkerProcess #{self.worker_id}']
- if self.is_alive():
- info.append("running")
- else:
- info.append('stopped')
- test = self.current_test_name
- if test:
- info.append(f'test={test}')
- popen = self._popen
- if popen is not None:
- dt = time.monotonic() - self.start_time
- info.extend((f'pid={self._popen.pid}',
- f'time={format_duration(dt)}'))
- return '<%s>' % ' '.join(info)
-
- def _kill(self):
- popen = self._popen
- if popen is None:
- return
-
- if self._killed:
- return
- self._killed = True
-
- if USE_PROCESS_GROUP:
- what = f"{self} process group"
- else:
- what = f"{self}"
-
- print(f"Kill {what}", file=sys.stderr, flush=True)
- try:
- if USE_PROCESS_GROUP:
- os.killpg(popen.pid, signal.SIGKILL)
- else:
- popen.kill()
- except ProcessLookupError:
- # popen.kill(): the process completed, the TestWorkerProcess thread
- # read its exit status, but Popen.send_signal() read the returncode
- # just before Popen.wait() set returncode.
- pass
- except OSError as exc:
- print_warning(f"Failed to kill {what}: {exc!r}")
-
- def stop(self):
- # Method called from a different thread to stop this thread
- self._stopped = True
- self._kill()
-
- def mp_result_error(self, test_name, error_type, stdout='', stderr='',
- err_msg=None):
- test_time = time.monotonic() - self.start_time
- result = TestResult(test_name, error_type, test_time, None)
- return MultiprocessResult(result, stdout, stderr, err_msg)
-
- def _run_process(self, test_name):
- self.start_time = time.monotonic()
-
- self.current_test_name = test_name
- try:
- popen = run_test_in_subprocess(test_name, self.ns)
-
- self._killed = False
- self._popen = popen
- except:
- self.current_test_name = None
- raise
-
- try:
- if self._stopped:
- # If kill() has been called before self._popen is set,
- # self._popen is still running. Call again kill()
- # to ensure that the process is killed.
- self._kill()
- raise ExitThread
-
- try:
- stdout, stderr = popen.communicate(timeout=self.timeout)
- retcode = popen.returncode
- assert retcode is not None
- except subprocess.TimeoutExpired:
- if self._stopped:
- # kill() has been called: communicate() fails
- # on reading closed stdout/stderr
- raise ExitThread
-
- # On timeout, kill the process
- self._kill()
-
- # None means TIMEOUT for the caller
- retcode = None
- # bpo-38207: Don't attempt to call communicate() again: on it
- # can hang until all child processes using stdout and stderr
- # pipes completes.
- stdout = stderr = ''
- except OSError:
- if self._stopped:
- # kill() has been called: communicate() fails
- # on reading closed stdout/stderr
- raise ExitThread
- raise
- else:
- stdout = stdout.strip()
- stderr = stderr.rstrip()
-
- return (retcode, stdout, stderr)
- except:
- self._kill()
- raise
- finally:
- self._wait_completed()
- self._popen = None
- self.current_test_name = None
-
- def _runtest(self, test_name):
- retcode, stdout, stderr = self._run_process(test_name)
-
- if retcode is None:
- return self.mp_result_error(test_name, TIMEOUT, stdout, stderr)
-
- err_msg = None
- if retcode != 0:
- err_msg = "Exit code %s" % retcode
- else:
- stdout, _, result = stdout.rpartition("\n")
- stdout = stdout.rstrip()
- if not result:
- err_msg = "Failed to parse worker stdout"
- else:
- try:
- # deserialize run_tests_worker() output
- result = json.loads(result)
- result = TestResult(*result)
- except Exception as exc:
- err_msg = "Failed to parse worker JSON: %s" % exc
-
- if err_msg is not None:
- return self.mp_result_error(test_name, CHILD_ERROR,
- stdout, stderr, err_msg)
-
- return MultiprocessResult(result, stdout, stderr, err_msg)
-
- def run(self):
- while not self._stopped:
- try:
- try:
- test_name = next(self.pending)
- except StopIteration:
- break
-
- mp_result = self._runtest(test_name)
- self.output.put((False, mp_result))
-
- if must_stop(mp_result.result, self.ns):
- break
- except ExitThread:
- break
- except BaseException:
- self.output.put((True, traceback.format_exc()))
- break
-
- def _wait_completed(self):
- popen = self._popen
-
- # stdout and stderr must be closed to ensure that communicate()
- # does not hang
- popen.stdout.close()
- popen.stderr.close()
-
- try:
- popen.wait(JOIN_TIMEOUT)
- except (subprocess.TimeoutExpired, OSError) as exc:
- print_warning(f"Failed to wait for {self} completion "
- f"(timeout={format_duration(JOIN_TIMEOUT)}): "
- f"{exc!r}")
-
- def wait_stopped(self, start_time):
- # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
- # which killed the process. Sometimes, killing the process from the
- # main thread does not interrupt popen.communicate() in
- # TestWorkerProcess thread. This loop with a timeout is a workaround
- # for that.
- #
- # Moreover, if this method fails to join the thread, it is likely
- # that Python will hang at exit while calling threading._shutdown()
- # which tries again to join the blocked thread. Regrtest.main()
- # uses EXIT_TIMEOUT to workaround this second bug.
- while True:
- # Write a message every second
- self.join(1.0)
- if not self.is_alive():
- break
- dt = time.monotonic() - start_time
- self.regrtest.log(f"Waiting for {self} thread "
- f"for {format_duration(dt)}")
- if dt > JOIN_TIMEOUT:
- print_warning(f"Failed to join {self} in {format_duration(dt)}")
- break
-
-
-def get_running(workers):
- running = []
- for worker in workers:
- current_test_name = worker.current_test_name
- if not current_test_name:
- continue
- dt = time.monotonic() - worker.start_time
- if dt >= PROGRESS_MIN_TIME:
- text = '%s (%s)' % (current_test_name, format_duration(dt))
- running.append(text)
- return running
-
-
-class MultiprocessTestRunner:
- def __init__(self, regrtest):
- self.regrtest = regrtest
- self.log = self.regrtest.log
- self.ns = regrtest.ns
- self.output = queue.Queue()
- self.pending = MultiprocessIterator(self.regrtest.tests)
- if self.ns.timeout is not None:
- self.worker_timeout = self.ns.timeout * 1.5
- else:
- self.worker_timeout = None
- self.workers = None
-
- def start_workers(self):
- self.workers = [TestWorkerProcess(index, self)
- for index in range(1, self.ns.use_mp + 1)]
- self.log("Run tests in parallel using %s child processes"
- % len(self.workers))
- for worker in self.workers:
- worker.start()
-
- def stop_workers(self):
- start_time = time.monotonic()
- for worker in self.workers:
- worker.stop()
- for worker in self.workers:
- worker.wait_stopped(start_time)
-
- def _get_result(self):
- if not any(worker.is_alive() for worker in self.workers):
- # all worker threads are done: consume pending results
- try:
- return self.output.get(timeout=0)
- except queue.Empty:
- return None
-
- use_faulthandler = (self.ns.timeout is not None)
- timeout = PROGRESS_UPDATE
- while True:
- if use_faulthandler:
- faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT,
- exit=True)
-
- # wait for a thread
- try:
- return self.output.get(timeout=timeout)
- except queue.Empty:
- pass
-
- # display progress
- running = get_running(self.workers)
- if running and not self.ns.pgo:
- self.log('running: %s' % ', '.join(running))
-
- def display_result(self, mp_result):
- result = mp_result.result
-
- text = format_test_result(result)
- if mp_result.error_msg is not None:
- # CHILD_ERROR
- text += ' (%s)' % mp_result.error_msg
- elif (result.test_time >= PROGRESS_MIN_TIME and not self.ns.pgo):
- text += ' (%s)' % format_duration(result.test_time)
- running = get_running(self.workers)
- if running and not self.ns.pgo:
- text += ' -- running: %s' % ', '.join(running)
- self.regrtest.display_progress(self.test_index, text)
-
- def _process_result(self, item):
- if item[0]:
- # Thread got an exception
- format_exc = item[1]
- print_warning(f"regrtest worker thread failed: {format_exc}")
- return True
-
- self.test_index += 1
- mp_result = item[1]
- self.regrtest.accumulate_result(mp_result.result)
- self.display_result(mp_result)
-
- if mp_result.stdout:
- print(mp_result.stdout, flush=True)
- if mp_result.stderr and not self.ns.pgo:
- print(mp_result.stderr, file=sys.stderr, flush=True)
-
- if must_stop(mp_result.result, self.ns):
- return True
-
- return False
-
- def run_tests(self):
- self.start_workers()
-
- self.test_index = 0
- try:
- while True:
- item = self._get_result()
- if item is None:
- break
-
- stop = self._process_result(item)
- if stop:
- break
- except KeyboardInterrupt:
- print()
- self.regrtest.interrupted = True
- finally:
- if self.ns.timeout is not None:
- faulthandler.cancel_dump_traceback_later()
-
- # Always ensure that all worker processes are no longer
- # worker when we exit this function
- self.pending.stop()
- self.stop_workers()
-
-
-def run_tests_multiprocess(regrtest):
- MultiprocessTestRunner(regrtest).run_tests()
diff --git a/Lib/test/libregrtest/save_env.py b/Lib/test/libregrtest/save_env.py
deleted file mode 100644
index e7c27a6..0000000
--- a/Lib/test/libregrtest/save_env.py
+++ /dev/null
@@ -1,303 +0,0 @@
-import asyncio
-import builtins
-import locale
-import logging
-import os
-import shutil
-import sys
-import sysconfig
-import threading
-import urllib.request
-import warnings
-from test import support
-from test.libregrtest.utils import print_warning
-try:
- import _multiprocessing, multiprocessing.process
-except ImportError:
- multiprocessing = None
-
-
-# Unit tests are supposed to leave the execution environment unchanged
-# once they complete. But sometimes tests have bugs, especially when
-# tests fail, and the changes to environment go on to mess up other
-# tests. This can cause issues with buildbot stability, since tests
-# are run in random order and so problems may appear to come and go.
-# There are a few things we can save and restore to mitigate this, and
-# the following context manager handles this task.
-
-class saved_test_environment:
- """Save bits of the test environment and restore them at block exit.
-
- with saved_test_environment(testname, verbose, quiet):
- #stuff
-
- Unless quiet is True, a warning is printed to stderr if any of
- the saved items was changed by the test. The attribute 'changed'
- is initially False, but is set to True if a change is detected.
-
- If verbose is more than 1, the before and after state of changed
- items is also printed.
- """
-
- changed = False
-
- def __init__(self, testname, verbose=0, quiet=False, *, pgo=False):
- self.testname = testname
- self.verbose = verbose
- self.quiet = quiet
- self.pgo = pgo
-
- # To add things to save and restore, add a name XXX to the resources list
- # and add corresponding get_XXX/restore_XXX functions. get_XXX should
- # return the value to be saved and compared against a second call to the
- # get function when test execution completes. restore_XXX should accept
- # the saved value and restore the resource using it. It will be called if
- # and only if a change in the value is detected.
- #
- # Note: XXX will have any '.' replaced with '_' characters when determining
- # the corresponding method names.
-
- resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr',
- 'os.environ', 'sys.path', 'sys.path_hooks', '__import__',
- 'warnings.filters', 'asyncore.socket_map',
- 'logging._handlers', 'logging._handlerList', 'sys.gettrace',
- 'sys.warnoptions',
- # multiprocessing.process._cleanup() may release ref
- # to a thread, so check processes first.
- 'multiprocessing.process._dangling', 'threading._dangling',
- 'sysconfig._CONFIG_VARS', 'sysconfig._INSTALL_SCHEMES',
- 'files', 'locale', 'warnings.showwarning',
- 'shutil_archive_formats', 'shutil_unpack_formats',
- 'asyncio.events._event_loop_policy',
- 'urllib.requests._url_tempfiles', 'urllib.requests._opener',
- )
-
- def get_urllib_requests__url_tempfiles(self):
- return list(urllib.request._url_tempfiles)
- def restore_urllib_requests__url_tempfiles(self, tempfiles):
- for filename in tempfiles:
- support.unlink(filename)
-
- def get_urllib_requests__opener(self):
- return urllib.request._opener
- def restore_urllib_requests__opener(self, opener):
- urllib.request._opener = opener
-
- def get_asyncio_events__event_loop_policy(self):
- return support.maybe_get_event_loop_policy()
- def restore_asyncio_events__event_loop_policy(self, policy):
- asyncio.set_event_loop_policy(policy)
-
- def get_sys_argv(self):
- return id(sys.argv), sys.argv, sys.argv[:]
- def restore_sys_argv(self, saved_argv):
- sys.argv = saved_argv[1]
- sys.argv[:] = saved_argv[2]
-
- def get_cwd(self):
- return os.getcwd()
- def restore_cwd(self, saved_cwd):
- os.chdir(saved_cwd)
-
- def get_sys_stdout(self):
- return sys.stdout
- def restore_sys_stdout(self, saved_stdout):
- sys.stdout = saved_stdout
-
- def get_sys_stderr(self):
- return sys.stderr
- def restore_sys_stderr(self, saved_stderr):
- sys.stderr = saved_stderr
-
- def get_sys_stdin(self):
- return sys.stdin
- def restore_sys_stdin(self, saved_stdin):
- sys.stdin = saved_stdin
-
- def get_os_environ(self):
- return id(os.environ), os.environ, dict(os.environ)
- def restore_os_environ(self, saved_environ):
- os.environ = saved_environ[1]
- os.environ.clear()
- os.environ.update(saved_environ[2])
-
- def get_sys_path(self):
- return id(sys.path), sys.path, sys.path[:]
- def restore_sys_path(self, saved_path):
- sys.path = saved_path[1]
- sys.path[:] = saved_path[2]
-
- def get_sys_path_hooks(self):
- return id(sys.path_hooks), sys.path_hooks, sys.path_hooks[:]
- def restore_sys_path_hooks(self, saved_hooks):
- sys.path_hooks = saved_hooks[1]
- sys.path_hooks[:] = saved_hooks[2]
-
- def get_sys_gettrace(self):
- return sys.gettrace()
- def restore_sys_gettrace(self, trace_fxn):
- sys.settrace(trace_fxn)
-
- def get___import__(self):
- return builtins.__import__
- def restore___import__(self, import_):
- builtins.__import__ = import_
-
- def get_warnings_filters(self):
- return id(warnings.filters), warnings.filters, warnings.filters[:]
- def restore_warnings_filters(self, saved_filters):
- warnings.filters = saved_filters[1]
- warnings.filters[:] = saved_filters[2]
-
- def get_asyncore_socket_map(self):
- asyncore = sys.modules.get('asyncore')
- # XXX Making a copy keeps objects alive until __exit__ gets called.
- return asyncore and asyncore.socket_map.copy() or {}
- def restore_asyncore_socket_map(self, saved_map):
- asyncore = sys.modules.get('asyncore')
- if asyncore is not None:
- asyncore.close_all(ignore_all=True)
- asyncore.socket_map.update(saved_map)
-
- def get_shutil_archive_formats(self):
- # we could call get_archives_formats() but that only returns the
- # registry keys; we want to check the values too (the functions that
- # are registered)
- return shutil._ARCHIVE_FORMATS, shutil._ARCHIVE_FORMATS.copy()
- def restore_shutil_archive_formats(self, saved):
- shutil._ARCHIVE_FORMATS = saved[0]
- shutil._ARCHIVE_FORMATS.clear()
- shutil._ARCHIVE_FORMATS.update(saved[1])
-
- def get_shutil_unpack_formats(self):
- return shutil._UNPACK_FORMATS, shutil._UNPACK_FORMATS.copy()
- def restore_shutil_unpack_formats(self, saved):
- shutil._UNPACK_FORMATS = saved[0]
- shutil._UNPACK_FORMATS.clear()
- shutil._UNPACK_FORMATS.update(saved[1])
-
- def get_logging__handlers(self):
- # _handlers is a WeakValueDictionary
- return id(logging._handlers), logging._handlers, logging._handlers.copy()
- def restore_logging__handlers(self, saved_handlers):
- # Can't easily revert the logging state
- pass
-
- def get_logging__handlerList(self):
- # _handlerList is a list of weakrefs to handlers
- return id(logging._handlerList), logging._handlerList, logging._handlerList[:]
- def restore_logging__handlerList(self, saved_handlerList):
- # Can't easily revert the logging state
- pass
-
- def get_sys_warnoptions(self):
- return id(sys.warnoptions), sys.warnoptions, sys.warnoptions[:]
- def restore_sys_warnoptions(self, saved_options):
- sys.warnoptions = saved_options[1]
- sys.warnoptions[:] = saved_options[2]
-
- # Controlling dangling references to Thread objects can make it easier
- # to track reference leaks.
- def get_threading__dangling(self):
- # This copies the weakrefs without making any strong reference
- return threading._dangling.copy()
- def restore_threading__dangling(self, saved):
- threading._dangling.clear()
- threading._dangling.update(saved)
-
- # Same for Process objects
- def get_multiprocessing_process__dangling(self):
- if not multiprocessing:
- return None
- # Unjoined process objects can survive after process exits
- multiprocessing.process._cleanup()
- # This copies the weakrefs without making any strong reference
- return multiprocessing.process._dangling.copy()
- def restore_multiprocessing_process__dangling(self, saved):
- if not multiprocessing:
- return
- multiprocessing.process._dangling.clear()
- multiprocessing.process._dangling.update(saved)
-
- def get_sysconfig__CONFIG_VARS(self):
- # make sure the dict is initialized
- sysconfig.get_config_var('prefix')
- return (id(sysconfig._CONFIG_VARS), sysconfig._CONFIG_VARS,
- dict(sysconfig._CONFIG_VARS))
- def restore_sysconfig__CONFIG_VARS(self, saved):
- sysconfig._CONFIG_VARS = saved[1]
- sysconfig._CONFIG_VARS.clear()
- sysconfig._CONFIG_VARS.update(saved[2])
-
- def get_sysconfig__INSTALL_SCHEMES(self):
- return (id(sysconfig._INSTALL_SCHEMES), sysconfig._INSTALL_SCHEMES,
- sysconfig._INSTALL_SCHEMES.copy())
- def restore_sysconfig__INSTALL_SCHEMES(self, saved):
- sysconfig._INSTALL_SCHEMES = saved[1]
- sysconfig._INSTALL_SCHEMES.clear()
- sysconfig._INSTALL_SCHEMES.update(saved[2])
-
- def get_files(self):
- return sorted(fn + ('/' if os.path.isdir(fn) else '')
- for fn in os.listdir())
- def restore_files(self, saved_value):
- fn = support.TESTFN
- if fn not in saved_value and (fn + '/') not in saved_value:
- if os.path.isfile(fn):
- support.unlink(fn)
- elif os.path.isdir(fn):
- support.rmtree(fn)
-
- _lc = [getattr(locale, lc) for lc in dir(locale)
- if lc.startswith('LC_')]
- def get_locale(self):
- pairings = []
- for lc in self._lc:
- try:
- pairings.append((lc, locale.setlocale(lc, None)))
- except (TypeError, ValueError):
- continue
- return pairings
- def restore_locale(self, saved):
- for lc, setting in saved:
- locale.setlocale(lc, setting)
-
- def get_warnings_showwarning(self):
- return warnings.showwarning
- def restore_warnings_showwarning(self, fxn):
- warnings.showwarning = fxn
-
- def resource_info(self):
- for name in self.resources:
- method_suffix = name.replace('.', '_')
- get_name = 'get_' + method_suffix
- restore_name = 'restore_' + method_suffix
- yield name, getattr(self, get_name), getattr(self, restore_name)
-
- def __enter__(self):
- self.saved_values = dict((name, get()) for name, get, restore
- in self.resource_info())
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- saved_values = self.saved_values
- del self.saved_values
-
- # Some resources use weak references
- support.gc_collect()
-
- # Read support.environment_altered, set by support helper functions
- self.changed |= support.environment_altered
-
- for name, get, restore in self.resource_info():
- current = get()
- original = saved_values.pop(name)
- # Check for changes to the resource's value
- if current != original:
- self.changed = True
- restore(original)
- if not self.quiet and not self.pgo:
- print_warning(f"{name} was modified by {self.testname}")
- print(f" Before: {original}\n After: {current} ",
- file=sys.stderr, flush=True)
- return False
diff --git a/Lib/test/libregrtest/setup.py b/Lib/test/libregrtest/setup.py
deleted file mode 100644
index ce81496..0000000
--- a/Lib/test/libregrtest/setup.py
+++ /dev/null
@@ -1,144 +0,0 @@
-import atexit
-import faulthandler
-import os
-import signal
-import sys
-import unittest
-from test import support
-try:
- import gc
-except ImportError:
- gc = None
-
-from test.libregrtest.utils import setup_unraisable_hook
-
-
-def setup_tests(ns):
- try:
- stderr_fd = sys.__stderr__.fileno()
- except (ValueError, AttributeError):
- # Catch ValueError to catch io.UnsupportedOperation on TextIOBase
- # and ValueError on a closed stream.
- #
- # Catch AttributeError for stderr being None.
- stderr_fd = None
- else:
- # Display the Python traceback on fatal errors (e.g. segfault)
- faulthandler.enable(all_threads=True, file=stderr_fd)
-
- # Display the Python traceback on SIGALRM or SIGUSR1 signal
- signals = []
- if hasattr(signal, 'SIGALRM'):
- signals.append(signal.SIGALRM)
- if hasattr(signal, 'SIGUSR1'):
- signals.append(signal.SIGUSR1)
- for signum in signals:
- faulthandler.register(signum, chain=True, file=stderr_fd)
-
- replace_stdout()
- support.record_original_stdout(sys.stdout)
-
- if ns.testdir:
- # Prepend test directory to sys.path, so runtest() will be able
- # to locate tests
- sys.path.insert(0, os.path.abspath(ns.testdir))
-
- # Some times __path__ and __file__ are not absolute (e.g. while running from
- # Lib/) and, if we change the CWD to run the tests in a temporary dir, some
- # imports might fail. This affects only the modules imported before os.chdir().
- # These modules are searched first in sys.path[0] (so '' -- the CWD) and if
- # they are found in the CWD their __file__ and __path__ will be relative (this
- # happens before the chdir). All the modules imported after the chdir, are
- # not found in the CWD, and since the other paths in sys.path[1:] are absolute
- # (site.py absolutize them), the __file__ and __path__ will be absolute too.
- # Therefore it is necessary to absolutize manually the __file__ and __path__ of
- # the packages to prevent later imports to fail when the CWD is different.
- for module in sys.modules.values():
- if hasattr(module, '__path__'):
- for index, path in enumerate(module.__path__):
- module.__path__[index] = os.path.abspath(path)
- if getattr(module, '__file__', None):
- module.__file__ = os.path.abspath(module.__file__)
-
- if ns.huntrleaks:
- unittest.BaseTestSuite._cleanup = False
-
- if ns.memlimit is not None:
- support.set_memlimit(ns.memlimit)
-
- if ns.threshold is not None:
- gc.set_threshold(ns.threshold)
-
- suppress_msvcrt_asserts(ns.verbose and ns.verbose >= 2)
-
- support.use_resources = ns.use_resources
-
- if hasattr(sys, 'addaudithook'):
- # Add an auditing hook for all tests to ensure PySys_Audit is tested
- def _test_audit_hook(name, args):
- pass
- sys.addaudithook(_test_audit_hook)
-
- setup_unraisable_hook()
-
- if ns.timeout is not None:
- # For a slow buildbot worker, increase SHORT_TIMEOUT and LONG_TIMEOUT
- support.SHORT_TIMEOUT = max(support.SHORT_TIMEOUT, ns.timeout / 40)
- support.LONG_TIMEOUT = max(support.LONG_TIMEOUT, ns.timeout / 4)
-
- # If --timeout is short: reduce timeouts
- support.LOOPBACK_TIMEOUT = min(support.LOOPBACK_TIMEOUT, ns.timeout)
- support.INTERNET_TIMEOUT = min(support.INTERNET_TIMEOUT, ns.timeout)
- support.SHORT_TIMEOUT = min(support.SHORT_TIMEOUT, ns.timeout)
- support.LONG_TIMEOUT = min(support.LONG_TIMEOUT, ns.timeout)
-
-
-def suppress_msvcrt_asserts(verbose):
- try:
- import msvcrt
- except ImportError:
- return
-
- msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS|
- msvcrt.SEM_NOALIGNMENTFAULTEXCEPT|
- msvcrt.SEM_NOGPFAULTERRORBOX|
- msvcrt.SEM_NOOPENFILEERRORBOX)
- try:
- msvcrt.CrtSetReportMode
- except AttributeError:
- # release build
- return
-
- for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
- if verbose:
- msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE)
- msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR)
- else:
- msvcrt.CrtSetReportMode(m, 0)
-
-
-
-def replace_stdout():
- """Set stdout encoder error handler to backslashreplace (as stderr error
- handler) to avoid UnicodeEncodeError when printing a traceback"""
- stdout = sys.stdout
- try:
- fd = stdout.fileno()
- except ValueError:
- # On IDLE, sys.stdout has no file descriptor and is not a TextIOWrapper
- # object. Leaving sys.stdout unchanged.
- #
- # Catch ValueError to catch io.UnsupportedOperation on TextIOBase
- # and ValueError on a closed stream.
- return
-
- sys.stdout = open(fd, 'w',
- encoding=stdout.encoding,
- errors="backslashreplace",
- closefd=False,
- newline='\n')
-
- def restore_stdout():
- sys.stdout.close()
- sys.stdout = stdout
- atexit.register(restore_stdout)
diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py
deleted file mode 100644
index 40faed8..0000000
--- a/Lib/test/libregrtest/utils.py
+++ /dev/null
@@ -1,81 +0,0 @@
-import math
-import os.path
-import sys
-import textwrap
-from test import support
-
-
-def format_duration(seconds):
- ms = math.ceil(seconds * 1e3)
- seconds, ms = divmod(ms, 1000)
- minutes, seconds = divmod(seconds, 60)
- hours, minutes = divmod(minutes, 60)
-
- parts = []
- if hours:
- parts.append('%s hour' % hours)
- if minutes:
- parts.append('%s min' % minutes)
- if seconds:
- if parts:
- # 2 min 1 sec
- parts.append('%s sec' % seconds)
- else:
- # 1.0 sec
- parts.append('%.1f sec' % (seconds + ms / 1000))
- if not parts:
- return '%s ms' % ms
-
- parts = parts[:2]
- return ' '.join(parts)
-
-
-def removepy(names):
- if not names:
- return
- for idx, name in enumerate(names):
- basename, ext = os.path.splitext(name)
- if ext == '.py':
- names[idx] = basename
-
-
-def count(n, word):
- if n == 1:
- return "%d %s" % (n, word)
- else:
- return "%d %ss" % (n, word)
-
-
-def printlist(x, width=70, indent=4, file=None):
- """Print the elements of iterable x to stdout.
-
- Optional arg width (default 70) is the maximum line length.
- Optional arg indent (default 4) is the number of blanks with which to
- begin each line.
- """
-
- blanks = ' ' * indent
- # Print the sorted list: 'x' may be a '--random' list or a set()
- print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width,
- initial_indent=blanks, subsequent_indent=blanks),
- file=file)
-
-
-def print_warning(msg):
- print(f"Warning -- {msg}", file=sys.stderr, flush=True)
-
-
-orig_unraisablehook = None
-
-
-def regrtest_unraisable_hook(unraisable):
- global orig_unraisablehook
- support.environment_altered = True
- print_warning("Unraisable exception")
- orig_unraisablehook(unraisable)
-
-
-def setup_unraisable_hook():
- global orig_unraisablehook
- orig_unraisablehook = sys.unraisablehook
- sys.unraisablehook = regrtest_unraisable_hook
diff --git a/Lib/test/libregrtest/win_utils.py b/Lib/test/libregrtest/win_utils.py
deleted file mode 100644
index 028c011..0000000
--- a/Lib/test/libregrtest/win_utils.py
+++ /dev/null
@@ -1,190 +0,0 @@
-import _winapi
-import math
-import msvcrt
-import os
-import subprocess
-import uuid
-import winreg
-from test import support
-from test.libregrtest.utils import print_warning
-
-
-# Max size of asynchronous reads
-BUFSIZE = 8192
-# Seconds per measurement
-SAMPLING_INTERVAL = 1
-# Exponential damping factor to compute exponentially weighted moving average
-# on 1 minute (60 seconds)
-LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
-# Initialize the load using the arithmetic mean of the first NVALUE values
-# of the Processor Queue Length
-NVALUE = 5
-# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names
-# of typeperf are registered
-COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion"
- r"\Perflib\CurrentLanguage")
-
-
-class WindowsLoadTracker():
- """
- This class asynchronously interacts with the `typeperf` command to read
- the system load on Windows. Multiprocessing and threads can't be used
- here because they interfere with the test suite's cases for those
- modules.
- """
-
- def __init__(self):
- self._values = []
- self._load = None
- self._buffer = ''
- self._popen = None
- self.start()
-
- def start(self):
- # Create a named pipe which allows for asynchronous IO in Windows
- pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4())
-
- open_mode = _winapi.PIPE_ACCESS_INBOUND
- open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
- open_mode |= _winapi.FILE_FLAG_OVERLAPPED
-
- # This is the read end of the pipe, where we will be grabbing output
- self.pipe = _winapi.CreateNamedPipe(
- pipe_name, open_mode, _winapi.PIPE_WAIT,
- 1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
- )
- # The write end of the pipe which is passed to the created process
- pipe_write_end = _winapi.CreateFile(
- pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL,
- _winapi.OPEN_EXISTING, 0, _winapi.NULL
- )
- # Open up the handle as a python file object so we can pass it to
- # subprocess
- command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0)
-
- # Connect to the read end of the pipe in overlap/async mode
- overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True)
- overlap.GetOverlappedResult(True)
-
- # Spawn off the load monitor
- counter_name = self._get_counter_name()
- command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)]
- self._popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD)
-
- # Close our copy of the write end of the pipe
- os.close(command_stdout)
-
- def _get_counter_name(self):
- # accessing the registry to get the counter localization name
- with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey:
- counters = winreg.QueryValueEx(perfkey, 'Counter')[0]
-
- # Convert [key1, value1, key2, value2, ...] list
- # to {key1: value1, key2: value2, ...} dict
- counters = iter(counters)
- counters_dict = dict(zip(counters, counters))
-
- # System counter has key '2' and Processor Queue Length has key '44'
- system = counters_dict['2']
- process_queue_length = counters_dict['44']
- return f'"\\{system}\\{process_queue_length}"'
-
- def close(self, kill=True):
- if self._popen is None:
- return
-
- self._load = None
-
- if kill:
- self._popen.kill()
- self._popen.wait()
- self._popen = None
-
- def __del__(self):
- self.close()
-
- def _parse_line(self, line):
- # typeperf outputs in a CSV format like this:
- # "07/19/2018 01:32:26.605","3.000000"
- # (date, process queue length)
- tokens = line.split(',')
- if len(tokens) != 2:
- raise ValueError
-
- value = tokens[1]
- if not value.startswith('"') or not value.endswith('"'):
- raise ValueError
- value = value[1:-1]
- return float(value)
-
- def _read_lines(self):
- overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
- bytes_read, res = overlapped.GetOverlappedResult(False)
- if res != 0:
- return ()
-
- output = overlapped.getbuffer()
- output = output.decode('oem', 'replace')
- output = self._buffer + output
- lines = output.splitlines(True)
-
- # bpo-36670: typeperf only writes a newline *before* writing a value,
- # not after. Sometimes, the written line in incomplete (ex: only
- # timestamp, without the process queue length). Only pass the last line
- # to the parser if it's a valid value, otherwise store it in
- # self._buffer.
- try:
- self._parse_line(lines[-1])
- except ValueError:
- self._buffer = lines.pop(-1)
- else:
- self._buffer = ''
-
- return lines
-
- def getloadavg(self):
- if self._popen is None:
- return None
-
- returncode = self._popen.poll()
- if returncode is not None:
- self.close(kill=False)
- return None
-
- try:
- lines = self._read_lines()
- except BrokenPipeError:
- self.close()
- return None
-
- for line in lines:
- line = line.rstrip()
-
- # Ignore the initial header:
- # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
- if 'PDH-CSV' in line:
- continue
-
- # Ignore blank lines
- if not line:
- continue
-
- try:
- processor_queue_length = self._parse_line(line)
- except ValueError:
- print_warning("Failed to parse typeperf output: %a" % line)
- continue
-
- # We use an exponentially weighted moving average, imitating the
- # load calculation on Unix systems.
- # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
- # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
- if self._load is not None:
- self._load = (self._load * LOAD_FACTOR_1
- + processor_queue_length * (1.0 - LOAD_FACTOR_1))
- elif len(self._values) < NVALUE:
- self._values.append(processor_queue_length)
- else:
- self._load = sum(self._values) / len(self._values)
-
- return self._load