diff options
Diffstat (limited to 'Lib/test/test_io.py')
| -rw-r--r-- | Lib/test/test_io.py | 121 | 
1 files changed, 104 insertions, 17 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 668b023..a0a4670 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -44,10 +44,6 @@ try:      import threading  except ImportError:      threading = None -try: -    import fcntl -except ImportError: -    fcntl = None  def _default_chunk_size():      """Get the default TextIOWrapper chunk size""" @@ -367,8 +363,8 @@ class IOTest(unittest.TestCase):      def test_open_handles_NUL_chars(self):          fn_with_NUL = 'foo\0bar' -        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w') -        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w') +        self.assertRaises(ValueError, self.open, fn_with_NUL, 'w') +        self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')      def test_raw_file_io(self):          with self.open(support.TESTFN, "wb", buffering=0) as f: @@ -811,7 +807,7 @@ class CommonBufferedTests:      def test_repr(self):          raw = self.MockRawIO()          b = self.tp(raw) -        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__) +        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)          self.assertEqual(repr(b), "<%s>" % clsname)          raw.name = "dummy"          self.assertEqual(repr(b), "<%s name='dummy'>" % clsname) @@ -985,6 +981,71 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):          self.assertEqual(bufio.readinto(b), 1)          self.assertEqual(b, b"cb") +    def test_readinto1(self): +        buffer_size = 10 +        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl")) +        bufio = self.tp(rawio, buffer_size=buffer_size) +        b = bytearray(2) +        self.assertEqual(bufio.peek(3), b'abc') +        self.assertEqual(rawio._reads, 1) +        self.assertEqual(bufio.readinto1(b), 2) +        self.assertEqual(b, b"ab") +        self.assertEqual(rawio._reads, 1) +        self.assertEqual(bufio.readinto1(b), 1) +        self.assertEqual(b[:1], b"c") +        self.assertEqual(rawio._reads, 1) +        self.assertEqual(bufio.readinto1(b), 2) +        self.assertEqual(b, b"de") +        self.assertEqual(rawio._reads, 2) +        b = bytearray(2*buffer_size) +        self.assertEqual(bufio.peek(3), b'fgh') +        self.assertEqual(rawio._reads, 3) +        self.assertEqual(bufio.readinto1(b), 6) +        self.assertEqual(b[:6], b"fghjkl") +        self.assertEqual(rawio._reads, 4) + +    def test_readinto_array(self): +        buffer_size = 60 +        data = b"a" * 26 +        rawio = self.MockRawIO((data,)) +        bufio = self.tp(rawio, buffer_size=buffer_size) + +        # Create an array with element size > 1 byte +        b = array.array('i', b'x' * 32) +        assert len(b) != 16 + +        # Read into it. We should get as many *bytes* as we can fit into b +        # (which is more than the number of elements) +        n = bufio.readinto(b) +        self.assertGreater(n, len(b)) + +        # Check that old contents of b are preserved +        bm = memoryview(b).cast('B') +        self.assertLess(n, len(bm)) +        self.assertEqual(bm[:n], data[:n]) +        self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) + +    def test_readinto1_array(self): +        buffer_size = 60 +        data = b"a" * 26 +        rawio = self.MockRawIO((data,)) +        bufio = self.tp(rawio, buffer_size=buffer_size) + +        # Create an array with element size > 1 byte +        b = array.array('i', b'x' * 32) +        assert len(b) != 16 + +        # Read into it. We should get as many *bytes* as we can fit into b +        # (which is more than the number of elements) +        n = bufio.readinto1(b) +        self.assertGreater(n, len(b)) + +        # Check that old contents of b are preserved +        bm = memoryview(b).cast('B') +        self.assertLess(n, len(bm)) +        self.assertEqual(bm[:n], data[:n]) +        self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) +      def test_readlines(self):          def bufio():              rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) @@ -2844,6 +2905,17 @@ class TextIOWrapperTest(unittest.TestCase):          self.assertFalse(err)          self.assertEqual("ok", out.decode().strip()) +    def test_read_byteslike(self): +        r = MemviewBytesIO(b'Just some random string\n') +        t = self.TextIOWrapper(r, 'utf-8') + +        # TextIOwrapper will not read the full string, because +        # we truncate it to a multiple of the native int size +        # so that we can construct a more complex memoryview. +        bytes_val =  _to_memoryview(r.getvalue()).tobytes() + +        self.assertEqual(t.read(200), bytes_val.decode('utf-8')) +      def test_issue22849(self):          class F(object):              def readable(self): return True @@ -2860,6 +2932,25 @@ class TextIOWrapperTest(unittest.TestCase):          t = self.TextIOWrapper(F(), encoding='utf-8') +class MemviewBytesIO(io.BytesIO): +    '''A BytesIO object whose read method returns memoryviews +       rather than bytes''' + +    def read1(self, len_): +        return _to_memoryview(super().read1(len_)) + +    def read(self, len_): +        return _to_memoryview(super().read(len_)) + +def _to_memoryview(buf): +    '''Convert bytes-object *buf* to a non-trivial memoryview''' + +    arr = array.array('i') +    idx = len(buf) - len(buf) % arr.itemsize +    arr.frombytes(buf[:idx]) +    return memoryview(arr) + +  class CTextIOWrapperTest(TextIOWrapperTest):      io = io      shutdown_error = "RuntimeError: could not find io module state" @@ -3106,6 +3197,8 @@ class MiscIOTest(unittest.TestCase):                  self.assertRaises(ValueError, f.readall)              if hasattr(f, "readinto"):                  self.assertRaises(ValueError, f.readinto, bytearray(1024)) +            if hasattr(f, "readinto1"): +                self.assertRaises(ValueError, f.readinto1, bytearray(1024))              self.assertRaises(ValueError, f.readline)              self.assertRaises(ValueError, f.readlines)              self.assertRaises(ValueError, f.seek, 0) @@ -3219,26 +3312,20 @@ class MiscIOTest(unittest.TestCase):                  with self.open(support.TESTFN, **kwargs) as f:                      self.assertRaises(TypeError, pickle.dumps, f, protocol) -    @unittest.skipUnless(fcntl, 'fcntl required for this test')      def test_nonblock_pipe_write_bigbuf(self):          self._test_nonblock_pipe_write(16*1024) -    @unittest.skipUnless(fcntl, 'fcntl required for this test')      def test_nonblock_pipe_write_smallbuf(self):          self._test_nonblock_pipe_write(1024) -    def _set_non_blocking(self, fd): -        flags = fcntl.fcntl(fd, fcntl.F_GETFL) -        self.assertNotEqual(flags, -1) -        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) -        self.assertEqual(res, 0) - +    @unittest.skipUnless(hasattr(os, 'set_blocking'), +                         'os.set_blocking() required for this test')      def _test_nonblock_pipe_write(self, bufsize):          sent = []          received = []          r, w = os.pipe() -        self._set_non_blocking(r) -        self._set_non_blocking(w) +        os.set_blocking(r, False) +        os.set_blocking(w, False)          # To exercise all code paths in the C implementation we need          # to play with buffer sizes.  For instance, if we choose a  | 
