diff options
author | Yury Selivanov <yury@edgedb.com> | 2019-09-30 04:59:55 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-09-30 04:59:55 (GMT) |
commit | 6758e6e12a71ef5530146161881f88df1fa43382 (patch) | |
tree | da1f89f35e54ddcfffc3706b87bb13f54907f7ea /Doc/library/asyncio-stream.rst | |
parent | 3667e1ee6c90e6d3b6a745cd590ece87118f81ad (diff) | |
download | cpython-6758e6e12a71ef5530146161881f88df1fa43382.zip cpython-6758e6e12a71ef5530146161881f88df1fa43382.tar.gz cpython-6758e6e12a71ef5530146161881f88df1fa43382.tar.bz2 |
bpo-38242: Revert "bpo-36889: Merge asyncio streams (GH-13251)" (#16482)
See https://bugs.python.org/issue38242 for more details
Diffstat (limited to 'Doc/library/asyncio-stream.rst')
-rw-r--r-- | Doc/library/asyncio-stream.rst | 563 |
1 files changed, 64 insertions, 499 deletions
diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index feebd22..471e6e9 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -18,12 +18,19 @@ streams:: import asyncio async def tcp_echo_client(message): - async with asyncio.connect('127.0.0.1', 8888) as stream: - print(f'Send: {message!r}') - await stream.write(message.encode()) + reader, writer = await asyncio.open_connection( + '127.0.0.1', 8888) - data = await stream.read(100) - print(f'Received: {data.decode()!r}') + print(f'Send: {message!r}') + writer.write(message.encode()) + await writer.drain() + + data = await reader.read(100) + print(f'Received: {data.decode()!r}') + + print('Close the connection') + writer.close() + await writer.wait_closed() asyncio.run(tcp_echo_client('Hello World!')) @@ -37,31 +44,6 @@ The following top-level asyncio functions can be used to create and work with streams: -.. coroutinefunction:: connect(host=None, port=None, \*, \ - limit=2**16, ssl=None, family=0, \ - proto=0, flags=0, sock=None, local_addr=None, \ - server_hostname=None, ssl_handshake_timeout=None, \ - happy_eyeballs_delay=None, interleave=None) - - Connect to TCP socket on *host* : *port* address and return a :class:`Stream` - object of mode :attr:`StreamMode.READWRITE`. - - *limit* determines the buffer size limit used by the returned :class:`Stream` - instance. By default the *limit* is set to 64 KiB. - - The rest of the arguments are passed directly to :meth:`loop.create_connection`. - - The function can be used with ``await`` to get a connected stream:: - - stream = await asyncio.connect('127.0.0.1', 8888) - - The function can also be used as an async context manager:: - - async with asyncio.connect('127.0.0.1', 8888) as stream: - ... - - .. versionadded:: 3.8 - .. coroutinefunction:: open_connection(host=None, port=None, \*, \ loop=None, limit=None, ssl=None, family=0, \ proto=0, flags=0, sock=None, local_addr=None, \ @@ -87,12 +69,8 @@ and work with streams: The *ssl_handshake_timeout* parameter. - .. deprecated-removed:: 3.8 3.10 - - `open_connection()` is deprecated in favor of :func:`connect`. - .. coroutinefunction:: start_server(client_connected_cb, host=None, \ - port=None, \*, loop=None, limit=2**16, \ + port=None, \*, loop=None, limit=None, \ family=socket.AF_UNSPEC, \ flags=socket.AI_PASSIVE, sock=None, \ backlog=100, ssl=None, reuse_address=None, \ @@ -124,60 +102,9 @@ and work with streams: The *ssl_handshake_timeout* and *start_serving* parameters. - .. deprecated-removed:: 3.8 3.10 - - `start_server()` is deprecated if favor of :class:`StreamServer` - -.. coroutinefunction:: connect_read_pipe(pipe, *, limit=2**16) - - Takes a :term:`file-like object <file object>` *pipe* to return a - :class:`Stream` object of the mode :attr:`StreamMode.READ` that has - similar API of :class:`StreamReader`. It can also be used as an async context manager. - - *limit* determines the buffer size limit used by the returned :class:`Stream` - instance. By default the limit is set to 64 KiB. - - .. versionadded:: 3.8 - -.. coroutinefunction:: connect_write_pipe(pipe, *, limit=2**16) - - Takes a :term:`file-like object <file object>` *pipe* to return a - :class:`Stream` object of the mode :attr:`StreamMode.WRITE` that has - similar API of :class:`StreamWriter`. It can also be used as an async context manager. - - *limit* determines the buffer size limit used by the returned :class:`Stream` - instance. By default the limit is set to 64 KiB. - - .. versionadded:: 3.8 .. rubric:: Unix Sockets -.. function:: connect_unix(path=None, *, limit=2**16, ssl=None, \ - sock=None, server_hostname=None, \ - ssl_handshake_timeout=None) - - Establish a Unix socket connection to socket with *path* address and - return an awaitable :class:`Stream` object of the mode :attr:`StreamMode.READWRITE` - that can be used as a reader and a writer. - - *limit* determines the buffer size limit used by the returned :class:`Stream` - instance. By default the *limit* is set to 64 KiB. - - The rest of the arguments are passed directly to :meth:`loop.create_unix_connection`. - - The function can be used with ``await`` to get a connected stream:: - - stream = await asyncio.connect_unix('/tmp/example.sock') - - The function can also be used as an async context manager:: - - async with asyncio.connect_unix('/tmp/example.sock') as stream: - ... - - .. availability:: Unix. - - .. versionadded:: 3.8 - .. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \ limit=None, ssl=None, sock=None, \ server_hostname=None, ssl_handshake_timeout=None) @@ -199,10 +126,6 @@ and work with streams: The *path* parameter can now be a :term:`path-like object` - .. deprecated-removed:: 3.8 3.10 - - ``open_unix_connection()`` is deprecated if favor of :func:`connect_unix`. - .. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \ \*, loop=None, limit=None, sock=None, \ @@ -225,349 +148,6 @@ and work with streams: The *path* parameter can now be a :term:`path-like object`. - .. deprecated-removed:: 3.8 3.10 - - ``start_unix_server()`` is deprecated in favor of :class:`UnixStreamServer`. - - ---------- - -StreamServer -============ - -.. class:: StreamServer(client_connected_cb, /, host=None, port=None, *, \ - limit=2**16, family=socket.AF_UNSPEC, \ - flags=socket.AI_PASSIVE, sock=None, backlog=100, \ - ssl=None, reuse_address=None, reuse_port=None, \ - ssl_handshake_timeout=None, shutdown_timeout=60) - - The *client_connected_cb* callback is called whenever a new client - connection is established. It receives a :class:`Stream` object of the - mode :attr:`StreamMode.READWRITE`. - - *client_connected_cb* can be a plain callable or a - :ref:`coroutine function <coroutine>`; if it is a coroutine function, - it will be automatically scheduled as a :class:`Task`. - - *limit* determines the buffer size limit used by the - returned :class:`Stream` instance. By default the *limit* - is set to 64 KiB. - - The rest of the arguments are passed directly to - :meth:`loop.create_server`. - - .. coroutinemethod:: start_serving() - - Binds to the given host and port to start the server. - - .. coroutinemethod:: serve_forever() - - Start accepting connections until the coroutine is cancelled. - Cancellation of ``serve_forever`` task causes the server - to be closed. - - This method can be called if the server is already accepting - connections. Only one ``serve_forever`` task can exist per - one *Server* object. - - .. method:: is_serving() - - Returns ``True`` if the server is bound and currently serving. - - .. method:: bind() - - Bind the server to the given *host* and *port*. This method is - automatically called during ``__aenter__`` when :class:`StreamServer` is - used as an async context manager. - - .. method:: is_bound() - - Return ``True`` if the server is bound. - - .. coroutinemethod:: abort() - - Closes the connection and cancels all pending tasks. - - .. coroutinemethod:: close() - - Closes the connection. This method is automatically called during - ``__aexit__`` when :class:`StreamServer` is used as an async context - manager. - - .. attribute:: sockets - - Returns a tuple of socket objects the server is bound to. - - .. versionadded:: 3.8 - - -UnixStreamServer -================ - -.. class:: UnixStreamServer(client_connected_cb, /, path=None, *, \ - limit=2**16, sock=None, backlog=100, \ - ssl=None, ssl_handshake_timeout=None, shutdown_timeout=60) - - The *client_connected_cb* callback is called whenever a new client - connection is established. It receives a :class:`Stream` object of the - mode :attr:`StreamMode.READWRITE`. - - *client_connected_cb* can be a plain callable or a - :ref:`coroutine function <coroutine>`; if it is a coroutine function, - it will be automatically scheduled as a :class:`Task`. - - *limit* determines the buffer size limit used by the - returned :class:`Stream` instance. By default the *limit* - is set to 64 KiB. - - The rest of the arguments are passed directly to - :meth:`loop.create_unix_server`. - - .. coroutinemethod:: start_serving() - - Binds to the given host and port to start the server. - - .. method:: is_serving() - - Returns ``True`` if the server is bound and currently serving. - - .. method:: bind() - - Bind the server to the given *host* and *port*. This method is - automatically called during ``__aenter__`` when :class:`UnixStreamServer` is - used as an async context manager. - - .. method:: is_bound() - - Return ``True`` if the server is bound. - - .. coroutinemethod:: abort() - - Closes the connection and cancels all pending tasks. - - .. coroutinemethod:: close() - - Closes the connection. This method is automatically called during - ``__aexit__`` when :class:`UnixStreamServer` is used as an async context - manager. - - .. attribute:: sockets - - Returns a tuple of socket objects the server is bound to. - - .. availability:: Unix. - - .. versionadded:: 3.8 - -Stream -====== - -.. class:: Stream - - Represents a Stream object that provides APIs to read and write data - to the IO stream . It includes the API provided by :class:`StreamReader` - and :class:`StreamWriter`. It can also be used as :term:`asynchronous iterator` - where :meth:`readline` is used. It raises :exc:`StopAsyncIteration` when - :meth:`readline` returns empty data. - - Do not instantiate *Stream* objects directly; use API like :func:`connect` - and :class:`StreamServer` instead. - - .. versionadded:: 3.8 - - .. attribute:: mode - - Returns the mode of the stream which is a :class:`StreamMode` value. It could - be one of the below: - - * :attr:`StreamMode.READ` - Connection can receive data. - * :attr:`StreamMode.WRITE` - Connection can send data. - * :attr:`StreamMode.READWRITE` - Connection can send and receive data. - - .. coroutinemethod:: abort() - - Aborts the connection immediately, without waiting for the send buffer to drain. - - .. method:: at_eof() - - Return ``True`` if the buffer is empty. - - .. method:: can_write_eof() - - Return *True* if the underlying transport supports - the :meth:`write_eof` method, *False* otherwise. - - .. method:: close() - - The method closes the stream and the underlying socket. - - It is possible to directly await on the `close()` method:: - - await stream.close() - - The ``await`` pauses the current coroutine until the stream and the underlying - socket are closed (and SSL shutdown is performed for a secure connection). - - .. coroutinemethod:: drain() - - Wait until it is appropriate to resume writing to the stream. - Example:: - - stream.write(data) - await stream.drain() - - This is a flow control method that interacts with the underlying - IO write buffer. When the size of the buffer reaches - the high watermark, *drain()* blocks until the size of the - buffer is drained down to the low watermark and writing can - be resumed. When there is nothing to wait for, the :meth:`drain` - returns immediately. - - .. deprecated:: 3.8 - - It is recommended to directly await on the `write()` method instead:: - - await stream.write(data) - - .. method:: get_extra_info(name, default=None) - - Access optional transport information; see - :meth:`BaseTransport.get_extra_info` for details. - - .. method:: is_closing() - - Return ``True`` if the stream is closed or in the process of - being closed. - - .. coroutinemethod:: read(n=-1) - - Read up to *n* bytes. If *n* is not provided, or set to ``-1``, - read until EOF and return all read bytes. - - If EOF was received and the internal buffer is empty, - return an empty ``bytes`` object. - - .. coroutinemethod:: readexactly(n) - - Read exactly *n* bytes. - - Raise an :exc:`IncompleteReadError` if EOF is reached before *n* - can be read. Use the :attr:`IncompleteReadError.partial` - attribute to get the partially read data. - - .. coroutinemethod:: readline() - - Read one line, where "line" is a sequence of bytes - ending with ``\n``. - - If EOF is received and ``\n`` was not found, the method - returns partially read data. - - If EOF is received and the internal buffer is empty, - return an empty ``bytes`` object. - - .. coroutinemethod:: readuntil(separator=b'\\n') - - Read data from the stream until *separator* is found. - - On success, the data and separator will be removed from the - internal buffer (consumed). Returned data will include the - separator at the end. - - If the amount of data read exceeds the configured stream limit, a - :exc:`LimitOverrunError` exception is raised, and the data - is left in the internal buffer and can be read again. - - If EOF is reached before the complete separator is found, - an :exc:`IncompleteReadError` exception is raised, and the internal - buffer is reset. The :attr:`IncompleteReadError.partial` attribute - may contain a portion of the separator. - - .. coroutinemethod:: sendfile(file, offset=0, count=None, *, fallback=True) - - Sends a *file* over the stream using an optimized syscall if available. - - For other parameters meaning please see :meth:`AbstractEventloop.sendfile`. - - .. coroutinemethod:: start_tls(sslcontext, *, server_hostname=None, \ - ssl_handshake_timeout=None) - - Upgrades the existing transport-based connection to TLS. - - For other parameters meaning please see :meth:`AbstractEventloop.start_tls`. - - .. coroutinemethod:: wait_closed() - - Wait until the stream is closed. - - Should be called after :meth:`close` to wait until the underlying - connection is closed. - - .. coroutinemethod:: write(data) - - Write *data* to the underlying socket; wait until the data is sent, e.g.:: - - await stream.write(data) - - .. method:: write(data) - - The method attempts to write the *data* to the underlying socket immediately. - If that fails, the data is queued in an internal write buffer until it can be - sent. :meth:`drain` can be used to flush the underlying buffer once writing is - available:: - - stream.write(data) - await stream.drain() - - .. deprecated:: 3.8 - - It is recommended to directly await on the `write()` method instead:: - - await stream.write(data) - - .. method:: writelines(data) - - The method writes a list (or any iterable) of bytes to the underlying socket - immediately. - If that fails, the data is queued in an internal write buffer until it can be - sent. - - It is possible to directly await on the `writelines()` method:: - - await stream.writelines(lines) - - The ``await`` pauses the current coroutine until the data is written to the - socket. - - .. method:: write_eof() - - Close the write end of the stream after the buffered write - data is flushed. - - -StreamMode -========== - -.. class:: StreamMode - - A subclass of :class:`enum.Flag` that defines a set of values that can be - used to determine the ``mode`` of :class:`Stream` objects. - - .. data:: READ - - The stream object is readable and provides the API of :class:`StreamReader`. - - .. data:: WRITE - - The stream object is writeable and provides the API of :class:`StreamWriter`. - - .. data:: READWRITE - - The stream object is readable and writeable and provides the API of both - :class:`StreamReader` and :class:`StreamWriter`. - - .. versionadded:: 3.8 - StreamReader ============ @@ -629,7 +209,8 @@ StreamReader .. method:: at_eof() - Return ``True`` if the buffer is empty. + Return ``True`` if the buffer is empty and :meth:`feed_eof` + was called. StreamWriter @@ -650,22 +231,11 @@ StreamWriter If that fails, the data is queued in an internal write buffer until it can be sent. - Starting with Python 3.8, it is possible to directly await on the `write()` - method:: - - await stream.write(data) - - The ``await`` pauses the current coroutine until the data is written to the - socket. - - Below is an equivalent code that works with Python <= 3.7:: + The method should be used along with the ``drain()`` method:: stream.write(data) await stream.drain() - .. versionchanged:: 3.8 - Support ``await stream.write(...)`` syntax. - .. method:: writelines(data) The method writes a list (or any iterable) of bytes to the underlying socket @@ -673,42 +243,20 @@ StreamWriter If that fails, the data is queued in an internal write buffer until it can be sent. - Starting with Python 3.8, it is possible to directly await on the `writelines()` - method:: - - await stream.writelines(lines) - - The ``await`` pauses the current coroutine until the data is written to the - socket. - - Below is an equivalent code that works with Python <= 3.7:: + The method should be used along with the ``drain()`` method:: stream.writelines(lines) await stream.drain() - .. versionchanged:: 3.8 - Support ``await stream.writelines()`` syntax. - .. method:: close() The method closes the stream and the underlying socket. - Starting with Python 3.8, it is possible to directly await on the `close()` - method:: - - await stream.close() - - The ``await`` pauses the current coroutine until the stream and the underlying - socket are closed (and SSL shutdown is performed for a secure connection). - - Below is an equivalent code that works with Python <= 3.7:: + The method should be used along with the ``wait_closed()`` method:: stream.close() await stream.wait_closed() - .. versionchanged:: 3.8 - Support ``await stream.close()`` syntax. - .. method:: can_write_eof() Return *True* if the underlying transport supports @@ -768,17 +316,22 @@ Examples TCP echo client using streams ----------------------------- -TCP echo client using the :func:`asyncio.connect` function:: +TCP echo client using the :func:`asyncio.open_connection` function:: import asyncio async def tcp_echo_client(message): - async with asyncio.connect('127.0.0.1', 8888) as stream: - print(f'Send: {message!r}') - await stream.write(message.encode()) + reader, writer = await asyncio.open_connection( + '127.0.0.1', 8888) + + print(f'Send: {message!r}') + writer.write(message.encode()) + + data = await reader.read(100) + print(f'Received: {data.decode()!r}') - data = await stream.read(100) - print(f'Received: {data.decode()!r}') + print('Close the connection') + writer.close() asyncio.run(tcp_echo_client('Hello World!')) @@ -794,28 +347,32 @@ TCP echo client using the :func:`asyncio.connect` function:: TCP echo server using streams ----------------------------- -TCP echo server using the :class:`asyncio.StreamServer` class:: +TCP echo server using the :func:`asyncio.start_server` function:: import asyncio - async def handle_echo(stream): - data = await stream.read(100) + async def handle_echo(reader, writer): + data = await reader.read(100) message = data.decode() - addr = stream.get_extra_info('peername') + addr = writer.get_extra_info('peername') print(f"Received {message!r} from {addr!r}") print(f"Send: {message!r}") - await stream.write(data) + writer.write(data) + await writer.drain() print("Close the connection") - await stream.close() + writer.close() async def main(): - async with asyncio.StreamServer( - handle_echo, '127.0.0.1', 8888) as server: - addr = server.sockets[0].getsockname() - print(f'Serving on {addr}') + server = await asyncio.start_server( + handle_echo, '127.0.0.1', 8888) + + addr = server.sockets[0].getsockname() + print(f'Serving on {addr}') + + async with server: await server.serve_forever() asyncio.run(main()) @@ -839,9 +396,11 @@ Simple example querying HTTP headers of the URL passed on the command line:: async def print_http_headers(url): url = urllib.parse.urlsplit(url) if url.scheme == 'https': - stream = await asyncio.connect(url.hostname, 443, ssl=True) + reader, writer = await asyncio.open_connection( + url.hostname, 443, ssl=True) else: - stream = await asyncio.connect(url.hostname, 80) + reader, writer = await asyncio.open_connection( + url.hostname, 80) query = ( f"HEAD {url.path or '/'} HTTP/1.0\r\n" @@ -849,14 +408,18 @@ Simple example querying HTTP headers of the URL passed on the command line:: f"\r\n" ) - stream.write(query.encode('latin-1')) - while (line := await stream.readline()): + writer.write(query.encode('latin-1')) + while True: + line = await reader.readline() + if not line: + break + line = line.decode('latin1').rstrip() if line: print(f'HTTP header> {line}') # Ignore the body, close the socket - await stream.close() + writer.close() url = sys.argv[1] asyncio.run(print_http_headers(url)) @@ -877,7 +440,7 @@ Register an open socket to wait for data using streams ------------------------------------------------------ Coroutine waiting until a socket receives data using the -:func:`asyncio.connect` function:: +:func:`open_connection` function:: import asyncio import socket @@ -891,15 +454,17 @@ Coroutine waiting until a socket receives data using the rsock, wsock = socket.socketpair() # Register the open socket to wait for data. - async with asyncio.connect(sock=rsock) as stream: - # Simulate the reception of data from the network - loop.call_soon(wsock.send, 'abc'.encode()) + reader, writer = await asyncio.open_connection(sock=rsock) + + # Simulate the reception of data from the network + loop.call_soon(wsock.send, 'abc'.encode()) - # Wait for data - data = await stream.read(100) + # Wait for data + data = await reader.read(100) - # Got data, we are done: close the socket - print("Received:", data.decode()) + # Got data, we are done: close the socket + print("Received:", data.decode()) + writer.close() # Close the second socket wsock.close() |