diff options
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/concurrent/futures/process.py | 12 | ||||
-rw-r--r-- | Lib/multiprocessing/queues.py | 9 | ||||
-rw-r--r-- | Lib/packaging/tests/__main__.py | 15 | ||||
-rw-r--r-- | Lib/test/support.py | 19 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures.py | 3 | ||||
-rw-r--r-- | Lib/test/test_os.py | 1 | ||||
-rw-r--r-- | Lib/test/test_pydoc.py | 47 | ||||
-rw-r--r-- | Lib/test/test_threaded_import.py | 11 | ||||
-rw-r--r-- | Lib/test/threaded_import_hangers.py | 13 |
9 files changed, 82 insertions, 48 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 41e1320..9b2e0f3 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -205,6 +205,8 @@ def _queue_management_worker(executor_reference, nb_children_alive = sum(p.is_alive() for p in processes.values()) for i in range(0, nb_children_alive): call_queue.put_nowait(None) + # Release the queue's resources as soon as possible. + call_queue.close() # If .join() is not called on the created processes then # some multiprocessing.Queue methods may deadlock on Mac OS X. for p in processes.values(): @@ -239,14 +241,14 @@ def _queue_management_worker(executor_reference, # locks may be in a dirty state and block forever. for p in processes.values(): p.terminate() - for p in processes.values(): - p.join() + shutdown_worker() return if isinstance(result_item, int): # Clean shutdown of a worker using its PID # (avoids marking the executor broken) assert shutting_down() - del processes[result_item] + p = processes.pop(result_item) + p.join() if not processes: shutdown_worker() return @@ -334,6 +336,10 @@ class ProcessPoolExecutor(_base.Executor): # because futures in the call queue cannot be cancelled. self._call_queue = multiprocessing.Queue(self._max_workers + EXTRA_QUEUED_CALLS) + # Killed worker processes can produce spurious "broken pipe" + # tracebacks in the queue's own worker thread. But we detect killed + # processes anyway, so silence the tracebacks. + self._call_queue._ignore_epipe = True self._result_queue = SimpleQueue() self._work_ids = queue.Queue() self._queue_management_thread = None diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 3324363..4696ccc 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -41,6 +41,7 @@ import collections import time import atexit import weakref +import errno from queue import Empty, Full import _multiprocessing @@ -67,6 +68,8 @@ class Queue(object): else: self._wlock = Lock() self._sem = BoundedSemaphore(maxsize) + # For use by concurrent.futures + self._ignore_epipe = False self._after_fork() @@ -178,7 +181,7 @@ class Queue(object): self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send, - self._wlock, self._writer.close), + self._wlock, self._writer.close, self._ignore_epipe), name='QueueFeederThread' ) self._thread.daemon = True @@ -229,7 +232,7 @@ class Queue(object): notempty.release() @staticmethod - def _feed(buffer, notempty, send, writelock, close): + def _feed(buffer, notempty, send, writelock, close, ignore_epipe): debug('starting thread to feed data to pipe') from .util import is_exiting @@ -271,6 +274,8 @@ class Queue(object): except IndexError: pass except Exception as e: + if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE: + return # Since this runs in a daemon thread the resources it uses # may be become unusable while the process is cleaning up. # We ignore errors which happen after the process has diff --git a/Lib/packaging/tests/__main__.py b/Lib/packaging/tests/__main__.py index 68ee229..0f175cf 100644 --- a/Lib/packaging/tests/__main__.py +++ b/Lib/packaging/tests/__main__.py @@ -5,15 +5,18 @@ import os import sys import unittest -from test.support import run_unittest, reap_children +from test.support import run_unittest, reap_children, reap_threads +@reap_threads def test_main(): - start_dir = os.path.dirname(__file__) - top_dir = os.path.dirname(os.path.dirname(start_dir)) - test_loader = unittest.TestLoader() - run_unittest(test_loader.discover(start_dir, top_level_dir=top_dir)) - reap_children() + try: + start_dir = os.path.dirname(__file__) + top_dir = os.path.dirname(os.path.dirname(start_dir)) + test_loader = unittest.TestLoader() + run_unittest(test_loader.discover(start_dir, top_level_dir=top_dir)) + finally: + reap_children() if __name__ == '__main__': diff --git a/Lib/test/support.py b/Lib/test/support.py index 3dc4bfb..89ace45 100644 --- a/Lib/test/support.py +++ b/Lib/test/support.py @@ -24,9 +24,15 @@ import sysconfig import logging.handlers try: - import _thread + import _thread, threading except ImportError: _thread = None + threading = None +try: + import multiprocessing.process +except ImportError: + multiprocessing = None + try: import zlib @@ -1358,19 +1364,20 @@ def modules_cleanup(oldmodules): def threading_setup(): if _thread: - return _thread._count(), + return _thread._count(), threading._dangling.copy() else: - return 1, + return 1, () -def threading_cleanup(nb_threads): +def threading_cleanup(*original_values): if not _thread: return _MAX_COUNT = 10 for count in range(_MAX_COUNT): - n = _thread._count() - if n == nb_threads: + values = _thread._count(), threading._dangling + if values == original_values: break time.sleep(0.1) + gc_collect() # XXX print a warning in case of failure? def reap_threads(func): diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 222bd54..78a9906 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -634,7 +634,8 @@ def test_main(): ThreadPoolAsCompletedTests, FutureTests, ProcessPoolShutdownTest, - ThreadPoolShutdownTest) + ThreadPoolShutdownTest, + ) finally: test.support.reap_children() diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 72f1c1b..1d5f11c 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -1506,6 +1506,7 @@ class TestSendfile(unittest.TestCase): raise +@support.reap_threads def test_main(): support.run_unittest( FileTests, diff --git a/Lib/test/test_pydoc.py b/Lib/test/test_pydoc.py index db118ca..7591303 100644 --- a/Lib/test/test_pydoc.py +++ b/Lib/test/test_pydoc.py @@ -15,9 +15,12 @@ import textwrap from io import StringIO from collections import namedtuple from contextlib import contextmanager -from test.support import TESTFN, forget, rmtree, EnvironmentVarGuard, \ - reap_children, captured_output, captured_stdout, unlink +from test.script_helper import assert_python_ok +from test.support import ( + TESTFN, forget, rmtree, EnvironmentVarGuard, + reap_children, reap_threads, captured_output, captured_stdout, unlink +) from test import pydoc_mod try: @@ -199,17 +202,14 @@ missing_pattern = "no Python documentation found for '%s'" # output pattern for module with bad imports badimport_pattern = "problem in %s - ImportError: No module named %r" -def run_pydoc(module_name, *args): +def run_pydoc(module_name, *args, **env): """ Runs pydoc on the specified module. Returns the stripped output of pydoc. """ - cmd = [sys.executable, pydoc.__file__, " ".join(args), module_name] - try: - output = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0] - return output.strip() - finally: - reap_children() + args = args + (module_name,) + rc, out, err = assert_python_ok(pydoc.__file__, *args, **env) + return out.strip() def get_pydoc_html(module): "Returns pydoc generated output as html" @@ -312,19 +312,20 @@ class PydocDocTest(unittest.TestCase): def newdirinpath(dir): os.mkdir(dir) sys.path.insert(0, dir) - yield - sys.path.pop(0) - rmtree(dir) + try: + yield + finally: + sys.path.pop(0) + rmtree(dir) - with newdirinpath(TESTFN), EnvironmentVarGuard() as env: - env['PYTHONPATH'] = TESTFN + with newdirinpath(TESTFN): fullmodname = os.path.join(TESTFN, modname) sourcefn = fullmodname + os.extsep + "py" for importstring, expectedinmsg in testpairs: with open(sourcefn, 'w') as f: f.write("import {}\n".format(importstring)) try: - result = run_pydoc(modname).decode("ascii") + result = run_pydoc(modname, PYTHONPATH=TESTFN).decode("ascii") finally: forget(modname) expected = badimport_pattern % (modname, expectedinmsg) @@ -494,13 +495,17 @@ class TestHelper(unittest.TestCase): self.assertEqual(sorted(pydoc.Helper.keywords), sorted(keyword.kwlist)) +@reap_threads def test_main(): - test.support.run_unittest(PydocDocTest, - TestDescriptions, - PydocServerTest, - PydocUrlHandlerTest, - TestHelper, - ) + try: + test.support.run_unittest(PydocDocTest, + TestDescriptions, + PydocServerTest, + PydocUrlHandlerTest, + TestHelper, + ) + finally: + reap_children() if __name__ == "__main__": test_main() diff --git a/Lib/test/test_threaded_import.py b/Lib/test/test_threaded_import.py index 6919d21..789920b 100644 --- a/Lib/test/test_threaded_import.py +++ b/Lib/test/test_threaded_import.py @@ -11,8 +11,8 @@ import sys import time import shutil import unittest -from test.support import verbose, import_module, run_unittest, TESTFN -thread = import_module('_thread') +from test.support import ( + verbose, import_module, run_unittest, TESTFN, reap_threads) threading = import_module('threading') def task(N, done, done_tasks, errors): @@ -62,7 +62,7 @@ class Finder: def __init__(self): self.numcalls = 0 self.x = 0 - self.lock = thread.allocate_lock() + self.lock = threading.Lock() def find_module(self, name, path=None): # Simulate some thread-unsafe behaviour. If calls to find_module() @@ -113,7 +113,9 @@ class ThreadedImportTests(unittest.TestCase): done_tasks = [] done.clear() for i in range(N): - thread.start_new_thread(task, (N, done, done_tasks, errors,)) + t = threading.Thread(target=task, + args=(N, done, done_tasks, errors,)) + t.start() done.wait(60) self.assertFalse(errors) if verbose: @@ -203,6 +205,7 @@ class ThreadedImportTests(unittest.TestCase): self.assertEqual(set(results), {'a', 'b'}) +@reap_threads def test_main(): run_unittest(ThreadedImportTests) diff --git a/Lib/test/threaded_import_hangers.py b/Lib/test/threaded_import_hangers.py index adf03e3..5484e60 100644 --- a/Lib/test/threaded_import_hangers.py +++ b/Lib/test/threaded_import_hangers.py @@ -35,8 +35,11 @@ for name, func, args in [ ("os.path.abspath", os.path.abspath, ('.',)), ]: - t = Worker(func, args) - t.start() - t.join(TIMEOUT) - if t.is_alive(): - errors.append("%s appeared to hang" % name) + try: + t = Worker(func, args) + t.start() + t.join(TIMEOUT) + if t.is_alive(): + errors.append("%s appeared to hang" % name) + finally: + del t |