diff options
Diffstat (limited to 'Lib/test/test_signal.py')
-rw-r--r-- | Lib/test/test_signal.py | 471 |
1 files changed, 406 insertions, 65 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index 8df1bf0..e23b126 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -1,17 +1,19 @@ -import errno +import unittest +from test import support +from contextlib import closing import gc -import os import pickle import select import signal +import struct import subprocess -import sys -import time import traceback -import unittest -from test import support -from contextlib import closing +import sys, os, time, errno from test.script_helper import assert_python_ok, spawn_python +try: + import threading +except ImportError: + threading = None if sys.platform in ('os2', 'riscos'): raise unittest.SkipTest("Can't test signal on %s" % sys.platform) @@ -57,15 +59,9 @@ class InterProcessSignalTests(unittest.TestCase): def handlerA(self, signum, frame): self.a_called = True - if support.verbose: - print("handlerA invoked from signal %s at:\n%s" % ( - signum, self.format_frame(frame, limit=1))) def handlerB(self, signum, frame): self.b_called = True - if support.verbose: - print ("handlerB invoked from signal %s at:\n%s" % ( - signum, self.format_frame(frame, limit=1))) raise HandlerBCalled(signum, self.format_frame(frame)) def wait(self, child): @@ -92,8 +88,6 @@ class InterProcessSignalTests(unittest.TestCase): # Let the sub-processes know who to send signals to. pid = os.getpid() - if support.verbose: - print("test runner's pid is", pid) child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)]) if child: @@ -117,8 +111,6 @@ class InterProcessSignalTests(unittest.TestCase): except HandlerBCalled: self.assertTrue(self.b_called) self.assertFalse(self.a_called) - if support.verbose: - print("HandlerBCalled exception caught") child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)]) if child: @@ -134,8 +126,7 @@ class InterProcessSignalTests(unittest.TestCase): # may return early. time.sleep(1) except KeyboardInterrupt: - if support.verbose: - print("KeyboardInterrupt (the alarm() went off)") + pass except: self.fail("Some other exception woke us from pause: %s" % traceback.format_exc()) @@ -191,7 +182,7 @@ class InterProcessSignalTests(unittest.TestCase): @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") -class BasicSignalTests(unittest.TestCase): +class PosixTests(unittest.TestCase): def trivial_signal_handler(self, *args): pass @@ -233,31 +224,44 @@ class WindowsSignalTests(unittest.TestCase): @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") class WakeupSignalTests(unittest.TestCase): - def check_wakeup(self, test_body): - # use a subprocess to have only one thread and to not change signal - # handling of the parent process + def check_wakeup(self, test_body, *signals): + # use a subprocess to have only one thread code = """if 1: import fcntl import os import signal + import struct + + signals = {!r} def handler(signum, frame): pass + def check_signum(signals): + data = os.read(read, len(signals)+1) + raised = struct.unpack('%uB' % len(data), data) + # We don't care of the signal delivery order (it's not portable or + # reliable) + raised = set(raised) + signals = set(signals) + assert raised == signals, "%r != %r" % (raised, signals) + {} signal.signal(signal.SIGALRM, handler) read, write = os.pipe() - flags = fcntl.fcntl(write, fcntl.F_GETFL, 0) - flags = flags | os.O_NONBLOCK - fcntl.fcntl(write, fcntl.F_SETFL, flags) + for fd in (read, write): + flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) + flags = flags | os.O_NONBLOCK + fcntl.fcntl(fd, fcntl.F_SETFL, flags) signal.set_wakeup_fd(write) test() + check_signum(signals) os.close(read) os.close(write) - """.format(test_body) + """.format(signals, test_body) assert_python_ok('-c', code) @@ -276,14 +280,12 @@ class WakeupSignalTests(unittest.TestCase): time.sleep(TIMEOUT_FULL) mid_time = time.time() dt = mid_time - before_time - if dt >= TIMEOUT_HALF: - raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) + assert dt < TIMEOUT_HALF, dt select.select([read], [], [], TIMEOUT_FULL) after_time = time.time() dt = after_time - mid_time - if dt >= TIMEOUT_HALF: - raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) - """) + assert dt < TIMEOUT_HALF, dt + """, signal.SIGALRM) def test_wakeup_fd_during(self): self.check_wakeup("""def test(): @@ -304,9 +306,33 @@ class WakeupSignalTests(unittest.TestCase): raise Exception("select.error not raised") after_time = time.time() dt = after_time - before_time - if dt >= TIMEOUT_HALF: - raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) - """) + assert dt < TIMEOUT_HALF, dt + """, signal.SIGALRM) + + def test_signum(self): + self.check_wakeup("""def test(): + signal.signal(signal.SIGUSR1, handler) + os.kill(os.getpid(), signal.SIGUSR1) + os.kill(os.getpid(), signal.SIGALRM) + """, signal.SIGUSR1, signal.SIGALRM) + + @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), + 'need signal.pthread_sigmask()') + def test_pending(self): + self.check_wakeup("""def test(): + signum1 = signal.SIGUSR1 + signum2 = signal.SIGUSR2 + + signal.signal(signum1, handler) + signal.signal(signum2, handler) + + signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2)) + os.kill(os.getpid(), signum1) + os.kill(os.getpid(), signum2) + # Unblocking the 2 signals calls the C signal handler twice + signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2)) + """, signal.SIGUSR1, signal.SIGUSR2) + @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") class SiginterruptTest(unittest.TestCase): @@ -316,9 +342,6 @@ class SiginterruptTest(unittest.TestCase): read is interrupted by the signal and raises an exception. Return False if it returns normally. """ - class Timeout(Exception): - pass - # use a subprocess to have only one thread, to have a timeout on the # blocking read and to not touch signal handling in this process code = """if 1: @@ -359,18 +382,8 @@ class SiginterruptTest(unittest.TestCase): # wait until the child process is loaded and has started first_line = process.stdout.readline() - # Wait the process with a timeout of 5 seconds - timeout = time.time() + 5.0 - while True: - if timeout < time.time(): - raise Timeout() - status = process.poll() - if status is not None: - break - time.sleep(0.1) - - stdout, stderr = process.communicate() - except Timeout: + stdout, stderr = process.communicate(timeout=5.0) + except subprocess.TimeoutExpired: process.kill() return False else: @@ -419,8 +432,6 @@ class ItimerTest(unittest.TestCase): def sig_alrm(self, *args): self.hndl_called = True - if support.verbose: - print("SIGALRM handler invoked", args) def sig_vtalrm(self, *args): self.hndl_called = True @@ -432,21 +443,13 @@ class ItimerTest(unittest.TestCase): elif self.hndl_count == 3: # disable ITIMER_VIRTUAL, this function shouldn't be called anymore signal.setitimer(signal.ITIMER_VIRTUAL, 0) - if support.verbose: - print("last SIGVTALRM handler call") self.hndl_count += 1 - if support.verbose: - print("SIGVTALRM handler invoked", args) - def sig_prof(self, *args): self.hndl_called = True signal.setitimer(signal.ITIMER_PROF, 0) - if support.verbose: - print("SIGPROF handler invoked", args) - def test_itimer_exc(self): # XXX I'm assuming -1 is an invalid itimer, but maybe some platform # defines it ? @@ -459,10 +462,7 @@ class ItimerTest(unittest.TestCase): def test_itimer_real(self): self.itimer = signal.ITIMER_REAL signal.setitimer(self.itimer, 1.0) - if support.verbose: - print("\ncall pause()...") signal.pause() - self.assertEqual(self.hndl_called, True) # Issue 3864, unknown if this affects earlier versions of freebsd also @@ -511,11 +511,352 @@ class ItimerTest(unittest.TestCase): # and the handler should have been called self.assertEqual(self.hndl_called, True) + +class PendingSignalsTests(unittest.TestCase): + """ + Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait() + functions. + """ + @unittest.skipUnless(hasattr(signal, 'sigpending'), + 'need signal.sigpending()') + def test_sigpending_empty(self): + self.assertEqual(signal.sigpending(), set()) + + @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), + 'need signal.pthread_sigmask()') + @unittest.skipUnless(hasattr(signal, 'sigpending'), + 'need signal.sigpending()') + def test_sigpending(self): + code = """if 1: + import os + import signal + + def handler(signum, frame): + 1/0 + + signum = signal.SIGUSR1 + signal.signal(signum, handler) + + signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + os.kill(os.getpid(), signum) + pending = signal.sigpending() + assert pending == {signum}, '%s != {%s}' % (pending, signum) + try: + signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + except ZeroDivisionError: + pass + else: + raise Exception("ZeroDivisionError not raised") + """ + assert_python_ok('-c', code) + + @unittest.skipUnless(hasattr(signal, 'pthread_kill'), + 'need signal.pthread_kill()') + def test_pthread_kill(self): + code = """if 1: + import signal + import threading + import sys + + signum = signal.SIGUSR1 + + def handler(signum, frame): + 1/0 + + signal.signal(signum, handler) + + if sys.platform == 'freebsd6': + # Issue #12392 and #12469: send a signal to the main thread + # doesn't work before the creation of the first thread on + # FreeBSD 6 + def noop(): + pass + thread = threading.Thread(target=noop) + thread.start() + thread.join() + + tid = threading.get_ident() + try: + signal.pthread_kill(tid, signum) + except ZeroDivisionError: + pass + else: + raise Exception("ZeroDivisionError not raised") + """ + assert_python_ok('-c', code) + + @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), + 'need signal.pthread_sigmask()') + def wait_helper(self, blocked, test): + """ + test: body of the "def test(signum):" function. + blocked: number of the blocked signal + """ + code = '''if 1: + import signal + import sys + + def handler(signum, frame): + 1/0 + + %s + + blocked = %s + signum = signal.SIGALRM + + # child: block and wait the signal + try: + signal.signal(signum, handler) + signal.pthread_sigmask(signal.SIG_BLOCK, [blocked]) + + # Do the tests + test(signum) + + # The handler must not be called on unblock + try: + signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked]) + except ZeroDivisionError: + print("the signal handler has been called", + file=sys.stderr) + sys.exit(1) + except BaseException as err: + print("error: {}".format(err), file=sys.stderr) + sys.stderr.flush() + sys.exit(1) + ''' % (test.strip(), blocked) + + # sig*wait* must be called with the signal blocked: since the current + # process might have several threads running, use a subprocess to have + # a single thread. + assert_python_ok('-c', code) + + @unittest.skipUnless(hasattr(signal, 'sigwait'), + 'need signal.sigwait()') + def test_sigwait(self): + self.wait_helper(signal.SIGALRM, ''' + def test(signum): + signal.alarm(1) + received = signal.sigwait([signum]) + assert received == signum , 'received %s, not %s' % (received, signum) + ''') + + @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), + 'need signal.sigwaitinfo()') + def test_sigwaitinfo(self): + self.wait_helper(signal.SIGALRM, ''' + def test(signum): + signal.alarm(1) + info = signal.sigwaitinfo([signum]) + assert info.si_signo == signum, "info.si_signo != %s" % signum + ''') + + @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), + 'need signal.sigtimedwait()') + def test_sigtimedwait(self): + self.wait_helper(signal.SIGALRM, ''' + def test(signum): + signal.alarm(1) + info = signal.sigtimedwait([signum], (10, 1000)) + assert info.si_signo == signum, 'info.si_signo != %s' % signum + ''') + + @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), + 'need signal.sigtimedwait()') + # issue #12303: sigtimedwait() takes 30 seconds on FreeBSD 6 (kernel bug) + @unittest.skipIf(sys.platform =='freebsd6', + "sigtimedwait() with a null timeout doens't work on FreeBSD 6") + def test_sigtimedwait_poll(self): + # check that polling with sigtimedwait works + self.wait_helper(signal.SIGALRM, ''' + def test(signum): + import os + os.kill(os.getpid(), signum) + info = signal.sigtimedwait([signum], (0, 0)) + assert info.si_signo == signum, 'info.si_signo != %s' % signum + ''') + + @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), + 'need signal.sigtimedwait()') + def test_sigtimedwait_timeout(self): + self.wait_helper(signal.SIGALRM, ''' + def test(signum): + received = signal.sigtimedwait([signum], (1, 0)) + assert received is None, "received=%r" % (received,) + ''') + + @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), + 'need signal.sigtimedwait()') + def test_sigtimedwait_negative_timeout(self): + signum = signal.SIGALRM + self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, -1)) + self.assertRaises(ValueError, signal.sigtimedwait, [signum], (0, -1)) + self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, 0)) + + @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), + 'need signal.sigwaitinfo()') + def test_sigwaitinfo_interrupted(self): + self.wait_helper(signal.SIGUSR1, ''' + def test(signum): + import errno + + hndl_called = True + def alarm_handler(signum, frame): + hndl_called = False + + signal.signal(signal.SIGALRM, alarm_handler) + signal.alarm(1) + try: + signal.sigwaitinfo([signal.SIGUSR1]) + except OSError as e: + if e.errno == errno.EINTR: + assert hndl_called, "SIGALRM handler not called" + else: + raise Exception("Expected EINTR to be raised by sigwaitinfo") + else: + raise Exception("Expected EINTR to be raised by sigwaitinfo") + ''') + + @unittest.skipUnless(hasattr(signal, 'sigwait'), + 'need signal.sigwait()') + @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), + 'need signal.pthread_sigmask()') + @unittest.skipIf(threading is None, "test needs threading module") + def test_sigwait_thread(self): + # Check that calling sigwait() from a thread doesn't suspend the whole + # process. A new interpreter is spawned to avoid problems when mixing + # threads and fork(): only async-safe functions are allowed between + # fork() and exec(). + assert_python_ok("-c", """if True: + import os, threading, sys, time, signal + + # the default handler terminates the process + signum = signal.SIGUSR1 + + def kill_later(): + # wait until the main thread is waiting in sigwait() + time.sleep(1) + os.kill(os.getpid(), signum) + + # the signal must be blocked by all the threads + signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + killer = threading.Thread(target=kill_later) + killer.start() + received = signal.sigwait([signum]) + if received != signum: + print("sigwait() received %s, not %s" % (received, signum), + file=sys.stderr) + sys.exit(1) + killer.join() + # unblock the signal, which should have been cleared by sigwait() + signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + """) + + @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), + 'need signal.pthread_sigmask()') + def test_pthread_sigmask_arguments(self): + self.assertRaises(TypeError, signal.pthread_sigmask) + self.assertRaises(TypeError, signal.pthread_sigmask, 1) + self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3) + self.assertRaises(OSError, signal.pthread_sigmask, 1700, []) + + @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), + 'need signal.pthread_sigmask()') + def test_pthread_sigmask(self): + code = """if 1: + import signal + import os; import threading + + def handler(signum, frame): + 1/0 + + def kill(signum): + os.kill(os.getpid(), signum) + + def read_sigmask(): + return signal.pthread_sigmask(signal.SIG_BLOCK, []) + + signum = signal.SIGUSR1 + + # Install our signal handler + old_handler = signal.signal(signum, handler) + + # Unblock SIGUSR1 (and copy the old mask) to test our signal handler + old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + try: + kill(signum) + except ZeroDivisionError: + pass + else: + raise Exception("ZeroDivisionError not raised") + + # Block and then raise SIGUSR1. The signal is blocked: the signal + # handler is not called, and the signal is now pending + signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + kill(signum) + + # Check the new mask + blocked = read_sigmask() + assert signum in blocked, "%s not in %s" % (signum, blocked) + assert old_mask ^ blocked == {signum}, "%s ^ %s != {%s}" % (old_mask, blocked, signum) + + # Unblock SIGUSR1 + try: + # unblock the pending signal calls immediatly the signal handler + signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + except ZeroDivisionError: + pass + else: + raise Exception("ZeroDivisionError not raised") + try: + kill(signum) + except ZeroDivisionError: + pass + else: + raise Exception("ZeroDivisionError not raised") + + # Check the new mask + unblocked = read_sigmask() + assert signum not in unblocked, "%s in %s" % (signum, unblocked) + assert blocked ^ unblocked == {signum}, "%s ^ %s != {%s}" % (blocked, unblocked, signum) + assert old_mask == unblocked, "%s != %s" % (old_mask, unblocked) + """ + assert_python_ok('-c', code) + + @unittest.skipIf(sys.platform == 'freebsd6', + "issue #12392: send a signal to the main thread doesn't work " + "before the creation of the first thread on FreeBSD 6") + @unittest.skipUnless(hasattr(signal, 'pthread_kill'), + 'need signal.pthread_kill()') + def test_pthread_kill_main_thread(self): + # Test that a signal can be sent to the main thread with pthread_kill() + # before any other thread has been created (see issue #12392). + code = """if True: + import threading + import signal + import sys + + def handler(signum, frame): + sys.exit(3) + + signal.signal(signal.SIGUSR1, handler) + signal.pthread_kill(threading.get_ident(), signal.SIGUSR1) + sys.exit(2) + """ + + with spawn_python('-c', code) as process: + stdout, stderr = process.communicate() + exitcode = process.wait() + if exitcode != 3: + raise Exception("Child error (exit code %s): %s" % + (exitcode, stdout)) + + def test_main(): try: - support.run_unittest(BasicSignalTests, InterProcessSignalTests, + support.run_unittest(PosixTests, InterProcessSignalTests, WakeupSignalTests, SiginterruptTest, - ItimerTest, WindowsSignalTests) + ItimerTest, WindowsSignalTests, + PendingSignalsTests) finally: support.reap_children() |