diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2015-09-29 20:48:52 (GMT) |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2015-09-29 20:48:52 (GMT) |
commit | dad20e4876b339a559d61810299f96712719687e (patch) | |
tree | d5e5ff211f2ba7712e7057263e35f27f1a21c9ee | |
parent | b27232949df2c7201f16e23bbfd6dd872f3bdc6d (diff) | |
download | cpython-dad20e4876b339a559d61810299f96712719687e.zip cpython-dad20e4876b339a559d61810299f96712719687e.tar.gz cpython-dad20e4876b339a559d61810299f96712719687e.tar.bz2 |
Issue #25220: Split the huge main() function of libregrtest.main into a class
with attributes and methods.
The --threshold command line option is now ignored if the gc module is missing.
* Convert main() variables to Regrtest attributes, document some attributes
* Convert accumulate_result() function to a method
* Create setup_python() function and setup_regrtest() method.
* Import gc at top level
* Move resource.setrlimit() and the code to make the module paths absolute into
the new setup_python() function. So this code is no more executed when the
module is imported, only when main() is executed. We have a better control on
when the setup is done.
* Move textwrap import from printlist() to the top level.
* Some other minor cleanup.
-rw-r--r-- | Lib/test/libregrtest/main.py | 701 |
1 files changed, 387 insertions, 314 deletions
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py index 5d34cff..63388c9 100644 --- a/Lib/test/libregrtest/main.py +++ b/Lib/test/libregrtest/main.py @@ -1,13 +1,14 @@ import faulthandler import json import os +import platform +import random import re +import signal import sys -import tempfile import sysconfig -import signal -import random -import platform +import tempfile +import textwrap import traceback import unittest from test.libregrtest.runtest import ( @@ -19,45 +20,15 @@ from test.libregrtest.refleak import warm_caches from test.libregrtest.cmdline import _parse_args from test import support try: + import gc +except ImportError: + gc = None +try: import threading except ImportError: threading = None -# Some times __path__ and __file__ are not absolute (e.g. while running from -# Lib/) and, if we change the CWD to run the tests in a temporary dir, some -# imports might fail. This affects only the modules imported before os.chdir(). -# These modules are searched first in sys.path[0] (so '' -- the CWD) and if -# they are found in the CWD their __file__ and __path__ will be relative (this -# happens before the chdir). All the modules imported after the chdir, are -# not found in the CWD, and since the other paths in sys.path[1:] are absolute -# (site.py absolutize them), the __file__ and __path__ will be absolute too. -# Therefore it is necessary to absolutize manually the __file__ and __path__ of -# the packages to prevent later imports to fail when the CWD is different. -for module in sys.modules.values(): - if hasattr(module, '__path__'): - module.__path__ = [os.path.abspath(path) for path in module.__path__] - if hasattr(module, '__file__'): - module.__file__ = os.path.abspath(module.__file__) - - -# MacOSX (a.k.a. Darwin) has a default stack size that is too small -# for deeply recursive regular expressions. We see this as crashes in -# the Python test suite when running test_re.py and test_sre.py. The -# fix is to set the stack limit to 2048. -# This approach may also be useful for other Unixy platforms that -# suffer from small default stack limits. -if sys.platform == 'darwin': - try: - import resource - except ImportError: - pass - else: - soft, hard = resource.getrlimit(resource.RLIMIT_STACK) - newsoft = min(hard, max(soft, 1024*2048)) - resource.setrlimit(resource.RLIMIT_STACK, (newsoft, hard)) - - # When tests are run from the Python build directory, it is best practice # to keep the test files in a subfolder. This eases the cleanup of leftover # files using the "make distclean" command. @@ -68,7 +39,73 @@ else: TEMPDIR = os.path.abspath(TEMPDIR) -def main(tests=None, **kwargs): +def slave_runner(slaveargs): + args, kwargs = json.loads(slaveargs) + if kwargs.get('huntrleaks'): + unittest.BaseTestSuite._cleanup = False + try: + result = runtest(*args, **kwargs) + except KeyboardInterrupt: + result = INTERRUPTED, '' + except BaseException as e: + traceback.print_exc() + result = CHILD_ERROR, str(e) + sys.stdout.flush() + print() # Force a newline (just in case) + print(json.dumps(result)) + sys.exit(0) + + +def setup_python(): + # Display the Python traceback on fatal errors (e.g. segfault) + faulthandler.enable(all_threads=True) + + # Display the Python traceback on SIGALRM or SIGUSR1 signal + signals = [] + if hasattr(signal, 'SIGALRM'): + signals.append(signal.SIGALRM) + if hasattr(signal, 'SIGUSR1'): + signals.append(signal.SIGUSR1) + for signum in signals: + faulthandler.register(signum, chain=True) + + replace_stdout() + support.record_original_stdout(sys.stdout) + + # Some times __path__ and __file__ are not absolute (e.g. while running from + # Lib/) and, if we change the CWD to run the tests in a temporary dir, some + # imports might fail. This affects only the modules imported before os.chdir(). + # These modules are searched first in sys.path[0] (so '' -- the CWD) and if + # they are found in the CWD their __file__ and __path__ will be relative (this + # happens before the chdir). All the modules imported after the chdir, are + # not found in the CWD, and since the other paths in sys.path[1:] are absolute + # (site.py absolutize them), the __file__ and __path__ will be absolute too. + # Therefore it is necessary to absolutize manually the __file__ and __path__ of + # the packages to prevent later imports to fail when the CWD is different. + for module in sys.modules.values(): + if hasattr(module, '__path__'): + module.__path__ = [os.path.abspath(path) for path in module.__path__] + if hasattr(module, '__file__'): + module.__file__ = os.path.abspath(module.__file__) + + # MacOSX (a.k.a. Darwin) has a default stack size that is too small + # for deeply recursive regular expressions. We see this as crashes in + # the Python test suite when running test_re.py and test_sre.py. The + # fix is to set the stack limit to 2048. + # This approach may also be useful for other Unixy platforms that + # suffer from small default stack limits. + if sys.platform == 'darwin': + try: + import resource + except ImportError: + pass + else: + soft, hard = resource.getrlimit(resource.RLIMIT_STACK) + newsoft = min(hard, max(soft, 1024*2048)) + resource.setrlimit(resource.RLIMIT_STACK, (newsoft, hard)) + + +class Regrtest: """Execute a test suite. This also parses command-line options and modifies its behavior @@ -91,210 +128,257 @@ def main(tests=None, **kwargs): directly to set the values that would normally be set by flags on the command line. """ - # Display the Python traceback on fatal errors (e.g. segfault) - faulthandler.enable(all_threads=True) + def __init__(self): + # Namespace of command line options + self.ns = None + + # tests + self.tests = [] + self.selected = [] + + # test results + self.good = [] + self.bad = [] + self.skipped = [] + self.resource_denieds = [] + self.environment_changed = [] + self.interrupted = False - # Display the Python traceback on SIGALRM or SIGUSR1 signal - signals = [] - if hasattr(signal, 'SIGALRM'): - signals.append(signal.SIGALRM) - if hasattr(signal, 'SIGUSR1'): - signals.append(signal.SIGUSR1) - for signum in signals: - faulthandler.register(signum, chain=True) + # used by --slow + self.test_times = [] - replace_stdout() + # used by --coverage, trace.Trace instance + self.tracer = None - support.record_original_stdout(sys.stdout) + # used by --findleaks, store for gc.garbage + self.found_garbage = [] - ns = _parse_args(sys.argv[1:], **kwargs) - - if ns.huntrleaks: - # Avoid false positives due to various caches - # filling slowly with random data: - warm_caches() - if ns.memlimit is not None: - support.set_memlimit(ns.memlimit) - if ns.threshold is not None: - import gc - gc.set_threshold(ns.threshold) - if ns.nowindows: - import msvcrt - msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS| - msvcrt.SEM_NOALIGNMENTFAULTEXCEPT| - msvcrt.SEM_NOGPFAULTERRORBOX| - msvcrt.SEM_NOOPENFILEERRORBOX) - try: - msvcrt.CrtSetReportMode - except AttributeError: - # release build - pass - else: - for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]: - msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE) - msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR) - if ns.wait: - input("Press any key to continue...") - - if ns.slaveargs is not None: - args, kwargs = json.loads(ns.slaveargs) - if kwargs.get('huntrleaks'): - unittest.BaseTestSuite._cleanup = False - try: - result = runtest(*args, **kwargs) - except KeyboardInterrupt: - result = INTERRUPTED, '' - except BaseException as e: - traceback.print_exc() - result = CHILD_ERROR, str(e) - sys.stdout.flush() - print() # Force a newline (just in case) - print(json.dumps(result)) - sys.exit(0) - - good = [] - bad = [] - skipped = [] - resource_denieds = [] - environment_changed = [] - interrupted = False - - if ns.findleaks: - try: - import gc - except ImportError: - print('No GC available, disabling findleaks.') - ns.findleaks = False - else: - # Uncomment the line below to report garbage that is not - # freeable by reference counting alone. By default only - # garbage that is not collectable by the GC is reported. - #gc.set_debug(gc.DEBUG_SAVEALL) - found_garbage = [] - - if ns.huntrleaks: - unittest.BaseTestSuite._cleanup = False + # used to display the progress bar "[ 3/100]" + self.test_count = '' + self.test_count_width = 1 - if ns.single: - filename = os.path.join(TEMPDIR, 'pynexttest') - try: - with open(filename, 'r') as fp: - next_test = fp.read().strip() - tests = [next_test] - except OSError: - pass + # used by --single + self.next_single_test = None + self.next_single_filename = None - if ns.fromfile: - tests = [] - with open(os.path.join(support.SAVEDCWD, ns.fromfile)) as fp: - count_pat = re.compile(r'\[\s*\d+/\s*\d+\]') - for line in fp: - line = count_pat.sub('', line) - guts = line.split() # assuming no test has whitespace in its name - if guts and not guts[0].startswith('#'): - tests.extend(guts) - - # Strip .py extensions. - removepy(ns.args) - removepy(tests) - - stdtests = STDTESTS[:] - nottests = NOTTESTS.copy() - if ns.exclude: - for arg in ns.args: - if arg in stdtests: - stdtests.remove(arg) - nottests.add(arg) - ns.args = [] - - # For a partial run, we do not need to clutter the output. - if ns.verbose or ns.header or not (ns.quiet or ns.single or tests or ns.args): - # Print basic platform information - print("==", platform.python_implementation(), *sys.version.split()) - print("== ", platform.platform(aliased=True), - "%s-endian" % sys.byteorder) - print("== ", "hash algorithm:", sys.hash_info.algorithm, - "64bit" if sys.maxsize > 2**32 else "32bit") - print("== ", os.getcwd()) - print("Testing with flags:", sys.flags) - - # if testdir is set, then we are not running the python tests suite, so - # don't add default tests to be executed or skipped (pass empty values) - if ns.testdir: - alltests = findtests(ns.testdir, list(), set()) - else: - alltests = findtests(ns.testdir, stdtests, nottests) - - selected = tests or ns.args or alltests - if ns.single: - selected = selected[:1] - try: - next_single_test = alltests[alltests.index(selected[0])+1] - except IndexError: - next_single_test = None - # Remove all the selected tests that precede start if it's set. - if ns.start: - try: - del selected[:selected.index(ns.start)] - except ValueError: - print("Couldn't find starting test (%s), using all tests" % ns.start) - if ns.randomize: - if ns.random_seed is None: - ns.random_seed = random.randrange(10000000) - random.seed(ns.random_seed) - print("Using random seed", ns.random_seed) - random.shuffle(selected) - if ns.trace: - import trace, tempfile - tracer = trace.Trace(ignoredirs=[sys.base_prefix, sys.base_exec_prefix, - tempfile.gettempdir()], - trace=False, count=True) - - test_times = [] - support.verbose = ns.verbose # Tell tests to be moderately quiet - support.use_resources = ns.use_resources - save_modules = sys.modules.keys() - - def accumulate_result(test, result): + def accumulate_result(self, test, result): ok, test_time = result - test_times.append((test_time, test)) + self.test_times.append((test_time, test)) if ok == PASSED: - good.append(test) + self.good.append(test) elif ok == FAILED: - bad.append(test) + self.bad.append(test) elif ok == ENV_CHANGED: - environment_changed.append(test) + self.environment_changed.append(test) elif ok == SKIPPED: - skipped.append(test) + self.skipped.append(test) elif ok == RESOURCE_DENIED: - skipped.append(test) - resource_denieds.append(test) - - if ns.forever: - def test_forever(tests=list(selected)): - while True: - for test in tests: - yield test - if bad: - return - tests = test_forever() - test_count = '' - test_count_width = 3 - else: - tests = iter(selected) - test_count = '/{}'.format(len(selected)) - test_count_width = len(test_count) - 1 + self.skipped.append(test) + self.resource_denieds.append(test) + + def display_progress(self, test_index, test): + if self.ns.quiet: + return + fmt = "[{1:{0}}{2}/{3}] {4}" if self.bad else "[{1:{0}}{2}] {4}" + print(fmt.format( + self.test_count_width, test_index, self.test_count, len(self.bad), test)) + sys.stdout.flush() + + def setup_regrtest(self): + if self.ns.huntrleaks: + # Avoid false positives due to various caches + # filling slowly with random data: + warm_caches() - if ns.use_mp: + if self.ns.memlimit is not None: + support.set_memlimit(self.ns.memlimit) + + if self.ns.threshold is not None: + if gc is not None: + gc.set_threshold(self.ns.threshold) + else: + print('No GC available, ignore --threshold.') + + if self.ns.nowindows: + import msvcrt + msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS| + msvcrt.SEM_NOALIGNMENTFAULTEXCEPT| + msvcrt.SEM_NOGPFAULTERRORBOX| + msvcrt.SEM_NOOPENFILEERRORBOX) + try: + msvcrt.CrtSetReportMode + except AttributeError: + # release build + pass + else: + for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]: + msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE) + msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR) + + if self.ns.findleaks: + if gc is not None: + # Uncomment the line below to report garbage that is not + # freeable by reference counting alone. By default only + # garbage that is not collectable by the GC is reported. + pass + #gc.set_debug(gc.DEBUG_SAVEALL) + else: + print('No GC available, disabling --findleaks') + self.ns.findleaks = False + + if self.ns.huntrleaks: + unittest.BaseTestSuite._cleanup = False + + # Strip .py extensions. + removepy(self.ns.args) + + if self.ns.trace: + import trace + self.tracer = trace.Trace(ignoredirs=[sys.base_prefix, + sys.base_exec_prefix, + tempfile.gettempdir()], + trace=False, count=True) + + def find_tests(self, tests): + self.tests = tests + + if self.ns.single: + self.next_single_filename = os.path.join(TEMPDIR, 'pynexttest') + try: + with open(self.next_single_filename, 'r') as fp: + next_test = fp.read().strip() + self.tests = [next_test] + except OSError: + pass + + if self.ns.fromfile: + self.tests = [] + with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp: + count_pat = re.compile(r'\[\s*\d+/\s*\d+\]') + for line in fp: + line = count_pat.sub('', line) + guts = line.split() # assuming no test has whitespace in its name + if guts and not guts[0].startswith('#'): + self.tests.extend(guts) + + removepy(self.tests) + + stdtests = STDTESTS[:] + nottests = NOTTESTS.copy() + if self.ns.exclude: + for arg in self.ns.args: + if arg in stdtests: + stdtests.remove(arg) + nottests.add(arg) + self.ns.args = [] + + # For a partial run, we do not need to clutter the output. + if self.ns.verbose or self.ns.header or not (self.ns.quiet or self.ns.single or self.tests or self.ns.args): + # Print basic platform information + print("==", platform.python_implementation(), *sys.version.split()) + print("== ", platform.platform(aliased=True), + "%s-endian" % sys.byteorder) + print("== ", "hash algorithm:", sys.hash_info.algorithm, + "64bit" if sys.maxsize > 2**32 else "32bit") + print("== ", os.getcwd()) + print("Testing with flags:", sys.flags) + + # if testdir is set, then we are not running the python tests suite, so + # don't add default tests to be executed or skipped (pass empty values) + if self.ns.testdir: + alltests = findtests(self.ns.testdir, list(), set()) + else: + alltests = findtests(self.ns.testdir, stdtests, nottests) + + self.selected = self.tests or self.ns.args or alltests + if self.ns.single: + self.selected = self.selected[:1] + try: + pos = alltests.index(self.selected[0]) + self.next_single_test = alltests[pos + 1] + except IndexError: + pass + + # Remove all the self.selected tests that precede start if it's set. + if self.ns.start: + try: + del self.selected[:self.selected.index(self.ns.start)] + except ValueError: + print("Couldn't find starting test (%s), using all tests" % self.ns.start) + + if self.ns.randomize: + if self.ns.random_seed is None: + self.ns.random_seed = random.randrange(10000000) + random.seed(self.ns.random_seed) + print("Using random seed", self.ns.random_seed) + random.shuffle(self.selected) + + def display_result(self): + if self.interrupted: + # print a newline after ^C + print() + print("Test suite interrupted by signal SIGINT.") + omitted = set(self.selected) - set(self.good) - set(self.bad) - set(self.skipped) + print(count(len(omitted), "test"), "omitted:") + printlist(omitted) + + if self.good and not self.ns.quiet: + if not self.bad and not self.skipped and not self.interrupted and len(self.good) > 1: + print("All", end=' ') + print(count(len(self.good), "test"), "OK.") + + if self.ns.print_slow: + self.test_times.sort(reverse=True) + print("10 slowest tests:") + for time, test in self.test_times[:10]: + print("%s: %.1fs" % (test, time)) + + if self.bad: + print(count(len(self.bad), "test"), "failed:") + printlist(self.bad) + + if self.environment_changed: + print("{} altered the execution environment:".format( + count(len(self.environment_changed), "test"))) + printlist(self.environment_changed) + + if self.skipped and not self.ns.quiet: + print(count(len(self.skipped), "test"), "skipped:") + printlist(self.skipped) + + if self.ns.verbose2 and self.bad: + print("Re-running failed tests in verbose mode") + for test in self.bad[:]: + print("Re-running test %r in verbose mode" % test) + sys.stdout.flush() + try: + self.ns.verbose = True + ok = runtest(test, True, self.ns.quiet, self.ns.huntrleaks, + timeout=self.ns.timeout) + except KeyboardInterrupt: + # print a newline separate from the ^C + print() + break + else: + if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}: + self.bad.remove(test) + else: + if self.bad: + print(count(len(self.bad), 'test'), "failed again:") + printlist(self.bad) + + def _run_tests_mp(self): try: from threading import Thread except ImportError: print("Multiprocess option requires thread support") sys.exit(2) from queue import Queue + debug_output_pat = re.compile(r"\[\d+ refs, \d+ blocks\]$") output = Queue() - pending = MultiprocessTests(tests) + pending = MultiprocessTests(self.tests) + def work(): # A worker thread. try: @@ -304,7 +388,7 @@ def main(tests=None, **kwargs): except StopIteration: output.put((None, None, None, None)) return - retcode, stdout, stderr = run_test_in_subprocess(test, ns) + retcode, stdout, stderr = run_test_in_subprocess(test, self.ns) # Strip last refcount output line if it exists, since it # comes from the shutdown of the interpreter in the subcommand. stderr = debug_output_pat.sub("", stderr) @@ -321,23 +405,20 @@ def main(tests=None, **kwargs): except BaseException: output.put((None, None, None, None)) raise - workers = [Thread(target=work) for i in range(ns.use_mp)] + + workers = [Thread(target=work) for i in range(self.ns.use_mp)] for worker in workers: worker.start() finished = 0 test_index = 1 try: - while finished < ns.use_mp: + while finished < self.ns.use_mp: test, stdout, stderr, result = output.get() if test is None: finished += 1 continue - accumulate_result(test, result) - if not ns.quiet: - fmt = "[{1:{0}}{2}/{3}] {4}" if bad else "[{1:{0}}{2}] {4}" - print(fmt.format( - test_count_width, test_index, test_count, - len(bad), test)) + self.accumulate_result(test, result) + self.display_progress(test_index, test) if stdout: print(stdout) if stderr: @@ -350,110 +431,99 @@ def main(tests=None, **kwargs): raise Exception("Child error on {}: {}".format(test, result[1])) test_index += 1 except KeyboardInterrupt: - interrupted = True + self.interrupted = True pending.interrupted = True for worker in workers: worker.join() - else: - for test_index, test in enumerate(tests, 1): - if not ns.quiet: - fmt = "[{1:{0}}{2}/{3}] {4}" if bad else "[{1:{0}}{2}] {4}" - print(fmt.format( - test_count_width, test_index, test_count, len(bad), test)) - sys.stdout.flush() - if ns.trace: + + def _run_tests_sequential(self): + save_modules = sys.modules.keys() + + for test_index, test in enumerate(self.tests, 1): + self.display_progress(test_index, test) + if self.ns.trace: # If we're tracing code coverage, then we don't exit with status # if on a false return value from main. - tracer.runctx('runtest(test, ns.verbose, ns.quiet, timeout=ns.timeout)', - globals=globals(), locals=vars()) + cmd = 'runtest(test, self.ns.verbose, self.ns.quiet, timeout=self.ns.timeout)' + self.tracer.runctx(cmd, globals=globals(), locals=vars()) else: try: - result = runtest(test, ns.verbose, ns.quiet, - ns.huntrleaks, - output_on_failure=ns.verbose3, - timeout=ns.timeout, failfast=ns.failfast, - match_tests=ns.match_tests) - accumulate_result(test, result) + result = runtest(test, self.ns.verbose, self.ns.quiet, + self.ns.huntrleaks, + output_on_failure=self.ns.verbose3, + timeout=self.ns.timeout, failfast=self.ns.failfast, + match_tests=self.ns.match_tests) + self.accumulate_result(test, result) except KeyboardInterrupt: - interrupted = True + self.interrupted = True break - if ns.findleaks: + if self.ns.findleaks: gc.collect() if gc.garbage: print("Warning: test created", len(gc.garbage), end=' ') print("uncollectable object(s).") # move the uncollectable objects somewhere so we don't see # them again - found_garbage.extend(gc.garbage) + self.found_garbage.extend(gc.garbage) del gc.garbage[:] # Unload the newly imported modules (best effort finalization) for module in sys.modules.keys(): if module not in save_modules and module.startswith("test."): support.unload(module) - if interrupted: - # print a newline after ^C - print() - print("Test suite interrupted by signal SIGINT.") - omitted = set(selected) - set(good) - set(bad) - set(skipped) - print(count(len(omitted), "test"), "omitted:") - printlist(omitted) - if good and not ns.quiet: - if not bad and not skipped and not interrupted and len(good) > 1: - print("All", end=' ') - print(count(len(good), "test"), "OK.") - if ns.print_slow: - test_times.sort(reverse=True) - print("10 slowest tests:") - for time, test in test_times[:10]: - print("%s: %.1fs" % (test, time)) - if bad: - print(count(len(bad), "test"), "failed:") - printlist(bad) - if environment_changed: - print("{} altered the execution environment:".format( - count(len(environment_changed), "test"))) - printlist(environment_changed) - if skipped and not ns.quiet: - print(count(len(skipped), "test"), "skipped:") - printlist(skipped) - - if ns.verbose2 and bad: - print("Re-running failed tests in verbose mode") - for test in bad[:]: - print("Re-running test %r in verbose mode" % test) - sys.stdout.flush() - try: - ns.verbose = True - ok = runtest(test, True, ns.quiet, ns.huntrleaks, - timeout=ns.timeout) - except KeyboardInterrupt: - # print a newline separate from the ^C - print() - break - else: - if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}: - bad.remove(test) - else: - if bad: - print(count(len(bad), 'test'), "failed again:") - printlist(bad) - - if ns.single: - if next_single_test: - with open(filename, 'w') as fp: - fp.write(next_single_test + '\n') - else: - os.unlink(filename) + def run_tests(self): + support.verbose = self.ns.verbose # Tell tests to be moderately quiet + support.use_resources = self.ns.use_resources - if ns.trace: - r = tracer.results() - r.write_results(show_missing=True, summary=True, coverdir=ns.coverdir) + if self.ns.forever: + def test_forever(tests): + while True: + for test in tests: + yield test + if self.bad: + return + self.tests = test_forever(list(self.selected)) + self.test_count = '' + self.test_count_width = 3 + else: + self.tests = iter(self.selected) + self.test_count = '/{}'.format(len(self.selected)) + self.test_count_width = len(self.test_count) - 1 - if ns.runleaks: - os.system("leaks %d" % os.getpid()) + if self.ns.use_mp: + self._run_tests_mp() + else: + self._run_tests_sequential() - sys.exit(len(bad) > 0 or interrupted) + def finalize(self): + if self.next_single_filename: + if self.next_single_test: + with open(self.next_single_filename, 'w') as fp: + fp.write(self.next_single_test + '\n') + else: + os.unlink(self.next_single_filename) + + if self.ns.trace: + r = self.tracer.results() + r.write_results(show_missing=True, summary=True, + coverdir=self.ns.coverdir) + + if self.ns.runleaks: + os.system("leaks %d" % os.getpid()) + + def main(self, tests=None, **kwargs): + setup_python() + self.ns = _parse_args(sys.argv[1:], **kwargs) + self.setup_regrtest() + if self.ns.wait: + input("Press any key to continue...") + if self.ns.slaveargs is not None: + slave_runner(self.ns.slaveargs) + self.find_tests(tests) + self.run_tests() + self.display_result() + self.finalize() + sys.exit(len(self.bad) > 0 or self.interrupted) # We do not use a generator so multiple threads can call next(). @@ -518,11 +588,14 @@ def printlist(x, width=70, indent=4): begin each line. """ - from textwrap import fill blanks = ' ' * indent # Print the sorted list: 'x' may be a '--random' list or a set() - print(fill(' '.join(str(elt) for elt in sorted(x)), width, - initial_indent=blanks, subsequent_indent=blanks)) + print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width, + initial_indent=blanks, subsequent_indent=blanks)) + + +def main(tests=None, **kwargs): + Regrtest().main(tests=tests, **kwargs) def main_in_temp_cwd(): |