diff options
Diffstat (limited to 'Lib/test/test_subprocess.py')
-rw-r--r-- | Lib/test/test_subprocess.py | 181 |
1 files changed, 126 insertions, 55 deletions
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index fb0b834..e3462d9 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -60,6 +60,8 @@ class BaseTestCase(unittest.TestCase): # shutdown time. That frustrates tests trying to check stderr produced # from a spawned Python process. actual = support.strip_python_stderr(stderr) + # strip_python_stderr also strips whitespace, so we do too. + expected = expected.strip() self.assertEqual(actual, expected, msg) @@ -71,6 +73,15 @@ class ProcessTestCase(BaseTestCase): "import sys; sys.exit(47)"]) self.assertEqual(rc, 47) + def test_call_timeout(self): + # call() function with timeout argument; we want to test that the child + # process gets killed when the timeout expires. If the child isn't + # killed, this call will deadlock since subprocess.call waits for the + # child. + self.assertRaises(subprocess.TimeoutExpired, subprocess.call, + [sys.executable, "-c", "while True: pass"], + timeout=0.1) + def test_check_call_zero(self): # check_call() function with zero return code rc = subprocess.check_call([sys.executable, "-c", @@ -113,6 +124,21 @@ class ProcessTestCase(BaseTestCase): self.fail("Expected ValueError when stdout arg supplied.") self.assertIn('stdout', c.exception.args[0]) + def test_check_output_timeout(self): + # check_output() function with timeout arg + with self.assertRaises(subprocess.TimeoutExpired) as c: + output = subprocess.check_output( + [sys.executable, "-c", + "import sys, time\n" + "sys.stdout.write('BDFL')\n" + "sys.stdout.flush()\n" + "time.sleep(3600)"], + # Some heavily loaded buildbots (sparc Debian 3.x) require + # this much time to start and print. + timeout=3) + self.fail("Expected TimeoutExpired.") + self.assertEqual(c.exception.output, b'BDFL') + def test_call_kwargs(self): # call() function with keyword args newenv = os.environ.copy() @@ -312,6 +338,31 @@ class ProcessTestCase(BaseTestCase): rc = subprocess.call([sys.executable, "-c", cmd], stdout=1) self.assertEqual(rc, 2) + def test_stdout_devnull(self): + p = subprocess.Popen([sys.executable, "-c", + 'for i in range(10240):' + 'print("x" * 1024)'], + stdout=subprocess.DEVNULL) + p.wait() + self.assertEqual(p.stdout, None) + + def test_stderr_devnull(self): + p = subprocess.Popen([sys.executable, "-c", + 'import sys\n' + 'for i in range(10240):' + 'sys.stderr.write("x" * 1024)'], + stderr=subprocess.DEVNULL) + p.wait() + self.assertEqual(p.stderr, None) + + def test_stdin_devnull(self): + p = subprocess.Popen([sys.executable, "-c", + 'import sys;' + 'sys.stdin.read(1)'], + stdin=subprocess.DEVNULL) + p.wait() + self.assertEqual(p.stdin, None) + def test_cwd(self): tmpdir = tempfile.gettempdir() # We cannot use os.path.realpath to canonicalize the path, @@ -400,6 +451,41 @@ class ProcessTestCase(BaseTestCase): self.assertEqual(stdout, b"banana") self.assertStderrEqual(stderr, b"pineapple") + def test_communicate_timeout(self): + p = subprocess.Popen([sys.executable, "-c", + 'import sys,os,time;' + 'sys.stderr.write("pineapple\\n");' + 'time.sleep(1);' + 'sys.stderr.write("pear\\n");' + 'sys.stdout.write(sys.stdin.read())'], + universal_newlines=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana", + timeout=0.3) + # Make sure we can keep waiting for it, and that we get the whole output + # after it completes. + (stdout, stderr) = p.communicate() + self.assertEqual(stdout, "banana") + self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n") + + def test_communicate_timeout_large_ouput(self): + # Test an expiring timeout while the child is outputting lots of data. + p = subprocess.Popen([sys.executable, "-c", + 'import sys,os,time;' + 'sys.stdout.write("a" * (64 * 1024));' + 'time.sleep(0.2);' + 'sys.stdout.write("a" * (64 * 1024));' + 'time.sleep(0.2);' + 'sys.stdout.write("a" * (64 * 1024));' + 'time.sleep(0.2);' + 'sys.stdout.write("a" * (64 * 1024));'], + stdout=subprocess.PIPE) + self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4) + (stdout, _) = p.communicate() + self.assertEqual(len(stdout), 4 * 64 * 1024) + # Test for the fd leak reported in http://bugs.python.org/issue2791. def test_communicate_pipe_fd_leak(self): for stdin_pipe in (False, True): @@ -436,24 +522,21 @@ class ProcessTestCase(BaseTestCase): # This test will probably deadlock rather than fail, if # communicate() does not work properly. x, y = os.pipe() - if mswindows: - pipe_buf = 512 - else: - pipe_buf = os.fpathconf(x, "PC_PIPE_BUF") os.close(x) os.close(y) p = subprocess.Popen([sys.executable, "-c", 'import sys,os;' 'sys.stdout.write(sys.stdin.read(47));' - 'sys.stderr.write("xyz"*%d);' - 'sys.stdout.write(sys.stdin.read())' % pipe_buf], + 'sys.stderr.write("x" * %d);' + 'sys.stdout.write(sys.stdin.read())' % + support.PIPE_MAX_SIZE], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) self.addCleanup(p.stdout.close) self.addCleanup(p.stderr.close) self.addCleanup(p.stdin.close) - string_to_write = b"abc"*pipe_buf + string_to_write = b"a" * support.PIPE_MAX_SIZE (stdout, stderr) = p.communicate(string_to_write) self.assertEqual(stdout, string_to_write) @@ -603,30 +686,32 @@ class ProcessTestCase(BaseTestCase): self.assertEqual(subprocess.list2cmdline(['ab', '']), 'ab ""') - def test_poll(self): - p = subprocess.Popen([sys.executable, - "-c", "import time; time.sleep(1)"]) - count = 0 - while p.poll() is None: - time.sleep(0.1) - count += 1 - # We expect that the poll loop probably went around about 10 times, - # but, based on system scheduling we can't control, it's possible - # poll() never returned None. It "should be" very rare that it - # didn't go around at least twice. - self.assertGreaterEqual(count, 2) + p = subprocess.Popen([sys.executable, "-c", + "import os; os.read(0, 1)"], + stdin=subprocess.PIPE) + self.addCleanup(p.stdin.close) + self.assertIsNone(p.poll()) + os.write(p.stdin.fileno(), b'A') + p.wait() # Subsequent invocations should just return the returncode self.assertEqual(p.poll(), 0) - def test_wait(self): - p = subprocess.Popen([sys.executable, - "-c", "import time; time.sleep(2)"]) + p = subprocess.Popen([sys.executable, "-c", "pass"]) self.assertEqual(p.wait(), 0) # Subsequent invocations should just return the returncode self.assertEqual(p.wait(), 0) + def test_wait_timeout(self): + p = subprocess.Popen([sys.executable, + "-c", "import time; time.sleep(0.1)"]) + with self.assertRaises(subprocess.TimeoutExpired) as c: + p.wait(timeout=0.01) + self.assertIn("0.01", str(c.exception)) # For coverage of __str__. + # Some heavily loaded buildbots (sparc Debian 3.x) require this much + # time to start. + self.assertEqual(p.wait(timeout=3), 0) def test_invalid_bufsize(self): # an invalid type of the bufsize argument should raise @@ -705,25 +790,29 @@ class ProcessTestCase(BaseTestCase): p = subprocess.Popen([sys.executable, "-c", 'pass'], stdin=subprocess.PIPE) self.addCleanup(p.stdin.close) - time.sleep(2) + p.wait() p.communicate(b"x" * 2**20) - @unittest.skipUnless(hasattr(signal, 'SIGALRM'), - "Requires signal.SIGALRM") + @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), + "Requires signal.SIGUSR1") + @unittest.skipUnless(hasattr(os, 'kill'), + "Requires os.kill") + @unittest.skipUnless(hasattr(os, 'getppid'), + "Requires os.getppid") def test_communicate_eintr(self): # Issue #12493: communicate() should handle EINTR def handler(signum, frame): pass - old_handler = signal.signal(signal.SIGALRM, handler) - self.addCleanup(signal.signal, signal.SIGALRM, old_handler) + old_handler = signal.signal(signal.SIGUSR1, handler) + self.addCleanup(signal.signal, signal.SIGUSR1, old_handler) - # the process is running for 2 seconds - args = [sys.executable, "-c", 'import time; time.sleep(2)'] + args = [sys.executable, "-c", + 'import os, signal;' + 'os.kill(os.getppid(), signal.SIGUSR1)'] for stream in ('stdout', 'stderr'): kw = {stream: subprocess.PIPE} with subprocess.Popen(args, **kw) as process: - signal.alarm(1) - # communicate() will be interrupted by SIGALRM + # communicate() will be interrupted by SIGUSR1 process.communicate() @@ -1221,6 +1310,11 @@ class POSIXProcessTestCase(BaseTestCase): exitcode = subprocess.call([abs_program, "-c", "pass"]) self.assertEqual(exitcode, 0) + # absolute bytes path as a string + cmd = b"'" + abs_program + b"' -c pass" + exitcode = subprocess.call(cmd, shell=True) + self.assertEqual(exitcode, 0) + # bytes program, unicode PATH env = os.environ.copy() env["PATH"] = path @@ -1404,7 +1498,7 @@ class POSIXProcessTestCase(BaseTestCase): stdout, stderr = p.communicate() self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" " non-zero with this error:\n%s" % - stderr.decode('utf8')) + stderr.decode('utf-8')) def test_select_unbuffered(self): # Issue #11459: bufsize=0 should really set the pipes as @@ -1617,28 +1711,6 @@ class ProcessTestCaseNoPoll(ProcessTestCase): ProcessTestCase.tearDown(self) -@unittest.skipUnless(getattr(subprocess, '_posixsubprocess', False), - "_posixsubprocess extension module not found.") -class ProcessTestCasePOSIXPurePython(ProcessTestCase, POSIXProcessTestCase): - @classmethod - def setUpClass(cls): - global subprocess - assert subprocess._posixsubprocess - # Reimport subprocess while forcing _posixsubprocess to not exist. - with support.check_warnings(('.*_posixsubprocess .* not being used.*', - RuntimeWarning)): - subprocess = support.import_fresh_module( - 'subprocess', blocked=['_posixsubprocess']) - assert not subprocess._posixsubprocess - - @classmethod - def tearDownClass(cls): - global subprocess - # Reimport subprocess as it should be, restoring order to the universe. - subprocess = support.import_fresh_module('subprocess') - assert subprocess._posixsubprocess - - class HelperFunctionTests(unittest.TestCase): @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows") def test_eintr_retry_call(self): @@ -1747,7 +1819,6 @@ def test_main(): unit_tests = (ProcessTestCase, POSIXProcessTestCase, Win32ProcessTestCase, - ProcessTestCasePOSIXPurePython, CommandTests, ProcessTestCaseNoPoll, HelperFunctionTests, |