diff options
author | Guido van Rossum <guido@dropbox.com> | 2013-11-04 23:50:46 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@dropbox.com> | 2013-11-04 23:50:46 (GMT) |
commit | 0eaa5ac9b5b47d4ae0e851a8652fdc6be7a15963 (patch) | |
tree | 4d8d97d49e5a5aa9aa03df73450545ab4d780943 /Lib/asyncio | |
parent | ccea08462b753fc78ec97cc5717de8f163b503ec (diff) | |
download | cpython-0eaa5ac9b5b47d4ae0e851a8652fdc6be7a15963.zip cpython-0eaa5ac9b5b47d4ae0e851a8652fdc6be7a15963.tar.gz cpython-0eaa5ac9b5b47d4ae0e851a8652fdc6be7a15963.tar.bz2 |
asyncio: Refactor SIGCHLD handling. By Anthony Baire.
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/events.py | 70 | ||||
-rw-r--r-- | Lib/asyncio/unix_events.py | 396 | ||||
-rw-r--r-- | Lib/asyncio/windows_events.py | 17 |
3 files changed, 414 insertions, 69 deletions
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index 7ebc3cb..36ae312 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -1,10 +1,11 @@ """Event loop and event loop policy.""" -__all__ = ['AbstractEventLoopPolicy', 'DefaultEventLoopPolicy', +__all__ = ['AbstractEventLoopPolicy', 'AbstractEventLoop', 'AbstractServer', 'Handle', 'TimerHandle', 'get_event_loop_policy', 'set_event_loop_policy', 'get_event_loop', 'set_event_loop', 'new_event_loop', + 'get_child_watcher', 'set_child_watcher', ] import subprocess @@ -318,8 +319,18 @@ class AbstractEventLoopPolicy: """XXX""" raise NotImplementedError + # Child processes handling (Unix only). -class DefaultEventLoopPolicy(threading.local, AbstractEventLoopPolicy): + def get_child_watcher(self): + """XXX""" + raise NotImplementedError + + def set_child_watcher(self, watcher): + """XXX""" + raise NotImplementedError + + +class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy): """Default policy implementation for accessing the event loop. In this policy, each thread has its own event loop. However, we @@ -332,28 +343,34 @@ class DefaultEventLoopPolicy(threading.local, AbstractEventLoopPolicy): associated). """ - _loop = None - _set_called = False + _loop_factory = None + + class _Local(threading.local): + _loop = None + _set_called = False + + def __init__(self): + self._local = self._Local() def get_event_loop(self): """Get the event loop. This may be None or an instance of EventLoop. """ - if (self._loop is None and - not self._set_called and + if (self._local._loop is None and + not self._local._set_called and isinstance(threading.current_thread(), threading._MainThread)): - self._loop = self.new_event_loop() - assert self._loop is not None, \ + self._local._loop = self.new_event_loop() + assert self._local._loop is not None, \ ('There is no current event loop in thread %r.' % threading.current_thread().name) - return self._loop + return self._local._loop def set_event_loop(self, loop): """Set the event loop.""" - self._set_called = True + self._local._set_called = True assert loop is None or isinstance(loop, AbstractEventLoop) - self._loop = loop + self._local._loop = loop def new_event_loop(self): """Create a new event loop. @@ -361,12 +378,7 @@ class DefaultEventLoopPolicy(threading.local, AbstractEventLoopPolicy): You must call set_event_loop() to make this the current event loop. """ - if sys.platform == 'win32': # pragma: no cover - from . import windows_events - return windows_events.SelectorEventLoop() - else: # pragma: no cover - from . import unix_events - return unix_events.SelectorEventLoop() + return self._loop_factory() # Event loop policy. The policy itself is always global, even if the @@ -375,12 +387,22 @@ class DefaultEventLoopPolicy(threading.local, AbstractEventLoopPolicy): # call to get_event_loop_policy(). _event_loop_policy = None +# Lock for protecting the on-the-fly creation of the event loop policy. +_lock = threading.Lock() + + +def _init_event_loop_policy(): + global _event_loop_policy + with _lock: + if _event_loop_policy is None: # pragma: no branch + from . import DefaultEventLoopPolicy + _event_loop_policy = DefaultEventLoopPolicy() + def get_event_loop_policy(): """XXX""" - global _event_loop_policy if _event_loop_policy is None: - _event_loop_policy = DefaultEventLoopPolicy() + _init_event_loop_policy() return _event_loop_policy @@ -404,3 +426,13 @@ def set_event_loop(loop): def new_event_loop(): """XXX""" return get_event_loop_policy().new_event_loop() + + +def get_child_watcher(): + """XXX""" + return get_event_loop_policy().get_child_watcher() + + +def set_child_watcher(watcher): + """XXX""" + return get_event_loop_policy().set_child_watcher(watcher) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index c95ad48..dd57fe8 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -8,6 +8,7 @@ import socket import stat import subprocess import sys +import threading from . import base_subprocess @@ -20,7 +21,10 @@ from . import transports from .log import logger -__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR'] +__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR', + 'AbstractChildWatcher', 'SafeChildWatcher', + 'FastChildWatcher', 'DefaultEventLoopPolicy', + ] STDIN = 0 STDOUT = 1 @@ -31,7 +35,7 @@ if sys.platform == 'win32': # pragma: no cover raise ImportError('Signals are not really supported on Windows') -class SelectorEventLoop(selector_events.BaseSelectorEventLoop): +class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): """Unix event loop Adds signal handling to SelectorEventLoop @@ -40,17 +44,10 @@ class SelectorEventLoop(selector_events.BaseSelectorEventLoop): def __init__(self, selector=None): super().__init__(selector) self._signal_handlers = {} - self._subprocesses = {} def _socketpair(self): return socket.socketpair() - def close(self): - handler = self._signal_handlers.get(signal.SIGCHLD) - if handler is not None: - self.remove_signal_handler(signal.SIGCHLD) - super().close() - def add_signal_handler(self, sig, callback, *args): """Add a handler for a signal. UNIX only. @@ -152,49 +149,20 @@ class SelectorEventLoop(selector_events.BaseSelectorEventLoop): def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): - self._reg_sigchld() - transp = _UnixSubprocessTransport(self, protocol, args, shell, - stdin, stdout, stderr, bufsize, - extra=None, **kwargs) - self._subprocesses[transp.get_pid()] = transp + with events.get_child_watcher() as watcher: + transp = _UnixSubprocessTransport(self, protocol, args, shell, + stdin, stdout, stderr, bufsize, + extra=None, **kwargs) + watcher.add_child_handler(transp.get_pid(), + self._child_watcher_callback, transp) yield from transp._post_init() return transp - def _reg_sigchld(self): - if signal.SIGCHLD not in self._signal_handlers: - self.add_signal_handler(signal.SIGCHLD, self._sig_chld) + def _child_watcher_callback(self, pid, returncode, transp): + self.call_soon_threadsafe(transp._process_exited, returncode) - def _sig_chld(self): - try: - # Because of signal coalescing, we must keep calling waitpid() as - # long as we're able to reap a child. - while True: - try: - pid, status = os.waitpid(-1, os.WNOHANG) - except ChildProcessError: - break # No more child processes exist. - if pid == 0: - break # All remaining child processes are still alive. - elif os.WIFSIGNALED(status): - # A child process died because of a signal. - returncode = -os.WTERMSIG(status) - elif os.WIFEXITED(status): - # A child process exited (e.g. sys.exit()). - returncode = os.WEXITSTATUS(status) - else: - # A child exited, but we don't understand its status. - # This shouldn't happen, but if it does, let's just - # return that status; perhaps that helps debug it. - returncode = status - transp = self._subprocesses.get(pid) - if transp is not None: - transp._process_exited(returncode) - except Exception: - logger.exception('Unknown exception in SIGCHLD handler') - - def _subprocess_closed(self, transport): - pid = transport.get_pid() - self._subprocesses.pop(pid, None) + def _subprocess_closed(self, transp): + pass def _set_nonblocking(fd): @@ -423,3 +391,335 @@ class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport): if stdin_w is not None: stdin.close() self._proc.stdin = open(stdin_w.detach(), 'rb', buffering=bufsize) + + +class AbstractChildWatcher: + """Abstract base class for monitoring child processes. + + Objects derived from this class monitor a collection of subprocesses and + report their termination or interruption by a signal. + + New callbacks are registered with .add_child_handler(). Starting a new + process must be done within a 'with' block to allow the watcher to suspend + its activity until the new process if fully registered (this is needed to + prevent a race condition in some implementations). + + Example: + with watcher: + proc = subprocess.Popen("sleep 1") + watcher.add_child_handler(proc.pid, callback) + + Notes: + Implementations of this class must be thread-safe. + + Since child watcher objects may catch the SIGCHLD signal and call + waitpid(-1), there should be only one active object per process. + """ + + def add_child_handler(self, pid, callback, *args): + """Register a new child handler. + + Arrange for callback(pid, returncode, *args) to be called when + process 'pid' terminates. Specifying another callback for the same + process replaces the previous handler. + + Note: callback() must be thread-safe + """ + raise NotImplementedError() + + def remove_child_handler(self, pid): + """Removes the handler for process 'pid'. + + The function returns True if the handler was successfully removed, + False if there was nothing to remove.""" + + raise NotImplementedError() + + def set_loop(self, loop): + """Reattach the watcher to another event loop. + + Note: loop may be None + """ + raise NotImplementedError() + + def close(self): + """Close the watcher. + + This must be called to make sure that any underlying resource is freed. + """ + raise NotImplementedError() + + def __enter__(self): + """Enter the watcher's context and allow starting new processes + + This function must return self""" + raise NotImplementedError() + + def __exit__(self, a, b, c): + """Exit the watcher's context""" + raise NotImplementedError() + + +class BaseChildWatcher(AbstractChildWatcher): + + def __init__(self, loop): + self._loop = None + self._callbacks = {} + + self.set_loop(loop) + + def close(self): + self.set_loop(None) + self._callbacks.clear() + + def _do_waitpid(self, expected_pid): + raise NotImplementedError() + + def _do_waitpid_all(self): + raise NotImplementedError() + + def set_loop(self, loop): + assert loop is None or isinstance(loop, events.AbstractEventLoop) + + if self._loop is not None: + self._loop.remove_signal_handler(signal.SIGCHLD) + + self._loop = loop + if loop is not None: + loop.add_signal_handler(signal.SIGCHLD, self._sig_chld) + + # Prevent a race condition in case a child terminated + # during the switch. + self._do_waitpid_all() + + def remove_child_handler(self, pid): + try: + del self._callbacks[pid] + return True + except KeyError: + return False + + def _sig_chld(self): + try: + self._do_waitpid_all() + except Exception: + logger.exception('Unknown exception in SIGCHLD handler') + + def _compute_returncode(self, status): + if os.WIFSIGNALED(status): + # The child process died because of a signal. + return -os.WTERMSIG(status) + elif os.WIFEXITED(status): + # The child process exited (e.g sys.exit()). + return os.WEXITSTATUS(status) + else: + # The child exited, but we don't understand its status. + # This shouldn't happen, but if it does, let's just + # return that status; perhaps that helps debug it. + return status + + +class SafeChildWatcher(BaseChildWatcher): + """'Safe' child watcher implementation. + + This implementation avoids disrupting other code spawning processes by + polling explicitly each process in the SIGCHLD handler instead of calling + os.waitpid(-1). + + This is a safe solution but it has a significant overhead when handling a + big number of children (O(n) each time SIGCHLD is raised) + """ + + def __enter__(self): + return self + + def __exit__(self, a, b, c): + pass + + def add_child_handler(self, pid, callback, *args): + self._callbacks[pid] = callback, args + + # Prevent a race condition in case the child is already terminated. + self._do_waitpid(pid) + + def _do_waitpid_all(self): + + for pid in list(self._callbacks): + self._do_waitpid(pid) + + def _do_waitpid(self, expected_pid): + assert expected_pid > 0 + + try: + pid, status = os.waitpid(expected_pid, os.WNOHANG) + except ChildProcessError: + # The child process is already reaped + # (may happen if waitpid() is called elsewhere). + pid = expected_pid + returncode = 255 + logger.warning( + "Unknown child process pid %d, will report returncode 255", + pid) + else: + if pid == 0: + # The child process is still alive. + return + + returncode = self._compute_returncode(status) + + try: + callback, args = self._callbacks.pop(pid) + except KeyError: # pragma: no cover + # May happen if .remove_child_handler() is called + # after os.waitpid() returns. + pass + else: + callback(pid, returncode, *args) + + +class FastChildWatcher(BaseChildWatcher): + """'Fast' child watcher implementation. + + This implementation reaps every terminated processes by calling + os.waitpid(-1) directly, possibly breaking other code spawning processes + and waiting for their termination. + + There is no noticeable overhead when handling a big number of children + (O(1) each time a child terminates). + """ + def __init__(self, loop): + super().__init__(loop) + + self._lock = threading.Lock() + self._zombies = {} + self._forks = 0 + + def close(self): + super().close() + self._zombies.clear() + + def __enter__(self): + with self._lock: + self._forks += 1 + + return self + + def __exit__(self, a, b, c): + with self._lock: + self._forks -= 1 + + if self._forks or not self._zombies: + return + + collateral_victims = str(self._zombies) + self._zombies.clear() + + logger.warning( + "Caught subprocesses termination from unknown pids: %s", + collateral_victims) + + def add_child_handler(self, pid, callback, *args): + assert self._forks, "Must use the context manager" + + self._callbacks[pid] = callback, args + + try: + # Ensure that the child is not already terminated. + # (raise KeyError if still alive) + returncode = self._zombies.pop(pid) + + # Child is dead, therefore we can fire the callback immediately. + # First we remove it from the dict. + # (raise KeyError if .remove_child_handler() was called in-between) + del self._callbacks[pid] + except KeyError: + pass + else: + callback(pid, returncode, *args) + + def _do_waitpid_all(self): + # Because of signal coalescing, we must keep calling waitpid() as + # long as we're able to reap a child. + while True: + try: + pid, status = os.waitpid(-1, os.WNOHANG) + except ChildProcessError: + # No more child processes exist. + return + else: + if pid == 0: + # A child process is still alive. + return + + returncode = self._compute_returncode(status) + + try: + callback, args = self._callbacks.pop(pid) + except KeyError: + # unknown child + with self._lock: + if self._forks: + # It may not be registered yet. + self._zombies[pid] = returncode + continue + + logger.warning( + "Caught subprocess termination from unknown pid: " + "%d -> %d", pid, returncode) + else: + callback(pid, returncode, *args) + + +class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): + """XXX""" + _loop_factory = _UnixSelectorEventLoop + + def __init__(self): + super().__init__() + self._watcher = None + + def _init_watcher(self): + with events._lock: + if self._watcher is None: # pragma: no branch + if isinstance(threading.current_thread(), + threading._MainThread): + self._watcher = SafeChildWatcher(self._local._loop) + else: + self._watcher = SafeChildWatcher(None) + + def set_event_loop(self, loop): + """Set the event loop. + + As a side effect, if a child watcher was set before, then calling + .set_event_loop() from the main thread will call .set_loop(loop) on the + child watcher. + """ + + super().set_event_loop(loop) + + if self._watcher is not None and \ + isinstance(threading.current_thread(), threading._MainThread): + self._watcher.set_loop(loop) + + def get_child_watcher(self): + """Get the child watcher + + If not yet set, a SafeChildWatcher object is automatically created. + """ + if self._watcher is None: + self._init_watcher() + + return self._watcher + + def set_child_watcher(self, watcher): + """Set the child watcher""" + + assert watcher is None or isinstance(watcher, AbstractChildWatcher) + + if self._watcher is not None: + self._watcher.close() + + self._watcher = watcher + +SelectorEventLoop = _UnixSelectorEventLoop +DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index d7444bd..64fe386 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -7,6 +7,7 @@ import weakref import struct import _winapi +from . import events from . import base_subprocess from . import futures from . import proactor_events @@ -17,7 +18,9 @@ from .log import logger from . import _overlapped -__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor'] +__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor', + 'DefaultEventLoopPolicy', + ] NULL = 0 @@ -108,7 +111,7 @@ class PipeServer(object): __del__ = close -class SelectorEventLoop(selector_events.BaseSelectorEventLoop): +class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop): """Windows version of selector event loop.""" def _socketpair(self): @@ -453,3 +456,13 @@ class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport): f = self._loop._proactor.wait_for_handle(int(self._proc._handle)) f.add_done_callback(callback) + + +SelectorEventLoop = _WindowsSelectorEventLoop + + +class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): + _loop_factory = SelectorEventLoop + + +DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy |