diff options
author | Martin Panter <vadmium+py@gmail.com> | 2016-05-28 01:07:08 (GMT) |
---|---|---|
committer | Martin Panter <vadmium+py@gmail.com> | 2016-05-28 01:07:08 (GMT) |
commit | c249221dfda0bf05c6fab37fab9c0eb4f23c870c (patch) | |
tree | a922dedc5f1a61c5b242d08f9b00f6ba587e9452 /Lib/test/test_io.py | |
parent | 0472217d433634cb7180a37045744ad4a570d500 (diff) | |
parent | 6bb91f3b6e51352f91bcf785d3f6fe160ed2cd85 (diff) | |
download | cpython-c249221dfda0bf05c6fab37fab9c0eb4f23c870c.zip cpython-c249221dfda0bf05c6fab37fab9c0eb4f23c870c.tar.gz cpython-c249221dfda0bf05c6fab37fab9c0eb4f23c870c.tar.bz2 |
Issue #20699: Merge io bytes-like fixes from 3.5
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r-- | Lib/test/test_io.py | 78 |
1 files changed, 64 insertions, 14 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 1dab166..53e776d 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -45,6 +45,22 @@ try: except ImportError: threading = None +try: + import ctypes +except ImportError: + def byteslike(*pos, **kw): + return array.array("b", bytes(*pos, **kw)) +else: + def byteslike(*pos, **kw): + """Create a bytes-like object having no string or sequence methods""" + data = bytes(*pos, **kw) + obj = EmptyStruct() + ctypes.resize(obj, len(data)) + memoryview(obj).cast("B")[:] = data + return obj + class EmptyStruct(ctypes.Structure): + pass + def _default_chunk_size(): """Get the default TextIOWrapper chunk size""" with open(__file__, "r", encoding="latin-1") as f: @@ -284,7 +300,9 @@ class IOTest(unittest.TestCase): self.assertEqual(f.tell(), 6) self.assertEqual(f.seek(-1, 1), 5) self.assertEqual(f.tell(), 5) - self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9) + buffer = bytearray(b" world\n\n\n") + self.assertEqual(f.write(buffer), 9) + buffer[:] = b"*" * 9 # Overwrite our copy of the data self.assertEqual(f.seek(0), 0) self.assertEqual(f.write(b"h"), 1) self.assertEqual(f.seek(-1, 2), 13) @@ -297,20 +315,21 @@ class IOTest(unittest.TestCase): def read_ops(self, f, buffered=False): data = f.read(5) self.assertEqual(data, b"hello") - data = bytearray(data) + data = byteslike(data) self.assertEqual(f.readinto(data), 5) - self.assertEqual(data, b" worl") + self.assertEqual(bytes(data), b" worl") + data = bytearray(5) self.assertEqual(f.readinto(data), 2) self.assertEqual(len(data), 5) self.assertEqual(data[:2], b"d\n") self.assertEqual(f.seek(0), 0) self.assertEqual(f.read(20), b"hello world\n") self.assertEqual(f.read(1), b"") - self.assertEqual(f.readinto(bytearray(b"x")), 0) + self.assertEqual(f.readinto(byteslike(b"x")), 0) self.assertEqual(f.seek(-6, 2), 6) self.assertEqual(f.read(5), b"world") self.assertEqual(f.read(0), b"") - self.assertEqual(f.readinto(bytearray()), 0) + self.assertEqual(f.readinto(byteslike()), 0) self.assertEqual(f.seek(-6, 1), 5) self.assertEqual(f.read(5), b" worl") self.assertEqual(f.tell(), 10) @@ -321,6 +340,10 @@ class IOTest(unittest.TestCase): f.seek(6) self.assertEqual(f.read(), b"world\n") self.assertEqual(f.read(), b"") + f.seek(0) + data = byteslike(5) + self.assertEqual(f.readinto1(data), 5) + self.assertEqual(bytes(data), b"hello") LARGE = 2**31 @@ -641,10 +664,15 @@ class IOTest(unittest.TestCase): def test_array_writes(self): a = array.array('i', range(10)) n = len(a.tobytes()) - with self.open(support.TESTFN, "wb", 0) as f: - self.assertEqual(f.write(a), n) - with self.open(support.TESTFN, "wb") as f: - self.assertEqual(f.write(a), n) + def check(f): + with f: + self.assertEqual(f.write(a), n) + f.writelines((a,)) + check(self.BytesIO()) + check(self.FileIO(support.TESTFN, "w")) + check(self.BufferedWriter(self.MockRawIO())) + check(self.BufferedRandom(self.MockRawIO())) + check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())) def test_closefd(self): self.assertRaises(ValueError, self.open, support.TESTFN, 'w', @@ -803,6 +831,19 @@ class IOTest(unittest.TestCase): with self.assertRaises(ValueError): self.open(support.TESTFN, 'w', newline='invalid') + def test_buffered_readinto_mixin(self): + # Test the implementation provided by BufferedIOBase + class Stream(self.BufferedIOBase): + def read(self, size): + return b"12345" + read1 = read + stream = Stream() + for method in ("readinto", "readinto1"): + with self.subTest(method): + buffer = byteslike(5) + self.assertEqual(getattr(stream, method)(buffer), 5) + self.assertEqual(bytes(buffer), b"12345") + class CIOTest(IOTest): @@ -1394,6 +1435,11 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): bufio = self.tp(writer, 8) bufio.write(b"abc") self.assertFalse(writer._write_stack) + buffer = bytearray(b"def") + bufio.write(buffer) + buffer[:] = b"***" # Overwrite our copy of the data + bufio.flush() + self.assertEqual(b"".join(writer._write_stack), b"abcdef") def test_write_overflow(self): writer = self.MockRawIO() @@ -1720,11 +1766,13 @@ class BufferedRWPairTest(unittest.TestCase): self.assertEqual(pair.read1(3), b"abc") def test_readinto(self): - pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) + for method in ("readinto", "readinto1"): + with self.subTest(method): + pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) - data = bytearray(5) - self.assertEqual(pair.readinto(data), 5) - self.assertEqual(data, b"abcde") + data = byteslike(5) + self.assertEqual(getattr(pair, method)(data), 5) + self.assertEqual(bytes(data), b"abcde") def test_write(self): w = self.MockRawIO() @@ -1732,7 +1780,9 @@ class BufferedRWPairTest(unittest.TestCase): pair.write(b"abc") pair.flush() - pair.write(b"def") + buffer = bytearray(b"def") + pair.write(buffer) + buffer[:] = b"***" # Overwrite our copy of the data pair.flush() self.assertEqual(w._write_stack, [b"abc", b"def"]) |