summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_io.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r--Lib/test/test_io.py119
1 files changed, 102 insertions, 17 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index b5506b0..a3567fa 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -44,10 +44,6 @@ try:
import threading
except ImportError:
threading = None
-try:
- import fcntl
-except ImportError:
- fcntl = None
def _default_chunk_size():
"""Get the default TextIOWrapper chunk size"""
@@ -367,8 +363,8 @@ class IOTest(unittest.TestCase):
def test_open_handles_NUL_chars(self):
fn_with_NUL = 'foo\0bar'
- self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
- self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
+ self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
+ self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
def test_raw_file_io(self):
with self.open(support.TESTFN, "wb", buffering=0) as f:
@@ -778,7 +774,7 @@ class CommonBufferedTests:
def test_repr(self):
raw = self.MockRawIO()
b = self.tp(raw)
- clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
+ clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
self.assertEqual(repr(b), "<%s>" % clsname)
raw.name = "dummy"
self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
@@ -943,6 +939,71 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
self.assertEqual(bufio.readinto(b), 1)
self.assertEqual(b, b"cb")
+ def test_readinto1(self):
+ buffer_size = 10
+ rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
+ bufio = self.tp(rawio, buffer_size=buffer_size)
+ b = bytearray(2)
+ self.assertEqual(bufio.peek(3), b'abc')
+ self.assertEqual(rawio._reads, 1)
+ self.assertEqual(bufio.readinto1(b), 2)
+ self.assertEqual(b, b"ab")
+ self.assertEqual(rawio._reads, 1)
+ self.assertEqual(bufio.readinto1(b), 1)
+ self.assertEqual(b[:1], b"c")
+ self.assertEqual(rawio._reads, 1)
+ self.assertEqual(bufio.readinto1(b), 2)
+ self.assertEqual(b, b"de")
+ self.assertEqual(rawio._reads, 2)
+ b = bytearray(2*buffer_size)
+ self.assertEqual(bufio.peek(3), b'fgh')
+ self.assertEqual(rawio._reads, 3)
+ self.assertEqual(bufio.readinto1(b), 6)
+ self.assertEqual(b[:6], b"fghjkl")
+ self.assertEqual(rawio._reads, 4)
+
+ def test_readinto_array(self):
+ buffer_size = 60
+ data = b"a" * 26
+ rawio = self.MockRawIO((data,))
+ bufio = self.tp(rawio, buffer_size=buffer_size)
+
+ # Create an array with element size > 1 byte
+ b = array.array('i', b'x' * 32)
+ assert len(b) != 16
+
+ # Read into it. We should get as many *bytes* as we can fit into b
+ # (which is more than the number of elements)
+ n = bufio.readinto(b)
+ self.assertGreater(n, len(b))
+
+ # Check that old contents of b are preserved
+ bm = memoryview(b).cast('B')
+ self.assertLess(n, len(bm))
+ self.assertEqual(bm[:n], data[:n])
+ self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
+
+ def test_readinto1_array(self):
+ buffer_size = 60
+ data = b"a" * 26
+ rawio = self.MockRawIO((data,))
+ bufio = self.tp(rawio, buffer_size=buffer_size)
+
+ # Create an array with element size > 1 byte
+ b = array.array('i', b'x' * 32)
+ assert len(b) != 16
+
+ # Read into it. We should get as many *bytes* as we can fit into b
+ # (which is more than the number of elements)
+ n = bufio.readinto1(b)
+ self.assertGreater(n, len(b))
+
+ # Check that old contents of b are preserved
+ bm = memoryview(b).cast('B')
+ self.assertLess(n, len(bm))
+ self.assertEqual(bm[:n], data[:n])
+ self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
+
def test_readlines(self):
def bufio():
rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
@@ -2784,6 +2845,34 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertFalse(err)
self.assertEqual("ok", out.decode().strip())
+ def test_read_byteslike(self):
+ r = MemviewBytesIO(b'Just some random string\n')
+ t = self.TextIOWrapper(r, 'utf-8')
+
+ # TextIOwrapper will not read the full string, because
+ # we truncate it to a multiple of the native int size
+ # so that we can construct a more complex memoryview.
+ bytes_val = _to_memoryview(r.getvalue()).tobytes()
+
+ self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
+
+class MemviewBytesIO(io.BytesIO):
+ '''A BytesIO object whose read method returns memoryviews
+ rather than bytes'''
+
+ def read1(self, len_):
+ return _to_memoryview(super().read1(len_))
+
+ def read(self, len_):
+ return _to_memoryview(super().read(len_))
+
+def _to_memoryview(buf):
+ '''Convert bytes-object *buf* to a non-trivial memoryview'''
+
+ arr = array.array('i')
+ idx = len(buf) - len(buf) % arr.itemsize
+ arr.frombytes(buf[:idx])
+ return memoryview(arr)
class CTextIOWrapperTest(TextIOWrapperTest):
io = io
@@ -3028,6 +3117,8 @@ class MiscIOTest(unittest.TestCase):
self.assertRaises(ValueError, f.readall)
if hasattr(f, "readinto"):
self.assertRaises(ValueError, f.readinto, bytearray(1024))
+ if hasattr(f, "readinto1"):
+ self.assertRaises(ValueError, f.readinto1, bytearray(1024))
self.assertRaises(ValueError, f.readline)
self.assertRaises(ValueError, f.readlines)
self.assertRaises(ValueError, f.seek, 0)
@@ -3141,26 +3232,20 @@ class MiscIOTest(unittest.TestCase):
with self.open(support.TESTFN, **kwargs) as f:
self.assertRaises(TypeError, pickle.dumps, f, protocol)
- @unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_bigbuf(self):
self._test_nonblock_pipe_write(16*1024)
- @unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_smallbuf(self):
self._test_nonblock_pipe_write(1024)
- def _set_non_blocking(self, fd):
- flags = fcntl.fcntl(fd, fcntl.F_GETFL)
- self.assertNotEqual(flags, -1)
- res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
- self.assertEqual(res, 0)
-
+ @unittest.skipUnless(hasattr(os, 'set_blocking'),
+ 'os.set_blocking() required for this test')
def _test_nonblock_pipe_write(self, bufsize):
sent = []
received = []
r, w = os.pipe()
- self._set_non_blocking(r)
- self._set_non_blocking(w)
+ os.set_blocking(r, False)
+ os.set_blocking(w, False)
# To exercise all code paths in the C implementation we need
# to play with buffer sizes. For instance, if we choose a