summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/test/support/interpreters/queues.py141
-rw-r--r--Lib/test/test_interpreters/test_queues.py219
-rw-r--r--Modules/_interpqueuesmodule.c243
3 files changed, 513 insertions, 90 deletions
diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py
index 1b9e748..402ceff 100644
--- a/Lib/test/support/interpreters/queues.py
+++ b/Lib/test/support/interpreters/queues.py
@@ -12,9 +12,11 @@ from _interpqueues import (
)
__all__ = [
+ 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
'create', 'list_all',
'Queue',
'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull',
+ 'ItemInterpreterDestroyed',
]
@@ -32,26 +34,90 @@ class QueueFull(QueueError, queue.Full):
"""
+class ItemInterpreterDestroyed(QueueError):
+ """Raised from get() and get_nowait()."""
+
+
_SHARED_ONLY = 0
_PICKLED = 1
-def create(maxsize=0, *, syncobj=False):
+
+class UnboundItem:
+ """Represents a Queue item no longer bound to an interpreter.
+
+ An item is unbound when the interpreter that added it to the queue
+ is destroyed.
+ """
+
+ __slots__ = ()
+
+ def __new__(cls):
+ return UNBOUND
+
+ def __repr__(self):
+ return f'interpreters.queues.UNBOUND'
+
+
+UNBOUND = object.__new__(UnboundItem)
+UNBOUND_ERROR = object()
+UNBOUND_REMOVE = object()
+
+_UNBOUND_CONSTANT_TO_FLAG = {
+ UNBOUND_REMOVE: 1,
+ UNBOUND_ERROR: 2,
+ UNBOUND: 3,
+}
+_UNBOUND_FLAG_TO_CONSTANT = {v: k
+ for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}
+
+def _serialize_unbound(unbound):
+ op = unbound
+ try:
+ flag = _UNBOUND_CONSTANT_TO_FLAG[op]
+ except KeyError:
+ raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
+ return flag,
+
+
+def _resolve_unbound(flag):
+ try:
+ op = _UNBOUND_FLAG_TO_CONSTANT[flag]
+ except KeyError:
+ raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
+ if op is UNBOUND_REMOVE:
+ # "remove" not possible here
+ raise NotImplementedError
+ elif op is UNBOUND_ERROR:
+ raise ItemInterpreterDestroyed("item's original interpreter destroyed")
+ elif op is UNBOUND:
+ return UNBOUND
+ else:
+ raise NotImplementedError(repr(op))
+
+
+def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND):
"""Return a new cross-interpreter queue.
The queue may be used to pass data safely between interpreters.
"syncobj" sets the default for Queue.put()
and Queue.put_nowait().
+
+ "unbounditems" likewise sets the default. See Queue.put() for
+ supported values. The default value is UNBOUND, which replaces
+ the unbound item.
"""
fmt = _SHARED_ONLY if syncobj else _PICKLED
- qid = _queues.create(maxsize, fmt)
- return Queue(qid, _fmt=fmt)
+ unbound = _serialize_unbound(unbounditems)
+ unboundop, = unbound
+ qid = _queues.create(maxsize, fmt, unboundop)
+ return Queue(qid, _fmt=fmt, _unbound=unbound)
def list_all():
"""Return a list of all open queues."""
- return [Queue(qid, _fmt=fmt)
- for qid, fmt in _queues.list_all()]
+ return [Queue(qid, _fmt=fmt, _unbound=(unboundop,))
+ for qid, fmt, unboundop in _queues.list_all()]
_known_queues = weakref.WeakValueDictionary()
@@ -59,20 +125,28 @@ _known_queues = weakref.WeakValueDictionary()
class Queue:
"""A cross-interpreter queue."""
- def __new__(cls, id, /, *, _fmt=None):
+ def __new__(cls, id, /, *, _fmt=None, _unbound=None):
# There is only one instance for any given ID.
if isinstance(id, int):
id = int(id)
else:
raise TypeError(f'id must be an int, got {id!r}')
if _fmt is None:
- _fmt, = _queues.get_queue_defaults(id)
+ if _unbound is None:
+ _fmt, op = _queues.get_queue_defaults(id)
+ _unbound = (op,)
+ else:
+ _fmt, _ = _queues.get_queue_defaults(id)
+ elif _unbound is None:
+ _, op = _queues.get_queue_defaults(id)
+ _unbound = (op,)
try:
self = _known_queues[id]
except KeyError:
self = super().__new__(cls)
self._id = id
self._fmt = _fmt
+ self._unbound = _unbound
_known_queues[id] = self
_queues.bind(id)
return self
@@ -124,6 +198,7 @@ class Queue:
def put(self, obj, timeout=None, *,
syncobj=None,
+ unbound=None,
_delay=10 / 1000, # 10 milliseconds
):
"""Add the object to the queue.
@@ -131,7 +206,7 @@ class Queue:
This blocks while the queue is full.
If "syncobj" is None (the default) then it uses the
- queue's default, set with create_queue()..
+ queue's default, set with create_queue().
If "syncobj" is false then all objects are supported,
at the expense of worse performance.
@@ -152,11 +227,37 @@ class Queue:
actually is. That's a slightly different and stronger promise
than just (initial) equality, which is all "syncobj=False"
can promise.
+
+ "unbound" controls the behavior of Queue.get() for the given
+ object if the current interpreter (calling put()) is later
+ destroyed.
+
+ If "unbound" is None (the default) then it uses the
+ queue's default, set with create_queue(),
+ which is usually UNBOUND.
+
+ If "unbound" is UNBOUND_ERROR then get() will raise an
+ ItemInterpreterDestroyed exception if the original interpreter
+ has been destroyed. This does not otherwise affect the queue;
+ the next call to put() will work like normal, returning the next
+ item in the queue.
+
+ If "unbound" is UNBOUND_REMOVE then the item will be removed
+ from the queue as soon as the original interpreter is destroyed.
+ Be aware that this will introduce an imbalance between put()
+ and get() calls.
+
+ If "unbound" is UNBOUND then it is returned by get() in place
+ of the unbound item.
"""
if syncobj is None:
fmt = self._fmt
else:
fmt = _SHARED_ONLY if syncobj else _PICKLED
+ if unbound is None:
+ unboundop, = self._unbound
+ else:
+ unboundop, = _serialize_unbound(unbound)
if timeout is not None:
timeout = int(timeout)
if timeout < 0:
@@ -166,7 +267,7 @@ class Queue:
obj = pickle.dumps(obj)
while True:
try:
- _queues.put(self._id, obj, fmt)
+ _queues.put(self._id, obj, fmt, unboundop)
except QueueFull as exc:
if timeout is not None and time.time() >= end:
raise # re-raise
@@ -174,14 +275,18 @@ class Queue:
else:
break
- def put_nowait(self, obj, *, syncobj=None):
+ def put_nowait(self, obj, *, syncobj=None, unbound=None):
if syncobj is None:
fmt = self._fmt
else:
fmt = _SHARED_ONLY if syncobj else _PICKLED
+ if unbound is None:
+ unboundop, = self._unbound
+ else:
+ unboundop, = _serialize_unbound(unbound)
if fmt is _PICKLED:
obj = pickle.dumps(obj)
- _queues.put(self._id, obj, fmt)
+ _queues.put(self._id, obj, fmt, unboundop)
def get(self, timeout=None, *,
_delay=10 / 1000, # 10 milliseconds
@@ -189,6 +294,10 @@ class Queue:
"""Return the next object from the queue.
This blocks while the queue is empty.
+
+ If the next item's original interpreter has been destroyed
+ then the "next object" is determined by the value of the
+ "unbound" argument to put().
"""
if timeout is not None:
timeout = int(timeout)
@@ -197,13 +306,16 @@ class Queue:
end = time.time() + timeout
while True:
try:
- obj, fmt = _queues.get(self._id)
+ obj, fmt, unboundop = _queues.get(self._id)
except QueueEmpty as exc:
if timeout is not None and time.time() >= end:
raise # re-raise
time.sleep(_delay)
else:
break
+ if unboundop is not None:
+ assert obj is None, repr(obj)
+ return _resolve_unbound(unboundop)
if fmt == _PICKLED:
obj = pickle.loads(obj)
else:
@@ -217,9 +329,12 @@ class Queue:
is the same as get().
"""
try:
- obj, fmt = _queues.get(self._id)
+ obj, fmt, unboundop = _queues.get(self._id)
except QueueEmpty as exc:
raise # re-raise
+ if unboundop is not None:
+ assert obj is None, repr(obj)
+ return _resolve_unbound(unboundop)
if fmt == _PICKLED:
obj = pickle.loads(obj)
else:
diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py
index 9ee7647..30d58a5 100644
--- a/Lib/test/test_interpreters/test_queues.py
+++ b/Lib/test/test_interpreters/test_queues.py
@@ -12,13 +12,16 @@ from test.support.interpreters import queues
from .utils import _run_output, TestBase as _TestBase
+REPLACE = queues._UNBOUND_CONSTANT_TO_FLAG[queues.UNBOUND]
+
+
def get_num_queues():
return len(_queues.list_all())
class TestBase(_TestBase):
def tearDown(self):
- for qid, _ in _queues.list_all():
+ for qid, _, _ in _queues.list_all():
try:
_queues.destroy(qid)
except Exception:
@@ -39,7 +42,7 @@ class LowLevelTests(TestBase):
importlib.reload(queues)
def test_create_destroy(self):
- qid = _queues.create(2, 0)
+ qid = _queues.create(2, 0, REPLACE)
_queues.destroy(qid)
self.assertEqual(get_num_queues(), 0)
with self.assertRaises(queues.QueueNotFoundError):
@@ -53,7 +56,7 @@ class LowLevelTests(TestBase):
'-c',
dedent(f"""
import {_queues.__name__} as _queues
- _queues.create(2, 0)
+ _queues.create(2, 0, {REPLACE})
"""),
)
self.assertEqual(stdout, '')
@@ -64,13 +67,13 @@ class LowLevelTests(TestBase):
def test_bind_release(self):
with self.subTest('typical'):
- qid = _queues.create(2, 0)
+ qid = _queues.create(2, 0, REPLACE)
_queues.bind(qid)
_queues.release(qid)
self.assertEqual(get_num_queues(), 0)
with self.subTest('bind too much'):
- qid = _queues.create(2, 0)
+ qid = _queues.create(2, 0, REPLACE)
_queues.bind(qid)
_queues.bind(qid)
_queues.release(qid)
@@ -78,7 +81,7 @@ class LowLevelTests(TestBase):
self.assertEqual(get_num_queues(), 0)
with self.subTest('nested'):
- qid = _queues.create(2, 0)
+ qid = _queues.create(2, 0, REPLACE)
_queues.bind(qid)
_queues.bind(qid)
_queues.release(qid)
@@ -86,7 +89,7 @@ class LowLevelTests(TestBase):
self.assertEqual(get_num_queues(), 0)
with self.subTest('release without binding'):
- qid = _queues.create(2, 0)
+ qid = _queues.create(2, 0, REPLACE)
with self.assertRaises(queues.QueueError):
_queues.release(qid)
@@ -426,26 +429,206 @@ class TestQueueOps(TestBase):
self.assertNotEqual(id(obj2), int(out))
def test_put_cleared_with_subinterpreter(self):
- interp = interpreters.create()
- queue = queues.create()
-
- out = _run_output(
- interp,
- dedent(f"""
+ def common(queue, unbound=None, presize=0):
+ if not unbound:
+ extraargs = ''
+ elif unbound is queues.UNBOUND:
+ extraargs = ', unbound=queues.UNBOUND'
+ elif unbound is queues.UNBOUND_ERROR:
+ extraargs = ', unbound=queues.UNBOUND_ERROR'
+ elif unbound is queues.UNBOUND_REMOVE:
+ extraargs = ', unbound=queues.UNBOUND_REMOVE'
+ else:
+ raise NotImplementedError(repr(unbound))
+ interp = interpreters.create()
+
+ _run_output(interp, dedent(f"""
from test.support.interpreters import queues
queue = queues.Queue({queue.id})
obj1 = b'spam'
obj2 = b'eggs'
- queue.put(obj1, syncobj=True)
- queue.put(obj2, syncobj=True)
+ queue.put(obj1, syncobj=True{extraargs})
+ queue.put(obj2, syncobj=True{extraargs})
"""))
- self.assertEqual(queue.qsize(), 2)
+ self.assertEqual(queue.qsize(), presize + 2)
+
+ if presize == 0:
+ obj1 = queue.get()
+ self.assertEqual(obj1, b'spam')
+ self.assertEqual(queue.qsize(), presize + 1)
+
+ return interp
+
+ with self.subTest('default'): # UNBOUND
+ queue = queues.create()
+ interp = common(queue)
+ del interp
+ obj1 = queue.get()
+ self.assertIs(obj1, queues.UNBOUND)
+ self.assertEqual(queue.qsize(), 0)
+ with self.assertRaises(queues.QueueEmpty):
+ queue.get_nowait()
+
+ with self.subTest('UNBOUND'):
+ queue = queues.create()
+ interp = common(queue, queues.UNBOUND)
+ del interp
+ obj1 = queue.get()
+ self.assertIs(obj1, queues.UNBOUND)
+ self.assertEqual(queue.qsize(), 0)
+ with self.assertRaises(queues.QueueEmpty):
+ queue.get_nowait()
+
+ with self.subTest('UNBOUND_ERROR'):
+ queue = queues.create()
+ interp = common(queue, queues.UNBOUND_ERROR)
+
+ del interp
+ self.assertEqual(queue.qsize(), 1)
+ with self.assertRaises(queues.ItemInterpreterDestroyed):
+ queue.get()
+
+ self.assertEqual(queue.qsize(), 0)
+ with self.assertRaises(queues.QueueEmpty):
+ queue.get_nowait()
+
+ with self.subTest('UNBOUND_REMOVE'):
+ queue = queues.create()
+
+ interp = common(queue, queues.UNBOUND_REMOVE)
+ del interp
+ self.assertEqual(queue.qsize(), 0)
+ with self.assertRaises(queues.QueueEmpty):
+ queue.get_nowait()
+
+ queue.put(b'ham', unbound=queues.UNBOUND_REMOVE)
+ self.assertEqual(queue.qsize(), 1)
+ interp = common(queue, queues.UNBOUND_REMOVE, 1)
+ self.assertEqual(queue.qsize(), 3)
+ queue.put(42, unbound=queues.UNBOUND_REMOVE)
+ self.assertEqual(queue.qsize(), 4)
+ del interp
+ self.assertEqual(queue.qsize(), 2)
+ obj1 = queue.get()
+ obj2 = queue.get()
+ self.assertEqual(obj1, b'ham')
+ self.assertEqual(obj2, 42)
+ self.assertEqual(queue.qsize(), 0)
+ with self.assertRaises(queues.QueueEmpty):
+ queue.get_nowait()
+
+ def test_put_cleared_with_subinterpreter_mixed(self):
+ queue = queues.create()
+ interp = interpreters.create()
+ _run_output(interp, dedent(f"""
+ from test.support.interpreters import queues
+ queue = queues.Queue({queue.id})
+ queue.put(1, syncobj=True, unbound=queues.UNBOUND)
+ queue.put(2, syncobj=True, unbound=queues.UNBOUND_ERROR)
+ queue.put(3, syncobj=True)
+ queue.put(4, syncobj=True, unbound=queues.UNBOUND_REMOVE)
+ queue.put(5, syncobj=True, unbound=queues.UNBOUND)
+ """))
+ self.assertEqual(queue.qsize(), 5)
+
+ del interp
+ self.assertEqual(queue.qsize(), 4)
obj1 = queue.get()
- self.assertEqual(obj1, b'spam')
+ self.assertIs(obj1, queues.UNBOUND)
+ self.assertEqual(queue.qsize(), 3)
+
+ with self.assertRaises(queues.ItemInterpreterDestroyed):
+ queue.get()
+ self.assertEqual(queue.qsize(), 2)
+
+ obj2 = queue.get()
+ self.assertIs(obj2, queues.UNBOUND)
self.assertEqual(queue.qsize(), 1)
- del interp
+ obj3 = queue.get()
+ self.assertIs(obj3, queues.UNBOUND)
+ self.assertEqual(queue.qsize(), 0)
+
+ def test_put_cleared_with_subinterpreter_multiple(self):
+ queue = queues.create()
+ interp1 = interpreters.create()
+ interp2 = interpreters.create()
+
+ queue.put(1, syncobj=True)
+ _run_output(interp1, dedent(f"""
+ from test.support.interpreters import queues
+ queue = queues.Queue({queue.id})
+ obj1 = queue.get()
+ queue.put(2, syncobj=True, unbound=queues.UNBOUND)
+ queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_REMOVE)
+ """))
+ _run_output(interp2, dedent(f"""
+ from test.support.interpreters import queues
+ queue = queues.Queue({queue.id})
+ obj2 = queue.get()
+ obj1 = queue.get()
+ """))
+ self.assertEqual(queue.qsize(), 0)
+ queue.put(3)
+ _run_output(interp1, dedent("""
+ queue.put(4, syncobj=True, unbound=queues.UNBOUND)
+ # interp closed here
+ queue.put(5, syncobj=True, unbound=queues.UNBOUND_REMOVE)
+ queue.put(6, syncobj=True, unbound=queues.UNBOUND)
+ """))
+ _run_output(interp2, dedent("""
+ queue.put(7, syncobj=True, unbound=queues.UNBOUND_ERROR)
+ # interp closed here
+ queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_ERROR)
+ queue.put(obj2, syncobj=True, unbound=queues.UNBOUND_REMOVE)
+ queue.put(8, syncobj=True, unbound=queues.UNBOUND)
+ """))
+ _run_output(interp1, dedent("""
+ queue.put(9, syncobj=True, unbound=queues.UNBOUND_REMOVE)
+ queue.put(10, syncobj=True, unbound=queues.UNBOUND)
+ """))
+ self.assertEqual(queue.qsize(), 10)
+
+ obj3 = queue.get()
+ self.assertEqual(obj3, 3)
+ self.assertEqual(queue.qsize(), 9)
+
+ obj4 = queue.get()
+ self.assertEqual(obj4, 4)
+ self.assertEqual(queue.qsize(), 8)
+
+ del interp1
+ self.assertEqual(queue.qsize(), 6)
+
+ # obj5 was removed
+
+ obj6 = queue.get()
+ self.assertIs(obj6, queues.UNBOUND)
+ self.assertEqual(queue.qsize(), 5)
+
+ obj7 = queue.get()
+ self.assertEqual(obj7, 7)
+ self.assertEqual(queue.qsize(), 4)
+
+ del interp2
+ self.assertEqual(queue.qsize(), 3)
+
+ # obj1
+ with self.assertRaises(queues.ItemInterpreterDestroyed):
+ queue.get()
+ self.assertEqual(queue.qsize(), 2)
+
+ # obj2 was removed
+
+ obj8 = queue.get()
+ self.assertIs(obj8, queues.UNBOUND)
+ self.assertEqual(queue.qsize(), 1)
+
+ # obj9 was removed
+
+ obj10 = queue.get()
+ self.assertIs(obj10, queues.UNBOUND)
self.assertEqual(queue.qsize(), 0)
def test_put_get_different_threads(self):
diff --git a/Modules/_interpqueuesmodule.c b/Modules/_interpqueuesmodule.c
index c99111b..8e82789 100644
--- a/Modules/_interpqueuesmodule.c
+++ b/Modules/_interpqueuesmodule.c
@@ -58,6 +58,19 @@ _release_xid_data(_PyCrossInterpreterData *data, int flags)
return res;
}
+static inline int64_t
+_get_interpid(_PyCrossInterpreterData *data)
+{
+ int64_t interpid;
+ if (data != NULL) {
+ interpid = _PyCrossInterpreterData_INTERPID(data);
+ assert(!PyErr_Occurred());
+ }
+ else {
+ interpid = PyInterpreterState_GetID(PyInterpreterState_Get());
+ }
+ return interpid;
+}
static PyInterpreterState *
_get_current_interp(void)
@@ -389,47 +402,98 @@ handle_queue_error(int err, PyObject *mod, int64_t qid)
}
+/* unbound items ************************************************************/
+
+#define UNBOUND_REMOVE 1
+#define UNBOUND_ERROR 2
+#define UNBOUND_REPLACE 3
+
+// It would also be possible to add UNBOUND_REPLACE where the replacement
+// value is user-provided. There would be some limitations there, though.
+// Another possibility would be something like UNBOUND_COPY, where the
+// object is released but the underlying data is copied (with the "raw"
+// allocator) and used when the item is popped off the queue.
+
+static int
+check_unbound(int unboundop)
+{
+ switch (unboundop) {
+ case UNBOUND_REMOVE:
+ case UNBOUND_ERROR:
+ case UNBOUND_REPLACE:
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+
/* the basic queue **********************************************************/
struct _queueitem;
typedef struct _queueitem {
+ /* The interpreter that added the item to the queue.
+ The actual bound interpid is found in item->data.
+ This is necessary because item->data might be NULL,
+ meaning the interpreter has been destroyed. */
+ int64_t interpid;
_PyCrossInterpreterData *data;
int fmt;
+ int unboundop;
struct _queueitem *next;
} _queueitem;
static void
_queueitem_init(_queueitem *item,
- _PyCrossInterpreterData *data, int fmt)
+ int64_t interpid, _PyCrossInterpreterData *data,
+ int fmt, int unboundop)
{
+ if (interpid < 0) {
+ interpid = _get_interpid(data);
+ }
+ else {
+ assert(data == NULL
+ || _PyCrossInterpreterData_INTERPID(data) < 0
+ || interpid == _PyCrossInterpreterData_INTERPID(data));
+ }
+ assert(check_unbound(unboundop));
*item = (_queueitem){
+ .interpid = interpid,
.data = data,
.fmt = fmt,
+ .unboundop = unboundop,
};
}
static void
+_queueitem_clear_data(_queueitem *item)
+{
+ if (item->data == NULL) {
+ return;
+ }
+ // It was allocated in queue_put().
+ (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
+ item->data = NULL;
+}
+
+static void
_queueitem_clear(_queueitem *item)
{
item->next = NULL;
-
- if (item->data != NULL) {
- // It was allocated in queue_put().
- (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
- item->data = NULL;
- }
+ _queueitem_clear_data(item);
}
static _queueitem *
-_queueitem_new(_PyCrossInterpreterData *data, int fmt)
+_queueitem_new(int64_t interpid, _PyCrossInterpreterData *data,
+ int fmt, int unboundop)
{
_queueitem *item = GLOBAL_MALLOC(_queueitem);
if (item == NULL) {
PyErr_NoMemory();
return NULL;
}
- _queueitem_init(item, data, fmt);
+ _queueitem_init(item, interpid, data, fmt, unboundop);
return item;
}
@@ -452,15 +516,44 @@ _queueitem_free_all(_queueitem *item)
static void
_queueitem_popped(_queueitem *item,
- _PyCrossInterpreterData **p_data, int *p_fmt)
+ _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop)
{
*p_data = item->data;
*p_fmt = item->fmt;
+ *p_unboundop = item->unboundop;
// We clear them here, so they won't be released in _queueitem_clear().
item->data = NULL;
_queueitem_free(item);
}
+static int
+_queueitem_clear_interpreter(_queueitem *item)
+{
+ assert(item->interpid >= 0);
+ if (item->data == NULL) {
+ // Its interpreter was already cleared (or it was never bound).
+ // For UNBOUND_REMOVE it should have been freed at that time.
+ assert(item->unboundop != UNBOUND_REMOVE);
+ return 0;
+ }
+ assert(_PyCrossInterpreterData_INTERPID(item->data) == item->interpid);
+
+ switch (item->unboundop) {
+ case UNBOUND_REMOVE:
+ // The caller must free/clear it.
+ return 1;
+ case UNBOUND_ERROR:
+ case UNBOUND_REPLACE:
+ // We won't need the cross-interpreter data later
+ // so we completely throw it away.
+ _queueitem_clear_data(item);
+ return 0;
+ default:
+ Py_FatalError("not reachable");
+ return -1;
+ }
+}
+
/* the queue */
@@ -474,12 +567,16 @@ typedef struct _queue {
_queueitem *first;
_queueitem *last;
} items;
- int fmt;
+ struct {
+ int fmt;
+ int unboundop;
+ } defaults;
} _queue;
static int
-_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt)
+_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop)
{
+ assert(check_unbound(unboundop));
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
return ERR_QUEUE_ALLOC;
@@ -490,7 +587,10 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt)
.items = {
.maxsize = maxsize,
},
- .fmt = fmt,
+ .defaults = {
+ .fmt = fmt,
+ .unboundop = unboundop,
+ },
};
return 0;
}
@@ -571,7 +671,8 @@ _queue_unlock(_queue *queue)
}
static int
-_queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt)
+_queue_add(_queue *queue, int64_t interpid, _PyCrossInterpreterData *data,
+ int fmt, int unboundop)
{
int err = _queue_lock(queue);
if (err < 0) {
@@ -587,7 +688,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt)
return ERR_QUEUE_FULL;
}
- _queueitem *item = _queueitem_new(data, fmt);
+ _queueitem *item = _queueitem_new(interpid, data, fmt, unboundop);
if (item == NULL) {
_queue_unlock(queue);
return -1;
@@ -608,7 +709,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt)
static int
_queue_next(_queue *queue,
- _PyCrossInterpreterData **p_data, int *p_fmt)
+ _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop)
{
int err = _queue_lock(queue);
if (err < 0) {
@@ -627,7 +728,7 @@ _queue_next(_queue *queue,
}
queue->items.count -= 1;
- _queueitem_popped(item, p_data, p_fmt);
+ _queueitem_popped(item, p_data, p_fmt, p_unboundop);
_queue_unlock(queue);
return 0;
@@ -692,14 +793,17 @@ _queue_clear_interpreter(_queue *queue, int64_t interpid)
while (next != NULL) {
_queueitem *item = next;
next = item->next;
- if (_PyCrossInterpreterData_INTERPID(item->data) == interpid) {
+ int remove = (item->interpid == interpid)
+ ? _queueitem_clear_interpreter(item)
+ : 0;
+ if (remove) {
+ _queueitem_free(item);
if (prev == NULL) {
- queue->items.first = item->next;
+ queue->items.first = next;
}
else {
- prev->next = item->next;
+ prev->next = next;
}
- _queueitem_free(item);
queue->items.count -= 1;
}
else {
@@ -966,18 +1070,19 @@ finally:
return res;
}
-struct queue_id_and_fmt {
+struct queue_id_and_info {
int64_t id;
int fmt;
+ int unboundop;
};
-static struct queue_id_and_fmt *
-_queues_list_all(_queues *queues, int64_t *count)
+static struct queue_id_and_info *
+_queues_list_all(_queues *queues, int64_t *p_count)
{
- struct queue_id_and_fmt *qids = NULL;
+ struct queue_id_and_info *qids = NULL;
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
- struct queue_id_and_fmt *ids = PyMem_NEW(struct queue_id_and_fmt,
- (Py_ssize_t)(queues->count));
+ struct queue_id_and_info *ids = PyMem_NEW(struct queue_id_and_info,
+ (Py_ssize_t)(queues->count));
if (ids == NULL) {
goto done;
}
@@ -985,9 +1090,10 @@ _queues_list_all(_queues *queues, int64_t *count)
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
ids[i].id = ref->qid;
assert(ref->queue != NULL);
- ids[i].fmt = ref->queue->fmt;
+ ids[i].fmt = ref->queue->defaults.fmt;
+ ids[i].unboundop = ref->queue->defaults.unboundop;
}
- *count = queues->count;
+ *p_count = queues->count;
qids = ids;
done:
@@ -1021,13 +1127,13 @@ _queue_free(_queue *queue)
// Create a new queue.
static int64_t
-queue_create(_queues *queues, Py_ssize_t maxsize, int fmt)
+queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop)
{
_queue *queue = GLOBAL_MALLOC(_queue);
if (queue == NULL) {
return ERR_QUEUE_ALLOC;
}
- int err = _queue_init(queue, maxsize, fmt);
+ int err = _queue_init(queue, maxsize, fmt, unboundop);
if (err < 0) {
GLOBAL_FREE(queue);
return (int64_t)err;
@@ -1056,7 +1162,7 @@ queue_destroy(_queues *queues, int64_t qid)
// Push an object onto the queue.
static int
-queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt)
+queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
{
// Look up the queue.
_queue *queue = NULL;
@@ -1077,9 +1183,12 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt)
GLOBAL_FREE(data);
return -1;
}
+ assert(_PyCrossInterpreterData_INTERPID(data) == \
+ PyInterpreterState_GetID(PyInterpreterState_Get()));
// Add the data to the queue.
- int res = _queue_add(queue, data, fmt);
+ int64_t interpid = -1; // _queueitem_init() will set it.
+ int res = _queue_add(queue, interpid, data, fmt, unboundop);
_queue_unmark_waiter(queue, queues->mutex);
if (res != 0) {
// We may chain an exception here:
@@ -1094,7 +1203,8 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt)
// Pop the next object off the queue. Fail if empty.
// XXX Support a "wait" mutex?
static int
-queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt)
+queue_get(_queues *queues, int64_t qid,
+ PyObject **res, int *p_fmt, int *p_unboundop)
{
int err;
*res = NULL;
@@ -1110,7 +1220,7 @@ queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt)
// Pop off the next item from the queue.
_PyCrossInterpreterData *data = NULL;
- err = _queue_next(queue, &data, p_fmt);
+ err = _queue_next(queue, &data, p_fmt, p_unboundop);
_queue_unmark_waiter(queue, queues->mutex);
if (err != 0) {
return err;
@@ -1397,15 +1507,22 @@ qidarg_converter(PyObject *arg, void *ptr)
static PyObject *
queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"maxsize", "fmt", NULL};
+ static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL};
Py_ssize_t maxsize;
int fmt;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "ni:create", kwlist,
- &maxsize, &fmt)) {
+ int unboundop;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist,
+ &maxsize, &fmt, &unboundop))
+ {
+ return NULL;
+ }
+ if (!check_unbound(unboundop)) {
+ PyErr_Format(PyExc_ValueError,
+ "unsupported unboundop %d", unboundop);
return NULL;
}
- int64_t qid = queue_create(&_globals.queues, maxsize, fmt);
+ int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop);
if (qid < 0) {
(void)handle_queue_error((int)qid, self, qid);
return NULL;
@@ -1427,7 +1544,7 @@ queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
}
PyDoc_STRVAR(queuesmod_create_doc,
-"create(maxsize, fmt) -> qid\n\
+"create(maxsize, fmt, unboundop) -> qid\n\
\n\
Create a new cross-interpreter queue and return its unique generated ID.\n\
It is a new reference as though bind() had been called on the queue.\n\
@@ -1463,9 +1580,9 @@ static PyObject *
queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
{
int64_t count = 0;
- struct queue_id_and_fmt *qids = _queues_list_all(&_globals.queues, &count);
+ struct queue_id_and_info *qids = _queues_list_all(&_globals.queues, &count);
if (qids == NULL) {
- if (count == 0) {
+ if (!PyErr_Occurred() && count == 0) {
return PyList_New(0);
}
return NULL;
@@ -1474,9 +1591,10 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
if (ids == NULL) {
goto finally;
}
- struct queue_id_and_fmt *cur = qids;
+ struct queue_id_and_info *cur = qids;
for (int64_t i=0; i < count; cur++, i++) {
- PyObject *item = Py_BuildValue("Li", cur->id, cur->fmt);
+ PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt,
+ cur->unboundop);
if (item == NULL) {
Py_SETREF(ids, NULL);
break;
@@ -1498,18 +1616,26 @@ Each corresponding default format is also included.");
static PyObject *
queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"qid", "obj", "fmt", NULL};
+ static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL};
qidarg_converter_data qidarg;
PyObject *obj;
int fmt;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oi:put", kwlist,
- qidarg_converter, &qidarg, &obj, &fmt)) {
+ int unboundop;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist,
+ qidarg_converter, &qidarg, &obj, &fmt,
+ &unboundop))
+ {
return NULL;
}
int64_t qid = qidarg.id;
+ if (!check_unbound(unboundop)) {
+ PyErr_Format(PyExc_ValueError,
+ "unsupported unboundop %d", unboundop);
+ return NULL;
+ }
/* Queue up the object. */
- int err = queue_put(&_globals.queues, qid, obj, fmt);
+ int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop);
// This is the only place that raises QueueFull.
if (handle_queue_error(err, self, qid)) {
return NULL;
@@ -1536,13 +1662,17 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds)
PyObject *obj = NULL;
int fmt = 0;
- int err = queue_get(&_globals.queues, qid, &obj, &fmt);
+ int unboundop = 0;
+ int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop);
// This is the only place that raises QueueEmpty.
if (handle_queue_error(err, self, qid)) {
return NULL;
}
- PyObject *res = Py_BuildValue("Oi", obj, fmt);
+ if (obj == NULL) {
+ return Py_BuildValue("Oii", Py_None, fmt, unboundop);
+ }
+ PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None);
Py_DECREF(obj);
return res;
}
@@ -1656,17 +1786,12 @@ queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds)
if (handle_queue_error(err, self, qid)) {
return NULL;
}
- int fmt = queue->fmt;
+ int fmt = queue->defaults.fmt;
+ int unboundop = queue->defaults.unboundop;
_queue_unmark_waiter(queue, _globals.queues.mutex);
- PyObject *fmt_obj = PyLong_FromLong(fmt);
- if (fmt_obj == NULL) {
- return NULL;
- }
- // For now queues only have one default.
- PyObject *res = PyTuple_Pack(1, fmt_obj);
- Py_DECREF(fmt_obj);
- return res;
+ PyObject *defaults = Py_BuildValue("ii", fmt, unboundop);
+ return defaults;
}
PyDoc_STRVAR(queuesmod_get_queue_defaults_doc,