diff options
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r-- | Lib/test/test_io.py | 146 |
1 files changed, 111 insertions, 35 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 8ef314e..64c66d88 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -19,21 +19,22 @@ # test both implementations. This file has lots of examples. ################################################################################ +import abc +import array +import errno +import locale import os +import pickle +import random +import signal import sys import time -import array -import random import unittest -import weakref -import abc -import signal -import errno import warnings -import pickle +import weakref import _testcapi -from itertools import cycle, count from collections import deque, UserList +from itertools import cycle, count from test import support import codecs @@ -50,7 +51,7 @@ except ImportError: def _default_chunk_size(): """Get the default TextIOWrapper chunk size""" - with open(__file__, "r", encoding="latin1") as f: + with open(__file__, "r", encoding="latin-1") as f: return f._CHUNK_SIZE @@ -603,6 +604,7 @@ class IOTest(unittest.TestCase): raise IOError() f.flush = bad_flush self.assertRaises(IOError, f.close) # exception not swallowed + self.assertTrue(f.closed) def test_multi_close(self): f = self.open(support.TESTFN, "wb", buffering=0) @@ -635,6 +637,15 @@ class IOTest(unittest.TestCase): for obj in test: self.assertTrue(hasattr(obj, "__dict__")) + def test_opener(self): + with self.open(support.TESTFN, "w") as f: + f.write("egg\n") + fd = os.open(support.TESTFN, os.O_RDONLY) + def opener(path, flags): + return fd + with self.open("non-existent", "r", opener=opener) as f: + self.assertEqual(f.read(), "egg\n") + def test_fileio_closefd(self): # Issue #4841 with self.open(__file__, 'rb') as f1, \ @@ -697,7 +708,7 @@ class CommonBufferedTests: bufio = self.tp(rawio) # Invalid whence self.assertRaises(ValueError, bufio.seek, 0, -1) - self.assertRaises(ValueError, bufio.seek, 0, 3) + self.assertRaises(ValueError, bufio.seek, 0, 9) def test_override_destructor(self): tp = self.tp @@ -771,6 +782,22 @@ class CommonBufferedTests: raw.flush = bad_flush b = self.tp(raw) self.assertRaises(IOError, b.close) # exception not swallowed + self.assertTrue(b.closed) + + def test_close_error_on_close(self): + raw = self.MockRawIO() + def bad_flush(): + raise IOError('flush') + def bad_close(): + raise IOError('close') + raw.close = bad_close + b = self.tp(raw) + b.flush = bad_flush + with self.assertRaises(IOError) as err: # exception not swallowed + b.close() + self.assertEqual(err.exception.args, ('close',)) + self.assertEqual(err.exception.__context__.args, ('flush',)) + self.assertFalse(b.closed) def test_multi_close(self): raw = self.MockRawIO() @@ -863,6 +890,12 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): self.assertEqual(b, b"gf") self.assertEqual(bufio.readinto(b), 0) self.assertEqual(b, b"gf") + rawio = self.MockRawIO((b"abc", None)) + bufio = self.tp(rawio) + self.assertEqual(bufio.readinto(b), 2) + self.assertEqual(b, b"ab") + self.assertEqual(bufio.readinto(b), 1) + self.assertEqual(b, b"cb") def test_readlines(self): def bufio(): @@ -1283,11 +1316,20 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): self.assertRaises(IOError, bufio.tell) self.assertRaises(IOError, bufio.write, b"abcdef") - def test_max_buffer_size_deprecation(self): - with support.check_warnings(("max_buffer_size is deprecated", - DeprecationWarning)): + def test_max_buffer_size_removal(self): + with self.assertRaises(TypeError): self.tp(self.MockRawIO(), 8, 12) + def test_write_error_on_close(self): + raw = self.MockRawIO() + def bad_write(b): + raise IOError() + raw.write = bad_write + b = self.tp(raw) + b.write(b'spam') + self.assertRaises(IOError, b.close) # exception not swallowed + self.assertTrue(b.closed) + class CBufferedWriterTest(BufferedWriterTest, SizeofTest): tp = io.BufferedWriter @@ -1346,9 +1388,8 @@ class BufferedRWPairTest(unittest.TestCase): pair = self.tp(self.MockRawIO(), self.MockRawIO()) self.assertRaises(self.UnsupportedOperation, pair.detach) - def test_constructor_max_buffer_size_deprecation(self): - with support.check_warnings(("max_buffer_size is deprecated", - DeprecationWarning)): + def test_constructor_max_buffer_size_removal(self): + with self.assertRaises(TypeError): self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12) def test_constructor_with_not_readable(self): @@ -1871,11 +1912,11 @@ class TextIOWrapperTest(unittest.TestCase): r = self.BytesIO(b"\xc3\xa9\n\n") b = self.BufferedReader(r, 1000) t = self.TextIOWrapper(b) - t.__init__(b, encoding="latin1", newline="\r\n") - self.assertEqual(t.encoding, "latin1") + t.__init__(b, encoding="latin-1", newline="\r\n") + self.assertEqual(t.encoding, "latin-1") self.assertEqual(t.line_buffering, False) - t.__init__(b, encoding="utf8", line_buffering=True) - self.assertEqual(t.encoding, "utf8") + t.__init__(b, encoding="utf-8", line_buffering=True) + self.assertEqual(t.encoding, "utf-8") self.assertEqual(t.line_buffering, True) self.assertEqual("\xe9\n", t.readline()) self.assertRaises(TypeError, t.__init__, b, newline=42) @@ -1922,6 +1963,24 @@ class TextIOWrapperTest(unittest.TestCase): t.write("A\rB") self.assertEqual(r.getvalue(), b"XY\nZA\rB") + def test_default_encoding(self): + old_environ = dict(os.environ) + try: + # try to get a user preferred encoding different than the current + # locale encoding to check that TextIOWrapper() uses the current + # locale encoding and not the user preferred encoding + for key in ('LC_ALL', 'LANG', 'LC_CTYPE'): + if key in os.environ: + del os.environ[key] + + current_locale_encoding = locale.getpreferredencoding(False) + b = self.BytesIO() + t = self.TextIOWrapper(b) + self.assertEqual(t.encoding, current_locale_encoding) + finally: + os.environ.clear() + os.environ.update(old_environ) + # Issue 15989 def test_device_encoding(self): b = self.BytesIO() @@ -1933,8 +1992,8 @@ class TextIOWrapperTest(unittest.TestCase): def test_encoding(self): # Check the encoding attribute is always set, and valid b = self.BytesIO() - t = self.TextIOWrapper(b, encoding="utf8") - self.assertEqual(t.encoding, "utf8") + t = self.TextIOWrapper(b, encoding="utf-8") + self.assertEqual(t.encoding, "utf-8") t = self.TextIOWrapper(b) self.assertTrue(t.encoding is not None) codecs.lookup(t.encoding) @@ -2027,8 +2086,8 @@ class TextIOWrapperTest(unittest.TestCase): testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") for newline, expected in [ - (None, normalized.decode("ascii").splitlines(True)), - ("", testdata.decode("ascii").splitlines(True)), + (None, normalized.decode("ascii").splitlines(keepends=True)), + ("", testdata.decode("ascii").splitlines(keepends=True)), ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), @@ -2113,7 +2172,7 @@ class TextIOWrapperTest(unittest.TestCase): def test_basic_io(self): for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): - for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le": + for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le": f = self.open(support.TESTFN, "w+", encoding=enc) f._CHUNK_SIZE = chunksize self.assertEqual(f.write("abc"), 3) @@ -2163,7 +2222,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEqual(rlines, wlines) def test_telling(self): - f = self.open(support.TESTFN, "w+", encoding="utf8") + f = self.open(support.TESTFN, "w+", encoding="utf-8") p0 = f.tell() f.write("\xff\n") p1 = f.tell() @@ -2431,6 +2490,7 @@ class TextIOWrapperTest(unittest.TestCase): with self.open(support.TESTFN, "w", errors="replace") as f: self.assertEqual(f.errors, "replace") + @support.no_tracing @unittest.skipUnless(threading, 'Threading required for this test.') def test_threads_write(self): # Issue6750: concurrent writes could duplicate data @@ -2459,6 +2519,7 @@ class TextIOWrapperTest(unittest.TestCase): raise IOError() txt.flush = bad_flush self.assertRaises(IOError, txt.close) # exception not swallowed + self.assertTrue(txt.closed) def test_multi_close(self): txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") @@ -2772,12 +2833,6 @@ class MiscIOTest(unittest.TestCase): def test_blockingioerror(self): # Various BlockingIOError issues - self.assertRaises(TypeError, self.BlockingIOError) - self.assertRaises(TypeError, self.BlockingIOError, 1) - self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4) - self.assertRaises(TypeError, self.BlockingIOError, 1, "", None) - b = self.BlockingIOError(1, "") - self.assertEqual(b.characters_written, 0) class C(str): pass c = C("") @@ -2919,6 +2974,7 @@ class MiscIOTest(unittest.TestCase): except self.BlockingIOError as e: self.assertEqual(e.args[0], errno.EAGAIN) + self.assertEqual(e.args[2], e.characters_written) sent[-1] = sent[-1][:e.characters_written] received.append(rf.read()) msg = b'BLOCKED' @@ -2931,6 +2987,7 @@ class MiscIOTest(unittest.TestCase): break except self.BlockingIOError as e: self.assertEqual(e.args[0], errno.EAGAIN) + self.assertEqual(e.args[2], e.characters_written) self.assertEqual(e.characters_written, 0) received.append(rf.read()) @@ -2941,6 +2998,24 @@ class MiscIOTest(unittest.TestCase): self.assertTrue(wf.closed) self.assertTrue(rf.closed) + def test_create_fail(self): + # 'x' mode fails if file is existing + with self.open(support.TESTFN, 'w'): + pass + self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x') + + def test_create_writes(self): + # 'x' mode opens for writing + with self.open(support.TESTFN, 'xb') as f: + f.write(b"spam") + with self.open(support.TESTFN, 'rb') as f: + self.assertEqual(b"spam", f.read()) + + def test_open_allargs(self): + # there used to be a buffer overflow in the parser for rawmode + self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+') + + class CMiscIOTest(MiscIOTest): io = io @@ -2961,14 +3036,14 @@ class SignalsTest(unittest.TestCase): 1/0 @unittest.skipUnless(threading, 'Threading required for this test.') - @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'), - 'issue #12429: skip test on FreeBSD <= 7') def check_interrupted_write(self, item, bytes, **fdopen_kwargs): """Check that a partial write, when it gets interrupted, properly invokes the signal handler, and bubbles up the exception raised in the latter.""" read_results = [] def _read(): + if hasattr(signal, 'pthread_sigmask'): + signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM]) s = os.read(r, 1) read_results.append(s) t = threading.Thread(target=_read) @@ -2986,7 +3061,7 @@ class SignalsTest(unittest.TestCase): # The buffered IO layer must check for pending signal # handlers, which in this case will invoke alarm_interrupt(). self.assertRaises(ZeroDivisionError, - wio.write, item * (1024 * 1024)) + wio.write, item * (support.PIPE_MAX_SIZE // len(item))) t.join() # We got one byte, get another one and check that it isn't a # repeat of the first one. @@ -3013,6 +3088,7 @@ class SignalsTest(unittest.TestCase): def test_interrupted_write_text(self): self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") + @support.no_tracing def check_reentrant_write(self, data, **fdopen_kwargs): def on_alarm(*args): # Will be called reentrantly from the same thread |