diff options
Diffstat (limited to 'Modules/_interpqueuesmodule.c')
-rw-r--r-- | Modules/_interpqueuesmodule.c | 243 |
1 files changed, 184 insertions, 59 deletions
diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c index c99111b..8e82789 100644 --- a/Modules/_interpqueuesmodule.c +++ b/Modules/_interpqueuesmodule.c @@ -58,6 +58,19 @@ _release_xid_data(_PyCrossInterpreterData *data, int flags) return res; } +static inline int64_t +_get_interpid(_PyCrossInterpreterData *data) +{ + int64_t interpid; + if (data != NULL) { + interpid = _PyCrossInterpreterData_INTERPID(data); + assert(!PyErr_Occurred()); + } + else { + interpid = PyInterpreterState_GetID(PyInterpreterState_Get()); + } + return interpid; +} static PyInterpreterState * _get_current_interp(void) @@ -389,47 +402,98 @@ handle_queue_error(int err, PyObject *mod, int64_t qid) } +/* unbound items ************************************************************/ + +#define UNBOUND_REMOVE 1 +#define UNBOUND_ERROR 2 +#define UNBOUND_REPLACE 3 + +// It would also be possible to add UNBOUND_REPLACE where the replacement +// value is user-provided. There would be some limitations there, though. +// Another possibility would be something like UNBOUND_COPY, where the +// object is released but the underlying data is copied (with the "raw" +// allocator) and used when the item is popped off the queue. + +static int +check_unbound(int unboundop) +{ + switch (unboundop) { + case UNBOUND_REMOVE: + case UNBOUND_ERROR: + case UNBOUND_REPLACE: + return 1; + default: + return 0; + } +} + + /* the basic queue **********************************************************/ struct _queueitem; typedef struct _queueitem { + /* The interpreter that added the item to the queue. + The actual bound interpid is found in item->data. + This is necessary because item->data might be NULL, + meaning the interpreter has been destroyed. */ + int64_t interpid; _PyCrossInterpreterData *data; int fmt; + int unboundop; struct _queueitem *next; } _queueitem; static void _queueitem_init(_queueitem *item, - _PyCrossInterpreterData *data, int fmt) + int64_t interpid, _PyCrossInterpreterData *data, + int fmt, int unboundop) { + if (interpid < 0) { + interpid = _get_interpid(data); + } + else { + assert(data == NULL + || _PyCrossInterpreterData_INTERPID(data) < 0 + || interpid == _PyCrossInterpreterData_INTERPID(data)); + } + assert(check_unbound(unboundop)); *item = (_queueitem){ + .interpid = interpid, .data = data, .fmt = fmt, + .unboundop = unboundop, }; } static void +_queueitem_clear_data(_queueitem *item) +{ + if (item->data == NULL) { + return; + } + // It was allocated in queue_put(). + (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE); + item->data = NULL; +} + +static void _queueitem_clear(_queueitem *item) { item->next = NULL; - - if (item->data != NULL) { - // It was allocated in queue_put(). - (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE); - item->data = NULL; - } + _queueitem_clear_data(item); } static _queueitem * -_queueitem_new(_PyCrossInterpreterData *data, int fmt) +_queueitem_new(int64_t interpid, _PyCrossInterpreterData *data, + int fmt, int unboundop) { _queueitem *item = GLOBAL_MALLOC(_queueitem); if (item == NULL) { PyErr_NoMemory(); return NULL; } - _queueitem_init(item, data, fmt); + _queueitem_init(item, interpid, data, fmt, unboundop); return item; } @@ -452,15 +516,44 @@ _queueitem_free_all(_queueitem *item) static void _queueitem_popped(_queueitem *item, - _PyCrossInterpreterData **p_data, int *p_fmt) + _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop) { *p_data = item->data; *p_fmt = item->fmt; + *p_unboundop = item->unboundop; // We clear them here, so they won't be released in _queueitem_clear(). item->data = NULL; _queueitem_free(item); } +static int +_queueitem_clear_interpreter(_queueitem *item) +{ + assert(item->interpid >= 0); + if (item->data == NULL) { + // Its interpreter was already cleared (or it was never bound). + // For UNBOUND_REMOVE it should have been freed at that time. + assert(item->unboundop != UNBOUND_REMOVE); + return 0; + } + assert(_PyCrossInterpreterData_INTERPID(item->data) == item->interpid); + + switch (item->unboundop) { + case UNBOUND_REMOVE: + // The caller must free/clear it. + return 1; + case UNBOUND_ERROR: + case UNBOUND_REPLACE: + // We won't need the cross-interpreter data later + // so we completely throw it away. + _queueitem_clear_data(item); + return 0; + default: + Py_FatalError("not reachable"); + return -1; + } +} + /* the queue */ @@ -474,12 +567,16 @@ typedef struct _queue { _queueitem *first; _queueitem *last; } items; - int fmt; + struct { + int fmt; + int unboundop; + } defaults; } _queue; static int -_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt) +_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop) { + assert(check_unbound(unboundop)); PyThread_type_lock mutex = PyThread_allocate_lock(); if (mutex == NULL) { return ERR_QUEUE_ALLOC; @@ -490,7 +587,10 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt) .items = { .maxsize = maxsize, }, - .fmt = fmt, + .defaults = { + .fmt = fmt, + .unboundop = unboundop, + }, }; return 0; } @@ -571,7 +671,8 @@ _queue_unlock(_queue *queue) } static int -_queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt) +_queue_add(_queue *queue, int64_t interpid, _PyCrossInterpreterData *data, + int fmt, int unboundop) { int err = _queue_lock(queue); if (err < 0) { @@ -587,7 +688,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt) return ERR_QUEUE_FULL; } - _queueitem *item = _queueitem_new(data, fmt); + _queueitem *item = _queueitem_new(interpid, data, fmt, unboundop); if (item == NULL) { _queue_unlock(queue); return -1; @@ -608,7 +709,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt) static int _queue_next(_queue *queue, - _PyCrossInterpreterData **p_data, int *p_fmt) + _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop) { int err = _queue_lock(queue); if (err < 0) { @@ -627,7 +728,7 @@ _queue_next(_queue *queue, } queue->items.count -= 1; - _queueitem_popped(item, p_data, p_fmt); + _queueitem_popped(item, p_data, p_fmt, p_unboundop); _queue_unlock(queue); return 0; @@ -692,14 +793,17 @@ _queue_clear_interpreter(_queue *queue, int64_t interpid) while (next != NULL) { _queueitem *item = next; next = item->next; - if (_PyCrossInterpreterData_INTERPID(item->data) == interpid) { + int remove = (item->interpid == interpid) + ? _queueitem_clear_interpreter(item) + : 0; + if (remove) { + _queueitem_free(item); if (prev == NULL) { - queue->items.first = item->next; + queue->items.first = next; } else { - prev->next = item->next; + prev->next = next; } - _queueitem_free(item); queue->items.count -= 1; } else { @@ -966,18 +1070,19 @@ finally: return res; } -struct queue_id_and_fmt { +struct queue_id_and_info { int64_t id; int fmt; + int unboundop; }; -static struct queue_id_and_fmt * -_queues_list_all(_queues *queues, int64_t *count) +static struct queue_id_and_info * +_queues_list_all(_queues *queues, int64_t *p_count) { - struct queue_id_and_fmt *qids = NULL; + struct queue_id_and_info *qids = NULL; PyThread_acquire_lock(queues->mutex, WAIT_LOCK); - struct queue_id_and_fmt *ids = PyMem_NEW(struct queue_id_and_fmt, - (Py_ssize_t)(queues->count)); + struct queue_id_and_info *ids = PyMem_NEW(struct queue_id_and_info, + (Py_ssize_t)(queues->count)); if (ids == NULL) { goto done; } @@ -985,9 +1090,10 @@ _queues_list_all(_queues *queues, int64_t *count) for (int64_t i=0; ref != NULL; ref = ref->next, i++) { ids[i].id = ref->qid; assert(ref->queue != NULL); - ids[i].fmt = ref->queue->fmt; + ids[i].fmt = ref->queue->defaults.fmt; + ids[i].unboundop = ref->queue->defaults.unboundop; } - *count = queues->count; + *p_count = queues->count; qids = ids; done: @@ -1021,13 +1127,13 @@ _queue_free(_queue *queue) // Create a new queue. static int64_t -queue_create(_queues *queues, Py_ssize_t maxsize, int fmt) +queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop) { _queue *queue = GLOBAL_MALLOC(_queue); if (queue == NULL) { return ERR_QUEUE_ALLOC; } - int err = _queue_init(queue, maxsize, fmt); + int err = _queue_init(queue, maxsize, fmt, unboundop); if (err < 0) { GLOBAL_FREE(queue); return (int64_t)err; @@ -1056,7 +1162,7 @@ queue_destroy(_queues *queues, int64_t qid) // Push an object onto the queue. static int -queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt) +queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop) { // Look up the queue. _queue *queue = NULL; @@ -1077,9 +1183,12 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt) GLOBAL_FREE(data); return -1; } + assert(_PyCrossInterpreterData_INTERPID(data) == \ + PyInterpreterState_GetID(PyInterpreterState_Get())); // Add the data to the queue. - int res = _queue_add(queue, data, fmt); + int64_t interpid = -1; // _queueitem_init() will set it. + int res = _queue_add(queue, interpid, data, fmt, unboundop); _queue_unmark_waiter(queue, queues->mutex); if (res != 0) { // We may chain an exception here: @@ -1094,7 +1203,8 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt) // Pop the next object off the queue. Fail if empty. // XXX Support a "wait" mutex? static int -queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt) +queue_get(_queues *queues, int64_t qid, + PyObject **res, int *p_fmt, int *p_unboundop) { int err; *res = NULL; @@ -1110,7 +1220,7 @@ queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt) // Pop off the next item from the queue. _PyCrossInterpreterData *data = NULL; - err = _queue_next(queue, &data, p_fmt); + err = _queue_next(queue, &data, p_fmt, p_unboundop); _queue_unmark_waiter(queue, queues->mutex); if (err != 0) { return err; @@ -1397,15 +1507,22 @@ qidarg_converter(PyObject *arg, void *ptr) static PyObject * queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"maxsize", "fmt", NULL}; + static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL}; Py_ssize_t maxsize; int fmt; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "ni:create", kwlist, - &maxsize, &fmt)) { + int unboundop; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist, + &maxsize, &fmt, &unboundop)) + { + return NULL; + } + if (!check_unbound(unboundop)) { + PyErr_Format(PyExc_ValueError, + "unsupported unboundop %d", unboundop); return NULL; } - int64_t qid = queue_create(&_globals.queues, maxsize, fmt); + int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop); if (qid < 0) { (void)handle_queue_error((int)qid, self, qid); return NULL; @@ -1427,7 +1544,7 @@ queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds) } PyDoc_STRVAR(queuesmod_create_doc, -"create(maxsize, fmt) -> qid\n\ +"create(maxsize, fmt, unboundop) -> qid\n\ \n\ Create a new cross-interpreter queue and return its unique generated ID.\n\ It is a new reference as though bind() had been called on the queue.\n\ @@ -1463,9 +1580,9 @@ static PyObject * queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) { int64_t count = 0; - struct queue_id_and_fmt *qids = _queues_list_all(&_globals.queues, &count); + struct queue_id_and_info *qids = _queues_list_all(&_globals.queues, &count); if (qids == NULL) { - if (count == 0) { + if (!PyErr_Occurred() && count == 0) { return PyList_New(0); } return NULL; @@ -1474,9 +1591,10 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) if (ids == NULL) { goto finally; } - struct queue_id_and_fmt *cur = qids; + struct queue_id_and_info *cur = qids; for (int64_t i=0; i < count; cur++, i++) { - PyObject *item = Py_BuildValue("Li", cur->id, cur->fmt); + PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt, + cur->unboundop); if (item == NULL) { Py_SETREF(ids, NULL); break; @@ -1498,18 +1616,26 @@ Each corresponding default format is also included."); static PyObject * queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"qid", "obj", "fmt", NULL}; + static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL}; qidarg_converter_data qidarg; PyObject *obj; int fmt; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oi:put", kwlist, - qidarg_converter, &qidarg, &obj, &fmt)) { + int unboundop; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist, + qidarg_converter, &qidarg, &obj, &fmt, + &unboundop)) + { return NULL; } int64_t qid = qidarg.id; + if (!check_unbound(unboundop)) { + PyErr_Format(PyExc_ValueError, + "unsupported unboundop %d", unboundop); + return NULL; + } /* Queue up the object. */ - int err = queue_put(&_globals.queues, qid, obj, fmt); + int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop); // This is the only place that raises QueueFull. if (handle_queue_error(err, self, qid)) { return NULL; @@ -1536,13 +1662,17 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds) PyObject *obj = NULL; int fmt = 0; - int err = queue_get(&_globals.queues, qid, &obj, &fmt); + int unboundop = 0; + int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop); // This is the only place that raises QueueEmpty. if (handle_queue_error(err, self, qid)) { return NULL; } - PyObject *res = Py_BuildValue("Oi", obj, fmt); + if (obj == NULL) { + return Py_BuildValue("Oii", Py_None, fmt, unboundop); + } + PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None); Py_DECREF(obj); return res; } @@ -1656,17 +1786,12 @@ queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds) if (handle_queue_error(err, self, qid)) { return NULL; } - int fmt = queue->fmt; + int fmt = queue->defaults.fmt; + int unboundop = queue->defaults.unboundop; _queue_unmark_waiter(queue, _globals.queues.mutex); - PyObject *fmt_obj = PyLong_FromLong(fmt); - if (fmt_obj == NULL) { - return NULL; - } - // For now queues only have one default. - PyObject *res = PyTuple_Pack(1, fmt_obj); - Py_DECREF(fmt_obj); - return res; + PyObject *defaults = Py_BuildValue("ii", fmt, unboundop); + return defaults; } PyDoc_STRVAR(queuesmod_get_queue_defaults_doc, |