summaryrefslogtreecommitdiffstats
path: root/Lib/os.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/os.py')
-rw-r--r--Lib/os.py535
1 files changed, 309 insertions, 226 deletions
diff --git a/Lib/os.py b/Lib/os.py
index 6d598f3..d1101a2 100644
--- a/Lib/os.py
+++ b/Lib/os.py
@@ -2,12 +2,12 @@ r"""OS routines for Mac, NT, or Posix depending on what system we're on.
This exports:
- all functions from posix, nt, os2, or ce, e.g. unlink, stat, etc.
- - os.path is one of the modules posixpath, or ntpath
- - os.name is 'posix', 'nt', 'os2', 'ce' or 'riscos'
+ - os.path is either posixpath or ntpath
+ - os.name is either 'posix', 'nt', 'os2' or 'ce'.
- os.curdir is a string representing the current directory ('.' or ':')
- os.pardir is a string representing the parent directory ('..' or '::')
- os.sep is the (or a most common) pathname separator ('/' or ':' or '\\')
- - os.extsep is the extension separator ('.' or '/')
+ - os.extsep is the extension separator (always '.')
- os.altsep is the alternate pathname separator (None or '/')
- os.pathsep is the component separator used in $PATH etc
- os.linesep is the line separator in text files ('\r' or '\n' or '\r\n')
@@ -28,7 +28,7 @@ import sys, errno
_names = sys.builtin_module_names
# Note: more names are added to __all__ later.
-__all__ = ["altsep", "curdir", "pardir", "sep", "extsep", "pathsep", "linesep",
+__all__ = ["altsep", "curdir", "pardir", "sep", "pathsep", "linesep",
"defpath", "name", "path", "devnull",
"SEEK_SET", "SEEK_CUR", "SEEK_END"]
@@ -99,22 +99,8 @@ elif 'ce' in _names:
__all__.extend(_get_exports_list(ce))
del ce
-elif 'riscos' in _names:
- name = 'riscos'
- linesep = '\n'
- from riscos import *
- try:
- from riscos import _exit
- except ImportError:
- pass
- import riscospath as path
-
- import riscos
- __all__.extend(_get_exports_list(riscos))
- del riscos
-
else:
- raise ImportError, 'no os specific module found'
+ raise ImportError('no os specific module found')
sys.modules['os.path'] = path
from os.path import (curdir, pardir, sep, pathsep, defpath, extsep, altsep,
@@ -128,18 +114,26 @@ SEEK_SET = 0
SEEK_CUR = 1
SEEK_END = 2
+
+def _get_masked_mode(mode):
+ mask = umask(0)
+ umask(mask)
+ return mode & ~mask
+
#'
# Super directory utilities.
# (Inspired by Eric Raymond; the doc strings are mostly his)
-def makedirs(name, mode=0777):
- """makedirs(path [, mode=0777])
+def makedirs(name, mode=0o777, exist_ok=False):
+ """makedirs(path [, mode=0o777][, exist_ok=False])
Super-mkdir; create a leaf directory and all intermediate ones.
Works like mkdir, except that any intermediate path segment (not
- just the rightmost) will be created if it does not exist. This is
- recursive.
+ just the rightmost) will be created if it does not exist. If the
+ target directory with the same mode as we specified already exists,
+ raises an OSError if exist_ok is False, otherwise no exception is
+ raised. This is recursive.
"""
head, tail = path.split(name)
@@ -147,14 +141,29 @@ def makedirs(name, mode=0777):
head, tail = path.split(head)
if head and tail and not path.exists(head):
try:
- makedirs(head, mode)
- except OSError, e:
+ makedirs(head, mode, exist_ok)
+ except OSError as e:
# be happy if someone already created the path
if e.errno != errno.EEXIST:
raise
if tail == curdir: # xxx/newdir/. exists if xxx/newdir exists
return
- mkdir(name, mode)
+ try:
+ mkdir(name, mode)
+ except OSError as e:
+ import stat as st
+ dir_exists = path.isdir(name)
+ expected_mode = _get_masked_mode(mode)
+ if dir_exists:
+ # S_ISGID is automatically copied by the OS from parent to child
+ # directories on mkdir. Don't consider it being set to be a mode
+ # mismatch as mkdir does not unset it when not specified in mode.
+ actual_mode = st.S_IMODE(lstat(name).st_mode) & ~st.S_ISGID
+ else:
+ actual_mode = -1
+ if not (e.errno == errno.EEXIST and exist_ok and dir_exists and
+ actual_mode == expected_mode):
+ raise
def removedirs(name):
"""removedirs(path)
@@ -256,9 +265,9 @@ def walk(top, topdown=True, onerror=None, followlinks=False):
import os
from os.path import join, getsize
for root, dirs, files in os.walk('python/Lib/email'):
- print root, "consumes",
- print sum([getsize(join(root, name)) for name in files]),
- print "bytes in", len(files), "non-directory files"
+ print(root, "consumes", end="")
+ print(sum([getsize(join(root, name)) for name in files]), end="")
+ print("bytes in", len(files), "non-directory files")
if 'CVS' in dirs:
dirs.remove('CVS') # don't visit CVS directories
"""
@@ -266,7 +275,7 @@ def walk(top, topdown=True, onerror=None, followlinks=False):
islink, join, isdir = path.islink, path.join, path.isdir
# We may not have read permission for top, in which case we can't
- # get a list of the files the directory contains. os.path.walk
+ # get a list of the files the directory contains. os.walk
# always suppressed the exception then, rather than blow up for a
# minor reason when (say) a thousand readable directories are still
# left to visit. That logic is copied here.
@@ -274,7 +283,7 @@ def walk(top, topdown=True, onerror=None, followlinks=False):
# Note that listdir and error are globals in this module due
# to earlier import-*.
names = listdir(top)
- except error, err:
+ except error as err:
if onerror is not None:
onerror(err)
return
@@ -356,162 +365,247 @@ __all__.extend(["execl","execle","execlp","execlpe","execvp","execvpe"])
def _execvpe(file, args, env=None):
if env is not None:
- func = execve
+ exec_func = execve
argrest = (args, env)
else:
- func = execv
+ exec_func = execv
argrest = (args,)
env = environ
head, tail = path.split(file)
if head:
- func(file, *argrest)
+ exec_func(file, *argrest)
return
- if 'PATH' in env:
- envpath = env['PATH']
- else:
- envpath = defpath
- PATH = envpath.split(pathsep)
- saved_exc = None
+ last_exc = saved_exc = None
saved_tb = None
- for dir in PATH:
+ path_list = get_exec_path(env)
+ if name != 'nt':
+ file = fsencode(file)
+ path_list = map(fsencode, path_list)
+ for dir in path_list:
fullname = path.join(dir, file)
try:
- func(fullname, *argrest)
- except error, e:
+ exec_func(fullname, *argrest)
+ except error as e:
+ last_exc = e
tb = sys.exc_info()[2]
if (e.errno != errno.ENOENT and e.errno != errno.ENOTDIR
and saved_exc is None):
saved_exc = e
saved_tb = tb
if saved_exc:
- raise error, saved_exc, saved_tb
- raise error, e, tb
+ raise saved_exc.with_traceback(saved_tb)
+ raise last_exc.with_traceback(tb)
+
+
+def get_exec_path(env=None):
+ """Returns the sequence of directories that will be searched for the
+ named executable (similar to a shell) when launching a process.
+
+ *env* must be an environment variable dict or None. If *env* is None,
+ os.environ will be used.
+ """
+ # Use a local import instead of a global import to limit the number of
+ # modules loaded at startup: the os module is always loaded at startup by
+ # Python. It may also avoid a bootstrap issue.
+ import warnings
+
+ if env is None:
+ env = environ
+
+ # {b'PATH': ...}.get('PATH') and {'PATH': ...}.get(b'PATH') emit a
+ # BytesWarning when using python -b or python -bb: ignore the warning
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", BytesWarning)
+
+ try:
+ path_list = env.get('PATH')
+ except TypeError:
+ path_list = None
+
+ if supports_bytes_environ:
+ try:
+ path_listb = env[b'PATH']
+ except (KeyError, TypeError):
+ pass
+ else:
+ if path_list is not None:
+ raise ValueError(
+ "env cannot contain 'PATH' and b'PATH' keys")
+ path_list = path_listb
+
+ if path_list is not None and isinstance(path_list, bytes):
+ path_list = fsdecode(path_list)
+
+ if path_list is None:
+ path_list = defpath
+ return path_list.split(pathsep)
+
+
+# Change environ to automatically call putenv(), unsetenv if they exist.
+from _abcoll import MutableMapping # Can't use collections (bootstrap)
+
+class _Environ(MutableMapping):
+ def __init__(self, data, encodekey, decodekey, encodevalue, decodevalue, putenv, unsetenv):
+ self.encodekey = encodekey
+ self.decodekey = decodekey
+ self.encodevalue = encodevalue
+ self.decodevalue = decodevalue
+ self.putenv = putenv
+ self.unsetenv = unsetenv
+ self._data = data
+
+ def __getitem__(self, key):
+ value = self._data[self.encodekey(key)]
+ return self.decodevalue(value)
+
+ def __setitem__(self, key, value):
+ key = self.encodekey(key)
+ value = self.encodevalue(value)
+ self.putenv(key, value)
+ self._data[key] = value
+
+ def __delitem__(self, key):
+ key = self.encodekey(key)
+ self.unsetenv(key)
+ del self._data[key]
+
+ def __iter__(self):
+ for key in self._data:
+ yield self.decodekey(key)
+
+ def __len__(self):
+ return len(self._data)
+
+ def __repr__(self):
+ return 'environ({{{}}})'.format(', '.join(
+ ('{!r}: {!r}'.format(self.decodekey(key), self.decodevalue(value))
+ for key, value in self._data.items())))
+
+ def copy(self):
+ return dict(self)
+
+ def setdefault(self, key, value):
+ if key not in self:
+ self[key] = value
+ return self[key]
-# Change environ to automatically call putenv() if it exists
try:
- # This will fail if there's no putenv
- putenv
+ _putenv = putenv
except NameError:
- pass
+ _putenv = lambda key, value: None
else:
- import UserDict
+ __all__.append("putenv")
- # Fake unsetenv() for Windows
- # not sure about os2 here but
- # I'm guessing they are the same.
+try:
+ _unsetenv = unsetenv
+except NameError:
+ _unsetenv = lambda key: _putenv(key, "")
+else:
+ __all__.append("unsetenv")
+def _createenviron():
if name in ('os2', 'nt'):
- def unsetenv(key):
- putenv(key, "")
-
- if name == "riscos":
- # On RISC OS, all env access goes through getenv and putenv
- from riscosenviron import _Environ
- elif name in ('os2', 'nt'): # Where Env Var Names Must Be UPPERCASE
- # But we store them as upper case
- class _Environ(UserDict.IterableUserDict):
- def __init__(self, environ):
- UserDict.UserDict.__init__(self)
- data = self.data
- for k, v in environ.items():
- data[k.upper()] = v
- def __setitem__(self, key, item):
- putenv(key, item)
- self.data[key.upper()] = item
- def __getitem__(self, key):
- return self.data[key.upper()]
- try:
- unsetenv
- except NameError:
- def __delitem__(self, key):
- del self.data[key.upper()]
- else:
- def __delitem__(self, key):
- unsetenv(key)
- del self.data[key.upper()]
- def clear(self):
- for key in self.data.keys():
- unsetenv(key)
- del self.data[key]
- def pop(self, key, *args):
- unsetenv(key)
- return self.data.pop(key.upper(), *args)
- def has_key(self, key):
- return key.upper() in self.data
- def __contains__(self, key):
- return key.upper() in self.data
- def get(self, key, failobj=None):
- return self.data.get(key.upper(), failobj)
- def update(self, dict=None, **kwargs):
- if dict:
- try:
- keys = dict.keys()
- except AttributeError:
- # List of (key, value)
- for k, v in dict:
- self[k] = v
- else:
- # got keys
- # cannot use items(), since mappings
- # may not have them.
- for k in keys:
- self[k] = dict[k]
- if kwargs:
- self.update(kwargs)
- def copy(self):
- return dict(self)
-
- else: # Where Env Var Names Can Be Mixed Case
- class _Environ(UserDict.IterableUserDict):
- def __init__(self, environ):
- UserDict.UserDict.__init__(self)
- self.data = environ
- def __setitem__(self, key, item):
- putenv(key, item)
- self.data[key] = item
- def update(self, dict=None, **kwargs):
- if dict:
- try:
- keys = dict.keys()
- except AttributeError:
- # List of (key, value)
- for k, v in dict:
- self[k] = v
- else:
- # got keys
- # cannot use items(), since mappings
- # may not have them.
- for k in keys:
- self[k] = dict[k]
- if kwargs:
- self.update(kwargs)
- try:
- unsetenv
- except NameError:
- pass
- else:
- def __delitem__(self, key):
- unsetenv(key)
- del self.data[key]
- def clear(self):
- for key in self.data.keys():
- unsetenv(key)
- del self.data[key]
- def pop(self, key, *args):
- unsetenv(key)
- return self.data.pop(key, *args)
- def copy(self):
- return dict(self)
-
-
- environ = _Environ(environ)
+ # Where Env Var Names Must Be UPPERCASE
+ def check_str(value):
+ if not isinstance(value, str):
+ raise TypeError("str expected, not %s" % type(value).__name__)
+ return value
+ encode = check_str
+ decode = str
+ def encodekey(key):
+ return encode(key).upper()
+ data = {}
+ for key, value in environ.items():
+ data[encodekey(key)] = value
+ else:
+ # Where Env Var Names Can Be Mixed Case
+ encoding = sys.getfilesystemencoding()
+ def encode(value):
+ if not isinstance(value, str):
+ raise TypeError("str expected, not %s" % type(value).__name__)
+ return value.encode(encoding, 'surrogateescape')
+ def decode(value):
+ return value.decode(encoding, 'surrogateescape')
+ encodekey = encode
+ data = environ
+ return _Environ(data,
+ encodekey, decode,
+ encode, decode,
+ _putenv, _unsetenv)
+
+# unicode environ
+environ = _createenviron()
+del _createenviron
+
def getenv(key, default=None):
"""Get an environment variable, return None if it doesn't exist.
- The optional second argument can specify an alternate default."""
+ The optional second argument can specify an alternate default.
+ key, default and the result are str."""
return environ.get(key, default)
-__all__.append("getenv")
+
+supports_bytes_environ = name not in ('os2', 'nt')
+__all__.extend(("getenv", "supports_bytes_environ"))
+
+if supports_bytes_environ:
+ def _check_bytes(value):
+ if not isinstance(value, bytes):
+ raise TypeError("bytes expected, not %s" % type(value).__name__)
+ return value
+
+ # bytes environ
+ environb = _Environ(environ._data,
+ _check_bytes, bytes,
+ _check_bytes, bytes,
+ _putenv, _unsetenv)
+ del _check_bytes
+
+ def getenvb(key, default=None):
+ """Get an environment variable, return None if it doesn't exist.
+ The optional second argument can specify an alternate default.
+ key, default and the result are bytes."""
+ return environb.get(key, default)
+
+ __all__.extend(("environb", "getenvb"))
+
+def _fscodec():
+ encoding = sys.getfilesystemencoding()
+ if encoding == 'mbcs':
+ errors = 'strict'
+ else:
+ errors = 'surrogateescape'
+
+ def fsencode(filename):
+ """
+ Encode filename to the filesystem encoding with 'surrogateescape' error
+ handler, return bytes unchanged. On Windows, use 'strict' error handler if
+ the file system encoding is 'mbcs' (which is the default encoding).
+ """
+ if isinstance(filename, bytes):
+ return filename
+ elif isinstance(filename, str):
+ return filename.encode(encoding, errors)
+ else:
+ raise TypeError("expect bytes or str, not %s" % type(filename).__name__)
+
+ def fsdecode(filename):
+ """
+ Decode filename from the filesystem encoding with 'surrogateescape' error
+ handler, return str unchanged. On Windows, use 'strict' error handler if
+ the file system encoding is 'mbcs' (which is the default encoding).
+ """
+ if isinstance(filename, str):
+ return filename
+ elif isinstance(filename, bytes):
+ return filename.decode(encoding, errors)
+ else:
+ raise TypeError("expect bytes or str, not %s" % type(filename).__name__)
+
+ return fsencode, fsdecode
+
+fsencode, fsdecode = _fscodec()
+del _fscodec
def _exists(name):
return name in globals()
@@ -551,7 +645,7 @@ if _exists("fork") and not _exists("spawnv") and _exists("execv"):
elif WIFEXITED(sts):
return WEXITSTATUS(sts)
else:
- raise error, "Not stopped, signaled or exited???"
+ raise error("Not stopped, signaled or exited???")
def spawnv(mode, file, args):
"""spawnv(mode, file, args) -> integer
@@ -649,70 +743,7 @@ otherwise return -SIG, where SIG is the signal that killed it. """
__all__.extend(["spawnvp", "spawnvpe", "spawnlp", "spawnlpe",])
-
-# Supply popen2 etc. (for Unix)
-if _exists("fork"):
- if not _exists("popen2"):
- def popen2(cmd, mode="t", bufsize=-1):
- """Execute the shell command 'cmd' in a sub-process. On UNIX, 'cmd'
- may be a sequence, in which case arguments will be passed directly to
- the program without shell intervention (as with os.spawnv()). If 'cmd'
- is a string it will be passed to the shell (as with os.system()). If
- 'bufsize' is specified, it sets the buffer size for the I/O pipes. The
- file objects (child_stdin, child_stdout) are returned."""
- import warnings
- msg = "os.popen2 is deprecated. Use the subprocess module."
- warnings.warn(msg, DeprecationWarning, stacklevel=2)
-
- import subprocess
- PIPE = subprocess.PIPE
- p = subprocess.Popen(cmd, shell=isinstance(cmd, basestring),
- bufsize=bufsize, stdin=PIPE, stdout=PIPE,
- close_fds=True)
- return p.stdin, p.stdout
- __all__.append("popen2")
-
- if not _exists("popen3"):
- def popen3(cmd, mode="t", bufsize=-1):
- """Execute the shell command 'cmd' in a sub-process. On UNIX, 'cmd'
- may be a sequence, in which case arguments will be passed directly to
- the program without shell intervention (as with os.spawnv()). If 'cmd'
- is a string it will be passed to the shell (as with os.system()). If
- 'bufsize' is specified, it sets the buffer size for the I/O pipes. The
- file objects (child_stdin, child_stdout, child_stderr) are returned."""
- import warnings
- msg = "os.popen3 is deprecated. Use the subprocess module."
- warnings.warn(msg, DeprecationWarning, stacklevel=2)
-
- import subprocess
- PIPE = subprocess.PIPE
- p = subprocess.Popen(cmd, shell=isinstance(cmd, basestring),
- bufsize=bufsize, stdin=PIPE, stdout=PIPE,
- stderr=PIPE, close_fds=True)
- return p.stdin, p.stdout, p.stderr
- __all__.append("popen3")
-
- if not _exists("popen4"):
- def popen4(cmd, mode="t", bufsize=-1):
- """Execute the shell command 'cmd' in a sub-process. On UNIX, 'cmd'
- may be a sequence, in which case arguments will be passed directly to
- the program without shell intervention (as with os.spawnv()). If 'cmd'
- is a string it will be passed to the shell (as with os.system()). If
- 'bufsize' is specified, it sets the buffer size for the I/O pipes. The
- file objects (child_stdin, child_stdout_stderr) are returned."""
- import warnings
- msg = "os.popen4 is deprecated. Use the subprocess module."
- warnings.warn(msg, DeprecationWarning, stacklevel=2)
-
- import subprocess
- PIPE = subprocess.PIPE
- p = subprocess.Popen(cmd, shell=isinstance(cmd, basestring),
- bufsize=bufsize, stdin=PIPE, stdout=PIPE,
- stderr=subprocess.STDOUT, close_fds=True)
- return p.stdin, p.stdout
- __all__.append("popen4")
-
-import copy_reg as _copy_reg
+import copyreg as _copyreg
def _make_stat_result(tup, dict):
return stat_result(tup, dict)
@@ -722,7 +753,7 @@ def _pickle_stat_result(sr):
return (_make_stat_result, args)
try:
- _copy_reg.pickle(stat_result, _pickle_stat_result, _make_stat_result)
+ _copyreg.pickle(stat_result, _pickle_stat_result, _make_stat_result)
except NameError: # stat_result may not exist
pass
@@ -734,7 +765,59 @@ def _pickle_statvfs_result(sr):
return (_make_statvfs_result, args)
try:
- _copy_reg.pickle(statvfs_result, _pickle_statvfs_result,
+ _copyreg.pickle(statvfs_result, _pickle_statvfs_result,
_make_statvfs_result)
except NameError: # statvfs_result may not exist
pass
+
+# Supply os.popen()
+def popen(cmd, mode="r", buffering=-1):
+ if not isinstance(cmd, str):
+ raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
+ if mode not in ("r", "w"):
+ raise ValueError("invalid mode %r" % mode)
+ if buffering == 0 or buffering == None:
+ raise ValueError("popen() does not support unbuffered streams")
+ import subprocess, io
+ if mode == "r":
+ proc = subprocess.Popen(cmd,
+ shell=True,
+ stdout=subprocess.PIPE,
+ bufsize=buffering)
+ return _wrap_close(io.TextIOWrapper(proc.stdout), proc)
+ else:
+ proc = subprocess.Popen(cmd,
+ shell=True,
+ stdin=subprocess.PIPE,
+ bufsize=buffering)
+ return _wrap_close(io.TextIOWrapper(proc.stdin), proc)
+
+# Helper for popen() -- a proxy for a file whose close waits for the process
+class _wrap_close:
+ def __init__(self, stream, proc):
+ self._stream = stream
+ self._proc = proc
+ def close(self):
+ self._stream.close()
+ returncode = self._proc.wait()
+ if returncode == 0:
+ return None
+ if name == 'nt':
+ return returncode
+ else:
+ return returncode << 8 # Shift left to match old behavior
+ def __enter__(self):
+ return self
+ def __exit__(self, *args):
+ self.close()
+ def __getattr__(self, name):
+ return getattr(self._stream, name)
+ def __iter__(self):
+ return iter(self._stream)
+
+# Supply os.fdopen()
+def fdopen(fd, *args, **kwargs):
+ if not isinstance(fd, int):
+ raise TypeError("invalid fd type (%s, expected integer)" % type(fd))
+ import io
+ return io.open(fd, *args, **kwargs)