diff options
Diffstat (limited to 'Lib/test/test_io.py')
| -rw-r--r-- | Lib/test/test_io.py | 543 |
1 files changed, 462 insertions, 81 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 5333bb6..ea82cea 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -23,23 +23,29 @@ import os import sys import time import array -import threading import random import unittest -import warnings import weakref -import gc import abc import signal import errno -from itertools import chain, cycle, count +import warnings +import pickle +from itertools import cycle, count from collections import deque from test import support import codecs import io # C implementation of io import _pyio as pyio # Python implementation of io - +try: + import threading +except ImportError: + threading = None +try: + import fcntl +except ImportError: + fcntl = None def _default_chunk_size(): """Get the default TextIOWrapper chunk size""" @@ -189,6 +195,23 @@ class PyMockFileIO(MockFileIO, pyio.BytesIO): pass +class MockUnseekableIO: + def seekable(self): + return False + + def seek(self, *args): + raise self.UnsupportedOperation("not seekable") + + def tell(self, *args): + raise self.UnsupportedOperation("not seekable") + +class CMockUnseekableIO(MockUnseekableIO, io.BytesIO): + UnsupportedOperation = io.UnsupportedOperation + +class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO): + UnsupportedOperation = pyio.UnsupportedOperation + + class MockNonBlockWriterIO: def __init__(self): @@ -222,9 +245,14 @@ class MockNonBlockWriterIO: except ValueError: pass else: - self._blocker_char = None - self._write_stack.append(b[:n]) - raise self.BlockingIOError(0, "test blocking", n) + if n > 0: + # write data up to the first blocker + self._write_stack.append(b[:n]) + return n + else: + # cancel blocker and indicate would block + self._blocker_char = None + return None self._write_stack.append(b) return len(b) @@ -314,16 +342,31 @@ class IOTest(unittest.TestCase): def test_invalid_operations(self): # Try writing on a file opened in read mode and vice-versa. + exc = self.UnsupportedOperation for mode in ("w", "wb"): with self.open(support.TESTFN, mode) as fp: - self.assertRaises(IOError, fp.read) - self.assertRaises(IOError, fp.readline) + self.assertRaises(exc, fp.read) + self.assertRaises(exc, fp.readline) + with self.open(support.TESTFN, "wb", buffering=0) as fp: + self.assertRaises(exc, fp.read) + self.assertRaises(exc, fp.readline) + with self.open(support.TESTFN, "rb", buffering=0) as fp: + self.assertRaises(exc, fp.write, b"blah") + self.assertRaises(exc, fp.writelines, [b"blah\n"]) with self.open(support.TESTFN, "rb") as fp: - self.assertRaises(IOError, fp.write, b"blah") - self.assertRaises(IOError, fp.writelines, [b"blah\n"]) + self.assertRaises(exc, fp.write, b"blah") + self.assertRaises(exc, fp.writelines, [b"blah\n"]) with self.open(support.TESTFN, "r") as fp: - self.assertRaises(IOError, fp.write, "blah") - self.assertRaises(IOError, fp.writelines, ["blah\n"]) + self.assertRaises(exc, fp.write, "blah") + self.assertRaises(exc, fp.writelines, ["blah\n"]) + # Non-zero seeking from current or end pos + self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR) + self.assertRaises(exc, fp.seek, -1, self.SEEK_END) + + def test_open_handles_NUL_chars(self): + fn_with_NUL = 'foo\0bar' + self.assertRaises(TypeError, self.open, fn_with_NUL, 'w') + self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w') def test_raw_file_io(self): with self.open(support.TESTFN, "wb", buffering=0) as f: @@ -432,13 +475,14 @@ class IOTest(unittest.TestCase): def flush(self): record.append(3) super().flush() - f = MyFileIO(support.TESTFN, "wb") - f.write(b"xxx") - del f - support.gc_collect() - self.assertEqual(record, [1, 2, 3]) - with self.open(support.TESTFN, "rb") as f: - self.assertEqual(f.read(), b"xxx") + with support.check_warnings(('', ResourceWarning)): + f = MyFileIO(support.TESTFN, "wb") + f.write(b"xxx") + del f + support.gc_collect() + self.assertEqual(record, [1, 2, 3]) + with self.open(support.TESTFN, "rb") as f: + self.assertEqual(f.read(), b"xxx") def _check_base_destructor(self, base): record = [] @@ -490,7 +534,7 @@ class IOTest(unittest.TestCase): def test_array_writes(self): a = array.array('i', range(10)) - n = len(a.tostring()) + n = len(a.tobytes()) with self.open(support.TESTFN, "wb", 0) as f: self.assertEqual(f.write(a), n) with self.open(support.TESTFN, "wb") as f: @@ -525,12 +569,13 @@ class IOTest(unittest.TestCase): def test_garbage_collection(self): # FileIO objects are collected, and collecting them flushes # all data to disk. - f = self.FileIO(support.TESTFN, "wb") - f.write(b"abcxxx") - f.f = f - wr = weakref.ref(f) - del f - support.gc_collect() + with support.check_warnings(('', ResourceWarning)): + f = self.FileIO(support.TESTFN, "wb") + f.write(b"abcxxx") + f.f = f + wr = weakref.ref(f) + del f + support.gc_collect() self.assertTrue(wr() is None, wr) with self.open(support.TESTFN, "rb") as f: self.assertEqual(f.read(), b"abcxxx") @@ -578,8 +623,36 @@ class IOTest(unittest.TestCase): self.assertEqual(rawio.read(2), None) self.assertEqual(rawio.read(2), b"") + def test_types_have_dict(self): + test = ( + self.IOBase(), + self.RawIOBase(), + self.TextIOBase(), + self.StringIO(), + self.BytesIO() + ) + for obj in test: + self.assertTrue(hasattr(obj, "__dict__")) + class CIOTest(IOTest): - pass + + def test_IOBase_finalize(self): + # Issue #12149: segmentation fault on _PyIOBase_finalize when both a + # class which inherits IOBase and an object of this class are caught + # in a reference cycle and close() is already in the method cache. + class MyIO(self.IOBase): + def close(self): + pass + + # create an instance to populate the method cache + MyIO() + obj = MyIO() + obj.obj = obj + wr = weakref.ref(obj) + del MyIO + del obj + support.gc_collect() + self.assertTrue(wr() is None, wr) class PyIOTest(IOTest): pass @@ -693,6 +766,11 @@ class CommonBufferedTests: b.close() self.assertRaises(ValueError, b.flush) + def test_unseekable(self): + bufio = self.tp(self.MockUnseekableIO(b"A" * 10)) + self.assertRaises(self.UnsupportedOperation, bufio.tell) + self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) + def test_readonly_attributes(self): raw = self.MockRawIO() buf = self.tp(raw) @@ -813,6 +891,7 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): self.assertEqual(b"abcdefg", bufio.read()) + @unittest.skipUnless(threading, 'Threading required for this test.') @support.requires_resource('cpu') def test_threads(self): try: @@ -856,6 +935,14 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): finally: support.unlink(support.TESTFN) + def test_unseekable(self): + bufio = self.tp(self.MockUnseekableIO(b"A" * 10)) + self.assertRaises(self.UnsupportedOperation, bufio.tell) + self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) + bufio.read(1) + self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) + self.assertRaises(self.UnsupportedOperation, bufio.tell) + def test_misbehaved_io(self): rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) bufio = self.tp(rawio) @@ -1081,6 +1168,7 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): with self.open(support.TESTFN, "rb", buffering=0) as f: self.assertEqual(f.read(), b"abc") + @unittest.skipUnless(threading, 'Threading required for this test.') @support.requires_resource('cpu') def test_threads(self): try: @@ -1139,14 +1227,9 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): self.assertRaises(IOError, bufio.write, b"abcdef") def test_max_buffer_size_deprecation(self): - with support.check_warnings() as w: - warnings.simplefilter("always", DeprecationWarning) + with support.check_warnings(("max_buffer_size is deprecated", + DeprecationWarning)): self.tp(self.MockRawIO(), 8, 12) - self.assertEqual(len(w.warnings), 1) - warning = w.warnings[0] - self.assertTrue(warning.category is DeprecationWarning) - self.assertEqual(str(warning.message), - "max_buffer_size is deprecated") class CBufferedWriterTest(BufferedWriterTest): @@ -1202,14 +1285,9 @@ class BufferedRWPairTest(unittest.TestCase): self.assertRaises(self.UnsupportedOperation, pair.detach) def test_constructor_max_buffer_size_deprecation(self): - with support.check_warnings() as w: - warnings.simplefilter("always", DeprecationWarning) + with support.check_warnings(("max_buffer_size is deprecated", + DeprecationWarning)): self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12) - self.assertEqual(len(w.warnings), 1) - warning = w.warnings[0] - self.assertTrue(warning.category is DeprecationWarning) - self.assertEqual(str(warning.message), - "max_buffer_size is deprecated") def test_constructor_with_not_readable(self): class NotReadable(MockRawIO): @@ -1349,15 +1427,18 @@ class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): rw.seek(0, 0) self.assertEqual(b"asdf", rw.read(4)) - rw.write(b"asdf") + rw.write(b"123f") rw.seek(0, 0) - self.assertEqual(b"asdfasdfl", rw.read()) + self.assertEqual(b"asdf123fl", rw.read()) self.assertEqual(9, rw.tell()) rw.seek(-4, 2) self.assertEqual(5, rw.tell()) rw.seek(2, 1) self.assertEqual(7, rw.tell()) self.assertEqual(b"fl", rw.read(11)) + rw.flush() + self.assertEqual(b"asdf123fl", raw.getvalue()) + self.assertRaises(TypeError, rw.seek, 0.0) def check_flush_and_read(self, read_func): @@ -1502,6 +1583,46 @@ class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): BufferedReaderTest.test_misbehaved_io(self) BufferedWriterTest.test_misbehaved_io(self) + def test_interleaved_read_write(self): + # Test for issue #12213 + with self.BytesIO(b'abcdefgh') as raw: + with self.tp(raw, 100) as f: + f.write(b"1") + self.assertEqual(f.read(1), b'b') + f.write(b'2') + self.assertEqual(f.read1(1), b'd') + f.write(b'3') + buf = bytearray(1) + f.readinto(buf) + self.assertEqual(buf, b'f') + f.write(b'4') + self.assertEqual(f.peek(1), b'h') + f.flush() + self.assertEqual(raw.getvalue(), b'1b2d3f4h') + + with self.BytesIO(b'abc') as raw: + with self.tp(raw, 100) as f: + self.assertEqual(f.read(1), b'a') + f.write(b"2") + self.assertEqual(f.read(1), b'c') + f.flush() + self.assertEqual(raw.getvalue(), b'a2c') + + def test_interleaved_readline_write(self): + with self.BytesIO(b'ab\ncdef\ng\n') as raw: + with self.tp(raw) as f: + f.write(b'1') + self.assertEqual(f.readline(), b'b\n') + f.write(b'2') + self.assertEqual(f.readline(), b'def\n') + f.write(b'3') + self.assertEqual(f.readline(), b'\n') + f.flush() + self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n') + + # You can't construct a BufferedRandom over a non-seekable stream. + test_unseekable = None + class CBufferedRandomTest(BufferedRandomTest): tp = io.BufferedRandom @@ -1714,9 +1835,12 @@ class TextIOWrapperTest(unittest.TestCase): raw.name = "dummy" self.assertEqual(repr(t), "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname) + t.mode = "r" + self.assertEqual(repr(t), + "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname) raw.name = b"dummy" self.assertEqual(repr(t), - "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname) + "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname) def test_line_buffering(self): r = self.BytesIO() @@ -1990,26 +2114,24 @@ class TextIOWrapperTest(unittest.TestCase): u_suffix = "\u8888\n" suffix = bytes(u_suffix.encode("utf-8")) line = prefix + suffix - f = self.open(support.TESTFN, "wb") - f.write(line*2) - f.close() - f = self.open(support.TESTFN, "r", encoding="utf-8") - s = f.read(prefix_size) - self.assertEqual(s, str(prefix, "ascii")) - self.assertEqual(f.tell(), prefix_size) - self.assertEqual(f.readline(), u_suffix) + with self.open(support.TESTFN, "wb") as f: + f.write(line*2) + with self.open(support.TESTFN, "r", encoding="utf-8") as f: + s = f.read(prefix_size) + self.assertEqual(s, str(prefix, "ascii")) + self.assertEqual(f.tell(), prefix_size) + self.assertEqual(f.readline(), u_suffix) def test_seeking_too(self): # Regression test for a specific bug data = b'\xe0\xbf\xbf\n' - f = self.open(support.TESTFN, "wb") - f.write(data) - f.close() - f = self.open(support.TESTFN, "r", encoding="utf-8") - f._CHUNK_SIZE # Just test that it exists - f._CHUNK_SIZE = 2 - f.readline() - f.tell() + with self.open(support.TESTFN, "wb") as f: + f.write(data) + with self.open(support.TESTFN, "r", encoding="utf-8") as f: + f._CHUNK_SIZE # Just test that it exists + f._CHUNK_SIZE = 2 + f.readline() + f.tell() def test_seek_and_tell(self): #Test seek/tell using the StatefulIncrementalDecoder. @@ -2210,7 +2332,7 @@ class TextIOWrapperTest(unittest.TestCase): with self.open(support.TESTFN, "w", errors="replace") as f: self.assertEqual(f.errors, "replace") - + @unittest.skipUnless(threading, 'Threading required for this test.') def test_threads_write(self): # Issue6750: concurrent writes could duplicate data event = threading.Event() @@ -2246,12 +2368,38 @@ class TextIOWrapperTest(unittest.TestCase): txt.close() self.assertRaises(ValueError, txt.flush) + def test_unseekable(self): + txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata)) + self.assertRaises(self.UnsupportedOperation, txt.tell) + self.assertRaises(self.UnsupportedOperation, txt.seek, 0) + def test_readonly_attributes(self): txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") buf = self.BytesIO(self.testdata) with self.assertRaises(AttributeError): txt.buffer = buf + def test_rawio(self): + # Issue #12591: TextIOWrapper must work with raw I/O objects, so + # that subprocess.Popen() can have the required unbuffered + # semantics with universal_newlines=True. + raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) + txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') + # Reads + self.assertEqual(txt.read(4), 'abcd') + self.assertEqual(txt.readline(), 'efghi\n') + self.assertEqual(list(txt), ['jkl\n', 'opq\n']) + + def test_rawio_write_through(self): + # Issue #12591: with write_through=True, writes don't need a flush + raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) + txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n', + write_through=True) + txt.write('1') + txt.write('23\n4') + txt.write('5') + self.assertEqual(b''.join(raw._write_stack), b'123\n45') + class CTextIOWrapperTest(TextIOWrapperTest): def test_initialization(self): @@ -2279,6 +2427,21 @@ class CTextIOWrapperTest(TextIOWrapperTest): with self.open(support.TESTFN, "rb") as f: self.assertEqual(f.read(), b"456def") + def test_rwpair_cleared_before_textio(self): + # Issue 13070: TextIOWrapper's finalization would crash when called + # after the reference to the underlying BufferedRWPair's writer got + # cleared by the GC. + for i in range(1000): + b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) + t1 = self.TextIOWrapper(b1, encoding="ascii") + b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) + t2 = self.TextIOWrapper(b2, encoding="ascii") + # circular references + t1.buddy = t2 + t2.buddy = t1 + support.gc_collect() + + class PyTextIOWrapperTest(TextIOWrapperTest): pass @@ -2505,27 +2668,27 @@ class MiscIOTest(unittest.TestCase): def test_abcs(self): # Test the visible base classes are ABCs. - self.assertTrue(isinstance(self.IOBase, abc.ABCMeta)) - self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta)) - self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta)) - self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta)) + self.assertIsInstance(self.IOBase, abc.ABCMeta) + self.assertIsInstance(self.RawIOBase, abc.ABCMeta) + self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta) + self.assertIsInstance(self.TextIOBase, abc.ABCMeta) def _check_abc_inheritance(self, abcmodule): with self.open(support.TESTFN, "wb", buffering=0) as f: - self.assertTrue(isinstance(f, abcmodule.IOBase)) - self.assertTrue(isinstance(f, abcmodule.RawIOBase)) - self.assertFalse(isinstance(f, abcmodule.BufferedIOBase)) - self.assertFalse(isinstance(f, abcmodule.TextIOBase)) + self.assertIsInstance(f, abcmodule.IOBase) + self.assertIsInstance(f, abcmodule.RawIOBase) + self.assertNotIsInstance(f, abcmodule.BufferedIOBase) + self.assertNotIsInstance(f, abcmodule.TextIOBase) with self.open(support.TESTFN, "wb") as f: - self.assertTrue(isinstance(f, abcmodule.IOBase)) - self.assertFalse(isinstance(f, abcmodule.RawIOBase)) - self.assertTrue(isinstance(f, abcmodule.BufferedIOBase)) - self.assertFalse(isinstance(f, abcmodule.TextIOBase)) + self.assertIsInstance(f, abcmodule.IOBase) + self.assertNotIsInstance(f, abcmodule.RawIOBase) + self.assertIsInstance(f, abcmodule.BufferedIOBase) + self.assertNotIsInstance(f, abcmodule.TextIOBase) with self.open(support.TESTFN, "w") as f: - self.assertTrue(isinstance(f, abcmodule.IOBase)) - self.assertFalse(isinstance(f, abcmodule.RawIOBase)) - self.assertFalse(isinstance(f, abcmodule.BufferedIOBase)) - self.assertTrue(isinstance(f, abcmodule.TextIOBase)) + self.assertIsInstance(f, abcmodule.IOBase) + self.assertNotIsInstance(f, abcmodule.RawIOBase) + self.assertNotIsInstance(f, abcmodule.BufferedIOBase) + self.assertIsInstance(f, abcmodule.TextIOBase) def test_abc_inheritance(self): # Test implementations inherit from their respective ABCs @@ -2536,6 +2699,125 @@ class MiscIOTest(unittest.TestCase): # baseline "io" module. self._check_abc_inheritance(io) + def _check_warn_on_dealloc(self, *args, **kwargs): + f = open(*args, **kwargs) + r = repr(f) + with self.assertWarns(ResourceWarning) as cm: + f = None + support.gc_collect() + self.assertIn(r, str(cm.warning.args[0])) + + def test_warn_on_dealloc(self): + self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0) + self._check_warn_on_dealloc(support.TESTFN, "wb") + self._check_warn_on_dealloc(support.TESTFN, "w") + + def _check_warn_on_dealloc_fd(self, *args, **kwargs): + fds = [] + def cleanup_fds(): + for fd in fds: + try: + os.close(fd) + except EnvironmentError as e: + if e.errno != errno.EBADF: + raise + self.addCleanup(cleanup_fds) + r, w = os.pipe() + fds += r, w + self._check_warn_on_dealloc(r, *args, **kwargs) + # When using closefd=False, there's no warning + r, w = os.pipe() + fds += r, w + with warnings.catch_warnings(record=True) as recorded: + open(r, *args, closefd=False, **kwargs) + support.gc_collect() + self.assertEqual(recorded, []) + + def test_warn_on_dealloc_fd(self): + self._check_warn_on_dealloc_fd("rb", buffering=0) + self._check_warn_on_dealloc_fd("rb") + self._check_warn_on_dealloc_fd("r") + + + def test_pickling(self): + # Pickling file objects is forbidden + for kwargs in [ + {"mode": "w"}, + {"mode": "wb"}, + {"mode": "wb", "buffering": 0}, + {"mode": "r"}, + {"mode": "rb"}, + {"mode": "rb", "buffering": 0}, + {"mode": "w+"}, + {"mode": "w+b"}, + {"mode": "w+b", "buffering": 0}, + ]: + for protocol in range(pickle.HIGHEST_PROTOCOL + 1): + with self.open(support.TESTFN, **kwargs) as f: + self.assertRaises(TypeError, pickle.dumps, f, protocol) + + @unittest.skipUnless(fcntl, 'fcntl required for this test') + def test_nonblock_pipe_write_bigbuf(self): + self._test_nonblock_pipe_write(16*1024) + + @unittest.skipUnless(fcntl, 'fcntl required for this test') + def test_nonblock_pipe_write_smallbuf(self): + self._test_nonblock_pipe_write(1024) + + def _set_non_blocking(self, fd): + flags = fcntl.fcntl(fd, fcntl.F_GETFL) + self.assertNotEqual(flags, -1) + res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + self.assertEqual(res, 0) + + def _test_nonblock_pipe_write(self, bufsize): + sent = [] + received = [] + r, w = os.pipe() + self._set_non_blocking(r) + self._set_non_blocking(w) + + # To exercise all code paths in the C implementation we need + # to play with buffer sizes. For instance, if we choose a + # buffer size less than or equal to _PIPE_BUF (4096 on Linux) + # then we will never get a partial write of the buffer. + rf = self.open(r, mode='rb', closefd=True, buffering=bufsize) + wf = self.open(w, mode='wb', closefd=True, buffering=bufsize) + + with rf, wf: + for N in 9999, 73, 7574: + try: + i = 0 + while True: + msg = bytes([i % 26 + 97]) * N + sent.append(msg) + wf.write(msg) + i += 1 + + except self.BlockingIOError as e: + self.assertEqual(e.args[0], errno.EAGAIN) + sent[-1] = sent[-1][:e.characters_written] + received.append(rf.read()) + msg = b'BLOCKED' + wf.write(msg) + sent.append(msg) + + while True: + try: + wf.flush() + break + except self.BlockingIOError as e: + self.assertEqual(e.args[0], errno.EAGAIN) + self.assertEqual(e.characters_written, 0) + received.append(rf.read()) + + received += iter(rf.read, None) + + sent, received = b''.join(sent), b''.join(received) + self.assertTrue(sent == received) + self.assertTrue(wf.closed) + self.assertTrue(rf.closed) + class CMiscIOTest(MiscIOTest): io = io @@ -2556,9 +2838,12 @@ class SignalsTest(unittest.TestCase): 1/0 @unittest.skipUnless(threading, 'Threading required for this test.') + @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'), + 'issue #12429: skip test on FreeBSD <= 7') def check_interrupted_write(self, item, bytes, **fdopen_kwargs): """Check that a partial write, when it gets interrupted, properly - invokes the signal handler.""" + invokes the signal handler, and bubbles up the exception raised + in the latter.""" read_results = [] def _read(): s = os.read(r, 1) @@ -2566,6 +2851,7 @@ class SignalsTest(unittest.TestCase): t = threading.Thread(target=_read) t.daemon = True r, w = os.pipe() + fdopen_kwargs["closefd"] = False try: wio = self.io.open(w, **fdopen_kwargs) t.start() @@ -2623,6 +2909,9 @@ class SignalsTest(unittest.TestCase): wio.flush() # Make sure the buffer doesn't fill up and block further writes os.read(r, len(data) * 100) + exc = cm.exception + if isinstance(exc, RuntimeError): + self.assertTrue(str(exc).startswith("reentrant call"), str(exc)) finally: wio.close() os.close(r) @@ -2633,6 +2922,98 @@ class SignalsTest(unittest.TestCase): def test_reentrant_write_text(self): self.check_reentrant_write("xy", mode="w", encoding="ascii") + def check_interrupted_read_retry(self, decode, **fdopen_kwargs): + """Check that a buffered read, when it gets interrupted (either + returning a partial result or EINTR), properly invokes the signal + handler and retries if the latter returned successfully.""" + r, w = os.pipe() + fdopen_kwargs["closefd"] = False + def alarm_handler(sig, frame): + os.write(w, b"bar") + signal.signal(signal.SIGALRM, alarm_handler) + try: + rio = self.io.open(r, **fdopen_kwargs) + os.write(w, b"foo") + signal.alarm(1) + # Expected behaviour: + # - first raw read() returns partial b"foo" + # - second raw read() returns EINTR + # - third raw read() returns b"bar" + self.assertEqual(decode(rio.read(6)), "foobar") + finally: + rio.close() + os.close(w) + os.close(r) + + def test_interrupted_read_retry_buffered(self): + self.check_interrupted_read_retry(lambda x: x.decode('latin1'), + mode="rb") + + def test_interrupted_read_retry_text(self): + self.check_interrupted_read_retry(lambda x: x, + mode="r") + + @unittest.skipUnless(threading, 'Threading required for this test.') + def check_interrupted_write_retry(self, item, **fdopen_kwargs): + """Check that a buffered write, when it gets interrupted (either + returning a partial result or EINTR), properly invokes the signal + handler and retries if the latter returned successfully.""" + select = support.import_module("select") + # A quantity that exceeds the buffer size of an anonymous pipe's + # write end. + N = 1024 * 1024 + r, w = os.pipe() + fdopen_kwargs["closefd"] = False + # We need a separate thread to read from the pipe and allow the + # write() to finish. This thread is started after the SIGALRM is + # received (forcing a first EINTR in write()). + read_results = [] + write_finished = False + def _read(): + while not write_finished: + while r in select.select([r], [], [], 1.0)[0]: + s = os.read(r, 1024) + read_results.append(s) + t = threading.Thread(target=_read) + t.daemon = True + def alarm1(sig, frame): + signal.signal(signal.SIGALRM, alarm2) + signal.alarm(1) + def alarm2(sig, frame): + t.start() + signal.signal(signal.SIGALRM, alarm1) + try: + wio = self.io.open(w, **fdopen_kwargs) + signal.alarm(1) + # Expected behaviour: + # - first raw write() is partial (because of the limited pipe buffer + # and the first alarm) + # - second raw write() returns EINTR (because of the second alarm) + # - subsequent write()s are successful (either partial or complete) + self.assertEqual(N, wio.write(item * N)) + wio.flush() + write_finished = True + t.join() + self.assertEqual(N, sum(len(x) for x in read_results)) + finally: + write_finished = True + os.close(w) + os.close(r) + # This is deliberate. If we didn't close the file descriptor + # before closing wio, wio would try to flush its internal + # buffer, and could block (in case of failure). + try: + wio.close() + except IOError as e: + if e.errno != errno.EBADF: + raise + + def test_interrupted_write_retry_buffered(self): + self.check_interrupted_write_retry(b"x", mode="wb") + + def test_interrupted_write_retry_text(self): + self.check_interrupted_write_retry("x", mode="w", encoding="latin1") + class CSignalsTest(SignalsTest): io = io @@ -2662,7 +3043,7 @@ def test_main(): # Put the namespaces of the IO module we are testing and some useful mock # classes in the __dict__ of each test. mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO, - MockNonBlockWriterIO, MockRawIOWithoutRead) + MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead) all_members = io.__all__ + ["IncrementalNewlineDecoder"] c_io_ns = {name : getattr(io, name) for name in all_members} py_io_ns = {name : getattr(pyio, name) for name in all_members} |
