diff options
Diffstat (limited to 'Lib/test/regrtest.py')
-rw-r--r--[-rwxr-xr-x] | Lib/test/regrtest.py | 1659 |
1 files changed, 13 insertions, 1646 deletions
diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py index 6b7ce16..21b0edf 100755..100644 --- a/Lib/test/regrtest.py +++ b/Lib/test/regrtest.py @@ -6,1670 +6,33 @@ Script to run Python regression tests. Run this script with -h or --help for documentation. """ -USAGE = """\ -python -m test [options] [test_name1 [test_name2 ...]] -python path/to/Lib/test/regrtest.py [options] [test_name1 [test_name2 ...]] -""" - -DESCRIPTION = """\ -Run Python regression tests. - -If no arguments or options are provided, finds all files matching -the pattern "test_*" in the Lib/test subdirectory and runs -them in alphabetical order (but see -M and -u, below, for exceptions). - -For more rigorous testing, it is useful to use the following -command line: - -python -E -Wd -m test [options] [test_name1 ...] -""" - -EPILOG = """\ -Additional option details: - --r randomizes test execution order. You can use --randseed=int to provide an -int seed value for the randomizer; this is useful for reproducing troublesome -test orders. - --s On the first invocation of regrtest using -s, the first test file found -or the first test file given on the command line is run, and the name of -the next test is recorded in a file named pynexttest. If run from the -Python build directory, pynexttest is located in the 'build' subdirectory, -otherwise it is located in tempfile.gettempdir(). On subsequent runs, -the test in pynexttest is run, and the next test is written to pynexttest. -When the last test has been run, pynexttest is deleted. In this way it -is possible to single step through the test files. This is useful when -doing memory analysis on the Python interpreter, which process tends to -consume too many resources to run the full regression test non-stop. - --S is used to continue running tests after an aborted run. It will -maintain the order a standard run (ie, this assumes -r is not used). -This is useful after the tests have prematurely stopped for some external -reason and you want to start running from where you left off rather -than starting from the beginning. - --f reads the names of tests from the file given as f's argument, one -or more test names per line. Whitespace is ignored. Blank lines and -lines beginning with '#' are ignored. This is especially useful for -whittling down failures involving interactions among tests. - --L causes the leaks(1) command to be run just before exit if it exists. -leaks(1) is available on Mac OS X and presumably on some other -FreeBSD-derived systems. - --R runs each test several times and examines sys.gettotalrefcount() to -see if the test appears to be leaking references. The argument should -be of the form stab:run:fname where 'stab' is the number of times the -test is run to let gettotalrefcount settle down, 'run' is the number -of times further it is run and 'fname' is the name of the file the -reports are written to. These parameters all have defaults (5, 4 and -"reflog.txt" respectively), and the minimal invocation is '-R :'. - --M runs tests that require an exorbitant amount of memory. These tests -typically try to ascertain containers keep working when containing more than -2 billion objects, which only works on 64-bit systems. There are also some -tests that try to exhaust the address space of the process, which only makes -sense on 32-bit systems with at least 2Gb of memory. The passed-in memlimit, -which is a string in the form of '2.5Gb', determines howmuch memory the -tests will limit themselves to (but they may go slightly over.) The number -shouldn't be more memory than the machine has (including swap memory). You -should also keep in mind that swap memory is generally much, much slower -than RAM, and setting memlimit to all available RAM or higher will heavily -tax the machine. On the other hand, it is no use running these tests with a -limit of less than 2.5Gb, and many require more than 20Gb. Tests that expect -to use more than memlimit memory will be skipped. The big-memory tests -generally run very, very long. - --u is used to specify which special resource intensive tests to run, -such as those requiring large file support or network connectivity. -The argument is a comma-separated list of words indicating the -resources to test. Currently only the following are defined: - - all - Enable all special resources. - - none - Disable all special resources (this is the default). - - audio - Tests that use the audio device. (There are known - cases of broken audio drivers that can crash Python or - even the Linux kernel.) - - curses - Tests that use curses and will modify the terminal's - state and output modes. - - largefile - It is okay to run some test that may create huge - files. These tests can take a long time and may - consume >2GB of disk space temporarily. - - network - It is okay to run tests that use external network - resource, e.g. testing SSL support for sockets. - - decimal - Test the decimal module against a large suite that - verifies compliance with standards. - - cpu - Used for certain CPU-heavy tests. - - subprocess Run all tests for the subprocess module. - - urlfetch - It is okay to download files required on testing. - - gui - Run tests that require a running GUI. - -To enable all resources except one, use '-uall,-<resource>'. For -example, to run all the tests except for the gui tests, give the -option '-uall,-gui'. -""" - # We import importlib *ASAP* in order to test #15386 import importlib -import argparse -import builtins -import faulthandler -import io -import json -import locale -import logging import os -import platform -import random -import re -import shutil -import signal import sys -import sysconfig -import tempfile -import time -import traceback -import unittest -import warnings -from inspect import isabstract - -try: - import threading -except ImportError: - threading = None -try: - import _multiprocessing, multiprocessing.process -except ImportError: - multiprocessing = None - - -# Some times __path__ and __file__ are not absolute (e.g. while running from -# Lib/) and, if we change the CWD to run the tests in a temporary dir, some -# imports might fail. This affects only the modules imported before os.chdir(). -# These modules are searched first in sys.path[0] (so '' -- the CWD) and if -# they are found in the CWD their __file__ and __path__ will be relative (this -# happens before the chdir). All the modules imported after the chdir, are -# not found in the CWD, and since the other paths in sys.path[1:] are absolute -# (site.py absolutize them), the __file__ and __path__ will be absolute too. -# Therefore it is necessary to absolutize manually the __file__ and __path__ of -# the packages to prevent later imports to fail when the CWD is different. -for module in sys.modules.values(): - if hasattr(module, '__path__'): - module.__path__ = [os.path.abspath(path) for path in module.__path__] - if hasattr(module, '__file__'): - module.__file__ = os.path.abspath(module.__file__) - - -# MacOSX (a.k.a. Darwin) has a default stack size that is too small -# for deeply recursive regular expressions. We see this as crashes in -# the Python test suite when running test_re.py and test_sre.py. The -# fix is to set the stack limit to 2048. -# This approach may also be useful for other Unixy platforms that -# suffer from small default stack limits. -if sys.platform == 'darwin': - try: - import resource - except ImportError: - pass - else: - soft, hard = resource.getrlimit(resource.RLIMIT_STACK) - newsoft = min(hard, max(soft, 1024*2048)) - resource.setrlimit(resource.RLIMIT_STACK, (newsoft, hard)) - -# Test result constants. -PASSED = 1 -FAILED = 0 -ENV_CHANGED = -1 -SKIPPED = -2 -RESOURCE_DENIED = -3 -INTERRUPTED = -4 -CHILD_ERROR = -5 # error in a child process - -from test import support - -RESOURCE_NAMES = ('audio', 'curses', 'largefile', 'network', - 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui') - -# When tests are run from the Python build directory, it is best practice -# to keep the test files in a subfolder. This eases the cleanup of leftover -# files using the "make distclean" command. -if sysconfig.is_python_build(): - TEMPDIR = os.path.join(sysconfig.get_config_var('srcdir'), 'build') -else: - TEMPDIR = tempfile.gettempdir() -TEMPDIR = os.path.abspath(TEMPDIR) - -class _ArgParser(argparse.ArgumentParser): - - def error(self, message): - super().error(message + "\nPass -h or --help for complete help.") - -def _create_parser(): - # Set prog to prevent the uninformative "__main__.py" from displaying in - # error messages when using "python -m test ...". - parser = _ArgParser(prog='regrtest.py', - usage=USAGE, - description=DESCRIPTION, - epilog=EPILOG, - add_help=False, - formatter_class=argparse.RawDescriptionHelpFormatter) - - # Arguments with this clause added to its help are described further in - # the epilog's "Additional option details" section. - more_details = ' See the section at bottom for more details.' - - group = parser.add_argument_group('General options') - # We add help explicitly to control what argument group it renders under. - group.add_argument('-h', '--help', action='help', - help='show this help message and exit') - group.add_argument('--timeout', metavar='TIMEOUT', type=float, - help='dump the traceback and exit if a test takes ' - 'more than TIMEOUT seconds; disabled if TIMEOUT ' - 'is negative or equals to zero') - group.add_argument('--wait', action='store_true', - help='wait for user input, e.g., allow a debugger ' - 'to be attached') - group.add_argument('--slaveargs', metavar='ARGS') - group.add_argument('-S', '--start', metavar='START', - help='the name of the test at which to start.' + - more_details) - - group = parser.add_argument_group('Verbosity') - group.add_argument('-v', '--verbose', action='count', - help='run tests in verbose mode with output to stdout') - group.add_argument('-w', '--verbose2', action='store_true', - help='re-run failed tests in verbose mode') - group.add_argument('-W', '--verbose3', action='store_true', - help='display test output on failure') - group.add_argument('-q', '--quiet', action='store_true', - help='no output unless one or more tests fail') - group.add_argument('-o', '--slow', action='store_true', dest='print_slow', - help='print the slowest 10 tests') - group.add_argument('--header', action='store_true', - help='print header with interpreter info') - - group = parser.add_argument_group('Selecting tests') - group.add_argument('-r', '--randomize', action='store_true', - help='randomize test execution order.' + more_details) - group.add_argument('--randseed', metavar='SEED', - dest='random_seed', type=int, - help='pass a random seed to reproduce a previous ' - 'random run') - group.add_argument('-f', '--fromfile', metavar='FILE', - help='read names of tests to run from a file.' + - more_details) - group.add_argument('-x', '--exclude', action='store_true', - help='arguments are tests to *exclude*') - group.add_argument('-s', '--single', action='store_true', - help='single step through a set of tests.' + - more_details) - group.add_argument('-m', '--match', metavar='PAT', - dest='match_tests', - help='match test cases and methods with glob pattern PAT') - group.add_argument('-G', '--failfast', action='store_true', - help='fail as soon as a test fails (only with -v or -W)') - group.add_argument('-u', '--use', metavar='RES1,RES2,...', - action='append', type=resources_list, - help='specify which special resource intensive tests ' - 'to run.' + more_details) - group.add_argument('-M', '--memlimit', metavar='LIMIT', - help='run very large memory-consuming tests.' + - more_details) - group.add_argument('--testdir', metavar='DIR', - type=relative_filename, - help='execute test files in the specified directory ' - '(instead of the Python stdlib test suite)') - - group = parser.add_argument_group('Special runs') - group.add_argument('-l', '--findleaks', action='store_true', - help='if GC is available detect tests that leak memory') - group.add_argument('-L', '--runleaks', action='store_true', - help='run the leaks(1) command just before exit.' + - more_details) - group.add_argument('-R', '--huntrleaks', metavar='RUNCOUNTS', - type=huntrleaks, - help='search for reference leaks (needs debug build, ' - 'very slow).' + more_details) - group.add_argument('-j', '--multiprocess', metavar='PROCESSES', - dest='use_mp', type=int, - help='run PROCESSES processes at once') - group.add_argument('-T', '--coverage', action='store_true', - dest='trace', - help='turn on code coverage tracing using the trace ' - 'module') - group.add_argument('-D', '--coverdir', metavar='DIR', - type=relative_filename, - help='directory where coverage files are put') - group.add_argument('-N', '--nocoverdir', - action='store_const', const=None, dest='coverdir', - help='put coverage files alongside modules') - group.add_argument('-t', '--threshold', metavar='THRESHOLD', - type=int, - help='call gc.set_threshold(THRESHOLD)') - group.add_argument('-n', '--nowindows', action='store_true', - help='suppress error message boxes on Windows') - group.add_argument('-F', '--forever', action='store_true', - help='run the specified tests in a loop, until an ' - 'error happens') - group.add_argument('-P', '--pgo', dest='pgo', action='store_true', - help='enable Profile Guided Optimization training') - - return parser - -def relative_filename(string): - # CWD is replaced with a temporary dir before calling main(), so we - # join it with the saved CWD so it ends up where the user expects. - return os.path.join(support.SAVEDCWD, string) - -def huntrleaks(string): - args = string.split(':') - if len(args) not in (2, 3): - raise argparse.ArgumentTypeError( - 'needs 2 or 3 colon-separated arguments') - nwarmup = int(args[0]) if args[0] else 5 - ntracked = int(args[1]) if args[1] else 4 - fname = args[2] if len(args) > 2 and args[2] else 'reflog.txt' - return nwarmup, ntracked, fname - -def resources_list(string): - u = [x.lower() for x in string.split(',')] - for r in u: - if r == 'all' or r == 'none': - continue - if r[0] == '-': - r = r[1:] - if r not in RESOURCE_NAMES: - raise argparse.ArgumentTypeError('invalid resource: ' + r) - return u - -def _parse_args(args, **kwargs): - # Defaults - ns = argparse.Namespace(testdir=None, verbose=0, quiet=False, - exclude=False, single=False, randomize=False, fromfile=None, - findleaks=False, use_resources=None, trace=False, coverdir='coverage', - runleaks=False, huntrleaks=False, verbose2=False, print_slow=False, - random_seed=None, use_mp=None, verbose3=False, forever=False, - header=False, failfast=False, match_tests=None, pgo=False) - for k, v in kwargs.items(): - if not hasattr(ns, k): - raise TypeError('%r is an invalid keyword argument ' - 'for this function' % k) - setattr(ns, k, v) - if ns.use_resources is None: - ns.use_resources = [] - - parser = _create_parser() - # Issue #14191: argparse doesn't support "intermixed" positional and - # optional arguments. Use parse_known_args() as workaround. - ns.args = parser.parse_known_args(args=args, namespace=ns)[1] - for arg in ns.args: - if arg.startswith('-'): - parser.error("unrecognized arguments: %s" % arg) - sys.exit(1) - - if ns.single and ns.fromfile: - parser.error("-s and -f don't go together!") - if ns.use_mp and ns.trace: - parser.error("-T and -j don't go together!") - if ns.use_mp and ns.findleaks: - parser.error("-l and -j don't go together!") - if ns.use_mp and ns.memlimit: - parser.error("-M and -j don't go together!") - if ns.failfast and not (ns.verbose or ns.verbose3): - parser.error("-G/--failfast needs either -v or -W") - - if ns.quiet: - ns.verbose = 0 - if ns.timeout is not None: - if hasattr(faulthandler, 'dump_traceback_later'): - if ns.timeout <= 0: - ns.timeout = None - else: - print("Warning: The timeout option requires " - "faulthandler.dump_traceback_later") - ns.timeout = None - if ns.use_mp is not None: - if ns.use_mp <= 0: - # Use all cores + extras for tests that like to sleep - ns.use_mp = 2 + (os.cpu_count() or 1) - if ns.use_mp == 1: - ns.use_mp = None - if ns.use: - for a in ns.use: - for r in a: - if r == 'all': - ns.use_resources[:] = RESOURCE_NAMES - continue - if r == 'none': - del ns.use_resources[:] - continue - remove = False - if r[0] == '-': - remove = True - r = r[1:] - if remove: - if r in ns.use_resources: - ns.use_resources.remove(r) - elif r not in ns.use_resources: - ns.use_resources.append(r) - if ns.random_seed is not None: - ns.randomize = True - - return ns - - -def run_test_in_subprocess(testname, ns): - """Run the given test in a subprocess with --slaveargs. - - ns is the option Namespace parsed from command-line arguments. regrtest - is invoked in a subprocess with the --slaveargs argument; when the - subprocess exits, its return code, stdout and stderr are returned as a - 3-tuple. - """ - from subprocess import Popen, PIPE - base_cmd = ([sys.executable] + support.args_from_interpreter_flags() + - ['-X', 'faulthandler', '-m', 'test.regrtest']) - # required to spawn a new process with PGO flag on/off - if ns.pgo: - base_cmd = base_cmd + ['--pgo'] - slaveargs = ( - (testname, ns.verbose, ns.quiet), - dict(huntrleaks=ns.huntrleaks, - use_resources=ns.use_resources, - output_on_failure=ns.verbose3, - timeout=ns.timeout, failfast=ns.failfast, - match_tests=ns.match_tests, pgo=ns.pgo)) - # Running the child from the same working directory as regrtest's original - # invocation ensures that TEMPDIR for the child is the same when - # sysconfig.is_python_build() is true. See issue 15300. - popen = Popen(base_cmd + ['--slaveargs', json.dumps(slaveargs)], - stdout=PIPE, stderr=PIPE, - universal_newlines=True, - close_fds=(os.name != 'nt'), - cwd=support.SAVEDCWD) - stdout, stderr = popen.communicate() - retcode = popen.wait() - return retcode, stdout, stderr - - -def main(tests=None, **kwargs): - """Execute a test suite. - - This also parses command-line options and modifies its behavior - accordingly. - - tests -- a list of strings containing test names (optional) - testdir -- the directory in which to look for tests (optional) - - Users other than the Python test suite will certainly want to - specify testdir; if it's omitted, the directory containing the - Python test suite is searched for. - - If the tests argument is omitted, the tests listed on the - command-line will be used. If that's empty, too, then all *.py - files beginning with test_ will be used. - - The other default arguments (verbose, quiet, exclude, - single, randomize, findleaks, use_resources, trace, coverdir, - print_slow, and random_seed) allow programmers calling main() - directly to set the values that would normally be set by flags - on the command line. - """ - # Display the Python traceback on fatal errors (e.g. segfault) - faulthandler.enable(all_threads=True) - - # Display the Python traceback on SIGALRM or SIGUSR1 signal - signals = [] - if hasattr(signal, 'SIGALRM'): - signals.append(signal.SIGALRM) - if hasattr(signal, 'SIGUSR1'): - signals.append(signal.SIGUSR1) - for signum in signals: - faulthandler.register(signum, chain=True) - - replace_stdout() - - support.record_original_stdout(sys.stdout) - - ns = _parse_args(sys.argv[1:], **kwargs) - - if ns.huntrleaks: - # Avoid false positives due to various caches - # filling slowly with random data: - warm_caches() - if ns.memlimit is not None: - support.set_memlimit(ns.memlimit) - if ns.threshold is not None: - import gc - gc.set_threshold(ns.threshold) - if ns.nowindows: - print('The --nowindows (-n) option is deprecated. ' - 'Use -vv to display assertions in stderr.') - try: - import msvcrt - except ImportError: - pass - else: - msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS| - msvcrt.SEM_NOALIGNMENTFAULTEXCEPT| - msvcrt.SEM_NOGPFAULTERRORBOX| - msvcrt.SEM_NOOPENFILEERRORBOX) - try: - msvcrt.CrtSetReportMode - except AttributeError: - # release build - pass - else: - for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]: - if ns.verbose and ns.verbose >= 2: - msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE) - msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR) - else: - msvcrt.CrtSetReportMode(m, 0) - if ns.wait: - input("Press any key to continue...") - - if ns.slaveargs is not None: - args, kwargs = json.loads(ns.slaveargs) - if kwargs.get('huntrleaks'): - unittest.BaseTestSuite._cleanup = False - try: - result = runtest(*args, **kwargs) - except KeyboardInterrupt: - result = INTERRUPTED, '' - except BaseException as e: - traceback.print_exc() - result = CHILD_ERROR, str(e) - sys.stdout.flush() - print() # Force a newline (just in case) - print(json.dumps(result)) - sys.exit(0) - - good = [] - bad = [] - skipped = [] - resource_denieds = [] - environment_changed = [] - interrupted = False - - if ns.findleaks: - try: - import gc - except ImportError: - print('No GC available, disabling findleaks.') - ns.findleaks = False - else: - # Uncomment the line below to report garbage that is not - # freeable by reference counting alone. By default only - # garbage that is not collectable by the GC is reported. - #gc.set_debug(gc.DEBUG_SAVEALL) - found_garbage = [] - - if ns.huntrleaks: - unittest.BaseTestSuite._cleanup = False - - if ns.single: - filename = os.path.join(TEMPDIR, 'pynexttest') - try: - with open(filename, 'r') as fp: - next_test = fp.read().strip() - tests = [next_test] - except OSError: - pass - - if ns.fromfile: - tests = [] - with open(os.path.join(support.SAVEDCWD, ns.fromfile)) as fp: - count_pat = re.compile(r'\[\s*\d+/\s*\d+\]') - for line in fp: - line = count_pat.sub('', line) - guts = line.split() # assuming no test has whitespace in its name - if guts and not guts[0].startswith('#'): - tests.extend(guts) - - # Strip .py extensions. - removepy(ns.args) - removepy(tests) - - stdtests = STDTESTS[:] - nottests = NOTTESTS.copy() - if ns.exclude: - for arg in ns.args: - if arg in stdtests: - stdtests.remove(arg) - nottests.add(arg) - ns.args = [] - - # For a partial run, we do not need to clutter the output. - if (ns.verbose or ns.header or - not (ns.pgo or ns.quiet or ns.single or tests or ns.args)): - # Print basic platform information - print("==", platform.python_implementation(), *sys.version.split()) - print("== ", platform.platform(aliased=True), - "%s-endian" % sys.byteorder) - print("== ", "hash algorithm:", sys.hash_info.algorithm, - "64bit" if sys.maxsize > 2**32 else "32bit") - print("== ", os.getcwd()) - print("Testing with flags:", sys.flags) - - # if testdir is set, then we are not running the python tests suite, so - # don't add default tests to be executed or skipped (pass empty values) - if ns.testdir: - alltests = findtests(ns.testdir, list(), set()) - else: - alltests = findtests(ns.testdir, stdtests, nottests) - - selected = tests or ns.args or alltests - if ns.single: - selected = selected[:1] - try: - next_single_test = alltests[alltests.index(selected[0])+1] - except IndexError: - next_single_test = None - # Remove all the selected tests that precede start if it's set. - if ns.start: - try: - del selected[:selected.index(ns.start)] - except ValueError: - print("Couldn't find starting test (%s), using all tests" % ns.start) - if ns.randomize: - if ns.random_seed is None: - ns.random_seed = random.randrange(10000000) - random.seed(ns.random_seed) - print("Using random seed", ns.random_seed) - random.shuffle(selected) - if ns.trace: - import trace, tempfile - tracer = trace.Trace(ignoredirs=[sys.base_prefix, sys.base_exec_prefix, - tempfile.gettempdir()], - trace=False, count=True) - - test_times = [] - support.verbose = ns.verbose # Tell tests to be moderately quiet - support.use_resources = ns.use_resources - save_modules = sys.modules.keys() - - def accumulate_result(test, result): - ok, test_time = result - if ok not in (CHILD_ERROR, INTERRUPTED): - test_times.append((test_time, test)) - if ok == PASSED: - good.append(test) - elif ok == FAILED: - bad.append(test) - elif ok == ENV_CHANGED: - environment_changed.append(test) - elif ok == SKIPPED: - skipped.append(test) - elif ok == RESOURCE_DENIED: - skipped.append(test) - resource_denieds.append(test) - - if ns.forever: - def test_forever(tests=list(selected)): - while True: - for test in tests: - yield test - if bad: - return - tests = test_forever() - test_count = '' - test_count_width = 3 - else: - tests = iter(selected) - test_count = '/{}'.format(len(selected)) - test_count_width = len(test_count) - 1 - - if ns.use_mp: - try: - from threading import Thread - except ImportError: - print("Multiprocess option requires thread support") - sys.exit(2) - from queue import Queue - debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$") - output = Queue() - pending = MultiprocessTests(tests) - def work(): - # A worker thread. - try: - while True: - try: - test = next(pending) - except StopIteration: - output.put((None, None, None, None)) - return - retcode, stdout, stderr = run_test_in_subprocess(test, ns) - # Strip last refcount output line if it exists, since it - # comes from the shutdown of the interpreter in the subcommand. - stderr = debug_output_pat.sub("", stderr) - stdout, _, result = stdout.strip().rpartition("\n") - if retcode != 0: - result = (CHILD_ERROR, "Exit code %s" % retcode) - output.put((test, stdout.rstrip(), stderr.rstrip(), result)) - return - if not result: - output.put((None, None, None, None)) - return - result = json.loads(result) - output.put((test, stdout.rstrip(), stderr.rstrip(), result)) - except BaseException: - output.put((None, None, None, None)) - raise - workers = [Thread(target=work) for i in range(ns.use_mp)] - for worker in workers: - worker.start() - finished = 0 - test_index = 1 - try: - while finished < ns.use_mp: - test, stdout, stderr, result = output.get() - if test is None: - finished += 1 - continue - accumulate_result(test, result) - if not ns.quiet: - if bad and not ns.pgo: - fmt = "[{1:{0}}{2}/{3}] {4}" - else: - fmt = "[{1:{0}}{2}] {4}" - print(fmt.format( - test_count_width, test_index, test_count, - len(bad), test)) - if stdout: - print(stdout) - if stderr and not ns.pgo: - print(stderr, file=sys.stderr) - sys.stdout.flush() - sys.stderr.flush() - if result[0] == INTERRUPTED: - raise KeyboardInterrupt - if result[0] == CHILD_ERROR: - raise Exception("Child error on {}: {}".format(test, result[1])) - test_index += 1 - except KeyboardInterrupt: - interrupted = True - pending.interrupted = True - for worker in workers: - worker.join() - else: - for test_index, test in enumerate(tests, 1): - if not ns.quiet: - if bad and not ns.pgo: - fmt = "[{1:{0}}{2}/{3}] {4}" - else: - fmt = "[{1:{0}}{2}] {4}" - print(fmt.format( - test_count_width, test_index, test_count, len(bad), test)) - sys.stdout.flush() - if ns.trace: - # If we're tracing code coverage, then we don't exit with status - # if on a false return value from main. - tracer.runctx('runtest(test, ns.verbose, ns.quiet, timeout=ns.timeout)', - globals=globals(), locals=vars()) - else: - try: - result = runtest(test, ns.verbose, ns.quiet, - ns.huntrleaks, - output_on_failure=ns.verbose3, - timeout=ns.timeout, failfast=ns.failfast, - match_tests=ns.match_tests, pgo=ns.pgo) - accumulate_result(test, result) - except KeyboardInterrupt: - interrupted = True - break - if ns.findleaks: - gc.collect() - if gc.garbage: - print("Warning: test created", len(gc.garbage), end=' ') - print("uncollectable object(s).") - # move the uncollectable objects somewhere so we don't see - # them again - found_garbage.extend(gc.garbage) - del gc.garbage[:] - # Unload the newly imported modules (best effort finalization) - for module in sys.modules.keys(): - if module not in save_modules and module.startswith("test."): - support.unload(module) - - if interrupted and not ns.pgo: - # print a newline after ^C - print() - print("Test suite interrupted by signal SIGINT.") - omitted = set(selected) - set(good) - set(bad) - set(skipped) - print(count(len(omitted), "test"), "omitted:") - printlist(omitted) - if good and not ns.quiet and not ns.pgo: - if not bad and not skipped and not interrupted and len(good) > 1: - print("All", end=' ') - print(count(len(good), "test"), "OK.") - if ns.print_slow: - test_times.sort(reverse=True) - print("10 slowest tests:") - for time, test in test_times[:10]: - print("%s: %.1fs" % (test, time)) - if bad and not ns.pgo: - print(count(len(bad), "test"), "failed:") - printlist(bad) - if environment_changed and not ns.pgo: - print("{} altered the execution environment:".format( - count(len(environment_changed), "test"))) - printlist(environment_changed) - if skipped and not ns.quiet and not ns.pgo: - print(count(len(skipped), "test"), "skipped:") - printlist(skipped) - - if ns.verbose2 and bad: - print("Re-running failed tests in verbose mode") - for test in bad[:]: - if not ns.pgo: - print("Re-running test %r in verbose mode" % test) - sys.stdout.flush() - try: - ns.verbose = True - ok = runtest(test, True, ns.quiet, ns.huntrleaks, - timeout=ns.timeout, pgo=ns.pgo) - except KeyboardInterrupt: - # print a newline separate from the ^C - print() - break - else: - if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}: - bad.remove(test) - else: - if bad: - print(count(len(bad), 'test'), "failed again:") - printlist(bad) - - if ns.single: - if next_single_test: - with open(filename, 'w') as fp: - fp.write(next_single_test + '\n') - else: - os.unlink(filename) - - if ns.trace: - r = tracer.results() - r.write_results(show_missing=True, summary=True, coverdir=ns.coverdir) - - if ns.runleaks: - os.system("leaks %d" % os.getpid()) - - sys.exit(len(bad) > 0 or interrupted) - - -# small set of tests to determine if we have a basically functioning interpreter -# (i.e. if any of these fail, then anything else is likely to follow) -STDTESTS = [ - 'test_grammar', - 'test_opcodes', - 'test_dict', - 'test_builtin', - 'test_exceptions', - 'test_types', - 'test_unittest', - 'test_doctest', - 'test_doctest2', - 'test_support' -] - -# set of tests that we don't want to be executed when using regrtest -NOTTESTS = set() - -def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS): - """Return a list of all applicable test modules.""" - testdir = findtestdir(testdir) - names = os.listdir(testdir) - tests = [] - others = set(stdtests) | nottests - for name in names: - mod, ext = os.path.splitext(name) - if mod[:5] == "test_" and ext in (".py", "") and mod not in others: - tests.append(mod) - return stdtests + sorted(tests) +from test.libregrtest import main -# We do not use a generator so multiple threads can call next(). -class MultiprocessTests(object): - """A thread-safe iterator over tests for multiprocess mode.""" +# Alias for backward compatibility (just in case) +main_in_temp_cwd = main - def __init__(self, tests): - self.interrupted = False - self.lock = threading.Lock() - self.tests = tests - def __iter__(self): - return self +def _main(): + global __file__ - def __next__(self): - with self.lock: - if self.interrupted: - raise StopIteration('tests interrupted') - return next(self.tests) - -def replace_stdout(): - """Set stdout encoder error handler to backslashreplace (as stderr error - handler) to avoid UnicodeEncodeError when printing a traceback""" - import atexit - - stdout = sys.stdout - sys.stdout = open(stdout.fileno(), 'w', - encoding=stdout.encoding, - errors="backslashreplace", - closefd=False, - newline='\n') - - def restore_stdout(): - sys.stdout.close() - sys.stdout = stdout - atexit.register(restore_stdout) - -def runtest(test, verbose, quiet, - huntrleaks=False, use_resources=None, - output_on_failure=False, failfast=False, match_tests=None, - timeout=None, *, pgo=False): - """Run a single test. - - test -- the name of the test - verbose -- if true, print more messages - quiet -- if true, don't print 'skipped' messages (probably redundant) - huntrleaks -- run multiple times to test for leaks; requires a debug - build; a triple corresponding to -R's three arguments - use_resources -- list of extra resources to use - output_on_failure -- if true, display test output on failure - timeout -- dump the traceback and exit if a test takes more than - timeout seconds - failfast, match_tests -- See regrtest command-line flags for these. - pgo -- if true, do not print unnecessary info when running the test - for Profile Guided Optimization build - - Returns the tuple result, test_time, where result is one of the constants: - INTERRUPTED KeyboardInterrupt when run under -j - RESOURCE_DENIED test skipped because resource denied - SKIPPED test skipped for some other reason - ENV_CHANGED test failed because it changed the execution environment - FAILED test failed - PASSED test passed - """ - if use_resources is not None: - support.use_resources = use_resources - use_timeout = (timeout is not None) - if use_timeout: - faulthandler.dump_traceback_later(timeout, exit=True) - try: - support.match_tests = match_tests - if failfast: - support.failfast = True - if output_on_failure: - support.verbose = True - - # Reuse the same instance to all calls to runtest(). Some - # tests keep a reference to sys.stdout or sys.stderr - # (eg. test_argparse). - if runtest.stringio is None: - stream = io.StringIO() - runtest.stringio = stream - else: - stream = runtest.stringio - stream.seek(0) - stream.truncate() - - orig_stdout = sys.stdout - orig_stderr = sys.stderr - try: - sys.stdout = stream - sys.stderr = stream - result = runtest_inner(test, verbose, quiet, huntrleaks, - display_failure=False, pgo=pgo) - if result[0] != PASSED and not pgo: - output = stream.getvalue() - orig_stderr.write(output) - orig_stderr.flush() - finally: - sys.stdout = orig_stdout - sys.stderr = orig_stderr - else: - support.verbose = verbose # Tell tests to be moderately quiet - result = runtest_inner(test, verbose, quiet, huntrleaks, - display_failure=not verbose, pgo=pgo) - return result - finally: - if use_timeout: - faulthandler.cancel_dump_traceback_later() - cleanup_test_droppings(test, verbose) -runtest.stringio = None - -# Unit tests are supposed to leave the execution environment unchanged -# once they complete. But sometimes tests have bugs, especially when -# tests fail, and the changes to environment go on to mess up other -# tests. This can cause issues with buildbot stability, since tests -# are run in random order and so problems may appear to come and go. -# There are a few things we can save and restore to mitigate this, and -# the following context manager handles this task. - -class saved_test_environment: - """Save bits of the test environment and restore them at block exit. - - with saved_test_environment(testname, verbose, quiet): - #stuff - - Unless quiet is True, a warning is printed to stderr if any of - the saved items was changed by the test. The attribute 'changed' - is initially False, but is set to True if a change is detected. - - If verbose is more than 1, the before and after state of changed - items is also printed. - """ - - changed = False - - def __init__(self, testname, verbose=0, quiet=False, *, pgo=False): - self.testname = testname - self.verbose = verbose - self.quiet = quiet - self.pgo = pgo - - # To add things to save and restore, add a name XXX to the resources list - # and add corresponding get_XXX/restore_XXX functions. get_XXX should - # return the value to be saved and compared against a second call to the - # get function when test execution completes. restore_XXX should accept - # the saved value and restore the resource using it. It will be called if - # and only if a change in the value is detected. - # - # Note: XXX will have any '.' replaced with '_' characters when determining - # the corresponding method names. - - resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr', - 'os.environ', 'sys.path', 'sys.path_hooks', '__import__', - 'asyncore.socket_map', - 'logging._handlers', 'logging._handlerList', 'sys.gettrace', - 'sys.warnoptions', - # multiprocessing.process._cleanup() may release ref - # to a thread, so check processes first. - 'multiprocessing.process._dangling', 'threading._dangling', - 'sysconfig._CONFIG_VARS', 'sysconfig._INSTALL_SCHEMES', - 'files', 'locale', 'warnings.showwarning', - 'shutil_archive_formats', 'shutil_unpack_formats', - ) - - def get_sys_argv(self): - return id(sys.argv), sys.argv, sys.argv[:] - def restore_sys_argv(self, saved_argv): - sys.argv = saved_argv[1] - sys.argv[:] = saved_argv[2] - - def get_cwd(self): - return os.getcwd() - def restore_cwd(self, saved_cwd): - os.chdir(saved_cwd) - - def get_sys_stdout(self): - return sys.stdout - def restore_sys_stdout(self, saved_stdout): - sys.stdout = saved_stdout - - def get_sys_stderr(self): - return sys.stderr - def restore_sys_stderr(self, saved_stderr): - sys.stderr = saved_stderr - - def get_sys_stdin(self): - return sys.stdin - def restore_sys_stdin(self, saved_stdin): - sys.stdin = saved_stdin - - def get_os_environ(self): - return id(os.environ), os.environ, dict(os.environ) - def restore_os_environ(self, saved_environ): - os.environ = saved_environ[1] - os.environ.clear() - os.environ.update(saved_environ[2]) - - def get_sys_path(self): - return id(sys.path), sys.path, sys.path[:] - def restore_sys_path(self, saved_path): - sys.path = saved_path[1] - sys.path[:] = saved_path[2] - - def get_sys_path_hooks(self): - return id(sys.path_hooks), sys.path_hooks, sys.path_hooks[:] - def restore_sys_path_hooks(self, saved_hooks): - sys.path_hooks = saved_hooks[1] - sys.path_hooks[:] = saved_hooks[2] - - def get_sys_gettrace(self): - return sys.gettrace() - def restore_sys_gettrace(self, trace_fxn): - sys.settrace(trace_fxn) - - def get___import__(self): - return builtins.__import__ - def restore___import__(self, import_): - builtins.__import__ = import_ - - def get_asyncore_socket_map(self): - asyncore = sys.modules.get('asyncore') - # XXX Making a copy keeps objects alive until __exit__ gets called. - return asyncore and asyncore.socket_map.copy() or {} - def restore_asyncore_socket_map(self, saved_map): - asyncore = sys.modules.get('asyncore') - if asyncore is not None: - asyncore.close_all(ignore_all=True) - asyncore.socket_map.update(saved_map) - - def get_shutil_archive_formats(self): - # we could call get_archives_formats() but that only returns the - # registry keys; we want to check the values too (the functions that - # are registered) - return shutil._ARCHIVE_FORMATS, shutil._ARCHIVE_FORMATS.copy() - def restore_shutil_archive_formats(self, saved): - shutil._ARCHIVE_FORMATS = saved[0] - shutil._ARCHIVE_FORMATS.clear() - shutil._ARCHIVE_FORMATS.update(saved[1]) - - def get_shutil_unpack_formats(self): - return shutil._UNPACK_FORMATS, shutil._UNPACK_FORMATS.copy() - def restore_shutil_unpack_formats(self, saved): - shutil._UNPACK_FORMATS = saved[0] - shutil._UNPACK_FORMATS.clear() - shutil._UNPACK_FORMATS.update(saved[1]) - - def get_logging__handlers(self): - # _handlers is a WeakValueDictionary - return id(logging._handlers), logging._handlers, logging._handlers.copy() - def restore_logging__handlers(self, saved_handlers): - # Can't easily revert the logging state - pass - - def get_logging__handlerList(self): - # _handlerList is a list of weakrefs to handlers - return id(logging._handlerList), logging._handlerList, logging._handlerList[:] - def restore_logging__handlerList(self, saved_handlerList): - # Can't easily revert the logging state - pass - - def get_sys_warnoptions(self): - return id(sys.warnoptions), sys.warnoptions, sys.warnoptions[:] - def restore_sys_warnoptions(self, saved_options): - sys.warnoptions = saved_options[1] - sys.warnoptions[:] = saved_options[2] - - # Controlling dangling references to Thread objects can make it easier - # to track reference leaks. - def get_threading__dangling(self): - if not threading: - return None - # This copies the weakrefs without making any strong reference - return threading._dangling.copy() - def restore_threading__dangling(self, saved): - if not threading: - return - threading._dangling.clear() - threading._dangling.update(saved) - - # Same for Process objects - def get_multiprocessing_process__dangling(self): - if not multiprocessing: - return None - # Unjoined process objects can survive after process exits - multiprocessing.process._cleanup() - # This copies the weakrefs without making any strong reference - return multiprocessing.process._dangling.copy() - def restore_multiprocessing_process__dangling(self, saved): - if not multiprocessing: - return - multiprocessing.process._dangling.clear() - multiprocessing.process._dangling.update(saved) - - def get_sysconfig__CONFIG_VARS(self): - # make sure the dict is initialized - sysconfig.get_config_var('prefix') - return (id(sysconfig._CONFIG_VARS), sysconfig._CONFIG_VARS, - dict(sysconfig._CONFIG_VARS)) - def restore_sysconfig__CONFIG_VARS(self, saved): - sysconfig._CONFIG_VARS = saved[1] - sysconfig._CONFIG_VARS.clear() - sysconfig._CONFIG_VARS.update(saved[2]) - - def get_sysconfig__INSTALL_SCHEMES(self): - return (id(sysconfig._INSTALL_SCHEMES), sysconfig._INSTALL_SCHEMES, - sysconfig._INSTALL_SCHEMES.copy()) - def restore_sysconfig__INSTALL_SCHEMES(self, saved): - sysconfig._INSTALL_SCHEMES = saved[1] - sysconfig._INSTALL_SCHEMES.clear() - sysconfig._INSTALL_SCHEMES.update(saved[2]) - - def get_files(self): - return sorted(fn + ('/' if os.path.isdir(fn) else '') - for fn in os.listdir()) - def restore_files(self, saved_value): - fn = support.TESTFN - if fn not in saved_value and (fn + '/') not in saved_value: - if os.path.isfile(fn): - support.unlink(fn) - elif os.path.isdir(fn): - support.rmtree(fn) - - _lc = [getattr(locale, lc) for lc in dir(locale) - if lc.startswith('LC_')] - def get_locale(self): - pairings = [] - for lc in self._lc: - try: - pairings.append((lc, locale.setlocale(lc, None))) - except (TypeError, ValueError): - continue - return pairings - def restore_locale(self, saved): - for lc, setting in saved: - locale.setlocale(lc, setting) - - def get_warnings_showwarning(self): - return warnings.showwarning - def restore_warnings_showwarning(self, fxn): - warnings.showwarning = fxn - - def resource_info(self): - for name in self.resources: - method_suffix = name.replace('.', '_') - get_name = 'get_' + method_suffix - restore_name = 'restore_' + method_suffix - yield name, getattr(self, get_name), getattr(self, restore_name) - - def __enter__(self): - self.saved_values = dict((name, get()) for name, get, restore - in self.resource_info()) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - saved_values = self.saved_values - del self.saved_values - support.gc_collect() # Some resources use weak references - for name, get, restore in self.resource_info(): - current = get() - original = saved_values.pop(name) - # Check for changes to the resource's value - if current != original: - self.changed = True - restore(original) - if not self.quiet and not self.pgo: - print("Warning -- {} was modified by {}".format( - name, self.testname), - file=sys.stderr) - if self.verbose > 1 and not self.pgo: - print(" Before: {}\n After: {} ".format( - original, current), - file=sys.stderr) - return False - - -def runtest_inner(test, verbose, quiet, - huntrleaks=False, display_failure=True, pgo=False): - support.unload(test) - - test_time = 0.0 - refleak = False # True if the test leaked references. - try: - if test.startswith('test.'): - abstest = test - else: - # Always import it from the test package - abstest = 'test.' + test - clear_caches() - with saved_test_environment(test, verbose, quiet, pgo=pgo) as environment: - start_time = time.time() - the_module = importlib.import_module(abstest) - # If the test has a test_main, that will run the appropriate - # tests. If not, use normal unittest test loading. - test_runner = getattr(the_module, "test_main", None) - if test_runner is None: - def test_runner(): - loader = unittest.TestLoader() - tests = loader.loadTestsFromModule(the_module) - for error in loader.errors: - print(error, file=sys.stderr) - if loader.errors: - raise Exception("errors while loading tests") - support.run_unittest(tests) - test_runner() - if huntrleaks: - refleak = dash_R(the_module, test, test_runner, huntrleaks) - test_time = time.time() - start_time - except support.ResourceDenied as msg: - if not quiet and not pgo: - print(test, "skipped --", msg) - sys.stdout.flush() - return RESOURCE_DENIED, test_time - except unittest.SkipTest as msg: - if not quiet and not pgo: - print(test, "skipped --", msg) - sys.stdout.flush() - return SKIPPED, test_time - except KeyboardInterrupt: - raise - except support.TestFailed as msg: - if not pgo: - if display_failure: - print("test", test, "failed --", msg, file=sys.stderr) - else: - print("test", test, "failed", file=sys.stderr) - sys.stderr.flush() - return FAILED, test_time - except: - msg = traceback.format_exc() - if not pgo: - print("test", test, "crashed --", msg, file=sys.stderr) - sys.stderr.flush() - return FAILED, test_time - else: - if refleak: - return FAILED, test_time - if environment.changed: - return ENV_CHANGED, test_time - return PASSED, test_time - -def cleanup_test_droppings(testname, verbose): - import shutil - import stat - import gc - - # First kill any dangling references to open files etc. - # This can also issue some ResourceWarnings which would otherwise get - # triggered during the following test run, and possibly produce failures. - gc.collect() - - # Try to clean up junk commonly left behind. While tests shouldn't leave - # any files or directories behind, when a test fails that can be tedious - # for it to arrange. The consequences can be especially nasty on Windows, - # since if a test leaves a file open, it cannot be deleted by name (while - # there's nothing we can do about that here either, we can display the - # name of the offending test, which is a real help). - for name in (support.TESTFN, - "db_home", - ): - if not os.path.exists(name): - continue - - if os.path.isdir(name): - kind, nuker = "directory", shutil.rmtree - elif os.path.isfile(name): - kind, nuker = "file", os.unlink - else: - raise SystemError("os.path says %r exists but is neither " - "directory nor file" % name) - - if verbose: - print("%r left behind %s %r" % (testname, kind, name)) - try: - # if we have chmod, fix possible permissions problems - # that might prevent cleanup - if (hasattr(os, 'chmod')): - os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) - nuker(name) - except Exception as msg: - print(("%r left behind %s %r and it couldn't be " - "removed: %s" % (testname, kind, name, msg)), file=sys.stderr) - -def dash_R(the_module, test, indirect_test, huntrleaks): - """Run a test multiple times, looking for reference leaks. - - Returns: - False if the test didn't leak references; True if we detected refleaks. - """ - # This code is hackish and inelegant, but it seems to do the job. - import copyreg - import collections.abc - - if not hasattr(sys, 'gettotalrefcount'): - raise Exception("Tracking reference leaks requires a debug build " - "of Python") - - # Save current values for dash_R_cleanup() to restore. - fs = warnings.filters[:] - ps = copyreg.dispatch_table.copy() - pic = sys.path_importer_cache.copy() - try: - import zipimport - except ImportError: - zdc = None # Run unmodified on platforms without zipimport support - else: - zdc = zipimport._zip_directory_cache.copy() - abcs = {} - for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]: - if not isabstract(abc): - continue - for obj in abc.__subclasses__() + [abc]: - abcs[obj] = obj._abc_registry.copy() - - nwarmup, ntracked, fname = huntrleaks - fname = os.path.join(support.SAVEDCWD, fname) - repcount = nwarmup + ntracked - rc_deltas = [0] * repcount - alloc_deltas = [0] * repcount - - print("beginning", repcount, "repetitions", file=sys.stderr) - print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr) - sys.stderr.flush() - for i in range(repcount): - indirect_test() - alloc_after, rc_after = dash_R_cleanup(fs, ps, pic, zdc, abcs) - sys.stderr.write('.') - sys.stderr.flush() - if i >= nwarmup: - rc_deltas[i] = rc_after - rc_before - alloc_deltas[i] = alloc_after - alloc_before - alloc_before, rc_before = alloc_after, rc_after - print(file=sys.stderr) - # These checkers return False on success, True on failure - def check_rc_deltas(deltas): - return any(deltas) - def check_alloc_deltas(deltas): - # At least 1/3rd of 0s - if 3 * deltas.count(0) < len(deltas): - return True - # Nothing else than 1s, 0s and -1s - if not set(deltas) <= {1,0,-1}: - return True - return False - failed = False - for deltas, item_name, checker in [ - (rc_deltas, 'references', check_rc_deltas), - (alloc_deltas, 'memory blocks', check_alloc_deltas)]: - if checker(deltas): - msg = '%s leaked %s %s, sum=%s' % ( - test, deltas[nwarmup:], item_name, sum(deltas)) - print(msg, file=sys.stderr) - sys.stderr.flush() - with open(fname, "a") as refrep: - print(msg, file=refrep) - refrep.flush() - failed = True - return failed - -def dash_R_cleanup(fs, ps, pic, zdc, abcs): - import gc, copyreg - import collections.abc - from weakref import WeakSet - - # Restore some original values. - warnings.filters[:] = fs - copyreg.dispatch_table.clear() - copyreg.dispatch_table.update(ps) - sys.path_importer_cache.clear() - sys.path_importer_cache.update(pic) - try: - import zipimport - except ImportError: - pass # Run unmodified on platforms without zipimport support - else: - zipimport._zip_directory_cache.clear() - zipimport._zip_directory_cache.update(zdc) - - # clear type cache - sys._clear_type_cache() - - # Clear ABC registries, restoring previously saved ABC registries. - for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]: - if not isabstract(abc): - continue - for obj in abc.__subclasses__() + [abc]: - obj._abc_registry = abcs.get(obj, WeakSet()).copy() - obj._abc_cache.clear() - obj._abc_negative_cache.clear() - - clear_caches() - - # Collect cyclic trash and read memory statistics immediately after. - func1 = sys.getallocatedblocks - func2 = sys.gettotalrefcount - gc.collect() - return func1(), func2() - -def clear_caches(): - import gc - - # Clear the warnings registry, so they can be displayed again - for mod in sys.modules.values(): - if hasattr(mod, '__warningregistry__'): - del mod.__warningregistry__ - - # Flush standard output, so that buffered data is sent to the OS and - # associated Python objects are reclaimed. - for stream in (sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__): - if stream is not None: - stream.flush() - - # Clear assorted module caches. - # Don't worry about resetting the cache if the module is not loaded - try: - distutils_dir_util = sys.modules['distutils.dir_util'] - except KeyError: - pass - else: - distutils_dir_util._path_created.clear() - - re.purge() - - try: - _strptime = sys.modules['_strptime'] - except KeyError: - pass - else: - _strptime._regex_cache.clear() - - try: - urllib_parse = sys.modules['urllib.parse'] - except KeyError: - pass - else: - urllib_parse.clear_cache() - - try: - urllib_request = sys.modules['urllib.request'] - except KeyError: - pass - else: - urllib_request.urlcleanup() - - try: - linecache = sys.modules['linecache'] - except KeyError: - pass - else: - linecache.clearcache() - - try: - mimetypes = sys.modules['mimetypes'] - except KeyError: - pass - else: - mimetypes._default_mime_types() - - try: - filecmp = sys.modules['filecmp'] - except KeyError: - pass - else: - filecmp._cache.clear() - - try: - struct = sys.modules['struct'] - except KeyError: - pass - else: - struct._clearcache() - - try: - doctest = sys.modules['doctest'] - except KeyError: - pass - else: - doctest.master = None - - try: - ctypes = sys.modules['ctypes'] - except KeyError: - pass - else: - ctypes._reset_cache() - - try: - typing = sys.modules['typing'] - except KeyError: - pass - else: - for f in typing._cleanups: - f() - - gc.collect() - -def warm_caches(): - # char cache - s = bytes(range(256)) - for i in range(256): - s[i:i+1] - # unicode cache - x = [chr(i) for i in range(256)] - # int cache - x = list(range(-5, 257)) - -def findtestdir(path=None): - return path or os.path.dirname(__file__) or os.curdir - -def removepy(names): - if not names: - return - for idx, name in enumerate(names): - basename, ext = os.path.splitext(name) - if ext == '.py': - names[idx] = basename - -def count(n, word): - if n == 1: - return "%d %s" % (n, word) - else: - return "%d %ss" % (n, word) - -def printlist(x, width=70, indent=4): - """Print the elements of iterable x to stdout. - - Optional arg width (default 70) is the maximum line length. - Optional arg indent (default 4) is the number of blanks with which to - begin each line. - """ - - from textwrap import fill - blanks = ' ' * indent - # Print the sorted list: 'x' may be a '--random' list or a set() - print(fill(' '.join(str(elt) for elt in sorted(x)), width, - initial_indent=blanks, subsequent_indent=blanks)) - - -def main_in_temp_cwd(): - """Run main() in a temporary working directory.""" - if sysconfig.is_python_build(): - try: - os.mkdir(TEMPDIR) - except FileExistsError: - pass - - # Define a writable temp dir that will be used as cwd while running - # the tests. The name of the dir includes the pid to allow parallel - # testing (see the -j option). - test_cwd = 'test_python_{}'.format(os.getpid()) - test_cwd = os.path.join(TEMPDIR, test_cwd) - - # Run the tests in a context manager that temporarily changes the CWD to a - # temporary and writable directory. If it's not possible to create or - # change the CWD, the original CWD will be used. The original CWD is - # available from support.SAVEDCWD. - with support.temp_cwd(test_cwd, quiet=True): - main() - - -if __name__ == '__main__': # Remove regrtest.py's own directory from the module search path. Despite # the elimination of implicit relative imports, this is still needed to # ensure that submodules of the test package do not inappropriately appear # as top-level modules even when people (or buildbots!) invoke regrtest.py # directly instead of using the -m switch mydir = os.path.abspath(os.path.normpath(os.path.dirname(sys.argv[0]))) - i = len(sys.path) + i = len(sys.path) - 1 while i >= 0: - i -= 1 if os.path.abspath(os.path.normpath(sys.path[i])) == mydir: del sys.path[i] + else: + i -= 1 # findtestdir() gets the dirname out of __file__, so we have to make it # absolute before changing the working directory. @@ -1680,4 +43,8 @@ if __name__ == '__main__': # sanity check assert __file__ == os.path.abspath(sys.argv[0]) - main_in_temp_cwd() + main() + + +if __name__ == '__main__': + _main() |