summaryrefslogtreecommitdiffstats
path: root/Lib/test/libregrtest/win_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/libregrtest/win_utils.py')
-rw-r--r--Lib/test/libregrtest/win_utils.py258
1 files changed, 95 insertions, 163 deletions
diff --git a/Lib/test/libregrtest/win_utils.py b/Lib/test/libregrtest/win_utils.py
index a1cc220..5736cdf 100644
--- a/Lib/test/libregrtest/win_utils.py
+++ b/Lib/test/libregrtest/win_utils.py
@@ -1,16 +1,11 @@
+import _overlapped
+import _thread
import _winapi
import math
-import msvcrt
-import os
-import subprocess
-import uuid
+import struct
import winreg
-from test.support import os_helper
-from test.libregrtest.utils import print_warning
-# Max size of asynchronous reads
-BUFSIZE = 8192
# Seconds per measurement
SAMPLING_INTERVAL = 1
# Exponential damping factor to compute exponentially weighted moving average
@@ -19,174 +14,111 @@ LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
# Initialize the load using the arithmetic mean of the first NVALUE values
# of the Processor Queue Length
NVALUE = 5
-# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names
-# of typeperf are registered
-COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion"
- r"\Perflib\CurrentLanguage")
class WindowsLoadTracker():
"""
- This class asynchronously interacts with the `typeperf` command to read
- the system load on Windows. Multiprocessing and threads can't be used
- here because they interfere with the test suite's cases for those
- modules.
+ This class asynchronously reads the performance counters to calculate
+ the system load on Windows. A "raw" thread is used here to prevent
+ interference with the test suite's cases for the threading module.
"""
def __init__(self):
+ # Pre-flight test for access to the performance data;
+ # `PermissionError` will be raised if not allowed
+ winreg.QueryInfoKey(winreg.HKEY_PERFORMANCE_DATA)
+
self._values = []
self._load = None
- self._buffer = ''
- self._popen = None
- self.start()
-
- def start(self):
- # Create a named pipe which allows for asynchronous IO in Windows
- pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4())
-
- open_mode = _winapi.PIPE_ACCESS_INBOUND
- open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
- open_mode |= _winapi.FILE_FLAG_OVERLAPPED
-
- # This is the read end of the pipe, where we will be grabbing output
- self.pipe = _winapi.CreateNamedPipe(
- pipe_name, open_mode, _winapi.PIPE_WAIT,
- 1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
- )
- # The write end of the pipe which is passed to the created process
- pipe_write_end = _winapi.CreateFile(
- pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL,
- _winapi.OPEN_EXISTING, 0, _winapi.NULL
- )
- # Open up the handle as a python file object so we can pass it to
- # subprocess
- command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0)
-
- # Connect to the read end of the pipe in overlap/async mode
- overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True)
- overlap.GetOverlappedResult(True)
-
- # Spawn off the load monitor
- counter_name = self._get_counter_name()
- command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)]
- self._popen = subprocess.Popen(' '.join(command),
- stdout=command_stdout,
- cwd=os_helper.SAVEDCWD)
-
- # Close our copy of the write end of the pipe
- os.close(command_stdout)
-
- def _get_counter_name(self):
- # accessing the registry to get the counter localization name
- with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey:
- counters = winreg.QueryValueEx(perfkey, 'Counter')[0]
-
- # Convert [key1, value1, key2, value2, ...] list
- # to {key1: value1, key2: value2, ...} dict
- counters = iter(counters)
- counters_dict = dict(zip(counters, counters))
-
- # System counter has key '2' and Processor Queue Length has key '44'
- system = counters_dict['2']
- process_queue_length = counters_dict['44']
- return f'"\\{system}\\{process_queue_length}"'
-
- def close(self, kill=True):
- if self._popen is None:
+ self._running = _overlapped.CreateEvent(None, True, False, None)
+ self._stopped = _overlapped.CreateEvent(None, True, False, None)
+
+ _thread.start_new_thread(self._update_load, (), {})
+
+ def _update_load(self,
+ # localize module access to prevent shutdown errors
+ _wait=_winapi.WaitForSingleObject,
+ _signal=_overlapped.SetEvent):
+ # run until signaled to stop
+ while _wait(self._running, 1000):
+ self._calculate_load()
+ # notify stopped
+ _signal(self._stopped)
+
+ def _calculate_load(self,
+ # localize module access to prevent shutdown errors
+ _query=winreg.QueryValueEx,
+ _hkey=winreg.HKEY_PERFORMANCE_DATA,
+ _unpack=struct.unpack_from):
+ # get the 'System' object
+ data, _ = _query(_hkey, '2')
+ # PERF_DATA_BLOCK {
+ # WCHAR Signature[4] 8 +
+ # DWOWD LittleEndian 4 +
+ # DWORD Version 4 +
+ # DWORD Revision 4 +
+ # DWORD TotalByteLength 4 +
+ # DWORD HeaderLength = 24 byte offset
+ # ...
+ # }
+ obj_start, = _unpack('L', data, 24)
+ # PERF_OBJECT_TYPE {
+ # DWORD TotalByteLength
+ # DWORD DefinitionLength
+ # DWORD HeaderLength
+ # ...
+ # }
+ data_start, defn_start = _unpack('4xLL', data, obj_start)
+ data_base = obj_start + data_start
+ defn_base = obj_start + defn_start
+ # find the 'Processor Queue Length' counter (index=44)
+ while defn_base < data_base:
+ # PERF_COUNTER_DEFINITION {
+ # DWORD ByteLength
+ # DWORD CounterNameTitleIndex
+ # ... [7 DWORDs/28 bytes]
+ # DWORD CounterOffset
+ # }
+ size, idx, offset = _unpack('LL28xL', data, defn_base)
+ defn_base += size
+ if idx == 44:
+ counter_offset = data_base + offset
+ # the counter is known to be PERF_COUNTER_RAWCOUNT (DWORD)
+ processor_queue_length, = _unpack('L', data, counter_offset)
+ break
+ else:
return
- self._load = None
-
- if kill:
- self._popen.kill()
- self._popen.wait()
- self._popen = None
-
- def __del__(self):
- self.close()
-
- def _parse_line(self, line):
- # typeperf outputs in a CSV format like this:
- # "07/19/2018 01:32:26.605","3.000000"
- # (date, process queue length)
- tokens = line.split(',')
- if len(tokens) != 2:
- raise ValueError
-
- value = tokens[1]
- if not value.startswith('"') or not value.endswith('"'):
- raise ValueError
- value = value[1:-1]
- return float(value)
-
- def _read_lines(self):
- overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
- bytes_read, res = overlapped.GetOverlappedResult(False)
- if res != 0:
- return ()
-
- output = overlapped.getbuffer()
- output = output.decode('oem', 'replace')
- output = self._buffer + output
- lines = output.splitlines(True)
-
- # bpo-36670: typeperf only writes a newline *before* writing a value,
- # not after. Sometimes, the written line in incomplete (ex: only
- # timestamp, without the process queue length). Only pass the last line
- # to the parser if it's a valid value, otherwise store it in
- # self._buffer.
- try:
- self._parse_line(lines[-1])
- except ValueError:
- self._buffer = lines.pop(-1)
+ # We use an exponentially weighted moving average, imitating the
+ # load calculation on Unix systems.
+ # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
+ # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
+ if self._load is not None:
+ self._load = (self._load * LOAD_FACTOR_1
+ + processor_queue_length * (1.0 - LOAD_FACTOR_1))
+ elif len(self._values) < NVALUE:
+ self._values.append(processor_queue_length)
else:
- self._buffer = ''
+ self._load = sum(self._values) / len(self._values)
- return lines
+ def close(self, kill=True):
+ self.__del__()
+ return
+
+ def __del__(self,
+ # localize module access to prevent shutdown errors
+ _wait=_winapi.WaitForSingleObject,
+ _close=_winapi.CloseHandle,
+ _signal=_overlapped.SetEvent):
+ if self._running is not None:
+ # tell the update thread to quit
+ _signal(self._running)
+ # wait for the update thread to signal done
+ _wait(self._stopped, -1)
+ # cleanup events
+ _close(self._running)
+ _close(self._stopped)
+ self._running = self._stopped = None
def getloadavg(self):
- if self._popen is None:
- return None
-
- returncode = self._popen.poll()
- if returncode is not None:
- self.close(kill=False)
- return None
-
- try:
- lines = self._read_lines()
- except BrokenPipeError:
- self.close()
- return None
-
- for line in lines:
- line = line.rstrip()
-
- # Ignore the initial header:
- # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
- if 'PDH-CSV' in line:
- continue
-
- # Ignore blank lines
- if not line:
- continue
-
- try:
- processor_queue_length = self._parse_line(line)
- except ValueError:
- print_warning("Failed to parse typeperf output: %a" % line)
- continue
-
- # We use an exponentially weighted moving average, imitating the
- # load calculation on Unix systems.
- # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
- # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
- if self._load is not None:
- self._load = (self._load * LOAD_FACTOR_1
- + processor_queue_length * (1.0 - LOAD_FACTOR_1))
- elif len(self._values) < NVALUE:
- self._values.append(processor_queue_length)
- else:
- self._load = sum(self._values) / len(self._values)
-
return self._load