diff options
Diffstat (limited to 'Lib/multiprocessing/forking.py')
-rw-r--r-- | Lib/multiprocessing/forking.py | 189 |
1 files changed, 74 insertions, 115 deletions
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index bc8ac44..af6580d 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -4,32 +4,7 @@ # multiprocessing/forking.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # import os @@ -38,7 +13,7 @@ import signal from multiprocessing import util, process -__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler'] +__all__ = ['Popen', 'assert_spawning', 'duplicate', 'close', 'ForkingPickler'] # # Check that the current thread is spawning a child process @@ -55,18 +30,18 @@ def assert_spawning(self): # Try making some callable types picklable # -from pickle import _Pickler as Pickler +from pickle import Pickler +from copyreg import dispatch_table + class ForkingPickler(Pickler): - dispatch = Pickler.dispatch.copy() + _extra_reducers = {} + def __init__(self, *args): + Pickler.__init__(self, *args) + self.dispatch_table = dispatch_table.copy() + self.dispatch_table.update(self._extra_reducers) @classmethod def register(cls, type, reduce): - def dispatcher(self, obj): - rv = reduce(obj) - if isinstance(rv, str): - self.save_global(obj, rv) - else: - self.save_reduce(obj=obj, *rv) - cls.dispatch[type] = dispatcher + cls._extra_reducers[type] = reduce def _reduce_method(m): if m.__self__ is None: @@ -100,9 +75,6 @@ else: # if sys.platform != 'win32': - import time - - exit = os._exit duplicate = os.dup close = os.close @@ -118,14 +90,23 @@ if sys.platform != 'win32': sys.stderr.flush() self.returncode = None + r, w = os.pipe() + self.sentinel = r + self.pid = os.fork() if self.pid == 0: + os.close(r) if 'random' in sys.modules: import random random.seed() code = process_obj._bootstrap() os._exit(code) + # `w` will be closed when the child exits, at which point `r` + # will become ready for reading (using e.g. select()). + os.close(w) + util.Finalize(self, os.close, (r,)) + def poll(self, flag=os.WNOHANG): if self.returncode is None: try: @@ -143,26 +124,20 @@ if sys.platform != 'win32': return self.returncode def wait(self, timeout=None): - if timeout is None: - return self.poll(0) - deadline = time.time() + timeout - delay = 0.0005 - while 1: - res = self.poll() - if res is not None: - break - remaining = deadline - time.time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, 0.05) - time.sleep(delay) - return res + if self.returncode is None: + if timeout is not None: + from .connection import wait + if not wait([self.sentinel], timeout): + return None + # This shouldn't block if wait() returned successfully. + return self.poll(os.WNOHANG if timeout == 0.0 else 0) + return self.returncode def terminate(self): if self.returncode is None: try: os.kill(self.pid, signal.SIGTERM) - except OSError as e: + except OSError: if self.wait(timeout=0.1) is None: raise @@ -177,12 +152,9 @@ if sys.platform != 'win32': else: import _thread import msvcrt - import _subprocess - import time + import _winapi - from pickle import dump, load, HIGHEST_PROTOCOL - from _multiprocessing import win32, Connection, PipeConnection - from .util import Finalize + from pickle import load, HIGHEST_PROTOCOL def dump(obj, file, protocol=None): ForkingPickler(file, protocol).dump(obj) @@ -195,8 +167,7 @@ else: WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False)) WINSERVICE = sys.executable.lower().endswith("pythonservice.exe") - exit = win32.ExitProcess - close = win32.CloseHandle + close = _winapi.CloseHandle # # _python_exe is the assumed path to the python executable. @@ -218,11 +189,11 @@ else: def duplicate(handle, target_process=None, inheritable=False): if target_process is None: - target_process = _subprocess.GetCurrentProcess() - return _subprocess.DuplicateHandle( - _subprocess.GetCurrentProcess(), handle, target_process, - 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS - ).Detach() + target_process = _winapi.GetCurrentProcess() + return _winapi.DuplicateHandle( + _winapi.GetCurrentProcess(), handle, target_process, + 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS + ) # # We define a Popen class similar to the one from subprocess, but @@ -236,6 +207,9 @@ else: _tls = _thread._local() def __init__(self, process_obj): + cmd = ' '.join('"%s"' % x for x in get_command_line()) + prep_data = get_preparation_data(process_obj._name) + # create pipe for communication with child rfd, wfd = os.pipe() @@ -243,30 +217,30 @@ else: rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True) os.close(rfd) - # start process - cmd = get_command_line() + [rhandle] - cmd = ' '.join('"%s"' % x for x in cmd) - hp, ht, pid, tid = _subprocess.CreateProcess( - _python_exe, cmd, None, None, 1, 0, None, None, None - ) - ht.Close() - close(rhandle) - - # set attributes of self - self.pid = pid - self.returncode = None - self._handle = hp - - # send information to child - prep_data = get_preparation_data(process_obj._name) - to_child = os.fdopen(wfd, 'wb') - Popen._tls.process_handle = int(hp) - try: - dump(prep_data, to_child, HIGHEST_PROTOCOL) - dump(process_obj, to_child, HIGHEST_PROTOCOL) - finally: - del Popen._tls.process_handle - to_child.close() + with open(wfd, 'wb', closefd=True) as to_child: + # start process + try: + hp, ht, pid, tid = _winapi.CreateProcess( + _python_exe, cmd + (' %s' % rhandle), + None, None, 1, 0, None, None, None + ) + _winapi.CloseHandle(ht) + finally: + close(rhandle) + + # set attributes of self + self.pid = pid + self.returncode = None + self._handle = hp + self.sentinel = int(hp) + + # send information to child + Popen._tls.process_handle = int(hp) + try: + dump(prep_data, to_child, HIGHEST_PROTOCOL) + dump(process_obj, to_child, HIGHEST_PROTOCOL) + finally: + del Popen._tls.process_handle @staticmethod def thread_is_spawning(): @@ -279,13 +253,13 @@ else: def wait(self, timeout=None): if self.returncode is None: if timeout is None: - msecs = _subprocess.INFINITE + msecs = _winapi.INFINITE else: msecs = max(0, int(timeout * 1000 + 0.5)) - res = _subprocess.WaitForSingleObject(int(self._handle), msecs) - if res == _subprocess.WAIT_OBJECT_0: - code = _subprocess.GetExitCodeProcess(self._handle) + res = _winapi.WaitForSingleObject(int(self._handle), msecs) + if res == _winapi.WAIT_OBJECT_0: + code = _winapi.GetExitCodeProcess(self._handle) if code == TERMINATE: code = -signal.SIGTERM self.returncode = code @@ -298,9 +272,9 @@ else: def terminate(self): if self.returncode is None: try: - _subprocess.TerminateProcess(int(self._handle), TERMINATE) - except WindowsError: - if self.wait(timeout=0.1) is None: + _winapi.TerminateProcess(int(self._handle), TERMINATE) + except OSError: + if self.wait(timeout=1.0) is None: raise # @@ -350,7 +324,8 @@ else: return [sys.executable, '--multiprocessing-fork'] else: prog = 'from multiprocessing.forking import main; main()' - return [_python_exe, '-c', prog, '--multiprocessing-fork'] + opts = util._args_from_interpreter_flags() + return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork'] def main(): @@ -372,7 +347,7 @@ else: from_parent.close() exitcode = self._bootstrap() - exit(exitcode) + sys.exit(exitcode) def get_preparation_data(name): @@ -405,22 +380,6 @@ else: return d - # - # Make (Pipe)Connection picklable - # - - def reduce_connection(conn): - if not Popen.thread_is_spawning(): - raise RuntimeError( - 'By default %s objects can only be shared between processes\n' - 'using inheritance' % type(conn).__name__ - ) - return type(conn), (Popen.duplicate_for_child(conn.fileno()), - conn.readable, conn.writable) - - ForkingPickler.register(Connection, reduce_connection) - ForkingPickler.register(PipeConnection, reduce_connection) - # # Prepare current process # |