summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_io.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r--Lib/test/test_io.py514
1 files changed, 432 insertions, 82 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 4b49d33..416c547 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -35,6 +35,7 @@ import weakref
from collections import deque, UserList
from itertools import cycle, count
from test import support
+from test.script_helper import assert_python_ok, run_python_until_end
import codecs
import io # C implementation of io
@@ -164,7 +165,7 @@ class CloseFailureIO(MockRawIO):
def close(self):
if not self.closed:
self.closed = 1
- raise IOError
+ raise OSError
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
pass
@@ -592,13 +593,44 @@ class IOTest(unittest.TestCase):
with self.open(zero, "r") as f:
self.assertRaises(OverflowError, f.read)
- def test_flush_error_on_close(self):
- f = self.open(support.TESTFN, "wb", buffering=0)
+ def check_flush_error_on_close(self, *args, **kwargs):
+ # Test that the file is closed despite failed flush
+ # and that flush() is called before file closed.
+ f = self.open(*args, **kwargs)
+ closed = []
def bad_flush():
- raise IOError()
+ closed[:] = [f.closed]
+ raise OSError()
f.flush = bad_flush
- self.assertRaises(IOError, f.close) # exception not swallowed
+ self.assertRaises(OSError, f.close) # exception not swallowed
self.assertTrue(f.closed)
+ self.assertTrue(closed) # flush() called
+ self.assertFalse(closed[0]) # flush() called before file closed
+ f.flush = lambda: None # break reference loop
+
+ def test_flush_error_on_close(self):
+ # raw file
+ # Issue #5700: io.FileIO calls flush() after file closed
+ self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
+ fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
+ self.check_flush_error_on_close(fd, 'wb', buffering=0)
+ fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
+ self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
+ os.close(fd)
+ # buffered io
+ self.check_flush_error_on_close(support.TESTFN, 'wb')
+ fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
+ self.check_flush_error_on_close(fd, 'wb')
+ fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
+ self.check_flush_error_on_close(fd, 'wb', closefd=False)
+ os.close(fd)
+ # text io
+ self.check_flush_error_on_close(support.TESTFN, 'w')
+ fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
+ self.check_flush_error_on_close(fd, 'w')
+ fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
+ self.check_flush_error_on_close(fd, 'w', closefd=False)
+ os.close(fd)
def test_multi_close(self):
f = self.open(support.TESTFN, "wb", buffering=0)
@@ -652,6 +684,20 @@ class IOTest(unittest.TestCase):
fileio.close()
f2.readline()
+ def test_nonbuffered_textio(self):
+ with warnings.catch_warnings(record=True) as recorded:
+ with self.assertRaises(ValueError):
+ self.open(support.TESTFN, 'w', buffering=0)
+ support.gc_collect()
+ self.assertEqual(recorded, [])
+
+ def test_invalid_newline(self):
+ with warnings.catch_warnings(record=True) as recorded:
+ with self.assertRaises(ValueError):
+ self.open(support.TESTFN, 'w', newline='invalid')
+ support.gc_collect()
+ self.assertEqual(recorded, [])
+
class CIOTest(IOTest):
@@ -686,6 +732,8 @@ class CommonBufferedTests:
self.assertIs(buf.detach(), raw)
self.assertRaises(ValueError, buf.detach)
+ repr(buf) # Should still work
+
def test_fileno(self):
rawio = self.MockRawIO()
bufio = self.tp(rawio)
@@ -757,7 +805,7 @@ class CommonBufferedTests:
if s:
# The destructor *may* have printed an unraisable error, check it
self.assertEqual(len(s.splitlines()), 1)
- self.assertTrue(s.startswith("Exception IOError: "), s)
+ self.assertTrue(s.startswith("Exception OSError: "), s)
self.assertTrue(s.endswith(" ignored"), s)
def test_repr(self):
@@ -771,29 +819,56 @@ class CommonBufferedTests:
self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
def test_flush_error_on_close(self):
+ # Test that buffered file is closed despite failed flush
+ # and that flush() is called before file closed.
raw = self.MockRawIO()
+ closed = []
def bad_flush():
- raise IOError()
+ closed[:] = [b.closed, raw.closed]
+ raise OSError()
raw.flush = bad_flush
b = self.tp(raw)
- self.assertRaises(IOError, b.close) # exception not swallowed
+ self.assertRaises(OSError, b.close) # exception not swallowed
self.assertTrue(b.closed)
+ self.assertTrue(raw.closed)
+ self.assertTrue(closed) # flush() called
+ self.assertFalse(closed[0]) # flush() called before file closed
+ self.assertFalse(closed[1])
+ raw.flush = lambda: None # break reference loop
def test_close_error_on_close(self):
raw = self.MockRawIO()
def bad_flush():
- raise IOError('flush')
+ raise OSError('flush')
def bad_close():
- raise IOError('close')
+ raise OSError('close')
raw.close = bad_close
b = self.tp(raw)
b.flush = bad_flush
- with self.assertRaises(IOError) as err: # exception not swallowed
+ with self.assertRaises(OSError) as err: # exception not swallowed
b.close()
self.assertEqual(err.exception.args, ('close',))
+ self.assertIsInstance(err.exception.__context__, OSError)
self.assertEqual(err.exception.__context__.args, ('flush',))
self.assertFalse(b.closed)
+ def test_nonnormalized_close_error_on_close(self):
+ # Issue #21677
+ raw = self.MockRawIO()
+ def bad_flush():
+ raise non_existing_flush
+ def bad_close():
+ raise non_existing_close
+ raw.close = bad_close
+ b = self.tp(raw)
+ b.flush = bad_flush
+ with self.assertRaises(NameError) as err: # exception not swallowed
+ b.close()
+ self.assertIn('non_existing_close', str(err.exception))
+ self.assertIsInstance(err.exception.__context__, NameError)
+ self.assertIn('non_existing_flush', str(err.exception.__context__))
+ self.assertFalse(b.closed)
+
def test_multi_close(self):
raw = self.MockRawIO()
b = self.tp(raw)
@@ -828,6 +903,14 @@ class SizeofTest:
bufio = self.tp(rawio, buffer_size=bufsize2)
self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
+ @support.cpython_only
+ def test_buffer_freeing(self) :
+ bufsize = 4096
+ rawio = self.MockRawIO()
+ bufio = self.tp(rawio, buffer_size=bufsize)
+ size = sys.getsizeof(bufio) - bufsize
+ bufio.close()
+ self.assertEqual(sys.getsizeof(bufio), size)
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
read_mode = "rb"
@@ -987,11 +1070,8 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
errors.append(e)
raise
threads = [threading.Thread(target=f) for x in range(20)]
- for t in threads:
- t.start()
- time.sleep(0.02) # yield
- for t in threads:
- t.join()
+ with support.start_threads(threads):
+ time.sleep(0.02) # yield
self.assertFalse(errors,
"the following exceptions were caught: %r" % errors)
s = b''.join(results)
@@ -1012,8 +1092,8 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
def test_misbehaved_io(self):
rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
bufio = self.tp(rawio)
- self.assertRaises(IOError, bufio.seek, 0)
- self.assertRaises(IOError, bufio.tell)
+ self.assertRaises(OSError, bufio.seek, 0)
+ self.assertRaises(OSError, bufio.tell)
def test_no_extraneous_read(self):
# Issue #9550; when the raw IO object has satisfied the read request,
@@ -1035,6 +1115,14 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
self.assertEqual(rawio._extraneous_reads, 0,
"failed for {}: {} != 0".format(n, rawio._extraneous_reads))
+ def test_read_on_closed(self):
+ # Issue #23796
+ b = io.BufferedReader(io.BytesIO(b"12"))
+ b.read(1)
+ b.close()
+ self.assertRaises(ValueError, b.peek)
+ self.assertRaises(ValueError, b.read1, 1)
+
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
tp = io.BufferedReader
@@ -1064,17 +1152,18 @@ class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
bufio = self.tp(rawio)
# _pyio.BufferedReader seems to implement reading different, so that
# checking this is not so easy.
- self.assertRaises(IOError, bufio.read, 10)
+ self.assertRaises(OSError, bufio.read, 10)
def test_garbage_collection(self):
# C BufferedReader objects are collected.
# The Python version has __del__, so it ends into gc.garbage instead
- rawio = self.FileIO(support.TESTFN, "w+b")
- f = self.tp(rawio)
- f.f = f
- wr = weakref.ref(f)
- del f
- support.gc_collect()
+ with support.check_warnings(('', ResourceWarning)):
+ rawio = self.FileIO(support.TESTFN, "w+b")
+ f = self.tp(rawio)
+ f.f = f
+ wr = weakref.ref(f)
+ del f
+ support.gc_collect()
self.assertTrue(wr() is None, wr)
def test_args_error(self):
@@ -1309,11 +1398,8 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
errors.append(e)
raise
threads = [threading.Thread(target=f) for x in range(20)]
- for t in threads:
- t.start()
- time.sleep(0.02) # yield
- for t in threads:
- t.join()
+ with support.start_threads(threads):
+ time.sleep(0.02) # yield
self.assertFalse(errors,
"the following exceptions were caught: %r" % errors)
bufio.close()
@@ -1327,9 +1413,9 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
def test_misbehaved_io(self):
rawio = self.MisbehavedRawIO()
bufio = self.tp(rawio, 5)
- self.assertRaises(IOError, bufio.seek, 0)
- self.assertRaises(IOError, bufio.tell)
- self.assertRaises(IOError, bufio.write, b"abcdef")
+ self.assertRaises(OSError, bufio.seek, 0)
+ self.assertRaises(OSError, bufio.tell)
+ self.assertRaises(OSError, bufio.write, b"abcdef")
def test_max_buffer_size_removal(self):
with self.assertRaises(TypeError):
@@ -1338,11 +1424,11 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
def test_write_error_on_close(self):
raw = self.MockRawIO()
def bad_write(b):
- raise IOError()
+ raise OSError()
raw.write = bad_write
b = self.tp(raw)
b.write(b'spam')
- self.assertRaises(IOError, b.close) # exception not swallowed
+ self.assertRaises(OSError, b.close) # exception not swallowed
self.assertTrue(b.closed)
@@ -1373,13 +1459,14 @@ class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
# C BufferedWriter objects are collected, and collecting them flushes
# all data to disk.
# The Python version has __del__, so it ends into gc.garbage instead
- rawio = self.FileIO(support.TESTFN, "w+b")
- f = self.tp(rawio)
- f.write(b"123xxx")
- f.x = f
- wr = weakref.ref(f)
- del f
- support.gc_collect()
+ with support.check_warnings(('', ResourceWarning)):
+ rawio = self.FileIO(support.TESTFN, "w+b")
+ f = self.tp(rawio)
+ f.write(b"123xxx")
+ f.x = f
+ wr = weakref.ref(f)
+ del f
+ support.gc_collect()
self.assertTrue(wr() is None, wr)
with self.open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"123xxx")
@@ -1426,14 +1513,14 @@ class BufferedRWPairTest(unittest.TestCase):
def readable(self):
return False
- self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
+ self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
def test_constructor_with_not_writeable(self):
class NotWriteable(MockRawIO):
def writable(self):
return False
- self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
+ self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
def test_read(self):
pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
@@ -1503,6 +1590,53 @@ class BufferedRWPairTest(unittest.TestCase):
pair.close()
self.assertTrue(pair.closed)
+ def test_reader_close_error_on_close(self):
+ def reader_close():
+ reader_non_existing
+ reader = self.MockRawIO()
+ reader.close = reader_close
+ writer = self.MockRawIO()
+ pair = self.tp(reader, writer)
+ with self.assertRaises(NameError) as err:
+ pair.close()
+ self.assertIn('reader_non_existing', str(err.exception))
+ self.assertTrue(pair.closed)
+ self.assertFalse(reader.closed)
+ self.assertTrue(writer.closed)
+
+ def test_writer_close_error_on_close(self):
+ def writer_close():
+ writer_non_existing
+ reader = self.MockRawIO()
+ writer = self.MockRawIO()
+ writer.close = writer_close
+ pair = self.tp(reader, writer)
+ with self.assertRaises(NameError) as err:
+ pair.close()
+ self.assertIn('writer_non_existing', str(err.exception))
+ self.assertFalse(pair.closed)
+ self.assertTrue(reader.closed)
+ self.assertFalse(writer.closed)
+
+ def test_reader_writer_close_error_on_close(self):
+ def reader_close():
+ reader_non_existing
+ def writer_close():
+ writer_non_existing
+ reader = self.MockRawIO()
+ reader.close = reader_close
+ writer = self.MockRawIO()
+ writer.close = writer_close
+ pair = self.tp(reader, writer)
+ with self.assertRaises(NameError) as err:
+ pair.close()
+ self.assertIn('reader_non_existing', str(err.exception))
+ self.assertIsInstance(err.exception.__context__, NameError)
+ self.assertIn('writer_non_existing', str(err.exception.__context__))
+ self.assertFalse(pair.closed)
+ self.assertFalse(reader.closed)
+ self.assertFalse(writer.closed)
+
def test_isatty(self):
class SelectableIsAtty(MockRawIO):
def __init__(self, isatty):
@@ -1961,6 +2095,17 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertRaises(TypeError, t.__init__, b, newline=42)
self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
+ def test_uninitialized(self):
+ t = self.TextIOWrapper.__new__(self.TextIOWrapper)
+ del t
+ t = self.TextIOWrapper.__new__(self.TextIOWrapper)
+ self.assertRaises(Exception, repr, t)
+ self.assertRaisesRegex((ValueError, AttributeError),
+ 'uninitialized|has no attribute',
+ t.read, 0)
+ t.__init__(self.MockRawIO())
+ self.assertEqual(t.read(0), '')
+
def test_non_text_encoding_codecs_are_rejected(self):
# Ensure the constructor complains if passed a codec that isn't
# marked as a text encoding
@@ -1968,7 +2113,7 @@ class TextIOWrapperTest(unittest.TestCase):
r = self.BytesIO()
b = self.BufferedWriter(r)
with self.assertRaisesRegex(LookupError, "is not a text encoding"):
- self.TextIOWrapper(b, encoding="hex_codec")
+ self.TextIOWrapper(b, encoding="hex")
def test_detach(self):
r = self.BytesIO()
@@ -1983,6 +2128,12 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEqual(r.getvalue(), b"howdy")
self.assertRaises(ValueError, t.detach)
+ # Operations independent of the detached stream should still work
+ repr(t)
+ self.assertEqual(t.encoding, "ascii")
+ self.assertEqual(t.errors, "strict")
+ self.assertFalse(t.line_buffering)
+
def test_repr(self):
raw = self.BytesIO("hello".encode("utf-8"))
b = self.BufferedReader(raw)
@@ -2000,6 +2151,9 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEqual(repr(t),
"<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
+ t.buffer.detach()
+ repr(t) # Should not raise an exception
+
def test_line_buffering(self):
r = self.BytesIO()
b = self.BufferedWriter(r, 1000)
@@ -2215,7 +2369,7 @@ class TextIOWrapperTest(unittest.TestCase):
if s:
# The destructor *may* have printed an unraisable error, check it
self.assertEqual(len(s.splitlines()), 1)
- self.assertTrue(s.startswith("Exception IOError: "), s)
+ self.assertTrue(s.startswith("Exception OSError: "), s)
self.assertTrue(s.endswith(" ignored"), s)
# Systematic tests of the text I/O API
@@ -2287,7 +2441,7 @@ class TextIOWrapperTest(unittest.TestCase):
f.seek(0)
for line in f:
self.assertEqual(line, "\xff\n")
- self.assertRaises(IOError, f.tell)
+ self.assertRaises(OSError, f.tell)
self.assertEqual(f.tell(), p2)
f.close()
@@ -2391,7 +2545,7 @@ class TextIOWrapperTest(unittest.TestCase):
def readable(self):
return False
txt = self.TextIOWrapper(UnReadable())
- self.assertRaises(IOError, txt.read)
+ self.assertRaises(OSError, txt.read)
def test_read_one_by_one(self):
txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
@@ -2534,6 +2688,19 @@ class TextIOWrapperTest(unittest.TestCase):
with self.open(filename, 'rb') as f:
self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
+ def test_seek_append_bom(self):
+ # Same test, but first seek to the start and then to the end
+ filename = support.TESTFN
+ for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
+ with self.open(filename, 'w', encoding=charset) as f:
+ f.write('aaa')
+ with self.open(filename, 'a', encoding=charset) as f:
+ f.seek(0)
+ f.seek(0, self.SEEK_END)
+ f.write('xxx')
+ with self.open(filename, 'rb') as f:
+ self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
+
def test_errors_property(self):
with self.open(support.TESTFN, "w") as f:
self.assertEqual(f.errors, "strict")
@@ -2550,26 +2717,64 @@ class TextIOWrapperTest(unittest.TestCase):
text = "Thread%03d\n" % n
event.wait()
f.write(text)
- threads = [threading.Thread(target=lambda n=x: run(n))
+ threads = [threading.Thread(target=run, args=(x,))
for x in range(20)]
- for t in threads:
- t.start()
- time.sleep(0.02)
- event.set()
- for t in threads:
- t.join()
+ with support.start_threads(threads, event.set):
+ time.sleep(0.02)
with self.open(support.TESTFN) as f:
content = f.read()
for n in range(20):
self.assertEqual(content.count("Thread%03d\n" % n), 1)
def test_flush_error_on_close(self):
+ # Test that text file is closed despite failed flush
+ # and that flush() is called before file closed.
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+ closed = []
def bad_flush():
- raise IOError()
+ closed[:] = [txt.closed, txt.buffer.closed]
+ raise OSError()
txt.flush = bad_flush
- self.assertRaises(IOError, txt.close) # exception not swallowed
+ self.assertRaises(OSError, txt.close) # exception not swallowed
self.assertTrue(txt.closed)
+ self.assertTrue(txt.buffer.closed)
+ self.assertTrue(closed) # flush() called
+ self.assertFalse(closed[0]) # flush() called before file closed
+ self.assertFalse(closed[1])
+ txt.flush = lambda: None # break reference loop
+
+ def test_close_error_on_close(self):
+ buffer = self.BytesIO(self.testdata)
+ def bad_flush():
+ raise OSError('flush')
+ def bad_close():
+ raise OSError('close')
+ buffer.close = bad_close
+ txt = self.TextIOWrapper(buffer, encoding="ascii")
+ txt.flush = bad_flush
+ with self.assertRaises(OSError) as err: # exception not swallowed
+ txt.close()
+ self.assertEqual(err.exception.args, ('close',))
+ self.assertIsInstance(err.exception.__context__, OSError)
+ self.assertEqual(err.exception.__context__.args, ('flush',))
+ self.assertFalse(txt.closed)
+
+ def test_nonnormalized_close_error_on_close(self):
+ # Issue #21677
+ buffer = self.BytesIO(self.testdata)
+ def bad_flush():
+ raise non_existing_flush
+ def bad_close():
+ raise non_existing_close
+ buffer.close = bad_close
+ txt = self.TextIOWrapper(buffer, encoding="ascii")
+ txt.flush = bad_flush
+ with self.assertRaises(NameError) as err: # exception not swallowed
+ txt.close()
+ self.assertIn('non_existing_close', str(err.exception))
+ self.assertIsInstance(err.exception.__context__, NameError)
+ self.assertIn('non_existing_flush', str(err.exception.__context__))
+ self.assertFalse(txt.closed)
def test_multi_close(self):
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
@@ -2610,6 +2815,38 @@ class TextIOWrapperTest(unittest.TestCase):
txt.write('5')
self.assertEqual(b''.join(raw._write_stack), b'123\n45')
+ def test_bufio_write_through(self):
+ # Issue #21396: write_through=True doesn't force a flush()
+ # on the underlying binary buffered object.
+ flush_called, write_called = [], []
+ class BufferedWriter(self.BufferedWriter):
+ def flush(self, *args, **kwargs):
+ flush_called.append(True)
+ return super().flush(*args, **kwargs)
+ def write(self, *args, **kwargs):
+ write_called.append(True)
+ return super().write(*args, **kwargs)
+
+ rawio = self.BytesIO()
+ data = b"a"
+ bufio = BufferedWriter(rawio, len(data)*2)
+ textio = self.TextIOWrapper(bufio, encoding='ascii',
+ write_through=True)
+ # write to the buffered io but don't overflow the buffer
+ text = data.decode('ascii')
+ textio.write(text)
+
+ # buffer.flush is not called with write_through=True
+ self.assertFalse(flush_called)
+ # buffer.write *is* called with write_through=True
+ self.assertTrue(write_called)
+ self.assertEqual(rawio.getvalue(), b"") # no flush
+
+ write_called = [] # reset
+ textio.write(text * 10) # total content is larger than bufio buffer
+ self.assertTrue(write_called)
+ self.assertEqual(rawio.getvalue(), data * 11) # all flushed
+
def test_read_nonbytes(self):
# Issue #17106
# Crash when underlying read() returns non-bytes
@@ -2624,11 +2861,11 @@ class TextIOWrapperTest(unittest.TestCase):
# Issue #17106
# Bypass the early encoding check added in issue 20404
def _make_illegal_wrapper():
- quopri = codecs.lookup("quopri_codec")
+ quopri = codecs.lookup("quopri")
quopri._is_text_encoding = True
try:
t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
- newline='\n', encoding="quopri_codec")
+ newline='\n', encoding="quopri")
finally:
quopri._is_text_encoding = False
return t
@@ -2640,8 +2877,61 @@ class TextIOWrapperTest(unittest.TestCase):
t = _make_illegal_wrapper()
self.assertRaises(TypeError, t.read)
+ def _check_create_at_shutdown(self, **kwargs):
+ # Issue #20037: creating a TextIOWrapper at shutdown
+ # shouldn't crash the interpreter.
+ iomod = self.io.__name__
+ code = """if 1:
+ import codecs
+ import {iomod} as io
+
+ # Avoid looking up codecs at shutdown
+ codecs.lookup('utf-8')
+
+ class C:
+ def __init__(self):
+ self.buf = io.BytesIO()
+ def __del__(self):
+ io.TextIOWrapper(self.buf, **{kwargs})
+ print("ok")
+ c = C()
+ """.format(iomod=iomod, kwargs=kwargs)
+ return assert_python_ok("-c", code)
+
+ def test_create_at_shutdown_without_encoding(self):
+ rc, out, err = self._check_create_at_shutdown()
+ if err:
+ # Can error out with a RuntimeError if the module state
+ # isn't found.
+ self.assertIn(self.shutdown_error, err.decode())
+ else:
+ self.assertEqual("ok", out.decode().strip())
+
+ def test_create_at_shutdown_with_encoding(self):
+ rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
+ errors='strict')
+ self.assertFalse(err)
+ self.assertEqual("ok", out.decode().strip())
+
+ def test_issue22849(self):
+ class F(object):
+ def readable(self): return True
+ def writable(self): return True
+ def seekable(self): return True
+
+ for i in range(10):
+ try:
+ self.TextIOWrapper(F(), encoding='utf-8')
+ except Exception:
+ pass
+
+ F.tell = lambda x: 0
+ t = self.TextIOWrapper(F(), encoding='utf-8')
+
class CTextIOWrapperTest(TextIOWrapperTest):
+ io = io
+ shutdown_error = "RuntimeError: could not find io module state"
def test_initialization(self):
r = self.BytesIO(b"\xc3\xa9\n\n")
@@ -2652,18 +2942,22 @@ class CTextIOWrapperTest(TextIOWrapperTest):
self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
self.assertRaises(ValueError, t.read)
+ t = self.TextIOWrapper.__new__(self.TextIOWrapper)
+ self.assertRaises(Exception, repr, t)
+
def test_garbage_collection(self):
# C TextIOWrapper objects are collected, and collecting them flushes
# all data to disk.
# The Python version has __del__, so it ends in gc.garbage instead.
- rawio = io.FileIO(support.TESTFN, "wb")
- b = self.BufferedWriter(rawio)
- t = self.TextIOWrapper(b, encoding="ascii")
- t.write("456def")
- t.x = t
- wr = weakref.ref(t)
- del t
- support.gc_collect()
+ with support.check_warnings(('', ResourceWarning)):
+ rawio = io.FileIO(support.TESTFN, "wb")
+ b = self.BufferedWriter(rawio)
+ t = self.TextIOWrapper(b, encoding="ascii")
+ t.write("456def")
+ t.x = t
+ wr = weakref.ref(t)
+ del t
+ support.gc_collect()
self.assertTrue(wr() is None, wr)
with self.open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"456def")
@@ -2684,7 +2978,9 @@ class CTextIOWrapperTest(TextIOWrapperTest):
class PyTextIOWrapperTest(TextIOWrapperTest):
- pass
+ io = pyio
+ #shutdown_error = "LookupError: unknown encoding: ascii"
+ shutdown_error = "TypeError: 'NoneType' object is not iterable"
class IncrementalNewlineDecoderTest(unittest.TestCase):
@@ -2823,7 +3119,8 @@ class MiscIOTest(unittest.TestCase):
self.assertEqual(f.mode, "wb")
f.close()
- f = self.open(support.TESTFN, "U")
+ with support.check_warnings(('', DeprecationWarning)):
+ f = self.open(support.TESTFN, "U")
self.assertEqual(f.name, support.TESTFN)
self.assertEqual(f.buffer.name, support.TESTFN)
self.assertEqual(f.buffer.raw.name, support.TESTFN)
@@ -2953,7 +3250,7 @@ class MiscIOTest(unittest.TestCase):
for fd in fds:
try:
os.close(fd)
- except EnvironmentError as e:
+ except OSError as e:
if e.errno != errno.EBADF:
raise
self.addCleanup(cleanup_fds)
@@ -3075,7 +3372,6 @@ class MiscIOTest(unittest.TestCase):
class CMiscIOTest(MiscIOTest):
io = io
- shutdown_error = "RuntimeError: could not find io module state"
def test_readinto_buffer_overflow(self):
# Issue #18025
@@ -3086,9 +3382,51 @@ class CMiscIOTest(MiscIOTest):
b = bytearray(2)
self.assertRaises(ValueError, bufio.readinto, b)
+ @unittest.skipUnless(threading, 'Threading required for this test.')
+ def check_daemon_threads_shutdown_deadlock(self, stream_name):
+ # Issue #23309: deadlocks at shutdown should be avoided when a
+ # daemon thread and the main thread both write to a file.
+ code = """if 1:
+ import sys
+ import time
+ import threading
+
+ file = sys.{stream_name}
+
+ def run():
+ while True:
+ file.write('.')
+ file.flush()
+
+ thread = threading.Thread(target=run)
+ thread.daemon = True
+ thread.start()
+
+ time.sleep(0.5)
+ file.write('!')
+ file.flush()
+ """.format_map(locals())
+ res, _ = run_python_until_end("-c", code)
+ err = res.err.decode()
+ if res.rc != 0:
+ # Failure: should be a fatal error
+ self.assertIn("Fatal Python error: could not acquire lock "
+ "for <_io.BufferedWriter name='<{stream_name}>'> "
+ "at interpreter shutdown, possibly due to "
+ "daemon threads".format_map(locals()),
+ err)
+ else:
+ self.assertFalse(err.strip('.!'))
+
+ def test_daemon_threads_shutdown_stdout_deadlock(self):
+ self.check_daemon_threads_shutdown_deadlock('stdout')
+
+ def test_daemon_threads_shutdown_stderr_deadlock(self):
+ self.check_daemon_threads_shutdown_deadlock('stderr')
+
+
class PyMiscIOTest(MiscIOTest):
io = pyio
- shutdown_error = "LookupError: unknown encoding: ascii"
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
@@ -3121,16 +3459,19 @@ class SignalsTest(unittest.TestCase):
try:
wio = self.io.open(w, **fdopen_kwargs)
t.start()
- signal.alarm(1)
# Fill the pipe enough that the write will be blocking.
# It will be interrupted by the timer armed above. Since the
# other thread has read one byte, the low-level write will
# return with a successful (partial) result rather than an EINTR.
# The buffered IO layer must check for pending signal
# handlers, which in this case will invoke alarm_interrupt().
- self.assertRaises(ZeroDivisionError,
- wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
- t.join()
+ signal.alarm(1)
+ try:
+ with self.assertRaises(ZeroDivisionError):
+ wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
+ finally:
+ signal.alarm(0)
+ t.join()
# We got one byte, get another one and check that it isn't a
# repeat of the first one.
read_results.append(os.read(r, 1))
@@ -3143,7 +3484,7 @@ class SignalsTest(unittest.TestCase):
# buffer, and block again.
try:
wio.close()
- except IOError as e:
+ except OSError as e:
if e.errno != errno.EBADF:
raise
@@ -3153,6 +3494,8 @@ class SignalsTest(unittest.TestCase):
def test_interrupted_write_buffered(self):
self.check_interrupted_write(b"xy", b"xy", mode="wb")
+ # Issue #22331: The test hangs on FreeBSD 7.2
+ @support.requires_freebsd_version(8)
def test_interrupted_write_text(self):
self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
@@ -3236,11 +3579,16 @@ class SignalsTest(unittest.TestCase):
# received (forcing a first EINTR in write()).
read_results = []
write_finished = False
+ error = None
def _read():
- while not write_finished:
- while r in select.select([r], [], [], 1.0)[0]:
- s = os.read(r, 1024)
- read_results.append(s)
+ try:
+ while not write_finished:
+ while r in select.select([r], [], [], 1.0)[0]:
+ s = os.read(r, 1024)
+ read_results.append(s)
+ except BaseException as exc:
+ nonlocal error
+ error = exc
t = threading.Thread(target=_read)
t.daemon = True
def alarm1(sig, frame):
@@ -3261,6 +3609,8 @@ class SignalsTest(unittest.TestCase):
wio.flush()
write_finished = True
t.join()
+
+ self.assertIsNone(error)
self.assertEqual(N, sum(len(x) for x in read_results))
finally:
write_finished = True
@@ -3271,7 +3621,7 @@ class SignalsTest(unittest.TestCase):
# buffer, and could block (in case of failure).
try:
wio.close()
- except IOError as e:
+ except OSError as e:
if e.errno != errno.EBADF:
raise