diff options
Diffstat (limited to 'Lib/multiprocessing/util.py')
-rw-r--r-- | Lib/multiprocessing/util.py | 70 |
1 files changed, 60 insertions, 10 deletions
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index f5862b4..d9e4799 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -17,13 +17,13 @@ import threading # we want threading to install it's # cleanup function before multiprocessing does from subprocess import _args_from_interpreter_flags -from multiprocessing.process import current_process, active_children +from . import process __all__ = [ 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger', 'log_to_stderr', 'get_temp_dir', 'register_after_fork', 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal', - 'SUBDEBUG', 'SUBWARNING', + 'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING', ] # @@ -71,8 +71,6 @@ def get_logger(): _logger = logging.getLogger(LOGGER_NAME) _logger.propagate = 0 - logging.addLevelName(SUBDEBUG, 'SUBDEBUG') - logging.addLevelName(SUBWARNING, 'SUBWARNING') # XXX multiprocessing should cleanup before logging if hasattr(atexit, 'unregister'): @@ -111,13 +109,14 @@ def log_to_stderr(level=None): def get_temp_dir(): # get name of a temp directory which will be automatically cleaned up - if current_process()._tempdir is None: + tempdir = process.current_process()._config.get('tempdir') + if tempdir is None: import shutil, tempfile tempdir = tempfile.mkdtemp(prefix='pymp-') info('created temp directory %s', tempdir) Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100) - current_process()._tempdir = tempdir - return current_process()._tempdir + process.current_process()._config['tempdir'] = tempdir + return tempdir # # Support for reinitialization of objects when bootstrapping a child process @@ -273,8 +272,8 @@ def is_exiting(): _exiting = False def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers, - active_children=active_children, - current_process=current_process): + active_children=process.active_children, + current_process=process.current_process): # We hold on to references to functions in the arglist due to the # situation described below, where this function is called after this # module's globals are destroyed. @@ -303,7 +302,7 @@ def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers, # #9207. for p in active_children(): - if p._daemonic: + if p.daemon: info('calling terminate() for daemon %s', p.name) p._popen.terminate() @@ -335,3 +334,54 @@ class ForkAwareLocal(threading.local): register_after_fork(self, lambda obj : obj.__dict__.clear()) def __reduce__(self): return type(self), () + +# +# Close fds except those specified +# + +try: + MAXFD = os.sysconf("SC_OPEN_MAX") +except Exception: + MAXFD = 256 + +def close_all_fds_except(fds): + fds = list(fds) + [-1, MAXFD] + fds.sort() + assert fds[-1] == MAXFD, 'fd too large' + for i in range(len(fds) - 1): + os.closerange(fds[i]+1, fds[i+1]) + +# +# Start a program with only specified fds kept open +# + +def spawnv_passfds(path, args, passfds): + import _posixsubprocess, fcntl + passfds = sorted(passfds) + tmp = [] + # temporarily unset CLOEXEC on passed fds + for fd in passfds: + flag = fcntl.fcntl(fd, fcntl.F_GETFD) + if flag & fcntl.FD_CLOEXEC: + fcntl.fcntl(fd, fcntl.F_SETFD, flag & ~fcntl.FD_CLOEXEC) + tmp.append((fd, flag)) + errpipe_read, errpipe_write = _posixsubprocess.cloexec_pipe() + try: + return _posixsubprocess.fork_exec( + args, [os.fsencode(path)], True, passfds, None, None, + -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write, + False, False, None) + finally: + os.close(errpipe_read) + os.close(errpipe_write) + # reset CLOEXEC where necessary + for fd, flag in tmp: + fcntl.fcntl(fd, fcntl.F_SETFD, flag) + +# +# Return pipe with CLOEXEC set on fds +# + +def pipe(): + import _posixsubprocess + return _posixsubprocess.cloexec_pipe() |