diff options
author | Antoine Pitrou <antoine@python.org> | 2023-11-04 13:59:24 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-04 13:59:24 (GMT) |
commit | 0e9c364f4ac18a2237bdbac702b96bcf8ef9cb09 (patch) | |
tree | 8febb8282c2c1ebd73a18205ec5b9229a99ac4fe /Lib/threading.py | |
parent | a28a3967ab9a189122f895d51d2551f7b3a273b0 (diff) | |
download | cpython-0e9c364f4ac18a2237bdbac702b96bcf8ef9cb09.zip cpython-0e9c364f4ac18a2237bdbac702b96bcf8ef9cb09.tar.gz cpython-0e9c364f4ac18a2237bdbac702b96bcf8ef9cb09.tar.bz2 |
GH-110829: Ensure Thread.join() joins the OS thread (#110848)
Joining a thread now ensures the underlying OS thread has exited. This is required for safer fork() in multi-threaded processes.
---------
Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
Diffstat (limited to 'Lib/threading.py')
-rw-r--r-- | Lib/threading.py | 63 |
1 files changed, 48 insertions, 15 deletions
diff --git a/Lib/threading.py b/Lib/threading.py index 41c3a9f..85aff58 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -5,6 +5,7 @@ import sys as _sys import _thread import functools import warnings +import _weakref from time import monotonic as _time from _weakrefset import WeakSet @@ -33,7 +34,7 @@ __all__ = ['get_ident', 'active_count', 'Condition', 'current_thread', 'setprofile_all_threads','settrace_all_threads'] # Rename some stuff so "from threading import *" is safe -_start_new_thread = _thread.start_new_thread +_start_joinable_thread = _thread.start_joinable_thread _daemon_threads_allowed = _thread.daemon_threads_allowed _allocate_lock = _thread.allocate_lock _set_sentinel = _thread._set_sentinel @@ -589,7 +590,7 @@ class Event: return f"<{cls.__module__}.{cls.__qualname__} at {id(self):#x}: {status}>" def _at_fork_reinit(self): - # Private method called by Thread._reset_internal_locks() + # Private method called by Thread._after_fork() self._cond._at_fork_reinit() def is_set(self): @@ -924,6 +925,8 @@ class Thread: if _HAVE_THREAD_NATIVE_ID: self._native_id = None self._tstate_lock = None + self._join_lock = None + self._handle = None self._started = Event() self._is_stopped = False self._initialized = True @@ -933,22 +936,32 @@ class Thread: # For debugging and _after_fork() _dangling.add(self) - def _reset_internal_locks(self, is_alive): - # private! Called by _after_fork() to reset our internal locks as - # they may be in an invalid state leading to a deadlock or crash. + def _after_fork(self, new_ident=None): + # Private! Called by threading._after_fork(). self._started._at_fork_reinit() - if is_alive: + if new_ident is not None: + # This thread is alive. + self._ident = new_ident + if self._handle is not None: + self._handle.after_fork_alive() + assert self._handle.ident == new_ident # bpo-42350: If the fork happens when the thread is already stopped # (ex: after threading._shutdown() has been called), _tstate_lock # is None. Do nothing in this case. if self._tstate_lock is not None: self._tstate_lock._at_fork_reinit() self._tstate_lock.acquire() + if self._join_lock is not None: + self._join_lock._at_fork_reinit() else: - # The thread isn't alive after fork: it doesn't have a tstate + # This thread isn't alive after fork: it doesn't have a tstate # anymore. self._is_stopped = True self._tstate_lock = None + self._join_lock = None + if self._handle is not None: + self._handle.after_fork_dead() + self._handle = None def __repr__(self): assert self._initialized, "Thread.__init__() was not called" @@ -980,15 +993,18 @@ class Thread: if self._started.is_set(): raise RuntimeError("threads can only be started once") + self._join_lock = _allocate_lock() + with _active_limbo_lock: _limbo[self] = self try: - _start_new_thread(self._bootstrap, ()) + # Start joinable thread + self._handle = _start_joinable_thread(self._bootstrap) except Exception: with _active_limbo_lock: del _limbo[self] raise - self._started.wait() + self._started.wait() # Will set ident and native_id def run(self): """Method representing the thread's activity. @@ -1144,6 +1160,22 @@ class Thread: # historically .join(timeout=x) for x<0 has acted as if timeout=0 self._wait_for_tstate_lock(timeout=max(timeout, 0)) + if self._is_stopped: + self._join_os_thread() + + def _join_os_thread(self): + join_lock = self._join_lock + if join_lock is None: + return + with join_lock: + # Calling join() multiple times would raise an exception + # in one of the callers. + if self._handle is not None: + self._handle.join() + self._handle = None + # No need to keep this around + self._join_lock = None + def _wait_for_tstate_lock(self, block=True, timeout=-1): # Issue #18808: wait for the thread state to be gone. # At the end of the thread's life, after all knowledge of the thread @@ -1223,7 +1255,10 @@ class Thread: if self._is_stopped or not self._started.is_set(): return False self._wait_for_tstate_lock(False) - return not self._is_stopped + if not self._is_stopped: + return True + self._join_os_thread() + return False @property def daemon(self): @@ -1679,15 +1714,13 @@ def _after_fork(): # Any lock/condition variable may be currently locked or in an # invalid state, so we reinitialize them. if thread is current: - # There is only one active thread. We reset the ident to - # its new value since it can have changed. - thread._reset_internal_locks(True) + # This is the one and only active thread. ident = get_ident() - thread._ident = ident + thread._after_fork(new_ident=ident) new_active[ident] = thread else: # All the others are already stopped. - thread._reset_internal_locks(False) + thread._after_fork() thread._stop() _limbo.clear() |