summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYury Selivanov <yury@magic.io>2018-01-25 23:08:09 (GMT)
committerGitHub <noreply@github.com>2018-01-25 23:08:09 (GMT)
commitc9070d03f5169ad6e171e641b7fa8feab18bf229 (patch)
tree4bad875e6f68874a980e5289a45893f4335c5afb
parent1aa094f74039cd20fdc7df56c68f6848c18ce4dd (diff)
downloadcpython-c9070d03f5169ad6e171e641b7fa8feab18bf229.zip
cpython-c9070d03f5169ad6e171e641b7fa8feab18bf229.tar.gz
cpython-c9070d03f5169ad6e171e641b7fa8feab18bf229.tar.bz2
bpo-32662: Implement Server.start_serving() and Server.serve_forever() (#5312)
* bpo-32662: Implement Server.start_serving() and Server.serve_forever() New methods: * Server.start_serving(), * Server.serve_forever(), and * Server.is_serving(). Add 'start_serving' keyword parameter to loop.create_server() and loop.create_unix_server().
-rw-r--r--Doc/library/asyncio-eventloop.rst87
-rw-r--r--Lib/asyncio/base_events.py102
-rw-r--r--Lib/asyncio/events.py48
-rw-r--r--Lib/asyncio/unix_events.py12
-rw-r--r--Lib/test/test_asyncio/test_server.py117
-rw-r--r--Misc/NEWS.d/next/Library/2018-01-25-01-45-30.bpo-32662.oabhd8.rst3
6 files changed, 334 insertions, 35 deletions
diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst
index 6cee171..834a4e8 100644
--- a/Doc/library/asyncio-eventloop.rst
+++ b/Doc/library/asyncio-eventloop.rst
@@ -424,7 +424,7 @@ Creating connections
Creating listening connections
------------------------------
-.. coroutinemethod:: AbstractEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None)
+.. coroutinemethod:: AbstractEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)
Create a TCP server (socket type :data:`~socket.SOCK_STREAM`) bound to
*host* and *port*.
@@ -472,9 +472,15 @@ Creating listening connections
for the SSL handshake to complete before aborting the connection.
``10.0`` seconds if ``None`` (default).
+ * *start_serving* set to ``True`` (the default) causes the created server
+ to start accepting connections immediately. When set to ``False``,
+ the user should await on :meth:`Server.start_serving` or
+ :meth:`Server.serve_forever` to make the server to start accepting
+ connections.
+
.. versionadded:: 3.7
- The *ssl_handshake_timeout* parameter.
+ *ssl_handshake_timeout* and *start_serving* parameters.
.. versionchanged:: 3.5
@@ -490,7 +496,7 @@ Creating listening connections
The *host* parameter can now be a sequence of strings.
-.. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None)
+.. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)
Similar to :meth:`AbstractEventLoop.create_server`, but specific to the
socket family :py:data:`~socket.AF_UNIX`.
@@ -929,8 +935,26 @@ Server
Server listening on sockets.
- Object created by the :meth:`AbstractEventLoop.create_server` method and the
- :func:`start_server` function. Don't instantiate the class directly.
+ Object created by :meth:`AbstractEventLoop.create_server`,
+ :meth:`AbstractEventLoop.create_unix_server`, :func:`start_server`,
+ and :func:`start_unix_server` functions. Don't instantiate the class
+ directly.
+
+ *Server* objects are asynchronous context managers. When used in an
+ ``async with`` statement, it's guaranteed that the Server object is
+ closed and not accepting new connections when the ``async with``
+ statement is completed::
+
+ srv = await loop.create_server(...)
+
+ async with srv:
+ # some code
+
+ # At this point, srv is closed and no longer accepts new connections.
+
+
+ .. versionchanged:: 3.7
+ Server object is an asynchronous context manager since Python 3.7.
.. method:: close()
@@ -949,6 +973,54 @@ Server
.. versionadded:: 3.7
+ .. coroutinemethod:: start_serving()
+
+ Start accepting connections.
+
+ This method is idempotent, so it can be called when
+ the server is already being serving.
+
+ The new *start_serving* keyword-only parameter to
+ :meth:`AbstractEventLoop.create_server` and
+ :meth:`asyncio.start_server` allows to create a Server object
+ that is not accepting connections right away. In which case
+ this method, or :meth:`Server.serve_forever` can be used
+ to make the Server object to start accepting connections.
+
+ .. versionadded:: 3.7
+
+ .. coroutinemethod:: serve_forever()
+
+ Start accepting connections until the coroutine is cancelled.
+ Cancellation of ``serve_forever`` task causes the server
+ to be closed.
+
+ This method can be called if the server is already accepting
+ connections. Only one ``serve_forever`` task can exist per
+ one *Server* object.
+
+ Example::
+
+ async def client_connected(reader, writer):
+ # Communicate with the client with
+ # reader/writer streams. For example:
+ await reader.readline()
+
+ async def main(host, port):
+ srv = await asyncio.start_server(
+ client_connected, host, port)
+ await loop.serve_forever()
+
+ asyncio.run(main('127.0.0.1', 0))
+
+ .. versionadded:: 3.7
+
+ .. method:: is_serving()
+
+ Return ``True`` if the server is accepting new connections.
+
+ .. versionadded:: 3.7
+
.. coroutinemethod:: wait_closed()
Wait until the :meth:`close` method completes.
@@ -958,6 +1030,11 @@ Server
List of :class:`socket.socket` objects the server is listening to, or
``None`` if the server is closed.
+ .. versionchanged:: 3.7
+ Prior to Python 3.7 ``Server.sockets`` used to return the
+ internal list of server's sockets directly. In 3.7 a copy
+ of that list is returned.
+
Handle
------
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index e722cf2..94eb308 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -157,47 +157,106 @@ def _run_until_complete_cb(fut):
class Server(events.AbstractServer):
- def __init__(self, loop, sockets):
+ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
+ ssl_handshake_timeout):
self._loop = loop
- self.sockets = sockets
+ self._sockets = sockets
self._active_count = 0
self._waiters = []
+ self._protocol_factory = protocol_factory
+ self._backlog = backlog
+ self._ssl_context = ssl_context
+ self._ssl_handshake_timeout = ssl_handshake_timeout
+ self._serving = False
+ self._serving_forever_fut = None
def __repr__(self):
return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
def _attach(self):
- assert self.sockets is not None
+ assert self._sockets is not None
self._active_count += 1
def _detach(self):
assert self._active_count > 0
self._active_count -= 1
- if self._active_count == 0 and self.sockets is None:
+ if self._active_count == 0 and self._sockets is None:
self._wakeup()
+ def _wakeup(self):
+ waiters = self._waiters
+ self._waiters = None
+ for waiter in waiters:
+ if not waiter.done():
+ waiter.set_result(waiter)
+
+ def _start_serving(self):
+ if self._serving:
+ return
+ self._serving = True
+ for sock in self._sockets:
+ sock.listen(self._backlog)
+ self._loop._start_serving(
+ self._protocol_factory, sock, self._ssl_context,
+ self, self._backlog, self._ssl_handshake_timeout)
+
+ def get_loop(self):
+ return self._loop
+
+ def is_serving(self):
+ return self._serving
+
+ @property
+ def sockets(self):
+ if self._sockets is None:
+ return []
+ return list(self._sockets)
+
def close(self):
- sockets = self.sockets
+ sockets = self._sockets
if sockets is None:
return
- self.sockets = None
+ self._sockets = None
+
for sock in sockets:
self._loop._stop_serving(sock)
+
+ self._serving = False
+
+ if (self._serving_forever_fut is not None and
+ not self._serving_forever_fut.done()):
+ self._serving_forever_fut.cancel()
+ self._serving_forever_fut = None
+
if self._active_count == 0:
self._wakeup()
- def get_loop(self):
- return self._loop
+ async def start_serving(self):
+ self._start_serving()
- def _wakeup(self):
- waiters = self._waiters
- self._waiters = None
- for waiter in waiters:
- if not waiter.done():
- waiter.set_result(waiter)
+ async def serve_forever(self):
+ if self._serving_forever_fut is not None:
+ raise RuntimeError(
+ f'server {self!r} is already being awaited on serve_forever()')
+ if self._sockets is None:
+ raise RuntimeError(f'server {self!r} is closed')
+
+ self._start_serving()
+ self._serving_forever_fut = self._loop.create_future()
+
+ try:
+ await self._serving_forever_fut
+ except futures.CancelledError:
+ try:
+ self.close()
+ await self.wait_closed()
+ finally:
+ raise
+ finally:
+ self._serving_forever_fut = None
async def wait_closed(self):
- if self.sockets is None or self._waiters is None:
+ if self._sockets is None or self._waiters is None:
return
waiter = self._loop.create_future()
self._waiters.append(waiter)
@@ -1059,7 +1118,8 @@ class BaseEventLoop(events.AbstractEventLoop):
ssl=None,
reuse_address=None,
reuse_port=None,
- ssl_handshake_timeout=None):
+ ssl_handshake_timeout=None,
+ start_serving=True):
"""Create a TCP server.
The host parameter can be a string, in that case the TCP server is
@@ -1149,12 +1209,14 @@ class BaseEventLoop(events.AbstractEventLoop):
raise ValueError(f'A Stream Socket was expected, got {sock!r}')
sockets = [sock]
- server = Server(self, sockets)
for sock in sockets:
- sock.listen(backlog)
sock.setblocking(False)
- self._start_serving(protocol_factory, sock, ssl, server, backlog,
- ssl_handshake_timeout)
+
+ server = Server(self, sockets, protocol_factory,
+ ssl, backlog, ssl_handshake_timeout)
+ if start_serving:
+ server._start_serving()
+
if self._debug:
logger.info("%r is serving", server)
return server
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index 5c68d4c..7aa3de0 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -164,13 +164,39 @@ class AbstractServer:
"""Stop serving. This leaves existing connections open."""
raise NotImplementedError
+ def get_loop(self):
+ """Get the event loop the Server object is attached to."""
+ raise NotImplementedError
+
+ def is_serving(self):
+ """Return True if the server is accepting connections."""
+ raise NotImplementedError
+
+ async def start_serving(self):
+ """Start accepting connections.
+
+ This method is idempotent, so it can be called when
+ the server is already being serving.
+ """
+ raise NotImplementedError
+
+ async def serve_forever(self):
+ """Start accepting connections until the coroutine is cancelled.
+
+ The server is closed when the coroutine is cancelled.
+ """
+ raise NotImplementedError
+
async def wait_closed(self):
"""Coroutine to wait until service is closed."""
raise NotImplementedError
- def get_loop(self):
- """ Get the event loop the Server object is attached to."""
- raise NotImplementedError
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, *exc):
+ self.close()
+ await self.wait_closed()
class AbstractEventLoop:
@@ -279,7 +305,8 @@ class AbstractEventLoop:
*, family=socket.AF_UNSPEC,
flags=socket.AI_PASSIVE, sock=None, backlog=100,
ssl=None, reuse_address=None, reuse_port=None,
- ssl_handshake_timeout=None):
+ ssl_handshake_timeout=None,
+ start_serving=True):
"""A coroutine which creates a TCP server bound to host and port.
The return value is a Server object which can be used to stop
@@ -319,6 +346,11 @@ class AbstractEventLoop:
will wait for completion of the SSL handshake before aborting the
connection. Default is 10s, longer timeouts may increase vulnerability
to DoS attacks (see https://support.f5.com/csp/article/K13834)
+
+ start_serving set to True (default) causes the created server
+ to start accepting connections immediately. When set to False,
+ the user should await Server.start_serving() or Server.serve_forever()
+ to make the server to start accepting connections.
"""
raise NotImplementedError
@@ -343,7 +375,8 @@ class AbstractEventLoop:
async def create_unix_server(
self, protocol_factory, path=None, *,
sock=None, backlog=100, ssl=None,
- ssl_handshake_timeout=None):
+ ssl_handshake_timeout=None,
+ start_serving=True):
"""A coroutine which creates a UNIX Domain Socket server.
The return value is a Server object, which can be used to stop
@@ -363,6 +396,11 @@ class AbstractEventLoop:
ssl_handshake_timeout is the time in seconds that an SSL server
will wait for the SSL handshake to complete (defaults to 10s).
+
+ start_serving set to True (default) causes the created server
+ to start accepting connections immediately. When set to False,
+ the user should await Server.start_serving() or Server.serve_forever()
+ to make the server to start accepting connections.
"""
raise NotImplementedError
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index 9b9d004..a4d892a 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -250,7 +250,8 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
async def create_unix_server(
self, protocol_factory, path=None, *,
sock=None, backlog=100, ssl=None,
- ssl_handshake_timeout=None):
+ ssl_handshake_timeout=None,
+ start_serving=True):
if isinstance(ssl, bool):
raise TypeError('ssl argument must be an SSLContext or None')
@@ -302,11 +303,12 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
raise ValueError(
f'A UNIX Domain Stream Socket was expected, got {sock!r}')
- server = base_events.Server(self, [sock])
- sock.listen(backlog)
sock.setblocking(False)
- self._start_serving(protocol_factory, sock, ssl, server,
- ssl_handshake_timeout=ssl_handshake_timeout)
+ server = base_events.Server(self, [sock], protocol_factory,
+ ssl, backlog, ssl_handshake_timeout)
+ if start_serving:
+ server._start_serving()
+
return server
async def _sock_sendfile_native(self, sock, file, offset, count):
diff --git a/Lib/test/test_asyncio/test_server.py b/Lib/test/test_asyncio/test_server.py
new file mode 100644
index 0000000..44d135d
--- /dev/null
+++ b/Lib/test/test_asyncio/test_server.py
@@ -0,0 +1,117 @@
+import asyncio
+import socket
+import threading
+import unittest
+
+from test.test_asyncio import utils as test_utils
+from test.test_asyncio import functional as func_tests
+
+
+class BaseStartServer(func_tests.FunctionalTestCaseMixin):
+
+ def new_loop(self):
+ raise NotImplementedError
+
+ def test_start_server_1(self):
+ HELLO_MSG = b'1' * 1024 * 5 + b'\n'
+
+ def client(sock, addr):
+ sock.connect(addr)
+ sock.send(HELLO_MSG)
+ sock.recv_all(1)
+ sock.close()
+
+ async def serve(reader, writer):
+ await reader.readline()
+ main_task.cancel()
+ writer.write(b'1')
+ writer.close()
+ await writer.wait_closed()
+
+ async def main(srv):
+ async with srv:
+ await srv.serve_forever()
+
+ srv = self.loop.run_until_complete(asyncio.start_server(
+ serve, '127.0.0.1', 0, loop=self.loop, start_serving=False))
+
+ self.assertFalse(srv.is_serving())
+
+ main_task = self.loop.create_task(main(srv))
+
+ addr = srv.sockets[0].getsockname()
+ with self.assertRaises(asyncio.CancelledError):
+ with self.tcp_client(lambda sock: client(sock, addr)):
+ self.loop.run_until_complete(main_task)
+
+ self.assertEqual(srv.sockets, [])
+
+ self.assertIsNone(srv._sockets)
+ self.assertIsNone(srv._waiters)
+ self.assertFalse(srv.is_serving())
+
+ with self.assertRaisesRegex(RuntimeError, r'is closed'):
+ self.loop.run_until_complete(srv.serve_forever())
+
+
+class SelectorStartServerTests(BaseStartServer, unittest.TestCase):
+
+ def new_loop(self):
+ return asyncio.SelectorEventLoop()
+
+ @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'no Unix sockets')
+ def test_start_unix_server_1(self):
+ HELLO_MSG = b'1' * 1024 * 5 + b'\n'
+ started = threading.Event()
+
+ def client(sock, addr):
+ started.wait(5)
+ sock.connect(addr)
+ sock.send(HELLO_MSG)
+ sock.recv_all(1)
+ sock.close()
+
+ async def serve(reader, writer):
+ await reader.readline()
+ main_task.cancel()
+ writer.write(b'1')
+ writer.close()
+ await writer.wait_closed()
+
+ async def main(srv):
+ async with srv:
+ self.assertFalse(srv.is_serving())
+ await srv.start_serving()
+ self.assertTrue(srv.is_serving())
+ started.set()
+ await srv.serve_forever()
+
+ with test_utils.unix_socket_path() as addr:
+ srv = self.loop.run_until_complete(asyncio.start_unix_server(
+ serve, addr, loop=self.loop, start_serving=False))
+
+ main_task = self.loop.create_task(main(srv))
+
+ with self.assertRaises(asyncio.CancelledError):
+ with self.unix_client(lambda sock: client(sock, addr)):
+ self.loop.run_until_complete(main_task)
+
+ self.assertEqual(srv.sockets, [])
+
+ self.assertIsNone(srv._sockets)
+ self.assertIsNone(srv._waiters)
+ self.assertFalse(srv.is_serving())
+
+ with self.assertRaisesRegex(RuntimeError, r'is closed'):
+ self.loop.run_until_complete(srv.serve_forever())
+
+
+@unittest.skipUnless(hasattr(asyncio, 'ProactorEventLoop'), 'Windows only')
+class ProactorStartServerTests(BaseStartServer, unittest.TestCase):
+
+ def new_loop(self):
+ return asyncio.ProactorEventLoop()
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/Misc/NEWS.d/next/Library/2018-01-25-01-45-30.bpo-32662.oabhd8.rst b/Misc/NEWS.d/next/Library/2018-01-25-01-45-30.bpo-32662.oabhd8.rst
new file mode 100644
index 0000000..44c8b95
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2018-01-25-01-45-30.bpo-32662.oabhd8.rst
@@ -0,0 +1,3 @@
+Implement Server.start_serving(), Server.serve_forever(), and
+Server.is_serving() methods. Add 'start_serving' keyword parameter to
+loop.create_server() and loop.create_unix_server().