diff options
author | Yury Selivanov <yury@magic.io> | 2018-09-11 16:54:40 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-11 16:54:40 (GMT) |
commit | 7c7605ff1133cf757cac428c483827f666c7c827 (patch) | |
tree | f2ec281f9302eb4b493c34624577224c38c83949 /Doc/library/asyncio-stream.rst | |
parent | 735171e33486131d93865cf851c0c3d63fffd364 (diff) | |
download | cpython-7c7605ff1133cf757cac428c483827f666c7c827.zip cpython-7c7605ff1133cf757cac428c483827f666c7c827.tar.gz cpython-7c7605ff1133cf757cac428c483827f666c7c827.tar.bz2 |
bpo-33649: First asyncio docs improvement pass (GH-9142)
Rewritten/updated sections:
* Event Loop APIs
* Transports & Protocols
* Streams
* Exceptions
* Policies
* Queues
* Subprocesses
* Platforms
Diffstat (limited to 'Doc/library/asyncio-stream.rst')
-rw-r--r-- | Doc/library/asyncio-stream.rst | 294 |
1 files changed, 142 insertions, 152 deletions
diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index f662e72..3fe7ac7 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -2,83 +2,107 @@ .. _asyncio-streams: -+++++++++++++++++++++++++++++ -Streams (coroutine based API) -+++++++++++++++++++++++++++++ +======= +Streams +======= -**Source code:** :source:`Lib/asyncio/streams.py` +Streams are high-level async/await-ready primitives to work with +network connections. Streams allow send and receive data without +using callbacks or low-level protocols and transports. -Stream functions -================ +Here's an example of a TCP echo client written using asyncio +streams:: -.. note:: + import asyncio + + async def tcp_echo_client(message): + reader, writer = await asyncio.open_connection( + '127.0.0.1', 8888) + + print(f'Send: {message!r}') + writer.write(message.encode()) + + data = await reader.read(100) + print(f'Received: {data.decode()!r}') + + print('Close the connection') + writer.close() - The top-level functions in this module are meant as convenience wrappers - only; there's really nothing special there, and if they don't do - exactly what you want, feel free to copy their code. + asyncio.run(tcp_echo_client('Hello World!')) -.. coroutinefunction:: open_connection(host=None, port=None, \*, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None) +.. rubric:: Stream Functions - A wrapper for :meth:`~AbstractEventLoop.create_connection()` returning a (reader, - writer) pair. +The following top-level asyncio functions can be used to create +and work with streams: - The reader returned is a :class:`StreamReader` instance; the writer is - a :class:`StreamWriter` instance. - When specified, the *loop* argument determines which event loop to use, - and the *limit* argument determines the buffer size limit used by the +.. coroutinefunction:: open_connection(host=None, port=None, \*, \ + loop=None, limit=None, ssl=None, family=0, \ + proto=0, flags=0, sock=None, local_addr=None, \ + server_hostname=None, ssl_handshake_timeout=None) + + Establish a network connection and return a pair of + ``(reader, writer)``. + + The returned *reader* and *writer* objects are instances of + :class:`StreamReader` and :class:`StreamWriter` classes. + + The *loop* argument is optional and can always be determined + automatically when this method is awaited from a coroutine. + + *limit* determines the buffer size limit used by the returned :class:`StreamReader` instance. The rest of the arguments are passed directly to - :meth:`AbstractEventLoop.create_connection`. - - This function is a :ref:`coroutine <coroutine>`. + :meth:`loop.create_connection`. .. versionadded:: 3.7 The *ssl_handshake_timeout* parameter. -.. coroutinefunction:: start_server(client_connected_cb, host=None, port=None, \*, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True) +.. coroutinefunction:: start_server(client_connected_cb, host=None, \ + port=None, \*, loop=None, limit=None, \ + family=socket.AF_UNSPEC, \ + flags=socket.AI_PASSIVE, sock=None, \ + backlog=100, ssl=None, reuse_address=None, \ + reuse_port=None, ssl_handshake_timeout=None, \ + start_serving=True) - Start a socket server, with a callback for each client connected. The return - value is the same as :meth:`~AbstractEventLoop.create_server()`. + Start a socket server. The *client_connected_cb* callback is called whenever a new client - connection is established. It receives a reader/writer pair as two - arguments, the first is a :class:`StreamReader` instance, - and the second is a :class:`StreamWriter` instance. + connection is established. It receives a ``(reader, writer)`` pair + as two arguments, instances of the :class:`StreamReader` and + :class:`StreamWriter` classes. - *client_connected_cb* accepts a plain callable or a + *client_connected_cb* can be a plain callable or a :ref:`coroutine function <coroutine>`; if it is a coroutine function, - it will be automatically converted into a :class:`Task`. + it will be automatically wrapped into a :class:`Task`. - When specified, the *loop* argument determines which event loop to use, - and the *limit* argument determines the buffer size limit used by the - :class:`StreamReader` instance passed to *client_connected_cb*. + The *loop* argument is optional and can always be determined + automatically when this method is awaited from a coroutine. - The rest of the arguments are passed directly to - :meth:`~AbstractEventLoop.create_server()`. + *limit* determines the buffer size limit used by the + returned :class:`StreamReader` instance. - This function is a :ref:`coroutine <coroutine>`. + The rest of the arguments are passed directly to + :meth:`loop.create_server`. .. versionadded:: 3.7 The *ssl_handshake_timeout* and *start_serving* parameters. -.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None) - - A wrapper for :meth:`~AbstractEventLoop.create_unix_connection()` returning - a (reader, writer) pair. +.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \ + limit=None, ssl=None, sock=None, \ + server_hostname=None, ssl_handshake_timeout=None) - When specified, the *loop* argument determines which event loop to use, - and the *limit* argument determines the buffer size limit used by the - returned :class:`StreamReader` instance. + Establish a UNIX socket connection and return a pair of + ``(reader, writer)``. - The rest of the arguments are passed directly to - :meth:`~AbstractEventLoop.create_unix_connection()`. + Similar to :func:`open_connection` but operates on UNIX sockets. - This function is a :ref:`coroutine <coroutine>`. + See also the documentation of :meth:`loop.create_unix_connection`. Availability: UNIX. @@ -90,27 +114,16 @@ Stream functions The *path* parameter can now be a :term:`path-like object` -.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \*, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True) - - Start a UNIX Domain Socket server, with a callback for each client connected. - - The *client_connected_cb* callback is called whenever a new client - connection is established. It receives a reader/writer pair as two - arguments, the first is a :class:`StreamReader` instance, - and the second is a :class:`StreamWriter` instance. - - *client_connected_cb* accepts a plain callable or a - :ref:`coroutine function <coroutine>`; if it is a coroutine function, - it will be automatically converted into a :class:`Task`. +.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \ + \*, loop=None, limit=None, sock=None, \ + backlog=100, ssl=None, ssl_handshake_timeout=None, \ + start_serving=True) - When specified, the *loop* argument determines which event loop to use, - and the *limit* argument determines the buffer size limit used by the - :class:`StreamReader` instance passed to *client_connected_cb*. + Start a UNIX socket server. - The rest of the arguments are passed directly to - :meth:`~AbstractEventLoop.create_unix_server()`. + Similar to :func:`start_server` but operates on UNIX sockets. - This function is a :ref:`coroutine <coroutine>`. + See also the documentation of :meth:`loop.create_unix_server`. Availability: UNIX. @@ -123,6 +136,13 @@ Stream functions The *path* parameter can now be a :term:`path-like object`. +.. rubric:: Contents + +* `StreamReader`_ and `StreamWriter`_ +* `StreamReaderProtocol`_ +* `Examples`_ + + StreamReader ============ @@ -159,8 +179,6 @@ StreamReader If the EOF was received and the internal buffer is empty, return an empty ``bytes`` object. - This method is a :ref:`coroutine <coroutine>`. - .. coroutinemethod:: readline() Read one line, where "line" is a sequence of bytes ending with ``\n``. @@ -171,8 +189,6 @@ StreamReader If the EOF was received and the internal buffer is empty, return an empty ``bytes`` object. - This method is a :ref:`coroutine <coroutine>`. - .. coroutinemethod:: readexactly(n) Read exactly *n* bytes. Raise an :exc:`IncompleteReadError` if the end of @@ -180,8 +196,6 @@ StreamReader :attr:`IncompleteReadError.partial` attribute of the exception contains the partial read bytes. - This method is a :ref:`coroutine <coroutine>`. - .. coroutinemethod:: readuntil(separator=b'\\n') Read data from the stream until ``separator`` is found. @@ -208,7 +222,8 @@ StreamReader .. method:: at_eof() - Return ``True`` if the buffer is empty and :meth:`feed_eof` was called. + Return ``True`` if the buffer is empty and :meth:`feed_eof` + was called. StreamWriter @@ -299,7 +314,8 @@ StreamWriter StreamReaderProtocol ==================== -.. class:: StreamReaderProtocol(stream_reader, client_connected_cb=None, loop=None) +.. class:: StreamReaderProtocol(stream_reader, client_connected_cb=None, \ + loop=None) Trivial helper class to adapt between :class:`Protocol` and :class:`StreamReader`. Subclass of :class:`Protocol`. @@ -314,36 +330,8 @@ StreamReaderProtocol accidentally calling inappropriate methods of the protocol.) -IncompleteReadError -=================== - -.. exception:: IncompleteReadError - - Incomplete read error, subclass of :exc:`EOFError`. - - .. attribute:: expected - - Total number of expected bytes (:class:`int`). - - .. attribute:: partial - - Read bytes string before the end of stream was reached (:class:`bytes`). - - -LimitOverrunError -================= - -.. exception:: LimitOverrunError - - Reached the buffer limit while looking for a separator. - - .. attribute:: consumed - - Total number of to be consumed bytes. - - -Stream examples -=============== +Examples +======== .. _asyncio-tcp-echo-client-streams: @@ -354,28 +342,26 @@ TCP echo client using the :func:`asyncio.open_connection` function:: import asyncio - async def tcp_echo_client(message, loop): - reader, writer = await asyncio.open_connection('127.0.0.1', 8888, - loop=loop) + async def tcp_echo_client(message): + reader, writer = await asyncio.open_connection( + '127.0.0.1', 8888) - print('Send: %r' % message) + print(f'Send: {message!r}') writer.write(message.encode()) data = await reader.read(100) - print('Received: %r' % data.decode()) + print(f'Received: {data.decode()!r}') - print('Close the socket') + print('Close the connection') writer.close() - message = 'Hello World!' - loop = asyncio.get_event_loop() - loop.run_until_complete(tcp_echo_client(message, loop)) - loop.close() + asyncio.run(tcp_echo_client('Hello World!')) + .. seealso:: The :ref:`TCP echo client protocol <asyncio-tcp-echo-client-protocol>` - example uses the :meth:`AbstractEventLoop.create_connection` method. + example uses the low-level :meth:`loop.create_connection` method. .. _asyncio-tcp-echo-server-streams: @@ -391,35 +377,33 @@ TCP echo server using the :func:`asyncio.start_server` function:: data = await reader.read(100) message = data.decode() addr = writer.get_extra_info('peername') - print("Received %r from %r" % (message, addr)) - print("Send: %r" % message) + print(f"Received {message!r} from {addr!r}") + + print(f"Send: {message!r}") writer.write(data) await writer.drain() - print("Close the client socket") + print("Close the connection") writer.close() - loop = asyncio.get_event_loop() - coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop) - server = loop.run_until_complete(coro) + async def main(): + server = await asyncio.start_server( + handle_echo, '127.0.0.1', 8888) + + addr = server.sockets[0].getsockname() + print(f'Serving on {addr}') - # Serve requests until Ctrl+C is pressed - print('Serving on {}'.format(server.sockets[0].getsockname())) - try: - loop.run_forever() - except KeyboardInterrupt: - pass + async with server: + await server.serve_forever() + + asyncio.run(main()) - # Close the server - server.close() - loop.run_until_complete(server.wait_closed()) - loop.close() .. seealso:: The :ref:`TCP echo server protocol <asyncio-tcp-echo-server-protocol>` - example uses the :meth:`AbstractEventLoop.create_server` method. + example uses the :meth:`loop.create_server` method. Get HTTP headers @@ -434,30 +418,34 @@ Simple example querying HTTP headers of the URL passed on the command line:: async def print_http_headers(url): url = urllib.parse.urlsplit(url) if url.scheme == 'https': - connect = asyncio.open_connection(url.hostname, 443, ssl=True) + reader, writer = await asyncio.open_connection( + url.hostname, 443, ssl=True) else: - connect = asyncio.open_connection(url.hostname, 80) - reader, writer = await connect - query = ('HEAD {path} HTTP/1.0\r\n' - 'Host: {hostname}\r\n' - '\r\n').format(path=url.path or '/', hostname=url.hostname) + reader, writer = await asyncio.open_connection( + url.hostname, 80) + + query = ( + f"HEAD {url.path or '/'} HTTP/1.0\r\n" + f"Host: {url.hostname}\r\n" + f"\r\n" + ) + writer.write(query.encode('latin-1')) while True: line = await reader.readline() if not line: break + line = line.decode('latin1').rstrip() if line: - print('HTTP header> %s' % line) + print(f'HTTP header> {line}') # Ignore the body, close the socket writer.close() url = sys.argv[1] - loop = asyncio.get_event_loop() - task = asyncio.ensure_future(print_http_headers(url)) - loop.run_until_complete(task) - loop.close() + asyncio.run(print_http_headers(url)) + Usage:: @@ -467,6 +455,7 @@ or with HTTPS:: python example.py https://example.com/path/page.html + .. _asyncio-register-socket-streams: Register an open socket to wait for data using streams @@ -476,14 +465,18 @@ Coroutine waiting until a socket receives data using the :func:`open_connection` function:: import asyncio - from socket import socketpair + import socket + + async def wait_for_data(): + # Get a reference to the current event loop because + # we want to access low-level APIs. + loop = asyncio.get_running_loop() - async def wait_for_data(loop): - # Create a pair of connected sockets - rsock, wsock = socketpair() + # Create a pair of connected sockets. + rsock, wsock = socket.socketpair() - # Register the open socket to wait for data - reader, writer = await asyncio.open_connection(sock=rsock, loop=loop) + # Register the open socket to wait for data. + reader, writer = await asyncio.open_connection(sock=rsock) # Simulate the reception of data from the network loop.call_soon(wsock.send, 'abc'.encode()) @@ -498,17 +491,14 @@ Coroutine waiting until a socket receives data using the # Close the second socket wsock.close() - loop = asyncio.get_event_loop() - loop.run_until_complete(wait_for_data(loop)) - loop.close() + asyncio.run(wait_for_data()) .. seealso:: The :ref:`register an open socket to wait for data using a protocol - <asyncio-register-socket>` example uses a low-level protocol created by the - :meth:`AbstractEventLoop.create_connection` method. + <asyncio-register-socket>` example uses a low-level protocol and + the :meth:`loop.create_connection` method. The :ref:`watch a file descriptor for read events <asyncio-watch-read-event>` example uses the low-level - :meth:`AbstractEventLoop.add_reader` method to register the file descriptor of a - socket. + :meth:`loop.add_reader` method to watch a file descriptor. |