diff options
Diffstat (limited to 'Lib/test/test_asyncio/test_transports.py')
-rw-r--r-- | Lib/test/test_asyncio/test_transports.py | 97 |
1 files changed, 0 insertions, 97 deletions
diff --git a/Lib/test/test_asyncio/test_transports.py b/Lib/test/test_asyncio/test_transports.py deleted file mode 100644 index df44855..0000000 --- a/Lib/test/test_asyncio/test_transports.py +++ /dev/null @@ -1,97 +0,0 @@ -"""Tests for transports.py.""" - -import unittest -from unittest import mock - -import asyncio -from asyncio import transports - - -class TransportTests(unittest.TestCase): - - def test_ctor_extra_is_none(self): - transport = asyncio.Transport() - self.assertEqual(transport._extra, {}) - - def test_get_extra_info(self): - transport = asyncio.Transport({'extra': 'info'}) - self.assertEqual('info', transport.get_extra_info('extra')) - self.assertIsNone(transport.get_extra_info('unknown')) - - default = object() - self.assertIs(default, transport.get_extra_info('unknown', default)) - - def test_writelines(self): - writer = mock.Mock() - - class MyTransport(asyncio.Transport): - def write(self, data): - writer(data) - - transport = MyTransport() - - transport.writelines([b'line1', - bytearray(b'line2'), - memoryview(b'line3')]) - self.assertEqual(1, writer.call_count) - writer.assert_called_with(b'line1line2line3') - - def test_not_implemented(self): - transport = asyncio.Transport() - - self.assertRaises(NotImplementedError, - transport.set_write_buffer_limits) - self.assertRaises(NotImplementedError, transport.get_write_buffer_size) - self.assertRaises(NotImplementedError, transport.write, 'data') - self.assertRaises(NotImplementedError, transport.write_eof) - self.assertRaises(NotImplementedError, transport.can_write_eof) - self.assertRaises(NotImplementedError, transport.pause_reading) - self.assertRaises(NotImplementedError, transport.resume_reading) - self.assertRaises(NotImplementedError, transport.is_reading) - self.assertRaises(NotImplementedError, transport.close) - self.assertRaises(NotImplementedError, transport.abort) - - def test_dgram_not_implemented(self): - transport = asyncio.DatagramTransport() - - self.assertRaises(NotImplementedError, transport.sendto, 'data') - self.assertRaises(NotImplementedError, transport.abort) - - def test_subprocess_transport_not_implemented(self): - transport = asyncio.SubprocessTransport() - - self.assertRaises(NotImplementedError, transport.get_pid) - self.assertRaises(NotImplementedError, transport.get_returncode) - self.assertRaises(NotImplementedError, transport.get_pipe_transport, 1) - self.assertRaises(NotImplementedError, transport.send_signal, 1) - self.assertRaises(NotImplementedError, transport.terminate) - self.assertRaises(NotImplementedError, transport.kill) - - def test_flowcontrol_mixin_set_write_limits(self): - - class MyTransport(transports._FlowControlMixin, - transports.Transport): - - def get_write_buffer_size(self): - return 512 - - loop = mock.Mock() - transport = MyTransport(loop=loop) - transport._protocol = mock.Mock() - - self.assertFalse(transport._protocol_paused) - - with self.assertRaisesRegex(ValueError, 'high.*must be >= low'): - transport.set_write_buffer_limits(high=0, low=1) - - transport.set_write_buffer_limits(high=1024, low=128) - self.assertFalse(transport._protocol_paused) - self.assertEqual(transport.get_write_buffer_limits(), (128, 1024)) - - transport.set_write_buffer_limits(high=256, low=128) - self.assertTrue(transport._protocol_paused) - self.assertEqual(transport.get_write_buffer_limits(), (128, 256)) - - -if __name__ == '__main__': - unittest.main() |