From 707f228b1eb8d2829797524e0aa93922a238ff66 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 15 Jul 2011 22:29:44 +0200 Subject: Try harder to reap dangling threads in test.support.reap_threads(). --- Lib/test/support.py | 19 +++++++++++++------ Misc/NEWS | 2 ++ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/Lib/test/support.py b/Lib/test/support.py index e2d7ae0..2827e8b 100644 --- a/Lib/test/support.py +++ b/Lib/test/support.py @@ -24,9 +24,15 @@ import sysconfig import logging.handlers try: - import _thread + import _thread, threading except ImportError: _thread = None + threading = None +try: + import multiprocessing.process +except ImportError: + multiprocessing = None + __all__ = [ "Error", "TestFailed", "ResourceDenied", "import_module", @@ -1275,19 +1281,20 @@ def modules_cleanup(oldmodules): def threading_setup(): if _thread: - return _thread._count(), + return _thread._count(), threading._dangling.copy() else: - return 1, + return 1, () -def threading_cleanup(nb_threads): +def threading_cleanup(*original_values): if not _thread: return _MAX_COUNT = 10 for count in range(_MAX_COUNT): - n = _thread._count() - if n == nb_threads: + values = _thread._count(), threading._dangling + if values == original_values: break time.sleep(0.1) + gc_collect() # XXX print a warning in case of failure? def reap_threads(func): diff --git a/Misc/NEWS b/Misc/NEWS index 00bce49..02b59dc 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -67,6 +67,8 @@ C-API Tests ----- +- Try harder to reap dangling threads in test.support.reap_threads(). + - Issue #12573: Add resource checks for dangling Thread and Process objects. - Issue #12549: Correct test_platform to not fail when OS X returns 'x86_64' -- cgit v0.12 From a6e81a23b36e61f4803cca059d55943f52b17b7f Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 15 Jul 2011 22:32:25 +0200 Subject: test_pydoc needs to cleanup after itself --- Lib/test/test_pydoc.py | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/Lib/test/test_pydoc.py b/Lib/test/test_pydoc.py index a8f9fbfc..aaa6912 100644 --- a/Lib/test/test_pydoc.py +++ b/Lib/test/test_pydoc.py @@ -15,8 +15,10 @@ import textwrap from io import StringIO from collections import namedtuple from contextlib import contextmanager -from test.support import TESTFN, forget, rmtree, EnvironmentVarGuard, \ - reap_children, captured_output, captured_stdout, unlink +from test.support import ( + TESTFN, forget, rmtree, EnvironmentVarGuard, + reap_children, reap_threads, captured_output, captured_stdout, unlink +) from test import pydoc_mod @@ -205,11 +207,8 @@ def run_pydoc(module_name, *args): output of pydoc. """ cmd = [sys.executable, pydoc.__file__, " ".join(args), module_name] - try: - output = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0] - return output.strip() - finally: - reap_children() + output = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0] + return output.strip() def get_pydoc_html(module): "Returns pydoc generated output as html" @@ -488,13 +487,17 @@ class TestHelper(unittest.TestCase): self.assertEqual(sorted(pydoc.Helper.keywords), sorted(keyword.kwlist)) +@reap_threads def test_main(): - test.support.run_unittest(PydocDocTest, - TestDescriptions, - PydocServerTest, - PydocUrlHandlerTest, - TestHelper, - ) + try: + test.support.run_unittest(PydocDocTest, + TestDescriptions, + PydocServerTest, + PydocUrlHandlerTest, + TestHelper, + ) + finally: + reap_children() if __name__ == "__main__": test_main() -- cgit v0.12 From f7f54759b5f81cc011e987746ed3edd7fcc96d21 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 15 Jul 2011 22:42:12 +0200 Subject: Use test.script_helper in test_pydoc --- Lib/test/test_pydoc.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/Lib/test/test_pydoc.py b/Lib/test/test_pydoc.py index aaa6912..2a21a7e 100644 --- a/Lib/test/test_pydoc.py +++ b/Lib/test/test_pydoc.py @@ -15,11 +15,12 @@ import textwrap from io import StringIO from collections import namedtuple from contextlib import contextmanager + +from test.script_helper import assert_python_ok from test.support import ( TESTFN, forget, rmtree, EnvironmentVarGuard, reap_children, reap_threads, captured_output, captured_stdout, unlink ) - from test import pydoc_mod try: @@ -201,14 +202,14 @@ missing_pattern = "no Python documentation found for '%s'" # output pattern for module with bad imports badimport_pattern = "problem in %s - ImportError: No module named %s" -def run_pydoc(module_name, *args): +def run_pydoc(module_name, *args, **env): """ Runs pydoc on the specified module. Returns the stripped output of pydoc. """ - cmd = [sys.executable, pydoc.__file__, " ".join(args), module_name] - output = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0] - return output.strip() + args = args + (module_name,) + rc, out, err = assert_python_ok(pydoc.__file__, *args, **env) + return out.strip() def get_pydoc_html(module): "Returns pydoc generated output as html" @@ -307,19 +308,20 @@ class PydocDocTest(unittest.TestCase): def newdirinpath(dir): os.mkdir(dir) sys.path.insert(0, dir) - yield - sys.path.pop(0) - rmtree(dir) + try: + yield + finally: + sys.path.pop(0) + rmtree(dir) - with newdirinpath(TESTFN), EnvironmentVarGuard() as env: - env['PYTHONPATH'] = TESTFN + with newdirinpath(TESTFN): fullmodname = os.path.join(TESTFN, modname) sourcefn = fullmodname + os.extsep + "py" for importstring, expectedinmsg in testpairs: with open(sourcefn, 'w') as f: f.write("import {}\n".format(importstring)) try: - result = run_pydoc(modname).decode("ascii") + result = run_pydoc(modname, PYTHONPATH=TESTFN).decode("ascii") finally: forget(modname) expected = badimport_pattern % (modname, expectedinmsg) -- cgit v0.12 From f26ad7149f91db1baf2dff38c9ad7a4f6f035036 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 15 Jul 2011 23:00:56 +0200 Subject: test_os needs to reap threads --- Lib/test/test_os.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 72f1c1b..1d5f11c 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -1506,6 +1506,7 @@ class TestSendfile(unittest.TestCase): raise +@support.reap_threads def test_main(): support.run_unittest( FileTests, -- cgit v0.12 From 075050f84f15fb0681254dc3a0b7a53e9fe6668f Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 15 Jul 2011 23:09:13 +0200 Subject: test_threaded_import must clean up after itself --- Lib/test/test_threaded_import.py | 11 +++++++---- Lib/test/threaded_import_hangers.py | 14 +++++++++----- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/Lib/test/test_threaded_import.py b/Lib/test/test_threaded_import.py index 6919d21..789920b 100644 --- a/Lib/test/test_threaded_import.py +++ b/Lib/test/test_threaded_import.py @@ -11,8 +11,8 @@ import sys import time import shutil import unittest -from test.support import verbose, import_module, run_unittest, TESTFN -thread = import_module('_thread') +from test.support import ( + verbose, import_module, run_unittest, TESTFN, reap_threads) threading = import_module('threading') def task(N, done, done_tasks, errors): @@ -62,7 +62,7 @@ class Finder: def __init__(self): self.numcalls = 0 self.x = 0 - self.lock = thread.allocate_lock() + self.lock = threading.Lock() def find_module(self, name, path=None): # Simulate some thread-unsafe behaviour. If calls to find_module() @@ -113,7 +113,9 @@ class ThreadedImportTests(unittest.TestCase): done_tasks = [] done.clear() for i in range(N): - thread.start_new_thread(task, (N, done, done_tasks, errors,)) + t = threading.Thread(target=task, + args=(N, done, done_tasks, errors,)) + t.start() done.wait(60) self.assertFalse(errors) if verbose: @@ -203,6 +205,7 @@ class ThreadedImportTests(unittest.TestCase): self.assertEqual(set(results), {'a', 'b'}) +@reap_threads def test_main(): run_unittest(ThreadedImportTests) diff --git a/Lib/test/threaded_import_hangers.py b/Lib/test/threaded_import_hangers.py index adf03e3..d7cc255 100644 --- a/Lib/test/threaded_import_hangers.py +++ b/Lib/test/threaded_import_hangers.py @@ -35,8 +35,12 @@ for name, func, args in [ ("os.path.abspath", os.path.abspath, ('.',)), ]: - t = Worker(func, args) - t.start() - t.join(TIMEOUT) - if t.is_alive(): - errors.append("%s appeared to hang" % name) + try: + t = Worker(func, args) + t.start() + t.join(TIMEOUT) + if t.is_alive(): + errors.append("%s appeared to hang" % name) + finally: + del t + -- cgit v0.12 From 9e42a4dbaa53250965039932a44eb2771e737648 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 15 Jul 2011 23:09:58 +0200 Subject: Fix whitespace --- Lib/test/threaded_import_hangers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/Lib/test/threaded_import_hangers.py b/Lib/test/threaded_import_hangers.py index d7cc255..5484e60 100644 --- a/Lib/test/threaded_import_hangers.py +++ b/Lib/test/threaded_import_hangers.py @@ -43,4 +43,3 @@ for name, func, args in [ errors.append("%s appeared to hang" % name) finally: del t - -- cgit v0.12 From db535957cd4b44c6176e49dc2d28f034ece5262c Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 15 Jul 2011 23:26:19 +0200 Subject: test_packaging should clean up after itself (but it doesn't really) --- Lib/packaging/tests/__main__.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/Lib/packaging/tests/__main__.py b/Lib/packaging/tests/__main__.py index 68ee229..0f175cf 100644 --- a/Lib/packaging/tests/__main__.py +++ b/Lib/packaging/tests/__main__.py @@ -5,15 +5,18 @@ import os import sys import unittest -from test.support import run_unittest, reap_children +from test.support import run_unittest, reap_children, reap_threads +@reap_threads def test_main(): - start_dir = os.path.dirname(__file__) - top_dir = os.path.dirname(os.path.dirname(start_dir)) - test_loader = unittest.TestLoader() - run_unittest(test_loader.discover(start_dir, top_level_dir=top_dir)) - reap_children() + try: + start_dir = os.path.dirname(__file__) + top_dir = os.path.dirname(os.path.dirname(start_dir)) + test_loader = unittest.TestLoader() + run_unittest(test_loader.discover(start_dir, top_level_dir=top_dir)) + finally: + reap_children() if __name__ == '__main__': -- cgit v0.12 From d06a065a441896477f8dc4f5543654f6ba20bb51 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 16 Jul 2011 01:13:34 +0200 Subject: Fix potential resource leaks in concurrent.futures.ProcessPoolExecutor by joining all queues and processes when shutdown() is called. --- Lib/concurrent/futures/process.py | 5 ++++- Lib/test/test_concurrent_futures.py | 3 ++- Misc/NEWS | 3 +++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 41e1320..7c22a62 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -209,6 +209,8 @@ def _queue_management_worker(executor_reference, # some multiprocessing.Queue methods may deadlock on Mac OS X. for p in processes.values(): p.join() + # Release resources held by the queue + call_queue.close() while True: _add_call_item_to_queue(pending_work_items, @@ -246,7 +248,8 @@ def _queue_management_worker(executor_reference, # Clean shutdown of a worker using its PID # (avoids marking the executor broken) assert shutting_down() - del processes[result_item] + p = processes.pop(result_item) + p.join() if not processes: shutdown_worker() return diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 222bd54..78a9906 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -634,7 +634,8 @@ def test_main(): ThreadPoolAsCompletedTests, FutureTests, ProcessPoolShutdownTest, - ThreadPoolShutdownTest) + ThreadPoolShutdownTest, + ) finally: test.support.reap_children() diff --git a/Misc/NEWS b/Misc/NEWS index e75de01..8d7fb16 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -228,6 +228,9 @@ Core and Builtins Library ------- +- Fix potential resource leaks in concurrent.futures.ProcessPoolExecutor + by joining all queues and processes when shutdown() is called. + - Issue #11603: Fix a crash when __str__ is rebound as __repr__. Patch by Andreas Stührk. -- cgit v0.12 From dc19c24832fac20402f6cf4d2396c299a73766bb Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 16 Jul 2011 01:51:58 +0200 Subject: Silence spurious "broken pipe" tracebacks when shutting down a ProcessPoolExecutor. --- Lib/concurrent/futures/process.py | 11 +++++++---- Lib/multiprocessing/queues.py | 9 +++++++-- Misc/NEWS | 3 +++ 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 7c22a62..9b2e0f3 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -205,12 +205,12 @@ def _queue_management_worker(executor_reference, nb_children_alive = sum(p.is_alive() for p in processes.values()) for i in range(0, nb_children_alive): call_queue.put_nowait(None) + # Release the queue's resources as soon as possible. + call_queue.close() # If .join() is not called on the created processes then # some multiprocessing.Queue methods may deadlock on Mac OS X. for p in processes.values(): p.join() - # Release resources held by the queue - call_queue.close() while True: _add_call_item_to_queue(pending_work_items, @@ -241,8 +241,7 @@ def _queue_management_worker(executor_reference, # locks may be in a dirty state and block forever. for p in processes.values(): p.terminate() - for p in processes.values(): - p.join() + shutdown_worker() return if isinstance(result_item, int): # Clean shutdown of a worker using its PID @@ -337,6 +336,10 @@ class ProcessPoolExecutor(_base.Executor): # because futures in the call queue cannot be cancelled. self._call_queue = multiprocessing.Queue(self._max_workers + EXTRA_QUEUED_CALLS) + # Killed worker processes can produce spurious "broken pipe" + # tracebacks in the queue's own worker thread. But we detect killed + # processes anyway, so silence the tracebacks. + self._call_queue._ignore_epipe = True self._result_queue = SimpleQueue() self._work_ids = queue.Queue() self._queue_management_thread = None diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 3324363..4696ccc 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -41,6 +41,7 @@ import collections import time import atexit import weakref +import errno from queue import Empty, Full import _multiprocessing @@ -67,6 +68,8 @@ class Queue(object): else: self._wlock = Lock() self._sem = BoundedSemaphore(maxsize) + # For use by concurrent.futures + self._ignore_epipe = False self._after_fork() @@ -178,7 +181,7 @@ class Queue(object): self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send, - self._wlock, self._writer.close), + self._wlock, self._writer.close, self._ignore_epipe), name='QueueFeederThread' ) self._thread.daemon = True @@ -229,7 +232,7 @@ class Queue(object): notempty.release() @staticmethod - def _feed(buffer, notempty, send, writelock, close): + def _feed(buffer, notempty, send, writelock, close, ignore_epipe): debug('starting thread to feed data to pipe') from .util import is_exiting @@ -271,6 +274,8 @@ class Queue(object): except IndexError: pass except Exception as e: + if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE: + return # Since this runs in a daemon thread the resources it uses # may be become unusable while the process is cleaning up. # We ignore errors which happen after the process has diff --git a/Misc/NEWS b/Misc/NEWS index 8d7fb16..53465b4 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -228,6 +228,9 @@ Core and Builtins Library ------- +- Silence spurious "broken pipe" tracebacks when shutting down a + ProcessPoolExecutor. + - Fix potential resource leaks in concurrent.futures.ProcessPoolExecutor by joining all queues and processes when shutdown() is called. -- cgit v0.12