summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_io.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r--Lib/test/test_io.py144
1 files changed, 114 insertions, 30 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 7384999..1be1b71 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -5,10 +5,10 @@ from test import test_support
import io
-
-class MockReadIO(io.RawIOBase):
- def __init__(self, readStack):
+class MockIO(io.RawIOBase):
+ def __init__(self, readStack=()):
self._readStack = list(readStack)
+ self._writeStack = []
def read(self, n=None):
try:
@@ -16,27 +16,41 @@ class MockReadIO(io.RawIOBase):
except:
return io.EOF
+ def write(self, b):
+ self._writeStack.append(b)
+ return len(b)
+
+ def writable(self):
+ return True
+
def fileno(self):
return 42
def readable(self):
return True
-
-class MockWriteIO(io.RawIOBase):
- def __init__(self):
- self._writeStack = []
-
- def write(self, b):
- self._writeStack.append(b)
- return len(b)
-
- def writeable(self):
+ def seekable(self):
return True
- def fileno(self):
+ def seek(self, pos, whence):
+ pass
+
+ def tell(self):
return 42
+class MockNonBlockWriterIO(io.RawIOBase):
+ def __init__(self, blockingScript):
+ self.bs = list(blockingScript)
+ self._write_stack = []
+ def write(self, b):
+ self._write_stack.append(b)
+ n = self.bs.pop(0)
+ if (n < 0):
+ raise io.BlockingIO(0, "test blocking", -n)
+ else:
+ return n
+ def writable(self):
+ return True
class IOTest(unittest.TestCase):
@@ -90,9 +104,7 @@ class IOTest(unittest.TestCase):
f = io.BytesIO(data)
self.read_ops(f)
-
class BytesIOTest(unittest.TestCase):
-
def testInit(self):
buf = b"1234567890"
bytesIo = io.BytesIO(buf)
@@ -134,44 +146,51 @@ class BytesIOTest(unittest.TestCase):
bytesIo.seek(10000)
self.assertEquals(10000, bytesIo.tell())
-
class BufferedReaderTest(unittest.TestCase):
-
def testRead(self):
- rawIo = MockReadIO((b"abc", b"d", b"efg"))
+ rawIo = MockIO((b"abc", b"d", b"efg"))
bufIo = io.BufferedReader(rawIo)
self.assertEquals(b"abcdef", bufIo.read(6))
+ def testReadNonBlocking(self):
+ # Inject some None's in there to simulate EWOULDBLOCK
+ rawIo = MockIO((b"abc", b"d", None, b"efg", None, None))
+ bufIo = io.BufferedReader(rawIo)
+
+ self.assertEquals(b"abcd", bufIo.read(6))
+ self.assertEquals(b"e", bufIo.read(1))
+ self.assertEquals(b"fg", bufIo.read())
+ self.assert_(None is bufIo.read())
+ self.assertEquals(io.EOF, bufIo.read())
+
def testReadToEof(self):
- rawIo = MockReadIO((b"abc", b"d", b"efg"))
+ rawIo = MockIO((b"abc", b"d", b"efg"))
bufIo = io.BufferedReader(rawIo)
self.assertEquals(b"abcdefg", bufIo.read(9000))
def testReadNoArgs(self):
- rawIo = MockReadIO((b"abc", b"d", b"efg"))
+ rawIo = MockIO((b"abc", b"d", b"efg"))
bufIo = io.BufferedReader(rawIo)
self.assertEquals(b"abcdefg", bufIo.read())
def testFileno(self):
- rawIo = MockReadIO((b"abc", b"d", b"efg"))
+ rawIo = MockIO((b"abc", b"d", b"efg"))
bufIo = io.BufferedReader(rawIo)
self.assertEquals(42, bufIo.fileno())
def testFilenoNoFileno(self):
- # TODO will we always have fileno() function? If so, kill
+ # XXX will we always have fileno() function? If so, kill
# this test. Else, write it.
pass
-
class BufferedWriterTest(unittest.TestCase):
-
def testWrite(self):
# Write to the buffered IO but don't overflow the buffer.
- writer = MockWriteIO()
+ writer = MockIO()
bufIo = io.BufferedWriter(writer, 8)
bufIo.write(b"abc")
@@ -179,7 +198,7 @@ class BufferedWriterTest(unittest.TestCase):
self.assertFalse(writer._writeStack)
def testWriteOverflow(self):
- writer = MockWriteIO()
+ writer = MockIO()
bufIo = io.BufferedWriter(writer, 8)
bufIo.write(b"abc")
@@ -187,8 +206,33 @@ class BufferedWriterTest(unittest.TestCase):
self.assertEquals(b"abcdefghijkl", writer._writeStack[0])
+ def testWriteNonBlocking(self):
+ raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
+ bufIo = io.BufferedWriter(raw, 8, 16)
+
+ bufIo.write(b"asdf")
+ bufIo.write(b"asdfa")
+ self.assertEquals(b"asdfasdfa", raw._write_stack[0])
+
+ bufIo.write(b"asdfasdfasdf")
+ self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
+ bufIo.write(b"asdfasdfasdf")
+ self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
+ self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
+
+ bufIo.write(b"asdfasdfasdf")
+
+ # XXX I don't like this test. It relies too heavily on how the algorithm
+ # actually works, which we might change. Refactor later.
+
+ def testFileno(self):
+ rawIo = MockIO((b"abc", b"d", b"efg"))
+ bufIo = io.BufferedWriter(rawIo)
+
+ self.assertEquals(42, bufIo.fileno())
+
def testFlush(self):
- writer = MockWriteIO()
+ writer = MockIO()
bufIo = io.BufferedWriter(writer, 8)
bufIo.write(b"abc")
@@ -196,11 +240,51 @@ class BufferedWriterTest(unittest.TestCase):
self.assertEquals(b"abc", writer._writeStack[0])
-# TODO. Tests for open()
+class BufferedRWPairTest(unittest.TestCase):
+ def testRWPair(self):
+ r = MockIO(())
+ w = MockIO()
+ pair = io.BufferedRWPair(r, w)
+
+ # XXX need implementation
+
+class BufferedRandom(unittest.TestCase):
+ def testReadAndWrite(self):
+ raw = MockIO((b"asdf", b"ghjk"))
+ rw = io.BufferedRandom(raw, 8, 12)
+
+ self.assertEqual(b"as", rw.read(2))
+ rw.write(b"ddd")
+ rw.write(b"eee")
+ self.assertFalse(raw._writeStack) # Buffer writes
+ self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
+ self.assertEquals(b"dddeee", raw._writeStack[0])
+
+ def testSeekAndTell(self):
+ raw = io.BytesIO(b"asdfghjkl")
+ rw = io.BufferedRandom(raw)
+
+ self.assertEquals(b"as", rw.read(2))
+ self.assertEquals(2, rw.tell())
+ rw.seek(0, 0)
+ self.assertEquals(b"asdf", rw.read(4))
+
+ rw.write(b"asdf")
+ rw.seek(0, 0)
+ self.assertEquals(b"asdfasdfl", rw.read())
+ self.assertEquals(9, rw.tell())
+ rw.seek(-4, 2)
+ self.assertEquals(5, rw.tell())
+ rw.seek(2, 1)
+ self.assertEquals(7, rw.tell())
+ self.assertEquals(b"fl", rw.read(11))
+
+# XXX Tests for open()
def test_main():
test_support.run_unittest(IOTest, BytesIOTest, BufferedReaderTest,
- BufferedWriterTest)
+ BufferedWriterTest, BufferedRWPairTest,
+ BufferedRandom)
if __name__ == "__main__":
test_main()