summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_io.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r--Lib/test/test_io.py543
1 files changed, 462 insertions, 81 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 5333bb6..ea82cea 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -23,23 +23,29 @@ import os
import sys
import time
import array
-import threading
import random
import unittest
-import warnings
import weakref
-import gc
import abc
import signal
import errno
-from itertools import chain, cycle, count
+import warnings
+import pickle
+from itertools import cycle, count
from collections import deque
from test import support
import codecs
import io # C implementation of io
import _pyio as pyio # Python implementation of io
-
+try:
+ import threading
+except ImportError:
+ threading = None
+try:
+ import fcntl
+except ImportError:
+ fcntl = None
def _default_chunk_size():
"""Get the default TextIOWrapper chunk size"""
@@ -189,6 +195,23 @@ class PyMockFileIO(MockFileIO, pyio.BytesIO):
pass
+class MockUnseekableIO:
+ def seekable(self):
+ return False
+
+ def seek(self, *args):
+ raise self.UnsupportedOperation("not seekable")
+
+ def tell(self, *args):
+ raise self.UnsupportedOperation("not seekable")
+
+class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
+ UnsupportedOperation = io.UnsupportedOperation
+
+class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
+ UnsupportedOperation = pyio.UnsupportedOperation
+
+
class MockNonBlockWriterIO:
def __init__(self):
@@ -222,9 +245,14 @@ class MockNonBlockWriterIO:
except ValueError:
pass
else:
- self._blocker_char = None
- self._write_stack.append(b[:n])
- raise self.BlockingIOError(0, "test blocking", n)
+ if n > 0:
+ # write data up to the first blocker
+ self._write_stack.append(b[:n])
+ return n
+ else:
+ # cancel blocker and indicate would block
+ self._blocker_char = None
+ return None
self._write_stack.append(b)
return len(b)
@@ -314,16 +342,31 @@ class IOTest(unittest.TestCase):
def test_invalid_operations(self):
# Try writing on a file opened in read mode and vice-versa.
+ exc = self.UnsupportedOperation
for mode in ("w", "wb"):
with self.open(support.TESTFN, mode) as fp:
- self.assertRaises(IOError, fp.read)
- self.assertRaises(IOError, fp.readline)
+ self.assertRaises(exc, fp.read)
+ self.assertRaises(exc, fp.readline)
+ with self.open(support.TESTFN, "wb", buffering=0) as fp:
+ self.assertRaises(exc, fp.read)
+ self.assertRaises(exc, fp.readline)
+ with self.open(support.TESTFN, "rb", buffering=0) as fp:
+ self.assertRaises(exc, fp.write, b"blah")
+ self.assertRaises(exc, fp.writelines, [b"blah\n"])
with self.open(support.TESTFN, "rb") as fp:
- self.assertRaises(IOError, fp.write, b"blah")
- self.assertRaises(IOError, fp.writelines, [b"blah\n"])
+ self.assertRaises(exc, fp.write, b"blah")
+ self.assertRaises(exc, fp.writelines, [b"blah\n"])
with self.open(support.TESTFN, "r") as fp:
- self.assertRaises(IOError, fp.write, "blah")
- self.assertRaises(IOError, fp.writelines, ["blah\n"])
+ self.assertRaises(exc, fp.write, "blah")
+ self.assertRaises(exc, fp.writelines, ["blah\n"])
+ # Non-zero seeking from current or end pos
+ self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
+ self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
+
+ def test_open_handles_NUL_chars(self):
+ fn_with_NUL = 'foo\0bar'
+ self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
+ self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
def test_raw_file_io(self):
with self.open(support.TESTFN, "wb", buffering=0) as f:
@@ -432,13 +475,14 @@ class IOTest(unittest.TestCase):
def flush(self):
record.append(3)
super().flush()
- f = MyFileIO(support.TESTFN, "wb")
- f.write(b"xxx")
- del f
- support.gc_collect()
- self.assertEqual(record, [1, 2, 3])
- with self.open(support.TESTFN, "rb") as f:
- self.assertEqual(f.read(), b"xxx")
+ with support.check_warnings(('', ResourceWarning)):
+ f = MyFileIO(support.TESTFN, "wb")
+ f.write(b"xxx")
+ del f
+ support.gc_collect()
+ self.assertEqual(record, [1, 2, 3])
+ with self.open(support.TESTFN, "rb") as f:
+ self.assertEqual(f.read(), b"xxx")
def _check_base_destructor(self, base):
record = []
@@ -490,7 +534,7 @@ class IOTest(unittest.TestCase):
def test_array_writes(self):
a = array.array('i', range(10))
- n = len(a.tostring())
+ n = len(a.tobytes())
with self.open(support.TESTFN, "wb", 0) as f:
self.assertEqual(f.write(a), n)
with self.open(support.TESTFN, "wb") as f:
@@ -525,12 +569,13 @@ class IOTest(unittest.TestCase):
def test_garbage_collection(self):
# FileIO objects are collected, and collecting them flushes
# all data to disk.
- f = self.FileIO(support.TESTFN, "wb")
- f.write(b"abcxxx")
- f.f = f
- wr = weakref.ref(f)
- del f
- support.gc_collect()
+ with support.check_warnings(('', ResourceWarning)):
+ f = self.FileIO(support.TESTFN, "wb")
+ f.write(b"abcxxx")
+ f.f = f
+ wr = weakref.ref(f)
+ del f
+ support.gc_collect()
self.assertTrue(wr() is None, wr)
with self.open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"abcxxx")
@@ -578,8 +623,36 @@ class IOTest(unittest.TestCase):
self.assertEqual(rawio.read(2), None)
self.assertEqual(rawio.read(2), b"")
+ def test_types_have_dict(self):
+ test = (
+ self.IOBase(),
+ self.RawIOBase(),
+ self.TextIOBase(),
+ self.StringIO(),
+ self.BytesIO()
+ )
+ for obj in test:
+ self.assertTrue(hasattr(obj, "__dict__"))
+
class CIOTest(IOTest):
- pass
+
+ def test_IOBase_finalize(self):
+ # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
+ # class which inherits IOBase and an object of this class are caught
+ # in a reference cycle and close() is already in the method cache.
+ class MyIO(self.IOBase):
+ def close(self):
+ pass
+
+ # create an instance to populate the method cache
+ MyIO()
+ obj = MyIO()
+ obj.obj = obj
+ wr = weakref.ref(obj)
+ del MyIO
+ del obj
+ support.gc_collect()
+ self.assertTrue(wr() is None, wr)
class PyIOTest(IOTest):
pass
@@ -693,6 +766,11 @@ class CommonBufferedTests:
b.close()
self.assertRaises(ValueError, b.flush)
+ def test_unseekable(self):
+ bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
+ self.assertRaises(self.UnsupportedOperation, bufio.tell)
+ self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
+
def test_readonly_attributes(self):
raw = self.MockRawIO()
buf = self.tp(raw)
@@ -813,6 +891,7 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
self.assertEqual(b"abcdefg", bufio.read())
+ @unittest.skipUnless(threading, 'Threading required for this test.')
@support.requires_resource('cpu')
def test_threads(self):
try:
@@ -856,6 +935,14 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
finally:
support.unlink(support.TESTFN)
+ def test_unseekable(self):
+ bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
+ self.assertRaises(self.UnsupportedOperation, bufio.tell)
+ self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
+ bufio.read(1)
+ self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
+ self.assertRaises(self.UnsupportedOperation, bufio.tell)
+
def test_misbehaved_io(self):
rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
bufio = self.tp(rawio)
@@ -1081,6 +1168,7 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
with self.open(support.TESTFN, "rb", buffering=0) as f:
self.assertEqual(f.read(), b"abc")
+ @unittest.skipUnless(threading, 'Threading required for this test.')
@support.requires_resource('cpu')
def test_threads(self):
try:
@@ -1139,14 +1227,9 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
self.assertRaises(IOError, bufio.write, b"abcdef")
def test_max_buffer_size_deprecation(self):
- with support.check_warnings() as w:
- warnings.simplefilter("always", DeprecationWarning)
+ with support.check_warnings(("max_buffer_size is deprecated",
+ DeprecationWarning)):
self.tp(self.MockRawIO(), 8, 12)
- self.assertEqual(len(w.warnings), 1)
- warning = w.warnings[0]
- self.assertTrue(warning.category is DeprecationWarning)
- self.assertEqual(str(warning.message),
- "max_buffer_size is deprecated")
class CBufferedWriterTest(BufferedWriterTest):
@@ -1202,14 +1285,9 @@ class BufferedRWPairTest(unittest.TestCase):
self.assertRaises(self.UnsupportedOperation, pair.detach)
def test_constructor_max_buffer_size_deprecation(self):
- with support.check_warnings() as w:
- warnings.simplefilter("always", DeprecationWarning)
+ with support.check_warnings(("max_buffer_size is deprecated",
+ DeprecationWarning)):
self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
- self.assertEqual(len(w.warnings), 1)
- warning = w.warnings[0]
- self.assertTrue(warning.category is DeprecationWarning)
- self.assertEqual(str(warning.message),
- "max_buffer_size is deprecated")
def test_constructor_with_not_readable(self):
class NotReadable(MockRawIO):
@@ -1349,15 +1427,18 @@ class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
rw.seek(0, 0)
self.assertEqual(b"asdf", rw.read(4))
- rw.write(b"asdf")
+ rw.write(b"123f")
rw.seek(0, 0)
- self.assertEqual(b"asdfasdfl", rw.read())
+ self.assertEqual(b"asdf123fl", rw.read())
self.assertEqual(9, rw.tell())
rw.seek(-4, 2)
self.assertEqual(5, rw.tell())
rw.seek(2, 1)
self.assertEqual(7, rw.tell())
self.assertEqual(b"fl", rw.read(11))
+ rw.flush()
+ self.assertEqual(b"asdf123fl", raw.getvalue())
+
self.assertRaises(TypeError, rw.seek, 0.0)
def check_flush_and_read(self, read_func):
@@ -1502,6 +1583,46 @@ class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
BufferedReaderTest.test_misbehaved_io(self)
BufferedWriterTest.test_misbehaved_io(self)
+ def test_interleaved_read_write(self):
+ # Test for issue #12213
+ with self.BytesIO(b'abcdefgh') as raw:
+ with self.tp(raw, 100) as f:
+ f.write(b"1")
+ self.assertEqual(f.read(1), b'b')
+ f.write(b'2')
+ self.assertEqual(f.read1(1), b'd')
+ f.write(b'3')
+ buf = bytearray(1)
+ f.readinto(buf)
+ self.assertEqual(buf, b'f')
+ f.write(b'4')
+ self.assertEqual(f.peek(1), b'h')
+ f.flush()
+ self.assertEqual(raw.getvalue(), b'1b2d3f4h')
+
+ with self.BytesIO(b'abc') as raw:
+ with self.tp(raw, 100) as f:
+ self.assertEqual(f.read(1), b'a')
+ f.write(b"2")
+ self.assertEqual(f.read(1), b'c')
+ f.flush()
+ self.assertEqual(raw.getvalue(), b'a2c')
+
+ def test_interleaved_readline_write(self):
+ with self.BytesIO(b'ab\ncdef\ng\n') as raw:
+ with self.tp(raw) as f:
+ f.write(b'1')
+ self.assertEqual(f.readline(), b'b\n')
+ f.write(b'2')
+ self.assertEqual(f.readline(), b'def\n')
+ f.write(b'3')
+ self.assertEqual(f.readline(), b'\n')
+ f.flush()
+ self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
+
+ # You can't construct a BufferedRandom over a non-seekable stream.
+ test_unseekable = None
+
class CBufferedRandomTest(BufferedRandomTest):
tp = io.BufferedRandom
@@ -1714,9 +1835,12 @@ class TextIOWrapperTest(unittest.TestCase):
raw.name = "dummy"
self.assertEqual(repr(t),
"<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
+ t.mode = "r"
+ self.assertEqual(repr(t),
+ "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
raw.name = b"dummy"
self.assertEqual(repr(t),
- "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
+ "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
def test_line_buffering(self):
r = self.BytesIO()
@@ -1990,26 +2114,24 @@ class TextIOWrapperTest(unittest.TestCase):
u_suffix = "\u8888\n"
suffix = bytes(u_suffix.encode("utf-8"))
line = prefix + suffix
- f = self.open(support.TESTFN, "wb")
- f.write(line*2)
- f.close()
- f = self.open(support.TESTFN, "r", encoding="utf-8")
- s = f.read(prefix_size)
- self.assertEqual(s, str(prefix, "ascii"))
- self.assertEqual(f.tell(), prefix_size)
- self.assertEqual(f.readline(), u_suffix)
+ with self.open(support.TESTFN, "wb") as f:
+ f.write(line*2)
+ with self.open(support.TESTFN, "r", encoding="utf-8") as f:
+ s = f.read(prefix_size)
+ self.assertEqual(s, str(prefix, "ascii"))
+ self.assertEqual(f.tell(), prefix_size)
+ self.assertEqual(f.readline(), u_suffix)
def test_seeking_too(self):
# Regression test for a specific bug
data = b'\xe0\xbf\xbf\n'
- f = self.open(support.TESTFN, "wb")
- f.write(data)
- f.close()
- f = self.open(support.TESTFN, "r", encoding="utf-8")
- f._CHUNK_SIZE # Just test that it exists
- f._CHUNK_SIZE = 2
- f.readline()
- f.tell()
+ with self.open(support.TESTFN, "wb") as f:
+ f.write(data)
+ with self.open(support.TESTFN, "r", encoding="utf-8") as f:
+ f._CHUNK_SIZE # Just test that it exists
+ f._CHUNK_SIZE = 2
+ f.readline()
+ f.tell()
def test_seek_and_tell(self):
#Test seek/tell using the StatefulIncrementalDecoder.
@@ -2210,7 +2332,7 @@ class TextIOWrapperTest(unittest.TestCase):
with self.open(support.TESTFN, "w", errors="replace") as f:
self.assertEqual(f.errors, "replace")
-
+ @unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
# Issue6750: concurrent writes could duplicate data
event = threading.Event()
@@ -2246,12 +2368,38 @@ class TextIOWrapperTest(unittest.TestCase):
txt.close()
self.assertRaises(ValueError, txt.flush)
+ def test_unseekable(self):
+ txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
+ self.assertRaises(self.UnsupportedOperation, txt.tell)
+ self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
+
def test_readonly_attributes(self):
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
buf = self.BytesIO(self.testdata)
with self.assertRaises(AttributeError):
txt.buffer = buf
+ def test_rawio(self):
+ # Issue #12591: TextIOWrapper must work with raw I/O objects, so
+ # that subprocess.Popen() can have the required unbuffered
+ # semantics with universal_newlines=True.
+ raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
+ txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+ # Reads
+ self.assertEqual(txt.read(4), 'abcd')
+ self.assertEqual(txt.readline(), 'efghi\n')
+ self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
+
+ def test_rawio_write_through(self):
+ # Issue #12591: with write_through=True, writes don't need a flush
+ raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
+ txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
+ write_through=True)
+ txt.write('1')
+ txt.write('23\n4')
+ txt.write('5')
+ self.assertEqual(b''.join(raw._write_stack), b'123\n45')
+
class CTextIOWrapperTest(TextIOWrapperTest):
def test_initialization(self):
@@ -2279,6 +2427,21 @@ class CTextIOWrapperTest(TextIOWrapperTest):
with self.open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"456def")
+ def test_rwpair_cleared_before_textio(self):
+ # Issue 13070: TextIOWrapper's finalization would crash when called
+ # after the reference to the underlying BufferedRWPair's writer got
+ # cleared by the GC.
+ for i in range(1000):
+ b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
+ t1 = self.TextIOWrapper(b1, encoding="ascii")
+ b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
+ t2 = self.TextIOWrapper(b2, encoding="ascii")
+ # circular references
+ t1.buddy = t2
+ t2.buddy = t1
+ support.gc_collect()
+
+
class PyTextIOWrapperTest(TextIOWrapperTest):
pass
@@ -2505,27 +2668,27 @@ class MiscIOTest(unittest.TestCase):
def test_abcs(self):
# Test the visible base classes are ABCs.
- self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
- self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
- self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
- self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))
+ self.assertIsInstance(self.IOBase, abc.ABCMeta)
+ self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
+ self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
+ self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
def _check_abc_inheritance(self, abcmodule):
with self.open(support.TESTFN, "wb", buffering=0) as f:
- self.assertTrue(isinstance(f, abcmodule.IOBase))
- self.assertTrue(isinstance(f, abcmodule.RawIOBase))
- self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
- self.assertFalse(isinstance(f, abcmodule.TextIOBase))
+ self.assertIsInstance(f, abcmodule.IOBase)
+ self.assertIsInstance(f, abcmodule.RawIOBase)
+ self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
+ self.assertNotIsInstance(f, abcmodule.TextIOBase)
with self.open(support.TESTFN, "wb") as f:
- self.assertTrue(isinstance(f, abcmodule.IOBase))
- self.assertFalse(isinstance(f, abcmodule.RawIOBase))
- self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
- self.assertFalse(isinstance(f, abcmodule.TextIOBase))
+ self.assertIsInstance(f, abcmodule.IOBase)
+ self.assertNotIsInstance(f, abcmodule.RawIOBase)
+ self.assertIsInstance(f, abcmodule.BufferedIOBase)
+ self.assertNotIsInstance(f, abcmodule.TextIOBase)
with self.open(support.TESTFN, "w") as f:
- self.assertTrue(isinstance(f, abcmodule.IOBase))
- self.assertFalse(isinstance(f, abcmodule.RawIOBase))
- self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
- self.assertTrue(isinstance(f, abcmodule.TextIOBase))
+ self.assertIsInstance(f, abcmodule.IOBase)
+ self.assertNotIsInstance(f, abcmodule.RawIOBase)
+ self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
+ self.assertIsInstance(f, abcmodule.TextIOBase)
def test_abc_inheritance(self):
# Test implementations inherit from their respective ABCs
@@ -2536,6 +2699,125 @@ class MiscIOTest(unittest.TestCase):
# baseline "io" module.
self._check_abc_inheritance(io)
+ def _check_warn_on_dealloc(self, *args, **kwargs):
+ f = open(*args, **kwargs)
+ r = repr(f)
+ with self.assertWarns(ResourceWarning) as cm:
+ f = None
+ support.gc_collect()
+ self.assertIn(r, str(cm.warning.args[0]))
+
+ def test_warn_on_dealloc(self):
+ self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
+ self._check_warn_on_dealloc(support.TESTFN, "wb")
+ self._check_warn_on_dealloc(support.TESTFN, "w")
+
+ def _check_warn_on_dealloc_fd(self, *args, **kwargs):
+ fds = []
+ def cleanup_fds():
+ for fd in fds:
+ try:
+ os.close(fd)
+ except EnvironmentError as e:
+ if e.errno != errno.EBADF:
+ raise
+ self.addCleanup(cleanup_fds)
+ r, w = os.pipe()
+ fds += r, w
+ self._check_warn_on_dealloc(r, *args, **kwargs)
+ # When using closefd=False, there's no warning
+ r, w = os.pipe()
+ fds += r, w
+ with warnings.catch_warnings(record=True) as recorded:
+ open(r, *args, closefd=False, **kwargs)
+ support.gc_collect()
+ self.assertEqual(recorded, [])
+
+ def test_warn_on_dealloc_fd(self):
+ self._check_warn_on_dealloc_fd("rb", buffering=0)
+ self._check_warn_on_dealloc_fd("rb")
+ self._check_warn_on_dealloc_fd("r")
+
+
+ def test_pickling(self):
+ # Pickling file objects is forbidden
+ for kwargs in [
+ {"mode": "w"},
+ {"mode": "wb"},
+ {"mode": "wb", "buffering": 0},
+ {"mode": "r"},
+ {"mode": "rb"},
+ {"mode": "rb", "buffering": 0},
+ {"mode": "w+"},
+ {"mode": "w+b"},
+ {"mode": "w+b", "buffering": 0},
+ ]:
+ for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
+ with self.open(support.TESTFN, **kwargs) as f:
+ self.assertRaises(TypeError, pickle.dumps, f, protocol)
+
+ @unittest.skipUnless(fcntl, 'fcntl required for this test')
+ def test_nonblock_pipe_write_bigbuf(self):
+ self._test_nonblock_pipe_write(16*1024)
+
+ @unittest.skipUnless(fcntl, 'fcntl required for this test')
+ def test_nonblock_pipe_write_smallbuf(self):
+ self._test_nonblock_pipe_write(1024)
+
+ def _set_non_blocking(self, fd):
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+ self.assertNotEqual(flags, -1)
+ res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+ self.assertEqual(res, 0)
+
+ def _test_nonblock_pipe_write(self, bufsize):
+ sent = []
+ received = []
+ r, w = os.pipe()
+ self._set_non_blocking(r)
+ self._set_non_blocking(w)
+
+ # To exercise all code paths in the C implementation we need
+ # to play with buffer sizes. For instance, if we choose a
+ # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
+ # then we will never get a partial write of the buffer.
+ rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
+ wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
+
+ with rf, wf:
+ for N in 9999, 73, 7574:
+ try:
+ i = 0
+ while True:
+ msg = bytes([i % 26 + 97]) * N
+ sent.append(msg)
+ wf.write(msg)
+ i += 1
+
+ except self.BlockingIOError as e:
+ self.assertEqual(e.args[0], errno.EAGAIN)
+ sent[-1] = sent[-1][:e.characters_written]
+ received.append(rf.read())
+ msg = b'BLOCKED'
+ wf.write(msg)
+ sent.append(msg)
+
+ while True:
+ try:
+ wf.flush()
+ break
+ except self.BlockingIOError as e:
+ self.assertEqual(e.args[0], errno.EAGAIN)
+ self.assertEqual(e.characters_written, 0)
+ received.append(rf.read())
+
+ received += iter(rf.read, None)
+
+ sent, received = b''.join(sent), b''.join(received)
+ self.assertTrue(sent == received)
+ self.assertTrue(wf.closed)
+ self.assertTrue(rf.closed)
+
class CMiscIOTest(MiscIOTest):
io = io
@@ -2556,9 +2838,12 @@ class SignalsTest(unittest.TestCase):
1/0
@unittest.skipUnless(threading, 'Threading required for this test.')
+ @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
+ 'issue #12429: skip test on FreeBSD <= 7')
def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
"""Check that a partial write, when it gets interrupted, properly
- invokes the signal handler."""
+ invokes the signal handler, and bubbles up the exception raised
+ in the latter."""
read_results = []
def _read():
s = os.read(r, 1)
@@ -2566,6 +2851,7 @@ class SignalsTest(unittest.TestCase):
t = threading.Thread(target=_read)
t.daemon = True
r, w = os.pipe()
+ fdopen_kwargs["closefd"] = False
try:
wio = self.io.open(w, **fdopen_kwargs)
t.start()
@@ -2623,6 +2909,9 @@ class SignalsTest(unittest.TestCase):
wio.flush()
# Make sure the buffer doesn't fill up and block further writes
os.read(r, len(data) * 100)
+ exc = cm.exception
+ if isinstance(exc, RuntimeError):
+ self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
finally:
wio.close()
os.close(r)
@@ -2633,6 +2922,98 @@ class SignalsTest(unittest.TestCase):
def test_reentrant_write_text(self):
self.check_reentrant_write("xy", mode="w", encoding="ascii")
+ def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
+ """Check that a buffered read, when it gets interrupted (either
+ returning a partial result or EINTR), properly invokes the signal
+ handler and retries if the latter returned successfully."""
+ r, w = os.pipe()
+ fdopen_kwargs["closefd"] = False
+ def alarm_handler(sig, frame):
+ os.write(w, b"bar")
+ signal.signal(signal.SIGALRM, alarm_handler)
+ try:
+ rio = self.io.open(r, **fdopen_kwargs)
+ os.write(w, b"foo")
+ signal.alarm(1)
+ # Expected behaviour:
+ # - first raw read() returns partial b"foo"
+ # - second raw read() returns EINTR
+ # - third raw read() returns b"bar"
+ self.assertEqual(decode(rio.read(6)), "foobar")
+ finally:
+ rio.close()
+ os.close(w)
+ os.close(r)
+
+ def test_interrupted_read_retry_buffered(self):
+ self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
+ mode="rb")
+
+ def test_interrupted_read_retry_text(self):
+ self.check_interrupted_read_retry(lambda x: x,
+ mode="r")
+
+ @unittest.skipUnless(threading, 'Threading required for this test.')
+ def check_interrupted_write_retry(self, item, **fdopen_kwargs):
+ """Check that a buffered write, when it gets interrupted (either
+ returning a partial result or EINTR), properly invokes the signal
+ handler and retries if the latter returned successfully."""
+ select = support.import_module("select")
+ # A quantity that exceeds the buffer size of an anonymous pipe's
+ # write end.
+ N = 1024 * 1024
+ r, w = os.pipe()
+ fdopen_kwargs["closefd"] = False
+ # We need a separate thread to read from the pipe and allow the
+ # write() to finish. This thread is started after the SIGALRM is
+ # received (forcing a first EINTR in write()).
+ read_results = []
+ write_finished = False
+ def _read():
+ while not write_finished:
+ while r in select.select([r], [], [], 1.0)[0]:
+ s = os.read(r, 1024)
+ read_results.append(s)
+ t = threading.Thread(target=_read)
+ t.daemon = True
+ def alarm1(sig, frame):
+ signal.signal(signal.SIGALRM, alarm2)
+ signal.alarm(1)
+ def alarm2(sig, frame):
+ t.start()
+ signal.signal(signal.SIGALRM, alarm1)
+ try:
+ wio = self.io.open(w, **fdopen_kwargs)
+ signal.alarm(1)
+ # Expected behaviour:
+ # - first raw write() is partial (because of the limited pipe buffer
+ # and the first alarm)
+ # - second raw write() returns EINTR (because of the second alarm)
+ # - subsequent write()s are successful (either partial or complete)
+ self.assertEqual(N, wio.write(item * N))
+ wio.flush()
+ write_finished = True
+ t.join()
+ self.assertEqual(N, sum(len(x) for x in read_results))
+ finally:
+ write_finished = True
+ os.close(w)
+ os.close(r)
+ # This is deliberate. If we didn't close the file descriptor
+ # before closing wio, wio would try to flush its internal
+ # buffer, and could block (in case of failure).
+ try:
+ wio.close()
+ except IOError as e:
+ if e.errno != errno.EBADF:
+ raise
+
+ def test_interrupted_write_retry_buffered(self):
+ self.check_interrupted_write_retry(b"x", mode="wb")
+
+ def test_interrupted_write_retry_text(self):
+ self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
+
class CSignalsTest(SignalsTest):
io = io
@@ -2662,7 +3043,7 @@ def test_main():
# Put the namespaces of the IO module we are testing and some useful mock
# classes in the __dict__ of each test.
mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
- MockNonBlockWriterIO, MockRawIOWithoutRead)
+ MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
all_members = io.__all__ + ["IncrementalNewlineDecoder"]
c_io_ns = {name : getattr(io, name) for name in all_members}
py_io_ns = {name : getattr(pyio, name) for name in all_members}