From 5b8d4f97f8bbb357d7c8b79007fdff22e2c8d1ae Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Wed, 5 Oct 2016 17:48:59 -0400 Subject: Issue #28369: Raise an error when transport's FD is used with add_reader --- Lib/asyncio/selector_events.py | 127 ++++++++++++++++---------- Lib/asyncio/test_utils.py | 42 ++++++++- Lib/asyncio/unix_events.py | 26 +++--- Lib/test/test_asyncio/test_base_events.py | 24 ++--- Lib/test/test_asyncio/test_selector_events.py | 64 ++++++------- Misc/NEWS | 3 + 6 files changed, 177 insertions(+), 109 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 2f02d76..12d357b 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -11,6 +11,7 @@ import errno import functools import socket import warnings +import weakref try: import ssl except ImportError: # pragma: no cover @@ -64,6 +65,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): logger.debug('Using selector: %s', selector.__class__.__name__) self._selector = selector self._make_self_pipe() + self._transports = weakref.WeakValueDictionary() def _make_socket_transport(self, sock, protocol, waiter=None, *, extra=None, server=None): @@ -115,7 +117,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): raise NotImplementedError def _close_self_pipe(self): - self.remove_reader(self._ssock.fileno()) + self._remove_reader(self._ssock.fileno()) self._ssock.close() self._ssock = None self._csock.close() @@ -128,7 +130,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): self._ssock.setblocking(False) self._csock.setblocking(False) self._internal_fds += 1 - self.add_reader(self._ssock.fileno(), self._read_from_self) + self._add_reader(self._ssock.fileno(), self._read_from_self) def _process_self_data(self, data): pass @@ -163,8 +165,8 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): def _start_serving(self, protocol_factory, sock, sslcontext=None, server=None, backlog=100): - self.add_reader(sock.fileno(), self._accept_connection, - protocol_factory, sock, sslcontext, server, backlog) + self._add_reader(sock.fileno(), self._accept_connection, + protocol_factory, sock, sslcontext, server, backlog) def _accept_connection(self, protocol_factory, sock, sslcontext=None, server=None, backlog=100): @@ -194,7 +196,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): 'exception': exc, 'socket': sock, }) - self.remove_reader(sock.fileno()) + self._remove_reader(sock.fileno()) self.call_later(constants.ACCEPT_RETRY_DELAY, self._start_serving, protocol_factory, sock, sslcontext, server, @@ -244,8 +246,18 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): context['transport'] = transport self.call_exception_handler(context) - def add_reader(self, fd, callback, *args): - """Add a reader callback.""" + def _ensure_fd_no_transport(self, fd): + try: + transport = self._transports[fd] + except KeyError: + pass + else: + if not transport.is_closing(): + raise RuntimeError( + 'File descriptor {!r} is used by transport {!r}'.format( + fd, transport)) + + def _add_reader(self, fd, callback, *args): self._check_closed() handle = events.Handle(callback, args, self) try: @@ -260,8 +272,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): if reader is not None: reader.cancel() - def remove_reader(self, fd): - """Remove a reader callback.""" + def _remove_reader(self, fd): if self.is_closed(): return False try: @@ -282,8 +293,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): else: return False - def add_writer(self, fd, callback, *args): - """Add a writer callback..""" + def _add_writer(self, fd, callback, *args): self._check_closed() handle = events.Handle(callback, args, self) try: @@ -298,7 +308,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): if writer is not None: writer.cancel() - def remove_writer(self, fd): + def _remove_writer(self, fd): """Remove a writer callback.""" if self.is_closed(): return False @@ -321,6 +331,26 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): else: return False + def add_reader(self, fd, callback, *args): + """Add a reader callback.""" + self._ensure_fd_no_transport(fd) + return self._add_reader(fd, callback, *args) + + def remove_reader(self, fd): + """Remove a reader callback.""" + self._ensure_fd_no_transport(fd) + return self._remove_reader(fd) + + def add_writer(self, fd, callback, *args): + """Add a writer callback..""" + self._ensure_fd_no_transport(fd) + return self._add_writer(fd, callback, *args) + + def remove_writer(self, fd): + """Remove a writer callback.""" + self._ensure_fd_no_transport(fd) + return self._remove_writer(fd) + def sock_recv(self, sock, n): """Receive data from the socket. @@ -494,17 +524,17 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): fileobj, (reader, writer) = key.fileobj, key.data if mask & selectors.EVENT_READ and reader is not None: if reader._cancelled: - self.remove_reader(fileobj) + self._remove_reader(fileobj) else: self._add_callback(reader) if mask & selectors.EVENT_WRITE and writer is not None: if writer._cancelled: - self.remove_writer(fileobj) + self._remove_writer(fileobj) else: self._add_callback(writer) def _stop_serving(self, sock): - self.remove_reader(sock.fileno()) + self._remove_reader(sock.fileno()) sock.close() @@ -539,6 +569,7 @@ class _SelectorTransport(transports._FlowControlMixin, self._closing = False # Set when close() called. if self._server is not None: self._server._attach() + loop._transports[self._sock_fd] = self def __repr__(self): info = [self.__class__.__name__] @@ -584,10 +615,10 @@ class _SelectorTransport(transports._FlowControlMixin, if self._closing: return self._closing = True - self._loop.remove_reader(self._sock_fd) + self._loop._remove_reader(self._sock_fd) if not self._buffer: self._conn_lost += 1 - self._loop.remove_writer(self._sock_fd) + self._loop._remove_writer(self._sock_fd) self._loop.call_soon(self._call_connection_lost, None) # On Python 3.3 and older, objects with a destructor part of a reference @@ -618,10 +649,10 @@ class _SelectorTransport(transports._FlowControlMixin, return if self._buffer: self._buffer.clear() - self._loop.remove_writer(self._sock_fd) + self._loop._remove_writer(self._sock_fd) if not self._closing: self._closing = True - self._loop.remove_reader(self._sock_fd) + self._loop._remove_reader(self._sock_fd) self._conn_lost += 1 self._loop.call_soon(self._call_connection_lost, exc) @@ -658,7 +689,7 @@ class _SelectorSocketTransport(_SelectorTransport): self._loop.call_soon(self._protocol.connection_made, self) # only start reading when connection_made() has been called - self._loop.call_soon(self._loop.add_reader, + self._loop.call_soon(self._loop._add_reader, self._sock_fd, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called @@ -671,7 +702,7 @@ class _SelectorSocketTransport(_SelectorTransport): if self._paused: raise RuntimeError('Already paused') self._paused = True - self._loop.remove_reader(self._sock_fd) + self._loop._remove_reader(self._sock_fd) if self._loop.get_debug(): logger.debug("%r pauses reading", self) @@ -681,7 +712,7 @@ class _SelectorSocketTransport(_SelectorTransport): self._paused = False if self._closing: return - self._loop.add_reader(self._sock_fd, self._read_ready) + self._loop._add_reader(self._sock_fd, self._read_ready) if self._loop.get_debug(): logger.debug("%r resumes reading", self) @@ -705,7 +736,7 @@ class _SelectorSocketTransport(_SelectorTransport): # We're keeping the connection open so the # protocol can write more, but we still can't # receive more, so remove the reader callback. - self._loop.remove_reader(self._sock_fd) + self._loop._remove_reader(self._sock_fd) else: self.close() @@ -738,7 +769,7 @@ class _SelectorSocketTransport(_SelectorTransport): if not data: return # Not all was written; register write handler. - self._loop.add_writer(self._sock_fd, self._write_ready) + self._loop._add_writer(self._sock_fd, self._write_ready) # Add it to the buffer. self._buffer.extend(data) @@ -754,7 +785,7 @@ class _SelectorSocketTransport(_SelectorTransport): except (BlockingIOError, InterruptedError): pass except Exception as exc: - self._loop.remove_writer(self._sock_fd) + self._loop._remove_writer(self._sock_fd) self._buffer.clear() self._fatal_error(exc, 'Fatal write error on socket transport') else: @@ -762,7 +793,7 @@ class _SelectorSocketTransport(_SelectorTransport): del self._buffer[:n] self._maybe_resume_protocol() # May append to buffer. if not self._buffer: - self._loop.remove_writer(self._sock_fd) + self._loop._remove_writer(self._sock_fd) if self._closing: self._call_connection_lost(None) elif self._eof: @@ -833,19 +864,19 @@ class _SelectorSslTransport(_SelectorTransport): try: self._sock.do_handshake() except ssl.SSLWantReadError: - self._loop.add_reader(self._sock_fd, - self._on_handshake, start_time) + self._loop._add_reader(self._sock_fd, + self._on_handshake, start_time) return except ssl.SSLWantWriteError: - self._loop.add_writer(self._sock_fd, - self._on_handshake, start_time) + self._loop._add_writer(self._sock_fd, + self._on_handshake, start_time) return except BaseException as exc: if self._loop.get_debug(): logger.warning("%r: SSL handshake failed", self, exc_info=True) - self._loop.remove_reader(self._sock_fd) - self._loop.remove_writer(self._sock_fd) + self._loop._remove_reader(self._sock_fd) + self._loop._remove_writer(self._sock_fd) self._sock.close() self._wakeup_waiter(exc) if isinstance(exc, Exception): @@ -853,8 +884,8 @@ class _SelectorSslTransport(_SelectorTransport): else: raise - self._loop.remove_reader(self._sock_fd) - self._loop.remove_writer(self._sock_fd) + self._loop._remove_reader(self._sock_fd) + self._loop._remove_writer(self._sock_fd) peercert = self._sock.getpeercert() if not hasattr(self._sslcontext, 'check_hostname'): @@ -882,7 +913,7 @@ class _SelectorSslTransport(_SelectorTransport): self._read_wants_write = False self._write_wants_read = False - self._loop.add_reader(self._sock_fd, self._read_ready) + self._loop._add_reader(self._sock_fd, self._read_ready) self._protocol_connected = True self._loop.call_soon(self._protocol.connection_made, self) # only wake up the waiter when connection_made() has been called @@ -904,7 +935,7 @@ class _SelectorSslTransport(_SelectorTransport): if self._paused: raise RuntimeError('Already paused') self._paused = True - self._loop.remove_reader(self._sock_fd) + self._loop._remove_reader(self._sock_fd) if self._loop.get_debug(): logger.debug("%r pauses reading", self) @@ -914,7 +945,7 @@ class _SelectorSslTransport(_SelectorTransport): self._paused = False if self._closing: return - self._loop.add_reader(self._sock_fd, self._read_ready) + self._loop._add_reader(self._sock_fd, self._read_ready) if self._loop.get_debug(): logger.debug("%r resumes reading", self) @@ -926,7 +957,7 @@ class _SelectorSslTransport(_SelectorTransport): self._write_ready() if self._buffer: - self._loop.add_writer(self._sock_fd, self._write_ready) + self._loop._add_writer(self._sock_fd, self._write_ready) try: data = self._sock.recv(self.max_size) @@ -934,8 +965,8 @@ class _SelectorSslTransport(_SelectorTransport): pass except ssl.SSLWantWriteError: self._read_wants_write = True - self._loop.remove_reader(self._sock_fd) - self._loop.add_writer(self._sock_fd, self._write_ready) + self._loop._remove_reader(self._sock_fd) + self._loop._add_writer(self._sock_fd, self._write_ready) except Exception as exc: self._fatal_error(exc, 'Fatal read error on SSL transport') else: @@ -960,7 +991,7 @@ class _SelectorSslTransport(_SelectorTransport): self._read_ready() if not (self._paused or self._closing): - self._loop.add_reader(self._sock_fd, self._read_ready) + self._loop._add_reader(self._sock_fd, self._read_ready) if self._buffer: try: @@ -969,10 +1000,10 @@ class _SelectorSslTransport(_SelectorTransport): n = 0 except ssl.SSLWantReadError: n = 0 - self._loop.remove_writer(self._sock_fd) + self._loop._remove_writer(self._sock_fd) self._write_wants_read = True except Exception as exc: - self._loop.remove_writer(self._sock_fd) + self._loop._remove_writer(self._sock_fd) self._buffer.clear() self._fatal_error(exc, 'Fatal write error on SSL transport') return @@ -983,7 +1014,7 @@ class _SelectorSslTransport(_SelectorTransport): self._maybe_resume_protocol() # May append to buffer. if not self._buffer: - self._loop.remove_writer(self._sock_fd) + self._loop._remove_writer(self._sock_fd) if self._closing: self._call_connection_lost(None) @@ -1001,7 +1032,7 @@ class _SelectorSslTransport(_SelectorTransport): return if not self._buffer: - self._loop.add_writer(self._sock_fd, self._write_ready) + self._loop._add_writer(self._sock_fd, self._write_ready) # Add it to the buffer. self._buffer.extend(data) @@ -1021,7 +1052,7 @@ class _SelectorDatagramTransport(_SelectorTransport): self._address = address self._loop.call_soon(self._protocol.connection_made, self) # only start reading when connection_made() has been called - self._loop.call_soon(self._loop.add_reader, + self._loop.call_soon(self._loop._add_reader, self._sock_fd, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called @@ -1071,7 +1102,7 @@ class _SelectorDatagramTransport(_SelectorTransport): self._sock.sendto(data, addr) return except (BlockingIOError, InterruptedError): - self._loop.add_writer(self._sock_fd, self._sendto_ready) + self._loop._add_writer(self._sock_fd, self._sendto_ready) except OSError as exc: self._protocol.error_received(exc) return @@ -1105,6 +1136,6 @@ class _SelectorDatagramTransport(_SelectorTransport): self._maybe_resume_protocol() # May append to buffer. if not self._buffer: - self._loop.remove_writer(self._sock_fd) + self._loop._remove_writer(self._sock_fd) if self._closing: self._call_connection_lost(None) diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py index 396e6ae..307fffc 100644 --- a/Lib/asyncio/test_utils.py +++ b/Lib/asyncio/test_utils.py @@ -13,6 +13,8 @@ import tempfile import threading import time import unittest +import weakref + from unittest import mock from http.server import HTTPServer @@ -300,6 +302,8 @@ class TestLoop(base_events.BaseEventLoop): self.writers = {} self.reset_counters() + self._transports = weakref.WeakValueDictionary() + def time(self): return self._time @@ -318,10 +322,10 @@ class TestLoop(base_events.BaseEventLoop): else: # pragma: no cover raise AssertionError("Time generator is not finished") - def add_reader(self, fd, callback, *args): + def _add_reader(self, fd, callback, *args): self.readers[fd] = events.Handle(callback, args, self) - def remove_reader(self, fd): + def _remove_reader(self, fd): self.remove_reader_count[fd] += 1 if fd in self.readers: del self.readers[fd] @@ -337,10 +341,10 @@ class TestLoop(base_events.BaseEventLoop): assert handle._args == args, '{!r} != {!r}'.format( handle._args, args) - def add_writer(self, fd, callback, *args): + def _add_writer(self, fd, callback, *args): self.writers[fd] = events.Handle(callback, args, self) - def remove_writer(self, fd): + def _remove_writer(self, fd): self.remove_writer_count[fd] += 1 if fd in self.writers: del self.writers[fd] @@ -356,6 +360,36 @@ class TestLoop(base_events.BaseEventLoop): assert handle._args == args, '{!r} != {!r}'.format( handle._args, args) + def _ensure_fd_no_transport(self, fd): + try: + transport = self._transports[fd] + except KeyError: + pass + else: + raise RuntimeError( + 'File descriptor {!r} is used by transport {!r}'.format( + fd, transport)) + + def add_reader(self, fd, callback, *args): + """Add a reader callback.""" + self._ensure_fd_no_transport(fd) + return self._add_reader(fd, callback, *args) + + def remove_reader(self, fd): + """Remove a reader callback.""" + self._ensure_fd_no_transport(fd) + return self._remove_reader(fd) + + def add_writer(self, fd, callback, *args): + """Add a writer callback..""" + self._ensure_fd_no_transport(fd) + return self._add_writer(fd, callback, *args) + + def remove_writer(self, fd): + """Remove a writer callback.""" + self._ensure_fd_no_transport(fd) + return self._remove_writer(fd) + def reset_counters(self): self.remove_reader_count = collections.defaultdict(int) self.remove_writer_count = collections.defaultdict(int) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index ac44218..e1f5c52 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -321,7 +321,7 @@ class _UnixReadPipeTransport(transports.ReadTransport): self._loop.call_soon(self._protocol.connection_made, self) # only start reading when connection_made() has been called - self._loop.call_soon(self._loop.add_reader, + self._loop.call_soon(self._loop._add_reader, self._fileno, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called @@ -364,15 +364,15 @@ class _UnixReadPipeTransport(transports.ReadTransport): if self._loop.get_debug(): logger.info("%r was closed by peer", self) self._closing = True - self._loop.remove_reader(self._fileno) + self._loop._remove_reader(self._fileno) self._loop.call_soon(self._protocol.eof_received) self._loop.call_soon(self._call_connection_lost, None) def pause_reading(self): - self._loop.remove_reader(self._fileno) + self._loop._remove_reader(self._fileno) def resume_reading(self): - self._loop.add_reader(self._fileno, self._read_ready) + self._loop._add_reader(self._fileno, self._read_ready) def set_protocol(self, protocol): self._protocol = protocol @@ -412,7 +412,7 @@ class _UnixReadPipeTransport(transports.ReadTransport): def _close(self, exc): self._closing = True - self._loop.remove_reader(self._fileno) + self._loop._remove_reader(self._fileno) self._loop.call_soon(self._call_connection_lost, exc) def _call_connection_lost(self, exc): @@ -457,7 +457,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) if is_socket or (is_fifo and not sys.platform.startswith("aix")): # only start reading when connection_made() has been called - self._loop.call_soon(self._loop.add_reader, + self._loop.call_soon(self._loop._add_reader, self._fileno, self._read_ready) if waiter is not None: @@ -530,7 +530,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, return elif n > 0: data = memoryview(data)[n:] - self._loop.add_writer(self._fileno, self._write_ready) + self._loop._add_writer(self._fileno, self._write_ready) self._buffer += data self._maybe_pause_protocol() @@ -547,15 +547,15 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, self._conn_lost += 1 # Remove writer here, _fatal_error() doesn't it # because _buffer is empty. - self._loop.remove_writer(self._fileno) + self._loop._remove_writer(self._fileno) self._fatal_error(exc, 'Fatal write error on pipe transport') else: if n == len(self._buffer): self._buffer.clear() - self._loop.remove_writer(self._fileno) + self._loop._remove_writer(self._fileno) self._maybe_resume_protocol() # May append to buffer. if self._closing: - self._loop.remove_reader(self._fileno) + self._loop._remove_reader(self._fileno) self._call_connection_lost(None) return elif n > 0: @@ -570,7 +570,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, assert self._pipe self._closing = True if not self._buffer: - self._loop.remove_reader(self._fileno) + self._loop._remove_reader(self._fileno) self._loop.call_soon(self._call_connection_lost, None) def set_protocol(self, protocol): @@ -616,9 +616,9 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, def _close(self, exc=None): self._closing = True if self._buffer: - self._loop.remove_writer(self._fileno) + self._loop._remove_writer(self._fileno) self._buffer.clear() - self._loop.remove_reader(self._fileno) + self._loop._remove_reader(self._fileno) self._loop.call_soon(self._call_connection_lost, exc) def _call_connection_lost(self, exc): diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index e86b74e..362ab06 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -1148,10 +1148,10 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): m_socket.getaddrinfo = socket.getaddrinfo sock = m_socket.socket.return_value - self.loop.add_reader = mock.Mock() - self.loop.add_reader._is_coroutine = False - self.loop.add_writer = mock.Mock() - self.loop.add_writer._is_coroutine = False + self.loop._add_reader = mock.Mock() + self.loop._add_reader._is_coroutine = False + self.loop._add_writer = mock.Mock() + self.loop._add_writer._is_coroutine = False coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80) t, p = self.loop.run_until_complete(coro) @@ -1194,10 +1194,10 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): m_socket.getaddrinfo = socket.getaddrinfo sock = m_socket.socket.return_value - self.loop.add_reader = mock.Mock() - self.loop.add_reader._is_coroutine = False - self.loop.add_writer = mock.Mock() - self.loop.add_writer._is_coroutine = False + self.loop._add_reader = mock.Mock() + self.loop._add_reader._is_coroutine = False + self.loop._add_writer = mock.Mock() + self.loop._add_writer._is_coroutine = False for service, port in ('http', 80), (b'http', 80): coro = self.loop.create_connection(asyncio.Protocol, @@ -1614,8 +1614,8 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): m_socket.getaddrinfo = getaddrinfo m_socket.socket.return_value.bind = bind = mock.Mock() - self.loop.add_reader = mock.Mock() - self.loop.add_reader._is_coroutine = False + self.loop._add_reader = mock.Mock() + self.loop._add_reader._is_coroutine = False reuseport_supported = hasattr(socket, 'SO_REUSEPORT') coro = self.loop.create_datagram_endpoint( @@ -1646,13 +1646,13 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): sock = mock.Mock() sock.fileno.return_value = 10 sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files') - self.loop.remove_reader = mock.Mock() + self.loop._remove_reader = mock.Mock() self.loop.call_later = mock.Mock() self.loop._accept_connection(MyProto, sock) self.assertTrue(m_log.error.called) self.assertFalse(sock.close.called) - self.loop.remove_reader.assert_called_with(10) + self.loop._remove_reader.assert_called_with(10) self.loop.call_later.assert_called_with(constants.ACCEPT_RETRY_DELAY, # self.loop._start_serving mock.ANY, diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 8b621bf..07de640 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -72,11 +72,11 @@ class BaseSelectorEventLoopTests(test_utils.TestCase): @unittest.skipIf(ssl is None, 'No ssl module') def test_make_ssl_transport(self): m = mock.Mock() - self.loop.add_reader = mock.Mock() - self.loop.add_reader._is_coroutine = False - self.loop.add_writer = mock.Mock() - self.loop.remove_reader = mock.Mock() - self.loop.remove_writer = mock.Mock() + self.loop._add_reader = mock.Mock() + self.loop._add_reader._is_coroutine = False + self.loop._add_writer = mock.Mock() + self.loop._remove_reader = mock.Mock() + self.loop._remove_writer = mock.Mock() waiter = asyncio.Future(loop=self.loop) with test_utils.disable_logger(): transport = self.loop._make_ssl_transport( @@ -119,7 +119,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase): ssock.fileno.return_value = 7 csock = self.loop._csock csock.fileno.return_value = 1 - remove_reader = self.loop.remove_reader = mock.Mock() + remove_reader = self.loop._remove_reader = mock.Mock() self.loop._selector.close() self.loop._selector = selector = mock.Mock() @@ -651,12 +651,12 @@ class BaseSelectorEventLoopTests(test_utils.TestCase): reader = mock.Mock() reader.cancelled = True - self.loop.remove_reader = mock.Mock() + self.loop._remove_reader = mock.Mock() self.loop._process_events( [(selectors.SelectorKey( 1, 1, selectors.EVENT_READ, (reader, None)), selectors.EVENT_READ)]) - self.loop.remove_reader.assert_called_with(1) + self.loop._remove_reader.assert_called_with(1) def test_process_events_write(self): writer = mock.Mock() @@ -672,13 +672,13 @@ class BaseSelectorEventLoopTests(test_utils.TestCase): def test_process_events_write_cancelled(self): writer = mock.Mock() writer.cancelled = True - self.loop.remove_writer = mock.Mock() + self.loop._remove_writer = mock.Mock() self.loop._process_events( [(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE, (None, writer)), selectors.EVENT_WRITE)]) - self.loop.remove_writer.assert_called_with(1) + self.loop._remove_writer.assert_called_with(1) def test_accept_connection_multiple(self): sock = mock.Mock() @@ -747,8 +747,8 @@ class SelectorTransportTests(test_utils.TestCase): def test_force_close(self): tr = self.create_transport() tr._buffer.extend(b'1') - self.loop.add_reader(7, mock.sentinel) - self.loop.add_writer(7, mock.sentinel) + self.loop._add_reader(7, mock.sentinel) + self.loop._add_writer(7, mock.sentinel) tr._force_close(None) self.assertTrue(tr.is_closing()) @@ -1037,7 +1037,7 @@ class SelectorSocketTransportTests(test_utils.TestCase): transport = self.socket_transport() transport._buffer.extend(data) - self.loop.add_writer(7, transport._write_ready) + self.loop._add_writer(7, transport._write_ready) transport._write_ready() self.assertTrue(self.sock.send.called) self.assertFalse(self.loop.writers) @@ -1049,7 +1049,7 @@ class SelectorSocketTransportTests(test_utils.TestCase): transport = self.socket_transport() transport._closing = True transport._buffer.extend(data) - self.loop.add_writer(7, transport._write_ready) + self.loop._add_writer(7, transport._write_ready) transport._write_ready() self.assertTrue(self.sock.send.called) self.assertFalse(self.loop.writers) @@ -1067,7 +1067,7 @@ class SelectorSocketTransportTests(test_utils.TestCase): transport = self.socket_transport() transport._buffer.extend(data) - self.loop.add_writer(7, transport._write_ready) + self.loop._add_writer(7, transport._write_ready) transport._write_ready() self.loop.assert_writer(7, transport._write_ready) self.assertEqual(list_to_buffer([b'ta']), transport._buffer) @@ -1078,7 +1078,7 @@ class SelectorSocketTransportTests(test_utils.TestCase): transport = self.socket_transport() transport._buffer.extend(data) - self.loop.add_writer(7, transport._write_ready) + self.loop._add_writer(7, transport._write_ready) transport._write_ready() self.loop.assert_writer(7, transport._write_ready) self.assertEqual(list_to_buffer([b'data']), transport._buffer) @@ -1088,7 +1088,7 @@ class SelectorSocketTransportTests(test_utils.TestCase): transport = self.socket_transport() transport._buffer = list_to_buffer([b'data1', b'data2']) - self.loop.add_writer(7, transport._write_ready) + self.loop._add_writer(7, transport._write_ready) transport._write_ready() self.loop.assert_writer(7, transport._write_ready) @@ -1130,7 +1130,7 @@ class SelectorSocketTransportTests(test_utils.TestCase): @mock.patch('asyncio.base_events.logger') def test_transport_close_remove_writer(self, m_log): - remove_writer = self.loop.remove_writer = mock.Mock() + remove_writer = self.loop._remove_writer = mock.Mock() transport = self.socket_transport() transport.close() @@ -1288,7 +1288,7 @@ class SelectorSslTransportTests(test_utils.TestCase): self.assertEqual((b'data',), self.protocol.data_received.call_args[0]) def test_read_ready_write_wants_read(self): - self.loop.add_writer = mock.Mock() + self.loop._add_writer = mock.Mock() self.sslsock.recv.side_effect = BlockingIOError transport = self._make_one() transport._write_wants_read = True @@ -1298,7 +1298,7 @@ class SelectorSslTransportTests(test_utils.TestCase): self.assertFalse(transport._write_wants_read) transport._write_ready.assert_called_with() - self.loop.add_writer.assert_called_with( + self.loop._add_writer.assert_called_with( transport._sock_fd, transport._write_ready) def test_read_ready_recv_eof(self): @@ -1333,16 +1333,16 @@ class SelectorSslTransportTests(test_utils.TestCase): self.assertFalse(self.protocol.data_received.called) def test_read_ready_recv_write(self): - self.loop.remove_reader = mock.Mock() - self.loop.add_writer = mock.Mock() + self.loop._remove_reader = mock.Mock() + self.loop._add_writer = mock.Mock() self.sslsock.recv.side_effect = ssl.SSLWantWriteError transport = self._make_one() transport._read_ready() self.assertFalse(self.protocol.data_received.called) self.assertTrue(transport._read_wants_write) - self.loop.remove_reader.assert_called_with(transport._sock_fd) - self.loop.add_writer.assert_called_with( + self.loop._remove_reader.assert_called_with(transport._sock_fd) + self.loop._add_writer.assert_called_with( transport._sock_fd, transport._write_ready) def test_read_ready_recv_exc(self): @@ -1419,12 +1419,12 @@ class SelectorSslTransportTests(test_utils.TestCase): transport = self._make_one() transport._buffer = list_to_buffer([b'data']) - self.loop.remove_writer = mock.Mock() + self.loop._remove_writer = mock.Mock() self.sslsock.send.side_effect = ssl.SSLWantReadError transport._write_ready() self.assertFalse(self.protocol.data_received.called) self.assertTrue(transport._write_wants_read) - self.loop.remove_writer.assert_called_with(transport._sock_fd) + self.loop._remove_writer.assert_called_with(transport._sock_fd) def test_write_ready_send_exc(self): err = self.sslsock.send.side_effect = OSError() @@ -1439,7 +1439,7 @@ class SelectorSslTransportTests(test_utils.TestCase): self.assertEqual(list_to_buffer(), transport._buffer) def test_write_ready_read_wants_write(self): - self.loop.add_reader = mock.Mock() + self.loop._add_reader = mock.Mock() self.sslsock.send.side_effect = BlockingIOError transport = self._make_one() transport._read_wants_write = True @@ -1448,7 +1448,7 @@ class SelectorSslTransportTests(test_utils.TestCase): self.assertFalse(transport._read_wants_write) transport._read_ready.assert_called_with() - self.loop.add_reader.assert_called_with( + self.loop._add_reader.assert_called_with( transport._sock_fd, transport._read_ready) def test_write_eof(self): @@ -1699,7 +1699,7 @@ class SelectorDatagramTransportTests(test_utils.TestCase): transport = self.datagram_transport() transport._buffer.append((data, ('0.0.0.0', 12345))) - self.loop.add_writer(7, transport._sendto_ready) + self.loop._add_writer(7, transport._sendto_ready) transport._sendto_ready() self.assertTrue(self.sock.sendto.called) self.assertEqual( @@ -1713,7 +1713,7 @@ class SelectorDatagramTransportTests(test_utils.TestCase): transport = self.datagram_transport() transport._closing = True transport._buffer.append((data, ())) - self.loop.add_writer(7, transport._sendto_ready) + self.loop._add_writer(7, transport._sendto_ready) transport._sendto_ready() self.sock.sendto.assert_called_with(data, ()) self.assertFalse(self.loop.writers) @@ -1722,7 +1722,7 @@ class SelectorDatagramTransportTests(test_utils.TestCase): def test_sendto_ready_no_data(self): transport = self.datagram_transport() - self.loop.add_writer(7, transport._sendto_ready) + self.loop._add_writer(7, transport._sendto_ready) transport._sendto_ready() self.assertFalse(self.sock.sendto.called) self.assertFalse(self.loop.writers) @@ -1732,7 +1732,7 @@ class SelectorDatagramTransportTests(test_utils.TestCase): transport = self.datagram_transport() transport._buffer.extend([(b'data1', ()), (b'data2', ())]) - self.loop.add_writer(7, transport._sendto_ready) + self.loop._add_writer(7, transport._sendto_ready) transport._sendto_ready() self.loop.assert_writer(7, transport._sendto_ready) diff --git a/Misc/NEWS b/Misc/NEWS index 92ec745..8ea2ce2 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -350,6 +350,9 @@ Library no loop attached. Patch by Vincent Michel. +- Issue #28369: Raise RuntimeError when transport's FD is used with + add_reader, add_writer, etc. + IDLE ---- -- cgit v0.12