diff options
author | Eric Snow <ericsnowcurrently@gmail.com> | 2023-12-12 15:24:31 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-12 15:24:31 (GMT) |
commit | 86a77f4e1a5ceaff1036b0072521e12752b5df47 (patch) | |
tree | cecc78dab93112a3a92ae66fc0156630408063b3 /Lib/test/support | |
parent | f26bfe4b25f7e5a4f68fcac26207b7175abad208 (diff) | |
download | cpython-86a77f4e1a5ceaff1036b0072521e12752b5df47.zip cpython-86a77f4e1a5ceaff1036b0072521e12752b5df47.tar.gz cpython-86a77f4e1a5ceaff1036b0072521e12752b5df47.tar.bz2 |
gh-76785: Fixes for test.support.interpreters (gh-112982)
This involves a number of changes for PEP 734.
Diffstat (limited to 'Lib/test/support')
-rw-r--r-- | Lib/test/support/interpreters/__init__.py | 160 | ||||
-rw-r--r-- | Lib/test/support/interpreters/channels.py (renamed from Lib/test/support/interpreters.py) | 124 | ||||
-rw-r--r-- | Lib/test/support/interpreters/queues.py | 156 |
3 files changed, 322 insertions, 118 deletions
diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py new file mode 100644 index 0000000..2d6376d --- /dev/null +++ b/Lib/test/support/interpreters/__init__.py @@ -0,0 +1,160 @@ +"""Subinterpreters High Level Module.""" + +import threading +import weakref +import _xxsubinterpreters as _interpreters + +# aliases: +from _xxsubinterpreters import ( + InterpreterError, InterpreterNotFoundError, + is_shareable, +) + + +__all__ = [ + 'get_current', 'get_main', 'create', 'list_all', 'is_shareable', + 'Interpreter', + 'InterpreterError', 'InterpreterNotFoundError', 'ExecFailure', + 'create_queue', 'Queue', 'QueueEmpty', 'QueueFull', +] + + +_queuemod = None + +def __getattr__(name): + if name in ('Queue', 'QueueEmpty', 'QueueFull', 'create_queue'): + global create_queue, Queue, QueueEmpty, QueueFull + ns = globals() + from .queues import ( + create as create_queue, + Queue, QueueEmpty, QueueFull, + ) + return ns[name] + else: + raise AttributeError(name) + + +class ExecFailure(RuntimeError): + + def __init__(self, excinfo): + msg = excinfo.formatted + if not msg: + if excinfo.type and snapshot.msg: + msg = f'{snapshot.type.__name__}: {snapshot.msg}' + else: + msg = snapshot.type.__name__ or snapshot.msg + super().__init__(msg) + self.snapshot = excinfo + + +def create(): + """Return a new (idle) Python interpreter.""" + id = _interpreters.create(isolated=True) + return Interpreter(id) + + +def list_all(): + """Return all existing interpreters.""" + return [Interpreter(id) for id in _interpreters.list_all()] + + +def get_current(): + """Return the currently running interpreter.""" + id = _interpreters.get_current() + return Interpreter(id) + + +def get_main(): + """Return the main interpreter.""" + id = _interpreters.get_main() + return Interpreter(id) + + +_known = weakref.WeakValueDictionary() + +class Interpreter: + """A single Python interpreter.""" + + def __new__(cls, id, /): + # There is only one instance for any given ID. + if not isinstance(id, int): + raise TypeError(f'id must be an int, got {id!r}') + id = int(id) + try: + self = _known[id] + assert hasattr(self, '_ownsref') + except KeyError: + # This may raise InterpreterNotFoundError: + _interpreters._incref(id) + try: + self = super().__new__(cls) + self._id = id + self._ownsref = True + except BaseException: + _interpreters._deccref(id) + raise + _known[id] = self + return self + + def __repr__(self): + return f'{type(self).__name__}({self.id})' + + def __hash__(self): + return hash(self._id) + + def __del__(self): + self._decref() + + def _decref(self): + if not self._ownsref: + return + self._ownsref = False + try: + _interpreters._decref(self.id) + except InterpreterNotFoundError: + pass + + @property + def id(self): + return self._id + + def is_running(self): + """Return whether or not the identified interpreter is running.""" + return _interpreters.is_running(self._id) + + def close(self): + """Finalize and destroy the interpreter. + + Attempting to destroy the current interpreter results + in a RuntimeError. + """ + return _interpreters.destroy(self._id) + + def exec_sync(self, code, /, channels=None): + """Run the given source code in the interpreter. + + This is essentially the same as calling the builtin "exec" + with this interpreter, using the __dict__ of its __main__ + module as both globals and locals. + + There is no return value. + + If the code raises an unhandled exception then an ExecFailure + is raised, which summarizes the unhandled exception. The actual + exception is discarded because objects cannot be shared between + interpreters. + + This blocks the current Python thread until done. During + that time, the previous interpreter is allowed to run + in other threads. + """ + excinfo = _interpreters.exec(self._id, code, channels) + if excinfo is not None: + raise ExecFailure(excinfo) + + def run(self, code, /, channels=None): + def task(): + self.exec_sync(code, channels=channels) + t = threading.Thread(target=task) + t.start() + return t diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters/channels.py index 089fe7e..75a5a60 100644 --- a/Lib/test/support/interpreters.py +++ b/Lib/test/support/interpreters/channels.py @@ -1,11 +1,9 @@ -"""Subinterpreters High Level Module.""" +"""Cross-interpreter Channels High Level Module.""" import time -import _xxsubinterpreters as _interpreters import _xxinterpchannels as _channels # aliases: -from _xxsubinterpreters import is_shareable from _xxinterpchannels import ( ChannelError, ChannelNotFoundError, ChannelClosedError, ChannelEmptyError, ChannelNotEmptyError, @@ -13,123 +11,13 @@ from _xxinterpchannels import ( __all__ = [ - 'Interpreter', 'get_current', 'get_main', 'create', 'list_all', - 'RunFailedError', + 'create', 'list_all', 'SendChannel', 'RecvChannel', - 'create_channel', 'list_all_channels', 'is_shareable', - 'ChannelError', 'ChannelNotFoundError', - 'ChannelEmptyError', - ] + 'ChannelError', 'ChannelNotFoundError', 'ChannelEmptyError', +] -class RunFailedError(RuntimeError): - - def __init__(self, excinfo): - msg = excinfo.formatted - if not msg: - if excinfo.type and snapshot.msg: - msg = f'{snapshot.type.__name__}: {snapshot.msg}' - else: - msg = snapshot.type.__name__ or snapshot.msg - super().__init__(msg) - self.snapshot = excinfo - - -def create(*, isolated=True): - """Return a new (idle) Python interpreter.""" - id = _interpreters.create(isolated=isolated) - return Interpreter(id, isolated=isolated) - - -def list_all(): - """Return all existing interpreters.""" - return [Interpreter(id) for id in _interpreters.list_all()] - - -def get_current(): - """Return the currently running interpreter.""" - id = _interpreters.get_current() - return Interpreter(id) - - -def get_main(): - """Return the main interpreter.""" - id = _interpreters.get_main() - return Interpreter(id) - - -class Interpreter: - """A single Python interpreter.""" - - def __init__(self, id, *, isolated=None): - if not isinstance(id, (int, _interpreters.InterpreterID)): - raise TypeError(f'id must be an int, got {id!r}') - self._id = id - self._isolated = isolated - - def __repr__(self): - data = dict(id=int(self._id), isolated=self._isolated) - kwargs = (f'{k}={v!r}' for k, v in data.items()) - return f'{type(self).__name__}({", ".join(kwargs)})' - - def __hash__(self): - return hash(self._id) - - def __eq__(self, other): - if not isinstance(other, Interpreter): - return NotImplemented - else: - return other._id == self._id - - @property - def id(self): - return self._id - - @property - def isolated(self): - if self._isolated is None: - # XXX The low-level function has not been added yet. - # See bpo-.... - self._isolated = _interpreters.is_isolated(self._id) - return self._isolated - - def is_running(self): - """Return whether or not the identified interpreter is running.""" - return _interpreters.is_running(self._id) - - def close(self): - """Finalize and destroy the interpreter. - - Attempting to destroy the current interpreter results - in a RuntimeError. - """ - return _interpreters.destroy(self._id) - - # XXX Rename "run" to "exec"? - def run(self, src_str, /, channels=None): - """Run the given source code in the interpreter. - - This is essentially the same as calling the builtin "exec" - with this interpreter, using the __dict__ of its __main__ - module as both globals and locals. - - There is no return value. - - If the code raises an unhandled exception then a RunFailedError - is raised, which summarizes the unhandled exception. The actual - exception is discarded because objects cannot be shared between - interpreters. - - This blocks the current Python thread until done. During - that time, the previous interpreter is allowed to run - in other threads. - """ - excinfo = _interpreters.exec(self._id, src_str, channels) - if excinfo is not None: - raise RunFailedError(excinfo) - - -def create_channel(): +def create(): """Return (recv, send) for a new cross-interpreter channel. The channel may be used to pass data safely between interpreters. @@ -139,7 +27,7 @@ def create_channel(): return recv, send -def list_all_channels(): +def list_all(): """Return a list of (recv, send) for all open channels.""" return [(RecvChannel(cid), SendChannel(cid)) for cid in _channels.list_all()] diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py new file mode 100644 index 0000000..ed6b0d5 --- /dev/null +++ b/Lib/test/support/interpreters/queues.py @@ -0,0 +1,156 @@ +"""Cross-interpreter Queues High Level Module.""" + +import queue +import time +import weakref +import _xxinterpchannels as _channels +import _xxinterpchannels as _queues + +# aliases: +from _xxinterpchannels import ( + ChannelError as QueueError, + ChannelNotFoundError as QueueNotFoundError, +) + +__all__ = [ + 'create', 'list_all', + 'Queue', + 'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull', +] + + +def create(maxsize=0): + """Return a new cross-interpreter queue. + + The queue may be used to pass data safely between interpreters. + """ + # XXX honor maxsize + qid = _queues.create() + return Queue._with_maxsize(qid, maxsize) + + +def list_all(): + """Return a list of all open queues.""" + return [Queue(qid) + for qid in _queues.list_all()] + + +class QueueEmpty(queue.Empty): + """Raised from get_nowait() when the queue is empty. + + It is also raised from get() if it times out. + """ + + +class QueueFull(queue.Full): + """Raised from put_nowait() when the queue is full. + + It is also raised from put() if it times out. + """ + + +_known_queues = weakref.WeakValueDictionary() + +class Queue: + """A cross-interpreter queue.""" + + @classmethod + def _with_maxsize(cls, id, maxsize): + if not isinstance(maxsize, int): + raise TypeError(f'maxsize must be an int, got {maxsize!r}') + elif maxsize < 0: + maxsize = 0 + else: + maxsize = int(maxsize) + self = cls(id) + self._maxsize = maxsize + return self + + def __new__(cls, id, /): + # There is only one instance for any given ID. + if isinstance(id, int): + id = _channels._channel_id(id, force=False) + elif not isinstance(id, _channels.ChannelID): + raise TypeError(f'id must be an int, got {id!r}') + key = int(id) + try: + self = _known_queues[key] + except KeyError: + self = super().__new__(cls) + self._id = id + self._maxsize = 0 + _known_queues[key] = self + return self + + def __repr__(self): + return f'{type(self).__name__}({self.id})' + + def __hash__(self): + return hash(self._id) + + @property + def id(self): + return int(self._id) + + @property + def maxsize(self): + return self._maxsize + + @property + def _info(self): + return _channels.get_info(self._id) + + def empty(self): + return self._info.count == 0 + + def full(self): + if self._maxsize <= 0: + return False + return self._info.count >= self._maxsize + + def qsize(self): + return self._info.count + + def put(self, obj, timeout=None): + # XXX block if full + _channels.send(self._id, obj, blocking=False) + + def put_nowait(self, obj): + # XXX raise QueueFull if full + return _channels.send(self._id, obj, blocking=False) + + def get(self, timeout=None, *, + _sentinel=object(), + _delay=10 / 1000, # 10 milliseconds + ): + """Return the next object from the queue. + + This blocks while the queue is empty. + """ + if timeout is not None: + timeout = int(timeout) + if timeout < 0: + raise ValueError(f'timeout value must be non-negative') + end = time.time() + timeout + obj = _channels.recv(self._id, _sentinel) + while obj is _sentinel: + time.sleep(_delay) + if timeout is not None and time.time() >= end: + raise QueueEmpty + obj = _channels.recv(self._id, _sentinel) + return obj + + def get_nowait(self, *, _sentinel=object()): + """Return the next object from the channel. + + If the queue is empty then raise QueueEmpty. Otherwise this + is the same as get(). + """ + obj = _channels.recv(self._id, _sentinel) + if obj is _sentinel: + raise QueueEmpty + return obj + + +# XXX add this: +#_channels._register_queue_type(Queue) |