summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
authorRichard Oudkerk <shibturn@gmail.com>2013-08-22 10:38:55 (GMT)
committerRichard Oudkerk <shibturn@gmail.com>2013-08-22 10:38:55 (GMT)
commit0718f70131e4bc3756f453eb2f0c9ef8ed2fa843 (patch)
tree1e57c0ec97c544a9e8f63028ba9e1900d163d705 /Lib/multiprocessing
parentb8c537094d52dc07434df757c4c29c0f6c6e76d4 (diff)
downloadcpython-0718f70131e4bc3756f453eb2f0c9ef8ed2fa843.zip
cpython-0718f70131e4bc3756f453eb2f0c9ef8ed2fa843.tar.gz
cpython-0718f70131e4bc3756f453eb2f0c9ef8ed2fa843.tar.bz2
Issue #18762: Fix EBADF error when using forkserver.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/forkserver.py55
1 files changed, 25 insertions, 30 deletions
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
index 26bf0c3..c0ac993 100644
--- a/Lib/multiprocessing/forkserver.py
+++ b/Lib/multiprocessing/forkserver.py
@@ -23,11 +23,12 @@ __all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
MAXFDS_TO_SEND = 256
UNSIGNED_STRUCT = struct.Struct('Q') # large enough for pid_t
+_forkserver_address = None
+_forkserver_alive_fd = None
_inherited_fds = None
_lock = threading.Lock()
_preload_modules = ['__main__']
-
#
# Public function
#
@@ -56,31 +57,15 @@ def connect_to_new_process(fds):
'''
if len(fds) + 3 >= MAXFDS_TO_SEND:
raise ValueError('too many fds')
- address, alive_w = process.current_process()._config['forkserver_info']
with socket.socket(socket.AF_UNIX) as client:
- client.connect(address)
+ client.connect(_forkserver_address)
parent_r, child_w = util.pipe()
child_r, parent_w = util.pipe()
- allfds = [child_r, child_w, alive_w]
+ allfds = [child_r, child_w, _forkserver_alive_fd]
allfds += fds
try:
reduction.sendfds(client, allfds)
return parent_r, parent_w
- except OSError:
- # XXX This is debugging info for Issue #18762
- import fcntl
- L = []
- for fd in allfds:
- try:
- flags = fcntl.fcntl(fd, fcntl.F_GETFL)
- except OSError as e:
- L.append((fd, e))
- else:
- L.append((fd, flags))
- print('*** connect_to_new_process: %r' % L, file=sys.stderr)
- os.close(parent_r)
- os.close(parent_w)
- raise
except:
os.close(parent_r)
os.close(parent_w)
@@ -97,12 +82,13 @@ def ensure_running():
process will just reuse the forkserver started by its parent, so
ensure_running() will do nothing.
'''
+ global _forkserver_address, _forkserver_alive_fd
with _lock:
- config = process.current_process()._config
- if config.get('forkserver_info') is not None:
+ if _forkserver_alive_fd is not None:
return
assert all(type(mod) is str for mod in _preload_modules)
+ config = process.current_process()._config
semaphore_tracker_fd = config['semaphore_tracker_fd']
cmd = ('from multiprocessing.forkserver import main; ' +
'main(%d, %d, %r, **%r)')
@@ -122,13 +108,20 @@ def ensure_running():
# all client processes own the write end of the "alive" pipe;
# when they all terminate the read end becomes ready.
- alive_r, alive_w = os.pipe()
- config['forkserver_info'] = (address, alive_w)
- fds_to_pass = [listener.fileno(), alive_r, semaphore_tracker_fd]
- cmd %= (listener.fileno(), alive_r, _preload_modules, data)
- exe = spawn.get_executable()
- args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd]
- pid = util.spawnv_passfds(exe, args, fds_to_pass)
+ alive_r, alive_w = util.pipe()
+ try:
+ fds_to_pass = [listener.fileno(), alive_r, semaphore_tracker_fd]
+ cmd %= (listener.fileno(), alive_r, _preload_modules, data)
+ exe = spawn.get_executable()
+ args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd]
+ pid = util.spawnv_passfds(exe, args, fds_to_pass)
+ except:
+ os.close(alive_w)
+ raise
+ finally:
+ os.close(alive_r)
+ _forkserver_address = address
+ _forkserver_alive_fd = alive_w
def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
@@ -157,6 +150,8 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
# ignoring SIGCHLD means no need to reap zombie processes
handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN)
with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener:
+ global _forkserver_address
+ _forkserver_address = listener.getsockname()
readers = [listener, alive_r]
while True:
@@ -191,7 +186,7 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
#
def _serve_one(s, listener, alive_r, handler):
- global _inherited_fds
+ global _inherited_fds, _forkserver_alive_fd
# close unnecessary stuff and reset SIGCHLD handler
listener.close()
@@ -202,7 +197,7 @@ def _serve_one(s, listener, alive_r, handler):
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
s.close()
assert len(fds) <= MAXFDS_TO_SEND
- child_r, child_w, alive_w, *_inherited_fds = fds
+ child_r, child_w, _forkserver_alive_fd, *_inherited_fds = fds
# send pid to client processes
write_unsigned(child_w, os.getpid())