"""Unit tests for contextlib.py, and other context managers."""

import io
import os
import sys
import tempfile
import threading
import unittest
from contextlib import *  # Tests __all__
from test import support
import weakref

# Since Python 3.11 the ``with`` statement and ExitStack.enter_context()
# raise TypeError (previously AttributeError) for objects that do not
# implement the context manager protocol.  Pick the type matching the
# running interpreter so the checks stay strict on every version.
_PROTOCOL_ERROR = TypeError if sys.version_info >= (3, 11) else AttributeError


class TestAbstractContextManager(unittest.TestCase):
    """Tests for the AbstractContextManager ABC."""

    def test_enter(self):
        # The default __enter__ returns the manager itself.
        class DefaultEnter(AbstractContextManager):
            def __exit__(self, *args):
                super().__exit__(*args)

        manager = DefaultEnter()
        self.assertIs(manager.__enter__(), manager)

    def test_exit_is_abstract(self):
        # A subclass that does not define __exit__ cannot be instantiated.
        class MissingExit(AbstractContextManager):
            pass

        with self.assertRaises(TypeError):
            MissingExit()

    def test_structural_subclassing(self):
        # issubclass() is decided by the presence of __enter__/__exit__;
        # setting either of them to None opts the class out.
        class ManagerFromScratch:
            def __enter__(self):
                return self
            def __exit__(self, exc_type, exc_value, traceback):
                return None

        self.assertTrue(issubclass(ManagerFromScratch, AbstractContextManager))

        class DefaultEnter(AbstractContextManager):
            def __exit__(self, *args):
                super().__exit__(*args)

        self.assertTrue(issubclass(DefaultEnter, AbstractContextManager))

        class NoEnter(ManagerFromScratch):
            __enter__ = None

        self.assertFalse(issubclass(NoEnter, AbstractContextManager))

        class NoExit(ManagerFromScratch):
            __exit__ = None

        self.assertFalse(issubclass(NoExit, AbstractContextManager))


