diff options
author | Yury Selivanov <yury@edgedb.com> | 2019-09-30 04:59:55 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-09-30 04:59:55 (GMT) |
commit | 6758e6e12a71ef5530146161881f88df1fa43382 (patch) | |
tree | da1f89f35e54ddcfffc3706b87bb13f54907f7ea | |
parent | 3667e1ee6c90e6d3b6a745cd590ece87118f81ad (diff) | |
download | cpython-6758e6e12a71ef5530146161881f88df1fa43382.zip cpython-6758e6e12a71ef5530146161881f88df1fa43382.tar.gz cpython-6758e6e12a71ef5530146161881f88df1fa43382.tar.bz2 |
bpo-38242: Revert "bpo-36889: Merge asyncio streams (GH-13251)" (#16482)
See https://bugs.python.org/issue38242 for more details
-rw-r--r-- | Doc/library/asyncio-api-index.rst | 36 | ||||
-rw-r--r-- | Doc/library/asyncio-eventloop.rst | 3 | ||||
-rw-r--r-- | Doc/library/asyncio-protocol.rst | 4 | ||||
-rw-r--r-- | Doc/library/asyncio-stream.rst | 563 | ||||
-rw-r--r-- | Lib/asyncio/__init__.py | 38 | ||||
-rw-r--r-- | Lib/asyncio/streams.py | 1252 | ||||
-rw-r--r-- | Lib/asyncio/subprocess.py | 64 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_buffered_proto.py | 7 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_pep492.py | 8 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_streams.py | 1132 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_subprocess.py | 12 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_windows_events.py | 12 | ||||
-rw-r--r-- | Misc/NEWS.d/next/Library/2019-09-30-00-15-27.bpo-38242.uPIyAc.rst | 1 |
13 files changed, 379 insertions, 2753 deletions
diff --git a/Doc/library/asyncio-api-index.rst b/Doc/library/asyncio-api-index.rst index 716cf09..d5b5659 100644 --- a/Doc/library/asyncio-api-index.rst +++ b/Doc/library/asyncio-api-index.rst @@ -132,47 +132,23 @@ High-level APIs to work with network IO. :widths: 50 50 :class: full-width-table - * - ``await`` :func:`connect` - - Establish a TCP connection to send and receive data. - * - ``await`` :func:`open_connection` - - Establish a TCP connection. (Deprecated in favor of :func:`connect`) - - * - ``await`` :func:`connect_unix` - - Establish a Unix socket connection to send and receive data. + - Establish a TCP connection. * - ``await`` :func:`open_unix_connection` - - Establish a Unix socket connection. (Deprecated in favor of :func:`connect_unix`) - - * - :class:`StreamServer` - - Start a TCP server. + - Establish a Unix socket connection. * - ``await`` :func:`start_server` - - Start a TCP server. (Deprecated in favor of :class:`StreamServer`) - - * - :class:`UnixStreamServer` - - Start a Unix socket server. + - Start a TCP server. * - ``await`` :func:`start_unix_server` - - Start a Unix socket server. (Deprecated in favor of :class:`UnixStreamServer`) - - * - :func:`connect_read_pipe` - - Establish a connection to :term:`file-like object <file object>` *pipe* - to receive data. - - * - :func:`connect_write_pipe` - - Establish a connection to :term:`file-like object <file object>` *pipe* - to send data. - - * - :class:`Stream` - - Stream is a single object combining APIs of :class:`StreamReader` and - :class:`StreamWriter`. + - Start a Unix socket server. * - :class:`StreamReader` - - High-level async/await object to receive network data. (Deprecated in favor of :class:`Stream`) + - High-level async/await object to receive network data. * - :class:`StreamWriter` - - High-level async/await object to send network data. (Deprecated in favor of :class:`Stream`) + - High-level async/await object to send network data. .. rubric:: Examples diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 2fd4cf3..045787e 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -1637,7 +1637,8 @@ Wait until a file descriptor received some data using the :meth:`loop.create_connection` method. * Another similar :ref:`example <asyncio_example_create_connection-streams>` - using the high-level :func:`asyncio.connect` function and streams. + using the high-level :func:`asyncio.open_connection` function + and streams. .. _asyncio_example_unix_signals: diff --git a/Doc/library/asyncio-protocol.rst b/Doc/library/asyncio-protocol.rst index cb0317e..67ca121 100644 --- a/Doc/library/asyncio-protocol.rst +++ b/Doc/library/asyncio-protocol.rst @@ -809,7 +809,7 @@ data, and waits until the connection is closed:: .. seealso:: The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>` - example uses the high-level :func:`asyncio.connect` function. + example uses the high-level :func:`asyncio.open_connection` function. .. _asyncio-udp-echo-server-protocol: @@ -978,7 +978,7 @@ Wait until a socket receives data using the The :ref:`register an open socket to wait for data using streams <asyncio_example_create_connection-streams>` example uses high-level streams - created by the :func:`asyncio.connect` function in a coroutine. + created by the :func:`open_connection` function in a coroutine. .. _asyncio_example_subprocess_proto: diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index feebd22..471e6e9 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -18,12 +18,19 @@ streams:: import asyncio async def tcp_echo_client(message): - async with asyncio.connect('127.0.0.1', 8888) as stream: - print(f'Send: {message!r}') - await stream.write(message.encode()) + reader, writer = await asyncio.open_connection( + '127.0.0.1', 8888) - data = await stream.read(100) - print(f'Received: {data.decode()!r}') + print(f'Send: {message!r}') + writer.write(message.encode()) + await writer.drain() + + data = await reader.read(100) + print(f'Received: {data.decode()!r}') + + print('Close the connection') + writer.close() + await writer.wait_closed() asyncio.run(tcp_echo_client('Hello World!')) @@ -37,31 +44,6 @@ The following top-level asyncio functions can be used to create and work with streams: -.. coroutinefunction:: connect(host=None, port=None, \*, \ - limit=2**16, ssl=None, family=0, \ - proto=0, flags=0, sock=None, local_addr=None, \ - server_hostname=None, ssl_handshake_timeout=None, \ - happy_eyeballs_delay=None, interleave=None) - - Connect to TCP socket on *host* : *port* address and return a :class:`Stream` - object of mode :attr:`StreamMode.READWRITE`. - - *limit* determines the buffer size limit used by the returned :class:`Stream` - instance. By default the *limit* is set to 64 KiB. - - The rest of the arguments are passed directly to :meth:`loop.create_connection`. - - The function can be used with ``await`` to get a connected stream:: - - stream = await asyncio.connect('127.0.0.1', 8888) - - The function can also be used as an async context manager:: - - async with asyncio.connect('127.0.0.1', 8888) as stream: - ... - - .. versionadded:: 3.8 - .. coroutinefunction:: open_connection(host=None, port=None, \*, \ loop=None, limit=None, ssl=None, family=0, \ proto=0, flags=0, sock=None, local_addr=None, \ @@ -87,12 +69,8 @@ and work with streams: The *ssl_handshake_timeout* parameter. - .. deprecated-removed:: 3.8 3.10 - - `open_connection()` is deprecated in favor of :func:`connect`. - .. coroutinefunction:: start_server(client_connected_cb, host=None, \ - port=None, \*, loop=None, limit=2**16, \ + port=None, \*, loop=None, limit=None, \ family=socket.AF_UNSPEC, \ flags=socket.AI_PASSIVE, sock=None, \ backlog=100, ssl=None, reuse_address=None, \ @@ -124,60 +102,9 @@ and work with streams: The *ssl_handshake_timeout* and *start_serving* parameters. - .. deprecated-removed:: 3.8 3.10 - - `start_server()` is deprecated if favor of :class:`StreamServer` - -.. coroutinefunction:: connect_read_pipe(pipe, *, limit=2**16) - - Takes a :term:`file-like object <file object>` *pipe* to return a - :class:`Stream` object of the mode :attr:`StreamMode.READ` that has - similar API of :class:`StreamReader`. It can also be used as an async context manager. - - *limit* determines the buffer size limit used by the returned :class:`Stream` - instance. By default the limit is set to 64 KiB. - - .. versionadded:: 3.8 - -.. coroutinefunction:: connect_write_pipe(pipe, *, limit=2**16) - - Takes a :term:`file-like object <file object>` *pipe* to return a - :class:`Stream` object of the mode :attr:`StreamMode.WRITE` that has - similar API of :class:`StreamWriter`. It can also be used as an async context manager. - - *limit* determines the buffer size limit used by the returned :class:`Stream` - instance. By default the limit is set to 64 KiB. - - .. versionadded:: 3.8 .. rubric:: Unix Sockets -.. function:: connect_unix(path=None, *, limit=2**16, ssl=None, \ - sock=None, server_hostname=None, \ - ssl_handshake_timeout=None) - - Establish a Unix socket connection to socket with *path* address and - return an awaitable :class:`Stream` object of the mode :attr:`StreamMode.READWRITE` - that can be used as a reader and a writer. - - *limit* determines the buffer size limit used by the returned :class:`Stream` - instance. By default the *limit* is set to 64 KiB. - - The rest of the arguments are passed directly to :meth:`loop.create_unix_connection`. - - The function can be used with ``await`` to get a connected stream:: - - stream = await asyncio.connect_unix('/tmp/example.sock') - - The function can also be used as an async context manager:: - - async with asyncio.connect_unix('/tmp/example.sock') as stream: - ... - - .. availability:: Unix. - - .. versionadded:: 3.8 - .. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \ limit=None, ssl=None, sock=None, \ server_hostname=None, ssl_handshake_timeout=None) @@ -199,10 +126,6 @@ and work with streams: The *path* parameter can now be a :term:`path-like object` - .. deprecated-removed:: 3.8 3.10 - - ``open_unix_connection()`` is deprecated if favor of :func:`connect_unix`. - .. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \ \*, loop=None, limit=None, sock=None, \ @@ -225,349 +148,6 @@ and work with streams: The *path* parameter can now be a :term:`path-like object`. - .. deprecated-removed:: 3.8 3.10 - - ``start_unix_server()`` is deprecated in favor of :class:`UnixStreamServer`. - - ---------- - -StreamServer -============ - -.. class:: StreamServer(client_connected_cb, /, host=None, port=None, *, \ - limit=2**16, family=socket.AF_UNSPEC, \ - flags=socket.AI_PASSIVE, sock=None, backlog=100, \ - ssl=None, reuse_address=None, reuse_port=None, \ - ssl_handshake_timeout=None, shutdown_timeout=60) - - The *client_connected_cb* callback is called whenever a new client - connection is established. It receives a :class:`Stream` object of the - mode :attr:`StreamMode.READWRITE`. - - *client_connected_cb* can be a plain callable or a - :ref:`coroutine function <coroutine>`; if it is a coroutine function, - it will be automatically scheduled as a :class:`Task`. - - *limit* determines the buffer size limit used by the - returned :class:`Stream` instance. By default the *limit* - is set to 64 KiB. - - The rest of the arguments are passed directly to - :meth:`loop.create_server`. - - .. coroutinemethod:: start_serving() - - Binds to the given host and port to start the server. - - .. coroutinemethod:: serve_forever() - - Start accepting connections until the coroutine is cancelled. - Cancellation of ``serve_forever`` task causes the server - to be closed. - - This method can be called if the server is already accepting - connections. Only one ``serve_forever`` task can exist per - one *Server* object. - - .. method:: is_serving() - - Returns ``True`` if the server is bound and currently serving. - - .. method:: bind() - - Bind the server to the given *host* and *port*. This method is - automatically called during ``__aenter__`` when :class:`StreamServer` is - used as an async context manager. - - .. method:: is_bound() - - Return ``True`` if the server is bound. - - .. coroutinemethod:: abort() - - Closes the connection and cancels all pending tasks. - - .. coroutinemethod:: close() - - Closes the connection. This method is automatically called during - ``__aexit__`` when :class:`StreamServer` is used as an async context - manager. - - .. attribute:: sockets - - Returns a tuple of socket objects the server is bound to. - - .. versionadded:: 3.8 - - -UnixStreamServer -================ - -.. class:: UnixStreamServer(client_connected_cb, /, path=None, *, \ - limit=2**16, sock=None, backlog=100, \ - ssl=None, ssl_handshake_timeout=None, shutdown_timeout=60) - - The *client_connected_cb* callback is called whenever a new client - connection is established. It receives a :class:`Stream` object of the - mode :attr:`StreamMode.READWRITE`. - - *client_connected_cb* can be a plain callable or a - :ref:`coroutine function <coroutine>`; if it is a coroutine function, - it will be automatically scheduled as a :class:`Task`. - - *limit* determines the buffer size limit used by the - returned :class:`Stream` instance. By default the *limit* - is set to 64 KiB. - - The rest of the arguments are passed directly to - :meth:`loop.create_unix_server`. - - .. coroutinemethod:: start_serving() - - Binds to the given host and port to start the server. - - .. method:: is_serving() - - Returns ``True`` if the server is bound and currently serving. - - .. method:: bind() - - Bind the server to the given *host* and *port*. This method is - automatically called during ``__aenter__`` when :class:`UnixStreamServer` is - used as an async context manager. - - .. method:: is_bound() - - Return ``True`` if the server is bound. - - .. coroutinemethod:: abort() - - Closes the connection and cancels all pending tasks. - - .. coroutinemethod:: close() - - Closes the connection. This method is automatically called during - ``__aexit__`` when :class:`UnixStreamServer` is used as an async context - manager. - - .. attribute:: sockets - - Returns a tuple of socket objects the server is bound to. - - .. availability:: Unix. - - .. versionadded:: 3.8 - -Stream -====== - -.. class:: Stream - - Represents a Stream object that provides APIs to read and write data - to the IO stream . It includes the API provided by :class:`StreamReader` - and :class:`StreamWriter`. It can also be used as :term:`asynchronous iterator` - where :meth:`readline` is used. It raises :exc:`StopAsyncIteration` when - :meth:`readline` returns empty data. - - Do not instantiate *Stream* objects directly; use API like :func:`connect` - and :class:`StreamServer` instead. - - .. versionadded:: 3.8 - - .. attribute:: mode - - Returns the mode of the stream which is a :class:`StreamMode` value. It could - be one of the below: - - * :attr:`StreamMode.READ` - Connection can receive data. - * :attr:`StreamMode.WRITE` - Connection can send data. - * :attr:`StreamMode.READWRITE` - Connection can send and receive data. - - .. coroutinemethod:: abort() - - Aborts the connection immediately, without waiting for the send buffer to drain. - - .. method:: at_eof() - - Return ``True`` if the buffer is empty. - - .. method:: can_write_eof() - - Return *True* if the underlying transport supports - the :meth:`write_eof` method, *False* otherwise. - - .. method:: close() - - The method closes the stream and the underlying socket. - - It is possible to directly await on the `close()` method:: - - await stream.close() - - The ``await`` pauses the current coroutine until the stream and the underlying - socket are closed (and SSL shutdown is performed for a secure connection). - - .. coroutinemethod:: drain() - - Wait until it is appropriate to resume writing to the stream. - Example:: - - stream.write(data) - await stream.drain() - - This is a flow control method that interacts with the underlying - IO write buffer. When the size of the buffer reaches - the high watermark, *drain()* blocks until the size of the - buffer is drained down to the low watermark and writing can - be resumed. When there is nothing to wait for, the :meth:`drain` - returns immediately. - - .. deprecated:: 3.8 - - It is recommended to directly await on the `write()` method instead:: - - await stream.write(data) - - .. method:: get_extra_info(name, default=None) - - Access optional transport information; see - :meth:`BaseTransport.get_extra_info` for details. - - .. method:: is_closing() - - Return ``True`` if the stream is closed or in the process of - being closed. - - .. coroutinemethod:: read(n=-1) - - Read up to *n* bytes. If *n* is not provided, or set to ``-1``, - read until EOF and return all read bytes. - - If EOF was received and the internal buffer is empty, - return an empty ``bytes`` object. - - .. coroutinemethod:: readexactly(n) - - Read exactly *n* bytes. - - Raise an :exc:`IncompleteReadError` if EOF is reached before *n* - can be read. Use the :attr:`IncompleteReadError.partial` - attribute to get the partially read data. - - .. coroutinemethod:: readline() - - Read one line, where "line" is a sequence of bytes - ending with ``\n``. - - If EOF is received and ``\n`` was not found, the method - returns partially read data. - - If EOF is received and the internal buffer is empty, - return an empty ``bytes`` object. - - .. coroutinemethod:: readuntil(separator=b'\\n') - - Read data from the stream until *separator* is found. - - On success, the data and separator will be removed from the - internal buffer (consumed). Returned data will include the - separator at the end. - - If the amount of data read exceeds the configured stream limit, a - :exc:`LimitOverrunError` exception is raised, and the data - is left in the internal buffer and can be read again. - - If EOF is reached before the complete separator is found, - an :exc:`IncompleteReadError` exception is raised, and the internal - buffer is reset. The :attr:`IncompleteReadError.partial` attribute - may contain a portion of the separator. - - .. coroutinemethod:: sendfile(file, offset=0, count=None, *, fallback=True) - - Sends a *file* over the stream using an optimized syscall if available. - - For other parameters meaning please see :meth:`AbstractEventloop.sendfile`. - - .. coroutinemethod:: start_tls(sslcontext, *, server_hostname=None, \ - ssl_handshake_timeout=None) - - Upgrades the existing transport-based connection to TLS. - - For other parameters meaning please see :meth:`AbstractEventloop.start_tls`. - - .. coroutinemethod:: wait_closed() - - Wait until the stream is closed. - - Should be called after :meth:`close` to wait until the underlying - connection is closed. - - .. coroutinemethod:: write(data) - - Write *data* to the underlying socket; wait until the data is sent, e.g.:: - - await stream.write(data) - - .. method:: write(data) - - The method attempts to write the *data* to the underlying socket immediately. - If that fails, the data is queued in an internal write buffer until it can be - sent. :meth:`drain` can be used to flush the underlying buffer once writing is - available:: - - stream.write(data) - await stream.drain() - - .. deprecated:: 3.8 - - It is recommended to directly await on the `write()` method instead:: - - await stream.write(data) - - .. method:: writelines(data) - - The method writes a list (or any iterable) of bytes to the underlying socket - immediately. - If that fails, the data is queued in an internal write buffer until it can be - sent. - - It is possible to directly await on the `writelines()` method:: - - await stream.writelines(lines) - - The ``await`` pauses the current coroutine until the data is written to the - socket. - - .. method:: write_eof() - - Close the write end of the stream after the buffered write - data is flushed. - - -StreamMode -========== - -.. class:: StreamMode - - A subclass of :class:`enum.Flag` that defines a set of values that can be - used to determine the ``mode`` of :class:`Stream` objects. - - .. data:: READ - - The stream object is readable and provides the API of :class:`StreamReader`. - - .. data:: WRITE - - The stream object is writeable and provides the API of :class:`StreamWriter`. - - .. data:: READWRITE - - The stream object is readable and writeable and provides the API of both - :class:`StreamReader` and :class:`StreamWriter`. - - .. versionadded:: 3.8 - StreamReader ============ @@ -629,7 +209,8 @@ StreamReader .. method:: at_eof() - Return ``True`` if the buffer is empty. + Return ``True`` if the buffer is empty and :meth:`feed_eof` + was called. StreamWriter @@ -650,22 +231,11 @@ StreamWriter If that fails, the data is queued in an internal write buffer until it can be sent. - Starting with Python 3.8, it is possible to directly await on the `write()` - method:: - - await stream.write(data) - - The ``await`` pauses the current coroutine until the data is written to the - socket. - - Below is an equivalent code that works with Python <= 3.7:: + The method should be used along with the ``drain()`` method:: stream.write(data) await stream.drain() - .. versionchanged:: 3.8 - Support ``await stream.write(...)`` syntax. - .. method:: writelines(data) The method writes a list (or any iterable) of bytes to the underlying socket @@ -673,42 +243,20 @@ StreamWriter If that fails, the data is queued in an internal write buffer until it can be sent. - Starting with Python 3.8, it is possible to directly await on the `writelines()` - method:: - - await stream.writelines(lines) - - The ``await`` pauses the current coroutine until the data is written to the - socket. - - Below is an equivalent code that works with Python <= 3.7:: + The method should be used along with the ``drain()`` method:: stream.writelines(lines) await stream.drain() - .. versionchanged:: 3.8 - Support ``await stream.writelines()`` syntax. - .. method:: close() The method closes the stream and the underlying socket. - Starting with Python 3.8, it is possible to directly await on the `close()` - method:: - - await stream.close() - - The ``await`` pauses the current coroutine until the stream and the underlying - socket are closed (and SSL shutdown is performed for a secure connection). - - Below is an equivalent code that works with Python <= 3.7:: + The method should be used along with the ``wait_closed()`` method:: stream.close() await stream.wait_closed() - .. versionchanged:: 3.8 - Support ``await stream.close()`` syntax. - .. method:: can_write_eof() Return *True* if the underlying transport supports @@ -768,17 +316,22 @@ Examples TCP echo client using streams ----------------------------- -TCP echo client using the :func:`asyncio.connect` function:: +TCP echo client using the :func:`asyncio.open_connection` function:: import asyncio async def tcp_echo_client(message): - async with asyncio.connect('127.0.0.1', 8888) as stream: - print(f'Send: {message!r}') - await stream.write(message.encode()) + reader, writer = await asyncio.open_connection( + '127.0.0.1', 8888) + + print(f'Send: {message!r}') + writer.write(message.encode()) + + data = await reader.read(100) + print(f'Received: {data.decode()!r}') - data = await stream.read(100) - print(f'Received: {data.decode()!r}') + print('Close the connection') + writer.close() asyncio.run(tcp_echo_client('Hello World!')) @@ -794,28 +347,32 @@ TCP echo client using the :func:`asyncio.connect` function:: TCP echo server using streams ----------------------------- -TCP echo server using the :class:`asyncio.StreamServer` class:: +TCP echo server using the :func:`asyncio.start_server` function:: import asyncio - async def handle_echo(stream): - data = await stream.read(100) + async def handle_echo(reader, writer): + data = await reader.read(100) message = data.decode() - addr = stream.get_extra_info('peername') + addr = writer.get_extra_info('peername') print(f"Received {message!r} from {addr!r}") print(f"Send: {message!r}") - await stream.write(data) + writer.write(data) + await writer.drain() print("Close the connection") - await stream.close() + writer.close() async def main(): - async with asyncio.StreamServer( - handle_echo, '127.0.0.1', 8888) as server: - addr = server.sockets[0].getsockname() - print(f'Serving on {addr}') + server = await asyncio.start_server( + handle_echo, '127.0.0.1', 8888) + + addr = server.sockets[0].getsockname() + print(f'Serving on {addr}') + + async with server: await server.serve_forever() asyncio.run(main()) @@ -839,9 +396,11 @@ Simple example querying HTTP headers of the URL passed on the command line:: async def print_http_headers(url): url = urllib.parse.urlsplit(url) if url.scheme == 'https': - stream = await asyncio.connect(url.hostname, 443, ssl=True) + reader, writer = await asyncio.open_connection( + url.hostname, 443, ssl=True) else: - stream = await asyncio.connect(url.hostname, 80) + reader, writer = await asyncio.open_connection( + url.hostname, 80) query = ( f"HEAD {url.path or '/'} HTTP/1.0\r\n" @@ -849,14 +408,18 @@ Simple example querying HTTP headers of the URL passed on the command line:: f"\r\n" ) - stream.write(query.encode('latin-1')) - while (line := await stream.readline()): + writer.write(query.encode('latin-1')) + while True: + line = await reader.readline() + if not line: + break + line = line.decode('latin1').rstrip() if line: print(f'HTTP header> {line}') # Ignore the body, close the socket - await stream.close() + writer.close() url = sys.argv[1] asyncio.run(print_http_headers(url)) @@ -877,7 +440,7 @@ Register an open socket to wait for data using streams ------------------------------------------------------ Coroutine waiting until a socket receives data using the -:func:`asyncio.connect` function:: +:func:`open_connection` function:: import asyncio import socket @@ -891,15 +454,17 @@ Coroutine waiting until a socket receives data using the rsock, wsock = socket.socketpair() # Register the open socket to wait for data. - async with asyncio.connect(sock=rsock) as stream: - # Simulate the reception of data from the network - loop.call_soon(wsock.send, 'abc'.encode()) + reader, writer = await asyncio.open_connection(sock=rsock) + + # Simulate the reception of data from the network + loop.call_soon(wsock.send, 'abc'.encode()) - # Wait for data - data = await stream.read(100) + # Wait for data + data = await reader.read(100) - # Got data, we are done: close the socket - print("Received:", data.decode()) + # Got data, we are done: close the socket + print("Received:", data.decode()) + writer.close() # Close the second socket wsock.close() diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py index a6a29db..28c2e2c 100644 --- a/Lib/asyncio/__init__.py +++ b/Lib/asyncio/__init__.py @@ -3,7 +3,6 @@ # flake8: noqa import sys -import warnings # This relies on each of the submodules having an __all__ variable. from .base_events import * @@ -44,40 +43,3 @@ if sys.platform == 'win32': # pragma: no cover else: from .unix_events import * # pragma: no cover __all__ += unix_events.__all__ - - -__all__ += ('StreamReader', 'StreamWriter', 'StreamReaderProtocol') # deprecated - - -def __getattr__(name): - global StreamReader, StreamWriter, StreamReaderProtocol - if name == 'StreamReader': - warnings.warn("StreamReader is deprecated since Python 3.8 " - "in favor of Stream, and scheduled for removal " - "in Python 3.10", - DeprecationWarning, - stacklevel=2) - from .streams import StreamReader as sr - StreamReader = sr - return StreamReader - if name == 'StreamWriter': - warnings.warn("StreamWriter is deprecated since Python 3.8 " - "in favor of Stream, and scheduled for removal " - "in Python 3.10", - DeprecationWarning, - stacklevel=2) - from .streams import StreamWriter as sw - StreamWriter = sw - return StreamWriter - if name == 'StreamReaderProtocol': - warnings.warn("Using asyncio internal class StreamReaderProtocol " - "is deprecated since Python 3.8 " - " and scheduled for removal " - "in Python 3.10", - DeprecationWarning, - stacklevel=2) - from .streams import StreamReaderProtocol as srp - StreamReaderProtocol = srp - return StreamReaderProtocol - - raise AttributeError(f"module {__name__} has no attribute {name}") diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index b709dc1..795530e 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -1,19 +1,14 @@ __all__ = ( - 'Stream', 'StreamMode', - 'open_connection', 'start_server', - 'connect', 'connect_read_pipe', 'connect_write_pipe', - 'StreamServer') + 'StreamReader', 'StreamWriter', 'StreamReaderProtocol', + 'open_connection', 'start_server') -import enum import socket import sys import warnings import weakref if hasattr(socket, 'AF_UNIX'): - __all__ += ('open_unix_connection', 'start_unix_server', - 'connect_unix', - 'UnixStreamServer') + __all__ += ('open_unix_connection', 'start_unix_server') from . import coroutines from . import events @@ -21,155 +16,12 @@ from . import exceptions from . import format_helpers from . import protocols from .log import logger -from . import tasks +from .tasks import sleep _DEFAULT_LIMIT = 2 ** 16 # 64 KiB -class StreamMode(enum.Flag): - READ = enum.auto() - WRITE = enum.auto() - READWRITE = READ | WRITE - - -def _ensure_can_read(mode): - if not mode & StreamMode.READ: - raise RuntimeError("The stream is write-only") - - -def _ensure_can_write(mode): - if not mode & StreamMode.WRITE: - raise RuntimeError("The stream is read-only") - - -class _ContextManagerHelper: - __slots__ = ('_awaitable', '_result') - - def __init__(self, awaitable): - self._awaitable = awaitable - self._result = None - - def __await__(self): - return self._awaitable.__await__() - - async def __aenter__(self): - ret = await self._awaitable - result = await ret.__aenter__() - self._result = result - return result - - async def __aexit__(self, exc_type, exc_val, exc_tb): - return await self._result.__aexit__(exc_type, exc_val, exc_tb) - - -def connect(host=None, port=None, *, - limit=_DEFAULT_LIMIT, - ssl=None, family=0, proto=0, - flags=0, sock=None, local_addr=None, - server_hostname=None, - ssl_handshake_timeout=None, - happy_eyeballs_delay=None, interleave=None): - """Connect to TCP socket on *host* : *port* address to send and receive data. - - *limit* determines the buffer size limit used by the returned `Stream` - instance. By default the *limit* is set to 64 KiB. - - The rest of the arguments are passed directly to `loop.create_connection()`. - """ - # Design note: - # Don't use decorator approach but explicit non-async - # function to fail fast and explicitly - # if passed arguments don't match the function signature - return _ContextManagerHelper(_connect(host, port, limit, - ssl, family, proto, - flags, sock, local_addr, - server_hostname, - ssl_handshake_timeout, - happy_eyeballs_delay, - interleave)) - - -async def _connect(host, port, - limit, - ssl, family, proto, - flags, sock, local_addr, - server_hostname, - ssl_handshake_timeout, - happy_eyeballs_delay, interleave): - loop = events.get_running_loop() - stream = Stream(mode=StreamMode.READWRITE, - limit=limit, - loop=loop, - _asyncio_internal=True) - await loop.create_connection( - lambda: _StreamProtocol(stream, loop=loop, - _asyncio_internal=True), - host, port, - ssl=ssl, family=family, proto=proto, - flags=flags, sock=sock, local_addr=local_addr, - server_hostname=server_hostname, - ssl_handshake_timeout=ssl_handshake_timeout, - happy_eyeballs_delay=happy_eyeballs_delay, interleave=interleave) - return stream - - -def connect_read_pipe(pipe, *, limit=_DEFAULT_LIMIT): - """Establish a connection to a file-like object *pipe* to receive data. - - Takes a file-like object *pipe* to return a Stream object of the mode - StreamMode.READ that has similar API of StreamReader. It can also be used - as an async context manager. - """ - - # Design note: - # Don't use decorator approach but explicit non-async - # function to fail fast and explicitly - # if passed arguments don't match the function signature - return _ContextManagerHelper(_connect_read_pipe(pipe, limit)) - - -async def _connect_read_pipe(pipe, limit): - loop = events.get_running_loop() - stream = Stream(mode=StreamMode.READ, - limit=limit, - loop=loop, - _asyncio_internal=True) - await loop.connect_read_pipe( - lambda: _StreamProtocol(stream, loop=loop, - _asyncio_internal=True), - pipe) - return stream - - -def connect_write_pipe(pipe, *, limit=_DEFAULT_LIMIT): - """Establish a connection to a file-like object *pipe* to send data. - - Takes a file-like object *pipe* to return a Stream object of the mode - StreamMode.WRITE that has similar API of StreamWriter. It can also be used - as an async context manager. - """ - - # Design note: - # Don't use decorator approach but explicit non-async - # function to fail fast and explicitly - # if passed arguments don't match the function signature - return _ContextManagerHelper(_connect_write_pipe(pipe, limit)) - - -async def _connect_write_pipe(pipe, limit): - loop = events.get_running_loop() - stream = Stream(mode=StreamMode.WRITE, - limit=limit, - loop=loop, - _asyncio_internal=True) - await loop.connect_write_pipe( - lambda: _StreamProtocol(stream, loop=loop, - _asyncio_internal=True), - pipe) - return stream - - async def open_connection(host=None, port=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds): """A wrapper for create_connection() returning a (reader, writer) pair. @@ -189,11 +41,6 @@ async def open_connection(host=None, port=None, *, StreamReaderProtocol classes, just copy the code -- there's really nothing special here except some convenience.) """ - warnings.warn("open_connection() is deprecated since Python 3.8 " - "in favor of connect(), and scheduled for removal " - "in Python 3.10", - DeprecationWarning, - stacklevel=2) if loop is None: loop = events.get_event_loop() else: @@ -201,7 +48,7 @@ async def open_connection(host=None, port=None, *, "and scheduled for removal in Python 3.10.", DeprecationWarning, stacklevel=2) reader = StreamReader(limit=limit, loop=loop) - protocol = StreamReaderProtocol(reader, loop=loop, _asyncio_internal=True) + protocol = StreamReaderProtocol(reader, loop=loop) transport, _ = await loop.create_connection( lambda: protocol, host, port, **kwds) writer = StreamWriter(transport, protocol, reader, loop) @@ -231,11 +78,6 @@ async def start_server(client_connected_cb, host=None, port=None, *, The return value is the same as loop.create_server(), i.e. a Server object which can be used to stop the service. """ - warnings.warn("start_server() is deprecated since Python 3.8 " - "in favor of StreamServer(), and scheduled for removal " - "in Python 3.10", - DeprecationWarning, - stacklevel=2) if loop is None: loop = events.get_event_loop() else: @@ -246,201 +88,18 @@ async def start_server(client_connected_cb, host=None, port=None, *, def factory(): reader = StreamReader(limit=limit, loop=loop) protocol = StreamReaderProtocol(reader, client_connected_cb, - loop=loop, - _asyncio_internal=True) + loop=loop) return protocol return await loop.create_server(factory, host, port, **kwds) -class _BaseStreamServer: - # Design notes. - # StreamServer and UnixStreamServer are exposed as FINAL classes, - # not function factories. - # async with serve(host, port) as server: - # server.start_serving() - # looks ugly. - # The class doesn't provide API for enumerating connected streams - # It can be a subject for improvements in Python 3.9 - - _server_impl = None - - def __init__(self, client_connected_cb, - /, - limit=_DEFAULT_LIMIT, - shutdown_timeout=60, - _asyncio_internal=False): - if not _asyncio_internal: - raise RuntimeError("_ServerStream is a private asyncio class") - self._client_connected_cb = client_connected_cb - self._limit = limit - self._loop = events.get_running_loop() - self._streams = {} - self._shutdown_timeout = shutdown_timeout - - def __init_subclass__(cls): - if not cls.__module__.startswith('asyncio.'): - raise TypeError(f"asyncio.{cls.__name__} " - "class cannot be inherited from") - - async def bind(self): - if self._server_impl is not None: - return - self._server_impl = await self._bind() - - def is_bound(self): - return self._server_impl is not None - - @property - def sockets(self): - # multiple value for socket bound to both IPv4 and IPv6 families - if self._server_impl is None: - return () - return self._server_impl.sockets - - def is_serving(self): - if self._server_impl is None: - return False - return self._server_impl.is_serving() - - async def start_serving(self): - await self.bind() - await self._server_impl.start_serving() - - async def serve_forever(self): - await self.start_serving() - await self._server_impl.serve_forever() - - async def close(self): - if self._server_impl is None: - return - self._server_impl.close() - streams = list(self._streams.keys()) - active_tasks = list(self._streams.values()) - if streams: - await tasks.wait([stream.close() for stream in streams]) - await self._server_impl.wait_closed() - self._server_impl = None - await self._shutdown_active_tasks(active_tasks) - - async def abort(self): - if self._server_impl is None: - return - self._server_impl.close() - streams = list(self._streams.keys()) - active_tasks = list(self._streams.values()) - if streams: - await tasks.wait([stream.abort() for stream in streams]) - await self._server_impl.wait_closed() - self._server_impl = None - await self._shutdown_active_tasks(active_tasks) - - async def __aenter__(self): - await self.bind() - return self - - async def __aexit__(self, exc_type, exc_value, exc_tb): - await self.close() - - def _attach(self, stream, task): - self._streams[stream] = task - - def _detach(self, stream, task): - del self._streams[stream] - - async def _shutdown_active_tasks(self, active_tasks): - if not active_tasks: - return - # NOTE: tasks finished with exception are reported - # by the Task.__del__() method. - done, pending = await tasks.wait(active_tasks, - timeout=self._shutdown_timeout) - if not pending: - return - for task in pending: - task.cancel() - done, pending = await tasks.wait(pending, - timeout=self._shutdown_timeout) - for task in pending: - self._loop.call_exception_handler({ - "message": (f'{task!r} ignored cancellation request ' - f'from a closing {self!r}'), - "stream_server": self - }) - - def __repr__(self): - ret = [f'{self.__class__.__name__}'] - if self.is_serving(): - ret.append('serving') - if self.sockets: - ret.append(f'sockets={self.sockets!r}') - return '<' + ' '.join(ret) + '>' - - def __del__(self, _warn=warnings.warn): - if self._server_impl is not None: - _warn(f"unclosed stream server {self!r}", - ResourceWarning, source=self) - self._server_impl.close() - - -class StreamServer(_BaseStreamServer): - - def __init__(self, client_connected_cb, /, host=None, port=None, *, - limit=_DEFAULT_LIMIT, - family=socket.AF_UNSPEC, - flags=socket.AI_PASSIVE, sock=None, backlog=100, - ssl=None, reuse_address=None, reuse_port=None, - ssl_handshake_timeout=None, - shutdown_timeout=60): - super().__init__(client_connected_cb, - limit=limit, - shutdown_timeout=shutdown_timeout, - _asyncio_internal=True) - self._host = host - self._port = port - self._family = family - self._flags = flags - self._sock = sock - self._backlog = backlog - self._ssl = ssl - self._reuse_address = reuse_address - self._reuse_port = reuse_port - self._ssl_handshake_timeout = ssl_handshake_timeout - - async def _bind(self): - def factory(): - protocol = _ServerStreamProtocol(self, - self._limit, - self._client_connected_cb, - loop=self._loop, - _asyncio_internal=True) - return protocol - return await self._loop.create_server( - factory, - self._host, - self._port, - start_serving=False, - family=self._family, - flags=self._flags, - sock=self._sock, - backlog=self._backlog, - ssl=self._ssl, - reuse_address=self._reuse_address, - reuse_port=self._reuse_port, - ssl_handshake_timeout=self._ssl_handshake_timeout) - - if hasattr(socket, 'AF_UNIX'): # UNIX Domain Sockets are supported on this platform async def open_unix_connection(path=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds): """Similar to `open_connection` but works with UNIX Domain Sockets.""" - warnings.warn("open_unix_connection() is deprecated since Python 3.8 " - "in favor of connect_unix(), and scheduled for removal " - "in Python 3.10", - DeprecationWarning, - stacklevel=2) if loop is None: loop = events.get_event_loop() else: @@ -448,62 +107,15 @@ if hasattr(socket, 'AF_UNIX'): "and scheduled for removal in Python 3.10.", DeprecationWarning, stacklevel=2) reader = StreamReader(limit=limit, loop=loop) - protocol = StreamReaderProtocol(reader, loop=loop, - _asyncio_internal=True) + protocol = StreamReaderProtocol(reader, loop=loop) transport, _ = await loop.create_unix_connection( lambda: protocol, path, **kwds) writer = StreamWriter(transport, protocol, reader, loop) return reader, writer - - def connect_unix(path=None, *, - limit=_DEFAULT_LIMIT, - ssl=None, sock=None, - server_hostname=None, - ssl_handshake_timeout=None): - """Similar to `connect()` but works with UNIX Domain Sockets.""" - # Design note: - # Don't use decorator approach but explicit non-async - # function to fail fast and explicitly - # if passed arguments don't match the function signature - return _ContextManagerHelper(_connect_unix(path, - limit, - ssl, sock, - server_hostname, - ssl_handshake_timeout)) - - - async def _connect_unix(path, - limit, - ssl, sock, - server_hostname, - ssl_handshake_timeout): - """Similar to `connect()` but works with UNIX Domain Sockets.""" - loop = events.get_running_loop() - stream = Stream(mode=StreamMode.READWRITE, - limit=limit, - loop=loop, - _asyncio_internal=True) - await loop.create_unix_connection( - lambda: _StreamProtocol(stream, - loop=loop, - _asyncio_internal=True), - path, - ssl=ssl, - sock=sock, - server_hostname=server_hostname, - ssl_handshake_timeout=ssl_handshake_timeout) - return stream - - async def start_unix_server(client_connected_cb, path=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds): """Similar to `start_server` but works with UNIX Domain Sockets.""" - warnings.warn("start_unix_server() is deprecated since Python 3.8 " - "in favor of UnixStreamServer(), and scheduled " - "for removal in Python 3.10", - DeprecationWarning, - stacklevel=2) if loop is None: loop = events.get_event_loop() else: @@ -514,48 +126,11 @@ if hasattr(socket, 'AF_UNIX'): def factory(): reader = StreamReader(limit=limit, loop=loop) protocol = StreamReaderProtocol(reader, client_connected_cb, - loop=loop, - _asyncio_internal=True) + loop=loop) return protocol return await loop.create_unix_server(factory, path, **kwds) - class UnixStreamServer(_BaseStreamServer): - - def __init__(self, client_connected_cb, /, path=None, *, - limit=_DEFAULT_LIMIT, - sock=None, - backlog=100, - ssl=None, - ssl_handshake_timeout=None, - shutdown_timeout=60): - super().__init__(client_connected_cb, - limit=limit, - shutdown_timeout=shutdown_timeout, - _asyncio_internal=True) - self._path = path - self._sock = sock - self._backlog = backlog - self._ssl = ssl - self._ssl_handshake_timeout = ssl_handshake_timeout - - async def _bind(self): - def factory(): - protocol = _ServerStreamProtocol(self, - self._limit, - self._client_connected_cb, - loop=self._loop, - _asyncio_internal=True) - return protocol - return await self._loop.create_unix_server( - factory, - self._path, - start_serving=False, - sock=self._sock, - backlog=self._backlog, - ssl=self._ssl, - ssl_handshake_timeout=self._ssl_handshake_timeout) - class FlowControlMixin(protocols.Protocol): """Reusable flow control logic for StreamWriter.drain(). @@ -567,20 +142,11 @@ class FlowControlMixin(protocols.Protocol): StreamWriter.drain() must wait for _drain_helper() coroutine. """ - def __init__(self, loop=None, *, _asyncio_internal=False): + def __init__(self, loop=None): if loop is None: self._loop = events.get_event_loop() else: self._loop = loop - if not _asyncio_internal: - # NOTE: - # Avoid inheritance from FlowControlMixin - # Copy-paste the code to your project - # if you need flow control helpers - warnings.warn(f"{self.__class__} should be instantiated " - "by asyncio internals only, " - "please avoid its creation from user code", - DeprecationWarning) self._paused = False self._drain_waiter = None self._connection_lost = False @@ -634,8 +200,6 @@ class FlowControlMixin(protocols.Protocol): raise NotImplementedError -# begin legacy stream APIs - class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): """Helper class to adapt between Protocol and StreamReader. @@ -645,47 +209,103 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): call inappropriate methods of the protocol.) """ - def __init__(self, stream_reader, client_connected_cb=None, loop=None, - *, _asyncio_internal=False): - super().__init__(loop=loop, _asyncio_internal=_asyncio_internal) - self._stream_reader = stream_reader + _source_traceback = None + + def __init__(self, stream_reader, client_connected_cb=None, loop=None): + super().__init__(loop=loop) + if stream_reader is not None: + self._stream_reader_wr = weakref.ref(stream_reader, + self._on_reader_gc) + self._source_traceback = stream_reader._source_traceback + else: + self._stream_reader_wr = None + if client_connected_cb is not None: + # This is a stream created by the `create_server()` function. + # Keep a strong reference to the reader until a connection + # is established. + self._strong_reader = stream_reader + self._reject_connection = False self._stream_writer = None + self._transport = None self._client_connected_cb = client_connected_cb self._over_ssl = False self._closed = self._loop.create_future() + def _on_reader_gc(self, wr): + transport = self._transport + if transport is not None: + # connection_made was called + context = { + 'message': ('An open stream object is being garbage ' + 'collected; call "stream.close()" explicitly.') + } + if self._source_traceback: + context['source_traceback'] = self._source_traceback + self._loop.call_exception_handler(context) + transport.abort() + else: + self._reject_connection = True + self._stream_reader_wr = None + + @property + def _stream_reader(self): + if self._stream_reader_wr is None: + return None + return self._stream_reader_wr() + def connection_made(self, transport): - self._stream_reader.set_transport(transport) + if self._reject_connection: + context = { + 'message': ('An open stream was garbage collected prior to ' + 'establishing network connection; ' + 'call "stream.close()" explicitly.') + } + if self._source_traceback: + context['source_traceback'] = self._source_traceback + self._loop.call_exception_handler(context) + transport.abort() + return + self._transport = transport + reader = self._stream_reader + if reader is not None: + reader.set_transport(transport) self._over_ssl = transport.get_extra_info('sslcontext') is not None if self._client_connected_cb is not None: self._stream_writer = StreamWriter(transport, self, - self._stream_reader, + reader, self._loop) - res = self._client_connected_cb(self._stream_reader, + res = self._client_connected_cb(reader, self._stream_writer) if coroutines.iscoroutine(res): self._loop.create_task(res) + self._strong_reader = None def connection_lost(self, exc): - if self._stream_reader is not None: + reader = self._stream_reader + if reader is not None: if exc is None: - self._stream_reader.feed_eof() + reader.feed_eof() else: - self._stream_reader.set_exception(exc) + reader.set_exception(exc) if not self._closed.done(): if exc is None: self._closed.set_result(None) else: self._closed.set_exception(exc) super().connection_lost(exc) - self._stream_reader = None + self._stream_reader_wr = None self._stream_writer = None + self._transport = None def data_received(self, data): - self._stream_reader.feed_data(data) + reader = self._stream_reader + if reader is not None: + reader.feed_data(data) def eof_received(self): - self._stream_reader.feed_eof() + reader = self._stream_reader + if reader is not None: + reader.feed_eof() if self._over_ssl: # Prevent a warning in SSLProtocol.eof_received: # "returning true from eof_received() @@ -693,6 +313,9 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): return False return True + def _get_close_waiter(self, stream): + return self._closed + def __del__(self): # Prevent reports about unhandled exceptions. # Better than self._closed._log_traceback = False hack @@ -718,6 +341,8 @@ class StreamWriter: assert reader is None or isinstance(reader, StreamReader) self._reader = reader self._loop = loop + self._complete_fut = self._loop.create_future() + self._complete_fut.set_result(None) def __repr__(self): info = [self.__class__.__name__, f'transport={self._transport!r}'] @@ -748,7 +373,7 @@ class StreamWriter: return self._transport.is_closing() async def wait_closed(self): - await self._protocol._closed + await self._protocol._get_close_waiter(self) def get_extra_info(self, name, default=None): return self._transport.get_extra_info(name, default) @@ -766,561 +391,24 @@ class StreamWriter: if exc is not None: raise exc if self._transport.is_closing(): + # Wait for protocol.connection_lost() call + # Raise connection closing error if any, + # ConnectionResetError otherwise # Yield to the event loop so connection_lost() may be # called. Without this, _drain_helper() would return # immediately, and code that calls # write(...); await drain() # in a loop would never call connection_lost(), so it # would not see an error when the socket is closed. - await tasks.sleep(0, loop=self._loop) + await sleep(0) await self._protocol._drain_helper() class StreamReader: - def __init__(self, limit=_DEFAULT_LIMIT, loop=None): - # The line length limit is a security feature; - # it also doubles as half the buffer limit. - - if limit <= 0: - raise ValueError('Limit cannot be <= 0') - - self._limit = limit - if loop is None: - self._loop = events.get_event_loop() - else: - self._loop = loop - self._buffer = bytearray() - self._eof = False # Whether we're done. - self._waiter = None # A future used by _wait_for_data() - self._exception = None - self._transport = None - self._paused = False - - def __repr__(self): - info = ['StreamReader'] - if self._buffer: - info.append(f'{len(self._buffer)} bytes') - if self._eof: - info.append('eof') - if self._limit != _DEFAULT_LIMIT: - info.append(f'limit={self._limit}') - if self._waiter: - info.append(f'waiter={self._waiter!r}') - if self._exception: - info.append(f'exception={self._exception!r}') - if self._transport: - info.append(f'transport={self._transport!r}') - if self._paused: - info.append('paused') - return '<{}>'.format(' '.join(info)) - - def exception(self): - return self._exception - - def set_exception(self, exc): - self._exception = exc - - waiter = self._waiter - if waiter is not None: - self._waiter = None - if not waiter.cancelled(): - waiter.set_exception(exc) - - def _wakeup_waiter(self): - """Wakeup read*() functions waiting for data or EOF.""" - waiter = self._waiter - if waiter is not None: - self._waiter = None - if not waiter.cancelled(): - waiter.set_result(None) - - def set_transport(self, transport): - assert self._transport is None, 'Transport already set' - self._transport = transport - - def _maybe_resume_transport(self): - if self._paused and len(self._buffer) <= self._limit: - self._paused = False - self._transport.resume_reading() - - def feed_eof(self): - self._eof = True - self._wakeup_waiter() - - def at_eof(self): - """Return True if the buffer is empty and 'feed_eof' was called.""" - return self._eof and not self._buffer - - def feed_data(self, data): - assert not self._eof, 'feed_data after feed_eof' - - if not data: - return - - self._buffer.extend(data) - self._wakeup_waiter() - - if (self._transport is not None and - not self._paused and - len(self._buffer) > 2 * self._limit): - try: - self._transport.pause_reading() - except NotImplementedError: - # The transport can't be paused. - # We'll just have to buffer all data. - # Forget the transport so we don't keep trying. - self._transport = None - else: - self._paused = True - - async def _wait_for_data(self, func_name): - """Wait until feed_data() or feed_eof() is called. - - If stream was paused, automatically resume it. - """ - # StreamReader uses a future to link the protocol feed_data() method - # to a read coroutine. Running two read coroutines at the same time - # would have an unexpected behaviour. It would not possible to know - # which coroutine would get the next data. - if self._waiter is not None: - raise RuntimeError( - f'{func_name}() called while another coroutine is ' - f'already waiting for incoming data') - - assert not self._eof, '_wait_for_data after EOF' - - # Waiting for data while paused will make deadlock, so prevent it. - # This is essential for readexactly(n) for case when n > self._limit. - if self._paused: - self._paused = False - self._transport.resume_reading() - - self._waiter = self._loop.create_future() - try: - await self._waiter - finally: - self._waiter = None - - async def readline(self): - """Read chunk of data from the stream until newline (b'\n') is found. - - On success, return chunk that ends with newline. If only partial - line can be read due to EOF, return incomplete line without - terminating newline. When EOF was reached while no bytes read, empty - bytes object is returned. - - If limit is reached, ValueError will be raised. In that case, if - newline was found, complete line including newline will be removed - from internal buffer. Else, internal buffer will be cleared. Limit is - compared against part of the line without newline. - - If stream was paused, this function will automatically resume it if - needed. - """ - sep = b'\n' - seplen = len(sep) - try: - line = await self.readuntil(sep) - except exceptions.IncompleteReadError as e: - return e.partial - except exceptions.LimitOverrunError as e: - if self._buffer.startswith(sep, e.consumed): - del self._buffer[:e.consumed + seplen] - else: - self._buffer.clear() - self._maybe_resume_transport() - raise ValueError(e.args[0]) - return line - - async def readuntil(self, separator=b'\n'): - """Read data from the stream until ``separator`` is found. - - On success, the data and separator will be removed from the - internal buffer (consumed). Returned data will include the - separator at the end. - - Configured stream limit is used to check result. Limit sets the - maximal length of data that can be returned, not counting the - separator. - - If an EOF occurs and the complete separator is still not found, - an IncompleteReadError exception will be raised, and the internal - buffer will be reset. The IncompleteReadError.partial attribute - may contain the separator partially. - - If the data cannot be read because of over limit, a - LimitOverrunError exception will be raised, and the data - will be left in the internal buffer, so it can be read again. - """ - seplen = len(separator) - if seplen == 0: - raise ValueError('Separator should be at least one-byte string') - - if self._exception is not None: - raise self._exception - - # Consume whole buffer except last bytes, which length is - # one less than seplen. Let's check corner cases with - # separator='SEPARATOR': - # * we have received almost complete separator (without last - # byte). i.e buffer='some textSEPARATO'. In this case we - # can safely consume len(separator) - 1 bytes. - # * last byte of buffer is first byte of separator, i.e. - # buffer='abcdefghijklmnopqrS'. We may safely consume - # everything except that last byte, but this require to - # analyze bytes of buffer that match partial separator. - # This is slow and/or require FSM. For this case our - # implementation is not optimal, since require rescanning - # of data that is known to not belong to separator. In - # real world, separator will not be so long to notice - # performance problems. Even when reading MIME-encoded - # messages :) - - # `offset` is the number of bytes from the beginning of the buffer - # where there is no occurrence of `separator`. - offset = 0 - - # Loop until we find `separator` in the buffer, exceed the buffer size, - # or an EOF has happened. - while True: - buflen = len(self._buffer) - - # Check if we now have enough data in the buffer for `separator` to - # fit. - if buflen - offset >= seplen: - isep = self._buffer.find(separator, offset) - - if isep != -1: - # `separator` is in the buffer. `isep` will be used later - # to retrieve the data. - break - - # see upper comment for explanation. - offset = buflen + 1 - seplen - if offset > self._limit: - raise exceptions.LimitOverrunError( - 'Separator is not found, and chunk exceed the limit', - offset) - - # Complete message (with full separator) may be present in buffer - # even when EOF flag is set. This may happen when the last chunk - # adds data which makes separator be found. That's why we check for - # EOF *ater* inspecting the buffer. - if self._eof: - chunk = bytes(self._buffer) - self._buffer.clear() - raise exceptions.IncompleteReadError(chunk, None) - - # _wait_for_data() will resume reading if stream was paused. - await self._wait_for_data('readuntil') - - if isep > self._limit: - raise exceptions.LimitOverrunError( - 'Separator is found, but chunk is longer than limit', isep) - - chunk = self._buffer[:isep + seplen] - del self._buffer[:isep + seplen] - self._maybe_resume_transport() - return bytes(chunk) - - async def read(self, n=-1): - """Read up to `n` bytes from the stream. - - If n is not provided, or set to -1, read until EOF and return all read - bytes. If the EOF was received and the internal buffer is empty, return - an empty bytes object. - - If n is zero, return empty bytes object immediately. - - If n is positive, this function try to read `n` bytes, and may return - less or equal bytes than requested, but at least one byte. If EOF was - received before any byte is read, this function returns empty byte - object. - - Returned value is not limited with limit, configured at stream - creation. - - If stream was paused, this function will automatically resume it if - needed. - """ - - if self._exception is not None: - raise self._exception - - if n == 0: - return b'' - - if n < 0: - # This used to just loop creating a new waiter hoping to - # collect everything in self._buffer, but that would - # deadlock if the subprocess sends more than self.limit - # bytes. So just call self.read(self._limit) until EOF. - blocks = [] - while True: - block = await self.read(self._limit) - if not block: - break - blocks.append(block) - return b''.join(blocks) - - if not self._buffer and not self._eof: - await self._wait_for_data('read') - - # This will work right even if buffer is less than n bytes - data = bytes(self._buffer[:n]) - del self._buffer[:n] - - self._maybe_resume_transport() - return data - - async def readexactly(self, n): - """Read exactly `n` bytes. - - Raise an IncompleteReadError if EOF is reached before `n` bytes can be - read. The IncompleteReadError.partial attribute of the exception will - contain the partial read bytes. - - if n is zero, return empty bytes object. - - Returned value is not limited with limit, configured at stream - creation. - - If stream was paused, this function will automatically resume it if - needed. - """ - if n < 0: - raise ValueError('readexactly size can not be less than zero') - - if self._exception is not None: - raise self._exception - - if n == 0: - return b'' - - while len(self._buffer) < n: - if self._eof: - incomplete = bytes(self._buffer) - self._buffer.clear() - raise exceptions.IncompleteReadError(incomplete, n) - - await self._wait_for_data('readexactly') - - if len(self._buffer) == n: - data = bytes(self._buffer) - self._buffer.clear() - else: - data = bytes(self._buffer[:n]) - del self._buffer[:n] - self._maybe_resume_transport() - return data - - def __aiter__(self): - return self - - async def __anext__(self): - val = await self.readline() - if val == b'': - raise StopAsyncIteration - return val - - -# end legacy stream APIs - - -class _BaseStreamProtocol(FlowControlMixin, protocols.Protocol): - """Helper class to adapt between Protocol and StreamReader. - - (This is a helper class instead of making StreamReader itself a - Protocol subclass, because the StreamReader has other potential - uses, and to prevent the user of the StreamReader to accidentally - call inappropriate methods of the protocol.) - """ - - _stream = None # initialized in derived classes - - def __init__(self, loop=None, - *, _asyncio_internal=False): - super().__init__(loop=loop, _asyncio_internal=_asyncio_internal) - self._transport = None - self._over_ssl = False - self._closed = self._loop.create_future() - - def connection_made(self, transport): - self._transport = transport - self._over_ssl = transport.get_extra_info('sslcontext') is not None - - def connection_lost(self, exc): - stream = self._stream - if stream is not None: - if exc is None: - stream._feed_eof() - else: - stream._set_exception(exc) - if not self._closed.done(): - if exc is None: - self._closed.set_result(None) - else: - self._closed.set_exception(exc) - super().connection_lost(exc) - self._transport = None - - def data_received(self, data): - stream = self._stream - if stream is not None: - stream._feed_data(data) - - def eof_received(self): - stream = self._stream - if stream is not None: - stream._feed_eof() - if self._over_ssl: - # Prevent a warning in SSLProtocol.eof_received: - # "returning true from eof_received() - # has no effect when using ssl" - return False - return True - - def _get_close_waiter(self, stream): - return self._closed - - def __del__(self): - # Prevent reports about unhandled exceptions. - # Better than self._closed._log_traceback = False hack - closed = self._get_close_waiter(self._stream) - if closed.done() and not closed.cancelled(): - closed.exception() - - -class _StreamProtocol(_BaseStreamProtocol): - _source_traceback = None - - def __init__(self, stream, loop=None, - *, _asyncio_internal=False): - super().__init__(loop=loop, _asyncio_internal=_asyncio_internal) - self._source_traceback = stream._source_traceback - self._stream_wr = weakref.ref(stream, self._on_gc) - self._reject_connection = False - - def _on_gc(self, wr): - transport = self._transport - if transport is not None: - # connection_made was called - context = { - 'message': ('An open stream object is being garbage ' - 'collected; call "stream.close()" explicitly.') - } - if self._source_traceback: - context['source_traceback'] = self._source_traceback - self._loop.call_exception_handler(context) - transport.abort() - else: - self._reject_connection = True - self._stream_wr = None - - @property - def _stream(self): - if self._stream_wr is None: - return None - return self._stream_wr() - - def connection_made(self, transport): - if self._reject_connection: - context = { - 'message': ('An open stream was garbage collected prior to ' - 'establishing network connection; ' - 'call "stream.close()" explicitly.') - } - if self._source_traceback: - context['source_traceback'] = self._source_traceback - self._loop.call_exception_handler(context) - transport.abort() - return - super().connection_made(transport) - stream = self._stream - if stream is None: - return - stream._set_transport(transport) - stream._protocol = self - - def connection_lost(self, exc): - super().connection_lost(exc) - self._stream_wr = None - - -class _ServerStreamProtocol(_BaseStreamProtocol): - def __init__(self, server, limit, client_connected_cb, loop=None, - *, _asyncio_internal=False): - super().__init__(loop=loop, _asyncio_internal=_asyncio_internal) - assert self._closed - self._client_connected_cb = client_connected_cb - self._limit = limit - self._server = server - self._task = None - - def connection_made(self, transport): - super().connection_made(transport) - stream = Stream(mode=StreamMode.READWRITE, - transport=transport, - protocol=self, - limit=self._limit, - loop=self._loop, - is_server_side=True, - _asyncio_internal=True) - self._stream = stream - # If self._client_connected_cb(self._stream) fails - # the exception is logged by transport - self._task = self._loop.create_task( - self._client_connected_cb(self._stream)) - self._server._attach(stream, self._task) - - def connection_lost(self, exc): - super().connection_lost(exc) - self._server._detach(self._stream, self._task) - self._stream = None - - -class _OptionalAwait: - # The class doesn't create a coroutine - # if not awaited - # It prevents "coroutine is never awaited" message - - __slots___ = ('_method',) - - def __init__(self, method): - self._method = method - - def __await__(self): - return self._method().__await__() - - -class Stream: - """Wraps a Transport. - - This exposes write(), writelines(), [can_]write_eof(), - get_extra_info() and close(). It adds drain() which returns an - optional Future on which you can wait for flow control. It also - adds a transport property which references the Transport - directly. - """ - _source_traceback = None - def __init__(self, mode, *, - transport=None, - protocol=None, - loop=None, - limit=_DEFAULT_LIMIT, - is_server_side=False, - _asyncio_internal=False): - if not _asyncio_internal: - raise RuntimeError(f"{self.__class__} should be instantiated " - "by asyncio internals only") - self._mode = mode - self._transport = transport - self._protocol = protocol - self._is_server_side = is_server_side - + def __init__(self, limit=_DEFAULT_LIMIT, loop=None): # The line length limit is a security feature; # it also doubles as half the buffer limit. @@ -1336,17 +424,14 @@ class Stream: self._eof = False # Whether we're done. self._waiter = None # A future used by _wait_for_data() self._exception = None + self._transport = None self._paused = False - self._complete_fut = self._loop.create_future() - self._complete_fut.set_result(None) - if self._loop.get_debug(): self._source_traceback = format_helpers.extract_stack( sys._getframe(1)) def __repr__(self): - info = [self.__class__.__name__] - info.append(f'mode={self._mode}') + info = ['StreamReader'] if self._buffer: info.append(f'{len(self._buffer)} bytes') if self._eof: @@ -1363,127 +448,10 @@ class Stream: info.append('paused') return '<{}>'.format(' '.join(info)) - @property - def mode(self): - return self._mode - - def is_server_side(self): - return self._is_server_side - - @property - def transport(self): - warnings.warn("Stream.transport attribute is deprecated " - "since Python 3.8 and is scheduled for removal in 3.10; " - "it is an internal API", - DeprecationWarning, - stacklevel=2) - return self._transport - - def write(self, data): - _ensure_can_write(self._mode) - self._transport.write(data) - return self._fast_drain() - - def writelines(self, data): - _ensure_can_write(self._mode) - self._transport.writelines(data) - return self._fast_drain() - - def _fast_drain(self): - # The helper tries to use fast-path to return already existing - # complete future object if underlying transport is not paused - # and actual waiting for writing resume is not needed - exc = self.exception() - if exc is not None: - fut = self._loop.create_future() - fut.set_exception(exc) - return fut - if not self._transport.is_closing(): - if self._protocol._connection_lost: - fut = self._loop.create_future() - fut.set_exception(ConnectionResetError('Connection lost')) - return fut - if not self._protocol._paused: - # fast path, the stream is not paused - # no need to wait for resume signal - return self._complete_fut - return _OptionalAwait(self.drain) - - def write_eof(self): - _ensure_can_write(self._mode) - return self._transport.write_eof() - - def can_write_eof(self): - if not self._mode.is_write(): - return False - return self._transport.can_write_eof() - - def close(self): - self._transport.close() - return _OptionalAwait(self.wait_closed) - - def is_closing(self): - return self._transport.is_closing() - - async def abort(self): - self._transport.abort() - await self.wait_closed() - - async def wait_closed(self): - await self._protocol._get_close_waiter(self) - - def get_extra_info(self, name, default=None): - return self._transport.get_extra_info(name, default) - - async def drain(self): - """Flush the write buffer. - - The intended use is to write - - w.write(data) - await w.drain() - """ - _ensure_can_write(self._mode) - exc = self.exception() - if exc is not None: - raise exc - if self._transport.is_closing(): - # Wait for protocol.connection_lost() call - # Raise connection closing error if any, - # ConnectionResetError otherwise - await tasks.sleep(0) - await self._protocol._drain_helper() - - async def sendfile(self, file, offset=0, count=None, *, fallback=True): - await self.drain() # check for stream mode and exceptions - return await self._loop.sendfile(self._transport, file, - offset, count, fallback=fallback) - - async def start_tls(self, sslcontext, *, - server_hostname=None, - ssl_handshake_timeout=None): - await self.drain() # check for stream mode and exceptions - transport = await self._loop.start_tls( - self._transport, self._protocol, sslcontext, - server_side=self._is_server_side, - server_hostname=server_hostname, - ssl_handshake_timeout=ssl_handshake_timeout) - self._transport = transport - self._protocol._transport = transport - self._protocol._over_ssl = True - def exception(self): return self._exception def set_exception(self, exc): - warnings.warn("Stream.set_exception() is deprecated " - "since Python 3.8 and is scheduled for removal in 3.10; " - "it is an internal API", - DeprecationWarning, - stacklevel=2) - self._set_exception(exc) - - def _set_exception(self, exc): self._exception = exc waiter = self._waiter @@ -1501,16 +469,6 @@ class Stream: waiter.set_result(None) def set_transport(self, transport): - warnings.warn("Stream.set_transport() is deprecated " - "since Python 3.8 and is scheduled for removal in 3.10; " - "it is an internal API", - DeprecationWarning, - stacklevel=2) - self._set_transport(transport) - - def _set_transport(self, transport): - if transport is self._transport: - return assert self._transport is None, 'Transport already set' self._transport = transport @@ -1520,14 +478,6 @@ class Stream: self._transport.resume_reading() def feed_eof(self): - warnings.warn("Stream.feed_eof() is deprecated " - "since Python 3.8 and is scheduled for removal in 3.10; " - "it is an internal API", - DeprecationWarning, - stacklevel=2) - self._feed_eof() - - def _feed_eof(self): self._eof = True self._wakeup_waiter() @@ -1536,15 +486,6 @@ class Stream: return self._eof and not self._buffer def feed_data(self, data): - warnings.warn("Stream.feed_data() is deprecated " - "since Python 3.8 and is scheduled for removal in 3.10; " - "it is an internal API", - DeprecationWarning, - stacklevel=2) - self._feed_data(data) - - def _feed_data(self, data): - _ensure_can_read(self._mode) assert not self._eof, 'feed_data after feed_eof' if not data: @@ -1610,7 +551,6 @@ class Stream: If stream was paused, this function will automatically resume it if needed. """ - _ensure_can_read(self._mode) sep = b'\n' seplen = len(sep) try: @@ -1646,7 +586,6 @@ class Stream: LimitOverrunError exception will be raised, and the data will be left in the internal buffer, so it can be read again. """ - _ensure_can_read(self._mode) seplen = len(separator) if seplen == 0: raise ValueError('Separator should be at least one-byte string') @@ -1738,7 +677,6 @@ class Stream: If stream was paused, this function will automatically resume it if needed. """ - _ensure_can_read(self._mode) if self._exception is not None: raise self._exception @@ -1784,7 +722,6 @@ class Stream: If stream was paused, this function will automatically resume it if needed. """ - _ensure_can_read(self._mode) if n < 0: raise ValueError('readexactly size can not be less than zero') @@ -1812,7 +749,6 @@ class Stream: return data def __aiter__(self): - _ensure_can_read(self._mode) return self async def __anext__(self): @@ -1820,9 +756,3 @@ class Stream: if val == b'': raise StopAsyncIteration return val - - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - await self.close() diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index bddfb01..c9506b1 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -19,16 +19,14 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, protocols.SubprocessProtocol): """Like StreamReaderProtocol, but for a subprocess.""" - def __init__(self, limit, loop, *, _asyncio_internal=False): - super().__init__(loop=loop, _asyncio_internal=_asyncio_internal) + def __init__(self, limit, loop): + super().__init__(loop=loop) self._limit = limit self.stdin = self.stdout = self.stderr = None self._transport = None self._process_exited = False self._pipe_fds = [] self._stdin_closed = self._loop.create_future() - self._stdout_closed = self._loop.create_future() - self._stderr_closed = self._loop.create_future() def __repr__(self): info = [self.__class__.__name__] @@ -42,35 +40,27 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, def connection_made(self, transport): self._transport = transport + stdout_transport = transport.get_pipe_transport(1) if stdout_transport is not None: - self.stdout = streams.Stream(mode=streams.StreamMode.READ, - transport=stdout_transport, - protocol=self, - limit=self._limit, - loop=self._loop, - _asyncio_internal=True) - self.stdout._set_transport(stdout_transport) + self.stdout = streams.StreamReader(limit=self._limit, + loop=self._loop) + self.stdout.set_transport(stdout_transport) self._pipe_fds.append(1) stderr_transport = transport.get_pipe_transport(2) if stderr_transport is not None: - self.stderr = streams.Stream(mode=streams.StreamMode.READ, - transport=stderr_transport, - protocol=self, - limit=self._limit, - loop=self._loop, - _asyncio_internal=True) - self.stderr._set_transport(stderr_transport) + self.stderr = streams.StreamReader(limit=self._limit, + loop=self._loop) + self.stderr.set_transport(stderr_transport) self._pipe_fds.append(2) stdin_transport = transport.get_pipe_transport(0) if stdin_transport is not None: - self.stdin = streams.Stream(mode=streams.StreamMode.WRITE, - transport=stdin_transport, - protocol=self, - loop=self._loop, - _asyncio_internal=True) + self.stdin = streams.StreamWriter(stdin_transport, + protocol=self, + reader=None, + loop=self._loop) def pipe_data_received(self, fd, data): if fd == 1: @@ -80,7 +70,7 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, else: reader = None if reader is not None: - reader._feed_data(data) + reader.feed_data(data) def pipe_connection_lost(self, fd, exc): if fd == 0: @@ -101,9 +91,9 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, reader = None if reader is not None: if exc is None: - reader._feed_eof() + reader.feed_eof() else: - reader._set_exception(exc) + reader.set_exception(exc) if fd in self._pipe_fds: self._pipe_fds.remove(fd) @@ -121,20 +111,10 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, def _get_close_waiter(self, stream): if stream is self.stdin: return self._stdin_closed - elif stream is self.stdout: - return self._stdout_closed - elif stream is self.stderr: - return self._stderr_closed class Process: - def __init__(self, transport, protocol, loop, *, _asyncio_internal=False): - if not _asyncio_internal: - warnings.warn(f"{self.__class__} should be instantiated " - "by asyncio internals only, " - "please avoid its creation from user code", - DeprecationWarning) - + def __init__(self, transport, protocol, loop): self._transport = transport self._protocol = protocol self._loop = loop @@ -232,13 +212,12 @@ async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, ) protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, - loop=loop, - _asyncio_internal=True) + loop=loop) transport, protocol = await loop.subprocess_shell( protocol_factory, cmd, stdin=stdin, stdout=stdout, stderr=stderr, **kwds) - return Process(transport, protocol, loop, _asyncio_internal=True) + return Process(transport, protocol, loop) async def create_subprocess_exec(program, *args, stdin=None, stdout=None, @@ -253,11 +232,10 @@ async def create_subprocess_exec(program, *args, stdin=None, stdout=None, stacklevel=2 ) protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, - loop=loop, - _asyncio_internal=True) + loop=loop) transport, protocol = await loop.subprocess_exec( protocol_factory, program, *args, stdin=stdin, stdout=stdout, stderr=stderr, **kwds) - return Process(transport, protocol, loop, _asyncio_internal=True) + return Process(transport, protocol, loop) diff --git a/Lib/test/test_asyncio/test_buffered_proto.py b/Lib/test/test_asyncio/test_buffered_proto.py index b1531fb..f24e363 100644 --- a/Lib/test/test_asyncio/test_buffered_proto.py +++ b/Lib/test/test_asyncio/test_buffered_proto.py @@ -58,10 +58,9 @@ class BaseTestBufferedProtocol(func_tests.FunctionalTestCaseMixin): writer.close() await writer.wait_closed() - with self.assertWarns(DeprecationWarning): - srv = self.loop.run_until_complete( - asyncio.start_server( - on_server_client, '127.0.0.1', 0)) + srv = self.loop.run_until_complete( + asyncio.start_server( + on_server_client, '127.0.0.1', 0)) addr = srv.sockets[0].getsockname() self.loop.run_until_complete( diff --git a/Lib/test/test_asyncio/test_pep492.py b/Lib/test/test_asyncio/test_pep492.py index 58a6094..a1f27dd 100644 --- a/Lib/test/test_asyncio/test_pep492.py +++ b/Lib/test/test_asyncio/test_pep492.py @@ -95,11 +95,9 @@ class StreamReaderTests(BaseTest): def test_readline(self): DATA = b'line1\nline2\nline3' - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(DATA) - stream._feed_eof() + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(DATA) + stream.feed_eof() async def reader(): data = [] diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index 6325ee3..b9413ab 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -1,8 +1,6 @@ """Tests for streams.py.""" -import contextlib import gc -import io import os import queue import pickle @@ -18,7 +16,6 @@ except ImportError: ssl = None import asyncio -from asyncio.streams import _StreamProtocol, _ensure_can_read, _ensure_can_write from test.test_asyncio import utils as test_utils @@ -26,24 +23,6 @@ def tearDownModule(): asyncio.set_event_loop_policy(None) -class StreamModeTests(unittest.TestCase): - def test__ensure_can_read_ok(self): - self.assertIsNone(_ensure_can_read(asyncio.StreamMode.READ)) - self.assertIsNone(_ensure_can_read(asyncio.StreamMode.READWRITE)) - - def test__ensure_can_read_fail(self): - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - _ensure_can_read(asyncio.StreamMode.WRITE) - - def test__ensure_can_write_ok(self): - self.assertIsNone(_ensure_can_write(asyncio.StreamMode.WRITE)) - self.assertIsNone(_ensure_can_write(asyncio.StreamMode.READWRITE)) - - def test__ensure_can_write_fail(self): - with self.assertRaisesRegex(RuntimeError, "The stream is read-only"): - _ensure_can_write(asyncio.StreamMode.READ) - - class StreamTests(test_utils.TestCase): DATA = b'line1\nline2\nline3\n' @@ -63,8 +42,7 @@ class StreamTests(test_utils.TestCase): @mock.patch('asyncio.streams.events') def test_ctor_global_loop(self, m_events): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - _asyncio_internal=True) + stream = asyncio.StreamReader() self.assertIs(stream._loop, m_events.get_event_loop.return_value) def _basetest_open_connection(self, open_connection_fut): @@ -100,8 +78,7 @@ class StreamTests(test_utils.TestCase): self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) try: with self.assertWarns(DeprecationWarning): - reader, writer = self.loop.run_until_complete( - open_connection_fut) + reader, writer = self.loop.run_until_complete(open_connection_fut) finally: asyncio.set_event_loop(None) writer.write(b'GET / HTTP/1.0\r\n\r\n') @@ -161,27 +138,21 @@ class StreamTests(test_utils.TestCase): self._basetest_open_connection_error(conn_fut) def test_feed_empty_data(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) - stream._feed_data(b'') + stream.feed_data(b'') self.assertEqual(b'', stream._buffer) def test_feed_nonempty_data(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) - stream._feed_data(self.DATA) + stream.feed_data(self.DATA) self.assertEqual(self.DATA, stream._buffer) def test_read_zero(self): # Read zero bytes. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(self.DATA) + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(self.DATA) data = self.loop.run_until_complete(stream.read(0)) self.assertEqual(b'', data) @@ -189,13 +160,11 @@ class StreamTests(test_utils.TestCase): def test_read(self): # Read bytes. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) read_task = self.loop.create_task(stream.read(30)) def cb(): - stream._feed_data(self.DATA) + stream.feed_data(self.DATA) self.loop.call_soon(cb) data = self.loop.run_until_complete(read_task) @@ -204,11 +173,9 @@ class StreamTests(test_utils.TestCase): def test_read_line_breaks(self): # Read bytes without line breaks. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'line1') - stream._feed_data(b'line2') + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'line1') + stream.feed_data(b'line2') data = self.loop.run_until_complete(stream.read(5)) @@ -217,13 +184,11 @@ class StreamTests(test_utils.TestCase): def test_read_eof(self): # Read bytes, stop at eof. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) read_task = self.loop.create_task(stream.read(1024)) def cb(): - stream._feed_eof() + stream.feed_eof() self.loop.call_soon(cb) data = self.loop.run_until_complete(read_task) @@ -232,15 +197,13 @@ class StreamTests(test_utils.TestCase): def test_read_until_eof(self): # Read all bytes until eof. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) read_task = self.loop.create_task(stream.read(-1)) def cb(): - stream._feed_data(b'chunk1\n') - stream._feed_data(b'chunk2') - stream._feed_eof() + stream.feed_data(b'chunk1\n') + stream.feed_data(b'chunk2') + stream.feed_eof() self.loop.call_soon(cb) data = self.loop.run_until_complete(read_task) @@ -249,34 +212,26 @@ class StreamTests(test_utils.TestCase): self.assertEqual(b'', stream._buffer) def test_read_exception(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'line\n') + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'line\n') data = self.loop.run_until_complete(stream.read(2)) self.assertEqual(b'li', data) - stream._set_exception(ValueError()) + stream.set_exception(ValueError()) self.assertRaises( ValueError, self.loop.run_until_complete, stream.read(2)) def test_invalid_limit(self): with self.assertRaisesRegex(ValueError, 'imit'): - asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=0, loop=self.loop, - _asyncio_internal=True) + asyncio.StreamReader(limit=0, loop=self.loop) with self.assertRaisesRegex(ValueError, 'imit'): - asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=-1, loop=self.loop, - _asyncio_internal=True) + asyncio.StreamReader(limit=-1, loop=self.loop) def test_read_limit(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=3, loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'chunk') + stream = asyncio.StreamReader(limit=3, loop=self.loop) + stream.feed_data(b'chunk') data = self.loop.run_until_complete(stream.read(5)) self.assertEqual(b'chunk', data) self.assertEqual(b'', stream._buffer) @@ -284,16 +239,14 @@ class StreamTests(test_utils.TestCase): def test_readline(self): # Read one line. 'readline' will need to wait for the data # to come from 'cb' - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'chunk1 ') + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'chunk1 ') read_task = self.loop.create_task(stream.readline()) def cb(): - stream._feed_data(b'chunk2 ') - stream._feed_data(b'chunk3 ') - stream._feed_data(b'\n chunk4') + stream.feed_data(b'chunk2 ') + stream.feed_data(b'chunk3 ') + stream.feed_data(b'\n chunk4') self.loop.call_soon(cb) line = self.loop.run_until_complete(read_task) @@ -301,26 +254,22 @@ class StreamTests(test_utils.TestCase): self.assertEqual(b' chunk4', stream._buffer) def test_readline_limit_with_existing_data(self): - # Read one line. The data is in Stream's buffer + # Read one line. The data is in StreamReader's buffer # before the event loop is run. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=3, loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'li') - stream._feed_data(b'ne1\nline2\n') + stream = asyncio.StreamReader(limit=3, loop=self.loop) + stream.feed_data(b'li') + stream.feed_data(b'ne1\nline2\n') self.assertRaises( ValueError, self.loop.run_until_complete, stream.readline()) # The buffer should contain the remaining data after exception self.assertEqual(b'line2\n', stream._buffer) - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=3, loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'li') - stream._feed_data(b'ne1') - stream._feed_data(b'li') + stream = asyncio.StreamReader(limit=3, loop=self.loop) + stream.feed_data(b'li') + stream.feed_data(b'ne1') + stream.feed_data(b'li') self.assertRaises( ValueError, self.loop.run_until_complete, stream.readline()) @@ -332,34 +281,30 @@ class StreamTests(test_utils.TestCase): self.assertEqual(b'', stream._buffer) def test_at_eof(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) self.assertFalse(stream.at_eof()) - stream._feed_data(b'some data\n') + stream.feed_data(b'some data\n') self.assertFalse(stream.at_eof()) self.loop.run_until_complete(stream.readline()) self.assertFalse(stream.at_eof()) - stream._feed_data(b'some data\n') - stream._feed_eof() + stream.feed_data(b'some data\n') + stream.feed_eof() self.loop.run_until_complete(stream.readline()) self.assertTrue(stream.at_eof()) def test_readline_limit(self): - # Read one line. Streams are fed with data after + # Read one line. StreamReaders are fed with data after # their 'readline' methods are called. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=7, loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(limit=7, loop=self.loop) def cb(): - stream._feed_data(b'chunk1') - stream._feed_data(b'chunk2') - stream._feed_data(b'chunk3\n') - stream._feed_eof() + stream.feed_data(b'chunk1') + stream.feed_data(b'chunk2') + stream.feed_data(b'chunk3\n') + stream.feed_eof() self.loop.call_soon(cb) self.assertRaises( @@ -368,14 +313,12 @@ class StreamTests(test_utils.TestCase): # a ValueError it should be empty. self.assertEqual(b'', stream._buffer) - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=7, loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(limit=7, loop=self.loop) def cb(): - stream._feed_data(b'chunk1') - stream._feed_data(b'chunk2\n') - stream._feed_data(b'chunk3\n') - stream._feed_eof() + stream.feed_data(b'chunk1') + stream.feed_data(b'chunk2\n') + stream.feed_data(b'chunk3\n') + stream.feed_eof() self.loop.call_soon(cb) self.assertRaises( @@ -383,20 +326,18 @@ class StreamTests(test_utils.TestCase): self.assertEqual(b'chunk3\n', stream._buffer) # check strictness of the limit - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=7, loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'1234567\n') + stream = asyncio.StreamReader(limit=7, loop=self.loop) + stream.feed_data(b'1234567\n') line = self.loop.run_until_complete(stream.readline()) self.assertEqual(b'1234567\n', line) self.assertEqual(b'', stream._buffer) - stream._feed_data(b'12345678\n') + stream.feed_data(b'12345678\n') with self.assertRaises(ValueError) as cm: self.loop.run_until_complete(stream.readline()) self.assertEqual(b'', stream._buffer) - stream._feed_data(b'12345678') + stream.feed_data(b'12345678') with self.assertRaises(ValueError) as cm: self.loop.run_until_complete(stream.readline()) self.assertEqual(b'', stream._buffer) @@ -404,11 +345,9 @@ class StreamTests(test_utils.TestCase): def test_readline_nolimit_nowait(self): # All needed data for the first 'readline' call will be # in the buffer. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(self.DATA[:6]) - stream._feed_data(self.DATA[6:]) + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(self.DATA[:6]) + stream.feed_data(self.DATA[6:]) line = self.loop.run_until_complete(stream.readline()) @@ -416,29 +355,23 @@ class StreamTests(test_utils.TestCase): self.assertEqual(b'line2\nline3\n', stream._buffer) def test_readline_eof(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'some data') - stream._feed_eof() + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'some data') + stream.feed_eof() line = self.loop.run_until_complete(stream.readline()) self.assertEqual(b'some data', line) def test_readline_empty_eof(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_eof() + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_eof() line = self.loop.run_until_complete(stream.readline()) self.assertEqual(b'', line) def test_readline_read_byte_count(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(self.DATA) + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(self.DATA) self.loop.run_until_complete(stream.readline()) @@ -448,89 +381,79 @@ class StreamTests(test_utils.TestCase): self.assertEqual(b'ine3\n', stream._buffer) def test_readline_exception(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'line\n') + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'line\n') data = self.loop.run_until_complete(stream.readline()) self.assertEqual(b'line\n', data) - stream._set_exception(ValueError()) + stream.set_exception(ValueError()) self.assertRaises( ValueError, self.loop.run_until_complete, stream.readline()) self.assertEqual(b'', stream._buffer) def test_readuntil_separator(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) with self.assertRaisesRegex(ValueError, 'Separator should be'): self.loop.run_until_complete(stream.readuntil(separator=b'')) def test_readuntil_multi_chunks(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) - stream._feed_data(b'lineAAA') + stream.feed_data(b'lineAAA') data = self.loop.run_until_complete(stream.readuntil(separator=b'AAA')) self.assertEqual(b'lineAAA', data) self.assertEqual(b'', stream._buffer) - stream._feed_data(b'lineAAA') + stream.feed_data(b'lineAAA') data = self.loop.run_until_complete(stream.readuntil(b'AAA')) self.assertEqual(b'lineAAA', data) self.assertEqual(b'', stream._buffer) - stream._feed_data(b'lineAAAxxx') + stream.feed_data(b'lineAAAxxx') data = self.loop.run_until_complete(stream.readuntil(b'AAA')) self.assertEqual(b'lineAAA', data) self.assertEqual(b'xxx', stream._buffer) def test_readuntil_multi_chunks_1(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) - stream._feed_data(b'QWEaa') - stream._feed_data(b'XYaa') - stream._feed_data(b'a') + stream.feed_data(b'QWEaa') + stream.feed_data(b'XYaa') + stream.feed_data(b'a') data = self.loop.run_until_complete(stream.readuntil(b'aaa')) self.assertEqual(b'QWEaaXYaaa', data) self.assertEqual(b'', stream._buffer) - stream._feed_data(b'QWEaa') - stream._feed_data(b'XYa') - stream._feed_data(b'aa') + stream.feed_data(b'QWEaa') + stream.feed_data(b'XYa') + stream.feed_data(b'aa') data = self.loop.run_until_complete(stream.readuntil(b'aaa')) self.assertEqual(b'QWEaaXYaaa', data) self.assertEqual(b'', stream._buffer) - stream._feed_data(b'aaa') + stream.feed_data(b'aaa') data = self.loop.run_until_complete(stream.readuntil(b'aaa')) self.assertEqual(b'aaa', data) self.assertEqual(b'', stream._buffer) - stream._feed_data(b'Xaaa') + stream.feed_data(b'Xaaa') data = self.loop.run_until_complete(stream.readuntil(b'aaa')) self.assertEqual(b'Xaaa', data) self.assertEqual(b'', stream._buffer) - stream._feed_data(b'XXX') - stream._feed_data(b'a') - stream._feed_data(b'a') - stream._feed_data(b'a') + stream.feed_data(b'XXX') + stream.feed_data(b'a') + stream.feed_data(b'a') + stream.feed_data(b'a') data = self.loop.run_until_complete(stream.readuntil(b'aaa')) self.assertEqual(b'XXXaaa', data) self.assertEqual(b'', stream._buffer) def test_readuntil_eof(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'some dataAA') - stream._feed_eof() + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'some dataAA') + stream.feed_eof() with self.assertRaises(asyncio.IncompleteReadError) as cm: self.loop.run_until_complete(stream.readuntil(b'AAA')) @@ -539,18 +462,15 @@ class StreamTests(test_utils.TestCase): self.assertEqual(b'', stream._buffer) def test_readuntil_limit_found_sep(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, limit=3, - _asyncio_internal=True) - stream._feed_data(b'some dataAA') - + stream = asyncio.StreamReader(loop=self.loop, limit=3) + stream.feed_data(b'some dataAA') with self.assertRaisesRegex(asyncio.LimitOverrunError, 'not found') as cm: self.loop.run_until_complete(stream.readuntil(b'AAA')) self.assertEqual(b'some dataAA', stream._buffer) - stream._feed_data(b'A') + stream.feed_data(b'A') with self.assertRaisesRegex(asyncio.LimitOverrunError, 'is found') as cm: self.loop.run_until_complete(stream.readuntil(b'AAA')) @@ -559,10 +479,8 @@ class StreamTests(test_utils.TestCase): def test_readexactly_zero_or_less(self): # Read exact number of bytes (zero or less). - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(self.DATA) + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(self.DATA) data = self.loop.run_until_complete(stream.readexactly(0)) self.assertEqual(b'', data) @@ -574,17 +492,15 @@ class StreamTests(test_utils.TestCase): def test_readexactly(self): # Read exact number of bytes. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) n = 2 * len(self.DATA) read_task = self.loop.create_task(stream.readexactly(n)) def cb(): - stream._feed_data(self.DATA) - stream._feed_data(self.DATA) - stream._feed_data(self.DATA) + stream.feed_data(self.DATA) + stream.feed_data(self.DATA) + stream.feed_data(self.DATA) self.loop.call_soon(cb) data = self.loop.run_until_complete(read_task) @@ -592,25 +508,21 @@ class StreamTests(test_utils.TestCase): self.assertEqual(self.DATA, stream._buffer) def test_readexactly_limit(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=3, loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'chunk') + stream = asyncio.StreamReader(limit=3, loop=self.loop) + stream.feed_data(b'chunk') data = self.loop.run_until_complete(stream.readexactly(5)) self.assertEqual(b'chunk', data) self.assertEqual(b'', stream._buffer) def test_readexactly_eof(self): # Read exact number of bytes (eof). - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) n = 2 * len(self.DATA) read_task = self.loop.create_task(stream.readexactly(n)) def cb(): - stream._feed_data(self.DATA) - stream._feed_eof() + stream.feed_data(self.DATA) + stream.feed_eof() self.loop.call_soon(cb) with self.assertRaises(asyncio.IncompleteReadError) as cm: @@ -622,35 +534,29 @@ class StreamTests(test_utils.TestCase): self.assertEqual(b'', stream._buffer) def test_readexactly_exception(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'line\n') + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'line\n') data = self.loop.run_until_complete(stream.readexactly(2)) self.assertEqual(b'li', data) - stream._set_exception(ValueError()) + stream.set_exception(ValueError()) self.assertRaises( ValueError, self.loop.run_until_complete, stream.readexactly(2)) def test_exception(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) self.assertIsNone(stream.exception()) exc = ValueError() - stream._set_exception(exc) + stream.set_exception(exc) self.assertIs(stream.exception(), exc) def test_exception_waiter(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) async def set_err(): - stream._set_exception(ValueError()) + stream.set_exception(ValueError()) t1 = self.loop.create_task(stream.readline()) t2 = self.loop.create_task(set_err()) @@ -660,16 +566,14 @@ class StreamTests(test_utils.TestCase): self.assertRaises(ValueError, t1.result) def test_exception_cancel(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) t = self.loop.create_task(stream.readline()) test_utils.run_briefly(self.loop) t.cancel() test_utils.run_briefly(self.loop) # The following line fails if set_exception() isn't careful. - stream._set_exception(RuntimeError('message')) + stream.set_exception(RuntimeError('message')) test_utils.run_briefly(self.loop) self.assertIs(stream._waiter, None) @@ -829,7 +733,7 @@ class StreamTests(test_utils.TestCase): def test_read_all_from_pipe_reader(self): # See asyncio issue 168. This test is derived from the example # subprocess_attach_read_pipe.py, but we configure the - # Stream's limit so that twice it is less than the size + # StreamReader's limit so that twice it is less than the size # of the data writter. Also we must explicitly attach a child # watcher to the event loop. @@ -843,11 +747,8 @@ os.close(fd) args = [sys.executable, '-c', code, str(wfd)] pipe = open(rfd, 'rb', 0) - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, limit=1, - _asyncio_internal=True) - protocol = _StreamProtocol(stream, loop=self.loop, - _asyncio_internal=True) + reader = asyncio.StreamReader(loop=self.loop, limit=1) + protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop) transport, _ = self.loop.run_until_complete( self.loop.connect_read_pipe(lambda: protocol, pipe)) @@ -865,30 +766,29 @@ os.close(fd) asyncio.set_child_watcher(None) os.close(wfd) - data = self.loop.run_until_complete(stream.read(-1)) + data = self.loop.run_until_complete(reader.read(-1)) self.assertEqual(data, b'data') def test_streamreader_constructor(self): self.addCleanup(asyncio.set_event_loop, None) asyncio.set_event_loop(self.loop) - # asyncio issue #184: Ensure that _StreamProtocol constructor + # asyncio issue #184: Ensure that StreamReaderProtocol constructor # retrieves the current loop if the loop parameter is not set - reader = asyncio.Stream(mode=asyncio.StreamMode.READ, - _asyncio_internal=True) + reader = asyncio.StreamReader() self.assertIs(reader._loop, self.loop) def test_streamreaderprotocol_constructor(self): self.addCleanup(asyncio.set_event_loop, None) asyncio.set_event_loop(self.loop) - # asyncio issue #184: Ensure that _StreamProtocol constructor + # asyncio issue #184: Ensure that StreamReaderProtocol constructor # retrieves the current loop if the loop parameter is not set - stream = mock.Mock() - protocol = _StreamProtocol(stream, _asyncio_internal=True) + reader = mock.Mock() + protocol = asyncio.StreamReaderProtocol(reader) self.assertIs(protocol._loop, self.loop) - def test_drain_raises_deprecated(self): + def test_drain_raises(self): # See http://bugs.python.org/issue25441 # This test should not use asyncio for the mock server; the @@ -902,7 +802,7 @@ os.close(fd) def server(): # Runs in a separate thread. - with socket.create_server(('127.0.0.1', 0)) as sock: + with socket.create_server(('localhost', 0)) as sock: addr = sock.getsockname() q.put(addr) clt, _ = sock.accept() @@ -933,106 +833,48 @@ os.close(fd) thread.join() self.assertEqual([], messages) - def test_drain_raises(self): - # See http://bugs.python.org/issue25441 - - # This test should not use asyncio for the mock server; the - # whole point of the test is to test for a bug in drain() - # where it never gives up the event loop but the socket is - # closed on the server side. - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - q = queue.Queue() - - def server(): - # Runs in a separate thread. - with socket.create_server(('localhost', 0)) as sock: - addr = sock.getsockname() - q.put(addr) - clt, _ = sock.accept() - clt.close() - - async def client(host, port): - stream = await asyncio.connect(host, port) - - while True: - stream.write(b"foo\n") - await stream.drain() - - # Start the server thread and wait for it to be listening. - thread = threading.Thread(target=server) - thread.setDaemon(True) - thread.start() - addr = q.get() - - # Should not be stuck in an infinite loop. - with self.assertRaises((ConnectionResetError, ConnectionAbortedError, - BrokenPipeError)): - self.loop.run_until_complete(client(*addr)) - - # Clean up the thread. (Only on success; on failure, it may - # be stuck in accept().) - thread.join() - self.assertEqual([], messages) - def test___repr__(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - self.assertEqual("<Stream mode=StreamMode.READ>", repr(stream)) + stream = asyncio.StreamReader(loop=self.loop) + self.assertEqual("<StreamReader>", repr(stream)) def test___repr__nondefault_limit(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, limit=123, - _asyncio_internal=True) - self.assertEqual("<Stream mode=StreamMode.READ limit=123>", repr(stream)) + stream = asyncio.StreamReader(loop=self.loop, limit=123) + self.assertEqual("<StreamReader limit=123>", repr(stream)) def test___repr__eof(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_eof() - self.assertEqual("<Stream mode=StreamMode.READ eof>", repr(stream)) + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_eof() + self.assertEqual("<StreamReader eof>", repr(stream)) def test___repr__data(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'data') - self.assertEqual("<Stream mode=StreamMode.READ 4 bytes>", repr(stream)) + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'data') + self.assertEqual("<StreamReader 4 bytes>", repr(stream)) def test___repr__exception(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) exc = RuntimeError() - stream._set_exception(exc) - self.assertEqual("<Stream mode=StreamMode.READ exception=RuntimeError()>", + stream.set_exception(exc) + self.assertEqual("<StreamReader exception=RuntimeError()>", repr(stream)) def test___repr__waiter(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._waiter = self.loop.create_future() + stream = asyncio.StreamReader(loop=self.loop) + stream._waiter = asyncio.Future(loop=self.loop) self.assertRegex( repr(stream), - r"<Stream .+ waiter=<Future pending[\S ]*>>") + r"<StreamReader waiter=<Future pending[\S ]*>>") stream._waiter.set_result(None) self.loop.run_until_complete(stream._waiter) stream._waiter = None - self.assertEqual("<Stream mode=StreamMode.READ>", repr(stream)) + self.assertEqual("<StreamReader>", repr(stream)) def test___repr__transport(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) stream._transport = mock.Mock() stream._transport.__repr__ = mock.Mock() stream._transport.__repr__.return_value = "<Transport>" - self.assertEqual("<Stream mode=StreamMode.READ transport=<Transport>>", - repr(stream)) + self.assertEqual("<StreamReader transport=<Transport>>", repr(stream)) def test_IncompleteReadError_pickleable(self): e = asyncio.IncompleteReadError(b'abc', 10) @@ -1051,7 +893,7 @@ os.close(fd) self.assertEqual(str(e), str(e2)) self.assertEqual(e.consumed, e2.consumed) - def test_wait_closed_on_close_deprecated(self): + def test_wait_closed_on_close(self): with test_utils.run_test_server() as httpd: with self.assertWarns(DeprecationWarning): rd, wr = self.loop.run_until_complete( @@ -1069,24 +911,7 @@ os.close(fd) self.assertTrue(wr.is_closing()) self.loop.run_until_complete(wr.wait_closed()) - def test_wait_closed_on_close(self): - with test_utils.run_test_server() as httpd: - stream = self.loop.run_until_complete( - asyncio.connect(*httpd.address)) - - stream.write(b'GET / HTTP/1.0\r\n\r\n') - f = stream.readline() - data = self.loop.run_until_complete(f) - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - f = stream.read() - data = self.loop.run_until_complete(f) - self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - self.assertFalse(stream.is_closing()) - stream.close() - self.assertTrue(stream.is_closing()) - self.loop.run_until_complete(stream.wait_closed()) - - def test_wait_closed_on_close_with_unread_data_deprecated(self): + def test_wait_closed_on_close_with_unread_data(self): with test_utils.run_test_server() as httpd: with self.assertWarns(DeprecationWarning): rd, wr = self.loop.run_until_complete( @@ -1099,44 +924,33 @@ os.close(fd) wr.close() self.loop.run_until_complete(wr.wait_closed()) - def test_wait_closed_on_close_with_unread_data(self): - with test_utils.run_test_server() as httpd: - stream = self.loop.run_until_complete( - asyncio.connect(*httpd.address)) - - stream.write(b'GET / HTTP/1.0\r\n\r\n') - f = stream.readline() - data = self.loop.run_until_complete(f) - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - stream.close() - self.loop.run_until_complete(stream.wait_closed()) - def test_del_stream_before_sock_closing(self): messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - async def test(): - - with test_utils.run_test_server() as httpd: - stream = await asyncio.connect(*httpd.address) - sock = stream.get_extra_info('socket') - self.assertNotEqual(sock.fileno(), -1) + with test_utils.run_test_server() as httpd: + with self.assertWarns(DeprecationWarning): + rd, wr = self.loop.run_until_complete( + asyncio.open_connection(*httpd.address, loop=self.loop)) + sock = wr.get_extra_info('socket') + self.assertNotEqual(sock.fileno(), -1) - await stream.write(b'GET / HTTP/1.0\r\n\r\n') - data = await stream.readline() - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') + wr.write(b'GET / HTTP/1.0\r\n\r\n') + f = rd.readline() + data = self.loop.run_until_complete(f) + self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - # drop refs to reader/writer - del stream - gc.collect() - # make a chance to close the socket - await asyncio.sleep(0) + # drop refs to reader/writer + del rd + del wr + gc.collect() + # make a chance to close the socket + test_utils.run_briefly(self.loop) - self.assertEqual(1, len(messages), messages) - self.assertEqual(sock.fileno(), -1) + self.assertEqual(1, len(messages)) + self.assertEqual(sock.fileno(), -1) - self.loop.run_until_complete(test()) - self.assertEqual(1, len(messages), messages) + self.assertEqual(1, len(messages)) self.assertEqual('An open stream object is being garbage ' 'collected; call "stream.close()" explicitly.', messages[0]['message']) @@ -1146,12 +960,9 @@ os.close(fd) self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) with test_utils.run_test_server() as httpd: - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - pr = _StreamProtocol(stream, loop=self.loop, - _asyncio_internal=True) - del stream + rd = asyncio.StreamReader(loop=self.loop) + pr = asyncio.StreamReaderProtocol(rd, loop=self.loop) + del rd gc.collect() tr, _ = self.loop.run_until_complete( self.loop.create_connection( @@ -1168,14 +979,15 @@ os.close(fd) def test_async_writer_api(self): async def inner(httpd): - stream = await asyncio.connect(*httpd.address) + rd, wr = await asyncio.open_connection(*httpd.address) - await stream.write(b'GET / HTTP/1.0\r\n\r\n') - data = await stream.readline() + wr.write(b'GET / HTTP/1.0\r\n\r\n') + data = await rd.readline() self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - data = await stream.read() + data = await rd.read() self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - await stream.close() + wr.close() + await wr.wait_closed() messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) @@ -1187,16 +999,17 @@ os.close(fd) def test_async_writer_api_exception_after_close(self): async def inner(httpd): - stream = await asyncio.connect(*httpd.address) + rd, wr = await asyncio.open_connection(*httpd.address) - await stream.write(b'GET / HTTP/1.0\r\n\r\n') - data = await stream.readline() + wr.write(b'GET / HTTP/1.0\r\n\r\n') + data = await rd.readline() self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - data = await stream.read() + data = await rd.read() self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - stream.close() + wr.close() with self.assertRaises(ConnectionResetError): - await stream.write(b'data') + wr.write(b'data') + await wr.drain() messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) @@ -1227,587 +1040,6 @@ os.close(fd) self.assertEqual(messages, []) - def test_stream_reader_create_warning(self): - with contextlib.suppress(AttributeError): - del asyncio.StreamReader - with self.assertWarns(DeprecationWarning): - asyncio.StreamReader - - def test_stream_writer_create_warning(self): - with contextlib.suppress(AttributeError): - del asyncio.StreamWriter - with self.assertWarns(DeprecationWarning): - asyncio.StreamWriter - - def test_stream_reader_forbidden_ops(self): - async def inner(): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - _asyncio_internal=True) - with self.assertRaisesRegex(RuntimeError, "The stream is read-only"): - await stream.write(b'data') - with self.assertRaisesRegex(RuntimeError, "The stream is read-only"): - await stream.writelines([b'data', b'other']) - with self.assertRaisesRegex(RuntimeError, "The stream is read-only"): - stream.write_eof() - with self.assertRaisesRegex(RuntimeError, "The stream is read-only"): - await stream.drain() - - self.loop.run_until_complete(inner()) - - def test_stream_writer_forbidden_ops(self): - async def inner(): - stream = asyncio.Stream(mode=asyncio.StreamMode.WRITE, - _asyncio_internal=True) - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - stream._feed_data(b'data') - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - await stream.readline() - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - await stream.readuntil() - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - await stream.read() - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - await stream.readexactly(10) - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - async for chunk in stream: - pass - - self.loop.run_until_complete(inner()) - - def _basetest_connect(self, stream): - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - - stream.write(b'GET / HTTP/1.0\r\n\r\n') - f = stream.readline() - data = self.loop.run_until_complete(f) - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - f = stream.read() - data = self.loop.run_until_complete(f) - self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - stream.close() - self.loop.run_until_complete(stream.wait_closed()) - - self.assertEqual([], messages) - - def test_connect(self): - with test_utils.run_test_server() as httpd: - stream = self.loop.run_until_complete( - asyncio.connect(*httpd.address)) - self.assertFalse(stream.is_server_side()) - self._basetest_connect(stream) - - @support.skip_unless_bind_unix_socket - def test_connect_unix(self): - with test_utils.run_test_unix_server() as httpd: - stream = self.loop.run_until_complete( - asyncio.connect_unix(httpd.address)) - self._basetest_connect(stream) - - def test_stream_async_context_manager(self): - async def test(httpd): - stream = await asyncio.connect(*httpd.address) - async with stream: - await stream.write(b'GET / HTTP/1.0\r\n\r\n') - data = await stream.readline() - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - data = await stream.read() - self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - self.assertTrue(stream.is_closing()) - - with test_utils.run_test_server() as httpd: - self.loop.run_until_complete(test(httpd)) - - def test_connect_async_context_manager(self): - async def test(httpd): - async with asyncio.connect(*httpd.address) as stream: - await stream.write(b'GET / HTTP/1.0\r\n\r\n') - data = await stream.readline() - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - data = await stream.read() - self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - self.assertTrue(stream.is_closing()) - - with test_utils.run_test_server() as httpd: - self.loop.run_until_complete(test(httpd)) - - @support.skip_unless_bind_unix_socket - def test_connect_unix_async_context_manager(self): - async def test(httpd): - async with asyncio.connect_unix(httpd.address) as stream: - await stream.write(b'GET / HTTP/1.0\r\n\r\n') - data = await stream.readline() - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - data = await stream.read() - self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - self.assertTrue(stream.is_closing()) - - with test_utils.run_test_unix_server() as httpd: - self.loop.run_until_complete(test(httpd)) - - def test_stream_server(self): - - async def handle_client(stream): - self.assertTrue(stream.is_server_side()) - data = await stream.readline() - await stream.write(data) - await stream.close() - - async def client(srv): - addr = srv.sockets[0].getsockname() - stream = await asyncio.connect(*addr) - # send a line - await stream.write(b"hello world!\n") - # read it back - msgback = await stream.readline() - await stream.close() - self.assertEqual(msgback, b"hello world!\n") - await srv.close() - - async def test(): - async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as server: - await server.start_serving() - task = asyncio.create_task(client(server)) - with contextlib.suppress(asyncio.CancelledError): - await server.serve_forever() - await task - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(test()) - self.assertEqual(messages, []) - - @support.skip_unless_bind_unix_socket - def test_unix_stream_server(self): - - async def handle_client(stream): - data = await stream.readline() - await stream.write(data) - await stream.close() - - async def client(srv): - addr = srv.sockets[0].getsockname() - stream = await asyncio.connect_unix(addr) - # send a line - await stream.write(b"hello world!\n") - # read it back - msgback = await stream.readline() - await stream.close() - self.assertEqual(msgback, b"hello world!\n") - await srv.close() - - async def test(): - with test_utils.unix_socket_path() as path: - async with asyncio.UnixStreamServer(handle_client, path) as server: - await server.start_serving() - task = asyncio.create_task(client(server)) - with contextlib.suppress(asyncio.CancelledError): - await server.serve_forever() - await task - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(test()) - self.assertEqual(messages, []) - - def test_stream_server_inheritance_forbidden(self): - with self.assertRaises(TypeError): - class MyServer(asyncio.StreamServer): - pass - - @support.skip_unless_bind_unix_socket - def test_unix_stream_server_inheritance_forbidden(self): - with self.assertRaises(TypeError): - class MyServer(asyncio.UnixStreamServer): - pass - - def test_stream_server_bind(self): - async def handle_client(stream): - await stream.close() - - async def test(): - srv = asyncio.StreamServer(handle_client, '127.0.0.1', 0) - self.assertFalse(srv.is_bound()) - self.assertEqual(0, len(srv.sockets)) - await srv.bind() - self.assertTrue(srv.is_bound()) - self.assertEqual(1, len(srv.sockets)) - await srv.close() - self.assertFalse(srv.is_bound()) - self.assertEqual(0, len(srv.sockets)) - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(test()) - self.assertEqual(messages, []) - - def test_stream_server_bind_async_with(self): - async def handle_client(stream): - await stream.close() - - async def test(): - async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as srv: - self.assertTrue(srv.is_bound()) - self.assertEqual(1, len(srv.sockets)) - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(test()) - self.assertEqual(messages, []) - - def test_stream_server_start_serving(self): - async def handle_client(stream): - await stream.close() - - async def test(): - async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as srv: - self.assertFalse(srv.is_serving()) - await srv.start_serving() - self.assertTrue(srv.is_serving()) - await srv.close() - self.assertFalse(srv.is_serving()) - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(test()) - self.assertEqual(messages, []) - - def test_stream_server_close(self): - server_stream_aborted = False - fut1 = self.loop.create_future() - fut2 = self.loop.create_future() - - async def handle_client(stream): - data = await stream.readexactly(4) - self.assertEqual(b'data', data) - fut1.set_result(None) - await fut2 - self.assertEqual(b'', await stream.readline()) - nonlocal server_stream_aborted - server_stream_aborted = True - - async def client(srv): - addr = srv.sockets[0].getsockname() - stream = await asyncio.connect(*addr) - await stream.write(b'data') - await fut2 - self.assertEqual(b'', await stream.readline()) - await stream.close() - - async def test(): - async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as server: - await server.start_serving() - task = asyncio.create_task(client(server)) - await fut1 - fut2.set_result(None) - await server.close() - await task - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(asyncio.wait_for(test(), 60.0)) - self.assertEqual(messages, []) - self.assertTrue(fut1.done()) - self.assertTrue(fut2.done()) - self.assertTrue(server_stream_aborted) - - def test_stream_server_abort(self): - server_stream_aborted = False - fut1 = self.loop.create_future() - fut2 = self.loop.create_future() - - async def handle_client(stream): - data = await stream.readexactly(4) - self.assertEqual(b'data', data) - fut1.set_result(None) - await fut2 - self.assertEqual(b'', await stream.readline()) - nonlocal server_stream_aborted - server_stream_aborted = True - - async def client(srv): - addr = srv.sockets[0].getsockname() - stream = await asyncio.connect(*addr) - await stream.write(b'data') - await fut2 - self.assertEqual(b'', await stream.readline()) - await stream.close() - - async def test(): - async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as server: - await server.start_serving() - task = asyncio.create_task(client(server)) - await fut1 - fut2.set_result(None) - await server.abort() - await task - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(asyncio.wait_for(test(), 60.0)) - self.assertEqual(messages, []) - self.assertTrue(fut1.done()) - self.assertTrue(fut2.done()) - self.assertTrue(server_stream_aborted) - - def test_stream_shutdown_hung_task(self): - fut1 = self.loop.create_future() - fut2 = self.loop.create_future() - cancelled = self.loop.create_future() - - async def handle_client(stream): - data = await stream.readexactly(4) - self.assertEqual(b'data', data) - fut1.set_result(None) - await fut2 - try: - while True: - await asyncio.sleep(0.01) - except asyncio.CancelledError: - cancelled.set_result(None) - raise - - async def client(srv): - addr = srv.sockets[0].getsockname() - stream = await asyncio.connect(*addr) - await stream.write(b'data') - await fut2 - self.assertEqual(b'', await stream.readline()) - await stream.close() - - async def test(): - async with asyncio.StreamServer(handle_client, - '127.0.0.1', - 0, - shutdown_timeout=0.3) as server: - await server.start_serving() - task = asyncio.create_task(client(server)) - await fut1 - fut2.set_result(None) - await server.close() - await task - await cancelled - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(asyncio.wait_for(test(), 60.0)) - self.assertEqual(messages, []) - self.assertTrue(fut1.done()) - self.assertTrue(fut2.done()) - self.assertTrue(cancelled.done()) - - def test_stream_shutdown_hung_task_prevents_cancellation(self): - fut1 = self.loop.create_future() - fut2 = self.loop.create_future() - cancelled = self.loop.create_future() - do_handle_client = True - - async def handle_client(stream): - data = await stream.readexactly(4) - self.assertEqual(b'data', data) - fut1.set_result(None) - await fut2 - while do_handle_client: - with contextlib.suppress(asyncio.CancelledError): - await asyncio.sleep(0.01) - cancelled.set_result(None) - - async def client(srv): - addr = srv.sockets[0].getsockname() - stream = await asyncio.connect(*addr) - await stream.write(b'data') - await fut2 - self.assertEqual(b'', await stream.readline()) - await stream.close() - - async def test(): - async with asyncio.StreamServer(handle_client, - '127.0.0.1', - 0, - shutdown_timeout=0.3) as server: - await server.start_serving() - task = asyncio.create_task(client(server)) - await fut1 - fut2.set_result(None) - await server.close() - nonlocal do_handle_client - do_handle_client = False - await task - await cancelled - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(asyncio.wait_for(test(), 60.0)) - self.assertEqual(1, len(messages)) - self.assertRegex(messages[0]['message'], - "<Task pending .+ ignored cancellation request") - self.assertTrue(fut1.done()) - self.assertTrue(fut2.done()) - self.assertTrue(cancelled.done()) - - def test_sendfile(self): - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - - with open(support.TESTFN, 'wb') as fp: - fp.write(b'data\n') - self.addCleanup(support.unlink, support.TESTFN) - - async def serve_callback(stream): - data = await stream.readline() - await stream.write(b'ack-' + data) - data = await stream.readline() - await stream.write(b'ack-' + data) - data = await stream.readline() - await stream.write(b'ack-' + data) - await stream.close() - - async def do_connect(host, port): - stream = await asyncio.connect(host, port) - await stream.write(b'begin\n') - data = await stream.readline() - self.assertEqual(b'ack-begin\n', data) - with open(support.TESTFN, 'rb') as fp: - await stream.sendfile(fp) - data = await stream.readline() - self.assertEqual(b'ack-data\n', data) - await stream.write(b'end\n') - data = await stream.readline() - self.assertEqual(data, b'ack-end\n') - await stream.close() - - async def test(): - async with asyncio.StreamServer(serve_callback, '127.0.0.1', 0) as srv: - await srv.start_serving() - await do_connect(*srv.sockets[0].getsockname()) - - self.loop.run_until_complete(test()) - - self.assertEqual([], messages) - - - @unittest.skipIf(ssl is None, 'No ssl module') - def test_connect_start_tls(self): - with test_utils.run_test_server(use_ssl=True) as httpd: - # connect without SSL but upgrade to TLS just after - # connection is established - stream = self.loop.run_until_complete( - asyncio.connect(*httpd.address)) - - self.loop.run_until_complete( - stream.start_tls( - sslcontext=test_utils.dummy_ssl_context())) - self._basetest_connect(stream) - - def test_repr_unbound(self): - async def serve(stream): - pass - - async def test(): - srv = asyncio.StreamServer(serve) - self.assertEqual('<StreamServer>', repr(srv)) - await srv.close() - - self.loop.run_until_complete(test()) - - def test_repr_bound(self): - async def serve(stream): - pass - - async def test(): - srv = asyncio.StreamServer(serve, '127.0.0.1', 0) - await srv.bind() - self.assertRegex(repr(srv), r'<StreamServer sockets=\(.+\)>') - await srv.close() - - self.loop.run_until_complete(test()) - - def test_repr_serving(self): - async def serve(stream): - pass - - async def test(): - srv = asyncio.StreamServer(serve, '127.0.0.1', 0) - await srv.start_serving() - self.assertRegex(repr(srv), r'<StreamServer serving sockets=\(.+\)>') - await srv.close() - - self.loop.run_until_complete(test()) - - - @unittest.skipUnless(sys.platform != 'win32', - "Don't support pipes for Windows") - def test_read_pipe(self): - async def test(): - rpipe, wpipe = os.pipe() - pipeobj = io.open(rpipe, 'rb', 1024) - - async with asyncio.connect_read_pipe(pipeobj) as stream: - self.assertEqual(stream.mode, asyncio.StreamMode.READ) - - os.write(wpipe, b'1') - data = await stream.readexactly(1) - self.assertEqual(data, b'1') - - os.write(wpipe, b'2345') - data = await stream.readexactly(4) - self.assertEqual(data, b'2345') - os.close(wpipe) - - self.loop.run_until_complete(test()) - - @unittest.skipUnless(sys.platform != 'win32', - "Don't support pipes for Windows") - def test_write_pipe(self): - async def test(): - rpipe, wpipe = os.pipe() - pipeobj = io.open(wpipe, 'wb', 1024) - - async with asyncio.connect_write_pipe(pipeobj) as stream: - self.assertEqual(stream.mode, asyncio.StreamMode.WRITE) - - await stream.write(b'1') - data = os.read(rpipe, 1024) - self.assertEqual(data, b'1') - - await stream.write(b'2345') - data = os.read(rpipe, 1024) - self.assertEqual(data, b'2345') - - os.close(rpipe) - - self.loop.run_until_complete(test()) - - def test_stream_ctor_forbidden(self): - with self.assertRaisesRegex(RuntimeError, - "should be instantiated " - "by asyncio internals only"): - asyncio.Stream(asyncio.StreamMode.READWRITE) - - def test_deprecated_methods(self): - async def f(): - return asyncio.Stream(mode=asyncio.StreamMode.READWRITE, - _asyncio_internal=True) - - stream = self.loop.run_until_complete(f()) - - tr = mock.Mock() - - with self.assertWarns(DeprecationWarning): - stream.set_transport(tr) - - with self.assertWarns(DeprecationWarning): - stream.transport is tr - - with self.assertWarns(DeprecationWarning): - stream.feed_data(b'data') - - with self.assertWarns(DeprecationWarning): - stream.feed_eof() - - with self.assertWarns(DeprecationWarning): - stream.set_exception(ConnectionResetError("test")) - if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 3ad18e5..fe8cfa6 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -582,18 +582,6 @@ class SubprocessMixin: self.loop.run_until_complete(execute()) - def test_subprocess_protocol_create_warning(self): - with self.assertWarns(DeprecationWarning): - subprocess.SubprocessStreamProtocol(limit=10, loop=self.loop) - - def test_process_create_warning(self): - proto = subprocess.SubprocessStreamProtocol(limit=10, loop=self.loop, - _asyncio_internal=True) - transp = mock.Mock() - - with self.assertWarns(DeprecationWarning): - subprocess.Process(transp, proto, loop=self.loop) - def test_create_subprocess_exec_text_mode_fails(self): async def execute(): with self.assertRaises(ValueError): diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py index d0ba193..9ed10fc 100644 --- a/Lib/test/test_asyncio/test_windows_events.py +++ b/Lib/test/test_asyncio/test_windows_events.py @@ -15,7 +15,6 @@ import _winapi import asyncio from asyncio import windows_events -from asyncio.streams import _StreamProtocol from test.test_asyncio import utils as test_utils @@ -118,16 +117,14 @@ class ProactorTests(test_utils.TestCase): clients = [] for i in range(5): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, _asyncio_internal=True) - protocol = _StreamProtocol(stream, - loop=self.loop, - _asyncio_internal=True) + stream_reader = asyncio.StreamReader(loop=self.loop) + protocol = asyncio.StreamReaderProtocol(stream_reader, + loop=self.loop) trans, proto = await self.loop.create_pipe_connection( lambda: protocol, ADDRESS) self.assertIsInstance(trans, asyncio.Transport) self.assertEqual(protocol, proto) - clients.append((stream, trans)) + clients.append((stream_reader, trans)) for i, (r, w) in enumerate(clients): w.write('lower-{}\n'.format(i).encode()) @@ -136,7 +133,6 @@ class ProactorTests(test_utils.TestCase): response = await r.readline() self.assertEqual(response, 'LOWER-{}\n'.format(i).encode()) w.close() - await r.close() server.close() diff --git a/Misc/NEWS.d/next/Library/2019-09-30-00-15-27.bpo-38242.uPIyAc.rst b/Misc/NEWS.d/next/Library/2019-09-30-00-15-27.bpo-38242.uPIyAc.rst new file mode 100644 index 0000000..be9da89 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2019-09-30-00-15-27.bpo-38242.uPIyAc.rst @@ -0,0 +1 @@ +Revert the new asyncio Streams API |