summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_threadsignals.py
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2017-09-14 20:07:24 (GMT)
committerGitHub <noreply@github.com>2017-09-14 20:07:24 (GMT)
commitff40ecda73178dfcad24e26240d684356ef20793 (patch)
tree0533f9354f838a97a67fc22749b1b1b7374691fb /Lib/test/test_threadsignals.py
parentb8c7be2c523b012e57915182543d06657161057f (diff)
downloadcpython-ff40ecda73178dfcad24e26240d684356ef20793.zip
cpython-ff40ecda73178dfcad24e26240d684356ef20793.tar.gz
cpython-ff40ecda73178dfcad24e26240d684356ef20793.tar.bz2
bpo-31234: Add test.support.wait_threads_exit() (#3578)
Use _thread.count() to wait until threads exit. The new context manager prevents the "dangling thread" warning.
Diffstat (limited to 'Lib/test/test_threadsignals.py')
-rw-r--r--Lib/test/test_threadsignals.py92
1 files changed, 50 insertions, 42 deletions
diff --git a/Lib/test/test_threadsignals.py b/Lib/test/test_threadsignals.py
index 9d92742..f93dd77 100644
--- a/Lib/test/test_threadsignals.py
+++ b/Lib/test/test_threadsignals.py
@@ -4,8 +4,8 @@ import unittest
import signal
import os
import sys
-from test.support import run_unittest, import_module
-thread = import_module('_thread')
+from test import support
+thread = support.import_module('_thread')
import time
if (sys.platform[:3] == 'win'):
@@ -39,13 +39,15 @@ def send_signals():
class ThreadSignals(unittest.TestCase):
def test_signals(self):
- # Test signal handling semantics of threads.
- # We spawn a thread, have the thread send two signals, and
- # wait for it to finish. Check that we got both signals
- # and that they were run by the main thread.
- signalled_all.acquire()
- self.spawnSignallingThread()
- signalled_all.acquire()
+ with support.wait_threads_exit():
+ # Test signal handling semantics of threads.
+ # We spawn a thread, have the thread send two signals, and
+ # wait for it to finish. Check that we got both signals
+ # and that they were run by the main thread.
+ signalled_all.acquire()
+ self.spawnSignallingThread()
+ signalled_all.acquire()
+
# the signals that we asked the kernel to send
# will come back, but we don't know when.
# (it might even be after the thread exits
@@ -115,17 +117,19 @@ class ThreadSignals(unittest.TestCase):
# thread.
def other_thread():
rlock.acquire()
- thread.start_new_thread(other_thread, ())
- # Wait until we can't acquire it without blocking...
- while rlock.acquire(blocking=False):
- rlock.release()
- time.sleep(0.01)
- signal.alarm(1)
- t1 = time.time()
- self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
- dt = time.time() - t1
- # See rationale above in test_lock_acquire_interruption
- self.assertLess(dt, 3.0)
+
+ with support.wait_threads_exit():
+ thread.start_new_thread(other_thread, ())
+ # Wait until we can't acquire it without blocking...
+ while rlock.acquire(blocking=False):
+ rlock.release()
+ time.sleep(0.01)
+ signal.alarm(1)
+ t1 = time.time()
+ self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
+ dt = time.time() - t1
+ # See rationale above in test_lock_acquire_interruption
+ self.assertLess(dt, 3.0)
finally:
signal.signal(signal.SIGALRM, oldalrm)
@@ -133,6 +137,7 @@ class ThreadSignals(unittest.TestCase):
self.sig_recvd = False
def my_handler(signal, frame):
self.sig_recvd = True
+
old_handler = signal.signal(signal.SIGUSR1, my_handler)
try:
def other_thread():
@@ -147,14 +152,16 @@ class ThreadSignals(unittest.TestCase):
# the lock acquisition. Then we'll let it run.
time.sleep(0.5)
lock.release()
- thread.start_new_thread(other_thread, ())
- # Wait until we can't acquire it without blocking...
- while lock.acquire(blocking=False):
- lock.release()
- time.sleep(0.01)
- result = lock.acquire() # Block while we receive a signal.
- self.assertTrue(self.sig_recvd)
- self.assertTrue(result)
+
+ with support.wait_threads_exit():
+ thread.start_new_thread(other_thread, ())
+ # Wait until we can't acquire it without blocking...
+ while lock.acquire(blocking=False):
+ lock.release()
+ time.sleep(0.01)
+ result = lock.acquire() # Block while we receive a signal.
+ self.assertTrue(self.sig_recvd)
+ self.assertTrue(result)
finally:
signal.signal(signal.SIGUSR1, old_handler)
@@ -193,19 +200,20 @@ class ThreadSignals(unittest.TestCase):
os.kill(process_pid, signal.SIGUSR1)
done.release()
- # Send the signals from the non-main thread, since the main thread
- # is the only one that can process signals.
- thread.start_new_thread(send_signals, ())
- timed_acquire()
- # Wait for thread to finish
- done.acquire()
- # This allows for some timing and scheduling imprecision
- self.assertLess(self.end - self.start, 2.0)
- self.assertGreater(self.end - self.start, 0.3)
- # If the signal is received several times before PyErr_CheckSignals()
- # is called, the handler will get called less than 40 times. Just
- # check it's been called at least once.
- self.assertGreater(self.sigs_recvd, 0)
+ with support.wait_threads_exit():
+ # Send the signals from the non-main thread, since the main thread
+ # is the only one that can process signals.
+ thread.start_new_thread(send_signals, ())
+ timed_acquire()
+ # Wait for thread to finish
+ done.acquire()
+ # This allows for some timing and scheduling imprecision
+ self.assertLess(self.end - self.start, 2.0)
+ self.assertGreater(self.end - self.start, 0.3)
+ # If the signal is received several times before PyErr_CheckSignals()
+ # is called, the handler will get called less than 40 times. Just
+ # check it's been called at least once.
+ self.assertGreater(self.sigs_recvd, 0)
finally:
signal.signal(signal.SIGUSR1, old_handler)
@@ -219,7 +227,7 @@ def test_main():
oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)
try:
- run_unittest(ThreadSignals)
+ support.run_unittest(ThreadSignals)
finally:
registerSignals(*oldsigs)