From a7f5d93bb6906d0f999248b47295d3a59b130f4d Mon Sep 17 00:00:00 2001 From: Hai Shi Date: Tue, 4 Aug 2020 00:41:24 +0800 Subject: bpo-40275: Use new test.support helper submodules in tests (GH-21449) --- Lib/test/test__opcode.py | 2 +- Lib/test/test_asyncore.py | 26 +++++---- Lib/test/test_binascii.py | 15 ++--- Lib/test/test_bisect.py | 7 ++- Lib/test/test_builtin.py | 6 +- Lib/test/test_concurrent_futures.py | 3 +- Lib/test/test_configparser.py | 7 ++- Lib/test/test_coroutines.py | 13 +++-- Lib/test/test_curses.py | 3 +- Lib/test/test_datetime.py | 4 +- Lib/test/test_mmap.py | 5 +- Lib/test/test_signal.py | 3 +- Lib/test/test_ssl.py | 53 +++++++++--------- Lib/test/test_tarfile.py | 107 ++++++++++++++++++------------------ Lib/test/test_threading.py | 3 +- Lib/test/test_trace.py | 3 +- Lib/test/test_tracemalloc.py | 13 +++-- Lib/test/test_xml_etree_c.py | 2 +- Lib/unittest/test/test_discovery.py | 5 +- Lib/unittest/test/test_result.py | 6 +- 20 files changed, 154 insertions(+), 132 deletions(-) diff --git a/Lib/test/test__opcode.py b/Lib/test/test__opcode.py index 0fb39ee..3bb64a7 100644 --- a/Lib/test/test__opcode.py +++ b/Lib/test/test__opcode.py @@ -1,5 +1,5 @@ import dis -from test.support import import_module +from test.support.import_helper import import_module import unittest _opcode = import_module("_opcode") diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py index 2cee6fb..06c6bc2 100644 --- a/Lib/test/test_asyncore.py +++ b/Lib/test/test_asyncore.py @@ -10,8 +10,10 @@ import struct import threading from test import support +from test.support import os_helper from test.support import socket_helper from test.support import threading_helper +from test.support import warnings_helper from io import BytesIO if support.PGO: @@ -92,7 +94,7 @@ def bind_af_aware(sock, addr): """Helper function to bind a socket according to its family.""" if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX: # Make sure the path doesn't exist. - support.unlink(addr) + os_helper.unlink(addr) socket_helper.bind_unix_socket(sock, addr) else: sock.bind(addr) @@ -369,14 +371,14 @@ class DispatcherWithSendTests(unittest.TestCase): class FileWrapperTest(unittest.TestCase): def setUp(self): self.d = b"It's not dead, it's sleeping!" - with open(support.TESTFN, 'wb') as file: + with open(os_helper.TESTFN, 'wb') as file: file.write(self.d) def tearDown(self): - support.unlink(support.TESTFN) + os_helper.unlink(os_helper.TESTFN) def test_recv(self): - fd = os.open(support.TESTFN, os.O_RDONLY) + fd = os.open(os_helper.TESTFN, os.O_RDONLY) w = asyncore.file_wrapper(fd) os.close(fd) @@ -390,20 +392,20 @@ class FileWrapperTest(unittest.TestCase): def test_send(self): d1 = b"Come again?" d2 = b"I want to buy some cheese." - fd = os.open(support.TESTFN, os.O_WRONLY | os.O_APPEND) + fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_APPEND) w = asyncore.file_wrapper(fd) os.close(fd) w.write(d1) w.send(d2) w.close() - with open(support.TESTFN, 'rb') as file: + with open(os_helper.TESTFN, 'rb') as file: self.assertEqual(file.read(), self.d + d1 + d2) @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'), 'asyncore.file_dispatcher required') def test_dispatcher(self): - fd = os.open(support.TESTFN, os.O_RDONLY) + fd = os.open(os_helper.TESTFN, os.O_RDONLY) data = [] class FileDispatcher(asyncore.file_dispatcher): def handle_read(self): @@ -415,16 +417,16 @@ class FileWrapperTest(unittest.TestCase): def test_resource_warning(self): # Issue #11453 - fd = os.open(support.TESTFN, os.O_RDONLY) + fd = os.open(os_helper.TESTFN, os.O_RDONLY) f = asyncore.file_wrapper(fd) os.close(fd) - with support.check_warnings(('', ResourceWarning)): + with warnings_helper.check_warnings(('', ResourceWarning)): f = None support.gc_collect() def test_close_twice(self): - fd = os.open(support.TESTFN, os.O_RDONLY) + fd = os.open(os_helper.TESTFN, os.O_RDONLY) f = asyncore.file_wrapper(fd) os.close(fd) @@ -804,10 +806,10 @@ class TestAPI_UseIPv6Sockets(BaseTestAPI): class TestAPI_UseUnixSockets(BaseTestAPI): if HAS_UNIX_SOCKETS: family = socket.AF_UNIX - addr = support.TESTFN + addr = os_helper.TESTFN def tearDown(self): - support.unlink(self.addr) + os_helper.unlink(self.addr) BaseTestAPI.tearDown(self) class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase): diff --git a/Lib/test/test_binascii.py b/Lib/test/test_binascii.py index 4532795..4d1bf2c 100644 --- a/Lib/test/test_binascii.py +++ b/Lib/test/test_binascii.py @@ -4,7 +4,8 @@ import unittest import binascii import array import re -from test import support +from test.support import warnings_helper + # Note: "*_hex" functions are aliases for "(un)hexlify" b2a_functions = ['b2a_base64', 'b2a_hex', 'b2a_hqx', 'b2a_qp', 'b2a_uu', @@ -37,7 +38,7 @@ class BinASCIITest(unittest.TestCase): self.assertTrue(hasattr(getattr(binascii, name), '__call__')) self.assertRaises(TypeError, getattr(binascii, name)) - @support.ignore_warnings(category=DeprecationWarning) + @warnings_helper.ignore_warnings(category=DeprecationWarning) def test_returned_value(self): # Limit to the minimum of all limits (b2a_uu) MAX_ALL = 45 @@ -181,7 +182,7 @@ class BinASCIITest(unittest.TestCase): with self.assertRaises(TypeError): binascii.b2a_uu(b"", True) - @support.ignore_warnings(category=DeprecationWarning) + @warnings_helper.ignore_warnings(category=DeprecationWarning) def test_crc_hqx(self): crc = binascii.crc_hqx(self.type2test(b"Test the CRC-32 of"), 0) crc = binascii.crc_hqx(self.type2test(b" this string."), crc) @@ -201,7 +202,7 @@ class BinASCIITest(unittest.TestCase): self.assertRaises(TypeError, binascii.crc32) - @support.ignore_warnings(category=DeprecationWarning) + @warnings_helper.ignore_warnings(category=DeprecationWarning) def test_hqx(self): # Perform binhex4 style RLE-compression # Then calculate the hexbin4 binary-to-ASCII translation @@ -212,7 +213,7 @@ class BinASCIITest(unittest.TestCase): res = binascii.rledecode_hqx(b) self.assertEqual(res, self.rawdata) - @support.ignore_warnings(category=DeprecationWarning) + @warnings_helper.ignore_warnings(category=DeprecationWarning) def test_rle(self): # test repetition with a repetition longer than the limit of 255 data = (b'a' * 100 + b'b' + b'c' * 300) @@ -359,7 +360,7 @@ class BinASCIITest(unittest.TestCase): self.assertEqual(b2a_qp(type2test(b'a.\n')), b'a.\n') self.assertEqual(b2a_qp(type2test(b'.a')[:-1]), b'=2E') - @support.ignore_warnings(category=DeprecationWarning) + @warnings_helper.ignore_warnings(category=DeprecationWarning) def test_empty_string(self): # A test for SF bug #1022953. Make sure SystemError is not raised. empty = self.type2test(b'') @@ -384,7 +385,7 @@ class BinASCIITest(unittest.TestCase): # crc_hqx needs 2 arguments self.assertRaises(TypeError, binascii.crc_hqx, "test", 0) - @support.ignore_warnings(category=DeprecationWarning) + @warnings_helper.ignore_warnings(category=DeprecationWarning) def test_unicode_a2b(self): # Unicode strings are accepted by a2b_* functions. MAX_ALL = 45 diff --git a/Lib/test/test_bisect.py b/Lib/test/test_bisect.py index 580a963..fc7990d 100644 --- a/Lib/test/test_bisect.py +++ b/Lib/test/test_bisect.py @@ -1,10 +1,11 @@ import sys import unittest -from test import support +from test.support import import_helper from collections import UserList -py_bisect = support.import_fresh_module('bisect', blocked=['_bisect']) -c_bisect = support.import_fresh_module('bisect', fresh=['_bisect']) + +py_bisect = import_helper.import_fresh_module('bisect', blocked=['_bisect']) +c_bisect = import_helper.import_fresh_module('bisect', fresh=['_bisect']) class Range(object): """A trivial range()-like object that has an insert() method.""" diff --git a/Lib/test/test_builtin.py b/Lib/test/test_builtin.py index 3bc1a3e..edb4ec0 100644 --- a/Lib/test/test_builtin.py +++ b/Lib/test/test_builtin.py @@ -26,10 +26,10 @@ from textwrap import dedent from types import AsyncGeneratorType, FunctionType from operator import neg from test import support -from test.support import ( - EnvironmentVarGuard, TESTFN, check_warnings, swap_attr, unlink, - maybe_get_event_loop_policy) +from test.support import (swap_attr, maybe_get_event_loop_policy) +from test.support.os_helper import (EnvironmentVarGuard, TESTFN, unlink) from test.support.script_helper import assert_python_ok +from test.support.warnings_helper import check_warnings from unittest.mock import MagicMock, patch try: import pty, signal diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 7da967e..a182b14 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -1,8 +1,9 @@ from test import support +from test.support import import_helper from test.support import threading_helper # Skip tests if _multiprocessing wasn't built. -support.import_module('_multiprocessing') +import_helper.import_module('_multiprocessing') # Skip tests if sem_open implementation is broken. support.skip_if_broken_multiprocessing_synchronize() diff --git a/Lib/test/test_configparser.py b/Lib/test/test_configparser.py index f16da11..230ffc1 100644 --- a/Lib/test/test_configparser.py +++ b/Lib/test/test_configparser.py @@ -8,6 +8,7 @@ import unittest import warnings from test import support +from test.support import os_helper class SortedDict(collections.UserDict): @@ -1063,17 +1064,17 @@ class MultilineValuesTestCase(BasicTestCase, unittest.TestCase): cf.add_section(s) for j in range(10): cf.set(s, 'lovely_spam{}'.format(j), self.wonderful_spam) - with open(support.TESTFN, 'w') as f: + with open(os_helper.TESTFN, 'w') as f: cf.write(f) def tearDown(self): - os.unlink(support.TESTFN) + os.unlink(os_helper.TESTFN) def test_dominating_multiline_values(self): # We're reading from file because this is where the code changed # during performance updates in Python 3.2 cf_from_file = self.newconfig() - with open(support.TESTFN) as f: + with open(os_helper.TESTFN) as f: cf_from_file.read_file(f) self.assertEqual(cf_from_file.get('section8', 'lovely_spam4'), self.wonderful_spam.replace('\t\n', '\n')) diff --git a/Lib/test/test_coroutines.py b/Lib/test/test_coroutines.py index 8d1e069..145adb6 100644 --- a/Lib/test/test_coroutines.py +++ b/Lib/test/test_coroutines.py @@ -7,6 +7,8 @@ import types import unittest import warnings from test import support +from test.support import import_helper +from test.support import warnings_helper from test.support.script_helper import assert_python_ok @@ -2117,7 +2119,7 @@ class CoroAsyncIOCompatTest(unittest.TestCase): def test_asyncio_1(self): # asyncio cannot be imported when Python is compiled without thread # support - asyncio = support.import_module('asyncio') + asyncio = import_helper.import_module('asyncio') class MyException(Exception): pass @@ -2258,8 +2260,9 @@ class OriginTrackingTest(unittest.TestCase): try: warnings._warn_unawaited_coroutine = lambda coro: 1/0 with support.catch_unraisable_exception() as cm, \ - support.check_warnings((r'coroutine .* was never awaited', - RuntimeWarning)): + warnings_helper.check_warnings( + (r'coroutine .* was never awaited', + RuntimeWarning)): # only store repr() to avoid keeping the coroutine alive coro = corofn() coro_repr = repr(coro) @@ -2272,8 +2275,8 @@ class OriginTrackingTest(unittest.TestCase): self.assertEqual(cm.unraisable.exc_type, ZeroDivisionError) del warnings._warn_unawaited_coroutine - with support.check_warnings((r'coroutine .* was never awaited', - RuntimeWarning)): + with warnings_helper.check_warnings( + (r'coroutine .* was never awaited', RuntimeWarning)): corofn() support.gc_collect() diff --git a/Lib/test/test_curses.py b/Lib/test/test_curses.py index 5e619d1..2c6d14c 100644 --- a/Lib/test/test_curses.py +++ b/Lib/test/test_curses.py @@ -15,7 +15,8 @@ import sys import tempfile import unittest -from test.support import requires, import_module, verbose, SaveSignals +from test.support import requires, verbose, SaveSignals +from test.support.import_helper import import_module # Optionally test curses module. This currently requires that the # 'curses' resource be given on the regrtest command line using the -u diff --git a/Lib/test/test_datetime.py b/Lib/test/test_datetime.py index d659f36..bdb9f02 100644 --- a/Lib/test/test_datetime.py +++ b/Lib/test/test_datetime.py @@ -1,7 +1,9 @@ import unittest import sys -from test.support import import_fresh_module, run_unittest +from test.support import run_unittest +from test.support.import_helper import import_fresh_module + TESTS = 'test.datetimetester' diff --git a/Lib/test/test_mmap.py b/Lib/test/test_mmap.py index 5400f25..8f34c18 100644 --- a/Lib/test/test_mmap.py +++ b/Lib/test/test_mmap.py @@ -1,5 +1,6 @@ -from test.support import (TESTFN, import_module, unlink, - requires, _2G, _4G, gc_collect, cpython_only) +from test.support import (requires, _2G, _4G, gc_collect, cpython_only) +from test.support.import_helper import import_module +from test.support.os_helper import TESTFN, unlink import unittest import os import re diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index 45553a6..c656790 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -9,6 +9,7 @@ import sys import time import unittest from test import support +from test.support import os_helper from test.support.script_helper import assert_python_ok, spawn_python try: import _testcapi @@ -154,7 +155,7 @@ class WakeupFDTests(unittest.TestCase): signal.set_wakeup_fd(signal.SIGINT, False) def test_invalid_fd(self): - fd = support.make_bad_fd() + fd = os_helper.make_bad_fd() self.assertRaises((ValueError, OSError), signal.set_wakeup_fd, fd) diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index de778d3..26eec96 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -4,8 +4,11 @@ import sys import unittest import unittest.mock from test import support +from test.support import import_helper +from test.support import os_helper from test.support import socket_helper from test.support import threading_helper +from test.support import warnings_helper import socket import select import time @@ -27,7 +30,7 @@ try: except ImportError: ctypes = None -ssl = support.import_module("ssl") +ssl = import_helper.import_module("ssl") from ssl import TLSVersion, _TLSContentType, _TLSMessageType @@ -571,7 +574,7 @@ class BasicSocketTests(unittest.TestCase): s = socket.socket(socket.AF_INET) ss = test_wrap_socket(s) wr = weakref.ref(ss) - with support.check_warnings(("", ResourceWarning)): + with warnings_helper.check_warnings(("", ResourceWarning)): del ss self.assertEqual(wr(), None) @@ -893,7 +896,7 @@ class BasicSocketTests(unittest.TestCase): self.assertEqual(len(paths), 6) self.assertIsInstance(paths, ssl.DefaultVerifyPaths) - with support.EnvironmentVarGuard() as env: + with os_helper.EnvironmentVarGuard() as env: env["SSL_CERT_DIR"] = CAPATH env["SSL_CERT_FILE"] = CERTFILE paths = ssl.get_default_verify_paths() @@ -1605,7 +1608,7 @@ class ContextTests(unittest.TestCase): @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars") def test_load_default_certs_env(self): ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - with support.EnvironmentVarGuard() as env: + with os_helper.EnvironmentVarGuard() as env: env["SSL_CERT_DIR"] = CAPATH env["SSL_CERT_FILE"] = CERTFILE ctx.load_default_certs() @@ -1619,7 +1622,7 @@ class ContextTests(unittest.TestCase): stats = ctx.cert_store_stats() ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - with support.EnvironmentVarGuard() as env: + with os_helper.EnvironmentVarGuard() as env: env["SSL_CERT_DIR"] = CAPATH env["SSL_CERT_FILE"] = CERTFILE ctx.load_default_certs() @@ -4266,9 +4269,9 @@ class ThreadedTests(unittest.TestCase): def test_sendfile(self): TEST_DATA = b"x" * 512 - with open(support.TESTFN, 'wb') as f: + with open(os_helper.TESTFN, 'wb') as f: f.write(TEST_DATA) - self.addCleanup(support.unlink, support.TESTFN) + self.addCleanup(os_helper.unlink, os_helper.TESTFN) context = ssl.SSLContext(ssl.PROTOCOL_TLS) context.verify_mode = ssl.CERT_REQUIRED context.load_verify_locations(SIGNING_CA) @@ -4277,7 +4280,7 @@ class ThreadedTests(unittest.TestCase): with server: with context.wrap_socket(socket.socket()) as s: s.connect((HOST, server.port)) - with open(support.TESTFN, 'rb') as file: + with open(os_helper.TESTFN, 'rb') as file: s.sendfile(file) self.assertEqual(s.recv(1024), TEST_DATA) @@ -4603,21 +4606,21 @@ requires_keylog = unittest.skipUnless( class TestSSLDebug(unittest.TestCase): - def keylog_lines(self, fname=support.TESTFN): + def keylog_lines(self, fname=os_helper.TESTFN): with open(fname) as f: return len(list(f)) @requires_keylog @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows") def test_keylog_defaults(self): - self.addCleanup(support.unlink, support.TESTFN) + self.addCleanup(os_helper.unlink, os_helper.TESTFN) ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) self.assertEqual(ctx.keylog_filename, None) - self.assertFalse(os.path.isfile(support.TESTFN)) - ctx.keylog_filename = support.TESTFN - self.assertEqual(ctx.keylog_filename, support.TESTFN) - self.assertTrue(os.path.isfile(support.TESTFN)) + self.assertFalse(os.path.isfile(os_helper.TESTFN)) + ctx.keylog_filename = os_helper.TESTFN + self.assertEqual(ctx.keylog_filename, os_helper.TESTFN) + self.assertTrue(os.path.isfile(os_helper.TESTFN)) self.assertEqual(self.keylog_lines(), 1) ctx.keylog_filename = None @@ -4626,7 +4629,7 @@ class TestSSLDebug(unittest.TestCase): with self.assertRaises((IsADirectoryError, PermissionError)): # Windows raises PermissionError ctx.keylog_filename = os.path.dirname( - os.path.abspath(support.TESTFN)) + os.path.abspath(os_helper.TESTFN)) with self.assertRaises(TypeError): ctx.keylog_filename = 1 @@ -4634,10 +4637,10 @@ class TestSSLDebug(unittest.TestCase): @requires_keylog @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows") def test_keylog_filename(self): - self.addCleanup(support.unlink, support.TESTFN) + self.addCleanup(os_helper.unlink, os_helper.TESTFN) client_context, server_context, hostname = testing_context() - client_context.keylog_filename = support.TESTFN + client_context.keylog_filename = os_helper.TESTFN server = ThreadedEchoServer(context=server_context, chatty=False) with server: with client_context.wrap_socket(socket.socket(), @@ -4647,7 +4650,7 @@ class TestSSLDebug(unittest.TestCase): self.assertEqual(self.keylog_lines(), 6) client_context.keylog_filename = None - server_context.keylog_filename = support.TESTFN + server_context.keylog_filename = os_helper.TESTFN server = ThreadedEchoServer(context=server_context, chatty=False) with server: with client_context.wrap_socket(socket.socket(), @@ -4655,8 +4658,8 @@ class TestSSLDebug(unittest.TestCase): s.connect((HOST, server.port)) self.assertGreaterEqual(self.keylog_lines(), 11) - client_context.keylog_filename = support.TESTFN - server_context.keylog_filename = support.TESTFN + client_context.keylog_filename = os_helper.TESTFN + server_context.keylog_filename = os_helper.TESTFN server = ThreadedEchoServer(context=server_context, chatty=False) with server: with client_context.wrap_socket(socket.socket(), @@ -4672,19 +4675,19 @@ class TestSSLDebug(unittest.TestCase): "test is not compatible with ignore_environment") @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows") def test_keylog_env(self): - self.addCleanup(support.unlink, support.TESTFN) + self.addCleanup(os_helper.unlink, os_helper.TESTFN) with unittest.mock.patch.dict(os.environ): - os.environ['SSLKEYLOGFILE'] = support.TESTFN - self.assertEqual(os.environ['SSLKEYLOGFILE'], support.TESTFN) + os.environ['SSLKEYLOGFILE'] = os_helper.TESTFN + self.assertEqual(os.environ['SSLKEYLOGFILE'], os_helper.TESTFN) ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) self.assertEqual(ctx.keylog_filename, None) ctx = ssl.create_default_context() - self.assertEqual(ctx.keylog_filename, support.TESTFN) + self.assertEqual(ctx.keylog_filename, os_helper.TESTFN) ctx = ssl._create_stdlib_context() - self.assertEqual(ctx.keylog_filename, support.TESTFN) + self.assertEqual(ctx.keylog_filename, os_helper.TESTFN) def test_msg_callback(self): client_context, server_context, hostname = testing_context() diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index 3ddeb97..4bf7248 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -11,6 +11,7 @@ import unittest.mock import tarfile from test import support +from test.support import os_helper from test.support import script_helper # Check for our compression modules. @@ -30,7 +31,7 @@ except ImportError: def sha256sum(data): return sha256(data).hexdigest() -TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir" +TEMPDIR = os.path.abspath(os_helper.TESTFN) + "-tardir" tarextdir = TEMPDIR + '-extract-test' tarname = support.findfile("testtar.tar") gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") @@ -575,21 +576,21 @@ class MiscReadTestBase(CommonReadTest): @unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") - @support.skip_unless_symlink + @os_helper.skip_unless_symlink def test_extract_hardlink(self): # Test hardlink extraction (e.g. bug #857297). with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: tar.extract("ustar/regtype", TEMPDIR) - self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype")) + self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/regtype")) tar.extract("ustar/lnktype", TEMPDIR) - self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) + self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: data = f.read() self.assertEqual(sha256sum(data), sha256_regtype) tar.extract("ustar/symtype", TEMPDIR) - self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype")) + self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/symtype")) with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: data = f.read() self.assertEqual(sha256sum(data), sha256_regtype) @@ -622,7 +623,7 @@ class MiscReadTestBase(CommonReadTest): self.assertEqual(tarinfo.mtime, file_mtime, errmsg) finally: tar.close() - support.rmtree(DIR) + os_helper.rmtree(DIR) def test_extract_directory(self): dirtype = "ustar/dirtype" @@ -637,11 +638,11 @@ class MiscReadTestBase(CommonReadTest): if sys.platform != "win32": self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) finally: - support.rmtree(DIR) + os_helper.rmtree(DIR) def test_extractall_pathlike_name(self): DIR = pathlib.Path(TEMPDIR) / "extractall" - with support.temp_dir(DIR), \ + with os_helper.temp_dir(DIR), \ tarfile.open(tarname, encoding="iso8859-1") as tar: directories = [t for t in tar if t.isdir()] tar.extractall(DIR, directories) @@ -652,7 +653,7 @@ class MiscReadTestBase(CommonReadTest): def test_extract_pathlike_name(self): dirtype = "ustar/dirtype" DIR = pathlib.Path(TEMPDIR) / "extractall" - with support.temp_dir(DIR), \ + with os_helper.temp_dir(DIR), \ tarfile.open(tarname, encoding="iso8859-1") as tar: tarinfo = tar.getmember(dirtype) tar.extract(tarinfo, path=DIR) @@ -676,7 +677,7 @@ class MiscReadTestBase(CommonReadTest): else: self.fail("ReadError not raised") finally: - support.unlink(empty) + os_helper.unlink(empty) def test_parallel_iteration(self): # Issue #16601: Restarting iteration over tarfile continued @@ -1026,7 +1027,7 @@ class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): fobj.write(b'x' * 4096) fobj.truncate() s = os.stat(name) - support.unlink(name) + os_helper.unlink(name) return (s.st_blocks * 512 < s.st_size) else: return False @@ -1171,7 +1172,7 @@ class WriteTest(WriteTestBase, unittest.TestCase): finally: tar.close() finally: - support.rmdir(path) + os_helper.rmdir(path) # mock the following: # os.listdir: so we know that files are in the wrong order @@ -1193,9 +1194,9 @@ class WriteTest(WriteTestBase, unittest.TestCase): finally: tar.close() finally: - support.unlink(os.path.join(path, "1")) - support.unlink(os.path.join(path, "2")) - support.rmdir(path) + os_helper.unlink(os.path.join(path, "1")) + os_helper.unlink(os.path.join(path, "2")) + os_helper.rmdir(path) def test_gettarinfo_pathlike_name(self): with tarfile.open(tmpname, self.mode) as tar: @@ -1229,10 +1230,10 @@ class WriteTest(WriteTestBase, unittest.TestCase): finally: tar.close() finally: - support.unlink(target) - support.unlink(link) + os_helper.unlink(target) + os_helper.unlink(link) - @support.skip_unless_symlink + @os_helper.skip_unless_symlink def test_symlink_size(self): path = os.path.join(TEMPDIR, "symlink") os.symlink("link_target", path) @@ -1244,7 +1245,7 @@ class WriteTest(WriteTestBase, unittest.TestCase): finally: tar.close() finally: - support.unlink(path) + os_helper.unlink(path) def test_add_self(self): # Test for #1257255. @@ -1257,7 +1258,7 @@ class WriteTest(WriteTestBase, unittest.TestCase): self.assertEqual(tar.getnames(), [], "added the archive to itself") - with support.change_cwd(TEMPDIR): + with os_helper.change_cwd(TEMPDIR): tar.add(dstname) self.assertEqual(tar.getnames(), [], "added the archive to itself") @@ -1270,7 +1271,7 @@ class WriteTest(WriteTestBase, unittest.TestCase): try: for name in ("foo", "bar", "baz"): name = os.path.join(tempdir, name) - support.create_empty_file(name) + os_helper.create_empty_file(name) def filter(tarinfo): if os.path.basename(tarinfo.name) == "bar": @@ -1298,7 +1299,7 @@ class WriteTest(WriteTestBase, unittest.TestCase): finally: tar.close() finally: - support.rmtree(tempdir) + os_helper.rmtree(tempdir) # Guarantee that stored pathnames are not modified. Don't # remove ./ or ../ or double slashes. Still make absolute @@ -1309,7 +1310,7 @@ class WriteTest(WriteTestBase, unittest.TestCase): # and compare the stored name with the original. foo = os.path.join(TEMPDIR, "foo") if not dir: - support.create_empty_file(foo) + os_helper.create_empty_file(foo) else: os.mkdir(foo) @@ -1326,14 +1327,14 @@ class WriteTest(WriteTestBase, unittest.TestCase): tar.close() if not dir: - support.unlink(foo) + os_helper.unlink(foo) else: - support.rmdir(foo) + os_helper.rmdir(foo) self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) - @support.skip_unless_symlink + @os_helper.skip_unless_symlink def test_extractall_symlinks(self): # Test if extractall works properly when tarfile contains symlinks tempdir = os.path.join(TEMPDIR, "testsymlinks") @@ -1356,8 +1357,8 @@ class WriteTest(WriteTestBase, unittest.TestCase): except OSError: self.fail("extractall failed with symlinked files") finally: - support.unlink(temparchive) - support.rmtree(tempdir) + os_helper.unlink(temparchive) + os_helper.rmtree(tempdir) def test_pathnames(self): self._test_pathname("foo") @@ -1385,7 +1386,7 @@ class WriteTest(WriteTestBase, unittest.TestCase): def test_cwd(self): # Test adding the current working directory. - with support.change_cwd(TEMPDIR): + with os_helper.change_cwd(TEMPDIR): tar = tarfile.open(tmpname, self.mode) try: tar.add(".") @@ -1453,7 +1454,7 @@ class StreamWriteTest(WriteTestBase, unittest.TestCase): # Test for issue #8464: Create files with correct # permissions. if os.path.exists(tmpname): - support.unlink(tmpname) + os_helper.unlink(tmpname) original_umask = os.umask(0o022) try: @@ -1599,7 +1600,7 @@ class DeviceHeaderTest(WriteTestBase, unittest.TestCase): self.assertEqual(buf_blk[device_headers], b"0000000\0" * 2) self.assertEqual(buf_reg[device_headers], b"\0" * 16) finally: - support.rmtree(tempdir) + os_helper.rmtree(tempdir) class CreateTest(WriteTestBase, unittest.TestCase): @@ -1609,7 +1610,7 @@ class CreateTest(WriteTestBase, unittest.TestCase): file_path = os.path.join(TEMPDIR, "spameggs42") def setUp(self): - support.unlink(tmpname) + os_helper.unlink(tmpname) @classmethod def setUpClass(cls): @@ -1618,7 +1619,7 @@ class CreateTest(WriteTestBase, unittest.TestCase): @classmethod def tearDownClass(cls): - support.unlink(cls.file_path) + os_helper.unlink(cls.file_path) def test_create(self): with tarfile.open(tmpname, self.mode) as tobj: @@ -1733,8 +1734,8 @@ class HardlinkTest(unittest.TestCase): def tearDown(self): self.tar.close() - support.unlink(self.foo) - support.unlink(self.bar) + os_helper.unlink(self.foo) + os_helper.unlink(self.bar) def test_add_twice(self): # The same name will be added as a REGTYPE every @@ -2043,7 +2044,7 @@ class AppendTestBase: def setUp(self): self.tarname = tmpname if os.path.exists(self.tarname): - support.unlink(self.tarname) + os_helper.unlink(self.tarname) def _create_testtar(self, mode="w:"): with tarfile.open(tarname, encoding="iso8859-1") as src: @@ -2288,7 +2289,7 @@ class CommandLineTest(unittest.TestCase): files = [support.findfile('tokenize_tests.txt'), support.findfile('tokenize_tests-no-coding-cookie-' 'and-utf8-bom-sig-only.txt')] - self.addCleanup(support.unlink, tar_name) + self.addCleanup(os_helper.unlink, tar_name) with tarfile.open(tar_name, 'w') as tf: for tardata in files: tf.add(tardata, arcname=os.path.basename(tardata)) @@ -2334,7 +2335,7 @@ class CommandLineTest(unittest.TestCase): self.assertEqual(out, b'') self.assertEqual(rc, 1) finally: - support.unlink(tmpname) + os_helper.unlink(tmpname) def test_list_command(self): for tar_name in testtarnames: @@ -2376,7 +2377,7 @@ class CommandLineTest(unittest.TestCase): with tarfile.open(tmpname) as tar: tar.getmembers() finally: - support.unlink(tmpname) + os_helper.unlink(tmpname) def test_create_command_verbose(self): files = [support.findfile('tokenize_tests.txt'), @@ -2390,7 +2391,7 @@ class CommandLineTest(unittest.TestCase): with tarfile.open(tmpname) as tar: tar.getmembers() finally: - support.unlink(tmpname) + os_helper.unlink(tmpname) def test_create_command_dotless_filename(self): files = [support.findfile('tokenize_tests.txt')] @@ -2400,7 +2401,7 @@ class CommandLineTest(unittest.TestCase): with tarfile.open(dotlessname) as tar: tar.getmembers() finally: - support.unlink(dotlessname) + os_helper.unlink(dotlessname) def test_create_command_dot_started_filename(self): tar_name = os.path.join(TEMPDIR, ".testtar") @@ -2411,7 +2412,7 @@ class CommandLineTest(unittest.TestCase): with tarfile.open(tar_name) as tar: tar.getmembers() finally: - support.unlink(tar_name) + os_helper.unlink(tar_name) def test_create_command_compressed(self): files = [support.findfile('tokenize_tests.txt'), @@ -2426,41 +2427,41 @@ class CommandLineTest(unittest.TestCase): with filetype.taropen(tar_name) as tar: tar.getmembers() finally: - support.unlink(tar_name) + os_helper.unlink(tar_name) def test_extract_command(self): self.make_simple_tarfile(tmpname) for opt in '-e', '--extract': try: - with support.temp_cwd(tarextdir): + with os_helper.temp_cwd(tarextdir): out = self.tarfilecmd(opt, tmpname) self.assertEqual(out, b'') finally: - support.rmtree(tarextdir) + os_helper.rmtree(tarextdir) def test_extract_command_verbose(self): self.make_simple_tarfile(tmpname) for opt in '-v', '--verbose': try: - with support.temp_cwd(tarextdir): + with os_helper.temp_cwd(tarextdir): out = self.tarfilecmd(opt, '-e', tmpname, PYTHONIOENCODING='utf-8') self.assertIn(b' file is extracted.', out) finally: - support.rmtree(tarextdir) + os_helper.rmtree(tarextdir) def test_extract_command_different_directory(self): self.make_simple_tarfile(tmpname) try: - with support.temp_cwd(tarextdir): + with os_helper.temp_cwd(tarextdir): out = self.tarfilecmd('-e', tmpname, 'spamdir') self.assertEqual(out, b'') finally: - support.rmtree(tarextdir) + os_helper.rmtree(tarextdir) def test_extract_command_invalid_file(self): zipname = support.findfile('zipdir.zip') - with support.temp_cwd(tarextdir): + with os_helper.temp_cwd(tarextdir): rc, out, err = self.tarfilecmd_failure('-e', zipname) self.assertIn(b' is not a tar archive.', err) self.assertEqual(out, b'') @@ -2723,7 +2724,7 @@ class NumericOwnerTest(unittest.TestCase): def setUpModule(): - support.unlink(TEMPDIR) + os_helper.unlink(TEMPDIR) os.makedirs(TEMPDIR) global testtarnames @@ -2734,14 +2735,14 @@ def setUpModule(): # Create compressed tarfiles. for c in GzipTest, Bz2Test, LzmaTest: if c.open: - support.unlink(c.tarname) + os_helper.unlink(c.tarname) testtarnames.append(c.tarname) with c.open(c.tarname, "wb") as tar: tar.write(data) def tearDownModule(): if os.path.exists(TEMPDIR): - support.rmtree(TEMPDIR) + os_helper.rmtree(TEMPDIR) if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index ad82e30..47e131a 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -4,7 +4,8 @@ Tests for the threading module. import test.support from test.support import threading_helper -from test.support import verbose, import_module, cpython_only +from test.support import verbose, cpython_only +from test.support.import_helper import import_module from test.support.script_helper import assert_python_ok, assert_python_failure import random diff --git a/Lib/test/test_trace.py b/Lib/test/test_trace.py index c03982b..7547855 100644 --- a/Lib/test/test_trace.py +++ b/Lib/test/test_trace.py @@ -1,6 +1,7 @@ import os import sys -from test.support import TESTFN, TESTFN_UNICODE, FS_NONASCII, rmtree, unlink, captured_stdout +from test.support import captured_stdout +from test.support.os_helper import (TESTFN, rmtree, unlink) from test.support.script_helper import assert_python_ok, assert_python_failure import textwrap import unittest diff --git a/Lib/test/test_tracemalloc.py b/Lib/test/test_tracemalloc.py index c5ae4e6..a0037f8 100644 --- a/Lib/test/test_tracemalloc.py +++ b/Lib/test/test_tracemalloc.py @@ -7,6 +7,7 @@ from unittest.mock import patch from test.support.script_helper import (assert_python_ok, assert_python_failure, interpreter_requires_environment) from test import support +from test.support import os_helper try: import _testcapi @@ -287,11 +288,11 @@ class TestTracemallocEnabled(unittest.TestCase): self.assertGreater(snapshot.traces[1].traceback.total_nframe, 10) # write on disk - snapshot.dump(support.TESTFN) - self.addCleanup(support.unlink, support.TESTFN) + snapshot.dump(os_helper.TESTFN) + self.addCleanup(os_helper.unlink, os_helper.TESTFN) # load from disk - snapshot2 = tracemalloc.Snapshot.load(support.TESTFN) + snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN) self.assertEqual(snapshot2.traces, snapshot.traces) # tracemalloc must be tracing memory allocations to take a snapshot @@ -306,11 +307,11 @@ class TestTracemallocEnabled(unittest.TestCase): # take a snapshot with a new attribute snapshot = tracemalloc.take_snapshot() snapshot.test_attr = "new" - snapshot.dump(support.TESTFN) - self.addCleanup(support.unlink, support.TESTFN) + snapshot.dump(os_helper.TESTFN) + self.addCleanup(os_helper.unlink, os_helper.TESTFN) # load() should recreate the attribute - snapshot2 = tracemalloc.Snapshot.load(support.TESTFN) + snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN) self.assertEqual(snapshot2.test_attr, "new") def fork_child(self): diff --git a/Lib/test/test_xml_etree_c.py b/Lib/test/test_xml_etree_c.py index e26e171..e68613b 100644 --- a/Lib/test/test_xml_etree_c.py +++ b/Lib/test/test_xml_etree_c.py @@ -2,7 +2,7 @@ import io import struct from test import support -from test.support import import_fresh_module +from test.support.import_helper import import_fresh_module import types import unittest diff --git a/Lib/unittest/test/test_discovery.py b/Lib/unittest/test/test_discovery.py index 16e081e..9d502c5 100644 --- a/Lib/unittest/test/test_discovery.py +++ b/Lib/unittest/test/test_discovery.py @@ -5,6 +5,7 @@ import sys import types import pickle from test import support +from test.support import import_helper import test.test_importlib.util import unittest @@ -848,7 +849,7 @@ class TestDiscovery(unittest.TestCase): with unittest.mock.patch('builtins.__import__', _import): # Since loader.discover() can modify sys.path, restore it when done. - with support.DirsOnSysPath(): + with import_helper.DirsOnSysPath(): # Make sure to remove 'package' from sys.modules when done. with test.test_importlib.util.uncache('package'): suite = loader.discover('package') @@ -865,7 +866,7 @@ class TestDiscovery(unittest.TestCase): with unittest.mock.patch('builtins.__import__', _import): # Since loader.discover() can modify sys.path, restore it when done. - with support.DirsOnSysPath(): + with import_helper.DirsOnSysPath(): # Make sure to remove 'package' from sys.modules when done. with test.test_importlib.util.uncache('package'): with self.assertRaises(TypeError) as cm: diff --git a/Lib/unittest/test/test_result.py b/Lib/unittest/test/test_result.py index 0ffb87b..a4af67b 100644 --- a/Lib/unittest/test/test_result.py +++ b/Lib/unittest/test/test_result.py @@ -2,7 +2,7 @@ import io import sys import textwrap -from test import support +from test.support import warnings_helper import traceback import unittest @@ -458,8 +458,8 @@ OldResult = type('OldResult', (object,), classDict) class Test_OldTestResult(unittest.TestCase): def assertOldResultWarning(self, test, failures): - with support.check_warnings(("TestResult has no add.+ method,", - RuntimeWarning)): + with warnings_helper.check_warnings( + ("TestResult has no add.+ method,", RuntimeWarning)): result = OldResult() test.run(result) self.assertEqual(len(result.failures), failures) -- cgit v0.12