diff options
author | Eric Snow <ericsnowcurrently@gmail.com> | 2023-10-17 23:05:49 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-17 23:05:49 (GMT) |
commit | c58c63fdf615a1c2bfc995dd0b938d82e32b6cde (patch) | |
tree | c55bd273d26fdcaf30013a44a7bf973940771a23 /Modules/_xxinterpchannelsmodule.c | |
parent | 7029c1a1c5b864056aa00298b1d0e0269f073f99 (diff) | |
download | cpython-c58c63fdf615a1c2bfc995dd0b938d82e32b6cde.zip cpython-c58c63fdf615a1c2bfc995dd0b938d82e32b6cde.tar.gz cpython-c58c63fdf615a1c2bfc995dd0b938d82e32b6cde.tar.bz2 |
gh-84570: Add Timeouts to SendChannel.send() and RecvChannel.recv() (gh-110567)
Diffstat (limited to 'Modules/_xxinterpchannelsmodule.c')
-rw-r--r-- | Modules/_xxinterpchannelsmodule.c | 43 |
1 files changed, 26 insertions, 17 deletions
diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index be53cbf..2e2878d 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -242,9 +242,8 @@ add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared, } static int -wait_for_lock(PyThread_type_lock mutex) +wait_for_lock(PyThread_type_lock mutex, PY_TIMEOUT_T timeout) { - PY_TIMEOUT_T timeout = PyThread_UNSET_TIMEOUT; PyLockStatus res = PyThread_acquire_lock_timed_with_retries(mutex, timeout); if (res == PY_LOCK_INTR) { /* KeyboardInterrupt, etc. */ @@ -1883,7 +1882,8 @@ _channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting) } static int -_channel_send_wait(_channels *channels, int64_t cid, PyObject *obj) +_channel_send_wait(_channels *channels, int64_t cid, PyObject *obj, + PY_TIMEOUT_T timeout) { // We use a stack variable here, so we must ensure that &waiting // is not held by any channel item at the point this function exits. @@ -1901,7 +1901,7 @@ _channel_send_wait(_channels *channels, int64_t cid, PyObject *obj) } /* Wait until the object is received. */ - if (wait_for_lock(waiting.mutex) < 0) { + if (wait_for_lock(waiting.mutex, timeout) < 0) { assert(PyErr_Occurred()); _waiting_finish_releasing(&waiting); /* The send() call is failing now, so make sure the item @@ -2816,25 +2816,29 @@ receive end."); static PyObject * channel_send(PyObject *self, PyObject *args, PyObject *kwds) { - // XXX Add a timeout arg. - static char *kwlist[] = {"cid", "obj", "blocking", NULL}; - int64_t cid; + static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL}; struct channel_id_converter_data cid_data = { .module = self, }; PyObject *obj; int blocking = 1; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$p:channel_send", kwlist, + PyObject *timeout_obj = NULL; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$pO:channel_send", kwlist, channel_id_converter, &cid_data, &obj, - &blocking)) { + &blocking, &timeout_obj)) { + return NULL; + } + + int64_t cid = cid_data.cid; + PY_TIMEOUT_T timeout; + if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) { return NULL; } - cid = cid_data.cid; /* Queue up the object. */ int err = 0; if (blocking) { - err = _channel_send_wait(&_globals.channels, cid, obj); + err = _channel_send_wait(&_globals.channels, cid, obj, timeout); } else { err = _channel_send(&_globals.channels, cid, obj, NULL); @@ -2855,20 +2859,25 @@ By default this waits for the object to be received."); static PyObject * channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"cid", "obj", "blocking", NULL}; - int64_t cid; + static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL}; struct channel_id_converter_data cid_data = { .module = self, }; PyObject *obj; int blocking = 1; + PyObject *timeout_obj = NULL; if (!PyArg_ParseTupleAndKeywords(args, kwds, - "O&O|$p:channel_send_buffer", kwlist, + "O&O|$pO:channel_send_buffer", kwlist, channel_id_converter, &cid_data, &obj, - &blocking)) { + &blocking, &timeout_obj)) { + return NULL; + } + + int64_t cid = cid_data.cid; + PY_TIMEOUT_T timeout; + if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) { return NULL; } - cid = cid_data.cid; PyObject *tempobj = PyMemoryView_FromObject(obj); if (tempobj == NULL) { @@ -2878,7 +2887,7 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) /* Queue up the object. */ int err = 0; if (blocking) { - err = _channel_send_wait(&_globals.channels, cid, tempobj); + err = _channel_send_wait(&_globals.channels, cid, tempobj, timeout); } else { err = _channel_send(&_globals.channels, cid, tempobj, NULL); |