summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@haypocalc.com>2011-06-13 14:19:06 (GMT)
committerVictor Stinner <victor.stinner@haypocalc.com>2011-06-13 14:19:06 (GMT)
commit415007e30dceced68b2c7d3a19143f0a3737fed7 (patch)
treee9f150d288979782c6c8506ed51b5d536ec34364 /Lib
parent8d233f2cb0e914ec553dafb86b66533681a90040 (diff)
downloadcpython-415007e30dceced68b2c7d3a19143f0a3737fed7.zip
cpython-415007e30dceced68b2c7d3a19143f0a3737fed7.tar.gz
cpython-415007e30dceced68b2c7d3a19143f0a3737fed7.tar.bz2
Issue #12316: Fix sigwait() test using threads
Spawn a new process instead of using fork(). Patch written by Charles-François Natali.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/test/test_signal.py70
1 files changed, 41 insertions, 29 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index 35887ef..effdbef 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -9,6 +9,7 @@ import struct
import subprocess
import traceback
import sys, os, time, errno
+from test.script_helper import assert_python_ok
try:
import threading
except ImportError:
@@ -598,9 +599,23 @@ class PendingSignalsTests(unittest.TestCase):
with self.assertRaises(ZeroDivisionError):
signal.pthread_kill(current, signum)
+ @unittest.skipUnless(hasattr(signal, 'sigwait'),
+ 'need signal.sigwait()')
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()')
- def check_sigwait(self, test, signum):
+ @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
+ def test_sigwait(self):
+ def test(signum):
+ signal.alarm(1)
+ received = signal.sigwait([signum])
+ if received != signum:
+ print("sigwait() received %s, not %s"
+ % (received, signum),
+ file=sys.stderr)
+ os._exit(1)
+
+ signum = signal.SIGALRM
+
# sigwait must be called with the signal blocked: since the current
# process might have several threads running, we fork() a child process
# to have a single thread.
@@ -627,46 +642,43 @@ class PendingSignalsTests(unittest.TestCase):
else:
os._exit(0)
else:
- # parent: let the child some time to wait, send him the signal, and
- # check it correcty received it
+ # parent: check that the child correcty received the signal
self.assertEqual(os.waitpid(pid, 0), (pid, 0))
@unittest.skipUnless(hasattr(signal, 'sigwait'),
'need signal.sigwait()')
- @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
- def test_sigwait(self):
- def test(signum):
- signal.alarm(1)
- received = signal.sigwait([signum])
- if received != signum:
- print("sigwait() received %s, not %s"
- % (received, signum),
- file=sys.stderr)
- os._exit(1)
-
- self.check_sigwait(test, signal.SIGALRM)
-
- @unittest.skipUnless(hasattr(signal, 'sigwait'),
- 'need signal.sigwait()')
+ @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
+ 'need signal.pthread_sigmask()')
@unittest.skipIf(threading is None, "test needs threading module")
- @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
def test_sigwait_thread(self):
- def kill_later(signum):
- # wait until the main thread is waiting in sigwait()
- time.sleep(1)
- os.kill(os.getpid(), signum)
-
- def test(signum):
- killer = threading.Thread(target=kill_later, args=(signum,))
+ # Check that calling sigwait() from a thread doesn't suspend the whole
+ # process. A new interpreter is spawned to avoid problems when mixing
+ # threads and fork(): only async-safe functions are allowed between
+ # fork() and exec().
+ assert_python_ok("-c", """if True:
+ import os, threading, sys, time, signal
+
+ # the default handler terminates the process
+ signum = signal.SIGUSR1
+
+ def kill_later():
+ # wait until the main thread is waiting in sigwait()
+ time.sleep(1)
+ os.kill(os.getpid(), signum)
+
+ # the signal must be blocked by all the threads
+ signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
+ killer = threading.Thread(target=kill_later)
killer.start()
received = signal.sigwait([signum])
if received != signum:
print("sigwait() received %s, not %s" % (received, signum),
file=sys.stderr)
- os._exit(1)
+ sys.exit(1)
killer.join()
-
- self.check_sigwait(test, signal.SIGUSR1)
+ # unblock the signal, which should have been cleared by sigwait()
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+ """)
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()')