diff options
Diffstat (limited to 'Lib/test/test_signal.py')
-rw-r--r-- | Lib/test/test_signal.py | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index 22715cf..fc7725a 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -1,4 +1,5 @@ import os +import random import signal import socket import subprocess @@ -941,6 +942,101 @@ class PendingSignalsTests(unittest.TestCase): (exitcode, stdout)) +class StressTest(unittest.TestCase): + """ + Stress signal delivery, especially when a signal arrives in + the middle of recomputing the signal state or executing + previously tripped signal handlers. + """ + + @unittest.skipUnless(hasattr(signal, "setitimer"), + "test needs setitimer()") + def test_stress_delivery_dependent(self): + """ + This test uses dependent signal handlers. + """ + N = 10000 + sigs = [] + + def first_handler(signum, frame): + # 1e-6 is the minimum non-zero value for `setitimer()`. + # Choose a random delay so as to improve chances of + # triggering a race condition. Ideally the signal is received + # when inside critical signal-handling routines such as + # Py_MakePendingCalls(). + signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5) + + def second_handler(signum=None, frame=None): + sigs.append(signum) + + def setsig(signum, handler): + old_handler = signal.signal(signum, handler) + self.addCleanup(signal.signal, signum, old_handler) + + # Here on Linux, SIGPROF > SIGALRM > SIGUSR1. By using both + # ascending and descending sequences (SIGUSR1 then SIGALRM, + # SIGPROF then SIGALRM), we maximize chances of hitting a bug. + setsig(signal.SIGPROF, first_handler) + setsig(signal.SIGUSR1, first_handler) + setsig(signal.SIGALRM, second_handler) # for ITIMER_REAL + + expected_sigs = 0 + deadline = time.time() + 15.0 + + while expected_sigs < N: + os.kill(os.getpid(), signal.SIGPROF) + expected_sigs += 1 + # Wait for handlers to run to avoid signal coalescing + while len(sigs) < expected_sigs and time.time() < deadline: + time.sleep(1e-5) + + os.kill(os.getpid(), signal.SIGUSR1) + expected_sigs += 1 + while len(sigs) < expected_sigs and time.time() < deadline: + time.sleep(1e-5) + + # All ITIMER_REAL signals should have been delivered to the + # Python handler + self.assertEqual(len(sigs), N, "Some signals were lost") + + @unittest.skipUnless(hasattr(signal, "setitimer"), + "test needs setitimer()") + def test_stress_delivery_simultaneous(self): + """ + This test uses simultaneous signal handlers. + """ + N = 10000 + sigs = [] + + def handler(signum, frame): + sigs.append(signum) + + def setsig(signum, handler): + old_handler = signal.signal(signum, handler) + self.addCleanup(signal.signal, signum, old_handler) + + setsig(signal.SIGUSR1, handler) + setsig(signal.SIGALRM, handler) # for ITIMER_REAL + + expected_sigs = 0 + deadline = time.time() + 15.0 + + while expected_sigs < N: + # Hopefully the SIGALRM will be received somewhere during + # initial processing of SIGUSR1. + signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5) + os.kill(os.getpid(), signal.SIGUSR1) + + expected_sigs += 2 + # Wait for handlers to run to avoid signal coalescing + while len(sigs) < expected_sigs and time.time() < deadline: + time.sleep(1e-5) + + # All ITIMER_REAL signals should have been delivered to the + # Python handler + self.assertEqual(len(sigs), N, "Some signals were lost") + + def tearDownModule(): support.reap_children() |