diff options
author | Victor Stinner <victor.stinner@haypocalc.com> | 2011-06-29 08:43:02 (GMT) |
---|---|---|
committer | Victor Stinner <victor.stinner@haypocalc.com> | 2011-06-29 08:43:02 (GMT) |
commit | 9e8b82f1e1b726bd956568dec3769f771644a8ed (patch) | |
tree | d35a7753f586159bb3c952466924abd47f6287ef /Lib/test/test_signal.py | |
parent | 470e0dc23aa5699011de131c7f21d24aade3def0 (diff) | |
download | cpython-9e8b82f1e1b726bd956568dec3769f771644a8ed.zip cpython-9e8b82f1e1b726bd956568dec3769f771644a8ed.tar.gz cpython-9e8b82f1e1b726bd956568dec3769f771644a8ed.tar.bz2 |
Issue #12303: run sig*wait*() tests in a subprocesss
... instead of using fork(): sig*wait*() functions behave differently (not
correctly) after a fork, especially on FreeBSD 6.
Skip also test_sigtimedwait_poll() on FreeBSD 6 because of a kernel bug.
Diffstat (limited to 'Lib/test/test_signal.py')
-rw-r--r-- | Lib/test/test_signal.py | 137 |
1 files changed, 81 insertions, 56 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index 8d8a15d..311d0f0 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -520,7 +520,6 @@ class PendingSignalsTests(unittest.TestCase): functions. """ def setUp(self): - self.hndl_called = False self.has_pthread_kill = hasattr(signal, 'pthread_kill') def handler(self, signum, frame): @@ -610,87 +609,108 @@ class PendingSignalsTests(unittest.TestCase): @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 'need signal.pthread_sigmask()') - @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') - def wait_helper(self, test, handler, blocked): - signum = signal.SIGALRM + def wait_helper(self, test, blocked): + """ + test: body of the "def test(signum):" function. + blocked: number of the blocked signal + """ + code = ''' +import signal +import sys - # sig*wait* must be called with the signal blocked: since the current - # process might have several threads running, we fork() a child process - # to have a single thread. - pid = os.fork() - if pid == 0: - # child: block and wait the signal - try: - signal.signal(signum, handler) - signal.pthread_sigmask(signal.SIG_BLOCK, [blocked]) +def handler(signum, frame): + 1/0 - # Do the tests - test(signum) +def test(signum): +%s - # The handler must not be called on unblock - try: - signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked]) - except ZeroDivisionError: - print("the signal handler has been called", - file=sys.stderr) - os._exit(1) - except BaseException as err: - print("error: {}".format(err), file=sys.stderr) - sys.stderr.flush() - os._exit(1) - else: - os._exit(0) - else: - # parent: check that the child correcty received the signal - self.assertEqual(os.waitpid(pid, 0), (pid, 0)) +blocked = %s +signum = signal.SIGALRM + +# child: block and wait the signal +try: + signal.signal(signum, handler) + signal.pthread_sigmask(signal.SIG_BLOCK, [blocked]) + + # Do the tests + test(signum) + + # The handler must not be called on unblock + try: + signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked]) + except ZeroDivisionError: + print("the signal handler has been called", + file=sys.stderr) + sys.exit(1) +except BaseException as err: + print("error: {}".format(err), file=sys.stderr) + sys.stderr.flush() + sys.exit(1) +''' % (test, blocked) + + # sig*wait* must be called with the signal blocked: since the current + # process might have several threads running, use a subprocess to have + # a single thread. + assert_python_ok('-c', code) @unittest.skipUnless(hasattr(signal, 'sigwait'), 'need signal.sigwait()') def test_sigwait(self): - def test(signum): + test = ''' signal.alarm(1) - self.assertEqual(signum, signal.sigwait([signum])) + received = signal.sigwait([signum]) + assert received == signum , 'received %s, not %s' % (received, signum) + ''' - self.wait_helper(test, self.handler, signal.SIGALRM) + self.wait_helper(test, signal.SIGALRM) @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), 'need signal.sigwaitinfo()') def test_sigwaitinfo(self): - def test(signum): + test = ''' signal.alarm(1) info = signal.sigwaitinfo([signum]) - self.assertEqual(signum, info.si_signo) + assert info.si_signo == signum, "info.si_signo != %s" % signum + ''' - self.wait_helper(test, self.handler, signal.SIGALRM) + self.wait_helper(test, signal.SIGALRM) @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 'need signal.sigtimedwait()') def test_sigtimedwait(self): - def test(signum): + test = ''' signal.alarm(1) info = signal.sigtimedwait([signum], (10, 1000)) - self.assertEqual(signum, info.si_signo) + assert info.si_signo == signum, 'info.si_signo != %s' % signum + ''' - self.wait_helper(test, self.handler, signal.SIGALRM) + self.wait_helper(test, signal.SIGALRM) - # check that polling with sigtimedwait works @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 'need signal.sigtimedwait()') + # issue #12303: sigtimedwait() takes 30 seconds on FreeBSD 6 (kernel bug) + @unittest.skipIf(sys.platform =='freebsd6', + 'sigtimedwait() with a null timeout doens\'t work on FreeBSD 6') def test_sigtimedwait_poll(self): - def test(signum): - self.kill(signum) + # check that polling with sigtimedwait works + test = ''' + import os + os.kill(os.getpid(), signum) info = signal.sigtimedwait([signum], (0, 0)) - self.assertEqual(signum, info.si_signo) + assert info.si_signo == signum, 'info.si_signo != %s' % signum + ''' - self.wait_helper(test, self.handler, signal.SIGALRM) + self.wait_helper(test, signal.SIGALRM) @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 'need signal.sigtimedwait()') def test_sigtimedwait_timeout(self): - def test(signum): - self.assertEqual(None, signal.sigtimedwait([signum], (1, 35500))) + test = ''' + received = signal.sigtimedwait([signum], (1, 0)) + assert received is None, "received=%r" % (received,) + ''' - self.wait_helper(test, self.handler, signal.SIGALRM) + self.wait_helper(test, signal.SIGALRM) @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 'need signal.sigtimedwait()') @@ -700,25 +720,30 @@ class PendingSignalsTests(unittest.TestCase): self.assertRaises(ValueError, signal.sigtimedwait, [signum], (0, -1)) self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, 0)) - def alarm_handler(self, signum, frame): - self.hndl_called = True - @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), 'need signal.sigwaitinfo()') def test_sigwaitinfo_interrupted(self): - def test(signum): + test = ''' + import errno + + hndl_called = True + def alarm_handler(signum, frame): + hndl_called = False + + signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(1) try: signal.sigwaitinfo([signal.SIGUSR1]) except OSError as e: if e.errno == errno.EINTR: - self.assertTrue(self.hndl_called) + assert hndl_called, "SIGALRM handler not called" else: - self.fail("Expected EINTR to be raised by sigwaitinfo") + raise Exception("Expected EINTR to be raised by sigwaitinfo") else: - self.fail("Expected EINTR to be raised by sigwaitinfo") + raise Exception("Expected EINTR to be raised by sigwaitinfo") + ''' - self.wait_helper(test, self.alarm_handler, signal.SIGUSR1) + self.wait_helper(test, signal.SIGUSR1) @unittest.skipUnless(hasattr(signal, 'sigwait'), 'need signal.sigwait()') |