diff options
author | Guido van Rossum <guido@python.org> | 2007-03-08 00:43:48 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 2007-03-08 00:43:48 (GMT) |
commit | a9e2024b8443959cc906958d161afe9f0d08bc25 (patch) | |
tree | e90d1893c99002ba822e9f7e9b4c41ce79923774 /Lib | |
parent | 4d0f5a4934854207948115b14b4643a6cb600a0d (diff) | |
download | cpython-a9e2024b8443959cc906958d161afe9f0d08bc25.zip cpython-a9e2024b8443959cc906958d161afe9f0d08bc25.tar.gz cpython-a9e2024b8443959cc906958d161afe9f0d08bc25.tar.bz2 |
Check in Daniel Stutzbach's _fileio.c and test_fileio.py
(see SF#1671314) with small tweaks.
The io module now uses this instead of its own implementation
of the FileIO class, if it can import _fileio.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/io.py | 14 | ||||
-rw-r--r-- | Lib/test/test_fileio.py | 227 | ||||
-rw-r--r-- | Lib/test/test_io.py | 46 |
3 files changed, 286 insertions, 1 deletions
@@ -177,7 +177,7 @@ class RawIOBase: raise IOError(".fileno() not supported") -class FileIO(RawIOBase): +class _PyFileIO(RawIOBase): """Raw I/O implementation for OS files.""" @@ -243,6 +243,18 @@ class FileIO(RawIOBase): return self._fd +try: + import _fileio +except ImportError: + # Let's use the Python version + FileIO = _PyFileIO +else: + # Create a trivial subclass with the proper inheritance structure + class FileIO(_fileio._FileIO, RawIOBase): + """Raw I/O implementation for OS files.""" + # XXX More docs + + class SocketIO(RawIOBase): """Raw I/O implementation for stream sockets.""" diff --git a/Lib/test/test_fileio.py b/Lib/test/test_fileio.py new file mode 100644 index 0000000..8ebba2b --- /dev/null +++ b/Lib/test/test_fileio.py @@ -0,0 +1,227 @@ +/* Adapted from test_file.py by Daniel Stutzbach */ + +import sys +import os +import unittest +from array import array +from weakref import proxy + +from test.test_support import TESTFN, findfile, run_unittest +from UserList import UserList + +import _fileio + +class AutoFileTests(unittest.TestCase): + # file tests for which a test file is automatically set up + + def setUp(self): + self.f = _fileio._FileIO(TESTFN, 'w') + + def tearDown(self): + if self.f: + self.f.close() + os.remove(TESTFN) + + def testWeakRefs(self): + # verify weak references + p = proxy(self.f) + p.write(bytes(range(10))) + self.assertEquals(self.f.tell(), p.tell()) + self.f.close() + self.f = None + self.assertRaises(ReferenceError, getattr, p, 'tell') + + def testSeekTell(self): + self.f.write(bytes(range(20))) + self.assertEquals(self.f.tell(), 20) + self.f.seek(0) + self.assertEquals(self.f.tell(), 0) + self.f.seek(10) + self.assertEquals(self.f.tell(), 10) + self.f.seek(5, 1) + self.assertEquals(self.f.tell(), 15) + self.f.seek(-5, 1) + self.assertEquals(self.f.tell(), 10) + self.f.seek(-5, 2) + self.assertEquals(self.f.tell(), 15) + + def testAttributes(self): + # verify expected attributes exist + f = self.f + # XXX do we want these? + #f.name # merely shouldn't blow up + #f.mode # ditto + #f.closed # ditto + + # verify the others aren't + for attr in 'name', 'mode', 'closed': + self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops') + + def testReadinto(self): + # verify readinto + self.f.write(bytes([1, 2])) + self.f.close() + a = array('b', 'x'*10) + self.f = _fileio._FileIO(TESTFN, 'r') + n = self.f.readinto(a) + self.assertEquals(array('b', [1, 2]), a[:n]) + + def testRepr(self): + # verify repr works + return # XXX doesn't work yet + self.assert_(repr(self.f).startswith("<open file '" + TESTFN)) + + def testErrors(self): + f = self.f + self.assert_(not f.isatty()) + #self.assert_(not f.closed) # XXX Do we want to support these? + #self.assertEquals(f.name, TESTFN) + + self.assertRaises(TypeError, f.readinto, "") + f.close() + #self.assert_(f.closed) # XXX + + def testMethods(self): + methods = ['fileno', 'isatty', 'read', 'readinto', + 'seek', 'tell', 'truncate', 'write', 'seekable', + 'readable', 'writable'] + if sys.platform.startswith('atheos'): + methods.remove('truncate') + + # __exit__ should close the file + self.f.__exit__(None, None, None) + #self.assert_(self.f.closed) # XXX + + for methodname in methods: + method = getattr(self.f, methodname) + # should raise on closed file + self.assertRaises(ValueError, method) + + # file is closed, __exit__ shouldn't do anything + self.assertEquals(self.f.__exit__(None, None, None), None) + # it must also return None if an exception was given + try: + 1/0 + except: + self.assertEquals(self.f.__exit__(*sys.exc_info()), None) + + +class OtherFileTests(unittest.TestCase): + + def testAbles(self): + try: + f = _fileio._FileIO(TESTFN, "w") + self.assertEquals(f.readable(), False) + self.assertEquals(f.writable(), True) + self.assertEquals(f.seekable(), True) + f.close() + + f = _fileio._FileIO(TESTFN, "r") + self.assertEquals(f.readable(), True) + self.assertEquals(f.writable(), False) + self.assertEquals(f.seekable(), True) + f.close() + + f = _fileio._FileIO(TESTFN, "a+") + self.assertEquals(f.readable(), True) + self.assertEquals(f.writable(), True) + self.assertEquals(f.seekable(), True) + self.assertEquals(f.isatty(), False) + f.close() + + f = _fileio._FileIO("/dev/tty", "a") # XXX, won't work on e.g., Windows + self.assertEquals(f.readable(), False) + self.assertEquals(f.writable(), True) + ##self.assertEquals(f.seekable(), False) # XXX True on OSX!? + self.assertEquals(f.isatty(), True) + f.close() + finally: + os.unlink(TESTFN) + + def testModeStrings(self): + # check invalid mode strings + for mode in ("", "aU", "wU+", "rb", "rt"): + try: + f = _fileio._FileIO(TESTFN, mode) + except ValueError: + pass + else: + f.close() + self.fail('%r is an invalid file mode' % mode) + + def testStdin(self): + ## This causes the interpreter to exit on OSF1 v5.1. + #if sys.platform != 'osf1V5': + # self.assertRaises(IOError, sys.stdin.seek, -1) + #else: + # print(( + # ' Skipping sys.stdin.seek(-1), it may crash the interpreter.' + # ' Test manually.'), file=sys.__stdout__) + #self.assertRaises(IOError, sys.stdin.truncate) + # XXX Comment this out since sys.stdin is currently an old style file + pass + + def testUnicodeOpen(self): + # verify repr works for unicode too + f = _fileio._FileIO(unicode(TESTFN), "w") + # XXX doesn't work yet: + ##self.assert_(repr(f).startswith("<open file u'" + TESTFN)) + f.close() + os.unlink(TESTFN) + + def testBadModeArgument(self): + # verify that we get a sensible error message for bad mode argument + bad_mode = "qwerty" + try: + f = _fileio._FileIO(TESTFN, bad_mode) + except ValueError as msg: + if msg.message != 0: + s = str(msg) + if s.find(TESTFN) != -1 or s.find(bad_mode) == -1: + self.fail("bad error message for invalid mode: %s" % s) + # if msg[0] == 0, we're probably on Windows where there may be + # no obvious way to discover why open() failed. + else: + f.close() + self.fail("no error for invalid mode: %s" % bad_mode) + + def testTruncateOnWindows(self): + def bug801631(): + # SF bug <http://www.python.org/sf/801631> + # "file.truncate fault on windows" + f = _fileio._FileIO(TESTFN, 'w') + f.write(bytes(range(11))) + f.close() + + f = _fileio._FileIO(TESTFN,'r+') + data = f.read(5) + if data != bytes(range(5)): + self.fail("Read on file opened for update failed %r" % data) + if f.tell() != 5: + self.fail("File pos after read wrong %d" % f.tell()) + + f.truncate() + if f.tell() != 5: + self.fail("File pos after ftruncate wrong %d" % f.tell()) + + f.close() + size = os.path.getsize(TESTFN) + if size != 5: + self.fail("File size after ftruncate wrong %d" % size) + + try: + bug801631() + finally: + os.unlink(TESTFN) + +def test_main(): + # Historically, these tests have been sloppy about removing TESTFN. + # So get rid of it no matter what. + try: + run_unittest(AutoFileTests, OtherFileTests) + finally: + if os.path.exists(TESTFN): + os.unlink(TESTFN) + +if __name__ == '__main__': + test_main() diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 193130e..956a502 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -5,7 +5,9 @@ from test import test_support import io + class MockIO(io.RawIOBase): + def __init__(self, readStack=()): self._readStack = list(readStack) self._writeStack = [] @@ -38,10 +40,13 @@ class MockIO(io.RawIOBase): def tell(self): return 42 + class MockNonBlockWriterIO(io.RawIOBase): + def __init__(self, blockingScript): self.bs = list(blockingScript) self._write_stack = [] + def write(self, b): self._write_stack.append(b) n = self.bs.pop(0) @@ -49,9 +54,11 @@ class MockNonBlockWriterIO(io.RawIOBase): raise io.BlockingIO(0, "test blocking", -n) else: return n + def writable(self): return True + class IOTest(unittest.TestCase): def tearDown(self): @@ -110,7 +117,38 @@ class IOTest(unittest.TestCase): f = io.BytesIO(data) self.read_ops(f) + def test_fileio_FileIO(self): + import _fileio + f = _fileio._FileIO(test_support.TESTFN, "w") + self.assertEqual(f.readable(), False) + self.assertEqual(f.writable(), True) + self.assertEqual(f.seekable(), True) + self.write_ops(f) + f.close() + f = _fileio._FileIO(test_support.TESTFN, "r") + self.assertEqual(f.readable(), True) + self.assertEqual(f.writable(), False) + self.assertEqual(f.seekable(), True) + self.read_ops(f) + f.close() + + def test_PyFileIO(self): + f = io._PyFileIO(test_support.TESTFN, "w") + self.assertEqual(f.readable(), False) + self.assertEqual(f.writable(), True) + self.assertEqual(f.seekable(), True) + self.write_ops(f) + f.close() + f = io._PyFileIO(test_support.TESTFN, "r") + self.assertEqual(f.readable(), True) + self.assertEqual(f.writable(), False) + self.assertEqual(f.seekable(), True) + self.read_ops(f) + f.close() + + class BytesIOTest(unittest.TestCase): + def testInit(self): buf = b"1234567890" bytesIo = io.BytesIO(buf) @@ -152,7 +190,9 @@ class BytesIOTest(unittest.TestCase): bytesIo.seek(10000) self.assertEquals(10000, bytesIo.tell()) + class BufferedReaderTest(unittest.TestCase): + def testRead(self): rawIo = MockIO((b"abc", b"d", b"efg")) bufIo = io.BufferedReader(rawIo) @@ -193,7 +233,9 @@ class BufferedReaderTest(unittest.TestCase): # this test. Else, write it. pass + class BufferedWriterTest(unittest.TestCase): + def testWrite(self): # Write to the buffered IO but don't overflow the buffer. writer = MockIO() @@ -246,7 +288,9 @@ class BufferedWriterTest(unittest.TestCase): self.assertEquals(b"abc", writer._writeStack[0]) + class BufferedRWPairTest(unittest.TestCase): + def testRWPair(self): r = MockIO(()) w = MockIO() @@ -254,7 +298,9 @@ class BufferedRWPairTest(unittest.TestCase): # XXX need implementation + class BufferedRandom(unittest.TestCase): + def testReadAndWrite(self): raw = MockIO((b"asdf", b"ghjk")) rw = io.BufferedRandom(raw, 8, 12) |