summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/streams.py
diff options
context:
space:
mode:
authorMiss Islington (bot) <31488909+miss-islington@users.noreply.github.com>2022-08-30 11:00:21 (GMT)
committerGitHub <noreply@github.com>2022-08-30 11:00:21 (GMT)
commit2e9f29e6a660a33f84d64a5bce5b422e72c06e39 (patch)
tree1271027d12786058e726036ece13af9a566d4cb4 /Lib/asyncio/streams.py
parent126ec34558ef9e2fd688cc85b0951b559ce6a889 (diff)
downloadcpython-2e9f29e6a660a33f84d64a5bce5b422e72c06e39.zip
cpython-2e9f29e6a660a33f84d64a5bce5b422e72c06e39.tar.gz
cpython-2e9f29e6a660a33f84d64a5bce5b422e72c06e39.tar.bz2
GH-74116: Allow multiple drain waiters for asyncio.StreamWriter (GH-94705) (#96395)
(cherry picked from commit e5b2453e61ba5376831093236d598ef5f9f1de61) Co-authored-by: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Co-authored-by: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com>
Diffstat (limited to 'Lib/asyncio/streams.py')
-rw-r--r--Lib/asyncio/streams.py35
1 files changed, 16 insertions, 19 deletions
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index af78e6b..d21a31d 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -2,6 +2,7 @@ __all__ = (
'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
'open_connection', 'start_server')
+import collections
import socket
import sys
import warnings
@@ -129,7 +130,7 @@ class FlowControlMixin(protocols.Protocol):
else:
self._loop = loop
self._paused = False
- self._drain_waiter = None
+ self._drain_waiters = collections.deque()
self._connection_lost = False
def pause_writing(self):
@@ -144,38 +145,34 @@ class FlowControlMixin(protocols.Protocol):
if self._loop.get_debug():
logger.debug("%r resumes writing", self)
- waiter = self._drain_waiter
- if waiter is not None:
- self._drain_waiter = None
+ for waiter in self._drain_waiters:
if not waiter.done():
waiter.set_result(None)
def connection_lost(self, exc):
self._connection_lost = True
- # Wake up the writer if currently paused.
+ # Wake up the writer(s) if currently paused.
if not self._paused:
return
- waiter = self._drain_waiter
- if waiter is None:
- return
- self._drain_waiter = None
- if waiter.done():
- return
- if exc is None:
- waiter.set_result(None)
- else:
- waiter.set_exception(exc)
+
+ for waiter in self._drain_waiters:
+ if not waiter.done():
+ if exc is None:
+ waiter.set_result(None)
+ else:
+ waiter.set_exception(exc)
async def _drain_helper(self):
if self._connection_lost:
raise ConnectionResetError('Connection lost')
if not self._paused:
return
- waiter = self._drain_waiter
- assert waiter is None or waiter.cancelled()
waiter = self._loop.create_future()
- self._drain_waiter = waiter
- await waiter
+ self._drain_waiters.append(waiter)
+ try:
+ await waiter
+ finally:
+ self._drain_waiters.remove(waiter)
def _get_close_waiter(self, stream):
raise NotImplementedError