From d757aaf9dd767d13205bf9917e520ebf43e7f6e5 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Mon, 18 Dec 2017 17:03:23 -0500 Subject: bpo-32356: idempotent pause_/resume_reading; new is_reading method. (#4914) --- Doc/library/asyncio-protocol.rst | 14 +++++++++++ Lib/asyncio/proactor_events.py | 15 ++++++------ Lib/asyncio/selector_events.py | 15 ++++++------ Lib/asyncio/sslproto.py | 6 +++++ Lib/asyncio/transports.py | 4 ++++ Lib/test/test_asyncio/test_proactor_events.py | 10 ++++++++ Lib/test/test_asyncio/test_selector_events.py | 28 +++++++++++++++++++--- Lib/test/test_asyncio/utils.py | 17 +++++++++---- .../2017-12-17-22-50-51.bpo-32356.roZJpA.rst | 2 ++ 9 files changed, 87 insertions(+), 24 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2017-12-17-22-50-51.bpo-32356.roZJpA.rst diff --git a/Doc/library/asyncio-protocol.rst b/Doc/library/asyncio-protocol.rst index a4b0d59..3aa1f2f 100644 --- a/Doc/library/asyncio-protocol.rst +++ b/Doc/library/asyncio-protocol.rst @@ -118,17 +118,31 @@ ReadTransport Interface for read-only transports. + .. method:: is_reading() + + Return ``True`` if the transport is receiving new data. + + .. versionadded:: 3.7 + .. method:: pause_reading() Pause the receiving end of the transport. No data will be passed to the protocol's :meth:`data_received` method until :meth:`resume_reading` is called. + .. versionchanged:: 3.7 + The method is idempotent, i.e. it can be called when the + transport is already paused or closed. + .. method:: resume_reading() Resume the receiving end. The protocol's :meth:`data_received` method will be called once again if some data is available for reading. + .. versionchanged:: 3.7 + The method is idempotent, i.e. it can be called when the + transport is already reading. + WriteTransport -------------- diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 291d989..915ad1a 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -152,21 +152,20 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, self._paused = False self._loop.call_soon(self._loop_reading) + def is_reading(self): + return not self._paused and not self._closing + def pause_reading(self): - if self._closing: - raise RuntimeError('Cannot pause_reading() when closing') - if self._paused: - raise RuntimeError('Already paused') + if self._closing or self._paused: + return self._paused = True if self._loop.get_debug(): logger.debug("%r pauses reading", self) def resume_reading(self): - if not self._paused: - raise RuntimeError('Not paused') - self._paused = False - if self._closing: + if self._closing or not self._paused: return + self._paused = False self._loop.call_soon(self._loop_reading, self._read_fut) if self._loop.get_debug(): logger.debug("%r resumes reading", self) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index cb33cd3..3f44a99 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -702,22 +702,21 @@ class _SelectorSocketTransport(_SelectorTransport): self._loop.call_soon(futures._set_result_unless_cancelled, waiter, None) + def is_reading(self): + return not self._paused and not self._closing + def pause_reading(self): - if self._closing: - raise RuntimeError('Cannot pause_reading() when closing') - if self._paused: - raise RuntimeError('Already paused') + if self._closing or self._paused: + return self._paused = True self._loop._remove_reader(self._sock_fd) if self._loop.get_debug(): logger.debug("%r pauses reading", self) def resume_reading(self): - if not self._paused: - raise RuntimeError('Not paused') - self._paused = False - if self._closing: + if self._closing or not self._paused: return + self._paused = False self._loop._add_reader(self._sock_fd, self._read_ready) if self._loop.get_debug(): logger.debug("%r resumes reading", self) diff --git a/Lib/asyncio/sslproto.py b/Lib/asyncio/sslproto.py index 0c8f01a..8da8570 100644 --- a/Lib/asyncio/sslproto.py +++ b/Lib/asyncio/sslproto.py @@ -317,6 +317,12 @@ class _SSLProtocolTransport(transports._FlowControlMixin, source=self) self.close() + def is_reading(self): + tr = self._ssl_protocol._transport + if tr is None: + raise RuntimeError('SSL transport has not been initialized yet') + return tr.is_reading() + def pause_reading(self): """Pause the receiving end. diff --git a/Lib/asyncio/transports.py b/Lib/asyncio/transports.py index 51f5673..233bbb5 100644 --- a/Lib/asyncio/transports.py +++ b/Lib/asyncio/transports.py @@ -44,6 +44,10 @@ class BaseTransport: class ReadTransport(BaseTransport): """Interface for read-only transports.""" + def is_reading(self): + """Return True if the transport is receiving.""" + raise NotImplementedError + def pause_reading(self): """Pause the receiving end. diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index 910f259..47ebcad 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -334,26 +334,36 @@ class ProactorSocketTransportTests(test_utils.TestCase): f = asyncio.Future(loop=self.loop) f.set_result(msg) futures.append(f) + self.loop._proactor.recv.side_effect = futures self.loop._run_once() self.assertFalse(tr._paused) + self.assertTrue(tr.is_reading()) self.loop._run_once() self.protocol.data_received.assert_called_with(b'data1') self.loop._run_once() self.protocol.data_received.assert_called_with(b'data2') + + tr.pause_reading() tr.pause_reading() self.assertTrue(tr._paused) + self.assertFalse(tr.is_reading()) for i in range(10): self.loop._run_once() self.protocol.data_received.assert_called_with(b'data2') + + tr.resume_reading() tr.resume_reading() self.assertFalse(tr._paused) + self.assertTrue(tr.is_reading()) self.loop._run_once() self.protocol.data_received.assert_called_with(b'data3') self.loop._run_once() self.protocol.data_received.assert_called_with(b'data4') tr.close() + self.assertFalse(tr.is_reading()) + def pause_writing_transport(self, high): tr = self.socket_transport() diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index b1ca3fc..89c3d5e 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -80,10 +80,23 @@ class BaseSelectorEventLoopTests(test_utils.TestCase): with test_utils.disable_logger(): transport = self.loop._make_ssl_transport( m, asyncio.Protocol(), m, waiter) + + with self.assertRaisesRegex(RuntimeError, + r'SSL transport.*not.*initialized'): + transport.is_reading() + # execute the handshake while the logger is disabled # to ignore SSL handshake failure test_utils.run_briefly(self.loop) + self.assertTrue(transport.is_reading()) + transport.pause_reading() + transport.pause_reading() + self.assertFalse(transport.is_reading()) + transport.resume_reading() + transport.resume_reading() + self.assertTrue(transport.is_reading()) + # Sanity check class_name = transport.__class__.__name__ self.assertIn("ssl", class_name.lower()) @@ -894,15 +907,24 @@ class SelectorSocketTransportTests(test_utils.TestCase): tr = self.socket_transport() test_utils.run_briefly(self.loop) self.assertFalse(tr._paused) + self.assertTrue(tr.is_reading()) self.loop.assert_reader(7, tr._read_ready) + + tr.pause_reading() tr.pause_reading() self.assertTrue(tr._paused) - self.assertFalse(7 in self.loop.readers) + self.assertFalse(tr.is_reading()) + self.loop.assert_no_reader(7) + + tr.resume_reading() tr.resume_reading() self.assertFalse(tr._paused) + self.assertTrue(tr.is_reading()) self.loop.assert_reader(7, tr._read_ready) - with self.assertRaises(RuntimeError): - tr.resume_reading() + + tr.close() + self.assertFalse(tr.is_reading()) + self.loop.assert_no_reader(7) def test_read_ready(self): transport = self.socket_transport() diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py index a1a9bb3..eaafe3a 100644 --- a/Lib/test/test_asyncio/utils.py +++ b/Lib/test/test_asyncio/utils.py @@ -327,12 +327,19 @@ class TestLoop(base_events.BaseEventLoop): return False def assert_reader(self, fd, callback, *args): - assert fd in self.readers, 'fd {} is not registered'.format(fd) + if fd not in self.readers: + raise AssertionError(f'fd {fd} is not registered') handle = self.readers[fd] - assert handle._callback == callback, '{!r} != {!r}'.format( - handle._callback, callback) - assert handle._args == args, '{!r} != {!r}'.format( - handle._args, args) + if handle._callback != callback: + raise AssertionError( + f'unexpected callback: {handle._callback} != {callback}') + if handle._args != args: + raise AssertionError( + f'unexpected callback args: {handle._args} != {args}') + + def assert_no_reader(self, fd): + if fd in self.readers: + raise AssertionError(f'fd {fd} is registered') def _add_writer(self, fd, callback, *args): self.writers[fd] = events.Handle(callback, args, self) diff --git a/Misc/NEWS.d/next/Library/2017-12-17-22-50-51.bpo-32356.roZJpA.rst b/Misc/NEWS.d/next/Library/2017-12-17-22-50-51.bpo-32356.roZJpA.rst new file mode 100644 index 0000000..84b5381 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-12-17-22-50-51.bpo-32356.roZJpA.rst @@ -0,0 +1,2 @@ +asyncio.transport.resume_reading() and pause_reading() are now idempotent. +New transport.is_reading() method is added. -- cgit v0.12