diff options
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r-- | Lib/test/test_io.py | 134 |
1 files changed, 114 insertions, 20 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 668b023..e5c6073 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -44,10 +44,6 @@ try: import threading except ImportError: threading = None -try: - import fcntl -except ImportError: - fcntl = None def _default_chunk_size(): """Get the default TextIOWrapper chunk size""" @@ -367,8 +363,8 @@ class IOTest(unittest.TestCase): def test_open_handles_NUL_chars(self): fn_with_NUL = 'foo\0bar' - self.assertRaises(TypeError, self.open, fn_with_NUL, 'w') - self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w') + self.assertRaises(ValueError, self.open, fn_with_NUL, 'w') + self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w') def test_raw_file_io(self): with self.open(support.TESTFN, "wb", buffering=0) as f: @@ -811,7 +807,7 @@ class CommonBufferedTests: def test_repr(self): raw = self.MockRawIO() b = self.tp(raw) - clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__) + clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__) self.assertEqual(repr(b), "<%s>" % clsname) raw.name = "dummy" self.assertEqual(repr(b), "<%s name='dummy'>" % clsname) @@ -985,6 +981,71 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): self.assertEqual(bufio.readinto(b), 1) self.assertEqual(b, b"cb") + def test_readinto1(self): + buffer_size = 10 + rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl")) + bufio = self.tp(rawio, buffer_size=buffer_size) + b = bytearray(2) + self.assertEqual(bufio.peek(3), b'abc') + self.assertEqual(rawio._reads, 1) + self.assertEqual(bufio.readinto1(b), 2) + self.assertEqual(b, b"ab") + self.assertEqual(rawio._reads, 1) + self.assertEqual(bufio.readinto1(b), 1) + self.assertEqual(b[:1], b"c") + self.assertEqual(rawio._reads, 1) + self.assertEqual(bufio.readinto1(b), 2) + self.assertEqual(b, b"de") + self.assertEqual(rawio._reads, 2) + b = bytearray(2*buffer_size) + self.assertEqual(bufio.peek(3), b'fgh') + self.assertEqual(rawio._reads, 3) + self.assertEqual(bufio.readinto1(b), 6) + self.assertEqual(b[:6], b"fghjkl") + self.assertEqual(rawio._reads, 4) + + def test_readinto_array(self): + buffer_size = 60 + data = b"a" * 26 + rawio = self.MockRawIO((data,)) + bufio = self.tp(rawio, buffer_size=buffer_size) + + # Create an array with element size > 1 byte + b = array.array('i', b'x' * 32) + assert len(b) != 16 + + # Read into it. We should get as many *bytes* as we can fit into b + # (which is more than the number of elements) + n = bufio.readinto(b) + self.assertGreater(n, len(b)) + + # Check that old contents of b are preserved + bm = memoryview(b).cast('B') + self.assertLess(n, len(bm)) + self.assertEqual(bm[:n], data[:n]) + self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) + + def test_readinto1_array(self): + buffer_size = 60 + data = b"a" * 26 + rawio = self.MockRawIO((data,)) + bufio = self.tp(rawio, buffer_size=buffer_size) + + # Create an array with element size > 1 byte + b = array.array('i', b'x' * 32) + assert len(b) != 16 + + # Read into it. We should get as many *bytes* as we can fit into b + # (which is more than the number of elements) + n = bufio.readinto1(b) + self.assertGreater(n, len(b)) + + # Check that old contents of b are preserved + bm = memoryview(b).cast('B') + self.assertLess(n, len(bm)) + self.assertEqual(bm[:n], data[:n]) + self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) + def test_readlines(self): def bufio(): rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) @@ -2844,6 +2905,17 @@ class TextIOWrapperTest(unittest.TestCase): self.assertFalse(err) self.assertEqual("ok", out.decode().strip()) + def test_read_byteslike(self): + r = MemviewBytesIO(b'Just some random string\n') + t = self.TextIOWrapper(r, 'utf-8') + + # TextIOwrapper will not read the full string, because + # we truncate it to a multiple of the native int size + # so that we can construct a more complex memoryview. + bytes_val = _to_memoryview(r.getvalue()).tobytes() + + self.assertEqual(t.read(200), bytes_val.decode('utf-8')) + def test_issue22849(self): class F(object): def readable(self): return True @@ -2860,6 +2932,25 @@ class TextIOWrapperTest(unittest.TestCase): t = self.TextIOWrapper(F(), encoding='utf-8') +class MemviewBytesIO(io.BytesIO): + '''A BytesIO object whose read method returns memoryviews + rather than bytes''' + + def read1(self, len_): + return _to_memoryview(super().read1(len_)) + + def read(self, len_): + return _to_memoryview(super().read(len_)) + +def _to_memoryview(buf): + '''Convert bytes-object *buf* to a non-trivial memoryview''' + + arr = array.array('i') + idx = len(buf) - len(buf) % arr.itemsize + arr.frombytes(buf[:idx]) + return memoryview(arr) + + class CTextIOWrapperTest(TextIOWrapperTest): io = io shutdown_error = "RuntimeError: could not find io module state" @@ -3106,6 +3197,8 @@ class MiscIOTest(unittest.TestCase): self.assertRaises(ValueError, f.readall) if hasattr(f, "readinto"): self.assertRaises(ValueError, f.readinto, bytearray(1024)) + if hasattr(f, "readinto1"): + self.assertRaises(ValueError, f.readinto1, bytearray(1024)) self.assertRaises(ValueError, f.readline) self.assertRaises(ValueError, f.readlines) self.assertRaises(ValueError, f.seek, 0) @@ -3219,26 +3312,20 @@ class MiscIOTest(unittest.TestCase): with self.open(support.TESTFN, **kwargs) as f: self.assertRaises(TypeError, pickle.dumps, f, protocol) - @unittest.skipUnless(fcntl, 'fcntl required for this test') def test_nonblock_pipe_write_bigbuf(self): self._test_nonblock_pipe_write(16*1024) - @unittest.skipUnless(fcntl, 'fcntl required for this test') def test_nonblock_pipe_write_smallbuf(self): self._test_nonblock_pipe_write(1024) - def _set_non_blocking(self, fd): - flags = fcntl.fcntl(fd, fcntl.F_GETFL) - self.assertNotEqual(flags, -1) - res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - self.assertEqual(res, 0) - + @unittest.skipUnless(hasattr(os, 'set_blocking'), + 'os.set_blocking() required for this test') def _test_nonblock_pipe_write(self, bufsize): sent = [] received = [] r, w = os.pipe() - self._set_non_blocking(r) - self._set_non_blocking(w) + os.set_blocking(r, False) + os.set_blocking(w, False) # To exercise all code paths in the C implementation we need # to play with buffer sizes. For instance, if we choose a @@ -3344,6 +3431,7 @@ class SignalsTest(unittest.TestCase): t.daemon = True r, w = os.pipe() fdopen_kwargs["closefd"] = False + large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1) try: wio = self.io.open(w, **fdopen_kwargs) t.start() @@ -3355,8 +3443,7 @@ class SignalsTest(unittest.TestCase): # handlers, which in this case will invoke alarm_interrupt(). signal.alarm(1) try: - self.assertRaises(ZeroDivisionError, - wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1)) + self.assertRaises(ZeroDivisionError, wio.write, large_data) finally: signal.alarm(0) t.join() @@ -3457,11 +3544,13 @@ class SignalsTest(unittest.TestCase): returning a partial result or EINTR), properly invokes the signal handler and retries if the latter returned successfully.""" select = support.import_module("select") + # A quantity that exceeds the buffer size of an anonymous pipe's # write end. N = support.PIPE_MAX_SIZE r, w = os.pipe() fdopen_kwargs["closefd"] = False + # We need a separate thread to read from the pipe and allow the # write() to finish. This thread is started after the SIGALRM is # received (forcing a first EINTR in write()). @@ -3479,6 +3568,8 @@ class SignalsTest(unittest.TestCase): signal.alarm(1) def alarm2(sig, frame): t.start() + + large_data = item * N signal.signal(signal.SIGALRM, alarm1) try: wio = self.io.open(w, **fdopen_kwargs) @@ -3488,10 +3579,13 @@ class SignalsTest(unittest.TestCase): # and the first alarm) # - second raw write() returns EINTR (because of the second alarm) # - subsequent write()s are successful (either partial or complete) - self.assertEqual(N, wio.write(item * N)) + written = wio.write(large_data) + self.assertEqual(N, written) + wio.flush() write_finished = True t.join() + self.assertEqual(N, sum(len(x) for x in read_results)) finally: write_finished = True |