summaryrefslogtreecommitdiffstats
path: root/Doc/library/asyncio-stream.rst
diff options
context:
space:
mode:
authorYury Selivanov <yury@magic.io>2018-09-11 16:54:40 (GMT)
committerGitHub <noreply@github.com>2018-09-11 16:54:40 (GMT)
commit7c7605ff1133cf757cac428c483827f666c7c827 (patch)
treef2ec281f9302eb4b493c34624577224c38c83949 /Doc/library/asyncio-stream.rst
parent735171e33486131d93865cf851c0c3d63fffd364 (diff)
downloadcpython-7c7605ff1133cf757cac428c483827f666c7c827.zip
cpython-7c7605ff1133cf757cac428c483827f666c7c827.tar.gz
cpython-7c7605ff1133cf757cac428c483827f666c7c827.tar.bz2
bpo-33649: First asyncio docs improvement pass (GH-9142)
Rewritten/updated sections: * Event Loop APIs * Transports & Protocols * Streams * Exceptions * Policies * Queues * Subprocesses * Platforms
Diffstat (limited to 'Doc/library/asyncio-stream.rst')
-rw-r--r--Doc/library/asyncio-stream.rst294
1 files changed, 142 insertions, 152 deletions
diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst
index f662e72..3fe7ac7 100644
--- a/Doc/library/asyncio-stream.rst
+++ b/Doc/library/asyncio-stream.rst
@@ -2,83 +2,107 @@
.. _asyncio-streams:
-+++++++++++++++++++++++++++++
-Streams (coroutine based API)
-+++++++++++++++++++++++++++++
+=======
+Streams
+=======
-**Source code:** :source:`Lib/asyncio/streams.py`
+Streams are high-level async/await-ready primitives to work with
+network connections. Streams allow send and receive data without
+using callbacks or low-level protocols and transports.
-Stream functions
-================
+Here's an example of a TCP echo client written using asyncio
+streams::
-.. note::
+ import asyncio
+
+ async def tcp_echo_client(message):
+ reader, writer = await asyncio.open_connection(
+ '127.0.0.1', 8888)
+
+ print(f'Send: {message!r}')
+ writer.write(message.encode())
+
+ data = await reader.read(100)
+ print(f'Received: {data.decode()!r}')
+
+ print('Close the connection')
+ writer.close()
- The top-level functions in this module are meant as convenience wrappers
- only; there's really nothing special there, and if they don't do
- exactly what you want, feel free to copy their code.
+ asyncio.run(tcp_echo_client('Hello World!'))
-.. coroutinefunction:: open_connection(host=None, port=None, \*, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)
+.. rubric:: Stream Functions
- A wrapper for :meth:`~AbstractEventLoop.create_connection()` returning a (reader,
- writer) pair.
+The following top-level asyncio functions can be used to create
+and work with streams:
- The reader returned is a :class:`StreamReader` instance; the writer is
- a :class:`StreamWriter` instance.
- When specified, the *loop* argument determines which event loop to use,
- and the *limit* argument determines the buffer size limit used by the
+.. coroutinefunction:: open_connection(host=None, port=None, \*, \
+ loop=None, limit=None, ssl=None, family=0, \
+ proto=0, flags=0, sock=None, local_addr=None, \
+ server_hostname=None, ssl_handshake_timeout=None)
+
+ Establish a network connection and return a pair of
+ ``(reader, writer)``.
+
+ The returned *reader* and *writer* objects are instances of
+ :class:`StreamReader` and :class:`StreamWriter` classes.
+
+ The *loop* argument is optional and can always be determined
+ automatically when this method is awaited from a coroutine.
+
+ *limit* determines the buffer size limit used by the
returned :class:`StreamReader` instance.
The rest of the arguments are passed directly to
- :meth:`AbstractEventLoop.create_connection`.
-
- This function is a :ref:`coroutine <coroutine>`.
+ :meth:`loop.create_connection`.
.. versionadded:: 3.7
The *ssl_handshake_timeout* parameter.
-.. coroutinefunction:: start_server(client_connected_cb, host=None, port=None, \*, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)
+.. coroutinefunction:: start_server(client_connected_cb, host=None, \
+ port=None, \*, loop=None, limit=None, \
+ family=socket.AF_UNSPEC, \
+ flags=socket.AI_PASSIVE, sock=None, \
+ backlog=100, ssl=None, reuse_address=None, \
+ reuse_port=None, ssl_handshake_timeout=None, \
+ start_serving=True)
- Start a socket server, with a callback for each client connected. The return
- value is the same as :meth:`~AbstractEventLoop.create_server()`.
+ Start a socket server.
The *client_connected_cb* callback is called whenever a new client
- connection is established. It receives a reader/writer pair as two
- arguments, the first is a :class:`StreamReader` instance,
- and the second is a :class:`StreamWriter` instance.
+ connection is established. It receives a ``(reader, writer)`` pair
+ as two arguments, instances of the :class:`StreamReader` and
+ :class:`StreamWriter` classes.
- *client_connected_cb* accepts a plain callable or a
+ *client_connected_cb* can be a plain callable or a
:ref:`coroutine function <coroutine>`; if it is a coroutine function,
- it will be automatically converted into a :class:`Task`.
+ it will be automatically wrapped into a :class:`Task`.
- When specified, the *loop* argument determines which event loop to use,
- and the *limit* argument determines the buffer size limit used by the
- :class:`StreamReader` instance passed to *client_connected_cb*.
+ The *loop* argument is optional and can always be determined
+ automatically when this method is awaited from a coroutine.
- The rest of the arguments are passed directly to
- :meth:`~AbstractEventLoop.create_server()`.
+ *limit* determines the buffer size limit used by the
+ returned :class:`StreamReader` instance.
- This function is a :ref:`coroutine <coroutine>`.
+ The rest of the arguments are passed directly to
+ :meth:`loop.create_server`.
.. versionadded:: 3.7
The *ssl_handshake_timeout* and *start_serving* parameters.
-.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)
-
- A wrapper for :meth:`~AbstractEventLoop.create_unix_connection()` returning
- a (reader, writer) pair.
+.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \
+ limit=None, ssl=None, sock=None, \
+ server_hostname=None, ssl_handshake_timeout=None)
- When specified, the *loop* argument determines which event loop to use,
- and the *limit* argument determines the buffer size limit used by the
- returned :class:`StreamReader` instance.
+ Establish a UNIX socket connection and return a pair of
+ ``(reader, writer)``.
- The rest of the arguments are passed directly to
- :meth:`~AbstractEventLoop.create_unix_connection()`.
+ Similar to :func:`open_connection` but operates on UNIX sockets.
- This function is a :ref:`coroutine <coroutine>`.
+ See also the documentation of :meth:`loop.create_unix_connection`.
Availability: UNIX.
@@ -90,27 +114,16 @@ Stream functions
The *path* parameter can now be a :term:`path-like object`
-.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \*, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)
-
- Start a UNIX Domain Socket server, with a callback for each client connected.
-
- The *client_connected_cb* callback is called whenever a new client
- connection is established. It receives a reader/writer pair as two
- arguments, the first is a :class:`StreamReader` instance,
- and the second is a :class:`StreamWriter` instance.
-
- *client_connected_cb* accepts a plain callable or a
- :ref:`coroutine function <coroutine>`; if it is a coroutine function,
- it will be automatically converted into a :class:`Task`.
+.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
+ \*, loop=None, limit=None, sock=None, \
+ backlog=100, ssl=None, ssl_handshake_timeout=None, \
+ start_serving=True)
- When specified, the *loop* argument determines which event loop to use,
- and the *limit* argument determines the buffer size limit used by the
- :class:`StreamReader` instance passed to *client_connected_cb*.
+ Start a UNIX socket server.
- The rest of the arguments are passed directly to
- :meth:`~AbstractEventLoop.create_unix_server()`.
+ Similar to :func:`start_server` but operates on UNIX sockets.
- This function is a :ref:`coroutine <coroutine>`.
+ See also the documentation of :meth:`loop.create_unix_server`.
Availability: UNIX.
@@ -123,6 +136,13 @@ Stream functions
The *path* parameter can now be a :term:`path-like object`.
+.. rubric:: Contents
+
+* `StreamReader`_ and `StreamWriter`_
+* `StreamReaderProtocol`_
+* `Examples`_
+
+
StreamReader
============
@@ -159,8 +179,6 @@ StreamReader
If the EOF was received and the internal buffer is empty,
return an empty ``bytes`` object.
- This method is a :ref:`coroutine <coroutine>`.
-
.. coroutinemethod:: readline()
Read one line, where "line" is a sequence of bytes ending with ``\n``.
@@ -171,8 +189,6 @@ StreamReader
If the EOF was received and the internal buffer is empty,
return an empty ``bytes`` object.
- This method is a :ref:`coroutine <coroutine>`.
-
.. coroutinemethod:: readexactly(n)
Read exactly *n* bytes. Raise an :exc:`IncompleteReadError` if the end of
@@ -180,8 +196,6 @@ StreamReader
:attr:`IncompleteReadError.partial` attribute of the exception contains
the partial read bytes.
- This method is a :ref:`coroutine <coroutine>`.
-
.. coroutinemethod:: readuntil(separator=b'\\n')
Read data from the stream until ``separator`` is found.
@@ -208,7 +222,8 @@ StreamReader
.. method:: at_eof()
- Return ``True`` if the buffer is empty and :meth:`feed_eof` was called.
+ Return ``True`` if the buffer is empty and :meth:`feed_eof`
+ was called.
StreamWriter
@@ -299,7 +314,8 @@ StreamWriter
StreamReaderProtocol
====================
-.. class:: StreamReaderProtocol(stream_reader, client_connected_cb=None, loop=None)
+.. class:: StreamReaderProtocol(stream_reader, client_connected_cb=None, \
+ loop=None)
Trivial helper class to adapt between :class:`Protocol` and
:class:`StreamReader`. Subclass of :class:`Protocol`.
@@ -314,36 +330,8 @@ StreamReaderProtocol
accidentally calling inappropriate methods of the protocol.)
-IncompleteReadError
-===================
-
-.. exception:: IncompleteReadError
-
- Incomplete read error, subclass of :exc:`EOFError`.
-
- .. attribute:: expected
-
- Total number of expected bytes (:class:`int`).
-
- .. attribute:: partial
-
- Read bytes string before the end of stream was reached (:class:`bytes`).
-
-
-LimitOverrunError
-=================
-
-.. exception:: LimitOverrunError
-
- Reached the buffer limit while looking for a separator.
-
- .. attribute:: consumed
-
- Total number of to be consumed bytes.
-
-
-Stream examples
-===============
+Examples
+========
.. _asyncio-tcp-echo-client-streams:
@@ -354,28 +342,26 @@ TCP echo client using the :func:`asyncio.open_connection` function::
import asyncio
- async def tcp_echo_client(message, loop):
- reader, writer = await asyncio.open_connection('127.0.0.1', 8888,
- loop=loop)
+ async def tcp_echo_client(message):
+ reader, writer = await asyncio.open_connection(
+ '127.0.0.1', 8888)
- print('Send: %r' % message)
+ print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
- print('Received: %r' % data.decode())
+ print(f'Received: {data.decode()!r}')
- print('Close the socket')
+ print('Close the connection')
writer.close()
- message = 'Hello World!'
- loop = asyncio.get_event_loop()
- loop.run_until_complete(tcp_echo_client(message, loop))
- loop.close()
+ asyncio.run(tcp_echo_client('Hello World!'))
+
.. seealso::
The :ref:`TCP echo client protocol <asyncio-tcp-echo-client-protocol>`
- example uses the :meth:`AbstractEventLoop.create_connection` method.
+ example uses the low-level :meth:`loop.create_connection` method.
.. _asyncio-tcp-echo-server-streams:
@@ -391,35 +377,33 @@ TCP echo server using the :func:`asyncio.start_server` function::
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
- print("Received %r from %r" % (message, addr))
- print("Send: %r" % message)
+ print(f"Received {message!r} from {addr!r}")
+
+ print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
- print("Close the client socket")
+ print("Close the connection")
writer.close()
- loop = asyncio.get_event_loop()
- coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
- server = loop.run_until_complete(coro)
+ async def main():
+ server = await asyncio.start_server(
+ handle_echo, '127.0.0.1', 8888)
+
+ addr = server.sockets[0].getsockname()
+ print(f'Serving on {addr}')
- # Serve requests until Ctrl+C is pressed
- print('Serving on {}'.format(server.sockets[0].getsockname()))
- try:
- loop.run_forever()
- except KeyboardInterrupt:
- pass
+ async with server:
+ await server.serve_forever()
+
+ asyncio.run(main())
- # Close the server
- server.close()
- loop.run_until_complete(server.wait_closed())
- loop.close()
.. seealso::
The :ref:`TCP echo server protocol <asyncio-tcp-echo-server-protocol>`
- example uses the :meth:`AbstractEventLoop.create_server` method.
+ example uses the :meth:`loop.create_server` method.
Get HTTP headers
@@ -434,30 +418,34 @@ Simple example querying HTTP headers of the URL passed on the command line::
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
- connect = asyncio.open_connection(url.hostname, 443, ssl=True)
+ reader, writer = await asyncio.open_connection(
+ url.hostname, 443, ssl=True)
else:
- connect = asyncio.open_connection(url.hostname, 80)
- reader, writer = await connect
- query = ('HEAD {path} HTTP/1.0\r\n'
- 'Host: {hostname}\r\n'
- '\r\n').format(path=url.path or '/', hostname=url.hostname)
+ reader, writer = await asyncio.open_connection(
+ url.hostname, 80)
+
+ query = (
+ f"HEAD {url.path or '/'} HTTP/1.0\r\n"
+ f"Host: {url.hostname}\r\n"
+ f"\r\n"
+ )
+
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
+
line = line.decode('latin1').rstrip()
if line:
- print('HTTP header> %s' % line)
+ print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
url = sys.argv[1]
- loop = asyncio.get_event_loop()
- task = asyncio.ensure_future(print_http_headers(url))
- loop.run_until_complete(task)
- loop.close()
+ asyncio.run(print_http_headers(url))
+
Usage::
@@ -467,6 +455,7 @@ or with HTTPS::
python example.py https://example.com/path/page.html
+
.. _asyncio-register-socket-streams:
Register an open socket to wait for data using streams
@@ -476,14 +465,18 @@ Coroutine waiting until a socket receives data using the
:func:`open_connection` function::
import asyncio
- from socket import socketpair
+ import socket
+
+ async def wait_for_data():
+ # Get a reference to the current event loop because
+ # we want to access low-level APIs.
+ loop = asyncio.get_running_loop()
- async def wait_for_data(loop):
- # Create a pair of connected sockets
- rsock, wsock = socketpair()
+ # Create a pair of connected sockets.
+ rsock, wsock = socket.socketpair()
- # Register the open socket to wait for data
- reader, writer = await asyncio.open_connection(sock=rsock, loop=loop)
+ # Register the open socket to wait for data.
+ reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
@@ -498,17 +491,14 @@ Coroutine waiting until a socket receives data using the
# Close the second socket
wsock.close()
- loop = asyncio.get_event_loop()
- loop.run_until_complete(wait_for_data(loop))
- loop.close()
+ asyncio.run(wait_for_data())
.. seealso::
The :ref:`register an open socket to wait for data using a protocol
- <asyncio-register-socket>` example uses a low-level protocol created by the
- :meth:`AbstractEventLoop.create_connection` method.
+ <asyncio-register-socket>` example uses a low-level protocol and
+ the :meth:`loop.create_connection` method.
The :ref:`watch a file descriptor for read events
<asyncio-watch-read-event>` example uses the low-level
- :meth:`AbstractEventLoop.add_reader` method to register the file descriptor of a
- socket.
+ :meth:`loop.add_reader` method to watch a file descriptor.