diff options
Diffstat (limited to 'Lib/multiprocessing/pool.py')
-rw-r--r-- | Lib/multiprocessing/pool.py | 27 |
1 files changed, 6 insertions, 21 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 8832a5c..75a76a4 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -87,7 +87,7 @@ class MaybeEncodingError(Exception): self.exc) def __repr__(self): - return "<MaybeEncodingError: %s>" % str(self) + return "<%s: %s>" % (self.__class__.__name__, self) def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, @@ -666,8 +666,7 @@ class IMapIterator(object): return self def next(self, timeout=None): - self._cond.acquire() - try: + with self._cond: try: item = self._items.popleft() except IndexError: @@ -680,8 +679,6 @@ class IMapIterator(object): if self._index == self._length: raise StopIteration raise TimeoutError - finally: - self._cond.release() success, value = item if success: @@ -691,8 +688,7 @@ class IMapIterator(object): __next__ = next # XXX def _set(self, i, obj): - self._cond.acquire() - try: + with self._cond: if self._index == i: self._items.append(obj) self._index += 1 @@ -706,18 +702,13 @@ class IMapIterator(object): if self._index == self._length: del self._cache[self._job] - finally: - self._cond.release() def _set_length(self, length): - self._cond.acquire() - try: + with self._cond: self._length = length if self._index == self._length: self._cond.notify() del self._cache[self._job] - finally: - self._cond.release() # # Class whose instances are returned by `Pool.imap_unordered()` @@ -726,15 +717,12 @@ class IMapIterator(object): class IMapUnorderedIterator(IMapIterator): def _set(self, i, obj): - self._cond.acquire() - try: + with self._cond: self._items.append(obj) self._index += 1 self._cond.notify() if self._index == self._length: del self._cache[self._job] - finally: - self._cond.release() # # @@ -760,10 +748,7 @@ class ThreadPool(Pool): @staticmethod def _help_stuff_finish(inqueue, task_handler, size): # put sentinels at head of inqueue to make workers finish - inqueue.not_empty.acquire() - try: + with inqueue.not_empty: inqueue.queue.clear() inqueue.queue.extend([None] * size) inqueue.not_empty.notify_all() - finally: - inqueue.not_empty.release() |