summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_signal.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_signal.py')
-rw-r--r--Lib/test/test_signal.py462
1 files changed, 348 insertions, 114 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index f64bd4c..8a5a408 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -5,9 +5,15 @@ import gc
import pickle
import select
import signal
+import struct
import subprocess
import traceback
import sys, os, time, errno
+from test.script_helper import assert_python_ok, spawn_python
+try:
+ import threading
+except ImportError:
+ threading = None
if sys.platform in ('os2', 'riscos'):
raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
@@ -53,15 +59,9 @@ class InterProcessSignalTests(unittest.TestCase):
def handlerA(self, signum, frame):
self.a_called = True
- if support.verbose:
- print("handlerA invoked from signal %s at:\n%s" % (
- signum, self.format_frame(frame, limit=1)))
def handlerB(self, signum, frame):
self.b_called = True
- if support.verbose:
- print ("handlerB invoked from signal %s at:\n%s" % (
- signum, self.format_frame(frame, limit=1)))
raise HandlerBCalled(signum, self.format_frame(frame))
def wait(self, child):
@@ -88,8 +88,6 @@ class InterProcessSignalTests(unittest.TestCase):
# Let the sub-processes know who to send signals to.
pid = os.getpid()
- if support.verbose:
- print("test runner's pid is", pid)
child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
if child:
@@ -113,8 +111,6 @@ class InterProcessSignalTests(unittest.TestCase):
except HandlerBCalled:
self.assertTrue(self.b_called)
self.assertFalse(self.a_called)
- if support.verbose:
- print("HandlerBCalled exception caught")
child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
if child:
@@ -130,8 +126,7 @@ class InterProcessSignalTests(unittest.TestCase):
# may return early.
time.sleep(1)
except KeyboardInterrupt:
- if support.verbose:
- print("KeyboardInterrupt (the alarm() went off)")
+ pass
except:
self.fail("Some other exception woke us from pause: %s" %
traceback.format_exc())
@@ -187,7 +182,7 @@ class InterProcessSignalTests(unittest.TestCase):
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
-class BasicSignalTests(unittest.TestCase):
+class PosixTests(unittest.TestCase):
def trivial_signal_handler(self, *args):
pass
@@ -232,6 +227,18 @@ class WakeupSignalTests(unittest.TestCase):
TIMEOUT_FULL = 10
TIMEOUT_HALF = 5
+ def handler(self, signum, frame):
+ pass
+
+ def check_signum(self, *signals):
+ data = os.read(self.read, len(signals)+1)
+ raised = struct.unpack('%uB' % len(data), data)
+ # We don't care of the signal delivery order (it's not portable or
+ # reliable)
+ raised = set(raised)
+ signals = set(signals)
+ self.assertEqual(raised, signals)
+
def test_wakeup_fd_early(self):
import select
@@ -245,6 +252,7 @@ class WakeupSignalTests(unittest.TestCase):
select.select([self.read], [], [], self.TIMEOUT_FULL)
after_time = time.time()
self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)
+ self.check_signum(signal.SIGALRM)
def test_wakeup_fd_during(self):
import select
@@ -256,11 +264,41 @@ class WakeupSignalTests(unittest.TestCase):
[self.read], [], [], self.TIMEOUT_FULL)
after_time = time.time()
self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
+ self.check_signum(signal.SIGALRM)
+
+ def test_signum(self):
+ old_handler = signal.signal(signal.SIGUSR1, self.handler)
+ self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
+ os.kill(os.getpid(), signal.SIGUSR1)
+ os.kill(os.getpid(), signal.SIGALRM)
+ self.check_signum(signal.SIGUSR1, signal.SIGALRM)
+
+ @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
+ 'need signal.pthread_sigmask()')
+ @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
+ 'need signal.pthread_kill()')
+ def test_pending(self):
+ signum1 = signal.SIGUSR1
+ signum2 = signal.SIGUSR2
+ tid = threading.current_thread().ident
+
+ old_handler = signal.signal(signum1, self.handler)
+ self.addCleanup(signal.signal, signum1, old_handler)
+ old_handler = signal.signal(signum2, self.handler)
+ self.addCleanup(signal.signal, signum2, old_handler)
+
+ signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
+ signal.pthread_kill(tid, signum1)
+ signal.pthread_kill(tid, signum2)
+ # Unblocking the 2 signals calls the C signal handler twice
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
+
+ self.check_signum(signum1, signum2)
def setUp(self):
import fcntl
- self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
+ self.alrm = signal.signal(signal.SIGALRM, self.handler)
self.read, self.write = os.pipe()
flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
flags = flags | os.O_NONBLOCK
@@ -276,103 +314,83 @@ class WakeupSignalTests(unittest.TestCase):
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class SiginterruptTest(unittest.TestCase):
- def setUp(self):
- """Install a no-op signal handler that can be set to allow
- interrupts or not, and arrange for the original signal handler to be
- re-installed when the test is finished.
- """
- self.signum = signal.SIGUSR1
- oldhandler = signal.signal(self.signum, lambda x,y: None)
- self.addCleanup(signal.signal, self.signum, oldhandler)
-
- def readpipe_interrupted(self):
+ def readpipe_interrupted(self, interrupt):
"""Perform a read during which a signal will arrive. Return True if the
read is interrupted by the signal and raises an exception. Return False
if it returns normally.
"""
- # Create a pipe that can be used for the read. Also clean it up
- # when the test is over, since nothing else will (but see below for
- # the write end).
- r, w = os.pipe()
- self.addCleanup(os.close, r)
-
- # Create another process which can send a signal to this one to try
- # to interrupt the read.
- ppid = os.getpid()
- pid = os.fork()
-
- if pid == 0:
- # Child code: sleep to give the parent enough time to enter the
- # read() call (there's a race here, but it's really tricky to
- # eliminate it); then signal the parent process. Also, sleep
- # again to make it likely that the signal is delivered to the
- # parent process before the child exits. If the child exits
- # first, the write end of the pipe will be closed and the test
- # is invalid.
- try:
- time.sleep(0.2)
- os.kill(ppid, self.signum)
- time.sleep(0.2)
- finally:
- # No matter what, just exit as fast as possible now.
- exit_subprocess()
- else:
- # Parent code.
- # Make sure the child is eventually reaped, else it'll be a
- # zombie for the rest of the test suite run.
- self.addCleanup(os.waitpid, pid, 0)
-
- # Close the write end of the pipe. The child has a copy, so
- # it's not really closed until the child exits. We need it to
- # close when the child exits so that in the non-interrupt case
- # the read eventually completes, otherwise we could just close
- # it *after* the test.
- os.close(w)
-
- # Try the read and report whether it is interrupted or not to
- # the caller.
+ # use a subprocess to have only one thread, to have a timeout on the
+ # blocking read and to not touch signal handling in this process
+ code = """if 1:
+ import errno
+ import os
+ import signal
+ import sys
+
+ interrupt = %r
+ r, w = os.pipe()
+
+ def handler(signum, frame):
+ pass
+
+ print("ready")
+ sys.stdout.flush()
+
+ signal.signal(signal.SIGALRM, handler)
+ if interrupt is not None:
+ signal.siginterrupt(signal.SIGALRM, interrupt)
+
+ # run the test twice
+ for loop in range(2):
+ # send a SIGALRM in a second (during the read)
+ signal.alarm(1)
+ try:
+ # blocking call: read from a pipe without data
+ os.read(r, 1)
+ except OSError as err:
+ if err.errno != errno.EINTR:
+ raise
+ else:
+ sys.exit(2)
+ sys.exit(3)
+ """ % (interrupt,)
+ with spawn_python('-c', code) as process:
try:
- d = os.read(r, 1)
+ # wait until the child process is loaded and has started
+ first_line = process.stdout.readline()
+
+ stdout, stderr = process.communicate(timeout=3.0)
+ except subprocess.TimeoutExpired:
+ process.kill()
return False
- except OSError as err:
- if err.errno != errno.EINTR:
- raise
- return True
+ else:
+ stdout = first_line + stdout
+ exitcode = process.wait()
+ if exitcode not in (2, 3):
+ raise Exception("Child error (exit code %s): %s"
+ % (exitcode, stdout))
+ return (exitcode == 3)
def test_without_siginterrupt(self):
- """If a signal handler is installed and siginterrupt is not called
- at all, when that signal arrives, it interrupts a syscall that's in
- progress.
- """
- i = self.readpipe_interrupted()
- self.assertTrue(i)
- # Arrival of the signal shouldn't have changed anything.
- i = self.readpipe_interrupted()
- self.assertTrue(i)
+ # If a signal handler is installed and siginterrupt is not called
+ # at all, when that signal arrives, it interrupts a syscall that's in
+ # progress.
+ interrupted = self.readpipe_interrupted(None)
+ self.assertTrue(interrupted)
def test_siginterrupt_on(self):
- """If a signal handler is installed and siginterrupt is called with
- a true value for the second argument, when that signal arrives, it
- interrupts a syscall that's in progress.
- """
- signal.siginterrupt(self.signum, 1)
- i = self.readpipe_interrupted()
- self.assertTrue(i)
- # Arrival of the signal shouldn't have changed anything.
- i = self.readpipe_interrupted()
- self.assertTrue(i)
+ # If a signal handler is installed and siginterrupt is called with
+ # a true value for the second argument, when that signal arrives, it
+ # interrupts a syscall that's in progress.
+ interrupted = self.readpipe_interrupted(True)
+ self.assertTrue(interrupted)
def test_siginterrupt_off(self):
- """If a signal handler is installed and siginterrupt is called with
- a false value for the second argument, when that signal arrives, it
- does not interrupt a syscall that's in progress.
- """
- signal.siginterrupt(self.signum, 0)
- i = self.readpipe_interrupted()
- self.assertFalse(i)
- # Arrival of the signal shouldn't have changed anything.
- i = self.readpipe_interrupted()
- self.assertFalse(i)
+ # If a signal handler is installed and siginterrupt is called with
+ # a false value for the second argument, when that signal arrives, it
+ # does not interrupt a syscall that's in progress.
+ interrupted = self.readpipe_interrupted(False)
+ self.assertFalse(interrupted)
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@@ -391,8 +409,6 @@ class ItimerTest(unittest.TestCase):
def sig_alrm(self, *args):
self.hndl_called = True
- if support.verbose:
- print("SIGALRM handler invoked", args)
def sig_vtalrm(self, *args):
self.hndl_called = True
@@ -404,21 +420,13 @@ class ItimerTest(unittest.TestCase):
elif self.hndl_count == 3:
# disable ITIMER_VIRTUAL, this function shouldn't be called anymore
signal.setitimer(signal.ITIMER_VIRTUAL, 0)
- if support.verbose:
- print("last SIGVTALRM handler call")
self.hndl_count += 1
- if support.verbose:
- print("SIGVTALRM handler invoked", args)
-
def sig_prof(self, *args):
self.hndl_called = True
signal.setitimer(signal.ITIMER_PROF, 0)
- if support.verbose:
- print("SIGPROF handler invoked", args)
-
def test_itimer_exc(self):
# XXX I'm assuming -1 is an invalid itimer, but maybe some platform
# defines it ?
@@ -431,10 +439,7 @@ class ItimerTest(unittest.TestCase):
def test_itimer_real(self):
self.itimer = signal.ITIMER_REAL
signal.setitimer(self.itimer, 1.0)
- if support.verbose:
- print("\ncall pause()...")
signal.pause()
-
self.assertEqual(self.hndl_called, True)
# Issue 3864, unknown if this affects earlier versions of freebsd also
@@ -483,11 +488,240 @@ class ItimerTest(unittest.TestCase):
# and the handler should have been called
self.assertEqual(self.hndl_called, True)
+
+class PendingSignalsTests(unittest.TestCase):
+ """
+ Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
+ functions.
+ """
+ def setUp(self):
+ self.has_pthread_kill = hasattr(signal, 'pthread_kill')
+
+ def handler(self, signum, frame):
+ 1/0
+
+ def read_sigmask(self):
+ return signal.pthread_sigmask(signal.SIG_BLOCK, [])
+
+ def can_test_blocked_signals(self, skip):
+ """
+ Check if a blocked signal can be raised to the main thread without
+ calling its signal handler. We need pthread_kill() or exactly one
+ thread (the main thread).
+
+ Return True if it's possible. Otherwise, return False and print a
+ warning if skip is False, or raise a SkipTest exception if skip is
+ True.
+ """
+ if self.has_pthread_kill:
+ return True
+
+ # The fault handler timeout thread masks all signals. If the main
+ # thread masks also SIGUSR1, all threads mask this signal. In this
+ # case, if we send SIGUSR1 to the process, the signal is pending in the
+ # main or the faulthandler timeout thread. Unblock SIGUSR1 in the main
+ # thread calls the signal handler only if the signal is pending for the
+ # main thread. Stop the faulthandler timeout thread to workaround this
+ # problem.
+ import faulthandler
+ faulthandler.cancel_dump_tracebacks_later()
+
+ # Issue #11998: The _tkinter module loads the Tcl library which
+ # creates a thread waiting events in select(). This thread receives
+ # signals blocked by all other threads. We cannot test blocked
+ # signals
+ if '_tkinter' in sys.modules:
+ message = ("_tkinter is loaded and pthread_kill() is missing, "
+ "cannot test blocked signals (issue #11998)")
+ if skip:
+ self.skipTest(message)
+ else:
+ print("WARNING: %s" % message)
+ return False
+ return True
+
+ def kill(self, signum):
+ if self.has_pthread_kill:
+ tid = threading.get_ident()
+ signal.pthread_kill(tid, signum)
+ else:
+ pid = os.getpid()
+ os.kill(pid, signum)
+
+ @unittest.skipUnless(hasattr(signal, 'sigpending'),
+ 'need signal.sigpending()')
+ def test_sigpending_empty(self):
+ self.assertEqual(signal.sigpending(), set())
+
+ @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
+ 'need signal.pthread_sigmask()')
+ @unittest.skipUnless(hasattr(signal, 'sigpending'),
+ 'need signal.sigpending()')
+ def test_sigpending(self):
+ self.can_test_blocked_signals(True)
+
+ signum = signal.SIGUSR1
+ old_handler = signal.signal(signum, self.handler)
+ self.addCleanup(signal.signal, signum, old_handler)
+
+ signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
+ self.kill(signum)
+ self.assertEqual(signal.sigpending(), {signum})
+ with self.assertRaises(ZeroDivisionError):
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+
+ @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
+ 'need signal.pthread_kill()')
+ def test_pthread_kill(self):
+ signum = signal.SIGUSR1
+ current = threading.get_ident()
+
+ old_handler = signal.signal(signum, self.handler)
+ self.addCleanup(signal.signal, signum, old_handler)
+
+ with self.assertRaises(ZeroDivisionError):
+ signal.pthread_kill(current, signum)
+
+ @unittest.skipUnless(hasattr(signal, 'sigwait'),
+ 'need signal.sigwait()')
+ @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
+ 'need signal.pthread_sigmask()')
+ @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
+ def test_sigwait(self):
+ def test(signum):
+ signal.alarm(1)
+ received = signal.sigwait([signum])
+ if received != signum:
+ print("sigwait() received %s, not %s"
+ % (received, signum),
+ file=sys.stderr)
+ os._exit(1)
+
+ signum = signal.SIGALRM
+
+ # sigwait must be called with the signal blocked: since the current
+ # process might have several threads running, we fork() a child process
+ # to have a single thread.
+ pid = os.fork()
+ if pid == 0:
+ # child: block and wait the signal
+ try:
+ signal.signal(signum, self.handler)
+ signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
+
+ # Do the tests
+ test(signum)
+
+ # The handler must not be called on unblock
+ try:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+ except ZeroDivisionError:
+ print("the signal handler has been called",
+ file=sys.stderr)
+ os._exit(1)
+ except BaseException as err:
+ print("error: {}".format(err), file=sys.stderr)
+ os._exit(1)
+ else:
+ os._exit(0)
+ else:
+ # parent: check that the child correcty received the signal
+ self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+
+ @unittest.skipUnless(hasattr(signal, 'sigwait'),
+ 'need signal.sigwait()')
+ @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
+ 'need signal.pthread_sigmask()')
+ @unittest.skipIf(threading is None, "test needs threading module")
+ def test_sigwait_thread(self):
+ # Check that calling sigwait() from a thread doesn't suspend the whole
+ # process. A new interpreter is spawned to avoid problems when mixing
+ # threads and fork(): only async-safe functions are allowed between
+ # fork() and exec().
+ assert_python_ok("-c", """if True:
+ import os, threading, sys, time, signal
+
+ # the default handler terminates the process
+ signum = signal.SIGUSR1
+
+ def kill_later():
+ # wait until the main thread is waiting in sigwait()
+ time.sleep(1)
+ os.kill(os.getpid(), signum)
+
+ # the signal must be blocked by all the threads
+ signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
+ killer = threading.Thread(target=kill_later)
+ killer.start()
+ received = signal.sigwait([signum])
+ if received != signum:
+ print("sigwait() received %s, not %s" % (received, signum),
+ file=sys.stderr)
+ sys.exit(1)
+ killer.join()
+ # unblock the signal, which should have been cleared by sigwait()
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+ """)
+
+ @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
+ 'need signal.pthread_sigmask()')
+ def test_pthread_sigmask_arguments(self):
+ self.assertRaises(TypeError, signal.pthread_sigmask)
+ self.assertRaises(TypeError, signal.pthread_sigmask, 1)
+ self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
+ self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
+
+ @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
+ 'need signal.pthread_sigmask()')
+ def test_pthread_sigmask(self):
+ test_blocked_signals = self.can_test_blocked_signals(False)
+ signum = signal.SIGUSR1
+
+ # Install our signal handler
+ old_handler = signal.signal(signum, self.handler)
+ self.addCleanup(signal.signal, signum, old_handler)
+
+ # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
+ old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+ self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, old_mask)
+ with self.assertRaises(ZeroDivisionError):
+ self.kill(signum)
+
+ # Block and then raise SIGUSR1. The signal is blocked: the signal
+ # handler is not called, and the signal is now pending
+ signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
+ if test_blocked_signals:
+ self.kill(signum)
+
+ # Check the new mask
+ blocked = self.read_sigmask()
+ self.assertIn(signum, blocked)
+ self.assertEqual(old_mask ^ blocked, {signum})
+
+ # Unblock SIGUSR1
+ if test_blocked_signals:
+ with self.assertRaises(ZeroDivisionError):
+ # unblock the pending signal calls immediatly the signal handler
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+ else:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+ with self.assertRaises(ZeroDivisionError):
+ self.kill(signum)
+
+ # Check the new mask
+ unblocked = self.read_sigmask()
+ self.assertNotIn(signum, unblocked)
+ self.assertEqual(blocked ^ unblocked, {signum})
+ self.assertSequenceEqual(old_mask, unblocked)
+ # Finally, restore the previous signal handler and the signal mask
+
+
def test_main():
try:
- support.run_unittest(BasicSignalTests, InterProcessSignalTests,
+ support.run_unittest(PosixTests, InterProcessSignalTests,
WakeupSignalTests, SiginterruptTest,
- ItimerTest, WindowsSignalTests)
+ ItimerTest, WindowsSignalTests,
+ PendingSignalsTests)
finally:
support.reap_children()