author    | Victor Stinner <vstinner@python.org> | 2022-06-15 09:42:10 (GMT)
committer | GitHub <noreply@github.com> | 2022-06-15 09:42:10 (GMT)
commit    | 7e9eaad864349d2cfd4c9ffc4453aba03b2cbc16 (patch)
tree      | 7bc4498a4c5ad027208a714ff9281d2f4639e301 /Lib/test
parent    | 4e9fa71d7e8e2f322f0b81b315ddc921f57384c0 (diff)
download  | cpython-7e9eaad864349d2cfd4c9ffc4453aba03b2cbc16.zip
          | cpython-7e9eaad864349d2cfd4c9ffc4453aba03b2cbc16.tar.gz
          | cpython-7e9eaad864349d2cfd4c9ffc4453aba03b2cbc16.tar.bz2
Add test.support.busy_retry() (#93770)
Add busy_retry() and sleeping_retry() functions to test.support.
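The two helpers replace hand-rolled deadline/sleep polling loops across the test suite. As a minimal sketch of the intended usage (the check() predicate and the messages below are illustrative, not part of the commit; the helper names and signatures come from the diff that follows):

    from test import support

    def wait_until(check):
        # Sleep between attempts (the delay doubles, capped at max_delay) and
        # raise AssertionError with the given message once SHORT_TIMEOUT expires.
        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, "condition not met"):
            if check():
                break

    def spin_until(check):
        # Same loop shape without sleeping; error=False suppresses the timeout
        # error so the for/else clause can raise a custom exception instead.
        for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
            if check():
                break
        else:
            raise RuntimeError('condition not met before the timeout')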
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 60
-rw-r--r-- | Lib/test/fork_wait.py | 6
-rw-r--r-- | Lib/test/support/__init__.py | 76
-rw-r--r-- | Lib/test/test__xxsubinterpreters.py | 11
-rw-r--r-- | Lib/test/test_concurrent_futures.py | 10
-rw-r--r-- | Lib/test/test_multiprocessing_main_handling.py | 25
-rw-r--r-- | Lib/test/test_signal.py | 25
-rw-r--r-- | Lib/test/test_ssl.py | 5
-rw-r--r-- | Lib/test/test_support.py | 12
-rw-r--r-- | Lib/test/test_wait3.py | 5
-rw-r--r-- | Lib/test/test_wait4.py | 5
11 files changed, 141 insertions, 99 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 4a588d9..dca5a19 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -4313,18 +4313,13 @@ class _TestSharedMemory(BaseTestCase):
         p.terminate()
         p.wait()
 
-        deadline = time.monotonic() + support.LONG_TIMEOUT
-        t = 0.1
-        while time.monotonic() < deadline:
-            time.sleep(t)
-            t = min(t*2, 5)
+        err_msg = ("A SharedMemory segment was leaked after "
+                   "a process was abruptly terminated")
+        for _ in support.sleeping_retry(support.LONG_TIMEOUT, err_msg):
             try:
                 smm = shared_memory.SharedMemory(name, create=False)
             except FileNotFoundError:
                 break
-        else:
-            raise AssertionError("A SharedMemory segment was leaked after"
-                                 " a process was abruptly terminated.")
 
         if os.name == 'posix':
             # Without this line it was raising warnings like:
@@ -5334,9 +5329,10 @@ class TestResourceTracker(unittest.TestCase):
             p.terminate()
             p.wait()
 
-            deadline = time.monotonic() + support.LONG_TIMEOUT
-            while time.monotonic() < deadline:
-                time.sleep(.5)
+            err_msg = (f"A {rtype} resource was leaked after a process was "
+                       f"abruptly terminated")
+            for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
+                                            err_msg):
                 try:
                     _resource_unlink(name2, rtype)
                 except OSError as e:
@@ -5344,10 +5340,7 @@ class TestResourceTracker(unittest.TestCase):
                     # EINVAL
                     self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
                     break
-            else:
-                raise AssertionError(
-                    f"A {rtype} resource was leaked after a process was "
-                    f"abruptly terminated.")
+
             err = p.stderr.read().decode('utf-8')
             p.stderr.close()
             expected = ('resource_tracker: There appear to be 2 leaked {} '
@@ -5575,18 +5568,17 @@ class TestSyncManagerTypes(unittest.TestCase):
         # but this can take a bit on slow machines, so wait a few seconds
         # if there are other children too (see #17395).
         join_process(self.proc)
+
         start_time = time.monotonic()
-        t = 0.01
-        while len(multiprocessing.active_children()) > 1:
-            time.sleep(t)
-            t *= 2
-            dt = time.monotonic() - start_time
-            if dt >= 5.0:
-                test.support.environment_altered = True
-                support.print_warning(f"multiprocessing.Manager still has "
-                                      f"{multiprocessing.active_children()} "
-                                      f"active children after {dt} seconds")
+        for _ in support.sleeping_retry(5.0, error=False):
+            if len(multiprocessing.active_children()) <= 1:
                 break
+        else:
+            dt = time.monotonic() - start_time
+            support.environment_altered = True
+            support.print_warning(f"multiprocessing.Manager still has "
+                                  f"{multiprocessing.active_children()} "
+                                  f"active children after {dt:.1f} seconds")
 
     def run_worker(self, worker, obj):
         self.proc = multiprocessing.Process(target=worker, args=(obj, ))
@@ -5884,17 +5876,15 @@ class ManagerMixin(BaseMixin):
         # but this can take a bit on slow machines, so wait a few seconds
         # if there are other children too (see #17395)
         start_time = time.monotonic()
-        t = 0.01
-        while len(multiprocessing.active_children()) > 1:
-            time.sleep(t)
-            t *= 2
-            dt = time.monotonic() - start_time
-            if dt >= 5.0:
-                test.support.environment_altered = True
-                support.print_warning(f"multiprocessing.Manager still has "
-                                      f"{multiprocessing.active_children()} "
-                                      f"active children after {dt} seconds")
+        for _ in support.sleeping_retry(5.0, error=False):
+            if len(multiprocessing.active_children()) <= 1:
                 break
+        else:
+            dt = time.monotonic() - start_time
+            support.environment_altered = True
+            support.print_warning(f"multiprocessing.Manager still has "
+                                  f"{multiprocessing.active_children()} "
+                                  f"active children after {dt:.1f} seconds")
 
         gc.collect()                       # do garbage collection
         if cls.manager._number_of_objects() != 0:
diff --git a/Lib/test/fork_wait.py b/Lib/test/fork_wait.py
index 4d3dbd8..c565f59 100644
--- a/Lib/test/fork_wait.py
+++ b/Lib/test/fork_wait.py
@@ -54,10 +54,8 @@ class ForkWait(unittest.TestCase):
         self.threads.append(thread)
 
         # busy-loop to wait for threads
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-        while len(self.alive) < NUM_THREADS:
-            time.sleep(0.1)
-            if deadline < time.monotonic():
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+            if len(self.alive) >= NUM_THREADS:
                 break
 
         a = sorted(self.alive.keys())
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 4baaeb7..a62e8b4 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -2250,3 +2250,79 @@ def late_deletion(obj):
         pass
     atfork_func.reference = ref_cycle
     os.register_at_fork(before=atfork_func)
+
+
+def busy_retry(timeout, err_msg=None, /, *, error=True):
+    """
+    Run the loop body until "break" stops the loop.
+
+    After *timeout* seconds, raise an AssertionError if *error* is true,
+    or just stop if *error is false.
+
+    Example:
+
+        for _ in support.busy_retry(support.SHORT_TIMEOUT):
+            if check():
+                break
+
+    Example of error=False usage:
+
+        for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
+            if check():
+                break
+        else:
+            raise RuntimeError('my custom error')
+
+    """
+    if timeout <= 0:
+        raise ValueError("timeout must be greater than zero")
+
+    start_time = time.monotonic()
+    deadline = start_time + timeout
+
+    while True:
+        yield
+
+        if time.monotonic() >= deadline:
+            break
+
+    if error:
+        dt = time.monotonic() - start_time
+        msg = f"timeout ({dt:.1f} seconds)"
+        if err_msg:
+            msg = f"{msg}: {err_msg}"
+        raise AssertionError(msg)
+
+
+def sleeping_retry(timeout, err_msg=None, /,
+                   *, init_delay=0.010, max_delay=1.0, error=True):
+    """
+    Wait strategy that applies exponential backoff.
+
+    Run the loop body until "break" stops the loop. Sleep at each loop
+    iteration, but not at the first iteration. The sleep delay is doubled at
+    each iteration (up to *max_delay* seconds).
+
+    See busy_retry() documentation for the parameters usage.
+
+    Example raising an exception after SHORT_TIMEOUT seconds:
+
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+            if check():
+                break
+
+    Example of error=False usage:
+
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+            if check():
+                break
+        else:
+            raise RuntimeError('my custom error')
+    """
+
+    delay = init_delay
+    for _ in busy_retry(timeout, err_msg, error=error):
+        yield
+
+        time.sleep(delay)
+        delay = min(delay * 2, max_delay)
diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py
index 5d0ed9e..f20aae8 100644
--- a/Lib/test/test__xxsubinterpreters.py
+++ b/Lib/test/test__xxsubinterpreters.py
@@ -45,12 +45,11 @@ def _wait_for_interp_to_run(interp, timeout=None):
     # run subinterpreter eariler than the main thread in multiprocess.
     if timeout is None:
         timeout = support.SHORT_TIMEOUT
-    start_time = time.monotonic()
-    deadline = start_time + timeout
-    while not interpreters.is_running(interp):
-        if time.monotonic() > deadline:
-            raise RuntimeError('interp is not running')
-        time.sleep(0.010)
+    for _ in support.sleeping_retry(timeout, error=False):
+        if interpreters.is_running(interp):
+            break
+    else:
+        raise RuntimeError('interp is not running')
 
 
 @contextlib.contextmanager
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 6f3b460..c121653 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -256,12 +256,12 @@ class FailingInitializerMixin(ExecutorMixin):
         else:
             with self.assertRaises(BrokenExecutor):
                 future.result()
+
         # At some point, the executor should break
-        t1 = time.monotonic()
-        while not self.executor._broken:
-            if time.monotonic() - t1 > 5:
-                self.fail("executor not broken after 5 s.")
-            time.sleep(0.01)
+        for _ in support.sleeping_retry(5, "executor not broken"):
+            if self.executor._broken:
+                break
+
         # ... and from this point submit() is guaranteed to fail
         with self.assertRaises(BrokenExecutor):
             self.executor.submit(get_init_status)
diff --git a/Lib/test/test_multiprocessing_main_handling.py b/Lib/test/test_multiprocessing_main_handling.py
index 510d8d3..35e9cd6 100644
--- a/Lib/test/test_multiprocessing_main_handling.py
+++ b/Lib/test/test_multiprocessing_main_handling.py
@@ -40,6 +40,7 @@ test_source = """\
 import sys
 import time
 from multiprocessing import Pool, set_start_method
+from test import support
 
 # We use this __main__ defined function in the map call below in order to
 # check that multiprocessing in correctly running the unguarded
@@ -59,13 +60,11 @@ if __name__ == '__main__':
     results = []
     with Pool(5) as pool:
         pool.map_async(f, [1, 2, 3], callback=results.extend)
-        start_time = time.monotonic()
-        while not results:
-            time.sleep(0.05)
-            # up to 1 min to report the results
-            dt = time.monotonic() - start_time
-            if dt > 60.0:
-                raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
+
+        # up to 1 min to report the results
+        for _ in support.sleeping_retry(60, "Timed out waiting for results"):
+            if results:
+                break
 
     results.sort()
     print(start_method, "->", results)
@@ -86,19 +85,17 @@ if __name__ != "__main__":
 import sys
 import time
 from multiprocessing import Pool, set_start_method
+from test import support
 
 start_method = sys.argv[1]
 set_start_method(start_method)
 results = []
 with Pool(5) as pool:
     pool.map_async(int, [1, 4, 9], callback=results.extend)
-    start_time = time.monotonic()
-    while not results:
-        time.sleep(0.05)
-        # up to 1 min to report the results
-        dt = time.monotonic() - start_time
-        if dt > 60.0:
-            raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
+    # up to 1 min to report the results
+    for _ in support.sleeping_retry(60, "Timed out waiting for results"):
+        if results:
+            break
 
 results.sort()
 print(start_method, "->", results)
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index 6aa529b..a1d074a 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -812,13 +812,14 @@ class ItimerTest(unittest.TestCase):
         signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
         signal.setitimer(self.itimer, 0.3, 0.2)
 
-        start_time = time.monotonic()
-        while time.monotonic() - start_time < 60.0:
+        for _ in support.busy_retry(60.0, error=False):
             # use up some virtual time by doing real work
             _ = pow(12345, 67890, 10000019)
             if signal.getitimer(self.itimer) == (0.0, 0.0):
-                break # sig_vtalrm handler stopped this itimer
-        else: # Issue 8424
+                # sig_vtalrm handler stopped this itimer
+                break
+        else:
+            # bpo-8424
             self.skipTest("timeout: likely cause: machine too slow or load too "
                           "high")
 
@@ -832,13 +833,14 @@ class ItimerTest(unittest.TestCase):
         signal.signal(signal.SIGPROF, self.sig_prof)
         signal.setitimer(self.itimer, 0.2, 0.2)
 
-        start_time = time.monotonic()
-        while time.monotonic() - start_time < 60.0:
+        for _ in support.busy_retry(60.0, error=False):
             # do some work
             _ = pow(12345, 67890, 10000019)
             if signal.getitimer(self.itimer) == (0.0, 0.0):
-                break # sig_prof handler stopped this itimer
-        else: # Issue 8424
+                # sig_prof handler stopped this itimer
+                break
+        else:
+            # bpo-8424
             self.skipTest("timeout: likely cause: machine too slow or load too "
                           "high")
 
@@ -1307,8 +1309,6 @@ class StressTest(unittest.TestCase):
         self.setsig(signal.SIGALRM, handler)  # for ITIMER_REAL
 
         expected_sigs = 0
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-
         while expected_sigs < N:
             # Hopefully the SIGALRM will be received somewhere during
             # initial processing of SIGUSR1.
@@ -1317,8 +1317,9 @@
 
             expected_sigs += 2
             # Wait for handlers to run to avoid signal coalescing
-            while len(sigs) < expected_sigs and time.monotonic() < deadline:
-                time.sleep(1e-5)
+            for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+                if len(sigs) >= expected_sigs:
+                    break
 
         # All ITIMER_REAL signals should have been delivered to the
         # Python handler
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 6a66c16..3acafbd 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -2262,11 +2262,8 @@ class SimpleBackgroundTests(unittest.TestCase):
         # A simple IO loop. Call func(*args) depending on the error we get
         # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
         timeout = kwargs.get('timeout', support.SHORT_TIMEOUT)
-        deadline = time.monotonic() + timeout
         count = 0
-        while True:
-            if time.monotonic() > deadline:
-                self.fail("timeout")
+        for _ in support.busy_retry(timeout):
             errno = None
             count += 1
             try:
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index 23bccee..7738ca5 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -9,7 +9,6 @@ import subprocess
 import sys
 import tempfile
 import textwrap
-import time
 import unittest
 import warnings
 
@@ -461,18 +460,12 @@ class TestSupport(unittest.TestCase):
             # child process: do nothing, just exit
            os._exit(0)
 
-        t0 = time.monotonic()
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-
         was_altered = support.environment_altered
         try:
             support.environment_altered = False
             stderr = io.StringIO()
 
-            while True:
-                if time.monotonic() > deadline:
-                    self.fail("timeout")
-
+            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                 with support.swap_attr(support.print_warning, 'orig_stderr', stderr):
                     support.reap_children()
 
@@ -481,9 +474,6 @@ class TestSupport(unittest.TestCase):
                 if support.environment_altered:
                     break
 
-                # loop until the child process completed
-                time.sleep(0.100)
-
             msg = "Warning -- reap_children() reaped child process %s" % pid
             self.assertIn(msg, stderr.getvalue())
             self.assertTrue(support.environment_altered)
diff --git a/Lib/test/test_wait3.py b/Lib/test/test_wait3.py
index 4ec7690..15d66ae 100644
--- a/Lib/test/test_wait3.py
+++ b/Lib/test/test_wait3.py
@@ -4,7 +4,6 @@
 import os
 import subprocess
 import sys
-import time
 import unittest
 from test.fork_wait import ForkWait
 from test import support
@@ -20,14 +19,12 @@ class Wait3Test(ForkWait):
         # This many iterations can be required, since some previously run
         # tests (e.g. test_ctypes) could have spawned a lot of children
         # very quickly.
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-        while time.monotonic() <= deadline:
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
             # wait3() shouldn't hang, but some of the buildbots seem to hang
             # in the forking tests. This is an attempt to fix the problem.
             spid, status, rusage = os.wait3(os.WNOHANG)
             if spid == cpid:
                 break
-            time.sleep(0.1)
 
         self.assertEqual(spid, cpid)
         self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
diff --git a/Lib/test/test_wait4.py b/Lib/test/test_wait4.py
index 24f1aae..f66c0db 100644
--- a/Lib/test/test_wait4.py
+++ b/Lib/test/test_wait4.py
@@ -2,7 +2,6 @@
 """
 
 import os
-import time
 import sys
 import unittest
 from test.fork_wait import ForkWait
@@ -22,14 +21,12 @@ class Wait4Test(ForkWait):
         # Issue #11185: wait4 is broken on AIX and will always return 0
         # with WNOHANG.
         option = 0
-        deadline = time.monotonic() + support.SHORT_TIMEOUT
-        while time.monotonic() <= deadline:
+        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
             # wait4() shouldn't hang, but some of the buildbots seem to hang
             # in the forking tests. This is an attempt to fix the problem.
             spid, status, rusage = os.wait4(cpid, option)
             if spid == cpid:
                 break
-            time.sleep(0.1)
 
         self.assertEqual(spid, cpid)
         self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
         self.assertTrue(rusage)
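With the default error=True, busy_retry() (and therefore sleeping_retry(), which wraps it) turns an expired timeout into an AssertionError whose message combines the elapsed time with the optional err_msg, as the implementation above shows. A small illustrative sketch (the 0.2-second timeout, the message, and the always-failing check are made up for the example):

    from test import support

    try:
        for _ in support.busy_retry(0.2, "resource never appeared"):
            pass  # stand-in for a readiness check that never succeeds
    except AssertionError as exc:
        print(exc)  # e.g. "timeout (0.2 seconds): resource never appeared"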