diff options
Diffstat (limited to 'Lib/test/test_io.py')
| -rw-r--r-- | Lib/test/test_io.py | 133 | 
1 files changed, 113 insertions, 20 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 95277d9..9fb85f2 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -44,10 +44,6 @@ try:      import threading  except ImportError:      threading = None -try: -    import fcntl -except ImportError: -    fcntl = None  def _default_chunk_size():      """Get the default TextIOWrapper chunk size""" @@ -367,8 +363,8 @@ class IOTest(unittest.TestCase):      def test_open_handles_NUL_chars(self):          fn_with_NUL = 'foo\0bar' -        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w') -        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w') +        self.assertRaises(ValueError, self.open, fn_with_NUL, 'w') +        self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')      def test_raw_file_io(self):          with self.open(support.TESTFN, "wb", buffering=0) as f: @@ -811,7 +807,7 @@ class CommonBufferedTests:      def test_repr(self):          raw = self.MockRawIO()          b = self.tp(raw) -        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__) +        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)          self.assertEqual(repr(b), "<%s>" % clsname)          raw.name = "dummy"          self.assertEqual(repr(b), "<%s name='dummy'>" % clsname) @@ -985,6 +981,71 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):          self.assertEqual(bufio.readinto(b), 1)          self.assertEqual(b, b"cb") +    def test_readinto1(self): +        buffer_size = 10 +        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl")) +        bufio = self.tp(rawio, buffer_size=buffer_size) +        b = bytearray(2) +        self.assertEqual(bufio.peek(3), b'abc') +        self.assertEqual(rawio._reads, 1) +        self.assertEqual(bufio.readinto1(b), 2) +        self.assertEqual(b, b"ab") +        self.assertEqual(rawio._reads, 1) +        self.assertEqual(bufio.readinto1(b), 1) +        self.assertEqual(b[:1], b"c") +        self.assertEqual(rawio._reads, 1) +        self.assertEqual(bufio.readinto1(b), 2) +        self.assertEqual(b, b"de") +        self.assertEqual(rawio._reads, 2) +        b = bytearray(2*buffer_size) +        self.assertEqual(bufio.peek(3), b'fgh') +        self.assertEqual(rawio._reads, 3) +        self.assertEqual(bufio.readinto1(b), 6) +        self.assertEqual(b[:6], b"fghjkl") +        self.assertEqual(rawio._reads, 4) + +    def test_readinto_array(self): +        buffer_size = 60 +        data = b"a" * 26 +        rawio = self.MockRawIO((data,)) +        bufio = self.tp(rawio, buffer_size=buffer_size) + +        # Create an array with element size > 1 byte +        b = array.array('i', b'x' * 32) +        assert len(b) != 16 + +        # Read into it. We should get as many *bytes* as we can fit into b +        # (which is more than the number of elements) +        n = bufio.readinto(b) +        self.assertGreater(n, len(b)) + +        # Check that old contents of b are preserved +        bm = memoryview(b).cast('B') +        self.assertLess(n, len(bm)) +        self.assertEqual(bm[:n], data[:n]) +        self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) + +    def test_readinto1_array(self): +        buffer_size = 60 +        data = b"a" * 26 +        rawio = self.MockRawIO((data,)) +        bufio = self.tp(rawio, buffer_size=buffer_size) + +        # Create an array with element size > 1 byte +        b = array.array('i', b'x' * 32) +        assert len(b) != 16 + +        # Read into it. We should get as many *bytes* as we can fit into b +        # (which is more than the number of elements) +        n = bufio.readinto1(b) +        self.assertGreater(n, len(b)) + +        # Check that old contents of b are preserved +        bm = memoryview(b).cast('B') +        self.assertLess(n, len(bm)) +        self.assertEqual(bm[:n], data[:n]) +        self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) +      def test_readlines(self):          def bufio():              rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) @@ -2881,6 +2942,17 @@ class TextIOWrapperTest(unittest.TestCase):          self.assertFalse(err)          self.assertEqual("ok", out.decode().strip()) +    def test_read_byteslike(self): +        r = MemviewBytesIO(b'Just some random string\n') +        t = self.TextIOWrapper(r, 'utf-8') + +        # TextIOwrapper will not read the full string, because +        # we truncate it to a multiple of the native int size +        # so that we can construct a more complex memoryview. +        bytes_val =  _to_memoryview(r.getvalue()).tobytes() + +        self.assertEqual(t.read(200), bytes_val.decode('utf-8')) +      def test_issue22849(self):          class F(object):              def readable(self): return True @@ -2897,6 +2969,25 @@ class TextIOWrapperTest(unittest.TestCase):          t = self.TextIOWrapper(F(), encoding='utf-8') +class MemviewBytesIO(io.BytesIO): +    '''A BytesIO object whose read method returns memoryviews +       rather than bytes''' + +    def read1(self, len_): +        return _to_memoryview(super().read1(len_)) + +    def read(self, len_): +        return _to_memoryview(super().read(len_)) + +def _to_memoryview(buf): +    '''Convert bytes-object *buf* to a non-trivial memoryview''' + +    arr = array.array('i') +    idx = len(buf) - len(buf) % arr.itemsize +    arr.frombytes(buf[:idx]) +    return memoryview(arr) + +  class CTextIOWrapperTest(TextIOWrapperTest):      io = io      shutdown_error = "RuntimeError: could not find io module state" @@ -3143,6 +3234,8 @@ class MiscIOTest(unittest.TestCase):                  self.assertRaises(ValueError, f.readall)              if hasattr(f, "readinto"):                  self.assertRaises(ValueError, f.readinto, bytearray(1024)) +            if hasattr(f, "readinto1"): +                self.assertRaises(ValueError, f.readinto1, bytearray(1024))              self.assertRaises(ValueError, f.readline)              self.assertRaises(ValueError, f.readlines)              self.assertRaises(ValueError, f.seek, 0) @@ -3256,26 +3349,20 @@ class MiscIOTest(unittest.TestCase):                  with self.open(support.TESTFN, **kwargs) as f:                      self.assertRaises(TypeError, pickle.dumps, f, protocol) -    @unittest.skipUnless(fcntl, 'fcntl required for this test')      def test_nonblock_pipe_write_bigbuf(self):          self._test_nonblock_pipe_write(16*1024) -    @unittest.skipUnless(fcntl, 'fcntl required for this test')      def test_nonblock_pipe_write_smallbuf(self):          self._test_nonblock_pipe_write(1024) -    def _set_non_blocking(self, fd): -        flags = fcntl.fcntl(fd, fcntl.F_GETFL) -        self.assertNotEqual(flags, -1) -        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) -        self.assertEqual(res, 0) - +    @unittest.skipUnless(hasattr(os, 'set_blocking'), +                         'os.set_blocking() required for this test')      def _test_nonblock_pipe_write(self, bufsize):          sent = []          received = []          r, w = os.pipe() -        self._set_non_blocking(r) -        self._set_non_blocking(w) +        os.set_blocking(r, False) +        os.set_blocking(w, False)          # To exercise all code paths in the C implementation we need          # to play with buffer sizes.  For instance, if we choose a @@ -3381,6 +3468,7 @@ class SignalsTest(unittest.TestCase):          t.daemon = True          r, w = os.pipe()          fdopen_kwargs["closefd"] = False +        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)          try:              wio = self.io.open(w, **fdopen_kwargs)              t.start() @@ -3392,8 +3480,7 @@ class SignalsTest(unittest.TestCase):              # handlers, which in this case will invoke alarm_interrupt().              signal.alarm(1)              try: -                with self.assertRaises(ZeroDivisionError): -                    wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1)) +                self.assertRaises(ZeroDivisionError, wio.write, large_data)              finally:                  signal.alarm(0)                  t.join() @@ -3494,11 +3581,13 @@ class SignalsTest(unittest.TestCase):          returning a partial result or EINTR), properly invokes the signal          handler and retries if the latter returned successfully."""          select = support.import_module("select") +          # A quantity that exceeds the buffer size of an anonymous pipe's          # write end.          N = support.PIPE_MAX_SIZE          r, w = os.pipe()          fdopen_kwargs["closefd"] = False +          # We need a separate thread to read from the pipe and allow the          # write() to finish.  This thread is started after the SIGALRM is          # received (forcing a first EINTR in write()). @@ -3521,6 +3610,8 @@ class SignalsTest(unittest.TestCase):              signal.alarm(1)          def alarm2(sig, frame):              t.start() + +        large_data = item * N          signal.signal(signal.SIGALRM, alarm1)          try:              wio = self.io.open(w, **fdopen_kwargs) @@ -3530,7 +3621,9 @@ class SignalsTest(unittest.TestCase):              #   and the first alarm)              # - second raw write() returns EINTR (because of the second alarm)              # - subsequent write()s are successful (either partial or complete) -            self.assertEqual(N, wio.write(item * N)) +            written = wio.write(large_data) +            self.assertEqual(N, written) +              wio.flush()              write_finished = True              t.join()  | 