class ContextManagerTestCase(unittest.TestCase):
    """Tests for the @contextmanager decorator."""

    def test_contextmanager_plain(self):
        state = []
        @contextmanager
        def woohoo():
            state.append(1)
            yield 42
            state.append(999)
        with woohoo() as x:
            self.assertEqual(state, [1])
            self.assertEqual(x, 42)
            state.append(x)
        self.assertEqual(state, [1, 42, 999])

    def test_contextmanager_finally(self):
        state = []
        @contextmanager
        def woohoo():
            state.append(1)
            try:
                yield 42
            finally:
                state.append(999)
        with self.assertRaises(ZeroDivisionError):
            with woohoo() as x:
                self.assertEqual(state, [1])
                self.assertEqual(x, 42)
                state.append(x)
                raise ZeroDivisionError()
        self.assertEqual(state, [1, 42, 999])

    def test_contextmanager_no_reraise(self):
        @contextmanager
        def whee():
            yield
        ctx = whee()
        ctx.__enter__()
        # Calling __exit__ should not result in an exception
        self.assertFalse(ctx.__exit__(TypeError, TypeError("foo"), None))

    def test_contextmanager_trap_yield_after_throw(self):
        # A generator that yields again after the exception has been
        # thrown into it must be reported as a RuntimeError.
        @contextmanager
        def whoo():
            try:
                yield
            except:
                yield
        ctx = whoo()
        ctx.__enter__()
        self.assertRaises(
            RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None
        )

    def test_contextmanager_except(self):
        state = []
        @contextmanager
        def woohoo():
            state.append(1)
            try:
                yield 42
            except ZeroDivisionError as e:
                state.append(e.args[0])
                self.assertEqual(state, [1, 42, 999])
        with woohoo() as x:
            self.assertEqual(state, [1])
            self.assertEqual(x, 42)
            state.append(x)
            raise ZeroDivisionError(999)
        self.assertEqual(state, [1, 42, 999])

    def test_contextmanager_except_stopiter(self):
        # A StopIteration raised in the with body must propagate unchanged.
        stop_exc = StopIteration('spam')
        @contextmanager
        def woohoo():
            yield
        try:
            with self.assertWarnsRegex(DeprecationWarning, "StopIteration"):
                with woohoo():
                    raise stop_exc
        except Exception as ex:
            self.assertIs(ex, stop_exc)
        else:
            self.fail('StopIteration was suppressed')

    def test_contextmanager_except_pep479(self):
        # Same check as above, with the generator compiled under
        # ``from __future__ import generator_stop``.
        code = """\
from __future__ import generator_stop
from contextlib import contextmanager
@contextmanager
def woohoo():
    yield
"""
        namespace = {}
        exec(code, namespace, namespace)
        woohoo = namespace['woohoo']
        stop_exc = StopIteration('spam')
        try:
            with woohoo():
                raise stop_exc
        except Exception as ex:
            self.assertIs(ex, stop_exc)
        else:
            self.fail('StopIteration was suppressed')

    def test_contextmanager_do_not_unchain_non_stopiteration_exceptions(self):
        @contextmanager
        def test_issue29692():
            try:
                yield
            except Exception as exc:
                raise RuntimeError('issue29692:Chained') from exc
        try:
            with test_issue29692():
                raise ZeroDivisionError
        except Exception as ex:
            self.assertIs(type(ex), RuntimeError)
            self.assertEqual(ex.args[0], 'issue29692:Chained')
            self.assertIsInstance(ex.__cause__, ZeroDivisionError)
        else:
            # Without this branch the test would pass vacuously.
            self.fail('Expected RuntimeError, but no exception was raised')

        try:
            with test_issue29692():
                raise StopIteration('issue29692:Unchained')
        except Exception as ex:
            self.assertIs(type(ex), StopIteration)
            self.assertEqual(ex.args[0], 'issue29692:Unchained')
            self.assertIsNone(ex.__cause__)
        else:
            self.fail('Expected StopIteration, but no exception was raised')

    def _create_contextmanager_attribs(self):
        # Build a @contextmanager-wrapped function that carries a custom
        # attribute and a docstring, for the metadata tests below.
        def attribs(**kw):
            def decorate(func):
                for k, v in kw.items():
                    setattr(func, k, v)
                return func
            return decorate
        @contextmanager
        @attribs(foo='bar')
        def baz(spam):
            """Whee!"""
        return baz

    def test_contextmanager_attribs(self):
        baz = self._create_contextmanager_attribs()
        self.assertEqual(baz.__name__, 'baz')
        self.assertEqual(baz.foo, 'bar')

    @support.requires_docstrings
    def test_contextmanager_doc_attrib(self):
        baz = self._create_contextmanager_attribs()
        self.assertEqual(baz.__doc__, "Whee!")

    @support.requires_docstrings
    def test_instance_docstring_given_cm_docstring(self):
        baz = self._create_contextmanager_attribs()(None)
        self.assertEqual(baz.__doc__, "Whee!")

    def test_keywords(self):
        # Ensure no keyword arguments are inhibited
        @contextmanager
        def woohoo(self, func, args, kwds):
            yield (self, func, args, kwds)
        with woohoo(self=11, func=22, args=33, kwds=44) as target:
            self.assertEqual(target, (11, 22, 33, 44))

    def test_nokeepref(self):
        # The context manager must not keep its arguments alive: once the
        # generator drops its own references the objects should be gone.
        class A:
            pass

        @contextmanager
        def woohoo(a, b):
            a = weakref.ref(a)
            b = weakref.ref(b)
            self.assertIsNone(a())
            self.assertIsNone(b())
            yield

        with woohoo(A(), b=A()):
            pass

    def test_param_errors(self):
        @contextmanager
        def woohoo(a, *, b):
            yield

        with self.assertRaises(TypeError):
            woohoo()
        with self.assertRaises(TypeError):
            woohoo(3, 5)
        with self.assertRaises(TypeError):
            woohoo(b=3)

    def test_recursive(self):
        depth = 0
        @contextmanager
        def woohoo():
            nonlocal depth
            before = depth
            depth += 1
            yield
            depth -= 1
            self.assertEqual(depth, before)

        @woohoo()
        def recursive():
            if depth < 10:
                recursive()

        recursive()
        self.assertEqual(depth, 0)


