summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/_pyio.py344
-rw-r--r--Lib/test/test_file_eintr.py40
-rw-r--r--Lib/test/test_fileio.py180
-rw-r--r--Misc/NEWS2
4 files changed, 500 insertions, 66 deletions
diff --git a/Lib/_pyio.py b/Lib/_pyio.py
index 1df44cc..400a56a 100644
--- a/Lib/_pyio.py
+++ b/Lib/_pyio.py
@@ -7,11 +7,16 @@ import abc
import codecs
import errno
import array
+import stat
# Import _thread instead of threading to reduce startup cost
try:
from _thread import allocate_lock as Lock
except ImportError:
from _dummy_thread import allocate_lock as Lock
+if os.name == 'win32':
+ from msvcrt import setmode as _setmode
+else:
+ _setmode = None
import io
from io import (__all__, SEEK_SET, SEEK_CUR, SEEK_END)
@@ -1378,6 +1383,345 @@ class BufferedRandom(BufferedWriter, BufferedReader):
return BufferedWriter.write(self, b)
+class FileIO(RawIOBase):
+ _fd = -1
+ _created = False
+ _readable = False
+ _writable = False
+ _appending = False
+ _seekable = None
+ _closefd = True
+
+ def __init__(self, file, mode='r', closefd=True, opener=None):
+ """Open a file. The mode can be 'r' (default), 'w', 'x' or 'a' for reading,
+ writing, exclusive creation or appending. The file will be created if it
+ doesn't exist when opened for writing or appending; it will be truncated
+ when opened for writing. A FileExistsError will be raised if it already
+ exists when opened for creating. Opening a file for creating implies
+ writing so this mode behaves in a similar way to 'w'. Add a '+' to the mode
+ to allow simultaneous reading and writing. A custom opener can be used by
+ passing a callable as *opener*. The underlying file descriptor for the file
+ object is then obtained by calling opener with (*name*, *flags*).
+ *opener* must return an open file descriptor (passing os.open as *opener*
+ results in functionality similar to passing None).
+ """
+ if self._fd >= 0:
+ # Have to close the existing file first.
+ try:
+ if self._closefd:
+ os.close(self._fd)
+ finally:
+ self._fd = -1
+
+ if isinstance(file, float):
+ raise TypeError('integer argument expected, got float')
+ if isinstance(file, int):
+ fd = file
+ if fd < 0:
+ raise ValueError('negative file descriptor')
+ else:
+ fd = -1
+
+ if not isinstance(mode, str):
+ raise TypeError('invalid mode: %s' % (mode,))
+ if not set(mode) <= set('xrwab+'):
+ raise ValueError('invalid mode: %s' % (mode,))
+ if sum(c in 'rwax' for c in mode) != 1 or mode.count('+') > 1:
+ raise ValueError('Must have exactly one of create/read/write/append '
+ 'mode and at most one plus')
+
+ if 'x' in mode:
+ self._created = True
+ self._writable = True
+ flags = os.O_EXCL | os.O_CREAT
+ elif 'r' in mode:
+ self._readable = True
+ flags = 0
+ elif 'w' in mode:
+ self._writable = True
+ flags = os.O_CREAT | os.O_TRUNC
+ elif 'a' in mode:
+ self._writable = True
+ self._appending = True
+ flags = os.O_APPEND | os.O_CREAT
+
+ if '+' in mode:
+ self._readable = True
+ self._writable = True
+
+ if self._readable and self._writable:
+ flags |= os.O_RDWR
+ elif self._readable:
+ flags |= os.O_RDONLY
+ else:
+ flags |= os.O_WRONLY
+
+ flags |= getattr(os, 'O_BINARY', 0)
+
+ noinherit_flag = (getattr(os, 'O_NOINHERIT', 0) or
+ getattr(os, 'O_CLOEXEC', 0))
+ flags |= noinherit_flag
+
+ owned_fd = None
+ try:
+ if fd < 0:
+ if not closefd:
+ raise ValueError('Cannot use closefd=False with file name')
+ if opener is None:
+ fd = os.open(file, flags, 0o666)
+ else:
+ fd = opener(file, flags)
+ if not isinstance(fd, int):
+ raise TypeError('expected integer from opener')
+ if fd < 0:
+ raise OSError('Negative file descriptor')
+ owned_fd = fd
+ if not noinherit_flag:
+ os.set_inheritable(fd, False)
+
+ self._closefd = closefd
+ fdfstat = os.fstat(fd)
+ try:
+ if stat.S_ISDIR(fdfstat.st_mode):
+ raise IsADirectoryError(errno.EISDIR,
+ os.strerror(errno.EISDIR), file)
+ except AttributeError:
+ # Ignore the AttribueError if stat.S_ISDIR or errno.EISDIR
+ # don't exist.
+ pass
+ self._blksize = getattr(fdfstat, 'st_blksize', 0)
+ if self._blksize <= 1:
+ self._blksize = DEFAULT_BUFFER_SIZE
+
+ if _setmode:
+ # don't translate newlines (\r\n <=> \n)
+ _setmode(fd, os.O_BINARY)
+
+ self.name = file
+ if self._appending:
+ # For consistent behaviour, we explicitly seek to the
+ # end of file (otherwise, it might be done only on the
+ # first write()).
+ os.lseek(fd, 0, SEEK_END)
+ except:
+ if owned_fd is not None:
+ os.close(owned_fd)
+ raise
+ self._fd = fd
+
+ def __del__(self):
+ if self._fd >= 0 and self._closefd and not self.closed:
+ import warnings
+ warnings.warn('unclosed file %r' % (self,), ResourceWarning,
+ stacklevel=2)
+ self.close()
+
+ def __getstate__(self):
+ raise TypeError("cannot serialize '%s' object", self.__class__.__name__)
+
+ def __repr__(self):
+ class_name = '%s.%s' % (self.__class__.__module__,
+ self.__class__.__qualname__)
+ if self.closed:
+ return '<%s [closed]>' % class_name
+ try:
+ name = self.name
+ except AttributeError:
+ return ('<%s fd=%d mode=%r closefd=%r>' %
+ (class_name, self._fd, self.mode, self._closefd))
+ else:
+ return ('<%s name=%r mode=%r closefd=%r>' %
+ (class_name, name, self.mode, self._closefd))
+
+ def _checkReadable(self):
+ if not self._readable:
+ raise UnsupportedOperation('File not open for reading')
+
+ def _checkWritable(self, msg=None):
+ if not self._writable:
+ raise UnsupportedOperation('File not open for writing')
+
+ def read(self, size=None):
+ """Read at most size bytes, returned as bytes.
+
+ Only makes one system call, so less data may be returned than requested
+ In non-blocking mode, returns None if no data is available.
+ Return an empty bytes object at EOF.
+ """
+ self._checkClosed()
+ self._checkReadable()
+ if size is None or size < 0:
+ return self.readall()
+ try:
+ return os.read(self._fd, size)
+ except BlockingIOError:
+ return None
+
+ def readall(self):
+ """Read all data from the file, returned as bytes.
+
+ In non-blocking mode, returns as much as is immediately available,
+ or None if no data is available. Return an empty bytes object at EOF.
+ """
+ self._checkClosed()
+ self._checkReadable()
+ bufsize = DEFAULT_BUFFER_SIZE
+ try:
+ pos = os.lseek(self._fd, 0, SEEK_CUR)
+ end = os.fstat(self._fd).st_size
+ if end >= pos:
+ bufsize = end - pos + 1
+ except OSError:
+ pass
+
+ result = bytearray()
+ while True:
+ if len(result) >= bufsize:
+ bufsize = len(result)
+ bufsize += max(bufsize, DEFAULT_BUFFER_SIZE)
+ n = bufsize - len(result)
+ try:
+ chunk = os.read(self._fd, n)
+ except BlockingIOError:
+ if result:
+ break
+ return None
+ if not chunk: # reached the end of the file
+ break
+ result += chunk
+
+ return bytes(result)
+
+ def readinto(self, b):
+ """Same as RawIOBase.readinto()."""
+ m = memoryview(b).cast('B')
+ data = self.read(len(m))
+ n = len(data)
+ m[:n] = data
+ return n
+
+ def write(self, b):
+ """Write bytes b to file, return number written.
+
+ Only makes one system call, so not all of the data may be written.
+ The number of bytes actually written is returned. In non-blocking mode,
+ returns None if the write would block.
+ """
+ self._checkClosed()
+ self._checkWritable()
+ try:
+ return os.write(self._fd, b)
+ except BlockingIOError:
+ return None
+
+ def seek(self, pos, whence=SEEK_SET):
+ """Move to new file position.
+
+ Argument offset is a byte count. Optional argument whence defaults to
+ SEEK_SET or 0 (offset from start of file, offset should be >= 0); other values
+ are SEEK_CUR or 1 (move relative to current position, positive or negative),
+ and SEEK_END or 2 (move relative to end of file, usually negative, although
+ many platforms allow seeking beyond the end of a file).
+
+ Note that not all file objects are seekable.
+ """
+ if isinstance(pos, float):
+ raise TypeError('an integer is required')
+ self._checkClosed()
+ return os.lseek(self._fd, pos, whence)
+
+ def tell(self):
+ """tell() -> int. Current file position.
+
+ Can raise OSError for non seekable files."""
+ self._checkClosed()
+ return os.lseek(self._fd, 0, SEEK_CUR)
+
+ def truncate(self, size=None):
+ """Truncate the file to at most size bytes.
+
+ Size defaults to the current file position, as returned by tell().
+ The current file position is changed to the value of size.
+ """
+ self._checkClosed()
+ self._checkWritable()
+ if size is None:
+ size = self.tell()
+ os.ftruncate(self._fd, size)
+ return size
+
+ def close(self):
+ """Close the file.
+
+ A closed file cannot be used for further I/O operations. close() may be
+ called more than once without error.
+ """
+ if not self.closed:
+ try:
+ if self._closefd:
+ os.close(self._fd)
+ finally:
+ super().close()
+
+ def seekable(self):
+ """True if file supports random-access."""
+ self._checkClosed()
+ if self._seekable is None:
+ try:
+ self.tell()
+ except OSError:
+ self._seekable = False
+ else:
+ self._seekable = True
+ return self._seekable
+
+ def readable(self):
+ """True if file was opened in a read mode."""
+ self._checkClosed()
+ return self._readable
+
+ def writable(self):
+ """True if file was opened in a write mode."""
+ self._checkClosed()
+ return self._writable
+
+ def fileno(self):
+ """Return the underlying file descriptor (an integer)."""
+ self._checkClosed()
+ return self._fd
+
+ def isatty(self):
+ """True if the file is connected to a TTY device."""
+ self._checkClosed()
+ return os.isatty(self._fd)
+
+ @property
+ def closefd(self):
+ """True if the file descriptor will be closed by close()."""
+ return self._closefd
+
+ @property
+ def mode(self):
+ """String giving the file mode"""
+ if self._created:
+ if self._readable:
+ return 'xb+'
+ else:
+ return 'xb'
+ elif self._appending:
+ if self._readable:
+ return 'ab+'
+ else:
+ return 'ab'
+ elif self._readable:
+ if self._writable:
+ return 'rb+'
+ else:
+ return 'rb'
+ else:
+ return 'wb'
+
+
class TextIOBase(IOBase):
"""Base class for text I/O.
diff --git a/Lib/test/test_file_eintr.py b/Lib/test/test_file_eintr.py
index b4e18ce..65a4369 100644
--- a/Lib/test/test_file_eintr.py
+++ b/Lib/test/test_file_eintr.py
@@ -18,11 +18,12 @@ import time
import unittest
# Test import all of the things we're about to try testing up front.
-from _io import FileIO
+import _io
+import _pyio
@unittest.skipUnless(os.name == 'posix', 'tests requires a posix system.')
-class TestFileIOSignalInterrupt(unittest.TestCase):
+class TestFileIOSignalInterrupt:
def setUp(self):
self._process = None
@@ -38,8 +39,9 @@ class TestFileIOSignalInterrupt(unittest.TestCase):
subclasseses should override this to test different IO objects.
"""
- return ('import _io ;'
- 'infile = _io.FileIO(sys.stdin.fileno(), "rb")')
+ return ('import %s as io ;'
+ 'infile = io.FileIO(sys.stdin.fileno(), "rb")' %
+ self.modname)
def fail_with_process_info(self, why, stdout=b'', stderr=b'',
communicate=True):
@@ -179,11 +181,19 @@ class TestFileIOSignalInterrupt(unittest.TestCase):
expected=b'hello\nworld!\n'))
+class CTestFileIOSignalInterrupt(TestFileIOSignalInterrupt, unittest.TestCase):
+ modname = '_io'
+
+class PyTestFileIOSignalInterrupt(TestFileIOSignalInterrupt, unittest.TestCase):
+ modname = '_pyio'
+
+
class TestBufferedIOSignalInterrupt(TestFileIOSignalInterrupt):
def _generate_infile_setup_code(self):
"""Returns the infile = ... line of code to make a BufferedReader."""
- return ('infile = open(sys.stdin.fileno(), "rb") ;'
- 'import _io ;assert isinstance(infile, _io.BufferedReader)')
+ return ('import %s as io ;infile = io.open(sys.stdin.fileno(), "rb") ;'
+ 'assert isinstance(infile, io.BufferedReader)' %
+ self.modname)
def test_readall(self):
"""BufferedReader.read() must handle signals and not lose data."""
@@ -193,12 +203,20 @@ class TestBufferedIOSignalInterrupt(TestFileIOSignalInterrupt):
read_method_name='read',
expected=b'hello\nworld!\n'))
+class CTestBufferedIOSignalInterrupt(TestBufferedIOSignalInterrupt, unittest.TestCase):
+ modname = '_io'
+
+class PyTestBufferedIOSignalInterrupt(TestBufferedIOSignalInterrupt, unittest.TestCase):
+ modname = '_pyio'
+
class TestTextIOSignalInterrupt(TestFileIOSignalInterrupt):
def _generate_infile_setup_code(self):
"""Returns the infile = ... line of code to make a TextIOWrapper."""
- return ('infile = open(sys.stdin.fileno(), "rt", newline=None) ;'
- 'import _io ;assert isinstance(infile, _io.TextIOWrapper)')
+ return ('import %s as io ;'
+ 'infile = io.open(sys.stdin.fileno(), "rt", newline=None) ;'
+ 'assert isinstance(infile, io.TextIOWrapper)' %
+ self.modname)
def test_readline(self):
"""readline() must handle signals and not lose data."""
@@ -224,6 +242,12 @@ class TestTextIOSignalInterrupt(TestFileIOSignalInterrupt):
read_method_name='read',
expected="hello\nworld!\n"))
+class CTestTextIOSignalInterrupt(TestTextIOSignalInterrupt, unittest.TestCase):
+ modname = '_io'
+
+class PyTestTextIOSignalInterrupt(TestTextIOSignalInterrupt, unittest.TestCase):
+ modname = '_pyio'
+
def test_main():
test_cases = [
diff --git a/Lib/test/test_fileio.py b/Lib/test/test_fileio.py
index 743ca5c..35b30a6 100644
--- a/Lib/test/test_fileio.py
+++ b/Lib/test/test_fileio.py
@@ -12,13 +12,15 @@ from functools import wraps
from test.support import TESTFN, check_warnings, run_unittest, make_bad_fd, cpython_only
from collections import UserList
-from _io import FileIO as _FileIO
+import _io # C implementation of io
+import _pyio # Python implementation of io
-class AutoFileTests(unittest.TestCase):
+
+class AutoFileTests:
# file tests for which a test file is automatically set up
def setUp(self):
- self.f = _FileIO(TESTFN, 'w')
+ self.f = self.FileIO(TESTFN, 'w')
def tearDown(self):
if self.f:
@@ -69,20 +71,60 @@ class AutoFileTests(unittest.TestCase):
blksize = getattr(fst, 'st_blksize', blksize)
self.assertEqual(self.f._blksize, blksize)
- def testReadinto(self):
- # verify readinto
- self.f.write(bytes([1, 2]))
+ # verify readinto
+ def testReadintoByteArray(self):
+ self.f.write(bytes([1, 2, 0, 255]))
self.f.close()
- a = array('b', b'x'*10)
- self.f = _FileIO(TESTFN, 'r')
- n = self.f.readinto(a)
- self.assertEqual(array('b', [1, 2]), a[:n])
+
+ ba = bytearray(b'abcdefgh')
+ with self.FileIO(TESTFN, 'r') as f:
+ n = f.readinto(ba)
+ self.assertEqual(ba, b'\x01\x02\x00\xffefgh')
+ self.assertEqual(n, 4)
+
+ def _testReadintoMemoryview(self):
+ self.f.write(bytes([1, 2, 0, 255]))
+ self.f.close()
+
+ m = memoryview(bytearray(b'abcdefgh'))
+ with self.FileIO(TESTFN, 'r') as f:
+ n = f.readinto(m)
+ self.assertEqual(m, b'\x01\x02\x00\xffefgh')
+ self.assertEqual(n, 4)
+
+ m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2])
+ with self.FileIO(TESTFN, 'r') as f:
+ n = f.readinto(m)
+ self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh')
+ self.assertEqual(n, 4)
+
+ def _testReadintoArray(self):
+ self.f.write(bytes([1, 2, 0, 255]))
+ self.f.close()
+
+ a = array('B', b'abcdefgh')
+ with self.FileIO(TESTFN, 'r') as f:
+ n = f.readinto(a)
+ self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104]))
+ self.assertEqual(n, 4)
+
+ a = array('b', b'abcdefgh')
+ with self.FileIO(TESTFN, 'r') as f:
+ n = f.readinto(a)
+ self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104]))
+ self.assertEqual(n, 4)
+
+ a = array('I', b'abcdefgh')
+ with self.FileIO(TESTFN, 'r') as f:
+ n = f.readinto(a)
+ self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh'))
+ self.assertEqual(n, 4)
def testWritelinesList(self):
l = [b'123', b'456']
self.f.writelines(l)
self.f.close()
- self.f = _FileIO(TESTFN, 'rb')
+ self.f = self.FileIO(TESTFN, 'rb')
buf = self.f.read()
self.assertEqual(buf, b'123456')
@@ -90,7 +132,7 @@ class AutoFileTests(unittest.TestCase):
l = UserList([b'123', b'456'])
self.f.writelines(l)
self.f.close()
- self.f = _FileIO(TESTFN, 'rb')
+ self.f = self.FileIO(TESTFN, 'rb')
buf = self.f.read()
self.assertEqual(buf, b'123456')
@@ -102,7 +144,7 @@ class AutoFileTests(unittest.TestCase):
def test_none_args(self):
self.f.write(b"hi\nbye\nabc")
self.f.close()
- self.f = _FileIO(TESTFN, 'r')
+ self.f = self.FileIO(TESTFN, 'r')
self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
self.f.seek(0)
self.assertEqual(self.f.readline(None), b"hi\n")
@@ -112,23 +154,24 @@ class AutoFileTests(unittest.TestCase):
self.assertRaises(TypeError, self.f.write, "Hello!")
def testRepr(self):
- self.assertEqual(
- repr(self.f), "<_io.FileIO name=%r mode=%r closefd=True>"
- % (self.f.name, self.f.mode))
+ self.assertEqual(repr(self.f),
+ "<%s.FileIO name=%r mode=%r closefd=True>" %
+ (self.modulename, self.f.name, self.f.mode))
del self.f.name
- self.assertEqual(
- repr(self.f), "<_io.FileIO fd=%r mode=%r closefd=True>"
- % (self.f.fileno(), self.f.mode))
+ self.assertEqual(repr(self.f),
+ "<%s.FileIO fd=%r mode=%r closefd=True>" %
+ (self.modulename, self.f.fileno(), self.f.mode))
self.f.close()
- self.assertEqual(repr(self.f), "<_io.FileIO [closed]>")
+ self.assertEqual(repr(self.f),
+ "<%s.FileIO [closed]>" % (self.modulename,))
def testReprNoCloseFD(self):
fd = os.open(TESTFN, os.O_RDONLY)
try:
- with _FileIO(fd, 'r', closefd=False) as f:
+ with self.FileIO(fd, 'r', closefd=False) as f:
self.assertEqual(repr(f),
- "<_io.FileIO name=%r mode=%r closefd=False>"
- % (f.name, f.mode))
+ "<%s.FileIO name=%r mode=%r closefd=False>" %
+ (self.modulename, f.name, f.mode))
finally:
os.close(fd)
@@ -140,15 +183,15 @@ class AutoFileTests(unittest.TestCase):
self.assertRaises(ValueError, f.read, 10) # Open for reading
f.close()
self.assertTrue(f.closed)
- f = _FileIO(TESTFN, 'r')
+ f = self.FileIO(TESTFN, 'r')
self.assertRaises(TypeError, f.readinto, "")
self.assertTrue(not f.closed)
f.close()
self.assertTrue(f.closed)
def testMethods(self):
- methods = ['fileno', 'isatty', 'read', 'readinto',
- 'seek', 'tell', 'truncate', 'write', 'seekable',
+ methods = ['fileno', 'isatty', 'read',
+ 'tell', 'truncate', 'seekable',
'readable', 'writable']
self.f.close()
@@ -158,13 +201,16 @@ class AutoFileTests(unittest.TestCase):
method = getattr(self.f, methodname)
# should raise on closed file
self.assertRaises(ValueError, method)
+ self.assertRaises(ValueError, self.f.readinto, bytearray())
+ self.assertRaises(ValueError, self.f.seek, 0, os.SEEK_CUR)
+ self.assertRaises(ValueError, self.f.write, b'')
def testOpendir(self):
# Issue 3703: opening a directory should fill the errno
# Windows always returns "[Errno 13]: Permission denied
# Unix uses fstat and returns "[Errno 21]: Is a directory"
try:
- _FileIO('.', 'r')
+ self.FileIO('.', 'r')
except OSError as e:
self.assertNotEqual(e.errno, 0)
self.assertEqual(e.filename, ".")
@@ -175,7 +221,7 @@ class AutoFileTests(unittest.TestCase):
def testOpenDirFD(self):
fd = os.open('.', os.O_RDONLY)
with self.assertRaises(OSError) as cm:
- _FileIO(fd, 'r')
+ self.FileIO(fd, 'r')
os.close(fd)
self.assertEqual(cm.exception.errno, errno.EISDIR)
@@ -260,7 +306,7 @@ class AutoFileTests(unittest.TestCase):
self.f.close()
except OSError:
pass
- self.f = _FileIO(TESTFN, 'r')
+ self.f = self.FileIO(TESTFN, 'r')
os.close(self.f.fileno())
return self.f
@@ -280,23 +326,32 @@ class AutoFileTests(unittest.TestCase):
a = array('b', b'x'*10)
f.readinto(a)
-class OtherFileTests(unittest.TestCase):
+class CAutoFileTests(AutoFileTests, unittest.TestCase):
+ FileIO = _io.FileIO
+ modulename = '_io'
+
+class PyAutoFileTests(AutoFileTests, unittest.TestCase):
+ FileIO = _pyio.FileIO
+ modulename = '_pyio'
+
+
+class OtherFileTests:
def testAbles(self):
try:
- f = _FileIO(TESTFN, "w")
+ f = self.FileIO(TESTFN, "w")
self.assertEqual(f.readable(), False)
self.assertEqual(f.writable(), True)
self.assertEqual(f.seekable(), True)
f.close()
- f = _FileIO(TESTFN, "r")
+ f = self.FileIO(TESTFN, "r")
self.assertEqual(f.readable(), True)
self.assertEqual(f.writable(), False)
self.assertEqual(f.seekable(), True)
f.close()
- f = _FileIO(TESTFN, "a+")
+ f = self.FileIO(TESTFN, "a+")
self.assertEqual(f.readable(), True)
self.assertEqual(f.writable(), True)
self.assertEqual(f.seekable(), True)
@@ -305,7 +360,7 @@ class OtherFileTests(unittest.TestCase):
if sys.platform != "win32":
try:
- f = _FileIO("/dev/tty", "a")
+ f = self.FileIO("/dev/tty", "a")
except OSError:
# When run in a cron job there just aren't any
# ttys, so skip the test. This also handles other
@@ -328,7 +383,7 @@ class OtherFileTests(unittest.TestCase):
# check invalid mode strings
for mode in ("", "aU", "wU+", "rw", "rt"):
try:
- f = _FileIO(TESTFN, mode)
+ f = self.FileIO(TESTFN, mode)
except ValueError:
pass
else:
@@ -344,7 +399,7 @@ class OtherFileTests(unittest.TestCase):
('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'),
('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]:
# read modes are last so that TESTFN will exist first
- with _FileIO(TESTFN, modes[0]) as f:
+ with self.FileIO(TESTFN, modes[0]) as f:
self.assertEqual(f.mode, modes[1])
finally:
if os.path.exists(TESTFN):
@@ -352,7 +407,7 @@ class OtherFileTests(unittest.TestCase):
def testUnicodeOpen(self):
# verify repr works for unicode too
- f = _FileIO(str(TESTFN), "w")
+ f = self.FileIO(str(TESTFN), "w")
f.close()
os.unlink(TESTFN)
@@ -362,7 +417,7 @@ class OtherFileTests(unittest.TestCase):
fn = TESTFN.encode("ascii")
except UnicodeEncodeError:
self.skipTest('could not encode %r to ascii' % TESTFN)
- f = _FileIO(fn, "w")
+ f = self.FileIO(fn, "w")
try:
f.write(b"abc")
f.close()
@@ -373,28 +428,21 @@ class OtherFileTests(unittest.TestCase):
def testConstructorHandlesNULChars(self):
fn_with_NUL = 'foo\0bar'
- self.assertRaises(ValueError, _FileIO, fn_with_NUL, 'w')
- self.assertRaises(ValueError, _FileIO, bytes(fn_with_NUL, 'ascii'), 'w')
+ self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w')
+ self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w')
def testInvalidFd(self):
- self.assertRaises(ValueError, _FileIO, -10)
- self.assertRaises(OSError, _FileIO, make_bad_fd())
+ self.assertRaises(ValueError, self.FileIO, -10)
+ self.assertRaises(OSError, self.FileIO, make_bad_fd())
if sys.platform == 'win32':
import msvcrt
self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd())
- @cpython_only
- def testInvalidFd_overflow(self):
- # Issue 15989
- import _testcapi
- self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1)
- self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1)
-
def testBadModeArgument(self):
# verify that we get a sensible error message for bad mode argument
bad_mode = "qwerty"
try:
- f = _FileIO(TESTFN, bad_mode)
+ f = self.FileIO(TESTFN, bad_mode)
except ValueError as msg:
if msg.args[0] != 0:
s = str(msg)
@@ -407,7 +455,7 @@ class OtherFileTests(unittest.TestCase):
self.fail("no error for invalid mode: %s" % bad_mode)
def testTruncate(self):
- f = _FileIO(TESTFN, 'w')
+ f = self.FileIO(TESTFN, 'w')
f.write(bytes(bytearray(range(10))))
self.assertEqual(f.tell(), 10)
f.truncate(5)
@@ -422,11 +470,11 @@ class OtherFileTests(unittest.TestCase):
def bug801631():
# SF bug <http://www.python.org/sf/801631>
# "file.truncate fault on windows"
- f = _FileIO(TESTFN, 'w')
+ f = self.FileIO(TESTFN, 'w')
f.write(bytes(range(11)))
f.close()
- f = _FileIO(TESTFN,'r+')
+ f = self.FileIO(TESTFN,'r+')
data = f.read(5)
if data != bytes(range(5)):
self.fail("Read on file opened for update failed %r" % data)
@@ -466,19 +514,19 @@ class OtherFileTests(unittest.TestCase):
pass
def testInvalidInit(self):
- self.assertRaises(TypeError, _FileIO, "1", 0, 0)
+ self.assertRaises(TypeError, self.FileIO, "1", 0, 0)
def testWarnings(self):
with check_warnings(quiet=True) as w:
self.assertEqual(w.warnings, [])
- self.assertRaises(TypeError, _FileIO, [])
+ self.assertRaises(TypeError, self.FileIO, [])
self.assertEqual(w.warnings, [])
- self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt")
+ self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt")
self.assertEqual(w.warnings, [])
def testUnclosedFDOnException(self):
class MyException(Exception): pass
- class MyFileIO(_FileIO):
+ class MyFileIO(self.FileIO):
def __setattr__(self, name, value):
if name == "name":
raise MyException("blocked setting name")
@@ -487,12 +535,28 @@ class OtherFileTests(unittest.TestCase):
self.assertRaises(MyException, MyFileIO, fd)
os.close(fd) # should not raise OSError(EBADF)
+class COtherFileTests(OtherFileTests, unittest.TestCase):
+ FileIO = _io.FileIO
+ modulename = '_io'
+
+ @cpython_only
+ def testInvalidFd_overflow(self):
+ # Issue 15989
+ import _testcapi
+ self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1)
+ self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1)
+
+class PyOtherFileTests(OtherFileTests, unittest.TestCase):
+ FileIO = _pyio.FileIO
+ modulename = '_pyio'
+
def test_main():
# Historically, these tests have been sloppy about removing TESTFN.
# So get rid of it no matter what.
try:
- run_unittest(AutoFileTests, OtherFileTests)
+ run_unittest(CAutoFileTests, PyAutoFileTests,
+ COtherFileTests, PyOtherFileTests)
finally:
if os.path.exists(TESTFN):
os.unlink(TESTFN)
diff --git a/Misc/NEWS b/Misc/NEWS
index 7f91349..cee7db8 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -19,6 +19,8 @@ Core and Builtins
Library
-------
+- Issue #21859: Added Python implementation of io.FileIO.
+
- Issue #23865: close() methods in multiple modules now are idempotent and more
robust at shutdown. If needs to release multiple resources, they are released
even if errors are occured.