From c9070d03f5169ad6e171e641b7fa8feab18bf229 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Thu, 25 Jan 2018 18:08:09 -0500 Subject: bpo-32662: Implement Server.start_serving() and Server.serve_forever() (#5312) * bpo-32662: Implement Server.start_serving() and Server.serve_forever() New methods: * Server.start_serving(), * Server.serve_forever(), and * Server.is_serving(). Add 'start_serving' keyword parameter to loop.create_server() and loop.create_unix_server(). --- Doc/library/asyncio-eventloop.rst | 87 ++++++++++++++- Lib/asyncio/base_events.py | 102 ++++++++++++++---- Lib/asyncio/events.py | 48 ++++++++- Lib/asyncio/unix_events.py | 12 ++- Lib/test/test_asyncio/test_server.py | 117 +++++++++++++++++++++ .../2018-01-25-01-45-30.bpo-32662.oabhd8.rst | 3 + 6 files changed, 334 insertions(+), 35 deletions(-) create mode 100644 Lib/test/test_asyncio/test_server.py create mode 100644 Misc/NEWS.d/next/Library/2018-01-25-01-45-30.bpo-32662.oabhd8.rst diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 6cee171..834a4e8 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -424,7 +424,7 @@ Creating connections Creating listening connections ------------------------------ -.. coroutinemethod:: AbstractEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None) +.. coroutinemethod:: AbstractEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True) Create a TCP server (socket type :data:`~socket.SOCK_STREAM`) bound to *host* and *port*. @@ -472,9 +472,15 @@ Creating listening connections for the SSL handshake to complete before aborting the connection. ``10.0`` seconds if ``None`` (default). + * *start_serving* set to ``True`` (the default) causes the created server + to start accepting connections immediately. When set to ``False``, + the user should await on :meth:`Server.start_serving` or + :meth:`Server.serve_forever` to make the server to start accepting + connections. + .. versionadded:: 3.7 - The *ssl_handshake_timeout* parameter. + *ssl_handshake_timeout* and *start_serving* parameters. .. versionchanged:: 3.5 @@ -490,7 +496,7 @@ Creating listening connections The *host* parameter can now be a sequence of strings. -.. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None) +.. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True) Similar to :meth:`AbstractEventLoop.create_server`, but specific to the socket family :py:data:`~socket.AF_UNIX`. @@ -929,8 +935,26 @@ Server Server listening on sockets. - Object created by the :meth:`AbstractEventLoop.create_server` method and the - :func:`start_server` function. Don't instantiate the class directly. + Object created by :meth:`AbstractEventLoop.create_server`, + :meth:`AbstractEventLoop.create_unix_server`, :func:`start_server`, + and :func:`start_unix_server` functions. Don't instantiate the class + directly. + + *Server* objects are asynchronous context managers. When used in an + ``async with`` statement, it's guaranteed that the Server object is + closed and not accepting new connections when the ``async with`` + statement is completed:: + + srv = await loop.create_server(...) + + async with srv: + # some code + + # At this point, srv is closed and no longer accepts new connections. + + + .. versionchanged:: 3.7 + Server object is an asynchronous context manager since Python 3.7. .. method:: close() @@ -949,6 +973,54 @@ Server .. versionadded:: 3.7 + .. coroutinemethod:: start_serving() + + Start accepting connections. + + This method is idempotent, so it can be called when + the server is already being serving. + + The new *start_serving* keyword-only parameter to + :meth:`AbstractEventLoop.create_server` and + :meth:`asyncio.start_server` allows to create a Server object + that is not accepting connections right away. In which case + this method, or :meth:`Server.serve_forever` can be used + to make the Server object to start accepting connections. + + .. versionadded:: 3.7 + + .. coroutinemethod:: serve_forever() + + Start accepting connections until the coroutine is cancelled. + Cancellation of ``serve_forever`` task causes the server + to be closed. + + This method can be called if the server is already accepting + connections. Only one ``serve_forever`` task can exist per + one *Server* object. + + Example:: + + async def client_connected(reader, writer): + # Communicate with the client with + # reader/writer streams. For example: + await reader.readline() + + async def main(host, port): + srv = await asyncio.start_server( + client_connected, host, port) + await loop.serve_forever() + + asyncio.run(main('127.0.0.1', 0)) + + .. versionadded:: 3.7 + + .. method:: is_serving() + + Return ``True`` if the server is accepting new connections. + + .. versionadded:: 3.7 + .. coroutinemethod:: wait_closed() Wait until the :meth:`close` method completes. @@ -958,6 +1030,11 @@ Server List of :class:`socket.socket` objects the server is listening to, or ``None`` if the server is closed. + .. versionchanged:: 3.7 + Prior to Python 3.7 ``Server.sockets`` used to return the + internal list of server's sockets directly. In 3.7 a copy + of that list is returned. + Handle ------ diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index e722cf2..94eb308 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -157,47 +157,106 @@ def _run_until_complete_cb(fut): class Server(events.AbstractServer): - def __init__(self, loop, sockets): + def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, + ssl_handshake_timeout): self._loop = loop - self.sockets = sockets + self._sockets = sockets self._active_count = 0 self._waiters = [] + self._protocol_factory = protocol_factory + self._backlog = backlog + self._ssl_context = ssl_context + self._ssl_handshake_timeout = ssl_handshake_timeout + self._serving = False + self._serving_forever_fut = None def __repr__(self): return f'<{self.__class__.__name__} sockets={self.sockets!r}>' def _attach(self): - assert self.sockets is not None + assert self._sockets is not None self._active_count += 1 def _detach(self): assert self._active_count > 0 self._active_count -= 1 - if self._active_count == 0 and self.sockets is None: + if self._active_count == 0 and self._sockets is None: self._wakeup() + def _wakeup(self): + waiters = self._waiters + self._waiters = None + for waiter in waiters: + if not waiter.done(): + waiter.set_result(waiter) + + def _start_serving(self): + if self._serving: + return + self._serving = True + for sock in self._sockets: + sock.listen(self._backlog) + self._loop._start_serving( + self._protocol_factory, sock, self._ssl_context, + self, self._backlog, self._ssl_handshake_timeout) + + def get_loop(self): + return self._loop + + def is_serving(self): + return self._serving + + @property + def sockets(self): + if self._sockets is None: + return [] + return list(self._sockets) + def close(self): - sockets = self.sockets + sockets = self._sockets if sockets is None: return - self.sockets = None + self._sockets = None + for sock in sockets: self._loop._stop_serving(sock) + + self._serving = False + + if (self._serving_forever_fut is not None and + not self._serving_forever_fut.done()): + self._serving_forever_fut.cancel() + self._serving_forever_fut = None + if self._active_count == 0: self._wakeup() - def get_loop(self): - return self._loop + async def start_serving(self): + self._start_serving() - def _wakeup(self): - waiters = self._waiters - self._waiters = None - for waiter in waiters: - if not waiter.done(): - waiter.set_result(waiter) + async def serve_forever(self): + if self._serving_forever_fut is not None: + raise RuntimeError( + f'server {self!r} is already being awaited on serve_forever()') + if self._sockets is None: + raise RuntimeError(f'server {self!r} is closed') + + self._start_serving() + self._serving_forever_fut = self._loop.create_future() + + try: + await self._serving_forever_fut + except futures.CancelledError: + try: + self.close() + await self.wait_closed() + finally: + raise + finally: + self._serving_forever_fut = None async def wait_closed(self): - if self.sockets is None or self._waiters is None: + if self._sockets is None or self._waiters is None: return waiter = self._loop.create_future() self._waiters.append(waiter) @@ -1059,7 +1118,8 @@ class BaseEventLoop(events.AbstractEventLoop): ssl=None, reuse_address=None, reuse_port=None, - ssl_handshake_timeout=None): + ssl_handshake_timeout=None, + start_serving=True): """Create a TCP server. The host parameter can be a string, in that case the TCP server is @@ -1149,12 +1209,14 @@ class BaseEventLoop(events.AbstractEventLoop): raise ValueError(f'A Stream Socket was expected, got {sock!r}') sockets = [sock] - server = Server(self, sockets) for sock in sockets: - sock.listen(backlog) sock.setblocking(False) - self._start_serving(protocol_factory, sock, ssl, server, backlog, - ssl_handshake_timeout) + + server = Server(self, sockets, protocol_factory, + ssl, backlog, ssl_handshake_timeout) + if start_serving: + server._start_serving() + if self._debug: logger.info("%r is serving", server) return server diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index 5c68d4c..7aa3de0 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -164,13 +164,39 @@ class AbstractServer: """Stop serving. This leaves existing connections open.""" raise NotImplementedError + def get_loop(self): + """Get the event loop the Server object is attached to.""" + raise NotImplementedError + + def is_serving(self): + """Return True if the server is accepting connections.""" + raise NotImplementedError + + async def start_serving(self): + """Start accepting connections. + + This method is idempotent, so it can be called when + the server is already being serving. + """ + raise NotImplementedError + + async def serve_forever(self): + """Start accepting connections until the coroutine is cancelled. + + The server is closed when the coroutine is cancelled. + """ + raise NotImplementedError + async def wait_closed(self): """Coroutine to wait until service is closed.""" raise NotImplementedError - def get_loop(self): - """ Get the event loop the Server object is attached to.""" - raise NotImplementedError + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + self.close() + await self.wait_closed() class AbstractEventLoop: @@ -279,7 +305,8 @@ class AbstractEventLoop: *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, - ssl_handshake_timeout=None): + ssl_handshake_timeout=None, + start_serving=True): """A coroutine which creates a TCP server bound to host and port. The return value is a Server object which can be used to stop @@ -319,6 +346,11 @@ class AbstractEventLoop: will wait for completion of the SSL handshake before aborting the connection. Default is 10s, longer timeouts may increase vulnerability to DoS attacks (see https://support.f5.com/csp/article/K13834) + + start_serving set to True (default) causes the created server + to start accepting connections immediately. When set to False, + the user should await Server.start_serving() or Server.serve_forever() + to make the server to start accepting connections. """ raise NotImplementedError @@ -343,7 +375,8 @@ class AbstractEventLoop: async def create_unix_server( self, protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, - ssl_handshake_timeout=None): + ssl_handshake_timeout=None, + start_serving=True): """A coroutine which creates a UNIX Domain Socket server. The return value is a Server object, which can be used to stop @@ -363,6 +396,11 @@ class AbstractEventLoop: ssl_handshake_timeout is the time in seconds that an SSL server will wait for the SSL handshake to complete (defaults to 10s). + + start_serving set to True (default) causes the created server + to start accepting connections immediately. When set to False, + the user should await Server.start_serving() or Server.serve_forever() + to make the server to start accepting connections. """ raise NotImplementedError diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 9b9d004..a4d892a 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -250,7 +250,8 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): async def create_unix_server( self, protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, - ssl_handshake_timeout=None): + ssl_handshake_timeout=None, + start_serving=True): if isinstance(ssl, bool): raise TypeError('ssl argument must be an SSLContext or None') @@ -302,11 +303,12 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): raise ValueError( f'A UNIX Domain Stream Socket was expected, got {sock!r}') - server = base_events.Server(self, [sock]) - sock.listen(backlog) sock.setblocking(False) - self._start_serving(protocol_factory, sock, ssl, server, - ssl_handshake_timeout=ssl_handshake_timeout) + server = base_events.Server(self, [sock], protocol_factory, + ssl, backlog, ssl_handshake_timeout) + if start_serving: + server._start_serving() + return server async def _sock_sendfile_native(self, sock, file, offset, count): diff --git a/Lib/test/test_asyncio/test_server.py b/Lib/test/test_asyncio/test_server.py new file mode 100644 index 0000000..44d135d --- /dev/null +++ b/Lib/test/test_asyncio/test_server.py @@ -0,0 +1,117 @@ +import asyncio +import socket +import threading +import unittest + +from test.test_asyncio import utils as test_utils +from test.test_asyncio import functional as func_tests + + +class BaseStartServer(func_tests.FunctionalTestCaseMixin): + + def new_loop(self): + raise NotImplementedError + + def test_start_server_1(self): + HELLO_MSG = b'1' * 1024 * 5 + b'\n' + + def client(sock, addr): + sock.connect(addr) + sock.send(HELLO_MSG) + sock.recv_all(1) + sock.close() + + async def serve(reader, writer): + await reader.readline() + main_task.cancel() + writer.write(b'1') + writer.close() + await writer.wait_closed() + + async def main(srv): + async with srv: + await srv.serve_forever() + + srv = self.loop.run_until_complete(asyncio.start_server( + serve, '127.0.0.1', 0, loop=self.loop, start_serving=False)) + + self.assertFalse(srv.is_serving()) + + main_task = self.loop.create_task(main(srv)) + + addr = srv.sockets[0].getsockname() + with self.assertRaises(asyncio.CancelledError): + with self.tcp_client(lambda sock: client(sock, addr)): + self.loop.run_until_complete(main_task) + + self.assertEqual(srv.sockets, []) + + self.assertIsNone(srv._sockets) + self.assertIsNone(srv._waiters) + self.assertFalse(srv.is_serving()) + + with self.assertRaisesRegex(RuntimeError, r'is closed'): + self.loop.run_until_complete(srv.serve_forever()) + + +class SelectorStartServerTests(BaseStartServer, unittest.TestCase): + + def new_loop(self): + return asyncio.SelectorEventLoop() + + @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'no Unix sockets') + def test_start_unix_server_1(self): + HELLO_MSG = b'1' * 1024 * 5 + b'\n' + started = threading.Event() + + def client(sock, addr): + started.wait(5) + sock.connect(addr) + sock.send(HELLO_MSG) + sock.recv_all(1) + sock.close() + + async def serve(reader, writer): + await reader.readline() + main_task.cancel() + writer.write(b'1') + writer.close() + await writer.wait_closed() + + async def main(srv): + async with srv: + self.assertFalse(srv.is_serving()) + await srv.start_serving() + self.assertTrue(srv.is_serving()) + started.set() + await srv.serve_forever() + + with test_utils.unix_socket_path() as addr: + srv = self.loop.run_until_complete(asyncio.start_unix_server( + serve, addr, loop=self.loop, start_serving=False)) + + main_task = self.loop.create_task(main(srv)) + + with self.assertRaises(asyncio.CancelledError): + with self.unix_client(lambda sock: client(sock, addr)): + self.loop.run_until_complete(main_task) + + self.assertEqual(srv.sockets, []) + + self.assertIsNone(srv._sockets) + self.assertIsNone(srv._waiters) + self.assertFalse(srv.is_serving()) + + with self.assertRaisesRegex(RuntimeError, r'is closed'): + self.loop.run_until_complete(srv.serve_forever()) + + +@unittest.skipUnless(hasattr(asyncio, 'ProactorEventLoop'), 'Windows only') +class ProactorStartServerTests(BaseStartServer, unittest.TestCase): + + def new_loop(self): + return asyncio.ProactorEventLoop() + + +if __name__ == '__main__': + unittest.main() diff --git a/Misc/NEWS.d/next/Library/2018-01-25-01-45-30.bpo-32662.oabhd8.rst b/Misc/NEWS.d/next/Library/2018-01-25-01-45-30.bpo-32662.oabhd8.rst new file mode 100644 index 0000000..44c8b95 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-01-25-01-45-30.bpo-32662.oabhd8.rst @@ -0,0 +1,3 @@ +Implement Server.start_serving(), Server.serve_forever(), and +Server.is_serving() methods. Add 'start_serving' keyword parameter to +loop.create_server() and loop.create_unix_server(). -- cgit v0.12