From 0c139b5f2fb9c8a9e6df58e5da9d4a992d17926d Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Mon, 11 Sep 2023 05:27:37 +0200 Subject: gh-109162: libregrtest: rename runtest_mp.py to run_workers.py (#109248) * Rename runtest_mp.py to run_workers.py * Move exit_timeout() and temp_cwd() context managers from Regrtest.main() to Regrtest.run_tests(). Actions like --list-tests or --list-cases don't need these protections. * Regrtest: remove selected and tests attributes. Pass 'selected' to list_tests(), list_cases() and run_tests(). display_result() now expects a TestTuple, instead of TestList. * Rename setup_tests() to setup_process() and rename setup_support() to setup_tests(). * Move _adjust_resource_limits() to utils and rename it to adjust_rlimit_nofile(). * Move replace_stdout() to utils. * Fix RunTests.verbose type: it's an int. --- Lib/test/libregrtest/main.py | 133 +++++----- Lib/test/libregrtest/result.py | 4 +- Lib/test/libregrtest/results.py | 4 +- Lib/test/libregrtest/run_workers.py | 495 +++++++++++++++++++++++++++++++++++ Lib/test/libregrtest/runtest_mp.py | 496 ------------------------------------ Lib/test/libregrtest/runtests.py | 4 +- Lib/test/libregrtest/setup.py | 124 +++------ Lib/test/libregrtest/single.py | 4 +- Lib/test/libregrtest/utils.py | 53 ++++ Lib/test/libregrtest/worker.py | 4 +- 10 files changed, 664 insertions(+), 657 deletions(-) create mode 100644 Lib/test/libregrtest/run_workers.py delete mode 100644 Lib/test/libregrtest/runtest_mp.py diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py index 0864cbb..31eab99 100644 --- a/Lib/test/libregrtest/main.py +++ b/Lib/test/libregrtest/main.py @@ -15,12 +15,12 @@ from test.libregrtest.findtests import findtests, split_test_packages from test.libregrtest.logger import Logger from test.libregrtest.result import State from test.libregrtest.runtests import RunTests, HuntRefleak -from test.libregrtest.setup import setup_tests, setup_test_dir +from test.libregrtest.setup import setup_process, setup_test_dir from test.libregrtest.single import run_single_test, PROGRESS_MIN_TIME from test.libregrtest.pgo import setup_pgo_tests from test.libregrtest.results import TestResults from test.libregrtest.utils import ( - StrPath, StrJSON, TestName, TestList, FilterTuple, + StrPath, StrJSON, TestName, TestList, TestTuple, FilterTuple, strip_py_suffix, count, format_duration, printlist, get_build_info, get_temp_dir, get_work_dir, exit_timeout, abs_module_name) @@ -51,7 +51,7 @@ class Regrtest: """ def __init__(self, ns: Namespace): # Log verbosity - self.verbose: bool = ns.verbose + self.verbose: int = int(ns.verbose) self.quiet: bool = ns.quiet self.pgo: bool = ns.pgo self.pgo_extended: bool = ns.pgo_extended @@ -122,8 +122,6 @@ class Regrtest: self.tmp_dir: StrPath | None = ns.tempdir # tests - self.tests = [] - self.selected: TestList = [] self.first_runtests: RunTests | None = None # used by --slowest @@ -140,18 +138,18 @@ class Regrtest: def log(self, line=''): self.logger.log(line) - def find_tests(self): + def find_tests(self, tests: TestList | None = None) -> tuple[TestTuple, TestList | None]: if self.single_test_run: self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest') try: with open(self.next_single_filename, 'r') as fp: next_test = fp.read().strip() - self.tests = [next_test] + tests = [next_test] except OSError: pass if self.fromfile: - self.tests = [] + tests = [] # regex to match 'test_builtin' in line: # '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec' regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b') @@ -161,9 +159,9 @@ class Regrtest: line = line.strip() match = regex.search(line) if match is not None: - self.tests.append(match.group()) + tests.append(match.group()) - strip_py_suffix(self.tests) + strip_py_suffix(tests) if self.pgo: # add default PGO tests if no tests are specified @@ -179,18 +177,18 @@ class Regrtest: exclude=exclude_tests) if not self.fromfile: - self.selected = self.tests or self.cmdline_args - if self.selected: - self.selected = split_test_packages(self.selected) + selected = tests or self.cmdline_args + if selected: + selected = split_test_packages(selected) else: - self.selected = alltests + selected = alltests else: - self.selected = self.tests + selected = tests if self.single_test_run: - self.selected = self.selected[:1] + selected = selected[:1] try: - pos = alltests.index(self.selected[0]) + pos = alltests.index(selected[0]) self.next_single_test = alltests[pos + 1] except IndexError: pass @@ -198,7 +196,7 @@ class Regrtest: # Remove all the selected tests that precede start if it's set. if self.starting_test: try: - del self.selected[:self.selected.index(self.starting_test)] + del selected[:selected.index(self.starting_test)] except ValueError: print(f"Cannot find starting test: {self.starting_test}") sys.exit(1) @@ -207,10 +205,12 @@ class Regrtest: if self.random_seed is None: self.random_seed = random.randrange(100_000_000) random.seed(self.random_seed) - random.shuffle(self.selected) + random.shuffle(selected) + + return (tuple(selected), tests) @staticmethod - def list_tests(tests: TestList): + def list_tests(tests: TestTuple): for name in tests: print(name) @@ -224,12 +224,12 @@ class Regrtest: if support.match_test(test): print(test.id()) - def list_cases(self): + def list_cases(self, tests: TestTuple): support.verbose = False support.set_match_tests(self.match_tests, self.ignore_tests) skipped = [] - for test_name in self.selected: + for test_name in tests: module_name = abs_module_name(test_name, self.test_dir) try: suite = unittest.defaultTestLoader.loadTestsFromName(module_name) @@ -247,6 +247,10 @@ class Regrtest: def _rerun_failed_tests(self, runtests: RunTests): # Configure the runner to re-run tests if self.num_workers == 0: + # Always run tests in fresh processes to have more deterministic + # initial state. Don't re-run tests in parallel but limit to a + # single worker process to have side effects (on the system load + # and timings) between tests. self.num_workers = 1 tests, match_tests_dict = self.results.prepare_rerun() @@ -294,7 +298,8 @@ class Regrtest: print() print(f"== Tests result: {state} ==") - self.results.display_result(self.selected, self.quiet, self.print_slowest) + self.results.display_result(runtests.tests, + self.quiet, self.print_slowest) def run_test(self, test_name: TestName, runtests: RunTests, tracer): if tracer is not None: @@ -404,7 +409,7 @@ class Regrtest: return state def _run_tests_mp(self, runtests: RunTests, num_workers: int) -> None: - from test.libregrtest.runtest_mp import RunWorkers + from test.libregrtest.run_workers import RunWorkers RunWorkers(num_workers, runtests, self.logger, self.results).run() def finalize_tests(self, tracer): @@ -454,39 +459,9 @@ class Regrtest: print("Remove file: %s" % name) os_helper.unlink(name) - def main(self, tests: TestList | None = None): - if self.junit_filename and not os.path.isabs(self.junit_filename): - self.junit_filename = os.path.abspath(self.junit_filename) - - self.tests = tests - - strip_py_suffix(self.cmdline_args) - - self.tmp_dir = get_temp_dir(self.tmp_dir) - - if self.want_cleanup: - self.cleanup_temp_dir(self.tmp_dir) - sys.exit(0) - - os.makedirs(self.tmp_dir, exist_ok=True) - work_dir = get_work_dir(parent_dir=self.tmp_dir) - - with exit_timeout(): - # Run the tests in a context manager that temporarily changes the - # CWD to a temporary and writable directory. If it's not possible - # to create or change the CWD, the original CWD will be used. - # The original CWD is available from os_helper.SAVEDCWD. - with os_helper.temp_cwd(work_dir, quiet=True): - # When using multiprocessing, worker processes will use - # work_dir as their parent temporary directory. So when the - # main process exit, it removes also subdirectories of worker - # processes. - - self._main() - - def create_run_tests(self): + def create_run_tests(self, tests: TestTuple): return RunTests( - tuple(self.selected), + tests, fail_fast=self.fail_fast, match_tests=self.match_tests, ignore_tests=self.ignore_tests, @@ -506,7 +481,7 @@ class Regrtest: python_cmd=self.python_cmd, ) - def run_tests(self) -> int: + def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int: if self.hunt_refleak and self.hunt_refleak.warmups < 3: msg = ("WARNING: Running tests with --huntrleaks/-R and " "less than 3 warmup repetitions can give false positives!") @@ -520,17 +495,17 @@ class Regrtest: # For a partial run, we do not need to clutter the output. if (self.want_header or not(self.pgo or self.quiet or self.single_test_run - or self.tests or self.cmdline_args)): + or tests or self.cmdline_args)): self.display_header() if self.randomize: print("Using random seed", self.random_seed) - runtests = self.create_run_tests() + runtests = self.create_run_tests(selected) self.first_runtests = runtests self.logger.set_tests(runtests) - setup_tests(runtests) + setup_process() self.logger.start_load_tracker() try: @@ -553,20 +528,48 @@ class Regrtest: return self.results.get_exitcode(self.fail_env_changed, self.fail_rerun) - def _main(self): + def run_tests(self, selected: TestTuple, tests: TestList | None) -> int: + os.makedirs(self.tmp_dir, exist_ok=True) + work_dir = get_work_dir(parent_dir=self.tmp_dir) + + # Put a timeout on Python exit + with exit_timeout(): + # Run the tests in a context manager that temporarily changes the + # CWD to a temporary and writable directory. If it's not possible + # to create or change the CWD, the original CWD will be used. + # The original CWD is available from os_helper.SAVEDCWD. + with os_helper.temp_cwd(work_dir, quiet=True): + # When using multiprocessing, worker processes will use + # work_dir as their parent temporary directory. So when the + # main process exit, it removes also subdirectories of worker + # processes. + return self._run_tests(selected, tests) + + def main(self, tests: TestList | None = None): + if self.junit_filename and not os.path.isabs(self.junit_filename): + self.junit_filename = os.path.abspath(self.junit_filename) + + strip_py_suffix(self.cmdline_args) + + self.tmp_dir = get_temp_dir(self.tmp_dir) + + if self.want_cleanup: + self.cleanup_temp_dir(self.tmp_dir) + sys.exit(0) + if self.want_wait: input("Press any key to continue...") setup_test_dir(self.test_dir) - self.find_tests() + selected, tests = self.find_tests(tests) exitcode = 0 if self.want_list_tests: - self.list_tests(self.selected) + self.list_tests(selected) elif self.want_list_cases: - self.list_cases() + self.list_cases(selected) else: - exitcode = self.run_tests() + exitcode = self.run_tests(selected, tests) sys.exit(exitcode) diff --git a/Lib/test/libregrtest/result.py b/Lib/test/libregrtest/result.py index 4a68872..b73494d 100644 --- a/Lib/test/libregrtest/result.py +++ b/Lib/test/libregrtest/result.py @@ -5,7 +5,7 @@ from typing import Any from test.support import TestStats from test.libregrtest.utils import ( - TestName, FilterTuple, + StrJSON, TestName, FilterTuple, format_duration, normalize_test_name, print_warning) @@ -160,7 +160,7 @@ class TestResult: json.dump(self, file, cls=_EncodeTestResult) @staticmethod - def from_json(worker_json) -> 'TestResult': + def from_json(worker_json: StrJSON) -> 'TestResult': return json.loads(worker_json, object_hook=_decode_test_result) diff --git a/Lib/test/libregrtest/results.py b/Lib/test/libregrtest/results.py index b7a044e..6a07c2f 100644 --- a/Lib/test/libregrtest/results.py +++ b/Lib/test/libregrtest/results.py @@ -106,7 +106,7 @@ class TestResults: xml_data = result.xml_data if xml_data: - self.add_junit(result.xml_data) + self.add_junit(xml_data) def need_rerun(self): return bool(self.bad_results) @@ -163,7 +163,7 @@ class TestResults: for s in ET.tostringlist(root): f.write(s) - def display_result(self, tests: TestList, quiet: bool, print_slowest: bool): + def display_result(self, tests: TestTuple, quiet: bool, print_slowest: bool): if self.interrupted: print("Test suite interrupted by signal SIGINT.") diff --git a/Lib/test/libregrtest/run_workers.py b/Lib/test/libregrtest/run_workers.py new file mode 100644 index 0000000..6267fe5 --- /dev/null +++ b/Lib/test/libregrtest/run_workers.py @@ -0,0 +1,495 @@ +import dataclasses +import faulthandler +import os.path +import queue +import signal +import subprocess +import sys +import tempfile +import threading +import time +import traceback +from typing import Literal, TextIO + +from test import support +from test.support import os_helper + +from test.libregrtest.logger import Logger +from test.libregrtest.result import TestResult, State +from test.libregrtest.results import TestResults +from test.libregrtest.runtests import RunTests +from test.libregrtest.single import PROGRESS_MIN_TIME +from test.libregrtest.utils import ( + StrPath, TestName, + format_duration, print_warning) +from test.libregrtest.worker import create_worker_process, USE_PROCESS_GROUP + +if sys.platform == 'win32': + import locale + + +# Display the running tests if nothing happened last N seconds +PROGRESS_UPDATE = 30.0 # seconds +assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME + +# Kill the main process after 5 minutes. It is supposed to write an update +# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest +# buildbot workers. +MAIN_PROCESS_TIMEOUT = 5 * 60.0 +assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE + +# Time to wait until a worker completes: should be immediate +JOIN_TIMEOUT = 30.0 # seconds + + +# We do not use a generator so multiple threads can call next(). +class MultiprocessIterator: + + """A thread-safe iterator over tests for multiprocess mode.""" + + def __init__(self, tests_iter): + self.lock = threading.Lock() + self.tests_iter = tests_iter + + def __iter__(self): + return self + + def __next__(self): + with self.lock: + if self.tests_iter is None: + raise StopIteration + return next(self.tests_iter) + + def stop(self): + with self.lock: + self.tests_iter = None + + +@dataclasses.dataclass(slots=True, frozen=True) +class MultiprocessResult: + result: TestResult + # bpo-45410: stderr is written into stdout to keep messages order + worker_stdout: str | None = None + err_msg: str | None = None + + +ExcStr = str +QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr] + + +class ExitThread(Exception): + pass + + +class WorkerThread(threading.Thread): + def __init__(self, worker_id: int, runner: "RunWorkers") -> None: + super().__init__() + self.worker_id = worker_id + self.runtests = runner.runtests + self.pending = runner.pending + self.output = runner.output + self.timeout = runner.worker_timeout + self.log = runner.log + self.current_test_name = None + self.start_time = None + self._popen = None + self._killed = False + self._stopped = False + + def __repr__(self) -> str: + info = [f'WorkerThread #{self.worker_id}'] + if self.is_alive(): + info.append("running") + else: + info.append('stopped') + test = self.current_test_name + if test: + info.append(f'test={test}') + popen = self._popen + if popen is not None: + dt = time.monotonic() - self.start_time + info.extend((f'pid={self._popen.pid}', + f'time={format_duration(dt)}')) + return '<%s>' % ' '.join(info) + + def _kill(self) -> None: + popen = self._popen + if popen is None: + return + + if self._killed: + return + self._killed = True + + if USE_PROCESS_GROUP: + what = f"{self} process group" + else: + what = f"{self}" + + print(f"Kill {what}", file=sys.stderr, flush=True) + try: + if USE_PROCESS_GROUP: + os.killpg(popen.pid, signal.SIGKILL) + else: + popen.kill() + except ProcessLookupError: + # popen.kill(): the process completed, the WorkerThread thread + # read its exit status, but Popen.send_signal() read the returncode + # just before Popen.wait() set returncode. + pass + except OSError as exc: + print_warning(f"Failed to kill {what}: {exc!r}") + + def stop(self) -> None: + # Method called from a different thread to stop this thread + self._stopped = True + self._kill() + + def mp_result_error( + self, + test_result: TestResult, + stdout: str | None = None, + err_msg=None + ) -> MultiprocessResult: + return MultiprocessResult(test_result, stdout, err_msg) + + def _run_process(self, runtests: RunTests, output_file: TextIO, + tmp_dir: StrPath | None = None) -> int: + try: + popen = create_worker_process(runtests, output_file, tmp_dir) + + self._killed = False + self._popen = popen + except: + self.current_test_name = None + raise + + try: + if self._stopped: + # If kill() has been called before self._popen is set, + # self._popen is still running. Call again kill() + # to ensure that the process is killed. + self._kill() + raise ExitThread + + try: + # gh-94026: stdout+stderr are written to tempfile + retcode = popen.wait(timeout=self.timeout) + assert retcode is not None + return retcode + except subprocess.TimeoutExpired: + if self._stopped: + # kill() has been called: communicate() fails on reading + # closed stdout + raise ExitThread + + # On timeout, kill the process + self._kill() + + # None means TIMEOUT for the caller + retcode = None + # bpo-38207: Don't attempt to call communicate() again: on it + # can hang until all child processes using stdout + # pipes completes. + except OSError: + if self._stopped: + # kill() has been called: communicate() fails + # on reading closed stdout + raise ExitThread + raise + except: + self._kill() + raise + finally: + self._wait_completed() + self._popen = None + self.current_test_name = None + + def _runtest(self, test_name: TestName) -> MultiprocessResult: + self.current_test_name = test_name + + if sys.platform == 'win32': + # gh-95027: When stdout is not a TTY, Python uses the ANSI code + # page for the sys.stdout encoding. If the main process runs in a + # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding. + encoding = locale.getencoding() + else: + encoding = sys.stdout.encoding + + tests = (test_name,) + if self.runtests.rerun: + match_tests = self.runtests.get_match_tests(test_name) + else: + match_tests = None + kwargs = {} + if match_tests: + kwargs['match_tests'] = match_tests + worker_runtests = self.runtests.copy(tests=tests, **kwargs) + + # gh-94026: Write stdout+stderr to a tempfile as workaround for + # non-blocking pipes on Emscripten with NodeJS. + with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file: + # gh-93353: Check for leaked temporary files in the parent process, + # since the deletion of temporary files can happen late during + # Python finalization: too late for libregrtest. + if not support.is_wasi: + # Don't check for leaked temporary files and directories if Python is + # run on WASI. WASI don't pass environment variables like TMPDIR to + # worker processes. + tmp_dir = tempfile.mkdtemp(prefix="test_python_") + tmp_dir = os.path.abspath(tmp_dir) + try: + retcode = self._run_process(worker_runtests, stdout_file, tmp_dir) + finally: + tmp_files = os.listdir(tmp_dir) + os_helper.rmtree(tmp_dir) + else: + retcode = self._run_process(worker_runtests, stdout_file) + tmp_files = () + stdout_file.seek(0) + + try: + stdout = stdout_file.read().strip() + except Exception as exc: + # gh-101634: Catch UnicodeDecodeError if stdout cannot be + # decoded from encoding + err_msg = f"Cannot read process stdout: {exc}" + result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR) + return self.mp_result_error(result, err_msg=err_msg) + + if retcode is None: + result = TestResult(test_name, state=State.TIMEOUT) + return self.mp_result_error(result, stdout) + + err_msg = None + if retcode != 0: + err_msg = "Exit code %s" % retcode + else: + stdout, _, worker_json = stdout.rpartition("\n") + stdout = stdout.rstrip() + if not worker_json: + err_msg = "Failed to parse worker stdout" + else: + try: + result = TestResult.from_json(worker_json) + except Exception as exc: + err_msg = "Failed to parse worker JSON: %s" % exc + + if err_msg: + result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR) + return self.mp_result_error(result, stdout, err_msg) + + if tmp_files: + msg = (f'\n\n' + f'Warning -- {test_name} leaked temporary files ' + f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}') + stdout += msg + result.set_env_changed() + + return MultiprocessResult(result, stdout) + + def run(self) -> None: + fail_fast = self.runtests.fail_fast + fail_env_changed = self.runtests.fail_env_changed + while not self._stopped: + try: + try: + test_name = next(self.pending) + except StopIteration: + break + + self.start_time = time.monotonic() + mp_result = self._runtest(test_name) + mp_result.result.duration = time.monotonic() - self.start_time + self.output.put((False, mp_result)) + + if mp_result.result.must_stop(fail_fast, fail_env_changed): + break + except ExitThread: + break + except BaseException: + self.output.put((True, traceback.format_exc())) + break + + def _wait_completed(self) -> None: + popen = self._popen + + try: + popen.wait(JOIN_TIMEOUT) + except (subprocess.TimeoutExpired, OSError) as exc: + print_warning(f"Failed to wait for {self} completion " + f"(timeout={format_duration(JOIN_TIMEOUT)}): " + f"{exc!r}") + + def wait_stopped(self, start_time: float) -> None: + # bpo-38207: RunWorkers.stop_workers() called self.stop() + # which killed the process. Sometimes, killing the process from the + # main thread does not interrupt popen.communicate() in + # WorkerThread thread. This loop with a timeout is a workaround + # for that. + # + # Moreover, if this method fails to join the thread, it is likely + # that Python will hang at exit while calling threading._shutdown() + # which tries again to join the blocked thread. Regrtest.main() + # uses EXIT_TIMEOUT to workaround this second bug. + while True: + # Write a message every second + self.join(1.0) + if not self.is_alive(): + break + dt = time.monotonic() - start_time + self.log(f"Waiting for {self} thread for {format_duration(dt)}") + if dt > JOIN_TIMEOUT: + print_warning(f"Failed to join {self} in {format_duration(dt)}") + break + + +def get_running(workers: list[WorkerThread]) -> list[str]: + running = [] + for worker in workers: + current_test_name = worker.current_test_name + if not current_test_name: + continue + dt = time.monotonic() - worker.start_time + if dt >= PROGRESS_MIN_TIME: + text = '%s (%s)' % (current_test_name, format_duration(dt)) + running.append(text) + if not running: + return None + return f"running ({len(running)}): {', '.join(running)}" + + +class RunWorkers: + def __init__(self, num_workers: int, runtests: RunTests, + logger: Logger, results: TestResult) -> None: + self.num_workers = num_workers + self.runtests = runtests + self.log = logger.log + self.display_progress = logger.display_progress + self.results: TestResults = results + + self.output: queue.Queue[QueueOutput] = queue.Queue() + tests_iter = runtests.iter_tests() + self.pending = MultiprocessIterator(tests_iter) + self.timeout = runtests.timeout + if self.timeout is not None: + # Rely on faulthandler to kill a worker process. This timouet is + # when faulthandler fails to kill a worker process. Give a maximum + # of 5 minutes to faulthandler to kill the worker. + self.worker_timeout = min(self.timeout * 1.5, self.timeout + 5 * 60) + else: + self.worker_timeout = None + self.workers = None + + def start_workers(self) -> None: + self.workers = [WorkerThread(index, self) + for index in range(1, self.num_workers + 1)] + msg = f"Run tests in parallel using {len(self.workers)} child processes" + if self.timeout: + msg += (" (timeout: %s, worker timeout: %s)" + % (format_duration(self.timeout), + format_duration(self.worker_timeout))) + self.log(msg) + for worker in self.workers: + worker.start() + + def stop_workers(self) -> None: + start_time = time.monotonic() + for worker in self.workers: + worker.stop() + for worker in self.workers: + worker.wait_stopped(start_time) + + def _get_result(self) -> QueueOutput | None: + pgo = self.runtests.pgo + use_faulthandler = (self.timeout is not None) + + # bpo-46205: check the status of workers every iteration to avoid + # waiting forever on an empty queue. + while any(worker.is_alive() for worker in self.workers): + if use_faulthandler: + faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT, + exit=True) + + # wait for a thread + try: + return self.output.get(timeout=PROGRESS_UPDATE) + except queue.Empty: + pass + + if not pgo: + # display progress + running = get_running(self.workers) + if running: + self.log(running) + + # all worker threads are done: consume pending results + try: + return self.output.get(timeout=0) + except queue.Empty: + return None + + def display_result(self, mp_result: MultiprocessResult) -> None: + result = mp_result.result + pgo = self.runtests.pgo + + text = str(result) + if mp_result.err_msg: + # MULTIPROCESSING_ERROR + text += ' (%s)' % mp_result.err_msg + elif (result.duration >= PROGRESS_MIN_TIME and not pgo): + text += ' (%s)' % format_duration(result.duration) + if not pgo: + running = get_running(self.workers) + if running: + text += f' -- {running}' + self.display_progress(self.test_index, text) + + def _process_result(self, item: QueueOutput) -> bool: + """Returns True if test runner must stop.""" + if item[0]: + # Thread got an exception + format_exc = item[1] + print_warning(f"regrtest worker thread failed: {format_exc}") + result = TestResult("", state=State.MULTIPROCESSING_ERROR) + self.results.accumulate_result(result, self.runtests) + return result + + self.test_index += 1 + mp_result = item[1] + result = mp_result.result + self.results.accumulate_result(result, self.runtests) + self.display_result(mp_result) + + if mp_result.worker_stdout: + print(mp_result.worker_stdout, flush=True) + + return result + + def run(self) -> None: + fail_fast = self.runtests.fail_fast + fail_env_changed = self.runtests.fail_env_changed + + self.start_workers() + + self.test_index = 0 + try: + while True: + item = self._get_result() + if item is None: + break + + result = self._process_result(item) + if result.must_stop(fail_fast, fail_env_changed): + break + except KeyboardInterrupt: + print() + self.results.interrupted = True + finally: + if self.timeout is not None: + faulthandler.cancel_dump_traceback_later() + + # Always ensure that all worker processes are no longer + # worker when we exit this function + self.pending.stop() + self.stop_workers() diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py deleted file mode 100644 index 96b2ac5..0000000 --- a/Lib/test/libregrtest/runtest_mp.py +++ /dev/null @@ -1,496 +0,0 @@ -import dataclasses -import faulthandler -import os.path -import queue -import signal -import subprocess -import sys -import tempfile -import threading -import time -import traceback -from typing import Literal, TextIO - -from test import support -from test.support import os_helper - -from test.libregrtest.logger import Logger -from test.libregrtest.main import Regrtest -from test.libregrtest.result import TestResult, State -from test.libregrtest.results import TestResults -from test.libregrtest.runtests import RunTests -from test.libregrtest.single import PROGRESS_MIN_TIME -from test.libregrtest.utils import ( - StrPath, TestName, - format_duration, print_warning) -from test.libregrtest.worker import create_worker_process, USE_PROCESS_GROUP - -if sys.platform == 'win32': - import locale - - -# Display the running tests if nothing happened last N seconds -PROGRESS_UPDATE = 30.0 # seconds -assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME - -# Kill the main process after 5 minutes. It is supposed to write an update -# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest -# buildbot workers. -MAIN_PROCESS_TIMEOUT = 5 * 60.0 -assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE - -# Time to wait until a worker completes: should be immediate -JOIN_TIMEOUT = 30.0 # seconds - - -# We do not use a generator so multiple threads can call next(). -class MultiprocessIterator: - - """A thread-safe iterator over tests for multiprocess mode.""" - - def __init__(self, tests_iter): - self.lock = threading.Lock() - self.tests_iter = tests_iter - - def __iter__(self): - return self - - def __next__(self): - with self.lock: - if self.tests_iter is None: - raise StopIteration - return next(self.tests_iter) - - def stop(self): - with self.lock: - self.tests_iter = None - - -@dataclasses.dataclass(slots=True, frozen=True) -class MultiprocessResult: - result: TestResult - # bpo-45410: stderr is written into stdout to keep messages order - worker_stdout: str | None = None - err_msg: str | None = None - - -ExcStr = str -QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr] - - -class ExitThread(Exception): - pass - - -class WorkerThread(threading.Thread): - def __init__(self, worker_id: int, runner: "RunWorkers") -> None: - super().__init__() - self.worker_id = worker_id - self.runtests = runner.runtests - self.pending = runner.pending - self.output = runner.output - self.timeout = runner.worker_timeout - self.log = runner.log - self.current_test_name = None - self.start_time = None - self._popen = None - self._killed = False - self._stopped = False - - def __repr__(self) -> str: - info = [f'WorkerThread #{self.worker_id}'] - if self.is_alive(): - info.append("running") - else: - info.append('stopped') - test = self.current_test_name - if test: - info.append(f'test={test}') - popen = self._popen - if popen is not None: - dt = time.monotonic() - self.start_time - info.extend((f'pid={self._popen.pid}', - f'time={format_duration(dt)}')) - return '<%s>' % ' '.join(info) - - def _kill(self) -> None: - popen = self._popen - if popen is None: - return - - if self._killed: - return - self._killed = True - - if USE_PROCESS_GROUP: - what = f"{self} process group" - else: - what = f"{self}" - - print(f"Kill {what}", file=sys.stderr, flush=True) - try: - if USE_PROCESS_GROUP: - os.killpg(popen.pid, signal.SIGKILL) - else: - popen.kill() - except ProcessLookupError: - # popen.kill(): the process completed, the WorkerThread thread - # read its exit status, but Popen.send_signal() read the returncode - # just before Popen.wait() set returncode. - pass - except OSError as exc: - print_warning(f"Failed to kill {what}: {exc!r}") - - def stop(self) -> None: - # Method called from a different thread to stop this thread - self._stopped = True - self._kill() - - def mp_result_error( - self, - test_result: TestResult, - stdout: str | None = None, - err_msg=None - ) -> MultiprocessResult: - return MultiprocessResult(test_result, stdout, err_msg) - - def _run_process(self, worker_job, output_file: TextIO, - tmp_dir: StrPath | None = None) -> int: - try: - popen = create_worker_process(worker_job, output_file, tmp_dir) - - self._killed = False - self._popen = popen - except: - self.current_test_name = None - raise - - try: - if self._stopped: - # If kill() has been called before self._popen is set, - # self._popen is still running. Call again kill() - # to ensure that the process is killed. - self._kill() - raise ExitThread - - try: - # gh-94026: stdout+stderr are written to tempfile - retcode = popen.wait(timeout=self.timeout) - assert retcode is not None - return retcode - except subprocess.TimeoutExpired: - if self._stopped: - # kill() has been called: communicate() fails on reading - # closed stdout - raise ExitThread - - # On timeout, kill the process - self._kill() - - # None means TIMEOUT for the caller - retcode = None - # bpo-38207: Don't attempt to call communicate() again: on it - # can hang until all child processes using stdout - # pipes completes. - except OSError: - if self._stopped: - # kill() has been called: communicate() fails - # on reading closed stdout - raise ExitThread - raise - except: - self._kill() - raise - finally: - self._wait_completed() - self._popen = None - self.current_test_name = None - - def _runtest(self, test_name: TestName) -> MultiprocessResult: - self.current_test_name = test_name - - if sys.platform == 'win32': - # gh-95027: When stdout is not a TTY, Python uses the ANSI code - # page for the sys.stdout encoding. If the main process runs in a - # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding. - encoding = locale.getencoding() - else: - encoding = sys.stdout.encoding - - tests = (test_name,) - if self.runtests.rerun: - match_tests = self.runtests.get_match_tests(test_name) - else: - match_tests = None - kwargs = {} - if match_tests: - kwargs['match_tests'] = match_tests - worker_runtests = self.runtests.copy(tests=tests, **kwargs) - - # gh-94026: Write stdout+stderr to a tempfile as workaround for - # non-blocking pipes on Emscripten with NodeJS. - with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file: - # gh-93353: Check for leaked temporary files in the parent process, - # since the deletion of temporary files can happen late during - # Python finalization: too late for libregrtest. - if not support.is_wasi: - # Don't check for leaked temporary files and directories if Python is - # run on WASI. WASI don't pass environment variables like TMPDIR to - # worker processes. - tmp_dir = tempfile.mkdtemp(prefix="test_python_") - tmp_dir = os.path.abspath(tmp_dir) - try: - retcode = self._run_process(worker_runtests, stdout_file, tmp_dir) - finally: - tmp_files = os.listdir(tmp_dir) - os_helper.rmtree(tmp_dir) - else: - retcode = self._run_process(worker_runtests, stdout_file) - tmp_files = () - stdout_file.seek(0) - - try: - stdout = stdout_file.read().strip() - except Exception as exc: - # gh-101634: Catch UnicodeDecodeError if stdout cannot be - # decoded from encoding - err_msg = f"Cannot read process stdout: {exc}" - result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR) - return self.mp_result_error(result, err_msg=err_msg) - - if retcode is None: - result = TestResult(test_name, state=State.TIMEOUT) - return self.mp_result_error(result, stdout) - - err_msg = None - if retcode != 0: - err_msg = "Exit code %s" % retcode - else: - stdout, _, worker_json = stdout.rpartition("\n") - stdout = stdout.rstrip() - if not worker_json: - err_msg = "Failed to parse worker stdout" - else: - try: - result = TestResult.from_json(worker_json) - except Exception as exc: - err_msg = "Failed to parse worker JSON: %s" % exc - - if err_msg: - result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR) - return self.mp_result_error(result, stdout, err_msg) - - if tmp_files: - msg = (f'\n\n' - f'Warning -- {test_name} leaked temporary files ' - f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}') - stdout += msg - result.set_env_changed() - - return MultiprocessResult(result, stdout) - - def run(self) -> None: - fail_fast = self.runtests.fail_fast - fail_env_changed = self.runtests.fail_env_changed - while not self._stopped: - try: - try: - test_name = next(self.pending) - except StopIteration: - break - - self.start_time = time.monotonic() - mp_result = self._runtest(test_name) - mp_result.result.duration = time.monotonic() - self.start_time - self.output.put((False, mp_result)) - - if mp_result.result.must_stop(fail_fast, fail_env_changed): - break - except ExitThread: - break - except BaseException: - self.output.put((True, traceback.format_exc())) - break - - def _wait_completed(self) -> None: - popen = self._popen - - try: - popen.wait(JOIN_TIMEOUT) - except (subprocess.TimeoutExpired, OSError) as exc: - print_warning(f"Failed to wait for {self} completion " - f"(timeout={format_duration(JOIN_TIMEOUT)}): " - f"{exc!r}") - - def wait_stopped(self, start_time: float) -> None: - # bpo-38207: RunWorkers.stop_workers() called self.stop() - # which killed the process. Sometimes, killing the process from the - # main thread does not interrupt popen.communicate() in - # WorkerThread thread. This loop with a timeout is a workaround - # for that. - # - # Moreover, if this method fails to join the thread, it is likely - # that Python will hang at exit while calling threading._shutdown() - # which tries again to join the blocked thread. Regrtest.main() - # uses EXIT_TIMEOUT to workaround this second bug. - while True: - # Write a message every second - self.join(1.0) - if not self.is_alive(): - break - dt = time.monotonic() - start_time - self.log(f"Waiting for {self} thread for {format_duration(dt)}") - if dt > JOIN_TIMEOUT: - print_warning(f"Failed to join {self} in {format_duration(dt)}") - break - - -def get_running(workers: list[WorkerThread]) -> list[str]: - running = [] - for worker in workers: - current_test_name = worker.current_test_name - if not current_test_name: - continue - dt = time.monotonic() - worker.start_time - if dt >= PROGRESS_MIN_TIME: - text = '%s (%s)' % (current_test_name, format_duration(dt)) - running.append(text) - if not running: - return None - return f"running ({len(running)}): {', '.join(running)}" - - -class RunWorkers: - def __init__(self, num_workers: int, runtests: RunTests, - logger: Logger, results: TestResult) -> None: - self.num_workers = num_workers - self.runtests = runtests - self.log = logger.log - self.display_progress = logger.display_progress - self.results: TestResults = results - - self.output: queue.Queue[QueueOutput] = queue.Queue() - tests_iter = runtests.iter_tests() - self.pending = MultiprocessIterator(tests_iter) - self.timeout = runtests.timeout - if self.timeout is not None: - # Rely on faulthandler to kill a worker process. This timouet is - # when faulthandler fails to kill a worker process. Give a maximum - # of 5 minutes to faulthandler to kill the worker. - self.worker_timeout = min(self.timeout * 1.5, self.timeout + 5 * 60) - else: - self.worker_timeout = None - self.workers = None - - def start_workers(self) -> None: - self.workers = [WorkerThread(index, self) - for index in range(1, self.num_workers + 1)] - msg = f"Run tests in parallel using {len(self.workers)} child processes" - if self.timeout: - msg += (" (timeout: %s, worker timeout: %s)" - % (format_duration(self.timeout), - format_duration(self.worker_timeout))) - self.log(msg) - for worker in self.workers: - worker.start() - - def stop_workers(self) -> None: - start_time = time.monotonic() - for worker in self.workers: - worker.stop() - for worker in self.workers: - worker.wait_stopped(start_time) - - def _get_result(self) -> QueueOutput | None: - pgo = self.runtests.pgo - use_faulthandler = (self.timeout is not None) - - # bpo-46205: check the status of workers every iteration to avoid - # waiting forever on an empty queue. - while any(worker.is_alive() for worker in self.workers): - if use_faulthandler: - faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT, - exit=True) - - # wait for a thread - try: - return self.output.get(timeout=PROGRESS_UPDATE) - except queue.Empty: - pass - - if not pgo: - # display progress - running = get_running(self.workers) - if running: - self.log(running) - - # all worker threads are done: consume pending results - try: - return self.output.get(timeout=0) - except queue.Empty: - return None - - def display_result(self, mp_result: MultiprocessResult) -> None: - result = mp_result.result - pgo = self.runtests.pgo - - text = str(result) - if mp_result.err_msg: - # MULTIPROCESSING_ERROR - text += ' (%s)' % mp_result.err_msg - elif (result.duration >= PROGRESS_MIN_TIME and not pgo): - text += ' (%s)' % format_duration(result.duration) - if not pgo: - running = get_running(self.workers) - if running: - text += f' -- {running}' - self.display_progress(self.test_index, text) - - def _process_result(self, item: QueueOutput) -> bool: - """Returns True if test runner must stop.""" - if item[0]: - # Thread got an exception - format_exc = item[1] - print_warning(f"regrtest worker thread failed: {format_exc}") - result = TestResult("", state=State.MULTIPROCESSING_ERROR) - self.results.accumulate_result(result, self.runtests) - return result - - self.test_index += 1 - mp_result = item[1] - result = mp_result.result - self.results.accumulate_result(result, self.runtests) - self.display_result(mp_result) - - if mp_result.worker_stdout: - print(mp_result.worker_stdout, flush=True) - - return result - - def run(self) -> None: - fail_fast = self.runtests.fail_fast - fail_env_changed = self.runtests.fail_env_changed - - self.start_workers() - - self.test_index = 0 - try: - while True: - item = self._get_result() - if item is None: - break - - result = self._process_result(item) - if result.must_stop(fail_fast, fail_env_changed): - break - except KeyboardInterrupt: - print() - self.results.interrupted = True - finally: - if self.timeout is not None: - faulthandler.cancel_dump_traceback_later() - - # Always ensure that all worker processes are no longer - # worker when we exit this function - self.pending.stop() - self.stop_workers() diff --git a/Lib/test/libregrtest/runtests.py b/Lib/test/libregrtest/runtests.py index 366c6f1..e16e79e 100644 --- a/Lib/test/libregrtest/runtests.py +++ b/Lib/test/libregrtest/runtests.py @@ -27,14 +27,14 @@ class RunTests: pgo_extended: bool = False output_on_failure: bool = False timeout: float | None = None - verbose: bool = False + verbose: int = 0 quiet: bool = False hunt_refleak: HuntRefleak | None = None test_dir: StrPath | None = None use_junit: bool = False memory_limit: str | None = None gc_threshold: int | None = None - use_resources: list[str] = None + use_resources: list[str] = dataclasses.field(default_factory=list) python_cmd: list[str] | None = None def copy(self, **override): diff --git a/Lib/test/libregrtest/setup.py b/Lib/test/libregrtest/setup.py index 20ef3dc..c3d8127 100644 --- a/Lib/test/libregrtest/setup.py +++ b/Lib/test/libregrtest/setup.py @@ -1,4 +1,3 @@ -import atexit import faulthandler import os import signal @@ -13,7 +12,8 @@ except ImportError: from test.libregrtest.runtests import RunTests from test.libregrtest.utils import ( - setup_unraisable_hook, setup_threading_excepthook, fix_umask) + setup_unraisable_hook, setup_threading_excepthook, fix_umask, + replace_stdout, adjust_rlimit_nofile) UNICODE_GUARD_ENV = "PYTHONREGRTEST_UNICODE_GUARD" @@ -26,19 +26,7 @@ def setup_test_dir(testdir: str | None) -> None: sys.path.insert(0, os.path.abspath(testdir)) -def setup_support(runtests: RunTests): - support.PGO = runtests.pgo - support.PGO_EXTENDED = runtests.pgo_extended - support.set_match_tests(runtests.match_tests, runtests.ignore_tests) - support.failfast = runtests.fail_fast - support.verbose = runtests.verbose - if runtests.use_junit: - support.junit_xml_list = [] - else: - support.junit_xml_list = None - - -def setup_tests(runtests): +def setup_process(): fix_umask() try: @@ -62,7 +50,7 @@ def setup_tests(runtests): for signum in signals: faulthandler.register(signum, chain=True, file=stderr_fd) - _adjust_resource_limits() + adjust_rlimit_nofile() replace_stdout() support.record_original_stdout(sys.stdout) @@ -83,19 +71,6 @@ def setup_tests(runtests): if getattr(module, '__file__', None): module.__file__ = os.path.abspath(module.__file__) - if runtests.hunt_refleak: - unittest.BaseTestSuite._cleanup = False - - if runtests.memory_limit is not None: - support.set_memlimit(runtests.memory_limit) - - if runtests.gc_threshold is not None: - gc.set_threshold(runtests.gc_threshold) - - support.suppress_msvcrt_asserts(runtests.verbose and runtests.verbose >= 2) - - support.use_resources = runtests.use_resources - if hasattr(sys, 'addaudithook'): # Add an auditing hook for all tests to ensure PySys_Audit is tested def _test_audit_hook(name, args): @@ -105,6 +80,36 @@ def setup_tests(runtests): setup_unraisable_hook() setup_threading_excepthook() + # Ensure there's a non-ASCII character in env vars at all times to force + # tests consider this case. See BPO-44647 for details. + if TESTFN_UNDECODABLE and os.supports_bytes_environ: + os.environb.setdefault(UNICODE_GUARD_ENV.encode(), TESTFN_UNDECODABLE) + elif FS_NONASCII: + os.environ.setdefault(UNICODE_GUARD_ENV, FS_NONASCII) + + +def setup_tests(runtests: RunTests): + support.verbose = runtests.verbose + support.failfast = runtests.fail_fast + support.PGO = runtests.pgo + support.PGO_EXTENDED = runtests.pgo_extended + + support.set_match_tests(runtests.match_tests, runtests.ignore_tests) + + if runtests.use_junit: + support.junit_xml_list = [] + from test.support.testresult import RegressionTestResult + RegressionTestResult.USE_XML = True + else: + support.junit_xml_list = None + + if runtests.memory_limit is not None: + support.set_memlimit(runtests.memory_limit) + + support.suppress_msvcrt_asserts(runtests.verbose >= 2) + + support.use_resources = runtests.use_resources + timeout = runtests.timeout if timeout is not None: # For a slow buildbot worker, increase SHORT_TIMEOUT and LONG_TIMEOUT @@ -117,61 +122,8 @@ def setup_tests(runtests): support.SHORT_TIMEOUT = min(support.SHORT_TIMEOUT, timeout) support.LONG_TIMEOUT = min(support.LONG_TIMEOUT, timeout) - if runtests.use_junit: - from test.support.testresult import RegressionTestResult - RegressionTestResult.USE_XML = True - - # Ensure there's a non-ASCII character in env vars at all times to force - # tests consider this case. See BPO-44647 for details. - if TESTFN_UNDECODABLE and os.supports_bytes_environ: - os.environb.setdefault(UNICODE_GUARD_ENV.encode(), TESTFN_UNDECODABLE) - elif FS_NONASCII: - os.environ.setdefault(UNICODE_GUARD_ENV, FS_NONASCII) - - -def replace_stdout(): - """Set stdout encoder error handler to backslashreplace (as stderr error - handler) to avoid UnicodeEncodeError when printing a traceback""" - stdout = sys.stdout - try: - fd = stdout.fileno() - except ValueError: - # On IDLE, sys.stdout has no file descriptor and is not a TextIOWrapper - # object. Leaving sys.stdout unchanged. - # - # Catch ValueError to catch io.UnsupportedOperation on TextIOBase - # and ValueError on a closed stream. - return - - sys.stdout = open(fd, 'w', - encoding=stdout.encoding, - errors="backslashreplace", - closefd=False, - newline='\n') - - def restore_stdout(): - sys.stdout.close() - sys.stdout = stdout - atexit.register(restore_stdout) - + if runtests.hunt_refleak: + unittest.BaseTestSuite._cleanup = False -def _adjust_resource_limits(): - """Adjust the system resource limits (ulimit) if needed.""" - try: - import resource - from resource import RLIMIT_NOFILE - except ImportError: - return - fd_limit, max_fds = resource.getrlimit(RLIMIT_NOFILE) - # On macOS the default fd limit is sometimes too low (256) for our - # test suite to succeed. Raise it to something more reasonable. - # 1024 is a common Linux default. - desired_fds = 1024 - if fd_limit < desired_fds and fd_limit < max_fds: - new_fd_limit = min(desired_fds, max_fds) - try: - resource.setrlimit(RLIMIT_NOFILE, (new_fd_limit, max_fds)) - print(f"Raised RLIMIT_NOFILE: {fd_limit} -> {new_fd_limit}") - except (ValueError, OSError) as err: - print(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to " - f"{new_fd_limit}: {err}.") + if runtests.gc_threshold is not None: + gc.set_threshold(runtests.gc_threshold) diff --git a/Lib/test/libregrtest/single.py b/Lib/test/libregrtest/single.py index bb33387..0cb3192 100644 --- a/Lib/test/libregrtest/single.py +++ b/Lib/test/libregrtest/single.py @@ -15,7 +15,7 @@ from test.support import threading_helper from test.libregrtest.result import State, TestResult from test.libregrtest.runtests import RunTests from test.libregrtest.save_env import saved_test_environment -from test.libregrtest.setup import setup_support +from test.libregrtest.setup import setup_tests from test.libregrtest.utils import ( TestName, clear_caches, remove_testfn, abs_module_name, print_warning) @@ -201,7 +201,7 @@ def _runtest(result: TestResult, runtests: RunTests) -> None: faulthandler.dump_traceback_later(timeout, exit=True) try: - setup_support(runtests) + setup_tests(runtests) if output_on_failure: support.verbose = True diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py index 011d287..f97e3fd 100644 --- a/Lib/test/libregrtest/utils.py +++ b/Lib/test/libregrtest/utils.py @@ -1,3 +1,4 @@ +import atexit import contextlib import faulthandler import math @@ -471,3 +472,55 @@ def normalize_test_name(test_full_name, *, is_error=False): rpar = test_full_name.index(')') return test_full_name[lpar + 1: rpar].split('.')[-1] return short_name + + +def replace_stdout(): + """Set stdout encoder error handler to backslashreplace (as stderr error + handler) to avoid UnicodeEncodeError when printing a traceback""" + stdout = sys.stdout + try: + fd = stdout.fileno() + except ValueError: + # On IDLE, sys.stdout has no file descriptor and is not a TextIOWrapper + # object. Leaving sys.stdout unchanged. + # + # Catch ValueError to catch io.UnsupportedOperation on TextIOBase + # and ValueError on a closed stream. + return + + sys.stdout = open(fd, 'w', + encoding=stdout.encoding, + errors="backslashreplace", + closefd=False, + newline='\n') + + def restore_stdout(): + sys.stdout.close() + sys.stdout = stdout + atexit.register(restore_stdout) + + +def adjust_rlimit_nofile(): + """ + On macOS the default fd limit (RLIMIT_NOFILE) is sometimes too low (256) + for our test suite to succeed. Raise it to something more reasonable. 1024 + is a common Linux default. + """ + try: + import resource + except ImportError: + return + + fd_limit, max_fds = resource.getrlimit(resource.RLIMIT_NOFILE) + + desired_fds = 1024 + + if fd_limit < desired_fds and fd_limit < max_fds: + new_fd_limit = min(desired_fds, max_fds) + try: + resource.setrlimit(resource.RLIMIT_NOFILE, + (new_fd_limit, max_fds)) + print(f"Raised RLIMIT_NOFILE: {fd_limit} -> {new_fd_limit}") + except (ValueError, OSError) as err: + print_warning(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to " + f"{new_fd_limit}: {err}.") diff --git a/Lib/test/libregrtest/worker.py b/Lib/test/libregrtest/worker.py index 24251c3..b9fb031 100644 --- a/Lib/test/libregrtest/worker.py +++ b/Lib/test/libregrtest/worker.py @@ -6,7 +6,7 @@ from typing import TextIO, NoReturn from test import support from test.support import os_helper -from test.libregrtest.setup import setup_tests, setup_test_dir +from test.libregrtest.setup import setup_process, setup_test_dir from test.libregrtest.runtests import RunTests from test.libregrtest.single import run_single_test from test.libregrtest.utils import ( @@ -60,7 +60,7 @@ def worker_process(worker_json: StrJSON) -> NoReturn: match_tests: FilterTuple | None = runtests.match_tests setup_test_dir(runtests.test_dir) - setup_tests(runtests) + setup_process() if runtests.rerun: if match_tests: -- cgit v0.12