summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorSteve Dower <steve.dower@python.org>2022-09-07 20:09:20 (GMT)
committerGitHub <noreply@github.com>2022-09-07 20:09:20 (GMT)
commitde33df27aaf930be6a34027c530a651f0b4c91f5 (patch)
tree789d8df85c96cdf28c36a3e89f65d96d49949bc5 /Lib
parent4114bcc9ef7595a07196bcecf9c7d6d39f57f64d (diff)
downloadcpython-de33df27aaf930be6a34027c530a651f0b4c91f5.zip
cpython-de33df27aaf930be6a34027c530a651f0b4c91f5.tar.gz
cpython-de33df27aaf930be6a34027c530a651f0b4c91f5.tar.bz2
gh-89545: Updates platform module to use new internal _wmi module on Windows to directly query OS properties (GH-96289)
Diffstat (limited to 'Lib')
-rw-r--r--Lib/ntpath.py5
-rwxr-xr-xLib/platform.py148
-rw-r--r--Lib/test/audit-tests.py11
-rw-r--r--Lib/test/test_audit.py15
-rw-r--r--Lib/test/test_platform.py41
-rw-r--r--Lib/test/test_wmi.py67
6 files changed, 228 insertions, 59 deletions
diff --git a/Lib/ntpath.py b/Lib/ntpath.py
index 959bcd0..d9582f4 100644
--- a/Lib/ntpath.py
+++ b/Lib/ntpath.py
@@ -732,9 +732,8 @@ else:
return path
-# Win9x family and earlier have no Unicode filename support.
-supports_unicode_filenames = (hasattr(sys, "getwindowsversion") and
- sys.getwindowsversion()[3] >= 2)
+# All supported version have Unicode filename support.
+supports_unicode_filenames = True
def relpath(path, start=None):
"""Return a relative version of a path"""
diff --git a/Lib/platform.py b/Lib/platform.py
index df8faac..9f5b317 100755
--- a/Lib/platform.py
+++ b/Lib/platform.py
@@ -309,34 +309,52 @@ def _syscmd_ver(system='', release='', version='',
version = _norm_version(version)
return system, release, version
-_WIN32_CLIENT_RELEASES = {
- (5, 0): "2000",
- (5, 1): "XP",
- # Strictly, 5.2 client is XP 64-bit, but platform.py historically
- # has always called it 2003 Server
- (5, 2): "2003Server",
- (5, None): "post2003",
-
- (6, 0): "Vista",
- (6, 1): "7",
- (6, 2): "8",
- (6, 3): "8.1",
- (6, None): "post8.1",
-
- (10, 0): "10",
- (10, None): "post10",
-}
-
-# Server release name lookup will default to client names if necessary
-_WIN32_SERVER_RELEASES = {
- (5, 2): "2003Server",
-
- (6, 0): "2008Server",
- (6, 1): "2008ServerR2",
- (6, 2): "2012Server",
- (6, 3): "2012ServerR2",
- (6, None): "post2012ServerR2",
-}
+try:
+ import _wmi
+except ImportError:
+ def _wmi_query(*keys):
+ raise OSError("not supported")
+else:
+ def _wmi_query(table, *keys):
+ table = {
+ "OS": "Win32_OperatingSystem",
+ "CPU": "Win32_Processor",
+ }[table]
+ data = _wmi.exec_query("SELECT {} FROM {}".format(
+ ",".join(keys),
+ table,
+ )).split("\0")
+ split_data = (i.partition("=") for i in data)
+ dict_data = {i[0]: i[2] for i in split_data}
+ return (dict_data[k] for k in keys)
+
+
+_WIN32_CLIENT_RELEASES = [
+ ((10, 1, 0), "post11"),
+ ((10, 0, 22000), "11"),
+ ((6, 4, 0), "10"),
+ ((6, 3, 0), "8.1"),
+ ((6, 2, 0), "8"),
+ ((6, 1, 0), "7"),
+ ((6, 0, 0), "Vista"),
+ ((5, 2, 3790), "XP64"),
+ ((5, 2, 0), "XPMedia"),
+ ((5, 1, 0), "XP"),
+ ((5, 0, 0), "2000"),
+]
+
+_WIN32_SERVER_RELEASES = [
+ ((10, 1, 0), "post2022Server"),
+ ((10, 0, 20348), "2022Server"),
+ ((10, 0, 17763), "2019Server"),
+ ((6, 4, 0), "2016Server"),
+ ((6, 3, 0), "2012ServerR2"),
+ ((6, 2, 0), "2012Server"),
+ ((6, 1, 0), "2008ServerR2"),
+ ((6, 0, 0), "2008Server"),
+ ((5, 2, 0), "2003Server"),
+ ((5, 0, 0), "2000Server"),
+]
def win32_is_iot():
return win32_edition() in ('IoTUAP', 'NanoServer', 'WindowsCoreHeadless', 'IoTEdgeOS')
@@ -359,22 +377,40 @@ def win32_edition():
return None
-def win32_ver(release='', version='', csd='', ptype=''):
+def _win32_ver(version, csd, ptype):
+ # Try using WMI first, as this is the canonical source of data
+ try:
+ (version, product_type, ptype, spmajor, spminor) = _wmi_query(
+ 'OS',
+ 'Version',
+ 'ProductType',
+ 'BuildType',
+ 'ServicePackMajorVersion',
+ 'ServicePackMinorVersion',
+ )
+ is_client = (int(product_type) == 1)
+ if spminor and spminor != '0':
+ csd = f'SP{spmajor}.{spminor}'
+ else:
+ csd = f'SP{spmajor}'
+ return version, csd, ptype, is_client
+ except OSError:
+ pass
+
+ # Fall back to a combination of sys.getwindowsversion and "ver"
try:
from sys import getwindowsversion
except ImportError:
- return release, version, csd, ptype
+ return version, csd, ptype, True
winver = getwindowsversion()
+ is_client = (getattr(winver, 'product_type', 1) == 1)
try:
- major, minor, build = map(int, _syscmd_ver()[2].split('.'))
+ version = _syscmd_ver()[2]
+ major, minor, build = map(int, version.split('.'))
except ValueError:
major, minor, build = winver.platform_version or winver[:3]
- version = '{0}.{1}.{2}'.format(major, minor, build)
-
- release = (_WIN32_CLIENT_RELEASES.get((major, minor)) or
- _WIN32_CLIENT_RELEASES.get((major, None)) or
- release)
+ version = '{0}.{1}.{2}'.format(major, minor, build)
# getwindowsversion() reflect the compatibility mode Python is
# running under, and so the service pack value is only going to be
@@ -386,12 +422,6 @@ def win32_ver(release='', version='', csd='', ptype=''):
if csd[:13] == 'Service Pack ':
csd = 'SP' + csd[13:]
- # VER_NT_SERVER = 3
- if getattr(winver, 'product_type', None) == 3:
- release = (_WIN32_SERVER_RELEASES.get((major, minor)) or
- _WIN32_SERVER_RELEASES.get((major, None)) or
- release)
-
try:
try:
import winreg
@@ -407,6 +437,18 @@ def win32_ver(release='', version='', csd='', ptype=''):
except OSError:
pass
+ return version, csd, ptype, is_client
+
+def win32_ver(release='', version='', csd='', ptype=''):
+ is_client = False
+
+ version, csd, ptype, is_client = _win32_ver(version, csd, ptype)
+
+ if version:
+ intversion = tuple(map(int, version.split('.')))
+ releases = _WIN32_CLIENT_RELEASES if is_client else _WIN32_SERVER_RELEASES
+ release = next((r for v, r in releases if v <= intversion), release)
+
return release, version, csd, ptype
@@ -725,6 +767,21 @@ def _get_machine_win32():
# http://www.geocities.com/rick_lively/MANUALS/ENV/MSWIN/PROCESSI.HTM
# WOW64 processes mask the native architecture
+ try:
+ [arch, *_] = _wmi_query('CPU', 'Architecture')
+ except OSError:
+ pass
+ else:
+ try:
+ arch = ['x86', 'MIPS', 'Alpha', 'PowerPC', None,
+ 'ARM', 'ia64', None, None,
+ 'AMD64', None, None, 'ARM64',
+ ][int(arch)]
+ except (ValueError, IndexError):
+ pass
+ else:
+ if arch:
+ return arch
return (
os.environ.get('PROCESSOR_ARCHITEW6432', '') or
os.environ.get('PROCESSOR_ARCHITECTURE', '')
@@ -738,7 +795,12 @@ class _Processor:
return func() or ''
def get_win32():
- return os.environ.get('PROCESSOR_IDENTIFIER', _get_machine_win32())
+ try:
+ manufacturer, caption = _wmi_query('CPU', 'Manufacturer', 'Caption')
+ except OSError:
+ return os.environ.get('PROCESSOR_IDENTIFIER', _get_machine_win32())
+ else:
+ return f'{caption}, {manufacturer}'
def get_OpenVMS():
try:
diff --git a/Lib/test/audit-tests.py b/Lib/test/audit-tests.py
index 00333cc..66c08f7 100644
--- a/Lib/test/audit-tests.py
+++ b/Lib/test/audit-tests.py
@@ -419,6 +419,17 @@ def test_sys_getframe():
sys._getframe()
+def test_wmi_exec_query():
+ import _wmi
+
+ def hook(event, args):
+ if event.startswith("_wmi."):
+ print(event, args[0])
+
+ sys.addaudithook(hook)
+ _wmi.exec_query("SELECT * FROM Win32_OperatingSystem")
+
+
if __name__ == "__main__":
from test.support import suppress_msvcrt_asserts
diff --git a/Lib/test/test_audit.py b/Lib/test/test_audit.py
index 18426f2..09b3333 100644
--- a/Lib/test/test_audit.py
+++ b/Lib/test/test_audit.py
@@ -185,5 +185,20 @@ class AuditTest(unittest.TestCase):
self.assertEqual(actual, expected)
+
+ def test_wmi_exec_query(self):
+ import_helper.import_module("_wmi")
+ returncode, events, stderr = self.run_python("test_wmi_exec_query")
+ if returncode:
+ self.fail(stderr)
+
+ if support.verbose:
+ print(*events, sep='\n')
+ actual = [(ev[0], ev[2]) for ev in events]
+ expected = [("_wmi.exec_query", "SELECT * FROM Win32_OperatingSystem")]
+
+ self.assertEqual(actual, expected)
+
+
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_platform.py b/Lib/test/test_platform.py
index 9b2cd20..9c03a89 100644
--- a/Lib/test/test_platform.py
+++ b/Lib/test/test_platform.py
@@ -229,6 +229,14 @@ class PlatformTest(unittest.TestCase):
self.assertEqual(res[-1], res.processor)
self.assertEqual(len(res), 6)
+ @unittest.skipUnless(sys.platform.startswith('win'), "windows only test")
+ def test_uname_win32_without_wmi(self):
+ def raises_oserror(*a):
+ raise OSError()
+
+ with support.swap_attr(platform, '_wmi_query', raises_oserror):
+ self.test_uname()
+
def test_uname_cast_to_tuple(self):
res = platform.uname()
expected = (
@@ -289,20 +297,27 @@ class PlatformTest(unittest.TestCase):
# on 64 bit Windows: if PROCESSOR_ARCHITEW6432 exists we should be
# using it, per
# http://blogs.msdn.com/david.wang/archive/2006/03/26/HOWTO-Detect-Process-Bitness.aspx
- try:
+
+ # We also need to suppress WMI checks, as those are reliable and
+ # overrule the environment variables
+ def raises_oserror(*a):
+ raise OSError()
+
+ with support.swap_attr(platform, '_wmi_query', raises_oserror):
with os_helper.EnvironmentVarGuard() as environ:
- if 'PROCESSOR_ARCHITEW6432' in environ:
- del environ['PROCESSOR_ARCHITEW6432']
- environ['PROCESSOR_ARCHITECTURE'] = 'foo'
- platform._uname_cache = None
- system, node, release, version, machine, processor = platform.uname()
- self.assertEqual(machine, 'foo')
- environ['PROCESSOR_ARCHITEW6432'] = 'bar'
- platform._uname_cache = None
- system, node, release, version, machine, processor = platform.uname()
- self.assertEqual(machine, 'bar')
- finally:
- platform._uname_cache = None
+ try:
+ if 'PROCESSOR_ARCHITEW6432' in environ:
+ del environ['PROCESSOR_ARCHITEW6432']
+ environ['PROCESSOR_ARCHITECTURE'] = 'foo'
+ platform._uname_cache = None
+ system, node, release, version, machine, processor = platform.uname()
+ self.assertEqual(machine, 'foo')
+ environ['PROCESSOR_ARCHITEW6432'] = 'bar'
+ platform._uname_cache = None
+ system, node, release, version, machine, processor = platform.uname()
+ self.assertEqual(machine, 'bar')
+ finally:
+ platform._uname_cache = None
def test_java_ver(self):
res = platform.java_ver()
diff --git a/Lib/test/test_wmi.py b/Lib/test/test_wmi.py
new file mode 100644
index 0000000..af2a453
--- /dev/null
+++ b/Lib/test/test_wmi.py
@@ -0,0 +1,67 @@
+# Test the internal _wmi module on Windows
+# This is used by the platform module, and potentially others
+
+import re
+import sys
+import unittest
+from test.support import import_helper
+
+
+# Do this first so test will be skipped if module doesn't exist
+_wmi = import_helper.import_module('_wmi', required_on=['win'])
+
+
+class WmiTests(unittest.TestCase):
+ def test_wmi_query_os_version(self):
+ r = _wmi.exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0")
+ self.assertEqual(1, len(r))
+ k, eq, v = r[0].partition("=")
+ self.assertEqual("=", eq, r[0])
+ self.assertEqual("Version", k, r[0])
+ # Best we can check for the version is that it's digits, dot, digits, anything
+ # Otherwise, we are likely checking the result of the query against itself
+ self.assertTrue(re.match(r"\d+\.\d+.+$", v), r[0])
+
+ def test_wmi_query_repeated(self):
+ # Repeated queries should not break
+ for _ in range(10):
+ self.test_wmi_query_os_version()
+
+ def test_wmi_query_error(self):
+ # Invalid queries fail with OSError
+ try:
+ _wmi.exec_query("SELECT InvalidColumnName FROM InvalidTableName")
+ except OSError as ex:
+ if ex.winerror & 0xFFFFFFFF == 0x80041010:
+ # This is the expected error code. All others should fail the test
+ return
+ self.fail("Expected OSError")
+
+ def test_wmi_query_repeated_error(self):
+ for _ in range(10):
+ self.test_wmi_query_error()
+
+ def test_wmi_query_not_select(self):
+ # Queries other than SELECT are blocked to avoid potential exploits
+ with self.assertRaises(ValueError):
+ _wmi.exec_query("not select, just in case someone tries something")
+
+ def test_wmi_query_overflow(self):
+ # Ensure very big queries fail
+ # Test multiple times to ensure consistency
+ for _ in range(2):
+ with self.assertRaises(OSError):
+ _wmi.exec_query("SELECT * FROM CIM_DataFile")
+
+ def test_wmi_query_multiple_rows(self):
+ # Multiple instances should have an extra null separator
+ r = _wmi.exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000")
+ self.assertFalse(r.startswith("\0"), r)
+ self.assertFalse(r.endswith("\0"), r)
+ it = iter(r.split("\0"))
+ try:
+ while True:
+ self.assertTrue(re.match(r"ProcessId=\d+", next(it)))
+ self.assertEqual("", next(it))
+ except StopIteration:
+ pass