class ClosingTestCase(unittest.TestCase):
    """Tests for closing()."""

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        cm_docstring = closing.__doc__
        obj = closing(None)
        self.assertEqual(obj.__doc__, cm_docstring)

    def test_closing(self):
        state = []
        class C:
            def close(self):
                state.append(1)
        x = C()
        self.assertEqual(state, [])
        with closing(x) as y:
            self.assertEqual(x, y)
        self.assertEqual(state, [1])

    def test_closing_error(self):
        state = []
        class C:
            def close(self):
                state.append(1)
        x = C()
        self.assertEqual(state, [])
        with self.assertRaises(ZeroDivisionError):
            with closing(x) as y:
                self.assertEqual(x, y)
                1 / 0
        self.assertEqual(state, [1])


class NullcontextTestCase(unittest.TestCase):
    """Tests for nullcontext()."""

    def test_nullcontext(self):
        class C:
            pass
        c = C()
        with nullcontext(c) as c_in:
            self.assertIs(c_in, c)


class FileContextTestCase(unittest.TestCase):
    """Tests for file objects used as context managers."""

    def testWithOpen(self):
        # A private temporary directory replaces tempfile.mktemp(), which
        # is deprecated because the name it returns can be claimed by
        # another process before the file is created.  The directory and
        # its contents are removed when the with block exits.
        with tempfile.TemporaryDirectory() as tmpdir:
            tfn = os.path.join(tmpdir, "booh.txt")
            f = None
            with open(tfn, "w") as f:
                self.assertFalse(f.closed)
                f.write("Booh\n")
            self.assertTrue(f.closed)
            f = None
            with self.assertRaises(ZeroDivisionError):
                with open(tfn, "r") as f:
                    self.assertFalse(f.closed)
                    self.assertEqual(f.read(), "Booh\n")
                    1 / 0
            self.assertTrue(f.closed)


class LockContextTestCase(unittest.TestCase):
    """Tests for threading primitives used as context managers."""

    def boilerPlate(self, lock, locked):
        # ``locked`` is a zero-argument callable reporting whether ``lock``
        # is currently held.  The lock must be held inside the with block
        # and released afterwards, with or without an exception.
        self.assertFalse(locked())
        with lock:
            self.assertTrue(locked())
        self.assertFalse(locked())
        with self.assertRaises(ZeroDivisionError):
            with lock:
                self.assertTrue(locked())
                1 / 0
        self.assertFalse(locked())

    def testWithLock(self):
        lock = threading.Lock()
        self.boilerPlate(lock, lock.locked)

    def testWithRLock(self):
        lock = threading.RLock()
        self.boilerPlate(lock, lock._is_owned)

    def testWithCondition(self):
        lock = threading.Condition()
        def locked():
            return lock._is_owned()
        self.boilerPlate(lock, locked)

    def testWithSemaphore(self):
        lock = threading.Semaphore()
        def locked():
            # Probe with a non-blocking acquire: success means it was free.
            if lock.acquire(False):
                lock.release()
                return False
            else:
                return True
        self.boilerPlate(lock, locked)

    def testWithBoundedSemaphore(self):
        lock = threading.BoundedSemaphore()
        def locked():
            # Probe with a non-blocking acquire: success means it was free.
            if lock.acquire(False):
                lock.release()
                return False
            else:
                return True
        self.boilerPlate(lock, locked)


class mycontext(ContextDecorator):
    """Example decoration-compatible context manager for testing"""
    started = False   # set to True by __enter__
    exc = None        # the exception details tuple passed to __exit__
    catch = False     # value returned from __exit__ (True suppresses)

    def __enter__(self):
        self.started = True
        return self

    def __exit__(self, *exc):
        self.exc = exc
        return self.catch


