From bf8cb31803558f1105efb15b0ee4bd184f3218c8 Mon Sep 17 00:00:00 2001 From: "Miss Islington (bot)" <31488909+miss-islington@users.noreply.github.com> Date: Sun, 30 Jun 2019 03:22:34 -0700 Subject: bpo-35621: Support running subprocesses in asyncio when loop is executed in non-main thread (GH-14344) (cherry picked from commit 0d671c04c39b52e44597491b893eb0b6c86b3d45) Co-authored-by: Andrew Svetlov --- Doc/library/asyncio-policy.rst | 62 ++++- Doc/library/asyncio-subprocess.rst | 26 +- Lib/asyncio/unix_events.py | 273 ++++++++++++++++++--- Lib/test/test_asyncio/test_subprocess.py | 40 ++- Lib/test/test_asyncio/test_unix_events.py | 34 +-- Lib/test/test_asyncio/utils.py | 13 + .../2019-05-28-19-03-46.bpo-35621.Abc1lf.rst | 2 + 7 files changed, 378 insertions(+), 72 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst diff --git a/Doc/library/asyncio-policy.rst b/Doc/library/asyncio-policy.rst index 6212df8..aa8f8f1 100644 --- a/Doc/library/asyncio-policy.rst +++ b/Doc/library/asyncio-policy.rst @@ -117,6 +117,7 @@ asyncio ships with the following built-in policies: .. availability:: Windows. +.. _asyncio-watchers: Process Watchers ================ @@ -129,10 +130,11 @@ In asyncio, child processes are created with :func:`create_subprocess_exec` and :meth:`loop.subprocess_exec` functions. -asyncio defines the :class:`AbstractChildWatcher` abstract base class, -which child watchers should implement, and has two different -implementations: :class:`SafeChildWatcher` (configured to be used -by default) and :class:`FastChildWatcher`. +asyncio defines the :class:`AbstractChildWatcher` abstract base class, which child +watchers should implement, and has four different implementations: +:class:`ThreadedChildWatcher` (configured to be used by default), +:class:`MultiLoopChildWatcher`, :class:`SafeChildWatcher`, and +:class:`FastChildWatcher`. See also the :ref:`Subprocess and Threads ` section. @@ -184,6 +186,15 @@ implementation used by the asyncio event loop: Note: loop may be ``None``. + .. method:: is_active() + + Return ``True`` if the watcher is ready to use. + + Spawning a subprocess with *inactive* current child watcher raises + :exc:`RuntimeError`. + + .. versionadded:: 3.8 + .. method:: close() Close the watcher. @@ -191,16 +202,48 @@ implementation used by the asyncio event loop: This method has to be called to ensure that underlying resources are cleaned-up. -.. class:: SafeChildWatcher +.. class:: ThreadedChildWatcher + + This implementation starts a new waiting thread for every subprocess spawn. + + It works reliably even when the asyncio event loop is run in a non-main OS thread. + + There is no noticeable overhead when handling a big number of children (*O(1)* each + time a child terminates), but stating a thread per process requires extra memory. + + This watcher is used by default. + + .. versionadded:: 3.8 - This implementation avoids disrupting other code spawning processes +.. class:: MultiLoopChildWatcher + + This implementation registers a :py:data:`SIGCHLD` signal handler on + instantiation. That can break third-party code that installs a custom handler for + `SIGCHLD`. signal). + + The watcher avoids disrupting other code spawning processes by polling every process explicitly on a :py:data:`SIGCHLD` signal. - This is a safe solution but it has a significant overhead when + There is no limitation for running subprocesses from different threads once the + watcher is installed. + + The solution is safe but it has a significant overhead when handling a big number of processes (*O(n)* each time a :py:data:`SIGCHLD` is received). - asyncio uses this safe implementation by default. + .. versionadded:: 3.8 + +.. class:: SafeChildWatcher + + This implementation uses active event loop from the main thread to handle + :py:data:`SIGCHLD` signal. If the main thread has no running event loop another + thread cannot spawn a subprocess (:exc:`RuntimeError` is raised). + + The watcher avoids disrupting other code spawning processes + by polling every process explicitly on a :py:data:`SIGCHLD` signal. + + This solution is as safe as :class:`MultiLoopChildWatcher` and has the same *O(N)* + complexity but requires a running event loop in the main thread to work. .. class:: FastChildWatcher @@ -211,6 +254,9 @@ implementation used by the asyncio event loop: There is no noticeable overhead when handling a big number of children (*O(1)* each time a child terminates). + This solution requires a running event loop in the main thread to work, as + :class:`SafeChildWatcher`. + Custom Policies =============== diff --git a/Doc/library/asyncio-subprocess.rst b/Doc/library/asyncio-subprocess.rst index 00dc66c..444fb63 100644 --- a/Doc/library/asyncio-subprocess.rst +++ b/Doc/library/asyncio-subprocess.rst @@ -293,18 +293,26 @@ their completion. Subprocess and Threads ---------------------- -Standard asyncio event loop supports running subprocesses from -different threads, but there are limitations: +Standard asyncio event loop supports running subprocesses from different threads by +default. -* An event loop must run in the main thread. +On Windows subprocesses are provided by :class:`ProactorEventLoop` only (default), +:class:`SelectorEventLoop` has no subprocess support. -* The child watcher must be instantiated in the main thread - before executing subprocesses from other threads. Call the - :func:`get_child_watcher` function in the main thread to instantiate - the child watcher. +On UNIX *child watchers* are used for subprocess finish waiting, see +:ref:`asyncio-watchers` for more info. -Note that alternative event loop implementations might not share -the above limitations; please refer to their documentation. + +.. versionchanged:: 3.8 + + UNIX switched to use :class:`ThreadedChildWatcher` for spawning subprocesses from + different threads without any limitation. + + Spawning a subprocess with *inactive* current child watcher raises + :exc:`RuntimeError`. + +Note that alternative event loop implementations might have own limitations; +please refer to their documentation. .. seealso:: diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 28128d2..d7a4af8 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -2,6 +2,7 @@ import errno import io +import itertools import os import selectors import signal @@ -12,7 +13,6 @@ import sys import threading import warnings - from . import base_events from . import base_subprocess from . import constants @@ -29,7 +29,9 @@ from .log import logger __all__ = ( 'SelectorEventLoop', 'AbstractChildWatcher', 'SafeChildWatcher', - 'FastChildWatcher', 'DefaultEventLoopPolicy', + 'FastChildWatcher', + 'MultiLoopChildWatcher', 'ThreadedChildWatcher', + 'DefaultEventLoopPolicy', ) @@ -184,6 +186,13 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): stdin, stdout, stderr, bufsize, extra=None, **kwargs): with events.get_child_watcher() as watcher: + if not watcher.is_active(): + # Check early. + # Raising exception before process creation + # prevents subprocess execution if the watcher + # is not ready to handle it. + raise RuntimeError("asyncio.get_child_watcher() is not activated, " + "subprocess support is not installed.") waiter = self.create_future() transp = _UnixSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, @@ -838,6 +847,15 @@ class AbstractChildWatcher: """ raise NotImplementedError() + def is_active(self): + """Return ``True`` if the watcher is active and is used by the event loop. + + Return True if the watcher is installed and ready to handle process exit + notifications. + + """ + raise NotImplementedError() + def __enter__(self): """Enter the watcher's context and allow starting new processes @@ -849,6 +867,20 @@ class AbstractChildWatcher: raise NotImplementedError() +def _compute_returncode(status): + if os.WIFSIGNALED(status): + # The child process died because of a signal. + return -os.WTERMSIG(status) + elif os.WIFEXITED(status): + # The child process exited (e.g sys.exit()). + return os.WEXITSTATUS(status) + else: + # The child exited, but we don't understand its status. + # This shouldn't happen, but if it does, let's just + # return that status; perhaps that helps debug it. + return status + + class BaseChildWatcher(AbstractChildWatcher): def __init__(self): @@ -858,6 +890,9 @@ class BaseChildWatcher(AbstractChildWatcher): def close(self): self.attach_loop(None) + def is_active(self): + return self._loop is not None and self._loop.is_running() + def _do_waitpid(self, expected_pid): raise NotImplementedError() @@ -898,19 +933,6 @@ class BaseChildWatcher(AbstractChildWatcher): 'exception': exc, }) - def _compute_returncode(self, status): - if os.WIFSIGNALED(status): - # The child process died because of a signal. - return -os.WTERMSIG(status) - elif os.WIFEXITED(status): - # The child process exited (e.g sys.exit()). - return os.WEXITSTATUS(status) - else: - # The child exited, but we don't understand its status. - # This shouldn't happen, but if it does, let's just - # return that status; perhaps that helps debug it. - return status - class SafeChildWatcher(BaseChildWatcher): """'Safe' child watcher implementation. @@ -934,11 +956,6 @@ class SafeChildWatcher(BaseChildWatcher): pass def add_child_handler(self, pid, callback, *args): - if self._loop is None: - raise RuntimeError( - "Cannot add child handler, " - "the child watcher does not have a loop attached") - self._callbacks[pid] = (callback, args) # Prevent a race condition in case the child is already terminated. @@ -974,7 +991,7 @@ class SafeChildWatcher(BaseChildWatcher): # The child process is still alive. return - returncode = self._compute_returncode(status) + returncode = _compute_returncode(status) if self._loop.get_debug(): logger.debug('process %s exited with returncode %s', expected_pid, returncode) @@ -1035,11 +1052,6 @@ class FastChildWatcher(BaseChildWatcher): def add_child_handler(self, pid, callback, *args): assert self._forks, "Must use the context manager" - if self._loop is None: - raise RuntimeError( - "Cannot add child handler, " - "the child watcher does not have a loop attached") - with self._lock: try: returncode = self._zombies.pop(pid) @@ -1072,7 +1084,7 @@ class FastChildWatcher(BaseChildWatcher): # A child process is still alive. return - returncode = self._compute_returncode(status) + returncode = _compute_returncode(status) with self._lock: try: @@ -1101,6 +1113,209 @@ class FastChildWatcher(BaseChildWatcher): callback(pid, returncode, *args) +class MultiLoopChildWatcher(AbstractChildWatcher): + """A watcher that doesn't require running loop in the main thread. + + This implementation registers a SIGCHLD signal handler on + instantiation (which may conflict with other code that + install own handler for this signal). + + The solution is safe but it has a significant overhead when + handling a big number of processes (*O(n)* each time a + SIGCHLD is received). + """ + + # Implementation note: + # The class keeps compatibility with AbstractChildWatcher ABC + # To achieve this it has empty attach_loop() method + # and doesn't accept explicit loop argument + # for add_child_handler()/remove_child_handler() + # but retrieves the current loop by get_running_loop() + + def __init__(self): + self._callbacks = {} + self._saved_sighandler = None + + def is_active(self): + return self._saved_sighandler is not None + + def close(self): + self._callbacks.clear() + if self._saved_sighandler is not None: + handler = signal.getsignal(signal.SIGCHLD) + if handler != self._sig_chld: + logger.warning("SIGCHLD handler was changed by outside code") + else: + signal.signal(signal.SIGCHLD, self._saved_sighandler) + self._saved_sighandler = None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def add_child_handler(self, pid, callback, *args): + loop = events.get_running_loop() + self._callbacks[pid] = (loop, callback, args) + + # Prevent a race condition in case the child is already terminated. + self._do_waitpid(pid) + + def remove_child_handler(self, pid): + try: + del self._callbacks[pid] + return True + except KeyError: + return False + + def attach_loop(self, loop): + # Don't save the loop but initialize itself if called first time + # The reason to do it here is that attach_loop() is called from + # unix policy only for the main thread. + # Main thread is required for subscription on SIGCHLD signal + if self._saved_sighandler is None: + self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld) + if self._saved_sighandler is None: + logger.warning("Previous SIGCHLD handler was set by non-Python code, " + "restore to default handler on watcher close.") + self._saved_sighandler = signal.SIG_DFL + + # Set SA_RESTART to limit EINTR occurrences. + signal.siginterrupt(signal.SIGCHLD, False) + + def _do_waitpid_all(self): + for pid in list(self._callbacks): + self._do_waitpid(pid) + + def _do_waitpid(self, expected_pid): + assert expected_pid > 0 + + try: + pid, status = os.waitpid(expected_pid, os.WNOHANG) + except ChildProcessError: + # The child process is already reaped + # (may happen if waitpid() is called elsewhere). + pid = expected_pid + returncode = 255 + logger.warning( + "Unknown child process pid %d, will report returncode 255", + pid) + debug_log = False + else: + if pid == 0: + # The child process is still alive. + return + + returncode = _compute_returncode(status) + debug_log = True + try: + loop, callback, args = self._callbacks.pop(pid) + except KeyError: # pragma: no cover + # May happen if .remove_child_handler() is called + # after os.waitpid() returns. + logger.warning("Child watcher got an unexpected pid: %r", + pid, exc_info=True) + else: + if loop.is_closed(): + logger.warning("Loop %r that handles pid %r is closed", loop, pid) + else: + if debug_log and loop.get_debug(): + logger.debug('process %s exited with returncode %s', + expected_pid, returncode) + loop.call_soon_threadsafe(callback, pid, returncode, *args) + + def _sig_chld(self, signum, frame): + try: + self._do_waitpid_all() + except (SystemExit, KeyboardInterrupt): + raise + except BaseException: + logger.warning('Unknown exception in SIGCHLD handler', exc_info=True) + + +class ThreadedChildWatcher(AbstractChildWatcher): + """Threaded child watcher implementation. + + The watcher uses a thread per process + for waiting for the process finish. + + It doesn't require subscription on POSIX signal + but a thread creation is not free. + + The watcher has O(1) complexity, its perfomance doesn't depend + on amount of spawn processes. + """ + + def __init__(self): + self._pid_counter = itertools.count(0) + self._threads = {} + + def is_active(self): + return True + + def close(self): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def __del__(self, _warn=warnings.warn): + threads = [thread for thread in list(self._threads.values()) + if thread.is_alive()] + if threads: + _warn(f"{self.__class__} has registered but not finished child processes", + ResourceWarning, + source=self) + + def add_child_handler(self, pid, callback, *args): + loop = events.get_running_loop() + thread = threading.Thread(target=self._do_waitpid, + name=f"waitpid-{next(self._pid_counter)}", + args=(loop, pid, callback, args), + daemon=True) + self._threads[pid] = thread + thread.start() + + def remove_child_handler(self, pid): + # asyncio never calls remove_child_handler() !!! + # The method is no-op but is implemented because + # abstract base classe requires it + return True + + def attach_loop(self, loop): + pass + + def _do_waitpid(self, loop, expected_pid, callback, args): + assert expected_pid > 0 + + try: + pid, status = os.waitpid(expected_pid, 0) + except ChildProcessError: + # The child process is already reaped + # (may happen if waitpid() is called elsewhere). + pid = expected_pid + returncode = 255 + logger.warning( + "Unknown child process pid %d, will report returncode 255", + pid) + else: + returncode = _compute_returncode(status) + if loop.get_debug(): + logger.debug('process %s exited with returncode %s', + expected_pid, returncode) + + if loop.is_closed(): + logger.warning("Loop %r that handles pid %r is closed", loop, pid) + else: + loop.call_soon_threadsafe(callback, pid, returncode, *args) + + self._threads.pop(expected_pid) + + class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): """UNIX event loop policy with a watcher for child processes.""" _loop_factory = _UnixSelectorEventLoop @@ -1112,7 +1327,7 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): def _init_watcher(self): with events._lock: if self._watcher is None: # pragma: no branch - self._watcher = SafeChildWatcher() + self._watcher = ThreadedChildWatcher() if isinstance(threading.current_thread(), threading._MainThread): self._watcher.attach_loop(self._local._loop) @@ -1134,7 +1349,7 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): def get_child_watcher(self): """Get the watcher for child processes. - If not yet set, a SafeChildWatcher object is automatically created. + If not yet set, a ThreadedChildWatcher object is automatically created. """ if self._watcher is None: self._init_watcher() diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index e9a9e50..b9578b2 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -633,6 +633,7 @@ class SubprocessMixin: self.assertIsNone(self.loop.run_until_complete(execute())) + if sys.platform != 'win32': # Unix class SubprocessWatcherMixin(SubprocessMixin): @@ -648,7 +649,24 @@ if sys.platform != 'win32': watcher = self.Watcher() watcher.attach_loop(self.loop) policy.set_child_watcher(watcher) - self.addCleanup(policy.set_child_watcher, None) + + def tearDown(self): + super().tearDown() + policy = asyncio.get_event_loop_policy() + watcher = policy.get_child_watcher() + policy.set_child_watcher(None) + watcher.attach_loop(None) + watcher.close() + + class SubprocessThreadedWatcherTests(SubprocessWatcherMixin, + test_utils.TestCase): + + Watcher = unix_events.ThreadedChildWatcher + + class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin, + test_utils.TestCase): + + Watcher = unix_events.MultiLoopChildWatcher class SubprocessSafeWatcherTests(SubprocessWatcherMixin, test_utils.TestCase): @@ -670,5 +688,25 @@ else: self.set_event_loop(self.loop) +class GenericWatcherTests: + + def test_create_subprocess_fails_with_inactive_watcher(self): + + async def execute(): + watcher = mock.create_authspec(asyncio.AbstractChildWatcher) + watcher.is_active.return_value = False + asyncio.set_child_watcher(watcher) + + with self.assertRaises(RuntimeError): + await subprocess.create_subprocess_exec( + support.FakePath(sys.executable), '-c', 'pass') + + watcher.add_child_handler.assert_not_called() + + self.assertIsNone(self.loop.run_until_complete(execute())) + + + + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 5c610cd..462a8b3 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -1083,6 +1083,8 @@ class AbstractChildWatcherTests(unittest.TestCase): self.assertRaises( NotImplementedError, watcher.close) self.assertRaises( + NotImplementedError, watcher.is_active) + self.assertRaises( NotImplementedError, watcher.__enter__) self.assertRaises( NotImplementedError, watcher.__exit__, f, f, f) @@ -1784,15 +1786,6 @@ class ChildWatcherTestsMixin: if isinstance(self.watcher, asyncio.FastChildWatcher): self.assertFalse(self.watcher._zombies) - @waitpid_mocks - def test_add_child_handler_with_no_loop_attached(self, m): - callback = mock.Mock() - with self.create_watcher() as watcher: - with self.assertRaisesRegex( - RuntimeError, - 'the child watcher does not have a loop attached'): - watcher.add_child_handler(100, callback) - class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase): def create_watcher(self): @@ -1809,17 +1802,16 @@ class PolicyTests(unittest.TestCase): def create_policy(self): return asyncio.DefaultEventLoopPolicy() - def test_get_child_watcher(self): + def test_get_default_child_watcher(self): policy = self.create_policy() self.assertIsNone(policy._watcher) watcher = policy.get_child_watcher() - self.assertIsInstance(watcher, asyncio.SafeChildWatcher) + self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher) self.assertIs(policy._watcher, watcher) self.assertIs(watcher, policy.get_child_watcher()) - self.assertIsNone(watcher._loop) def test_get_child_watcher_after_set(self): policy = self.create_policy() @@ -1829,18 +1821,6 @@ class PolicyTests(unittest.TestCase): self.assertIs(policy._watcher, watcher) self.assertIs(watcher, policy.get_child_watcher()) - def test_get_child_watcher_with_mainloop_existing(self): - policy = self.create_policy() - loop = policy.get_event_loop() - - self.assertIsNone(policy._watcher) - watcher = policy.get_child_watcher() - - self.assertIsInstance(watcher, asyncio.SafeChildWatcher) - self.assertIs(watcher._loop, loop) - - loop.close() - def test_get_child_watcher_thread(self): def f(): @@ -1866,7 +1846,11 @@ class PolicyTests(unittest.TestCase): policy = self.create_policy() loop = policy.get_event_loop() - watcher = policy.get_child_watcher() + # Explicitly setup SafeChildWatcher, + # default ThreadedChildWatcher has no _loop property + watcher = asyncio.SafeChildWatcher() + policy.set_child_watcher(watcher) + watcher.attach_loop(loop) self.assertIs(watcher._loop, loop) diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py index cb373d5..5b4bb12 100644 --- a/Lib/test/test_asyncio/utils.py +++ b/Lib/test/test_asyncio/utils.py @@ -1,5 +1,6 @@ """Utilities shared by tests.""" +import asyncio import collections import contextlib import io @@ -512,6 +513,18 @@ class TestCase(unittest.TestCase): if executor is not None: executor.shutdown(wait=True) loop.close() + policy = support.maybe_get_event_loop_policy() + if policy is not None: + try: + watcher = policy.get_child_watcher() + except NotImplementedError: + # watcher is not implemented by EventLoopPolicy, e.g. Windows + pass + else: + if isinstance(watcher, asyncio.ThreadedChildWatcher): + threads = list(watcher._threads.values()) + for thread in threads: + thread.join() def set_event_loop(self, loop, *, cleanup=True): assert loop is not None diff --git a/Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst b/Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst new file mode 100644 index 0000000..c492e1d --- /dev/null +++ b/Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst @@ -0,0 +1,2 @@ +Support running asyncio subprocesses when execution event loop in a thread +on UNIX. -- cgit v0.12