From 11194c877c902a6c3b769d85be887c2272e0a541 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Thu, 13 Sep 2018 16:53:49 -0700 Subject: bpo-34666: Implement stream.awrite() and stream.aclose() (GH-9274) --- Doc/library/asyncio-stream.rst | 66 +++++++++++++++------- Lib/asyncio/streams.py | 10 +++- Lib/test/test_asyncio/test_streams.py | 22 ++++++++ .../2018-09-13-11-49-52.bpo-34666.3uLtWv.rst | 3 + 4 files changed, 79 insertions(+), 22 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2018-09-13-11-49-52.bpo-34666.3uLtWv.rst diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index 0cfecda..80b7625 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -20,13 +20,13 @@ streams:: '127.0.0.1', 8888) print(f'Send: {message!r}') - writer.write(message.encode()) + await writer.awrite(message.encode()) data = await reader.read(100) print(f'Received: {data.decode()!r}') print('Close the connection') - writer.close() + await writer.aclose() asyncio.run(tcp_echo_client('Hello World!')) @@ -229,14 +229,57 @@ StreamWriter directly; use :func:`open_connection` and :func:`start_server` instead. + .. coroutinemethod:: awrite(data) + + Write *data* to the stream. + + The method respects control-flow, execution is paused if write + buffer reaches high-water limit. + + .. versionadded:: 3.8 + + .. coroutinemethod:: aclose() + + Close the stream. + + Wait for finishing all closing actions, e.g. SSL shutdown for + secure sockets. + + .. versionadded:: 3.8 + + .. method:: can_write_eof() + + Return *True* if the underlying transport supports + the :meth:`write_eof` method, *False* otherwise. + + .. method:: write_eof() + + Close the write end of the stream after the buffered write + data is flushed. + + .. attribute:: transport + + Return the underlying asyncio transport. + + .. method:: get_extra_info(name, default=None) + + Access optional transport information; see + :meth:`BaseTransport.get_extra_info` for details. + .. method:: write(data) Write *data* to the stream. + This method doesn't apply control-flow. The call should be + followed by :meth:`drain`. + .. method:: writelines(data) Write a list (or any iterable) of bytes to the stream. + This method doesn't apply control-flow. The call should be + followed by :meth:`drain`. + .. coroutinemethod:: drain() Wait until it is appropriate to resume writing to the stream. @@ -272,25 +315,6 @@ StreamWriter .. versionadded:: 3.7 - .. method:: can_write_eof() - - Return *True* if the underlying transport supports - the :meth:`write_eof` method, *False* otherwise. - - .. method:: write_eof() - - Close the write end of the stream after the buffered write - data is flushed. - - .. attribute:: transport - - Return the underlying asyncio transport. - - .. method:: get_extra_info(name, default=None) - - Access optional transport information; see - :meth:`BaseTransport.get_extra_info` for details. - Examples ======== diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index e7fb22e..0afc66a 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -348,7 +348,7 @@ class StreamWriter: # a reader can be garbage collected # after connection closing self._protocol._untrack_reader() - return self._transport.close() + self._transport.close() def is_closing(self): return self._transport.is_closing() @@ -381,6 +381,14 @@ class StreamWriter: await sleep(0, loop=self._loop) await self._protocol._drain_helper() + async def aclose(self): + self.close() + await self.wait_closed() + + async def awrite(self, data): + self.write(data) + await self.drain() + class StreamReader: diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index 67ac9d9..d8e3715 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -964,6 +964,28 @@ os.close(fd) 'call "stream.close()" explicitly.', messages[0]['message']) + def test_async_writer_api(self): + messages = [] + self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) + + with test_utils.run_test_server() as httpd: + rd, wr = self.loop.run_until_complete( + asyncio.open_connection(*httpd.address, + loop=self.loop)) + + f = wr.awrite(b'GET / HTTP/1.0\r\n\r\n') + self.loop.run_until_complete(f) + f = rd.readline() + data = self.loop.run_until_complete(f) + self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') + f = rd.read() + data = self.loop.run_until_complete(f) + self.assertTrue(data.endswith(b'\r\n\r\nTest message')) + f = wr.aclose() + self.loop.run_until_complete(f) + + self.assertEqual(messages, []) + if __name__ == '__main__': unittest.main() diff --git a/Misc/NEWS.d/next/Library/2018-09-13-11-49-52.bpo-34666.3uLtWv.rst b/Misc/NEWS.d/next/Library/2018-09-13-11-49-52.bpo-34666.3uLtWv.rst new file mode 100644 index 0000000..be82cfe --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-09-13-11-49-52.bpo-34666.3uLtWv.rst @@ -0,0 +1,3 @@ +Implement ``asyncio.StreamWriter.awrite`` and +``asyncio.StreamWriter.aclose()`` coroutines. Methods are needed for +providing a consistent stream API with control flow switched on by default. -- cgit v0.12