diff options
author | Steve Dower <steve.dower@python.org> | 2024-04-15 15:43:28 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-15 15:43:28 (GMT) |
commit | 667a574c8fca0393d351f558680805cad9b71ec5 (patch) | |
tree | 8769f737a17976bb7269cdf4288333342df10ac7 /Lib/test | |
parent | 3e5109682b96aad62d8ffec676eb1356056471b4 (diff) | |
download | cpython-667a574c8fca0393d351f558680805cad9b71ec5.zip cpython-667a574c8fca0393d351f558680805cad9b71ec5.tar.gz cpython-667a574c8fca0393d351f558680805cad9b71ec5.tar.bz2 |
gh-112278: Improve error handling in wmi module and tests (GH-117818)
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/test_wmi.py | 28 |
1 files changed, 21 insertions, 7 deletions
diff --git a/Lib/test/test_wmi.py b/Lib/test/test_wmi.py index 3445702..f667926 100644 --- a/Lib/test/test_wmi.py +++ b/Lib/test/test_wmi.py @@ -1,17 +1,31 @@ # Test the internal _wmi module on Windows # This is used by the platform module, and potentially others +import time import unittest -from test.support import import_helper, requires_resource +from test.support import import_helper, requires_resource, LOOPBACK_TIMEOUT # Do this first so test will be skipped if module doesn't exist _wmi = import_helper.import_module('_wmi', required_on=['win']) +def wmi_exec_query(query): + # gh-112278: WMI maybe slow response when first call. + try: + return _wmi.exec_query(query) + except BrokenPipeError: + pass + except WindowsError as e: + if e.winerror != 258: + raise + time.sleep(LOOPBACK_TIMEOUT) + return _wmi.exec_query(query) + + class WmiTests(unittest.TestCase): def test_wmi_query_os_version(self): - r = _wmi.exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0") + r = wmi_exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0") self.assertEqual(1, len(r)) k, eq, v = r[0].partition("=") self.assertEqual("=", eq, r[0]) @@ -28,7 +42,7 @@ class WmiTests(unittest.TestCase): def test_wmi_query_error(self): # Invalid queries fail with OSError try: - _wmi.exec_query("SELECT InvalidColumnName FROM InvalidTableName") + wmi_exec_query("SELECT InvalidColumnName FROM InvalidTableName") except OSError as ex: if ex.winerror & 0xFFFFFFFF == 0x80041010: # This is the expected error code. All others should fail the test @@ -42,7 +56,7 @@ class WmiTests(unittest.TestCase): def test_wmi_query_not_select(self): # Queries other than SELECT are blocked to avoid potential exploits with self.assertRaises(ValueError): - _wmi.exec_query("not select, just in case someone tries something") + wmi_exec_query("not select, just in case someone tries something") @requires_resource('cpu') def test_wmi_query_overflow(self): @@ -50,11 +64,11 @@ class WmiTests(unittest.TestCase): # Test multiple times to ensure consistency for _ in range(2): with self.assertRaises(OSError): - _wmi.exec_query("SELECT * FROM CIM_DataFile") + wmi_exec_query("SELECT * FROM CIM_DataFile") def test_wmi_query_multiple_rows(self): # Multiple instances should have an extra null separator - r = _wmi.exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000") + r = wmi_exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000") self.assertFalse(r.startswith("\0"), r) self.assertFalse(r.endswith("\0"), r) it = iter(r.split("\0")) @@ -69,6 +83,6 @@ class WmiTests(unittest.TestCase): from concurrent.futures import ThreadPoolExecutor query = "SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000" with ThreadPoolExecutor(4) as pool: - task = [pool.submit(_wmi.exec_query, query) for _ in range(32)] + task = [pool.submit(wmi_exec_query, query) for _ in range(32)] for t in task: self.assertRegex(t.result(), "ProcessId=") |