diff options
Diffstat (limited to 'Lib/os.py')
| -rw-r--r-- | Lib/os.py | 535 |
1 files changed, 309 insertions, 226 deletions
@@ -2,12 +2,12 @@ r"""OS routines for Mac, NT, or Posix depending on what system we're on. This exports: - all functions from posix, nt, os2, or ce, e.g. unlink, stat, etc. - - os.path is one of the modules posixpath, or ntpath - - os.name is 'posix', 'nt', 'os2', 'ce' or 'riscos' + - os.path is either posixpath or ntpath + - os.name is either 'posix', 'nt', 'os2' or 'ce'. - os.curdir is a string representing the current directory ('.' or ':') - os.pardir is a string representing the parent directory ('..' or '::') - os.sep is the (or a most common) pathname separator ('/' or ':' or '\\') - - os.extsep is the extension separator ('.' or '/') + - os.extsep is the extension separator (always '.') - os.altsep is the alternate pathname separator (None or '/') - os.pathsep is the component separator used in $PATH etc - os.linesep is the line separator in text files ('\r' or '\n' or '\r\n') @@ -28,7 +28,7 @@ import sys, errno _names = sys.builtin_module_names # Note: more names are added to __all__ later. -__all__ = ["altsep", "curdir", "pardir", "sep", "extsep", "pathsep", "linesep", +__all__ = ["altsep", "curdir", "pardir", "sep", "pathsep", "linesep", "defpath", "name", "path", "devnull", "SEEK_SET", "SEEK_CUR", "SEEK_END"] @@ -99,22 +99,8 @@ elif 'ce' in _names: __all__.extend(_get_exports_list(ce)) del ce -elif 'riscos' in _names: - name = 'riscos' - linesep = '\n' - from riscos import * - try: - from riscos import _exit - except ImportError: - pass - import riscospath as path - - import riscos - __all__.extend(_get_exports_list(riscos)) - del riscos - else: - raise ImportError, 'no os specific module found' + raise ImportError('no os specific module found') sys.modules['os.path'] = path from os.path import (curdir, pardir, sep, pathsep, defpath, extsep, altsep, @@ -128,18 +114,26 @@ SEEK_SET = 0 SEEK_CUR = 1 SEEK_END = 2 + +def _get_masked_mode(mode): + mask = umask(0) + umask(mask) + return mode & ~mask + #' # Super directory utilities. # (Inspired by Eric Raymond; the doc strings are mostly his) -def makedirs(name, mode=0777): - """makedirs(path [, mode=0777]) +def makedirs(name, mode=0o777, exist_ok=False): + """makedirs(path [, mode=0o777][, exist_ok=False]) Super-mkdir; create a leaf directory and all intermediate ones. Works like mkdir, except that any intermediate path segment (not - just the rightmost) will be created if it does not exist. This is - recursive. + just the rightmost) will be created if it does not exist. If the + target directory with the same mode as we specified already exists, + raises an OSError if exist_ok is False, otherwise no exception is + raised. This is recursive. """ head, tail = path.split(name) @@ -147,14 +141,29 @@ def makedirs(name, mode=0777): head, tail = path.split(head) if head and tail and not path.exists(head): try: - makedirs(head, mode) - except OSError, e: + makedirs(head, mode, exist_ok) + except OSError as e: # be happy if someone already created the path if e.errno != errno.EEXIST: raise if tail == curdir: # xxx/newdir/. exists if xxx/newdir exists return - mkdir(name, mode) + try: + mkdir(name, mode) + except OSError as e: + import stat as st + dir_exists = path.isdir(name) + expected_mode = _get_masked_mode(mode) + if dir_exists: + # S_ISGID is automatically copied by the OS from parent to child + # directories on mkdir. Don't consider it being set to be a mode + # mismatch as mkdir does not unset it when not specified in mode. + actual_mode = st.S_IMODE(lstat(name).st_mode) & ~st.S_ISGID + else: + actual_mode = -1 + if not (e.errno == errno.EEXIST and exist_ok and dir_exists and + actual_mode == expected_mode): + raise def removedirs(name): """removedirs(path) @@ -256,9 +265,9 @@ def walk(top, topdown=True, onerror=None, followlinks=False): import os from os.path import join, getsize for root, dirs, files in os.walk('python/Lib/email'): - print root, "consumes", - print sum([getsize(join(root, name)) for name in files]), - print "bytes in", len(files), "non-directory files" + print(root, "consumes", end="") + print(sum([getsize(join(root, name)) for name in files]), end="") + print("bytes in", len(files), "non-directory files") if 'CVS' in dirs: dirs.remove('CVS') # don't visit CVS directories """ @@ -266,7 +275,7 @@ def walk(top, topdown=True, onerror=None, followlinks=False): islink, join, isdir = path.islink, path.join, path.isdir # We may not have read permission for top, in which case we can't - # get a list of the files the directory contains. os.path.walk + # get a list of the files the directory contains. os.walk # always suppressed the exception then, rather than blow up for a # minor reason when (say) a thousand readable directories are still # left to visit. That logic is copied here. @@ -274,7 +283,7 @@ def walk(top, topdown=True, onerror=None, followlinks=False): # Note that listdir and error are globals in this module due # to earlier import-*. names = listdir(top) - except error, err: + except error as err: if onerror is not None: onerror(err) return @@ -356,162 +365,247 @@ __all__.extend(["execl","execle","execlp","execlpe","execvp","execvpe"]) def _execvpe(file, args, env=None): if env is not None: - func = execve + exec_func = execve argrest = (args, env) else: - func = execv + exec_func = execv argrest = (args,) env = environ head, tail = path.split(file) if head: - func(file, *argrest) + exec_func(file, *argrest) return - if 'PATH' in env: - envpath = env['PATH'] - else: - envpath = defpath - PATH = envpath.split(pathsep) - saved_exc = None + last_exc = saved_exc = None saved_tb = None - for dir in PATH: + path_list = get_exec_path(env) + if name != 'nt': + file = fsencode(file) + path_list = map(fsencode, path_list) + for dir in path_list: fullname = path.join(dir, file) try: - func(fullname, *argrest) - except error, e: + exec_func(fullname, *argrest) + except error as e: + last_exc = e tb = sys.exc_info()[2] if (e.errno != errno.ENOENT and e.errno != errno.ENOTDIR and saved_exc is None): saved_exc = e saved_tb = tb if saved_exc: - raise error, saved_exc, saved_tb - raise error, e, tb + raise saved_exc.with_traceback(saved_tb) + raise last_exc.with_traceback(tb) + + +def get_exec_path(env=None): + """Returns the sequence of directories that will be searched for the + named executable (similar to a shell) when launching a process. + + *env* must be an environment variable dict or None. If *env* is None, + os.environ will be used. + """ + # Use a local import instead of a global import to limit the number of + # modules loaded at startup: the os module is always loaded at startup by + # Python. It may also avoid a bootstrap issue. + import warnings + + if env is None: + env = environ + + # {b'PATH': ...}.get('PATH') and {'PATH': ...}.get(b'PATH') emit a + # BytesWarning when using python -b or python -bb: ignore the warning + with warnings.catch_warnings(): + warnings.simplefilter("ignore", BytesWarning) + + try: + path_list = env.get('PATH') + except TypeError: + path_list = None + + if supports_bytes_environ: + try: + path_listb = env[b'PATH'] + except (KeyError, TypeError): + pass + else: + if path_list is not None: + raise ValueError( + "env cannot contain 'PATH' and b'PATH' keys") + path_list = path_listb + + if path_list is not None and isinstance(path_list, bytes): + path_list = fsdecode(path_list) + + if path_list is None: + path_list = defpath + return path_list.split(pathsep) + + +# Change environ to automatically call putenv(), unsetenv if they exist. +from _abcoll import MutableMapping # Can't use collections (bootstrap) + +class _Environ(MutableMapping): + def __init__(self, data, encodekey, decodekey, encodevalue, decodevalue, putenv, unsetenv): + self.encodekey = encodekey + self.decodekey = decodekey + self.encodevalue = encodevalue + self.decodevalue = decodevalue + self.putenv = putenv + self.unsetenv = unsetenv + self._data = data + + def __getitem__(self, key): + value = self._data[self.encodekey(key)] + return self.decodevalue(value) + + def __setitem__(self, key, value): + key = self.encodekey(key) + value = self.encodevalue(value) + self.putenv(key, value) + self._data[key] = value + + def __delitem__(self, key): + key = self.encodekey(key) + self.unsetenv(key) + del self._data[key] + + def __iter__(self): + for key in self._data: + yield self.decodekey(key) + + def __len__(self): + return len(self._data) + + def __repr__(self): + return 'environ({{{}}})'.format(', '.join( + ('{!r}: {!r}'.format(self.decodekey(key), self.decodevalue(value)) + for key, value in self._data.items()))) + + def copy(self): + return dict(self) + + def setdefault(self, key, value): + if key not in self: + self[key] = value + return self[key] -# Change environ to automatically call putenv() if it exists try: - # This will fail if there's no putenv - putenv + _putenv = putenv except NameError: - pass + _putenv = lambda key, value: None else: - import UserDict + __all__.append("putenv") - # Fake unsetenv() for Windows - # not sure about os2 here but - # I'm guessing they are the same. +try: + _unsetenv = unsetenv +except NameError: + _unsetenv = lambda key: _putenv(key, "") +else: + __all__.append("unsetenv") +def _createenviron(): if name in ('os2', 'nt'): - def unsetenv(key): - putenv(key, "") - - if name == "riscos": - # On RISC OS, all env access goes through getenv and putenv - from riscosenviron import _Environ - elif name in ('os2', 'nt'): # Where Env Var Names Must Be UPPERCASE - # But we store them as upper case - class _Environ(UserDict.IterableUserDict): - def __init__(self, environ): - UserDict.UserDict.__init__(self) - data = self.data - for k, v in environ.items(): - data[k.upper()] = v - def __setitem__(self, key, item): - putenv(key, item) - self.data[key.upper()] = item - def __getitem__(self, key): - return self.data[key.upper()] - try: - unsetenv - except NameError: - def __delitem__(self, key): - del self.data[key.upper()] - else: - def __delitem__(self, key): - unsetenv(key) - del self.data[key.upper()] - def clear(self): - for key in self.data.keys(): - unsetenv(key) - del self.data[key] - def pop(self, key, *args): - unsetenv(key) - return self.data.pop(key.upper(), *args) - def has_key(self, key): - return key.upper() in self.data - def __contains__(self, key): - return key.upper() in self.data - def get(self, key, failobj=None): - return self.data.get(key.upper(), failobj) - def update(self, dict=None, **kwargs): - if dict: - try: - keys = dict.keys() - except AttributeError: - # List of (key, value) - for k, v in dict: - self[k] = v - else: - # got keys - # cannot use items(), since mappings - # may not have them. - for k in keys: - self[k] = dict[k] - if kwargs: - self.update(kwargs) - def copy(self): - return dict(self) - - else: # Where Env Var Names Can Be Mixed Case - class _Environ(UserDict.IterableUserDict): - def __init__(self, environ): - UserDict.UserDict.__init__(self) - self.data = environ - def __setitem__(self, key, item): - putenv(key, item) - self.data[key] = item - def update(self, dict=None, **kwargs): - if dict: - try: - keys = dict.keys() - except AttributeError: - # List of (key, value) - for k, v in dict: - self[k] = v - else: - # got keys - # cannot use items(), since mappings - # may not have them. - for k in keys: - self[k] = dict[k] - if kwargs: - self.update(kwargs) - try: - unsetenv - except NameError: - pass - else: - def __delitem__(self, key): - unsetenv(key) - del self.data[key] - def clear(self): - for key in self.data.keys(): - unsetenv(key) - del self.data[key] - def pop(self, key, *args): - unsetenv(key) - return self.data.pop(key, *args) - def copy(self): - return dict(self) - - - environ = _Environ(environ) + # Where Env Var Names Must Be UPPERCASE + def check_str(value): + if not isinstance(value, str): + raise TypeError("str expected, not %s" % type(value).__name__) + return value + encode = check_str + decode = str + def encodekey(key): + return encode(key).upper() + data = {} + for key, value in environ.items(): + data[encodekey(key)] = value + else: + # Where Env Var Names Can Be Mixed Case + encoding = sys.getfilesystemencoding() + def encode(value): + if not isinstance(value, str): + raise TypeError("str expected, not %s" % type(value).__name__) + return value.encode(encoding, 'surrogateescape') + def decode(value): + return value.decode(encoding, 'surrogateescape') + encodekey = encode + data = environ + return _Environ(data, + encodekey, decode, + encode, decode, + _putenv, _unsetenv) + +# unicode environ +environ = _createenviron() +del _createenviron + def getenv(key, default=None): """Get an environment variable, return None if it doesn't exist. - The optional second argument can specify an alternate default.""" + The optional second argument can specify an alternate default. + key, default and the result are str.""" return environ.get(key, default) -__all__.append("getenv") + +supports_bytes_environ = name not in ('os2', 'nt') +__all__.extend(("getenv", "supports_bytes_environ")) + +if supports_bytes_environ: + def _check_bytes(value): + if not isinstance(value, bytes): + raise TypeError("bytes expected, not %s" % type(value).__name__) + return value + + # bytes environ + environb = _Environ(environ._data, + _check_bytes, bytes, + _check_bytes, bytes, + _putenv, _unsetenv) + del _check_bytes + + def getenvb(key, default=None): + """Get an environment variable, return None if it doesn't exist. + The optional second argument can specify an alternate default. + key, default and the result are bytes.""" + return environb.get(key, default) + + __all__.extend(("environb", "getenvb")) + +def _fscodec(): + encoding = sys.getfilesystemencoding() + if encoding == 'mbcs': + errors = 'strict' + else: + errors = 'surrogateescape' + + def fsencode(filename): + """ + Encode filename to the filesystem encoding with 'surrogateescape' error + handler, return bytes unchanged. On Windows, use 'strict' error handler if + the file system encoding is 'mbcs' (which is the default encoding). + """ + if isinstance(filename, bytes): + return filename + elif isinstance(filename, str): + return filename.encode(encoding, errors) + else: + raise TypeError("expect bytes or str, not %s" % type(filename).__name__) + + def fsdecode(filename): + """ + Decode filename from the filesystem encoding with 'surrogateescape' error + handler, return str unchanged. On Windows, use 'strict' error handler if + the file system encoding is 'mbcs' (which is the default encoding). + """ + if isinstance(filename, str): + return filename + elif isinstance(filename, bytes): + return filename.decode(encoding, errors) + else: + raise TypeError("expect bytes or str, not %s" % type(filename).__name__) + + return fsencode, fsdecode + +fsencode, fsdecode = _fscodec() +del _fscodec def _exists(name): return name in globals() @@ -551,7 +645,7 @@ if _exists("fork") and not _exists("spawnv") and _exists("execv"): elif WIFEXITED(sts): return WEXITSTATUS(sts) else: - raise error, "Not stopped, signaled or exited???" + raise error("Not stopped, signaled or exited???") def spawnv(mode, file, args): """spawnv(mode, file, args) -> integer @@ -649,70 +743,7 @@ otherwise return -SIG, where SIG is the signal that killed it. """ __all__.extend(["spawnvp", "spawnvpe", "spawnlp", "spawnlpe",]) - -# Supply popen2 etc. (for Unix) -if _exists("fork"): - if not _exists("popen2"): - def popen2(cmd, mode="t", bufsize=-1): - """Execute the shell command 'cmd' in a sub-process. On UNIX, 'cmd' - may be a sequence, in which case arguments will be passed directly to - the program without shell intervention (as with os.spawnv()). If 'cmd' - is a string it will be passed to the shell (as with os.system()). If - 'bufsize' is specified, it sets the buffer size for the I/O pipes. The - file objects (child_stdin, child_stdout) are returned.""" - import warnings - msg = "os.popen2 is deprecated. Use the subprocess module." - warnings.warn(msg, DeprecationWarning, stacklevel=2) - - import subprocess - PIPE = subprocess.PIPE - p = subprocess.Popen(cmd, shell=isinstance(cmd, basestring), - bufsize=bufsize, stdin=PIPE, stdout=PIPE, - close_fds=True) - return p.stdin, p.stdout - __all__.append("popen2") - - if not _exists("popen3"): - def popen3(cmd, mode="t", bufsize=-1): - """Execute the shell command 'cmd' in a sub-process. On UNIX, 'cmd' - may be a sequence, in which case arguments will be passed directly to - the program without shell intervention (as with os.spawnv()). If 'cmd' - is a string it will be passed to the shell (as with os.system()). If - 'bufsize' is specified, it sets the buffer size for the I/O pipes. The - file objects (child_stdin, child_stdout, child_stderr) are returned.""" - import warnings - msg = "os.popen3 is deprecated. Use the subprocess module." - warnings.warn(msg, DeprecationWarning, stacklevel=2) - - import subprocess - PIPE = subprocess.PIPE - p = subprocess.Popen(cmd, shell=isinstance(cmd, basestring), - bufsize=bufsize, stdin=PIPE, stdout=PIPE, - stderr=PIPE, close_fds=True) - return p.stdin, p.stdout, p.stderr - __all__.append("popen3") - - if not _exists("popen4"): - def popen4(cmd, mode="t", bufsize=-1): - """Execute the shell command 'cmd' in a sub-process. On UNIX, 'cmd' - may be a sequence, in which case arguments will be passed directly to - the program without shell intervention (as with os.spawnv()). If 'cmd' - is a string it will be passed to the shell (as with os.system()). If - 'bufsize' is specified, it sets the buffer size for the I/O pipes. The - file objects (child_stdin, child_stdout_stderr) are returned.""" - import warnings - msg = "os.popen4 is deprecated. Use the subprocess module." - warnings.warn(msg, DeprecationWarning, stacklevel=2) - - import subprocess - PIPE = subprocess.PIPE - p = subprocess.Popen(cmd, shell=isinstance(cmd, basestring), - bufsize=bufsize, stdin=PIPE, stdout=PIPE, - stderr=subprocess.STDOUT, close_fds=True) - return p.stdin, p.stdout - __all__.append("popen4") - -import copy_reg as _copy_reg +import copyreg as _copyreg def _make_stat_result(tup, dict): return stat_result(tup, dict) @@ -722,7 +753,7 @@ def _pickle_stat_result(sr): return (_make_stat_result, args) try: - _copy_reg.pickle(stat_result, _pickle_stat_result, _make_stat_result) + _copyreg.pickle(stat_result, _pickle_stat_result, _make_stat_result) except NameError: # stat_result may not exist pass @@ -734,7 +765,59 @@ def _pickle_statvfs_result(sr): return (_make_statvfs_result, args) try: - _copy_reg.pickle(statvfs_result, _pickle_statvfs_result, + _copyreg.pickle(statvfs_result, _pickle_statvfs_result, _make_statvfs_result) except NameError: # statvfs_result may not exist pass + +# Supply os.popen() +def popen(cmd, mode="r", buffering=-1): + if not isinstance(cmd, str): + raise TypeError("invalid cmd type (%s, expected string)" % type(cmd)) + if mode not in ("r", "w"): + raise ValueError("invalid mode %r" % mode) + if buffering == 0 or buffering == None: + raise ValueError("popen() does not support unbuffered streams") + import subprocess, io + if mode == "r": + proc = subprocess.Popen(cmd, + shell=True, + stdout=subprocess.PIPE, + bufsize=buffering) + return _wrap_close(io.TextIOWrapper(proc.stdout), proc) + else: + proc = subprocess.Popen(cmd, + shell=True, + stdin=subprocess.PIPE, + bufsize=buffering) + return _wrap_close(io.TextIOWrapper(proc.stdin), proc) + +# Helper for popen() -- a proxy for a file whose close waits for the process +class _wrap_close: + def __init__(self, stream, proc): + self._stream = stream + self._proc = proc + def close(self): + self._stream.close() + returncode = self._proc.wait() + if returncode == 0: + return None + if name == 'nt': + return returncode + else: + return returncode << 8 # Shift left to match old behavior + def __enter__(self): + return self + def __exit__(self, *args): + self.close() + def __getattr__(self, name): + return getattr(self._stream, name) + def __iter__(self): + return iter(self._stream) + +# Supply os.fdopen() +def fdopen(fd, *args, **kwargs): + if not isinstance(fd, int): + raise TypeError("invalid fd type (%s, expected integer)" % type(fd)) + import io + return io.open(fd, *args, **kwargs) |
