diff options
Diffstat (limited to 'Lib/test/test_contextlib.py')
| -rw-r--r-- | Lib/test/test_contextlib.py | 128 |
1 files changed, 107 insertions, 21 deletions
diff --git a/Lib/test/test_contextlib.py b/Lib/test/test_contextlib.py index 2669651..516403e 100644 --- a/Lib/test/test_contextlib.py +++ b/Lib/test/test_contextlib.py @@ -83,6 +83,42 @@ class ContextManagerTestCase(unittest.TestCase): raise ZeroDivisionError(999) self.assertEqual(state, [1, 42, 999]) + def test_contextmanager_except_stopiter(self): + stop_exc = StopIteration('spam') + @contextmanager + def woohoo(): + yield + try: + with self.assertWarnsRegex(PendingDeprecationWarning, + "StopIteration"): + with woohoo(): + raise stop_exc + except Exception as ex: + self.assertIs(ex, stop_exc) + else: + self.fail('StopIteration was suppressed') + + def test_contextmanager_except_pep479(self): + code = """\ +from __future__ import generator_stop +from contextlib import contextmanager +@contextmanager +def woohoo(): + yield +""" + locals = {} + exec(code, locals, locals) + woohoo = locals['woohoo'] + + stop_exc = StopIteration('spam') + try: + with woohoo(): + raise stop_exc + except Exception as ex: + self.assertIs(ex, stop_exc) + else: + self.fail('StopIteration was suppressed') + def _create_contextmanager_attribs(self): def attribs(**kw): def decorate(func): @@ -726,60 +762,110 @@ class TestExitStack(unittest.TestCase): stack.push(cm) self.assertIs(stack._exit_callbacks[-1], cm) -class TestRedirectStdout(unittest.TestCase): + def test_dont_reraise_RuntimeError(self): + # https://bugs.python.org/issue27122 + class UniqueException(Exception): pass + class UniqueRuntimeError(RuntimeError): pass + + @contextmanager + def second(): + try: + yield 1 + except Exception as exc: + raise UniqueException("new exception") from exc + + @contextmanager + def first(): + try: + yield 1 + except Exception as exc: + raise exc + + # The UniqueRuntimeError should be caught by second()'s exception + # handler which chain raised a new UniqueException. + with self.assertRaises(UniqueException) as err_ctx: + with ExitStack() as es_ctx: + es_ctx.enter_context(second()) + es_ctx.enter_context(first()) + raise UniqueRuntimeError("please no infinite loop.") + + exc = err_ctx.exception + self.assertIsInstance(exc, UniqueException) + self.assertIsInstance(exc.__context__, UniqueRuntimeError) + self.assertIsNone(exc.__context__.__context__) + self.assertIsNone(exc.__context__.__cause__) + self.assertIs(exc.__cause__, exc.__context__) + + +class TestRedirectStream: + + redirect_stream = None + orig_stream = None @support.requires_docstrings def test_instance_docs(self): # Issue 19330: ensure context manager instances have good docstrings - cm_docstring = redirect_stdout.__doc__ - obj = redirect_stdout(None) + cm_docstring = self.redirect_stream.__doc__ + obj = self.redirect_stream(None) self.assertEqual(obj.__doc__, cm_docstring) def test_no_redirect_in_init(self): - orig_stdout = sys.stdout - redirect_stdout(None) - self.assertIs(sys.stdout, orig_stdout) + orig_stdout = getattr(sys, self.orig_stream) + self.redirect_stream(None) + self.assertIs(getattr(sys, self.orig_stream), orig_stdout) def test_redirect_to_string_io(self): f = io.StringIO() msg = "Consider an API like help(), which prints directly to stdout" - orig_stdout = sys.stdout - with redirect_stdout(f): - print(msg) - self.assertIs(sys.stdout, orig_stdout) + orig_stdout = getattr(sys, self.orig_stream) + with self.redirect_stream(f): + print(msg, file=getattr(sys, self.orig_stream)) + self.assertIs(getattr(sys, self.orig_stream), orig_stdout) s = f.getvalue().strip() self.assertEqual(s, msg) def test_enter_result_is_target(self): f = io.StringIO() - with redirect_stdout(f) as enter_result: + with self.redirect_stream(f) as enter_result: self.assertIs(enter_result, f) def test_cm_is_reusable(self): f = io.StringIO() - write_to_f = redirect_stdout(f) - orig_stdout = sys.stdout + write_to_f = self.redirect_stream(f) + orig_stdout = getattr(sys, self.orig_stream) with write_to_f: - print("Hello", end=" ") + print("Hello", end=" ", file=getattr(sys, self.orig_stream)) with write_to_f: - print("World!") - self.assertIs(sys.stdout, orig_stdout) + print("World!", file=getattr(sys, self.orig_stream)) + self.assertIs(getattr(sys, self.orig_stream), orig_stdout) s = f.getvalue() self.assertEqual(s, "Hello World!\n") def test_cm_is_reentrant(self): f = io.StringIO() - write_to_f = redirect_stdout(f) - orig_stdout = sys.stdout + write_to_f = self.redirect_stream(f) + orig_stdout = getattr(sys, self.orig_stream) with write_to_f: - print("Hello", end=" ") + print("Hello", end=" ", file=getattr(sys, self.orig_stream)) with write_to_f: - print("World!") - self.assertIs(sys.stdout, orig_stdout) + print("World!", file=getattr(sys, self.orig_stream)) + self.assertIs(getattr(sys, self.orig_stream), orig_stdout) s = f.getvalue() self.assertEqual(s, "Hello World!\n") +class TestRedirectStdout(TestRedirectStream, unittest.TestCase): + + redirect_stream = redirect_stdout + orig_stream = "stdout" + + +class TestRedirectStderr(TestRedirectStream, unittest.TestCase): + + redirect_stream = redirect_stderr + orig_stream = "stderr" + + class TestSuppress(unittest.TestCase): @support.requires_docstrings |
