summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Sobolev <mail@sobolevn.me>2022-02-18 12:56:23 (GMT)
committerGitHub <noreply@github.com>2022-02-18 12:56:23 (GMT)
commite2c28616ce6c3cdb1013c415125220a0b86b86a1 (patch)
tree4807fb6355148a12a09fc2ad8b305de9fe6543b3
parentc3ce7781e3afe6f2dec5eef8e87fd5a664519ae9 (diff)
downloadcpython-e2c28616ce6c3cdb1013c415125220a0b86b86a1.zip
cpython-e2c28616ce6c3cdb1013c415125220a0b86b86a1.tar.gz
cpython-e2c28616ce6c3cdb1013c415125220a0b86b86a1.tar.bz2
bpo-46709: check eval breaker in specialized `CALL` opcodes (GH-31404)
-rw-r--r--Lib/unittest/test/test_break.py144
-rw-r--r--Python/ceval.c9
2 files changed, 94 insertions, 59 deletions
diff --git a/Lib/unittest/test/test_break.py b/Lib/unittest/test/test_break.py
index eebd2b6..33cbdd2 100644
--- a/Lib/unittest/test/test_break.py
+++ b/Lib/unittest/test/test_break.py
@@ -4,14 +4,18 @@ import os
import sys
import signal
import weakref
-
import unittest
+from test import support
+
@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
class TestBreak(unittest.TestCase):
int_handler = None
+ # This number was smart-guessed, previously tests were failing
+ # after 7th run. So, we take `x * 2 + 1` to be sure.
+ default_repeats = 15
def setUp(self):
self._default_handler = signal.getsignal(signal.SIGINT)
@@ -24,6 +28,27 @@ class TestBreak(unittest.TestCase):
unittest.signals._interrupt_handler = None
+ def withRepeats(self, test_function, repeats=None):
+ if not support.check_impl_detail(cpython=True):
+ # Override repeats count on non-cpython to execute only once.
+ # Because this test only makes sense to be repeated on CPython.
+ repeats = 1
+ elif repeats is None:
+ repeats = self.default_repeats
+
+ for repeat in range(repeats):
+ with self.subTest(repeat=repeat):
+ # We don't run `setUp` for the very first repeat
+ # and we don't run `tearDown` for the very last one,
+ # because they are handled by the test class itself.
+ if repeat != 0:
+ self.setUp()
+ try:
+ test_function()
+ finally:
+ if repeat != repeats - 1:
+ self.tearDown()
+
def testInstallHandler(self):
default_handler = signal.getsignal(signal.SIGINT)
unittest.installHandler()
@@ -48,35 +73,34 @@ class TestBreak(unittest.TestCase):
unittest.removeResult(result)
def testInterruptCaught(self):
- default_handler = signal.getsignal(signal.SIGINT)
-
- result = unittest.TestResult()
- unittest.installHandler()
- unittest.registerResult(result)
-
- self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
-
def test(result):
pid = os.getpid()
os.kill(pid, signal.SIGINT)
result.breakCaught = True
self.assertTrue(result.shouldStop)
- try:
- test(result)
- except KeyboardInterrupt:
- self.fail("KeyboardInterrupt not handled")
- self.assertTrue(result.breakCaught)
+ def test_function():
+ result = unittest.TestResult()
+ unittest.installHandler()
+ unittest.registerResult(result)
+ self.assertNotEqual(
+ signal.getsignal(signal.SIGINT),
+ self._default_handler,
+ )
+
+ try:
+ test(result)
+ except KeyboardInterrupt:
+ self.fail("KeyboardInterrupt not handled")
+ self.assertTrue(result.breakCaught)
+ self.withRepeats(test_function)
def testSecondInterrupt(self):
# Can't use skipIf decorator because the signal handler may have
# been changed after defining this method.
if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
self.skipTest("test requires SIGINT to not be ignored")
- result = unittest.TestResult()
- unittest.installHandler()
- unittest.registerResult(result)
def test(result):
pid = os.getpid()
@@ -86,40 +110,40 @@ class TestBreak(unittest.TestCase):
os.kill(pid, signal.SIGINT)
self.fail("Second KeyboardInterrupt not raised")
- try:
- test(result)
- except KeyboardInterrupt:
- pass
- else:
- self.fail("Second KeyboardInterrupt not raised")
- self.assertTrue(result.breakCaught)
+ def test_function():
+ result = unittest.TestResult()
+ unittest.installHandler()
+ unittest.registerResult(result)
+ with self.assertRaises(KeyboardInterrupt):
+ test(result)
+ self.assertTrue(result.breakCaught)
+ self.withRepeats(test_function)
- def testTwoResults(self):
- unittest.installHandler()
- result = unittest.TestResult()
- unittest.registerResult(result)
- new_handler = signal.getsignal(signal.SIGINT)
+ def testTwoResults(self):
+ def test_function():
+ unittest.installHandler()
- result2 = unittest.TestResult()
- unittest.registerResult(result2)
- self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
+ result = unittest.TestResult()
+ unittest.registerResult(result)
+ new_handler = signal.getsignal(signal.SIGINT)
- result3 = unittest.TestResult()
+ result2 = unittest.TestResult()
+ unittest.registerResult(result2)
+ self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
- def test(result):
- pid = os.getpid()
- os.kill(pid, signal.SIGINT)
+ result3 = unittest.TestResult()
- try:
- test(result)
- except KeyboardInterrupt:
- self.fail("KeyboardInterrupt not handled")
+ try:
+ os.kill(os.getpid(), signal.SIGINT)
+ except KeyboardInterrupt:
+ self.fail("KeyboardInterrupt not handled")
- self.assertTrue(result.shouldStop)
- self.assertTrue(result2.shouldStop)
- self.assertFalse(result3.shouldStop)
+ self.assertTrue(result.shouldStop)
+ self.assertTrue(result2.shouldStop)
+ self.assertFalse(result3.shouldStop)
+ self.withRepeats(test_function)
def testHandlerReplacedButCalled(self):
@@ -127,23 +151,25 @@ class TestBreak(unittest.TestCase):
# been changed after defining this method.
if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
self.skipTest("test requires SIGINT to not be ignored")
- # If our handler has been replaced (is no longer installed) but is
- # called by the *new* handler, then it isn't safe to delay the
- # SIGINT and we should immediately delegate to the default handler
- unittest.installHandler()
-
- handler = signal.getsignal(signal.SIGINT)
- def new_handler(frame, signum):
- handler(frame, signum)
- signal.signal(signal.SIGINT, new_handler)
- try:
- pid = os.getpid()
- os.kill(pid, signal.SIGINT)
- except KeyboardInterrupt:
- pass
- else:
- self.fail("replaced but delegated handler doesn't raise interrupt")
+ def test_function():
+ # If our handler has been replaced (is no longer installed) but is
+ # called by the *new* handler, then it isn't safe to delay the
+ # SIGINT and we should immediately delegate to the default handler
+ unittest.installHandler()
+
+ handler = signal.getsignal(signal.SIGINT)
+ def new_handler(frame, signum):
+ handler(frame, signum)
+ signal.signal(signal.SIGINT, new_handler)
+
+ try:
+ os.kill(os.getpid(), signal.SIGINT)
+ except KeyboardInterrupt:
+ pass
+ else:
+ self.fail("replaced but delegated handler doesn't raise interrupt")
+ self.withRepeats(test_function)
def testRunner(self):
# Creating a TextTestRunner with the appropriate argument should
diff --git a/Python/ceval.c b/Python/ceval.c
index b900de5..5a6de5b 100644
--- a/Python/ceval.c
+++ b/Python/ceval.c
@@ -4742,6 +4742,7 @@ handle_eval_breaker:
if (res == NULL) {
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
@@ -4761,6 +4762,7 @@ handle_eval_breaker:
if (res == NULL) {
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
@@ -4785,6 +4787,7 @@ handle_eval_breaker:
if (res == NULL) {
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
@@ -4816,6 +4819,7 @@ handle_eval_breaker:
if (res == NULL) {
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
@@ -4854,6 +4858,7 @@ handle_eval_breaker:
*/
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
@@ -4896,6 +4901,7 @@ handle_eval_breaker:
if (res == NULL) {
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
@@ -5013,6 +5019,7 @@ handle_eval_breaker:
if (res == NULL) {
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
@@ -5040,6 +5047,7 @@ handle_eval_breaker:
if (res == NULL) {
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}
@@ -5067,6 +5075,7 @@ handle_eval_breaker:
if (res == NULL) {
goto error;
}
+ CHECK_EVAL_BREAKER();
DISPATCH();
}