diff options
author | Yury Selivanov <yselivanov@sprymix.com> | 2014-02-18 23:41:13 (GMT) |
---|---|---|
committer | Yury Selivanov <yselivanov@sprymix.com> | 2014-02-18 23:41:13 (GMT) |
commit | c098241342f9f0636e98b81d016564cc4153f158 (patch) | |
tree | 3de1f353e91e15070544ec32a65166b2e5e168c3 /Lib/asyncio/transports.py | |
parent | ff827f08ac9201f56b14cb19ccb9d511434c858b (diff) | |
download | cpython-c098241342f9f0636e98b81d016564cc4153f158.zip cpython-c098241342f9f0636e98b81d016564cc4153f158.tar.gz cpython-c098241342f9f0636e98b81d016564cc4153f158.tar.bz2 |
asyncio.transports: Make _ProactorBasePipeTransport use _FlowControlMixin
Diffstat (limited to 'Lib/asyncio/transports.py')
-rw-r--r-- | Lib/asyncio/transports.py | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/Lib/asyncio/transports.py b/Lib/asyncio/transports.py index 67ae7fd..5b975aa 100644 --- a/Lib/asyncio/transports.py +++ b/Lib/asyncio/transports.py @@ -219,3 +219,73 @@ class SubprocessTransport(BaseTransport): http://docs.python.org/3/library/subprocess#subprocess.Popen.kill """ raise NotImplementedError + + +class _FlowControlMixin(Transport): + """All the logic for (write) flow control in a mix-in base class. + + The subclass must implement get_write_buffer_size(). It must call + _maybe_pause_protocol() whenever the write buffer size increases, + and _maybe_resume_protocol() whenever it decreases. It may also + override set_write_buffer_limits() (e.g. to specify different + defaults). + + The subclass constructor must call super().__init__(extra). This + will call set_write_buffer_limits(). + + The user may call set_write_buffer_limits() and + get_write_buffer_size(), and their protocol's pause_writing() and + resume_writing() may be called. + """ + + def __init__(self, extra=None): + super().__init__(extra) + self._protocol_paused = False + self.set_write_buffer_limits() + + def _maybe_pause_protocol(self): + size = self.get_write_buffer_size() + if size <= self._high_water: + return + if not self._protocol_paused: + self._protocol_paused = True + try: + self._protocol.pause_writing() + except Exception as exc: + self._loop.call_exception_handler({ + 'message': 'protocol.pause_writing() failed', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) + + def _maybe_resume_protocol(self): + if (self._protocol_paused and + self.get_write_buffer_size() <= self._low_water): + self._protocol_paused = False + try: + self._protocol.resume_writing() + except Exception as exc: + self._loop.call_exception_handler({ + 'message': 'protocol.resume_writing() failed', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) + + def set_write_buffer_limits(self, high=None, low=None): + if high is None: + if low is None: + high = 64*1024 + else: + high = 4*low + if low is None: + low = high // 4 + if not high >= low >= 0: + raise ValueError('high (%r) must be >= low (%r) must be >= 0' % + (high, low)) + self._high_water = high + self._low_water = low + + def get_write_buffer_size(self): + raise NotImplementedError |