summaryrefslogtreecommitdiffstats
path: root/Modules/_xxinterpchannelsmodule.c
diff options
context:
space:
mode:
authorEric Snow <ericsnowcurrently@gmail.com>2023-10-17 23:05:49 (GMT)
committerGitHub <noreply@github.com>2023-10-17 23:05:49 (GMT)
commitc58c63fdf615a1c2bfc995dd0b938d82e32b6cde (patch)
treec55bd273d26fdcaf30013a44a7bf973940771a23 /Modules/_xxinterpchannelsmodule.c
parent7029c1a1c5b864056aa00298b1d0e0269f073f99 (diff)
downloadcpython-c58c63fdf615a1c2bfc995dd0b938d82e32b6cde.zip
cpython-c58c63fdf615a1c2bfc995dd0b938d82e32b6cde.tar.gz
cpython-c58c63fdf615a1c2bfc995dd0b938d82e32b6cde.tar.bz2
gh-84570: Add Timeouts to SendChannel.send() and RecvChannel.recv() (gh-110567)
Diffstat (limited to 'Modules/_xxinterpchannelsmodule.c')
-rw-r--r--Modules/_xxinterpchannelsmodule.c43
1 files changed, 26 insertions, 17 deletions
diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c
index be53cbf..2e2878d 100644
--- a/Modules/_xxinterpchannelsmodule.c
+++ b/Modules/_xxinterpchannelsmodule.c
@@ -242,9 +242,8 @@ add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared,
}
static int
-wait_for_lock(PyThread_type_lock mutex)
+wait_for_lock(PyThread_type_lock mutex, PY_TIMEOUT_T timeout)
{
- PY_TIMEOUT_T timeout = PyThread_UNSET_TIMEOUT;
PyLockStatus res = PyThread_acquire_lock_timed_with_retries(mutex, timeout);
if (res == PY_LOCK_INTR) {
/* KeyboardInterrupt, etc. */
@@ -1883,7 +1882,8 @@ _channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting)
}
static int
-_channel_send_wait(_channels *channels, int64_t cid, PyObject *obj)
+_channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
+ PY_TIMEOUT_T timeout)
{
// We use a stack variable here, so we must ensure that &waiting
// is not held by any channel item at the point this function exits.
@@ -1901,7 +1901,7 @@ _channel_send_wait(_channels *channels, int64_t cid, PyObject *obj)
}
/* Wait until the object is received. */
- if (wait_for_lock(waiting.mutex) < 0) {
+ if (wait_for_lock(waiting.mutex, timeout) < 0) {
assert(PyErr_Occurred());
_waiting_finish_releasing(&waiting);
/* The send() call is failing now, so make sure the item
@@ -2816,25 +2816,29 @@ receive end.");
static PyObject *
channel_send(PyObject *self, PyObject *args, PyObject *kwds)
{
- // XXX Add a timeout arg.
- static char *kwlist[] = {"cid", "obj", "blocking", NULL};
- int64_t cid;
+ static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL};
struct channel_id_converter_data cid_data = {
.module = self,
};
PyObject *obj;
int blocking = 1;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$p:channel_send", kwlist,
+ PyObject *timeout_obj = NULL;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$pO:channel_send", kwlist,
channel_id_converter, &cid_data, &obj,
- &blocking)) {
+ &blocking, &timeout_obj)) {
+ return NULL;
+ }
+
+ int64_t cid = cid_data.cid;
+ PY_TIMEOUT_T timeout;
+ if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) {
return NULL;
}
- cid = cid_data.cid;
/* Queue up the object. */
int err = 0;
if (blocking) {
- err = _channel_send_wait(&_globals.channels, cid, obj);
+ err = _channel_send_wait(&_globals.channels, cid, obj, timeout);
}
else {
err = _channel_send(&_globals.channels, cid, obj, NULL);
@@ -2855,20 +2859,25 @@ By default this waits for the object to be received.");
static PyObject *
channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"cid", "obj", "blocking", NULL};
- int64_t cid;
+ static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL};
struct channel_id_converter_data cid_data = {
.module = self,
};
PyObject *obj;
int blocking = 1;
+ PyObject *timeout_obj = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwds,
- "O&O|$p:channel_send_buffer", kwlist,
+ "O&O|$pO:channel_send_buffer", kwlist,
channel_id_converter, &cid_data, &obj,
- &blocking)) {
+ &blocking, &timeout_obj)) {
+ return NULL;
+ }
+
+ int64_t cid = cid_data.cid;
+ PY_TIMEOUT_T timeout;
+ if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) {
return NULL;
}
- cid = cid_data.cid;
PyObject *tempobj = PyMemoryView_FromObject(obj);
if (tempobj == NULL) {
@@ -2878,7 +2887,7 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
/* Queue up the object. */
int err = 0;
if (blocking) {
- err = _channel_send_wait(&_globals.channels, cid, tempobj);
+ err = _channel_send_wait(&_globals.channels, cid, tempobj, timeout);
}
else {
err = _channel_send(&_globals.channels, cid, tempobj, NULL);