diff options
Diffstat (limited to 'QMTest/TestCmd.py')
-rw-r--r-- | QMTest/TestCmd.py | 427 |
1 files changed, 355 insertions, 72 deletions
diff --git a/QMTest/TestCmd.py b/QMTest/TestCmd.py index f5e1c71..8bf054b 100644 --- a/QMTest/TestCmd.py +++ b/QMTest/TestCmd.py @@ -181,12 +181,12 @@ version. # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. __author__ = "Steven Knight <knight at baldmt dot com>" -__revision__ = "TestCmd.py 0.30.D001 2007/10/01 16:53:55 knight" -__version__ = "0.30" +__revision__ = "TestCmd.py 0.31.D001 2008/01/01 09:05:59 knight" +__version__ = "0.31" +import errno import os import os.path -import popen2 import re import shutil import stat @@ -457,6 +457,252 @@ else: default_sleep_seconds = 1 + + +try: + import subprocess +except ImportError: + # The subprocess module doesn't exist in this version of Python, + # so we're going to cobble up something that looks just enough + # like its API for our purposes below. + import new + + subprocess = new.module('subprocess') + + subprocess.PIPE = 'PIPE' + subprocess.STDOUT = 'STDOUT' + subprocess.mswindows = (sys.platform == 'win32') + + try: + import popen2 + popen2.Popen3 + except AttributeError: + class Popen3: + universal_newlines = 1 + def __init__(self, command, **kw): + if sys.platform == 'win32' and command[0] == '"': + command = '"' + command + '"' + (stdin, stdout, stderr) = os.popen3(' ' + command) + self.stdin = stdin + self.stdout = stdout + self.stderr = stderr + def close_output(self): + self.stdout.close() + self.resultcode = self.stderr.close() + def wait(self): + return self.resultcode + + else: + try: + popen2.Popen4 + except AttributeError: + # A cribbed Popen4 class, with some retrofitted code from + # the Python 1.5 Popen3 class methods to do certain things + # by hand. + class Popen4(popen2.Popen3): + childerr = None + + def __init__(self, cmd, bufsize=-1): + p2cread, p2cwrite = os.pipe() + c2pread, c2pwrite = os.pipe() + self.pid = os.fork() + if self.pid == 0: + # Child + os.dup2(p2cread, 0) + os.dup2(c2pwrite, 1) + os.dup2(c2pwrite, 2) + for i in range(3, popen2.MAXFD): + try: + os.close(i) + except: pass + try: + os.execvp(cmd[0], cmd) + finally: + os._exit(1) + # Shouldn't come here, I guess + os._exit(1) + os.close(p2cread) + self.tochild = os.fdopen(p2cwrite, 'w', bufsize) + os.close(c2pwrite) + self.fromchild = os.fdopen(c2pread, 'r', bufsize) + popen2._active.append(self) + + popen2.Popen4 = Popen4 + + class Popen3(popen2.Popen3, popen2.Popen4): + universal_newlines = 1 + def __init__(self, command, **kw): + if kw.get('stderr') == 'STDOUT': + apply(popen2.Popen4.__init__, (self, command, 1)) + else: + apply(popen2.Popen3.__init__, (self, command, 1)) + self.stdin = self.tochild + self.stdout = self.fromchild + self.stderr = self.childerr + + subprocess.Popen = Popen3 + + + +# From Josiah Carlson, +# ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms +# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554 + +PIPE = subprocess.PIPE + +if subprocess.mswindows: + from win32file import ReadFile, WriteFile + from win32pipe import PeekNamedPipe + import msvcrt +else: + import select + import fcntl + + try: fcntl.F_GETFL + except AttributeError: fcntl.F_GETFL = 3 + + try: fcntl.F_SETFL + except AttributeError: fcntl.F_SETFL = 4 + +class Popen(subprocess.Popen): + def recv(self, maxsize=None): + return self._recv('stdout', maxsize) + + def recv_err(self, maxsize=None): + return self._recv('stderr', maxsize) + + def send_recv(self, input='', maxsize=None): + return self.send(input), self.recv(maxsize), self.recv_err(maxsize) + + def get_conn_maxsize(self, which, maxsize): + if maxsize is None: + maxsize = 1024 + elif maxsize < 1: + maxsize = 1 + return getattr(self, which), maxsize + + def _close(self, which): + getattr(self, which).close() + setattr(self, which, None) + + if subprocess.mswindows: + def send(self, input): + if not self.stdin: + return None + + try: + x = msvcrt.get_osfhandle(self.stdin.fileno()) + (errCode, written) = WriteFile(x, input) + except ValueError: + return self._close('stdin') + except (subprocess.pywintypes.error, Exception), why: + if why[0] in (109, errno.ESHUTDOWN): + return self._close('stdin') + raise + + return written + + def _recv(self, which, maxsize): + conn, maxsize = self.get_conn_maxsize(which, maxsize) + if conn is None: + return None + + try: + x = msvcrt.get_osfhandle(conn.fileno()) + (read, nAvail, nMessage) = PeekNamedPipe(x, 0) + if maxsize < nAvail: + nAvail = maxsize + if nAvail > 0: + (errCode, read) = ReadFile(x, nAvail, None) + except ValueError: + return self._close(which) + except (subprocess.pywintypes.error, Exception), why: + if why[0] in (109, errno.ESHUTDOWN): + return self._close(which) + raise + + #if self.universal_newlines: + # read = self._translate_newlines(read) + return read + + else: + def send(self, input): + if not self.stdin: + return None + + if not select.select([], [self.stdin], [], 0)[1]: + return 0 + + try: + written = os.write(self.stdin.fileno(), input) + except OSError, why: + if why[0] == errno.EPIPE: #broken pipe + return self._close('stdin') + raise + + return written + + def _recv(self, which, maxsize): + conn, maxsize = self.get_conn_maxsize(which, maxsize) + if conn is None: + return None + + try: + flags = fcntl.fcntl(conn, fcntl.F_GETFL) + except TypeError: + flags = None + else: + if not conn.closed: + fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK) + + try: + if not select.select([conn], [], [], 0)[0]: + return '' + + r = conn.read(maxsize) + if not r: + return self._close(which) + + #if self.universal_newlines: + # r = self._translate_newlines(r) + return r + finally: + if not conn.closed and not flags is None: + fcntl.fcntl(conn, fcntl.F_SETFL, flags) + +disconnect_message = "Other end disconnected!" + +def recv_some(p, t=.1, e=1, tr=5, stderr=0): + if tr < 1: + tr = 1 + x = time.time()+t + y = [] + r = '' + pr = p.recv + if stderr: + pr = p.recv_err + while time.time() < x or r: + r = pr() + if r is None: + if e: + raise Exception(disconnect_message) + else: + break + elif r: + y.append(r) + else: + time.sleep(max((x-time.time())/tr, 0)) + return ''.join(y) + +def send_all(p, data): + while len(data): + sent = p.send(data) + if sent is None: + raise Exception(disconnect_message) + data = buffer(data, sent) + + + class TestCmd: """Class TestCmd """ @@ -703,26 +949,17 @@ class TestCmd: dir = self.canonicalize(dir) os.rmdir(dir) - def run(self, program = None, - interpreter = None, - arguments = None, - chdir = None, - stdin = None, - universal_newlines = None): - """Runs a test of the program or script for the test - environment. Standard output and error output are saved for - future retrieval via the stdout() and stderr() methods. + def start(self, program = None, + interpreter = None, + arguments = None, + universal_newlines = None, + **kw): + """ + Starts a program or script for the test environment. The specified program will have the original directory - prepending unless it is enclosed in a [list]. + prepended unless it is enclosed in a [list]. """ - if chdir: - oldcwd = os.getcwd() - if not os.path.isabs(chdir): - chdir = os.path.join(self.workpath(chdir)) - if self.verbose: - sys.stderr.write("chdir(" + chdir + ")\n") - os.chdir(chdir) if program: if type(program) == type('') and not os.path.isabs(program): program = os.path.join(self._cwd, program) @@ -747,38 +984,56 @@ class TestCmd: if universal_newlines is None: universal_newlines = self.universal_newlines - try: - import subprocess - except ImportError: - try: - Popen3 = popen2.Popen3 - except AttributeError: - class Popen3: - def __init__(self, command): - (stdin, stdout, stderr) = os.popen3(' ' + command) - self.stdin = stdin - self.stdout = stdout - self.stderr = stderr - def close_output(self): - self.stdout.close() - self.resultcode = self.stderr.close() - def wait(self): - return self.resultcode - if sys.platform == 'win32' and cmd_string[0] == '"': - cmd_string = '"' + cmd_string + '"' - p = Popen3(cmd_string) - else: - p = Popen3(cmd, 1) - p.stdin = p.tochild - p.stdout = p.fromchild - p.stderr = p.childerr + combine = kw.get('combine', self.combine) + if combine: + stderr_value = subprocess.STDOUT else: - p = subprocess.Popen(cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=universal_newlines) + stderr_value = subprocess.PIPE + + return Popen(cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=stderr_value, + universal_newlines=universal_newlines) + def finish(self, popen, **kw): + """ + Finishes and waits for the process being run under control of + the specified popen argument, recording the exit status, + standard output and error output. + """ + popen.stdin.close() + self.status = popen.wait() + if not self.status: + self.status = 0 + self._stdout.append(popen.stdout.read()) + if popen.stderr: + stderr = popen.stderr.read() + else: + stderr = '' + self._stderr.append(stderr) + + def run(self, program = None, + interpreter = None, + arguments = None, + chdir = None, + stdin = None, + universal_newlines = None): + """Runs a test of the program or script for the test + environment. Standard output and error output are saved for + future retrieval via the stdout() and stderr() methods. + + The specified program will have the original directory + prepended unless it is enclosed in a [list]. + """ + if chdir: + oldcwd = os.getcwd() + if not os.path.isabs(chdir): + chdir = os.path.join(self.workpath(chdir)) + if self.verbose: + sys.stderr.write("chdir(" + chdir + ")\n") + os.chdir(chdir) + p = self.start(program, interpreter, arguments, universal_newlines) if stdin: if is_List(stdin): for line in stdin: @@ -788,23 +1043,26 @@ class TestCmd: p.stdin.close() out = p.stdout.read() - err = p.stderr.read() + if p.stderr is None: + err = '' + else: + err = p.stderr.read() try: - p.close_output() + close_output = p.close_output except AttributeError: p.stdout.close() - p.stderr.close() + if not p.stderr is None: + p.stderr.close() + else: + close_output() + + self._stdout.append(out) + self._stderr.append(err) self.status = p.wait() if not self.status: self.status = 0 - if self.combine: - self._stdout.append(out + err) - else: - self._stdout.append(out) - self._stderr.append(err) - if chdir: os.chdir(oldcwd) if self.verbose >= 2: @@ -990,18 +1248,24 @@ class TestCmd: def readable(self, top, read=1): """Make the specified directory tree readable (read == 1) or not (read == None). + + This method has no effect on Windows systems, which use a + completely different mechanism to control file readability. """ + if sys.platform == 'win32': + return + if read: def do_chmod(fname): try: st = os.stat(fname) except OSError: pass - else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0400)) + else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD)) else: def do_chmod(fname): try: st = os.stat(fname) except OSError: pass - else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0400)) + else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD)) if os.path.isfile(top): # If it's a file, that's easy, just chmod it. @@ -1040,16 +1304,29 @@ class TestCmd: or not (write == None). """ - if write: - def do_chmod(fname): - try: st = os.stat(fname) - except OSError: pass - else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200)) + if sys.platform == 'win32': + + if write: + def do_chmod(fname): + try: os.chmod(fname, stat.S_IWRITE) + except OSError: pass + else: + def do_chmod(fname): + try: os.chmod(fname, stat.S_IREAD) + except OSError: pass + else: - def do_chmod(fname): - try: st = os.stat(fname) - except OSError: pass - else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200)) + + if write: + def do_chmod(fname): + try: st = os.stat(fname) + except OSError: pass + else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200)) + else: + def do_chmod(fname): + try: st = os.stat(fname) + except OSError: pass + else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200)) if os.path.isfile(top): do_chmod(top) @@ -1061,18 +1338,24 @@ class TestCmd: def executable(self, top, execute=1): """Make the specified directory tree executable (execute == 1) or not (execute == None). + + This method has no effect on Windows systems, which use a + completely different mechanism to control file executability. """ + if sys.platform == 'win32': + return + if execute: def do_chmod(fname): try: st = os.stat(fname) except OSError: pass - else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0100)) + else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC)) else: def do_chmod(fname): try: st = os.stat(fname) except OSError: pass - else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0100)) + else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC)) if os.path.isfile(top): # If it's a file, that's easy, just chmod it. |