diff options
author | Andrew Svetlov <andrew.svetlov@gmail.com> | 2019-06-04 00:09:19 (GMT) |
---|---|---|
committer | Miss Islington (bot) <31488909+miss-islington@users.noreply.github.com> | 2019-06-04 00:09:19 (GMT) |
commit | 9535aff9421f0a5639f6e4c4bb0f07a743ea8dba (patch) | |
tree | f4488fd169ff1bbc62e47eaef6e0930af6be2852 /Lib | |
parent | eddef861b49f1635222a9e1771231c34a807debf (diff) | |
download | cpython-9535aff9421f0a5639f6e4c4bb0f07a743ea8dba.zip cpython-9535aff9421f0a5639f6e4c4bb0f07a743ea8dba.tar.gz cpython-9535aff9421f0a5639f6e4c4bb0f07a743ea8dba.tar.bz2 |
Revert "bpo-35621: Support running subprocesses in asyncio when loop is executed in non-main thread (#13630)" (GH-13793)
https://bugs.python.org/issue35621
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/asyncio/unix_events.py | 240 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_subprocess.py | 40 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_unix_events.py | 34 |
3 files changed, 54 insertions, 260 deletions
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index b943845..28128d2 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -2,7 +2,6 @@ import errno import io -import itertools import os import selectors import signal @@ -30,9 +29,7 @@ from .log import logger __all__ = ( 'SelectorEventLoop', 'AbstractChildWatcher', 'SafeChildWatcher', - 'FastChildWatcher', - 'MultiLoopChildWatcher', 'ThreadedChildWatcher', - 'DefaultEventLoopPolicy', + 'FastChildWatcher', 'DefaultEventLoopPolicy', ) @@ -187,13 +184,6 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): stdin, stdout, stderr, bufsize, extra=None, **kwargs): with events.get_child_watcher() as watcher: - if not watcher.is_active(): - # Check early. - # Raising exception before process creation - # prevents subprocess execution if the watcher - # is not ready to handle it. - raise RuntimeError("asyncio.get_child_watcher() is not activated, " - "subprocess support is not installed.") waiter = self.create_future() transp = _UnixSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, @@ -848,15 +838,6 @@ class AbstractChildWatcher: """ raise NotImplementedError() - def is_active(self): - """Watcher status. - - Return True if the watcher is installed and ready to handle process exit - notifications. - - """ - raise NotImplementedError() - def __enter__(self): """Enter the watcher's context and allow starting new processes @@ -868,20 +849,6 @@ class AbstractChildWatcher: raise NotImplementedError() -def _compute_returncode(status): - if os.WIFSIGNALED(status): - # The child process died because of a signal. - return -os.WTERMSIG(status) - elif os.WIFEXITED(status): - # The child process exited (e.g sys.exit()). - return os.WEXITSTATUS(status) - else: - # The child exited, but we don't understand its status. - # This shouldn't happen, but if it does, let's just - # return that status; perhaps that helps debug it. - return status - - class BaseChildWatcher(AbstractChildWatcher): def __init__(self): @@ -891,9 +858,6 @@ class BaseChildWatcher(AbstractChildWatcher): def close(self): self.attach_loop(None) - def is_active(self): - return self._loop is not None and self._loop.is_running() - def _do_waitpid(self, expected_pid): raise NotImplementedError() @@ -934,6 +898,19 @@ class BaseChildWatcher(AbstractChildWatcher): 'exception': exc, }) + def _compute_returncode(self, status): + if os.WIFSIGNALED(status): + # The child process died because of a signal. + return -os.WTERMSIG(status) + elif os.WIFEXITED(status): + # The child process exited (e.g sys.exit()). + return os.WEXITSTATUS(status) + else: + # The child exited, but we don't understand its status. + # This shouldn't happen, but if it does, let's just + # return that status; perhaps that helps debug it. + return status + class SafeChildWatcher(BaseChildWatcher): """'Safe' child watcher implementation. @@ -957,6 +934,11 @@ class SafeChildWatcher(BaseChildWatcher): pass def add_child_handler(self, pid, callback, *args): + if self._loop is None: + raise RuntimeError( + "Cannot add child handler, " + "the child watcher does not have a loop attached") + self._callbacks[pid] = (callback, args) # Prevent a race condition in case the child is already terminated. @@ -992,7 +974,7 @@ class SafeChildWatcher(BaseChildWatcher): # The child process is still alive. return - returncode = _compute_returncode(status) + returncode = self._compute_returncode(status) if self._loop.get_debug(): logger.debug('process %s exited with returncode %s', expected_pid, returncode) @@ -1053,6 +1035,11 @@ class FastChildWatcher(BaseChildWatcher): def add_child_handler(self, pid, callback, *args): assert self._forks, "Must use the context manager" + if self._loop is None: + raise RuntimeError( + "Cannot add child handler, " + "the child watcher does not have a loop attached") + with self._lock: try: returncode = self._zombies.pop(pid) @@ -1085,7 +1072,7 @@ class FastChildWatcher(BaseChildWatcher): # A child process is still alive. return - returncode = _compute_returncode(status) + returncode = self._compute_returncode(status) with self._lock: try: @@ -1114,177 +1101,6 @@ class FastChildWatcher(BaseChildWatcher): callback(pid, returncode, *args) -class MultiLoopChildWatcher(AbstractChildWatcher): - # The class keeps compatibility with AbstractChildWatcher ABC - # To achieve this it has empty attach_loop() method - # and doesn't accept explicit loop argument - # for add_child_handler()/remove_child_handler() - # but retrieves the current loop by get_running_loop() - - def __init__(self): - self._callbacks = {} - self._saved_sighandler = None - - def is_active(self): - return self._saved_sighandler is not None - - def close(self): - self._callbacks.clear() - if self._saved_sighandler is not None: - handler = signal.getsignal(signal.SIGCHLD) - if handler != self._sig_chld: - logger.warning("SIGCHLD handler was changed by outside code") - else: - signal.signal(signal.SIGCHLD, self._saved_sighandler) - self._saved_sighandler = None - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - - def add_child_handler(self, pid, callback, *args): - loop = events.get_running_loop() - self._callbacks[pid] = (loop, callback, args) - - # Prevent a race condition in case the child is already terminated. - self._do_waitpid(pid) - - def remove_child_handler(self, pid): - try: - del self._callbacks[pid] - return True - except KeyError: - return False - - def attach_loop(self, loop): - # Don't save the loop but initialize itself if called first time - # The reason to do it here is that attach_loop() is called from - # unix policy only for the main thread. - # Main thread is required for subscription on SIGCHLD signal - if self._saved_sighandler is None: - self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld) - if self._saved_sighandler is None: - logger.warning("Previous SIGCHLD handler was set by non-Python code, " - "restore to default handler on watcher close.") - self._saved_sighandler = signal.SIG_DFL - - # Set SA_RESTART to limit EINTR occurrences. - signal.siginterrupt(signal.SIGCHLD, False) - - def _do_waitpid_all(self): - for pid in list(self._callbacks): - self._do_waitpid(pid) - - def _do_waitpid(self, expected_pid): - assert expected_pid > 0 - - try: - pid, status = os.waitpid(expected_pid, os.WNOHANG) - except ChildProcessError: - # The child process is already reaped - # (may happen if waitpid() is called elsewhere). - pid = expected_pid - returncode = 255 - logger.warning( - "Unknown child process pid %d, will report returncode 255", - pid) - debug_log = False - else: - if pid == 0: - # The child process is still alive. - return - - returncode = _compute_returncode(status) - debug_log = True - try: - loop, callback, args = self._callbacks.pop(pid) - except KeyError: # pragma: no cover - # May happen if .remove_child_handler() is called - # after os.waitpid() returns. - logger.warning("Child watcher got an unexpected pid: %r", - pid, exc_info=True) - else: - if loop.is_closed(): - logger.warning("Loop %r that handles pid %r is closed", loop, pid) - else: - if debug_log and loop.get_debug(): - logger.debug('process %s exited with returncode %s', - expected_pid, returncode) - loop.call_soon_threadsafe(callback, pid, returncode, *args) - - def _sig_chld(self, signum, frame): - try: - self._do_waitpid_all() - except (SystemExit, KeyboardInterrupt): - raise - except BaseException: - logger.warning('Unknown exception in SIGCHLD handler', exc_info=True) - - -class ThreadedChildWatcher(AbstractChildWatcher): - # The watcher uses a thread per process - # for waiting for the process finish. - # It doesn't require subscription on POSIX signal - - def __init__(self): - self._pid_counter = itertools.count(0) - - def is_active(self): - return True - - def close(self): - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - - def add_child_handler(self, pid, callback, *args): - loop = events.get_running_loop() - thread = threading.Thread(target=self._do_waitpid, - name=f"waitpid-{next(self._pid_counter)}", - args=(loop, pid, callback, args), - daemon=True) - thread.start() - - def remove_child_handler(self, pid): - # asyncio never calls remove_child_handler() !!! - # The method is no-op but is implemented because - # abstract base classe requires it - return True - - def attach_loop(self, loop): - pass - - def _do_waitpid(self, loop, expected_pid, callback, args): - assert expected_pid > 0 - - try: - pid, status = os.waitpid(expected_pid, 0) - except ChildProcessError: - # The child process is already reaped - # (may happen if waitpid() is called elsewhere). - pid = expected_pid - returncode = 255 - logger.warning( - "Unknown child process pid %d, will report returncode 255", - pid) - else: - returncode = _compute_returncode(status) - if loop.get_debug(): - logger.debug('process %s exited with returncode %s', - expected_pid, returncode) - - if loop.is_closed(): - logger.warning("Loop %r that handles pid %r is closed", loop, pid) - else: - loop.call_soon_threadsafe(callback, pid, returncode, *args) - - class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): """UNIX event loop policy with a watcher for child processes.""" _loop_factory = _UnixSelectorEventLoop @@ -1296,7 +1112,7 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): def _init_watcher(self): with events._lock: if self._watcher is None: # pragma: no branch - self._watcher = ThreadedChildWatcher() + self._watcher = SafeChildWatcher() if isinstance(threading.current_thread(), threading._MainThread): self._watcher.attach_loop(self._local._loop) @@ -1318,7 +1134,7 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): def get_child_watcher(self): """Get the watcher for child processes. - If not yet set, a ThreadedChildWatcher object is automatically created. + If not yet set, a SafeChildWatcher object is automatically created. """ if self._watcher is None: self._init_watcher() diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 582e172..7d72e6c 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -633,7 +633,6 @@ class SubprocessMixin: self.assertIsNone(self.loop.run_until_complete(execute())) - if sys.platform != 'win32': # Unix class SubprocessWatcherMixin(SubprocessMixin): @@ -649,24 +648,7 @@ if sys.platform != 'win32': watcher = self.Watcher() watcher.attach_loop(self.loop) policy.set_child_watcher(watcher) - - def tearDown(self): - super().setUp() - policy = asyncio.get_event_loop_policy() - watcher = policy.get_child_watcher() - policy.set_child_watcher(None) - watcher.attach_loop(None) - watcher.close() - - class SubprocessThreadedWatcherTests(SubprocessWatcherMixin, - test_utils.TestCase): - - Watcher = unix_events.ThreadedChildWatcher - - class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin, - test_utils.TestCase): - - Watcher = unix_events.MultiLoopChildWatcher + self.addCleanup(policy.set_child_watcher, None) class SubprocessSafeWatcherTests(SubprocessWatcherMixin, test_utils.TestCase): @@ -688,25 +670,5 @@ else: self.set_event_loop(self.loop) -class GenericWatcherTests: - - def test_create_subprocess_fails_with_inactive_watcher(self): - - async def execute(): - watcher = mock.create_authspec(asyncio.AbstractChildWatcher) - watcher.is_active.return_value = False - asyncio.set_child_watcher(watcher) - - with self.assertRaises(RuntimeError): - await subprocess.create_subprocess_exec( - support.FakePath(sys.executable), '-c', 'pass') - - watcher.add_child_handler.assert_not_called() - - self.assertIsNone(self.loop.run_until_complete(execute())) - - - - if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 462a8b3..5c610cd 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -1083,8 +1083,6 @@ class AbstractChildWatcherTests(unittest.TestCase): self.assertRaises( NotImplementedError, watcher.close) self.assertRaises( - NotImplementedError, watcher.is_active) - self.assertRaises( NotImplementedError, watcher.__enter__) self.assertRaises( NotImplementedError, watcher.__exit__, f, f, f) @@ -1786,6 +1784,15 @@ class ChildWatcherTestsMixin: if isinstance(self.watcher, asyncio.FastChildWatcher): self.assertFalse(self.watcher._zombies) + @waitpid_mocks + def test_add_child_handler_with_no_loop_attached(self, m): + callback = mock.Mock() + with self.create_watcher() as watcher: + with self.assertRaisesRegex( + RuntimeError, + 'the child watcher does not have a loop attached'): + watcher.add_child_handler(100, callback) + class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase): def create_watcher(self): @@ -1802,16 +1809,17 @@ class PolicyTests(unittest.TestCase): def create_policy(self): return asyncio.DefaultEventLoopPolicy() - def test_get_default_child_watcher(self): + def test_get_child_watcher(self): policy = self.create_policy() self.assertIsNone(policy._watcher) watcher = policy.get_child_watcher() - self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher) + self.assertIsInstance(watcher, asyncio.SafeChildWatcher) self.assertIs(policy._watcher, watcher) self.assertIs(watcher, policy.get_child_watcher()) + self.assertIsNone(watcher._loop) def test_get_child_watcher_after_set(self): policy = self.create_policy() @@ -1821,6 +1829,18 @@ class PolicyTests(unittest.TestCase): self.assertIs(policy._watcher, watcher) self.assertIs(watcher, policy.get_child_watcher()) + def test_get_child_watcher_with_mainloop_existing(self): + policy = self.create_policy() + loop = policy.get_event_loop() + + self.assertIsNone(policy._watcher) + watcher = policy.get_child_watcher() + + self.assertIsInstance(watcher, asyncio.SafeChildWatcher) + self.assertIs(watcher._loop, loop) + + loop.close() + def test_get_child_watcher_thread(self): def f(): @@ -1846,11 +1866,7 @@ class PolicyTests(unittest.TestCase): policy = self.create_policy() loop = policy.get_event_loop() - # Explicitly setup SafeChildWatcher, - # default ThreadedChildWatcher has no _loop property - watcher = asyncio.SafeChildWatcher() - policy.set_child_watcher(watcher) - watcher.attach_loop(loop) + watcher = policy.get_child_watcher() self.assertIs(watcher._loop, loop) |