summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_signal.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_signal.py')
-rw-r--r--Lib/test/test_signal.py241
1 files changed, 119 insertions, 122 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index 1d25814..2834076 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -1,6 +1,11 @@
import unittest
from test import test_support
+from contextlib import closing, nested
+import pickle
+import select
import signal
+import subprocess
+import traceback
import sys, os, time, errno
if sys.platform[:3] in ('win', 'os2') or sys.platform == 'riscos':
@@ -11,40 +16,20 @@ if sys.platform[:3] in ('win', 'os2') or sys.platform == 'riscos':
class HandlerBCalled(Exception):
pass
+
+def exit_subprocess():
+ """Use os._exit(0) to exit the current subprocess.
+
+ Otherwise, the test catches the SystemExit and continues executing
+ in parallel with the original test, so you wind up with an
+ exponential number of tests running concurrently.
+ """
+ os._exit(0)
+
+
class InterProcessSignalTests(unittest.TestCase):
MAX_DURATION = 20 # Entire test should last at most 20 sec.
- # Set up a child to send signals to us (the parent) after waiting
- # long enough to receive the alarm. It seems we miss the alarm
- # for some reason. This will hopefully stop the hangs on
- # Tru64/Alpha. Alas, it doesn't. Tru64 appears to miss all the
- # signals at times, or seemingly random subsets of them, and
- # nothing done in force_test_exit so far has actually helped.
- def spawn_force_test_exit_process(self, parent_pid):
- # Sigh, both imports seem necessary to avoid errors.
- import os
- fork_pid = os.fork()
- if fork_pid:
- # In parent.
- return fork_pid
-
- # In child.
- import os, time
- try:
- # Wait 5 seconds longer than the expected alarm to give enough
- # time for the normal sequence of events to occur. This is
- # just a stop-gap to try to prevent the test from hanging.
- time.sleep(self.MAX_DURATION + 5)
- print(" child should not have to kill parent",
- file=sys.__stdout__)
- for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM":
- os.kill(parent_pid, getattr(signal, signame))
- print(" child sent", signame, "to",
- parent_pid, file=sys.__stdout__)
- time.sleep(1)
- finally:
- os._exit(0)
-
def handlerA(self, *args):
self.a_called = True
if test_support.verbose:
@@ -56,121 +41,133 @@ class InterProcessSignalTests(unittest.TestCase):
print("handlerB invoked", args)
raise HandlerBCalled(*args)
- def test_main(self):
- self.assertEquals(signal.getsignal(signal.SIGHUP), self.handlerA)
- self.assertEquals(signal.getsignal(signal.SIGUSR1), self.handlerB)
- self.assertEquals(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN)
- self.assertEquals(signal.getsignal(signal.SIGALRM),
- signal.default_int_handler)
-
- # Launch an external script to send us signals.
- # We expect the external script to:
- # send HUP, which invokes handlerA to set a_called
- # send USR1, which invokes handlerB to set b_called and raise
- # HandlerBCalled
- # send USR2, which is ignored
- #
- # Then we expect the alarm to go off, and its handler raises
- # KeyboardInterrupt, finally getting us out of the loop.
+ def wait(self, child):
+ """Wait for child to finish, ignoring EINTR."""
+ while True:
+ try:
+ child.wait()
+ return
+ except OSError as e:
+ if e.errno != errno.EINTR:
+ raise
- if test_support.verbose:
- verboseflag = '-x'
- else:
- verboseflag = '+x'
+ def run_test(self):
+ # Install handlers. This function runs in a sub-process, so we
+ # don't worry about re-setting the default handlers.
+ signal.signal(signal.SIGHUP, self.handlerA)
+ signal.signal(signal.SIGUSR1, self.handlerB)
+ signal.signal(signal.SIGUSR2, signal.SIG_IGN)
+ signal.signal(signal.SIGALRM, signal.default_int_handler)
+
+ # Variables the signals will modify:
+ self.a_called = False
+ self.b_called = False
- pid = self.pid
+ # Let the sub-processes know who to send signals to.
+ pid = os.getpid()
if test_support.verbose:
print("test runner's pid is", pid)
- # Shell script that will send us asynchronous signals
- script = """
- (
- set %(verboseflag)s
- sleep 2
- kill -HUP %(pid)d
- sleep 2
- kill -USR1 %(pid)d
- sleep 2
- kill -USR2 %(pid)d
- ) &
- """ % vars()
-
- signal.alarm(self.MAX_DURATION)
-
- handler_b_exception_raised = False
+ child = subprocess.Popen(['kill', '-HUP', str(pid)])
+ self.wait(child)
+ self.assertTrue(self.a_called)
+ self.assertFalse(self.b_called)
+ self.a_called = False
- os.system(script)
try:
+ child = subprocess.Popen(['kill', '-USR1', str(pid)])
+ # This wait should be interrupted by the signal's exception.
+ self.wait(child)
+ self.fail('HandlerBCalled exception not thrown')
+ except HandlerBCalled:
+ self.assertTrue(self.b_called)
+ self.assertFalse(self.a_called)
if test_support.verbose:
- print("starting pause() loop...")
- while 1:
- try:
- if test_support.verbose:
- print("call pause()...")
- signal.pause()
- if test_support.verbose:
- print("pause() returned")
- except HandlerBCalled:
- handler_b_exception_raised = True
- if test_support.verbose:
- print("HandlerBCalled exception caught")
+ print("HandlerBCalled exception caught")
+ child = subprocess.Popen(['kill', '-USR2', str(pid)])
+ self.wait(child) # Nothing should happen.
+
+ try:
+ signal.alarm(1)
+ # The race condition in pause doesn't matter in this case,
+ # since alarm is going to raise a KeyboardException, which
+ # will skip the call.
+ signal.pause()
except KeyboardInterrupt:
if test_support.verbose:
print("KeyboardInterrupt (the alarm() went off)")
-
- self.assert_(self.a_called)
- self.assert_(self.b_called)
- self.assert_(handler_b_exception_raised)
-
- def setUp(self):
- # Install handlers.
- self.hup = signal.signal(signal.SIGHUP, self.handlerA)
- self.usr1 = signal.signal(signal.SIGUSR1, self.handlerB)
- self.usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN)
- self.alrm = signal.signal(signal.SIGALRM,
- signal.default_int_handler)
- self.a_called = False
- self.b_called = False
- self.pid = os.getpid()
- self.fork_pid = self.spawn_force_test_exit_process(self.pid)
-
- def tearDown(self):
- # Forcibly kill the child we created to ping us if there was a
- # test error.
- try:
- # Make sure we don't kill ourself if there was a fork
- # error.
- if self.fork_pid > 0:
- os.kill(self.fork_pid, signal.SIGKILL)
except:
- # If the child killed us, it has probably exited. Killing
- # a non-existent process will raise an error which we
- # don't care about.
- pass
-
- # Restore handlers.
- signal.alarm(0) # cancel alarm in case we died early
- signal.signal(signal.SIGHUP, self.hup)
- signal.signal(signal.SIGUSR1, self.usr1)
- signal.signal(signal.SIGUSR2, self.usr2)
- signal.signal(signal.SIGALRM, self.alrm)
+ self.fail('Some other exception woke us from pause: %s' %
+ traceback.format_exc())
+ else:
+ self.fail('pause returned of its own accord')
+
+ def test_main(self):
+ # This function spawns a child process to insulate the main
+ # test-running process from all the signals. It then
+ # communicates with that child process over a pipe and
+ # re-raises information about any exceptions the child
+ # throws. The real work happens in self.run_test().
+ os_done_r, os_done_w = os.pipe()
+ with nested(closing(os.fdopen(os_done_r, 'rb')),
+ closing(os.fdopen(os_done_w, 'wb'))) as (done_r, done_w):
+ child = os.fork()
+ if child == 0:
+ # In the child process; run the test and report results
+ # through the pipe.
+ try:
+ done_r.close()
+ # Have to close done_w again here because
+ # exit_subprocess() will skip the enclosing with block.
+ with closing(done_w):
+ try:
+ self.run_test()
+ except:
+ pickle.dump(traceback.format_exc(), done_w)
+ else:
+ pickle.dump(None, done_w)
+ except:
+ print('Uh oh, raised from pickle.')
+ traceback.print_exc()
+ finally:
+ exit_subprocess()
+
+ done_w.close()
+ # Block for up to MAX_DURATION seconds for the test to finish.
+ r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
+ if done_r in r:
+ tb = pickle.load(done_r)
+ if tb:
+ self.fail(tb)
+ else:
+ os.kill(child, signal.SIGKILL)
+ self.fail('Test deadlocked after %d seconds.' %
+ self.MAX_DURATION)
class BasicSignalTests(unittest.TestCase):
+ def trivial_signal_handler(self, *args):
+ pass
+
def test_out_of_range_signal_number_raises_error(self):
self.assertRaises(ValueError, signal.getsignal, 4242)
- def trivial_signal_handler(*args):
- pass
-
self.assertRaises(ValueError, signal.signal, 4242,
- trivial_signal_handler)
+ self.trivial_signal_handler)
def test_setting_signal_handler_to_none_raises_error(self):
self.assertRaises(TypeError, signal.signal,
signal.SIGUSR1, None)
+ def test_getsignal(self):
+ hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
+ self.assertEquals(signal.getsignal(signal.SIGHUP),
+ self.trivial_signal_handler)
+ signal.signal(signal.SIGHUP, hup)
+ self.assertEquals(signal.getsignal(signal.SIGHUP), hup)
+
+
class WakeupSignalTests(unittest.TestCase):
TIMEOUT_FULL = 10
TIMEOUT_HALF = 5
@@ -233,7 +230,7 @@ class SiginterruptTest(unittest.TestCase):
os.kill(ppid, self.signum)
time.sleep(0.2)
finally:
- os._exit(0)
+ exit_subprocess()
try:
os.close(w)