diff options
author | Brian Quinlan <brian@sweetapp.com> | 2011-03-20 02:11:11 (GMT) |
---|---|---|
committer | Brian Quinlan <brian@sweetapp.com> | 2011-03-20 02:11:11 (GMT) |
commit | 142fad4b6b1f33bb4620e7aec29ea1dfd8cc8fdb (patch) | |
tree | 3eac96dde6c10f9bdfefaf0a14fe38ca9e413492 | |
parent | 833d91204fe864fea881a2cdc292e792095f2d52 (diff) | |
download | cpython-142fad4b6b1f33bb4620e7aec29ea1dfd8cc8fdb.zip cpython-142fad4b6b1f33bb4620e7aec29ea1dfd8cc8fdb.tar.gz cpython-142fad4b6b1f33bb4620e7aec29ea1dfd8cc8fdb.tar.bz2 |
Use WeakSets rather than manual pruning to prevent unbounded growth of dead thread references.
-rw-r--r-- | Lib/concurrent/futures/process.py | 23 | ||||
-rw-r--r-- | Lib/concurrent/futures/thread.py | 25 |
2 files changed, 8 insertions, 40 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 79c60c3..44f8504 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -66,28 +66,14 @@ import weakref # workers to exit when their work queues are empty and then waits until the # threads/processes finish. -_thread_references = set() +_live_threads = weakref.WeakSet() _shutdown = False def _python_exit(): global _shutdown _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - >>> ... t = ThreadPoolExecutor(max_workers=5) - >>> ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) + for thread in _live_threads: + thread.join() # Controls how many more calls than processes will be queued in the call queue. # A smaller number will mean that processes spend more time idle waiting for @@ -279,7 +265,6 @@ class ProcessPoolExecutor(_base.Executor): worker processes will be created as the machine has processors. """ _check_system_limits() - _remove_dead_thread_references() if max_workers is None: self._max_workers = multiprocessing.cpu_count() @@ -316,7 +301,7 @@ class ProcessPoolExecutor(_base.Executor): self._shutdown_process_event)) self._queue_management_thread.daemon = True self._queue_management_thread.start() - _thread_references.add(weakref.ref(self._queue_management_thread)) + _live_threads.add(self._queue_management_thread) def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 15736da..299b94a 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -25,29 +25,14 @@ import weakref # workers to exit when their work queues are empty and then waits until the # threads finish. -_thread_references = set() +_live_threads = weakref.WeakSet() _shutdown = False def _python_exit(): global _shutdown _shutdown = True - for thread_reference in _thread_references: - thread = thread_reference() - if thread is not None: - thread.join() - -def _remove_dead_thread_references(): - """Remove inactive threads from _thread_references. - - Should be called periodically to prevent memory leaks in scenarios such as: - >>> while True: - ... t = ThreadPoolExecutor(max_workers=5) - ... t.map(int, ['1', '2', '3', '4', '5']) - """ - for thread_reference in set(_thread_references): - if thread_reference() is None: - _thread_references.discard(thread_reference) - + for thread in _live_threads: + thread.join() atexit.register(_python_exit) class _WorkItem(object): @@ -95,8 +80,6 @@ class ThreadPoolExecutor(_base.Executor): max_workers: The maximum number of threads that can be used to execute the given calls. """ - _remove_dead_thread_references() - self._max_workers = max_workers self._work_queue = queue.Queue() self._threads = set() @@ -125,7 +108,7 @@ class ThreadPoolExecutor(_base.Executor): t.daemon = True t.start() self._threads.add(t) - _thread_references.add(weakref.ref(t)) + _live_threads.add(t) def shutdown(self, wait=True): with self._shutdown_lock: |