diff options
author | Yury Selivanov <yury@magic.io> | 2016-09-15 20:51:48 (GMT) |
---|---|---|
committer | Yury Selivanov <yury@magic.io> | 2016-09-15 20:51:48 (GMT) |
commit | 4c5bf3bc527b67dd2a6f03c772287842124d607b (patch) | |
tree | 598a0e23775699fbade70ff6ca8a3f6071aebc6d /Lib | |
parent | 45dccdad93fbfa5c2b90a697b47d5286115827aa (diff) | |
download | cpython-4c5bf3bc527b67dd2a6f03c772287842124d607b.zip cpython-4c5bf3bc527b67dd2a6f03c772287842124d607b.tar.gz cpython-4c5bf3bc527b67dd2a6f03c772287842124d607b.tar.bz2 |
Issue #26909: Fix slow pipes IO in asyncio.
Patch by INADA Naoki.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/asyncio/unix_events.py | 27 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_unix_events.py | 49 |
2 files changed, 34 insertions, 42 deletions
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index f7f9eb2..a0113f6 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -434,7 +434,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, self._pipe = pipe self._fileno = pipe.fileno() self._protocol = protocol - self._buffer = [] + self._buffer = bytearray() self._conn_lost = 0 self._closing = False # Set when close() or write_eof() called. @@ -450,7 +450,6 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, "pipes, sockets and character devices") _set_nonblocking(self._fileno) - self._loop.call_soon(self._protocol.connection_made, self) # On AIX, the reader trick (to be notified when the read end of the @@ -492,7 +491,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, return '<%s>' % ' '.join(info) def get_write_buffer_size(self): - return sum(len(data) for data in self._buffer) + return len(self._buffer) def _read_ready(self): # Pipe was closed by peer. @@ -530,39 +529,37 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, if n == len(data): return elif n > 0: - data = data[n:] + data = memoryview(data)[n:] self._loop.add_writer(self._fileno, self._write_ready) - self._buffer.append(data) + self._buffer += data self._maybe_pause_protocol() def _write_ready(self): - data = b''.join(self._buffer) - assert data, 'Data should not be empty' + assert self._buffer, 'Data should not be empty' - self._buffer.clear() try: - n = os.write(self._fileno, data) + n = os.write(self._fileno, self._buffer) except (BlockingIOError, InterruptedError): - self._buffer.append(data) + pass except Exception as exc: + self._buffer.clear() self._conn_lost += 1 # Remove writer here, _fatal_error() doesn't it # because _buffer is empty. self._loop.remove_writer(self._fileno) self._fatal_error(exc, 'Fatal write error on pipe transport') else: - if n == len(data): + if n == len(self._buffer): + self._buffer.clear() self._loop.remove_writer(self._fileno) self._maybe_resume_protocol() # May append to buffer. - if not self._buffer and self._closing: + if self._closing: self._loop.remove_reader(self._fileno) self._call_connection_lost(None) return elif n > 0: - data = data[n:] - - self._buffer.append(data) # Try again later. + del self._buffer[:n] def can_write_eof(self): return True diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 22dc688..570022f 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -518,7 +518,7 @@ class UnixWritePipeTransportTests(test_utils.TestCase): tr.write(b'data') m_write.assert_called_with(5, b'data') self.assertFalse(self.loop.writers) - self.assertEqual([], tr._buffer) + self.assertEqual(bytearray(), tr._buffer) @mock.patch('os.write') def test_write_no_data(self, m_write): @@ -526,35 +526,34 @@ class UnixWritePipeTransportTests(test_utils.TestCase): tr.write(b'') self.assertFalse(m_write.called) self.assertFalse(self.loop.writers) - self.assertEqual([], tr._buffer) + self.assertEqual(bytearray(b''), tr._buffer) @mock.patch('os.write') def test_write_partial(self, m_write): tr = self.write_pipe_transport() m_write.return_value = 2 tr.write(b'data') - m_write.assert_called_with(5, b'data') self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'ta'], tr._buffer) + self.assertEqual(bytearray(b'ta'), tr._buffer) @mock.patch('os.write') def test_write_buffer(self, m_write): tr = self.write_pipe_transport() self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'previous'] + tr._buffer = bytearray(b'previous') tr.write(b'data') self.assertFalse(m_write.called) self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'previous', b'data'], tr._buffer) + self.assertEqual(bytearray(b'previousdata'), tr._buffer) @mock.patch('os.write') def test_write_again(self, m_write): tr = self.write_pipe_transport() m_write.side_effect = BlockingIOError() tr.write(b'data') - m_write.assert_called_with(5, b'data') + m_write.assert_called_with(5, bytearray(b'data')) self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'data'], tr._buffer) + self.assertEqual(bytearray(b'data'), tr._buffer) @mock.patch('asyncio.unix_events.logger') @mock.patch('os.write') @@ -566,7 +565,7 @@ class UnixWritePipeTransportTests(test_utils.TestCase): tr.write(b'data') m_write.assert_called_with(5, b'data') self.assertFalse(self.loop.writers) - self.assertEqual([], tr._buffer) + self.assertEqual(bytearray(), tr._buffer) tr._fatal_error.assert_called_with( err, 'Fatal write error on pipe transport') @@ -606,58 +605,55 @@ class UnixWritePipeTransportTests(test_utils.TestCase): def test__write_ready(self, m_write): tr = self.write_pipe_transport() self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'da', b'ta'] + tr._buffer = bytearray(b'data') m_write.return_value = 4 tr._write_ready() - m_write.assert_called_with(5, b'data') self.assertFalse(self.loop.writers) - self.assertEqual([], tr._buffer) + self.assertEqual(bytearray(), tr._buffer) @mock.patch('os.write') def test__write_ready_partial(self, m_write): tr = self.write_pipe_transport() self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'da', b'ta'] + tr._buffer = bytearray(b'data') m_write.return_value = 3 tr._write_ready() - m_write.assert_called_with(5, b'data') self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'a'], tr._buffer) + self.assertEqual(bytearray(b'a'), tr._buffer) @mock.patch('os.write') def test__write_ready_again(self, m_write): tr = self.write_pipe_transport() self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'da', b'ta'] + tr._buffer = bytearray(b'data') m_write.side_effect = BlockingIOError() tr._write_ready() - m_write.assert_called_with(5, b'data') + m_write.assert_called_with(5, bytearray(b'data')) self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'data'], tr._buffer) + self.assertEqual(bytearray(b'data'), tr._buffer) @mock.patch('os.write') def test__write_ready_empty(self, m_write): tr = self.write_pipe_transport() self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'da', b'ta'] + tr._buffer = bytearray(b'data') m_write.return_value = 0 tr._write_ready() - m_write.assert_called_with(5, b'data') + m_write.assert_called_with(5, bytearray(b'data')) self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'data'], tr._buffer) + self.assertEqual(bytearray(b'data'), tr._buffer) @mock.patch('asyncio.log.logger.error') @mock.patch('os.write') def test__write_ready_err(self, m_write, m_logexc): tr = self.write_pipe_transport() self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'da', b'ta'] + tr._buffer = bytearray(b'data') m_write.side_effect = err = OSError() tr._write_ready() - m_write.assert_called_with(5, b'data') self.assertFalse(self.loop.writers) self.assertFalse(self.loop.readers) - self.assertEqual([], tr._buffer) + self.assertEqual(bytearray(), tr._buffer) self.assertTrue(tr.is_closing()) m_logexc.assert_called_with( test_utils.MockPattern( @@ -673,13 +669,12 @@ class UnixWritePipeTransportTests(test_utils.TestCase): tr = self.write_pipe_transport() self.loop.add_writer(5, tr._write_ready) tr._closing = True - tr._buffer = [b'da', b'ta'] + tr._buffer = bytearray(b'data') m_write.return_value = 4 tr._write_ready() - m_write.assert_called_with(5, b'data') self.assertFalse(self.loop.writers) self.assertFalse(self.loop.readers) - self.assertEqual([], tr._buffer) + self.assertEqual(bytearray(), tr._buffer) self.protocol.connection_lost.assert_called_with(None) self.pipe.close.assert_called_with() |