summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_contextlib.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_contextlib.py')
-rw-r--r--Lib/test/test_contextlib.py225
1 files changed, 225 insertions, 0 deletions
diff --git a/Lib/test/test_contextlib.py b/Lib/test/test_contextlib.py
index 6e38305..e52ed91 100644
--- a/Lib/test/test_contextlib.py
+++ b/Lib/test/test_contextlib.py
@@ -370,6 +370,231 @@ class TestContextDecorator(unittest.TestCase):
self.assertEqual(state, [1, 'something else', 999])
+class TestExitStack(unittest.TestCase):
+
+ def test_no_resources(self):
+ with ExitStack():
+ pass
+
+ def test_callback(self):
+ expected = [
+ ((), {}),
+ ((1,), {}),
+ ((1,2), {}),
+ ((), dict(example=1)),
+ ((1,), dict(example=1)),
+ ((1,2), dict(example=1)),
+ ]
+ result = []
+ def _exit(*args, **kwds):
+ """Test metadata propagation"""
+ result.append((args, kwds))
+ with ExitStack() as stack:
+ for args, kwds in reversed(expected):
+ if args and kwds:
+ f = stack.callback(_exit, *args, **kwds)
+ elif args:
+ f = stack.callback(_exit, *args)
+ elif kwds:
+ f = stack.callback(_exit, **kwds)
+ else:
+ f = stack.callback(_exit)
+ self.assertIs(f, _exit)
+ for wrapper in stack._exit_callbacks:
+ self.assertIs(wrapper.__wrapped__, _exit)
+ self.assertNotEqual(wrapper.__name__, _exit.__name__)
+ self.assertIsNone(wrapper.__doc__, _exit.__doc__)
+ self.assertEqual(result, expected)
+
+ def test_push(self):
+ exc_raised = ZeroDivisionError
+ def _expect_exc(exc_type, exc, exc_tb):
+ self.assertIs(exc_type, exc_raised)
+ def _suppress_exc(*exc_details):
+ return True
+ def _expect_ok(exc_type, exc, exc_tb):
+ self.assertIsNone(exc_type)
+ self.assertIsNone(exc)
+ self.assertIsNone(exc_tb)
+ class ExitCM(object):
+ def __init__(self, check_exc):
+ self.check_exc = check_exc
+ def __enter__(self):
+ self.fail("Should not be called!")
+ def __exit__(self, *exc_details):
+ self.check_exc(*exc_details)
+ with ExitStack() as stack:
+ stack.push(_expect_ok)
+ self.assertIs(stack._exit_callbacks[-1], _expect_ok)
+ cm = ExitCM(_expect_ok)
+ stack.push(cm)
+ self.assertIs(stack._exit_callbacks[-1].__self__, cm)
+ stack.push(_suppress_exc)
+ self.assertIs(stack._exit_callbacks[-1], _suppress_exc)
+ cm = ExitCM(_expect_exc)
+ stack.push(cm)
+ self.assertIs(stack._exit_callbacks[-1].__self__, cm)
+ stack.push(_expect_exc)
+ self.assertIs(stack._exit_callbacks[-1], _expect_exc)
+ stack.push(_expect_exc)
+ self.assertIs(stack._exit_callbacks[-1], _expect_exc)
+ 1/0
+
+ def test_enter_context(self):
+ class TestCM(object):
+ def __enter__(self):
+ result.append(1)
+ def __exit__(self, *exc_details):
+ result.append(3)
+
+ result = []
+ cm = TestCM()
+ with ExitStack() as stack:
+ @stack.callback # Registered first => cleaned up last
+ def _exit():
+ result.append(4)
+ self.assertIsNotNone(_exit)
+ stack.enter_context(cm)
+ self.assertIs(stack._exit_callbacks[-1].__self__, cm)
+ result.append(2)
+ self.assertEqual(result, [1, 2, 3, 4])
+
+ def test_close(self):
+ result = []
+ with ExitStack() as stack:
+ @stack.callback
+ def _exit():
+ result.append(1)
+ self.assertIsNotNone(_exit)
+ stack.close()
+ result.append(2)
+ self.assertEqual(result, [1, 2])
+
+ def test_pop_all(self):
+ result = []
+ with ExitStack() as stack:
+ @stack.callback
+ def _exit():
+ result.append(3)
+ self.assertIsNotNone(_exit)
+ new_stack = stack.pop_all()
+ result.append(1)
+ result.append(2)
+ new_stack.close()
+ self.assertEqual(result, [1, 2, 3])
+
+ def test_exit_raise(self):
+ with self.assertRaises(ZeroDivisionError):
+ with ExitStack() as stack:
+ stack.push(lambda *exc: False)
+ 1/0
+
+ def test_exit_suppress(self):
+ with ExitStack() as stack:
+ stack.push(lambda *exc: True)
+ 1/0
+
+ def test_exit_exception_chaining_reference(self):
+ # Sanity check to make sure that ExitStack chaining matches
+ # actual nested with statements
+ class RaiseExc:
+ def __init__(self, exc):
+ self.exc = exc
+ def __enter__(self):
+ return self
+ def __exit__(self, *exc_details):
+ raise self.exc
+
+ class RaiseExcWithContext:
+ def __init__(self, outer, inner):
+ self.outer = outer
+ self.inner = inner
+ def __enter__(self):
+ return self
+ def __exit__(self, *exc_details):
+ try:
+ raise self.inner
+ except:
+ raise self.outer
+
+ class SuppressExc:
+ def __enter__(self):
+ return self
+ def __exit__(self, *exc_details):
+ type(self).saved_details = exc_details
+ return True
+
+ try:
+ with RaiseExc(IndexError):
+ with RaiseExcWithContext(KeyError, AttributeError):
+ with SuppressExc():
+ with RaiseExc(ValueError):
+ 1 / 0
+ except IndexError as exc:
+ self.assertIsInstance(exc.__context__, KeyError)
+ self.assertIsInstance(exc.__context__.__context__, AttributeError)
+ # Inner exceptions were suppressed
+ self.assertIsNone(exc.__context__.__context__.__context__)
+ else:
+ self.fail("Expected IndexError, but no exception was raised")
+ # Check the inner exceptions
+ inner_exc = SuppressExc.saved_details[1]
+ self.assertIsInstance(inner_exc, ValueError)
+ self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
+
+ def test_exit_exception_chaining(self):
+ # Ensure exception chaining matches the reference behaviour
+ def raise_exc(exc):
+ raise exc
+
+ saved_details = None
+ def suppress_exc(*exc_details):
+ nonlocal saved_details
+ saved_details = exc_details
+ return True
+
+ try:
+ with ExitStack() as stack:
+ stack.callback(raise_exc, IndexError)
+ stack.callback(raise_exc, KeyError)
+ stack.callback(raise_exc, AttributeError)
+ stack.push(suppress_exc)
+ stack.callback(raise_exc, ValueError)
+ 1 / 0
+ except IndexError as exc:
+ self.assertIsInstance(exc.__context__, KeyError)
+ self.assertIsInstance(exc.__context__.__context__, AttributeError)
+ # Inner exceptions were suppressed
+ self.assertIsNone(exc.__context__.__context__.__context__)
+ else:
+ self.fail("Expected IndexError, but no exception was raised")
+ # Check the inner exceptions
+ inner_exc = saved_details[1]
+ self.assertIsInstance(inner_exc, ValueError)
+ self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
+
+ def test_exit_exception_chaining_suppress(self):
+ with ExitStack() as stack:
+ stack.push(lambda *exc: True)
+ stack.push(lambda *exc: 1/0)
+ stack.push(lambda *exc: {}[1])
+
+ def test_excessive_nesting(self):
+ # The original implementation would die with RecursionError here
+ with ExitStack() as stack:
+ for i in range(10000):
+ stack.callback(int)
+
+ def test_instance_bypass(self):
+ class Example(object): pass
+ cm = Example()
+ cm.__exit__ = object()
+ stack = ExitStack()
+ self.assertRaises(AttributeError, stack.enter_context, cm)
+ stack.push(cm)
+ self.assertIs(stack._exit_callbacks[-1], cm)
+
+
# This is needed to make the test actually run under regrtest.py!
def test_main():
support.run_unittest(__name__)