summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_signal.py
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@haypocalc.com>2011-06-29 08:43:02 (GMT)
committerVictor Stinner <victor.stinner@haypocalc.com>2011-06-29 08:43:02 (GMT)
commit9e8b82f1e1b726bd956568dec3769f771644a8ed (patch)
treed35a7753f586159bb3c952466924abd47f6287ef /Lib/test/test_signal.py
parent470e0dc23aa5699011de131c7f21d24aade3def0 (diff)
downloadcpython-9e8b82f1e1b726bd956568dec3769f771644a8ed.zip
cpython-9e8b82f1e1b726bd956568dec3769f771644a8ed.tar.gz
cpython-9e8b82f1e1b726bd956568dec3769f771644a8ed.tar.bz2
Issue #12303: run sig*wait*() tests in a subprocesss
... instead of using fork(): sig*wait*() functions behave differently (not correctly) after a fork, especially on FreeBSD 6. Skip also test_sigtimedwait_poll() on FreeBSD 6 because of a kernel bug.
Diffstat (limited to 'Lib/test/test_signal.py')
-rw-r--r--Lib/test/test_signal.py137
1 files changed, 81 insertions, 56 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index 8d8a15d..311d0f0 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -520,7 +520,6 @@ class PendingSignalsTests(unittest.TestCase):
functions.
"""
def setUp(self):
- self.hndl_called = False
self.has_pthread_kill = hasattr(signal, 'pthread_kill')
def handler(self, signum, frame):
@@ -610,87 +609,108 @@ class PendingSignalsTests(unittest.TestCase):
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()')
- @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
- def wait_helper(self, test, handler, blocked):
- signum = signal.SIGALRM
+ def wait_helper(self, test, blocked):
+ """
+ test: body of the "def test(signum):" function.
+ blocked: number of the blocked signal
+ """
+ code = '''
+import signal
+import sys
- # sig*wait* must be called with the signal blocked: since the current
- # process might have several threads running, we fork() a child process
- # to have a single thread.
- pid = os.fork()
- if pid == 0:
- # child: block and wait the signal
- try:
- signal.signal(signum, handler)
- signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
+def handler(signum, frame):
+ 1/0
- # Do the tests
- test(signum)
+def test(signum):
+%s
- # The handler must not be called on unblock
- try:
- signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
- except ZeroDivisionError:
- print("the signal handler has been called",
- file=sys.stderr)
- os._exit(1)
- except BaseException as err:
- print("error: {}".format(err), file=sys.stderr)
- sys.stderr.flush()
- os._exit(1)
- else:
- os._exit(0)
- else:
- # parent: check that the child correcty received the signal
- self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+blocked = %s
+signum = signal.SIGALRM
+
+# child: block and wait the signal
+try:
+ signal.signal(signum, handler)
+ signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
+
+ # Do the tests
+ test(signum)
+
+ # The handler must not be called on unblock
+ try:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
+ except ZeroDivisionError:
+ print("the signal handler has been called",
+ file=sys.stderr)
+ sys.exit(1)
+except BaseException as err:
+ print("error: {}".format(err), file=sys.stderr)
+ sys.stderr.flush()
+ sys.exit(1)
+''' % (test, blocked)
+
+ # sig*wait* must be called with the signal blocked: since the current
+ # process might have several threads running, use a subprocess to have
+ # a single thread.
+ assert_python_ok('-c', code)
@unittest.skipUnless(hasattr(signal, 'sigwait'),
'need signal.sigwait()')
def test_sigwait(self):
- def test(signum):
+ test = '''
signal.alarm(1)
- self.assertEqual(signum, signal.sigwait([signum]))
+ received = signal.sigwait([signum])
+ assert received == signum , 'received %s, not %s' % (received, signum)
+ '''
- self.wait_helper(test, self.handler, signal.SIGALRM)
+ self.wait_helper(test, signal.SIGALRM)
@unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
'need signal.sigwaitinfo()')
def test_sigwaitinfo(self):
- def test(signum):
+ test = '''
signal.alarm(1)
info = signal.sigwaitinfo([signum])
- self.assertEqual(signum, info.si_signo)
+ assert info.si_signo == signum, "info.si_signo != %s" % signum
+ '''
- self.wait_helper(test, self.handler, signal.SIGALRM)
+ self.wait_helper(test, signal.SIGALRM)
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
'need signal.sigtimedwait()')
def test_sigtimedwait(self):
- def test(signum):
+ test = '''
signal.alarm(1)
info = signal.sigtimedwait([signum], (10, 1000))
- self.assertEqual(signum, info.si_signo)
+ assert info.si_signo == signum, 'info.si_signo != %s' % signum
+ '''
- self.wait_helper(test, self.handler, signal.SIGALRM)
+ self.wait_helper(test, signal.SIGALRM)
- # check that polling with sigtimedwait works
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
'need signal.sigtimedwait()')
+ # issue #12303: sigtimedwait() takes 30 seconds on FreeBSD 6 (kernel bug)
+ @unittest.skipIf(sys.platform =='freebsd6',
+ 'sigtimedwait() with a null timeout doens\'t work on FreeBSD 6')
def test_sigtimedwait_poll(self):
- def test(signum):
- self.kill(signum)
+ # check that polling with sigtimedwait works
+ test = '''
+ import os
+ os.kill(os.getpid(), signum)
info = signal.sigtimedwait([signum], (0, 0))
- self.assertEqual(signum, info.si_signo)
+ assert info.si_signo == signum, 'info.si_signo != %s' % signum
+ '''
- self.wait_helper(test, self.handler, signal.SIGALRM)
+ self.wait_helper(test, signal.SIGALRM)
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
'need signal.sigtimedwait()')
def test_sigtimedwait_timeout(self):
- def test(signum):
- self.assertEqual(None, signal.sigtimedwait([signum], (1, 35500)))
+ test = '''
+ received = signal.sigtimedwait([signum], (1, 0))
+ assert received is None, "received=%r" % (received,)
+ '''
- self.wait_helper(test, self.handler, signal.SIGALRM)
+ self.wait_helper(test, signal.SIGALRM)
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
'need signal.sigtimedwait()')
@@ -700,25 +720,30 @@ class PendingSignalsTests(unittest.TestCase):
self.assertRaises(ValueError, signal.sigtimedwait, [signum], (0, -1))
self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, 0))
- def alarm_handler(self, signum, frame):
- self.hndl_called = True
-
@unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
'need signal.sigwaitinfo()')
def test_sigwaitinfo_interrupted(self):
- def test(signum):
+ test = '''
+ import errno
+
+ hndl_called = True
+ def alarm_handler(signum, frame):
+ hndl_called = False
+
+ signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(1)
try:
signal.sigwaitinfo([signal.SIGUSR1])
except OSError as e:
if e.errno == errno.EINTR:
- self.assertTrue(self.hndl_called)
+ assert hndl_called, "SIGALRM handler not called"
else:
- self.fail("Expected EINTR to be raised by sigwaitinfo")
+ raise Exception("Expected EINTR to be raised by sigwaitinfo")
else:
- self.fail("Expected EINTR to be raised by sigwaitinfo")
+ raise Exception("Expected EINTR to be raised by sigwaitinfo")
+ '''
- self.wait_helper(test, self.alarm_handler, signal.SIGUSR1)
+ self.wait_helper(test, signal.SIGUSR1)
@unittest.skipUnless(hasattr(signal, 'sigwait'),
'need signal.sigwait()')