summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/multiprocessing/semaphore_tracker.py36
-rw-r--r--Lib/test/_test_multiprocessing.py28
-rw-r--r--Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst1
3 files changed, 55 insertions, 10 deletions
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py
index 82833bc..3c2c3ad 100644
--- a/Lib/multiprocessing/semaphore_tracker.py
+++ b/Lib/multiprocessing/semaphore_tracker.py
@@ -44,20 +44,23 @@ class SemaphoreTracker(object):
This can be run from any process. Usually a child process will use
the semaphore created by its parent.'''
with self._lock:
- if self._pid is not None:
+ if self._fd is not None:
# semaphore tracker was launched before, is it still running?
+ if self._check_alive():
+ # => still alive
+ return
+ # => dead, launch it again
+ os.close(self._fd)
+
+ # Clean-up to avoid dangling processes.
try:
- pid, _ = os.waitpid(self._pid, os.WNOHANG)
+ # _pid can be None if this process is a child from another
+ # python process, which has started the semaphore_tracker.
+ if self._pid is not None:
+ os.waitpid(self._pid, 0)
except ChildProcessError:
- # The process terminated
+ # The semaphore_tracker has already been terminated.
pass
- else:
- if not pid:
- # => still alive
- return
-
- # => dead, launch it again
- os.close(self._fd)
self._fd = None
self._pid = None
@@ -99,6 +102,17 @@ class SemaphoreTracker(object):
finally:
os.close(r)
+ def _check_alive(self):
+ '''Check that the pipe has not been closed by sending a probe.'''
+ try:
+ # We cannot use send here as it calls ensure_running, creating
+ # a cycle.
+ os.write(self._fd, b'PROBE:0\n')
+ except OSError:
+ return False
+ else:
+ return True
+
def register(self, name):
'''Register name of semaphore with semaphore tracker.'''
self._send('REGISTER', name)
@@ -150,6 +164,8 @@ def main(fd):
cache.add(name)
elif cmd == b'UNREGISTER':
cache.remove(name)
+ elif cmd == b'PROBE':
+ pass
else:
raise RuntimeError('unrecognized command %r' % cmd)
except Exception:
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 553ab81..836fde8 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -4891,6 +4891,34 @@ class TestSemaphoreTracker(unittest.TestCase):
# Uncatchable signal.
self.check_semaphore_tracker_death(signal.SIGKILL, True)
+ @staticmethod
+ def _is_semaphore_tracker_reused(conn, pid):
+ from multiprocessing.semaphore_tracker import _semaphore_tracker
+ _semaphore_tracker.ensure_running()
+ # The pid should be None in the child process, expect for the fork
+ # context. It should not be a new value.
+ reused = _semaphore_tracker._pid in (None, pid)
+ reused &= _semaphore_tracker._check_alive()
+ conn.send(reused)
+
+ def test_semaphore_tracker_reused(self):
+ from multiprocessing.semaphore_tracker import _semaphore_tracker
+ _semaphore_tracker.ensure_running()
+ pid = _semaphore_tracker._pid
+
+ r, w = multiprocessing.Pipe(duplex=False)
+ p = multiprocessing.Process(target=self._is_semaphore_tracker_reused,
+ args=(w, pid))
+ p.start()
+ is_semaphore_tracker_reused = r.recv()
+
+ # Clean up
+ p.join()
+ w.close()
+ r.close()
+
+ self.assertTrue(is_semaphore_tracker_reused)
+
class TestSimpleQueue(unittest.TestCase):
diff --git a/Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst b/Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst
new file mode 100644
index 0000000..32ebf4e
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst
@@ -0,0 +1 @@
+Fix the multiprocessing.semaphore_tracker so it is reused by child processes