diff options
author | Antoine Pitrou <pitrou@free.fr> | 2017-11-03 13:58:37 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-03 13:58:37 (GMT) |
commit | b5f09acf0a0219cec32b7eba3acdcb573fc74ab5 (patch) | |
tree | 2c16266d244c854a60cf4343641b4702817279f1 /Lib | |
parent | f8b3f6b178e48773cd7298141cbaf408c6917e41 (diff) | |
download | cpython-b5f09acf0a0219cec32b7eba3acdcb573fc74ab5.zip cpython-b5f09acf0a0219cec32b7eba3acdcb573fc74ab5.tar.gz cpython-b5f09acf0a0219cec32b7eba3acdcb573fc74ab5.tar.bz2 |
[3.6] bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed (GH-3247) (#4254)
* bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed
* Avoid mucking with process state in test.
Add a warning if the semaphore process died, as semaphores may then be leaked.
* Add NEWS entry
(cherry picked from commit cbe1756)
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/multiprocessing/semaphore_tracker.py | 20 | ||||
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 43 |
2 files changed, 56 insertions, 7 deletions
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index de7738e..3b50a46 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -29,6 +29,7 @@ class SemaphoreTracker(object): def __init__(self): self._lock = threading.Lock() self._fd = None + self._pid = None def getfd(self): self.ensure_running() @@ -40,8 +41,20 @@ class SemaphoreTracker(object): This can be run from any process. Usually a child process will use the semaphore created by its parent.''' with self._lock: - if self._fd is not None: - return + if self._pid is not None: + # semaphore tracker was launched before, is it still running? + pid, status = os.waitpid(self._pid, os.WNOHANG) + if not pid: + # => still alive + return + # => dead, launch it again + os.close(self._fd) + self._fd = None + self._pid = None + + warnings.warn('semaphore_tracker: process died unexpectedly, ' + 'relaunching. Some semaphores might leak.') + fds_to_pass = [] try: fds_to_pass.append(sys.stderr.fileno()) @@ -55,12 +68,13 @@ class SemaphoreTracker(object): exe = spawn.get_executable() args = [exe] + util._args_from_interpreter_flags() args += ['-c', cmd % r] - util.spawnv_passfds(exe, args, fds_to_pass) + pid = util.spawnv_passfds(exe, args, fds_to_pass) except: os.close(w) raise else: self._fd = w + self._pid = pid finally: os.close(r) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 056474b..f01c004 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4,6 +4,7 @@ import unittest import queue as pyqueue +import contextlib import time import io import itertools @@ -4125,14 +4126,14 @@ class TestStartMethod(unittest.TestCase): self.fail("failed spawning forkserver or grandchild") -# -# Check that killing process does not leak named semaphores -# - @unittest.skipIf(sys.platform == "win32", "test semantics don't make sense on Windows") class TestSemaphoreTracker(unittest.TestCase): + def test_semaphore_tracker(self): + # + # Check that killing process does not leak named semaphores + # import subprocess cmd = '''if 1: import multiprocessing as mp, time, os @@ -4166,6 +4167,40 @@ class TestSemaphoreTracker(unittest.TestCase): self.assertRegex(err, expected) self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1) + def check_semaphore_tracker_death(self, signum, should_die): + # bpo-31310: if the semaphore tracker process has died, it should + # be restarted implicitly. + from multiprocessing.semaphore_tracker import _semaphore_tracker + _semaphore_tracker.ensure_running() + pid = _semaphore_tracker._pid + os.kill(pid, signum) + time.sleep(1.0) # give it time to die + + ctx = multiprocessing.get_context("spawn") + with contextlib.ExitStack() as stack: + if should_die: + stack.enter_context(self.assertWarnsRegex( + UserWarning, + "semaphore_tracker: process died")) + sem = ctx.Semaphore() + sem.acquire() + sem.release() + wr = weakref.ref(sem) + # ensure `sem` gets collected, which triggers communication with + # the semaphore tracker + del sem + gc.collect() + self.assertIsNone(wr()) + + def test_semaphore_tracker_sigint(self): + # Catchable signal (ignored by semaphore tracker) + self.check_semaphore_tracker_death(signal.SIGINT, False) + + def test_semaphore_tracker_sigkill(self): + # Uncatchable signal. + self.check_semaphore_tracker_death(signal.SIGKILL, True) + + class TestSimpleQueue(unittest.TestCase): @classmethod |