diff options
Diffstat (limited to 'Lib/multiprocessing/pool.py')
-rw-r--r-- | Lib/multiprocessing/pool.py | 28 |
1 files changed, 21 insertions, 7 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index c2364ab..e457f0a 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -92,7 +92,9 @@ class MaybeEncodingError(Exception): def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, wrap_exception=False): - assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) + if (maxtasks is not None) and not (isinstance(maxtasks, int) + and maxtasks >= 1): + raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks)) put = outqueue.put get = inqueue.get if hasattr(inqueue, '_writer'): @@ -254,8 +256,8 @@ class Pool(object): def apply(self, func, args=(), kwds={}): ''' Equivalent of `func(*args, **kwds)`. + Pool must be running. ''' - assert self._state == RUN return self.apply_async(func, args, kwds).get() def map(self, func, iterable, chunksize=None): @@ -307,6 +309,10 @@ class Pool(object): )) return result else: + if chunksize < 1: + raise ValueError( + "Chunksize must be 1+, not {0:n}".format( + chunksize)) assert chunksize > 1 task_batches = Pool._get_tasks(func, iterable, chunksize) result = IMapIterator(self._cache) @@ -334,7 +340,9 @@ class Pool(object): )) return result else: - assert chunksize > 1 + if chunksize < 1: + raise ValueError( + "Chunksize must be 1+, not {0!r}".format(chunksize)) task_batches = Pool._get_tasks(func, iterable, chunksize) result = IMapUnorderedIterator(self._cache) self._taskqueue.put( @@ -466,7 +474,7 @@ class Pool(object): return if thread._state: - assert thread._state == TERMINATE + assert thread._state == TERMINATE, "Thread not in TERMINATE" util.debug('result handler found thread._state=TERMINATE') break @@ -542,7 +550,10 @@ class Pool(object): def join(self): util.debug('joining pool') - assert self._state in (CLOSE, TERMINATE) + if self._state == RUN: + raise ValueError("Pool is still running") + elif self._state not in (CLOSE, TERMINATE): + raise ValueError("In unknown state") self._worker_handler.join() self._task_handler.join() self._result_handler.join() @@ -570,7 +581,9 @@ class Pool(object): util.debug('helping task handler/workers to finish') cls._help_stuff_finish(inqueue, task_handler, len(pool)) - assert result_handler.is_alive() or len(cache) == 0 + if (not result_handler.is_alive()) and (len(cache) != 0): + raise AssertionError( + "Cannot have cache with result_hander not alive") result_handler._state = TERMINATE outqueue.put(None) # sentinel @@ -628,7 +641,8 @@ class ApplyResult(object): return self._event.is_set() def successful(self): - assert self.ready() + if not self.ready(): + raise ValueError("{0!r} not ready".format(self)) return self._success def wait(self, timeout=None): |