diff options
author | Nikita Sobolev <mail@sobolevn.me> | 2022-02-18 12:56:23 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-18 12:56:23 (GMT) |
commit | e2c28616ce6c3cdb1013c415125220a0b86b86a1 (patch) | |
tree | 4807fb6355148a12a09fc2ad8b305de9fe6543b3 | |
parent | c3ce7781e3afe6f2dec5eef8e87fd5a664519ae9 (diff) | |
download | cpython-e2c28616ce6c3cdb1013c415125220a0b86b86a1.zip cpython-e2c28616ce6c3cdb1013c415125220a0b86b86a1.tar.gz cpython-e2c28616ce6c3cdb1013c415125220a0b86b86a1.tar.bz2 |
bpo-46709: check eval breaker in specialized `CALL` opcodes (GH-31404)
-rw-r--r-- | Lib/unittest/test/test_break.py | 144 | ||||
-rw-r--r-- | Python/ceval.c | 9 |
2 files changed, 94 insertions, 59 deletions
diff --git a/Lib/unittest/test/test_break.py b/Lib/unittest/test/test_break.py index eebd2b6..33cbdd2 100644 --- a/Lib/unittest/test/test_break.py +++ b/Lib/unittest/test/test_break.py @@ -4,14 +4,18 @@ import os import sys import signal import weakref - import unittest +from test import support + @unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") @unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") class TestBreak(unittest.TestCase): int_handler = None + # This number was smart-guessed, previously tests were failing + # after 7th run. So, we take `x * 2 + 1` to be sure. + default_repeats = 15 def setUp(self): self._default_handler = signal.getsignal(signal.SIGINT) @@ -24,6 +28,27 @@ class TestBreak(unittest.TestCase): unittest.signals._interrupt_handler = None + def withRepeats(self, test_function, repeats=None): + if not support.check_impl_detail(cpython=True): + # Override repeats count on non-cpython to execute only once. + # Because this test only makes sense to be repeated on CPython. + repeats = 1 + elif repeats is None: + repeats = self.default_repeats + + for repeat in range(repeats): + with self.subTest(repeat=repeat): + # We don't run `setUp` for the very first repeat + # and we don't run `tearDown` for the very last one, + # because they are handled by the test class itself. + if repeat != 0: + self.setUp() + try: + test_function() + finally: + if repeat != repeats - 1: + self.tearDown() + def testInstallHandler(self): default_handler = signal.getsignal(signal.SIGINT) unittest.installHandler() @@ -48,35 +73,34 @@ class TestBreak(unittest.TestCase): unittest.removeResult(result) def testInterruptCaught(self): - default_handler = signal.getsignal(signal.SIGINT) - - result = unittest.TestResult() - unittest.installHandler() - unittest.registerResult(result) - - self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) - def test(result): pid = os.getpid() os.kill(pid, signal.SIGINT) result.breakCaught = True self.assertTrue(result.shouldStop) - try: - test(result) - except KeyboardInterrupt: - self.fail("KeyboardInterrupt not handled") - self.assertTrue(result.breakCaught) + def test_function(): + result = unittest.TestResult() + unittest.installHandler() + unittest.registerResult(result) + self.assertNotEqual( + signal.getsignal(signal.SIGINT), + self._default_handler, + ) + + try: + test(result) + except KeyboardInterrupt: + self.fail("KeyboardInterrupt not handled") + self.assertTrue(result.breakCaught) + self.withRepeats(test_function) def testSecondInterrupt(self): # Can't use skipIf decorator because the signal handler may have # been changed after defining this method. if signal.getsignal(signal.SIGINT) == signal.SIG_IGN: self.skipTest("test requires SIGINT to not be ignored") - result = unittest.TestResult() - unittest.installHandler() - unittest.registerResult(result) def test(result): pid = os.getpid() @@ -86,40 +110,40 @@ class TestBreak(unittest.TestCase): os.kill(pid, signal.SIGINT) self.fail("Second KeyboardInterrupt not raised") - try: - test(result) - except KeyboardInterrupt: - pass - else: - self.fail("Second KeyboardInterrupt not raised") - self.assertTrue(result.breakCaught) + def test_function(): + result = unittest.TestResult() + unittest.installHandler() + unittest.registerResult(result) + with self.assertRaises(KeyboardInterrupt): + test(result) + self.assertTrue(result.breakCaught) + self.withRepeats(test_function) - def testTwoResults(self): - unittest.installHandler() - result = unittest.TestResult() - unittest.registerResult(result) - new_handler = signal.getsignal(signal.SIGINT) + def testTwoResults(self): + def test_function(): + unittest.installHandler() - result2 = unittest.TestResult() - unittest.registerResult(result2) - self.assertEqual(signal.getsignal(signal.SIGINT), new_handler) + result = unittest.TestResult() + unittest.registerResult(result) + new_handler = signal.getsignal(signal.SIGINT) - result3 = unittest.TestResult() + result2 = unittest.TestResult() + unittest.registerResult(result2) + self.assertEqual(signal.getsignal(signal.SIGINT), new_handler) - def test(result): - pid = os.getpid() - os.kill(pid, signal.SIGINT) + result3 = unittest.TestResult() - try: - test(result) - except KeyboardInterrupt: - self.fail("KeyboardInterrupt not handled") + try: + os.kill(os.getpid(), signal.SIGINT) + except KeyboardInterrupt: + self.fail("KeyboardInterrupt not handled") - self.assertTrue(result.shouldStop) - self.assertTrue(result2.shouldStop) - self.assertFalse(result3.shouldStop) + self.assertTrue(result.shouldStop) + self.assertTrue(result2.shouldStop) + self.assertFalse(result3.shouldStop) + self.withRepeats(test_function) def testHandlerReplacedButCalled(self): @@ -127,23 +151,25 @@ class TestBreak(unittest.TestCase): # been changed after defining this method. if signal.getsignal(signal.SIGINT) == signal.SIG_IGN: self.skipTest("test requires SIGINT to not be ignored") - # If our handler has been replaced (is no longer installed) but is - # called by the *new* handler, then it isn't safe to delay the - # SIGINT and we should immediately delegate to the default handler - unittest.installHandler() - - handler = signal.getsignal(signal.SIGINT) - def new_handler(frame, signum): - handler(frame, signum) - signal.signal(signal.SIGINT, new_handler) - try: - pid = os.getpid() - os.kill(pid, signal.SIGINT) - except KeyboardInterrupt: - pass - else: - self.fail("replaced but delegated handler doesn't raise interrupt") + def test_function(): + # If our handler has been replaced (is no longer installed) but is + # called by the *new* handler, then it isn't safe to delay the + # SIGINT and we should immediately delegate to the default handler + unittest.installHandler() + + handler = signal.getsignal(signal.SIGINT) + def new_handler(frame, signum): + handler(frame, signum) + signal.signal(signal.SIGINT, new_handler) + + try: + os.kill(os.getpid(), signal.SIGINT) + except KeyboardInterrupt: + pass + else: + self.fail("replaced but delegated handler doesn't raise interrupt") + self.withRepeats(test_function) def testRunner(self): # Creating a TextTestRunner with the appropriate argument should diff --git a/Python/ceval.c b/Python/ceval.c index b900de5..5a6de5b 100644 --- a/Python/ceval.c +++ b/Python/ceval.c @@ -4742,6 +4742,7 @@ handle_eval_breaker: if (res == NULL) { goto error; } + CHECK_EVAL_BREAKER(); DISPATCH(); } @@ -4761,6 +4762,7 @@ handle_eval_breaker: if (res == NULL) { goto error; } + CHECK_EVAL_BREAKER(); DISPATCH(); } @@ -4785,6 +4787,7 @@ handle_eval_breaker: if (res == NULL) { goto error; } + CHECK_EVAL_BREAKER(); DISPATCH(); } @@ -4816,6 +4819,7 @@ handle_eval_breaker: if (res == NULL) { goto error; } + CHECK_EVAL_BREAKER(); DISPATCH(); } @@ -4854,6 +4858,7 @@ handle_eval_breaker: */ goto error; } + CHECK_EVAL_BREAKER(); DISPATCH(); } @@ -4896,6 +4901,7 @@ handle_eval_breaker: if (res == NULL) { goto error; } + CHECK_EVAL_BREAKER(); DISPATCH(); } @@ -5013,6 +5019,7 @@ handle_eval_breaker: if (res == NULL) { goto error; } + CHECK_EVAL_BREAKER(); DISPATCH(); } @@ -5040,6 +5047,7 @@ handle_eval_breaker: if (res == NULL) { goto error; } + CHECK_EVAL_BREAKER(); DISPATCH(); } @@ -5067,6 +5075,7 @@ handle_eval_breaker: if (res == NULL) { goto error; } + CHECK_EVAL_BREAKER(); DISPATCH(); } |