summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_io.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r--Lib/test/test_io.py298
1 files changed, 240 insertions, 58 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 4b49d33..a424f76 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -35,6 +35,7 @@ import weakref
from collections import deque, UserList
from itertools import cycle, count
from test import support
+from test.script_helper import assert_python_ok
import codecs
import io # C implementation of io
@@ -164,7 +165,7 @@ class CloseFailureIO(MockRawIO):
def close(self):
if not self.closed:
self.closed = 1
- raise IOError
+ raise OSError
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
pass
@@ -595,9 +596,9 @@ class IOTest(unittest.TestCase):
def test_flush_error_on_close(self):
f = self.open(support.TESTFN, "wb", buffering=0)
def bad_flush():
- raise IOError()
+ raise OSError()
f.flush = bad_flush
- self.assertRaises(IOError, f.close) # exception not swallowed
+ self.assertRaises(OSError, f.close) # exception not swallowed
self.assertTrue(f.closed)
def test_multi_close(self):
@@ -652,6 +653,20 @@ class IOTest(unittest.TestCase):
fileio.close()
f2.readline()
+ def test_nonbuffered_textio(self):
+ with warnings.catch_warnings(record=True) as recorded:
+ with self.assertRaises(ValueError):
+ self.open(support.TESTFN, 'w', buffering=0)
+ support.gc_collect()
+ self.assertEqual(recorded, [])
+
+ def test_invalid_newline(self):
+ with warnings.catch_warnings(record=True) as recorded:
+ with self.assertRaises(ValueError):
+ self.open(support.TESTFN, 'w', newline='invalid')
+ support.gc_collect()
+ self.assertEqual(recorded, [])
+
class CIOTest(IOTest):
@@ -686,6 +701,8 @@ class CommonBufferedTests:
self.assertIs(buf.detach(), raw)
self.assertRaises(ValueError, buf.detach)
+ repr(buf) # Should still work
+
def test_fileno(self):
rawio = self.MockRawIO()
bufio = self.tp(rawio)
@@ -757,7 +774,7 @@ class CommonBufferedTests:
if s:
# The destructor *may* have printed an unraisable error, check it
self.assertEqual(len(s.splitlines()), 1)
- self.assertTrue(s.startswith("Exception IOError: "), s)
+ self.assertTrue(s.startswith("Exception OSError: "), s)
self.assertTrue(s.endswith(" ignored"), s)
def test_repr(self):
@@ -773,27 +790,45 @@ class CommonBufferedTests:
def test_flush_error_on_close(self):
raw = self.MockRawIO()
def bad_flush():
- raise IOError()
+ raise OSError()
raw.flush = bad_flush
b = self.tp(raw)
- self.assertRaises(IOError, b.close) # exception not swallowed
+ self.assertRaises(OSError, b.close) # exception not swallowed
self.assertTrue(b.closed)
def test_close_error_on_close(self):
raw = self.MockRawIO()
def bad_flush():
- raise IOError('flush')
+ raise OSError('flush')
def bad_close():
- raise IOError('close')
+ raise OSError('close')
raw.close = bad_close
b = self.tp(raw)
b.flush = bad_flush
- with self.assertRaises(IOError) as err: # exception not swallowed
+ with self.assertRaises(OSError) as err: # exception not swallowed
b.close()
self.assertEqual(err.exception.args, ('close',))
+ self.assertIsInstance(err.exception.__context__, OSError)
self.assertEqual(err.exception.__context__.args, ('flush',))
self.assertFalse(b.closed)
+ def test_nonnormalized_close_error_on_close(self):
+ # Issue #21677
+ raw = self.MockRawIO()
+ def bad_flush():
+ raise non_existing_flush
+ def bad_close():
+ raise non_existing_close
+ raw.close = bad_close
+ b = self.tp(raw)
+ b.flush = bad_flush
+ with self.assertRaises(NameError) as err: # exception not swallowed
+ b.close()
+ self.assertIn('non_existing_close', str(err.exception))
+ self.assertIsInstance(err.exception.__context__, NameError)
+ self.assertIn('non_existing_flush', str(err.exception.__context__))
+ self.assertFalse(b.closed)
+
def test_multi_close(self):
raw = self.MockRawIO()
b = self.tp(raw)
@@ -828,6 +863,14 @@ class SizeofTest:
bufio = self.tp(rawio, buffer_size=bufsize2)
self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
+ @support.cpython_only
+ def test_buffer_freeing(self) :
+ bufsize = 4096
+ rawio = self.MockRawIO()
+ bufio = self.tp(rawio, buffer_size=bufsize)
+ size = sys.getsizeof(bufio) - bufsize
+ bufio.close()
+ self.assertEqual(sys.getsizeof(bufio), size)
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
read_mode = "rb"
@@ -1012,8 +1055,8 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
def test_misbehaved_io(self):
rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
bufio = self.tp(rawio)
- self.assertRaises(IOError, bufio.seek, 0)
- self.assertRaises(IOError, bufio.tell)
+ self.assertRaises(OSError, bufio.seek, 0)
+ self.assertRaises(OSError, bufio.tell)
def test_no_extraneous_read(self):
# Issue #9550; when the raw IO object has satisfied the read request,
@@ -1064,17 +1107,18 @@ class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
bufio = self.tp(rawio)
# _pyio.BufferedReader seems to implement reading different, so that
# checking this is not so easy.
- self.assertRaises(IOError, bufio.read, 10)
+ self.assertRaises(OSError, bufio.read, 10)
def test_garbage_collection(self):
# C BufferedReader objects are collected.
# The Python version has __del__, so it ends into gc.garbage instead
- rawio = self.FileIO(support.TESTFN, "w+b")
- f = self.tp(rawio)
- f.f = f
- wr = weakref.ref(f)
- del f
- support.gc_collect()
+ with support.check_warnings(('', ResourceWarning)):
+ rawio = self.FileIO(support.TESTFN, "w+b")
+ f = self.tp(rawio)
+ f.f = f
+ wr = weakref.ref(f)
+ del f
+ support.gc_collect()
self.assertTrue(wr() is None, wr)
def test_args_error(self):
@@ -1327,9 +1371,9 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
def test_misbehaved_io(self):
rawio = self.MisbehavedRawIO()
bufio = self.tp(rawio, 5)
- self.assertRaises(IOError, bufio.seek, 0)
- self.assertRaises(IOError, bufio.tell)
- self.assertRaises(IOError, bufio.write, b"abcdef")
+ self.assertRaises(OSError, bufio.seek, 0)
+ self.assertRaises(OSError, bufio.tell)
+ self.assertRaises(OSError, bufio.write, b"abcdef")
def test_max_buffer_size_removal(self):
with self.assertRaises(TypeError):
@@ -1338,11 +1382,11 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
def test_write_error_on_close(self):
raw = self.MockRawIO()
def bad_write(b):
- raise IOError()
+ raise OSError()
raw.write = bad_write
b = self.tp(raw)
b.write(b'spam')
- self.assertRaises(IOError, b.close) # exception not swallowed
+ self.assertRaises(OSError, b.close) # exception not swallowed
self.assertTrue(b.closed)
@@ -1373,13 +1417,14 @@ class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
# C BufferedWriter objects are collected, and collecting them flushes
# all data to disk.
# The Python version has __del__, so it ends into gc.garbage instead
- rawio = self.FileIO(support.TESTFN, "w+b")
- f = self.tp(rawio)
- f.write(b"123xxx")
- f.x = f
- wr = weakref.ref(f)
- del f
- support.gc_collect()
+ with support.check_warnings(('', ResourceWarning)):
+ rawio = self.FileIO(support.TESTFN, "w+b")
+ f = self.tp(rawio)
+ f.write(b"123xxx")
+ f.x = f
+ wr = weakref.ref(f)
+ del f
+ support.gc_collect()
self.assertTrue(wr() is None, wr)
with self.open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"123xxx")
@@ -1426,14 +1471,14 @@ class BufferedRWPairTest(unittest.TestCase):
def readable(self):
return False
- self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
+ self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
def test_constructor_with_not_writeable(self):
class NotWriteable(MockRawIO):
def writable(self):
return False
- self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
+ self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
def test_read(self):
pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
@@ -1968,7 +2013,7 @@ class TextIOWrapperTest(unittest.TestCase):
r = self.BytesIO()
b = self.BufferedWriter(r)
with self.assertRaisesRegex(LookupError, "is not a text encoding"):
- self.TextIOWrapper(b, encoding="hex_codec")
+ self.TextIOWrapper(b, encoding="hex")
def test_detach(self):
r = self.BytesIO()
@@ -1983,6 +2028,12 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEqual(r.getvalue(), b"howdy")
self.assertRaises(ValueError, t.detach)
+ # Operations independent of the detached stream should still work
+ repr(t)
+ self.assertEqual(t.encoding, "ascii")
+ self.assertEqual(t.errors, "strict")
+ self.assertFalse(t.line_buffering)
+
def test_repr(self):
raw = self.BytesIO("hello".encode("utf-8"))
b = self.BufferedReader(raw)
@@ -2000,6 +2051,9 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEqual(repr(t),
"<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
+ t.buffer.detach()
+ repr(t) # Should not raise an exception
+
def test_line_buffering(self):
r = self.BytesIO()
b = self.BufferedWriter(r, 1000)
@@ -2215,7 +2269,7 @@ class TextIOWrapperTest(unittest.TestCase):
if s:
# The destructor *may* have printed an unraisable error, check it
self.assertEqual(len(s.splitlines()), 1)
- self.assertTrue(s.startswith("Exception IOError: "), s)
+ self.assertTrue(s.startswith("Exception OSError: "), s)
self.assertTrue(s.endswith(" ignored"), s)
# Systematic tests of the text I/O API
@@ -2287,7 +2341,7 @@ class TextIOWrapperTest(unittest.TestCase):
f.seek(0)
for line in f:
self.assertEqual(line, "\xff\n")
- self.assertRaises(IOError, f.tell)
+ self.assertRaises(OSError, f.tell)
self.assertEqual(f.tell(), p2)
f.close()
@@ -2391,7 +2445,7 @@ class TextIOWrapperTest(unittest.TestCase):
def readable(self):
return False
txt = self.TextIOWrapper(UnReadable())
- self.assertRaises(IOError, txt.read)
+ self.assertRaises(OSError, txt.read)
def test_read_one_by_one(self):
txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
@@ -2566,11 +2620,44 @@ class TextIOWrapperTest(unittest.TestCase):
def test_flush_error_on_close(self):
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
def bad_flush():
- raise IOError()
+ raise OSError()
txt.flush = bad_flush
- self.assertRaises(IOError, txt.close) # exception not swallowed
+ self.assertRaises(OSError, txt.close) # exception not swallowed
self.assertTrue(txt.closed)
+ def test_close_error_on_close(self):
+ buffer = self.BytesIO(self.testdata)
+ def bad_flush():
+ raise OSError('flush')
+ def bad_close():
+ raise OSError('close')
+ buffer.close = bad_close
+ txt = self.TextIOWrapper(buffer, encoding="ascii")
+ txt.flush = bad_flush
+ with self.assertRaises(OSError) as err: # exception not swallowed
+ txt.close()
+ self.assertEqual(err.exception.args, ('close',))
+ self.assertIsInstance(err.exception.__context__, OSError)
+ self.assertEqual(err.exception.__context__.args, ('flush',))
+ self.assertFalse(txt.closed)
+
+ def test_nonnormalized_close_error_on_close(self):
+ # Issue #21677
+ buffer = self.BytesIO(self.testdata)
+ def bad_flush():
+ raise non_existing_flush
+ def bad_close():
+ raise non_existing_close
+ buffer.close = bad_close
+ txt = self.TextIOWrapper(buffer, encoding="ascii")
+ txt.flush = bad_flush
+ with self.assertRaises(NameError) as err: # exception not swallowed
+ txt.close()
+ self.assertIn('non_existing_close', str(err.exception))
+ self.assertIsInstance(err.exception.__context__, NameError)
+ self.assertIn('non_existing_flush', str(err.exception.__context__))
+ self.assertFalse(txt.closed)
+
def test_multi_close(self):
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
txt.close()
@@ -2610,6 +2697,38 @@ class TextIOWrapperTest(unittest.TestCase):
txt.write('5')
self.assertEqual(b''.join(raw._write_stack), b'123\n45')
+ def test_bufio_write_through(self):
+ # Issue #21396: write_through=True doesn't force a flush()
+ # on the underlying binary buffered object.
+ flush_called, write_called = [], []
+ class BufferedWriter(self.BufferedWriter):
+ def flush(self, *args, **kwargs):
+ flush_called.append(True)
+ return super().flush(*args, **kwargs)
+ def write(self, *args, **kwargs):
+ write_called.append(True)
+ return super().write(*args, **kwargs)
+
+ rawio = self.BytesIO()
+ data = b"a"
+ bufio = BufferedWriter(rawio, len(data)*2)
+ textio = self.TextIOWrapper(bufio, encoding='ascii',
+ write_through=True)
+ # write to the buffered io but don't overflow the buffer
+ text = data.decode('ascii')
+ textio.write(text)
+
+ # buffer.flush is not called with write_through=True
+ self.assertFalse(flush_called)
+ # buffer.write *is* called with write_through=True
+ self.assertTrue(write_called)
+ self.assertEqual(rawio.getvalue(), b"") # no flush
+
+ write_called = [] # reset
+ textio.write(text * 10) # total content is larger than bufio buffer
+ self.assertTrue(write_called)
+ self.assertEqual(rawio.getvalue(), data * 11) # all flushed
+
def test_read_nonbytes(self):
# Issue #17106
# Crash when underlying read() returns non-bytes
@@ -2624,11 +2743,11 @@ class TextIOWrapperTest(unittest.TestCase):
# Issue #17106
# Bypass the early encoding check added in issue 20404
def _make_illegal_wrapper():
- quopri = codecs.lookup("quopri_codec")
+ quopri = codecs.lookup("quopri")
quopri._is_text_encoding = True
try:
t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
- newline='\n', encoding="quopri_codec")
+ newline='\n', encoding="quopri")
finally:
quopri._is_text_encoding = False
return t
@@ -2640,8 +2759,61 @@ class TextIOWrapperTest(unittest.TestCase):
t = _make_illegal_wrapper()
self.assertRaises(TypeError, t.read)
+ def _check_create_at_shutdown(self, **kwargs):
+ # Issue #20037: creating a TextIOWrapper at shutdown
+ # shouldn't crash the interpreter.
+ iomod = self.io.__name__
+ code = """if 1:
+ import codecs
+ import {iomod} as io
+
+ # Avoid looking up codecs at shutdown
+ codecs.lookup('utf-8')
+
+ class C:
+ def __init__(self):
+ self.buf = io.BytesIO()
+ def __del__(self):
+ io.TextIOWrapper(self.buf, **{kwargs})
+ print("ok")
+ c = C()
+ """.format(iomod=iomod, kwargs=kwargs)
+ return assert_python_ok("-c", code)
+
+ def test_create_at_shutdown_without_encoding(self):
+ rc, out, err = self._check_create_at_shutdown()
+ if err:
+ # Can error out with a RuntimeError if the module state
+ # isn't found.
+ self.assertIn(self.shutdown_error, err.decode())
+ else:
+ self.assertEqual("ok", out.decode().strip())
+
+ def test_create_at_shutdown_with_encoding(self):
+ rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
+ errors='strict')
+ self.assertFalse(err)
+ self.assertEqual("ok", out.decode().strip())
+
+ def test_issue22849(self):
+ class F(object):
+ def readable(self): return True
+ def writable(self): return True
+ def seekable(self): return True
+
+ for i in range(10):
+ try:
+ self.TextIOWrapper(F(), encoding='utf-8')
+ except Exception:
+ pass
+
+ F.tell = lambda x: 0
+ t = self.TextIOWrapper(F(), encoding='utf-8')
+
class CTextIOWrapperTest(TextIOWrapperTest):
+ io = io
+ shutdown_error = "RuntimeError: could not find io module state"
def test_initialization(self):
r = self.BytesIO(b"\xc3\xa9\n\n")
@@ -2652,18 +2824,22 @@ class CTextIOWrapperTest(TextIOWrapperTest):
self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
self.assertRaises(ValueError, t.read)
+ t = self.TextIOWrapper.__new__(self.TextIOWrapper)
+ self.assertRaises(Exception, repr, t)
+
def test_garbage_collection(self):
# C TextIOWrapper objects are collected, and collecting them flushes
# all data to disk.
# The Python version has __del__, so it ends in gc.garbage instead.
- rawio = io.FileIO(support.TESTFN, "wb")
- b = self.BufferedWriter(rawio)
- t = self.TextIOWrapper(b, encoding="ascii")
- t.write("456def")
- t.x = t
- wr = weakref.ref(t)
- del t
- support.gc_collect()
+ with support.check_warnings(('', ResourceWarning)):
+ rawio = io.FileIO(support.TESTFN, "wb")
+ b = self.BufferedWriter(rawio)
+ t = self.TextIOWrapper(b, encoding="ascii")
+ t.write("456def")
+ t.x = t
+ wr = weakref.ref(t)
+ del t
+ support.gc_collect()
self.assertTrue(wr() is None, wr)
with self.open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"456def")
@@ -2684,7 +2860,9 @@ class CTextIOWrapperTest(TextIOWrapperTest):
class PyTextIOWrapperTest(TextIOWrapperTest):
- pass
+ io = pyio
+ #shutdown_error = "LookupError: unknown encoding: ascii"
+ shutdown_error = "TypeError: 'NoneType' object is not iterable"
class IncrementalNewlineDecoderTest(unittest.TestCase):
@@ -2823,7 +3001,8 @@ class MiscIOTest(unittest.TestCase):
self.assertEqual(f.mode, "wb")
f.close()
- f = self.open(support.TESTFN, "U")
+ with support.check_warnings(('', DeprecationWarning)):
+ f = self.open(support.TESTFN, "U")
self.assertEqual(f.name, support.TESTFN)
self.assertEqual(f.buffer.name, support.TESTFN)
self.assertEqual(f.buffer.raw.name, support.TESTFN)
@@ -2953,7 +3132,7 @@ class MiscIOTest(unittest.TestCase):
for fd in fds:
try:
os.close(fd)
- except EnvironmentError as e:
+ except OSError as e:
if e.errno != errno.EBADF:
raise
self.addCleanup(cleanup_fds)
@@ -3075,7 +3254,6 @@ class MiscIOTest(unittest.TestCase):
class CMiscIOTest(MiscIOTest):
io = io
- shutdown_error = "RuntimeError: could not find io module state"
def test_readinto_buffer_overflow(self):
# Issue #18025
@@ -3088,7 +3266,6 @@ class CMiscIOTest(MiscIOTest):
class PyMiscIOTest(MiscIOTest):
io = pyio
- shutdown_error = "LookupError: unknown encoding: ascii"
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
@@ -3121,15 +3298,18 @@ class SignalsTest(unittest.TestCase):
try:
wio = self.io.open(w, **fdopen_kwargs)
t.start()
- signal.alarm(1)
# Fill the pipe enough that the write will be blocking.
# It will be interrupted by the timer armed above. Since the
# other thread has read one byte, the low-level write will
# return with a successful (partial) result rather than an EINTR.
# The buffered IO layer must check for pending signal
# handlers, which in this case will invoke alarm_interrupt().
- self.assertRaises(ZeroDivisionError,
- wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
+ signal.alarm(1)
+ try:
+ self.assertRaises(ZeroDivisionError,
+ wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
+ finally:
+ signal.alarm(0)
t.join()
# We got one byte, get another one and check that it isn't a
# repeat of the first one.
@@ -3143,7 +3323,7 @@ class SignalsTest(unittest.TestCase):
# buffer, and block again.
try:
wio.close()
- except IOError as e:
+ except OSError as e:
if e.errno != errno.EBADF:
raise
@@ -3153,6 +3333,8 @@ class SignalsTest(unittest.TestCase):
def test_interrupted_write_buffered(self):
self.check_interrupted_write(b"xy", b"xy", mode="wb")
+ # Issue #22331: The test hangs on FreeBSD 7.2
+ @support.requires_freebsd_version(8)
def test_interrupted_write_text(self):
self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
@@ -3271,7 +3453,7 @@ class SignalsTest(unittest.TestCase):
# buffer, and could block (in case of failure).
try:
wio.close()
- except IOError as e:
+ except OSError as e:
if e.errno != errno.EBADF:
raise