summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_signal.py
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@haypocalc.com>2011-07-04 15:49:40 (GMT)
committerVictor Stinner <victor.stinner@haypocalc.com>2011-07-04 15:49:40 (GMT)
commitd554cdf8b91ac104229e4b4e8774aea58c4ae235 (patch)
tree236ef853efadc4917ab4ec6da40c964acb8ea968 /Lib/test/test_signal.py
parentb575289292902bb078dbd128a90da642c27d4ad6 (diff)
parente40b3aabfb10e1f9fd31541369fec491029d5d46 (diff)
downloadcpython-d554cdf8b91ac104229e4b4e8774aea58c4ae235.zip
cpython-d554cdf8b91ac104229e4b4e8774aea58c4ae235.tar.gz
cpython-d554cdf8b91ac104229e4b4e8774aea58c4ae235.tar.bz2
(merge 3.2) Issue #12469: Run wakeup and pending signal tests in a subprocess
to run the test in a fresh process with only one thread and to not change signal handling of the parent process.
Diffstat (limited to 'Lib/test/test_signal.py')
-rw-r--r--Lib/test/test_signal.py494
1 files changed, 256 insertions, 238 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index e5df000..e23b126 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -224,117 +224,115 @@ class WindowsSignalTests(unittest.TestCase):
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
- TIMEOUT_FULL = 10
- TIMEOUT_HALF = 5
+ def check_wakeup(self, test_body, *signals):
+ # use a subprocess to have only one thread
+ code = """if 1:
+ import fcntl
+ import os
+ import signal
+ import struct
- def handler(self, signum, frame):
- pass
+ signals = {!r}
- def check_signum(self, *signals):
- data = os.read(self.read, len(signals)+1)
- raised = struct.unpack('%uB' % len(data), data)
- # We don't care of the signal delivery order (it's not portable or
- # reliable)
- raised = set(raised)
- signals = set(signals)
- self.assertEqual(raised, signals)
+ def handler(signum, frame):
+ pass
- def test_wakeup_fd_early(self):
- import select
-
- signal.alarm(1)
- before_time = time.time()
- # We attempt to get a signal during the sleep,
- # before select is called
- time.sleep(self.TIMEOUT_FULL)
- mid_time = time.time()
- self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
- select.select([self.read], [], [], self.TIMEOUT_FULL)
- after_time = time.time()
- self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)
- self.check_signum(signal.SIGALRM)
+ def check_signum(signals):
+ data = os.read(read, len(signals)+1)
+ raised = struct.unpack('%uB' % len(data), data)
+ # We don't care of the signal delivery order (it's not portable or
+ # reliable)
+ raised = set(raised)
+ signals = set(signals)
+ assert raised == signals, "%r != %r" % (raised, signals)
- def test_wakeup_fd_during(self):
- import select
+ {}
- signal.alarm(1)
- before_time = time.time()
- # We attempt to get a signal during the select call
- self.assertRaises(select.error, select.select,
- [self.read], [], [], self.TIMEOUT_FULL)
- after_time = time.time()
- self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
- self.check_signum(signal.SIGALRM)
+ signal.signal(signal.SIGALRM, handler)
+ read, write = os.pipe()
+ for fd in (read, write):
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
+ flags = flags | os.O_NONBLOCK
+ fcntl.fcntl(fd, fcntl.F_SETFL, flags)
+ signal.set_wakeup_fd(write)
- def test_signum(self):
- old_handler = signal.signal(signal.SIGUSR1, self.handler)
- self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
- os.kill(os.getpid(), signal.SIGUSR1)
- os.kill(os.getpid(), signal.SIGALRM)
- self.check_signum(signal.SIGUSR1, signal.SIGALRM)
+ test()
+ check_signum(signals)
- @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
- 'need signal.pthread_sigmask()')
- @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
- 'need signal.pthread_kill()')
- def test_pending(self):
- signum1 = signal.SIGUSR1
- signum2 = signal.SIGUSR2
- tid = threading.current_thread().ident
+ os.close(read)
+ os.close(write)
+ """.format(signals, test_body)
- old_handler = signal.signal(signum1, self.handler)
- self.addCleanup(signal.signal, signum1, old_handler)
- old_handler = signal.signal(signum2, self.handler)
- self.addCleanup(signal.signal, signum2, old_handler)
+ assert_python_ok('-c', code)
- signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
- signal.pthread_kill(tid, signum1)
- signal.pthread_kill(tid, signum2)
- # Unblocking the 2 signals calls the C signal handler twice
- signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
+ def test_wakeup_fd_early(self):
+ self.check_wakeup("""def test():
+ import select
+ import time
- self.check_signum(signum1, signum2)
+ TIMEOUT_FULL = 10
+ TIMEOUT_HALF = 5
- @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
- 'need signal.pthread_kill()')
- def test_pthread_kill_main_thread(self):
- # Test that a signal can be sent to the main thread with pthread_kill()
- # before any other thread has been created (see issue #12392).
- code = """if True:
- import threading
- import signal
- import sys
+ signal.alarm(1)
+ before_time = time.time()
+ # We attempt to get a signal during the sleep,
+ # before select is called
+ time.sleep(TIMEOUT_FULL)
+ mid_time = time.time()
+ dt = mid_time - before_time
+ assert dt < TIMEOUT_HALF, dt
+ select.select([read], [], [], TIMEOUT_FULL)
+ after_time = time.time()
+ dt = after_time - mid_time
+ assert dt < TIMEOUT_HALF, dt
+ """, signal.SIGALRM)
- def handler(signum, frame):
- sys.exit(3)
+ def test_wakeup_fd_during(self):
+ self.check_wakeup("""def test():
+ import select
+ import time
+ TIMEOUT_FULL = 10
+ TIMEOUT_HALF = 5
+
+ signal.alarm(1)
+ before_time = time.time()
+ # We attempt to get a signal during the select call
+ try:
+ select.select([read], [], [], TIMEOUT_FULL)
+ except select.error:
+ pass
+ else:
+ raise Exception("select.error not raised")
+ after_time = time.time()
+ dt = after_time - before_time
+ assert dt < TIMEOUT_HALF, dt
+ """, signal.SIGALRM)
+
+ def test_signum(self):
+ self.check_wakeup("""def test():
signal.signal(signal.SIGUSR1, handler)
- signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
- sys.exit(1)
- """
+ os.kill(os.getpid(), signal.SIGUSR1)
+ os.kill(os.getpid(), signal.SIGALRM)
+ """, signal.SIGUSR1, signal.SIGALRM)
- with spawn_python('-c', code) as process:
- stdout, stderr = process.communicate()
- exitcode = process.wait()
- if exitcode != 3:
- raise Exception("Child error (exit code %s): %s" %
- (exitcode, stdout))
+ @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
+ 'need signal.pthread_sigmask()')
+ def test_pending(self):
+ self.check_wakeup("""def test():
+ signum1 = signal.SIGUSR1
+ signum2 = signal.SIGUSR2
- def setUp(self):
- import fcntl
+ signal.signal(signum1, handler)
+ signal.signal(signum2, handler)
- self.alrm = signal.signal(signal.SIGALRM, self.handler)
- self.read, self.write = os.pipe()
- flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
- flags = flags | os.O_NONBLOCK
- fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
- self.old_wakeup = signal.set_wakeup_fd(self.write)
+ signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
+ os.kill(os.getpid(), signum1)
+ os.kill(os.getpid(), signum2)
+ # Unblocking the 2 signals calls the C signal handler twice
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
+ """, signal.SIGUSR1, signal.SIGUSR2)
- def tearDown(self):
- signal.set_wakeup_fd(self.old_wakeup)
- os.close(self.read)
- os.close(self.write)
- signal.signal(signal.SIGALRM, self.alrm)
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class SiginterruptTest(unittest.TestCase):
@@ -519,60 +517,6 @@ class PendingSignalsTests(unittest.TestCase):
Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
functions.
"""
- def setUp(self):
- self.has_pthread_kill = hasattr(signal, 'pthread_kill')
-
- def handler(self, signum, frame):
- 1/0
-
- def read_sigmask(self):
- return signal.pthread_sigmask(signal.SIG_BLOCK, [])
-
- def can_test_blocked_signals(self, skip):
- """
- Check if a blocked signal can be raised to the main thread without
- calling its signal handler. We need pthread_kill() or exactly one
- thread (the main thread).
-
- Return True if it's possible. Otherwise, return False and print a
- warning if skip is False, or raise a SkipTest exception if skip is
- True.
- """
- if self.has_pthread_kill:
- return True
-
- # The fault handler timeout thread masks all signals. If the main
- # thread masks also SIGUSR1, all threads mask this signal. In this
- # case, if we send SIGUSR1 to the process, the signal is pending in the
- # main or the faulthandler timeout thread. Unblock SIGUSR1 in the main
- # thread calls the signal handler only if the signal is pending for the
- # main thread. Stop the faulthandler timeout thread to workaround this
- # problem.
- import faulthandler
- faulthandler.cancel_dump_tracebacks_later()
-
- # Issue #11998: The _tkinter module loads the Tcl library which
- # creates a thread waiting events in select(). This thread receives
- # signals blocked by all other threads. We cannot test blocked
- # signals
- if '_tkinter' in sys.modules:
- message = ("_tkinter is loaded and pthread_kill() is missing, "
- "cannot test blocked signals (issue #11998)")
- if skip:
- self.skipTest(message)
- else:
- print("WARNING: %s" % message)
- return False
- return True
-
- def kill(self, signum):
- if self.has_pthread_kill:
- tid = threading.get_ident()
- signal.pthread_kill(tid, signum)
- else:
- pid = os.getpid()
- os.kill(pid, signum)
-
@unittest.skipUnless(hasattr(signal, 'sigpending'),
'need signal.sigpending()')
def test_sigpending_empty(self):
@@ -583,70 +527,103 @@ class PendingSignalsTests(unittest.TestCase):
@unittest.skipUnless(hasattr(signal, 'sigpending'),
'need signal.sigpending()')
def test_sigpending(self):
- self.can_test_blocked_signals(True)
+ code = """if 1:
+ import os
+ import signal
- signum = signal.SIGUSR1
- old_handler = signal.signal(signum, self.handler)
- self.addCleanup(signal.signal, signum, old_handler)
+ def handler(signum, frame):
+ 1/0
- signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
- self.kill(signum)
- self.assertEqual(signal.sigpending(), {signum})
- with self.assertRaises(ZeroDivisionError):
- signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+ signum = signal.SIGUSR1
+ signal.signal(signum, handler)
+
+ signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
+ os.kill(os.getpid(), signum)
+ pending = signal.sigpending()
+ assert pending == {signum}, '%s != {%s}' % (pending, signum)
+ try:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+ except ZeroDivisionError:
+ pass
+ else:
+ raise Exception("ZeroDivisionError not raised")
+ """
+ assert_python_ok('-c', code)
@unittest.skipUnless(hasattr(signal, 'pthread_kill'),
'need signal.pthread_kill()')
def test_pthread_kill(self):
- signum = signal.SIGUSR1
- current = threading.get_ident()
+ code = """if 1:
+ import signal
+ import threading
+ import sys
- old_handler = signal.signal(signum, self.handler)
- self.addCleanup(signal.signal, signum, old_handler)
+ signum = signal.SIGUSR1
+
+ def handler(signum, frame):
+ 1/0
- with self.assertRaises(ZeroDivisionError):
- signal.pthread_kill(current, signum)
+ signal.signal(signum, handler)
+
+ if sys.platform == 'freebsd6':
+ # Issue #12392 and #12469: send a signal to the main thread
+ # doesn't work before the creation of the first thread on
+ # FreeBSD 6
+ def noop():
+ pass
+ thread = threading.Thread(target=noop)
+ thread.start()
+ thread.join()
+
+ tid = threading.get_ident()
+ try:
+ signal.pthread_kill(tid, signum)
+ except ZeroDivisionError:
+ pass
+ else:
+ raise Exception("ZeroDivisionError not raised")
+ """
+ assert_python_ok('-c', code)
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()')
- def wait_helper(self, test, blocked):
+ def wait_helper(self, blocked, test):
"""
test: body of the "def test(signum):" function.
blocked: number of the blocked signal
"""
- code = '''
-import signal
-import sys
+ code = '''if 1:
+ import signal
+ import sys
-def handler(signum, frame):
- 1/0
+ def handler(signum, frame):
+ 1/0
-def test(signum):
-%s
+ %s
-blocked = %s
-signum = signal.SIGALRM
+ blocked = %s
+ signum = signal.SIGALRM
-# child: block and wait the signal
-try:
- signal.signal(signum, handler)
- signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
+ # child: block and wait the signal
+ try:
+ signal.signal(signum, handler)
+ signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
- # Do the tests
- test(signum)
+ # Do the tests
+ test(signum)
- # The handler must not be called on unblock
- try:
- signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
- except ZeroDivisionError:
- print("the signal handler has been called",
- file=sys.stderr)
- sys.exit(1)
-except BaseException as err:
- print("error: {}".format(err), file=sys.stderr)
- sys.stderr.flush()
- sys.exit(1)
-''' % (test, blocked)
+ # The handler must not be called on unblock
+ try:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
+ except ZeroDivisionError:
+ print("the signal handler has been called",
+ file=sys.stderr)
+ sys.exit(1)
+ except BaseException as err:
+ print("error: {}".format(err), file=sys.stderr)
+ sys.stderr.flush()
+ sys.exit(1)
+ ''' % (test.strip(), blocked)
# sig*wait* must be called with the signal blocked: since the current
# process might have several threads running, use a subprocess to have
@@ -656,61 +633,56 @@ except BaseException as err:
@unittest.skipUnless(hasattr(signal, 'sigwait'),
'need signal.sigwait()')
def test_sigwait(self):
- test = '''
+ self.wait_helper(signal.SIGALRM, '''
+ def test(signum):
signal.alarm(1)
received = signal.sigwait([signum])
assert received == signum , 'received %s, not %s' % (received, signum)
- '''
-
- self.wait_helper(test, signal.SIGALRM)
+ ''')
@unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
'need signal.sigwaitinfo()')
def test_sigwaitinfo(self):
- test = '''
+ self.wait_helper(signal.SIGALRM, '''
+ def test(signum):
signal.alarm(1)
info = signal.sigwaitinfo([signum])
assert info.si_signo == signum, "info.si_signo != %s" % signum
- '''
-
- self.wait_helper(test, signal.SIGALRM)
+ ''')
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
'need signal.sigtimedwait()')
def test_sigtimedwait(self):
- test = '''
+ self.wait_helper(signal.SIGALRM, '''
+ def test(signum):
signal.alarm(1)
info = signal.sigtimedwait([signum], (10, 1000))
assert info.si_signo == signum, 'info.si_signo != %s' % signum
- '''
-
- self.wait_helper(test, signal.SIGALRM)
+ ''')
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
'need signal.sigtimedwait()')
# issue #12303: sigtimedwait() takes 30 seconds on FreeBSD 6 (kernel bug)
@unittest.skipIf(sys.platform =='freebsd6',
- 'sigtimedwait() with a null timeout doens\'t work on FreeBSD 6')
+ "sigtimedwait() with a null timeout doens't work on FreeBSD 6")
def test_sigtimedwait_poll(self):
# check that polling with sigtimedwait works
- test = '''
+ self.wait_helper(signal.SIGALRM, '''
+ def test(signum):
import os
os.kill(os.getpid(), signum)
info = signal.sigtimedwait([signum], (0, 0))
assert info.si_signo == signum, 'info.si_signo != %s' % signum
- '''
-
- self.wait_helper(test, signal.SIGALRM)
+ ''')
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
'need signal.sigtimedwait()')
def test_sigtimedwait_timeout(self):
- test = '''
+ self.wait_helper(signal.SIGALRM, '''
+ def test(signum):
received = signal.sigtimedwait([signum], (1, 0))
assert received is None, "received=%r" % (received,)
- '''
-
- self.wait_helper(test, signal.SIGALRM)
+ ''')
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
'need signal.sigtimedwait()')
@@ -723,7 +695,8 @@ except BaseException as err:
@unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
'need signal.sigwaitinfo()')
def test_sigwaitinfo_interrupted(self):
- test = '''
+ self.wait_helper(signal.SIGUSR1, '''
+ def test(signum):
import errno
hndl_called = True
@@ -741,9 +714,7 @@ except BaseException as err:
raise Exception("Expected EINTR to be raised by sigwaitinfo")
else:
raise Exception("Expected EINTR to be raised by sigwaitinfo")
- '''
-
- self.wait_helper(test, signal.SIGUSR1)
+ ''')
@unittest.skipUnless(hasattr(signal, 'sigwait'),
'need signal.sigwait()')
@@ -791,46 +762,93 @@ except BaseException as err:
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()')
def test_pthread_sigmask(self):
- test_blocked_signals = self.can_test_blocked_signals(False)
+ code = """if 1:
+ import signal
+ import os; import threading
+
+ def handler(signum, frame):
+ 1/0
+
+ def kill(signum):
+ os.kill(os.getpid(), signum)
+
+ def read_sigmask():
+ return signal.pthread_sigmask(signal.SIG_BLOCK, [])
+
signum = signal.SIGUSR1
# Install our signal handler
- old_handler = signal.signal(signum, self.handler)
- self.addCleanup(signal.signal, signum, old_handler)
+ old_handler = signal.signal(signum, handler)
# Unblock SIGUSR1 (and copy the old mask) to test our signal handler
old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
- self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, old_mask)
- with self.assertRaises(ZeroDivisionError):
- self.kill(signum)
+ try:
+ kill(signum)
+ except ZeroDivisionError:
+ pass
+ else:
+ raise Exception("ZeroDivisionError not raised")
# Block and then raise SIGUSR1. The signal is blocked: the signal
# handler is not called, and the signal is now pending
signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
- if test_blocked_signals:
- self.kill(signum)
+ kill(signum)
# Check the new mask
- blocked = self.read_sigmask()
- self.assertIn(signum, blocked)
- self.assertEqual(old_mask ^ blocked, {signum})
+ blocked = read_sigmask()
+ assert signum in blocked, "%s not in %s" % (signum, blocked)
+ assert old_mask ^ blocked == {signum}, "%s ^ %s != {%s}" % (old_mask, blocked, signum)
# Unblock SIGUSR1
- if test_blocked_signals:
- with self.assertRaises(ZeroDivisionError):
- # unblock the pending signal calls immediatly the signal handler
- signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
- else:
+ try:
+ # unblock the pending signal calls immediatly the signal handler
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
- with self.assertRaises(ZeroDivisionError):
- self.kill(signum)
+ except ZeroDivisionError:
+ pass
+ else:
+ raise Exception("ZeroDivisionError not raised")
+ try:
+ kill(signum)
+ except ZeroDivisionError:
+ pass
+ else:
+ raise Exception("ZeroDivisionError not raised")
# Check the new mask
- unblocked = self.read_sigmask()
- self.assertNotIn(signum, unblocked)
- self.assertEqual(blocked ^ unblocked, {signum})
- self.assertSequenceEqual(old_mask, unblocked)
- # Finally, restore the previous signal handler and the signal mask
+ unblocked = read_sigmask()
+ assert signum not in unblocked, "%s in %s" % (signum, unblocked)
+ assert blocked ^ unblocked == {signum}, "%s ^ %s != {%s}" % (blocked, unblocked, signum)
+ assert old_mask == unblocked, "%s != %s" % (old_mask, unblocked)
+ """
+ assert_python_ok('-c', code)
+
+ @unittest.skipIf(sys.platform == 'freebsd6',
+ "issue #12392: send a signal to the main thread doesn't work "
+ "before the creation of the first thread on FreeBSD 6")
+ @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
+ 'need signal.pthread_kill()')
+ def test_pthread_kill_main_thread(self):
+ # Test that a signal can be sent to the main thread with pthread_kill()
+ # before any other thread has been created (see issue #12392).
+ code = """if True:
+ import threading
+ import signal
+ import sys
+
+ def handler(signum, frame):
+ sys.exit(3)
+
+ signal.signal(signal.SIGUSR1, handler)
+ signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
+ sys.exit(2)
+ """
+
+ with spawn_python('-c', code) as process:
+ stdout, stderr = process.communicate()
+ exitcode = process.wait()
+ if exitcode != 3:
+ raise Exception("Child error (exit code %s): %s" %
+ (exitcode, stdout))
def test_main():