summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/pool.py15
-rw-r--r--Lib/multiprocessing/util.py44
2 files changed, 43 insertions, 16 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 9e07e32..ec57939 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -225,7 +225,6 @@ class Pool(object):
Apply `func` to each element in `iterable`, collecting the results
in a list that is returned.
'''
- assert self._state == RUN
return self._map_async(func, iterable, mapstar, chunksize).get()
def starmap(self, func, iterable, chunksize=None):
@@ -234,7 +233,6 @@ class Pool(object):
be iterables as well and will be unpacked as arguments. Hence
`func` and (a, b) becomes func(a, b).
'''
- assert self._state == RUN
return self._map_async(func, iterable, starmapstar, chunksize).get()
def starmap_async(self, func, iterable, chunksize=None, callback=None,
@@ -242,7 +240,6 @@ class Pool(object):
'''
Asynchronous version of `starmap()` method.
'''
- assert self._state == RUN
return self._map_async(func, iterable, starmapstar, chunksize,
callback, error_callback)
@@ -250,7 +247,8 @@ class Pool(object):
'''
Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
'''
- assert self._state == RUN
+ if self._state != RUN:
+ raise ValueError("Pool not running")
if chunksize == 1:
result = IMapIterator(self._cache)
self._taskqueue.put((((result._job, i, func, (x,), {})
@@ -268,7 +266,8 @@ class Pool(object):
'''
Like `imap()` method but ordering of results is arbitrary.
'''
- assert self._state == RUN
+ if self._state != RUN:
+ raise ValueError("Pool not running")
if chunksize == 1:
result = IMapUnorderedIterator(self._cache)
self._taskqueue.put((((result._job, i, func, (x,), {})
@@ -287,7 +286,8 @@ class Pool(object):
'''
Asynchronous version of `apply()` method.
'''
- assert self._state == RUN
+ if self._state != RUN:
+ raise ValueError("Pool not running")
result = ApplyResult(self._cache, callback, error_callback)
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
return result
@@ -297,7 +297,6 @@ class Pool(object):
'''
Asynchronous version of `map()` method.
'''
- assert self._state == RUN
return self._map_async(func, iterable, mapstar, chunksize)
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
@@ -305,6 +304,8 @@ class Pool(object):
'''
Helper function to implement map, starmap and their async counterparts.
'''
+ if self._state != RUN:
+ raise ValueError("Pool not running")
if not hasattr(iterable, '__len__'):
iterable = list(iterable)
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
index 8a6aede..7495813 100644
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -235,6 +235,12 @@ def _run_finalizers(minpriority=None):
Finalizers with highest priority are called first; finalizers with
the same priority will be called in reverse order of creation.
'''
+ if _finalizer_registry is None:
+ # This function may be called after this module's globals are
+ # destroyed. See the _exit_function function in this module for more
+ # notes.
+ return
+
if minpriority is None:
f = lambda p : p[0][0] is not None
else:
@@ -266,7 +272,13 @@ def is_exiting():
_exiting = False
-def _exit_function():
+def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
+ active_children=active_children,
+ current_process=current_process):
+ # We hold on to references to functions in the arglist due to the
+ # situation described below, where this function is called after this
+ # module's globals are destroyed.
+
global _exiting
if not _exiting:
@@ -276,14 +288,28 @@ def _exit_function():
debug('running all "atexit" finalizers with priority >= 0')
_run_finalizers(0)
- for p in active_children():
- if p._daemonic:
- info('calling terminate() for daemon %s', p.name)
- p._popen.terminate()
-
- for p in active_children():
- info('calling join() for process %s', p.name)
- p.join()
+ if current_process() is not None:
+ # We check if the current process is None here because if
+ # it's None, any call to ``active_children()`` will throw
+ # an AttributeError (active_children winds up trying to
+ # get attributes from util._current_process). One
+ # situation where this can happen is if someone has
+ # manipulated sys.modules, causing this module to be
+ # garbage collected. The destructor for the module type
+ # then replaces all values in the module dict with None.
+ # For instance, after setuptools runs a test it replaces
+ # sys.modules with a copy created earlier. See issues
+ # #9775 and #15881. Also related: #4106, #9205, and
+ # #9207.
+
+ for p in active_children():
+ if p._daemonic:
+ info('calling terminate() for daemon %s', p.name)
+ p._popen.terminate()
+
+ for p in active_children():
+ info('calling join() for process %s', p.name)
+ p.join()
debug('running the remaining "atexit" finalizers')
_run_finalizers()