diff options
Diffstat (limited to 'Lib/test/support.py')
-rw-r--r-- | Lib/test/support.py | 322 |
1 files changed, 226 insertions, 96 deletions
diff --git a/Lib/test/support.py b/Lib/test/support.py index 79293ed..c92fa00 100644 --- a/Lib/test/support.py +++ b/Lib/test/support.py @@ -15,7 +15,7 @@ import shutil import warnings import unittest import importlib -import collections +import collections.abc import re import subprocess import imp @@ -23,6 +23,7 @@ import time import sysconfig import fnmatch import logging.handlers +import struct try: import _thread, threading @@ -34,26 +35,34 @@ try: except ImportError: multiprocessing = None +try: + import zlib +except ImportError: + zlib = None + +try: + import bz2 +except ImportError: + bz2 = None __all__ = [ "Error", "TestFailed", "ResourceDenied", "import_module", "verbose", "use_resources", "max_memuse", "record_original_stdout", "get_original_stdout", "unload", "unlink", "rmtree", "forget", - "is_resource_enabled", "requires", "requires_mac_ver", - "find_unused_port", "bind_port", - "fcmp", "is_jython", "TESTFN", "HOST", "FUZZ", "SAVEDCWD", "temp_cwd", - "findfile", "sortdict", "check_syntax_error", "open_urlresource", - "check_warnings", "CleanImport", "EnvironmentVarGuard", - "TransientResource", "captured_output", "captured_stdout", - "captured_stdin", "captured_stderr", - "time_out", "socket_peer_reset", "ioerror_peer_reset", - "run_with_locale", 'temp_umask', "transient_internet", - "set_memlimit", "bigmemtest", "bigaddrspacetest", "BasicTestRunner", - "run_unittest", "run_doctest", "threading_setup", "threading_cleanup", - "reap_children", "cpython_only", "check_impl_detail", "get_attribute", - "swap_item", "swap_attr", "requires_IEEE_754", + "is_resource_enabled", "requires", "requires_freebsd_version", + "requires_linux_version", "requires_mac_ver", "find_unused_port", "bind_port", + "IPV6_ENABLED", "is_jython", "TESTFN", "HOST", "SAVEDCWD", "temp_cwd", + "findfile", "create_empty_file", "sortdict", "check_syntax_error", "open_urlresource", + "check_warnings", "CleanImport", "EnvironmentVarGuard", "TransientResource", + "captured_stdout", "captured_stdin", "captured_stderr", "time_out", + "socket_peer_reset", "ioerror_peer_reset", "run_with_locale", 'temp_umask', + "transient_internet", "set_memlimit", "bigmemtest", "bigaddrspacetest", + "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup", + "threading_cleanup", "reap_children", "cpython_only", "check_impl_detail", + "get_attribute", "swap_item", "swap_attr", "requires_IEEE_754", "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink", - "import_fresh_module", "failfast", "run_with_tz" + "import_fresh_module", "requires_zlib", "PIPE_MAX_SIZE", "failfast", + "anticipate_failure", "run_with_tz", "requires_bz2" ] class Error(Exception): @@ -124,6 +133,17 @@ def _save_and_block_module(name, orig_modules): return saved +def anticipate_failure(condition): + """Decorator to mark a test that is known to be broken in some cases + + Any use of this decorator should have a comment identifying the + associated tracker issue. + """ + if condition: + return unittest.expectedFailure + return lambda f: f + + def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): """Imports and returns a module, deliberately bypassing the sys.modules cache and importing a fresh copy of the module. Once the import is complete, @@ -167,8 +187,7 @@ def get_attribute(obj, name): try: attribute = getattr(obj, name) except AttributeError: - raise unittest.SkipTest("module %s has no attribute %s" % ( - obj.__name__, name)) + raise unittest.SkipTest("object %r has no attribute %r" % (obj, name)) else: return attribute @@ -209,8 +228,7 @@ def rmtree(path): try: shutil.rmtree(path) except OSError as error: - # Unix returns ENOENT, Windows returns ESRCH. - if error.errno not in (errno.ENOENT, errno.ESRCH): + if error.errno != errno.ENOENT: raise def make_legacy_pyc(source): @@ -295,9 +313,52 @@ def requires(resource, msg=None): return if not is_resource_enabled(resource): if msg is None: - msg = "Use of the `%s' resource not enabled" % resource + msg = "Use of the %r resource not enabled" % resource raise ResourceDenied(msg) +def _requires_unix_version(sysname, min_version): + """Decorator raising SkipTest if the OS is `sysname` and the version is less + than `min_version`. + + For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if + the FreeBSD version is less than 7.2. + """ + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kw): + if platform.system() == sysname: + version_txt = platform.release().split('-', 1)[0] + try: + version = tuple(map(int, version_txt.split('.'))) + except ValueError: + pass + else: + if version < min_version: + min_version_txt = '.'.join(map(str, min_version)) + raise unittest.SkipTest( + "%s version %s or higher required, not %s" + % (sysname, min_version_txt, version_txt)) + return wrapper + return decorator + +def requires_freebsd_version(*min_version): + """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is + less than `min_version`. + + For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD + version is less than 7.2. + """ + return _requires_unix_version('FreeBSD', min_version) + +def requires_linux_version(*min_version): + """Decorator raising SkipTest if the OS is Linux and the Linux version is + less than `min_version`. + + For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux + version is less than 2.6.32. + """ + return _requires_unix_version('Linux', min_version) + def requires_mac_ver(*min_version): """Decorator raising SkipTest if the OS is Mac OS X and the OS X version if less than min_version. @@ -325,6 +386,7 @@ def requires_mac_ver(*min_version): return wrapper return decorator + HOST = 'localhost' def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): @@ -420,29 +482,37 @@ def bind_port(sock, host=HOST): port = sock.getsockname()[1] return port -FUZZ = 1e-6 - -def fcmp(x, y): # fuzzy comparison function - if isinstance(x, float) or isinstance(y, float): +def _is_ipv6_enabled(): + """Check whether IPv6 is enabled on this host.""" + if socket.has_ipv6: try: - fuzz = (abs(x) + abs(y)) * FUZZ - if abs(x-y) <= fuzz: - return 0 - except: + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.bind(('::1', 0)) + except (socket.error, socket.gaierror): pass - elif type(x) == type(y) and isinstance(x, (tuple, list)): - for i in range(min(len(x), len(y))): - outcome = fcmp(x[i], y[i]) - if outcome != 0: - return outcome - return (len(x) > len(y)) - (len(x) < len(y)) - return (x > y) - (x < y) + else: + sock.close() + return True + return False + +IPV6_ENABLED = _is_ipv6_enabled() + + +# A constant likely larger than the underlying OS pipe buffer size. +# Windows limit seems to be around 512B, and most Unix kernels have a 64K pipe +# buffer size: take 1M to be sure. +PIPE_MAX_SIZE = 1024 * 1024 + # decorator for skipping tests on non-IEEE 754 platforms requires_IEEE_754 = unittest.skipUnless( float.__getformat__("double").startswith("IEEE"), "test requires IEEE 754 doubles") +requires_zlib = unittest.skipUnless(zlib, 'requires zlib') + +requires_bz2 = unittest.skipUnless(bz2, 'requires bz2') + is_jython = sys.platform.startswith('java') # Filename used for testing @@ -543,14 +613,15 @@ def temp_cwd(name='tempcwd', quiet=False, path=None): rmtree(name) -@contextlib.contextmanager -def temp_umask(umask): - """Context manager that temporarily sets the process umask.""" - oldmask = os.umask(umask) - try: - yield - finally: - os.umask(oldmask) +if hasattr(os, "umask"): + @contextlib.contextmanager + def temp_umask(umask): + """Context manager that temporarily sets the process umask.""" + oldmask = os.umask(umask) + try: + yield + finally: + os.umask(oldmask) def findfile(file, here=__file__, subdir=None): @@ -568,6 +639,11 @@ def findfile(file, here=__file__, subdir=None): if os.path.exists(fn): return fn return file +def create_empty_file(filename): + """Create an empty file. If the file already exists, truncate it.""" + fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) + os.close(fd) + def sortdict(dict): "Like repr(dict), but in sorted order." items = sorted(dict.items()) @@ -632,7 +708,7 @@ def open_urlresource(url, *args, **kw): f = check_valid_file(fn) if f is not None: return f - raise TestFailed('invalid resource "%s"' % fn) + raise TestFailed('invalid resource %r' % fn) class WarningsRecorder(object): @@ -753,7 +829,7 @@ class CleanImport(object): sys.modules.update(self.original_modules) -class EnvironmentVarGuard(collections.MutableMapping): +class EnvironmentVarGuard(collections.abc.MutableMapping): """Class to help protect the environment variable properly. Can be used as a context manager.""" @@ -884,7 +960,7 @@ def transient_internet(resource_name, *, timeout=30.0, errnos=()): ('WSANO_DATA', 11004), ] - denied = ResourceDenied("Resource '%s' is not available" % resource_name) + denied = ResourceDenied("Resource %r is not available" % resource_name) captured_errnos = errnos gai_errnos = [] if not captured_errnos: @@ -973,6 +1049,16 @@ def gc_collect(): gc.collect() gc.collect() +@contextlib.contextmanager +def disable_gc(): + have_gc = gc.isenabled() + gc.disable() + try: + yield + finally: + if have_gc: + gc.enable() + def python_is_optimized(): """Find if Python was built with optimizations.""" @@ -981,7 +1067,7 @@ def python_is_optimized(): for opt in cflags.split(): if opt.startswith('-O'): final_opt = opt - return final_opt and final_opt != '-O0' + return final_opt != '' and final_opt != '-O0' #======================================================================= @@ -1087,41 +1173,35 @@ def set_memlimit(limit): raise ValueError('Memory limit %r too low to be useful' % (limit,)) max_memuse = memlimit -def _memory_watchdog(start_evt, finish_evt, period=10.0): - """A function which periodically watches the process' memory consumption +class _MemoryWatchdog: + """An object which periodically watches the process' memory consumption and prints it out. """ - # XXX: because of the GIL, and because the very long operations tested - # in most bigmem tests are uninterruptible, the loop below gets woken up - # much less often than expected. - # The polling code should be rewritten in raw C, without holding the GIL, - # and push results onto an anonymous pipe. - try: - page_size = os.sysconf('SC_PAGESIZE') - except (ValueError, AttributeError): + + def __init__(self): + self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) + self.started = False + + def start(self): try: - page_size = os.sysconf('SC_PAGE_SIZE') - except (ValueError, AttributeError): - page_size = 4096 - procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) - try: - f = open(procfile, 'rb') - except IOError as e: - warnings.warn('/proc not available for stats: {}'.format(e), - RuntimeWarning) - sys.stderr.flush() - return - with f: - start_evt.set() - old_data = -1 - while not finish_evt.wait(period): - f.seek(0) - statm = f.read().decode('ascii') - data = int(statm.split()[5]) - if data != old_data: - old_data = data - print(" ... process data size: {data:.1f}G" - .format(data=data * page_size / (1024 ** 3))) + f = open(self.procfile, 'r') + except OSError as e: + warnings.warn('/proc not available for stats: {}'.format(e), + RuntimeWarning) + sys.stderr.flush() + return + + watchdog_script = findfile("memory_watchdog.py") + self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script], + stdin=f, stderr=subprocess.DEVNULL) + f.close() + self.started = True + + def stop(self): + if self.started: + self.mem_watchdog.terminate() + self.mem_watchdog.wait() + def bigmemtest(size, memuse, dry_run=True): """Decorator for bigmem tests. @@ -1148,27 +1228,20 @@ def bigmemtest(size, memuse, dry_run=True): "not enough memory: %.1fG minimum needed" % (size * memuse / (1024 ** 3))) - if real_max_memuse and verbose and threading: + if real_max_memuse and verbose: print() print(" ... expected peak memory use: {peak:.1f}G" .format(peak=size * memuse / (1024 ** 3))) - sys.stdout.flush() - start_evt = threading.Event() - finish_evt = threading.Event() - t = threading.Thread(target=_memory_watchdog, - args=(start_evt, finish_evt, 0.5)) - t.daemon = True - t.start() - start_evt.set() + watchdog = _MemoryWatchdog() + watchdog.start() else: - t = None + watchdog = None try: return f(self, maxsize) finally: - if t: - finish_evt.set() - t.join() + if watchdog: + watchdog.stop() wrapper.size = size wrapper.memuse = memuse @@ -1250,6 +1323,33 @@ def check_impl_detail(**guards): return guards.get(platform.python_implementation().lower(), default) +def no_tracing(func): + """Decorator to temporarily turn off tracing for the duration of a test.""" + if not hasattr(sys, 'gettrace'): + return func + else: + @functools.wraps(func) + def wrapper(*args, **kwargs): + original_trace = sys.gettrace() + try: + sys.settrace(None) + return func(*args, **kwargs) + finally: + sys.settrace(original_trace) + return wrapper + + +def refcount_test(test): + """Decorator for tests which involve reference counting. + + To start, the decorator does not run the test if is not run by CPython. + After that, any trace function is unset during the test to prevent + unexpected refcounts caused by the trace function. + + """ + return no_tracing(cpython_only(test)) + + def _filter_suite(suite, pred): """Recursively filter test cases in a suite based on a predicate.""" newtests = [] @@ -1262,7 +1362,6 @@ def _filter_suite(suite, pred): newtests.append(test) suite._tests = newtests - def _run_suite(suite): """Run tests from a unittest.TestSuite-derived class.""" if verbose: @@ -1311,7 +1410,7 @@ def run_unittest(*classes): #======================================================================= # doctest driver. -def run_doctest(module, verbosity=None): +def run_doctest(module, verbosity=None, optionflags=0): """Run doctest on the given module. Return (#failures, #tests). If optional argument verbosity is not specified (or is None), pass @@ -1326,7 +1425,7 @@ def run_doctest(module, verbosity=None): else: verbosity = None - f, t = doctest.testmod(module, verbose=verbosity) + f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags) if f: raise TestFailed("%d of %d doctests failed" % (f, t)) if verbose: @@ -1489,7 +1588,7 @@ def strip_python_stderr(stderr): def args_from_interpreter_flags(): """Return a list of command-line arguments reproducing the current - settings in sys.flags.""" + settings in sys.flags and sys.warnoptions.""" flag_opt_map = { 'bytes_warning': 'b', 'dont_write_bytecode': 'B', @@ -1505,6 +1604,8 @@ def args_from_interpreter_flags(): v = getattr(sys.flags, flag) if v > 0: args.append('-' + opt * v) + for opt in sys.warnoptions: + args.append('-W' + opt) return args #============================================================ @@ -1595,6 +1696,35 @@ def skip_unless_symlink(test): msg = "Requires functional symlink implementation" return test if ok else unittest.skip(msg)(test) +_can_xattr = None +def can_xattr(): + global _can_xattr + if _can_xattr is not None: + return _can_xattr + if not hasattr(os, "setxattr"): + can = False + else: + try: + with open(TESTFN, "wb") as fp: + try: + os.fsetxattr(fp.fileno(), b"user.test", b"") + # Kernels < 2.6.39 don't respect setxattr flags. + kernel_version = platform.release() + m = re.match("2.6.(\d{1,2})", kernel_version) + can = m is None or int(m.group(1)) >= 39 + except OSError: + can = False + finally: + unlink(TESTFN) + _can_xattr = can + return can + +def skip_unless_xattr(test): + """Skip decorator for tests that require functional extended attributes""" + ok = can_xattr() + msg = "no non-broken extended attribute support" + return test if ok else unittest.skip(msg)(test) + def patch(test_instance, object_to_patch, attr_name, new_value): """Override 'object_to_patch'.'attr_name' with 'new_value'. |