From c09a9f56c08d80567454cae6f78f738a89e1ae94 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Mon, 20 May 2019 21:37:05 +0200 Subject: bpo-36888: Add multiprocessing.parent_process() (GH-13247) --- Doc/library/multiprocessing.rst | 8 +++ Lib/multiprocessing/context.py | 1 + Lib/multiprocessing/forkserver.py | 3 +- Lib/multiprocessing/popen_fork.py | 8 ++- Lib/multiprocessing/popen_forkserver.py | 6 ++- Lib/multiprocessing/popen_spawn_posix.py | 10 ++-- Lib/multiprocessing/process.py | 52 +++++++++++++++++-- Lib/multiprocessing/spawn.py | 19 ++++--- Lib/multiprocessing/util.py | 6 +++ Lib/test/_test_multiprocessing.py | 59 ++++++++++++++++++++++ .../2019-05-16-18-02-08.bpo-36888.-H2Dkm.rst | 2 + Modules/_winapi.c | 1 + 12 files changed, 155 insertions(+), 20 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2019-05-16-18-02-08.bpo-36888.-H2Dkm.rst diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index c6ffb00..cc6dd4e 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -944,6 +944,14 @@ Miscellaneous An analogue of :func:`threading.current_thread`. +.. function:: parent_process() + + Return the :class:`Process` object corresponding to the parent process of + the :func:`current_process`. For the main process, ``parent_process`` will + be ``None``. + + .. versionadded:: 3.8 + .. function:: freeze_support() Add support for when a program which uses :mod:`multiprocessing` has been diff --git a/Lib/multiprocessing/context.py b/Lib/multiprocessing/context.py index 871746b..5a48657 100644 --- a/Lib/multiprocessing/context.py +++ b/Lib/multiprocessing/context.py @@ -35,6 +35,7 @@ class BaseContext(object): AuthenticationError = AuthenticationError current_process = staticmethod(process.current_process) + parent_process = staticmethod(process.parent_process) active_children = staticmethod(process.active_children) def cpu_count(self): diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py index dabf7bc..9b63986 100644 --- a/Lib/multiprocessing/forkserver.py +++ b/Lib/multiprocessing/forkserver.py @@ -294,7 +294,8 @@ def _serve_one(child_r, fds, unused_fds, handlers): *_forkserver._inherited_fds) = fds # Run process object received over pipe - code = spawn._main(child_r) + parent_sentinel = os.dup(child_r) + code = spawn._main(child_r, parent_sentinel) return code diff --git a/Lib/multiprocessing/popen_fork.py b/Lib/multiprocessing/popen_fork.py index 685e8da..11e2160 100644 --- a/Lib/multiprocessing/popen_fork.py +++ b/Lib/multiprocessing/popen_fork.py @@ -66,16 +66,20 @@ class Popen(object): def _launch(self, process_obj): code = 1 parent_r, child_w = os.pipe() + child_r, parent_w = os.pipe() self.pid = os.fork() if self.pid == 0: try: os.close(parent_r) - code = process_obj._bootstrap() + os.close(parent_w) + code = process_obj._bootstrap(parent_sentinel=child_r) finally: os._exit(code) else: os.close(child_w) - self.finalizer = util.Finalize(self, os.close, (parent_r,)) + os.close(child_r) + self.finalizer = util.Finalize(self, util.close_fds, + (parent_r, parent_w,)) self.sentinel = parent_r def close(self): diff --git a/Lib/multiprocessing/popen_forkserver.py b/Lib/multiprocessing/popen_forkserver.py index a51a277..a56eb9b 100644 --- a/Lib/multiprocessing/popen_forkserver.py +++ b/Lib/multiprocessing/popen_forkserver.py @@ -49,7 +49,11 @@ class Popen(popen_fork.Popen): set_spawning_popen(None) self.sentinel, w = forkserver.connect_to_new_process(self._fds) - self.finalizer = util.Finalize(self, os.close, (self.sentinel,)) + # Keep a duplicate of the data pipe's write end as a sentinel of the + # parent process used by the child process. + _parent_w = os.dup(w) + self.finalizer = util.Finalize(self, util.close_fds, + (_parent_w, self.sentinel)) with open(w, 'wb', closefd=True) as f: f.write(buf.getbuffer()) self.pid = forkserver.read_signed(self.sentinel) diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py index 59f8e45..24b8634 100644 --- a/Lib/multiprocessing/popen_spawn_posix.py +++ b/Lib/multiprocessing/popen_spawn_posix.py @@ -61,8 +61,12 @@ class Popen(popen_fork.Popen): with open(parent_w, 'wb', closefd=False) as f: f.write(fp.getbuffer()) finally: - if parent_r is not None: - self.finalizer = util.Finalize(self, os.close, (parent_r,)) - for fd in (child_r, child_w, parent_w): + fds_to_close = [] + for fd in (parent_r, parent_w): + if fd is not None: + fds_to_close.append(fd) + self.finalizer = util.Finalize(self, util.close_fds, fds_to_close) + + for fd in (child_r, child_w): if fd is not None: os.close(fd) diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index 780f2d0..c62c826 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -7,7 +7,8 @@ # Licensed to PSF under a Contributor Agreement. # -__all__ = ['BaseProcess', 'current_process', 'active_children'] +__all__ = ['BaseProcess', 'current_process', 'active_children', + 'parent_process'] # # Imports @@ -46,6 +47,13 @@ def active_children(): _cleanup() return list(_children) + +def parent_process(): + ''' + Return process object representing the parent process + ''' + return _parent_process + # # # @@ -76,6 +84,7 @@ class BaseProcess(object): self._identity = _current_process._identity + (count,) self._config = _current_process._config.copy() self._parent_pid = os.getpid() + self._parent_name = _current_process.name self._popen = None self._closed = False self._target = target @@ -278,9 +287,9 @@ class BaseProcess(object): ## - def _bootstrap(self): + def _bootstrap(self, parent_sentinel=None): from . import util, context - global _current_process, _process_counter, _children + global _current_process, _parent_process, _process_counter, _children try: if self._start_method is not None: @@ -290,6 +299,8 @@ class BaseProcess(object): util._close_stdin() old_process = _current_process _current_process = self + _parent_process = _ParentProcess( + self._parent_name, self._parent_pid, parent_sentinel) try: util._finalizer_registry.clear() util._run_after_forkers() @@ -337,6 +348,40 @@ class AuthenticationString(bytes): ) return AuthenticationString, (bytes(self),) + +# +# Create object representing the parent process +# + +class _ParentProcess(BaseProcess): + + def __init__(self, name, pid, sentinel): + self._identity = () + self._name = name + self._pid = pid + self._parent_pid = None + self._popen = None + self._closed = False + self._sentinel = sentinel + self._config = {} + + def is_alive(self): + from multiprocessing.connection import wait + return not wait([self._sentinel], timeout=0) + + @property + def ident(self): + return self._pid + + def join(self, timeout=None): + ''' + Wait until parent process terminates + ''' + from multiprocessing.connection import wait + wait([self._sentinel], timeout=timeout) + + pid = ident + # # Create object representing the main process # @@ -365,6 +410,7 @@ class _MainProcess(BaseProcess): pass +_parent_process = None _current_process = _MainProcess() _process_counter = itertools.count(1) _children = set() diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py index f66b5aa..7cc129e 100644 --- a/Lib/multiprocessing/spawn.py +++ b/Lib/multiprocessing/spawn.py @@ -100,25 +100,24 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None): if parent_pid is not None: source_process = _winapi.OpenProcess( - _winapi.PROCESS_DUP_HANDLE, False, parent_pid) + _winapi.SYNCHRONIZE | _winapi.PROCESS_DUP_HANDLE, + False, parent_pid) else: source_process = None - try: - new_handle = reduction.duplicate(pipe_handle, - source_process=source_process) - finally: - if source_process is not None: - _winapi.CloseHandle(source_process) + new_handle = reduction.duplicate(pipe_handle, + source_process=source_process) fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY) + parent_sentinel = source_process else: from . import resource_tracker resource_tracker._resource_tracker._fd = tracker_fd fd = pipe_handle - exitcode = _main(fd) + parent_sentinel = os.dup(pipe_handle) + exitcode = _main(fd, parent_sentinel) sys.exit(exitcode) -def _main(fd): +def _main(fd, parent_sentinel): with os.fdopen(fd, 'rb', closefd=True) as from_parent: process.current_process()._inheriting = True try: @@ -127,7 +126,7 @@ def _main(fd): self = reduction.pickle.load(from_parent) finally: del process.current_process()._inheriting - return self._bootstrap() + return self._bootstrap(parent_sentinel) def _check_not_importing_main(): diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 0c4eb24..5674ad7 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -421,3 +421,9 @@ def spawnv_passfds(path, args, passfds): finally: os.close(errpipe_read) os.close(errpipe_write) + + +def close_fds(*fds): + """Close each file descriptor given as an argument""" + for fd in fds: + os.close(fd) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 78ec53b..071b54a 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -269,6 +269,64 @@ class _TestProcess(BaseTestCase): q.put(bytes(current.authkey)) q.put(current.pid) + def test_parent_process_attributes(self): + if self.TYPE == "threads": + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + + self.assertIsNone(self.parent_process()) + + rconn, wconn = self.Pipe(duplex=False) + p = self.Process(target=self._test_send_parent_process, args=(wconn,)) + p.start() + p.join() + parent_pid, parent_name = rconn.recv() + self.assertEqual(parent_pid, self.current_process().pid) + self.assertEqual(parent_pid, os.getpid()) + self.assertEqual(parent_name, self.current_process().name) + + @classmethod + def _test_send_parent_process(cls, wconn): + from multiprocessing.process import parent_process + wconn.send([parent_process().pid, parent_process().name]) + + def test_parent_process(self): + if self.TYPE == "threads": + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + + # Launch a child process. Make it launch a grandchild process. Kill the + # child process and make sure that the grandchild notices the death of + # its parent (a.k.a the child process). + rconn, wconn = self.Pipe(duplex=False) + p = self.Process( + target=self._test_create_grandchild_process, args=(wconn, )) + p.start() + + if not rconn.poll(timeout=5): + raise AssertionError("Could not communicate with child process") + parent_process_status = rconn.recv() + self.assertEqual(parent_process_status, "alive") + + p.terminate() + p.join() + + if not rconn.poll(timeout=5): + raise AssertionError("Could not communicate with child process") + parent_process_status = rconn.recv() + self.assertEqual(parent_process_status, "not alive") + + @classmethod + def _test_create_grandchild_process(cls, wconn): + p = cls.Process(target=cls._test_report_parent_status, args=(wconn, )) + p.start() + time.sleep(100) + + @classmethod + def _test_report_parent_status(cls, wconn): + from multiprocessing.process import parent_process + wconn.send("alive" if parent_process().is_alive() else "not alive") + parent_process().join(timeout=5) + wconn.send("alive" if parent_process().is_alive() else "not alive") + def test_process(self): q = self.Queue(1) e = self.Event() @@ -5398,6 +5456,7 @@ class ProcessesMixin(BaseMixin): Process = multiprocessing.Process connection = multiprocessing.connection current_process = staticmethod(multiprocessing.current_process) + parent_process = staticmethod(multiprocessing.parent_process) active_children = staticmethod(multiprocessing.active_children) Pool = staticmethod(multiprocessing.Pool) Pipe = staticmethod(multiprocessing.Pipe) diff --git a/Misc/NEWS.d/next/Library/2019-05-16-18-02-08.bpo-36888.-H2Dkm.rst b/Misc/NEWS.d/next/Library/2019-05-16-18-02-08.bpo-36888.-H2Dkm.rst new file mode 100644 index 0000000..e7b5467 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2019-05-16-18-02-08.bpo-36888.-H2Dkm.rst @@ -0,0 +1,2 @@ +Python child processes can now access the status of their parent process +using multiprocessing.process.parent_process diff --git a/Modules/_winapi.c b/Modules/_winapi.c index 2eb708e..8873519 100644 --- a/Modules/_winapi.c +++ b/Modules/_winapi.c @@ -1955,6 +1955,7 @@ PyInit__winapi(void) WINAPI_CONSTANT(F_DWORD, PIPE_UNLIMITED_INSTANCES); WINAPI_CONSTANT(F_DWORD, PIPE_WAIT); WINAPI_CONSTANT(F_DWORD, PROCESS_ALL_ACCESS); + WINAPI_CONSTANT(F_DWORD, SYNCHRONIZE); WINAPI_CONSTANT(F_DWORD, PROCESS_DUP_HANDLE); WINAPI_CONSTANT(F_DWORD, SEC_COMMIT); WINAPI_CONSTANT(F_DWORD, SEC_IMAGE); -- cgit v0.12