diff options
Diffstat (limited to 'Lib/test/libregrtest/runtest_mp.py')
-rw-r--r-- | Lib/test/libregrtest/runtest_mp.py | 464 |
1 files changed, 0 insertions, 464 deletions
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py deleted file mode 100644 index fc12ea7..0000000 --- a/Lib/test/libregrtest/runtest_mp.py +++ /dev/null @@ -1,464 +0,0 @@ -import collections -import faulthandler -import json -import os -import queue -import signal -import subprocess -import sys -import threading -import time -import traceback -import types -from test import support - -from test.libregrtest.runtest import ( - runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME, - format_test_result, TestResult, is_failed, TIMEOUT) -from test.libregrtest.setup import setup_tests -from test.libregrtest.utils import format_duration, print_warning - - -# Display the running tests if nothing happened last N seconds -PROGRESS_UPDATE = 30.0 # seconds -assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME - -# Kill the main process after 5 minutes. It is supposed to write an update -# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest -# buildbot workers. -MAIN_PROCESS_TIMEOUT = 5 * 60.0 -assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE - -# Time to wait until a worker completes: should be immediate -JOIN_TIMEOUT = 30.0 # seconds - -USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg")) - - -def must_stop(result, ns): - if result.result == INTERRUPTED: - return True - if ns.failfast and is_failed(result, ns): - return True - return False - - -def parse_worker_args(worker_args): - ns_dict, test_name = json.loads(worker_args) - ns = types.SimpleNamespace(**ns_dict) - return (ns, test_name) - - -def run_test_in_subprocess(testname, ns): - ns_dict = vars(ns) - worker_args = (ns_dict, testname) - worker_args = json.dumps(worker_args) - - cmd = [sys.executable, *support.args_from_interpreter_flags(), - '-u', # Unbuffered stdout and stderr - '-m', 'test.regrtest', - '--worker-args', worker_args] - - # Running the child from the same working directory as regrtest's original - # invocation ensures that TEMPDIR for the child is the same when - # sysconfig.is_python_build() is true. See issue 15300. - kw = {} - if USE_PROCESS_GROUP: - kw['start_new_session'] = True - return subprocess.Popen(cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - close_fds=(os.name != 'nt'), - cwd=support.SAVEDCWD, - **kw) - - -def run_tests_worker(ns, test_name): - setup_tests(ns) - - result = runtest(ns, test_name) - - print() # Force a newline (just in case) - - # Serialize TestResult as list in JSON - print(json.dumps(list(result)), flush=True) - sys.exit(0) - - -# We do not use a generator so multiple threads can call next(). -class MultiprocessIterator: - - """A thread-safe iterator over tests for multiprocess mode.""" - - def __init__(self, tests_iter): - self.lock = threading.Lock() - self.tests_iter = tests_iter - - def __iter__(self): - return self - - def __next__(self): - with self.lock: - if self.tests_iter is None: - raise StopIteration - return next(self.tests_iter) - - def stop(self): - with self.lock: - self.tests_iter = None - - -MultiprocessResult = collections.namedtuple('MultiprocessResult', - 'result stdout stderr error_msg') - -class ExitThread(Exception): - pass - - -class TestWorkerProcess(threading.Thread): - def __init__(self, worker_id, runner): - super().__init__() - self.worker_id = worker_id - self.pending = runner.pending - self.output = runner.output - self.ns = runner.ns - self.timeout = runner.worker_timeout - self.regrtest = runner.regrtest - self.current_test_name = None - self.start_time = None - self._popen = None - self._killed = False - self._stopped = False - - def __repr__(self): - info = [f'TestWorkerProcess #{self.worker_id}'] - if self.is_alive(): - info.append("running") - else: - info.append('stopped') - test = self.current_test_name - if test: - info.append(f'test={test}') - popen = self._popen - if popen is not None: - dt = time.monotonic() - self.start_time - info.extend((f'pid={self._popen.pid}', - f'time={format_duration(dt)}')) - return '<%s>' % ' '.join(info) - - def _kill(self): - popen = self._popen - if popen is None: - return - - if self._killed: - return - self._killed = True - - if USE_PROCESS_GROUP: - what = f"{self} process group" - else: - what = f"{self}" - - print(f"Kill {what}", file=sys.stderr, flush=True) - try: - if USE_PROCESS_GROUP: - os.killpg(popen.pid, signal.SIGKILL) - else: - popen.kill() - except ProcessLookupError: - # popen.kill(): the process completed, the TestWorkerProcess thread - # read its exit status, but Popen.send_signal() read the returncode - # just before Popen.wait() set returncode. - pass - except OSError as exc: - print_warning(f"Failed to kill {what}: {exc!r}") - - def stop(self): - # Method called from a different thread to stop this thread - self._stopped = True - self._kill() - - def mp_result_error(self, test_name, error_type, stdout='', stderr='', - err_msg=None): - test_time = time.monotonic() - self.start_time - result = TestResult(test_name, error_type, test_time, None) - return MultiprocessResult(result, stdout, stderr, err_msg) - - def _run_process(self, test_name): - self.start_time = time.monotonic() - - self.current_test_name = test_name - try: - popen = run_test_in_subprocess(test_name, self.ns) - - self._killed = False - self._popen = popen - except: - self.current_test_name = None - raise - - try: - if self._stopped: - # If kill() has been called before self._popen is set, - # self._popen is still running. Call again kill() - # to ensure that the process is killed. - self._kill() - raise ExitThread - - try: - stdout, stderr = popen.communicate(timeout=self.timeout) - retcode = popen.returncode - assert retcode is not None - except subprocess.TimeoutExpired: - if self._stopped: - # kill() has been called: communicate() fails - # on reading closed stdout/stderr - raise ExitThread - - # On timeout, kill the process - self._kill() - - # None means TIMEOUT for the caller - retcode = None - # bpo-38207: Don't attempt to call communicate() again: on it - # can hang until all child processes using stdout and stderr - # pipes completes. - stdout = stderr = '' - except OSError: - if self._stopped: - # kill() has been called: communicate() fails - # on reading closed stdout/stderr - raise ExitThread - raise - else: - stdout = stdout.strip() - stderr = stderr.rstrip() - - return (retcode, stdout, stderr) - except: - self._kill() - raise - finally: - self._wait_completed() - self._popen = None - self.current_test_name = None - - def _runtest(self, test_name): - retcode, stdout, stderr = self._run_process(test_name) - - if retcode is None: - return self.mp_result_error(test_name, TIMEOUT, stdout, stderr) - - err_msg = None - if retcode != 0: - err_msg = "Exit code %s" % retcode - else: - stdout, _, result = stdout.rpartition("\n") - stdout = stdout.rstrip() - if not result: - err_msg = "Failed to parse worker stdout" - else: - try: - # deserialize run_tests_worker() output - result = json.loads(result) - result = TestResult(*result) - except Exception as exc: - err_msg = "Failed to parse worker JSON: %s" % exc - - if err_msg is not None: - return self.mp_result_error(test_name, CHILD_ERROR, - stdout, stderr, err_msg) - - return MultiprocessResult(result, stdout, stderr, err_msg) - - def run(self): - while not self._stopped: - try: - try: - test_name = next(self.pending) - except StopIteration: - break - - mp_result = self._runtest(test_name) - self.output.put((False, mp_result)) - - if must_stop(mp_result.result, self.ns): - break - except ExitThread: - break - except BaseException: - self.output.put((True, traceback.format_exc())) - break - - def _wait_completed(self): - popen = self._popen - - # stdout and stderr must be closed to ensure that communicate() - # does not hang - popen.stdout.close() - popen.stderr.close() - - try: - popen.wait(JOIN_TIMEOUT) - except (subprocess.TimeoutExpired, OSError) as exc: - print_warning(f"Failed to wait for {self} completion " - f"(timeout={format_duration(JOIN_TIMEOUT)}): " - f"{exc!r}") - - def wait_stopped(self, start_time): - # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop() - # which killed the process. Sometimes, killing the process from the - # main thread does not interrupt popen.communicate() in - # TestWorkerProcess thread. This loop with a timeout is a workaround - # for that. - # - # Moreover, if this method fails to join the thread, it is likely - # that Python will hang at exit while calling threading._shutdown() - # which tries again to join the blocked thread. Regrtest.main() - # uses EXIT_TIMEOUT to workaround this second bug. - while True: - # Write a message every second - self.join(1.0) - if not self.is_alive(): - break - dt = time.monotonic() - start_time - self.regrtest.log(f"Waiting for {self} thread " - f"for {format_duration(dt)}") - if dt > JOIN_TIMEOUT: - print_warning(f"Failed to join {self} in {format_duration(dt)}") - break - - -def get_running(workers): - running = [] - for worker in workers: - current_test_name = worker.current_test_name - if not current_test_name: - continue - dt = time.monotonic() - worker.start_time - if dt >= PROGRESS_MIN_TIME: - text = '%s (%s)' % (current_test_name, format_duration(dt)) - running.append(text) - return running - - -class MultiprocessTestRunner: - def __init__(self, regrtest): - self.regrtest = regrtest - self.log = self.regrtest.log - self.ns = regrtest.ns - self.output = queue.Queue() - self.pending = MultiprocessIterator(self.regrtest.tests) - if self.ns.timeout is not None: - self.worker_timeout = self.ns.timeout * 1.5 - else: - self.worker_timeout = None - self.workers = None - - def start_workers(self): - self.workers = [TestWorkerProcess(index, self) - for index in range(1, self.ns.use_mp + 1)] - self.log("Run tests in parallel using %s child processes" - % len(self.workers)) - for worker in self.workers: - worker.start() - - def stop_workers(self): - start_time = time.monotonic() - for worker in self.workers: - worker.stop() - for worker in self.workers: - worker.wait_stopped(start_time) - - def _get_result(self): - if not any(worker.is_alive() for worker in self.workers): - # all worker threads are done: consume pending results - try: - return self.output.get(timeout=0) - except queue.Empty: - return None - - use_faulthandler = (self.ns.timeout is not None) - timeout = PROGRESS_UPDATE - while True: - if use_faulthandler: - faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT, - exit=True) - - # wait for a thread - try: - return self.output.get(timeout=timeout) - except queue.Empty: - pass - - # display progress - running = get_running(self.workers) - if running and not self.ns.pgo: - self.log('running: %s' % ', '.join(running)) - - def display_result(self, mp_result): - result = mp_result.result - - text = format_test_result(result) - if mp_result.error_msg is not None: - # CHILD_ERROR - text += ' (%s)' % mp_result.error_msg - elif (result.test_time >= PROGRESS_MIN_TIME and not self.ns.pgo): - text += ' (%s)' % format_duration(result.test_time) - running = get_running(self.workers) - if running and not self.ns.pgo: - text += ' -- running: %s' % ', '.join(running) - self.regrtest.display_progress(self.test_index, text) - - def _process_result(self, item): - if item[0]: - # Thread got an exception - format_exc = item[1] - print_warning(f"regrtest worker thread failed: {format_exc}") - return True - - self.test_index += 1 - mp_result = item[1] - self.regrtest.accumulate_result(mp_result.result) - self.display_result(mp_result) - - if mp_result.stdout: - print(mp_result.stdout, flush=True) - if mp_result.stderr and not self.ns.pgo: - print(mp_result.stderr, file=sys.stderr, flush=True) - - if must_stop(mp_result.result, self.ns): - return True - - return False - - def run_tests(self): - self.start_workers() - - self.test_index = 0 - try: - while True: - item = self._get_result() - if item is None: - break - - stop = self._process_result(item) - if stop: - break - except KeyboardInterrupt: - print() - self.regrtest.interrupted = True - finally: - if self.ns.timeout is not None: - faulthandler.cancel_dump_traceback_later() - - # Always ensure that all worker processes are no longer - # worker when we exit this function - self.pending.stop() - self.stop_workers() - - -def run_tests_multiprocess(regrtest): - MultiprocessTestRunner(regrtest).run_tests() |