summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndrew Svetlov <andrew.svetlov@gmail.com>2019-05-09 19:14:58 (GMT)
committerGitHub <noreply@github.com>2019-05-09 19:14:58 (GMT)
commita076e4f5e42b85664693191d04cfb33e2f9acfa5 (patch)
tree3b70de541e9e742bde047631a2db60078ad18e35
parent3b2f9ab31db81405650325920465378532ab2d78 (diff)
downloadcpython-a076e4f5e42b85664693191d04cfb33e2f9acfa5.zip
cpython-a076e4f5e42b85664693191d04cfb33e2f9acfa5.tar.gz
cpython-a076e4f5e42b85664693191d04cfb33e2f9acfa5.tar.bz2
bpo-36802: Drop awrite()/aclose(), support await write() and await close() instead (#13099)
-rw-r--r--Doc/library/asyncio-stream.rst90
-rw-r--r--Lib/asyncio/streams.py35
-rw-r--r--Lib/test/test_asyncio/test_streams.py42
-rw-r--r--Misc/NEWS.d/next/Library/2019-05-05-10-12-23.bpo-36802.HYMc8P.rst2
4 files changed, 118 insertions, 51 deletions
diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst
index e686a6a..e735b81 100644
--- a/Doc/library/asyncio-stream.rst
+++ b/Doc/library/asyncio-stream.rst
@@ -22,13 +22,13 @@ streams::
'127.0.0.1', 8888)
print(f'Send: {message!r}')
- await writer.awrite(message.encode())
+ await writer.write(message.encode())
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
- await writer.aclose()
+ await writer.close()
asyncio.run(tcp_echo_client('Hello World!'))
@@ -226,23 +226,70 @@ StreamWriter
directly; use :func:`open_connection` and :func:`start_server`
instead.
- .. coroutinemethod:: awrite(data)
+ .. method:: write(data)
+
+ The method attempts to write the *data* to the underlying socket immediately.
+ If that fails, the data is queued in an internal write buffer until it can be
+ sent.
+
+ Starting with Python 3.8, it is possible to directly await on the `write()`
+ method::
+
+ await stream.write(data)
+
+ The ``await`` pauses the current coroutine until the data is written to the
+ socket.
+
+ Below is an equivalent code that works with Python <= 3.7::
+
+ stream.write(data)
+ await stream.drain()
+
+ .. versionchanged:: 3.8
+ Support ``await stream.write(...)`` syntax.
+
+ .. method:: writelines(data)
+
+ The method writes a list (or any iterable) of bytes to the underlying socket
+ immediately.
+ If that fails, the data is queued in an internal write buffer until it can be
+ sent.
+
+ Starting with Python 3.8, it is possible to directly await on the `write()`
+ method::
+
+ await stream.writelines(lines)
+
+ The ``await`` pauses the current coroutine until the data is written to the
+ socket.
+
+ Below is an equivalent code that works with Python <= 3.7::
- Write *data* to the stream.
+ stream.writelines(lines)
+ await stream.drain()
- The method respects flow control, execution is paused if the write
- buffer reaches the high watermark.
+ .. versionchanged:: 3.8
+ Support ``await stream.writelines()`` syntax.
- .. versionadded:: 3.8
+ .. method:: close()
+
+ The method closes the stream and the underlying socket.
+
+ Starting with Python 3.8, it is possible to directly await on the `close()`
+ method::
+
+ await stream.close()
- .. coroutinemethod:: aclose()
+ The ``await`` pauses the current coroutine until the stream and the underlying
+ socket are closed (and SSL shutdown is performed for a secure connection).
- Close the stream.
+ Below is an equivalent code that works with Python <= 3.7::
- Wait until all closing actions are complete, e.g. SSL shutdown for
- secure sockets.
+ stream.close()
+ await stream.wait_closed()
- .. versionadded:: 3.8
+ .. versionchanged:: 3.8
+ Support ``await stream.close()`` syntax.
.. method:: can_write_eof()
@@ -263,21 +310,6 @@ StreamWriter
Access optional transport information; see
:meth:`BaseTransport.get_extra_info` for details.
- .. method:: write(data)
-
- Write *data* to the stream.
-
- This method is not subject to flow control. Calls to ``write()`` should
- be followed by :meth:`drain`. The :meth:`awrite` method is a
- recommended alternative the applies flow control automatically.
-
- .. method:: writelines(data)
-
- Write a list (or any iterable) of bytes to the stream.
-
- This method is not subject to flow control. Calls to ``writelines()``
- should be followed by :meth:`drain`.
-
.. coroutinemethod:: drain()
Wait until it is appropriate to resume writing to the stream.
@@ -293,10 +325,6 @@ StreamWriter
be resumed. When there is nothing to wait for, the :meth:`drain`
returns immediately.
- .. method:: close()
-
- Close the stream.
-
.. method:: is_closing()
Return ``True`` if the stream is closed or in the process of
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 79adf02..d9a9f5e 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -352,6 +352,8 @@ class StreamWriter:
assert reader is None or isinstance(reader, StreamReader)
self._reader = reader
self._loop = loop
+ self._complete_fut = self._loop.create_future()
+ self._complete_fut.set_result(None)
def __repr__(self):
info = [self.__class__.__name__, f'transport={self._transport!r}']
@@ -365,9 +367,33 @@ class StreamWriter:
def write(self, data):
self._transport.write(data)
+ return self._fast_drain()
def writelines(self, data):
self._transport.writelines(data)
+ return self._fast_drain()
+
+ def _fast_drain(self):
+ # The helper tries to use fast-path to return already existing complete future
+ # object if underlying transport is not paused and actual waiting for writing
+ # resume is not needed
+ if self._reader is not None:
+ # this branch will be simplified after merging reader with writer
+ exc = self._reader.exception()
+ if exc is not None:
+ fut = self._loop.create_future()
+ fut.set_exception(exc)
+ return fut
+ if not self._transport.is_closing():
+ if self._protocol._connection_lost:
+ fut = self._loop.create_future()
+ fut.set_exception(ConnectionResetError('Connection lost'))
+ return fut
+ if not self._protocol._paused:
+ # fast path, the stream is not paused
+ # no need to wait for resume signal
+ return self._complete_fut
+ return self._loop.create_task(self.drain())
def write_eof(self):
return self._transport.write_eof()
@@ -377,6 +403,7 @@ class StreamWriter:
def close(self):
self._transport.close()
+ return self._protocol._get_close_waiter(self)
def is_closing(self):
return self._transport.is_closing()
@@ -408,14 +435,6 @@ class StreamWriter:
raise ConnectionResetError('Connection lost')
await self._protocol._drain_helper()
- async def aclose(self):
- self.close()
- await self.wait_closed()
-
- async def awrite(self, data):
- self.write(data)
- await self.drain()
-
class StreamReader:
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
index 905141c..bf93f30 100644
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -1035,24 +1035,42 @@ os.close(fd)
messages[0]['message'])
def test_async_writer_api(self):
+ async def inner(httpd):
+ rd, wr = await asyncio.open_connection(*httpd.address)
+
+ await wr.write(b'GET / HTTP/1.0\r\n\r\n')
+ data = await rd.readline()
+ self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
+ data = await rd.read()
+ self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
+ await wr.close()
+
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
with test_utils.run_test_server() as httpd:
- rd, wr = self.loop.run_until_complete(
- asyncio.open_connection(*httpd.address,
- loop=self.loop))
+ self.loop.run_until_complete(inner(httpd))
- f = wr.awrite(b'GET / HTTP/1.0\r\n\r\n')
- self.loop.run_until_complete(f)
- f = rd.readline()
- data = self.loop.run_until_complete(f)
+ self.assertEqual(messages, [])
+
+ def test_async_writer_api(self):
+ async def inner(httpd):
+ rd, wr = await asyncio.open_connection(*httpd.address)
+
+ await wr.write(b'GET / HTTP/1.0\r\n\r\n')
+ data = await rd.readline()
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
- f = rd.read()
- data = self.loop.run_until_complete(f)
+ data = await rd.read()
self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
- f = wr.aclose()
- self.loop.run_until_complete(f)
+ wr.close()
+ with self.assertRaises(ConnectionResetError):
+ await wr.write(b'data')
+
+ messages = []
+ self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
+
+ with test_utils.run_test_server() as httpd:
+ self.loop.run_until_complete(inner(httpd))
self.assertEqual(messages, [])
@@ -1066,7 +1084,7 @@ os.close(fd)
asyncio.open_connection(*httpd.address,
loop=self.loop))
- f = wr.aclose()
+ f = wr.close()
self.loop.run_until_complete(f)
assert rd.at_eof()
f = rd.read()
diff --git a/Misc/NEWS.d/next/Library/2019-05-05-10-12-23.bpo-36802.HYMc8P.rst b/Misc/NEWS.d/next/Library/2019-05-05-10-12-23.bpo-36802.HYMc8P.rst
new file mode 100644
index 0000000..f59863b
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2019-05-05-10-12-23.bpo-36802.HYMc8P.rst
@@ -0,0 +1,2 @@
+Provide both sync and async calls for StreamWriter.write() and
+StreamWriter.close()