diff options
Diffstat (limited to 'Lib/test/libregrtest/win_utils.py')
-rw-r--r-- | Lib/test/libregrtest/win_utils.py | 258 |
1 files changed, 95 insertions, 163 deletions
diff --git a/Lib/test/libregrtest/win_utils.py b/Lib/test/libregrtest/win_utils.py index a1cc220..5736cdf 100644 --- a/Lib/test/libregrtest/win_utils.py +++ b/Lib/test/libregrtest/win_utils.py @@ -1,16 +1,11 @@ +import _overlapped +import _thread import _winapi import math -import msvcrt -import os -import subprocess -import uuid +import struct import winreg -from test.support import os_helper -from test.libregrtest.utils import print_warning -# Max size of asynchronous reads -BUFSIZE = 8192 # Seconds per measurement SAMPLING_INTERVAL = 1 # Exponential damping factor to compute exponentially weighted moving average @@ -19,174 +14,111 @@ LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60) # Initialize the load using the arithmetic mean of the first NVALUE values # of the Processor Queue Length NVALUE = 5 -# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names -# of typeperf are registered -COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion" - r"\Perflib\CurrentLanguage") class WindowsLoadTracker(): """ - This class asynchronously interacts with the `typeperf` command to read - the system load on Windows. Multiprocessing and threads can't be used - here because they interfere with the test suite's cases for those - modules. + This class asynchronously reads the performance counters to calculate + the system load on Windows. A "raw" thread is used here to prevent + interference with the test suite's cases for the threading module. """ def __init__(self): + # Pre-flight test for access to the performance data; + # `PermissionError` will be raised if not allowed + winreg.QueryInfoKey(winreg.HKEY_PERFORMANCE_DATA) + self._values = [] self._load = None - self._buffer = '' - self._popen = None - self.start() - - def start(self): - # Create a named pipe which allows for asynchronous IO in Windows - pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4()) - - open_mode = _winapi.PIPE_ACCESS_INBOUND - open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE - open_mode |= _winapi.FILE_FLAG_OVERLAPPED - - # This is the read end of the pipe, where we will be grabbing output - self.pipe = _winapi.CreateNamedPipe( - pipe_name, open_mode, _winapi.PIPE_WAIT, - 1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL - ) - # The write end of the pipe which is passed to the created process - pipe_write_end = _winapi.CreateFile( - pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL, - _winapi.OPEN_EXISTING, 0, _winapi.NULL - ) - # Open up the handle as a python file object so we can pass it to - # subprocess - command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0) - - # Connect to the read end of the pipe in overlap/async mode - overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True) - overlap.GetOverlappedResult(True) - - # Spawn off the load monitor - counter_name = self._get_counter_name() - command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)] - self._popen = subprocess.Popen(' '.join(command), - stdout=command_stdout, - cwd=os_helper.SAVEDCWD) - - # Close our copy of the write end of the pipe - os.close(command_stdout) - - def _get_counter_name(self): - # accessing the registry to get the counter localization name - with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey: - counters = winreg.QueryValueEx(perfkey, 'Counter')[0] - - # Convert [key1, value1, key2, value2, ...] list - # to {key1: value1, key2: value2, ...} dict - counters = iter(counters) - counters_dict = dict(zip(counters, counters)) - - # System counter has key '2' and Processor Queue Length has key '44' - system = counters_dict['2'] - process_queue_length = counters_dict['44'] - return f'"\\{system}\\{process_queue_length}"' - - def close(self, kill=True): - if self._popen is None: + self._running = _overlapped.CreateEvent(None, True, False, None) + self._stopped = _overlapped.CreateEvent(None, True, False, None) + + _thread.start_new_thread(self._update_load, (), {}) + + def _update_load(self, + # localize module access to prevent shutdown errors + _wait=_winapi.WaitForSingleObject, + _signal=_overlapped.SetEvent): + # run until signaled to stop + while _wait(self._running, 1000): + self._calculate_load() + # notify stopped + _signal(self._stopped) + + def _calculate_load(self, + # localize module access to prevent shutdown errors + _query=winreg.QueryValueEx, + _hkey=winreg.HKEY_PERFORMANCE_DATA, + _unpack=struct.unpack_from): + # get the 'System' object + data, _ = _query(_hkey, '2') + # PERF_DATA_BLOCK { + # WCHAR Signature[4] 8 + + # DWOWD LittleEndian 4 + + # DWORD Version 4 + + # DWORD Revision 4 + + # DWORD TotalByteLength 4 + + # DWORD HeaderLength = 24 byte offset + # ... + # } + obj_start, = _unpack('L', data, 24) + # PERF_OBJECT_TYPE { + # DWORD TotalByteLength + # DWORD DefinitionLength + # DWORD HeaderLength + # ... + # } + data_start, defn_start = _unpack('4xLL', data, obj_start) + data_base = obj_start + data_start + defn_base = obj_start + defn_start + # find the 'Processor Queue Length' counter (index=44) + while defn_base < data_base: + # PERF_COUNTER_DEFINITION { + # DWORD ByteLength + # DWORD CounterNameTitleIndex + # ... [7 DWORDs/28 bytes] + # DWORD CounterOffset + # } + size, idx, offset = _unpack('LL28xL', data, defn_base) + defn_base += size + if idx == 44: + counter_offset = data_base + offset + # the counter is known to be PERF_COUNTER_RAWCOUNT (DWORD) + processor_queue_length, = _unpack('L', data, counter_offset) + break + else: return - self._load = None - - if kill: - self._popen.kill() - self._popen.wait() - self._popen = None - - def __del__(self): - self.close() - - def _parse_line(self, line): - # typeperf outputs in a CSV format like this: - # "07/19/2018 01:32:26.605","3.000000" - # (date, process queue length) - tokens = line.split(',') - if len(tokens) != 2: - raise ValueError - - value = tokens[1] - if not value.startswith('"') or not value.endswith('"'): - raise ValueError - value = value[1:-1] - return float(value) - - def _read_lines(self): - overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True) - bytes_read, res = overlapped.GetOverlappedResult(False) - if res != 0: - return () - - output = overlapped.getbuffer() - output = output.decode('oem', 'replace') - output = self._buffer + output - lines = output.splitlines(True) - - # bpo-36670: typeperf only writes a newline *before* writing a value, - # not after. Sometimes, the written line in incomplete (ex: only - # timestamp, without the process queue length). Only pass the last line - # to the parser if it's a valid value, otherwise store it in - # self._buffer. - try: - self._parse_line(lines[-1]) - except ValueError: - self._buffer = lines.pop(-1) + # We use an exponentially weighted moving average, imitating the + # load calculation on Unix systems. + # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation + # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average + if self._load is not None: + self._load = (self._load * LOAD_FACTOR_1 + + processor_queue_length * (1.0 - LOAD_FACTOR_1)) + elif len(self._values) < NVALUE: + self._values.append(processor_queue_length) else: - self._buffer = '' + self._load = sum(self._values) / len(self._values) - return lines + def close(self, kill=True): + self.__del__() + return + + def __del__(self, + # localize module access to prevent shutdown errors + _wait=_winapi.WaitForSingleObject, + _close=_winapi.CloseHandle, + _signal=_overlapped.SetEvent): + if self._running is not None: + # tell the update thread to quit + _signal(self._running) + # wait for the update thread to signal done + _wait(self._stopped, -1) + # cleanup events + _close(self._running) + _close(self._stopped) + self._running = self._stopped = None def getloadavg(self): - if self._popen is None: - return None - - returncode = self._popen.poll() - if returncode is not None: - self.close(kill=False) - return None - - try: - lines = self._read_lines() - except BrokenPipeError: - self.close() - return None - - for line in lines: - line = line.rstrip() - - # Ignore the initial header: - # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length" - if 'PDH-CSV' in line: - continue - - # Ignore blank lines - if not line: - continue - - try: - processor_queue_length = self._parse_line(line) - except ValueError: - print_warning("Failed to parse typeperf output: %a" % line) - continue - - # We use an exponentially weighted moving average, imitating the - # load calculation on Unix systems. - # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation - # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average - if self._load is not None: - self._load = (self._load * LOAD_FACTOR_1 - + processor_queue_length * (1.0 - LOAD_FACTOR_1)) - elif len(self._values) < NVALUE: - self._values.append(processor_queue_length) - else: - self._load = sum(self._values) / len(self._values) - return self._load |