diff options
author | Brian Curtin <brian.curtin@gmail.com> | 2010-07-08 21:39:08 (GMT) |
---|---|---|
committer | Brian Curtin <brian.curtin@gmail.com> | 2010-07-08 21:39:08 (GMT) |
commit | d40e6f70a5edabffcbfff22912163520da3a29e2 (patch) | |
tree | ef8d44ca974bf606f3437d2ce9977207b8748174 /Lib/test/symlink_support.py | |
parent | 0dd8f7890a3396eaef8c740588c65af9422a65a5 (diff) | |
download | cpython-d40e6f70a5edabffcbfff22912163520da3a29e2.zip cpython-d40e6f70a5edabffcbfff22912163520da3a29e2.tar.gz cpython-d40e6f70a5edabffcbfff22912163520da3a29e2.tar.bz2 |
Implement #1578269. Patch by Jason R. Coombs.
Added Windows support for os.symlink when run on Windows 6.0 or greater,
aka Vista. Previous Windows versions will raise NotImplementedError
when trying to symlink.
Includes numerous test updates and additions to test_os, including
a symlink_support module because of the fact that privilege escalation
is required in order to run the tests to ensure that the user is able
to create symlinks. By default, accounts do not have the required
privilege, so the escalation code will have to be exposed later (or
documented on how to do so). I'll be following up with that work next.
Note that the tests use ctypes, which was agreed on during the PyCon
language summit.
Diffstat (limited to 'Lib/test/symlink_support.py')
-rw-r--r-- | Lib/test/symlink_support.py | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/Lib/test/symlink_support.py b/Lib/test/symlink_support.py new file mode 100644 index 0000000..eab7667 --- /dev/null +++ b/Lib/test/symlink_support.py @@ -0,0 +1,203 @@ +""" +A module built to test if the current process has the privilege to +create symlinks on Windows. +""" + +# allow script to run natively under python 2.6+ +from __future__ import print_function + +import ctypes +from ctypes import wintypes + +GetCurrentProcess = ctypes.windll.kernel32.GetCurrentProcess +GetCurrentProcess.restype = wintypes.HANDLE +OpenProcessToken = ctypes.windll.advapi32.OpenProcessToken +OpenProcessToken.argtypes = (wintypes.HANDLE, wintypes.DWORD, ctypes.POINTER(wintypes.HANDLE)) +OpenProcessToken.restype = wintypes.BOOL + +class LUID(ctypes.Structure): + _fields_ = [ + ('low_part', wintypes.DWORD), + ('high_part', wintypes.LONG), + ] + + def __eq__(self, other): + return ( + self.high_part == other.high_part and + self.low_part == other.low_part + ) + + def __ne__(self, other): + return not (self==other) + +LookupPrivilegeValue = ctypes.windll.advapi32.LookupPrivilegeValueW +LookupPrivilegeValue.argtypes = ( + wintypes.LPWSTR, # system name + wintypes.LPWSTR, # name + ctypes.POINTER(LUID), + ) +LookupPrivilegeValue.restype = wintypes.BOOL + +class TOKEN_INFORMATION_CLASS: + TokenUser = 1 + TokenGroups = 2 + TokenPrivileges = 3 + # ... see http://msdn.microsoft.com/en-us/library/aa379626%28VS.85%29.aspx + +SE_PRIVILEGE_ENABLED_BY_DEFAULT = (0x00000001) +SE_PRIVILEGE_ENABLED = (0x00000002) +SE_PRIVILEGE_REMOVED = (0x00000004) +SE_PRIVILEGE_USED_FOR_ACCESS = (0x80000000) + +class LUID_AND_ATTRIBUTES(ctypes.Structure): + _fields_ = [ + ('LUID', LUID), + ('attributes', wintypes.DWORD), + ] + + def is_enabled(self): + return bool(self.attributes & SE_PRIVILEGE_ENABLED) + + def enable(self): + self.attributes |= SE_PRIVILEGE_ENABLED + + def get_name(self): + size = wintypes.DWORD(10240) + buf = ctypes.create_unicode_buffer(size.value) + res = LookupPrivilegeName(None, self.LUID, buf, size) + if res == 0: + raise RuntimeError + return buf[:size.value] + + def __str__(self): + name = self.name + fmt = ['{name}', '{name} (enabled)'][self.is_enabled()] + return fmt.format(**vars()) + +LookupPrivilegeName = ctypes.windll.advapi32.LookupPrivilegeNameW +LookupPrivilegeName.argtypes = ( + wintypes.LPWSTR, # lpSystemName + ctypes.POINTER(LUID), # lpLuid + wintypes.LPWSTR, # lpName + ctypes.POINTER(wintypes.DWORD), #cchName + ) +LookupPrivilegeName.restype = wintypes.BOOL + +class TOKEN_PRIVILEGES(ctypes.Structure): + _fields_ = [ + ('count', wintypes.DWORD), + ('privileges', LUID_AND_ATTRIBUTES*0), + ] + + def get_array(self): + array_type = LUID_AND_ATTRIBUTES*self.count + privileges = ctypes.cast(self.privileges, ctypes.POINTER(array_type)).contents + return privileges + + def __iter__(self): + return iter(self.get_array()) + +PTOKEN_PRIVILEGES = ctypes.POINTER(TOKEN_PRIVILEGES) + +GetTokenInformation = ctypes.windll.advapi32.GetTokenInformation +GetTokenInformation.argtypes = [ + wintypes.HANDLE, # TokenHandle + ctypes.c_uint, # TOKEN_INFORMATION_CLASS value + ctypes.c_void_p, # TokenInformation + wintypes.DWORD, # TokenInformationLength + ctypes.POINTER(wintypes.DWORD), # ReturnLength + ] +GetTokenInformation.restype = wintypes.BOOL + +# http://msdn.microsoft.com/en-us/library/aa375202%28VS.85%29.aspx +AdjustTokenPrivileges = ctypes.windll.advapi32.AdjustTokenPrivileges +AdjustTokenPrivileges.restype = wintypes.BOOL +AdjustTokenPrivileges.argtypes = [ + wintypes.HANDLE, # TokenHandle + wintypes.BOOL, # DisableAllPrivileges + PTOKEN_PRIVILEGES, # NewState (optional) + wintypes.DWORD, # BufferLength of PreviousState + PTOKEN_PRIVILEGES, # PreviousState (out, optional) + ctypes.POINTER(wintypes.DWORD), # ReturnLength + ] + +def get_process_token(): + "Get the current process token" + token = wintypes.HANDLE() + TOKEN_ALL_ACCESS = 0xf01ff + res = OpenProcessToken(GetCurrentProcess(), TOKEN_ALL_ACCESS, token) + if not res > 0: + raise RuntimeError("Couldn't get process token") + return token + +def get_symlink_luid(): + "Get the LUID for the SeCreateSymbolicLinkPrivilege" + symlink_luid = LUID() + res = LookupPrivilegeValue(None, "SeCreateSymbolicLinkPrivilege", symlink_luid) + if not res > 0: + raise RuntimeError("Couldn't lookup privilege value") + return symlink_luid + +def get_privilege_information(): + "Get all privileges associated with the current process." + # first call with zero length to determine what size buffer we need + + return_length = wintypes.DWORD() + params = [ + get_process_token(), + TOKEN_INFORMATION_CLASS.TokenPrivileges, + None, + 0, + return_length, + ] + + res = GetTokenInformation(*params) + + # assume we now have the necessary length in return_length + + buffer = ctypes.create_string_buffer(return_length.value) + params[2] = buffer + params[3] = return_length.value + + res = GetTokenInformation(*params) + assert res > 0, "Error in second GetTokenInformation (%d)" % res + + privileges = ctypes.cast(buffer, ctypes.POINTER(TOKEN_PRIVILEGES)).contents + return privileges + +def report_privilege_information(): + "Report all privilege information assigned to the current process." + privileges = get_privilege_information() + print("found {0} privileges".format(privileges.count)) + tuple(map(print, privileges)) + +def enable_symlink_privilege(): + """ + Try to assign the symlink privilege to the current process token. + Return True if the assignment is successful. + """ + # create a space in memory for a TOKEN_PRIVILEGES structure + # with one element + size = ctypes.sizeof(TOKEN_PRIVILEGES) + size += ctypes.sizeof(LUID_AND_ATTRIBUTES) + buffer = ctypes.create_string_buffer(size) + tp = ctypes.cast(buffer, ctypes.POINTER(TOKEN_PRIVILEGES)).contents + tp.count = 1 + tp.get_array()[0].enable() + tp.get_array()[0].LUID = get_symlink_luid() + token = get_process_token() + res = AdjustTokenPrivileges(token, False, tp, 0, None, None) + if res == 0: + raise RuntimeError("Error in AdjustTokenPrivileges") + + ERROR_NOT_ALL_ASSIGNED = 1300 + return ctypes.windll.kernel32.GetLastError() != ERROR_NOT_ALL_ASSIGNED + +def main(): + assigned = enable_symlink_privilege() + msg = ['failure', 'success'][assigned] + + print("Symlink privilege assignment completed with {0}".format(msg)) + +if __name__ == '__main__': + main() |