diff options
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/context.py | 1 | ||||
-rw-r--r-- | Lib/multiprocessing/forkserver.py | 3 | ||||
-rw-r--r-- | Lib/multiprocessing/popen_fork.py | 8 | ||||
-rw-r--r-- | Lib/multiprocessing/popen_forkserver.py | 6 | ||||
-rw-r--r-- | Lib/multiprocessing/popen_spawn_posix.py | 10 | ||||
-rw-r--r-- | Lib/multiprocessing/process.py | 52 | ||||
-rw-r--r-- | Lib/multiprocessing/spawn.py | 19 | ||||
-rw-r--r-- | Lib/multiprocessing/util.py | 6 |
8 files changed, 85 insertions, 20 deletions
diff --git a/Lib/multiprocessing/context.py b/Lib/multiprocessing/context.py index 871746b..5a48657 100644 --- a/Lib/multiprocessing/context.py +++ b/Lib/multiprocessing/context.py @@ -35,6 +35,7 @@ class BaseContext(object): AuthenticationError = AuthenticationError current_process = staticmethod(process.current_process) + parent_process = staticmethod(process.parent_process) active_children = staticmethod(process.active_children) def cpu_count(self): diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py index dabf7bc..9b63986 100644 --- a/Lib/multiprocessing/forkserver.py +++ b/Lib/multiprocessing/forkserver.py @@ -294,7 +294,8 @@ def _serve_one(child_r, fds, unused_fds, handlers): *_forkserver._inherited_fds) = fds # Run process object received over pipe - code = spawn._main(child_r) + parent_sentinel = os.dup(child_r) + code = spawn._main(child_r, parent_sentinel) return code diff --git a/Lib/multiprocessing/popen_fork.py b/Lib/multiprocessing/popen_fork.py index 685e8da..11e2160 100644 --- a/Lib/multiprocessing/popen_fork.py +++ b/Lib/multiprocessing/popen_fork.py @@ -66,16 +66,20 @@ class Popen(object): def _launch(self, process_obj): code = 1 parent_r, child_w = os.pipe() + child_r, parent_w = os.pipe() self.pid = os.fork() if self.pid == 0: try: os.close(parent_r) - code = process_obj._bootstrap() + os.close(parent_w) + code = process_obj._bootstrap(parent_sentinel=child_r) finally: os._exit(code) else: os.close(child_w) - self.finalizer = util.Finalize(self, os.close, (parent_r,)) + os.close(child_r) + self.finalizer = util.Finalize(self, util.close_fds, + (parent_r, parent_w,)) self.sentinel = parent_r def close(self): diff --git a/Lib/multiprocessing/popen_forkserver.py b/Lib/multiprocessing/popen_forkserver.py index a51a277..a56eb9b 100644 --- a/Lib/multiprocessing/popen_forkserver.py +++ b/Lib/multiprocessing/popen_forkserver.py @@ -49,7 +49,11 @@ class Popen(popen_fork.Popen): set_spawning_popen(None) self.sentinel, w = forkserver.connect_to_new_process(self._fds) - self.finalizer = util.Finalize(self, os.close, (self.sentinel,)) + # Keep a duplicate of the data pipe's write end as a sentinel of the + # parent process used by the child process. + _parent_w = os.dup(w) + self.finalizer = util.Finalize(self, util.close_fds, + (_parent_w, self.sentinel)) with open(w, 'wb', closefd=True) as f: f.write(buf.getbuffer()) self.pid = forkserver.read_signed(self.sentinel) diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py index 59f8e45..24b8634 100644 --- a/Lib/multiprocessing/popen_spawn_posix.py +++ b/Lib/multiprocessing/popen_spawn_posix.py @@ -61,8 +61,12 @@ class Popen(popen_fork.Popen): with open(parent_w, 'wb', closefd=False) as f: f.write(fp.getbuffer()) finally: - if parent_r is not None: - self.finalizer = util.Finalize(self, os.close, (parent_r,)) - for fd in (child_r, child_w, parent_w): + fds_to_close = [] + for fd in (parent_r, parent_w): + if fd is not None: + fds_to_close.append(fd) + self.finalizer = util.Finalize(self, util.close_fds, fds_to_close) + + for fd in (child_r, child_w): if fd is not None: os.close(fd) diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index 780f2d0..c62c826 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -7,7 +7,8 @@ # Licensed to PSF under a Contributor Agreement. # -__all__ = ['BaseProcess', 'current_process', 'active_children'] +__all__ = ['BaseProcess', 'current_process', 'active_children', + 'parent_process'] # # Imports @@ -46,6 +47,13 @@ def active_children(): _cleanup() return list(_children) + +def parent_process(): + ''' + Return process object representing the parent process + ''' + return _parent_process + # # # @@ -76,6 +84,7 @@ class BaseProcess(object): self._identity = _current_process._identity + (count,) self._config = _current_process._config.copy() self._parent_pid = os.getpid() + self._parent_name = _current_process.name self._popen = None self._closed = False self._target = target @@ -278,9 +287,9 @@ class BaseProcess(object): ## - def _bootstrap(self): + def _bootstrap(self, parent_sentinel=None): from . import util, context - global _current_process, _process_counter, _children + global _current_process, _parent_process, _process_counter, _children try: if self._start_method is not None: @@ -290,6 +299,8 @@ class BaseProcess(object): util._close_stdin() old_process = _current_process _current_process = self + _parent_process = _ParentProcess( + self._parent_name, self._parent_pid, parent_sentinel) try: util._finalizer_registry.clear() util._run_after_forkers() @@ -337,6 +348,40 @@ class AuthenticationString(bytes): ) return AuthenticationString, (bytes(self),) + +# +# Create object representing the parent process +# + +class _ParentProcess(BaseProcess): + + def __init__(self, name, pid, sentinel): + self._identity = () + self._name = name + self._pid = pid + self._parent_pid = None + self._popen = None + self._closed = False + self._sentinel = sentinel + self._config = {} + + def is_alive(self): + from multiprocessing.connection import wait + return not wait([self._sentinel], timeout=0) + + @property + def ident(self): + return self._pid + + def join(self, timeout=None): + ''' + Wait until parent process terminates + ''' + from multiprocessing.connection import wait + wait([self._sentinel], timeout=timeout) + + pid = ident + # # Create object representing the main process # @@ -365,6 +410,7 @@ class _MainProcess(BaseProcess): pass +_parent_process = None _current_process = _MainProcess() _process_counter = itertools.count(1) _children = set() diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py index f66b5aa..7cc129e 100644 --- a/Lib/multiprocessing/spawn.py +++ b/Lib/multiprocessing/spawn.py @@ -100,25 +100,24 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None): if parent_pid is not None: source_process = _winapi.OpenProcess( - _winapi.PROCESS_DUP_HANDLE, False, parent_pid) + _winapi.SYNCHRONIZE | _winapi.PROCESS_DUP_HANDLE, + False, parent_pid) else: source_process = None - try: - new_handle = reduction.duplicate(pipe_handle, - source_process=source_process) - finally: - if source_process is not None: - _winapi.CloseHandle(source_process) + new_handle = reduction.duplicate(pipe_handle, + source_process=source_process) fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY) + parent_sentinel = source_process else: from . import resource_tracker resource_tracker._resource_tracker._fd = tracker_fd fd = pipe_handle - exitcode = _main(fd) + parent_sentinel = os.dup(pipe_handle) + exitcode = _main(fd, parent_sentinel) sys.exit(exitcode) -def _main(fd): +def _main(fd, parent_sentinel): with os.fdopen(fd, 'rb', closefd=True) as from_parent: process.current_process()._inheriting = True try: @@ -127,7 +126,7 @@ def _main(fd): self = reduction.pickle.load(from_parent) finally: del process.current_process()._inheriting - return self._bootstrap() + return self._bootstrap(parent_sentinel) def _check_not_importing_main(): diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 0c4eb24..5674ad7 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -421,3 +421,9 @@ def spawnv_passfds(path, args, passfds): finally: os.close(errpipe_read) os.close(errpipe_write) + + +def close_fds(*fds): + """Close each file descriptor given as an argument""" + for fd in fds: + os.close(fd) |