summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing/pool.py')
-rw-r--r--Lib/multiprocessing/pool.py28
1 files changed, 21 insertions, 7 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index c2364ab..e457f0a 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -92,7 +92,9 @@ class MaybeEncodingError(Exception):
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
wrap_exception=False):
- assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
+ if (maxtasks is not None) and not (isinstance(maxtasks, int)
+ and maxtasks >= 1):
+ raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
put = outqueue.put
get = inqueue.get
if hasattr(inqueue, '_writer'):
@@ -254,8 +256,8 @@ class Pool(object):
def apply(self, func, args=(), kwds={}):
'''
Equivalent of `func(*args, **kwds)`.
+ Pool must be running.
'''
- assert self._state == RUN
return self.apply_async(func, args, kwds).get()
def map(self, func, iterable, chunksize=None):
@@ -307,6 +309,10 @@ class Pool(object):
))
return result
else:
+ if chunksize < 1:
+ raise ValueError(
+ "Chunksize must be 1+, not {0:n}".format(
+ chunksize))
assert chunksize > 1
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = IMapIterator(self._cache)
@@ -334,7 +340,9 @@ class Pool(object):
))
return result
else:
- assert chunksize > 1
+ if chunksize < 1:
+ raise ValueError(
+ "Chunksize must be 1+, not {0!r}".format(chunksize))
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = IMapUnorderedIterator(self._cache)
self._taskqueue.put(
@@ -466,7 +474,7 @@ class Pool(object):
return
if thread._state:
- assert thread._state == TERMINATE
+ assert thread._state == TERMINATE, "Thread not in TERMINATE"
util.debug('result handler found thread._state=TERMINATE')
break
@@ -542,7 +550,10 @@ class Pool(object):
def join(self):
util.debug('joining pool')
- assert self._state in (CLOSE, TERMINATE)
+ if self._state == RUN:
+ raise ValueError("Pool is still running")
+ elif self._state not in (CLOSE, TERMINATE):
+ raise ValueError("In unknown state")
self._worker_handler.join()
self._task_handler.join()
self._result_handler.join()
@@ -570,7 +581,9 @@ class Pool(object):
util.debug('helping task handler/workers to finish')
cls._help_stuff_finish(inqueue, task_handler, len(pool))
- assert result_handler.is_alive() or len(cache) == 0
+ if (not result_handler.is_alive()) and (len(cache) != 0):
+ raise AssertionError(
+ "Cannot have cache with result_hander not alive")
result_handler._state = TERMINATE
outqueue.put(None) # sentinel
@@ -628,7 +641,8 @@ class ApplyResult(object):
return self._event.is_set()
def successful(self):
- assert self.ready()
+ if not self.ready():
+ raise ValueError("{0!r} not ready".format(self))
return self._success
def wait(self, timeout=None):