diff options
author | Eric Snow <ericsnowcurrently@gmail.com> | 2024-07-15 19:43:59 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-15 19:43:59 (GMT) |
commit | 8b209fd4f8a9bf9603888bda2c44b5cfd4ebf47a (patch) | |
tree | 517e2f87dc7a278d390cfd51c91a03a694c6c7f2 /Lib/test/support | |
parent | fd085a411ed2ccc9bde2338cf50068bc7f213ece (diff) | |
download | cpython-8b209fd4f8a9bf9603888bda2c44b5cfd4ebf47a.zip cpython-8b209fd4f8a9bf9603888bda2c44b5cfd4ebf47a.tar.gz cpython-8b209fd4f8a9bf9603888bda2c44b5cfd4ebf47a.tar.bz2 |
gh-76785: Expand How Interpreter Channels Handle Interpreter Finalization (gh-121805)
See 6b98b274b6 for an explanation of the problem and solution. Here I've applied the solution to channels.
Diffstat (limited to 'Lib/test/support')
-rw-r--r-- | Lib/test/support/interpreters/_crossinterp.py | 102 | ||||
-rw-r--r-- | Lib/test/support/interpreters/channels.py | 110 | ||||
-rw-r--r-- | Lib/test/support/interpreters/queues.py | 60 |
3 files changed, 209 insertions, 63 deletions
diff --git a/Lib/test/support/interpreters/_crossinterp.py b/Lib/test/support/interpreters/_crossinterp.py new file mode 100644 index 0000000..544e197 --- /dev/null +++ b/Lib/test/support/interpreters/_crossinterp.py @@ -0,0 +1,102 @@ +"""Common code between queues and channels.""" + + +class ItemInterpreterDestroyed(Exception): + """Raised when trying to get an item whose interpreter was destroyed.""" + + +class classonly: + """A non-data descriptor that makes a value only visible on the class. + + This is like the "classmethod" builtin, but does not show up on + instances of the class. It may be used as a decorator. + """ + + def __init__(self, value): + self.value = value + self.getter = classmethod(value).__get__ + self.name = None + + def __set_name__(self, cls, name): + if self.name is not None: + raise TypeError('already used') + self.name = name + + def __get__(self, obj, cls): + if obj is not None: + raise AttributeError(self.name) + # called on the class + return self.getter(None, cls) + + +class UnboundItem: + """Represents a cross-interpreter item no longer bound to an interpreter. + + An item is unbound when the interpreter that added it to the + cross-interpreter container is destroyed. + """ + + __slots__ = () + + @classonly + def singleton(cls, kind, module, name='UNBOUND'): + doc = cls.__doc__.replace('cross-interpreter container', kind) + doc = doc.replace('cross-interpreter', kind) + subclass = type( + f'Unbound{kind.capitalize()}Item', + (cls,), + dict( + _MODULE=module, + _NAME=name, + __doc__=doc, + ), + ) + return object.__new__(subclass) + + _MODULE = __name__ + _NAME = 'UNBOUND' + + def __new__(cls): + raise Exception(f'use {cls._MODULE}.{cls._NAME}') + + def __repr__(self): + return f'{self._MODULE}.{self._NAME}' +# return f'interpreters.queues.UNBOUND' + + +UNBOUND = object.__new__(UnboundItem) +UNBOUND_ERROR = object() +UNBOUND_REMOVE = object() + +_UNBOUND_CONSTANT_TO_FLAG = { + UNBOUND_REMOVE: 1, + UNBOUND_ERROR: 2, + UNBOUND: 3, +} +_UNBOUND_FLAG_TO_CONSTANT = {v: k + for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()} + + +def serialize_unbound(unbound): + op = unbound + try: + flag = _UNBOUND_CONSTANT_TO_FLAG[op] + except KeyError: + raise NotImplementedError(f'unsupported unbound replacement op {op!r}') + return flag, + + +def resolve_unbound(flag, exctype_destroyed): + try: + op = _UNBOUND_FLAG_TO_CONSTANT[flag] + except KeyError: + raise NotImplementedError(f'unsupported unbound replacement op {flag!r}') + if op is UNBOUND_REMOVE: + # "remove" not possible here + raise NotImplementedError + elif op is UNBOUND_ERROR: + raise exctype_destroyed("item's original interpreter destroyed") + elif op is UNBOUND: + return UNBOUND + else: + raise NotImplementedError(repr(op)) diff --git a/Lib/test/support/interpreters/channels.py b/Lib/test/support/interpreters/channels.py index fbae7e6..d2bd93d 100644 --- a/Lib/test/support/interpreters/channels.py +++ b/Lib/test/support/interpreters/channels.py @@ -2,35 +2,68 @@ import time import _interpchannels as _channels +from . import _crossinterp # aliases: from _interpchannels import ( ChannelError, ChannelNotFoundError, ChannelClosedError, ChannelEmptyError, ChannelNotEmptyError, ) +from ._crossinterp import ( + UNBOUND_ERROR, UNBOUND_REMOVE, +) __all__ = [ + 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE', 'create', 'list_all', 'SendChannel', 'RecvChannel', 'ChannelError', 'ChannelNotFoundError', 'ChannelEmptyError', + 'ItemInterpreterDestroyed', ] -def create(): +class ItemInterpreterDestroyed(ChannelError, + _crossinterp.ItemInterpreterDestroyed): + """Raised from get() and get_nowait().""" + + +UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__) + + +def _serialize_unbound(unbound): + if unbound is UNBOUND: + unbound = _crossinterp.UNBOUND + return _crossinterp.serialize_unbound(unbound) + + +def _resolve_unbound(flag): + resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed) + if resolved is _crossinterp.UNBOUND: + resolved = UNBOUND + return resolved + + +def create(*, unbounditems=UNBOUND): """Return (recv, send) for a new cross-interpreter channel. The channel may be used to pass data safely between interpreters. + + "unbounditems" sets the default for the send end of the channel. + See SendChannel.send() for supported values. The default value + is UNBOUND, which replaces the unbound item when received. """ - cid = _channels.create() - recv, send = RecvChannel(cid), SendChannel(cid) + unbound = _serialize_unbound(unbounditems) + unboundop, = unbound + cid = _channels.create(unboundop) + recv, send = RecvChannel(cid), SendChannel(cid, _unbound=unbound) return recv, send def list_all(): """Return a list of (recv, send) for all open channels.""" - return [(RecvChannel(cid), SendChannel(cid)) - for cid in _channels.list_all()] + return [(RecvChannel(cid), SendChannel(cid, _unbound=unbound)) + for cid, unbound in _channels.list_all()] class _ChannelEnd: @@ -106,12 +139,15 @@ class RecvChannel(_ChannelEnd): if timeout < 0: raise ValueError(f'timeout value must be non-negative') end = time.time() + timeout - obj = _channels.recv(self._id, _sentinel) + obj, unboundop = _channels.recv(self._id, _sentinel) while obj is _sentinel: time.sleep(_delay) if timeout is not None and time.time() >= end: raise TimeoutError - obj = _channels.recv(self._id, _sentinel) + obj, unboundop = _channels.recv(self._id, _sentinel) + if unboundop is not None: + assert obj is None, repr(obj) + return _resolve_unbound(unboundop) return obj def recv_nowait(self, default=_NOT_SET): @@ -122,9 +158,13 @@ class RecvChannel(_ChannelEnd): is the same as recv(). """ if default is _NOT_SET: - return _channels.recv(self._id) + obj, unboundop = _channels.recv(self._id) else: - return _channels.recv(self._id, default) + obj, unboundop = _channels.recv(self._id, default) + if unboundop is not None: + assert obj is None, repr(obj) + return _resolve_unbound(unboundop) + return obj def close(self): _channels.close(self._id, recv=True) @@ -135,43 +175,79 @@ class SendChannel(_ChannelEnd): _end = 'send' + def __new__(cls, cid, *, _unbound=None): + if _unbound is None: + try: + op = _channels.get_channel_defaults(cid) + _unbound = (op,) + except ChannelNotFoundError: + _unbound = _serialize_unbound(UNBOUND) + self = super().__new__(cls, cid) + self._unbound = _unbound + return self + @property def is_closed(self): info = self._info return info.closed or info.closing - def send(self, obj, timeout=None): + def send(self, obj, timeout=None, *, + unbound=None, + ): """Send the object (i.e. its data) to the channel's receiving end. This blocks until the object is received. """ - _channels.send(self._id, obj, timeout=timeout, blocking=True) + if unbound is None: + unboundop, = self._unbound + else: + unboundop, = _serialize_unbound(unbound) + _channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True) - def send_nowait(self, obj): + def send_nowait(self, obj, *, + unbound=None, + ): """Send the object to the channel's receiving end. If the object is immediately received then return True (else False). Otherwise this is the same as send(). """ + if unbound is None: + unboundop, = self._unbound + else: + unboundop, = _serialize_unbound(unbound) # XXX Note that at the moment channel_send() only ever returns # None. This should be fixed when channel_send_wait() is added. # See bpo-32604 and gh-19829. - return _channels.send(self._id, obj, blocking=False) + return _channels.send(self._id, obj, unboundop, blocking=False) - def send_buffer(self, obj, timeout=None): + def send_buffer(self, obj, timeout=None, *, + unbound=None, + ): """Send the object's buffer to the channel's receiving end. This blocks until the object is received. """ - _channels.send_buffer(self._id, obj, timeout=timeout, blocking=True) + if unbound is None: + unboundop, = self._unbound + else: + unboundop, = _serialize_unbound(unbound) + _channels.send_buffer(self._id, obj, unboundop, + timeout=timeout, blocking=True) - def send_buffer_nowait(self, obj): + def send_buffer_nowait(self, obj, *, + unbound=None, + ): """Send the object's buffer to the channel's receiving end. If the object is immediately received then return True (else False). Otherwise this is the same as send(). """ - return _channels.send_buffer(self._id, obj, blocking=False) + if unbound is None: + unboundop, = self._unbound + else: + unboundop, = _serialize_unbound(unbound) + return _channels.send_buffer(self._id, obj, unboundop, blocking=False) def close(self): _channels.close(self._id, send=True) diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py index 402ceff..deb8e86 100644 --- a/Lib/test/support/interpreters/queues.py +++ b/Lib/test/support/interpreters/queues.py @@ -5,11 +5,15 @@ import queue import time import weakref import _interpqueues as _queues +from . import _crossinterp # aliases: from _interpqueues import ( QueueError, QueueNotFoundError, ) +from ._crossinterp import ( + UNBOUND_ERROR, UNBOUND_REMOVE, +) __all__ = [ 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE', @@ -34,7 +38,8 @@ class QueueFull(QueueError, queue.Full): """ -class ItemInterpreterDestroyed(QueueError): +class ItemInterpreterDestroyed(QueueError, + _crossinterp.ItemInterpreterDestroyed): """Raised from get() and get_nowait().""" @@ -42,57 +47,20 @@ _SHARED_ONLY = 0 _PICKLED = 1 -class UnboundItem: - """Represents a Queue item no longer bound to an interpreter. - - An item is unbound when the interpreter that added it to the queue - is destroyed. - """ - - __slots__ = () - - def __new__(cls): - return UNBOUND - - def __repr__(self): - return f'interpreters.queues.UNBOUND' - - -UNBOUND = object.__new__(UnboundItem) -UNBOUND_ERROR = object() -UNBOUND_REMOVE = object() +UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__) -_UNBOUND_CONSTANT_TO_FLAG = { - UNBOUND_REMOVE: 1, - UNBOUND_ERROR: 2, - UNBOUND: 3, -} -_UNBOUND_FLAG_TO_CONSTANT = {v: k - for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()} def _serialize_unbound(unbound): - op = unbound - try: - flag = _UNBOUND_CONSTANT_TO_FLAG[op] - except KeyError: - raise NotImplementedError(f'unsupported unbound replacement op {op!r}') - return flag, + if unbound is UNBOUND: + unbound = _crossinterp.UNBOUND + return _crossinterp.serialize_unbound(unbound) def _resolve_unbound(flag): - try: - op = _UNBOUND_FLAG_TO_CONSTANT[flag] - except KeyError: - raise NotImplementedError(f'unsupported unbound replacement op {flag!r}') - if op is UNBOUND_REMOVE: - # "remove" not possible here - raise NotImplementedError - elif op is UNBOUND_ERROR: - raise ItemInterpreterDestroyed("item's original interpreter destroyed") - elif op is UNBOUND: - return UNBOUND - else: - raise NotImplementedError(repr(op)) + resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed) + if resolved is _crossinterp.UNBOUND: + resolved = UNBOUND + return resolved def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND): |