diff options
Diffstat (limited to 'Lib/test/test_signal.py')
| -rw-r--r-- | Lib/test/test_signal.py | 462 | 
1 files changed, 348 insertions, 114 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index f64bd4c..8a5a408 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -5,9 +5,15 @@ import gc  import pickle  import select  import signal +import struct  import subprocess  import traceback  import sys, os, time, errno +from test.script_helper import assert_python_ok, spawn_python +try: +    import threading +except ImportError: +    threading = None  if sys.platform in ('os2', 'riscos'):      raise unittest.SkipTest("Can't test signal on %s" % sys.platform) @@ -53,15 +59,9 @@ class InterProcessSignalTests(unittest.TestCase):      def handlerA(self, signum, frame):          self.a_called = True -        if support.verbose: -            print("handlerA invoked from signal %s at:\n%s" % ( -                signum, self.format_frame(frame, limit=1)))      def handlerB(self, signum, frame):          self.b_called = True -        if support.verbose: -            print ("handlerB invoked from signal %s at:\n%s" % ( -                signum, self.format_frame(frame, limit=1)))          raise HandlerBCalled(signum, self.format_frame(frame))      def wait(self, child): @@ -88,8 +88,6 @@ class InterProcessSignalTests(unittest.TestCase):          # Let the sub-processes know who to send signals to.          pid = os.getpid() -        if support.verbose: -            print("test runner's pid is", pid)          child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])          if child: @@ -113,8 +111,6 @@ class InterProcessSignalTests(unittest.TestCase):          except HandlerBCalled:              self.assertTrue(self.b_called)              self.assertFalse(self.a_called) -            if support.verbose: -                print("HandlerBCalled exception caught")          child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])          if child: @@ -130,8 +126,7 @@ class InterProcessSignalTests(unittest.TestCase):              # may return early.              time.sleep(1)          except KeyboardInterrupt: -            if support.verbose: -                print("KeyboardInterrupt (the alarm() went off)") +            pass          except:              self.fail("Some other exception woke us from pause: %s" %                        traceback.format_exc()) @@ -187,7 +182,7 @@ class InterProcessSignalTests(unittest.TestCase):  @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") -class BasicSignalTests(unittest.TestCase): +class PosixTests(unittest.TestCase):      def trivial_signal_handler(self, *args):          pass @@ -232,6 +227,18 @@ class WakeupSignalTests(unittest.TestCase):      TIMEOUT_FULL = 10      TIMEOUT_HALF = 5 +    def handler(self, signum, frame): +        pass + +    def check_signum(self, *signals): +        data = os.read(self.read, len(signals)+1) +        raised = struct.unpack('%uB' % len(data), data) +        # We don't care of the signal delivery order (it's not portable or +        # reliable) +        raised = set(raised) +        signals = set(signals) +        self.assertEqual(raised, signals) +      def test_wakeup_fd_early(self):          import select @@ -245,6 +252,7 @@ class WakeupSignalTests(unittest.TestCase):          select.select([self.read], [], [], self.TIMEOUT_FULL)          after_time = time.time()          self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF) +        self.check_signum(signal.SIGALRM)      def test_wakeup_fd_during(self):          import select @@ -256,11 +264,41 @@ class WakeupSignalTests(unittest.TestCase):              [self.read], [], [], self.TIMEOUT_FULL)          after_time = time.time()          self.assertTrue(after_time - before_time < self.TIMEOUT_HALF) +        self.check_signum(signal.SIGALRM) + +    def test_signum(self): +        old_handler = signal.signal(signal.SIGUSR1, self.handler) +        self.addCleanup(signal.signal, signal.SIGUSR1, old_handler) +        os.kill(os.getpid(), signal.SIGUSR1) +        os.kill(os.getpid(), signal.SIGALRM) +        self.check_signum(signal.SIGUSR1, signal.SIGALRM) + +    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), +                         'need signal.pthread_sigmask()') +    @unittest.skipUnless(hasattr(signal, 'pthread_kill'), +                         'need signal.pthread_kill()') +    def test_pending(self): +        signum1 = signal.SIGUSR1 +        signum2 = signal.SIGUSR2 +        tid = threading.current_thread().ident + +        old_handler = signal.signal(signum1, self.handler) +        self.addCleanup(signal.signal, signum1, old_handler) +        old_handler = signal.signal(signum2, self.handler) +        self.addCleanup(signal.signal, signum2, old_handler) + +        signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2)) +        signal.pthread_kill(tid, signum1) +        signal.pthread_kill(tid, signum2) +        # Unblocking the 2 signals calls the C signal handler twice +        signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2)) + +        self.check_signum(signum1, signum2)      def setUp(self):          import fcntl -        self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None) +        self.alrm = signal.signal(signal.SIGALRM, self.handler)          self.read, self.write = os.pipe()          flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)          flags = flags | os.O_NONBLOCK @@ -276,103 +314,83 @@ class WakeupSignalTests(unittest.TestCase):  @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")  class SiginterruptTest(unittest.TestCase): -    def setUp(self): -        """Install a no-op signal handler that can be set to allow -        interrupts or not, and arrange for the original signal handler to be -        re-installed when the test is finished. -        """ -        self.signum = signal.SIGUSR1 -        oldhandler = signal.signal(self.signum, lambda x,y: None) -        self.addCleanup(signal.signal, self.signum, oldhandler) - -    def readpipe_interrupted(self): +    def readpipe_interrupted(self, interrupt):          """Perform a read during which a signal will arrive.  Return True if the          read is interrupted by the signal and raises an exception.  Return False          if it returns normally.          """ -        # Create a pipe that can be used for the read.  Also clean it up -        # when the test is over, since nothing else will (but see below for -        # the write end). -        r, w = os.pipe() -        self.addCleanup(os.close, r) - -        # Create another process which can send a signal to this one to try -        # to interrupt the read. -        ppid = os.getpid() -        pid = os.fork() - -        if pid == 0: -            # Child code: sleep to give the parent enough time to enter the -            # read() call (there's a race here, but it's really tricky to -            # eliminate it); then signal the parent process.  Also, sleep -            # again to make it likely that the signal is delivered to the -            # parent process before the child exits.  If the child exits -            # first, the write end of the pipe will be closed and the test -            # is invalid. -            try: -                time.sleep(0.2) -                os.kill(ppid, self.signum) -                time.sleep(0.2) -            finally: -                # No matter what, just exit as fast as possible now. -                exit_subprocess() -        else: -            # Parent code. -            # Make sure the child is eventually reaped, else it'll be a -            # zombie for the rest of the test suite run. -            self.addCleanup(os.waitpid, pid, 0) - -            # Close the write end of the pipe.  The child has a copy, so -            # it's not really closed until the child exits.  We need it to -            # close when the child exits so that in the non-interrupt case -            # the read eventually completes, otherwise we could just close -            # it *after* the test. -            os.close(w) - -            # Try the read and report whether it is interrupted or not to -            # the caller. +        # use a subprocess to have only one thread, to have a timeout on the +        # blocking read and to not touch signal handling in this process +        code = """if 1: +            import errno +            import os +            import signal +            import sys + +            interrupt = %r +            r, w = os.pipe() + +            def handler(signum, frame): +                pass + +            print("ready") +            sys.stdout.flush() + +            signal.signal(signal.SIGALRM, handler) +            if interrupt is not None: +                signal.siginterrupt(signal.SIGALRM, interrupt) + +            # run the test twice +            for loop in range(2): +                # send a SIGALRM in a second (during the read) +                signal.alarm(1) +                try: +                    # blocking call: read from a pipe without data +                    os.read(r, 1) +                except OSError as err: +                    if err.errno != errno.EINTR: +                        raise +                else: +                    sys.exit(2) +            sys.exit(3) +        """ % (interrupt,) +        with spawn_python('-c', code) as process:              try: -                d = os.read(r, 1) +                # wait until the child process is loaded and has started +                first_line = process.stdout.readline() + +                stdout, stderr = process.communicate(timeout=3.0) +            except subprocess.TimeoutExpired: +                process.kill()                  return False -            except OSError as err: -                if err.errno != errno.EINTR: -                    raise -                return True +            else: +                stdout = first_line + stdout +                exitcode = process.wait() +                if exitcode not in (2, 3): +                    raise Exception("Child error (exit code %s): %s" +                                    % (exitcode, stdout)) +                return (exitcode == 3)      def test_without_siginterrupt(self): -        """If a signal handler is installed and siginterrupt is not called -        at all, when that signal arrives, it interrupts a syscall that's in -        progress. -        """ -        i = self.readpipe_interrupted() -        self.assertTrue(i) -        # Arrival of the signal shouldn't have changed anything. -        i = self.readpipe_interrupted() -        self.assertTrue(i) +        # If a signal handler is installed and siginterrupt is not called +        # at all, when that signal arrives, it interrupts a syscall that's in +        # progress. +        interrupted = self.readpipe_interrupted(None) +        self.assertTrue(interrupted)      def test_siginterrupt_on(self): -        """If a signal handler is installed and siginterrupt is called with -        a true value for the second argument, when that signal arrives, it -        interrupts a syscall that's in progress. -        """ -        signal.siginterrupt(self.signum, 1) -        i = self.readpipe_interrupted() -        self.assertTrue(i) -        # Arrival of the signal shouldn't have changed anything. -        i = self.readpipe_interrupted() -        self.assertTrue(i) +        # If a signal handler is installed and siginterrupt is called with +        # a true value for the second argument, when that signal arrives, it +        # interrupts a syscall that's in progress. +        interrupted = self.readpipe_interrupted(True) +        self.assertTrue(interrupted)      def test_siginterrupt_off(self): -        """If a signal handler is installed and siginterrupt is called with -        a false value for the second argument, when that signal arrives, it -        does not interrupt a syscall that's in progress. -        """ -        signal.siginterrupt(self.signum, 0) -        i = self.readpipe_interrupted() -        self.assertFalse(i) -        # Arrival of the signal shouldn't have changed anything. -        i = self.readpipe_interrupted() -        self.assertFalse(i) +        # If a signal handler is installed and siginterrupt is called with +        # a false value for the second argument, when that signal arrives, it +        # does not interrupt a syscall that's in progress. +        interrupted = self.readpipe_interrupted(False) +        self.assertFalse(interrupted)  @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") @@ -391,8 +409,6 @@ class ItimerTest(unittest.TestCase):      def sig_alrm(self, *args):          self.hndl_called = True -        if support.verbose: -            print("SIGALRM handler invoked", args)      def sig_vtalrm(self, *args):          self.hndl_called = True @@ -404,21 +420,13 @@ class ItimerTest(unittest.TestCase):          elif self.hndl_count == 3:              # disable ITIMER_VIRTUAL, this function shouldn't be called anymore              signal.setitimer(signal.ITIMER_VIRTUAL, 0) -            if support.verbose: -                print("last SIGVTALRM handler call")          self.hndl_count += 1 -        if support.verbose: -            print("SIGVTALRM handler invoked", args) -      def sig_prof(self, *args):          self.hndl_called = True          signal.setitimer(signal.ITIMER_PROF, 0) -        if support.verbose: -            print("SIGPROF handler invoked", args) -      def test_itimer_exc(self):          # XXX I'm assuming -1 is an invalid itimer, but maybe some platform          # defines it ? @@ -431,10 +439,7 @@ class ItimerTest(unittest.TestCase):      def test_itimer_real(self):          self.itimer = signal.ITIMER_REAL          signal.setitimer(self.itimer, 1.0) -        if support.verbose: -            print("\ncall pause()...")          signal.pause() -          self.assertEqual(self.hndl_called, True)      # Issue 3864, unknown if this affects earlier versions of freebsd also @@ -483,11 +488,240 @@ class ItimerTest(unittest.TestCase):          # and the handler should have been called          self.assertEqual(self.hndl_called, True) + +class PendingSignalsTests(unittest.TestCase): +    """ +    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait() +    functions. +    """ +    def setUp(self): +        self.has_pthread_kill = hasattr(signal, 'pthread_kill') + +    def handler(self, signum, frame): +        1/0 + +    def read_sigmask(self): +        return signal.pthread_sigmask(signal.SIG_BLOCK, []) + +    def can_test_blocked_signals(self, skip): +        """ +        Check if a blocked signal can be raised to the main thread without +        calling its signal handler. We need pthread_kill() or exactly one +        thread (the main thread). + +        Return True if it's possible. Otherwise, return False and print a +        warning if skip is False, or raise a SkipTest exception if skip is +        True. +        """ +        if self.has_pthread_kill: +            return True + +        # The fault handler timeout thread masks all signals. If the main +        # thread masks also SIGUSR1, all threads mask this signal. In this +        # case, if we send SIGUSR1 to the process, the signal is pending in the +        # main or the faulthandler timeout thread.  Unblock SIGUSR1 in the main +        # thread calls the signal handler only if the signal is pending for the +        # main thread. Stop the faulthandler timeout thread to workaround this +        # problem. +        import faulthandler +        faulthandler.cancel_dump_tracebacks_later() + +        # Issue #11998: The _tkinter module loads the Tcl library which +        # creates a thread waiting events in select(). This thread receives +        # signals blocked by all other threads. We cannot test blocked +        # signals +        if '_tkinter' in sys.modules: +            message = ("_tkinter is loaded and pthread_kill() is missing, " +                       "cannot test blocked signals (issue #11998)") +            if skip: +                self.skipTest(message) +            else: +                print("WARNING: %s" % message) +            return False +        return True + +    def kill(self, signum): +        if self.has_pthread_kill: +            tid = threading.get_ident() +            signal.pthread_kill(tid, signum) +        else: +            pid = os.getpid() +            os.kill(pid, signum) + +    @unittest.skipUnless(hasattr(signal, 'sigpending'), +                         'need signal.sigpending()') +    def test_sigpending_empty(self): +        self.assertEqual(signal.sigpending(), set()) + +    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), +                         'need signal.pthread_sigmask()') +    @unittest.skipUnless(hasattr(signal, 'sigpending'), +                         'need signal.sigpending()') +    def test_sigpending(self): +        self.can_test_blocked_signals(True) + +        signum = signal.SIGUSR1 +        old_handler = signal.signal(signum, self.handler) +        self.addCleanup(signal.signal, signum, old_handler) + +        signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) +        self.kill(signum) +        self.assertEqual(signal.sigpending(), {signum}) +        with self.assertRaises(ZeroDivisionError): +            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + +    @unittest.skipUnless(hasattr(signal, 'pthread_kill'), +                         'need signal.pthread_kill()') +    def test_pthread_kill(self): +        signum = signal.SIGUSR1 +        current = threading.get_ident() + +        old_handler = signal.signal(signum, self.handler) +        self.addCleanup(signal.signal, signum, old_handler) + +        with self.assertRaises(ZeroDivisionError): +            signal.pthread_kill(current, signum) + +    @unittest.skipUnless(hasattr(signal, 'sigwait'), +                         'need signal.sigwait()') +    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), +                         'need signal.pthread_sigmask()') +    @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') +    def test_sigwait(self): +        def test(signum): +            signal.alarm(1) +            received = signal.sigwait([signum]) +            if received != signum: +                print("sigwait() received %s, not %s" +                      % (received, signum), +                      file=sys.stderr) +                os._exit(1) + +        signum = signal.SIGALRM + +        # sigwait must be called with the signal blocked: since the current +        # process might have several threads running, we fork() a child process +        # to have a single thread. +        pid = os.fork() +        if pid == 0: +            # child: block and wait the signal +            try: +                signal.signal(signum, self.handler) +                signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + +                # Do the tests +                test(signum) + +                # The handler must not be called on unblock +                try: +                    signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) +                except ZeroDivisionError: +                    print("the signal handler has been called", +                          file=sys.stderr) +                    os._exit(1) +            except BaseException as err: +                print("error: {}".format(err), file=sys.stderr) +                os._exit(1) +            else: +                os._exit(0) +        else: +            # parent: check that the child correcty received the signal +            self.assertEqual(os.waitpid(pid, 0), (pid, 0)) + +    @unittest.skipUnless(hasattr(signal, 'sigwait'), +                         'need signal.sigwait()') +    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), +                         'need signal.pthread_sigmask()') +    @unittest.skipIf(threading is None, "test needs threading module") +    def test_sigwait_thread(self): +        # Check that calling sigwait() from a thread doesn't suspend the whole +        # process. A new interpreter is spawned to avoid problems when mixing +        # threads and fork(): only async-safe functions are allowed between +        # fork() and exec(). +        assert_python_ok("-c", """if True: +            import os, threading, sys, time, signal + +            # the default handler terminates the process +            signum = signal.SIGUSR1 + +            def kill_later(): +                # wait until the main thread is waiting in sigwait() +                time.sleep(1) +                os.kill(os.getpid(), signum) + +            # the signal must be blocked by all the threads +            signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) +            killer = threading.Thread(target=kill_later) +            killer.start() +            received = signal.sigwait([signum]) +            if received != signum: +                print("sigwait() received %s, not %s" % (received, signum), +                      file=sys.stderr) +                sys.exit(1) +            killer.join() +            # unblock the signal, which should have been cleared by sigwait() +            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) +        """) + +    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), +                         'need signal.pthread_sigmask()') +    def test_pthread_sigmask_arguments(self): +        self.assertRaises(TypeError, signal.pthread_sigmask) +        self.assertRaises(TypeError, signal.pthread_sigmask, 1) +        self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3) +        self.assertRaises(OSError, signal.pthread_sigmask, 1700, []) + +    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), +                         'need signal.pthread_sigmask()') +    def test_pthread_sigmask(self): +        test_blocked_signals = self.can_test_blocked_signals(False) +        signum = signal.SIGUSR1 + +        # Install our signal handler +        old_handler = signal.signal(signum, self.handler) +        self.addCleanup(signal.signal, signum, old_handler) + +        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler +        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) +        self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, old_mask) +        with self.assertRaises(ZeroDivisionError): +            self.kill(signum) + +        # Block and then raise SIGUSR1. The signal is blocked: the signal +        # handler is not called, and the signal is now pending +        signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) +        if test_blocked_signals: +            self.kill(signum) + +        # Check the new mask +        blocked = self.read_sigmask() +        self.assertIn(signum, blocked) +        self.assertEqual(old_mask ^ blocked, {signum}) + +        # Unblock SIGUSR1 +        if test_blocked_signals: +            with self.assertRaises(ZeroDivisionError): +                # unblock the pending signal calls immediatly the signal handler +                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) +        else: +            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) +        with self.assertRaises(ZeroDivisionError): +            self.kill(signum) + +        # Check the new mask +        unblocked = self.read_sigmask() +        self.assertNotIn(signum, unblocked) +        self.assertEqual(blocked ^ unblocked, {signum}) +        self.assertSequenceEqual(old_mask, unblocked) +        # Finally, restore the previous signal handler and the signal mask + +  def test_main():      try: -        support.run_unittest(BasicSignalTests, InterProcessSignalTests, +        support.run_unittest(PosixTests, InterProcessSignalTests,                               WakeupSignalTests, SiginterruptTest, -                             ItimerTest, WindowsSignalTests) +                             ItimerTest, WindowsSignalTests, +                             PendingSignalsTests)      finally:          support.reap_children()  | 
