summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing/util.py')
-rw-r--r--Lib/multiprocessing/util.py70
1 files changed, 60 insertions, 10 deletions
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
index f5862b4..d9e4799 100644
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -17,13 +17,13 @@ import threading # we want threading to install it's
# cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags
-from multiprocessing.process import current_process, active_children
+from . import process
__all__ = [
'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
'log_to_stderr', 'get_temp_dir', 'register_after_fork',
'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
- 'SUBDEBUG', 'SUBWARNING',
+ 'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
]
#
@@ -71,8 +71,6 @@ def get_logger():
_logger = logging.getLogger(LOGGER_NAME)
_logger.propagate = 0
- logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
- logging.addLevelName(SUBWARNING, 'SUBWARNING')
# XXX multiprocessing should cleanup before logging
if hasattr(atexit, 'unregister'):
@@ -111,13 +109,14 @@ def log_to_stderr(level=None):
def get_temp_dir():
# get name of a temp directory which will be automatically cleaned up
- if current_process()._tempdir is None:
+ tempdir = process.current_process()._config.get('tempdir')
+ if tempdir is None:
import shutil, tempfile
tempdir = tempfile.mkdtemp(prefix='pymp-')
info('created temp directory %s', tempdir)
Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
- current_process()._tempdir = tempdir
- return current_process()._tempdir
+ process.current_process()._config['tempdir'] = tempdir
+ return tempdir
#
# Support for reinitialization of objects when bootstrapping a child process
@@ -273,8 +272,8 @@ def is_exiting():
_exiting = False
def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
- active_children=active_children,
- current_process=current_process):
+ active_children=process.active_children,
+ current_process=process.current_process):
# We hold on to references to functions in the arglist due to the
# situation described below, where this function is called after this
# module's globals are destroyed.
@@ -303,7 +302,7 @@ def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
# #9207.
for p in active_children():
- if p._daemonic:
+ if p.daemon:
info('calling terminate() for daemon %s', p.name)
p._popen.terminate()
@@ -335,3 +334,54 @@ class ForkAwareLocal(threading.local):
register_after_fork(self, lambda obj : obj.__dict__.clear())
def __reduce__(self):
return type(self), ()
+
+#
+# Close fds except those specified
+#
+
+try:
+ MAXFD = os.sysconf("SC_OPEN_MAX")
+except Exception:
+ MAXFD = 256
+
+def close_all_fds_except(fds):
+ fds = list(fds) + [-1, MAXFD]
+ fds.sort()
+ assert fds[-1] == MAXFD, 'fd too large'
+ for i in range(len(fds) - 1):
+ os.closerange(fds[i]+1, fds[i+1])
+
+#
+# Start a program with only specified fds kept open
+#
+
+def spawnv_passfds(path, args, passfds):
+ import _posixsubprocess, fcntl
+ passfds = sorted(passfds)
+ tmp = []
+ # temporarily unset CLOEXEC on passed fds
+ for fd in passfds:
+ flag = fcntl.fcntl(fd, fcntl.F_GETFD)
+ if flag & fcntl.FD_CLOEXEC:
+ fcntl.fcntl(fd, fcntl.F_SETFD, flag & ~fcntl.FD_CLOEXEC)
+ tmp.append((fd, flag))
+ errpipe_read, errpipe_write = _posixsubprocess.cloexec_pipe()
+ try:
+ return _posixsubprocess.fork_exec(
+ args, [os.fsencode(path)], True, passfds, None, None,
+ -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
+ False, False, None)
+ finally:
+ os.close(errpipe_read)
+ os.close(errpipe_write)
+ # reset CLOEXEC where necessary
+ for fd, flag in tmp:
+ fcntl.fcntl(fd, fcntl.F_SETFD, flag)
+
+#
+# Return pipe with CLOEXEC set on fds
+#
+
+def pipe():
+ import _posixsubprocess
+ return _posixsubprocess.cloexec_pipe()