diff options
Diffstat (limited to 'Lib/multiprocessing/forking.py')
| -rw-r--r-- | Lib/multiprocessing/forking.py | 189 | 
1 files changed, 74 insertions, 115 deletions
| diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index bc8ac44..af6580d 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -4,32 +4,7 @@  # multiprocessing/forking.py  #  # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -#    notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -#    notice, this list of conditions and the following disclaimer in the -#    documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -#    used to endorse or promote products derived from this software -#    without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement.  #  import os @@ -38,7 +13,7 @@ import signal  from multiprocessing import util, process -__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler'] +__all__ = ['Popen', 'assert_spawning', 'duplicate', 'close', 'ForkingPickler']  #  # Check that the current thread is spawning a child process @@ -55,18 +30,18 @@ def assert_spawning(self):  # Try making some callable types picklable  # -from pickle import _Pickler as Pickler +from pickle import Pickler +from copyreg import dispatch_table +  class ForkingPickler(Pickler): -    dispatch = Pickler.dispatch.copy() +    _extra_reducers = {} +    def __init__(self, *args): +        Pickler.__init__(self, *args) +        self.dispatch_table = dispatch_table.copy() +        self.dispatch_table.update(self._extra_reducers)      @classmethod      def register(cls, type, reduce): -        def dispatcher(self, obj): -            rv = reduce(obj) -            if isinstance(rv, str): -                self.save_global(obj, rv) -            else: -                self.save_reduce(obj=obj, *rv) -        cls.dispatch[type] = dispatcher +        cls._extra_reducers[type] = reduce  def _reduce_method(m):      if m.__self__ is None: @@ -100,9 +75,6 @@ else:  #  if sys.platform != 'win32': -    import time - -    exit = os._exit      duplicate = os.dup      close = os.close @@ -118,14 +90,23 @@ if sys.platform != 'win32':              sys.stderr.flush()              self.returncode = None +            r, w = os.pipe() +            self.sentinel = r +              self.pid = os.fork()              if self.pid == 0: +                os.close(r)                  if 'random' in sys.modules:                      import random                      random.seed()                  code = process_obj._bootstrap()                  os._exit(code) +            # `w` will be closed when the child exits, at which point `r` +            # will become ready for reading (using e.g. select()). +            os.close(w) +            util.Finalize(self, os.close, (r,)) +          def poll(self, flag=os.WNOHANG):              if self.returncode is None:                  try: @@ -143,26 +124,20 @@ if sys.platform != 'win32':              return self.returncode          def wait(self, timeout=None): -            if timeout is None: -                return self.poll(0) -            deadline = time.time() + timeout -            delay = 0.0005 -            while 1: -                res = self.poll() -                if res is not None: -                    break -                remaining = deadline - time.time() -                if remaining <= 0: -                    break -                delay = min(delay * 2, remaining, 0.05) -                time.sleep(delay) -            return res +            if self.returncode is None: +                if timeout is not None: +                    from .connection import wait +                    if not wait([self.sentinel], timeout): +                        return None +                # This shouldn't block if wait() returned successfully. +                return self.poll(os.WNOHANG if timeout == 0.0 else 0) +            return self.returncode          def terminate(self):              if self.returncode is None:                  try:                      os.kill(self.pid, signal.SIGTERM) -                except OSError as e: +                except OSError:                      if self.wait(timeout=0.1) is None:                          raise @@ -177,12 +152,9 @@ if sys.platform != 'win32':  else:      import _thread      import msvcrt -    import _subprocess -    import time +    import _winapi -    from pickle import dump, load, HIGHEST_PROTOCOL -    from _multiprocessing import win32, Connection, PipeConnection -    from .util import Finalize +    from pickle import load, HIGHEST_PROTOCOL      def dump(obj, file, protocol=None):          ForkingPickler(file, protocol).dump(obj) @@ -195,8 +167,7 @@ else:      WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))      WINSERVICE = sys.executable.lower().endswith("pythonservice.exe") -    exit = win32.ExitProcess -    close = win32.CloseHandle +    close = _winapi.CloseHandle      #      # _python_exe is the assumed path to the python executable. @@ -218,11 +189,11 @@ else:      def duplicate(handle, target_process=None, inheritable=False):          if target_process is None: -            target_process = _subprocess.GetCurrentProcess() -        return _subprocess.DuplicateHandle( -            _subprocess.GetCurrentProcess(), handle, target_process, -            0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS -            ).Detach() +            target_process = _winapi.GetCurrentProcess() +        return _winapi.DuplicateHandle( +            _winapi.GetCurrentProcess(), handle, target_process, +            0, inheritable, _winapi.DUPLICATE_SAME_ACCESS +            )      #      # We define a Popen class similar to the one from subprocess, but @@ -236,6 +207,9 @@ else:          _tls = _thread._local()          def __init__(self, process_obj): +            cmd = ' '.join('"%s"' % x for x in get_command_line()) +            prep_data = get_preparation_data(process_obj._name) +              # create pipe for communication with child              rfd, wfd = os.pipe() @@ -243,30 +217,30 @@ else:              rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)              os.close(rfd) -            # start process -            cmd = get_command_line() + [rhandle] -            cmd = ' '.join('"%s"' % x for x in cmd) -            hp, ht, pid, tid = _subprocess.CreateProcess( -                _python_exe, cmd, None, None, 1, 0, None, None, None -                ) -            ht.Close() -            close(rhandle) - -            # set attributes of self -            self.pid = pid -            self.returncode = None -            self._handle = hp - -            # send information to child -            prep_data = get_preparation_data(process_obj._name) -            to_child = os.fdopen(wfd, 'wb') -            Popen._tls.process_handle = int(hp) -            try: -                dump(prep_data, to_child, HIGHEST_PROTOCOL) -                dump(process_obj, to_child, HIGHEST_PROTOCOL) -            finally: -                del Popen._tls.process_handle -                to_child.close() +            with open(wfd, 'wb', closefd=True) as to_child: +                # start process +                try: +                    hp, ht, pid, tid = _winapi.CreateProcess( +                        _python_exe, cmd + (' %s' % rhandle), +                        None, None, 1, 0, None, None, None +                        ) +                    _winapi.CloseHandle(ht) +                finally: +                    close(rhandle) + +                # set attributes of self +                self.pid = pid +                self.returncode = None +                self._handle = hp +                self.sentinel = int(hp) + +                # send information to child +                Popen._tls.process_handle = int(hp) +                try: +                    dump(prep_data, to_child, HIGHEST_PROTOCOL) +                    dump(process_obj, to_child, HIGHEST_PROTOCOL) +                finally: +                    del Popen._tls.process_handle          @staticmethod          def thread_is_spawning(): @@ -279,13 +253,13 @@ else:          def wait(self, timeout=None):              if self.returncode is None:                  if timeout is None: -                    msecs = _subprocess.INFINITE +                    msecs = _winapi.INFINITE                  else:                      msecs = max(0, int(timeout * 1000 + 0.5)) -                res = _subprocess.WaitForSingleObject(int(self._handle), msecs) -                if res == _subprocess.WAIT_OBJECT_0: -                    code = _subprocess.GetExitCodeProcess(self._handle) +                res = _winapi.WaitForSingleObject(int(self._handle), msecs) +                if res == _winapi.WAIT_OBJECT_0: +                    code = _winapi.GetExitCodeProcess(self._handle)                      if code == TERMINATE:                          code = -signal.SIGTERM                      self.returncode = code @@ -298,9 +272,9 @@ else:          def terminate(self):              if self.returncode is None:                  try: -                    _subprocess.TerminateProcess(int(self._handle), TERMINATE) -                except WindowsError: -                    if self.wait(timeout=0.1) is None: +                    _winapi.TerminateProcess(int(self._handle), TERMINATE) +                except OSError: +                    if self.wait(timeout=1.0) is None:                          raise      # @@ -350,7 +324,8 @@ else:              return [sys.executable, '--multiprocessing-fork']          else:              prog = 'from multiprocessing.forking import main; main()' -            return [_python_exe, '-c', prog, '--multiprocessing-fork'] +            opts = util._args_from_interpreter_flags() +            return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']      def main(): @@ -372,7 +347,7 @@ else:          from_parent.close()          exitcode = self._bootstrap() -        exit(exitcode) +        sys.exit(exitcode)      def get_preparation_data(name): @@ -405,22 +380,6 @@ else:          return d -    # -    # Make (Pipe)Connection picklable -    # - -    def reduce_connection(conn): -        if not Popen.thread_is_spawning(): -            raise RuntimeError( -                'By default %s objects can only be shared between processes\n' -                'using inheritance' % type(conn).__name__ -                ) -        return type(conn), (Popen.duplicate_for_child(conn.fileno()), -                            conn.readable, conn.writable) - -    ForkingPickler.register(Connection, reduce_connection) -    ForkingPickler.register(PipeConnection, reduce_connection) -  #  # Prepare current process  # | 
