From 13e96cc596d158b98996db3fa291086ea4afecd9 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 24 Jun 2017 19:22:23 +0200 Subject: Fix bpo-30596: Add close() method to multiprocessing.Process (#2010) * Fix bpo-30596: Add close() method to multiprocessing.Process * Raise ValueError if close() is called before the Process is finished running * Add docs * Add NEWS blurb --- Doc/library/multiprocessing.rst | 10 ++++++ Lib/multiprocessing/forkserver.py | 16 +++++++--- Lib/multiprocessing/popen_fork.py | 7 ++++- Lib/multiprocessing/popen_forkserver.py | 2 +- Lib/multiprocessing/popen_spawn_posix.py | 2 +- Lib/multiprocessing/popen_spawn_win32.py | 5 ++- Lib/multiprocessing/process.py | 35 +++++++++++++++++++++ Lib/test/_test_multiprocessing.py | 36 ++++++++++++++++++++++ .../2017-06-24-18-55-58.bpo-30596.VhB8iG.rst | 1 + 9 files changed, 106 insertions(+), 8 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2017-06-24-18-55-58.bpo-30596.VhB8iG.rst diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 6b4a8cb..5265639 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -598,6 +598,16 @@ The :mod:`multiprocessing` package mostly replicates the API of the acquired a lock or semaphore etc. then terminating it is liable to cause other processes to deadlock. + .. method:: close() + + Close the :class:`Process` object, releasing all resources associated + with it. :exc:`ValueError` is raised if the underlying process + is still running. Once :meth:`close` returns successfully, most + other methods and attributes of the :class:`Process` object will + raise :exc:`ValueError`. + + .. versionadded:: 3.7 + Note that the :meth:`start`, :meth:`join`, :meth:`is_alive`, :meth:`terminate` and :attr:`exitcode` methods should only be called by the process that created the process object. diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py index 7010515..b9f9b9d 100644 --- a/Lib/multiprocessing/forkserver.py +++ b/Lib/multiprocessing/forkserver.py @@ -210,8 +210,12 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): else: assert os.WIFEXITED(sts) returncode = os.WEXITSTATUS(sts) - # Write the exit code to the pipe - write_signed(child_w, returncode) + # Send exit code to client process + try: + write_signed(child_w, returncode) + except BrokenPipeError: + # client vanished + pass os.close(child_w) else: # This shouldn't happen really @@ -241,8 +245,12 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): finally: os._exit(code) else: - # Send pid to client processes - write_signed(child_w, pid) + # Send pid to client process + try: + write_signed(child_w, pid) + except BrokenPipeError: + # client vanished + pass pid_to_fd[pid] = child_w os.close(child_r) for fd in fds: diff --git a/Lib/multiprocessing/popen_fork.py b/Lib/multiprocessing/popen_fork.py index ca28bf3..5af9d91 100644 --- a/Lib/multiprocessing/popen_fork.py +++ b/Lib/multiprocessing/popen_fork.py @@ -17,6 +17,7 @@ class Popen(object): sys.stdout.flush() sys.stderr.flush() self.returncode = None + self.finalizer = None self._launch(process_obj) def duplicate_for_child(self, fd): @@ -70,5 +71,9 @@ class Popen(object): os._exit(code) else: os.close(child_w) - util.Finalize(self, os.close, (parent_r,)) + self.finalizer = util.Finalize(self, os.close, (parent_r,)) self.sentinel = parent_r + + def close(self): + if self.finalizer is not None: + self.finalizer() diff --git a/Lib/multiprocessing/popen_forkserver.py b/Lib/multiprocessing/popen_forkserver.py index fa8e574..a51a277 100644 --- a/Lib/multiprocessing/popen_forkserver.py +++ b/Lib/multiprocessing/popen_forkserver.py @@ -49,7 +49,7 @@ class Popen(popen_fork.Popen): set_spawning_popen(None) self.sentinel, w = forkserver.connect_to_new_process(self._fds) - util.Finalize(self, os.close, (self.sentinel,)) + self.finalizer = util.Finalize(self, os.close, (self.sentinel,)) with open(w, 'wb', closefd=True) as f: f.write(buf.getbuffer()) self.pid = forkserver.read_signed(self.sentinel) diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py index 98f8f0a..3815106 100644 --- a/Lib/multiprocessing/popen_spawn_posix.py +++ b/Lib/multiprocessing/popen_spawn_posix.py @@ -62,7 +62,7 @@ class Popen(popen_fork.Popen): f.write(fp.getbuffer()) finally: if parent_r is not None: - util.Finalize(self, os.close, (parent_r,)) + self.finalizer = util.Finalize(self, os.close, (parent_r,)) for fd in (child_r, child_w, parent_w): if fd is not None: os.close(fd) diff --git a/Lib/multiprocessing/popen_spawn_win32.py b/Lib/multiprocessing/popen_spawn_win32.py index 6fd588f..ecb86e9 100644 --- a/Lib/multiprocessing/popen_spawn_win32.py +++ b/Lib/multiprocessing/popen_spawn_win32.py @@ -56,7 +56,7 @@ class Popen(object): self.returncode = None self._handle = hp self.sentinel = int(hp) - util.Finalize(self, _winapi.CloseHandle, (self.sentinel,)) + self.finalizer = util.Finalize(self, _winapi.CloseHandle, (self.sentinel,)) # send information to child set_spawning_popen(self) @@ -96,3 +96,6 @@ class Popen(object): except OSError: if self.wait(timeout=1.0) is None: raise + + def close(self): + self.finalizer() diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index 37365f2..70bb50d 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -76,6 +76,7 @@ class BaseProcess(object): self._config = _current_process._config.copy() self._parent_pid = os.getpid() self._popen = None + self._closed = False self._target = target self._args = tuple(args) self._kwargs = dict(kwargs) @@ -85,6 +86,10 @@ class BaseProcess(object): self.daemon = daemon _dangling.add(self) + def _check_closed(self): + if self._closed: + raise ValueError("process object is closed") + def run(self): ''' Method to be run in sub-process; can be overridden in sub-class @@ -96,6 +101,7 @@ class BaseProcess(object): ''' Start child process ''' + self._check_closed() assert self._popen is None, 'cannot start a process twice' assert self._parent_pid == os.getpid(), \ 'can only start a process object created by current process' @@ -110,12 +116,14 @@ class BaseProcess(object): ''' Terminate process; sends SIGTERM signal or uses TerminateProcess() ''' + self._check_closed() self._popen.terminate() def join(self, timeout=None): ''' Wait until child process terminates ''' + self._check_closed() assert self._parent_pid == os.getpid(), 'can only join a child process' assert self._popen is not None, 'can only join a started process' res = self._popen.wait(timeout) @@ -126,6 +134,7 @@ class BaseProcess(object): ''' Return whether process is alive ''' + self._check_closed() if self is _current_process: return True assert self._parent_pid == os.getpid(), 'can only test a child process' @@ -134,6 +143,23 @@ class BaseProcess(object): self._popen.poll() return self._popen.returncode is None + def close(self): + ''' + Close the Process object. + + This method releases resources held by the Process object. It is + an error to call this method if the child process is still running. + ''' + if self._popen is not None: + if self._popen.poll() is None: + raise ValueError("Cannot close a process while it is still running. " + "You should first call join() or terminate().") + self._popen.close() + self._popen = None + del self._sentinel + _children.discard(self) + self._closed = True + @property def name(self): return self._name @@ -174,6 +200,7 @@ class BaseProcess(object): ''' Return exit code of process or `None` if it has yet to stop ''' + self._check_closed() if self._popen is None: return self._popen return self._popen.poll() @@ -183,6 +210,7 @@ class BaseProcess(object): ''' Return identifier (PID) of process or `None` if it has yet to start ''' + self._check_closed() if self is _current_process: return os.getpid() else: @@ -196,6 +224,7 @@ class BaseProcess(object): Return a file descriptor (Unix) or handle (Windows) suitable for waiting for process termination. ''' + self._check_closed() try: return self._sentinel except AttributeError: @@ -204,6 +233,8 @@ class BaseProcess(object): def __repr__(self): if self is _current_process: status = 'started' + elif self._closed: + status = 'closed' elif self._parent_pid != os.getpid(): status = 'unknown' elif self._popen is None: @@ -295,6 +326,7 @@ class _MainProcess(BaseProcess): self._name = 'MainProcess' self._parent_pid = None self._popen = None + self._closed = False self._config = {'authkey': AuthenticationString(os.urandom(32)), 'semprefix': '/mp'} # Note that some versions of FreeBSD only allow named @@ -307,6 +339,9 @@ class _MainProcess(BaseProcess): # Everything in self._config will be inherited by descendant # processes. + def close(self): + pass + _current_process = _MainProcess() _process_counter = itertools.count(1) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 7148ea4..d4a461d 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -403,6 +403,42 @@ class _TestProcess(BaseTestCase): p.join() self.assertTrue(wait_for_handle(sentinel, timeout=1)) + @classmethod + def _test_close(cls, rc=0, q=None): + if q is not None: + q.get() + sys.exit(rc) + + def test_close(self): + if self.TYPE == "threads": + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + q = self.Queue() + p = self.Process(target=self._test_close, kwargs={'q': q}) + p.daemon = True + p.start() + self.assertEqual(p.is_alive(), True) + # Child is still alive, cannot close + with self.assertRaises(ValueError): + p.close() + + q.put(None) + p.join() + self.assertEqual(p.is_alive(), False) + self.assertEqual(p.exitcode, 0) + p.close() + with self.assertRaises(ValueError): + p.is_alive() + with self.assertRaises(ValueError): + p.join() + with self.assertRaises(ValueError): + p.terminate() + p.close() + + wr = weakref.ref(p) + del p + gc.collect() + self.assertIs(wr(), None) + def test_many_processes(self): if self.TYPE == 'threads': self.skipTest('test not appropriate for {}'.format(self.TYPE)) diff --git a/Misc/NEWS.d/next/Library/2017-06-24-18-55-58.bpo-30596.VhB8iG.rst b/Misc/NEWS.d/next/Library/2017-06-24-18-55-58.bpo-30596.VhB8iG.rst new file mode 100644 index 0000000..6b9e9a1 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-06-24-18-55-58.bpo-30596.VhB8iG.rst @@ -0,0 +1 @@ +Add a ``close()`` method to ``multiprocessing.Process``. -- cgit v0.12