diff options
Diffstat (limited to 'Lib/subprocess.py')
| -rw-r--r-- | Lib/subprocess.py | 337 | 
1 files changed, 246 insertions, 91 deletions
| diff --git a/Lib/subprocess.py b/Lib/subprocess.py index c02fb52..69b769b 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -191,8 +191,10 @@ should prepare for OSErrors.  A ValueError will be raised if Popen is called with invalid arguments. -check_call() and check_output() will raise CalledProcessError, if the -called process returns a non-zero return code. +Exceptions defined within this module inherit from SubprocessError. +check_call() and check_output() will raise CalledProcessError if the +called process returns a non-zero return code.  TimeoutExpired +be raised if a timeout was specified and expired.  Security @@ -340,6 +342,7 @@ mswindows = (sys.platform == "win32")  import io  import os +import time  import traceback  import gc  import signal @@ -347,7 +350,10 @@ import builtins  import warnings  # Exception classes used by this module. -class CalledProcessError(Exception): +class SubprocessError(Exception): pass + + +class CalledProcessError(SubprocessError):      """This exception is raised when a process run by check_call() or      check_output() returns a non-zero exit status.      The exit status will be stored in the returncode attribute; @@ -361,6 +367,19 @@ class CalledProcessError(Exception):          return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode) +class TimeoutExpired(SubprocessError): +    """This exception is raised when the timeout expires while waiting for a +    child process. +    """ +    def __init__(self, cmd, output=None): +        self.cmd = cmd +        self.output = output + +    def __str__(self): +        return ("Command '%s' timed out after %s seconds" % +                (self.cmd, self.timeout)) + +  if mswindows:      import threading      import msvcrt @@ -449,15 +468,21 @@ def _eintr_retry_call(func, *args):              raise -def call(*popenargs, **kwargs): -    """Run command with arguments.  Wait for command to complete, then -    return the returncode attribute. +def call(*popenargs, timeout=None, **kwargs): +    """Run command with arguments.  Wait for command to complete or +    timeout, then return the returncode attribute.      The arguments are the same as for the Popen constructor.  Example:      retcode = call(["ls", "-l"])      """ -    return Popen(*popenargs, **kwargs).wait() +    p = Popen(*popenargs, **kwargs) +    try: +        return p.wait(timeout=timeout) +    except TimeoutExpired: +        p.kill() +        p.wait() +        raise  def check_call(*popenargs, **kwargs): @@ -466,7 +491,7 @@ def check_call(*popenargs, **kwargs):      CalledProcessError.  The CalledProcessError object will have the      return code in the returncode attribute. -    The arguments are the same as for the Popen constructor.  Example: +    The arguments are the same as for the call function.  Example:      check_call(["ls", "-l"])      """ @@ -479,7 +504,7 @@ def check_call(*popenargs, **kwargs):      return 0 -def check_output(*popenargs, **kwargs): +def check_output(*popenargs, timeout=None, **kwargs):      r"""Run command with arguments and return its output as a byte string.      If the exit code was non-zero it raises a CalledProcessError.  The @@ -502,13 +527,15 @@ def check_output(*popenargs, **kwargs):      if 'stdout' in kwargs:          raise ValueError('stdout argument not allowed, it will be overridden.')      process = Popen(*popenargs, stdout=PIPE, **kwargs) -    output, unused_err = process.communicate() +    try: +        output, unused_err = process.communicate(timeout=timeout) +    except TimeoutExpired: +        process.kill() +        output, unused_err = process.communicate() +        raise TimeoutExpired(process.args, output=output)      retcode = process.poll()      if retcode: -        cmd = kwargs.get("args") -        if cmd is None: -            cmd = popenargs[0] -        raise CalledProcessError(retcode, cmd, output=output) +        raise CalledProcessError(retcode, process.args, output=output)      return output @@ -639,6 +666,8 @@ class Popen(object):          _cleanup()          self._child_created = False +        self._input = None +        self._communication_started = False          if bufsize is None:              bufsize = 0  # Restore default          if not isinstance(bufsize, int): @@ -673,6 +702,7 @@ class Popen(object):                  raise ValueError("creationflags is only supported on Windows "                                   "platforms") +        self.args = args          self.stdin = None          self.stdout = None          self.stderr = None @@ -771,7 +801,7 @@ class Popen(object):              _active.append(self) -    def communicate(self, input=None): +    def communicate(self, input=None, timeout=None):          """Interact with process: Send data to stdin.  Read data from          stdout and stderr, until end-of-file is reached.  Wait for          process to terminate.  The optional input argument should be a @@ -780,9 +810,19 @@ class Popen(object):          communicate() returns a tuple (stdout, stderr).""" -        # Optimization: If we are only using one pipe, or no pipe at -        # all, using select() or threads is unnecessary. -        if [self.stdin, self.stdout, self.stderr].count(None) >= 2: +        if self._communication_started and input: +            raise ValueError("Cannot send input after starting communication") + +        if timeout is not None: +            endtime = time.time() + timeout +        else: +            endtime = None + +        # Optimization: If we are not worried about timeouts, we haven't +        # started communicating, and we have one or zero pipes, using select() +        # or threads is unnecessary. +        if (endtime is None and not self._communication_started and +            [self.stdin, self.stdout, self.stderr].count(None) >= 2):              stdout = None              stderr = None              if self.stdin: @@ -798,13 +838,36 @@ class Popen(object):              self.wait()              return (stdout, stderr) -        return self._communicate(input) +        try: +            stdout, stderr = self._communicate(input, endtime) +        finally: +            self._communication_started = True + +        sts = self.wait(timeout=self._remaining_time(endtime)) + +        return (stdout, stderr)      def poll(self):          return self._internal_poll() +    def _remaining_time(self, endtime): +        """Convenience for _communicate when computing timeouts.""" +        if endtime is None: +            return None +        else: +            return endtime - time.time() + + +    def _check_timeout(self, endtime): +        """Convenience for checking if a timeout has expired.""" +        if endtime is None: +            return +        if time.time() > endtime: +            raise TimeoutExpired(self.args) + +      if mswindows:          #          # Windows methods @@ -987,12 +1050,17 @@ class Popen(object):              return self.returncode -        def wait(self): +        def wait(self, timeout=None):              """Wait for child process to terminate.  Returns returncode              attribute.""" +            if timeout is None: +                timeout = _subprocess.INFINITE +            else: +                timeout = int(timeout * 1000)              if self.returncode is None: -                _subprocess.WaitForSingleObject(self._handle, -                                                _subprocess.INFINITE) +                result = _subprocess.WaitForSingleObject(self._handle, timeout) +                if result == _subprocess.WAIT_TIMEOUT: +                    raise TimeoutExpired(self.args)                  self.returncode = _subprocess.GetExitCodeProcess(self._handle)              return self.returncode @@ -1002,32 +1070,51 @@ class Popen(object):              fh.close() -        def _communicate(self, input): -            stdout = None # Return -            stderr = None # Return - -            if self.stdout: -                stdout = [] -                stdout_thread = threading.Thread(target=self._readerthread, -                                                 args=(self.stdout, stdout)) -                stdout_thread.daemon = True -                stdout_thread.start() -            if self.stderr: -                stderr = [] -                stderr_thread = threading.Thread(target=self._readerthread, -                                                 args=(self.stderr, stderr)) -                stderr_thread.daemon = True -                stderr_thread.start() +        def _communicate(self, input, endtime): +            # Start reader threads feeding into a list hanging off of this +            # object, unless they've already been started. +            if self.stdout and not hasattr(self, "_stdout_buff"): +                self._stdout_buff = [] +                self.stdout_thread = \ +                        threading.Thread(target=self._readerthread, +                                         args=(self.stdout, self._stdout_buff)) +                self.stdout_thread.daemon = True +                self.stdout_thread.start() +            if self.stderr and not hasattr(self, "_stderr_buff"): +                self._stderr_buff = [] +                self.stderr_thread = \ +                        threading.Thread(target=self._readerthread, +                                         args=(self.stderr, self._stderr_buff)) +                self.stderr_thread.daemon = True +                self.stderr_thread.start()              if self.stdin:                  if input is not None:                      self.stdin.write(input)                  self.stdin.close() +            # Wait for the reader threads, or time out.  If we time out, the +            # threads remain reading and the fds left open in case the user +            # calls communicate again. +            if self.stdout is not None: +                self.stdout_thread.join(self._remaining_time(endtime)) +                if self.stdout_thread.isAlive(): +                    raise TimeoutExpired(self.args) +            if self.stderr is not None: +                self.stderr_thread.join(self._remaining_time(endtime)) +                if self.stderr_thread.isAlive(): +                    raise TimeoutExpired(self.args) + +            # Collect the output from and close both pipes, now that we know +            # both have been read successfully. +            stdout = None +            stderr = None              if self.stdout: -                stdout_thread.join() +                stdout = self._stdout_buff +                self.stdout.close()              if self.stderr: -                stderr_thread.join() +                stderr = self._stderr_buff +                self.stderr.close()              # All data exchanged.  Translate lists into strings.              if stdout is not None: @@ -1035,7 +1122,6 @@ class Popen(object):              if stderr is not None:                  stderr = stderr[0] -            self.wait()              return (stdout, stderr)          def send_signal(self, sig): @@ -1125,7 +1211,7 @@ class Popen(object):                             restore_signals, start_new_session):              """Execute program (POSIX version)""" -            if isinstance(args, str): +            if isinstance(args, (str, bytes)):                  args = [args]              else:                  args = list(args) @@ -1365,25 +1451,52 @@ class Popen(object):              return self.returncode -        def wait(self): +        def _try_wait(self, wait_flags): +            try: +                (pid, sts) = _eintr_retry_call(os.waitpid, self.pid, wait_flags) +            except OSError as e: +                if e.errno != errno.ECHILD: +                    raise +                # This happens if SIGCLD is set to be ignored or waiting +                # for child processes has otherwise been disabled for our +                # process.  This child is dead, we can't get the status. +                pid = self.pid +                sts = 0 +            return (pid, sts) + + +        def wait(self, timeout=None, endtime=None):              """Wait for child process to terminate.  Returns returncode              attribute.""" -            if self.returncode is None: -                try: -                    pid, sts = _eintr_retry_call(os.waitpid, self.pid, 0) -                except OSError as e: -                    if e.errno != errno.ECHILD: -                        raise -                    # This happens if SIGCLD is set to be ignored or waiting -                    # for child processes has otherwise been disabled for our -                    # process.  This child is dead, we can't get the status. -                    sts = 0 +            # If timeout was passed but not endtime, compute endtime in terms of +            # timeout. +            if endtime is None and timeout is not None: +                endtime = time.time() + timeout +            if self.returncode is not None: +                return self.returncode +            elif endtime is not None: +                # Enter a busy loop if we have a timeout.  This busy loop was +                # cribbed from Lib/threading.py in Thread.wait() at r71065. +                delay = 0.0005 # 500 us -> initial delay of 1 ms +                while True: +                    (pid, sts) = self._try_wait(os.WNOHANG) +                    assert pid == self.pid or pid == 0 +                    if pid == self.pid: +                        self._handle_exitstatus(sts) +                        break +                    remaining = self._remaining_time(endtime) +                    if remaining <= 0: +                        raise TimeoutExpired(self.args) +                    delay = min(delay * 2, remaining, .05) +                    time.sleep(delay) +            elif self.returncode is None: +                (pid, sts) = self._try_wait(0)                  self._handle_exitstatus(sts)              return self.returncode -        def _communicate(self, input): -            if self.stdin: +        def _communicate(self, input, endtime): +            if self.stdin and not self._communication_started:                  # Flush stdio buffer.  This might block, if the user has                  # been writing to .stdin in an uncontrolled fashion.                  self.stdin.flush() @@ -1391,9 +1504,11 @@ class Popen(object):                      self.stdin.close()              if _has_poll: -                stdout, stderr = self._communicate_with_poll(input) +                stdout, stderr = self._communicate_with_poll(input, endtime)              else: -                stdout, stderr = self._communicate_with_select(input) +                stdout, stderr = self._communicate_with_select(input, endtime) + +            self.wait(timeout=self._remaining_time(endtime))              # All data exchanged.  Translate lists into strings.              if stdout is not None: @@ -1411,60 +1526,77 @@ class Popen(object):                      stderr = self._translate_newlines(stderr,                                                        self.stderr.encoding) -            self.wait()              return (stdout, stderr) -        def _communicate_with_poll(self, input): +        def _communicate_with_poll(self, input, endtime):              stdout = None # Return              stderr = None # Return -            fd2file = {} -            fd2output = {} + +            if not self._communication_started: +                self._fd2file = {}              poller = select.poll()              def register_and_append(file_obj, eventmask):                  poller.register(file_obj.fileno(), eventmask) -                fd2file[file_obj.fileno()] = file_obj +                self._fd2file[file_obj.fileno()] = file_obj              def close_unregister_and_remove(fd):                  poller.unregister(fd) -                fd2file[fd].close() -                fd2file.pop(fd) +                self._fd2file[fd].close() +                self._fd2file.pop(fd)              if self.stdin and input:                  register_and_append(self.stdin, select.POLLOUT) +            # Only create this mapping if we haven't already. +            if not self._communication_started: +                self._fd2output = {} +                if self.stdout: +                    self._fd2output[self.stdout.fileno()] = [] +                if self.stderr: +                    self._fd2output[self.stderr.fileno()] = [] +              select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI              if self.stdout:                  register_and_append(self.stdout, select_POLLIN_POLLPRI) -                fd2output[self.stdout.fileno()] = stdout = [] +                stdout = self._fd2output[self.stdout.fileno()]              if self.stderr:                  register_and_append(self.stderr, select_POLLIN_POLLPRI) -                fd2output[self.stderr.fileno()] = stderr = [] +                stderr = self._fd2output[self.stderr.fileno()] -            input_offset = 0 -            while fd2file: +            # Save the input here so that if we time out while communicating, +            # we can continue sending input if we retry. +            if self.stdin and self._input is None: +                self._input_offset = 0 +                self._input = input +                if self.universal_newlines: +                    self._input = self._input.encode(self.stdin.encoding) + +            while self._fd2file:                  try: -                    ready = poller.poll() +                    ready = poller.poll(self._remaining_time(endtime))                  except select.error as e:                      if e.args[0] == errno.EINTR:                          continue                      raise +                self._check_timeout(endtime)                  # XXX Rewrite these to use non-blocking I/O on the                  # file objects; they are no longer using C stdio!                  for fd, mode in ready:                      if mode & select.POLLOUT: -                        chunk = input[input_offset : input_offset + _PIPE_BUF] -                        input_offset += os.write(fd, chunk) -                        if input_offset >= len(input): +                        chunk = self._input[self._input_offset : +                                            self._input_offset + _PIPE_BUF] +                        self._input_offset += os.write(fd, chunk) +                        if self._input_offset >= len(self._input):                              close_unregister_and_remove(fd)                      elif mode & select_POLLIN_POLLPRI:                          data = os.read(fd, 4096)                          if not data:                              close_unregister_and_remove(fd) -                        fd2output[fd].append(data) +                        self._fd2output[fd].append(data)                      else:                          # Ignore hang up or errors.                          close_unregister_and_remove(fd) @@ -1472,53 +1604,76 @@ class Popen(object):              return (stdout, stderr) -        def _communicate_with_select(self, input): -            read_set = [] -            write_set = [] +        def _communicate_with_select(self, input, endtime): +            if not self._communication_started: +                self._read_set = [] +                self._write_set = [] +                if self.stdin and input: +                    self._write_set.append(self.stdin) +                if self.stdout: +                    self._read_set.append(self.stdout) +                if self.stderr: +                    self._read_set.append(self.stderr) + +            if self.stdin and self._input is None: +                self._input_offset = 0 +                self._input = input +                if self.universal_newlines: +                    self._input = self._input.encode(self.stdin.encoding) +              stdout = None # Return              stderr = None # Return -            if self.stdin and input: -                write_set.append(self.stdin)              if self.stdout: -                read_set.append(self.stdout) -                stdout = [] +                if not self._communication_started: +                    self._stdout_buff = [] +                stdout = self._stdout_buff              if self.stderr: -                read_set.append(self.stderr) -                stderr = [] +                if not self._communication_started: +                    self._stderr_buff = [] +                stderr = self._stderr_buff -            input_offset = 0 -            while read_set or write_set: +            while self._read_set or self._write_set:                  try: -                    rlist, wlist, xlist = select.select(read_set, write_set, []) +                    (rlist, wlist, xlist) = \ +                        select.select(self._read_set, self._write_set, [], +                                      self._remaining_time(endtime))                  except select.error as e:                      if e.args[0] == errno.EINTR:                          continue                      raise +                # According to the docs, returning three empty lists indicates +                # that the timeout expired. +                if not (rlist or wlist or xlist): +                    raise TimeoutExpired(self.args) +                # We also check what time it is ourselves for good measure. +                self._check_timeout(endtime) +                  # XXX Rewrite these to use non-blocking I/O on the                  # file objects; they are no longer using C stdio!                  if self.stdin in wlist: -                    chunk = input[input_offset : input_offset + _PIPE_BUF] +                    chunk = self._input[self._input_offset : +                                        self._input_offset + _PIPE_BUF]                      bytes_written = os.write(self.stdin.fileno(), chunk) -                    input_offset += bytes_written -                    if input_offset >= len(input): +                    self._input_offset += bytes_written +                    if self._input_offset >= len(self._input):                          self.stdin.close() -                        write_set.remove(self.stdin) +                        self._write_set.remove(self.stdin)                  if self.stdout in rlist:                      data = os.read(self.stdout.fileno(), 1024)                      if not data:                          self.stdout.close() -                        read_set.remove(self.stdout) +                        self._read_set.remove(self.stdout)                      stdout.append(data)                  if self.stderr in rlist:                      data = os.read(self.stderr.fileno(), 1024)                      if not data:                          self.stderr.close() -                        read_set.remove(self.stderr) +                        self._read_set.remove(self.stderr)                      stderr.append(data)              return (stdout, stderr) | 
