summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_io.py
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2007-04-06 17:31:18 (GMT)
committerGuido van Rossum <guido@python.org>2007-04-06 17:31:18 (GMT)
commit78892e46131d01c6f6e6dd7276143e50ffac442d (patch)
tree6fd83de5a189dd3eeb5b3131788273520993b780 /Lib/test/test_io.py
parent0e074483e7b1f99488c57296b34452fec718a895 (diff)
downloadcpython-78892e46131d01c6f6e6dd7276143e50ffac442d.zip
cpython-78892e46131d01c6f6e6dd7276143e50ffac442d.tar.gz
cpython-78892e46131d01c6f6e6dd7276143e50ffac442d.tar.bz2
Added a working Text I/O layer, by Mark Russell.
This is essentially a checkpoint.
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r--Lib/test/test_io.py130
1 files changed, 109 insertions, 21 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 956a502..53419f4 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -2,7 +2,7 @@
import unittest
from test import test_support
-
+from itertools import chain
import io
@@ -16,7 +16,7 @@ class MockIO(io.RawIOBase):
try:
return self._readStack.pop(0)
except:
- return io.EOF
+ return b""
def write(self, b):
self._writeStack.append(b)
@@ -41,6 +41,18 @@ class MockIO(io.RawIOBase):
return 42
+class MockFileIO(io.BytesIO):
+
+ def __init__(self, data):
+ self.read_history = []
+ io.BytesIO.__init__(self, data)
+
+ def read(self, n=None):
+ res = io.BytesIO.read(self, n)
+ self.read_history.append(None if res is None else len(res))
+ return res
+
+
class MockNonBlockWriterIO(io.RawIOBase):
def __init__(self, blockingScript):
@@ -147,31 +159,31 @@ class IOTest(unittest.TestCase):
f.close()
-class BytesIOTest(unittest.TestCase):
+class MemorySeekTest(unittest.TestCase):
def testInit(self):
- buf = b"1234567890"
- bytesIo = io.BytesIO(buf)
+ buf = self.buftype("1234567890")
+ bytesIo = self.ioclass(buf)
def testRead(self):
- buf = b"1234567890"
- bytesIo = io.BytesIO(buf)
+ buf = self.buftype("1234567890")
+ bytesIo = self.ioclass(buf)
self.assertEquals(buf[:1], bytesIo.read(1))
self.assertEquals(buf[1:5], bytesIo.read(4))
self.assertEquals(buf[5:], bytesIo.read(900))
- self.assertEquals(io.EOF, bytesIo.read())
+ self.assertEquals(self.EOF, bytesIo.read())
def testReadNoArgs(self):
- buf = b"1234567890"
- bytesIo = io.BytesIO(buf)
+ buf = self.buftype("1234567890")
+ bytesIo = self.ioclass(buf)
self.assertEquals(buf, bytesIo.read())
- self.assertEquals(io.EOF, bytesIo.read())
+ self.assertEquals(self.EOF, bytesIo.read())
def testSeek(self):
- buf = b"1234567890"
- bytesIo = io.BytesIO(buf)
+ buf = self.buftype("1234567890")
+ bytesIo = self.ioclass(buf)
bytesIo.read(5)
bytesIo.seek(0)
@@ -181,8 +193,8 @@ class BytesIOTest(unittest.TestCase):
self.assertEquals(buf[3:], bytesIo.read())
def testTell(self):
- buf = b"1234567890"
- bytesIo = io.BytesIO(buf)
+ buf = self.buftype("1234567890")
+ bytesIo = self.ioclass(buf)
self.assertEquals(0, bytesIo.tell())
bytesIo.seek(5)
@@ -191,6 +203,18 @@ class BytesIOTest(unittest.TestCase):
self.assertEquals(10000, bytesIo.tell())
+class BytesIOTest(MemorySeekTest):
+ buftype = bytes
+ ioclass = io.BytesIO
+ EOF = b""
+
+
+class StringIOTest(MemorySeekTest):
+ buftype = str
+ ioclass = io.StringIO
+ EOF = ""
+
+
class BufferedReaderTest(unittest.TestCase):
def testRead(self):
@@ -199,6 +223,25 @@ class BufferedReaderTest(unittest.TestCase):
self.assertEquals(b"abcdef", bufIo.read(6))
+ def testBuffering(self):
+ data = b"abcdefghi"
+ dlen = len(data)
+
+ tests = [
+ [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
+ [ 100, [ 3, 3, 3], [ dlen ] ],
+ [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
+ ]
+
+ for bufsize, buf_read_sizes, raw_read_sizes in tests:
+ rawIo = MockFileIO(data)
+ bufIo = io.BufferedReader(rawIo, buffer_size=bufsize)
+ pos = 0
+ for nbytes in buf_read_sizes:
+ self.assertEquals(bufIo.read(nbytes), data[pos:pos+nbytes])
+ pos += nbytes
+ self.assertEquals(rawIo.read_history, raw_read_sizes)
+
def testReadNonBlocking(self):
# Inject some None's in there to simulate EWOULDBLOCK
rawIo = MockIO((b"abc", b"d", None, b"efg", None, None))
@@ -208,7 +251,7 @@ class BufferedReaderTest(unittest.TestCase):
self.assertEquals(b"e", bufIo.read(1))
self.assertEquals(b"fg", bufIo.read())
self.assert_(None is bufIo.read())
- self.assertEquals(io.EOF, bufIo.read())
+ self.assertEquals(b"", bufIo.read())
def testReadToEof(self):
rawIo = MockIO((b"abc", b"d", b"efg"))
@@ -270,8 +313,9 @@ class BufferedWriterTest(unittest.TestCase):
bufIo.write(b"asdfasdfasdf")
- # XXX I don't like this test. It relies too heavily on how the algorithm
- # actually works, which we might change. Refactor later.
+ # XXX I don't like this test. It relies too heavily on how the
+ # algorithm actually works, which we might change. Refactor
+ # later.
def testFileno(self):
rawIo = MockIO((b"abc", b"d", b"efg"))
@@ -299,7 +343,7 @@ class BufferedRWPairTest(unittest.TestCase):
# XXX need implementation
-class BufferedRandom(unittest.TestCase):
+class BufferedRandomTest(unittest.TestCase):
def testReadAndWrite(self):
raw = MockIO((b"asdf", b"ghjk"))
@@ -331,12 +375,56 @@ class BufferedRandom(unittest.TestCase):
self.assertEquals(7, rw.tell())
self.assertEquals(b"fl", rw.read(11))
+
+class TextIOWrapperTest(unittest.TestCase):
+ def testNewlines(self):
+ input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
+
+ tests = [
+ [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
+ [ '\n', input_lines ],
+ [ '\r\n', input_lines ],
+ ]
+
+ encodings = ('utf-8', 'bz2')
+
+ # Try a range of pad sizes to test the case where \r is the last
+ # character in TextIOWrapper._pending_line.
+ for encoding in encodings:
+ for do_reads in (False, True):
+ for padlen in chain(range(10), range(50, 60)):
+ pad = '.' * padlen
+ data_lines = [ pad + line for line in input_lines ]
+ # XXX: str.encode() should return bytes
+ data = bytes(''.join(data_lines).encode(encoding))
+
+ for newline, exp_line_ends in tests:
+ exp_lines = [ pad + line for line in exp_line_ends ]
+ bufIo = io.BufferedReader(io.BytesIO(data))
+ textIo = io.TextIOWrapper(bufIo, newline=newline,
+ encoding=encoding)
+ if do_reads:
+ got_lines = []
+ while True:
+ c2 = textIo.read(2)
+ if c2 == '':
+ break
+ self.assertEquals(len(c2), 2)
+ got_lines.append(c2 + textIo.readline())
+ else:
+ got_lines = list(textIo)
+
+ for got_line, exp_line in zip(got_lines, exp_lines):
+ self.assertEquals(got_line, exp_line)
+ self.assertEquals(len(got_lines), len(exp_lines))
+
# XXX Tests for open()
def test_main():
- test_support.run_unittest(IOTest, BytesIOTest, BufferedReaderTest,
+ test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
+ BufferedReaderTest,
BufferedWriterTest, BufferedRWPairTest,
- BufferedRandom)
+ BufferedRandomTest, TextIOWrapperTest)
if __name__ == "__main__":
test_main()