summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_subprocess.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_subprocess.py')
-rw-r--r--Lib/test/test_subprocess.py181
1 files changed, 126 insertions, 55 deletions
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
index fb0b834..5deec42 100644
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -60,6 +60,8 @@ class BaseTestCase(unittest.TestCase):
# shutdown time. That frustrates tests trying to check stderr produced
# from a spawned Python process.
actual = support.strip_python_stderr(stderr)
+ # strip_python_stderr also strips whitespace, so we do too.
+ expected = expected.strip()
self.assertEqual(actual, expected, msg)
@@ -71,6 +73,15 @@ class ProcessTestCase(BaseTestCase):
"import sys; sys.exit(47)"])
self.assertEqual(rc, 47)
+ def test_call_timeout(self):
+ # call() function with timeout argument; we want to test that the child
+ # process gets killed when the timeout expires. If the child isn't
+ # killed, this call will deadlock since subprocess.call waits for the
+ # child.
+ self.assertRaises(subprocess.TimeoutExpired, subprocess.call,
+ [sys.executable, "-c", "while True: pass"],
+ timeout=0.1)
+
def test_check_call_zero(self):
# check_call() function with zero return code
rc = subprocess.check_call([sys.executable, "-c",
@@ -113,6 +124,21 @@ class ProcessTestCase(BaseTestCase):
self.fail("Expected ValueError when stdout arg supplied.")
self.assertIn('stdout', c.exception.args[0])
+ def test_check_output_timeout(self):
+ # check_output() function with timeout arg
+ with self.assertRaises(subprocess.TimeoutExpired) as c:
+ output = subprocess.check_output(
+ [sys.executable, "-c",
+ "import sys, time\n"
+ "sys.stdout.write('BDFL')\n"
+ "sys.stdout.flush()\n"
+ "time.sleep(3600)"],
+ # Some heavily loaded buildbots (sparc Debian 3.x) require
+ # this much time to start and print.
+ timeout=3)
+ self.fail("Expected TimeoutExpired.")
+ self.assertEqual(c.exception.output, b'BDFL')
+
def test_call_kwargs(self):
# call() function with keyword args
newenv = os.environ.copy()
@@ -312,6 +338,31 @@ class ProcessTestCase(BaseTestCase):
rc = subprocess.call([sys.executable, "-c", cmd], stdout=1)
self.assertEqual(rc, 2)
+ def test_stdout_devnull(self):
+ p = subprocess.Popen([sys.executable, "-c",
+ 'for i in range(10240):'
+ 'print("x" * 1024)'],
+ stdout=subprocess.DEVNULL)
+ p.wait()
+ self.assertEqual(p.stdout, None)
+
+ def test_stderr_devnull(self):
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys\n'
+ 'for i in range(10240):'
+ 'sys.stderr.write("x" * 1024)'],
+ stderr=subprocess.DEVNULL)
+ p.wait()
+ self.assertEqual(p.stderr, None)
+
+ def test_stdin_devnull(self):
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys;'
+ 'sys.stdin.read(1)'],
+ stdin=subprocess.DEVNULL)
+ p.wait()
+ self.assertEqual(p.stdin, None)
+
def test_cwd(self):
tmpdir = tempfile.gettempdir()
# We cannot use os.path.realpath to canonicalize the path,
@@ -400,6 +451,41 @@ class ProcessTestCase(BaseTestCase):
self.assertEqual(stdout, b"banana")
self.assertStderrEqual(stderr, b"pineapple")
+ def test_communicate_timeout(self):
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os,time;'
+ 'sys.stderr.write("pineapple\\n");'
+ 'time.sleep(1);'
+ 'sys.stderr.write("pear\\n");'
+ 'sys.stdout.write(sys.stdin.read())'],
+ universal_newlines=True,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana",
+ timeout=0.3)
+ # Make sure we can keep waiting for it, and that we get the whole output
+ # after it completes.
+ (stdout, stderr) = p.communicate()
+ self.assertEqual(stdout, "banana")
+ self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n")
+
+ def test_communicate_timeout_large_ouput(self):
+ # Test an expiring timeout while the child is outputting lots of data.
+ p = subprocess.Popen([sys.executable, "-c",
+ 'import sys,os,time;'
+ 'sys.stdout.write("a" * (64 * 1024));'
+ 'time.sleep(0.2);'
+ 'sys.stdout.write("a" * (64 * 1024));'
+ 'time.sleep(0.2);'
+ 'sys.stdout.write("a" * (64 * 1024));'
+ 'time.sleep(0.2);'
+ 'sys.stdout.write("a" * (64 * 1024));'],
+ stdout=subprocess.PIPE)
+ self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4)
+ (stdout, _) = p.communicate()
+ self.assertEqual(len(stdout), 4 * 64 * 1024)
+
# Test for the fd leak reported in http://bugs.python.org/issue2791.
def test_communicate_pipe_fd_leak(self):
for stdin_pipe in (False, True):
@@ -436,24 +522,21 @@ class ProcessTestCase(BaseTestCase):
# This test will probably deadlock rather than fail, if
# communicate() does not work properly.
x, y = os.pipe()
- if mswindows:
- pipe_buf = 512
- else:
- pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
os.close(x)
os.close(y)
p = subprocess.Popen([sys.executable, "-c",
'import sys,os;'
'sys.stdout.write(sys.stdin.read(47));'
- 'sys.stderr.write("xyz"*%d);'
- 'sys.stdout.write(sys.stdin.read())' % pipe_buf],
+ 'sys.stderr.write("x" * %d);'
+ 'sys.stdout.write(sys.stdin.read())' %
+ support.PIPE_MAX_SIZE],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
self.addCleanup(p.stdin.close)
- string_to_write = b"abc"*pipe_buf
+ string_to_write = b"a" * support.PIPE_MAX_SIZE
(stdout, stderr) = p.communicate(string_to_write)
self.assertEqual(stdout, string_to_write)
@@ -603,30 +686,32 @@ class ProcessTestCase(BaseTestCase):
self.assertEqual(subprocess.list2cmdline(['ab', '']),
'ab ""')
-
def test_poll(self):
- p = subprocess.Popen([sys.executable,
- "-c", "import time; time.sleep(1)"])
- count = 0
- while p.poll() is None:
- time.sleep(0.1)
- count += 1
- # We expect that the poll loop probably went around about 10 times,
- # but, based on system scheduling we can't control, it's possible
- # poll() never returned None. It "should be" very rare that it
- # didn't go around at least twice.
- self.assertGreaterEqual(count, 2)
+ p = subprocess.Popen([sys.executable, "-c",
+ "import os",
+ "os.read(1)"], stdin=subprocess.PIPE)
+ self.addCleanup(p.stdin.close)
+ self.assertIsNone(p.poll())
+ os.write(p.stdin.fileno(), b'A')
+ p.wait()
# Subsequent invocations should just return the returncode
self.assertEqual(p.poll(), 0)
-
def test_wait(self):
- p = subprocess.Popen([sys.executable,
- "-c", "import time; time.sleep(2)"])
+ p = subprocess.Popen([sys.executable, "-c", "pass"])
self.assertEqual(p.wait(), 0)
# Subsequent invocations should just return the returncode
self.assertEqual(p.wait(), 0)
+ def test_wait_timeout(self):
+ p = subprocess.Popen([sys.executable,
+ "-c", "import time; time.sleep(0.1)"])
+ with self.assertRaises(subprocess.TimeoutExpired) as c:
+ p.wait(timeout=0.01)
+ self.assertIn("0.01", str(c.exception)) # For coverage of __str__.
+ # Some heavily loaded buildbots (sparc Debian 3.x) require this much
+ # time to start.
+ self.assertEqual(p.wait(timeout=3), 0)
def test_invalid_bufsize(self):
# an invalid type of the bufsize argument should raise
@@ -705,25 +790,29 @@ class ProcessTestCase(BaseTestCase):
p = subprocess.Popen([sys.executable, "-c", 'pass'],
stdin=subprocess.PIPE)
self.addCleanup(p.stdin.close)
- time.sleep(2)
+ p.wait()
p.communicate(b"x" * 2**20)
- @unittest.skipUnless(hasattr(signal, 'SIGALRM'),
- "Requires signal.SIGALRM")
+ @unittest.skipUnless(hasattr(signal, 'SIGUSR1'),
+ "Requires signal.SIGUSR1")
+ @unittest.skipUnless(hasattr(os, 'kill'),
+ "Requires os.kill")
+ @unittest.skipUnless(hasattr(os, 'getppid'),
+ "Requires os.getppid")
def test_communicate_eintr(self):
# Issue #12493: communicate() should handle EINTR
def handler(signum, frame):
pass
- old_handler = signal.signal(signal.SIGALRM, handler)
- self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
+ old_handler = signal.signal(signal.SIGUSR1, handler)
+ self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
- # the process is running for 2 seconds
- args = [sys.executable, "-c", 'import time; time.sleep(2)']
+ args = [sys.executable, "-c",
+ 'import os, signal;'
+ 'os.kill(os.getppid(), signal.SIGUSR1)']
for stream in ('stdout', 'stderr'):
kw = {stream: subprocess.PIPE}
with subprocess.Popen(args, **kw) as process:
- signal.alarm(1)
- # communicate() will be interrupted by SIGALRM
+ # communicate() will be interrupted by SIGUSR1
process.communicate()
@@ -1221,6 +1310,11 @@ class POSIXProcessTestCase(BaseTestCase):
exitcode = subprocess.call([abs_program, "-c", "pass"])
self.assertEqual(exitcode, 0)
+ # absolute bytes path as a string
+ cmd = b"'" + abs_program + b"' -c pass"
+ exitcode = subprocess.call(cmd, shell=True)
+ self.assertEqual(exitcode, 0)
+
# bytes program, unicode PATH
env = os.environ.copy()
env["PATH"] = path
@@ -1404,7 +1498,7 @@ class POSIXProcessTestCase(BaseTestCase):
stdout, stderr = p.communicate()
self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
" non-zero with this error:\n%s" %
- stderr.decode('utf8'))
+ stderr.decode('utf-8'))
def test_select_unbuffered(self):
# Issue #11459: bufsize=0 should really set the pipes as
@@ -1617,28 +1711,6 @@ class ProcessTestCaseNoPoll(ProcessTestCase):
ProcessTestCase.tearDown(self)
-@unittest.skipUnless(getattr(subprocess, '_posixsubprocess', False),
- "_posixsubprocess extension module not found.")
-class ProcessTestCasePOSIXPurePython(ProcessTestCase, POSIXProcessTestCase):
- @classmethod
- def setUpClass(cls):
- global subprocess
- assert subprocess._posixsubprocess
- # Reimport subprocess while forcing _posixsubprocess to not exist.
- with support.check_warnings(('.*_posixsubprocess .* not being used.*',
- RuntimeWarning)):
- subprocess = support.import_fresh_module(
- 'subprocess', blocked=['_posixsubprocess'])
- assert not subprocess._posixsubprocess
-
- @classmethod
- def tearDownClass(cls):
- global subprocess
- # Reimport subprocess as it should be, restoring order to the universe.
- subprocess = support.import_fresh_module('subprocess')
- assert subprocess._posixsubprocess
-
-
class HelperFunctionTests(unittest.TestCase):
@unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
def test_eintr_retry_call(self):
@@ -1747,7 +1819,6 @@ def test_main():
unit_tests = (ProcessTestCase,
POSIXProcessTestCase,
Win32ProcessTestCase,
- ProcessTestCasePOSIXPurePython,
CommandTests,
ProcessTestCaseNoPoll,
HelperFunctionTests,