diff options
Diffstat (limited to 'Lib/test/support.py')
-rw-r--r-- | Lib/test/support.py | 769 |
1 files changed, 769 insertions, 0 deletions
diff --git a/Lib/test/support.py b/Lib/test/support.py new file mode 100644 index 0000000..b8bc406 --- /dev/null +++ b/Lib/test/support.py @@ -0,0 +1,769 @@ +"""Supporting definitions for the Python regression tests.""" + +if __name__ != 'test.support': + raise ImportError('support must be imported from the test package') + +import contextlib +import errno +import socket +import sys +import os +import os.path +import shutil +import warnings +import unittest + +class Error(Exception): + """Base class for regression test exceptions.""" + +class TestFailed(Error): + """Test failed.""" + +class TestSkipped(Error): + """Test skipped. + + This can be raised to indicate that a test was deliberatly + skipped, but not because a feature wasn't available. For + example, if some resource can't be used, such as the network + appears to be unavailable, this should be raised instead of + TestFailed. + """ + +class ResourceDenied(TestSkipped): + """Test skipped because it requested a disallowed resource. + + This is raised when a test calls requires() for a resource that + has not be enabled. It is used to distinguish between expected + and unexpected skips. + """ + +def import_module(name, deprecated=False): + """Import the module to be tested, raising TestSkipped if it is not + available.""" + with catch_warning(record=False): + if deprecated: + warnings.filterwarnings("ignore", ".+ (module|package)", + DeprecationWarning) + try: + module = __import__(name, level=0) + except ImportError: + raise TestSkipped("No module named " + name) + else: + return module + +verbose = 1 # Flag set to 0 by regrtest.py +use_resources = None # Flag set to [] by regrtest.py +max_memuse = 0 # Disable bigmem tests (they will still be run with + # small sizes, to make sure they work.) + +# _original_stdout is meant to hold stdout at the time regrtest began. +# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. +# The point is to have some flavor of stdout the user can actually see. +_original_stdout = None +def record_original_stdout(stdout): + global _original_stdout + _original_stdout = stdout + +def get_original_stdout(): + return _original_stdout or sys.stdout + +def unload(name): + try: + del sys.modules[name] + except KeyError: + pass + +def unlink(filename): + try: + os.unlink(filename) + except OSError: + pass + +def rmtree(path): + try: + shutil.rmtree(path) + except OSError as e: + # Unix returns ENOENT, Windows returns ESRCH. + if e.errno not in (errno.ENOENT, errno.ESRCH): + raise + +def forget(modname): + '''"Forget" a module was ever imported by removing it from sys.modules and + deleting any .pyc and .pyo files.''' + unload(modname) + for dirname in sys.path: + unlink(os.path.join(dirname, modname + '.pyc')) + # Deleting the .pyo file cannot be within the 'try' for the .pyc since + # the chance exists that there is no .pyc (and thus the 'try' statement + # is exited) but there is a .pyo file. + unlink(os.path.join(dirname, modname + '.pyo')) + +def is_resource_enabled(resource): + """Test whether a resource is enabled. Known resources are set by + regrtest.py.""" + return use_resources is not None and resource in use_resources + +def requires(resource, msg=None): + """Raise ResourceDenied if the specified resource is not available. + + If the caller's module is __main__ then automatically return True. The + possibility of False being returned occurs when regrtest.py is executing.""" + # see if the caller's module is __main__ - if so, treat as if + # the resource was set + if sys._getframe().f_back.f_globals.get("__name__") == "__main__": + return + if not is_resource_enabled(resource): + if msg is None: + msg = "Use of the `%s' resource not enabled" % resource + raise ResourceDenied(msg) + +HOST = 'localhost' + +def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): + """Returns an unused port that should be suitable for binding. This is + achieved by creating a temporary socket with the same family and type as + the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to + the specified host address (defaults to 0.0.0.0) with the port set to 0, + eliciting an unused ephemeral port from the OS. The temporary socket is + then closed and deleted, and the ephemeral port is returned. + + Either this method or bind_port() should be used for any tests where a + server socket needs to be bound to a particular port for the duration of + the test. Which one to use depends on whether the calling code is creating + a python socket, or if an unused port needs to be provided in a constructor + or passed to an external program (i.e. the -accept argument to openssl's + s_server mode). Always prefer bind_port() over find_unused_port() where + possible. Hard coded ports should *NEVER* be used. As soon as a server + socket is bound to a hard coded port, the ability to run multiple instances + of the test simultaneously on the same host is compromised, which makes the + test a ticking time bomb in a buildbot environment. On Unix buildbots, this + may simply manifest as a failed test, which can be recovered from without + intervention in most cases, but on Windows, the entire python process can + completely and utterly wedge, requiring someone to log in to the buildbot + and manually kill the affected process. + + (This is easy to reproduce on Windows, unfortunately, and can be traced to + the SO_REUSEADDR socket option having different semantics on Windows versus + Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, + listen and then accept connections on identical host/ports. An EADDRINUSE + socket.error will be raised at some point (depending on the platform and + the order bind and listen were called on each socket). + + However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE + will ever be raised when attempting to bind two identical host/ports. When + accept() is called on each socket, the second caller's process will steal + the port from the first caller, leaving them both in an awkwardly wedged + state where they'll no longer respond to any signals or graceful kills, and + must be forcibly killed via OpenProcess()/TerminateProcess(). + + The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option + instead of SO_REUSEADDR, which effectively affords the same semantics as + SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open + Source world compared to Windows ones, this is a common mistake. A quick + look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when + openssl.exe is called with the 's_server' option, for example. See + http://bugs.python.org/issue2550 for more info. The following site also + has a very thorough description about the implications of both REUSEADDR + and EXCLUSIVEADDRUSE on Windows: + http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) + + XXX: although this approach is a vast improvement on previous attempts to + elicit unused ports, it rests heavily on the assumption that the ephemeral + port returned to us by the OS won't immediately be dished back out to some + other process when we close and delete our temporary socket but before our + calling code has a chance to bind the returned port. We can deal with this + issue if/when we come across it. + """ + + tempsock = socket.socket(family, socktype) + port = bind_port(tempsock) + tempsock.close() + del tempsock + return port + +def bind_port(sock, host=HOST): + """Bind the socket to a free port and return the port number. Relies on + ephemeral ports in order to ensure we are using an unbound port. This is + important as many tests may be running simultaneously, especially in a + buildbot environment. This method raises an exception if the sock.family + is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR + or SO_REUSEPORT set on it. Tests should *never* set these socket options + for TCP/IP sockets. The only case for setting these options is testing + multicasting via multiple UDP sockets. + + Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. + on Windows), it will be set on the socket. This will prevent anyone else + from bind()'ing to our host/port for the duration of the test. + """ + + if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: + if hasattr(socket, 'SO_REUSEADDR'): + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: + raise TestFailed("tests should never set the SO_REUSEADDR " \ + "socket option on TCP/IP sockets!") + if hasattr(socket, 'SO_REUSEPORT'): + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: + raise TestFailed("tests should never set the SO_REUSEPORT " \ + "socket option on TCP/IP sockets!") + if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): + sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) + + sock.bind((host, 0)) + port = sock.getsockname()[1] + return port + +FUZZ = 1e-6 + +def fcmp(x, y): # fuzzy comparison function + if isinstance(x, float) or isinstance(y, float): + try: + fuzz = (abs(x) + abs(y)) * FUZZ + if abs(x-y) <= fuzz: + return 0 + except: + pass + elif type(x) == type(y) and isinstance(x, (tuple, list)): + for i in range(min(len(x), len(y))): + outcome = fcmp(x[i], y[i]) + if outcome != 0: + return outcome + return (len(x) > len(y)) - (len(x) < len(y)) + return (x > y) - (x < y) + +try: + str + have_unicode = True +except NameError: + have_unicode = False + +is_jython = sys.platform.startswith('java') + +# Filename used for testing +if os.name == 'java': + # Jython disallows @ in module names + TESTFN = '$test' +else: + TESTFN = '@test' + + # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding() + # TESTFN_UNICODE is a filename that can be encoded using the + # file system encoding, but *not* with the default (ascii) encoding + TESTFN_UNICODE = "@test-\xe0\xf2" + TESTFN_ENCODING = sys.getfilesystemencoding() + # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be + # able to be encoded by *either* the default or filesystem encoding. + # This test really only makes sense on Windows NT platforms + # which have special Unicode support in posixmodule. + if (not hasattr(sys, "getwindowsversion") or + sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME + TESTFN_UNICODE_UNENCODEABLE = None + else: + # Japanese characters (I think - from bug 846133) + TESTFN_UNICODE_UNENCODEABLE = "@test-\u5171\u6709\u3055\u308c\u308b" + try: + # XXX - Note - should be using TESTFN_ENCODING here - but for + # Windows, "mbcs" currently always operates as if in + # errors=ignore' mode - hence we get '?' characters rather than + # the exception. 'Latin1' operates as we expect - ie, fails. + # See [ 850997 ] mbcs encoding ignores errors + TESTFN_UNICODE_UNENCODEABLE.encode("Latin1") + except UnicodeEncodeError: + pass + else: + print('WARNING: The filename %r CAN be encoded by the filesystem. ' + 'Unicode filename tests may not be effective' + % TESTFN_UNICODE_UNENCODEABLE) + +# Make sure we can write to TESTFN, try in /tmp if we can't +fp = None +try: + fp = open(TESTFN, 'w+') +except IOError: + TMP_TESTFN = os.path.join('/tmp', TESTFN) + try: + fp = open(TMP_TESTFN, 'w+') + TESTFN = TMP_TESTFN + del TMP_TESTFN + except IOError: + print(('WARNING: tests will fail, unable to write to: %s or %s' % + (TESTFN, TMP_TESTFN))) +if fp is not None: + fp.close() + unlink(TESTFN) +del fp + +def findfile(file, here=__file__): + """Try to find a file on sys.path and the working directory. If it is not + found the argument passed to the function is returned (this does not + necessarily signal failure; could still be the legitimate path).""" + if os.path.isabs(file): + return file + path = sys.path + path = [os.path.dirname(here)] + path + for dn in path: + fn = os.path.join(dn, file) + if os.path.exists(fn): return fn + return file + +def verify(condition, reason='test failed'): + """Verify that condition is true. If not, raise TestFailed. + + The optional argument reason can be given to provide + a better error text. + """ + + if not condition: + raise TestFailed(reason) + +def vereq(a, b): + """Raise TestFailed if a == b is false. + + This is better than verify(a == b) because, in case of failure, the + error message incorporates repr(a) and repr(b) so you can see the + inputs. + + Note that "not (a == b)" isn't necessarily the same as "a != b"; the + former is tested. + """ + + if not (a == b): + raise TestFailed("%r == %r" % (a, b)) + +def sortdict(dict): + "Like repr(dict), but in sorted order." + items = sorted(dict.items()) + reprpairs = ["%r: %r" % pair for pair in items] + withcommas = ", ".join(reprpairs) + return "{%s}" % withcommas + +def check_syntax_error(testcase, statement): + try: + compile(statement, '<test string>', 'exec') + except SyntaxError: + pass + else: + testcase.fail('Missing SyntaxError: "%s"' % statement) + +def open_urlresource(url, *args, **kw): + import urllib, urlparse + + requires('urlfetch') + filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL! + + for path in [os.path.curdir, os.path.pardir]: + fn = os.path.join(path, filename) + if os.path.exists(fn): + return open(fn, *args, **kw) + + print('\tfetching %s ...' % url, file=get_original_stdout()) + fn, _ = urllib.urlretrieve(url, filename) + return open(fn, *args, **kw) + + +class WarningMessage(object): + "Holds the result of the latest showwarning() call" + def __init__(self): + self.message = None + self.category = None + self.filename = None + self.lineno = None + + def _showwarning(self, message, category, filename, lineno, file=None, + line=None): + self.message = message + self.category = category + self.filename = filename + self.lineno = lineno + self.line = line + + def reset(self): + self._showwarning(*((None,)*6)) + + def __str__(self): + return ("{message : %r, category : %r, filename : %r, lineno : %s, " + "line : %r}" % (self.message, + self.category.__name__ if self.category else None, + self.filename, self.lineno, self.line)) + + +@contextlib.contextmanager +def catch_warning(module=warnings, record=True): + """ + Guard the warnings filter from being permanently changed and record the + data of the last warning that has been issued. + + Use like this: + + with catch_warning() as w: + warnings.warn("foo") + assert str(w.message) == "foo" + """ + original_filters = module.filters[:] + original_showwarning = module.showwarning + if record: + warning_obj = WarningMessage() + module.showwarning = warning_obj._showwarning + try: + yield warning_obj if record else None + finally: + module.showwarning = original_showwarning + module.filters = original_filters + + +class CleanImport(object): + """Context manager to force import to return a new module reference. + + This is useful for testing module-level behaviours, such as + the emission of a DepreciationWarning on import. + + Use like this: + + with CleanImport("foo"): + __import__("foo") # new reference + """ + + def __init__(self, *module_names): + self.original_modules = sys.modules.copy() + for module_name in module_names: + if module_name in sys.modules: + module = sys.modules[module_name] + # It is possible that module_name is just an alias for + # another module (e.g. stub for modules renamed in 3.x). + # In that case, we also need delete the real module to clear + # the import cache. + if module.__name__ != module_name: + del sys.modules[module.__name__] + del sys.modules[module_name] + + def __enter__(self): + return self + + def __exit__(self, *ignore_exc): + sys.modules.update(self.original_modules) + + +class EnvironmentVarGuard(object): + + """Class to help protect the environment variable properly. Can be used as + a context manager.""" + + def __init__(self): + self._environ = os.environ + self._unset = set() + self._reset = dict() + + def set(self, envvar, value): + if envvar not in self._environ: + self._unset.add(envvar) + else: + self._reset[envvar] = self._environ[envvar] + self._environ[envvar] = value + + def unset(self, envvar): + if envvar in self._environ: + self._reset[envvar] = self._environ[envvar] + del self._environ[envvar] + + def __enter__(self): + return self + + def __exit__(self, *ignore_exc): + for envvar, value in self._reset.items(): + self._environ[envvar] = value + for unset in self._unset: + del self._environ[unset] + +class TransientResource(object): + + """Raise ResourceDenied if an exception is raised while the context manager + is in effect that matches the specified exception and attributes.""" + + def __init__(self, exc, **kwargs): + self.exc = exc + self.attrs = kwargs + + def __enter__(self): + return self + + def __exit__(self, type_=None, value=None, traceback=None): + """If type_ is a subclass of self.exc and value has attributes matching + self.attrs, raise ResourceDenied. Otherwise let the exception + propagate (if any).""" + if type_ is not None and issubclass(self.exc, type_): + for attr, attr_value in self.attrs.items(): + if not hasattr(value, attr): + break + if getattr(value, attr) != attr_value: + break + else: + raise ResourceDenied("an optional resource is not available") + + +def transient_internet(): + """Return a context manager that raises ResourceDenied when various issues + with the Internet connection manifest themselves as exceptions.""" + time_out = TransientResource(IOError, errno=errno.ETIMEDOUT) + socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET) + ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET) + return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset) + + +@contextlib.contextmanager +def captured_output(stream_name): + """Run the 'with' statement body using a StringIO object in place of a + specific attribute on the sys module. + Example use (with 'stream_name=stdout'):: + + with captured_stdout() as s: + print("hello") + assert s.getvalue() == "hello" + """ + import io + orig_stdout = getattr(sys, stream_name) + setattr(sys, stream_name, io.StringIO()) + try: + yield getattr(sys, stream_name) + finally: + setattr(sys, stream_name, orig_stdout) + +def captured_stdout(): + return captured_output("stdout") + + +#======================================================================= +# Decorator for running a function in a different locale, correctly resetting +# it afterwards. + +def run_with_locale(catstr, *locales): + def decorator(func): + def inner(*args, **kwds): + try: + import locale + category = getattr(locale, catstr) + orig_locale = locale.setlocale(category) + except AttributeError: + # if the test author gives us an invalid category string + raise + except: + # cannot retrieve original locale, so do nothing + locale = orig_locale = None + else: + for loc in locales: + try: + locale.setlocale(category, loc) + break + except: + pass + + # now run the function, resetting the locale on exceptions + try: + return func(*args, **kwds) + finally: + if locale and orig_locale: + locale.setlocale(category, orig_locale) + inner.__name__ = func.__name__ + inner.__doc__ = func.__doc__ + return inner + return decorator + +#======================================================================= +# Big-memory-test support. Separate from 'resources' because memory use +# should be configurable. + +# Some handy shorthands. Note that these are used for byte-limits as well +# as size-limits, in the various bigmem tests +_1M = 1024*1024 +_1G = 1024 * _1M +_2G = 2 * _1G + +MAX_Py_ssize_t = sys.maxsize + +def set_memlimit(limit): + import re + global max_memuse + sizes = { + 'k': 1024, + 'm': _1M, + 'g': _1G, + 't': 1024*_1G, + } + m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, + re.IGNORECASE | re.VERBOSE) + if m is None: + raise ValueError('Invalid memory limit %r' % (limit,)) + memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) + if memlimit > MAX_Py_ssize_t: + memlimit = MAX_Py_ssize_t + if memlimit < _2G - 1: + raise ValueError('Memory limit %r too low to be useful' % (limit,)) + max_memuse = memlimit + +def bigmemtest(minsize, memuse, overhead=5*_1M): + """Decorator for bigmem tests. + + 'minsize' is the minimum useful size for the test (in arbitrary, + test-interpreted units.) 'memuse' is the number of 'bytes per size' for + the test, or a good estimate of it. 'overhead' specifies fixed overhead, + independent of the testsize, and defaults to 5Mb. + + The decorator tries to guess a good value for 'size' and passes it to + the decorated test function. If minsize * memuse is more than the + allowed memory use (as defined by max_memuse), the test is skipped. + Otherwise, minsize is adjusted upward to use up to max_memuse. + """ + def decorator(f): + def wrapper(self): + if not max_memuse: + # If max_memuse is 0 (the default), + # we still want to run the tests with size set to a few kb, + # to make sure they work. We still want to avoid using + # too much memory, though, but we do that noisily. + maxsize = 5147 + self.failIf(maxsize * memuse + overhead > 20 * _1M) + else: + maxsize = int((max_memuse - overhead) / memuse) + if maxsize < minsize: + # Really ought to print 'test skipped' or something + if verbose: + sys.stderr.write("Skipping %s because of memory " + "constraint\n" % (f.__name__,)) + return + # Try to keep some breathing room in memory use + maxsize = max(maxsize - 50 * _1M, minsize) + return f(self, maxsize) + wrapper.minsize = minsize + wrapper.memuse = memuse + wrapper.overhead = overhead + return wrapper + return decorator + +def bigaddrspacetest(f): + """Decorator for tests that fill the address space.""" + def wrapper(self): + if max_memuse < MAX_Py_ssize_t: + if verbose: + sys.stderr.write("Skipping %s because of memory " + "constraint\n" % (f.__name__,)) + else: + return f(self) + return wrapper + +#======================================================================= +# unittest integration. + +class BasicTestRunner: + def run(self, test): + result = unittest.TestResult() + test(result) + return result + + +def _run_suite(suite): + """Run tests from a unittest.TestSuite-derived class.""" + if verbose: + runner = unittest.TextTestRunner(sys.stdout, verbosity=2) + else: + runner = BasicTestRunner() + + result = runner.run(suite) + if not result.wasSuccessful(): + if len(result.errors) == 1 and not result.failures: + err = result.errors[0][1] + elif len(result.failures) == 1 and not result.errors: + err = result.failures[0][1] + else: + err = "errors occurred; run in verbose mode for details" + raise TestFailed(err) + + +def run_unittest(*classes): + """Run tests from unittest.TestCase-derived classes.""" + valid_types = (unittest.TestSuite, unittest.TestCase) + suite = unittest.TestSuite() + for cls in classes: + if isinstance(cls, str): + if cls in sys.modules: + suite.addTest(unittest.findTestCases(sys.modules[cls])) + else: + raise ValueError("str arguments must be keys in sys.modules") + elif isinstance(cls, valid_types): + suite.addTest(cls) + else: + suite.addTest(unittest.makeSuite(cls)) + _run_suite(suite) + + +#======================================================================= +# doctest driver. + +def run_doctest(module, verbosity=None): + """Run doctest on the given module. Return (#failures, #tests). + + If optional argument verbosity is not specified (or is None), pass + support's belief about verbosity on to doctest. Else doctest's + usual behavior is used (it searches sys.argv for -v). + """ + + import doctest + + if verbosity is None: + verbosity = verbose + else: + verbosity = None + + # Direct doctest output (normally just errors) to real stdout; doctest + # output shouldn't be compared by regrtest. + save_stdout = sys.stdout + sys.stdout = get_original_stdout() + try: + f, t = doctest.testmod(module, verbose=verbosity) + if f: + raise TestFailed("%d of %d doctests failed" % (f, t)) + finally: + sys.stdout = save_stdout + if verbose: + print('doctest (%s) ... %d tests with zero failures' % + (module.__name__, t)) + return f, t + +#======================================================================= +# Threading support to prevent reporting refleaks when running regrtest.py -R + +def threading_setup(): + import threading + return len(threading._active), len(threading._limbo) + +def threading_cleanup(num_active, num_limbo): + import threading + import time + + _MAX_COUNT = 10 + count = 0 + while len(threading._active) != num_active and count < _MAX_COUNT: + count += 1 + time.sleep(0.1) + + count = 0 + while len(threading._limbo) != num_limbo and count < _MAX_COUNT: + count += 1 + time.sleep(0.1) + +def reap_children(): + """Use this function at the end of test_main() whenever sub-processes + are started. This will help ensure that no extra children (zombies) + stick around to hog resources and create problems when looking + for refleaks. + """ + + # Reap all our dead child processes so we don't leave zombies around. + # These hog resources and might be causing some of the buildbots to die. + if hasattr(os, 'waitpid'): + any_process = -1 + while True: + try: + # This will raise an exception on Windows. That's ok. + pid, status = os.waitpid(any_process, os.WNOHANG) + if pid == 0: + break + except: + break |