diff options
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/pool.py | 15 | ||||
-rw-r--r-- | Lib/multiprocessing/util.py | 44 |
2 files changed, 43 insertions, 16 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 9e07e32..ec57939 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -225,7 +225,6 @@ class Pool(object): Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ''' - assert self._state == RUN return self._map_async(func, iterable, mapstar, chunksize).get() def starmap(self, func, iterable, chunksize=None): @@ -234,7 +233,6 @@ class Pool(object): be iterables as well and will be unpacked as arguments. Hence `func` and (a, b) becomes func(a, b). ''' - assert self._state == RUN return self._map_async(func, iterable, starmapstar, chunksize).get() def starmap_async(self, func, iterable, chunksize=None, callback=None, @@ -242,7 +240,6 @@ class Pool(object): ''' Asynchronous version of `starmap()` method. ''' - assert self._state == RUN return self._map_async(func, iterable, starmapstar, chunksize, callback, error_callback) @@ -250,7 +247,8 @@ class Pool(object): ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("Pool not running") if chunksize == 1: result = IMapIterator(self._cache) self._taskqueue.put((((result._job, i, func, (x,), {}) @@ -268,7 +266,8 @@ class Pool(object): ''' Like `imap()` method but ordering of results is arbitrary. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("Pool not running") if chunksize == 1: result = IMapUnorderedIterator(self._cache) self._taskqueue.put((((result._job, i, func, (x,), {}) @@ -287,7 +286,8 @@ class Pool(object): ''' Asynchronous version of `apply()` method. ''' - assert self._state == RUN + if self._state != RUN: + raise ValueError("Pool not running") result = ApplyResult(self._cache, callback, error_callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result @@ -297,7 +297,6 @@ class Pool(object): ''' Asynchronous version of `map()` method. ''' - assert self._state == RUN return self._map_async(func, iterable, mapstar, chunksize) def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, @@ -305,6 +304,8 @@ class Pool(object): ''' Helper function to implement map, starmap and their async counterparts. ''' + if self._state != RUN: + raise ValueError("Pool not running") if not hasattr(iterable, '__len__'): iterable = list(iterable) diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 8a6aede..7495813 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -235,6 +235,12 @@ def _run_finalizers(minpriority=None): Finalizers with highest priority are called first; finalizers with the same priority will be called in reverse order of creation. ''' + if _finalizer_registry is None: + # This function may be called after this module's globals are + # destroyed. See the _exit_function function in this module for more + # notes. + return + if minpriority is None: f = lambda p : p[0][0] is not None else: @@ -266,7 +272,13 @@ def is_exiting(): _exiting = False -def _exit_function(): +def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers, + active_children=active_children, + current_process=current_process): + # We hold on to references to functions in the arglist due to the + # situation described below, where this function is called after this + # module's globals are destroyed. + global _exiting if not _exiting: @@ -276,14 +288,28 @@ def _exit_function(): debug('running all "atexit" finalizers with priority >= 0') _run_finalizers(0) - for p in active_children(): - if p._daemonic: - info('calling terminate() for daemon %s', p.name) - p._popen.terminate() - - for p in active_children(): - info('calling join() for process %s', p.name) - p.join() + if current_process() is not None: + # We check if the current process is None here because if + # it's None, any call to ``active_children()`` will throw + # an AttributeError (active_children winds up trying to + # get attributes from util._current_process). One + # situation where this can happen is if someone has + # manipulated sys.modules, causing this module to be + # garbage collected. The destructor for the module type + # then replaces all values in the module dict with None. + # For instance, after setuptools runs a test it replaces + # sys.modules with a copy created earlier. See issues + # #9775 and #15881. Also related: #4106, #9205, and + # #9207. + + for p in active_children(): + if p._daemonic: + info('calling terminate() for daemon %s', p.name) + p._popen.terminate() + + for p in active_children(): + info('calling join() for process %s', p.name) + p.join() debug('running the remaining "atexit" finalizers') _run_finalizers() |