summaryrefslogtreecommitdiffstats
path: root/Modules/_xxsubinterpretersmodule.c
diff options
context:
space:
mode:
authorEric Snow <ericsnowcurrently@gmail.com>2018-02-03 04:49:49 (GMT)
committerGitHub <noreply@github.com>2018-02-03 04:49:49 (GMT)
commit4e9da0d163731caa79811c723c703ee416c31826 (patch)
treefa71d5f25341b13bcb446a929a3d46d864b51360 /Modules/_xxsubinterpretersmodule.c
parent2f79c014931cbb23b08a7d16c534a3cc9607ae14 (diff)
downloadcpython-4e9da0d163731caa79811c723c703ee416c31826.zip
cpython-4e9da0d163731caa79811c723c703ee416c31826.tar.gz
cpython-4e9da0d163731caa79811c723c703ee416c31826.tar.bz2
bpo-32604: Fix memory leaks in the new _xxsubinterpreters module. (#5507)
Diffstat (limited to 'Modules/_xxsubinterpretersmodule.c')
-rw-r--r--Modules/_xxsubinterpretersmodule.c864
1 files changed, 596 insertions, 268 deletions
diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c
index d2b5f26..7829b4c 100644
--- a/Modules/_xxsubinterpretersmodule.c
+++ b/Modules/_xxsubinterpretersmodule.c
@@ -7,6 +7,22 @@
#include "internal/pystate.h"
+static char *
+_copy_raw_string(PyObject *strobj)
+{
+ const char *str = PyUnicode_AsUTF8(strobj);
+ if (str == NULL) {
+ return NULL;
+ }
+ char *copied = PyMem_Malloc(strlen(str)+1);
+ if (str == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ strcpy(copied, str);
+ return copied;
+}
+
static PyInterpreterState *
_get_current(void)
{
@@ -31,10 +47,13 @@ _coerce_id(PyObject *id)
}
return -1;
}
- long long cid = PyLong_AsLongLong(id);
+ int64_t cid = PyLong_AsLongLong(id);
+ Py_DECREF(id);
if (cid == -1 && PyErr_Occurred() != NULL) {
- PyErr_SetString(PyExc_ValueError,
- "'id' must be a non-negative int");
+ if (!PyErr_ExceptionMatches(PyExc_OverflowError)) {
+ PyErr_SetString(PyExc_ValueError,
+ "'id' must be a non-negative int");
+ }
return -1;
}
if (cid < 0) {
@@ -42,92 +61,131 @@ _coerce_id(PyObject *id)
"'id' must be a non-negative int");
return -1;
}
- if (cid > INT64_MAX) {
- PyErr_SetString(PyExc_ValueError,
- "'id' too large (must be 64-bit int)");
- return -1;
- }
return cid;
}
+
/* data-sharing-specific code ***********************************************/
-typedef struct _shareditem {
- Py_UNICODE *name;
- Py_ssize_t namelen;
+struct _sharednsitem {
+ char *name;
_PyCrossInterpreterData data;
-} _shareditem;
+};
+
+static int
+_sharednsitem_init(struct _sharednsitem *item, PyObject *key, PyObject *value)
+{
+ item->name = _copy_raw_string(key);
+ if (item->name == NULL) {
+ return -1;
+ }
+ if (_PyObject_GetCrossInterpreterData(value, &item->data) != 0) {
+ return -1;
+ }
+ return 0;
+}
+
+static void
+_sharednsitem_clear(struct _sharednsitem *item)
+{
+ if (item->name != NULL) {
+ PyMem_Free(item->name);
+ }
+ _PyCrossInterpreterData_Release(&item->data);
+}
+
+static int
+_sharednsitem_apply(struct _sharednsitem *item, PyObject *ns)
+{
+ PyObject *name = PyUnicode_FromString(item->name);
+ if (name == NULL) {
+ return -1;
+ }
+ PyObject *value = _PyCrossInterpreterData_NewObject(&item->data);
+ if (value == NULL) {
+ Py_DECREF(name);
+ return -1;
+ }
+ int res = PyDict_SetItem(ns, name, value);
+ Py_DECREF(name);
+ Py_DECREF(value);
+ return res;
+}
+
+typedef struct _sharedns {
+ Py_ssize_t len;
+ struct _sharednsitem* items;
+} _sharedns;
+
+static _sharedns *
+_sharedns_new(Py_ssize_t len)
+{
+ _sharedns *shared = PyMem_NEW(_sharedns, 1);
+ if (shared == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ shared->len = len;
+ shared->items = PyMem_NEW(struct _sharednsitem, len);
+ if (shared->items == NULL) {
+ PyErr_NoMemory();
+ PyMem_Free(shared);
+ return NULL;
+ }
+ return shared;
+}
-void
-_sharedns_clear(_shareditem *shared)
+static void
+_sharedns_free(_sharedns *shared)
{
- for (_shareditem *item=shared; item->name != NULL; item += 1) {
- _PyCrossInterpreterData_Release(&item->data);
+ for (Py_ssize_t i=0; i < shared->len; i++) {
+ _sharednsitem_clear(&shared->items[i]);
}
+ PyMem_Free(shared->items);
+ PyMem_Free(shared);
}
-static _shareditem *
-_get_shared_ns(PyObject *shareable, Py_ssize_t *lenp)
+static _sharedns *
+_get_shared_ns(PyObject *shareable)
{
if (shareable == NULL || shareable == Py_None) {
- *lenp = 0;
return NULL;
}
Py_ssize_t len = PyDict_Size(shareable);
- *lenp = len;
if (len == 0) {
return NULL;
}
- _shareditem *shared = PyMem_NEW(_shareditem, len+1);
+ _sharedns *shared = _sharedns_new(len);
if (shared == NULL) {
return NULL;
}
- for (Py_ssize_t i=0; i < len; i++) {
- *(shared + i) = (_shareditem){0};
- }
Py_ssize_t pos = 0;
for (Py_ssize_t i=0; i < len; i++) {
PyObject *key, *value;
if (PyDict_Next(shareable, &pos, &key, &value) == 0) {
break;
}
- _shareditem *item = shared + i;
-
- if (_PyObject_GetCrossInterpreterData(value, &item->data) != 0) {
+ if (_sharednsitem_init(&shared->items[i], key, value) != 0) {
break;
}
- item->name = PyUnicode_AsUnicodeAndSize(key, &item->namelen);
- if (item->name == NULL) {
- _PyCrossInterpreterData_Release(&item->data);
- break;
- }
- (item + 1)->name = NULL; // Mark the next one as the last.
}
if (PyErr_Occurred()) {
- _sharedns_clear(shared);
- PyMem_Free(shared);
+ _sharedns_free(shared);
return NULL;
}
return shared;
}
static int
-_shareditem_apply(_shareditem *item, PyObject *ns)
+_sharedns_apply(_sharedns *shared, PyObject *ns)
{
- PyObject *name = PyUnicode_FromUnicode(item->name, item->namelen);
- if (name == NULL) {
- return 1;
- }
- PyObject *value = _PyCrossInterpreterData_NewObject(&item->data);
- if (value == NULL) {
- Py_DECREF(name);
- return 1;
+ for (Py_ssize_t i=0; i < shared->len; i++) {
+ if (_sharednsitem_apply(&shared->items[i], ns) != 0) {
+ return -1;
+ }
}
- int res = PyDict_SetItem(ns, name, value);
- Py_DECREF(name);
- Py_DECREF(value);
- return res;
+ return 0;
}
// Ultimately we'd like to preserve enough information about the
@@ -136,65 +194,117 @@ _shareditem_apply(_shareditem *item, PyObject *ns)
// of the exception in the calling interpreter.
typedef struct _sharedexception {
+ char *name;
char *msg;
} _sharedexception;
static _sharedexception *
-_get_shared_exception(void)
+_sharedexception_new(void)
{
_sharedexception *err = PyMem_NEW(_sharedexception, 1);
if (err == NULL) {
+ PyErr_NoMemory();
return NULL;
}
- PyObject *exc;
- PyObject *value;
- PyObject *tb;
- PyErr_Fetch(&exc, &value, &tb);
- PyObject *msg;
- if (value == NULL) {
- msg = PyUnicode_FromFormat("%S", exc);
- }
- else {
- msg = PyUnicode_FromFormat("%S: %S", exc, value);
- }
- if (msg == NULL) {
- err->msg = "unable to format exception";
- return err;
+ err->name = NULL;
+ err->msg = NULL;
+ return err;
+}
+
+static void
+_sharedexception_clear(_sharedexception *exc)
+{
+ if (exc->name != NULL) {
+ PyMem_Free(exc->name);
}
- err->msg = (char *)PyUnicode_AsUTF8(msg);
- if (err->msg == NULL) {
- err->msg = "unable to encode exception";
+ if (exc->msg != NULL) {
+ PyMem_Free(exc->msg);
}
- return err;
}
-static PyObject * RunFailedError;
+static void
+_sharedexception_free(_sharedexception *exc)
+{
+ _sharedexception_clear(exc);
+ PyMem_Free(exc);
+}
-static int
-interp_exceptions_init(PyObject *ns)
+static _sharedexception *
+_sharedexception_bind(PyObject *exctype, PyObject *exc, PyObject *tb)
{
- // XXX Move the exceptions into per-module memory?
+ assert(exctype != NULL);
+ char *failure = NULL;
- // An uncaught exception came out of interp_run_string().
- RunFailedError = PyErr_NewException("_xxsubinterpreters.RunFailedError",
- PyExc_RuntimeError, NULL);
- if (RunFailedError == NULL) {
- return -1;
+ _sharedexception *err = _sharedexception_new();
+ if (err == NULL) {
+ goto finally;
}
- if (PyDict_SetItemString(ns, "RunFailedError", RunFailedError) != 0) {
- return -1;
+
+ PyObject *name = PyUnicode_FromFormat("%S", exctype);
+ if (name == NULL) {
+ failure = "unable to format exception type name";
+ goto finally;
+ }
+ err->name = _copy_raw_string(name);
+ Py_DECREF(name);
+ if (err->name == NULL) {
+ if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
+ failure = "out of memory copying exception type name";
+ }
+ failure = "unable to encode and copy exception type name";
+ goto finally;
}
- return 0;
+ if (exc != NULL) {
+ PyObject *msg = PyUnicode_FromFormat("%S", exc);
+ if (msg == NULL) {
+ failure = "unable to format exception message";
+ goto finally;
+ }
+ err->msg = _copy_raw_string(msg);
+ Py_DECREF(msg);
+ if (err->msg == NULL) {
+ if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
+ failure = "out of memory copying exception message";
+ }
+ failure = "unable to encode and copy exception message";
+ goto finally;
+ }
+ }
+
+finally:
+ if (failure != NULL) {
+ PyErr_Clear();
+ if (err->name != NULL) {
+ PyMem_Free(err->name);
+ err->name = NULL;
+ }
+ err->msg = failure;
+ }
+ return err;
}
static void
-_apply_shared_exception(_sharedexception *exc)
+_sharedexception_apply(_sharedexception *exc, PyObject *wrapperclass)
{
- PyErr_SetString(RunFailedError, exc->msg);
+ if (exc->name != NULL) {
+ if (exc->msg != NULL) {
+ PyErr_Format(wrapperclass, "%s: %s", exc->name, exc->msg);
+ }
+ else {
+ PyErr_SetString(wrapperclass, exc->name);
+ }
+ }
+ else if (exc->msg != NULL) {
+ PyErr_SetString(wrapperclass, exc->msg);
+ }
+ else {
+ PyErr_SetNone(wrapperclass);
+ }
}
-/* channel-specific code */
+
+/* channel-specific code ****************************************************/
static PyObject *ChannelError;
static PyObject *ChannelNotFoundError;
@@ -249,6 +359,139 @@ channel_exceptions_init(PyObject *ns)
return 0;
}
+/* the channel queue */
+
+struct _channelitem;
+
+typedef struct _channelitem {
+ _PyCrossInterpreterData *data;
+ struct _channelitem *next;
+} _channelitem;
+
+static _channelitem *
+_channelitem_new(void)
+{
+ _channelitem *item = PyMem_NEW(_channelitem, 1);
+ if (item == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ item->data = NULL;
+ item->next = NULL;
+ return item;
+}
+
+static void
+_channelitem_clear(_channelitem *item)
+{
+ if (item->data != NULL) {
+ _PyCrossInterpreterData_Release(item->data);
+ PyMem_Free(item->data);
+ item->data = NULL;
+ }
+ item->next = NULL;
+}
+
+static void
+_channelitem_free(_channelitem *item)
+{
+ _channelitem_clear(item);
+ PyMem_Free(item);
+}
+
+static void
+_channelitem_free_all(_channelitem *item)
+{
+ while (item != NULL) {
+ _channelitem *last = item;
+ item = item->next;
+ _channelitem_free(last);
+ }
+}
+
+static _PyCrossInterpreterData *
+_channelitem_popped(_channelitem *item)
+{
+ _PyCrossInterpreterData *data = item->data;
+ item->data = NULL;
+ _channelitem_free(item);
+ return data;
+}
+
+typedef struct _channelqueue {
+ int64_t count;
+ _channelitem *first;
+ _channelitem *last;
+} _channelqueue;
+
+static _channelqueue *
+_channelqueue_new(void)
+{
+ _channelqueue *queue = PyMem_NEW(_channelqueue, 1);
+ if (queue == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ queue->count = 0;
+ queue->first = NULL;
+ queue->last = NULL;
+ return queue;
+}
+
+static void
+_channelqueue_clear(_channelqueue *queue)
+{
+ _channelitem_free_all(queue->first);
+ queue->count = 0;
+ queue->first = NULL;
+ queue->last = NULL;
+}
+
+static void
+_channelqueue_free(_channelqueue *queue)
+{
+ _channelqueue_clear(queue);
+ PyMem_Free(queue);
+}
+
+static int
+_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
+{
+ _channelitem *item = _channelitem_new();
+ if (item == NULL) {
+ return -1;
+ }
+ item->data = data;
+
+ queue->count += 1;
+ if (queue->first == NULL) {
+ queue->first = item;
+ }
+ else {
+ queue->last->next = item;
+ }
+ queue->last = item;
+ return 0;
+}
+
+static _PyCrossInterpreterData *
+_channelqueue_get(_channelqueue *queue)
+{
+ _channelitem *item = queue->first;
+ if (item == NULL) {
+ return NULL;
+ }
+ queue->first = item->next;
+ if (queue->last == item) {
+ queue->last = NULL;
+ }
+ queue->count -= 1;
+
+ return _channelitem_popped(item);
+}
+
+/* channel-interpreter associations */
+
struct _channelend;
typedef struct _channelend {
@@ -262,23 +505,28 @@ _channelend_new(int64_t interp)
{
_channelend *end = PyMem_NEW(_channelend, 1);
if (end == NULL) {
+ PyErr_NoMemory();
return NULL;
}
-
end->next = NULL;
end->interp = interp;
-
end->open = 1;
-
return end;
}
static void
-_channelend_free_all(_channelend *end) {
+_channelend_free(_channelend *end)
+{
+ PyMem_Free(end);
+}
+
+static void
+_channelend_free_all(_channelend *end)
+{
while (end != NULL) {
_channelend *last = end;
end = end->next;
- PyMem_Free(last);
+ _channelend_free(last);
}
}
@@ -300,24 +548,7 @@ _channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
return end;
}
-struct _channelitem;
-
-typedef struct _channelitem {
- _PyCrossInterpreterData *data;
- struct _channelitem *next;
-} _channelitem;
-
-struct _channel;
-
-typedef struct _channel {
- PyThread_type_lock mutex;
-
- int open;
-
- int64_t count;
- _channelitem *first;
- _channelitem *last;
-
+typedef struct _channelassociations {
// Note that the list entries are never removed for interpreter
// for which the channel is closed. This should be a problem in
// practice. Also, a channel isn't automatically closed when an
@@ -326,39 +557,43 @@ typedef struct _channel {
int64_t numrecvopen;
_channelend *send;
_channelend *recv;
-} _PyChannelState;
+} _channelends;
-static _PyChannelState *
-_channel_new(void)
+static _channelends *
+_channelends_new(void)
{
- _PyChannelState *chan = PyMem_NEW(_PyChannelState, 1);
- if (chan == NULL) {
+ _channelends *ends = PyMem_NEW(_channelends, 1);
+ if (ends== NULL) {
return NULL;
}
- chan->mutex = PyThread_allocate_lock();
- if (chan->mutex == NULL) {
- PyMem_Free(chan);
- PyErr_SetString(ChannelError,
- "can't initialize mutex for new channel");
- return NULL;
- }
-
- chan->open = 1;
+ ends->numsendopen = 0;
+ ends->numrecvopen = 0;
+ ends->send = NULL;
+ ends->recv = NULL;
+ return ends;
+}
- chan->count = 0;
- chan->first = NULL;
- chan->last = NULL;
+static void
+_channelends_clear(_channelends *ends)
+{
+ _channelend_free_all(ends->send);
+ ends->send = NULL;
+ ends->numsendopen = 0;
- chan->numsendopen = 0;
- chan->numrecvopen = 0;
- chan->send = NULL;
- chan->recv = NULL;
+ _channelend_free_all(ends->recv);
+ ends->recv = NULL;
+ ends->numrecvopen = 0;
+}
- return chan;
+static void
+_channelends_free(_channelends *ends)
+{
+ _channelends_clear(ends);
+ PyMem_Free(ends);
}
static _channelend *
-_channel_add_end(_PyChannelState *chan, _channelend *prev, int64_t interp,
+_channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
int send)
{
_channelend *end = _channelend_new(interp);
@@ -368,137 +603,163 @@ _channel_add_end(_PyChannelState *chan, _channelend *prev, int64_t interp,
if (prev == NULL) {
if (send) {
- chan->send = end;
+ ends->send = end;
}
else {
- chan->recv = end;
+ ends->recv = end;
}
}
else {
prev->next = end;
}
if (send) {
- chan->numsendopen += 1;
+ ends->numsendopen += 1;
}
else {
- chan->numrecvopen += 1;
+ ends->numrecvopen += 1;
}
return end;
}
-static _channelend *
-_channel_associate_end(_PyChannelState *chan, int64_t interp, int send)
+static int
+_channelends_associate(_channelends *ends, int64_t interp, int send)
{
- if (!chan->open) {
- PyErr_SetString(ChannelClosedError, "channel closed");
- return NULL;
- }
-
_channelend *prev;
- _channelend *end = _channelend_find(send ? chan->send : chan->recv,
+ _channelend *end = _channelend_find(send ? ends->send : ends->recv,
interp, &prev);
if (end != NULL) {
if (!end->open) {
PyErr_SetString(ChannelClosedError, "channel already closed");
- return NULL;
+ return -1;
}
// already associated
- return end;
+ return 0;
+ }
+ if (_channelends_add(ends, prev, interp, send) == NULL) {
+ return -1;
}
- return _channel_add_end(chan, prev, interp, send);
+ return 0;
+}
+
+static int
+_channelends_is_open(_channelends *ends)
+{
+ if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
+ return 1;
+ }
+ if (ends->send == NULL && ends->recv == NULL) {
+ return 1;
+ }
+ return 0;
}
static void
-_channel_close_channelend(_PyChannelState *chan, _channelend *end, int send)
+_channelends_close_end(_channelends *ends, _channelend *end, int send)
{
end->open = 0;
if (send) {
- chan->numsendopen -= 1;
+ ends->numsendopen -= 1;
}
else {
- chan->numrecvopen -= 1;
+ ends->numrecvopen -= 1;
}
}
static int
-_channel_close_interpreter(_PyChannelState *chan, int64_t interp, int which)
+_channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
{
- PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
-
- int res = -1;
- if (!chan->open) {
- PyErr_SetString(ChannelClosedError, "channel already closed");
- goto done;
- }
-
_channelend *prev;
_channelend *end;
if (which >= 0) { // send/both
- end = _channelend_find(chan->send, interp, &prev);
+ end = _channelend_find(ends->send, interp, &prev);
if (end == NULL) {
// never associated so add it
- end = _channel_add_end(chan, prev, interp, 1);
+ end = _channelends_add(ends, prev, interp, 1);
if (end == NULL) {
- goto done;
+ return -1;
}
}
- _channel_close_channelend(chan, end, 1);
+ _channelends_close_end(ends, end, 1);
}
if (which <= 0) { // recv/both
- end = _channelend_find(chan->recv, interp, &prev);
+ end = _channelend_find(ends->recv, interp, &prev);
if (end == NULL) {
// never associated so add it
- end = _channel_add_end(chan, prev, interp, 0);
+ end = _channelends_add(ends, prev, interp, 0);
if (end == NULL) {
- goto done;
+ return -1;
}
}
- _channel_close_channelend(chan, end, 0);
- }
-
- if (chan->numsendopen == 0 && chan->numrecvopen == 0) {
- if (chan->send != NULL || chan->recv != NULL) {
- chan->open = 0;
- }
+ _channelends_close_end(ends, end, 0);
}
-
- res = 0;
-done:
- PyThread_release_lock(chan->mutex);
- return res;
+ return 0;
}
-static int
-_channel_close_all(_PyChannelState *chan)
+static void
+_channelends_close_all(_channelends *ends)
{
- int res = -1;
- PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+ // Ensure all the "send"-associated interpreters are closed.
+ _channelend *end;
+ for (end = ends->send; end != NULL; end = end->next) {
+ _channelends_close_end(ends, end, 1);
+ }
- if (!chan->open) {
- PyErr_SetString(ChannelClosedError, "channel already closed");
- goto done;
+ // Ensure all the "recv"-associated interpreters are closed.
+ for (end = ends->recv; end != NULL; end = end->next) {
+ _channelends_close_end(ends, end, 0);
}
+}
- chan->open = 0;
+/* channels */
- // We *could* also just leave these in place, since we've marked
- // the channel as closed already.
+struct _channel;
- // Ensure all the "send"-associated interpreters are closed.
- _channelend *end;
- for (end = chan->send; end != NULL; end = end->next) {
- _channel_close_channelend(chan, end, 1);
- }
+typedef struct _channel {
+ PyThread_type_lock mutex;
+ _channelqueue *queue;
+ _channelends *ends;
+ int open;
+} _PyChannelState;
- // Ensure all the "recv"-associated interpreters are closed.
- for (end = chan->recv; end != NULL; end = end->next) {
- _channel_close_channelend(chan, end, 0);
+static _PyChannelState *
+_channel_new(void)
+{
+ _PyChannelState *chan = PyMem_NEW(_PyChannelState, 1);
+ if (chan == NULL) {
+ return NULL;
+ }
+ chan->mutex = PyThread_allocate_lock();
+ if (chan->mutex == NULL) {
+ PyMem_Free(chan);
+ PyErr_SetString(ChannelError,
+ "can't initialize mutex for new channel");
+ return NULL;
+ }
+ chan->queue = _channelqueue_new();
+ if (chan->queue == NULL) {
+ PyMem_Free(chan);
+ return NULL;
+ }
+ chan->ends = _channelends_new();
+ if (chan->ends == NULL) {
+ _channelqueue_free(chan->queue);
+ PyMem_Free(chan);
+ return NULL;
}
+ chan->open = 1;
+ return chan;
+}
- res = 0;
-done:
+static void
+_channel_free(_PyChannelState *chan)
+{
+ PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+ _channelqueue_free(chan->queue);
+ _channelends_free(chan->ends);
PyThread_release_lock(chan->mutex);
- return res;
+
+ PyThread_free_lock(chan->mutex);
+ PyMem_Free(chan);
}
static int
@@ -506,24 +767,19 @@ _channel_add(_PyChannelState *chan, int64_t interp,
_PyCrossInterpreterData *data)
{
int res = -1;
-
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
- if (_channel_associate_end(chan, interp, 1) == NULL) {
+
+ if (!chan->open) {
+ PyErr_SetString(ChannelClosedError, "channel closed");
goto done;
}
-
- _channelitem *item = PyMem_NEW(_channelitem, 1);
- if (item == NULL) {
+ if (_channelends_associate(chan->ends, interp, 1) != 0) {
goto done;
}
- item->data = data;
- item->next = NULL;
- chan->count += 1;
- if (chan->first == NULL) {
- chan->first = item;
+ if (_channelqueue_put(chan->queue, data) != 0) {
+ goto done;
}
- chan->last = item;
res = 0;
done:
@@ -536,56 +792,68 @@ _channel_next(_PyChannelState *chan, int64_t interp)
{
_PyCrossInterpreterData *data = NULL;
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
- if (_channel_associate_end(chan, interp, 0) == NULL) {
- goto done;
- }
- _channelitem *item = chan->first;
- if (item == NULL) {
+ if (!chan->open) {
+ PyErr_SetString(ChannelClosedError, "channel closed");
goto done;
}
- chan->first = item->next;
- if (chan->last == item) {
- chan->last = NULL;
+ if (_channelends_associate(chan->ends, interp, 0) != 0) {
+ goto done;
}
- chan->count -= 1;
-
- data = item->data;
- PyMem_Free(item);
+ data = _channelqueue_get(chan->queue);
done:
PyThread_release_lock(chan->mutex);
return data;
}
-static void
-_channel_clear(_PyChannelState *chan)
+static int
+_channel_close_interpreter(_PyChannelState *chan, int64_t interp, int which)
{
- _channelitem *item = chan->first;
- while (item != NULL) {
- _PyCrossInterpreterData_Release(item->data);
- PyMem_Free(item->data);
- _channelitem *last = item;
- item = item->next;
- PyMem_Free(last);
+ PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
+
+ int res = -1;
+ if (!chan->open) {
+ PyErr_SetString(ChannelClosedError, "channel already closed");
+ goto done;
}
- chan->first = NULL;
- chan->last = NULL;
+
+ if (_channelends_close_interpreter(chan->ends, interp, which) != 0) {
+ goto done;
+ }
+ chan->open = _channelends_is_open(chan->ends);
+
+ res = 0;
+done:
+ PyThread_release_lock(chan->mutex);
+ return res;
}
-static void
-_channel_free(_PyChannelState *chan)
+static int
+_channel_close_all(_PyChannelState *chan)
{
+ int res = -1;
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
- _channel_clear(chan);
- _channelend_free_all(chan->send);
- _channelend_free_all(chan->recv);
- PyThread_release_lock(chan->mutex);
- PyThread_free_lock(chan->mutex);
- PyMem_Free(chan);
+ if (!chan->open) {
+ PyErr_SetString(ChannelClosedError, "channel already closed");
+ goto done;
+ }
+
+ chan->open = 0;
+
+ // We *could* also just leave these in place, since we've marked
+ // the channel as closed already.
+ _channelends_close_all(chan->ends);
+
+ res = 0;
+done:
+ PyThread_release_lock(chan->mutex);
+ return res;
}
+/* the set of channels */
+
struct _channelref;
typedef struct _channelref {
@@ -609,6 +877,22 @@ _channelref_new(int64_t id, _PyChannelState *chan)
return ref;
}
+//static void
+//_channelref_clear(_channelref *ref)
+//{
+// ref->id = -1;
+// ref->chan = NULL;
+// ref->next = NULL;
+// ref->objcount = 0;
+//}
+
+static void
+_channelref_free(_channelref *ref)
+{
+ //_channelref_clear(ref);
+ PyMem_Free(ref);
+}
+
static _channelref *
_channelref_find(_channelref *first, int64_t id, _channelref **pprev)
{
@@ -640,7 +924,6 @@ _channels_init(_channels *channels)
if (channels->mutex == NULL) {
channels->mutex = PyThread_allocate_lock();
if (channels->mutex == NULL) {
- PyMem_Free(channels);
PyErr_SetString(ChannelError,
"can't initialize mutex for channel management");
return -1;
@@ -752,6 +1035,9 @@ _channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan)
if (pchan != NULL) {
*pchan = ref->chan;
}
+ else {
+ _channel_free(ref->chan);
+ }
ref->chan = NULL;
}
@@ -776,7 +1062,7 @@ _channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
if (pchan != NULL) {
*pchan = ref->chan;
}
- PyMem_Free(ref);
+ _channelref_free(ref);
}
static int
@@ -974,6 +1260,7 @@ _channel_recv(_channels *channels, int64_t id)
return NULL;
}
_PyCrossInterpreterData_Release(data);
+ PyMem_Free(data);
return obj;
}
@@ -995,7 +1282,7 @@ _channel_drop(_channels *channels, int64_t id, int send, int recv)
// Past this point we are responsible for releasing the mutex.
// Close one or both of the two ends.
- int res =_channel_close_interpreter(chan, interp->id, send-recv);
+ int res = _channel_close_interpreter(chan, interp->id, send-recv);
PyThread_release_lock(mutex);
return res;
}
@@ -1078,6 +1365,7 @@ channelid_new(PyTypeObject *cls, PyObject *args, PyObject *kwds)
"'send' and 'recv' cannot both be False");
return NULL;
}
+
int end = 0;
if (send == 1) {
if (recv == 0 || recv == -1) {
@@ -1176,7 +1464,9 @@ channelid_hash(PyObject *self)
if (id == NULL) {
return -1;
}
- return PyObject_Hash(id);
+ Py_hash_t hash = PyObject_Hash(id);
+ Py_DECREF(id);
+ return hash;
}
static PyObject *
@@ -1208,11 +1498,11 @@ channelid_richcompare(PyObject *self, PyObject *other, int op)
Py_RETURN_NOTIMPLEMENTED;
}
int64_t othercid = PyLong_AsLongLong(other);
- // XXX decref other here?
+ Py_DECREF(other);
if (othercid == -1 && PyErr_Occurred() != NULL) {
return NULL;
}
- if (othercid < 0 || othercid > INT64_MAX) {
+ if (othercid < 0) {
equal = 0;
}
else {
@@ -1338,7 +1628,30 @@ static PyTypeObject ChannelIDtype = {
NULL, /* tp_new */
};
-/* interpreter-specific functions *******************************************/
+
+/* interpreter-specific code ************************************************/
+
+static PyObject * RunFailedError = NULL;
+
+static int
+interp_exceptions_init(PyObject *ns)
+{
+ // XXX Move the exceptions into per-module memory?
+
+ if (RunFailedError == NULL) {
+ // An uncaught exception came out of interp_run_string().
+ RunFailedError = PyErr_NewException("_xxsubinterpreters.RunFailedError",
+ PyExc_RuntimeError, NULL);
+ if (RunFailedError == NULL) {
+ return -1;
+ }
+ if (PyDict_SetItemString(ns, "RunFailedError", RunFailedError) != 0) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
static PyInterpreterState *
_look_up(PyObject *requested_id)
@@ -1396,10 +1709,12 @@ _ensure_not_running(PyInterpreterState *interp)
static int
_run_script(PyInterpreterState *interp, const char *codestr,
- _shareditem *shared, Py_ssize_t num_shared,
- _sharedexception **exc)
+ _sharedns *shared, _sharedexception **exc)
{
- assert(num_shared >= 0);
+ PyObject *exctype = NULL;
+ PyObject *excval = NULL;
+ PyObject *tb = NULL;
+
PyObject *main_mod = PyMapping_GetItemString(interp->modules, "__main__");
if (main_mod == NULL) {
goto error;
@@ -1413,12 +1728,9 @@ _run_script(PyInterpreterState *interp, const char *codestr,
// Apply the cross-interpreter data.
if (shared != NULL) {
- for (Py_ssize_t i=0; i < num_shared; i++) {
- _shareditem *item = &shared[i];
- if (_shareditem_apply(item, ns) != 0) {
- Py_DECREF(ns);
- goto error;
- }
+ if (_sharedns_apply(shared, ns) != 0) {
+ Py_DECREF(ns);
+ goto error;
}
}
@@ -1432,11 +1744,25 @@ _run_script(PyInterpreterState *interp, const char *codestr,
Py_DECREF(result); // We throw away the result.
}
+ *exc = NULL;
return 0;
error:
- *exc = _get_shared_exception();
- PyErr_Clear();
+ PyErr_Fetch(&exctype, &excval, &tb);
+
+ _sharedexception *sharedexc = _sharedexception_bind(exctype, excval, tb);
+ Py_XDECREF(exctype);
+ Py_XDECREF(excval);
+ Py_XDECREF(tb);
+ if (sharedexc == NULL) {
+ fprintf(stderr, "RunFailedError: script raised an uncaught exception");
+ PyErr_Clear();
+ sharedexc = NULL;
+ }
+ else {
+ assert(!PyErr_Occurred());
+ }
+ *exc = sharedexc;
return -1;
}
@@ -1448,8 +1774,7 @@ _run_script_in_interpreter(PyInterpreterState *interp, const char *codestr,
return -1;
}
- Py_ssize_t num_shared = -1;
- _shareditem *shared = _get_shared_ns(shareables, &num_shared);
+ _sharedns *shared = _get_shared_ns(shareables);
if (shared == NULL && PyErr_Occurred()) {
return -1;
}
@@ -1460,7 +1785,7 @@ _run_script_in_interpreter(PyInterpreterState *interp, const char *codestr,
// Run the script.
_sharedexception *exc = NULL;
- int result = _run_script(interp, codestr, shared, num_shared, &exc);
+ int result = _run_script(interp, codestr, shared, &exc);
// Switch back.
if (save_tstate != NULL) {
@@ -1469,8 +1794,8 @@ _run_script_in_interpreter(PyInterpreterState *interp, const char *codestr,
// Propagate any exception out to the caller.
if (exc != NULL) {
- _apply_shared_exception(exc);
- PyMem_Free(exc);
+ _sharedexception_apply(exc, RunFailedError);
+ _sharedexception_free(exc);
}
else if (result != 0) {
// We were unable to allocate a shared exception.
@@ -1478,8 +1803,7 @@ _run_script_in_interpreter(PyInterpreterState *interp, const char *codestr,
}
if (shared != NULL) {
- _sharedns_clear(shared);
- PyMem_Free(shared);
+ _sharedns_free(shared);
}
return result;
@@ -1612,7 +1936,9 @@ interp_list_all(PyObject *self)
return NULL;
}
// insert at front of list
- if (PyList_Insert(ids, 0, id) < 0) {
+ int res = PyList_Insert(ids, 0, id);
+ Py_DECREF(id);
+ if (res < 0) {
Py_DECREF(ids);
return NULL;
}
@@ -1822,11 +2148,11 @@ channel_list_all(PyObject *self)
}
PyObject *ids = PyList_New((Py_ssize_t)count);
if (ids == NULL) {
- // XXX free cids
- return NULL;
+ goto finally;
}
- for (int64_t i=0; i < count; cids++, i++) {
- PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, *cids, 0,
+ int64_t *cur = cids;
+ for (int64_t i=0; i < count; cur++, i++) {
+ PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, *cur, 0,
&_globals.channels, 0);
if (id == NULL) {
Py_DECREF(ids);
@@ -1835,7 +2161,9 @@ channel_list_all(PyObject *self)
}
PyList_SET_ITEM(ids, i, id);
}
- // XXX free cids
+
+finally:
+ PyMem_Free(cids);
return ids;
}