diff options
-rw-r--r-- | Lib/multiprocessing/semaphore_tracker.py | 36 | ||||
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 28 | ||||
-rw-r--r-- | Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst | 1 |
3 files changed, 55 insertions, 10 deletions
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index 82833bc..3c2c3ad 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -44,20 +44,23 @@ class SemaphoreTracker(object): This can be run from any process. Usually a child process will use the semaphore created by its parent.''' with self._lock: - if self._pid is not None: + if self._fd is not None: # semaphore tracker was launched before, is it still running? + if self._check_alive(): + # => still alive + return + # => dead, launch it again + os.close(self._fd) + + # Clean-up to avoid dangling processes. try: - pid, _ = os.waitpid(self._pid, os.WNOHANG) + # _pid can be None if this process is a child from another + # python process, which has started the semaphore_tracker. + if self._pid is not None: + os.waitpid(self._pid, 0) except ChildProcessError: - # The process terminated + # The semaphore_tracker has already been terminated. pass - else: - if not pid: - # => still alive - return - - # => dead, launch it again - os.close(self._fd) self._fd = None self._pid = None @@ -99,6 +102,17 @@ class SemaphoreTracker(object): finally: os.close(r) + def _check_alive(self): + '''Check that the pipe has not been closed by sending a probe.''' + try: + # We cannot use send here as it calls ensure_running, creating + # a cycle. + os.write(self._fd, b'PROBE:0\n') + except OSError: + return False + else: + return True + def register(self, name): '''Register name of semaphore with semaphore tracker.''' self._send('REGISTER', name) @@ -150,6 +164,8 @@ def main(fd): cache.add(name) elif cmd == b'UNREGISTER': cache.remove(name) + elif cmd == b'PROBE': + pass else: raise RuntimeError('unrecognized command %r' % cmd) except Exception: diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 553ab81..836fde8 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4891,6 +4891,34 @@ class TestSemaphoreTracker(unittest.TestCase): # Uncatchable signal. self.check_semaphore_tracker_death(signal.SIGKILL, True) + @staticmethod + def _is_semaphore_tracker_reused(conn, pid): + from multiprocessing.semaphore_tracker import _semaphore_tracker + _semaphore_tracker.ensure_running() + # The pid should be None in the child process, expect for the fork + # context. It should not be a new value. + reused = _semaphore_tracker._pid in (None, pid) + reused &= _semaphore_tracker._check_alive() + conn.send(reused) + + def test_semaphore_tracker_reused(self): + from multiprocessing.semaphore_tracker import _semaphore_tracker + _semaphore_tracker.ensure_running() + pid = _semaphore_tracker._pid + + r, w = multiprocessing.Pipe(duplex=False) + p = multiprocessing.Process(target=self._is_semaphore_tracker_reused, + args=(w, pid)) + p.start() + is_semaphore_tracker_reused = r.recv() + + # Clean up + p.join() + w.close() + r.close() + + self.assertTrue(is_semaphore_tracker_reused) + class TestSimpleQueue(unittest.TestCase): diff --git a/Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst b/Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst new file mode 100644 index 0000000..32ebf4e --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst @@ -0,0 +1 @@ +Fix the multiprocessing.semaphore_tracker so it is reused by child processes |