diff options
Diffstat (limited to 'Lib/test/_test_multiprocessing.py')
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 27 |
1 files changed, 21 insertions, 6 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 749cf8c..a5509ce 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -20,6 +20,7 @@ import logging import struct import operator import weakref +import warnings import test.support import test.support.script_helper from test import support @@ -4517,17 +4518,19 @@ class TestSemaphoreTracker(unittest.TestCase): # bpo-31310: if the semaphore tracker process has died, it should # be restarted implicitly. from multiprocessing.semaphore_tracker import _semaphore_tracker - _semaphore_tracker.ensure_running() pid = _semaphore_tracker._pid + if pid is not None: + os.kill(pid, signal.SIGKILL) + os.waitpid(pid, 0) + with warnings.catch_warnings(record=True) as all_warn: + _semaphore_tracker.ensure_running() + pid = _semaphore_tracker._pid + os.kill(pid, signum) time.sleep(1.0) # give it time to die ctx = multiprocessing.get_context("spawn") - with contextlib.ExitStack() as stack: - if should_die: - stack.enter_context(self.assertWarnsRegex( - UserWarning, - "semaphore_tracker: process died")) + with warnings.catch_warnings(record=True) as all_warn: sem = ctx.Semaphore() sem.acquire() sem.release() @@ -4537,11 +4540,23 @@ class TestSemaphoreTracker(unittest.TestCase): del sem gc.collect() self.assertIsNone(wr()) + if should_die: + self.assertEqual(len(all_warn), 1) + the_warn = all_warn[0] + issubclass(the_warn.category, UserWarning) + self.assertTrue("semaphore_tracker: process died" + in str(the_warn.message)) + else: + self.assertEqual(len(all_warn), 0) def test_semaphore_tracker_sigint(self): # Catchable signal (ignored by semaphore tracker) self.check_semaphore_tracker_death(signal.SIGINT, False) + def test_semaphore_tracker_sigterm(self): + # Catchable signal (ignored by semaphore tracker) + self.check_semaphore_tracker_death(signal.SIGTERM, False) + def test_semaphore_tracker_sigkill(self): # Uncatchable signal. self.check_semaphore_tracker_death(signal.SIGKILL, True) |