summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorJeffrey Yasskin <jyasskin@gmail.com>2008-03-21 05:02:44 (GMT)
committerJeffrey Yasskin <jyasskin@gmail.com>2008-03-21 05:02:44 (GMT)
commitcf26f5419e51a62ae796369ecece8019688f94f4 (patch)
treedc1241456629017d862016f165a58877029088a4 /Lib
parent816a168053544986adaf1fe71d5cbc60f81914c5 (diff)
downloadcpython-cf26f5419e51a62ae796369ecece8019688f94f4.zip
cpython-cf26f5419e51a62ae796369ecece8019688f94f4.tar.gz
cpython-cf26f5419e51a62ae796369ecece8019688f94f4.tar.bz2
Speed up test_signal from ~24s to 4s by avoiding nearly all of the sleep calls.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/test/test_signal.py250
1 files changed, 129 insertions, 121 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index b88565c..76574af 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -1,6 +1,10 @@
import unittest
from test import test_support
+from contextlib import closing, nested
+import pickle
+import select
import signal
+import traceback
import sys, os, time, errno
if sys.platform[:3] in ('win', 'os2') or sys.platform == 'riscos':
@@ -11,39 +15,20 @@ if sys.platform[:3] in ('win', 'os2') or sys.platform == 'riscos':
class HandlerBCalled(Exception):
pass
+
+def exit_subprocess():
+ """Use os._exit(0) to exit the current subprocess.
+
+ Otherwise, the test catches the SystemExit and continues executing
+ in parallel with the original test, so you wind up with an
+ exponential number of tests running concurrently.
+ """
+ os._exit(0)
+
+
class InterProcessSignalTests(unittest.TestCase):
MAX_DURATION = 20 # Entire test should last at most 20 sec.
- # Set up a child to send signals to us (the parent) after waiting
- # long enough to receive the alarm. It seems we miss the alarm
- # for some reason. This will hopefully stop the hangs on
- # Tru64/Alpha. Alas, it doesn't. Tru64 appears to miss all the
- # signals at times, or seemingly random subsets of them, and
- # nothing done in force_test_exit so far has actually helped.
- def spawn_force_test_exit_process(self, parent_pid):
- # Sigh, both imports seem necessary to avoid errors.
- import os
- fork_pid = os.fork()
- if fork_pid:
- # In parent.
- return fork_pid
-
- # In child.
- import os, time
- try:
- # Wait 5 seconds longer than the expected alarm to give enough
- # time for the normal sequence of events to occur. This is
- # just a stop-gap to try to prevent the test from hanging.
- time.sleep(self.MAX_DURATION + 5)
- print >> sys.__stdout__, " child should not have to kill parent"
- for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM":
- os.kill(parent_pid, getattr(signal, signame))
- print >> sys.__stdout__, " child sent", signame, "to", \
- parent_pid
- time.sleep(1)
- finally:
- os._exit(0)
-
def handlerA(self, *args):
self.a_called = True
if test_support.verbose:
@@ -55,121 +40,144 @@ class InterProcessSignalTests(unittest.TestCase):
print "handlerB invoked", args
raise HandlerBCalled(*args)
- def test_main(self):
- self.assertEquals(signal.getsignal(signal.SIGHUP), self.handlerA)
- self.assertEquals(signal.getsignal(signal.SIGUSR1), self.handlerB)
- self.assertEquals(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN)
- self.assertEquals(signal.getsignal(signal.SIGALRM),
- signal.default_int_handler)
-
- # Launch an external script to send us signals.
- # We expect the external script to:
- # send HUP, which invokes handlerA to set a_called
- # send USR1, which invokes handlerB to set b_called and raise
- # HandlerBCalled
- # send USR2, which is ignored
- #
- # Then we expect the alarm to go off, and its handler raises
- # KeyboardInterrupt, finally getting us out of the loop.
+ def wait(self, child_pid):
+ """Wait for child_pid to finish, ignoring EINTR."""
+ while True:
+ try:
+ pid, status = os.waitpid(child_pid, 0)
+ return status
+ except OSError as e:
+ if e.errno != errno.EINTR:
+ raise
- if test_support.verbose:
- verboseflag = '-x'
- else:
- verboseflag = '+x'
+ def run_test(self):
+ # Install handlers. This function runs in a sub-process, so we
+ # don't worry about re-setting the default handlers.
+ signal.signal(signal.SIGHUP, self.handlerA)
+ signal.signal(signal.SIGUSR1, self.handlerB)
+ signal.signal(signal.SIGUSR2, signal.SIG_IGN)
+ signal.signal(signal.SIGALRM, signal.default_int_handler)
+
+ # Variables the signals will modify:
+ self.a_called = False
+ self.b_called = False
- pid = self.pid
+ # Let the sub-processes know who to send signals to.
+ pid = os.getpid()
if test_support.verbose:
print "test runner's pid is", pid
- # Shell script that will send us asynchronous signals
- script = """
- (
- set %(verboseflag)s
- sleep 2
- kill -HUP %(pid)d
- sleep 2
- kill -USR1 %(pid)d
- sleep 2
- kill -USR2 %(pid)d
- ) &
- """ % vars()
-
- signal.alarm(self.MAX_DURATION)
-
- handler_b_exception_raised = False
+ child = os.fork()
+ if child == 0:
+ os.kill(pid, signal.SIGHUP)
+ exit_subprocess()
+ self.wait(child)
+ self.assertTrue(self.a_called)
+ self.assertFalse(self.b_called)
+ self.a_called = False
- os.system(script)
try:
+ child = os.fork()
+ if child == 0:
+ os.kill(pid, signal.SIGUSR1)
+ exit_subprocess()
+ # This wait should be interrupted by the signal's exception.
+ self.wait(child)
+ self.fail('HandlerBCalled exception not thrown')
+ except HandlerBCalled:
+ # So we call it again to reap the child's zombie.
+ self.wait(child)
+ self.assertTrue(self.b_called)
+ self.assertFalse(self.a_called)
if test_support.verbose:
- print "starting pause() loop..."
- while 1:
- try:
- if test_support.verbose:
- print "call pause()..."
- signal.pause()
- if test_support.verbose:
- print "pause() returned"
- except HandlerBCalled:
- handler_b_exception_raised = True
- if test_support.verbose:
- print "HandlerBCalled exception caught"
+ print "HandlerBCalled exception caught"
+ child = os.fork()
+ if child == 0:
+ os.kill(pid, signal.SIGUSR2)
+ exit_subprocess()
+ self.wait(child) # Nothing should happen.
+
+ try:
+ signal.alarm(1)
+ # The race condition in pause doesn't matter in this case,
+ # since alarm is going to raise a KeyboardException, which
+ # will skip the call.
+ signal.pause()
except KeyboardInterrupt:
if test_support.verbose:
print "KeyboardInterrupt (the alarm() went off)"
-
- self.assert_(self.a_called)
- self.assert_(self.b_called)
- self.assert_(handler_b_exception_raised)
-
- def setUp(self):
- # Install handlers.
- self.hup = signal.signal(signal.SIGHUP, self.handlerA)
- self.usr1 = signal.signal(signal.SIGUSR1, self.handlerB)
- self.usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN)
- self.alrm = signal.signal(signal.SIGALRM,
- signal.default_int_handler)
- self.a_called = False
- self.b_called = False
- self.pid = os.getpid()
- self.fork_pid = self.spawn_force_test_exit_process(self.pid)
-
- def tearDown(self):
- # Forcibly kill the child we created to ping us if there was a
- # test error.
- try:
- # Make sure we don't kill ourself if there was a fork
- # error.
- if self.fork_pid > 0:
- os.kill(self.fork_pid, signal.SIGKILL)
except:
- # If the child killed us, it has probably exited. Killing
- # a non-existent process will raise an error which we
- # don't care about.
- pass
-
- # Restore handlers.
- signal.alarm(0) # cancel alarm in case we died early
- signal.signal(signal.SIGHUP, self.hup)
- signal.signal(signal.SIGUSR1, self.usr1)
- signal.signal(signal.SIGUSR2, self.usr2)
- signal.signal(signal.SIGALRM, self.alrm)
+ self.fail('Some other exception woke us from pause: %s' %
+ traceback.format_exc())
+ else:
+ self.fail('pause returned of its own accord')
+
+ def test_main(self):
+ # This function spawns a child process to insulate the main
+ # test-running process from all the signals. It then
+ # communicates with that child process over a pipe and
+ # re-raises information about any exceptions the child
+ # throws. The real work happens in self.run_test().
+ os_done_r, os_done_w = os.pipe()
+ with nested(closing(os.fdopen(os_done_r)),
+ closing(os.fdopen(os_done_w, 'w'))) as (done_r, done_w):
+ child = os.fork()
+ if child == 0:
+ # In the child process; run the test and report results
+ # through the pipe.
+ try:
+ done_r.close()
+ # Have to close done_w again here because
+ # exit_subprocess() will skip the enclosing with block.
+ with closing(done_w):
+ try:
+ self.run_test()
+ except:
+ pickle.dump(traceback.format_exc(), done_w)
+ else:
+ pickle.dump(None, done_w)
+ except:
+ print 'Uh oh, raised from pickle.'
+ traceback.print_exc()
+ finally:
+ exit_subprocess()
+
+ done_w.close()
+ # Block for up to MAX_DURATION seconds for the test to finish.
+ r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
+ if done_r in r:
+ tb = pickle.load(done_r)
+ if tb:
+ self.fail(tb)
+ else:
+ os.kill(child, signal.SIGKILL)
+ self.fail('Test deadlocked after %d seconds.' %
+ self.MAX_DURATION)
class BasicSignalTests(unittest.TestCase):
+ def trivial_signal_handler(self, *args):
+ pass
+
def test_out_of_range_signal_number_raises_error(self):
self.assertRaises(ValueError, signal.getsignal, 4242)
- def trivial_signal_handler(*args):
- pass
-
self.assertRaises(ValueError, signal.signal, 4242,
- trivial_signal_handler)
+ self.trivial_signal_handler)
def test_setting_signal_handler_to_none_raises_error(self):
self.assertRaises(TypeError, signal.signal,
signal.SIGUSR1, None)
+ def test_getsignal(self):
+ hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
+ self.assertEquals(signal.getsignal(signal.SIGHUP),
+ self.trivial_signal_handler)
+ signal.signal(signal.SIGHUP, hup)
+ self.assertEquals(signal.getsignal(signal.SIGHUP), hup)
+
+
class WakeupSignalTests(unittest.TestCase):
TIMEOUT_FULL = 10
TIMEOUT_HALF = 5
@@ -232,7 +240,7 @@ class SiginterruptTest(unittest.TestCase):
os.kill(ppid, self.signum)
time.sleep(0.2)
finally:
- os._exit(0)
+ exit_subprocess()
try:
os.close(w)