summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Victor Stinner <vstinner@redhat.com> 2019-10-01 10:29:36 (GMT)
committer GitHub <noreply@github.com> 2019-10-01 10:29:36 (GMT)
commit 982bfa4da07b2e5749a0f4e68f99e972bcc3a549 (patch)
tree 9581db8d39159df09c590df363341da654cc965e
parent 8462a4936b3a551dc546a6adea04a70b0a07ca67 (diff)
download cpython-982bfa4da07b2e5749a0f4e68f99e972bcc3a549.zip
cpython-982bfa4da07b2e5749a0f4e68f99e972bcc3a549.tar.gz
cpython-982bfa4da07b2e5749a0f4e68f99e972bcc3a549.tar.bz2
bpo-36670: Multiple regrtest bugfixes (GH-16511)
* Windows: Fix counter name in WindowsLoadTracker. Counter names are localized: use the registry to get the counter name. Original change written by Lorenz Mende.
* Regrtest.main() now ensures that the Windows load tracker is also killed if an exception is raised.
* TestWorkerProcess now ensures that worker processes are no longer running before exiting: kill also worker processes when an exception is raised.
* Enhance regrtest messages and warnings: include test name, duration, add a worker identifier, etc.
* Rename MultiprocessThread to TestWorkerProcess and MultiprocessRunner to MultiprocessTestRunner.
* Use print_warning() to display warnings.

