1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
"""
Basic subprocess implementation for POSIX which only uses os functions. Only
implement features required by setup.py to build C extension modules when
subprocess is unavailable. setup.py is not used on Windows.
"""
import os
def _waitstatus_to_exitcode(status):
if os.WIFEXITED(status):
return os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
return -os.WTERMSIG(status)
else:
raise ValueError(f"invalid wait status: {status!r}")
# distutils.spawn used by distutils.command.build_ext
# calls subprocess.Popen().wait()
class Popen:
    """Minimal stand-in for subprocess.Popen built only on os functions.

    Unlike subprocess.Popen, the constructor does not start the command:
    the first call to wait() forks, executes the command in the child and
    blocks until the child terminates.
    """

    def __init__(self, cmd, env=None):
        """Record the command to run.

        cmd is a sequence of arguments; cmd[0] is handed to os.execv() /
        os.execve() as the program path, so it is not searched on PATH.
        env, if not None, is the environment passed to os.execve();
        otherwise the child is started with os.execv().
        """
        self._cmd = cmd
        self._env = env
        # Exit code of the command; None until wait() has completed.
        self.returncode = None

    def wait(self):
        """Run the command, wait for it to finish and return its exit code.

        The exit code is computed by _waitstatus_to_exitcode() and stored
        in self.returncode.  Once it is set, further calls return it
        without running the command again.
        """
        if self.returncode is not None:
            # Already ran and reaped: do not fork/exec a second time.
            return self.returncode

        pid = os.fork()
        if pid == 0:
            # Child process
            try:
                if self._env is not None:
                    os.execve(self._cmd[0], self._cmd, self._env)
                else:
                    os.execv(self._cmd[0], self._cmd)
            finally:
                # exec*() only comes back by raising: never let the child
                # continue running the parent's code.
                os._exit(1)
        else:
            # Parent process
            _, status = os.waitpid(pid, 0)
            self.returncode = _waitstatus_to_exitcode(status)

        return self.returncode
def _check_cmd(cmd):
# Use regex [a-zA-Z0-9./-]+: reject empty string, space, etc.
safe_chars = []
for first, last in (("a", "z"), ("A", "Z"), ("0", "9")):
for ch in range(ord(first), ord(last) + 1):
safe_chars.append(chr(ch))
safe_chars.append("./-")
safe_chars = ''.join(safe_chars)
if isinstance(cmd, (tuple, list)):
check_strs = cmd
elif isinstance(cmd, str):
check_strs = [cmd]
else:
return False
for arg in check_strs:
if not isinstance(arg, str):
return False
if not arg:
# reject empty string
return False
for ch in arg:
if ch not in safe_chars:
return False
return True
# _aix_support, used by distutils.util, calls subprocess.check_output()
def check_output(cmd, **kwargs):
    """Run cmd through os.system() and return what it wrote to stdout.

    cmd is a string or a sequence of strings and must be accepted by
    _check_cmd().  The output is captured by redirecting stdout to the
    file "check_output.tmp", which is read back as bytes and then removed.

    Raise NotImplementedError if any keyword argument is given, and
    ValueError if cmd is rejected by _check_cmd() or if the command
    returns a non-zero exit status.
    """
    if kwargs:
        raise NotImplementedError(repr(kwargs))
    if not _check_cmd(cmd):
        raise ValueError(f"unsupported command: {cmd!r}")

    capture_path = "check_output.tmp"
    words = cmd if isinstance(cmd, str) else " ".join(cmd)
    shell_line = f"{words} >{capture_path}"

    try:
        # system() spawns a shell
        exitcode = _waitstatus_to_exitcode(os.system(shell_line))
        if exitcode:
            raise ValueError(f"Command {shell_line!r} returned non-zero "
                             f"exit status {exitcode!r}")

        try:
            with open(capture_path, "rb") as fp:
                return fp.read()
        except FileNotFoundError:
            # the redirection target was never created
            return b''
    finally:
        # best-effort cleanup of the capture file
        try:
            os.unlink(capture_path)
        except OSError:
            pass
|