diff options
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/test_bufio.py | 43 | ||||
-rw-r--r-- | Lib/test/test_file.py | 421 | ||||
-rw-r--r-- | Lib/test/test_fileio.py | 185 | ||||
-rw-r--r-- | Lib/test/test_io.py | 1996 | ||||
-rw-r--r-- | Lib/test/test_largefile.py | 89 | ||||
-rw-r--r-- | Lib/test/test_memoryio.py | 217 | ||||
-rw-r--r-- | Lib/test/test_support.py | 17 | ||||
-rw-r--r-- | Lib/test/test_univnewlines.py | 81 |
8 files changed, 2077 insertions, 972 deletions
diff --git a/Lib/test/test_bufio.py b/Lib/test/test_bufio.py index 4f5de10..b2c1bf8 100644 --- a/Lib/test/test_bufio.py +++ b/Lib/test/test_bufio.py @@ -1,12 +1,15 @@ import unittest -from test import test_support +from test import test_support as support -# Simple test to ensure that optimizations in fileobject.c deliver -# the expected results. For best testing, run this under a debug-build -# Python too (to exercise asserts in the C code). +import io # C implementation. +import _pyio as pyio # Python implementation. -lengths = range(1, 257) + [512, 1000, 1024, 2048, 4096, 8192, 10000, - 16384, 32768, 65536, 1000000] +# Simple test to ensure that optimizations in the IO library deliver the +# expected results. For best testing, run this under a debug-build Python too +# (to exercise asserts in the C code). + +lengths = list(range(1, 257)) + [512, 1000, 1024, 2048, 4096, 8192, 10000, + 16384, 32768, 65536, 1000000] class BufferSizeTest(unittest.TestCase): def try_one(self, s): @@ -14,27 +17,27 @@ class BufferSizeTest(unittest.TestCase): # .readline()s deliver what we wrote. # Ensure we can open TESTFN for writing. - test_support.unlink(test_support.TESTFN) + support.unlink(support.TESTFN) # Since C doesn't guarantee we can write/read arbitrary bytes in text # files, use binary mode. - f = open(test_support.TESTFN, "wb") + f = self.open(support.TESTFN, "wb") try: # write once with \n and once without f.write(s) - f.write("\n") + f.write(b"\n") f.write(s) f.close() - f = open(test_support.TESTFN, "rb") + f = open(support.TESTFN, "rb") line = f.readline() - self.assertEqual(line, s + "\n") + self.assertEqual(line, s + b"\n") line = f.readline() self.assertEqual(line, s) line = f.readline() self.assert_(not line) # Must be at EOF f.close() finally: - test_support.unlink(test_support.TESTFN) + support.unlink(support.TESTFN) def drive_one(self, pattern): for length in lengths: @@ -47,19 +50,27 @@ class BufferSizeTest(unittest.TestCase): teststring = pattern * q + pattern[:r] self.assertEqual(len(teststring), length) self.try_one(teststring) - self.try_one(teststring + "x") + self.try_one(teststring + b"x") self.try_one(teststring[:-1]) def test_primepat(self): # A pattern with prime length, to avoid simple relationships with # stdio buffer sizes. - self.drive_one("1234567890\00\01\02\03\04\05\06") + self.drive_one(b"1234567890\00\01\02\03\04\05\06") def test_nullpat(self): - self.drive_one("\0" * 1000) + self.drive_one(bytes(1000)) + + +class CBufferSizeTest(BufferSizeTest): + open = io.open + +class PyBufferSizeTest(BufferSizeTest): + open = staticmethod(pyio.open) + def test_main(): - test_support.run_unittest(BufferSizeTest) + support.run_unittest(CBufferSizeTest, PyBufferSizeTest) if __name__ == "__main__": test_main() diff --git a/Lib/test/test_file.py b/Lib/test/test_file.py index a134a89..4b0c759 100644 --- a/Lib/test/test_file.py +++ b/Lib/test/test_file.py @@ -1,13 +1,14 @@ +from __future__ import print_function + import sys import os import unittest -import itertools -import time -import threading from array import array from weakref import proxy -from test import test_support +import io +import _pyio as pyio + from test.test_support import TESTFN, findfile, run_unittest from UserList import UserList @@ -15,7 +16,7 @@ class AutoFileTests(unittest.TestCase): # file tests for which a test file is automatically set up def setUp(self): - self.f = open(TESTFN, 'wb') + self.f = self.open(TESTFN, 'wb') def tearDown(self): if self.f: @@ -25,7 +26,7 @@ class AutoFileTests(unittest.TestCase): def testWeakRefs(self): # verify weak references p = proxy(self.f) - p.write('teststring') + p.write(b'teststring') self.assertEquals(self.f.tell(), p.tell()) self.f.close() self.f = None @@ -34,35 +35,35 @@ class AutoFileTests(unittest.TestCase): def testAttributes(self): # verify expected attributes exist f = self.f - softspace = f.softspace f.name # merely shouldn't blow up f.mode # ditto f.closed # ditto - # verify softspace is writable - f.softspace = softspace # merely shouldn't blow up - - # verify the others aren't - for attr in 'name', 'mode', 'closed': - self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops') - def testReadinto(self): # verify readinto - self.f.write('12') + self.f.write(b'12') self.f.close() - a = array('c', 'x'*10) - self.f = open(TESTFN, 'rb') + a = array('b', b'x'*10) + self.f = self.open(TESTFN, 'rb') n = self.f.readinto(a) - self.assertEquals('12', a.tostring()[:n]) + self.assertEquals(b'12', a.tostring()[:n]) + + def testReadinto_text(self): + # verify readinto refuses text files + a = array('b', b'x'*10) + self.f.close() + self.f = self.open(TESTFN, 'r') + if hasattr(self.f, "readinto"): + self.assertRaises(TypeError, self.f.readinto, a) def testWritelinesUserList(self): # verify writelines with instance sequence - l = UserList(['1', '2']) + l = UserList([b'1', b'2']) self.f.writelines(l) self.f.close() - self.f = open(TESTFN, 'rb') + self.f = self.open(TESTFN, 'rb') buf = self.f.read() - self.assertEquals(buf, '12') + self.assertEquals(buf, b'12') def testWritelinesIntegers(self): # verify writelines with integers @@ -81,36 +82,43 @@ class AutoFileTests(unittest.TestCase): self.assertRaises(TypeError, self.f.writelines, [NonString(), NonString()]) - def testRepr(self): - # verify repr works - self.assert_(repr(self.f).startswith("<open file '" + TESTFN)) - def testErrors(self): f = self.f self.assertEquals(f.name, TESTFN) self.assert_(not f.isatty()) self.assert_(not f.closed) - self.assertRaises(TypeError, f.readinto, "") + if hasattr(f, "readinto"): + self.assertRaises((IOError, TypeError), f.readinto, "") f.close() self.assert_(f.closed) def testMethods(self): - methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto', - 'readline', 'readlines', 'seek', 'tell', 'truncate', - 'write', 'xreadlines', '__iter__'] - if sys.platform.startswith('atheos'): - methods.remove('truncate') + methods = [('fileno', ()), + ('flush', ()), + ('isatty', ()), + ('next', ()), + ('read', ()), + ('write', (b"",)), + ('readline', ()), + ('readlines', ()), + ('seek', (0,)), + ('tell', ()), + ('write', (b"",)), + ('writelines', ([],)), + ('__iter__', ()), + ] + if not sys.platform.startswith('atheos'): + methods.append(('truncate', ())) # __exit__ should close the file self.f.__exit__(None, None, None) self.assert_(self.f.closed) - for methodname in methods: + for methodname, args in methods: method = getattr(self.f, methodname) # should raise on closed file - self.assertRaises(ValueError, method) - self.assertRaises(ValueError, self.f.writelines, []) + self.assertRaises(ValueError, method, *args) # file is closed, __exit__ shouldn't do anything self.assertEquals(self.f.__exit__(None, None, None), None) @@ -123,70 +131,47 @@ class AutoFileTests(unittest.TestCase): def testReadWhenWriting(self): self.assertRaises(IOError, self.f.read) -class OtherFileTests(unittest.TestCase): +class CAutoFileTests(AutoFileTests): + open = io.open - def testOpenDir(self): - this_dir = os.path.dirname(__file__) - for mode in (None, "w"): - try: - if mode: - f = open(this_dir, mode) - else: - f = open(this_dir) - except IOError as e: - self.assertEqual(e.filename, this_dir) - else: - self.fail("opening a directory didn't raise an IOError") +class PyAutoFileTests(AutoFileTests): + open = staticmethod(pyio.open) + + +class OtherFileTests(unittest.TestCase): def testModeStrings(self): # check invalid mode strings for mode in ("", "aU", "wU+"): try: - f = open(TESTFN, mode) + f = self.open(TESTFN, mode) except ValueError: pass else: f.close() self.fail('%r is an invalid file mode' % mode) - # Some invalid modes fail on Windows, but pass on Unix - # Issue3965: avoid a crash on Windows when filename is unicode - for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')): - try: - f = open(name, "rr") - except (IOError, ValueError): - pass - else: - f.close() - def testStdin(self): # This causes the interpreter to exit on OSF1 v5.1. if sys.platform != 'osf1V5': - self.assertRaises(IOError, sys.stdin.seek, -1) + self.assertRaises((IOError, ValueError), sys.stdin.seek, -1) else: - print >>sys.__stdout__, ( + print(( ' Skipping sys.stdin.seek(-1), it may crash the interpreter.' - ' Test manually.') - self.assertRaises(IOError, sys.stdin.truncate) - - def testUnicodeOpen(self): - # verify repr works for unicode too - f = open(unicode(TESTFN), "w") - self.assert_(repr(f).startswith("<open file u'" + TESTFN)) - f.close() - os.unlink(TESTFN) + ' Test manually.'), file=sys.__stdout__) + self.assertRaises((IOError, ValueError), sys.stdin.truncate) def testBadModeArgument(self): # verify that we get a sensible error message for bad mode argument bad_mode = "qwerty" try: - f = open(TESTFN, bad_mode) - except ValueError, msg: - if msg[0] != 0: + f = self.open(TESTFN, bad_mode) + except ValueError as msg: + if msg.args[0] != 0: s = str(msg) if s.find(TESTFN) != -1 or s.find(bad_mode) == -1: self.fail("bad error message for invalid mode: %s" % s) - # if msg[0] == 0, we're probably on Windows where there may be + # if msg.args[0] == 0, we're probably on Windows where there may be # no obvious way to discover why open() failed. else: f.close() @@ -197,31 +182,32 @@ class OtherFileTests(unittest.TestCase): # misbehaviour especially with repeated close() calls for s in (-1, 0, 1, 512): try: - f = open(TESTFN, 'w', s) - f.write(str(s)) + f = self.open(TESTFN, 'wb', s) + f.write(str(s).encode("ascii")) f.close() f.close() - f = open(TESTFN, 'r', s) - d = int(f.read()) + f = self.open(TESTFN, 'rb', s) + d = int(f.read().decode("ascii")) f.close() f.close() - except IOError, msg: + except IOError as msg: self.fail('error setting buffer size %d: %s' % (s, str(msg))) self.assertEquals(d, s) def testTruncateOnWindows(self): + # SF bug <http://www.python.org/sf/801631> + # "file.truncate fault on windows" + os.unlink(TESTFN) + f = self.open(TESTFN, 'wb') - def bug801631(): - # SF bug <http://www.python.org/sf/801631> - # "file.truncate fault on windows" - f = open(TESTFN, 'wb') - f.write('12345678901') # 11 bytes + try: + f.write(b'12345678901') # 11 bytes f.close() - f = open(TESTFN,'rb+') + f = self.open(TESTFN,'rb+') data = f.read(5) - if data != '12345': + if data != b'12345': self.fail("Read on file opened for update failed %r" % data) if f.tell() != 5: self.fail("File pos after read wrong %d" % f.tell()) @@ -234,56 +220,42 @@ class OtherFileTests(unittest.TestCase): size = os.path.getsize(TESTFN) if size != 5: self.fail("File size after ftruncate wrong %d" % size) - - try: - bug801631() finally: + f.close() os.unlink(TESTFN) def testIteration(self): # Test the complex interaction when mixing file-iteration and the - # various read* methods. Ostensibly, the mixture could just be tested - # to work when it should work according to the Python language, - # instead of fail when it should fail according to the current CPython - # implementation. People don't always program Python the way they - # should, though, and the implemenation might change in subtle ways, - # so we explicitly test for errors, too; the test will just have to - # be updated when the implementation changes. + # various read* methods. dataoffset = 16384 - filler = "ham\n" + filler = b"ham\n" assert not dataoffset % len(filler), \ "dataoffset must be multiple of len(filler)" nchunks = dataoffset // len(filler) testlines = [ - "spam, spam and eggs\n", - "eggs, spam, ham and spam\n", - "saussages, spam, spam and eggs\n", - "spam, ham, spam and eggs\n", - "spam, spam, spam, spam, spam, ham, spam\n", - "wonderful spaaaaaam.\n" + b"spam, spam and eggs\n", + b"eggs, spam, ham and spam\n", + b"saussages, spam, spam and eggs\n", + b"spam, ham, spam and eggs\n", + b"spam, spam, spam, spam, spam, ham, spam\n", + b"wonderful spaaaaaam.\n" ] methods = [("readline", ()), ("read", ()), ("readlines", ()), - ("readinto", (array("c", " "*100),))] + ("readinto", (array("b", b" "*100),))] try: # Prepare the testfile - bag = open(TESTFN, "w") + bag = self.open(TESTFN, "wb") bag.write(filler * nchunks) bag.writelines(testlines) bag.close() # Test for appropriate errors mixing read* and iteration for methodname, args in methods: - f = open(TESTFN) - if f.next() != filler: + f = self.open(TESTFN, 'rb') + if next(f) != filler: self.fail, "Broken testfile" meth = getattr(f, methodname) - try: - meth(*args) - except ValueError: - pass - else: - self.fail("%s%r after next() didn't raise ValueError" % - (methodname, args)) + meth(*args) # This simply shouldn't fail f.close() # Test to see if harmless (by accident) mixing of read* and @@ -293,9 +265,9 @@ class OtherFileTests(unittest.TestCase): # ("h", "a", "m", "\n"), so 4096 lines of that should get us # exactly on the buffer boundary for any power-of-2 buffersize # between 4 and 16384 (inclusive). - f = open(TESTFN) + f = self.open(TESTFN, 'rb') for i in range(nchunks): - f.next() + next(f) testline = testlines.pop(0) try: line = f.readline() @@ -306,7 +278,7 @@ class OtherFileTests(unittest.TestCase): self.fail("readline() after next() with empty buffer " "failed. Got %r, expected %r" % (line, testline)) testline = testlines.pop(0) - buf = array("c", "\x00" * len(testline)) + buf = array("b", b"\x00" * len(testline)) try: f.readinto(buf) except ValueError: @@ -335,7 +307,7 @@ class OtherFileTests(unittest.TestCase): self.fail("readlines() after next() with empty buffer " "failed. Got %r, expected %r" % (line, testline)) # Reading after iteration hit EOF shouldn't hurt either - f = open(TESTFN) + f = self.open(TESTFN, 'rb') try: for line in f: pass @@ -351,222 +323,19 @@ class OtherFileTests(unittest.TestCase): finally: os.unlink(TESTFN) -class FileSubclassTests(unittest.TestCase): +class COtherFileTests(OtherFileTests): + open = io.open - def testExit(self): - # test that exiting with context calls subclass' close - class C(file): - def __init__(self, *args): - self.subclass_closed = False - file.__init__(self, *args) - def close(self): - self.subclass_closed = True - file.close(self) - - with C(TESTFN, 'w') as f: - pass - self.failUnless(f.subclass_closed) - - -class FileThreadingTests(unittest.TestCase): - # These tests check the ability to call various methods of file objects - # (including close()) concurrently without crashing the Python interpreter. - # See #815646, #595601 - - def setUp(self): - self.f = None - self.filename = TESTFN - with open(self.filename, "w") as f: - f.write("\n".join("0123456789")) - self._count_lock = threading.Lock() - self.close_count = 0 - self.close_success_count = 0 - - def tearDown(self): - if self.f: - try: - self.f.close() - except (EnvironmentError, ValueError): - pass - try: - os.remove(self.filename) - except EnvironmentError: - pass - - def _create_file(self): - self.f = open(self.filename, "w+") - - def _close_file(self): - with self._count_lock: - self.close_count += 1 - self.f.close() - with self._count_lock: - self.close_success_count += 1 - - def _close_and_reopen_file(self): - self._close_file() - # if close raises an exception thats fine, self.f remains valid so - # we don't need to reopen. - self._create_file() - - def _run_workers(self, func, nb_workers, duration=0.2): - with self._count_lock: - self.close_count = 0 - self.close_success_count = 0 - self.do_continue = True - threads = [] - try: - for i in range(nb_workers): - t = threading.Thread(target=func) - t.start() - threads.append(t) - for _ in xrange(100): - time.sleep(duration/100) - with self._count_lock: - if self.close_count-self.close_success_count > nb_workers+1: - if test_support.verbose: - print 'Q', - break - time.sleep(duration) - finally: - self.do_continue = False - for t in threads: - t.join() - - def _test_close_open_io(self, io_func, nb_workers=5): - def worker(): - self._create_file() - funcs = itertools.cycle(( - lambda: io_func(), - lambda: self._close_and_reopen_file(), - )) - for f in funcs: - if not self.do_continue: - break - try: - f() - except (IOError, ValueError): - pass - self._run_workers(worker, nb_workers) - if test_support.verbose: - # Useful verbose statistics when tuning this test to take - # less time to run but still ensuring that its still useful. - # - # the percent of close calls that raised an error - percent = 100. - 100.*self.close_success_count/self.close_count - print self.close_count, ('%.4f ' % percent), - - def test_close_open(self): - def io_func(): - pass - self._test_close_open_io(io_func) - - def test_close_open_flush(self): - def io_func(): - self.f.flush() - self._test_close_open_io(io_func) - - def test_close_open_iter(self): - def io_func(): - list(iter(self.f)) - self._test_close_open_io(io_func) - - def test_close_open_isatty(self): - def io_func(): - self.f.isatty() - self._test_close_open_io(io_func) - - def test_close_open_print(self): - def io_func(): - print >> self.f, '' - self._test_close_open_io(io_func) - - def test_close_open_read(self): - def io_func(): - self.f.read(0) - self._test_close_open_io(io_func) - - def test_close_open_readinto(self): - def io_func(): - a = array('c', 'xxxxx') - self.f.readinto(a) - self._test_close_open_io(io_func) - - def test_close_open_readline(self): - def io_func(): - self.f.readline() - self._test_close_open_io(io_func) - - def test_close_open_readlines(self): - def io_func(): - self.f.readlines() - self._test_close_open_io(io_func) - - def test_close_open_seek(self): - def io_func(): - self.f.seek(0, 0) - self._test_close_open_io(io_func) - - def test_close_open_tell(self): - def io_func(): - self.f.tell() - self._test_close_open_io(io_func) - - def test_close_open_truncate(self): - def io_func(): - self.f.truncate() - self._test_close_open_io(io_func) - - def test_close_open_write(self): - def io_func(): - self.f.write('') - self._test_close_open_io(io_func) - - def test_close_open_writelines(self): - def io_func(): - self.f.writelines('') - self._test_close_open_io(io_func) - - -class StdoutTests(unittest.TestCase): - - def test_move_stdout_on_write(self): - # Issue 3242: sys.stdout can be replaced (and freed) during a - # print statement; prevent a segfault in this case - save_stdout = sys.stdout - - class File: - def write(self, data): - if '\n' in data: - sys.stdout = save_stdout - - try: - sys.stdout = File() - print "some text" - finally: - sys.stdout = save_stdout - - def test_del_stdout_before_print(self): - # Issue 4597: 'print' with no argument wasn't reporting when - # sys.stdout was deleted. - save_stdout = sys.stdout - del sys.stdout - try: - print - except RuntimeError as e: - self.assertEquals(str(e), "lost sys.stdout") - else: - self.fail("Expected RuntimeError") - finally: - sys.stdout = save_stdout +class PyOtherFileTests(OtherFileTests): + open = staticmethod(pyio.open) def test_main(): # Historically, these tests have been sloppy about removing TESTFN. # So get rid of it no matter what. try: - run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests, - FileThreadingTests, StdoutTests) + run_unittest(CAutoFileTests, PyAutoFileTests, + COtherFileTests, PyOtherFileTests) finally: if os.path.exists(TESTFN): os.unlink(TESTFN) diff --git a/Lib/test/test_fileio.py b/Lib/test/test_fileio.py index 928fbec..e8d6362 100644 --- a/Lib/test/test_fileio.py +++ b/Lib/test/test_fileio.py @@ -1,23 +1,26 @@ # Adapted from test_file.py by Daniel Stutzbach -#from __future__ import unicode_literals + +from __future__ import unicode_literals import sys import os +import errno import unittest from array import array from weakref import proxy +from functools import wraps from test.test_support import (TESTFN, findfile, check_warnings, run_unittest, make_bad_fd) -from UserList import UserList +from test.test_support import py3k_bytes as bytes -import _fileio +from _io import FileIO as _FileIO class AutoFileTests(unittest.TestCase): # file tests for which a test file is automatically set up def setUp(self): - self.f = _fileio._FileIO(TESTFN, 'w') + self.f = _FileIO(TESTFN, 'w') def tearDown(self): if self.f: @@ -34,7 +37,7 @@ class AutoFileTests(unittest.TestCase): self.assertRaises(ReferenceError, getattr, p, 'tell') def testSeekTell(self): - self.f.write(bytes(bytearray(range(20)))) + self.f.write(bytes(range(20))) self.assertEquals(self.f.tell(), 20) self.f.seek(0) self.assertEquals(self.f.tell(), 0) @@ -61,17 +64,21 @@ class AutoFileTests(unittest.TestCase): def testReadinto(self): # verify readinto - self.f.write(bytes(bytearray([1, 2]))) + self.f.write(b"\x01\x02") self.f.close() - a = array('b', b'x'*10) - self.f = _fileio._FileIO(TESTFN, 'r') + a = array(b'b', b'x'*10) + self.f = _FileIO(TESTFN, 'r') n = self.f.readinto(a) - self.assertEquals(array('b', [1, 2]), a[:n]) + self.assertEquals(array(b'b', [1, 2]), a[:n]) def testRepr(self): - self.assertEquals(repr(self.f), - "_fileio._FileIO(%d, %s)" % (self.f.fileno(), - repr(self.f.mode))) + self.assertEquals(repr(self.f), "<_io.FileIO name=%r mode='%s'>" + % (self.f.name, self.f.mode)) + del self.f.name + self.assertEquals(repr(self.f), "<_io.FileIO fd=%r mode='%s'>" + % (self.f.fileno(), self.f.mode)) + self.f.close() + self.assertEquals(repr(self.f), "<_io.FileIO [closed]>") def testErrors(self): f = self.f @@ -81,7 +88,7 @@ class AutoFileTests(unittest.TestCase): self.assertRaises(ValueError, f.read, 10) # Open for reading f.close() self.assert_(f.closed) - f = _fileio._FileIO(TESTFN, 'r') + f = _FileIO(TESTFN, 'r') self.assertRaises(TypeError, f.readinto, "") self.assert_(not f.closed) f.close() @@ -107,31 +114,131 @@ class AutoFileTests(unittest.TestCase): # Windows always returns "[Errno 13]: Permission denied # Unix calls dircheck() and returns "[Errno 21]: Is a directory" try: - _fileio._FileIO('.', 'r') + _FileIO('.', 'r') except IOError as e: self.assertNotEqual(e.errno, 0) self.assertEqual(e.filename, ".") else: self.fail("Should have raised IOError") + #A set of functions testing that we get expected behaviour if someone has + #manually closed the internal file descriptor. First, a decorator: + def ClosedFD(func): + @wraps(func) + def wrapper(self): + #forcibly close the fd before invoking the problem function + f = self.f + os.close(f.fileno()) + try: + func(self, f) + finally: + try: + self.f.close() + except IOError: + pass + return wrapper + + def ClosedFDRaises(func): + @wraps(func) + def wrapper(self): + #forcibly close the fd before invoking the problem function + f = self.f + os.close(f.fileno()) + try: + func(self, f) + except IOError as e: + self.assertEqual(e.errno, errno.EBADF) + else: + self.fail("Should have raised IOError") + finally: + try: + self.f.close() + except IOError: + pass + return wrapper + + @ClosedFDRaises + def testErrnoOnClose(self, f): + f.close() + + @ClosedFDRaises + def testErrnoOnClosedWrite(self, f): + f.write('a') + + @ClosedFDRaises + def testErrnoOnClosedSeek(self, f): + f.seek(0) + + @ClosedFDRaises + def testErrnoOnClosedTell(self, f): + f.tell() + + @ClosedFDRaises + def testErrnoOnClosedTruncate(self, f): + f.truncate(0) + + @ClosedFD + def testErrnoOnClosedSeekable(self, f): + f.seekable() + + @ClosedFD + def testErrnoOnClosedReadable(self, f): + f.readable() + + @ClosedFD + def testErrnoOnClosedWritable(self, f): + f.writable() + + @ClosedFD + def testErrnoOnClosedFileno(self, f): + f.fileno() + + @ClosedFD + def testErrnoOnClosedIsatty(self, f): + self.assertEqual(f.isatty(), False) + + def ReopenForRead(self): + try: + self.f.close() + except IOError: + pass + self.f = _FileIO(TESTFN, 'r') + os.close(self.f.fileno()) + return self.f + + @ClosedFDRaises + def testErrnoOnClosedRead(self, f): + f = self.ReopenForRead() + f.read(1) + + @ClosedFDRaises + def testErrnoOnClosedReadall(self, f): + f = self.ReopenForRead() + f.readall() + + @ClosedFDRaises + def testErrnoOnClosedReadinto(self, f): + f = self.ReopenForRead() + a = array(b'b', b'x'*10) + f.readinto(a) class OtherFileTests(unittest.TestCase): def testAbles(self): try: - f = _fileio._FileIO(TESTFN, "w") + f = _FileIO(TESTFN, "w") self.assertEquals(f.readable(), False) self.assertEquals(f.writable(), True) self.assertEquals(f.seekable(), True) f.close() - f = _fileio._FileIO(TESTFN, "r") + f = _FileIO(TESTFN, "r") self.assertEquals(f.readable(), True) self.assertEquals(f.writable(), False) self.assertEquals(f.seekable(), True) f.close() - f = _fileio._FileIO(TESTFN, "a+") + f = _FileIO(TESTFN, "a+") self.assertEquals(f.readable(), True) self.assertEquals(f.writable(), True) self.assertEquals(f.seekable(), True) @@ -140,14 +247,14 @@ class OtherFileTests(unittest.TestCase): if sys.platform != "win32": try: - f = _fileio._FileIO("/dev/tty", "a") + f = _FileIO("/dev/tty", "a") except EnvironmentError: # When run in a cron job there just aren't any # ttys, so skip the test. This also handles other # OS'es that don't support /dev/tty. pass else: - f = _fileio._FileIO("/dev/tty", "a") + f = _FileIO("/dev/tty", "a") self.assertEquals(f.readable(), False) self.assertEquals(f.writable(), True) if sys.platform != "darwin" and \ @@ -164,7 +271,7 @@ class OtherFileTests(unittest.TestCase): # check invalid mode strings for mode in ("", "aU", "wU+", "rw", "rt"): try: - f = _fileio._FileIO(TESTFN, mode) + f = _FileIO(TESTFN, mode) except ValueError: pass else: @@ -173,19 +280,35 @@ class OtherFileTests(unittest.TestCase): def testUnicodeOpen(self): # verify repr works for unicode too - f = _fileio._FileIO(str(TESTFN), "w") + f = _FileIO(str(TESTFN), "w") f.close() os.unlink(TESTFN) + def testBytesOpen(self): + # Opening a bytes filename + try: + fn = TESTFN.encode("ascii") + except UnicodeEncodeError: + # Skip test + return + f = _FileIO(fn, "w") + try: + f.write(b"abc") + f.close() + with open(TESTFN, "rb") as f: + self.assertEquals(f.read(), b"abc") + finally: + os.unlink(TESTFN) + def testInvalidFd(self): - self.assertRaises(ValueError, _fileio._FileIO, -10) - self.assertRaises(OSError, _fileio._FileIO, make_bad_fd()) + self.assertRaises(ValueError, _FileIO, -10) + self.assertRaises(OSError, _FileIO, make_bad_fd()) def testBadModeArgument(self): # verify that we get a sensible error message for bad mode argument bad_mode = "qwerty" try: - f = _fileio._FileIO(TESTFN, bad_mode) + f = _FileIO(TESTFN, bad_mode) except ValueError as msg: if msg.args[0] != 0: s = str(msg) @@ -201,13 +324,13 @@ class OtherFileTests(unittest.TestCase): def bug801631(): # SF bug <http://www.python.org/sf/801631> # "file.truncate fault on windows" - f = _fileio._FileIO(TESTFN, 'w') - f.write(bytes(bytearray(range(11)))) + f = _FileIO(TESTFN, 'w') + f.write(bytes(range(11))) f.close() - f = _fileio._FileIO(TESTFN,'r+') + f = _FileIO(TESTFN,'r+') data = f.read(5) - if data != bytes(bytearray(range(5))): + if data != bytes(range(5)): self.fail("Read on file opened for update failed %r" % data) if f.tell() != 5: self.fail("File pos after read wrong %d" % f.tell()) @@ -245,14 +368,14 @@ class OtherFileTests(unittest.TestCase): pass def testInvalidInit(self): - self.assertRaises(TypeError, _fileio._FileIO, "1", 0, 0) + self.assertRaises(TypeError, _FileIO, "1", 0, 0) def testWarnings(self): with check_warnings() as w: self.assertEqual(w.warnings, []) - self.assertRaises(TypeError, _fileio._FileIO, []) + self.assertRaises(TypeError, _FileIO, []) self.assertEqual(w.warnings, []) - self.assertRaises(ValueError, _fileio._FileIO, "/some/invalid/name", "rt") + self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt") self.assertEqual(w.warnings, []) diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 02533c9..62b6142 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -1,4 +1,24 @@ -"""Unit tests for io.py.""" +"""Unit tests for the io module.""" + +# Tests of io are scattered over the test suite: +# * test_bufio - tests file buffering +# * test_memoryio - tests BytesIO and StringIO +# * test_fileio - tests FileIO +# * test_file - tests the file interface +# * test_io - tests everything else in the io module +# * test_univnewlines - tests universal newline support +# * test_largefile - tests operations on a file greater than 2**32 bytes +# (only enabled with -ulargefile) + +################################################################################ +# ATTENTION TEST WRITERS!!! +################################################################################ +# When writing tests for io, it's important to test both the C and Python +# implementations. This is usually done by writing a base test that refers to +# the type it is testing as a attribute. Then it provides custom subclasses to +# test both implementations. This file has lots of examples. +################################################################################ + from __future__ import print_function from __future__ import unicode_literals @@ -9,27 +29,43 @@ import array import threading import random import unittest -from itertools import chain, cycle -from test import test_support +import warnings +import weakref +import gc +import abc +from itertools import chain, cycle, count +from collections import deque +from test import test_support as support import codecs -import io # The module under test +import io # C implementation of io +import _pyio as pyio # Python implementation of io + +__metaclass__ = type +bytes = support.py3k_bytes + +def _default_chunk_size(): + """Get the default TextIOWrapper chunk size""" + with io.open(__file__, "r", encoding="latin1") as f: + return f._CHUNK_SIZE -class MockRawIO(io.RawIOBase): +class MockRawIO: def __init__(self, read_stack=()): self._read_stack = list(read_stack) self._write_stack = [] + self._reads = 0 def read(self, n=None): + self._reads += 1 try: return self._read_stack.pop(0) except: return b"" def write(self, b): - self._write_stack.append(b[:]) + self._write_stack.append(bytes(b)) return len(b) def writable(self): @@ -45,46 +81,156 @@ class MockRawIO(io.RawIOBase): return True def seek(self, pos, whence): - pass + return 0 # wrong but we gotta return something def tell(self): - return 42 + return 0 # same comment as above + + def readinto(self, buf): + self._reads += 1 + max_len = len(buf) + try: + data = self._read_stack[0] + except IndexError: + return 0 + if data is None: + del self._read_stack[0] + return None + n = len(data) + if len(data) <= max_len: + del self._read_stack[0] + buf[:n] = data + return n + else: + buf[:] = data[:max_len] + self._read_stack[0] = data[max_len:] + return max_len + def truncate(self, pos=None): + return pos -class MockFileIO(io.BytesIO): +class CMockRawIO(MockRawIO, io.RawIOBase): + pass + +class PyMockRawIO(MockRawIO, pyio.RawIOBase): + pass + + +class MisbehavedRawIO(MockRawIO): + def write(self, b): + return MockRawIO.write(self, b) * 2 + + def read(self, n=None): + return MockRawIO.read(self, n) * 2 + + def seek(self, pos, whence): + return -123 + + def tell(self): + return -456 + + def readinto(self, buf): + MockRawIO.readinto(self, buf) + return len(buf) * 5 + +class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): + pass + +class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): + pass + + +class CloseFailureIO(MockRawIO): + closed = 0 + + def close(self): + if not self.closed: + self.closed = 1 + raise IOError + +class CCloseFailureIO(CloseFailureIO, io.RawIOBase): + pass + +class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): + pass + + +class MockFileIO: def __init__(self, data): self.read_history = [] - io.BytesIO.__init__(self, data) + super(MockFileIO, self).__init__(data) def read(self, n=None): - res = io.BytesIO.read(self, n) + res = super(MockFileIO, self).read(n) self.read_history.append(None if res is None else len(res)) return res + def readinto(self, b): + res = super(MockFileIO, self).readinto(b) + self.read_history.append(res) + return res + +class CMockFileIO(MockFileIO, io.BytesIO): + pass + +class PyMockFileIO(MockFileIO, pyio.BytesIO): + pass + -class MockNonBlockWriterIO(io.RawIOBase): +class MockNonBlockWriterIO: - def __init__(self, blocking_script): - self._blocking_script = list(blocking_script) + def __init__(self): self._write_stack = [] + self._blocker_char = None - def write(self, b): - self._write_stack.append(b[:]) - n = self._blocking_script.pop(0) - if (n < 0): - raise io.BlockingIOError(0, "test blocking", -n) - else: - return n + def pop_written(self): + s = b"".join(self._write_stack) + self._write_stack[:] = [] + return s + + def block_on(self, char): + """Block when a given char is encountered.""" + self._blocker_char = char + + def readable(self): + return True + + def seekable(self): + return True def writable(self): return True + def write(self, b): + b = bytes(b) + n = -1 + if self._blocker_char: + try: + n = b.index(self._blocker_char) + except ValueError: + pass + else: + self._blocker_char = None + self._write_stack.append(b[:n]) + raise self.BlockingIOError(0, "test blocking", n) + self._write_stack.append(b) + return len(b) + +class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): + BlockingIOError = io.BlockingIOError + +class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): + BlockingIOError = pyio.BlockingIOError + class IOTest(unittest.TestCase): + def setUp(self): + support.unlink(support.TESTFN) + def tearDown(self): - test_support.unlink(test_support.TESTFN) + support.unlink(support.TESTFN) def write_ops(self, f): self.assertEqual(f.write(b"blah."), 5) @@ -149,60 +295,71 @@ class IOTest(unittest.TestCase): self.assertEqual(f.seek(-1, 2), self.LARGE) self.assertEqual(f.read(2), b"x") + def test_invalid_operations(self): + # Try writing on a file opened in read mode and vice-versa. + for mode in ("w", "wb"): + with open(support.TESTFN, mode) as fp: + self.assertRaises(IOError, fp.read) + self.assertRaises(IOError, fp.readline) + with open(support.TESTFN, "rb") as fp: + self.assertRaises(IOError, fp.write, b"blah") + self.assertRaises(IOError, fp.writelines, [b"blah\n"]) + with open(support.TESTFN, "r") as fp: + self.assertRaises(IOError, fp.write, "blah") + self.assertRaises(IOError, fp.writelines, ["blah\n"]) + def test_raw_file_io(self): - f = io.open(test_support.TESTFN, "wb", buffering=0) - self.assertEqual(f.readable(), False) - self.assertEqual(f.writable(), True) - self.assertEqual(f.seekable(), True) - self.write_ops(f) - f.close() - f = io.open(test_support.TESTFN, "rb", buffering=0) - self.assertEqual(f.readable(), True) - self.assertEqual(f.writable(), False) - self.assertEqual(f.seekable(), True) - self.read_ops(f) - f.close() + with self.open(support.TESTFN, "wb", buffering=0) as f: + self.assertEqual(f.readable(), False) + self.assertEqual(f.writable(), True) + self.assertEqual(f.seekable(), True) + self.write_ops(f) + with self.open(support.TESTFN, "rb", buffering=0) as f: + self.assertEqual(f.readable(), True) + self.assertEqual(f.writable(), False) + self.assertEqual(f.seekable(), True) + self.read_ops(f) def test_buffered_file_io(self): - f = io.open(test_support.TESTFN, "wb") - self.assertEqual(f.readable(), False) - self.assertEqual(f.writable(), True) - self.assertEqual(f.seekable(), True) - self.write_ops(f) - f.close() - f = io.open(test_support.TESTFN, "rb") - self.assertEqual(f.readable(), True) - self.assertEqual(f.writable(), False) - self.assertEqual(f.seekable(), True) - self.read_ops(f, True) - f.close() + with self.open(support.TESTFN, "wb") as f: + self.assertEqual(f.readable(), False) + self.assertEqual(f.writable(), True) + self.assertEqual(f.seekable(), True) + self.write_ops(f) + with self.open(support.TESTFN, "rb") as f: + self.assertEqual(f.readable(), True) + self.assertEqual(f.writable(), False) + self.assertEqual(f.seekable(), True) + self.read_ops(f, True) def test_readline(self): - f = io.open(test_support.TESTFN, "wb") - f.write(b"abc\ndef\nxyzzy\nfoo") - f.close() - f = io.open(test_support.TESTFN, "rb") - self.assertEqual(f.readline(), b"abc\n") - self.assertEqual(f.readline(10), b"def\n") - self.assertEqual(f.readline(2), b"xy") - self.assertEqual(f.readline(4), b"zzy\n") - self.assertEqual(f.readline(), b"foo") - f.close() + with self.open(support.TESTFN, "wb") as f: + f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") + with self.open(support.TESTFN, "rb") as f: + self.assertEqual(f.readline(), b"abc\n") + self.assertEqual(f.readline(10), b"def\n") + self.assertEqual(f.readline(2), b"xy") + self.assertEqual(f.readline(4), b"zzy\n") + self.assertEqual(f.readline(), b"foo\x00bar\n") + self.assertEqual(f.readline(), b"another line") + self.assertRaises(TypeError, f.readline, 5.3) + with self.open(support.TESTFN, "r") as f: + self.assertRaises(TypeError, f.readline, 5.3) def test_raw_bytes_io(self): - f = io.BytesIO() + f = self.BytesIO() self.write_ops(f) data = f.getvalue() self.assertEqual(data, b"hello world\n") - f = io.BytesIO(data) + f = self.BytesIO(data) self.read_ops(f, True) def test_large_file_ops(self): # On Windows and Mac OSX this test comsumes large resources; It takes # a long time to build the >2GB file and takes >2GB of disk space # therefore the resource must be enabled to run this test. - if sys.platform[:3] in ('win', 'os2') or sys.platform == 'darwin': - if not test_support.is_resource_enabled("largefile"): + if sys.platform[:3] == 'win' or sys.platform == 'darwin': + if not support.is_resource_enabled("largefile"): print("\nTesting large file ops skipped on %s." % sys.platform, file=sys.stderr) print("It requires %d bytes and a long time." % self.LARGE, @@ -210,22 +367,20 @@ class IOTest(unittest.TestCase): print("Use 'regrtest.py -u largefile test_io' to run it.", file=sys.stderr) return - f = io.open(test_support.TESTFN, "w+b", 0) - self.large_file_ops(f) - f.close() - f = io.open(test_support.TESTFN, "w+b") - self.large_file_ops(f) - f.close() + with self.open(support.TESTFN, "w+b", 0) as f: + self.large_file_ops(f) + with self.open(support.TESTFN, "w+b") as f: + self.large_file_ops(f) def test_with_open(self): for bufsize in (0, 1, 100): f = None - with open(test_support.TESTFN, "wb", bufsize) as f: + with open(support.TESTFN, "wb", bufsize) as f: f.write(b"xxx") self.assertEqual(f.closed, True) f = None try: - with open(test_support.TESTFN, "wb", bufsize) as f: + with open(support.TESTFN, "wb", bufsize) as f: 1/0 except ZeroDivisionError: self.assertEqual(f.closed, True) @@ -234,60 +389,105 @@ class IOTest(unittest.TestCase): # issue 5008 def test_append_mode_tell(self): - with io.open(test_support.TESTFN, "wb") as f: + with self.open(support.TESTFN, "wb") as f: f.write(b"xxx") - with io.open(test_support.TESTFN, "ab", buffering=0) as f: + with self.open(support.TESTFN, "ab", buffering=0) as f: self.assertEqual(f.tell(), 3) - with io.open(test_support.TESTFN, "ab") as f: + with self.open(support.TESTFN, "ab") as f: self.assertEqual(f.tell(), 3) - with io.open(test_support.TESTFN, "a") as f: + with self.open(support.TESTFN, "a") as f: self.assert_(f.tell() > 0) def test_destructor(self): record = [] - class MyFileIO(io.FileIO): + class MyFileIO(self.FileIO): def __del__(self): record.append(1) - io.FileIO.__del__(self) + try: + f = super(MyFileIO, self).__del__ + except AttributeError: + pass + else: + f() def close(self): record.append(2) - io.FileIO.close(self) + super(MyFileIO, self).close() def flush(self): record.append(3) - io.FileIO.flush(self) - f = MyFileIO(test_support.TESTFN, "w") - f.write("xxx") + super(MyFileIO, self).flush() + f = MyFileIO(support.TESTFN, "wb") + f.write(b"xxx") + del f + support.gc_collect() + self.assertEqual(record, [1, 2, 3]) + with open(support.TESTFN, "rb") as f: + self.assertEqual(f.read(), b"xxx") + + def _check_base_destructor(self, base): + record = [] + class MyIO(base): + def __init__(self): + # This exercises the availability of attributes on object + # destruction. + # (in the C version, close() is called by the tp_dealloc + # function, not by __del__) + self.on_del = 1 + self.on_close = 2 + self.on_flush = 3 + def __del__(self): + record.append(self.on_del) + try: + f = super(MyIO, self).__del__ + except AttributeError: + pass + else: + f() + def close(self): + record.append(self.on_close) + super(MyIO, self).close() + def flush(self): + record.append(self.on_flush) + super(MyIO, self).flush() + f = MyIO() del f + support.gc_collect() self.assertEqual(record, [1, 2, 3]) + def test_IOBase_destructor(self): + self._check_base_destructor(self.IOBase) + + def test_RawIOBase_destructor(self): + self._check_base_destructor(self.RawIOBase) + + def test_BufferedIOBase_destructor(self): + self._check_base_destructor(self.BufferedIOBase) + + def test_TextIOBase_destructor(self): + self._check_base_destructor(self.TextIOBase) + def test_close_flushes(self): - f = io.open(test_support.TESTFN, "wb") - f.write(b"xxx") - f.close() - f = io.open(test_support.TESTFN, "rb") - self.assertEqual(f.read(), b"xxx") - f.close() + with self.open(support.TESTFN, "wb") as f: + f.write(b"xxx") + with self.open(support.TESTFN, "rb") as f: + self.assertEqual(f.read(), b"xxx") - def XXXtest_array_writes(self): - # XXX memory view not available yet - a = array.array('i', range(10)) - n = len(memoryview(a)) - f = io.open(test_support.TESTFN, "wb", 0) - self.assertEqual(f.write(a), n) - f.close() - f = io.open(test_support.TESTFN, "wb") - self.assertEqual(f.write(a), n) - f.close() + def test_array_writes(self): + a = array.array(b'i', range(10)) + n = len(a.tostring()) + with self.open(support.TESTFN, "wb", 0) as f: + self.assertEqual(f.write(a), n) + with self.open(support.TESTFN, "wb") as f: + self.assertEqual(f.write(a), n) def test_closefd(self): - self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w', + self.assertRaises(ValueError, self.open, support.TESTFN, 'w', closefd=False) - def testReadClosed(self): - with io.open(test_support.TESTFN, "w") as f: + def test_read_closed(self): + with self.open(support.TESTFN, "w") as f: f.write("egg\n") - with io.open(test_support.TESTFN, "r") as f: - file = io.open(f.fileno(), "r", closefd=False) + with self.open(support.TESTFN, "r") as f: + file = self.open(f.fileno(), "r", closefd=False) self.assertEqual(file.read(), "egg\n") file.seek(0) file.close() @@ -295,86 +495,203 @@ class IOTest(unittest.TestCase): def test_no_closefd_with_filename(self): # can't use closefd in combination with a file name - self.assertRaises(ValueError, - io.open, test_support.TESTFN, "r", closefd=False) + self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False) def test_closefd_attr(self): - with io.open(test_support.TESTFN, "wb") as f: + with self.open(support.TESTFN, "wb") as f: f.write(b"egg\n") - with io.open(test_support.TESTFN, "r") as f: + with self.open(support.TESTFN, "r") as f: self.assertEqual(f.buffer.raw.closefd, True) - file = io.open(f.fileno(), "r", closefd=False) + file = self.open(f.fileno(), "r", closefd=False) self.assertEqual(file.buffer.raw.closefd, False) + def test_garbage_collection(self): + # FileIO objects are collected, and collecting them flushes + # all data to disk. + f = self.FileIO(support.TESTFN, "wb") + f.write(b"abcxxx") + f.f = f + wr = weakref.ref(f) + del f + support.gc_collect() + self.assert_(wr() is None, wr) + with open(support.TESTFN, "rb") as f: + self.assertEqual(f.read(), b"abcxxx") + + def test_unbounded_file(self): + # Issue #1174606: reading from an unbounded stream such as /dev/zero. + zero = "/dev/zero" + if not os.path.exists(zero): + self.skipTest("{0} does not exist".format(zero)) + if sys.maxsize > 0x7FFFFFFF: + self.skipTest("test can only run in a 32-bit address space") + if support.real_max_memuse < support._2G: + self.skipTest("test requires at least 2GB of memory") + with open(zero, "rb", buffering=0) as f: + self.assertRaises(OverflowError, f.read) + with open(zero, "rb") as f: + self.assertRaises(OverflowError, f.read) + with open(zero, "r") as f: + self.assertRaises(OverflowError, f.read) + +class CIOTest(IOTest): + pass + +class PyIOTest(IOTest): + test_array_writes = unittest.skip( + "len(array.array) returns number of elements rather than bytelength" + )(IOTest.test_array_writes) + + +class CommonBufferedTests: + # Tests common to BufferedReader, BufferedWriter and BufferedRandom + + def test_detach(self): + raw = self.MockRawIO() + buf = self.tp(raw) + self.assertIs(buf.detach(), raw) + self.assertRaises(ValueError, buf.detach) + + def test_fileno(self): + rawio = self.MockRawIO() + bufio = self.tp(rawio) -class MemorySeekTestMixin: - - def testInit(self): - buf = self.buftype("1234567890") - bytesIo = self.ioclass(buf) - - def testRead(self): - buf = self.buftype("1234567890") - bytesIo = self.ioclass(buf) - - self.assertEquals(buf[:1], bytesIo.read(1)) - self.assertEquals(buf[1:5], bytesIo.read(4)) - self.assertEquals(buf[5:], bytesIo.read(900)) - self.assertEquals(self.EOF, bytesIo.read()) - - def testReadNoArgs(self): - buf = self.buftype("1234567890") - bytesIo = self.ioclass(buf) - - self.assertEquals(buf, bytesIo.read()) - self.assertEquals(self.EOF, bytesIo.read()) - - def testSeek(self): - buf = self.buftype("1234567890") - bytesIo = self.ioclass(buf) - - bytesIo.read(5) - bytesIo.seek(0) - self.assertEquals(buf, bytesIo.read()) - - bytesIo.seek(3) - self.assertEquals(buf[3:], bytesIo.read()) - self.assertRaises(TypeError, bytesIo.seek, 0.0) - - def testTell(self): - buf = self.buftype("1234567890") - bytesIo = self.ioclass(buf) - - self.assertEquals(0, bytesIo.tell()) - bytesIo.seek(5) - self.assertEquals(5, bytesIo.tell()) - bytesIo.seek(10000) - self.assertEquals(10000, bytesIo.tell()) - - -class BytesIOTest(MemorySeekTestMixin, unittest.TestCase): - @staticmethod - def buftype(s): - return s.encode("utf-8") - ioclass = io.BytesIO - EOF = b"" - - -class StringIOTest(MemorySeekTestMixin, unittest.TestCase): - buftype = str - ioclass = io.StringIO - EOF = "" - + self.assertEquals(42, bufio.fileno()) -class BufferedReaderTest(unittest.TestCase): + def test_no_fileno(self): + # XXX will we always have fileno() function? If so, kill + # this test. Else, write it. + pass - def testRead(self): - rawio = MockRawIO((b"abc", b"d", b"efg")) - bufio = io.BufferedReader(rawio) + def test_invalid_args(self): + rawio = self.MockRawIO() + bufio = self.tp(rawio) + # Invalid whence + self.assertRaises(ValueError, bufio.seek, 0, -1) + self.assertRaises(ValueError, bufio.seek, 0, 3) + def test_override_destructor(self): + tp = self.tp + record = [] + class MyBufferedIO(tp): + def __del__(self): + record.append(1) + try: + f = super(MyBufferedIO, self).__del__ + except AttributeError: + pass + else: + f() + def close(self): + record.append(2) + super(MyBufferedIO, self).close() + def flush(self): + record.append(3) + super(MyBufferedIO, self).flush() + rawio = self.MockRawIO() + bufio = MyBufferedIO(rawio) + writable = bufio.writable() + del bufio + support.gc_collect() + if writable: + self.assertEqual(record, [1, 2, 3]) + else: + self.assertEqual(record, [1, 2]) + + def test_context_manager(self): + # Test usability as a context manager + rawio = self.MockRawIO() + bufio = self.tp(rawio) + def _with(): + with bufio: + pass + _with() + # bufio should now be closed, and using it a second time should raise + # a ValueError. + self.assertRaises(ValueError, _with) + + def test_error_through_destructor(self): + # Test that the exception state is not modified by a destructor, + # even if close() fails. + rawio = self.CloseFailureIO() + def f(): + self.tp(rawio).xyzzy + with support.captured_output("stderr") as s: + self.assertRaises(AttributeError, f) + s = s.getvalue().strip() + if s: + # The destructor *may* have printed an unraisable error, check it + self.assertEqual(len(s.splitlines()), 1) + self.assert_(s.startswith("Exception IOError: "), s) + self.assert_(s.endswith(" ignored"), s) + + def test_repr(self): + raw = self.MockRawIO() + b = self.tp(raw) + clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__) + self.assertEqual(repr(b), "<%s>" % clsname) + raw.name = "dummy" + self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname) + raw.name = b"dummy" + self.assertEqual(repr(b), "<%s name='dummy'>" % clsname) + + +class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): + read_mode = "rb" + + def test_constructor(self): + rawio = self.MockRawIO([b"abc"]) + bufio = self.tp(rawio) + bufio.__init__(rawio) + bufio.__init__(rawio, buffer_size=1024) + bufio.__init__(rawio, buffer_size=16) + self.assertEquals(b"abc", bufio.read()) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) + rawio = self.MockRawIO([b"abc"]) + bufio.__init__(rawio) + self.assertEquals(b"abc", bufio.read()) + + def test_read(self): + rawio = self.MockRawIO((b"abc", b"d", b"efg")) + bufio = self.tp(rawio) self.assertEquals(b"abcdef", bufio.read(6)) - - def testBuffering(self): + # Invalid args + self.assertRaises(ValueError, bufio.read, -2) + + def test_read1(self): + rawio = self.MockRawIO((b"abc", b"d", b"efg")) + bufio = self.tp(rawio) + self.assertEquals(b"a", bufio.read(1)) + self.assertEquals(b"b", bufio.read1(1)) + self.assertEquals(rawio._reads, 1) + self.assertEquals(b"c", bufio.read1(100)) + self.assertEquals(rawio._reads, 1) + self.assertEquals(b"d", bufio.read1(100)) + self.assertEquals(rawio._reads, 2) + self.assertEquals(b"efg", bufio.read1(100)) + self.assertEquals(rawio._reads, 3) + self.assertEquals(b"", bufio.read1(100)) + # Invalid args + self.assertRaises(ValueError, bufio.read1, -1) + + def test_readinto(self): + rawio = self.MockRawIO((b"abc", b"d", b"efg")) + bufio = self.tp(rawio) + b = bytearray(2) + self.assertEquals(bufio.readinto(b), 2) + self.assertEquals(b, b"ab") + self.assertEquals(bufio.readinto(b), 2) + self.assertEquals(b, b"cd") + self.assertEquals(bufio.readinto(b), 2) + self.assertEquals(b, b"ef") + self.assertEquals(bufio.readinto(b), 1) + self.assertEquals(b, b"gf") + self.assertEquals(bufio.readinto(b), 0) + self.assertEquals(b, b"gf") + + def test_buffering(self): data = b"abcdefghi" dlen = len(data) @@ -385,61 +702,52 @@ class BufferedReaderTest(unittest.TestCase): ] for bufsize, buf_read_sizes, raw_read_sizes in tests: - rawio = MockFileIO(data) - bufio = io.BufferedReader(rawio, buffer_size=bufsize) + rawio = self.MockFileIO(data) + bufio = self.tp(rawio, buffer_size=bufsize) pos = 0 for nbytes in buf_read_sizes: self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes]) pos += nbytes + # this is mildly implementation-dependent self.assertEquals(rawio.read_history, raw_read_sizes) - def testReadNonBlocking(self): + def test_read_non_blocking(self): # Inject some None's in there to simulate EWOULDBLOCK - rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None)) - bufio = io.BufferedReader(rawio) + rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None)) + bufio = self.tp(rawio) self.assertEquals(b"abcd", bufio.read(6)) self.assertEquals(b"e", bufio.read(1)) self.assertEquals(b"fg", bufio.read()) + self.assertEquals(b"", bufio.peek(1)) self.assert_(None is bufio.read()) self.assertEquals(b"", bufio.read()) - def testReadToEof(self): - rawio = MockRawIO((b"abc", b"d", b"efg")) - bufio = io.BufferedReader(rawio) + def test_read_past_eof(self): + rawio = self.MockRawIO((b"abc", b"d", b"efg")) + bufio = self.tp(rawio) self.assertEquals(b"abcdefg", bufio.read(9000)) - def testReadNoArgs(self): - rawio = MockRawIO((b"abc", b"d", b"efg")) - bufio = io.BufferedReader(rawio) + def test_read_all(self): + rawio = self.MockRawIO((b"abc", b"d", b"efg")) + bufio = self.tp(rawio) self.assertEquals(b"abcdefg", bufio.read()) - def testFileno(self): - rawio = MockRawIO((b"abc", b"d", b"efg")) - bufio = io.BufferedReader(rawio) - - self.assertEquals(42, bufio.fileno()) - - def testFilenoNoFileno(self): - # XXX will we always have fileno() function? If so, kill - # this test. Else, write it. - pass - - def testThreads(self): + def test_threads(self): try: # Write out many bytes with exactly the same number of 0's, # 1's... 255's. This will help us check that concurrent reading # doesn't duplicate or forget contents. N = 1000 - l = range(256) * N + l = list(range(256)) * N random.shuffle(l) s = bytes(bytearray(l)) - with io.open(test_support.TESTFN, "wb") as f: + with io.open(support.TESTFN, "wb") as f: f.write(s) - with io.open(test_support.TESTFN, "rb", buffering=0) as raw: - bufio = io.BufferedReader(raw, 8) + with io.open(support.TESTFN, self.read_mode, buffering=0) as raw: + bufio = self.tp(raw, 8) errors = [] results = [] def f(): @@ -467,82 +775,242 @@ class BufferedReaderTest(unittest.TestCase): c = bytes(bytearray([i])) self.assertEqual(s.count(c), N) finally: - test_support.unlink(test_support.TESTFN) - - + support.unlink(support.TESTFN) + + def test_misbehaved_io(self): + rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) + bufio = self.tp(rawio) + self.assertRaises(IOError, bufio.seek, 0) + self.assertRaises(IOError, bufio.tell) + +class CBufferedReaderTest(BufferedReaderTest): + tp = io.BufferedReader + + def test_constructor(self): + BufferedReaderTest.test_constructor(self) + # The allocation can succeed on 32-bit builds, e.g. with more + # than 2GB RAM and a 64-bit kernel. + if sys.maxsize > 0x7FFFFFFF: + rawio = self.MockRawIO() + bufio = self.tp(rawio) + self.assertRaises((OverflowError, MemoryError, ValueError), + bufio.__init__, rawio, sys.maxsize) + + def test_initialization(self): + rawio = self.MockRawIO([b"abc"]) + bufio = self.tp(rawio) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) + self.assertRaises(ValueError, bufio.read) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) + self.assertRaises(ValueError, bufio.read) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) + self.assertRaises(ValueError, bufio.read) + + def test_misbehaved_io_read(self): + rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) + bufio = self.tp(rawio) + # _pyio.BufferedReader seems to implement reading different, so that + # checking this is not so easy. + self.assertRaises(IOError, bufio.read, 10) + + def test_garbage_collection(self): + # C BufferedReader objects are collected. + # The Python version has __del__, so it ends into gc.garbage instead + rawio = self.FileIO(support.TESTFN, "w+b") + f = self.tp(rawio) + f.f = f + wr = weakref.ref(f) + del f + support.gc_collect() + self.assert_(wr() is None, wr) -class BufferedWriterTest(unittest.TestCase): +class PyBufferedReaderTest(BufferedReaderTest): + tp = pyio.BufferedReader - def testWrite(self): - # Write to the buffered IO but don't overflow the buffer. - writer = MockRawIO() - bufio = io.BufferedWriter(writer, 8) - bufio.write(b"abc") +class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): + write_mode = "wb" - self.assertFalse(writer._write_stack) + def test_constructor(self): + rawio = self.MockRawIO() + bufio = self.tp(rawio) + bufio.__init__(rawio) + bufio.__init__(rawio, buffer_size=1024) + bufio.__init__(rawio, buffer_size=16) + self.assertEquals(3, bufio.write(b"abc")) + bufio.flush() + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) + bufio.__init__(rawio) + self.assertEquals(3, bufio.write(b"ghi")) + bufio.flush() + self.assertEquals(b"".join(rawio._write_stack), b"abcghi") - def testWriteOverflow(self): - writer = MockRawIO() - bufio = io.BufferedWriter(writer, 8) + def test_detach_flush(self): + raw = self.MockRawIO() + buf = self.tp(raw) + buf.write(b"howdy!") + self.assertFalse(raw._write_stack) + buf.detach() + self.assertEqual(raw._write_stack, [b"howdy!"]) + def test_write(self): + # Write to the buffered IO but don't overflow the buffer. + writer = self.MockRawIO() + bufio = self.tp(writer, 8) bufio.write(b"abc") - bufio.write(b"defghijkl") - - self.assertEquals(b"abcdefghijkl", writer._write_stack[0]) - - def testWriteNonBlocking(self): - raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12)) - bufio = io.BufferedWriter(raw, 8, 16) - - bufio.write(b"asdf") - bufio.write(b"asdfa") - self.assertEquals(b"asdfasdfa", raw._write_stack[0]) - - bufio.write(b"asdfasdfasdf") - self.assertEquals(b"asdfasdfasdf", raw._write_stack[1]) - bufio.write(b"asdfasdfasdf") - self.assertEquals(b"dfasdfasdf", raw._write_stack[2]) - self.assertEquals(b"asdfasdfasdf", raw._write_stack[3]) - - bufio.write(b"asdfasdfasdf") - - # XXX I don't like this test. It relies too heavily on how the - # algorithm actually works, which we might change. Refactor - # later. - - def testFileno(self): - rawio = MockRawIO((b"abc", b"d", b"efg")) - bufio = io.BufferedWriter(rawio) - - self.assertEquals(42, bufio.fileno()) + self.assertFalse(writer._write_stack) - def testFlush(self): - writer = MockRawIO() - bufio = io.BufferedWriter(writer, 8) + def test_write_overflow(self): + writer = self.MockRawIO() + bufio = self.tp(writer, 8) + contents = b"abcdefghijklmnop" + for n in range(0, len(contents), 3): + bufio.write(contents[n:n+3]) + flushed = b"".join(writer._write_stack) + # At least (total - 8) bytes were implicitly flushed, perhaps more + # depending on the implementation. + self.assert_(flushed.startswith(contents[:-8]), flushed) + + def check_writes(self, intermediate_func): + # Lots of writes, test the flushed output is as expected. + contents = bytes(range(256)) * 1000 + n = 0 + writer = self.MockRawIO() + bufio = self.tp(writer, 13) + # Generator of write sizes: repeat each N 15 times then proceed to N+1 + def gen_sizes(): + for size in count(1): + for i in range(15): + yield size + sizes = gen_sizes() + while n < len(contents): + size = min(next(sizes), len(contents) - n) + self.assertEquals(bufio.write(contents[n:n+size]), size) + intermediate_func(bufio) + n += size + bufio.flush() + self.assertEquals(contents, + b"".join(writer._write_stack)) + + def test_writes(self): + self.check_writes(lambda bufio: None) + + def test_writes_and_flushes(self): + self.check_writes(lambda bufio: bufio.flush()) + + def test_writes_and_seeks(self): + def _seekabs(bufio): + pos = bufio.tell() + bufio.seek(pos + 1, 0) + bufio.seek(pos - 1, 0) + bufio.seek(pos, 0) + self.check_writes(_seekabs) + def _seekrel(bufio): + pos = bufio.seek(0, 1) + bufio.seek(+1, 1) + bufio.seek(-1, 1) + bufio.seek(pos, 0) + self.check_writes(_seekrel) + + def test_writes_and_truncates(self): + self.check_writes(lambda bufio: bufio.truncate(bufio.tell())) + + def test_write_non_blocking(self): + raw = self.MockNonBlockWriterIO() + bufio = self.tp(raw, 8) + + self.assertEquals(bufio.write(b"abcd"), 4) + self.assertEquals(bufio.write(b"efghi"), 5) + # 1 byte will be written, the rest will be buffered + raw.block_on(b"k") + self.assertEquals(bufio.write(b"jklmn"), 5) + + # 8 bytes will be written, 8 will be buffered and the rest will be lost + raw.block_on(b"0") + try: + bufio.write(b"opqrwxyz0123456789") + except self.BlockingIOError as e: + written = e.characters_written + else: + self.fail("BlockingIOError should have been raised") + self.assertEquals(written, 16) + self.assertEquals(raw.pop_written(), + b"abcdefghijklmnopqrwxyz") + + self.assertEquals(bufio.write(b"ABCDEFGHI"), 9) + s = raw.pop_written() + # Previously buffered bytes were flushed + self.assertTrue(s.startswith(b"01234567A"), s) + + def test_write_and_rewind(self): + raw = io.BytesIO() + bufio = self.tp(raw, 4) + self.assertEqual(bufio.write(b"abcdef"), 6) + self.assertEqual(bufio.tell(), 6) + bufio.seek(0, 0) + self.assertEqual(bufio.write(b"XY"), 2) + bufio.seek(6, 0) + self.assertEqual(raw.getvalue(), b"XYcdef") + self.assertEqual(bufio.write(b"123456"), 6) + bufio.flush() + self.assertEqual(raw.getvalue(), b"XYcdef123456") + def test_flush(self): + writer = self.MockRawIO() + bufio = self.tp(writer, 8) bufio.write(b"abc") bufio.flush() + self.assertEquals(b"abc", writer._write_stack[0]) + def test_destructor(self): + writer = self.MockRawIO() + bufio = self.tp(writer, 8) + bufio.write(b"abc") + del bufio + support.gc_collect() self.assertEquals(b"abc", writer._write_stack[0]) - def testThreads(self): - # BufferedWriter should not raise exceptions or crash - # when called from multiple threads. + def test_truncate(self): + # Truncate implicitly flushes the buffer. + with io.open(support.TESTFN, self.write_mode, buffering=0) as raw: + bufio = self.tp(raw, 8) + bufio.write(b"abcdef") + self.assertEqual(bufio.truncate(3), 3) + self.assertEqual(bufio.tell(), 3) + with io.open(support.TESTFN, "rb", buffering=0) as f: + self.assertEqual(f.read(), b"abc") + + def test_threads(self): try: + # Write out many bytes from many threads and test they were + # all flushed. + N = 1000 + contents = bytes(range(256)) * N + sizes = cycle([1, 19]) + n = 0 + queue = deque() + while n < len(contents): + size = next(sizes) + queue.append(contents[n:n+size]) + n += size + del contents # We use a real file object because it allows us to # exercise situations where the GIL is released before # writing the buffer to the raw streams. This is in addition # to concurrency issues due to switching threads in the middle # of Python code. - with io.open(test_support.TESTFN, "wb", buffering=0) as raw: - bufio = io.BufferedWriter(raw, 8) + with io.open(support.TESTFN, self.write_mode, buffering=0) as raw: + bufio = self.tp(raw, 8) errors = [] def f(): try: - # Write enough bytes to flush the buffer - s = b"a" * 19 - for i in range(50): + while True: + try: + s = queue.popleft() + except IndexError: + return bufio.write(s) except Exception as e: errors.append(e) @@ -555,37 +1023,218 @@ class BufferedWriterTest(unittest.TestCase): t.join() self.assertFalse(errors, "the following exceptions were caught: %r" % errors) + bufio.close() + with io.open(support.TESTFN, "rb") as f: + s = f.read() + for i in range(256): + self.assertEquals(s.count(bytes([i])), N) finally: - test_support.unlink(test_support.TESTFN) + support.unlink(support.TESTFN) + + def test_misbehaved_io(self): + rawio = self.MisbehavedRawIO() + bufio = self.tp(rawio, 5) + self.assertRaises(IOError, bufio.seek, 0) + self.assertRaises(IOError, bufio.tell) + self.assertRaises(IOError, bufio.write, b"abcdef") + + def test_max_buffer_size_deprecation(self): + with support.check_warnings() as w: + warnings.simplefilter("always", DeprecationWarning) + self.tp(self.MockRawIO(), 8, 12) + self.assertEqual(len(w.warnings), 1) + warning = w.warnings[0] + self.assertTrue(warning.category is DeprecationWarning) + self.assertEqual(str(warning.message), + "max_buffer_size is deprecated") + + +class CBufferedWriterTest(BufferedWriterTest): + tp = io.BufferedWriter + + def test_constructor(self): + BufferedWriterTest.test_constructor(self) + # The allocation can succeed on 32-bit builds, e.g. with more + # than 2GB RAM and a 64-bit kernel. + if sys.maxsize > 0x7FFFFFFF: + rawio = self.MockRawIO() + bufio = self.tp(rawio) + self.assertRaises((OverflowError, MemoryError, ValueError), + bufio.__init__, rawio, sys.maxsize) + + def test_initialization(self): + rawio = self.MockRawIO() + bufio = self.tp(rawio) + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) + self.assertRaises(ValueError, bufio.write, b"def") + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) + self.assertRaises(ValueError, bufio.write, b"def") + self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) + self.assertRaises(ValueError, bufio.write, b"def") + + def test_garbage_collection(self): + # C BufferedWriter objects are collected, and collecting them flushes + # all data to disk. + # The Python version has __del__, so it ends into gc.garbage instead + rawio = self.FileIO(support.TESTFN, "w+b") + f = self.tp(rawio) + f.write(b"123xxx") + f.x = f + wr = weakref.ref(f) + del f + support.gc_collect() + self.assert_(wr() is None, wr) + with open(support.TESTFN, "rb") as f: + self.assertEqual(f.read(), b"123xxx") + +class PyBufferedWriterTest(BufferedWriterTest): + tp = pyio.BufferedWriter class BufferedRWPairTest(unittest.TestCase): - def testRWPair(self): - r = MockRawIO(()) - w = MockRawIO() - pair = io.BufferedRWPair(r, w) + def test_constructor(self): + pair = self.tp(self.MockRawIO(), self.MockRawIO()) self.assertFalse(pair.closed) - # XXX More Tests + def test_detach(self): + pair = self.tp(self.MockRawIO(), self.MockRawIO()) + self.assertRaises(self.UnsupportedOperation, pair.detach) + + def test_constructor_max_buffer_size_deprecation(self): + with support.check_warnings() as w: + warnings.simplefilter("always", DeprecationWarning) + self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12) + self.assertEqual(len(w.warnings), 1) + warning = w.warnings[0] + self.assertTrue(warning.category is DeprecationWarning) + self.assertEqual(str(warning.message), + "max_buffer_size is deprecated") + + def test_constructor_with_not_readable(self): + class NotReadable(MockRawIO): + def readable(self): + return False + + self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO()) + + def test_constructor_with_not_writeable(self): + class NotWriteable(MockRawIO): + def writable(self): + return False + + self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable()) + + def test_read(self): + pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) + + self.assertEqual(pair.read(3), b"abc") + self.assertEqual(pair.read(1), b"d") + self.assertEqual(pair.read(), b"ef") + + def test_read1(self): + # .read1() is delegated to the underlying reader object, so this test + # can be shallow. + pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) + + self.assertEqual(pair.read1(3), b"abc") + + def test_readinto(self): + pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) + + data = bytearray(5) + self.assertEqual(pair.readinto(data), 5) + self.assertEqual(data, b"abcde") + + def test_write(self): + w = self.MockRawIO() + pair = self.tp(self.MockRawIO(), w) + pair.write(b"abc") + pair.flush() + pair.write(b"def") + pair.flush() + self.assertEqual(w._write_stack, [b"abc", b"def"]) -class BufferedRandomTest(unittest.TestCase): + def test_peek(self): + pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) - def testReadAndWrite(self): - raw = MockRawIO((b"asdf", b"ghjk")) - rw = io.BufferedRandom(raw, 8, 12) + self.assertTrue(pair.peek(3).startswith(b"abc")) + self.assertEqual(pair.read(3), b"abc") + + def test_readable(self): + pair = self.tp(self.MockRawIO(), self.MockRawIO()) + self.assertTrue(pair.readable()) + + def test_writeable(self): + pair = self.tp(self.MockRawIO(), self.MockRawIO()) + self.assertTrue(pair.writable()) + + def test_seekable(self): + # BufferedRWPairs are never seekable, even if their readers and writers + # are. + pair = self.tp(self.MockRawIO(), self.MockRawIO()) + self.assertFalse(pair.seekable()) + + # .flush() is delegated to the underlying writer object and has been + # tested in the test_write method. + + def test_close_and_closed(self): + pair = self.tp(self.MockRawIO(), self.MockRawIO()) + self.assertFalse(pair.closed) + pair.close() + self.assertTrue(pair.closed) + + def test_isatty(self): + class SelectableIsAtty(MockRawIO): + def __init__(self, isatty): + MockRawIO.__init__(self) + self._isatty = isatty + + def isatty(self): + return self._isatty + + pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False)) + self.assertFalse(pair.isatty()) + + pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False)) + self.assertTrue(pair.isatty()) + + pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True)) + self.assertTrue(pair.isatty()) + + pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True)) + self.assertTrue(pair.isatty()) + +class CBufferedRWPairTest(BufferedRWPairTest): + tp = io.BufferedRWPair + +class PyBufferedRWPairTest(BufferedRWPairTest): + tp = pyio.BufferedRWPair + + +class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): + read_mode = "rb+" + write_mode = "wb+" + + def test_constructor(self): + BufferedReaderTest.test_constructor(self) + BufferedWriterTest.test_constructor(self) + + def test_read_and_write(self): + raw = self.MockRawIO((b"asdf", b"ghjk")) + rw = self.tp(raw, 8) self.assertEqual(b"as", rw.read(2)) rw.write(b"ddd") rw.write(b"eee") self.assertFalse(raw._write_stack) # Buffer writes - self.assertEqual(b"ghjk", rw.read()) # This read forces write flush + self.assertEqual(b"ghjk", rw.read()) self.assertEquals(b"dddeee", raw._write_stack[0]) - def testSeekAndTell(self): - raw = io.BytesIO(b"asdfghjkl") - rw = io.BufferedRandom(raw) + def test_seek_and_tell(self): + raw = self.BytesIO(b"asdfghjkl") + rw = self.tp(raw) self.assertEquals(b"as", rw.read(2)) self.assertEquals(2, rw.tell()) @@ -603,6 +1252,115 @@ class BufferedRandomTest(unittest.TestCase): self.assertEquals(b"fl", rw.read(11)) self.assertRaises(TypeError, rw.seek, 0.0) + def check_flush_and_read(self, read_func): + raw = self.BytesIO(b"abcdefghi") + bufio = self.tp(raw) + + self.assertEquals(b"ab", read_func(bufio, 2)) + bufio.write(b"12") + self.assertEquals(b"ef", read_func(bufio, 2)) + self.assertEquals(6, bufio.tell()) + bufio.flush() + self.assertEquals(6, bufio.tell()) + self.assertEquals(b"ghi", read_func(bufio)) + raw.seek(0, 0) + raw.write(b"XYZ") + # flush() resets the read buffer + bufio.flush() + bufio.seek(0, 0) + self.assertEquals(b"XYZ", read_func(bufio, 3)) + + def test_flush_and_read(self): + self.check_flush_and_read(lambda bufio, *args: bufio.read(*args)) + + def test_flush_and_readinto(self): + def _readinto(bufio, n=-1): + b = bytearray(n if n >= 0 else 9999) + n = bufio.readinto(b) + return bytes(b[:n]) + self.check_flush_and_read(_readinto) + + def test_flush_and_peek(self): + def _peek(bufio, n=-1): + # This relies on the fact that the buffer can contain the whole + # raw stream, otherwise peek() can return less. + b = bufio.peek(n) + if n != -1: + b = b[:n] + bufio.seek(len(b), 1) + return b + self.check_flush_and_read(_peek) + + def test_flush_and_write(self): + raw = self.BytesIO(b"abcdefghi") + bufio = self.tp(raw) + + bufio.write(b"123") + bufio.flush() + bufio.write(b"45") + bufio.flush() + bufio.seek(0, 0) + self.assertEquals(b"12345fghi", raw.getvalue()) + self.assertEquals(b"12345fghi", bufio.read()) + + def test_threads(self): + BufferedReaderTest.test_threads(self) + BufferedWriterTest.test_threads(self) + + def test_writes_and_peek(self): + def _peek(bufio): + bufio.peek(1) + self.check_writes(_peek) + def _peek(bufio): + pos = bufio.tell() + bufio.seek(-1, 1) + bufio.peek(1) + bufio.seek(pos, 0) + self.check_writes(_peek) + + def test_writes_and_reads(self): + def _read(bufio): + bufio.seek(-1, 1) + bufio.read(1) + self.check_writes(_read) + + def test_writes_and_read1s(self): + def _read1(bufio): + bufio.seek(-1, 1) + bufio.read1(1) + self.check_writes(_read1) + + def test_writes_and_readintos(self): + def _read(bufio): + bufio.seek(-1, 1) + bufio.readinto(bytearray(1)) + self.check_writes(_read) + + def test_misbehaved_io(self): + BufferedReaderTest.test_misbehaved_io(self) + BufferedWriterTest.test_misbehaved_io(self) + +class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest): + tp = io.BufferedRandom + + def test_constructor(self): + BufferedRandomTest.test_constructor(self) + # The allocation can succeed on 32-bit builds, e.g. with more + # than 2GB RAM and a 64-bit kernel. + if sys.maxsize > 0x7FFFFFFF: + rawio = self.MockRawIO() + bufio = self.tp(rawio) + self.assertRaises((OverflowError, MemoryError, ValueError), + bufio.__init__, rawio, sys.maxsize) + + def test_garbage_collection(self): + CBufferedReaderTest.test_garbage_collection(self) + CBufferedWriterTest.test_garbage_collection(self) + +class PyBufferedRandomTest(BufferedRandomTest): + tp = pyio.BufferedRandom + + # To fully exercise seek/tell, the StatefulIncrementalDecoder has these # properties: # - A single output character can correspond to many bytes of input. @@ -736,7 +1494,7 @@ class StatefulIncrementalDecoderTest(unittest.TestCase): 'm--------------.') ] - def testDecoder(self): + def test_decoder(self): # Try a few one-shot test cases. for input, eof, output in self.test_cases: d = StatefulIncrementalDecoder() @@ -752,98 +1510,115 @@ class TextIOWrapperTest(unittest.TestCase): def setUp(self): self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") + support.unlink(support.TESTFN) def tearDown(self): - test_support.unlink(test_support.TESTFN) - - def testLineBuffering(self): - r = io.BytesIO() - b = io.BufferedWriter(r, 1000) - t = io.TextIOWrapper(b, newline="\n", line_buffering=True) - t.write(u"X") + support.unlink(support.TESTFN) + + def test_constructor(self): + r = self.BytesIO(b"\xc3\xa9\n\n") + b = self.BufferedReader(r, 1000) + t = self.TextIOWrapper(b) + t.__init__(b, encoding="latin1", newline="\r\n") + self.assertEquals(t.encoding, "latin1") + self.assertEquals(t.line_buffering, False) + t.__init__(b, encoding="utf8", line_buffering=True) + self.assertEquals(t.encoding, "utf8") + self.assertEquals(t.line_buffering, True) + self.assertEquals("\xe9\n", t.readline()) + self.assertRaises(TypeError, t.__init__, b, newline=42) + self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') + + def test_detach(self): + r = self.BytesIO() + b = self.BufferedWriter(r) + t = self.TextIOWrapper(b) + self.assertIs(t.detach(), b) + + t = self.TextIOWrapper(b, encoding="ascii") + t.write("howdy") + self.assertFalse(r.getvalue()) + t.detach() + self.assertEqual(r.getvalue(), b"howdy") + self.assertRaises(ValueError, t.detach) + + def test_repr(self): + raw = self.BytesIO("hello".encode("utf-8")) + b = self.BufferedReader(raw) + t = self.TextIOWrapper(b, encoding="utf-8") + modname = self.TextIOWrapper.__module__ + self.assertEqual(repr(t), + "<%s.TextIOWrapper encoding='utf-8'>" % modname) + raw.name = "dummy" + self.assertEqual(repr(t), + "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname) + raw.name = b"dummy" + self.assertEqual(repr(t), + "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname) + + def test_line_buffering(self): + r = self.BytesIO() + b = self.BufferedWriter(r, 1000) + t = self.TextIOWrapper(b, newline="\n", line_buffering=True) + t.write("X") self.assertEquals(r.getvalue(), b"") # No flush happened - t.write(u"Y\nZ") + t.write("Y\nZ") self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed - t.write(u"A\rB") + t.write("A\rB") self.assertEquals(r.getvalue(), b"XY\nZA\rB") - def testEncodingErrorsReading(self): + def test_encoding(self): + # Check the encoding attribute is always set, and valid + b = self.BytesIO() + t = self.TextIOWrapper(b, encoding="utf8") + self.assertEqual(t.encoding, "utf8") + t = self.TextIOWrapper(b) + self.assert_(t.encoding is not None) + codecs.lookup(t.encoding) + + def test_encoding_errors_reading(self): # (1) default - b = io.BytesIO(b"abc\n\xff\n") - t = io.TextIOWrapper(b, encoding="ascii") + b = self.BytesIO(b"abc\n\xff\n") + t = self.TextIOWrapper(b, encoding="ascii") self.assertRaises(UnicodeError, t.read) # (2) explicit strict - b = io.BytesIO(b"abc\n\xff\n") - t = io.TextIOWrapper(b, encoding="ascii", errors="strict") + b = self.BytesIO(b"abc\n\xff\n") + t = self.TextIOWrapper(b, encoding="ascii", errors="strict") self.assertRaises(UnicodeError, t.read) # (3) ignore - b = io.BytesIO(b"abc\n\xff\n") - t = io.TextIOWrapper(b, encoding="ascii", errors="ignore") + b = self.BytesIO(b"abc\n\xff\n") + t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") self.assertEquals(t.read(), "abc\n\n") # (4) replace - b = io.BytesIO(b"abc\n\xff\n") - t = io.TextIOWrapper(b, encoding="ascii", errors="replace") - self.assertEquals(t.read(), u"abc\n\ufffd\n") + b = self.BytesIO(b"abc\n\xff\n") + t = self.TextIOWrapper(b, encoding="ascii", errors="replace") + self.assertEquals(t.read(), "abc\n\ufffd\n") - def testEncodingErrorsWriting(self): + def test_encoding_errors_writing(self): # (1) default - b = io.BytesIO() - t = io.TextIOWrapper(b, encoding="ascii") - self.assertRaises(UnicodeError, t.write, u"\xff") + b = self.BytesIO() + t = self.TextIOWrapper(b, encoding="ascii") + self.assertRaises(UnicodeError, t.write, "\xff") # (2) explicit strict - b = io.BytesIO() - t = io.TextIOWrapper(b, encoding="ascii", errors="strict") - self.assertRaises(UnicodeError, t.write, u"\xff") + b = self.BytesIO() + t = self.TextIOWrapper(b, encoding="ascii", errors="strict") + self.assertRaises(UnicodeError, t.write, "\xff") # (3) ignore - b = io.BytesIO() - t = io.TextIOWrapper(b, encoding="ascii", errors="ignore", + b = self.BytesIO() + t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", newline="\n") - t.write(u"abc\xffdef\n") + t.write("abc\xffdef\n") t.flush() self.assertEquals(b.getvalue(), b"abcdef\n") # (4) replace - b = io.BytesIO() - t = io.TextIOWrapper(b, encoding="ascii", errors="replace", + b = self.BytesIO() + t = self.TextIOWrapper(b, encoding="ascii", errors="replace", newline="\n") - t.write(u"abc\xffdef\n") + t.write("abc\xffdef\n") t.flush() self.assertEquals(b.getvalue(), b"abc?def\n") - def testNewlinesInput(self): - testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" - normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") - for newline, expected in [ - (None, normalized.decode("ascii").splitlines(True)), - ("", testdata.decode("ascii").splitlines(True)), - ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), - ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), - ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), - ]: - buf = io.BytesIO(testdata) - txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline) - self.assertEquals(txt.readlines(), expected) - txt.seek(0) - self.assertEquals(txt.read(), "".join(expected)) - - def testNewlinesOutput(self): - testdict = { - "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", - "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", - "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", - "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", - } - tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) - for newline, expected in tests: - buf = io.BytesIO() - txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline) - txt.write("AAA\nB") - txt.write("BB\nCCC\n") - txt.write("X\rY\r\nZ") - txt.flush() - self.assertEquals(buf.closed, False) - self.assertEquals(buf.getvalue(), expected) - - def testNewlines(self): + def test_newlines(self): input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] tests = [ @@ -867,8 +1642,8 @@ class TextIOWrapperTest(unittest.TestCase): for do_reads in (False, True): for bufsize in range(1, 10): for newline, exp_lines in tests: - bufio = io.BufferedReader(io.BytesIO(data), bufsize) - textio = io.TextIOWrapper(bufio, newline=newline, + bufio = self.BufferedReader(self.BytesIO(data), bufsize) + textio = self.TextIOWrapper(bufio, newline=newline, encoding=encoding) if do_reads: got_lines = [] @@ -885,75 +1660,117 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(got_line, exp_line) self.assertEquals(len(got_lines), len(exp_lines)) - def testNewlinesInput(self): - testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" + def test_newlines_input(self): + testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") for newline, expected in [ (None, normalized.decode("ascii").splitlines(True)), ("", testdata.decode("ascii").splitlines(True)), - ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), - ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), - ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), + ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), + ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), + ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), ]: - buf = io.BytesIO(testdata) - txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline) + buf = self.BytesIO(testdata) + txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) self.assertEquals(txt.readlines(), expected) txt.seek(0) self.assertEquals(txt.read(), "".join(expected)) - def testNewlinesOutput(self): - data = u"AAA\nBBB\rCCC\n" - data_lf = b"AAA\nBBB\rCCC\n" - data_cr = b"AAA\rBBB\rCCC\r" - data_crlf = b"AAA\r\nBBB\rCCC\r\n" - save_linesep = os.linesep - try: - for os.linesep, newline, expected in [ - ("\n", None, data_lf), - ("\r\n", None, data_crlf), - ("\n", "", data_lf), - ("\r\n", "", data_lf), - ("\n", "\n", data_lf), - ("\r\n", "\n", data_lf), - ("\n", "\r", data_cr), - ("\r\n", "\r", data_cr), - ("\n", "\r\n", data_crlf), - ("\r\n", "\r\n", data_crlf), - ]: - buf = io.BytesIO() - txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline) - txt.write(data) - txt.close() - self.assertEquals(buf.closed, True) - self.assertRaises(ValueError, buf.getvalue) - finally: - os.linesep = save_linesep + def test_newlines_output(self): + testdict = { + "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", + "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", + "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", + "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", + } + tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) + for newline, expected in tests: + buf = self.BytesIO() + txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) + txt.write("AAA\nB") + txt.write("BB\nCCC\n") + txt.write("X\rY\r\nZ") + txt.flush() + self.assertEquals(buf.closed, False) + self.assertEquals(buf.getvalue(), expected) + + def test_destructor(self): + l = [] + base = self.BytesIO + class MyBytesIO(base): + def close(self): + l.append(self.getvalue()) + base.close(self) + b = MyBytesIO() + t = self.TextIOWrapper(b, encoding="ascii") + t.write("abc") + del t + support.gc_collect() + self.assertEquals([b"abc"], l) + + def test_override_destructor(self): + record = [] + class MyTextIO(self.TextIOWrapper): + def __del__(self): + record.append(1) + try: + f = super(MyTextIO, self).__del__ + except AttributeError: + pass + else: + f() + def close(self): + record.append(2) + super(MyTextIO, self).close() + def flush(self): + record.append(3) + super(MyTextIO, self).flush() + b = self.BytesIO() + t = MyTextIO(b, encoding="ascii") + del t + support.gc_collect() + self.assertEqual(record, [1, 2, 3]) + + def test_error_through_destructor(self): + # Test that the exception state is not modified by a destructor, + # even if close() fails. + rawio = self.CloseFailureIO() + def f(): + self.TextIOWrapper(rawio).xyzzy + with support.captured_output("stderr") as s: + self.assertRaises(AttributeError, f) + s = s.getvalue().strip() + if s: + # The destructor *may* have printed an unraisable error, check it + self.assertEqual(len(s.splitlines()), 1) + self.assert_(s.startswith("Exception IOError: "), s) + self.assert_(s.endswith(" ignored"), s) # Systematic tests of the text I/O API - def testBasicIO(self): + def test_basic_io(self): for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le": - f = io.open(test_support.TESTFN, "w+", encoding=enc) + f = self.open(support.TESTFN, "w+", encoding=enc) f._CHUNK_SIZE = chunksize - self.assertEquals(f.write(u"abc"), 3) + self.assertEquals(f.write("abc"), 3) f.close() - f = io.open(test_support.TESTFN, "r+", encoding=enc) + f = self.open(support.TESTFN, "r+", encoding=enc) f._CHUNK_SIZE = chunksize self.assertEquals(f.tell(), 0) - self.assertEquals(f.read(), u"abc") + self.assertEquals(f.read(), "abc") cookie = f.tell() self.assertEquals(f.seek(0), 0) - self.assertEquals(f.read(2), u"ab") - self.assertEquals(f.read(1), u"c") - self.assertEquals(f.read(1), u"") - self.assertEquals(f.read(), u"") + self.assertEquals(f.read(2), "ab") + self.assertEquals(f.read(1), "c") + self.assertEquals(f.read(1), "") + self.assertEquals(f.read(), "") self.assertEquals(f.tell(), cookie) self.assertEquals(f.seek(0), 0) self.assertEquals(f.seek(0, 2), cookie) - self.assertEquals(f.write(u"def"), 3) + self.assertEquals(f.write("def"), 3) self.assertEquals(f.seek(cookie), cookie) - self.assertEquals(f.read(), u"def") + self.assertEquals(f.read(), "def") if enc.startswith("utf"): self.multi_line_test(f, enc) f.close() @@ -961,13 +1778,13 @@ class TextIOWrapperTest(unittest.TestCase): def multi_line_test(self, f, enc): f.seek(0) f.truncate() - sample = u"s\xff\u0fff\uffff" + sample = "s\xff\u0fff\uffff" wlines = [] for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000): chars = [] for i in range(size): chars.append(sample[i % len(sample)]) - line = u"".join(chars) + u"\n" + line = "".join(chars) + "\n" wlines.append((f.tell(), line)) f.write(line) f.seek(0) @@ -980,28 +1797,28 @@ class TextIOWrapperTest(unittest.TestCase): rlines.append((pos, line)) self.assertEquals(rlines, wlines) - def testTelling(self): - f = io.open(test_support.TESTFN, "w+", encoding="utf8") + def test_telling(self): + f = self.open(support.TESTFN, "w+", encoding="utf8") p0 = f.tell() - f.write(u"\xff\n") + f.write("\xff\n") p1 = f.tell() - f.write(u"\xff\n") + f.write("\xff\n") p2 = f.tell() f.seek(0) self.assertEquals(f.tell(), p0) - self.assertEquals(f.readline(), u"\xff\n") + self.assertEquals(f.readline(), "\xff\n") self.assertEquals(f.tell(), p1) - self.assertEquals(f.readline(), u"\xff\n") + self.assertEquals(f.readline(), "\xff\n") self.assertEquals(f.tell(), p2) f.seek(0) for line in f: - self.assertEquals(line, u"\xff\n") + self.assertEquals(line, "\xff\n") self.assertRaises(IOError, f.tell) self.assertEquals(f.tell(), p2) f.close() - def testSeeking(self): - chunk_size = io.TextIOWrapper._CHUNK_SIZE + def test_seeking(self): + chunk_size = _default_chunk_size() prefix_size = chunk_size - 2 u_prefix = "a" * prefix_size prefix = bytes(u_prefix.encode("utf-8")) @@ -1009,43 +1826,46 @@ class TextIOWrapperTest(unittest.TestCase): u_suffix = "\u8888\n" suffix = bytes(u_suffix.encode("utf-8")) line = prefix + suffix - f = io.open(test_support.TESTFN, "wb") + f = self.open(support.TESTFN, "wb") f.write(line*2) f.close() - f = io.open(test_support.TESTFN, "r", encoding="utf-8") + f = self.open(support.TESTFN, "r", encoding="utf-8") s = f.read(prefix_size) - self.assertEquals(s, unicode(prefix, "ascii")) + self.assertEquals(s, prefix.decode("ascii")) self.assertEquals(f.tell(), prefix_size) self.assertEquals(f.readline(), u_suffix) - def testSeekingToo(self): + def test_seeking_too(self): # Regression test for a specific bug data = b'\xe0\xbf\xbf\n' - f = io.open(test_support.TESTFN, "wb") + f = self.open(support.TESTFN, "wb") f.write(data) f.close() - f = io.open(test_support.TESTFN, "r", encoding="utf-8") + f = self.open(support.TESTFN, "r", encoding="utf-8") f._CHUNK_SIZE # Just test that it exists f._CHUNK_SIZE = 2 f.readline() f.tell() - def testSeekAndTell(self): - """Test seek/tell using the StatefulIncrementalDecoder.""" + def test_seek_and_tell(self): + #Test seek/tell using the StatefulIncrementalDecoder. + # Make test faster by doing smaller seeks + CHUNK_SIZE = 128 - def testSeekAndTellWithData(data, min_pos=0): + def test_seek_and_tell_with_data(data, min_pos=0): """Tell/seek to various points within a data stream and ensure that the decoded data returned by read() is consistent.""" - f = io.open(test_support.TESTFN, 'wb') + f = self.open(support.TESTFN, 'wb') f.write(data) f.close() - f = io.open(test_support.TESTFN, encoding='test_decoder') + f = self.open(support.TESTFN, encoding='test_decoder') + f._CHUNK_SIZE = CHUNK_SIZE decoded = f.read() f.close() for i in range(min_pos, len(decoded) + 1): # seek positions for j in [1, 5, len(decoded) - i]: # read lengths - f = io.open(test_support.TESTFN, encoding='test_decoder') + f = self.open(support.TESTFN, encoding='test_decoder') self.assertEquals(f.read(i), decoded[:i]) cookie = f.tell() self.assertEquals(f.read(j), decoded[i:i + j]) @@ -1060,23 +1880,22 @@ class TextIOWrapperTest(unittest.TestCase): try: # Try each test case. for input, _, _ in StatefulIncrementalDecoderTest.test_cases: - testSeekAndTellWithData(input) + test_seek_and_tell_with_data(input) # Position each test case so that it crosses a chunk boundary. - CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE for input, _, _ in StatefulIncrementalDecoderTest.test_cases: offset = CHUNK_SIZE - len(input)//2 prefix = b'.'*offset # Don't bother seeking into the prefix (takes too long). min_pos = offset*2 - testSeekAndTellWithData(prefix + input, min_pos) + test_seek_and_tell_with_data(prefix + input, min_pos) # Ensure our test decoder won't interfere with subsequent tests. finally: StatefulIncrementalDecoder.codecEnabled = 0 - def testEncodedWrites(self): - data = u"1234567890" + def test_encoded_writes(self): + data = "1234567890" tests = ("utf-16", "utf-16-le", "utf-16-be", @@ -1084,54 +1903,26 @@ class TextIOWrapperTest(unittest.TestCase): "utf-32-le", "utf-32-be") for encoding in tests: - buf = io.BytesIO() - f = io.TextIOWrapper(buf, encoding=encoding) + buf = self.BytesIO() + f = self.TextIOWrapper(buf, encoding=encoding) # Check if the BOM is written only once (see issue1753). f.write(data) f.write(data) f.seek(0) self.assertEquals(f.read(), data * 2) + f.seek(0) + self.assertEquals(f.read(), data * 2) self.assertEquals(buf.getvalue(), (data * 2).encode(encoding)) - def timingTest(self): - timer = time.time - enc = "utf8" - line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n" - nlines = 10000 - nchars = len(line) - nbytes = len(line.encode(enc)) - for chunk_size in (32, 64, 128, 256): - f = io.open(test_support.TESTFN, "w+", encoding=enc) - f._CHUNK_SIZE = chunk_size - t0 = timer() - for i in range(nlines): - f.write(line) - f.flush() - t1 = timer() - f.seek(0) - for line in f: - pass - t2 = timer() - f.seek(0) - while f.readline(): - pass - t3 = timer() - f.seek(0) - while f.readline(): - f.tell() - t4 = timer() - f.close() - if test_support.verbose: - print("\nTiming test: %d lines of %d characters (%d bytes)" % - (nlines, nchars, nbytes)) - print("File chunk size: %6s" % f._CHUNK_SIZE) - print("Writing: %6.3f seconds" % (t1-t0)) - print("Reading using iteration: %6.3f seconds" % (t2-t1)) - print("Reading using readline(): %6.3f seconds" % (t3-t2)) - print("Using readline()+tell(): %6.3f seconds" % (t4-t3)) - - def testReadOneByOne(self): - txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB")) + def test_unreadable(self): + class UnReadable(self.BytesIO): + def readable(self): + return False + txt = self.TextIOWrapper(UnReadable()) + self.assertRaises(IOError, txt.read) + + def test_read_one_by_one(self): + txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB")) reads = "" while True: c = txt.read(1) @@ -1141,9 +1932,9 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(reads, "AA\nBB") # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. - def testReadByChunk(self): + def test_read_by_chunk(self): # make sure "\r\n" straddles 128 char boundary. - txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB")) + txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB")) reads = "" while True: c = txt.read(128) @@ -1153,7 +1944,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(reads, "A"*127+"\nB") def test_issue1395_1(self): - txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii") + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") # read one char at a time reads = "" @@ -1165,7 +1956,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(reads, self.normalized) def test_issue1395_2(self): - txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii") + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") txt._CHUNK_SIZE = 4 reads = "" @@ -1177,7 +1968,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(reads, self.normalized) def test_issue1395_3(self): - txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii") + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") txt._CHUNK_SIZE = 4 reads = txt.read(4) @@ -1188,7 +1979,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(reads, self.normalized) def test_issue1395_4(self): - txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii") + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") txt._CHUNK_SIZE = 4 reads = txt.read(4) @@ -1196,7 +1987,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(reads, self.normalized) def test_issue1395_5(self): - txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii") + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") txt._CHUNK_SIZE = 4 reads = txt.read(4) @@ -1206,12 +1997,84 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(txt.read(4), "BBB\n") def test_issue2282(self): - buffer = io.BytesIO(self.testdata) - txt = io.TextIOWrapper(buffer, encoding="ascii") + buffer = self.BytesIO(self.testdata) + txt = self.TextIOWrapper(buffer, encoding="ascii") self.assertEqual(buffer.seekable(), txt.seekable()) - def check_newline_decoder_utf8(self, decoder): + @unittest.skip("Issue #6213 with incremental encoders") + def test_append_bom(self): + # The BOM is not written again when appending to a non-empty file + filename = support.TESTFN + for charset in ('utf-8-sig', 'utf-16', 'utf-32'): + with self.open(filename, 'w', encoding=charset) as f: + f.write('aaa') + pos = f.tell() + with self.open(filename, 'rb') as f: + self.assertEquals(f.read(), 'aaa'.encode(charset)) + + with self.open(filename, 'a', encoding=charset) as f: + f.write('xxx') + with self.open(filename, 'rb') as f: + self.assertEquals(f.read(), 'aaaxxx'.encode(charset)) + + @unittest.skip("Issue #6213 with incremental encoders") + def test_seek_bom(self): + # Same test, but when seeking manually + filename = support.TESTFN + for charset in ('utf-8-sig', 'utf-16', 'utf-32'): + with self.open(filename, 'w', encoding=charset) as f: + f.write('aaa') + pos = f.tell() + with self.open(filename, 'r+', encoding=charset) as f: + f.seek(pos) + f.write('zzz') + f.seek(0) + f.write('bbb') + with self.open(filename, 'rb') as f: + self.assertEquals(f.read(), 'bbbzzz'.encode(charset)) + + def test_errors_property(self): + with self.open(support.TESTFN, "w") as f: + self.assertEqual(f.errors, "strict") + with self.open(support.TESTFN, "w", errors="replace") as f: + self.assertEqual(f.errors, "replace") + + +class CTextIOWrapperTest(TextIOWrapperTest): + + def test_initialization(self): + r = self.BytesIO(b"\xc3\xa9\n\n") + b = self.BufferedReader(r, 1000) + t = self.TextIOWrapper(b) + self.assertRaises(TypeError, t.__init__, b, newline=42) + self.assertRaises(ValueError, t.read) + self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') + self.assertRaises(ValueError, t.read) + + def test_garbage_collection(self): + # C TextIOWrapper objects are collected, and collecting them flushes + # all data to disk. + # The Python version has __del__, so it ends in gc.garbage instead. + rawio = io.FileIO(support.TESTFN, "wb") + b = self.BufferedWriter(rawio) + t = self.TextIOWrapper(b, encoding="ascii") + t.write("456def") + t.x = t + wr = weakref.ref(t) + del t + support.gc_collect() + self.assert_(wr() is None, wr) + with open(support.TESTFN, "rb") as f: + self.assertEqual(f.read(), b"456def") + +class PyTextIOWrapperTest(TextIOWrapperTest): + pass + + +class IncrementalNewlineDecoderTest(unittest.TestCase): + + def check_newline_decoding_utf8(self, decoder): # UTF-8 specific tests for a newline decoder def _check_decode(b, s, **kwargs): # We exercise getstate() / setstate() as well as decode() @@ -1253,12 +2116,20 @@ class TextIOWrapperTest(unittest.TestCase): _check_decode(b'\xe8\xa2\x88\r', "\u8888") _check_decode(b'\n', "\n") - def check_newline_decoder(self, decoder, encoding): + def check_newline_decoding(self, decoder, encoding): result = [] - encoder = codecs.getincrementalencoder(encoding)() - def _decode_bytewise(s): - for b in encoder.encode(s): - result.append(decoder.decode(b)) + if encoding is not None: + encoder = codecs.getincrementalencoder(encoding)() + def _decode_bytewise(s): + # Decode one byte at a time + for b in encoder.encode(s): + result.append(decoder.decode(b)) + else: + encoder = None + def _decode_bytewise(s): + # Decode one char at a time + for c in s: + result.append(decoder.decode(c)) self.assertEquals(decoder.newlines, None) _decode_bytewise("abc\n\r") self.assertEquals(decoder.newlines, '\n') @@ -1271,22 +2142,47 @@ class TextIOWrapperTest(unittest.TestCase): _decode_bytewise("abc\r") self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc") decoder.reset() - self.assertEquals(decoder.decode("abc".encode(encoding)), "abc") + input = "abc" + if encoder is not None: + encoder.reset() + input = encoder.encode(input) + self.assertEquals(decoder.decode(input), "abc") self.assertEquals(decoder.newlines, None) def test_newline_decoder(self): encodings = ( - 'utf-8', 'latin-1', + # None meaning the IncrementalNewlineDecoder takes unicode input + # rather than bytes input + None, 'utf-8', 'latin-1', 'utf-16', 'utf-16-le', 'utf-16-be', 'utf-32', 'utf-32-le', 'utf-32-be', ) for enc in encodings: - decoder = codecs.getincrementaldecoder(enc)() - decoder = io.IncrementalNewlineDecoder(decoder, translate=True) - self.check_newline_decoder(decoder, enc) + decoder = enc and codecs.getincrementaldecoder(enc)() + decoder = self.IncrementalNewlineDecoder(decoder, translate=True) + self.check_newline_decoding(decoder, enc) decoder = codecs.getincrementaldecoder("utf-8")() - decoder = io.IncrementalNewlineDecoder(decoder, translate=True) - self.check_newline_decoder_utf8(decoder) + decoder = self.IncrementalNewlineDecoder(decoder, translate=True) + self.check_newline_decoding_utf8(decoder) + + def test_newline_bytes(self): + # Issue 5433: Excessive optimization in IncrementalNewlineDecoder + def _check(dec): + self.assertEquals(dec.newlines, None) + self.assertEquals(dec.decode("\u0D00"), "\u0D00") + self.assertEquals(dec.newlines, None) + self.assertEquals(dec.decode("\u0A00"), "\u0A00") + self.assertEquals(dec.newlines, None) + dec = self.IncrementalNewlineDecoder(None, translate=False) + _check(dec) + dec = self.IncrementalNewlineDecoder(None, translate=True) + _check(dec) + +class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): + pass + +class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): + pass # XXX Tests for open() @@ -1294,40 +2190,39 @@ class TextIOWrapperTest(unittest.TestCase): class MiscIOTest(unittest.TestCase): def tearDown(self): - test_support.unlink(test_support.TESTFN) + support.unlink(support.TESTFN) - def testImport__all__(self): - for name in io.__all__: - obj = getattr(io, name, None) + def test___all__(self): + for name in self.io.__all__: + obj = getattr(self.io, name, None) self.assertTrue(obj is not None, name) if name == "open": continue - elif "error" in name.lower(): + elif "error" in name.lower() or name == "UnsupportedOperation": self.assertTrue(issubclass(obj, Exception), name) elif not name.startswith("SEEK_"): - self.assertTrue(issubclass(obj, io.IOBase)) - + self.assertTrue(issubclass(obj, self.IOBase)) def test_attributes(self): - f = io.open(test_support.TESTFN, "wb", buffering=0) + f = self.open(support.TESTFN, "wb", buffering=0) self.assertEquals(f.mode, "wb") f.close() - f = io.open(test_support.TESTFN, "U") - self.assertEquals(f.name, test_support.TESTFN) - self.assertEquals(f.buffer.name, test_support.TESTFN) - self.assertEquals(f.buffer.raw.name, test_support.TESTFN) + f = self.open(support.TESTFN, "U") + self.assertEquals(f.name, support.TESTFN) + self.assertEquals(f.buffer.name, support.TESTFN) + self.assertEquals(f.buffer.raw.name, support.TESTFN) self.assertEquals(f.mode, "U") self.assertEquals(f.buffer.mode, "rb") self.assertEquals(f.buffer.raw.mode, "rb") f.close() - f = io.open(test_support.TESTFN, "w+") + f = self.open(support.TESTFN, "w+") self.assertEquals(f.mode, "w+") self.assertEquals(f.buffer.mode, "rb+") # Does it really matter? self.assertEquals(f.buffer.raw.mode, "rb+") - g = io.open(f.fileno(), "wb", closefd=False) + g = self.open(f.fileno(), "wb", closefd=False) self.assertEquals(g.mode, "wb") self.assertEquals(g.raw.mode, "wb") self.assertEquals(g.name, f.fileno()) @@ -1335,13 +2230,138 @@ class MiscIOTest(unittest.TestCase): f.close() g.close() + def test_io_after_close(self): + for kwargs in [ + {"mode": "w"}, + {"mode": "wb"}, + {"mode": "w", "buffering": 1}, + {"mode": "w", "buffering": 2}, + {"mode": "wb", "buffering": 0}, + {"mode": "r"}, + {"mode": "rb"}, + {"mode": "r", "buffering": 1}, + {"mode": "r", "buffering": 2}, + {"mode": "rb", "buffering": 0}, + {"mode": "w+"}, + {"mode": "w+b"}, + {"mode": "w+", "buffering": 1}, + {"mode": "w+", "buffering": 2}, + {"mode": "w+b", "buffering": 0}, + ]: + f = self.open(support.TESTFN, **kwargs) + f.close() + self.assertRaises(ValueError, f.flush) + self.assertRaises(ValueError, f.fileno) + self.assertRaises(ValueError, f.isatty) + self.assertRaises(ValueError, f.__iter__) + if hasattr(f, "peek"): + self.assertRaises(ValueError, f.peek, 1) + self.assertRaises(ValueError, f.read) + if hasattr(f, "read1"): + self.assertRaises(ValueError, f.read1, 1024) + if hasattr(f, "readinto"): + self.assertRaises(ValueError, f.readinto, bytearray(1024)) + self.assertRaises(ValueError, f.readline) + self.assertRaises(ValueError, f.readlines) + self.assertRaises(ValueError, f.seek, 0) + self.assertRaises(ValueError, f.tell) + self.assertRaises(ValueError, f.truncate) + self.assertRaises(ValueError, f.write, + b"" if "b" in kwargs['mode'] else "") + self.assertRaises(ValueError, f.writelines, []) + self.assertRaises(ValueError, next, f) + + def test_blockingioerror(self): + # Various BlockingIOError issues + self.assertRaises(TypeError, self.BlockingIOError) + self.assertRaises(TypeError, self.BlockingIOError, 1) + self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4) + self.assertRaises(TypeError, self.BlockingIOError, 1, "", None) + b = self.BlockingIOError(1, "") + self.assertEqual(b.characters_written, 0) + class C(unicode): + pass + c = C("") + b = self.BlockingIOError(1, c) + c.b = b + b.c = c + wr = weakref.ref(c) + del c, b + support.gc_collect() + self.assert_(wr() is None, wr) + + def test_abcs(self): + # Test the visible base classes are ABCs. + self.assertTrue(isinstance(self.IOBase, abc.ABCMeta)) + self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta)) + self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta)) + self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta)) + + def _check_abc_inheritance(self, abcmodule): + with self.open(support.TESTFN, "wb", buffering=0) as f: + self.assertTrue(isinstance(f, abcmodule.IOBase)) + self.assertTrue(isinstance(f, abcmodule.RawIOBase)) + self.assertFalse(isinstance(f, abcmodule.BufferedIOBase)) + self.assertFalse(isinstance(f, abcmodule.TextIOBase)) + with self.open(support.TESTFN, "wb") as f: + self.assertTrue(isinstance(f, abcmodule.IOBase)) + self.assertFalse(isinstance(f, abcmodule.RawIOBase)) + self.assertTrue(isinstance(f, abcmodule.BufferedIOBase)) + self.assertFalse(isinstance(f, abcmodule.TextIOBase)) + with self.open(support.TESTFN, "w") as f: + self.assertTrue(isinstance(f, abcmodule.IOBase)) + self.assertFalse(isinstance(f, abcmodule.RawIOBase)) + self.assertFalse(isinstance(f, abcmodule.BufferedIOBase)) + self.assertTrue(isinstance(f, abcmodule.TextIOBase)) + + def test_abc_inheritance(self): + # Test implementations inherit from their respective ABCs + self._check_abc_inheritance(self) + + def test_abc_inheritance_official(self): + # Test implementations inherit from the official ABCs of the + # baseline "io" module. + self._check_abc_inheritance(io) + +class CMiscIOTest(MiscIOTest): + io = io + +class PyMiscIOTest(MiscIOTest): + io = pyio def test_main(): - test_support.run_unittest(IOTest, BytesIOTest, StringIOTest, - BufferedReaderTest, BufferedWriterTest, - BufferedRWPairTest, BufferedRandomTest, - StatefulIncrementalDecoderTest, - TextIOWrapperTest, MiscIOTest) + tests = (CIOTest, PyIOTest, + CBufferedReaderTest, PyBufferedReaderTest, + CBufferedWriterTest, PyBufferedWriterTest, + CBufferedRWPairTest, PyBufferedRWPairTest, + CBufferedRandomTest, PyBufferedRandomTest, + StatefulIncrementalDecoderTest, + CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest, + CTextIOWrapperTest, PyTextIOWrapperTest, + CMiscIOTest, PyMiscIOTest, + ) + + # Put the namespaces of the IO module we are testing and some useful mock + # classes in the __dict__ of each test. + mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO, + MockNonBlockWriterIO) + all_members = io.__all__ + ["IncrementalNewlineDecoder"] + c_io_ns = dict((name, getattr(io, name)) for name in all_members) + py_io_ns = dict((name, getattr(pyio, name)) for name in all_members) + globs = globals() + c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks) + py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks) + # Avoid turning open into a bound method. + py_io_ns["open"] = pyio.OpenWrapper + for test in tests: + if test.__name__.startswith("C"): + for name, obj in c_io_ns.items(): + setattr(test, name, obj) + elif test.__name__.startswith("Py"): + for name, obj in py_io_ns.items(): + setattr(test, name, obj) + + support.run_unittest(*tests) if __name__ == "__main__": - unittest.main() + test_main() diff --git a/Lib/test/test_largefile.py b/Lib/test/test_largefile.py index 0f1c190..5335e2c 100644 --- a/Lib/test/test_largefile.py +++ b/Lib/test/test_largefile.py @@ -1,12 +1,16 @@ """Test largefile support on system where this makes sense. """ +from __future__ import print_function + import os import stat import sys import unittest from test.test_support import run_unittest, TESTFN, verbose, requires, \ unlink +import io # C implementation of io +import _pyio as pyio # Python implementation of io try: import signal @@ -18,10 +22,10 @@ except (ImportError, AttributeError): pass # create >2GB file (2GB = 2147483648 bytes) -size = 2500000000L +size = 2500000000 -class TestCase(unittest.TestCase): +class LargeFileTest(unittest.TestCase): """Test that each file function works as expected for a large (i.e. > 2GB, do we have to check > 4GB) files. @@ -33,28 +37,28 @@ class TestCase(unittest.TestCase): def test_seek(self): if verbose: - print 'create large file via seek (may be sparse file) ...' - with open(TESTFN, 'wb') as f: - f.write('z') + print('create large file via seek (may be sparse file) ...') + with self.open(TESTFN, 'wb') as f: + f.write(b'z') f.seek(0) f.seek(size) - f.write('a') + f.write(b'a') f.flush() if verbose: - print 'check file size with os.fstat' + print('check file size with os.fstat') self.assertEqual(os.fstat(f.fileno())[stat.ST_SIZE], size+1) def test_osstat(self): if verbose: - print 'check file size with os.stat' + print('check file size with os.stat') self.assertEqual(os.stat(TESTFN)[stat.ST_SIZE], size+1) def test_seek_read(self): if verbose: - print 'play around with seek() and read() with the built largefile' - with open(TESTFN, 'rb') as f: + print('play around with seek() and read() with the built largefile') + with self.open(TESTFN, 'rb') as f: self.assertEqual(f.tell(), 0) - self.assertEqual(f.read(1), 'z') + self.assertEqual(f.read(1), b'z') self.assertEqual(f.tell(), 1) f.seek(0) self.assertEqual(f.tell(), 0) @@ -77,15 +81,15 @@ class TestCase(unittest.TestCase): f.seek(size) self.assertEqual(f.tell(), size) # the 'a' that was written at the end of file above - self.assertEqual(f.read(1), 'a') + self.assertEqual(f.read(1), b'a') f.seek(-size-1, 1) - self.assertEqual(f.read(1), 'z') + self.assertEqual(f.read(1), b'z') self.assertEqual(f.tell(), 1) def test_lseek(self): if verbose: - print 'play around with os.lseek() with the built largefile' - with open(TESTFN, 'rb') as f: + print('play around with os.lseek() with the built largefile') + with self.open(TESTFN, 'rb') as f: self.assertEqual(os.lseek(f.fileno(), 0, 0), 0) self.assertEqual(os.lseek(f.fileno(), 42, 0), 42) self.assertEqual(os.lseek(f.fileno(), 42, 1), 84) @@ -95,16 +99,16 @@ class TestCase(unittest.TestCase): self.assertEqual(os.lseek(f.fileno(), -size-1, 2), 0) self.assertEqual(os.lseek(f.fileno(), size, 0), size) # the 'a' that was written at the end of file above - self.assertEqual(f.read(1), 'a') + self.assertEqual(f.read(1), b'a') def test_truncate(self): if verbose: - print 'try truncate' - with open(TESTFN, 'r+b') as f: + print('try truncate') + with self.open(TESTFN, 'r+b') as f: # this is already decided before start running the test suite # but we do it anyway for extra protection if not hasattr(f, 'truncate'): - raise unittest.SkipTest, "open().truncate() not available on this system" + raise unittest.SkipTest("open().truncate() not available on this system") f.seek(0, 2) # else we've lost track of the true size self.assertEqual(f.tell(), size+1) @@ -120,17 +124,25 @@ class TestCase(unittest.TestCase): newsize -= 1 f.seek(42) f.truncate(newsize) - self.assertEqual(f.tell(), 42) # else pointer moved - f.seek(0, 2) self.assertEqual(f.tell(), newsize) # else wasn't truncated - + f.seek(0, 2) + self.assertEqual(f.tell(), newsize) # XXX truncate(larger than true size) is ill-defined # across platform; cut it waaaaay back f.seek(0) f.truncate(1) - self.assertEqual(f.tell(), 0) # else pointer moved + self.assertEqual(f.tell(), 1) # else pointer moved + f.seek(0) self.assertEqual(len(f.read()), 1) # else wasn't truncated + def test_seekable(self): + # Issue #5016; seekable() can return False when the current position + # is negative when truncated to an int. + for pos in (2**31-1, 2**31, 2**31+1): + with self.open(TESTFN, 'rb') as f: + f.seek(pos) + self.assert_(f.seekable()) + def test_main(): # On Windows and Mac OSX this test comsumes large resources; It @@ -144,34 +156,39 @@ def test_main(): # Only run if the current filesystem supports large files. # (Skip this test on Windows, since we now always support # large files.) - f = open(TESTFN, 'wb') + f = open(TESTFN, 'wb', buffering=0) try: # 2**31 == 2147483648 - f.seek(2147483649L) + f.seek(2147483649) # Seeking is not enough of a test: you must write and # flush, too! - f.write("x") + f.write(b'x') f.flush() except (IOError, OverflowError): f.close() unlink(TESTFN) - raise unittest.SkipTest, "filesystem does not have largefile support" + raise unittest.SkipTest("filesystem does not have largefile support") else: f.close() suite = unittest.TestSuite() - suite.addTest(TestCase('test_seek')) - suite.addTest(TestCase('test_osstat')) - suite.addTest(TestCase('test_seek_read')) - suite.addTest(TestCase('test_lseek')) - with open(TESTFN, 'w') as f: - if hasattr(f, 'truncate'): - suite.addTest(TestCase('test_truncate')) - unlink(TESTFN) + for _open, prefix in [(io.open, 'C'), (pyio.open, 'Py')]: + class TestCase(LargeFileTest): + pass + TestCase.open = staticmethod(_open) + TestCase.__name__ = prefix + LargeFileTest.__name__ + suite.addTest(TestCase('test_seek')) + suite.addTest(TestCase('test_osstat')) + suite.addTest(TestCase('test_seek_read')) + suite.addTest(TestCase('test_lseek')) + with _open(TESTFN, 'wb') as f: + if hasattr(f, 'truncate'): + suite.addTest(TestCase('test_truncate')) + suite.addTest(TestCase('test_seekable')) + unlink(TESTFN) try: run_unittest(suite) finally: unlink(TESTFN) - if __name__ == '__main__': test_main() diff --git a/Lib/test/test_memoryio.py b/Lib/test/test_memoryio.py index 0b5ec9f..d1281b4 100644 --- a/Lib/test/test_memoryio.py +++ b/Lib/test/test_memoryio.py @@ -4,23 +4,66 @@ BytesIO -- for bytes """ from __future__ import unicode_literals +from __future__ import print_function import unittest -from test import test_support +from test import test_support as support import io +import _pyio as pyio import sys -import array -try: - import _bytesio - has_c_implementation = True -except ImportError: - has_c_implementation = False +class MemorySeekTestMixin: + + def testInit(self): + buf = self.buftype("1234567890") + bytesIo = self.ioclass(buf) + + def testRead(self): + buf = self.buftype("1234567890") + bytesIo = self.ioclass(buf) + + self.assertEquals(buf[:1], bytesIo.read(1)) + self.assertEquals(buf[1:5], bytesIo.read(4)) + self.assertEquals(buf[5:], bytesIo.read(900)) + self.assertEquals(self.EOF, bytesIo.read()) + + def testReadNoArgs(self): + buf = self.buftype("1234567890") + bytesIo = self.ioclass(buf) + + self.assertEquals(buf, bytesIo.read()) + self.assertEquals(self.EOF, bytesIo.read()) + + def testSeek(self): + buf = self.buftype("1234567890") + bytesIo = self.ioclass(buf) + + bytesIo.read(5) + bytesIo.seek(0) + self.assertEquals(buf, bytesIo.read()) + + bytesIo.seek(3) + self.assertEquals(buf[3:], bytesIo.read()) + self.assertRaises(TypeError, bytesIo.seek, 0.0) + + def testTell(self): + buf = self.buftype("1234567890") + bytesIo = self.ioclass(buf) + + self.assertEquals(0, bytesIo.tell()) + bytesIo.seek(5) + self.assertEquals(5, bytesIo.tell()) + bytesIo.seek(10000) + self.assertEquals(10000, bytesIo.tell()) class MemoryTestMixin: + def test_detach(self): + buf = self.ioclass() + self.assertRaises(self.UnsupportedOperation, buf.detach) + def write_ops(self, f, t): self.assertEqual(f.write(t("blah.")), 5) self.assertEqual(f.seek(0), 0) @@ -151,7 +194,7 @@ class MemoryTestMixin: self.assertEqual(memio.readline(), self.EOF) memio.seek(0) self.assertEqual(type(memio.readline()), type(buf)) - self.assertEqual(memio.readline(None), buf) + self.assertEqual(memio.readline(), buf) self.assertRaises(TypeError, memio.readline, '') memio.close() self.assertRaises(ValueError, memio.readline) @@ -197,7 +240,7 @@ class MemoryTestMixin: self.assertEqual(i, 10) memio = self.ioclass(buf * 2) memio.close() - self.assertRaises(ValueError, memio.next) + self.assertRaises(ValueError, next, memio) def test_getvalue(self): buf = self.buftype("1234567890") @@ -299,11 +342,14 @@ class MemoryTestMixin: self.assertEqual(test2(), buf) -class PyBytesIOTest(MemoryTestMixin, unittest.TestCase): +class PyBytesIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase): + + UnsupportedOperation = pyio.UnsupportedOperation + @staticmethod def buftype(s): return s.encode("ascii") - ioclass = io._BytesIO + ioclass = pyio.BytesIO EOF = b"" def test_read1(self): @@ -333,7 +379,8 @@ class PyBytesIOTest(MemoryTestMixin, unittest.TestCase): self.assertEqual(memio.readinto(b), 0) self.assertEqual(b, b"") self.assertRaises(TypeError, memio.readinto, '') - a = array.array(b'b', map(ord, b"hello world")) + import array + a = array.array(b'b', b"hello world") memio = self.ioclass(buf) memio.readinto(a) self.assertEqual(a.tostring(), b"1234567890d") @@ -365,19 +412,41 @@ class PyBytesIOTest(MemoryTestMixin, unittest.TestCase): def test_bytes_array(self): buf = b"1234567890" - - a = array.array(b'b', map(ord, buf)) + import array + a = array.array(b'b', buf) memio = self.ioclass(a) self.assertEqual(memio.getvalue(), buf) self.assertEqual(memio.write(a), 10) self.assertEqual(memio.getvalue(), buf) -class PyStringIOTest(MemoryTestMixin, unittest.TestCase): +class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase): buftype = unicode - ioclass = io.StringIO + ioclass = pyio.StringIO + UnsupportedOperation = pyio.UnsupportedOperation EOF = "" + # TextIO-specific behaviour. + + def test_newlines_property(self): + memio = self.ioclass(newline=None) + # The C StringIO decodes newlines in write() calls, but the Python + # implementation only does when reading. This function forces them to + # be decoded for testing. + def force_decode(): + memio.seek(0) + memio.read() + self.assertEqual(memio.newlines, None) + memio.write("a\n") + force_decode() + self.assertEqual(memio.newlines, "\n") + memio.write("b\r\n") + force_decode() + self.assertEqual(memio.newlines, ("\n", "\r\n")) + memio.write("c\rd") + force_decode() + self.assertEqual(memio.newlines, ("\r", "\n", "\r\n")) + def test_relative_seek(self): memio = self.ioclass() @@ -388,29 +457,107 @@ class PyStringIOTest(MemoryTestMixin, unittest.TestCase): self.assertRaises(IOError, memio.seek, 1, 1) self.assertRaises(IOError, memio.seek, 1, 2) + def test_textio_properties(self): + memio = self.ioclass() + + # These are just dummy values but we nevertheless check them for fear + # of unexpected breakage. + self.assertIsNone(memio.encoding) + self.assertIsNone(memio.errors) + self.assertFalse(memio.line_buffering) + + def test_newline_none(self): + # newline=None + memio = self.ioclass("a\nb\r\nc\rd", newline=None) + self.assertEqual(list(memio), ["a\n", "b\n", "c\n", "d"]) + memio.seek(0) + self.assertEqual(memio.read(1), "a") + self.assertEqual(memio.read(2), "\nb") + self.assertEqual(memio.read(2), "\nc") + self.assertEqual(memio.read(1), "\n") + memio = self.ioclass(newline=None) + self.assertEqual(2, memio.write("a\n")) + self.assertEqual(3, memio.write("b\r\n")) + self.assertEqual(3, memio.write("c\rd")) + memio.seek(0) + self.assertEqual(memio.read(), "a\nb\nc\nd") + memio = self.ioclass("a\r\nb", newline=None) + self.assertEqual(memio.read(3), "a\nb") + + def test_newline_empty(self): + # newline="" + memio = self.ioclass("a\nb\r\nc\rd", newline="") + self.assertEqual(list(memio), ["a\n", "b\r\n", "c\r", "d"]) + memio.seek(0) + self.assertEqual(memio.read(4), "a\nb\r") + self.assertEqual(memio.read(2), "\nc") + self.assertEqual(memio.read(1), "\r") + memio = self.ioclass(newline="") + self.assertEqual(2, memio.write("a\n")) + self.assertEqual(2, memio.write("b\r")) + self.assertEqual(2, memio.write("\nc")) + self.assertEqual(2, memio.write("\rd")) + memio.seek(0) + self.assertEqual(list(memio), ["a\n", "b\r\n", "c\r", "d"]) + + def test_newline_lf(self): + # newline="\n" + memio = self.ioclass("a\nb\r\nc\rd") + self.assertEqual(list(memio), ["a\n", "b\r\n", "c\rd"]) + + def test_newline_cr(self): + # newline="\r" + memio = self.ioclass("a\nb\r\nc\rd", newline="\r") + memio.seek(0) + self.assertEqual(memio.read(), "a\rb\r\rc\rd") + memio.seek(0) + self.assertEqual(list(memio), ["a\r", "b\r", "\r", "c\r", "d"]) + + def test_newline_crlf(self): + # newline="\r\n" + memio = self.ioclass("a\nb\r\nc\rd", newline="\r\n") + memio.seek(0) + self.assertEqual(memio.read(), "a\r\nb\r\r\nc\rd") + memio.seek(0) + self.assertEqual(list(memio), ["a\r\n", "b\r\r\n", "c\rd"]) + + def test_issue5265(self): + # StringIO can duplicate newlines in universal newlines mode + memio = self.ioclass("a\r\nb\r\n", newline=None) + self.assertEqual(memio.read(5), "a\nb\n") + + +class CBytesIOTest(PyBytesIOTest): + ioclass = io.BytesIO + UnsupportedOperation = io.UnsupportedOperation + + test_bytes_array = unittest.skip( + "array.array() does not have the new buffer API" + )(PyBytesIOTest.test_bytes_array) + + +class CStringIOTest(PyStringIOTest): + ioclass = io.StringIO + UnsupportedOperation = io.UnsupportedOperation + # XXX: For the Python version of io.StringIO, this is highly # dependent on the encoding used for the underlying buffer. - # def test_widechar(self): - # buf = self.buftype("\U0002030a\U00020347") - # memio = self.ioclass(buf) - # - # self.assertEqual(memio.getvalue(), buf) - # self.assertEqual(memio.write(buf), len(buf)) - # self.assertEqual(memio.tell(), len(buf)) - # self.assertEqual(memio.getvalue(), buf) - # self.assertEqual(memio.write(buf), len(buf)) - # self.assertEqual(memio.tell(), len(buf) * 2) - # self.assertEqual(memio.getvalue(), buf + buf) - -if has_c_implementation: - class CBytesIOTest(PyBytesIOTest): - ioclass = io.BytesIO + def test_widechar(self): + buf = self.buftype("\U0002030a\U00020347") + memio = self.ioclass(buf) + + self.assertEqual(memio.getvalue(), buf) + self.assertEqual(memio.write(buf), len(buf)) + self.assertEqual(memio.tell(), len(buf)) + self.assertEqual(memio.getvalue(), buf) + self.assertEqual(memio.write(buf), len(buf)) + self.assertEqual(memio.tell(), len(buf) * 2) + self.assertEqual(memio.getvalue(), buf + buf) + def test_main(): - tests = [PyBytesIOTest, PyStringIOTest] - if has_c_implementation: - tests.extend([CBytesIOTest]) - test_support.run_unittest(*tests) + tests = [PyBytesIOTest, PyStringIOTest, CBytesIOTest, CStringIOTest] + support.run_unittest(*tests) if __name__ == '__main__': test_main() diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index 6f7b239..dd1f005 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -29,7 +29,7 @@ __all__ = ["Error", "TestFailed", "ResourceDenied", "import_module", "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest", "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup", "threading_cleanup", "reap_children", "cpython_only", - "check_impl_detail", "get_attribute"] + "check_impl_detail", "get_attribute", "py3k_bytes"] class Error(Exception): """Base class for regression test exceptions.""" @@ -968,3 +968,18 @@ def reap_children(): break except: break + +def py3k_bytes(b): + """Emulate the py3k bytes() constructor. + + NOTE: This is only a best effort function. + """ + try: + # memoryview? + return b.tobytes() + except AttributeError: + try: + # iterable of ints? + return b"".join(chr(x) for x in b) + except TypeError: + return bytes(b) diff --git a/Lib/test/test_univnewlines.py b/Lib/test/test_univnewlines.py index 63c6fe8..1f7352a 100644 --- a/Lib/test/test_univnewlines.py +++ b/Lib/test/test_univnewlines.py @@ -1,20 +1,25 @@ # Tests universal newline support for both reading and parsing files. + +from __future__ import print_function +from __future__ import unicode_literals + +import io +import _pyio as pyio import unittest import os import sys -from test import test_support +from test import test_support as support if not hasattr(sys.stdin, 'newlines'): - raise unittest.SkipTest, \ - "This Python does not have universal newline support" + raise unittest.SkipTest( + "This Python does not have universal newline support") FATX = 'x' * (2**14) DATA_TEMPLATE = [ "line1=1", - "line2='this is a very long line designed to go past the magic " + - "hundred character limit that is inside fileobject.c and which " + - "is meant to speed up the common case, but we also want to test " + + "line2='this is a very long line designed to go past any default " + + "buffer limits that exist in io.py but we also want to test " + "the uncommon case, naturally.'", "def line3():pass", "line4 = '%s'" % FATX, @@ -28,48 +33,50 @@ DATA_CRLF = "\r\n".join(DATA_TEMPLATE) + "\r\n" # before end-of-file. DATA_MIXED = "\n".join(DATA_TEMPLATE) + "\r" DATA_SPLIT = [x + "\n" for x in DATA_TEMPLATE] -del x class TestGenericUnivNewlines(unittest.TestCase): # use a class variable DATA to define the data to write to the file # and a class variable NEWLINE to set the expected newlines value - READMODE = 'U' + READMODE = 'r' WRITEMODE = 'wb' def setUp(self): - with open(test_support.TESTFN, self.WRITEMODE) as fp: - fp.write(self.DATA) + data = self.DATA + if "b" in self.WRITEMODE: + data = data.encode("ascii") + with self.open(support.TESTFN, self.WRITEMODE) as fp: + fp.write(data) def tearDown(self): try: - os.unlink(test_support.TESTFN) + os.unlink(support.TESTFN) except: pass def test_read(self): - with open(test_support.TESTFN, self.READMODE) as fp: + with self.open(support.TESTFN, self.READMODE) as fp: data = fp.read() self.assertEqual(data, DATA_LF) - self.assertEqual(repr(fp.newlines), repr(self.NEWLINE)) + self.assertEqual(set(fp.newlines), set(self.NEWLINE)) def test_readlines(self): - with open(test_support.TESTFN, self.READMODE) as fp: + with self.open(support.TESTFN, self.READMODE) as fp: data = fp.readlines() self.assertEqual(data, DATA_SPLIT) - self.assertEqual(repr(fp.newlines), repr(self.NEWLINE)) + self.assertEqual(set(fp.newlines), set(self.NEWLINE)) def test_readline(self): - with open(test_support.TESTFN, self.READMODE) as fp: + with self.open(support.TESTFN, self.READMODE) as fp: data = [] d = fp.readline() while d: data.append(d) d = fp.readline() self.assertEqual(data, DATA_SPLIT) - self.assertEqual(repr(fp.newlines), repr(self.NEWLINE)) + self.assertEqual(set(fp.newlines), set(self.NEWLINE)) def test_seek(self): - with open(test_support.TESTFN, self.READMODE) as fp: + with self.open(support.TESTFN, self.READMODE) as fp: fp.readline() pos = fp.tell() data = fp.readlines() @@ -78,19 +85,6 @@ class TestGenericUnivNewlines(unittest.TestCase): data = fp.readlines() self.assertEqual(data, DATA_SPLIT[1:]) - def test_execfile(self): - namespace = {} - execfile(test_support.TESTFN, namespace) - func = namespace['line3'] - self.assertEqual(func.func_code.co_firstlineno, 3) - self.assertEqual(namespace['line4'], FATX) - - -class TestNativeNewlines(TestGenericUnivNewlines): - NEWLINE = None - DATA = DATA_LF - READMODE = 'r' - WRITEMODE = 'w' class TestCRNewlines(TestGenericUnivNewlines): NEWLINE = '\r' @@ -105,7 +99,7 @@ class TestCRLFNewlines(TestGenericUnivNewlines): DATA = DATA_CRLF def test_tell(self): - with open(test_support.TESTFN, self.READMODE) as fp: + with self.open(support.TESTFN, self.READMODE) as fp: self.assertEqual(repr(fp.newlines), repr(None)) data = fp.readline() pos = fp.tell() @@ -117,13 +111,22 @@ class TestMixedNewlines(TestGenericUnivNewlines): def test_main(): - test_support.run_unittest( - TestNativeNewlines, - TestCRNewlines, - TestLFNewlines, - TestCRLFNewlines, - TestMixedNewlines - ) + base_tests = (TestCRNewlines, + TestLFNewlines, + TestCRLFNewlines, + TestMixedNewlines) + tests = [] + # Test the C and Python implementations. + for test in base_tests: + class CTest(test): + open = io.open + CTest.__name__ = str("C" + test.__name__) + class PyTest(test): + open = staticmethod(pyio.open) + PyTest.__name__ = str("Py" + test.__name__) + tests.append(CTest) + tests.append(PyTest) + support.run_unittest(*tests) if __name__ == '__main__': test_main() |