diff options
Diffstat (limited to 'Lib/concurrent/futures/_base.py')
-rw-r--r-- | Lib/concurrent/futures/_base.py | 41 |
1 files changed, 35 insertions, 6 deletions
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 8136d89..0bbb85b 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -12,6 +12,7 @@ import time FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' ALL_COMPLETED = 'ALL_COMPLETED' +_AS_COMPLETED = '_AS_COMPLETED' # Possible future states (for internal use by the futures package). PENDING = 'PENDING' @@ -70,8 +71,30 @@ class _Waiter(object): def add_cancelled(self, future): self.finished_futures.append(future) +class _AsCompletedWaiter(_Waiter): + """Used by as_completed().""" + + def __init__(self): + super(_AsCompletedWaiter, self).__init__() + self.lock = threading.Lock() + + def add_result(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_result(future) + self.event.set() + + def add_exception(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_exception(future) + self.event.set() + + def add_cancelled(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_cancelled(future) + self.event.set() + class _FirstCompletedWaiter(_Waiter): - """Used by wait(return_when=FIRST_COMPLETED) and as_completed().""" + """Used by wait(return_when=FIRST_COMPLETED).""" def add_result(self, future): super().add_result(future) @@ -128,7 +151,9 @@ class _AcquireFutures(object): future._condition.release() def _create_and_install_waiters(fs, return_when): - if return_when == FIRST_COMPLETED: + if return_when == _AS_COMPLETED: + waiter = _AsCompletedWaiter() + elif return_when == FIRST_COMPLETED: waiter = _FirstCompletedWaiter() else: pending_count = sum( @@ -171,7 +196,7 @@ def as_completed(fs, timeout=None): f for f in fs if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) pending = set(fs) - finished - waiter = _create_and_install_waiters(fs, FIRST_COMPLETED) + waiter = _create_and_install_waiters(fs, _AS_COMPLETED) try: for future in finished: @@ -187,11 +212,15 @@ def as_completed(fs, timeout=None): '%d (of %d) futures unfinished' % ( len(pending), len(fs))) - waiter.event.wait(timeout) + waiter.event.wait(wait_timeout) + + with waiter.lock: + finished = waiter.finished_futures + waiter.finished_futures = [] + waiter.event.clear() - for future in waiter.finished_futures[:]: + for future in finished: yield future - waiter.finished_futures.remove(future) pending.remove(future) finally: |