summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_io.py
diff options
context:
space:
mode:
authorHai Shi <shihai1992@gmail.com>2020-07-06 09:12:49 (GMT)
committerGitHub <noreply@github.com>2020-07-06 09:12:49 (GMT)
commit883bc638335a57a6e6a6344c2fc132c4f9a0ec42 (patch)
tree1dd74a4aeeb80d8b85ff18de700763199a7bc1bc /Lib/test/test_io.py
parent9ce8132e1f2339cfe116dfd4795574182c2245b4 (diff)
downloadcpython-883bc638335a57a6e6a6344c2fc132c4f9a0ec42.zip
cpython-883bc638335a57a6e6a6344c2fc132c4f9a0ec42.tar.gz
cpython-883bc638335a57a6e6a6344c2fc132c4f9a0ec42.tar.bz2
bpo-40275: Use new test.support helper submodules in tests (GH-21314)
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r--Lib/test/test_io.py257
1 files changed, 130 insertions, 127 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index c0d67a17..18b8a79 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -40,8 +40,11 @@ from itertools import cycle, count
from test import support
from test.support.script_helper import (
assert_python_ok, assert_python_failure, run_python_until_end)
+from test.support import import_helper
+from test.support import os_helper
from test.support import threading_helper
-from test.support import FakePath
+from test.support import warnings_helper
+from test.support.os_helper import FakePath
import codecs
import io # C implementation of io
@@ -317,10 +320,10 @@ class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
class IOTest(unittest.TestCase):
def setUp(self):
- support.unlink(support.TESTFN)
+ os_helper.unlink(os_helper.TESTFN)
def tearDown(self):
- support.unlink(support.TESTFN)
+ os_helper.unlink(os_helper.TESTFN)
def write_ops(self, f):
self.assertEqual(f.write(b"blah."), 5)
@@ -405,19 +408,19 @@ class IOTest(unittest.TestCase):
# Try writing on a file opened in read mode and vice-versa.
exc = self.UnsupportedOperation
for mode in ("w", "wb"):
- with self.open(support.TESTFN, mode) as fp:
+ with self.open(os_helper.TESTFN, mode) as fp:
self.assertRaises(exc, fp.read)
self.assertRaises(exc, fp.readline)
- with self.open(support.TESTFN, "wb", buffering=0) as fp:
+ with self.open(os_helper.TESTFN, "wb", buffering=0) as fp:
self.assertRaises(exc, fp.read)
self.assertRaises(exc, fp.readline)
- with self.open(support.TESTFN, "rb", buffering=0) as fp:
+ with self.open(os_helper.TESTFN, "rb", buffering=0) as fp:
self.assertRaises(exc, fp.write, b"blah")
self.assertRaises(exc, fp.writelines, [b"blah\n"])
- with self.open(support.TESTFN, "rb") as fp:
+ with self.open(os_helper.TESTFN, "rb") as fp:
self.assertRaises(exc, fp.write, b"blah")
self.assertRaises(exc, fp.writelines, [b"blah\n"])
- with self.open(support.TESTFN, "r") as fp:
+ with self.open(os_helper.TESTFN, "r") as fp:
self.assertRaises(exc, fp.write, "blah")
self.assertRaises(exc, fp.writelines, ["blah\n"])
# Non-zero seeking from current or end pos
@@ -538,33 +541,33 @@ class IOTest(unittest.TestCase):
self.assertRaises(ValueError, self.open, bytes_fn, 'w')
def test_raw_file_io(self):
- with self.open(support.TESTFN, "wb", buffering=0) as f:
+ with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
self.assertEqual(f.readable(), False)
self.assertEqual(f.writable(), True)
self.assertEqual(f.seekable(), True)
self.write_ops(f)
- with self.open(support.TESTFN, "rb", buffering=0) as f:
+ with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
self.assertEqual(f.readable(), True)
self.assertEqual(f.writable(), False)
self.assertEqual(f.seekable(), True)
self.read_ops(f)
def test_buffered_file_io(self):
- with self.open(support.TESTFN, "wb") as f:
+ with self.open(os_helper.TESTFN, "wb") as f:
self.assertEqual(f.readable(), False)
self.assertEqual(f.writable(), True)
self.assertEqual(f.seekable(), True)
self.write_ops(f)
- with self.open(support.TESTFN, "rb") as f:
+ with self.open(os_helper.TESTFN, "rb") as f:
self.assertEqual(f.readable(), True)
self.assertEqual(f.writable(), False)
self.assertEqual(f.seekable(), True)
self.read_ops(f, True)
def test_readline(self):
- with self.open(support.TESTFN, "wb") as f:
+ with self.open(os_helper.TESTFN, "wb") as f:
f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
- with self.open(support.TESTFN, "rb") as f:
+ with self.open(os_helper.TESTFN, "rb") as f:
self.assertEqual(f.readline(), b"abc\n")
self.assertEqual(f.readline(10), b"def\n")
self.assertEqual(f.readline(2), b"xy")
@@ -572,7 +575,7 @@ class IOTest(unittest.TestCase):
self.assertEqual(f.readline(), b"foo\x00bar\n")
self.assertEqual(f.readline(None), b"another line")
self.assertRaises(TypeError, f.readline, 5.3)
- with self.open(support.TESTFN, "r") as f:
+ with self.open(os_helper.TESTFN, "r") as f:
self.assertRaises(TypeError, f.readline, 5.3)
def test_readline_nonsizeable(self):
@@ -607,20 +610,20 @@ class IOTest(unittest.TestCase):
support.requires(
'largefile',
'test requires %s bytes and a long time to run' % self.LARGE)
- with self.open(support.TESTFN, "w+b", 0) as f:
+ with self.open(os_helper.TESTFN, "w+b", 0) as f:
self.large_file_ops(f)
- with self.open(support.TESTFN, "w+b") as f:
+ with self.open(os_helper.TESTFN, "w+b") as f:
self.large_file_ops(f)
def test_with_open(self):
for bufsize in (0, 100):
f = None
- with self.open(support.TESTFN, "wb", bufsize) as f:
+ with self.open(os_helper.TESTFN, "wb", bufsize) as f:
f.write(b"xxx")
self.assertEqual(f.closed, True)
f = None
try:
- with self.open(support.TESTFN, "wb", bufsize) as f:
+ with self.open(os_helper.TESTFN, "wb", bufsize) as f:
1/0
except ZeroDivisionError:
self.assertEqual(f.closed, True)
@@ -629,13 +632,13 @@ class IOTest(unittest.TestCase):
# issue 5008
def test_append_mode_tell(self):
- with self.open(support.TESTFN, "wb") as f:
+ with self.open(os_helper.TESTFN, "wb") as f:
f.write(b"xxx")
- with self.open(support.TESTFN, "ab", buffering=0) as f:
+ with self.open(os_helper.TESTFN, "ab", buffering=0) as f:
self.assertEqual(f.tell(), 3)
- with self.open(support.TESTFN, "ab") as f:
+ with self.open(os_helper.TESTFN, "ab") as f:
self.assertEqual(f.tell(), 3)
- with self.open(support.TESTFN, "a") as f:
+ with self.open(os_helper.TESTFN, "a") as f:
self.assertGreater(f.tell(), 0)
def test_destructor(self):
@@ -655,13 +658,13 @@ class IOTest(unittest.TestCase):
def flush(self):
record.append(3)
super().flush()
- with support.check_warnings(('', ResourceWarning)):
- f = MyFileIO(support.TESTFN, "wb")
+ with warnings_helper.check_warnings(('', ResourceWarning)):
+ f = MyFileIO(os_helper.TESTFN, "wb")
f.write(b"xxx")
del f
support.gc_collect()
self.assertEqual(record, [1, 2, 3])
- with self.open(support.TESTFN, "rb") as f:
+ with self.open(os_helper.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"xxx")
def _check_base_destructor(self, base):
@@ -707,9 +710,9 @@ class IOTest(unittest.TestCase):
self._check_base_destructor(self.TextIOBase)
def test_close_flushes(self):
- with self.open(support.TESTFN, "wb") as f:
+ with self.open(os_helper.TESTFN, "wb") as f:
f.write(b"xxx")
- with self.open(support.TESTFN, "rb") as f:
+ with self.open(os_helper.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"xxx")
def test_array_writes(self):
@@ -720,25 +723,25 @@ class IOTest(unittest.TestCase):
self.assertEqual(f.write(a), n)
f.writelines((a,))
check(self.BytesIO())
- check(self.FileIO(support.TESTFN, "w"))
+ check(self.FileIO(os_helper.TESTFN, "w"))
check(self.BufferedWriter(self.MockRawIO()))
check(self.BufferedRandom(self.MockRawIO()))
check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
def test_closefd(self):
- self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
+ self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'w',
closefd=False)
def test_read_closed(self):
- with self.open(support.TESTFN, "w") as f:
+ with self.open(os_helper.TESTFN, "w") as f:
f.write("egg\n")
- with self.open(support.TESTFN, "r") as f:
+ with self.open(os_helper.TESTFN, "r") as f:
file = self.open(f.fileno(), "r", closefd=False)
self.assertEqual(file.read(), "egg\n")
file.seek(0)
file.close()
self.assertRaises(ValueError, file.read)
- with self.open(support.TESTFN, "rb") as f:
+ with self.open(os_helper.TESTFN, "rb") as f:
file = self.open(f.fileno(), "rb", closefd=False)
self.assertEqual(file.read()[:3], b"egg")
file.close()
@@ -746,12 +749,12 @@ class IOTest(unittest.TestCase):
def test_no_closefd_with_filename(self):
# can't use closefd in combination with a file name
- self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
+ self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r", closefd=False)
def test_closefd_attr(self):
- with self.open(support.TESTFN, "wb") as f:
+ with self.open(os_helper.TESTFN, "wb") as f:
f.write(b"egg\n")
- with self.open(support.TESTFN, "r") as f:
+ with self.open(os_helper.TESTFN, "r") as f:
self.assertEqual(f.buffer.raw.closefd, True)
file = self.open(f.fileno(), "r", closefd=False)
self.assertEqual(file.buffer.raw.closefd, False)
@@ -759,15 +762,15 @@ class IOTest(unittest.TestCase):
def test_garbage_collection(self):
# FileIO objects are collected, and collecting them flushes
# all data to disk.
- with support.check_warnings(('', ResourceWarning)):
- f = self.FileIO(support.TESTFN, "wb")
+ with warnings_helper.check_warnings(('', ResourceWarning)):
+ f = self.FileIO(os_helper.TESTFN, "wb")
f.write(b"abcxxx")
f.f = f
wr = weakref.ref(f)
del f
support.gc_collect()
self.assertIsNone(wr(), wr)
- with self.open(support.TESTFN, "rb") as f:
+ with self.open(os_helper.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"abcxxx")
def test_unbounded_file(self):
@@ -804,29 +807,29 @@ class IOTest(unittest.TestCase):
def test_flush_error_on_close(self):
# raw file
# Issue #5700: io.FileIO calls flush() after file closed
- self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
- fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
+ self.check_flush_error_on_close(os_helper.TESTFN, 'wb', buffering=0)
+ fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
self.check_flush_error_on_close(fd, 'wb', buffering=0)
- fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
+ fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
os.close(fd)
# buffered io
- self.check_flush_error_on_close(support.TESTFN, 'wb')
- fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
+ self.check_flush_error_on_close(os_helper.TESTFN, 'wb')
+ fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
self.check_flush_error_on_close(fd, 'wb')
- fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
+ fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
self.check_flush_error_on_close(fd, 'wb', closefd=False)
os.close(fd)
# text io
- self.check_flush_error_on_close(support.TESTFN, 'w')
- fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
+ self.check_flush_error_on_close(os_helper.TESTFN, 'w')
+ fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
self.check_flush_error_on_close(fd, 'w')
- fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
+ fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
self.check_flush_error_on_close(fd, 'w', closefd=False)
os.close(fd)
def test_multi_close(self):
- f = self.open(support.TESTFN, "wb", buffering=0)
+ f = self.open(os_helper.TESTFN, "wb", buffering=0)
f.close()
f.close()
f.close()
@@ -857,9 +860,9 @@ class IOTest(unittest.TestCase):
self.assertTrue(hasattr(obj, "__dict__"))
def test_opener(self):
- with self.open(support.TESTFN, "w") as f:
+ with self.open(os_helper.TESTFN, "w") as f:
f.write("egg\n")
- fd = os.open(support.TESTFN, os.O_RDONLY)
+ fd = os.open(os_helper.TESTFN, os.O_RDONLY)
def opener(path, flags):
return fd
with self.open("non-existent", "r", opener=opener) as f:
@@ -894,14 +897,14 @@ class IOTest(unittest.TestCase):
f2.readline()
def test_nonbuffered_textio(self):
- with support.check_no_resource_warning(self):
+ with warnings_helper.check_no_resource_warning(self):
with self.assertRaises(ValueError):
- self.open(support.TESTFN, 'w', buffering=0)
+ self.open(os_helper.TESTFN, 'w', buffering=0)
def test_invalid_newline(self):
- with support.check_no_resource_warning(self):
+ with warnings_helper.check_no_resource_warning(self):
with self.assertRaises(ValueError):
- self.open(support.TESTFN, 'w', newline='invalid')
+ self.open(os_helper.TESTFN, 'w', newline='invalid')
def test_buffered_readinto_mixin(self):
# Test the implementation provided by BufferedIOBase
@@ -924,10 +927,10 @@ class IOTest(unittest.TestCase):
with self.open(path, "r") as f:
self.assertEqual(f.read(), "egg\n")
- check_path_succeeds(FakePath(support.TESTFN))
- check_path_succeeds(FakePath(support.TESTFN.encode('utf-8')))
+ check_path_succeeds(FakePath(os_helper.TESTFN))
+ check_path_succeeds(FakePath(os_helper.TESTFN.encode('utf-8')))
- with self.open(support.TESTFN, "w") as f:
+ with self.open(os_helper.TESTFN, "w") as f:
bad_path = FakePath(f.fileno())
with self.assertRaises(TypeError):
self.open(bad_path, 'w')
@@ -942,7 +945,7 @@ class IOTest(unittest.TestCase):
# ensure that refcounting is correct with some error conditions
with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
- self.open(FakePath(support.TESTFN), 'rwxa')
+ self.open(FakePath(os_helper.TESTFN), 'rwxa')
def test_RawIOBase_readall(self):
# Exercise the default unlimited RawIOBase.read() and readall()
@@ -1454,9 +1457,9 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
l = list(range(256)) * N
random.shuffle(l)
s = bytes(bytearray(l))
- with self.open(support.TESTFN, "wb") as f:
+ with self.open(os_helper.TESTFN, "wb") as f:
f.write(s)
- with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
+ with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
bufio = self.tp(raw, 8)
errors = []
results = []
@@ -1482,7 +1485,7 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
c = bytes(bytearray([i]))
self.assertEqual(s.count(c), N)
finally:
- support.unlink(support.TESTFN)
+ os_helper.unlink(os_helper.TESTFN)
def test_unseekable(self):
bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
@@ -1572,9 +1575,9 @@ class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
def test_garbage_collection(self):
# C BufferedReader objects are collected.
# The Python version has __del__, so it ends into gc.garbage instead
- self.addCleanup(support.unlink, support.TESTFN)
- with support.check_warnings(('', ResourceWarning)):
- rawio = self.FileIO(support.TESTFN, "w+b")
+ self.addCleanup(os_helper.unlink, os_helper.TESTFN)
+ with warnings_helper.check_warnings(('', ResourceWarning)):
+ rawio = self.FileIO(os_helper.TESTFN, "w+b")
f = self.tp(rawio)
f.f = f
wr = weakref.ref(f)
@@ -1791,26 +1794,26 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
def test_truncate(self):
# Truncate implicitly flushes the buffer.
- self.addCleanup(support.unlink, support.TESTFN)
- with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
+ self.addCleanup(os_helper.unlink, os_helper.TESTFN)
+ with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
bufio = self.tp(raw, 8)
bufio.write(b"abcdef")
self.assertEqual(bufio.truncate(3), 3)
self.assertEqual(bufio.tell(), 6)
- with self.open(support.TESTFN, "rb", buffering=0) as f:
+ with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
self.assertEqual(f.read(), b"abc")
def test_truncate_after_write(self):
# Ensure that truncate preserves the file position after
# writes longer than the buffer size.
# Issue: https://bugs.python.org/issue32228
- self.addCleanup(support.unlink, support.TESTFN)
- with self.open(support.TESTFN, "wb") as f:
+ self.addCleanup(os_helper.unlink, os_helper.TESTFN)
+ with self.open(os_helper.TESTFN, "wb") as f:
# Fill with some buffer
f.write(b'\x00' * 10000)
buffer_sizes = [8192, 4096, 200]
for buffer_size in buffer_sizes:
- with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
+ with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f:
f.write(b'\x00' * (buffer_size + 1))
# After write write_pos and write_end are set to 0
f.read(1)
@@ -1838,7 +1841,7 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
# writing the buffer to the raw streams. This is in addition
# to concurrency issues due to switching threads in the middle
# of Python code.
- with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
+ with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
bufio = self.tp(raw, 8)
errors = []
def f():
@@ -1858,12 +1861,12 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
self.assertFalse(errors,
"the following exceptions were caught: %r" % errors)
bufio.close()
- with self.open(support.TESTFN, "rb") as f:
+ with self.open(os_helper.TESTFN, "rb") as f:
s = f.read()
for i in range(256):
self.assertEqual(s.count(bytes([i])), N)
finally:
- support.unlink(support.TESTFN)
+ os_helper.unlink(os_helper.TESTFN)
def test_misbehaved_io(self):
rawio = self.MisbehavedRawIO()
@@ -1931,9 +1934,9 @@ class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
# C BufferedWriter objects are collected, and collecting them flushes
# all data to disk.
# The Python version has __del__, so it ends into gc.garbage instead
- self.addCleanup(support.unlink, support.TESTFN)
- with support.check_warnings(('', ResourceWarning)):
- rawio = self.FileIO(support.TESTFN, "w+b")
+ self.addCleanup(os_helper.unlink, os_helper.TESTFN)
+ with warnings_helper.check_warnings(('', ResourceWarning)):
+ rawio = self.FileIO(os_helper.TESTFN, "w+b")
f = self.tp(rawio)
f.write(b"123xxx")
f.x = f
@@ -1941,7 +1944,7 @@ class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
del f
support.gc_collect()
self.assertIsNone(wr(), wr)
- with self.open(support.TESTFN, "rb") as f:
+ with self.open(os_helper.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"123xxx")
def test_args_error(self):
@@ -2579,10 +2582,10 @@ class TextIOWrapperTest(unittest.TestCase):
def setUp(self):
self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
- support.unlink(support.TESTFN)
+ os_helper.unlink(os_helper.TESTFN)
def tearDown(self):
- support.unlink(support.TESTFN)
+ os_helper.unlink(os_helper.TESTFN)
def test_constructor(self):
r = self.BytesIO(b"\xc3\xa9\n\n")
@@ -2918,11 +2921,11 @@ class TextIOWrapperTest(unittest.TestCase):
def test_basic_io(self):
for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
- f = self.open(support.TESTFN, "w+", encoding=enc)
+ f = self.open(os_helper.TESTFN, "w+", encoding=enc)
f._CHUNK_SIZE = chunksize
self.assertEqual(f.write("abc"), 3)
f.close()
- f = self.open(support.TESTFN, "r+", encoding=enc)
+ f = self.open(os_helper.TESTFN, "r+", encoding=enc)
f._CHUNK_SIZE = chunksize
self.assertEqual(f.tell(), 0)
self.assertEqual(f.read(), "abc")
@@ -2967,7 +2970,7 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEqual(rlines, wlines)
def test_telling(self):
- f = self.open(support.TESTFN, "w+", encoding="utf-8")
+ f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
p0 = f.tell()
f.write("\xff\n")
p1 = f.tell()
@@ -2995,9 +2998,9 @@ class TextIOWrapperTest(unittest.TestCase):
u_suffix = "\u8888\n"
suffix = bytes(u_suffix.encode("utf-8"))
line = prefix + suffix
- with self.open(support.TESTFN, "wb") as f:
+ with self.open(os_helper.TESTFN, "wb") as f:
f.write(line*2)
- with self.open(support.TESTFN, "r", encoding="utf-8") as f:
+ with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
s = f.read(prefix_size)
self.assertEqual(s, str(prefix, "ascii"))
self.assertEqual(f.tell(), prefix_size)
@@ -3006,9 +3009,9 @@ class TextIOWrapperTest(unittest.TestCase):
def test_seeking_too(self):
# Regression test for a specific bug
data = b'\xe0\xbf\xbf\n'
- with self.open(support.TESTFN, "wb") as f:
+ with self.open(os_helper.TESTFN, "wb") as f:
f.write(data)
- with self.open(support.TESTFN, "r", encoding="utf-8") as f:
+ with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
f._CHUNK_SIZE # Just test that it exists
f._CHUNK_SIZE = 2
f.readline()
@@ -3022,17 +3025,17 @@ class TextIOWrapperTest(unittest.TestCase):
def test_seek_and_tell_with_data(data, min_pos=0):
"""Tell/seek to various points within a data stream and ensure
that the decoded data returned by read() is consistent."""
- f = self.open(support.TESTFN, 'wb')
+ f = self.open(os_helper.TESTFN, 'wb')
f.write(data)
f.close()
- f = self.open(support.TESTFN, encoding='test_decoder')
+ f = self.open(os_helper.TESTFN, encoding='test_decoder')
f._CHUNK_SIZE = CHUNK_SIZE
decoded = f.read()
f.close()
for i in range(min_pos, len(decoded) + 1): # seek positions
for j in [1, 5, len(decoded) - i]: # read lengths
- f = self.open(support.TESTFN, encoding='test_decoder')
+ f = self.open(os_helper.TESTFN, encoding='test_decoder')
self.assertEqual(f.read(i), decoded[:i])
cookie = f.tell()
self.assertEqual(f.read(j), decoded[i:i + j])
@@ -3062,11 +3065,11 @@ class TextIOWrapperTest(unittest.TestCase):
StatefulIncrementalDecoder.codecEnabled = 0
def test_multibyte_seek_and_tell(self):
- f = self.open(support.TESTFN, "w", encoding="euc_jp")
+ f = self.open(os_helper.TESTFN, "w", encoding="euc_jp")
f.write("AB\n\u3046\u3048\n")
f.close()
- f = self.open(support.TESTFN, "r", encoding="euc_jp")
+ f = self.open(os_helper.TESTFN, "r", encoding="euc_jp")
self.assertEqual(f.readline(), "AB\n")
p0 = f.tell()
self.assertEqual(f.readline(), "\u3046\u3048\n")
@@ -3077,7 +3080,7 @@ class TextIOWrapperTest(unittest.TestCase):
f.close()
def test_seek_with_encoder_state(self):
- f = self.open(support.TESTFN, "w", encoding="euc_jis_2004")
+ f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004")
f.write("\u00e6\u0300")
p0 = f.tell()
f.write("\u00e6")
@@ -3085,7 +3088,7 @@ class TextIOWrapperTest(unittest.TestCase):
f.write("\u0300")
f.close()
- f = self.open(support.TESTFN, "r", encoding="euc_jis_2004")
+ f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004")
self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
f.close()
@@ -3229,7 +3232,7 @@ class TextIOWrapperTest(unittest.TestCase):
def test_append_bom(self):
# The BOM is not written again when appending to a non-empty file
- filename = support.TESTFN
+ filename = os_helper.TESTFN
for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
with self.open(filename, 'w', encoding=charset) as f:
f.write('aaa')
@@ -3244,7 +3247,7 @@ class TextIOWrapperTest(unittest.TestCase):
def test_seek_bom(self):
# Same test, but when seeking manually
- filename = support.TESTFN
+ filename = os_helper.TESTFN
for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
with self.open(filename, 'w', encoding=charset) as f:
f.write('aaa')
@@ -3259,7 +3262,7 @@ class TextIOWrapperTest(unittest.TestCase):
def test_seek_append_bom(self):
# Same test, but first seek to the start and then to the end
- filename = support.TESTFN
+ filename = os_helper.TESTFN
for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
with self.open(filename, 'w', encoding=charset) as f:
f.write('aaa')
@@ -3271,16 +3274,16 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
def test_errors_property(self):
- with self.open(support.TESTFN, "w") as f:
+ with self.open(os_helper.TESTFN, "w") as f:
self.assertEqual(f.errors, "strict")
- with self.open(support.TESTFN, "w", errors="replace") as f:
+ with self.open(os_helper.TESTFN, "w", errors="replace") as f:
self.assertEqual(f.errors, "replace")
@support.no_tracing
def test_threads_write(self):
# Issue6750: concurrent writes could duplicate data
event = threading.Event()
- with self.open(support.TESTFN, "w", buffering=1) as f:
+ with self.open(os_helper.TESTFN, "w", buffering=1) as f:
def run(n):
text = "Thread%03d\n" % n
event.wait()
@@ -3289,7 +3292,7 @@ class TextIOWrapperTest(unittest.TestCase):
for x in range(20)]
with threading_helper.start_threads(threads, event.set):
time.sleep(0.02)
- with self.open(support.TESTFN) as f:
+ with self.open(os_helper.TESTFN) as f:
content = f.read()
for n in range(20):
self.assertEqual(content.count("Thread%03d\n" % n), 1)
@@ -3732,8 +3735,8 @@ class CTextIOWrapperTest(TextIOWrapperTest):
# C TextIOWrapper objects are collected, and collecting them flushes
# all data to disk.
# The Python version has __del__, so it ends in gc.garbage instead.
- with support.check_warnings(('', ResourceWarning)):
- rawio = io.FileIO(support.TESTFN, "wb")
+ with warnings_helper.check_warnings(('', ResourceWarning)):
+ rawio = io.FileIO(os_helper.TESTFN, "wb")
b = self.BufferedWriter(rawio)
t = self.TextIOWrapper(b, encoding="ascii")
t.write("456def")
@@ -3742,7 +3745,7 @@ class CTextIOWrapperTest(TextIOWrapperTest):
del t
support.gc_collect()
self.assertIsNone(wr(), wr)
- with self.open(support.TESTFN, "rb") as f:
+ with self.open(os_helper.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"456def")
def test_rwpair_cleared_before_textio(self):
@@ -3899,7 +3902,7 @@ class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
class MiscIOTest(unittest.TestCase):
def tearDown(self):
- support.unlink(support.TESTFN)
+ os_helper.unlink(os_helper.TESTFN)
def test___all__(self):
for name in self.io.__all__:
@@ -3913,21 +3916,21 @@ class MiscIOTest(unittest.TestCase):
self.assertTrue(issubclass(obj, self.IOBase))
def test_attributes(self):
- f = self.open(support.TESTFN, "wb", buffering=0)
+ f = self.open(os_helper.TESTFN, "wb", buffering=0)
self.assertEqual(f.mode, "wb")
f.close()
- with support.check_warnings(('', DeprecationWarning)):
- f = self.open(support.TESTFN, "U")
- self.assertEqual(f.name, support.TESTFN)
- self.assertEqual(f.buffer.name, support.TESTFN)
- self.assertEqual(f.buffer.raw.name, support.TESTFN)
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ f = self.open(os_helper.TESTFN, "U")
+ self.assertEqual(f.name, os_helper.TESTFN)
+ self.assertEqual(f.buffer.name, os_helper.TESTFN)
+ self.assertEqual(f.buffer.raw.name, os_helper.TESTFN)
self.assertEqual(f.mode, "U")
self.assertEqual(f.buffer.mode, "rb")
self.assertEqual(f.buffer.raw.mode, "rb")
f.close()
- f = self.open(support.TESTFN, "w+")
+ f = self.open(os_helper.TESTFN, "w+")
self.assertEqual(f.mode, "w+")
self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
self.assertEqual(f.buffer.raw.mode, "rb+")
@@ -3969,7 +3972,7 @@ class MiscIOTest(unittest.TestCase):
{"mode": "w+", "buffering": 2},
{"mode": "w+b", "buffering": 0},
]:
- f = self.open(support.TESTFN, **kwargs)
+ f = self.open(os_helper.TESTFN, **kwargs)
f.close()
self.assertRaises(ValueError, f.flush)
self.assertRaises(ValueError, f.fileno)
@@ -4019,17 +4022,17 @@ class MiscIOTest(unittest.TestCase):
self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
def _check_abc_inheritance(self, abcmodule):
- with self.open(support.TESTFN, "wb", buffering=0) as f:
+ with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
self.assertIsInstance(f, abcmodule.IOBase)
self.assertIsInstance(f, abcmodule.RawIOBase)
self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
self.assertNotIsInstance(f, abcmodule.TextIOBase)
- with self.open(support.TESTFN, "wb") as f:
+ with self.open(os_helper.TESTFN, "wb") as f:
self.assertIsInstance(f, abcmodule.IOBase)
self.assertNotIsInstance(f, abcmodule.RawIOBase)
self.assertIsInstance(f, abcmodule.BufferedIOBase)
self.assertNotIsInstance(f, abcmodule.TextIOBase)
- with self.open(support.TESTFN, "w") as f:
+ with self.open(os_helper.TESTFN, "w") as f:
self.assertIsInstance(f, abcmodule.IOBase)
self.assertNotIsInstance(f, abcmodule.RawIOBase)
self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
@@ -4053,9 +4056,9 @@ class MiscIOTest(unittest.TestCase):
self.assertIn(r, str(cm.warning.args[0]))
def test_warn_on_dealloc(self):
- self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
- self._check_warn_on_dealloc(support.TESTFN, "wb")
- self._check_warn_on_dealloc(support.TESTFN, "w")
+ self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0)
+ self._check_warn_on_dealloc(os_helper.TESTFN, "wb")
+ self._check_warn_on_dealloc(os_helper.TESTFN, "w")
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
fds = []
@@ -4073,7 +4076,7 @@ class MiscIOTest(unittest.TestCase):
# When using closefd=False, there's no warning
r, w = os.pipe()
fds += r, w
- with support.check_no_resource_warning(self):
+ with warnings_helper.check_no_resource_warning(self):
open(r, *args, closefd=False, **kwargs)
def test_warn_on_dealloc_fd(self):
@@ -4096,7 +4099,7 @@ class MiscIOTest(unittest.TestCase):
{"mode": "w+b", "buffering": 0},
]:
for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
- with self.open(support.TESTFN, **kwargs) as f:
+ with self.open(os_helper.TESTFN, **kwargs) as f:
self.assertRaises(TypeError, pickle.dumps, f, protocol)
def test_nonblock_pipe_write_bigbuf(self):
@@ -4159,20 +4162,20 @@ class MiscIOTest(unittest.TestCase):
def test_create_fail(self):
# 'x' mode fails if file is existing
- with self.open(support.TESTFN, 'w'):
+ with self.open(os_helper.TESTFN, 'w'):
pass
- self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
+ self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x')
def test_create_writes(self):
# 'x' mode opens for writing
- with self.open(support.TESTFN, 'xb') as f:
+ with self.open(os_helper.TESTFN, 'xb') as f:
f.write(b"spam")
- with self.open(support.TESTFN, 'rb') as f:
+ with self.open(os_helper.TESTFN, 'rb') as f:
self.assertEqual(b"spam", f.read())
def test_open_allargs(self):
# there used to be a buffer overflow in the parser for rawmode
- self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
+ self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+')
def test_check_encoding_errors(self):
# bpo-37388: open() and TextIOWrapper must check encoding and errors
@@ -4427,7 +4430,7 @@ class SignalsTest(unittest.TestCase):
"""Check that a buffered write, when it gets interrupted (either
returning a partial result or EINTR), properly invokes the signal
handler and retries if the latter returned successfully."""
- select = support.import_module("select")
+ select = import_helper.import_module("select")
# A quantity that exceeds the buffer size of an anonymous pipe's
# write end.