Co-Authored-By: Lorenz Mende <Lorenz.mende@gmail.com>
-rw-r--r-- Lib/test/libregrtest/main.py 19
-rw-r--r-- Lib/test/libregrtest/runtest_mp.py 222
-rw-r--r-- Lib/test/libregrtest/win_utils.py 67
3 files changed, 175 insertions, 133 deletions
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index 2997006..fd701c4 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -508,10 +508,6 @@ class Regrtest:
self.run_tests_sequential()
def finalize(self):
- if self.win_load_tracker is not None:
- self.win_load_tracker.close()
- self.win_load_tracker = None
-
if self.next_single_filename:
if self.next_single_test:
with open(self.next_single_filename, 'w') as fp:
@@ -680,11 +676,16 @@ class Regrtest:
# typeperf.exe for x64, x86 or ARM
print(f'Failed to create WindowsLoadTracker: {error}')
- self.run_tests()
- self.display_result()
-
- if self.ns.verbose2 and self.bad:
- self.rerun_failed_tests()
+ try:
+ self.run_tests()
+ self.display_result()
+
+ if self.ns.verbose2 and self.bad:
+ self.rerun_failed_tests()
+ finally:
+ if self.win_load_tracker is not None:
+ self.win_load_tracker.close()
+ self.win_load_tracker = None
self.finalize()
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
index 9cb5be6..38b0578 100644
--- a/Lib/test/libregrtest/runtest_mp.py
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -15,7 +15,7 @@ from test.libregrtest.runtest import (
runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
format_test_result, TestResult, is_failed, TIMEOUT)
from test.libregrtest.setup import setup_tests
-from test.libregrtest.utils import format_duration
+from test.libregrtest.utils import format_duration, print_warning
# Display the running tests if nothing happened last N seconds
@@ -103,9 +103,10 @@ class ExitThread(Exception):
pass
-class MultiprocessThread(threading.Thread):
- def __init__(self, pending, output, ns, timeout):
+class TestWorkerProcess(threading.Thread):
+ def __init__(self, worker_id, pending, output, ns, timeout):
super().__init__()
+ self.worker_id = worker_id
self.pending = pending
self.output = output
self.ns = ns
@@ -114,12 +115,16 @@ class MultiprocessThread(threading.Thread):
self.start_time = None
self._popen = None
self._killed = False
+ self._stopped = False
def __repr__(self):
- info = ['MultiprocessThread']
- test = self.current_test_name
+ info = [f'TestWorkerProcess #{self.worker_id}']
if self.is_alive():
- info.append('alive')
+ dt = time.monotonic() - self.start_time
+ info.append("running for %s" % format_duration(dt))
+ else:
+ info.append('stopped')
+ test = self.current_test_name
if test:
info.append(f'test={test}')
popen = self._popen
@@ -128,53 +133,24 @@ class MultiprocessThread(threading.Thread):
return '<%s>' % ' '.join(info)
def _kill(self):
- dt = time.monotonic() - self.start_time
+ if self._killed:
+ return
+ self._killed = True
popen = self._popen
- pid = popen.pid
- print("Kill worker process %s running for %.1f sec" % (pid, dt),
- file=sys.stderr, flush=True)
+ if popen is None:
+ return
+ print(f"Kill {self}", file=sys.stderr, flush=True)
try:
popen.kill()
- return True
except OSError as exc:
- print("WARNING: Failed to kill worker process %s: %r" % (pid, exc),
- file=sys.stderr, flush=True)
- return False
-
- def _close_wait(self):
- popen = self._popen
-
- # stdout and stderr must be closed to ensure that communicate()
- # does not hang
- popen.stdout.close()
- popen.stderr.close()
-
- try:
- popen.wait(JOIN_TIMEOUT)
- except (subprocess.TimeoutExpired, OSError) as exc:
- print("WARNING: Failed to wait for worker process %s "
- "completion (timeout=%.1f sec): %r"
- % (popen.pid, JOIN_TIMEOUT, exc),
- file=sys.stderr, flush=True)
-
- def kill(self):
- """
- Kill the current process (if any).
-
- This method can be called by the thread running the process,
- or by another thread.
- """
- self._killed = True
-
- if self._popen is None:
- return
-
- if not self._kill():
- return
+ print_warning(f"Failed to kill {self}: {exc!r}")
- self._close_wait()
+ def stop(self):
+ # Method called from a different thread to stop this thread
+ self._stopped = True
+ self._kill()
def mp_result_error(self, test_name, error_type, stdout='', stderr='',
err_msg=None):
@@ -190,59 +166,69 @@ class MultiprocessThread(threading.Thread):
try:
stdout, stderr = popen.communicate(timeout=JOIN_TIMEOUT)
except (subprocess.TimeoutExpired, OSError) as exc:
- print("WARNING: Failed to read worker process %s output "
- "(timeout=%.1f sec): %r"
- % (popen.pid, JOIN_TIMEOUT, exc),
- file=sys.stderr, flush=True)
-
- self._close_wait()
+ print_warning(f"Failed to read {self} output "
+ f"(timeout={format_duration(JOIN_TIMEOUT)}): "
+ f"{exc!r}")
return self.mp_result_error(test_name, TIMEOUT, stdout, stderr)
- def _runtest(self, test_name):
- try:
- self.start_time = time.monotonic()
- self.current_test_name = test_name
+ def _run_process(self, test_name):
+ self.start_time = time.monotonic()
+ self.current_test_name = test_name
+ try:
+ self._killed = False
self._popen = run_test_in_subprocess(test_name, self.ns)
popen = self._popen
+ except:
+ self.current_test_name = None
+ raise
+
+ try:
+ if self._stopped:
+ # If stop() has been called before self._popen is set,
+ # the worker process is still running. Call _kill() again
+ # to ensure that the process is killed.
+ self._kill()
+ raise ExitThread
+
try:
- try:
- if self._killed:
- # If kill() has been called before self._popen is set,
- # self._popen is still running. Call again kill()
- # to ensure that the process is killed.
- self.kill()
- raise ExitThread
-
- try:
- stdout, stderr = popen.communicate(timeout=self.timeout)
- except subprocess.TimeoutExpired:
- if self._killed:
- # kill() has been called: communicate() fails
- # on reading closed stdout/stderr
- raise ExitThread
-
- return self._timedout(test_name)
- except OSError:
- if self._killed:
- # kill() has been called: communicate() fails
- # on reading closed stdout/stderr
- raise ExitThread
- raise
- except:
- self.kill()
- raise
- finally:
- self._close_wait()
+ stdout, stderr = popen.communicate(timeout=self.timeout)
+ except subprocess.TimeoutExpired:
+ if self._stopped:
+ # kill() has been called: communicate() fails
+ # on reading closed stdout/stderr
+ raise ExitThread
+
+ return self._timedout(test_name)
+ except OSError:
+ if self._stopped:
+ # kill() has been called: communicate() fails
+ # on reading closed stdout/stderr
+ raise ExitThread
+ raise
retcode = popen.returncode
+ stdout = stdout.strip()
+ stderr = stderr.rstrip()
+
+ return (retcode, stdout, stderr)
+ except:
+ self._kill()
+ raise
finally:
- self.current_test_name = None
+ self._wait_completed()
self._popen = None
+ self.current_test_name = None
+
+ def _runtest(self, test_name):
+ result = self._run_process(test_name)
- stdout = stdout.strip()
- stderr = stderr.rstrip()
+ if isinstance(result, MultiprocessResult):
+ # _timedout() case
+ return result
+
+ retcode, stdout, stderr = result
err_msg = None
if retcode != 0:
@@ -266,7 +252,7 @@ class MultiprocessThread(threading.Thread):
return MultiprocessResult(result, stdout, stderr, err_msg)
def run(self):
- while not self._killed:
+ while not self._stopped:
try:
try:
test_name = next(self.pending)
@@ -284,6 +270,33 @@ class MultiprocessThread(threading.Thread):
self.output.put((True, traceback.format_exc()))
break
+ def _wait_completed(self):
+ popen = self._popen
+
+ # stdout and stderr must be closed to ensure that communicate()
+ # does not hang
+ popen.stdout.close()
+ popen.stderr.close()
+
+ try:
+ popen.wait(JOIN_TIMEOUT)
+ except (subprocess.TimeoutExpired, OSError) as exc:
+ print_warning(f"Failed to wait for {self} completion "
+ f"(timeout={format_duration(JOIN_TIMEOUT)}): "
+ f"{exc!r}")
+
+ def wait_stopped(self, start_time):
+ while True:
+ # Write a message every second
+ self.join(1.0)
+ if not self.is_alive():
+ break
+ dt = time.monotonic() - start_time
+ print(f"Waiting for {self} thread for {format_duration(dt)}", flush=True)
+ if dt > JOIN_TIMEOUT:
+ print_warning(f"Failed to join {self} in {format_duration(dt)}")
+ break
+
def get_running(workers):
running = []
@@ -298,7 +311,7 @@ def get_running(workers):
return running
-class MultiprocessRunner:
+class MultiprocessTestRunner:
def __init__(self, regrtest):
self.regrtest = regrtest
self.ns = regrtest.ns
@@ -311,30 +324,20 @@ class MultiprocessRunner:
self.workers = None
def start_workers(self):
- self.workers = [MultiprocessThread(self.pending, self.output,
- self.ns, self.worker_timeout)
- for _ in range(self.ns.use_mp)]
+ self.workers = [TestWorkerProcess(index, self.pending, self.output,
+ self.ns, self.worker_timeout)
+ for index in range(1, self.ns.use_mp + 1)]
print("Run tests in parallel using %s child processes"
% len(self.workers))
for worker in self.workers:
worker.start()
- def wait_workers(self):
+ def stop_workers(self):
start_time = time.monotonic()
for worker in self.workers:
- worker.kill()
+ worker.stop()
for worker in self.workers:
- while True:
- worker.join(1.0)
- if not worker.is_alive():
- break
- dt = time.monotonic() - start_time
- print("Wait for regrtest worker %r for %.1f sec" % (worker, dt),
- flush=True)
- if dt > JOIN_TIMEOUT:
- print("Warning -- failed to join a regrtest worker %s"
- % worker, flush=True)
- break
+ worker.wait_stopped(start_time)
def _get_result(self):
if not any(worker.is_alive() for worker in self.workers):
@@ -418,10 +421,11 @@ class MultiprocessRunner:
if self.ns.timeout is not None:
faulthandler.cancel_dump_traceback_later()
- # a test failed (and --failfast is set) or all tests completed
- self.pending.stop()
- self.wait_workers()
+ # Always ensure that all worker processes are no longer
+ # running when we exit this function
+ self.pending.stop()
+ self.stop_workers()
def run_tests_multiprocess(regrtest):
- MultiprocessRunner(regrtest).run_tests()
+ MultiprocessTestRunner(regrtest).run_tests()
diff --git a/Lib/test/libregrtest/win_utils.py b/Lib/test/libregrtest/win_utils.py
index ec2d6c6..f0c17b9 100644
--- a/Lib/test/libregrtest/win_utils.py
+++ b/Lib/test/libregrtest/win_utils.py
@@ -3,16 +3,22 @@ import msvcrt
import os
import subprocess
import uuid
+import winreg
from test import support
+from test.libregrtest.utils import print_warning
# Max size of asynchronous reads
BUFSIZE = 8192
# Exponential damping factor (see below)
LOAD_FACTOR_1 = 0.9200444146293232478931553241
+
# Seconds per measurement
SAMPLING_INTERVAL = 5
-COUNTER_NAME = r'\System\Processor Queue Length'
+# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names
+# of typeperf are registered
+COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion"
+ r"\Perflib\CurrentLanguage")
class WindowsLoadTracker():
@@ -25,7 +31,8 @@ class WindowsLoadTracker():
def __init__(self):
self.load = 0.0
- self.p = None
+ self.counter_name = ''
+ self.popen = None
self.start()
def start(self):
@@ -55,31 +62,46 @@ class WindowsLoadTracker():
overlap.GetOverlappedResult(True)
# Spawn off the load monitor
- command = ['typeperf', COUNTER_NAME, '-si', str(SAMPLING_INTERVAL)]
- self.p = subprocess.Popen(command, stdout=command_stdout, cwd=support.SAVEDCWD)
+ counter_name = self._get_counter_name()
+ command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)]
+ self.popen = subprocess.Popen(' '.join(command), stdout=command_stdout, cwd=support.SAVEDCWD)
# Close our copy of the write end of the pipe
os.close(command_stdout)
+ def _get_counter_name(self):
+ # accessing the registry to get the counter localization name
+ with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey:
+ counters = winreg.QueryValueEx(perfkey, 'Counter')[0]
+
+ # Convert [key1, value1, key2, value2, ...] list
+ # to {key1: value1, key2: value2, ...} dict
+ counters = iter(counters)
+ counters_dict = dict(zip(counters, counters))
+
+ # System counter has key '2' and Processor Queue Length has key '44'
+ system = counters_dict['2']
+ process_queue_length = counters_dict['44']
+ return f'"\\{system}\\{process_queue_length}"'
+
def close(self):
- if self.p is None:
+ if self.popen is None:
return
- self.p.kill()
- self.p.wait()
- self.p = None
+ self.popen.kill()
+ self.popen.wait()
+ self.popen = None
def __del__(self):
self.close()
def read_output(self):
- import _winapi
-
overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
bytes_read, res = overlapped.GetOverlappedResult(False)
if res != 0:
return
- return overlapped.getbuffer().decode()
+ output = overlapped.getbuffer()
+ return output.decode('oem', 'replace')
def getloadavg(self):
typeperf_output = self.read_output()
@@ -89,14 +111,29 @@ class WindowsLoadTracker():
# Process the backlog of load values
for line in typeperf_output.splitlines():
+ # Ignore the initial header:
+ # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
+ if '\\\\' in line:
+ continue
+
+ # Ignore blank lines
+ if not line.strip():
+ continue
+
# typeperf outputs in a CSV format like this:
# "07/19/2018 01:32:26.605","3.000000"
- toks = line.split(',')
- # Ignore blank lines and the initial header
- if line.strip() == '' or (COUNTER_NAME in line) or len(toks) != 2:
+ # (date, process queue length)
+ try:
+ tokens = line.split(',')
+ if len(tokens) != 2:
+ raise ValueError
+
+ value = tokens[1].replace('"', '')
+ load = float(value)
+ except ValueError:
+ print_warning("Failed to parse typeperf output: %a" % line)
continue
- load = float(toks[1].replace('"', ''))
# We use an exponentially weighted moving average, imitating the
# load calculation on Unix systems.
# https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation