diff options
Diffstat (limited to 'Lib/test/test_signal.py')
| -rw-r--r-- | Lib/test/test_signal.py | 554 |
1 files changed, 440 insertions, 114 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index f64bd4c..8d8a15d 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -5,9 +5,15 @@ import gc import pickle import select import signal +import struct import subprocess import traceback import sys, os, time, errno +from test.script_helper import assert_python_ok, spawn_python +try: + import threading +except ImportError: + threading = None if sys.platform in ('os2', 'riscos'): raise unittest.SkipTest("Can't test signal on %s" % sys.platform) @@ -53,15 +59,9 @@ class InterProcessSignalTests(unittest.TestCase): def handlerA(self, signum, frame): self.a_called = True - if support.verbose: - print("handlerA invoked from signal %s at:\n%s" % ( - signum, self.format_frame(frame, limit=1))) def handlerB(self, signum, frame): self.b_called = True - if support.verbose: - print ("handlerB invoked from signal %s at:\n%s" % ( - signum, self.format_frame(frame, limit=1))) raise HandlerBCalled(signum, self.format_frame(frame)) def wait(self, child): @@ -88,8 +88,6 @@ class InterProcessSignalTests(unittest.TestCase): # Let the sub-processes know who to send signals to. pid = os.getpid() - if support.verbose: - print("test runner's pid is", pid) child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)]) if child: @@ -113,8 +111,6 @@ class InterProcessSignalTests(unittest.TestCase): except HandlerBCalled: self.assertTrue(self.b_called) self.assertFalse(self.a_called) - if support.verbose: - print("HandlerBCalled exception caught") child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)]) if child: @@ -130,8 +126,7 @@ class InterProcessSignalTests(unittest.TestCase): # may return early. time.sleep(1) except KeyboardInterrupt: - if support.verbose: - print("KeyboardInterrupt (the alarm() went off)") + pass except: self.fail("Some other exception woke us from pause: %s" % traceback.format_exc()) @@ -187,7 +182,7 @@ class InterProcessSignalTests(unittest.TestCase): @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") -class BasicSignalTests(unittest.TestCase): +class PosixTests(unittest.TestCase): def trivial_signal_handler(self, *args): pass @@ -232,6 +227,18 @@ class WakeupSignalTests(unittest.TestCase): TIMEOUT_FULL = 10 TIMEOUT_HALF = 5 + def handler(self, signum, frame): + pass + + def check_signum(self, *signals): + data = os.read(self.read, len(signals)+1) + raised = struct.unpack('%uB' % len(data), data) + # We don't care of the signal delivery order (it's not portable or + # reliable) + raised = set(raised) + signals = set(signals) + self.assertEqual(raised, signals) + def test_wakeup_fd_early(self): import select @@ -245,6 +252,7 @@ class WakeupSignalTests(unittest.TestCase): select.select([self.read], [], [], self.TIMEOUT_FULL) after_time = time.time() self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF) + self.check_signum(signal.SIGALRM) def test_wakeup_fd_during(self): import select @@ -256,11 +264,66 @@ class WakeupSignalTests(unittest.TestCase): [self.read], [], [], self.TIMEOUT_FULL) after_time = time.time() self.assertTrue(after_time - before_time < self.TIMEOUT_HALF) + self.check_signum(signal.SIGALRM) + + def test_signum(self): + old_handler = signal.signal(signal.SIGUSR1, self.handler) + self.addCleanup(signal.signal, signal.SIGUSR1, old_handler) + os.kill(os.getpid(), signal.SIGUSR1) + os.kill(os.getpid(), signal.SIGALRM) + self.check_signum(signal.SIGUSR1, signal.SIGALRM) + + @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), + 'need signal.pthread_sigmask()') + @unittest.skipUnless(hasattr(signal, 'pthread_kill'), + 'need signal.pthread_kill()') + def test_pending(self): + signum1 = signal.SIGUSR1 + signum2 = signal.SIGUSR2 + tid = threading.current_thread().ident + + old_handler = signal.signal(signum1, self.handler) + self.addCleanup(signal.signal, signum1, old_handler) + old_handler = signal.signal(signum2, self.handler) + self.addCleanup(signal.signal, signum2, old_handler) + + signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2)) + signal.pthread_kill(tid, signum1) + signal.pthread_kill(tid, signum2) + # Unblocking the 2 signals calls the C signal handler twice + signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2)) + + self.check_signum(signum1, signum2) + + @unittest.skipUnless(hasattr(signal, 'pthread_kill'), + 'need signal.pthread_kill()') + def test_pthread_kill_main_thread(self): + # Test that a signal can be sent to the main thread with pthread_kill() + # before any other thread has been created (see issue #12392). + code = """if True: + import threading + import signal + import sys + + def handler(signum, frame): + sys.exit(3) + + signal.signal(signal.SIGUSR1, handler) + signal.pthread_kill(threading.get_ident(), signal.SIGUSR1) + sys.exit(1) + """ + + with spawn_python('-c', code) as process: + stdout, stderr = process.communicate() + exitcode = process.wait() + if exitcode != 3: + raise Exception("Child error (exit code %s): %s" % + (exitcode, stdout)) def setUp(self): import fcntl - self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None) + self.alrm = signal.signal(signal.SIGALRM, self.handler) self.read, self.write = os.pipe() flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0) flags = flags | os.O_NONBLOCK @@ -276,103 +339,83 @@ class WakeupSignalTests(unittest.TestCase): @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") class SiginterruptTest(unittest.TestCase): - def setUp(self): - """Install a no-op signal handler that can be set to allow - interrupts or not, and arrange for the original signal handler to be - re-installed when the test is finished. - """ - self.signum = signal.SIGUSR1 - oldhandler = signal.signal(self.signum, lambda x,y: None) - self.addCleanup(signal.signal, self.signum, oldhandler) - - def readpipe_interrupted(self): + def readpipe_interrupted(self, interrupt): """Perform a read during which a signal will arrive. Return True if the read is interrupted by the signal and raises an exception. Return False if it returns normally. """ - # Create a pipe that can be used for the read. Also clean it up - # when the test is over, since nothing else will (but see below for - # the write end). - r, w = os.pipe() - self.addCleanup(os.close, r) - - # Create another process which can send a signal to this one to try - # to interrupt the read. - ppid = os.getpid() - pid = os.fork() - - if pid == 0: - # Child code: sleep to give the parent enough time to enter the - # read() call (there's a race here, but it's really tricky to - # eliminate it); then signal the parent process. Also, sleep - # again to make it likely that the signal is delivered to the - # parent process before the child exits. If the child exits - # first, the write end of the pipe will be closed and the test - # is invalid. - try: - time.sleep(0.2) - os.kill(ppid, self.signum) - time.sleep(0.2) - finally: - # No matter what, just exit as fast as possible now. - exit_subprocess() - else: - # Parent code. - # Make sure the child is eventually reaped, else it'll be a - # zombie for the rest of the test suite run. - self.addCleanup(os.waitpid, pid, 0) - - # Close the write end of the pipe. The child has a copy, so - # it's not really closed until the child exits. We need it to - # close when the child exits so that in the non-interrupt case - # the read eventually completes, otherwise we could just close - # it *after* the test. - os.close(w) - - # Try the read and report whether it is interrupted or not to - # the caller. + # use a subprocess to have only one thread, to have a timeout on the + # blocking read and to not touch signal handling in this process + code = """if 1: + import errno + import os + import signal + import sys + + interrupt = %r + r, w = os.pipe() + + def handler(signum, frame): + pass + + print("ready") + sys.stdout.flush() + + signal.signal(signal.SIGALRM, handler) + if interrupt is not None: + signal.siginterrupt(signal.SIGALRM, interrupt) + + # run the test twice + for loop in range(2): + # send a SIGALRM in a second (during the read) + signal.alarm(1) + try: + # blocking call: read from a pipe without data + os.read(r, 1) + except OSError as err: + if err.errno != errno.EINTR: + raise + else: + sys.exit(2) + sys.exit(3) + """ % (interrupt,) + with spawn_python('-c', code) as process: try: - d = os.read(r, 1) + # wait until the child process is loaded and has started + first_line = process.stdout.readline() + + stdout, stderr = process.communicate(timeout=3.0) + except subprocess.TimeoutExpired: + process.kill() return False - except OSError as err: - if err.errno != errno.EINTR: - raise - return True + else: + stdout = first_line + stdout + exitcode = process.wait() + if exitcode not in (2, 3): + raise Exception("Child error (exit code %s): %s" + % (exitcode, stdout)) + return (exitcode == 3) def test_without_siginterrupt(self): - """If a signal handler is installed and siginterrupt is not called - at all, when that signal arrives, it interrupts a syscall that's in - progress. - """ - i = self.readpipe_interrupted() - self.assertTrue(i) - # Arrival of the signal shouldn't have changed anything. - i = self.readpipe_interrupted() - self.assertTrue(i) + # If a signal handler is installed and siginterrupt is not called + # at all, when that signal arrives, it interrupts a syscall that's in + # progress. + interrupted = self.readpipe_interrupted(None) + self.assertTrue(interrupted) def test_siginterrupt_on(self): - """If a signal handler is installed and siginterrupt is called with - a true value for the second argument, when that signal arrives, it - interrupts a syscall that's in progress. - """ - signal.siginterrupt(self.signum, 1) - i = self.readpipe_interrupted() - self.assertTrue(i) - # Arrival of the signal shouldn't have changed anything. - i = self.readpipe_interrupted() - self.assertTrue(i) + # If a signal handler is installed and siginterrupt is called with + # a true value for the second argument, when that signal arrives, it + # interrupts a syscall that's in progress. + interrupted = self.readpipe_interrupted(True) + self.assertTrue(interrupted) def test_siginterrupt_off(self): - """If a signal handler is installed and siginterrupt is called with - a false value for the second argument, when that signal arrives, it - does not interrupt a syscall that's in progress. - """ - signal.siginterrupt(self.signum, 0) - i = self.readpipe_interrupted() - self.assertFalse(i) - # Arrival of the signal shouldn't have changed anything. - i = self.readpipe_interrupted() - self.assertFalse(i) + # If a signal handler is installed and siginterrupt is called with + # a false value for the second argument, when that signal arrives, it + # does not interrupt a syscall that's in progress. + interrupted = self.readpipe_interrupted(False) + self.assertFalse(interrupted) @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") @@ -391,8 +434,6 @@ class ItimerTest(unittest.TestCase): def sig_alrm(self, *args): self.hndl_called = True - if support.verbose: - print("SIGALRM handler invoked", args) def sig_vtalrm(self, *args): self.hndl_called = True @@ -404,21 +445,13 @@ class ItimerTest(unittest.TestCase): elif self.hndl_count == 3: # disable ITIMER_VIRTUAL, this function shouldn't be called anymore signal.setitimer(signal.ITIMER_VIRTUAL, 0) - if support.verbose: - print("last SIGVTALRM handler call") self.hndl_count += 1 - if support.verbose: - print("SIGVTALRM handler invoked", args) - def sig_prof(self, *args): self.hndl_called = True signal.setitimer(signal.ITIMER_PROF, 0) - if support.verbose: - print("SIGPROF handler invoked", args) - def test_itimer_exc(self): # XXX I'm assuming -1 is an invalid itimer, but maybe some platform # defines it ? @@ -431,10 +464,7 @@ class ItimerTest(unittest.TestCase): def test_itimer_real(self): self.itimer = signal.ITIMER_REAL signal.setitimer(self.itimer, 1.0) - if support.verbose: - print("\ncall pause()...") signal.pause() - self.assertEqual(self.hndl_called, True) # Issue 3864, unknown if this affects earlier versions of freebsd also @@ -483,11 +513,307 @@ class ItimerTest(unittest.TestCase): # and the handler should have been called self.assertEqual(self.hndl_called, True) + +class PendingSignalsTests(unittest.TestCase): + """ + Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait() + functions. + """ + def setUp(self): + self.hndl_called = False + self.has_pthread_kill = hasattr(signal, 'pthread_kill') + + def handler(self, signum, frame): + 1/0 + + def read_sigmask(self): + return signal.pthread_sigmask(signal.SIG_BLOCK, []) + + def can_test_blocked_signals(self, skip): + """ + Check if a blocked signal can be raised to the main thread without + calling its signal handler. We need pthread_kill() or exactly one + thread (the main thread). + + Return True if it's possible. Otherwise, return False and print a + warning if skip is False, or raise a SkipTest exception if skip is + True. + """ + if self.has_pthread_kill: + return True + + # The fault handler timeout thread masks all signals. If the main + # thread masks also SIGUSR1, all threads mask this signal. In this + # case, if we send SIGUSR1 to the process, the signal is pending in the + # main or the faulthandler timeout thread. Unblock SIGUSR1 in the main + # thread calls the signal handler only if the signal is pending for the + # main thread. Stop the faulthandler timeout thread to workaround this + # problem. + import faulthandler + faulthandler.cancel_dump_tracebacks_later() + + # Issue #11998: The _tkinter module loads the Tcl library which + # creates a thread waiting events in select(). This thread receives + # signals blocked by all other threads. We cannot test blocked + # signals + if '_tkinter' in sys.modules: + message = ("_tkinter is loaded and pthread_kill() is missing, " + "cannot test blocked signals (issue #11998)") + if skip: + self.skipTest(message) + else: + print("WARNING: %s" % message) + return False + return True + + def kill(self, signum): + if self.has_pthread_kill: + tid = threading.get_ident() + signal.pthread_kill(tid, signum) + else: + pid = os.getpid() + os.kill(pid, signum) + + @unittest.skipUnless(hasattr(signal, 'sigpending'), + 'need signal.sigpending()') + def test_sigpending_empty(self): + self.assertEqual(signal.sigpending(), set()) + + @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), + 'need signal.pthread_sigmask()') + @unittest.skipUnless(hasattr(signal, 'sigpending'), + 'need signal.sigpending()') + def test_sigpending(self): + self.can_test_blocked_signals(True) + + signum = signal.SIGUSR1 + old_handler = signal.signal(signum, self.handler) + self.addCleanup(signal.signal, signum, old_handler) + + signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + self.kill(signum) + self.assertEqual(signal.sigpending(), {signum}) + with self.assertRaises(ZeroDivisionError): + signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + + @unittest.skipUnless(hasattr(signal, 'pthread_kill'), + 'need signal.pthread_kill()') + def test_pthread_kill(self): + signum = signal.SIGUSR1 + current = threading.get_ident() + + old_handler = signal.signal(signum, self.handler) + self.addCleanup(signal.signal, signum, old_handler) + + with self.assertRaises(ZeroDivisionError): + signal.pthread_kill(current, signum) + + @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), + 'need signal.pthread_sigmask()') + @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') + def wait_helper(self, test, handler, blocked): + signum = signal.SIGALRM + + # sig*wait* must be called with the signal blocked: since the current + # process might have several threads running, we fork() a child process + # to have a single thread. + pid = os.fork() + if pid == 0: + # child: block and wait the signal + try: + signal.signal(signum, handler) + signal.pthread_sigmask(signal.SIG_BLOCK, [blocked]) + + # Do the tests + test(signum) + + # The handler must not be called on unblock + try: + signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked]) + except ZeroDivisionError: + print("the signal handler has been called", + file=sys.stderr) + os._exit(1) + except BaseException as err: + print("error: {}".format(err), file=sys.stderr) + sys.stderr.flush() + os._exit(1) + else: + os._exit(0) + else: + # parent: check that the child correcty received the signal + self.assertEqual(os.waitpid(pid, 0), (pid, 0)) + + @unittest.skipUnless(hasattr(signal, 'sigwait'), + 'need signal.sigwait()') + def test_sigwait(self): + def test(signum): + signal.alarm(1) + self.assertEqual(signum, signal.sigwait([signum])) + + self.wait_helper(test, self.handler, signal.SIGALRM) + + @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), + 'need signal.sigwaitinfo()') + def test_sigwaitinfo(self): + def test(signum): + signal.alarm(1) + info = signal.sigwaitinfo([signum]) + self.assertEqual(signum, info.si_signo) + + self.wait_helper(test, self.handler, signal.SIGALRM) + + @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), + 'need signal.sigtimedwait()') + def test_sigtimedwait(self): + def test(signum): + signal.alarm(1) + info = signal.sigtimedwait([signum], (10, 1000)) + self.assertEqual(signum, info.si_signo) + + self.wait_helper(test, self.handler, signal.SIGALRM) + + # check that polling with sigtimedwait works + @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), + 'need signal.sigtimedwait()') + def test_sigtimedwait_poll(self): + def test(signum): + self.kill(signum) + info = signal.sigtimedwait([signum], (0, 0)) + self.assertEqual(signum, info.si_signo) + + self.wait_helper(test, self.handler, signal.SIGALRM) + + @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), + 'need signal.sigtimedwait()') + def test_sigtimedwait_timeout(self): + def test(signum): + self.assertEqual(None, signal.sigtimedwait([signum], (1, 35500))) + + self.wait_helper(test, self.handler, signal.SIGALRM) + + @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), + 'need signal.sigtimedwait()') + def test_sigtimedwait_negative_timeout(self): + signum = signal.SIGALRM + self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, -1)) + self.assertRaises(ValueError, signal.sigtimedwait, [signum], (0, -1)) + self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, 0)) + + def alarm_handler(self, signum, frame): + self.hndl_called = True + + @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), + 'need signal.sigwaitinfo()') + def test_sigwaitinfo_interrupted(self): + def test(signum): + signal.alarm(1) + try: + signal.sigwaitinfo([signal.SIGUSR1]) + except OSError as e: + if e.errno == errno.EINTR: + self.assertTrue(self.hndl_called) + else: + self.fail("Expected EINTR to be raised by sigwaitinfo") + else: + self.fail("Expected EINTR to be raised by sigwaitinfo") + + self.wait_helper(test, self.alarm_handler, signal.SIGUSR1) + + @unittest.skipUnless(hasattr(signal, 'sigwait'), + 'need signal.sigwait()') + @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), + 'need signal.pthread_sigmask()') + @unittest.skipIf(threading is None, "test needs threading module") + def test_sigwait_thread(self): + # Check that calling sigwait() from a thread doesn't suspend the whole + # process. A new interpreter is spawned to avoid problems when mixing + # threads and fork(): only async-safe functions are allowed between + # fork() and exec(). + assert_python_ok("-c", """if True: + import os, threading, sys, time, signal + + # the default handler terminates the process + signum = signal.SIGUSR1 + + def kill_later(): + # wait until the main thread is waiting in sigwait() + time.sleep(1) + os.kill(os.getpid(), signum) + + # the signal must be blocked by all the threads + signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + killer = threading.Thread(target=kill_later) + killer.start() + received = signal.sigwait([signum]) + if received != signum: + print("sigwait() received %s, not %s" % (received, signum), + file=sys.stderr) + sys.exit(1) + killer.join() + # unblock the signal, which should have been cleared by sigwait() + signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + """) + + @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), + 'need signal.pthread_sigmask()') + def test_pthread_sigmask_arguments(self): + self.assertRaises(TypeError, signal.pthread_sigmask) + self.assertRaises(TypeError, signal.pthread_sigmask, 1) + self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3) + self.assertRaises(OSError, signal.pthread_sigmask, 1700, []) + + @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), + 'need signal.pthread_sigmask()') + def test_pthread_sigmask(self): + test_blocked_signals = self.can_test_blocked_signals(False) + signum = signal.SIGUSR1 + + # Install our signal handler + old_handler = signal.signal(signum, self.handler) + self.addCleanup(signal.signal, signum, old_handler) + + # Unblock SIGUSR1 (and copy the old mask) to test our signal handler + old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, old_mask) + with self.assertRaises(ZeroDivisionError): + self.kill(signum) + + # Block and then raise SIGUSR1. The signal is blocked: the signal + # handler is not called, and the signal is now pending + signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + if test_blocked_signals: + self.kill(signum) + + # Check the new mask + blocked = self.read_sigmask() + self.assertIn(signum, blocked) + self.assertEqual(old_mask ^ blocked, {signum}) + + # Unblock SIGUSR1 + if test_blocked_signals: + with self.assertRaises(ZeroDivisionError): + # unblock the pending signal calls immediatly the signal handler + signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + else: + signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + with self.assertRaises(ZeroDivisionError): + self.kill(signum) + + # Check the new mask + unblocked = self.read_sigmask() + self.assertNotIn(signum, unblocked) + self.assertEqual(blocked ^ unblocked, {signum}) + self.assertSequenceEqual(old_mask, unblocked) + # Finally, restore the previous signal handler and the signal mask + + def test_main(): try: - support.run_unittest(BasicSignalTests, InterProcessSignalTests, + support.run_unittest(PosixTests, InterProcessSignalTests, WakeupSignalTests, SiginterruptTest, - ItimerTest, WindowsSignalTests) + ItimerTest, WindowsSignalTests, + PendingSignalsTests) finally: support.reap_children() |
