summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2022-06-15 09:42:10 (GMT)
committerGitHub <noreply@github.com>2022-06-15 09:42:10 (GMT)
commit7e9eaad864349d2cfd4c9ffc4453aba03b2cbc16 (patch)
tree7bc4498a4c5ad027208a714ff9281d2f4639e301 /Lib
parent4e9fa71d7e8e2f322f0b81b315ddc921f57384c0 (diff)
downloadcpython-7e9eaad864349d2cfd4c9ffc4453aba03b2cbc16.zip
cpython-7e9eaad864349d2cfd4c9ffc4453aba03b2cbc16.tar.gz
cpython-7e9eaad864349d2cfd4c9ffc4453aba03b2cbc16.tar.bz2
Add test.support.busy_retry() (#93770)
Add busy_retry() and sleeping_retry() functions to test.support.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/test/_test_multiprocessing.py60
-rw-r--r--Lib/test/fork_wait.py6
-rw-r--r--Lib/test/support/__init__.py76
-rw-r--r--Lib/test/test__xxsubinterpreters.py11
-rw-r--r--Lib/test/test_concurrent_futures.py10
-rw-r--r--Lib/test/test_multiprocessing_main_handling.py25
-rw-r--r--Lib/test/test_signal.py25
-rw-r--r--Lib/test/test_ssl.py5
-rw-r--r--Lib/test/test_support.py12
-rw-r--r--Lib/test/test_wait3.py5
-rw-r--r--Lib/test/test_wait4.py5
11 files changed, 141 insertions, 99 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 4a588d9..dca5a19 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -4313,18 +4313,13 @@ class _TestSharedMemory(BaseTestCase):
p.terminate()
p.wait()
- deadline = time.monotonic() + support.LONG_TIMEOUT
- t = 0.1
- while time.monotonic() < deadline:
- time.sleep(t)
- t = min(t*2, 5)
+ err_msg = ("A SharedMemory segment was leaked after "
+ "a process was abruptly terminated")
+ for _ in support.sleeping_retry(support.LONG_TIMEOUT, err_msg):
try:
smm = shared_memory.SharedMemory(name, create=False)
except FileNotFoundError:
break
- else:
- raise AssertionError("A SharedMemory segment was leaked after"
- " a process was abruptly terminated.")
if os.name == 'posix':
# Without this line it was raising warnings like:
@@ -5334,9 +5329,10 @@ class TestResourceTracker(unittest.TestCase):
p.terminate()
p.wait()
- deadline = time.monotonic() + support.LONG_TIMEOUT
- while time.monotonic() < deadline:
- time.sleep(.5)
+ err_msg = (f"A {rtype} resource was leaked after a process was "
+ f"abruptly terminated")
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
+ err_msg):
try:
_resource_unlink(name2, rtype)
except OSError as e:
@@ -5344,10 +5340,7 @@ class TestResourceTracker(unittest.TestCase):
# EINVAL
self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
break
- else:
- raise AssertionError(
- f"A {rtype} resource was leaked after a process was "
- f"abruptly terminated.")
+
err = p.stderr.read().decode('utf-8')
p.stderr.close()
expected = ('resource_tracker: There appear to be 2 leaked {} '
@@ -5575,18 +5568,17 @@ class TestSyncManagerTypes(unittest.TestCase):
# but this can take a bit on slow machines, so wait a few seconds
# if there are other children too (see #17395).
join_process(self.proc)
+
start_time = time.monotonic()
- t = 0.01
- while len(multiprocessing.active_children()) > 1:
- time.sleep(t)
- t *= 2
- dt = time.monotonic() - start_time
- if dt >= 5.0:
- test.support.environment_altered = True
- support.print_warning(f"multiprocessing.Manager still has "
- f"{multiprocessing.active_children()} "
- f"active children after {dt} seconds")
+ for _ in support.sleeping_retry(5.0, error=False):
+ if len(multiprocessing.active_children()) <= 1:
break
+ else:
+ dt = time.monotonic() - start_time
+ support.environment_altered = True
+ support.print_warning(f"multiprocessing.Manager still has "
+ f"{multiprocessing.active_children()} "
+ f"active children after {dt:.1f} seconds")
def run_worker(self, worker, obj):
self.proc = multiprocessing.Process(target=worker, args=(obj, ))
@@ -5884,17 +5876,15 @@ class ManagerMixin(BaseMixin):
# but this can take a bit on slow machines, so wait a few seconds
# if there are other children too (see #17395)
start_time = time.monotonic()
- t = 0.01
- while len(multiprocessing.active_children()) > 1:
- time.sleep(t)
- t *= 2
- dt = time.monotonic() - start_time
- if dt >= 5.0:
- test.support.environment_altered = True
- support.print_warning(f"multiprocessing.Manager still has "
- f"{multiprocessing.active_children()} "
- f"active children after {dt} seconds")
+ for _ in support.sleeping_retry(5.0, error=False):
+ if len(multiprocessing.active_children()) <= 1:
break
+ else:
+ dt = time.monotonic() - start_time
+ support.environment_altered = True
+ support.print_warning(f"multiprocessing.Manager still has "
+ f"{multiprocessing.active_children()} "
+ f"active children after {dt:.1f} seconds")
gc.collect() # do garbage collection
if cls.manager._number_of_objects() != 0:
diff --git a/Lib/test/fork_wait.py b/Lib/test/fork_wait.py
index 4d3dbd8..c565f59 100644
--- a/Lib/test/fork_wait.py
+++ b/Lib/test/fork_wait.py
@@ -54,10 +54,8 @@ class ForkWait(unittest.TestCase):
self.threads.append(thread)
# busy-loop to wait for threads
- deadline = time.monotonic() + support.SHORT_TIMEOUT
- while len(self.alive) < NUM_THREADS:
- time.sleep(0.1)
- if deadline < time.monotonic():
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+ if len(self.alive) >= NUM_THREADS:
break
a = sorted(self.alive.keys())
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 4baaeb7..a62e8b4 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -2250,3 +2250,79 @@ def late_deletion(obj):
pass
atfork_func.reference = ref_cycle
os.register_at_fork(before=atfork_func)
+
+
+def busy_retry(timeout, err_msg=None, /, *, error=True):
+ """
+ Run the loop body until "break" stops the loop.
+
+ After *timeout* seconds, raise an AssertionError if *error* is true,
+ or just stop if *error is false.
+
+ Example:
+
+ for _ in support.busy_retry(support.SHORT_TIMEOUT):
+ if check():
+ break
+
+ Example of error=False usage:
+
+ for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
+ if check():
+ break
+ else:
+ raise RuntimeError('my custom error')
+
+ """
+ if timeout <= 0:
+ raise ValueError("timeout must be greater than zero")
+
+ start_time = time.monotonic()
+ deadline = start_time + timeout
+
+ while True:
+ yield
+
+ if time.monotonic() >= deadline:
+ break
+
+ if error:
+ dt = time.monotonic() - start_time
+ msg = f"timeout ({dt:.1f} seconds)"
+ if err_msg:
+ msg = f"{msg}: {err_msg}"
+ raise AssertionError(msg)
+
+
+def sleeping_retry(timeout, err_msg=None, /,
+ *, init_delay=0.010, max_delay=1.0, error=True):
+ """
+ Wait strategy that applies exponential backoff.
+
+ Run the loop body until "break" stops the loop. Sleep at each loop
+ iteration, but not at the first iteration. The sleep delay is doubled at
+ each iteration (up to *max_delay* seconds).
+
+ See busy_retry() documentation for the parameters usage.
+
+ Example raising an exception after SHORT_TIMEOUT seconds:
+
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+ if check():
+ break
+
+ Example of error=False usage:
+
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+ if check():
+ break
+ else:
+ raise RuntimeError('my custom error')
+ """
+
+ delay = init_delay
+ for _ in busy_retry(timeout, err_msg, error=error):
+ yield
+
+ time.sleep(delay)
+ delay = min(delay * 2, max_delay)
diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py
index 5d0ed9e..f20aae8 100644
--- a/Lib/test/test__xxsubinterpreters.py
+++ b/Lib/test/test__xxsubinterpreters.py
@@ -45,12 +45,11 @@ def _wait_for_interp_to_run(interp, timeout=None):
# run subinterpreter eariler than the main thread in multiprocess.
if timeout is None:
timeout = support.SHORT_TIMEOUT
- start_time = time.monotonic()
- deadline = start_time + timeout
- while not interpreters.is_running(interp):
- if time.monotonic() > deadline:
- raise RuntimeError('interp is not running')
- time.sleep(0.010)
+ for _ in support.sleeping_retry(timeout, error=False):
+ if interpreters.is_running(interp):
+ break
+ else:
+ raise RuntimeError('interp is not running')
@contextlib.contextmanager
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 6f3b460..c121653 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -256,12 +256,12 @@ class FailingInitializerMixin(ExecutorMixin):
else:
with self.assertRaises(BrokenExecutor):
future.result()
+
# At some point, the executor should break
- t1 = time.monotonic()
- while not self.executor._broken:
- if time.monotonic() - t1 > 5:
- self.fail("executor not broken after 5 s.")
- time.sleep(0.01)
+ for _ in support.sleeping_retry(5, "executor not broken"):
+ if self.executor._broken:
+ break
+
# ... and from this point submit() is guaranteed to fail
with self.assertRaises(BrokenExecutor):
self.executor.submit(get_init_status)
diff --git a/Lib/test/test_multiprocessing_main_handling.py b/Lib/test/test_multiprocessing_main_handling.py
index 510d8d3..35e9cd6 100644
--- a/Lib/test/test_multiprocessing_main_handling.py
+++ b/Lib/test/test_multiprocessing_main_handling.py
@@ -40,6 +40,7 @@ test_source = """\
import sys
import time
from multiprocessing import Pool, set_start_method
+from test import support
# We use this __main__ defined function in the map call below in order to
# check that multiprocessing in correctly running the unguarded
@@ -59,13 +60,11 @@ if __name__ == '__main__':
results = []
with Pool(5) as pool:
pool.map_async(f, [1, 2, 3], callback=results.extend)
- start_time = time.monotonic()
- while not results:
- time.sleep(0.05)
- # up to 1 min to report the results
- dt = time.monotonic() - start_time
- if dt > 60.0:
- raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
+
+ # up to 1 min to report the results
+ for _ in support.sleeping_retry(60, "Timed out waiting for results"):
+ if results:
+ break
results.sort()
print(start_method, "->", results)
@@ -86,19 +85,17 @@ if __name__ != "__main__":
import sys
import time
from multiprocessing import Pool, set_start_method
+from test import support
start_method = sys.argv[1]
set_start_method(start_method)
results = []
with Pool(5) as pool:
pool.map_async(int, [1, 4, 9], callback=results.extend)
- start_time = time.monotonic()
- while not results:
- time.sleep(0.05)
- # up to 1 min to report the results
- dt = time.monotonic() - start_time
- if dt > 60.0:
- raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
+ # up to 1 min to report the results
+ for _ in support.sleeping_retry(60, "Timed out waiting for results"):
+ if results:
+ break
results.sort()
print(start_method, "->", results)
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index 6aa529b..a1d074a 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -812,13 +812,14 @@ class ItimerTest(unittest.TestCase):
signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
signal.setitimer(self.itimer, 0.3, 0.2)
- start_time = time.monotonic()
- while time.monotonic() - start_time < 60.0:
+ for _ in support.busy_retry(60.0, error=False):
# use up some virtual time by doing real work
_ = pow(12345, 67890, 10000019)
if signal.getitimer(self.itimer) == (0.0, 0.0):
- break # sig_vtalrm handler stopped this itimer
- else: # Issue 8424
+ # sig_vtalrm handler stopped this itimer
+ break
+ else:
+ # bpo-8424
self.skipTest("timeout: likely cause: machine too slow or load too "
"high")
@@ -832,13 +833,14 @@ class ItimerTest(unittest.TestCase):
signal.signal(signal.SIGPROF, self.sig_prof)
signal.setitimer(self.itimer, 0.2, 0.2)
- start_time = time.monotonic()
- while time.monotonic() - start_time < 60.0:
+ for _ in support.busy_retry(60.0, error=False):
# do some work
_ = pow(12345, 67890, 10000019)
if signal.getitimer(self.itimer) == (0.0, 0.0):
- break # sig_prof handler stopped this itimer
- else: # Issue 8424
+ # sig_prof handler stopped this itimer
+ break
+ else:
+ # bpo-8424
self.skipTest("timeout: likely cause: machine too slow or load too "
"high")
@@ -1307,8 +1309,6 @@ class StressTest(unittest.TestCase):
self.setsig(signal.SIGALRM, handler) # for ITIMER_REAL
expected_sigs = 0
- deadline = time.monotonic() + support.SHORT_TIMEOUT
-
while expected_sigs < N:
# Hopefully the SIGALRM will be received somewhere during
# initial processing of SIGUSR1.
@@ -1317,8 +1317,9 @@ class StressTest(unittest.TestCase):
expected_sigs += 2
# Wait for handlers to run to avoid signal coalescing
- while len(sigs) < expected_sigs and time.monotonic() < deadline:
- time.sleep(1e-5)
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
+ if len(sigs) >= expected_sigs:
+ break
# All ITIMER_REAL signals should have been delivered to the
# Python handler
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 6a66c16..3acafbd 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -2262,11 +2262,8 @@ class SimpleBackgroundTests(unittest.TestCase):
# A simple IO loop. Call func(*args) depending on the error we get
# (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
timeout = kwargs.get('timeout', support.SHORT_TIMEOUT)
- deadline = time.monotonic() + timeout
count = 0
- while True:
- if time.monotonic() > deadline:
- self.fail("timeout")
+ for _ in support.busy_retry(timeout):
errno = None
count += 1
try:
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index 23bccee..7738ca5 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -9,7 +9,6 @@ import subprocess
import sys
import tempfile
import textwrap
-import time
import unittest
import warnings
@@ -461,18 +460,12 @@ class TestSupport(unittest.TestCase):
# child process: do nothing, just exit
os._exit(0)
- t0 = time.monotonic()
- deadline = time.monotonic() + support.SHORT_TIMEOUT
-
was_altered = support.environment_altered
try:
support.environment_altered = False
stderr = io.StringIO()
- while True:
- if time.monotonic() > deadline:
- self.fail("timeout")
-
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
with support.swap_attr(support.print_warning, 'orig_stderr', stderr):
support.reap_children()
@@ -481,9 +474,6 @@ class TestSupport(unittest.TestCase):
if support.environment_altered:
break
- # loop until the child process completed
- time.sleep(0.100)
-
msg = "Warning -- reap_children() reaped child process %s" % pid
self.assertIn(msg, stderr.getvalue())
self.assertTrue(support.environment_altered)
diff --git a/Lib/test/test_wait3.py b/Lib/test/test_wait3.py
index 4ec7690..15d66ae 100644
--- a/Lib/test/test_wait3.py
+++ b/Lib/test/test_wait3.py
@@ -4,7 +4,6 @@
import os
import subprocess
import sys
-import time
import unittest
from test.fork_wait import ForkWait
from test import support
@@ -20,14 +19,12 @@ class Wait3Test(ForkWait):
# This many iterations can be required, since some previously run
# tests (e.g. test_ctypes) could have spawned a lot of children
# very quickly.
- deadline = time.monotonic() + support.SHORT_TIMEOUT
- while time.monotonic() <= deadline:
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
# wait3() shouldn't hang, but some of the buildbots seem to hang
# in the forking tests. This is an attempt to fix the problem.
spid, status, rusage = os.wait3(os.WNOHANG)
if spid == cpid:
break
- time.sleep(0.1)
self.assertEqual(spid, cpid)
self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
diff --git a/Lib/test/test_wait4.py b/Lib/test/test_wait4.py
index 24f1aae..f66c0db 100644
--- a/Lib/test/test_wait4.py
+++ b/Lib/test/test_wait4.py
@@ -2,7 +2,6 @@
"""
import os
-import time
import sys
import unittest
from test.fork_wait import ForkWait
@@ -22,14 +21,12 @@ class Wait4Test(ForkWait):
# Issue #11185: wait4 is broken on AIX and will always return 0
# with WNOHANG.
option = 0
- deadline = time.monotonic() + support.SHORT_TIMEOUT
- while time.monotonic() <= deadline:
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
# wait4() shouldn't hang, but some of the buildbots seem to hang
# in the forking tests. This is an attempt to fix the problem.
spid, status, rusage = os.wait4(cpid, option)
if spid == cpid:
break
- time.sleep(0.1)
self.assertEqual(spid, cpid)
self.assertEqual(os.waitstatus_to_exitcode(status), exitcode)
self.assertTrue(rusage)