diff options
author | Richard Oudkerk <shibturn@gmail.com> | 2013-08-22 10:38:57 (GMT) |
---|---|---|
committer | Richard Oudkerk <shibturn@gmail.com> | 2013-08-22 10:38:57 (GMT) |
commit | 7d2d43c0b15b8062c9b5d672a78e653abe2e1d91 (patch) | |
tree | abc0049e1078fafc3c66f0463b8e9f70e2004d0e /Lib/multiprocessing | |
parent | 0718f70131e4bc3756f453eb2f0c9ef8ed2fa843 (diff) | |
download | cpython-7d2d43c0b15b8062c9b5d672a78e653abe2e1d91.zip cpython-7d2d43c0b15b8062c9b5d672a78e653abe2e1d91.tar.gz cpython-7d2d43c0b15b8062c9b5d672a78e653abe2e1d91.tar.bz2 |
Stop making fork server have copy of semaphore_tracker_fd.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/forkserver.py | 13 | ||||
-rw-r--r-- | Lib/multiprocessing/popen_spawn_posix.py | 6 | ||||
-rw-r--r-- | Lib/multiprocessing/popen_spawn_win32.py | 5 | ||||
-rw-r--r-- | Lib/multiprocessing/semaphore_tracker.py | 14 | ||||
-rw-r--r-- | Lib/multiprocessing/spawn.py | 15 |
5 files changed, 29 insertions, 24 deletions
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py index c0ac993..208bd4e 100644 --- a/Lib/multiprocessing/forkserver.py +++ b/Lib/multiprocessing/forkserver.py @@ -10,6 +10,7 @@ import threading from . import connection from . import process from . import reduction +from . import semaphore_tracker from . import spawn from . import util @@ -55,13 +56,14 @@ def connect_to_new_process(fds): The calling process should write to data_w the pickled preparation and process data. ''' - if len(fds) + 3 >= MAXFDS_TO_SEND: + if len(fds) + 4 >= MAXFDS_TO_SEND: raise ValueError('too many fds') with socket.socket(socket.AF_UNIX) as client: client.connect(_forkserver_address) parent_r, child_w = util.pipe() child_r, parent_w = util.pipe() - allfds = [child_r, child_w, _forkserver_alive_fd] + allfds = [child_r, child_w, _forkserver_alive_fd, + semaphore_tracker._semaphore_tracker_fd] allfds += fds try: reduction.sendfds(client, allfds) @@ -88,8 +90,6 @@ def ensure_running(): return assert all(type(mod) is str for mod in _preload_modules) - config = process.current_process()._config - semaphore_tracker_fd = config['semaphore_tracker_fd'] cmd = ('from multiprocessing.forkserver import main; ' + 'main(%d, %d, %r, **%r)') @@ -110,7 +110,7 @@ def ensure_running(): # when they all terminate the read end becomes ready. alive_r, alive_w = util.pipe() try: - fds_to_pass = [listener.fileno(), alive_r, semaphore_tracker_fd] + fds_to_pass = [listener.fileno(), alive_r] cmd %= (listener.fileno(), alive_r, _preload_modules, data) exe = spawn.get_executable() args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd] @@ -197,7 +197,8 @@ def _serve_one(s, listener, alive_r, handler): fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1) s.close() assert len(fds) <= MAXFDS_TO_SEND - child_r, child_w, _forkserver_alive_fd, *_inherited_fds = fds + child_r, child_w, _forkserver_alive_fd, stfd, *_inherited_fds = fds + semaphore_tracker._semaphore_tracker_fd = stfd # send pid to client processes write_unsigned(child_w, os.getpid()) diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py index de262aa..e67915d 100644 --- a/Lib/multiprocessing/popen_spawn_posix.py +++ b/Lib/multiprocessing/popen_spawn_posix.py @@ -40,7 +40,8 @@ class Popen(popen_fork.Popen): return fd def _launch(self, process_obj): - tracker_fd = current_process()._config['semaphore_tracker_fd'] + from . import semaphore_tracker + tracker_fd = semaphore_tracker._semaphore_tracker_fd self._fds.append(tracker_fd) prep_data = spawn.get_preparation_data(process_obj._name) fp = io.BytesIO() @@ -55,7 +56,8 @@ class Popen(popen_fork.Popen): try: parent_r, child_w = util.pipe() child_r, parent_w = util.pipe() - cmd = spawn.get_command_line() + [str(child_r)] + cmd = spawn.get_command_line(tracker_fd=tracker_fd, + pipe_handle=child_r) self._fds.extend([child_r, child_w]) self.pid = util.spawnv_passfds(spawn.get_executable(), cmd, self._fds) diff --git a/Lib/multiprocessing/popen_spawn_win32.py b/Lib/multiprocessing/popen_spawn_win32.py index 7e0c4b3..f1e9aae 100644 --- a/Lib/multiprocessing/popen_spawn_win32.py +++ b/Lib/multiprocessing/popen_spawn_win32.py @@ -32,13 +32,14 @@ class Popen(object): def __init__(self, process_obj): prep_data = spawn.get_preparation_data(process_obj._name) - cmd = ' '.join('"%s"' % x for x in spawn.get_command_line()) # read end of pipe will be "stolen" by the child process # -- see spawn_main() in spawn.py. rhandle, whandle = _winapi.CreatePipe(None, 0) wfd = msvcrt.open_osfhandle(whandle, 0) - cmd += ' {} {}'.format(os.getpid(), rhandle) + cmd = spawn.get_command_line(parent_pid=os.getpid(), + pipe_handle=rhandle) + cmd = ' '.join('"%s"' % x for x in cmd) with open(wfd, 'wb', closefd=True) as to_child: # start process diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index 4a2d636..99a0dd4 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -26,6 +26,7 @@ from . import current_process __all__ = ['ensure_running', 'register', 'unregister'] +_semaphore_tracker_fd = None _lock = threading.Lock() @@ -34,9 +35,9 @@ def ensure_running(): This can be run from any process. Usually a child process will use the semaphore created by its parent.''' + global _semaphore_tracker_fd with _lock: - config = current_process()._config - if config.get('semaphore_tracker_fd') is not None: + if _semaphore_tracker_fd is not None: return fds_to_pass = [] try: @@ -44,7 +45,7 @@ def ensure_running(): except Exception: pass cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)' - r, semaphore_tracker_fd = util.pipe() + r, w = util.pipe() try: fds_to_pass.append(r) # process will out live us, so no need to wait on pid @@ -53,10 +54,10 @@ def ensure_running(): args += ['-c', cmd % r] util.spawnv_passfds(exe, args, fds_to_pass) except: - os.close(semaphore_tracker_fd) + os.close(w) raise else: - config['semaphore_tracker_fd'] = semaphore_tracker_fd + _semaphore_tracker_fd = w finally: os.close(r) @@ -77,8 +78,7 @@ def _send(cmd, name): # posix guarantees that writes to a pipe of less than PIPE_BUF # bytes are atomic, and that PIPE_BUF >= 512 raise ValueError('name too long') - fd = current_process()._config['semaphore_tracker_fd'] - nbytes = os.write(fd, msg) + nbytes = os.write(_semaphore_tracker_fd, msg) assert nbytes == len(msg) diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py index 83561db..9c4acee 100644 --- a/Lib/multiprocessing/spawn.py +++ b/Lib/multiprocessing/spawn.py @@ -66,32 +66,33 @@ def freeze_support(): sys.exit() -def get_command_line(): +def get_command_line(**kwds): ''' Returns prefix of command line used for spawning a child process ''' if getattr(sys, 'frozen', False): return [sys.executable, '--multiprocessing-fork'] else: - prog = 'from multiprocessing.spawn import spawn_main; spawn_main()' + prog = 'from multiprocessing.spawn import spawn_main; spawn_main(%s)' + prog %= ', '.join('%s=%r' % item for item in kwds.items()) opts = util._args_from_interpreter_flags() return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork'] -def spawn_main(): +def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None): ''' Run code specifed by data received over pipe ''' assert is_forking(sys.argv) - handle = int(sys.argv[-1]) if sys.platform == 'win32': import msvcrt from .reduction import steal_handle - pid = int(sys.argv[-2]) - new_handle = steal_handle(pid, handle) + new_handle = steal_handle(parent_pid, pipe_handle) fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY) else: - fd = handle + from . import semaphore_tracker + semaphore_tracker._semaphore_tracker_fd = tracker_fd + fd = pipe_handle exitcode = _main(fd) sys.exit(exitcode) |