summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/asyncio/unix_events.py199
-rw-r--r--Lib/test/test_asyncio/test_events.py26
-rw-r--r--Lib/test/test_asyncio/test_subprocess.py30
-rw-r--r--Lib/test/test_asyncio/test_unix_events.py26
-rw-r--r--Lib/test/test_asyncio/utils.py17
5 files changed, 39 insertions, 259 deletions
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index ff2df65..c22d077 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -28,9 +28,6 @@ from .log import logger
__all__ = (
'SelectorEventLoop',
- 'AbstractChildWatcher',
- 'PidfdChildWatcher',
- 'ThreadedChildWatcher',
'DefaultEventLoopPolicy',
'EventLoop',
)
@@ -65,6 +62,10 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
super().__init__(selector)
self._signal_handlers = {}
self._unix_server_sockets = {}
+ if can_use_pidfd():
+ self._watcher = _PidfdChildWatcher()
+ else:
+ self._watcher = _ThreadedChildWatcher()
def close(self):
super().close()
@@ -197,33 +198,22 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
async def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- watcher = events.get_event_loop_policy()._watcher
-
- with watcher:
- if not watcher.is_active():
- # Check early.
- # Raising exception before process creation
- # prevents subprocess execution if the watcher
- # is not ready to handle it.
- raise RuntimeError("asyncio.get_child_watcher() is not activated, "
- "subprocess support is not installed.")
- waiter = self.create_future()
- transp = _UnixSubprocessTransport(self, protocol, args, shell,
- stdin, stdout, stderr, bufsize,
- waiter=waiter, extra=extra,
- **kwargs)
- watcher.add_child_handler(transp.get_pid(),
- self._child_watcher_callback, transp)
- try:
- await waiter
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException:
- transp.close()
- await transp._wait()
- raise
+ watcher = self._watcher
+ waiter = self.create_future()
+ transp = _UnixSubprocessTransport(self, protocol, args, shell,
+ stdin, stdout, stderr, bufsize,
+ waiter=waiter, extra=extra,
+ **kwargs)
+ watcher.add_child_handler(transp.get_pid(),
+ self._child_watcher_callback, transp)
+ try:
+ await waiter
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException:
+ transp.close()
+ await transp._wait()
+ raise
return transp
@@ -865,93 +855,7 @@ class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
stdin_w.close()
-class AbstractChildWatcher:
- """Abstract base class for monitoring child processes.
-
- Objects derived from this class monitor a collection of subprocesses and
- report their termination or interruption by a signal.
-
- New callbacks are registered with .add_child_handler(). Starting a new
- process must be done within a 'with' block to allow the watcher to suspend
- its activity until the new process if fully registered (this is needed to
- prevent a race condition in some implementations).
-
- Example:
- with watcher:
- proc = subprocess.Popen("sleep 1")
- watcher.add_child_handler(proc.pid, callback)
-
- Notes:
- Implementations of this class must be thread-safe.
-
- Since child watcher objects may catch the SIGCHLD signal and call
- waitpid(-1), there should be only one active object per process.
- """
-
- def __init_subclass__(cls) -> None:
- if cls.__module__ != __name__:
- warnings._deprecated("AbstractChildWatcher",
- "{name!r} is deprecated as of Python 3.12 and will be "
- "removed in Python {remove}.",
- remove=(3, 14))
-
- def add_child_handler(self, pid, callback, *args):
- """Register a new child handler.
-
- Arrange for callback(pid, returncode, *args) to be called when
- process 'pid' terminates. Specifying another callback for the same
- process replaces the previous handler.
-
- Note: callback() must be thread-safe.
- """
- raise NotImplementedError()
-
- def remove_child_handler(self, pid):
- """Removes the handler for process 'pid'.
-
- The function returns True if the handler was successfully removed,
- False if there was nothing to remove."""
-
- raise NotImplementedError()
-
- def attach_loop(self, loop):
- """Attach the watcher to an event loop.
-
- If the watcher was previously attached to an event loop, then it is
- first detached before attaching to the new loop.
-
- Note: loop may be None.
- """
- raise NotImplementedError()
-
- def close(self):
- """Close the watcher.
-
- This must be called to make sure that any underlying resource is freed.
- """
- raise NotImplementedError()
-
- def is_active(self):
- """Return ``True`` if the watcher is active and is used by the event loop.
-
- Return True if the watcher is installed and ready to handle process exit
- notifications.
-
- """
- raise NotImplementedError()
-
- def __enter__(self):
- """Enter the watcher's context and allow starting new processes
-
- This function must return self"""
- raise NotImplementedError()
-
- def __exit__(self, a, b, c):
- """Exit the watcher's context"""
- raise NotImplementedError()
-
-
-class PidfdChildWatcher(AbstractChildWatcher):
+class _PidfdChildWatcher:
"""Child watcher implementation using Linux's pid file descriptors.
This child watcher polls process file descriptors (pidfds) to await child
@@ -963,21 +867,9 @@ class PidfdChildWatcher(AbstractChildWatcher):
recent (5.3+) kernels.
"""
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, exc_traceback):
- pass
-
def is_active(self):
return True
- def close(self):
- pass
-
- def attach_loop(self, loop):
- pass
-
def add_child_handler(self, pid, callback, *args):
loop = events.get_running_loop()
pidfd = os.pidfd_open(pid)
@@ -1002,14 +894,7 @@ class PidfdChildWatcher(AbstractChildWatcher):
os.close(pidfd)
callback(pid, returncode, *args)
- def remove_child_handler(self, pid):
- # asyncio never calls remove_child_handler() !!!
- # The method is no-op but is implemented because
- # abstract base classes require it.
- return True
-
-
-class ThreadedChildWatcher(AbstractChildWatcher):
+class _ThreadedChildWatcher:
"""Threaded child watcher implementation.
The watcher uses a thread per process
@@ -1029,15 +914,6 @@ class ThreadedChildWatcher(AbstractChildWatcher):
def is_active(self):
return True
- def close(self):
- pass
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- pass
-
def __del__(self, _warn=warnings.warn):
threads = [thread for thread in list(self._threads.values())
if thread.is_alive()]
@@ -1055,15 +931,6 @@ class ThreadedChildWatcher(AbstractChildWatcher):
self._threads[pid] = thread
thread.start()
- def remove_child_handler(self, pid):
- # asyncio never calls remove_child_handler() !!!
- # The method is no-op but is implemented because
- # abstract base classes require it.
- return True
-
- def attach_loop(self, loop):
- pass
-
def _do_waitpid(self, loop, expected_pid, callback, args):
assert expected_pid > 0
@@ -1103,29 +970,9 @@ def can_use_pidfd():
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
- """UNIX event loop policy with a watcher for child processes."""
+ """UNIX event loop policy"""
_loop_factory = _UnixSelectorEventLoop
- def __init__(self):
- super().__init__()
- if can_use_pidfd():
- self._watcher = PidfdChildWatcher()
- else:
- self._watcher = ThreadedChildWatcher()
-
- def set_event_loop(self, loop):
- """Set the event loop.
-
- As a side effect, if a child watcher was set before, then calling
- .set_event_loop() from the main thread will call .attach_loop(loop) on
- the child watcher.
- """
-
- super().set_event_loop(loop)
-
- if (self._watcher is not None and
- threading.current_thread() is threading.main_thread()):
- self._watcher.attach_loop(loop)
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index 5b660de..34ea02b 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -2209,22 +2209,8 @@ if sys.platform == 'win32':
else:
import selectors
- class UnixEventLoopTestsMixin(EventLoopTestsMixin):
- def setUp(self):
- super().setUp()
- watcher = asyncio.ThreadedChildWatcher()
- watcher.attach_loop(self.loop)
- policy = asyncio.get_event_loop_policy()
- policy._watcher = watcher
-
- def tearDown(self):
- policy = asyncio.get_event_loop_policy()
- policy._watcher = None
- super().tearDown()
-
-
if hasattr(selectors, 'KqueueSelector'):
- class KqueueEventLoopTests(UnixEventLoopTestsMixin,
+ class KqueueEventLoopTests(EventLoopTestsMixin,
SubprocessTestsMixin,
test_utils.TestCase):
@@ -2249,7 +2235,7 @@ else:
super().test_write_pty()
if hasattr(selectors, 'EpollSelector'):
- class EPollEventLoopTests(UnixEventLoopTestsMixin,
+ class EPollEventLoopTests(EventLoopTestsMixin,
SubprocessTestsMixin,
test_utils.TestCase):
@@ -2257,7 +2243,7 @@ else:
return asyncio.SelectorEventLoop(selectors.EpollSelector())
if hasattr(selectors, 'PollSelector'):
- class PollEventLoopTests(UnixEventLoopTestsMixin,
+ class PollEventLoopTests(EventLoopTestsMixin,
SubprocessTestsMixin,
test_utils.TestCase):
@@ -2265,7 +2251,7 @@ else:
return asyncio.SelectorEventLoop(selectors.PollSelector())
# Should always exist.
- class SelectEventLoopTests(UnixEventLoopTestsMixin,
+ class SelectEventLoopTests(EventLoopTestsMixin,
SubprocessTestsMixin,
test_utils.TestCase):
@@ -2830,10 +2816,6 @@ class GetEventLoopTestsMixin:
def tearDown(self):
try:
- if sys.platform != 'win32':
- policy = asyncio.get_event_loop_policy()
- policy._watcher = None
-
super().tearDown()
finally:
self.loop.close()
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
index d7f03e6..23987c7 100644
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -869,31 +869,27 @@ if sys.platform != 'win32':
# Unix
class SubprocessWatcherMixin(SubprocessMixin):
- Watcher = None
-
def setUp(self):
super().setUp()
policy = asyncio.get_event_loop_policy()
self.loop = policy.new_event_loop()
self.set_event_loop(self.loop)
- watcher = self._get_watcher()
- watcher.attach_loop(self.loop)
- policy._watcher = watcher
+ def test_watcher_implementation(self):
+ loop = self.loop
+ watcher = loop._watcher
+ if unix_events.can_use_pidfd():
+ self.assertIsInstance(watcher, unix_events._PidfdChildWatcher)
+ else:
+ self.assertIsInstance(watcher, unix_events._ThreadedChildWatcher)
- def tearDown(self):
- super().tearDown()
- policy = asyncio.get_event_loop_policy()
- watcher = policy._watcher
- policy._watcher = None
- watcher.attach_loop(None)
- watcher.close()
class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
test_utils.TestCase):
-
- def _get_watcher(self):
- return unix_events.ThreadedChildWatcher()
+ def setUp(self):
+ # Force the use of the threaded child watcher
+ unix_events.can_use_pidfd = mock.Mock(return_value=False)
+ super().setUp()
@unittest.skipUnless(
unix_events.can_use_pidfd(),
@@ -902,9 +898,7 @@ if sys.platform != 'win32':
class SubprocessPidfdWatcherTests(SubprocessWatcherMixin,
test_utils.TestCase):
- def _get_watcher(self):
- return unix_events.PidfdChildWatcher()
-
+ pass
else:
# Windows
diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py
index 2ea698f..4966775 100644
--- a/Lib/test/test_asyncio/test_unix_events.py
+++ b/Lib/test/test_asyncio/test_unix_events.py
@@ -1112,32 +1112,6 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
self.assertFalse(self.protocol.connection_lost.called)
-class AbstractChildWatcherTests(unittest.TestCase):
-
- def test_warns_on_subclassing(self):
- with self.assertWarns(DeprecationWarning):
- class MyWatcher(asyncio.AbstractChildWatcher):
- pass
-
- def test_not_implemented(self):
- f = mock.Mock()
- watcher = asyncio.AbstractChildWatcher()
- self.assertRaises(
- NotImplementedError, watcher.add_child_handler, f, f)
- self.assertRaises(
- NotImplementedError, watcher.remove_child_handler, f)
- self.assertRaises(
- NotImplementedError, watcher.attach_loop, f)
- self.assertRaises(
- NotImplementedError, watcher.close)
- self.assertRaises(
- NotImplementedError, watcher.is_active)
- self.assertRaises(
- NotImplementedError, watcher.__enter__)
- self.assertRaises(
- NotImplementedError, watcher.__exit__, f, f, f)
-
-
class TestFunctional(unittest.TestCase):
def setUp(self):
diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py
index 3fe2ecd..dbb8d27 100644
--- a/Lib/test/test_asyncio/utils.py
+++ b/Lib/test/test_asyncio/utils.py
@@ -547,23 +547,6 @@ class TestCase(unittest.TestCase):
loop._default_executor.shutdown(wait=True)
loop.close()
- policy = support.maybe_get_event_loop_policy()
- if policy is not None:
- try:
- watcher = policy._watcher
- except AttributeError:
- # watcher is not implemented by EventLoopPolicy, e.g. Windows
- pass
- else:
- if isinstance(watcher, asyncio.ThreadedChildWatcher):
- # Wait for subprocess to finish, but not forever
- for thread in list(watcher._threads.values()):
- thread.join(timeout=support.SHORT_TIMEOUT)
- if thread.is_alive():
- raise RuntimeError(f"thread {thread} still alive: "
- "subprocess still running")
-
-
def set_event_loop(self, loop, *, cleanup=True):
if loop is None:
raise AssertionError('loop is None')