diff options
Diffstat (limited to 'Lib/test/test_io.py')
| -rw-r--r-- | Lib/test/test_io.py | 379 |
1 files changed, 323 insertions, 56 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 86cf8e1..5111882 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -35,7 +35,7 @@ import weakref from collections import deque, UserList from itertools import cycle, count from test import support -from test.script_helper import assert_python_ok, run_python_until_end +from test.support.script_helper import assert_python_ok, run_python_until_end import codecs import io # C implementation of io @@ -44,10 +44,22 @@ try: import threading except ImportError: threading = None + try: - import fcntl + import ctypes except ImportError: - fcntl = None + def byteslike(*pos, **kw): + return array.array("b", bytes(*pos, **kw)) +else: + def byteslike(*pos, **kw): + """Create a bytes-like object having no string or sequence methods""" + data = bytes(*pos, **kw) + obj = EmptyStruct() + ctypes.resize(obj, len(data)) + memoryview(obj).cast("B")[:] = data + return obj + class EmptyStruct(ctypes.Structure): + pass def _default_chunk_size(): """Get the default TextIOWrapper chunk size""" @@ -207,6 +219,9 @@ class MockUnseekableIO: def tell(self, *args): raise self.UnsupportedOperation("not seekable") + def truncate(self, *args): + raise self.UnsupportedOperation("not seekable") + class CMockUnseekableIO(MockUnseekableIO, io.BytesIO): UnsupportedOperation = io.UnsupportedOperation @@ -285,7 +300,9 @@ class IOTest(unittest.TestCase): self.assertEqual(f.tell(), 6) self.assertEqual(f.seek(-1, 1), 5) self.assertEqual(f.tell(), 5) - self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9) + buffer = bytearray(b" world\n\n\n") + self.assertEqual(f.write(buffer), 9) + buffer[:] = b"*" * 9 # Overwrite our copy of the data self.assertEqual(f.seek(0), 0) self.assertEqual(f.write(b"h"), 1) self.assertEqual(f.seek(-1, 2), 13) @@ -298,20 +315,21 @@ class IOTest(unittest.TestCase): def read_ops(self, f, buffered=False): data = f.read(5) self.assertEqual(data, b"hello") - data = bytearray(data) + data = byteslike(data) self.assertEqual(f.readinto(data), 5) - self.assertEqual(data, b" worl") + self.assertEqual(bytes(data), b" worl") + data = bytearray(5) self.assertEqual(f.readinto(data), 2) self.assertEqual(len(data), 5) self.assertEqual(data[:2], b"d\n") self.assertEqual(f.seek(0), 0) self.assertEqual(f.read(20), b"hello world\n") self.assertEqual(f.read(1), b"") - self.assertEqual(f.readinto(bytearray(b"x")), 0) + self.assertEqual(f.readinto(byteslike(b"x")), 0) self.assertEqual(f.seek(-6, 2), 6) self.assertEqual(f.read(5), b"world") self.assertEqual(f.read(0), b"") - self.assertEqual(f.readinto(bytearray()), 0) + self.assertEqual(f.readinto(byteslike()), 0) self.assertEqual(f.seek(-6, 1), 5) self.assertEqual(f.read(5), b" worl") self.assertEqual(f.tell(), 10) @@ -322,6 +340,10 @@ class IOTest(unittest.TestCase): f.seek(6) self.assertEqual(f.read(), b"world\n") self.assertEqual(f.read(), b"") + f.seek(0) + data = byteslike(5) + self.assertEqual(f.readinto1(data), 5) + self.assertEqual(bytes(data), b"hello") LARGE = 2**31 @@ -365,10 +387,116 @@ class IOTest(unittest.TestCase): self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR) self.assertRaises(exc, fp.seek, -1, self.SEEK_END) + def test_optional_abilities(self): + # Test for OSError when optional APIs are not supported + # The purpose of this test is to try fileno(), reading, writing and + # seeking operations with various objects that indicate they do not + # support these operations. + + def pipe_reader(): + [r, w] = os.pipe() + os.close(w) # So that read() is harmless + return self.FileIO(r, "r") + + def pipe_writer(): + [r, w] = os.pipe() + self.addCleanup(os.close, r) + # Guarantee that we can write into the pipe without blocking + thread = threading.Thread(target=os.read, args=(r, 100)) + thread.start() + self.addCleanup(thread.join) + return self.FileIO(w, "w") + + def buffered_reader(): + return self.BufferedReader(self.MockUnseekableIO()) + + def buffered_writer(): + return self.BufferedWriter(self.MockUnseekableIO()) + + def buffered_random(): + return self.BufferedRandom(self.BytesIO()) + + def buffered_rw_pair(): + return self.BufferedRWPair(self.MockUnseekableIO(), + self.MockUnseekableIO()) + + def text_reader(): + class UnseekableReader(self.MockUnseekableIO): + writable = self.BufferedIOBase.writable + write = self.BufferedIOBase.write + return self.TextIOWrapper(UnseekableReader(), "ascii") + + def text_writer(): + class UnseekableWriter(self.MockUnseekableIO): + readable = self.BufferedIOBase.readable + read = self.BufferedIOBase.read + return self.TextIOWrapper(UnseekableWriter(), "ascii") + + tests = ( + (pipe_reader, "fr"), (pipe_writer, "fw"), + (buffered_reader, "r"), (buffered_writer, "w"), + (buffered_random, "rws"), (buffered_rw_pair, "rw"), + (text_reader, "r"), (text_writer, "w"), + (self.BytesIO, "rws"), (self.StringIO, "rws"), + ) + for [test, abilities] in tests: + if test is pipe_writer and not threading: + continue # Skip subtest that uses a background thread + with self.subTest(test), test() as obj: + readable = "r" in abilities + self.assertEqual(obj.readable(), readable) + writable = "w" in abilities + self.assertEqual(obj.writable(), writable) + + if isinstance(obj, self.TextIOBase): + data = "3" + elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)): + data = b"3" + else: + self.fail("Unknown base class") + + if "f" in abilities: + obj.fileno() + else: + self.assertRaises(OSError, obj.fileno) + + if readable: + obj.read(1) + obj.read() + else: + self.assertRaises(OSError, obj.read, 1) + self.assertRaises(OSError, obj.read) + + if writable: + obj.write(data) + else: + self.assertRaises(OSError, obj.write, data) + + if sys.platform.startswith("win") and test in ( + pipe_reader, pipe_writer): + # Pipes seem to appear as seekable on Windows + continue + seekable = "s" in abilities + self.assertEqual(obj.seekable(), seekable) + + if seekable: + obj.tell() + obj.seek(0) + else: + self.assertRaises(OSError, obj.tell) + self.assertRaises(OSError, obj.seek, 0) + + if writable and seekable: + obj.truncate() + obj.truncate(0) + else: + self.assertRaises(OSError, obj.truncate) + self.assertRaises(OSError, obj.truncate, 0) + def test_open_handles_NUL_chars(self): fn_with_NUL = 'foo\0bar' - self.assertRaises(TypeError, self.open, fn_with_NUL, 'w') - self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w') + self.assertRaises(ValueError, self.open, fn_with_NUL, 'w') + self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w') def test_raw_file_io(self): with self.open(support.TESTFN, "wb", buffering=0) as f: @@ -532,10 +660,15 @@ class IOTest(unittest.TestCase): def test_array_writes(self): a = array.array('i', range(10)) n = len(a.tobytes()) - with self.open(support.TESTFN, "wb", 0) as f: - self.assertEqual(f.write(a), n) - with self.open(support.TESTFN, "wb") as f: - self.assertEqual(f.write(a), n) + def check(f): + with f: + self.assertEqual(f.write(a), n) + f.writelines((a,)) + check(self.BytesIO()) + check(self.FileIO(support.TESTFN, "w")) + check(self.BufferedWriter(self.MockRawIO())) + check(self.BufferedRandom(self.MockRawIO())) + check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())) def test_closefd(self): self.assertRaises(ValueError, self.open, support.TESTFN, 'w', @@ -672,6 +805,22 @@ class IOTest(unittest.TestCase): with self.open("non-existent", "r", opener=opener) as f: self.assertEqual(f.read(), "egg\n") + def test_bad_opener_negative_1(self): + # Issue #27066. + def badopener(fname, flags): + return -1 + with self.assertRaises(ValueError) as cm: + open('non-existent', 'r', opener=badopener) + self.assertEqual(str(cm.exception), 'opener returned -1') + + def test_bad_opener_other_negative(self): + # Issue #27066. + def badopener(fname, flags): + return -2 + with self.assertRaises(ValueError) as cm: + open('non-existent', 'r', opener=badopener) + self.assertEqual(str(cm.exception), 'opener returned -2') + def test_fileio_closefd(self): # Issue #4841 with self.open(__file__, 'rb') as f1, \ @@ -685,18 +834,27 @@ class IOTest(unittest.TestCase): f2.readline() def test_nonbuffered_textio(self): - with warnings.catch_warnings(record=True) as recorded: + with support.check_no_resource_warning(self): with self.assertRaises(ValueError): self.open(support.TESTFN, 'w', buffering=0) - support.gc_collect() - self.assertEqual(recorded, []) def test_invalid_newline(self): - with warnings.catch_warnings(record=True) as recorded: + with support.check_no_resource_warning(self): with self.assertRaises(ValueError): self.open(support.TESTFN, 'w', newline='invalid') - support.gc_collect() - self.assertEqual(recorded, []) + + def test_buffered_readinto_mixin(self): + # Test the implementation provided by BufferedIOBase + class Stream(self.BufferedIOBase): + def read(self, size): + return b"12345" + read1 = read + stream = Stream() + for method in ("readinto", "readinto1"): + with self.subTest(method): + buffer = byteslike(5) + self.assertEqual(getattr(stream, method)(buffer), 5) + self.assertEqual(bytes(buffer), b"12345") class CIOTest(IOTest): @@ -723,6 +881,21 @@ class PyIOTest(IOTest): pass +@support.cpython_only +class APIMismatchTest(unittest.TestCase): + + def test_RawIOBase_io_in_pyio_match(self): + """Test that pyio RawIOBase class has all c RawIOBase methods""" + mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase, + ignore=('__weakref__',)) + self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods') + + def test_RawIOBase_pyio_in_io_match(self): + """Test that c RawIOBase class has all pyio RawIOBase methods""" + mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase) + self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods') + + class CommonBufferedTests: # Tests common to BufferedReader, BufferedWriter and BufferedRandom @@ -740,12 +913,6 @@ class CommonBufferedTests: self.assertEqual(42, bufio.fileno()) - @unittest.skip('test having existential crisis') - def test_no_fileno(self): - # XXX will we always have fileno() function? If so, kill - # this test. Else, write it. - pass - def test_invalid_args(self): rawio = self.MockRawIO() bufio = self.tp(rawio) @@ -773,13 +940,9 @@ class CommonBufferedTests: super().flush() rawio = self.MockRawIO() bufio = MyBufferedIO(rawio) - writable = bufio.writable() del bufio support.gc_collect() - if writable: - self.assertEqual(record, [1, 2, 3]) - else: - self.assertEqual(record, [1, 2]) + self.assertEqual(record, [1, 2, 3]) def test_context_manager(self): # Test usability as a context manager @@ -811,7 +974,7 @@ class CommonBufferedTests: def test_repr(self): raw = self.MockRawIO() b = self.tp(raw) - clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__) + clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__) self.assertEqual(repr(b), "<%s>" % clsname) raw.name = "dummy" self.assertEqual(repr(b), "<%s name='dummy'>" % clsname) @@ -985,6 +1148,71 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): self.assertEqual(bufio.readinto(b), 1) self.assertEqual(b, b"cb") + def test_readinto1(self): + buffer_size = 10 + rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl")) + bufio = self.tp(rawio, buffer_size=buffer_size) + b = bytearray(2) + self.assertEqual(bufio.peek(3), b'abc') + self.assertEqual(rawio._reads, 1) + self.assertEqual(bufio.readinto1(b), 2) + self.assertEqual(b, b"ab") + self.assertEqual(rawio._reads, 1) + self.assertEqual(bufio.readinto1(b), 1) + self.assertEqual(b[:1], b"c") + self.assertEqual(rawio._reads, 1) + self.assertEqual(bufio.readinto1(b), 2) + self.assertEqual(b, b"de") + self.assertEqual(rawio._reads, 2) + b = bytearray(2*buffer_size) + self.assertEqual(bufio.peek(3), b'fgh') + self.assertEqual(rawio._reads, 3) + self.assertEqual(bufio.readinto1(b), 6) + self.assertEqual(b[:6], b"fghjkl") + self.assertEqual(rawio._reads, 4) + + def test_readinto_array(self): + buffer_size = 60 + data = b"a" * 26 + rawio = self.MockRawIO((data,)) + bufio = self.tp(rawio, buffer_size=buffer_size) + + # Create an array with element size > 1 byte + b = array.array('i', b'x' * 32) + assert len(b) != 16 + + # Read into it. We should get as many *bytes* as we can fit into b + # (which is more than the number of elements) + n = bufio.readinto(b) + self.assertGreater(n, len(b)) + + # Check that old contents of b are preserved + bm = memoryview(b).cast('B') + self.assertLess(n, len(bm)) + self.assertEqual(bm[:n], data[:n]) + self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) + + def test_readinto1_array(self): + buffer_size = 60 + data = b"a" * 26 + rawio = self.MockRawIO((data,)) + bufio = self.tp(rawio, buffer_size=buffer_size) + + # Create an array with element size > 1 byte + b = array.array('i', b'x' * 32) + assert len(b) != 16 + + # Read into it. We should get as many *bytes* as we can fit into b + # (which is more than the number of elements) + n = bufio.readinto1(b) + self.assertGreater(n, len(b)) + + # Check that old contents of b are preserved + bm = memoryview(b).cast('B') + self.assertLess(n, len(bm)) + self.assertEqual(bm[:n], data[:n]) + self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) + def test_readlines(self): def bufio(): rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) @@ -1219,6 +1447,11 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): bufio = self.tp(writer, 8) bufio.write(b"abc") self.assertFalse(writer._write_stack) + buffer = bytearray(b"def") + bufio.write(buffer) + buffer[:] = b"***" # Overwrite our copy of the data + bufio.flush() + self.assertEqual(b"".join(writer._write_stack), b"abcdef") def test_write_overflow(self): writer = self.MockRawIO() @@ -1545,11 +1778,13 @@ class BufferedRWPairTest(unittest.TestCase): self.assertEqual(pair.read1(3), b"abc") def test_readinto(self): - pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) + for method in ("readinto", "readinto1"): + with self.subTest(method): + pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) - data = bytearray(5) - self.assertEqual(pair.readinto(data), 5) - self.assertEqual(data, b"abcde") + data = byteslike(5) + self.assertEqual(getattr(pair, method)(data), 5) + self.assertEqual(bytes(data), b"abcde") def test_write(self): w = self.MockRawIO() @@ -1557,7 +1792,9 @@ class BufferedRWPairTest(unittest.TestCase): pair.write(b"abc") pair.flush() - pair.write(b"def") + buffer = bytearray(b"def") + pair.write(buffer) + buffer[:] = b"***" # Overwrite our copy of the data pair.flush() self.assertEqual(w._write_stack, [b"abc", b"def"]) @@ -2898,6 +3135,7 @@ class TextIOWrapperTest(unittest.TestCase): """.format(iomod=iomod, kwargs=kwargs) return assert_python_ok("-c", code) + @support.requires_type_collecting def test_create_at_shutdown_without_encoding(self): rc, out, err = self._check_create_at_shutdown() if err: @@ -2907,12 +3145,24 @@ class TextIOWrapperTest(unittest.TestCase): else: self.assertEqual("ok", out.decode().strip()) + @support.requires_type_collecting def test_create_at_shutdown_with_encoding(self): rc, out, err = self._check_create_at_shutdown(encoding='utf-8', errors='strict') self.assertFalse(err) self.assertEqual("ok", out.decode().strip()) + def test_read_byteslike(self): + r = MemviewBytesIO(b'Just some random string\n') + t = self.TextIOWrapper(r, 'utf-8') + + # TextIOwrapper will not read the full string, because + # we truncate it to a multiple of the native int size + # so that we can construct a more complex memoryview. + bytes_val = _to_memoryview(r.getvalue()).tobytes() + + self.assertEqual(t.read(200), bytes_val.decode('utf-8')) + def test_issue22849(self): class F(object): def readable(self): return True @@ -2929,6 +3179,25 @@ class TextIOWrapperTest(unittest.TestCase): t = self.TextIOWrapper(F(), encoding='utf-8') +class MemviewBytesIO(io.BytesIO): + '''A BytesIO object whose read method returns memoryviews + rather than bytes''' + + def read1(self, len_): + return _to_memoryview(super().read1(len_)) + + def read(self, len_): + return _to_memoryview(super().read(len_)) + +def _to_memoryview(buf): + '''Convert bytes-object *buf* to a non-trivial memoryview''' + + arr = array.array('i') + idx = len(buf) - len(buf) % arr.itemsize + arr.frombytes(buf[:idx]) + return memoryview(arr) + + class CTextIOWrapperTest(TextIOWrapperTest): io = io shutdown_error = "RuntimeError: could not find io module state" @@ -2937,8 +3206,6 @@ class CTextIOWrapperTest(TextIOWrapperTest): r = self.BytesIO(b"\xc3\xa9\n\n") b = self.BufferedReader(r, 1000) t = self.TextIOWrapper(b) - self.assertRaises(TypeError, t.__init__, b, newline=42) - self.assertRaises(ValueError, t.read) self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') self.assertRaises(ValueError, t.read) @@ -3175,6 +3442,8 @@ class MiscIOTest(unittest.TestCase): self.assertRaises(ValueError, f.readall) if hasattr(f, "readinto"): self.assertRaises(ValueError, f.readinto, bytearray(1024)) + if hasattr(f, "readinto1"): + self.assertRaises(ValueError, f.readinto1, bytearray(1024)) self.assertRaises(ValueError, f.readline) self.assertRaises(ValueError, f.readlines) self.assertRaises(ValueError, f.seek, 0) @@ -3260,10 +3529,8 @@ class MiscIOTest(unittest.TestCase): # When using closefd=False, there's no warning r, w = os.pipe() fds += r, w - with warnings.catch_warnings(record=True) as recorded: + with support.check_no_resource_warning(self): open(r, *args, closefd=False, **kwargs) - support.gc_collect() - self.assertEqual(recorded, []) def test_warn_on_dealloc_fd(self): self._check_warn_on_dealloc_fd("rb", buffering=0) @@ -3288,26 +3555,20 @@ class MiscIOTest(unittest.TestCase): with self.open(support.TESTFN, **kwargs) as f: self.assertRaises(TypeError, pickle.dumps, f, protocol) - @unittest.skipUnless(fcntl, 'fcntl required for this test') def test_nonblock_pipe_write_bigbuf(self): self._test_nonblock_pipe_write(16*1024) - @unittest.skipUnless(fcntl, 'fcntl required for this test') def test_nonblock_pipe_write_smallbuf(self): self._test_nonblock_pipe_write(1024) - def _set_non_blocking(self, fd): - flags = fcntl.fcntl(fd, fcntl.F_GETFL) - self.assertNotEqual(flags, -1) - res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - self.assertEqual(res, 0) - + @unittest.skipUnless(hasattr(os, 'set_blocking'), + 'os.set_blocking() required for this test') def _test_nonblock_pipe_write(self, bufsize): sent = [] received = [] r, w = os.pipe() - self._set_non_blocking(r) - self._set_non_blocking(w) + os.set_blocking(r, False) + os.set_blocking(w, False) # To exercise all code paths in the C implementation we need # to play with buffer sizes. For instance, if we choose a @@ -3456,6 +3717,7 @@ class SignalsTest(unittest.TestCase): t.daemon = True r, w = os.pipe() fdopen_kwargs["closefd"] = False + large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1) try: wio = self.io.open(w, **fdopen_kwargs) t.start() @@ -3467,8 +3729,7 @@ class SignalsTest(unittest.TestCase): # handlers, which in this case will invoke alarm_interrupt(). signal.alarm(1) try: - with self.assertRaises(ZeroDivisionError): - wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1)) + self.assertRaises(ZeroDivisionError, wio.write, large_data) finally: signal.alarm(0) t.join() @@ -3569,11 +3830,13 @@ class SignalsTest(unittest.TestCase): returning a partial result or EINTR), properly invokes the signal handler and retries if the latter returned successfully.""" select = support.import_module("select") + # A quantity that exceeds the buffer size of an anonymous pipe's # write end. N = support.PIPE_MAX_SIZE r, w = os.pipe() fdopen_kwargs["closefd"] = False + # We need a separate thread to read from the pipe and allow the # write() to finish. This thread is started after the SIGALRM is # received (forcing a first EINTR in write()). @@ -3596,6 +3859,8 @@ class SignalsTest(unittest.TestCase): signal.alarm(1) def alarm2(sig, frame): t.start() + + large_data = item * N signal.signal(signal.SIGALRM, alarm1) try: wio = self.io.open(w, **fdopen_kwargs) @@ -3605,7 +3870,9 @@ class SignalsTest(unittest.TestCase): # and the first alarm) # - second raw write() returns EINTR (because of the second alarm) # - subsequent write()s are successful (either partial or complete) - self.assertEqual(N, wio.write(item * N)) + written = wio.write(large_data) + self.assertEqual(N, written) + wio.flush() write_finished = True t.join() @@ -3645,7 +3912,7 @@ class PySignalsTest(SignalsTest): def load_tests(*args): - tests = (CIOTest, PyIOTest, + tests = (CIOTest, PyIOTest, APIMismatchTest, CBufferedReaderTest, PyBufferedReaderTest, CBufferedWriterTest, PyBufferedWriterTest, CBufferedRWPairTest, PyBufferedRWPairTest, |
