diff options
author | Sam Gross <colesbury@gmail.com> | 2024-01-23 18:08:23 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-23 18:08:23 (GMT) |
commit | 441affc9e7f419ef0b68f734505fa2f79fe653c7 (patch) | |
tree | 1c7771190fffc7927ab70f485500521a96de7938 /Python | |
parent | 5f1997896d9c3ecf92e9863177c452b468a6a2c8 (diff) | |
download | cpython-441affc9e7f419ef0b68f734505fa2f79fe653c7.zip cpython-441affc9e7f419ef0b68f734505fa2f79fe653c7.tar.gz cpython-441affc9e7f419ef0b68f734505fa2f79fe653c7.tar.bz2 |
gh-111964: Implement stop-the-world pauses (gh-112471)
The `--disable-gil` builds occasionally need to pause all but one thread. Some
examples include:
* Cyclic garbage collection, where this is often called a "stop the world event"
* Before calling `fork()`, to ensure a consistent state for internal data structures
* During interpreter shutdown, to ensure that daemon threads aren't accessing Python objects
This adds the following functions to implement global and per-interpreter pauses:
* `_PyEval_StopTheWorldAll()` and `_PyEval_StartTheWorldAll()` (for the global runtime)
* `_PyEval_StopTheWorld()` and `_PyEval_StartTheWorld()` (per-interpreter)
(The function names may change.)
These functions are no-ops outside of the `--disable-gil` build.
Diffstat (limited to 'Python')
-rw-r--r-- | Python/ceval_gil.c | 9 | ||||
-rw-r--r-- | Python/pystate.c | 269 |
2 files changed, 264 insertions, 14 deletions
diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index d70abbc..f3b1692 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -949,6 +949,15 @@ _Py_HandlePending(PyThreadState *tstate) { PyInterpreterState *interp = tstate->interp; + /* Stop-the-world */ + if (_Py_eval_breaker_bit_is_set(interp, _PY_EVAL_PLEASE_STOP_BIT)) { + _Py_set_eval_breaker_bit(interp, _PY_EVAL_PLEASE_STOP_BIT, 0); + _PyThreadState_Suspend(tstate); + + /* The attach blocks until the stop-the-world event is complete. */ + _PyThreadState_Attach(tstate); + } + /* Pending signals */ if (_Py_eval_breaker_bit_is_set(interp, _PY_SIGNALS_PENDING_BIT)) { if (handle_signals(tstate) != 0) { diff --git a/Python/pystate.c b/Python/pystate.c index 9999762..23ddc78 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -1336,6 +1336,11 @@ init_threadstate(_PyThreadStateImpl *_tstate, tstate->datastack_limit = NULL; tstate->what_event = -1; + if (interp->stoptheworld.requested || _PyRuntime.stoptheworld.requested) { + // Start in the suspended state if there is an ongoing stop-the-world. + tstate->state = _Py_THREAD_SUSPENDED; + } + tstate->_status.initialized = 1; } @@ -1562,6 +1567,9 @@ PyThreadState_Clear(PyThreadState *tstate) // XXX Do it as early in the function as possible. } +static void +decrement_stoptheworld_countdown(struct _stoptheworld_state *stw); + /* Common code for PyThreadState_Delete() and PyThreadState_DeleteCurrent() */ static void tstate_delete_common(PyThreadState *tstate) @@ -1585,6 +1593,16 @@ tstate_delete_common(PyThreadState *tstate) if (tstate->next) { tstate->next->prev = tstate->prev; } + if (tstate->state != _Py_THREAD_SUSPENDED) { + // Any ongoing stop-the-world request should not wait for us because + // our thread is getting deleted. + if (interp->stoptheworld.requested) { + decrement_stoptheworld_countdown(&interp->stoptheworld); + } + if (runtime->stoptheworld.requested) { + decrement_stoptheworld_countdown(&runtime->stoptheworld); + } + } HEAD_UNLOCK(runtime); // XXX Unbind in PyThreadState_Clear(), or earlier @@ -1790,13 +1808,9 @@ tstate_try_attach(PyThreadState *tstate) { #ifdef Py_GIL_DISABLED int expected = _Py_THREAD_DETACHED; - if (_Py_atomic_compare_exchange_int( - &tstate->state, - &expected, - _Py_THREAD_ATTACHED)) { - return 1; - } - return 0; + return _Py_atomic_compare_exchange_int(&tstate->state, + &expected, + _Py_THREAD_ATTACHED); #else assert(tstate->state == _Py_THREAD_DETACHED); tstate->state = _Py_THREAD_ATTACHED; @@ -1815,6 +1829,20 @@ tstate_set_detached(PyThreadState *tstate) #endif } +static void +tstate_wait_attach(PyThreadState *tstate) +{ + do { + int expected = _Py_THREAD_SUSPENDED; + + // Wait until we're switched out of SUSPENDED to DETACHED. + _PyParkingLot_Park(&tstate->state, &expected, sizeof(tstate->state), + /*timeout=*/-1, NULL, /*detach=*/0); + + // Once we're back in DETACHED we can re-attach + } while (!tstate_try_attach(tstate)); +} + void _PyThreadState_Attach(PyThreadState *tstate) { @@ -1836,10 +1864,7 @@ _PyThreadState_Attach(PyThreadState *tstate) tstate_activate(tstate); if (!tstate_try_attach(tstate)) { - // TODO: Once stop-the-world GC is implemented for --disable-gil builds - // this will need to wait until the GC completes. For now, this case - // should never happen. - Py_FatalError("thread attach failed"); + tstate_wait_attach(tstate); } // Resume previous critical section. This acquires the lock(s) from the @@ -1853,8 +1878,8 @@ _PyThreadState_Attach(PyThreadState *tstate) #endif } -void -_PyThreadState_Detach(PyThreadState *tstate) +static void +detach_thread(PyThreadState *tstate, int detached_state) { // XXX assert(tstate_is_alive(tstate) && tstate_is_bound(tstate)); assert(tstate->state == _Py_THREAD_ATTACHED); @@ -1862,12 +1887,228 @@ _PyThreadState_Detach(PyThreadState *tstate) if (tstate->critical_section != 0) { _PyCriticalSection_SuspendAll(tstate); } - tstate_set_detached(tstate); tstate_deactivate(tstate); + tstate_set_detached(tstate); current_fast_clear(&_PyRuntime); _PyEval_ReleaseLock(tstate->interp, tstate); } +void +_PyThreadState_Detach(PyThreadState *tstate) +{ + detach_thread(tstate, _Py_THREAD_DETACHED); +} + +void +_PyThreadState_Suspend(PyThreadState *tstate) +{ + _PyRuntimeState *runtime = &_PyRuntime; + + assert(tstate->state == _Py_THREAD_ATTACHED); + + struct _stoptheworld_state *stw = NULL; + HEAD_LOCK(runtime); + if (runtime->stoptheworld.requested) { + stw = &runtime->stoptheworld; + } + else if (tstate->interp->stoptheworld.requested) { + stw = &tstate->interp->stoptheworld; + } + HEAD_UNLOCK(runtime); + + if (stw == NULL) { + // Switch directly to "detached" if there is no active stop-the-world + // request. + detach_thread(tstate, _Py_THREAD_DETACHED); + return; + } + + // Switch to "suspended" state. + detach_thread(tstate, _Py_THREAD_SUSPENDED); + + // Decrease the count of remaining threads needing to park. + HEAD_LOCK(runtime); + decrement_stoptheworld_countdown(stw); + HEAD_UNLOCK(runtime); +} + +// Decrease stop-the-world counter of remaining number of threads that need to +// pause. If we are the final thread to pause, notify the requesting thread. +static void +decrement_stoptheworld_countdown(struct _stoptheworld_state *stw) +{ + assert(stw->thread_countdown > 0); + if (--stw->thread_countdown == 0) { + _PyEvent_Notify(&stw->stop_event); + } +} + +#ifdef Py_GIL_DISABLED +// Interpreter for _Py_FOR_EACH_THREAD(). For global stop-the-world events, +// we start with the first interpreter and then iterate over all interpreters. +// For per-interpreter stop-the-world events, we only operate on the one +// interpreter. +static PyInterpreterState * +interp_for_stop_the_world(struct _stoptheworld_state *stw) +{ + return (stw->is_global + ? PyInterpreterState_Head() + : _Py_CONTAINER_OF(stw, PyInterpreterState, stoptheworld)); +} + +// Loops over threads for a stop-the-world event. +// For global: all threads in all interpreters +// For per-interpreter: all threads in the interpreter +#define _Py_FOR_EACH_THREAD(stw, i, t) \ + for (i = interp_for_stop_the_world((stw)); \ + i != NULL; i = ((stw->is_global) ? i->next : NULL)) \ + for (t = i->threads.head; t; t = t->next) + + +// Try to transition threads atomically from the "detached" state to the +// "gc stopped" state. Returns true if all threads are in the "gc stopped" +static bool +park_detached_threads(struct _stoptheworld_state *stw) +{ + int num_parked = 0; + PyInterpreterState *i; + PyThreadState *t; + _Py_FOR_EACH_THREAD(stw, i, t) { + int state = _Py_atomic_load_int_relaxed(&t->state); + if (state == _Py_THREAD_DETACHED) { + // Atomically transition to "suspended" if in "detached" state. + if (_Py_atomic_compare_exchange_int(&t->state, + &state, _Py_THREAD_SUSPENDED)) { + num_parked++; + } + } + else if (state == _Py_THREAD_ATTACHED && t != stw->requester) { + // TODO: set this per-thread, rather than per-interpreter. + _Py_set_eval_breaker_bit(t->interp, _PY_EVAL_PLEASE_STOP_BIT, 1); + } + } + stw->thread_countdown -= num_parked; + assert(stw->thread_countdown >= 0); + return num_parked > 0 && stw->thread_countdown == 0; +} + +static void +stop_the_world(struct _stoptheworld_state *stw) +{ + _PyRuntimeState *runtime = &_PyRuntime; + + PyMutex_Lock(&stw->mutex); + if (stw->is_global) { + _PyRWMutex_Lock(&runtime->stoptheworld_mutex); + } + else { + _PyRWMutex_RLock(&runtime->stoptheworld_mutex); + } + + HEAD_LOCK(runtime); + stw->requested = 1; + stw->thread_countdown = 0; + stw->stop_event = (PyEvent){0}; // zero-initialize (unset) + stw->requester = _PyThreadState_GET(); // may be NULL + + PyInterpreterState *i; + PyThreadState *t; + _Py_FOR_EACH_THREAD(stw, i, t) { + if (t != stw->requester) { + // Count all the other threads (we don't wait on ourself). + stw->thread_countdown++; + } + } + + if (stw->thread_countdown == 0) { + HEAD_UNLOCK(runtime); + stw->world_stopped = 1; + return; + } + + for (;;) { + // Switch threads that are detached to the GC stopped state + bool stopped_all_threads = park_detached_threads(stw); + HEAD_UNLOCK(runtime); + + if (stopped_all_threads) { + break; + } + + _PyTime_t wait_ns = 1000*1000; // 1ms (arbitrary, may need tuning) + if (PyEvent_WaitTimed(&stw->stop_event, wait_ns)) { + assert(stw->thread_countdown == 0); + break; + } + + HEAD_LOCK(runtime); + } + stw->world_stopped = 1; +} + +static void +start_the_world(struct _stoptheworld_state *stw) +{ + _PyRuntimeState *runtime = &_PyRuntime; + assert(PyMutex_IsLocked(&stw->mutex)); + + HEAD_LOCK(runtime); + stw->requested = 0; + stw->world_stopped = 0; + stw->requester = NULL; + // Switch threads back to the detached state. + PyInterpreterState *i; + PyThreadState *t; + _Py_FOR_EACH_THREAD(stw, i, t) { + if (t != stw->requester) { + assert(t->state == _Py_THREAD_SUSPENDED); + _Py_atomic_store_int(&t->state, _Py_THREAD_DETACHED); + _PyParkingLot_UnparkAll(&t->state); + } + } + HEAD_UNLOCK(runtime); + if (stw->is_global) { + _PyRWMutex_Unlock(&runtime->stoptheworld_mutex); + } + else { + _PyRWMutex_RUnlock(&runtime->stoptheworld_mutex); + } + PyMutex_Unlock(&stw->mutex); +} +#endif // Py_GIL_DISABLED + +void +_PyEval_StopTheWorldAll(_PyRuntimeState *runtime) +{ +#ifdef Py_GIL_DISABLED + stop_the_world(&runtime->stoptheworld); +#endif +} + +void +_PyEval_StartTheWorldAll(_PyRuntimeState *runtime) +{ +#ifdef Py_GIL_DISABLED + start_the_world(&runtime->stoptheworld); +#endif +} + +void +_PyEval_StopTheWorld(PyInterpreterState *interp) +{ +#ifdef Py_GIL_DISABLED + stop_the_world(&interp->stoptheworld); +#endif +} + +void +_PyEval_StartTheWorld(PyInterpreterState *interp) +{ +#ifdef Py_GIL_DISABLED + start_the_world(&interp->stoptheworld); +#endif +} + //---------- // other API //---------- |