summaryrefslogtreecommitdiffstats
path: root/Lib/test/libregrtest/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/libregrtest/main.py')
-rw-r--r--Lib/test/libregrtest/main.py547
1 files changed, 547 insertions, 0 deletions
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
new file mode 100644
index 0000000..5d34cff
--- /dev/null
+++ b/Lib/test/libregrtest/main.py
@@ -0,0 +1,547 @@
+import faulthandler
+import json
+import os
+import re
+import sys
+import tempfile
+import sysconfig
+import signal
+import random
+import platform
+import traceback
+import unittest
+from test.libregrtest.runtest import (
+ findtests, runtest, run_test_in_subprocess,
+ STDTESTS, NOTTESTS,
+ PASSED, FAILED, ENV_CHANGED, SKIPPED,
+ RESOURCE_DENIED, INTERRUPTED, CHILD_ERROR)
+from test.libregrtest.refleak import warm_caches
+from test.libregrtest.cmdline import _parse_args
+from test import support
+try:
+ import threading
+except ImportError:
+ threading = None
+
+
+# Some times __path__ and __file__ are not absolute (e.g. while running from
+# Lib/) and, if we change the CWD to run the tests in a temporary dir, some
+# imports might fail. This affects only the modules imported before os.chdir().
+# These modules are searched first in sys.path[0] (so '' -- the CWD) and if
+# they are found in the CWD their __file__ and __path__ will be relative (this
+# happens before the chdir). All the modules imported after the chdir, are
+# not found in the CWD, and since the other paths in sys.path[1:] are absolute
+# (site.py absolutize them), the __file__ and __path__ will be absolute too.
+# Therefore it is necessary to absolutize manually the __file__ and __path__ of
+# the packages to prevent later imports to fail when the CWD is different.
+for module in sys.modules.values():
+ if hasattr(module, '__path__'):
+ module.__path__ = [os.path.abspath(path) for path in module.__path__]
+ if hasattr(module, '__file__'):
+ module.__file__ = os.path.abspath(module.__file__)
+
+
+# MacOSX (a.k.a. Darwin) has a default stack size that is too small
+# for deeply recursive regular expressions. We see this as crashes in
+# the Python test suite when running test_re.py and test_sre.py. The
+# fix is to set the stack limit to 2048.
+# This approach may also be useful for other Unixy platforms that
+# suffer from small default stack limits.
+if sys.platform == 'darwin':
+ try:
+ import resource
+ except ImportError:
+ pass
+ else:
+ soft, hard = resource.getrlimit(resource.RLIMIT_STACK)
+ newsoft = min(hard, max(soft, 1024*2048))
+ resource.setrlimit(resource.RLIMIT_STACK, (newsoft, hard))
+
+
+# When tests are run from the Python build directory, it is best practice
+# to keep the test files in a subfolder. This eases the cleanup of leftover
+# files using the "make distclean" command.
+if sysconfig.is_python_build():
+ TEMPDIR = os.path.join(sysconfig.get_config_var('srcdir'), 'build')
+else:
+ TEMPDIR = tempfile.gettempdir()
+TEMPDIR = os.path.abspath(TEMPDIR)
+
+
+def main(tests=None, **kwargs):
+ """Execute a test suite.
+
+ This also parses command-line options and modifies its behavior
+ accordingly.
+
+ tests -- a list of strings containing test names (optional)
+ testdir -- the directory in which to look for tests (optional)
+
+ Users other than the Python test suite will certainly want to
+ specify testdir; if it's omitted, the directory containing the
+ Python test suite is searched for.
+
+ If the tests argument is omitted, the tests listed on the
+ command-line will be used. If that's empty, too, then all *.py
+ files beginning with test_ will be used.
+
+ The other default arguments (verbose, quiet, exclude,
+ single, randomize, findleaks, use_resources, trace, coverdir,
+ print_slow, and random_seed) allow programmers calling main()
+ directly to set the values that would normally be set by flags
+ on the command line.
+ """
+ # Display the Python traceback on fatal errors (e.g. segfault)
+ faulthandler.enable(all_threads=True)
+
+ # Display the Python traceback on SIGALRM or SIGUSR1 signal
+ signals = []
+ if hasattr(signal, 'SIGALRM'):
+ signals.append(signal.SIGALRM)
+ if hasattr(signal, 'SIGUSR1'):
+ signals.append(signal.SIGUSR1)
+ for signum in signals:
+ faulthandler.register(signum, chain=True)
+
+ replace_stdout()
+
+ support.record_original_stdout(sys.stdout)
+
+ ns = _parse_args(sys.argv[1:], **kwargs)
+
+ if ns.huntrleaks:
+ # Avoid false positives due to various caches
+ # filling slowly with random data:
+ warm_caches()
+ if ns.memlimit is not None:
+ support.set_memlimit(ns.memlimit)
+ if ns.threshold is not None:
+ import gc
+ gc.set_threshold(ns.threshold)
+ if ns.nowindows:
+ import msvcrt
+ msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS|
+ msvcrt.SEM_NOALIGNMENTFAULTEXCEPT|
+ msvcrt.SEM_NOGPFAULTERRORBOX|
+ msvcrt.SEM_NOOPENFILEERRORBOX)
+ try:
+ msvcrt.CrtSetReportMode
+ except AttributeError:
+ # release build
+ pass
+ else:
+ for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
+ msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE)
+ msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR)
+ if ns.wait:
+ input("Press any key to continue...")
+
+ if ns.slaveargs is not None:
+ args, kwargs = json.loads(ns.slaveargs)
+ if kwargs.get('huntrleaks'):
+ unittest.BaseTestSuite._cleanup = False
+ try:
+ result = runtest(*args, **kwargs)
+ except KeyboardInterrupt:
+ result = INTERRUPTED, ''
+ except BaseException as e:
+ traceback.print_exc()
+ result = CHILD_ERROR, str(e)
+ sys.stdout.flush()
+ print() # Force a newline (just in case)
+ print(json.dumps(result))
+ sys.exit(0)
+
+ good = []
+ bad = []
+ skipped = []
+ resource_denieds = []
+ environment_changed = []
+ interrupted = False
+
+ if ns.findleaks:
+ try:
+ import gc
+ except ImportError:
+ print('No GC available, disabling findleaks.')
+ ns.findleaks = False
+ else:
+ # Uncomment the line below to report garbage that is not
+ # freeable by reference counting alone. By default only
+ # garbage that is not collectable by the GC is reported.
+ #gc.set_debug(gc.DEBUG_SAVEALL)
+ found_garbage = []
+
+ if ns.huntrleaks:
+ unittest.BaseTestSuite._cleanup = False
+
+ if ns.single:
+ filename = os.path.join(TEMPDIR, 'pynexttest')
+ try:
+ with open(filename, 'r') as fp:
+ next_test = fp.read().strip()
+ tests = [next_test]
+ except OSError:
+ pass
+
+ if ns.fromfile:
+ tests = []
+ with open(os.path.join(support.SAVEDCWD, ns.fromfile)) as fp:
+ count_pat = re.compile(r'\[\s*\d+/\s*\d+\]')
+ for line in fp:
+ line = count_pat.sub('', line)
+ guts = line.split() # assuming no test has whitespace in its name
+ if guts and not guts[0].startswith('#'):
+ tests.extend(guts)
+
+ # Strip .py extensions.
+ removepy(ns.args)
+ removepy(tests)
+
+ stdtests = STDTESTS[:]
+ nottests = NOTTESTS.copy()
+ if ns.exclude:
+ for arg in ns.args:
+ if arg in stdtests:
+ stdtests.remove(arg)
+ nottests.add(arg)
+ ns.args = []
+
+ # For a partial run, we do not need to clutter the output.
+ if ns.verbose or ns.header or not (ns.quiet or ns.single or tests or ns.args):
+ # Print basic platform information
+ print("==", platform.python_implementation(), *sys.version.split())
+ print("== ", platform.platform(aliased=True),
+ "%s-endian" % sys.byteorder)
+ print("== ", "hash algorithm:", sys.hash_info.algorithm,
+ "64bit" if sys.maxsize > 2**32 else "32bit")
+ print("== ", os.getcwd())
+ print("Testing with flags:", sys.flags)
+
+ # if testdir is set, then we are not running the python tests suite, so
+ # don't add default tests to be executed or skipped (pass empty values)
+ if ns.testdir:
+ alltests = findtests(ns.testdir, list(), set())
+ else:
+ alltests = findtests(ns.testdir, stdtests, nottests)
+
+ selected = tests or ns.args or alltests
+ if ns.single:
+ selected = selected[:1]
+ try:
+ next_single_test = alltests[alltests.index(selected[0])+1]
+ except IndexError:
+ next_single_test = None
+ # Remove all the selected tests that precede start if it's set.
+ if ns.start:
+ try:
+ del selected[:selected.index(ns.start)]
+ except ValueError:
+ print("Couldn't find starting test (%s), using all tests" % ns.start)
+ if ns.randomize:
+ if ns.random_seed is None:
+ ns.random_seed = random.randrange(10000000)
+ random.seed(ns.random_seed)
+ print("Using random seed", ns.random_seed)
+ random.shuffle(selected)
+ if ns.trace:
+ import trace, tempfile
+ tracer = trace.Trace(ignoredirs=[sys.base_prefix, sys.base_exec_prefix,
+ tempfile.gettempdir()],
+ trace=False, count=True)
+
+ test_times = []
+ support.verbose = ns.verbose # Tell tests to be moderately quiet
+ support.use_resources = ns.use_resources
+ save_modules = sys.modules.keys()
+
+ def accumulate_result(test, result):
+ ok, test_time = result
+ test_times.append((test_time, test))
+ if ok == PASSED:
+ good.append(test)
+ elif ok == FAILED:
+ bad.append(test)
+ elif ok == ENV_CHANGED:
+ environment_changed.append(test)
+ elif ok == SKIPPED:
+ skipped.append(test)
+ elif ok == RESOURCE_DENIED:
+ skipped.append(test)
+ resource_denieds.append(test)
+
+ if ns.forever:
+ def test_forever(tests=list(selected)):
+ while True:
+ for test in tests:
+ yield test
+ if bad:
+ return
+ tests = test_forever()
+ test_count = ''
+ test_count_width = 3
+ else:
+ tests = iter(selected)
+ test_count = '/{}'.format(len(selected))
+ test_count_width = len(test_count) - 1
+
+ if ns.use_mp:
+ try:
+ from threading import Thread
+ except ImportError:
+ print("Multiprocess option requires thread support")
+ sys.exit(2)
+ from queue import Queue
+ debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$")
+ output = Queue()
+ pending = MultiprocessTests(tests)
+ def work():
+ # A worker thread.
+ try:
+ while True:
+ try:
+ test = next(pending)
+ except StopIteration:
+ output.put((None, None, None, None))
+ return
+ retcode, stdout, stderr = run_test_in_subprocess(test, ns)
+ # Strip last refcount output line if it exists, since it
+ # comes from the shutdown of the interpreter in the subcommand.
+ stderr = debug_output_pat.sub("", stderr)
+ stdout, _, result = stdout.strip().rpartition("\n")
+ if retcode != 0:
+ result = (CHILD_ERROR, "Exit code %s" % retcode)
+ output.put((test, stdout.rstrip(), stderr.rstrip(), result))
+ return
+ if not result:
+ output.put((None, None, None, None))
+ return
+ result = json.loads(result)
+ output.put((test, stdout.rstrip(), stderr.rstrip(), result))
+ except BaseException:
+ output.put((None, None, None, None))
+ raise
+ workers = [Thread(target=work) for i in range(ns.use_mp)]
+ for worker in workers:
+ worker.start()
+ finished = 0
+ test_index = 1
+ try:
+ while finished < ns.use_mp:
+ test, stdout, stderr, result = output.get()
+ if test is None:
+ finished += 1
+ continue
+ accumulate_result(test, result)
+ if not ns.quiet:
+ fmt = "[{1:{0}}{2}/{3}] {4}" if bad else "[{1:{0}}{2}] {4}"
+ print(fmt.format(
+ test_count_width, test_index, test_count,
+ len(bad), test))
+ if stdout:
+ print(stdout)
+ if stderr:
+ print(stderr, file=sys.stderr)
+ sys.stdout.flush()
+ sys.stderr.flush()
+ if result[0] == INTERRUPTED:
+ raise KeyboardInterrupt
+ if result[0] == CHILD_ERROR:
+ raise Exception("Child error on {}: {}".format(test, result[1]))
+ test_index += 1
+ except KeyboardInterrupt:
+ interrupted = True
+ pending.interrupted = True
+ for worker in workers:
+ worker.join()
+ else:
+ for test_index, test in enumerate(tests, 1):
+ if not ns.quiet:
+ fmt = "[{1:{0}}{2}/{3}] {4}" if bad else "[{1:{0}}{2}] {4}"
+ print(fmt.format(
+ test_count_width, test_index, test_count, len(bad), test))
+ sys.stdout.flush()
+ if ns.trace:
+ # If we're tracing code coverage, then we don't exit with status
+ # if on a false return value from main.
+ tracer.runctx('runtest(test, ns.verbose, ns.quiet, timeout=ns.timeout)',
+ globals=globals(), locals=vars())
+ else:
+ try:
+ result = runtest(test, ns.verbose, ns.quiet,
+ ns.huntrleaks,
+ output_on_failure=ns.verbose3,
+ timeout=ns.timeout, failfast=ns.failfast,
+ match_tests=ns.match_tests)
+ accumulate_result(test, result)
+ except KeyboardInterrupt:
+ interrupted = True
+ break
+ if ns.findleaks:
+ gc.collect()
+ if gc.garbage:
+ print("Warning: test created", len(gc.garbage), end=' ')
+ print("uncollectable object(s).")
+ # move the uncollectable objects somewhere so we don't see
+ # them again
+ found_garbage.extend(gc.garbage)
+ del gc.garbage[:]
+ # Unload the newly imported modules (best effort finalization)
+ for module in sys.modules.keys():
+ if module not in save_modules and module.startswith("test."):
+ support.unload(module)
+
+ if interrupted:
+ # print a newline after ^C
+ print()
+ print("Test suite interrupted by signal SIGINT.")
+ omitted = set(selected) - set(good) - set(bad) - set(skipped)
+ print(count(len(omitted), "test"), "omitted:")
+ printlist(omitted)
+ if good and not ns.quiet:
+ if not bad and not skipped and not interrupted and len(good) > 1:
+ print("All", end=' ')
+ print(count(len(good), "test"), "OK.")
+ if ns.print_slow:
+ test_times.sort(reverse=True)
+ print("10 slowest tests:")
+ for time, test in test_times[:10]:
+ print("%s: %.1fs" % (test, time))
+ if bad:
+ print(count(len(bad), "test"), "failed:")
+ printlist(bad)
+ if environment_changed:
+ print("{} altered the execution environment:".format(
+ count(len(environment_changed), "test")))
+ printlist(environment_changed)
+ if skipped and not ns.quiet:
+ print(count(len(skipped), "test"), "skipped:")
+ printlist(skipped)
+
+ if ns.verbose2 and bad:
+ print("Re-running failed tests in verbose mode")
+ for test in bad[:]:
+ print("Re-running test %r in verbose mode" % test)
+ sys.stdout.flush()
+ try:
+ ns.verbose = True
+ ok = runtest(test, True, ns.quiet, ns.huntrleaks,
+ timeout=ns.timeout)
+ except KeyboardInterrupt:
+ # print a newline separate from the ^C
+ print()
+ break
+ else:
+ if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
+ bad.remove(test)
+ else:
+ if bad:
+ print(count(len(bad), 'test'), "failed again:")
+ printlist(bad)
+
+ if ns.single:
+ if next_single_test:
+ with open(filename, 'w') as fp:
+ fp.write(next_single_test + '\n')
+ else:
+ os.unlink(filename)
+
+ if ns.trace:
+ r = tracer.results()
+ r.write_results(show_missing=True, summary=True, coverdir=ns.coverdir)
+
+ if ns.runleaks:
+ os.system("leaks %d" % os.getpid())
+
+ sys.exit(len(bad) > 0 or interrupted)
+
+
+# We do not use a generator so multiple threads can call next().
+class MultiprocessTests(object):
+
+ """A thread-safe iterator over tests for multiprocess mode."""
+
+ def __init__(self, tests):
+ self.interrupted = False
+ self.lock = threading.Lock()
+ self.tests = tests
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ with self.lock:
+ if self.interrupted:
+ raise StopIteration('tests interrupted')
+ return next(self.tests)
+
+
+def replace_stdout():
+ """Set stdout encoder error handler to backslashreplace (as stderr error
+ handler) to avoid UnicodeEncodeError when printing a traceback"""
+ import atexit
+
+ stdout = sys.stdout
+ sys.stdout = open(stdout.fileno(), 'w',
+ encoding=stdout.encoding,
+ errors="backslashreplace",
+ closefd=False,
+ newline='\n')
+
+ def restore_stdout():
+ sys.stdout.close()
+ sys.stdout = stdout
+ atexit.register(restore_stdout)
+
+
+def removepy(names):
+ if not names:
+ return
+ for idx, name in enumerate(names):
+ basename, ext = os.path.splitext(name)
+ if ext == '.py':
+ names[idx] = basename
+
+
+def count(n, word):
+ if n == 1:
+ return "%d %s" % (n, word)
+ else:
+ return "%d %ss" % (n, word)
+
+
+def printlist(x, width=70, indent=4):
+ """Print the elements of iterable x to stdout.
+
+ Optional arg width (default 70) is the maximum line length.
+ Optional arg indent (default 4) is the number of blanks with which to
+ begin each line.
+ """
+
+ from textwrap import fill
+ blanks = ' ' * indent
+ # Print the sorted list: 'x' may be a '--random' list or a set()
+ print(fill(' '.join(str(elt) for elt in sorted(x)), width,
+ initial_indent=blanks, subsequent_indent=blanks))
+
+
+def main_in_temp_cwd():
+ """Run main() in a temporary working directory."""
+ if sysconfig.is_python_build():
+ try:
+ os.mkdir(TEMPDIR)
+ except FileExistsError:
+ pass
+
+ # Define a writable temp dir that will be used as cwd while running
+ # the tests. The name of the dir includes the pid to allow parallel
+ # testing (see the -j option).
+ test_cwd = 'test_python_{}'.format(os.getpid())
+ test_cwd = os.path.join(TEMPDIR, test_cwd)
+
+ # Run the tests in a context manager that temporarily changes the CWD to a
+ # temporary and writable directory. If it's not possible to create or
+ # change the CWD, the original CWD will be used. The original CWD is
+ # available from support.SAVEDCWD.
+ with support.temp_cwd(test_cwd, quiet=True):
+ main()