summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/concurrent/futures/process.py4
-rw-r--r--Lib/concurrent/futures/thread.py2
-rw-r--r--Lib/multiprocessing/queues.py4
-rw-r--r--Lib/test/test_concurrent_futures.py22
-rw-r--r--Misc/ACKS1
-rw-r--r--Misc/NEWS3
6 files changed, 36 insertions, 0 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 94e0289..3c20b93 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -240,6 +240,8 @@ def _queue_management_worker(executor_reference,
"terminated abruptly while the future was "
"running or pending."
))
+ # Delete references to object. See issue16284
+ del work_item
pending_work_items.clear()
# Terminate remaining workers forcibly: the queues or their
# locks may be in a dirty state and block forever.
@@ -264,6 +266,8 @@ def _queue_management_worker(executor_reference,
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
+ # Delete references to object. See issue16284
+ del work_item
# Check whether we should start shutting down.
executor = executor_reference()
# No more work items can be added if:
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index 95bb682..f9beb0f 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -63,6 +63,8 @@ def _worker(executor_reference, work_queue):
work_item = work_queue.get(block=True)
if work_item is not None:
work_item.run()
+ # Delete references to object. See issue16284
+ del work_item
continue
executor = executor_reference()
# Exit if:
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 37271fb..f6f02b6 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -243,10 +243,14 @@ class Queue(object):
if wacquire is None:
send(obj)
+ # Delete references to object. See issue16284
+ del obj
else:
wacquire()
try:
send(obj)
+ # Delete references to object. See issue16284
+ del obj
finally:
wrelease()
except IndexError:
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 6ae450d..4ad3309 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -15,6 +15,7 @@ import sys
import threading
import time
import unittest
+import weakref
from concurrent import futures
from concurrent.futures._base import (
@@ -52,6 +53,11 @@ def sleep_and_print(t, msg):
sys.stdout.flush()
+class MyObject(object):
+ def my_method(self):
+ pass
+
+
class ExecutorMixin:
worker_count = 5
@@ -396,6 +402,22 @@ class ExecutorTest(unittest.TestCase):
self.executor.map(str, [2] * (self.worker_count + 1))
self.executor.shutdown()
+ @test.support.cpython_only
+ def test_no_stale_references(self):
+ # Issue #16284: check that the executors don't unnecessarily hang onto
+ # references.
+ my_object = MyObject()
+ my_object_collected = threading.Event()
+ my_object_callback = weakref.ref(
+ my_object, lambda obj: my_object_collected.set())
+ # Deliberately discarding the future.
+ self.executor.submit(my_object.my_method)
+ del my_object
+
+ collected = my_object_collected.wait(timeout=5.0)
+ self.assertTrue(collected,
+ "Stale reference not collected within timeout.")
+
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
def test_map_submits_without_iteration(self):
diff --git a/Misc/ACKS b/Misc/ACKS
index 96d8178..bc5fc61 100644
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -726,6 +726,7 @@ Lukas Lueg
Loren Luke
Fredrik Lundh
Mark Lutz
+Taras Lyapun
Jim Lynch
Mikael Lyngvig
Martin von Löwis
diff --git a/Misc/NEWS b/Misc/NEWS
index c6ba38b..5ca7e4d 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -88,6 +88,9 @@ Core and Builtins
Library
-------
+- Issue #16284: Prevent keeping unnecessary references to worker functions
+ in concurrent.futures ThreadPoolExecutor.
+
- Issue #1207589: Add Cut/Copy/Paste items to IDLE right click Context Menu
Patch by Todd Rovito.