diff options
author | Victor Stinner <victor.stinner@haypocalc.com> | 2011-06-10 10:48:13 (GMT) |
---|---|---|
committer | Victor Stinner <victor.stinner@haypocalc.com> | 2011-06-10 10:48:13 (GMT) |
commit | af4946020e92556052782fcc95075dd8d694dbb5 (patch) | |
tree | 2e9d61f2e5c5bbc863f4c943aa433bd3f64f798a /Lib/test | |
parent | b0ae53d8a09731a51be48f9e84a71d09d0f90657 (diff) | |
download | cpython-af4946020e92556052782fcc95075dd8d694dbb5.zip cpython-af4946020e92556052782fcc95075dd8d694dbb5.tar.gz cpython-af4946020e92556052782fcc95075dd8d694dbb5.tar.bz2 |
Issue #8407: Make signal.sigwait() tests more reliable
Block the signal before calling sigwait(). Use os.fork() to ensure that we have
only one thread.
Initial patch written by Charles-François Natali.
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/test_signal.py | 67 |
1 files changed, 53 insertions, 14 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index 1ed4b9e..ea40627 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -598,34 +598,73 @@ class PendingSignalsTests(unittest.TestCase): with self.assertRaises(ZeroDivisionError): signal.pthread_kill(current, signum) + def check_sigwait(self, test, signum): + # sigwait must be called with the signal blocked: since the current + # process might have several threads running, we fork() a child process + # to have a single thread. + pid = os.fork() + if pid == 0: + # child: block and wait the signal + try: + signal.signal(signum, self.handler) + signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + + # Do the tests + test(signum) + + # The handler must not be called on unblock + try: + signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + except ZeroDivisionError: + print("the signal handler has been called", + file=sys.stderr) + os._exit(1) + + os._exit(0) + finally: + os._exit(1) + else: + # parent: let the child some time to wait, send him the signal, and + # check it correcty received it + self.assertEqual(os.waitpid(pid, 0), (pid, 0)) + @unittest.skipUnless(hasattr(signal, 'sigwait'), 'need signal.sigwait()') + @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') def test_sigwait(self): - old_handler = signal.signal(signal.SIGALRM, self.handler) - self.addCleanup(signal.signal, signal.SIGALRM, old_handler) + def test(signum): + signal.alarm(1) + received = signal.sigwait([signum]) + if received != signum: + print("sigwait() received %s, not %s" + % (received, signum), + file=sys.stderr) + os._exit(1) - signal.alarm(1) - self.assertEqual(signal.sigwait([signal.SIGALRM]), signal.SIGALRM) + self.check_sigwait(test, signal.SIGALRM) @unittest.skipUnless(hasattr(signal, 'sigwait'), 'need signal.sigwait()') @unittest.skipIf(threading is None, "test needs threading module") + @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') def test_sigwait_thread(self): - signum = signal.SIGUSR1 - old_handler = signal.signal(signum, self.handler) - self.addCleanup(signal.signal, signum, old_handler) - - def kill_later(): + def kill_later(signum): + # wait until the main thread is waiting in sigwait() time.sleep(1) os.kill(os.getpid(), signum) - killer = threading.Thread(target=kill_later) - killer.start() - try: - self.assertEqual(signal.sigwait([signum]), signum) - finally: + def test(signum): + killer = threading.Thread(target=kill_later, args=(signum,)) + killer.start() + received = signal.sigwait([signum]) + if received != signum: + print("sigwait() received %s, not %s" % (received, signum), + file=sys.stderr) + os._exit(1) killer.join() + self.check_sigwait(test, signal.SIGUSR1) + @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 'need signal.pthread_sigmask()') def test_pthread_sigmask_arguments(self): |