class TestContextDecorator(unittest.TestCase):
    """Tests for ContextDecorator and for decorator use of context managers."""

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        cm_docstring = mycontext.__doc__
        obj = mycontext()
        self.assertEqual(obj.__doc__, cm_docstring)

    def test_contextdecorator(self):
        context = mycontext()
        with context as result:
            self.assertIs(result, context)
            self.assertTrue(context.started)

        self.assertEqual(context.exc, (None, None, None))

    def test_contextdecorator_with_exception(self):
        context = mycontext()

        with self.assertRaisesRegex(NameError, 'foo'):
            with context:
                raise NameError('foo')
        self.assertIsNotNone(context.exc)
        self.assertIs(context.exc[0], NameError)

        # With catch=True the exception is suppressed by __exit__.
        context = mycontext()
        context.catch = True
        with context:
            raise NameError('foo')
        self.assertIsNotNone(context.exc)
        self.assertIs(context.exc[0], NameError)

    def test_decorator(self):
        context = mycontext()

        @context
        def test():
            self.assertIsNone(context.exc)
            self.assertTrue(context.started)
        test()
        self.assertEqual(context.exc, (None, None, None))

    def test_decorator_with_exception(self):
        context = mycontext()

        @context
        def test():
            self.assertIsNone(context.exc)
            self.assertTrue(context.started)
            raise NameError('foo')

        with self.assertRaisesRegex(NameError, 'foo'):
            test()
        self.assertIsNotNone(context.exc)
        self.assertIs(context.exc[0], NameError)

    def test_decorating_method(self):
        context = mycontext()

        class Test(object):

            @context
            def method(self, a, b, c=None):
                self.a = a
                self.b = b
                self.c = c

        # these tests are for argument passing when used as a decorator
        test = Test()
        test.method(1, 2)
        self.assertEqual(test.a, 1)
        self.assertEqual(test.b, 2)
        self.assertEqual(test.c, None)

        test = Test()
        test.method('a', 'b', 'c')
        self.assertEqual(test.a, 'a')
        self.assertEqual(test.b, 'b')
        self.assertEqual(test.c, 'c')

        test = Test()
        test.method(a=1, b=2)
        self.assertEqual(test.a, 1)
        self.assertEqual(test.b, 2)

    def test_typo_enter(self):
        class mycontext(ContextDecorator):
            def __unter__(self):
                pass
            def __exit__(self, *exc):
                pass

        # The exception type depends on the interpreter version.
        with self.assertRaises(_PROTOCOL_ERROR):
            with mycontext():
                pass

    def test_typo_exit(self):
        class mycontext(ContextDecorator):
            def __enter__(self):
                pass
            def __uxit__(self, *exc):
                pass

        # The exception type depends on the interpreter version.
        with self.assertRaises(_PROTOCOL_ERROR):
            with mycontext():
                pass

    def test_contextdecorator_as_mixin(self):
        class somecontext(object):
            started = False
            exc = None

            def __enter__(self):
                self.started = True
                return self

            def __exit__(self, *exc):
                self.exc = exc

        class mycontext(somecontext, ContextDecorator):
            pass

        context = mycontext()
        @context
        def test():
            self.assertIsNone(context.exc)
            self.assertTrue(context.started)
        test()
        self.assertEqual(context.exc, (None, None, None))

    def test_contextmanager_as_decorator(self):
        @contextmanager
        def woohoo(y):
            state.append(y)
            yield
            state.append(999)

        state = []
        @woohoo(1)
        def test(x):
            self.assertEqual(state, [1])
            state.append(x)
        test('something')
        self.assertEqual(state, [1, 'something', 999])

        # Issue #11647: Ensure the decorated function is 'reusable'
        state = []
        test('something else')
        self.assertEqual(state, [1, 'something else', 999])


