diff options
Diffstat (limited to 'Python')
-rw-r--r-- | Python/brc.c | 198 | ||||
-rw-r--r-- | Python/ceval_gil.c | 8 | ||||
-rw-r--r-- | Python/gc_free_threading.c | 46 | ||||
-rw-r--r-- | Python/object_stack.c | 21 | ||||
-rw-r--r-- | Python/pystate.c | 11 |
5 files changed, 282 insertions, 2 deletions
diff --git a/Python/brc.c b/Python/brc.c new file mode 100644 index 0000000..f1fd57a --- /dev/null +++ b/Python/brc.c @@ -0,0 +1,198 @@ +// Implementation of biased reference counting inter-thread queue. +// +// Biased reference counting maintains two refcount fields in each object: +// ob_ref_local and ob_ref_shared. The true refcount is the sum of these two +// fields. In some cases, when refcounting operations are split across threads, +// the ob_ref_shared field can be negative (although the total refcount must +// be at least zero). In this case, the thread that decremented the refcount +// requests that the owning thread give up ownership and merge the refcount +// fields. This file implements the mechanism for doing so. +// +// Each thread state maintains a queue of objects whose refcounts it should +// merge. The thread states are stored in a per-interpreter hash table by +// thread id. The hash table has a fixed size and uses a linked list to store +// thread states within each bucket. +// +// The queueing thread uses the eval breaker mechanism to notify the owning +// thread that it has objects to merge. Additionaly, all queued objects are +// merged during GC. +#include "Python.h" +#include "pycore_object.h" // _Py_ExplicitMergeRefcount +#include "pycore_brc.h" // struct _brc_thread_state +#include "pycore_ceval.h" // _Py_set_eval_breaker_bit +#include "pycore_llist.h" // struct llist_node +#include "pycore_pystate.h" // _PyThreadStateImpl + +#ifdef Py_GIL_DISABLED + +// Get the hashtable bucket for a given thread id. +static struct _brc_bucket * +get_bucket(PyInterpreterState *interp, uintptr_t tid) +{ + return &interp->brc.table[tid % _Py_BRC_NUM_BUCKETS]; +} + +// Find the thread state in a hash table bucket by thread id. +static _PyThreadStateImpl * +find_thread_state(struct _brc_bucket *bucket, uintptr_t thread_id) +{ + struct llist_node *node; + llist_for_each(node, &bucket->root) { + // Get the containing _PyThreadStateImpl from the linked-list node. + _PyThreadStateImpl *ts = llist_data(node, _PyThreadStateImpl, + brc.bucket_node); + if (ts->brc.tid == thread_id) { + return ts; + } + } + return NULL; +} + +// Enqueue an object to be merged by the owning thread. This steals a +// reference to the object. +void +_Py_brc_queue_object(PyObject *ob) +{ + PyInterpreterState *interp = _PyInterpreterState_GET(); + + uintptr_t ob_tid = _Py_atomic_load_uintptr(&ob->ob_tid); + if (ob_tid == 0) { + // The owning thread may have concurrently decided to merge the + // refcount fields. + Py_DECREF(ob); + return; + } + + struct _brc_bucket *bucket = get_bucket(interp, ob_tid); + PyMutex_Lock(&bucket->mutex); + _PyThreadStateImpl *tstate = find_thread_state(bucket, ob_tid); + if (tstate == NULL) { + // If we didn't find the owning thread then it must have already exited. + // It's safe (and necessary) to merge the refcount. Subtract one when + // merging because we've stolen a reference. + Py_ssize_t refcount = _Py_ExplicitMergeRefcount(ob, -1); + PyMutex_Unlock(&bucket->mutex); + if (refcount == 0) { + _Py_Dealloc(ob); + } + return; + } + + if (_PyObjectStack_Push(&tstate->brc.objects_to_merge, ob) < 0) { + PyMutex_Unlock(&bucket->mutex); + + // Fall back to stopping all threads and manually merging the refcount + // if we can't enqueue the object to be merged. + _PyEval_StopTheWorld(interp); + Py_ssize_t refcount = _Py_ExplicitMergeRefcount(ob, -1); + _PyEval_StartTheWorld(interp); + + if (refcount == 0) { + _Py_Dealloc(ob); + } + return; + } + + // Notify owning thread + _Py_set_eval_breaker_bit(interp, _PY_EVAL_EXPLICIT_MERGE_BIT, 1); + + PyMutex_Unlock(&bucket->mutex); +} + +static void +merge_queued_objects(_PyObjectStack *to_merge) +{ + PyObject *ob; + while ((ob = _PyObjectStack_Pop(to_merge)) != NULL) { + // Subtract one when merging because the queue had a reference. + Py_ssize_t refcount = _Py_ExplicitMergeRefcount(ob, -1); + if (refcount == 0) { + _Py_Dealloc(ob); + } + } +} + +// Process this thread's queue of objects to merge. +void +_Py_brc_merge_refcounts(PyThreadState *tstate) +{ + struct _brc_thread_state *brc = &((_PyThreadStateImpl *)tstate)->brc; + struct _brc_bucket *bucket = get_bucket(tstate->interp, brc->tid); + + // Append all objects into a local stack. We don't want to hold the lock + // while calling destructors. + PyMutex_Lock(&bucket->mutex); + _PyObjectStack_Merge(&brc->local_objects_to_merge, &brc->objects_to_merge); + PyMutex_Unlock(&bucket->mutex); + + // Process the local stack until it's empty + merge_queued_objects(&brc->local_objects_to_merge); +} + +void +_Py_brc_init_state(PyInterpreterState *interp) +{ + struct _brc_state *brc = &interp->brc; + for (Py_ssize_t i = 0; i < _Py_BRC_NUM_BUCKETS; i++) { + llist_init(&brc->table[i].root); + } +} + +void +_Py_brc_init_thread(PyThreadState *tstate) +{ + struct _brc_thread_state *brc = &((_PyThreadStateImpl *)tstate)->brc; + brc->tid = _Py_ThreadId(); + + // Add ourself to the hashtable + struct _brc_bucket *bucket = get_bucket(tstate->interp, brc->tid); + PyMutex_Lock(&bucket->mutex); + llist_insert_tail(&bucket->root, &brc->bucket_node); + PyMutex_Unlock(&bucket->mutex); +} + +void +_Py_brc_remove_thread(PyThreadState *tstate) +{ + struct _brc_thread_state *brc = &((_PyThreadStateImpl *)tstate)->brc; + struct _brc_bucket *bucket = get_bucket(tstate->interp, brc->tid); + + // We need to fully process any objects to merge before removing ourself + // from the hashtable. It is not safe to perform any refcount operations + // after we are removed. After that point, other threads treat our objects + // as abandoned and may merge the objects' refcounts directly. + bool empty = false; + while (!empty) { + // Process the local stack until it's empty + merge_queued_objects(&brc->local_objects_to_merge); + + PyMutex_Lock(&bucket->mutex); + empty = (brc->objects_to_merge.head == NULL); + if (empty) { + llist_remove(&brc->bucket_node); + } + else { + _PyObjectStack_Merge(&brc->local_objects_to_merge, + &brc->objects_to_merge); + } + PyMutex_Unlock(&bucket->mutex); + } + + assert(brc->local_objects_to_merge.head == NULL); + assert(brc->objects_to_merge.head == NULL); +} + +void +_Py_brc_after_fork(PyInterpreterState *interp) +{ + // Unlock all bucket mutexes. Some of the buckets may be locked because + // locks can be handed off to a parked thread (see lock.c). We don't have + // to worry about consistency here, becuase no thread can be actively + // modifying a bucket, but it might be paused (not yet woken up) on a + // PyMutex_Lock while holding that lock. + for (Py_ssize_t i = 0; i < _Py_BRC_NUM_BUCKETS; i++) { + _PyMutex_at_fork_reinit(&interp->brc.table[i].mutex); + } +} + +#endif /* Py_GIL_DISABLED */ diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c index ad90359..deb9741 100644 --- a/Python/ceval_gil.c +++ b/Python/ceval_gil.c @@ -980,6 +980,14 @@ _Py_HandlePending(PyThreadState *tstate) } } +#ifdef Py_GIL_DISABLED + /* Objects with refcounts to merge */ + if (_Py_eval_breaker_bit_is_set(interp, _PY_EVAL_EXPLICIT_MERGE_BIT)) { + _Py_set_eval_breaker_bit(interp, _PY_EVAL_EXPLICIT_MERGE_BIT, 0); + _Py_brc_merge_refcounts(tstate); + } +#endif + /* GC scheduled to run */ if (_Py_eval_breaker_bit_is_set(interp, _PY_GC_SCHEDULED_BIT)) { _Py_set_eval_breaker_bit(interp, _PY_GC_SCHEDULED_BIT, 0); diff --git a/Python/gc_free_threading.c b/Python/gc_free_threading.c index 8fbcdb1..5d3b097 100644 --- a/Python/gc_free_threading.c +++ b/Python/gc_free_threading.c @@ -1,5 +1,6 @@ // Cyclic garbage collector implementation for free-threaded build. #include "Python.h" +#include "pycore_brc.h" // struct _brc_thread_state #include "pycore_ceval.h" // _Py_set_eval_breaker_bit() #include "pycore_context.h" #include "pycore_dict.h" // _PyDict_MaybeUntrack() @@ -152,8 +153,7 @@ gc_decref(PyObject *op) op->ob_tid -= 1; } -// Merge refcounts while the world is stopped. -static void +static Py_ssize_t merge_refcount(PyObject *op, Py_ssize_t extra) { assert(_PyInterpreterState_GET()->stoptheworld.world_stopped); @@ -169,6 +169,7 @@ merge_refcount(PyObject *op, Py_ssize_t extra) op->ob_tid = 0; op->ob_ref_local = 0; op->ob_ref_shared = _Py_REF_SHARED(refcount, _Py_REF_MERGED); + return refcount; } static void @@ -282,6 +283,41 @@ gc_visit_heaps(PyInterpreterState *interp, mi_block_visit_fun *visitor, return err; } +static void +merge_queued_objects(_PyThreadStateImpl *tstate, struct collection_state *state) +{ + struct _brc_thread_state *brc = &tstate->brc; + _PyObjectStack_Merge(&brc->local_objects_to_merge, &brc->objects_to_merge); + + PyObject *op; + while ((op = _PyObjectStack_Pop(&brc->local_objects_to_merge)) != NULL) { + // Subtract one when merging because the queue had a reference. + Py_ssize_t refcount = merge_refcount(op, -1); + + if (!_PyObject_GC_IS_TRACKED(op) && refcount == 0) { + // GC objects with zero refcount are handled subsequently by the + // GC as if they were cyclic trash, but we have to handle dead + // non-GC objects here. Add one to the refcount so that we can + // decref and deallocate the object once we start the world again. + op->ob_ref_shared += (1 << _Py_REF_SHARED_SHIFT); +#ifdef Py_REF_DEBUG + _Py_IncRefTotal(_PyInterpreterState_GET()); +#endif + worklist_push(&state->objs_to_decref, op); + } + } +} + +static void +merge_all_queued_objects(PyInterpreterState *interp, struct collection_state *state) +{ + HEAD_LOCK(&_PyRuntime); + for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) { + merge_queued_objects((_PyThreadStateImpl *)p, state); + } + HEAD_UNLOCK(&_PyRuntime); +} + // Subtract an incoming reference from the computed "gc_refs" refcount. static int visit_decref(PyObject *op, void *arg) @@ -927,6 +963,9 @@ static void gc_collect_internal(PyInterpreterState *interp, struct collection_state *state) { _PyEval_StopTheWorld(interp); + // merge refcounts for all queued objects + merge_all_queued_objects(interp, state); + // Find unreachable objects int err = deduce_unreachable_heap(interp, state); if (err < 0) { @@ -946,6 +985,9 @@ gc_collect_internal(PyInterpreterState *interp, struct collection_state *state) clear_weakrefs(state); _PyEval_StartTheWorld(interp); + // Deallocate any object from the refcount merge step + cleanup_worklist(&state->objs_to_decref); + // Call weakref callbacks and finalizers after unpausing other threads to // avoid potential deadlocks. call_weakref_callbacks(state); diff --git a/Python/object_stack.c b/Python/object_stack.c index 8544892..ced4460 100644 --- a/Python/object_stack.c +++ b/Python/object_stack.c @@ -68,6 +68,27 @@ _PyObjectStack_Clear(_PyObjectStack *queue) } void +_PyObjectStack_Merge(_PyObjectStack *dst, _PyObjectStack *src) +{ + if (src->head == NULL) { + return; + } + + if (dst->head != NULL) { + // First, append dst to the bottom of src + _PyObjectStackChunk *last = src->head; + while (last->prev != NULL) { + last = last->prev; + } + last->prev = dst->head; + } + + // Now that src has all the chunks, set dst to src + dst->head = src->head; + src->head = NULL; +} + +void _PyObjectStackChunk_ClearFreeList(_PyFreeListState *free_lists, int is_finalization) { if (!is_finalization) { diff --git a/Python/pystate.c b/Python/pystate.c index e77e5bf..6cd0347 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -611,6 +611,9 @@ init_interpreter(PyInterpreterState *interp, _PyGC_InitState(&interp->gc); PyConfig_InitPythonConfig(&interp->config); _PyType_InitCache(interp); +#ifdef Py_GIL_DISABLED + _Py_brc_init_state(interp); +#endif for (int i = 0; i < _PY_MONITORING_UNGROUPED_EVENTS; i++) { interp->monitors.tools[i] = 0; } @@ -1336,6 +1339,11 @@ init_threadstate(_PyThreadStateImpl *_tstate, tstate->datastack_limit = NULL; tstate->what_event = -1; +#ifdef Py_GIL_DISABLED + // Initialize biased reference counting inter-thread queue + _Py_brc_init_thread(tstate); +#endif + if (interp->stoptheworld.requested || _PyRuntime.stoptheworld.requested) { // Start in the suspended state if there is an ongoing stop-the-world. tstate->state = _Py_THREAD_SUSPENDED; @@ -1561,6 +1569,9 @@ PyThreadState_Clear(PyThreadState *tstate) _PyFreeListState *freelist_state = &((_PyThreadStateImpl*)tstate)->freelist_state; _Py_ClearFreeLists(freelist_state, 1); _PySlice_ClearCache(freelist_state); + + // Remove ourself from the biased reference counting table of threads. + _Py_brc_remove_thread(tstate); #endif _PyThreadState_ClearMimallocHeaps(tstate); |