summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Include/cpython/ceval.h2
-rw-r--r--Include/internal/pycore_ceval.h3
-rw-r--r--Include/internal/pycore_ceval_state.h38
-rw-r--r--Include/internal/pycore_pystate.h8
-rw-r--r--Lib/test/support/threading_helper.py9
-rw-r--r--Lib/test/test_capi/test_misc.py401
-rw-r--r--Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst9
-rw-r--r--Modules/_queuemodule.c3
-rw-r--r--Modules/_testinternalcapi.c122
-rw-r--r--Modules/_threadmodule.c3
-rw-r--r--Modules/signalmodule.c6
-rw-r--r--Python/ceval.c55
-rw-r--r--Python/ceval_gil.c213
-rw-r--r--Python/pylifecycle.c3
-rw-r--r--Python/pystate.c3
-rw-r--r--Tools/c-analyzer/cpython/ignored.tsv1
16 files changed, 761 insertions, 118 deletions
diff --git a/Include/cpython/ceval.h b/Include/cpython/ceval.h
index 0fbbee1..a9616bd 100644
--- a/Include/cpython/ceval.h
+++ b/Include/cpython/ceval.h
@@ -22,6 +22,8 @@ PyAPI_FUNC(PyObject *) _PyEval_EvalFrameDefault(PyThreadState *tstate, struct _P
PyAPI_FUNC(void) _PyEval_SetSwitchInterval(unsigned long microseconds);
PyAPI_FUNC(unsigned long) _PyEval_GetSwitchInterval(void);
+PyAPI_FUNC(int) _PyEval_MakePendingCalls(PyThreadState *);
+
PyAPI_FUNC(Py_ssize_t) PyUnstable_Eval_RequestCodeExtraIndex(freefunc);
// Old name -- remove when this API changes:
_Py_DEPRECATED_EXTERNALLY(3.12) static inline Py_ssize_t
diff --git a/Include/internal/pycore_ceval.h b/Include/internal/pycore_ceval.h
index ca27037..9e9b523 100644
--- a/Include/internal/pycore_ceval.h
+++ b/Include/internal/pycore_ceval.h
@@ -27,7 +27,8 @@ PyAPI_FUNC(void) _PyEval_SignalReceived(PyInterpreterState *interp);
PyAPI_FUNC(int) _PyEval_AddPendingCall(
PyInterpreterState *interp,
int (*func)(void *),
- void *arg);
+ void *arg,
+ int mainthreadonly);
PyAPI_FUNC(void) _PyEval_SignalAsyncExc(PyInterpreterState *interp);
#ifdef HAVE_FORK
extern PyStatus _PyEval_ReInitThreads(PyThreadState *tstate);
diff --git a/Include/internal/pycore_ceval_state.h b/Include/internal/pycore_ceval_state.h
index 95d1fa1..e56e43c 100644
--- a/Include/internal/pycore_ceval_state.h
+++ b/Include/internal/pycore_ceval_state.h
@@ -13,6 +13,24 @@ extern "C" {
#include "pycore_gil.h" // struct _gil_runtime_state
+struct _pending_calls {
+ int busy;
+ PyThread_type_lock lock;
+ /* Request for running pending calls. */
+ _Py_atomic_int calls_to_do;
+ /* Request for looking at the `async_exc` field of the current
+ thread state.
+ Guarded by the GIL. */
+ int async_exc;
+#define NPENDINGCALLS 32
+ struct _pending_call {
+ int (*func)(void *);
+ void *arg;
+ } calls[NPENDINGCALLS];
+ int first;
+ int last;
+};
+
typedef enum {
PERF_STATUS_FAILED = -1, // Perf trampoline is in an invalid state
PERF_STATUS_NO_INIT = 0, // Perf trampoline is not initialized
@@ -49,6 +67,8 @@ struct _ceval_runtime_state {
the main thread of the main interpreter can handle signals: see
_Py_ThreadCanHandleSignals(). */
_Py_atomic_int signals_pending;
+ /* Pending calls to be made only on the main thread. */
+ struct _pending_calls pending_mainthread;
};
#ifdef PY_HAVE_PERF_TRAMPOLINE
@@ -62,24 +82,6 @@ struct _ceval_runtime_state {
#endif
-struct _pending_calls {
- int busy;
- PyThread_type_lock lock;
- /* Request for running pending calls. */
- _Py_atomic_int calls_to_do;
- /* Request for looking at the `async_exc` field of the current
- thread state.
- Guarded by the GIL. */
- int async_exc;
-#define NPENDINGCALLS 32
- struct {
- int (*func)(void *);
- void *arg;
- } calls[NPENDINGCALLS];
- int first;
- int last;
-};
-
struct _ceval_state {
/* This single variable consolidates all requests to break out of
the fast path in the eval loop. */
diff --git a/Include/internal/pycore_pystate.h b/Include/internal/pycore_pystate.h
index daa40cf..43652c4 100644
--- a/Include/internal/pycore_pystate.h
+++ b/Include/internal/pycore_pystate.h
@@ -60,14 +60,6 @@ _Py_ThreadCanHandleSignals(PyInterpreterState *interp)
}
-/* Only execute pending calls on the main thread. */
-static inline int
-_Py_ThreadCanHandlePendingCalls(void)
-{
- return _Py_IsMainThread();
-}
-
-
/* Variable and static inline functions for in-line access to current thread
and interpreter state */
diff --git a/Lib/test/support/threading_helper.py b/Lib/test/support/threading_helper.py
index b9973c8..7f16050 100644
--- a/Lib/test/support/threading_helper.py
+++ b/Lib/test/support/threading_helper.py
@@ -115,7 +115,11 @@ def join_thread(thread, timeout=None):
@contextlib.contextmanager
def start_threads(threads, unlock=None):
- import faulthandler
+ try:
+ import faulthandler
+ except ImportError:
+ # It isn't supported on subinterpreters yet.
+ faulthandler = None
threads = list(threads)
started = []
try:
@@ -147,7 +151,8 @@ def start_threads(threads, unlock=None):
finally:
started = [t for t in started if t.is_alive()]
if started:
- faulthandler.dump_traceback(sys.stdout)
+ if faulthandler is not None:
+ faulthandler.dump_traceback(sys.stdout)
raise AssertionError('Unable to join %d threads' % len(started))
diff --git a/Lib/test/test_capi/test_misc.py b/Lib/test/test_capi/test_misc.py
index 04a0f8f..58e1a83 100644
--- a/Lib/test/test_capi/test_misc.py
+++ b/Lib/test/test_capi/test_misc.py
@@ -2,17 +2,20 @@
# these are all functions _testcapi exports whose name begins with 'test_'.
import _thread
-from collections import OrderedDict
+from collections import OrderedDict, deque
import contextlib
import importlib.machinery
import importlib.util
+import json
import os
import pickle
+import queue
import random
import sys
import textwrap
import threading
import time
+import types
import unittest
import warnings
import weakref
@@ -36,6 +39,10 @@ try:
import _testsinglephase
except ImportError:
_testsinglephase = None
+try:
+ import _xxsubinterpreters as _interpreters
+except ModuleNotFoundError:
+ _interpreters = None
# Skip this test if the _testcapi module isn't available.
_testcapi = import_helper.import_module('_testcapi')
@@ -47,6 +54,12 @@ def decode_stderr(err):
return err.decode('utf-8', 'replace').replace('\r', '')
+def requires_subinterpreters(meth):
+ """Decorator to skip a test if subinterpreters are not supported."""
+ return unittest.skipIf(_interpreters is None,
+ 'subinterpreters required')(meth)
+
+
def testfunction(self):
"""some doc"""
return self
@@ -1259,6 +1272,10 @@ class TestHeapTypeRelative(unittest.TestCase):
class TestPendingCalls(unittest.TestCase):
+ # See the comment in ceval.c (at the "handle_eval_breaker" label)
+ # about when pending calls get run. This is especially relevant
+ # here for creating deterministic tests.
+
def pendingcalls_submit(self, l, n):
def callback():
#this function can be interrupted by thread switching so let's
@@ -1341,6 +1358,388 @@ class TestPendingCalls(unittest.TestCase):
gen = genf()
self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
+ class PendingTask(types.SimpleNamespace):
+
+ _add_pending = _testinternalcapi.pending_threadfunc
+
+ def __init__(self, req, taskid=None, notify_done=None):
+ self.id = taskid
+ self.req = req
+ self.notify_done = notify_done
+
+ self.creator_tid = threading.get_ident()
+ self.requester_tid = None
+ self.runner_tid = None
+ self.result = None
+
+ def run(self):
+ assert self.result is None
+ self.runner_tid = threading.get_ident()
+ self._run()
+ if self.notify_done is not None:
+ self.notify_done()
+
+ def _run(self):
+ self.result = self.req
+
+ def run_in_pending_call(self, worker_tids):
+ assert self._add_pending is _testinternalcapi.pending_threadfunc
+ self.requester_tid = threading.get_ident()
+ def callback():
+ assert self.result is None
+ # It can be tricky to control which thread handles
+ # the eval breaker, so we take a naive approach to
+ # make sure.
+ if threading.get_ident() not in worker_tids:
+ self._add_pending(callback, ensure_added=True)
+ return
+ self.run()
+ self._add_pending(callback, ensure_added=True)
+
+ def create_thread(self, worker_tids):
+ return threading.Thread(
+ target=self.run_in_pending_call,
+ args=(worker_tids,),
+ )
+
+ def wait_for_result(self):
+ while self.result is None:
+ time.sleep(0.01)
+
+ def test_subthreads_can_handle_pending_calls(self):
+ payload = 'Spam spam spam spam. Lovely spam! Wonderful spam!'
+
+ task = self.PendingTask(payload)
+ def do_the_work():
+ tid = threading.get_ident()
+ t = task.create_thread({tid})
+ with threading_helper.start_threads([t]):
+ task.wait_for_result()
+ t = threading.Thread(target=do_the_work)
+ with threading_helper.start_threads([t]):
+ pass
+
+ self.assertEqual(task.result, payload)
+
+ def test_many_subthreads_can_handle_pending_calls(self):
+ main_tid = threading.get_ident()
+ self.assertEqual(threading.main_thread().ident, main_tid)
+
+ # We can't use queue.Queue since it isn't reentrant relative
+ # to pending calls.
+ _queue = deque()
+ _active = deque()
+ _done_lock = threading.Lock()
+ def queue_put(task):
+ _queue.append(task)
+ _active.append(True)
+ def queue_get():
+ try:
+ task = _queue.popleft()
+ except IndexError:
+ raise queue.Empty
+ return task
+ def queue_task_done():
+ _active.pop()
+ if not _active:
+ try:
+ _done_lock.release()
+ except RuntimeError:
+ assert not _done_lock.locked()
+ def queue_empty():
+ return not _queue
+ def queue_join():
+ _done_lock.acquire()
+ _done_lock.release()
+
+ tasks = []
+ for i in range(20):
+ task = self.PendingTask(
+ req=f'request {i}',
+ taskid=i,
+ notify_done=queue_task_done,
+ )
+ tasks.append(task)
+ queue_put(task)
+ # This will be released once all the tasks have finished.
+ _done_lock.acquire()
+
+ def add_tasks(worker_tids):
+ while True:
+ if done:
+ return
+ try:
+ task = queue_get()
+ except queue.Empty:
+ break
+ task.run_in_pending_call(worker_tids)
+
+ done = False
+ def run_tasks():
+ while not queue_empty():
+ if done:
+ return
+ time.sleep(0.01)
+ # Give the worker a chance to handle any remaining pending calls.
+ while not done:
+ time.sleep(0.01)
+
+ # Start the workers and wait for them to finish.
+ worker_threads = [threading.Thread(target=run_tasks)
+ for _ in range(3)]
+ with threading_helper.start_threads(worker_threads):
+ try:
+ # Add a pending call for each task.
+ worker_tids = [t.ident for t in worker_threads]
+ threads = [threading.Thread(target=add_tasks, args=(worker_tids,))
+ for _ in range(3)]
+ with threading_helper.start_threads(threads):
+ try:
+ pass
+ except BaseException:
+ done = True
+ raise # re-raise
+ # Wait for the pending calls to finish.
+ queue_join()
+ # Notify the workers that they can stop.
+ done = True
+ except BaseException:
+ done = True
+ raise # re-raise
+ runner_tids = [t.runner_tid for t in tasks]
+
+ self.assertNotIn(main_tid, runner_tids)
+ for task in tasks:
+ with self.subTest(f'task {task.id}'):
+ self.assertNotEqual(task.requester_tid, main_tid)
+ self.assertNotEqual(task.requester_tid, task.runner_tid)
+ self.assertNotIn(task.requester_tid, runner_tids)
+
+ @requires_subinterpreters
+ def test_isolated_subinterpreter(self):
+ # We exercise the most important permutations.
+
+ # This test relies on pending calls getting called
+ # (eval breaker tripped) at each loop iteration
+ # and at each call.
+
+ maxtext = 250
+ main_interpid = 0
+ interpid = _interpreters.create()
+ _interpreters.run_string(interpid, f"""if True:
+ import json
+ import os
+ import threading
+ import time
+ import _testinternalcapi
+ from test.support import threading_helper
+ """)
+
+ def create_pipe():
+ r, w = os.pipe()
+ self.addCleanup(lambda: os.close(r))
+ self.addCleanup(lambda: os.close(w))
+ return r, w
+
+ with self.subTest('add in main, run in subinterpreter'):
+ r_ready, w_ready = create_pipe()
+ r_done, w_done= create_pipe()
+ timeout = time.time() + 30 # seconds
+
+ def do_work():
+ _interpreters.run_string(interpid, f"""if True:
+ # Wait until this interp has handled the pending call.
+ waiting = False
+ done = False
+ def wait(os_read=os.read):
+ global done, waiting
+ waiting = True
+ os_read({r_done}, 1)
+ done = True
+ t = threading.Thread(target=wait)
+ with threading_helper.start_threads([t]):
+ while not waiting:
+ pass
+ os.write({w_ready}, b'\\0')
+ # Loop to trigger the eval breaker.
+ while not done:
+ time.sleep(0.01)
+ if time.time() > {timeout}:
+ raise Exception('timed out!')
+ """)
+ t = threading.Thread(target=do_work)
+ with threading_helper.start_threads([t]):
+ os.read(r_ready, 1)
+ # Add the pending call and wait for it to finish.
+ actual = _testinternalcapi.pending_identify(interpid)
+ # Signal the subinterpreter to stop.
+ os.write(w_done, b'\0')
+
+ self.assertEqual(actual, int(interpid))
+
+ with self.subTest('add in main, run in subinterpreter sub-thread'):
+ r_ready, w_ready = create_pipe()
+ r_done, w_done= create_pipe()
+ timeout = time.time() + 30 # seconds
+
+ def do_work():
+ _interpreters.run_string(interpid, f"""if True:
+ waiting = False
+ done = False
+ def subthread():
+ while not waiting:
+ pass
+ os.write({w_ready}, b'\\0')
+ # Loop to trigger the eval breaker.
+ while not done:
+ time.sleep(0.01)
+ if time.time() > {timeout}:
+ raise Exception('timed out!')
+ t = threading.Thread(target=subthread)
+ with threading_helper.start_threads([t]):
+ # Wait until this interp has handled the pending call.
+ waiting = True
+ os.read({r_done}, 1)
+ done = True
+ """)
+ t = threading.Thread(target=do_work)
+ with threading_helper.start_threads([t]):
+ os.read(r_ready, 1)
+ # Add the pending call and wait for it to finish.
+ actual = _testinternalcapi.pending_identify(interpid)
+ # Signal the subinterpreter to stop.
+ os.write(w_done, b'\0')
+
+ self.assertEqual(actual, int(interpid))
+
+ with self.subTest('add in subinterpreter, run in main'):
+ r_ready, w_ready = create_pipe()
+ r_done, w_done= create_pipe()
+ r_data, w_data= create_pipe()
+ timeout = time.time() + 30 # seconds
+
+ def add_job():
+ os.read(r_ready, 1)
+ _interpreters.run_string(interpid, f"""if True:
+ # Add the pending call and wait for it to finish.
+ actual = _testinternalcapi.pending_identify({main_interpid})
+ # Signal the subinterpreter to stop.
+ os.write({w_done}, b'\\0')
+ os.write({w_data}, actual.to_bytes(1, 'little'))
+ """)
+ # Wait until this interp has handled the pending call.
+ waiting = False
+ done = False
+ def wait(os_read=os.read):
+ nonlocal done, waiting
+ waiting = True
+ os_read(r_done, 1)
+ done = True
+ t1 = threading.Thread(target=add_job)
+ t2 = threading.Thread(target=wait)
+ with threading_helper.start_threads([t1, t2]):
+ while not waiting:
+ pass
+ os.write(w_ready, b'\0')
+ # Loop to trigger the eval breaker.
+ while not done:
+ time.sleep(0.01)
+ if time.time() > timeout:
+ raise Exception('timed out!')
+ text = os.read(r_data, 1)
+ actual = int.from_bytes(text, 'little')
+
+ self.assertEqual(actual, int(main_interpid))
+
+ with self.subTest('add in subinterpreter, run in sub-thread'):
+ r_ready, w_ready = create_pipe()
+ r_done, w_done= create_pipe()
+ r_data, w_data= create_pipe()
+ timeout = time.time() + 30 # seconds
+
+ def add_job():
+ os.read(r_ready, 1)
+ _interpreters.run_string(interpid, f"""if True:
+ # Add the pending call and wait for it to finish.
+ actual = _testinternalcapi.pending_identify({main_interpid})
+ # Signal the subinterpreter to stop.
+ os.write({w_done}, b'\\0')
+ os.write({w_data}, actual.to_bytes(1, 'little'))
+ """)
+ # Wait until this interp has handled the pending call.
+ waiting = False
+ done = False
+ def wait(os_read=os.read):
+ nonlocal done, waiting
+ waiting = True
+ os_read(r_done, 1)
+ done = True
+ def subthread():
+ while not waiting:
+ pass
+ os.write(w_ready, b'\0')
+ # Loop to trigger the eval breaker.
+ while not done:
+ time.sleep(0.01)
+ if time.time() > timeout:
+ raise Exception('timed out!')
+ t1 = threading.Thread(target=add_job)
+ t2 = threading.Thread(target=wait)
+ t3 = threading.Thread(target=subthread)
+ with threading_helper.start_threads([t1, t2, t3]):
+ pass
+ text = os.read(r_data, 1)
+ actual = int.from_bytes(text, 'little')
+
+ self.assertEqual(actual, int(main_interpid))
+
+ # XXX We can't use the rest until gh-105716 is fixed.
+ return
+
+ with self.subTest('add in subinterpreter, run in subinterpreter sub-thread'):
+ r_ready, w_ready = create_pipe()
+ r_done, w_done= create_pipe()
+ r_data, w_data= create_pipe()
+ timeout = time.time() + 30 # seconds
+
+ def do_work():
+ _interpreters.run_string(interpid, f"""if True:
+ waiting = False
+ done = False
+ def subthread():
+ while not waiting:
+ pass
+ os.write({w_ready}, b'\\0')
+ # Loop to trigger the eval breaker.
+ while not done:
+ time.sleep(0.01)
+ if time.time() > {timeout}:
+ raise Exception('timed out!')
+ t = threading.Thread(target=subthread)
+ with threading_helper.start_threads([t]):
+ # Wait until this interp has handled the pending call.
+ waiting = True
+ os.read({r_done}, 1)
+ done = True
+ """)
+ t = threading.Thread(target=do_work)
+ #with threading_helper.start_threads([t]):
+ t.start()
+ if True:
+ os.read(r_ready, 1)
+ _interpreters.run_string(interpid, f"""if True:
+ # Add the pending call and wait for it to finish.
+ actual = _testinternalcapi.pending_identify({interpid})
+ # Signal the subinterpreter to stop.
+ os.write({w_done}, b'\\0')
+ os.write({w_data}, actual.to_bytes(1, 'little'))
+ """)
+ t.join()
+ text = os.read(r_data, 1)
+ actual = int.from_bytes(text, 'little')
+
+ self.assertEqual(actual, int(interpid))
+
class SubinterpreterTest(unittest.TestCase):
diff --git a/Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst b/Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst
new file mode 100644
index 0000000..da29a8c
--- /dev/null
+++ b/Misc/NEWS.d/next/Core and Builtins/2023-06-02-15-15-41.gh-issue-104812.dfZiG5.rst
@@ -0,0 +1,9 @@
+The "pending call" machinery now works for all interpreters, not just the
+main interpreter, and runs in all threads, not just the main thread. Some
+calls are still only done in the main thread, ergo in the main interpreter.
+This change does not affect signal handling nor the existing public C-API
+(``Py_AddPendingCall()``), which both still only target the main thread.
+The new functionality is meant strictly for internal use for now, since
+consequences of its use are not well understood yet outside some very
+restricted cases. This change brings the capability in line with the
+intention when the state was made per-interpreter several years ago.
diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c
index d36a911..db5be84 100644
--- a/Modules/_queuemodule.c
+++ b/Modules/_queuemodule.c
@@ -210,6 +210,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
PyObject *item;
PyLockStatus r;
PY_TIMEOUT_T microseconds;
+ PyThreadState *tstate = PyThreadState_Get();
if (block == 0) {
/* Non-blocking */
@@ -253,7 +254,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
Py_END_ALLOW_THREADS
}
- if (r == PY_LOCK_INTR && Py_MakePendingCalls() < 0) {
+ if (r == PY_LOCK_INTR && _PyEval_MakePendingCalls(tstate) < 0) {
return NULL;
}
if (r == PY_LOCK_FAILURE) {
diff --git a/Modules/_testinternalcapi.c b/Modules/_testinternalcapi.c
index b43dc7f..3de32a3 100644
--- a/Modules/_testinternalcapi.c
+++ b/Modules/_testinternalcapi.c
@@ -13,16 +13,18 @@
#include "Python.h"
#include "frameobject.h"
+#include "interpreteridobject.h" // _PyInterpreterID_LookUp()
#include "pycore_atomic_funcs.h" // _Py_atomic_int_get()
#include "pycore_bitutils.h" // _Py_bswap32()
#include "pycore_compile.h" // _PyCompile_CodeGen, _PyCompile_OptimizeCfg, _PyCompile_Assemble
+#include "pycore_ceval.h" // _PyEval_AddPendingCall
#include "pycore_fileutils.h" // _Py_normpath
#include "pycore_frame.h" // _PyInterpreterFrame
#include "pycore_gc.h" // PyGC_Head
#include "pycore_hashtable.h" // _Py_hashtable_new()
#include "pycore_initconfig.h" // _Py_GetConfigsAsDict()
-#include "pycore_pathconfig.h" // _PyPathConfig_ClearGlobal()
#include "pycore_interp.h" // _PyInterpreterState_GetConfigCopy()
+#include "pycore_pathconfig.h" // _PyPathConfig_ClearGlobal()
#include "pycore_pyerrors.h" // _Py_UTF8_Edit_Cost()
#include "pycore_pystate.h" // _PyThreadState_GET()
#include "osdefs.h" // MAXPATHLEN
@@ -838,6 +840,120 @@ set_optimizer(PyObject *self, PyObject *opt)
Py_RETURN_NONE;
}
+
+static int _pending_callback(void *arg)
+{
+ /* we assume the argument is callable object to which we own a reference */
+ PyObject *callable = (PyObject *)arg;
+ PyObject *r = PyObject_CallNoArgs(callable);
+ Py_DECREF(callable);
+ Py_XDECREF(r);
+ return r != NULL ? 0 : -1;
+}
+
+/* The following requests n callbacks to _pending_callback. It can be
+ * run from any python thread.
+ */
+static PyObject *
+pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs)
+{
+ PyObject *callable;
+ int ensure_added = 0;
+ static char *kwlist[] = {"", "ensure_added", NULL};
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs,
+ "O|$p:pending_threadfunc", kwlist,
+ &callable, &ensure_added))
+ {
+ return NULL;
+ }
+ PyInterpreterState *interp = PyInterpreterState_Get();
+
+ /* create the reference for the callbackwhile we hold the lock */
+ Py_INCREF(callable);
+
+ int r;
+ Py_BEGIN_ALLOW_THREADS
+ r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
+ Py_END_ALLOW_THREADS
+ if (r < 0) {
+ /* unsuccessful add */
+ if (!ensure_added) {
+ Py_DECREF(callable);
+ Py_RETURN_FALSE;
+ }
+ do {
+ Py_BEGIN_ALLOW_THREADS
+ r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
+ Py_END_ALLOW_THREADS
+ } while (r < 0);
+ }
+
+ Py_RETURN_TRUE;
+}
+
+
+static struct {
+ int64_t interpid;
+} pending_identify_result;
+
+static int
+_pending_identify_callback(void *arg)
+{
+ PyThread_type_lock mutex = (PyThread_type_lock)arg;
+ assert(pending_identify_result.interpid == -1);
+ PyThreadState *tstate = PyThreadState_Get();
+ pending_identify_result.interpid = PyInterpreterState_GetID(tstate->interp);
+ PyThread_release_lock(mutex);
+ return 0;
+}
+
+static PyObject *
+pending_identify(PyObject *self, PyObject *args)
+{
+ PyObject *interpid;
+ if (!PyArg_ParseTuple(args, "O:pending_identify", &interpid)) {
+ return NULL;
+ }
+ PyInterpreterState *interp = _PyInterpreterID_LookUp(interpid);
+ if (interp == NULL) {
+ if (!PyErr_Occurred()) {
+ PyErr_SetString(PyExc_ValueError, "interpreter not found");
+ }
+ return NULL;
+ }
+
+ pending_identify_result.interpid = -1;
+
+ PyThread_type_lock mutex = PyThread_allocate_lock();
+ if (mutex == NULL) {
+ return NULL;
+ }
+ PyThread_acquire_lock(mutex, WAIT_LOCK);
+ /* It gets released in _pending_identify_callback(). */
+
+ int r;
+ do {
+ Py_BEGIN_ALLOW_THREADS
+ r = _PyEval_AddPendingCall(interp,
+ &_pending_identify_callback, (void *)mutex,
+ 0);
+ Py_END_ALLOW_THREADS
+ } while (r < 0);
+
+ /* Wait for the pending call to complete. */
+ PyThread_acquire_lock(mutex, WAIT_LOCK);
+ PyThread_release_lock(mutex);
+ PyThread_free_lock(mutex);
+
+ PyObject *res = PyLong_FromLongLong(pending_identify_result.interpid);
+ pending_identify_result.interpid = -1;
+ if (res == NULL) {
+ return NULL;
+ }
+ return res;
+}
+
+
static PyMethodDef module_functions[] = {
{"get_configs", get_configs, METH_NOARGS},
{"get_recursion_depth", get_recursion_depth, METH_NOARGS},
@@ -868,6 +984,10 @@ static PyMethodDef module_functions[] = {
{"iframe_getlasti", iframe_getlasti, METH_O, NULL},
{"set_optimizer", set_optimizer, METH_O, NULL},
{"get_counter_optimizer", get_counter_optimizer, METH_NOARGS, NULL},
+ {"pending_threadfunc", _PyCFunction_CAST(pending_threadfunc),
+ METH_VARARGS | METH_KEYWORDS},
+// {"pending_fd_identify", pending_fd_identify, METH_VARARGS, NULL},
+ {"pending_identify", pending_identify, METH_VARARGS, NULL},
{NULL, NULL} /* sentinel */
};
diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c
index b6f878e..c553d03 100644
--- a/Modules/_threadmodule.c
+++ b/Modules/_threadmodule.c
@@ -81,6 +81,7 @@ lock_dealloc(lockobject *self)
static PyLockStatus
acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
{
+ PyThreadState *tstate = _PyThreadState_GET();
_PyTime_t endtime = 0;
if (timeout > 0) {
endtime = _PyDeadline_Init(timeout);
@@ -103,7 +104,7 @@ acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
/* Run signal handlers if we were interrupted. Propagate
* exceptions from signal handlers, such as KeyboardInterrupt, by
* passing up PY_LOCK_INTR. */
- if (Py_MakePendingCalls() < 0) {
+ if (_PyEval_MakePendingCalls(tstate) < 0) {
return PY_LOCK_INTR;
}
diff --git a/Modules/signalmodule.c b/Modules/signalmodule.c
index 2350236..00ea434 100644
--- a/Modules/signalmodule.c
+++ b/Modules/signalmodule.c
@@ -314,7 +314,8 @@ trip_signal(int sig_num)
still use it for this exceptional case. */
_PyEval_AddPendingCall(interp,
report_wakeup_send_error,
- (void *)(intptr_t) last_error);
+ (void *)(intptr_t) last_error,
+ 1);
}
}
}
@@ -333,7 +334,8 @@ trip_signal(int sig_num)
still use it for this exceptional case. */
_PyEval_AddPendingCall(interp,
report_wakeup_write_error,
- (void *)(intptr_t)errno);
+ (void *)(intptr_t)errno,
+ 1);
}
}
}
diff --git a/Python/ceval.c b/Python/ceval.c
index e81b6be..b91f94d 100644
--- a/Python/ceval.c
+++ b/Python/ceval.c
@@ -758,6 +758,61 @@ handle_eval_breaker:
* We need to do reasonably frequently, but not too frequently.
* All loops should include a check of the eval breaker.
* We also check on return from any builtin function.
+ *
+ * ## More Details ###
+ *
+ * The eval loop (this function) normally executes the instructions
+ * of a code object sequentially. However, the runtime supports a
+ * number of out-of-band execution scenarios that may pause that
+ * sequential execution long enough to do that out-of-band work
+ * in the current thread using the current PyThreadState.
+ *
+ * The scenarios include:
+ *
+ * - cyclic garbage collection
+ * - GIL drop requests
+ * - "async" exceptions
+ * - "pending calls" (some only in the main thread)
+ * - signal handling (only in the main thread)
+ *
+ * When the need for one of the above is detected, the eval loop
+ * pauses long enough to handle the detected case. Then, if doing
+ * so didn't trigger an exception, the eval loop resumes executing
+ * the sequential instructions.
+ *
+ * To make this work, the eval loop periodically checks if any
+ * of the above needs to happen. The individual checks can be
+ * expensive if computed each time, so a while back we switched
+ * to using pre-computed, per-interpreter variables for the checks,
+ * and later consolidated that to a single "eval breaker" variable
+ * (now a PyInterpreterState field).
+ *
+ * For the longest time, the eval breaker check would happen
+ * frequently, every 5 or so times through the loop, regardless
+ * of what instruction ran last or what would run next. Then, in
+ * early 2021 (gh-18334, commit 4958f5d), we switched to checking
+ * the eval breaker less frequently, by hard-coding the check to
+ * specific places in the eval loop (e.g. certain instructions).
+ * The intent then was to check after returning from calls
+ * and on the back edges of loops.
+ *
+ * In addition to being more efficient, that approach keeps
+ * the eval loop from running arbitrary code between instructions
+ * that don't handle that well. (See gh-74174.)
+ *
+ * Currently, the eval breaker check happens here at the
+ * "handle_eval_breaker" label. Some instructions come here
+ * explicitly (goto) and some indirectly. Notably, the check
+ * happens on back edges in the control flow graph, which
+ * pretty much applies to all loops and most calls.
+ * (See bytecodes.c for exact information.)
+ *
+ * One consequence of this approach is that it might not be obvious
+ * how to force any specific thread to pick up the eval breaker,
+ * or for any specific thread to not pick it up. Mostly this
+ * involves judicious uses of locks and careful ordering of code,
+ * while avoiding code that might trigger the eval breaker
+ * until so desired.
*/
if (_Py_HandlePending(tstate) != 0) {
goto error;
diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c
index 723cf0f..bb1279f 100644
--- a/Python/ceval_gil.c
+++ b/Python/ceval_gil.c
@@ -68,8 +68,9 @@ COMPUTE_EVAL_BREAKER(PyInterpreterState *interp,
_Py_atomic_load_relaxed_int32(&ceval2->gil_drop_request)
| (_Py_atomic_load_relaxed_int32(&ceval->signals_pending)
&& _Py_ThreadCanHandleSignals(interp))
- | (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do)
- && _Py_ThreadCanHandlePendingCalls())
+ | (_Py_atomic_load_relaxed_int32(&ceval2->pending.calls_to_do))
+ | (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)
+ &&_Py_atomic_load_relaxed_int32(&ceval->pending_mainthread.calls_to_do))
| ceval2->pending.async_exc
| _Py_atomic_load_relaxed_int32(&ceval2->gc_scheduled));
}
@@ -95,11 +96,11 @@ RESET_GIL_DROP_REQUEST(PyInterpreterState *interp)
static inline void
-SIGNAL_PENDING_CALLS(PyInterpreterState *interp)
+SIGNAL_PENDING_CALLS(struct _pending_calls *pending, PyInterpreterState *interp)
{
struct _ceval_runtime_state *ceval = &interp->runtime->ceval;
struct _ceval_state *ceval2 = &interp->ceval;
- _Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 1);
+ _Py_atomic_store_relaxed(&pending->calls_to_do, 1);
COMPUTE_EVAL_BREAKER(interp, ceval, ceval2);
}
@@ -109,6 +110,9 @@ UNSIGNAL_PENDING_CALLS(PyInterpreterState *interp)
{
struct _ceval_runtime_state *ceval = &interp->runtime->ceval;
struct _ceval_state *ceval2 = &interp->ceval;
+ if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) {
+ _Py_atomic_store_relaxed(&ceval->pending_mainthread.calls_to_do, 0);
+ }
_Py_atomic_store_relaxed(&ceval2->pending.calls_to_do, 0);
COMPUTE_EVAL_BREAKER(interp, ceval, ceval2);
}
@@ -803,19 +807,31 @@ _push_pending_call(struct _pending_calls *pending,
return 0;
}
-/* Pop one item off the queue while holding the lock. */
-static void
-_pop_pending_call(struct _pending_calls *pending,
- int (**func)(void *), void **arg)
+static int
+_next_pending_call(struct _pending_calls *pending,
+ int (**func)(void *), void **arg)
{
int i = pending->first;
if (i == pending->last) {
- return; /* Queue empty */
+ /* Queue empty */
+ assert(pending->calls[i].func == NULL);
+ return -1;
}
-
*func = pending->calls[i].func;
*arg = pending->calls[i].arg;
- pending->first = (i + 1) % NPENDINGCALLS;
+ return i;
+}
+
+/* Pop one item off the queue while holding the lock. */
+static void
+_pop_pending_call(struct _pending_calls *pending,
+ int (**func)(void *), void **arg)
+{
+ int i = _next_pending_call(pending, func, arg);
+ if (i >= 0) {
+ pending->calls[i] = (struct _pending_call){0};
+ pending->first = (i + 1) % NPENDINGCALLS;
+ }
}
/* This implementation is thread-safe. It allows
@@ -825,9 +841,16 @@ _pop_pending_call(struct _pending_calls *pending,
int
_PyEval_AddPendingCall(PyInterpreterState *interp,
- int (*func)(void *), void *arg)
+ int (*func)(void *), void *arg,
+ int mainthreadonly)
{
+ assert(!mainthreadonly || _Py_IsMainInterpreter(interp));
struct _pending_calls *pending = &interp->ceval.pending;
+ if (mainthreadonly) {
+ /* The main thread only exists in the main interpreter. */
+ assert(_Py_IsMainInterpreter(interp));
+ pending = &_PyRuntime.ceval.pending_mainthread;
+ }
/* Ensure that _PyEval_InitState() was called
and that _PyEval_FiniState() is not called yet. */
assert(pending->lock != NULL);
@@ -837,39 +860,17 @@ _PyEval_AddPendingCall(PyInterpreterState *interp,
PyThread_release_lock(pending->lock);
/* signal main loop */
- SIGNAL_PENDING_CALLS(interp);
+ SIGNAL_PENDING_CALLS(pending, interp);
return result;
}
int
Py_AddPendingCall(int (*func)(void *), void *arg)
{
- /* Best-effort to support subinterpreters and calls with the GIL released.
-
- First attempt _PyThreadState_GET() since it supports subinterpreters.
-
- If the GIL is released, _PyThreadState_GET() returns NULL . In this
- case, use PyGILState_GetThisThreadState() which works even if the GIL
- is released.
-
- Sadly, PyGILState_GetThisThreadState() doesn't support subinterpreters:
- see bpo-10915 and bpo-15751.
-
- Py_AddPendingCall() doesn't require the caller to hold the GIL. */
- PyThreadState *tstate = _PyThreadState_GET();
- if (tstate == NULL) {
- tstate = PyGILState_GetThisThreadState();
- }
-
- PyInterpreterState *interp;
- if (tstate != NULL) {
- interp = tstate->interp;
- }
- else {
- /* Last resort: use the main interpreter */
- interp = _PyInterpreterState_Main();
- }
- return _PyEval_AddPendingCall(interp, func, arg);
+ /* Legacy users of this API will continue to target the main thread
+ (of the main interpreter). */
+ PyInterpreterState *interp = _PyInterpreterState_Main();
+ return _PyEval_AddPendingCall(interp, func, arg, 1);
}
static int
@@ -889,27 +890,24 @@ handle_signals(PyThreadState *tstate)
return 0;
}
-static int
-make_pending_calls(PyInterpreterState *interp)
+static inline int
+maybe_has_pending_calls(PyInterpreterState *interp)
{
- /* only execute pending calls on main thread */
- if (!_Py_ThreadCanHandlePendingCalls()) {
- return 0;
+ struct _pending_calls *pending = &interp->ceval.pending;
+ if (_Py_atomic_load_relaxed_int32(&pending->calls_to_do)) {
+ return 1;
}
-
- /* don't perform recursive pending calls */
- if (interp->ceval.pending.busy) {
+ if (!_Py_IsMainThread() || !_Py_IsMainInterpreter(interp)) {
return 0;
}
- interp->ceval.pending.busy = 1;
-
- /* unsignal before starting to call callbacks, so that any callback
- added in-between re-signals */
- UNSIGNAL_PENDING_CALLS(interp);
- int res = 0;
+ pending = &_PyRuntime.ceval.pending_mainthread;
+ return _Py_atomic_load_relaxed_int32(&pending->calls_to_do);
+}
+static int
+_make_pending_calls(struct _pending_calls *pending)
+{
/* perform a bounded number of calls, in case of recursion */
- struct _pending_calls *pending = &interp->ceval.pending;
for (int i=0; i<NPENDINGCALLS; i++) {
int (*func)(void *) = NULL;
void *arg = NULL;
@@ -923,19 +921,61 @@ make_pending_calls(PyInterpreterState *interp)
if (func == NULL) {
break;
}
- res = func(arg);
- if (res) {
- goto error;
+ if (func(arg) != 0) {
+ return -1;
}
}
+ return 0;
+}
+
+static int
+make_pending_calls(PyInterpreterState *interp)
+{
+ struct _pending_calls *pending = &interp->ceval.pending;
+ struct _pending_calls *pending_main = &_PyRuntime.ceval.pending_mainthread;
+
+ /* Only one thread (per interpreter) may run the pending calls
+ at once. In the same way, we don't do recursive pending calls. */
+ PyThread_acquire_lock(pending->lock, WAIT_LOCK);
+ if (pending->busy) {
+ /* A pending call was added after another thread was already
+ handling the pending calls (and had already "unsignaled").
+ Once that thread is done, it may have taken care of all the
+ pending calls, or there might be some still waiting.
+ Regardless, this interpreter's pending calls will stay
+ "signaled" until that first thread has finished. At that
+ point the next thread to trip the eval breaker will take
+ care of any remaining pending calls. Until then, though,
+ all the interpreter's threads will be tripping the eval
+ breaker every time it's checked. */
+ PyThread_release_lock(pending->lock);
+ return 0;
+ }
+ pending->busy = 1;
+ PyThread_release_lock(pending->lock);
+
+ /* unsignal before starting to call callbacks, so that any callback
+ added in-between re-signals */
+ UNSIGNAL_PENDING_CALLS(interp);
+
+ if (_make_pending_calls(pending) != 0) {
+ pending->busy = 0;
+ /* There might not be more calls to make, but we play it safe. */
+ SIGNAL_PENDING_CALLS(pending, interp);
+ return -1;
+ }
- interp->ceval.pending.busy = 0;
- return res;
+ if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) {
+ if (_make_pending_calls(pending_main) != 0) {
+ pending->busy = 0;
+ /* There might not be more calls to make, but we play it safe. */
+ SIGNAL_PENDING_CALLS(pending_main, interp);
+ return -1;
+ }
+ }
-error:
- interp->ceval.pending.busy = 0;
- SIGNAL_PENDING_CALLS(interp);
- return res;
+ pending->busy = 0;
+ return 0;
}
void
@@ -944,12 +984,6 @@ _Py_FinishPendingCalls(PyThreadState *tstate)
assert(PyGILState_Check());
assert(is_tstate_valid(tstate));
- struct _pending_calls *pending = &tstate->interp->ceval.pending;
-
- if (!_Py_atomic_load_relaxed_int32(&(pending->calls_to_do))) {
- return;
- }
-
if (make_pending_calls(tstate->interp) < 0) {
PyObject *exc = _PyErr_GetRaisedException(tstate);
PyErr_BadInternalCall();
@@ -958,6 +992,29 @@ _Py_FinishPendingCalls(PyThreadState *tstate)
}
}
+int
+_PyEval_MakePendingCalls(PyThreadState *tstate)
+{
+ int res;
+
+ if (_Py_IsMainThread() && _Py_IsMainInterpreter(tstate->interp)) {
+ /* Python signal handler doesn't really queue a callback:
+ it only signals that a signal was received,
+ see _PyEval_SignalReceived(). */
+ res = handle_signals(tstate);
+ if (res != 0) {
+ return res;
+ }
+ }
+
+ res = make_pending_calls(tstate->interp);
+ if (res != 0) {
+ return res;
+ }
+
+ return 0;
+}
+
/* Py_MakePendingCalls() is a simple wrapper for the sake
of backward-compatibility. */
int
@@ -968,19 +1025,11 @@ Py_MakePendingCalls(void)
PyThreadState *tstate = _PyThreadState_GET();
assert(is_tstate_valid(tstate));
- /* Python signal handler doesn't really queue a callback: it only signals
- that a signal was received, see _PyEval_SignalReceived(). */
- int res = handle_signals(tstate);
- if (res != 0) {
- return res;
- }
-
- res = make_pending_calls(tstate->interp);
- if (res != 0) {
- return res;
+ /* Only execute pending calls on the main thread. */
+ if (!_Py_IsMainThread() || !_Py_IsMainInterpreter(tstate->interp)) {
+ return 0;
}
-
- return 0;
+ return _PyEval_MakePendingCalls(tstate);
}
void
@@ -1020,7 +1069,7 @@ _Py_HandlePending(PyThreadState *tstate)
}
/* Pending calls */
- if (_Py_atomic_load_relaxed_int32(&interp_ceval_state->pending.calls_to_do)) {
+ if (maybe_has_pending_calls(tstate->interp)) {
if (make_pending_calls(tstate->interp) != 0) {
return -1;
}
diff --git a/Python/pylifecycle.c b/Python/pylifecycle.c
index 9ae03d4..d45b739 100644
--- a/Python/pylifecycle.c
+++ b/Python/pylifecycle.c
@@ -2152,6 +2152,9 @@ Py_EndInterpreter(PyThreadState *tstate)
// Wrap up existing "threading"-module-created, non-daemon threads.
wait_for_thread_shutdown(tstate);
+ // Make any remaining pending calls.
+ _Py_FinishPendingCalls(tstate);
+
_PyAtExit_Call(tstate->interp);
if (tstate != interp->threads.head || tstate->next != NULL) {
diff --git a/Python/pystate.c b/Python/pystate.c
index fb95825..52d6b29 100644
--- a/Python/pystate.c
+++ b/Python/pystate.c
@@ -380,7 +380,7 @@ _Py_COMP_DIAG_IGNORE_DEPR_DECLS
static const _PyRuntimeState initial = _PyRuntimeState_INIT(_PyRuntime);
_Py_COMP_DIAG_POP
-#define NUMLOCKS 8
+#define NUMLOCKS 9
#define LOCKS_INIT(runtime) \
{ \
&(runtime)->interpreters.mutex, \
@@ -388,6 +388,7 @@ _Py_COMP_DIAG_POP
&(runtime)->getargs.mutex, \
&(runtime)->unicode_state.ids.lock, \
&(runtime)->imports.extensions.mutex, \
+ &(runtime)->ceval.pending_mainthread.lock, \
&(runtime)->atexit.mutex, \
&(runtime)->audit_hooks.mutex, \
&(runtime)->allocators.mutex, \
diff --git a/Tools/c-analyzer/cpython/ignored.tsv b/Tools/c-analyzer/cpython/ignored.tsv
index 607976f..87d9b39 100644
--- a/Tools/c-analyzer/cpython/ignored.tsv
+++ b/Tools/c-analyzer/cpython/ignored.tsv
@@ -517,6 +517,7 @@ Modules/_testcapimodule.c - g_type_watchers_installed -
Modules/_testimportmultiple.c - _barmodule -
Modules/_testimportmultiple.c - _foomodule -
Modules/_testimportmultiple.c - _testimportmultiple -
+Modules/_testinternalcapi.c - pending_identify_result -
Modules/_testmultiphase.c - Example_Type_slots -
Modules/_testmultiphase.c - Example_Type_spec -
Modules/_testmultiphase.c - Example_methods -