class TestBaseExitStack:
    """Shared ExitStack tests; concrete subclasses set ``exit_stack``."""

    exit_stack = None

    # This mixin is not runnable on its own.  unittest ignores it because
    # it is not a TestCase; the marker keeps pytest, which collects any
    # class named Test*, from running it with exit_stack unset.
    __test__ = False

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        cm_docstring = self.exit_stack.__doc__
        obj = self.exit_stack()
        self.assertEqual(obj.__doc__, cm_docstring)

    def test_no_resources(self):
        with self.exit_stack():
            pass

    def test_callback(self):
        expected = [
            ((), {}),
            ((1,), {}),
            ((1,2), {}),
            ((), dict(example=1)),
            ((1,), dict(example=1)),
            ((1,2), dict(example=1)),
        ]
        result = []
        def _exit(*args, **kwds):
            """Test metadata propagation"""
            result.append((args, kwds))
        with self.exit_stack() as stack:
            # Register in reverse so the LIFO unwinding yields ``expected``.
            for args, kwds in reversed(expected):
                if args and kwds:
                    f = stack.callback(_exit, *args, **kwds)
                elif args:
                    f = stack.callback(_exit, *args)
                elif kwds:
                    f = stack.callback(_exit, **kwds)
                else:
                    f = stack.callback(_exit)
                self.assertIs(f, _exit)
            for wrapper in stack._exit_callbacks:
                self.assertIs(wrapper[1].__wrapped__, _exit)
                self.assertNotEqual(wrapper[1].__name__, _exit.__name__)
                self.assertIsNone(wrapper[1].__doc__, _exit.__doc__)
        self.assertEqual(result, expected)

    def test_push(self):
        exc_raised = ZeroDivisionError
        def _expect_exc(exc_type, exc, exc_tb):
            self.assertIs(exc_type, exc_raised)
        def _suppress_exc(*exc_details):
            return True
        def _expect_ok(exc_type, exc, exc_tb):
            self.assertIsNone(exc_type)
            self.assertIsNone(exc)
            self.assertIsNone(exc_tb)
        class ExitCM(object):
            def __init__(self, check_exc):
                self.check_exc = check_exc
            def __enter__(self):
                self.fail("Should not be called!")
            def __exit__(self, *exc_details):
                self.check_exc(*exc_details)
        # Entries pushed after _suppress_exc unwind first and see the
        # ZeroDivisionError; entries pushed before it see no exception.
        with self.exit_stack() as stack:
            stack.push(_expect_ok)
            self.assertIs(stack._exit_callbacks[-1][1], _expect_ok)
            cm = ExitCM(_expect_ok)
            stack.push(cm)
            self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
            stack.push(_suppress_exc)
            self.assertIs(stack._exit_callbacks[-1][1], _suppress_exc)
            cm = ExitCM(_expect_exc)
            stack.push(cm)
            self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
            stack.push(_expect_exc)
            self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)
            stack.push(_expect_exc)
            self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)
            1/0

    def test_enter_context(self):
        class TestCM(object):
            def __enter__(self):
                result.append(1)
            def __exit__(self, *exc_details):
                result.append(3)

        result = []
        cm = TestCM()
        with self.exit_stack() as stack:
            @stack.callback  # Registered first => cleaned up last
            def _exit():
                result.append(4)
            self.assertIsNotNone(_exit)
            stack.enter_context(cm)
            self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
            result.append(2)
        self.assertEqual(result, [1, 2, 3, 4])

    def test_close(self):
        result = []
        with self.exit_stack() as stack:
            @stack.callback
            def _exit():
                result.append(1)
            self.assertIsNotNone(_exit)
            stack.close()
            result.append(2)
        self.assertEqual(result, [1, 2])

    def test_pop_all(self):
        result = []
        with self.exit_stack() as stack:
            @stack.callback
            def _exit():
                result.append(3)
            self.assertIsNotNone(_exit)
            new_stack = stack.pop_all()
            result.append(1)
        result.append(2)
        new_stack.close()
        self.assertEqual(result, [1, 2, 3])

    def test_exit_raise(self):
        with self.assertRaises(ZeroDivisionError):
            with self.exit_stack() as stack:
                stack.push(lambda *exc: False)
                1/0

    def test_exit_suppress(self):
        with self.exit_stack() as stack:
            stack.push(lambda *exc: True)
            1/0

    def test_exit_exception_chaining_reference(self):
        # Sanity check to make sure that ExitStack chaining matches
        # actual nested with statements
        class RaiseExc:
            def __init__(self, exc):
                self.exc = exc
            def __enter__(self):
                return self
            def __exit__(self, *exc_details):
                raise self.exc

        class RaiseExcWithContext:
            def __init__(self, outer, inner):
                self.outer = outer
                self.inner = inner
            def __enter__(self):
                return self
            def __exit__(self, *exc_details):
                try:
                    raise self.inner
                except:
                    raise self.outer

        class SuppressExc:
            def __enter__(self):
                return self
            def __exit__(self, *exc_details):
                type(self).saved_details = exc_details
                return True

        try:
            with RaiseExc(IndexError):
                with RaiseExcWithContext(KeyError, AttributeError):
                    with SuppressExc():
                        with RaiseExc(ValueError):
                            1 / 0
        except IndexError as exc:
            self.assertIsInstance(exc.__context__, KeyError)
            self.assertIsInstance(exc.__context__.__context__, AttributeError)
            # Inner exceptions were suppressed
            self.assertIsNone(exc.__context__.__context__.__context__)
        else:
            self.fail("Expected IndexError, but no exception was raised")
        # Check the inner exceptions
        inner_exc = SuppressExc.saved_details[1]
        self.assertIsInstance(inner_exc, ValueError)
        self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)

    def test_exit_exception_chaining(self):
        # Ensure exception chaining matches the reference behaviour
        def raise_exc(exc):
            raise exc

        saved_details = None
        def suppress_exc(*exc_details):
            nonlocal saved_details
            saved_details = exc_details
            return True

        try:
            with self.exit_stack() as stack:
                stack.callback(raise_exc, IndexError)
                stack.callback(raise_exc, KeyError)
                stack.callback(raise_exc, AttributeError)
                stack.push(suppress_exc)
                stack.callback(raise_exc, ValueError)
                1 / 0
        except IndexError as exc:
            self.assertIsInstance(exc.__context__, KeyError)
            self.assertIsInstance(exc.__context__.__context__, AttributeError)
            # Inner exceptions were suppressed
            self.assertIsNone(exc.__context__.__context__.__context__)
        else:
            self.fail("Expected IndexError, but no exception was raised")
        # Check the inner exceptions
        inner_exc = saved_details[1]
        self.assertIsInstance(inner_exc, ValueError)
        self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)

    def test_exit_exception_non_suppressing(self):
        # http://bugs.python.org/issue19092
        def raise_exc(exc):
            raise exc

        def suppress_exc(*exc_details):
            return True

        try:
            with self.exit_stack() as stack:
                stack.callback(lambda: None)
                stack.callback(raise_exc, IndexError)
        except Exception as exc:
            self.assertIsInstance(exc, IndexError)
        else:
            self.fail("Expected IndexError, but no exception was raised")

        try:
            with self.exit_stack() as stack:
                stack.callback(raise_exc, KeyError)
                stack.push(suppress_exc)
                stack.callback(raise_exc, IndexError)
        except Exception as exc:
            self.assertIsInstance(exc, KeyError)
        else:
            self.fail("Expected KeyError, but no exception was raised")

    def test_exit_exception_with_correct_context(self):
        # http://bugs.python.org/issue20317
        @contextmanager
        def gets_the_context_right(exc):
            try:
                yield
            finally:
                raise exc

        exc1 = Exception(1)
        exc2 = Exception(2)
        exc3 = Exception(3)
        exc4 = Exception(4)

        # The contextmanager already fixes the context, so prior to the
        # fix, ExitStack would try to fix it *again* and get into an
        # infinite self-referential loop
        try:
            with self.exit_stack() as stack:
                stack.enter_context(gets_the_context_right(exc4))
                stack.enter_context(gets_the_context_right(exc3))
                stack.enter_context(gets_the_context_right(exc2))
                raise exc1
        except Exception as exc:
            self.assertIs(exc, exc4)
            self.assertIs(exc.__context__, exc3)
            self.assertIs(exc.__context__.__context__, exc2)
            self.assertIs(exc.__context__.__context__.__context__, exc1)
            self.assertIsNone(
                       exc.__context__.__context__.__context__.__context__)

    def test_exit_exception_with_existing_context(self):
        # Addresses a lack of test coverage discovered after checking in a
        # fix for issue 20317 that still contained debugging code.
        def raise_nested(inner_exc, outer_exc):
            try:
                raise inner_exc
            finally:
                raise outer_exc
        exc1 = Exception(1)
        exc2 = Exception(2)
        exc3 = Exception(3)
        exc4 = Exception(4)
        exc5 = Exception(5)
        try:
            with self.exit_stack() as stack:
                stack.callback(raise_nested, exc4, exc5)
                stack.callback(raise_nested, exc2, exc3)
                raise exc1
        except Exception as exc:
            self.assertIs(exc, exc5)
            self.assertIs(exc.__context__, exc4)
            self.assertIs(exc.__context__.__context__, exc3)
            self.assertIs(exc.__context__.__context__.__context__, exc2)
            self.assertIs(
                 exc.__context__.__context__.__context__.__context__, exc1)
            self.assertIsNone(
                exc.__context__.__context__.__context__.__context__.__context__)

    def test_body_exception_suppress(self):
        def suppress_exc(*exc_details):
            return True
        try:
            with self.exit_stack() as stack:
                stack.push(suppress_exc)
                1/0
        except ZeroDivisionError:
            # The body raises ZeroDivisionError, so that is the type to
            # guard against if suppression ever stops working.
            self.fail("Expected no exception, got ZeroDivisionError")

    def test_exit_exception_chaining_suppress(self):
        with self.exit_stack() as stack:
            stack.push(lambda *exc: True)
            stack.push(lambda *exc: 1/0)
            stack.push(lambda *exc: {}[1])

    def test_excessive_nesting(self):
        # The original implementation would die with RecursionError here
        with self.exit_stack() as stack:
            for i in range(10000):
                stack.callback(int)

    def test_instance_bypass(self):
        class Example(object):
            pass
        cm = Example()
        cm.__exit__ = object()
        stack = self.exit_stack()
        # __exit__ set on the instance only is not honoured; the
        # exception type depends on the interpreter version.
        self.assertRaises(_PROTOCOL_ERROR, stack.enter_context, cm)
        stack.push(cm)
        self.assertIs(stack._exit_callbacks[-1][1], cm)

    def test_dont_reraise_RuntimeError(self):
        # https://bugs.python.org/issue27122
        class UniqueException(Exception): pass
        class UniqueRuntimeError(RuntimeError): pass

        @contextmanager
        def second():
            try:
                yield 1
            except Exception as exc:
                raise UniqueException("new exception") from exc

        @contextmanager
        def first():
            try:
                yield 1
            except Exception as exc:
                raise exc

        # The UniqueRuntimeError should be caught by second()'s exception
        # handler which chain raised a new UniqueException.
        with self.assertRaises(UniqueException) as err_ctx:
            with self.exit_stack() as es_ctx:
                es_ctx.enter_context(second())
                es_ctx.enter_context(first())
                raise UniqueRuntimeError("please no infinite loop.")

        exc = err_ctx.exception
        self.assertIsInstance(exc, UniqueException)
        self.assertIsInstance(exc.__context__, UniqueRuntimeError)
        self.assertIsNone(exc.__context__.__context__)
        self.assertIsNone(exc.__context__.__cause__)
        self.assertIs(exc.__cause__, exc.__context__)


