diff options
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r-- | Lib/test/test_io.py | 119 |
1 files changed, 102 insertions, 17 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index b5506b0..a3567fa 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -44,10 +44,6 @@ try: import threading except ImportError: threading = None -try: - import fcntl -except ImportError: - fcntl = None def _default_chunk_size(): """Get the default TextIOWrapper chunk size""" @@ -367,8 +363,8 @@ class IOTest(unittest.TestCase): def test_open_handles_NUL_chars(self): fn_with_NUL = 'foo\0bar' - self.assertRaises(TypeError, self.open, fn_with_NUL, 'w') - self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w') + self.assertRaises(ValueError, self.open, fn_with_NUL, 'w') + self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w') def test_raw_file_io(self): with self.open(support.TESTFN, "wb", buffering=0) as f: @@ -778,7 +774,7 @@ class CommonBufferedTests: def test_repr(self): raw = self.MockRawIO() b = self.tp(raw) - clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__) + clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__) self.assertEqual(repr(b), "<%s>" % clsname) raw.name = "dummy" self.assertEqual(repr(b), "<%s name='dummy'>" % clsname) @@ -943,6 +939,71 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): self.assertEqual(bufio.readinto(b), 1) self.assertEqual(b, b"cb") + def test_readinto1(self): + buffer_size = 10 + rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl")) + bufio = self.tp(rawio, buffer_size=buffer_size) + b = bytearray(2) + self.assertEqual(bufio.peek(3), b'abc') + self.assertEqual(rawio._reads, 1) + self.assertEqual(bufio.readinto1(b), 2) + self.assertEqual(b, b"ab") + self.assertEqual(rawio._reads, 1) + self.assertEqual(bufio.readinto1(b), 1) + self.assertEqual(b[:1], b"c") + self.assertEqual(rawio._reads, 1) + self.assertEqual(bufio.readinto1(b), 2) + self.assertEqual(b, b"de") + self.assertEqual(rawio._reads, 2) + b = bytearray(2*buffer_size) + self.assertEqual(bufio.peek(3), b'fgh') + self.assertEqual(rawio._reads, 3) + self.assertEqual(bufio.readinto1(b), 6) + self.assertEqual(b[:6], b"fghjkl") + self.assertEqual(rawio._reads, 4) + + def test_readinto_array(self): + buffer_size = 60 + data = b"a" * 26 + rawio = self.MockRawIO((data,)) + bufio = self.tp(rawio, buffer_size=buffer_size) + + # Create an array with element size > 1 byte + b = array.array('i', b'x' * 32) + assert len(b) != 16 + + # Read into it. We should get as many *bytes* as we can fit into b + # (which is more than the number of elements) + n = bufio.readinto(b) + self.assertGreater(n, len(b)) + + # Check that old contents of b are preserved + bm = memoryview(b).cast('B') + self.assertLess(n, len(bm)) + self.assertEqual(bm[:n], data[:n]) + self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) + + def test_readinto1_array(self): + buffer_size = 60 + data = b"a" * 26 + rawio = self.MockRawIO((data,)) + bufio = self.tp(rawio, buffer_size=buffer_size) + + # Create an array with element size > 1 byte + b = array.array('i', b'x' * 32) + assert len(b) != 16 + + # Read into it. We should get as many *bytes* as we can fit into b + # (which is more than the number of elements) + n = bufio.readinto1(b) + self.assertGreater(n, len(b)) + + # Check that old contents of b are preserved + bm = memoryview(b).cast('B') + self.assertLess(n, len(bm)) + self.assertEqual(bm[:n], data[:n]) + self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) + def test_readlines(self): def bufio(): rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) @@ -2784,6 +2845,34 @@ class TextIOWrapperTest(unittest.TestCase): self.assertFalse(err) self.assertEqual("ok", out.decode().strip()) + def test_read_byteslike(self): + r = MemviewBytesIO(b'Just some random string\n') + t = self.TextIOWrapper(r, 'utf-8') + + # TextIOwrapper will not read the full string, because + # we truncate it to a multiple of the native int size + # so that we can construct a more complex memoryview. + bytes_val = _to_memoryview(r.getvalue()).tobytes() + + self.assertEqual(t.read(200), bytes_val.decode('utf-8')) + +class MemviewBytesIO(io.BytesIO): + '''A BytesIO object whose read method returns memoryviews + rather than bytes''' + + def read1(self, len_): + return _to_memoryview(super().read1(len_)) + + def read(self, len_): + return _to_memoryview(super().read(len_)) + +def _to_memoryview(buf): + '''Convert bytes-object *buf* to a non-trivial memoryview''' + + arr = array.array('i') + idx = len(buf) - len(buf) % arr.itemsize + arr.frombytes(buf[:idx]) + return memoryview(arr) class CTextIOWrapperTest(TextIOWrapperTest): io = io @@ -3028,6 +3117,8 @@ class MiscIOTest(unittest.TestCase): self.assertRaises(ValueError, f.readall) if hasattr(f, "readinto"): self.assertRaises(ValueError, f.readinto, bytearray(1024)) + if hasattr(f, "readinto1"): + self.assertRaises(ValueError, f.readinto1, bytearray(1024)) self.assertRaises(ValueError, f.readline) self.assertRaises(ValueError, f.readlines) self.assertRaises(ValueError, f.seek, 0) @@ -3141,26 +3232,20 @@ class MiscIOTest(unittest.TestCase): with self.open(support.TESTFN, **kwargs) as f: self.assertRaises(TypeError, pickle.dumps, f, protocol) - @unittest.skipUnless(fcntl, 'fcntl required for this test') def test_nonblock_pipe_write_bigbuf(self): self._test_nonblock_pipe_write(16*1024) - @unittest.skipUnless(fcntl, 'fcntl required for this test') def test_nonblock_pipe_write_smallbuf(self): self._test_nonblock_pipe_write(1024) - def _set_non_blocking(self, fd): - flags = fcntl.fcntl(fd, fcntl.F_GETFL) - self.assertNotEqual(flags, -1) - res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - self.assertEqual(res, 0) - + @unittest.skipUnless(hasattr(os, 'set_blocking'), + 'os.set_blocking() required for this test') def _test_nonblock_pipe_write(self, bufsize): sent = [] received = [] r, w = os.pipe() - self._set_non_blocking(r) - self._set_non_blocking(w) + os.set_blocking(r, False) + os.set_blocking(w, False) # To exercise all code paths in the C implementation we need # to play with buffer sizes. For instance, if we choose a |