summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/multiprocessing/semaphore_tracker.py20
-rw-r--r--Lib/test/_test_multiprocessing.py43
-rw-r--r--Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst1
3 files changed, 57 insertions, 7 deletions
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py
index de7738e..3b50a46 100644
--- a/Lib/multiprocessing/semaphore_tracker.py
+++ b/Lib/multiprocessing/semaphore_tracker.py
@@ -29,6 +29,7 @@ class SemaphoreTracker(object):
def __init__(self):
self._lock = threading.Lock()
self._fd = None
+ self._pid = None
def getfd(self):
self.ensure_running()
@@ -40,8 +41,20 @@ class SemaphoreTracker(object):
This can be run from any process. Usually a child process will use
the semaphore created by its parent.'''
with self._lock:
- if self._fd is not None:
- return
+ if self._pid is not None:
+ # semaphore tracker was launched before, is it still running?
+ pid, status = os.waitpid(self._pid, os.WNOHANG)
+ if not pid:
+ # => still alive
+ return
+ # => dead, launch it again
+ os.close(self._fd)
+ self._fd = None
+ self._pid = None
+
+ warnings.warn('semaphore_tracker: process died unexpectedly, '
+ 'relaunching. Some semaphores might leak.')
+
fds_to_pass = []
try:
fds_to_pass.append(sys.stderr.fileno())
@@ -55,12 +68,13 @@ class SemaphoreTracker(object):
exe = spawn.get_executable()
args = [exe] + util._args_from_interpreter_flags()
args += ['-c', cmd % r]
- util.spawnv_passfds(exe, args, fds_to_pass)
+ pid = util.spawnv_passfds(exe, args, fds_to_pass)
except:
os.close(w)
raise
else:
self._fd = w
+ self._pid = pid
finally:
os.close(r)
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 056474b..f01c004 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -4,6 +4,7 @@
import unittest
import queue as pyqueue
+import contextlib
import time
import io
import itertools
@@ -4125,14 +4126,14 @@ class TestStartMethod(unittest.TestCase):
self.fail("failed spawning forkserver or grandchild")
-#
-# Check that killing process does not leak named semaphores
-#
-
@unittest.skipIf(sys.platform == "win32",
"test semantics don't make sense on Windows")
class TestSemaphoreTracker(unittest.TestCase):
+
def test_semaphore_tracker(self):
+ #
+ # Check that killing process does not leak named semaphores
+ #
import subprocess
cmd = '''if 1:
import multiprocessing as mp, time, os
@@ -4166,6 +4167,40 @@ class TestSemaphoreTracker(unittest.TestCase):
self.assertRegex(err, expected)
self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
+ def check_semaphore_tracker_death(self, signum, should_die):
+ # bpo-31310: if the semaphore tracker process has died, it should
+ # be restarted implicitly.
+ from multiprocessing.semaphore_tracker import _semaphore_tracker
+ _semaphore_tracker.ensure_running()
+ pid = _semaphore_tracker._pid
+ os.kill(pid, signum)
+ time.sleep(1.0) # give it time to die
+
+ ctx = multiprocessing.get_context("spawn")
+ with contextlib.ExitStack() as stack:
+ if should_die:
+ stack.enter_context(self.assertWarnsRegex(
+ UserWarning,
+ "semaphore_tracker: process died"))
+ sem = ctx.Semaphore()
+ sem.acquire()
+ sem.release()
+ wr = weakref.ref(sem)
+ # ensure `sem` gets collected, which triggers communication with
+ # the semaphore tracker
+ del sem
+ gc.collect()
+ self.assertIsNone(wr())
+
+ def test_semaphore_tracker_sigint(self):
+ # Catchable signal (ignored by semaphore tracker)
+ self.check_semaphore_tracker_death(signal.SIGINT, False)
+
+ def test_semaphore_tracker_sigkill(self):
+ # Uncatchable signal.
+ self.check_semaphore_tracker_death(signal.SIGKILL, True)
+
+
class TestSimpleQueue(unittest.TestCase):
@classmethod
diff --git a/Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst b/Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst
new file mode 100644
index 0000000..4d340f0
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst
@@ -0,0 +1 @@
+multiprocessing's semaphore tracker should be launched again if crashed.