diff options
Diffstat (limited to 'Lib/multiprocessing/forking.py')
-rw-r--r-- | Lib/multiprocessing/forking.py | 135 |
1 files changed, 47 insertions, 88 deletions
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index 4e24d6a..eadc321 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -4,32 +4,7 @@ # multiprocessing/forking.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # import os @@ -55,18 +30,18 @@ def assert_spawning(self): # Try making some callable types picklable # -from pickle import _Pickler as Pickler +from pickle import Pickler +from copyreg import dispatch_table + class ForkingPickler(Pickler): - dispatch = Pickler.dispatch.copy() + _extra_reducers = {} + def __init__(self, *args): + Pickler.__init__(self, *args) + self.dispatch_table = dispatch_table.copy() + self.dispatch_table.update(self._extra_reducers) @classmethod def register(cls, type, reduce): - def dispatcher(self, obj): - rv = reduce(obj) - if isinstance(rv, str): - self.save_global(obj, rv) - else: - self.save_reduce(obj=obj, *rv) - cls.dispatch[type] = dispatcher + cls._extra_reducers[type] = reduce def _reduce_method(m): if m.__self__ is None: @@ -100,8 +75,6 @@ else: # if sys.platform != 'win32': - import time - exit = os._exit duplicate = os.dup close = os.close @@ -118,14 +91,23 @@ if sys.platform != 'win32': sys.stderr.flush() self.returncode = None + r, w = os.pipe() + self.sentinel = r + self.pid = os.fork() if self.pid == 0: + os.close(r) if 'random' in sys.modules: import random random.seed() code = process_obj._bootstrap() os._exit(code) + # `w` will be closed when the child exits, at which point `r` + # will become ready for reading (using e.g. select()). + os.close(w) + util.Finalize(self, os.close, (r,)) + def poll(self, flag=os.WNOHANG): if self.returncode is None: try: @@ -143,26 +125,20 @@ if sys.platform != 'win32': return self.returncode def wait(self, timeout=None): - if timeout is None: - return self.poll(0) - deadline = time.time() + timeout - delay = 0.0005 - while 1: - res = self.poll() - if res is not None: - break - remaining = deadline - time.time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, 0.05) - time.sleep(delay) - return res + if self.returncode is None: + if timeout is not None: + from .connection import wait + if not wait([self.sentinel], timeout): + return None + # This shouldn't block if wait() returned successfully. + return self.poll(os.WNOHANG if timeout == 0.0 else 0) + return self.returncode def terminate(self): if self.returncode is None: try: os.kill(self.pid, signal.SIGTERM) - except OSError as e: + except OSError: if self.wait(timeout=0.1) is None: raise @@ -177,12 +153,9 @@ if sys.platform != 'win32': else: import _thread import msvcrt - import _subprocess - import time + import _winapi - from pickle import dump, load, HIGHEST_PROTOCOL - from _multiprocessing import win32, Connection, PipeConnection - from .util import Finalize + from pickle import load, HIGHEST_PROTOCOL def dump(obj, file, protocol=None): ForkingPickler(file, protocol).dump(obj) @@ -195,8 +168,8 @@ else: WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False)) WINSERVICE = sys.executable.lower().endswith("pythonservice.exe") - exit = win32.ExitProcess - close = win32.CloseHandle + exit = _winapi.ExitProcess + close = _winapi.CloseHandle # # _python_exe is the assumed path to the python executable. @@ -218,11 +191,11 @@ else: def duplicate(handle, target_process=None, inheritable=False): if target_process is None: - target_process = _subprocess.GetCurrentProcess() - return _subprocess.DuplicateHandle( - _subprocess.GetCurrentProcess(), handle, target_process, - 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS - ).Detach() + target_process = _winapi.GetCurrentProcess() + return _winapi.DuplicateHandle( + _winapi.GetCurrentProcess(), handle, target_process, + 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS + ) # # We define a Popen class similar to the one from subprocess, but @@ -246,16 +219,17 @@ else: # start process cmd = get_command_line() + [rhandle] cmd = ' '.join('"%s"' % x for x in cmd) - hp, ht, pid, tid = _subprocess.CreateProcess( + hp, ht, pid, tid = _winapi.CreateProcess( _python_exe, cmd, None, None, 1, 0, None, None, None ) - ht.Close() + _winapi.CloseHandle(ht) close(rhandle) # set attributes of self self.pid = pid self.returncode = None self._handle = hp + self.sentinel = int(hp) # send information to child prep_data = get_preparation_data(process_obj._name) @@ -279,13 +253,13 @@ else: def wait(self, timeout=None): if self.returncode is None: if timeout is None: - msecs = _subprocess.INFINITE + msecs = _winapi.INFINITE else: msecs = max(0, int(timeout * 1000 + 0.5)) - res = _subprocess.WaitForSingleObject(int(self._handle), msecs) - if res == _subprocess.WAIT_OBJECT_0: - code = _subprocess.GetExitCodeProcess(self._handle) + res = _winapi.WaitForSingleObject(int(self._handle), msecs) + if res == _winapi.WAIT_OBJECT_0: + code = _winapi.GetExitCodeProcess(self._handle) if code == TERMINATE: code = -signal.SIGTERM self.returncode = code @@ -298,7 +272,7 @@ else: def terminate(self): if self.returncode is None: try: - _subprocess.TerminateProcess(int(self._handle), TERMINATE) + _winapi.TerminateProcess(int(self._handle), TERMINATE) except WindowsError: if self.wait(timeout=0.1) is None: raise @@ -350,7 +324,8 @@ else: return [sys.executable, '--multiprocessing-fork'] else: prog = 'from multiprocessing.forking import main; main()' - return [_python_exe, '-c', prog, '--multiprocessing-fork'] + opts = util._args_from_interpreter_flags() + return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork'] def main(): @@ -405,22 +380,6 @@ else: return d - # - # Make (Pipe)Connection picklable - # - - def reduce_connection(conn): - if not Popen.thread_is_spawning(): - raise RuntimeError( - 'By default %s objects can only be shared between processes\n' - 'using inheritance' % type(conn).__name__ - ) - return type(conn), (Popen.duplicate_for_child(conn.fileno()), - conn.readable, conn.writable) - - ForkingPickler.register(Connection, reduce_connection) - ForkingPickler.register(PipeConnection, reduce_connection) - # # Prepare current process # |