Diffstat (limited to 'Lib/test/support.py')
-rw-r--r--  Lib/test/support.py  591
1 file changed, 482 insertions, 109 deletions
diff --git a/Lib/test/support.py b/Lib/test/support.py
index dcfadea..2a06661 100644
--- a/Lib/test/support.py
+++ b/Lib/test/support.py
@@ -10,28 +10,40 @@ import gc
import socket
import sys
import os
-import re
import platform
import shutil
import warnings
import unittest
import importlib
import collections
+import re
+import subprocess
+import imp
+import time
+import sysconfig
+import logging.handlers
+
+try:
+ import _thread
+except ImportError:
+ _thread = None
__all__ = [
"Error", "TestFailed", "ResourceDenied", "import_module",
"verbose", "use_resources", "max_memuse", "record_original_stdout",
"get_original_stdout", "unload", "unlink", "rmtree", "forget",
"is_resource_enabled", "requires", "find_unused_port", "bind_port",
- "fcmp", "is_jython", "TESTFN", "HOST", "FUZZ", "findfile", "verify",
- "vereq", "sortdict", "check_syntax_error", "open_urlresource",
+ "fcmp", "is_jython", "TESTFN", "HOST", "FUZZ", "SAVEDCWD", "temp_cwd",
+ "findfile", "sortdict", "check_syntax_error", "open_urlresource",
"check_warnings", "CleanImport", "EnvironmentVarGuard",
"TransientResource", "captured_output", "captured_stdout",
"time_out", "socket_peer_reset", "ioerror_peer_reset",
- "run_with_locale", "transient_internet",
+ "run_with_locale", 'temp_umask', "transient_internet",
"set_memlimit", "bigmemtest", "bigaddrspacetest", "BasicTestRunner",
"run_unittest", "run_doctest", "threading_setup", "threading_cleanup",
"reap_children", "cpython_only", "check_impl_detail", "get_attribute",
+ "swap_item", "swap_attr", "requires_IEEE_754",
+ "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink",
"import_fresh_module"
]
@@ -177,27 +189,50 @@ def unload(name):
def unlink(filename):
try:
os.unlink(filename)
- except OSError:
- pass
+ except OSError as error:
+ # The filename need not exist.
+ if error.errno not in (errno.ENOENT, errno.ENOTDIR):
+ raise
def rmtree(path):
try:
shutil.rmtree(path)
- except OSError as e:
+ except OSError as error:
# Unix returns ENOENT, Windows returns ESRCH.
- if e.errno not in (errno.ENOENT, errno.ESRCH):
+ if error.errno not in (errno.ENOENT, errno.ESRCH):
raise
+def make_legacy_pyc(source):
+ """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.
+
+ The choice of .pyc or .pyo extension is done based on the __debug__ flag
+ value.
+
+ :param source: The file system path to the source file. The source file
+ does not need to exist, however the PEP 3147 pyc file must exist.
+ :return: The file system path to the legacy pyc file.
+ """
+ pyc_file = imp.cache_from_source(source)
+ up_one = os.path.dirname(os.path.abspath(source))
+ legacy_pyc = os.path.join(up_one, source + ('c' if __debug__ else 'o'))
+ os.rename(pyc_file, legacy_pyc)
+ return legacy_pyc
+
def forget(modname):
- '''"Forget" a module was ever imported by removing it from sys.modules and
- deleting any .pyc and .pyo files.'''
+ """'Forget' a module was ever imported.
+
+ This removes the module from sys.modules and deletes any PEP 3147 or
+ legacy .pyc and .pyo files.
+ """
unload(modname)
for dirname in sys.path:
- unlink(os.path.join(dirname, modname + '.pyc'))
- # Deleting the .pyo file cannot be within the 'try' for the .pyc since
- # the chance exists that there is no .pyc (and thus the 'try' statement
- # is exited) but there is a .pyo file.
- unlink(os.path.join(dirname, modname + '.pyo'))
+ source = os.path.join(dirname, modname + '.py')
+ # Whether or not they exist, unlink all possible combinations of
+ # PEP 3147 and legacy pyc and pyo files.
+ unlink(source + 'c')
+ unlink(source + 'o')
+ unlink(imp.cache_from_source(source, debug_override=True))
+ unlink(imp.cache_from_source(source, debug_override=False))
# On some platforms, should not run gui test even if it is allowed
# in `use_resources'.
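
As an illustration only (not part of the patch), a minimal sketch of how a test might exercise the make_legacy_pyc() and forget() helpers added above, assuming a throwaway spam.py sits in the current directory and that directory is on sys.path:

    import py_compile
    from test import support

    # Compile spam.py into its PEP 3147 location (__pycache__/spam.cpython-*.pyc) ...
    py_compile.compile('spam.py')
    # ... then move that pyc back next to the source as legacy spam.pyc (or spam.pyo).
    legacy_path = support.make_legacy_pyc('spam.py')
    # Finally drop the module from sys.modules and delete every pyc/pyo variant.
    support.forget('spam')
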
@@ -238,7 +273,9 @@ def requires(resource, msg=None):
"""Raise ResourceDenied if the specified resource is not available.
If the caller's module is __main__ then automatically return True. The
- possibility of False being returned occurs when regrtest.py is executing."""
+ possibility of False being returned occurs when regrtest.py is
+ executing.
+ """
if resource == 'gui' and not _is_gui_available():
raise unittest.SkipTest("Cannot use the 'gui' resource")
# see if the caller's module is __main__ - if so, treat as if
@@ -363,6 +400,11 @@ def fcmp(x, y): # fuzzy comparison function
return (len(x) > len(y)) - (len(x) < len(y))
return (x > y) - (x < y)
+# decorator for skipping tests on non-IEEE 754 platforms
+requires_IEEE_754 = unittest.skipUnless(
+ float.__getformat__("double").startswith("IEEE"),
+ "test requires IEEE 754 doubles")
+
is_jython = sys.platform.startswith('java')
# Filename used for testing
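
A hedged usage sketch for the requires_IEEE_754 decorator added above (hypothetical test case, not from the patch):

    import unittest
    from test import support

    class FloatReprTest(unittest.TestCase):

        @support.requires_IEEE_754   # skipped on platforms without IEEE 754 doubles
        def test_repr_roundtrip(self):
            self.assertEqual(float(repr(2.5)), 2.5)
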
@@ -372,57 +414,106 @@ if os.name == 'java':
else:
TESTFN = '@test'
- # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
- # TESTFN_UNICODE is a filename that can be encoded using the
- # file system encoding, but *not* with the default (ascii) encoding
- TESTFN_UNICODE = "@test-\xe0\xf2"
- TESTFN_ENCODING = sys.getfilesystemencoding()
- # TESTFN_UNENCODABLE is a filename that should *not* be
- # able to be encoded by *either* the default or filesystem encoding.
- # This test really only makes sense on Windows NT platforms
- # which have special Unicode support in posixmodule.
- if (not hasattr(sys, "getwindowsversion") or
- sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME
- TESTFN_UNENCODABLE = None
- else:
+# Disambiguate TESTFN for parallel testing, while letting it remain a valid
+# module name.
+TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
+
+
+# TESTFN_UNICODE is a non-ascii filename
+TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
+if sys.platform == 'darwin':
+ # In Mac OS X's VFS API file names are, by definition, canonically
+ # decomposed Unicode, encoded using UTF-8. See QA1173:
+ # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
+ import unicodedata
+ TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
+TESTFN_ENCODING = sys.getfilesystemencoding()
+
+# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
+# encoded by the filesystem encoding (in strict mode). It can be None if we
+# cannot generate such a filename.
+TESTFN_UNENCODABLE = None
+if os.name in ('nt', 'ce'):
+ # skip win32s (0) or Windows 9x/ME (1)
+ if sys.getwindowsversion().platform >= 2:
# Different kinds of characters from various languages to minimize the
# probability that the whole name is encodable to MBCS (issue #9819)
TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
try:
- # XXX - Note - should be using TESTFN_ENCODING here - but for
- # Windows, "mbcs" currently always operates as if in
- # errors=ignore' mode - hence we get '?' characters rather than
- # the exception. 'Latin1' operates as we expect - ie, fails.
- # See [ 850997 ] mbcs encoding ignores errors
- TESTFN_UNENCODABLE.encode("Latin1")
+ TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
except UnicodeEncodeError:
pass
else:
- print('WARNING: The filename %r CAN be encoded by the filesystem. '
+ print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
'Unicode filename tests may not be effective'
- % TESTFN_UNENCODABLE)
+ % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
+ TESTFN_UNENCODABLE = None
+# Mac OS X denies unencodable filenames (invalid utf-8)
+elif sys.platform != 'darwin':
+ try:
+ # ascii and utf-8 cannot encode the byte 0xff
+ b'\xff'.decode(TESTFN_ENCODING)
+ except UnicodeDecodeError:
+ # 0xff will be encoded using the surrogate character u+DCFF
+ TESTFN_UNENCODABLE = TESTFN \
+ + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
+ else:
+ # File system encoding (eg. ISO-8859-* encodings) can encode
+ # the byte 0xff. Skip some unicode filename tests.
+ pass
-if os.path.isdir(TESTFN):
- # a test failed (eg. test_os) without removing TESTFN directory
- shutil.rmtree(TESTFN)
+# Save the initial cwd
+SAVEDCWD = os.getcwd()
-# Make sure we can write to TESTFN, try in /tmp if we can't
-fp = None
-try:
- fp = open(TESTFN, 'w+')
-except IOError:
- TMP_TESTFN = os.path.join('/tmp', TESTFN)
+@contextlib.contextmanager
+def temp_cwd(name='tempcwd', quiet=False, path=None):
+ """
+ Context manager that temporarily changes the CWD.
+
+ An existing path may be provided as *path*, in which case this
+ function makes no changes to the file system.
+
+ Otherwise, the new CWD is created in the current directory and it's
+ named *name*. If *quiet* is False (default) and it's not possible to
+ create or change the CWD, an error is raised. If it's True, only a
+ warning is raised and the original CWD is used.
+ """
+ saved_dir = os.getcwd()
+ is_temporary = False
+ if path is None:
+ path = name
+ try:
+ os.mkdir(name)
+ is_temporary = True
+ except OSError:
+ if not quiet:
+ raise
+ warnings.warn('tests may fail, unable to create temp CWD ' + name,
+ RuntimeWarning, stacklevel=3)
+ try:
+ os.chdir(path)
+ except OSError:
+ if not quiet:
+ raise
+ warnings.warn('tests may fail, unable to change the CWD to ' + name,
+ RuntimeWarning, stacklevel=3)
+ try:
+ yield os.getcwd()
+ finally:
+ os.chdir(saved_dir)
+ if is_temporary:
+ rmtree(name)
+
+
+@contextlib.contextmanager
+def temp_umask(umask):
+ """Context manager that temporarily sets the process umask."""
+ oldmask = os.umask(umask)
try:
- fp = open(TMP_TESTFN, 'w+')
- TESTFN = TMP_TESTFN
- del TMP_TESTFN
- except IOError:
- print(('WARNING: tests will fail, unable to write to: %s or %s' %
- (TESTFN, TMP_TESTFN)))
-if fp is not None:
- fp.close()
- unlink(TESTFN)
-del fp
+ yield
+ finally:
+ os.umask(oldmask)
+
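
For illustration, a small sketch of the new temp_cwd() and temp_umask() context managers in combination; the directory name 'scratchdir' and file 'result.txt' are invented for the example:

    import os
    from test import support

    # Work in a throwaway directory with a predictable umask; the previous
    # working directory and umask are both restored when the blocks exit.
    with support.temp_cwd('scratchdir'):
        with support.temp_umask(0o022):
            with open('result.txt', 'w') as f:
                f.write('data')
            print(oct(os.stat('result.txt').st_mode & 0o777))
    # 'scratchdir' and its contents have been removed at this point.
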
def findfile(file, here=__file__, subdir=None):
"""Try to find a file on sys.path and the working directory. If it is not
@@ -439,30 +530,6 @@ def findfile(file, here=__file__, subdir=None):
if os.path.exists(fn): return fn
return file
-def verify(condition, reason='test failed'):
- """Verify that condition is true. If not, raise TestFailed.
-
- The optional argument reason can be given to provide
- a better error text.
- """
-
- if not condition:
- raise TestFailed(reason)
-
-def vereq(a, b):
- """Raise TestFailed if a == b is false.
-
- This is better than verify(a == b) because, in case of failure, the
- error message incorporates repr(a) and repr(b) so you can see the
- inputs.
-
- Note that "not (a == b)" isn't necessarily the same as "a != b"; the
- former is tested.
- """
-
- if not (a == b):
- raise TestFailed("%r == %r" % (a, b))
-
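
The verify()/vereq() helpers removed here correspond to standard unittest assertions; a rough sketch of the equivalent idiom in a hypothetical test case (not part of the patch):

    import unittest

    class ExampleTest(unittest.TestCase):

        def test_answer(self):
            a, b = 6 * 7, 42
            # vereq(a, b) becomes:
            self.assertEqual(a, b)
            # verify(a < 100, 'value too large') becomes:
            self.assertTrue(a < 100, 'value too large')
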
def sortdict(dict):
"Like repr(dict), but in sorted order."
items = sorted(dict.items())
@@ -489,47 +556,131 @@ def check_syntax_error(testcase, statement):
def open_urlresource(url, *args, **kw):
import urllib.request, urllib.parse
- requires('urlfetch')
+ check = kw.pop('check', None)
+
filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
- for path in [os.path.curdir, os.path.pardir]:
- fn = os.path.join(path, filename)
- if os.path.exists(fn):
- return open(fn, *args, **kw)
+ fn = os.path.join(os.path.dirname(__file__), "data", filename)
+
+ def check_valid_file(fn):
+ f = open(fn, *args, **kw)
+ if check is None:
+ return f
+ elif check(f):
+ f.seek(0)
+ return f
+ f.close()
+
+ if os.path.exists(fn):
+ f = check_valid_file(fn)
+ if f is not None:
+ return f
+ unlink(fn)
+
+ # Verify the requirement before downloading the file
+ requires('urlfetch')
print('\tfetching %s ...' % url, file=get_original_stdout())
f = urllib.request.urlopen(url, timeout=15)
try:
- with open(filename, "wb") as out:
+ with open(fn, "wb") as out:
s = f.read()
while s:
out.write(s)
s = f.read()
finally:
f.close()
- return open(filename, *args, **kw)
+
+ f = check_valid_file(fn)
+ if f is not None:
+ return f
+ raise TestFailed('invalid resource "%s"' % fn)
+
class WarningsRecorder(object):
"""Convenience wrapper for the warnings list returned on
entry to the warnings.catch_warnings() context manager.
"""
def __init__(self, warnings_list):
- self.warnings = warnings_list
+ self._warnings = warnings_list
+ self._last = 0
def __getattr__(self, attr):
- if self.warnings:
- return getattr(self.warnings[-1], attr)
+ if len(self._warnings) > self._last:
+ return getattr(self._warnings[-1], attr)
elif attr in warnings.WarningMessage._WARNING_DETAILS:
return None
raise AttributeError("%r has no attribute %r" % (self, attr))
+ @property
+ def warnings(self):
+ return self._warnings[self._last:]
+
def reset(self):
- del self.warnings[:]
+ self._last = len(self._warnings)
-@contextlib.contextmanager
-def check_warnings():
+
+def _filterwarnings(filters, quiet=False):
+ """Catch the warnings, then check if all the expected
+ warnings have been raised and re-raise unexpected warnings.
+ If 'quiet' is True, only re-raise the unexpected warnings.
+ """
+ # Clear the warning registry of the calling module
+ # in order to re-raise the warnings.
+ frame = sys._getframe(2)
+ registry = frame.f_globals.get('__warningregistry__')
+ if registry:
+ registry.clear()
with warnings.catch_warnings(record=True) as w:
+ # Set filter "always" to record all warnings. Because
+ # test_warnings swaps the module, we need to look it up in
+ # the sys.modules dictionary.
+ sys.modules['warnings'].simplefilter("always")
yield WarningsRecorder(w)
+ # Filter the recorded warnings
+ reraise = list(w)
+ missing = []
+ for msg, cat in filters:
+ seen = False
+ for w in reraise[:]:
+ warning = w.message
+ # Filter out the matching messages
+ if (re.match(msg, str(warning), re.I) and
+ issubclass(warning.__class__, cat)):
+ seen = True
+ reraise.remove(w)
+ if not seen and not quiet:
+ # This filter caught nothing
+ missing.append((msg, cat.__name__))
+ if reraise:
+ raise AssertionError("unhandled warning %s" % reraise[0])
+ if missing:
+ raise AssertionError("filter (%r, %s) did not catch any warning" %
+ missing[0])
+
+
+@contextlib.contextmanager
+def check_warnings(*filters, **kwargs):
+ """Context manager to silence warnings.
+
+ Accept 2-tuples as positional arguments:
+ ("message regexp", WarningCategory)
+
+ Optional argument:
+ - if 'quiet' is True, it does not fail if a filter catches nothing
+ (default True without argument,
+ default False if some filters are defined)
+
+ Without argument, it defaults to:
+ check_warnings(("", Warning), quiet=True)
+ """
+ quiet = kwargs.get('quiet')
+ if not filters:
+ filters = (("", Warning),)
+ # Preserve backward compatibility
+ if quiet is None:
+ quiet = True
+ return _filterwarnings(filters, quiet)
class CleanImport(object):
@@ -541,7 +692,7 @@ class CleanImport(object):
Use like this:
with CleanImport("foo"):
- __import__("foo") # new reference
+ importlib.import_module("foo") # new reference
"""
def __init__(self, *module_names):
@@ -614,6 +765,32 @@ class EnvironmentVarGuard(collections.MutableMapping):
del self._environ[k]
else:
self._environ[k] = v
+ os.environ = self._environ
+
+
+class DirsOnSysPath(object):
+ """Context manager to temporarily add directories to sys.path.
+
+ This makes a copy of sys.path, appends any directories given
+ as positional arguments, then reverts sys.path to the copied
+ settings when the context ends.
+
+ Note that *all* sys.path modifications in the body of the
+ context manager, including replacement of the object,
+ will be reverted at the end of the block.
+ """
+
+ def __init__(self, *paths):
+ self.original_value = sys.path[:]
+ self.original_object = sys.path
+ sys.path.extend(paths)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *ignore_exc):
+ sys.path = self.original_object
+ sys.path[:] = self.original_value
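
An illustrative sketch of DirsOnSysPath; the directory path is hypothetical:

    import sys
    from test import support

    extra = '/tmp/example_plugins'          # hypothetical directory
    with support.DirsOnSysPath(extra):
        assert extra in sys.path            # visible to imports inside the block
    assert extra not in sys.path            # sys.path fully restored afterwards
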
class TransientResource(object):
@@ -663,6 +840,8 @@ def transient_internet(resource_name, *, timeout=30.0, errnos=()):
default_gai_errnos = [
('EAI_NONAME', -2),
('EAI_NODATA', -5),
+ # Encountered when trying to resolve IPv6-only hostnames
+ ('WSANO_DATA', 11004),
]
denied = ResourceDenied("Resource '%s' is not available" % resource_name)
@@ -731,6 +910,12 @@ def captured_output(stream_name):
def captured_stdout():
return captured_output("stdout")
+def captured_stderr():
+ return captured_output("stderr")
+
+def captured_stdin():
+ return captured_output("stdin")
+
def gc_collect():
"""Force as many objects as possible to be collected.
@@ -742,10 +927,22 @@ def gc_collect():
objects to disappear.
"""
gc.collect()
+ if is_jython:
+ time.sleep(0.1)
gc.collect()
gc.collect()
+def python_is_optimized():
+ """Find if Python was built with optimizations."""
+ cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
+ final_opt = ""
+ for opt in cflags.split():
+ if opt.startswith('-O'):
+ final_opt = opt
+ return final_opt and final_opt != '-O0'
+
+
#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.
@@ -796,7 +993,6 @@ _4G = 4 * _1G
MAX_Py_ssize_t = sys.maxsize
def set_memlimit(limit):
- import re
global max_memuse
global real_max_memuse
sizes = {
@@ -1034,31 +1230,50 @@ def modules_cleanup(oldmodules):
if k.startswith('encodings.')]
sys.modules.clear()
sys.modules.update(encodings)
+ # XXX: This kind of problem can affect more than just encodings. In particular
+ # extension modules (such as _ssl) don't cope with reloading properly.
+ # Really, test modules should be cleaning out the test specific modules they
+ # know they added (ala test_runpy) rather than relying on this function (as
+ # test_importhooks and test_pkg do currently).
+ # Implicitly imported *real* modules should be left alone (see issue 10556).
sys.modules.update(oldmodules)
#=======================================================================
# Threading support to prevent reporting refleaks when running regrtest.py -R
-def threading_setup():
- import threading
- return len(threading._active), len(threading._limbo)
+# NOTE: we use thread._count() rather than threading.enumerate() (or the
+# moral equivalent thereof) because a threading.Thread object is still alive
+# until its __bootstrap() method has returned, even after it has been
+# unregistered from the threading module.
+# thread._count(), on the other hand, only gets decremented *after* the
+# __bootstrap() method has returned, which gives us reliable reference counts
+# at the end of a test run.
-def threading_cleanup(num_active, num_limbo):
- import threading
- import time
+def threading_setup():
+ if _thread:
+ return _thread._count(),
+ else:
+ return 1,
+def threading_cleanup(nb_threads):
+ if not _thread:
+ return
_MAX_COUNT = 10
- count = 0
- while len(threading._active) != num_active and count < _MAX_COUNT:
- count += 1
- time.sleep(0.1)
-
- count = 0
- while len(threading._limbo) != num_limbo and count < _MAX_COUNT:
- count += 1
+ for count in range(_MAX_COUNT):
+ n = _thread._count()
+ if n == nb_threads:
+ break
time.sleep(0.1)
+ # XXX print a warning in case of failure?
def reap_threads(func):
+ """Use this function when threads are being used. This will
+ ensure that the threads are cleaned up even when the test fails.
+ If threading is unavailable this function does nothing.
+ """
+ if not _thread:
+ return func
+
@functools.wraps(func)
def decorator(*args):
key = threading_setup()
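
A minimal sketch of the reap_threads decorator above, assuming the _thread module is available (hypothetical test function, not from the patch):

    import threading
    from test import support

    @support.reap_threads   # waits for stray threads before reporting refleaks
    def test_background_worker():
        done = threading.Event()
        worker = threading.Thread(target=done.set)
        worker.start()
        worker.join()
        assert done.is_set()
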
@@ -1088,6 +1303,60 @@ def reap_children():
except:
break
+@contextlib.contextmanager
+def swap_attr(obj, attr, new_val):
+ """Temporary swap out an attribute with a new object.
+
+ Usage:
+ with swap_attr(obj, "attr", 5):
+ ...
+
+ This will set obj.attr to 5 for the duration of the with: block,
+ restoring the old value at the end of the block. If `attr` doesn't
+ exist on `obj`, it will be created and then deleted at the end of the
+ block.
+ """
+ if hasattr(obj, attr):
+ real_val = getattr(obj, attr)
+ setattr(obj, attr, new_val)
+ try:
+ yield
+ finally:
+ setattr(obj, attr, real_val)
+ else:
+ setattr(obj, attr, new_val)
+ try:
+ yield
+ finally:
+ delattr(obj, attr)
+
+@contextlib.contextmanager
+def swap_item(obj, item, new_val):
+ """Temporary swap out an item with a new object.
+
+ Usage:
+ with swap_item(obj, "item", 5):
+ ...
+
+ This will set obj["item"] to 5 for the duration of the with: block,
+ restoring the old value at the end of the block. If `item` doesn't
+ exist on `obj`, it will be created and then deleted at the end of the
+ block.
+ """
+ if item in obj:
+ real_val = obj[item]
+ obj[item] = new_val
+ try:
+ yield
+ finally:
+ obj[item] = real_val
+ else:
+ obj[item] = new_val
+ try:
+ yield
+ finally:
+ del obj[item]
+
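
A short sketch of swap_attr() and swap_item() in use; the Config class and options dict are invented for the example:

    from test import support

    class Config:
        retries = 3

    with support.swap_attr(Config, "retries", 10):
        assert Config.retries == 10
    assert Config.retries == 3               # original value restored

    options = {"debug": False}
    with support.swap_item(options, "debug", True):
        assert options["debug"] is True
    assert options["debug"] is False         # original value restored
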
def strip_python_stderr(stderr):
"""Strip the stderr of a Python process from potential debug output
emitted by the interpreter.
@@ -1098,6 +1367,110 @@ def strip_python_stderr(stderr):
stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
return stderr
+def args_from_interpreter_flags():
+ """Return a list of command-line arguments reproducing the current
+ settings in sys.flags."""
+ flag_opt_map = {
+ 'bytes_warning': 'b',
+ 'dont_write_bytecode': 'B',
+ 'ignore_environment': 'E',
+ 'no_user_site': 's',
+ 'no_site': 'S',
+ 'optimize': 'O',
+ 'verbose': 'v',
+ }
+ args = []
+ for flag, opt in flag_opt_map.items():
+ v = getattr(sys.flags, flag)
+ if v > 0:
+ args.append('-' + opt * v)
+ return args
+
+#============================================================
+# Support for assertions about logging.
+#============================================================
+
+class TestHandler(logging.handlers.BufferingHandler):
+ def __init__(self, matcher):
+ # BufferingHandler takes a "capacity" argument
+ # so as to know when to flush. As we're overriding
+ # shouldFlush anyway, we can set a capacity of zero.
+ # You can call flush() manually to clear out the
+ # buffer.
+ logging.handlers.BufferingHandler.__init__(self, 0)
+ self.matcher = matcher
+
+ def shouldFlush(self):
+ return False
+
+ def emit(self, record):
+ self.format(record)
+ self.buffer.append(record.__dict__)
+
+ def matches(self, **kwargs):
+ """
+ Look for a saved dict whose keys/values match the supplied arguments.
+ """
+ result = False
+ for d in self.buffer:
+ if self.matcher.matches(d, **kwargs):
+ result = True
+ break
+ return result
+
+class Matcher(object):
+
+ _partial_matches = ('msg', 'message')
+
+ def matches(self, d, **kwargs):
+ """
+ Try to match a single dict with the supplied arguments.
+
+ Keys whose values are strings and which are in self._partial_matches
+ will be checked for partial (i.e. substring) matches. You can extend
+ this scheme to (for example) do regular expression matching, etc.
+ """
+ result = True
+ for k in kwargs:
+ v = kwargs[k]
+ dv = d.get(k)
+ if not self.match_value(k, dv, v):
+ result = False
+ break
+ return result
+
+ def match_value(self, k, dv, v):
+ """
+ Try to match a single stored value (dv) with a supplied value (v).
+ """
+ if type(v) != type(dv):
+ result = False
+ elif type(dv) is not str or k not in self._partial_matches:
+ result = (v == dv)
+ else:
+ result = dv.find(v) >= 0
+ return result
+
+
+_can_symlink = None
+def can_symlink():
+ global _can_symlink
+ if _can_symlink is not None:
+ return _can_symlink
+ try:
+ os.symlink(TESTFN, TESTFN + "can_symlink")
+ can = True
+ except (OSError, NotImplementedError):
+ can = False
+ _can_symlink = can
+ return can
+
+def skip_unless_symlink(test):
+ """Skip decorator for tests that require functional symlink"""
+ ok = can_symlink()
+ msg = "Requires functional symlink implementation"
+ return test if ok else unittest.skip(msg)(test)
+
def patch(test_instance, object_to_patch, attr_name, new_value):
"""Override 'object_to_patch'.'attr_name' with 'new_value'.