summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2012-03-31 18:25:22 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2012-03-31 18:25:22 (GMT)
commit8b34b53c52ce604bc48b39f83089cd404201de1b (patch)
tree7f15c215d673520fb452b7652187448b8119f5f0 /Lib
parent6eeadf0a3efd964837e0a274bf202442facdb617 (diff)
parentf70401e842d120407c5450d0b89cece8616af8e4 (diff)
downloadcpython-8b34b53c52ce604bc48b39f83089cd404201de1b.zip
cpython-8b34b53c52ce604bc48b39f83089cd404201de1b.tar.gz
cpython-8b34b53c52ce604bc48b39f83089cd404201de1b.tar.bz2
Issue #14406: Fix a race condition when using `concurrent.futures.wait(return_when=ALL_COMPLETED)`.
Patch by Matt Joiner.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/concurrent/futures/_base.py8
-rw-r--r--Lib/test/test_concurrent_futures.py18
2 files changed, 22 insertions, 4 deletions
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index 3b097b5..88b5fbd 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -111,12 +111,14 @@ class _AllCompletedWaiter(_Waiter):
def __init__(self, num_pending_calls, stop_on_exception):
self.num_pending_calls = num_pending_calls
self.stop_on_exception = stop_on_exception
+ self.lock = threading.Lock()
super().__init__()
def _decrement_pending_calls(self):
- self.num_pending_calls -= 1
- if not self.num_pending_calls:
- self.event.set()
+ with self.lock:
+ self.num_pending_calls -= 1
+ if not self.num_pending_calls:
+ self.event.set()
def add_result(self, future):
super().add_result(future)
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 04ee246..6ae450d 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -183,7 +183,9 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
for p in processes.values():
p.join()
+
class WaitTests(unittest.TestCase):
+
def test_first_completed(self):
future1 = self.executor.submit(mul, 21, 2)
future2 = self.executor.submit(time.sleep, 1.5)
@@ -284,7 +286,21 @@ class WaitTests(unittest.TestCase):
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
- pass
+
+ def test_pending_calls_race(self):
+ # Issue #14406: multi-threaded race condition when waiting on all
+ # futures.
+ event = threading.Event()
+ def future_func():
+ event.wait()
+ oldswitchinterval = sys.getswitchinterval()
+ sys.setswitchinterval(1e-6)
+ try:
+ fs = {self.executor.submit(future_func) for i in range(100)}
+ event.set()
+ futures.wait(fs, return_when=futures.ALL_COMPLETED)
+ finally:
+ sys.setswitchinterval(oldswitchinterval)
class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):