summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing/pool.py')
-rw-r--r--Lib/multiprocessing/pool.py27
1 files changed, 6 insertions, 21 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 8832a5c..75a76a4 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -87,7 +87,7 @@ class MaybeEncodingError(Exception):
self.exc)
def __repr__(self):
- return "<MaybeEncodingError: %s>" % str(self)
+ return "<%s: %s>" % (self.__class__.__name__, self)
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
@@ -666,8 +666,7 @@ class IMapIterator(object):
return self
def next(self, timeout=None):
- self._cond.acquire()
- try:
+ with self._cond:
try:
item = self._items.popleft()
except IndexError:
@@ -680,8 +679,6 @@ class IMapIterator(object):
if self._index == self._length:
raise StopIteration
raise TimeoutError
- finally:
- self._cond.release()
success, value = item
if success:
@@ -691,8 +688,7 @@ class IMapIterator(object):
__next__ = next # XXX
def _set(self, i, obj):
- self._cond.acquire()
- try:
+ with self._cond:
if self._index == i:
self._items.append(obj)
self._index += 1
@@ -706,18 +702,13 @@ class IMapIterator(object):
if self._index == self._length:
del self._cache[self._job]
- finally:
- self._cond.release()
def _set_length(self, length):
- self._cond.acquire()
- try:
+ with self._cond:
self._length = length
if self._index == self._length:
self._cond.notify()
del self._cache[self._job]
- finally:
- self._cond.release()
#
# Class whose instances are returned by `Pool.imap_unordered()`
@@ -726,15 +717,12 @@ class IMapIterator(object):
class IMapUnorderedIterator(IMapIterator):
def _set(self, i, obj):
- self._cond.acquire()
- try:
+ with self._cond:
self._items.append(obj)
self._index += 1
self._cond.notify()
if self._index == self._length:
del self._cache[self._job]
- finally:
- self._cond.release()
#
#
@@ -760,10 +748,7 @@ class ThreadPool(Pool):
@staticmethod
def _help_stuff_finish(inqueue, task_handler, size):
# put sentinels at head of inqueue to make workers finish
- inqueue.not_empty.acquire()
- try:
+ with inqueue.not_empty:
inqueue.queue.clear()
inqueue.queue.extend([None] * size)
inqueue.not_empty.notify_all()
- finally:
- inqueue.not_empty.release()