diff options
Diffstat (limited to 'Lib/test/test_contextlib.py')
-rw-r--r-- | Lib/test/test_contextlib.py | 225 |
1 files changed, 225 insertions, 0 deletions
diff --git a/Lib/test/test_contextlib.py b/Lib/test/test_contextlib.py index 6e38305..e52ed91 100644 --- a/Lib/test/test_contextlib.py +++ b/Lib/test/test_contextlib.py @@ -370,6 +370,231 @@ class TestContextDecorator(unittest.TestCase): self.assertEqual(state, [1, 'something else', 999]) +class TestExitStack(unittest.TestCase): + + def test_no_resources(self): + with ExitStack(): + pass + + def test_callback(self): + expected = [ + ((), {}), + ((1,), {}), + ((1,2), {}), + ((), dict(example=1)), + ((1,), dict(example=1)), + ((1,2), dict(example=1)), + ] + result = [] + def _exit(*args, **kwds): + """Test metadata propagation""" + result.append((args, kwds)) + with ExitStack() as stack: + for args, kwds in reversed(expected): + if args and kwds: + f = stack.callback(_exit, *args, **kwds) + elif args: + f = stack.callback(_exit, *args) + elif kwds: + f = stack.callback(_exit, **kwds) + else: + f = stack.callback(_exit) + self.assertIs(f, _exit) + for wrapper in stack._exit_callbacks: + self.assertIs(wrapper.__wrapped__, _exit) + self.assertNotEqual(wrapper.__name__, _exit.__name__) + self.assertIsNone(wrapper.__doc__, _exit.__doc__) + self.assertEqual(result, expected) + + def test_push(self): + exc_raised = ZeroDivisionError + def _expect_exc(exc_type, exc, exc_tb): + self.assertIs(exc_type, exc_raised) + def _suppress_exc(*exc_details): + return True + def _expect_ok(exc_type, exc, exc_tb): + self.assertIsNone(exc_type) + self.assertIsNone(exc) + self.assertIsNone(exc_tb) + class ExitCM(object): + def __init__(self, check_exc): + self.check_exc = check_exc + def __enter__(self): + self.fail("Should not be called!") + def __exit__(self, *exc_details): + self.check_exc(*exc_details) + with ExitStack() as stack: + stack.push(_expect_ok) + self.assertIs(stack._exit_callbacks[-1], _expect_ok) + cm = ExitCM(_expect_ok) + stack.push(cm) + self.assertIs(stack._exit_callbacks[-1].__self__, cm) + stack.push(_suppress_exc) + self.assertIs(stack._exit_callbacks[-1], _suppress_exc) + cm = ExitCM(_expect_exc) + stack.push(cm) + self.assertIs(stack._exit_callbacks[-1].__self__, cm) + stack.push(_expect_exc) + self.assertIs(stack._exit_callbacks[-1], _expect_exc) + stack.push(_expect_exc) + self.assertIs(stack._exit_callbacks[-1], _expect_exc) + 1/0 + + def test_enter_context(self): + class TestCM(object): + def __enter__(self): + result.append(1) + def __exit__(self, *exc_details): + result.append(3) + + result = [] + cm = TestCM() + with ExitStack() as stack: + @stack.callback # Registered first => cleaned up last + def _exit(): + result.append(4) + self.assertIsNotNone(_exit) + stack.enter_context(cm) + self.assertIs(stack._exit_callbacks[-1].__self__, cm) + result.append(2) + self.assertEqual(result, [1, 2, 3, 4]) + + def test_close(self): + result = [] + with ExitStack() as stack: + @stack.callback + def _exit(): + result.append(1) + self.assertIsNotNone(_exit) + stack.close() + result.append(2) + self.assertEqual(result, [1, 2]) + + def test_pop_all(self): + result = [] + with ExitStack() as stack: + @stack.callback + def _exit(): + result.append(3) + self.assertIsNotNone(_exit) + new_stack = stack.pop_all() + result.append(1) + result.append(2) + new_stack.close() + self.assertEqual(result, [1, 2, 3]) + + def test_exit_raise(self): + with self.assertRaises(ZeroDivisionError): + with ExitStack() as stack: + stack.push(lambda *exc: False) + 1/0 + + def test_exit_suppress(self): + with ExitStack() as stack: + stack.push(lambda *exc: True) + 1/0 + + def test_exit_exception_chaining_reference(self): + # Sanity check to make sure that ExitStack chaining matches + # actual nested with statements + class RaiseExc: + def __init__(self, exc): + self.exc = exc + def __enter__(self): + return self + def __exit__(self, *exc_details): + raise self.exc + + class RaiseExcWithContext: + def __init__(self, outer, inner): + self.outer = outer + self.inner = inner + def __enter__(self): + return self + def __exit__(self, *exc_details): + try: + raise self.inner + except: + raise self.outer + + class SuppressExc: + def __enter__(self): + return self + def __exit__(self, *exc_details): + type(self).saved_details = exc_details + return True + + try: + with RaiseExc(IndexError): + with RaiseExcWithContext(KeyError, AttributeError): + with SuppressExc(): + with RaiseExc(ValueError): + 1 / 0 + except IndexError as exc: + self.assertIsInstance(exc.__context__, KeyError) + self.assertIsInstance(exc.__context__.__context__, AttributeError) + # Inner exceptions were suppressed + self.assertIsNone(exc.__context__.__context__.__context__) + else: + self.fail("Expected IndexError, but no exception was raised") + # Check the inner exceptions + inner_exc = SuppressExc.saved_details[1] + self.assertIsInstance(inner_exc, ValueError) + self.assertIsInstance(inner_exc.__context__, ZeroDivisionError) + + def test_exit_exception_chaining(self): + # Ensure exception chaining matches the reference behaviour + def raise_exc(exc): + raise exc + + saved_details = None + def suppress_exc(*exc_details): + nonlocal saved_details + saved_details = exc_details + return True + + try: + with ExitStack() as stack: + stack.callback(raise_exc, IndexError) + stack.callback(raise_exc, KeyError) + stack.callback(raise_exc, AttributeError) + stack.push(suppress_exc) + stack.callback(raise_exc, ValueError) + 1 / 0 + except IndexError as exc: + self.assertIsInstance(exc.__context__, KeyError) + self.assertIsInstance(exc.__context__.__context__, AttributeError) + # Inner exceptions were suppressed + self.assertIsNone(exc.__context__.__context__.__context__) + else: + self.fail("Expected IndexError, but no exception was raised") + # Check the inner exceptions + inner_exc = saved_details[1] + self.assertIsInstance(inner_exc, ValueError) + self.assertIsInstance(inner_exc.__context__, ZeroDivisionError) + + def test_exit_exception_chaining_suppress(self): + with ExitStack() as stack: + stack.push(lambda *exc: True) + stack.push(lambda *exc: 1/0) + stack.push(lambda *exc: {}[1]) + + def test_excessive_nesting(self): + # The original implementation would die with RecursionError here + with ExitStack() as stack: + for i in range(10000): + stack.callback(int) + + def test_instance_bypass(self): + class Example(object): pass + cm = Example() + cm.__exit__ = object() + stack = ExitStack() + self.assertRaises(AttributeError, stack.enter_context, cm) + stack.push(cm) + self.assertIs(stack._exit_callbacks[-1], cm) + + # This is needed to make the test actually run under regrtest.py! def test_main(): support.run_unittest(__name__) |