diff options
author | Patrick McLean <47801044+patrick-mclean@users.noreply.github.com> | 2019-09-12 17:15:44 (GMT) |
---|---|---|
committer | Gregory P. Smith <greg@krypto.org> | 2019-09-12 17:15:44 (GMT) |
commit | 2b2ead74382513d0bb9ef34504e283a71e6a706f (patch) | |
tree | 28a8a0f37d31dc7a674d2690085a2dcd8a629118 /Lib/test | |
parent | 57b7dbc46e71269d855e644d30826d33eedee2a1 (diff) | |
download | cpython-2b2ead74382513d0bb9ef34504e283a71e6a706f.zip cpython-2b2ead74382513d0bb9ef34504e283a71e6a706f.tar.gz cpython-2b2ead74382513d0bb9ef34504e283a71e6a706f.tar.bz2 |
bpo-36046: Add user and group parameters to subprocess (GH-11950)
* subprocess: Add user, group and extra_groups paremeters to subprocess.Popen
This adds a `user` parameter to the Popen constructor that will call
setreuid() in the child before calling exec(). This allows processes
running as root to safely drop privileges before running the subprocess
without having to use a preexec_fn.
This also adds a `group` parameter that will call setregid() in
the child process before calling exec().
Finally an `extra_groups` parameter was added that will call
setgroups() to set the supplimental groups.
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/test_capi.py | 6 | ||||
-rw-r--r-- | Lib/test/test_subprocess.py | 162 |
2 files changed, 161 insertions, 7 deletions
diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py index 4d6e2f2..a384064 100644 --- a/Lib/test/test_capi.py +++ b/Lib/test/test_capi.py @@ -96,7 +96,7 @@ class CAPITest(unittest.TestCase): def __len__(self): return 1 self.assertRaises(TypeError, _posixsubprocess.fork_exec, - 1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17) + 1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20) # Issue #15736: overflow in _PySequence_BytesToCharpArray() class Z(object): def __len__(self): @@ -104,7 +104,7 @@ class CAPITest(unittest.TestCase): def __getitem__(self, i): return b'x' self.assertRaises(MemoryError, _posixsubprocess.fork_exec, - 1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17) + 1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20) @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.') def test_subprocess_fork_exec(self): @@ -114,7 +114,7 @@ class CAPITest(unittest.TestCase): # Issue #15738: crash in subprocess_fork_exec() self.assertRaises(TypeError, _posixsubprocess.fork_exec, - Z(),[b'1'],3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17) + Z(),[b'1'],3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20) @unittest.skipIf(MISSING_C_DOCSTRINGS, "Signature information for builtins requires docstrings") diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 91f525d..f107022 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -18,6 +18,7 @@ import shutil import threading import gc import textwrap +import json from test.support import FakePath try: @@ -25,6 +26,14 @@ try: except ImportError: _testcapi = None +try: + import pwd +except ImportError: + pwd = None +try: + import grp +except ImportError: + grp = None if support.PGO: raise unittest.SkipTest("test is not helpful for PGO") @@ -1733,6 +1742,139 @@ class POSIXProcessTestCase(BaseTestCase): child_sid = int(output) self.assertNotEqual(parent_sid, child_sid) + @unittest.skipUnless(hasattr(os, 'setreuid'), 'no setreuid on platform') + def test_user(self): + # For code coverage of the user parameter. We don't care if we get an + # EPERM error from it depending on the test execution environment, that + # still indicates that it was called. + + uid = os.geteuid() + test_users = [65534 if uid != 65534 else 65533, uid] + name_uid = "nobody" if sys.platform != 'darwin' else "unknown" + + if pwd is not None: + test_users.append(name_uid) + + for user in test_users: + with self.subTest(user=user): + try: + output = subprocess.check_output( + [sys.executable, "-c", + "import os; print(os.getuid())"], + user=user) + except OSError as e: + if e.errno != errno.EPERM: + raise + else: + if isinstance(user, str): + user_uid = pwd.getpwnam(user).pw_uid + else: + user_uid = user + child_user = int(output) + self.assertEqual(child_user, user_uid) + + with self.assertRaises(ValueError): + subprocess.check_call([sys.executable, "-c", "pass"], user=-1) + + if pwd is None: + with self.assertRaises(ValueError): + subprocess.check_call([sys.executable, "-c", "pass"], user=name_uid) + + @unittest.skipIf(hasattr(os, 'setreuid'), 'setreuid() available on platform') + def test_user_error(self): + with self.assertRaises(ValueError): + subprocess.check_call([sys.executable, "-c", "pass"], user=65535) + + @unittest.skipUnless(hasattr(os, 'setregid'), 'no setregid() on platform') + def test_group(self): + gid = os.getegid() + group_list = [65534 if gid != 65534 else 65533] + name_group = "nogroup" if sys.platform != 'darwin' else "staff" + + if grp is not None: + group_list.append(name_group) + + for group in group_list + [gid]: + with self.subTest(group=group): + try: + output = subprocess.check_output( + [sys.executable, "-c", + "import os; print(os.getgid())"], + group=group) + except OSError as e: + if e.errno != errno.EPERM: + raise + else: + if isinstance(group, str): + group_gid = grp.getgrnam(group).gr_gid + else: + group_gid = group + + child_group = int(output) + self.assertEqual(child_group, group_gid) + + # make sure we bomb on negative values + with self.assertRaises(ValueError): + subprocess.check_call([sys.executable, "-c", "pass"], group=-1) + + if grp is None: + with self.assertRaises(ValueError): + subprocess.check_call([sys.executable, "-c", "pass"], group=name_group) + + @unittest.skipIf(hasattr(os, 'setregid'), 'setregid() available on platform') + def test_group_error(self): + with self.assertRaises(ValueError): + subprocess.check_call([sys.executable, "-c", "pass"], group=65535) + + @unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform') + def test_extra_groups(self): + gid = os.getegid() + group_list = [65534 if gid != 65534 else 65533] + name_group = "nogroup" if sys.platform != 'darwin' else "staff" + perm_error = False + + if grp is not None: + group_list.append(name_group) + + try: + output = subprocess.check_output( + [sys.executable, "-c", + "import os, sys, json; json.dump(os.getgroups(), sys.stdout)"], + extra_groups=group_list) + except OSError as ex: + if ex.errno != errno.EPERM: + raise + perm_error = True + + else: + parent_groups = os.getgroups() + child_groups = json.loads(output) + + if grp is not None: + desired_gids = [grp.getgrnam(g).gr_gid if isinstance(g, str) else g + for g in group_list] + else: + desired_gids = group_list + + if perm_error: + self.assertEqual(set(child_groups), set(parent_groups)) + else: + self.assertEqual(set(desired_gids), set(child_groups)) + + # make sure we bomb on negative values + with self.assertRaises(ValueError): + subprocess.check_call([sys.executable, "-c", "pass"], extra_groups=[-1]) + + if grp is None: + with self.assertRaises(ValueError): + subprocess.check_call([sys.executable, "-c", "pass"], + extra_groups=[name_group]) + + @unittest.skipIf(hasattr(os, 'setgroups'), 'setgroups() available on platform') + def test_extra_groups_error(self): + with self.assertRaises(ValueError): + subprocess.check_call([sys.executable, "-c", "pass"], extra_groups=[]) + def test_run_abort(self): # returncode handles signal termination with support.SuppressCrashReport(): @@ -2782,13 +2924,23 @@ class POSIXProcessTestCase(BaseTestCase): ([b"arg"], [b"exe"], 123, [b"env"]), ([b"arg"], [b"exe"], None, 123), ): - with self.assertRaises(TypeError): + with self.assertRaises(TypeError) as err: _posixsubprocess.fork_exec( args, exe_list, True, (), cwd, env_list, -1, -1, -1, -1, 1, 2, 3, 4, - True, True, func) + True, True, + False, [], 0, + func) + # Attempt to prevent + # "TypeError: fork_exec() takes exactly N arguments (M given)" + # from passing the test. More refactoring to have us start + # with a valid *args list, confirm a good call with that works + # before mutating it in various ways to ensure that bad calls + # with individual arg type errors raise a typeerror would be + # ideal. Saving that for a future PR... + self.assertNotIn('takes exactly', str(err.exception)) finally: if not gc_enabled: gc.disable() @@ -2827,7 +2979,9 @@ class POSIXProcessTestCase(BaseTestCase): True, fds_to_keep, None, [b"env"], -1, -1, -1, -1, 1, 2, 3, 4, - True, True, None) + True, True, + None, None, None, + None) self.assertIn('fds_to_keep', str(c.exception)) finally: if not gc_enabled: @@ -3239,7 +3393,7 @@ class MiscTests(unittest.TestCase): def test__all__(self): """Ensure that __all__ is populated properly.""" - intentionally_excluded = {"list2cmdline", "Handle"} + intentionally_excluded = {"list2cmdline", "Handle", "pwd", "grp"} exported = set(subprocess.__all__) possible_exports = set() import types |