diff options
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r-- | Lib/test/test_io.py | 66 |
1 files changed, 46 insertions, 20 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index ea82cea..96258b4 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -49,7 +49,7 @@ except ImportError: def _default_chunk_size(): """Get the default TextIOWrapper chunk size""" - with open(__file__, "r", encoding="latin1") as f: + with open(__file__, "r", encoding="latin-1") as f: return f._CHUNK_SIZE @@ -634,6 +634,15 @@ class IOTest(unittest.TestCase): for obj in test: self.assertTrue(hasattr(obj, "__dict__")) + def test_opener(self): + with self.open(support.TESTFN, "w") as f: + f.write("egg\n") + fd = os.open(support.TESTFN, os.O_RDONLY) + def opener(path, flags): + return fd + with self.open("non-existent", "r", opener=opener) as f: + self.assertEqual(f.read(), "egg\n") + class CIOTest(IOTest): def test_IOBase_finalize(self): @@ -835,6 +844,12 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): self.assertEqual(b, b"gf") self.assertEqual(bufio.readinto(b), 0) self.assertEqual(b, b"gf") + rawio = self.MockRawIO((b"abc", None)) + bufio = self.tp(rawio) + self.assertEqual(bufio.readinto(b), 2) + self.assertEqual(b, b"ab") + self.assertEqual(bufio.readinto(b), 1) + self.assertEqual(b, b"cb") def test_readlines(self): def bufio(): @@ -1802,11 +1817,11 @@ class TextIOWrapperTest(unittest.TestCase): r = self.BytesIO(b"\xc3\xa9\n\n") b = self.BufferedReader(r, 1000) t = self.TextIOWrapper(b) - t.__init__(b, encoding="latin1", newline="\r\n") - self.assertEqual(t.encoding, "latin1") + t.__init__(b, encoding="latin-1", newline="\r\n") + self.assertEqual(t.encoding, "latin-1") self.assertEqual(t.line_buffering, False) - t.__init__(b, encoding="utf8", line_buffering=True) - self.assertEqual(t.encoding, "utf8") + t.__init__(b, encoding="utf-8", line_buffering=True) + self.assertEqual(t.encoding, "utf-8") self.assertEqual(t.line_buffering, True) self.assertEqual("\xe9\n", t.readline()) self.assertRaises(TypeError, t.__init__, b, newline=42) @@ -1856,8 +1871,8 @@ class TextIOWrapperTest(unittest.TestCase): def test_encoding(self): # Check the encoding attribute is always set, and valid b = self.BytesIO() - t = self.TextIOWrapper(b, encoding="utf8") - self.assertEqual(t.encoding, "utf8") + t = self.TextIOWrapper(b, encoding="utf-8") + self.assertEqual(t.encoding, "utf-8") t = self.TextIOWrapper(b) self.assertTrue(t.encoding is not None) codecs.lookup(t.encoding) @@ -1950,8 +1965,8 @@ class TextIOWrapperTest(unittest.TestCase): testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") for newline, expected in [ - (None, normalized.decode("ascii").splitlines(True)), - ("", testdata.decode("ascii").splitlines(True)), + (None, normalized.decode("ascii").splitlines(keepends=True)), + ("", testdata.decode("ascii").splitlines(keepends=True)), ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), @@ -2036,7 +2051,7 @@ class TextIOWrapperTest(unittest.TestCase): def test_basic_io(self): for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): - for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le": + for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le": f = self.open(support.TESTFN, "w+", encoding=enc) f._CHUNK_SIZE = chunksize self.assertEqual(f.write("abc"), 3) @@ -2086,7 +2101,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEqual(rlines, wlines) def test_telling(self): - f = self.open(support.TESTFN, "w+", encoding="utf8") + f = self.open(support.TESTFN, "w+", encoding="utf-8") p0 = f.tell() f.write("\xff\n") p1 = f.tell() @@ -2332,6 +2347,7 @@ class TextIOWrapperTest(unittest.TestCase): with self.open(support.TESTFN, "w", errors="replace") as f: self.assertEqual(f.errors, "replace") + @support.no_tracing @unittest.skipUnless(threading, 'Threading required for this test.') def test_threads_write(self): # Issue6750: concurrent writes could duplicate data @@ -2649,12 +2665,6 @@ class MiscIOTest(unittest.TestCase): def test_blockingioerror(self): # Various BlockingIOError issues - self.assertRaises(TypeError, self.BlockingIOError) - self.assertRaises(TypeError, self.BlockingIOError, 1) - self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4) - self.assertRaises(TypeError, self.BlockingIOError, 1, "", None) - b = self.BlockingIOError(1, "") - self.assertEqual(b.characters_written, 0) class C(str): pass c = C("") @@ -2796,6 +2806,7 @@ class MiscIOTest(unittest.TestCase): except self.BlockingIOError as e: self.assertEqual(e.args[0], errno.EAGAIN) + self.assertEqual(e.args[2], e.characters_written) sent[-1] = sent[-1][:e.characters_written] received.append(rf.read()) msg = b'BLOCKED' @@ -2808,6 +2819,7 @@ class MiscIOTest(unittest.TestCase): break except self.BlockingIOError as e: self.assertEqual(e.args[0], errno.EAGAIN) + self.assertEqual(e.args[2], e.characters_written) self.assertEqual(e.characters_written, 0) received.append(rf.read()) @@ -2818,6 +2830,19 @@ class MiscIOTest(unittest.TestCase): self.assertTrue(wf.closed) self.assertTrue(rf.closed) + def test_create_fail(self): + # 'x' mode fails if file is existing + with self.open(support.TESTFN, 'w'): + pass + self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x') + + def test_create_writes(self): + # 'x' mode opens for writing + with self.open(support.TESTFN, 'xb') as f: + f.write(b"spam") + with self.open(support.TESTFN, 'rb') as f: + self.assertEqual(b"spam", f.read()) + class CMiscIOTest(MiscIOTest): io = io @@ -2838,14 +2863,14 @@ class SignalsTest(unittest.TestCase): 1/0 @unittest.skipUnless(threading, 'Threading required for this test.') - @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'), - 'issue #12429: skip test on FreeBSD <= 7') def check_interrupted_write(self, item, bytes, **fdopen_kwargs): """Check that a partial write, when it gets interrupted, properly invokes the signal handler, and bubbles up the exception raised in the latter.""" read_results = [] def _read(): + if hasattr(signal, 'pthread_sigmask'): + signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM]) s = os.read(r, 1) read_results.append(s) t = threading.Thread(target=_read) @@ -2863,7 +2888,7 @@ class SignalsTest(unittest.TestCase): # The buffered IO layer must check for pending signal # handlers, which in this case will invoke alarm_interrupt(). self.assertRaises(ZeroDivisionError, - wio.write, item * (1024 * 1024)) + wio.write, item * (support.PIPE_MAX_SIZE // len(item))) t.join() # We got one byte, get another one and check that it isn't a # repeat of the first one. @@ -2890,6 +2915,7 @@ class SignalsTest(unittest.TestCase): def test_interrupted_write_text(self): self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") + @support.no_tracing def check_reentrant_write(self, data, **fdopen_kwargs): def on_alarm(*args): # Will be called reentrantly from the same thread |