summaryrefslogtreecommitdiffstats
path: root/Lib/test/support/threading_helper.py
diff options
context:
space:
mode:
authorHai Shi <shihai1992@gmail.com>2020-05-27 22:10:27 (GMT)
committerGitHub <noreply@github.com>2020-05-27 22:10:27 (GMT)
commite80697d687b610bd7fb9104af905dec8f0bc55a7 (patch)
treec848b98eaec2d959237fda725cf47b059f34b12a /Lib/test/support/threading_helper.py
parent7d80b35af1ee03834ae4af83e920dee89c2bc273 (diff)
downloadcpython-e80697d687b610bd7fb9104af905dec8f0bc55a7.zip
cpython-e80697d687b610bd7fb9104af905dec8f0bc55a7.tar.gz
cpython-e80697d687b610bd7fb9104af905dec8f0bc55a7.tar.bz2
bpo-40275: Adding threading_helper submodule in test.support (GH-20263)
Diffstat (limited to 'Lib/test/support/threading_helper.py')
-rw-r--r--Lib/test/support/threading_helper.py208
1 files changed, 208 insertions, 0 deletions
diff --git a/Lib/test/support/threading_helper.py b/Lib/test/support/threading_helper.py
new file mode 100644
index 0000000..96f7b3f
--- /dev/null
+++ b/Lib/test/support/threading_helper.py
@@ -0,0 +1,208 @@
+import contextlib
+import functools
+import _thread
+import threading
+import time
+
+from test import support
+
+
+#=======================================================================
+# Threading support to prevent reporting refleaks when running regrtest.py -R
+
+# NOTE: we use thread._count() rather than threading.enumerate() (or the
+# moral equivalent thereof) because a threading.Thread object is still alive
+# until its __bootstrap() method has returned, even after it has been
+# unregistered from the threading module.
+# thread._count(), on the other hand, only gets decremented *after* the
+# __bootstrap() method has returned, which gives us reliable reference counts
+# at the end of a test run.
+
+
+def threading_setup():
+ return _thread._count(), threading._dangling.copy()
+
+
+def threading_cleanup(*original_values):
+ _MAX_COUNT = 100
+
+ for count in range(_MAX_COUNT):
+ values = _thread._count(), threading._dangling
+ if values == original_values:
+ break
+
+ if not count:
+ # Display a warning at the first iteration
+ support.environment_altered = True
+ dangling_threads = values[1]
+ support.print_warning(f"threading_cleanup() failed to cleanup "
+ f"{values[0] - original_values[0]} threads "
+ f"(count: {values[0]}, "
+ f"dangling: {len(dangling_threads)})")
+ for thread in dangling_threads:
+ support.print_warning(f"Dangling thread: {thread!r}")
+
+ # Don't hold references to threads
+ dangling_threads = None
+ values = None
+
+ time.sleep(0.01)
+ gc_collect()
+
+
+def reap_threads(func):
+ """Use this function when threads are being used. This will
+ ensure that the threads are cleaned up even when the test fails.
+ """
+ @functools.wraps(func)
+ def decorator(*args):
+ key = threading_setup()
+ try:
+ return func(*args)
+ finally:
+ threading_cleanup(*key)
+ return decorator
+
+
+@contextlib.contextmanager
+def wait_threads_exit(timeout=None):
+ """
+ bpo-31234: Context manager to wait until all threads created in the with
+ statement exit.
+
+ Use _thread.count() to check if threads exited. Indirectly, wait until
+ threads exit the internal t_bootstrap() C function of the _thread module.
+
+ threading_setup() and threading_cleanup() are designed to emit a warning
+ if a test leaves running threads in the background. This context manager
+ is designed to cleanup threads started by the _thread.start_new_thread()
+ which doesn't allow to wait for thread exit, whereas thread.Thread has a
+ join() method.
+ """
+ if timeout is None:
+ timeout = support.SHORT_TIMEOUT
+ old_count = _thread._count()
+ try:
+ yield
+ finally:
+ start_time = time.monotonic()
+ deadline = start_time + timeout
+ while True:
+ count = _thread._count()
+ if count <= old_count:
+ break
+ if time.monotonic() > deadline:
+ dt = time.monotonic() - start_time
+ msg = (f"wait_threads() failed to cleanup {count - old_count} "
+ f"threads after {dt:.1f} seconds "
+ f"(count: {count}, old count: {old_count})")
+ raise AssertionError(msg)
+ time.sleep(0.010)
+ gc_collect()
+
+
+def join_thread(thread, timeout=None):
+ """Join a thread. Raise an AssertionError if the thread is still alive
+ after timeout seconds.
+ """
+ if timeout is None:
+ timeout = support.SHORT_TIMEOUT
+ thread.join(timeout)
+ if thread.is_alive():
+ msg = f"failed to join the thread in {timeout:.1f} seconds"
+ raise AssertionError(msg)
+
+
+@contextlib.contextmanager
+def start_threads(threads, unlock=None):
+ import faulthandler
+ threads = list(threads)
+ started = []
+ try:
+ try:
+ for t in threads:
+ t.start()
+ started.append(t)
+ except:
+ if verbose:
+ print("Can't start %d threads, only %d threads started" %
+ (len(threads), len(started)))
+ raise
+ yield
+ finally:
+ try:
+ if unlock:
+ unlock()
+ endtime = starttime = time.monotonic()
+ for timeout in range(1, 16):
+ endtime += 60
+ for t in started:
+ t.join(max(endtime - time.monotonic(), 0.01))
+ started = [t for t in started if t.is_alive()]
+ if not started:
+ break
+ if verbose:
+ print('Unable to join %d threads during a period of '
+ '%d minutes' % (len(started), timeout))
+ finally:
+ started = [t for t in started if t.is_alive()]
+ if started:
+ faulthandler.dump_traceback(sys.stdout)
+ raise AssertionError('Unable to join %d threads' % len(started))
+
+
+class catch_threading_exception:
+ """
+ Context manager catching threading.Thread exception using
+ threading.excepthook.
+
+ Attributes set when an exception is catched:
+
+ * exc_type
+ * exc_value
+ * exc_traceback
+ * thread
+
+ See threading.excepthook() documentation for these attributes.
+
+ These attributes are deleted at the context manager exit.
+
+ Usage:
+
+ with threading_helper.catch_threading_exception() as cm:
+ # code spawning a thread which raises an exception
+ ...
+
+ # check the thread exception, use cm attributes:
+ # exc_type, exc_value, exc_traceback, thread
+ ...
+
+ # exc_type, exc_value, exc_traceback, thread attributes of cm no longer
+ # exists at this point
+ # (to avoid reference cycles)
+ """
+
+ def __init__(self):
+ self.exc_type = None
+ self.exc_value = None
+ self.exc_traceback = None
+ self.thread = None
+ self._old_hook = None
+
+ def _hook(self, args):
+ self.exc_type = args.exc_type
+ self.exc_value = args.exc_value
+ self.exc_traceback = args.exc_traceback
+ self.thread = args.thread
+
+ def __enter__(self):
+ self._old_hook = threading.excepthook
+ threading.excepthook = self._hook
+ return self
+
+ def __exit__(self, *exc_info):
+ threading.excepthook = self._old_hook
+ del self.exc_type
+ del self.exc_value
+ del self.exc_traceback
+ del self.thread