summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_io.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r--Lib/test/test_io.py146
1 files changed, 111 insertions, 35 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 8ef314e..64c66d88 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -19,21 +19,22 @@
# test both implementations. This file has lots of examples.
################################################################################
+import abc
+import array
+import errno
+import locale
import os
+import pickle
+import random
+import signal
import sys
import time
-import array
-import random
import unittest
-import weakref
-import abc
-import signal
-import errno
import warnings
-import pickle
+import weakref
import _testcapi
-from itertools import cycle, count
from collections import deque, UserList
+from itertools import cycle, count
from test import support
import codecs
@@ -50,7 +51,7 @@ except ImportError:
def _default_chunk_size():
"""Get the default TextIOWrapper chunk size"""
- with open(__file__, "r", encoding="latin1") as f:
+ with open(__file__, "r", encoding="latin-1") as f:
return f._CHUNK_SIZE
@@ -603,6 +604,7 @@ class IOTest(unittest.TestCase):
raise IOError()
f.flush = bad_flush
self.assertRaises(IOError, f.close) # exception not swallowed
+ self.assertTrue(f.closed)
def test_multi_close(self):
f = self.open(support.TESTFN, "wb", buffering=0)
@@ -635,6 +637,15 @@ class IOTest(unittest.TestCase):
for obj in test:
self.assertTrue(hasattr(obj, "__dict__"))
+ def test_opener(self):
+ with self.open(support.TESTFN, "w") as f:
+ f.write("egg\n")
+ fd = os.open(support.TESTFN, os.O_RDONLY)
+ def opener(path, flags):
+ return fd
+ with self.open("non-existent", "r", opener=opener) as f:
+ self.assertEqual(f.read(), "egg\n")
+
def test_fileio_closefd(self):
# Issue #4841
with self.open(__file__, 'rb') as f1, \
@@ -697,7 +708,7 @@ class CommonBufferedTests:
bufio = self.tp(rawio)
# Invalid whence
self.assertRaises(ValueError, bufio.seek, 0, -1)
- self.assertRaises(ValueError, bufio.seek, 0, 3)
+ self.assertRaises(ValueError, bufio.seek, 0, 9)
def test_override_destructor(self):
tp = self.tp
@@ -771,6 +782,22 @@ class CommonBufferedTests:
raw.flush = bad_flush
b = self.tp(raw)
self.assertRaises(IOError, b.close) # exception not swallowed
+ self.assertTrue(b.closed)
+
+ def test_close_error_on_close(self):
+ raw = self.MockRawIO()
+ def bad_flush():
+ raise IOError('flush')
+ def bad_close():
+ raise IOError('close')
+ raw.close = bad_close
+ b = self.tp(raw)
+ b.flush = bad_flush
+ with self.assertRaises(IOError) as err: # exception not swallowed
+ b.close()
+ self.assertEqual(err.exception.args, ('close',))
+ self.assertEqual(err.exception.__context__.args, ('flush',))
+ self.assertFalse(b.closed)
def test_multi_close(self):
raw = self.MockRawIO()
@@ -863,6 +890,12 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
self.assertEqual(b, b"gf")
self.assertEqual(bufio.readinto(b), 0)
self.assertEqual(b, b"gf")
+ rawio = self.MockRawIO((b"abc", None))
+ bufio = self.tp(rawio)
+ self.assertEqual(bufio.readinto(b), 2)
+ self.assertEqual(b, b"ab")
+ self.assertEqual(bufio.readinto(b), 1)
+ self.assertEqual(b, b"cb")
def test_readlines(self):
def bufio():
@@ -1283,11 +1316,20 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
self.assertRaises(IOError, bufio.tell)
self.assertRaises(IOError, bufio.write, b"abcdef")
- def test_max_buffer_size_deprecation(self):
- with support.check_warnings(("max_buffer_size is deprecated",
- DeprecationWarning)):
+ def test_max_buffer_size_removal(self):
+ with self.assertRaises(TypeError):
self.tp(self.MockRawIO(), 8, 12)
+ def test_write_error_on_close(self):
+ raw = self.MockRawIO()
+ def bad_write(b):
+ raise IOError()
+ raw.write = bad_write
+ b = self.tp(raw)
+ b.write(b'spam')
+ self.assertRaises(IOError, b.close) # exception not swallowed
+ self.assertTrue(b.closed)
+
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
tp = io.BufferedWriter
@@ -1346,9 +1388,8 @@ class BufferedRWPairTest(unittest.TestCase):
pair = self.tp(self.MockRawIO(), self.MockRawIO())
self.assertRaises(self.UnsupportedOperation, pair.detach)
- def test_constructor_max_buffer_size_deprecation(self):
- with support.check_warnings(("max_buffer_size is deprecated",
- DeprecationWarning)):
+ def test_constructor_max_buffer_size_removal(self):
+ with self.assertRaises(TypeError):
self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
def test_constructor_with_not_readable(self):
@@ -1871,11 +1912,11 @@ class TextIOWrapperTest(unittest.TestCase):
r = self.BytesIO(b"\xc3\xa9\n\n")
b = self.BufferedReader(r, 1000)
t = self.TextIOWrapper(b)
- t.__init__(b, encoding="latin1", newline="\r\n")
- self.assertEqual(t.encoding, "latin1")
+ t.__init__(b, encoding="latin-1", newline="\r\n")
+ self.assertEqual(t.encoding, "latin-1")
self.assertEqual(t.line_buffering, False)
- t.__init__(b, encoding="utf8", line_buffering=True)
- self.assertEqual(t.encoding, "utf8")
+ t.__init__(b, encoding="utf-8", line_buffering=True)
+ self.assertEqual(t.encoding, "utf-8")
self.assertEqual(t.line_buffering, True)
self.assertEqual("\xe9\n", t.readline())
self.assertRaises(TypeError, t.__init__, b, newline=42)
@@ -1922,6 +1963,24 @@ class TextIOWrapperTest(unittest.TestCase):
t.write("A\rB")
self.assertEqual(r.getvalue(), b"XY\nZA\rB")
+ def test_default_encoding(self):
+ old_environ = dict(os.environ)
+ try:
+ # try to get a user preferred encoding different than the current
+ # locale encoding to check that TextIOWrapper() uses the current
+ # locale encoding and not the user preferred encoding
+ for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
+ if key in os.environ:
+ del os.environ[key]
+
+ current_locale_encoding = locale.getpreferredencoding(False)
+ b = self.BytesIO()
+ t = self.TextIOWrapper(b)
+ self.assertEqual(t.encoding, current_locale_encoding)
+ finally:
+ os.environ.clear()
+ os.environ.update(old_environ)
+
# Issue 15989
def test_device_encoding(self):
b = self.BytesIO()
@@ -1933,8 +1992,8 @@ class TextIOWrapperTest(unittest.TestCase):
def test_encoding(self):
# Check the encoding attribute is always set, and valid
b = self.BytesIO()
- t = self.TextIOWrapper(b, encoding="utf8")
- self.assertEqual(t.encoding, "utf8")
+ t = self.TextIOWrapper(b, encoding="utf-8")
+ self.assertEqual(t.encoding, "utf-8")
t = self.TextIOWrapper(b)
self.assertTrue(t.encoding is not None)
codecs.lookup(t.encoding)
@@ -2027,8 +2086,8 @@ class TextIOWrapperTest(unittest.TestCase):
testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
for newline, expected in [
- (None, normalized.decode("ascii").splitlines(True)),
- ("", testdata.decode("ascii").splitlines(True)),
+ (None, normalized.decode("ascii").splitlines(keepends=True)),
+ ("", testdata.decode("ascii").splitlines(keepends=True)),
("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
@@ -2113,7 +2172,7 @@ class TextIOWrapperTest(unittest.TestCase):
def test_basic_io(self):
for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
- for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
+ for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
f = self.open(support.TESTFN, "w+", encoding=enc)
f._CHUNK_SIZE = chunksize
self.assertEqual(f.write("abc"), 3)
@@ -2163,7 +2222,7 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEqual(rlines, wlines)
def test_telling(self):
- f = self.open(support.TESTFN, "w+", encoding="utf8")
+ f = self.open(support.TESTFN, "w+", encoding="utf-8")
p0 = f.tell()
f.write("\xff\n")
p1 = f.tell()
@@ -2431,6 +2490,7 @@ class TextIOWrapperTest(unittest.TestCase):
with self.open(support.TESTFN, "w", errors="replace") as f:
self.assertEqual(f.errors, "replace")
+ @support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
# Issue6750: concurrent writes could duplicate data
@@ -2459,6 +2519,7 @@ class TextIOWrapperTest(unittest.TestCase):
raise IOError()
txt.flush = bad_flush
self.assertRaises(IOError, txt.close) # exception not swallowed
+ self.assertTrue(txt.closed)
def test_multi_close(self):
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
@@ -2772,12 +2833,6 @@ class MiscIOTest(unittest.TestCase):
def test_blockingioerror(self):
# Various BlockingIOError issues
- self.assertRaises(TypeError, self.BlockingIOError)
- self.assertRaises(TypeError, self.BlockingIOError, 1)
- self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
- self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
- b = self.BlockingIOError(1, "")
- self.assertEqual(b.characters_written, 0)
class C(str):
pass
c = C("")
@@ -2919,6 +2974,7 @@ class MiscIOTest(unittest.TestCase):
except self.BlockingIOError as e:
self.assertEqual(e.args[0], errno.EAGAIN)
+ self.assertEqual(e.args[2], e.characters_written)
sent[-1] = sent[-1][:e.characters_written]
received.append(rf.read())
msg = b'BLOCKED'
@@ -2931,6 +2987,7 @@ class MiscIOTest(unittest.TestCase):
break
except self.BlockingIOError as e:
self.assertEqual(e.args[0], errno.EAGAIN)
+ self.assertEqual(e.args[2], e.characters_written)
self.assertEqual(e.characters_written, 0)
received.append(rf.read())
@@ -2941,6 +2998,24 @@ class MiscIOTest(unittest.TestCase):
self.assertTrue(wf.closed)
self.assertTrue(rf.closed)
+ def test_create_fail(self):
+ # 'x' mode fails if file is existing
+ with self.open(support.TESTFN, 'w'):
+ pass
+ self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
+
+ def test_create_writes(self):
+ # 'x' mode opens for writing
+ with self.open(support.TESTFN, 'xb') as f:
+ f.write(b"spam")
+ with self.open(support.TESTFN, 'rb') as f:
+ self.assertEqual(b"spam", f.read())
+
+ def test_open_allargs(self):
+ # there used to be a buffer overflow in the parser for rawmode
+ self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
+
+
class CMiscIOTest(MiscIOTest):
io = io
@@ -2961,14 +3036,14 @@ class SignalsTest(unittest.TestCase):
1/0
@unittest.skipUnless(threading, 'Threading required for this test.')
- @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
- 'issue #12429: skip test on FreeBSD <= 7')
def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
"""Check that a partial write, when it gets interrupted, properly
invokes the signal handler, and bubbles up the exception raised
in the latter."""
read_results = []
def _read():
+ if hasattr(signal, 'pthread_sigmask'):
+ signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
s = os.read(r, 1)
read_results.append(s)
t = threading.Thread(target=_read)
@@ -2986,7 +3061,7 @@ class SignalsTest(unittest.TestCase):
# The buffered IO layer must check for pending signal
# handlers, which in this case will invoke alarm_interrupt().
self.assertRaises(ZeroDivisionError,
- wio.write, item * (1024 * 1024))
+ wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
t.join()
# We got one byte, get another one and check that it isn't a
# repeat of the first one.
@@ -3013,6 +3088,7 @@ class SignalsTest(unittest.TestCase):
def test_interrupted_write_text(self):
self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
+ @support.no_tracing
def check_reentrant_write(self, data, **fdopen_kwargs):
def on_alarm(*args):
# Will be called reentrantly from the same thread