summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_support.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_support.py')
-rw-r--r--Lib/test/test_support.py125
1 files changed, 94 insertions, 31 deletions
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index 6fbb3cc..1ff0e4d 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -1,11 +1,17 @@
"""Supporting definitions for the Python regression tests."""
if __name__ != 'test.test_support':
- raise ImportError, 'test_support must be imported from the test package'
+ raise ImportError('test_support must be imported from the test package')
-from contextlib import contextmanager
+import contextlib
+import errno
+import socket
import sys
+import os
+import os.path
import warnings
+import types
+import unittest
class Error(Exception):
"""Base class for regression test exceptions."""
@@ -54,7 +60,6 @@ def unload(name):
pass
def unlink(filename):
- import os
try:
os.unlink(filename)
except OSError:
@@ -64,7 +69,6 @@ def forget(modname):
'''"Forget" a module was ever imported by removing it from sys.modules and
deleting any .pyc and .pyo files.'''
unload(modname)
- import os
for dirname in sys.path:
unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
# Deleting the .pyo file cannot be within the 'try' for the .pyc since
@@ -96,7 +100,6 @@ def bind_port(sock, host='', preferred_port=54321):
tests and we don't try multiple ports, the test can fails. This
makes the test more robust."""
- import socket, errno
# some random ports that hopefully no one is listening on.
for port in [preferred_port, 9907, 10243, 32999]:
try:
@@ -107,7 +110,7 @@ def bind_port(sock, host='', preferred_port=54321):
if err != errno.EADDRINUSE:
raise
print(' WARNING: failed to listen on port %d, trying another' % port, file=sys.__stderr__)
- raise TestFailed, 'unable to find port to listen on'
+ raise TestFailed('unable to find port to listen on')
FUZZ = 1e-6
@@ -135,7 +138,6 @@ except NameError:
is_jython = sys.platform.startswith('java')
-import os
# Filename used for testing
if os.name == 'java':
# Jython disallows @ in module names
@@ -197,13 +199,12 @@ except IOError:
if fp is not None:
fp.close()
unlink(TESTFN)
-del os, fp
+del fp
def findfile(file, here=__file__):
"""Try to find a file on sys.path and the working directory. If it is not
found the argument passed to the function is returned (this does not
necessarily signal failure; could still be the legitimate path)."""
- import os
if os.path.isabs(file):
return file
path = sys.path
@@ -235,7 +236,7 @@ def vereq(a, b):
"""
if not (a == b):
- raise TestFailed, "%r == %r" % (a, b)
+ raise TestFailed("%r == %r" % (a, b))
def sortdict(dict):
"Like repr(dict), but in sorted order."
@@ -254,7 +255,6 @@ def check_syntax_error(testcase, statement):
def open_urlresource(url):
import urllib, urlparse
- import os.path
filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
@@ -268,7 +268,7 @@ def open_urlresource(url):
fn, _ = urllib.urlretrieve(url, filename)
return open(fn)
-@contextmanager
+@contextlib.contextmanager
def guard_warnings_filter():
"""Guard the warnings filter from being permanently changed."""
original_filters = warnings.filters[:]
@@ -277,14 +277,49 @@ def guard_warnings_filter():
finally:
warnings.filters = original_filters
+class WarningMessage(object):
+ "Holds the result of the latest showwarning() call"
+ def __init__(self):
+ self.message = None
+ self.category = None
+ self.filename = None
+ self.lineno = None
+
+ def _showwarning(self, message, category, filename, lineno, file=None):
+ self.message = message
+ self.category = category
+ self.filename = filename
+ self.lineno = lineno
+
+@contextlib.contextmanager
+def catch_warning():
+ """
+ Guard the warnings filter from being permanently changed and record the
+ data of the last warning that has been issued.
+
+ Use like this:
+
+ with catch_warning as w:
+ warnings.warn("foo")
+ assert str(w.message) == "foo"
+ """
+ warning = WarningMessage()
+ original_filters = warnings.filters[:]
+ original_showwarning = warnings.showwarning
+ warnings.showwarning = warning._showwarning
+ try:
+ yield warning
+ finally:
+ warnings.showwarning = original_showwarning
+ warnings.filters = original_filters
+
class EnvironmentVarGuard(object):
"""Class to help protect the environment variable properly. Can be used as
a context manager."""
def __init__(self):
- from os import environ
- self._environ = environ
+ self._environ = os.environ
self._unset = set()
self._reset = dict()
@@ -309,6 +344,40 @@ class EnvironmentVarGuard(object):
for unset in self._unset:
del self._environ[unset]
+class TransientResource(object):
+
+ """Raise ResourceDenied if an exception is raised while the context manager
+ is in effect that matches the specified exception and attributes."""
+
+ def __init__(self, exc, **kwargs):
+ self.exc = exc
+ self.attrs = kwargs
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type_=None, value=None, traceback=None):
+ """If type_ is a subclass of self.exc and value has attributes matching
+ self.attrs, raise ResourceDenied. Otherwise let the exception
+ propagate (if any)."""
+ if type_ is not None and issubclass(self.exc, type_):
+ for attr, attr_value in self.attrs.items():
+ if not hasattr(value, attr):
+ break
+ if getattr(value, attr) != attr_value:
+ break
+ else:
+ raise ResourceDenied("an optional resource is not available")
+
+
+def transient_internet():
+ """Return a context manager that raises ResourceDenied when various issues
+ with the Internet connection manifest themselves as exceptions."""
+ time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
+ socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
+ ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
+ return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset)
+
#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
@@ -432,10 +501,7 @@ def bigaddrspacetest(f):
return wrapper
#=======================================================================
-# Preliminary PyUNIT integration.
-
-import unittest
-
+# unittest integration.
class BasicTestRunner:
def run(self, test):
@@ -444,7 +510,7 @@ class BasicTestRunner:
return result
-def run_suite(suite, testclass=None):
+def _run_suite(suite):
"""Run tests from a unittest.TestSuite-derived class."""
if verbose:
runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
@@ -458,28 +524,26 @@ def run_suite(suite, testclass=None):
elif len(result.failures) == 1 and not result.errors:
err = result.failures[0][1]
else:
- if testclass is None:
- msg = "errors occurred; run in verbose mode for details"
- else:
- msg = "errors occurred in %s.%s" \
- % (testclass.__module__, testclass.__name__)
+ msg = "errors occurred; run in verbose mode for details"
raise TestFailed(msg)
raise TestFailed(err)
def run_unittest(*classes):
"""Run tests from unittest.TestCase-derived classes."""
+ valid_types = (unittest.TestSuite, unittest.TestCase)
suite = unittest.TestSuite()
for cls in classes:
- if isinstance(cls, (unittest.TestSuite, unittest.TestCase)):
+ if isinstance(cls, str):
+ if cls in sys.modules:
+ suite.addTest(unittest.findTestCases(sys.modules[cls]))
+ else:
+ raise ValueError("str arguments must be keys in sys.modules")
+ elif isinstance(cls, valid_types):
suite.addTest(cls)
else:
suite.addTest(unittest.makeSuite(cls))
- if len(classes)==1:
- testclass = classes[0]
- else:
- testclass = None
- run_suite(suite, testclass)
+ _run_suite(suite)
#=======================================================================
@@ -545,7 +609,6 @@ def reap_children():
# Reap all our dead child processes so we don't leave zombies around.
# These hog resources and might be causing some of the buildbots to die.
- import os
if hasattr(os, 'waitpid'):
any_process = -1
while True: