"""Supporting definitions for the Python regression tests.""" if __name__ != 'test.test_support': raise ImportError('test_support must be imported from the test package') import contextlib import errno import socket import sys import os import os.path import warnings import types import unittest class Error(Exception): """Base class for regression test exceptions.""" class TestFailed(Error): """Test failed.""" class TestSkipped(Error): """Test skipped. This can be raised to indicate that a test was deliberatly skipped, but not because a feature wasn't available. For example, if some resource can't be used, such as the network appears to be unavailable, this should be raised instead of TestFailed. """ class ResourceDenied(TestSkipped): """Test skipped because it requested a disallowed resource. This is raised when a test calls requires() for a resource that has not be enabled. It is used to distinguish between expected and unexpected skips. """ verbose = 1 # Flag set to 0 by regrtest.py use_resources = None # Flag set to [] by regrtest.py max_memuse = 0 # Disable bigmem tests (they will still be run with # small sizes, to make sure they work.) # _original_stdout is meant to hold stdout at the time regrtest began. # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. # The point is to have some flavor of stdout the user can actually see. 
_original_stdout = None
def record_original_stdout(stdout):
    """Store *stdout* so that get_original_stdout() can hand it back later."""
    global _original_stdout
    _original_stdout = stdout

def get_original_stdout():
    """Return the recorded stdout, or the current sys.stdout if none is set."""
    return _original_stdout or sys.stdout

def unload(name):
    """Remove *name* from sys.modules; a missing entry is silently ignored."""
    sys.modules.pop(name, None)

def unlink(filename):
    """Delete *filename*, ignoring any OSError (for example a missing file)."""
    try:
        os.unlink(filename)
    except OSError:
        pass

def forget(modname):
    '''"Forget" a module was ever imported by removing it from sys.modules and
    deleting any .pyc and .pyo files.'''
    unload(modname)
    for dirname in sys.path:
        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
        # The .pyo file is removed by its own unlink() call: a missing .pyc
        # must not prevent an existing .pyo from being deleted.
        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))

def is_resource_enabled(resource):
    """Test whether a resource is enabled.  Known resources are set by
    regrtest.py."""
    return use_resources is not None and resource in use_resources

