summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/forking.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing/forking.py')
-rw-r--r--Lib/multiprocessing/forking.py106
1 files changed, 46 insertions, 60 deletions
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
index 4e24d6a..15fdb0e 100644
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -55,18 +55,18 @@ def assert_spawning(self):
# Try making some callable types picklable
#
-from pickle import _Pickler as Pickler
+from pickle import Pickler
+from copyreg import dispatch_table
+
class ForkingPickler(Pickler):
- dispatch = Pickler.dispatch.copy()
+ _extra_reducers = {}
+ def __init__(self, *args):
+ Pickler.__init__(self, *args)
+ self.dispatch_table = dispatch_table.copy()
+ self.dispatch_table.update(self._extra_reducers)
@classmethod
def register(cls, type, reduce):
- def dispatcher(self, obj):
- rv = reduce(obj)
- if isinstance(rv, str):
- self.save_global(obj, rv)
- else:
- self.save_reduce(obj=obj, *rv)
- cls.dispatch[type] = dispatcher
+ cls._extra_reducers[type] = reduce
def _reduce_method(m):
if m.__self__ is None:
@@ -100,11 +100,12 @@ else:
#
if sys.platform != 'win32':
- import time
+ import select
exit = os._exit
duplicate = os.dup
close = os.close
+ _select = util._eintr_retry(select.select)
#
# We define a Popen class similar to the one from subprocess, but
@@ -118,14 +119,23 @@ if sys.platform != 'win32':
sys.stderr.flush()
self.returncode = None
+ r, w = os.pipe()
+ self.sentinel = r
+
self.pid = os.fork()
if self.pid == 0:
+ os.close(r)
if 'random' in sys.modules:
import random
random.seed()
code = process_obj._bootstrap()
os._exit(code)
+ # `w` will be closed when the child exits, at which point `r`
+ # will become ready for reading (using e.g. select()).
+ os.close(w)
+ util.Finalize(self, os.close, (r,))
+
def poll(self, flag=os.WNOHANG):
if self.returncode is None:
try:
@@ -143,26 +153,20 @@ if sys.platform != 'win32':
return self.returncode
def wait(self, timeout=None):
- if timeout is None:
- return self.poll(0)
- deadline = time.time() + timeout
- delay = 0.0005
- while 1:
- res = self.poll()
- if res is not None:
- break
- remaining = deadline - time.time()
- if remaining <= 0:
- break
- delay = min(delay * 2, remaining, 0.05)
- time.sleep(delay)
- return res
+ if self.returncode is None:
+ if timeout is not None:
+ r = _select([self.sentinel], [], [], timeout)[0]
+ if not r:
+ return None
+ # This shouldn't block if select() returned successfully.
+ return self.poll(os.WNOHANG if timeout == 0.0 else 0)
+ return self.returncode
def terminate(self):
if self.returncode is None:
try:
os.kill(self.pid, signal.SIGTERM)
- except OSError as e:
+ except OSError:
if self.wait(timeout=0.1) is None:
raise
@@ -177,12 +181,9 @@ if sys.platform != 'win32':
else:
import _thread
import msvcrt
- import _subprocess
- import time
+ import _winapi
- from pickle import dump, load, HIGHEST_PROTOCOL
- from _multiprocessing import win32, Connection, PipeConnection
- from .util import Finalize
+ from pickle import load, HIGHEST_PROTOCOL
def dump(obj, file, protocol=None):
ForkingPickler(file, protocol).dump(obj)
@@ -195,8 +196,8 @@ else:
WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
- exit = win32.ExitProcess
- close = win32.CloseHandle
+ exit = _winapi.ExitProcess
+ close = _winapi.CloseHandle
#
# _python_exe is the assumed path to the python executable.
@@ -218,11 +219,11 @@ else:
def duplicate(handle, target_process=None, inheritable=False):
if target_process is None:
- target_process = _subprocess.GetCurrentProcess()
- return _subprocess.DuplicateHandle(
- _subprocess.GetCurrentProcess(), handle, target_process,
- 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
- ).Detach()
+ target_process = _winapi.GetCurrentProcess()
+ return _winapi.DuplicateHandle(
+ _winapi.GetCurrentProcess(), handle, target_process,
+ 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS
+ )
#
# We define a Popen class similar to the one from subprocess, but
@@ -246,16 +247,17 @@ else:
# start process
cmd = get_command_line() + [rhandle]
cmd = ' '.join('"%s"' % x for x in cmd)
- hp, ht, pid, tid = _subprocess.CreateProcess(
+ hp, ht, pid, tid = _winapi.CreateProcess(
_python_exe, cmd, None, None, 1, 0, None, None, None
)
- ht.Close()
+ _winapi.CloseHandle(ht)
close(rhandle)
# set attributes of self
self.pid = pid
self.returncode = None
self._handle = hp
+ self.sentinel = int(hp)
# send information to child
prep_data = get_preparation_data(process_obj._name)
@@ -279,13 +281,13 @@ else:
def wait(self, timeout=None):
if self.returncode is None:
if timeout is None:
- msecs = _subprocess.INFINITE
+ msecs = _winapi.INFINITE
else:
msecs = max(0, int(timeout * 1000 + 0.5))
- res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
- if res == _subprocess.WAIT_OBJECT_0:
- code = _subprocess.GetExitCodeProcess(self._handle)
+ res = _winapi.WaitForSingleObject(int(self._handle), msecs)
+ if res == _winapi.WAIT_OBJECT_0:
+ code = _winapi.GetExitCodeProcess(self._handle)
if code == TERMINATE:
code = -signal.SIGTERM
self.returncode = code
@@ -298,7 +300,7 @@ else:
def terminate(self):
if self.returncode is None:
try:
- _subprocess.TerminateProcess(int(self._handle), TERMINATE)
+ _winapi.TerminateProcess(int(self._handle), TERMINATE)
except WindowsError:
if self.wait(timeout=0.1) is None:
raise
@@ -405,22 +407,6 @@ else:
return d
- #
- # Make (Pipe)Connection picklable
- #
-
- def reduce_connection(conn):
- if not Popen.thread_is_spawning():
- raise RuntimeError(
- 'By default %s objects can only be shared between processes\n'
- 'using inheritance' % type(conn).__name__
- )
- return type(conn), (Popen.duplicate_for_child(conn.fileno()),
- conn.readable, conn.writable)
-
- ForkingPickler.register(Connection, reduce_connection)
- ForkingPickler.register(PipeConnection, reduce_connection)
-
#
# Prepare current process
#