diff options
author | Victor Stinner <vstinner@python.org> | 2020-04-22 14:30:35 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-22 14:30:35 (GMT) |
commit | 9bee32b34e4fb3e67a88bf14d38153851d4c4598 (patch) | |
tree | c169f63fa285210ff9a384974c91bf23ec1bd41e /Lib/test/test_os.py | |
parent | bcc136ba892e62078a67ad0ca0b34072ec9c88aa (diff) | |
download | cpython-9bee32b34e4fb3e67a88bf14d38153851d4c4598.zip cpython-9bee32b34e4fb3e67a88bf14d38153851d4c4598.tar.gz cpython-9bee32b34e4fb3e67a88bf14d38153851d4c4598.tar.bz2 |
bpo-40138: Fix Windows os.waitpid() for large exit code (GH-19637)
Fix the Windows implementation of os.waitpid() for exit code
larger than "INT_MAX >> 8". The exit status is now interpreted as an
unsigned number.
os.waitstatus_to_exitcode() now accepts wait status larger than
INT_MAX.
Diffstat (limited to 'Lib/test/test_os.py')
-rw-r--r-- | Lib/test/test_os.py | 72 |
1 files changed, 51 insertions, 21 deletions
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 73dc064..74aef47 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -2789,40 +2789,66 @@ class PidTests(unittest.TestCase): # We are the parent of our subprocess self.assertEqual(int(stdout), os.getpid()) + def check_waitpid(self, code, exitcode, callback=None): + if sys.platform == 'win32': + # On Windows, os.spawnv() simply joins arguments with spaces: + # arguments need to be quoted + args = [f'"{sys.executable}"', '-c', f'"{code}"'] + else: + args = [sys.executable, '-c', code] + pid = os.spawnv(os.P_NOWAIT, sys.executable, args) + + if callback is not None: + callback(pid) + + # don't use support.wait_process() to test directly os.waitpid() + # and os.waitstatus_to_exitcode() + pid2, status = os.waitpid(pid, 0) + self.assertEqual(os.waitstatus_to_exitcode(status), exitcode) + self.assertEqual(pid2, pid) + def test_waitpid(self): - args = [sys.executable, '-c', 'pass'] - # Add an implicit test for PyUnicode_FSConverter(). - pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args) - support.wait_process(pid, exitcode=0) + self.check_waitpid(code='pass', exitcode=0) def test_waitstatus_to_exitcode(self): exitcode = 23 - filename = support.TESTFN - self.addCleanup(support.unlink, filename) + code = f'import sys; sys.exit({exitcode})' + self.check_waitpid(code, exitcode=exitcode) - with open(filename, "w") as fp: - print(f'import sys; sys.exit({exitcode})', file=fp) - fp.flush() - args = [sys.executable, filename] - pid = os.spawnv(os.P_NOWAIT, args[0], args) + with self.assertRaises(TypeError): + os.waitstatus_to_exitcode(0.0) - pid2, status = os.waitpid(pid, 0) - self.assertEqual(os.waitstatus_to_exitcode(status), exitcode) - self.assertEqual(pid2, pid) + @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') + def test_waitpid_windows(self): + # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode() + # with exit code larger than INT_MAX. + STATUS_CONTROL_C_EXIT = 0xC000013A + code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})' + self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT) + + @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') + def test_waitstatus_to_exitcode_windows(self): + max_exitcode = 2 ** 32 - 1 + for exitcode in (0, 1, 5, max_exitcode): + self.assertEqual(os.waitstatus_to_exitcode(exitcode << 8), + exitcode) + + # invalid values + with self.assertRaises(ValueError): + os.waitstatus_to_exitcode((max_exitcode + 1) << 8) + with self.assertRaises(OverflowError): + os.waitstatus_to_exitcode(-1) # Skip the test on Windows @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL') def test_waitstatus_to_exitcode_kill(self): + code = f'import time; time.sleep({support.LONG_TIMEOUT})' signum = signal.SIGKILL - args = [sys.executable, '-c', - f'import time; time.sleep({support.LONG_TIMEOUT})'] - pid = os.spawnv(os.P_NOWAIT, args[0], args) - os.kill(pid, signum) + def kill_process(pid): + os.kill(pid, signum) - pid2, status = os.waitpid(pid, 0) - self.assertEqual(os.waitstatus_to_exitcode(status), -signum) - self.assertEqual(pid2, pid) + self.check_waitpid(code, exitcode=-signum, callback=kill_process) class SpawnTests(unittest.TestCase): @@ -2884,6 +2910,10 @@ class SpawnTests(unittest.TestCase): exitcode = os.spawnv(os.P_WAIT, args[0], args) self.assertEqual(exitcode, self.exitcode) + # Test for PyUnicode_FSConverter() + exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args) + self.assertEqual(exitcode, self.exitcode) + @requires_os_func('spawnve') def test_spawnve(self): args = self.create_args(with_env=True) |