diff options
Diffstat (limited to 'Lib/test/test_signal.py')
-rw-r--r-- | Lib/test/test_signal.py | 333 |
1 files changed, 172 insertions, 161 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index 15c6e9c..c1df51e 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -1,167 +1,178 @@ -# Test the signal module -from test.test_support import verbose, TestSkipped, TestFailed, vereq +import unittest +from test import test_support import signal import os, sys, time -if sys.platform[:3] in ('win', 'os2'): - raise TestSkipped, "Can't test signal on %s" % sys.platform - -MAX_DURATION = 20 # Entire test should last at most 20 sec. - -if verbose: - x = '-x' -else: - x = '+x' - -pid = os.getpid() -if verbose: - print("test runner's pid is", pid) - -# Shell script that will send us asynchronous signals -script = """ - ( - set %(x)s - sleep 2 - kill -HUP %(pid)d - sleep 2 - kill -USR1 %(pid)d - sleep 2 - kill -USR2 %(pid)d - ) & -""" % vars() - -a_called = b_called = False - -def handlerA(*args): - global a_called - a_called = True - if verbose: - print("handlerA invoked", args) - class HandlerBCalled(Exception): pass -def handlerB(*args): - global b_called - b_called = True - if verbose: - print("handlerB invoked", args) - raise HandlerBCalled, args - -# Set up a child to send signals to us (the parent) after waiting long -# enough to receive the alarm. It seems we miss the alarm for some -# reason. This will hopefully stop the hangs on Tru64/Alpha. -# Alas, it doesn't. Tru64 appears to miss all the signals at times, or -# seemingly random subsets of them, and nothing done in force_test_exit -# so far has actually helped. -def force_test_exit(): - # Sigh, both imports seem necessary to avoid errors. - import os - fork_pid = os.fork() - if fork_pid: - # In parent. - return fork_pid - - # In child. - import os, time - try: - # Wait 5 seconds longer than the expected alarm to give enough - # time for the normal sequence of events to occur. This is - # just a stop-gap to try to prevent the test from hanging. - time.sleep(MAX_DURATION + 5) - print(' child should not have to kill parent', file=sys.__stdout__) - for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM": - os.kill(pid, getattr(signal, signame)) - print(" child sent", signame, "to", pid, file=sys.__stdout__) - time.sleep(1) - finally: - os._exit(0) - -# Install handlers. -hup = signal.signal(signal.SIGHUP, handlerA) -usr1 = signal.signal(signal.SIGUSR1, handlerB) -usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN) -alrm = signal.signal(signal.SIGALRM, signal.default_int_handler) - -try: - - signal.alarm(MAX_DURATION) - vereq(signal.getsignal(signal.SIGHUP), handlerA) - vereq(signal.getsignal(signal.SIGUSR1), handlerB) - vereq(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN) - vereq(signal.getsignal(signal.SIGALRM), signal.default_int_handler) - - # Try to ensure this test exits even if there is some problem with alarm. - # Tru64/Alpha often hangs and is ultimately killed by the buildbot. - fork_pid = force_test_exit() - - try: - signal.getsignal(4242) - raise TestFailed('expected ValueError for invalid signal # to ' - 'getsignal()') - except ValueError: - pass - - try: - signal.signal(4242, handlerB) - raise TestFailed('expected ValueError for invalid signal # to ' - 'signal()') - except ValueError: - pass - - try: - signal.signal(signal.SIGUSR1, None) - raise TestFailed('expected TypeError for non-callable') - except TypeError: - pass - - # Launch an external script to send us signals. - # We expect the external script to: - # send HUP, which invokes handlerA to set a_called - # send USR1, which invokes handlerB to set b_called and raise - # HandlerBCalled - # send USR2, which is ignored - # - # Then we expect the alarm to go off, and its handler raises - # KeyboardInterrupt, finally getting us out of the loop. - os.system(script) - try: - print("starting pause() loop...") - while 1: - try: - if verbose: - print("call pause()...") - signal.pause() - if verbose: - print("pause() returned") - except HandlerBCalled: - if verbose: - print("HandlerBCalled exception caught") - - except KeyboardInterrupt: - if verbose: - print("KeyboardInterrupt (the alarm() went off)") - - if not a_called: - print('HandlerA not called') - - if not b_called: - print('HandlerB not called') - -finally: - # Forcibly kill the child we created to ping us if there was a test error. - try: - # Make sure we don't kill ourself if there was a fork error. - if fork_pid > 0: - os.kill(fork_pid, signal.SIGKILL) - except: - # If the child killed us, it has probably exited. Killing a - # non-existent process will raise an error which we don't care about. - pass - - # Restore handlers. - signal.alarm(0) # cancel alarm in case we died early - signal.signal(signal.SIGHUP, hup) - signal.signal(signal.SIGUSR1, usr1) - signal.signal(signal.SIGUSR2, usr2) - signal.signal(signal.SIGALRM, alrm) +class InterProcessSignalTests(unittest.TestCase): + MAX_DURATION = 20 # Entire test should last at most 20 sec. + + # Set up a child to send signals to us (the parent) after waiting + # long enough to receive the alarm. It seems we miss the alarm + # for some reason. This will hopefully stop the hangs on + # Tru64/Alpha. Alas, it doesn't. Tru64 appears to miss all the + # signals at times, or seemingly random subsets of them, and + # nothing done in force_test_exit so far has actually helped. + def spawn_force_test_exit_process(self, parent_pid): + # Sigh, both imports seem necessary to avoid errors. + import os + fork_pid = os.fork() + if fork_pid: + # In parent. + return fork_pid + + # In child. + import os, time + try: + # Wait 5 seconds longer than the expected alarm to give enough + # time for the normal sequence of events to occur. This is + # just a stop-gap to try to prevent the test from hanging. + time.sleep(self.MAX_DURATION + 5) + print(" child should not have to kill parent", + file=sys.__stdout__) + for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM": + os.kill(parent_pid, getattr(signal, signame)) + print(" child sent", signame, "to", + parent_pid, file=sys.__stdout__) + time.sleep(1) + finally: + os._exit(0) + + def handlerA(self, *args): + self.a_called = True + if test_support.verbose: + print("handlerA invoked", args) + + def handlerB(self, *args): + self.b_called = True + if test_support.verbose: + print("handlerB invoked", args) + raise HandlerBCalled(*args) + + def test_main(self): + self.assertEquals(signal.getsignal(signal.SIGHUP), self.handlerA) + self.assertEquals(signal.getsignal(signal.SIGUSR1), self.handlerB) + self.assertEquals(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN) + self.assertEquals(signal.getsignal(signal.SIGALRM), + signal.default_int_handler) + + # Launch an external script to send us signals. + # We expect the external script to: + # send HUP, which invokes handlerA to set a_called + # send USR1, which invokes handlerB to set b_called and raise + # HandlerBCalled + # send USR2, which is ignored + # + # Then we expect the alarm to go off, and its handler raises + # KeyboardInterrupt, finally getting us out of the loop. + + if test_support.verbose: + verboseflag = '-x' + else: + verboseflag = '+x' + + pid = self.pid + if test_support.verbose: + print("test runner's pid is", pid) + + # Shell script that will send us asynchronous signals + script = """ + ( + set %(verboseflag)s + sleep 2 + kill -HUP %(pid)d + sleep 2 + kill -USR1 %(pid)d + sleep 2 + kill -USR2 %(pid)d + ) & + """ % vars() + + signal.alarm(self.MAX_DURATION) + + handler_b_exception_raised = False + + os.system(script) + try: + if test_support.verbose: + print("starting pause() loop...") + while 1: + try: + if test_support.verbose: + print("call pause()...") + signal.pause() + if test_support.verbose: + print("pause() returned") + except HandlerBCalled: + handler_b_exception_raised = True + if test_support.verbose: + print("HandlerBCalled exception caught") + + except KeyboardInterrupt: + if test_support.verbose: + print("KeyboardInterrupt (the alarm() went off)") + + self.assert_(self.a_called) + self.assert_(self.b_called) + self.assert_(handler_b_exception_raised) + + def setUp(self): + # Install handlers. + self.hup = signal.signal(signal.SIGHUP, self.handlerA) + self.usr1 = signal.signal(signal.SIGUSR1, self.handlerB) + self.usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN) + self.alrm = signal.signal(signal.SIGALRM, + signal.default_int_handler) + self.a_called = False + self.b_called = False + self.pid = os.getpid() + self.fork_pid = self.spawn_force_test_exit_process(self.pid) + + def tearDown(self): + # Forcibly kill the child we created to ping us if there was a + # test error. + try: + # Make sure we don't kill ourself if there was a fork + # error. + if self.fork_pid > 0: + os.kill(self.fork_pid, signal.SIGKILL) + except: + # If the child killed us, it has probably exited. Killing + # a non-existent process will raise an error which we + # don't care about. + pass + + # Restore handlers. + signal.alarm(0) # cancel alarm in case we died early + signal.signal(signal.SIGHUP, self.hup) + signal.signal(signal.SIGUSR1, self.usr1) + signal.signal(signal.SIGUSR2, self.usr2) + signal.signal(signal.SIGALRM, self.alrm) + + +class BasicSignalTests(unittest.TestCase): + def test_out_of_range_signal_number_raises_error(self): + self.assertRaises(ValueError, signal.getsignal, 4242) + + def trivial_signal_handler(*args): + pass + + self.assertRaises(ValueError, signal.signal, 4242, + trivial_signal_handler) + + def test_setting_signal_handler_to_none_raises_error(self): + self.assertRaises(TypeError, signal.signal, + signal.SIGUSR1, None) + +def test_main(): + if sys.platform[:3] in ('win', 'os2'): + raise test_support.TestSkipped("Can't test signal on %s" % \ + sys.platform) + + test_support.run_unittest(BasicSignalTests, InterProcessSignalTests) + + +if __name__ == "__main__": + test_main() |