summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
authorRichard Oudkerk <shibturn@gmail.com>2013-08-22 10:38:57 (GMT)
committerRichard Oudkerk <shibturn@gmail.com>2013-08-22 10:38:57 (GMT)
commit7d2d43c0b15b8062c9b5d672a78e653abe2e1d91 (patch)
treeabc0049e1078fafc3c66f0463b8e9f70e2004d0e /Lib/multiprocessing
parent0718f70131e4bc3756f453eb2f0c9ef8ed2fa843 (diff)
downloadcpython-7d2d43c0b15b8062c9b5d672a78e653abe2e1d91.zip
cpython-7d2d43c0b15b8062c9b5d672a78e653abe2e1d91.tar.gz
cpython-7d2d43c0b15b8062c9b5d672a78e653abe2e1d91.tar.bz2
Stop making fork server have copy of semaphore_tracker_fd.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/forkserver.py13
-rw-r--r--Lib/multiprocessing/popen_spawn_posix.py6
-rw-r--r--Lib/multiprocessing/popen_spawn_win32.py5
-rw-r--r--Lib/multiprocessing/semaphore_tracker.py14
-rw-r--r--Lib/multiprocessing/spawn.py15
5 files changed, 29 insertions, 24 deletions
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
index c0ac993..208bd4e 100644
--- a/Lib/multiprocessing/forkserver.py
+++ b/Lib/multiprocessing/forkserver.py
@@ -10,6 +10,7 @@ import threading
from . import connection
from . import process
from . import reduction
+from . import semaphore_tracker
from . import spawn
from . import util
@@ -55,13 +56,14 @@ def connect_to_new_process(fds):
The calling process should write to data_w the pickled preparation and
process data.
'''
- if len(fds) + 3 >= MAXFDS_TO_SEND:
+ if len(fds) + 4 >= MAXFDS_TO_SEND:
raise ValueError('too many fds')
with socket.socket(socket.AF_UNIX) as client:
client.connect(_forkserver_address)
parent_r, child_w = util.pipe()
child_r, parent_w = util.pipe()
- allfds = [child_r, child_w, _forkserver_alive_fd]
+ allfds = [child_r, child_w, _forkserver_alive_fd,
+ semaphore_tracker._semaphore_tracker_fd]
allfds += fds
try:
reduction.sendfds(client, allfds)
@@ -88,8 +90,6 @@ def ensure_running():
return
assert all(type(mod) is str for mod in _preload_modules)
- config = process.current_process()._config
- semaphore_tracker_fd = config['semaphore_tracker_fd']
cmd = ('from multiprocessing.forkserver import main; ' +
'main(%d, %d, %r, **%r)')
@@ -110,7 +110,7 @@ def ensure_running():
# when they all terminate the read end becomes ready.
alive_r, alive_w = util.pipe()
try:
- fds_to_pass = [listener.fileno(), alive_r, semaphore_tracker_fd]
+ fds_to_pass = [listener.fileno(), alive_r]
cmd %= (listener.fileno(), alive_r, _preload_modules, data)
exe = spawn.get_executable()
args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd]
@@ -197,7 +197,8 @@ def _serve_one(s, listener, alive_r, handler):
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
s.close()
assert len(fds) <= MAXFDS_TO_SEND
- child_r, child_w, _forkserver_alive_fd, *_inherited_fds = fds
+ child_r, child_w, _forkserver_alive_fd, stfd, *_inherited_fds = fds
+ semaphore_tracker._semaphore_tracker_fd = stfd
# send pid to client processes
write_unsigned(child_w, os.getpid())
diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py
index de262aa..e67915d 100644
--- a/Lib/multiprocessing/popen_spawn_posix.py
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -40,7 +40,8 @@ class Popen(popen_fork.Popen):
return fd
def _launch(self, process_obj):
- tracker_fd = current_process()._config['semaphore_tracker_fd']
+ from . import semaphore_tracker
+ tracker_fd = semaphore_tracker._semaphore_tracker_fd
self._fds.append(tracker_fd)
prep_data = spawn.get_preparation_data(process_obj._name)
fp = io.BytesIO()
@@ -55,7 +56,8 @@ class Popen(popen_fork.Popen):
try:
parent_r, child_w = util.pipe()
child_r, parent_w = util.pipe()
- cmd = spawn.get_command_line() + [str(child_r)]
+ cmd = spawn.get_command_line(tracker_fd=tracker_fd,
+ pipe_handle=child_r)
self._fds.extend([child_r, child_w])
self.pid = util.spawnv_passfds(spawn.get_executable(),
cmd, self._fds)
diff --git a/Lib/multiprocessing/popen_spawn_win32.py b/Lib/multiprocessing/popen_spawn_win32.py
index 7e0c4b3..f1e9aae 100644
--- a/Lib/multiprocessing/popen_spawn_win32.py
+++ b/Lib/multiprocessing/popen_spawn_win32.py
@@ -32,13 +32,14 @@ class Popen(object):
def __init__(self, process_obj):
prep_data = spawn.get_preparation_data(process_obj._name)
- cmd = ' '.join('"%s"' % x for x in spawn.get_command_line())
# read end of pipe will be "stolen" by the child process
# -- see spawn_main() in spawn.py.
rhandle, whandle = _winapi.CreatePipe(None, 0)
wfd = msvcrt.open_osfhandle(whandle, 0)
- cmd += ' {} {}'.format(os.getpid(), rhandle)
+ cmd = spawn.get_command_line(parent_pid=os.getpid(),
+ pipe_handle=rhandle)
+ cmd = ' '.join('"%s"' % x for x in cmd)
with open(wfd, 'wb', closefd=True) as to_child:
# start process
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py
index 4a2d636..99a0dd4 100644
--- a/Lib/multiprocessing/semaphore_tracker.py
+++ b/Lib/multiprocessing/semaphore_tracker.py
@@ -26,6 +26,7 @@ from . import current_process
__all__ = ['ensure_running', 'register', 'unregister']
+_semaphore_tracker_fd = None
_lock = threading.Lock()
@@ -34,9 +35,9 @@ def ensure_running():
This can be run from any process. Usually a child process will use
the semaphore created by its parent.'''
+ global _semaphore_tracker_fd
with _lock:
- config = current_process()._config
- if config.get('semaphore_tracker_fd') is not None:
+ if _semaphore_tracker_fd is not None:
return
fds_to_pass = []
try:
@@ -44,7 +45,7 @@ def ensure_running():
except Exception:
pass
cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)'
- r, semaphore_tracker_fd = util.pipe()
+ r, w = util.pipe()
try:
fds_to_pass.append(r)
# process will out live us, so no need to wait on pid
@@ -53,10 +54,10 @@ def ensure_running():
args += ['-c', cmd % r]
util.spawnv_passfds(exe, args, fds_to_pass)
except:
- os.close(semaphore_tracker_fd)
+ os.close(w)
raise
else:
- config['semaphore_tracker_fd'] = semaphore_tracker_fd
+ _semaphore_tracker_fd = w
finally:
os.close(r)
@@ -77,8 +78,7 @@ def _send(cmd, name):
# posix guarantees that writes to a pipe of less than PIPE_BUF
# bytes are atomic, and that PIPE_BUF >= 512
raise ValueError('name too long')
- fd = current_process()._config['semaphore_tracker_fd']
- nbytes = os.write(fd, msg)
+ nbytes = os.write(_semaphore_tracker_fd, msg)
assert nbytes == len(msg)
diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py
index 83561db..9c4acee 100644
--- a/Lib/multiprocessing/spawn.py
+++ b/Lib/multiprocessing/spawn.py
@@ -66,32 +66,33 @@ def freeze_support():
sys.exit()
-def get_command_line():
+def get_command_line(**kwds):
'''
Returns prefix of command line used for spawning a child process
'''
if getattr(sys, 'frozen', False):
return [sys.executable, '--multiprocessing-fork']
else:
- prog = 'from multiprocessing.spawn import spawn_main; spawn_main()'
+ prog = 'from multiprocessing.spawn import spawn_main; spawn_main(%s)'
+ prog %= ', '.join('%s=%r' % item for item in kwds.items())
opts = util._args_from_interpreter_flags()
return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
-def spawn_main():
+def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
'''
Run code specifed by data received over pipe
'''
assert is_forking(sys.argv)
- handle = int(sys.argv[-1])
if sys.platform == 'win32':
import msvcrt
from .reduction import steal_handle
- pid = int(sys.argv[-2])
- new_handle = steal_handle(pid, handle)
+ new_handle = steal_handle(parent_pid, pipe_handle)
fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
else:
- fd = handle
+ from . import semaphore_tracker
+ semaphore_tracker._semaphore_tracker_fd = tracker_fd
+ fd = pipe_handle
exitcode = _main(fd)
sys.exit(exitcode)