summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_unittest/test_break.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_unittest/test_break.py')
-rw-r--r--Lib/test/test_unittest/test_break.py306
1 files changed, 306 insertions, 0 deletions
diff --git a/Lib/test/test_unittest/test_break.py b/Lib/test/test_unittest/test_break.py
new file mode 100644
index 0000000..33cbdd2
--- /dev/null
+++ b/Lib/test/test_unittest/test_break.py
@@ -0,0 +1,306 @@
+import gc
+import io
+import os
+import sys
+import signal
+import weakref
+import unittest
+
+from test import support
+
+
+@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
+@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
+class TestBreak(unittest.TestCase):
+ int_handler = None
+ # This number was smart-guessed, previously tests were failing
+ # after 7th run. So, we take `x * 2 + 1` to be sure.
+ default_repeats = 15
+
+ def setUp(self):
+ self._default_handler = signal.getsignal(signal.SIGINT)
+ if self.int_handler is not None:
+ signal.signal(signal.SIGINT, self.int_handler)
+
+ def tearDown(self):
+ signal.signal(signal.SIGINT, self._default_handler)
+ unittest.signals._results = weakref.WeakKeyDictionary()
+ unittest.signals._interrupt_handler = None
+
+
+ def withRepeats(self, test_function, repeats=None):
+ if not support.check_impl_detail(cpython=True):
+ # Override repeats count on non-cpython to execute only once.
+ # Because this test only makes sense to be repeated on CPython.
+ repeats = 1
+ elif repeats is None:
+ repeats = self.default_repeats
+
+ for repeat in range(repeats):
+ with self.subTest(repeat=repeat):
+ # We don't run `setUp` for the very first repeat
+ # and we don't run `tearDown` for the very last one,
+ # because they are handled by the test class itself.
+ if repeat != 0:
+ self.setUp()
+ try:
+ test_function()
+ finally:
+ if repeat != repeats - 1:
+ self.tearDown()
+
+ def testInstallHandler(self):
+ default_handler = signal.getsignal(signal.SIGINT)
+ unittest.installHandler()
+ self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
+
+ try:
+ pid = os.getpid()
+ os.kill(pid, signal.SIGINT)
+ except KeyboardInterrupt:
+ self.fail("KeyboardInterrupt not handled")
+
+ self.assertTrue(unittest.signals._interrupt_handler.called)
+
+ def testRegisterResult(self):
+ result = unittest.TestResult()
+ self.assertNotIn(result, unittest.signals._results)
+
+ unittest.registerResult(result)
+ try:
+ self.assertIn(result, unittest.signals._results)
+ finally:
+ unittest.removeResult(result)
+
+ def testInterruptCaught(self):
+ def test(result):
+ pid = os.getpid()
+ os.kill(pid, signal.SIGINT)
+ result.breakCaught = True
+ self.assertTrue(result.shouldStop)
+
+ def test_function():
+ result = unittest.TestResult()
+ unittest.installHandler()
+ unittest.registerResult(result)
+
+ self.assertNotEqual(
+ signal.getsignal(signal.SIGINT),
+ self._default_handler,
+ )
+
+ try:
+ test(result)
+ except KeyboardInterrupt:
+ self.fail("KeyboardInterrupt not handled")
+ self.assertTrue(result.breakCaught)
+ self.withRepeats(test_function)
+
+ def testSecondInterrupt(self):
+ # Can't use skipIf decorator because the signal handler may have
+ # been changed after defining this method.
+ if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
+ self.skipTest("test requires SIGINT to not be ignored")
+
+ def test(result):
+ pid = os.getpid()
+ os.kill(pid, signal.SIGINT)
+ result.breakCaught = True
+ self.assertTrue(result.shouldStop)
+ os.kill(pid, signal.SIGINT)
+ self.fail("Second KeyboardInterrupt not raised")
+
+ def test_function():
+ result = unittest.TestResult()
+ unittest.installHandler()
+ unittest.registerResult(result)
+
+ with self.assertRaises(KeyboardInterrupt):
+ test(result)
+ self.assertTrue(result.breakCaught)
+ self.withRepeats(test_function)
+
+
+ def testTwoResults(self):
+ def test_function():
+ unittest.installHandler()
+
+ result = unittest.TestResult()
+ unittest.registerResult(result)
+ new_handler = signal.getsignal(signal.SIGINT)
+
+ result2 = unittest.TestResult()
+ unittest.registerResult(result2)
+ self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
+
+ result3 = unittest.TestResult()
+
+ try:
+ os.kill(os.getpid(), signal.SIGINT)
+ except KeyboardInterrupt:
+ self.fail("KeyboardInterrupt not handled")
+
+ self.assertTrue(result.shouldStop)
+ self.assertTrue(result2.shouldStop)
+ self.assertFalse(result3.shouldStop)
+ self.withRepeats(test_function)
+
+
+ def testHandlerReplacedButCalled(self):
+ # Can't use skipIf decorator because the signal handler may have
+ # been changed after defining this method.
+ if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
+ self.skipTest("test requires SIGINT to not be ignored")
+
+ def test_function():
+ # If our handler has been replaced (is no longer installed) but is
+ # called by the *new* handler, then it isn't safe to delay the
+ # SIGINT and we should immediately delegate to the default handler
+ unittest.installHandler()
+
+ handler = signal.getsignal(signal.SIGINT)
+ def new_handler(frame, signum):
+ handler(frame, signum)
+ signal.signal(signal.SIGINT, new_handler)
+
+ try:
+ os.kill(os.getpid(), signal.SIGINT)
+ except KeyboardInterrupt:
+ pass
+ else:
+ self.fail("replaced but delegated handler doesn't raise interrupt")
+ self.withRepeats(test_function)
+
+ def testRunner(self):
+ # Creating a TextTestRunner with the appropriate argument should
+ # register the TextTestResult it creates
+ runner = unittest.TextTestRunner(stream=io.StringIO())
+
+ result = runner.run(unittest.TestSuite())
+ self.assertIn(result, unittest.signals._results)
+
+ def testWeakReferences(self):
+ # Calling registerResult on a result should not keep it alive
+ result = unittest.TestResult()
+ unittest.registerResult(result)
+
+ ref = weakref.ref(result)
+ del result
+
+ # For non-reference counting implementations
+ gc.collect();gc.collect()
+ self.assertIsNone(ref())
+
+
+ def testRemoveResult(self):
+ result = unittest.TestResult()
+ unittest.registerResult(result)
+
+ unittest.installHandler()
+ self.assertTrue(unittest.removeResult(result))
+
+ # Should this raise an error instead?
+ self.assertFalse(unittest.removeResult(unittest.TestResult()))
+
+ try:
+ pid = os.getpid()
+ os.kill(pid, signal.SIGINT)
+ except KeyboardInterrupt:
+ pass
+
+ self.assertFalse(result.shouldStop)
+
+ def testMainInstallsHandler(self):
+ failfast = object()
+ test = object()
+ verbosity = object()
+ result = object()
+ default_handler = signal.getsignal(signal.SIGINT)
+
+ class FakeRunner(object):
+ initArgs = []
+ runArgs = []
+ def __init__(self, *args, **kwargs):
+ self.initArgs.append((args, kwargs))
+ def run(self, test):
+ self.runArgs.append(test)
+ return result
+
+ class Program(unittest.TestProgram):
+ def __init__(self, catchbreak):
+ self.exit = False
+ self.verbosity = verbosity
+ self.failfast = failfast
+ self.catchbreak = catchbreak
+ self.tb_locals = False
+ self.testRunner = FakeRunner
+ self.test = test
+ self.result = None
+
+ p = Program(False)
+ p.runTests()
+
+ self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
+ 'verbosity': verbosity,
+ 'failfast': failfast,
+ 'tb_locals': False,
+ 'warnings': None})])
+ self.assertEqual(FakeRunner.runArgs, [test])
+ self.assertEqual(p.result, result)
+
+ self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
+
+ FakeRunner.initArgs = []
+ FakeRunner.runArgs = []
+ p = Program(True)
+ p.runTests()
+
+ self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
+ 'verbosity': verbosity,
+ 'failfast': failfast,
+ 'tb_locals': False,
+ 'warnings': None})])
+ self.assertEqual(FakeRunner.runArgs, [test])
+ self.assertEqual(p.result, result)
+
+ self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
+
+ def testRemoveHandler(self):
+ default_handler = signal.getsignal(signal.SIGINT)
+ unittest.installHandler()
+ unittest.removeHandler()
+ self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
+
+ # check that calling removeHandler multiple times has no ill-effect
+ unittest.removeHandler()
+ self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
+
+ def testRemoveHandlerAsDecorator(self):
+ default_handler = signal.getsignal(signal.SIGINT)
+ unittest.installHandler()
+
+ @unittest.removeHandler
+ def test():
+ self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
+
+ test()
+ self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
+
+@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
+@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
+class TestBreakDefaultIntHandler(TestBreak):
+ int_handler = signal.default_int_handler
+
+@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
+@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
+class TestBreakSignalIgnored(TestBreak):
+ int_handler = signal.SIG_IGN
+
+@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
+@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
+class TestBreakSignalDefault(TestBreak):
+ int_handler = signal.SIG_DFL
+
+
+if __name__ == "__main__":
+ unittest.main()