diff options
Diffstat (limited to 'Lib/test/test_signal.py')
| -rw-r--r-- | Lib/test/test_signal.py | 282 |
1 files changed, 155 insertions, 127 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index f64bd4c..4a1b4a6 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -1,13 +1,17 @@ -import unittest -from test import support -from contextlib import closing +import errno import gc +import os import pickle import select import signal import subprocess +import sys +import time import traceback -import sys, os, time, errno +import unittest +from test import support +from contextlib import closing +from test.script_helper import assert_python_ok, spawn_python if sys.platform in ('os2', 'riscos'): raise unittest.SkipTest("Can't test signal on %s" % sys.platform) @@ -109,7 +113,7 @@ class InterProcessSignalTests(unittest.TestCase): # This wait should be interrupted by the signal's exception. self.wait(child) time.sleep(1) # Give the signal time to be delivered. - self.fail('HandlerBCalled exception not thrown') + self.fail('HandlerBCalled exception not raised') except HandlerBCalled: self.assertTrue(self.b_called) self.assertFalse(self.a_called) @@ -148,7 +152,7 @@ class InterProcessSignalTests(unittest.TestCase): # test-running process from all the signals. It then # communicates with that child process over a pipe and # re-raises information about any exceptions the child - # throws. The real work happens in self.run_test(). + # raises. The real work happens in self.run_test(). os_done_r, os_done_w = os.pipe() with closing(os.fdopen(os_done_r, 'rb')) as done_r, \ closing(os.fdopen(os_done_w, 'wb')) as done_w: @@ -229,150 +233,174 @@ class WindowsSignalTests(unittest.TestCase): @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") class WakeupSignalTests(unittest.TestCase): - TIMEOUT_FULL = 10 - TIMEOUT_HALF = 5 + def check_wakeup(self, test_body): + # use a subprocess to have only one thread and to not change signal + # handling of the parent process + code = """if 1: + import fcntl + import os + import signal + + def handler(signum, frame): + pass + + {} + + signal.signal(signal.SIGALRM, handler) + read, write = os.pipe() + flags = fcntl.fcntl(write, fcntl.F_GETFL, 0) + flags = flags | os.O_NONBLOCK + fcntl.fcntl(write, fcntl.F_SETFL, flags) + signal.set_wakeup_fd(write) + + test() + + os.close(read) + os.close(write) + """.format(test_body) + + assert_python_ok('-c', code) def test_wakeup_fd_early(self): - import select - - signal.alarm(1) - before_time = time.time() - # We attempt to get a signal during the sleep, - # before select is called - time.sleep(self.TIMEOUT_FULL) - mid_time = time.time() - self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF) - select.select([self.read], [], [], self.TIMEOUT_FULL) - after_time = time.time() - self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF) + self.check_wakeup("""def test(): + import select + import time - def test_wakeup_fd_during(self): - import select + TIMEOUT_FULL = 10 + TIMEOUT_HALF = 5 - signal.alarm(1) - before_time = time.time() - # We attempt to get a signal during the select call - self.assertRaises(select.error, select.select, - [self.read], [], [], self.TIMEOUT_FULL) - after_time = time.time() - self.assertTrue(after_time - before_time < self.TIMEOUT_HALF) + signal.alarm(1) + before_time = time.time() + # We attempt to get a signal during the sleep, + # before select is called + time.sleep(TIMEOUT_FULL) + mid_time = time.time() + dt = mid_time - before_time + if dt >= TIMEOUT_HALF: + raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) + select.select([read], [], [], TIMEOUT_FULL) + after_time = time.time() + dt = after_time - mid_time + if dt >= TIMEOUT_HALF: + raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) + """) - def setUp(self): - import fcntl + def test_wakeup_fd_during(self): + self.check_wakeup("""def test(): + import select + import time - self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None) - self.read, self.write = os.pipe() - flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0) - flags = flags | os.O_NONBLOCK - fcntl.fcntl(self.write, fcntl.F_SETFL, flags) - self.old_wakeup = signal.set_wakeup_fd(self.write) + TIMEOUT_FULL = 10 + TIMEOUT_HALF = 5 - def tearDown(self): - signal.set_wakeup_fd(self.old_wakeup) - os.close(self.read) - os.close(self.write) - signal.signal(signal.SIGALRM, self.alrm) + signal.alarm(1) + before_time = time.time() + # We attempt to get a signal during the select call + try: + select.select([read], [], [], TIMEOUT_FULL) + except select.error: + pass + else: + raise Exception("select.error not raised") + after_time = time.time() + dt = after_time - before_time + if dt >= TIMEOUT_HALF: + raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) + """) @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") class SiginterruptTest(unittest.TestCase): - def setUp(self): - """Install a no-op signal handler that can be set to allow - interrupts or not, and arrange for the original signal handler to be - re-installed when the test is finished. - """ - self.signum = signal.SIGUSR1 - oldhandler = signal.signal(self.signum, lambda x,y: None) - self.addCleanup(signal.signal, self.signum, oldhandler) - - def readpipe_interrupted(self): + def readpipe_interrupted(self, interrupt): """Perform a read during which a signal will arrive. Return True if the read is interrupted by the signal and raises an exception. Return False if it returns normally. """ - # Create a pipe that can be used for the read. Also clean it up - # when the test is over, since nothing else will (but see below for - # the write end). - r, w = os.pipe() - self.addCleanup(os.close, r) - - # Create another process which can send a signal to this one to try - # to interrupt the read. - ppid = os.getpid() - pid = os.fork() - - if pid == 0: - # Child code: sleep to give the parent enough time to enter the - # read() call (there's a race here, but it's really tricky to - # eliminate it); then signal the parent process. Also, sleep - # again to make it likely that the signal is delivered to the - # parent process before the child exits. If the child exits - # first, the write end of the pipe will be closed and the test - # is invalid. - try: - time.sleep(0.2) - os.kill(ppid, self.signum) - time.sleep(0.2) - finally: - # No matter what, just exit as fast as possible now. - exit_subprocess() - else: - # Parent code. - # Make sure the child is eventually reaped, else it'll be a - # zombie for the rest of the test suite run. - self.addCleanup(os.waitpid, pid, 0) - - # Close the write end of the pipe. The child has a copy, so - # it's not really closed until the child exits. We need it to - # close when the child exits so that in the non-interrupt case - # the read eventually completes, otherwise we could just close - # it *after* the test. - os.close(w) - - # Try the read and report whether it is interrupted or not to - # the caller. + class Timeout(Exception): + pass + + # use a subprocess to have only one thread, to have a timeout on the + # blocking read and to not touch signal handling in this process + code = """if 1: + import errno + import os + import signal + import sys + + interrupt = %r + r, w = os.pipe() + + def handler(signum, frame): + pass + + signal.signal(signal.SIGALRM, handler) + if interrupt is not None: + signal.siginterrupt(signal.SIGALRM, interrupt) + + print("ready") + sys.stdout.flush() + + # run the test twice + for loop in range(2): + # send a SIGALRM in a second (during the read) + signal.alarm(1) + try: + # blocking call: read from a pipe without data + os.read(r, 1) + except OSError as err: + if err.errno != errno.EINTR: + raise + else: + sys.exit(2) + sys.exit(3) + """ % (interrupt,) + with spawn_python('-c', code) as process: try: - d = os.read(r, 1) + # wait until the child process is loaded and has started + first_line = process.stdout.readline() + + # Wait the process with a timeout of 5 seconds + timeout = time.time() + 5.0 + while True: + if timeout < time.time(): + raise Timeout() + status = process.poll() + if status is not None: + break + time.sleep(0.1) + + stdout, stderr = process.communicate() + except Timeout: + process.kill() return False - except OSError as err: - if err.errno != errno.EINTR: - raise - return True + else: + stdout = first_line + stdout + exitcode = process.wait() + if exitcode not in (2, 3): + raise Exception("Child error (exit code %s): %s" + % (exitcode, stdout)) + return (exitcode == 3) def test_without_siginterrupt(self): - """If a signal handler is installed and siginterrupt is not called - at all, when that signal arrives, it interrupts a syscall that's in - progress. - """ - i = self.readpipe_interrupted() - self.assertTrue(i) - # Arrival of the signal shouldn't have changed anything. - i = self.readpipe_interrupted() - self.assertTrue(i) + # If a signal handler is installed and siginterrupt is not called + # at all, when that signal arrives, it interrupts a syscall that's in + # progress. + interrupted = self.readpipe_interrupted(None) + self.assertTrue(interrupted) def test_siginterrupt_on(self): - """If a signal handler is installed and siginterrupt is called with - a true value for the second argument, when that signal arrives, it - interrupts a syscall that's in progress. - """ - signal.siginterrupt(self.signum, 1) - i = self.readpipe_interrupted() - self.assertTrue(i) - # Arrival of the signal shouldn't have changed anything. - i = self.readpipe_interrupted() - self.assertTrue(i) + # If a signal handler is installed and siginterrupt is called with + # a true value for the second argument, when that signal arrives, it + # interrupts a syscall that's in progress. + interrupted = self.readpipe_interrupted(True) + self.assertTrue(interrupted) def test_siginterrupt_off(self): - """If a signal handler is installed and siginterrupt is called with - a false value for the second argument, when that signal arrives, it - does not interrupt a syscall that's in progress. - """ - signal.siginterrupt(self.signum, 0) - i = self.readpipe_interrupted() - self.assertFalse(i) - # Arrival of the signal shouldn't have changed anything. - i = self.readpipe_interrupted() - self.assertFalse(i) + # If a signal handler is installed and siginterrupt is called with + # a false value for the second argument, when that signal arrives, it + # does not interrupt a syscall that's in progress. + interrupted = self.readpipe_interrupted(False) + self.assertFalse(interrupted) @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") |
