summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio
diff options
context:
space:
mode:
authorYury Selivanov <yselivanov@sprymix.com>2014-02-18 23:41:13 (GMT)
committerYury Selivanov <yselivanov@sprymix.com>2014-02-18 23:41:13 (GMT)
commitc098241342f9f0636e98b81d016564cc4153f158 (patch)
tree3de1f353e91e15070544ec32a65166b2e5e168c3 /Lib/asyncio
parentff827f08ac9201f56b14cb19ccb9d511434c858b (diff)
downloadcpython-c098241342f9f0636e98b81d016564cc4153f158.zip
cpython-c098241342f9f0636e98b81d016564cc4153f158.tar.gz
cpython-c098241342f9f0636e98b81d016564cc4153f158.tar.bz2
asyncio.transports: Make _ProactorBasePipeTransport use _FlowControlMixin
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/proactor_events.py55
-rw-r--r--Lib/asyncio/selector_events.py73
-rw-r--r--Lib/asyncio/transports.py70
-rw-r--r--Lib/asyncio/unix_events.py2
4 files changed, 75 insertions, 125 deletions
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index b2ac632..d72f927 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -15,7 +15,8 @@ from . import transports
from .log import logger
-class _ProactorBasePipeTransport(transports.BaseTransport):
+class _ProactorBasePipeTransport(transports._FlowControlMixin,
+ transports.BaseTransport):
"""Base class for pipe and socket transports."""
def __init__(self, loop, sock, protocol, waiter=None,
@@ -33,8 +34,6 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
self._conn_lost = 0
self._closing = False # Set when close() called.
self._eof_written = False
- self._protocol_paused = False
- self.set_write_buffer_limits()
if self._server is not None:
self._server.attach(self)
self._loop.call_soon(self._protocol.connection_made, self)
@@ -94,56 +93,6 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
server.detach(self)
self._server = None
- # XXX The next four methods are nearly identical to corresponding
- # ones in _SelectorTransport. Maybe refactor buffer management to
- # share the implementations? (Also these are really only needed
- # by _ProactorWritePipeTransport but since _buffer is defined on
- # the base class I am putting it here for now.)
-
- def _maybe_pause_protocol(self):
- size = self.get_write_buffer_size()
- if size <= self._high_water:
- return
- if not self._protocol_paused:
- self._protocol_paused = True
- try:
- self._protocol.pause_writing()
- except Exception as exc:
- self._loop.call_exception_handler({
- 'message': 'protocol.pause_writing() failed',
- 'exception': exc,
- 'transport': self,
- 'protocol': self._protocol,
- })
-
- def _maybe_resume_protocol(self):
- if (self._protocol_paused and
- self.get_write_buffer_size() <= self._low_water):
- self._protocol_paused = False
- try:
- self._protocol.resume_writing()
- except Exception as exc:
- self._loop.call_exception_handler({
- 'message': 'protocol.resume_writing() failed',
- 'exception': exc,
- 'transport': self,
- 'protocol': self._protocol,
- })
-
- def set_write_buffer_limits(self, high=None, low=None):
- if high is None:
- if low is None:
- high = 64*1024
- else:
- high = 4*low
- if low is None:
- low = high // 4
- if not high >= low >= 0:
- raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
- (high, low))
- self._high_water = high
- self._low_water = low
-
def get_write_buffer_size(self):
size = self._pending_write
if self._buffer is not None:
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index fb86f82..869d66e 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -338,77 +338,8 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
sock.close()
-class _FlowControlMixin(transports.Transport):
- """All the logic for (write) flow control in a mix-in base class.
-
- The subclass must implement get_write_buffer_size(). It must call
- _maybe_pause_protocol() whenever the write buffer size increases,
- and _maybe_resume_protocol() whenever it decreases. It may also
- override set_write_buffer_limits() (e.g. to specify different
- defaults).
-
- The subclass constructor must call super().__init__(extra). This
- will call set_write_buffer_limits().
-
- The user may call set_write_buffer_limits() and
- get_write_buffer_size(), and their protocol's pause_writing() and
- resume_writing() may be called.
- """
-
- def __init__(self, extra=None):
- super().__init__(extra)
- self._protocol_paused = False
- self.set_write_buffer_limits()
-
- def _maybe_pause_protocol(self):
- size = self.get_write_buffer_size()
- if size <= self._high_water:
- return
- if not self._protocol_paused:
- self._protocol_paused = True
- try:
- self._protocol.pause_writing()
- except Exception as exc:
- self._loop.call_exception_handler({
- 'message': 'protocol.pause_writing() failed',
- 'exception': exc,
- 'transport': self,
- 'protocol': self._protocol,
- })
-
- def _maybe_resume_protocol(self):
- if (self._protocol_paused and
- self.get_write_buffer_size() <= self._low_water):
- self._protocol_paused = False
- try:
- self._protocol.resume_writing()
- except Exception as exc:
- self._loop.call_exception_handler({
- 'message': 'protocol.resume_writing() failed',
- 'exception': exc,
- 'transport': self,
- 'protocol': self._protocol,
- })
-
- def set_write_buffer_limits(self, high=None, low=None):
- if high is None:
- if low is None:
- high = 64*1024
- else:
- high = 4*low
- if low is None:
- low = high // 4
- if not high >= low >= 0:
- raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
- (high, low))
- self._high_water = high
- self._low_water = low
-
- def get_write_buffer_size(self):
- raise NotImplementedError
-
-
-class _SelectorTransport(_FlowControlMixin, transports.Transport):
+class _SelectorTransport(transports._FlowControlMixin,
+ transports.Transport):
max_size = 256 * 1024 # Buffer size passed to recv().
diff --git a/Lib/asyncio/transports.py b/Lib/asyncio/transports.py
index 67ae7fd..5b975aa 100644
--- a/Lib/asyncio/transports.py
+++ b/Lib/asyncio/transports.py
@@ -219,3 +219,73 @@ class SubprocessTransport(BaseTransport):
http://docs.python.org/3/library/subprocess#subprocess.Popen.kill
"""
raise NotImplementedError
+
+
+class _FlowControlMixin(Transport):
+ """All the logic for (write) flow control in a mix-in base class.
+
+ The subclass must implement get_write_buffer_size(). It must call
+ _maybe_pause_protocol() whenever the write buffer size increases,
+ and _maybe_resume_protocol() whenever it decreases. It may also
+ override set_write_buffer_limits() (e.g. to specify different
+ defaults).
+
+ The subclass constructor must call super().__init__(extra). This
+ will call set_write_buffer_limits().
+
+ The user may call set_write_buffer_limits() and
+ get_write_buffer_size(), and their protocol's pause_writing() and
+ resume_writing() may be called.
+ """
+
+ def __init__(self, extra=None):
+ super().__init__(extra)
+ self._protocol_paused = False
+ self.set_write_buffer_limits()
+
+ def _maybe_pause_protocol(self):
+ size = self.get_write_buffer_size()
+ if size <= self._high_water:
+ return
+ if not self._protocol_paused:
+ self._protocol_paused = True
+ try:
+ self._protocol.pause_writing()
+ except Exception as exc:
+ self._loop.call_exception_handler({
+ 'message': 'protocol.pause_writing() failed',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
+
+ def _maybe_resume_protocol(self):
+ if (self._protocol_paused and
+ self.get_write_buffer_size() <= self._low_water):
+ self._protocol_paused = False
+ try:
+ self._protocol.resume_writing()
+ except Exception as exc:
+ self._loop.call_exception_handler({
+ 'message': 'protocol.resume_writing() failed',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
+
+ def set_write_buffer_limits(self, high=None, low=None):
+ if high is None:
+ if low is None:
+ high = 64*1024
+ else:
+ high = 4*low
+ if low is None:
+ low = high // 4
+ if not high >= low >= 0:
+ raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
+ (high, low))
+ self._high_water = high
+ self._low_water = low
+
+ def get_write_buffer_size(self):
+ raise NotImplementedError
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index 9a40c04..748452c 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -317,7 +317,7 @@ class _UnixReadPipeTransport(transports.ReadTransport):
self._loop = None
-class _UnixWritePipeTransport(selector_events._FlowControlMixin,
+class _UnixWritePipeTransport(transports._FlowControlMixin,
transports.WriteTransport):
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):