diff options
-rw-r--r-- | Lib/multiprocessing/semaphore_tracker.py | 34 | ||||
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 27 | ||||
-rw-r--r-- | Misc/NEWS.d/next/Library/2018-07-31-23-33-06.bpo-33613.Cdnt0i.rst | 3 |
3 files changed, 52 insertions, 12 deletions
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index 3e31bf8..82833bc 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -23,6 +23,9 @@ from . import util __all__ = ['ensure_running', 'register', 'unregister'] +_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask') +_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM) + class SemaphoreTracker(object): @@ -43,10 +46,16 @@ class SemaphoreTracker(object): with self._lock: if self._pid is not None: # semaphore tracker was launched before, is it still running? - pid, status = os.waitpid(self._pid, os.WNOHANG) - if not pid: - # => still alive - return + try: + pid, _ = os.waitpid(self._pid, os.WNOHANG) + except ChildProcessError: + # The process terminated + pass + else: + if not pid: + # => still alive + return + # => dead, launch it again os.close(self._fd) self._fd = None @@ -68,7 +77,19 @@ class SemaphoreTracker(object): exe = spawn.get_executable() args = [exe] + util._args_from_interpreter_flags() args += ['-c', cmd % r] - pid = util.spawnv_passfds(exe, args, fds_to_pass) + # bpo-33613: Register a signal mask that will block the signals. + # This signal mask will be inherited by the child that is going + # to be spawned and will protect the child from a race condition + # that can make the child die before it registers signal handlers + # for SIGINT and SIGTERM. The mask is unregistered after spawning + # the child. + try: + if _HAVE_SIGMASK: + signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS) + pid = util.spawnv_passfds(exe, args, fds_to_pass) + finally: + if _HAVE_SIGMASK: + signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS) except: os.close(w) raise @@ -104,12 +125,13 @@ register = _semaphore_tracker.register unregister = _semaphore_tracker.unregister getfd = _semaphore_tracker.getfd - def main(fd): '''Run semaphore tracker.''' # protect the process from ^C and "killall python" etc signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) + if _HAVE_SIGMASK: + signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS) for f in (sys.stdin, sys.stdout): try: diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 749cf8c..a5509ce 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -20,6 +20,7 @@ import logging import struct import operator import weakref +import warnings import test.support import test.support.script_helper from test import support @@ -4517,17 +4518,19 @@ class TestSemaphoreTracker(unittest.TestCase): # bpo-31310: if the semaphore tracker process has died, it should # be restarted implicitly. from multiprocessing.semaphore_tracker import _semaphore_tracker - _semaphore_tracker.ensure_running() pid = _semaphore_tracker._pid + if pid is not None: + os.kill(pid, signal.SIGKILL) + os.waitpid(pid, 0) + with warnings.catch_warnings(record=True) as all_warn: + _semaphore_tracker.ensure_running() + pid = _semaphore_tracker._pid + os.kill(pid, signum) time.sleep(1.0) # give it time to die ctx = multiprocessing.get_context("spawn") - with contextlib.ExitStack() as stack: - if should_die: - stack.enter_context(self.assertWarnsRegex( - UserWarning, - "semaphore_tracker: process died")) + with warnings.catch_warnings(record=True) as all_warn: sem = ctx.Semaphore() sem.acquire() sem.release() @@ -4537,11 +4540,23 @@ class TestSemaphoreTracker(unittest.TestCase): del sem gc.collect() self.assertIsNone(wr()) + if should_die: + self.assertEqual(len(all_warn), 1) + the_warn = all_warn[0] + issubclass(the_warn.category, UserWarning) + self.assertTrue("semaphore_tracker: process died" + in str(the_warn.message)) + else: + self.assertEqual(len(all_warn), 0) def test_semaphore_tracker_sigint(self): # Catchable signal (ignored by semaphore tracker) self.check_semaphore_tracker_death(signal.SIGINT, False) + def test_semaphore_tracker_sigterm(self): + # Catchable signal (ignored by semaphore tracker) + self.check_semaphore_tracker_death(signal.SIGTERM, False) + def test_semaphore_tracker_sigkill(self): # Uncatchable signal. self.check_semaphore_tracker_death(signal.SIGKILL, True) diff --git a/Misc/NEWS.d/next/Library/2018-07-31-23-33-06.bpo-33613.Cdnt0i.rst b/Misc/NEWS.d/next/Library/2018-07-31-23-33-06.bpo-33613.Cdnt0i.rst new file mode 100644 index 0000000..9574e43 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-07-31-23-33-06.bpo-33613.Cdnt0i.rst @@ -0,0 +1,3 @@ +Fix a race condition in ``multiprocessing.semaphore_tracker`` when the +tracker receives SIGINT before it can register signal handlers for ignoring +it. |