diff options
author | Hai Shi <shihai1992@gmail.com> | 2020-07-06 09:12:49 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-06 09:12:49 (GMT) |
commit | 883bc638335a57a6e6a6344c2fc132c4f9a0ec42 (patch) | |
tree | 1dd74a4aeeb80d8b85ff18de700763199a7bc1bc /Lib/test/test_io.py | |
parent | 9ce8132e1f2339cfe116dfd4795574182c2245b4 (diff) | |
download | cpython-883bc638335a57a6e6a6344c2fc132c4f9a0ec42.zip cpython-883bc638335a57a6e6a6344c2fc132c4f9a0ec42.tar.gz cpython-883bc638335a57a6e6a6344c2fc132c4f9a0ec42.tar.bz2 |
bpo-40275: Use new test.support helper submodules in tests (GH-21314)
Diffstat (limited to 'Lib/test/test_io.py')
-rw-r--r-- | Lib/test/test_io.py | 257 |
1 files changed, 130 insertions, 127 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index c0d67a17..18b8a79 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -40,8 +40,11 @@ from itertools import cycle, count from test import support from test.support.script_helper import ( assert_python_ok, assert_python_failure, run_python_until_end) +from test.support import import_helper +from test.support import os_helper from test.support import threading_helper -from test.support import FakePath +from test.support import warnings_helper +from test.support.os_helper import FakePath import codecs import io # C implementation of io @@ -317,10 +320,10 @@ class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): class IOTest(unittest.TestCase): def setUp(self): - support.unlink(support.TESTFN) + os_helper.unlink(os_helper.TESTFN) def tearDown(self): - support.unlink(support.TESTFN) + os_helper.unlink(os_helper.TESTFN) def write_ops(self, f): self.assertEqual(f.write(b"blah."), 5) @@ -405,19 +408,19 @@ class IOTest(unittest.TestCase): # Try writing on a file opened in read mode and vice-versa. exc = self.UnsupportedOperation for mode in ("w", "wb"): - with self.open(support.TESTFN, mode) as fp: + with self.open(os_helper.TESTFN, mode) as fp: self.assertRaises(exc, fp.read) self.assertRaises(exc, fp.readline) - with self.open(support.TESTFN, "wb", buffering=0) as fp: + with self.open(os_helper.TESTFN, "wb", buffering=0) as fp: self.assertRaises(exc, fp.read) self.assertRaises(exc, fp.readline) - with self.open(support.TESTFN, "rb", buffering=0) as fp: + with self.open(os_helper.TESTFN, "rb", buffering=0) as fp: self.assertRaises(exc, fp.write, b"blah") self.assertRaises(exc, fp.writelines, [b"blah\n"]) - with self.open(support.TESTFN, "rb") as fp: + with self.open(os_helper.TESTFN, "rb") as fp: self.assertRaises(exc, fp.write, b"blah") self.assertRaises(exc, fp.writelines, [b"blah\n"]) - with self.open(support.TESTFN, "r") as fp: + with self.open(os_helper.TESTFN, "r") as fp: self.assertRaises(exc, fp.write, "blah") self.assertRaises(exc, fp.writelines, ["blah\n"]) # Non-zero seeking from current or end pos @@ -538,33 +541,33 @@ class IOTest(unittest.TestCase): self.assertRaises(ValueError, self.open, bytes_fn, 'w') def test_raw_file_io(self): - with self.open(support.TESTFN, "wb", buffering=0) as f: + with self.open(os_helper.TESTFN, "wb", buffering=0) as f: self.assertEqual(f.readable(), False) self.assertEqual(f.writable(), True) self.assertEqual(f.seekable(), True) self.write_ops(f) - with self.open(support.TESTFN, "rb", buffering=0) as f: + with self.open(os_helper.TESTFN, "rb", buffering=0) as f: self.assertEqual(f.readable(), True) self.assertEqual(f.writable(), False) self.assertEqual(f.seekable(), True) self.read_ops(f) def test_buffered_file_io(self): - with self.open(support.TESTFN, "wb") as f: + with self.open(os_helper.TESTFN, "wb") as f: self.assertEqual(f.readable(), False) self.assertEqual(f.writable(), True) self.assertEqual(f.seekable(), True) self.write_ops(f) - with self.open(support.TESTFN, "rb") as f: + with self.open(os_helper.TESTFN, "rb") as f: self.assertEqual(f.readable(), True) self.assertEqual(f.writable(), False) self.assertEqual(f.seekable(), True) self.read_ops(f, True) def test_readline(self): - with self.open(support.TESTFN, "wb") as f: + with self.open(os_helper.TESTFN, "wb") as f: f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") - with self.open(support.TESTFN, "rb") as f: + with self.open(os_helper.TESTFN, "rb") as f: self.assertEqual(f.readline(), b"abc\n") self.assertEqual(f.readline(10), b"def\n") self.assertEqual(f.readline(2), b"xy") @@ -572,7 +575,7 @@ class IOTest(unittest.TestCase): self.assertEqual(f.readline(), b"foo\x00bar\n") self.assertEqual(f.readline(None), b"another line") self.assertRaises(TypeError, f.readline, 5.3) - with self.open(support.TESTFN, "r") as f: + with self.open(os_helper.TESTFN, "r") as f: self.assertRaises(TypeError, f.readline, 5.3) def test_readline_nonsizeable(self): @@ -607,20 +610,20 @@ class IOTest(unittest.TestCase): support.requires( 'largefile', 'test requires %s bytes and a long time to run' % self.LARGE) - with self.open(support.TESTFN, "w+b", 0) as f: + with self.open(os_helper.TESTFN, "w+b", 0) as f: self.large_file_ops(f) - with self.open(support.TESTFN, "w+b") as f: + with self.open(os_helper.TESTFN, "w+b") as f: self.large_file_ops(f) def test_with_open(self): for bufsize in (0, 100): f = None - with self.open(support.TESTFN, "wb", bufsize) as f: + with self.open(os_helper.TESTFN, "wb", bufsize) as f: f.write(b"xxx") self.assertEqual(f.closed, True) f = None try: - with self.open(support.TESTFN, "wb", bufsize) as f: + with self.open(os_helper.TESTFN, "wb", bufsize) as f: 1/0 except ZeroDivisionError: self.assertEqual(f.closed, True) @@ -629,13 +632,13 @@ class IOTest(unittest.TestCase): # issue 5008 def test_append_mode_tell(self): - with self.open(support.TESTFN, "wb") as f: + with self.open(os_helper.TESTFN, "wb") as f: f.write(b"xxx") - with self.open(support.TESTFN, "ab", buffering=0) as f: + with self.open(os_helper.TESTFN, "ab", buffering=0) as f: self.assertEqual(f.tell(), 3) - with self.open(support.TESTFN, "ab") as f: + with self.open(os_helper.TESTFN, "ab") as f: self.assertEqual(f.tell(), 3) - with self.open(support.TESTFN, "a") as f: + with self.open(os_helper.TESTFN, "a") as f: self.assertGreater(f.tell(), 0) def test_destructor(self): @@ -655,13 +658,13 @@ class IOTest(unittest.TestCase): def flush(self): record.append(3) super().flush() - with support.check_warnings(('', ResourceWarning)): - f = MyFileIO(support.TESTFN, "wb") + with warnings_helper.check_warnings(('', ResourceWarning)): + f = MyFileIO(os_helper.TESTFN, "wb") f.write(b"xxx") del f support.gc_collect() self.assertEqual(record, [1, 2, 3]) - with self.open(support.TESTFN, "rb") as f: + with self.open(os_helper.TESTFN, "rb") as f: self.assertEqual(f.read(), b"xxx") def _check_base_destructor(self, base): @@ -707,9 +710,9 @@ class IOTest(unittest.TestCase): self._check_base_destructor(self.TextIOBase) def test_close_flushes(self): - with self.open(support.TESTFN, "wb") as f: + with self.open(os_helper.TESTFN, "wb") as f: f.write(b"xxx") - with self.open(support.TESTFN, "rb") as f: + with self.open(os_helper.TESTFN, "rb") as f: self.assertEqual(f.read(), b"xxx") def test_array_writes(self): @@ -720,25 +723,25 @@ class IOTest(unittest.TestCase): self.assertEqual(f.write(a), n) f.writelines((a,)) check(self.BytesIO()) - check(self.FileIO(support.TESTFN, "w")) + check(self.FileIO(os_helper.TESTFN, "w")) check(self.BufferedWriter(self.MockRawIO())) check(self.BufferedRandom(self.MockRawIO())) check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())) def test_closefd(self): - self.assertRaises(ValueError, self.open, support.TESTFN, 'w', + self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'w', closefd=False) def test_read_closed(self): - with self.open(support.TESTFN, "w") as f: + with self.open(os_helper.TESTFN, "w") as f: f.write("egg\n") - with self.open(support.TESTFN, "r") as f: + with self.open(os_helper.TESTFN, "r") as f: file = self.open(f.fileno(), "r", closefd=False) self.assertEqual(file.read(), "egg\n") file.seek(0) file.close() self.assertRaises(ValueError, file.read) - with self.open(support.TESTFN, "rb") as f: + with self.open(os_helper.TESTFN, "rb") as f: file = self.open(f.fileno(), "rb", closefd=False) self.assertEqual(file.read()[:3], b"egg") file.close() @@ -746,12 +749,12 @@ class IOTest(unittest.TestCase): def test_no_closefd_with_filename(self): # can't use closefd in combination with a file name - self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False) + self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r", closefd=False) def test_closefd_attr(self): - with self.open(support.TESTFN, "wb") as f: + with self.open(os_helper.TESTFN, "wb") as f: f.write(b"egg\n") - with self.open(support.TESTFN, "r") as f: + with self.open(os_helper.TESTFN, "r") as f: self.assertEqual(f.buffer.raw.closefd, True) file = self.open(f.fileno(), "r", closefd=False) self.assertEqual(file.buffer.raw.closefd, False) @@ -759,15 +762,15 @@ class IOTest(unittest.TestCase): def test_garbage_collection(self): # FileIO objects are collected, and collecting them flushes # all data to disk. - with support.check_warnings(('', ResourceWarning)): - f = self.FileIO(support.TESTFN, "wb") + with warnings_helper.check_warnings(('', ResourceWarning)): + f = self.FileIO(os_helper.TESTFN, "wb") f.write(b"abcxxx") f.f = f wr = weakref.ref(f) del f support.gc_collect() self.assertIsNone(wr(), wr) - with self.open(support.TESTFN, "rb") as f: + with self.open(os_helper.TESTFN, "rb") as f: self.assertEqual(f.read(), b"abcxxx") def test_unbounded_file(self): @@ -804,29 +807,29 @@ class IOTest(unittest.TestCase): def test_flush_error_on_close(self): # raw file # Issue #5700: io.FileIO calls flush() after file closed - self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0) - fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) + self.check_flush_error_on_close(os_helper.TESTFN, 'wb', buffering=0) + fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) self.check_flush_error_on_close(fd, 'wb', buffering=0) - fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) + fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False) os.close(fd) # buffered io - self.check_flush_error_on_close(support.TESTFN, 'wb') - fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) + self.check_flush_error_on_close(os_helper.TESTFN, 'wb') + fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) self.check_flush_error_on_close(fd, 'wb') - fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) + fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) self.check_flush_error_on_close(fd, 'wb', closefd=False) os.close(fd) # text io - self.check_flush_error_on_close(support.TESTFN, 'w') - fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) + self.check_flush_error_on_close(os_helper.TESTFN, 'w') + fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) self.check_flush_error_on_close(fd, 'w') - fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) + fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT) self.check_flush_error_on_close(fd, 'w', closefd=False) os.close(fd) def test_multi_close(self): - f = self.open(support.TESTFN, "wb", buffering=0) + f = self.open(os_helper.TESTFN, "wb", buffering=0) f.close() f.close() f.close() @@ -857,9 +860,9 @@ class IOTest(unittest.TestCase): self.assertTrue(hasattr(obj, "__dict__")) def test_opener(self): - with self.open(support.TESTFN, "w") as f: + with self.open(os_helper.TESTFN, "w") as f: f.write("egg\n") - fd = os.open(support.TESTFN, os.O_RDONLY) + fd = os.open(os_helper.TESTFN, os.O_RDONLY) def opener(path, flags): return fd with self.open("non-existent", "r", opener=opener) as f: @@ -894,14 +897,14 @@ class IOTest(unittest.TestCase): f2.readline() def test_nonbuffered_textio(self): - with support.check_no_resource_warning(self): + with warnings_helper.check_no_resource_warning(self): with self.assertRaises(ValueError): - self.open(support.TESTFN, 'w', buffering=0) + self.open(os_helper.TESTFN, 'w', buffering=0) def test_invalid_newline(self): - with support.check_no_resource_warning(self): + with warnings_helper.check_no_resource_warning(self): with self.assertRaises(ValueError): - self.open(support.TESTFN, 'w', newline='invalid') + self.open(os_helper.TESTFN, 'w', newline='invalid') def test_buffered_readinto_mixin(self): # Test the implementation provided by BufferedIOBase @@ -924,10 +927,10 @@ class IOTest(unittest.TestCase): with self.open(path, "r") as f: self.assertEqual(f.read(), "egg\n") - check_path_succeeds(FakePath(support.TESTFN)) - check_path_succeeds(FakePath(support.TESTFN.encode('utf-8'))) + check_path_succeeds(FakePath(os_helper.TESTFN)) + check_path_succeeds(FakePath(os_helper.TESTFN.encode('utf-8'))) - with self.open(support.TESTFN, "w") as f: + with self.open(os_helper.TESTFN, "w") as f: bad_path = FakePath(f.fileno()) with self.assertRaises(TypeError): self.open(bad_path, 'w') @@ -942,7 +945,7 @@ class IOTest(unittest.TestCase): # ensure that refcounting is correct with some error conditions with self.assertRaisesRegex(ValueError, 'read/write/append mode'): - self.open(FakePath(support.TESTFN), 'rwxa') + self.open(FakePath(os_helper.TESTFN), 'rwxa') def test_RawIOBase_readall(self): # Exercise the default unlimited RawIOBase.read() and readall() @@ -1454,9 +1457,9 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): l = list(range(256)) * N random.shuffle(l) s = bytes(bytearray(l)) - with self.open(support.TESTFN, "wb") as f: + with self.open(os_helper.TESTFN, "wb") as f: f.write(s) - with self.open(support.TESTFN, self.read_mode, buffering=0) as raw: + with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw: bufio = self.tp(raw, 8) errors = [] results = [] @@ -1482,7 +1485,7 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): c = bytes(bytearray([i])) self.assertEqual(s.count(c), N) finally: - support.unlink(support.TESTFN) + os_helper.unlink(os_helper.TESTFN) def test_unseekable(self): bufio = self.tp(self.MockUnseekableIO(b"A" * 10)) @@ -1572,9 +1575,9 @@ class CBufferedReaderTest(BufferedReaderTest, SizeofTest): def test_garbage_collection(self): # C BufferedReader objects are collected. # The Python version has __del__, so it ends into gc.garbage instead - self.addCleanup(support.unlink, support.TESTFN) - with support.check_warnings(('', ResourceWarning)): - rawio = self.FileIO(support.TESTFN, "w+b") + self.addCleanup(os_helper.unlink, os_helper.TESTFN) + with warnings_helper.check_warnings(('', ResourceWarning)): + rawio = self.FileIO(os_helper.TESTFN, "w+b") f = self.tp(rawio) f.f = f wr = weakref.ref(f) @@ -1791,26 +1794,26 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): def test_truncate(self): # Truncate implicitly flushes the buffer. - self.addCleanup(support.unlink, support.TESTFN) - with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: + self.addCleanup(os_helper.unlink, os_helper.TESTFN) + with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw: bufio = self.tp(raw, 8) bufio.write(b"abcdef") self.assertEqual(bufio.truncate(3), 3) self.assertEqual(bufio.tell(), 6) - with self.open(support.TESTFN, "rb", buffering=0) as f: + with self.open(os_helper.TESTFN, "rb", buffering=0) as f: self.assertEqual(f.read(), b"abc") def test_truncate_after_write(self): # Ensure that truncate preserves the file position after # writes longer than the buffer size. # Issue: https://bugs.python.org/issue32228 - self.addCleanup(support.unlink, support.TESTFN) - with self.open(support.TESTFN, "wb") as f: + self.addCleanup(os_helper.unlink, os_helper.TESTFN) + with self.open(os_helper.TESTFN, "wb") as f: # Fill with some buffer f.write(b'\x00' * 10000) buffer_sizes = [8192, 4096, 200] for buffer_size in buffer_sizes: - with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f: + with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f: f.write(b'\x00' * (buffer_size + 1)) # After write write_pos and write_end are set to 0 f.read(1) @@ -1838,7 +1841,7 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): # writing the buffer to the raw streams. This is in addition # to concurrency issues due to switching threads in the middle # of Python code. - with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: + with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw: bufio = self.tp(raw, 8) errors = [] def f(): @@ -1858,12 +1861,12 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): self.assertFalse(errors, "the following exceptions were caught: %r" % errors) bufio.close() - with self.open(support.TESTFN, "rb") as f: + with self.open(os_helper.TESTFN, "rb") as f: s = f.read() for i in range(256): self.assertEqual(s.count(bytes([i])), N) finally: - support.unlink(support.TESTFN) + os_helper.unlink(os_helper.TESTFN) def test_misbehaved_io(self): rawio = self.MisbehavedRawIO() @@ -1931,9 +1934,9 @@ class CBufferedWriterTest(BufferedWriterTest, SizeofTest): # C BufferedWriter objects are collected, and collecting them flushes # all data to disk. # The Python version has __del__, so it ends into gc.garbage instead - self.addCleanup(support.unlink, support.TESTFN) - with support.check_warnings(('', ResourceWarning)): - rawio = self.FileIO(support.TESTFN, "w+b") + self.addCleanup(os_helper.unlink, os_helper.TESTFN) + with warnings_helper.check_warnings(('', ResourceWarning)): + rawio = self.FileIO(os_helper.TESTFN, "w+b") f = self.tp(rawio) f.write(b"123xxx") f.x = f @@ -1941,7 +1944,7 @@ class CBufferedWriterTest(BufferedWriterTest, SizeofTest): del f support.gc_collect() self.assertIsNone(wr(), wr) - with self.open(support.TESTFN, "rb") as f: + with self.open(os_helper.TESTFN, "rb") as f: self.assertEqual(f.read(), b"123xxx") def test_args_error(self): @@ -2579,10 +2582,10 @@ class TextIOWrapperTest(unittest.TestCase): def setUp(self): self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") - support.unlink(support.TESTFN) + os_helper.unlink(os_helper.TESTFN) def tearDown(self): - support.unlink(support.TESTFN) + os_helper.unlink(os_helper.TESTFN) def test_constructor(self): r = self.BytesIO(b"\xc3\xa9\n\n") @@ -2918,11 +2921,11 @@ class TextIOWrapperTest(unittest.TestCase): def test_basic_io(self): for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le": - f = self.open(support.TESTFN, "w+", encoding=enc) + f = self.open(os_helper.TESTFN, "w+", encoding=enc) f._CHUNK_SIZE = chunksize self.assertEqual(f.write("abc"), 3) f.close() - f = self.open(support.TESTFN, "r+", encoding=enc) + f = self.open(os_helper.TESTFN, "r+", encoding=enc) f._CHUNK_SIZE = chunksize self.assertEqual(f.tell(), 0) self.assertEqual(f.read(), "abc") @@ -2967,7 +2970,7 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEqual(rlines, wlines) def test_telling(self): - f = self.open(support.TESTFN, "w+", encoding="utf-8") + f = self.open(os_helper.TESTFN, "w+", encoding="utf-8") p0 = f.tell() f.write("\xff\n") p1 = f.tell() @@ -2995,9 +2998,9 @@ class TextIOWrapperTest(unittest.TestCase): u_suffix = "\u8888\n" suffix = bytes(u_suffix.encode("utf-8")) line = prefix + suffix - with self.open(support.TESTFN, "wb") as f: + with self.open(os_helper.TESTFN, "wb") as f: f.write(line*2) - with self.open(support.TESTFN, "r", encoding="utf-8") as f: + with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: s = f.read(prefix_size) self.assertEqual(s, str(prefix, "ascii")) self.assertEqual(f.tell(), prefix_size) @@ -3006,9 +3009,9 @@ class TextIOWrapperTest(unittest.TestCase): def test_seeking_too(self): # Regression test for a specific bug data = b'\xe0\xbf\xbf\n' - with self.open(support.TESTFN, "wb") as f: + with self.open(os_helper.TESTFN, "wb") as f: f.write(data) - with self.open(support.TESTFN, "r", encoding="utf-8") as f: + with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: f._CHUNK_SIZE # Just test that it exists f._CHUNK_SIZE = 2 f.readline() @@ -3022,17 +3025,17 @@ class TextIOWrapperTest(unittest.TestCase): def test_seek_and_tell_with_data(data, min_pos=0): """Tell/seek to various points within a data stream and ensure that the decoded data returned by read() is consistent.""" - f = self.open(support.TESTFN, 'wb') + f = self.open(os_helper.TESTFN, 'wb') f.write(data) f.close() - f = self.open(support.TESTFN, encoding='test_decoder') + f = self.open(os_helper.TESTFN, encoding='test_decoder') f._CHUNK_SIZE = CHUNK_SIZE decoded = f.read() f.close() for i in range(min_pos, len(decoded) + 1): # seek positions for j in [1, 5, len(decoded) - i]: # read lengths - f = self.open(support.TESTFN, encoding='test_decoder') + f = self.open(os_helper.TESTFN, encoding='test_decoder') self.assertEqual(f.read(i), decoded[:i]) cookie = f.tell() self.assertEqual(f.read(j), decoded[i:i + j]) @@ -3062,11 +3065,11 @@ class TextIOWrapperTest(unittest.TestCase): StatefulIncrementalDecoder.codecEnabled = 0 def test_multibyte_seek_and_tell(self): - f = self.open(support.TESTFN, "w", encoding="euc_jp") + f = self.open(os_helper.TESTFN, "w", encoding="euc_jp") f.write("AB\n\u3046\u3048\n") f.close() - f = self.open(support.TESTFN, "r", encoding="euc_jp") + f = self.open(os_helper.TESTFN, "r", encoding="euc_jp") self.assertEqual(f.readline(), "AB\n") p0 = f.tell() self.assertEqual(f.readline(), "\u3046\u3048\n") @@ -3077,7 +3080,7 @@ class TextIOWrapperTest(unittest.TestCase): f.close() def test_seek_with_encoder_state(self): - f = self.open(support.TESTFN, "w", encoding="euc_jis_2004") + f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004") f.write("\u00e6\u0300") p0 = f.tell() f.write("\u00e6") @@ -3085,7 +3088,7 @@ class TextIOWrapperTest(unittest.TestCase): f.write("\u0300") f.close() - f = self.open(support.TESTFN, "r", encoding="euc_jis_2004") + f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004") self.assertEqual(f.readline(), "\u00e6\u0300\u0300") f.close() @@ -3229,7 +3232,7 @@ class TextIOWrapperTest(unittest.TestCase): def test_append_bom(self): # The BOM is not written again when appending to a non-empty file - filename = support.TESTFN + filename = os_helper.TESTFN for charset in ('utf-8-sig', 'utf-16', 'utf-32'): with self.open(filename, 'w', encoding=charset) as f: f.write('aaa') @@ -3244,7 +3247,7 @@ class TextIOWrapperTest(unittest.TestCase): def test_seek_bom(self): # Same test, but when seeking manually - filename = support.TESTFN + filename = os_helper.TESTFN for charset in ('utf-8-sig', 'utf-16', 'utf-32'): with self.open(filename, 'w', encoding=charset) as f: f.write('aaa') @@ -3259,7 +3262,7 @@ class TextIOWrapperTest(unittest.TestCase): def test_seek_append_bom(self): # Same test, but first seek to the start and then to the end - filename = support.TESTFN + filename = os_helper.TESTFN for charset in ('utf-8-sig', 'utf-16', 'utf-32'): with self.open(filename, 'w', encoding=charset) as f: f.write('aaa') @@ -3271,16 +3274,16 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) def test_errors_property(self): - with self.open(support.TESTFN, "w") as f: + with self.open(os_helper.TESTFN, "w") as f: self.assertEqual(f.errors, "strict") - with self.open(support.TESTFN, "w", errors="replace") as f: + with self.open(os_helper.TESTFN, "w", errors="replace") as f: self.assertEqual(f.errors, "replace") @support.no_tracing def test_threads_write(self): # Issue6750: concurrent writes could duplicate data event = threading.Event() - with self.open(support.TESTFN, "w", buffering=1) as f: + with self.open(os_helper.TESTFN, "w", buffering=1) as f: def run(n): text = "Thread%03d\n" % n event.wait() @@ -3289,7 +3292,7 @@ class TextIOWrapperTest(unittest.TestCase): for x in range(20)] with threading_helper.start_threads(threads, event.set): time.sleep(0.02) - with self.open(support.TESTFN) as f: + with self.open(os_helper.TESTFN) as f: content = f.read() for n in range(20): self.assertEqual(content.count("Thread%03d\n" % n), 1) @@ -3732,8 +3735,8 @@ class CTextIOWrapperTest(TextIOWrapperTest): # C TextIOWrapper objects are collected, and collecting them flushes # all data to disk. # The Python version has __del__, so it ends in gc.garbage instead. - with support.check_warnings(('', ResourceWarning)): - rawio = io.FileIO(support.TESTFN, "wb") + with warnings_helper.check_warnings(('', ResourceWarning)): + rawio = io.FileIO(os_helper.TESTFN, "wb") b = self.BufferedWriter(rawio) t = self.TextIOWrapper(b, encoding="ascii") t.write("456def") @@ -3742,7 +3745,7 @@ class CTextIOWrapperTest(TextIOWrapperTest): del t support.gc_collect() self.assertIsNone(wr(), wr) - with self.open(support.TESTFN, "rb") as f: + with self.open(os_helper.TESTFN, "rb") as f: self.assertEqual(f.read(), b"456def") def test_rwpair_cleared_before_textio(self): @@ -3899,7 +3902,7 @@ class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): class MiscIOTest(unittest.TestCase): def tearDown(self): - support.unlink(support.TESTFN) + os_helper.unlink(os_helper.TESTFN) def test___all__(self): for name in self.io.__all__: @@ -3913,21 +3916,21 @@ class MiscIOTest(unittest.TestCase): self.assertTrue(issubclass(obj, self.IOBase)) def test_attributes(self): - f = self.open(support.TESTFN, "wb", buffering=0) + f = self.open(os_helper.TESTFN, "wb", buffering=0) self.assertEqual(f.mode, "wb") f.close() - with support.check_warnings(('', DeprecationWarning)): - f = self.open(support.TESTFN, "U") - self.assertEqual(f.name, support.TESTFN) - self.assertEqual(f.buffer.name, support.TESTFN) - self.assertEqual(f.buffer.raw.name, support.TESTFN) + with warnings_helper.check_warnings(('', DeprecationWarning)): + f = self.open(os_helper.TESTFN, "U") + self.assertEqual(f.name, os_helper.TESTFN) + self.assertEqual(f.buffer.name, os_helper.TESTFN) + self.assertEqual(f.buffer.raw.name, os_helper.TESTFN) self.assertEqual(f.mode, "U") self.assertEqual(f.buffer.mode, "rb") self.assertEqual(f.buffer.raw.mode, "rb") f.close() - f = self.open(support.TESTFN, "w+") + f = self.open(os_helper.TESTFN, "w+") self.assertEqual(f.mode, "w+") self.assertEqual(f.buffer.mode, "rb+") # Does it really matter? self.assertEqual(f.buffer.raw.mode, "rb+") @@ -3969,7 +3972,7 @@ class MiscIOTest(unittest.TestCase): {"mode": "w+", "buffering": 2}, {"mode": "w+b", "buffering": 0}, ]: - f = self.open(support.TESTFN, **kwargs) + f = self.open(os_helper.TESTFN, **kwargs) f.close() self.assertRaises(ValueError, f.flush) self.assertRaises(ValueError, f.fileno) @@ -4019,17 +4022,17 @@ class MiscIOTest(unittest.TestCase): self.assertIsInstance(self.TextIOBase, abc.ABCMeta) def _check_abc_inheritance(self, abcmodule): - with self.open(support.TESTFN, "wb", buffering=0) as f: + with self.open(os_helper.TESTFN, "wb", buffering=0) as f: self.assertIsInstance(f, abcmodule.IOBase) self.assertIsInstance(f, abcmodule.RawIOBase) self.assertNotIsInstance(f, abcmodule.BufferedIOBase) self.assertNotIsInstance(f, abcmodule.TextIOBase) - with self.open(support.TESTFN, "wb") as f: + with self.open(os_helper.TESTFN, "wb") as f: self.assertIsInstance(f, abcmodule.IOBase) self.assertNotIsInstance(f, abcmodule.RawIOBase) self.assertIsInstance(f, abcmodule.BufferedIOBase) self.assertNotIsInstance(f, abcmodule.TextIOBase) - with self.open(support.TESTFN, "w") as f: + with self.open(os_helper.TESTFN, "w") as f: self.assertIsInstance(f, abcmodule.IOBase) self.assertNotIsInstance(f, abcmodule.RawIOBase) self.assertNotIsInstance(f, abcmodule.BufferedIOBase) @@ -4053,9 +4056,9 @@ class MiscIOTest(unittest.TestCase): self.assertIn(r, str(cm.warning.args[0])) def test_warn_on_dealloc(self): - self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0) - self._check_warn_on_dealloc(support.TESTFN, "wb") - self._check_warn_on_dealloc(support.TESTFN, "w") + self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0) + self._check_warn_on_dealloc(os_helper.TESTFN, "wb") + self._check_warn_on_dealloc(os_helper.TESTFN, "w") def _check_warn_on_dealloc_fd(self, *args, **kwargs): fds = [] @@ -4073,7 +4076,7 @@ class MiscIOTest(unittest.TestCase): # When using closefd=False, there's no warning r, w = os.pipe() fds += r, w - with support.check_no_resource_warning(self): + with warnings_helper.check_no_resource_warning(self): open(r, *args, closefd=False, **kwargs) def test_warn_on_dealloc_fd(self): @@ -4096,7 +4099,7 @@ class MiscIOTest(unittest.TestCase): {"mode": "w+b", "buffering": 0}, ]: for protocol in range(pickle.HIGHEST_PROTOCOL + 1): - with self.open(support.TESTFN, **kwargs) as f: + with self.open(os_helper.TESTFN, **kwargs) as f: self.assertRaises(TypeError, pickle.dumps, f, protocol) def test_nonblock_pipe_write_bigbuf(self): @@ -4159,20 +4162,20 @@ class MiscIOTest(unittest.TestCase): def test_create_fail(self): # 'x' mode fails if file is existing - with self.open(support.TESTFN, 'w'): + with self.open(os_helper.TESTFN, 'w'): pass - self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x') + self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x') def test_create_writes(self): # 'x' mode opens for writing - with self.open(support.TESTFN, 'xb') as f: + with self.open(os_helper.TESTFN, 'xb') as f: f.write(b"spam") - with self.open(support.TESTFN, 'rb') as f: + with self.open(os_helper.TESTFN, 'rb') as f: self.assertEqual(b"spam", f.read()) def test_open_allargs(self): # there used to be a buffer overflow in the parser for rawmode - self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+') + self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+') def test_check_encoding_errors(self): # bpo-37388: open() and TextIOWrapper must check encoding and errors @@ -4427,7 +4430,7 @@ class SignalsTest(unittest.TestCase): """Check that a buffered write, when it gets interrupted (either returning a partial result or EINTR), properly invokes the signal handler and retries if the latter returned successfully.""" - select = support.import_module("select") + select = import_helper.import_module("select") # A quantity that exceeds the buffer size of an anonymous pipe's # write end. |