diff options
Diffstat (limited to 'Lib/multiprocessing/forking.py')
| -rw-r--r-- | Lib/multiprocessing/forking.py | 106 |
1 files changed, 46 insertions, 60 deletions
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index 4e24d6a..15fdb0e 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -55,18 +55,18 @@ def assert_spawning(self): # Try making some callable types picklable # -from pickle import _Pickler as Pickler +from pickle import Pickler +from copyreg import dispatch_table + class ForkingPickler(Pickler): - dispatch = Pickler.dispatch.copy() + _extra_reducers = {} + def __init__(self, *args): + Pickler.__init__(self, *args) + self.dispatch_table = dispatch_table.copy() + self.dispatch_table.update(self._extra_reducers) @classmethod def register(cls, type, reduce): - def dispatcher(self, obj): - rv = reduce(obj) - if isinstance(rv, str): - self.save_global(obj, rv) - else: - self.save_reduce(obj=obj, *rv) - cls.dispatch[type] = dispatcher + cls._extra_reducers[type] = reduce def _reduce_method(m): if m.__self__ is None: @@ -100,11 +100,12 @@ else: # if sys.platform != 'win32': - import time + import select exit = os._exit duplicate = os.dup close = os.close + _select = util._eintr_retry(select.select) # # We define a Popen class similar to the one from subprocess, but @@ -118,14 +119,23 @@ if sys.platform != 'win32': sys.stderr.flush() self.returncode = None + r, w = os.pipe() + self.sentinel = r + self.pid = os.fork() if self.pid == 0: + os.close(r) if 'random' in sys.modules: import random random.seed() code = process_obj._bootstrap() os._exit(code) + # `w` will be closed when the child exits, at which point `r` + # will become ready for reading (using e.g. select()). + os.close(w) + util.Finalize(self, os.close, (r,)) + def poll(self, flag=os.WNOHANG): if self.returncode is None: try: @@ -143,26 +153,20 @@ if sys.platform != 'win32': return self.returncode def wait(self, timeout=None): - if timeout is None: - return self.poll(0) - deadline = time.time() + timeout - delay = 0.0005 - while 1: - res = self.poll() - if res is not None: - break - remaining = deadline - time.time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, 0.05) - time.sleep(delay) - return res + if self.returncode is None: + if timeout is not None: + r = _select([self.sentinel], [], [], timeout)[0] + if not r: + return None + # This shouldn't block if select() returned successfully. + return self.poll(os.WNOHANG if timeout == 0.0 else 0) + return self.returncode def terminate(self): if self.returncode is None: try: os.kill(self.pid, signal.SIGTERM) - except OSError as e: + except OSError: if self.wait(timeout=0.1) is None: raise @@ -177,12 +181,9 @@ if sys.platform != 'win32': else: import _thread import msvcrt - import _subprocess - import time + import _winapi - from pickle import dump, load, HIGHEST_PROTOCOL - from _multiprocessing import win32, Connection, PipeConnection - from .util import Finalize + from pickle import load, HIGHEST_PROTOCOL def dump(obj, file, protocol=None): ForkingPickler(file, protocol).dump(obj) @@ -195,8 +196,8 @@ else: WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False)) WINSERVICE = sys.executable.lower().endswith("pythonservice.exe") - exit = win32.ExitProcess - close = win32.CloseHandle + exit = _winapi.ExitProcess + close = _winapi.CloseHandle # # _python_exe is the assumed path to the python executable. @@ -218,11 +219,11 @@ else: def duplicate(handle, target_process=None, inheritable=False): if target_process is None: - target_process = _subprocess.GetCurrentProcess() - return _subprocess.DuplicateHandle( - _subprocess.GetCurrentProcess(), handle, target_process, - 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS - ).Detach() + target_process = _winapi.GetCurrentProcess() + return _winapi.DuplicateHandle( + _winapi.GetCurrentProcess(), handle, target_process, + 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS + ) # # We define a Popen class similar to the one from subprocess, but @@ -246,16 +247,17 @@ else: # start process cmd = get_command_line() + [rhandle] cmd = ' '.join('"%s"' % x for x in cmd) - hp, ht, pid, tid = _subprocess.CreateProcess( + hp, ht, pid, tid = _winapi.CreateProcess( _python_exe, cmd, None, None, 1, 0, None, None, None ) - ht.Close() + _winapi.CloseHandle(ht) close(rhandle) # set attributes of self self.pid = pid self.returncode = None self._handle = hp + self.sentinel = int(hp) # send information to child prep_data = get_preparation_data(process_obj._name) @@ -279,13 +281,13 @@ else: def wait(self, timeout=None): if self.returncode is None: if timeout is None: - msecs = _subprocess.INFINITE + msecs = _winapi.INFINITE else: msecs = max(0, int(timeout * 1000 + 0.5)) - res = _subprocess.WaitForSingleObject(int(self._handle), msecs) - if res == _subprocess.WAIT_OBJECT_0: - code = _subprocess.GetExitCodeProcess(self._handle) + res = _winapi.WaitForSingleObject(int(self._handle), msecs) + if res == _winapi.WAIT_OBJECT_0: + code = _winapi.GetExitCodeProcess(self._handle) if code == TERMINATE: code = -signal.SIGTERM self.returncode = code @@ -298,7 +300,7 @@ else: def terminate(self): if self.returncode is None: try: - _subprocess.TerminateProcess(int(self._handle), TERMINATE) + _winapi.TerminateProcess(int(self._handle), TERMINATE) except WindowsError: if self.wait(timeout=0.1) is None: raise @@ -405,22 +407,6 @@ else: return d - # - # Make (Pipe)Connection picklable - # - - def reduce_connection(conn): - if not Popen.thread_is_spawning(): - raise RuntimeError( - 'By default %s objects can only be shared between processes\n' - 'using inheritance' % type(conn).__name__ - ) - return type(conn), (Popen.duplicate_for_child(conn.fileno()), - conn.readable, conn.writable) - - ForkingPickler.register(Connection, reduce_connection) - ForkingPickler.register(PipeConnection, reduce_connection) - # # Prepare current process # |
