author     Richard Oudkerk <shibturn@gmail.com>    2013-08-14 14:35:41 (GMT)
committer  Richard Oudkerk <shibturn@gmail.com>    2013-08-14 14:35:41 (GMT)
commit     84ed9a68bd9a13252b376b21a9167dabae254325 (patch)
tree       ec8daa39fcf64b658bddf52f56ae47c0bdc2b091 /Lib
parent     d06eeb4a2492b59d34ab69a2046dcae1f10ec593 (diff)
Issue #8713: Support alternative start methods in multiprocessing on Unix.
See http://hg.python.org/sandbox/sbt#spawn
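For orientation, a minimal sketch of how the new API introduced by this patch is meant to be used (names taken from the diff below; behaviour in later releases may differ):

    import multiprocessing as mp

    def work(q):
        q.put('hello from the child')

    if __name__ == '__main__':
        # One of 'fork' (Unix default), 'spawn' or 'forkserver',
        # per set_start_method() added to multiprocessing/__init__.py.
        mp.set_start_method('spawn')
        q = mp.Queue()
        p = mp.Process(target=work, args=(q,))
        p.start()
        print(q.get())
        p.join()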
Diffstat (limited to 'Lib')
-rw-r--r--  Lib/multiprocessing/__init__.py              | 107
-rw-r--r--  Lib/multiprocessing/connection.py            |  61
-rw-r--r--  Lib/multiprocessing/dummy/__init__.py        |   4
-rw-r--r--  Lib/multiprocessing/forking.py               | 477
-rw-r--r--  Lib/multiprocessing/forkserver.py            | 238
-rw-r--r--  Lib/multiprocessing/heap.py                  |  56
-rw-r--r--  Lib/multiprocessing/managers.py              |  44
-rw-r--r--  Lib/multiprocessing/pool.py                  |  84
-rw-r--r--  Lib/multiprocessing/popen.py                 |  78
-rw-r--r--  Lib/multiprocessing/popen_fork.py            |  87
-rw-r--r--  Lib/multiprocessing/popen_forkserver.py      |  75
-rw-r--r--  Lib/multiprocessing/popen_spawn_posix.py     |  75
-rw-r--r--  Lib/multiprocessing/popen_spawn_win32.py     | 102
-rw-r--r--  Lib/multiprocessing/process.py               |  60
-rw-r--r--  Lib/multiprocessing/queues.py                |  36
-rw-r--r--  Lib/multiprocessing/reduction.py             | 363
-rw-r--r--  Lib/multiprocessing/resource_sharer.py       | 158
-rw-r--r--  Lib/multiprocessing/semaphore_tracker.py     | 135
-rw-r--r--  Lib/multiprocessing/sharedctypes.py          |   7
-rw-r--r--  Lib/multiprocessing/spawn.py                 | 258
-rw-r--r--  Lib/multiprocessing/synchronize.py           |  73
-rw-r--r--  Lib/multiprocessing/util.py                  |  70
-rw-r--r--  Lib/test/_test_multiprocessing.py (renamed from Lib/test/test_multiprocessing.py) | 481
-rw-r--r--  Lib/test/mp_fork_bomb.py                     |   5
-rwxr-xr-x  Lib/test/regrtest.py                         |   2
-rw-r--r--  Lib/test/test_multiprocessing_fork.py        |   7
-rw-r--r--  Lib/test/test_multiprocessing_forkserver.py  |   7
-rw-r--r--  Lib/test/test_multiprocessing_spawn.py       |   7
28 files changed, 2106 insertions(+), 1051 deletions(-)
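Why the start method matters, as a rough illustration (not part of the patch): with 'fork' the child inherits the parent's memory, while with 'spawn' and 'forkserver' the child is a fresh interpreter that re-imports the main module, so module-level state set at runtime is not carried over:

    import multiprocessing as mp

    FLAG = []

    def show():
        # 'fork' prints ['parent'] (memory inherited via os.fork());
        # 'spawn'/'forkserver' print [] (module re-imported in the child).
        print(FLAG)

    if __name__ == '__main__':
        FLAG.append('parent')
        mp.set_start_method('fork')   # swap in 'spawn' to compare
        p = mp.Process(target=show)
        p.start()
        p.join()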
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py
index b42613f..fd75839 100644
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -21,6 +21,8 @@ __all__ = [
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
'Event', 'Barrier', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool',
'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
+ 'set_executable', 'set_start_method', 'get_start_method',
+ 'get_all_start_methods', 'set_forkserver_preload'
]
#
@@ -30,8 +32,14 @@ __all__ = [
import os
import sys
-from multiprocessing.process import Process, current_process, active_children
-from multiprocessing.util import SUBDEBUG, SUBWARNING
+from .process import Process, current_process, active_children
+
+#
+# XXX These should not really be documented or public.
+#
+
+SUBDEBUG = 5
+SUBWARNING = 25
#
# Alias for main module -- will be reset by bootstrapping child processes
@@ -56,8 +64,6 @@ class TimeoutError(ProcessError):
class AuthenticationError(ProcessError):
pass
-import _multiprocessing
-
#
# Definitions not depending on native semaphores
#
@@ -69,7 +75,7 @@ def Manager():
The manager's methods such as `Lock()`, `Condition()` and `Queue()`
can be used to create shared objects.
'''
- from multiprocessing.managers import SyncManager
+ from .managers import SyncManager
m = SyncManager()
m.start()
return m
@@ -78,7 +84,7 @@ def Pipe(duplex=True):
'''
Returns two connection objects connected by a pipe
'''
- from multiprocessing.connection import Pipe
+ from .connection import Pipe
return Pipe(duplex)
def cpu_count():
@@ -97,21 +103,21 @@ def freeze_support():
If so then run code specified by commandline and exit.
'''
if sys.platform == 'win32' and getattr(sys, 'frozen', False):
- from multiprocessing.forking import freeze_support
+ from .spawn import freeze_support
freeze_support()
def get_logger():
'''
Return package logger -- if it does not already exist then it is created
'''
- from multiprocessing.util import get_logger
+ from .util import get_logger
return get_logger()
def log_to_stderr(level=None):
'''
Turn on logging and add a handler which prints to stderr
'''
- from multiprocessing.util import log_to_stderr
+ from .util import log_to_stderr
return log_to_stderr(level)
def allow_connection_pickling():
@@ -120,7 +126,7 @@ def allow_connection_pickling():
'''
# This is undocumented. In previous versions of multiprocessing
# its only effect was to make socket objects inheritable on Windows.
- import multiprocessing.connection
+ from . import connection
#
# Definitions depending on native semaphores
@@ -130,120 +136,151 @@ def Lock():
'''
Returns a non-recursive lock object
'''
- from multiprocessing.synchronize import Lock
+ from .synchronize import Lock
return Lock()
def RLock():
'''
Returns a recursive lock object
'''
- from multiprocessing.synchronize import RLock
+ from .synchronize import RLock
return RLock()
def Condition(lock=None):
'''
Returns a condition object
'''
- from multiprocessing.synchronize import Condition
+ from .synchronize import Condition
return Condition(lock)
def Semaphore(value=1):
'''
Returns a semaphore object
'''
- from multiprocessing.synchronize import Semaphore
+ from .synchronize import Semaphore
return Semaphore(value)
def BoundedSemaphore(value=1):
'''
Returns a bounded semaphore object
'''
- from multiprocessing.synchronize import BoundedSemaphore
+ from .synchronize import BoundedSemaphore
return BoundedSemaphore(value)
def Event():
'''
Returns an event object
'''
- from multiprocessing.synchronize import Event
+ from .synchronize import Event
return Event()
def Barrier(parties, action=None, timeout=None):
'''
Returns a barrier object
'''
- from multiprocessing.synchronize import Barrier
+ from .synchronize import Barrier
return Barrier(parties, action, timeout)
def Queue(maxsize=0):
'''
Returns a queue object
'''
- from multiprocessing.queues import Queue
+ from .queues import Queue
return Queue(maxsize)
def JoinableQueue(maxsize=0):
'''
Returns a queue object
'''
- from multiprocessing.queues import JoinableQueue
+ from .queues import JoinableQueue
return JoinableQueue(maxsize)
def SimpleQueue():
'''
Returns a queue object
'''
- from multiprocessing.queues import SimpleQueue
+ from .queues import SimpleQueue
return SimpleQueue()
def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
'''
Returns a process pool object
'''
- from multiprocessing.pool import Pool
+ from .pool import Pool
return Pool(processes, initializer, initargs, maxtasksperchild)
def RawValue(typecode_or_type, *args):
'''
Returns a shared object
'''
- from multiprocessing.sharedctypes import RawValue
+ from .sharedctypes import RawValue
return RawValue(typecode_or_type, *args)
def RawArray(typecode_or_type, size_or_initializer):
'''
Returns a shared array
'''
- from multiprocessing.sharedctypes import RawArray
+ from .sharedctypes import RawArray
return RawArray(typecode_or_type, size_or_initializer)
def Value(typecode_or_type, *args, lock=True):
'''
Returns a synchronized shared object
'''
- from multiprocessing.sharedctypes import Value
+ from .sharedctypes import Value
return Value(typecode_or_type, *args, lock=lock)
def Array(typecode_or_type, size_or_initializer, *, lock=True):
'''
Returns a synchronized shared array
'''
- from multiprocessing.sharedctypes import Array
+ from .sharedctypes import Array
return Array(typecode_or_type, size_or_initializer, lock=lock)
#
#
#
-if sys.platform == 'win32':
+def set_executable(executable):
+ '''
+ Sets the path to a python.exe or pythonw.exe binary used to run
+ child processes instead of sys.executable when using the 'spawn'
+ start method. Useful for people embedding Python.
+ '''
+ from .spawn import set_executable
+ set_executable(executable)
+
+def set_start_method(method):
+ '''
+ Set method for starting processes: 'fork', 'spawn' or 'forkserver'.
+ '''
+ from .popen import set_start_method
+ set_start_method(method)
+
+def get_start_method():
+ '''
+ Get method for starting processes: 'fork', 'spawn' or 'forkserver'.
+ '''
+ from .popen import get_start_method
+ return get_start_method()
- def set_executable(executable):
- '''
- Sets the path to a python.exe or pythonw.exe binary used to run
- child processes on Windows instead of sys.executable.
- Useful for people embedding Python.
- '''
- from multiprocessing.forking import set_executable
- set_executable(executable)
+def get_all_start_methods():
+ '''
+ Get list of available start methods, default first.
+ '''
+ from .popen import get_all_start_methods
+ return get_all_start_methods()
- __all__ += ['set_executable']
+def set_forkserver_preload(module_names):
+ '''
+ Set list of module names to try to load in the forkserver process
+ when it is started. Properly chosen this can significantly reduce
+ the cost of starting a new process using the forkserver method.
+ The default list is ['__main__'].
+ '''
+ try:
+ from .forkserver import set_forkserver_preload
+ except ImportError:
+ pass
+ else:
+ set_forkserver_preload(module_names)
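A sketch of how the new set_forkserver_preload() hook above is intended to be combined with the 'forkserver' method ('heavy_deps' is a hypothetical module name; Unix only):

    import multiprocessing as mp

    if __name__ == '__main__':
        mp.set_forkserver_preload(['heavy_deps'])  # imported once, in the server
        mp.set_start_method('forkserver')          # workers fork from that server
        with mp.Pool(4) as pool:
            print(pool.map(abs, [-1, -2, -3]))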
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index 1093d9f..443fa7e 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -21,9 +21,13 @@ import tempfile
import itertools
import _multiprocessing
-from multiprocessing import current_process, AuthenticationError, BufferTooShort
-from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
-from multiprocessing.forking import ForkingPickler
+
+from . import reduction
+from . import util
+
+from . import AuthenticationError, BufferTooShort
+from .reduction import ForkingPickler
+
try:
import _winapi
from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
@@ -71,7 +75,7 @@ def arbitrary_address(family):
if family == 'AF_INET':
return ('localhost', 0)
elif family == 'AF_UNIX':
- return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
+ return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
elif family == 'AF_PIPE':
return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
(os.getpid(), next(_mmap_counter)))
@@ -505,7 +509,7 @@ if sys.platform != 'win32':
c1 = Connection(s1.detach())
c2 = Connection(s2.detach())
else:
- fd1, fd2 = os.pipe()
+ fd1, fd2 = util.pipe()
c1 = Connection(fd1, writable=False)
c2 = Connection(fd2, readable=False)
@@ -577,7 +581,7 @@ class SocketListener(object):
self._last_accepted = None
if family == 'AF_UNIX':
- self._unlink = Finalize(
+ self._unlink = util.Finalize(
self, os.unlink, args=(address,), exitpriority=0
)
else:
@@ -625,8 +629,8 @@ if sys.platform == 'win32':
self._handle_queue = [self._new_handle(first=True)]
self._last_accepted = None
- sub_debug('listener created with address=%r', self._address)
- self.close = Finalize(
+ util.sub_debug('listener created with address=%r', self._address)
+ self.close = util.Finalize(
self, PipeListener._finalize_pipe_listener,
args=(self._handle_queue, self._address), exitpriority=0
)
@@ -668,7 +672,7 @@ if sys.platform == 'win32':
@staticmethod
def _finalize_pipe_listener(queue, address):
- sub_debug('closing listener with address=%r', address)
+ util.sub_debug('closing listener with address=%r', address)
for handle in queue:
_winapi.CloseHandle(handle)
@@ -919,15 +923,32 @@ else:
#
if sys.platform == 'win32':
- from . import reduction
- ForkingPickler.register(socket.socket, reduction.reduce_socket)
- ForkingPickler.register(Connection, reduction.reduce_connection)
- ForkingPickler.register(PipeConnection, reduction.reduce_pipe_connection)
+ def reduce_connection(conn):
+ handle = conn.fileno()
+ with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
+ from . import resource_sharer
+ ds = resource_sharer.DupSocket(s)
+ return rebuild_connection, (ds, conn.readable, conn.writable)
+ def rebuild_connection(ds, readable, writable):
+ sock = ds.detach()
+ return Connection(sock.detach(), readable, writable)
+ reduction.register(Connection, reduce_connection)
+
+ def reduce_pipe_connection(conn):
+ access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
+ (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
+ dh = reduction.DupHandle(conn.fileno(), access)
+ return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
+ def rebuild_pipe_connection(dh, readable, writable):
+ handle = dh.detach()
+ return PipeConnection(handle, readable, writable)
+ reduction.register(PipeConnection, reduce_pipe_connection)
+
else:
- try:
- from . import reduction
- except ImportError:
- pass
- else:
- ForkingPickler.register(socket.socket, reduction.reduce_socket)
- ForkingPickler.register(Connection, reduction.reduce_connection)
+ def reduce_connection(conn):
+ df = reduction.DupFd(conn.fileno())
+ return rebuild_connection, (df, conn.readable, conn.writable)
+ def rebuild_connection(df, readable, writable):
+ fd = df.detach()
+ return Connection(fd, readable, writable)
+ reduction.register(Connection, reduce_connection)
diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py
index 20ae957..97f7af7 100644
--- a/Lib/multiprocessing/dummy/__init__.py
+++ b/Lib/multiprocessing/dummy/__init__.py
@@ -22,7 +22,7 @@ import sys
import weakref
import array
-from multiprocessing.dummy.connection import Pipe
+from .connection import Pipe
from threading import Lock, RLock, Semaphore, BoundedSemaphore
from threading import Event, Condition, Barrier
from queue import Queue
@@ -113,7 +113,7 @@ def shutdown():
pass
def Pool(processes=None, initializer=None, initargs=()):
- from multiprocessing.pool import ThreadPool
+ from ..pool import ThreadPool
return ThreadPool(processes, initializer, initargs)
JoinableQueue = Queue
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
deleted file mode 100644
index 39cfd8d..0000000
--- a/Lib/multiprocessing/forking.py
+++ /dev/null
@@ -1,477 +0,0 @@
-#
-# Module for starting a process object using os.fork() or CreateProcess()
-#
-# multiprocessing/forking.py
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# Licensed to PSF under a Contributor Agreement.
-#
-
-import io
-import os
-import pickle
-import sys
-import signal
-import errno
-
-from multiprocessing import util, process
-
-__all__ = ['Popen', 'assert_spawning', 'duplicate', 'close', 'ForkingPickler']
-
-#
-# Check that the current thread is spawning a child process
-#
-
-def assert_spawning(self):
- if not Popen.thread_is_spawning():
- raise RuntimeError(
- '%s objects should only be shared between processes'
- ' through inheritance' % type(self).__name__
- )
-
-#
-# Try making some callable types picklable
-#
-
-from pickle import Pickler
-from copyreg import dispatch_table
-
-class ForkingPickler(Pickler):
- _extra_reducers = {}
- def __init__(self, *args):
- Pickler.__init__(self, *args)
- self.dispatch_table = dispatch_table.copy()
- self.dispatch_table.update(self._extra_reducers)
- @classmethod
- def register(cls, type, reduce):
- cls._extra_reducers[type] = reduce
-
- @staticmethod
- def dumps(obj):
- buf = io.BytesIO()
- ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
- return buf.getbuffer()
-
- loads = pickle.loads
-
-
-def _reduce_method(m):
- if m.__self__ is None:
- return getattr, (m.__class__, m.__func__.__name__)
- else:
- return getattr, (m.__self__, m.__func__.__name__)
-class _C:
- def f(self):
- pass
-ForkingPickler.register(type(_C().f), _reduce_method)
-
-
-def _reduce_method_descriptor(m):
- return getattr, (m.__objclass__, m.__name__)
-ForkingPickler.register(type(list.append), _reduce_method_descriptor)
-ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
-
-try:
- from functools import partial
-except ImportError:
- pass
-else:
- def _reduce_partial(p):
- return _rebuild_partial, (p.func, p.args, p.keywords or {})
- def _rebuild_partial(func, args, keywords):
- return partial(func, *args, **keywords)
- ForkingPickler.register(partial, _reduce_partial)
-
-#
-# Unix
-#
-
-if sys.platform != 'win32':
- duplicate = os.dup
- close = os.close
-
- #
- # We define a Popen class similar to the one from subprocess, but
- # whose constructor takes a process object as its argument.
- #
-
- class Popen(object):
-
- def __init__(self, process_obj):
- sys.stdout.flush()
- sys.stderr.flush()
- self.returncode = None
-
- r, w = os.pipe()
- self.sentinel = r
-
- self.pid = os.fork()
- if self.pid == 0:
- os.close(r)
- if 'random' in sys.modules:
- import random
- random.seed()
- code = process_obj._bootstrap()
- os._exit(code)
-
- # `w` will be closed when the child exits, at which point `r`
- # will become ready for reading (using e.g. select()).
- os.close(w)
- util.Finalize(self, os.close, (r,))
-
- def poll(self, flag=os.WNOHANG):
- if self.returncode is None:
- while True:
- try:
- pid, sts = os.waitpid(self.pid, flag)
- except OSError as e:
- if e.errno == errno.EINTR:
- continue
- # Child process not yet created. See #1731717
- # e.errno == errno.ECHILD == 10
- return None
- else:
- break
- if pid == self.pid:
- if os.WIFSIGNALED(sts):
- self.returncode = -os.WTERMSIG(sts)
- else:
- assert os.WIFEXITED(sts)
- self.returncode = os.WEXITSTATUS(sts)
- return self.returncode
-
- def wait(self, timeout=None):
- if self.returncode is None:
- if timeout is not None:
- from .connection import wait
- if not wait([self.sentinel], timeout):
- return None
- # This shouldn't block if wait() returned successfully.
- return self.poll(os.WNOHANG if timeout == 0.0 else 0)
- return self.returncode
-
- def terminate(self):
- if self.returncode is None:
- try:
- os.kill(self.pid, signal.SIGTERM)
- except OSError:
- if self.wait(timeout=0.1) is None:
- raise
-
- @staticmethod
- def thread_is_spawning():
- return False
-
-#
-# Windows
-#
-
-else:
- import _thread
- import msvcrt
- import _winapi
-
- from pickle import load, HIGHEST_PROTOCOL
-
- def dump(obj, file, protocol=None):
- ForkingPickler(file, protocol).dump(obj)
-
- #
- #
- #
-
- TERMINATE = 0x10000
- WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
- WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
-
- close = _winapi.CloseHandle
-
- #
- # _python_exe is the assumed path to the python executable.
- # People embedding Python want to modify it.
- #
-
- if WINSERVICE:
- _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
- else:
- _python_exe = sys.executable
-
- def set_executable(exe):
- global _python_exe
- _python_exe = exe
-
- #
- #
- #
-
- def duplicate(handle, target_process=None, inheritable=False):
- if target_process is None:
- target_process = _winapi.GetCurrentProcess()
- return _winapi.DuplicateHandle(
- _winapi.GetCurrentProcess(), handle, target_process,
- 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS
- )
-
- #
- # We define a Popen class similar to the one from subprocess, but
- # whose constructor takes a process object as its argument.
- #
-
- class Popen(object):
- '''
- Start a subprocess to run the code of a process object
- '''
- _tls = _thread._local()
-
- def __init__(self, process_obj):
- cmd = ' '.join('"%s"' % x for x in get_command_line())
- prep_data = get_preparation_data(process_obj._name)
-
- # create pipe for communication with child
- rfd, wfd = os.pipe()
-
- # get handle for read end of the pipe and make it inheritable
- rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
- os.close(rfd)
-
- with open(wfd, 'wb', closefd=True) as to_child:
- # start process
- try:
- hp, ht, pid, tid = _winapi.CreateProcess(
- _python_exe, cmd + (' %s' % rhandle),
- None, None, 1, 0, None, None, None
- )
- _winapi.CloseHandle(ht)
- finally:
- close(rhandle)
-
- # set attributes of self
- self.pid = pid
- self.returncode = None
- self._handle = hp
- self.sentinel = int(hp)
- util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
-
- # send information to child
- Popen._tls.process_handle = int(hp)
- try:
- dump(prep_data, to_child, HIGHEST_PROTOCOL)
- dump(process_obj, to_child, HIGHEST_PROTOCOL)
- finally:
- del Popen._tls.process_handle
-
- @staticmethod
- def thread_is_spawning():
- return getattr(Popen._tls, 'process_handle', None) is not None
-
- @staticmethod
- def duplicate_for_child(handle):
- return duplicate(handle, Popen._tls.process_handle)
-
- def wait(self, timeout=None):
- if self.returncode is None:
- if timeout is None:
- msecs = _winapi.INFINITE
- else:
- msecs = max(0, int(timeout * 1000 + 0.5))
-
- res = _winapi.WaitForSingleObject(int(self._handle), msecs)
- if res == _winapi.WAIT_OBJECT_0:
- code = _winapi.GetExitCodeProcess(self._handle)
- if code == TERMINATE:
- code = -signal.SIGTERM
- self.returncode = code
-
- return self.returncode
-
- def poll(self):
- return self.wait(timeout=0)
-
- def terminate(self):
- if self.returncode is None:
- try:
- _winapi.TerminateProcess(int(self._handle), TERMINATE)
- except OSError:
- if self.wait(timeout=1.0) is None:
- raise
-
- #
- #
- #
-
- def is_forking(argv):
- '''
- Return whether commandline indicates we are forking
- '''
- if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
- assert len(argv) == 3
- return True
- else:
- return False
-
-
- def freeze_support():
- '''
- Run code for process object if this is not the main process
- '''
- if is_forking(sys.argv):
- main()
- sys.exit()
-
-
- def get_command_line():
- '''
- Returns prefix of command line used for spawning a child process
- '''
- if getattr(process.current_process(), '_inheriting', False):
- raise RuntimeError('''
- Attempt to start a new process before the current process
- has finished its bootstrapping phase.
-
- This probably means that you are on Windows and you have
- forgotten to use the proper idiom in the main module:
-
- if __name__ == '__main__':
- freeze_support()
- ...
-
- The "freeze_support()" line can be omitted if the program
- is not going to be frozen to produce a Windows executable.''')
-
- if getattr(sys, 'frozen', False):
- return [sys.executable, '--multiprocessing-fork']
- else:
- prog = 'from multiprocessing.forking import main; main()'
- opts = util._args_from_interpreter_flags()
- return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
-
-
- def main():
- '''
- Run code specified by data received over pipe
- '''
- assert is_forking(sys.argv)
-
- handle = int(sys.argv[-1])
- fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
- from_parent = os.fdopen(fd, 'rb')
-
- process.current_process()._inheriting = True
- preparation_data = load(from_parent)
- prepare(preparation_data)
- self = load(from_parent)
- process.current_process()._inheriting = False
-
- from_parent.close()
-
- exitcode = self._bootstrap()
- sys.exit(exitcode)
-
-
- def get_preparation_data(name):
- '''
- Return info about parent needed by child to unpickle process object
- '''
- from .util import _logger, _log_to_stderr
-
- d = dict(
- name=name,
- sys_path=sys.path,
- sys_argv=sys.argv,
- log_to_stderr=_log_to_stderr,
- orig_dir=process.ORIGINAL_DIR,
- authkey=process.current_process().authkey,
- )
-
- if _logger is not None:
- d['log_level'] = _logger.getEffectiveLevel()
-
- if not WINEXE and not WINSERVICE:
- main_path = getattr(sys.modules['__main__'], '__file__', None)
- if not main_path and sys.argv[0] not in ('', '-c'):
- main_path = sys.argv[0]
- if main_path is not None:
- if not os.path.isabs(main_path) and \
- process.ORIGINAL_DIR is not None:
- main_path = os.path.join(process.ORIGINAL_DIR, main_path)
- d['main_path'] = os.path.normpath(main_path)
-
- return d
-
-#
-# Prepare current process
-#
-
-old_main_modules = []
-
-def prepare(data):
- '''
- Try to get current process ready to unpickle process object
- '''
- old_main_modules.append(sys.modules['__main__'])
-
- if 'name' in data:
- process.current_process().name = data['name']
-
- if 'authkey' in data:
- process.current_process()._authkey = data['authkey']
-
- if 'log_to_stderr' in data and data['log_to_stderr']:
- util.log_to_stderr()
-
- if 'log_level' in data:
- util.get_logger().setLevel(data['log_level'])
-
- if 'sys_path' in data:
- sys.path = data['sys_path']
-
- if 'sys_argv' in data:
- sys.argv = data['sys_argv']
-
- if 'dir' in data:
- os.chdir(data['dir'])
-
- if 'orig_dir' in data:
- process.ORIGINAL_DIR = data['orig_dir']
-
- if 'main_path' in data:
- # XXX (ncoghlan): The following code makes several bogus
- # assumptions regarding the relationship between __file__
- # and a module's real name. See PEP 302 and issue #10845
- main_path = data['main_path']
- main_name = os.path.splitext(os.path.basename(main_path))[0]
- if main_name == '__init__':
- main_name = os.path.basename(os.path.dirname(main_path))
-
- if main_name == '__main__':
- main_module = sys.modules['__main__']
- main_module.__file__ = main_path
- elif main_name != 'ipython':
- # Main modules not actually called __main__.py may
- # contain additional code that should still be executed
- import importlib
- import types
-
- if main_path is None:
- dirs = None
- elif os.path.basename(main_path).startswith('__init__.py'):
- dirs = [os.path.dirname(os.path.dirname(main_path))]
- else:
- dirs = [os.path.dirname(main_path)]
-
- assert main_name not in sys.modules, main_name
- sys.modules.pop('__mp_main__', None)
- # We should not try to load __main__
- # since that would execute 'if __name__ == "__main__"'
- # clauses, potentially causing a pseudo fork bomb.
- loader = importlib.find_loader(main_name, path=dirs)
- main_module = types.ModuleType(main_name)
- try:
- loader.init_module_attrs(main_module)
- except AttributeError: # init_module_attrs is optional
- pass
- main_module.__name__ = '__mp_main__'
- code = loader.get_code(main_name)
- exec(code, main_module.__dict__)
-
- sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
new file mode 100644
index 0000000..628808e
--- /dev/null
+++ b/Lib/multiprocessing/forkserver.py
@@ -0,0 +1,238 @@
+import errno
+import os
+import select
+import signal
+import socket
+import struct
+import sys
+import threading
+
+from . import connection
+from . import process
+from . import reduction
+from . import spawn
+from . import util
+
+__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
+ 'set_forkserver_preload']
+
+#
+#
+#
+
+MAXFDS_TO_SEND = 256
+UNSIGNED_STRUCT = struct.Struct('Q') # large enough for pid_t
+
+_inherited_fds = None
+_lock = threading.Lock()
+_preload_modules = ['__main__']
+
+
+#
+# Public function
+#
+
+def set_forkserver_preload(modules_names):
+ '''Set list of module names to try to load in forkserver process.'''
+ global _preload_modules
+ _preload_modules = modules_names
+
+
+def get_inherited_fds():
+ '''Return list of fds inherited from parent process.
+
+ This returns None if the current process was not started by the fork server.
+ '''
+ return _inherited_fds
+
+
+def connect_to_new_process(fds):
+ '''Request forkserver to create a child process.
+
+ Returns a pair of fds (status_r, data_w). The calling process can read
+ the child process's pid and (eventually) its returncode from status_r.
+ The calling process should write to data_w the pickled preparation and
+ process data.
+ '''
+ if len(fds) + 3 >= MAXFDS_TO_SEND:
+ raise ValueError('too many fds')
+ address, alive_w = process.current_process()._config['forkserver_info']
+ with socket.socket(socket.AF_UNIX) as client:
+ client.connect(address)
+ parent_r, child_w = util.pipe()
+ child_r, parent_w = util.pipe()
+ allfds = [child_r, child_w, alive_w]
+ allfds += fds
+ try:
+ reduction.sendfds(client, allfds)
+ return parent_r, parent_w
+ except:
+ os.close(parent_r)
+ os.close(parent_w)
+ raise
+ finally:
+ os.close(child_r)
+ os.close(child_w)
+
+
+def ensure_running():
+ '''Make sure that a fork server is running.
+
+ This can be called from any process. Note that usually a child
+ process will just reuse the forkserver started by its parent, so
+ ensure_running() will do nothing.
+ '''
+ with _lock:
+ config = process.current_process()._config
+ if config.get('forkserver_info') is not None:
+ return
+
+ assert all(type(mod) is str for mod in _preload_modules)
+ semaphore_tracker_fd = config['semaphore_tracker_fd']
+ cmd = ('from multiprocessing.forkserver import main; ' +
+ 'main(%d, %d, %r, **%r)')
+
+ if _preload_modules:
+ desired_keys = {'main_path', 'sys_path'}
+ data = spawn.get_preparation_data('ignore')
+ data = dict((x,y) for (x,y) in data.items() if x in desired_keys)
+ else:
+ data = {}
+
+ with socket.socket(socket.AF_UNIX) as listener:
+ address = connection.arbitrary_address('AF_UNIX')
+ listener.bind(address)
+ os.chmod(address, 0o600)
+ listener.listen(100)
+
+ # all client processes own the write end of the "alive" pipe;
+ # when they all terminate the read end becomes ready.
+ alive_r, alive_w = os.pipe()
+ config['forkserver_info'] = (address, alive_w)
+ fds_to_pass = [listener.fileno(), alive_r, semaphore_tracker_fd]
+ cmd %= (listener.fileno(), alive_r, _preload_modules, data)
+ exe = spawn.get_executable()
+ args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd]
+ pid = util.spawnv_passfds(exe, args, fds_to_pass)
+
+
+def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
+ '''Run forkserver.'''
+ if preload:
+ if '__main__' in preload and main_path is not None:
+ process.current_process()._inheriting = True
+ try:
+ spawn.import_main_path(main_path)
+ finally:
+ del process.current_process()._inheriting
+ for modname in preload:
+ try:
+ __import__(modname)
+ except ImportError:
+ pass
+
+ # close sys.stdin
+ if sys.stdin is not None:
+ try:
+ sys.stdin.close()
+ sys.stdin = open(os.devnull)
+ except (OSError, ValueError):
+ pass
+
+ # ignoring SIGCHLD means no need to reap zombie processes
+ handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN)
+ with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener:
+ readers = [listener, alive_r]
+
+ while True:
+ try:
+ rfds, wfds, xfds = select.select(readers, [], [])
+
+ if alive_r in rfds:
+ # EOF because no more client processes left
+ assert os.read(alive_r, 1) == b''
+ raise SystemExit
+
+ assert listener in rfds
+ with listener.accept()[0] as s:
+ code = 1
+ if os.fork() == 0:
+ try:
+ _serve_one(s, listener, alive_r, handler)
+ except Exception:
+ sys.excepthook(*sys.exc_info())
+ sys.stderr.flush()
+ finally:
+ os._exit(code)
+
+ except InterruptedError:
+ pass
+ except OSError as e:
+ if e.errno != errno.ECONNABORTED:
+ raise
+
+#
+# Code to bootstrap new process
+#
+
+def _serve_one(s, listener, alive_r, handler):
+ global _inherited_fds
+
+ # close unnecessary stuff and reset SIGCHLD handler
+ listener.close()
+ os.close(alive_r)
+ signal.signal(signal.SIGCHLD, handler)
+
+ # receive fds from parent process
+ fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
+ s.close()
+ assert len(fds) <= MAXFDS_TO_SEND
+ child_r, child_w, alive_w, *_inherited_fds = fds
+
+ # send pid to client processes
+ write_unsigned(child_w, os.getpid())
+
+ # reseed random number generator
+ if 'random' in sys.modules:
+ import random
+ random.seed()
+
+ # run process object received over pipe
+ code = spawn._main(child_r)
+
+ # write the exit code to the pipe
+ write_unsigned(child_w, code)
+
+#
+# Read and write unsigned numbers
+#
+
+def read_unsigned(fd):
+ data = b''
+ length = UNSIGNED_STRUCT.size
+ while len(data) < length:
+ while True:
+ try:
+ s = os.read(fd, length - len(data))
+ except InterruptedError:
+ pass
+ else:
+ break
+ if not s:
+ raise EOFError('unexpected EOF')
+ data += s
+ return UNSIGNED_STRUCT.unpack(data)[0]
+
+def write_unsigned(fd, n):
+ msg = UNSIGNED_STRUCT.pack(n)
+ while msg:
+ while True:
+ try:
+ nbytes = os.write(fd, msg)
+ except InterruptedError:
+ pass
+ else:
+ break
+ if nbytes == 0:
+ raise RuntimeError('should not get here')
+ msg = msg[nbytes:]
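A quick sanity check of the fixed-width framing used by read_unsigned() and write_unsigned() above, over an ordinary pipe rather than the forkserver's sockets (illustrative only):

    import os, struct

    UNSIGNED_STRUCT = struct.Struct('Q')    # same 8-byte format as above

    r, w = os.pipe()
    os.write(w, UNSIGNED_STRUCT.pack(12345))
    data = os.read(r, UNSIGNED_STRUCT.size)
    print(UNSIGNED_STRUCT.unpack(data)[0])  # -> 12345
    os.close(r)
    os.close(w)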
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py
index e63fdb8..b95f90f 100644
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -8,15 +8,17 @@
#
import bisect
+import itertools
import mmap
import os
import sys
+import tempfile
import threading
-import itertools
-
import _multiprocessing
-from multiprocessing.util import Finalize, info
-from multiprocessing.forking import assert_spawning
+
+from . import popen
+from . import reduction
+from . import util
__all__ = ['BufferWrapper']
@@ -30,17 +32,25 @@ if sys.platform == 'win32':
class Arena(object):
- _counter = itertools.count()
+ _rand = tempfile._RandomNameSequence()
def __init__(self, size):
self.size = size
- self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter))
- self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
- assert _winapi.GetLastError() == 0, 'tagname already in use'
+ for i in range(100):
+ name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
+ buf = mmap.mmap(-1, size, tagname=name)
+ if _winapi.GetLastError() == 0:
+ break
+ # We have reopened a preexisting mmap.
+ buf.close()
+ else:
+ raise FileExistsError('Cannot find name for new mmap')
+ self.name = name
+ self.buffer = buf
self._state = (self.size, self.name)
def __getstate__(self):
- assert_spawning(self)
+ popen.assert_spawning(self)
return self._state
def __setstate__(self, state):
@@ -52,10 +62,28 @@ else:
class Arena(object):
- def __init__(self, size):
- self.buffer = mmap.mmap(-1, size)
+ def __init__(self, size, fd=-1):
self.size = size
- self.name = None
+ self.fd = fd
+ if fd == -1:
+ self.fd, name = tempfile.mkstemp(
+ prefix='pym-%d-'%os.getpid(), dir=util.get_temp_dir())
+ os.unlink(name)
+ util.Finalize(self, os.close, (self.fd,))
+ with open(self.fd, 'wb', closefd=False) as f:
+ f.write(b'\0'*size)
+ self.buffer = mmap.mmap(self.fd, self.size)
+
+ def reduce_arena(a):
+ if a.fd == -1:
+ raise ValueError('Arena is unpicklable because '
+ 'forking was enabled when it was created')
+ return rebuild_arena, (a.size, reduction.DupFd(a.fd))
+
+ def rebuild_arena(size, dupfd):
+ return Arena(size, dupfd.detach())
+
+ reduction.register(Arena, reduce_arena)
#
# Class allowing allocation of chunks of memory from arenas
@@ -90,7 +118,7 @@ class Heap(object):
if i == len(self._lengths):
length = self._roundup(max(self._size, size), mmap.PAGESIZE)
self._size *= 2
- info('allocating a new mmap of length %d', length)
+ util.info('allocating a new mmap of length %d', length)
arena = Arena(length)
self._arenas.append(arena)
return (arena, 0, length)
@@ -216,7 +244,7 @@ class BufferWrapper(object):
assert 0 <= size < sys.maxsize
block = BufferWrapper._heap.malloc(size)
self._state = (block, size)
- Finalize(self, BufferWrapper._heap.free, args=(block,))
+ util.Finalize(self, BufferWrapper._heap.free, args=(block,))
def create_memoryview(self):
(arena, start, stop), size = self._state
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 36cd650..f580e9e 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -19,11 +19,15 @@ import threading
import array
import queue
-from traceback import format_exc
-from multiprocessing import Process, current_process, active_children, Pool, util, connection
-from multiprocessing.process import AuthenticationString
-from multiprocessing.forking import Popen, ForkingPickler
from time import time as _time
+from traceback import format_exc
+
+from . import connection
+from . import pool
+from . import process
+from . import popen
+from . import reduction
+from . import util
#
# Register some things for pickling
@@ -31,16 +35,14 @@ from time import time as _time
def reduce_array(a):
return array.array, (a.typecode, a.tobytes())
-ForkingPickler.register(array.array, reduce_array)
+reduction.register(array.array, reduce_array)
view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
if view_types[0] is not list: # only needed in Py3.0
def rebuild_as_list(obj):
return list, (list(obj),)
for view_type in view_types:
- ForkingPickler.register(view_type, rebuild_as_list)
- import copyreg
- copyreg.pickle(view_type, rebuild_as_list)
+ reduction.register(view_type, rebuild_as_list)
#
# Type for identifying shared objects
@@ -130,7 +132,7 @@ class Server(object):
def __init__(self, registry, address, authkey, serializer):
assert isinstance(authkey, bytes)
self.registry = registry
- self.authkey = AuthenticationString(authkey)
+ self.authkey = process.AuthenticationString(authkey)
Listener, Client = listener_client[serializer]
# do authentication later
@@ -146,7 +148,7 @@ class Server(object):
Run the server forever
'''
self.stop_event = threading.Event()
- current_process()._manager_server = self
+ process.current_process()._manager_server = self
try:
accepter = threading.Thread(target=self.accepter)
accepter.daemon = True
@@ -438,9 +440,9 @@ class BaseManager(object):
def __init__(self, address=None, authkey=None, serializer='pickle'):
if authkey is None:
- authkey = current_process().authkey
+ authkey = process.current_process().authkey
self._address = address # XXX not final address if eg ('', 0)
- self._authkey = AuthenticationString(authkey)
+ self._authkey = process.AuthenticationString(authkey)
self._state = State()
self._state.value = State.INITIAL
self._serializer = serializer
@@ -476,7 +478,7 @@ class BaseManager(object):
reader, writer = connection.Pipe(duplex=False)
# spawn process which runs a server
- self._process = Process(
+ self._process = process.Process(
target=type(self)._run_server,
args=(self._registry, self._address, self._authkey,
self._serializer, writer, initializer, initargs),
@@ -691,11 +693,11 @@ class BaseProxy(object):
self._Client = listener_client[serializer][1]
if authkey is not None:
- self._authkey = AuthenticationString(authkey)
+ self._authkey = process.AuthenticationString(authkey)
elif self._manager is not None:
self._authkey = self._manager._authkey
else:
- self._authkey = current_process().authkey
+ self._authkey = process.current_process().authkey
if incref:
self._incref()
@@ -704,7 +706,7 @@ class BaseProxy(object):
def _connect(self):
util.debug('making connection to manager')
- name = current_process().name
+ name = process.current_process().name
if threading.current_thread().name != 'MainThread':
name += '|' + threading.current_thread().name
conn = self._Client(self._token.address, authkey=self._authkey)
@@ -798,7 +800,7 @@ class BaseProxy(object):
def __reduce__(self):
kwds = {}
- if Popen.thread_is_spawning():
+ if popen.get_spawning_popen() is not None:
kwds['authkey'] = self._authkey
if getattr(self, '_isauto', False):
@@ -835,14 +837,14 @@ def RebuildProxy(func, token, serializer, kwds):
If possible the shared object is returned, or otherwise a proxy for it.
'''
- server = getattr(current_process(), '_manager_server', None)
+ server = getattr(process.current_process(), '_manager_server', None)
if server and server.address == token.address:
return server.id_to_obj[token.id][0]
else:
incref = (
kwds.pop('incref', True) and
- not getattr(current_process(), '_inheriting', False)
+ not getattr(process.current_process(), '_inheriting', False)
)
return func(token, serializer, incref=incref, **kwds)
@@ -889,7 +891,7 @@ def AutoProxy(token, serializer, manager=None, authkey=None,
if authkey is None and manager is not None:
authkey = manager._authkey
if authkey is None:
- authkey = current_process().authkey
+ authkey = process.current_process().authkey
ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
@@ -1109,7 +1111,7 @@ SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
-SyncManager.register('Pool', Pool, PoolProxy)
+SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 8082ad6..1cecd09 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -7,7 +7,7 @@
# Licensed to PSF under a Contributor Agreement.
#
-__all__ = ['Pool']
+__all__ = ['Pool', 'ThreadPool']
#
# Imports
@@ -21,8 +21,10 @@ import os
import time
import traceback
-from multiprocessing import Process, TimeoutError
-from multiprocessing.util import Finalize, debug
+# If threading is available then ThreadPool should be provided. Therefore
+# we avoid top-level imports which are liable to fail on some systems.
+from . import util
+from . import Process, cpu_count, TimeoutError, SimpleQueue
#
# Constants representing the state of a pool
@@ -104,11 +106,11 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
try:
task = get()
except (EOFError, OSError):
- debug('worker got EOFError or OSError -- exiting')
+ util.debug('worker got EOFError or OSError -- exiting')
break
if task is None:
- debug('worker got sentinel -- exiting')
+ util.debug('worker got sentinel -- exiting')
break
job, i, func, args, kwds = task
@@ -121,11 +123,11 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
put((job, i, result))
except Exception as e:
wrapped = MaybeEncodingError(e, result[1])
- debug("Possible encoding error while sending result: %s" % (
+ util.debug("Possible encoding error while sending result: %s" % (
wrapped))
put((job, i, (False, wrapped)))
completed += 1
- debug('worker exiting after %d tasks' % completed)
+ util.debug('worker exiting after %d tasks' % completed)
#
# Class representing a process pool
@@ -184,7 +186,7 @@ class Pool(object):
self._result_handler._state = RUN
self._result_handler.start()
- self._terminate = Finalize(
+ self._terminate = util.Finalize(
self, self._terminate_pool,
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
self._worker_handler, self._task_handler,
@@ -201,7 +203,7 @@ class Pool(object):
worker = self._pool[i]
if worker.exitcode is not None:
# worker exited
- debug('cleaning up worker %d' % i)
+ util.debug('cleaning up worker %d' % i)
worker.join()
cleaned = True
del self._pool[i]
@@ -221,7 +223,7 @@ class Pool(object):
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True
w.start()
- debug('added worker')
+ util.debug('added worker')
def _maintain_pool(self):
"""Clean up any exited workers and start replacements for them.
@@ -230,7 +232,6 @@ class Pool(object):
self._repopulate_pool()
def _setup_queues(self):
- from .queues import SimpleQueue
self._inqueue = SimpleQueue()
self._outqueue = SimpleQueue()
self._quick_put = self._inqueue._writer.send
@@ -358,7 +359,7 @@ class Pool(object):
time.sleep(0.1)
# send sentinel to stop workers
pool._taskqueue.put(None)
- debug('worker handler exiting')
+ util.debug('worker handler exiting')
@staticmethod
def _handle_tasks(taskqueue, put, outqueue, pool):
@@ -368,36 +369,36 @@ class Pool(object):
i = -1
for i, task in enumerate(taskseq):
if thread._state:
- debug('task handler found thread._state != RUN')
+ util.debug('task handler found thread._state != RUN')
break
try:
put(task)
except OSError:
- debug('could not put task on queue')
+ util.debug('could not put task on queue')
break
else:
if set_length:
- debug('doing set_length()')
+ util.debug('doing set_length()')
set_length(i+1)
continue
break
else:
- debug('task handler got sentinel')
+ util.debug('task handler got sentinel')
try:
# tell result handler to finish when cache is empty
- debug('task handler sending sentinel to result handler')
+ util.debug('task handler sending sentinel to result handler')
outqueue.put(None)
# tell workers there is no more work
- debug('task handler sending sentinel to workers')
+ util.debug('task handler sending sentinel to workers')
for p in pool:
put(None)
except OSError:
- debug('task handler got OSError when sending sentinels')
+ util.debug('task handler got OSError when sending sentinels')
- debug('task handler exiting')
+ util.debug('task handler exiting')
@staticmethod
def _handle_results(outqueue, get, cache):
@@ -407,16 +408,16 @@ class Pool(object):
try:
task = get()
except (OSError, EOFError):
- debug('result handler got EOFError/OSError -- exiting')
+ util.debug('result handler got EOFError/OSError -- exiting')
return
if thread._state:
assert thread._state == TERMINATE
- debug('result handler found thread._state=TERMINATE')
+ util.debug('result handler found thread._state=TERMINATE')
break
if task is None:
- debug('result handler got sentinel')
+ util.debug('result handler got sentinel')
break
job, i, obj = task
@@ -429,11 +430,11 @@ class Pool(object):
try:
task = get()
except (OSError, EOFError):
- debug('result handler got EOFError/OSError -- exiting')
+ util.debug('result handler got EOFError/OSError -- exiting')
return
if task is None:
- debug('result handler ignoring extra sentinel')
+ util.debug('result handler ignoring extra sentinel')
continue
job, i, obj = task
try:
@@ -442,7 +443,7 @@ class Pool(object):
pass
if hasattr(outqueue, '_reader'):
- debug('ensuring that outqueue is not full')
+ util.debug('ensuring that outqueue is not full')
# If we don't make room available in outqueue then
# attempts to add the sentinel (None) to outqueue may
# block. There is guaranteed to be no more than 2 sentinels.
@@ -454,7 +455,7 @@ class Pool(object):
except (OSError, EOFError):
pass
- debug('result handler exiting: len(cache)=%s, thread._state=%s',
+ util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
len(cache), thread._state)
@staticmethod
@@ -472,19 +473,19 @@ class Pool(object):
)
def close(self):
- debug('closing pool')
+ util.debug('closing pool')
if self._state == RUN:
self._state = CLOSE
self._worker_handler._state = CLOSE
def terminate(self):
- debug('terminating pool')
+ util.debug('terminating pool')
self._state = TERMINATE
self._worker_handler._state = TERMINATE
self._terminate()
def join(self):
- debug('joining pool')
+ util.debug('joining pool')
assert self._state in (CLOSE, TERMINATE)
self._worker_handler.join()
self._task_handler.join()
@@ -495,7 +496,7 @@ class Pool(object):
@staticmethod
def _help_stuff_finish(inqueue, task_handler, size):
# task_handler may be blocked trying to put items on inqueue
- debug('removing tasks from inqueue until task handler finished')
+ util.debug('removing tasks from inqueue until task handler finished')
inqueue._rlock.acquire()
while task_handler.is_alive() and inqueue._reader.poll():
inqueue._reader.recv()
@@ -505,12 +506,12 @@ class Pool(object):
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
worker_handler, task_handler, result_handler, cache):
# this is guaranteed to only be called once
- debug('finalizing pool')
+ util.debug('finalizing pool')
worker_handler._state = TERMINATE
task_handler._state = TERMINATE
- debug('helping task handler/workers to finish')
+ util.debug('helping task handler/workers to finish')
cls._help_stuff_finish(inqueue, task_handler, len(pool))
assert result_handler.is_alive() or len(cache) == 0
@@ -520,31 +521,31 @@ class Pool(object):
# We must wait for the worker handler to exit before terminating
# workers because we don't want workers to be restarted behind our back.
- debug('joining worker handler')
+ util.debug('joining worker handler')
if threading.current_thread() is not worker_handler:
worker_handler.join()
# Terminate workers which haven't already finished.
if pool and hasattr(pool[0], 'terminate'):
- debug('terminating workers')
+ util.debug('terminating workers')
for p in pool:
if p.exitcode is None:
p.terminate()
- debug('joining task handler')
+ util.debug('joining task handler')
if threading.current_thread() is not task_handler:
task_handler.join()
- debug('joining result handler')
+ util.debug('joining result handler')
if threading.current_thread() is not result_handler:
result_handler.join()
if pool and hasattr(pool[0], 'terminate'):
- debug('joining pool workers')
+ util.debug('joining pool workers')
for p in pool:
if p.is_alive():
# worker has not yet exited
- debug('cleaning up worker %d' % p.pid)
+ util.debug('cleaning up worker %d' % p.pid)
p.join()
def __enter__(self):
@@ -730,7 +731,10 @@ class IMapUnorderedIterator(IMapIterator):
class ThreadPool(Pool):
- from .dummy import Process
+ @staticmethod
+ def Process(*args, **kwds):
+ from .dummy import Process
+ return Process(*args, **kwds)
def __init__(self, processes=None, initializer=None, initargs=()):
Pool.__init__(self, processes, initializer, initargs)
diff --git a/Lib/multiprocessing/popen.py b/Lib/multiprocessing/popen.py
new file mode 100644
index 0000000..b0c80d5
--- /dev/null
+++ b/Lib/multiprocessing/popen.py
@@ -0,0 +1,78 @@
+import sys
+import threading
+
+__all__ = ['Popen', 'get_spawning_popen', 'set_spawning_popen',
+ 'assert_spawning']
+
+#
+# Check that the current thread is spawning a child process
+#
+
+_tls = threading.local()
+
+def get_spawning_popen():
+ return getattr(_tls, 'spawning_popen', None)
+
+def set_spawning_popen(popen):
+ _tls.spawning_popen = popen
+
+def assert_spawning(obj):
+ if get_spawning_popen() is None:
+ raise RuntimeError(
+ '%s objects should only be shared between processes'
+ ' through inheritance' % type(obj).__name__
+ )
+
+#
+#
+#
+
+_Popen = None
+
+def Popen(process_obj):
+ if _Popen is None:
+ set_start_method()
+ return _Popen(process_obj)
+
+def get_start_method():
+ if _Popen is None:
+ set_start_method()
+ return _Popen.method
+
+def set_start_method(meth=None, *, start_helpers=True):
+ global _Popen
+ try:
+ modname = _method_to_module[meth]
+ __import__(modname)
+ except (KeyError, ImportError):
+ raise ValueError('could not use start method %r' % meth)
+ module = sys.modules[modname]
+ if start_helpers:
+ module.Popen.ensure_helpers_running()
+ _Popen = module.Popen
+
+
+if sys.platform == 'win32':
+
+ _method_to_module = {
+ None: 'multiprocessing.popen_spawn_win32',
+ 'spawn': 'multiprocessing.popen_spawn_win32',
+ }
+
+ def get_all_start_methods():
+ return ['spawn']
+
+else:
+ _method_to_module = {
+ None: 'multiprocessing.popen_fork',
+ 'fork': 'multiprocessing.popen_fork',
+ 'spawn': 'multiprocessing.popen_spawn_posix',
+ 'forkserver': 'multiprocessing.popen_forkserver',
+ }
+
+ def get_all_start_methods():
+ from . import reduction
+ if reduction.HAVE_SEND_HANDLE:
+ return ['fork', 'spawn', 'forkserver']
+ else:
+ return ['fork', 'spawn']
diff --git a/Lib/multiprocessing/popen_fork.py b/Lib/multiprocessing/popen_fork.py
new file mode 100644
index 0000000..fd25ddc
--- /dev/null
+++ b/Lib/multiprocessing/popen_fork.py
@@ -0,0 +1,87 @@
+import os
+import sys
+import signal
+import errno
+
+from . import util
+
+__all__ = ['Popen']
+
+#
+# Start child process using fork
+#
+
+class Popen(object):
+ method = 'fork'
+
+ def __init__(self, process_obj):
+ sys.stdout.flush()
+ sys.stderr.flush()
+ self.returncode = None
+ self._launch(process_obj)
+
+ def duplicate_for_child(self, fd):
+ return fd
+
+ def poll(self, flag=os.WNOHANG):
+ if self.returncode is None:
+ while True:
+ try:
+ pid, sts = os.waitpid(self.pid, flag)
+ except OSError as e:
+ if e.errno == errno.EINTR:
+ continue
+ # Child process not yet created. See #1731717
+ # e.errno == errno.ECHILD == 10
+ return None
+ else:
+ break
+ if pid == self.pid:
+ if os.WIFSIGNALED(sts):
+ self.returncode = -os.WTERMSIG(sts)
+ else:
+ assert os.WIFEXITED(sts)
+ self.returncode = os.WEXITSTATUS(sts)
+ return self.returncode
+
+ def wait(self, timeout=None):
+ if self.returncode is None:
+ if timeout is not None:
+ from .connection import wait
+ if not wait([self.sentinel], timeout):
+ return None
+ # This shouldn't block if wait() returned successfully.
+ return self.poll(os.WNOHANG if timeout == 0.0 else 0)
+ return self.returncode
+
+ def terminate(self):
+ if self.returncode is None:
+ try:
+ os.kill(self.pid, signal.SIGTERM)
+ except ProcessLookupError:
+ pass
+ except OSError:
+ if self.wait(timeout=0.1) is None:
+ raise
+
+ def _launch(self, process_obj):
+ code = 1
+ parent_r, child_w = util.pipe()
+ self.pid = os.fork()
+ if self.pid == 0:
+ try:
+ os.close(parent_r)
+ if 'random' in sys.modules:
+ import random
+ random.seed()
+ code = process_obj._bootstrap()
+ finally:
+ os._exit(code)
+ else:
+ os.close(child_w)
+ util.Finalize(self, os.close, (parent_r,))
+ self.sentinel = parent_r
+
+ @staticmethod
+ def ensure_helpers_running():
+ pass
diff --git a/Lib/multiprocessing/popen_forkserver.py b/Lib/multiprocessing/popen_forkserver.py
new file mode 100644
index 0000000..f1c4b57
--- /dev/null
+++ b/Lib/multiprocessing/popen_forkserver.py
@@ -0,0 +1,75 @@
+import io
+import os
+
+from . import reduction
+if not reduction.HAVE_SEND_HANDLE:
+ raise ImportError('No support for sending fds between processes')
+from . import forkserver
+from . import popen
+from . import popen_fork
+from . import spawn
+from . import util
+
+
+__all__ = ['Popen']
+
+#
+# Wrapper for an fd used while launching a process
+#
+
+class _DupFd(object):
+ def __init__(self, ind):
+ self.ind = ind
+ def detach(self):
+ return forkserver.get_inherited_fds()[self.ind]
+
+#
+# Start child process using a server process
+#
+
+class Popen(popen_fork.Popen):
+ method = 'forkserver'
+ DupFd = _DupFd
+
+ def __init__(self, process_obj):
+ self._fds = []
+ super().__init__(process_obj)
+
+ def duplicate_for_child(self, fd):
+ self._fds.append(fd)
+ return len(self._fds) - 1
+
+ def _launch(self, process_obj):
+ prep_data = spawn.get_preparation_data(process_obj._name)
+ buf = io.BytesIO()
+ popen.set_spawning_popen(self)
+ try:
+ reduction.dump(prep_data, buf)
+ reduction.dump(process_obj, buf)
+ finally:
+ popen.set_spawning_popen(None)
+
+ self.sentinel, w = forkserver.connect_to_new_process(self._fds)
+ util.Finalize(self, os.close, (self.sentinel,))
+ with open(w, 'wb', closefd=True) as f:
+ f.write(buf.getbuffer())
+ self.pid = forkserver.read_unsigned(self.sentinel)
+
+ def poll(self, flag=os.WNOHANG):
+ if self.returncode is None:
+ from .connection import wait
+ timeout = 0 if flag == os.WNOHANG else None
+ if not wait([self.sentinel], timeout):
+ return None
+ try:
+ self.returncode = forkserver.read_unsigned(self.sentinel)
+ except (OSError, EOFError):
+ # The process ended abnormally perhaps because of a signal
+ self.returncode = 255
+ return self.returncode
+
+ @staticmethod
+ def ensure_helpers_running():
+ from . import semaphore_tracker
+ semaphore_tracker.ensure_running()
+ forkserver.ensure_running()
diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py
new file mode 100644
index 0000000..de262aa
--- /dev/null
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -0,0 +1,75 @@
+import fcntl
+import io
+import os
+
+from . import popen
+from . import popen_fork
+from . import reduction
+from . import spawn
+from . import util
+
+from . import current_process
+
+__all__ = ['Popen']
+
+
+#
+# Wrapper for an fd used while launching a process
+#
+
+class _DupFd(object):
+ def __init__(self, fd):
+ self.fd = fd
+ def detach(self):
+ return self.fd
+
+#
+# Start child process using a fresh interpreter
+#
+
+class Popen(popen_fork.Popen):
+ method = 'spawn'
+ DupFd = _DupFd
+
+ def __init__(self, process_obj):
+ self._fds = []
+ super().__init__(process_obj)
+
+ def duplicate_for_child(self, fd):
+ self._fds.append(fd)
+ return fd
+
+ def _launch(self, process_obj):
+ tracker_fd = current_process()._config['semaphore_tracker_fd']
+ self._fds.append(tracker_fd)
+ prep_data = spawn.get_preparation_data(process_obj._name)
+ fp = io.BytesIO()
+ popen.set_spawning_popen(self)
+ try:
+ reduction.dump(prep_data, fp)
+ reduction.dump(process_obj, fp)
+ finally:
+ popen.set_spawning_popen(None)
+
+ parent_r = child_w = child_r = parent_w = None
+ try:
+ parent_r, child_w = util.pipe()
+ child_r, parent_w = util.pipe()
+ cmd = spawn.get_command_line() + [str(child_r)]
+ self._fds.extend([child_r, child_w])
+ self.pid = util.spawnv_passfds(spawn.get_executable(),
+ cmd, self._fds)
+ self.sentinel = parent_r
+ with open(parent_w, 'wb', closefd=False) as f:
+ f.write(fp.getbuffer())
+ finally:
+ if parent_r is not None:
+ util.Finalize(self, os.close, (parent_r,))
+ for fd in (child_r, child_w, parent_w):
+ if fd is not None:
+ os.close(fd)
+
+ @staticmethod
+ def ensure_helpers_running():
+ from . import semaphore_tracker
+ semaphore_tracker.ensure_running()
diff --git a/Lib/multiprocessing/popen_spawn_win32.py b/Lib/multiprocessing/popen_spawn_win32.py
new file mode 100644
index 0000000..7e0c4b3
--- /dev/null
+++ b/Lib/multiprocessing/popen_spawn_win32.py
@@ -0,0 +1,102 @@
+import os
+import msvcrt
+import signal
+import sys
+import _winapi
+
+from . import spawn
+from . import popen
+from . import reduction
+from . import util
+
+__all__ = ['Popen']
+
+#
+#
+#
+
+TERMINATE = 0x10000
+WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
+WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
+
+#
+# We define a Popen class similar to the one from subprocess, but
+# whose constructor takes a process object as its argument.
+#
+
+class Popen(object):
+ '''
+ Start a subprocess to run the code of a process object
+ '''
+ method = 'spawn'
+
+ def __init__(self, process_obj):
+ prep_data = spawn.get_preparation_data(process_obj._name)
+ cmd = ' '.join('"%s"' % x for x in spawn.get_command_line())
+
+ # read end of pipe will be "stolen" by the child process
+ # -- see spawn_main() in spawn.py.
+ rhandle, whandle = _winapi.CreatePipe(None, 0)
+ wfd = msvcrt.open_osfhandle(whandle, 0)
+ cmd += ' {} {}'.format(os.getpid(), rhandle)
+
+ with open(wfd, 'wb', closefd=True) as to_child:
+ # start process
+ try:
+ hp, ht, pid, tid = _winapi.CreateProcess(
+ spawn.get_executable(), cmd,
+ None, None, False, 0, None, None, None)
+ _winapi.CloseHandle(ht)
+ except:
+ _winapi.CloseHandle(rhandle)
+ raise
+
+ # set attributes of self
+ self.pid = pid
+ self.returncode = None
+ self._handle = hp
+ self.sentinel = int(hp)
+ util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
+
+ # send information to child
+ popen.set_spawning_popen(self)
+ try:
+ reduction.dump(prep_data, to_child)
+ reduction.dump(process_obj, to_child)
+ finally:
+ popen.set_spawning_popen(None)
+
+ def duplicate_for_child(self, handle):
+ assert self is popen.get_spawning_popen()
+ return reduction.duplicate(handle, self.sentinel)
+
+ def wait(self, timeout=None):
+ if self.returncode is None:
+ if timeout is None:
+ msecs = _winapi.INFINITE
+ else:
+ msecs = max(0, int(timeout * 1000 + 0.5))
+
+ res = _winapi.WaitForSingleObject(int(self._handle), msecs)
+ if res == _winapi.WAIT_OBJECT_0:
+ code = _winapi.GetExitCodeProcess(self._handle)
+ if code == TERMINATE:
+ code = -signal.SIGTERM
+ self.returncode = code
+
+ return self.returncode
+
+ def poll(self):
+ return self.wait(timeout=0)
+
+ def terminate(self):
+ if self.returncode is None:
+ try:
+ _winapi.TerminateProcess(int(self._handle), TERMINATE)
+ except OSError:
+ if self.wait(timeout=1.0) is None:
+ raise
+
+ @staticmethod
+ def ensure_helpers_running():
+ pass
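
Two details of the Windows class above are worth pinning down: wait() converts a float timeout in seconds to whole milliseconds with round-half-up, and a child killed by terminate() reports -SIGTERM, matching the Unix convention of negative exit codes for signal deaths. A small self-check of that arithmetic (INFINITE is _winapi.INFINITE; the value below is the usual Win32 one):

    import signal

    TERMINATE = 0x10000        # exit code passed to TerminateProcess() above
    INFINITE = 0xFFFFFFFF      # _winapi.INFINITE on Windows

    def timeout_to_msecs(timeout):
        # None means wait forever; otherwise round to the nearest millisecond,
        # clamping negative values to zero -- mirrors Popen.wait() above.
        if timeout is None:
            return INFINITE
        return max(0, int(timeout * 1000 + 0.5))

    assert timeout_to_msecs(None) == INFINITE
    assert timeout_to_msecs(0.0015) == 2      # rounds half up
    assert timeout_to_msecs(-3) == 0          # clamped
    assert -signal.SIGTERM == -15             # what wait() reports after terminate()
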
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
index 893507b..c1cb36f 100644
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -43,7 +43,7 @@ def active_children():
Return list of process objects corresponding to live child processes
'''
_cleanup()
- return list(_current_process._children)
+ return list(_children)
#
#
@@ -51,9 +51,9 @@ def active_children():
def _cleanup():
# check for processes which have finished
- for p in list(_current_process._children):
+ for p in list(_children):
if p._popen.poll() is not None:
- _current_process._children.discard(p)
+ _children.discard(p)
#
# The `Process` class
@@ -63,21 +63,16 @@ class Process(object):
'''
Process objects represent activity that is run in a separate process
- The class is analagous to `threading.Thread`
+ The class is analogous to `threading.Thread`
'''
_Popen = None
def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
*, daemon=None):
assert group is None, 'group argument must be None for now'
- count = next(_current_process._counter)
+ count = next(_process_counter)
self._identity = _current_process._identity + (count,)
- self._authkey = _current_process._authkey
- if daemon is not None:
- self._daemonic = daemon
- else:
- self._daemonic = _current_process._daemonic
- self._tempdir = _current_process._tempdir
+ self._config = _current_process._config.copy()
self._parent_pid = os.getpid()
self._popen = None
self._target = target
@@ -85,6 +80,8 @@ class Process(object):
self._kwargs = dict(kwargs)
self._name = name or type(self).__name__ + '-' + \
':'.join(str(i) for i in self._identity)
+ if daemon is not None:
+ self.daemon = daemon
_dangling.add(self)
def run(self):
@@ -101,16 +98,16 @@ class Process(object):
assert self._popen is None, 'cannot start a process twice'
assert self._parent_pid == os.getpid(), \
'can only start a process object created by current process'
- assert not _current_process._daemonic, \
+ assert not _current_process._config.get('daemon'), \
'daemonic processes are not allowed to have children'
_cleanup()
if self._Popen is not None:
Popen = self._Popen
else:
- from .forking import Popen
+ from .popen import Popen
self._popen = Popen(self)
self._sentinel = self._popen.sentinel
- _current_process._children.add(self)
+ _children.add(self)
def terminate(self):
'''
@@ -126,7 +123,7 @@ class Process(object):
assert self._popen is not None, 'can only join a started process'
res = self._popen.wait(timeout)
if res is not None:
- _current_process._children.discard(self)
+ _children.discard(self)
def is_alive(self):
'''
@@ -154,7 +151,7 @@ class Process(object):
'''
Return whether process is a daemon
'''
- return self._daemonic
+ return self._config.get('daemon', False)
@daemon.setter
def daemon(self, daemonic):
@@ -162,18 +159,18 @@ class Process(object):
Set whether process is a daemon
'''
assert self._popen is None, 'process has already started'
- self._daemonic = daemonic
+ self._config['daemon'] = daemonic
@property
def authkey(self):
- return self._authkey
+ return self._config['authkey']
@authkey.setter
def authkey(self, authkey):
'''
Set authorization key of process
'''
- self._authkey = AuthenticationString(authkey)
+ self._config['authkey'] = AuthenticationString(authkey)
@property
def exitcode(self):
@@ -227,17 +224,17 @@ class Process(object):
status = 'stopped[%s]' % _exitcode_to_name.get(status, status)
return '<%s(%s, %s%s)>' % (type(self).__name__, self._name,
- status, self._daemonic and ' daemon' or '')
+ status, self.daemon and ' daemon' or '')
##
def _bootstrap(self):
from . import util
- global _current_process
+ global _current_process, _process_counter, _children
try:
- self._children = set()
- self._counter = itertools.count(1)
+ _process_counter = itertools.count(1)
+ _children = set()
if sys.stdin is not None:
try:
sys.stdin.close()
@@ -285,8 +282,8 @@ class Process(object):
class AuthenticationString(bytes):
def __reduce__(self):
- from .forking import Popen
- if not Popen.thread_is_spawning():
+ from .popen import get_spawning_popen
+ if get_spawning_popen() is None:
raise TypeError(
'Pickling an AuthenticationString object is '
'disallowed for security reasons'
@@ -301,16 +298,19 @@ class _MainProcess(Process):
def __init__(self):
self._identity = ()
- self._daemonic = False
self._name = 'MainProcess'
self._parent_pid = None
self._popen = None
- self._counter = itertools.count(1)
- self._children = set()
- self._authkey = AuthenticationString(os.urandom(32))
- self._tempdir = None
+ self._config = {'authkey': AuthenticationString(os.urandom(32)),
+ 'semprefix': 'mp'}
+ # Note that some versions of FreeBSD only allow named
+ # semaphores to have names of up to 14 characters. Therefore
+ # we choose a short prefix.
+
_current_process = _MainProcess()
+_process_counter = itertools.count(1)
+_children = set()
del _MainProcess
#
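
The net effect of this hunk is that per-process bookkeeping (_process_counter, _children) moves from attributes of the current Process object to module globals which _bootstrap() resets in each child, while inheritable settings move into a _config dict that __init__ copies from the parent. The identity chaining works like this toy model:

    import itertools

    # Each process numbers its own children from 1; a child's identity is the
    # parent's identity tuple plus its own number (see Process.__init__ above).
    _process_counter = itertools.count(1)

    def child_identity(parent_identity):
        return parent_identity + (next(_process_counter),)

    main = ()                          # _MainProcess has the empty identity
    first = child_identity(main)       # (1,)  ->  default name 'Process-1'
    second = child_identity(main)      # (2,)  ->  'Process-2'
    # In a child, _bootstrap() resets the counter, so the first grandchild of
    # `first` gets identity (1, 1) and the default name 'Process-1:1'.
    assert (first, second) == ((1,), (2,))
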
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index ec188ee..10e40a5 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -18,11 +18,15 @@ import weakref
import errno
from queue import Empty, Full
+
import _multiprocessing
-from multiprocessing.connection import Pipe
-from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
-from multiprocessing.util import debug, info, Finalize, register_after_fork
-from multiprocessing.forking import assert_spawning, ForkingPickler
+
+from . import connection
+from . import popen
+from . import synchronize
+
+from .util import debug, info, Finalize, register_after_fork, is_exiting
+from .reduction import ForkingPickler
#
# Queue type using a pipe, buffer and thread
@@ -34,14 +38,14 @@ class Queue(object):
if maxsize <= 0:
maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
self._maxsize = maxsize
- self._reader, self._writer = Pipe(duplex=False)
- self._rlock = Lock()
+ self._reader, self._writer = connection.Pipe(duplex=False)
+ self._rlock = synchronize.Lock()
self._opid = os.getpid()
if sys.platform == 'win32':
self._wlock = None
else:
- self._wlock = Lock()
- self._sem = BoundedSemaphore(maxsize)
+ self._wlock = synchronize.Lock()
+ self._sem = synchronize.BoundedSemaphore(maxsize)
# For use by concurrent.futures
self._ignore_epipe = False
@@ -51,7 +55,7 @@ class Queue(object):
register_after_fork(self, Queue._after_fork)
def __getstate__(self):
- assert_spawning(self)
+ popen.assert_spawning(self)
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
@@ -208,8 +212,6 @@ class Queue(object):
@staticmethod
def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
debug('starting thread to feed data to pipe')
- from .util import is_exiting
-
nacquire = notempty.acquire
nrelease = notempty.release
nwait = notempty.wait
@@ -279,8 +281,8 @@ class JoinableQueue(Queue):
def __init__(self, maxsize=0):
Queue.__init__(self, maxsize)
- self._unfinished_tasks = Semaphore(0)
- self._cond = Condition()
+ self._unfinished_tasks = synchronize.Semaphore(0)
+ self._cond = synchronize.Condition()
def __getstate__(self):
return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
@@ -331,19 +333,19 @@ class JoinableQueue(Queue):
class SimpleQueue(object):
def __init__(self):
- self._reader, self._writer = Pipe(duplex=False)
- self._rlock = Lock()
+ self._reader, self._writer = connection.Pipe(duplex=False)
+ self._rlock = synchronize.Lock()
self._poll = self._reader.poll
if sys.platform == 'win32':
self._wlock = None
else:
- self._wlock = Lock()
+ self._wlock = synchronize.Lock()
def empty(self):
return not self._poll()
def __getstate__(self):
- assert_spawning(self)
+ popen.assert_spawning(self)
return (self._reader, self._writer, self._rlock, self._wlock)
def __setstate__(self, state):
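
__getstate__ now goes through popen.assert_spawning(self), so a Queue can only be pickled while a child process is being constructed; pickling it for any other purpose fails loudly. The guard presumably looks much like the old forking.assert_spawning -- popen.py itself is not shown in this part of the diff, so treat this as a sketch:

    _spawning_popen = None    # set_spawning_popen() flips this around dump() calls

    def get_spawning_popen():
        return _spawning_popen

    def assert_spawning(obj):
        # Outside of process construction there is no spawning Popen, so
        # pickling synchronization objects and queues is refused.
        if get_spawning_popen() is None:
            raise RuntimeError(
                '%s objects should only be shared between processes'
                ' through inheritance' % type(obj).__name__)
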
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index 656fa8f..5bbbcf4 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -1,6 +1,5 @@
#
-# Module to allow connection and socket objects to be transferred
-# between processes
+# Module which deals with pickling of objects.
#
# multiprocessing/reduction.py
#
@@ -8,27 +7,57 @@
# Licensed to PSF under a Contributor Agreement.
#
-__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']
-
+import copyreg
+import functools
+import io
import os
-import sys
+import pickle
import socket
-import threading
-import struct
-import signal
+import sys
-from multiprocessing import current_process
-from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.util import is_exiting, sub_warning
+from . import popen
+from . import util
+__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
+
+
+HAVE_SEND_HANDLE = (sys.platform == 'win32' or
+ (hasattr(socket, 'CMSG_LEN') and
+ hasattr(socket, 'SCM_RIGHTS') and
+ hasattr(socket.socket, 'sendmsg')))
#
+# Pickler subclass
#
-#
-if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
- hasattr(socket, 'SCM_RIGHTS'))):
- raise ImportError('pickling of connections not supported')
+class ForkingPickler(pickle.Pickler):
+ '''Pickler subclass used by multiprocessing.'''
+ _extra_reducers = {}
+ _copyreg_dispatch_table = copyreg.dispatch_table
+
+ def __init__(self, *args):
+ super().__init__(*args)
+ self.dispatch_table = self._copyreg_dispatch_table.copy()
+ self.dispatch_table.update(self._extra_reducers)
+
+ @classmethod
+ def register(cls, type, reduce):
+ '''Register a reduce function for a type.'''
+ cls._extra_reducers[type] = reduce
+
+ @classmethod
+ def dumps(cls, obj, protocol=None):
+ buf = io.BytesIO()
+ cls(buf, protocol).dump(obj)
+ return buf.getbuffer()
+
+ loads = pickle.loads
+
+register = ForkingPickler.register
+
+def dump(obj, file, protocol=None):
+ '''Replacement for pickle.dump() using ForkingPickler.'''
+ ForkingPickler(file, protocol).dump(obj)
#
# Platform specific definitions
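
ForkingPickler keeps a class-level table of extra reducers and layers it over copyreg.dispatch_table per instance, so registrations apply to every pickling operation multiprocessing performs without touching the global pickle machinery. A quick round trip with a custom reducer (Point and _reduce_point are invented for the example):

    from multiprocessing.reduction import ForkingPickler

    class Point:
        def __init__(self, x, y):
            self.x, self.y = x, y

    def _reduce_point(p):
        # A reduce function returns (callable, args), just like __reduce__.
        return Point, (p.x, p.y)

    ForkingPickler.register(Point, _reduce_point)

    data = ForkingPickler.dumps(Point(1, 2))   # uses the registered reducer
    p = ForkingPickler.loads(data)             # loads is plain pickle.loads
    assert (p.x, p.y) == (1, 2)
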
@@ -36,20 +65,44 @@ if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
if sys.platform == 'win32':
# Windows
- __all__ += ['reduce_pipe_connection']
+ __all__ += ['DupHandle', 'duplicate', 'steal_handle']
import _winapi
+ def duplicate(handle, target_process=None, inheritable=False):
+ '''Duplicate a handle. (target_process is a handle not a pid!)'''
+ if target_process is None:
+ target_process = _winapi.GetCurrentProcess()
+ return _winapi.DuplicateHandle(
+ _winapi.GetCurrentProcess(), handle, target_process,
+ 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
+
+ def steal_handle(source_pid, handle):
+ '''Steal a handle from process identified by source_pid.'''
+ source_process_handle = _winapi.OpenProcess(
+ _winapi.PROCESS_DUP_HANDLE, False, source_pid)
+ try:
+ return _winapi.DuplicateHandle(
+ source_process_handle, handle,
+ _winapi.GetCurrentProcess(), 0, False,
+ _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
+ finally:
+ _winapi.CloseHandle(source_process_handle)
+
def send_handle(conn, handle, destination_pid):
+ '''Send a handle over a local connection.'''
dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
conn.send(dh)
def recv_handle(conn):
+ '''Receive a handle over a local connection.'''
return conn.recv().detach()
class DupHandle(object):
+ '''Picklable wrapper for a handle.'''
def __init__(self, handle, access, pid=None):
- # duplicate handle for process with given pid
if pid is None:
+ # We just duplicate the handle in the current process and
+ # let the receiving process steal the handle.
pid = os.getpid()
proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
try:
@@ -62,9 +115,12 @@ if sys.platform == 'win32':
self._pid = pid
def detach(self):
+ '''Get the handle. This should only be called once.'''
# retrieve handle from process which currently owns it
if self._pid == os.getpid():
+ # The handle has already been duplicated for this process.
return self._handle
+ # We must steal the handle from the process whose pid is self._pid.
proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
self._pid)
try:
@@ -74,207 +130,112 @@ if sys.platform == 'win32':
finally:
_winapi.CloseHandle(proc)
- class DupSocket(object):
- def __init__(self, sock):
- new_sock = sock.dup()
- def send(conn, pid):
- share = new_sock.share(pid)
- conn.send_bytes(share)
- self._id = resource_sharer.register(send, new_sock.close)
-
- def detach(self):
- conn = resource_sharer.get_connection(self._id)
- try:
- share = conn.recv_bytes()
- return socket.fromshare(share)
- finally:
- conn.close()
-
- def reduce_socket(s):
- return rebuild_socket, (DupSocket(s),)
-
- def rebuild_socket(ds):
- return ds.detach()
-
- def reduce_connection(conn):
- handle = conn.fileno()
- with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
- ds = DupSocket(s)
- return rebuild_connection, (ds, conn.readable, conn.writable)
-
- def rebuild_connection(ds, readable, writable):
- from .connection import Connection
- sock = ds.detach()
- return Connection(sock.detach(), readable, writable)
-
- def reduce_pipe_connection(conn):
- access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
- (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
- dh = DupHandle(conn.fileno(), access)
- return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
-
- def rebuild_pipe_connection(dh, readable, writable):
- from .connection import PipeConnection
- handle = dh.detach()
- return PipeConnection(handle, readable, writable)
-
else:
# Unix
+ __all__ += ['DupFd', 'sendfds', 'recvfds']
+ import array
# On MacOSX we should acknowledge receipt of fds -- see Issue14669
ACKNOWLEDGE = sys.platform == 'darwin'
+ def sendfds(sock, fds):
+ '''Send an array of fds over an AF_UNIX socket.'''
+ fds = array.array('i', fds)
+ msg = bytes([len(fds) % 256])
+ sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
+ if ACKNOWLEDGE and sock.recv(1) != b'A':
+ raise RuntimeError('did not receive acknowledgement of fd')
+
+ def recvfds(sock, size):
+ '''Receive an array of fds over an AF_UNIX socket.'''
+ a = array.array('i')
+ bytes_size = a.itemsize * size
+ msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
+ if not msg and not ancdata:
+ raise EOFError
+ try:
+ if ACKNOWLEDGE:
+ sock.send(b'A')
+ if len(ancdata) != 1:
+ raise RuntimeError('received %d items of ancdata' %
+ len(ancdata))
+ cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+ if (cmsg_level == socket.SOL_SOCKET and
+ cmsg_type == socket.SCM_RIGHTS):
+ if len(cmsg_data) % a.itemsize != 0:
+ raise ValueError
+ a.frombytes(cmsg_data)
+ assert len(a) % 256 == msg[0]
+ return list(a)
+ except (ValueError, IndexError):
+ pass
+ raise RuntimeError('Invalid data received')
+
def send_handle(conn, handle, destination_pid):
+ '''Send a handle over a local connection.'''
with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
- s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
- struct.pack("@i", handle))])
- if ACKNOWLEDGE and conn.recv_bytes() != b'ACK':
- raise RuntimeError('did not receive acknowledgement of fd')
+ sendfds(s, [handle])
def recv_handle(conn):
- size = struct.calcsize("@i")
+ '''Receive a handle over a local connection.'''
with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
- msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
- try:
- if ACKNOWLEDGE:
- conn.send_bytes(b'ACK')
- cmsg_level, cmsg_type, cmsg_data = ancdata[0]
- if (cmsg_level == socket.SOL_SOCKET and
- cmsg_type == socket.SCM_RIGHTS):
- return struct.unpack("@i", cmsg_data[:size])[0]
- except (ValueError, IndexError, struct.error):
- pass
- raise RuntimeError('Invalid data received')
-
- class DupFd(object):
- def __init__(self, fd):
- new_fd = os.dup(fd)
- def send(conn, pid):
- send_handle(conn, new_fd, pid)
- def close():
- os.close(new_fd)
- self._id = resource_sharer.register(send, close)
+ return recvfds(s, 1)[0]
+
+ def DupFd(fd):
+ '''Return a wrapper for an fd.'''
+ popen_obj = popen.get_spawning_popen()
+ if popen_obj is not None:
+ return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
+ elif HAVE_SEND_HANDLE:
+ from . import resource_sharer
+ return resource_sharer.DupFd(fd)
+ else:
+ raise ValueError('SCM_RIGHTS appears not to be available')
- def detach(self):
- conn = resource_sharer.get_connection(self._id)
- try:
- return recv_handle(conn)
- finally:
- conn.close()
+#
+# Try making some callable types picklable
+#
- def reduce_socket(s):
- df = DupFd(s.fileno())
- return rebuild_socket, (df, s.family, s.type, s.proto)
+def _reduce_method(m):
+ if m.__self__ is None:
+ return getattr, (m.__class__, m.__func__.__name__)
+ else:
+ return getattr, (m.__self__, m.__func__.__name__)
+class _C:
+ def f(self):
+ pass
+register(type(_C().f), _reduce_method)
- def rebuild_socket(df, family, type, proto):
- fd = df.detach()
- s = socket.fromfd(fd, family, type, proto)
- os.close(fd)
- return s
- def reduce_connection(conn):
- df = DupFd(conn.fileno())
- return rebuild_connection, (df, conn.readable, conn.writable)
+def _reduce_method_descriptor(m):
+ return getattr, (m.__objclass__, m.__name__)
+register(type(list.append), _reduce_method_descriptor)
+register(type(int.__add__), _reduce_method_descriptor)
- def rebuild_connection(df, readable, writable):
- from .connection import Connection
- fd = df.detach()
- return Connection(fd, readable, writable)
+
+def _reduce_partial(p):
+ return _rebuild_partial, (p.func, p.args, p.keywords or {})
+def _rebuild_partial(func, args, keywords):
+ return functools.partial(func, *args, **keywords)
+register(functools.partial, _reduce_partial)
#
-# Server which shares registered resources with clients
+# Make sockets picklable
#
-class ResourceSharer(object):
- def __init__(self):
- self._key = 0
- self._cache = {}
- self._old_locks = []
- self._lock = threading.Lock()
- self._listener = None
- self._address = None
- self._thread = None
- register_after_fork(self, ResourceSharer._afterfork)
-
- def register(self, send, close):
- with self._lock:
- if self._address is None:
- self._start()
- self._key += 1
- self._cache[self._key] = (send, close)
- return (self._address, self._key)
-
- @staticmethod
- def get_connection(ident):
- from .connection import Client
- address, key = ident
- c = Client(address, authkey=current_process().authkey)
- c.send((key, os.getpid()))
- return c
-
- def stop(self, timeout=None):
- from .connection import Client
- with self._lock:
- if self._address is not None:
- c = Client(self._address, authkey=current_process().authkey)
- c.send(None)
- c.close()
- self._thread.join(timeout)
- if self._thread.is_alive():
- sub_warn('ResourceSharer thread did not stop when asked')
- self._listener.close()
- self._thread = None
- self._address = None
- self._listener = None
- for key, (send, close) in self._cache.items():
- close()
- self._cache.clear()
-
- def _afterfork(self):
- for key, (send, close) in self._cache.items():
- close()
- self._cache.clear()
- # If self._lock was locked at the time of the fork, it may be broken
- # -- see issue 6721. Replace it without letting it be gc'ed.
- self._old_locks.append(self._lock)
- self._lock = threading.Lock()
- if self._listener is not None:
- self._listener.close()
- self._listener = None
- self._address = None
- self._thread = None
-
- def _start(self):
- from .connection import Listener
- assert self._listener is None
- debug('starting listener and thread for sending handles')
- self._listener = Listener(authkey=current_process().authkey)
- self._address = self._listener.address
- t = threading.Thread(target=self._serve)
- t.daemon = True
- t.start()
- self._thread = t
-
- def _serve(self):
- if hasattr(signal, 'pthread_sigmask'):
- signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
- while 1:
- try:
- conn = self._listener.accept()
- msg = conn.recv()
- if msg is None:
- break
- key, destination_pid = msg
- send, close = self._cache.pop(key)
- send(conn, destination_pid)
- close()
- conn.close()
- except:
- if not is_exiting():
- import traceback
- sub_warning(
- 'thread for sharing handles raised exception :\n' +
- '-'*79 + '\n' + traceback.format_exc() + '-'*79
- )
-
-resource_sharer = ResourceSharer()
+if sys.platform == 'win32':
+ def _reduce_socket(s):
+ from .resource_sharer import DupSocket
+ return _rebuild_socket, (DupSocket(s),)
+ def _rebuild_socket(ds):
+ return ds.detach()
+ register(socket.socket, _reduce_socket)
+
+else:
+ def _reduce_socket(s):
+ df = DupFd(s.fileno())
+ return _rebuild_socket, (df, s.family, s.type, s.proto)
+ def _rebuild_socket(df, family, type, proto):
+ fd = df.detach()
+ return socket.socket(family, type, proto, fileno=fd)
+ register(socket.socket, _reduce_socket)
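
On Unix the heavy lifting is sendfds()/recvfds(): the fd array travels as SCM_RIGHTS ancillary data on an AF_UNIX socket, and the one-byte message carries len(fds) % 256 as a sanity check. The raw mechanism, stripped of the module's MacOS acknowledgement handshake, looks like this (Unix-only):

    import array
    import os
    import socket

    left, right = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
    r, w = os.pipe()

    fds = array.array('i', [r, w])
    left.sendmsg([bytes([len(fds) % 256])],                 # 1-byte length check
                 [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])

    msg, ancdata, flags, addr = right.recvmsg(1, socket.CMSG_LEN(2 * fds.itemsize))
    received = array.array('i')
    received.frombytes(ancdata[0][2])
    assert msg[0] == len(received) % 256

    os.write(received[1], b'hi')          # the received fds alias the same pipe
    assert os.read(received[0], 2) == b'hi'
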
diff --git a/Lib/multiprocessing/resource_sharer.py b/Lib/multiprocessing/resource_sharer.py
new file mode 100644
index 0000000..5e46fc6
--- /dev/null
+++ b/Lib/multiprocessing/resource_sharer.py
@@ -0,0 +1,158 @@
+#
+# We use a background thread for sharing fds on Unix, and for sharing sockets on
+# Windows.
+#
+# A client which wants to pickle a resource registers it with the resource
+# sharer and gets an identifier in return. The unpickling process connects
+# to the resource sharer, sends the identifier and its pid, and then receives
+# the resource.
+#
+
+import os
+import signal
+import socket
+import sys
+import threading
+
+from . import process
+from . import reduction
+from . import util
+
+__all__ = ['stop']
+
+
+if sys.platform == 'win32':
+ __all__ += ['DupSocket']
+
+ class DupSocket(object):
+ '''Picklable wrapper for a socket.'''
+ def __init__(self, sock):
+ new_sock = sock.dup()
+ def send(conn, pid):
+ share = new_sock.share(pid)
+ conn.send_bytes(share)
+ self._id = _resource_sharer.register(send, new_sock.close)
+
+ def detach(self):
+ '''Get the socket. This should only be called once.'''
+ with _resource_sharer.get_connection(self._id) as conn:
+ share = conn.recv_bytes()
+ return socket.fromshare(share)
+
+else:
+ __all__ += ['DupFd']
+
+ class DupFd(object):
+ '''Wrapper for fd which can be used at any time.'''
+ def __init__(self, fd):
+ new_fd = os.dup(fd)
+ def send(conn, pid):
+ reduction.send_handle(conn, new_fd, pid)
+ def close():
+ os.close(new_fd)
+ self._id = _resource_sharer.register(send, close)
+
+ def detach(self):
+ '''Get the fd. This should only be called once.'''
+ with _resource_sharer.get_connection(self._id) as conn:
+ return reduction.recv_handle(conn)
+
+
+class _ResourceSharer(object):
+ '''Manager for resources using a background thread.'''
+ def __init__(self):
+ self._key = 0
+ self._cache = {}
+ self._old_locks = []
+ self._lock = threading.Lock()
+ self._listener = None
+ self._address = None
+ self._thread = None
+ util.register_after_fork(self, _ResourceSharer._afterfork)
+
+ def register(self, send, close):
+ '''Register resource, returning an identifier.'''
+ with self._lock:
+ if self._address is None:
+ self._start()
+ self._key += 1
+ self._cache[self._key] = (send, close)
+ return (self._address, self._key)
+
+ @staticmethod
+ def get_connection(ident):
+ '''Return connection from which to receive identified resource.'''
+ from .connection import Client
+ address, key = ident
+ c = Client(address, authkey=process.current_process().authkey)
+ c.send((key, os.getpid()))
+ return c
+
+ def stop(self, timeout=None):
+ '''Stop the background thread and clear registered resources.'''
+ from .connection import Client
+ with self._lock:
+ if self._address is not None:
+ c = Client(self._address,
+ authkey=process.current_process().authkey)
+ c.send(None)
+ c.close()
+ self._thread.join(timeout)
+ if self._thread.is_alive():
+ util.sub_warning('_ResourceSharer thread did '
+ 'not stop when asked')
+ self._listener.close()
+ self._thread = None
+ self._address = None
+ self._listener = None
+ for key, (send, close) in self._cache.items():
+ close()
+ self._cache.clear()
+
+ def _afterfork(self):
+ for key, (send, close) in self._cache.items():
+ close()
+ self._cache.clear()
+ # If self._lock was locked at the time of the fork, it may be broken
+ # -- see issue 6721. Replace it without letting it be gc'ed.
+ self._old_locks.append(self._lock)
+ self._lock = threading.Lock()
+ if self._listener is not None:
+ self._listener.close()
+ self._listener = None
+ self._address = None
+ self._thread = None
+
+ def _start(self):
+ from .connection import Listener
+ assert self._listener is None
+ util.debug('starting listener and thread for sending handles')
+ self._listener = Listener(authkey=process.current_process().authkey)
+ self._address = self._listener.address
+ t = threading.Thread(target=self._serve)
+ t.daemon = True
+ t.start()
+ self._thread = t
+
+ def _serve(self):
+ if hasattr(signal, 'pthread_sigmask'):
+ signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
+ while 1:
+ try:
+ with self._listener.accept() as conn:
+ msg = conn.recv()
+ if msg is None:
+ break
+ key, destination_pid = msg
+ send, close = self._cache.pop(key)
+ try:
+ send(conn, destination_pid)
+ finally:
+ close()
+ except:
+ if not util.is_exiting():
+ sys.excepthook(*sys.exc_info())
+
+
+_resource_sharer = _ResourceSharer()
+stop = _resource_sharer.stop
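
Because registration returns (address, key) and detach() connects back to the listener, a DupFd can even be round-tripped inside a single process, which makes the flow easy to poke at interactively. An illustrative Unix-only snippet (the background server thread starts lazily on the first registration):

    import os
    from multiprocessing.resource_sharer import DupFd

    if __name__ == '__main__':
        r, w = os.pipe()
        df = DupFd(w)          # dups w and registers it with the sharer thread
        fd = df.detach()       # connects back, receives a copy via SCM_RIGHTS
        os.write(fd, b'x')
        assert os.read(r, 1) == b'x'
        os.close(fd)
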
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py
new file mode 100644
index 0000000..4a2d636
--- /dev/null
+++ b/Lib/multiprocessing/semaphore_tracker.py
@@ -0,0 +1,135 @@
+#
+# On Unix we run a server process which keeps track of unlinked
+# semaphores. The server ignores SIGINT and SIGTERM and reads from a
+# pipe. Every other process of the program has a copy of the writable
+# end of the pipe, so we get EOF when all other processes have exited.
+# Then the server process unlinks any remaining semaphore names.
+#
+# This is important because the system only supports a limited number
+# of named semaphores, and they will not be automatically removed till
+# the next reboot. Without this semaphore tracker process, "killall
+# python" would probably leave unlinked semaphores.
+#
+
+import errno
+import os
+import signal
+import sys
+import threading
+import warnings
+import _multiprocessing
+
+from . import spawn
+from . import util
+from . import current_process
+
+__all__ = ['ensure_running', 'register', 'unregister']
+
+
+_lock = threading.Lock()
+
+
+def ensure_running():
+ '''Make sure that semaphore tracker process is running.
+
+ This can be run from any process. Usually a child process will use
+ the semaphore tracker created by its parent.'''
+ with _lock:
+ config = current_process()._config
+ if config.get('semaphore_tracker_fd') is not None:
+ return
+ fds_to_pass = []
+ try:
+ fds_to_pass.append(sys.stderr.fileno())
+ except Exception:
+ pass
+ cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)'
+ r, semaphore_tracker_fd = util.pipe()
+ try:
+ fds_to_pass.append(r)
+ # process will outlive us, so no need to wait on pid
+ exe = spawn.get_executable()
+ args = [exe] + util._args_from_interpreter_flags()
+ args += ['-c', cmd % r]
+ util.spawnv_passfds(exe, args, fds_to_pass)
+ except:
+ os.close(semaphore_tracker_fd)
+ raise
+ else:
+ config['semaphore_tracker_fd'] = semaphore_tracker_fd
+ finally:
+ os.close(r)
+
+
+def register(name):
+ '''Register name of semaphore with semaphore tracker.'''
+ _send('REGISTER', name)
+
+
+def unregister(name):
+ '''Unregister name of semaphore with semaphore tracker.'''
+ _send('UNREGISTER', name)
+
+
+def _send(cmd, name):
+ msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
+ if len(name) > 512:
+ # posix guarantees that writes to a pipe of less than PIPE_BUF
+ # bytes are atomic, and that PIPE_BUF >= 512
+ raise ValueError('name too long')
+ fd = current_process()._config['semaphore_tracker_fd']
+ nbytes = os.write(fd, msg)
+ assert nbytes == len(msg)
+
+
+def main(fd):
+ '''Run semaphore tracker.'''
+ # protect the process from ^C and "killall python" etc
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+
+ for f in (sys.stdin, sys.stdout):
+ try:
+ f.close()
+ except Exception:
+ pass
+
+ cache = set()
+ try:
+ # keep track of registered/unregistered semaphores
+ with open(fd, 'rb') as f:
+ for line in f:
+ try:
+ cmd, name = line.strip().split(b':')
+ if cmd == b'REGISTER':
+ cache.add(name)
+ elif cmd == b'UNREGISTER':
+ cache.remove(name)
+ else:
+ raise RuntimeError('unrecognized command %r' % cmd)
+ except Exception:
+ try:
+ sys.excepthook(*sys.exc_info())
+ except:
+ pass
+ finally:
+ # all processes have terminated; cleanup any remaining semaphores
+ if cache:
+ try:
+ warnings.warn('semaphore_tracker: There appear to be %d '
+ 'leaked semaphores to clean up at shutdown' %
+ len(cache))
+ except Exception:
+ pass
+ for name in cache:
+ # For some reason the process which created and registered this
+ # semaphore has failed to unregister it. Presumably it has died.
+ # We therefore unlink it.
+ try:
+ name = name.decode('ascii')
+ try:
+ _multiprocessing.sem_unlink(name)
+ except Exception as e:
+ warnings.warn('semaphore_tracker: %r: %s' % (name, e))
+ finally:
+ pass
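
The tracker's wire protocol is a single line per command, b'REGISTER:<name>\n' or b'UNREGISTER:<name>\n', kept under 512 bytes so each write is atomic under POSIX's PIPE_BUF guarantee. A minimal parser mirroring the loop in main():

    def handle_line(cache, line):
        # Semaphore names ('/mp-...') never contain ':', so one split suffices.
        cmd, name = line.strip().split(b':')
        if cmd == b'REGISTER':
            cache.add(name)
        elif cmd == b'UNREGISTER':
            cache.remove(name)
        else:
            raise RuntimeError('unrecognized command %r' % cmd)

    cache = set()
    handle_line(cache, b'REGISTER:/mp-3fz1ab\n')
    handle_line(cache, b'UNREGISTER:/mp-3fz1ab\n')
    assert not cache      # nothing left to unlink at shutdown
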
diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py
index a358ed4..a97dadf 100644
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -10,8 +10,11 @@
import ctypes
import weakref
-from multiprocessing import heap, RLock
-from multiprocessing.forking import assert_spawning, ForkingPickler
+from . import heap
+
+from .synchronize import RLock
+from .reduction import ForkingPickler
+from .popen import assert_spawning
__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py
new file mode 100644
index 0000000..83561db
--- /dev/null
+++ b/Lib/multiprocessing/spawn.py
@@ -0,0 +1,258 @@
+#
+# Code used to start processes when using the spawn or forkserver
+# start methods.
+#
+# multiprocessing/spawn.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+import os
+import pickle
+import sys
+
+from . import process
+from . import util
+from . import popen
+
+__all__ = ['_main', 'freeze_support', 'set_executable', 'get_executable',
+ 'get_preparation_data', 'get_command_line', 'import_main_path']
+
+#
+# _python_exe is the assumed path to the python executable.
+# People embedding Python want to modify it.
+#
+
+if sys.platform != 'win32':
+ WINEXE = False
+ WINSERVICE = False
+else:
+ WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
+ WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
+
+if WINSERVICE:
+ _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
+else:
+ _python_exe = sys.executable
+
+def set_executable(exe):
+ global _python_exe
+ _python_exe = exe
+
+def get_executable():
+ return _python_exe
+
+#
+#
+#
+
+def is_forking(argv):
+ '''
+ Return whether commandline indicates we are forking
+ '''
+ if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
+ return True
+ else:
+ return False
+
+
+def freeze_support():
+ '''
+ Run code for process object if this is not the main process
+ '''
+ if is_forking(sys.argv):
+ main()
+ sys.exit()
+
+
+def get_command_line():
+ '''
+ Returns prefix of command line used for spawning a child process
+ '''
+ if getattr(sys, 'frozen', False):
+ return [sys.executable, '--multiprocessing-fork']
+ else:
+ prog = 'from multiprocessing.spawn import spawn_main; spawn_main()'
+ opts = util._args_from_interpreter_flags()
+ return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
+
+
+def spawn_main():
+ '''
+ Run code specified by data received over pipe
+ '''
+ assert is_forking(sys.argv)
+ handle = int(sys.argv[-1])
+ if sys.platform == 'win32':
+ import msvcrt
+ from .reduction import steal_handle
+ pid = int(sys.argv[-2])
+ new_handle = steal_handle(pid, handle)
+ fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
+ else:
+ fd = handle
+ exitcode = _main(fd)
+ sys.exit(exitcode)
+
+
+def _main(fd):
+ with os.fdopen(fd, 'rb', closefd=True) as from_parent:
+ process.current_process()._inheriting = True
+ try:
+ preparation_data = pickle.load(from_parent)
+ prepare(preparation_data)
+ self = pickle.load(from_parent)
+ finally:
+ del process.current_process()._inheriting
+ return self._bootstrap()
+
+
+def _check_not_importing_main():
+ if getattr(process.current_process(), '_inheriting', False):
+ raise RuntimeError('''
+ An attempt has been made to start a new process before the
+ current process has finished its bootstrapping phase.
+
+ This probably means that you are not using fork to start your
+ child processes and you have forgotten to use the proper idiom
+ in the main module:
+
+ if __name__ == '__main__':
+ freeze_support()
+ ...
+
+ The "freeze_support()" line can be omitted if the program
+ is not going to be frozen to produce an executable.''')
+
+
+def get_preparation_data(name):
+ '''
+ Return info about parent needed by child to unpickle process object
+ '''
+ _check_not_importing_main()
+ d = dict(
+ log_to_stderr=util._log_to_stderr,
+ authkey=process.current_process().authkey,
+ )
+
+ if util._logger is not None:
+ d['log_level'] = util._logger.getEffectiveLevel()
+
+ sys_path=sys.path.copy()
+ try:
+ i = sys_path.index('')
+ except ValueError:
+ pass
+ else:
+ sys_path[i] = process.ORIGINAL_DIR
+
+ d.update(
+ name=name,
+ sys_path=sys_path,
+ sys_argv=sys.argv,
+ orig_dir=process.ORIGINAL_DIR,
+ dir=os.getcwd(),
+ start_method=popen.get_start_method(),
+ )
+
+ if sys.platform != 'win32' or (not WINEXE and not WINSERVICE):
+ main_path = getattr(sys.modules['__main__'], '__file__', None)
+ if not main_path and sys.argv[0] not in ('', '-c'):
+ main_path = sys.argv[0]
+ if main_path is not None:
+ if (not os.path.isabs(main_path) and
+ process.ORIGINAL_DIR is not None):
+ main_path = os.path.join(process.ORIGINAL_DIR, main_path)
+ d['main_path'] = os.path.normpath(main_path)
+
+ return d
+
+#
+# Prepare current process
+#
+
+old_main_modules = []
+
+def prepare(data):
+ '''
+ Try to get current process ready to unpickle process object
+ '''
+ if 'name' in data:
+ process.current_process().name = data['name']
+
+ if 'authkey' in data:
+ process.current_process().authkey = data['authkey']
+
+ if 'log_to_stderr' in data and data['log_to_stderr']:
+ util.log_to_stderr()
+
+ if 'log_level' in data:
+ util.get_logger().setLevel(data['log_level'])
+
+ if 'sys_path' in data:
+ sys.path = data['sys_path']
+
+ if 'sys_argv' in data:
+ sys.argv = data['sys_argv']
+
+ if 'dir' in data:
+ os.chdir(data['dir'])
+
+ if 'orig_dir' in data:
+ process.ORIGINAL_DIR = data['orig_dir']
+
+ if 'start_method' in data:
+ popen.set_start_method(data['start_method'], start_helpers=False)
+
+ if 'main_path' in data:
+ import_main_path(data['main_path'])
+
+
+def import_main_path(main_path):
+ '''
+ Set sys.modules['__main__'] to module at main_path
+ '''
+ # XXX (ncoghlan): The following code makes several bogus
+ # assumptions regarding the relationship between __file__
+ # and a module's real name. See PEP 302 and issue #10845
+ if getattr(sys.modules['__main__'], '__file__', None) == main_path:
+ return
+
+ main_name = os.path.splitext(os.path.basename(main_path))[0]
+ if main_name == '__init__':
+ main_name = os.path.basename(os.path.dirname(main_path))
+
+ if main_name == '__main__':
+ main_module = sys.modules['__main__']
+ main_module.__file__ = main_path
+ elif main_name != 'ipython':
+ # Main modules not actually called __main__.py may
+ # contain additional code that should still be executed
+ import importlib
+ import types
+
+ if main_path is None:
+ dirs = None
+ elif os.path.basename(main_path).startswith('__init__.py'):
+ dirs = [os.path.dirname(os.path.dirname(main_path))]
+ else:
+ dirs = [os.path.dirname(main_path)]
+
+ assert main_name not in sys.modules, main_name
+ sys.modules.pop('__mp_main__', None)
+ # We should not try to load __main__
+ # since that would execute 'if __name__ == "__main__"'
+ # clauses, potentially causing a pseudo fork bomb.
+ loader = importlib.find_loader(main_name, path=dirs)
+ main_module = types.ModuleType(main_name)
+ try:
+ loader.init_module_attrs(main_module)
+ except AttributeError: # init_module_attrs is optional
+ pass
+ main_module.__name__ = '__mp_main__'
+ code = loader.get_code(main_name)
+ exec(code, main_module.__dict__)
+
+ old_main_modules.append(sys.modules['__main__'])
+ sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
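
_check_not_importing_main() is what turns a missing __main__ guard into a clear RuntimeError instead of a fork bomb: under 'spawn' (and 'forkserver') the child re-imports the main module as __mp_main__, so any unguarded process creation would recurse. The idiom the error message asks for:

    import os
    import multiprocessing as mp

    def work(q):
        q.put(os.getpid())

    if __name__ == '__main__':
        # Everything that starts processes lives under the guard, so the
        # child's re-import of this module stops at the function definitions.
        mp.set_start_method('spawn')
        q = mp.Queue()
        p = mp.Process(target=work, args=(q,))
        p.start()
        print(q.get())          # the child's pid
        p.join()
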
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index 0faca78..09736ef 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -11,20 +11,24 @@ __all__ = [
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
]
+import os
import threading
import sys
-
+import itertools
+import tempfile
import _multiprocessing
-from multiprocessing.process import current_process
-from multiprocessing.util import register_after_fork, debug
-from multiprocessing.forking import assert_spawning, Popen
+
from time import time as _time
+from . import popen
+from . import process
+from . import util
+
# Try to import the mp.synchronize module cleanly, if it fails
# raise ImportError for platforms lacking a working sem_open implementation.
# See issue 3770
try:
- from _multiprocessing import SemLock
+ from _multiprocessing import SemLock, sem_unlink
except (ImportError):
raise ImportError("This platform lacks a functioning sem_open" +
" implementation, therefore, the required" +
@@ -44,15 +48,45 @@ SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
class SemLock(object):
+ _rand = tempfile._RandomNameSequence()
+
def __init__(self, kind, value, maxvalue):
- sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
- debug('created semlock with handle %s' % sl.handle)
+ unlink_immediately = (sys.platform == 'win32' or
+ popen.get_start_method() == 'fork')
+ for i in range(100):
+ try:
+ sl = self._semlock = _multiprocessing.SemLock(
+ kind, value, maxvalue, self._make_name(),
+ unlink_immediately)
+ except FileExistsError:
+ pass
+ else:
+ break
+ else:
+ raise FileExistsError('cannot find name for semaphore')
+
+ util.debug('created semlock with handle %s' % sl.handle)
self._make_methods()
if sys.platform != 'win32':
def _after_fork(obj):
obj._semlock._after_fork()
- register_after_fork(self, _after_fork)
+ util.register_after_fork(self, _after_fork)
+
+ if self._semlock.name is not None:
+ # We only get here if we are on Unix with forking
+ # disabled. When the object is garbage collected or the
+ # process shuts down we unlink the semaphore name
+ from .semaphore_tracker import register
+ register(self._semlock.name)
+ util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
+ exitpriority=0)
+
+ @staticmethod
+ def _cleanup(name):
+ from .semaphore_tracker import unregister
+ sem_unlink(name)
+ unregister(name)
def _make_methods(self):
self.acquire = self._semlock.acquire
@@ -65,15 +99,24 @@ class SemLock(object):
return self._semlock.__exit__(*args)
def __getstate__(self):
- assert_spawning(self)
+ popen.assert_spawning(self)
sl = self._semlock
- return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
+ if sys.platform == 'win32':
+ h = popen.get_spawning_popen().duplicate_for_child(sl.handle)
+ else:
+ h = sl.handle
+ return (h, sl.kind, sl.maxvalue, sl.name)
def __setstate__(self, state):
self._semlock = _multiprocessing.SemLock._rebuild(*state)
- debug('recreated blocker with handle %r' % state[0])
+ util.debug('recreated blocker with handle %r' % state[0])
self._make_methods()
+ @staticmethod
+ def _make_name():
+ return '/%s-%s' % (process.current_process()._config['semprefix'],
+ next(SemLock._rand))
+
#
# Semaphore
#
@@ -122,7 +165,7 @@ class Lock(SemLock):
def __repr__(self):
try:
if self._semlock._is_mine():
- name = current_process().name
+ name = process.current_process().name
if threading.current_thread().name != 'MainThread':
name += '|' + threading.current_thread().name
elif self._semlock._get_value() == 1:
@@ -147,7 +190,7 @@ class RLock(SemLock):
def __repr__(self):
try:
if self._semlock._is_mine():
- name = current_process().name
+ name = process.current_process().name
if threading.current_thread().name != 'MainThread':
name += '|' + threading.current_thread().name
count = self._semlock._count()
@@ -175,7 +218,7 @@ class Condition(object):
self._make_methods()
def __getstate__(self):
- assert_spawning(self)
+ popen.assert_spawning(self)
return (self._lock, self._sleeping_count,
self._woken_count, self._wait_semaphore)
@@ -342,7 +385,7 @@ class Barrier(threading.Barrier):
def __init__(self, parties, action=None, timeout=None):
import struct
- from multiprocessing.heap import BufferWrapper
+ from .heap import BufferWrapper
wrapper = BufferWrapper(struct.calcsize('i') * 2)
cond = Condition()
self.__setstate__((parties, action, timeout, cond, wrapper))
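
With spawn or forkserver the semaphore cannot be unlinked immediately -- the child still has to open it by name -- so SemLock now generates a short random name, registers it with the semaphore tracker, and unlinks it from a finalizer. The generated names stay within the FreeBSD 14-character limit noted in process.py:

    import tempfile

    # _make_name() above: '/<semprefix>-<random>' with the default 'mp' prefix.
    _rand = tempfile._RandomNameSequence()   # the same helper SemLock uses
    name = '/%s-%s' % ('mp', next(_rand))
    print(name)                              # e.g. '/mp-k3b2q1x7'
    assert len(name) <= 14                   # 1 + 2 + 1 + 8 characters
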
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
index f5862b4..d9e4799 100644
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -17,13 +17,13 @@ import threading # we want threading to install it's
# cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags
-from multiprocessing.process import current_process, active_children
+from . import process
__all__ = [
'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
'log_to_stderr', 'get_temp_dir', 'register_after_fork',
'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
- 'SUBDEBUG', 'SUBWARNING',
+ 'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
]
#
@@ -71,8 +71,6 @@ def get_logger():
_logger = logging.getLogger(LOGGER_NAME)
_logger.propagate = 0
- logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
- logging.addLevelName(SUBWARNING, 'SUBWARNING')
# XXX multiprocessing should cleanup before logging
if hasattr(atexit, 'unregister'):
@@ -111,13 +109,14 @@ def log_to_stderr(level=None):
def get_temp_dir():
# get name of a temp directory which will be automatically cleaned up
- if current_process()._tempdir is None:
+ tempdir = process.current_process()._config.get('tempdir')
+ if tempdir is None:
import shutil, tempfile
tempdir = tempfile.mkdtemp(prefix='pymp-')
info('created temp directory %s', tempdir)
Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
- current_process()._tempdir = tempdir
- return current_process()._tempdir
+ process.current_process()._config['tempdir'] = tempdir
+ return tempdir
#
# Support for reinitialization of objects when bootstrapping a child process
@@ -273,8 +272,8 @@ def is_exiting():
_exiting = False
def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
- active_children=active_children,
- current_process=current_process):
+ active_children=process.active_children,
+ current_process=process.current_process):
# We hold on to references to functions in the arglist due to the
# situation described below, where this function is called after this
# module's globals are destroyed.
@@ -303,7 +302,7 @@ def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
# #9207.
for p in active_children():
- if p._daemonic:
+ if p.daemon:
info('calling terminate() for daemon %s', p.name)
p._popen.terminate()
@@ -335,3 +334,54 @@ class ForkAwareLocal(threading.local):
register_after_fork(self, lambda obj : obj.__dict__.clear())
def __reduce__(self):
return type(self), ()
+
+#
+# Close fds except those specified
+#
+
+try:
+ MAXFD = os.sysconf("SC_OPEN_MAX")
+except Exception:
+ MAXFD = 256
+
+def close_all_fds_except(fds):
+ fds = list(fds) + [-1, MAXFD]
+ fds.sort()
+ assert fds[-1] == MAXFD, 'fd too large'
+ for i in range(len(fds) - 1):
+ os.closerange(fds[i]+1, fds[i+1])
+
+#
+# Start a program with only specified fds kept open
+#
+
+def spawnv_passfds(path, args, passfds):
+ import _posixsubprocess, fcntl
+ passfds = sorted(passfds)
+ tmp = []
+ # temporarily unset CLOEXEC on passed fds
+ for fd in passfds:
+ flag = fcntl.fcntl(fd, fcntl.F_GETFD)
+ if flag & fcntl.FD_CLOEXEC:
+ fcntl.fcntl(fd, fcntl.F_SETFD, flag & ~fcntl.FD_CLOEXEC)
+ tmp.append((fd, flag))
+ errpipe_read, errpipe_write = _posixsubprocess.cloexec_pipe()
+ try:
+ return _posixsubprocess.fork_exec(
+ args, [os.fsencode(path)], True, passfds, None, None,
+ -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
+ False, False, None)
+ finally:
+ os.close(errpipe_read)
+ os.close(errpipe_write)
+ # reset CLOEXEC where necessary
+ for fd, flag in tmp:
+ fcntl.fcntl(fd, fcntl.F_SETFD, flag)
+
+#
+# Return pipe with CLOEXEC set on fds
+#
+
+def pipe():
+ import _posixsubprocess
+ return _posixsubprocess.cloexec_pipe()
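
The sentinel values in close_all_fds_except() make the arithmetic come out right: padding the sorted fd list with -1 below and MAXFD above means each os.closerange(fds[i]+1, fds[i+1]) call closes exactly one gap between kept fds. A worked check of the spans (recall that os.closerange(lo, hi) excludes hi):

    MAXFD = 256                       # stand-in for os.sysconf("SC_OPEN_MAX")
    keep = [3, 7]

    fds = sorted(keep + [-1, MAXFD])  # [-1, 3, 7, 256]
    spans = [(fds[i] + 1, fds[i + 1]) for i in range(len(fds) - 1)]
    assert spans == [(0, 3), (4, 7), (8, 256)]   # everything but 3 and 7 closed
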
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 2a6381d..f777edc 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -43,7 +43,7 @@ from multiprocessing import util
try:
from multiprocessing import reduction
- HAS_REDUCTION = True
+ HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
except ImportError:
HAS_REDUCTION = False
@@ -99,6 +99,9 @@ try:
except:
MAXFD = 256
+# To speed up tests when using the forkserver, we can preload these:
+PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
+
#
# Some tests require ctypes
#
@@ -330,7 +333,6 @@ class _TestProcess(BaseTestCase):
@classmethod
def _test_recursion(cls, wconn, id):
- from multiprocessing import forking
wconn.send(id)
if len(id) < 2:
for i in range(2):
@@ -378,7 +380,7 @@ class _TestProcess(BaseTestCase):
self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
event.set()
p.join()
- self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
+ self.assertTrue(wait_for_handle(sentinel, timeout=1))
#
#
@@ -2493,7 +2495,7 @@ class _TestPicklingConnections(BaseTestCase):
@classmethod
def tearDownClass(cls):
- from multiprocessing.reduction import resource_sharer
+ from multiprocessing import resource_sharer
resource_sharer.stop(timeout=5)
@classmethod
@@ -2807,30 +2809,40 @@ class _TestFinalize(BaseTestCase):
# Test that from ... import * works for each module
#
-class _TestImportStar(BaseTestCase):
+class _TestImportStar(unittest.TestCase):
- ALLOWED_TYPES = ('processes',)
+ def get_module_names(self):
+ import glob
+ folder = os.path.dirname(multiprocessing.__file__)
+ pattern = os.path.join(folder, '*.py')
+ files = glob.glob(pattern)
+ modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
+ modules = ['multiprocessing.' + m for m in modules]
+ modules.remove('multiprocessing.__init__')
+ modules.append('multiprocessing')
+ return modules
def test_import(self):
- modules = [
- 'multiprocessing', 'multiprocessing.connection',
- 'multiprocessing.heap', 'multiprocessing.managers',
- 'multiprocessing.pool', 'multiprocessing.process',
- 'multiprocessing.synchronize', 'multiprocessing.util'
- ]
-
- if HAS_REDUCTION:
- modules.append('multiprocessing.reduction')
+ modules = self.get_module_names()
+ if sys.platform == 'win32':
+ modules.remove('multiprocessing.popen_fork')
+ modules.remove('multiprocessing.popen_forkserver')
+ modules.remove('multiprocessing.popen_spawn_posix')
+ else:
+ modules.remove('multiprocessing.popen_spawn_win32')
+ if not HAS_REDUCTION:
+ modules.remove('multiprocessing.popen_forkserver')
- if c_int is not None:
+ if c_int is None:
# This module requires _ctypes
- modules.append('multiprocessing.sharedctypes')
+ modules.remove('multiprocessing.sharedctypes')
for name in modules:
__import__(name)
mod = sys.modules[name]
+ self.assertTrue(hasattr(mod, '__all__'), name)
- for attr in getattr(mod, '__all__', ()):
+ for attr in mod.__all__:
self.assertTrue(
hasattr(mod, attr),
'%r does not have attribute %r' % (mod, attr)
@@ -2953,131 +2965,6 @@ class TestInvalidHandle(unittest.TestCase):
self.assertRaises((ValueError, OSError),
multiprocessing.connection.Connection, -1)
-#
-# Functions used to create test cases from the base ones in this module
-#
-
-def create_test_cases(Mixin, type):
- result = {}
- glob = globals()
- Type = type.capitalize()
- ALL_TYPES = {'processes', 'threads', 'manager'}
-
- for name in list(glob.keys()):
- if name.startswith('_Test'):
- base = glob[name]
- assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
- if type in base.ALLOWED_TYPES:
- newname = 'With' + Type + name[1:]
- class Temp(base, Mixin, unittest.TestCase):
- pass
- result[newname] = Temp
- Temp.__name__ = Temp.__qualname__ = newname
- Temp.__module__ = Mixin.__module__
- return result
-
-#
-# Create test cases
-#
-
-class ProcessesMixin(object):
- TYPE = 'processes'
- Process = multiprocessing.Process
- connection = multiprocessing.connection
- current_process = staticmethod(multiprocessing.current_process)
- active_children = staticmethod(multiprocessing.active_children)
- Pool = staticmethod(multiprocessing.Pool)
- Pipe = staticmethod(multiprocessing.Pipe)
- Queue = staticmethod(multiprocessing.Queue)
- JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
- Lock = staticmethod(multiprocessing.Lock)
- RLock = staticmethod(multiprocessing.RLock)
- Semaphore = staticmethod(multiprocessing.Semaphore)
- BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
- Condition = staticmethod(multiprocessing.Condition)
- Event = staticmethod(multiprocessing.Event)
- Barrier = staticmethod(multiprocessing.Barrier)
- Value = staticmethod(multiprocessing.Value)
- Array = staticmethod(multiprocessing.Array)
- RawValue = staticmethod(multiprocessing.RawValue)
- RawArray = staticmethod(multiprocessing.RawArray)
-
-testcases_processes = create_test_cases(ProcessesMixin, type='processes')
-globals().update(testcases_processes)
-
-
-class ManagerMixin(object):
- TYPE = 'manager'
- Process = multiprocessing.Process
- Queue = property(operator.attrgetter('manager.Queue'))
- JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
- Lock = property(operator.attrgetter('manager.Lock'))
- RLock = property(operator.attrgetter('manager.RLock'))
- Semaphore = property(operator.attrgetter('manager.Semaphore'))
- BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
- Condition = property(operator.attrgetter('manager.Condition'))
- Event = property(operator.attrgetter('manager.Event'))
- Barrier = property(operator.attrgetter('manager.Barrier'))
- Value = property(operator.attrgetter('manager.Value'))
- Array = property(operator.attrgetter('manager.Array'))
- list = property(operator.attrgetter('manager.list'))
- dict = property(operator.attrgetter('manager.dict'))
- Namespace = property(operator.attrgetter('manager.Namespace'))
-
- @classmethod
- def Pool(cls, *args, **kwds):
- return cls.manager.Pool(*args, **kwds)
-
- @classmethod
- def setUpClass(cls):
- cls.manager = multiprocessing.Manager()
-
- @classmethod
- def tearDownClass(cls):
- # only the manager process should be returned by active_children()
- # but this can take a bit on slow machines, so wait a few seconds
- # if there are other children too (see #17395)
- t = 0.01
- while len(multiprocessing.active_children()) > 1 and t < 5:
- time.sleep(t)
- t *= 2
- gc.collect() # do garbage collection
- if cls.manager._number_of_objects() != 0:
- # This is not really an error since some tests do not
- # ensure that all processes which hold a reference to a
- # managed object have been joined.
- print('Shared objects which still exist at manager shutdown:')
- print(cls.manager._debug_info())
- cls.manager.shutdown()
- cls.manager.join()
- cls.manager = None
-
-testcases_manager = create_test_cases(ManagerMixin, type='manager')
-globals().update(testcases_manager)
-
-
-class ThreadsMixin(object):
- TYPE = 'threads'
- Process = multiprocessing.dummy.Process
- connection = multiprocessing.dummy.connection
- current_process = staticmethod(multiprocessing.dummy.current_process)
- active_children = staticmethod(multiprocessing.dummy.active_children)
- Pool = staticmethod(multiprocessing.Pool)
- Pipe = staticmethod(multiprocessing.dummy.Pipe)
- Queue = staticmethod(multiprocessing.dummy.Queue)
- JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
- Lock = staticmethod(multiprocessing.dummy.Lock)
- RLock = staticmethod(multiprocessing.dummy.RLock)
- Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
- BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
- Condition = staticmethod(multiprocessing.dummy.Condition)
- Event = staticmethod(multiprocessing.dummy.Event)
- Barrier = staticmethod(multiprocessing.dummy.Barrier)
- Value = staticmethod(multiprocessing.dummy.Value)
- Array = staticmethod(multiprocessing.dummy.Array)
-
-testcases_threads = create_test_cases(ThreadsMixin, type='threads')
-globals().update(testcases_threads)
class OtherTest(unittest.TestCase):
@@ -3427,7 +3314,7 @@ class TestFlags(unittest.TestCase):
def test_flags(self):
import json, subprocess
# start child process using unusual flags
- prog = ('from test.test_multiprocessing import TestFlags; ' +
+ prog = ('from test._test_multiprocessing import TestFlags; ' +
'TestFlags.run_in_child()')
data = subprocess.check_output(
[sys.executable, '-E', '-S', '-O', '-c', prog])
@@ -3474,13 +3361,14 @@ class TestTimeouts(unittest.TestCase):
class TestNoForkBomb(unittest.TestCase):
def test_noforkbomb(self):
+ sm = multiprocessing.get_start_method()
name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
- if WIN32:
- rc, out, err = test.script_helper.assert_python_failure(name)
+ if sm != 'fork':
+ rc, out, err = test.script_helper.assert_python_failure(name, sm)
self.assertEqual('', out.decode('ascii'))
self.assertIn('RuntimeError', err.decode('ascii'))
else:
- rc, out, err = test.script_helper.assert_python_ok(name)
+ rc, out, err = test.script_helper.assert_python_ok(name, sm)
self.assertEqual('123', out.decode('ascii').rstrip())
self.assertEqual('', err.decode('ascii'))
@@ -3514,6 +3402,72 @@ class TestForkAwareThreadLock(unittest.TestCase):
self.assertLessEqual(new_size, old_size)
#
+# Check that non-forked child processes do not inherit unneeded fds/handles
+#
+
+class TestCloseFds(unittest.TestCase):
+
+ def get_high_socket_fd(self):
+ if WIN32:
+ # The child process will not have any socket handles, so
+ # calling socket.fromfd() should produce WSAENOTSOCK even
+ # if there is a handle of the same number.
+ return socket.socket().detach()
+ else:
+ # We want to produce a socket with an fd high enough that a
+ # freshly created child process will not have any fds as high.
+ fd = socket.socket().detach()
+ to_close = []
+ while fd < 50:
+ to_close.append(fd)
+ fd = os.dup(fd)
+ for x in to_close:
+ os.close(x)
+ return fd
+
+ def close(self, fd):
+ if WIN32:
+ socket.socket(fileno=fd).close()
+ else:
+ os.close(fd)
+
+ @classmethod
+ def _test_closefds(cls, conn, fd):
+ try:
+ s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
+ except Exception as e:
+ conn.send(e)
+ else:
+ s.close()
+ conn.send(None)
+
+ def test_closefd(self):
+ if not HAS_REDUCTION:
+ raise unittest.SkipTest('requires fd pickling')
+
+ reader, writer = multiprocessing.Pipe()
+ fd = self.get_high_socket_fd()
+ try:
+ p = multiprocessing.Process(target=self._test_closefds,
+ args=(writer, fd))
+ p.start()
+ writer.close()
+ e = reader.recv()
+ p.join(timeout=5)
+ finally:
+ self.close(fd)
+ writer.close()
+ reader.close()
+
+ if multiprocessing.get_start_method() == 'fork':
+ self.assertIs(e, None)
+ else:
+ WSAENOTSOCK = 10038
+ self.assertIsInstance(e, OSError)
+ self.assertTrue(e.errno == errno.EBADF or
+ e.winerror == WSAENOTSOCK, e)
+
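
The dup()-past-50 dance above matters because a fresh child reuses low descriptor numbers for its own files, so a low fd could exist in the child by coincidence. A toy Unix-only sketch of the inheritance difference the test is really after:

    import multiprocessing, os

    def child(fd):
        try:
            os.fstat(fd)
            print('fd %d is open in the child' % fd)
        except OSError:
            print('fd %d was not inherited' % fd)

    if __name__ == '__main__':
        multiprocessing.set_start_method('spawn')   # with 'fork' the fd is inherited
        fd = os.open(os.devnull, os.O_RDONLY)
        p = multiprocessing.Process(target=child, args=(fd,))
        p.start()
        p.join()
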
+#
# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
#
@@ -3557,10 +3511,10 @@ class TestIgnoreEINTR(unittest.TestCase):
def handler(signum, frame):
pass
signal.signal(signal.SIGUSR1, handler)
- l = multiprocessing.connection.Listener()
- conn.send(l.address)
- a = l.accept()
- a.send('welcome')
+ with multiprocessing.connection.Listener() as l:
+ conn.send(l.address)
+ a = l.accept()
+ a.send('welcome')
@unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
def test_ignore_listener(self):
@@ -3581,26 +3535,221 @@ class TestIgnoreEINTR(unittest.TestCase):
finally:
conn.close()
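
The rewrite above replaces manual Listener cleanup with the context-manager form. A self-contained sketch of that form, using an AF_INET listener on an ephemeral port and a thread as the peer so it runs in one process:

    import threading
    from multiprocessing.connection import Client, Listener

    def serve(listener):
        with listener.accept() as conn:      # connection closed on block exit
            conn.send('welcome')

    with Listener(('localhost', 0)) as l:    # listener closed on block exit
        t = threading.Thread(target=serve, args=(l,))
        t.start()
        with Client(l.address) as c:
            print(c.recv())                  # -> 'welcome'
        t.join()
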
+class TestStartMethod(unittest.TestCase):
+ def test_set_get(self):
+ multiprocessing.set_forkserver_preload(PRELOAD)
+ count = 0
+ old_method = multiprocessing.get_start_method()
+ try:
+ for method in ('fork', 'spawn', 'forkserver'):
+ try:
+ multiprocessing.set_start_method(method)
+ except ValueError:
+ continue
+ self.assertEqual(multiprocessing.get_start_method(), method)
+ count += 1
+ finally:
+ multiprocessing.set_start_method(old_method)
+ self.assertGreaterEqual(count, 1)
+
+ def test_get_all(self):
+ methods = multiprocessing.get_all_start_methods()
+ if sys.platform == 'win32':
+ self.assertEqual(methods, ['spawn'])
+ else:
+ self.assertTrue(methods == ['fork', 'spawn'] or
+ methods == ['fork', 'spawn', 'forkserver'])
+
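
A quick sketch of the API these two tests cover; which methods exist depends on the platform, exactly as the assertions encode:

    import multiprocessing

    if __name__ == '__main__':
        print(multiprocessing.get_all_start_methods())
        # Unix: ['fork', 'spawn'] or ['fork', 'spawn', 'forkserver']; Windows: ['spawn']
        multiprocessing.set_start_method('spawn')    # ValueError if unsupported
        print(multiprocessing.get_start_method())    # -> 'spawn'
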
+#
+# Check that killing process does not leak named semaphores
+#
+
+@unittest.skipIf(sys.platform == "win32",
+ "test semantics don't make sense on Windows")
+class TestSemaphoreTracker(unittest.TestCase):
+ def test_semaphore_tracker(self):
+ import subprocess
+ cmd = '''if 1:
+ import multiprocessing as mp, time, os
+ mp.set_start_method("spawn")
+ lock1 = mp.Lock()
+ lock2 = mp.Lock()
+ os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
+ os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
+ time.sleep(10)
+ '''
+ print("\nTestSemaphoreTracker will output warnings a bit like:\n"
+ " ... There appear to be 2 leaked semaphores"
+ " to clean up at shutdown\n"
+ " ... '/mp-03jgqz': [Errno 2] No such file or directory",
+ file=sys.stderr)
+ r, w = os.pipe()
+ p = subprocess.Popen([sys.executable,
+ #'-W', 'ignore:semaphore_tracker',
+ '-c', cmd % (w, w)],
+ pass_fds=[w])
+ os.close(w)
+ with open(r, 'rb', closefd=True) as f:
+ name1 = f.readline().rstrip().decode('ascii')
+ name2 = f.readline().rstrip().decode('ascii')
+ _multiprocessing.sem_unlink(name1)
+ p.terminate()
+ p.wait()
+ time.sleep(1.0)
+ with self.assertRaises(OSError) as ctx:
+ _multiprocessing.sem_unlink(name2)
+ # docs say it should be ENOENT, but OSX seems to give EINVAL
+ self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
+
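
Background for the test: on Unix each Lock is backed by a named POSIX semaphore, which survives a killed process unless someone calls sem_unlink() on it; that cleanup is the tracker's whole job. The name the test smuggles out over the pipe can be seen directly (`_semlock` is an internal attribute, used here only for illustration):

    import multiprocessing as mp

    if __name__ == '__main__':
        mp.set_start_method('spawn')
        lock = mp.Lock()
        print(lock._semlock.name)   # e.g. '/mp-03jgqz': the name the tracker unlinks
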
#
+# Mixins
#
+
+class ProcessesMixin(object):
+ TYPE = 'processes'
+ Process = multiprocessing.Process
+ connection = multiprocessing.connection
+ current_process = staticmethod(multiprocessing.current_process)
+ active_children = staticmethod(multiprocessing.active_children)
+ Pool = staticmethod(multiprocessing.Pool)
+ Pipe = staticmethod(multiprocessing.Pipe)
+ Queue = staticmethod(multiprocessing.Queue)
+ JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
+ Lock = staticmethod(multiprocessing.Lock)
+ RLock = staticmethod(multiprocessing.RLock)
+ Semaphore = staticmethod(multiprocessing.Semaphore)
+ BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
+ Condition = staticmethod(multiprocessing.Condition)
+ Event = staticmethod(multiprocessing.Event)
+ Barrier = staticmethod(multiprocessing.Barrier)
+ Value = staticmethod(multiprocessing.Value)
+ Array = staticmethod(multiprocessing.Array)
+ RawValue = staticmethod(multiprocessing.RawValue)
+ RawArray = staticmethod(multiprocessing.RawArray)
+
+
+class ManagerMixin(object):
+ TYPE = 'manager'
+ Process = multiprocessing.Process
+ Queue = property(operator.attrgetter('manager.Queue'))
+ JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
+ Lock = property(operator.attrgetter('manager.Lock'))
+ RLock = property(operator.attrgetter('manager.RLock'))
+ Semaphore = property(operator.attrgetter('manager.Semaphore'))
+ BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
+ Condition = property(operator.attrgetter('manager.Condition'))
+ Event = property(operator.attrgetter('manager.Event'))
+ Barrier = property(operator.attrgetter('manager.Barrier'))
+ Value = property(operator.attrgetter('manager.Value'))
+ Array = property(operator.attrgetter('manager.Array'))
+ list = property(operator.attrgetter('manager.list'))
+ dict = property(operator.attrgetter('manager.dict'))
+ Namespace = property(operator.attrgetter('manager.Namespace'))
+
+ @classmethod
+ def Pool(cls, *args, **kwds):
+ return cls.manager.Pool(*args, **kwds)
+
+ @classmethod
+ def setUpClass(cls):
+ cls.manager = multiprocessing.Manager()
+
+ @classmethod
+ def tearDownClass(cls):
+ # only the manager process should be returned by active_children()
+ # but this can take a bit on slow machines, so wait a few seconds
+ # if there are other children too (see #17395)
+ t = 0.01
+ while len(multiprocessing.active_children()) > 1 and t < 5:
+ time.sleep(t)
+ t *= 2
+ gc.collect() # do garbage collection
+ if cls.manager._number_of_objects() != 0:
+ # This is not really an error since some tests do not
+ # ensure that all processes which hold a reference to a
+ # managed object have been joined.
+ print('Shared objects which still exist at manager shutdown:')
+ print(cls.manager._debug_info())
+ cls.manager.shutdown()
+ cls.manager.join()
+ cls.manager = None
+
+
+class ThreadsMixin(object):
+ TYPE = 'threads'
+ Process = multiprocessing.dummy.Process
+ connection = multiprocessing.dummy.connection
+ current_process = staticmethod(multiprocessing.dummy.current_process)
+ active_children = staticmethod(multiprocessing.dummy.active_children)
+ Pool = staticmethod(multiprocessing.Pool)
+ Pipe = staticmethod(multiprocessing.dummy.Pipe)
+ Queue = staticmethod(multiprocessing.dummy.Queue)
+ JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
+ Lock = staticmethod(multiprocessing.dummy.Lock)
+ RLock = staticmethod(multiprocessing.dummy.RLock)
+ Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
+ BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
+ Condition = staticmethod(multiprocessing.dummy.Condition)
+ Event = staticmethod(multiprocessing.dummy.Event)
+ Barrier = staticmethod(multiprocessing.dummy.Barrier)
+ Value = staticmethod(multiprocessing.dummy.Value)
+ Array = staticmethod(multiprocessing.dummy.Array)
+
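
Each mixin maps the same test-facing names onto one backend, so a test body is written once and runs against all of them. A toy sketch with a hypothetical shared test; install_tests_in_module_dict() below does the combining for real:

    import unittest

    # (real shared tests derive from BaseTestCase, as the installer checks)
    class _TestPing(object):
        ALLOWED_TYPES = ('processes', 'threads', 'manager')

        def test_ping(self):
            q = self.Queue()      # resolved by whichever mixin is combined in
            q.put('ping')
            self.assertEqual(q.get(), 'ping')
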
+#
+# Functions used to create test cases from the base ones in this module
#
-def setUpModule():
- if sys.platform.startswith("linux"):
- try:
- lock = multiprocessing.RLock()
- except OSError:
- raise unittest.SkipTest("OSError raises on RLock creation, "
- "see issue 3111!")
- check_enough_semaphores()
- util.get_temp_dir() # creates temp directory for use by all processes
- multiprocessing.get_logger().setLevel(LOG_LEVEL)
+def install_tests_in_module_dict(remote_globs, start_method):
+ __module__ = remote_globs['__name__']
+ local_globs = globals()
+ ALL_TYPES = {'processes', 'threads', 'manager'}
+ for name, base in local_globs.items():
+ if not isinstance(base, type):
+ continue
+ if issubclass(base, BaseTestCase):
+ if base is BaseTestCase:
+ continue
+ assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
+ for type_ in base.ALLOWED_TYPES:
+ newname = 'With' + type_.capitalize() + name[1:]
+ Mixin = local_globs[type_.capitalize() + 'Mixin']
+ class Temp(base, Mixin, unittest.TestCase):
+ pass
+ Temp.__name__ = Temp.__qualname__ = newname
+ Temp.__module__ = __module__
+ remote_globs[newname] = Temp
+ elif issubclass(base, unittest.TestCase):
+ class Temp(base, object):
+ pass
+ Temp.__name__ = Temp.__qualname__ = name
+ Temp.__module__ = __module__
+ remote_globs[name] = Temp
-def tearDownModule():
- # pause a bit so we don't get warning about dangling threads/processes
- time.sleep(0.5)
+ def setUpModule():
+ multiprocessing.set_forkserver_preload(PRELOAD)
+ remote_globs['old_start_method'] = multiprocessing.get_start_method()
+ try:
+ multiprocessing.set_start_method(start_method)
+ except ValueError:
+ raise unittest.SkipTest(start_method +
+ ' start method not supported')
+ print('Using start method %r' % multiprocessing.get_start_method())
+ if sys.platform.startswith("linux"):
+ try:
+ lock = multiprocessing.RLock()
+ except OSError:
+            raise unittest.SkipTest("OSError is raised on RLock creation, "
+                                    "see issue 3111!")
+ check_enough_semaphores()
+ util.get_temp_dir() # creates temp directory
+ multiprocessing.get_logger().setLevel(LOG_LEVEL)
+
+ def tearDownModule():
+ multiprocessing.set_start_method(remote_globs['old_start_method'])
+        # pause a bit so we don't get a warning about dangling threads/processes
+ time.sleep(0.5)
-if __name__ == '__main__':
- unittest.main()
+ remote_globs['setUpModule'] = setUpModule
+ remote_globs['tearDownModule'] = tearDownModule
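
The 'With' + type + name[1:] mangling in the installer deserves spelling out; for a hypothetical base class _TestQueue it produces one concrete, importable test class per allowed type:

    name = '_TestQueue'   # hypothetical base class name
    for type_ in ('processes', 'threads', 'manager'):
        print('With' + type_.capitalize() + name[1:])
    # WithProcessesTestQueue
    # WithThreadsTestQueue
    # WithManagerTestQueue
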
diff --git a/Lib/test/mp_fork_bomb.py b/Lib/test/mp_fork_bomb.py
index 908afe3..017e010 100644
--- a/Lib/test/mp_fork_bomb.py
+++ b/Lib/test/mp_fork_bomb.py
@@ -7,6 +7,11 @@ def foo():
# correctly on Windows. However, we should get a RuntimeError rather
# than the Windows equivalent of a fork bomb.
+if len(sys.argv) > 1:
+ multiprocessing.set_start_method(sys.argv[1])
+else:
+ multiprocessing.set_start_method('spawn')
+
p = multiprocessing.Process(target=foo)
p.start()
p.join()
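
With the argument handling above, each wrapper test module can drive the script with its own start method; run by hand the behaviour is roughly:

    import subprocess, sys

    script = 'Lib/test/mp_fork_bomb.py'   # path assumed: run from a CPython checkout
    subprocess.call([sys.executable, script, 'fork'])    # prints 123, exits cleanly
    subprocess.call([sys.executable, script, 'spawn'])   # RuntimeError, not a fork bomb
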
diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py
index c8bbcb2..b9945d7 100755
--- a/Lib/test/regrtest.py
+++ b/Lib/test/regrtest.py
@@ -149,7 +149,7 @@ try:
except ImportError:
threading = None
try:
- import multiprocessing.process
+ import _multiprocessing, multiprocessing.process
except ImportError:
multiprocessing = None
diff --git a/Lib/test/test_multiprocessing_fork.py b/Lib/test/test_multiprocessing_fork.py
new file mode 100644
index 0000000..2bf4e75
--- /dev/null
+++ b/Lib/test/test_multiprocessing_fork.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'fork')
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/Lib/test/test_multiprocessing_forkserver.py b/Lib/test/test_multiprocessing_forkserver.py
new file mode 100644
index 0000000..193a04a
--- /dev/null
+++ b/Lib/test/test_multiprocessing_forkserver.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'forkserver')
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/Lib/test/test_multiprocessing_spawn.py b/Lib/test/test_multiprocessing_spawn.py
new file mode 100644
index 0000000..334ae9e
--- /dev/null
+++ b/Lib/test/test_multiprocessing_spawn.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'spawn')
+
+if __name__ == '__main__':
+ unittest.main()