diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2009-06-12 20:14:08 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2009-06-12 20:14:08 (GMT) |
commit | 19690593273a5b210a9b9ea72bd59840d02759b0 (patch) | |
tree | 2c3c63588e62e5bc3c93c0f49d30e39ee6763c8f /Lib/test/test_io.py | |
parent | 55bd1efb2ae7b559ac6a89a6f24bb62560eeafcf (diff) | |
download | cpython-19690593273a5b210a9b9ea72bd59840d02759b0.zip cpython-19690593273a5b210a9b9ea72bd59840d02759b0.tar.gz cpython-19690593273a5b210a9b9ea72bd59840d02759b0.tar.bz2 |
Issue #6215: backport the 3.1 io lib
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r-- | Lib/test/test_io.py | 1996 |
1 files changed, 1508 insertions, 488 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 02533c9..62b6142 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -1,4 +1,24 @@ -"""Unit tests for io.py.""" +"""Unit tests for the io module.""" + +# Tests of io are scattered over the test suite: +# * test_bufio - tests file buffering +# * test_memoryio - tests BytesIO and StringIO +# * test_fileio - tests FileIO +# * test_file - tests the file interface +# * test_io - tests everything else in the io module +# * test_univnewlines - tests universal newline support +# * test_largefile - tests operations on a file greater than 2**32 bytes +# (only enabled with -ulargefile) + +################################################################################ +# ATTENTION TEST WRITERS!!! +################################################################################ +# When writing tests for io, it's important to test both the C and Python +# implementations. This is usually done by writing a base test that refers to +# the type it is testing as a attribute. Then it provides custom subclasses to +# test both implementations. This file has lots of examples. +################################################################################ + from __future__ import print_function from __future__ import unicode_literals @@ -9,27 +29,43 @@ import array import threading import random import unittest -from itertools import chain, cycle -from test import test_support +import warnings +import weakref +import gc +import abc +from itertools import chain, cycle, count +from collections import deque +from test import test_support as support import codecs -import io # The module under test +import io # C implementation of io +import _pyio as pyio # Python implementation of io + +__metaclass__ = type +bytes = support.py3k_bytes + +def _default_chunk_size(): + """Get the default TextIOWrapper chunk size""" + with io.open(__file__, "r", encoding="latin1") as f: + return f._CHUNK_SIZE -class MockRawIO(io.RawIOBase): +class MockRawIO: def __init__(self, read_stack=()): self._read_stack = list(read_stack) self._write_stack = [] + self._reads = 0 def read(self, n=None): + self._reads += 1 try: return self._read_stack.pop(0) except: return b"" def write(self, b): - self._write_stack.append(b[:]) + self._write_stack.append(bytes(b)) return len(b) def writable(self): @@ -45,46 +81,156 @@ class MockRawIO(io.RawIOBase): return True def seek(self, pos, whence): - pass + return 0 # wrong but we gotta return something def tell(self): - return 42 + return 0 # same comment as above + + def readinto(self, buf): + self._reads += 1 + max_len = len(buf) + try: + data = self._read_stack[0] + except IndexError: + return 0 + if data is None: + del self._read_stack[0] + return None + n = len(data) + if len(data) <= max_len: + del self._read_stack[0] + buf[:n] = data + return n + else: + buf[:] = data[:max_len] + self._read_stack[0] = data[max_len:] + return max_len + def truncate(self, pos=None): + return pos -class MockFileIO(io.BytesIO): +class CMockRawIO(MockRawIO, io.RawIOBase): + pass + +class PyMockRawIO(MockRawIO, pyio.RawIOBase): + pass + + +class MisbehavedRawIO(MockRawIO): + def write(self, b): + return MockRawIO.write(self, b) * 2 + + def read(self, n=None): + return MockRawIO.read(self, n) * 2 + + def seek(self, pos, whence): + return -123 + + def tell(self): + return -456 + + def readinto(self, buf): + MockRawIO.readinto(self, buf) + return len(buf) * 5 + +class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): + pass + +class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): + pass + + +class CloseFailureIO(MockRawIO): + closed = 0 + + def close(self): + if not self.closed: + self.closed = 1 + raise IOError + +class CCloseFailureIO(CloseFailureIO, io.RawIOBase): + pass + +class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): + pass + + +class MockFileIO: def __init__(self, data): self.read_history = [] - io.BytesIO.__init__(self, data) + super(MockFileIO, self).__init__(data) def read(self, n=None): - res = io.BytesIO.read(self, n) + res = super(MockFileIO, self).read(n) self.read_history.append(None if res is None else len(res)) return res + def readinto(self, b): + res = super(MockFileIO, self).readinto(b) + self.read_history.append(res) + return res + +class CMockFileIO(MockFileIO, io.BytesIO): + pass + +class PyMockFileIO(MockFileIO, pyio.BytesIO): + pass + -class MockNonBlockWriterIO(io.RawIOBase): +class MockNonBlockWriterIO: - def __init__(self, blocking_script): - self._blocking_script = list(blocking_script) + def __init__(self): self._write_stack = [] + self._blocker_char = None - def write(self, b): - self._write_stack.append(b[:]) - n = self._blocking_script.pop(0) - if (n < 0): - raise io.BlockingIOError(0, "test blocking", -n) - else: - return n + def pop_written(self): + s = b"".join(self._write_stack) + self._write_stack[:] = [] + return s + + def block_on(self, char): + """Block when a given char is encountered.""" + self._blocker_char = char + + def readable(self): + return True + + def seekable(self): + return True def writable(self): return True + def write(self, b): + b = bytes(b) + n = -1 + if self._blocker_char: + try: + n = b.index(self._blocker_char) + except ValueError: + pass + else: + self._blocker_char = None + self._write_stack.append(b[:n]) + raise self.BlockingIOError(0, "test blocking", n) + self._write_stack.append(b) + return len(b) + +class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): + BlockingIOError = io.BlockingIOError + +class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): + BlockingIOError = pyio.BlockingIOError + class IOTest(unittest.TestCase): + def setUp(self): + support.unlink(support.TESTFN) + def tearDown(self): - test_support.unlink(test_support.TESTFN) + support.unlink(support.TESTFN) def write_ops(self, f): self.assertEqual(f.write(b"blah."), 5) @@ -149,60 +295,71 @@ class IOTest(unittest.TestCase): self.assertEqual(f.seek(-1, 2), self.LARGE) self.assertEqual(f.read(2), b"x") + def test_invalid_operations(self): + # Try writing on a file opened in read mode and vice-versa. + for mode in ("w", "wb"): + with open(support.TESTFN, mode) as fp: + self.assertRaises(IOError, fp.read) + self.assertRaises(IOError, fp.readline) + with open(support.TESTFN, "rb") as fp: + self.assertRaises(IOError, fp.write, b"blah") + self.assertRaises(IOError, fp.writelines, [b"blah\n"]) + with open(support.TESTFN, "r") as fp: + self.assertRaises(IOError, fp.write, "blah") + self.assertRaises(IOError, fp.writelines, ["blah\n"]) + def test_raw_file_io(self): - f = io.open(test_support.TESTFN, "wb", buffering=0) - self.assertEqual(f.readable(), False) - self.assertEqual(f.writable(), True) - self.assertEqual(f.seekable(), True) - self.write_ops(f) - f.close() - f = io.open(test_support.TESTFN, "rb", buffering=0) - self.assertEqual(f.readable(), True) - self.assertEqual(f.writable(), False) - self.assertEqual(f.seekable(), True) - self.read_ops(f) - f.close() + with self.open(support.TESTFN, "wb", buffering=0) as f: + self.assertEqual(f.readable(), False) + self.assertEqual(f.writable(), True) + self.assertEqual(f.seekable(), True) + self.write_ops(f) + with self.open(support.TESTFN, "rb", buffering=0) as f: + self.assertEqual(f.readable(), True) + self.assertEqual(f.writable(), False) + self.assertEqual(f.seekable(), True) + self.read_ops(f) def test_buffered_file_io(self): - f = io.open(test_support.TESTFN, "wb") - self.assertEqual(f.readable(), False) - self.assertEqual(f.writable(), True) - self.assertEqual(f.seekable(), True) - self.write_ops(f) - f.close() - f = io.open(test_support.TESTFN, "rb") - self.assertEqual(f.readable(), True) - self.assertEqual(f.writable(), False) - self.assertEqual(f.seekable(), True) - self.read_ops(f, True) - f.close() + with self.open(support.TESTFN, "wb") as f: + self.assertEqual(f.readable(), False) + self.assertEqual(f.writable(), True) + self.assertEqual(f.seekable(), True) + self.write_ops(f) + with self.open(support.TESTFN, "rb") as f: + self.assertEqual(f.readable(), True) + self.assertEqual(f.writable(), False) + self.assertEqual(f.seekable(), True) + self.read_ops(f, True) def test_readline(self): - f = io.open(test_support.TESTFN, "wb") - f.write(b"abc\ndef\nxyzzy\nfoo") - f.close() - f = io.open(test_support.TESTFN, "rb") - self.assertEqual(f.readline(), b"abc\n") - self.assertEqual(f.readline(10), b"def\n") - self.assertEqual(f.readline(2), b"xy") - self.assertEqual(f.readline(4), b"zzy\n") - self.assertEqual(f.readline(), b"foo") - f.close() + with self.open(support.TESTFN, "wb") as f: + f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") + with self.open(support.TESTFN, "rb") as f: + self.assertEqual(f.readline(), b"abc\n") + self.assertEqual(f.readline(10), b"def\n") + self.assertEqual(f.readline(2), b"xy") + self.assertEqual(f.readline(4), b"zzy\n") + self.assertEqual(f.readline(), b"foo\x00bar\n") + self.assertEqual(f.readline(), b"another line") + self.assertRaises(TypeError, f.readline, 5.3) + with self.open(support.TESTFN, "r") as f: + self.assertRaises(TypeError, f.readline, 5.3) def test_raw_bytes_io(self): - f = io.BytesIO() + f = self.BytesIO() self.write_ops(f) data = f.getvalue() self.assertEqual(data, b"hello world\n") - f = io.BytesIO(data) + f = self.BytesIO(data) self.read_ops(f, True) def test_large_file_ops(self): # On Windows and Mac OSX this test comsumes large resources; It takes # a long time to build the >2GB file and takes >2GB of disk space # therefore the resource must be enabled to run this test. - if sys.platform[:3] in ('win', 'os2') or sys.platform == 'darwin': - if not test_support.is_resource_enabled("largefile"): + if sys.platform[:3] == 'win' or sys.platform == 'darwin': + if not support.is_resource_enabled("largefile"): print("\nTesting large file ops skipped on %s." % sys.platform, file=sys.stderr) print("It requires %d bytes and a long time." % self.LARGE, @@ -210,22 +367,20 @@ class IOTest(unittest.TestCase): print("Use 'regrtest.py -u largefile test_io' to run it.", file=sys.stderr) return - f = io.open(test_support.TESTFN, "w+b", 0) - self.large_file_ops(f) - f.close() - f = io.open(test_support.TESTFN, "w+b") - self.large_file_ops(f) - f.close() + with self.open(support.TESTFN, "w+b", 0) as f: + self.large_file_ops(f) + with self.open(support.TESTFN, "w+b") as f: + self.large_file_ops(f) def test_with_open(self): for bufsize in (0, 1, 100): f = None - with open(test_support.TESTFN, "wb", bufsize) as f: + with open(support.TESTFN, "wb", bufsize) as f: f.write(b"xxx") self.assertEqual(f.closed, True) f = None try: - with open(test_support.TESTFN, "wb", bufsize) as f: + with open(support.TESTFN, "wb", bufsize) as f: 1/0 except ZeroDivisionError: self.assertEqual(f.closed, True) @@ -234,60 +389,105 @@ class IOTest(unittest.TestCase): # issue 5008 def test_append_mode_tell(self): - with io.open(test_support.TESTFN, "wb") as f: + with self.open(support.TESTFN, "wb") as f: f.write(b"xxx") - with io.open(test_support.TESTFN, "ab", buffering=0) as f: + with self.open(support.TESTFN, "ab", buffering=0) as f: self.assertEqual(f.tell(), 3) - with io.open(test_support.TESTFN, "ab") as f: + with self.open(support.TESTFN, "ab") as f: self.assertEqual(f.tell(), 3) - with io.open(test_support.TESTFN, "a") as f: + with self.open(support.TESTFN, "a") as f: self.assert_(f.tell() > 0) def test_destructor(self): record = [] - class MyFileIO(io.FileIO): + class MyFileIO(self.FileIO): def __del__(self): record.append(1) - io.FileIO.__del__(self) + try: + f = super(MyFileIO, self).__del__ + except AttributeError: + pass + else: + f() def close(self): record.append(2) - io.FileIO.close(self) + super(MyFileIO, self).close() def flush(self): record.append(3) - io.FileIO.flush(self) - f = MyFileIO(test_support.TESTFN, "w") - f.write("xxx") + super(MyFileIO, self).flush() + f = MyFileIO(support.TESTFN, "wb") + f.write(b"xxx") + del f + support.gc_collect() + self.assertEqual(record, [1, 2, 3]) + with open(support.TESTFN, "rb") as f: + self.assertEqual(f.read(), b"xxx") + + def _check_base_destructor(self, base): + record = [] + class MyIO(base): + def __init__(self): + # This exercises the availability of attributes on object + # destruction. + # (in the C version, close() is called by the tp_dealloc + # function, not by __del__) + self.on_del = 1 + self.on_close = 2 + self.on_flush = 3 + def __del__(self): + record.append(self.on_del) + try: + f = super(MyIO, self).__del__ + except AttributeError: + pass + else: + f() + def close(self): + record.append(self.on_close) + super(MyIO, self).close() + def flush(self): + record.append(self.on_flush) + super(MyIO, self).flush() + f = MyIO() del f + support.gc_collect() self.assertEqual(record, [1, 2, 3]) + def test_IOBase_destructor(self): + self._check_base_destructor(self.IOBase) + + def test_RawIOBase_destructor(self): + self._check_base_destructor(self.RawIOBase) + + def test_BufferedIOBase_destructor(self): + self._check_base_destructor(self.BufferedIOBase) + + def test_TextIOBase_destructor(self): + self._check_base_destructor(self.TextIOBase) + def test_close_flushes(self): - f = io.open(test_support.TESTFN, "wb") - f.write(b"xxx") - f.close() - f = io.open(test_support.TESTFN, "rb") - self.assertEqual(f.read(), b"xxx") - f.close() + with self.open(support.TESTFN, "wb") as f: + f.write(b"xxx") + with self.open(support.TESTFN, "rb") as f: + self.assertEqual(f.read(), b"xxx") - def XXXtest_array_writes(self): - # XXX memory view not available yet - a = array.array('i', range(10)) - n = len(memoryview(a)) - f = io.open(test_support.TESTFN, "wb", 0) - self.assertEqual(f.write(a), n) - f.close() - f = io.open(test_support.TESTFN, "wb") - self.assertEqual(f.write(a), n) - f.close() + def test_array_writes(self): + a = array.array(b'i', range(10)) + n = len(a.tostring()) + with self.open(support.TESTFN, "wb", 0) as f: + self.assertEqual(f.write(a), n) + with self.open(support.TESTFN, "wb") as f: + self.assertEqual(f.write(a), n) def test_closefd(self): - self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w', + self.assertRaises(ValueError, self.open, support.TESTFN, 'w', closefd=False) - def testReadClosed(self): - with io.open(test_support.TESTFN, "w") as f: + def test_read_closed(self): + with self.open(support.TESTFN, "w") as f: f.write("egg\n") - with io.open(test_support.TESTFN, "r") as f: - file = io.open(f.fileno(), "r", closefd=False) + with self.open(support.TESTFN, "r") as f: + file = self.open(f.fileno(), "r", closefd=False) self.assertEqual(file.read(), "egg\n") file.seek(0) file.close() @@ -295,86 +495,203 @@ class IOTest(unittest.TestCase): def test_no_closefd_with_filename(self): # can't use closefd in combination with a file name - self.assertRaises(ValueError, - io.open, test_support.TESTFN, "r", closefd=False) + self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False) def test_closefd_attr(self): - with io.open(test_support.TESTFN, "wb") as f: + with self.open(support.TESTFN, "wb") as f: f.write(b"egg\n") - with io.open(test_support.TESTFN, "r") as f: + with self.open(support.TESTFN, "r") as f: self.assertEqual(f.buffer.raw.closefd, True) - file = io.open(f.fileno(), "r", closefd=False) + file = self.open(f.fileno(), "r", closefd=False) self.assertEqual(file.buffer.raw.closefd, False) + def test_garbage_collection(self): + # FileIO objects are collected, and collecting them flushes + # all data to disk. + f = self.FileIO(support.TESTFN, "wb") + f.write(b"abcxxx") + f.f = f + wr = weakref.ref(f) + del f + support.gc_collect() + self.assert_(wr() is None, wr) + with open(support.TESTFN, "rb") as f: + self.assertEqual(f.read(), b"abcxxx") + + def test_unbounded_file(self): + # Issue #1174606: reading from an unbounded stream such as /dev/zero. + zero = "/dev/zero" + if not os.path.exists(zero): + self.skipTest("{0} does not exist".format(zero)) + if sys.maxsize > 0x7FFFFFFF: + self.skipTest("test can only run in a 32-bit address space") + if support.real_max_memuse < support._2G: + self.skipTest("test requires at least 2GB of memory") + with open(zero, "rb", buffering=0) as f: + self.assertRaises(OverflowError, f.read) + with open(zero, "rb") as f: + self.assertRaises(OverflowError, f.read) + with open(zero, "r") as f: + self.assertRaises(OverflowError, f.read) + +class CIOTest(IOTest): + pass + +class PyIOTest(IOTest): + test_array_writes = unittest.skip( + "len(array.array) returns number of elements rather than bytelength" + )(IOTest.test_array_writes) + + +class CommonBufferedTests: + # Tests common to BufferedReader, BufferedWriter and BufferedRandom + + def test_detach(self): + raw = self.MockRawIO() + buf = self.tp(raw) + self.assertIs(buf.detach(), raw) + self.assertRaises(ValueError, buf.detach) + + def test_fileno(self): + rawio = self.MockRawIO() + bufio = self.tp(rawio) -class MemorySeekTestMixin: - - def testInit(self): - buf = self.buftype("1234567890") - bytesIo = self.ioclass(buf) - - def testRead(self): - buf = self.buftype("1234567890") - bytesIo = self.ioclass(buf) - - self.assertEquals(buf[:1], bytesIo.read(1)) - self.assertEquals(buf[1:5], bytesIo.read(4)) - self.assertEquals(buf[5:], bytesIo.read(900)) - self.assertEquals(self.EOF, bytesIo.read()) - - def testReadNoArgs(self): - buf = self.buftype("1234567890") - bytesIo = self.ioclass(buf) - - self.assertEquals(buf, bytesIo.read()) - self.assertEquals(self.EOF, bytesIo.read()) - - def testSeek(self): - buf = self.buftype("1234567890") - bytesIo = self.ioclass(buf) - - bytesIo.read(5) - bytesIo.seek(0) - self.assertEquals(buf, bytesIo.read()) - - bytesIo.seek(3) - self.assertEquals(buf[3:], bytesIo.read()) - self.assertRaises(TypeError, bytesIo.seek, 0.0) - - def testTell(self): - buf = self.buftype("1234567890") - bytesIo = self.ioclass(buf) - - self.assertEquals(0, bytesIo.tell()) - bytesIo.seek(5) - self.assertEquals(5, bytesIo.tell()) - bytesIo.seek(10000) - self.assertEquals(10000, bytesIo.tell()) - - -class BytesIOTest(MemorySeekTestMixin, unittest.TestCase): - @staticmethod - def buftype(s): - return s.encode("utf-8") - ioclass = io.BytesIO - EOF = b"" - - -class StringIOTest(MemorySeekTestMixin, unittest.TestCase): - buftype = str - ioclass = io.StringIO - EOF = "" - + self.assertEquals(42, bufio.fileno()) -class BufferedReaderTest(unittest.TestCase): + def test_no_fileno(self): + # XXX will we always have fileno() function? If so, kill + # this test. Else, write it. + pass - def testRead(self): - rawio = MockRawIO((b"abc", b"d", b"efg")) - bufio = io.BufferedReader(rawio) + def test_invalid_args(self): + rawio = self.MockRawIO() + bufio = self.tp(rawio) + # Invalid whence + self.assertRaises(ValueError, bufio.seek, 0, -1) + self.assertRaises(ValueError, bufio.seek, 0, 3) + def test_override_destructor(self): + tp = self.tp + record = [] + class MyBufferedIO(tp): + def __del__(self): + record.append(1) + try: + f = super(MyBufferedIO, self).__del__ + except AttributeError: + pass + else: + f() + def close(self): + record.append(2) + super(MyBufferedIO, self).close() + def flush(self): + record.append(3) + super(MyBufferedIO, self).flush() + rawio = self.MockRawIO() + bufio = MyBufferedIO(rawio) + writable = bufio.writable() + del bufio + support.gc_collect() + if writable: + self.assertEqual(record, [1, 2, 3]) + else: + self.assertEqual(record, [1, 2]) + + def test_context_manager(self): + # Test usability as a context manager + rawio = self.MockRawIO() + bufio = self.tp(rawio) + def _with(): + with bufio: + pass + _with() + # bufio should now be closed, and using it a second time should raise + # a ValueError. + self.assertRaises(ValueError, _with) + + def test_error_through_destructor(self): + # Test that the exception state is not modified by a destructor, + # even if close() fails. + rawio = self.CloseFailureIO() + def f(): + self.tp(rawio).xyzzy + with support.captured_output("stderr") as s: + self.assertRaises(AttributeError, f) + s = s.getvalue().strip() + if s: + # The destructor *may* have printed an unraisable error, check it + self.assertEqual(len(s.splitlines()), 1) + self.assert_(s.startswith("Exception IOError: "), s) + self.assert_(s.endswith(" ignored"), s) + + def test_repr(self): + raw = self.MockRawIO() + b = self.tp(raw) + clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__) + self.assertEqual(repr(b), "<%s>" % clsname) + raw.name = "dummy" + self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname) + raw.name = b"dummy" + self.assertEqual(repr(b), "<%s name='dummy'>" % clsname) + + +class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): + read_mode = "rb" + + def test_constructor(self): + rawio = self.MockRawIO([b"abc"]) + bufio = self.tp(rawio) + bufio.__init__(rawio) + bufio.__init__(rawio, buffer_size=1024) + bufio.__init__(rawio, buffer_size=16) + self.assertEquals(b"abc", bufio.read()) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) + rawio = self.MockRawIO([b"abc"]) + bufio.__init__(rawio) + self.assertEquals(b"abc", bufio.read()) + + def test_read(self): + rawio = self.MockRawIO((b"abc", b"d", b"efg")) + bufio = self.tp(rawio) self.assertEquals(b"abcdef", bufio.read(6)) - - def testBuffering(self): + # Invalid args + self.assertRaises(ValueError, bufio.read, -2) + + def test_read1(self): + rawio = self.MockRawIO((b"abc", b"d", b"efg")) + bufio = self.tp(rawio) + self.assertEquals(b"a", bufio.read(1)) + self.assertEquals(b"b", bufio.read1(1)) + self.assertEquals(rawio._reads, 1) + self.assertEquals(b"c", bufio.read1(100)) + self.assertEquals(rawio._reads, 1) + self.assertEquals(b"d", bufio.read1(100)) + self.assertEquals(rawio._reads, 2) + self.assertEquals(b"efg", bufio.read1(100)) + self.assertEquals(rawio._reads, 3) + self.assertEquals(b"", bufio.read1(100)) + # Invalid args + self.assertRaises(ValueError, bufio.read1, -1) + + def test_readinto(self): + rawio = self.MockRawIO((b"abc", b"d", b"efg")) + bufio = self.tp(rawio) + b = bytearray(2) + self.assertEquals(bufio.readinto(b), 2) + self.assertEquals(b, b"ab") + self.assertEquals(bufio.readinto(b), 2) + self.assertEquals(b, b"cd") + self.assertEquals(bufio.readinto(b), 2) + self.assertEquals(b, b"ef") + self.assertEquals(bufio.readinto(b), 1) + self.assertEquals(b, b"gf") + self.assertEquals(bufio.readinto(b), 0) + self.assertEquals(b, b"gf") + + def test_buffering(self): data = b"abcdefghi" dlen = len(data) @@ -385,61 +702,52 @@ class BufferedReaderTest(unittest.TestCase): ] for bufsize, buf_read_sizes, raw_read_sizes in tests: - rawio = MockFileIO(data) - bufio = io.BufferedReader(rawio, buffer_size=bufsize) + rawio = self.MockFileIO(data) + bufio = self.tp(rawio, buffer_size=bufsize) pos = 0 for nbytes in buf_read_sizes: self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes]) pos += nbytes + # this is mildly implementation-dependent self.assertEquals(rawio.read_history, raw_read_sizes) - def testReadNonBlocking(self): + def test_read_non_blocking(self): # Inject some None's in there to simulate EWOULDBLOCK - rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None)) - bufio = io.BufferedReader(rawio) + rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None)) + bufio = self.tp(rawio) self.assertEquals(b"abcd", bufio.read(6)) self.assertEquals(b"e", bufio.read(1)) self.assertEquals(b"fg", bufio.read()) + self.assertEquals(b"", bufio.peek(1)) self.assert_(None is bufio.read()) self.assertEquals(b"", bufio.read()) - def testReadToEof(self): - rawio = MockRawIO((b"abc", b"d", b"efg")) - bufio = io.BufferedReader(rawio) + def test_read_past_eof(self): + rawio = self.MockRawIO((b"abc", b"d", b"efg")) + bufio = self.tp(rawio) self.assertEquals(b"abcdefg", bufio.read(9000)) - def testReadNoArgs(self): - rawio = MockRawIO((b"abc", b"d", b"efg")) - bufio = io.BufferedReader(rawio) + def test_read_all(self): + rawio = self.MockRawIO((b"abc", b"d", b"efg")) + bufio = self.tp(rawio) self.assertEquals(b"abcdefg", bufio.read()) - def testFileno(self): - rawio = MockRawIO((b"abc", b"d", b"efg")) - bufio = io.BufferedReader(rawio) - - self.assertEquals(42, bufio.fileno()) - - def testFilenoNoFileno(self): - # XXX will we always have fileno() function? If so, kill - # this test. Else, write it. - pass - - def testThreads(self): + def test_threads(self): try: # Write out many bytes with exactly the same number of 0's, # 1's... 255's. This will help us check that concurrent reading # doesn't duplicate or forget contents. N = 1000 - l = range(256) * N + l = list(range(256)) * N random.shuffle(l) s = bytes(bytearray(l)) - with io.open(test_support.TESTFN, "wb") as f: + with io.open(support.TESTFN, "wb") as f: f.write(s) - with io.open(test_support.TESTFN, "rb", buffering=0) as raw: - bufio = io.BufferedReader(raw, 8) + with io.open(support.TESTFN, self.read_mode, buffering=0) as raw: + bufio = self.tp(raw, 8) errors = [] results = [] def f(): @@ -467,82 +775,242 @@ class BufferedReaderTest(unittest.TestCase): c = bytes(bytearray([i])) self.assertEqual(s.count(c), N) finally: - test_support.unlink(test_support.TESTFN) - - + support.unlink(support.TESTFN) + + def test_misbehaved_io(self): + rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) + bufio = self.tp(rawio) + self.assertRaises(IOError, bufio.seek, 0) + self.assertRaises(IOError, bufio.tell) + +class CBufferedReaderTest(BufferedReaderTest): + tp = io.BufferedReader + + def test_constructor(self): + BufferedReaderTest.test_constructor(self) + # The allocation can succeed on 32-bit builds, e.g. with more + # than 2GB RAM and a 64-bit kernel. + if sys.maxsize > 0x7FFFFFFF: + rawio = self.MockRawIO() + bufio = self.tp(rawio) + self.assertRaises((OverflowError, MemoryError, ValueError), + bufio.__init__, rawio, sys.maxsize) + + def test_initialization(self): + rawio = self.MockRawIO([b"abc"]) + bufio = self.tp(rawio) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) + self.assertRaises(ValueError, bufio.read) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) + self.assertRaises(ValueError, bufio.read) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) + self.assertRaises(ValueError, bufio.read) + + def test_misbehaved_io_read(self): + rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) + bufio = self.tp(rawio) + # _pyio.BufferedReader seems to implement reading different, so that + # checking this is not so easy. + self.assertRaises(IOError, bufio.read, 10) + + def test_garbage_collection(self): + # C BufferedReader objects are collected. + # The Python version has __del__, so it ends into gc.garbage instead + rawio = self.FileIO(support.TESTFN, "w+b") + f = self.tp(rawio) + f.f = f + wr = weakref.ref(f) + del f + support.gc_collect() + self.assert_(wr() is None, wr) -class BufferedWriterTest(unittest.TestCase): +class PyBufferedReaderTest(BufferedReaderTest): + tp = pyio.BufferedReader - def testWrite(self): - # Write to the buffered IO but don't overflow the buffer. - writer = MockRawIO() - bufio = io.BufferedWriter(writer, 8) - bufio.write(b"abc") +class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): + write_mode = "wb" - self.assertFalse(writer._write_stack) + def test_constructor(self): + rawio = self.MockRawIO() + bufio = self.tp(rawio) + bufio.__init__(rawio) + bufio.__init__(rawio, buffer_size=1024) + bufio.__init__(rawio, buffer_size=16) + self.assertEquals(3, bufio.write(b"abc")) + bufio.flush() + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) + bufio.__init__(rawio) + self.assertEquals(3, bufio.write(b"ghi")) + bufio.flush() + self.assertEquals(b"".join(rawio._write_stack), b"abcghi") - def testWriteOverflow(self): - writer = MockRawIO() - bufio = io.BufferedWriter(writer, 8) + def test_detach_flush(self): + raw = self.MockRawIO() + buf = self.tp(raw) + buf.write(b"howdy!") + self.assertFalse(raw._write_stack) + buf.detach() + self.assertEqual(raw._write_stack, [b"howdy!"]) + def test_write(self): + # Write to the buffered IO but don't overflow the buffer. + writer = self.MockRawIO() + bufio = self.tp(writer, 8) bufio.write(b"abc") - bufio.write(b"defghijkl") - - self.assertEquals(b"abcdefghijkl", writer._write_stack[0]) - - def testWriteNonBlocking(self): - raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12)) - bufio = io.BufferedWriter(raw, 8, 16) - - bufio.write(b"asdf") - bufio.write(b"asdfa") - self.assertEquals(b"asdfasdfa", raw._write_stack[0]) - - bufio.write(b"asdfasdfasdf") - self.assertEquals(b"asdfasdfasdf", raw._write_stack[1]) - bufio.write(b"asdfasdfasdf") - self.assertEquals(b"dfasdfasdf", raw._write_stack[2]) - self.assertEquals(b"asdfasdfasdf", raw._write_stack[3]) - - bufio.write(b"asdfasdfasdf") - - # XXX I don't like this test. It relies too heavily on how the - # algorithm actually works, which we might change. Refactor - # later. - - def testFileno(self): - rawio = MockRawIO((b"abc", b"d", b"efg")) - bufio = io.BufferedWriter(rawio) - - self.assertEquals(42, bufio.fileno()) + self.assertFalse(writer._write_stack) - def testFlush(self): - writer = MockRawIO() - bufio = io.BufferedWriter(writer, 8) + def test_write_overflow(self): + writer = self.MockRawIO() + bufio = self.tp(writer, 8) + contents = b"abcdefghijklmnop" + for n in range(0, len(contents), 3): + bufio.write(contents[n:n+3]) + flushed = b"".join(writer._write_stack) + # At least (total - 8) bytes were implicitly flushed, perhaps more + # depending on the implementation. + self.assert_(flushed.startswith(contents[:-8]), flushed) + + def check_writes(self, intermediate_func): + # Lots of writes, test the flushed output is as expected. + contents = bytes(range(256)) * 1000 + n = 0 + writer = self.MockRawIO() + bufio = self.tp(writer, 13) + # Generator of write sizes: repeat each N 15 times then proceed to N+1 + def gen_sizes(): + for size in count(1): + for i in range(15): + yield size + sizes = gen_sizes() + while n < len(contents): + size = min(next(sizes), len(contents) - n) + self.assertEquals(bufio.write(contents[n:n+size]), size) + intermediate_func(bufio) + n += size + bufio.flush() + self.assertEquals(contents, + b"".join(writer._write_stack)) + + def test_writes(self): + self.check_writes(lambda bufio: None) + + def test_writes_and_flushes(self): + self.check_writes(lambda bufio: bufio.flush()) + + def test_writes_and_seeks(self): + def _seekabs(bufio): + pos = bufio.tell() + bufio.seek(pos + 1, 0) + bufio.seek(pos - 1, 0) + bufio.seek(pos, 0) + self.check_writes(_seekabs) + def _seekrel(bufio): + pos = bufio.seek(0, 1) + bufio.seek(+1, 1) + bufio.seek(-1, 1) + bufio.seek(pos, 0) + self.check_writes(_seekrel) + + def test_writes_and_truncates(self): + self.check_writes(lambda bufio: bufio.truncate(bufio.tell())) + + def test_write_non_blocking(self): + raw = self.MockNonBlockWriterIO() + bufio = self.tp(raw, 8) + + self.assertEquals(bufio.write(b"abcd"), 4) + self.assertEquals(bufio.write(b"efghi"), 5) + # 1 byte will be written, the rest will be buffered + raw.block_on(b"k") + self.assertEquals(bufio.write(b"jklmn"), 5) + + # 8 bytes will be written, 8 will be buffered and the rest will be lost + raw.block_on(b"0") + try: + bufio.write(b"opqrwxyz0123456789") + except self.BlockingIOError as e: + written = e.characters_written + else: + self.fail("BlockingIOError should have been raised") + self.assertEquals(written, 16) + self.assertEquals(raw.pop_written(), + b"abcdefghijklmnopqrwxyz") + + self.assertEquals(bufio.write(b"ABCDEFGHI"), 9) + s = raw.pop_written() + # Previously buffered bytes were flushed + self.assertTrue(s.startswith(b"01234567A"), s) + + def test_write_and_rewind(self): + raw = io.BytesIO() + bufio = self.tp(raw, 4) + self.assertEqual(bufio.write(b"abcdef"), 6) + self.assertEqual(bufio.tell(), 6) + bufio.seek(0, 0) + self.assertEqual(bufio.write(b"XY"), 2) + bufio.seek(6, 0) + self.assertEqual(raw.getvalue(), b"XYcdef") + self.assertEqual(bufio.write(b"123456"), 6) + bufio.flush() + self.assertEqual(raw.getvalue(), b"XYcdef123456") + def test_flush(self): + writer = self.MockRawIO() + bufio = self.tp(writer, 8) bufio.write(b"abc") bufio.flush() + self.assertEquals(b"abc", writer._write_stack[0]) + def test_destructor(self): + writer = self.MockRawIO() + bufio = self.tp(writer, 8) + bufio.write(b"abc") + del bufio + support.gc_collect() self.assertEquals(b"abc", writer._write_stack[0]) - def testThreads(self): - # BufferedWriter should not raise exceptions or crash - # when called from multiple threads. + def test_truncate(self): + # Truncate implicitly flushes the buffer. + with io.open(support.TESTFN, self.write_mode, buffering=0) as raw: + bufio = self.tp(raw, 8) + bufio.write(b"abcdef") + self.assertEqual(bufio.truncate(3), 3) + self.assertEqual(bufio.tell(), 3) + with io.open(support.TESTFN, "rb", buffering=0) as f: + self.assertEqual(f.read(), b"abc") + + def test_threads(self): try: + # Write out many bytes from many threads and test they were + # all flushed. + N = 1000 + contents = bytes(range(256)) * N + sizes = cycle([1, 19]) + n = 0 + queue = deque() + while n < len(contents): + size = next(sizes) + queue.append(contents[n:n+size]) + n += size + del contents # We use a real file object because it allows us to # exercise situations where the GIL is released before # writing the buffer to the raw streams. This is in addition # to concurrency issues due to switching threads in the middle # of Python code. - with io.open(test_support.TESTFN, "wb", buffering=0) as raw: - bufio = io.BufferedWriter(raw, 8) + with io.open(support.TESTFN, self.write_mode, buffering=0) as raw: + bufio = self.tp(raw, 8) errors = [] def f(): try: - # Write enough bytes to flush the buffer - s = b"a" * 19 - for i in range(50): + while True: + try: + s = queue.popleft() + except IndexError: + return bufio.write(s) except Exception as e: errors.append(e) @@ -555,37 +1023,218 @@ class BufferedWriterTest(unittest.TestCase): t.join() self.assertFalse(errors, "the following exceptions were caught: %r" % errors) + bufio.close() + with io.open(support.TESTFN, "rb") as f: + s = f.read() + for i in range(256): + self.assertEquals(s.count(bytes([i])), N) finally: - test_support.unlink(test_support.TESTFN) + support.unlink(support.TESTFN) + + def test_misbehaved_io(self): + rawio = self.MisbehavedRawIO() + bufio = self.tp(rawio, 5) + self.assertRaises(IOError, bufio.seek, 0) + self.assertRaises(IOError, bufio.tell) + self.assertRaises(IOError, bufio.write, b"abcdef") + + def test_max_buffer_size_deprecation(self): + with support.check_warnings() as w: + warnings.simplefilter("always", DeprecationWarning) + self.tp(self.MockRawIO(), 8, 12) + self.assertEqual(len(w.warnings), 1) + warning = w.warnings[0] + self.assertTrue(warning.category is DeprecationWarning) + self.assertEqual(str(warning.message), + "max_buffer_size is deprecated") + + +class CBufferedWriterTest(BufferedWriterTest): + tp = io.BufferedWriter + + def test_constructor(self): + BufferedWriterTest.test_constructor(self) + # The allocation can succeed on 32-bit builds, e.g. with more + # than 2GB RAM and a 64-bit kernel. + if sys.maxsize > 0x7FFFFFFF: + rawio = self.MockRawIO() + bufio = self.tp(rawio) + self.assertRaises((OverflowError, MemoryError, ValueError), + bufio.__init__, rawio, sys.maxsize) + + def test_initialization(self): + rawio = self.MockRawIO() + bufio = self.tp(rawio) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) + self.assertRaises(ValueError, bufio.write, b"def") + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) + self.assertRaises(ValueError, bufio.write, b"def") + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) + self.assertRaises(ValueError, bufio.write, b"def") + + def test_garbage_collection(self): + # C BufferedWriter objects are collected, and collecting them flushes + # all data to disk. + # The Python version has __del__, so it ends into gc.garbage instead + rawio = self.FileIO(support.TESTFN, "w+b") + f = self.tp(rawio) + f.write(b"123xxx") + f.x = f + wr = weakref.ref(f) + del f + support.gc_collect() + self.assert_(wr() is None, wr) + with open(support.TESTFN, "rb") as f: + self.assertEqual(f.read(), b"123xxx") + +class PyBufferedWriterTest(BufferedWriterTest): + tp = pyio.BufferedWriter class BufferedRWPairTest(unittest.TestCase): - def testRWPair(self): - r = MockRawIO(()) - w = MockRawIO() - pair = io.BufferedRWPair(r, w) + def test_constructor(self): + pair = self.tp(self.MockRawIO(), self.MockRawIO()) self.assertFalse(pair.closed) - # XXX More Tests + def test_detach(self): + pair = self.tp(self.MockRawIO(), self.MockRawIO()) + self.assertRaises(self.UnsupportedOperation, pair.detach) + + def test_constructor_max_buffer_size_deprecation(self): + with support.check_warnings() as w: + warnings.simplefilter("always", DeprecationWarning) + self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12) + self.assertEqual(len(w.warnings), 1) + warning = w.warnings[0] + self.assertTrue(warning.category is DeprecationWarning) + self.assertEqual(str(warning.message), + "max_buffer_size is deprecated") + + def test_constructor_with_not_readable(self): + class NotReadable(MockRawIO): + def readable(self): + return False + + self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO()) + + def test_constructor_with_not_writeable(self): + class NotWriteable(MockRawIO): + def writable(self): + return False + + self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable()) + + def test_read(self): + pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) + + self.assertEqual(pair.read(3), b"abc") + self.assertEqual(pair.read(1), b"d") + self.assertEqual(pair.read(), b"ef") + + def test_read1(self): + # .read1() is delegated to the underlying reader object, so this test + # can be shallow. + pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) + + self.assertEqual(pair.read1(3), b"abc") + + def test_readinto(self): + pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) + + data = bytearray(5) + self.assertEqual(pair.readinto(data), 5) + self.assertEqual(data, b"abcde") + + def test_write(self): + w = self.MockRawIO() + pair = self.tp(self.MockRawIO(), w) + pair.write(b"abc") + pair.flush() + pair.write(b"def") + pair.flush() + self.assertEqual(w._write_stack, [b"abc", b"def"]) -class BufferedRandomTest(unittest.TestCase): + def test_peek(self): + pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) - def testReadAndWrite(self): - raw = MockRawIO((b"asdf", b"ghjk")) - rw = io.BufferedRandom(raw, 8, 12) + self.assertTrue(pair.peek(3).startswith(b"abc")) + self.assertEqual(pair.read(3), b"abc") + + def test_readable(self): + pair = self.tp(self.MockRawIO(), self.MockRawIO()) + self.assertTrue(pair.readable()) + + def test_writeable(self): + pair = self.tp(self.MockRawIO(), self.MockRawIO()) + self.assertTrue(pair.writable()) + + def test_seekable(self): + # BufferedRWPairs are never seekable, even if their readers and writers + # are. + pair = self.tp(self.MockRawIO(), self.MockRawIO()) + self.assertFalse(pair.seekable()) + + # .flush() is delegated to the underlying writer object and has been + # tested in the test_write method. + + def test_close_and_closed(self): + pair = self.tp(self.MockRawIO(), self.MockRawIO()) + self.assertFalse(pair.closed) + pair.close() + self.assertTrue(pair.closed) + + def test_isatty(self): + class SelectableIsAtty(MockRawIO): + def __init__(self, isatty): + MockRawIO.__init__(self) + self._isatty = isatty + + def isatty(self): + return self._isatty + + pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False)) + self.assertFalse(pair.isatty()) + + pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False)) + self.assertTrue(pair.isatty()) + + pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True)) + self.assertTrue(pair.isatty()) + + pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True)) + self.assertTrue(pair.isatty()) + +class CBufferedRWPairTest(BufferedRWPairTest): + tp = io.BufferedRWPair + +class PyBufferedRWPairTest(BufferedRWPairTest): + tp = pyio.BufferedRWPair + + +class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): + read_mode = "rb+" + write_mode = "wb+" + + def test_constructor(self): + BufferedReaderTest.test_constructor(self) + BufferedWriterTest.test_constructor(self) + + def test_read_and_write(self): + raw = self.MockRawIO((b"asdf", b"ghjk")) + rw = self.tp(raw, 8) self.assertEqual(b"as", rw.read(2)) rw.write(b"ddd") rw.write(b"eee") self.assertFalse(raw._write_stack) # Buffer writes - self.assertEqual(b"ghjk", rw.read()) # This read forces write flush + self.assertEqual(b"ghjk", rw.read()) self.assertEquals(b"dddeee", raw._write_stack[0]) - def testSeekAndTell(self): - raw = io.BytesIO(b"asdfghjkl") - rw = io.BufferedRandom(raw) + def test_seek_and_tell(self): + raw = self.BytesIO(b"asdfghjkl") + rw = self.tp(raw) self.assertEquals(b"as", rw.read(2)) self.assertEquals(2, rw.tell()) @@ -603,6 +1252,115 @@ class BufferedRandomTest(unittest.TestCase): self.assertEquals(b"fl", rw.read(11)) self.assertRaises(TypeError, rw.seek, 0.0) + def check_flush_and_read(self, read_func): + raw = self.BytesIO(b"abcdefghi") + bufio = self.tp(raw) + + self.assertEquals(b"ab", read_func(bufio, 2)) + bufio.write(b"12") + self.assertEquals(b"ef", read_func(bufio, 2)) + self.assertEquals(6, bufio.tell()) + bufio.flush() + self.assertEquals(6, bufio.tell()) + self.assertEquals(b"ghi", read_func(bufio)) + raw.seek(0, 0) + raw.write(b"XYZ") + # flush() resets the read buffer + bufio.flush() + bufio.seek(0, 0) + self.assertEquals(b"XYZ", read_func(bufio, 3)) + + def test_flush_and_read(self): + self.check_flush_and_read(lambda bufio, *args: bufio.read(*args)) + + def test_flush_and_readinto(self): + def _readinto(bufio, n=-1): + b = bytearray(n if n >= 0 else 9999) + n = bufio.readinto(b) + return bytes(b[:n]) + self.check_flush_and_read(_readinto) + + def test_flush_and_peek(self): + def _peek(bufio, n=-1): + # This relies on the fact that the buffer can contain the whole + # raw stream, otherwise peek() can return less. + b = bufio.peek(n) + if n != -1: + b = b[:n] + bufio.seek(len(b), 1) + return b + self.check_flush_and_read(_peek) + + def test_flush_and_write(self): + raw = self.BytesIO(b"abcdefghi") + bufio = self.tp(raw) + + bufio.write(b"123") + bufio.flush() + bufio.write(b"45") + bufio.flush() + bufio.seek(0, 0) + self.assertEquals(b"12345fghi", raw.getvalue()) + self.assertEquals(b"12345fghi", bufio.read()) + + def test_threads(self): + BufferedReaderTest.test_threads(self) + BufferedWriterTest.test_threads(self) + + def test_writes_and_peek(self): + def _peek(bufio): + bufio.peek(1) + self.check_writes(_peek) + def _peek(bufio): + pos = bufio.tell() + bufio.seek(-1, 1) + bufio.peek(1) + bufio.seek(pos, 0) + self.check_writes(_peek) + + def test_writes_and_reads(self): + def _read(bufio): + bufio.seek(-1, 1) + bufio.read(1) + self.check_writes(_read) + + def test_writes_and_read1s(self): + def _read1(bufio): + bufio.seek(-1, 1) + bufio.read1(1) + self.check_writes(_read1) + + def test_writes_and_readintos(self): + def _read(bufio): + bufio.seek(-1, 1) + bufio.readinto(bytearray(1)) + self.check_writes(_read) + + def test_misbehaved_io(self): + BufferedReaderTest.test_misbehaved_io(self) + BufferedWriterTest.test_misbehaved_io(self) + +class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest): + tp = io.BufferedRandom + + def test_constructor(self): + BufferedRandomTest.test_constructor(self) + # The allocation can succeed on 32-bit builds, e.g. with more + # than 2GB RAM and a 64-bit kernel. + if sys.maxsize > 0x7FFFFFFF: + rawio = self.MockRawIO() + bufio = self.tp(rawio) + self.assertRaises((OverflowError, MemoryError, ValueError), + bufio.__init__, rawio, sys.maxsize) + + def test_garbage_collection(self): + CBufferedReaderTest.test_garbage_collection(self) + CBufferedWriterTest.test_garbage_collection(self) + +class PyBufferedRandomTest(BufferedRandomTest): + tp = pyio.BufferedRandom + + # To fully exercise seek/tell, the StatefulIncrementalDecoder has these # properties: # - A single output character can correspond to many bytes of input. @@ -736,7 +1494,7 @@ class StatefulIncrementalDecoderTest(unittest.TestCase): 'm--------------.') ] - def testDecoder(self): + def test_decoder(self): # Try a few one-shot test cases. for input, eof, output in self.test_cases: d = StatefulIncrementalDecoder() @@ -752,98 +1510,115 @@ class TextIOWrapperTest(unittest.TestCase): def setUp(self): self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") + support.unlink(support.TESTFN) def tearDown(self): - test_support.unlink(test_support.TESTFN) - - def testLineBuffering(self): - r = io.BytesIO() - b = io.BufferedWriter(r, 1000) - t = io.TextIOWrapper(b, newline="\n", line_buffering=True) - t.write(u"X") + support.unlink(support.TESTFN) + + def test_constructor(self): + r = self.BytesIO(b"\xc3\xa9\n\n") + b = self.BufferedReader(r, 1000) + t = self.TextIOWrapper(b) + t.__init__(b, encoding="latin1", newline="\r\n") + self.assertEquals(t.encoding, "latin1") + self.assertEquals(t.line_buffering, False) + t.__init__(b, encoding="utf8", line_buffering=True) + self.assertEquals(t.encoding, "utf8") + self.assertEquals(t.line_buffering, True) + self.assertEquals("\xe9\n", t.readline()) + self.assertRaises(TypeError, t.__init__, b, newline=42) + self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') + + def test_detach(self): + r = self.BytesIO() + b = self.BufferedWriter(r) + t = self.TextIOWrapper(b) + self.assertIs(t.detach(), b) + + t = self.TextIOWrapper(b, encoding="ascii") + t.write("howdy") + self.assertFalse(r.getvalue()) + t.detach() + self.assertEqual(r.getvalue(), b"howdy") + self.assertRaises(ValueError, t.detach) + + def test_repr(self): + raw = self.BytesIO("hello".encode("utf-8")) + b = self.BufferedReader(raw) + t = self.TextIOWrapper(b, encoding="utf-8") + modname = self.TextIOWrapper.__module__ + self.assertEqual(repr(t), + "<%s.TextIOWrapper encoding='utf-8'>" % modname) + raw.name = "dummy" + self.assertEqual(repr(t), + "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname) + raw.name = b"dummy" + self.assertEqual(repr(t), + "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname) + + def test_line_buffering(self): + r = self.BytesIO() + b = self.BufferedWriter(r, 1000) + t = self.TextIOWrapper(b, newline="\n", line_buffering=True) + t.write("X") self.assertEquals(r.getvalue(), b"") # No flush happened - t.write(u"Y\nZ") + t.write("Y\nZ") self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed - t.write(u"A\rB") + t.write("A\rB") self.assertEquals(r.getvalue(), b"XY\nZA\rB") - def testEncodingErrorsReading(self): + def test_encoding(self): + # Check the encoding attribute is always set, and valid + b = self.BytesIO() + t = self.TextIOWrapper(b, encoding="utf8") + self.assertEqual(t.encoding, "utf8") + t = self.TextIOWrapper(b) + self.assert_(t.encoding is not None) + codecs.lookup(t.encoding) + + def test_encoding_errors_reading(self): # (1) default - b = io.BytesIO(b"abc\n\xff\n") - t = io.TextIOWrapper(b, encoding="ascii") + b = self.BytesIO(b"abc\n\xff\n") + t = self.TextIOWrapper(b, encoding="ascii") self.assertRaises(UnicodeError, t.read) # (2) explicit strict - b = io.BytesIO(b"abc\n\xff\n") - t = io.TextIOWrapper(b, encoding="ascii", errors="strict") + b = self.BytesIO(b"abc\n\xff\n") + t = self.TextIOWrapper(b, encoding="ascii", errors="strict") self.assertRaises(UnicodeError, t.read) # (3) ignore - b = io.BytesIO(b"abc\n\xff\n") - t = io.TextIOWrapper(b, encoding="ascii", errors="ignore") + b = self.BytesIO(b"abc\n\xff\n") + t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") self.assertEquals(t.read(), "abc\n\n") # (4) replace - b = io.BytesIO(b"abc\n\xff\n") - t = io.TextIOWrapper(b, encoding="ascii", errors="replace") - self.assertEquals(t.read(), u"abc\n\ufffd\n") + b = self.BytesIO(b"abc\n\xff\n") + t = self.TextIOWrapper(b, encoding="ascii", errors="replace") + self.assertEquals(t.read(), "abc\n\ufffd\n") - def testEncodingErrorsWriting(self): + def test_encoding_errors_writing(self): # (1) default - b = io.BytesIO() - t = io.TextIOWrapper(b, encoding="ascii") - self.assertRaises(UnicodeError, t.write, u"\xff") + b = self.BytesIO() + t = self.TextIOWrapper(b, encoding="ascii") + self.assertRaises(UnicodeError, t.write, "\xff") # (2) explicit strict - b = io.BytesIO() - t = io.TextIOWrapper(b, encoding="ascii", errors="strict") - self.assertRaises(UnicodeError, t.write, u"\xff") + b = self.BytesIO() + t = self.TextIOWrapper(b, encoding="ascii", errors="strict") + self.assertRaises(UnicodeError, t.write, "\xff") # (3) ignore - b = io.BytesIO() - t = io.TextIOWrapper(b, encoding="ascii", errors="ignore", + b = self.BytesIO() + t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", newline="\n") - t.write(u"abc\xffdef\n") + t.write("abc\xffdef\n") t.flush() self.assertEquals(b.getvalue(), b"abcdef\n") # (4) replace - b = io.BytesIO() - t = io.TextIOWrapper(b, encoding="ascii", errors="replace", + b = self.BytesIO() + t = self.TextIOWrapper(b, encoding="ascii", errors="replace", newline="\n") - t.write(u"abc\xffdef\n") + t.write("abc\xffdef\n") t.flush() self.assertEquals(b.getvalue(), b"abc?def\n") - def testNewlinesInput(self): - testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" - normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") - for newline, expected in [ - (None, normalized.decode("ascii").splitlines(True)), - ("", testdata.decode("ascii").splitlines(True)), - ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), - ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), - ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), - ]: - buf = io.BytesIO(testdata) - txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline) - self.assertEquals(txt.readlines(), expected) - txt.seek(0) - self.assertEquals(txt.read(), "".join(expected)) - - def testNewlinesOutput(self): - testdict = { - "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", - "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", - "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", - "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", - } - tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) - for newline, expected in tests: - buf = io.BytesIO() - txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline) - txt.write("AAA\nB") - txt.write("BB\nCCC\n") - txt.write("X\rY\r\nZ") - txt.flush() - self.assertEquals(buf.closed, False) - self.assertEquals(buf.getvalue(), expected) - - def testNewlines(self): + def test_newlines(self): input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] tests = [ @@ -867,8 +1642,8 @@ class TextIOWrapperTest(unittest.TestCase): for do_reads in (False, True): for bufsize in range(1, 10): for newline, exp_lines in tests: - bufio = io.BufferedReader(io.BytesIO(data), bufsize) - textio = io.TextIOWrapper(bufio, newline=newline, + bufio = self.BufferedReader(self.BytesIO(data), bufsize) + textio = self.TextIOWrapper(bufio, newline=newline, encoding=encoding) if do_reads: got_lines = [] @@ -885,75 +1660,117 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(got_line, exp_line) self.assertEquals(len(got_lines), len(exp_lines)) - def testNewlinesInput(self): - testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" + def test_newlines_input(self): + testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") for newline, expected in [ (None, normalized.decode("ascii").splitlines(True)), ("", testdata.decode("ascii").splitlines(True)), - ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), - ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), - ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), + ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), + ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), + ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), ]: - buf = io.BytesIO(testdata) - txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline) + buf = self.BytesIO(testdata) + txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) self.assertEquals(txt.readlines(), expected) txt.seek(0) self.assertEquals(txt.read(), "".join(expected)) - def testNewlinesOutput(self): - data = u"AAA\nBBB\rCCC\n" - data_lf = b"AAA\nBBB\rCCC\n" - data_cr = b"AAA\rBBB\rCCC\r" - data_crlf = b"AAA\r\nBBB\rCCC\r\n" - save_linesep = os.linesep - try: - for os.linesep, newline, expected in [ - ("\n", None, data_lf), - ("\r\n", None, data_crlf), - ("\n", "", data_lf), - ("\r\n", "", data_lf), - ("\n", "\n", data_lf), - ("\r\n", "\n", data_lf), - ("\n", "\r", data_cr), - ("\r\n", "\r", data_cr), - ("\n", "\r\n", data_crlf), - ("\r\n", "\r\n", data_crlf), - ]: - buf = io.BytesIO() - txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline) - txt.write(data) - txt.close() - self.assertEquals(buf.closed, True) - self.assertRaises(ValueError, buf.getvalue) - finally: - os.linesep = save_linesep + def test_newlines_output(self): + testdict = { + "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", + "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", + "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", + "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", + } + tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) + for newline, expected in tests: + buf = self.BytesIO() + txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) + txt.write("AAA\nB") + txt.write("BB\nCCC\n") + txt.write("X\rY\r\nZ") + txt.flush() + self.assertEquals(buf.closed, False) + self.assertEquals(buf.getvalue(), expected) + + def test_destructor(self): + l = [] + base = self.BytesIO + class MyBytesIO(base): + def close(self): + l.append(self.getvalue()) + base.close(self) + b = MyBytesIO() + t = self.TextIOWrapper(b, encoding="ascii") + t.write("abc") + del t + support.gc_collect() + self.assertEquals([b"abc"], l) + + def test_override_destructor(self): + record = [] + class MyTextIO(self.TextIOWrapper): + def __del__(self): + record.append(1) + try: + f = super(MyTextIO, self).__del__ + except AttributeError: + pass + else: + f() + def close(self): + record.append(2) + super(MyTextIO, self).close() + def flush(self): + record.append(3) + super(MyTextIO, self).flush() + b = self.BytesIO() + t = MyTextIO(b, encoding="ascii") + del t + support.gc_collect() + self.assertEqual(record, [1, 2, 3]) + + def test_error_through_destructor(self): + # Test that the exception state is not modified by a destructor, + # even if close() fails. + rawio = self.CloseFailureIO() + def f(): + self.TextIOWrapper(rawio).xyzzy + with support.captured_output("stderr") as s: + self.assertRaises(AttributeError, f) + s = s.getvalue().strip() + if s: + # The destructor *may* have printed an unraisable error, check it + self.assertEqual(len(s.splitlines()), 1) + self.assert_(s.startswith("Exception IOError: "), s) + self.assert_(s.endswith(" ignored"), s) # Systematic tests of the text I/O API - def testBasicIO(self): + def test_basic_io(self): for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le": - f = io.open(test_support.TESTFN, "w+", encoding=enc) + f = self.open(support.TESTFN, "w+", encoding=enc) f._CHUNK_SIZE = chunksize - self.assertEquals(f.write(u"abc"), 3) + self.assertEquals(f.write("abc"), 3) f.close() - f = io.open(test_support.TESTFN, "r+", encoding=enc) + f = self.open(support.TESTFN, "r+", encoding=enc) f._CHUNK_SIZE = chunksize self.assertEquals(f.tell(), 0) - self.assertEquals(f.read(), u"abc") + self.assertEquals(f.read(), "abc") cookie = f.tell() self.assertEquals(f.seek(0), 0) - self.assertEquals(f.read(2), u"ab") - self.assertEquals(f.read(1), u"c") - self.assertEquals(f.read(1), u"") - self.assertEquals(f.read(), u"") + self.assertEquals(f.read(2), "ab") + self.assertEquals(f.read(1), "c") + self.assertEquals(f.read(1), "") + self.assertEquals(f.read(), "") self.assertEquals(f.tell(), cookie) self.assertEquals(f.seek(0), 0) self.assertEquals(f.seek(0, 2), cookie) - self.assertEquals(f.write(u"def"), 3) + self.assertEquals(f.write("def"), 3) self.assertEquals(f.seek(cookie), cookie) - self.assertEquals(f.read(), u"def") + self.assertEquals(f.read(), "def") if enc.startswith("utf"): self.multi_line_test(f, enc) f.close() @@ -961,13 +1778,13 @@ class TextIOWrapperTest(unittest.TestCase): def multi_line_test(self, f, enc): f.seek(0) f.truncate() - sample = u"s\xff\u0fff\uffff" + sample = "s\xff\u0fff\uffff" wlines = [] for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000): chars = [] for i in range(size): chars.append(sample[i % len(sample)]) - line = u"".join(chars) + u"\n" + line = "".join(chars) + "\n" wlines.append((f.tell(), line)) f.write(line) f.seek(0) @@ -980,28 +1797,28 @@ class TextIOWrapperTest(unittest.TestCase): rlines.append((pos, line)) self.assertEquals(rlines, wlines) - def testTelling(self): - f = io.open(test_support.TESTFN, "w+", encoding="utf8") + def test_telling(self): + f = self.open(support.TESTFN, "w+", encoding="utf8") p0 = f.tell() - f.write(u"\xff\n") + f.write("\xff\n") p1 = f.tell() - f.write(u"\xff\n") + f.write("\xff\n") p2 = f.tell() f.seek(0) self.assertEquals(f.tell(), p0) - self.assertEquals(f.readline(), u"\xff\n") + self.assertEquals(f.readline(), "\xff\n") self.assertEquals(f.tell(), p1) - self.assertEquals(f.readline(), u"\xff\n") + self.assertEquals(f.readline(), "\xff\n") self.assertEquals(f.tell(), p2) f.seek(0) for line in f: - self.assertEquals(line, u"\xff\n") + self.assertEquals(line, "\xff\n") self.assertRaises(IOError, f.tell) self.assertEquals(f.tell(), p2) f.close() - def testSeeking(self): - chunk_size = io.TextIOWrapper._CHUNK_SIZE + def test_seeking(self): + chunk_size = _default_chunk_size() prefix_size = chunk_size - 2 u_prefix = "a" * prefix_size prefix = bytes(u_prefix.encode("utf-8")) @@ -1009,43 +1826,46 @@ class TextIOWrapperTest(unittest.TestCase): u_suffix = "\u8888\n" suffix = bytes(u_suffix.encode("utf-8")) line = prefix + suffix - f = io.open(test_support.TESTFN, "wb") + f = self.open(support.TESTFN, "wb") f.write(line*2) f.close() - f = io.open(test_support.TESTFN, "r", encoding="utf-8") + f = self.open(support.TESTFN, "r", encoding="utf-8") s = f.read(prefix_size) - self.assertEquals(s, unicode(prefix, "ascii")) + self.assertEquals(s, prefix.decode("ascii")) self.assertEquals(f.tell(), prefix_size) self.assertEquals(f.readline(), u_suffix) - def testSeekingToo(self): + def test_seeking_too(self): # Regression test for a specific bug data = b'\xe0\xbf\xbf\n' - f = io.open(test_support.TESTFN, "wb") + f = self.open(support.TESTFN, "wb") f.write(data) f.close() - f = io.open(test_support.TESTFN, "r", encoding="utf-8") + f = self.open(support.TESTFN, "r", encoding="utf-8") f._CHUNK_SIZE # Just test that it exists f._CHUNK_SIZE = 2 f.readline() f.tell() - def testSeekAndTell(self): - """Test seek/tell using the StatefulIncrementalDecoder.""" + def test_seek_and_tell(self): + #Test seek/tell using the StatefulIncrementalDecoder. + # Make test faster by doing smaller seeks + CHUNK_SIZE = 128 - def testSeekAndTellWithData(data, min_pos=0): + def test_seek_and_tell_with_data(data, min_pos=0): """Tell/seek to various points within a data stream and ensure that the decoded data returned by read() is consistent.""" - f = io.open(test_support.TESTFN, 'wb') + f = self.open(support.TESTFN, 'wb') f.write(data) f.close() - f = io.open(test_support.TESTFN, encoding='test_decoder') + f = self.open(support.TESTFN, encoding='test_decoder') + f._CHUNK_SIZE = CHUNK_SIZE decoded = f.read() f.close() for i in range(min_pos, len(decoded) + 1): # seek positions for j in [1, 5, len(decoded) - i]: # read lengths - f = io.open(test_support.TESTFN, encoding='test_decoder') + f = self.open(support.TESTFN, encoding='test_decoder') self.assertEquals(f.read(i), decoded[:i]) cookie = f.tell() self.assertEquals(f.read(j), decoded[i:i + j]) @@ -1060,23 +1880,22 @@ class TextIOWrapperTest(unittest.TestCase): try: # Try each test case. for input, _, _ in StatefulIncrementalDecoderTest.test_cases: - testSeekAndTellWithData(input) + test_seek_and_tell_with_data(input) # Position each test case so that it crosses a chunk boundary. - CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE for input, _, _ in StatefulIncrementalDecoderTest.test_cases: offset = CHUNK_SIZE - len(input)//2 prefix = b'.'*offset # Don't bother seeking into the prefix (takes too long). min_pos = offset*2 - testSeekAndTellWithData(prefix + input, min_pos) + test_seek_and_tell_with_data(prefix + input, min_pos) # Ensure our test decoder won't interfere with subsequent tests. finally: StatefulIncrementalDecoder.codecEnabled = 0 - def testEncodedWrites(self): - data = u"1234567890" + def test_encoded_writes(self): + data = "1234567890" tests = ("utf-16", "utf-16-le", "utf-16-be", @@ -1084,54 +1903,26 @@ class TextIOWrapperTest(unittest.TestCase): "utf-32-le", "utf-32-be") for encoding in tests: - buf = io.BytesIO() - f = io.TextIOWrapper(buf, encoding=encoding) + buf = self.BytesIO() + f = self.TextIOWrapper(buf, encoding=encoding) # Check if the BOM is written only once (see issue1753). f.write(data) f.write(data) f.seek(0) self.assertEquals(f.read(), data * 2) + f.seek(0) + self.assertEquals(f.read(), data * 2) self.assertEquals(buf.getvalue(), (data * 2).encode(encoding)) - def timingTest(self): - timer = time.time - enc = "utf8" - line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n" - nlines = 10000 - nchars = len(line) - nbytes = len(line.encode(enc)) - for chunk_size in (32, 64, 128, 256): - f = io.open(test_support.TESTFN, "w+", encoding=enc) - f._CHUNK_SIZE = chunk_size - t0 = timer() - for i in range(nlines): - f.write(line) - f.flush() - t1 = timer() - f.seek(0) - for line in f: - pass - t2 = timer() - f.seek(0) - while f.readline(): - pass - t3 = timer() - f.seek(0) - while f.readline(): - f.tell() - t4 = timer() - f.close() - if test_support.verbose: - print("\nTiming test: %d lines of %d characters (%d bytes)" % - (nlines, nchars, nbytes)) - print("File chunk size: %6s" % f._CHUNK_SIZE) - print("Writing: %6.3f seconds" % (t1-t0)) - print("Reading using iteration: %6.3f seconds" % (t2-t1)) - print("Reading using readline(): %6.3f seconds" % (t3-t2)) - print("Using readline()+tell(): %6.3f seconds" % (t4-t3)) - - def testReadOneByOne(self): - txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB")) + def test_unreadable(self): + class UnReadable(self.BytesIO): + def readable(self): + return False + txt = self.TextIOWrapper(UnReadable()) + self.assertRaises(IOError, txt.read) + + def test_read_one_by_one(self): + txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB")) reads = "" while True: c = txt.read(1) @@ -1141,9 +1932,9 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(reads, "AA\nBB") # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. - def testReadByChunk(self): + def test_read_by_chunk(self): # make sure "\r\n" straddles 128 char boundary. - txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB")) + txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB")) reads = "" while True: c = txt.read(128) @@ -1153,7 +1944,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(reads, "A"*127+"\nB") def test_issue1395_1(self): - txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii") + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") # read one char at a time reads = "" @@ -1165,7 +1956,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(reads, self.normalized) def test_issue1395_2(self): - txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii") + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") txt._CHUNK_SIZE = 4 reads = "" @@ -1177,7 +1968,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(reads, self.normalized) def test_issue1395_3(self): - txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii") + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") txt._CHUNK_SIZE = 4 reads = txt.read(4) @@ -1188,7 +1979,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(reads, self.normalized) def test_issue1395_4(self): - txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii") + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") txt._CHUNK_SIZE = 4 reads = txt.read(4) @@ -1196,7 +1987,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(reads, self.normalized) def test_issue1395_5(self): - txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii") + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") txt._CHUNK_SIZE = 4 reads = txt.read(4) @@ -1206,12 +1997,84 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(txt.read(4), "BBB\n") def test_issue2282(self): - buffer = io.BytesIO(self.testdata) - txt = io.TextIOWrapper(buffer, encoding="ascii") + buffer = self.BytesIO(self.testdata) + txt = self.TextIOWrapper(buffer, encoding="ascii") self.assertEqual(buffer.seekable(), txt.seekable()) - def check_newline_decoder_utf8(self, decoder): + @unittest.skip("Issue #6213 with incremental encoders") + def test_append_bom(self): + # The BOM is not written again when appending to a non-empty file + filename = support.TESTFN + for charset in ('utf-8-sig', 'utf-16', 'utf-32'): + with self.open(filename, 'w', encoding=charset) as f: + f.write('aaa') + pos = f.tell() + with self.open(filename, 'rb') as f: + self.assertEquals(f.read(), 'aaa'.encode(charset)) + + with self.open(filename, 'a', encoding=charset) as f: + f.write('xxx') + with self.open(filename, 'rb') as f: + self.assertEquals(f.read(), 'aaaxxx'.encode(charset)) + + @unittest.skip("Issue #6213 with incremental encoders") + def test_seek_bom(self): + # Same test, but when seeking manually + filename = support.TESTFN + for charset in ('utf-8-sig', 'utf-16', 'utf-32'): + with self.open(filename, 'w', encoding=charset) as f: + f.write('aaa') + pos = f.tell() + with self.open(filename, 'r+', encoding=charset) as f: + f.seek(pos) + f.write('zzz') + f.seek(0) + f.write('bbb') + with self.open(filename, 'rb') as f: + self.assertEquals(f.read(), 'bbbzzz'.encode(charset)) + + def test_errors_property(self): + with self.open(support.TESTFN, "w") as f: + self.assertEqual(f.errors, "strict") + with self.open(support.TESTFN, "w", errors="replace") as f: + self.assertEqual(f.errors, "replace") + + +class CTextIOWrapperTest(TextIOWrapperTest): + + def test_initialization(self): + r = self.BytesIO(b"\xc3\xa9\n\n") + b = self.BufferedReader(r, 1000) + t = self.TextIOWrapper(b) + self.assertRaises(TypeError, t.__init__, b, newline=42) + self.assertRaises(ValueError, t.read) + self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') + self.assertRaises(ValueError, t.read) + + def test_garbage_collection(self): + # C TextIOWrapper objects are collected, and collecting them flushes + # all data to disk. + # The Python version has __del__, so it ends in gc.garbage instead. + rawio = io.FileIO(support.TESTFN, "wb") + b = self.BufferedWriter(rawio) + t = self.TextIOWrapper(b, encoding="ascii") + t.write("456def") + t.x = t + wr = weakref.ref(t) + del t + support.gc_collect() + self.assert_(wr() is None, wr) + with open(support.TESTFN, "rb") as f: + self.assertEqual(f.read(), b"456def") + +class PyTextIOWrapperTest(TextIOWrapperTest): + pass + + +class IncrementalNewlineDecoderTest(unittest.TestCase): + + def check_newline_decoding_utf8(self, decoder): # UTF-8 specific tests for a newline decoder def _check_decode(b, s, **kwargs): # We exercise getstate() / setstate() as well as decode() @@ -1253,12 +2116,20 @@ class TextIOWrapperTest(unittest.TestCase): _check_decode(b'\xe8\xa2\x88\r', "\u8888") _check_decode(b'\n', "\n") - def check_newline_decoder(self, decoder, encoding): + def check_newline_decoding(self, decoder, encoding): result = [] - encoder = codecs.getincrementalencoder(encoding)() - def _decode_bytewise(s): - for b in encoder.encode(s): - result.append(decoder.decode(b)) + if encoding is not None: + encoder = codecs.getincrementalencoder(encoding)() + def _decode_bytewise(s): + # Decode one byte at a time + for b in encoder.encode(s): + result.append(decoder.decode(b)) + else: + encoder = None + def _decode_bytewise(s): + # Decode one char at a time + for c in s: + result.append(decoder.decode(c)) self.assertEquals(decoder.newlines, None) _decode_bytewise("abc\n\r") self.assertEquals(decoder.newlines, '\n') @@ -1271,22 +2142,47 @@ class TextIOWrapperTest(unittest.TestCase): _decode_bytewise("abc\r") self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc") decoder.reset() - self.assertEquals(decoder.decode("abc".encode(encoding)), "abc") + input = "abc" + if encoder is not None: + encoder.reset() + input = encoder.encode(input) + self.assertEquals(decoder.decode(input), "abc") self.assertEquals(decoder.newlines, None) def test_newline_decoder(self): encodings = ( - 'utf-8', 'latin-1', + # None meaning the IncrementalNewlineDecoder takes unicode input + # rather than bytes input + None, 'utf-8', 'latin-1', 'utf-16', 'utf-16-le', 'utf-16-be', 'utf-32', 'utf-32-le', 'utf-32-be', ) for enc in encodings: - decoder = codecs.getincrementaldecoder(enc)() - decoder = io.IncrementalNewlineDecoder(decoder, translate=True) - self.check_newline_decoder(decoder, enc) + decoder = enc and codecs.getincrementaldecoder(enc)() + decoder = self.IncrementalNewlineDecoder(decoder, translate=True) + self.check_newline_decoding(decoder, enc) decoder = codecs.getincrementaldecoder("utf-8")() - decoder = io.IncrementalNewlineDecoder(decoder, translate=True) - self.check_newline_decoder_utf8(decoder) + decoder = self.IncrementalNewlineDecoder(decoder, translate=True) + self.check_newline_decoding_utf8(decoder) + + def test_newline_bytes(self): + # Issue 5433: Excessive optimization in IncrementalNewlineDecoder + def _check(dec): + self.assertEquals(dec.newlines, None) + self.assertEquals(dec.decode("\u0D00"), "\u0D00") + self.assertEquals(dec.newlines, None) + self.assertEquals(dec.decode("\u0A00"), "\u0A00") + self.assertEquals(dec.newlines, None) + dec = self.IncrementalNewlineDecoder(None, translate=False) + _check(dec) + dec = self.IncrementalNewlineDecoder(None, translate=True) + _check(dec) + +class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): + pass + +class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): + pass # XXX Tests for open() @@ -1294,40 +2190,39 @@ class TextIOWrapperTest(unittest.TestCase): class MiscIOTest(unittest.TestCase): def tearDown(self): - test_support.unlink(test_support.TESTFN) + support.unlink(support.TESTFN) - def testImport__all__(self): - for name in io.__all__: - obj = getattr(io, name, None) + def test___all__(self): + for name in self.io.__all__: + obj = getattr(self.io, name, None) self.assertTrue(obj is not None, name) if name == "open": continue - elif "error" in name.lower(): + elif "error" in name.lower() or name == "UnsupportedOperation": self.assertTrue(issubclass(obj, Exception), name) elif not name.startswith("SEEK_"): - self.assertTrue(issubclass(obj, io.IOBase)) - + self.assertTrue(issubclass(obj, self.IOBase)) def test_attributes(self): - f = io.open(test_support.TESTFN, "wb", buffering=0) + f = self.open(support.TESTFN, "wb", buffering=0) self.assertEquals(f.mode, "wb") f.close() - f = io.open(test_support.TESTFN, "U") - self.assertEquals(f.name, test_support.TESTFN) - self.assertEquals(f.buffer.name, test_support.TESTFN) - self.assertEquals(f.buffer.raw.name, test_support.TESTFN) + f = self.open(support.TESTFN, "U") + self.assertEquals(f.name, support.TESTFN) + self.assertEquals(f.buffer.name, support.TESTFN) + self.assertEquals(f.buffer.raw.name, support.TESTFN) self.assertEquals(f.mode, "U") self.assertEquals(f.buffer.mode, "rb") self.assertEquals(f.buffer.raw.mode, "rb") f.close() - f = io.open(test_support.TESTFN, "w+") + f = self.open(support.TESTFN, "w+") self.assertEquals(f.mode, "w+") self.assertEquals(f.buffer.mode, "rb+") # Does it really matter? self.assertEquals(f.buffer.raw.mode, "rb+") - g = io.open(f.fileno(), "wb", closefd=False) + g = self.open(f.fileno(), "wb", closefd=False) self.assertEquals(g.mode, "wb") self.assertEquals(g.raw.mode, "wb") self.assertEquals(g.name, f.fileno()) @@ -1335,13 +2230,138 @@ class MiscIOTest(unittest.TestCase): f.close() g.close() + def test_io_after_close(self): + for kwargs in [ + {"mode": "w"}, + {"mode": "wb"}, + {"mode": "w", "buffering": 1}, + {"mode": "w", "buffering": 2}, + {"mode": "wb", "buffering": 0}, + {"mode": "r"}, + {"mode": "rb"}, + {"mode": "r", "buffering": 1}, + {"mode": "r", "buffering": 2}, + {"mode": "rb", "buffering": 0}, + {"mode": "w+"}, + {"mode": "w+b"}, + {"mode": "w+", "buffering": 1}, + {"mode": "w+", "buffering": 2}, + {"mode": "w+b", "buffering": 0}, + ]: + f = self.open(support.TESTFN, **kwargs) + f.close() + self.assertRaises(ValueError, f.flush) + self.assertRaises(ValueError, f.fileno) + self.assertRaises(ValueError, f.isatty) + self.assertRaises(ValueError, f.__iter__) + if hasattr(f, "peek"): + self.assertRaises(ValueError, f.peek, 1) + self.assertRaises(ValueError, f.read) + if hasattr(f, "read1"): + self.assertRaises(ValueError, f.read1, 1024) + if hasattr(f, "readinto"): + self.assertRaises(ValueError, f.readinto, bytearray(1024)) + self.assertRaises(ValueError, f.readline) + self.assertRaises(ValueError, f.readlines) + self.assertRaises(ValueError, f.seek, 0) + self.assertRaises(ValueError, f.tell) + self.assertRaises(ValueError, f.truncate) + self.assertRaises(ValueError, f.write, + b"" if "b" in kwargs['mode'] else "") + self.assertRaises(ValueError, f.writelines, []) + self.assertRaises(ValueError, next, f) + + def test_blockingioerror(self): + # Various BlockingIOError issues + self.assertRaises(TypeError, self.BlockingIOError) + self.assertRaises(TypeError, self.BlockingIOError, 1) + self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4) + self.assertRaises(TypeError, self.BlockingIOError, 1, "", None) + b = self.BlockingIOError(1, "") + self.assertEqual(b.characters_written, 0) + class C(unicode): + pass + c = C("") + b = self.BlockingIOError(1, c) + c.b = b + b.c = c + wr = weakref.ref(c) + del c, b + support.gc_collect() + self.assert_(wr() is None, wr) + + def test_abcs(self): + # Test the visible base classes are ABCs. + self.assertTrue(isinstance(self.IOBase, abc.ABCMeta)) + self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta)) + self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta)) + self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta)) + + def _check_abc_inheritance(self, abcmodule): + with self.open(support.TESTFN, "wb", buffering=0) as f: + self.assertTrue(isinstance(f, abcmodule.IOBase)) + self.assertTrue(isinstance(f, abcmodule.RawIOBase)) + self.assertFalse(isinstance(f, abcmodule.BufferedIOBase)) + self.assertFalse(isinstance(f, abcmodule.TextIOBase)) + with self.open(support.TESTFN, "wb") as f: + self.assertTrue(isinstance(f, abcmodule.IOBase)) + self.assertFalse(isinstance(f, abcmodule.RawIOBase)) + self.assertTrue(isinstance(f, abcmodule.BufferedIOBase)) + self.assertFalse(isinstance(f, abcmodule.TextIOBase)) + with self.open(support.TESTFN, "w") as f: + self.assertTrue(isinstance(f, abcmodule.IOBase)) + self.assertFalse(isinstance(f, abcmodule.RawIOBase)) + self.assertFalse(isinstance(f, abcmodule.BufferedIOBase)) + self.assertTrue(isinstance(f, abcmodule.TextIOBase)) + + def test_abc_inheritance(self): + # Test implementations inherit from their respective ABCs + self._check_abc_inheritance(self) + + def test_abc_inheritance_official(self): + # Test implementations inherit from the official ABCs of the + # baseline "io" module. + self._check_abc_inheritance(io) + +class CMiscIOTest(MiscIOTest): + io = io + +class PyMiscIOTest(MiscIOTest): + io = pyio def test_main(): - test_support.run_unittest(IOTest, BytesIOTest, StringIOTest, - BufferedReaderTest, BufferedWriterTest, - BufferedRWPairTest, BufferedRandomTest, - StatefulIncrementalDecoderTest, - TextIOWrapperTest, MiscIOTest) + tests = (CIOTest, PyIOTest, + CBufferedReaderTest, PyBufferedReaderTest, + CBufferedWriterTest, PyBufferedWriterTest, + CBufferedRWPairTest, PyBufferedRWPairTest, + CBufferedRandomTest, PyBufferedRandomTest, + StatefulIncrementalDecoderTest, + CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest, + CTextIOWrapperTest, PyTextIOWrapperTest, + CMiscIOTest, PyMiscIOTest, + ) + + # Put the namespaces of the IO module we are testing and some useful mock + # classes in the __dict__ of each test. + mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO, + MockNonBlockWriterIO) + all_members = io.__all__ + ["IncrementalNewlineDecoder"] + c_io_ns = dict((name, getattr(io, name)) for name in all_members) + py_io_ns = dict((name, getattr(pyio, name)) for name in all_members) + globs = globals() + c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks) + py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks) + # Avoid turning open into a bound method. + py_io_ns["open"] = pyio.OpenWrapper + for test in tests: + if test.__name__.startswith("C"): + for name, obj in c_io_ns.items(): + setattr(test, name, obj) + elif test.__name__.startswith("Py"): + for name, obj in py_io_ns.items(): + setattr(test, name, obj) + + support.run_unittest(*tests) if __name__ == "__main__": - unittest.main() + test_main() |