diff options
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r-- | Lib/test/test_io.py | 340 |
1 files changed, 266 insertions, 74 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 1ec6f93..dac30cb 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -23,22 +23,25 @@ import os import sys import time import array -import threading import random import unittest -import warnings import weakref -import gc import abc import signal import errno -from itertools import chain, cycle, count +import warnings +import pickle +from itertools import cycle, count from collections import deque from test import support import codecs import io # C implementation of io import _pyio as pyio # Python implementation of io +try: + import threading +except ImportError: + threading = None def _default_chunk_size(): @@ -189,6 +192,23 @@ class PyMockFileIO(MockFileIO, pyio.BytesIO): pass +class MockUnseekableIO: + def seekable(self): + return False + + def seek(self, *args): + raise self.UnsupportedOperation("not seekable") + + def tell(self, *args): + raise self.UnsupportedOperation("not seekable") + +class CMockUnseekableIO(MockUnseekableIO, io.BytesIO): + UnsupportedOperation = io.UnsupportedOperation + +class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO): + UnsupportedOperation = pyio.UnsupportedOperation + + class MockNonBlockWriterIO: def __init__(self): @@ -314,16 +334,26 @@ class IOTest(unittest.TestCase): def test_invalid_operations(self): # Try writing on a file opened in read mode and vice-versa. + exc = self.UnsupportedOperation for mode in ("w", "wb"): with self.open(support.TESTFN, mode) as fp: - self.assertRaises(IOError, fp.read) - self.assertRaises(IOError, fp.readline) + self.assertRaises(exc, fp.read) + self.assertRaises(exc, fp.readline) + with self.open(support.TESTFN, "wb", buffering=0) as fp: + self.assertRaises(exc, fp.read) + self.assertRaises(exc, fp.readline) + with self.open(support.TESTFN, "rb", buffering=0) as fp: + self.assertRaises(exc, fp.write, b"blah") + self.assertRaises(exc, fp.writelines, [b"blah\n"]) with self.open(support.TESTFN, "rb") as fp: - self.assertRaises(IOError, fp.write, b"blah") - self.assertRaises(IOError, fp.writelines, [b"blah\n"]) + self.assertRaises(exc, fp.write, b"blah") + self.assertRaises(exc, fp.writelines, [b"blah\n"]) with self.open(support.TESTFN, "r") as fp: - self.assertRaises(IOError, fp.write, "blah") - self.assertRaises(IOError, fp.writelines, ["blah\n"]) + self.assertRaises(exc, fp.write, "blah") + self.assertRaises(exc, fp.writelines, ["blah\n"]) + # Non-zero seeking from current or end pos + self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR) + self.assertRaises(exc, fp.seek, -1, self.SEEK_END) def test_raw_file_io(self): with self.open(support.TESTFN, "wb", buffering=0) as f: @@ -432,13 +462,14 @@ class IOTest(unittest.TestCase): def flush(self): record.append(3) super().flush() - f = MyFileIO(support.TESTFN, "wb") - f.write(b"xxx") - del f - support.gc_collect() - self.assertEqual(record, [1, 2, 3]) - with self.open(support.TESTFN, "rb") as f: - self.assertEqual(f.read(), b"xxx") + with support.check_warnings(('', ResourceWarning)): + f = MyFileIO(support.TESTFN, "wb") + f.write(b"xxx") + del f + support.gc_collect() + self.assertEqual(record, [1, 2, 3]) + with self.open(support.TESTFN, "rb") as f: + self.assertEqual(f.read(), b"xxx") def _check_base_destructor(self, base): record = [] @@ -490,7 +521,7 @@ class IOTest(unittest.TestCase): def test_array_writes(self): a = array.array('i', range(10)) - n = len(a.tostring()) + n = len(a.tobytes()) with self.open(support.TESTFN, "wb", 0) as f: self.assertEqual(f.write(a), n) with self.open(support.TESTFN, "wb") as f: @@ -525,12 +556,13 @@ class IOTest(unittest.TestCase): def test_garbage_collection(self): # FileIO objects are collected, and collecting them flushes # all data to disk. - f = self.FileIO(support.TESTFN, "wb") - f.write(b"abcxxx") - f.f = f - wr = weakref.ref(f) - del f - support.gc_collect() + with support.check_warnings(('', ResourceWarning)): + f = self.FileIO(support.TESTFN, "wb") + f.write(b"abcxxx") + f.f = f + wr = weakref.ref(f) + del f + support.gc_collect() self.assertTrue(wr() is None, wr) with self.open(support.TESTFN, "rb") as f: self.assertEqual(f.read(), b"abcxxx") @@ -693,6 +725,11 @@ class CommonBufferedTests: b.close() self.assertRaises(ValueError, b.flush) + def test_unseekable(self): + bufio = self.tp(self.MockUnseekableIO(b"A" * 10)) + self.assertRaises(self.UnsupportedOperation, bufio.tell) + self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) + def test_readonly_attributes(self): raw = self.MockRawIO() buf = self.tp(raw) @@ -810,6 +847,7 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): self.assertEqual(b"abcdefg", bufio.read()) + @unittest.skipUnless(threading, 'Threading required for this test.') @support.requires_resource('cpu') def test_threads(self): try: @@ -1078,6 +1116,7 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): with self.open(support.TESTFN, "rb", buffering=0) as f: self.assertEqual(f.read(), b"abc") + @unittest.skipUnless(threading, 'Threading required for this test.') @support.requires_resource('cpu') def test_threads(self): try: @@ -1136,14 +1175,9 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): self.assertRaises(IOError, bufio.write, b"abcdef") def test_max_buffer_size_deprecation(self): - with support.check_warnings() as w: - warnings.simplefilter("always", DeprecationWarning) + with support.check_warnings(("max_buffer_size is deprecated", + DeprecationWarning)): self.tp(self.MockRawIO(), 8, 12) - self.assertEqual(len(w.warnings), 1) - warning = w.warnings[0] - self.assertTrue(warning.category is DeprecationWarning) - self.assertEqual(str(warning.message), - "max_buffer_size is deprecated") class CBufferedWriterTest(BufferedWriterTest): @@ -1199,14 +1233,9 @@ class BufferedRWPairTest(unittest.TestCase): self.assertRaises(self.UnsupportedOperation, pair.detach) def test_constructor_max_buffer_size_deprecation(self): - with support.check_warnings() as w: - warnings.simplefilter("always", DeprecationWarning) + with support.check_warnings(("max_buffer_size is deprecated", + DeprecationWarning)): self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12) - self.assertEqual(len(w.warnings), 1) - warning = w.warnings[0] - self.assertTrue(warning.category is DeprecationWarning) - self.assertEqual(str(warning.message), - "max_buffer_size is deprecated") def test_constructor_with_not_readable(self): class NotReadable(MockRawIO): @@ -1499,6 +1528,9 @@ class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): BufferedReaderTest.test_misbehaved_io(self) BufferedWriterTest.test_misbehaved_io(self) + # You can't construct a BufferedRandom over a non-seekable stream. + test_unseekable = None + class CBufferedRandomTest(BufferedRandomTest): tp = io.BufferedRandom @@ -1711,9 +1743,12 @@ class TextIOWrapperTest(unittest.TestCase): raw.name = "dummy" self.assertEqual(repr(t), "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname) + t.mode = "r" + self.assertEqual(repr(t), + "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname) raw.name = b"dummy" self.assertEqual(repr(t), - "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname) + "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname) def test_line_buffering(self): r = self.BytesIO() @@ -1987,26 +2022,24 @@ class TextIOWrapperTest(unittest.TestCase): u_suffix = "\u8888\n" suffix = bytes(u_suffix.encode("utf-8")) line = prefix + suffix - f = self.open(support.TESTFN, "wb") - f.write(line*2) - f.close() - f = self.open(support.TESTFN, "r", encoding="utf-8") - s = f.read(prefix_size) - self.assertEqual(s, str(prefix, "ascii")) - self.assertEqual(f.tell(), prefix_size) - self.assertEqual(f.readline(), u_suffix) + with self.open(support.TESTFN, "wb") as f: + f.write(line*2) + with self.open(support.TESTFN, "r", encoding="utf-8") as f: + s = f.read(prefix_size) + self.assertEqual(s, str(prefix, "ascii")) + self.assertEqual(f.tell(), prefix_size) + self.assertEqual(f.readline(), u_suffix) def test_seeking_too(self): # Regression test for a specific bug data = b'\xe0\xbf\xbf\n' - f = self.open(support.TESTFN, "wb") - f.write(data) - f.close() - f = self.open(support.TESTFN, "r", encoding="utf-8") - f._CHUNK_SIZE # Just test that it exists - f._CHUNK_SIZE = 2 - f.readline() - f.tell() + with self.open(support.TESTFN, "wb") as f: + f.write(data) + with self.open(support.TESTFN, "r", encoding="utf-8") as f: + f._CHUNK_SIZE # Just test that it exists + f._CHUNK_SIZE = 2 + f.readline() + f.tell() def test_seek_and_tell(self): #Test seek/tell using the StatefulIncrementalDecoder. @@ -2207,7 +2240,7 @@ class TextIOWrapperTest(unittest.TestCase): with self.open(support.TESTFN, "w", errors="replace") as f: self.assertEqual(f.errors, "replace") - + @unittest.skipUnless(threading, 'Threading required for this test.') def test_threads_write(self): # Issue6750: concurrent writes could duplicate data event = threading.Event() @@ -2243,6 +2276,11 @@ class TextIOWrapperTest(unittest.TestCase): txt.close() self.assertRaises(ValueError, txt.flush) + def test_unseekable(self): + txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata)) + self.assertRaises(self.UnsupportedOperation, txt.tell) + self.assertRaises(self.UnsupportedOperation, txt.seek, 0) + def test_readonly_attributes(self): txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") buf = self.BytesIO(self.testdata) @@ -2500,27 +2538,27 @@ class MiscIOTest(unittest.TestCase): def test_abcs(self): # Test the visible base classes are ABCs. - self.assertTrue(isinstance(self.IOBase, abc.ABCMeta)) - self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta)) - self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta)) - self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta)) + self.assertIsInstance(self.IOBase, abc.ABCMeta) + self.assertIsInstance(self.RawIOBase, abc.ABCMeta) + self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta) + self.assertIsInstance(self.TextIOBase, abc.ABCMeta) def _check_abc_inheritance(self, abcmodule): with self.open(support.TESTFN, "wb", buffering=0) as f: - self.assertTrue(isinstance(f, abcmodule.IOBase)) - self.assertTrue(isinstance(f, abcmodule.RawIOBase)) - self.assertFalse(isinstance(f, abcmodule.BufferedIOBase)) - self.assertFalse(isinstance(f, abcmodule.TextIOBase)) + self.assertIsInstance(f, abcmodule.IOBase) + self.assertIsInstance(f, abcmodule.RawIOBase) + self.assertNotIsInstance(f, abcmodule.BufferedIOBase) + self.assertNotIsInstance(f, abcmodule.TextIOBase) with self.open(support.TESTFN, "wb") as f: - self.assertTrue(isinstance(f, abcmodule.IOBase)) - self.assertFalse(isinstance(f, abcmodule.RawIOBase)) - self.assertTrue(isinstance(f, abcmodule.BufferedIOBase)) - self.assertFalse(isinstance(f, abcmodule.TextIOBase)) + self.assertIsInstance(f, abcmodule.IOBase) + self.assertNotIsInstance(f, abcmodule.RawIOBase) + self.assertIsInstance(f, abcmodule.BufferedIOBase) + self.assertNotIsInstance(f, abcmodule.TextIOBase) with self.open(support.TESTFN, "w") as f: - self.assertTrue(isinstance(f, abcmodule.IOBase)) - self.assertFalse(isinstance(f, abcmodule.RawIOBase)) - self.assertFalse(isinstance(f, abcmodule.BufferedIOBase)) - self.assertTrue(isinstance(f, abcmodule.TextIOBase)) + self.assertIsInstance(f, abcmodule.IOBase) + self.assertNotIsInstance(f, abcmodule.RawIOBase) + self.assertNotIsInstance(f, abcmodule.BufferedIOBase) + self.assertIsInstance(f, abcmodule.TextIOBase) def test_abc_inheritance(self): # Test implementations inherit from their respective ABCs @@ -2531,6 +2569,63 @@ class MiscIOTest(unittest.TestCase): # baseline "io" module. self._check_abc_inheritance(io) + def _check_warn_on_dealloc(self, *args, **kwargs): + f = open(*args, **kwargs) + r = repr(f) + with self.assertWarns(ResourceWarning) as cm: + f = None + support.gc_collect() + self.assertIn(r, str(cm.warning.args[0])) + + def test_warn_on_dealloc(self): + self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0) + self._check_warn_on_dealloc(support.TESTFN, "wb") + self._check_warn_on_dealloc(support.TESTFN, "w") + + def _check_warn_on_dealloc_fd(self, *args, **kwargs): + fds = [] + def cleanup_fds(): + for fd in fds: + try: + os.close(fd) + except EnvironmentError as e: + if e.errno != errno.EBADF: + raise + self.addCleanup(cleanup_fds) + r, w = os.pipe() + fds += r, w + self._check_warn_on_dealloc(r, *args, **kwargs) + # When using closefd=False, there's no warning + r, w = os.pipe() + fds += r, w + with warnings.catch_warnings(record=True) as recorded: + open(r, *args, closefd=False, **kwargs) + support.gc_collect() + self.assertEqual(recorded, []) + + def test_warn_on_dealloc_fd(self): + self._check_warn_on_dealloc_fd("rb", buffering=0) + self._check_warn_on_dealloc_fd("rb") + self._check_warn_on_dealloc_fd("r") + + + def test_pickling(self): + # Pickling file objects is forbidden + for kwargs in [ + {"mode": "w"}, + {"mode": "wb"}, + {"mode": "wb", "buffering": 0}, + {"mode": "r"}, + {"mode": "rb"}, + {"mode": "rb", "buffering": 0}, + {"mode": "w+"}, + {"mode": "w+b"}, + {"mode": "w+b", "buffering": 0}, + ]: + for protocol in range(pickle.HIGHEST_PROTOCOL + 1): + with self.open(support.TESTFN, **kwargs) as f: + self.assertRaises(TypeError, pickle.dumps, f, protocol) + class CMiscIOTest(MiscIOTest): io = io @@ -2553,7 +2648,8 @@ class SignalsTest(unittest.TestCase): @unittest.skipUnless(threading, 'Threading required for this test.') def check_interrupted_write(self, item, bytes, **fdopen_kwargs): """Check that a partial write, when it gets interrupted, properly - invokes the signal handler.""" + invokes the signal handler, and bubbles up the exception raised + in the latter.""" read_results = [] def _read(): s = os.read(r, 1) @@ -2561,6 +2657,7 @@ class SignalsTest(unittest.TestCase): t = threading.Thread(target=_read) t.daemon = True r, w = os.pipe() + fdopen_kwargs["closefd"] = False try: wio = self.io.open(w, **fdopen_kwargs) t.start() @@ -2618,6 +2715,9 @@ class SignalsTest(unittest.TestCase): wio.flush() # Make sure the buffer doesn't fill up and block further writes os.read(r, len(data) * 100) + exc = cm.exception + if isinstance(exc, RuntimeError): + self.assertTrue(str(exc).startswith("reentrant call"), str(exc)) finally: wio.close() os.close(r) @@ -2628,6 +2728,98 @@ class SignalsTest(unittest.TestCase): def test_reentrant_write_text(self): self.check_reentrant_write("xy", mode="w", encoding="ascii") + def check_interrupted_read_retry(self, decode, **fdopen_kwargs): + """Check that a buffered read, when it gets interrupted (either + returning a partial result or EINTR), properly invokes the signal + handler and retries if the latter returned successfully.""" + r, w = os.pipe() + fdopen_kwargs["closefd"] = False + def alarm_handler(sig, frame): + os.write(w, b"bar") + signal.signal(signal.SIGALRM, alarm_handler) + try: + rio = self.io.open(r, **fdopen_kwargs) + os.write(w, b"foo") + signal.alarm(1) + # Expected behaviour: + # - first raw read() returns partial b"foo" + # - second raw read() returns EINTR + # - third raw read() returns b"bar" + self.assertEqual(decode(rio.read(6)), "foobar") + finally: + rio.close() + os.close(w) + os.close(r) + + def test_interrupterd_read_retry_buffered(self): + self.check_interrupted_read_retry(lambda x: x.decode('latin1'), + mode="rb") + + def test_interrupterd_read_retry_text(self): + self.check_interrupted_read_retry(lambda x: x, + mode="r") + + @unittest.skipUnless(threading, 'Threading required for this test.') + def check_interrupted_write_retry(self, item, **fdopen_kwargs): + """Check that a buffered write, when it gets interrupted (either + returning a partial result or EINTR), properly invokes the signal + handler and retries if the latter returned successfully.""" + select = support.import_module("select") + # A quantity that exceeds the buffer size of an anonymous pipe's + # write end. + N = 1024 * 1024 + r, w = os.pipe() + fdopen_kwargs["closefd"] = False + # We need a separate thread to read from the pipe and allow the + # write() to finish. This thread is started after the SIGALRM is + # received (forcing a first EINTR in write()). + read_results = [] + write_finished = False + def _read(): + while not write_finished: + while r in select.select([r], [], [], 1.0)[0]: + s = os.read(r, 1024) + read_results.append(s) + t = threading.Thread(target=_read) + t.daemon = True + def alarm1(sig, frame): + signal.signal(signal.SIGALRM, alarm2) + signal.alarm(1) + def alarm2(sig, frame): + t.start() + signal.signal(signal.SIGALRM, alarm1) + try: + wio = self.io.open(w, **fdopen_kwargs) + signal.alarm(1) + # Expected behaviour: + # - first raw write() is partial (because of the limited pipe buffer + # and the first alarm) + # - second raw write() returns EINTR (because of the second alarm) + # - subsequent write()s are successful (either partial or complete) + self.assertEqual(N, wio.write(item * N)) + wio.flush() + write_finished = True + t.join() + self.assertEqual(N, sum(len(x) for x in read_results)) + finally: + write_finished = True + os.close(w) + os.close(r) + # This is deliberate. If we didn't close the file descriptor + # before closing wio, wio would try to flush its internal + # buffer, and could block (in case of failure). + try: + wio.close() + except IOError as e: + if e.errno != errno.EBADF: + raise + + def test_interrupterd_write_retry_buffered(self): + self.check_interrupted_write_retry(b"x", mode="wb") + + def test_interrupterd_write_retry_text(self): + self.check_interrupted_write_retry("x", mode="w", encoding="latin1") + class CSignalsTest(SignalsTest): io = io @@ -2657,7 +2849,7 @@ def test_main(): # Put the namespaces of the IO module we are testing and some useful mock # classes in the __dict__ of each test. mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO, - MockNonBlockWriterIO, MockRawIOWithoutRead) + MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead) all_members = io.__all__ + ["IncrementalNewlineDecoder"] c_io_ns = {name : getattr(io, name) for name in all_members} py_io_ns = {name : getattr(pyio, name) for name in all_members} |