Diffstat (limited to 'Lib/unittest/case.py')
-rw-r--r-- | Lib/unittest/case.py | 252
1 file changed, 166 insertions(+), 86 deletions(-)
diff --git a/Lib/unittest/case.py b/Lib/unittest/case.py
index ad1fa84..cf7301b 100644
--- a/Lib/unittest/case.py
+++ b/Lib/unittest/case.py
@@ -7,6 +7,7 @@ import pprint
 import re
 import warnings
 import collections
+import contextlib
 
 from . import result
 from .util import (strclass, safe_repr, _count_diff_all_purpose,
@@ -26,17 +27,11 @@ class SkipTest(Exception):
     instead of raising this directly.
     """
 
-class _ExpectedFailure(Exception):
+class _ShouldStop(Exception):
     """
-    Raise this when a test is expected to fail.
-
-    This is an implementation detail.
+    The test should stop.
     """
-    def __init__(self, exc_info):
-        super(_ExpectedFailure, self).__init__()
-        self.exc_info = exc_info
-
 
 class _UnexpectedSuccess(Exception):
     """
     The test was supposed to fail, but it didn't!
@@ -44,13 +39,40 @@ class _UnexpectedSuccess(Exception):
 
 
 class _Outcome(object):
-    def __init__(self):
+    def __init__(self, result=None):
+        self.expecting_failure = False
+        self.result = result
+        self.result_supports_subtests = hasattr(result, "addSubTest")
         self.success = True
-        self.skipped = None
-        self.unexpectedSuccess = None
+        self.skipped = []
         self.expectedFailure = None
         self.errors = []
-        self.failures = []
+
+    @contextlib.contextmanager
+    def testPartExecutor(self, test_case, isTest=False):
+        old_success = self.success
+        self.success = True
+        try:
+            yield
+        except KeyboardInterrupt:
+            raise
+        except SkipTest as e:
+            self.success = False
+            self.skipped.append((test_case, str(e)))
+        except _ShouldStop:
+            pass
+        except:
+            exc_info = sys.exc_info()
+            if self.expecting_failure:
+                self.expectedFailure = exc_info
+            else:
+                self.success = False
+                self.errors.append((test_case, exc_info))
+        else:
+            if self.result_supports_subtests and self.success:
+                self.errors.append((test_case, None))
+        finally:
+            self.success = self.success and old_success
 
 
 def _id(obj):
@@ -88,16 +110,9 @@ def skipUnless(condition, reason):
         return skip(reason)
     return _id
 
-
-def expectedFailure(func):
-    @functools.wraps(func)
-    def wrapper(*args, **kwargs):
-        try:
-            func(*args, **kwargs)
-        except Exception:
-            raise _ExpectedFailure(sys.exc_info())
-        raise _UnexpectedSuccess
-    return wrapper
+def expectedFailure(test_item):
+    test_item.__unittest_expecting_failure__ = True
+    return test_item
 
 
 class _AssertRaisesBaseContext(object):
@@ -271,7 +286,7 @@ class TestCase(object):
            not have a method with the specified name.
         """
         self._testMethodName = methodName
-        self._outcomeForDoCleanups = None
+        self._outcome = None
         self._testMethodDoc = 'No test'
         try:
             testMethod = getattr(self, methodName)
@@ -284,6 +299,7 @@ class TestCase(object):
         else:
             self._testMethodDoc = testMethod.__doc__
         self._cleanups = []
+        self._subtest = None
 
         # Map types to custom assertEqual functions that will compare
         # instances of said type in more detail to generate a more useful
@@ -371,44 +387,80 @@ class TestCase(object):
         return "<%s testMethod=%s>" % \
                (strclass(self.__class__), self._testMethodName)
 
-    def _addSkip(self, result, reason):
+    def _addSkip(self, result, test_case, reason):
         addSkip = getattr(result, 'addSkip', None)
         if addSkip is not None:
-            addSkip(self, reason)
+            addSkip(test_case, reason)
         else:
             warnings.warn("TestResult has no addSkip method, skips not reported",
                           RuntimeWarning, 2)
+            result.addSuccess(test_case)
+
+    @contextlib.contextmanager
+    def subTest(self, msg=None, **params):
+        """Return a context manager that will return the enclosed block
+        of code in a subtest identified by the optional message and
+        keyword parameters.  A failure in the subtest marks the test
+        case as failed but resumes execution at the end of the enclosed
+        block, allowing further test code to be executed.
+        """
+        if not self._outcome.result_supports_subtests:
+            yield
+            return
+        parent = self._subtest
+        if parent is None:
+            params_map = collections.ChainMap(params)
+        else:
+            params_map = parent.params.new_child(params)
+        self._subtest = _SubTest(self, msg, params_map)
+        try:
+            with self._outcome.testPartExecutor(self._subtest, isTest=True):
+                yield
+            if not self._outcome.success:
+                result = self._outcome.result
+                if result is not None and result.failfast:
+                    raise _ShouldStop
+            elif self._outcome.expectedFailure:
+                # If the test is expecting a failure, we really want to
+                # stop now and register the expected failure.
+                raise _ShouldStop
+        finally:
+            self._subtest = parent
+
+    def _feedErrorsToResult(self, result, errors):
+        for test, exc_info in errors:
+            if isinstance(test, _SubTest):
+                result.addSubTest(test.test_case, test, exc_info)
+            elif exc_info is not None:
+                if issubclass(exc_info[0], self.failureException):
+                    result.addFailure(test, exc_info)
+                else:
+                    result.addError(test, exc_info)
+
+    def _addExpectedFailure(self, result, exc_info):
+        try:
+            addExpectedFailure = result.addExpectedFailure
+        except AttributeError:
+            warnings.warn("TestResult has no addExpectedFailure method, reporting as passes",
+                          RuntimeWarning)
             result.addSuccess(self)
+        else:
+            addExpectedFailure(self, exc_info)
 
-    def _executeTestPart(self, function, outcome, isTest=False):
+    def _addUnexpectedSuccess(self, result):
         try:
-            function()
-        except KeyboardInterrupt:
-            raise
-        except SkipTest as e:
-            outcome.success = False
-            outcome.skipped = str(e)
-        except _UnexpectedSuccess:
-            exc_info = sys.exc_info()
-            outcome.success = False
-            if isTest:
-                outcome.unexpectedSuccess = exc_info
-            else:
-                outcome.errors.append(exc_info)
-        except _ExpectedFailure:
-            outcome.success = False
-            exc_info = sys.exc_info()
-            if isTest:
-                outcome.expectedFailure = exc_info
-            else:
-                outcome.errors.append(exc_info)
-        except self.failureException:
-            outcome.success = False
-            outcome.failures.append(sys.exc_info())
-            exc_info = sys.exc_info()
-        except:
-            outcome.success = False
-            outcome.errors.append(sys.exc_info())
+            addUnexpectedSuccess = result.addUnexpectedSuccess
+        except AttributeError:
+            warnings.warn("TestResult has no addUnexpectedSuccess method, reporting as failure",
+                          RuntimeWarning)
+            # We need to pass an actual exception and traceback to addFailure,
+            # otherwise the legacy result can choke.
+            try:
+                raise _UnexpectedSuccess from None
+            except _UnexpectedSuccess:
+                result.addFailure(self, sys.exc_info())
+        else:
+            addUnexpectedSuccess(self)
 
     def run(self, result=None):
         orig_result = result
@@ -427,46 +479,38 @@ class TestCase(object):
             try:
                 skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                             or getattr(testMethod, '__unittest_skip_why__', ''))
-                self._addSkip(result, skip_why)
+                self._addSkip(result, self, skip_why)
             finally:
                 result.stopTest(self)
             return
+        expecting_failure = getattr(testMethod,
+                                    "__unittest_expecting_failure__", False)
         try:
-            outcome = _Outcome()
-            self._outcomeForDoCleanups = outcome
+            outcome = _Outcome(result)
+            self._outcome = outcome
 
-            self._executeTestPart(self.setUp, outcome)
+            with outcome.testPartExecutor(self):
+                self.setUp()
             if outcome.success:
-                self._executeTestPart(testMethod, outcome, isTest=True)
-                self._executeTestPart(self.tearDown, outcome)
+                outcome.expecting_failure = expecting_failure
+                with outcome.testPartExecutor(self, isTest=True):
+                    testMethod()
+                outcome.expecting_failure = False
+                with outcome.testPartExecutor(self):
+                    self.tearDown()
 
             self.doCleanups()
+            for test, reason in outcome.skipped:
+                self._addSkip(result, test, reason)
+            self._feedErrorsToResult(result, outcome.errors)
             if outcome.success:
-                result.addSuccess(self)
-            else:
-                if outcome.skipped is not None:
-                    self._addSkip(result, outcome.skipped)
-                for exc_info in outcome.errors:
-                    result.addError(self, exc_info)
-                for exc_info in outcome.failures:
-                    result.addFailure(self, exc_info)
-                if outcome.unexpectedSuccess is not None:
-                    addUnexpectedSuccess = getattr(result, 'addUnexpectedSuccess', None)
-                    if addUnexpectedSuccess is not None:
-                        addUnexpectedSuccess(self)
+                if expecting_failure:
+                    if outcome.expectedFailure:
+                        self._addExpectedFailure(result, outcome.expectedFailure)
                     else:
-                        warnings.warn("TestResult has no addUnexpectedSuccess method, reporting as failures",
-                                      RuntimeWarning)
-                        result.addFailure(self, outcome.unexpectedSuccess)
-
-                if outcome.expectedFailure is not None:
-                    addExpectedFailure = getattr(result, 'addExpectedFailure', None)
-                    if addExpectedFailure is not None:
-                        addExpectedFailure(self, outcome.expectedFailure)
-                    else:
-                        warnings.warn("TestResult has no addExpectedFailure method, reporting as passes",
-                                      RuntimeWarning)
-                        result.addSuccess(self)
+                        self._addUnexpectedSuccess(result)
+            else:
+                result.addSuccess(self)
             return result
         finally:
             result.stopTest(self)
@@ -478,11 +522,11 @@ class TestCase(object):
     def doCleanups(self):
         """Execute all cleanup functions. Normally called for you after
        tearDown."""
-        outcome = self._outcomeForDoCleanups or _Outcome()
+        outcome = self._outcome or _Outcome()
         while self._cleanups:
             function, args, kwargs = self._cleanups.pop()
-            part = lambda: function(*args, **kwargs)
-            self._executeTestPart(part, outcome)
+            with outcome.testPartExecutor(self):
+                function(*args, **kwargs)
 
         # return this for backwards compatibility
         # even though we no longer us it internally
@@ -1213,3 +1257,39 @@ class FunctionTestCase(TestCase):
             return self._description
         doc = self._testFunc.__doc__
         return doc and doc.split("\n")[0].strip() or None
+
+
+class _SubTest(TestCase):
+
+    def __init__(self, test_case, message, params):
+        super().__init__()
+        self._message = message
+        self.test_case = test_case
+        self.params = params
+        self.failureException = test_case.failureException
+
+    def runTest(self):
+        raise NotImplementedError("subtests cannot be run directly")
+
+    def _subDescription(self):
+        parts = []
+        if self._message:
+            parts.append("[{}]".format(self._message))
+        if self.params:
+            params_desc = ', '.join(
+                "{}={!r}".format(k, v)
+                for (k, v) in sorted(self.params.items()))
+            parts.append("({})".format(params_desc))
+        return " ".join(parts) or '(<subtest>)'
+
+    def id(self):
+        return "{} {}".format(self.test_case.id(), self._subDescription())
+
+    def shortDescription(self):
+        """Returns a one-line description of the subtest, or None if no
+        description has been provided.
+        """
+        return self.test_case.shortDescription()
+
+    def __str__(self):
+        return "{} {}".format(self.test_case, self._subDescription())
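
For orientation, below is a minimal usage sketch of the subTest() context manager this patch introduces. The NumbersTest class, its method, and the i parameter are illustrative only and not part of the patch; as the code above shows, per-iteration reporting only happens when the result object provides addSubTest() (otherwise subTest() just yields and the block runs like ordinary test code).

```python
import unittest

class NumbersTest(unittest.TestCase):
    def test_even(self):
        for i in range(6):
            # A failure inside this block is captured by
            # _Outcome.testPartExecutor() and reported through
            # TestResult.addSubTest(); the loop keeps running instead of
            # aborting at the first failing value.
            with self.subTest(i=i):
                self.assertEqual(i % 2, 0, "%d is not even" % i)

if __name__ == "__main__":
    unittest.main()
```

Against a result that supports subtests, this reports three separate subtest failures (for i=1, 3, and 5) rather than a single failure that stops at i=1, each labelled with the parameters recorded in _SubTest._subDescription().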