def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    If the caller's module is __main__ the check is skipped entirely and the
    function simply returns.  Otherwise ResourceDenied is raised, with *msg*
    (or a default message naming the resource) when the resource has not been
    enabled.  Nothing useful is returned in either case.
    """
    # see if the caller's module is __main__ - if so, treat as if
    # the resource was set
    if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
        return
    if not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the `%s' resource not enabled" % resource
        raise ResourceDenied(msg)

def bind_port(sock, host='', preferred_port=54321):
    """Try to bind the sock to a port.  If we are running multiple
    tests and we don't try multiple ports, the test can fail.  This
    makes the test more robust.

    Returns the port that was bound.  Errors other than EADDRINUSE are
    re-raised; TestFailed is raised when every candidate port is in use.
    """
    # some random ports that hopefully no one is listening on.
    for port in [preferred_port, 9907, 10243, 32999]:
        try:
            sock.bind((host, port))
            return port
        except socket.error as e:
            # Only look at the first argument (the errno); do not assume the
            # exception carries exactly two arguments.
            err = e.args[0] if e.args else None
            if err != errno.EADDRINUSE:
                raise
            print(' WARNING: failed to listen on port %d, trying another' % port, file=sys.__stderr__)
    raise TestFailed('unable to find port to listen on')

FUZZ = 1e-6

def fcmp(x, y): # fuzzy comparison function
    """Compare x and y cmp()-style (-1, 0 or 1), treating floats as equal
    when they differ by at most FUZZ times the sum of their magnitudes.
    Tuples and lists of the same type are compared element by element."""
    if isinstance(x, float) or isinstance(y, float):
        try:
            fuzz = (abs(x) + abs(y)) * FUZZ
            if abs(x-y) <= fuzz:
                return 0
        except Exception:
            # Best effort only: operands that cannot be combined
            # arithmetically fall through to the plain comparison below.
            pass
    elif type(x) == type(y) and isinstance(x, (tuple, list)):
        for left, right in zip(x, y):
            outcome = fcmp(left, right)
            if outcome != 0:
                return outcome
        return (len(x) > len(y)) - (len(x) < len(y))
    return (x > y) - (x < y)

# The original probe was "try: str ... except NameError"; the builtin str
# always exists, so the flag is unconditionally true.
have_unicode = True

is_jython = sys.platform.startswith('java')

# Filename used for testing
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN = '$test'
elif os.name == 'riscos':
    TESTFN = 'testfile'
else:
    TESTFN = '@test'

    # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
    # TESTFN_UNICODE is a filename that can be encoded using the
    # file system encoding, but *not* with the default (ascii) encoding
    TESTFN_UNICODE = "@test-\xe0\xf2"
    TESTFN_ENCODING = sys.getfilesystemencoding()
    # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
    # able to be encoded by *either* the default or filesystem encoding.
    # This test really only makes sense on Windows NT platforms
    # which have special Unicode support in posixmodule.
    if (not hasattr(sys, "getwindowsversion") or
            sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME
        TESTFN_UNICODE_UNENCODEABLE = None
    else:
        # Japanese characters (I think - from bug 846133)
        TESTFN_UNICODE_UNENCODEABLE = "@test-\u5171\u6709\u3055\u308c\u308b"
        try:
            # XXX - Note - should be using TESTFN_ENCODING here - but for
            # Windows, "mbcs" currently always operates as if in
            # errors=ignore' mode - hence we get '?' characters rather than
            # the exception.  'Latin1' operates as we expect - ie, fails.
            # See [ 850997 ] mbcs encoding ignores errors
            TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
        except UnicodeEncodeError:
            pass
        else:
            print('WARNING: The filename %r CAN be encoded by the filesystem. '
                  'Unicode filename tests may not be effective'
                  % TESTFN_UNICODE_UNENCODEABLE)

# Make sure we can write to TESTFN, try in /tmp if we can't
fp = None
try:
    fp = open(TESTFN, 'w+')
except IOError:
    TMP_TESTFN = os.path.join('/tmp', TESTFN)
    try:
        fp = open(TMP_TESTFN, 'w+')
        TESTFN = TMP_TESTFN
        del TMP_TESTFN
    except IOError:
        print(('WARNING: tests will fail, unable to write to: %s or %s' %
               (TESTFN, TMP_TESTFN)))
if fp is not None:
    fp.close()
    unlink(TESTFN)
del fp

def findfile(file, here=__file__):
    """Try to find a file on sys.path and the working directory.  If it is not
    found the argument passed to the function is returned (this does not
    necessarily signal failure; could still be the legitimate path)."""
    if os.path.isabs(file):
        return file
    # The directory containing *here* is searched before sys.path.
    for dn in [os.path.dirname(here)] + sys.path:
        fn = os.path.join(dn, file)
        if os.path.exists(fn):
            return fn
    return file

def verify(condition, reason='test failed'):
    """Verify that condition is true. If not, raise TestFailed.

       The optional argument reason can be given to provide
       a better error text.
    """
    if not condition:
        raise TestFailed(reason)

def vereq(a, b):
    """Raise TestFailed if a == b is false.

    This is better than verify(a == b) because, in case of failure, the
    error message incorporates repr(a) and repr(b) so you can see the
    inputs.

    Note that "not (a == b)" isn't necessarily the same as "a != b"; the
    former is tested.
    """
    if not (a == b):
        raise TestFailed("%r == %r" % (a, b))

def sortdict(dict):
    "Like repr(dict), but in sorted order."
    items = sorted(dict.items())
    reprpairs = ["%r: %r" % pair for pair in items]
    withcommas = ", ".join(reprpairs)
    return "{%s}" % withcommas

def check_syntax_error(testcase, statement):
    """Fail *testcase* unless compiling *statement* raises SyntaxError."""
    try:
        compile(statement, '', 'exec')
    except SyntaxError:
        pass
    else:
        testcase.fail('Missing SyntaxError: "%s"' % statement)

def open_urlresource(url):
    """Return an open file for the last path component of *url*.

    Requires the 'urlfetch' resource.  A file of that name in the current or
    parent directory is opened if one exists; otherwise the URL is downloaded
    into the current directory first.
    """
    # Prefer the Python 3 module layout and fall back to the Python 2 names,
    # so the helper works whichever set the running interpreter provides.
    try:
        from urllib.parse import urlparse
        from urllib.request import urlretrieve
    except ImportError:
        from urlparse import urlparse
        from urllib import urlretrieve

    requires('urlfetch')
    filename = urlparse(url)[2].split('/')[-1] # '/': it's URL!

    for path in [os.path.curdir, os.path.pardir]:
        fn = os.path.join(path, filename)
        if os.path.exists(fn):
            return open(fn)

    print('\tfetching %s ...' % url, file=get_original_stdout())
    fn, _ = urlretrieve(url, filename)
    return open(fn)

@contextlib.contextmanager
def guard_warnings_filter():
    """Guard the warnings filter from being permanently changed."""
    original_filters = warnings.filters[:]
    try:
        yield
    finally:
        warnings.filters = original_filters

class WarningMessage(object):
    "Holds the result of the latest showwarning() call"
    def __init__(self):
        self.message = None
        self.category = None
        self.filename = None
        self.lineno = None

    def _showwarning(self, message, category, filename, lineno, file=None,
                     line=None):
        # 'line' is accepted (and ignored, like 'file') because the warnings
        # module passes it to a replaced showwarning().
        self.message = message
        self.category = category
        self.filename = filename
        self.lineno = lineno

@contextlib.contextmanager
def catch_warning():
    """
    Guard the warnings filter from being permanently changed and record the
    data of the last warning that has been issued.

    Use like this:

        with catch_warning() as w:
            warnings.warn("foo")
            assert str(w.message) == "foo"
    """
    warning = WarningMessage()
    original_filters = warnings.filters[:]
    original_showwarning = warnings.showwarning
    warnings.showwarning = warning._showwarning
    try:
        yield warning
    finally:
        warnings.showwarning = original_showwarning
        warnings.filters = original_filters

class EnvironmentVarGuard(object):

    """Class to help protect the environment variable properly.  Can be used as
    a context manager.

    The state each variable had when it was first touched through set() or
    unset() is what __exit__ restores, no matter how many times the variable
    is changed afterwards.
    """

    def __init__(self):
        self._environ = os.environ
        self._unset = set()     # variables that were absent when first touched
        self._reset = dict()    # variable -> value it had when first touched

    def _remember(self, envvar):
        """Record the pre-guard state of *envvar*, on first access only."""
        if envvar in self._reset or envvar in self._unset:
            return
        if envvar in self._environ:
            self._reset[envvar] = self._environ[envvar]
        else:
            self._unset.add(envvar)

    def set(self, envvar, value):
        """Set *envvar* to *value*, remembering its original state."""
        self._remember(envvar)
        self._environ[envvar] = value

    def unset(self, envvar):
        """Remove *envvar* if present, remembering its original state."""
        self._remember(envvar)
        if envvar in self._environ:
            del self._environ[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        """Restore every touched variable; exceptions are never suppressed."""
        for envvar, value in self._reset.items():
            self._environ[envvar] = value
        for envvar in self._unset:
            # The variable may already have been removed again by unset().
            if envvar in self._environ:
                del self._environ[envvar]
        # Forget the bookkeeping so the guard can be used again.
        self._reset.clear()
        self._unset.clear()

class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes."""

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # Same direction as an 'except self.exc:' clause: the raised type must
        # be self.exc or one of its subclasses.
        if type_ is not None and issubclass(type_, self.exc):
            for attr, attr_value in self.attrs.items():
                if not hasattr(value, attr):
                    break
                if getattr(value, attr) != attr_value:
                    break
            else:
                raise ResourceDenied("an optional resource is not available")

