summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_wmi.py
blob: bf8c52e646dc18d2799967574f3546791a163ac2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# Test the internal _wmi module on Windows
# This is used by the platform module, and potentially others

import time
import unittest
from test.support import import_helper, requires_resource, LOOPBACK_TIMEOUT


# Do this first so test will be skipped if module doesn't exist
_wmi = import_helper.import_module('_wmi', required_on=['win'])


def wmi_exec_query(query):
    # gh-112278: WMI maybe slow response when first call.
    try:
        return _wmi.exec_query(query)
    except WindowsError as e:
        if e.winerror != 258:
            raise
        time.sleep(LOOPBACK_TIMEOUT)
        return _wmi.exec_query(query)


class WmiTests(unittest.TestCase):
    def test_wmi_query_os_version(self):
        r = wmi_exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0")
        self.assertEqual(1, len(r))
        k, eq, v = r[0].partition("=")
        self.assertEqual("=", eq, r[0])
        self.assertEqual("Version", k, r[0])
        # Best we can check for the version is that it's digits, dot, digits, anything
        # Otherwise, we are likely checking the result of the query against itself
        self.assertRegex(v, r"\d+\.\d+.+$", r[0])

    def test_wmi_query_repeated(self):
        # Repeated queries should not break
        for _ in range(10):
            self.test_wmi_query_os_version()

    def test_wmi_query_error(self):
        # Invalid queries fail with OSError
        try:
            wmi_exec_query("SELECT InvalidColumnName FROM InvalidTableName")
        except OSError as ex:
            if ex.winerror & 0xFFFFFFFF == 0x80041010:
                # This is the expected error code. All others should fail the test
                return
        self.fail("Expected OSError")

    def test_wmi_query_repeated_error(self):
        for _ in range(10):
            self.test_wmi_query_error()

    def test_wmi_query_not_select(self):
        # Queries other than SELECT are blocked to avoid potential exploits
        with self.assertRaises(ValueError):
            wmi_exec_query("not select, just in case someone tries something")

    @requires_resource('cpu')
    def test_wmi_query_overflow(self):
        # Ensure very big queries fail
        # Test multiple times to ensure consistency
        for _ in range(2):
            with self.assertRaises(OSError):
                wmi_exec_query("SELECT * FROM CIM_DataFile")

    def test_wmi_query_multiple_rows(self):
        # Multiple instances should have an extra null separator
        r = wmi_exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000")
        self.assertFalse(r.startswith("\0"), r)
        self.assertFalse(r.endswith("\0"), r)
        it = iter(r.split("\0"))
        try:
            while True:
                self.assertRegex(next(it), r"ProcessId=\d+")
                self.assertEqual("", next(it))
        except StopIteration:
            pass

    def test_wmi_query_threads(self):
        from concurrent.futures import ThreadPoolExecutor
        query = "SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000"
        with ThreadPoolExecutor(4) as pool:
            task = [pool.submit(wmi_exec_query, query) for _ in range(32)]
            for t in task:
                self.assertRegex(t.result(), "ProcessId=")