diff options
author | Steve Dower <steve.dower@python.org> | 2022-09-07 20:09:20 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-07 20:09:20 (GMT) |
commit | de33df27aaf930be6a34027c530a651f0b4c91f5 (patch) | |
tree | 789d8df85c96cdf28c36a3e89f65d96d49949bc5 /Lib | |
parent | 4114bcc9ef7595a07196bcecf9c7d6d39f57f64d (diff) | |
download | cpython-de33df27aaf930be6a34027c530a651f0b4c91f5.zip cpython-de33df27aaf930be6a34027c530a651f0b4c91f5.tar.gz cpython-de33df27aaf930be6a34027c530a651f0b4c91f5.tar.bz2 |
gh-89545: Updates platform module to use new internal _wmi module on Windows to directly query OS properties (GH-96289)
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/ntpath.py | 5 | ||||
-rwxr-xr-x | Lib/platform.py | 148 | ||||
-rw-r--r-- | Lib/test/audit-tests.py | 11 | ||||
-rw-r--r-- | Lib/test/test_audit.py | 15 | ||||
-rw-r--r-- | Lib/test/test_platform.py | 41 | ||||
-rw-r--r-- | Lib/test/test_wmi.py | 67 |
6 files changed, 228 insertions, 59 deletions
diff --git a/Lib/ntpath.py b/Lib/ntpath.py index 959bcd0..d9582f4 100644 --- a/Lib/ntpath.py +++ b/Lib/ntpath.py @@ -732,9 +732,8 @@ else: return path -# Win9x family and earlier have no Unicode filename support. -supports_unicode_filenames = (hasattr(sys, "getwindowsversion") and - sys.getwindowsversion()[3] >= 2) +# All supported version have Unicode filename support. +supports_unicode_filenames = True def relpath(path, start=None): """Return a relative version of a path""" diff --git a/Lib/platform.py b/Lib/platform.py index df8faac..9f5b317 100755 --- a/Lib/platform.py +++ b/Lib/platform.py @@ -309,34 +309,52 @@ def _syscmd_ver(system='', release='', version='', version = _norm_version(version) return system, release, version -_WIN32_CLIENT_RELEASES = { - (5, 0): "2000", - (5, 1): "XP", - # Strictly, 5.2 client is XP 64-bit, but platform.py historically - # has always called it 2003 Server - (5, 2): "2003Server", - (5, None): "post2003", - - (6, 0): "Vista", - (6, 1): "7", - (6, 2): "8", - (6, 3): "8.1", - (6, None): "post8.1", - - (10, 0): "10", - (10, None): "post10", -} - -# Server release name lookup will default to client names if necessary -_WIN32_SERVER_RELEASES = { - (5, 2): "2003Server", - - (6, 0): "2008Server", - (6, 1): "2008ServerR2", - (6, 2): "2012Server", - (6, 3): "2012ServerR2", - (6, None): "post2012ServerR2", -} +try: + import _wmi +except ImportError: + def _wmi_query(*keys): + raise OSError("not supported") +else: + def _wmi_query(table, *keys): + table = { + "OS": "Win32_OperatingSystem", + "CPU": "Win32_Processor", + }[table] + data = _wmi.exec_query("SELECT {} FROM {}".format( + ",".join(keys), + table, + )).split("\0") + split_data = (i.partition("=") for i in data) + dict_data = {i[0]: i[2] for i in split_data} + return (dict_data[k] for k in keys) + + +_WIN32_CLIENT_RELEASES = [ + ((10, 1, 0), "post11"), + ((10, 0, 22000), "11"), + ((6, 4, 0), "10"), + ((6, 3, 0), "8.1"), + ((6, 2, 0), "8"), + ((6, 1, 0), "7"), + ((6, 0, 0), "Vista"), + ((5, 2, 3790), "XP64"), + ((5, 2, 0), "XPMedia"), + ((5, 1, 0), "XP"), + ((5, 0, 0), "2000"), +] + +_WIN32_SERVER_RELEASES = [ + ((10, 1, 0), "post2022Server"), + ((10, 0, 20348), "2022Server"), + ((10, 0, 17763), "2019Server"), + ((6, 4, 0), "2016Server"), + ((6, 3, 0), "2012ServerR2"), + ((6, 2, 0), "2012Server"), + ((6, 1, 0), "2008ServerR2"), + ((6, 0, 0), "2008Server"), + ((5, 2, 0), "2003Server"), + ((5, 0, 0), "2000Server"), +] def win32_is_iot(): return win32_edition() in ('IoTUAP', 'NanoServer', 'WindowsCoreHeadless', 'IoTEdgeOS') @@ -359,22 +377,40 @@ def win32_edition(): return None -def win32_ver(release='', version='', csd='', ptype=''): +def _win32_ver(version, csd, ptype): + # Try using WMI first, as this is the canonical source of data + try: + (version, product_type, ptype, spmajor, spminor) = _wmi_query( + 'OS', + 'Version', + 'ProductType', + 'BuildType', + 'ServicePackMajorVersion', + 'ServicePackMinorVersion', + ) + is_client = (int(product_type) == 1) + if spminor and spminor != '0': + csd = f'SP{spmajor}.{spminor}' + else: + csd = f'SP{spmajor}' + return version, csd, ptype, is_client + except OSError: + pass + + # Fall back to a combination of sys.getwindowsversion and "ver" try: from sys import getwindowsversion except ImportError: - return release, version, csd, ptype + return version, csd, ptype, True winver = getwindowsversion() + is_client = (getattr(winver, 'product_type', 1) == 1) try: - major, minor, build = map(int, _syscmd_ver()[2].split('.')) + version = _syscmd_ver()[2] + major, minor, build = map(int, version.split('.')) except ValueError: major, minor, build = winver.platform_version or winver[:3] - version = '{0}.{1}.{2}'.format(major, minor, build) - - release = (_WIN32_CLIENT_RELEASES.get((major, minor)) or - _WIN32_CLIENT_RELEASES.get((major, None)) or - release) + version = '{0}.{1}.{2}'.format(major, minor, build) # getwindowsversion() reflect the compatibility mode Python is # running under, and so the service pack value is only going to be @@ -386,12 +422,6 @@ def win32_ver(release='', version='', csd='', ptype=''): if csd[:13] == 'Service Pack ': csd = 'SP' + csd[13:] - # VER_NT_SERVER = 3 - if getattr(winver, 'product_type', None) == 3: - release = (_WIN32_SERVER_RELEASES.get((major, minor)) or - _WIN32_SERVER_RELEASES.get((major, None)) or - release) - try: try: import winreg @@ -407,6 +437,18 @@ def win32_ver(release='', version='', csd='', ptype=''): except OSError: pass + return version, csd, ptype, is_client + +def win32_ver(release='', version='', csd='', ptype=''): + is_client = False + + version, csd, ptype, is_client = _win32_ver(version, csd, ptype) + + if version: + intversion = tuple(map(int, version.split('.'))) + releases = _WIN32_CLIENT_RELEASES if is_client else _WIN32_SERVER_RELEASES + release = next((r for v, r in releases if v <= intversion), release) + return release, version, csd, ptype @@ -725,6 +767,21 @@ def _get_machine_win32(): # http://www.geocities.com/rick_lively/MANUALS/ENV/MSWIN/PROCESSI.HTM # WOW64 processes mask the native architecture + try: + [arch, *_] = _wmi_query('CPU', 'Architecture') + except OSError: + pass + else: + try: + arch = ['x86', 'MIPS', 'Alpha', 'PowerPC', None, + 'ARM', 'ia64', None, None, + 'AMD64', None, None, 'ARM64', + ][int(arch)] + except (ValueError, IndexError): + pass + else: + if arch: + return arch return ( os.environ.get('PROCESSOR_ARCHITEW6432', '') or os.environ.get('PROCESSOR_ARCHITECTURE', '') @@ -738,7 +795,12 @@ class _Processor: return func() or '' def get_win32(): - return os.environ.get('PROCESSOR_IDENTIFIER', _get_machine_win32()) + try: + manufacturer, caption = _wmi_query('CPU', 'Manufacturer', 'Caption') + except OSError: + return os.environ.get('PROCESSOR_IDENTIFIER', _get_machine_win32()) + else: + return f'{caption}, {manufacturer}' def get_OpenVMS(): try: diff --git a/Lib/test/audit-tests.py b/Lib/test/audit-tests.py index 00333cc..66c08f7 100644 --- a/Lib/test/audit-tests.py +++ b/Lib/test/audit-tests.py @@ -419,6 +419,17 @@ def test_sys_getframe(): sys._getframe() +def test_wmi_exec_query(): + import _wmi + + def hook(event, args): + if event.startswith("_wmi."): + print(event, args[0]) + + sys.addaudithook(hook) + _wmi.exec_query("SELECT * FROM Win32_OperatingSystem") + + if __name__ == "__main__": from test.support import suppress_msvcrt_asserts diff --git a/Lib/test/test_audit.py b/Lib/test/test_audit.py index 18426f2..09b3333 100644 --- a/Lib/test/test_audit.py +++ b/Lib/test/test_audit.py @@ -185,5 +185,20 @@ class AuditTest(unittest.TestCase): self.assertEqual(actual, expected) + + def test_wmi_exec_query(self): + import_helper.import_module("_wmi") + returncode, events, stderr = self.run_python("test_wmi_exec_query") + if returncode: + self.fail(stderr) + + if support.verbose: + print(*events, sep='\n') + actual = [(ev[0], ev[2]) for ev in events] + expected = [("_wmi.exec_query", "SELECT * FROM Win32_OperatingSystem")] + + self.assertEqual(actual, expected) + + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_platform.py b/Lib/test/test_platform.py index 9b2cd20..9c03a89 100644 --- a/Lib/test/test_platform.py +++ b/Lib/test/test_platform.py @@ -229,6 +229,14 @@ class PlatformTest(unittest.TestCase): self.assertEqual(res[-1], res.processor) self.assertEqual(len(res), 6) + @unittest.skipUnless(sys.platform.startswith('win'), "windows only test") + def test_uname_win32_without_wmi(self): + def raises_oserror(*a): + raise OSError() + + with support.swap_attr(platform, '_wmi_query', raises_oserror): + self.test_uname() + def test_uname_cast_to_tuple(self): res = platform.uname() expected = ( @@ -289,20 +297,27 @@ class PlatformTest(unittest.TestCase): # on 64 bit Windows: if PROCESSOR_ARCHITEW6432 exists we should be # using it, per # http://blogs.msdn.com/david.wang/archive/2006/03/26/HOWTO-Detect-Process-Bitness.aspx - try: + + # We also need to suppress WMI checks, as those are reliable and + # overrule the environment variables + def raises_oserror(*a): + raise OSError() + + with support.swap_attr(platform, '_wmi_query', raises_oserror): with os_helper.EnvironmentVarGuard() as environ: - if 'PROCESSOR_ARCHITEW6432' in environ: - del environ['PROCESSOR_ARCHITEW6432'] - environ['PROCESSOR_ARCHITECTURE'] = 'foo' - platform._uname_cache = None - system, node, release, version, machine, processor = platform.uname() - self.assertEqual(machine, 'foo') - environ['PROCESSOR_ARCHITEW6432'] = 'bar' - platform._uname_cache = None - system, node, release, version, machine, processor = platform.uname() - self.assertEqual(machine, 'bar') - finally: - platform._uname_cache = None + try: + if 'PROCESSOR_ARCHITEW6432' in environ: + del environ['PROCESSOR_ARCHITEW6432'] + environ['PROCESSOR_ARCHITECTURE'] = 'foo' + platform._uname_cache = None + system, node, release, version, machine, processor = platform.uname() + self.assertEqual(machine, 'foo') + environ['PROCESSOR_ARCHITEW6432'] = 'bar' + platform._uname_cache = None + system, node, release, version, machine, processor = platform.uname() + self.assertEqual(machine, 'bar') + finally: + platform._uname_cache = None def test_java_ver(self): res = platform.java_ver() diff --git a/Lib/test/test_wmi.py b/Lib/test/test_wmi.py new file mode 100644 index 0000000..af2a453 --- /dev/null +++ b/Lib/test/test_wmi.py @@ -0,0 +1,67 @@ +# Test the internal _wmi module on Windows +# This is used by the platform module, and potentially others + +import re +import sys +import unittest +from test.support import import_helper + + +# Do this first so test will be skipped if module doesn't exist +_wmi = import_helper.import_module('_wmi', required_on=['win']) + + +class WmiTests(unittest.TestCase): + def test_wmi_query_os_version(self): + r = _wmi.exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0") + self.assertEqual(1, len(r)) + k, eq, v = r[0].partition("=") + self.assertEqual("=", eq, r[0]) + self.assertEqual("Version", k, r[0]) + # Best we can check for the version is that it's digits, dot, digits, anything + # Otherwise, we are likely checking the result of the query against itself + self.assertTrue(re.match(r"\d+\.\d+.+$", v), r[0]) + + def test_wmi_query_repeated(self): + # Repeated queries should not break + for _ in range(10): + self.test_wmi_query_os_version() + + def test_wmi_query_error(self): + # Invalid queries fail with OSError + try: + _wmi.exec_query("SELECT InvalidColumnName FROM InvalidTableName") + except OSError as ex: + if ex.winerror & 0xFFFFFFFF == 0x80041010: + # This is the expected error code. All others should fail the test + return + self.fail("Expected OSError") + + def test_wmi_query_repeated_error(self): + for _ in range(10): + self.test_wmi_query_error() + + def test_wmi_query_not_select(self): + # Queries other than SELECT are blocked to avoid potential exploits + with self.assertRaises(ValueError): + _wmi.exec_query("not select, just in case someone tries something") + + def test_wmi_query_overflow(self): + # Ensure very big queries fail + # Test multiple times to ensure consistency + for _ in range(2): + with self.assertRaises(OSError): + _wmi.exec_query("SELECT * FROM CIM_DataFile") + + def test_wmi_query_multiple_rows(self): + # Multiple instances should have an extra null separator + r = _wmi.exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000") + self.assertFalse(r.startswith("\0"), r) + self.assertFalse(r.endswith("\0"), r) + it = iter(r.split("\0")) + try: + while True: + self.assertTrue(re.match(r"ProcessId=\d+", next(it))) + self.assertEqual("", next(it)) + except StopIteration: + pass |