diff options
author | Benjamin Peterson <benjamin@python.org> | 2008-05-20 21:35:26 (GMT) |
---|---|---|
committer | Benjamin Peterson <benjamin@python.org> | 2008-05-20 21:35:26 (GMT) |
commit | ee8712cda46338d223509cc5751fd36509ad3860 (patch) | |
tree | bb9d363b4276566415457980472001c7e3ec2bed /Lib/test/test_support.py | |
parent | 6a654814ea3f3a918935762ffdcd33ae98e00278 (diff) | |
download | cpython-ee8712cda46338d223509cc5751fd36509ad3860.zip cpython-ee8712cda46338d223509cc5751fd36509ad3860.tar.gz cpython-ee8712cda46338d223509cc5751fd36509ad3860.tar.bz2 |
#2621 rename test.test_support to test.support
Diffstat (limited to 'Lib/test/test_support.py')
-rw-r--r-- | Lib/test/test_support.py | 769 |
1 files changed, 0 insertions, 769 deletions
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py deleted file mode 100644 index c4084bb..0000000 --- a/Lib/test/test_support.py +++ /dev/null @@ -1,769 +0,0 @@ -"""Supporting definitions for the Python regression tests.""" - -if __name__ != 'test.test_support': - raise ImportError('test_support must be imported from the test package') - -import contextlib -import errno -import socket -import sys -import os -import os.path -import shutil -import warnings -import unittest - -class Error(Exception): - """Base class for regression test exceptions.""" - -class TestFailed(Error): - """Test failed.""" - -class TestSkipped(Error): - """Test skipped. - - This can be raised to indicate that a test was deliberatly - skipped, but not because a feature wasn't available. For - example, if some resource can't be used, such as the network - appears to be unavailable, this should be raised instead of - TestFailed. - """ - -class ResourceDenied(TestSkipped): - """Test skipped because it requested a disallowed resource. - - This is raised when a test calls requires() for a resource that - has not be enabled. It is used to distinguish between expected - and unexpected skips. - """ - -def import_module(name, deprecated=False): - """Import the module to be tested, raising TestSkipped if it is not - available.""" - with catch_warning(record=False): - if deprecated: - warnings.filterwarnings("ignore", ".+ (module|package)", - DeprecationWarning) - try: - module = __import__(name, level=0) - except ImportError: - raise TestSkipped("No module named " + name) - else: - return module - -verbose = 1 # Flag set to 0 by regrtest.py -use_resources = None # Flag set to [] by regrtest.py -max_memuse = 0 # Disable bigmem tests (they will still be run with - # small sizes, to make sure they work.) - -# _original_stdout is meant to hold stdout at the time regrtest began. -# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. -# The point is to have some flavor of stdout the user can actually see. -_original_stdout = None -def record_original_stdout(stdout): - global _original_stdout - _original_stdout = stdout - -def get_original_stdout(): - return _original_stdout or sys.stdout - -def unload(name): - try: - del sys.modules[name] - except KeyError: - pass - -def unlink(filename): - try: - os.unlink(filename) - except OSError: - pass - -def rmtree(path): - try: - shutil.rmtree(path) - except OSError as e: - # Unix returns ENOENT, Windows returns ESRCH. - if e.errno not in (errno.ENOENT, errno.ESRCH): - raise - -def forget(modname): - '''"Forget" a module was ever imported by removing it from sys.modules and - deleting any .pyc and .pyo files.''' - unload(modname) - for dirname in sys.path: - unlink(os.path.join(dirname, modname + '.pyc')) - # Deleting the .pyo file cannot be within the 'try' for the .pyc since - # the chance exists that there is no .pyc (and thus the 'try' statement - # is exited) but there is a .pyo file. - unlink(os.path.join(dirname, modname + '.pyo')) - -def is_resource_enabled(resource): - """Test whether a resource is enabled. Known resources are set by - regrtest.py.""" - return use_resources is not None and resource in use_resources - -def requires(resource, msg=None): - """Raise ResourceDenied if the specified resource is not available. - - If the caller's module is __main__ then automatically return True. The - possibility of False being returned occurs when regrtest.py is executing.""" - # see if the caller's module is __main__ - if so, treat as if - # the resource was set - if sys._getframe().f_back.f_globals.get("__name__") == "__main__": - return - if not is_resource_enabled(resource): - if msg is None: - msg = "Use of the `%s' resource not enabled" % resource - raise ResourceDenied(msg) - -HOST = 'localhost' - -def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): - """Returns an unused port that should be suitable for binding. This is - achieved by creating a temporary socket with the same family and type as - the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to - the specified host address (defaults to 0.0.0.0) with the port set to 0, - eliciting an unused ephemeral port from the OS. The temporary socket is - then closed and deleted, and the ephemeral port is returned. - - Either this method or bind_port() should be used for any tests where a - server socket needs to be bound to a particular port for the duration of - the test. Which one to use depends on whether the calling code is creating - a python socket, or if an unused port needs to be provided in a constructor - or passed to an external program (i.e. the -accept argument to openssl's - s_server mode). Always prefer bind_port() over find_unused_port() where - possible. Hard coded ports should *NEVER* be used. As soon as a server - socket is bound to a hard coded port, the ability to run multiple instances - of the test simultaneously on the same host is compromised, which makes the - test a ticking time bomb in a buildbot environment. On Unix buildbots, this - may simply manifest as a failed test, which can be recovered from without - intervention in most cases, but on Windows, the entire python process can - completely and utterly wedge, requiring someone to log in to the buildbot - and manually kill the affected process. - - (This is easy to reproduce on Windows, unfortunately, and can be traced to - the SO_REUSEADDR socket option having different semantics on Windows versus - Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, - listen and then accept connections on identical host/ports. An EADDRINUSE - socket.error will be raised at some point (depending on the platform and - the order bind and listen were called on each socket). - - However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE - will ever be raised when attempting to bind two identical host/ports. When - accept() is called on each socket, the second caller's process will steal - the port from the first caller, leaving them both in an awkwardly wedged - state where they'll no longer respond to any signals or graceful kills, and - must be forcibly killed via OpenProcess()/TerminateProcess(). - - The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option - instead of SO_REUSEADDR, which effectively affords the same semantics as - SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open - Source world compared to Windows ones, this is a common mistake. A quick - look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when - openssl.exe is called with the 's_server' option, for example. See - http://bugs.python.org/issue2550 for more info. The following site also - has a very thorough description about the implications of both REUSEADDR - and EXCLUSIVEADDRUSE on Windows: - http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) - - XXX: although this approach is a vast improvement on previous attempts to - elicit unused ports, it rests heavily on the assumption that the ephemeral - port returned to us by the OS won't immediately be dished back out to some - other process when we close and delete our temporary socket but before our - calling code has a chance to bind the returned port. We can deal with this - issue if/when we come across it. - """ - - tempsock = socket.socket(family, socktype) - port = bind_port(tempsock) - tempsock.close() - del tempsock - return port - -def bind_port(sock, host=HOST): - """Bind the socket to a free port and return the port number. Relies on - ephemeral ports in order to ensure we are using an unbound port. This is - important as many tests may be running simultaneously, especially in a - buildbot environment. This method raises an exception if the sock.family - is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR - or SO_REUSEPORT set on it. Tests should *never* set these socket options - for TCP/IP sockets. The only case for setting these options is testing - multicasting via multiple UDP sockets. - - Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. - on Windows), it will be set on the socket. This will prevent anyone else - from bind()'ing to our host/port for the duration of the test. - """ - - if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: - if hasattr(socket, 'SO_REUSEADDR'): - if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: - raise TestFailed("tests should never set the SO_REUSEADDR " \ - "socket option on TCP/IP sockets!") - if hasattr(socket, 'SO_REUSEPORT'): - if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: - raise TestFailed("tests should never set the SO_REUSEPORT " \ - "socket option on TCP/IP sockets!") - if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): - sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) - - sock.bind((host, 0)) - port = sock.getsockname()[1] - return port - -FUZZ = 1e-6 - -def fcmp(x, y): # fuzzy comparison function - if isinstance(x, float) or isinstance(y, float): - try: - fuzz = (abs(x) + abs(y)) * FUZZ - if abs(x-y) <= fuzz: - return 0 - except: - pass - elif type(x) == type(y) and isinstance(x, (tuple, list)): - for i in range(min(len(x), len(y))): - outcome = fcmp(x[i], y[i]) - if outcome != 0: - return outcome - return (len(x) > len(y)) - (len(x) < len(y)) - return (x > y) - (x < y) - -try: - str - have_unicode = True -except NameError: - have_unicode = False - -is_jython = sys.platform.startswith('java') - -# Filename used for testing -if os.name == 'java': - # Jython disallows @ in module names - TESTFN = '$test' -else: - TESTFN = '@test' - - # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding() - # TESTFN_UNICODE is a filename that can be encoded using the - # file system encoding, but *not* with the default (ascii) encoding - TESTFN_UNICODE = "@test-\xe0\xf2" - TESTFN_ENCODING = sys.getfilesystemencoding() - # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be - # able to be encoded by *either* the default or filesystem encoding. - # This test really only makes sense on Windows NT platforms - # which have special Unicode support in posixmodule. - if (not hasattr(sys, "getwindowsversion") or - sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME - TESTFN_UNICODE_UNENCODEABLE = None - else: - # Japanese characters (I think - from bug 846133) - TESTFN_UNICODE_UNENCODEABLE = "@test-\u5171\u6709\u3055\u308c\u308b" - try: - # XXX - Note - should be using TESTFN_ENCODING here - but for - # Windows, "mbcs" currently always operates as if in - # errors=ignore' mode - hence we get '?' characters rather than - # the exception. 'Latin1' operates as we expect - ie, fails. - # See [ 850997 ] mbcs encoding ignores errors - TESTFN_UNICODE_UNENCODEABLE.encode("Latin1") - except UnicodeEncodeError: - pass - else: - print('WARNING: The filename %r CAN be encoded by the filesystem. ' - 'Unicode filename tests may not be effective' - % TESTFN_UNICODE_UNENCODEABLE) - -# Make sure we can write to TESTFN, try in /tmp if we can't -fp = None -try: - fp = open(TESTFN, 'w+') -except IOError: - TMP_TESTFN = os.path.join('/tmp', TESTFN) - try: - fp = open(TMP_TESTFN, 'w+') - TESTFN = TMP_TESTFN - del TMP_TESTFN - except IOError: - print(('WARNING: tests will fail, unable to write to: %s or %s' % - (TESTFN, TMP_TESTFN))) -if fp is not None: - fp.close() - unlink(TESTFN) -del fp - -def findfile(file, here=__file__): - """Try to find a file on sys.path and the working directory. If it is not - found the argument passed to the function is returned (this does not - necessarily signal failure; could still be the legitimate path).""" - if os.path.isabs(file): - return file - path = sys.path - path = [os.path.dirname(here)] + path - for dn in path: - fn = os.path.join(dn, file) - if os.path.exists(fn): return fn - return file - -def verify(condition, reason='test failed'): - """Verify that condition is true. If not, raise TestFailed. - - The optional argument reason can be given to provide - a better error text. - """ - - if not condition: - raise TestFailed(reason) - -def vereq(a, b): - """Raise TestFailed if a == b is false. - - This is better than verify(a == b) because, in case of failure, the - error message incorporates repr(a) and repr(b) so you can see the - inputs. - - Note that "not (a == b)" isn't necessarily the same as "a != b"; the - former is tested. - """ - - if not (a == b): - raise TestFailed("%r == %r" % (a, b)) - -def sortdict(dict): - "Like repr(dict), but in sorted order." - items = sorted(dict.items()) - reprpairs = ["%r: %r" % pair for pair in items] - withcommas = ", ".join(reprpairs) - return "{%s}" % withcommas - -def check_syntax_error(testcase, statement): - try: - compile(statement, '<test string>', 'exec') - except SyntaxError: - pass - else: - testcase.fail('Missing SyntaxError: "%s"' % statement) - -def open_urlresource(url, *args, **kw): - import urllib, urlparse - - requires('urlfetch') - filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL! - - for path in [os.path.curdir, os.path.pardir]: - fn = os.path.join(path, filename) - if os.path.exists(fn): - return open(fn, *args, **kw) - - print('\tfetching %s ...' % url, file=get_original_stdout()) - fn, _ = urllib.urlretrieve(url, filename) - return open(fn, *args, **kw) - - -class WarningMessage(object): - "Holds the result of the latest showwarning() call" - def __init__(self): - self.message = None - self.category = None - self.filename = None - self.lineno = None - - def _showwarning(self, message, category, filename, lineno, file=None, - line=None): - self.message = message - self.category = category - self.filename = filename - self.lineno = lineno - self.line = line - - def reset(self): - self._showwarning(*((None,)*6)) - - def __str__(self): - return ("{message : %r, category : %r, filename : %r, lineno : %s, " - "line : %r}" % (self.message, - self.category.__name__ if self.category else None, - self.filename, self.lineno, self.line)) - - -@contextlib.contextmanager -def catch_warning(module=warnings, record=True): - """ - Guard the warnings filter from being permanently changed and record the - data of the last warning that has been issued. - - Use like this: - - with catch_warning() as w: - warnings.warn("foo") - assert str(w.message) == "foo" - """ - original_filters = module.filters[:] - original_showwarning = module.showwarning - if record: - warning_obj = WarningMessage() - module.showwarning = warning_obj._showwarning - try: - yield warning_obj if record else None - finally: - module.showwarning = original_showwarning - module.filters = original_filters - - -class CleanImport(object): - """Context manager to force import to return a new module reference. - - This is useful for testing module-level behaviours, such as - the emission of a DepreciationWarning on import. - - Use like this: - - with CleanImport("foo"): - __import__("foo") # new reference - """ - - def __init__(self, *module_names): - self.original_modules = sys.modules.copy() - for module_name in module_names: - if module_name in sys.modules: - module = sys.modules[module_name] - # It is possible that module_name is just an alias for - # another module (e.g. stub for modules renamed in 3.x). - # In that case, we also need delete the real module to clear - # the import cache. - if module.__name__ != module_name: - del sys.modules[module.__name__] - del sys.modules[module_name] - - def __enter__(self): - return self - - def __exit__(self, *ignore_exc): - sys.modules.update(self.original_modules) - - -class EnvironmentVarGuard(object): - - """Class to help protect the environment variable properly. Can be used as - a context manager.""" - - def __init__(self): - self._environ = os.environ - self._unset = set() - self._reset = dict() - - def set(self, envvar, value): - if envvar not in self._environ: - self._unset.add(envvar) - else: - self._reset[envvar] = self._environ[envvar] - self._environ[envvar] = value - - def unset(self, envvar): - if envvar in self._environ: - self._reset[envvar] = self._environ[envvar] - del self._environ[envvar] - - def __enter__(self): - return self - - def __exit__(self, *ignore_exc): - for envvar, value in self._reset.items(): - self._environ[envvar] = value - for unset in self._unset: - del self._environ[unset] - -class TransientResource(object): - - """Raise ResourceDenied if an exception is raised while the context manager - is in effect that matches the specified exception and attributes.""" - - def __init__(self, exc, **kwargs): - self.exc = exc - self.attrs = kwargs - - def __enter__(self): - return self - - def __exit__(self, type_=None, value=None, traceback=None): - """If type_ is a subclass of self.exc and value has attributes matching - self.attrs, raise ResourceDenied. Otherwise let the exception - propagate (if any).""" - if type_ is not None and issubclass(self.exc, type_): - for attr, attr_value in self.attrs.items(): - if not hasattr(value, attr): - break - if getattr(value, attr) != attr_value: - break - else: - raise ResourceDenied("an optional resource is not available") - - -def transient_internet(): - """Return a context manager that raises ResourceDenied when various issues - with the Internet connection manifest themselves as exceptions.""" - time_out = TransientResource(IOError, errno=errno.ETIMEDOUT) - socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET) - ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET) - return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset) - - -@contextlib.contextmanager -def captured_output(stream_name): - """Run the 'with' statement body using a StringIO object in place of a - specific attribute on the sys module. - Example use (with 'stream_name=stdout'):: - - with captured_stdout() as s: - print("hello") - assert s.getvalue() == "hello" - """ - import io - orig_stdout = getattr(sys, stream_name) - setattr(sys, stream_name, io.StringIO()) - try: - yield getattr(sys, stream_name) - finally: - setattr(sys, stream_name, orig_stdout) - -def captured_stdout(): - return captured_output("stdout") - - -#======================================================================= -# Decorator for running a function in a different locale, correctly resetting -# it afterwards. - -def run_with_locale(catstr, *locales): - def decorator(func): - def inner(*args, **kwds): - try: - import locale - category = getattr(locale, catstr) - orig_locale = locale.setlocale(category) - except AttributeError: - # if the test author gives us an invalid category string - raise - except: - # cannot retrieve original locale, so do nothing - locale = orig_locale = None - else: - for loc in locales: - try: - locale.setlocale(category, loc) - break - except: - pass - - # now run the function, resetting the locale on exceptions - try: - return func(*args, **kwds) - finally: - if locale and orig_locale: - locale.setlocale(category, orig_locale) - inner.__name__ = func.__name__ - inner.__doc__ = func.__doc__ - return inner - return decorator - -#======================================================================= -# Big-memory-test support. Separate from 'resources' because memory use -# should be configurable. - -# Some handy shorthands. Note that these are used for byte-limits as well -# as size-limits, in the various bigmem tests -_1M = 1024*1024 -_1G = 1024 * _1M -_2G = 2 * _1G - -MAX_Py_ssize_t = sys.maxsize - -def set_memlimit(limit): - import re - global max_memuse - sizes = { - 'k': 1024, - 'm': _1M, - 'g': _1G, - 't': 1024*_1G, - } - m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, - re.IGNORECASE | re.VERBOSE) - if m is None: - raise ValueError('Invalid memory limit %r' % (limit,)) - memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) - if memlimit > MAX_Py_ssize_t: - memlimit = MAX_Py_ssize_t - if memlimit < _2G - 1: - raise ValueError('Memory limit %r too low to be useful' % (limit,)) - max_memuse = memlimit - -def bigmemtest(minsize, memuse, overhead=5*_1M): - """Decorator for bigmem tests. - - 'minsize' is the minimum useful size for the test (in arbitrary, - test-interpreted units.) 'memuse' is the number of 'bytes per size' for - the test, or a good estimate of it. 'overhead' specifies fixed overhead, - independent of the testsize, and defaults to 5Mb. - - The decorator tries to guess a good value for 'size' and passes it to - the decorated test function. If minsize * memuse is more than the - allowed memory use (as defined by max_memuse), the test is skipped. - Otherwise, minsize is adjusted upward to use up to max_memuse. - """ - def decorator(f): - def wrapper(self): - if not max_memuse: - # If max_memuse is 0 (the default), - # we still want to run the tests with size set to a few kb, - # to make sure they work. We still want to avoid using - # too much memory, though, but we do that noisily. - maxsize = 5147 - self.failIf(maxsize * memuse + overhead > 20 * _1M) - else: - maxsize = int((max_memuse - overhead) / memuse) - if maxsize < minsize: - # Really ought to print 'test skipped' or something - if verbose: - sys.stderr.write("Skipping %s because of memory " - "constraint\n" % (f.__name__,)) - return - # Try to keep some breathing room in memory use - maxsize = max(maxsize - 50 * _1M, minsize) - return f(self, maxsize) - wrapper.minsize = minsize - wrapper.memuse = memuse - wrapper.overhead = overhead - return wrapper - return decorator - -def bigaddrspacetest(f): - """Decorator for tests that fill the address space.""" - def wrapper(self): - if max_memuse < MAX_Py_ssize_t: - if verbose: - sys.stderr.write("Skipping %s because of memory " - "constraint\n" % (f.__name__,)) - else: - return f(self) - return wrapper - -#======================================================================= -# unittest integration. - -class BasicTestRunner: - def run(self, test): - result = unittest.TestResult() - test(result) - return result - - -def _run_suite(suite): - """Run tests from a unittest.TestSuite-derived class.""" - if verbose: - runner = unittest.TextTestRunner(sys.stdout, verbosity=2) - else: - runner = BasicTestRunner() - - result = runner.run(suite) - if not result.wasSuccessful(): - if len(result.errors) == 1 and not result.failures: - err = result.errors[0][1] - elif len(result.failures) == 1 and not result.errors: - err = result.failures[0][1] - else: - err = "errors occurred; run in verbose mode for details" - raise TestFailed(err) - - -def run_unittest(*classes): - """Run tests from unittest.TestCase-derived classes.""" - valid_types = (unittest.TestSuite, unittest.TestCase) - suite = unittest.TestSuite() - for cls in classes: - if isinstance(cls, str): - if cls in sys.modules: - suite.addTest(unittest.findTestCases(sys.modules[cls])) - else: - raise ValueError("str arguments must be keys in sys.modules") - elif isinstance(cls, valid_types): - suite.addTest(cls) - else: - suite.addTest(unittest.makeSuite(cls)) - _run_suite(suite) - - -#======================================================================= -# doctest driver. - -def run_doctest(module, verbosity=None): - """Run doctest on the given module. Return (#failures, #tests). - - If optional argument verbosity is not specified (or is None), pass - test_support's belief about verbosity on to doctest. Else doctest's - usual behavior is used (it searches sys.argv for -v). - """ - - import doctest - - if verbosity is None: - verbosity = verbose - else: - verbosity = None - - # Direct doctest output (normally just errors) to real stdout; doctest - # output shouldn't be compared by regrtest. - save_stdout = sys.stdout - sys.stdout = get_original_stdout() - try: - f, t = doctest.testmod(module, verbose=verbosity) - if f: - raise TestFailed("%d of %d doctests failed" % (f, t)) - finally: - sys.stdout = save_stdout - if verbose: - print('doctest (%s) ... %d tests with zero failures' % - (module.__name__, t)) - return f, t - -#======================================================================= -# Threading support to prevent reporting refleaks when running regrtest.py -R - -def threading_setup(): - import threading - return len(threading._active), len(threading._limbo) - -def threading_cleanup(num_active, num_limbo): - import threading - import time - - _MAX_COUNT = 10 - count = 0 - while len(threading._active) != num_active and count < _MAX_COUNT: - count += 1 - time.sleep(0.1) - - count = 0 - while len(threading._limbo) != num_limbo and count < _MAX_COUNT: - count += 1 - time.sleep(0.1) - -def reap_children(): - """Use this function at the end of test_main() whenever sub-processes - are started. This will help ensure that no extra children (zombies) - stick around to hog resources and create problems when looking - for refleaks. - """ - - # Reap all our dead child processes so we don't leave zombies around. - # These hog resources and might be causing some of the buildbots to die. - if hasattr(os, 'waitpid'): - any_process = -1 - while True: - try: - # This will raise an exception on Windows. That's ok. - pid, status = os.waitpid(any_process, os.WNOHANG) - if pid == 0: - break - except: - break |