summaryrefslogtreecommitdiffstats
path: root/Lib/test/support
diff options
context:
space:
mode:
authorEric Snow <ericsnowcurrently@gmail.com>2024-07-15 19:43:59 (GMT)
committerGitHub <noreply@github.com>2024-07-15 19:43:59 (GMT)
commit8b209fd4f8a9bf9603888bda2c44b5cfd4ebf47a (patch)
tree517e2f87dc7a278d390cfd51c91a03a694c6c7f2 /Lib/test/support
parentfd085a411ed2ccc9bde2338cf50068bc7f213ece (diff)
downloadcpython-8b209fd4f8a9bf9603888bda2c44b5cfd4ebf47a.zip
cpython-8b209fd4f8a9bf9603888bda2c44b5cfd4ebf47a.tar.gz
cpython-8b209fd4f8a9bf9603888bda2c44b5cfd4ebf47a.tar.bz2
gh-76785: Expand How Interpreter Channels Handle Interpreter Finalization (gh-121805)
See 6b98b274b6 for an explanation of the problem and solution. Here I've applied the solution to channels.
Diffstat (limited to 'Lib/test/support')
-rw-r--r--Lib/test/support/interpreters/_crossinterp.py102
-rw-r--r--Lib/test/support/interpreters/channels.py110
-rw-r--r--Lib/test/support/interpreters/queues.py60
3 files changed, 209 insertions, 63 deletions
diff --git a/Lib/test/support/interpreters/_crossinterp.py b/Lib/test/support/interpreters/_crossinterp.py
new file mode 100644
index 0000000..544e197
--- /dev/null
+++ b/Lib/test/support/interpreters/_crossinterp.py
@@ -0,0 +1,102 @@
+"""Common code between queues and channels."""
+
+
+class ItemInterpreterDestroyed(Exception):
+ """Raised when trying to get an item whose interpreter was destroyed."""
+
+
+class classonly:
+ """A non-data descriptor that makes a value only visible on the class.
+
+ This is like the "classmethod" builtin, but does not show up on
+ instances of the class. It may be used as a decorator.
+ """
+
+ def __init__(self, value):
+ self.value = value
+ self.getter = classmethod(value).__get__
+ self.name = None
+
+ def __set_name__(self, cls, name):
+ if self.name is not None:
+ raise TypeError('already used')
+ self.name = name
+
+ def __get__(self, obj, cls):
+ if obj is not None:
+ raise AttributeError(self.name)
+ # called on the class
+ return self.getter(None, cls)
+
+
+class UnboundItem:
+ """Represents a cross-interpreter item no longer bound to an interpreter.
+
+ An item is unbound when the interpreter that added it to the
+ cross-interpreter container is destroyed.
+ """
+
+ __slots__ = ()
+
+ @classonly
+ def singleton(cls, kind, module, name='UNBOUND'):
+ doc = cls.__doc__.replace('cross-interpreter container', kind)
+ doc = doc.replace('cross-interpreter', kind)
+ subclass = type(
+ f'Unbound{kind.capitalize()}Item',
+ (cls,),
+ dict(
+ _MODULE=module,
+ _NAME=name,
+ __doc__=doc,
+ ),
+ )
+ return object.__new__(subclass)
+
+ _MODULE = __name__
+ _NAME = 'UNBOUND'
+
+ def __new__(cls):
+ raise Exception(f'use {cls._MODULE}.{cls._NAME}')
+
+ def __repr__(self):
+ return f'{self._MODULE}.{self._NAME}'
+# return f'interpreters.queues.UNBOUND'
+
+
+UNBOUND = object.__new__(UnboundItem)
+UNBOUND_ERROR = object()
+UNBOUND_REMOVE = object()
+
+_UNBOUND_CONSTANT_TO_FLAG = {
+ UNBOUND_REMOVE: 1,
+ UNBOUND_ERROR: 2,
+ UNBOUND: 3,
+}
+_UNBOUND_FLAG_TO_CONSTANT = {v: k
+ for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}
+
+
+def serialize_unbound(unbound):
+ op = unbound
+ try:
+ flag = _UNBOUND_CONSTANT_TO_FLAG[op]
+ except KeyError:
+ raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
+ return flag,
+
+
+def resolve_unbound(flag, exctype_destroyed):
+ try:
+ op = _UNBOUND_FLAG_TO_CONSTANT[flag]
+ except KeyError:
+ raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
+ if op is UNBOUND_REMOVE:
+ # "remove" not possible here
+ raise NotImplementedError
+ elif op is UNBOUND_ERROR:
+ raise exctype_destroyed("item's original interpreter destroyed")
+ elif op is UNBOUND:
+ return UNBOUND
+ else:
+ raise NotImplementedError(repr(op))
diff --git a/Lib/test/support/interpreters/channels.py b/Lib/test/support/interpreters/channels.py
index fbae7e6..d2bd93d 100644
--- a/Lib/test/support/interpreters/channels.py
+++ b/Lib/test/support/interpreters/channels.py
@@ -2,35 +2,68 @@
import time
import _interpchannels as _channels
+from . import _crossinterp
# aliases:
from _interpchannels import (
ChannelError, ChannelNotFoundError, ChannelClosedError,
ChannelEmptyError, ChannelNotEmptyError,
)
+from ._crossinterp import (
+ UNBOUND_ERROR, UNBOUND_REMOVE,
+)
__all__ = [
+ 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
'create', 'list_all',
'SendChannel', 'RecvChannel',
'ChannelError', 'ChannelNotFoundError', 'ChannelEmptyError',
+ 'ItemInterpreterDestroyed',
]
-def create():
+class ItemInterpreterDestroyed(ChannelError,
+ _crossinterp.ItemInterpreterDestroyed):
+ """Raised from get() and get_nowait()."""
+
+
+UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)
+
+
+def _serialize_unbound(unbound):
+ if unbound is UNBOUND:
+ unbound = _crossinterp.UNBOUND
+ return _crossinterp.serialize_unbound(unbound)
+
+
+def _resolve_unbound(flag):
+ resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
+ if resolved is _crossinterp.UNBOUND:
+ resolved = UNBOUND
+ return resolved
+
+
+def create(*, unbounditems=UNBOUND):
"""Return (recv, send) for a new cross-interpreter channel.
The channel may be used to pass data safely between interpreters.
+
+ "unbounditems" sets the default for the send end of the channel.
+ See SendChannel.send() for supported values. The default value
+ is UNBOUND, which replaces the unbound item when received.
"""
- cid = _channels.create()
- recv, send = RecvChannel(cid), SendChannel(cid)
+ unbound = _serialize_unbound(unbounditems)
+ unboundop, = unbound
+ cid = _channels.create(unboundop)
+ recv, send = RecvChannel(cid), SendChannel(cid, _unbound=unbound)
return recv, send
def list_all():
"""Return a list of (recv, send) for all open channels."""
- return [(RecvChannel(cid), SendChannel(cid))
- for cid in _channels.list_all()]
+ return [(RecvChannel(cid), SendChannel(cid, _unbound=unbound))
+ for cid, unbound in _channels.list_all()]
class _ChannelEnd:
@@ -106,12 +139,15 @@ class RecvChannel(_ChannelEnd):
if timeout < 0:
raise ValueError(f'timeout value must be non-negative')
end = time.time() + timeout
- obj = _channels.recv(self._id, _sentinel)
+ obj, unboundop = _channels.recv(self._id, _sentinel)
while obj is _sentinel:
time.sleep(_delay)
if timeout is not None and time.time() >= end:
raise TimeoutError
- obj = _channels.recv(self._id, _sentinel)
+ obj, unboundop = _channels.recv(self._id, _sentinel)
+ if unboundop is not None:
+ assert obj is None, repr(obj)
+ return _resolve_unbound(unboundop)
return obj
def recv_nowait(self, default=_NOT_SET):
@@ -122,9 +158,13 @@ class RecvChannel(_ChannelEnd):
is the same as recv().
"""
if default is _NOT_SET:
- return _channels.recv(self._id)
+ obj, unboundop = _channels.recv(self._id)
else:
- return _channels.recv(self._id, default)
+ obj, unboundop = _channels.recv(self._id, default)
+ if unboundop is not None:
+ assert obj is None, repr(obj)
+ return _resolve_unbound(unboundop)
+ return obj
def close(self):
_channels.close(self._id, recv=True)
@@ -135,43 +175,79 @@ class SendChannel(_ChannelEnd):
_end = 'send'
+ def __new__(cls, cid, *, _unbound=None):
+ if _unbound is None:
+ try:
+ op = _channels.get_channel_defaults(cid)
+ _unbound = (op,)
+ except ChannelNotFoundError:
+ _unbound = _serialize_unbound(UNBOUND)
+ self = super().__new__(cls, cid)
+ self._unbound = _unbound
+ return self
+
@property
def is_closed(self):
info = self._info
return info.closed or info.closing
- def send(self, obj, timeout=None):
+ def send(self, obj, timeout=None, *,
+ unbound=None,
+ ):
"""Send the object (i.e. its data) to the channel's receiving end.
This blocks until the object is received.
"""
- _channels.send(self._id, obj, timeout=timeout, blocking=True)
+ if unbound is None:
+ unboundop, = self._unbound
+ else:
+ unboundop, = _serialize_unbound(unbound)
+ _channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True)
- def send_nowait(self, obj):
+ def send_nowait(self, obj, *,
+ unbound=None,
+ ):
"""Send the object to the channel's receiving end.
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
+ if unbound is None:
+ unboundop, = self._unbound
+ else:
+ unboundop, = _serialize_unbound(unbound)
# XXX Note that at the moment channel_send() only ever returns
# None. This should be fixed when channel_send_wait() is added.
# See bpo-32604 and gh-19829.
- return _channels.send(self._id, obj, blocking=False)
+ return _channels.send(self._id, obj, unboundop, blocking=False)
- def send_buffer(self, obj, timeout=None):
+ def send_buffer(self, obj, timeout=None, *,
+ unbound=None,
+ ):
"""Send the object's buffer to the channel's receiving end.
This blocks until the object is received.
"""
- _channels.send_buffer(self._id, obj, timeout=timeout, blocking=True)
+ if unbound is None:
+ unboundop, = self._unbound
+ else:
+ unboundop, = _serialize_unbound(unbound)
+ _channels.send_buffer(self._id, obj, unboundop,
+ timeout=timeout, blocking=True)
- def send_buffer_nowait(self, obj):
+ def send_buffer_nowait(self, obj, *,
+ unbound=None,
+ ):
"""Send the object's buffer to the channel's receiving end.
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
- return _channels.send_buffer(self._id, obj, blocking=False)
+ if unbound is None:
+ unboundop, = self._unbound
+ else:
+ unboundop, = _serialize_unbound(unbound)
+ return _channels.send_buffer(self._id, obj, unboundop, blocking=False)
def close(self):
_channels.close(self._id, send=True)
diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py
index 402ceff..deb8e86 100644
--- a/Lib/test/support/interpreters/queues.py
+++ b/Lib/test/support/interpreters/queues.py
@@ -5,11 +5,15 @@ import queue
import time
import weakref
import _interpqueues as _queues
+from . import _crossinterp
# aliases:
from _interpqueues import (
QueueError, QueueNotFoundError,
)
+from ._crossinterp import (
+ UNBOUND_ERROR, UNBOUND_REMOVE,
+)
__all__ = [
'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
@@ -34,7 +38,8 @@ class QueueFull(QueueError, queue.Full):
"""
-class ItemInterpreterDestroyed(QueueError):
+class ItemInterpreterDestroyed(QueueError,
+ _crossinterp.ItemInterpreterDestroyed):
"""Raised from get() and get_nowait()."""
@@ -42,57 +47,20 @@ _SHARED_ONLY = 0
_PICKLED = 1
-class UnboundItem:
- """Represents a Queue item no longer bound to an interpreter.
-
- An item is unbound when the interpreter that added it to the queue
- is destroyed.
- """
-
- __slots__ = ()
-
- def __new__(cls):
- return UNBOUND
-
- def __repr__(self):
- return f'interpreters.queues.UNBOUND'
-
-
-UNBOUND = object.__new__(UnboundItem)
-UNBOUND_ERROR = object()
-UNBOUND_REMOVE = object()
+UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)
-_UNBOUND_CONSTANT_TO_FLAG = {
- UNBOUND_REMOVE: 1,
- UNBOUND_ERROR: 2,
- UNBOUND: 3,
-}
-_UNBOUND_FLAG_TO_CONSTANT = {v: k
- for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}
def _serialize_unbound(unbound):
- op = unbound
- try:
- flag = _UNBOUND_CONSTANT_TO_FLAG[op]
- except KeyError:
- raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
- return flag,
+ if unbound is UNBOUND:
+ unbound = _crossinterp.UNBOUND
+ return _crossinterp.serialize_unbound(unbound)
def _resolve_unbound(flag):
- try:
- op = _UNBOUND_FLAG_TO_CONSTANT[flag]
- except KeyError:
- raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
- if op is UNBOUND_REMOVE:
- # "remove" not possible here
- raise NotImplementedError
- elif op is UNBOUND_ERROR:
- raise ItemInterpreterDestroyed("item's original interpreter destroyed")
- elif op is UNBOUND:
- return UNBOUND
- else:
- raise NotImplementedError(repr(op))
+ resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
+ if resolved is _crossinterp.UNBOUND:
+ resolved = UNBOUND
+ return resolved
def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND):