diff options
Diffstat (limited to 'Lib/test/support.py')
-rw-r--r-- | Lib/test/support.py | 591 |
1 files changed, 482 insertions, 109 deletions
diff --git a/Lib/test/support.py b/Lib/test/support.py index dcfadea..2a06661 100644 --- a/Lib/test/support.py +++ b/Lib/test/support.py @@ -10,28 +10,40 @@ import gc import socket import sys import os -import re import platform import shutil import warnings import unittest import importlib import collections +import re +import subprocess +import imp +import time +import sysconfig +import logging.handlers + +try: + import _thread +except ImportError: + _thread = None __all__ = [ "Error", "TestFailed", "ResourceDenied", "import_module", "verbose", "use_resources", "max_memuse", "record_original_stdout", "get_original_stdout", "unload", "unlink", "rmtree", "forget", "is_resource_enabled", "requires", "find_unused_port", "bind_port", - "fcmp", "is_jython", "TESTFN", "HOST", "FUZZ", "findfile", "verify", - "vereq", "sortdict", "check_syntax_error", "open_urlresource", + "fcmp", "is_jython", "TESTFN", "HOST", "FUZZ", "SAVEDCWD", "temp_cwd", + "findfile", "sortdict", "check_syntax_error", "open_urlresource", "check_warnings", "CleanImport", "EnvironmentVarGuard", "TransientResource", "captured_output", "captured_stdout", "time_out", "socket_peer_reset", "ioerror_peer_reset", - "run_with_locale", "transient_internet", + "run_with_locale", 'temp_umask', "transient_internet", "set_memlimit", "bigmemtest", "bigaddrspacetest", "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup", "threading_cleanup", "reap_children", "cpython_only", "check_impl_detail", "get_attribute", + "swap_item", "swap_attr", "requires_IEEE_754", + "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink", "import_fresh_module" ] @@ -177,27 +189,50 @@ def unload(name): def unlink(filename): try: os.unlink(filename) - except OSError: - pass + except OSError as error: + # The filename need not exist. + if error.errno not in (errno.ENOENT, errno.ENOTDIR): + raise def rmtree(path): try: shutil.rmtree(path) - except OSError as e: + except OSError as error: # Unix returns ENOENT, Windows returns ESRCH. - if e.errno not in (errno.ENOENT, errno.ESRCH): + if error.errno not in (errno.ENOENT, errno.ESRCH): raise +def make_legacy_pyc(source): + """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location. + + The choice of .pyc or .pyo extension is done based on the __debug__ flag + value. + + :param source: The file system path to the source file. The source file + does not need to exist, however the PEP 3147 pyc file must exist. + :return: The file system path to the legacy pyc file. + """ + pyc_file = imp.cache_from_source(source) + up_one = os.path.dirname(os.path.abspath(source)) + legacy_pyc = os.path.join(up_one, source + ('c' if __debug__ else 'o')) + os.rename(pyc_file, legacy_pyc) + return legacy_pyc + def forget(modname): - '''"Forget" a module was ever imported by removing it from sys.modules and - deleting any .pyc and .pyo files.''' + """'Forget' a module was ever imported. + + This removes the module from sys.modules and deletes any PEP 3147 or + legacy .pyc and .pyo files. + """ unload(modname) for dirname in sys.path: - unlink(os.path.join(dirname, modname + '.pyc')) - # Deleting the .pyo file cannot be within the 'try' for the .pyc since - # the chance exists that there is no .pyc (and thus the 'try' statement - # is exited) but there is a .pyo file. - unlink(os.path.join(dirname, modname + '.pyo')) + source = os.path.join(dirname, modname + '.py') + # It doesn't matter if they exist or not, unlink all possible + # combinations of PEP 3147 and legacy pyc and pyo files. + unlink(source + 'c') + unlink(source + 'o') + unlink(imp.cache_from_source(source, debug_override=True)) + unlink(imp.cache_from_source(source, debug_override=False)) # On some platforms, should not run gui test even if it is allowed # in `use_resources'. @@ -238,7 +273,9 @@ def requires(resource, msg=None): """Raise ResourceDenied if the specified resource is not available. If the caller's module is __main__ then automatically return True. The - possibility of False being returned occurs when regrtest.py is executing.""" + possibility of False being returned occurs when regrtest.py is + executing. + """ if resource == 'gui' and not _is_gui_available(): raise unittest.SkipTest("Cannot use the 'gui' resource") # see if the caller's module is __main__ - if so, treat as if @@ -363,6 +400,11 @@ def fcmp(x, y): # fuzzy comparison function return (len(x) > len(y)) - (len(x) < len(y)) return (x > y) - (x < y) +# decorator for skipping tests on non-IEEE 754 platforms +requires_IEEE_754 = unittest.skipUnless( + float.__getformat__("double").startswith("IEEE"), + "test requires IEEE 754 doubles") + is_jython = sys.platform.startswith('java') # Filename used for testing @@ -372,57 +414,106 @@ if os.name == 'java': else: TESTFN = '@test' - # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding() - # TESTFN_UNICODE is a filename that can be encoded using the - # file system encoding, but *not* with the default (ascii) encoding - TESTFN_UNICODE = "@test-\xe0\xf2" - TESTFN_ENCODING = sys.getfilesystemencoding() - # TESTFN_UNENCODABLE is a filename that should *not* be - # able to be encoded by *either* the default or filesystem encoding. - # This test really only makes sense on Windows NT platforms - # which have special Unicode support in posixmodule. - if (not hasattr(sys, "getwindowsversion") or - sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME - TESTFN_UNENCODABLE = None - else: +# Disambiguate TESTFN for parallel testing, while letting it remain a valid +# module name. +TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) + + +# TESTFN_UNICODE is a non-ascii filename +TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f" +if sys.platform == 'darwin': + # In Mac OS X's VFS API file names are, by definition, canonically + # decomposed Unicode, encoded using UTF-8. See QA1173: + # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html + import unicodedata + TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE) +TESTFN_ENCODING = sys.getfilesystemencoding() + +# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be +# encoded by the filesystem encoding (in strict mode). It can be None if we +# cannot generate such filename. +TESTFN_UNENCODABLE = None +if os.name in ('nt', 'ce'): + # skip win32s (0) or Windows 9x/ME (1) + if sys.getwindowsversion().platform >= 2: # Different kinds of characters from various languages to minimize the # probability that the whole name is encodable to MBCS (issue #9819) TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80" try: - # XXX - Note - should be using TESTFN_ENCODING here - but for - # Windows, "mbcs" currently always operates as if in - # errors=ignore' mode - hence we get '?' characters rather than - # the exception. 'Latin1' operates as we expect - ie, fails. - # See [ 850997 ] mbcs encoding ignores errors - TESTFN_UNENCODABLE.encode("Latin1") + TESTFN_UNENCODABLE.encode(TESTFN_ENCODING) except UnicodeEncodeError: pass else: - print('WARNING: The filename %r CAN be encoded by the filesystem. ' + print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). ' 'Unicode filename tests may not be effective' - % TESTFN_UNENCODABLE) + % (TESTFN_UNENCODABLE, TESTFN_ENCODING)) + TESTFN_UNENCODABLE = None +# Mac OS X denies unencodable filenames (invalid utf-8) +elif sys.platform != 'darwin': + try: + # ascii and utf-8 cannot encode the byte 0xff + b'\xff'.decode(TESTFN_ENCODING) + except UnicodeDecodeError: + # 0xff will be encoded using the surrogate character u+DCFF + TESTFN_UNENCODABLE = TESTFN \ + + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape') + else: + # File system encoding (eg. ISO-8859-* encodings) can encode + # the byte 0xff. Skip some unicode filename tests. + pass -if os.path.isdir(TESTFN): - # a test failed (eg. test_os) without removing TESTFN directory - shutil.rmtree(TESTFN) +# Save the initial cwd +SAVEDCWD = os.getcwd() -# Make sure we can write to TESTFN, try in /tmp if we can't -fp = None -try: - fp = open(TESTFN, 'w+') -except IOError: - TMP_TESTFN = os.path.join('/tmp', TESTFN) +@contextlib.contextmanager +def temp_cwd(name='tempcwd', quiet=False, path=None): + """ + Context manager that temporarily changes the CWD. + + An existing path may be provided as *path*, in which case this + function makes no changes to the file system. + + Otherwise, the new CWD is created in the current directory and it's + named *name*. If *quiet* is False (default) and it's not possible to + create or change the CWD, an error is raised. If it's True, only a + warning is raised and the original CWD is used. + """ + saved_dir = os.getcwd() + is_temporary = False + if path is None: + path = name + try: + os.mkdir(name) + is_temporary = True + except OSError: + if not quiet: + raise + warnings.warn('tests may fail, unable to create temp CWD ' + name, + RuntimeWarning, stacklevel=3) + try: + os.chdir(path) + except OSError: + if not quiet: + raise + warnings.warn('tests may fail, unable to change the CWD to ' + name, + RuntimeWarning, stacklevel=3) + try: + yield os.getcwd() + finally: + os.chdir(saved_dir) + if is_temporary: + rmtree(name) + + +@contextlib.contextmanager +def temp_umask(umask): + """Context manager that temporarily sets the process umask.""" + oldmask = os.umask(umask) try: - fp = open(TMP_TESTFN, 'w+') - TESTFN = TMP_TESTFN - del TMP_TESTFN - except IOError: - print(('WARNING: tests will fail, unable to write to: %s or %s' % - (TESTFN, TMP_TESTFN))) -if fp is not None: - fp.close() - unlink(TESTFN) -del fp + yield + finally: + os.umask(oldmask) + def findfile(file, here=__file__, subdir=None): """Try to find a file on sys.path and the working directory. If it is not @@ -439,30 +530,6 @@ def findfile(file, here=__file__, subdir=None): if os.path.exists(fn): return fn return file -def verify(condition, reason='test failed'): - """Verify that condition is true. If not, raise TestFailed. - - The optional argument reason can be given to provide - a better error text. - """ - - if not condition: - raise TestFailed(reason) - -def vereq(a, b): - """Raise TestFailed if a == b is false. - - This is better than verify(a == b) because, in case of failure, the - error message incorporates repr(a) and repr(b) so you can see the - inputs. - - Note that "not (a == b)" isn't necessarily the same as "a != b"; the - former is tested. - """ - - if not (a == b): - raise TestFailed("%r == %r" % (a, b)) - def sortdict(dict): "Like repr(dict), but in sorted order." items = sorted(dict.items()) @@ -489,47 +556,131 @@ def check_syntax_error(testcase, statement): def open_urlresource(url, *args, **kw): import urllib.request, urllib.parse - requires('urlfetch') + check = kw.pop('check', None) + filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! - for path in [os.path.curdir, os.path.pardir]: - fn = os.path.join(path, filename) - if os.path.exists(fn): - return open(fn, *args, **kw) + fn = os.path.join(os.path.dirname(__file__), "data", filename) + + def check_valid_file(fn): + f = open(fn, *args, **kw) + if check is None: + return f + elif check(f): + f.seek(0) + return f + f.close() + + if os.path.exists(fn): + f = check_valid_file(fn) + if f is not None: + return f + unlink(fn) + + # Verify the requirement before downloading the file + requires('urlfetch') print('\tfetching %s ...' % url, file=get_original_stdout()) f = urllib.request.urlopen(url, timeout=15) try: - with open(filename, "wb") as out: + with open(fn, "wb") as out: s = f.read() while s: out.write(s) s = f.read() finally: f.close() - return open(filename, *args, **kw) + + f = check_valid_file(fn) + if f is not None: + return f + raise TestFailed('invalid resource "%s"' % fn) + class WarningsRecorder(object): """Convenience wrapper for the warnings list returned on entry to the warnings.catch_warnings() context manager. """ def __init__(self, warnings_list): - self.warnings = warnings_list + self._warnings = warnings_list + self._last = 0 def __getattr__(self, attr): - if self.warnings: - return getattr(self.warnings[-1], attr) + if len(self._warnings) > self._last: + return getattr(self._warnings[-1], attr) elif attr in warnings.WarningMessage._WARNING_DETAILS: return None raise AttributeError("%r has no attribute %r" % (self, attr)) + @property + def warnings(self): + return self._warnings[self._last:] + def reset(self): - del self.warnings[:] + self._last = len(self._warnings) -@contextlib.contextmanager -def check_warnings(): + +def _filterwarnings(filters, quiet=False): + """Catch the warnings, then check if all the expected + warnings have been raised and re-raise unexpected warnings. + If 'quiet' is True, only re-raise the unexpected warnings. + """ + # Clear the warning registry of the calling module + # in order to re-raise the warnings. + frame = sys._getframe(2) + registry = frame.f_globals.get('__warningregistry__') + if registry: + registry.clear() with warnings.catch_warnings(record=True) as w: + # Set filter "always" to record all warnings. Because + # test_warnings swap the module, we need to look up in + # the sys.modules dictionary. + sys.modules['warnings'].simplefilter("always") yield WarningsRecorder(w) + # Filter the recorded warnings + reraise = list(w) + missing = [] + for msg, cat in filters: + seen = False + for w in reraise[:]: + warning = w.message + # Filter out the matching messages + if (re.match(msg, str(warning), re.I) and + issubclass(warning.__class__, cat)): + seen = True + reraise.remove(w) + if not seen and not quiet: + # This filter caught nothing + missing.append((msg, cat.__name__)) + if reraise: + raise AssertionError("unhandled warning %s" % reraise[0]) + if missing: + raise AssertionError("filter (%r, %s) did not catch any warning" % + missing[0]) + + +@contextlib.contextmanager +def check_warnings(*filters, **kwargs): + """Context manager to silence warnings. + + Accept 2-tuples as positional arguments: + ("message regexp", WarningCategory) + + Optional argument: + - if 'quiet' is True, it does not fail if a filter catches nothing + (default True without argument, + default False if some filters are defined) + + Without argument, it defaults to: + check_warnings(("", Warning), quiet=True) + """ + quiet = kwargs.get('quiet') + if not filters: + filters = (("", Warning),) + # Preserve backward compatibility + if quiet is None: + quiet = True + return _filterwarnings(filters, quiet) class CleanImport(object): @@ -541,7 +692,7 @@ class CleanImport(object): Use like this: with CleanImport("foo"): - __import__("foo") # new reference + importlib.import_module("foo") # new reference """ def __init__(self, *module_names): @@ -614,6 +765,32 @@ class EnvironmentVarGuard(collections.MutableMapping): del self._environ[k] else: self._environ[k] = v + os.environ = self._environ + + +class DirsOnSysPath(object): + """Context manager to temporarily add directories to sys.path. + + This makes a copy of sys.path, appends any directories given + as positional arguments, then reverts sys.path to the copied + settings when the context ends. + + Note that *all* sys.path modifications in the body of the + context manager, including replacement of the object, + will be reverted at the end of the block. + """ + + def __init__(self, *paths): + self.original_value = sys.path[:] + self.original_object = sys.path + sys.path.extend(paths) + + def __enter__(self): + return self + + def __exit__(self, *ignore_exc): + sys.path = self.original_object + sys.path[:] = self.original_value class TransientResource(object): @@ -663,6 +840,8 @@ def transient_internet(resource_name, *, timeout=30.0, errnos=()): default_gai_errnos = [ ('EAI_NONAME', -2), ('EAI_NODATA', -5), + # Encountered when trying to resolve IPv6-only hostnames + ('WSANO_DATA', 11004), ] denied = ResourceDenied("Resource '%s' is not available" % resource_name) @@ -731,6 +910,12 @@ def captured_output(stream_name): def captured_stdout(): return captured_output("stdout") +def captured_stderr(): + return captured_output("stderr") + +def captured_stdin(): + return captured_output("stdin") + def gc_collect(): """Force as many objects as possible to be collected. @@ -742,10 +927,22 @@ def gc_collect(): objects to disappear. """ gc.collect() + if is_jython: + time.sleep(0.1) gc.collect() gc.collect() +def python_is_optimized(): + """Find if Python was built with optimizations.""" + cflags = sysconfig.get_config_var('PY_CFLAGS') or '' + final_opt = "" + for opt in cflags.split(): + if opt.startswith('-O'): + final_opt = opt + return final_opt and final_opt != '-O0' + + #======================================================================= # Decorator for running a function in a different locale, correctly resetting # it afterwards. @@ -796,7 +993,6 @@ _4G = 4 * _1G MAX_Py_ssize_t = sys.maxsize def set_memlimit(limit): - import re global max_memuse global real_max_memuse sizes = { @@ -1034,31 +1230,50 @@ def modules_cleanup(oldmodules): if k.startswith('encodings.')] sys.modules.clear() sys.modules.update(encodings) + # XXX: This kind of problem can affect more than just encodings. In particular + # extension modules (such as _ssl) don't cope with reloading properly. + # Really, test modules should be cleaning out the test specific modules they + # know they added (ala test_runpy) rather than relying on this function (as + # test_importhooks and test_pkg do currently). + # Implicitly imported *real* modules should be left alone (see issue 10556). sys.modules.update(oldmodules) #======================================================================= # Threading support to prevent reporting refleaks when running regrtest.py -R -def threading_setup(): - import threading - return len(threading._active), len(threading._limbo) +# NOTE: we use thread._count() rather than threading.enumerate() (or the +# moral equivalent thereof) because a threading.Thread object is still alive +# until its __bootstrap() method has returned, even after it has been +# unregistered from the threading module. +# thread._count(), on the other hand, only gets decremented *after* the +# __bootstrap() method has returned, which gives us reliable reference counts +# at the end of a test run. -def threading_cleanup(num_active, num_limbo): - import threading - import time +def threading_setup(): + if _thread: + return _thread._count(), + else: + return 1, +def threading_cleanup(nb_threads): + if not _thread: + return _MAX_COUNT = 10 - count = 0 - while len(threading._active) != num_active and count < _MAX_COUNT: - count += 1 - time.sleep(0.1) - - count = 0 - while len(threading._limbo) != num_limbo and count < _MAX_COUNT: - count += 1 + for count in range(_MAX_COUNT): + n = _thread._count() + if n == nb_threads: + break time.sleep(0.1) + # XXX print a warning in case of failure? def reap_threads(func): + """Use this function when threads are being used. This will + ensure that the threads are cleaned up even when the test fails. + If threading is unavailable this function does nothing. + """ + if not _thread: + return func + @functools.wraps(func) def decorator(*args): key = threading_setup() @@ -1088,6 +1303,60 @@ def reap_children(): except: break +@contextlib.contextmanager +def swap_attr(obj, attr, new_val): + """Temporary swap out an attribute with a new object. + + Usage: + with swap_attr(obj, "attr", 5): + ... + + This will set obj.attr to 5 for the duration of the with: block, + restoring the old value at the end of the block. If `attr` doesn't + exist on `obj`, it will be created and then deleted at the end of the + block. + """ + if hasattr(obj, attr): + real_val = getattr(obj, attr) + setattr(obj, attr, new_val) + try: + yield + finally: + setattr(obj, attr, real_val) + else: + setattr(obj, attr, new_val) + try: + yield + finally: + delattr(obj, attr) + +@contextlib.contextmanager +def swap_item(obj, item, new_val): + """Temporary swap out an item with a new object. + + Usage: + with swap_item(obj, "item", 5): + ... + + This will set obj["item"] to 5 for the duration of the with: block, + restoring the old value at the end of the block. If `item` doesn't + exist on `obj`, it will be created and then deleted at the end of the + block. + """ + if item in obj: + real_val = obj[item] + obj[item] = new_val + try: + yield + finally: + obj[item] = real_val + else: + obj[item] = new_val + try: + yield + finally: + del obj[item] + def strip_python_stderr(stderr): """Strip the stderr of a Python process from potential debug output emitted by the interpreter. @@ -1098,6 +1367,110 @@ def strip_python_stderr(stderr): stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip() return stderr +def args_from_interpreter_flags(): + """Return a list of command-line arguments reproducing the current + settings in sys.flags.""" + flag_opt_map = { + 'bytes_warning': 'b', + 'dont_write_bytecode': 'B', + 'ignore_environment': 'E', + 'no_user_site': 's', + 'no_site': 'S', + 'optimize': 'O', + 'verbose': 'v', + } + args = [] + for flag, opt in flag_opt_map.items(): + v = getattr(sys.flags, flag) + if v > 0: + args.append('-' + opt * v) + return args + +#============================================================ +# Support for assertions about logging. +#============================================================ + +class TestHandler(logging.handlers.BufferingHandler): + def __init__(self, matcher): + # BufferingHandler takes a "capacity" argument + # so as to know when to flush. As we're overriding + # shouldFlush anyway, we can set a capacity of zero. + # You can call flush() manually to clear out the + # buffer. + logging.handlers.BufferingHandler.__init__(self, 0) + self.matcher = matcher + + def shouldFlush(self): + return False + + def emit(self, record): + self.format(record) + self.buffer.append(record.__dict__) + + def matches(self, **kwargs): + """ + Look for a saved dict whose keys/values match the supplied arguments. + """ + result = False + for d in self.buffer: + if self.matcher.matches(d, **kwargs): + result = True + break + return result + +class Matcher(object): + + _partial_matches = ('msg', 'message') + + def matches(self, d, **kwargs): + """ + Try to match a single dict with the supplied arguments. + + Keys whose values are strings and which are in self._partial_matches + will be checked for partial (i.e. substring) matches. You can extend + this scheme to (for example) do regular expression matching, etc. + """ + result = True + for k in kwargs: + v = kwargs[k] + dv = d.get(k) + if not self.match_value(k, dv, v): + result = False + break + return result + + def match_value(self, k, dv, v): + """ + Try to match a single stored value (dv) with a supplied value (v). + """ + if type(v) != type(dv): + result = False + elif type(dv) is not str or k not in self._partial_matches: + result = (v == dv) + else: + result = dv.find(v) >= 0 + return result + + +_can_symlink = None +def can_symlink(): + global _can_symlink + if _can_symlink is not None: + return _can_symlink + try: + os.symlink(TESTFN, TESTFN + "can_symlink") + can = True + except (OSError, NotImplementedError): + can = False + _can_symlink = can + return can + +def skip_unless_symlink(test): + """Skip decorator for tests that require functional symlink""" + ok = can_symlink() + msg = "Requires functional symlink implementation" + return test if ok else unittest.skip(msg)(test) + def patch(test_instance, object_to_patch, attr_name, new_value): """Override 'object_to_patch'.'attr_name' with 'new_value'. |