diff options
author | Richard Oudkerk <shibturn@gmail.com> | 2013-10-16 15:41:56 (GMT) |
---|---|---|
committer | Richard Oudkerk <shibturn@gmail.com> | 2013-10-16 15:41:56 (GMT) |
commit | b1694cf588ca915c003b9b79c9fdeab82deb9476 (patch) | |
tree | 923f376dd771dbc25815cad0708c753ef3af3b14 | |
parent | 3e4b52875e0162b536817da93f54b1988195c3ab (diff) | |
download | cpython-b1694cf588ca915c003b9b79c9fdeab82deb9476.zip cpython-b1694cf588ca915c003b9b79c9fdeab82deb9476.tar.gz cpython-b1694cf588ca915c003b9b79c9fdeab82deb9476.tar.bz2 |
Issue #18999: Make multiprocessing use context objects.
This allows different parts of a program to use different methods for
starting processes without interfering with each other.
-rw-r--r-- | Doc/library/multiprocessing.rst | 76 | ||||
-rw-r--r-- | Lib/multiprocessing/__init__.py | 260 | ||||
-rw-r--r-- | Lib/multiprocessing/context.py | 348 | ||||
-rw-r--r-- | Lib/multiprocessing/forkserver.py | 214 | ||||
-rw-r--r-- | Lib/multiprocessing/heap.py | 4 | ||||
-rw-r--r-- | Lib/multiprocessing/managers.py | 11 | ||||
-rw-r--r-- | Lib/multiprocessing/pool.py | 12 | ||||
-rw-r--r-- | Lib/multiprocessing/popen.py | 78 | ||||
-rw-r--r-- | Lib/multiprocessing/popen_fork.py | 4 | ||||
-rw-r--r-- | Lib/multiprocessing/popen_forkserver.py | 12 | ||||
-rw-r--r-- | Lib/multiprocessing/popen_spawn_posix.py | 13 | ||||
-rw-r--r-- | Lib/multiprocessing/popen_spawn_win32.py | 12 | ||||
-rw-r--r-- | Lib/multiprocessing/process.py | 21 | ||||
-rw-r--r-- | Lib/multiprocessing/queues.py | 29 | ||||
-rw-r--r-- | Lib/multiprocessing/reduction.py | 4 | ||||
-rw-r--r-- | Lib/multiprocessing/semaphore_tracker.py | 112 | ||||
-rw-r--r-- | Lib/multiprocessing/sharedctypes.py | 37 | ||||
-rw-r--r-- | Lib/multiprocessing/spawn.py | 8 | ||||
-rw-r--r-- | Lib/multiprocessing/synchronize.py | 53 | ||||
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 42 |
20 files changed, 736 insertions, 614 deletions
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 8535aed..6a72759 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -98,8 +98,8 @@ necessary, see :ref:`multiprocessing-programming`. -Start methods -~~~~~~~~~~~~~ +Contexts and start methods +~~~~~~~~~~~~~~~~~~~~~~~~~~ Depending on the platform, :mod:`multiprocessing` supports three ways to start a process. These *start methods* are @@ -132,7 +132,7 @@ to start a process. These *start methods* are unnecessary resources are inherited. Available on Unix platforms which support passing file descriptors - over unix pipes. + over Unix pipes. Before Python 3.4 *fork* was the only option available on Unix. Also, prior to Python 3.4, child processes would inherit all the parents @@ -153,18 +153,46 @@ example:: import multiprocessing as mp - def foo(): - print('hello') + def foo(q): + q.put('hello') if __name__ == '__main__': mp.set_start_method('spawn') - p = mp.Process(target=foo) + q = mp.Queue() + p = mp.Process(target=foo, args=(q,)) p.start() + print(q.get()) p.join() :func:`set_start_method` should not be used more than once in the program. +Alternatively, you can use :func:`get_context` to obtain a context +object. Context objects have the same API as the multiprocessing +module, and allow one to use multiple start methods in the same +program. :: + + import multiprocessing as mp + + def foo(q): + q.put('hello') + + if __name__ == '__main__': + ctx = mp.get_context('spawn') + q = ctx.Queue() + p = ctx.Process(target=foo, args=(q,)) + p.start() + print(q.get()) + p.join() + +Note that objects related to one context may not be compatible with +processes for a different context. In particular, locks created using +the *fork* context cannot be passed to a processes started using the +*spawn* or *forkserver* start methods. + +A library which wants to use a particular start method should probably +use :func:`get_context` to avoid interfering with the choice of the +library user. Exchanging objects between processes @@ -859,11 +887,30 @@ Miscellaneous .. versionadded:: 3.4 -.. function:: get_start_method() +.. function:: get_context(method=None) + + Return a context object which has the same attributes as the + :mod:`multiprocessing` module. - Return the current start method. This can be ``'fork'``, - ``'spawn'`` or ``'forkserver'``. ``'fork'`` is the default on - Unix, while ``'spawn'`` is the default on Windows. + If *method* is *None* then the default context is returned. + Otherwise *method* should be ``'fork'``, ``'spawn'``, + ``'forkserver'``. :exc:`ValueError` is raised if the specified + start method is not available. + + .. versionadded:: 3.4 + +.. function:: get_start_method(allow_none=False) + + Return the name of start method used for starting processes. + + If the start method has not been fixed and *allow_none* is false, + then the start method is fixed to the default and the name is + returned. If the start method has not been fixed and *allow_none* + is true then *None* is returned. + + The return value can be ``'fork'``, ``'spawn'``, ``'forkserver'`` + or *None*. ``'fork'`` is the default on Unix, while ``'spawn'`` is + the default on Windows. .. versionadded:: 3.4 @@ -1785,7 +1832,7 @@ Process Pools One can create a pool of processes which will carry out tasks submitted to it with the :class:`Pool` class. -.. class:: Pool([processes[, initializer[, initargs[, maxtasksperchild]]]]) +.. class:: Pool([processes[, initializer[, initargs[, maxtasksperchild [, context]]]]]) A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and @@ -1805,6 +1852,13 @@ with the :class:`Pool` class. unused resources to be freed. The default *maxtasksperchild* is None, which means worker processes will live as long as the pool. + .. versionadded:: 3.4 + *context* can be used to specify the context used for starting + the worker processes. Usually a pool is created using the + function :func:`multiprocessing.Pool` or the :meth:`Pool` method + of a context object. In both cases *context* is set + appropriately. + .. note:: Worker processes within a :class:`Pool` typically live for the complete diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py index fd75839..86df638 100644 --- a/Lib/multiprocessing/__init__.py +++ b/Lib/multiprocessing/__init__.py @@ -12,27 +12,16 @@ # Licensed to PSF under a Contributor Agreement. # -__version__ = '0.70a1' - -__all__ = [ - 'Process', 'current_process', 'active_children', 'freeze_support', - 'Manager', 'Pipe', 'cpu_count', 'log_to_stderr', 'get_logger', - 'allow_connection_pickling', 'BufferTooShort', 'TimeoutError', - 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', - 'Event', 'Barrier', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool', - 'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING', - 'set_executable', 'set_start_method', 'get_start_method', - 'get_all_start_methods', 'set_forkserver_preload' - ] +import sys +from . import context # -# Imports +# Copy stuff from default context # -import os -import sys - -from .process import Process, current_process, active_children +globals().update((name, getattr(context._default_context, name)) + for name in context._default_context.__all__) +__all__ = context._default_context.__all__ # # XXX These should not really be documented or public. @@ -47,240 +36,3 @@ SUBWARNING = 25 if '__main__' in sys.modules: sys.modules['__mp_main__'] = sys.modules['__main__'] - -# -# Exceptions -# - -class ProcessError(Exception): - pass - -class BufferTooShort(ProcessError): - pass - -class TimeoutError(ProcessError): - pass - -class AuthenticationError(ProcessError): - pass - -# -# Definitions not depending on native semaphores -# - -def Manager(): - ''' - Returns a manager associated with a running server process - - The managers methods such as `Lock()`, `Condition()` and `Queue()` - can be used to create shared objects. - ''' - from .managers import SyncManager - m = SyncManager() - m.start() - return m - -def Pipe(duplex=True): - ''' - Returns two connection object connected by a pipe - ''' - from .connection import Pipe - return Pipe(duplex) - -def cpu_count(): - ''' - Returns the number of CPUs in the system - ''' - num = os.cpu_count() - if num is None: - raise NotImplementedError('cannot determine number of cpus') - else: - return num - -def freeze_support(): - ''' - Check whether this is a fake forked process in a frozen executable. - If so then run code specified by commandline and exit. - ''' - if sys.platform == 'win32' and getattr(sys, 'frozen', False): - from .spawn import freeze_support - freeze_support() - -def get_logger(): - ''' - Return package logger -- if it does not already exist then it is created - ''' - from .util import get_logger - return get_logger() - -def log_to_stderr(level=None): - ''' - Turn on logging and add a handler which prints to stderr - ''' - from .util import log_to_stderr - return log_to_stderr(level) - -def allow_connection_pickling(): - ''' - Install support for sending connections and sockets between processes - ''' - # This is undocumented. In previous versions of multiprocessing - # its only effect was to make socket objects inheritable on Windows. - from . import connection - -# -# Definitions depending on native semaphores -# - -def Lock(): - ''' - Returns a non-recursive lock object - ''' - from .synchronize import Lock - return Lock() - -def RLock(): - ''' - Returns a recursive lock object - ''' - from .synchronize import RLock - return RLock() - -def Condition(lock=None): - ''' - Returns a condition object - ''' - from .synchronize import Condition - return Condition(lock) - -def Semaphore(value=1): - ''' - Returns a semaphore object - ''' - from .synchronize import Semaphore - return Semaphore(value) - -def BoundedSemaphore(value=1): - ''' - Returns a bounded semaphore object - ''' - from .synchronize import BoundedSemaphore - return BoundedSemaphore(value) - -def Event(): - ''' - Returns an event object - ''' - from .synchronize import Event - return Event() - -def Barrier(parties, action=None, timeout=None): - ''' - Returns a barrier object - ''' - from .synchronize import Barrier - return Barrier(parties, action, timeout) - -def Queue(maxsize=0): - ''' - Returns a queue object - ''' - from .queues import Queue - return Queue(maxsize) - -def JoinableQueue(maxsize=0): - ''' - Returns a queue object - ''' - from .queues import JoinableQueue - return JoinableQueue(maxsize) - -def SimpleQueue(): - ''' - Returns a queue object - ''' - from .queues import SimpleQueue - return SimpleQueue() - -def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None): - ''' - Returns a process pool object - ''' - from .pool import Pool - return Pool(processes, initializer, initargs, maxtasksperchild) - -def RawValue(typecode_or_type, *args): - ''' - Returns a shared object - ''' - from .sharedctypes import RawValue - return RawValue(typecode_or_type, *args) - -def RawArray(typecode_or_type, size_or_initializer): - ''' - Returns a shared array - ''' - from .sharedctypes import RawArray - return RawArray(typecode_or_type, size_or_initializer) - -def Value(typecode_or_type, *args, lock=True): - ''' - Returns a synchronized shared object - ''' - from .sharedctypes import Value - return Value(typecode_or_type, *args, lock=lock) - -def Array(typecode_or_type, size_or_initializer, *, lock=True): - ''' - Returns a synchronized shared array - ''' - from .sharedctypes import Array - return Array(typecode_or_type, size_or_initializer, lock=lock) - -# -# -# - -def set_executable(executable): - ''' - Sets the path to a python.exe or pythonw.exe binary used to run - child processes instead of sys.executable when using the 'spawn' - start method. Useful for people embedding Python. - ''' - from .spawn import set_executable - set_executable(executable) - -def set_start_method(method): - ''' - Set method for starting processes: 'fork', 'spawn' or 'forkserver'. - ''' - from .popen import set_start_method - set_start_method(method) - -def get_start_method(): - ''' - Get method for starting processes: 'fork', 'spawn' or 'forkserver'. - ''' - from .popen import get_start_method - return get_start_method() - -def get_all_start_methods(): - ''' - Get list of availables start methods, default first. - ''' - from .popen import get_all_start_methods - return get_all_start_methods() - -def set_forkserver_preload(module_names): - ''' - Set list of module names to try to load in the forkserver process - when it is started. Properly chosen this can significantly reduce - the cost of starting a new process using the forkserver method. - The default list is ['__main__']. - ''' - try: - from .forkserver import set_forkserver_preload - except ImportError: - pass - else: - set_forkserver_preload(module_names) diff --git a/Lib/multiprocessing/context.py b/Lib/multiprocessing/context.py new file mode 100644 index 0000000..63849f9 --- /dev/null +++ b/Lib/multiprocessing/context.py @@ -0,0 +1,348 @@ +import os +import sys +import threading + +from . import process + +__all__ = [] # things are copied from here to __init__.py + +# +# Exceptions +# + +class ProcessError(Exception): + pass + +class BufferTooShort(ProcessError): + pass + +class TimeoutError(ProcessError): + pass + +class AuthenticationError(ProcessError): + pass + +# +# Base type for contexts +# + +class BaseContext(object): + + ProcessError = ProcessError + BufferTooShort = BufferTooShort + TimeoutError = TimeoutError + AuthenticationError = AuthenticationError + + current_process = staticmethod(process.current_process) + active_children = staticmethod(process.active_children) + + def cpu_count(self): + '''Returns the number of CPUs in the system''' + num = os.cpu_count() + if num is None: + raise NotImplementedError('cannot determine number of cpus') + else: + return num + + def Manager(self): + '''Returns a manager associated with a running server process + + The managers methods such as `Lock()`, `Condition()` and `Queue()` + can be used to create shared objects. + ''' + from .managers import SyncManager + m = SyncManager(ctx=self.get_context()) + m.start() + return m + + def Pipe(self, duplex=True): + '''Returns two connection object connected by a pipe''' + from .connection import Pipe + return Pipe(duplex) + + def Lock(self): + '''Returns a non-recursive lock object''' + from .synchronize import Lock + return Lock(ctx=self.get_context()) + + def RLock(self): + '''Returns a recursive lock object''' + from .synchronize import RLock + return RLock(ctx=self.get_context()) + + def Condition(self, lock=None): + '''Returns a condition object''' + from .synchronize import Condition + return Condition(lock, ctx=self.get_context()) + + def Semaphore(self, value=1): + '''Returns a semaphore object''' + from .synchronize import Semaphore + return Semaphore(value, ctx=self.get_context()) + + def BoundedSemaphore(self, value=1): + '''Returns a bounded semaphore object''' + from .synchronize import BoundedSemaphore + return BoundedSemaphore(value, ctx=self.get_context()) + + def Event(self): + '''Returns an event object''' + from .synchronize import Event + return Event(ctx=self.get_context()) + + def Barrier(self, parties, action=None, timeout=None): + '''Returns a barrier object''' + from .synchronize import Barrier + return Barrier(parties, action, timeout, ctx=self.get_context()) + + def Queue(self, maxsize=0): + '''Returns a queue object''' + from .queues import Queue + return Queue(maxsize, ctx=self.get_context()) + + def JoinableQueue(self, maxsize=0): + '''Returns a queue object''' + from .queues import JoinableQueue + return JoinableQueue(maxsize, ctx=self.get_context()) + + def SimpleQueue(self): + '''Returns a queue object''' + from .queues import SimpleQueue + return SimpleQueue(ctx=self.get_context()) + + def Pool(self, processes=None, initializer=None, initargs=(), + maxtasksperchild=None): + '''Returns a process pool object''' + from .pool import Pool + return Pool(processes, initializer, initargs, maxtasksperchild, + context=self.get_context()) + + def RawValue(self, typecode_or_type, *args): + '''Returns a shared object''' + from .sharedctypes import RawValue + return RawValue(typecode_or_type, *args) + + def RawArray(self, typecode_or_type, size_or_initializer): + '''Returns a shared array''' + from .sharedctypes import RawArray + return RawArray(typecode_or_type, size_or_initializer) + + def Value(self, typecode_or_type, *args, lock=True): + '''Returns a synchronized shared object''' + from .sharedctypes import Value + return Value(typecode_or_type, *args, lock=lock, + ctx=self.get_context()) + + def Array(self, typecode_or_type, size_or_initializer, *, lock=True): + '''Returns a synchronized shared array''' + from .sharedctypes import Array + return Array(typecode_or_type, size_or_initializer, lock=lock, + ctx=self.get_context()) + + def freeze_support(self): + '''Check whether this is a fake forked process in a frozen executable. + If so then run code specified by commandline and exit. + ''' + if sys.platform == 'win32' and getattr(sys, 'frozen', False): + from .spawn import freeze_support + freeze_support() + + def get_logger(self): + '''Return package logger -- if it does not already exist then + it is created. + ''' + from .util import get_logger + return get_logger() + + def log_to_stderr(self, level=None): + '''Turn on logging and add a handler which prints to stderr''' + from .util import log_to_stderr + return log_to_stderr(level) + + def allow_connection_pickling(self): + '''Install support for sending connections and sockets + between processes + ''' + # This is undocumented. In previous versions of multiprocessing + # its only effect was to make socket objects inheritable on Windows. + from . import connection + + def set_executable(self, executable): + '''Sets the path to a python.exe or pythonw.exe binary used to run + child processes instead of sys.executable when using the 'spawn' + start method. Useful for people embedding Python. + ''' + from .spawn import set_executable + set_executable(executable) + + def set_forkserver_preload(self, module_names): + '''Set list of module names to try to load in forkserver process. + This is really just a hint. + ''' + from .forkserver import set_forkserver_preload + set_forkserver_preload(module_names) + + def get_context(self, method=None): + if method is None: + return self + try: + ctx = _concrete_contexts[method] + except KeyError: + raise ValueError('cannot find context for %r' % method) + ctx._check_available() + return ctx + + def get_start_method(self, allow_none=False): + return self._name + + def set_start_method(self, method=None): + raise ValueError('cannot set start method of concrete context') + + def _check_available(self): + pass + +# +# Type of default context -- underlying context can be set at most once +# + +class Process(process.BaseProcess): + _start_method = None + @staticmethod + def _Popen(process_obj): + return _default_context.get_context().Process._Popen(process_obj) + +class DefaultContext(BaseContext): + Process = Process + + def __init__(self, context): + self._default_context = context + self._actual_context = None + + def get_context(self, method=None): + if method is None: + if self._actual_context is None: + self._actual_context = self._default_context + return self._actual_context + else: + return super().get_context(method) + + def set_start_method(self, method, force=False): + if self._actual_context is not None and not force: + raise RuntimeError('context has already been set') + if method is None and force: + self._actual_context = None + return + self._actual_context = self.get_context(method) + + def get_start_method(self, allow_none=False): + if self._actual_context is None: + if allow_none: + return None + self._actual_context = self._default_context + return self._actual_context._name + + def get_all_start_methods(self): + if sys.platform == 'win32': + return ['spawn'] + else: + from . import reduction + if reduction.HAVE_SEND_HANDLE: + return ['fork', 'spawn', 'forkserver'] + else: + return ['fork', 'spawn'] + +DefaultContext.__all__ = list(x for x in dir(DefaultContext) if x[0] != '_') + +# +# Context types for fixed start method +# + +if sys.platform != 'win32': + + class ForkProcess(process.BaseProcess): + _start_method = 'fork' + @staticmethod + def _Popen(process_obj): + from .popen_fork import Popen + return Popen(process_obj) + + class SpawnProcess(process.BaseProcess): + _start_method = 'spawn' + @staticmethod + def _Popen(process_obj): + from .popen_spawn_posix import Popen + return Popen(process_obj) + + class ForkServerProcess(process.BaseProcess): + _start_method = 'forkserver' + @staticmethod + def _Popen(process_obj): + from .popen_forkserver import Popen + return Popen(process_obj) + + class ForkContext(BaseContext): + _name = 'fork' + Process = ForkProcess + + class SpawnContext(BaseContext): + _name = 'spawn' + Process = SpawnProcess + + class ForkServerContext(BaseContext): + _name = 'forkserver' + Process = ForkServerProcess + def _check_available(self): + from . import reduction + if not reduction.HAVE_SEND_HANDLE: + raise ValueError('forkserver start method not available') + + _concrete_contexts = { + 'fork': ForkContext(), + 'spawn': SpawnContext(), + 'forkserver': ForkServerContext(), + } + _default_context = DefaultContext(_concrete_contexts['fork']) + +else: + + class SpawnProcess(process.BaseProcess): + _start_method = 'spawn' + @staticmethod + def _Popen(process_obj): + from .popen_spawn_win32 import Popen + return Popen(process_obj) + + class SpawnContext(BaseContext): + _name = 'spawn' + Process = SpawnProcess + + _concrete_contexts = { + 'spawn': SpawnContext(), + } + _default_context = DefaultContext(_concrete_contexts['spawn']) + +# +# Force the start method +# + +def _force_start_method(method): + _default_context._actual_context = _concrete_contexts[method] + +# +# Check that the current thread is spawning a child process +# + +_tls = threading.local() + +def get_spawning_popen(): + return getattr(_tls, 'spawning_popen', None) + +def set_spawning_popen(popen): + _tls.spawning_popen = popen + +def assert_spawning(obj): + if get_spawning_popen() is None: + raise RuntimeError( + '%s objects should only be shared between processes' + ' through inheritance' % type(obj).__name__ + ) diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py index 0a23707..387517e 100644 --- a/Lib/multiprocessing/forkserver.py +++ b/Lib/multiprocessing/forkserver.py @@ -24,105 +24,113 @@ __all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process', MAXFDS_TO_SEND = 256 UNSIGNED_STRUCT = struct.Struct('Q') # large enough for pid_t -_forkserver_address = None -_forkserver_alive_fd = None -_inherited_fds = None -_lock = threading.Lock() -_preload_modules = ['__main__'] - # -# Public function +# Forkserver class # -def set_forkserver_preload(modules_names): - '''Set list of module names to try to load in forkserver process.''' - global _preload_modules - _preload_modules = modules_names - - -def get_inherited_fds(): - '''Return list of fds inherited from parent process. - - This returns None if the current process was not started by fork server. - ''' - return _inherited_fds - - -def connect_to_new_process(fds): - '''Request forkserver to create a child process. - - Returns a pair of fds (status_r, data_w). The calling process can read - the child process's pid and (eventually) its returncode from status_r. - The calling process should write to data_w the pickled preparation and - process data. - ''' - if len(fds) + 4 >= MAXFDS_TO_SEND: - raise ValueError('too many fds') - with socket.socket(socket.AF_UNIX) as client: - client.connect(_forkserver_address) - parent_r, child_w = os.pipe() - child_r, parent_w = os.pipe() - allfds = [child_r, child_w, _forkserver_alive_fd, - semaphore_tracker._semaphore_tracker_fd] - allfds += fds - try: - reduction.sendfds(client, allfds) - return parent_r, parent_w - except: - os.close(parent_r) - os.close(parent_w) - raise - finally: - os.close(child_r) - os.close(child_w) - - -def ensure_running(): - '''Make sure that a fork server is running. - - This can be called from any process. Note that usually a child - process will just reuse the forkserver started by its parent, so - ensure_running() will do nothing. - ''' - global _forkserver_address, _forkserver_alive_fd - with _lock: - if _forkserver_alive_fd is not None: - return - - assert all(type(mod) is str for mod in _preload_modules) - cmd = ('from multiprocessing.forkserver import main; ' + - 'main(%d, %d, %r, **%r)') - - if _preload_modules: - desired_keys = {'main_path', 'sys_path'} - data = spawn.get_preparation_data('ignore') - data = dict((x,y) for (x,y) in data.items() if x in desired_keys) - else: - data = {} - - with socket.socket(socket.AF_UNIX) as listener: - address = connection.arbitrary_address('AF_UNIX') - listener.bind(address) - os.chmod(address, 0o600) - listener.listen(100) - - # all client processes own the write end of the "alive" pipe; - # when they all terminate the read end becomes ready. - alive_r, alive_w = os.pipe() +class ForkServer(object): + + def __init__(self): + self._forkserver_address = None + self._forkserver_alive_fd = None + self._inherited_fds = None + self._lock = threading.Lock() + self._preload_modules = ['__main__'] + + def set_forkserver_preload(self, modules_names): + '''Set list of module names to try to load in forkserver process.''' + if not all(type(mod) is str for mod in self._preload_modules): + raise TypeError('module_names must be a list of strings') + self._preload_modules = modules_names + + def get_inherited_fds(self): + '''Return list of fds inherited from parent process. + + This returns None if the current process was not started by fork + server. + ''' + return self._inherited_fds + + def connect_to_new_process(self, fds): + '''Request forkserver to create a child process. + + Returns a pair of fds (status_r, data_w). The calling process can read + the child process's pid and (eventually) its returncode from status_r. + The calling process should write to data_w the pickled preparation and + process data. + ''' + self.ensure_running() + if len(fds) + 4 >= MAXFDS_TO_SEND: + raise ValueError('too many fds') + with socket.socket(socket.AF_UNIX) as client: + client.connect(self._forkserver_address) + parent_r, child_w = os.pipe() + child_r, parent_w = os.pipe() + allfds = [child_r, child_w, self._forkserver_alive_fd, + semaphore_tracker.getfd()] + allfds += fds try: - fds_to_pass = [listener.fileno(), alive_r] - cmd %= (listener.fileno(), alive_r, _preload_modules, data) - exe = spawn.get_executable() - args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd] - pid = util.spawnv_passfds(exe, args, fds_to_pass) + reduction.sendfds(client, allfds) + return parent_r, parent_w except: - os.close(alive_w) + os.close(parent_r) + os.close(parent_w) raise finally: - os.close(alive_r) - _forkserver_address = address - _forkserver_alive_fd = alive_w + os.close(child_r) + os.close(child_w) + + def ensure_running(self): + '''Make sure that a fork server is running. + + This can be called from any process. Note that usually a child + process will just reuse the forkserver started by its parent, so + ensure_running() will do nothing. + ''' + with self._lock: + semaphore_tracker.ensure_running() + if self._forkserver_alive_fd is not None: + return + + cmd = ('from multiprocessing.forkserver import main; ' + + 'main(%d, %d, %r, **%r)') + + if self._preload_modules: + desired_keys = {'main_path', 'sys_path'} + data = spawn.get_preparation_data('ignore') + data = dict((x,y) for (x,y) in data.items() + if x in desired_keys) + else: + data = {} + + with socket.socket(socket.AF_UNIX) as listener: + address = connection.arbitrary_address('AF_UNIX') + listener.bind(address) + os.chmod(address, 0o600) + listener.listen(100) + + # all client processes own the write end of the "alive" pipe; + # when they all terminate the read end becomes ready. + alive_r, alive_w = os.pipe() + try: + fds_to_pass = [listener.fileno(), alive_r] + cmd %= (listener.fileno(), alive_r, self._preload_modules, + data) + exe = spawn.get_executable() + args = [exe] + util._args_from_interpreter_flags() + args += ['-c', cmd] + pid = util.spawnv_passfds(exe, args, fds_to_pass) + except: + os.close(alive_w) + raise + finally: + os.close(alive_r) + self._forkserver_address = address + self._forkserver_alive_fd = alive_w +# +# +# def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): '''Run forkserver.''' @@ -151,8 +159,7 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN) with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \ selectors.DefaultSelector() as selector: - global _forkserver_address - _forkserver_address = listener.getsockname() + _forkserver._forkserver_address = listener.getsockname() selector.register(listener, selectors.EVENT_READ) selector.register(alive_r, selectors.EVENT_READ) @@ -187,13 +194,7 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): if e.errno != errno.ECONNABORTED: raise -# -# Code to bootstrap new process -# - def _serve_one(s, listener, alive_r, handler): - global _inherited_fds, _forkserver_alive_fd - # close unnecessary stuff and reset SIGCHLD handler listener.close() os.close(alive_r) @@ -203,8 +204,9 @@ def _serve_one(s, listener, alive_r, handler): fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1) s.close() assert len(fds) <= MAXFDS_TO_SEND - child_r, child_w, _forkserver_alive_fd, stfd, *_inherited_fds = fds - semaphore_tracker._semaphore_tracker_fd = stfd + (child_r, child_w, _forkserver._forkserver_alive_fd, + stfd, *_forkserver._inherited_fds) = fds + semaphore_tracker._semaphore_tracker._fd = stfd # send pid to client processes write_unsigned(child_w, os.getpid()) @@ -253,3 +255,13 @@ def write_unsigned(fd, n): if nbytes == 0: raise RuntimeError('should not get here') msg = msg[nbytes:] + +# +# +# + +_forkserver = ForkServer() +ensure_running = _forkserver.ensure_running +get_inherited_fds = _forkserver.get_inherited_fds +connect_to_new_process = _forkserver.connect_to_new_process +set_forkserver_preload = _forkserver.set_forkserver_preload diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py index b95f90f..98bfdc8 100644 --- a/Lib/multiprocessing/heap.py +++ b/Lib/multiprocessing/heap.py @@ -16,7 +16,7 @@ import tempfile import threading import _multiprocessing -from . import popen +from . import context from . import reduction from . import util @@ -50,7 +50,7 @@ if sys.platform == 'win32': self._state = (self.size, self.name) def __getstate__(self): - popen.assert_spawning(self) + context.assert_spawning(self) return self._state def __setstate__(self, state): diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index f580e9e..cc87d36 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -23,11 +23,12 @@ from time import time as _time from traceback import format_exc from . import connection +from . import context from . import pool from . import process -from . import popen from . import reduction from . import util +from . import get_context # # Register some things for pickling @@ -438,7 +439,8 @@ class BaseManager(object): _registry = {} _Server = Server - def __init__(self, address=None, authkey=None, serializer='pickle'): + def __init__(self, address=None, authkey=None, serializer='pickle', + ctx=None): if authkey is None: authkey = process.current_process().authkey self._address = address # XXX not final address if eg ('', 0) @@ -447,6 +449,7 @@ class BaseManager(object): self._state.value = State.INITIAL self._serializer = serializer self._Listener, self._Client = listener_client[serializer] + self._ctx = ctx or get_context() def get_server(self): ''' @@ -478,7 +481,7 @@ class BaseManager(object): reader, writer = connection.Pipe(duplex=False) # spawn process which runs a server - self._process = process.Process( + self._process = self._ctx.Process( target=type(self)._run_server, args=(self._registry, self._address, self._authkey, self._serializer, writer, initializer, initargs), @@ -800,7 +803,7 @@ class BaseProxy(object): def __reduce__(self): kwds = {} - if popen.get_spawning_popen() is not None: + if context.get_spawning_popen() is not None: kwds['authkey'] = self._authkey if getattr(self, '_isauto', False): diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 1cecd09..4be00d5 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -24,7 +24,7 @@ import traceback # If threading is available then ThreadPool should be provided. Therefore # we avoid top-level imports which are liable to fail on some systems. from . import util -from . import Process, cpu_count, TimeoutError, SimpleQueue +from . import get_context, cpu_count, TimeoutError # # Constants representing the state of a pool @@ -137,10 +137,12 @@ class Pool(object): ''' Class which supports an async version of applying functions to arguments. ''' - Process = Process + def Process(self, *args, **kwds): + return self._ctx.Process(*args, **kwds) def __init__(self, processes=None, initializer=None, initargs=(), - maxtasksperchild=None): + maxtasksperchild=None, context=None): + self._ctx = context or get_context() self._setup_queues() self._taskqueue = queue.Queue() self._cache = {} @@ -232,8 +234,8 @@ class Pool(object): self._repopulate_pool() def _setup_queues(self): - self._inqueue = SimpleQueue() - self._outqueue = SimpleQueue() + self._inqueue = self._ctx.SimpleQueue() + self._outqueue = self._ctx.SimpleQueue() self._quick_put = self._inqueue._writer.send self._quick_get = self._outqueue._reader.recv diff --git a/Lib/multiprocessing/popen.py b/Lib/multiprocessing/popen.py deleted file mode 100644 index b0c80d5..0000000 --- a/Lib/multiprocessing/popen.py +++ /dev/null @@ -1,78 +0,0 @@ -import sys -import threading - -__all__ = ['Popen', 'get_spawning_popen', 'set_spawning_popen', - 'assert_spawning'] - -# -# Check that the current thread is spawning a child process -# - -_tls = threading.local() - -def get_spawning_popen(): - return getattr(_tls, 'spawning_popen', None) - -def set_spawning_popen(popen): - _tls.spawning_popen = popen - -def assert_spawning(obj): - if get_spawning_popen() is None: - raise RuntimeError( - '%s objects should only be shared between processes' - ' through inheritance' % type(obj).__name__ - ) - -# -# -# - -_Popen = None - -def Popen(process_obj): - if _Popen is None: - set_start_method() - return _Popen(process_obj) - -def get_start_method(): - if _Popen is None: - set_start_method() - return _Popen.method - -def set_start_method(meth=None, *, start_helpers=True): - global _Popen - try: - modname = _method_to_module[meth] - __import__(modname) - except (KeyError, ImportError): - raise ValueError('could not use start method %r' % meth) - module = sys.modules[modname] - if start_helpers: - module.Popen.ensure_helpers_running() - _Popen = module.Popen - - -if sys.platform == 'win32': - - _method_to_module = { - None: 'multiprocessing.popen_spawn_win32', - 'spawn': 'multiprocessing.popen_spawn_win32', - } - - def get_all_start_methods(): - return ['spawn'] - -else: - _method_to_module = { - None: 'multiprocessing.popen_fork', - 'fork': 'multiprocessing.popen_fork', - 'spawn': 'multiprocessing.popen_spawn_posix', - 'forkserver': 'multiprocessing.popen_forkserver', - } - - def get_all_start_methods(): - from . import reduction - if reduction.HAVE_SEND_HANDLE: - return ['fork', 'spawn', 'forkserver'] - else: - return ['fork', 'spawn'] diff --git a/Lib/multiprocessing/popen_fork.py b/Lib/multiprocessing/popen_fork.py index c9f3aae..463cc18 100644 --- a/Lib/multiprocessing/popen_fork.py +++ b/Lib/multiprocessing/popen_fork.py @@ -81,7 +81,3 @@ class Popen(object): os.close(child_w) util.Finalize(self, os.close, (parent_r,)) self.sentinel = parent_r - - @staticmethod - def ensure_helpers_running(): - pass diff --git a/Lib/multiprocessing/popen_forkserver.py b/Lib/multiprocessing/popen_forkserver.py index f1c4b57..b115f81 100644 --- a/Lib/multiprocessing/popen_forkserver.py +++ b/Lib/multiprocessing/popen_forkserver.py @@ -4,8 +4,8 @@ import os from . import reduction if not reduction.HAVE_SEND_HANDLE: raise ImportError('No support for sending fds between processes') +from . import context from . import forkserver -from . import popen from . import popen_fork from . import spawn from . import util @@ -42,12 +42,12 @@ class Popen(popen_fork.Popen): def _launch(self, process_obj): prep_data = spawn.get_preparation_data(process_obj._name) buf = io.BytesIO() - popen.set_spawning_popen(self) + context.set_spawning_popen(self) try: reduction.dump(prep_data, buf) reduction.dump(process_obj, buf) finally: - popen.set_spawning_popen(None) + context.set_spawning_popen(None) self.sentinel, w = forkserver.connect_to_new_process(self._fds) util.Finalize(self, os.close, (self.sentinel,)) @@ -67,9 +67,3 @@ class Popen(popen_fork.Popen): # The process ended abnormally perhaps because of a signal self.returncode = 255 return self.returncode - - @staticmethod - def ensure_helpers_running(): - from . import semaphore_tracker - semaphore_tracker.ensure_running() - forkserver.ensure_running() diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py index 751bf22..8b5dc42 100644 --- a/Lib/multiprocessing/popen_spawn_posix.py +++ b/Lib/multiprocessing/popen_spawn_posix.py @@ -2,7 +2,7 @@ import fcntl import io import os -from . import popen +from . import context from . import popen_fork from . import reduction from . import spawn @@ -41,16 +41,16 @@ class Popen(popen_fork.Popen): def _launch(self, process_obj): from . import semaphore_tracker - tracker_fd = semaphore_tracker._semaphore_tracker_fd + tracker_fd = semaphore_tracker.getfd() self._fds.append(tracker_fd) prep_data = spawn.get_preparation_data(process_obj._name) fp = io.BytesIO() - popen.set_spawning_popen(self) + context.set_spawning_popen(self) try: reduction.dump(prep_data, fp) reduction.dump(process_obj, fp) finally: - popen.set_spawning_popen(None) + context.set_spawning_popen(None) parent_r = child_w = child_r = parent_w = None try: @@ -70,8 +70,3 @@ class Popen(popen_fork.Popen): for fd in (child_r, child_w, parent_w): if fd is not None: os.close(fd) - - @staticmethod - def ensure_helpers_running(): - from . import semaphore_tracker - semaphore_tracker.ensure_running() diff --git a/Lib/multiprocessing/popen_spawn_win32.py b/Lib/multiprocessing/popen_spawn_win32.py index f1e9aae..3b53068 100644 --- a/Lib/multiprocessing/popen_spawn_win32.py +++ b/Lib/multiprocessing/popen_spawn_win32.py @@ -4,8 +4,8 @@ import signal import sys import _winapi +from . import context from . import spawn -from . import popen from . import reduction from . import util @@ -60,15 +60,15 @@ class Popen(object): util.Finalize(self, _winapi.CloseHandle, (self.sentinel,)) # send information to child - popen.set_spawning_popen(self) + context.set_spawning_popen(self) try: reduction.dump(prep_data, to_child) reduction.dump(process_obj, to_child) finally: - popen.set_spawning_popen(None) + context.set_spawning_popen(None) def duplicate_for_child(self, handle): - assert self is popen.get_spawning_popen() + assert self is context.get_spawning_popen() return reduction.duplicate(handle, self.sentinel) def wait(self, timeout=None): @@ -97,7 +97,3 @@ class Popen(object): except OSError: if self.wait(timeout=1.0) is None: raise - - @staticmethod - def ensure_helpers_running(): - pass diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index c1cb36f..c2fc581 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -7,7 +7,7 @@ # Licensed to PSF under a Contributor Agreement. # -__all__ = ['Process', 'current_process', 'active_children'] +__all__ = ['BaseProcess', 'current_process', 'active_children'] # # Imports @@ -59,13 +59,14 @@ def _cleanup(): # The `Process` class # -class Process(object): +class BaseProcess(object): ''' Process objects represent activity that is run in a separate process The class is analogous to `threading.Thread` ''' - _Popen = None + def _Popen(self): + raise NotImplementedError def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None): @@ -101,11 +102,7 @@ class Process(object): assert not _current_process._config.get('daemon'), \ 'daemonic processes are not allowed to have children' _cleanup() - if self._Popen is not None: - Popen = self._Popen - else: - from .popen import Popen - self._popen = Popen(self) + self._popen = self._Popen(self) self._sentinel = self._popen.sentinel _children.add(self) @@ -229,10 +226,12 @@ class Process(object): ## def _bootstrap(self): - from . import util + from . import util, context global _current_process, _process_counter, _children try: + if self._start_method is not None: + context._force_start_method(self._start_method) _process_counter = itertools.count(1) _children = set() if sys.stdin is not None: @@ -282,7 +281,7 @@ class Process(object): class AuthenticationString(bytes): def __reduce__(self): - from .popen import get_spawning_popen + from .context import get_spawning_popen if get_spawning_popen() is None: raise TypeError( 'Pickling an AuthenticationString object is ' @@ -294,7 +293,7 @@ class AuthenticationString(bytes): # Create object representing the main process # -class _MainProcess(Process): +class _MainProcess(BaseProcess): def __init__(self): self._identity = () diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 10e40a5..f650771 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -22,8 +22,7 @@ from queue import Empty, Full import _multiprocessing from . import connection -from . import popen -from . import synchronize +from . import context from .util import debug, info, Finalize, register_after_fork, is_exiting from .reduction import ForkingPickler @@ -34,18 +33,18 @@ from .reduction import ForkingPickler class Queue(object): - def __init__(self, maxsize=0): + def __init__(self, maxsize=0, *, ctx): if maxsize <= 0: maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX self._maxsize = maxsize self._reader, self._writer = connection.Pipe(duplex=False) - self._rlock = synchronize.Lock() + self._rlock = ctx.Lock() self._opid = os.getpid() if sys.platform == 'win32': self._wlock = None else: - self._wlock = synchronize.Lock() - self._sem = synchronize.BoundedSemaphore(maxsize) + self._wlock = ctx.Lock() + self._sem = ctx.BoundedSemaphore(maxsize) # For use by concurrent.futures self._ignore_epipe = False @@ -55,7 +54,7 @@ class Queue(object): register_after_fork(self, Queue._after_fork) def __getstate__(self): - popen.assert_spawning(self) + context.assert_spawning(self) return (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) @@ -279,10 +278,10 @@ _sentinel = object() class JoinableQueue(Queue): - def __init__(self, maxsize=0): - Queue.__init__(self, maxsize) - self._unfinished_tasks = synchronize.Semaphore(0) - self._cond = synchronize.Condition() + def __init__(self, maxsize=0, *, ctx): + Queue.__init__(self, maxsize, ctx=ctx) + self._unfinished_tasks = ctx.Semaphore(0) + self._cond = ctx.Condition() def __getstate__(self): return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks) @@ -332,20 +331,20 @@ class JoinableQueue(Queue): class SimpleQueue(object): - def __init__(self): + def __init__(self, *, ctx): self._reader, self._writer = connection.Pipe(duplex=False) - self._rlock = synchronize.Lock() + self._rlock = ctx.Lock() self._poll = self._reader.poll if sys.platform == 'win32': self._wlock = None else: - self._wlock = synchronize.Lock() + self._wlock = ctx.Lock() def empty(self): return not self._poll() def __getstate__(self): - popen.assert_spawning(self) + context.assert_spawning(self) return (self._reader, self._writer, self._rlock, self._wlock) def __setstate__(self, state): diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index 5bbbcf4..01e6de2 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -15,7 +15,7 @@ import pickle import socket import sys -from . import popen +from . import context from . import util __all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump'] @@ -183,7 +183,7 @@ else: def DupFd(fd): '''Return a wrapper for an fd.''' - popen_obj = popen.get_spawning_popen() + popen_obj = context.get_spawning_popen() if popen_obj is not None: return popen_obj.DupFd(popen_obj.duplicate_for_child(fd)) elif HAVE_SEND_HANDLE: diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index 6c9e4a5..ddb2b52 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -26,60 +26,70 @@ from . import current_process __all__ = ['ensure_running', 'register', 'unregister'] -_semaphore_tracker_fd = None -_lock = threading.Lock() +class SemaphoreTracker(object): + def __init__(self): + self._lock = threading.Lock() + self._fd = None -def ensure_running(): - '''Make sure that semaphore tracker process is running. + def getfd(self): + self.ensure_running() + return self._fd - This can be run from any process. Usually a child process will use - the semaphore created by its parent.''' - global _semaphore_tracker_fd - with _lock: - if _semaphore_tracker_fd is not None: - return - fds_to_pass = [] - try: - fds_to_pass.append(sys.stderr.fileno()) - except Exception: - pass - cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)' - r, w = os.pipe() - try: - fds_to_pass.append(r) - # process will out live us, so no need to wait on pid - exe = spawn.get_executable() - args = [exe] + util._args_from_interpreter_flags() - args += ['-c', cmd % r] - util.spawnv_passfds(exe, args, fds_to_pass) - except: - os.close(w) - raise - else: - _semaphore_tracker_fd = w - finally: - os.close(r) - - -def register(name): - '''Register name of semaphore with semaphore tracker.''' - _send('REGISTER', name) - - -def unregister(name): - '''Unregister name of semaphore with semaphore tracker.''' - _send('UNREGISTER', name) - - -def _send(cmd, name): - msg = '{0}:{1}\n'.format(cmd, name).encode('ascii') - if len(name) > 512: - # posix guarantees that writes to a pipe of less than PIPE_BUF - # bytes are atomic, and that PIPE_BUF >= 512 - raise ValueError('name too long') - nbytes = os.write(_semaphore_tracker_fd, msg) - assert nbytes == len(msg) + def ensure_running(self): + '''Make sure that semaphore tracker process is running. + + This can be run from any process. Usually a child process will use + the semaphore created by its parent.''' + with self._lock: + if self._fd is not None: + return + fds_to_pass = [] + try: + fds_to_pass.append(sys.stderr.fileno()) + except Exception: + pass + cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)' + r, w = os.pipe() + try: + fds_to_pass.append(r) + # process will out live us, so no need to wait on pid + exe = spawn.get_executable() + args = [exe] + util._args_from_interpreter_flags() + args += ['-c', cmd % r] + util.spawnv_passfds(exe, args, fds_to_pass) + except: + os.close(w) + raise + else: + self._fd = w + finally: + os.close(r) + + def register(self, name): + '''Register name of semaphore with semaphore tracker.''' + self._send('REGISTER', name) + + def unregister(self, name): + '''Unregister name of semaphore with semaphore tracker.''' + self._send('UNREGISTER', name) + + def _send(self, cmd, name): + self.ensure_running() + msg = '{0}:{1}\n'.format(cmd, name).encode('ascii') + if len(name) > 512: + # posix guarantees that writes to a pipe of less than PIPE_BUF + # bytes are atomic, and that PIPE_BUF >= 512 + raise ValueError('name too long') + nbytes = os.write(self._fd, msg) + assert nbytes == len(msg) + + +_semaphore_tracker = SemaphoreTracker() +ensure_running = _semaphore_tracker.ensure_running +register = _semaphore_tracker.register +unregister = _semaphore_tracker.unregister +getfd = _semaphore_tracker.getfd def main(fd): diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py index a97dadf..0c17825 100644 --- a/Lib/multiprocessing/sharedctypes.py +++ b/Lib/multiprocessing/sharedctypes.py @@ -11,10 +11,10 @@ import ctypes import weakref from . import heap +from . import get_context -from .synchronize import RLock +from .context import assert_spawning from .reduction import ForkingPickler -from .popen import assert_spawning __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized'] @@ -66,7 +66,7 @@ def RawArray(typecode_or_type, size_or_initializer): result.__init__(*size_or_initializer) return result -def Value(typecode_or_type, *args, lock=True): +def Value(typecode_or_type, *args, lock=True, ctx=None): ''' Return a synchronization wrapper for a Value ''' @@ -74,12 +74,13 @@ def Value(typecode_or_type, *args, lock=True): if lock is False: return obj if lock in (True, None): - lock = RLock() + ctx = ctx or get_context() + lock = ctx.RLock() if not hasattr(lock, 'acquire'): raise AttributeError("'%r' has no method 'acquire'" % lock) - return synchronized(obj, lock) + return synchronized(obj, lock, ctx=ctx) -def Array(typecode_or_type, size_or_initializer, *, lock=True): +def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None): ''' Return a synchronization wrapper for a RawArray ''' @@ -87,25 +88,27 @@ def Array(typecode_or_type, size_or_initializer, *, lock=True): if lock is False: return obj if lock in (True, None): - lock = RLock() + ctx = ctx or get_context() + lock = ctx.RLock() if not hasattr(lock, 'acquire'): raise AttributeError("'%r' has no method 'acquire'" % lock) - return synchronized(obj, lock) + return synchronized(obj, lock, ctx=ctx) def copy(obj): new_obj = _new_value(type(obj)) ctypes.pointer(new_obj)[0] = obj return new_obj -def synchronized(obj, lock=None): +def synchronized(obj, lock=None, ctx=None): assert not isinstance(obj, SynchronizedBase), 'object already synchronized' + ctx = ctx or get_context() if isinstance(obj, ctypes._SimpleCData): - return Synchronized(obj, lock) + return Synchronized(obj, lock, ctx) elif isinstance(obj, ctypes.Array): if obj._type_ is ctypes.c_char: - return SynchronizedString(obj, lock) - return SynchronizedArray(obj, lock) + return SynchronizedString(obj, lock, ctx) + return SynchronizedArray(obj, lock, ctx) else: cls = type(obj) try: @@ -115,7 +118,7 @@ def synchronized(obj, lock=None): d = dict((name, make_property(name)) for name in names) classname = 'Synchronized' + cls.__name__ scls = class_cache[cls] = type(classname, (SynchronizedBase,), d) - return scls(obj, lock) + return scls(obj, lock, ctx) # # Functions for pickling/unpickling @@ -175,9 +178,13 @@ class_cache = weakref.WeakKeyDictionary() class SynchronizedBase(object): - def __init__(self, obj, lock=None): + def __init__(self, obj, lock=None, ctx=None): self._obj = obj - self._lock = lock or RLock() + if lock: + self._lock = lock + else: + ctx = ctx or get_context(force=True) + self._lock = ctx.RLock() self.acquire = self._lock.acquire self.release = self._lock.release diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py index 9c4acee..8167454 100644 --- a/Lib/multiprocessing/spawn.py +++ b/Lib/multiprocessing/spawn.py @@ -12,9 +12,9 @@ import os import pickle import sys +from . import get_start_method, set_start_method from . import process from . import util -from . import popen __all__ = ['_main', 'freeze_support', 'set_executable', 'get_executable', 'get_preparation_data', 'get_command_line', 'import_main_path'] @@ -91,7 +91,7 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None): fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY) else: from . import semaphore_tracker - semaphore_tracker._semaphore_tracker_fd = tracker_fd + semaphore_tracker._semaphore_tracker._fd = tracker_fd fd = pipe_handle exitcode = _main(fd) sys.exit(exitcode) @@ -154,7 +154,7 @@ def get_preparation_data(name): sys_argv=sys.argv, orig_dir=process.ORIGINAL_DIR, dir=os.getcwd(), - start_method=popen.get_start_method(), + start_method=get_start_method(), ) if sys.platform != 'win32' or (not WINEXE and not WINSERVICE): @@ -204,7 +204,7 @@ def prepare(data): process.ORIGINAL_DIR = data['orig_dir'] if 'start_method' in data: - popen.set_start_method(data['start_method'], start_helpers=False) + set_start_method(data['start_method']) if 'main_path' in data: import_main_path(data['main_path']) diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 09736ef..82c30a2 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -20,7 +20,7 @@ import _multiprocessing from time import time as _time -from . import popen +from . import context from . import process from . import util @@ -50,14 +50,15 @@ class SemLock(object): _rand = tempfile._RandomNameSequence() - def __init__(self, kind, value, maxvalue): - unlink_immediately = (sys.platform == 'win32' or - popen.get_start_method() == 'fork') + def __init__(self, kind, value, maxvalue, *, ctx): + ctx = ctx or get_context() + ctx = ctx.get_context() + unlink_now = sys.platform == 'win32' or ctx._name == 'fork' for i in range(100): try: sl = self._semlock = _multiprocessing.SemLock( kind, value, maxvalue, self._make_name(), - unlink_immediately) + unlink_now) except FileExistsError: pass else: @@ -99,10 +100,10 @@ class SemLock(object): return self._semlock.__exit__(*args) def __getstate__(self): - popen.assert_spawning(self) + context.assert_spawning(self) sl = self._semlock if sys.platform == 'win32': - h = popen.get_spawning_popen().duplicate_for_child(sl.handle) + h = context.get_spawning_popen().duplicate_for_child(sl.handle) else: h = sl.handle return (h, sl.kind, sl.maxvalue, sl.name) @@ -123,8 +124,8 @@ class SemLock(object): class Semaphore(SemLock): - def __init__(self, value=1): - SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX) + def __init__(self, value=1, *, ctx): + SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx) def get_value(self): return self._semlock._get_value() @@ -142,8 +143,8 @@ class Semaphore(SemLock): class BoundedSemaphore(Semaphore): - def __init__(self, value=1): - SemLock.__init__(self, SEMAPHORE, value, value) + def __init__(self, value=1, *, ctx): + SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx) def __repr__(self): try: @@ -159,8 +160,8 @@ class BoundedSemaphore(Semaphore): class Lock(SemLock): - def __init__(self): - SemLock.__init__(self, SEMAPHORE, 1, 1) + def __init__(self, *, ctx): + SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx) def __repr__(self): try: @@ -184,8 +185,8 @@ class Lock(SemLock): class RLock(SemLock): - def __init__(self): - SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1) + def __init__(self, *, ctx): + SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx) def __repr__(self): try: @@ -210,15 +211,15 @@ class RLock(SemLock): class Condition(object): - def __init__(self, lock=None): - self._lock = lock or RLock() - self._sleeping_count = Semaphore(0) - self._woken_count = Semaphore(0) - self._wait_semaphore = Semaphore(0) + def __init__(self, lock=None, *, ctx): + self._lock = lock or ctx.RLock() + self._sleeping_count = ctx.Semaphore(0) + self._woken_count = ctx.Semaphore(0) + self._wait_semaphore = ctx.Semaphore(0) self._make_methods() def __getstate__(self): - popen.assert_spawning(self) + context.assert_spawning(self) return (self._lock, self._sleeping_count, self._woken_count, self._wait_semaphore) @@ -332,9 +333,9 @@ class Condition(object): class Event(object): - def __init__(self): - self._cond = Condition(Lock()) - self._flag = Semaphore(0) + def __init__(self, *, ctx): + self._cond = ctx.Condition(ctx.Lock()) + self._flag = ctx.Semaphore(0) def is_set(self): self._cond.acquire() @@ -383,11 +384,11 @@ class Event(object): class Barrier(threading.Barrier): - def __init__(self, parties, action=None, timeout=None): + def __init__(self, parties, action=None, timeout=None, *, ctx): import struct from .heap import BufferWrapper wrapper = BufferWrapper(struct.calcsize('i') * 2) - cond = Condition() + cond = ctx.Condition() self.__setstate__((parties, action, timeout, cond, wrapper)) self._state = 0 self._count = 0 diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 40d8d52..910cfa9 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3555,6 +3555,32 @@ class TestIgnoreEINTR(unittest.TestCase): conn.close() class TestStartMethod(unittest.TestCase): + @classmethod + def _check_context(cls, conn): + conn.send(multiprocessing.get_start_method()) + + def check_context(self, ctx): + r, w = ctx.Pipe(duplex=False) + p = ctx.Process(target=self._check_context, args=(w,)) + p.start() + w.close() + child_method = r.recv() + r.close() + p.join() + self.assertEqual(child_method, ctx.get_start_method()) + + def test_context(self): + for method in ('fork', 'spawn', 'forkserver'): + try: + ctx = multiprocessing.get_context(method) + except ValueError: + continue + self.assertEqual(ctx.get_start_method(), method) + self.assertIs(ctx.get_context(), ctx) + self.assertRaises(ValueError, ctx.set_start_method, 'spawn') + self.assertRaises(ValueError, ctx.set_start_method, None) + self.check_context(ctx) + def test_set_get(self): multiprocessing.set_forkserver_preload(PRELOAD) count = 0 @@ -3562,13 +3588,19 @@ class TestStartMethod(unittest.TestCase): try: for method in ('fork', 'spawn', 'forkserver'): try: - multiprocessing.set_start_method(method) + multiprocessing.set_start_method(method, force=True) except ValueError: continue self.assertEqual(multiprocessing.get_start_method(), method) + ctx = multiprocessing.get_context() + self.assertEqual(ctx.get_start_method(), method) + self.assertTrue(type(ctx).__name__.lower().startswith(method)) + self.assertTrue( + ctx.Process.__name__.lower().startswith(method)) + self.check_context(multiprocessing) count += 1 finally: - multiprocessing.set_start_method(old_method) + multiprocessing.set_start_method(old_method, force=True) self.assertGreaterEqual(count, 1) def test_get_all(self): @@ -3753,9 +3785,9 @@ def install_tests_in_module_dict(remote_globs, start_method): multiprocessing.process._cleanup() dangling[0] = multiprocessing.process._dangling.copy() dangling[1] = threading._dangling.copy() - old_start_method[0] = multiprocessing.get_start_method() + old_start_method[0] = multiprocessing.get_start_method(allow_none=True) try: - multiprocessing.set_start_method(start_method) + multiprocessing.set_start_method(start_method, force=True) except ValueError: raise unittest.SkipTest(start_method + ' start method not supported') @@ -3771,7 +3803,7 @@ def install_tests_in_module_dict(remote_globs, start_method): multiprocessing.get_logger().setLevel(LOG_LEVEL) def tearDownModule(): - multiprocessing.set_start_method(old_start_method[0]) + multiprocessing.set_start_method(old_start_method[0], force=True) # pause a bit so we don't get warning about dangling threads/processes time.sleep(0.5) multiprocessing.process._cleanup() |