From 9dabace39d118ec7a204b6970f8a3f475a11522c Mon Sep 17 00:00:00 2001
From: Eric Snow
Date: Thu, 21 Nov 2024 11:08:38 -0700
Subject: gh-114940: Add _Py_FOR_EACH_TSTATE_UNLOCKED(), and Friends (gh-127077)

This is a precursor to the actual fix for gh-114940, where we will change
these macros to use the new lock.  This change is almost entirely
mechanical; the exceptions are the loops in codeobject.c and ceval.c,
which now hold the "head" lock.  Note that almost all of the uses of
_Py_FOR_EACH_TSTATE_UNLOCKED() here will change to
_Py_FOR_EACH_TSTATE_BEGIN() once we add the new per-interpreter lock.
---
 Include/internal/pycore_pystate.h |  9 ++++
 Objects/codeobject.c              |  6 ++-
 Objects/object.c                  |  2 +-
 Objects/obmalloc.c                |  2 +-
 Python/ceval.c                    |  3 +-
 Python/ceval_gil.c                | 14 ++----
 Python/gc_free_threading.c        | 25 ++++------
 Python/instrumentation.c          |  7 +--
 Python/pystate.c                  | 98 ++++++++++++++++++---------------------
 9 files changed, 79 insertions(+), 87 deletions(-)

diff --git a/Include/internal/pycore_pystate.h b/Include/internal/pycore_pystate.h
index f4fbf37..54d8803 100644
--- a/Include/internal/pycore_pystate.h
+++ b/Include/internal/pycore_pystate.h
@@ -269,6 +269,15 @@ extern int _PyOS_InterruptOccurred(PyThreadState *tstate);
 #define HEAD_UNLOCK(runtime) \
     PyMutex_Unlock(&(runtime)->interpreters.mutex)
 
+#define _Py_FOR_EACH_TSTATE_UNLOCKED(interp, t) \
+    for (PyThreadState *t = interp->threads.head; t; t = t->next)
+#define _Py_FOR_EACH_TSTATE_BEGIN(interp, t) \
+    HEAD_LOCK(interp->runtime); \
+    _Py_FOR_EACH_TSTATE_UNLOCKED(interp, t)
+#define _Py_FOR_EACH_TSTATE_END(interp) \
+    HEAD_UNLOCK(interp->runtime)
+
+
 // Get the configuration of the current interpreter.
 // The caller must hold the GIL.
 // Export for test_peg_generator.
diff --git a/Objects/codeobject.c b/Objects/codeobject.c
index ec5d8a5..148350c 100644
--- a/Objects/codeobject.c
+++ b/Objects/codeobject.c
@@ -2895,20 +2895,22 @@ get_indices_in_use(PyInterpreterState *interp, struct flag_set *in_use)
     assert(interp->stoptheworld.world_stopped);
     assert(in_use->flags == NULL);
     int32_t max_index = 0;
-    for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
+    _Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
         int32_t idx = ((_PyThreadStateImpl *) p)->tlbc_index;
         if (idx > max_index) {
             max_index = idx;
         }
     }
+    _Py_FOR_EACH_TSTATE_END(interp);
     in_use->size = (size_t) max_index + 1;
     in_use->flags = PyMem_Calloc(in_use->size, sizeof(*in_use->flags));
     if (in_use->flags == NULL) {
         return -1;
     }
-    for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
+    _Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
         in_use->flags[((_PyThreadStateImpl *) p)->tlbc_index] = 1;
     }
+    _Py_FOR_EACH_TSTATE_END(interp);
     return 0;
 }
 
diff --git a/Objects/object.c b/Objects/object.c
index b115bc7..8868fa2 100644
--- a/Objects/object.c
+++ b/Objects/object.c
@@ -119,7 +119,7 @@ get_reftotal(PyInterpreterState *interp)
        since we can't determine which interpreter updated it. */
     Py_ssize_t total = REFTOTAL(interp);
 #ifdef Py_GIL_DISABLED
-    for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
+    _Py_FOR_EACH_TSTATE_UNLOCKED(interp, p) {
         /* This may race with other threads modifications to their reftotal */
         _PyThreadStateImpl *tstate_impl = (_PyThreadStateImpl *)p;
         total += _Py_atomic_load_ssize_relaxed(&tstate_impl->reftotal);
diff --git a/Objects/obmalloc.c b/Objects/obmalloc.c
index 3d6b1ab..2cc0377 100644
--- a/Objects/obmalloc.c
+++ b/Objects/obmalloc.c
@@ -1439,7 +1439,7 @@ get_mimalloc_allocated_blocks(PyInterpreterState *interp)
 {
     size_t allocated_blocks = 0;
 #ifdef Py_GIL_DISABLED
-    for (PyThreadState *t = interp->threads.head; t != NULL; t = t->next) {
+    _Py_FOR_EACH_TSTATE_UNLOCKED(interp, t) {
         _PyThreadStateImpl *tstate = (_PyThreadStateImpl *)t;
         for (int i = 0; i < _Py_MIMALLOC_HEAP_COUNT; i++) {
             mi_heap_t *heap = &tstate->mimalloc.heaps[i];
diff --git a/Python/ceval.c b/Python/ceval.c
index 892dc5f..2a39385 100644
--- a/Python/ceval.c
+++ b/Python/ceval.c
@@ -296,11 +296,12 @@ Py_SetRecursionLimit(int new_limit)
 {
     PyInterpreterState *interp = _PyInterpreterState_GET();
     interp->ceval.recursion_limit = new_limit;
-    for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
+    _Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
         int depth = p->py_recursion_limit - p->py_recursion_remaining;
         p->py_recursion_limit = new_limit;
         p->py_recursion_remaining = new_limit - depth;
     }
+    _Py_FOR_EACH_TSTATE_END(interp);
 }
 
 /* The function _Py_EnterRecursiveCallTstate() only calls _Py_CheckRecursiveCall()
diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c
index 4c9f59f..1f811e7 100644
--- a/Python/ceval_gil.c
+++ b/Python/ceval_gil.c
@@ -977,25 +977,19 @@ make_pending_calls(PyThreadState *tstate)
 void
 _Py_set_eval_breaker_bit_all(PyInterpreterState *interp, uintptr_t bit)
 {
-    _PyRuntimeState *runtime = &_PyRuntime;
-
-    HEAD_LOCK(runtime);
-    for (PyThreadState *tstate = interp->threads.head; tstate != NULL; tstate = tstate->next) {
+    _Py_FOR_EACH_TSTATE_BEGIN(interp, tstate) {
         _Py_set_eval_breaker_bit(tstate, bit);
     }
-    HEAD_UNLOCK(runtime);
+    _Py_FOR_EACH_TSTATE_END(interp);
 }
 
 void
 _Py_unset_eval_breaker_bit_all(PyInterpreterState *interp, uintptr_t bit)
 {
-    _PyRuntimeState *runtime = &_PyRuntime;
-
-    HEAD_LOCK(runtime);
-    for (PyThreadState *tstate = interp->threads.head; tstate != NULL; tstate = tstate->next) {
+    _Py_FOR_EACH_TSTATE_BEGIN(interp, tstate) {
         _Py_unset_eval_breaker_bit(tstate, bit);
     }
-    HEAD_UNLOCK(runtime);
+    _Py_FOR_EACH_TSTATE_END(interp);
 }
 
 void
diff --git a/Python/gc_free_threading.c b/Python/gc_free_threading.c
index 0920c61..f7f4440 100644
--- a/Python/gc_free_threading.c
+++ b/Python/gc_free_threading.c
@@ -304,7 +304,7 @@ gc_visit_heaps_lock_held(PyInterpreterState *interp, mi_block_visit_fun *visitor
     Py_ssize_t offset_pre = offset_base + 2 * sizeof(PyObject*);
 
     // visit each thread's heaps for GC objects
-    for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
+    _Py_FOR_EACH_TSTATE_UNLOCKED(interp, p) {
         struct _mimalloc_thread_state *m = &((_PyThreadStateImpl *)p)->mimalloc;
         if (!_Py_atomic_load_int(&m->initialized)) {
             // The thread may not have called tstate_mimalloc_bind() yet.
@@ -374,8 +374,7 @@ gc_visit_stackref(_PyStackRef stackref)
 static void
 gc_visit_thread_stacks(PyInterpreterState *interp)
 {
-    HEAD_LOCK(&_PyRuntime);
-    for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
+    _Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
         for (_PyInterpreterFrame *f = p->current_frame; f != NULL; f = f->previous) {
             PyObject *executable = PyStackRef_AsPyObjectBorrow(f->f_executable);
             if (executable == NULL || !PyCode_Check(executable)) {
@@ -390,7 +389,7 @@ gc_visit_thread_stacks(PyInterpreterState *interp)
             }
         }
     }
-    HEAD_UNLOCK(&_PyRuntime);
+    _Py_FOR_EACH_TSTATE_END(interp);
 }
 
 static void
@@ -444,14 +443,13 @@ process_delayed_frees(PyInterpreterState *interp, struct collection_state *state
 
     // Merge the queues from other threads into our own queue so that we can
     // process all of the pending delayed free requests at once.
-    HEAD_LOCK(&_PyRuntime);
-    for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
+    _Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
         _PyThreadStateImpl *other = (_PyThreadStateImpl *)p;
         if (other != current_tstate) {
             llist_concat(&current_tstate->mem_free_queue, &other->mem_free_queue);
         }
     }
-    HEAD_UNLOCK(&_PyRuntime);
+    _Py_FOR_EACH_TSTATE_END(interp);
 
     _PyMem_ProcessDelayedNoDealloc((PyThreadState *)current_tstate, queue_freed_object, state);
 }
@@ -1234,8 +1232,7 @@ gc_collect_internal(PyInterpreterState *interp, struct collection_state *state,
         state->gcstate->old[i-1].count = 0;
     }
 
-    HEAD_LOCK(&_PyRuntime);
-    for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
+    _Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
         _PyThreadStateImpl *tstate = (_PyThreadStateImpl *)p;
 
         // merge per-thread refcount for types into the type's actual refcount
@@ -1244,7 +1241,7 @@ gc_collect_internal(PyInterpreterState *interp, struct collection_state *state,
         // merge refcounts for all queued objects
         merge_queued_objects(tstate, state);
     }
-    HEAD_UNLOCK(&_PyRuntime);
+    _Py_FOR_EACH_TSTATE_END(interp);
 
     process_delayed_frees(interp, state);
 
@@ -1993,13 +1990,11 @@ PyUnstable_GC_VisitObjects(gcvisitobjects_t callback, void *arg)
 void
 _PyGC_ClearAllFreeLists(PyInterpreterState *interp)
 {
-    HEAD_LOCK(&_PyRuntime);
-    _PyThreadStateImpl *tstate = (_PyThreadStateImpl *)interp->threads.head;
-    while (tstate != NULL) {
+    _Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
+        _PyThreadStateImpl *tstate = (_PyThreadStateImpl *)p;
         _PyObject_ClearFreeLists(&tstate->freelists, 0);
-        tstate = (_PyThreadStateImpl *)tstate->base.next;
     }
-    HEAD_UNLOCK(&_PyRuntime);
+    _Py_FOR_EACH_TSTATE_END(interp);
 }
 
 #endif  // Py_GIL_DISABLED
diff --git a/Python/instrumentation.c b/Python/instrumentation.c
index 87c2add..3503809 100644
--- a/Python/instrumentation.c
+++ b/Python/instrumentation.c
@@ -1006,13 +1006,10 @@ set_global_version(PyThreadState *tstate, uint32_t version)
 
 #ifdef Py_GIL_DISABLED
     // Set the version on all threads in free-threaded builds.
-    _PyRuntimeState *runtime = &_PyRuntime;
-    HEAD_LOCK(runtime);
-    for (tstate = interp->threads.head; tstate;
-         tstate = PyThreadState_Next(tstate)) {
+    _Py_FOR_EACH_TSTATE_BEGIN(interp, tstate) {
         set_version_raw(&tstate->eval_breaker, version);
     };
-    HEAD_UNLOCK(runtime);
+    _Py_FOR_EACH_TSTATE_END(interp);
 #else
     // Normal builds take the current version from instrumentation_version when
     // attaching a thread, so we only have to set the current thread's version.
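As an illustration of the pattern the hunks above rely on, here is a minimal,
self-contained model of the three macros from the pycore_pystate.h hunk. It is
a sketch only: it uses a pthread mutex in place of CPython's PyMutex, and the
names (struct interp, struct tstate, FOR_EACH_TSTATE_*) are toy stand-ins, not
CPython identifiers. The BEGIN macro takes the "head" lock and then expands to
a plain for statement, so the caller's brace block becomes the loop body; END
is a separate statement that releases the lock. The UNLOCKED macro is the same
traversal with no locking, which the call sites above use only where the
thread list cannot change (the world is stopped, or the runtime is finalizing)
or where a racy read is acceptable (get_reftotal).

/* Illustrative sketch only -- not part of this patch. */
#include <pthread.h>
#include <stdio.h>

struct tstate { int id; struct tstate *next; };
struct interp { pthread_mutex_t head_mutex; struct tstate *head; };

/* Same shape as the new macros, with pthreads standing in for PyMutex. */
#define FOR_EACH_TSTATE_UNLOCKED(interp, t) \
    for (struct tstate *t = (interp)->head; t; t = t->next)
#define FOR_EACH_TSTATE_BEGIN(interp, t) \
    pthread_mutex_lock(&(interp)->head_mutex); \
    FOR_EACH_TSTATE_UNLOCKED(interp, t)
#define FOR_EACH_TSTATE_END(interp) \
    pthread_mutex_unlock(&(interp)->head_mutex)

int main(void)
{
    struct tstate t2 = {2, NULL};
    struct tstate t1 = {1, &t2};
    struct interp in = {PTHREAD_MUTEX_INITIALIZER, &t1};

    /* BEGIN locks, then the brace block becomes the body of the for loop. */
    FOR_EACH_TSTATE_BEGIN(&in, t) {
        printf("visiting thread state %d\n", t->id);
    }
    /* END is a separate statement; the lock is held until here. */
    FOR_EACH_TSTATE_END(&in);
    return 0;
}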
diff --git a/Python/pystate.c b/Python/pystate.c
index 44f55be..975eb6d 100644
--- a/Python/pystate.c
+++ b/Python/pystate.c
@@ -790,18 +790,15 @@ interpreter_clear(PyInterpreterState *interp, PyThreadState *tstate)
     }
 
     // Clear the current/main thread state last.
-    HEAD_LOCK(runtime);
-    PyThreadState *p = interp->threads.head;
-    HEAD_UNLOCK(runtime);
-    while (p != NULL) {
+    _Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
         // See https://github.com/python/cpython/issues/102126
         // Must be called without HEAD_LOCK held as it can deadlock
         // if any finalizer tries to acquire that lock.
+        HEAD_UNLOCK(runtime);
         PyThreadState_Clear(p);
         HEAD_LOCK(runtime);
-        p = p->next;
-        HEAD_UNLOCK(runtime);
     }
+    _Py_FOR_EACH_TSTATE_END(interp);
     if (tstate->interp == interp) {
         /* We fix tstate->_status below when we for sure aren't using it
            (e.g. no longer need the GIL). */
@@ -1801,10 +1798,9 @@ tstate_delete_common(PyThreadState *tstate, int release_gil)
 static void
 zapthreads(PyInterpreterState *interp)
 {
-    PyThreadState *tstate;
     /* No need to lock the mutex here because this should only happen
        when the threads are all really dead (XXX famous last words). */
-    while ((tstate = interp->threads.head) != NULL) {
+    _Py_FOR_EACH_TSTATE_UNLOCKED(interp, tstate) {
         tstate_verify_not_active(tstate);
         tstate_delete_common(tstate, 0);
         free_threadstate((_PyThreadStateImpl *)tstate);
@@ -2161,7 +2157,7 @@ decrement_stoptheworld_countdown(struct _stoptheworld_state *stw)
 }
 
 #ifdef Py_GIL_DISABLED
-// Interpreter for _Py_FOR_EACH_THREAD(). For global stop-the-world events,
+// Interpreter for _Py_FOR_EACH_STW_INTERP(). For global stop-the-world events,
 // we start with the first interpreter and then iterate over all interpreters.
 // For per-interpreter stop-the-world events, we only operate on the one
 // interpreter.
@@ -2176,10 +2172,9 @@ interp_for_stop_the_world(struct _stoptheworld_state *stw)
 // Loops over threads for a stop-the-world event.
 // For global: all threads in all interpreters
 // For per-interpreter: all threads in the interpreter
-#define _Py_FOR_EACH_THREAD(stw, i, t) \
-    for (i = interp_for_stop_the_world((stw)); \
-         i != NULL; i = ((stw->is_global) ? i->next : NULL)) \
-        for (t = i->threads.head; t; t = t->next)
+#define _Py_FOR_EACH_STW_INTERP(stw, i) \
+    for (PyInterpreterState *i = interp_for_stop_the_world((stw)); \
+         i != NULL; i = ((stw->is_global) ? i->next : NULL))
 
 
 // Try to transition threads atomically from the "detached" state to the
@@ -2188,19 +2183,19 @@ static bool
 park_detached_threads(struct _stoptheworld_state *stw)
 {
     int num_parked = 0;
-    PyInterpreterState *i;
-    PyThreadState *t;
-    _Py_FOR_EACH_THREAD(stw, i, t) {
-        int state = _Py_atomic_load_int_relaxed(&t->state);
-        if (state == _Py_THREAD_DETACHED) {
-            // Atomically transition to "suspended" if in "detached" state.
-            if (_Py_atomic_compare_exchange_int(&t->state,
-                                                &state, _Py_THREAD_SUSPENDED)) {
-                num_parked++;
+    _Py_FOR_EACH_STW_INTERP(stw, i) {
+        _Py_FOR_EACH_TSTATE_UNLOCKED(i, t) {
+            int state = _Py_atomic_load_int_relaxed(&t->state);
+            if (state == _Py_THREAD_DETACHED) {
+                // Atomically transition to "suspended" if in "detached" state.
+                if (_Py_atomic_compare_exchange_int(
+                        &t->state, &state, _Py_THREAD_SUSPENDED)) {
+                    num_parked++;
+                }
+            }
+            else if (state == _Py_THREAD_ATTACHED && t != stw->requester) {
+                _Py_set_eval_breaker_bit(t, _PY_EVAL_PLEASE_STOP_BIT);
             }
-        }
-        else if (state == _Py_THREAD_ATTACHED && t != stw->requester) {
-            _Py_set_eval_breaker_bit(t, _PY_EVAL_PLEASE_STOP_BIT);
-        }
+        }
     }
     stw->thread_countdown -= num_parked;
@@ -2227,12 +2222,12 @@ stop_the_world(struct _stoptheworld_state *stw)
     stw->stop_event = (PyEvent){0};  // zero-initialize (unset)
     stw->requester = _PyThreadState_GET();  // may be NULL
 
-    PyInterpreterState *i;
-    PyThreadState *t;
-    _Py_FOR_EACH_THREAD(stw, i, t) {
-        if (t != stw->requester) {
-            // Count all the other threads (we don't wait on ourself).
-            stw->thread_countdown++;
+    _Py_FOR_EACH_STW_INTERP(stw, i) {
+        _Py_FOR_EACH_TSTATE_UNLOCKED(i, t) {
+            if (t != stw->requester) {
+                // Count all the other threads (we don't wait on ourself).
+                stw->thread_countdown++;
+            }
         }
     }
 
@@ -2273,14 +2268,14 @@ start_the_world(struct _stoptheworld_state *stw)
     stw->requested = 0;
     stw->world_stopped = 0;
     // Switch threads back to the detached state.
-    PyInterpreterState *i;
-    PyThreadState *t;
-    _Py_FOR_EACH_THREAD(stw, i, t) {
-        if (t != stw->requester) {
-            assert(_Py_atomic_load_int_relaxed(&t->state) ==
-                   _Py_THREAD_SUSPENDED);
-            _Py_atomic_store_int(&t->state, _Py_THREAD_DETACHED);
-            _PyParkingLot_UnparkAll(&t->state);
+    _Py_FOR_EACH_STW_INTERP(stw, i) {
+        _Py_FOR_EACH_TSTATE_UNLOCKED(i, t) {
+            if (t != stw->requester) {
+                assert(_Py_atomic_load_int_relaxed(&t->state) ==
+                       _Py_THREAD_SUSPENDED);
+                _Py_atomic_store_int(&t->state, _Py_THREAD_DETACHED);
+                _PyParkingLot_UnparkAll(&t->state);
+            }
         }
     }
     stw->requester = NULL;
@@ -2344,7 +2339,6 @@ _PyEval_StartTheWorld(PyInterpreterState *interp)
 int
 PyThreadState_SetAsyncExc(unsigned long id, PyObject *exc)
 {
-    _PyRuntimeState *runtime = &_PyRuntime;
     PyInterpreterState *interp = _PyInterpreterState_GET();
 
     /* Although the GIL is held, a few C API functions can be called
@@ -2353,12 +2347,16 @@ PyThreadState_SetAsyncExc(unsigned long id, PyObject *exc)
      * list of thread states we're traversing, so to prevent that we lock
     * head_mutex for the duration.
      */
-    HEAD_LOCK(runtime);
-    for (PyThreadState *tstate = interp->threads.head; tstate != NULL; tstate = tstate->next) {
-        if (tstate->thread_id != id) {
-            continue;
+    PyThreadState *tstate = NULL;
+    _Py_FOR_EACH_TSTATE_BEGIN(interp, t) {
+        if (t->thread_id == id) {
+            tstate = t;
+            break;
         }
+    }
+    _Py_FOR_EACH_TSTATE_END(interp);
 
+    if (tstate != NULL) {
         /* Tricky: we need to decref the current value
          * (if any) in tstate->async_exc, but that can in turn
          * allow arbitrary Python code to run, including
@@ -2368,14 +2366,12 @@ PyThreadState_SetAsyncExc(unsigned long id, PyObject *exc)
          */
         Py_XINCREF(exc);
         PyObject *old_exc = _Py_atomic_exchange_ptr(&tstate->async_exc, exc);
-        HEAD_UNLOCK(runtime);
 
         Py_XDECREF(old_exc);
         _Py_set_eval_breaker_bit(tstate, _PY_ASYNC_EXCEPTION_BIT);
-        return 1;
     }
-    HEAD_UNLOCK(runtime);
-    return 0;
+
+    return tstate != NULL;
 }
 
 //---------------------------------
@@ -2515,8 +2511,7 @@ _PyThread_CurrentFrames(void)
     HEAD_LOCK(runtime);
     PyInterpreterState *i;
     for (i = runtime->interpreters.head; i != NULL; i = i->next) {
-        PyThreadState *t;
-        for (t = i->threads.head; t != NULL; t = t->next) {
+        _Py_FOR_EACH_TSTATE_UNLOCKED(i, t) {
             _PyInterpreterFrame *frame = t->current_frame;
             frame = _PyFrame_GetFirstComplete(frame);
             if (frame == NULL) {
@@ -2581,8 +2576,7 @@ _PyThread_CurrentExceptions(void)
     HEAD_LOCK(runtime);
     PyInterpreterState *i;
    for (i = runtime->interpreters.head; i != NULL; i = i->next) {
-        PyThreadState *t;
-        for (t = i->threads.head; t != NULL; t = t->next) {
+        _Py_FOR_EACH_TSTATE_UNLOCKED(i, t) {
             _PyErr_StackItem *err_info = _PyErr_GetTopmostException(t);
             if (err_info == NULL) {
                 continue;
-- 
cgit v0.12
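The pystate.c hunks show the two less-mechanical shapes this pattern takes,
sketched below with the same toy model as the earlier note (again, the names
are illustrative stand-ins, not CPython's). First, the find-then-break shape
from the reworked PyThreadState_SetAsyncExc(): breaking out of the loop does
not release the lock, so control must still reach the END macro. Second, the
unlock-around-call shape from interpreter_clear(): the lock is dropped around
PyThreadState_Clear() because finalizers it runs may try to take that same
lock, and re-taken before the loop advances to the next thread state.

/* Illustrative sketch only -- not part of this patch. */
#include <pthread.h>
#include <stdio.h>

struct tstate { int id; struct tstate *next; };
struct interp { pthread_mutex_t head_mutex; struct tstate *head; };

#define FOR_EACH_TSTATE_UNLOCKED(interp, t) \
    for (struct tstate *t = (interp)->head; t; t = t->next)
#define FOR_EACH_TSTATE_BEGIN(interp, t) \
    pthread_mutex_lock(&(interp)->head_mutex); \
    FOR_EACH_TSTATE_UNLOCKED(interp, t)
#define FOR_EACH_TSTATE_END(interp) \
    pthread_mutex_unlock(&(interp)->head_mutex)

/* Find-then-break, as in the new PyThreadState_SetAsyncExc(): remember the
 * match, break out, and release the lock afterwards with the END macro. */
static struct tstate *
find_tstate(struct interp *in, int id)
{
    struct tstate *found = NULL;
    FOR_EACH_TSTATE_BEGIN(in, t) {
        if (t->id == id) {
            found = t;
            break;  /* exits the loop; the lock is still held here */
        }
    }
    FOR_EACH_TSTATE_END(in);  /* always reached, match or not */
    return found;
}

/* Unlock-around-call, as in the reworked interpreter_clear(): release the
 * lock around a call that may itself take it, then re-take it before the
 * loop advances to t->next. */
static void
clear_each(struct interp *in, void (*clear)(struct tstate *))
{
    FOR_EACH_TSTATE_BEGIN(in, t) {
        pthread_mutex_unlock(&in->head_mutex);
        clear(t);  /* must not run with the lock held (deadlock risk) */
        pthread_mutex_lock(&in->head_mutex);
    }
    FOR_EACH_TSTATE_END(in);
}

static void print_clear(struct tstate *t) { printf("clearing %d\n", t->id); }

int main(void)
{
    struct tstate t2 = {2, NULL};
    struct tstate t1 = {1, &t2};
    struct interp in = {PTHREAD_MUTEX_INITIALIZER, &t1};

    struct tstate *t = find_tstate(&in, 2);
    printf("found: %d\n", t ? t->id : -1);
    clear_each(&in, print_clear);
    return 0;
}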