diff options
Diffstat (limited to 'Lib/test/symlink_support.py')
-rw-r--r-- | Lib/test/symlink_support.py | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/Lib/test/symlink_support.py b/Lib/test/symlink_support.py new file mode 100644 index 0000000..eab7667 --- /dev/null +++ b/Lib/test/symlink_support.py @@ -0,0 +1,203 @@ +""" +A module built to test if the current process has the privilege to +create symlinks on Windows. +""" + +# allow script to run natively under python 2.6+ +from __future__ import print_function + +import ctypes +from ctypes import wintypes + +GetCurrentProcess = ctypes.windll.kernel32.GetCurrentProcess +GetCurrentProcess.restype = wintypes.HANDLE +OpenProcessToken = ctypes.windll.advapi32.OpenProcessToken +OpenProcessToken.argtypes = (wintypes.HANDLE, wintypes.DWORD, ctypes.POINTER(wintypes.HANDLE)) +OpenProcessToken.restype = wintypes.BOOL + +class LUID(ctypes.Structure): + _fields_ = [ + ('low_part', wintypes.DWORD), + ('high_part', wintypes.LONG), + ] + + def __eq__(self, other): + return ( + self.high_part == other.high_part and + self.low_part == other.low_part + ) + + def __ne__(self, other): + return not (self==other) + +LookupPrivilegeValue = ctypes.windll.advapi32.LookupPrivilegeValueW +LookupPrivilegeValue.argtypes = ( + wintypes.LPWSTR, # system name + wintypes.LPWSTR, # name + ctypes.POINTER(LUID), + ) +LookupPrivilegeValue.restype = wintypes.BOOL + +class TOKEN_INFORMATION_CLASS: + TokenUser = 1 + TokenGroups = 2 + TokenPrivileges = 3 + # ... see http://msdn.microsoft.com/en-us/library/aa379626%28VS.85%29.aspx + +SE_PRIVILEGE_ENABLED_BY_DEFAULT = (0x00000001) +SE_PRIVILEGE_ENABLED = (0x00000002) +SE_PRIVILEGE_REMOVED = (0x00000004) +SE_PRIVILEGE_USED_FOR_ACCESS = (0x80000000) + +class LUID_AND_ATTRIBUTES(ctypes.Structure): + _fields_ = [ + ('LUID', LUID), + ('attributes', wintypes.DWORD), + ] + + def is_enabled(self): + return bool(self.attributes & SE_PRIVILEGE_ENABLED) + + def enable(self): + self.attributes |= SE_PRIVILEGE_ENABLED + + def get_name(self): + size = wintypes.DWORD(10240) + buf = ctypes.create_unicode_buffer(size.value) + res = LookupPrivilegeName(None, self.LUID, buf, size) + if res == 0: + raise RuntimeError + return buf[:size.value] + + def __str__(self): + name = self.name + fmt = ['{name}', '{name} (enabled)'][self.is_enabled()] + return fmt.format(**vars()) + +LookupPrivilegeName = ctypes.windll.advapi32.LookupPrivilegeNameW +LookupPrivilegeName.argtypes = ( + wintypes.LPWSTR, # lpSystemName + ctypes.POINTER(LUID), # lpLuid + wintypes.LPWSTR, # lpName + ctypes.POINTER(wintypes.DWORD), #cchName + ) +LookupPrivilegeName.restype = wintypes.BOOL + +class TOKEN_PRIVILEGES(ctypes.Structure): + _fields_ = [ + ('count', wintypes.DWORD), + ('privileges', LUID_AND_ATTRIBUTES*0), + ] + + def get_array(self): + array_type = LUID_AND_ATTRIBUTES*self.count + privileges = ctypes.cast(self.privileges, ctypes.POINTER(array_type)).contents + return privileges + + def __iter__(self): + return iter(self.get_array()) + +PTOKEN_PRIVILEGES = ctypes.POINTER(TOKEN_PRIVILEGES) + +GetTokenInformation = ctypes.windll.advapi32.GetTokenInformation +GetTokenInformation.argtypes = [ + wintypes.HANDLE, # TokenHandle + ctypes.c_uint, # TOKEN_INFORMATION_CLASS value + ctypes.c_void_p, # TokenInformation + wintypes.DWORD, # TokenInformationLength + ctypes.POINTER(wintypes.DWORD), # ReturnLength + ] +GetTokenInformation.restype = wintypes.BOOL + +# http://msdn.microsoft.com/en-us/library/aa375202%28VS.85%29.aspx +AdjustTokenPrivileges = ctypes.windll.advapi32.AdjustTokenPrivileges +AdjustTokenPrivileges.restype = wintypes.BOOL +AdjustTokenPrivileges.argtypes = [ + wintypes.HANDLE, # TokenHandle + wintypes.BOOL, # DisableAllPrivileges + PTOKEN_PRIVILEGES, # NewState (optional) + wintypes.DWORD, # BufferLength of PreviousState + PTOKEN_PRIVILEGES, # PreviousState (out, optional) + ctypes.POINTER(wintypes.DWORD), # ReturnLength + ] + +def get_process_token(): + "Get the current process token" + token = wintypes.HANDLE() + TOKEN_ALL_ACCESS = 0xf01ff + res = OpenProcessToken(GetCurrentProcess(), TOKEN_ALL_ACCESS, token) + if not res > 0: + raise RuntimeError("Couldn't get process token") + return token + +def get_symlink_luid(): + "Get the LUID for the SeCreateSymbolicLinkPrivilege" + symlink_luid = LUID() + res = LookupPrivilegeValue(None, "SeCreateSymbolicLinkPrivilege", symlink_luid) + if not res > 0: + raise RuntimeError("Couldn't lookup privilege value") + return symlink_luid + +def get_privilege_information(): + "Get all privileges associated with the current process." + # first call with zero length to determine what size buffer we need + + return_length = wintypes.DWORD() + params = [ + get_process_token(), + TOKEN_INFORMATION_CLASS.TokenPrivileges, + None, + 0, + return_length, + ] + + res = GetTokenInformation(*params) + + # assume we now have the necessary length in return_length + + buffer = ctypes.create_string_buffer(return_length.value) + params[2] = buffer + params[3] = return_length.value + + res = GetTokenInformation(*params) + assert res > 0, "Error in second GetTokenInformation (%d)" % res + + privileges = ctypes.cast(buffer, ctypes.POINTER(TOKEN_PRIVILEGES)).contents + return privileges + +def report_privilege_information(): + "Report all privilege information assigned to the current process." + privileges = get_privilege_information() + print("found {0} privileges".format(privileges.count)) + tuple(map(print, privileges)) + +def enable_symlink_privilege(): + """ + Try to assign the symlink privilege to the current process token. + Return True if the assignment is successful. + """ + # create a space in memory for a TOKEN_PRIVILEGES structure + # with one element + size = ctypes.sizeof(TOKEN_PRIVILEGES) + size += ctypes.sizeof(LUID_AND_ATTRIBUTES) + buffer = ctypes.create_string_buffer(size) + tp = ctypes.cast(buffer, ctypes.POINTER(TOKEN_PRIVILEGES)).contents + tp.count = 1 + tp.get_array()[0].enable() + tp.get_array()[0].LUID = get_symlink_luid() + token = get_process_token() + res = AdjustTokenPrivileges(token, False, tp, 0, None, None) + if res == 0: + raise RuntimeError("Error in AdjustTokenPrivileges") + + ERROR_NOT_ALL_ASSIGNED = 1300 + return ctypes.windll.kernel32.GetLastError() != ERROR_NOT_ALL_ASSIGNED + +def main(): + assigned = enable_symlink_privilege() + msg = ['failure', 'success'][assigned] + + print("Symlink privilege assignment completed with {0}".format(msg)) + +if __name__ == '__main__': + main() |