@contextlib.contextmanager
def transient_internet():
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions."""
    time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
    socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
    ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
    # Plain nested 'with' blocks replace contextlib.nested(), which is not
    # available on every interpreter.  The guards are exited innermost first
    # and the three guards are yielded as a tuple, as nested() did.
    with time_out:
        with socket_peer_reset:
            with ioerror_peer_reset:
                yield (time_out, socket_peer_reset, ioerror_peer_reset)

#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.
def run_with_locale(catstr, *locales):
    """Decorator factory: run the decorated function under another locale.

    *catstr* names a category attribute of the locale module (for example
    'LC_ALL').  The first entry of *locales* that setlocale() accepts is
    activated for the duration of the call and the previous setting is
    restored afterwards.  An invalid category name raises AttributeError when
    the decorated function is called; if the current locale cannot be
    retrieved the function runs without any locale change.
    """
    def decorator(func):
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except Exception:
                # cannot retrieve original locale, so do nothing
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except Exception:
                        # This locale is not usable here; try the next one.
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        inner.__name__ = func.__name__
        inner.__doc__ = func.__doc__
        return inner
    return decorator

#=======================================================================
# Big-memory-test support. Separate from 'resources' because memory use should be configurable.

# Some handy shorthands. Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G

# Hack to get at the maximum value an internal index can take.
class _Dummy:
    def __getslice__(self, i, j):
        return j

# sys.maxsize is the direct way to obtain the value.  The slicing hack only
# works on interpreters that still call __getslice__, so it is kept purely as
# a fallback for interpreters without sys.maxsize.
try:
    MAX_Py_ssize_t = sys.maxsize
except AttributeError:
    MAX_Py_ssize_t = _Dummy()[:]

def set_memlimit(limit):
    """Set the global max_memuse from a string such as '2.5G' or '4Gb'.

    The string is a number followed by K, M, G or T (case-insensitive) and an
    optional 'b'.  The value is capped at MAX_Py_ssize_t.  ValueError is
    raised for a malformed string or for a limit below 2G - 1 bytes.
    """
    import re
    global max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
    if memlimit > MAX_Py_ssize_t:
        memlimit = MAX_Py_ssize_t
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = memlimit

def bigmemtest(minsize, memuse, overhead=5*_1M):
    """Decorator for bigmem tests.

    'minsize' is the minimum useful size for the test (in arbitrary,
    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
    independent of the testsize, and defaults to 5Mb.

    The decorator tries to guess a good value for 'size' and passes it to
    the decorated test function. If minsize * memuse is more than the
    allowed memory use (as defined by max_memuse), the test is skipped.
    Otherwise, minsize is adjusted upward to use up to max_memuse.
    """
    def decorator(f):
        def wrapper(self):
            if not max_memuse:
                # If max_memuse is 0 (the default),
                # we still want to run the tests with size set to a few kb,
                # to make sure they work. We still want to avoid using
                # too much memory, though, but we do that noisily.
                maxsize = 5147
                # assertFalse is the supported spelling of the old failIf
                # alias.
                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
            else:
                maxsize = int((max_memuse - overhead) / memuse)
                if maxsize < minsize:
                    # Really ought to print 'test skipped' or something
                    if verbose:
                        sys.stderr.write("Skipping %s because of memory "
                                         "constraint\n" % (f.__name__,))
                    return
                # Try to keep some breathing room in memory use
                maxsize = max(maxsize - 50 * _1M, minsize)
            return f(self, maxsize)
        # Expose the parameters on the wrapper for inspection.
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator

def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            # Not allowed to use the whole address space: skip (noisily when
            # verbose) instead of running the test.
            if verbose:
                sys.stderr.write("Skipping %s because of memory "
                                 "constraint\n" % (f.__name__,))
        else:
            return f(self)
    return wrapper

#=======================================================================
# unittest integration.
class BasicTestRunner:
    """Minimal runner: call the test with a fresh TestResult and return it."""
    def run(self, test):
        result = unittest.TestResult()
        test(result)
        return result


def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class.

    Uses a verbose TextTestRunner on sys.stdout when the module-level
    'verbose' flag is set, otherwise the silent BasicTestRunner.  Raises
    TestFailed if the run was not successful: with the recorded traceback
    text when there is exactly one error or one failure, with a generic
    message otherwise.
    """
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
    else:
        runner = BasicTestRunner()

    result = runner.run(suite)
    if not result.wasSuccessful():
        if len(result.errors) == 1 and not result.failures:
            err = result.errors[0][1]
        elif len(result.failures) == 1 and not result.errors:
            err = result.failures[0][1]
        else:
            msg = "errors occurred; run in verbose mode for details"
            raise TestFailed(msg)
        raise TestFailed(err)


def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be a TestCase class, a TestCase or TestSuite instance,
    or the name of a module present in sys.modules (all test cases found in
    that module are run).  A string that is not a key of sys.modules raises
    ValueError; failures are reported by raising TestFailed.
    """
    # 'basestring' only exists on older interpreters; use str elsewhere.
    try:
        string_types = basestring
    except NameError:
        string_types = str

    valid_types = (unittest.TestSuite, unittest.TestCase)
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, string_types):
            if cls in sys.modules:
                # Same result as the old unittest.findTestCases() helper.
                suite.addTest(loader.loadTestsFromModule(sys.modules[cls]))
            else:
                raise ValueError("str arguments must be keys in sys.modules")
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            # Same result as the old unittest.makeSuite() helper.
            suite.addTest(loader.loadTestsFromTestCase(cls))
    _run_suite(suite)


#=======================================================================
# doctest driver.

def run_doctest(module, verbosity=None):
    """Run doctest on the given module.  Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    test_support's belief about verbosity on to doctest.  Else doctest's
    usual behavior is used (it searches sys.argv for -v).
    """

    import doctest

    if verbosity is None:
        verbosity = verbose
    else:
        # Any explicit value means "let doctest decide for itself".
        verbosity = None

    # Direct doctest output (normally just errors) to real stdout; doctest
    # output shouldn't be compared by regrtest.
    save_stdout = sys.stdout
    sys.stdout = get_original_stdout()
    try:
        f, t = doctest.testmod(module, verbose=verbosity)
        if f:
            raise TestFailed("%d of %d doctests failed" % (f, t))
    finally:
        sys.stdout = save_stdout
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' % (module.__name__, t))
    return f, t

#=======================================================================
# Threading support to prevent reporting refleaks when running regrtest.py -R

def threading_setup():
    """Return the current sizes of threading._active and threading._limbo."""
    import threading
    return len(threading._active), len(threading._limbo)

def threading_cleanup(num_active, num_limbo):
    """Wait for the threading bookkeeping to return to the given sizes.

    Each of threading._active and threading._limbo is polled at most
    _MAX_COUNT times, sleeping 0.1 seconds between polls, and the function
    then returns whether or not the sizes matched.
    """
    import threading
    import time

    _MAX_COUNT = 10
    count = 0
    while len(threading._active) != num_active and count < _MAX_COUNT:
        count += 1
        time.sleep(0.1)

    count = 0
    while len(threading._limbo) != num_limbo and count < _MAX_COUNT:
        count += 1
        time.sleep(0.1)

def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.
    """

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    if hasattr(os, 'waitpid'):
        any_process = -1
        while True:
            try:
                # This will raise an exception on Windows.  That's ok.
                pid, status = os.waitpid(any_process, os.WNOHANG)
            except Exception:
                # No children left, or waiting is unsupported here.
                break
            if pid == 0:
                # Children exist but none has exited yet.
                break