diff options
author | Guido van Rossum <guido@python.org> | 2007-04-06 17:31:18 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 2007-04-06 17:31:18 (GMT) |
commit | 78892e46131d01c6f6e6dd7276143e50ffac442d (patch) | |
tree | 6fd83de5a189dd3eeb5b3131788273520993b780 /Lib/test/test_io.py | |
parent | 0e074483e7b1f99488c57296b34452fec718a895 (diff) | |
download | cpython-78892e46131d01c6f6e6dd7276143e50ffac442d.zip cpython-78892e46131d01c6f6e6dd7276143e50ffac442d.tar.gz cpython-78892e46131d01c6f6e6dd7276143e50ffac442d.tar.bz2 |
Added a working Text I/O layer, by Mark Russell.
This is essentially a checkpoint.
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r-- | Lib/test/test_io.py | 130 |
1 files changed, 109 insertions, 21 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 956a502..53419f4 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -2,7 +2,7 @@ import unittest from test import test_support - +from itertools import chain import io @@ -16,7 +16,7 @@ class MockIO(io.RawIOBase): try: return self._readStack.pop(0) except: - return io.EOF + return b"" def write(self, b): self._writeStack.append(b) @@ -41,6 +41,18 @@ class MockIO(io.RawIOBase): return 42 +class MockFileIO(io.BytesIO): + + def __init__(self, data): + self.read_history = [] + io.BytesIO.__init__(self, data) + + def read(self, n=None): + res = io.BytesIO.read(self, n) + self.read_history.append(None if res is None else len(res)) + return res + + class MockNonBlockWriterIO(io.RawIOBase): def __init__(self, blockingScript): @@ -147,31 +159,31 @@ class IOTest(unittest.TestCase): f.close() -class BytesIOTest(unittest.TestCase): +class MemorySeekTest(unittest.TestCase): def testInit(self): - buf = b"1234567890" - bytesIo = io.BytesIO(buf) + buf = self.buftype("1234567890") + bytesIo = self.ioclass(buf) def testRead(self): - buf = b"1234567890" - bytesIo = io.BytesIO(buf) + buf = self.buftype("1234567890") + bytesIo = self.ioclass(buf) self.assertEquals(buf[:1], bytesIo.read(1)) self.assertEquals(buf[1:5], bytesIo.read(4)) self.assertEquals(buf[5:], bytesIo.read(900)) - self.assertEquals(io.EOF, bytesIo.read()) + self.assertEquals(self.EOF, bytesIo.read()) def testReadNoArgs(self): - buf = b"1234567890" - bytesIo = io.BytesIO(buf) + buf = self.buftype("1234567890") + bytesIo = self.ioclass(buf) self.assertEquals(buf, bytesIo.read()) - self.assertEquals(io.EOF, bytesIo.read()) + self.assertEquals(self.EOF, bytesIo.read()) def testSeek(self): - buf = b"1234567890" - bytesIo = io.BytesIO(buf) + buf = self.buftype("1234567890") + bytesIo = self.ioclass(buf) bytesIo.read(5) bytesIo.seek(0) @@ -181,8 +193,8 @@ class BytesIOTest(unittest.TestCase): self.assertEquals(buf[3:], bytesIo.read()) def testTell(self): - buf = b"1234567890" - bytesIo = io.BytesIO(buf) + buf = self.buftype("1234567890") + bytesIo = self.ioclass(buf) self.assertEquals(0, bytesIo.tell()) bytesIo.seek(5) @@ -191,6 +203,18 @@ class BytesIOTest(unittest.TestCase): self.assertEquals(10000, bytesIo.tell()) +class BytesIOTest(MemorySeekTest): + buftype = bytes + ioclass = io.BytesIO + EOF = b"" + + +class StringIOTest(MemorySeekTest): + buftype = str + ioclass = io.StringIO + EOF = "" + + class BufferedReaderTest(unittest.TestCase): def testRead(self): @@ -199,6 +223,25 @@ class BufferedReaderTest(unittest.TestCase): self.assertEquals(b"abcdef", bufIo.read(6)) + def testBuffering(self): + data = b"abcdefghi" + dlen = len(data) + + tests = [ + [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ], + [ 100, [ 3, 3, 3], [ dlen ] ], + [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ], + ] + + for bufsize, buf_read_sizes, raw_read_sizes in tests: + rawIo = MockFileIO(data) + bufIo = io.BufferedReader(rawIo, buffer_size=bufsize) + pos = 0 + for nbytes in buf_read_sizes: + self.assertEquals(bufIo.read(nbytes), data[pos:pos+nbytes]) + pos += nbytes + self.assertEquals(rawIo.read_history, raw_read_sizes) + def testReadNonBlocking(self): # Inject some None's in there to simulate EWOULDBLOCK rawIo = MockIO((b"abc", b"d", None, b"efg", None, None)) @@ -208,7 +251,7 @@ class BufferedReaderTest(unittest.TestCase): self.assertEquals(b"e", bufIo.read(1)) self.assertEquals(b"fg", bufIo.read()) self.assert_(None is bufIo.read()) - self.assertEquals(io.EOF, bufIo.read()) + self.assertEquals(b"", bufIo.read()) def testReadToEof(self): rawIo = MockIO((b"abc", b"d", b"efg")) @@ -270,8 +313,9 @@ class BufferedWriterTest(unittest.TestCase): bufIo.write(b"asdfasdfasdf") - # XXX I don't like this test. It relies too heavily on how the algorithm - # actually works, which we might change. Refactor later. + # XXX I don't like this test. It relies too heavily on how the + # algorithm actually works, which we might change. Refactor + # later. def testFileno(self): rawIo = MockIO((b"abc", b"d", b"efg")) @@ -299,7 +343,7 @@ class BufferedRWPairTest(unittest.TestCase): # XXX need implementation -class BufferedRandom(unittest.TestCase): +class BufferedRandomTest(unittest.TestCase): def testReadAndWrite(self): raw = MockIO((b"asdf", b"ghjk")) @@ -331,12 +375,56 @@ class BufferedRandom(unittest.TestCase): self.assertEquals(7, rw.tell()) self.assertEquals(b"fl", rw.read(11)) + +class TextIOWrapperTest(unittest.TestCase): + def testNewlines(self): + input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] + + tests = [ + [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], + [ '\n', input_lines ], + [ '\r\n', input_lines ], + ] + + encodings = ('utf-8', 'bz2') + + # Try a range of pad sizes to test the case where \r is the last + # character in TextIOWrapper._pending_line. + for encoding in encodings: + for do_reads in (False, True): + for padlen in chain(range(10), range(50, 60)): + pad = '.' * padlen + data_lines = [ pad + line for line in input_lines ] + # XXX: str.encode() should return bytes + data = bytes(''.join(data_lines).encode(encoding)) + + for newline, exp_line_ends in tests: + exp_lines = [ pad + line for line in exp_line_ends ] + bufIo = io.BufferedReader(io.BytesIO(data)) + textIo = io.TextIOWrapper(bufIo, newline=newline, + encoding=encoding) + if do_reads: + got_lines = [] + while True: + c2 = textIo.read(2) + if c2 == '': + break + self.assertEquals(len(c2), 2) + got_lines.append(c2 + textIo.readline()) + else: + got_lines = list(textIo) + + for got_line, exp_line in zip(got_lines, exp_lines): + self.assertEquals(got_line, exp_line) + self.assertEquals(len(got_lines), len(exp_lines)) + # XXX Tests for open() def test_main(): - test_support.run_unittest(IOTest, BytesIOTest, BufferedReaderTest, + test_support.run_unittest(IOTest, BytesIOTest, StringIOTest, + BufferedReaderTest, BufferedWriterTest, BufferedRWPairTest, - BufferedRandom) + BufferedRandomTest, TextIOWrapperTest) if __name__ == "__main__": test_main() |