diff options
Diffstat (limited to 'Lib/asyncio/unix_events.py')
-rw-r--r-- | Lib/asyncio/unix_events.py | 157 |
1 files changed, 107 insertions, 50 deletions
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 7747ff4..788a5a0 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -177,7 +177,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): stdin, stdout, stderr, bufsize, extra=None, **kwargs): with events.get_child_watcher() as watcher: - waiter = futures.Future(loop=self) + waiter = self.create_future() transp = _UnixSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, waiter=waiter, extra=extra, @@ -234,6 +234,11 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): else: if sock is None: raise ValueError('no path and sock were specified') + if (sock.family != socket.AF_UNIX or + not base_events._is_stream_socket(sock)): + raise ValueError( + 'A UNIX Domain Stream Socket was expected, got {!r}' + .format(sock)) sock.setblocking(False) transport, protocol = yield from self._create_connection_transport( @@ -253,6 +258,17 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + # Check for abstract socket. `str` and `bytes` paths are supported. + if path[0] not in (0, '\x00'): + try: + if stat.S_ISSOCK(os.stat(path).st_mode): + os.remove(path) + except FileNotFoundError: + pass + except OSError as err: + # Directory may have permissions only to create socket. + logger.error('Unable to check or remove stale UNIX socket %r: %r', path, err) + try: sock.bind(path) except OSError as exc: @@ -272,9 +288,11 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): raise ValueError( 'path was not specified, and no sock specified') - if sock.family != socket.AF_UNIX: + if (sock.family != socket.AF_UNIX or + not base_events._is_stream_socket(sock)): raise ValueError( - 'A UNIX Domain Socket was expected, got {!r}'.format(sock)) + 'A UNIX Domain Stream Socket was expected, got {!r}' + .format(sock)) server = base_events.Server(self, [sock]) sock.listen(backlog) @@ -305,17 +323,23 @@ class _UnixReadPipeTransport(transports.ReadTransport): self._loop = loop self._pipe = pipe self._fileno = pipe.fileno() + self._protocol = protocol + self._closing = False + mode = os.fstat(self._fileno).st_mode if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)): + self._pipe = None + self._fileno = None + self._protocol = None raise ValueError("Pipe transport is for pipes/sockets only.") + _set_nonblocking(self._fileno) - self._protocol = protocol - self._closing = False + self._loop.call_soon(self._protocol.connection_made, self) # only start reading when connection_made() has been called - self._loop.call_soon(self._loop.add_reader, + self._loop.call_soon(self._loop._add_reader, self._fileno, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called @@ -329,14 +353,17 @@ class _UnixReadPipeTransport(transports.ReadTransport): elif self._closing: info.append('closing') info.append('fd=%s' % self._fileno) - if self._pipe is not None: + selector = getattr(self._loop, '_selector', None) + if self._pipe is not None and selector is not None: polling = selector_events._test_selector_event( - self._loop._selector, + selector, self._fileno, selectors.EVENT_READ) if polling: info.append('polling') else: info.append('idle') + elif self._pipe is not None: + info.append('open') else: info.append('closed') return '<%s>' % ' '.join(info) @@ -355,15 +382,21 @@ class _UnixReadPipeTransport(transports.ReadTransport): if self._loop.get_debug(): logger.info("%r was closed by peer", self) self._closing = True - self._loop.remove_reader(self._fileno) + self._loop._remove_reader(self._fileno) self._loop.call_soon(self._protocol.eof_received) self._loop.call_soon(self._call_connection_lost, None) def pause_reading(self): - self._loop.remove_reader(self._fileno) + self._loop._remove_reader(self._fileno) def resume_reading(self): - self._loop.add_reader(self._fileno, self._read_ready) + self._loop._add_reader(self._fileno, self._read_ready) + + def set_protocol(self, protocol): + self._protocol = protocol + + def get_protocol(self): + return self._protocol def is_closing(self): return self._closing @@ -397,7 +430,7 @@ class _UnixReadPipeTransport(transports.ReadTransport): def _close(self, exc): self._closing = True - self._loop.remove_reader(self._fileno) + self._loop._remove_reader(self._fileno) self._loop.call_soon(self._call_connection_lost, exc) def _call_connection_lost(self, exc): @@ -418,27 +451,31 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, self._extra['pipe'] = pipe self._pipe = pipe self._fileno = pipe.fileno() + self._protocol = protocol + self._buffer = bytearray() + self._conn_lost = 0 + self._closing = False # Set when close() or write_eof() called. + mode = os.fstat(self._fileno).st_mode + is_char = stat.S_ISCHR(mode) + is_fifo = stat.S_ISFIFO(mode) is_socket = stat.S_ISSOCK(mode) - if not (is_socket or - stat.S_ISFIFO(mode) or - stat.S_ISCHR(mode)): + if not (is_char or is_fifo or is_socket): + self._pipe = None + self._fileno = None + self._protocol = None raise ValueError("Pipe transport is only for " "pipes, sockets and character devices") - _set_nonblocking(self._fileno) - self._protocol = protocol - self._buffer = [] - self._conn_lost = 0 - self._closing = False # Set when close() or write_eof() called. + _set_nonblocking(self._fileno) self._loop.call_soon(self._protocol.connection_made, self) # On AIX, the reader trick (to be notified when the read end of the # socket is closed) only works for sockets. On other platforms it # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) - if is_socket or not sys.platform.startswith("aix"): + if is_socket or (is_fifo and not sys.platform.startswith("aix")): # only start reading when connection_made() has been called - self._loop.call_soon(self._loop.add_reader, + self._loop.call_soon(self._loop._add_reader, self._fileno, self._read_ready) if waiter is not None: @@ -453,9 +490,10 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, elif self._closing: info.append('closing') info.append('fd=%s' % self._fileno) - if self._pipe is not None: + selector = getattr(self._loop, '_selector', None) + if self._pipe is not None and selector is not None: polling = selector_events._test_selector_event( - self._loop._selector, + selector, self._fileno, selectors.EVENT_WRITE) if polling: info.append('polling') @@ -464,12 +502,14 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, bufsize = self.get_write_buffer_size() info.append('bufsize=%s' % bufsize) + elif self._pipe is not None: + info.append('open') else: info.append('closed') return '<%s>' % ' '.join(info) def get_write_buffer_size(self): - return sum(len(data) for data in self._buffer) + return len(self._buffer) def _read_ready(self): # Pipe was closed by peer. @@ -507,39 +547,37 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, if n == len(data): return elif n > 0: - data = data[n:] - self._loop.add_writer(self._fileno, self._write_ready) + data = memoryview(data)[n:] + self._loop._add_writer(self._fileno, self._write_ready) - self._buffer.append(data) + self._buffer += data self._maybe_pause_protocol() def _write_ready(self): - data = b''.join(self._buffer) - assert data, 'Data should not be empty' + assert self._buffer, 'Data should not be empty' - self._buffer.clear() try: - n = os.write(self._fileno, data) + n = os.write(self._fileno, self._buffer) except (BlockingIOError, InterruptedError): - self._buffer.append(data) + pass except Exception as exc: + self._buffer.clear() self._conn_lost += 1 # Remove writer here, _fatal_error() doesn't it # because _buffer is empty. - self._loop.remove_writer(self._fileno) + self._loop._remove_writer(self._fileno) self._fatal_error(exc, 'Fatal write error on pipe transport') else: - if n == len(data): - self._loop.remove_writer(self._fileno) + if n == len(self._buffer): + self._buffer.clear() + self._loop._remove_writer(self._fileno) self._maybe_resume_protocol() # May append to buffer. - if not self._buffer and self._closing: - self._loop.remove_reader(self._fileno) + if self._closing: + self._loop._remove_reader(self._fileno) self._call_connection_lost(None) return elif n > 0: - data = data[n:] - - self._buffer.append(data) # Try again later. + del self._buffer[:n] def can_write_eof(self): return True @@ -550,9 +588,15 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, assert self._pipe self._closing = True if not self._buffer: - self._loop.remove_reader(self._fileno) + self._loop._remove_reader(self._fileno) self._loop.call_soon(self._call_connection_lost, None) + def set_protocol(self, protocol): + self._protocol = protocol + + def get_protocol(self): + return self._protocol + def is_closing(self): return self._closing @@ -575,7 +619,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, def _fatal_error(self, exc, message='Fatal error on pipe transport'): # should be called by exception handler only - if isinstance(exc, (BrokenPipeError, ConnectionResetError)): + if isinstance(exc, base_events._FATAL_ERROR_IGNORE): if self._loop.get_debug(): logger.debug("%r: %s", self, message, exc_info=True) else: @@ -590,9 +634,9 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, def _close(self, exc=None): self._closing = True if self._buffer: - self._loop.remove_writer(self._fileno) + self._loop._remove_writer(self._fileno) self._buffer.clear() - self._loop.remove_reader(self._fileno) + self._loop._remove_reader(self._fileno) self._loop.call_soon(self._call_connection_lost, exc) def _call_connection_lost(self, exc): @@ -720,6 +764,7 @@ class BaseChildWatcher(AbstractChildWatcher): def __init__(self): self._loop = None + self._callbacks = {} def close(self): self.attach_loop(None) @@ -733,6 +778,12 @@ class BaseChildWatcher(AbstractChildWatcher): def attach_loop(self, loop): assert loop is None or isinstance(loop, events.AbstractEventLoop) + if self._loop is not None and loop is None and self._callbacks: + warnings.warn( + 'A loop is being detached ' + 'from a child watcher with pending handlers', + RuntimeWarning) + if self._loop is not None: self._loop.remove_signal_handler(signal.SIGCHLD) @@ -781,10 +832,6 @@ class SafeChildWatcher(BaseChildWatcher): big number of children (O(n) each time SIGCHLD is raised) """ - def __init__(self): - super().__init__() - self._callbacks = {} - def close(self): self._callbacks.clear() super().close() @@ -796,6 +843,11 @@ class SafeChildWatcher(BaseChildWatcher): pass def add_child_handler(self, pid, callback, *args): + if self._loop is None: + raise RuntimeError( + "Cannot add child handler, " + "the child watcher does not have a loop attached") + self._callbacks[pid] = (callback, args) # Prevent a race condition in case the child is already terminated. @@ -860,7 +912,6 @@ class FastChildWatcher(BaseChildWatcher): """ def __init__(self): super().__init__() - self._callbacks = {} self._lock = threading.Lock() self._zombies = {} self._forks = 0 @@ -892,6 +943,12 @@ class FastChildWatcher(BaseChildWatcher): def add_child_handler(self, pid, callback, *args): assert self._forks, "Must use the context manager" + + if self._loop is None: + raise RuntimeError( + "Cannot add child handler, " + "the child watcher does not have a loop attached") + with self._lock: try: returncode = self._zombies.pop(pid) |