summaryrefslogtreecommitdiffstats
path: root/Doc/library/asyncio-protocol.rst
diff options
context:
space:
mode:
authorYury Selivanov <yury@magic.io>2018-09-11 16:54:40 (GMT)
committerGitHub <noreply@github.com>2018-09-11 16:54:40 (GMT)
commit7c7605ff1133cf757cac428c483827f666c7c827 (patch)
treef2ec281f9302eb4b493c34624577224c38c83949 /Doc/library/asyncio-protocol.rst
parent735171e33486131d93865cf851c0c3d63fffd364 (diff)
downloadcpython-7c7605ff1133cf757cac428c483827f666c7c827.zip
cpython-7c7605ff1133cf757cac428c483827f666c7c827.tar.gz
cpython-7c7605ff1133cf757cac428c483827f666c7c827.tar.bz2
bpo-33649: First asyncio docs improvement pass (GH-9142)
Rewritten/updated sections: * Event Loop APIs * Transports & Protocols * Streams * Exceptions * Policies * Queues * Subprocesses * Platforms
Diffstat (limited to 'Doc/library/asyncio-protocol.rst')
-rw-r--r--Doc/library/asyncio-protocol.rst1088
1 files changed, 646 insertions, 442 deletions
diff --git a/Doc/library/asyncio-protocol.rst b/Doc/library/asyncio-protocol.rst
index 9a08a4a..348ced5 100644
--- a/Doc/library/asyncio-protocol.rst
+++ b/Doc/library/asyncio-protocol.rst
@@ -1,12 +1,65 @@
.. currentmodule:: asyncio
-+++++++++++++++++++++++++++++++++++++++++++++
-Transports and protocols (callback based API)
-+++++++++++++++++++++++++++++++++++++++++++++
-**Source code:** :source:`Lib/asyncio/transports.py`
+========================
+Transports and Protocols
+========================
+
+.. rubric:: Preface
+
+Transports and Protocols are used by **low-level** event loop
+APIs such as :meth:`loop.create_connection`. They require using
+callback-based programming style and enable high-performance
+implementations of network or IPC protocols (e.g. HTTP).
+
+Essentially, transports and protocols should only be used in
+libraries and frameworks and never in high-level asyncio
+applications.
+
+This documentation page covers both `Transports`_ and `Protocols`_.
+
+.. rubric:: Introduction
+
+At the highest level, the transport is concerned with *how* bytes
+are transmitted, while the protocol determines *which* bytes to
+transmit (and to some extent when).
+
+A different way of saying the same thing: a transport is an
+abstraction for a socket (or similar I/O endpoint) while a protocol
+is an abstraction for an application, from the transport's point
+of view.
+
+Yet another view is simply that the transport and protocol interfaces
+together define an abstract interface for using network I/O and
+interprocess I/O.
+
+There is always a 1:1 relationship between transport and protocol
+objects: the protocol calls transport methods to send data,
+while the transport calls protocol methods to pass it data that
+has been received.
+
+Most of connection oriented event loop methods
+(such as :meth:`loop.create_connection`) usually accept a
+*protocol_factory* argument used to create a *Protocol* object
+for an accepted connection, represented by a *Transport* object.
+Such methods usually return a tuple of ``(transport, protocol)``.
+
+.. rubric:: Contents
+
+This documentation page contains the following sections:
+
+* The `Transports`_ section documents asyncio :class:`BaseTransport`,
+ :class:`ReadTransport`, :class:`WriteTransport`, :class:`Transport`,
+ :class:`DatagramTransport`, and :class:`SubprocessTransport`
+ classes.
+
+* The `Protocols`_ section documents asyncio :class:`BaseProtocol`,
+ :class:`Protocol`, :class:`BufferedProtocol`,
+ :class:`DatagramProtocol`, and :class:`SubprocessProtocol` classes.
+
+* The `Examples`_ section showcases how to work with transports,
+ protocols, and low-level event loop APIs.
-**Source code:** :source:`Lib/asyncio/protocols.py`
.. _asyncio-transport:
@@ -14,293 +67,356 @@ Transports
==========
Transports are classes provided by :mod:`asyncio` in order to abstract
-various kinds of communication channels. You generally won't instantiate
-a transport yourself; instead, you will call an :class:`AbstractEventLoop` method
-which will create the transport and try to initiate the underlying
-communication channel, calling you back when it succeeds.
+various kinds of communication channels.
-Once the communication channel is established, a transport is always
-paired with a :ref:`protocol <asyncio-protocol>` instance. The protocol can
-then call the transport's methods for various purposes.
+Transport objects are always instantiated by an
+ref:`asyncio event loop <asyncio-event-loop>`.
-:mod:`asyncio` currently implements transports for TCP, UDP, SSL, and
-subprocess pipes. The methods available on a transport depend on
-the transport's kind.
+asyncio implements transports for TCP, UDP, SSL, and subprocess pipes.
+The methods available on a transport depend on the transport's kind.
The transport classes are :ref:`not thread safe <asyncio-multithreading>`.
-.. versionchanged:: 3.6
- The socket option ``TCP_NODELAY`` is now set by default.
-
-BaseTransport
--------------
+Transports Hierarchy
+--------------------
.. class:: BaseTransport
- Base class for transports.
+ Base class for all transports. Contains methods that all
+ asyncio transports share.
- .. method:: close()
+.. class:: WriteTransport(BaseTransport)
- Close the transport. If the transport has a buffer for outgoing
- data, buffered data will be flushed asynchronously. No more data
- will be received. After all buffered data is flushed, the
- protocol's :meth:`connection_lost` method will be called with
- :const:`None` as its argument.
+ A base transport for write-only connections.
- .. method:: is_closing()
+ Instances of the *WriteTransport* class are returned from
+ the :meth:`loop.connect_write_pipe` event loop method and
+ are also used by subprocess-related methods like
+ :meth:`loop.subprocess_exec`.
- Return ``True`` if the transport is closing or is closed.
+.. class:: ReadTransport(BaseTransport)
- .. versionadded:: 3.5.1
+ A base transport for read-only connections.
- .. method:: get_extra_info(name, default=None)
+ Instances of the *ReadTransport* class are returned from
+ the :meth:`loop.connect_read_pipe` event loop method and
+ are also used by subprocess-related methods like
+ :meth:`loop.subprocess_exec`.
- Return optional transport information. *name* is a string representing
- the piece of transport-specific information to get, *default* is the
- value to return if the information doesn't exist.
+.. class:: Transport(WriteTransport, ReadTransport)
- This method allows transport implementations to easily expose
- channel-specific information.
+ Interface representing a bidirectional transport, such as a
+ TCP connection.
- * socket:
+ The user never instantiates a transport directly; they call a
+ utility function, passing it a protocol factory and other
+ information necessary to create the transport and protocol.
- - ``'peername'``: the remote address to which the socket is connected,
- result of :meth:`socket.socket.getpeername` (``None`` on error)
- - ``'socket'``: :class:`socket.socket` instance
- - ``'sockname'``: the socket's own address,
- result of :meth:`socket.socket.getsockname`
+ Instances of the *Transport* class are returned from or used by
+ event loop methods like :meth:`loop.create_connection`,
+ :meth:`loop.create_unix_connection`,
+ :meth:`loop.create_server`, :meth:`loop.sendfile`, etc.
- * SSL socket:
- - ``'compression'``: the compression algorithm being used as a string,
- or ``None`` if the connection isn't compressed; result of
- :meth:`ssl.SSLSocket.compression`
- - ``'cipher'``: a three-value tuple containing the name of the cipher
- being used, the version of the SSL protocol that defines its use, and
- the number of secret bits being used; result of
- :meth:`ssl.SSLSocket.cipher`
- - ``'peercert'``: peer certificate; result of
- :meth:`ssl.SSLSocket.getpeercert`
- - ``'sslcontext'``: :class:`ssl.SSLContext` instance
- - ``'ssl_object'``: :class:`ssl.SSLObject` or :class:`ssl.SSLSocket`
- instance
+.. class:: DatagramTransport(BaseTransport)
- * pipe:
+ A transport for datagram (UDP) connections.
- - ``'pipe'``: pipe object
+ Instances of the *DatagramTransport* class are returned from
+ the :meth:`loop.create_datagram_endpoint` event loop method.
- * subprocess:
- - ``'subprocess'``: :class:`subprocess.Popen` instance
+.. class:: SubprocessTransport(BaseTransport)
- .. method:: set_protocol(protocol)
+ An abstraction to represent a connection between a parent and its
+ child OS process.
- Set a new protocol. Switching protocol should only be done when both
- protocols are documented to support the switch.
+ Instances of the *SubprocessTransport* class are returned from
+ event loop methods :meth:`loop.subprocess_shell` and
+ :meth:`loop.subprocess_exec`.
- .. versionadded:: 3.5.3
- .. method:: get_protocol
+Base Transport
+--------------
- Return the current protocol.
+.. method:: BaseTransport.close()
- .. versionadded:: 3.5.3
+ Close the transport.
- .. versionchanged:: 3.5.1
- ``'ssl_object'`` info was added to SSL sockets.
+ If the transport has a buffer for outgoing
+ data, buffered data will be flushed asynchronously. No more data
+ will be received. After all buffered data is flushed, the
+ protocol's :meth:`protocol.connection_lost()
+ <BaseProtocol.connection_lost>` method will be called with
+ :const:`None` as its argument.
+.. method:: BaseTransport.is_closing()
-ReadTransport
--------------
+ Return ``True`` if the transport is closing or is closed.
-.. class:: ReadTransport
+.. method:: BaseTransport.get_extra_info(name, default=None)
- Interface for read-only transports.
+ Return information about the transport or underlying resources
+ it uses.
- .. method:: is_reading()
+ *name* is a string representing the piece of transport-specific
+ information to get.
- Return ``True`` if the transport is receiving new data.
+ *default* is the value to return if the information is not
+ available, or if the transport does not support querying it
+ with the given third-party event loop implementation or on the
+ current platform.
- .. versionadded:: 3.7
+ For example, the following code attempts to get the underlying
+ socket object of the transport::
- .. method:: pause_reading()
+ sock = transport.get_extra_info('socket')
+ if sock is not None:
+ print(sock.getsockopt(...))
- Pause the receiving end of the transport. No data will be passed to
- the protocol's :meth:`data_received` method until :meth:`resume_reading`
- is called.
+ Categories of information that can be queried on some transports:
- .. versionchanged:: 3.7
- The method is idempotent, i.e. it can be called when the
- transport is already paused or closed.
+ * socket:
- .. method:: resume_reading()
+ - ``'peername'``: the remote address to which the socket is
+ connected, result of :meth:`socket.socket.getpeername`
+ (``None`` on error)
- Resume the receiving end. The protocol's :meth:`data_received` method
- will be called once again if some data is available for reading.
+ - ``'socket'``: :class:`socket.socket` instance
- .. versionchanged:: 3.7
- The method is idempotent, i.e. it can be called when the
- transport is already reading.
+ - ``'sockname'``: the socket's own address,
+ result of :meth:`socket.socket.getsockname`
+ * SSL socket:
-WriteTransport
---------------
+ - ``'compression'``: the compression algorithm being used as a
+ string, or ``None`` if the connection isn't compressed; result
+ of :meth:`ssl.SSLSocket.compression`
-.. class:: WriteTransport
+ - ``'cipher'``: a three-value tuple containing the name of the
+ cipher being used, the version of the SSL protocol that defines
+ its use, and the number of secret bits being used; result of
+ :meth:`ssl.SSLSocket.cipher`
- Interface for write-only transports.
+ - ``'peercert'``: peer certificate; result of
+ :meth:`ssl.SSLSocket.getpeercert`
- .. method:: abort()
+ - ``'sslcontext'``: :class:`ssl.SSLContext` instance
- Close the transport immediately, without waiting for pending operations
- to complete. Buffered data will be lost. No more data will be received.
- The protocol's :meth:`connection_lost` method will eventually be
- called with :const:`None` as its argument.
+ - ``'ssl_object'``: :class:`ssl.SSLObject` or
+ :class:`ssl.SSLSocket` instance
- .. method:: can_write_eof()
+ * pipe:
- Return :const:`True` if the transport supports :meth:`write_eof`,
- :const:`False` if not.
+ - ``'pipe'``: pipe object
- .. method:: get_write_buffer_size()
+ * subprocess:
- Return the current size of the output buffer used by the transport.
+ - ``'subprocess'``: :class:`subprocess.Popen` instance
- .. method:: get_write_buffer_limits()
+.. method:: BaseTransport.set_protocol(protocol)
- Get the *high*- and *low*-water limits for write flow control. Return a
- tuple ``(low, high)`` where *low* and *high* are positive number of
- bytes.
+ Set a new protocol.
- Use :meth:`set_write_buffer_limits` to set the limits.
+ Switching protocol should only be done when both
+ protocols are documented to support the switch.
- .. versionadded:: 3.4.2
+.. method:: BaseTransport.get_protocol()
- .. method:: set_write_buffer_limits(high=None, low=None)
+ Return the current protocol.
- Set the *high*- and *low*-water limits for write flow control.
- These two values (measured in number of
- bytes) control when the protocol's
- :meth:`pause_writing` and :meth:`resume_writing` methods are called.
- If specified, the low-water limit must be less than or equal to the
- high-water limit. Neither *high* nor *low* can be negative.
+Read-only Transports
+--------------------
- :meth:`pause_writing` is called when the buffer size becomes greater
- than or equal to the *high* value. If writing has been paused,
- :meth:`resume_writing` is called when the buffer size becomes less
- than or equal to the *low* value.
+.. method:: ReadTransport.is_reading()
- The defaults are implementation-specific. If only the
- high-water limit is given, the low-water limit defaults to an
- implementation-specific value less than or equal to the
- high-water limit. Setting *high* to zero forces *low* to zero as
- well, and causes :meth:`pause_writing` to be called whenever the
- buffer becomes non-empty. Setting *low* to zero causes
- :meth:`resume_writing` to be called only once the buffer is empty.
- Use of zero for either limit is generally sub-optimal as it
- reduces opportunities for doing I/O and computation
- concurrently.
+ Return ``True`` if the transport is receiving new data.
- Use :meth:`get_write_buffer_limits` to get the limits.
+ .. versionadded:: 3.7
- .. method:: write(data)
+.. method:: ReadTransport.pause_reading()
- Write some *data* bytes to the transport.
+ Pause the receiving end of the transport. No data will be passed to
+ the protocol's :meth:`protocol.data_received() <Protocol.data_received>`
+ method until :meth:`resume_reading` is called.
- This method does not block; it buffers the data and arranges for it
- to be sent out asynchronously.
+ .. versionchanged:: 3.7
+ The method is idempotent, i.e. it can be called when the
+ transport is already paused or closed.
- .. method:: writelines(list_of_data)
+.. method:: ReadTransport.resume_reading()
- Write a list (or any iterable) of data bytes to the transport.
- This is functionally equivalent to calling :meth:`write` on each
- element yielded by the iterable, but may be implemented more efficiently.
+ Resume the receiving end. The protocol's
+ :meth:`protocol.data_received() <Protocol.data_received>` method
+ will be called once again if some data is available for reading.
- .. method:: write_eof()
+ .. versionchanged:: 3.7
+ The method is idempotent, i.e. it can be called when the
+ transport is already reading.
- Close the write end of the transport after flushing buffered data.
- Data may still be received.
- This method can raise :exc:`NotImplementedError` if the transport
- (e.g. SSL) doesn't support half-closes.
+Write-only Transports
+---------------------
+.. method:: WriteTransport.abort()
-DatagramTransport
------------------
+ Close the transport immediately, without waiting for pending operations
+ to complete. Buffered data will be lost. No more data will be received.
+ The protocol's :meth:`protocol.connection_lost()
+ <BaseProtocol.connection_lost>` method will eventually be
+ called with :const:`None` as its argument.
-.. method:: DatagramTransport.sendto(data, addr=None)
+.. method:: WriteTransport.can_write_eof()
- Send the *data* bytes to the remote peer given by *addr* (a
- transport-dependent target address). If *addr* is :const:`None`, the
- data is sent to the target address given on transport creation.
+ Return :const:`True` if the transport supports
+ :meth:`~WriteTransport.write_eof`, :const:`False` if not.
+
+.. method:: WriteTransport.get_write_buffer_size()
+
+ Return the current size of the output buffer used by the transport.
+
+.. method:: WriteTransport.get_write_buffer_limits()
+
+ Get the *high*- and *low*-water limits for write flow control. Return a
+ tuple ``(low, high)`` where *low* and *high* are positive number of
+ bytes.
+
+ Use :meth:`set_write_buffer_limits` to set the limits.
+
+ .. versionadded:: 3.4.2
+
+.. method:: WriteTransport.set_write_buffer_limits(high=None, low=None)
+
+ Set the *high*- and *low*-water limits for write flow control.
+
+ These two values (measured in number of
+ bytes) control when the protocol's
+ :meth:`protocol.pause_writing() <BaseProtocol.pause_writing>`
+ and :meth:`protocol.resume_writing() <BaseProtocol.resume_writing>`
+ methods are called. If specified, the low-water limit must be less
+ than or equal to the high-water limit. Neither *high* nor *low*
+ can be negative.
+
+ :meth:`~BaseProtocol.pause_writing` is called when the buffer size
+ becomes greater than or equal to the *high* value. If writing has
+ been paused, :meth:`~BaseProtocol.resume_writing` is called when
+ the buffer size becomes less than or equal to the *low* value.
+
+ The defaults are implementation-specific. If only the
+ high-water limit is given, the low-water limit defaults to an
+ implementation-specific value less than or equal to the
+ high-water limit. Setting *high* to zero forces *low* to zero as
+ well, and causes :meth:`~BaseProtocol.pause_writing` to be called
+ whenever the buffer becomes non-empty. Setting *low* to zero causes
+ :meth:`~BaseProtocol.resume_writing` to be called only once the
+ buffer is empty. Use of zero for either limit is generally
+ sub-optimal as it reduces opportunities for doing I/O and
+ computation concurrently.
+
+ Use :meth:`~WriteTransport.get_write_buffer_limits`
+ to get the limits.
+
+.. method:: WriteTransport.write(data)
+
+ Write some *data* bytes to the transport.
This method does not block; it buffers the data and arranges for it
to be sent out asynchronously.
+.. method:: WriteTransport.writelines(list_of_data)
+
+ Write a list (or any iterable) of data bytes to the transport.
+ This is functionally equivalent to calling :meth:`write` on each
+ element yielded by the iterable, but may be implemented more
+ efficiently.
+
+.. method:: WriteTransport.write_eof()
+
+ Close the write end of the transport after flushing buffered data.
+ Data may still be received.
+
+ This method can raise :exc:`NotImplementedError` if the transport
+ (e.g. SSL) doesn't support half-closes.
+
+
+Datagram Transports
+-------------------
+
+.. method:: DatagramTransport.sendto(data, addr=None)
+
+ Send the *data* bytes to the remote peer given by *addr* (a
+ transport-dependent target address). If *addr* is :const:`None`,
+ the data is sent to the target address given on transport
+ creation.
+
+ This method does not block; it buffers the data and arranges
+ for it to be sent out asynchronously.
+
.. method:: DatagramTransport.abort()
- Close the transport immediately, without waiting for pending operations
- to complete. Buffered data will be lost. No more data will be received.
- The protocol's :meth:`connection_lost` method will eventually be
- called with :const:`None` as its argument.
+ Close the transport immediately, without waiting for pending
+ operations to complete. Buffered data will be lost.
+ No more data will be received. The protocol's
+ :meth:`protocol.connection_lost() <BaseProtocol.connection_lost>`
+ method will eventually be called with :const:`None` as its argument.
-BaseSubprocessTransport
------------------------
+.. _asyncio-subprocess-transports:
-.. class:: BaseSubprocessTransport
+Subprocess Transports
+---------------------
- .. method:: get_pid()
+.. method:: SubprocessTransport.get_pid()
- Return the subprocess process id as an integer.
+ Return the subprocess process id as an integer.
- .. method:: get_pipe_transport(fd)
+.. method:: SubprocessTransport.get_pipe_transport(fd)
- Return the transport for the communication pipe corresponding to the
- integer file descriptor *fd*:
+ Return the transport for the communication pipe corresponding to the
+ integer file descriptor *fd*:
- * ``0``: readable streaming transport of the standard input (*stdin*),
- or :const:`None` if the subprocess was not created with ``stdin=PIPE``
- * ``1``: writable streaming transport of the standard output (*stdout*),
- or :const:`None` if the subprocess was not created with ``stdout=PIPE``
- * ``2``: writable streaming transport of the standard error (*stderr*),
- or :const:`None` if the subprocess was not created with ``stderr=PIPE``
- * other *fd*: :const:`None`
+ * ``0``: readable streaming transport of the standard input (*stdin*),
+ or :const:`None` if the subprocess was not created with ``stdin=PIPE``
+ * ``1``: writable streaming transport of the standard output (*stdout*),
+ or :const:`None` if the subprocess was not created with ``stdout=PIPE``
+ * ``2``: writable streaming transport of the standard error (*stderr*),
+ or :const:`None` if the subprocess was not created with ``stderr=PIPE``
+ * other *fd*: :const:`None`
- .. method:: get_returncode()
+.. method:: SubprocessTransport.get_returncode()
- Return the subprocess returncode as an integer or :const:`None`
- if it hasn't returned, similarly to the
- :attr:`subprocess.Popen.returncode` attribute.
+ Return the subprocess return code as an integer or :const:`None`
+ if it hasn't returned, similarly to the
+ :attr:`subprocess.Popen.returncode` attribute.
- .. method:: kill()
+.. method:: SubprocessTransport.kill()
- Kill the subprocess, as in :meth:`subprocess.Popen.kill`.
+ Kill the subprocess, as in :meth:`subprocess.Popen.kill`.
- On POSIX systems, the function sends SIGKILL to the subprocess.
- On Windows, this method is an alias for :meth:`terminate`.
+ On POSIX systems, the function sends SIGKILL to the subprocess.
+ On Windows, this method is an alias for :meth:`terminate`.
- .. method:: send_signal(signal)
+.. method:: SubprocessTransport.send_signal(signal)
- Send the *signal* number to the subprocess, as in
- :meth:`subprocess.Popen.send_signal`.
+ Send the *signal* number to the subprocess, as in
+ :meth:`subprocess.Popen.send_signal`.
- .. method:: terminate()
+.. method:: SubprocessTransport.terminate()
- Ask the subprocess to stop, as in :meth:`subprocess.Popen.terminate`.
- This method is an alias for the :meth:`close` method.
+ Ask the subprocess to stop, as in :meth:`subprocess.Popen.terminate`.
+ This method is an alias for the :meth:`close` method.
- On POSIX systems, this method sends SIGTERM to the subprocess.
- On Windows, the Windows API function TerminateProcess() is called to
- stop the subprocess.
+ On POSIX systems, this method sends SIGTERM to the subprocess.
+ On Windows, the Windows API function TerminateProcess() is called to
+ stop the subprocess.
- .. method:: close()
+.. method:: SubprocessTransport.close()
- Ask the subprocess to stop by calling the :meth:`terminate` method if the
- subprocess hasn't returned yet, and close transports of all pipes
- (*stdin*, *stdout* and *stderr*).
+ Ask the subprocess to stop by calling the :meth:`terminate` method
+ if the subprocess hasn't returned yet, and close transports of all
+ pipes (*stdin*, *stdout* and *stderr*).
.. _asyncio-protocol:
@@ -308,65 +424,62 @@ BaseSubprocessTransport
Protocols
=========
-:mod:`asyncio` provides base classes that you can subclass to implement
-your network protocols. Those classes are used in conjunction with
-:ref:`transports <asyncio-transport>` (see below): the protocol parses incoming
-data and asks for the writing of outgoing data, while the transport is
-responsible for the actual I/O and buffering.
+asyncio provides a set of abstract base classes that should be used
+to implement network protocols. Those classes are meant to be used
+together with :ref:`transports <asyncio-transport>`.
-When subclassing a protocol class, it is recommended you override certain
-methods. Those methods are callbacks: they will be called by the transport
-on certain events (for example when some data is received); you shouldn't
-call them yourself, unless you are implementing a transport.
+Subclasses of abstract base protocol classes can implement some or
+all methods. All those methods are callbacks: they are called by
+transports on certain events, for example when some data is received.
+Base protocol methods are not supposed to be called by anything but
+the corresponding transport.
-.. note::
- All callbacks have default implementations, which are empty. Therefore,
- you only need to implement the callbacks for the events in which you
- are interested.
+Base Protocols
+--------------
+
+.. class:: BaseProtocol
-Protocol classes
-----------------
+ Base protocol with methods that all protocols share.
-.. class:: Protocol
+.. class:: Protocol(BaseProtocol)
- The base class for implementing streaming protocols (for use with
- e.g. TCP and SSL transports).
+ The base class for implementing streaming protocols
+ (TCP, Unix sockets, etc).
-.. class:: BufferedProtocol
+.. class:: BufferedProtocol(BaseProtocol)
A base class for implementing streaming protocols with manual
control of the receive buffer.
- .. versionadded:: 3.7
- **Important:** this has been added to asyncio in Python 3.7
- *on a provisional basis*! Treat it as an experimental API that
- might be changed or removed in Python 3.8.
-
-.. class:: DatagramProtocol
+.. class:: DatagramProtocol(BaseProtocol)
- The base class for implementing datagram protocols (for use with
- e.g. UDP transports).
+ The base class for implementing datagram (UDP) protocols.
-.. class:: SubprocessProtocol
+.. class:: SubprocessProtocol(BaseProtocol)
The base class for implementing protocols communicating with child
- processes (through a set of unidirectional pipes).
+ processes (unidirectional pipes).
-Connection callbacks
---------------------
+Base Protocol
+-------------
+
+All asyncio protocols can implement Base Protocol callbacks.
-These callbacks may be called on :class:`Protocol`, :class:`DatagramProtocol`
-and :class:`SubprocessProtocol` instances:
+.. rubric:: Connection Callbacks
+
+Connection callbacks are called on all protocols, exactly once per
+a successful connection. All other protocol callbacks can only be
+called between those two methods.
.. method:: BaseProtocol.connection_made(transport)
Called when a connection is made.
The *transport* argument is the transport representing the
- connection. You are responsible for storing it somewhere
- (e.g. as an attribute) if you need to.
+ connection. The protocol is responsible for storing the reference
+ to its transport.
.. method:: BaseProtocol.connection_lost(exc)
@@ -376,65 +489,77 @@ and :class:`SubprocessProtocol` instances:
The latter means a regular EOF is received, or the connection was
aborted or closed by this side of the connection.
-:meth:`~BaseProtocol.connection_made` and :meth:`~BaseProtocol.connection_lost`
-are called exactly once per successful connection. All other callbacks will be
-called between those two methods, which allows for easier resource management
-in your protocol implementation.
-The following callbacks may be called only on :class:`SubprocessProtocol`
-instances:
+.. rubric:: Flow Control Callbacks
-.. method:: SubprocessProtocol.pipe_data_received(fd, data)
+Flow control callbacks can be called by transports to pause or
+resume writing performed by the protocol.
- Called when the child process writes data into its stdout or stderr pipe.
- *fd* is the integer file descriptor of the pipe. *data* is a non-empty
- bytes object containing the data.
+See the documentation of the :meth:`~WriteTransport.set_write_buffer_limits`
+method for more details.
-.. method:: SubprocessProtocol.pipe_connection_lost(fd, exc)
+.. method:: BaseProtocol.pause_writing()
- Called when one of the pipes communicating with the child process
- is closed. *fd* is the integer file descriptor that was closed.
+ Called when the transport's buffer goes over the high-water mark.
-.. method:: SubprocessProtocol.process_exited()
+.. method:: BaseProtocol.resume_writing()
- Called when the child process has exited.
+ Called when the transport's buffer drains below the low-water mark.
+If the buffer size equals the high-water mark,
+:meth:`~BaseProtocol.pause_writing` is not called: the buffer size must
+go strictly over.
-Streaming protocols
+Conversely, :meth:`~BaseProtocol.resume_writing` is called when the
+buffer size is equal or lower than the low-water mark. These end
+conditions are important to ensure that things go as expected when
+either mark is zero.
+
+
+Streaming Protocols
-------------------
-The following callbacks are called on :class:`Protocol` instances:
+Event methods, such as :meth:`loop.create_server`,
+:meth:`loop.create_unix_server`, :meth:`loop.create_connection`,
+:meth:`loop.create_unix_connection`, :meth:`loop.connect_accepted_socket`,
+:meth:`loop.connect_read_pipe`, and :meth:`loop.connect_write_pipe`
+accept factories that return streaming protocols.
.. method:: Protocol.data_received(data)
- Called when some data is received. *data* is a non-empty bytes object
- containing the incoming data.
+ Called when some data is received. *data* is a non-empty bytes
+ object containing the incoming data.
+
+ Whether the data is buffered, chunked or reassembled depends on
+ the transport. In general, you shouldn't rely on specific semantics
+ and instead make your parsing generic and flexible enough. However,
+ data is always received in the correct order.
+
+ The method can be called an arbitrary number of times during
+ a connection.
- .. note::
- Whether the data is buffered, chunked or reassembled depends on
- the transport. In general, you shouldn't rely on specific semantics
- and instead make your parsing generic and flexible enough. However,
- data is always received in the correct order.
+ However, :meth:`protocol.eof_received() <Protocol.eof_received>`
+ is called at most once and, if called,
+ :meth:`protocol.data_received() <Protocol.data_received>`
+ won't be called after it.
.. method:: Protocol.eof_received()
Called when the other end signals it won't send any more data
- (for example by calling :meth:`write_eof`, if the other end also uses
+ (for example by calling :meth:`transport.write_eof()
+ <WriteTransport.write_eof>`, if the other end also uses
asyncio).
This method may return a false value (including ``None``), in which case
the transport will close itself. Conversely, if this method returns a
true value, closing the transport is up to the protocol. Since the
- default implementation returns ``None``, it implicitly closes the connection.
+ default implementation returns ``None``, it implicitly closes the
+ connection.
- .. note::
- Some transports such as SSL don't support half-closed connections,
- in which case returning true from this method will not prevent closing
- the connection.
+ Some transports such as SSL don't support half-closed connections,
+ in which case returning true from this method will not prevent closing
+ the connection.
-:meth:`data_received` can be called an arbitrary number of times during
-a connection. However, :meth:`eof_received` is called at most once
-and, if called, :meth:`data_received` won't be called after it.
State machine:
@@ -446,20 +571,18 @@ State machine:
-> connection_lost -> end
-Streaming protocols with manual receive buffer control
-------------------------------------------------------
+Buffered Streaming Protocols
+----------------------------
.. versionadded:: 3.7
- **Important:** :class:`BufferedProtocol` has been added to
- asyncio in Python 3.7 *on a provisional basis*! Consider it as an
- experimental API that might be changed or removed in Python 3.8.
+ **Important:** this has been added to asyncio in Python 3.7
+ *on a provisional basis*! This is as an experimental API that
+ might be changed or removed completely in Python 3.8.
+Buffered Protocols can be used with any event loop method
+that supports `Streaming Protocols`_.
-Event methods, such as :meth:`AbstractEventLoop.create_server` and
-:meth:`AbstractEventLoop.create_connection`, accept factories that
-return protocols that implement this interface.
-
-The idea of BufferedProtocol is that it allows to manually allocate
+The idea of ``BufferedProtocol`` is that it allows to manually allocate
and control the receive buffer. Event loops can then use the buffer
provided by the protocol to avoid unnecessary data copies. This
can result in noticeable performance improvement for protocols that
@@ -489,13 +612,15 @@ instances:
.. method:: BufferedProtocol.eof_received()
- See the documentation of the :meth:`Protocol.eof_received` method.
+ See the documentation of the :meth:`protocol.eof_received()
+ <Protocol.eof_received>` method.
-:meth:`get_buffer` can be called an arbitrary number of times during
-a connection. However, :meth:`eof_received` is called at most once
-and, if called, :meth:`get_buffer` and :meth:`buffer_updated`
-won't be called after it.
+:meth:`~BufferedProtocol.get_buffer` can be called an arbitrary number
+of times during a connection. However, :meth:`protocol.eof_received()
+<Protocol.eof_received>` is called at most once
+and, if called, :meth:`~BufferedProtocol.get_buffer` and
+:meth:`~BufferedProtocol.buffer_updated` won't be called after it.
State machine:
@@ -509,10 +634,11 @@ State machine:
-> connection_lost -> end
-Datagram protocols
+Datagram Protocols
------------------
-The following callbacks are called on :class:`DatagramProtocol` instances.
+Datagram Protocol instances should be constructed by protocol
+factories passed to the :meth:`loop.create_datagram_endpoint` method.
.. method:: DatagramProtocol.datagram_received(data, addr)
@@ -530,76 +656,116 @@ The following callbacks are called on :class:`DatagramProtocol` instances.
In many conditions though, undeliverable datagrams will be silently
dropped.
+.. note::
-Flow control callbacks
-----------------------
+ On BSD systems (macOS, FreeBSD, etc.) flow control is not supported
+ for datagram protocols, because send failures caused by
+ writing too many packets cannot be detected easily.
-These callbacks may be called on :class:`Protocol`,
-:class:`DatagramProtocol` and :class:`SubprocessProtocol` instances:
+ The socket always appears 'ready' and excess packets are dropped; an
+ :class:`OSError` with ``errno`` set to :const:`errno.ENOBUFS` may
+ or may not be raised; if it is raised, it will be reported to
+ :meth:`DatagramProtocol.error_received` but otherwise ignored.
-.. method:: BaseProtocol.pause_writing()
- Called when the transport's buffer goes over the high-water mark.
+.. _asyncio-subprocess-protocols:
-.. method:: BaseProtocol.resume_writing()
+Subprocess Protocols
+--------------------
- Called when the transport's buffer drains below the low-water mark.
+Datagram Protocol instances should be constructed by protocol
+factories passed to the :meth:`loop.subprocess_exec` and
+:meth:`loop.subprocess_shell` methods.
+.. method:: SubprocessProtocol.pipe_data_received(fd, data)
-:meth:`pause_writing` and :meth:`resume_writing` calls are paired --
-:meth:`pause_writing` is called once when the buffer goes strictly over
-the high-water mark (even if subsequent writes increases the buffer size
-even more), and eventually :meth:`resume_writing` is called once when the
-buffer size reaches the low-water mark.
+ Called when the child process writes data into its stdout or stderr
+ pipe.
-.. note::
- If the buffer size equals the high-water mark,
- :meth:`pause_writing` is not called -- it must go strictly over.
- Conversely, :meth:`resume_writing` is called when the buffer size is
- equal or lower than the low-water mark. These end conditions
- are important to ensure that things go as expected when either
- mark is zero.
+ *fd* is the integer file descriptor of the pipe.
-.. note::
- On BSD systems (OS X, FreeBSD, etc.) flow control is not supported
- for :class:`DatagramProtocol`, because send failures caused by
- writing too many packets cannot be detected easily. The socket
- always appears 'ready' and excess packets are dropped; an
- :class:`OSError` with errno set to :const:`errno.ENOBUFS` may or
- may not be raised; if it is raised, it will be reported to
- :meth:`DatagramProtocol.error_received` but otherwise ignored.
+ *data* is a non-empty bytes object containing the received data.
+
+.. method:: SubprocessProtocol.pipe_connection_lost(fd, exc)
+
+ Called when one of the pipes communicating with the child process
+ is closed.
+
+ *fd* is the integer file descriptor that was closed.
+
+.. method:: SubprocessProtocol.process_exited()
+
+ Called when the child process has exited.
-Coroutines and protocols
-------------------------
+Examples
+========
-Coroutines can be scheduled in a protocol method using :func:`ensure_future`,
-but there is no guarantee made about the execution order. Protocols are not
-aware of coroutines created in protocol methods and so will not wait for them.
+.. _asyncio-tcp-echo-server-protocol:
+
+TCP Echo Server
+---------------
+
+TCP echo server using the :meth:`loop.create_server` method, send back
+received data and close the connection::
+
+ import asyncio
+
+
+ class EchoServerClientProtocol(asyncio.Protocol):
+ def connection_made(self, transport):
+ peername = transport.get_extra_info('peername')
+ print('Connection from {}'.format(peername))
+ self.transport = transport
+
+ def data_received(self, data):
+ message = data.decode()
+ print('Data received: {!r}'.format(message))
+
+ print('Send: {!r}'.format(message))
+ self.transport.write(data)
+
+ print('Close the client socket')
+ self.transport.close()
+
+
+ async def main():
+ # Get a reference to the event loop as we plan to use
+ # low-level APIs.
+ loop = asyncio.get_running_loop()
+
+ server = await loop.create_server(
+ lambda: EchoServerClientProtocol(),
+ '127.0.0.1', 8888)
-To have a reliable execution order,
-use :ref:`stream objects <asyncio-streams>` in a
-coroutine with ``await``. For example, the :meth:`StreamWriter.drain`
-coroutine can be used to wait until the write buffer is flushed.
+ async with server:
+ await server.serve_forever()
-Protocol examples
-=================
+ asyncio.run(main())
+
+
+.. seealso::
+
+ The :ref:`TCP echo server using streams <asyncio-tcp-echo-server-streams>`
+ example uses the high-level :func:`asyncio.start_server` function.
.. _asyncio-tcp-echo-client-protocol:
-TCP echo client protocol
-------------------------
+TCP Echo Client
+---------------
-TCP echo client using the :meth:`AbstractEventLoop.create_connection` method, send
+TCP echo client using the :meth:`loop.create_connection` method, send
data and wait until the connection is closed::
import asyncio
+
class EchoClientProtocol(asyncio.Protocol):
- def __init__(self, message, loop):
+ def __init__(self, message, on_con_lost, loop):
self.message = message
self.loop = loop
+ self.on_con_lost = on_con_lost
def connection_made(self, transport):
transport.write(self.message.encode())
@@ -610,99 +776,99 @@ data and wait until the connection is closed::
def connection_lost(self, exc):
print('The server closed the connection')
- print('Stop the event loop')
- self.loop.stop()
-
- loop = asyncio.get_event_loop()
- message = 'Hello World!'
- coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
- '127.0.0.1', 8888)
- loop.run_until_complete(coro)
- loop.run_forever()
- loop.close()
-
-The event loop is running twice. The
-:meth:`~AbstractEventLoop.run_until_complete` method is preferred in this short
-example to raise an exception if the server is not listening, instead of
-having to write a short coroutine to handle the exception and stop the
-running loop. At :meth:`~AbstractEventLoop.run_until_complete` exit, the loop is
-no longer running, so there is no need to stop the loop in case of an error.
+ self.on_con_lost.set_result(True)
+
+
+ async def main():
+ # Get a reference to the event loop as we plan to use
+ # low-level APIs.
+ loop = asyncio.get_running_loop()
+
+ on_con_lost = loop.create_future()
+ message = 'Hello World!'
+
+ transport, protocol = await loop.create_connection(
+ lambda: EchoClientProtocol(message, on_con_lost, loop),
+ '127.0.0.1', 8888)
+
+ # Wait until the protocol signals that the connection
+ # is lost and close the transport.
+ try:
+ await on_con_lost
+ finally:
+ transport.close()
+
+
+ asyncio.run(main())
+
.. seealso::
The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>`
- example uses the :func:`asyncio.open_connection` function.
+ example uses the high-level :func:`asyncio.open_connection` function.
-.. _asyncio-tcp-echo-server-protocol:
+.. _asyncio-udp-echo-server-protocol:
-TCP echo server protocol
-------------------------
+UDP Echo Server
+---------------
-TCP echo server using the :meth:`AbstractEventLoop.create_server` method, send back
-received data and close the connection::
+UDP echo server using the :meth:`loop.create_datagram_endpoint`
+method, send back received data::
import asyncio
- class EchoServerClientProtocol(asyncio.Protocol):
+
+ class EchoServerProtocol:
def connection_made(self, transport):
- peername = transport.get_extra_info('peername')
- print('Connection from {}'.format(peername))
self.transport = transport
- def data_received(self, data):
+ def datagram_received(self, data, addr):
message = data.decode()
- print('Data received: {!r}'.format(message))
-
- print('Send: {!r}'.format(message))
- self.transport.write(data)
+ print('Received %r from %s' % (message, addr))
+ print('Send %r to %s' % (message, addr))
+ self.transport.sendto(data, addr)
- print('Close the client socket')
- self.transport.close()
- loop = asyncio.get_event_loop()
- # Each client connection will create a new protocol instance
- coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
- server = loop.run_until_complete(coro)
+ async def main():
+ print("Starting UDP server")
- # Serve requests until Ctrl+C is pressed
- print('Serving on {}'.format(server.sockets[0].getsockname()))
- try:
- loop.run_forever()
- except KeyboardInterrupt:
- pass
+ # Get a reference to the event loop as we plan to use
+ # low-level APIs.
+ loop = asyncio.get_running_loop()
- # Close the server
- server.close()
- loop.run_until_complete(server.wait_closed())
- loop.close()
+ # One protocol instance will be created to serve all
+ # client requests.
+ transport, protocol = await loop.create_datagram_endpoint(
+ lambda: EchoServerProtocol(),
+ local_addr=('127.0.0.1', 9999))
-:meth:`Transport.close` can be called immediately after
-:meth:`WriteTransport.write` even if data are not sent yet on the socket: both
-methods are asynchronous. ``await`` is not needed because these transport
-methods are not coroutines.
+ try:
+ await asyncio.sleep(3600) # Serve for 1 hour.
+ finally:
+ transport.close()
-.. seealso::
- The :ref:`TCP echo server using streams <asyncio-tcp-echo-server-streams>`
- example uses the :func:`asyncio.start_server` function.
+ asyncio.run(main())
.. _asyncio-udp-echo-client-protocol:
-UDP echo client protocol
-------------------------
+UDP Echo Client
+---------------
-UDP echo client using the :meth:`AbstractEventLoop.create_datagram_endpoint`
+UDP echo client using the :meth:`loop.create_datagram_endpoint`
method, send data and close the transport when we received the answer::
import asyncio
+
class EchoClientProtocol:
def __init__(self, message, loop):
self.message = message
self.loop = loop
self.transport = None
+ self.on_con_lost = loop.create_future()
def connection_made(self, transport):
self.transport = transport
@@ -719,75 +885,46 @@ method, send data and close the transport when we received the answer::
print('Error received:', exc)
def connection_lost(self, exc):
- print("Socket closed, stop the event loop")
- loop = asyncio.get_event_loop()
- loop.stop()
+ print("Connection closed")
+ self.on_con_lost.set_result(True)
- loop = asyncio.get_event_loop()
- message = "Hello World!"
- connect = loop.create_datagram_endpoint(
- lambda: EchoClientProtocol(message, loop),
- remote_addr=('127.0.0.1', 9999))
- transport, protocol = loop.run_until_complete(connect)
- loop.run_forever()
- transport.close()
- loop.close()
+ async def main():
+ # Get a reference to the event loop as we plan to use
+ # low-level APIs.
+ loop = asyncio.get_running_loop()
-.. _asyncio-udp-echo-server-protocol:
-
-UDP echo server protocol
-------------------------
+ message = "Hello World!"
+ transport, protocol = await loop.create_datagram_endpoint(
+ lambda: EchoClientProtocol(message, loop),
+ remote_addr=('127.0.0.1', 9999))
-UDP echo server using the :meth:`AbstractEventLoop.create_datagram_endpoint`
-method, send back received data::
+ try:
+ await protocol.on_con_lost
+ finally:
+ transport.close()
- import asyncio
- class EchoServerProtocol:
- def connection_made(self, transport):
- self.transport = transport
-
- def datagram_received(self, data, addr):
- message = data.decode()
- print('Received %r from %s' % (message, addr))
- print('Send %r to %s' % (message, addr))
- self.transport.sendto(data, addr)
-
- loop = asyncio.get_event_loop()
- print("Starting UDP server")
- # One protocol instance will be created to serve all client requests
- listen = loop.create_datagram_endpoint(
- EchoServerProtocol, local_addr=('127.0.0.1', 9999))
- transport, protocol = loop.run_until_complete(listen)
-
- try:
- loop.run_forever()
- except KeyboardInterrupt:
- pass
-
- transport.close()
- loop.close()
+ asyncio.run(main())
.. _asyncio-register-socket:
-Register an open socket to wait for data using a protocol
----------------------------------------------------------
+Connecting Existing Sockets
+---------------------------
Wait until a socket receives data using the
-:meth:`AbstractEventLoop.create_connection` method with a protocol, and then close
-the event loop ::
+:meth:`loop.create_connection` method with a protocol::
import asyncio
- from socket import socketpair
+ import socket
- # Create a pair of connected sockets
- rsock, wsock = socketpair()
- loop = asyncio.get_event_loop()
class MyProtocol(asyncio.Protocol):
- transport = None
+
+ def __init__(self, loop):
+ self.transport = None
+ self.on_con_lost = loop.create_future()
def connection_made(self, transport):
self.transport = transport
@@ -795,35 +932,102 @@ the event loop ::
def data_received(self, data):
print("Received:", data.decode())
- # We are done: close the transport (it will call connection_lost())
+ # We are done: close the transport;
+ # connection_lost() will be called automatically.
self.transport.close()
def connection_lost(self, exc):
- # The socket has been closed, stop the event loop
- loop.stop()
+ # The socket has been closed
+ self.on_con_lost.set_result(True)
+
+
+ async def main():
+ # Get a reference to the event loop as we plan to use
+ # low-level APIs.
+ loop = asyncio.get_running_loop()
- # Register the socket to wait for data
- connect_coro = loop.create_connection(MyProtocol, sock=rsock)
- transport, protocol = loop.run_until_complete(connect_coro)
+ # Create a pair of connected sockets
+ rsock, wsock = socket.socketpair()
- # Simulate the reception of data from the network
- loop.call_soon(wsock.send, 'abc'.encode())
+ # Register the socket to wait for data.
+ transport, protocol = await loop.create_connection(
+ lambda: MyProtocol(loop), sock=rsock)
- # Run the event loop
- loop.run_forever()
+ # Simulate the reception of data from the network.
+ loop.call_soon(wsock.send, 'abc'.encode())
- # We are done, close sockets and the event loop
- rsock.close()
- wsock.close()
- loop.close()
+ try:
+ await protocol.on_con_lost
+ finally:
+ transport.close()
+ wsock.close()
+
+ asyncio.run(main())
.. seealso::
The :ref:`watch a file descriptor for read events
<asyncio-watch-read-event>` example uses the low-level
- :meth:`AbstractEventLoop.add_reader` method to register the file descriptor of a
- socket.
+ :meth:`loop.add_reader` method to register an FD.
The :ref:`register an open socket to wait for data using streams
<asyncio-register-socket-streams>` example uses high-level streams
created by the :func:`open_connection` function in a coroutine.
+
+.. _asyncio-subprocess-proto-example:
+
+loop.subprocess_exec() and SubprocessProtocol
+---------------------------------------------
+
+An example of a subprocess protocol using to get the output of a
+subprocess and to wait for the subprocess exit.
+
+The subprocess is created by th :meth:`loop.subprocess_exec` method::
+
+ import asyncio
+ import sys
+
+ class DateProtocol(asyncio.SubprocessProtocol):
+ def __init__(self, exit_future):
+ self.exit_future = exit_future
+ self.output = bytearray()
+
+ def pipe_data_received(self, fd, data):
+ self.output.extend(data)
+
+ def process_exited(self):
+ self.exit_future.set_result(True)
+
+ async def get_date():
+ # Get a reference to the event loop as we plan to use
+ # low-level APIs.
+ loop = asyncio.get_running_loop()
+
+ code = 'import datetime; print(datetime.datetime.now())'
+ exit_future = asyncio.Future(loop=loop)
+
+ # Create the subprocess controlled by DateProtocol;
+ # redirect the standard output into a pipe.
+ transport, protocol = await loop.subprocess_exec(
+ lambda: DateProtocol(exit_future),
+ sys.executable, '-c', code,
+ stdin=None, stderr=None)
+
+ # Wait for the subprocess exit using the process_exited()
+ # method of the protocol.
+ await exit_future
+
+ # Close the stdout pipe.
+ transport.close()
+
+ # Read the output which was collected by the
+ # pipe_data_received() method of the protocol.
+ data = bytes(protocol.output)
+ return data.decode('ascii').rstrip()
+
+ if sys.platform == "win32":
+ asyncio.set_event_loop_policy(
+ asyncio.WindowsProactorEventLoopPolicy())
+
+ date = asyncio.run(get_date())
+ print(f"Current date: {date}")