class TestExitStack(TestBaseExitStack, unittest.TestCase):
    """Runs the shared ExitStack tests against contextlib.ExitStack."""

    exit_stack = ExitStack
    __test__ = True  # re-enable collection disabled on the mixin


class TestRedirectStream:
    """Shared redirect tests; subclasses set the manager and stream name."""

    redirect_stream = None
    orig_stream = None

    # Abstract mixin: see the matching marker on TestBaseExitStack.
    __test__ = False

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        cm_docstring = self.redirect_stream.__doc__
        obj = self.redirect_stream(None)
        self.assertEqual(obj.__doc__, cm_docstring)

    def test_no_redirect_in_init(self):
        orig_stdout = getattr(sys, self.orig_stream)
        self.redirect_stream(None)
        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)

    def test_redirect_to_string_io(self):
        f = io.StringIO()
        msg = "Consider an API like help(), which prints directly to stdout"
        orig_stdout = getattr(sys, self.orig_stream)
        with self.redirect_stream(f):
            print(msg, file=getattr(sys, self.orig_stream))
        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
        s = f.getvalue().strip()
        self.assertEqual(s, msg)

    def test_enter_result_is_target(self):
        f = io.StringIO()
        with self.redirect_stream(f) as enter_result:
            self.assertIs(enter_result, f)

    def test_cm_is_reusable(self):
        f = io.StringIO()
        write_to_f = self.redirect_stream(f)
        orig_stdout = getattr(sys, self.orig_stream)
        with write_to_f:
            print("Hello", end=" ", file=getattr(sys, self.orig_stream))
        with write_to_f:
            print("World!", file=getattr(sys, self.orig_stream))
        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
        s = f.getvalue()
        self.assertEqual(s, "Hello World!\n")

    def test_cm_is_reentrant(self):
        f = io.StringIO()
        write_to_f = self.redirect_stream(f)
        orig_stdout = getattr(sys, self.orig_stream)
        with write_to_f:
            print("Hello", end=" ", file=getattr(sys, self.orig_stream))
            with write_to_f:
                print("World!", file=getattr(sys, self.orig_stream))
        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
        s = f.getvalue()
        self.assertEqual(s, "Hello World!\n")


