summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_subprocess.py
diff options
context:
space:
mode:
authorPatrick McLean <47801044+patrick-mclean@users.noreply.github.com>2019-09-12 17:15:44 (GMT)
committerGregory P. Smith <greg@krypto.org>2019-09-12 17:15:44 (GMT)
commit2b2ead74382513d0bb9ef34504e283a71e6a706f (patch)
tree28a8a0f37d31dc7a674d2690085a2dcd8a629118 /Lib/test/test_subprocess.py
parent57b7dbc46e71269d855e644d30826d33eedee2a1 (diff)
downloadcpython-2b2ead74382513d0bb9ef34504e283a71e6a706f.zip
cpython-2b2ead74382513d0bb9ef34504e283a71e6a706f.tar.gz
cpython-2b2ead74382513d0bb9ef34504e283a71e6a706f.tar.bz2
bpo-36046: Add user and group parameters to subprocess (GH-11950)
* subprocess: Add user, group and extra_groups paremeters to subprocess.Popen This adds a `user` parameter to the Popen constructor that will call setreuid() in the child before calling exec(). This allows processes running as root to safely drop privileges before running the subprocess without having to use a preexec_fn. This also adds a `group` parameter that will call setregid() in the child process before calling exec(). Finally an `extra_groups` parameter was added that will call setgroups() to set the supplimental groups.
Diffstat (limited to 'Lib/test/test_subprocess.py')
-rw-r--r--Lib/test/test_subprocess.py162
1 files changed, 158 insertions, 4 deletions
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
index 91f525d..f107022 100644
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -18,6 +18,7 @@ import shutil
import threading
import gc
import textwrap
+import json
from test.support import FakePath
try:
@@ -25,6 +26,14 @@ try:
except ImportError:
_testcapi = None
+try:
+ import pwd
+except ImportError:
+ pwd = None
+try:
+ import grp
+except ImportError:
+ grp = None
if support.PGO:
raise unittest.SkipTest("test is not helpful for PGO")
@@ -1733,6 +1742,139 @@ class POSIXProcessTestCase(BaseTestCase):
child_sid = int(output)
self.assertNotEqual(parent_sid, child_sid)
+ @unittest.skipUnless(hasattr(os, 'setreuid'), 'no setreuid on platform')
+ def test_user(self):
+ # For code coverage of the user parameter. We don't care if we get an
+ # EPERM error from it depending on the test execution environment, that
+ # still indicates that it was called.
+
+ uid = os.geteuid()
+ test_users = [65534 if uid != 65534 else 65533, uid]
+ name_uid = "nobody" if sys.platform != 'darwin' else "unknown"
+
+ if pwd is not None:
+ test_users.append(name_uid)
+
+ for user in test_users:
+ with self.subTest(user=user):
+ try:
+ output = subprocess.check_output(
+ [sys.executable, "-c",
+ "import os; print(os.getuid())"],
+ user=user)
+ except OSError as e:
+ if e.errno != errno.EPERM:
+ raise
+ else:
+ if isinstance(user, str):
+ user_uid = pwd.getpwnam(user).pw_uid
+ else:
+ user_uid = user
+ child_user = int(output)
+ self.assertEqual(child_user, user_uid)
+
+ with self.assertRaises(ValueError):
+ subprocess.check_call([sys.executable, "-c", "pass"], user=-1)
+
+ if pwd is None:
+ with self.assertRaises(ValueError):
+ subprocess.check_call([sys.executable, "-c", "pass"], user=name_uid)
+
+ @unittest.skipIf(hasattr(os, 'setreuid'), 'setreuid() available on platform')
+ def test_user_error(self):
+ with self.assertRaises(ValueError):
+ subprocess.check_call([sys.executable, "-c", "pass"], user=65535)
+
+ @unittest.skipUnless(hasattr(os, 'setregid'), 'no setregid() on platform')
+ def test_group(self):
+ gid = os.getegid()
+ group_list = [65534 if gid != 65534 else 65533]
+ name_group = "nogroup" if sys.platform != 'darwin' else "staff"
+
+ if grp is not None:
+ group_list.append(name_group)
+
+ for group in group_list + [gid]:
+ with self.subTest(group=group):
+ try:
+ output = subprocess.check_output(
+ [sys.executable, "-c",
+ "import os; print(os.getgid())"],
+ group=group)
+ except OSError as e:
+ if e.errno != errno.EPERM:
+ raise
+ else:
+ if isinstance(group, str):
+ group_gid = grp.getgrnam(group).gr_gid
+ else:
+ group_gid = group
+
+ child_group = int(output)
+ self.assertEqual(child_group, group_gid)
+
+ # make sure we bomb on negative values
+ with self.assertRaises(ValueError):
+ subprocess.check_call([sys.executable, "-c", "pass"], group=-1)
+
+ if grp is None:
+ with self.assertRaises(ValueError):
+ subprocess.check_call([sys.executable, "-c", "pass"], group=name_group)
+
+ @unittest.skipIf(hasattr(os, 'setregid'), 'setregid() available on platform')
+ def test_group_error(self):
+ with self.assertRaises(ValueError):
+ subprocess.check_call([sys.executable, "-c", "pass"], group=65535)
+
+ @unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform')
+ def test_extra_groups(self):
+ gid = os.getegid()
+ group_list = [65534 if gid != 65534 else 65533]
+ name_group = "nogroup" if sys.platform != 'darwin' else "staff"
+ perm_error = False
+
+ if grp is not None:
+ group_list.append(name_group)
+
+ try:
+ output = subprocess.check_output(
+ [sys.executable, "-c",
+ "import os, sys, json; json.dump(os.getgroups(), sys.stdout)"],
+ extra_groups=group_list)
+ except OSError as ex:
+ if ex.errno != errno.EPERM:
+ raise
+ perm_error = True
+
+ else:
+ parent_groups = os.getgroups()
+ child_groups = json.loads(output)
+
+ if grp is not None:
+ desired_gids = [grp.getgrnam(g).gr_gid if isinstance(g, str) else g
+ for g in group_list]
+ else:
+ desired_gids = group_list
+
+ if perm_error:
+ self.assertEqual(set(child_groups), set(parent_groups))
+ else:
+ self.assertEqual(set(desired_gids), set(child_groups))
+
+ # make sure we bomb on negative values
+ with self.assertRaises(ValueError):
+ subprocess.check_call([sys.executable, "-c", "pass"], extra_groups=[-1])
+
+ if grp is None:
+ with self.assertRaises(ValueError):
+ subprocess.check_call([sys.executable, "-c", "pass"],
+ extra_groups=[name_group])
+
+ @unittest.skipIf(hasattr(os, 'setgroups'), 'setgroups() available on platform')
+ def test_extra_groups_error(self):
+ with self.assertRaises(ValueError):
+ subprocess.check_call([sys.executable, "-c", "pass"], extra_groups=[])
+
def test_run_abort(self):
# returncode handles signal termination
with support.SuppressCrashReport():
@@ -2782,13 +2924,23 @@ class POSIXProcessTestCase(BaseTestCase):
([b"arg"], [b"exe"], 123, [b"env"]),
([b"arg"], [b"exe"], None, 123),
):
- with self.assertRaises(TypeError):
+ with self.assertRaises(TypeError) as err:
_posixsubprocess.fork_exec(
args, exe_list,
True, (), cwd, env_list,
-1, -1, -1, -1,
1, 2, 3, 4,
- True, True, func)
+ True, True,
+ False, [], 0,
+ func)
+ # Attempt to prevent
+ # "TypeError: fork_exec() takes exactly N arguments (M given)"
+ # from passing the test. More refactoring to have us start
+ # with a valid *args list, confirm a good call with that works
+ # before mutating it in various ways to ensure that bad calls
+ # with individual arg type errors raise a typeerror would be
+ # ideal. Saving that for a future PR...
+ self.assertNotIn('takes exactly', str(err.exception))
finally:
if not gc_enabled:
gc.disable()
@@ -2827,7 +2979,9 @@ class POSIXProcessTestCase(BaseTestCase):
True, fds_to_keep, None, [b"env"],
-1, -1, -1, -1,
1, 2, 3, 4,
- True, True, None)
+ True, True,
+ None, None, None,
+ None)
self.assertIn('fds_to_keep', str(c.exception))
finally:
if not gc_enabled:
@@ -3239,7 +3393,7 @@ class MiscTests(unittest.TestCase):
def test__all__(self):
"""Ensure that __all__ is populated properly."""
- intentionally_excluded = {"list2cmdline", "Handle"}
+ intentionally_excluded = {"list2cmdline", "Handle", "pwd", "grp"}
exported = set(subprocess.__all__)
possible_exports = set()
import types