summaryrefslogtreecommitdiffstats
path: root/Lib/test/libregrtest/win_utils.py
blob: a1cc2201147dbc720b637f03fd5aad098fa63f7d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import _winapi
import math
import msvcrt
import os
import subprocess
import uuid
import winreg
from test.support import os_helper
from test.libregrtest.utils import print_warning


# Max size of asynchronous reads
BUFSIZE = 8192
# Seconds per measurement
SAMPLING_INTERVAL = 1
# Exponential damping factor to compute exponentially weighted moving average
# on 1 minute (60 seconds)
LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
# Initialize the load using the arithmetic mean of the first NVALUE values
# of the Processor Queue Length
NVALUE = 5
# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names
# of typeperf are registered
COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion"
                        r"\Perflib\CurrentLanguage")


class WindowsLoadTracker():
    """
    This class asynchronously interacts with the `typeperf` command to read
    the system load on Windows. Multiprocessing and threads can't be used
    here because they interfere with the test suite's cases for those
    modules.
    """

    def __init__(self):
        self._values = []
        self._load = None
        self._buffer = ''
        self._popen = None
        self.start()

    def start(self):
        # Create a named pipe which allows for asynchronous IO in Windows
        pipe_name =  r'\\.\pipe\typeperf_output_' + str(uuid.uuid4())

        open_mode =  _winapi.PIPE_ACCESS_INBOUND
        open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        open_mode |= _winapi.FILE_FLAG_OVERLAPPED

        # This is the read end of the pipe, where we will be grabbing output
        self.pipe = _winapi.CreateNamedPipe(
            pipe_name, open_mode, _winapi.PIPE_WAIT,
            1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
        )
        # The write end of the pipe which is passed to the created process
        pipe_write_end = _winapi.CreateFile(
            pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL,
            _winapi.OPEN_EXISTING, 0, _winapi.NULL
        )
        # Open up the handle as a python file object so we can pass it to
        # subprocess
        command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0)

        # Connect to the read end of the pipe in overlap/async mode
        overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True)
        overlap.GetOverlappedResult(True)

        # Spawn off the load monitor
        counter_name = self._get_counter_name()
        command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)]
        self._popen = subprocess.Popen(' '.join(command),
                                       stdout=command_stdout,
                                       cwd=os_helper.SAVEDCWD)

        # Close our copy of the write end of the pipe
        os.close(command_stdout)

    def _get_counter_name(self):
        # accessing the registry to get the counter localization name
        with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey:
            counters = winreg.QueryValueEx(perfkey, 'Counter')[0]

        # Convert [key1, value1, key2, value2, ...] list
        # to {key1: value1, key2: value2, ...} dict
        counters = iter(counters)
        counters_dict = dict(zip(counters, counters))

        # System counter has key '2' and Processor Queue Length has key '44'
        system = counters_dict['2']
        process_queue_length = counters_dict['44']
        return f'"\\{system}\\{process_queue_length}"'

    def close(self, kill=True):
        if self._popen is None:
            return

        self._load = None

        if kill:
            self._popen.kill()
        self._popen.wait()
        self._popen = None

    def __del__(self):
        self.close()

    def _parse_line(self, line):
        # typeperf outputs in a CSV format like this:
        # "07/19/2018 01:32:26.605","3.000000"
        # (date, process queue length)
        tokens = line.split(',')
        if len(tokens) != 2:
            raise ValueError

        value = tokens[1]
        if not value.startswith('"') or not value.endswith('"'):
            raise ValueError
        value = value[1:-1]
        return float(value)

    def _read_lines(self):
        overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
        bytes_read, res = overlapped.GetOverlappedResult(False)
        if res != 0:
            return ()

        output = overlapped.getbuffer()
        output = output.decode('oem', 'replace')
        output = self._buffer + output
        lines = output.splitlines(True)

        # bpo-36670: typeperf only writes a newline *before* writing a value,
        # not after. Sometimes, the written line in incomplete (ex: only
        # timestamp, without the process queue length). Only pass the last line
        # to the parser if it's a valid value, otherwise store it in
        # self._buffer.
        try:
            self._parse_line(lines[-1])
        except ValueError:
            self._buffer = lines.pop(-1)
        else:
            self._buffer = ''

        return lines

    def getloadavg(self):
        if self._popen is None:
            return None

        returncode = self._popen.poll()
        if returncode is not None:
            self.close(kill=False)
            return None

        try:
            lines = self._read_lines()
        except BrokenPipeError:
            self.close()
            return None

        for line in lines:
            line = line.rstrip()

            # Ignore the initial header:
            # "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
            if 'PDH-CSV' in line:
                continue

            # Ignore blank lines
            if not line:
                continue

            try:
                processor_queue_length = self._parse_line(line)
            except ValueError:
                print_warning("Failed to parse typeperf output: %a" % line)
                continue

            # We use an exponentially weighted moving average, imitating the
            # load calculation on Unix systems.
            # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
            # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
            if self._load is not None:
                self._load = (self._load * LOAD_FACTOR_1
                              + processor_queue_length  * (1.0 - LOAD_FACTOR_1))
            elif len(self._values) < NVALUE:
                self._values.append(processor_queue_length)
            else:
                self._load = sum(self._values) / len(self._values)

        return self._load