diff options
author | Victor Stinner <victor.stinner@haypocalc.com> | 2011-06-13 14:19:06 (GMT) |
---|---|---|
committer | Victor Stinner <victor.stinner@haypocalc.com> | 2011-06-13 14:19:06 (GMT) |
commit | 415007e30dceced68b2c7d3a19143f0a3737fed7 (patch) | |
tree | e9f150d288979782c6c8506ed51b5d536ec34364 /Lib/test | |
parent | 8d233f2cb0e914ec553dafb86b66533681a90040 (diff) | |
download | cpython-415007e30dceced68b2c7d3a19143f0a3737fed7.zip cpython-415007e30dceced68b2c7d3a19143f0a3737fed7.tar.gz cpython-415007e30dceced68b2c7d3a19143f0a3737fed7.tar.bz2 |
Issue #12316: Fix sigwait() test using threads
Spawn a new process instead of using fork(). Patch written by Charles-François
Natali.
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/test_signal.py | 70 |
1 files changed, 41 insertions, 29 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index 35887ef..effdbef 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -9,6 +9,7 @@ import struct import subprocess import traceback import sys, os, time, errno +from test.script_helper import assert_python_ok try: import threading except ImportError: @@ -598,9 +599,23 @@ class PendingSignalsTests(unittest.TestCase): with self.assertRaises(ZeroDivisionError): signal.pthread_kill(current, signum) + @unittest.skipUnless(hasattr(signal, 'sigwait'), + 'need signal.sigwait()') @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 'need signal.pthread_sigmask()') - def check_sigwait(self, test, signum): + @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') + def test_sigwait(self): + def test(signum): + signal.alarm(1) + received = signal.sigwait([signum]) + if received != signum: + print("sigwait() received %s, not %s" + % (received, signum), + file=sys.stderr) + os._exit(1) + + signum = signal.SIGALRM + # sigwait must be called with the signal blocked: since the current # process might have several threads running, we fork() a child process # to have a single thread. @@ -627,46 +642,43 @@ class PendingSignalsTests(unittest.TestCase): else: os._exit(0) else: - # parent: let the child some time to wait, send him the signal, and - # check it correcty received it + # parent: check that the child correcty received the signal self.assertEqual(os.waitpid(pid, 0), (pid, 0)) @unittest.skipUnless(hasattr(signal, 'sigwait'), 'need signal.sigwait()') - @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') - def test_sigwait(self): - def test(signum): - signal.alarm(1) - received = signal.sigwait([signum]) - if received != signum: - print("sigwait() received %s, not %s" - % (received, signum), - file=sys.stderr) - os._exit(1) - - self.check_sigwait(test, signal.SIGALRM) - - @unittest.skipUnless(hasattr(signal, 'sigwait'), - 'need signal.sigwait()') + @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), + 'need signal.pthread_sigmask()') @unittest.skipIf(threading is None, "test needs threading module") - @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') def test_sigwait_thread(self): - def kill_later(signum): - # wait until the main thread is waiting in sigwait() - time.sleep(1) - os.kill(os.getpid(), signum) - - def test(signum): - killer = threading.Thread(target=kill_later, args=(signum,)) + # Check that calling sigwait() from a thread doesn't suspend the whole + # process. A new interpreter is spawned to avoid problems when mixing + # threads and fork(): only async-safe functions are allowed between + # fork() and exec(). + assert_python_ok("-c", """if True: + import os, threading, sys, time, signal + + # the default handler terminates the process + signum = signal.SIGUSR1 + + def kill_later(): + # wait until the main thread is waiting in sigwait() + time.sleep(1) + os.kill(os.getpid(), signum) + + # the signal must be blocked by all the threads + signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + killer = threading.Thread(target=kill_later) killer.start() received = signal.sigwait([signum]) if received != signum: print("sigwait() received %s, not %s" % (received, signum), file=sys.stderr) - os._exit(1) + sys.exit(1) killer.join() - - self.check_sigwait(test, signal.SIGUSR1) + # unblock the signal, which should have been cleared by sigwait() + signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + """) @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 'need signal.pthread_sigmask()') |