summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_signal.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_signal.py')
-rw-r--r--Lib/test/test_signal.py333
1 files changed, 182 insertions, 151 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index 2803785..4a1b4a6 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -1,13 +1,17 @@
-import unittest
-from test import test_support
-from contextlib import closing
+import errno
import gc
+import os
import pickle
import select
import signal
import subprocess
+import sys
+import time
import traceback
-import sys, os, time, errno
+import unittest
+from test import support
+from contextlib import closing
+from test.script_helper import assert_python_ok, spawn_python
if sys.platform in ('os2', 'riscos'):
raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
@@ -53,15 +57,15 @@ class InterProcessSignalTests(unittest.TestCase):
def handlerA(self, signum, frame):
self.a_called = True
- if test_support.verbose:
- print "handlerA invoked from signal %s at:\n%s" % (
- signum, self.format_frame(frame, limit=1))
+ if support.verbose:
+ print("handlerA invoked from signal %s at:\n%s" % (
+ signum, self.format_frame(frame, limit=1)))
def handlerB(self, signum, frame):
self.b_called = True
- if test_support.verbose:
- print "handlerB invoked from signal %s at:\n%s" % (
- signum, self.format_frame(frame, limit=1))
+ if support.verbose:
+ print ("handlerB invoked from signal %s at:\n%s" % (
+ signum, self.format_frame(frame, limit=1)))
raise HandlerBCalled(signum, self.format_frame(frame))
def wait(self, child):
@@ -88,8 +92,8 @@ class InterProcessSignalTests(unittest.TestCase):
# Let the sub-processes know who to send signals to.
pid = os.getpid()
- if test_support.verbose:
- print "test runner's pid is", pid
+ if support.verbose:
+ print("test runner's pid is", pid)
child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
if child:
@@ -113,8 +117,8 @@ class InterProcessSignalTests(unittest.TestCase):
except HandlerBCalled:
self.assertTrue(self.b_called)
self.assertFalse(self.a_called)
- if test_support.verbose:
- print "HandlerBCalled exception caught"
+ if support.verbose:
+ print("HandlerBCalled exception caught")
child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
if child:
@@ -130,8 +134,8 @@ class InterProcessSignalTests(unittest.TestCase):
# may return early.
time.sleep(1)
except KeyboardInterrupt:
- if test_support.verbose:
- print "KeyboardInterrupt (the alarm() went off)"
+ if support.verbose:
+ print("KeyboardInterrupt (the alarm() went off)")
except:
self.fail("Some other exception woke us from pause: %s" %
traceback.format_exc())
@@ -139,7 +143,7 @@ class InterProcessSignalTests(unittest.TestCase):
self.fail("pause returned of its own accord, and the signal"
" didn't arrive after another second.")
- # Issue 3864. Unknown if this affects earlier versions of freebsd also.
+ # Issue 3864, unknown if this affects earlier versions of freebsd also
@unittest.skipIf(sys.platform=='freebsd6',
'inter process signals not reliable (do not mix well with threading) '
'on freebsd6')
@@ -150,8 +154,8 @@ class InterProcessSignalTests(unittest.TestCase):
# re-raises information about any exceptions the child
# raises. The real work happens in self.run_test().
os_done_r, os_done_w = os.pipe()
- with closing(os.fdopen(os_done_r)) as done_r, \
- closing(os.fdopen(os_done_w, 'w')) as done_w:
+ with closing(os.fdopen(os_done_r, 'rb')) as done_r, \
+ closing(os.fdopen(os_done_w, 'wb')) as done_w:
child = os.fork()
if child == 0:
# In the child process; run the test and report results
@@ -168,7 +172,7 @@ class InterProcessSignalTests(unittest.TestCase):
else:
pickle.dump(None, done_w)
except:
- print 'Uh oh, raised from pickle.'
+ print('Uh oh, raised from pickle.')
traceback.print_exc()
finally:
exit_subprocess()
@@ -229,150 +233,174 @@ class WindowsSignalTests(unittest.TestCase):
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
- TIMEOUT_FULL = 10
- TIMEOUT_HALF = 5
+ def check_wakeup(self, test_body):
+ # use a subprocess to have only one thread and to not change signal
+ # handling of the parent process
+ code = """if 1:
+ import fcntl
+ import os
+ import signal
+
+ def handler(signum, frame):
+ pass
+
+ {}
+
+ signal.signal(signal.SIGALRM, handler)
+ read, write = os.pipe()
+ flags = fcntl.fcntl(write, fcntl.F_GETFL, 0)
+ flags = flags | os.O_NONBLOCK
+ fcntl.fcntl(write, fcntl.F_SETFL, flags)
+ signal.set_wakeup_fd(write)
+
+ test()
+
+ os.close(read)
+ os.close(write)
+ """.format(test_body)
+
+ assert_python_ok('-c', code)
def test_wakeup_fd_early(self):
- import select
-
- signal.alarm(1)
- before_time = time.time()
- # We attempt to get a signal during the sleep,
- # before select is called
- time.sleep(self.TIMEOUT_FULL)
- mid_time = time.time()
- self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
- select.select([self.read], [], [], self.TIMEOUT_FULL)
- after_time = time.time()
- self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)
+ self.check_wakeup("""def test():
+ import select
+ import time
- def test_wakeup_fd_during(self):
- import select
+ TIMEOUT_FULL = 10
+ TIMEOUT_HALF = 5
- signal.alarm(1)
- before_time = time.time()
- # We attempt to get a signal during the select call
- self.assertRaises(select.error, select.select,
- [self.read], [], [], self.TIMEOUT_FULL)
- after_time = time.time()
- self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
+ signal.alarm(1)
+ before_time = time.time()
+ # We attempt to get a signal during the sleep,
+ # before select is called
+ time.sleep(TIMEOUT_FULL)
+ mid_time = time.time()
+ dt = mid_time - before_time
+ if dt >= TIMEOUT_HALF:
+ raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
+ select.select([read], [], [], TIMEOUT_FULL)
+ after_time = time.time()
+ dt = after_time - mid_time
+ if dt >= TIMEOUT_HALF:
+ raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
+ """)
- def setUp(self):
- import fcntl
+ def test_wakeup_fd_during(self):
+ self.check_wakeup("""def test():
+ import select
+ import time
- self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
- self.read, self.write = os.pipe()
- flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
- flags = flags | os.O_NONBLOCK
- fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
- self.old_wakeup = signal.set_wakeup_fd(self.write)
+ TIMEOUT_FULL = 10
+ TIMEOUT_HALF = 5
- def tearDown(self):
- signal.set_wakeup_fd(self.old_wakeup)
- os.close(self.read)
- os.close(self.write)
- signal.signal(signal.SIGALRM, self.alrm)
+ signal.alarm(1)
+ before_time = time.time()
+ # We attempt to get a signal during the select call
+ try:
+ select.select([read], [], [], TIMEOUT_FULL)
+ except select.error:
+ pass
+ else:
+ raise Exception("select.error not raised")
+ after_time = time.time()
+ dt = after_time - before_time
+ if dt >= TIMEOUT_HALF:
+ raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
+ """)
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class SiginterruptTest(unittest.TestCase):
- def setUp(self):
- """Install a no-op signal handler that can be set to allow
- interrupts or not, and arrange for the original signal handler to be
- re-installed when the test is finished.
- """
- self.signum = signal.SIGUSR1
- oldhandler = signal.signal(self.signum, lambda x,y: None)
- self.addCleanup(signal.signal, self.signum, oldhandler)
-
- def readpipe_interrupted(self):
+ def readpipe_interrupted(self, interrupt):
"""Perform a read during which a signal will arrive. Return True if the
read is interrupted by the signal and raises an exception. Return False
if it returns normally.
"""
- # Create a pipe that can be used for the read. Also clean it up
- # when the test is over, since nothing else will (but see below for
- # the write end).
- r, w = os.pipe()
- self.addCleanup(os.close, r)
-
- # Create another process which can send a signal to this one to try
- # to interrupt the read.
- ppid = os.getpid()
- pid = os.fork()
-
- if pid == 0:
- # Child code: sleep to give the parent enough time to enter the
- # read() call (there's a race here, but it's really tricky to
- # eliminate it); then signal the parent process. Also, sleep
- # again to make it likely that the signal is delivered to the
- # parent process before the child exits. If the child exits
- # first, the write end of the pipe will be closed and the test
- # is invalid.
- try:
- time.sleep(0.2)
- os.kill(ppid, self.signum)
- time.sleep(0.2)
- finally:
- # No matter what, just exit as fast as possible now.
- exit_subprocess()
- else:
- # Parent code.
- # Make sure the child is eventually reaped, else it'll be a
- # zombie for the rest of the test suite run.
- self.addCleanup(os.waitpid, pid, 0)
-
- # Close the write end of the pipe. The child has a copy, so
- # it's not really closed until the child exits. We need it to
- # close when the child exits so that in the non-interrupt case
- # the read eventually completes, otherwise we could just close
- # it *after* the test.
- os.close(w)
-
- # Try the read and report whether it is interrupted or not to
- # the caller.
+ class Timeout(Exception):
+ pass
+
+ # use a subprocess to have only one thread, to have a timeout on the
+ # blocking read and to not touch signal handling in this process
+ code = """if 1:
+ import errno
+ import os
+ import signal
+ import sys
+
+ interrupt = %r
+ r, w = os.pipe()
+
+ def handler(signum, frame):
+ pass
+
+ signal.signal(signal.SIGALRM, handler)
+ if interrupt is not None:
+ signal.siginterrupt(signal.SIGALRM, interrupt)
+
+ print("ready")
+ sys.stdout.flush()
+
+ # run the test twice
+ for loop in range(2):
+ # send a SIGALRM in a second (during the read)
+ signal.alarm(1)
+ try:
+ # blocking call: read from a pipe without data
+ os.read(r, 1)
+ except OSError as err:
+ if err.errno != errno.EINTR:
+ raise
+ else:
+ sys.exit(2)
+ sys.exit(3)
+ """ % (interrupt,)
+ with spawn_python('-c', code) as process:
try:
- d = os.read(r, 1)
+ # wait until the child process is loaded and has started
+ first_line = process.stdout.readline()
+
+ # Wait the process with a timeout of 5 seconds
+ timeout = time.time() + 5.0
+ while True:
+ if timeout < time.time():
+ raise Timeout()
+ status = process.poll()
+ if status is not None:
+ break
+ time.sleep(0.1)
+
+ stdout, stderr = process.communicate()
+ except Timeout:
+ process.kill()
return False
- except OSError, err:
- if err.errno != errno.EINTR:
- raise
- return True
+ else:
+ stdout = first_line + stdout
+ exitcode = process.wait()
+ if exitcode not in (2, 3):
+ raise Exception("Child error (exit code %s): %s"
+ % (exitcode, stdout))
+ return (exitcode == 3)
def test_without_siginterrupt(self):
- """If a signal handler is installed and siginterrupt is not called
- at all, when that signal arrives, it interrupts a syscall that's in
- progress.
- """
- i = self.readpipe_interrupted()
- self.assertTrue(i)
- # Arrival of the signal shouldn't have changed anything.
- i = self.readpipe_interrupted()
- self.assertTrue(i)
+ # If a signal handler is installed and siginterrupt is not called
+ # at all, when that signal arrives, it interrupts a syscall that's in
+ # progress.
+ interrupted = self.readpipe_interrupted(None)
+ self.assertTrue(interrupted)
def test_siginterrupt_on(self):
- """If a signal handler is installed and siginterrupt is called with
- a true value for the second argument, when that signal arrives, it
- interrupts a syscall that's in progress.
- """
- signal.siginterrupt(self.signum, 1)
- i = self.readpipe_interrupted()
- self.assertTrue(i)
- # Arrival of the signal shouldn't have changed anything.
- i = self.readpipe_interrupted()
- self.assertTrue(i)
+ # If a signal handler is installed and siginterrupt is called with
+ # a true value for the second argument, when that signal arrives, it
+ # interrupts a syscall that's in progress.
+ interrupted = self.readpipe_interrupted(True)
+ self.assertTrue(interrupted)
def test_siginterrupt_off(self):
- """If a signal handler is installed and siginterrupt is called with
- a false value for the second argument, when that signal arrives, it
- does not interrupt a syscall that's in progress.
- """
- signal.siginterrupt(self.signum, 0)
- i = self.readpipe_interrupted()
- self.assertFalse(i)
- # Arrival of the signal shouldn't have changed anything.
- i = self.readpipe_interrupted()
- self.assertFalse(i)
+ # If a signal handler is installed and siginterrupt is called with
+ # a false value for the second argument, when that signal arrives, it
+ # does not interrupt a syscall that's in progress.
+ interrupted = self.readpipe_interrupted(False)
+ self.assertFalse(interrupted)
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@@ -391,7 +419,7 @@ class ItimerTest(unittest.TestCase):
def sig_alrm(self, *args):
self.hndl_called = True
- if test_support.verbose:
+ if support.verbose:
print("SIGALRM handler invoked", args)
def sig_vtalrm(self, *args):
@@ -404,19 +432,19 @@ class ItimerTest(unittest.TestCase):
elif self.hndl_count == 3:
# disable ITIMER_VIRTUAL, this function shouldn't be called anymore
signal.setitimer(signal.ITIMER_VIRTUAL, 0)
- if test_support.verbose:
+ if support.verbose:
print("last SIGVTALRM handler call")
self.hndl_count += 1
- if test_support.verbose:
+ if support.verbose:
print("SIGVTALRM handler invoked", args)
def sig_prof(self, *args):
self.hndl_called = True
signal.setitimer(signal.ITIMER_PROF, 0)
- if test_support.verbose:
+ if support.verbose:
print("SIGPROF handler invoked", args)
def test_itimer_exc(self):
@@ -431,13 +459,13 @@ class ItimerTest(unittest.TestCase):
def test_itimer_real(self):
self.itimer = signal.ITIMER_REAL
signal.setitimer(self.itimer, 1.0)
- if test_support.verbose:
+ if support.verbose:
print("\ncall pause()...")
signal.pause()
self.assertEqual(self.hndl_called, True)
- # Issue 3864. Unknown if this affects earlier versions of freebsd also.
+ # Issue 3864, unknown if this affects earlier versions of freebsd also
@unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'),
'itimer not reliable (does not mix well with threading) on some BSDs.')
def test_itimer_virtual(self):
@@ -460,7 +488,7 @@ class ItimerTest(unittest.TestCase):
# and the handler should have been called
self.assertEqual(self.hndl_called, True)
- # Issue 3864. Unknown if this affects earlier versions of freebsd also.
+ # Issue 3864, unknown if this affects earlier versions of freebsd also
@unittest.skipIf(sys.platform=='freebsd6',
'itimer not reliable (does not mix well with threading) on freebsd6')
def test_itimer_prof(self):
@@ -484,9 +512,12 @@ class ItimerTest(unittest.TestCase):
self.assertEqual(self.hndl_called, True)
def test_main():
- test_support.run_unittest(BasicSignalTests, InterProcessSignalTests,
- WakeupSignalTests, SiginterruptTest,
- ItimerTest, WindowsSignalTests)
+ try:
+ support.run_unittest(BasicSignalTests, InterProcessSignalTests,
+ WakeupSignalTests, SiginterruptTest,
+ ItimerTest, WindowsSignalTests)
+ finally:
+ support.reap_children()
if __name__ == "__main__":