summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_io.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r--Lib/test/test_io.py267
1 files changed, 209 insertions, 58 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 2a96b7b..1cf97dd 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -35,6 +35,7 @@ import weakref
from collections import deque, UserList
from itertools import cycle, count
from test import support
+from test.script_helper import assert_python_ok
import codecs
import io # C implementation of io
@@ -164,7 +165,7 @@ class CloseFailureIO(MockRawIO):
def close(self):
if not self.closed:
self.closed = 1
- raise IOError
+ raise OSError
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
pass
@@ -595,9 +596,9 @@ class IOTest(unittest.TestCase):
def test_flush_error_on_close(self):
f = self.open(support.TESTFN, "wb", buffering=0)
def bad_flush():
- raise IOError()
+ raise OSError()
f.flush = bad_flush
- self.assertRaises(IOError, f.close) # exception not swallowed
+ self.assertRaises(OSError, f.close) # exception not swallowed
self.assertTrue(f.closed)
def test_multi_close(self):
@@ -652,6 +653,20 @@ class IOTest(unittest.TestCase):
fileio.close()
f2.readline()
+ def test_nonbuffered_textio(self):
+ with warnings.catch_warnings(record=True) as recorded:
+ with self.assertRaises(ValueError):
+ self.open(support.TESTFN, 'w', buffering=0)
+ support.gc_collect()
+ self.assertEqual(recorded, [])
+
+ def test_invalid_newline(self):
+ with warnings.catch_warnings(record=True) as recorded:
+ with self.assertRaises(ValueError):
+ self.open(support.TESTFN, 'w', newline='invalid')
+ support.gc_collect()
+ self.assertEqual(recorded, [])
+
class CIOTest(IOTest):
@@ -757,7 +772,7 @@ class CommonBufferedTests:
if s:
# The destructor *may* have printed an unraisable error, check it
self.assertEqual(len(s.splitlines()), 1)
- self.assertTrue(s.startswith("Exception IOError: "), s)
+ self.assertTrue(s.startswith("Exception OSError: "), s)
self.assertTrue(s.endswith(" ignored"), s)
def test_repr(self):
@@ -773,27 +788,45 @@ class CommonBufferedTests:
def test_flush_error_on_close(self):
raw = self.MockRawIO()
def bad_flush():
- raise IOError()
+ raise OSError()
raw.flush = bad_flush
b = self.tp(raw)
- self.assertRaises(IOError, b.close) # exception not swallowed
+ self.assertRaises(OSError, b.close) # exception not swallowed
self.assertTrue(b.closed)
def test_close_error_on_close(self):
raw = self.MockRawIO()
def bad_flush():
- raise IOError('flush')
+ raise OSError('flush')
def bad_close():
- raise IOError('close')
+ raise OSError('close')
raw.close = bad_close
b = self.tp(raw)
b.flush = bad_flush
- with self.assertRaises(IOError) as err: # exception not swallowed
+ with self.assertRaises(OSError) as err: # exception not swallowed
b.close()
self.assertEqual(err.exception.args, ('close',))
+ self.assertIsInstance(err.exception.__context__, OSError)
self.assertEqual(err.exception.__context__.args, ('flush',))
self.assertFalse(b.closed)
+ def test_nonnormalized_close_error_on_close(self):
+ # Issue #21677
+ raw = self.MockRawIO()
+ def bad_flush():
+ raise non_existing_flush
+ def bad_close():
+ raise non_existing_close
+ raw.close = bad_close
+ b = self.tp(raw)
+ b.flush = bad_flush
+ with self.assertRaises(NameError) as err: # exception not swallowed
+ b.close()
+ self.assertIn('non_existing_close', str(err.exception))
+ self.assertIsInstance(err.exception.__context__, NameError)
+ self.assertIn('non_existing_flush', str(err.exception.__context__))
+ self.assertFalse(b.closed)
+
def test_multi_close(self):
raw = self.MockRawIO()
b = self.tp(raw)
@@ -828,6 +861,14 @@ class SizeofTest:
bufio = self.tp(rawio, buffer_size=bufsize2)
self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
+ @support.cpython_only
+ def test_buffer_freeing(self) :
+ bufsize = 4096
+ rawio = self.MockRawIO()
+ bufio = self.tp(rawio, buffer_size=bufsize)
+ size = sys.getsizeof(bufio) - bufsize
+ bufio.close()
+ self.assertEqual(sys.getsizeof(bufio), size)
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
read_mode = "rb"
@@ -1012,8 +1053,8 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
def test_misbehaved_io(self):
rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
bufio = self.tp(rawio)
- self.assertRaises(IOError, bufio.seek, 0)
- self.assertRaises(IOError, bufio.tell)
+ self.assertRaises(OSError, bufio.seek, 0)
+ self.assertRaises(OSError, bufio.tell)
def test_no_extraneous_read(self):
# Issue #9550; when the raw IO object has satisfied the read request,
@@ -1064,17 +1105,18 @@ class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
bufio = self.tp(rawio)
# _pyio.BufferedReader seems to implement reading different, so that
# checking this is not so easy.
- self.assertRaises(IOError, bufio.read, 10)
+ self.assertRaises(OSError, bufio.read, 10)
def test_garbage_collection(self):
# C BufferedReader objects are collected.
# The Python version has __del__, so it ends into gc.garbage instead
- rawio = self.FileIO(support.TESTFN, "w+b")
- f = self.tp(rawio)
- f.f = f
- wr = weakref.ref(f)
- del f
- support.gc_collect()
+ with support.check_warnings(('', ResourceWarning)):
+ rawio = self.FileIO(support.TESTFN, "w+b")
+ f = self.tp(rawio)
+ f.f = f
+ wr = weakref.ref(f)
+ del f
+ support.gc_collect()
self.assertTrue(wr() is None, wr)
def test_args_error(self):
@@ -1327,9 +1369,9 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
def test_misbehaved_io(self):
rawio = self.MisbehavedRawIO()
bufio = self.tp(rawio, 5)
- self.assertRaises(IOError, bufio.seek, 0)
- self.assertRaises(IOError, bufio.tell)
- self.assertRaises(IOError, bufio.write, b"abcdef")
+ self.assertRaises(OSError, bufio.seek, 0)
+ self.assertRaises(OSError, bufio.tell)
+ self.assertRaises(OSError, bufio.write, b"abcdef")
def test_max_buffer_size_removal(self):
with self.assertRaises(TypeError):
@@ -1338,11 +1380,11 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
def test_write_error_on_close(self):
raw = self.MockRawIO()
def bad_write(b):
- raise IOError()
+ raise OSError()
raw.write = bad_write
b = self.tp(raw)
b.write(b'spam')
- self.assertRaises(IOError, b.close) # exception not swallowed
+ self.assertRaises(OSError, b.close) # exception not swallowed
self.assertTrue(b.closed)
@@ -1373,13 +1415,14 @@ class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
# C BufferedWriter objects are collected, and collecting them flushes
# all data to disk.
# The Python version has __del__, so it ends into gc.garbage instead
- rawio = self.FileIO(support.TESTFN, "w+b")
- f = self.tp(rawio)
- f.write(b"123xxx")
- f.x = f
- wr = weakref.ref(f)
- del f
- support.gc_collect()
+ with support.check_warnings(('', ResourceWarning)):
+ rawio = self.FileIO(support.TESTFN, "w+b")
+ f = self.tp(rawio)
+ f.write(b"123xxx")
+ f.x = f
+ wr = weakref.ref(f)
+ del f
+ support.gc_collect()
self.assertTrue(wr() is None, wr)
with self.open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"123xxx")
@@ -1426,14 +1469,14 @@ class BufferedRWPairTest(unittest.TestCase):
def readable(self):
return False
- self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
+ self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
def test_constructor_with_not_writeable(self):
class NotWriteable(MockRawIO):
def writable(self):
return False
- self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
+ self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
def test_read(self):
pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
@@ -1962,7 +2005,7 @@ class TextIOWrapperTest(unittest.TestCase):
r = self.BytesIO()
b = self.BufferedWriter(r)
with self.assertRaisesRegex(LookupError, "is not a text encoding"):
- self.TextIOWrapper(b, encoding="hex_codec")
+ self.TextIOWrapper(b, encoding="hex")
def test_detach(self):
r = self.BytesIO()
@@ -2209,7 +2252,7 @@ class TextIOWrapperTest(unittest.TestCase):
if s:
# The destructor *may* have printed an unraisable error, check it
self.assertEqual(len(s.splitlines()), 1)
- self.assertTrue(s.startswith("Exception IOError: "), s)
+ self.assertTrue(s.startswith("Exception OSError: "), s)
self.assertTrue(s.endswith(" ignored"), s)
# Systematic tests of the text I/O API
@@ -2281,7 +2324,7 @@ class TextIOWrapperTest(unittest.TestCase):
f.seek(0)
for line in f:
self.assertEqual(line, "\xff\n")
- self.assertRaises(IOError, f.tell)
+ self.assertRaises(OSError, f.tell)
self.assertEqual(f.tell(), p2)
f.close()
@@ -2385,7 +2428,7 @@ class TextIOWrapperTest(unittest.TestCase):
def readable(self):
return False
txt = self.TextIOWrapper(UnReadable())
- self.assertRaises(IOError, txt.read)
+ self.assertRaises(OSError, txt.read)
def test_read_one_by_one(self):
txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
@@ -2560,11 +2603,44 @@ class TextIOWrapperTest(unittest.TestCase):
def test_flush_error_on_close(self):
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
def bad_flush():
- raise IOError()
+ raise OSError()
txt.flush = bad_flush
- self.assertRaises(IOError, txt.close) # exception not swallowed
+ self.assertRaises(OSError, txt.close) # exception not swallowed
self.assertTrue(txt.closed)
+ def test_close_error_on_close(self):
+ buffer = self.BytesIO(self.testdata)
+ def bad_flush():
+ raise OSError('flush')
+ def bad_close():
+ raise OSError('close')
+ buffer.close = bad_close
+ txt = self.TextIOWrapper(buffer, encoding="ascii")
+ txt.flush = bad_flush
+ with self.assertRaises(OSError) as err: # exception not swallowed
+ txt.close()
+ self.assertEqual(err.exception.args, ('close',))
+ self.assertIsInstance(err.exception.__context__, OSError)
+ self.assertEqual(err.exception.__context__.args, ('flush',))
+ self.assertFalse(txt.closed)
+
+ def test_nonnormalized_close_error_on_close(self):
+ # Issue #21677
+ buffer = self.BytesIO(self.testdata)
+ def bad_flush():
+ raise non_existing_flush
+ def bad_close():
+ raise non_existing_close
+ buffer.close = bad_close
+ txt = self.TextIOWrapper(buffer, encoding="ascii")
+ txt.flush = bad_flush
+ with self.assertRaises(NameError) as err: # exception not swallowed
+ txt.close()
+ self.assertIn('non_existing_close', str(err.exception))
+ self.assertIsInstance(err.exception.__context__, NameError)
+ self.assertIn('non_existing_flush', str(err.exception.__context__))
+ self.assertFalse(txt.closed)
+
def test_multi_close(self):
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
txt.close()
@@ -2604,6 +2680,38 @@ class TextIOWrapperTest(unittest.TestCase):
txt.write('5')
self.assertEqual(b''.join(raw._write_stack), b'123\n45')
+ def test_bufio_write_through(self):
+ # Issue #21396: write_through=True doesn't force a flush()
+ # on the underlying binary buffered object.
+ flush_called, write_called = [], []
+ class BufferedWriter(self.BufferedWriter):
+ def flush(self, *args, **kwargs):
+ flush_called.append(True)
+ return super().flush(*args, **kwargs)
+ def write(self, *args, **kwargs):
+ write_called.append(True)
+ return super().write(*args, **kwargs)
+
+ rawio = self.BytesIO()
+ data = b"a"
+ bufio = BufferedWriter(rawio, len(data)*2)
+ textio = self.TextIOWrapper(bufio, encoding='ascii',
+ write_through=True)
+ # write to the buffered io but don't overflow the buffer
+ text = data.decode('ascii')
+ textio.write(text)
+
+ # buffer.flush is not called with write_through=True
+ self.assertFalse(flush_called)
+ # buffer.write *is* called with write_through=True
+ self.assertTrue(write_called)
+ self.assertEqual(rawio.getvalue(), b"") # no flush
+
+ write_called = [] # reset
+ textio.write(text * 10) # total content is larger than bufio buffer
+ self.assertTrue(write_called)
+ self.assertEqual(rawio.getvalue(), data * 11) # all flushed
+
def test_read_nonbytes(self):
# Issue #17106
# Crash when underlying read() returns non-bytes
@@ -2618,11 +2726,11 @@ class TextIOWrapperTest(unittest.TestCase):
# Issue #17106
# Bypass the early encoding check added in issue 20404
def _make_illegal_wrapper():
- quopri = codecs.lookup("quopri_codec")
+ quopri = codecs.lookup("quopri")
quopri._is_text_encoding = True
try:
t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
- newline='\n', encoding="quopri_codec")
+ newline='\n', encoding="quopri")
finally:
quopri._is_text_encoding = False
return t
@@ -2634,8 +2742,46 @@ class TextIOWrapperTest(unittest.TestCase):
t = _make_illegal_wrapper()
self.assertRaises(TypeError, t.read)
+ def _check_create_at_shutdown(self, **kwargs):
+ # Issue #20037: creating a TextIOWrapper at shutdown
+ # shouldn't crash the interpreter.
+ iomod = self.io.__name__
+ code = """if 1:
+ import codecs
+ import {iomod} as io
+
+ # Avoid looking up codecs at shutdown
+ codecs.lookup('utf-8')
+
+ class C:
+ def __init__(self):
+ self.buf = io.BytesIO()
+ def __del__(self):
+ io.TextIOWrapper(self.buf, **{kwargs})
+ print("ok")
+ c = C()
+ """.format(iomod=iomod, kwargs=kwargs)
+ return assert_python_ok("-c", code)
+
+ def test_create_at_shutdown_without_encoding(self):
+ rc, out, err = self._check_create_at_shutdown()
+ if err:
+ # Can error out with a RuntimeError if the module state
+ # isn't found.
+ self.assertIn(self.shutdown_error, err.decode())
+ else:
+ self.assertEqual("ok", out.decode().strip())
+
+ def test_create_at_shutdown_with_encoding(self):
+ rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
+ errors='strict')
+ self.assertFalse(err)
+ self.assertEqual("ok", out.decode().strip())
+
class CTextIOWrapperTest(TextIOWrapperTest):
+ io = io
+ shutdown_error = "RuntimeError: could not find io module state"
def test_initialization(self):
r = self.BytesIO(b"\xc3\xa9\n\n")
@@ -2650,14 +2796,15 @@ class CTextIOWrapperTest(TextIOWrapperTest):
# C TextIOWrapper objects are collected, and collecting them flushes
# all data to disk.
# The Python version has __del__, so it ends in gc.garbage instead.
- rawio = io.FileIO(support.TESTFN, "wb")
- b = self.BufferedWriter(rawio)
- t = self.TextIOWrapper(b, encoding="ascii")
- t.write("456def")
- t.x = t
- wr = weakref.ref(t)
- del t
- support.gc_collect()
+ with support.check_warnings(('', ResourceWarning)):
+ rawio = io.FileIO(support.TESTFN, "wb")
+ b = self.BufferedWriter(rawio)
+ t = self.TextIOWrapper(b, encoding="ascii")
+ t.write("456def")
+ t.x = t
+ wr = weakref.ref(t)
+ del t
+ support.gc_collect()
self.assertTrue(wr() is None, wr)
with self.open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"456def")
@@ -2678,7 +2825,9 @@ class CTextIOWrapperTest(TextIOWrapperTest):
class PyTextIOWrapperTest(TextIOWrapperTest):
- pass
+ io = pyio
+ #shutdown_error = "LookupError: unknown encoding: ascii"
+ shutdown_error = "TypeError: 'NoneType' object is not iterable"
class IncrementalNewlineDecoderTest(unittest.TestCase):
@@ -2817,7 +2966,8 @@ class MiscIOTest(unittest.TestCase):
self.assertEqual(f.mode, "wb")
f.close()
- f = self.open(support.TESTFN, "U")
+ with support.check_warnings(('', DeprecationWarning)):
+ f = self.open(support.TESTFN, "U")
self.assertEqual(f.name, support.TESTFN)
self.assertEqual(f.buffer.name, support.TESTFN)
self.assertEqual(f.buffer.raw.name, support.TESTFN)
@@ -2947,7 +3097,7 @@ class MiscIOTest(unittest.TestCase):
for fd in fds:
try:
os.close(fd)
- except EnvironmentError as e:
+ except OSError as e:
if e.errno != errno.EBADF:
raise
self.addCleanup(cleanup_fds)
@@ -3069,7 +3219,6 @@ class MiscIOTest(unittest.TestCase):
class CMiscIOTest(MiscIOTest):
io = io
- shutdown_error = "RuntimeError: could not find io module state"
def test_readinto_buffer_overflow(self):
# Issue #18025
@@ -3082,7 +3231,6 @@ class CMiscIOTest(MiscIOTest):
class PyMiscIOTest(MiscIOTest):
io = pyio
- shutdown_error = "LookupError: unknown encoding: ascii"
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
@@ -3115,15 +3263,18 @@ class SignalsTest(unittest.TestCase):
try:
wio = self.io.open(w, **fdopen_kwargs)
t.start()
- signal.alarm(1)
# Fill the pipe enough that the write will be blocking.
# It will be interrupted by the timer armed above. Since the
# other thread has read one byte, the low-level write will
# return with a successful (partial) result rather than an EINTR.
# The buffered IO layer must check for pending signal
# handlers, which in this case will invoke alarm_interrupt().
- self.assertRaises(ZeroDivisionError,
- wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
+ signal.alarm(1)
+ try:
+ self.assertRaises(ZeroDivisionError,
+ wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
+ finally:
+ signal.alarm(0)
t.join()
# We got one byte, get another one and check that it isn't a
# repeat of the first one.
@@ -3137,7 +3288,7 @@ class SignalsTest(unittest.TestCase):
# buffer, and block again.
try:
wio.close()
- except IOError as e:
+ except OSError as e:
if e.errno != errno.EBADF:
raise
@@ -3265,7 +3416,7 @@ class SignalsTest(unittest.TestCase):
# buffer, and could block (in case of failure).
try:
wio.close()
- except IOError as e:
+ except OSError as e:
if e.errno != errno.EBADF:
raise