summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/selector_events.py127
-rw-r--r--Lib/asyncio/test_utils.py42
-rw-r--r--Lib/asyncio/unix_events.py26
3 files changed, 130 insertions, 65 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 1e16ac6..9dbe550 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -11,6 +11,7 @@ import errno
import functools
import socket
import warnings
+import weakref
try:
import ssl
except ImportError: # pragma: no cover
@@ -64,6 +65,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
logger.debug('Using selector: %s', selector.__class__.__name__)
self._selector = selector
self._make_self_pipe()
+ self._transports = weakref.WeakValueDictionary()
def _make_socket_transport(self, sock, protocol, waiter=None, *,
extra=None, server=None):
@@ -115,7 +117,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
raise NotImplementedError
def _close_self_pipe(self):
- self.remove_reader(self._ssock.fileno())
+ self._remove_reader(self._ssock.fileno())
self._ssock.close()
self._ssock = None
self._csock.close()
@@ -128,7 +130,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
self._ssock.setblocking(False)
self._csock.setblocking(False)
self._internal_fds += 1
- self.add_reader(self._ssock.fileno(), self._read_from_self)
+ self._add_reader(self._ssock.fileno(), self._read_from_self)
def _process_self_data(self, data):
pass
@@ -163,8 +165,8 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
def _start_serving(self, protocol_factory, sock,
sslcontext=None, server=None, backlog=100):
- self.add_reader(sock.fileno(), self._accept_connection,
- protocol_factory, sock, sslcontext, server, backlog)
+ self._add_reader(sock.fileno(), self._accept_connection,
+ protocol_factory, sock, sslcontext, server, backlog)
def _accept_connection(self, protocol_factory, sock,
sslcontext=None, server=None, backlog=100):
@@ -194,7 +196,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
'exception': exc,
'socket': sock,
})
- self.remove_reader(sock.fileno())
+ self._remove_reader(sock.fileno())
self.call_later(constants.ACCEPT_RETRY_DELAY,
self._start_serving,
protocol_factory, sock, sslcontext, server,
@@ -244,8 +246,18 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
context['transport'] = transport
self.call_exception_handler(context)
- def add_reader(self, fd, callback, *args):
- """Add a reader callback."""
+ def _ensure_fd_no_transport(self, fd):
+ try:
+ transport = self._transports[fd]
+ except KeyError:
+ pass
+ else:
+ if not transport.is_closing():
+ raise RuntimeError(
+ 'File descriptor {!r} is used by transport {!r}'.format(
+ fd, transport))
+
+ def _add_reader(self, fd, callback, *args):
self._check_closed()
handle = events.Handle(callback, args, self)
try:
@@ -260,8 +272,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
if reader is not None:
reader.cancel()
- def remove_reader(self, fd):
- """Remove a reader callback."""
+ def _remove_reader(self, fd):
if self.is_closed():
return False
try:
@@ -282,8 +293,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
else:
return False
- def add_writer(self, fd, callback, *args):
- """Add a writer callback.."""
+ def _add_writer(self, fd, callback, *args):
self._check_closed()
handle = events.Handle(callback, args, self)
try:
@@ -298,7 +308,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
if writer is not None:
writer.cancel()
- def remove_writer(self, fd):
+ def _remove_writer(self, fd):
"""Remove a writer callback."""
if self.is_closed():
return False
@@ -321,6 +331,26 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
else:
return False
+ def add_reader(self, fd, callback, *args):
+ """Add a reader callback."""
+ self._ensure_fd_no_transport(fd)
+ return self._add_reader(fd, callback, *args)
+
+ def remove_reader(self, fd):
+ """Remove a reader callback."""
+ self._ensure_fd_no_transport(fd)
+ return self._remove_reader(fd)
+
+ def add_writer(self, fd, callback, *args):
+ """Add a writer callback.."""
+ self._ensure_fd_no_transport(fd)
+ return self._add_writer(fd, callback, *args)
+
+ def remove_writer(self, fd):
+ """Remove a writer callback."""
+ self._ensure_fd_no_transport(fd)
+ return self._remove_writer(fd)
+
def sock_recv(self, sock, n):
"""Receive data from the socket.
@@ -494,17 +524,17 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
fileobj, (reader, writer) = key.fileobj, key.data
if mask & selectors.EVENT_READ and reader is not None:
if reader._cancelled:
- self.remove_reader(fileobj)
+ self._remove_reader(fileobj)
else:
self._add_callback(reader)
if mask & selectors.EVENT_WRITE and writer is not None:
if writer._cancelled:
- self.remove_writer(fileobj)
+ self._remove_writer(fileobj)
else:
self._add_callback(writer)
def _stop_serving(self, sock):
- self.remove_reader(sock.fileno())
+ self._remove_reader(sock.fileno())
sock.close()
@@ -539,6 +569,7 @@ class _SelectorTransport(transports._FlowControlMixin,
self._closing = False # Set when close() called.
if self._server is not None:
self._server._attach()
+ loop._transports[self._sock_fd] = self
def __repr__(self):
info = [self.__class__.__name__]
@@ -584,10 +615,10 @@ class _SelectorTransport(transports._FlowControlMixin,
if self._closing:
return
self._closing = True
- self._loop.remove_reader(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
if not self._buffer:
self._conn_lost += 1
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
self._loop.call_soon(self._call_connection_lost, None)
# On Python 3.3 and older, objects with a destructor part of a reference
@@ -619,10 +650,10 @@ class _SelectorTransport(transports._FlowControlMixin,
return
if self._buffer:
self._buffer.clear()
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
if not self._closing:
self._closing = True
- self._loop.remove_reader(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
self._conn_lost += 1
self._loop.call_soon(self._call_connection_lost, exc)
@@ -659,7 +690,7 @@ class _SelectorSocketTransport(_SelectorTransport):
self._loop.call_soon(self._protocol.connection_made, self)
# only start reading when connection_made() has been called
- self._loop.call_soon(self._loop.add_reader,
+ self._loop.call_soon(self._loop._add_reader,
self._sock_fd, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
@@ -672,7 +703,7 @@ class _SelectorSocketTransport(_SelectorTransport):
if self._paused:
raise RuntimeError('Already paused')
self._paused = True
- self._loop.remove_reader(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
@@ -682,7 +713,7 @@ class _SelectorSocketTransport(_SelectorTransport):
self._paused = False
if self._closing:
return
- self._loop.add_reader(self._sock_fd, self._read_ready)
+ self._loop._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
@@ -706,7 +737,7 @@ class _SelectorSocketTransport(_SelectorTransport):
# We're keeping the connection open so the
# protocol can write more, but we still can't
# receive more, so remove the reader callback.
- self._loop.remove_reader(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
else:
self.close()
@@ -739,7 +770,7 @@ class _SelectorSocketTransport(_SelectorTransport):
if not data:
return
# Not all was written; register write handler.
- self._loop.add_writer(self._sock_fd, self._write_ready)
+ self._loop._add_writer(self._sock_fd, self._write_ready)
# Add it to the buffer.
self._buffer.extend(data)
@@ -755,7 +786,7 @@ class _SelectorSocketTransport(_SelectorTransport):
except (BlockingIOError, InterruptedError):
pass
except Exception as exc:
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on socket transport')
else:
@@ -763,7 +794,7 @@ class _SelectorSocketTransport(_SelectorTransport):
del self._buffer[:n]
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
elif self._eof:
@@ -834,19 +865,19 @@ class _SelectorSslTransport(_SelectorTransport):
try:
self._sock.do_handshake()
except ssl.SSLWantReadError:
- self._loop.add_reader(self._sock_fd,
- self._on_handshake, start_time)
+ self._loop._add_reader(self._sock_fd,
+ self._on_handshake, start_time)
return
except ssl.SSLWantWriteError:
- self._loop.add_writer(self._sock_fd,
- self._on_handshake, start_time)
+ self._loop._add_writer(self._sock_fd,
+ self._on_handshake, start_time)
return
except BaseException as exc:
if self._loop.get_debug():
logger.warning("%r: SSL handshake failed",
self, exc_info=True)
- self._loop.remove_reader(self._sock_fd)
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
self._sock.close()
self._wakeup_waiter(exc)
if isinstance(exc, Exception):
@@ -854,8 +885,8 @@ class _SelectorSslTransport(_SelectorTransport):
else:
raise
- self._loop.remove_reader(self._sock_fd)
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
peercert = self._sock.getpeercert()
if not hasattr(self._sslcontext, 'check_hostname'):
@@ -883,7 +914,7 @@ class _SelectorSslTransport(_SelectorTransport):
self._read_wants_write = False
self._write_wants_read = False
- self._loop.add_reader(self._sock_fd, self._read_ready)
+ self._loop._add_reader(self._sock_fd, self._read_ready)
self._protocol_connected = True
self._loop.call_soon(self._protocol.connection_made, self)
# only wake up the waiter when connection_made() has been called
@@ -905,7 +936,7 @@ class _SelectorSslTransport(_SelectorTransport):
if self._paused:
raise RuntimeError('Already paused')
self._paused = True
- self._loop.remove_reader(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
@@ -915,7 +946,7 @@ class _SelectorSslTransport(_SelectorTransport):
self._paused = False
if self._closing:
return
- self._loop.add_reader(self._sock_fd, self._read_ready)
+ self._loop._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
@@ -927,7 +958,7 @@ class _SelectorSslTransport(_SelectorTransport):
self._write_ready()
if self._buffer:
- self._loop.add_writer(self._sock_fd, self._write_ready)
+ self._loop._add_writer(self._sock_fd, self._write_ready)
try:
data = self._sock.recv(self.max_size)
@@ -935,8 +966,8 @@ class _SelectorSslTransport(_SelectorTransport):
pass
except ssl.SSLWantWriteError:
self._read_wants_write = True
- self._loop.remove_reader(self._sock_fd)
- self._loop.add_writer(self._sock_fd, self._write_ready)
+ self._loop._remove_reader(self._sock_fd)
+ self._loop._add_writer(self._sock_fd, self._write_ready)
except Exception as exc:
self._fatal_error(exc, 'Fatal read error on SSL transport')
else:
@@ -961,7 +992,7 @@ class _SelectorSslTransport(_SelectorTransport):
self._read_ready()
if not (self._paused or self._closing):
- self._loop.add_reader(self._sock_fd, self._read_ready)
+ self._loop._add_reader(self._sock_fd, self._read_ready)
if self._buffer:
try:
@@ -970,10 +1001,10 @@ class _SelectorSslTransport(_SelectorTransport):
n = 0
except ssl.SSLWantReadError:
n = 0
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
self._write_wants_read = True
except Exception as exc:
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on SSL transport')
return
@@ -984,7 +1015,7 @@ class _SelectorSslTransport(_SelectorTransport):
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
@@ -1002,7 +1033,7 @@ class _SelectorSslTransport(_SelectorTransport):
return
if not self._buffer:
- self._loop.add_writer(self._sock_fd, self._write_ready)
+ self._loop._add_writer(self._sock_fd, self._write_ready)
# Add it to the buffer.
self._buffer.extend(data)
@@ -1022,7 +1053,7 @@ class _SelectorDatagramTransport(_SelectorTransport):
self._address = address
self._loop.call_soon(self._protocol.connection_made, self)
# only start reading when connection_made() has been called
- self._loop.call_soon(self._loop.add_reader,
+ self._loop.call_soon(self._loop._add_reader,
self._sock_fd, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
@@ -1072,7 +1103,7 @@ class _SelectorDatagramTransport(_SelectorTransport):
self._sock.sendto(data, addr)
return
except (BlockingIOError, InterruptedError):
- self._loop.add_writer(self._sock_fd, self._sendto_ready)
+ self._loop._add_writer(self._sock_fd, self._sendto_ready)
except OSError as exc:
self._protocol.error_received(exc)
return
@@ -1106,6 +1137,6 @@ class _SelectorDatagramTransport(_SelectorTransport):
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py
index ac8a8ef..fdd3ba0 100644
--- a/Lib/asyncio/test_utils.py
+++ b/Lib/asyncio/test_utils.py
@@ -13,6 +13,8 @@ import tempfile
import threading
import time
import unittest
+import weakref
+
from unittest import mock
from http.server import HTTPServer
@@ -300,6 +302,8 @@ class TestLoop(base_events.BaseEventLoop):
self.writers = {}
self.reset_counters()
+ self._transports = weakref.WeakValueDictionary()
+
def time(self):
return self._time
@@ -318,10 +322,10 @@ class TestLoop(base_events.BaseEventLoop):
else: # pragma: no cover
raise AssertionError("Time generator is not finished")
- def add_reader(self, fd, callback, *args):
+ def _add_reader(self, fd, callback, *args):
self.readers[fd] = events.Handle(callback, args, self)
- def remove_reader(self, fd):
+ def _remove_reader(self, fd):
self.remove_reader_count[fd] += 1
if fd in self.readers:
del self.readers[fd]
@@ -337,10 +341,10 @@ class TestLoop(base_events.BaseEventLoop):
assert handle._args == args, '{!r} != {!r}'.format(
handle._args, args)
- def add_writer(self, fd, callback, *args):
+ def _add_writer(self, fd, callback, *args):
self.writers[fd] = events.Handle(callback, args, self)
- def remove_writer(self, fd):
+ def _remove_writer(self, fd):
self.remove_writer_count[fd] += 1
if fd in self.writers:
del self.writers[fd]
@@ -356,6 +360,36 @@ class TestLoop(base_events.BaseEventLoop):
assert handle._args == args, '{!r} != {!r}'.format(
handle._args, args)
+ def _ensure_fd_no_transport(self, fd):
+ try:
+ transport = self._transports[fd]
+ except KeyError:
+ pass
+ else:
+ raise RuntimeError(
+ 'File descriptor {!r} is used by transport {!r}'.format(
+ fd, transport))
+
+ def add_reader(self, fd, callback, *args):
+ """Add a reader callback."""
+ self._ensure_fd_no_transport(fd)
+ return self._add_reader(fd, callback, *args)
+
+ def remove_reader(self, fd):
+ """Remove a reader callback."""
+ self._ensure_fd_no_transport(fd)
+ return self._remove_reader(fd)
+
+ def add_writer(self, fd, callback, *args):
+ """Add a writer callback.."""
+ self._ensure_fd_no_transport(fd)
+ return self._add_writer(fd, callback, *args)
+
+ def remove_writer(self, fd):
+ """Remove a writer callback."""
+ self._ensure_fd_no_transport(fd)
+ return self._remove_writer(fd)
+
def reset_counters(self):
self.remove_reader_count = collections.defaultdict(int)
self.remove_writer_count = collections.defaultdict(int)
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index 972a42d..b227ee0 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -321,7 +321,7 @@ class _UnixReadPipeTransport(transports.ReadTransport):
self._loop.call_soon(self._protocol.connection_made, self)
# only start reading when connection_made() has been called
- self._loop.call_soon(self._loop.add_reader,
+ self._loop.call_soon(self._loop._add_reader,
self._fileno, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
@@ -364,15 +364,15 @@ class _UnixReadPipeTransport(transports.ReadTransport):
if self._loop.get_debug():
logger.info("%r was closed by peer", self)
self._closing = True
- self._loop.remove_reader(self._fileno)
+ self._loop._remove_reader(self._fileno)
self._loop.call_soon(self._protocol.eof_received)
self._loop.call_soon(self._call_connection_lost, None)
def pause_reading(self):
- self._loop.remove_reader(self._fileno)
+ self._loop._remove_reader(self._fileno)
def resume_reading(self):
- self._loop.add_reader(self._fileno, self._read_ready)
+ self._loop._add_reader(self._fileno, self._read_ready)
def set_protocol(self, protocol):
self._protocol = protocol
@@ -413,7 +413,7 @@ class _UnixReadPipeTransport(transports.ReadTransport):
def _close(self, exc):
self._closing = True
- self._loop.remove_reader(self._fileno)
+ self._loop._remove_reader(self._fileno)
self._loop.call_soon(self._call_connection_lost, exc)
def _call_connection_lost(self, exc):
@@ -458,7 +458,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
# works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
if is_socket or (is_fifo and not sys.platform.startswith("aix")):
# only start reading when connection_made() has been called
- self._loop.call_soon(self._loop.add_reader,
+ self._loop.call_soon(self._loop._add_reader,
self._fileno, self._read_ready)
if waiter is not None:
@@ -531,7 +531,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
return
elif n > 0:
data = memoryview(data)[n:]
- self._loop.add_writer(self._fileno, self._write_ready)
+ self._loop._add_writer(self._fileno, self._write_ready)
self._buffer += data
self._maybe_pause_protocol()
@@ -548,15 +548,15 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
self._conn_lost += 1
# Remove writer here, _fatal_error() doesn't it
# because _buffer is empty.
- self._loop.remove_writer(self._fileno)
+ self._loop._remove_writer(self._fileno)
self._fatal_error(exc, 'Fatal write error on pipe transport')
else:
if n == len(self._buffer):
self._buffer.clear()
- self._loop.remove_writer(self._fileno)
+ self._loop._remove_writer(self._fileno)
self._maybe_resume_protocol() # May append to buffer.
if self._closing:
- self._loop.remove_reader(self._fileno)
+ self._loop._remove_reader(self._fileno)
self._call_connection_lost(None)
return
elif n > 0:
@@ -571,7 +571,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
assert self._pipe
self._closing = True
if not self._buffer:
- self._loop.remove_reader(self._fileno)
+ self._loop._remove_reader(self._fileno)
self._loop.call_soon(self._call_connection_lost, None)
def set_protocol(self, protocol):
@@ -618,9 +618,9 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
def _close(self, exc=None):
self._closing = True
if self._buffer:
- self._loop.remove_writer(self._fileno)
+ self._loop._remove_writer(self._fileno)
self._buffer.clear()
- self._loop.remove_reader(self._fileno)
+ self._loop._remove_reader(self._fileno)
self._loop.call_soon(self._call_connection_lost, exc)
def _call_connection_lost(self, exc):