summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
authorSerhiy Storchaka <storchaka@gmail.com>2015-04-10 13:16:16 (GMT)
committerSerhiy Storchaka <storchaka@gmail.com>2015-04-10 13:16:16 (GMT)
commit71fd224af0c8cb15039fce3c8aaeabb7215518df (patch)
treed50c2913a4d14c3ddc9c56775171c380835e2bb5 /Lib/test
parentcd092efb16113db0ac24a44dd5584552cd10a4ca (diff)
downloadcpython-71fd224af0c8cb15039fce3c8aaeabb7215518df.zip
cpython-71fd224af0c8cb15039fce3c8aaeabb7215518df.tar.gz
cpython-71fd224af0c8cb15039fce3c8aaeabb7215518df.tar.bz2
Issue #21859: Added Python implementation of io.FileIO.
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_file_eintr.py40
-rw-r--r--Lib/test/test_fileio.py180
2 files changed, 154 insertions, 66 deletions
diff --git a/Lib/test/test_file_eintr.py b/Lib/test/test_file_eintr.py
index b4e18ce..65a4369 100644
--- a/Lib/test/test_file_eintr.py
+++ b/Lib/test/test_file_eintr.py
@@ -18,11 +18,12 @@ import time
import unittest
# Test import all of the things we're about to try testing up front.
-from _io import FileIO
+import _io
+import _pyio
@unittest.skipUnless(os.name == 'posix', 'tests requires a posix system.')
-class TestFileIOSignalInterrupt(unittest.TestCase):
+class TestFileIOSignalInterrupt:
def setUp(self):
self._process = None
@@ -38,8 +39,9 @@ class TestFileIOSignalInterrupt(unittest.TestCase):
subclasseses should override this to test different IO objects.
"""
- return ('import _io ;'
- 'infile = _io.FileIO(sys.stdin.fileno(), "rb")')
+ return ('import %s as io ;'
+ 'infile = io.FileIO(sys.stdin.fileno(), "rb")' %
+ self.modname)
def fail_with_process_info(self, why, stdout=b'', stderr=b'',
communicate=True):
@@ -179,11 +181,19 @@ class TestFileIOSignalInterrupt(unittest.TestCase):
expected=b'hello\nworld!\n'))
+class CTestFileIOSignalInterrupt(TestFileIOSignalInterrupt, unittest.TestCase):
+ modname = '_io'
+
+class PyTestFileIOSignalInterrupt(TestFileIOSignalInterrupt, unittest.TestCase):
+ modname = '_pyio'
+
+
class TestBufferedIOSignalInterrupt(TestFileIOSignalInterrupt):
def _generate_infile_setup_code(self):
"""Returns the infile = ... line of code to make a BufferedReader."""
- return ('infile = open(sys.stdin.fileno(), "rb") ;'
- 'import _io ;assert isinstance(infile, _io.BufferedReader)')
+ return ('import %s as io ;infile = io.open(sys.stdin.fileno(), "rb") ;'
+ 'assert isinstance(infile, io.BufferedReader)' %
+ self.modname)
def test_readall(self):
"""BufferedReader.read() must handle signals and not lose data."""
@@ -193,12 +203,20 @@ class TestBufferedIOSignalInterrupt(TestFileIOSignalInterrupt):
read_method_name='read',
expected=b'hello\nworld!\n'))
+class CTestBufferedIOSignalInterrupt(TestBufferedIOSignalInterrupt, unittest.TestCase):
+ modname = '_io'
+
+class PyTestBufferedIOSignalInterrupt(TestBufferedIOSignalInterrupt, unittest.TestCase):
+ modname = '_pyio'
+
class TestTextIOSignalInterrupt(TestFileIOSignalInterrupt):
def _generate_infile_setup_code(self):
"""Returns the infile = ... line of code to make a TextIOWrapper."""
- return ('infile = open(sys.stdin.fileno(), "rt", newline=None) ;'
- 'import _io ;assert isinstance(infile, _io.TextIOWrapper)')
+ return ('import %s as io ;'
+ 'infile = io.open(sys.stdin.fileno(), "rt", newline=None) ;'
+ 'assert isinstance(infile, io.TextIOWrapper)' %
+ self.modname)
def test_readline(self):
"""readline() must handle signals and not lose data."""
@@ -224,6 +242,12 @@ class TestTextIOSignalInterrupt(TestFileIOSignalInterrupt):
read_method_name='read',
expected="hello\nworld!\n"))
+class CTestTextIOSignalInterrupt(TestTextIOSignalInterrupt, unittest.TestCase):
+ modname = '_io'
+
+class PyTestTextIOSignalInterrupt(TestTextIOSignalInterrupt, unittest.TestCase):
+ modname = '_pyio'
+
def test_main():
test_cases = [
diff --git a/Lib/test/test_fileio.py b/Lib/test/test_fileio.py
index 743ca5c..35b30a6 100644
--- a/Lib/test/test_fileio.py
+++ b/Lib/test/test_fileio.py
@@ -12,13 +12,15 @@ from functools import wraps
from test.support import TESTFN, check_warnings, run_unittest, make_bad_fd, cpython_only
from collections import UserList
-from _io import FileIO as _FileIO
+import _io # C implementation of io
+import _pyio # Python implementation of io
-class AutoFileTests(unittest.TestCase):
+
+class AutoFileTests:
# file tests for which a test file is automatically set up
def setUp(self):
- self.f = _FileIO(TESTFN, 'w')
+ self.f = self.FileIO(TESTFN, 'w')
def tearDown(self):
if self.f:
@@ -69,20 +71,60 @@ class AutoFileTests(unittest.TestCase):
blksize = getattr(fst, 'st_blksize', blksize)
self.assertEqual(self.f._blksize, blksize)
- def testReadinto(self):
- # verify readinto
- self.f.write(bytes([1, 2]))
+ # verify readinto
+ def testReadintoByteArray(self):
+ self.f.write(bytes([1, 2, 0, 255]))
self.f.close()
- a = array('b', b'x'*10)
- self.f = _FileIO(TESTFN, 'r')
- n = self.f.readinto(a)
- self.assertEqual(array('b', [1, 2]), a[:n])
+
+ ba = bytearray(b'abcdefgh')
+ with self.FileIO(TESTFN, 'r') as f:
+ n = f.readinto(ba)
+ self.assertEqual(ba, b'\x01\x02\x00\xffefgh')
+ self.assertEqual(n, 4)
+
+ def _testReadintoMemoryview(self):
+ self.f.write(bytes([1, 2, 0, 255]))
+ self.f.close()
+
+ m = memoryview(bytearray(b'abcdefgh'))
+ with self.FileIO(TESTFN, 'r') as f:
+ n = f.readinto(m)
+ self.assertEqual(m, b'\x01\x02\x00\xffefgh')
+ self.assertEqual(n, 4)
+
+ m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2])
+ with self.FileIO(TESTFN, 'r') as f:
+ n = f.readinto(m)
+ self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh')
+ self.assertEqual(n, 4)
+
+ def _testReadintoArray(self):
+ self.f.write(bytes([1, 2, 0, 255]))
+ self.f.close()
+
+ a = array('B', b'abcdefgh')
+ with self.FileIO(TESTFN, 'r') as f:
+ n = f.readinto(a)
+ self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104]))
+ self.assertEqual(n, 4)
+
+ a = array('b', b'abcdefgh')
+ with self.FileIO(TESTFN, 'r') as f:
+ n = f.readinto(a)
+ self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104]))
+ self.assertEqual(n, 4)
+
+ a = array('I', b'abcdefgh')
+ with self.FileIO(TESTFN, 'r') as f:
+ n = f.readinto(a)
+ self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh'))
+ self.assertEqual(n, 4)
def testWritelinesList(self):
l = [b'123', b'456']
self.f.writelines(l)
self.f.close()
- self.f = _FileIO(TESTFN, 'rb')
+ self.f = self.FileIO(TESTFN, 'rb')
buf = self.f.read()
self.assertEqual(buf, b'123456')
@@ -90,7 +132,7 @@ class AutoFileTests(unittest.TestCase):
l = UserList([b'123', b'456'])
self.f.writelines(l)
self.f.close()
- self.f = _FileIO(TESTFN, 'rb')
+ self.f = self.FileIO(TESTFN, 'rb')
buf = self.f.read()
self.assertEqual(buf, b'123456')
@@ -102,7 +144,7 @@ class AutoFileTests(unittest.TestCase):
def test_none_args(self):
self.f.write(b"hi\nbye\nabc")
self.f.close()
- self.f = _FileIO(TESTFN, 'r')
+ self.f = self.FileIO(TESTFN, 'r')
self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
self.f.seek(0)
self.assertEqual(self.f.readline(None), b"hi\n")
@@ -112,23 +154,24 @@ class AutoFileTests(unittest.TestCase):
self.assertRaises(TypeError, self.f.write, "Hello!")
def testRepr(self):
- self.assertEqual(
- repr(self.f), "<_io.FileIO name=%r mode=%r closefd=True>"
- % (self.f.name, self.f.mode))
+ self.assertEqual(repr(self.f),
+ "<%s.FileIO name=%r mode=%r closefd=True>" %
+ (self.modulename, self.f.name, self.f.mode))
del self.f.name
- self.assertEqual(
- repr(self.f), "<_io.FileIO fd=%r mode=%r closefd=True>"
- % (self.f.fileno(), self.f.mode))
+ self.assertEqual(repr(self.f),
+ "<%s.FileIO fd=%r mode=%r closefd=True>" %
+ (self.modulename, self.f.fileno(), self.f.mode))
self.f.close()
- self.assertEqual(repr(self.f), "<_io.FileIO [closed]>")
+ self.assertEqual(repr(self.f),
+ "<%s.FileIO [closed]>" % (self.modulename,))
def testReprNoCloseFD(self):
fd = os.open(TESTFN, os.O_RDONLY)
try:
- with _FileIO(fd, 'r', closefd=False) as f:
+ with self.FileIO(fd, 'r', closefd=False) as f:
self.assertEqual(repr(f),
- "<_io.FileIO name=%r mode=%r closefd=False>"
- % (f.name, f.mode))
+ "<%s.FileIO name=%r mode=%r closefd=False>" %
+ (self.modulename, f.name, f.mode))
finally:
os.close(fd)
@@ -140,15 +183,15 @@ class AutoFileTests(unittest.TestCase):
self.assertRaises(ValueError, f.read, 10) # Open for reading
f.close()
self.assertTrue(f.closed)
- f = _FileIO(TESTFN, 'r')
+ f = self.FileIO(TESTFN, 'r')
self.assertRaises(TypeError, f.readinto, "")
self.assertTrue(not f.closed)
f.close()
self.assertTrue(f.closed)
def testMethods(self):
- methods = ['fileno', 'isatty', 'read', 'readinto',
- 'seek', 'tell', 'truncate', 'write', 'seekable',
+ methods = ['fileno', 'isatty', 'read',
+ 'tell', 'truncate', 'seekable',
'readable', 'writable']
self.f.close()
@@ -158,13 +201,16 @@ class AutoFileTests(unittest.TestCase):
method = getattr(self.f, methodname)
# should raise on closed file
self.assertRaises(ValueError, method)
+ self.assertRaises(ValueError, self.f.readinto, bytearray())
+ self.assertRaises(ValueError, self.f.seek, 0, os.SEEK_CUR)
+ self.assertRaises(ValueError, self.f.write, b'')
def testOpendir(self):
# Issue 3703: opening a directory should fill the errno
# Windows always returns "[Errno 13]: Permission denied
# Unix uses fstat and returns "[Errno 21]: Is a directory"
try:
- _FileIO('.', 'r')
+ self.FileIO('.', 'r')
except OSError as e:
self.assertNotEqual(e.errno, 0)
self.assertEqual(e.filename, ".")
@@ -175,7 +221,7 @@ class AutoFileTests(unittest.TestCase):
def testOpenDirFD(self):
fd = os.open('.', os.O_RDONLY)
with self.assertRaises(OSError) as cm:
- _FileIO(fd, 'r')
+ self.FileIO(fd, 'r')
os.close(fd)
self.assertEqual(cm.exception.errno, errno.EISDIR)
@@ -260,7 +306,7 @@ class AutoFileTests(unittest.TestCase):
self.f.close()
except OSError:
pass
- self.f = _FileIO(TESTFN, 'r')
+ self.f = self.FileIO(TESTFN, 'r')
os.close(self.f.fileno())
return self.f
@@ -280,23 +326,32 @@ class AutoFileTests(unittest.TestCase):
a = array('b', b'x'*10)
f.readinto(a)
-class OtherFileTests(unittest.TestCase):
+class CAutoFileTests(AutoFileTests, unittest.TestCase):
+ FileIO = _io.FileIO
+ modulename = '_io'
+
+class PyAutoFileTests(AutoFileTests, unittest.TestCase):
+ FileIO = _pyio.FileIO
+ modulename = '_pyio'
+
+
+class OtherFileTests:
def testAbles(self):
try:
- f = _FileIO(TESTFN, "w")
+ f = self.FileIO(TESTFN, "w")
self.assertEqual(f.readable(), False)
self.assertEqual(f.writable(), True)
self.assertEqual(f.seekable(), True)
f.close()
- f = _FileIO(TESTFN, "r")
+ f = self.FileIO(TESTFN, "r")
self.assertEqual(f.readable(), True)
self.assertEqual(f.writable(), False)
self.assertEqual(f.seekable(), True)
f.close()
- f = _FileIO(TESTFN, "a+")
+ f = self.FileIO(TESTFN, "a+")
self.assertEqual(f.readable(), True)
self.assertEqual(f.writable(), True)
self.assertEqual(f.seekable(), True)
@@ -305,7 +360,7 @@ class OtherFileTests(unittest.TestCase):
if sys.platform != "win32":
try:
- f = _FileIO("/dev/tty", "a")
+ f = self.FileIO("/dev/tty", "a")
except OSError:
# When run in a cron job there just aren't any
# ttys, so skip the test. This also handles other
@@ -328,7 +383,7 @@ class OtherFileTests(unittest.TestCase):
# check invalid mode strings
for mode in ("", "aU", "wU+", "rw", "rt"):
try:
- f = _FileIO(TESTFN, mode)
+ f = self.FileIO(TESTFN, mode)
except ValueError:
pass
else:
@@ -344,7 +399,7 @@ class OtherFileTests(unittest.TestCase):
('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'),
('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]:
# read modes are last so that TESTFN will exist first
- with _FileIO(TESTFN, modes[0]) as f:
+ with self.FileIO(TESTFN, modes[0]) as f:
self.assertEqual(f.mode, modes[1])
finally:
if os.path.exists(TESTFN):
@@ -352,7 +407,7 @@ class OtherFileTests(unittest.TestCase):
def testUnicodeOpen(self):
# verify repr works for unicode too
- f = _FileIO(str(TESTFN), "w")
+ f = self.FileIO(str(TESTFN), "w")
f.close()
os.unlink(TESTFN)
@@ -362,7 +417,7 @@ class OtherFileTests(unittest.TestCase):
fn = TESTFN.encode("ascii")
except UnicodeEncodeError:
self.skipTest('could not encode %r to ascii' % TESTFN)
- f = _FileIO(fn, "w")
+ f = self.FileIO(fn, "w")
try:
f.write(b"abc")
f.close()
@@ -373,28 +428,21 @@ class OtherFileTests(unittest.TestCase):
def testConstructorHandlesNULChars(self):
fn_with_NUL = 'foo\0bar'
- self.assertRaises(ValueError, _FileIO, fn_with_NUL, 'w')
- self.assertRaises(ValueError, _FileIO, bytes(fn_with_NUL, 'ascii'), 'w')
+ self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w')
+ self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w')
def testInvalidFd(self):
- self.assertRaises(ValueError, _FileIO, -10)
- self.assertRaises(OSError, _FileIO, make_bad_fd())
+ self.assertRaises(ValueError, self.FileIO, -10)
+ self.assertRaises(OSError, self.FileIO, make_bad_fd())
if sys.platform == 'win32':
import msvcrt
self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd())
- @cpython_only
- def testInvalidFd_overflow(self):
- # Issue 15989
- import _testcapi
- self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1)
- self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1)
-
def testBadModeArgument(self):
# verify that we get a sensible error message for bad mode argument
bad_mode = "qwerty"
try:
- f = _FileIO(TESTFN, bad_mode)
+ f = self.FileIO(TESTFN, bad_mode)
except ValueError as msg:
if msg.args[0] != 0:
s = str(msg)
@@ -407,7 +455,7 @@ class OtherFileTests(unittest.TestCase):
self.fail("no error for invalid mode: %s" % bad_mode)
def testTruncate(self):
- f = _FileIO(TESTFN, 'w')
+ f = self.FileIO(TESTFN, 'w')
f.write(bytes(bytearray(range(10))))
self.assertEqual(f.tell(), 10)
f.truncate(5)
@@ -422,11 +470,11 @@ class OtherFileTests(unittest.TestCase):
def bug801631():
# SF bug <http://www.python.org/sf/801631>
# "file.truncate fault on windows"
- f = _FileIO(TESTFN, 'w')
+ f = self.FileIO(TESTFN, 'w')
f.write(bytes(range(11)))
f.close()
- f = _FileIO(TESTFN,'r+')
+ f = self.FileIO(TESTFN,'r+')
data = f.read(5)
if data != bytes(range(5)):
self.fail("Read on file opened for update failed %r" % data)
@@ -466,19 +514,19 @@ class OtherFileTests(unittest.TestCase):
pass
def testInvalidInit(self):
- self.assertRaises(TypeError, _FileIO, "1", 0, 0)
+ self.assertRaises(TypeError, self.FileIO, "1", 0, 0)
def testWarnings(self):
with check_warnings(quiet=True) as w:
self.assertEqual(w.warnings, [])
- self.assertRaises(TypeError, _FileIO, [])
+ self.assertRaises(TypeError, self.FileIO, [])
self.assertEqual(w.warnings, [])
- self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt")
+ self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt")
self.assertEqual(w.warnings, [])
def testUnclosedFDOnException(self):
class MyException(Exception): pass
- class MyFileIO(_FileIO):
+ class MyFileIO(self.FileIO):
def __setattr__(self, name, value):
if name == "name":
raise MyException("blocked setting name")
@@ -487,12 +535,28 @@ class OtherFileTests(unittest.TestCase):
self.assertRaises(MyException, MyFileIO, fd)
os.close(fd) # should not raise OSError(EBADF)
+class COtherFileTests(OtherFileTests, unittest.TestCase):
+ FileIO = _io.FileIO
+ modulename = '_io'
+
+ @cpython_only
+ def testInvalidFd_overflow(self):
+ # Issue 15989
+ import _testcapi
+ self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1)
+ self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1)
+
+class PyOtherFileTests(OtherFileTests, unittest.TestCase):
+ FileIO = _pyio.FileIO
+ modulename = '_pyio'
+
def test_main():
# Historically, these tests have been sloppy about removing TESTFN.
# So get rid of it no matter what.
try:
- run_unittest(AutoFileTests, OtherFileTests)
+ run_unittest(CAutoFileTests, PyAutoFileTests,
+ COtherFileTests, PyOtherFileTests)
finally:
if os.path.exists(TESTFN):
os.unlink(TESTFN)