class TestRedirectStdout(TestRedirectStream, unittest.TestCase):
    """Runs the shared redirect tests against redirect_stdout."""

    redirect_stream = redirect_stdout
    orig_stream = "stdout"
    __test__ = True  # re-enable collection disabled on the mixin


class TestRedirectStderr(TestRedirectStream, unittest.TestCase):
    """Runs the shared redirect tests against redirect_stderr."""

    redirect_stream = redirect_stderr
    orig_stream = "stderr"
    __test__ = True  # re-enable collection disabled on the mixin


class TestSuppress(unittest.TestCase):
    """Tests for suppress()."""

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        cm_docstring = suppress.__doc__
        obj = suppress()
        self.assertEqual(obj.__doc__, cm_docstring)

    def test_no_result_from_enter(self):
        with suppress(ValueError) as enter_result:
            self.assertIsNone(enter_result)

    def test_no_exception(self):
        with suppress(ValueError):
            self.assertEqual(pow(2, 5), 32)

    def test_exact_exception(self):
        with suppress(TypeError):
            len(5)

    def test_exception_hierarchy(self):
        with suppress(LookupError):
            'Hello'[50]

    def test_other_exception(self):
        with self.assertRaises(ZeroDivisionError):
            with suppress(TypeError):
                1/0

    def test_no_args(self):
        with self.assertRaises(ZeroDivisionError):
            with suppress():
                1/0

    def test_multiple_exception_args(self):
        with suppress(ZeroDivisionError, TypeError):
            1/0
        with suppress(ZeroDivisionError, TypeError):
            len(5)

    def test_cm_is_reentrant(self):
        ignore_exceptions = suppress(Exception)
        with ignore_exceptions:
            pass
        with ignore_exceptions:
            len(5)
        with ignore_exceptions:
            with ignore_exceptions:  # Check nested usage
                len(5)
            outer_continued = True
            1/0
        self.assertTrue(outer_continued)


if __name__ == "__main__":
    unittest.main()