diff options
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r-- | Lib/test/test_io.py | 103 |
1 files changed, 74 insertions, 29 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 28b4f06..dfc7c69 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -19,20 +19,21 @@ # test both implementations. This file has lots of examples. ################################################################################ +import abc +import array +import errno +import locale import os +import pickle +import random +import signal import sys import time -import array -import random import unittest -import weakref -import abc -import signal -import errno import warnings -import pickle -from itertools import cycle, count +import weakref from collections import deque +from itertools import cycle, count from test import support import codecs @@ -49,7 +50,7 @@ except ImportError: def _default_chunk_size(): """Get the default TextIOWrapper chunk size""" - with open(__file__, "r", encoding="latin1") as f: + with open(__file__, "r", encoding="latin-1") as f: return f._CHUNK_SIZE @@ -634,6 +635,15 @@ class IOTest(unittest.TestCase): for obj in test: self.assertTrue(hasattr(obj, "__dict__")) + def test_opener(self): + with self.open(support.TESTFN, "w") as f: + f.write("egg\n") + fd = os.open(support.TESTFN, os.O_RDONLY) + def opener(path, flags): + return fd + with self.open("non-existent", "r", opener=opener) as f: + self.assertEqual(f.read(), "egg\n") + def test_fileio_closefd(self): # Issue #4841 with self.open(__file__, 'rb') as f1, \ @@ -696,7 +706,7 @@ class CommonBufferedTests: bufio = self.tp(rawio) # Invalid whence self.assertRaises(ValueError, bufio.seek, 0, -1) - self.assertRaises(ValueError, bufio.seek, 0, 3) + self.assertRaises(ValueError, bufio.seek, 0, 9) def test_override_destructor(self): tp = self.tp @@ -848,6 +858,12 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): self.assertEqual(b, b"gf") self.assertEqual(bufio.readinto(b), 0) self.assertEqual(b, b"gf") + rawio = self.MockRawIO((b"abc", None)) + bufio = self.tp(rawio) + self.assertEqual(bufio.readinto(b), 2) + self.assertEqual(b, b"ab") + self.assertEqual(bufio.readinto(b), 1) + self.assertEqual(b, b"cb") def test_readlines(self): def bufio(): @@ -1815,11 +1831,11 @@ class TextIOWrapperTest(unittest.TestCase): r = self.BytesIO(b"\xc3\xa9\n\n") b = self.BufferedReader(r, 1000) t = self.TextIOWrapper(b) - t.__init__(b, encoding="latin1", newline="\r\n") - self.assertEqual(t.encoding, "latin1") + t.__init__(b, encoding="latin-1", newline="\r\n") + self.assertEqual(t.encoding, "latin-1") self.assertEqual(t.line_buffering, False) - t.__init__(b, encoding="utf8", line_buffering=True) - self.assertEqual(t.encoding, "utf8") + t.__init__(b, encoding="utf-8", line_buffering=True) + self.assertEqual(t.encoding, "utf-8") self.assertEqual(t.line_buffering, True) self.assertEqual("\xe9\n", t.readline()) self.assertRaises(TypeError, t.__init__, b, newline=42) @@ -1866,11 +1882,29 @@ class TextIOWrapperTest(unittest.TestCase): t.write("A\rB") self.assertEqual(r.getvalue(), b"XY\nZA\rB") + def test_default_encoding(self): + old_environ = dict(os.environ) + try: + # try to get a user preferred encoding different than the current + # locale encoding to check that TextIOWrapper() uses the current + # locale encoding and not the user preferred encoding + for key in ('LC_ALL', 'LANG', 'LC_CTYPE'): + if key in os.environ: + del os.environ[key] + + current_locale_encoding = locale.getpreferredencoding(False) + b = self.BytesIO() + t = self.TextIOWrapper(b) + self.assertEqual(t.encoding, current_locale_encoding) + finally: + os.environ.clear() + os.environ.update(old_environ) + def test_encoding(self): # Check the encoding attribute is always set, and valid b = self.BytesIO() - t = self.TextIOWrapper(b, encoding="utf8") - self.assertEqual(t.encoding, "utf8") + t = self.TextIOWrapper(b, encoding="utf-8") + self.assertEqual(t.encoding, "utf-8") t = self.TextIOWrapper(b) self.assertTrue(t.encoding is not None) codecs.lookup(t.encoding) @@ -1963,8 +1997,8 @@ class TextIOWrapperTest(unittest.TestCase): testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") for newline, expected in [ - (None, normalized.decode("ascii").splitlines(True)), - ("", testdata.decode("ascii").splitlines(True)), + (None, normalized.decode("ascii").splitlines(keepends=True)), + ("", testdata.decode("ascii").splitlines(keepends=True)), ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), @@ -2049,7 +2083,7 @@ class TextIOWrapperTest(unittest.TestCase): def test_basic_io(self): for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): - for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le": + for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le": f = self.open(support.TESTFN, "w+", encoding=enc) f._CHUNK_SIZE = chunksize self.assertEqual(f.write("abc"), 3) @@ -2099,7 +2133,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEqual(rlines, wlines) def test_telling(self): - f = self.open(support.TESTFN, "w+", encoding="utf8") + f = self.open(support.TESTFN, "w+", encoding="utf-8") p0 = f.tell() f.write("\xff\n") p1 = f.tell() @@ -2345,6 +2379,7 @@ class TextIOWrapperTest(unittest.TestCase): with self.open(support.TESTFN, "w", errors="replace") as f: self.assertEqual(f.errors, "replace") + @support.no_tracing @unittest.skipUnless(threading, 'Threading required for this test.') def test_threads_write(self): # Issue6750: concurrent writes could duplicate data @@ -2662,12 +2697,6 @@ class MiscIOTest(unittest.TestCase): def test_blockingioerror(self): # Various BlockingIOError issues - self.assertRaises(TypeError, self.BlockingIOError) - self.assertRaises(TypeError, self.BlockingIOError, 1) - self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4) - self.assertRaises(TypeError, self.BlockingIOError, 1, "", None) - b = self.BlockingIOError(1, "") - self.assertEqual(b.characters_written, 0) class C(str): pass c = C("") @@ -2809,6 +2838,7 @@ class MiscIOTest(unittest.TestCase): except self.BlockingIOError as e: self.assertEqual(e.args[0], errno.EAGAIN) + self.assertEqual(e.args[2], e.characters_written) sent[-1] = sent[-1][:e.characters_written] received.append(rf.read()) msg = b'BLOCKED' @@ -2821,6 +2851,7 @@ class MiscIOTest(unittest.TestCase): break except self.BlockingIOError as e: self.assertEqual(e.args[0], errno.EAGAIN) + self.assertEqual(e.args[2], e.characters_written) self.assertEqual(e.characters_written, 0) received.append(rf.read()) @@ -2831,6 +2862,19 @@ class MiscIOTest(unittest.TestCase): self.assertTrue(wf.closed) self.assertTrue(rf.closed) + def test_create_fail(self): + # 'x' mode fails if file is existing + with self.open(support.TESTFN, 'w'): + pass + self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x') + + def test_create_writes(self): + # 'x' mode opens for writing + with self.open(support.TESTFN, 'xb') as f: + f.write(b"spam") + with self.open(support.TESTFN, 'rb') as f: + self.assertEqual(b"spam", f.read()) + class CMiscIOTest(MiscIOTest): io = io @@ -2851,14 +2895,14 @@ class SignalsTest(unittest.TestCase): 1/0 @unittest.skipUnless(threading, 'Threading required for this test.') - @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'), - 'issue #12429: skip test on FreeBSD <= 7') def check_interrupted_write(self, item, bytes, **fdopen_kwargs): """Check that a partial write, when it gets interrupted, properly invokes the signal handler, and bubbles up the exception raised in the latter.""" read_results = [] def _read(): + if hasattr(signal, 'pthread_sigmask'): + signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM]) s = os.read(r, 1) read_results.append(s) t = threading.Thread(target=_read) @@ -2876,7 +2920,7 @@ class SignalsTest(unittest.TestCase): # The buffered IO layer must check for pending signal # handlers, which in this case will invoke alarm_interrupt(). self.assertRaises(ZeroDivisionError, - wio.write, item * (1024 * 1024)) + wio.write, item * (support.PIPE_MAX_SIZE // len(item))) t.join() # We got one byte, get another one and check that it isn't a # repeat of the first one. @@ -2903,6 +2947,7 @@ class SignalsTest(unittest.TestCase): def test_interrupted_write_text(self): self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") + @support.no_tracing def check_reentrant_write(self, data, **fdopen_kwargs): def on_alarm(*args): # Will be called reentrantly from the same thread |