diff options
author | Victor Stinner <vstinner@redhat.com> | 2019-04-26 06:40:25 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-04-26 06:40:25 (GMT) |
commit | 3cde440f20a9db75fb2c4e65e8e4d04a53216a2d (patch) | |
tree | c867159d768e27d5c92bd7dc4bef8d749255baa6 /Lib/test/libregrtest | |
parent | 87d23a041d9efb743c5680ac23305ddddf300e51 (diff) | |
download | cpython-3cde440f20a9db75fb2c4e65e8e4d04a53216a2d.zip cpython-3cde440f20a9db75fb2c4e65e8e4d04a53216a2d.tar.gz cpython-3cde440f20a9db75fb2c4e65e8e4d04a53216a2d.tar.bz2 |
bpo-36725: Refactor regrtest multiprocessing code (GH-12961)
Rewrite run_tests_multiprocess() function as a new MultiprocessRunner
class with multiple methods to better report errors and stop
immediately when needed.
Changes:
* Worker processes are now killed immediately if tests are
interrupted or if a test does crash (CHILD_ERROR): worker
processes are killed.
* Rewrite how errors in a worker thread are reported to
the main thread. No longer ignore BaseException or parsing errors
silently.
* Remove 'finished' variable: use worker.is_alive() instead
* Always compute omitted tests. Add Regrtest.get_executed() method.
Diffstat (limited to 'Lib/test/libregrtest')
-rw-r--r-- | Lib/test/libregrtest/main.py | 17 | ||||
-rw-r--r-- | Lib/test/libregrtest/runtest.py | 1 | ||||
-rw-r--r-- | Lib/test/libregrtest/runtest_mp.py | 332 |
3 files changed, 200 insertions, 150 deletions
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py index ef1336a..606dc26 100644 --- a/Lib/test/libregrtest/main.py +++ b/Lib/test/libregrtest/main.py @@ -79,8 +79,8 @@ class Regrtest: self.skipped = [] self.resource_denieds = [] self.environment_changed = [] - self.rerun = [] self.run_no_tests = [] + self.rerun = [] self.first_result = None self.interrupted = False @@ -105,6 +105,11 @@ class Regrtest: # used by --junit-xml self.testsuite_xml = None + def get_executed(self): + return (set(self.good) | set(self.bad) | set(self.skipped) + | set(self.resource_denieds) | set(self.environment_changed) + | set(self.run_no_tests)) + def accumulate_result(self, result): test_name = result.test_name ok = result.result @@ -311,8 +316,6 @@ class Regrtest: self.bad.remove(test_name) if ok.result == INTERRUPTED: - # print a newline separate from the ^C - print() self.interrupted = True break else: @@ -331,11 +334,11 @@ class Regrtest: print("== Tests result: %s ==" % self.get_tests_result()) if self.interrupted: - print() - # print a newline after ^C print("Test suite interrupted by signal SIGINT.") - executed = set(self.good) | set(self.bad) | set(self.skipped) - omitted = set(self.selected) - executed + + omitted = set(self.selected) - self.get_executed() + if omitted: + print() print(count(len(omitted), "test"), "omitted:") printlist(omitted) diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py index 55913b3..c0cfa53 100644 --- a/Lib/test/libregrtest/runtest.py +++ b/Lib/test/libregrtest/runtest.py @@ -275,6 +275,7 @@ def _runtest_inner(ns, test_name, display_failure=True): except support.TestDidNotRun: return TEST_DID_NOT_RUN except KeyboardInterrupt: + print() return INTERRUPTED except: if not ns.pgo: diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py index 0a95bf6..e6c4f4f 100644 --- a/Lib/test/libregrtest/runtest_mp.py +++ b/Lib/test/libregrtest/runtest_mp.py @@ -3,9 +3,11 @@ import faulthandler import json import os import queue +import subprocess import sys import threading import time +import traceback import types from test import support @@ -19,20 +21,12 @@ from test.libregrtest.utils import format_duration # Display the running tests if nothing happened last N seconds PROGRESS_UPDATE = 30.0 # seconds -# If interrupted, display the wait progress every N seconds -WAIT_PROGRESS = 2.0 # seconds +def must_stop(result): + return result.result in (INTERRUPTED, CHILD_ERROR) -def run_test_in_subprocess(testname, ns): - """Run the given test in a subprocess with --worker-args. - - ns is the option Namespace parsed from command-line arguments. regrtest - is invoked in a subprocess with the --worker-args argument; when the - subprocess exits, its return code, stdout and stderr are returned as a - 3-tuple. - """ - from subprocess import Popen, PIPE +def run_test_in_subprocess(testname, ns): ns_dict = vars(ns) worker_args = (ns_dict, testname) worker_args = json.dumps(worker_args) @@ -47,15 +41,12 @@ def run_test_in_subprocess(testname, ns): # Running the child from the same working directory as regrtest's original # invocation ensures that TEMPDIR for the child is the same when # sysconfig.is_python_build() is true. See issue 15300. - popen = Popen(cmd, - stdout=PIPE, stderr=PIPE, - universal_newlines=True, - close_fds=(os.name != 'nt'), - cwd=support.SAVEDCWD) - with popen: - stdout, stderr = popen.communicate() - retcode = popen.wait() - return retcode, stdout, stderr + return subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + close_fds=(os.name != 'nt'), + cwd=support.SAVEDCWD) def run_tests_worker(worker_args): @@ -66,7 +57,6 @@ def run_tests_worker(worker_args): result = runtest(ns, testname) print() # Force a newline (just in case) - print(json.dumps(result), flush=True) sys.exit(0) @@ -77,7 +67,6 @@ class MultiprocessIterator: """A thread-safe iterator over tests for multiprocess mode.""" def __init__(self, tests): - self.interrupted = False self.lock = threading.Lock() self.tests = tests @@ -86,8 +75,6 @@ class MultiprocessIterator: def __next__(self): with self.lock: - if self.interrupted: - raise StopIteration('tests interrupted') return next(self.tests) @@ -102,143 +89,202 @@ class MultiprocessThread(threading.Thread): self.ns = ns self.current_test_name = None self.start_time = None + self._popen = None - def _runtest(self): - try: - test_name = next(self.pending) - except StopIteration: - self.output.put(None) - return True + def kill(self): + if not self.is_alive(): + return + if self._popen is not None: + self._popen.kill() + def _runtest(self, test_name): try: self.start_time = time.monotonic() self.current_test_name = test_name - retcode, stdout, stderr = run_test_in_subprocess(test_name, self.ns) + popen = run_test_in_subprocess(test_name, self.ns) + self._popen = popen + with popen: + try: + stdout, stderr = popen.communicate() + except: + popen.kill() + popen.wait() + raise + + retcode = popen.wait() finally: self.current_test_name = None + self._popen = None + + stdout = stdout.strip() + stderr = stderr.rstrip() + err_msg = None if retcode != 0: + err_msg = "Exit code %s" % retcode + else: + stdout, _, result = stdout.rpartition("\n") + stdout = stdout.rstrip() + if not result: + err_msg = "Failed to parse worker stdout" + else: + try: + # deserialize run_tests_worker() output + result = json.loads(result) + result = TestResult(*result) + except Exception as exc: + err_msg = "Failed to parse worker JSON: %s" % exc + + if err_msg is not None: test_time = time.monotonic() - self.start_time result = TestResult(test_name, CHILD_ERROR, test_time, None) - err_msg = "Exit code %s" % retcode - mp_result = MultiprocessResult(result, stdout.rstrip(), stderr.rstrip(), err_msg) - self.output.put(mp_result) - return False - stdout, _, result = stdout.strip().rpartition("\n") - if not result: - self.output.put(None) - return True - - # deserialize run_tests_worker() output - result = json.loads(result) - result = TestResult(*result) - mp_result = MultiprocessResult(result, stdout.rstrip(), stderr.rstrip(), None) - self.output.put(mp_result) - return False + return MultiprocessResult(result, stdout, stderr, err_msg) def run(self): - try: - stop = False - while not stop: - stop = self._runtest() - except BaseException: - self.output.put(None) - raise + while True: + try: + try: + test_name = next(self.pending) + except StopIteration: + break + mp_result = self._runtest(test_name) + self.output.put((False, mp_result)) -def run_tests_multiprocess(regrtest): - output = queue.Queue() - pending = MultiprocessIterator(regrtest.tests) - test_timeout = regrtest.ns.timeout - use_timeout = (test_timeout is not None) - - workers = [MultiprocessThread(pending, output, regrtest.ns) - for i in range(regrtest.ns.use_mp)] - print("Run tests in parallel using %s child processes" - % len(workers)) + if must_stop(mp_result.result): + break + except BaseException: + self.output.put((True, traceback.format_exc())) + break + + +def get_running(workers): + running = [] for worker in workers: - worker.start() - - def get_running(workers): - running = [] - for worker in workers: - current_test_name = worker.current_test_name - if not current_test_name: - continue - dt = time.monotonic() - worker.start_time - if dt >= PROGRESS_MIN_TIME: - text = '%s (%s)' % (current_test_name, format_duration(dt)) - running.append(text) - return running - - finished = 0 - test_index = 1 - get_timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME) - try: - while finished < regrtest.ns.use_mp: - if use_timeout: - faulthandler.dump_traceback_later(test_timeout, exit=True) + current_test_name = worker.current_test_name + if not current_test_name: + continue + dt = time.monotonic() - worker.start_time + if dt >= PROGRESS_MIN_TIME: + text = '%s (%s)' % (current_test_name, format_duration(dt)) + running.append(text) + return running + + +class MultiprocessRunner: + def __init__(self, regrtest): + self.regrtest = regrtest + self.ns = regrtest.ns + self.output = queue.Queue() + self.pending = MultiprocessIterator(self.regrtest.tests) + if self.ns.timeout is not None: + self.test_timeout = self.ns.timeout * 1.5 + else: + self.test_timeout = None + self.workers = None + + def start_workers(self): + self.workers = [MultiprocessThread(self.pending, self.output, self.ns) + for _ in range(self.ns.use_mp)] + print("Run tests in parallel using %s child processes" + % len(self.workers)) + for worker in self.workers: + worker.start() + + def wait_workers(self): + for worker in self.workers: + worker.kill() + for worker in self.workers: + worker.join() + + def _get_result(self): + if not any(worker.is_alive() for worker in self.workers): + # all worker threads are done: consume pending results + try: + return self.output.get(timeout=0) + except queue.Empty: + return None + + while True: + if self.test_timeout is not None: + faulthandler.dump_traceback_later(self.test_timeout, exit=True) + # wait for a thread + timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME) try: - mp_result = output.get(timeout=get_timeout) + return self.output.get(timeout=timeout) except queue.Empty: - running = get_running(workers) - if running and not regrtest.ns.pgo: - print('running: %s' % ', '.join(running), flush=True) - continue - - if mp_result is None: - finished += 1 - continue - result = mp_result.result - regrtest.accumulate_result(result) - - # Display progress - ok = result.result - - text = format_test_result(result) - if (ok not in (CHILD_ERROR, INTERRUPTED) - and result.test_time >= PROGRESS_MIN_TIME - and not regrtest.ns.pgo): - text += ' (%s)' % format_duration(result.test_time) - elif ok == CHILD_ERROR: - text = '%s (%s)' % (text, mp_result.error_msg) - running = get_running(workers) - if running and not regrtest.ns.pgo: - text += ' -- running: %s' % ', '.join(running) - regrtest.display_progress(test_index, text) - - # Copy stdout and stderr from the child process - if mp_result.stdout: - print(mp_result.stdout, flush=True) - if mp_result.stderr and not regrtest.ns.pgo: - print(mp_result.stderr, file=sys.stderr, flush=True) - - if result.result == INTERRUPTED: - raise KeyboardInterrupt - test_index += 1 - except KeyboardInterrupt: - regrtest.interrupted = True - pending.interrupted = True - print() - finally: - if use_timeout: - faulthandler.cancel_dump_traceback_later() - - # If tests are interrupted, wait until tests complete - wait_start = time.monotonic() - while True: - running = [worker.current_test_name for worker in workers] - running = list(filter(bool, running)) - if not running: - break - - dt = time.monotonic() - wait_start - line = "Waiting for %s (%s tests)" % (', '.join(running), len(running)) - if dt >= WAIT_PROGRESS: - line = "%s since %.0f sec" % (line, dt) - print(line, flush=True) - for worker in workers: - worker.join(WAIT_PROGRESS) + pass + + # display progress + running = get_running(self.workers) + if running and not self.ns.pgo: + print('running: %s' % ', '.join(running), flush=True) + + def display_result(self, mp_result): + result = mp_result.result + + text = format_test_result(result) + if mp_result.error_msg is not None: + # CHILD_ERROR + text += ' (%s)' % mp_result.error_msg + elif (result.test_time >= PROGRESS_MIN_TIME and not self.ns.pgo): + text += ' (%s)' % format_duration(result.test_time) + running = get_running(self.workers) + if running and not self.ns.pgo: + text += ' -- running: %s' % ', '.join(running) + self.regrtest.display_progress(self.test_index, text) + + def _process_result(self, item): + if item[0]: + # Thread got an exception + format_exc = item[1] + print(f"regrtest worker thread failed: {format_exc}", + file=sys.stderr, flush=True) + return True + + self.test_index += 1 + mp_result = item[1] + self.regrtest.accumulate_result(mp_result.result) + self.display_result(mp_result) + + if mp_result.stdout: + print(mp_result.stdout, flush=True) + if mp_result.stderr and not self.ns.pgo: + print(mp_result.stderr, file=sys.stderr, flush=True) + + if mp_result.result.result == INTERRUPTED: + self.regrtest.interrupted = True + + if must_stop(mp_result.result): + return True + + return False + + def run_tests(self): + self.start_workers() + + self.test_index = 0 + try: + while True: + item = self._get_result() + if item is None: + break + + stop = self._process_result(item) + if stop: + break + except KeyboardInterrupt: + print() + self.regrtest.interrupted = True + finally: + if self.test_timeout is not None: + faulthandler.cancel_dump_traceback_later() + + self.wait_workers() + + +def run_tests_multiprocess(regrtest): + MultiprocessRunner(regrtest).run_tests() |