diff options
author | Guido van Rossum <guido@dropbox.com> | 2013-10-17 20:40:50 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@dropbox.com> | 2013-10-17 20:40:50 (GMT) |
commit | 27b7c7ebf1039e96cac41b6330cf16b5632d9e49 (patch) | |
tree | 814505b0f9d02a5cabdec733dcde70250b04ee28 /Lib/asyncio/unix_events.py | |
parent | 5b37f97ea5ac9f6b33b0e0269c69539cbb478142 (diff) | |
download | cpython-27b7c7ebf1039e96cac41b6330cf16b5632d9e49.zip cpython-27b7c7ebf1039e96cac41b6330cf16b5632d9e49.tar.gz cpython-27b7c7ebf1039e96cac41b6330cf16b5632d9e49.tar.bz2 |
Initial checkin of asyncio package (== Tulip, == PEP 3156).
Diffstat (limited to 'Lib/asyncio/unix_events.py')
-rw-r--r-- | Lib/asyncio/unix_events.py | 541 |
1 files changed, 541 insertions, 0 deletions
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py new file mode 100644 index 0000000..a3a8e11 --- /dev/null +++ b/Lib/asyncio/unix_events.py @@ -0,0 +1,541 @@ +"""Selector eventloop for Unix with signal handling.""" + +import collections +import errno +import fcntl +import functools +import os +import signal +import socket +import stat +import subprocess +import sys + + +from . import constants +from . import events +from . import protocols +from . import selector_events +from . import tasks +from . import transports +from .log import asyncio_log + + +__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR'] + +STDIN = 0 +STDOUT = 1 +STDERR = 2 + + +if sys.platform == 'win32': # pragma: no cover + raise ImportError('Signals are not really supported on Windows') + + +class SelectorEventLoop(selector_events.BaseSelectorEventLoop): + """Unix event loop + + Adds signal handling to SelectorEventLoop + """ + + def __init__(self, selector=None): + super().__init__(selector) + self._signal_handlers = {} + self._subprocesses = {} + + def _socketpair(self): + return socket.socketpair() + + def close(self): + handler = self._signal_handlers.get(signal.SIGCHLD) + if handler is not None: + self.remove_signal_handler(signal.SIGCHLD) + super().close() + + def add_signal_handler(self, sig, callback, *args): + """Add a handler for a signal. UNIX only. + + Raise ValueError if the signal number is invalid or uncatchable. + Raise RuntimeError if there is a problem setting up the handler. + """ + self._check_signal(sig) + try: + # set_wakeup_fd() raises ValueError if this is not the + # main thread. By calling it early we ensure that an + # event loop running in another thread cannot add a signal + # handler. + signal.set_wakeup_fd(self._csock.fileno()) + except ValueError as exc: + raise RuntimeError(str(exc)) + + handle = events.make_handle(callback, args) + self._signal_handlers[sig] = handle + + try: + signal.signal(sig, self._handle_signal) + except OSError as exc: + del self._signal_handlers[sig] + if not self._signal_handlers: + try: + signal.set_wakeup_fd(-1) + except ValueError as nexc: + asyncio_log.info('set_wakeup_fd(-1) failed: %s', nexc) + + if exc.errno == errno.EINVAL: + raise RuntimeError('sig {} cannot be caught'.format(sig)) + else: + raise + + def _handle_signal(self, sig, arg): + """Internal helper that is the actual signal handler.""" + handle = self._signal_handlers.get(sig) + if handle is None: + return # Assume it's some race condition. + if handle._cancelled: + self.remove_signal_handler(sig) # Remove it properly. + else: + self._add_callback_signalsafe(handle) + + def remove_signal_handler(self, sig): + """Remove a handler for a signal. UNIX only. + + Return True if a signal handler was removed, False if not. + """ + self._check_signal(sig) + try: + del self._signal_handlers[sig] + except KeyError: + return False + + if sig == signal.SIGINT: + handler = signal.default_int_handler + else: + handler = signal.SIG_DFL + + try: + signal.signal(sig, handler) + except OSError as exc: + if exc.errno == errno.EINVAL: + raise RuntimeError('sig {} cannot be caught'.format(sig)) + else: + raise + + if not self._signal_handlers: + try: + signal.set_wakeup_fd(-1) + except ValueError as exc: + asyncio_log.info('set_wakeup_fd(-1) failed: %s', exc) + + return True + + def _check_signal(self, sig): + """Internal helper to validate a signal. + + Raise ValueError if the signal number is invalid or uncatchable. + Raise RuntimeError if there is a problem setting up the handler. + """ + if not isinstance(sig, int): + raise TypeError('sig must be an int, not {!r}'.format(sig)) + + if not (1 <= sig < signal.NSIG): + raise ValueError( + 'sig {} out of range(1, {})'.format(sig, signal.NSIG)) + + def _make_read_pipe_transport(self, pipe, protocol, waiter=None, + extra=None): + return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra) + + def _make_write_pipe_transport(self, pipe, protocol, waiter=None, + extra=None): + return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra) + + @tasks.coroutine + def _make_subprocess_transport(self, protocol, args, shell, + stdin, stdout, stderr, bufsize, + extra=None, **kwargs): + self._reg_sigchld() + transp = _UnixSubprocessTransport(self, protocol, args, shell, + stdin, stdout, stderr, bufsize, + extra=None, **kwargs) + self._subprocesses[transp.get_pid()] = transp + yield from transp._post_init() + return transp + + def _reg_sigchld(self): + if signal.SIGCHLD not in self._signal_handlers: + self.add_signal_handler(signal.SIGCHLD, self._sig_chld) + + def _sig_chld(self): + try: + try: + pid, status = os.waitpid(0, os.WNOHANG) + except ChildProcessError: + return + if pid == 0: + self.call_soon(self._sig_chld) + return + elif os.WIFSIGNALED(status): + returncode = -os.WTERMSIG(status) + elif os.WIFEXITED(status): + returncode = os.WEXITSTATUS(status) + else: + self.call_soon(self._sig_chld) + return + transp = self._subprocesses.get(pid) + if transp is not None: + transp._process_exited(returncode) + except Exception: + asyncio_log.exception('Unknown exception in SIGCHLD handler') + + def _subprocess_closed(self, transport): + pid = transport.get_pid() + self._subprocesses.pop(pid, None) + + +def _set_nonblocking(fd): + flags = fcntl.fcntl(fd, fcntl.F_GETFL) + flags = flags | os.O_NONBLOCK + fcntl.fcntl(fd, fcntl.F_SETFL, flags) + + +class _UnixReadPipeTransport(transports.ReadTransport): + + max_size = 256 * 1024 # max bytes we read in one eventloop iteration + + def __init__(self, loop, pipe, protocol, waiter=None, extra=None): + super().__init__(extra) + self._extra['pipe'] = pipe + self._loop = loop + self._pipe = pipe + self._fileno = pipe.fileno() + _set_nonblocking(self._fileno) + self._protocol = protocol + self._closing = False + self._loop.add_reader(self._fileno, self._read_ready) + self._loop.call_soon(self._protocol.connection_made, self) + if waiter is not None: + self._loop.call_soon(waiter.set_result, None) + + def _read_ready(self): + try: + data = os.read(self._fileno, self.max_size) + except (BlockingIOError, InterruptedError): + pass + except OSError as exc: + self._fatal_error(exc) + else: + if data: + self._protocol.data_received(data) + else: + self._closing = True + self._loop.remove_reader(self._fileno) + self._loop.call_soon(self._protocol.eof_received) + self._loop.call_soon(self._call_connection_lost, None) + + def pause(self): + self._loop.remove_reader(self._fileno) + + def resume(self): + self._loop.add_reader(self._fileno, self._read_ready) + + def close(self): + if not self._closing: + self._close(None) + + def _fatal_error(self, exc): + # should be called by exception handler only + asyncio_log.exception('Fatal error for %s', self) + self._close(exc) + + def _close(self, exc): + self._closing = True + self._loop.remove_reader(self._fileno) + self._loop.call_soon(self._call_connection_lost, exc) + + def _call_connection_lost(self, exc): + try: + self._protocol.connection_lost(exc) + finally: + self._pipe.close() + self._pipe = None + self._protocol = None + self._loop = None + + +class _UnixWritePipeTransport(transports.WriteTransport): + + def __init__(self, loop, pipe, protocol, waiter=None, extra=None): + super().__init__(extra) + self._extra['pipe'] = pipe + self._loop = loop + self._pipe = pipe + self._fileno = pipe.fileno() + if not stat.S_ISFIFO(os.fstat(self._fileno).st_mode): + raise ValueError("Pipe transport is for pipes only.") + _set_nonblocking(self._fileno) + self._protocol = protocol + self._buffer = [] + self._conn_lost = 0 + self._closing = False # Set when close() or write_eof() called. + self._loop.add_reader(self._fileno, self._read_ready) + + self._loop.call_soon(self._protocol.connection_made, self) + if waiter is not None: + self._loop.call_soon(waiter.set_result, None) + + def _read_ready(self): + # pipe was closed by peer + self._close() + + def write(self, data): + assert isinstance(data, bytes), repr(data) + if not data: + return + + if self._conn_lost or self._closing: + if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: + asyncio_log.warning('pipe closed by peer or ' + 'os.write(pipe, data) raised exception.') + self._conn_lost += 1 + return + + if not self._buffer: + # Attempt to send it right away first. + try: + n = os.write(self._fileno, data) + except (BlockingIOError, InterruptedError): + n = 0 + except Exception as exc: + self._conn_lost += 1 + self._fatal_error(exc) + return + if n == len(data): + return + elif n > 0: + data = data[n:] + self._loop.add_writer(self._fileno, self._write_ready) + + self._buffer.append(data) + + def _write_ready(self): + data = b''.join(self._buffer) + assert data, 'Data should not be empty' + + self._buffer.clear() + try: + n = os.write(self._fileno, data) + except (BlockingIOError, InterruptedError): + self._buffer.append(data) + except Exception as exc: + self._conn_lost += 1 + # Remove writer here, _fatal_error() doesn't it + # because _buffer is empty. + self._loop.remove_writer(self._fileno) + self._fatal_error(exc) + else: + if n == len(data): + self._loop.remove_writer(self._fileno) + if self._closing: + self._loop.remove_reader(self._fileno) + self._call_connection_lost(None) + return + elif n > 0: + data = data[n:] + + self._buffer.append(data) # Try again later. + + def can_write_eof(self): + return True + + # TODO: Make the relationships between write_eof(), close(), + # abort(), _fatal_error() and _close() more straightforward. + + def write_eof(self): + if self._closing: + return + assert self._pipe + self._closing = True + if not self._buffer: + self._loop.remove_reader(self._fileno) + self._loop.call_soon(self._call_connection_lost, None) + + def close(self): + if not self._closing: + # write_eof is all what we needed to close the write pipe + self.write_eof() + + def abort(self): + self._close(None) + + def _fatal_error(self, exc): + # should be called by exception handler only + asyncio_log.exception('Fatal error for %s', self) + self._close(exc) + + def _close(self, exc=None): + self._closing = True + if self._buffer: + self._loop.remove_writer(self._fileno) + self._buffer.clear() + self._loop.remove_reader(self._fileno) + self._loop.call_soon(self._call_connection_lost, exc) + + def _call_connection_lost(self, exc): + try: + self._protocol.connection_lost(exc) + finally: + self._pipe.close() + self._pipe = None + self._protocol = None + self._loop = None + + +class _UnixWriteSubprocessPipeProto(protocols.BaseProtocol): + pipe = None + + def __init__(self, proc, fd): + self.proc = proc + self.fd = fd + self.connected = False + self.disconnected = False + proc._pipes[fd] = self + + def connection_made(self, transport): + self.connected = True + self.pipe = transport + self.proc._try_connected() + + def connection_lost(self, exc): + self.disconnected = True + self.proc._pipe_connection_lost(self.fd, exc) + + +class _UnixReadSubprocessPipeProto(_UnixWriteSubprocessPipeProto, + protocols.Protocol): + + def data_received(self, data): + self.proc._pipe_data_received(self.fd, data) + + def eof_received(self): + pass + + +class _UnixSubprocessTransport(transports.SubprocessTransport): + + def __init__(self, loop, protocol, args, shell, + stdin, stdout, stderr, bufsize, + extra=None, **kwargs): + super().__init__(extra) + self._protocol = protocol + self._loop = loop + + self._pipes = {} + if stdin == subprocess.PIPE: + self._pipes[STDIN] = None + if stdout == subprocess.PIPE: + self._pipes[STDOUT] = None + if stderr == subprocess.PIPE: + self._pipes[STDERR] = None + self._pending_calls = collections.deque() + self._finished = False + self._returncode = None + + self._proc = subprocess.Popen( + args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, + universal_newlines=False, bufsize=bufsize, **kwargs) + self._extra['subprocess'] = self._proc + + def close(self): + for proto in self._pipes.values(): + proto.pipe.close() + if self._returncode is None: + self.terminate() + + def get_pid(self): + return self._proc.pid + + def get_returncode(self): + return self._returncode + + def get_pipe_transport(self, fd): + if fd in self._pipes: + return self._pipes[fd].pipe + else: + return None + + def send_signal(self, signal): + self._proc.send_signal(signal) + + def terminate(self): + self._proc.terminate() + + def kill(self): + self._proc.kill() + + @tasks.coroutine + def _post_init(self): + proc = self._proc + loop = self._loop + if proc.stdin is not None: + transp, proto = yield from loop.connect_write_pipe( + functools.partial( + _UnixWriteSubprocessPipeProto, self, STDIN), + proc.stdin) + if proc.stdout is not None: + transp, proto = yield from loop.connect_read_pipe( + functools.partial( + _UnixReadSubprocessPipeProto, self, STDOUT), + proc.stdout) + if proc.stderr is not None: + transp, proto = yield from loop.connect_read_pipe( + functools.partial( + _UnixReadSubprocessPipeProto, self, STDERR), + proc.stderr) + if not self._pipes: + self._try_connected() + + def _call(self, cb, *data): + if self._pending_calls is not None: + self._pending_calls.append((cb, data)) + else: + self._loop.call_soon(cb, *data) + + def _try_connected(self): + assert self._pending_calls is not None + if all(p is not None and p.connected for p in self._pipes.values()): + self._loop.call_soon(self._protocol.connection_made, self) + for callback, data in self._pending_calls: + self._loop.call_soon(callback, *data) + self._pending_calls = None + + def _pipe_connection_lost(self, fd, exc): + self._call(self._protocol.pipe_connection_lost, fd, exc) + self._try_finish() + + def _pipe_data_received(self, fd, data): + self._call(self._protocol.pipe_data_received, fd, data) + + def _process_exited(self, returncode): + assert returncode is not None, returncode + assert self._returncode is None, self._returncode + self._returncode = returncode + self._loop._subprocess_closed(self) + self._call(self._protocol.process_exited) + self._try_finish() + + def _try_finish(self): + assert not self._finished + if self._returncode is None: + return + if all(p is not None and p.disconnected + for p in self._pipes.values()): + self._finished = True + self._loop.call_soon(self._call_connection_lost, None) + + def _call_connection_lost(self, exc): + try: + self._protocol.connection_lost(exc) + finally: + self._proc = None + self._protocol = None + self._loop = None |