summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_io.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r--Lib/test/test_io.py379
1 files changed, 323 insertions, 56 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 86cf8e1..5111882 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -35,7 +35,7 @@ import weakref
from collections import deque, UserList
from itertools import cycle, count
from test import support
-from test.script_helper import assert_python_ok, run_python_until_end
+from test.support.script_helper import assert_python_ok, run_python_until_end
import codecs
import io # C implementation of io
@@ -44,10 +44,22 @@ try:
import threading
except ImportError:
threading = None
+
try:
- import fcntl
+ import ctypes
except ImportError:
- fcntl = None
+ def byteslike(*pos, **kw):
+ return array.array("b", bytes(*pos, **kw))
+else:
+ def byteslike(*pos, **kw):
+ """Create a bytes-like object having no string or sequence methods"""
+ data = bytes(*pos, **kw)
+ obj = EmptyStruct()
+ ctypes.resize(obj, len(data))
+ memoryview(obj).cast("B")[:] = data
+ return obj
+ class EmptyStruct(ctypes.Structure):
+ pass
def _default_chunk_size():
"""Get the default TextIOWrapper chunk size"""
@@ -207,6 +219,9 @@ class MockUnseekableIO:
def tell(self, *args):
raise self.UnsupportedOperation("not seekable")
+ def truncate(self, *args):
+ raise self.UnsupportedOperation("not seekable")
+
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
UnsupportedOperation = io.UnsupportedOperation
@@ -285,7 +300,9 @@ class IOTest(unittest.TestCase):
self.assertEqual(f.tell(), 6)
self.assertEqual(f.seek(-1, 1), 5)
self.assertEqual(f.tell(), 5)
- self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
+ buffer = bytearray(b" world\n\n\n")
+ self.assertEqual(f.write(buffer), 9)
+ buffer[:] = b"*" * 9 # Overwrite our copy of the data
self.assertEqual(f.seek(0), 0)
self.assertEqual(f.write(b"h"), 1)
self.assertEqual(f.seek(-1, 2), 13)
@@ -298,20 +315,21 @@ class IOTest(unittest.TestCase):
def read_ops(self, f, buffered=False):
data = f.read(5)
self.assertEqual(data, b"hello")
- data = bytearray(data)
+ data = byteslike(data)
self.assertEqual(f.readinto(data), 5)
- self.assertEqual(data, b" worl")
+ self.assertEqual(bytes(data), b" worl")
+ data = bytearray(5)
self.assertEqual(f.readinto(data), 2)
self.assertEqual(len(data), 5)
self.assertEqual(data[:2], b"d\n")
self.assertEqual(f.seek(0), 0)
self.assertEqual(f.read(20), b"hello world\n")
self.assertEqual(f.read(1), b"")
- self.assertEqual(f.readinto(bytearray(b"x")), 0)
+ self.assertEqual(f.readinto(byteslike(b"x")), 0)
self.assertEqual(f.seek(-6, 2), 6)
self.assertEqual(f.read(5), b"world")
self.assertEqual(f.read(0), b"")
- self.assertEqual(f.readinto(bytearray()), 0)
+ self.assertEqual(f.readinto(byteslike()), 0)
self.assertEqual(f.seek(-6, 1), 5)
self.assertEqual(f.read(5), b" worl")
self.assertEqual(f.tell(), 10)
@@ -322,6 +340,10 @@ class IOTest(unittest.TestCase):
f.seek(6)
self.assertEqual(f.read(), b"world\n")
self.assertEqual(f.read(), b"")
+ f.seek(0)
+ data = byteslike(5)
+ self.assertEqual(f.readinto1(data), 5)
+ self.assertEqual(bytes(data), b"hello")
LARGE = 2**31
@@ -365,10 +387,116 @@ class IOTest(unittest.TestCase):
self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
+ def test_optional_abilities(self):
+ # Test for OSError when optional APIs are not supported
+ # The purpose of this test is to try fileno(), reading, writing and
+ # seeking operations with various objects that indicate they do not
+ # support these operations.
+
+ def pipe_reader():
+ [r, w] = os.pipe()
+ os.close(w) # So that read() is harmless
+ return self.FileIO(r, "r")
+
+ def pipe_writer():
+ [r, w] = os.pipe()
+ self.addCleanup(os.close, r)
+ # Guarantee that we can write into the pipe without blocking
+ thread = threading.Thread(target=os.read, args=(r, 100))
+ thread.start()
+ self.addCleanup(thread.join)
+ return self.FileIO(w, "w")
+
+ def buffered_reader():
+ return self.BufferedReader(self.MockUnseekableIO())
+
+ def buffered_writer():
+ return self.BufferedWriter(self.MockUnseekableIO())
+
+ def buffered_random():
+ return self.BufferedRandom(self.BytesIO())
+
+ def buffered_rw_pair():
+ return self.BufferedRWPair(self.MockUnseekableIO(),
+ self.MockUnseekableIO())
+
+ def text_reader():
+ class UnseekableReader(self.MockUnseekableIO):
+ writable = self.BufferedIOBase.writable
+ write = self.BufferedIOBase.write
+ return self.TextIOWrapper(UnseekableReader(), "ascii")
+
+ def text_writer():
+ class UnseekableWriter(self.MockUnseekableIO):
+ readable = self.BufferedIOBase.readable
+ read = self.BufferedIOBase.read
+ return self.TextIOWrapper(UnseekableWriter(), "ascii")
+
+ tests = (
+ (pipe_reader, "fr"), (pipe_writer, "fw"),
+ (buffered_reader, "r"), (buffered_writer, "w"),
+ (buffered_random, "rws"), (buffered_rw_pair, "rw"),
+ (text_reader, "r"), (text_writer, "w"),
+ (self.BytesIO, "rws"), (self.StringIO, "rws"),
+ )
+ for [test, abilities] in tests:
+ if test is pipe_writer and not threading:
+ continue # Skip subtest that uses a background thread
+ with self.subTest(test), test() as obj:
+ readable = "r" in abilities
+ self.assertEqual(obj.readable(), readable)
+ writable = "w" in abilities
+ self.assertEqual(obj.writable(), writable)
+
+ if isinstance(obj, self.TextIOBase):
+ data = "3"
+ elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
+ data = b"3"
+ else:
+ self.fail("Unknown base class")
+
+ if "f" in abilities:
+ obj.fileno()
+ else:
+ self.assertRaises(OSError, obj.fileno)
+
+ if readable:
+ obj.read(1)
+ obj.read()
+ else:
+ self.assertRaises(OSError, obj.read, 1)
+ self.assertRaises(OSError, obj.read)
+
+ if writable:
+ obj.write(data)
+ else:
+ self.assertRaises(OSError, obj.write, data)
+
+ if sys.platform.startswith("win") and test in (
+ pipe_reader, pipe_writer):
+ # Pipes seem to appear as seekable on Windows
+ continue
+ seekable = "s" in abilities
+ self.assertEqual(obj.seekable(), seekable)
+
+ if seekable:
+ obj.tell()
+ obj.seek(0)
+ else:
+ self.assertRaises(OSError, obj.tell)
+ self.assertRaises(OSError, obj.seek, 0)
+
+ if writable and seekable:
+ obj.truncate()
+ obj.truncate(0)
+ else:
+ self.assertRaises(OSError, obj.truncate)
+ self.assertRaises(OSError, obj.truncate, 0)
+
def test_open_handles_NUL_chars(self):
fn_with_NUL = 'foo\0bar'
- self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
- self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
+ self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
+ self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
def test_raw_file_io(self):
with self.open(support.TESTFN, "wb", buffering=0) as f:
@@ -532,10 +660,15 @@ class IOTest(unittest.TestCase):
def test_array_writes(self):
a = array.array('i', range(10))
n = len(a.tobytes())
- with self.open(support.TESTFN, "wb", 0) as f:
- self.assertEqual(f.write(a), n)
- with self.open(support.TESTFN, "wb") as f:
- self.assertEqual(f.write(a), n)
+ def check(f):
+ with f:
+ self.assertEqual(f.write(a), n)
+ f.writelines((a,))
+ check(self.BytesIO())
+ check(self.FileIO(support.TESTFN, "w"))
+ check(self.BufferedWriter(self.MockRawIO()))
+ check(self.BufferedRandom(self.MockRawIO()))
+ check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
def test_closefd(self):
self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
@@ -672,6 +805,22 @@ class IOTest(unittest.TestCase):
with self.open("non-existent", "r", opener=opener) as f:
self.assertEqual(f.read(), "egg\n")
+ def test_bad_opener_negative_1(self):
+ # Issue #27066.
+ def badopener(fname, flags):
+ return -1
+ with self.assertRaises(ValueError) as cm:
+ open('non-existent', 'r', opener=badopener)
+ self.assertEqual(str(cm.exception), 'opener returned -1')
+
+ def test_bad_opener_other_negative(self):
+ # Issue #27066.
+ def badopener(fname, flags):
+ return -2
+ with self.assertRaises(ValueError) as cm:
+ open('non-existent', 'r', opener=badopener)
+ self.assertEqual(str(cm.exception), 'opener returned -2')
+
def test_fileio_closefd(self):
# Issue #4841
with self.open(__file__, 'rb') as f1, \
@@ -685,18 +834,27 @@ class IOTest(unittest.TestCase):
f2.readline()
def test_nonbuffered_textio(self):
- with warnings.catch_warnings(record=True) as recorded:
+ with support.check_no_resource_warning(self):
with self.assertRaises(ValueError):
self.open(support.TESTFN, 'w', buffering=0)
- support.gc_collect()
- self.assertEqual(recorded, [])
def test_invalid_newline(self):
- with warnings.catch_warnings(record=True) as recorded:
+ with support.check_no_resource_warning(self):
with self.assertRaises(ValueError):
self.open(support.TESTFN, 'w', newline='invalid')
- support.gc_collect()
- self.assertEqual(recorded, [])
+
+ def test_buffered_readinto_mixin(self):
+ # Test the implementation provided by BufferedIOBase
+ class Stream(self.BufferedIOBase):
+ def read(self, size):
+ return b"12345"
+ read1 = read
+ stream = Stream()
+ for method in ("readinto", "readinto1"):
+ with self.subTest(method):
+ buffer = byteslike(5)
+ self.assertEqual(getattr(stream, method)(buffer), 5)
+ self.assertEqual(bytes(buffer), b"12345")
class CIOTest(IOTest):
@@ -723,6 +881,21 @@ class PyIOTest(IOTest):
pass
+@support.cpython_only
+class APIMismatchTest(unittest.TestCase):
+
+ def test_RawIOBase_io_in_pyio_match(self):
+ """Test that pyio RawIOBase class has all c RawIOBase methods"""
+ mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
+ ignore=('__weakref__',))
+ self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
+
+ def test_RawIOBase_pyio_in_io_match(self):
+ """Test that c RawIOBase class has all pyio RawIOBase methods"""
+ mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
+ self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
+
+
class CommonBufferedTests:
# Tests common to BufferedReader, BufferedWriter and BufferedRandom
@@ -740,12 +913,6 @@ class CommonBufferedTests:
self.assertEqual(42, bufio.fileno())
- @unittest.skip('test having existential crisis')
- def test_no_fileno(self):
- # XXX will we always have fileno() function? If so, kill
- # this test. Else, write it.
- pass
-
def test_invalid_args(self):
rawio = self.MockRawIO()
bufio = self.tp(rawio)
@@ -773,13 +940,9 @@ class CommonBufferedTests:
super().flush()
rawio = self.MockRawIO()
bufio = MyBufferedIO(rawio)
- writable = bufio.writable()
del bufio
support.gc_collect()
- if writable:
- self.assertEqual(record, [1, 2, 3])
- else:
- self.assertEqual(record, [1, 2])
+ self.assertEqual(record, [1, 2, 3])
def test_context_manager(self):
# Test usability as a context manager
@@ -811,7 +974,7 @@ class CommonBufferedTests:
def test_repr(self):
raw = self.MockRawIO()
b = self.tp(raw)
- clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
+ clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
self.assertEqual(repr(b), "<%s>" % clsname)
raw.name = "dummy"
self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
@@ -985,6 +1148,71 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
self.assertEqual(bufio.readinto(b), 1)
self.assertEqual(b, b"cb")
+ def test_readinto1(self):
+ buffer_size = 10
+ rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
+ bufio = self.tp(rawio, buffer_size=buffer_size)
+ b = bytearray(2)
+ self.assertEqual(bufio.peek(3), b'abc')
+ self.assertEqual(rawio._reads, 1)
+ self.assertEqual(bufio.readinto1(b), 2)
+ self.assertEqual(b, b"ab")
+ self.assertEqual(rawio._reads, 1)
+ self.assertEqual(bufio.readinto1(b), 1)
+ self.assertEqual(b[:1], b"c")
+ self.assertEqual(rawio._reads, 1)
+ self.assertEqual(bufio.readinto1(b), 2)
+ self.assertEqual(b, b"de")
+ self.assertEqual(rawio._reads, 2)
+ b = bytearray(2*buffer_size)
+ self.assertEqual(bufio.peek(3), b'fgh')
+ self.assertEqual(rawio._reads, 3)
+ self.assertEqual(bufio.readinto1(b), 6)
+ self.assertEqual(b[:6], b"fghjkl")
+ self.assertEqual(rawio._reads, 4)
+
+ def test_readinto_array(self):
+ buffer_size = 60
+ data = b"a" * 26
+ rawio = self.MockRawIO((data,))
+ bufio = self.tp(rawio, buffer_size=buffer_size)
+
+ # Create an array with element size > 1 byte
+ b = array.array('i', b'x' * 32)
+ assert len(b) != 16
+
+ # Read into it. We should get as many *bytes* as we can fit into b
+ # (which is more than the number of elements)
+ n = bufio.readinto(b)
+ self.assertGreater(n, len(b))
+
+ # Check that old contents of b are preserved
+ bm = memoryview(b).cast('B')
+ self.assertLess(n, len(bm))
+ self.assertEqual(bm[:n], data[:n])
+ self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
+
+ def test_readinto1_array(self):
+ buffer_size = 60
+ data = b"a" * 26
+ rawio = self.MockRawIO((data,))
+ bufio = self.tp(rawio, buffer_size=buffer_size)
+
+ # Create an array with element size > 1 byte
+ b = array.array('i', b'x' * 32)
+ assert len(b) != 16
+
+ # Read into it. We should get as many *bytes* as we can fit into b
+ # (which is more than the number of elements)
+ n = bufio.readinto1(b)
+ self.assertGreater(n, len(b))
+
+ # Check that old contents of b are preserved
+ bm = memoryview(b).cast('B')
+ self.assertLess(n, len(bm))
+ self.assertEqual(bm[:n], data[:n])
+ self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
+
def test_readlines(self):
def bufio():
rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
@@ -1219,6 +1447,11 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
bufio = self.tp(writer, 8)
bufio.write(b"abc")
self.assertFalse(writer._write_stack)
+ buffer = bytearray(b"def")
+ bufio.write(buffer)
+ buffer[:] = b"***" # Overwrite our copy of the data
+ bufio.flush()
+ self.assertEqual(b"".join(writer._write_stack), b"abcdef")
def test_write_overflow(self):
writer = self.MockRawIO()
@@ -1545,11 +1778,13 @@ class BufferedRWPairTest(unittest.TestCase):
self.assertEqual(pair.read1(3), b"abc")
def test_readinto(self):
- pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
+ for method in ("readinto", "readinto1"):
+ with self.subTest(method):
+ pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
- data = bytearray(5)
- self.assertEqual(pair.readinto(data), 5)
- self.assertEqual(data, b"abcde")
+ data = byteslike(5)
+ self.assertEqual(getattr(pair, method)(data), 5)
+ self.assertEqual(bytes(data), b"abcde")
def test_write(self):
w = self.MockRawIO()
@@ -1557,7 +1792,9 @@ class BufferedRWPairTest(unittest.TestCase):
pair.write(b"abc")
pair.flush()
- pair.write(b"def")
+ buffer = bytearray(b"def")
+ pair.write(buffer)
+ buffer[:] = b"***" # Overwrite our copy of the data
pair.flush()
self.assertEqual(w._write_stack, [b"abc", b"def"])
@@ -2898,6 +3135,7 @@ class TextIOWrapperTest(unittest.TestCase):
""".format(iomod=iomod, kwargs=kwargs)
return assert_python_ok("-c", code)
+ @support.requires_type_collecting
def test_create_at_shutdown_without_encoding(self):
rc, out, err = self._check_create_at_shutdown()
if err:
@@ -2907,12 +3145,24 @@ class TextIOWrapperTest(unittest.TestCase):
else:
self.assertEqual("ok", out.decode().strip())
+ @support.requires_type_collecting
def test_create_at_shutdown_with_encoding(self):
rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
errors='strict')
self.assertFalse(err)
self.assertEqual("ok", out.decode().strip())
+ def test_read_byteslike(self):
+ r = MemviewBytesIO(b'Just some random string\n')
+ t = self.TextIOWrapper(r, 'utf-8')
+
+ # TextIOwrapper will not read the full string, because
+ # we truncate it to a multiple of the native int size
+ # so that we can construct a more complex memoryview.
+ bytes_val = _to_memoryview(r.getvalue()).tobytes()
+
+ self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
+
def test_issue22849(self):
class F(object):
def readable(self): return True
@@ -2929,6 +3179,25 @@ class TextIOWrapperTest(unittest.TestCase):
t = self.TextIOWrapper(F(), encoding='utf-8')
+class MemviewBytesIO(io.BytesIO):
+ '''A BytesIO object whose read method returns memoryviews
+ rather than bytes'''
+
+ def read1(self, len_):
+ return _to_memoryview(super().read1(len_))
+
+ def read(self, len_):
+ return _to_memoryview(super().read(len_))
+
+def _to_memoryview(buf):
+ '''Convert bytes-object *buf* to a non-trivial memoryview'''
+
+ arr = array.array('i')
+ idx = len(buf) - len(buf) % arr.itemsize
+ arr.frombytes(buf[:idx])
+ return memoryview(arr)
+
+
class CTextIOWrapperTest(TextIOWrapperTest):
io = io
shutdown_error = "RuntimeError: could not find io module state"
@@ -2937,8 +3206,6 @@ class CTextIOWrapperTest(TextIOWrapperTest):
r = self.BytesIO(b"\xc3\xa9\n\n")
b = self.BufferedReader(r, 1000)
t = self.TextIOWrapper(b)
- self.assertRaises(TypeError, t.__init__, b, newline=42)
- self.assertRaises(ValueError, t.read)
self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
self.assertRaises(ValueError, t.read)
@@ -3175,6 +3442,8 @@ class MiscIOTest(unittest.TestCase):
self.assertRaises(ValueError, f.readall)
if hasattr(f, "readinto"):
self.assertRaises(ValueError, f.readinto, bytearray(1024))
+ if hasattr(f, "readinto1"):
+ self.assertRaises(ValueError, f.readinto1, bytearray(1024))
self.assertRaises(ValueError, f.readline)
self.assertRaises(ValueError, f.readlines)
self.assertRaises(ValueError, f.seek, 0)
@@ -3260,10 +3529,8 @@ class MiscIOTest(unittest.TestCase):
# When using closefd=False, there's no warning
r, w = os.pipe()
fds += r, w
- with warnings.catch_warnings(record=True) as recorded:
+ with support.check_no_resource_warning(self):
open(r, *args, closefd=False, **kwargs)
- support.gc_collect()
- self.assertEqual(recorded, [])
def test_warn_on_dealloc_fd(self):
self._check_warn_on_dealloc_fd("rb", buffering=0)
@@ -3288,26 +3555,20 @@ class MiscIOTest(unittest.TestCase):
with self.open(support.TESTFN, **kwargs) as f:
self.assertRaises(TypeError, pickle.dumps, f, protocol)
- @unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_bigbuf(self):
self._test_nonblock_pipe_write(16*1024)
- @unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_smallbuf(self):
self._test_nonblock_pipe_write(1024)
- def _set_non_blocking(self, fd):
- flags = fcntl.fcntl(fd, fcntl.F_GETFL)
- self.assertNotEqual(flags, -1)
- res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
- self.assertEqual(res, 0)
-
+ @unittest.skipUnless(hasattr(os, 'set_blocking'),
+ 'os.set_blocking() required for this test')
def _test_nonblock_pipe_write(self, bufsize):
sent = []
received = []
r, w = os.pipe()
- self._set_non_blocking(r)
- self._set_non_blocking(w)
+ os.set_blocking(r, False)
+ os.set_blocking(w, False)
# To exercise all code paths in the C implementation we need
# to play with buffer sizes. For instance, if we choose a
@@ -3456,6 +3717,7 @@ class SignalsTest(unittest.TestCase):
t.daemon = True
r, w = os.pipe()
fdopen_kwargs["closefd"] = False
+ large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
try:
wio = self.io.open(w, **fdopen_kwargs)
t.start()
@@ -3467,8 +3729,7 @@ class SignalsTest(unittest.TestCase):
# handlers, which in this case will invoke alarm_interrupt().
signal.alarm(1)
try:
- with self.assertRaises(ZeroDivisionError):
- wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
+ self.assertRaises(ZeroDivisionError, wio.write, large_data)
finally:
signal.alarm(0)
t.join()
@@ -3569,11 +3830,13 @@ class SignalsTest(unittest.TestCase):
returning a partial result or EINTR), properly invokes the signal
handler and retries if the latter returned successfully."""
select = support.import_module("select")
+
# A quantity that exceeds the buffer size of an anonymous pipe's
# write end.
N = support.PIPE_MAX_SIZE
r, w = os.pipe()
fdopen_kwargs["closefd"] = False
+
# We need a separate thread to read from the pipe and allow the
# write() to finish. This thread is started after the SIGALRM is
# received (forcing a first EINTR in write()).
@@ -3596,6 +3859,8 @@ class SignalsTest(unittest.TestCase):
signal.alarm(1)
def alarm2(sig, frame):
t.start()
+
+ large_data = item * N
signal.signal(signal.SIGALRM, alarm1)
try:
wio = self.io.open(w, **fdopen_kwargs)
@@ -3605,7 +3870,9 @@ class SignalsTest(unittest.TestCase):
# and the first alarm)
# - second raw write() returns EINTR (because of the second alarm)
# - subsequent write()s are successful (either partial or complete)
- self.assertEqual(N, wio.write(item * N))
+ written = wio.write(large_data)
+ self.assertEqual(N, written)
+
wio.flush()
write_finished = True
t.join()
@@ -3645,7 +3912,7 @@ class PySignalsTest(SignalsTest):
def load_tests(*args):
- tests = (CIOTest, PyIOTest,
+ tests = (CIOTest, PyIOTest, APIMismatchTest,
CBufferedReaderTest, PyBufferedReaderTest,
CBufferedWriterTest, PyBufferedWriterTest,
CBufferedRWPairTest, PyBufferedRWPairTest,