summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEric Snow <ericsnowcurrently@gmail.com>2023-10-17 23:51:52 (GMT)
committerGitHub <noreply@github.com>2023-10-17 23:51:52 (GMT)
commita77fa0512466bfd6edbab2aa3c830cdd1b0f5ba8 (patch)
tree23074993591d3248f4f50004434a91560b64c9d5
parent73a003f646aea723201f85b858991828f9053d2c (diff)
downloadcpython-a77fa0512466bfd6edbab2aa3c830cdd1b0f5ba8.zip
cpython-a77fa0512466bfd6edbab2aa3c830cdd1b0f5ba8.tar.gz
cpython-a77fa0512466bfd6edbab2aa3c830cdd1b0f5ba8.tar.bz2
gh-76785: Clean Up the Channels Module (gh-110568)
-rw-r--r--Include/cpython/pystate.h2
-rw-r--r--Modules/_xxinterpchannelsmodule.c616
-rw-r--r--Python/pystate.c14
3 files changed, 335 insertions, 297 deletions
diff --git a/Include/cpython/pystate.h b/Include/cpython/pystate.h
index 40102f8..995f02e 100644
--- a/Include/cpython/pystate.h
+++ b/Include/cpython/pystate.h
@@ -291,7 +291,7 @@ struct _xid {
// with deleted interpreters. Note that IDs are never re-used, so
// each one will always correspond to a specific interpreter
// (whether still alive or not).
- int64_t interp;
+ int64_t interpid;
// new_object is a function that returns a new object in the current
// interpreter given the data. The resulting object (a new
// reference) will be equivalent to the original object. This field
diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c
index 2e2878d..b618592 100644
--- a/Modules/_xxinterpchannelsmodule.c
+++ b/Modules/_xxinterpchannelsmodule.c
@@ -28,7 +28,7 @@ _globals (static struct globals):
next_id; (int64_t)
mutex (PyThread_type_lock)
head (linked list of struct _channelref *):
- id (int64_t)
+ cid (int64_t)
objcount (Py_ssize_t)
next (struct _channelref *):
...
@@ -42,7 +42,7 @@ _globals (static struct globals):
numsendopen (int64_t)
numrecvopen (int64_t)
send (struct _channelend *):
- interp (int64_t)
+ interpid (int64_t)
open (int)
next (struct _channelend *)
recv (struct _channelend *):
@@ -55,7 +55,7 @@ _globals (static struct globals):
data (_PyCrossInterpreterData *):
data (void *)
obj (PyObject *)
- interp (int64_t)
+ interpid (int64_t)
new_object (xid_newobjectfunc)
free (xid_freefunc)
last (struct _channelitem *):
@@ -269,7 +269,7 @@ wait_for_lock(PyThread_type_lock mutex, PY_TIMEOUT_T timeout)
typedef struct {
PyObject_HEAD
Py_buffer *view;
- int64_t interp;
+ int64_t interpid;
} XIBufferViewObject;
static PyObject *
@@ -277,21 +277,21 @@ xibufferview_from_xid(PyTypeObject *cls, _PyCrossInterpreterData *data)
{
assert(data->data != NULL);
assert(data->obj == NULL);
- assert(data->interp >= 0);
+ assert(data->interpid >= 0);
XIBufferViewObject *self = PyObject_Malloc(sizeof(XIBufferViewObject));
if (self == NULL) {
return NULL;
}
PyObject_Init((PyObject *)self, cls);
self->view = (Py_buffer *)data->data;
- self->interp = data->interp;
+ self->interpid = data->interpid;
return (PyObject *)self;
}
static void
xibufferview_dealloc(XIBufferViewObject *self)
{
- PyInterpreterState *interp = _PyInterpreterState_LookUpID(self->interp);
+ PyInterpreterState *interp = _PyInterpreterState_LookUpID(self->interpid);
/* If the interpreter is no longer alive then we have problems,
since other objects may be using the buffer still. */
assert(interp != NULL);
@@ -495,6 +495,7 @@ _get_current_xibufferview_type(void)
#define CHANNEL_BOTH 0
#define CHANNEL_RECV -1
+
/* channel errors */
#define ERR_CHANNEL_NOT_FOUND -2
@@ -593,6 +594,7 @@ handle_channel_error(int err, PyObject *mod, int64_t cid)
return 1;
}
+
/* the channel queue */
typedef uintptr_t _channelitem_id_t;
@@ -711,7 +713,7 @@ _channelitem_clear(_channelitem *item)
item->next = NULL;
if (item->data != NULL) {
- // It was allocated in _channel_send().
+ // It was allocated in channel_send().
(void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
item->data = NULL;
}
@@ -911,14 +913,14 @@ _channelqueue_remove(_channelqueue *queue, _channelitem_id_t itemid,
}
static void
-_channelqueue_drop_interpreter(_channelqueue *queue, int64_t interp)
+_channelqueue_clear_interpreter(_channelqueue *queue, int64_t interpid)
{
_channelitem *prev = NULL;
_channelitem *next = queue->first;
while (next != NULL) {
_channelitem *item = next;
next = item->next;
- if (item->data->interp == interp) {
+ if (item->data->interpid == interpid) {
if (prev == NULL) {
queue->first = item->next;
}
@@ -934,18 +936,19 @@ _channelqueue_drop_interpreter(_channelqueue *queue, int64_t interp)
}
}
+
/* channel-interpreter associations */
struct _channelend;
typedef struct _channelend {
struct _channelend *next;
- int64_t interp;
+ int64_t interpid;
int open;
} _channelend;
static _channelend *
-_channelend_new(int64_t interp)
+_channelend_new(int64_t interpid)
{
_channelend *end = GLOBAL_MALLOC(_channelend);
if (end == NULL) {
@@ -953,7 +956,7 @@ _channelend_new(int64_t interp)
return NULL;
}
end->next = NULL;
- end->interp = interp;
+ end->interpid = interpid;
end->open = 1;
return end;
}
@@ -975,12 +978,12 @@ _channelend_free_all(_channelend *end)
}
static _channelend *
-_channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
+_channelend_find(_channelend *first, int64_t interpid, _channelend **pprev)
{
_channelend *prev = NULL;
_channelend *end = first;
while (end != NULL) {
- if (end->interp == interp) {
+ if (end->interpid == interpid) {
break;
}
prev = end;
@@ -1037,10 +1040,10 @@ _channelends_free(_channelends *ends)
}
static _channelend *
-_channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
+_channelends_add(_channelends *ends, _channelend *prev, int64_t interpid,
int send)
{
- _channelend *end = _channelend_new(interp);
+ _channelend *end = _channelend_new(interpid);
if (end == NULL) {
return NULL;
}
@@ -1066,11 +1069,11 @@ _channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
}
static int
-_channelends_associate(_channelends *ends, int64_t interp, int send)
+_channelends_associate(_channelends *ends, int64_t interpid, int send)
{
_channelend *prev;
_channelend *end = _channelend_find(send ? ends->send : ends->recv,
- interp, &prev);
+ interpid, &prev);
if (end != NULL) {
if (!end->open) {
return ERR_CHANNEL_CLOSED;
@@ -1078,7 +1081,7 @@ _channelends_associate(_channelends *ends, int64_t interp, int send)
// already associated
return 0;
}
- if (_channelends_add(ends, prev, interp, send) == NULL) {
+ if (_channelends_add(ends, prev, interpid, send) == NULL) {
return -1;
}
return 0;
@@ -1088,16 +1091,20 @@ static int
_channelends_is_open(_channelends *ends)
{
if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
+ // At least one interpreter is still associated with the channel
+ // (and hasn't been released).
return 1;
}
+ // XXX This is wrong if an end can ever be removed.
if (ends->send == NULL && ends->recv == NULL) {
+ // The channel has never had any interpreters associated with it.
return 1;
}
return 0;
}
static void
-_channelends_close_end(_channelends *ends, _channelend *end, int send)
+_channelends_release_end(_channelends *ends, _channelend *end, int send)
{
end->open = 0;
if (send) {
@@ -1109,51 +1116,37 @@ _channelends_close_end(_channelends *ends, _channelend *end, int send)
}
static int
-_channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
+_channelends_release_interpreter(_channelends *ends, int64_t interpid, int which)
{
_channelend *prev;
_channelend *end;
if (which >= 0) { // send/both
- end = _channelend_find(ends->send, interp, &prev);
+ end = _channelend_find(ends->send, interpid, &prev);
if (end == NULL) {
// never associated so add it
- end = _channelends_add(ends, prev, interp, 1);
+ end = _channelends_add(ends, prev, interpid, 1);
if (end == NULL) {
return -1;
}
}
- _channelends_close_end(ends, end, 1);
+ _channelends_release_end(ends, end, 1);
}
if (which <= 0) { // recv/both
- end = _channelend_find(ends->recv, interp, &prev);
+ end = _channelend_find(ends->recv, interpid, &prev);
if (end == NULL) {
// never associated so add it
- end = _channelends_add(ends, prev, interp, 0);
+ end = _channelends_add(ends, prev, interpid, 0);
if (end == NULL) {
return -1;
}
}
- _channelends_close_end(ends, end, 0);
+ _channelends_release_end(ends, end, 0);
}
return 0;
}
static void
-_channelends_drop_interpreter(_channelends *ends, int64_t interp)
-{
- _channelend *end;
- end = _channelend_find(ends->send, interp, NULL);
- if (end != NULL) {
- _channelends_close_end(ends, end, 1);
- }
- end = _channelend_find(ends->recv, interp, NULL);
- if (end != NULL) {
- _channelends_close_end(ends, end, 0);
- }
-}
-
-static void
-_channelends_close_all(_channelends *ends, int which, int force)
+_channelends_release_all(_channelends *ends, int which, int force)
{
// XXX Handle the ends.
// XXX Handle force is True.
@@ -1161,16 +1154,32 @@ _channelends_close_all(_channelends *ends, int which, int force)
// Ensure all the "send"-associated interpreters are closed.
_channelend *end;
for (end = ends->send; end != NULL; end = end->next) {
- _channelends_close_end(ends, end, 1);
+ _channelends_release_end(ends, end, 1);
}
// Ensure all the "recv"-associated interpreters are closed.
for (end = ends->recv; end != NULL; end = end->next) {
- _channelends_close_end(ends, end, 0);
+ _channelends_release_end(ends, end, 0);
}
}
-/* channels */
+static void
+_channelends_clear_interpreter(_channelends *ends, int64_t interpid)
+{
+ // XXX Actually remove the entries?
+ _channelend *end;
+ end = _channelend_find(ends->send, interpid, NULL);
+ if (end != NULL) {
+ _channelends_release_end(ends, end, 1);
+ }
+ end = _channelend_find(ends->recv, interpid, NULL);
+ if (end != NULL) {
+ _channelends_release_end(ends, end, 0);
+ }
+}
+
+
+/* each channel's state */
struct _channel;
struct _channel_closing;
@@ -1183,12 +1192,12 @@ typedef struct _channel {
_channelends *ends;
int open;
struct _channel_closing *closing;
-} _PyChannelState;
+} _channel_state;
-static _PyChannelState *
+static _channel_state *
_channel_new(PyThread_type_lock mutex)
{
- _PyChannelState *chan = GLOBAL_MALLOC(_PyChannelState);
+ _channel_state *chan = GLOBAL_MALLOC(_channel_state);
if (chan == NULL) {
return NULL;
}
@@ -1210,7 +1219,7 @@ _channel_new(PyThread_type_lock mutex)
}
static void
-_channel_free(_PyChannelState *chan)
+_channel_free(_channel_state *chan)
{
_channel_clear_closing(chan);
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
@@ -1223,7 +1232,7 @@ _channel_free(_PyChannelState *chan)
}
static int
-_channel_add(_PyChannelState *chan, int64_t interp,
+_channel_add(_channel_state *chan, int64_t interpid,
_PyCrossInterpreterData *data, _waiting_t *waiting)
{
int res = -1;
@@ -1233,7 +1242,7 @@ _channel_add(_PyChannelState *chan, int64_t interp,
res = ERR_CHANNEL_CLOSED;
goto done;
}
- if (_channelends_associate(chan->ends, interp, 1) != 0) {
+ if (_channelends_associate(chan->ends, interpid, 1) != 0) {
res = ERR_CHANNEL_INTERP_CLOSED;
goto done;
}
@@ -1250,7 +1259,7 @@ done:
}
static int
-_channel_next(_PyChannelState *chan, int64_t interp,
+_channel_next(_channel_state *chan, int64_t interpid,
_PyCrossInterpreterData **p_data, _waiting_t **p_waiting)
{
int err = 0;
@@ -1260,7 +1269,7 @@ _channel_next(_PyChannelState *chan, int64_t interp,
err = ERR_CHANNEL_CLOSED;
goto done;
}
- if (_channelends_associate(chan->ends, interp, 0) != 0) {
+ if (_channelends_associate(chan->ends, interpid, 0) != 0) {
err = ERR_CHANNEL_INTERP_CLOSED;
goto done;
}
@@ -1281,7 +1290,7 @@ done:
}
static void
-_channel_remove(_PyChannelState *chan, _channelitem_id_t itemid)
+_channel_remove(_channel_state *chan, _channelitem_id_t itemid)
{
_PyCrossInterpreterData *data = NULL;
_waiting_t *waiting = NULL;
@@ -1301,7 +1310,7 @@ _channel_remove(_PyChannelState *chan, _channelitem_id_t itemid)
}
static int
-_channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
+_channel_release_interpreter(_channel_state *chan, int64_t interpid, int end)
{
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
@@ -1311,10 +1320,12 @@ _channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
goto done;
}
- if (_channelends_close_interpreter(chan->ends, interp, end) != 0) {
+ if (_channelends_release_interpreter(chan->ends, interpid, end) != 0) {
goto done;
}
chan->open = _channelends_is_open(chan->ends);
+ // XXX Clear the queue if not empty?
+ // XXX Activate the "closing" mechanism?
res = 0;
done:
@@ -1322,20 +1333,8 @@ done:
return res;
}
-static void
-_channel_drop_interpreter(_PyChannelState *chan, int64_t interp)
-{
- PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
-
- _channelqueue_drop_interpreter(chan->queue, interp);
- _channelends_drop_interpreter(chan->ends, interp);
- chan->open = _channelends_is_open(chan->ends);
-
- PyThread_release_lock(chan->mutex);
-}
-
static int
-_channel_close_all(_PyChannelState *chan, int end, int force)
+_channel_release_all(_channel_state *chan, int end, int force)
{
int res = -1;
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
@@ -1349,12 +1348,13 @@ _channel_close_all(_PyChannelState *chan, int end, int force)
res = ERR_CHANNEL_NOT_EMPTY;
goto done;
}
+ // XXX Clear the queue?
chan->open = 0;
// We *could* also just leave these in place, since we've marked
// the channel as closed already.
- _channelends_close_all(chan->ends, end, force);
+ _channelends_release_all(chan->ends, end, force);
res = 0;
done:
@@ -1362,25 +1362,39 @@ done:
return res;
}
+static void
+_channel_clear_interpreter(_channel_state *chan, int64_t interpid)
+{
+ PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+
+ _channelqueue_clear_interpreter(chan->queue, interpid);
+ _channelends_clear_interpreter(chan->ends, interpid);
+ chan->open = _channelends_is_open(chan->ends);
+
+ PyThread_release_lock(chan->mutex);
+}
+
+
/* the set of channels */
struct _channelref;
typedef struct _channelref {
- int64_t id;
- _PyChannelState *chan;
+ int64_t cid;
+ _channel_state *chan;
struct _channelref *next;
+ // The number of ChannelID objects referring to this channel.
Py_ssize_t objcount;
} _channelref;
static _channelref *
-_channelref_new(int64_t id, _PyChannelState *chan)
+_channelref_new(int64_t cid, _channel_state *chan)
{
_channelref *ref = GLOBAL_MALLOC(_channelref);
if (ref == NULL) {
return NULL;
}
- ref->id = id;
+ ref->cid = cid;
ref->chan = chan;
ref->next = NULL;
ref->objcount = 0;
@@ -1390,7 +1404,7 @@ _channelref_new(int64_t id, _PyChannelState *chan)
//static void
//_channelref_clear(_channelref *ref)
//{
-// ref->id = -1;
+// ref->cid = -1;
// ref->chan = NULL;
// ref->next = NULL;
// ref->objcount = 0;
@@ -1407,12 +1421,12 @@ _channelref_free(_channelref *ref)
}
static _channelref *
-_channelref_find(_channelref *first, int64_t id, _channelref **pprev)
+_channelref_find(_channelref *first, int64_t cid, _channelref **pprev)
{
_channelref *prev = NULL;
_channelref *ref = first;
while (ref != NULL) {
- if (ref->id == id) {
+ if (ref->cid == cid) {
break;
}
prev = ref;
@@ -1424,6 +1438,7 @@ _channelref_find(_channelref *first, int64_t id, _channelref **pprev)
return ref;
}
+
typedef struct _channels {
PyThread_type_lock mutex;
_channelref *head;
@@ -1454,27 +1469,27 @@ _channels_fini(_channels *channels)
static int64_t
_channels_next_id(_channels *channels) // needs lock
{
- int64_t id = channels->next_id;
- if (id < 0) {
+ int64_t cid = channels->next_id;
+ if (cid < 0) {
/* overflow */
return -1;
}
channels->next_id += 1;
- return id;
+ return cid;
}
static int
-_channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex,
- _PyChannelState **res)
+_channels_lookup(_channels *channels, int64_t cid, PyThread_type_lock *pmutex,
+ _channel_state **res)
{
int err = -1;
- _PyChannelState *chan = NULL;
+ _channel_state *chan = NULL;
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
if (pmutex != NULL) {
*pmutex = NULL;
}
- _channelref *ref = _channelref_find(channels->head, id, NULL);
+ _channelref *ref = _channelref_find(channels->head, cid, NULL);
if (ref == NULL) {
err = ERR_CHANNEL_NOT_FOUND;
goto done;
@@ -1501,18 +1516,18 @@ done:
}
static int64_t
-_channels_add(_channels *channels, _PyChannelState *chan)
+_channels_add(_channels *channels, _channel_state *chan)
{
int64_t cid = -1;
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
// Create a new ref.
- int64_t id = _channels_next_id(channels);
- if (id < 0) {
+ int64_t _cid = _channels_next_id(channels);
+ if (_cid < 0) {
cid = ERR_NO_NEXT_CHANNEL_ID;
goto done;
}
- _channelref *ref = _channelref_new(id, chan);
+ _channelref *ref = _channelref_new(_cid, chan);
if (ref == NULL) {
goto done;
}
@@ -1523,17 +1538,17 @@ _channels_add(_channels *channels, _PyChannelState *chan)
channels->head = ref;
channels->numopen += 1;
- cid = id;
+ cid = _cid;
done:
PyThread_release_lock(channels->mutex);
return cid;
}
/* forward */
-static int _channel_set_closing(struct _channelref *, PyThread_type_lock);
+static int _channel_set_closing(_channelref *, PyThread_type_lock);
static int
-_channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan,
+_channels_close(_channels *channels, int64_t cid, _channel_state **pchan,
int end, int force)
{
int res = -1;
@@ -1557,7 +1572,7 @@ _channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan,
goto done;
}
else {
- int err = _channel_close_all(ref->chan, end, force);
+ int err = _channel_release_all(ref->chan, end, force);
if (err != 0) {
if (end == CHANNEL_SEND && err == ERR_CHANNEL_NOT_EMPTY) {
if (ref->chan->closing != NULL) {
@@ -1599,7 +1614,7 @@ done:
static void
_channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
- _PyChannelState **pchan)
+ _channel_state **pchan)
{
if (ref == channels->head) {
channels->head = ref->next;
@@ -1616,7 +1631,7 @@ _channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
}
static int
-_channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan)
+_channels_remove(_channels *channels, int64_t cid, _channel_state **pchan)
{
int res = -1;
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
@@ -1626,7 +1641,7 @@ _channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan)
}
_channelref *prev = NULL;
- _channelref *ref = _channelref_find(channels->head, id, &prev);
+ _channelref *ref = _channelref_find(channels->head, cid, &prev);
if (ref == NULL) {
res = ERR_CHANNEL_NOT_FOUND;
goto done;
@@ -1641,12 +1656,12 @@ done:
}
static int
-_channels_add_id_object(_channels *channels, int64_t id)
+_channels_add_id_object(_channels *channels, int64_t cid)
{
int res = -1;
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
- _channelref *ref = _channelref_find(channels->head, id, NULL);
+ _channelref *ref = _channelref_find(channels->head, cid, NULL);
if (ref == NULL) {
res = ERR_CHANNEL_NOT_FOUND;
goto done;
@@ -1660,12 +1675,12 @@ done:
}
static void
-_channels_drop_id_object(_channels *channels, int64_t id)
+_channels_release_cid_object(_channels *channels, int64_t cid)
{
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
_channelref *prev = NULL;
- _channelref *ref = _channelref_find(channels->head, id, &prev);
+ _channelref *ref = _channelref_find(channels->head, cid, &prev);
if (ref == NULL) {
// Already destroyed.
goto done;
@@ -1674,7 +1689,7 @@ _channels_drop_id_object(_channels *channels, int64_t id)
// Destroy if no longer used.
if (ref->objcount == 0) {
- _PyChannelState *chan = NULL;
+ _channel_state *chan = NULL;
_channels_remove_ref(channels, ref, prev, &chan);
if (chan != NULL) {
_channel_free(chan);
@@ -1696,7 +1711,7 @@ _channels_list_all(_channels *channels, int64_t *count)
}
_channelref *ref = channels->head;
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
- ids[i] = ref->id;
+ ids[i] = ref->cid;
}
*count = channels->numopen;
@@ -1707,29 +1722,30 @@ done:
}
static void
-_channels_drop_interpreter(_channels *channels, int64_t interp)
+_channels_clear_interpreter(_channels *channels, int64_t interpid)
{
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
_channelref *ref = channels->head;
for (; ref != NULL; ref = ref->next) {
if (ref->chan != NULL) {
- _channel_drop_interpreter(ref->chan, interp);
+ _channel_clear_interpreter(ref->chan, interpid);
}
}
PyThread_release_lock(channels->mutex);
}
+
/* support for closing non-empty channels */
struct _channel_closing {
- struct _channelref *ref;
+ _channelref *ref;
};
static int
-_channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
- struct _channel *chan = ref->chan;
+_channel_set_closing(_channelref *ref, PyThread_type_lock mutex) {
+ _channel_state *chan = ref->chan;
if (chan == NULL) {
// already closed
return 0;
@@ -1753,7 +1769,7 @@ done:
}
static void
-_channel_clear_closing(struct _channel *chan) {
+_channel_clear_closing(_channel_state *chan) {
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
if (chan->closing != NULL) {
GLOBAL_FREE(chan->closing);
@@ -1763,7 +1779,7 @@ _channel_clear_closing(struct _channel *chan) {
}
static void
-_channel_finish_closing(struct _channel *chan) {
+_channel_finish_closing(_channel_state *chan) {
struct _channel_closing *closing = chan->closing;
if (closing == NULL) {
return;
@@ -1775,32 +1791,35 @@ _channel_finish_closing(struct _channel *chan) {
_channel_free(chan);
}
+
/* "high"-level channel-related functions */
+// Create a new channel.
static int64_t
-_channel_create(_channels *channels)
+channel_create(_channels *channels)
{
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
return ERR_CHANNEL_MUTEX_INIT;
}
- _PyChannelState *chan = _channel_new(mutex);
+ _channel_state *chan = _channel_new(mutex);
if (chan == NULL) {
PyThread_free_lock(mutex);
return -1;
}
- int64_t id = _channels_add(channels, chan);
- if (id < 0) {
+ int64_t cid = _channels_add(channels, chan);
+ if (cid < 0) {
_channel_free(chan);
}
- return id;
+ return cid;
}
+// Completely destroy the channel.
static int
-_channel_destroy(_channels *channels, int64_t id)
+channel_destroy(_channels *channels, int64_t cid)
{
- _PyChannelState *chan = NULL;
- int err = _channels_remove(channels, id, &chan);
+ _channel_state *chan = NULL;
+ int err = _channels_remove(channels, cid, &chan);
if (err != 0) {
return err;
}
@@ -1810,19 +1829,23 @@ _channel_destroy(_channels *channels, int64_t id)
return 0;
}
+// Push an object onto the channel.
+// The current interpreter gets associated with the send end of the channel.
+// Optionally request to be notified when it is received.
static int
-_channel_send(_channels *channels, int64_t id, PyObject *obj,
- _waiting_t *waiting)
+channel_send(_channels *channels, int64_t cid, PyObject *obj,
+ _waiting_t *waiting)
{
PyInterpreterState *interp = _get_current_interp();
if (interp == NULL) {
return -1;
}
+ int64_t interpid = PyInterpreterState_GetID(interp);
// Look up the channel.
PyThread_type_lock mutex = NULL;
- _PyChannelState *chan = NULL;
- int err = _channels_lookup(channels, id, &mutex, &chan);
+ _channel_state *chan = NULL;
+ int err = _channels_lookup(channels, cid, &mutex, &chan);
if (err != 0) {
return err;
}
@@ -1847,8 +1870,7 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj,
}
// Add the data to the channel.
- int res = _channel_add(chan, PyInterpreterState_GetID(interp),
- data, waiting);
+ int res = _channel_add(chan, interpid, data, waiting);
PyThread_release_lock(mutex);
if (res != 0) {
// We may chain an exception here:
@@ -1860,12 +1882,13 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj,
return 0;
}
+// Basically, un-send an object.
static void
-_channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting)
+channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting)
{
// Look up the channel.
PyThread_type_lock mutex = NULL;
- _PyChannelState *chan = NULL;
+ _channel_state *chan = NULL;
int err = _channels_lookup(channels, cid, &mutex, &chan);
if (err != 0) {
// The channel was already closed, etc.
@@ -1881,8 +1904,9 @@ _channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting)
PyThread_release_lock(mutex);
}
+// Like channel_send(), but strictly wait for the object to be received.
static int
-_channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
+channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
PY_TIMEOUT_T timeout)
{
// We use a stack variable here, so we must ensure that &waiting
@@ -1894,7 +1918,7 @@ _channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
}
/* Queue up the object. */
- int res = _channel_send(channels, cid, obj, &waiting);
+ int res = channel_send(channels, cid, obj, &waiting);
if (res < 0) {
assert(waiting.status == WAITING_NO_STATUS);
goto finally;
@@ -1906,7 +1930,7 @@ _channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
_waiting_finish_releasing(&waiting);
/* The send() call is failing now, so make sure the item
won't be received. */
- _channel_clear_sent(channels, cid, &waiting);
+ channel_clear_sent(channels, cid, &waiting);
assert(waiting.status == WAITING_RELEASED);
if (!waiting.received) {
res = -1;
@@ -1932,8 +1956,11 @@ finally:
return res;
}
+// Pop the next object off the channel. Fail if empty.
+// The current interpreter gets associated with the recv end of the channel.
+// XXX Support a "wait" mutex?
static int
-_channel_recv(_channels *channels, int64_t id, PyObject **res)
+channel_recv(_channels *channels, int64_t cid, PyObject **res)
{
int err;
*res = NULL;
@@ -1946,11 +1973,12 @@ _channel_recv(_channels *channels, int64_t id, PyObject **res)
}
return 0;
}
+ int64_t interpid = PyInterpreterState_GetID(interp);
// Look up the channel.
PyThread_type_lock mutex = NULL;
- _PyChannelState *chan = NULL;
- err = _channels_lookup(channels, id, &mutex, &chan);
+ _channel_state *chan = NULL;
+ err = _channels_lookup(channels, cid, &mutex, &chan);
if (err != 0) {
return err;
}
@@ -1960,8 +1988,7 @@ _channel_recv(_channels *channels, int64_t id, PyObject **res)
// Pop off the next item from the channel.
_PyCrossInterpreterData *data = NULL;
_waiting_t *waiting = NULL;
- err = _channel_next(chan, PyInterpreterState_GetID(interp), &data,
- &waiting);
+ err = _channel_next(chan, interpid, &data, &waiting);
PyThread_release_lock(mutex);
if (err != 0) {
return err;
@@ -1975,14 +2002,14 @@ _channel_recv(_channels *channels, int64_t id, PyObject **res)
PyObject *obj = _PyCrossInterpreterData_NewObject(data);
if (obj == NULL) {
assert(PyErr_Occurred());
- // It was allocated in _channel_send(), so we free it.
+ // It was allocated in channel_send(), so we free it.
(void)_release_xid_data(data, XID_IGNORE_EXC | XID_FREE);
if (waiting != NULL) {
_waiting_release(waiting, 0);
}
return -1;
}
- // It was allocated in _channel_send(), so we free it.
+ // It was allocated in channel_send(), so we free it.
int release_res = _release_xid_data(data, XID_FREE);
if (release_res < 0) {
// The source interpreter has been destroyed already.
@@ -2003,40 +2030,49 @@ _channel_recv(_channels *channels, int64_t id, PyObject **res)
return 0;
}
+// Disallow send/recv for the current interpreter.
+// The channel is marked as closed if no other interpreters
+// are currently associated.
static int
-_channel_drop(_channels *channels, int64_t id, int send, int recv)
+channel_release(_channels *channels, int64_t cid, int send, int recv)
{
PyInterpreterState *interp = _get_current_interp();
if (interp == NULL) {
return -1;
}
+ int64_t interpid = PyInterpreterState_GetID(interp);
// Look up the channel.
PyThread_type_lock mutex = NULL;
- _PyChannelState *chan = NULL;
- int err = _channels_lookup(channels, id, &mutex, &chan);
+ _channel_state *chan = NULL;
+ int err = _channels_lookup(channels, cid, &mutex, &chan);
if (err != 0) {
return err;
}
// Past this point we are responsible for releasing the mutex.
// Close one or both of the two ends.
- int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv);
+ int res = _channel_release_interpreter(chan, interpid, send-recv);
PyThread_release_lock(mutex);
return res;
}
+// Close the channel (for all interpreters). Fail if it's already closed.
+// Close immediately if it's empty. Otherwise, disallow sending and
+// finally close once empty. Optionally, immediately clear and close it.
static int
-_channel_close(_channels *channels, int64_t id, int end, int force)
+channel_close(_channels *channels, int64_t cid, int end, int force)
{
- return _channels_close(channels, id, NULL, end, force);
+ return _channels_close(channels, cid, NULL, end, force);
}
+// Return true if the identified interpreter is associated
+// with the given end of the channel.
static int
-_channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
+channel_is_associated(_channels *channels, int64_t cid, int64_t interpid,
int send)
{
- _PyChannelState *chan = NULL;
+ _channel_state *chan = NULL;
int err = _channels_lookup(channels, cid, NULL, &chan);
if (err != 0) {
return err;
@@ -2046,16 +2082,17 @@ _channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
}
_channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
- interp, NULL);
+ interpid, NULL);
return (end != NULL && end->open);
}
+
/* ChannelID class */
typedef struct channelid {
PyObject_HEAD
- int64_t id;
+ int64_t cid;
int end;
int resolve;
_channels *channels;
@@ -2076,7 +2113,7 @@ channel_id_converter(PyObject *arg, void *ptr)
module_state *state = get_module_state(data->module);
assert(state != NULL);
if (PyObject_TypeCheck(arg, state->ChannelIDType)) {
- cid = ((channelid *)arg)->id;
+ cid = ((channelid *)arg)->cid;
end = ((channelid *)arg)->end;
}
else if (PyIndex_Check(arg)) {
@@ -2111,7 +2148,7 @@ newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
if (self == NULL) {
return -1;
}
- self->id = cid;
+ self->cid = cid;
self->end = end;
self->resolve = resolve;
self->channels = channels;
@@ -2176,22 +2213,22 @@ _channelid_new(PyObject *mod, PyTypeObject *cls,
end = CHANNEL_RECV;
}
- PyObject *id = NULL;
+ PyObject *cidobj = NULL;
int err = newchannelid(cls, cid, end, _global_channels(),
force, resolve,
- (channelid **)&id);
+ (channelid **)&cidobj);
if (handle_channel_error(err, mod, cid)) {
- assert(id == NULL);
+ assert(cidobj == NULL);
return NULL;
}
- assert(id != NULL);
- return id;
+ assert(cidobj != NULL);
+ return cidobj;
}
static void
channelid_dealloc(PyObject *self)
{
- int64_t cid = ((channelid *)self)->id;
+ int64_t cid = ((channelid *)self)->cid;
_channels *channels = ((channelid *)self)->channels;
PyTypeObject *tp = Py_TYPE(self);
@@ -2204,7 +2241,7 @@ channelid_dealloc(PyObject *self)
// like we do for _abc._abc_data?
Py_DECREF(tp);
- _channels_drop_id_object(channels, cid);
+ _channels_release_cid_object(channels, cid);
}
static PyObject *
@@ -2213,44 +2250,44 @@ channelid_repr(PyObject *self)
PyTypeObject *type = Py_TYPE(self);
const char *name = _PyType_Name(type);
- channelid *cid = (channelid *)self;
+ channelid *cidobj = (channelid *)self;
const char *fmt;
- if (cid->end == CHANNEL_SEND) {
+ if (cidobj->end == CHANNEL_SEND) {
fmt = "%s(%" PRId64 ", send=True)";
}
- else if (cid->end == CHANNEL_RECV) {
+ else if (cidobj->end == CHANNEL_RECV) {
fmt = "%s(%" PRId64 ", recv=True)";
}
else {
fmt = "%s(%" PRId64 ")";
}
- return PyUnicode_FromFormat(fmt, name, cid->id);
+ return PyUnicode_FromFormat(fmt, name, cidobj->cid);
}
static PyObject *
channelid_str(PyObject *self)
{
- channelid *cid = (channelid *)self;
- return PyUnicode_FromFormat("%" PRId64 "", cid->id);
+ channelid *cidobj = (channelid *)self;
+ return PyUnicode_FromFormat("%" PRId64 "", cidobj->cid);
}
static PyObject *
channelid_int(PyObject *self)
{
- channelid *cid = (channelid *)self;
- return PyLong_FromLongLong(cid->id);
+ channelid *cidobj = (channelid *)self;
+ return PyLong_FromLongLong(cidobj->cid);
}
static Py_hash_t
channelid_hash(PyObject *self)
{
- channelid *cid = (channelid *)self;
- PyObject *id = PyLong_FromLongLong(cid->id);
- if (id == NULL) {
+ channelid *cidobj = (channelid *)self;
+ PyObject *pyid = PyLong_FromLongLong(cidobj->cid);
+ if (pyid == NULL) {
return -1;
}
- Py_hash_t hash = PyObject_Hash(id);
- Py_DECREF(id);
+ Py_hash_t hash = PyObject_Hash(pyid);
+ Py_DECREF(pyid);
return hash;
}
@@ -2276,11 +2313,11 @@ channelid_richcompare(PyObject *self, PyObject *other, int op)
goto done;
}
- channelid *cid = (channelid *)self;
+ channelid *cidobj = (channelid *)self;
int equal;
if (PyObject_TypeCheck(other, state->ChannelIDType)) {
- channelid *othercid = (channelid *)other;
- equal = (cid->end == othercid->end) && (cid->id == othercid->id);
+ channelid *othercidobj = (channelid *)other;
+ equal = (cidobj->end == othercidobj->end) && (cidobj->cid == othercidobj->cid);
}
else if (PyLong_Check(other)) {
/* Fast path */
@@ -2289,10 +2326,10 @@ channelid_richcompare(PyObject *self, PyObject *other, int op)
if (othercid == -1 && PyErr_Occurred()) {
goto done;
}
- equal = !overflow && (othercid >= 0) && (cid->id == othercid);
+ equal = !overflow && (othercid >= 0) && (cidobj->cid == othercid);
}
else if (PyNumber_Check(other)) {
- PyObject *pyid = PyLong_FromLongLong(cid->id);
+ PyObject *pyid = PyLong_FromLongLong(cidobj->cid);
if (pyid == NULL) {
goto done;
}
@@ -2317,16 +2354,16 @@ done:
return res;
}
-static PyTypeObject * _get_current_channel_end_type(int end);
+static PyTypeObject * _get_current_channelend_type(int end);
static PyObject *
-_channel_from_cid(PyObject *cid, int end)
+_channelobj_from_cidobj(PyObject *cidobj, int end)
{
- PyObject *cls = (PyObject *)_get_current_channel_end_type(end);
+ PyObject *cls = (PyObject *)_get_current_channelend_type(end);
if (cls == NULL) {
return NULL;
}
- PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL);
+ PyObject *chan = PyObject_CallFunctionObjArgs(cls, cidobj, NULL);
Py_DECREF(cls);
if (chan == NULL) {
return NULL;
@@ -2335,7 +2372,7 @@ _channel_from_cid(PyObject *cid, int end)
}
struct _channelid_xid {
- int64_t id;
+ int64_t cid;
int end;
int resolve;
};
@@ -2357,16 +2394,16 @@ _channelid_from_xid(_PyCrossInterpreterData *data)
}
// Note that we do not preserve the "resolve" flag.
- PyObject *cid = NULL;
- int err = newchannelid(state->ChannelIDType, xid->id, xid->end,
+ PyObject *cidobj = NULL;
+ int err = newchannelid(state->ChannelIDType, xid->cid, xid->end,
_global_channels(), 0, 0,
- (channelid **)&cid);
+ (channelid **)&cidobj);
if (err != 0) {
- assert(cid == NULL);
- (void)handle_channel_error(err, mod, xid->id);
+ assert(cidobj == NULL);
+ (void)handle_channel_error(err, mod, xid->cid);
goto done;
}
- assert(cid != NULL);
+ assert(cidobj != NULL);
if (xid->end == 0) {
goto done;
}
@@ -2375,17 +2412,17 @@ _channelid_from_xid(_PyCrossInterpreterData *data)
}
/* Try returning a high-level channel end but fall back to the ID. */
- PyObject *chan = _channel_from_cid(cid, xid->end);
+ PyObject *chan = _channelobj_from_cidobj(cidobj, xid->end);
if (chan == NULL) {
PyErr_Clear();
goto done;
}
- Py_DECREF(cid);
- cid = chan;
+ Py_DECREF(cidobj);
+ cidobj = chan;
done:
Py_DECREF(mod);
- return cid;
+ return cidobj;
}
static int
@@ -2400,7 +2437,7 @@ _channelid_shared(PyThreadState *tstate, PyObject *obj,
return -1;
}
struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
- xid->id = ((channelid *)obj)->id;
+ xid->cid = ((channelid *)obj)->cid;
xid->end = ((channelid *)obj)->end;
xid->resolve = ((channelid *)obj)->resolve;
return 0;
@@ -2410,30 +2447,30 @@ static PyObject *
channelid_end(PyObject *self, void *end)
{
int force = 1;
- channelid *cid = (channelid *)self;
+ channelid *cidobj = (channelid *)self;
if (end != NULL) {
- PyObject *id = NULL;
- int err = newchannelid(Py_TYPE(self), cid->id, *(int *)end,
- cid->channels, force, cid->resolve,
- (channelid **)&id);
+ PyObject *obj = NULL;
+ int err = newchannelid(Py_TYPE(self), cidobj->cid, *(int *)end,
+ cidobj->channels, force, cidobj->resolve,
+ (channelid **)&obj);
if (err != 0) {
- assert(id == NULL);
+ assert(obj == NULL);
PyObject *mod = get_module_from_type(Py_TYPE(self));
if (mod == NULL) {
return NULL;
}
- (void)handle_channel_error(err, mod, cid->id);
+ (void)handle_channel_error(err, mod, cidobj->cid);
Py_DECREF(mod);
return NULL;
}
- assert(id != NULL);
- return id;
+ assert(obj != NULL);
+ return obj;
}
- if (cid->end == CHANNEL_SEND) {
+ if (cidobj->end == CHANNEL_SEND) {
return PyUnicode_InternFromString("send");
}
- if (cid->end == CHANNEL_RECV) {
+ if (cidobj->end == CHANNEL_RECV) {
return PyUnicode_InternFromString("recv");
}
return PyUnicode_InternFromString("both");
@@ -2455,7 +2492,7 @@ static PyGetSetDef channelid_getsets[] = {
PyDoc_STRVAR(channelid_doc,
"A channel ID identifies a channel and may be used as an int.");
-static PyType_Slot ChannelIDType_slots[] = {
+static PyType_Slot channelid_typeslots[] = {
{Py_tp_dealloc, (destructor)channelid_dealloc},
{Py_tp_doc, (void *)channelid_doc},
{Py_tp_repr, (reprfunc)channelid_repr},
@@ -2469,12 +2506,12 @@ static PyType_Slot ChannelIDType_slots[] = {
{0, NULL},
};
-static PyType_Spec ChannelIDType_spec = {
+static PyType_Spec channelid_typespec = {
.name = MODULE_NAME ".ChannelID",
.basicsize = sizeof(channelid),
.flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
Py_TPFLAGS_DISALLOW_INSTANTIATION | Py_TPFLAGS_IMMUTABLETYPE),
- .slots = ChannelIDType_slots,
+ .slots = channelid_typeslots,
};
@@ -2483,7 +2520,7 @@ static PyType_Spec ChannelIDType_spec = {
// XXX Use a new __xid__ protocol instead?
static PyTypeObject *
-_get_current_channel_end_type(int end)
+_get_current_channelend_type(int end)
{
module_state *state = _get_current_module_state();
if (state == NULL) {
@@ -2519,24 +2556,24 @@ _get_current_channel_end_type(int end)
}
static PyObject *
-_channel_end_from_xid(_PyCrossInterpreterData *data)
+_channelend_from_xid(_PyCrossInterpreterData *data)
{
- channelid *cid = (channelid *)_channelid_from_xid(data);
- if (cid == NULL) {
+ channelid *cidobj = (channelid *)_channelid_from_xid(data);
+ if (cidobj == NULL) {
return NULL;
}
- PyTypeObject *cls = _get_current_channel_end_type(cid->end);
+ PyTypeObject *cls = _get_current_channelend_type(cidobj->end);
if (cls == NULL) {
- Py_DECREF(cid);
+ Py_DECREF(cidobj);
return NULL;
}
- PyObject *obj = PyObject_CallOneArg((PyObject *)cls, (PyObject *)cid);
- Py_DECREF(cid);
+ PyObject *obj = PyObject_CallOneArg((PyObject *)cls, (PyObject *)cidobj);
+ Py_DECREF(cidobj);
return obj;
}
static int
-_channel_end_shared(PyThreadState *tstate, PyObject *obj,
+_channelend_shared(PyThreadState *tstate, PyObject *obj,
_PyCrossInterpreterData *data)
{
PyObject *cidobj = PyObject_GetAttrString(obj, "_id");
@@ -2548,12 +2585,12 @@ _channel_end_shared(PyThreadState *tstate, PyObject *obj,
if (res < 0) {
return -1;
}
- data->new_object = _channel_end_from_xid;
+ data->new_object = _channelend_from_xid;
return 0;
}
static int
-set_channel_end_types(PyObject *mod, PyTypeObject *send, PyTypeObject *recv)
+set_channelend_types(PyObject *mod, PyTypeObject *send, PyTypeObject *recv)
{
module_state *state = get_module_state(mod);
if (state == NULL) {
@@ -2570,16 +2607,17 @@ set_channel_end_types(PyObject *mod, PyTypeObject *send, PyTypeObject *recv)
state->send_channel_type = (PyTypeObject *)Py_NewRef(send);
state->recv_channel_type = (PyTypeObject *)Py_NewRef(recv);
- if (register_xid_class(send, _channel_end_shared, xid_classes)) {
+ if (register_xid_class(send, _channelend_shared, xid_classes)) {
return -1;
}
- if (register_xid_class(recv, _channel_end_shared, xid_classes)) {
+ if (register_xid_class(recv, _channelend_shared, xid_classes)) {
return -1;
}
return 0;
}
+
/* module level code ********************************************************/
/* globals is the process-global state for the module. It holds all
@@ -2635,15 +2673,15 @@ clear_interpreter(void *data)
}
PyInterpreterState *interp = (PyInterpreterState *)data;
assert(interp == _get_current_interp());
- int64_t id = PyInterpreterState_GetID(interp);
- _channels_drop_interpreter(&_globals.channels, id);
+ int64_t interpid = PyInterpreterState_GetID(interp);
+ _channels_clear_interpreter(&_globals.channels, interpid);
}
static PyObject *
-channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
+channelsmod_create(PyObject *self, PyObject *Py_UNUSED(ignored))
{
- int64_t cid = _channel_create(&_globals.channels);
+ int64_t cid = channel_create(&_globals.channels);
if (cid < 0) {
(void)handle_channel_error(-1, self, cid);
return NULL;
@@ -2652,30 +2690,30 @@ channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
if (state == NULL) {
return NULL;
}
- PyObject *id = NULL;
+ PyObject *cidobj = NULL;
int err = newchannelid(state->ChannelIDType, cid, 0,
&_globals.channels, 0, 0,
- (channelid **)&id);
+ (channelid **)&cidobj);
if (handle_channel_error(err, self, cid)) {
- assert(id == NULL);
- err = _channel_destroy(&_globals.channels, cid);
+ assert(cidobj == NULL);
+ err = channel_destroy(&_globals.channels, cid);
if (handle_channel_error(err, self, cid)) {
// XXX issue a warning?
}
return NULL;
}
- assert(id != NULL);
- assert(((channelid *)id)->channels != NULL);
- return id;
+ assert(cidobj != NULL);
+ assert(((channelid *)cidobj)->channels != NULL);
+ return cidobj;
}
-PyDoc_STRVAR(channel_create_doc,
+PyDoc_STRVAR(channelsmod_create_doc,
"channel_create() -> cid\n\
\n\
Create a new cross-interpreter channel and return a unique generated ID.");
static PyObject *
-channel_destroy(PyObject *self, PyObject *args, PyObject *kwds)
+channelsmod_destroy(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"cid", NULL};
int64_t cid;
@@ -2688,21 +2726,21 @@ channel_destroy(PyObject *self, PyObject *args, PyObject *kwds)
}
cid = cid_data.cid;
- int err = _channel_destroy(&_globals.channels, cid);
+ int err = channel_destroy(&_globals.channels, cid);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
Py_RETURN_NONE;
}
-PyDoc_STRVAR(channel_destroy_doc,
+PyDoc_STRVAR(channelsmod_destroy_doc,
"channel_destroy(cid)\n\
\n\
Close and finalize the channel. Afterward attempts to use the channel\n\
will behave as though it never existed.");
static PyObject *
-channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
+channelsmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
{
int64_t count = 0;
int64_t *cids = _channels_list_all(&_globals.channels, &count);
@@ -2724,17 +2762,17 @@ channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
}
int64_t *cur = cids;
for (int64_t i=0; i < count; cur++, i++) {
- PyObject *id = NULL;
+ PyObject *cidobj = NULL;
int err = newchannelid(state->ChannelIDType, *cur, 0,
&_globals.channels, 0, 0,
- (channelid **)&id);
+ (channelid **)&cidobj);
if (handle_channel_error(err, self, *cur)) {
- assert(id == NULL);
+ assert(cidobj == NULL);
Py_SETREF(ids, NULL);
break;
}
- assert(id != NULL);
- PyList_SET_ITEM(ids, (Py_ssize_t)i, id);
+ assert(cidobj != NULL);
+ PyList_SET_ITEM(ids, (Py_ssize_t)i, cidobj);
}
finally:
@@ -2742,13 +2780,13 @@ finally:
return ids;
}
-PyDoc_STRVAR(channel_list_all_doc,
+PyDoc_STRVAR(channelsmod_list_all_doc,
"channel_list_all() -> [cid]\n\
\n\
Return the list of all IDs for active channels.");
static PyObject *
-channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
+channelsmod_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"cid", "send", NULL};
int64_t cid; /* Channel ID */
@@ -2756,8 +2794,8 @@ channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
.module = self,
};
int send = 0; /* Send or receive end? */
- int64_t id;
- PyObject *ids, *id_obj;
+ int64_t interpid;
+ PyObject *ids, *interpid_obj;
PyInterpreterState *interp;
if (!PyArg_ParseTupleAndKeywords(
@@ -2774,20 +2812,20 @@ channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
interp = PyInterpreterState_Head();
while (interp != NULL) {
- id = PyInterpreterState_GetID(interp);
- assert(id >= 0);
- int res = _channel_is_associated(&_globals.channels, cid, id, send);
+ interpid = PyInterpreterState_GetID(interp);
+ assert(interpid >= 0);
+ int res = channel_is_associated(&_globals.channels, cid, interpid, send);
if (res < 0) {
(void)handle_channel_error(res, self, cid);
goto except;
}
if (res) {
- id_obj = PyInterpreterState_GetIDObject(interp);
- if (id_obj == NULL) {
+ interpid_obj = PyInterpreterState_GetIDObject(interp);
+ if (interpid_obj == NULL) {
goto except;
}
- res = PyList_Insert(ids, 0, id_obj);
- Py_DECREF(id_obj);
+ res = PyList_Insert(ids, 0, interpid_obj);
+ Py_DECREF(interpid_obj);
if (res < 0) {
goto except;
}
@@ -2804,7 +2842,7 @@ finally:
return ids;
}
-PyDoc_STRVAR(channel_list_interpreters_doc,
+PyDoc_STRVAR(channelsmod_list_interpreters_doc,
"channel_list_interpreters(cid, *, send) -> [id]\n\
\n\
Return the list of all interpreter IDs associated with an end of the channel.\n\
@@ -2814,7 +2852,7 @@ receive end.");
static PyObject *
-channel_send(PyObject *self, PyObject *args, PyObject *kwds)
+channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL};
struct channel_id_converter_data cid_data = {
@@ -2838,10 +2876,10 @@ channel_send(PyObject *self, PyObject *args, PyObject *kwds)
/* Queue up the object. */
int err = 0;
if (blocking) {
- err = _channel_send_wait(&_globals.channels, cid, obj, timeout);
+ err = channel_send_wait(&_globals.channels, cid, obj, timeout);
}
else {
- err = _channel_send(&_globals.channels, cid, obj, NULL);
+ err = channel_send(&_globals.channels, cid, obj, NULL);
}
if (handle_channel_error(err, self, cid)) {
return NULL;
@@ -2850,14 +2888,14 @@ channel_send(PyObject *self, PyObject *args, PyObject *kwds)
Py_RETURN_NONE;
}
-PyDoc_STRVAR(channel_send_doc,
+PyDoc_STRVAR(channelsmod_send_doc,
"channel_send(cid, obj, blocking=True)\n\
\n\
Add the object's data to the channel's queue.\n\
By default this waits for the object to be received.");
static PyObject *
-channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
+channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL};
struct channel_id_converter_data cid_data = {
@@ -2887,10 +2925,10 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
/* Queue up the object. */
int err = 0;
if (blocking) {
- err = _channel_send_wait(&_globals.channels, cid, tempobj, timeout);
+ err = channel_send_wait(&_globals.channels, cid, tempobj, timeout);
}
else {
- err = _channel_send(&_globals.channels, cid, tempobj, NULL);
+ err = channel_send(&_globals.channels, cid, tempobj, NULL);
}
Py_DECREF(tempobj);
if (handle_channel_error(err, self, cid)) {
@@ -2900,14 +2938,14 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
Py_RETURN_NONE;
}
-PyDoc_STRVAR(channel_send_buffer_doc,
+PyDoc_STRVAR(channelsmod_send_buffer_doc,
"channel_send_buffer(cid, obj, blocking=True)\n\
\n\
Add the object's buffer to the channel's queue.\n\
By default this waits for the object to be received.");
static PyObject *
-channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
+channelsmod_recv(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"cid", "default", NULL};
int64_t cid;
@@ -2922,7 +2960,7 @@ channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
cid = cid_data.cid;
PyObject *obj = NULL;
- int err = _channel_recv(&_globals.channels, cid, &obj);
+ int err = channel_recv(&_globals.channels, cid, &obj);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
@@ -2939,7 +2977,7 @@ channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
return obj;
}
-PyDoc_STRVAR(channel_recv_doc,
+PyDoc_STRVAR(channelsmod_recv_doc,
"channel_recv(cid, [default]) -> obj\n\
\n\
Return a new object from the data at the front of the channel's queue.\n\
@@ -2948,7 +2986,7 @@ If there is nothing to receive then raise ChannelEmptyError, unless\n\
a default value is provided. In that case return it.");
static PyObject *
-channel_close(PyObject *self, PyObject *args, PyObject *kwds)
+channelsmod_close(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
int64_t cid;
@@ -2966,14 +3004,14 @@ channel_close(PyObject *self, PyObject *args, PyObject *kwds)
}
cid = cid_data.cid;
- int err = _channel_close(&_globals.channels, cid, send-recv, force);
+ int err = channel_close(&_globals.channels, cid, send-recv, force);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
Py_RETURN_NONE;
}
-PyDoc_STRVAR(channel_close_doc,
+PyDoc_STRVAR(channelsmod_close_doc,
"channel_close(cid, *, send=None, recv=None, force=False)\n\
\n\
Close the channel for all interpreters.\n\
@@ -3001,7 +3039,7 @@ Once the channel's ID has no more ref counts in any interpreter\n\
the channel will be destroyed.");
static PyObject *
-channel_release(PyObject *self, PyObject *args, PyObject *kwds)
+channelsmod_release(PyObject *self, PyObject *args, PyObject *kwds)
{
// Note that only the current interpreter is affected.
static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
@@ -3027,14 +3065,14 @@ channel_release(PyObject *self, PyObject *args, PyObject *kwds)
// XXX Handle force is True.
// XXX Fix implicit release.
- int err = _channel_drop(&_globals.channels, cid, send, recv);
+ int err = channel_release(&_globals.channels, cid, send, recv);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
Py_RETURN_NONE;
}
-PyDoc_STRVAR(channel_release_doc,
+PyDoc_STRVAR(channelsmod_release_doc,
"channel_release(cid, *, send=None, recv=None, force=True)\n\
\n\
Close the channel for the current interpreter. 'send' and 'recv'\n\
@@ -3042,7 +3080,7 @@ Close the channel for the current interpreter. 'send' and 'recv'\n\
ends are closed. Closing an already closed end is a noop.");
static PyObject *
-channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
+channelsmod__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
{
module_state *state = get_module_state(self);
if (state == NULL) {
@@ -3058,7 +3096,7 @@ channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
}
static PyObject *
-channel__register_end_types(PyObject *self, PyObject *args, PyObject *kwds)
+channelsmod__register_end_types(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"send", "recv", NULL};
PyObject *send;
@@ -3079,7 +3117,7 @@ channel__register_end_types(PyObject *self, PyObject *args, PyObject *kwds)
PyTypeObject *cls_send = (PyTypeObject *)send;
PyTypeObject *cls_recv = (PyTypeObject *)recv;
- if (set_channel_end_types(self, cls_send, cls_recv) < 0) {
+ if (set_channelend_types(self, cls_send, cls_recv) < 0) {
return NULL;
}
@@ -3087,27 +3125,27 @@ channel__register_end_types(PyObject *self, PyObject *args, PyObject *kwds)
}
static PyMethodDef module_functions[] = {
- {"create", channel_create,
- METH_NOARGS, channel_create_doc},
- {"destroy", _PyCFunction_CAST(channel_destroy),
- METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
- {"list_all", channel_list_all,
- METH_NOARGS, channel_list_all_doc},
- {"list_interpreters", _PyCFunction_CAST(channel_list_interpreters),
- METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
- {"send", _PyCFunction_CAST(channel_send),
- METH_VARARGS | METH_KEYWORDS, channel_send_doc},
- {"send_buffer", _PyCFunction_CAST(channel_send_buffer),
- METH_VARARGS | METH_KEYWORDS, channel_send_buffer_doc},
- {"recv", _PyCFunction_CAST(channel_recv),
- METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
- {"close", _PyCFunction_CAST(channel_close),
- METH_VARARGS | METH_KEYWORDS, channel_close_doc},
- {"release", _PyCFunction_CAST(channel_release),
- METH_VARARGS | METH_KEYWORDS, channel_release_doc},
- {"_channel_id", _PyCFunction_CAST(channel__channel_id),
+ {"create", channelsmod_create,
+ METH_NOARGS, channelsmod_create_doc},
+ {"destroy", _PyCFunction_CAST(channelsmod_destroy),
+ METH_VARARGS | METH_KEYWORDS, channelsmod_destroy_doc},
+ {"list_all", channelsmod_list_all,
+ METH_NOARGS, channelsmod_list_all_doc},
+ {"list_interpreters", _PyCFunction_CAST(channelsmod_list_interpreters),
+ METH_VARARGS | METH_KEYWORDS, channelsmod_list_interpreters_doc},
+ {"send", _PyCFunction_CAST(channelsmod_send),
+ METH_VARARGS | METH_KEYWORDS, channelsmod_send_doc},
+ {"send_buffer", _PyCFunction_CAST(channelsmod_send_buffer),
+ METH_VARARGS | METH_KEYWORDS, channelsmod_send_buffer_doc},
+ {"recv", _PyCFunction_CAST(channelsmod_recv),
+ METH_VARARGS | METH_KEYWORDS, channelsmod_recv_doc},
+ {"close", _PyCFunction_CAST(channelsmod_close),
+ METH_VARARGS | METH_KEYWORDS, channelsmod_close_doc},
+ {"release", _PyCFunction_CAST(channelsmod_release),
+ METH_VARARGS | METH_KEYWORDS, channelsmod_release_doc},
+ {"_channel_id", _PyCFunction_CAST(channelsmod__channel_id),
METH_VARARGS | METH_KEYWORDS, NULL},
- {"_register_end_types", _PyCFunction_CAST(channel__register_end_types),
+ {"_register_end_types", _PyCFunction_CAST(channelsmod__register_end_types),
METH_VARARGS | METH_KEYWORDS, NULL},
{NULL, NULL} /* sentinel */
@@ -3143,7 +3181,7 @@ module_exec(PyObject *mod)
// ChannelID
state->ChannelIDType = add_new_type(
- mod, &ChannelIDType_spec, _channelid_shared, xid_classes);
+ mod, &channelid_typespec, _channelid_shared, xid_classes);
if (state->ChannelIDType == NULL) {
goto error;
}
diff --git a/Python/pystate.c b/Python/pystate.c
index 92cf741..2e6f07e 100644
--- a/Python/pystate.c
+++ b/Python/pystate.c
@@ -2428,7 +2428,7 @@ _xidata_init(_PyCrossInterpreterData *data)
assert(data->data == NULL);
assert(data->obj == NULL);
*data = (_PyCrossInterpreterData){0};
- data->interp = -1;
+ data->interpid = -1;
}
static inline void
@@ -2465,7 +2465,7 @@ _PyCrossInterpreterData_Init(_PyCrossInterpreterData *data,
// Ideally every object would know its owning interpreter.
// Until then, we have to rely on the caller to identify it
// (but we don't need it in all cases).
- data->interp = (interp != NULL) ? interp->id : -1;
+ data->interpid = (interp != NULL) ? interp->id : -1;
data->new_object = new_object;
}
@@ -2494,7 +2494,7 @@ _PyCrossInterpreterData_Clear(PyInterpreterState *interp,
{
assert(data != NULL);
// This must be called in the owning interpreter.
- assert(interp == NULL || data->interp == interp->id);
+ assert(interp == NULL || data->interpid == interp->id);
_xidata_clear(data);
}
@@ -2505,7 +2505,7 @@ _check_xidata(PyThreadState *tstate, _PyCrossInterpreterData *data)
// data->obj may be NULL, so we don't check it.
- if (data->interp < 0) {
+ if (data->interpid < 0) {
_PyErr_SetString(tstate, PyExc_SystemError, "missing interp");
return -1;
}
@@ -2557,7 +2557,7 @@ _PyObject_GetCrossInterpreterData(PyObject *obj, _PyCrossInterpreterData *data)
// Reset data before re-populating.
*data = (_PyCrossInterpreterData){0};
- data->interp = -1;
+ data->interpid = -1;
// Call the "getdata" func for the object.
Py_INCREF(obj);
@@ -2573,7 +2573,7 @@ _PyObject_GetCrossInterpreterData(PyObject *obj, _PyCrossInterpreterData *data)
}
// Fill in the blanks and validate the result.
- data->interp = interp->id;
+ data->interpid = interp->id;
if (_check_xidata(tstate, data) != 0) {
(void)_PyCrossInterpreterData_Release(data);
return -1;
@@ -2636,7 +2636,7 @@ _xidata_release(_PyCrossInterpreterData *data, int rawfree)
}
// Switch to the original interpreter.
- PyInterpreterState *interp = _PyInterpreterState_LookUpID(data->interp);
+ PyInterpreterState *interp = _PyInterpreterState_LookUpID(data->interpid);
if (interp == NULL) {
// The interpreter was already destroyed.
// This function shouldn't have been called.