diff options
Diffstat (limited to 'QMTest/TestCmd.py')
-rw-r--r-- | QMTest/TestCmd.py | 254 |
1 files changed, 85 insertions, 169 deletions
diff --git a/QMTest/TestCmd.py b/QMTest/TestCmd.py index ef41916..3fca4ec 100644 --- a/QMTest/TestCmd.py +++ b/QMTest/TestCmd.py @@ -216,9 +216,10 @@ version. from __future__ import division __author__ = "Steven Knight <knight at baldmt dot com>" -__revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight" -__version__ = "0.37" +__revision__ = "TestCmd.py 1.1.D002 2010/05/27 14:47:22 knight" +__version__ = "1.1" +import atexit import errno import os import re @@ -228,6 +229,7 @@ import sys import tempfile import time import traceback + try: from collections import UserList, UserString except ImportError: @@ -268,15 +270,16 @@ except ImportError: __all__.append('simple_diff') def is_List(e): - return isinstance(e, (list,UserList)) + return isinstance(e, (list, UserList)) -try: eval('unicode') +try: + eval('unicode') except NameError: def is_String(e): - return isinstance(e, (str,UserString)) + return isinstance(e, (str, UserString)) else: def is_String(e): - return isinstance(e, (str,unicode,UserString)) + return isinstance(e, (str, unicode, UserString)) tempfile.template = 'testcmd.' if os.name in ('posix', 'nt'): @@ -290,12 +293,12 @@ _Cleanup = [] def _clean(): global _Cleanup - cleanlist = [_f for _f in _Cleanup if _f] + cleanlist = [ c for c in _Cleanup if c ] del _Cleanup[:] cleanlist.reverse() for test in cleanlist: test.cleanup() -import atexit + atexit.register(_clean) def _caller(tblist, skip): @@ -412,7 +415,7 @@ def match_re(lines = None, res = None): expr = re.compile(s) except re.error, e: msg = "Regular expression error in %s: %s" - raise re.error(msg % (repr(s), e[0])) + raise re.error(msg % (repr(s), e.args[0])) if not expr.search(lines[i]): return return 1 @@ -426,12 +429,10 @@ def match_re_dotall(lines = None, res = None): res = "\n".join(res) s = "^" + res + "$" try: - #print 'DEBUG: lines', lines - #print 'DEBUG: dotall', s expr = re.compile(s, re.DOTALL) except re.error, e: msg = "Regular expression error in %s: %s" - raise re.error(msg % (repr(s), e[0])) + raise re.error(msg % (repr(s), e.args[0])) return expr.match(lines) try: @@ -453,15 +454,15 @@ else: for op, a1, a2, b1, b2 in sm.get_opcodes(): if op == 'delete': result.append("%sd%d" % (comma(a1, a2), b1)) - result.extend(['< ' + l for l in a[a1:a2]]) + result.extend([ '< ' + l for l in a[a1:a2] ]) elif op == 'insert': result.append("%da%s" % (a1, comma(b1, b2))) - result.extend(['> ' + l for l in b[b1:b2]]) + result.extend([ '> ' + l for l in b[b1:b2] ]) elif op == 'replace': result.append("%sc%s" % (comma(a1, a2), comma(b1, b2))) - result.extend(['< ' + l for l in a[a1:a2]]) + result.extend([ '< ' + l for l in a[a1:a2] ]) result.append('---') - result.extend(['> ' + l for l in b[b1:b2]]) + result.extend([ '> ' + l for l in b[b1:b2] ]) return result def diff_re(a, b, fromfile='', tofile='', @@ -486,7 +487,7 @@ def diff_re(a, b, fromfile='', tofile='', expr = re.compile(s) except re.error, e: msg = "Regular expression error in %s: %s" - raise re.error(msg % (repr(s), e[0])) + raise re.error(msg % (repr(s), e.args[0])) if not expr.search(bline): result.append("%sc%s" % (i+1, i+1)) result.append('< ' + repr(a[i])) @@ -518,7 +519,7 @@ else: if os.name == 'java': python = os.path.join(sys.prefix, 'jython') else: - python = os.environ.get("python_executable", sys.executable) + python = os.environ.get('python_executable', sys.executable) _python_ = escape(python) if sys.platform == 'win32': @@ -574,95 +575,48 @@ except ImportError: # The subprocess module doesn't exist in this version of Python, # so we're going to cobble up something that looks just enough # like its API for our purposes below. - from types import ModuleType - subprocess = ModuleType('subprocess') + import popen2 + import types + subprocess = types.ModuleType('subprocess') subprocess.PIPE = 'PIPE' subprocess.STDOUT = 'STDOUT' subprocess.mswindows = (sys.platform == 'win32') - try: - import popen2 - popen2.Popen3 - except AttributeError: - class Popen3: - universal_newlines = 1 - def __init__(self, command, **kw): - if sys.platform == 'win32' and command[0] == '"': - command = '"' + command + '"' - (stdin, stdout, stderr) = os.popen3(' ' + command) - self.stdin = stdin - self.stdout = stdout - self.stderr = stderr - def close_output(self): - self.stdout.close() - self.resultcode = self.stderr.close() - def wait(self): - resultcode = self.resultcode - if os.WIFEXITED(resultcode): - return os.WEXITSTATUS(resultcode) - elif os.WIFSIGNALED(resultcode): - return os.WTERMSIG(resultcode) - else: - return None + class Popen(popen2.Popen3, popen2.Popen4): + universal_newlines = 1 + def __init__(self, command, **kw): + if kw.get('stderr') == 'STDOUT': + popen2.Popen4.__init__(self, command, 1) + else: + popen2.Popen3.__init__(self, command, 1) + self.stdin = self.tochild + self.stdout = self.fromchild + self.stderr = self.childerr + def communicate(self, input=None): + if input: + self.stdin.write(input) + self.stdin.close() + out = self.stdout.read() + if self.stderr is None: + err = None + else: + err = self.stderr.read() + self.stdout.close() + if self.stderr is not None: + self.stderr.close() + self.returncode = self.wait() + return (out, err) + def wait(self, *args, **kw): + resultcode = popen2.Popen3.wait(self, *args, **kw) + if os.WIFEXITED(resultcode): + return os.WEXITSTATUS(resultcode) + elif os.WIFSIGNALED(resultcode): + return os.WTERMSIG(resultcode) + else: + return None - else: - try: - popen2.Popen4 - except AttributeError: - # A cribbed Popen4 class, with some retrofitted code from - # the Python 1.5 Popen3 class methods to do certain things - # by hand. - class Popen4(popen2.Popen3): - childerr = None - - def __init__(self, cmd, bufsize=-1): - p2cread, p2cwrite = os.pipe() - c2pread, c2pwrite = os.pipe() - self.pid = os.fork() - if self.pid == 0: - # Child - os.dup2(p2cread, 0) - os.dup2(c2pwrite, 1) - os.dup2(c2pwrite, 2) - for i in range(3, popen2.MAXFD): - try: - os.close(i) - except: pass - try: - os.execvp(cmd[0], cmd) - finally: - os._exit(1) - # Shouldn't come here, I guess - os._exit(1) - os.close(p2cread) - self.tochild = os.fdopen(p2cwrite, 'w', bufsize) - os.close(c2pwrite) - self.fromchild = os.fdopen(c2pread, 'r', bufsize) - popen2._active.append(self) - - popen2.Popen4 = Popen4 - - class Popen3(popen2.Popen3, popen2.Popen4): - universal_newlines = 1 - def __init__(self, command, **kw): - if kw.get('stderr') == 'STDOUT': - popen2.Popen4.__init__(self, command, 1) - else: - popen2.Popen3.__init__(self, command, 1) - self.stdin = self.tochild - self.stdout = self.fromchild - self.stderr = self.childerr - def wait(self, *args, **kw): - resultcode = popen2.Popen3.wait(self, *args, **kw) - if os.WIFEXITED(resultcode): - return os.WEXITSTATUS(resultcode) - elif os.WIFSIGNALED(resultcode): - return os.WTERMSIG(resultcode) - else: - return None - - subprocess.Popen = Popen3 + subprocess.Popen = Popen @@ -718,7 +672,7 @@ class Popen(subprocess.Popen): except ValueError: return self._close('stdin') except (subprocess.pywintypes.error, Exception), why: - if why[0] in (109, errno.ESHUTDOWN): + if why.args[0] in (109, errno.ESHUTDOWN): return self._close('stdin') raise @@ -739,7 +693,7 @@ class Popen(subprocess.Popen): except ValueError: return self._close(which) except (subprocess.pywintypes.error, Exception), why: - if why[0] in (109, errno.ESHUTDOWN): + if why.args[0] in (109, errno.ESHUTDOWN): return self._close(which) raise @@ -825,14 +779,6 @@ def send_all(p, data): -try: - object -except NameError: - class object: - pass - - - class TestCmd(object): """Class TestCmd """ @@ -977,11 +923,11 @@ class TestCmd(object): program = self.program if not interpreter: interpreter = self.interpreter - if not type(program) in [list, tuple]: + if not isinstance(program, (list, tuple)): program = [program] cmd = list(program) if interpreter: - if not type(interpreter) in [list, tuple]: + if not isinstance(interpreter, (list, tuple)): interpreter = [interpreter] cmd = list(interpreter) + cmd if arguments: @@ -1123,10 +1069,8 @@ class TestCmd(object): prepended unless it is enclosed in a [list]. """ cmd = self.command_args(program, interpreter, arguments) - #print 'DEBUG: SCONSFLAGS:', os.environ.get('SCONSFLAGS') - #print 'DEBUG: command line:', ' '.join(map(self.escape, cmd)) if self.verbose: - cmd_string = ' '.join(map(self.escape, cmd)) + cmd_string = ' '.join([ self.escape(c) for c in cmd ]) sys.stderr.write(cmd_string + "\n") if universal_newlines is None: universal_newlines = self.universal_newlines @@ -1157,16 +1101,10 @@ class TestCmd(object): the specified popen argument, recording the exit status, standard output and error output. """ - popen.stdin.close() - self.status = popen.wait() - if not self.status: - self.status = 0 - self._stdout.append(popen.stdout.read()) - if popen.stderr: - stderr = popen.stderr.read() - else: - stderr = '' - self._stderr.append(stderr) + stdout, stderr = popen.communicate() + self.status = popen.returncode + self._stdout.append(stdout or '') + self._stderr.append(stderr or '') def run(self, program = None, interpreter = None, @@ -1193,34 +1131,12 @@ class TestCmd(object): arguments, universal_newlines, stdin=stdin) - if stdin: - if is_List(stdin): - for line in stdin: - p.stdin.write(line) - else: - p.stdin.write(stdin) - p.stdin.close() - - out = p.stdout.read() - if p.stderr is None: - err = '' - else: - err = p.stderr.read() - try: - close_output = p.close_output - except AttributeError: - p.stdout.close() - if not p.stderr is None: - p.stderr.close() - else: - close_output() - - self._stdout.append(out) - self._stderr.append(err) - - self.status = p.wait() - if not self.status: - self.status = 0 + if is_List(stdin): + stdin = ''.join(stdin) + stdout, stderr = p.communicate(input=stdin) + self.status = p.returncode + self._stdout.append(stdout or '') + self._stderr.append(stderr or '') if chdir: os.chdir(oldcwd) @@ -1434,10 +1350,10 @@ class TestCmd(object): # permission, so it's also pretty easy, just chmod the # directory and then chmod every entry on our walk down the # tree. + do_chmod(top) for dirpath, dirnames, filenames in os.walk(top): - do_chmod(dirpath) - for fn in filenames: - do_chmod(os.path.join(dirpath, fn)) + for name in dirnames + filenames: + do_chmod(os.path.join(dirpath, name)) else: # It's a directory and we're trying to turn off read # permission, which means we have to chmod the directories @@ -1445,9 +1361,9 @@ class TestCmd(object): # the top down get in the way of being able to get at lower # parts of the tree. for dirpath, dirnames, filenames in os.walk(top, topdown=0): - for fn in filenames: - do_chmod(os.path.join(dirpath, fn)) - do_chmod(dirpath) + for name in dirnames + filenames: + do_chmod(os.path.join(dirpath, name)) + do_chmod(top) def writable(self, top, write=1): """Make the specified directory tree writable (write == 1) @@ -1481,10 +1397,10 @@ class TestCmd(object): if os.path.isfile(top): do_chmod(top) else: + do_chmod(top) for dirpath, dirnames, filenames in os.walk(top, topdown=0): - for fn in filenames: - do_chmod(os.path.join(dirpath, fn)) - do_chmod(dirpath) + for name in dirnames + filenames: + do_chmod(os.path.join(dirpath, name)) def executable(self, top, execute=1): """Make the specified directory tree executable (execute == 1) @@ -1516,10 +1432,10 @@ class TestCmd(object): # permission, so it's also pretty easy, just chmod the # directory and then chmod every entry on our walk down the # tree. + do_chmod(top) for dirpath, dirnames, filenames in os.walk(top): - do_chmod(dirpath) - for fn in filenames: - do_chmod(os.path.join(dirpath, fn)) + for name in dirnames + filenames: + do_chmod(os.path.join(dirpath, name)) else: # It's a directory and we're trying to turn off execute # permission, which means we have to chmod the directories @@ -1527,9 +1443,9 @@ class TestCmd(object): # the top down get in the way of being able to get at lower # parts of the tree. for dirpath, dirnames, filenames in os.walk(top, topdown=0): - for fn in filenames: - do_chmod(os.path.join(dirpath, fn)) - do_chmod(dirpath) + for name in dirnames + filenames: + do_chmod(os.path.join(dirpath, name)) + do_chmod(top) def write(self, file, content, mode = 'wb'): """Writes the specified content text (second argument) to the |