diff options
Diffstat (limited to 'Lib/test/test_io.py')
| -rw-r--r-- | Lib/test/test_io.py | 514 |
1 files changed, 432 insertions, 82 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 4b49d33..416c547 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -35,6 +35,7 @@ import weakref from collections import deque, UserList from itertools import cycle, count from test import support +from test.script_helper import assert_python_ok, run_python_until_end import codecs import io # C implementation of io @@ -164,7 +165,7 @@ class CloseFailureIO(MockRawIO): def close(self): if not self.closed: self.closed = 1 - raise IOError + raise OSError class CCloseFailureIO(CloseFailureIO, io.RawIOBase): pass @@ -592,13 +593,44 @@ class IOTest(unittest.TestCase): with self.open(zero, "r") as f: self.assertRaises(OverflowError, f.read) - def test_flush_error_on_close(self): - f = self.open(support.TESTFN, "wb", buffering=0) + def check_flush_error_on_close(self, *args, **kwargs): + # Test that the file is closed despite failed flush + # and that flush() is called before file closed. + f = self.open(*args, **kwargs) + closed = [] def bad_flush(): - raise IOError() + closed[:] = [f.closed] + raise OSError() f.flush = bad_flush - self.assertRaises(IOError, f.close) # exception not swallowed + self.assertRaises(OSError, f.close) # exception not swallowed self.assertTrue(f.closed) + self.assertTrue(closed) # flush() called + self.assertFalse(closed[0]) # flush() called before file closed + f.flush = lambda: None # break reference loop + + def test_flush_error_on_close(self): + # raw file + # Issue #5700: io.FileIO calls flush() after file closed + self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0) + fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) + self.check_flush_error_on_close(fd, 'wb', buffering=0) + fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) + self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False) + os.close(fd) + # buffered io + self.check_flush_error_on_close(support.TESTFN, 'wb') + fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) + self.check_flush_error_on_close(fd, 'wb') + fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) + self.check_flush_error_on_close(fd, 'wb', closefd=False) + os.close(fd) + # text io + self.check_flush_error_on_close(support.TESTFN, 'w') + fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) + self.check_flush_error_on_close(fd, 'w') + fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) + self.check_flush_error_on_close(fd, 'w', closefd=False) + os.close(fd) def test_multi_close(self): f = self.open(support.TESTFN, "wb", buffering=0) @@ -652,6 +684,20 @@ class IOTest(unittest.TestCase): fileio.close() f2.readline() + def test_nonbuffered_textio(self): + with warnings.catch_warnings(record=True) as recorded: + with self.assertRaises(ValueError): + self.open(support.TESTFN, 'w', buffering=0) + support.gc_collect() + self.assertEqual(recorded, []) + + def test_invalid_newline(self): + with warnings.catch_warnings(record=True) as recorded: + with self.assertRaises(ValueError): + self.open(support.TESTFN, 'w', newline='invalid') + support.gc_collect() + self.assertEqual(recorded, []) + class CIOTest(IOTest): @@ -686,6 +732,8 @@ class CommonBufferedTests: self.assertIs(buf.detach(), raw) self.assertRaises(ValueError, buf.detach) + repr(buf) # Should still work + def test_fileno(self): rawio = self.MockRawIO() bufio = self.tp(rawio) @@ -757,7 +805,7 @@ class CommonBufferedTests: if s: # The destructor *may* have printed an unraisable error, check it self.assertEqual(len(s.splitlines()), 1) - self.assertTrue(s.startswith("Exception IOError: "), s) + self.assertTrue(s.startswith("Exception OSError: "), s) self.assertTrue(s.endswith(" ignored"), s) def test_repr(self): @@ -771,29 +819,56 @@ class CommonBufferedTests: self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname) def test_flush_error_on_close(self): + # Test that buffered file is closed despite failed flush + # and that flush() is called before file closed. raw = self.MockRawIO() + closed = [] def bad_flush(): - raise IOError() + closed[:] = [b.closed, raw.closed] + raise OSError() raw.flush = bad_flush b = self.tp(raw) - self.assertRaises(IOError, b.close) # exception not swallowed + self.assertRaises(OSError, b.close) # exception not swallowed self.assertTrue(b.closed) + self.assertTrue(raw.closed) + self.assertTrue(closed) # flush() called + self.assertFalse(closed[0]) # flush() called before file closed + self.assertFalse(closed[1]) + raw.flush = lambda: None # break reference loop def test_close_error_on_close(self): raw = self.MockRawIO() def bad_flush(): - raise IOError('flush') + raise OSError('flush') def bad_close(): - raise IOError('close') + raise OSError('close') raw.close = bad_close b = self.tp(raw) b.flush = bad_flush - with self.assertRaises(IOError) as err: # exception not swallowed + with self.assertRaises(OSError) as err: # exception not swallowed b.close() self.assertEqual(err.exception.args, ('close',)) + self.assertIsInstance(err.exception.__context__, OSError) self.assertEqual(err.exception.__context__.args, ('flush',)) self.assertFalse(b.closed) + def test_nonnormalized_close_error_on_close(self): + # Issue #21677 + raw = self.MockRawIO() + def bad_flush(): + raise non_existing_flush + def bad_close(): + raise non_existing_close + raw.close = bad_close + b = self.tp(raw) + b.flush = bad_flush + with self.assertRaises(NameError) as err: # exception not swallowed + b.close() + self.assertIn('non_existing_close', str(err.exception)) + self.assertIsInstance(err.exception.__context__, NameError) + self.assertIn('non_existing_flush', str(err.exception.__context__)) + self.assertFalse(b.closed) + def test_multi_close(self): raw = self.MockRawIO() b = self.tp(raw) @@ -828,6 +903,14 @@ class SizeofTest: bufio = self.tp(rawio, buffer_size=bufsize2) self.assertEqual(sys.getsizeof(bufio), size + bufsize2) + @support.cpython_only + def test_buffer_freeing(self) : + bufsize = 4096 + rawio = self.MockRawIO() + bufio = self.tp(rawio, buffer_size=bufsize) + size = sys.getsizeof(bufio) - bufsize + bufio.close() + self.assertEqual(sys.getsizeof(bufio), size) class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): read_mode = "rb" @@ -987,11 +1070,8 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): errors.append(e) raise threads = [threading.Thread(target=f) for x in range(20)] - for t in threads: - t.start() - time.sleep(0.02) # yield - for t in threads: - t.join() + with support.start_threads(threads): + time.sleep(0.02) # yield self.assertFalse(errors, "the following exceptions were caught: %r" % errors) s = b''.join(results) @@ -1012,8 +1092,8 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): def test_misbehaved_io(self): rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) bufio = self.tp(rawio) - self.assertRaises(IOError, bufio.seek, 0) - self.assertRaises(IOError, bufio.tell) + self.assertRaises(OSError, bufio.seek, 0) + self.assertRaises(OSError, bufio.tell) def test_no_extraneous_read(self): # Issue #9550; when the raw IO object has satisfied the read request, @@ -1035,6 +1115,14 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): self.assertEqual(rawio._extraneous_reads, 0, "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) + def test_read_on_closed(self): + # Issue #23796 + b = io.BufferedReader(io.BytesIO(b"12")) + b.read(1) + b.close() + self.assertRaises(ValueError, b.peek) + self.assertRaises(ValueError, b.read1, 1) + class CBufferedReaderTest(BufferedReaderTest, SizeofTest): tp = io.BufferedReader @@ -1064,17 +1152,18 @@ class CBufferedReaderTest(BufferedReaderTest, SizeofTest): bufio = self.tp(rawio) # _pyio.BufferedReader seems to implement reading different, so that # checking this is not so easy. - self.assertRaises(IOError, bufio.read, 10) + self.assertRaises(OSError, bufio.read, 10) def test_garbage_collection(self): # C BufferedReader objects are collected. # The Python version has __del__, so it ends into gc.garbage instead - rawio = self.FileIO(support.TESTFN, "w+b") - f = self.tp(rawio) - f.f = f - wr = weakref.ref(f) - del f - support.gc_collect() + with support.check_warnings(('', ResourceWarning)): + rawio = self.FileIO(support.TESTFN, "w+b") + f = self.tp(rawio) + f.f = f + wr = weakref.ref(f) + del f + support.gc_collect() self.assertTrue(wr() is None, wr) def test_args_error(self): @@ -1309,11 +1398,8 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): errors.append(e) raise threads = [threading.Thread(target=f) for x in range(20)] - for t in threads: - t.start() - time.sleep(0.02) # yield - for t in threads: - t.join() + with support.start_threads(threads): + time.sleep(0.02) # yield self.assertFalse(errors, "the following exceptions were caught: %r" % errors) bufio.close() @@ -1327,9 +1413,9 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): def test_misbehaved_io(self): rawio = self.MisbehavedRawIO() bufio = self.tp(rawio, 5) - self.assertRaises(IOError, bufio.seek, 0) - self.assertRaises(IOError, bufio.tell) - self.assertRaises(IOError, bufio.write, b"abcdef") + self.assertRaises(OSError, bufio.seek, 0) + self.assertRaises(OSError, bufio.tell) + self.assertRaises(OSError, bufio.write, b"abcdef") def test_max_buffer_size_removal(self): with self.assertRaises(TypeError): @@ -1338,11 +1424,11 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): def test_write_error_on_close(self): raw = self.MockRawIO() def bad_write(b): - raise IOError() + raise OSError() raw.write = bad_write b = self.tp(raw) b.write(b'spam') - self.assertRaises(IOError, b.close) # exception not swallowed + self.assertRaises(OSError, b.close) # exception not swallowed self.assertTrue(b.closed) @@ -1373,13 +1459,14 @@ class CBufferedWriterTest(BufferedWriterTest, SizeofTest): # C BufferedWriter objects are collected, and collecting them flushes # all data to disk. # The Python version has __del__, so it ends into gc.garbage instead - rawio = self.FileIO(support.TESTFN, "w+b") - f = self.tp(rawio) - f.write(b"123xxx") - f.x = f - wr = weakref.ref(f) - del f - support.gc_collect() + with support.check_warnings(('', ResourceWarning)): + rawio = self.FileIO(support.TESTFN, "w+b") + f = self.tp(rawio) + f.write(b"123xxx") + f.x = f + wr = weakref.ref(f) + del f + support.gc_collect() self.assertTrue(wr() is None, wr) with self.open(support.TESTFN, "rb") as f: self.assertEqual(f.read(), b"123xxx") @@ -1426,14 +1513,14 @@ class BufferedRWPairTest(unittest.TestCase): def readable(self): return False - self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO()) + self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO()) def test_constructor_with_not_writeable(self): class NotWriteable(MockRawIO): def writable(self): return False - self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable()) + self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable()) def test_read(self): pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) @@ -1503,6 +1590,53 @@ class BufferedRWPairTest(unittest.TestCase): pair.close() self.assertTrue(pair.closed) + def test_reader_close_error_on_close(self): + def reader_close(): + reader_non_existing + reader = self.MockRawIO() + reader.close = reader_close + writer = self.MockRawIO() + pair = self.tp(reader, writer) + with self.assertRaises(NameError) as err: + pair.close() + self.assertIn('reader_non_existing', str(err.exception)) + self.assertTrue(pair.closed) + self.assertFalse(reader.closed) + self.assertTrue(writer.closed) + + def test_writer_close_error_on_close(self): + def writer_close(): + writer_non_existing + reader = self.MockRawIO() + writer = self.MockRawIO() + writer.close = writer_close + pair = self.tp(reader, writer) + with self.assertRaises(NameError) as err: + pair.close() + self.assertIn('writer_non_existing', str(err.exception)) + self.assertFalse(pair.closed) + self.assertTrue(reader.closed) + self.assertFalse(writer.closed) + + def test_reader_writer_close_error_on_close(self): + def reader_close(): + reader_non_existing + def writer_close(): + writer_non_existing + reader = self.MockRawIO() + reader.close = reader_close + writer = self.MockRawIO() + writer.close = writer_close + pair = self.tp(reader, writer) + with self.assertRaises(NameError) as err: + pair.close() + self.assertIn('reader_non_existing', str(err.exception)) + self.assertIsInstance(err.exception.__context__, NameError) + self.assertIn('writer_non_existing', str(err.exception.__context__)) + self.assertFalse(pair.closed) + self.assertFalse(reader.closed) + self.assertFalse(writer.closed) + def test_isatty(self): class SelectableIsAtty(MockRawIO): def __init__(self, isatty): @@ -1961,6 +2095,17 @@ class TextIOWrapperTest(unittest.TestCase): self.assertRaises(TypeError, t.__init__, b, newline=42) self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') + def test_uninitialized(self): + t = self.TextIOWrapper.__new__(self.TextIOWrapper) + del t + t = self.TextIOWrapper.__new__(self.TextIOWrapper) + self.assertRaises(Exception, repr, t) + self.assertRaisesRegex((ValueError, AttributeError), + 'uninitialized|has no attribute', + t.read, 0) + t.__init__(self.MockRawIO()) + self.assertEqual(t.read(0), '') + def test_non_text_encoding_codecs_are_rejected(self): # Ensure the constructor complains if passed a codec that isn't # marked as a text encoding @@ -1968,7 +2113,7 @@ class TextIOWrapperTest(unittest.TestCase): r = self.BytesIO() b = self.BufferedWriter(r) with self.assertRaisesRegex(LookupError, "is not a text encoding"): - self.TextIOWrapper(b, encoding="hex_codec") + self.TextIOWrapper(b, encoding="hex") def test_detach(self): r = self.BytesIO() @@ -1983,6 +2128,12 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEqual(r.getvalue(), b"howdy") self.assertRaises(ValueError, t.detach) + # Operations independent of the detached stream should still work + repr(t) + self.assertEqual(t.encoding, "ascii") + self.assertEqual(t.errors, "strict") + self.assertFalse(t.line_buffering) + def test_repr(self): raw = self.BytesIO("hello".encode("utf-8")) b = self.BufferedReader(raw) @@ -2000,6 +2151,9 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEqual(repr(t), "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname) + t.buffer.detach() + repr(t) # Should not raise an exception + def test_line_buffering(self): r = self.BytesIO() b = self.BufferedWriter(r, 1000) @@ -2215,7 +2369,7 @@ class TextIOWrapperTest(unittest.TestCase): if s: # The destructor *may* have printed an unraisable error, check it self.assertEqual(len(s.splitlines()), 1) - self.assertTrue(s.startswith("Exception IOError: "), s) + self.assertTrue(s.startswith("Exception OSError: "), s) self.assertTrue(s.endswith(" ignored"), s) # Systematic tests of the text I/O API @@ -2287,7 +2441,7 @@ class TextIOWrapperTest(unittest.TestCase): f.seek(0) for line in f: self.assertEqual(line, "\xff\n") - self.assertRaises(IOError, f.tell) + self.assertRaises(OSError, f.tell) self.assertEqual(f.tell(), p2) f.close() @@ -2391,7 +2545,7 @@ class TextIOWrapperTest(unittest.TestCase): def readable(self): return False txt = self.TextIOWrapper(UnReadable()) - self.assertRaises(IOError, txt.read) + self.assertRaises(OSError, txt.read) def test_read_one_by_one(self): txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB")) @@ -2534,6 +2688,19 @@ class TextIOWrapperTest(unittest.TestCase): with self.open(filename, 'rb') as f: self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) + def test_seek_append_bom(self): + # Same test, but first seek to the start and then to the end + filename = support.TESTFN + for charset in ('utf-8-sig', 'utf-16', 'utf-32'): + with self.open(filename, 'w', encoding=charset) as f: + f.write('aaa') + with self.open(filename, 'a', encoding=charset) as f: + f.seek(0) + f.seek(0, self.SEEK_END) + f.write('xxx') + with self.open(filename, 'rb') as f: + self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) + def test_errors_property(self): with self.open(support.TESTFN, "w") as f: self.assertEqual(f.errors, "strict") @@ -2550,26 +2717,64 @@ class TextIOWrapperTest(unittest.TestCase): text = "Thread%03d\n" % n event.wait() f.write(text) - threads = [threading.Thread(target=lambda n=x: run(n)) + threads = [threading.Thread(target=run, args=(x,)) for x in range(20)] - for t in threads: - t.start() - time.sleep(0.02) - event.set() - for t in threads: - t.join() + with support.start_threads(threads, event.set): + time.sleep(0.02) with self.open(support.TESTFN) as f: content = f.read() for n in range(20): self.assertEqual(content.count("Thread%03d\n" % n), 1) def test_flush_error_on_close(self): + # Test that text file is closed despite failed flush + # and that flush() is called before file closed. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") + closed = [] def bad_flush(): - raise IOError() + closed[:] = [txt.closed, txt.buffer.closed] + raise OSError() txt.flush = bad_flush - self.assertRaises(IOError, txt.close) # exception not swallowed + self.assertRaises(OSError, txt.close) # exception not swallowed self.assertTrue(txt.closed) + self.assertTrue(txt.buffer.closed) + self.assertTrue(closed) # flush() called + self.assertFalse(closed[0]) # flush() called before file closed + self.assertFalse(closed[1]) + txt.flush = lambda: None # break reference loop + + def test_close_error_on_close(self): + buffer = self.BytesIO(self.testdata) + def bad_flush(): + raise OSError('flush') + def bad_close(): + raise OSError('close') + buffer.close = bad_close + txt = self.TextIOWrapper(buffer, encoding="ascii") + txt.flush = bad_flush + with self.assertRaises(OSError) as err: # exception not swallowed + txt.close() + self.assertEqual(err.exception.args, ('close',)) + self.assertIsInstance(err.exception.__context__, OSError) + self.assertEqual(err.exception.__context__.args, ('flush',)) + self.assertFalse(txt.closed) + + def test_nonnormalized_close_error_on_close(self): + # Issue #21677 + buffer = self.BytesIO(self.testdata) + def bad_flush(): + raise non_existing_flush + def bad_close(): + raise non_existing_close + buffer.close = bad_close + txt = self.TextIOWrapper(buffer, encoding="ascii") + txt.flush = bad_flush + with self.assertRaises(NameError) as err: # exception not swallowed + txt.close() + self.assertIn('non_existing_close', str(err.exception)) + self.assertIsInstance(err.exception.__context__, NameError) + self.assertIn('non_existing_flush', str(err.exception.__context__)) + self.assertFalse(txt.closed) def test_multi_close(self): txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") @@ -2610,6 +2815,38 @@ class TextIOWrapperTest(unittest.TestCase): txt.write('5') self.assertEqual(b''.join(raw._write_stack), b'123\n45') + def test_bufio_write_through(self): + # Issue #21396: write_through=True doesn't force a flush() + # on the underlying binary buffered object. + flush_called, write_called = [], [] + class BufferedWriter(self.BufferedWriter): + def flush(self, *args, **kwargs): + flush_called.append(True) + return super().flush(*args, **kwargs) + def write(self, *args, **kwargs): + write_called.append(True) + return super().write(*args, **kwargs) + + rawio = self.BytesIO() + data = b"a" + bufio = BufferedWriter(rawio, len(data)*2) + textio = self.TextIOWrapper(bufio, encoding='ascii', + write_through=True) + # write to the buffered io but don't overflow the buffer + text = data.decode('ascii') + textio.write(text) + + # buffer.flush is not called with write_through=True + self.assertFalse(flush_called) + # buffer.write *is* called with write_through=True + self.assertTrue(write_called) + self.assertEqual(rawio.getvalue(), b"") # no flush + + write_called = [] # reset + textio.write(text * 10) # total content is larger than bufio buffer + self.assertTrue(write_called) + self.assertEqual(rawio.getvalue(), data * 11) # all flushed + def test_read_nonbytes(self): # Issue #17106 # Crash when underlying read() returns non-bytes @@ -2624,11 +2861,11 @@ class TextIOWrapperTest(unittest.TestCase): # Issue #17106 # Bypass the early encoding check added in issue 20404 def _make_illegal_wrapper(): - quopri = codecs.lookup("quopri_codec") + quopri = codecs.lookup("quopri") quopri._is_text_encoding = True try: t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), - newline='\n', encoding="quopri_codec") + newline='\n', encoding="quopri") finally: quopri._is_text_encoding = False return t @@ -2640,8 +2877,61 @@ class TextIOWrapperTest(unittest.TestCase): t = _make_illegal_wrapper() self.assertRaises(TypeError, t.read) + def _check_create_at_shutdown(self, **kwargs): + # Issue #20037: creating a TextIOWrapper at shutdown + # shouldn't crash the interpreter. + iomod = self.io.__name__ + code = """if 1: + import codecs + import {iomod} as io + + # Avoid looking up codecs at shutdown + codecs.lookup('utf-8') + + class C: + def __init__(self): + self.buf = io.BytesIO() + def __del__(self): + io.TextIOWrapper(self.buf, **{kwargs}) + print("ok") + c = C() + """.format(iomod=iomod, kwargs=kwargs) + return assert_python_ok("-c", code) + + def test_create_at_shutdown_without_encoding(self): + rc, out, err = self._check_create_at_shutdown() + if err: + # Can error out with a RuntimeError if the module state + # isn't found. + self.assertIn(self.shutdown_error, err.decode()) + else: + self.assertEqual("ok", out.decode().strip()) + + def test_create_at_shutdown_with_encoding(self): + rc, out, err = self._check_create_at_shutdown(encoding='utf-8', + errors='strict') + self.assertFalse(err) + self.assertEqual("ok", out.decode().strip()) + + def test_issue22849(self): + class F(object): + def readable(self): return True + def writable(self): return True + def seekable(self): return True + + for i in range(10): + try: + self.TextIOWrapper(F(), encoding='utf-8') + except Exception: + pass + + F.tell = lambda x: 0 + t = self.TextIOWrapper(F(), encoding='utf-8') + class CTextIOWrapperTest(TextIOWrapperTest): + io = io + shutdown_error = "RuntimeError: could not find io module state" def test_initialization(self): r = self.BytesIO(b"\xc3\xa9\n\n") @@ -2652,18 +2942,22 @@ class CTextIOWrapperTest(TextIOWrapperTest): self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') self.assertRaises(ValueError, t.read) + t = self.TextIOWrapper.__new__(self.TextIOWrapper) + self.assertRaises(Exception, repr, t) + def test_garbage_collection(self): # C TextIOWrapper objects are collected, and collecting them flushes # all data to disk. # The Python version has __del__, so it ends in gc.garbage instead. - rawio = io.FileIO(support.TESTFN, "wb") - b = self.BufferedWriter(rawio) - t = self.TextIOWrapper(b, encoding="ascii") - t.write("456def") - t.x = t - wr = weakref.ref(t) - del t - support.gc_collect() + with support.check_warnings(('', ResourceWarning)): + rawio = io.FileIO(support.TESTFN, "wb") + b = self.BufferedWriter(rawio) + t = self.TextIOWrapper(b, encoding="ascii") + t.write("456def") + t.x = t + wr = weakref.ref(t) + del t + support.gc_collect() self.assertTrue(wr() is None, wr) with self.open(support.TESTFN, "rb") as f: self.assertEqual(f.read(), b"456def") @@ -2684,7 +2978,9 @@ class CTextIOWrapperTest(TextIOWrapperTest): class PyTextIOWrapperTest(TextIOWrapperTest): - pass + io = pyio + #shutdown_error = "LookupError: unknown encoding: ascii" + shutdown_error = "TypeError: 'NoneType' object is not iterable" class IncrementalNewlineDecoderTest(unittest.TestCase): @@ -2823,7 +3119,8 @@ class MiscIOTest(unittest.TestCase): self.assertEqual(f.mode, "wb") f.close() - f = self.open(support.TESTFN, "U") + with support.check_warnings(('', DeprecationWarning)): + f = self.open(support.TESTFN, "U") self.assertEqual(f.name, support.TESTFN) self.assertEqual(f.buffer.name, support.TESTFN) self.assertEqual(f.buffer.raw.name, support.TESTFN) @@ -2953,7 +3250,7 @@ class MiscIOTest(unittest.TestCase): for fd in fds: try: os.close(fd) - except EnvironmentError as e: + except OSError as e: if e.errno != errno.EBADF: raise self.addCleanup(cleanup_fds) @@ -3075,7 +3372,6 @@ class MiscIOTest(unittest.TestCase): class CMiscIOTest(MiscIOTest): io = io - shutdown_error = "RuntimeError: could not find io module state" def test_readinto_buffer_overflow(self): # Issue #18025 @@ -3086,9 +3382,51 @@ class CMiscIOTest(MiscIOTest): b = bytearray(2) self.assertRaises(ValueError, bufio.readinto, b) + @unittest.skipUnless(threading, 'Threading required for this test.') + def check_daemon_threads_shutdown_deadlock(self, stream_name): + # Issue #23309: deadlocks at shutdown should be avoided when a + # daemon thread and the main thread both write to a file. + code = """if 1: + import sys + import time + import threading + + file = sys.{stream_name} + + def run(): + while True: + file.write('.') + file.flush() + + thread = threading.Thread(target=run) + thread.daemon = True + thread.start() + + time.sleep(0.5) + file.write('!') + file.flush() + """.format_map(locals()) + res, _ = run_python_until_end("-c", code) + err = res.err.decode() + if res.rc != 0: + # Failure: should be a fatal error + self.assertIn("Fatal Python error: could not acquire lock " + "for <_io.BufferedWriter name='<{stream_name}>'> " + "at interpreter shutdown, possibly due to " + "daemon threads".format_map(locals()), + err) + else: + self.assertFalse(err.strip('.!')) + + def test_daemon_threads_shutdown_stdout_deadlock(self): + self.check_daemon_threads_shutdown_deadlock('stdout') + + def test_daemon_threads_shutdown_stderr_deadlock(self): + self.check_daemon_threads_shutdown_deadlock('stderr') + + class PyMiscIOTest(MiscIOTest): io = pyio - shutdown_error = "LookupError: unknown encoding: ascii" @unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.') @@ -3121,16 +3459,19 @@ class SignalsTest(unittest.TestCase): try: wio = self.io.open(w, **fdopen_kwargs) t.start() - signal.alarm(1) # Fill the pipe enough that the write will be blocking. # It will be interrupted by the timer armed above. Since the # other thread has read one byte, the low-level write will # return with a successful (partial) result rather than an EINTR. # The buffered IO layer must check for pending signal # handlers, which in this case will invoke alarm_interrupt(). - self.assertRaises(ZeroDivisionError, - wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1)) - t.join() + signal.alarm(1) + try: + with self.assertRaises(ZeroDivisionError): + wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1)) + finally: + signal.alarm(0) + t.join() # We got one byte, get another one and check that it isn't a # repeat of the first one. read_results.append(os.read(r, 1)) @@ -3143,7 +3484,7 @@ class SignalsTest(unittest.TestCase): # buffer, and block again. try: wio.close() - except IOError as e: + except OSError as e: if e.errno != errno.EBADF: raise @@ -3153,6 +3494,8 @@ class SignalsTest(unittest.TestCase): def test_interrupted_write_buffered(self): self.check_interrupted_write(b"xy", b"xy", mode="wb") + # Issue #22331: The test hangs on FreeBSD 7.2 + @support.requires_freebsd_version(8) def test_interrupted_write_text(self): self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") @@ -3236,11 +3579,16 @@ class SignalsTest(unittest.TestCase): # received (forcing a first EINTR in write()). read_results = [] write_finished = False + error = None def _read(): - while not write_finished: - while r in select.select([r], [], [], 1.0)[0]: - s = os.read(r, 1024) - read_results.append(s) + try: + while not write_finished: + while r in select.select([r], [], [], 1.0)[0]: + s = os.read(r, 1024) + read_results.append(s) + except BaseException as exc: + nonlocal error + error = exc t = threading.Thread(target=_read) t.daemon = True def alarm1(sig, frame): @@ -3261,6 +3609,8 @@ class SignalsTest(unittest.TestCase): wio.flush() write_finished = True t.join() + + self.assertIsNone(error) self.assertEqual(N, sum(len(x) for x in read_results)) finally: write_finished = True @@ -3271,7 +3621,7 @@ class SignalsTest(unittest.TestCase): # buffer, and could block (in case of failure). try: wio.close() - except IOError as e: + except OSError as e: if e.errno != errno.EBADF: raise |
