diff options
author | Guido van Rossum <guido@dropbox.com> | 2013-11-04 23:50:46 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@dropbox.com> | 2013-11-04 23:50:46 (GMT) |
commit | 0eaa5ac9b5b47d4ae0e851a8652fdc6be7a15963 (patch) | |
tree | 4d8d97d49e5a5aa9aa03df73450545ab4d780943 /Lib/asyncio/unix_events.py | |
parent | ccea08462b753fc78ec97cc5717de8f163b503ec (diff) | |
download | cpython-0eaa5ac9b5b47d4ae0e851a8652fdc6be7a15963.zip cpython-0eaa5ac9b5b47d4ae0e851a8652fdc6be7a15963.tar.gz cpython-0eaa5ac9b5b47d4ae0e851a8652fdc6be7a15963.tar.bz2 |
asyncio: Refactor SIGCHLD handling. By Anthony Baire.
Diffstat (limited to 'Lib/asyncio/unix_events.py')
-rw-r--r-- | Lib/asyncio/unix_events.py | 396 |
1 files changed, 348 insertions, 48 deletions
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index c95ad48..dd57fe8 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -8,6 +8,7 @@ import socket import stat import subprocess import sys +import threading from . import base_subprocess @@ -20,7 +21,10 @@ from . import transports from .log import logger -__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR'] +__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR', + 'AbstractChildWatcher', 'SafeChildWatcher', + 'FastChildWatcher', 'DefaultEventLoopPolicy', + ] STDIN = 0 STDOUT = 1 @@ -31,7 +35,7 @@ if sys.platform == 'win32': # pragma: no cover raise ImportError('Signals are not really supported on Windows') -class SelectorEventLoop(selector_events.BaseSelectorEventLoop): +class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): """Unix event loop Adds signal handling to SelectorEventLoop @@ -40,17 +44,10 @@ class SelectorEventLoop(selector_events.BaseSelectorEventLoop): def __init__(self, selector=None): super().__init__(selector) self._signal_handlers = {} - self._subprocesses = {} def _socketpair(self): return socket.socketpair() - def close(self): - handler = self._signal_handlers.get(signal.SIGCHLD) - if handler is not None: - self.remove_signal_handler(signal.SIGCHLD) - super().close() - def add_signal_handler(self, sig, callback, *args): """Add a handler for a signal. UNIX only. @@ -152,49 +149,20 @@ class SelectorEventLoop(selector_events.BaseSelectorEventLoop): def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): - self._reg_sigchld() - transp = _UnixSubprocessTransport(self, protocol, args, shell, - stdin, stdout, stderr, bufsize, - extra=None, **kwargs) - self._subprocesses[transp.get_pid()] = transp + with events.get_child_watcher() as watcher: + transp = _UnixSubprocessTransport(self, protocol, args, shell, + stdin, stdout, stderr, bufsize, + extra=None, **kwargs) + watcher.add_child_handler(transp.get_pid(), + self._child_watcher_callback, transp) yield from transp._post_init() return transp - def _reg_sigchld(self): - if signal.SIGCHLD not in self._signal_handlers: - self.add_signal_handler(signal.SIGCHLD, self._sig_chld) + def _child_watcher_callback(self, pid, returncode, transp): + self.call_soon_threadsafe(transp._process_exited, returncode) - def _sig_chld(self): - try: - # Because of signal coalescing, we must keep calling waitpid() as - # long as we're able to reap a child. - while True: - try: - pid, status = os.waitpid(-1, os.WNOHANG) - except ChildProcessError: - break # No more child processes exist. - if pid == 0: - break # All remaining child processes are still alive. - elif os.WIFSIGNALED(status): - # A child process died because of a signal. - returncode = -os.WTERMSIG(status) - elif os.WIFEXITED(status): - # A child process exited (e.g. sys.exit()). - returncode = os.WEXITSTATUS(status) - else: - # A child exited, but we don't understand its status. - # This shouldn't happen, but if it does, let's just - # return that status; perhaps that helps debug it. - returncode = status - transp = self._subprocesses.get(pid) - if transp is not None: - transp._process_exited(returncode) - except Exception: - logger.exception('Unknown exception in SIGCHLD handler') - - def _subprocess_closed(self, transport): - pid = transport.get_pid() - self._subprocesses.pop(pid, None) + def _subprocess_closed(self, transp): + pass def _set_nonblocking(fd): @@ -423,3 +391,335 @@ class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport): if stdin_w is not None: stdin.close() self._proc.stdin = open(stdin_w.detach(), 'rb', buffering=bufsize) + + +class AbstractChildWatcher: + """Abstract base class for monitoring child processes. + + Objects derived from this class monitor a collection of subprocesses and + report their termination or interruption by a signal. + + New callbacks are registered with .add_child_handler(). Starting a new + process must be done within a 'with' block to allow the watcher to suspend + its activity until the new process if fully registered (this is needed to + prevent a race condition in some implementations). + + Example: + with watcher: + proc = subprocess.Popen("sleep 1") + watcher.add_child_handler(proc.pid, callback) + + Notes: + Implementations of this class must be thread-safe. + + Since child watcher objects may catch the SIGCHLD signal and call + waitpid(-1), there should be only one active object per process. + """ + + def add_child_handler(self, pid, callback, *args): + """Register a new child handler. + + Arrange for callback(pid, returncode, *args) to be called when + process 'pid' terminates. Specifying another callback for the same + process replaces the previous handler. + + Note: callback() must be thread-safe + """ + raise NotImplementedError() + + def remove_child_handler(self, pid): + """Removes the handler for process 'pid'. + + The function returns True if the handler was successfully removed, + False if there was nothing to remove.""" + + raise NotImplementedError() + + def set_loop(self, loop): + """Reattach the watcher to another event loop. + + Note: loop may be None + """ + raise NotImplementedError() + + def close(self): + """Close the watcher. + + This must be called to make sure that any underlying resource is freed. + """ + raise NotImplementedError() + + def __enter__(self): + """Enter the watcher's context and allow starting new processes + + This function must return self""" + raise NotImplementedError() + + def __exit__(self, a, b, c): + """Exit the watcher's context""" + raise NotImplementedError() + + +class BaseChildWatcher(AbstractChildWatcher): + + def __init__(self, loop): + self._loop = None + self._callbacks = {} + + self.set_loop(loop) + + def close(self): + self.set_loop(None) + self._callbacks.clear() + + def _do_waitpid(self, expected_pid): + raise NotImplementedError() + + def _do_waitpid_all(self): + raise NotImplementedError() + + def set_loop(self, loop): + assert loop is None or isinstance(loop, events.AbstractEventLoop) + + if self._loop is not None: + self._loop.remove_signal_handler(signal.SIGCHLD) + + self._loop = loop + if loop is not None: + loop.add_signal_handler(signal.SIGCHLD, self._sig_chld) + + # Prevent a race condition in case a child terminated + # during the switch. + self._do_waitpid_all() + + def remove_child_handler(self, pid): + try: + del self._callbacks[pid] + return True + except KeyError: + return False + + def _sig_chld(self): + try: + self._do_waitpid_all() + except Exception: + logger.exception('Unknown exception in SIGCHLD handler') + + def _compute_returncode(self, status): + if os.WIFSIGNALED(status): + # The child process died because of a signal. + return -os.WTERMSIG(status) + elif os.WIFEXITED(status): + # The child process exited (e.g sys.exit()). + return os.WEXITSTATUS(status) + else: + # The child exited, but we don't understand its status. + # This shouldn't happen, but if it does, let's just + # return that status; perhaps that helps debug it. + return status + + +class SafeChildWatcher(BaseChildWatcher): + """'Safe' child watcher implementation. + + This implementation avoids disrupting other code spawning processes by + polling explicitly each process in the SIGCHLD handler instead of calling + os.waitpid(-1). + + This is a safe solution but it has a significant overhead when handling a + big number of children (O(n) each time SIGCHLD is raised) + """ + + def __enter__(self): + return self + + def __exit__(self, a, b, c): + pass + + def add_child_handler(self, pid, callback, *args): + self._callbacks[pid] = callback, args + + # Prevent a race condition in case the child is already terminated. + self._do_waitpid(pid) + + def _do_waitpid_all(self): + + for pid in list(self._callbacks): + self._do_waitpid(pid) + + def _do_waitpid(self, expected_pid): + assert expected_pid > 0 + + try: + pid, status = os.waitpid(expected_pid, os.WNOHANG) + except ChildProcessError: + # The child process is already reaped + # (may happen if waitpid() is called elsewhere). + pid = expected_pid + returncode = 255 + logger.warning( + "Unknown child process pid %d, will report returncode 255", + pid) + else: + if pid == 0: + # The child process is still alive. + return + + returncode = self._compute_returncode(status) + + try: + callback, args = self._callbacks.pop(pid) + except KeyError: # pragma: no cover + # May happen if .remove_child_handler() is called + # after os.waitpid() returns. + pass + else: + callback(pid, returncode, *args) + + +class FastChildWatcher(BaseChildWatcher): + """'Fast' child watcher implementation. + + This implementation reaps every terminated processes by calling + os.waitpid(-1) directly, possibly breaking other code spawning processes + and waiting for their termination. + + There is no noticeable overhead when handling a big number of children + (O(1) each time a child terminates). + """ + def __init__(self, loop): + super().__init__(loop) + + self._lock = threading.Lock() + self._zombies = {} + self._forks = 0 + + def close(self): + super().close() + self._zombies.clear() + + def __enter__(self): + with self._lock: + self._forks += 1 + + return self + + def __exit__(self, a, b, c): + with self._lock: + self._forks -= 1 + + if self._forks or not self._zombies: + return + + collateral_victims = str(self._zombies) + self._zombies.clear() + + logger.warning( + "Caught subprocesses termination from unknown pids: %s", + collateral_victims) + + def add_child_handler(self, pid, callback, *args): + assert self._forks, "Must use the context manager" + + self._callbacks[pid] = callback, args + + try: + # Ensure that the child is not already terminated. + # (raise KeyError if still alive) + returncode = self._zombies.pop(pid) + + # Child is dead, therefore we can fire the callback immediately. + # First we remove it from the dict. + # (raise KeyError if .remove_child_handler() was called in-between) + del self._callbacks[pid] + except KeyError: + pass + else: + callback(pid, returncode, *args) + + def _do_waitpid_all(self): + # Because of signal coalescing, we must keep calling waitpid() as + # long as we're able to reap a child. + while True: + try: + pid, status = os.waitpid(-1, os.WNOHANG) + except ChildProcessError: + # No more child processes exist. + return + else: + if pid == 0: + # A child process is still alive. + return + + returncode = self._compute_returncode(status) + + try: + callback, args = self._callbacks.pop(pid) + except KeyError: + # unknown child + with self._lock: + if self._forks: + # It may not be registered yet. + self._zombies[pid] = returncode + continue + + logger.warning( + "Caught subprocess termination from unknown pid: " + "%d -> %d", pid, returncode) + else: + callback(pid, returncode, *args) + + +class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): + """XXX""" + _loop_factory = _UnixSelectorEventLoop + + def __init__(self): + super().__init__() + self._watcher = None + + def _init_watcher(self): + with events._lock: + if self._watcher is None: # pragma: no branch + if isinstance(threading.current_thread(), + threading._MainThread): + self._watcher = SafeChildWatcher(self._local._loop) + else: + self._watcher = SafeChildWatcher(None) + + def set_event_loop(self, loop): + """Set the event loop. + + As a side effect, if a child watcher was set before, then calling + .set_event_loop() from the main thread will call .set_loop(loop) on the + child watcher. + """ + + super().set_event_loop(loop) + + if self._watcher is not None and \ + isinstance(threading.current_thread(), threading._MainThread): + self._watcher.set_loop(loop) + + def get_child_watcher(self): + """Get the child watcher + + If not yet set, a SafeChildWatcher object is automatically created. + """ + if self._watcher is None: + self._init_watcher() + + return self._watcher + + def set_child_watcher(self, watcher): + """Set the child watcher""" + + assert watcher is None or isinstance(watcher, AbstractChildWatcher) + + if self._watcher is not None: + self._watcher.close() + + self._watcher = watcher + +SelectorEventLoop = _UnixSelectorEventLoop +DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy |