diff options
Diffstat (limited to 'Lib/test/support.py')
| -rw-r--r-- | Lib/test/support.py | 178 | 
1 files changed, 132 insertions, 46 deletions
diff --git a/Lib/test/support.py b/Lib/test/support.py index e82a4f9..b3989e5 100644 --- a/Lib/test/support.py +++ b/Lib/test/support.py @@ -15,7 +15,7 @@ import shutil  import warnings  import unittest  import importlib -import collections +import collections.abc  import re  import subprocess  import imp @@ -35,25 +35,28 @@ except ImportError:      multiprocessing = None +try: +    import zlib +except ImportError: +    zlib = None +  __all__ = [      "Error", "TestFailed", "ResourceDenied", "import_module",      "verbose", "use_resources", "max_memuse", "record_original_stdout",      "get_original_stdout", "unload", "unlink", "rmtree", "forget", -    "is_resource_enabled", "requires", "requires_mac_ver", -    "find_unused_port", "bind_port", -    "fcmp", "is_jython", "TESTFN", "HOST", "FUZZ", "SAVEDCWD", "temp_cwd", -    "findfile", "sortdict", "check_syntax_error", "open_urlresource", -    "check_warnings", "CleanImport", "EnvironmentVarGuard", -    "TransientResource", "captured_output", "captured_stdout", -    "captured_stdin", "captured_stderr", -    "time_out", "socket_peer_reset", "ioerror_peer_reset", -    "run_with_locale", 'temp_umask', "transient_internet", -    "set_memlimit", "bigmemtest", "bigaddrspacetest", "BasicTestRunner", -    "run_unittest", "run_doctest", "threading_setup", "threading_cleanup", -    "reap_children", "cpython_only", "check_impl_detail", "get_attribute", -    "swap_item", "swap_attr", "requires_IEEE_754", +    "is_resource_enabled", "requires", "requires_linux_version", +    "requires_mac_ver", "find_unused_port", "bind_port", +    "IPV6_ENABLED", "is_jython", "TESTFN", "HOST", "SAVEDCWD", "temp_cwd", +    "findfile", "create_empty_file", "sortdict", "check_syntax_error", "open_urlresource", +    "check_warnings", "CleanImport", "EnvironmentVarGuard", "TransientResource", +    "captured_stdout", "captured_stdin", "captured_stderr", "time_out", +    "socket_peer_reset", "ioerror_peer_reset", "run_with_locale", 'temp_umask', +    "transient_internet", "set_memlimit", "bigmemtest", "bigaddrspacetest", +    "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup", +    "threading_cleanup", "reap_children", "cpython_only", "check_impl_detail", +    "get_attribute", "swap_item", "swap_attr", "requires_IEEE_754",      "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink", -    "import_fresh_module", "failfast", +    "import_fresh_module", "requires_zlib", "PIPE_MAX_SIZE", "failfast",      ]  class Error(Exception): @@ -168,7 +171,7 @@ def get_attribute(obj, name):          attribute = getattr(obj, name)      except AttributeError:          raise unittest.SkipTest("module %s has no attribute %s" % ( -            obj.__name__, name)) +            repr(obj), name))      else:          return attribute @@ -295,9 +298,36 @@ def requires(resource, msg=None):          return      if not is_resource_enabled(resource):          if msg is None: -            msg = "Use of the `%s' resource not enabled" % resource +            msg = "Use of the %r resource not enabled" % resource          raise ResourceDenied(msg) +def requires_linux_version(*min_version): +    """Decorator raising SkipTest if the OS is Linux and the kernel version is +    less than min_version. + +    For example, @requires_linux_version(2, 6, 35) raises SkipTest if the Linux +    kernel version is less than 2.6.35. +    """ +    def decorator(func): +        @functools.wraps(func) +        def wrapper(*args, **kw): +            if sys.platform == 'linux': +                version_txt = platform.release().split('-', 1)[0] +                try: +                    version = tuple(map(int, version_txt.split('.'))) +                except ValueError: +                    pass +                else: +                    if version < min_version: +                        min_version_txt = '.'.join(map(str, min_version)) +                        raise unittest.SkipTest( +                            "Linux kernel %s or higher required, not %s" +                            % (min_version_txt, version_txt)) +            return func(*args, **kw) +        wrapper.min_version = min_version +        return wrapper +    return decorator +  def requires_mac_ver(*min_version):      """Decorator raising SkipTest if the OS is Mac OS X and the OS X      version if less than min_version. @@ -325,6 +355,7 @@ def requires_mac_ver(*min_version):          return wrapper      return decorator +  HOST = 'localhost'  def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): @@ -420,29 +451,35 @@ def bind_port(sock, host=HOST):      port = sock.getsockname()[1]      return port -FUZZ = 1e-6 - -def fcmp(x, y): # fuzzy comparison function -    if isinstance(x, float) or isinstance(y, float): +def _is_ipv6_enabled(): +    """Check whether IPv6 is enabled on this host.""" +    if socket.has_ipv6:          try: -            fuzz = (abs(x) + abs(y)) * FUZZ -            if abs(x-y) <= fuzz: -                return 0 -        except: +            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) +            sock.bind(('::1', 0)) +        except (socket.error, socket.gaierror):              pass -    elif type(x) == type(y) and isinstance(x, (tuple, list)): -        for i in range(min(len(x), len(y))): -            outcome = fcmp(x[i], y[i]) -            if outcome != 0: -                return outcome -        return (len(x) > len(y)) - (len(x) < len(y)) -    return (x > y) - (x < y) +        else: +            sock.close() +            return True +    return False + +IPV6_ENABLED = _is_ipv6_enabled() + + +# A constant likely larger than the underlying OS pipe buffer size. +# Windows limit seems to be around 512B, and most Unix kernels have a 64K pipe +# buffer size: take 1M to be sure. +PIPE_MAX_SIZE = 1024 * 1024 +  # decorator for skipping tests on non-IEEE 754 platforms  requires_IEEE_754 = unittest.skipUnless(      float.__getformat__("double").startswith("IEEE"),      "test requires IEEE 754 doubles") +requires_zlib = unittest.skipUnless(zlib, 'requires zlib') +  is_jython = sys.platform.startswith('java')  # Filename used for testing @@ -543,14 +580,15 @@ def temp_cwd(name='tempcwd', quiet=False, path=None):              rmtree(name) -@contextlib.contextmanager -def temp_umask(umask): -    """Context manager that temporarily sets the process umask.""" -    oldmask = os.umask(umask) -    try: -        yield -    finally: -        os.umask(oldmask) +if hasattr(os, "umask"): +    @contextlib.contextmanager +    def temp_umask(umask): +        """Context manager that temporarily sets the process umask.""" +        oldmask = os.umask(umask) +        try: +            yield +        finally: +            os.umask(oldmask)  def findfile(file, here=__file__, subdir=None): @@ -568,6 +606,11 @@ def findfile(file, here=__file__, subdir=None):          if os.path.exists(fn): return fn      return file +def create_empty_file(filename): +    """Create an empty file. If the file already exists, truncate it.""" +    fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) +    os.close(fd) +  def sortdict(dict):      "Like repr(dict), but in sorted order."      items = sorted(dict.items()) @@ -632,7 +675,7 @@ def open_urlresource(url, *args, **kw):      f = check_valid_file(fn)      if f is not None:          return f -    raise TestFailed('invalid resource "%s"' % fn) +    raise TestFailed('invalid resource %r' % fn)  class WarningsRecorder(object): @@ -753,7 +796,7 @@ class CleanImport(object):          sys.modules.update(self.original_modules) -class EnvironmentVarGuard(collections.MutableMapping): +class EnvironmentVarGuard(collections.abc.MutableMapping):      """Class to help protect the environment variable properly.  Can be used as      a context manager.""" @@ -883,7 +926,7 @@ def transient_internet(resource_name, *, timeout=30.0, errnos=()):          ('WSANO_DATA', 11004),      ] -    denied = ResourceDenied("Resource '%s' is not available" % resource_name) +    denied = ResourceDenied("Resource %r is not available" % resource_name)      captured_errnos = errnos      gai_errnos = []      if not captured_errnos: @@ -972,6 +1015,16 @@ def gc_collect():      gc.collect()      gc.collect() +@contextlib.contextmanager +def disable_gc(): +    have_gc = gc.isenabled() +    gc.disable() +    try: +        yield +    finally: +        if have_gc: +            gc.enable() +  def python_is_optimized():      """Find if Python was built with optimizations.""" @@ -980,7 +1033,7 @@ def python_is_optimized():      for opt in cflags.split():          if opt.startswith('-O'):              final_opt = opt -    return final_opt and final_opt != '-O0' +    return final_opt != '' and final_opt != '-O0'  #======================================================================= @@ -1090,6 +1143,11 @@ def bigmemtest(minsize, memuse):      return decorator  def precisionbigmemtest(size, memuse): +    """Decorator for bigmem tests that need exact sizes. + +    Like bigmemtest, but without the size scaling upward to fill available +    memory. +    """      def decorator(f):          def wrapper(self):              size = wrapper.size @@ -1185,6 +1243,33 @@ def check_impl_detail(**guards):      return guards.get(platform.python_implementation().lower(), default) +def no_tracing(func): +    """Decorator to temporarily turn off tracing for the duration of a test.""" +    if not hasattr(sys, 'gettrace'): +        return func +    else: +        @functools.wraps(func) +        def wrapper(*args, **kwargs): +            original_trace = sys.gettrace() +            try: +                sys.settrace(None) +                return func(*args, **kwargs) +            finally: +                sys.settrace(original_trace) +        return wrapper + + +def refcount_test(test): +    """Decorator for tests which involve reference counting. + +    To start, the decorator does not run the test if is not run by CPython. +    After that, any trace function is unset during the test to prevent +    unexpected refcounts caused by the trace function. + +    """ +    return no_tracing(cpython_only(test)) + +  def _filter_suite(suite, pred):      """Recursively filter test cases in a suite based on a predicate."""      newtests = [] @@ -1197,7 +1282,6 @@ def _filter_suite(suite, pred):                  newtests.append(test)      suite._tests = newtests -  def _run_suite(suite):      """Run tests from a unittest.TestSuite-derived class."""      if verbose: @@ -1424,7 +1508,7 @@ def strip_python_stderr(stderr):  def args_from_interpreter_flags():      """Return a list of command-line arguments reproducing the current -    settings in sys.flags.""" +    settings in sys.flags and sys.warnoptions."""      flag_opt_map = {          'bytes_warning': 'b',          'dont_write_bytecode': 'B', @@ -1439,6 +1523,8 @@ def args_from_interpreter_flags():          v = getattr(sys.flags, flag)          if v > 0:              args.append('-' + opt * v) +    for opt in sys.warnoptions: +        args.append('-W' + opt)      return args  #============================================================  | 
