diff options
author | Guido van Rossum <guido@python.org> | 2007-03-07 01:00:12 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 2007-03-07 01:00:12 (GMT) |
commit | 01a2752d19d0ad615c989b8d742a736ca8d51a57 (patch) | |
tree | 45f0d698cfbe5081647b7fbbc79890cf6be5b24b /Lib/test/test_io.py | |
parent | c78855465fb392db0a821e8ea586a22234857006 (diff) | |
download | cpython-01a2752d19d0ad615c989b8d742a736ca8d51a57.zip cpython-01a2752d19d0ad615c989b8d742a736ca8d51a57.tar.gz cpython-01a2752d19d0ad615c989b8d742a736ca8d51a57.tar.bz2 |
New version from Mike Verdone (sat in my inbox since 2/27).
I cleaned up whitespace but otherwise didn't change it.
This will need work to reflect the tentative decision to drop nonblocking I/O
support from the buffering layers.
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r-- | Lib/test/test_io.py | 144 |
1 files changed, 114 insertions, 30 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 7384999..1be1b71 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -5,10 +5,10 @@ from test import test_support import io - -class MockReadIO(io.RawIOBase): - def __init__(self, readStack): +class MockIO(io.RawIOBase): + def __init__(self, readStack=()): self._readStack = list(readStack) + self._writeStack = [] def read(self, n=None): try: @@ -16,27 +16,41 @@ class MockReadIO(io.RawIOBase): except: return io.EOF + def write(self, b): + self._writeStack.append(b) + return len(b) + + def writable(self): + return True + def fileno(self): return 42 def readable(self): return True - -class MockWriteIO(io.RawIOBase): - def __init__(self): - self._writeStack = [] - - def write(self, b): - self._writeStack.append(b) - return len(b) - - def writeable(self): + def seekable(self): return True - def fileno(self): + def seek(self, pos, whence): + pass + + def tell(self): return 42 +class MockNonBlockWriterIO(io.RawIOBase): + def __init__(self, blockingScript): + self.bs = list(blockingScript) + self._write_stack = [] + def write(self, b): + self._write_stack.append(b) + n = self.bs.pop(0) + if (n < 0): + raise io.BlockingIO(0, "test blocking", -n) + else: + return n + def writable(self): + return True class IOTest(unittest.TestCase): @@ -90,9 +104,7 @@ class IOTest(unittest.TestCase): f = io.BytesIO(data) self.read_ops(f) - class BytesIOTest(unittest.TestCase): - def testInit(self): buf = b"1234567890" bytesIo = io.BytesIO(buf) @@ -134,44 +146,51 @@ class BytesIOTest(unittest.TestCase): bytesIo.seek(10000) self.assertEquals(10000, bytesIo.tell()) - class BufferedReaderTest(unittest.TestCase): - def testRead(self): - rawIo = MockReadIO((b"abc", b"d", b"efg")) + rawIo = MockIO((b"abc", b"d", b"efg")) bufIo = io.BufferedReader(rawIo) self.assertEquals(b"abcdef", bufIo.read(6)) + def testReadNonBlocking(self): + # Inject some None's in there to simulate EWOULDBLOCK + rawIo = MockIO((b"abc", b"d", None, b"efg", None, None)) + bufIo = io.BufferedReader(rawIo) + + self.assertEquals(b"abcd", bufIo.read(6)) + self.assertEquals(b"e", bufIo.read(1)) + self.assertEquals(b"fg", bufIo.read()) + self.assert_(None is bufIo.read()) + self.assertEquals(io.EOF, bufIo.read()) + def testReadToEof(self): - rawIo = MockReadIO((b"abc", b"d", b"efg")) + rawIo = MockIO((b"abc", b"d", b"efg")) bufIo = io.BufferedReader(rawIo) self.assertEquals(b"abcdefg", bufIo.read(9000)) def testReadNoArgs(self): - rawIo = MockReadIO((b"abc", b"d", b"efg")) + rawIo = MockIO((b"abc", b"d", b"efg")) bufIo = io.BufferedReader(rawIo) self.assertEquals(b"abcdefg", bufIo.read()) def testFileno(self): - rawIo = MockReadIO((b"abc", b"d", b"efg")) + rawIo = MockIO((b"abc", b"d", b"efg")) bufIo = io.BufferedReader(rawIo) self.assertEquals(42, bufIo.fileno()) def testFilenoNoFileno(self): - # TODO will we always have fileno() function? If so, kill + # XXX will we always have fileno() function? If so, kill # this test. Else, write it. pass - class BufferedWriterTest(unittest.TestCase): - def testWrite(self): # Write to the buffered IO but don't overflow the buffer. - writer = MockWriteIO() + writer = MockIO() bufIo = io.BufferedWriter(writer, 8) bufIo.write(b"abc") @@ -179,7 +198,7 @@ class BufferedWriterTest(unittest.TestCase): self.assertFalse(writer._writeStack) def testWriteOverflow(self): - writer = MockWriteIO() + writer = MockIO() bufIo = io.BufferedWriter(writer, 8) bufIo.write(b"abc") @@ -187,8 +206,33 @@ class BufferedWriterTest(unittest.TestCase): self.assertEquals(b"abcdefghijkl", writer._writeStack[0]) + def testWriteNonBlocking(self): + raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12)) + bufIo = io.BufferedWriter(raw, 8, 16) + + bufIo.write(b"asdf") + bufIo.write(b"asdfa") + self.assertEquals(b"asdfasdfa", raw._write_stack[0]) + + bufIo.write(b"asdfasdfasdf") + self.assertEquals(b"asdfasdfasdf", raw._write_stack[1]) + bufIo.write(b"asdfasdfasdf") + self.assertEquals(b"dfasdfasdf", raw._write_stack[2]) + self.assertEquals(b"asdfasdfasdf", raw._write_stack[3]) + + bufIo.write(b"asdfasdfasdf") + + # XXX I don't like this test. It relies too heavily on how the algorithm + # actually works, which we might change. Refactor later. + + def testFileno(self): + rawIo = MockIO((b"abc", b"d", b"efg")) + bufIo = io.BufferedWriter(rawIo) + + self.assertEquals(42, bufIo.fileno()) + def testFlush(self): - writer = MockWriteIO() + writer = MockIO() bufIo = io.BufferedWriter(writer, 8) bufIo.write(b"abc") @@ -196,11 +240,51 @@ class BufferedWriterTest(unittest.TestCase): self.assertEquals(b"abc", writer._writeStack[0]) -# TODO. Tests for open() +class BufferedRWPairTest(unittest.TestCase): + def testRWPair(self): + r = MockIO(()) + w = MockIO() + pair = io.BufferedRWPair(r, w) + + # XXX need implementation + +class BufferedRandom(unittest.TestCase): + def testReadAndWrite(self): + raw = MockIO((b"asdf", b"ghjk")) + rw = io.BufferedRandom(raw, 8, 12) + + self.assertEqual(b"as", rw.read(2)) + rw.write(b"ddd") + rw.write(b"eee") + self.assertFalse(raw._writeStack) # Buffer writes + self.assertEqual(b"ghjk", rw.read()) # This read forces write flush + self.assertEquals(b"dddeee", raw._writeStack[0]) + + def testSeekAndTell(self): + raw = io.BytesIO(b"asdfghjkl") + rw = io.BufferedRandom(raw) + + self.assertEquals(b"as", rw.read(2)) + self.assertEquals(2, rw.tell()) + rw.seek(0, 0) + self.assertEquals(b"asdf", rw.read(4)) + + rw.write(b"asdf") + rw.seek(0, 0) + self.assertEquals(b"asdfasdfl", rw.read()) + self.assertEquals(9, rw.tell()) + rw.seek(-4, 2) + self.assertEquals(5, rw.tell()) + rw.seek(2, 1) + self.assertEquals(7, rw.tell()) + self.assertEquals(b"fl", rw.read(11)) + +# XXX Tests for open() def test_main(): test_support.run_unittest(IOTest, BytesIOTest, BufferedReaderTest, - BufferedWriterTest) + BufferedWriterTest, BufferedRWPairTest, + BufferedRandom) if __name__ == "__main__": test_main() |