summaryrefslogtreecommitdiffstats
path: root/Modules/_interpqueuesmodule.c
diff options
context:
space:
mode:
Diffstat (limited to 'Modules/_interpqueuesmodule.c')
-rw-r--r--Modules/_interpqueuesmodule.c243
1 files changed, 184 insertions, 59 deletions
diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c
index c99111b..8e82789 100644
--- a/Modules/_interpqueuesmodule.c
+++ b/Modules/_interpqueuesmodule.c
@@ -58,6 +58,19 @@ _release_xid_data(_PyCrossInterpreterData *data, int flags)
return res;
}
+static inline int64_t
+_get_interpid(_PyCrossInterpreterData *data)
+{
+ int64_t interpid;
+ if (data != NULL) {
+ interpid = _PyCrossInterpreterData_INTERPID(data);
+ assert(!PyErr_Occurred());
+ }
+ else {
+ interpid = PyInterpreterState_GetID(PyInterpreterState_Get());
+ }
+ return interpid;
+}
static PyInterpreterState *
_get_current_interp(void)
@@ -389,47 +402,98 @@ handle_queue_error(int err, PyObject *mod, int64_t qid)
}
+/* unbound items ************************************************************/
+
+#define UNBOUND_REMOVE 1
+#define UNBOUND_ERROR 2
+#define UNBOUND_REPLACE 3
+
+// It would also be possible to add UNBOUND_REPLACE where the replacement
+// value is user-provided. There would be some limitations there, though.
+// Another possibility would be something like UNBOUND_COPY, where the
+// object is released but the underlying data is copied (with the "raw"
+// allocator) and used when the item is popped off the queue.
+
+static int
+check_unbound(int unboundop)
+{
+ switch (unboundop) {
+ case UNBOUND_REMOVE:
+ case UNBOUND_ERROR:
+ case UNBOUND_REPLACE:
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+
/* the basic queue **********************************************************/
struct _queueitem;
typedef struct _queueitem {
+ /* The interpreter that added the item to the queue.
+ The actual bound interpid is found in item->data.
+ This is necessary because item->data might be NULL,
+ meaning the interpreter has been destroyed. */
+ int64_t interpid;
_PyCrossInterpreterData *data;
int fmt;
+ int unboundop;
struct _queueitem *next;
} _queueitem;
static void
_queueitem_init(_queueitem *item,
- _PyCrossInterpreterData *data, int fmt)
+ int64_t interpid, _PyCrossInterpreterData *data,
+ int fmt, int unboundop)
{
+ if (interpid < 0) {
+ interpid = _get_interpid(data);
+ }
+ else {
+ assert(data == NULL
+ || _PyCrossInterpreterData_INTERPID(data) < 0
+ || interpid == _PyCrossInterpreterData_INTERPID(data));
+ }
+ assert(check_unbound(unboundop));
*item = (_queueitem){
+ .interpid = interpid,
.data = data,
.fmt = fmt,
+ .unboundop = unboundop,
};
}
static void
+_queueitem_clear_data(_queueitem *item)
+{
+ if (item->data == NULL) {
+ return;
+ }
+ // It was allocated in queue_put().
+ (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
+ item->data = NULL;
+}
+
+static void
_queueitem_clear(_queueitem *item)
{
item->next = NULL;
-
- if (item->data != NULL) {
- // It was allocated in queue_put().
- (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
- item->data = NULL;
- }
+ _queueitem_clear_data(item);
}
static _queueitem *
-_queueitem_new(_PyCrossInterpreterData *data, int fmt)
+_queueitem_new(int64_t interpid, _PyCrossInterpreterData *data,
+ int fmt, int unboundop)
{
_queueitem *item = GLOBAL_MALLOC(_queueitem);
if (item == NULL) {
PyErr_NoMemory();
return NULL;
}
- _queueitem_init(item, data, fmt);
+ _queueitem_init(item, interpid, data, fmt, unboundop);
return item;
}
@@ -452,15 +516,44 @@ _queueitem_free_all(_queueitem *item)
static void
_queueitem_popped(_queueitem *item,
- _PyCrossInterpreterData **p_data, int *p_fmt)
+ _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop)
{
*p_data = item->data;
*p_fmt = item->fmt;
+ *p_unboundop = item->unboundop;
// We clear them here, so they won't be released in _queueitem_clear().
item->data = NULL;
_queueitem_free(item);
}
+static int
+_queueitem_clear_interpreter(_queueitem *item)
+{
+ assert(item->interpid >= 0);
+ if (item->data == NULL) {
+ // Its interpreter was already cleared (or it was never bound).
+ // For UNBOUND_REMOVE it should have been freed at that time.
+ assert(item->unboundop != UNBOUND_REMOVE);
+ return 0;
+ }
+ assert(_PyCrossInterpreterData_INTERPID(item->data) == item->interpid);
+
+ switch (item->unboundop) {
+ case UNBOUND_REMOVE:
+ // The caller must free/clear it.
+ return 1;
+ case UNBOUND_ERROR:
+ case UNBOUND_REPLACE:
+ // We won't need the cross-interpreter data later
+ // so we completely throw it away.
+ _queueitem_clear_data(item);
+ return 0;
+ default:
+ Py_FatalError("not reachable");
+ return -1;
+ }
+}
+
/* the queue */
@@ -474,12 +567,16 @@ typedef struct _queue {
_queueitem *first;
_queueitem *last;
} items;
- int fmt;
+ struct {
+ int fmt;
+ int unboundop;
+ } defaults;
} _queue;
static int
-_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt)
+_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop)
{
+ assert(check_unbound(unboundop));
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
return ERR_QUEUE_ALLOC;
@@ -490,7 +587,10 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt)
.items = {
.maxsize = maxsize,
},
- .fmt = fmt,
+ .defaults = {
+ .fmt = fmt,
+ .unboundop = unboundop,
+ },
};
return 0;
}
@@ -571,7 +671,8 @@ _queue_unlock(_queue *queue)
}
static int
-_queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt)
+_queue_add(_queue *queue, int64_t interpid, _PyCrossInterpreterData *data,
+ int fmt, int unboundop)
{
int err = _queue_lock(queue);
if (err < 0) {
@@ -587,7 +688,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt)
return ERR_QUEUE_FULL;
}
- _queueitem *item = _queueitem_new(data, fmt);
+ _queueitem *item = _queueitem_new(interpid, data, fmt, unboundop);
if (item == NULL) {
_queue_unlock(queue);
return -1;
@@ -608,7 +709,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt)
static int
_queue_next(_queue *queue,
- _PyCrossInterpreterData **p_data, int *p_fmt)
+ _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop)
{
int err = _queue_lock(queue);
if (err < 0) {
@@ -627,7 +728,7 @@ _queue_next(_queue *queue,
}
queue->items.count -= 1;
- _queueitem_popped(item, p_data, p_fmt);
+ _queueitem_popped(item, p_data, p_fmt, p_unboundop);
_queue_unlock(queue);
return 0;
@@ -692,14 +793,17 @@ _queue_clear_interpreter(_queue *queue, int64_t interpid)
while (next != NULL) {
_queueitem *item = next;
next = item->next;
- if (_PyCrossInterpreterData_INTERPID(item->data) == interpid) {
+ int remove = (item->interpid == interpid)
+ ? _queueitem_clear_interpreter(item)
+ : 0;
+ if (remove) {
+ _queueitem_free(item);
if (prev == NULL) {
- queue->items.first = item->next;
+ queue->items.first = next;
}
else {
- prev->next = item->next;
+ prev->next = next;
}
- _queueitem_free(item);
queue->items.count -= 1;
}
else {
@@ -966,18 +1070,19 @@ finally:
return res;
}
-struct queue_id_and_fmt {
+struct queue_id_and_info {
int64_t id;
int fmt;
+ int unboundop;
};
-static struct queue_id_and_fmt *
-_queues_list_all(_queues *queues, int64_t *count)
+static struct queue_id_and_info *
+_queues_list_all(_queues *queues, int64_t *p_count)
{
- struct queue_id_and_fmt *qids = NULL;
+ struct queue_id_and_info *qids = NULL;
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
- struct queue_id_and_fmt *ids = PyMem_NEW(struct queue_id_and_fmt,
- (Py_ssize_t)(queues->count));
+ struct queue_id_and_info *ids = PyMem_NEW(struct queue_id_and_info,
+ (Py_ssize_t)(queues->count));
if (ids == NULL) {
goto done;
}
@@ -985,9 +1090,10 @@ _queues_list_all(_queues *queues, int64_t *count)
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
ids[i].id = ref->qid;
assert(ref->queue != NULL);
- ids[i].fmt = ref->queue->fmt;
+ ids[i].fmt = ref->queue->defaults.fmt;
+ ids[i].unboundop = ref->queue->defaults.unboundop;
}
- *count = queues->count;
+ *p_count = queues->count;
qids = ids;
done:
@@ -1021,13 +1127,13 @@ _queue_free(_queue *queue)
// Create a new queue.
static int64_t
-queue_create(_queues *queues, Py_ssize_t maxsize, int fmt)
+queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop)
{
_queue *queue = GLOBAL_MALLOC(_queue);
if (queue == NULL) {
return ERR_QUEUE_ALLOC;
}
- int err = _queue_init(queue, maxsize, fmt);
+ int err = _queue_init(queue, maxsize, fmt, unboundop);
if (err < 0) {
GLOBAL_FREE(queue);
return (int64_t)err;
@@ -1056,7 +1162,7 @@ queue_destroy(_queues *queues, int64_t qid)
// Push an object onto the queue.
static int
-queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt)
+queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
{
// Look up the queue.
_queue *queue = NULL;
@@ -1077,9 +1183,12 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt)
GLOBAL_FREE(data);
return -1;
}
+ assert(_PyCrossInterpreterData_INTERPID(data) == \
+ PyInterpreterState_GetID(PyInterpreterState_Get()));
// Add the data to the queue.
- int res = _queue_add(queue, data, fmt);
+ int64_t interpid = -1; // _queueitem_init() will set it.
+ int res = _queue_add(queue, interpid, data, fmt, unboundop);
_queue_unmark_waiter(queue, queues->mutex);
if (res != 0) {
// We may chain an exception here:
@@ -1094,7 +1203,8 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt)
// Pop the next object off the queue. Fail if empty.
// XXX Support a "wait" mutex?
static int
-queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt)
+queue_get(_queues *queues, int64_t qid,
+ PyObject **res, int *p_fmt, int *p_unboundop)
{
int err;
*res = NULL;
@@ -1110,7 +1220,7 @@ queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt)
// Pop off the next item from the queue.
_PyCrossInterpreterData *data = NULL;
- err = _queue_next(queue, &data, p_fmt);
+ err = _queue_next(queue, &data, p_fmt, p_unboundop);
_queue_unmark_waiter(queue, queues->mutex);
if (err != 0) {
return err;
@@ -1397,15 +1507,22 @@ qidarg_converter(PyObject *arg, void *ptr)
static PyObject *
queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"maxsize", "fmt", NULL};
+ static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL};
Py_ssize_t maxsize;
int fmt;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "ni:create", kwlist,
- &maxsize, &fmt)) {
+ int unboundop;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist,
+ &maxsize, &fmt, &unboundop))
+ {
+ return NULL;
+ }
+ if (!check_unbound(unboundop)) {
+ PyErr_Format(PyExc_ValueError,
+ "unsupported unboundop %d", unboundop);
return NULL;
}
- int64_t qid = queue_create(&_globals.queues, maxsize, fmt);
+ int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop);
if (qid < 0) {
(void)handle_queue_error((int)qid, self, qid);
return NULL;
@@ -1427,7 +1544,7 @@ queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
}
PyDoc_STRVAR(queuesmod_create_doc,
-"create(maxsize, fmt) -> qid\n\
+"create(maxsize, fmt, unboundop) -> qid\n\
\n\
Create a new cross-interpreter queue and return its unique generated ID.\n\
It is a new reference as though bind() had been called on the queue.\n\
@@ -1463,9 +1580,9 @@ static PyObject *
queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
{
int64_t count = 0;
- struct queue_id_and_fmt *qids = _queues_list_all(&_globals.queues, &count);
+ struct queue_id_and_info *qids = _queues_list_all(&_globals.queues, &count);
if (qids == NULL) {
- if (count == 0) {
+ if (!PyErr_Occurred() && count == 0) {
return PyList_New(0);
}
return NULL;
@@ -1474,9 +1591,10 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
if (ids == NULL) {
goto finally;
}
- struct queue_id_and_fmt *cur = qids;
+ struct queue_id_and_info *cur = qids;
for (int64_t i=0; i < count; cur++, i++) {
- PyObject *item = Py_BuildValue("Li", cur->id, cur->fmt);
+ PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt,
+ cur->unboundop);
if (item == NULL) {
Py_SETREF(ids, NULL);
break;
@@ -1498,18 +1616,26 @@ Each corresponding default format is also included.");
static PyObject *
queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"qid", "obj", "fmt", NULL};
+ static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL};
qidarg_converter_data qidarg;
PyObject *obj;
int fmt;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oi:put", kwlist,
- qidarg_converter, &qidarg, &obj, &fmt)) {
+ int unboundop;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist,
+ qidarg_converter, &qidarg, &obj, &fmt,
+ &unboundop))
+ {
return NULL;
}
int64_t qid = qidarg.id;
+ if (!check_unbound(unboundop)) {
+ PyErr_Format(PyExc_ValueError,
+ "unsupported unboundop %d", unboundop);
+ return NULL;
+ }
/* Queue up the object. */
- int err = queue_put(&_globals.queues, qid, obj, fmt);
+ int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop);
// This is the only place that raises QueueFull.
if (handle_queue_error(err, self, qid)) {
return NULL;
@@ -1536,13 +1662,17 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds)
PyObject *obj = NULL;
int fmt = 0;
- int err = queue_get(&_globals.queues, qid, &obj, &fmt);
+ int unboundop = 0;
+ int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop);
// This is the only place that raises QueueEmpty.
if (handle_queue_error(err, self, qid)) {
return NULL;
}
- PyObject *res = Py_BuildValue("Oi", obj, fmt);
+ if (obj == NULL) {
+ return Py_BuildValue("Oii", Py_None, fmt, unboundop);
+ }
+ PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None);
Py_DECREF(obj);
return res;
}
@@ -1656,17 +1786,12 @@ queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds)
if (handle_queue_error(err, self, qid)) {
return NULL;
}
- int fmt = queue->fmt;
+ int fmt = queue->defaults.fmt;
+ int unboundop = queue->defaults.unboundop;
_queue_unmark_waiter(queue, _globals.queues.mutex);
- PyObject *fmt_obj = PyLong_FromLong(fmt);
- if (fmt_obj == NULL) {
- return NULL;
- }
- // For now queues only have one default.
- PyObject *res = PyTuple_Pack(1, fmt_obj);
- Py_DECREF(fmt_obj);
- return res;
+ PyObject *defaults = Py_BuildValue("ii", fmt, unboundop);
+ return defaults;
}
PyDoc_STRVAR(queuesmod_get_queue_defaults_doc,