diff options
author | Richard Oudkerk <shibturn@gmail.com> | 2013-08-22 10:38:55 (GMT) |
---|---|---|
committer | Richard Oudkerk <shibturn@gmail.com> | 2013-08-22 10:38:55 (GMT) |
commit | 0718f70131e4bc3756f453eb2f0c9ef8ed2fa843 (patch) | |
tree | 1e57c0ec97c544a9e8f63028ba9e1900d163d705 /Lib/multiprocessing | |
parent | b8c537094d52dc07434df757c4c29c0f6c6e76d4 (diff) | |
download | cpython-0718f70131e4bc3756f453eb2f0c9ef8ed2fa843.zip cpython-0718f70131e4bc3756f453eb2f0c9ef8ed2fa843.tar.gz cpython-0718f70131e4bc3756f453eb2f0c9ef8ed2fa843.tar.bz2 |
Issue #18762: Fix EBADF error when using forkserver.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/forkserver.py | 55 |
1 files changed, 25 insertions, 30 deletions
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py index 26bf0c3..c0ac993 100644 --- a/Lib/multiprocessing/forkserver.py +++ b/Lib/multiprocessing/forkserver.py @@ -23,11 +23,12 @@ __all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process', MAXFDS_TO_SEND = 256 UNSIGNED_STRUCT = struct.Struct('Q') # large enough for pid_t +_forkserver_address = None +_forkserver_alive_fd = None _inherited_fds = None _lock = threading.Lock() _preload_modules = ['__main__'] - # # Public function # @@ -56,31 +57,15 @@ def connect_to_new_process(fds): ''' if len(fds) + 3 >= MAXFDS_TO_SEND: raise ValueError('too many fds') - address, alive_w = process.current_process()._config['forkserver_info'] with socket.socket(socket.AF_UNIX) as client: - client.connect(address) + client.connect(_forkserver_address) parent_r, child_w = util.pipe() child_r, parent_w = util.pipe() - allfds = [child_r, child_w, alive_w] + allfds = [child_r, child_w, _forkserver_alive_fd] allfds += fds try: reduction.sendfds(client, allfds) return parent_r, parent_w - except OSError: - # XXX This is debugging info for Issue #18762 - import fcntl - L = [] - for fd in allfds: - try: - flags = fcntl.fcntl(fd, fcntl.F_GETFL) - except OSError as e: - L.append((fd, e)) - else: - L.append((fd, flags)) - print('*** connect_to_new_process: %r' % L, file=sys.stderr) - os.close(parent_r) - os.close(parent_w) - raise except: os.close(parent_r) os.close(parent_w) @@ -97,12 +82,13 @@ def ensure_running(): process will just reuse the forkserver started by its parent, so ensure_running() will do nothing. ''' + global _forkserver_address, _forkserver_alive_fd with _lock: - config = process.current_process()._config - if config.get('forkserver_info') is not None: + if _forkserver_alive_fd is not None: return assert all(type(mod) is str for mod in _preload_modules) + config = process.current_process()._config semaphore_tracker_fd = config['semaphore_tracker_fd'] cmd = ('from multiprocessing.forkserver import main; ' + 'main(%d, %d, %r, **%r)') @@ -122,13 +108,20 @@ def ensure_running(): # all client processes own the write end of the "alive" pipe; # when they all terminate the read end becomes ready. - alive_r, alive_w = os.pipe() - config['forkserver_info'] = (address, alive_w) - fds_to_pass = [listener.fileno(), alive_r, semaphore_tracker_fd] - cmd %= (listener.fileno(), alive_r, _preload_modules, data) - exe = spawn.get_executable() - args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd] - pid = util.spawnv_passfds(exe, args, fds_to_pass) + alive_r, alive_w = util.pipe() + try: + fds_to_pass = [listener.fileno(), alive_r, semaphore_tracker_fd] + cmd %= (listener.fileno(), alive_r, _preload_modules, data) + exe = spawn.get_executable() + args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd] + pid = util.spawnv_passfds(exe, args, fds_to_pass) + except: + os.close(alive_w) + raise + finally: + os.close(alive_r) + _forkserver_address = address + _forkserver_alive_fd = alive_w def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): @@ -157,6 +150,8 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): # ignoring SIGCHLD means no need to reap zombie processes handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN) with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener: + global _forkserver_address + _forkserver_address = listener.getsockname() readers = [listener, alive_r] while True: @@ -191,7 +186,7 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): # def _serve_one(s, listener, alive_r, handler): - global _inherited_fds + global _inherited_fds, _forkserver_alive_fd # close unnecessary stuff and reset SIGCHLD handler listener.close() @@ -202,7 +197,7 @@ def _serve_one(s, listener, alive_r, handler): fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1) s.close() assert len(fds) <= MAXFDS_TO_SEND - child_r, child_w, alive_w, *_inherited_fds = fds + child_r, child_w, _forkserver_alive_fd, *_inherited_fds = fds # send pid to client processes write_unsigned(child_w, os.getpid()) |