summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorAndrew Svetlov <andrew.svetlov@gmail.com>2012-11-03 13:36:01 (GMT)
committerAndrew Svetlov <andrew.svetlov@gmail.com>2012-11-03 13:36:01 (GMT)
commit6b973747f3c5e31afd89f6a7d79bc9f3f1628710 (patch)
tree10127b702286a312c859d4bf9922716f8ad402bf /Lib
parent0f77bf27ca372655104884bf1e1ecaf014e8a61a (diff)
downloadcpython-6b973747f3c5e31afd89f6a7d79bc9f3f1628710.zip
cpython-6b973747f3c5e31afd89f6a7d79bc9f3f1628710.tar.gz
cpython-6b973747f3c5e31afd89f6a7d79bc9f3f1628710.tar.bz2
Issue #16284: Prevent keeping unnecessary references to worker functions in concurrent.futures ThreadPoolExecutor.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/concurrent/futures/process.py4
-rw-r--r--Lib/concurrent/futures/thread.py2
-rw-r--r--Lib/multiprocessing/queues.py4
-rw-r--r--Lib/test/test_concurrent_futures.py22
4 files changed, 32 insertions, 0 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 94e0289..3c20b93 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -240,6 +240,8 @@ def _queue_management_worker(executor_reference,
"terminated abruptly while the future was "
"running or pending."
))
+ # Delete references to object. See issue16284
+ del work_item
pending_work_items.clear()
# Terminate remaining workers forcibly: the queues or their
# locks may be in a dirty state and block forever.
@@ -264,6 +266,8 @@ def _queue_management_worker(executor_reference,
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
+ # Delete references to object. See issue16284
+ del work_item
# Check whether we should start shutting down.
executor = executor_reference()
# No more work items can be added if:
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index 95bb682..f9beb0f 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -63,6 +63,8 @@ def _worker(executor_reference, work_queue):
work_item = work_queue.get(block=True)
if work_item is not None:
work_item.run()
+ # Delete references to object. See issue16284
+ del work_item
continue
executor = executor_reference()
# Exit if:
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 37271fb..f6f02b6 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -243,10 +243,14 @@ class Queue(object):
if wacquire is None:
send(obj)
+ # Delete references to object. See issue16284
+ del obj
else:
wacquire()
try:
send(obj)
+ # Delete references to object. See issue16284
+ del obj
finally:
wrelease()
except IndexError:
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 6ae450d..4ad3309 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -15,6 +15,7 @@ import sys
import threading
import time
import unittest
+import weakref
from concurrent import futures
from concurrent.futures._base import (
@@ -52,6 +53,11 @@ def sleep_and_print(t, msg):
sys.stdout.flush()
+class MyObject(object):
+ def my_method(self):
+ pass
+
+
class ExecutorMixin:
worker_count = 5
@@ -396,6 +402,22 @@ class ExecutorTest(unittest.TestCase):
self.executor.map(str, [2] * (self.worker_count + 1))
self.executor.shutdown()
+ @test.support.cpython_only
+ def test_no_stale_references(self):
+ # Issue #16284: check that the executors don't unnecessarily hang onto
+ # references.
+ my_object = MyObject()
+ my_object_collected = threading.Event()
+ my_object_callback = weakref.ref(
+ my_object, lambda obj: my_object_collected.set())
+ # Deliberately discarding the future.
+ self.executor.submit(my_object.my_method)
+ del my_object
+
+ collected = my_object_collected.wait(timeout=5.0)
+ self.assertTrue(collected,
+ "Stale reference not collected within timeout.")
+
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
def test_map_submits_without_iteration(self):