diff options
author | Yury Selivanov <yury@magic.io> | 2016-09-15 18:13:15 (GMT) |
---|---|---|
committer | Yury Selivanov <yury@magic.io> | 2016-09-15 18:13:15 (GMT) |
commit | a1b0e7db7315ff0d8d0f8edc056f387f198cf5a1 (patch) | |
tree | fec197b937b7573fb1558aa9ce4e6b0d75557f24 /Lib | |
parent | 4357cf62028964eb1a56c503ec1296de3034b77b (diff) | |
download | cpython-a1b0e7db7315ff0d8d0f8edc056f387f198cf5a1.zip cpython-a1b0e7db7315ff0d8d0f8edc056f387f198cf5a1.tar.gz cpython-a1b0e7db7315ff0d8d0f8edc056f387f198cf5a1.tar.bz2 |
Issue #27906: Fix socket accept exhaustion during high TCP traffic.
Patch by Kevin Conway.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/asyncio/base_events.py | 2 | ||||
-rw-r--r-- | Lib/asyncio/proactor_events.py | 2 | ||||
-rw-r--r-- | Lib/asyncio/selector_events.py | 73 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_base_events.py | 2 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_selector_events.py | 14 |
5 files changed, 57 insertions, 36 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index bc3e012..8d926dc 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -1034,7 +1034,7 @@ class BaseEventLoop(events.AbstractEventLoop): for sock in sockets: sock.listen(backlog) sock.setblocking(False) - self._start_serving(protocol_factory, sock, ssl, server) + self._start_serving(protocol_factory, sock, ssl, server, backlog) if self._debug: logger.info("%r is serving", server) return server diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 97ab487..fef3205 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -494,7 +494,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): self._csock.send(b'\0') def _start_serving(self, protocol_factory, sock, - sslcontext=None, server=None): + sslcontext=None, server=None, backlog=100): def loop(f=None): try: diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index c91ab04..c18885e 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -162,43 +162,50 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): exc_info=True) def _start_serving(self, protocol_factory, sock, - sslcontext=None, server=None): + sslcontext=None, server=None, backlog=100): self.add_reader(sock.fileno(), self._accept_connection, - protocol_factory, sock, sslcontext, server) + protocol_factory, sock, sslcontext, server, backlog) def _accept_connection(self, protocol_factory, sock, - sslcontext=None, server=None): - try: - conn, addr = sock.accept() - if self._debug: - logger.debug("%r got a new connection from %r: %r", - server, addr, conn) - conn.setblocking(False) - except (BlockingIOError, InterruptedError, ConnectionAbortedError): - pass # False alarm. - except OSError as exc: - # There's nowhere to send the error, so just log it. - if exc.errno in (errno.EMFILE, errno.ENFILE, - errno.ENOBUFS, errno.ENOMEM): - # Some platforms (e.g. Linux keep reporting the FD as - # ready, so we remove the read handler temporarily. - # We'll try again in a while. - self.call_exception_handler({ - 'message': 'socket.accept() out of system resource', - 'exception': exc, - 'socket': sock, - }) - self.remove_reader(sock.fileno()) - self.call_later(constants.ACCEPT_RETRY_DELAY, - self._start_serving, - protocol_factory, sock, sslcontext, server) + sslcontext=None, server=None, backlog=100): + # This method is only called once for each event loop tick where the + # listening socket has triggered an EVENT_READ. There may be multiple + # connections waiting for an .accept() so it is called in a loop. + # See https://bugs.python.org/issue27906 for more details. + for _ in range(backlog): + try: + conn, addr = sock.accept() + if self._debug: + logger.debug("%r got a new connection from %r: %r", + server, addr, conn) + conn.setblocking(False) + except (BlockingIOError, InterruptedError, ConnectionAbortedError): + # Early exit because the socket accept buffer is empty. + return None + except OSError as exc: + # There's nowhere to send the error, so just log it. + if exc.errno in (errno.EMFILE, errno.ENFILE, + errno.ENOBUFS, errno.ENOMEM): + # Some platforms (e.g. Linux keep reporting the FD as + # ready, so we remove the read handler temporarily. + # We'll try again in a while. + self.call_exception_handler({ + 'message': 'socket.accept() out of system resource', + 'exception': exc, + 'socket': sock, + }) + self.remove_reader(sock.fileno()) + self.call_later(constants.ACCEPT_RETRY_DELAY, + self._start_serving, + protocol_factory, sock, sslcontext, server, + backlog) + else: + raise # The event loop will catch, log and ignore it. else: - raise # The event loop will catch, log and ignore it. - else: - extra = {'peername': addr} - accept = self._accept_connection2(protocol_factory, conn, extra, - sslcontext, server) - self.create_task(accept) + extra = {'peername': addr} + accept = self._accept_connection2(protocol_factory, conn, extra, + sslcontext, server) + self.create_task(accept) @coroutine def _accept_connection2(self, protocol_factory, conn, extra, diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 206ebc6..0efdc20 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -1634,7 +1634,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): self.loop.call_later.assert_called_with(constants.ACCEPT_RETRY_DELAY, # self.loop._start_serving mock.ANY, - MyProto, sock, None, None) + MyProto, sock, None, None, mock.ANY) def test_call_coroutine(self): @asyncio.coroutine diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index ff71c21..73bc3f3 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -687,6 +687,20 @@ class BaseSelectorEventLoopTests(test_utils.TestCase): selectors.EVENT_WRITE)]) self.loop.remove_writer.assert_called_with(1) + def test_accept_connection_multiple(self): + sock = mock.Mock() + sock.accept.return_value = (mock.Mock(), mock.Mock()) + backlog = 100 + # Mock the coroutine generation for a connection to prevent + # warnings related to un-awaited coroutines. + mock_obj = mock.patch.object + with mock_obj(self.loop, '_accept_connection2') as accept2_mock: + accept2_mock.return_value = None + with mock_obj(self.loop, 'create_task') as task_mock: + task_mock.return_value = None + self.loop._accept_connection(mock.Mock(), sock, backlog=backlog) + self.assertEqual(sock.accept.call_count, backlog) + class SelectorTransportTests(test_utils.TestCase): |