summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/asyncio/unix_events.py240
-rw-r--r--Lib/test/test_asyncio/test_subprocess.py40
-rw-r--r--Lib/test/test_asyncio/test_unix_events.py34
-rw-r--r--Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst2
4 files changed, 54 insertions, 262 deletions
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index b943845..28128d2 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -2,7 +2,6 @@
import errno
import io
-import itertools
import os
import selectors
import signal
@@ -30,9 +29,7 @@ from .log import logger
__all__ = (
'SelectorEventLoop',
'AbstractChildWatcher', 'SafeChildWatcher',
- 'FastChildWatcher',
- 'MultiLoopChildWatcher', 'ThreadedChildWatcher',
- 'DefaultEventLoopPolicy',
+ 'FastChildWatcher', 'DefaultEventLoopPolicy',
)
@@ -187,13 +184,6 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
with events.get_child_watcher() as watcher:
- if not watcher.is_active():
- # Check early.
- # Raising exception before process creation
- # prevents subprocess execution if the watcher
- # is not ready to handle it.
- raise RuntimeError("asyncio.get_child_watcher() is not activated, "
- "subprocess support is not installed.")
waiter = self.create_future()
transp = _UnixSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
@@ -848,15 +838,6 @@ class AbstractChildWatcher:
"""
raise NotImplementedError()
- def is_active(self):
- """Watcher status.
-
- Return True if the watcher is installed and ready to handle process exit
- notifications.
-
- """
- raise NotImplementedError()
-
def __enter__(self):
"""Enter the watcher's context and allow starting new processes
@@ -868,20 +849,6 @@ class AbstractChildWatcher:
raise NotImplementedError()
-def _compute_returncode(status):
- if os.WIFSIGNALED(status):
- # The child process died because of a signal.
- return -os.WTERMSIG(status)
- elif os.WIFEXITED(status):
- # The child process exited (e.g sys.exit()).
- return os.WEXITSTATUS(status)
- else:
- # The child exited, but we don't understand its status.
- # This shouldn't happen, but if it does, let's just
- # return that status; perhaps that helps debug it.
- return status
-
-
class BaseChildWatcher(AbstractChildWatcher):
def __init__(self):
@@ -891,9 +858,6 @@ class BaseChildWatcher(AbstractChildWatcher):
def close(self):
self.attach_loop(None)
- def is_active(self):
- return self._loop is not None and self._loop.is_running()
-
def _do_waitpid(self, expected_pid):
raise NotImplementedError()
@@ -934,6 +898,19 @@ class BaseChildWatcher(AbstractChildWatcher):
'exception': exc,
})
+ def _compute_returncode(self, status):
+ if os.WIFSIGNALED(status):
+ # The child process died because of a signal.
+ return -os.WTERMSIG(status)
+ elif os.WIFEXITED(status):
+ # The child process exited (e.g sys.exit()).
+ return os.WEXITSTATUS(status)
+ else:
+ # The child exited, but we don't understand its status.
+ # This shouldn't happen, but if it does, let's just
+ # return that status; perhaps that helps debug it.
+ return status
+
class SafeChildWatcher(BaseChildWatcher):
"""'Safe' child watcher implementation.
@@ -957,6 +934,11 @@ class SafeChildWatcher(BaseChildWatcher):
pass
def add_child_handler(self, pid, callback, *args):
+ if self._loop is None:
+ raise RuntimeError(
+ "Cannot add child handler, "
+ "the child watcher does not have a loop attached")
+
self._callbacks[pid] = (callback, args)
# Prevent a race condition in case the child is already terminated.
@@ -992,7 +974,7 @@ class SafeChildWatcher(BaseChildWatcher):
# The child process is still alive.
return
- returncode = _compute_returncode(status)
+ returncode = self._compute_returncode(status)
if self._loop.get_debug():
logger.debug('process %s exited with returncode %s',
expected_pid, returncode)
@@ -1053,6 +1035,11 @@ class FastChildWatcher(BaseChildWatcher):
def add_child_handler(self, pid, callback, *args):
assert self._forks, "Must use the context manager"
+ if self._loop is None:
+ raise RuntimeError(
+ "Cannot add child handler, "
+ "the child watcher does not have a loop attached")
+
with self._lock:
try:
returncode = self._zombies.pop(pid)
@@ -1085,7 +1072,7 @@ class FastChildWatcher(BaseChildWatcher):
# A child process is still alive.
return
- returncode = _compute_returncode(status)
+ returncode = self._compute_returncode(status)
with self._lock:
try:
@@ -1114,177 +1101,6 @@ class FastChildWatcher(BaseChildWatcher):
callback(pid, returncode, *args)
-class MultiLoopChildWatcher(AbstractChildWatcher):
- # The class keeps compatibility with AbstractChildWatcher ABC
- # To achieve this it has empty attach_loop() method
- # and doesn't accept explicit loop argument
- # for add_child_handler()/remove_child_handler()
- # but retrieves the current loop by get_running_loop()
-
- def __init__(self):
- self._callbacks = {}
- self._saved_sighandler = None
-
- def is_active(self):
- return self._saved_sighandler is not None
-
- def close(self):
- self._callbacks.clear()
- if self._saved_sighandler is not None:
- handler = signal.getsignal(signal.SIGCHLD)
- if handler != self._sig_chld:
- logger.warning("SIGCHLD handler was changed by outside code")
- else:
- signal.signal(signal.SIGCHLD, self._saved_sighandler)
- self._saved_sighandler = None
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- pass
-
- def add_child_handler(self, pid, callback, *args):
- loop = events.get_running_loop()
- self._callbacks[pid] = (loop, callback, args)
-
- # Prevent a race condition in case the child is already terminated.
- self._do_waitpid(pid)
-
- def remove_child_handler(self, pid):
- try:
- del self._callbacks[pid]
- return True
- except KeyError:
- return False
-
- def attach_loop(self, loop):
- # Don't save the loop but initialize itself if called first time
- # The reason to do it here is that attach_loop() is called from
- # unix policy only for the main thread.
- # Main thread is required for subscription on SIGCHLD signal
- if self._saved_sighandler is None:
- self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
- if self._saved_sighandler is None:
- logger.warning("Previous SIGCHLD handler was set by non-Python code, "
- "restore to default handler on watcher close.")
- self._saved_sighandler = signal.SIG_DFL
-
- # Set SA_RESTART to limit EINTR occurrences.
- signal.siginterrupt(signal.SIGCHLD, False)
-
- def _do_waitpid_all(self):
- for pid in list(self._callbacks):
- self._do_waitpid(pid)
-
- def _do_waitpid(self, expected_pid):
- assert expected_pid > 0
-
- try:
- pid, status = os.waitpid(expected_pid, os.WNOHANG)
- except ChildProcessError:
- # The child process is already reaped
- # (may happen if waitpid() is called elsewhere).
- pid = expected_pid
- returncode = 255
- logger.warning(
- "Unknown child process pid %d, will report returncode 255",
- pid)
- debug_log = False
- else:
- if pid == 0:
- # The child process is still alive.
- return
-
- returncode = _compute_returncode(status)
- debug_log = True
- try:
- loop, callback, args = self._callbacks.pop(pid)
- except KeyError: # pragma: no cover
- # May happen if .remove_child_handler() is called
- # after os.waitpid() returns.
- logger.warning("Child watcher got an unexpected pid: %r",
- pid, exc_info=True)
- else:
- if loop.is_closed():
- logger.warning("Loop %r that handles pid %r is closed", loop, pid)
- else:
- if debug_log and loop.get_debug():
- logger.debug('process %s exited with returncode %s',
- expected_pid, returncode)
- loop.call_soon_threadsafe(callback, pid, returncode, *args)
-
- def _sig_chld(self, signum, frame):
- try:
- self._do_waitpid_all()
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException:
- logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
-
-
-class ThreadedChildWatcher(AbstractChildWatcher):
- # The watcher uses a thread per process
- # for waiting for the process finish.
- # It doesn't require subscription on POSIX signal
-
- def __init__(self):
- self._pid_counter = itertools.count(0)
-
- def is_active(self):
- return True
-
- def close(self):
- pass
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- pass
-
- def add_child_handler(self, pid, callback, *args):
- loop = events.get_running_loop()
- thread = threading.Thread(target=self._do_waitpid,
- name=f"waitpid-{next(self._pid_counter)}",
- args=(loop, pid, callback, args),
- daemon=True)
- thread.start()
-
- def remove_child_handler(self, pid):
- # asyncio never calls remove_child_handler() !!!
- # The method is no-op but is implemented because
- # abstract base classe requires it
- return True
-
- def attach_loop(self, loop):
- pass
-
- def _do_waitpid(self, loop, expected_pid, callback, args):
- assert expected_pid > 0
-
- try:
- pid, status = os.waitpid(expected_pid, 0)
- except ChildProcessError:
- # The child process is already reaped
- # (may happen if waitpid() is called elsewhere).
- pid = expected_pid
- returncode = 255
- logger.warning(
- "Unknown child process pid %d, will report returncode 255",
- pid)
- else:
- returncode = _compute_returncode(status)
- if loop.get_debug():
- logger.debug('process %s exited with returncode %s',
- expected_pid, returncode)
-
- if loop.is_closed():
- logger.warning("Loop %r that handles pid %r is closed", loop, pid)
- else:
- loop.call_soon_threadsafe(callback, pid, returncode, *args)
-
-
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
"""UNIX event loop policy with a watcher for child processes."""
_loop_factory = _UnixSelectorEventLoop
@@ -1296,7 +1112,7 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
def _init_watcher(self):
with events._lock:
if self._watcher is None: # pragma: no branch
- self._watcher = ThreadedChildWatcher()
+ self._watcher = SafeChildWatcher()
if isinstance(threading.current_thread(),
threading._MainThread):
self._watcher.attach_loop(self._local._loop)
@@ -1318,7 +1134,7 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
def get_child_watcher(self):
"""Get the watcher for child processes.
- If not yet set, a ThreadedChildWatcher object is automatically created.
+ If not yet set, a SafeChildWatcher object is automatically created.
"""
if self._watcher is None:
self._init_watcher()
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
index 582e172..7d72e6c 100644
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -633,7 +633,6 @@ class SubprocessMixin:
self.assertIsNone(self.loop.run_until_complete(execute()))
-
if sys.platform != 'win32':
# Unix
class SubprocessWatcherMixin(SubprocessMixin):
@@ -649,24 +648,7 @@ if sys.platform != 'win32':
watcher = self.Watcher()
watcher.attach_loop(self.loop)
policy.set_child_watcher(watcher)
-
- def tearDown(self):
- super().setUp()
- policy = asyncio.get_event_loop_policy()
- watcher = policy.get_child_watcher()
- policy.set_child_watcher(None)
- watcher.attach_loop(None)
- watcher.close()
-
- class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
- test_utils.TestCase):
-
- Watcher = unix_events.ThreadedChildWatcher
-
- class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin,
- test_utils.TestCase):
-
- Watcher = unix_events.MultiLoopChildWatcher
+ self.addCleanup(policy.set_child_watcher, None)
class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
test_utils.TestCase):
@@ -688,25 +670,5 @@ else:
self.set_event_loop(self.loop)
-class GenericWatcherTests:
-
- def test_create_subprocess_fails_with_inactive_watcher(self):
-
- async def execute():
- watcher = mock.create_authspec(asyncio.AbstractChildWatcher)
- watcher.is_active.return_value = False
- asyncio.set_child_watcher(watcher)
-
- with self.assertRaises(RuntimeError):
- await subprocess.create_subprocess_exec(
- support.FakePath(sys.executable), '-c', 'pass')
-
- watcher.add_child_handler.assert_not_called()
-
- self.assertIsNone(self.loop.run_until_complete(execute()))
-
-
-
-
if __name__ == '__main__':
unittest.main()
diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py
index 462a8b3..5c610cd 100644
--- a/Lib/test/test_asyncio/test_unix_events.py
+++ b/Lib/test/test_asyncio/test_unix_events.py
@@ -1083,8 +1083,6 @@ class AbstractChildWatcherTests(unittest.TestCase):
self.assertRaises(
NotImplementedError, watcher.close)
self.assertRaises(
- NotImplementedError, watcher.is_active)
- self.assertRaises(
NotImplementedError, watcher.__enter__)
self.assertRaises(
NotImplementedError, watcher.__exit__, f, f, f)
@@ -1786,6 +1784,15 @@ class ChildWatcherTestsMixin:
if isinstance(self.watcher, asyncio.FastChildWatcher):
self.assertFalse(self.watcher._zombies)
+ @waitpid_mocks
+ def test_add_child_handler_with_no_loop_attached(self, m):
+ callback = mock.Mock()
+ with self.create_watcher() as watcher:
+ with self.assertRaisesRegex(
+ RuntimeError,
+ 'the child watcher does not have a loop attached'):
+ watcher.add_child_handler(100, callback)
+
class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
def create_watcher(self):
@@ -1802,16 +1809,17 @@ class PolicyTests(unittest.TestCase):
def create_policy(self):
return asyncio.DefaultEventLoopPolicy()
- def test_get_default_child_watcher(self):
+ def test_get_child_watcher(self):
policy = self.create_policy()
self.assertIsNone(policy._watcher)
watcher = policy.get_child_watcher()
- self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher)
+ self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
self.assertIs(policy._watcher, watcher)
self.assertIs(watcher, policy.get_child_watcher())
+ self.assertIsNone(watcher._loop)
def test_get_child_watcher_after_set(self):
policy = self.create_policy()
@@ -1821,6 +1829,18 @@ class PolicyTests(unittest.TestCase):
self.assertIs(policy._watcher, watcher)
self.assertIs(watcher, policy.get_child_watcher())
+ def test_get_child_watcher_with_mainloop_existing(self):
+ policy = self.create_policy()
+ loop = policy.get_event_loop()
+
+ self.assertIsNone(policy._watcher)
+ watcher = policy.get_child_watcher()
+
+ self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
+ self.assertIs(watcher._loop, loop)
+
+ loop.close()
+
def test_get_child_watcher_thread(self):
def f():
@@ -1846,11 +1866,7 @@ class PolicyTests(unittest.TestCase):
policy = self.create_policy()
loop = policy.get_event_loop()
- # Explicitly setup SafeChildWatcher,
- # default ThreadedChildWatcher has no _loop property
- watcher = asyncio.SafeChildWatcher()
- policy.set_child_watcher(watcher)
- watcher.attach_loop(loop)
+ watcher = policy.get_child_watcher()
self.assertIs(watcher._loop, loop)
diff --git a/Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst b/Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst
deleted file mode 100644
index c492e1d..0000000
--- a/Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst
+++ /dev/null
@@ -1,2 +0,0 @@
-Support running asyncio subprocesses when execution event loop in a thread
-on UNIX.