diff options
Diffstat (limited to 'Lib/test/test_support.py')
-rw-r--r-- | Lib/test/test_support.py | 125 |
1 files changed, 94 insertions, 31 deletions
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index 6fbb3cc..1ff0e4d 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -1,11 +1,17 @@ """Supporting definitions for the Python regression tests.""" if __name__ != 'test.test_support': - raise ImportError, 'test_support must be imported from the test package' + raise ImportError('test_support must be imported from the test package') -from contextlib import contextmanager +import contextlib +import errno +import socket import sys +import os +import os.path import warnings +import types +import unittest class Error(Exception): """Base class for regression test exceptions.""" @@ -54,7 +60,6 @@ def unload(name): pass def unlink(filename): - import os try: os.unlink(filename) except OSError: @@ -64,7 +69,6 @@ def forget(modname): '''"Forget" a module was ever imported by removing it from sys.modules and deleting any .pyc and .pyo files.''' unload(modname) - import os for dirname in sys.path: unlink(os.path.join(dirname, modname + os.extsep + 'pyc')) # Deleting the .pyo file cannot be within the 'try' for the .pyc since @@ -96,7 +100,6 @@ def bind_port(sock, host='', preferred_port=54321): tests and we don't try multiple ports, the test can fails. This makes the test more robust.""" - import socket, errno # some random ports that hopefully no one is listening on. for port in [preferred_port, 9907, 10243, 32999]: try: @@ -107,7 +110,7 @@ def bind_port(sock, host='', preferred_port=54321): if err != errno.EADDRINUSE: raise print(' WARNING: failed to listen on port %d, trying another' % port, file=sys.__stderr__) - raise TestFailed, 'unable to find port to listen on' + raise TestFailed('unable to find port to listen on') FUZZ = 1e-6 @@ -135,7 +138,6 @@ except NameError: is_jython = sys.platform.startswith('java') -import os # Filename used for testing if os.name == 'java': # Jython disallows @ in module names @@ -197,13 +199,12 @@ except IOError: if fp is not None: fp.close() unlink(TESTFN) -del os, fp +del fp def findfile(file, here=__file__): """Try to find a file on sys.path and the working directory. If it is not found the argument passed to the function is returned (this does not necessarily signal failure; could still be the legitimate path).""" - import os if os.path.isabs(file): return file path = sys.path @@ -235,7 +236,7 @@ def vereq(a, b): """ if not (a == b): - raise TestFailed, "%r == %r" % (a, b) + raise TestFailed("%r == %r" % (a, b)) def sortdict(dict): "Like repr(dict), but in sorted order." @@ -254,7 +255,6 @@ def check_syntax_error(testcase, statement): def open_urlresource(url): import urllib, urlparse - import os.path filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL! @@ -268,7 +268,7 @@ def open_urlresource(url): fn, _ = urllib.urlretrieve(url, filename) return open(fn) -@contextmanager +@contextlib.contextmanager def guard_warnings_filter(): """Guard the warnings filter from being permanently changed.""" original_filters = warnings.filters[:] @@ -277,14 +277,49 @@ def guard_warnings_filter(): finally: warnings.filters = original_filters +class WarningMessage(object): + "Holds the result of the latest showwarning() call" + def __init__(self): + self.message = None + self.category = None + self.filename = None + self.lineno = None + + def _showwarning(self, message, category, filename, lineno, file=None): + self.message = message + self.category = category + self.filename = filename + self.lineno = lineno + +@contextlib.contextmanager +def catch_warning(): + """ + Guard the warnings filter from being permanently changed and record the + data of the last warning that has been issued. + + Use like this: + + with catch_warning as w: + warnings.warn("foo") + assert str(w.message) == "foo" + """ + warning = WarningMessage() + original_filters = warnings.filters[:] + original_showwarning = warnings.showwarning + warnings.showwarning = warning._showwarning + try: + yield warning + finally: + warnings.showwarning = original_showwarning + warnings.filters = original_filters + class EnvironmentVarGuard(object): """Class to help protect the environment variable properly. Can be used as a context manager.""" def __init__(self): - from os import environ - self._environ = environ + self._environ = os.environ self._unset = set() self._reset = dict() @@ -309,6 +344,40 @@ class EnvironmentVarGuard(object): for unset in self._unset: del self._environ[unset] +class TransientResource(object): + + """Raise ResourceDenied if an exception is raised while the context manager + is in effect that matches the specified exception and attributes.""" + + def __init__(self, exc, **kwargs): + self.exc = exc + self.attrs = kwargs + + def __enter__(self): + return self + + def __exit__(self, type_=None, value=None, traceback=None): + """If type_ is a subclass of self.exc and value has attributes matching + self.attrs, raise ResourceDenied. Otherwise let the exception + propagate (if any).""" + if type_ is not None and issubclass(self.exc, type_): + for attr, attr_value in self.attrs.items(): + if not hasattr(value, attr): + break + if getattr(value, attr) != attr_value: + break + else: + raise ResourceDenied("an optional resource is not available") + + +def transient_internet(): + """Return a context manager that raises ResourceDenied when various issues + with the Internet connection manifest themselves as exceptions.""" + time_out = TransientResource(IOError, errno=errno.ETIMEDOUT) + socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET) + ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET) + return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset) + #======================================================================= # Decorator for running a function in a different locale, correctly resetting @@ -432,10 +501,7 @@ def bigaddrspacetest(f): return wrapper #======================================================================= -# Preliminary PyUNIT integration. - -import unittest - +# unittest integration. class BasicTestRunner: def run(self, test): @@ -444,7 +510,7 @@ class BasicTestRunner: return result -def run_suite(suite, testclass=None): +def _run_suite(suite): """Run tests from a unittest.TestSuite-derived class.""" if verbose: runner = unittest.TextTestRunner(sys.stdout, verbosity=2) @@ -458,28 +524,26 @@ def run_suite(suite, testclass=None): elif len(result.failures) == 1 and not result.errors: err = result.failures[0][1] else: - if testclass is None: - msg = "errors occurred; run in verbose mode for details" - else: - msg = "errors occurred in %s.%s" \ - % (testclass.__module__, testclass.__name__) + msg = "errors occurred; run in verbose mode for details" raise TestFailed(msg) raise TestFailed(err) def run_unittest(*classes): """Run tests from unittest.TestCase-derived classes.""" + valid_types = (unittest.TestSuite, unittest.TestCase) suite = unittest.TestSuite() for cls in classes: - if isinstance(cls, (unittest.TestSuite, unittest.TestCase)): + if isinstance(cls, str): + if cls in sys.modules: + suite.addTest(unittest.findTestCases(sys.modules[cls])) + else: + raise ValueError("str arguments must be keys in sys.modules") + elif isinstance(cls, valid_types): suite.addTest(cls) else: suite.addTest(unittest.makeSuite(cls)) - if len(classes)==1: - testclass = classes[0] - else: - testclass = None - run_suite(suite, testclass) + _run_suite(suite) #======================================================================= @@ -545,7 +609,6 @@ def reap_children(): # Reap all our dead child processes so we don't leave zombies around. # These hog resources and might be causing some of the buildbots to die. - import os if hasattr(os, 'waitpid'): any_process = -1 while True: |