summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_io.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r--Lib/test/test_io.py66
1 files changed, 46 insertions, 20 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index ea82cea..96258b4 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -49,7 +49,7 @@ except ImportError:
def _default_chunk_size():
"""Get the default TextIOWrapper chunk size"""
- with open(__file__, "r", encoding="latin1") as f:
+ with open(__file__, "r", encoding="latin-1") as f:
return f._CHUNK_SIZE
@@ -634,6 +634,15 @@ class IOTest(unittest.TestCase):
for obj in test:
self.assertTrue(hasattr(obj, "__dict__"))
+ def test_opener(self):
+ with self.open(support.TESTFN, "w") as f:
+ f.write("egg\n")
+ fd = os.open(support.TESTFN, os.O_RDONLY)
+ def opener(path, flags):
+ return fd
+ with self.open("non-existent", "r", opener=opener) as f:
+ self.assertEqual(f.read(), "egg\n")
+
class CIOTest(IOTest):
def test_IOBase_finalize(self):
@@ -835,6 +844,12 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
self.assertEqual(b, b"gf")
self.assertEqual(bufio.readinto(b), 0)
self.assertEqual(b, b"gf")
+ rawio = self.MockRawIO((b"abc", None))
+ bufio = self.tp(rawio)
+ self.assertEqual(bufio.readinto(b), 2)
+ self.assertEqual(b, b"ab")
+ self.assertEqual(bufio.readinto(b), 1)
+ self.assertEqual(b, b"cb")
def test_readlines(self):
def bufio():
@@ -1802,11 +1817,11 @@ class TextIOWrapperTest(unittest.TestCase):
r = self.BytesIO(b"\xc3\xa9\n\n")
b = self.BufferedReader(r, 1000)
t = self.TextIOWrapper(b)
- t.__init__(b, encoding="latin1", newline="\r\n")
- self.assertEqual(t.encoding, "latin1")
+ t.__init__(b, encoding="latin-1", newline="\r\n")
+ self.assertEqual(t.encoding, "latin-1")
self.assertEqual(t.line_buffering, False)
- t.__init__(b, encoding="utf8", line_buffering=True)
- self.assertEqual(t.encoding, "utf8")
+ t.__init__(b, encoding="utf-8", line_buffering=True)
+ self.assertEqual(t.encoding, "utf-8")
self.assertEqual(t.line_buffering, True)
self.assertEqual("\xe9\n", t.readline())
self.assertRaises(TypeError, t.__init__, b, newline=42)
@@ -1856,8 +1871,8 @@ class TextIOWrapperTest(unittest.TestCase):
def test_encoding(self):
# Check the encoding attribute is always set, and valid
b = self.BytesIO()
- t = self.TextIOWrapper(b, encoding="utf8")
- self.assertEqual(t.encoding, "utf8")
+ t = self.TextIOWrapper(b, encoding="utf-8")
+ self.assertEqual(t.encoding, "utf-8")
t = self.TextIOWrapper(b)
self.assertTrue(t.encoding is not None)
codecs.lookup(t.encoding)
@@ -1950,8 +1965,8 @@ class TextIOWrapperTest(unittest.TestCase):
testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
for newline, expected in [
- (None, normalized.decode("ascii").splitlines(True)),
- ("", testdata.decode("ascii").splitlines(True)),
+ (None, normalized.decode("ascii").splitlines(keepends=True)),
+ ("", testdata.decode("ascii").splitlines(keepends=True)),
("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
@@ -2036,7 +2051,7 @@ class TextIOWrapperTest(unittest.TestCase):
def test_basic_io(self):
for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
- for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
+ for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
f = self.open(support.TESTFN, "w+", encoding=enc)
f._CHUNK_SIZE = chunksize
self.assertEqual(f.write("abc"), 3)
@@ -2086,7 +2101,7 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEqual(rlines, wlines)
def test_telling(self):
- f = self.open(support.TESTFN, "w+", encoding="utf8")
+ f = self.open(support.TESTFN, "w+", encoding="utf-8")
p0 = f.tell()
f.write("\xff\n")
p1 = f.tell()
@@ -2332,6 +2347,7 @@ class TextIOWrapperTest(unittest.TestCase):
with self.open(support.TESTFN, "w", errors="replace") as f:
self.assertEqual(f.errors, "replace")
+ @support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
# Issue6750: concurrent writes could duplicate data
@@ -2649,12 +2665,6 @@ class MiscIOTest(unittest.TestCase):
def test_blockingioerror(self):
# Various BlockingIOError issues
- self.assertRaises(TypeError, self.BlockingIOError)
- self.assertRaises(TypeError, self.BlockingIOError, 1)
- self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
- self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
- b = self.BlockingIOError(1, "")
- self.assertEqual(b.characters_written, 0)
class C(str):
pass
c = C("")
@@ -2796,6 +2806,7 @@ class MiscIOTest(unittest.TestCase):
except self.BlockingIOError as e:
self.assertEqual(e.args[0], errno.EAGAIN)
+ self.assertEqual(e.args[2], e.characters_written)
sent[-1] = sent[-1][:e.characters_written]
received.append(rf.read())
msg = b'BLOCKED'
@@ -2808,6 +2819,7 @@ class MiscIOTest(unittest.TestCase):
break
except self.BlockingIOError as e:
self.assertEqual(e.args[0], errno.EAGAIN)
+ self.assertEqual(e.args[2], e.characters_written)
self.assertEqual(e.characters_written, 0)
received.append(rf.read())
@@ -2818,6 +2830,19 @@ class MiscIOTest(unittest.TestCase):
self.assertTrue(wf.closed)
self.assertTrue(rf.closed)
+ def test_create_fail(self):
+ # 'x' mode fails if file is existing
+ with self.open(support.TESTFN, 'w'):
+ pass
+ self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
+
+ def test_create_writes(self):
+ # 'x' mode opens for writing
+ with self.open(support.TESTFN, 'xb') as f:
+ f.write(b"spam")
+ with self.open(support.TESTFN, 'rb') as f:
+ self.assertEqual(b"spam", f.read())
+
class CMiscIOTest(MiscIOTest):
io = io
@@ -2838,14 +2863,14 @@ class SignalsTest(unittest.TestCase):
1/0
@unittest.skipUnless(threading, 'Threading required for this test.')
- @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
- 'issue #12429: skip test on FreeBSD <= 7')
def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
"""Check that a partial write, when it gets interrupted, properly
invokes the signal handler, and bubbles up the exception raised
in the latter."""
read_results = []
def _read():
+ if hasattr(signal, 'pthread_sigmask'):
+ signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
s = os.read(r, 1)
read_results.append(s)
t = threading.Thread(target=_read)
@@ -2863,7 +2888,7 @@ class SignalsTest(unittest.TestCase):
# The buffered IO layer must check for pending signal
# handlers, which in this case will invoke alarm_interrupt().
self.assertRaises(ZeroDivisionError,
- wio.write, item * (1024 * 1024))
+ wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
t.join()
# We got one byte, get another one and check that it isn't a
# repeat of the first one.
@@ -2890,6 +2915,7 @@ class SignalsTest(unittest.TestCase):
def test_interrupted_write_text(self):
self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
+ @support.no_tracing
def check_reentrant_write(self, data, **fdopen_kwargs):
def on_alarm(*args):
# Will be called reentrantly from the same thread