diff options
author | Andrew Svetlov <andrew.svetlov@gmail.com> | 2012-11-03 13:36:01 (GMT) |
---|---|---|
committer | Andrew Svetlov <andrew.svetlov@gmail.com> | 2012-11-03 13:36:01 (GMT) |
commit | 6b973747f3c5e31afd89f6a7d79bc9f3f1628710 (patch) | |
tree | 10127b702286a312c859d4bf9922716f8ad402bf /Lib | |
parent | 0f77bf27ca372655104884bf1e1ecaf014e8a61a (diff) | |
download | cpython-6b973747f3c5e31afd89f6a7d79bc9f3f1628710.zip cpython-6b973747f3c5e31afd89f6a7d79bc9f3f1628710.tar.gz cpython-6b973747f3c5e31afd89f6a7d79bc9f3f1628710.tar.bz2 |
Issue #16284: Prevent keeping unnecessary references to worker functions in concurrent.futures ThreadPoolExecutor.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/concurrent/futures/process.py | 4 | ||||
-rw-r--r-- | Lib/concurrent/futures/thread.py | 2 | ||||
-rw-r--r-- | Lib/multiprocessing/queues.py | 4 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures.py | 22 |
4 files changed, 32 insertions, 0 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 94e0289..3c20b93 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -240,6 +240,8 @@ def _queue_management_worker(executor_reference, "terminated abruptly while the future was " "running or pending." )) + # Delete references to object. See issue16284 + del work_item pending_work_items.clear() # Terminate remaining workers forcibly: the queues or their # locks may be in a dirty state and block forever. @@ -264,6 +266,8 @@ def _queue_management_worker(executor_reference, work_item.future.set_exception(result_item.exception) else: work_item.future.set_result(result_item.result) + # Delete references to object. See issue16284 + del work_item # Check whether we should start shutting down. executor = executor_reference() # No more work items can be added if: diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 95bb682..f9beb0f 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -63,6 +63,8 @@ def _worker(executor_reference, work_queue): work_item = work_queue.get(block=True) if work_item is not None: work_item.run() + # Delete references to object. See issue16284 + del work_item continue executor = executor_reference() # Exit if: diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 37271fb..f6f02b6 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -243,10 +243,14 @@ class Queue(object): if wacquire is None: send(obj) + # Delete references to object. See issue16284 + del obj else: wacquire() try: send(obj) + # Delete references to object. See issue16284 + del obj finally: wrelease() except IndexError: diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 6ae450d..4ad3309 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -15,6 +15,7 @@ import sys import threading import time import unittest +import weakref from concurrent import futures from concurrent.futures._base import ( @@ -52,6 +53,11 @@ def sleep_and_print(t, msg): sys.stdout.flush() +class MyObject(object): + def my_method(self): + pass + + class ExecutorMixin: worker_count = 5 @@ -396,6 +402,22 @@ class ExecutorTest(unittest.TestCase): self.executor.map(str, [2] * (self.worker_count + 1)) self.executor.shutdown() + @test.support.cpython_only + def test_no_stale_references(self): + # Issue #16284: check that the executors don't unnecessarily hang onto + # references. + my_object = MyObject() + my_object_collected = threading.Event() + my_object_callback = weakref.ref( + my_object, lambda obj: my_object_collected.set()) + # Deliberately discarding the future. + self.executor.submit(my_object.my_method) + del my_object + + collected = my_object_collected.wait(timeout=5.0) + self.assertTrue(collected, + "Stale reference not collected within timeout.") + class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): def test_map_submits_without_iteration(self): |