summaryrefslogtreecommitdiffstats
path: root/Lib/test/libregrtest
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/libregrtest')
-rw-r--r--Lib/test/libregrtest/__init__.py1
-rw-r--r--Lib/test/libregrtest/cmdline.py2
-rw-r--r--Lib/test/libregrtest/main.py547
-rw-r--r--Lib/test/libregrtest/refleak.py165
-rw-r--r--Lib/test/libregrtest/runtest.py271
-rw-r--r--Lib/test/libregrtest/save_env.py284
6 files changed, 1269 insertions, 1 deletions
diff --git a/Lib/test/libregrtest/__init__.py b/Lib/test/libregrtest/__init__.py
index 554aeed..a882ed2 100644
--- a/Lib/test/libregrtest/__init__.py
+++ b/Lib/test/libregrtest/__init__.py
@@ -1 +1,2 @@
from test.libregrtest.cmdline import _parse_args, RESOURCE_NAMES
+from test.libregrtest.main import main_in_temp_cwd
diff --git a/Lib/test/libregrtest/cmdline.py b/Lib/test/libregrtest/cmdline.py
index 9464996..19c52a2 100644
--- a/Lib/test/libregrtest/cmdline.py
+++ b/Lib/test/libregrtest/cmdline.py
@@ -1,9 +1,9 @@
import argparse
import faulthandler
import os
-
from test import support
+
USAGE = """\
python -m test [options] [test_name1 [test_name2 ...]]
python path/to/Lib/test/regrtest.py [options] [test_name1 [test_name2 ...]]
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
new file mode 100644
index 0000000..5d34cff
--- /dev/null
+++ b/Lib/test/libregrtest/main.py
@@ -0,0 +1,547 @@
+import faulthandler
+import json
+import os
+import re
+import sys
+import tempfile
+import sysconfig
+import signal
+import random
+import platform
+import traceback
+import unittest
+from test.libregrtest.runtest import (
+ findtests, runtest, run_test_in_subprocess,
+ STDTESTS, NOTTESTS,
+ PASSED, FAILED, ENV_CHANGED, SKIPPED,
+ RESOURCE_DENIED, INTERRUPTED, CHILD_ERROR)
+from test.libregrtest.refleak import warm_caches
+from test.libregrtest.cmdline import _parse_args
+from test import support
+try:
+ import threading
+except ImportError:
+ threading = None
+
+
+# Some times __path__ and __file__ are not absolute (e.g. while running from
+# Lib/) and, if we change the CWD to run the tests in a temporary dir, some
+# imports might fail. This affects only the modules imported before os.chdir().
+# These modules are searched first in sys.path[0] (so '' -- the CWD) and if
+# they are found in the CWD their __file__ and __path__ will be relative (this
+# happens before the chdir). All the modules imported after the chdir, are
+# not found in the CWD, and since the other paths in sys.path[1:] are absolute
+# (site.py absolutize them), the __file__ and __path__ will be absolute too.
+# Therefore it is necessary to absolutize manually the __file__ and __path__ of
+# the packages to prevent later imports to fail when the CWD is different.
+for module in sys.modules.values():
+ if hasattr(module, '__path__'):
+ module.__path__ = [os.path.abspath(path) for path in module.__path__]
+ if hasattr(module, '__file__'):
+ module.__file__ = os.path.abspath(module.__file__)
+
+
+# MacOSX (a.k.a. Darwin) has a default stack size that is too small
+# for deeply recursive regular expressions. We see this as crashes in
+# the Python test suite when running test_re.py and test_sre.py. The
+# fix is to set the stack limit to 2048.
+# This approach may also be useful for other Unixy platforms that
+# suffer from small default stack limits.
+if sys.platform == 'darwin':
+ try:
+ import resource
+ except ImportError:
+ pass
+ else:
+ soft, hard = resource.getrlimit(resource.RLIMIT_STACK)
+ newsoft = min(hard, max(soft, 1024*2048))
+ resource.setrlimit(resource.RLIMIT_STACK, (newsoft, hard))
+
+
+# When tests are run from the Python build directory, it is best practice
+# to keep the test files in a subfolder. This eases the cleanup of leftover
+# files using the "make distclean" command.
+if sysconfig.is_python_build():
+ TEMPDIR = os.path.join(sysconfig.get_config_var('srcdir'), 'build')
+else:
+ TEMPDIR = tempfile.gettempdir()
+TEMPDIR = os.path.abspath(TEMPDIR)
+
+
+def main(tests=None, **kwargs):
+ """Execute a test suite.
+
+ This also parses command-line options and modifies its behavior
+ accordingly.
+
+ tests -- a list of strings containing test names (optional)
+ testdir -- the directory in which to look for tests (optional)
+
+ Users other than the Python test suite will certainly want to
+ specify testdir; if it's omitted, the directory containing the
+ Python test suite is searched for.
+
+ If the tests argument is omitted, the tests listed on the
+ command-line will be used. If that's empty, too, then all *.py
+ files beginning with test_ will be used.
+
+ The other default arguments (verbose, quiet, exclude,
+ single, randomize, findleaks, use_resources, trace, coverdir,
+ print_slow, and random_seed) allow programmers calling main()
+ directly to set the values that would normally be set by flags
+ on the command line.
+ """
+ # Display the Python traceback on fatal errors (e.g. segfault)
+ faulthandler.enable(all_threads=True)
+
+ # Display the Python traceback on SIGALRM or SIGUSR1 signal
+ signals = []
+ if hasattr(signal, 'SIGALRM'):
+ signals.append(signal.SIGALRM)
+ if hasattr(signal, 'SIGUSR1'):
+ signals.append(signal.SIGUSR1)
+ for signum in signals:
+ faulthandler.register(signum, chain=True)
+
+ replace_stdout()
+
+ support.record_original_stdout(sys.stdout)
+
+ ns = _parse_args(sys.argv[1:], **kwargs)
+
+ if ns.huntrleaks:
+ # Avoid false positives due to various caches
+ # filling slowly with random data:
+ warm_caches()
+ if ns.memlimit is not None:
+ support.set_memlimit(ns.memlimit)
+ if ns.threshold is not None:
+ import gc
+ gc.set_threshold(ns.threshold)
+ if ns.nowindows:
+ import msvcrt
+ msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS|
+ msvcrt.SEM_NOALIGNMENTFAULTEXCEPT|
+ msvcrt.SEM_NOGPFAULTERRORBOX|
+ msvcrt.SEM_NOOPENFILEERRORBOX)
+ try:
+ msvcrt.CrtSetReportMode
+ except AttributeError:
+ # release build
+ pass
+ else:
+ for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
+ msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE)
+ msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR)
+ if ns.wait:
+ input("Press any key to continue...")
+
+ if ns.slaveargs is not None:
+ args, kwargs = json.loads(ns.slaveargs)
+ if kwargs.get('huntrleaks'):
+ unittest.BaseTestSuite._cleanup = False
+ try:
+ result = runtest(*args, **kwargs)
+ except KeyboardInterrupt:
+ result = INTERRUPTED, ''
+ except BaseException as e:
+ traceback.print_exc()
+ result = CHILD_ERROR, str(e)
+ sys.stdout.flush()
+ print() # Force a newline (just in case)
+ print(json.dumps(result))
+ sys.exit(0)
+
+ good = []
+ bad = []
+ skipped = []
+ resource_denieds = []
+ environment_changed = []
+ interrupted = False
+
+ if ns.findleaks:
+ try:
+ import gc
+ except ImportError:
+ print('No GC available, disabling findleaks.')
+ ns.findleaks = False
+ else:
+ # Uncomment the line below to report garbage that is not
+ # freeable by reference counting alone. By default only
+ # garbage that is not collectable by the GC is reported.
+ #gc.set_debug(gc.DEBUG_SAVEALL)
+ found_garbage = []
+
+ if ns.huntrleaks:
+ unittest.BaseTestSuite._cleanup = False
+
+ if ns.single:
+ filename = os.path.join(TEMPDIR, 'pynexttest')
+ try:
+ with open(filename, 'r') as fp:
+ next_test = fp.read().strip()
+ tests = [next_test]
+ except OSError:
+ pass
+
+ if ns.fromfile:
+ tests = []
+ with open(os.path.join(support.SAVEDCWD, ns.fromfile)) as fp:
+ count_pat = re.compile(r'\[\s*\d+/\s*\d+\]')
+ for line in fp:
+ line = count_pat.sub('', line)
+ guts = line.split() # assuming no test has whitespace in its name
+ if guts and not guts[0].startswith('#'):
+ tests.extend(guts)
+
+ # Strip .py extensions.
+ removepy(ns.args)
+ removepy(tests)
+
+ stdtests = STDTESTS[:]
+ nottests = NOTTESTS.copy()
+ if ns.exclude:
+ for arg in ns.args:
+ if arg in stdtests:
+ stdtests.remove(arg)
+ nottests.add(arg)
+ ns.args = []
+
+ # For a partial run, we do not need to clutter the output.
+ if ns.verbose or ns.header or not (ns.quiet or ns.single or tests or ns.args):
+ # Print basic platform information
+ print("==", platform.python_implementation(), *sys.version.split())
+ print("== ", platform.platform(aliased=True),
+ "%s-endian" % sys.byteorder)
+ print("== ", "hash algorithm:", sys.hash_info.algorithm,
+ "64bit" if sys.maxsize > 2**32 else "32bit")
+ print("== ", os.getcwd())
+ print("Testing with flags:", sys.flags)
+
+ # if testdir is set, then we are not running the python tests suite, so
+ # don't add default tests to be executed or skipped (pass empty values)
+ if ns.testdir:
+ alltests = findtests(ns.testdir, list(), set())
+ else:
+ alltests = findtests(ns.testdir, stdtests, nottests)
+
+ selected = tests or ns.args or alltests
+ if ns.single:
+ selected = selected[:1]
+ try:
+ next_single_test = alltests[alltests.index(selected[0])+1]
+ except IndexError:
+ next_single_test = None
+ # Remove all the selected tests that precede start if it's set.
+ if ns.start:
+ try:
+ del selected[:selected.index(ns.start)]
+ except ValueError:
+ print("Couldn't find starting test (%s), using all tests" % ns.start)
+ if ns.randomize:
+ if ns.random_seed is None:
+ ns.random_seed = random.randrange(10000000)
+ random.seed(ns.random_seed)
+ print("Using random seed", ns.random_seed)
+ random.shuffle(selected)
+ if ns.trace:
+ import trace, tempfile
+ tracer = trace.Trace(ignoredirs=[sys.base_prefix, sys.base_exec_prefix,
+ tempfile.gettempdir()],
+ trace=False, count=True)
+
+ test_times = []
+ support.verbose = ns.verbose # Tell tests to be moderately quiet
+ support.use_resources = ns.use_resources
+ save_modules = sys.modules.keys()
+
+ def accumulate_result(test, result):
+ ok, test_time = result
+ test_times.append((test_time, test))
+ if ok == PASSED:
+ good.append(test)
+ elif ok == FAILED:
+ bad.append(test)
+ elif ok == ENV_CHANGED:
+ environment_changed.append(test)
+ elif ok == SKIPPED:
+ skipped.append(test)
+ elif ok == RESOURCE_DENIED:
+ skipped.append(test)
+ resource_denieds.append(test)
+
+ if ns.forever:
+ def test_forever(tests=list(selected)):
+ while True:
+ for test in tests:
+ yield test
+ if bad:
+ return
+ tests = test_forever()
+ test_count = ''
+ test_count_width = 3
+ else:
+ tests = iter(selected)
+ test_count = '/{}'.format(len(selected))
+ test_count_width = len(test_count) - 1
+
+ if ns.use_mp:
+ try:
+ from threading import Thread
+ except ImportError:
+ print("Multiprocess option requires thread support")
+ sys.exit(2)
+ from queue import Queue
+ debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$")
+ output = Queue()
+ pending = MultiprocessTests(tests)
+ def work():
+ # A worker thread.
+ try:
+ while True:
+ try:
+ test = next(pending)
+ except StopIteration:
+ output.put((None, None, None, None))
+ return
+ retcode, stdout, stderr = run_test_in_subprocess(test, ns)
+ # Strip last refcount output line if it exists, since it
+ # comes from the shutdown of the interpreter in the subcommand.
+ stderr = debug_output_pat.sub("", stderr)
+ stdout, _, result = stdout.strip().rpartition("\n")
+ if retcode != 0:
+ result = (CHILD_ERROR, "Exit code %s" % retcode)
+ output.put((test, stdout.rstrip(), stderr.rstrip(), result))
+ return
+ if not result:
+ output.put((None, None, None, None))
+ return
+ result = json.loads(result)
+ output.put((test, stdout.rstrip(), stderr.rstrip(), result))
+ except BaseException:
+ output.put((None, None, None, None))
+ raise
+ workers = [Thread(target=work) for i in range(ns.use_mp)]
+ for worker in workers:
+ worker.start()
+ finished = 0
+ test_index = 1
+ try:
+ while finished < ns.use_mp:
+ test, stdout, stderr, result = output.get()
+ if test is None:
+ finished += 1
+ continue
+ accumulate_result(test, result)
+ if not ns.quiet:
+ fmt = "[{1:{0}}{2}/{3}] {4}" if bad else "[{1:{0}}{2}] {4}"
+ print(fmt.format(
+ test_count_width, test_index, test_count,
+ len(bad), test))
+ if stdout:
+ print(stdout)
+ if stderr:
+ print(stderr, file=sys.stderr)
+ sys.stdout.flush()
+ sys.stderr.flush()
+ if result[0] == INTERRUPTED:
+ raise KeyboardInterrupt
+ if result[0] == CHILD_ERROR:
+ raise Exception("Child error on {}: {}".format(test, result[1]))
+ test_index += 1
+ except KeyboardInterrupt:
+ interrupted = True
+ pending.interrupted = True
+ for worker in workers:
+ worker.join()
+ else:
+ for test_index, test in enumerate(tests, 1):
+ if not ns.quiet:
+ fmt = "[{1:{0}}{2}/{3}] {4}" if bad else "[{1:{0}}{2}] {4}"
+ print(fmt.format(
+ test_count_width, test_index, test_count, len(bad), test))
+ sys.stdout.flush()
+ if ns.trace:
+ # If we're tracing code coverage, then we don't exit with status
+ # if on a false return value from main.
+ tracer.runctx('runtest(test, ns.verbose, ns.quiet, timeout=ns.timeout)',
+ globals=globals(), locals=vars())
+ else:
+ try:
+ result = runtest(test, ns.verbose, ns.quiet,
+ ns.huntrleaks,
+ output_on_failure=ns.verbose3,
+ timeout=ns.timeout, failfast=ns.failfast,
+ match_tests=ns.match_tests)
+ accumulate_result(test, result)
+ except KeyboardInterrupt:
+ interrupted = True
+ break
+ if ns.findleaks:
+ gc.collect()
+ if gc.garbage:
+ print("Warning: test created", len(gc.garbage), end=' ')
+ print("uncollectable object(s).")
+ # move the uncollectable objects somewhere so we don't see
+ # them again
+ found_garbage.extend(gc.garbage)
+ del gc.garbage[:]
+ # Unload the newly imported modules (best effort finalization)
+ for module in sys.modules.keys():
+ if module not in save_modules and module.startswith("test."):
+ support.unload(module)
+
+ if interrupted:
+ # print a newline after ^C
+ print()
+ print("Test suite interrupted by signal SIGINT.")
+ omitted = set(selected) - set(good) - set(bad) - set(skipped)
+ print(count(len(omitted), "test"), "omitted:")
+ printlist(omitted)
+ if good and not ns.quiet:
+ if not bad and not skipped and not interrupted and len(good) > 1:
+ print("All", end=' ')
+ print(count(len(good), "test"), "OK.")
+ if ns.print_slow:
+ test_times.sort(reverse=True)
+ print("10 slowest tests:")
+ for time, test in test_times[:10]:
+ print("%s: %.1fs" % (test, time))
+ if bad:
+ print(count(len(bad), "test"), "failed:")
+ printlist(bad)
+ if environment_changed:
+ print("{} altered the execution environment:".format(
+ count(len(environment_changed), "test")))
+ printlist(environment_changed)
+ if skipped and not ns.quiet:
+ print(count(len(skipped), "test"), "skipped:")
+ printlist(skipped)
+
+ if ns.verbose2 and bad:
+ print("Re-running failed tests in verbose mode")
+ for test in bad[:]:
+ print("Re-running test %r in verbose mode" % test)
+ sys.stdout.flush()
+ try:
+ ns.verbose = True
+ ok = runtest(test, True, ns.quiet, ns.huntrleaks,
+ timeout=ns.timeout)
+ except KeyboardInterrupt:
+ # print a newline separate from the ^C
+ print()
+ break
+ else:
+ if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
+ bad.remove(test)
+ else:
+ if bad:
+ print(count(len(bad), 'test'), "failed again:")
+ printlist(bad)
+
+ if ns.single:
+ if next_single_test:
+ with open(filename, 'w') as fp:
+ fp.write(next_single_test + '\n')
+ else:
+ os.unlink(filename)
+
+ if ns.trace:
+ r = tracer.results()
+ r.write_results(show_missing=True, summary=True, coverdir=ns.coverdir)
+
+ if ns.runleaks:
+ os.system("leaks %d" % os.getpid())
+
+ sys.exit(len(bad) > 0 or interrupted)
+
+
+# We do not use a generator so multiple threads can call next().
+class MultiprocessTests(object):
+
+ """A thread-safe iterator over tests for multiprocess mode."""
+
+ def __init__(self, tests):
+ self.interrupted = False
+ self.lock = threading.Lock()
+ self.tests = tests
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ with self.lock:
+ if self.interrupted:
+ raise StopIteration('tests interrupted')
+ return next(self.tests)
+
+
+def replace_stdout():
+ """Set stdout encoder error handler to backslashreplace (as stderr error
+ handler) to avoid UnicodeEncodeError when printing a traceback"""
+ import atexit
+
+ stdout = sys.stdout
+ sys.stdout = open(stdout.fileno(), 'w',
+ encoding=stdout.encoding,
+ errors="backslashreplace",
+ closefd=False,
+ newline='\n')
+
+ def restore_stdout():
+ sys.stdout.close()
+ sys.stdout = stdout
+ atexit.register(restore_stdout)
+
+
+def removepy(names):
+ if not names:
+ return
+ for idx, name in enumerate(names):
+ basename, ext = os.path.splitext(name)
+ if ext == '.py':
+ names[idx] = basename
+
+
+def count(n, word):
+ if n == 1:
+ return "%d %s" % (n, word)
+ else:
+ return "%d %ss" % (n, word)
+
+
+def printlist(x, width=70, indent=4):
+ """Print the elements of iterable x to stdout.
+
+ Optional arg width (default 70) is the maximum line length.
+ Optional arg indent (default 4) is the number of blanks with which to
+ begin each line.
+ """
+
+ from textwrap import fill
+ blanks = ' ' * indent
+ # Print the sorted list: 'x' may be a '--random' list or a set()
+ print(fill(' '.join(str(elt) for elt in sorted(x)), width,
+ initial_indent=blanks, subsequent_indent=blanks))
+
+
+def main_in_temp_cwd():
+ """Run main() in a temporary working directory."""
+ if sysconfig.is_python_build():
+ try:
+ os.mkdir(TEMPDIR)
+ except FileExistsError:
+ pass
+
+ # Define a writable temp dir that will be used as cwd while running
+ # the tests. The name of the dir includes the pid to allow parallel
+ # testing (see the -j option).
+ test_cwd = 'test_python_{}'.format(os.getpid())
+ test_cwd = os.path.join(TEMPDIR, test_cwd)
+
+ # Run the tests in a context manager that temporarily changes the CWD to a
+ # temporary and writable directory. If it's not possible to create or
+ # change the CWD, the original CWD will be used. The original CWD is
+ # available from support.SAVEDCWD.
+ with support.temp_cwd(test_cwd, quiet=True):
+ main()
diff --git a/Lib/test/libregrtest/refleak.py b/Lib/test/libregrtest/refleak.py
new file mode 100644
index 0000000..dd0d05d
--- /dev/null
+++ b/Lib/test/libregrtest/refleak.py
@@ -0,0 +1,165 @@
+import os
+import re
+import sys
+import warnings
+from inspect import isabstract
+from test import support
+
+
+def dash_R(the_module, test, indirect_test, huntrleaks):
+ """Run a test multiple times, looking for reference leaks.
+
+ Returns:
+ False if the test didn't leak references; True if we detected refleaks.
+ """
+ # This code is hackish and inelegant, but it seems to do the job.
+ import copyreg
+ import collections.abc
+
+ if not hasattr(sys, 'gettotalrefcount'):
+ raise Exception("Tracking reference leaks requires a debug build "
+ "of Python")
+
+ # Save current values for dash_R_cleanup() to restore.
+ fs = warnings.filters[:]
+ ps = copyreg.dispatch_table.copy()
+ pic = sys.path_importer_cache.copy()
+ try:
+ import zipimport
+ except ImportError:
+ zdc = None # Run unmodified on platforms without zipimport support
+ else:
+ zdc = zipimport._zip_directory_cache.copy()
+ abcs = {}
+ for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]:
+ if not isabstract(abc):
+ continue
+ for obj in abc.__subclasses__() + [abc]:
+ abcs[obj] = obj._abc_registry.copy()
+
+ nwarmup, ntracked, fname = huntrleaks
+ fname = os.path.join(support.SAVEDCWD, fname)
+ repcount = nwarmup + ntracked
+ rc_deltas = [0] * repcount
+ alloc_deltas = [0] * repcount
+
+ print("beginning", repcount, "repetitions", file=sys.stderr)
+ print(("1234567890"*(repcount//10 + 1))[:repcount], file=sys.stderr)
+ sys.stderr.flush()
+ for i in range(repcount):
+ indirect_test()
+ alloc_after, rc_after = dash_R_cleanup(fs, ps, pic, zdc, abcs)
+ sys.stderr.write('.')
+ sys.stderr.flush()
+ if i >= nwarmup:
+ rc_deltas[i] = rc_after - rc_before
+ alloc_deltas[i] = alloc_after - alloc_before
+ alloc_before, rc_before = alloc_after, rc_after
+ print(file=sys.stderr)
+ # These checkers return False on success, True on failure
+ def check_rc_deltas(deltas):
+ return any(deltas)
+ def check_alloc_deltas(deltas):
+ # At least 1/3rd of 0s
+ if 3 * deltas.count(0) < len(deltas):
+ return True
+ # Nothing else than 1s, 0s and -1s
+ if not set(deltas) <= {1,0,-1}:
+ return True
+ return False
+ failed = False
+ for deltas, item_name, checker in [
+ (rc_deltas, 'references', check_rc_deltas),
+ (alloc_deltas, 'memory blocks', check_alloc_deltas)]:
+ if checker(deltas):
+ msg = '%s leaked %s %s, sum=%s' % (
+ test, deltas[nwarmup:], item_name, sum(deltas))
+ print(msg, file=sys.stderr)
+ sys.stderr.flush()
+ with open(fname, "a") as refrep:
+ print(msg, file=refrep)
+ refrep.flush()
+ failed = True
+ return failed
+
+
+def dash_R_cleanup(fs, ps, pic, zdc, abcs):
+ import gc, copyreg
+ import _strptime, linecache
+ import urllib.parse, urllib.request, mimetypes, doctest
+ import struct, filecmp, collections.abc
+ from distutils.dir_util import _path_created
+ from weakref import WeakSet
+
+ # Clear the warnings registry, so they can be displayed again
+ for mod in sys.modules.values():
+ if hasattr(mod, '__warningregistry__'):
+ del mod.__warningregistry__
+
+ # Restore some original values.
+ warnings.filters[:] = fs
+ copyreg.dispatch_table.clear()
+ copyreg.dispatch_table.update(ps)
+ sys.path_importer_cache.clear()
+ sys.path_importer_cache.update(pic)
+ try:
+ import zipimport
+ except ImportError:
+ pass # Run unmodified on platforms without zipimport support
+ else:
+ zipimport._zip_directory_cache.clear()
+ zipimport._zip_directory_cache.update(zdc)
+
+ # clear type cache
+ sys._clear_type_cache()
+
+ # Clear ABC registries, restoring previously saved ABC registries.
+ for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]:
+ if not isabstract(abc):
+ continue
+ for obj in abc.__subclasses__() + [abc]:
+ obj._abc_registry = abcs.get(obj, WeakSet()).copy()
+ obj._abc_cache.clear()
+ obj._abc_negative_cache.clear()
+
+ # Flush standard output, so that buffered data is sent to the OS and
+ # associated Python objects are reclaimed.
+ for stream in (sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__):
+ if stream is not None:
+ stream.flush()
+
+ # Clear assorted module caches.
+ _path_created.clear()
+ re.purge()
+ _strptime._regex_cache.clear()
+ urllib.parse.clear_cache()
+ urllib.request.urlcleanup()
+ linecache.clearcache()
+ mimetypes._default_mime_types()
+ filecmp._cache.clear()
+ struct._clearcache()
+ doctest.master = None
+ try:
+ import ctypes
+ except ImportError:
+ # Don't worry about resetting the cache if ctypes is not supported
+ pass
+ else:
+ ctypes._reset_cache()
+
+ # Collect cyclic trash and read memory statistics immediately after.
+ func1 = sys.getallocatedblocks
+ func2 = sys.gettotalrefcount
+ gc.collect()
+ return func1(), func2()
+
+
+def warm_caches():
+ # char cache
+ s = bytes(range(256))
+ for i in range(256):
+ s[i:i+1]
+ # unicode cache
+ x = [chr(i) for i in range(256)]
+ # int cache
+ x = list(range(-5, 257))
diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py
new file mode 100644
index 0000000..d8c0eb2
--- /dev/null
+++ b/Lib/test/libregrtest/runtest.py
@@ -0,0 +1,271 @@
+import faulthandler
+import importlib
+import io
+import json
+import os
+import sys
+import time
+import traceback
+import unittest
+from test import support
+from test.libregrtest.refleak import dash_R
+from test.libregrtest.save_env import saved_test_environment
+
+
+# Test result constants.
+PASSED = 1
+FAILED = 0
+ENV_CHANGED = -1
+SKIPPED = -2
+RESOURCE_DENIED = -3
+INTERRUPTED = -4
+CHILD_ERROR = -5 # error in a child process
+
+
+def run_test_in_subprocess(testname, ns):
+ """Run the given test in a subprocess with --slaveargs.
+
+ ns is the option Namespace parsed from command-line arguments. regrtest
+ is invoked in a subprocess with the --slaveargs argument; when the
+ subprocess exits, its return code, stdout and stderr are returned as a
+ 3-tuple.
+ """
+ from subprocess import Popen, PIPE
+ base_cmd = ([sys.executable] + support.args_from_interpreter_flags() +
+ ['-X', 'faulthandler', '-m', 'test.regrtest'])
+
+ slaveargs = (
+ (testname, ns.verbose, ns.quiet),
+ dict(huntrleaks=ns.huntrleaks,
+ use_resources=ns.use_resources,
+ output_on_failure=ns.verbose3,
+ timeout=ns.timeout, failfast=ns.failfast,
+ match_tests=ns.match_tests))
+ # Running the child from the same working directory as regrtest's original
+ # invocation ensures that TEMPDIR for the child is the same when
+ # sysconfig.is_python_build() is true. See issue 15300.
+ popen = Popen(base_cmd + ['--slaveargs', json.dumps(slaveargs)],
+ stdout=PIPE, stderr=PIPE,
+ universal_newlines=True,
+ close_fds=(os.name != 'nt'),
+ cwd=support.SAVEDCWD)
+ stdout, stderr = popen.communicate()
+ retcode = popen.wait()
+ return retcode, stdout, stderr
+
+
+# small set of tests to determine if we have a basically functioning interpreter
+# (i.e. if any of these fail, then anything else is likely to follow)
+STDTESTS = [
+ 'test_grammar',
+ 'test_opcodes',
+ 'test_dict',
+ 'test_builtin',
+ 'test_exceptions',
+ 'test_types',
+ 'test_unittest',
+ 'test_doctest',
+ 'test_doctest2',
+ 'test_support'
+]
+
+# set of tests that we don't want to be executed when using regrtest
+NOTTESTS = set()
+
+
+def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
+ """Return a list of all applicable test modules."""
+ testdir = findtestdir(testdir)
+ names = os.listdir(testdir)
+ tests = []
+ others = set(stdtests) | nottests
+ for name in names:
+ mod, ext = os.path.splitext(name)
+ if mod[:5] == "test_" and ext in (".py", "") and mod not in others:
+ tests.append(mod)
+ return stdtests + sorted(tests)
+
+
+def runtest(test, verbose, quiet,
+ huntrleaks=False, use_resources=None,
+ output_on_failure=False, failfast=False, match_tests=None,
+ timeout=None):
+ """Run a single test.
+
+ test -- the name of the test
+ verbose -- if true, print more messages
+ quiet -- if true, don't print 'skipped' messages (probably redundant)
+ huntrleaks -- run multiple times to test for leaks; requires a debug
+ build; a triple corresponding to -R's three arguments
+ use_resources -- list of extra resources to use
+ output_on_failure -- if true, display test output on failure
+ timeout -- dump the traceback and exit if a test takes more than
+ timeout seconds
+ failfast, match_tests -- See regrtest command-line flags for these.
+
+ Returns the tuple result, test_time, where result is one of the constants:
+ INTERRUPTED KeyboardInterrupt when run under -j
+ RESOURCE_DENIED test skipped because resource denied
+ SKIPPED test skipped for some other reason
+ ENV_CHANGED test failed because it changed the execution environment
+ FAILED test failed
+ PASSED test passed
+ """
+
+ if use_resources is not None:
+ support.use_resources = use_resources
+ use_timeout = (timeout is not None)
+ if use_timeout:
+ faulthandler.dump_traceback_later(timeout, exit=True)
+ try:
+ support.match_tests = match_tests
+ if failfast:
+ support.failfast = True
+ if output_on_failure:
+ support.verbose = True
+
+ # Reuse the same instance to all calls to runtest(). Some
+ # tests keep a reference to sys.stdout or sys.stderr
+ # (eg. test_argparse).
+ if runtest.stringio is None:
+ stream = io.StringIO()
+ runtest.stringio = stream
+ else:
+ stream = runtest.stringio
+ stream.seek(0)
+ stream.truncate()
+
+ orig_stdout = sys.stdout
+ orig_stderr = sys.stderr
+ try:
+ sys.stdout = stream
+ sys.stderr = stream
+ result = runtest_inner(test, verbose, quiet, huntrleaks,
+ display_failure=False)
+ if result[0] == FAILED:
+ output = stream.getvalue()
+ orig_stderr.write(output)
+ orig_stderr.flush()
+ finally:
+ sys.stdout = orig_stdout
+ sys.stderr = orig_stderr
+ else:
+ support.verbose = verbose # Tell tests to be moderately quiet
+ result = runtest_inner(test, verbose, quiet, huntrleaks,
+ display_failure=not verbose)
+ return result
+ finally:
+ if use_timeout:
+ faulthandler.cancel_dump_traceback_later()
+ cleanup_test_droppings(test, verbose)
+runtest.stringio = None
+
+
+def runtest_inner(test, verbose, quiet,
+ huntrleaks=False, display_failure=True):
+ support.unload(test)
+
+ test_time = 0.0
+ refleak = False # True if the test leaked references.
+ try:
+ if test.startswith('test.'):
+ abstest = test
+ else:
+ # Always import it from the test package
+ abstest = 'test.' + test
+ with saved_test_environment(test, verbose, quiet) as environment:
+ start_time = time.time()
+ the_module = importlib.import_module(abstest)
+ # If the test has a test_main, that will run the appropriate
+ # tests. If not, use normal unittest test loading.
+ test_runner = getattr(the_module, "test_main", None)
+ if test_runner is None:
+ def test_runner():
+ loader = unittest.TestLoader()
+ tests = loader.loadTestsFromModule(the_module)
+ for error in loader.errors:
+ print(error, file=sys.stderr)
+ if loader.errors:
+ raise Exception("errors while loading tests")
+ support.run_unittest(tests)
+ test_runner()
+ if huntrleaks:
+ refleak = dash_R(the_module, test, test_runner, huntrleaks)
+ test_time = time.time() - start_time
+ except support.ResourceDenied as msg:
+ if not quiet:
+ print(test, "skipped --", msg)
+ sys.stdout.flush()
+ return RESOURCE_DENIED, test_time
+ except unittest.SkipTest as msg:
+ if not quiet:
+ print(test, "skipped --", msg)
+ sys.stdout.flush()
+ return SKIPPED, test_time
+ except KeyboardInterrupt:
+ raise
+ except support.TestFailed as msg:
+ if display_failure:
+ print("test", test, "failed --", msg, file=sys.stderr)
+ else:
+ print("test", test, "failed", file=sys.stderr)
+ sys.stderr.flush()
+ return FAILED, test_time
+ except:
+ msg = traceback.format_exc()
+ print("test", test, "crashed --", msg, file=sys.stderr)
+ sys.stderr.flush()
+ return FAILED, test_time
+ else:
+ if refleak:
+ return FAILED, test_time
+ if environment.changed:
+ return ENV_CHANGED, test_time
+ return PASSED, test_time
+
+
+def cleanup_test_droppings(testname, verbose):
+ import shutil
+ import stat
+ import gc
+
+ # First kill any dangling references to open files etc.
+ # This can also issue some ResourceWarnings which would otherwise get
+ # triggered during the following test run, and possibly produce failures.
+ gc.collect()
+
+ # Try to clean up junk commonly left behind. While tests shouldn't leave
+ # any files or directories behind, when a test fails that can be tedious
+ # for it to arrange. The consequences can be especially nasty on Windows,
+ # since if a test leaves a file open, it cannot be deleted by name (while
+ # there's nothing we can do about that here either, we can display the
+ # name of the offending test, which is a real help).
+ for name in (support.TESTFN,
+ "db_home",
+ ):
+ if not os.path.exists(name):
+ continue
+
+ if os.path.isdir(name):
+ kind, nuker = "directory", shutil.rmtree
+ elif os.path.isfile(name):
+ kind, nuker = "file", os.unlink
+ else:
+ raise SystemError("os.path says %r exists but is neither "
+ "directory nor file" % name)
+
+ if verbose:
+ print("%r left behind %s %r" % (testname, kind, name))
+ try:
+ # if we have chmod, fix possible permissions problems
+ # that might prevent cleanup
+ if (hasattr(os, 'chmod')):
+ os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+ nuker(name)
+ except Exception as msg:
+ print(("%r left behind %s %r and it couldn't be "
+ "removed: %s" % (testname, kind, name, msg)), file=sys.stderr)
+
+
+def findtestdir(path=None):
+ return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
diff --git a/Lib/test/libregrtest/save_env.py b/Lib/test/libregrtest/save_env.py
new file mode 100644
index 0000000..4f6a1aa
--- /dev/null
+++ b/Lib/test/libregrtest/save_env.py
@@ -0,0 +1,284 @@
+import builtins
+import locale
+import logging
+import os
+import shutil
+import sys
+import sysconfig
+import warnings
+from test import support
+try:
+ import threading
+except ImportError:
+ threading = None
+try:
+ import _multiprocessing, multiprocessing.process
+except ImportError:
+ multiprocessing = None
+
+
+# Unit tests are supposed to leave the execution environment unchanged
+# once they complete. But sometimes tests have bugs, especially when
+# tests fail, and the changes to environment go on to mess up other
+# tests. This can cause issues with buildbot stability, since tests
+# are run in random order and so problems may appear to come and go.
+# There are a few things we can save and restore to mitigate this, and
+# the following context manager handles this task.
+
+class saved_test_environment:
+ """Save bits of the test environment and restore them at block exit.
+
+ with saved_test_environment(testname, verbose, quiet):
+ #stuff
+
+ Unless quiet is True, a warning is printed to stderr if any of
+ the saved items was changed by the test. The attribute 'changed'
+ is initially False, but is set to True if a change is detected.
+
+ If verbose is more than 1, the before and after state of changed
+ items is also printed.
+ """
+
+ changed = False
+
+ def __init__(self, testname, verbose=0, quiet=False):
+ self.testname = testname
+ self.verbose = verbose
+ self.quiet = quiet
+
+ # To add things to save and restore, add a name XXX to the resources list
+ # and add corresponding get_XXX/restore_XXX functions. get_XXX should
+ # return the value to be saved and compared against a second call to the
+ # get function when test execution completes. restore_XXX should accept
+ # the saved value and restore the resource using it. It will be called if
+ # and only if a change in the value is detected.
+ #
+ # Note: XXX will have any '.' replaced with '_' characters when determining
+ # the corresponding method names.
+
+ resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr',
+ 'os.environ', 'sys.path', 'sys.path_hooks', '__import__',
+ 'warnings.filters', 'asyncore.socket_map',
+ 'logging._handlers', 'logging._handlerList', 'sys.gettrace',
+ 'sys.warnoptions',
+ # multiprocessing.process._cleanup() may release ref
+ # to a thread, so check processes first.
+ 'multiprocessing.process._dangling', 'threading._dangling',
+ 'sysconfig._CONFIG_VARS', 'sysconfig._INSTALL_SCHEMES',
+ 'files', 'locale', 'warnings.showwarning',
+ )
+
+ def get_sys_argv(self):
+ return id(sys.argv), sys.argv, sys.argv[:]
+ def restore_sys_argv(self, saved_argv):
+ sys.argv = saved_argv[1]
+ sys.argv[:] = saved_argv[2]
+
+ def get_cwd(self):
+ return os.getcwd()
+ def restore_cwd(self, saved_cwd):
+ os.chdir(saved_cwd)
+
+ def get_sys_stdout(self):
+ return sys.stdout
+ def restore_sys_stdout(self, saved_stdout):
+ sys.stdout = saved_stdout
+
+ def get_sys_stderr(self):
+ return sys.stderr
+ def restore_sys_stderr(self, saved_stderr):
+ sys.stderr = saved_stderr
+
+ def get_sys_stdin(self):
+ return sys.stdin
+ def restore_sys_stdin(self, saved_stdin):
+ sys.stdin = saved_stdin
+
+ def get_os_environ(self):
+ return id(os.environ), os.environ, dict(os.environ)
+ def restore_os_environ(self, saved_environ):
+ os.environ = saved_environ[1]
+ os.environ.clear()
+ os.environ.update(saved_environ[2])
+
+ def get_sys_path(self):
+ return id(sys.path), sys.path, sys.path[:]
+ def restore_sys_path(self, saved_path):
+ sys.path = saved_path[1]
+ sys.path[:] = saved_path[2]
+
+ def get_sys_path_hooks(self):
+ return id(sys.path_hooks), sys.path_hooks, sys.path_hooks[:]
+ def restore_sys_path_hooks(self, saved_hooks):
+ sys.path_hooks = saved_hooks[1]
+ sys.path_hooks[:] = saved_hooks[2]
+
+ def get_sys_gettrace(self):
+ return sys.gettrace()
+ def restore_sys_gettrace(self, trace_fxn):
+ sys.settrace(trace_fxn)
+
+ def get___import__(self):
+ return builtins.__import__
+ def restore___import__(self, import_):
+ builtins.__import__ = import_
+
+ def get_warnings_filters(self):
+ return id(warnings.filters), warnings.filters, warnings.filters[:]
+ def restore_warnings_filters(self, saved_filters):
+ warnings.filters = saved_filters[1]
+ warnings.filters[:] = saved_filters[2]
+
+ def get_asyncore_socket_map(self):
+ asyncore = sys.modules.get('asyncore')
+ # XXX Making a copy keeps objects alive until __exit__ gets called.
+ return asyncore and asyncore.socket_map.copy() or {}
+ def restore_asyncore_socket_map(self, saved_map):
+ asyncore = sys.modules.get('asyncore')
+ if asyncore is not None:
+ asyncore.close_all(ignore_all=True)
+ asyncore.socket_map.update(saved_map)
+
+ def get_shutil_archive_formats(self):
+ # we could call get_archives_formats() but that only returns the
+ # registry keys; we want to check the values too (the functions that
+ # are registered)
+ return shutil._ARCHIVE_FORMATS, shutil._ARCHIVE_FORMATS.copy()
+ def restore_shutil_archive_formats(self, saved):
+ shutil._ARCHIVE_FORMATS = saved[0]
+ shutil._ARCHIVE_FORMATS.clear()
+ shutil._ARCHIVE_FORMATS.update(saved[1])
+
+ def get_shutil_unpack_formats(self):
+ return shutil._UNPACK_FORMATS, shutil._UNPACK_FORMATS.copy()
+ def restore_shutil_unpack_formats(self, saved):
+ shutil._UNPACK_FORMATS = saved[0]
+ shutil._UNPACK_FORMATS.clear()
+ shutil._UNPACK_FORMATS.update(saved[1])
+
+ def get_logging__handlers(self):
+ # _handlers is a WeakValueDictionary
+ return id(logging._handlers), logging._handlers, logging._handlers.copy()
+ def restore_logging__handlers(self, saved_handlers):
+ # Can't easily revert the logging state
+ pass
+
+ def get_logging__handlerList(self):
+ # _handlerList is a list of weakrefs to handlers
+ return id(logging._handlerList), logging._handlerList, logging._handlerList[:]
+ def restore_logging__handlerList(self, saved_handlerList):
+ # Can't easily revert the logging state
+ pass
+
+ def get_sys_warnoptions(self):
+ return id(sys.warnoptions), sys.warnoptions, sys.warnoptions[:]
+ def restore_sys_warnoptions(self, saved_options):
+ sys.warnoptions = saved_options[1]
+ sys.warnoptions[:] = saved_options[2]
+
+ # Controlling dangling references to Thread objects can make it easier
+ # to track reference leaks.
+ def get_threading__dangling(self):
+ if not threading:
+ return None
+ # This copies the weakrefs without making any strong reference
+ return threading._dangling.copy()
+ def restore_threading__dangling(self, saved):
+ if not threading:
+ return
+ threading._dangling.clear()
+ threading._dangling.update(saved)
+
+ # Same for Process objects
+ def get_multiprocessing_process__dangling(self):
+ if not multiprocessing:
+ return None
+ # Unjoined process objects can survive after process exits
+ multiprocessing.process._cleanup()
+ # This copies the weakrefs without making any strong reference
+ return multiprocessing.process._dangling.copy()
+ def restore_multiprocessing_process__dangling(self, saved):
+ if not multiprocessing:
+ return
+ multiprocessing.process._dangling.clear()
+ multiprocessing.process._dangling.update(saved)
+
+ def get_sysconfig__CONFIG_VARS(self):
+ # make sure the dict is initialized
+ sysconfig.get_config_var('prefix')
+ return (id(sysconfig._CONFIG_VARS), sysconfig._CONFIG_VARS,
+ dict(sysconfig._CONFIG_VARS))
+ def restore_sysconfig__CONFIG_VARS(self, saved):
+ sysconfig._CONFIG_VARS = saved[1]
+ sysconfig._CONFIG_VARS.clear()
+ sysconfig._CONFIG_VARS.update(saved[2])
+
+ def get_sysconfig__INSTALL_SCHEMES(self):
+ return (id(sysconfig._INSTALL_SCHEMES), sysconfig._INSTALL_SCHEMES,
+ sysconfig._INSTALL_SCHEMES.copy())
+ def restore_sysconfig__INSTALL_SCHEMES(self, saved):
+ sysconfig._INSTALL_SCHEMES = saved[1]
+ sysconfig._INSTALL_SCHEMES.clear()
+ sysconfig._INSTALL_SCHEMES.update(saved[2])
+
+ def get_files(self):
+ return sorted(fn + ('/' if os.path.isdir(fn) else '')
+ for fn in os.listdir())
+ def restore_files(self, saved_value):
+ fn = support.TESTFN
+ if fn not in saved_value and (fn + '/') not in saved_value:
+ if os.path.isfile(fn):
+ support.unlink(fn)
+ elif os.path.isdir(fn):
+ support.rmtree(fn)
+
+ _lc = [getattr(locale, lc) for lc in dir(locale)
+ if lc.startswith('LC_')]
+ def get_locale(self):
+ pairings = []
+ for lc in self._lc:
+ try:
+ pairings.append((lc, locale.setlocale(lc, None)))
+ except (TypeError, ValueError):
+ continue
+ return pairings
+ def restore_locale(self, saved):
+ for lc, setting in saved:
+ locale.setlocale(lc, setting)
+
+ def get_warnings_showwarning(self):
+ return warnings.showwarning
+ def restore_warnings_showwarning(self, fxn):
+ warnings.showwarning = fxn
+
+ def resource_info(self):
+ for name in self.resources:
+ method_suffix = name.replace('.', '_')
+ get_name = 'get_' + method_suffix
+ restore_name = 'restore_' + method_suffix
+ yield name, getattr(self, get_name), getattr(self, restore_name)
+
+ def __enter__(self):
+ self.saved_values = dict((name, get()) for name, get, restore
+ in self.resource_info())
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ saved_values = self.saved_values
+ del self.saved_values
+ for name, get, restore in self.resource_info():
+ current = get()
+ original = saved_values.pop(name)
+ # Check for changes to the resource's value
+ if current != original:
+ self.changed = True
+ restore(original)
+ if not self.quiet:
+ print("Warning -- {} was modified by {}".format(
+ name, self.testname),
+ file=sys.stderr)
+ if self.verbose > 1:
+ print(" Before: {}\n After: {} ".format(
+ original, current),
+ file=sys.stderr)
+ return False