author     Benjamin Peterson <benjamin@python.org>    2011-07-16 02:10:44 (GMT)
committer  Benjamin Peterson <benjamin@python.org>    2011-07-16 02:10:44 (GMT)
commit     7dc35f6fea054f9e7488e3d06d4dfdaac86af310 (patch)
tree       5bd23d03a1ff5abc6b1f506391b2a55c53671c76 /Lib
parent     0bd152cd67fef5c238ed15ba2ce09f6a74e8d334 (diff)
parent     dc19c24832fac20402f6cf4d2396c299a73766bb (diff)
merge heads
Diffstat (limited to 'Lib')
-rw-r--r--  Lib/concurrent/futures/process.py    | 12
-rw-r--r--  Lib/multiprocessing/queues.py        |  9
-rw-r--r--  Lib/packaging/tests/__main__.py      | 15
-rw-r--r--  Lib/test/support.py                  | 19
-rw-r--r--  Lib/test/test_concurrent_futures.py  |  3
-rw-r--r--  Lib/test/test_os.py                  |  1
-rw-r--r--  Lib/test/test_pydoc.py               | 47
-rw-r--r--  Lib/test/test_threaded_import.py     | 11
-rw-r--r--  Lib/test/threaded_import_hangers.py  | 13
9 files changed, 82 insertions(+), 48 deletions(-)
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 41e1320..9b2e0f3 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -205,6 +205,8 @@ def _queue_management_worker(executor_reference,
nb_children_alive = sum(p.is_alive() for p in processes.values())
for i in range(0, nb_children_alive):
call_queue.put_nowait(None)
+ # Release the queue's resources as soon as possible.
+ call_queue.close()
# If .join() is not called on the created processes then
# some multiprocessing.Queue methods may deadlock on Mac OS X.
for p in processes.values():
@@ -239,14 +241,14 @@ def _queue_management_worker(executor_reference,
# locks may be in a dirty state and block forever.
for p in processes.values():
p.terminate()
- for p in processes.values():
- p.join()
+ shutdown_worker()
return
if isinstance(result_item, int):
# Clean shutdown of a worker using its PID
# (avoids marking the executor broken)
assert shutting_down()
- del processes[result_item]
+ p = processes.pop(result_item)
+ p.join()
if not processes:
shutdown_worker()
return
@@ -334,6 +336,10 @@ class ProcessPoolExecutor(_base.Executor):
# because futures in the call queue cannot be cancelled.
self._call_queue = multiprocessing.Queue(self._max_workers +
EXTRA_QUEUED_CALLS)
+ # Killed worker processes can produce spurious "broken pipe"
+ # tracebacks in the queue's own worker thread. But we detect killed
+ # processes anyway, so silence the tracebacks.
+ self._call_queue._ignore_epipe = True
self._result_queue = SimpleQueue()
self._work_ids = queue.Queue()
self._queue_management_thread = None
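For context, a minimal standalone sketch (not part of the patch) of the shutdown pattern the management thread uses above: wake each live worker with a None sentinel, close the call queue to release its feeder thread and pipe early, then join the workers to avoid the Mac OS X deadlocks mentioned in the comment. The worker and queue names here are placeholders.

import multiprocessing

def worker(call_queue):
    while True:
        item = call_queue.get()
        if item is None:          # sentinel: exit cleanly
            return

if __name__ == '__main__':
    call_queue = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=worker, args=(call_queue,))
               for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:             # one sentinel per live worker
        call_queue.put_nowait(None)
    call_queue.close()            # release the queue's resources early
    call_queue.join_thread()      # wait for the feeder thread to flush
    for w in workers:
        w.join()                  # join before relying on other Queue methods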
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 3324363..4696ccc 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -41,6 +41,7 @@ import collections
import time
import atexit
import weakref
+import errno
from queue import Empty, Full
import _multiprocessing
@@ -67,6 +68,8 @@ class Queue(object):
else:
self._wlock = Lock()
self._sem = BoundedSemaphore(maxsize)
+ # For use by concurrent.futures
+ self._ignore_epipe = False
self._after_fork()
@@ -178,7 +181,7 @@ class Queue(object):
self._thread = threading.Thread(
target=Queue._feed,
args=(self._buffer, self._notempty, self._send,
- self._wlock, self._writer.close),
+ self._wlock, self._writer.close, self._ignore_epipe),
name='QueueFeederThread'
)
self._thread.daemon = True
@@ -229,7 +232,7 @@ class Queue(object):
notempty.release()
@staticmethod
- def _feed(buffer, notempty, send, writelock, close):
+ def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
debug('starting thread to feed data to pipe')
from .util import is_exiting
@@ -271,6 +274,8 @@ class Queue(object):
except IndexError:
pass
except Exception as e:
+ if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
+ return
# Since this runs in a daemon thread the resources it uses
            # may become unusable while the process is cleaning up.
# We ignore errors which happen after the process has
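A minimal sketch (not the stdlib code) of the errno check introduced above: a send wrapper that optionally swallows a broken-pipe error when the reading end has already gone away. The patch uses getattr because the pipe write may raise either IOError or OSError on this Python version; catching OSError alone here is a simplification.

import errno

def safe_send(send, obj, ignore_epipe=False):
    """Call send(obj); optionally drop the item if the reader vanished."""
    try:
        send(obj)
    except OSError as e:
        if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
            return False          # reader closed its end; ignore silently
        raise
    return True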
diff --git a/Lib/packaging/tests/__main__.py b/Lib/packaging/tests/__main__.py
index 68ee229..0f175cf 100644
--- a/Lib/packaging/tests/__main__.py
+++ b/Lib/packaging/tests/__main__.py
@@ -5,15 +5,18 @@
import os
import sys
import unittest
-from test.support import run_unittest, reap_children
+from test.support import run_unittest, reap_children, reap_threads
+@reap_threads
def test_main():
- start_dir = os.path.dirname(__file__)
- top_dir = os.path.dirname(os.path.dirname(start_dir))
- test_loader = unittest.TestLoader()
- run_unittest(test_loader.discover(start_dir, top_level_dir=top_dir))
- reap_children()
+ try:
+ start_dir = os.path.dirname(__file__)
+ top_dir = os.path.dirname(os.path.dirname(start_dir))
+ test_loader = unittest.TestLoader()
+ run_unittest(test_loader.discover(start_dir, top_level_dir=top_dir))
+ finally:
+ reap_children()
if __name__ == '__main__':
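The same try/finally-plus-reaping pattern, shown as a hedged standalone sketch with a dummy test case (the class name is illustrative): child processes are reaped even if discovery or the run raises, and the reap_threads decorator verifies afterwards that no threads were left dangling.

import unittest
from test.support import run_unittest, reap_children, reap_threads

class ExampleTests(unittest.TestCase):
    def test_trivial(self):
        self.assertTrue(True)

@reap_threads
def test_main():
    try:
        run_unittest(ExampleTests)   # run the suite
    finally:
        reap_children()              # always wait for leftover child processes

if __name__ == '__main__':
    test_main()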
diff --git a/Lib/test/support.py b/Lib/test/support.py
index 3dc4bfb..89ace45 100644
--- a/Lib/test/support.py
+++ b/Lib/test/support.py
@@ -24,9 +24,15 @@ import sysconfig
import logging.handlers
try:
- import _thread
+ import _thread, threading
except ImportError:
_thread = None
+ threading = None
+try:
+ import multiprocessing.process
+except ImportError:
+ multiprocessing = None
+
try:
import zlib
@@ -1358,19 +1364,20 @@ def modules_cleanup(oldmodules):
def threading_setup():
if _thread:
- return _thread._count(),
+ return _thread._count(), threading._dangling.copy()
else:
- return 1,
+ return 1, ()
-def threading_cleanup(nb_threads):
+def threading_cleanup(*original_values):
if not _thread:
return
_MAX_COUNT = 10
for count in range(_MAX_COUNT):
- n = _thread._count()
- if n == nb_threads:
+ values = _thread._count(), threading._dangling
+ if values == original_values:
break
time.sleep(0.1)
+ gc_collect()
# XXX print a warning in case of failure?
def reap_threads(func):
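Roughly what the reworked helpers do, as an illustrative sketch: snapshot the low-level thread count together with a copy of threading._dangling before a test, then poll until both return to the saved values, forcing a garbage collection between attempts so dead Thread objects actually drop out of the weak set. The function names below are placeholders.

import gc
import time
import _thread
import threading

def snapshot():
    return _thread._count(), threading._dangling.copy()

def wait_until_clean(original, retries=10, delay=0.1):
    for _ in range(retries):
        if (_thread._count(), threading._dangling) == original:
            return True
        time.sleep(delay)
        gc.collect()      # dangling entries disappear only after collection
    return False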
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 222bd54..78a9906 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -634,7 +634,8 @@ def test_main():
ThreadPoolAsCompletedTests,
FutureTests,
ProcessPoolShutdownTest,
- ThreadPoolShutdownTest)
+ ThreadPoolShutdownTest,
+ )
finally:
test.support.reap_children()
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index 72f1c1b..1d5f11c 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -1506,6 +1506,7 @@ class TestSendfile(unittest.TestCase):
raise
+@support.reap_threads
def test_main():
support.run_unittest(
FileTests,
diff --git a/Lib/test/test_pydoc.py b/Lib/test/test_pydoc.py
index db118ca..7591303 100644
--- a/Lib/test/test_pydoc.py
+++ b/Lib/test/test_pydoc.py
@@ -15,9 +15,12 @@ import textwrap
from io import StringIO
from collections import namedtuple
from contextlib import contextmanager
-from test.support import TESTFN, forget, rmtree, EnvironmentVarGuard, \
- reap_children, captured_output, captured_stdout, unlink
+from test.script_helper import assert_python_ok
+from test.support import (
+ TESTFN, forget, rmtree, EnvironmentVarGuard,
+ reap_children, reap_threads, captured_output, captured_stdout, unlink
+)
from test import pydoc_mod
try:
@@ -199,17 +202,14 @@ missing_pattern = "no Python documentation found for '%s'"
# output pattern for module with bad imports
badimport_pattern = "problem in %s - ImportError: No module named %r"
-def run_pydoc(module_name, *args):
+def run_pydoc(module_name, *args, **env):
"""
Runs pydoc on the specified module. Returns the stripped
output of pydoc.
"""
- cmd = [sys.executable, pydoc.__file__, " ".join(args), module_name]
- try:
- output = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0]
- return output.strip()
- finally:
- reap_children()
+ args = args + (module_name,)
+ rc, out, err = assert_python_ok(pydoc.__file__, *args, **env)
+ return out.strip()
def get_pydoc_html(module):
"Returns pydoc generated output as html"
@@ -312,19 +312,20 @@ class PydocDocTest(unittest.TestCase):
def newdirinpath(dir):
os.mkdir(dir)
sys.path.insert(0, dir)
- yield
- sys.path.pop(0)
- rmtree(dir)
+ try:
+ yield
+ finally:
+ sys.path.pop(0)
+ rmtree(dir)
- with newdirinpath(TESTFN), EnvironmentVarGuard() as env:
- env['PYTHONPATH'] = TESTFN
+ with newdirinpath(TESTFN):
fullmodname = os.path.join(TESTFN, modname)
sourcefn = fullmodname + os.extsep + "py"
for importstring, expectedinmsg in testpairs:
with open(sourcefn, 'w') as f:
f.write("import {}\n".format(importstring))
try:
- result = run_pydoc(modname).decode("ascii")
+ result = run_pydoc(modname, PYTHONPATH=TESTFN).decode("ascii")
finally:
forget(modname)
expected = badimport_pattern % (modname, expectedinmsg)
@@ -494,13 +495,17 @@ class TestHelper(unittest.TestCase):
self.assertEqual(sorted(pydoc.Helper.keywords),
sorted(keyword.kwlist))
+@reap_threads
def test_main():
- test.support.run_unittest(PydocDocTest,
- TestDescriptions,
- PydocServerTest,
- PydocUrlHandlerTest,
- TestHelper,
- )
+ try:
+ test.support.run_unittest(PydocDocTest,
+ TestDescriptions,
+ PydocServerTest,
+ PydocUrlHandlerTest,
+ TestHelper,
+ )
+ finally:
+ reap_children()
if __name__ == "__main__":
test_main()
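A hedged usage sketch of test.script_helper.assert_python_ok, which run_pydoc now delegates to: positional arguments become the child interpreter's argv, keyword arguments (PYTHONPATH=TESTFN in the patch) are injected as environment variables, and the helper asserts a zero exit status before returning the captured streams as bytes.

from test.script_helper import assert_python_ok

rc, out, err = assert_python_ok('-c', 'print("hello")')
assert rc == 0
assert out.strip() == b'hello'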
diff --git a/Lib/test/test_threaded_import.py b/Lib/test/test_threaded_import.py
index 6919d21..789920b 100644
--- a/Lib/test/test_threaded_import.py
+++ b/Lib/test/test_threaded_import.py
@@ -11,8 +11,8 @@ import sys
import time
import shutil
import unittest
-from test.support import verbose, import_module, run_unittest, TESTFN
-thread = import_module('_thread')
+from test.support import (
+ verbose, import_module, run_unittest, TESTFN, reap_threads)
threading = import_module('threading')
def task(N, done, done_tasks, errors):
@@ -62,7 +62,7 @@ class Finder:
def __init__(self):
self.numcalls = 0
self.x = 0
- self.lock = thread.allocate_lock()
+ self.lock = threading.Lock()
def find_module(self, name, path=None):
# Simulate some thread-unsafe behaviour. If calls to find_module()
@@ -113,7 +113,9 @@ class ThreadedImportTests(unittest.TestCase):
done_tasks = []
done.clear()
for i in range(N):
- thread.start_new_thread(task, (N, done, done_tasks, errors,))
+ t = threading.Thread(target=task,
+ args=(N, done, done_tasks, errors,))
+ t.start()
done.wait(60)
self.assertFalse(errors)
if verbose:
@@ -203,6 +205,7 @@ class ThreadedImportTests(unittest.TestCase):
self.assertEqual(set(results), {'a', 'b'})
+@reap_threads
def test_main():
run_unittest(ThreadedImportTests)
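The substitution above in a standalone sketch: threading.Thread gives the test a joinable handle (and an object the dangling-thread check can track), unlike the fire-and-forget _thread.start_new_thread call it replaces. The task here is a trivial stand-in.

import threading

def task(results):
    results.append(threading.current_thread().name)

results = []
threads = [threading.Thread(target=task, args=(results,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()          # deterministic cleanup: nothing left dangling
assert len(results) == 4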
diff --git a/Lib/test/threaded_import_hangers.py b/Lib/test/threaded_import_hangers.py
index adf03e3..5484e60 100644
--- a/Lib/test/threaded_import_hangers.py
+++ b/Lib/test/threaded_import_hangers.py
@@ -35,8 +35,11 @@ for name, func, args in [
("os.path.abspath", os.path.abspath, ('.',)),
]:
- t = Worker(func, args)
- t.start()
- t.join(TIMEOUT)
- if t.is_alive():
- errors.append("%s appeared to hang" % name)
+ try:
+ t = Worker(func, args)
+ t.start()
+ t.join(TIMEOUT)
+ if t.is_alive():
+ errors.append("%s appeared to hang" % name)
+ finally:
+ del t
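Finally, a sketch of the hang-detection idiom wrapped in try/finally above: join the worker with a timeout, treat a still-alive thread as a hang, and drop the local reference afterwards so no Thread object lingers for the dangling-thread check. The Worker class from the original module is replaced here by a plain threading.Thread.

import threading

TIMEOUT = 10
errors = []

def check_does_not_hang(name, func, args):
    t = threading.Thread(target=func, args=args)
    try:
        t.start()
        t.join(TIMEOUT)
        if t.is_alive():
            errors.append("%s appeared to hang" % name)
    finally:
        del t         # mirror the patch: keep no lingering reference to the thread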