summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_signal.py
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@haypocalc.com>2011-06-10 10:48:13 (GMT)
committerVictor Stinner <victor.stinner@haypocalc.com>2011-06-10 10:48:13 (GMT)
commitaf4946020e92556052782fcc95075dd8d694dbb5 (patch)
tree2e9d61f2e5c5bbc863f4c943aa433bd3f64f798a /Lib/test/test_signal.py
parentb0ae53d8a09731a51be48f9e84a71d09d0f90657 (diff)
downloadcpython-af4946020e92556052782fcc95075dd8d694dbb5.zip
cpython-af4946020e92556052782fcc95075dd8d694dbb5.tar.gz
cpython-af4946020e92556052782fcc95075dd8d694dbb5.tar.bz2
Issue #8407: Make signal.sigwait() tests more reliable
Block the signal before calling sigwait(). Use os.fork() to ensure that we have only one thread. Initial patch written by Charles-François Natali.
Diffstat (limited to 'Lib/test/test_signal.py')
-rw-r--r--Lib/test/test_signal.py67
1 files changed, 53 insertions, 14 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py
index 1ed4b9e..ea40627 100644
--- a/Lib/test/test_signal.py
+++ b/Lib/test/test_signal.py
@@ -598,34 +598,73 @@ class PendingSignalsTests(unittest.TestCase):
with self.assertRaises(ZeroDivisionError):
signal.pthread_kill(current, signum)
+ def check_sigwait(self, test, signum):
+ # sigwait must be called with the signal blocked: since the current
+ # process might have several threads running, we fork() a child process
+ # to have a single thread.
+ pid = os.fork()
+ if pid == 0:
+ # child: block and wait the signal
+ try:
+ signal.signal(signum, self.handler)
+ signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
+
+ # Do the tests
+ test(signum)
+
+ # The handler must not be called on unblock
+ try:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+ except ZeroDivisionError:
+ print("the signal handler has been called",
+ file=sys.stderr)
+ os._exit(1)
+
+ os._exit(0)
+ finally:
+ os._exit(1)
+ else:
+ # parent: let the child some time to wait, send him the signal, and
+ # check it correcty received it
+ self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+
@unittest.skipUnless(hasattr(signal, 'sigwait'),
'need signal.sigwait()')
+ @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
def test_sigwait(self):
- old_handler = signal.signal(signal.SIGALRM, self.handler)
- self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
+ def test(signum):
+ signal.alarm(1)
+ received = signal.sigwait([signum])
+ if received != signum:
+ print("sigwait() received %s, not %s"
+ % (received, signum),
+ file=sys.stderr)
+ os._exit(1)
- signal.alarm(1)
- self.assertEqual(signal.sigwait([signal.SIGALRM]), signal.SIGALRM)
+ self.check_sigwait(test, signal.SIGALRM)
@unittest.skipUnless(hasattr(signal, 'sigwait'),
'need signal.sigwait()')
@unittest.skipIf(threading is None, "test needs threading module")
+ @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
def test_sigwait_thread(self):
- signum = signal.SIGUSR1
- old_handler = signal.signal(signum, self.handler)
- self.addCleanup(signal.signal, signum, old_handler)
-
- def kill_later():
+ def kill_later(signum):
+ # wait until the main thread is waiting in sigwait()
time.sleep(1)
os.kill(os.getpid(), signum)
- killer = threading.Thread(target=kill_later)
- killer.start()
- try:
- self.assertEqual(signal.sigwait([signum]), signum)
- finally:
+ def test(signum):
+ killer = threading.Thread(target=kill_later, args=(signum,))
+ killer.start()
+ received = signal.sigwait([signum])
+ if received != signum:
+ print("sigwait() received %s, not %s" % (received, signum),
+ file=sys.stderr)
+ os._exit(1)
killer.join()
+ self.check_sigwait(test, signal.SIGUSR1)
+
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()')
def test_pthread_sigmask_arguments(self):