diff options
-rw-r--r-- | Doc/c-api/init.rst | 44 | ||||
-rw-r--r-- | Doc/whatsnew/2.7.rst | 5 | ||||
-rw-r--r-- | Lib/test/test_capi.py | 67 | ||||
-rw-r--r-- | Modules/_testcapimodule.c | 38 |
4 files changed, 154 insertions, 0 deletions
diff --git a/Doc/c-api/init.rst b/Doc/c-api/init.rst index a4a202a..6ac696e 100644 --- a/Doc/c-api/init.rst +++ b/Doc/c-api/init.rst @@ -780,6 +780,50 @@ created. .. versionadded:: 2.3 + +Asynchronous Notifications +========================== + +A mechanism is provided to make asynchronous notifications to the the main +interpreter thread. These notifications take the form of a function +pointer and a void argument. + +.. index:: single: setcheckinterval() (in module sys) + +Every check interval, when the interpreter lock is released and reacquired, +python will also call any such provided functions. This can be used for +example by asynchronous IO handlers. The notification can be scheduled +from a worker thread and the actual call than made at the earliest +convenience by the main thread where it has possession of the global +interpreter lock and can perform any Python API calls. + +.. cfunction:: void Py_AddPendingCall( int (*func)(void *), void *arg) ) + + .. index:: single: Py_AddPendingCall() + + Post a notification to the Python main thread. If successful, + \*:attr`func` will be called with the argument :attr:`arg` at the earliest + convenience. \*:attr:`func` will be called having the global interpreter + lock held and can thus use the full Python API and can take any + action such as setting object attributes to signal IO completion. + It must return 0 on success, or -1 signalling an exception. + The notification function won't be interrupted to perform another + asynchronous notification recursively, + but it can still be interrupted to switch threads if the interpreter + lock is released, for example, if it calls back into python code. + + This function returns 0 on success in which case the notification has been + scheduled. Otherwise, for example if the notification buffer is full, + it returns -1 without setting any exception. + + This function can be called on any thread, be it a Python thread or + some other system thread. If it is a Python thread, it doesen't matter if + it holds the global interpreter lock or not. + + .. versionadded:: 2.7 + + + .. _profiling: Profiling and Tracing diff --git a/Doc/whatsnew/2.7.rst b/Doc/whatsnew/2.7.rst index 5daf12e..bdc364b 100644 --- a/Doc/whatsnew/2.7.rst +++ b/Doc/whatsnew/2.7.rst @@ -60,6 +60,11 @@ No release schedule has been decided yet for 2.7. .. ======================================================================== +Kristján Valur Jónsson, issue 4293 +Py_AddPendingCall is now thread safe. This allows any worker thread +to submit notifications to the python main thread. This is particularly +useful for asynchronous IO operations. + Other Language Changes ====================== diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py index eea41c1..17c4baf 100644 --- a/Lib/test/test_capi.py +++ b/Lib/test/test_capi.py @@ -2,9 +2,74 @@ # these are all functions _testcapi exports whose name begins with 'test_'. import sys +import time +import random +import unittest +import threading from test import test_support import _testcapi +class TestPendingCalls(unittest.TestCase): + + def pendingcalls_submit(self, l, n): + def callback(): + #this function can be interrupted by thread switching so let's + #use an atomic operation + l.append(None) + + for i in range(n): + time.sleep(random.random()*0.02) #0.01 secs on average + #try submitting callback until successful. + #rely on regular interrupt to flush queue if we are + #unsuccessful. + while True: + if _testcapi._pending_threadfunc(callback): + break; + + def pendingcalls_wait(self, l, n): + #now, stick around until l[0] has grown to 10 + count = 0; + while len(l) != n: + #this busy loop is where we expect to be interrupted to + #run our callbacks. Note that callbacks are only run on the + #main thread + if False and test_support.verbose: + print "(%i)"%(len(l),), + for i in xrange(1000): + a = i*i + count += 1 + self.failUnless(count < 10000, + "timeout waiting for %i callbacks, got %i"%(n, len(l))) + if False and test_support.verbose: + print "(%i)"%(len(l),) + + def test_pendingcalls_threaded(self): + l = [] + + #do every callback on a separate thread + n = 32 + threads = [] + for i in range(n): + t = threading.Thread(target=self.pendingcalls_submit, args = (l, 1)) + t.start() + threads.append(t) + + self.pendingcalls_wait(l, n) + + for t in threads: + t.join() + + def test_pendingcalls_non_threaded(self): + #again, just using the main thread, likely they will all be dispathced at + #once. It is ok to ask for too many, because we loop until we find a slot. + #the loop can be interrupted to dispatch. + #there are only 32 dispatch slots, so we go for twice that! + l = [] + n = 64 + self.pendingcalls_submit(l, n) + self.pendingcalls_wait(l, n) + + def test_main(): for name in dir(_testcapi): @@ -50,5 +115,7 @@ def test_main(): t.start() t.join() + test_support.run_unittest(TestPendingCalls) + if __name__ == "__main__": test_main() diff --git a/Modules/_testcapimodule.c b/Modules/_testcapimodule.c index 665d375..1475f72 100644 --- a/Modules/_testcapimodule.c +++ b/Modules/_testcapimodule.c @@ -837,6 +837,43 @@ test_thread_state(PyObject *self, PyObject *args) return NULL; Py_RETURN_NONE; } + +/* test Py_AddPendingCalls using threads */ +static int _pending_callback(void *arg) +{ + /* we assume the argument is callable object to which we own a reference */ + PyObject *callable = (PyObject *)arg; + PyObject *r = PyObject_CallObject(callable, NULL); + Py_DECREF(callable); + Py_XDECREF(r); + return r != NULL ? 0 : -1; +} + +/* The following requests n callbacks to _pending_callback. It can be + * run from any python thread. + */ +PyObject *pending_threadfunc(PyObject *self, PyObject *arg) +{ + PyObject *callable; + int r; + if (PyArg_ParseTuple(arg, "O", &callable) == 0) + return NULL; + + /* create the reference for the callbackwhile we hold the lock */ + Py_INCREF(callable); + + Py_BEGIN_ALLOW_THREADS + r = Py_AddPendingCall(&_pending_callback, callable); + Py_END_ALLOW_THREADS + + if (r<0) { + Py_DECREF(callable); /* unsuccessful add, destroy the extra reference */ + Py_INCREF(Py_False); + return Py_False; + } + Py_INCREF(Py_True); + return Py_True; +} #endif /* Some tests of PyString_FromFormat(). This needs more tests. */ @@ -941,6 +978,7 @@ static PyMethodDef TestMethods[] = { #endif #ifdef WITH_THREAD {"_test_thread_state", test_thread_state, METH_VARARGS}, + {"_pending_threadfunc", pending_threadfunc, METH_VARARGS}, #endif {"traceback_print", traceback_print, METH_VARARGS}, {NULL, NULL} /* sentinel */ |