summaryrefslogtreecommitdiffstats
path: root/QMTest
diff options
context:
space:
mode:
Diffstat (limited to 'QMTest')
-rw-r--r--QMTest/TestCmd.py254
-rw-r--r--QMTest/TestCommon.py124
2 files changed, 166 insertions, 212 deletions
diff --git a/QMTest/TestCmd.py b/QMTest/TestCmd.py
index ef41916..3fca4ec 100644
--- a/QMTest/TestCmd.py
+++ b/QMTest/TestCmd.py
@@ -216,9 +216,10 @@ version.
from __future__ import division
__author__ = "Steven Knight <knight at baldmt dot com>"
-__revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight"
-__version__ = "0.37"
+__revision__ = "TestCmd.py 1.1.D002 2010/05/27 14:47:22 knight"
+__version__ = "1.1"
+import atexit
import errno
import os
import re
@@ -228,6 +229,7 @@ import sys
import tempfile
import time
import traceback
+
try:
from collections import UserList, UserString
except ImportError:
@@ -268,15 +270,16 @@ except ImportError:
__all__.append('simple_diff')
def is_List(e):
- return isinstance(e, (list,UserList))
+ return isinstance(e, (list, UserList))
-try: eval('unicode')
+try:
+ eval('unicode')
except NameError:
def is_String(e):
- return isinstance(e, (str,UserString))
+ return isinstance(e, (str, UserString))
else:
def is_String(e):
- return isinstance(e, (str,unicode,UserString))
+ return isinstance(e, (str, unicode, UserString))
tempfile.template = 'testcmd.'
if os.name in ('posix', 'nt'):
@@ -290,12 +293,12 @@ _Cleanup = []
def _clean():
global _Cleanup
- cleanlist = [_f for _f in _Cleanup if _f]
+ cleanlist = [ c for c in _Cleanup if c ]
del _Cleanup[:]
cleanlist.reverse()
for test in cleanlist:
test.cleanup()
-import atexit
+
atexit.register(_clean)
def _caller(tblist, skip):
@@ -412,7 +415,7 @@ def match_re(lines = None, res = None):
expr = re.compile(s)
except re.error, e:
msg = "Regular expression error in %s: %s"
- raise re.error(msg % (repr(s), e[0]))
+ raise re.error(msg % (repr(s), e.args[0]))
if not expr.search(lines[i]):
return
return 1
@@ -426,12 +429,10 @@ def match_re_dotall(lines = None, res = None):
res = "\n".join(res)
s = "^" + res + "$"
try:
- #print 'DEBUG: lines', lines
- #print 'DEBUG: dotall', s
expr = re.compile(s, re.DOTALL)
except re.error, e:
msg = "Regular expression error in %s: %s"
- raise re.error(msg % (repr(s), e[0]))
+ raise re.error(msg % (repr(s), e.args[0]))
return expr.match(lines)
try:
@@ -453,15 +454,15 @@ else:
for op, a1, a2, b1, b2 in sm.get_opcodes():
if op == 'delete':
result.append("%sd%d" % (comma(a1, a2), b1))
- result.extend(['< ' + l for l in a[a1:a2]])
+ result.extend([ '< ' + l for l in a[a1:a2] ])
elif op == 'insert':
result.append("%da%s" % (a1, comma(b1, b2)))
- result.extend(['> ' + l for l in b[b1:b2]])
+ result.extend([ '> ' + l for l in b[b1:b2] ])
elif op == 'replace':
result.append("%sc%s" % (comma(a1, a2), comma(b1, b2)))
- result.extend(['< ' + l for l in a[a1:a2]])
+ result.extend([ '< ' + l for l in a[a1:a2] ])
result.append('---')
- result.extend(['> ' + l for l in b[b1:b2]])
+ result.extend([ '> ' + l for l in b[b1:b2] ])
return result
def diff_re(a, b, fromfile='', tofile='',
@@ -486,7 +487,7 @@ def diff_re(a, b, fromfile='', tofile='',
expr = re.compile(s)
except re.error, e:
msg = "Regular expression error in %s: %s"
- raise re.error(msg % (repr(s), e[0]))
+ raise re.error(msg % (repr(s), e.args[0]))
if not expr.search(bline):
result.append("%sc%s" % (i+1, i+1))
result.append('< ' + repr(a[i]))
@@ -518,7 +519,7 @@ else:
if os.name == 'java':
python = os.path.join(sys.prefix, 'jython')
else:
- python = os.environ.get("python_executable", sys.executable)
+ python = os.environ.get('python_executable', sys.executable)
_python_ = escape(python)
if sys.platform == 'win32':
@@ -574,95 +575,48 @@ except ImportError:
# The subprocess module doesn't exist in this version of Python,
# so we're going to cobble up something that looks just enough
# like its API for our purposes below.
- from types import ModuleType
- subprocess = ModuleType('subprocess')
+ import popen2
+ import types
+ subprocess = types.ModuleType('subprocess')
subprocess.PIPE = 'PIPE'
subprocess.STDOUT = 'STDOUT'
subprocess.mswindows = (sys.platform == 'win32')
- try:
- import popen2
- popen2.Popen3
- except AttributeError:
- class Popen3:
- universal_newlines = 1
- def __init__(self, command, **kw):
- if sys.platform == 'win32' and command[0] == '"':
- command = '"' + command + '"'
- (stdin, stdout, stderr) = os.popen3(' ' + command)
- self.stdin = stdin
- self.stdout = stdout
- self.stderr = stderr
- def close_output(self):
- self.stdout.close()
- self.resultcode = self.stderr.close()
- def wait(self):
- resultcode = self.resultcode
- if os.WIFEXITED(resultcode):
- return os.WEXITSTATUS(resultcode)
- elif os.WIFSIGNALED(resultcode):
- return os.WTERMSIG(resultcode)
- else:
- return None
+ class Popen(popen2.Popen3, popen2.Popen4):
+ universal_newlines = 1
+ def __init__(self, command, **kw):
+ if kw.get('stderr') == 'STDOUT':
+ popen2.Popen4.__init__(self, command, 1)
+ else:
+ popen2.Popen3.__init__(self, command, 1)
+ self.stdin = self.tochild
+ self.stdout = self.fromchild
+ self.stderr = self.childerr
+ def communicate(self, input=None):
+ if input:
+ self.stdin.write(input)
+ self.stdin.close()
+ out = self.stdout.read()
+ if self.stderr is None:
+ err = None
+ else:
+ err = self.stderr.read()
+ self.stdout.close()
+ if self.stderr is not None:
+ self.stderr.close()
+ self.returncode = self.wait()
+ return (out, err)
+ def wait(self, *args, **kw):
+ resultcode = popen2.Popen3.wait(self, *args, **kw)
+ if os.WIFEXITED(resultcode):
+ return os.WEXITSTATUS(resultcode)
+ elif os.WIFSIGNALED(resultcode):
+ return os.WTERMSIG(resultcode)
+ else:
+ return None
- else:
- try:
- popen2.Popen4
- except AttributeError:
- # A cribbed Popen4 class, with some retrofitted code from
- # the Python 1.5 Popen3 class methods to do certain things
- # by hand.
- class Popen4(popen2.Popen3):
- childerr = None
-
- def __init__(self, cmd, bufsize=-1):
- p2cread, p2cwrite = os.pipe()
- c2pread, c2pwrite = os.pipe()
- self.pid = os.fork()
- if self.pid == 0:
- # Child
- os.dup2(p2cread, 0)
- os.dup2(c2pwrite, 1)
- os.dup2(c2pwrite, 2)
- for i in range(3, popen2.MAXFD):
- try:
- os.close(i)
- except: pass
- try:
- os.execvp(cmd[0], cmd)
- finally:
- os._exit(1)
- # Shouldn't come here, I guess
- os._exit(1)
- os.close(p2cread)
- self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
- os.close(c2pwrite)
- self.fromchild = os.fdopen(c2pread, 'r', bufsize)
- popen2._active.append(self)
-
- popen2.Popen4 = Popen4
-
- class Popen3(popen2.Popen3, popen2.Popen4):
- universal_newlines = 1
- def __init__(self, command, **kw):
- if kw.get('stderr') == 'STDOUT':
- popen2.Popen4.__init__(self, command, 1)
- else:
- popen2.Popen3.__init__(self, command, 1)
- self.stdin = self.tochild
- self.stdout = self.fromchild
- self.stderr = self.childerr
- def wait(self, *args, **kw):
- resultcode = popen2.Popen3.wait(self, *args, **kw)
- if os.WIFEXITED(resultcode):
- return os.WEXITSTATUS(resultcode)
- elif os.WIFSIGNALED(resultcode):
- return os.WTERMSIG(resultcode)
- else:
- return None
-
- subprocess.Popen = Popen3
+ subprocess.Popen = Popen
@@ -718,7 +672,7 @@ class Popen(subprocess.Popen):
except ValueError:
return self._close('stdin')
except (subprocess.pywintypes.error, Exception), why:
- if why[0] in (109, errno.ESHUTDOWN):
+ if why.args[0] in (109, errno.ESHUTDOWN):
return self._close('stdin')
raise
@@ -739,7 +693,7 @@ class Popen(subprocess.Popen):
except ValueError:
return self._close(which)
except (subprocess.pywintypes.error, Exception), why:
- if why[0] in (109, errno.ESHUTDOWN):
+ if why.args[0] in (109, errno.ESHUTDOWN):
return self._close(which)
raise
@@ -825,14 +779,6 @@ def send_all(p, data):
-try:
- object
-except NameError:
- class object:
- pass
-
-
-
class TestCmd(object):
"""Class TestCmd
"""
@@ -977,11 +923,11 @@ class TestCmd(object):
program = self.program
if not interpreter:
interpreter = self.interpreter
- if not type(program) in [list, tuple]:
+ if not isinstance(program, (list, tuple)):
program = [program]
cmd = list(program)
if interpreter:
- if not type(interpreter) in [list, tuple]:
+ if not isinstance(interpreter, (list, tuple)):
interpreter = [interpreter]
cmd = list(interpreter) + cmd
if arguments:
@@ -1123,10 +1069,8 @@ class TestCmd(object):
prepended unless it is enclosed in a [list].
"""
cmd = self.command_args(program, interpreter, arguments)
- #print 'DEBUG: SCONSFLAGS:', os.environ.get('SCONSFLAGS')
- #print 'DEBUG: command line:', ' '.join(map(self.escape, cmd))
if self.verbose:
- cmd_string = ' '.join(map(self.escape, cmd))
+ cmd_string = ' '.join([ self.escape(c) for c in cmd ])
sys.stderr.write(cmd_string + "\n")
if universal_newlines is None:
universal_newlines = self.universal_newlines
@@ -1157,16 +1101,10 @@ class TestCmd(object):
the specified popen argument, recording the exit status,
standard output and error output.
"""
- popen.stdin.close()
- self.status = popen.wait()
- if not self.status:
- self.status = 0
- self._stdout.append(popen.stdout.read())
- if popen.stderr:
- stderr = popen.stderr.read()
- else:
- stderr = ''
- self._stderr.append(stderr)
+ stdout, stderr = popen.communicate()
+ self.status = popen.returncode
+ self._stdout.append(stdout or '')
+ self._stderr.append(stderr or '')
def run(self, program = None,
interpreter = None,
@@ -1193,34 +1131,12 @@ class TestCmd(object):
arguments,
universal_newlines,
stdin=stdin)
- if stdin:
- if is_List(stdin):
- for line in stdin:
- p.stdin.write(line)
- else:
- p.stdin.write(stdin)
- p.stdin.close()
-
- out = p.stdout.read()
- if p.stderr is None:
- err = ''
- else:
- err = p.stderr.read()
- try:
- close_output = p.close_output
- except AttributeError:
- p.stdout.close()
- if not p.stderr is None:
- p.stderr.close()
- else:
- close_output()
-
- self._stdout.append(out)
- self._stderr.append(err)
-
- self.status = p.wait()
- if not self.status:
- self.status = 0
+ if is_List(stdin):
+ stdin = ''.join(stdin)
+ stdout, stderr = p.communicate(input=stdin)
+ self.status = p.returncode
+ self._stdout.append(stdout or '')
+ self._stderr.append(stderr or '')
if chdir:
os.chdir(oldcwd)
@@ -1434,10 +1350,10 @@ class TestCmd(object):
# permission, so it's also pretty easy, just chmod the
# directory and then chmod every entry on our walk down the
# tree.
+ do_chmod(top)
for dirpath, dirnames, filenames in os.walk(top):
- do_chmod(dirpath)
- for fn in filenames:
- do_chmod(os.path.join(dirpath, fn))
+ for name in dirnames + filenames:
+ do_chmod(os.path.join(dirpath, name))
else:
# It's a directory and we're trying to turn off read
# permission, which means we have to chmod the directories
@@ -1445,9 +1361,9 @@ class TestCmd(object):
# the top down get in the way of being able to get at lower
# parts of the tree.
for dirpath, dirnames, filenames in os.walk(top, topdown=0):
- for fn in filenames:
- do_chmod(os.path.join(dirpath, fn))
- do_chmod(dirpath)
+ for name in dirnames + filenames:
+ do_chmod(os.path.join(dirpath, name))
+ do_chmod(top)
def writable(self, top, write=1):
"""Make the specified directory tree writable (write == 1)
@@ -1481,10 +1397,10 @@ class TestCmd(object):
if os.path.isfile(top):
do_chmod(top)
else:
+ do_chmod(top)
for dirpath, dirnames, filenames in os.walk(top, topdown=0):
- for fn in filenames:
- do_chmod(os.path.join(dirpath, fn))
- do_chmod(dirpath)
+ for name in dirnames + filenames:
+ do_chmod(os.path.join(dirpath, name))
def executable(self, top, execute=1):
"""Make the specified directory tree executable (execute == 1)
@@ -1516,10 +1432,10 @@ class TestCmd(object):
# permission, so it's also pretty easy, just chmod the
# directory and then chmod every entry on our walk down the
# tree.
+ do_chmod(top)
for dirpath, dirnames, filenames in os.walk(top):
- do_chmod(dirpath)
- for fn in filenames:
- do_chmod(os.path.join(dirpath, fn))
+ for name in dirnames + filenames:
+ do_chmod(os.path.join(dirpath, name))
else:
# It's a directory and we're trying to turn off execute
# permission, which means we have to chmod the directories
@@ -1527,9 +1443,9 @@ class TestCmd(object):
# the top down get in the way of being able to get at lower
# parts of the tree.
for dirpath, dirnames, filenames in os.walk(top, topdown=0):
- for fn in filenames:
- do_chmod(os.path.join(dirpath, fn))
- do_chmod(dirpath)
+ for name in dirnames + filenames:
+ do_chmod(os.path.join(dirpath, name))
+ do_chmod(top)
def write(self, file, content, mode = 'wb'):
"""Writes the specified content text (second argument) to the
diff --git a/QMTest/TestCommon.py b/QMTest/TestCommon.py
index 1120b55..3e41d51 100644
--- a/QMTest/TestCommon.py
+++ b/QMTest/TestCommon.py
@@ -40,7 +40,7 @@ provided by the TestCommon class:
test.must_contain_any_line(output, lines, ['title', find])
- test.exactly_contain_all_lines(output, lines, ['title', find])
+ test.must_contain_exactly_lines(output, lines, ['title', find])
test.must_exist('file1', ['file2', ...])
@@ -92,13 +92,14 @@ The TestCommon module also provides the following variables
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
__author__ = "Steven Knight <knight at baldmt dot com>"
-__revision__ = "TestCommon.py 0.37.D001 2010/01/11 16:55:50 knight"
-__version__ = "0.37"
+__revision__ = "TestCommon.py 1.1.D001 2010/05/27 14:16:37 knight"
+__version__ = "1.1"
import copy
import os
import stat
import sys
+
try:
from collections import UserList
except ImportError:
@@ -119,6 +120,31 @@ __all__.extend([ 'TestCommon',
'dll_suffix',
])
+try:
+ sorted
+except NameError:
+ # Pre-2.4 Python has no sorted() function.
+ #
+ # The pre-2.4 Python list.sort() method does not support
+ # list.sort(key=) nor list.sort(reverse=) keyword arguments, so
+ # we must implement the functionality of those keyword arguments
+ # by hand instead of passing them to list.sort().
+ def sorted(iterable, cmp=None, key=None, reverse=False):
+ if key is not None:
+ result = [(key(x), x) for x in iterable]
+ else:
+ result = iterable[:]
+ if cmp is None:
+ # Pre-2.3 Python does not support list.sort(None).
+ result.sort()
+ else:
+ result.sort(cmp)
+ if key is not None:
+ result = [t1 for t0,t1 in result]
+ if reverse:
+ result.reverse()
+ return result
+
# Variables that describe the prefixes and suffixes on this system.
if sys.platform == 'win32':
exe_suffix = '.exe'
@@ -176,7 +202,7 @@ else:
dll_suffix = '.so'
def is_List(e):
- return isinstance(e, (list,UserList))
+ return isinstance(e, (list, UserList))
def is_writable(f):
mode = os.stat(f)[stat.ST_MODE]
@@ -206,22 +232,6 @@ elif os.name == 'nt':
def _status(self):
return self.status
-def _options_arguments(options, arguments):
- """
- This handles the "options" keyword argument and merges it
- with the arguments.
- """
- if options:
- if arguments is None:
- arguments = options
- else:
- if isinstance(options, str):
- options = [options]
- if isinstance(arguments, str):
- arguments = [arguments]
- arguments = ' '.join(options + arguments)
- return arguments
-
class TestCommon(TestCmd):
# Additional methods from the Perl Test::Cmd::Common module
@@ -239,6 +249,18 @@ class TestCommon(TestCmd):
TestCmd.__init__(self, **kw)
os.chdir(self.workdir)
+ def options_arguments(self, options, arguments):
+ """Merges the "options" keyword argument with the arguments."""
+ if options:
+ if arguments is None:
+ return options
+ if isinstance(options, str):
+ options = [options]
+ if isinstance(arguments, str):
+ arguments = [arguments]
+ arguments = ' '.join(options + arguments)
+ return arguments
+
def must_be_writable(self, *files):
"""Ensures that the specified file(s) exist and are writable.
An individual file can be specified as a list of directory names,
@@ -280,10 +302,14 @@ class TestCommon(TestCmd):
for lines in the output.
"""
if find is None:
- find = lambda o, l: o.find(l) != -1
+ def find(o, l):
+ try:
+ return o.index(l)
+ except ValueError:
+ return None
missing = []
for line in lines:
- if not find(output, line):
+ if find(output, line) is None:
missing.append(line)
if missing:
@@ -292,7 +318,7 @@ class TestCommon(TestCmd):
sys.stdout.write("Missing expected lines from %s:\n" % title)
for line in missing:
sys.stdout.write(' ' + repr(line) + '\n')
- sys.stdout.write(self.banner(title + ' '))
+ sys.stdout.write(self.banner(title + ' ') + '\n')
sys.stdout.write(output)
self.fail_test()
@@ -308,9 +334,13 @@ class TestCommon(TestCmd):
for lines in the output.
"""
if find is None:
- find = lambda o, l: o.find(l) != -1
+ def find(o, l):
+ try:
+ return o.index(l)
+ except ValueError:
+ return None
for line in lines:
- if find(output, line):
+ if find(output, line) is not None:
return
if title is None:
@@ -318,11 +348,11 @@ class TestCommon(TestCmd):
sys.stdout.write("Missing any expected line from %s:\n" % title)
for line in lines:
sys.stdout.write(' ' + repr(line) + '\n')
- sys.stdout.write(self.banner(title + ' '))
+ sys.stdout.write(self.banner(title + ' ') + '\n')
sys.stdout.write(output)
self.fail_test()
- def exactly_contain_all_lines(self, output, expect, title=None, find=None):
+ def must_contain_exactly_lines(self, output, expect, title=None, find=None):
"""Ensures that the specified output string (first argument)
contains all of the lines in the expected string (second argument)
with none left over.
@@ -332,10 +362,14 @@ class TestCommon(TestCmd):
An optional fourth argument can be used to supply a different
function, of the form "find(line, output), to use when searching
- for lines in the output.
+ for lines in the output. The function must return the index
+ of the found line in the output, or None if the line is not found.
"""
out = output.splitlines()
- exp = expect.splitlines()
+ if is_List(expect):
+ exp = [ e.rstrip('\n') for e in expect ]
+ else:
+ exp = expect.splitlines()
if sorted(out) == sorted(exp):
# early out for exact match
return
@@ -343,7 +377,7 @@ class TestCommon(TestCmd):
def find(o, l):
try:
return o.index(l)
- except:
+ except ValueError:
return None
missing = []
for line in exp:
@@ -363,12 +397,12 @@ class TestCommon(TestCmd):
sys.stdout.write("Missing expected lines from %s:\n" % title)
for line in missing:
sys.stdout.write(' ' + repr(line) + '\n')
- sys.stdout.write(self.banner('missing %s ' % title))
+ sys.stdout.write(self.banner('Missing %s ' % title) + '\n')
if out:
sys.stdout.write("Extra unexpected lines from %s:\n" % title)
for line in out:
sys.stdout.write(' ' + repr(line) + '\n')
- sys.stdout.write(self.banner('extra %s ' % title))
+ sys.stdout.write(self.banner('Extra %s ' % title) + '\n')
sys.stdout.flush()
self.fail_test()
@@ -429,10 +463,14 @@ class TestCommon(TestCmd):
for lines in the output.
"""
if find is None:
- find = lambda o, l: o.find(l) != -1
+ def find(o, l):
+ try:
+ return o.index(l)
+ except ValueError:
+ return None
unexpected = []
for line in lines:
- if find(output, line):
+ if find(output, line) is not None:
unexpected.append(line)
if unexpected:
@@ -441,7 +479,7 @@ class TestCommon(TestCmd):
sys.stdout.write("Unexpected lines in %s:\n" % title)
for line in unexpected:
sys.stdout.write(' ' + repr(line) + '\n')
- sys.stdout.write(self.banner(title + ' '))
+ sys.stdout.write(self.banner(title + ' ') + '\n')
sys.stdout.write(output)
self.fail_test()
@@ -487,21 +525,21 @@ class TestCommon(TestCmd):
expect = ''
if status != 0:
expect = " (expected %s)" % str(status)
- print "%s returned %s%s" % (self.program, str(_status(self)), expect)
+ print "%s returned %s%s" % (self.program, _status(self), expect)
print self.banner('STDOUT ')
print actual_stdout
print self.banner('STDERR ')
print actual_stderr
self.fail_test()
- if expected_stdout is not None \
- and not match(actual_stdout, expected_stdout):
+ if (expected_stdout is not None
+ and not match(actual_stdout, expected_stdout)):
self.diff(expected_stdout, actual_stdout, 'STDOUT ')
if actual_stderr:
print self.banner('STDERR ')
print actual_stderr
self.fail_test()
- if expected_stderr is not None \
- and not match(actual_stderr, expected_stderr):
+ if (expected_stderr is not None
+ and not match(actual_stderr, expected_stderr)):
print self.banner('STDOUT ')
print actual_stdout
self.diff(expected_stderr, actual_stderr, 'STDERR ')
@@ -517,10 +555,10 @@ class TestCommon(TestCmd):
Starts a program or script for the test environment, handling
any exceptions.
"""
- arguments = _options_arguments(options, arguments)
+ arguments = self.options_arguments(options, arguments)
try:
return TestCmd.start(self, program, interpreter, arguments,
- universal_newlines, **kw)
+ universal_newlines, **kw)
except KeyboardInterrupt:
raise
except Exception, e:
@@ -587,7 +625,7 @@ class TestCommon(TestCmd):
not test standard output (stdout = None), and expects that error
output is empty (stderr = "").
"""
- kw['arguments'] = _options_arguments(options, arguments)
+ kw['arguments'] = self.options_arguments(options, arguments)
try:
match = kw['match']
del kw['match']