diff options
Diffstat (limited to 'Lib/test/support.py')
| -rw-r--r-- | Lib/test/support.py | 368 | 
1 files changed, 249 insertions, 119 deletions
| diff --git a/Lib/test/support.py b/Lib/test/support.py index e1ec9e2..014bcf5 100644 --- a/Lib/test/support.py +++ b/Lib/test/support.py @@ -15,7 +15,7 @@ import shutil  import warnings  import unittest  import importlib -import collections +import collections.abc  import re  import subprocess  import imp @@ -37,26 +37,41 @@ try:  except ImportError:      multiprocessing = None +try: +    import zlib +except ImportError: +    zlib = None + +try: +    import bz2 +except ImportError: +    bz2 = None + +try: +    import lzma +except ImportError: +    lzma = None  __all__ = [ -    "Error", "TestFailed", "ResourceDenied", "import_module", -    "verbose", "use_resources", "max_memuse", "record_original_stdout", +    "Error", "TestFailed", "ResourceDenied", "import_module", "verbose", +    "use_resources", "max_memuse", "record_original_stdout",      "get_original_stdout", "unload", "unlink", "rmtree", "forget", -    "is_resource_enabled", "requires", "requires_mac_ver", -    "find_unused_port", "bind_port", -    "fcmp", "is_jython", "TESTFN", "HOST", "FUZZ", "SAVEDCWD", "temp_cwd", -    "findfile", "sortdict", "check_syntax_error", "open_urlresource", -    "check_warnings", "CleanImport", "EnvironmentVarGuard", -    "TransientResource", "captured_output", "captured_stdout", -    "captured_stdin", "captured_stderr", -    "time_out", "socket_peer_reset", "ioerror_peer_reset", -    "run_with_locale", 'temp_umask', "transient_internet", -    "set_memlimit", "bigmemtest", "bigaddrspacetest", "BasicTestRunner", -    "run_unittest", "run_doctest", "threading_setup", "threading_cleanup", -    "reap_children", "cpython_only", "check_impl_detail", "get_attribute", -    "swap_item", "swap_attr", "requires_IEEE_754", +    "is_resource_enabled", "requires", "requires_freebsd_version", +    "requires_linux_version", "requires_mac_ver", "find_unused_port", +    "bind_port", "IPV6_ENABLED", "is_jython", "TESTFN", "HOST", "SAVEDCWD", +    "temp_cwd", "findfile", "create_empty_file", "sortdict", +    "check_syntax_error", "open_urlresource", "check_warnings", "CleanImport", +    "EnvironmentVarGuard", "TransientResource", "captured_stdout", +    "captured_stdin", "captured_stderr", "time_out", "socket_peer_reset", +    "ioerror_peer_reset", "run_with_locale", 'temp_umask', +    "transient_internet", "set_memlimit", "bigmemtest", "bigaddrspacetest", +    "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup", +    "threading_cleanup", "reap_children", "cpython_only", "check_impl_detail", +    "get_attribute", "swap_item", "swap_attr", "requires_IEEE_754",      "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink", -    "import_fresh_module", "failfast", "run_with_tz" +    "skip_unless_xattr", "import_fresh_module", "requires_zlib", +    "PIPE_MAX_SIZE", "failfast", "anticipate_failure", "run_with_tz", +    "requires_bz2", "requires_lzma"      ]  class Error(Exception): @@ -127,6 +142,17 @@ def _save_and_block_module(name, orig_modules):      return saved +def anticipate_failure(condition): +    """Decorator to mark a test that is known to be broken in some cases + +       Any use of this decorator should have a comment identifying the +       associated tracker issue. +    """ +    if condition: +        return unittest.expectedFailure +    return lambda f: f + +  def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):      """Imports and returns a module, deliberately bypassing the sys.modules cache      and importing a fresh copy of the module. Once the import is complete, @@ -170,8 +196,7 @@ def get_attribute(obj, name):      try:          attribute = getattr(obj, name)      except AttributeError: -        raise unittest.SkipTest("module %s has no attribute %s" % ( -            obj.__name__, name)) +        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))      else:          return attribute @@ -276,8 +301,7 @@ def rmtree(path):      try:          _rmtree(path)      except OSError as error: -        # Unix returns ENOENT, Windows returns ESRCH. -        if error.errno not in (errno.ENOENT, errno.ESRCH): +        if error.errno != errno.ENOENT:              raise  def make_legacy_pyc(source): @@ -362,9 +386,52 @@ def requires(resource, msg=None):          return      if not is_resource_enabled(resource):          if msg is None: -            msg = "Use of the `%s' resource not enabled" % resource +            msg = "Use of the %r resource not enabled" % resource          raise ResourceDenied(msg) +def _requires_unix_version(sysname, min_version): +    """Decorator raising SkipTest if the OS is `sysname` and the version is less +    than `min_version`. + +    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if +    the FreeBSD version is less than 7.2. +    """ +    def decorator(func): +        @functools.wraps(func) +        def wrapper(*args, **kw): +            if platform.system() == sysname: +                version_txt = platform.release().split('-', 1)[0] +                try: +                    version = tuple(map(int, version_txt.split('.'))) +                except ValueError: +                    pass +                else: +                    if version < min_version: +                        min_version_txt = '.'.join(map(str, min_version)) +                        raise unittest.SkipTest( +                            "%s version %s or higher required, not %s" +                            % (sysname, min_version_txt, version_txt)) +        return wrapper +    return decorator + +def requires_freebsd_version(*min_version): +    """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is +    less than `min_version`. + +    For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD +    version is less than 7.2. +    """ +    return _requires_unix_version('FreeBSD', min_version) + +def requires_linux_version(*min_version): +    """Decorator raising SkipTest if the OS is Linux and the Linux version is +    less than `min_version`. + +    For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux +    version is less than 2.6.32. +    """ +    return _requires_unix_version('Linux', min_version) +  def requires_mac_ver(*min_version):      """Decorator raising SkipTest if the OS is Mac OS X and the OS X      version if less than min_version. @@ -392,6 +459,7 @@ def requires_mac_ver(*min_version):          return wrapper      return decorator +  HOST = 'localhost'  def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): @@ -487,29 +555,41 @@ def bind_port(sock, host=HOST):      port = sock.getsockname()[1]      return port -FUZZ = 1e-6 - -def fcmp(x, y): # fuzzy comparison function -    if isinstance(x, float) or isinstance(y, float): +def _is_ipv6_enabled(): +    """Check whether IPv6 is enabled on this host.""" +    if socket.has_ipv6: +        sock = None          try: -            fuzz = (abs(x) + abs(y)) * FUZZ -            if abs(x-y) <= fuzz: -                return 0 -        except: +            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) +            sock.bind(('::1', 0)) +            return True +        except (socket.error, socket.gaierror):              pass -    elif type(x) == type(y) and isinstance(x, (tuple, list)): -        for i in range(min(len(x), len(y))): -            outcome = fcmp(x[i], y[i]) -            if outcome != 0: -                return outcome -        return (len(x) > len(y)) - (len(x) < len(y)) -    return (x > y) - (x < y) +        finally: +            if sock: +                sock.close() +    return False + +IPV6_ENABLED = _is_ipv6_enabled() + + +# A constant likely larger than the underlying OS pipe buffer size. +# Windows limit seems to be around 512B, and most Unix kernels have a 64K pipe +# buffer size: take 1M to be sure. +PIPE_MAX_SIZE = 1024 * 1024 +  # decorator for skipping tests on non-IEEE 754 platforms  requires_IEEE_754 = unittest.skipUnless(      float.__getformat__("double").startswith("IEEE"),      "test requires IEEE 754 doubles") +requires_zlib = unittest.skipUnless(zlib, 'requires zlib') + +requires_bz2 = unittest.skipUnless(bz2, 'requires bz2') + +requires_lzma = unittest.skipUnless(lzma, 'requires lzma') +  is_jython = sys.platform.startswith('java')  # Filename used for testing @@ -610,14 +690,15 @@ def temp_cwd(name='tempcwd', quiet=False, path=None):              rmtree(name) -@contextlib.contextmanager -def temp_umask(umask): -    """Context manager that temporarily sets the process umask.""" -    oldmask = os.umask(umask) -    try: -        yield -    finally: -        os.umask(oldmask) +if hasattr(os, "umask"): +    @contextlib.contextmanager +    def temp_umask(umask): +        """Context manager that temporarily sets the process umask.""" +        oldmask = os.umask(umask) +        try: +            yield +        finally: +            os.umask(oldmask)  def findfile(file, here=__file__, subdir=None): @@ -635,6 +716,11 @@ def findfile(file, here=__file__, subdir=None):          if os.path.exists(fn): return fn      return file +def create_empty_file(filename): +    """Create an empty file. If the file already exists, truncate it.""" +    fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) +    os.close(fd) +  def sortdict(dict):      "Like repr(dict), but in sorted order."      items = sorted(dict.items()) @@ -699,7 +785,7 @@ def open_urlresource(url, *args, **kw):      f = check_valid_file(fn)      if f is not None:          return f -    raise TestFailed('invalid resource "%s"' % fn) +    raise TestFailed('invalid resource %r' % fn)  class WarningsRecorder(object): @@ -820,7 +906,7 @@ class CleanImport(object):          sys.modules.update(self.original_modules) -class EnvironmentVarGuard(collections.MutableMapping): +class EnvironmentVarGuard(collections.abc.MutableMapping):      """Class to help protect the environment variable properly.  Can be used as      a context manager.""" @@ -951,7 +1037,7 @@ def transient_internet(resource_name, *, timeout=30.0, errnos=()):          ('WSANO_DATA', 11004),      ] -    denied = ResourceDenied("Resource '%s' is not available" % resource_name) +    denied = ResourceDenied("Resource %r is not available" % resource_name)      captured_errnos = errnos      gai_errnos = []      if not captured_errnos: @@ -1040,6 +1126,16 @@ def gc_collect():      gc.collect()      gc.collect() +@contextlib.contextmanager +def disable_gc(): +    have_gc = gc.isenabled() +    gc.disable() +    try: +        yield +    finally: +        if have_gc: +            gc.enable() +  def python_is_optimized():      """Find if Python was built with optimizations.""" @@ -1048,19 +1144,21 @@ def python_is_optimized():      for opt in cflags.split():          if opt.startswith('-O'):              final_opt = opt -    return final_opt and final_opt != '-O0' +    return final_opt != '' and final_opt != '-O0' -_header = '2P' +_header = 'nP' +_align = '0n'  if hasattr(sys, "gettotalrefcount"):      _header = '2P' + _header -_vheader = _header + 'P' +    _align = '0P' +_vheader = _header + 'n'  def calcobjsize(fmt): -    return struct.calcsize(_header + fmt + '0P') +    return struct.calcsize(_header + fmt + _align)  def calcvobjsize(fmt): -    return struct.calcsize(_vheader + fmt + '0P') +    return struct.calcsize(_vheader + fmt + _align)  _TPFLAGS_HAVE_GC = 1<<14 @@ -1179,41 +1277,35 @@ def set_memlimit(limit):          raise ValueError('Memory limit %r too low to be useful' % (limit,))      max_memuse = memlimit -def _memory_watchdog(start_evt, finish_evt, period=10.0): -    """A function which periodically watches the process' memory consumption +class _MemoryWatchdog: +    """An object which periodically watches the process' memory consumption      and prints it out.      """ -    # XXX: because of the GIL, and because the very long operations tested -    # in most bigmem tests are uninterruptible, the loop below gets woken up -    # much less often than expected. -    # The polling code should be rewritten in raw C, without holding the GIL, -    # and push results onto an anonymous pipe. -    try: -        page_size = os.sysconf('SC_PAGESIZE') -    except (ValueError, AttributeError): + +    def __init__(self): +        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) +        self.started = False + +    def start(self):          try: -            page_size = os.sysconf('SC_PAGE_SIZE') -        except (ValueError, AttributeError): -            page_size = 4096 -    procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) -    try: -        f = open(procfile, 'rb') -    except IOError as e: -        warnings.warn('/proc not available for stats: {}'.format(e), -                      RuntimeWarning) -        sys.stderr.flush() -        return -    with f: -        start_evt.set() -        old_data = -1 -        while not finish_evt.wait(period): -            f.seek(0) -            statm = f.read().decode('ascii') -            data = int(statm.split()[5]) -            if data != old_data: -                old_data = data -                print(" ... process data size: {data:.1f}G" -                       .format(data=data * page_size / (1024 ** 3))) +            f = open(self.procfile, 'r') +        except OSError as e: +            warnings.warn('/proc not available for stats: {}'.format(e), +                          RuntimeWarning) +            sys.stderr.flush() +            return + +        watchdog_script = findfile("memory_watchdog.py") +        self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script], +                                             stdin=f, stderr=subprocess.DEVNULL) +        f.close() +        self.started = True + +    def stop(self): +        if self.started: +            self.mem_watchdog.terminate() +            self.mem_watchdog.wait() +  def bigmemtest(size, memuse, dry_run=True):      """Decorator for bigmem tests. @@ -1240,27 +1332,20 @@ def bigmemtest(size, memuse, dry_run=True):                      "not enough memory: %.1fG minimum needed"                      % (size * memuse / (1024 ** 3))) -            if real_max_memuse and verbose and threading: +            if real_max_memuse and verbose:                  print()                  print(" ... expected peak memory use: {peak:.1f}G"                        .format(peak=size * memuse / (1024 ** 3))) -                sys.stdout.flush() -                start_evt = threading.Event() -                finish_evt = threading.Event() -                t = threading.Thread(target=_memory_watchdog, -                                     args=(start_evt, finish_evt, 0.5)) -                t.daemon = True -                t.start() -                start_evt.set() +                watchdog = _MemoryWatchdog() +                watchdog.start()              else: -                t = None +                watchdog = None              try:                  return f(self, maxsize)              finally: -                if t: -                    finish_evt.set() -                    t.join() +                if watchdog: +                    watchdog.stop()          wrapper.size = size          wrapper.memuse = memuse @@ -1342,6 +1427,33 @@ def check_impl_detail(**guards):      return guards.get(platform.python_implementation().lower(), default) +def no_tracing(func): +    """Decorator to temporarily turn off tracing for the duration of a test.""" +    if not hasattr(sys, 'gettrace'): +        return func +    else: +        @functools.wraps(func) +        def wrapper(*args, **kwargs): +            original_trace = sys.gettrace() +            try: +                sys.settrace(None) +                return func(*args, **kwargs) +            finally: +                sys.settrace(original_trace) +        return wrapper + + +def refcount_test(test): +    """Decorator for tests which involve reference counting. + +    To start, the decorator does not run the test if is not run by CPython. +    After that, any trace function is unset during the test to prevent +    unexpected refcounts caused by the trace function. + +    """ +    return no_tracing(cpython_only(test)) + +  def _filter_suite(suite, pred):      """Recursively filter test cases in a suite based on a predicate."""      newtests = [] @@ -1354,7 +1466,6 @@ def _filter_suite(suite, pred):                  newtests.append(test)      suite._tests = newtests -  def _run_suite(suite):      """Run tests from a unittest.TestSuite-derived class."""      if verbose: @@ -1403,7 +1514,7 @@ def run_unittest(*classes):  #=======================================================================  # doctest driver. -def run_doctest(module, verbosity=None): +def run_doctest(module, verbosity=None, optionflags=0):      """Run doctest on the given module.  Return (#failures, #tests).      If optional argument verbosity is not specified (or is None), pass @@ -1418,7 +1529,7 @@ def run_doctest(module, verbosity=None):      else:          verbosity = None -    f, t = doctest.testmod(module, verbose=verbosity) +    f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)      if f:          raise TestFailed("%d of %d doctests failed" % (f, t))      if verbose: @@ -1576,28 +1687,13 @@ def strip_python_stderr(stderr):      This will typically be run on the result of the communicate() method      of a subprocess.Popen object.      """ -    stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip() +    stderr = re.sub(br"\[\d+ refs\]\r?\n?", b"", stderr).strip()      return stderr  def args_from_interpreter_flags():      """Return a list of command-line arguments reproducing the current -    settings in sys.flags.""" -    flag_opt_map = { -        'bytes_warning': 'b', -        'dont_write_bytecode': 'B', -        'hash_randomization': 'R', -        'ignore_environment': 'E', -        'no_user_site': 's', -        'no_site': 'S', -        'optimize': 'O', -        'verbose': 'v', -    } -    args = [] -    for flag, opt in flag_opt_map.items(): -        v = getattr(sys.flags, flag) -        if v > 0: -            args.append('-' + opt * v) -    return args +    settings in sys.flags and sys.warnoptions.""" +    return subprocess._args_from_interpreter_flags()  #============================================================  # Support for assertions about logging. @@ -1687,6 +1783,40 @@ def skip_unless_symlink(test):      msg = "Requires functional symlink implementation"      return test if ok else unittest.skip(msg)(test) +_can_xattr = None +def can_xattr(): +    global _can_xattr +    if _can_xattr is not None: +        return _can_xattr +    if not hasattr(os, "setxattr"): +        can = False +    else: +        tmp_fp, tmp_name = tempfile.mkstemp() +        try: +            with open(TESTFN, "wb") as fp: +                try: +                    # TESTFN & tempfile may use different file systems with +                    # different capabilities +                    os.setxattr(tmp_fp, b"user.test", b"") +                    os.setxattr(fp.fileno(), b"user.test", b"") +                    # Kernels < 2.6.39 don't respect setxattr flags. +                    kernel_version = platform.release() +                    m = re.match("2.6.(\d{1,2})", kernel_version) +                    can = m is None or int(m.group(1)) >= 39 +                except OSError: +                    can = False +        finally: +            unlink(TESTFN) +            unlink(tmp_name) +    _can_xattr = can +    return can + +def skip_unless_xattr(test): +    """Skip decorator for tests that require functional extended attributes""" +    ok = can_xattr() +    msg = "no non-broken extended attribute support" +    return test if ok else unittest.skip(msg)(test) +  def patch(test_instance, object_to_patch, attr_name, new_value):      """Override 'object_to_patch'.'attr_name' with 'new_value'. | 
