summaryrefslogtreecommitdiffstats
path: root/Lib/test/support
diff options
context:
space:
mode:
authorEric Snow <ericsnowcurrently@gmail.com>2023-12-12 15:24:31 (GMT)
committerGitHub <noreply@github.com>2023-12-12 15:24:31 (GMT)
commit86a77f4e1a5ceaff1036b0072521e12752b5df47 (patch)
treececc78dab93112a3a92ae66fc0156630408063b3 /Lib/test/support
parentf26bfe4b25f7e5a4f68fcac26207b7175abad208 (diff)
downloadcpython-86a77f4e1a5ceaff1036b0072521e12752b5df47.zip
cpython-86a77f4e1a5ceaff1036b0072521e12752b5df47.tar.gz
cpython-86a77f4e1a5ceaff1036b0072521e12752b5df47.tar.bz2
gh-76785: Fixes for test.support.interpreters (gh-112982)
This involves a number of changes for PEP 734.
Diffstat (limited to 'Lib/test/support')
-rw-r--r--Lib/test/support/interpreters/__init__.py160
-rw-r--r--Lib/test/support/interpreters/channels.py (renamed from Lib/test/support/interpreters.py)124
-rw-r--r--Lib/test/support/interpreters/queues.py156
3 files changed, 322 insertions, 118 deletions
diff --git a/Lib/test/support/interpreters/__init__.py b/Lib/test/support/interpreters/__init__.py
new file mode 100644
index 0000000..2d6376d
--- /dev/null
+++ b/Lib/test/support/interpreters/__init__.py
@@ -0,0 +1,160 @@
+"""Subinterpreters High Level Module."""
+
+import threading
+import weakref
+import _xxsubinterpreters as _interpreters
+
+# aliases:
+from _xxsubinterpreters import (
+ InterpreterError, InterpreterNotFoundError,
+ is_shareable,
+)
+
+
+__all__ = [
+ 'get_current', 'get_main', 'create', 'list_all', 'is_shareable',
+ 'Interpreter',
+ 'InterpreterError', 'InterpreterNotFoundError', 'ExecFailure',
+ 'create_queue', 'Queue', 'QueueEmpty', 'QueueFull',
+]
+
+
+_queuemod = None
+
+def __getattr__(name):
+ if name in ('Queue', 'QueueEmpty', 'QueueFull', 'create_queue'):
+ global create_queue, Queue, QueueEmpty, QueueFull
+ ns = globals()
+ from .queues import (
+ create as create_queue,
+ Queue, QueueEmpty, QueueFull,
+ )
+ return ns[name]
+ else:
+ raise AttributeError(name)
+
+
+class ExecFailure(RuntimeError):
+
+ def __init__(self, excinfo):
+ msg = excinfo.formatted
+ if not msg:
+ if excinfo.type and snapshot.msg:
+ msg = f'{snapshot.type.__name__}: {snapshot.msg}'
+ else:
+ msg = snapshot.type.__name__ or snapshot.msg
+ super().__init__(msg)
+ self.snapshot = excinfo
+
+
+def create():
+ """Return a new (idle) Python interpreter."""
+ id = _interpreters.create(isolated=True)
+ return Interpreter(id)
+
+
+def list_all():
+ """Return all existing interpreters."""
+ return [Interpreter(id) for id in _interpreters.list_all()]
+
+
+def get_current():
+ """Return the currently running interpreter."""
+ id = _interpreters.get_current()
+ return Interpreter(id)
+
+
+def get_main():
+ """Return the main interpreter."""
+ id = _interpreters.get_main()
+ return Interpreter(id)
+
+
+_known = weakref.WeakValueDictionary()
+
+class Interpreter:
+ """A single Python interpreter."""
+
+ def __new__(cls, id, /):
+ # There is only one instance for any given ID.
+ if not isinstance(id, int):
+ raise TypeError(f'id must be an int, got {id!r}')
+ id = int(id)
+ try:
+ self = _known[id]
+ assert hasattr(self, '_ownsref')
+ except KeyError:
+ # This may raise InterpreterNotFoundError:
+ _interpreters._incref(id)
+ try:
+ self = super().__new__(cls)
+ self._id = id
+ self._ownsref = True
+ except BaseException:
+ _interpreters._deccref(id)
+ raise
+ _known[id] = self
+ return self
+
+ def __repr__(self):
+ return f'{type(self).__name__}({self.id})'
+
+ def __hash__(self):
+ return hash(self._id)
+
+ def __del__(self):
+ self._decref()
+
+ def _decref(self):
+ if not self._ownsref:
+ return
+ self._ownsref = False
+ try:
+ _interpreters._decref(self.id)
+ except InterpreterNotFoundError:
+ pass
+
+ @property
+ def id(self):
+ return self._id
+
+ def is_running(self):
+ """Return whether or not the identified interpreter is running."""
+ return _interpreters.is_running(self._id)
+
+ def close(self):
+ """Finalize and destroy the interpreter.
+
+ Attempting to destroy the current interpreter results
+ in a RuntimeError.
+ """
+ return _interpreters.destroy(self._id)
+
+ def exec_sync(self, code, /, channels=None):
+ """Run the given source code in the interpreter.
+
+ This is essentially the same as calling the builtin "exec"
+ with this interpreter, using the __dict__ of its __main__
+ module as both globals and locals.
+
+ There is no return value.
+
+ If the code raises an unhandled exception then an ExecFailure
+ is raised, which summarizes the unhandled exception. The actual
+ exception is discarded because objects cannot be shared between
+ interpreters.
+
+ This blocks the current Python thread until done. During
+ that time, the previous interpreter is allowed to run
+ in other threads.
+ """
+ excinfo = _interpreters.exec(self._id, code, channels)
+ if excinfo is not None:
+ raise ExecFailure(excinfo)
+
+ def run(self, code, /, channels=None):
+ def task():
+ self.exec_sync(code, channels=channels)
+ t = threading.Thread(target=task)
+ t.start()
+ return t
diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters/channels.py
index 089fe7e..75a5a60 100644
--- a/Lib/test/support/interpreters.py
+++ b/Lib/test/support/interpreters/channels.py
@@ -1,11 +1,9 @@
-"""Subinterpreters High Level Module."""
+"""Cross-interpreter Channels High Level Module."""
import time
-import _xxsubinterpreters as _interpreters
import _xxinterpchannels as _channels
# aliases:
-from _xxsubinterpreters import is_shareable
from _xxinterpchannels import (
ChannelError, ChannelNotFoundError, ChannelClosedError,
ChannelEmptyError, ChannelNotEmptyError,
@@ -13,123 +11,13 @@ from _xxinterpchannels import (
__all__ = [
- 'Interpreter', 'get_current', 'get_main', 'create', 'list_all',
- 'RunFailedError',
+ 'create', 'list_all',
'SendChannel', 'RecvChannel',
- 'create_channel', 'list_all_channels', 'is_shareable',
- 'ChannelError', 'ChannelNotFoundError',
- 'ChannelEmptyError',
- ]
+ 'ChannelError', 'ChannelNotFoundError', 'ChannelEmptyError',
+]
-class RunFailedError(RuntimeError):
-
- def __init__(self, excinfo):
- msg = excinfo.formatted
- if not msg:
- if excinfo.type and snapshot.msg:
- msg = f'{snapshot.type.__name__}: {snapshot.msg}'
- else:
- msg = snapshot.type.__name__ or snapshot.msg
- super().__init__(msg)
- self.snapshot = excinfo
-
-
-def create(*, isolated=True):
- """Return a new (idle) Python interpreter."""
- id = _interpreters.create(isolated=isolated)
- return Interpreter(id, isolated=isolated)
-
-
-def list_all():
- """Return all existing interpreters."""
- return [Interpreter(id) for id in _interpreters.list_all()]
-
-
-def get_current():
- """Return the currently running interpreter."""
- id = _interpreters.get_current()
- return Interpreter(id)
-
-
-def get_main():
- """Return the main interpreter."""
- id = _interpreters.get_main()
- return Interpreter(id)
-
-
-class Interpreter:
- """A single Python interpreter."""
-
- def __init__(self, id, *, isolated=None):
- if not isinstance(id, (int, _interpreters.InterpreterID)):
- raise TypeError(f'id must be an int, got {id!r}')
- self._id = id
- self._isolated = isolated
-
- def __repr__(self):
- data = dict(id=int(self._id), isolated=self._isolated)
- kwargs = (f'{k}={v!r}' for k, v in data.items())
- return f'{type(self).__name__}({", ".join(kwargs)})'
-
- def __hash__(self):
- return hash(self._id)
-
- def __eq__(self, other):
- if not isinstance(other, Interpreter):
- return NotImplemented
- else:
- return other._id == self._id
-
- @property
- def id(self):
- return self._id
-
- @property
- def isolated(self):
- if self._isolated is None:
- # XXX The low-level function has not been added yet.
- # See bpo-....
- self._isolated = _interpreters.is_isolated(self._id)
- return self._isolated
-
- def is_running(self):
- """Return whether or not the identified interpreter is running."""
- return _interpreters.is_running(self._id)
-
- def close(self):
- """Finalize and destroy the interpreter.
-
- Attempting to destroy the current interpreter results
- in a RuntimeError.
- """
- return _interpreters.destroy(self._id)
-
- # XXX Rename "run" to "exec"?
- def run(self, src_str, /, channels=None):
- """Run the given source code in the interpreter.
-
- This is essentially the same as calling the builtin "exec"
- with this interpreter, using the __dict__ of its __main__
- module as both globals and locals.
-
- There is no return value.
-
- If the code raises an unhandled exception then a RunFailedError
- is raised, which summarizes the unhandled exception. The actual
- exception is discarded because objects cannot be shared between
- interpreters.
-
- This blocks the current Python thread until done. During
- that time, the previous interpreter is allowed to run
- in other threads.
- """
- excinfo = _interpreters.exec(self._id, src_str, channels)
- if excinfo is not None:
- raise RunFailedError(excinfo)
-
-
-def create_channel():
+def create():
"""Return (recv, send) for a new cross-interpreter channel.
The channel may be used to pass data safely between interpreters.
@@ -139,7 +27,7 @@ def create_channel():
return recv, send
-def list_all_channels():
+def list_all():
"""Return a list of (recv, send) for all open channels."""
return [(RecvChannel(cid), SendChannel(cid))
for cid in _channels.list_all()]
diff --git a/Lib/test/support/interpreters/queues.py b/Lib/test/support/interpreters/queues.py
new file mode 100644
index 0000000..ed6b0d5
--- /dev/null
+++ b/Lib/test/support/interpreters/queues.py
@@ -0,0 +1,156 @@
+"""Cross-interpreter Queues High Level Module."""
+
+import queue
+import time
+import weakref
+import _xxinterpchannels as _channels
+import _xxinterpchannels as _queues
+
+# aliases:
+from _xxinterpchannels import (
+ ChannelError as QueueError,
+ ChannelNotFoundError as QueueNotFoundError,
+)
+
+__all__ = [
+ 'create', 'list_all',
+ 'Queue',
+ 'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull',
+]
+
+
+def create(maxsize=0):
+ """Return a new cross-interpreter queue.
+
+ The queue may be used to pass data safely between interpreters.
+ """
+ # XXX honor maxsize
+ qid = _queues.create()
+ return Queue._with_maxsize(qid, maxsize)
+
+
+def list_all():
+ """Return a list of all open queues."""
+ return [Queue(qid)
+ for qid in _queues.list_all()]
+
+
+class QueueEmpty(queue.Empty):
+ """Raised from get_nowait() when the queue is empty.
+
+ It is also raised from get() if it times out.
+ """
+
+
+class QueueFull(queue.Full):
+ """Raised from put_nowait() when the queue is full.
+
+ It is also raised from put() if it times out.
+ """
+
+
+_known_queues = weakref.WeakValueDictionary()
+
+class Queue:
+ """A cross-interpreter queue."""
+
+ @classmethod
+ def _with_maxsize(cls, id, maxsize):
+ if not isinstance(maxsize, int):
+ raise TypeError(f'maxsize must be an int, got {maxsize!r}')
+ elif maxsize < 0:
+ maxsize = 0
+ else:
+ maxsize = int(maxsize)
+ self = cls(id)
+ self._maxsize = maxsize
+ return self
+
+ def __new__(cls, id, /):
+ # There is only one instance for any given ID.
+ if isinstance(id, int):
+ id = _channels._channel_id(id, force=False)
+ elif not isinstance(id, _channels.ChannelID):
+ raise TypeError(f'id must be an int, got {id!r}')
+ key = int(id)
+ try:
+ self = _known_queues[key]
+ except KeyError:
+ self = super().__new__(cls)
+ self._id = id
+ self._maxsize = 0
+ _known_queues[key] = self
+ return self
+
+ def __repr__(self):
+ return f'{type(self).__name__}({self.id})'
+
+ def __hash__(self):
+ return hash(self._id)
+
+ @property
+ def id(self):
+ return int(self._id)
+
+ @property
+ def maxsize(self):
+ return self._maxsize
+
+ @property
+ def _info(self):
+ return _channels.get_info(self._id)
+
+ def empty(self):
+ return self._info.count == 0
+
+ def full(self):
+ if self._maxsize <= 0:
+ return False
+ return self._info.count >= self._maxsize
+
+ def qsize(self):
+ return self._info.count
+
+ def put(self, obj, timeout=None):
+ # XXX block if full
+ _channels.send(self._id, obj, blocking=False)
+
+ def put_nowait(self, obj):
+ # XXX raise QueueFull if full
+ return _channels.send(self._id, obj, blocking=False)
+
+ def get(self, timeout=None, *,
+ _sentinel=object(),
+ _delay=10 / 1000, # 10 milliseconds
+ ):
+ """Return the next object from the queue.
+
+ This blocks while the queue is empty.
+ """
+ if timeout is not None:
+ timeout = int(timeout)
+ if timeout < 0:
+ raise ValueError(f'timeout value must be non-negative')
+ end = time.time() + timeout
+ obj = _channels.recv(self._id, _sentinel)
+ while obj is _sentinel:
+ time.sleep(_delay)
+ if timeout is not None and time.time() >= end:
+ raise QueueEmpty
+ obj = _channels.recv(self._id, _sentinel)
+ return obj
+
+ def get_nowait(self, *, _sentinel=object()):
+ """Return the next object from the channel.
+
+ If the queue is empty then raise QueueEmpty. Otherwise this
+ is the same as get().
+ """
+ obj = _channels.recv(self._id, _sentinel)
+ if obj is _sentinel:
+ raise QueueEmpty
+ return obj
+
+
+# XXX add this:
+#_channels._register_queue_type(Queue)