summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Include/cpython/pystate.h26
-rw-r--r--Include/internal/pycore_lock.h10
-rw-r--r--Include/internal/pycore_pythread.h2
-rw-r--r--Lib/test/test_audit.py2
-rw-r--r--Lib/test/test_concurrent_futures/test_process_pool.py4
-rw-r--r--Lib/test/test_thread.py48
-rw-r--r--Lib/test/test_threading.py61
-rw-r--r--Lib/threading.py218
-rw-r--r--Misc/NEWS.d/next/Library/2024-02-01-03-09-38.gh-issue-114271.raCkt5.rst7
-rw-r--r--Modules/_threadmodule.c979
-rw-r--r--Python/lock.c24
-rw-r--r--Python/pystate.c25
12 files changed, 767 insertions, 639 deletions
diff --git a/Include/cpython/pystate.h b/Include/cpython/pystate.h
index ac7ff83..38d0897 100644
--- a/Include/cpython/pystate.h
+++ b/Include/cpython/pystate.h
@@ -161,32 +161,6 @@ struct _ts {
*/
uintptr_t critical_section;
- /* Called when a thread state is deleted normally, but not when it
- * is destroyed after fork().
- * Pain: to prevent rare but fatal shutdown errors (issue 18808),
- * Thread.join() must wait for the join'ed thread's tstate to be unlinked
- * from the tstate chain. That happens at the end of a thread's life,
- * in pystate.c.
- * The obvious way doesn't quite work: create a lock which the tstate
- * unlinking code releases, and have Thread.join() wait to acquire that
- * lock. The problem is that we _are_ at the end of the thread's life:
- * if the thread holds the last reference to the lock, decref'ing the
- * lock will delete the lock, and that may trigger arbitrary Python code
- * if there's a weakref, with a callback, to the lock. But by this time
- * _PyRuntime.gilstate.tstate_current is already NULL, so only the simplest
- * of C code can be allowed to run (in particular it must not be possible to
- * release the GIL).
- * So instead of holding the lock directly, the tstate holds a weakref to
- * the lock: that's the value of on_delete_data below. Decref'ing a
- * weakref is harmless.
- * on_delete points to _threadmodule.c's static release_sentinel() function.
- * After the tstate is unlinked, release_sentinel is called with the
- * weakref-to-lock (on_delete_data) argument, and release_sentinel releases
- * the indirectly held lock.
- */
- void (*on_delete)(void *);
- void *on_delete_data;
-
int coroutine_origin_tracking_depth;
PyObject *async_gen_firstiter;
diff --git a/Include/internal/pycore_lock.h b/Include/internal/pycore_lock.h
index 971b461..f993c95 100644
--- a/Include/internal/pycore_lock.h
+++ b/Include/internal/pycore_lock.h
@@ -153,16 +153,6 @@ PyAPI_FUNC(void) PyEvent_Wait(PyEvent *evt);
// and 0 if the timeout expired or thread was interrupted.
PyAPI_FUNC(int) PyEvent_WaitTimed(PyEvent *evt, PyTime_t timeout_ns);
-// A one-time event notification with reference counting.
-typedef struct _PyEventRc {
- PyEvent event;
- Py_ssize_t refcount;
-} _PyEventRc;
-
-_PyEventRc *_PyEventRc_New(void);
-void _PyEventRc_Incref(_PyEventRc *erc);
-void _PyEventRc_Decref(_PyEventRc *erc);
-
// _PyRawMutex implements a word-sized mutex that that does not depend on the
// parking lot API, and therefore can be used in the parking lot
// implementation.
diff --git a/Include/internal/pycore_pythread.h b/Include/internal/pycore_pythread.h
index d2e7cc2..f032cb9 100644
--- a/Include/internal/pycore_pythread.h
+++ b/Include/internal/pycore_pythread.h
@@ -78,7 +78,7 @@ struct _pythread_runtime_state {
} stubs;
#endif
- // Linked list of ThreadHandleObjects
+ // Linked list of ThreadHandles
struct llist_node handles;
};
diff --git a/Lib/test/test_audit.py b/Lib/test/test_audit.py
index cd0a4e2..c24c821 100644
--- a/Lib/test/test_audit.py
+++ b/Lib/test/test_audit.py
@@ -209,7 +209,7 @@ class AuditTest(unittest.TestCase):
expected = [
("_thread.start_new_thread", "(<test_func>, (), None)"),
("test.test_func", "()"),
- ("_thread.start_joinable_thread", "(<test_func>,)"),
+ ("_thread.start_joinable_thread", "(<test_func>, 1, None)"),
("test.test_func", "()"),
]
diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py
index 7fc59a0..70444bb 100644
--- a/Lib/test/test_concurrent_futures/test_process_pool.py
+++ b/Lib/test/test_concurrent_futures/test_process_pool.py
@@ -201,13 +201,13 @@ class ProcessPoolExecutorTest(ExecutorTest):
# QueueFeederThread.
orig_start_new_thread = threading._start_joinable_thread
nthread = 0
- def mock_start_new_thread(func, *args):
+ def mock_start_new_thread(func, *args, **kwargs):
nonlocal nthread
if nthread >= 1:
raise RuntimeError("can't create new thread at "
"interpreter shutdown")
nthread += 1
- return orig_start_new_thread(func, *args)
+ return orig_start_new_thread(func, *args, **kwargs)
with support.swap_attr(threading, '_start_joinable_thread',
mock_start_new_thread):
diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py
index 8323523..d94e042 100644
--- a/Lib/test/test_thread.py
+++ b/Lib/test/test_thread.py
@@ -289,6 +289,54 @@ class ThreadRunningTests(BasicThreadTest):
with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"):
raise error
+ def test_join_with_timeout(self):
+ lock = thread.allocate_lock()
+ lock.acquire()
+
+ def thr():
+ lock.acquire()
+
+ with threading_helper.wait_threads_exit():
+ handle = thread.start_joinable_thread(thr)
+ handle.join(0.1)
+ self.assertFalse(handle.is_done())
+ lock.release()
+ handle.join()
+ self.assertTrue(handle.is_done())
+
+ def test_join_unstarted(self):
+ handle = thread._ThreadHandle()
+ with self.assertRaisesRegex(RuntimeError, "thread not started"):
+ handle.join()
+
+ def test_set_done_unstarted(self):
+ handle = thread._ThreadHandle()
+ with self.assertRaisesRegex(RuntimeError, "thread not started"):
+ handle._set_done()
+
+ def test_start_duplicate_handle(self):
+ lock = thread.allocate_lock()
+ lock.acquire()
+
+ def func():
+ lock.acquire()
+
+ handle = thread._ThreadHandle()
+ with threading_helper.wait_threads_exit():
+ thread.start_joinable_thread(func, handle=handle)
+ with self.assertRaisesRegex(RuntimeError, "thread already started"):
+ thread.start_joinable_thread(func, handle=handle)
+ lock.release()
+ handle.join()
+
+ def test_start_with_none_handle(self):
+ def func():
+ pass
+
+ with threading_helper.wait_threads_exit():
+ handle = thread.start_joinable_thread(func, handle=None)
+ handle.join()
+
class Barrier:
def __init__(self, num_threads):
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 8868666..f1dc129 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -408,7 +408,7 @@ class ThreadTests(BaseTestCase):
def test_limbo_cleanup(self):
# Issue 7481: Failure to start thread should cleanup the limbo map.
- def fail_new_thread(*args):
+ def fail_new_thread(*args, **kwargs):
raise threading.ThreadError()
_start_joinable_thread = threading._start_joinable_thread
threading._start_joinable_thread = fail_new_thread
@@ -912,41 +912,6 @@ class ThreadTests(BaseTestCase):
rc, out, err = assert_python_ok("-c", code)
self.assertEqual(err, b"")
- def test_tstate_lock(self):
- # Test an implementation detail of Thread objects.
- started = _thread.allocate_lock()
- finish = _thread.allocate_lock()
- started.acquire()
- finish.acquire()
- def f():
- started.release()
- finish.acquire()
- time.sleep(0.01)
- # The tstate lock is None until the thread is started
- t = threading.Thread(target=f)
- self.assertIs(t._tstate_lock, None)
- t.start()
- started.acquire()
- self.assertTrue(t.is_alive())
- # The tstate lock can't be acquired when the thread is running
- # (or suspended).
- tstate_lock = t._tstate_lock
- self.assertFalse(tstate_lock.acquire(timeout=0), False)
- finish.release()
- # When the thread ends, the state_lock can be successfully
- # acquired.
- self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT), False)
- # But is_alive() is still True: we hold _tstate_lock now, which
- # prevents is_alive() from knowing the thread's end-of-life C code
- # is done.
- self.assertTrue(t.is_alive())
- # Let is_alive() find out the C code is done.
- tstate_lock.release()
- self.assertFalse(t.is_alive())
- # And verify the thread disposed of _tstate_lock.
- self.assertIsNone(t._tstate_lock)
- t.join()
-
def test_repr_stopped(self):
# Verify that "stopped" shows up in repr(Thread) appropriately.
started = _thread.allocate_lock()
@@ -1112,30 +1077,6 @@ class ThreadTests(BaseTestCase):
self.assertEqual(threading.getprofile(), old_profile)
self.assertEqual(sys.getprofile(), old_profile)
- @cpython_only
- def test_shutdown_locks(self):
- for daemon in (False, True):
- with self.subTest(daemon=daemon):
- event = threading.Event()
- thread = threading.Thread(target=event.wait, daemon=daemon)
-
- # Thread.start() must add lock to _shutdown_locks,
- # but only for non-daemon thread
- thread.start()
- tstate_lock = thread._tstate_lock
- if not daemon:
- self.assertIn(tstate_lock, threading._shutdown_locks)
- else:
- self.assertNotIn(tstate_lock, threading._shutdown_locks)
-
- # unblock the thread and join it
- event.set()
- thread.join()
-
- # Thread._stop() must remove tstate_lock from _shutdown_locks.
- # Daemon threads must never add it to _shutdown_locks.
- self.assertNotIn(tstate_lock, threading._shutdown_locks)
-
def test_locals_at_exit(self):
# bpo-19466: thread locals must not be deleted before destructors
# are called
diff --git a/Lib/threading.py b/Lib/threading.py
index ec89550..31ab77c 100644
--- a/Lib/threading.py
+++ b/Lib/threading.py
@@ -36,8 +36,11 @@ _start_joinable_thread = _thread.start_joinable_thread
_daemon_threads_allowed = _thread.daemon_threads_allowed
_allocate_lock = _thread.allocate_lock
_LockType = _thread.LockType
-_set_sentinel = _thread._set_sentinel
+_thread_shutdown = _thread._shutdown
+_make_thread_handle = _thread._make_thread_handle
+_ThreadHandle = _thread._ThreadHandle
get_ident = _thread.get_ident
+_get_main_thread_ident = _thread._get_main_thread_ident
_is_main_interpreter = _thread._is_main_interpreter
try:
get_native_id = _thread.get_native_id
@@ -847,25 +850,6 @@ _active = {} # maps thread id to Thread object
_limbo = {}
_dangling = WeakSet()
-# Set of Thread._tstate_lock locks of non-daemon threads used by _shutdown()
-# to wait until all Python thread states get deleted:
-# see Thread._set_tstate_lock().
-_shutdown_locks_lock = _allocate_lock()
-_shutdown_locks = set()
-
-def _maintain_shutdown_locks():
- """
- Drop any shutdown locks that don't correspond to running threads anymore.
-
- Calling this from time to time avoids an ever-growing _shutdown_locks
- set when Thread objects are not joined explicitly. See bpo-37788.
-
- This must be called with _shutdown_locks_lock acquired.
- """
- # If a lock was released, the corresponding thread has exited
- to_remove = [lock for lock in _shutdown_locks if not lock.locked()]
- _shutdown_locks.difference_update(to_remove)
-
# Main class for threads
@@ -930,10 +914,8 @@ class Thread:
self._ident = None
if _HAVE_THREAD_NATIVE_ID:
self._native_id = None
- self._tstate_lock = None
- self._handle = None
+ self._handle = _ThreadHandle()
self._started = Event()
- self._is_stopped = False
self._initialized = True
# Copy of sys.stderr used by self._invoke_excepthook()
self._stderr = _sys.stderr
@@ -947,28 +929,18 @@ class Thread:
if new_ident is not None:
# This thread is alive.
self._ident = new_ident
- if self._handle is not None:
- assert self._handle.ident == new_ident
- # bpo-42350: If the fork happens when the thread is already stopped
- # (ex: after threading._shutdown() has been called), _tstate_lock
- # is None. Do nothing in this case.
- if self._tstate_lock is not None:
- self._tstate_lock._at_fork_reinit()
- self._tstate_lock.acquire()
+ assert self._handle.ident == new_ident
else:
- # This thread isn't alive after fork: it doesn't have a tstate
- # anymore.
- self._is_stopped = True
- self._tstate_lock = None
- self._handle = None
+ # Otherwise, the thread is dead, Jim. _PyThread_AfterFork()
+ # already marked our handle done.
+ pass
def __repr__(self):
assert self._initialized, "Thread.__init__() was not called"
status = "initial"
if self._started.is_set():
status = "started"
- self.is_alive() # easy way to get ._is_stopped set when appropriate
- if self._is_stopped:
+ if self._handle.is_done():
status = "stopped"
if self._daemonic:
status += " daemon"
@@ -996,7 +968,8 @@ class Thread:
_limbo[self] = self
try:
# Start joinable thread
- self._handle = _start_joinable_thread(self._bootstrap)
+ _start_joinable_thread(self._bootstrap, handle=self._handle,
+ daemon=self.daemon)
except Exception:
with _active_limbo_lock:
del _limbo[self]
@@ -1047,23 +1020,9 @@ class Thread:
def _set_native_id(self):
self._native_id = get_native_id()
- def _set_tstate_lock(self):
- """
- Set a lock object which will be released by the interpreter when
- the underlying thread state (see pystate.h) gets deleted.
- """
- self._tstate_lock = _set_sentinel()
- self._tstate_lock.acquire()
-
- if not self.daemon:
- with _shutdown_locks_lock:
- _maintain_shutdown_locks()
- _shutdown_locks.add(self._tstate_lock)
-
def _bootstrap_inner(self):
try:
self._set_ident()
- self._set_tstate_lock()
if _HAVE_THREAD_NATIVE_ID:
self._set_native_id()
self._started.set()
@@ -1083,33 +1042,6 @@ class Thread:
finally:
self._delete()
- def _stop(self):
- # After calling ._stop(), .is_alive() returns False and .join() returns
- # immediately. ._tstate_lock must be released before calling ._stop().
- #
- # Normal case: C code at the end of the thread's life
- # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and
- # that's detected by our ._wait_for_tstate_lock(), called by .join()
- # and .is_alive(). Any number of threads _may_ call ._stop()
- # simultaneously (for example, if multiple threads are blocked in
- # .join() calls), and they're not serialized. That's harmless -
- # they'll just make redundant rebindings of ._is_stopped and
- # ._tstate_lock. Obscure: we rebind ._tstate_lock last so that the
- # "assert self._is_stopped" in ._wait_for_tstate_lock() always works
- # (the assert is executed only if ._tstate_lock is None).
- #
- # Special case: _main_thread releases ._tstate_lock via this
- # module's _shutdown() function.
- lock = self._tstate_lock
- if lock is not None:
- assert not lock.locked()
- self._is_stopped = True
- self._tstate_lock = None
- if not self.daemon:
- with _shutdown_locks_lock:
- # Remove our lock and other released locks from _shutdown_locks
- _maintain_shutdown_locks()
-
def _delete(self):
"Remove current thread from the dict of currently running threads."
with _active_limbo_lock:
@@ -1150,47 +1082,12 @@ class Thread:
if self is current_thread():
raise RuntimeError("cannot join current thread")
- if timeout is None:
- self._wait_for_tstate_lock()
- else:
- # the behavior of a negative timeout isn't documented, but
- # historically .join(timeout=x) for x<0 has acted as if timeout=0
- self._wait_for_tstate_lock(timeout=max(timeout, 0))
-
- if self._is_stopped:
- self._join_os_thread()
-
- def _join_os_thread(self):
- # self._handle may be cleared post-fork
- if self._handle is not None:
- self._handle.join()
-
- def _wait_for_tstate_lock(self, block=True, timeout=-1):
- # Issue #18808: wait for the thread state to be gone.
- # At the end of the thread's life, after all knowledge of the thread
- # is removed from C data structures, C code releases our _tstate_lock.
- # This method passes its arguments to _tstate_lock.acquire().
- # If the lock is acquired, the C code is done, and self._stop() is
- # called. That sets ._is_stopped to True, and ._tstate_lock to None.
- lock = self._tstate_lock
- if lock is None:
- # already determined that the C code is done
- assert self._is_stopped
- return
+ # the behavior of a negative timeout isn't documented, but
+ # historically .join(timeout=x) for x<0 has acted as if timeout=0
+ if timeout is not None:
+ timeout = max(timeout, 0)
- try:
- if lock.acquire(block, timeout):
- lock.release()
- self._stop()
- except:
- if lock.locked():
- # bpo-45274: lock.acquire() acquired the lock, but the function
- # was interrupted with an exception before reaching the
- # lock.release(). It can happen if a signal handler raises an
- # exception, like CTRL+C which raises KeyboardInterrupt.
- lock.release()
- self._stop()
- raise
+ self._handle.join(timeout)
@property
def name(self):
@@ -1241,13 +1138,7 @@ class Thread:
"""
assert self._initialized, "Thread.__init__() not called"
- if self._is_stopped or not self._started.is_set():
- return False
- self._wait_for_tstate_lock(False)
- if not self._is_stopped:
- return True
- self._join_os_thread()
- return False
+ return self._started.is_set() and not self._handle.is_done()
@property
def daemon(self):
@@ -1456,18 +1347,14 @@ class _MainThread(Thread):
def __init__(self):
Thread.__init__(self, name="MainThread", daemon=False)
- self._set_tstate_lock()
self._started.set()
- self._set_ident()
+ self._ident = _get_main_thread_ident()
+ self._handle = _make_thread_handle(self._ident)
if _HAVE_THREAD_NATIVE_ID:
self._set_native_id()
with _active_limbo_lock:
_active[self._ident] = self
- def _join_os_thread(self):
- # No ThreadHandle for main thread
- pass
-
# Helper thread-local instance to detect when a _DummyThread
# is collected. Not a part of the public API.
@@ -1510,17 +1397,15 @@ class _DummyThread(Thread):
daemon=_daemon_threads_allowed())
self._started.set()
self._set_ident()
+ self._handle = _make_thread_handle(self._ident)
if _HAVE_THREAD_NATIVE_ID:
self._set_native_id()
with _active_limbo_lock:
_active[self._ident] = self
_DeleteDummyThreadOnDel(self)
- def _stop(self):
- pass
-
def is_alive(self):
- if not self._is_stopped and self._started.is_set():
+ if not self._handle.is_done() and self._started.is_set():
return True
raise RuntimeError("thread is not alive")
@@ -1532,7 +1417,6 @@ class _DummyThread(Thread):
self.__class__ = _MainThread
self._name = 'MainThread'
self._daemonic = False
- self._set_tstate_lock()
Thread._after_fork(self, new_ident=new_ident)
@@ -1631,12 +1515,11 @@ def _shutdown():
"""
Wait until the Python thread state of all non-daemon threads get deleted.
"""
- # Obscure: other threads may be waiting to join _main_thread. That's
- # dubious, but some code does it. We can't wait for C code to release
- # the main thread's tstate_lock - that won't happen until the interpreter
- # is nearly dead. So we release it here. Note that just calling _stop()
- # isn't enough: other threads may already be waiting on _tstate_lock.
- if _main_thread._is_stopped and _is_main_interpreter():
+ # Obscure: other threads may be waiting to join _main_thread. That's
+ # dubious, but some code does it. We can't wait for it to be marked as done
+ # normally - that won't happen until the interpreter is nearly dead. So
+ # mark it done here.
+ if _main_thread._handle.is_done() and _is_main_interpreter():
# _shutdown() was already called
return
@@ -1648,42 +1531,11 @@ def _shutdown():
for atexit_call in reversed(_threading_atexits):
atexit_call()
- # Main thread
- if _main_thread.ident == get_ident():
- tlock = _main_thread._tstate_lock
- # The main thread isn't finished yet, so its thread state lock can't
- # have been released.
- assert tlock is not None
- if tlock.locked():
- # It should have been released already by
- # _PyInterpreterState_SetNotRunningMain(), but there may be
- # embedders that aren't calling that yet.
- tlock.release()
- _main_thread._stop()
- else:
- # bpo-1596321: _shutdown() must be called in the main thread.
- # If the threading module was not imported by the main thread,
- # _main_thread is the thread which imported the threading module.
- # In this case, ignore _main_thread, similar behavior than for threads
- # spawned by C libraries or using _thread.start_new_thread().
- pass
-
- # Join all non-deamon threads
- while True:
- with _shutdown_locks_lock:
- locks = list(_shutdown_locks)
- _shutdown_locks.clear()
-
- if not locks:
- break
-
- for lock in locks:
- # mimic Thread.join()
- lock.acquire()
- lock.release()
-
- # new threads can be spawned while we were waiting for the other
- # threads to complete
+ if _is_main_interpreter():
+ _main_thread._handle._set_done()
+
+ # Wait for all non-daemon threads to exit.
+ _thread_shutdown()
def main_thread():
@@ -1703,7 +1555,6 @@ def _after_fork():
# Reset _active_limbo_lock, in case we forked while the lock was held
# by another (non-forked) thread. http://bugs.python.org/issue874900
global _active_limbo_lock, _main_thread
- global _shutdown_locks_lock, _shutdown_locks
_active_limbo_lock = RLock()
# fork() only copied the current thread; clear references to others.
@@ -1719,10 +1570,6 @@ def _after_fork():
_main_thread = current
- # reset _shutdown() locks: threads re-register their _tstate_lock below
- _shutdown_locks_lock = _allocate_lock()
- _shutdown_locks = set()
-
with _active_limbo_lock:
# Dangling thread instances must still have their locks reset,
# because someone may join() them.
@@ -1739,7 +1586,6 @@ def _after_fork():
else:
# All the others are already stopped.
thread._after_fork()
- thread._stop()
_limbo.clear()
_active.clear()
diff --git a/Misc/NEWS.d/next/Library/2024-02-01-03-09-38.gh-issue-114271.raCkt5.rst b/Misc/NEWS.d/next/Library/2024-02-01-03-09-38.gh-issue-114271.raCkt5.rst
new file mode 100644
index 0000000..2cd35ee
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2024-02-01-03-09-38.gh-issue-114271.raCkt5.rst
@@ -0,0 +1,7 @@
+Fix a race in ``threading.Thread.join()``.
+
+``threading._MainThread`` now always represents the main thread of the main
+interpreter.
+
+``PyThreadState.on_delete`` and ``PyThreadState.on_delete_data`` have been
+removed.
diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c
index cc5396a..6c84b7e 100644
--- a/Modules/_threadmodule.c
+++ b/Modules/_threadmodule.c
@@ -12,7 +12,6 @@
#include "pycore_time.h" // _PyTime_FromSeconds()
#include "pycore_weakref.h" // _PyWeakref_GET_REF()
-#include <stdbool.h>
#include <stddef.h> // offsetof()
#ifdef HAVE_SIGNAL_H
# include <signal.h> // SIGINT
@@ -21,7 +20,6 @@
// ThreadError is just an alias to PyExc_RuntimeError
#define ThreadError PyExc_RuntimeError
-
// Forward declarations
static struct PyModuleDef thread_module;
@@ -32,6 +30,10 @@ typedef struct {
PyTypeObject *local_type;
PyTypeObject *local_dummy_type;
PyTypeObject *thread_handle_type;
+
+ // Linked list of handles to all non-daemon threads created by the
+ // threading module. We wait for these to finish at shutdown.
+ struct llist_node shutdown_handles;
} thread_module_state;
static inline thread_module_state*
@@ -44,76 +46,148 @@ get_thread_state(PyObject *module)
// _ThreadHandle type
-// Handles transition from RUNNING to one of JOINED, DETACHED, or INVALID (post
-// fork).
+// Handles state transitions according to the following diagram:
+//
+// NOT_STARTED -> STARTING -> RUNNING -> DONE
+// | ^
+// | |
+// +----- error --------+
typedef enum {
- THREAD_HANDLE_RUNNING = 1,
- THREAD_HANDLE_JOINED = 2,
- THREAD_HANDLE_DETACHED = 3,
- THREAD_HANDLE_INVALID = 4,
+ THREAD_HANDLE_NOT_STARTED = 1,
+ THREAD_HANDLE_STARTING = 2,
+ THREAD_HANDLE_RUNNING = 3,
+ THREAD_HANDLE_DONE = 4,
} ThreadHandleState;
-// A handle around an OS thread.
+// A handle to wait for thread completion.
+//
+// This may be used to wait for threads that were spawned by the threading
+// module as well as for the "main" thread of the threading module. In the
+// former case an OS thread, identified by the `os_handle` field, will be
+// associated with the handle. The handle "owns" this thread and ensures that
+// the thread is either joined or detached after the handle is destroyed.
//
-// The OS thread is either joined or detached after the handle is destroyed.
+// Joining the handle is idempotent; the underlying OS thread, if any, is
+// joined or detached only once. Concurrent join operations are serialized
+// until it is their turn to execute or an earlier operation completes
+// successfully. Once a join has completed successfully all future joins
+// complete immediately.
//
-// Joining the handle is idempotent; the underlying OS thread is joined or
-// detached only once. Concurrent join operations are serialized until it is
-// their turn to execute or an earlier operation completes successfully. Once a
-// join has completed successfully all future joins complete immediately.
+// This must be separately reference counted because it may be destroyed
+// in `thread_run()` after the PyThreadState has been destroyed.
typedef struct {
- PyObject_HEAD
struct llist_node node; // linked list node (see _pythread_runtime_state)
- // The `ident` and `handle` fields are immutable once the object is visible
- // to threads other than its creator, thus they do not need to be accessed
- // atomically.
+ // linked list node (see thread_module_state)
+ struct llist_node shutdown_node;
+
+ // The `ident`, `os_handle`, `has_os_handle`, and `state` fields are
+ // protected by `mutex`.
PyThread_ident_t ident;
- PyThread_handle_t handle;
+ PyThread_handle_t os_handle;
+ int has_os_handle;
// Holds a value from the `ThreadHandleState` enum.
int state;
+ PyMutex mutex;
+
// Set immediately before `thread_run` returns to indicate that the OS
// thread is about to exit. This is used to avoid false positives when
// detecting self-join attempts. See the comment in `ThreadHandle_join()`
// for a more detailed explanation.
- _PyEventRc *thread_is_exiting;
+ PyEvent thread_is_exiting;
- // Serializes calls to `join`.
+ // Serializes calls to `join` and `set_done`.
_PyOnceFlag once;
-} ThreadHandleObject;
+
+ Py_ssize_t refcount;
+} ThreadHandle;
static inline int
-get_thread_handle_state(ThreadHandleObject *handle)
+get_thread_handle_state(ThreadHandle *handle)
{
- return _Py_atomic_load_int(&handle->state);
+ PyMutex_Lock(&handle->mutex);
+ int state = handle->state;
+ PyMutex_Unlock(&handle->mutex);
+ return state;
}
static inline void
-set_thread_handle_state(ThreadHandleObject *handle, ThreadHandleState state)
+set_thread_handle_state(ThreadHandle *handle, ThreadHandleState state)
{
- _Py_atomic_store_int(&handle->state, state);
+ PyMutex_Lock(&handle->mutex);
+ handle->state = state;
+ PyMutex_Unlock(&handle->mutex);
}
-static ThreadHandleObject*
-new_thread_handle(thread_module_state* state)
+static PyThread_ident_t
+ThreadHandle_ident(ThreadHandle *handle)
{
- _PyEventRc *event = _PyEventRc_New();
- if (event == NULL) {
- PyErr_NoMemory();
- return NULL;
+ PyMutex_Lock(&handle->mutex);
+ PyThread_ident_t ident = handle->ident;
+ PyMutex_Unlock(&handle->mutex);
+ return ident;
+}
+
+static int
+ThreadHandle_get_os_handle(ThreadHandle *handle, PyThread_handle_t *os_handle)
+{
+ PyMutex_Lock(&handle->mutex);
+ int has_os_handle = handle->has_os_handle;
+ if (has_os_handle) {
+ *os_handle = handle->os_handle;
+ }
+ PyMutex_Unlock(&handle->mutex);
+ return has_os_handle;
+}
+
+static void
+add_to_shutdown_handles(thread_module_state *state, ThreadHandle *handle)
+{
+ HEAD_LOCK(&_PyRuntime);
+ llist_insert_tail(&state->shutdown_handles, &handle->shutdown_node);
+ HEAD_UNLOCK(&_PyRuntime);
+}
+
+static void
+clear_shutdown_handles(thread_module_state *state)
+{
+ HEAD_LOCK(&_PyRuntime);
+ struct llist_node *node;
+ llist_for_each_safe(node, &state->shutdown_handles) {
+ llist_remove(node);
+ }
+ HEAD_UNLOCK(&_PyRuntime);
+}
+
+static void
+remove_from_shutdown_handles(ThreadHandle *handle)
+{
+ HEAD_LOCK(&_PyRuntime);
+ if (handle->shutdown_node.next != NULL) {
+ llist_remove(&handle->shutdown_node);
}
- ThreadHandleObject* self = PyObject_New(ThreadHandleObject, state->thread_handle_type);
+ HEAD_UNLOCK(&_PyRuntime);
+}
+
+static ThreadHandle *
+ThreadHandle_new(void)
+{
+ ThreadHandle *self =
+ (ThreadHandle *)PyMem_RawCalloc(1, sizeof(ThreadHandle));
if (self == NULL) {
- _PyEventRc_Decref(event);
+ PyErr_NoMemory();
return NULL;
}
self->ident = 0;
- self->handle = 0;
- self->thread_is_exiting = event;
+ self->os_handle = 0;
+ self->has_os_handle = 0;
+ self->thread_is_exiting = (PyEvent){0};
+ self->mutex = (PyMutex){_Py_UNLOCKED};
self->once = (_PyOnceFlag){0};
- self->state = THREAD_HANDLE_INVALID;
+ self->state = THREAD_HANDLE_NOT_STARTED;
+ self->refcount = 1;
HEAD_LOCK(&_PyRuntime);
llist_insert_tail(&_PyRuntime.threads.handles, &self->node);
@@ -123,9 +197,34 @@ new_thread_handle(thread_module_state* state)
}
static void
-ThreadHandle_dealloc(ThreadHandleObject *self)
+ThreadHandle_incref(ThreadHandle *self)
{
- PyObject *tp = (PyObject *) Py_TYPE(self);
+ _Py_atomic_add_ssize(&self->refcount, 1);
+}
+
+static int
+detach_thread(ThreadHandle *self)
+{
+ if (!self->has_os_handle) {
+ return 0;
+ }
+ // This is typically short so no need to release the GIL
+ if (PyThread_detach_thread(self->os_handle)) {
+ fprintf(stderr, "detach_thread: failed detaching thread\n");
+ return -1;
+ }
+ return 0;
+}
+
+// NB: This may be called after the PyThreadState in `thread_run` has been
+// deleted; it cannot call anything that relies on a valid PyThreadState
+// existing.
+static void
+ThreadHandle_decref(ThreadHandle *self)
+{
+ if (_Py_atomic_add_ssize(&self->refcount, -1) > 1) {
+ return;
+ }
// Remove ourself from the global list of handles
HEAD_LOCK(&_PyRuntime);
@@ -134,23 +233,17 @@ ThreadHandle_dealloc(ThreadHandleObject *self)
}
HEAD_UNLOCK(&_PyRuntime);
+ assert(self->shutdown_node.next == NULL);
+
// It's safe to access state non-atomically:
// 1. This is the destructor; nothing else holds a reference.
- // 2. The refcount going to zero is a "synchronizes-with" event;
- // all changes from other threads are visible.
- if (self->state == THREAD_HANDLE_RUNNING) {
- // This is typically short so no need to release the GIL
- if (PyThread_detach_thread(self->handle)) {
- PyErr_SetString(ThreadError, "Failed detaching thread");
- PyErr_WriteUnraisable(tp);
- }
- else {
- self->state = THREAD_HANDLE_DETACHED;
- }
+ // 2. The refcount going to zero is a "synchronizes-with" event; all
+ // changes from other threads are visible.
+ if (self->state == THREAD_HANDLE_RUNNING && !detach_thread(self)) {
+ self->state = THREAD_HANDLE_DONE;
}
- _PyEventRc_Decref(self->thread_is_exiting);
- PyObject_Free(self);
- Py_DECREF(tp);
+
+ PyMem_RawFree(self);
}
void
@@ -164,55 +257,229 @@ _PyThread_AfterFork(struct _pythread_runtime_state *state)
struct llist_node *node;
llist_for_each_safe(node, &state->handles) {
- ThreadHandleObject *hobj = llist_data(node, ThreadHandleObject, node);
- if (hobj->ident == current) {
+ ThreadHandle *handle = llist_data(node, ThreadHandle, node);
+ if (handle->ident == current) {
continue;
}
- // Disallow calls to join() as they could crash. We are the only
- // thread; it's safe to set this without an atomic.
- hobj->state = THREAD_HANDLE_INVALID;
+ // Mark all threads as done. Any attempts to join or detach the
+ // underlying OS thread (if any) could crash. We are the only thread;
+ // it's safe to set this non-atomically.
+ handle->state = THREAD_HANDLE_DONE;
+ handle->once = (_PyOnceFlag){_Py_ONCE_INITIALIZED};
+ handle->mutex = (PyMutex){_Py_UNLOCKED};
+ _PyEvent_Notify(&handle->thread_is_exiting);
llist_remove(node);
+ remove_from_shutdown_handles(handle);
}
}
-static PyObject *
-ThreadHandle_repr(ThreadHandleObject *self)
+// bootstate is used to "bootstrap" new threads. Any arguments needed by
+// `thread_run()`, which can only take a single argument due to platform
+// limitations, are contained in bootstate.
+struct bootstate {
+ PyThreadState *tstate;
+ PyObject *func;
+ PyObject *args;
+ PyObject *kwargs;
+ ThreadHandle *handle;
+ PyEvent handle_ready;
+};
+
+static void
+thread_bootstate_free(struct bootstate *boot, int decref)
{
- return PyUnicode_FromFormat("<%s object: ident=%" PY_FORMAT_THREAD_IDENT_T ">",
- Py_TYPE(self)->tp_name, self->ident);
+ if (decref) {
+ Py_DECREF(boot->func);
+ Py_DECREF(boot->args);
+ Py_XDECREF(boot->kwargs);
+ }
+ ThreadHandle_decref(boot->handle);
+ PyMem_RawFree(boot);
}
-static PyObject *
-ThreadHandle_get_ident(ThreadHandleObject *self, void *ignored)
+static void
+thread_run(void *boot_raw)
{
- return PyLong_FromUnsignedLongLong(self->ident);
+ struct bootstate *boot = (struct bootstate *) boot_raw;
+ PyThreadState *tstate = boot->tstate;
+
+ // Wait until the handle is marked as running
+ PyEvent_Wait(&boot->handle_ready);
+
+ // `handle` needs to be manipulated after bootstate has been freed
+ ThreadHandle *handle = boot->handle;
+ ThreadHandle_incref(handle);
+
+ // gh-108987: If _thread.start_new_thread() is called before or while
+ // Python is being finalized, thread_run() can called *after*.
+ // _PyRuntimeState_SetFinalizing() is called. At this point, all Python
+ // threads must exit, except of the thread calling Py_Finalize() whch holds
+ // the GIL and must not exit.
+ //
+ // At this stage, tstate can be a dangling pointer (point to freed memory),
+ // it's ok to call _PyThreadState_MustExit() with a dangling pointer.
+ if (_PyThreadState_MustExit(tstate)) {
+ // Don't call PyThreadState_Clear() nor _PyThreadState_DeleteCurrent().
+ // These functions are called on tstate indirectly by Py_Finalize()
+ // which calls _PyInterpreterState_Clear().
+ //
+ // Py_DECREF() cannot be called because the GIL is not held: leak
+ // references on purpose. Python is being finalized anyway.
+ thread_bootstate_free(boot, 0);
+ goto exit;
+ }
+
+ _PyThreadState_Bind(tstate);
+ PyEval_AcquireThread(tstate);
+ _Py_atomic_add_ssize(&tstate->interp->threads.count, 1);
+
+ PyObject *res = PyObject_Call(boot->func, boot->args, boot->kwargs);
+ if (res == NULL) {
+ if (PyErr_ExceptionMatches(PyExc_SystemExit))
+ /* SystemExit is ignored silently */
+ PyErr_Clear();
+ else {
+ PyErr_FormatUnraisable(
+ "Exception ignored in thread started by %R", boot->func);
+ }
+ }
+ else {
+ Py_DECREF(res);
+ }
+
+ thread_bootstate_free(boot, 1);
+
+ _Py_atomic_add_ssize(&tstate->interp->threads.count, -1);
+ PyThreadState_Clear(tstate);
+ _PyThreadState_DeleteCurrent(tstate);
+
+exit:
+ // Don't need to wait for this thread anymore
+ remove_from_shutdown_handles(handle);
+
+ _PyEvent_Notify(&handle->thread_is_exiting);
+ ThreadHandle_decref(handle);
+
+ // bpo-44434: Don't call explicitly PyThread_exit_thread(). On Linux with
+ // the glibc, pthread_exit() can abort the whole process if dlopen() fails
+ // to open the libgcc_s.so library (ex: EMFILE error).
+ return;
+}
+
+static int
+force_done(ThreadHandle *handle)
+{
+ assert(get_thread_handle_state(handle) == THREAD_HANDLE_STARTING);
+ _PyEvent_Notify(&handle->thread_is_exiting);
+ set_thread_handle_state(handle, THREAD_HANDLE_DONE);
+ return 0;
+}
+
+static int
+ThreadHandle_start(ThreadHandle *self, PyObject *func, PyObject *args,
+ PyObject *kwargs)
+{
+ // Mark the handle as starting to prevent any other threads from doing so
+ PyMutex_Lock(&self->mutex);
+ if (self->state != THREAD_HANDLE_NOT_STARTED) {
+ PyMutex_Unlock(&self->mutex);
+ PyErr_SetString(ThreadError, "thread already started");
+ return -1;
+ }
+ self->state = THREAD_HANDLE_STARTING;
+ PyMutex_Unlock(&self->mutex);
+
+ // Do all the heavy lifting outside of the mutex. All other operations on
+ // the handle should fail since the handle is in the starting state.
+
+ // gh-109795: Use PyMem_RawMalloc() instead of PyMem_Malloc(),
+ // because it should be possible to call thread_bootstate_free()
+ // without holding the GIL.
+ struct bootstate *boot = PyMem_RawMalloc(sizeof(struct bootstate));
+ if (boot == NULL) {
+ PyErr_NoMemory();
+ goto start_failed;
+ }
+ PyInterpreterState *interp = _PyInterpreterState_GET();
+ boot->tstate = _PyThreadState_New(interp, _PyThreadState_WHENCE_THREADING);
+ if (boot->tstate == NULL) {
+ PyMem_RawFree(boot);
+ if (!PyErr_Occurred()) {
+ PyErr_NoMemory();
+ }
+ goto start_failed;
+ }
+ boot->func = Py_NewRef(func);
+ boot->args = Py_NewRef(args);
+ boot->kwargs = Py_XNewRef(kwargs);
+ boot->handle = self;
+ ThreadHandle_incref(self);
+ boot->handle_ready = (PyEvent){0};
+
+ PyThread_ident_t ident;
+ PyThread_handle_t os_handle;
+ if (PyThread_start_joinable_thread(thread_run, boot, &ident, &os_handle)) {
+ PyThreadState_Clear(boot->tstate);
+ thread_bootstate_free(boot, 1);
+ PyErr_SetString(ThreadError, "can't start new thread");
+ goto start_failed;
+ }
+
+ // Mark the handle running
+ PyMutex_Lock(&self->mutex);
+ assert(self->state == THREAD_HANDLE_STARTING);
+ self->ident = ident;
+ self->has_os_handle = 1;
+ self->os_handle = os_handle;
+ self->state = THREAD_HANDLE_RUNNING;
+ PyMutex_Unlock(&self->mutex);
+
+ // Unblock the thread
+ _PyEvent_Notify(&boot->handle_ready);
+
+ return 0;
+
+start_failed:
+ _PyOnceFlag_CallOnce(&self->once, (_Py_once_fn_t *)force_done, self);
+ return -1;
}
static int
-join_thread(ThreadHandleObject *handle)
+join_thread(ThreadHandle *handle)
{
assert(get_thread_handle_state(handle) == THREAD_HANDLE_RUNNING);
+ PyThread_handle_t os_handle;
+ if (ThreadHandle_get_os_handle(handle, &os_handle)) {
+ int err = 0;
+ Py_BEGIN_ALLOW_THREADS
+ err = PyThread_join_thread(os_handle);
+ Py_END_ALLOW_THREADS
+ if (err) {
+ PyErr_SetString(ThreadError, "Failed joining thread");
+ return -1;
+ }
+ }
+ set_thread_handle_state(handle, THREAD_HANDLE_DONE);
+ return 0;
+}
- int err;
- Py_BEGIN_ALLOW_THREADS
- err = PyThread_join_thread(handle->handle);
- Py_END_ALLOW_THREADS
- if (err) {
- PyErr_SetString(ThreadError, "Failed joining thread");
+static int
+check_started(ThreadHandle *self)
+{
+ ThreadHandleState state = get_thread_handle_state(self);
+ if (state < THREAD_HANDLE_RUNNING) {
+ PyErr_SetString(ThreadError, "thread not started");
return -1;
}
- set_thread_handle_state(handle, THREAD_HANDLE_JOINED);
return 0;
}
-static PyObject *
-ThreadHandle_join(ThreadHandleObject *self, void* ignored)
+static int
+ThreadHandle_join(ThreadHandle *self, PyTime_t timeout_ns)
{
- if (get_thread_handle_state(self) == THREAD_HANDLE_INVALID) {
- PyErr_SetString(PyExc_ValueError,
- "the handle is invalid and thus cannot be joined");
- return NULL;
+ if (check_started(self) < 0) {
+ return -1;
}
// We want to perform this check outside of the `_PyOnceFlag` to prevent
@@ -225,45 +492,197 @@ ThreadHandle_join(ThreadHandleObject *self, void* ignored)
// To work around this, we set `thread_is_exiting` immediately before
// `thread_run` returns. We can be sure that we are not attempting to join
// ourselves if the handle's thread is about to exit.
- if (!_PyEvent_IsSet(&self->thread_is_exiting->event) &&
- self->ident == PyThread_get_thread_ident_ex()) {
+ if (!_PyEvent_IsSet(&self->thread_is_exiting) &&
+ ThreadHandle_ident(self) == PyThread_get_thread_ident_ex()) {
// PyThread_join_thread() would deadlock or error out.
PyErr_SetString(ThreadError, "Cannot join current thread");
- return NULL;
+ return -1;
+ }
+
+ // Wait until the deadline for the thread to exit.
+ PyTime_t deadline = timeout_ns != -1 ? _PyDeadline_Init(timeout_ns) : 0;
+ while (!PyEvent_WaitTimed(&self->thread_is_exiting, timeout_ns)) {
+ if (deadline) {
+ // _PyDeadline_Get will return a negative value if the deadline has
+ // been exceeded.
+ timeout_ns = Py_MAX(_PyDeadline_Get(deadline), 0);
+ }
+
+ if (timeout_ns) {
+ // Interrupted
+ if (Py_MakePendingCalls() < 0) {
+ return -1;
+ }
+ }
+ else {
+ // Timed out
+ return 0;
+ }
}
if (_PyOnceFlag_CallOnce(&self->once, (_Py_once_fn_t *)join_thread,
self) == -1) {
+ return -1;
+ }
+ assert(get_thread_handle_state(self) == THREAD_HANDLE_DONE);
+ return 0;
+}
+
+static int
+set_done(ThreadHandle *handle)
+{
+ assert(get_thread_handle_state(handle) == THREAD_HANDLE_RUNNING);
+ if (detach_thread(handle) < 0) {
+ PyErr_SetString(ThreadError, "failed detaching handle");
+ return -1;
+ }
+ _PyEvent_Notify(&handle->thread_is_exiting);
+ set_thread_handle_state(handle, THREAD_HANDLE_DONE);
+ return 0;
+}
+
+static int
+ThreadHandle_set_done(ThreadHandle *self)
+{
+ if (check_started(self) < 0) {
+ return -1;
+ }
+
+ if (_PyOnceFlag_CallOnce(&self->once, (_Py_once_fn_t *)set_done, self) ==
+ -1) {
+ return -1;
+ }
+ assert(get_thread_handle_state(self) == THREAD_HANDLE_DONE);
+ return 0;
+}
+
+// A wrapper around a ThreadHandle.
+typedef struct {
+ PyObject_HEAD
+
+ ThreadHandle *handle;
+} PyThreadHandleObject;
+
+static PyThreadHandleObject *
+PyThreadHandleObject_new(PyTypeObject *type)
+{
+ ThreadHandle *handle = ThreadHandle_new();
+ if (handle == NULL) {
+ return NULL;
+ }
+
+ PyThreadHandleObject *self =
+ (PyThreadHandleObject *)type->tp_alloc(type, 0);
+ if (self == NULL) {
+ ThreadHandle_decref(handle);
+ return NULL;
+ }
+
+ self->handle = handle;
+
+ return self;
+}
+
+static PyObject *
+PyThreadHandleObject_tp_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
+{
+ return (PyObject *)PyThreadHandleObject_new(type);
+}
+
+static void
+PyThreadHandleObject_dealloc(PyThreadHandleObject *self)
+{
+ PyObject *tp = (PyObject *) Py_TYPE(self);
+ ThreadHandle_decref(self->handle);
+ PyObject_Free(self);
+ Py_DECREF(tp);
+}
+
+static PyObject *
+PyThreadHandleObject_repr(PyThreadHandleObject *self)
+{
+ PyThread_ident_t ident = ThreadHandle_ident(self->handle);
+ return PyUnicode_FromFormat("<%s object: ident=%" PY_FORMAT_THREAD_IDENT_T ">",
+ Py_TYPE(self)->tp_name, ident);
+}
+
+static PyObject *
+PyThreadHandleObject_get_ident(PyThreadHandleObject *self,
+ PyObject *Py_UNUSED(ignored))
+{
+ return PyLong_FromUnsignedLongLong(ThreadHandle_ident(self->handle));
+}
+
+static PyObject *
+PyThreadHandleObject_join(PyThreadHandleObject *self, PyObject *args)
+{
+ PyObject *timeout_obj = NULL;
+ if (!PyArg_ParseTuple(args, "|O:join", &timeout_obj)) {
+ return NULL;
+ }
+
+ PyTime_t timeout_ns = -1;
+ if (timeout_obj != NULL && timeout_obj != Py_None) {
+ if (_PyTime_FromSecondsObject(&timeout_ns, timeout_obj,
+ _PyTime_ROUND_TIMEOUT) < 0) {
+ return NULL;
+ }
+ }
+
+ if (ThreadHandle_join(self->handle, timeout_ns) < 0) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+PyThreadHandleObject_is_done(PyThreadHandleObject *self,
+ PyObject *Py_UNUSED(ignored))
+{
+ if (_PyEvent_IsSet(&self->handle->thread_is_exiting)) {
+ Py_RETURN_TRUE;
+ }
+ else {
+ Py_RETURN_FALSE;
+ }
+}
+
+static PyObject *
+PyThreadHandleObject_set_done(PyThreadHandleObject *self,
+ PyObject *Py_UNUSED(ignored))
+{
+ if (ThreadHandle_set_done(self->handle) < 0) {
return NULL;
}
- assert(get_thread_handle_state(self) == THREAD_HANDLE_JOINED);
Py_RETURN_NONE;
}
static PyGetSetDef ThreadHandle_getsetlist[] = {
- {"ident", (getter)ThreadHandle_get_ident, NULL, NULL},
+ {"ident", (getter)PyThreadHandleObject_get_ident, NULL, NULL},
{0},
};
-static PyMethodDef ThreadHandle_methods[] =
-{
- {"join", (PyCFunction)ThreadHandle_join, METH_NOARGS},
+static PyMethodDef ThreadHandle_methods[] = {
+ {"join", (PyCFunction)PyThreadHandleObject_join, METH_VARARGS, NULL},
+ {"_set_done", (PyCFunction)PyThreadHandleObject_set_done, METH_NOARGS, NULL},
+ {"is_done", (PyCFunction)PyThreadHandleObject_is_done, METH_NOARGS, NULL},
{0, 0}
};
static PyType_Slot ThreadHandle_Type_slots[] = {
- {Py_tp_dealloc, (destructor)ThreadHandle_dealloc},
- {Py_tp_repr, (reprfunc)ThreadHandle_repr},
+ {Py_tp_dealloc, (destructor)PyThreadHandleObject_dealloc},
+ {Py_tp_repr, (reprfunc)PyThreadHandleObject_repr},
{Py_tp_getset, ThreadHandle_getsetlist},
{Py_tp_methods, ThreadHandle_methods},
+ {Py_tp_new, PyThreadHandleObject_tp_new},
{0, 0}
};
static PyType_Spec ThreadHandle_Type_spec = {
"_thread._ThreadHandle",
- sizeof(ThreadHandleObject),
+ sizeof(PyThreadHandleObject),
0,
- Py_TPFLAGS_DEFAULT | Py_TPFLAGS_DISALLOW_INSTANTIATION,
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_IMMUTABLETYPE,
ThreadHandle_Type_slots,
};
@@ -1272,98 +1691,6 @@ _localdummy_destroyed(PyObject *localweakref, PyObject *dummyweakref)
/* Module functions */
-// bootstate is used to "bootstrap" new threads. Any arguments needed by
-// `thread_run()`, which can only take a single argument due to platform
-// limitations, are contained in bootstate.
-struct bootstate {
- PyThreadState *tstate;
- PyObject *func;
- PyObject *args;
- PyObject *kwargs;
- _PyEventRc *thread_is_exiting;
-};
-
-
-static void
-thread_bootstate_free(struct bootstate *boot, int decref)
-{
- if (decref) {
- Py_DECREF(boot->func);
- Py_DECREF(boot->args);
- Py_XDECREF(boot->kwargs);
- }
- if (boot->thread_is_exiting != NULL) {
- _PyEventRc_Decref(boot->thread_is_exiting);
- }
- PyMem_RawFree(boot);
-}
-
-
-static void
-thread_run(void *boot_raw)
-{
- struct bootstate *boot = (struct bootstate *) boot_raw;
- PyThreadState *tstate = boot->tstate;
-
- // `thread_is_exiting` needs to be set after bootstate has been freed
- _PyEventRc *thread_is_exiting = boot->thread_is_exiting;
- boot->thread_is_exiting = NULL;
-
- // gh-108987: If _thread.start_new_thread() is called before or while
- // Python is being finalized, thread_run() can called *after*.
- // _PyRuntimeState_SetFinalizing() is called. At this point, all Python
- // threads must exit, except of the thread calling Py_Finalize() whch holds
- // the GIL and must not exit.
- //
- // At this stage, tstate can be a dangling pointer (point to freed memory),
- // it's ok to call _PyThreadState_MustExit() with a dangling pointer.
- if (_PyThreadState_MustExit(tstate)) {
- // Don't call PyThreadState_Clear() nor _PyThreadState_DeleteCurrent().
- // These functions are called on tstate indirectly by Py_Finalize()
- // which calls _PyInterpreterState_Clear().
- //
- // Py_DECREF() cannot be called because the GIL is not held: leak
- // references on purpose. Python is being finalized anyway.
- thread_bootstate_free(boot, 0);
- goto exit;
- }
-
- _PyThreadState_Bind(tstate);
- PyEval_AcquireThread(tstate);
- _Py_atomic_add_ssize(&tstate->interp->threads.count, 1);
-
- PyObject *res = PyObject_Call(boot->func, boot->args, boot->kwargs);
- if (res == NULL) {
- if (PyErr_ExceptionMatches(PyExc_SystemExit))
- /* SystemExit is ignored silently */
- PyErr_Clear();
- else {
- PyErr_FormatUnraisable(
- "Exception ignored in thread started by %R", boot->func);
- }
- }
- else {
- Py_DECREF(res);
- }
-
- thread_bootstate_free(boot, 1);
-
- _Py_atomic_add_ssize(&tstate->interp->threads.count, -1);
- PyThreadState_Clear(tstate);
- _PyThreadState_DeleteCurrent(tstate);
-
-exit:
- if (thread_is_exiting != NULL) {
- _PyEvent_Notify(&thread_is_exiting->event);
- _PyEventRc_Decref(thread_is_exiting);
- }
-
- // bpo-44434: Don't call explicitly PyThread_exit_thread(). On Linux with
- // the glibc, pthread_exit() can abort the whole process if dlopen() fails
- // to open the libgcc_s.so library (ex: EMFILE error).
- return;
-}
-
static PyObject *
thread_daemon_threads_allowed(PyObject *module, PyObject *Py_UNUSED(ignored))
{
@@ -1383,11 +1710,8 @@ Return True if daemon threads are allowed in the current interpreter,\n\
and False otherwise.\n");
static int
-do_start_new_thread(thread_module_state* state,
- PyObject *func, PyObject* args, PyObject* kwargs,
- int joinable,
- PyThread_ident_t* ident, PyThread_handle_t* handle,
- _PyEventRc *thread_is_exiting)
+do_start_new_thread(thread_module_state *state, PyObject *func, PyObject *args,
+ PyObject *kwargs, ThreadHandle *handle, int daemon)
{
PyInterpreterState *interp = _PyInterpreterState_GET();
if (!_PyInterpreterState_HasFeature(interp, Py_RTFLAGS_THREADS)) {
@@ -1401,44 +1725,20 @@ do_start_new_thread(thread_module_state* state,
return -1;
}
- // gh-109795: Use PyMem_RawMalloc() instead of PyMem_Malloc(),
- // because it should be possible to call thread_bootstate_free()
- // without holding the GIL.
- struct bootstate *boot = PyMem_RawMalloc(sizeof(struct bootstate));
- if (boot == NULL) {
- PyErr_NoMemory();
- return -1;
+ if (!daemon) {
+ // Add the handle before starting the thread to avoid adding a handle
+ // to a thread that has already finished (i.e. if the thread finishes
+ // before the call to `ThreadHandle_start()` below returns).
+ add_to_shutdown_handles(state, handle);
}
- boot->tstate = _PyThreadState_New(interp, _PyThreadState_WHENCE_THREADING);
- if (boot->tstate == NULL) {
- PyMem_RawFree(boot);
- if (!PyErr_Occurred()) {
- PyErr_NoMemory();
+
+ if (ThreadHandle_start(handle, func, args, kwargs) < 0) {
+ if (!daemon) {
+ remove_from_shutdown_handles(handle);
}
return -1;
}
- boot->func = Py_NewRef(func);
- boot->args = Py_NewRef(args);
- boot->kwargs = Py_XNewRef(kwargs);
- boot->thread_is_exiting = thread_is_exiting;
- if (thread_is_exiting != NULL) {
- _PyEventRc_Incref(thread_is_exiting);
- }
- int err;
- if (joinable) {
- err = PyThread_start_joinable_thread(thread_run, (void*) boot, ident, handle);
- } else {
- *handle = 0;
- *ident = PyThread_start_new_thread(thread_run, (void*) boot);
- err = (*ident == PYTHREAD_INVALID_THREAD_ID);
- }
- if (err) {
- PyErr_SetString(ThreadError, "can't start new thread");
- PyThreadState_Clear(boot->tstate);
- thread_bootstate_free(boot, 1);
- return -1;
- }
return 0;
}
@@ -1472,12 +1772,19 @@ thread_PyThread_start_new_thread(PyObject *module, PyObject *fargs)
return NULL;
}
- PyThread_ident_t ident = 0;
- PyThread_handle_t handle;
- if (do_start_new_thread(state, func, args, kwargs, /*joinable=*/ 0,
- &ident, &handle, NULL)) {
+ ThreadHandle *handle = ThreadHandle_new();
+ if (handle == NULL) {
+ return NULL;
+ }
+
+ int st =
+ do_start_new_thread(state, func, args, kwargs, handle, /*daemon=*/1);
+ if (st < 0) {
+ ThreadHandle_decref(handle);
return NULL;
}
+ PyThread_ident_t ident = ThreadHandle_ident(handle);
+ ThreadHandle_decref(handle);
return PyLong_FromUnsignedLongLong(ident);
}
@@ -1495,9 +1802,19 @@ unhandled exception; a stack trace will be printed unless the exception\n\
is SystemExit.\n");
static PyObject *
-thread_PyThread_start_joinable_thread(PyObject *module, PyObject *func)
+thread_PyThread_start_joinable_thread(PyObject *module, PyObject *fargs,
+ PyObject *fkwargs)
{
+ static char *keywords[] = {"function", "handle", "daemon", NULL};
+ PyObject *func = NULL;
+ int daemon = 1;
thread_module_state *state = get_thread_state(module);
+ PyObject *hobj = NULL;
+ if (!PyArg_ParseTupleAndKeywords(fargs, fkwargs,
+ "O|Op:start_joinable_thread", keywords,
+ &func, &hobj, &daemon)) {
+ return NULL;
+ }
if (!PyCallable_Check(func)) {
PyErr_SetString(PyExc_TypeError,
@@ -1505,32 +1822,45 @@ thread_PyThread_start_joinable_thread(PyObject *module, PyObject *func)
return NULL;
}
- if (PySys_Audit("_thread.start_joinable_thread", "O", func) < 0) {
+ if (hobj == NULL) {
+ hobj = Py_None;
+ }
+ else if (hobj != Py_None && !Py_IS_TYPE(hobj, state->thread_handle_type)) {
+ PyErr_SetString(PyExc_TypeError, "'handle' must be a _ThreadHandle");
+ return NULL;
+ }
+
+ if (PySys_Audit("_thread.start_joinable_thread", "OiO", func, daemon,
+ hobj) < 0) {
return NULL;
}
+ if (hobj == Py_None) {
+ hobj = (PyObject *)PyThreadHandleObject_new(state->thread_handle_type);
+ if (hobj == NULL) {
+ return NULL;
+ }
+ }
+ else {
+ Py_INCREF(hobj);
+ }
+
PyObject* args = PyTuple_New(0);
if (args == NULL) {
return NULL;
}
- ThreadHandleObject* hobj = new_thread_handle(state);
- if (hobj == NULL) {
- Py_DECREF(args);
- return NULL;
- }
- if (do_start_new_thread(state, func, args, /*kwargs=*/ NULL, /*joinable=*/ 1,
- &hobj->ident, &hobj->handle, hobj->thread_is_exiting)) {
- Py_DECREF(args);
+ int st = do_start_new_thread(state, func, args,
+ /*kwargs=*/ NULL, ((PyThreadHandleObject*)hobj)->handle, daemon);
+ Py_DECREF(args);
+ if (st < 0) {
Py_DECREF(hobj);
return NULL;
}
- set_thread_handle_state(hobj, THREAD_HANDLE_RUNNING);
- Py_DECREF(args);
- return (PyObject*) hobj;
+ return (PyObject *) hobj;
}
PyDoc_STRVAR(start_joinable_doc,
-"start_joinable_thread(function)\n\
+"start_joinable_thread(function[, daemon=True[, handle=None]])\n\
\n\
*For internal use only*: start a new thread.\n\
\n\
@@ -1538,7 +1868,9 @@ Like start_new_thread(), this starts a new thread calling the given function.\n\
Unlike start_new_thread(), this returns a handle object with methods to join\n\
or detach the given thread.\n\
This function is not for third-party code, please use the\n\
-`threading` module instead.\n");
+`threading` module instead. During finalization the runtime will not wait for\n\
+the thread to exit if daemon is True. If handle is provided it must be a\n\
+newly created thread._ThreadHandle instance.");
static PyObject *
thread_PyThread_exit_thread(PyObject *self, PyObject *Py_UNUSED(ignored))
@@ -1650,67 +1982,6 @@ yet finished.\n\
This function is meant for internal and specialized purposes only.\n\
In most applications `threading.enumerate()` should be used instead.");
-static void
-release_sentinel(void *weakref_raw)
-{
- PyObject *weakref = _PyObject_CAST(weakref_raw);
-
- /* Tricky: this function is called when the current thread state
- is being deleted. Therefore, only simple C code can safely
- execute here. */
- lockobject *lock = (lockobject *)_PyWeakref_GET_REF(weakref);
- if (lock != NULL) {
- if (lock->locked) {
- lock->locked = 0;
- PyThread_release_lock(lock->lock_lock);
- }
- Py_DECREF(lock);
- }
-
- /* Deallocating a weakref with a NULL callback only calls
- PyObject_GC_Del(), which can't call any Python code. */
- Py_DECREF(weakref);
-}
-
-static PyObject *
-thread__set_sentinel(PyObject *module, PyObject *Py_UNUSED(ignored))
-{
- PyObject *wr;
- PyThreadState *tstate = _PyThreadState_GET();
- lockobject *lock;
-
- if (tstate->on_delete_data != NULL) {
- /* We must support the re-creation of the lock from a
- fork()ed child. */
- assert(tstate->on_delete == &release_sentinel);
- wr = (PyObject *) tstate->on_delete_data;
- tstate->on_delete = NULL;
- tstate->on_delete_data = NULL;
- Py_DECREF(wr);
- }
- lock = newlockobject(module);
- if (lock == NULL)
- return NULL;
- /* The lock is owned by whoever called _set_sentinel(), but the weakref
- hangs to the thread state. */
- wr = PyWeakref_NewRef((PyObject *) lock, NULL);
- if (wr == NULL) {
- Py_DECREF(lock);
- return NULL;
- }
- tstate->on_delete_data = (void *) wr;
- tstate->on_delete = &release_sentinel;
- return (PyObject *) lock;
-}
-
-PyDoc_STRVAR(_set_sentinel_doc,
-"_set_sentinel() -> lock\n\
-\n\
-Set a sentinel lock that will be released when the current thread\n\
-state is finalized (after it is untied from the interpreter).\n\
-\n\
-This is a private API for the threading module.");
-
static PyObject *
thread_stack_size(PyObject *self, PyObject *args)
{
@@ -1917,13 +2188,101 @@ PyDoc_STRVAR(thread__is_main_interpreter_doc,
\n\
Return True if the current interpreter is the main Python interpreter.");
+static PyObject *
+thread_shutdown(PyObject *self, PyObject *args)
+{
+ PyThread_ident_t ident = PyThread_get_thread_ident_ex();
+ thread_module_state *state = get_thread_state(self);
+
+ for (;;) {
+ ThreadHandle *handle = NULL;
+
+ // Find a thread that's not yet finished.
+ HEAD_LOCK(&_PyRuntime);
+ struct llist_node *node;
+ llist_for_each_safe(node, &state->shutdown_handles) {
+ ThreadHandle *cur = llist_data(node, ThreadHandle, shutdown_node);
+ if (cur->ident != ident) {
+ ThreadHandle_incref(cur);
+ handle = cur;
+ break;
+ }
+ }
+ HEAD_UNLOCK(&_PyRuntime);
+
+ if (!handle) {
+ // No more threads to wait on!
+ break;
+ }
+
+ // Wait for the thread to finish. If we're interrupted, such
+ // as by a ctrl-c we print the error and exit early.
+ if (ThreadHandle_join(handle, -1) < 0) {
+ PyErr_WriteUnraisable(NULL);
+ ThreadHandle_decref(handle);
+ Py_RETURN_NONE;
+ }
+
+ ThreadHandle_decref(handle);
+ }
+
+ Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(shutdown_doc,
+"_shutdown()\n\
+\n\
+Wait for all non-daemon threads (other than the calling thread) to stop.");
+
+static PyObject *
+thread__make_thread_handle(PyObject *module, PyObject *identobj)
+{
+ thread_module_state *state = get_thread_state(module);
+ if (!PyLong_Check(identobj)) {
+ PyErr_SetString(PyExc_TypeError, "ident must be an integer");
+ return NULL;
+ }
+ PyThread_ident_t ident = PyLong_AsUnsignedLongLong(identobj);
+ if (PyErr_Occurred()) {
+ return NULL;
+ }
+ PyThreadHandleObject *hobj =
+ PyThreadHandleObject_new(state->thread_handle_type);
+ if (hobj == NULL) {
+ return NULL;
+ }
+ PyMutex_Lock(&hobj->handle->mutex);
+ hobj->handle->ident = ident;
+ hobj->handle->state = THREAD_HANDLE_RUNNING;
+ PyMutex_Unlock(&hobj->handle->mutex);
+ return (PyObject*) hobj;
+}
+
+PyDoc_STRVAR(thread__make_thread_handle_doc,
+"_make_thread_handle(ident)\n\
+\n\
+Internal only. Make a thread handle for threads not spawned\n\
+by the _thread or threading module.");
+
+static PyObject *
+thread__get_main_thread_ident(PyObject *module, PyObject *Py_UNUSED(ignored))
+{
+ return PyLong_FromUnsignedLongLong(_PyRuntime.main_thread);
+}
+
+PyDoc_STRVAR(thread__get_main_thread_ident_doc,
+"_get_main_thread_ident()\n\
+\n\
+Internal only. Return a non-zero integer that uniquely identifies the main thread\n\
+of the main interpreter.");
+
static PyMethodDef thread_methods[] = {
{"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread,
METH_VARARGS, start_new_doc},
{"start_new", (PyCFunction)thread_PyThread_start_new_thread,
METH_VARARGS, start_new_doc},
- {"start_joinable_thread", (PyCFunction)thread_PyThread_start_joinable_thread,
- METH_O, start_joinable_doc},
+ {"start_joinable_thread", _PyCFunction_CAST(thread_PyThread_start_joinable_thread),
+ METH_VARARGS | METH_KEYWORDS, start_joinable_doc},
{"daemon_threads_allowed", (PyCFunction)thread_daemon_threads_allowed,
METH_NOARGS, daemon_threads_allowed_doc},
{"allocate_lock", thread_PyThread_allocate_lock,
@@ -1946,12 +2305,16 @@ static PyMethodDef thread_methods[] = {
METH_NOARGS, _count_doc},
{"stack_size", (PyCFunction)thread_stack_size,
METH_VARARGS, stack_size_doc},
- {"_set_sentinel", thread__set_sentinel,
- METH_NOARGS, _set_sentinel_doc},
{"_excepthook", thread_excepthook,
METH_O, excepthook_doc},
{"_is_main_interpreter", thread__is_main_interpreter,
METH_NOARGS, thread__is_main_interpreter_doc},
+ {"_shutdown", thread_shutdown,
+ METH_NOARGS, shutdown_doc},
+ {"_make_thread_handle", thread__make_thread_handle,
+ METH_O, thread__make_thread_handle_doc},
+ {"_get_main_thread_ident", thread__get_main_thread_ident,
+ METH_NOARGS, thread__get_main_thread_ident_doc},
{NULL, NULL} /* sentinel */
};
@@ -2041,6 +2404,8 @@ thread_module_exec(PyObject *module)
return -1;
}
+ llist_init(&state->shutdown_handles);
+
return 0;
}
@@ -2066,6 +2431,10 @@ thread_module_clear(PyObject *module)
Py_CLEAR(state->local_type);
Py_CLEAR(state->local_dummy_type);
Py_CLEAR(state->thread_handle_type);
+ // Remove any remaining handles (e.g. if shutdown exited early due to
+ // interrupt) so that attempts to unlink the handle after our module state
+ // is destroyed do not crash.
+ clear_shutdown_handles(state);
return 0;
}
diff --git a/Python/lock.c b/Python/lock.c
index de25adc..7d1ead5 100644
--- a/Python/lock.c
+++ b/Python/lock.c
@@ -304,30 +304,6 @@ PyEvent_WaitTimed(PyEvent *evt, PyTime_t timeout_ns)
}
}
-_PyEventRc *
-_PyEventRc_New(void)
-{
- _PyEventRc *erc = (_PyEventRc *)PyMem_RawCalloc(1, sizeof(_PyEventRc));
- if (erc != NULL) {
- erc->refcount = 1;
- }
- return erc;
-}
-
-void
-_PyEventRc_Incref(_PyEventRc *erc)
-{
- _Py_atomic_add_ssize(&erc->refcount, 1);
-}
-
-void
-_PyEventRc_Decref(_PyEventRc *erc)
-{
- if (_Py_atomic_add_ssize(&erc->refcount, -1) == 1) {
- PyMem_RawFree(erc);
- }
-}
-
static int
unlock_once(_PyOnceFlag *o, int res)
{
diff --git a/Python/pystate.c b/Python/pystate.c
index 635616c..9f14222 100644
--- a/Python/pystate.c
+++ b/Python/pystate.c
@@ -1032,20 +1032,7 @@ _PyInterpreterState_SetRunningMain(PyInterpreterState *interp)
void
_PyInterpreterState_SetNotRunningMain(PyInterpreterState *interp)
{
- PyThreadState *tstate = interp->threads.main;
- assert(tstate == current_fast_get());
-
- if (tstate->on_delete != NULL) {
- // The threading module was imported for the first time in this
- // thread, so it was set as threading._main_thread. (See gh-75698.)
- // The thread has finished running the Python program so we mark
- // the thread object as finished.
- assert(tstate->_whence != _PyThreadState_WHENCE_THREADING);
- tstate->on_delete(tstate->on_delete_data);
- tstate->on_delete = NULL;
- tstate->on_delete_data = NULL;
- }
-
+ assert(interp->threads.main == current_fast_get());
interp->threads.main = NULL;
}
@@ -1570,16 +1557,6 @@ PyThreadState_Clear(PyThreadState *tstate)
Py_CLEAR(tstate->context);
- if (tstate->on_delete != NULL) {
- // For the "main" thread of each interpreter, this is meant
- // to be done in _PyInterpreterState_SetNotRunningMain().
- // That leaves threads created by the threading module,
- // and any threads killed by forking.
- // However, we also accommodate "main" threads that still
- // don't call _PyInterpreterState_SetNotRunningMain() yet.
- tstate->on_delete(tstate->on_delete_data);
- }
-
#ifdef Py_GIL_DISABLED
// Each thread should clear own freelists in free-threading builds.
struct _Py_object_freelists *freelists = _Py_object_freelists_GET();