summaryrefslogtreecommitdiffstats
path: root/QMTest
diff options
context:
space:
mode:
Diffstat (limited to 'QMTest')
-rw-r--r--QMTest/.aeignore5
-rw-r--r--QMTest/SConscript57
-rw-r--r--QMTest/TestCmd.py1030
-rw-r--r--QMTest/TestCommon.py429
-rw-r--r--QMTest/TestRuntest.py137
-rw-r--r--QMTest/TestSCons.py538
-rw-r--r--QMTest/classes.qmc11
-rw-r--r--QMTest/configuration6
-rw-r--r--QMTest/scons_tdb.py520
-rw-r--r--QMTest/unittest.py693
10 files changed, 3426 insertions, 0 deletions
diff --git a/QMTest/.aeignore b/QMTest/.aeignore
new file mode 100644
index 0000000..22ebd62
--- /dev/null
+++ b/QMTest/.aeignore
@@ -0,0 +1,5 @@
+*,D
+*.pyc
+.*.swp
+.consign
+.sconsign
diff --git a/QMTest/SConscript b/QMTest/SConscript
new file mode 100644
index 0000000..e016dc4
--- /dev/null
+++ b/QMTest/SConscript
@@ -0,0 +1,57 @@
+#
+# SConscript file for external packages we need.
+#
+
+#
+# __COPYRIGHT__
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
+# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
+# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+#
+
+import os.path
+
+Import('env')
+
+files = [
+ 'classes.qmc',
+ 'configuration',
+ 'scons_tdb.py',
+ 'TestCmd.py',
+ 'TestCommon.py',
+ 'TestRuntest.py',
+ 'TestSCons.py',
+ 'unittest.py',
+]
+
+def copy(target, source, env):
+ t = str(target[0])
+ s = str(source[0])
+ open(t, 'wb').write(open(s, 'rb').read())
+
+for file in files:
+ # Guarantee that real copies of these files always exist in
+ # build/QMTest. If there's a symlink there, then this is an Aegis
+ # build and we blow them away now so that they'll get "built" later.
+ p = os.path.join('build', 'QMTest', file)
+ if os.path.islink(p):
+ os.unlink(p)
+ sp = '#' + p
+ env.Command(sp, file, copy)
+ Local(sp)
diff --git a/QMTest/TestCmd.py b/QMTest/TestCmd.py
new file mode 100644
index 0000000..9b3e7a2
--- /dev/null
+++ b/QMTest/TestCmd.py
@@ -0,0 +1,1030 @@
+"""
+TestCmd.py: a testing framework for commands and scripts.
+
+The TestCmd module provides a framework for portable automated testing
+of executable commands and scripts (in any language, not just Python),
+especially commands and scripts that require file system interaction.
+
+In addition to running tests and evaluating conditions, the TestCmd
+module manages and cleans up one or more temporary workspace
+directories, and provides methods for creating files and directories in
+those workspace directories from in-line data, here-documents), allowing
+tests to be completely self-contained.
+
+A TestCmd environment object is created via the usual invocation:
+
+ import TestCmd
+ test = TestCmd.TestCmd()
+
+There are a bunch of keyword arguments that you can use at instantiation
+time:
+
+ test = TestCmd.TestCmd(description = 'string',
+ program = 'program_or_script_to_test',
+ interpreter = 'script_interpreter',
+ workdir = 'prefix',
+ subdir = 'subdir',
+ verbose = Boolean,
+ match = default_match_function,
+ combine = Boolean)
+
+There are a bunch of methods that let you do a bunch of different
+things. Here is an overview of them:
+
+ test.verbose_set(1)
+
+ test.description_set('string')
+
+ test.program_set('program_or_script_to_test')
+
+ test.interpreter_set('script_interpreter')
+ test.interpreter_set(['script_interpreter', 'arg'])
+
+ test.workdir_set('prefix')
+ test.workdir_set('')
+
+ test.workpath('file')
+ test.workpath('subdir', 'file')
+
+ test.subdir('subdir', ...)
+
+ test.write('file', "contents\n")
+ test.write(['subdir', 'file'], "contents\n")
+
+ test.read('file')
+ test.read(['subdir', 'file'])
+ test.read('file', mode)
+ test.read(['subdir', 'file'], mode)
+
+ test.writable('dir', 1)
+ test.writable('dir', None)
+
+ test.preserve(condition, ...)
+
+ test.cleanup(condition)
+
+ test.run(program = 'program_or_script_to_run',
+ interpreter = 'script_interpreter',
+ arguments = 'arguments to pass to program',
+ chdir = 'directory_to_chdir_to',
+ stdin = 'input to feed to the program\n')
+
+ test.pass_test()
+ test.pass_test(condition)
+ test.pass_test(condition, function)
+
+ test.fail_test()
+ test.fail_test(condition)
+ test.fail_test(condition, function)
+ test.fail_test(condition, function, skip)
+
+ test.no_result()
+ test.no_result(condition)
+ test.no_result(condition, function)
+ test.no_result(condition, function, skip)
+
+ test.stdout()
+ test.stdout(run)
+
+ test.stderr()
+ test.stderr(run)
+
+ test.symlink(target, link)
+
+ test.match(actual, expected)
+
+ test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
+ test.match_exact(["actual 1\n", "actual 2\n"],
+ ["expected 1\n", "expected 2\n"])
+
+ test.match_re("actual 1\nactual 2\n", regex_string)
+ test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
+
+ test.match_re_dotall("actual 1\nactual 2\n", regex_string)
+ test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
+
+ test.sleep()
+ test.sleep(seconds)
+
+ test.where_is('foo')
+ test.where_is('foo', 'PATH1:PATH2')
+ test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
+
+ test.unlink('file')
+ test.unlink('subdir', 'file')
+
+The TestCmd module provides pass_test(), fail_test(), and no_result()
+unbound functions that report test results for use with the Aegis change
+management system. These methods terminate the test immediately,
+reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
+status 0 (success), 1 or 2 respectively. This allows for a distinction
+between an actual failed test and a test that could not be properly
+evaluated because of an external condition (such as a full file system
+or incorrect permissions).
+
+ import TestCmd
+
+ TestCmd.pass_test()
+ TestCmd.pass_test(condition)
+ TestCmd.pass_test(condition, function)
+
+ TestCmd.fail_test()
+ TestCmd.fail_test(condition)
+ TestCmd.fail_test(condition, function)
+ TestCmd.fail_test(condition, function, skip)
+
+ TestCmd.no_result()
+ TestCmd.no_result(condition)
+ TestCmd.no_result(condition, function)
+ TestCmd.no_result(condition, function, skip)
+
+The TestCmd module also provides unbound functions that handle matching
+in the same way as the match_*() methods described above.
+
+ import TestCmd
+
+ test = TestCmd.TestCmd(match = TestCmd.match_exact)
+
+ test = TestCmd.TestCmd(match = TestCmd.match_re)
+
+ test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)
+
+Lastly, the where_is() method also exists in an unbound function
+version.
+
+ import TestCmd
+
+ TestCmd.where_is('foo')
+ TestCmd.where_is('foo', 'PATH1:PATH2')
+ TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
+"""
+
+# Copyright 2000, 2001, 2002, 2003, 2004 Steven Knight
+# This module is free software, and you may redistribute it and/or modify
+# it under the same terms as Python itself, so long as this copyright message
+# and disclaimer are retained in their original form.
+#
+# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
+# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
+# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
+# DAMAGE.
+#
+# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
+# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
+# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+
+__author__ = "Steven Knight <knight at baldmt dot com>"
+__revision__ = "TestCmd.py 0.22.D001 2006/02/26 15:45:18 knight"
+__version__ = "0.22"
+
+import os
+import os.path
+import popen2
+import re
+import shutil
+import stat
+import string
+import sys
+import tempfile
+import time
+import traceback
+import types
+import UserList
+
+__all__ = [ 'fail_test', 'no_result', 'pass_test',
+ 'match_exact', 'match_re', 'match_re_dotall',
+ 'python_executable', 'TestCmd' ]
+
+def is_List(e):
+ return type(e) is types.ListType \
+ or isinstance(e, UserList.UserList)
+
+try:
+ from UserString import UserString
+except ImportError:
+ class UserString:
+ pass
+
+if hasattr(types, 'UnicodeType'):
+ def is_String(e):
+ return type(e) is types.StringType \
+ or type(e) is types.UnicodeType \
+ or isinstance(e, UserString)
+else:
+ def is_String(e):
+ return type(e) is types.StringType or isinstance(e, UserString)
+
+tempfile.template = 'testcmd.'
+
+re_space = re.compile('\s')
+
+_Cleanup = []
+
+def _clean():
+ global _Cleanup
+ cleanlist = filter(None, _Cleanup)
+ del _Cleanup[:]
+ cleanlist.reverse()
+ for test in cleanlist:
+ test.cleanup()
+
+sys.exitfunc = _clean
+
+class Collector:
+ def __init__(self, top):
+ self.entries = [top]
+ def __call__(self, arg, dirname, names):
+ pathjoin = lambda n, d=dirname: os.path.join(d, n)
+ self.entries.extend(map(pathjoin, names))
+
+def _caller(tblist, skip):
+ string = ""
+ arr = []
+ for file, line, name, text in tblist:
+ if file[-10:] == "TestCmd.py":
+ break
+ arr = [(file, line, name, text)] + arr
+ atfrom = "at"
+ for file, line, name, text in arr[skip:]:
+ if name == "?":
+ name = ""
+ else:
+ name = " (" + name + ")"
+ string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
+ atfrom = "\tfrom"
+ return string
+
+def fail_test(self = None, condition = 1, function = None, skip = 0):
+ """Cause the test to fail.
+
+ By default, the fail_test() method reports that the test FAILED
+ and exits with a status of 1. If a condition argument is supplied,
+ the test fails only if the condition is true.
+ """
+ if not condition:
+ return
+ if not function is None:
+ function()
+ of = ""
+ desc = ""
+ sep = " "
+ if not self is None:
+ if self.program:
+ of = " of " + self.program
+ sep = "\n\t"
+ if self.description:
+ desc = " [" + self.description + "]"
+ sep = "\n\t"
+
+ at = _caller(traceback.extract_stack(), skip)
+ sys.stderr.write("FAILED test" + of + desc + sep + at)
+
+ sys.exit(1)
+
+def no_result(self = None, condition = 1, function = None, skip = 0):
+ """Causes a test to exit with no valid result.
+
+ By default, the no_result() method reports NO RESULT for the test
+ and exits with a status of 2. If a condition argument is supplied,
+ the test fails only if the condition is true.
+ """
+ if not condition:
+ return
+ if not function is None:
+ function()
+ of = ""
+ desc = ""
+ sep = " "
+ if not self is None:
+ if self.program:
+ of = " of " + self.program
+ sep = "\n\t"
+ if self.description:
+ desc = " [" + self.description + "]"
+ sep = "\n\t"
+
+ at = _caller(traceback.extract_stack(), skip)
+ sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
+
+ sys.exit(2)
+
+def pass_test(self = None, condition = 1, function = None):
+ """Causes a test to pass.
+
+ By default, the pass_test() method reports PASSED for the test
+ and exits with a status of 0. If a condition argument is supplied,
+ the test passes only if the condition is true.
+ """
+ if not condition:
+ return
+ if not function is None:
+ function()
+ sys.stderr.write("PASSED\n")
+ sys.exit(0)
+
+def match_exact(lines = None, matches = None):
+ """
+ """
+ if not is_List(lines):
+ lines = string.split(lines, "\n")
+ if not is_List(matches):
+ matches = string.split(matches, "\n")
+ if len(lines) != len(matches):
+ return
+ for i in range(len(lines)):
+ if lines[i] != matches[i]:
+ return
+ return 1
+
+def match_re(lines = None, res = None):
+ """
+ """
+ if not is_List(lines):
+ lines = string.split(lines, "\n")
+ if not is_List(res):
+ res = string.split(res, "\n")
+ if len(lines) != len(res):
+ return
+ for i in range(len(lines)):
+ if not re.compile("^" + res[i] + "$").search(lines[i]):
+ return
+ return 1
+
+def match_re_dotall(lines = None, res = None):
+ """
+ """
+ if not type(lines) is type(""):
+ lines = string.join(lines, "\n")
+ if not type(res) is type(""):
+ res = string.join(res, "\n")
+ if re.compile("^" + res + "$", re.DOTALL).match(lines):
+ return 1
+
+if os.name == 'java':
+
+ python_executable = os.path.join(sys.prefix, 'jython')
+
+else:
+
+ python_executable = sys.executable
+
+if sys.platform == 'win32':
+
+ default_sleep_seconds = 2
+
+ def where_is(file, path=None, pathext=None):
+ if path is None:
+ path = os.environ['PATH']
+ if is_String(path):
+ path = string.split(path, os.pathsep)
+ if pathext is None:
+ pathext = os.environ['PATHEXT']
+ if is_String(pathext):
+ pathext = string.split(pathext, os.pathsep)
+ for ext in pathext:
+ if string.lower(ext) == string.lower(file[-len(ext):]):
+ pathext = ['']
+ break
+ for dir in path:
+ f = os.path.join(dir, file)
+ for ext in pathext:
+ fext = f + ext
+ if os.path.isfile(fext):
+ return fext
+ return None
+
+else:
+
+ def where_is(file, path=None, pathext=None):
+ if path is None:
+ path = os.environ['PATH']
+ if is_String(path):
+ path = string.split(path, os.pathsep)
+ for dir in path:
+ f = os.path.join(dir, file)
+ if os.path.isfile(f):
+ try:
+ st = os.stat(f)
+ except OSError:
+ continue
+ if stat.S_IMODE(st[stat.ST_MODE]) & 0111:
+ return f
+ return None
+
+ default_sleep_seconds = 1
+
+class TestCmd:
+ """Class TestCmd
+ """
+
+ def __init__(self, description = None,
+ program = None,
+ interpreter = None,
+ workdir = None,
+ subdir = None,
+ verbose = None,
+ match = None,
+ combine = 0):
+ self._cwd = os.getcwd()
+ self.description_set(description)
+ self.program_set(program)
+ self.interpreter_set(interpreter)
+ if verbose is None:
+ try:
+ verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) )
+ except ValueError:
+ verbose = 0
+ self.verbose_set(verbose)
+ self.combine = combine
+ if not match is None:
+ self.match_func = match
+ else:
+ self.match_func = match_re
+ self._dirlist = []
+ self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
+ if os.environ.has_key('PRESERVE') and not os.environ['PRESERVE'] is '':
+ self._preserve['pass_test'] = os.environ['PRESERVE']
+ self._preserve['fail_test'] = os.environ['PRESERVE']
+ self._preserve['no_result'] = os.environ['PRESERVE']
+ else:
+ try:
+ self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
+ except KeyError:
+ pass
+ try:
+ self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
+ except KeyError:
+ pass
+ try:
+ self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
+ except KeyError:
+ pass
+ self._stdout = []
+ self._stderr = []
+ self.status = None
+ self.condition = 'no_result'
+ self.workdir_set(workdir)
+ self.subdir(subdir)
+
+ def __del__(self):
+ self.cleanup()
+
+ def __repr__(self):
+ return "%x" % id(self)
+
+ if os.name == 'posix':
+
+ def escape(self, arg):
+ "escape shell special characters"
+ slash = '\\'
+ special = '"$'
+
+ arg = string.replace(arg, slash, slash+slash)
+ for c in special:
+ arg = string.replace(arg, c, slash+c)
+
+ if re_space.search(arg):
+ arg = '"' + arg + '"'
+ return arg
+
+ else:
+
+ # Windows does not allow special characters in file names
+ # anyway, so no need for an escape function, we will just quote
+ # the arg.
+ def escape(self, arg):
+ if re_space.search(arg):
+ arg = '"' + arg + '"'
+ return arg
+
+ def canonicalize(self, path):
+ if is_List(path):
+ path = apply(os.path.join, tuple(path))
+ if not os.path.isabs(path):
+ path = os.path.join(self.workdir, path)
+ return path
+
+ def cleanup(self, condition = None):
+ """Removes any temporary working directories for the specified
+ TestCmd environment. If the environment variable PRESERVE was
+ set when the TestCmd environment was created, temporary working
+ directories are not removed. If any of the environment variables
+ PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
+ when the TestCmd environment was created, then temporary working
+ directories are not removed if the test passed, failed, or had
+ no result, respectively. Temporary working directories are also
+ preserved for conditions specified via the preserve method.
+
+ Typically, this method is not called directly, but is used when
+ the script exits to clean up temporary working directories as
+ appropriate for the exit status.
+ """
+ if not self._dirlist:
+ return
+ os.chdir(self._cwd)
+ self.workdir = None
+ if condition is None:
+ condition = self.condition
+ if self._preserve[condition]:
+ for dir in self._dirlist:
+ print "Preserved directory", dir
+ else:
+ list = self._dirlist[:]
+ list.reverse()
+ for dir in list:
+ self.writable(dir, 1)
+ shutil.rmtree(dir, ignore_errors = 1)
+ self._dirlist = []
+
+ try:
+ global _Cleanup
+ _Cleanup.remove(self)
+ except (AttributeError, ValueError):
+ pass
+
+ def chmod(self, path, mode):
+ """Changes permissions on the specified file or directory
+ path name."""
+ path = self.canonicalize(path)
+ os.chmod(path, mode)
+
+ def description_set(self, description):
+ """Set the description of the functionality being tested.
+ """
+ self.description = description
+
+# def diff(self):
+# """Diff two arrays.
+# """
+
+ def fail_test(self, condition = 1, function = None, skip = 0):
+ """Cause the test to fail.
+ """
+ if not condition:
+ return
+ self.condition = 'fail_test'
+ fail_test(self = self,
+ condition = condition,
+ function = function,
+ skip = skip)
+
+ def interpreter_set(self, interpreter):
+ """Set the program to be used to interpret the program
+ under test as a script.
+ """
+ self.interpreter = interpreter
+
+ def match(self, lines, matches):
+ """Compare actual and expected file contents.
+ """
+ return self.match_func(lines, matches)
+
+ def match_exact(self, lines, matches):
+ """Compare actual and expected file contents.
+ """
+ return match_exact(lines, matches)
+
+ def match_re(self, lines, res):
+ """Compare actual and expected file contents.
+ """
+ return match_re(lines, res)
+
+ def match_re_dotall(self, lines, res):
+ """Compare actual and expected file contents.
+ """
+ return match_re_dotall(lines, res)
+
+ def no_result(self, condition = 1, function = None, skip = 0):
+ """Report that the test could not be run.
+ """
+ if not condition:
+ return
+ self.condition = 'no_result'
+ no_result(self = self,
+ condition = condition,
+ function = function,
+ skip = skip)
+
+ def pass_test(self, condition = 1, function = None):
+ """Cause the test to pass.
+ """
+ if not condition:
+ return
+ self.condition = 'pass_test'
+ pass_test(self = self, condition = condition, function = function)
+
+ def preserve(self, *conditions):
+ """Arrange for the temporary working directories for the
+ specified TestCmd environment to be preserved for one or more
+ conditions. If no conditions are specified, arranges for
+ the temporary working directories to be preserved for all
+ conditions.
+ """
+ if conditions is ():
+ conditions = ('pass_test', 'fail_test', 'no_result')
+ for cond in conditions:
+ self._preserve[cond] = 1
+
+ def program_set(self, program):
+ """Set the executable program or script to be tested.
+ """
+ if program and not os.path.isabs(program):
+ program = os.path.join(self._cwd, program)
+ self.program = program
+
+ def read(self, file, mode = 'rb'):
+ """Reads and returns the contents of the specified file name.
+ The file name may be a list, in which case the elements are
+ concatenated with the os.path.join() method. The file is
+ assumed to be under the temporary working directory unless it
+ is an absolute path name. The I/O mode for the file may
+ be specified; it must begin with an 'r'. The default is
+ 'rb' (binary read).
+ """
+ file = self.canonicalize(file)
+ if mode[0] != 'r':
+ raise ValueError, "mode must begin with 'r'"
+ return open(file, mode).read()
+
+ def run(self, program = None,
+ interpreter = None,
+ arguments = None,
+ chdir = None,
+ stdin = None):
+ """Runs a test of the program or script for the test
+ environment. Standard output and error output are saved for
+ future retrieval via the stdout() and stderr() methods.
+
+ The specified program will have the original directory
+ prepending unless it is enclosed in a [list].
+ """
+ if chdir:
+ oldcwd = os.getcwd()
+ if not os.path.isabs(chdir):
+ chdir = os.path.join(self.workpath(chdir))
+ if self.verbose:
+ sys.stderr.write("chdir(" + chdir + ")\n")
+ os.chdir(chdir)
+ if program:
+ if type(program) == type('') and not os.path.isabs(program):
+ program = os.path.join(self._cwd, program)
+ else:
+ program = self.program
+ if not interpreter:
+ interpreter = self.interpreter
+ if not type(program) in [type([]), type(())]:
+ program = [program]
+ cmd = list(program)
+ if interpreter:
+ if not type(interpreter) in [type([]), type(())]:
+ interpreter = [interpreter]
+ cmd = list(interpreter) + cmd
+ if arguments:
+ if type(arguments) == type(''):
+ arguments = string.split(arguments)
+ cmd.extend(arguments)
+ cmd_string = string.join(map(self.escape, cmd), ' ')
+ if self.verbose:
+ sys.stderr.write(cmd_string + "\n")
+ try:
+ p = popen2.Popen3(cmd, 1)
+ except AttributeError:
+ if sys.platform == 'win32' and cmd_string[0] == '"':
+ cmd_string = '"' + cmd_string + '"'
+ (tochild, fromchild, childerr) = os.popen3(' ' + cmd_string)
+ if stdin:
+ if is_List(stdin):
+ for line in stdin:
+ tochild.write(line)
+ else:
+ tochild.write(stdin)
+ tochild.close()
+ out = fromchild.read()
+ err = childerr.read()
+ if self.combine:
+ self._stdout.append(out + err)
+ else:
+ self._stdout.append(out)
+ self._stderr.append(err)
+ fromchild.close()
+ self.status = childerr.close()
+ if not self.status:
+ self.status = 0
+ except:
+ raise
+ else:
+ if stdin:
+ if is_List(stdin):
+ for line in stdin:
+ p.tochild.write(line)
+ else:
+ p.tochild.write(stdin)
+ p.tochild.close()
+ out = p.fromchild.read()
+ err = p.childerr.read()
+ if self.combine:
+ self._stdout.append(out + err)
+ else:
+ self._stdout.append(out)
+ self._stderr.append(err)
+ self.status = p.wait()
+ if chdir:
+ os.chdir(oldcwd)
+ if self.verbose >= 2:
+ write = sys.stdout.write
+ write('============ STATUS: %d\n' % self.status)
+ out = self.stdout()
+ if out or self.verbose >= 3:
+ write('============ BEGIN STDOUT (len=%d):\n' % len(out))
+ write(out)
+ write('============ END STDOUT\n')
+ err = self.stderr()
+ if err or self.verbose >= 3:
+ write('============ BEGIN STDERR (len=%d)\n' % len(err))
+ write(err)
+ write('============ END STDERR\n')
+
+ def sleep(self, seconds = default_sleep_seconds):
+ """Sleeps at least the specified number of seconds. If no
+ number is specified, sleeps at least the minimum number of
+ seconds necessary to advance file time stamps on the current
+ system. Sleeping more seconds is all right.
+ """
+ time.sleep(seconds)
+
+ def stderr(self, run = None):
+ """Returns the error output from the specified run number.
+ If there is no specified run number, then returns the error
+ output of the last run. If the run number is less than zero,
+ then returns the error output from that many runs back from the
+ current run.
+ """
+ if not run:
+ run = len(self._stderr)
+ elif run < 0:
+ run = len(self._stderr) + run
+ run = run - 1
+ return self._stderr[run]
+
+ def stdout(self, run = None):
+ """Returns the standard output from the specified run number.
+ If there is no specified run number, then returns the standard
+ output of the last run. If the run number is less than zero,
+ then returns the standard output from that many runs back from
+ the current run.
+ """
+ if not run:
+ run = len(self._stdout)
+ elif run < 0:
+ run = len(self._stdout) + run
+ run = run - 1
+ return self._stdout[run]
+
+ def subdir(self, *subdirs):
+ """Create new subdirectories under the temporary working
+ directory, one for each argument. An argument may be a list,
+ in which case the list elements are concatenated using the
+ os.path.join() method. Subdirectories multiple levels deep
+ must be created using a separate argument for each level:
+
+ test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
+
+ Returns the number of subdirectories actually created.
+ """
+ count = 0
+ for sub in subdirs:
+ if sub is None:
+ continue
+ if is_List(sub):
+ sub = apply(os.path.join, tuple(sub))
+ new = os.path.join(self.workdir, sub)
+ try:
+ os.mkdir(new)
+ except OSError:
+ pass
+ else:
+ count = count + 1
+ return count
+
+ def symlink(self, target, link):
+ """Creates a symlink to the specified target.
+ The link name may be a list, in which case the elements are
+ concatenated with the os.path.join() method. The link is
+ assumed to be under the temporary working directory unless it
+ is an absolute path name. The target is *not* assumed to be
+ under the temporary working directory.
+ """
+ link = self.canonicalize(link)
+ os.symlink(target, link)
+
+ def touch(self, path, mtime=None):
+ """Updates the modification time on the specified file or
+ directory path name. The default is to update to the
+ current time if no explicit modification time is specified.
+ """
+ path = self.canonicalize(path)
+ atime = os.path.getatime(path)
+ if mtime is None:
+ mtime = time.time()
+ os.utime(path, (atime, mtime))
+
+ def unlink(self, file):
+ """Unlinks the specified file name.
+ The file name may be a list, in which case the elements are
+ concatenated with the os.path.join() method. The file is
+ assumed to be under the temporary working directory unless it
+ is an absolute path name.
+ """
+ file = self.canonicalize(file)
+ os.unlink(file)
+
+ def verbose_set(self, verbose):
+ """Set the verbose level.
+ """
+ self.verbose = verbose
+
+ def where_is(self, file, path=None, pathext=None):
+ """Find an executable file.
+ """
+ if is_List(file):
+ file = apply(os.path.join, tuple(file))
+ if not os.path.isabs(file):
+ file = where_is(file, path, pathext)
+ return file
+
+ def workdir_set(self, path):
+ """Creates a temporary working directory with the specified
+ path name. If the path is a null string (''), a unique
+ directory name is created.
+ """
+ if (path != None):
+ if path == '':
+ path = tempfile.mktemp()
+ if path != None:
+ os.mkdir(path)
+ # We'd like to set self.workdir like this:
+ # self.workdir = path
+ # But symlinks in the path will report things
+ # differently from os.getcwd(), so chdir there
+ # and back to fetch the canonical path.
+ cwd = os.getcwd()
+ os.chdir(path)
+ self.workdir = os.getcwd()
+ os.chdir(cwd)
+ # Uppercase the drive letter since the case of drive
+ # letters is pretty much random on win32:
+ drive,rest = os.path.splitdrive(self.workdir)
+ if drive:
+ self.workdir = string.upper(drive) + rest
+ #
+ self._dirlist.append(self.workdir)
+ global _Cleanup
+ try:
+ _Cleanup.index(self)
+ except ValueError:
+ _Cleanup.append(self)
+ else:
+ self.workdir = None
+
+ def workpath(self, *args):
+ """Returns the absolute path name to a subdirectory or file
+ within the current temporary working directory. Concatenates
+ the temporary working directory name with the specified
+ arguments using the os.path.join() method.
+ """
+ return apply(os.path.join, (self.workdir,) + tuple(args))
+
+ def readable(self, top, read=1):
+ """Make the specified directory tree readable (read == 1)
+ or not (read == None).
+ """
+
+ if read:
+ def do_chmod(fname):
+ try: st = os.stat(fname)
+ except OSError: pass
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0400))
+ else:
+ def do_chmod(fname):
+ try: st = os.stat(fname)
+ except OSError: pass
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0400))
+
+ if os.path.isfile(top):
+ # If it's a file, that's easy, just chmod it.
+ do_chmod(top)
+ elif read:
+ # It's a directory and we're trying to turn on read
+ # permission, so it's also pretty easy, just chmod the
+ # directory and then chmod every entry on our walk down the
+ # tree. Because os.path.walk() is top-down, we'll enable
+ # read permission on any directories that have it disabled
+ # before os.path.walk() tries to list their contents.
+ do_chmod(top)
+
+ def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
+ pathnames = map(lambda n, d=dirname: os.path.join(d, n),
+ names)
+ map(lambda p, do=do_chmod: do(p), pathnames)
+
+ os.path.walk(top, chmod_entries, None)
+ else:
+ # It's a directory and we're trying to turn off read
+ # permission, which means we have to chmod the directoreis
+ # in the tree bottom-up, lest disabling read permission from
+ # the top down get in the way of being able to get at lower
+ # parts of the tree. But os.path.walk() visits things top
+ # down, so we just use an object to collect a list of all
+ # of the entries in the tree, reverse the list, and then
+ # chmod the reversed (bottom-up) list.
+ col = Collector(top)
+ os.path.walk(top, col, None)
+ col.entries.reverse()
+ map(lambda d, do=do_chmod: do(d), col.entries)
+
+ def writable(self, top, write=1):
+ """Make the specified directory tree writable (write == 1)
+ or not (write == None).
+ """
+
+ if write:
+ def do_chmod(fname):
+ try: st = os.stat(fname)
+ except OSError: pass
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
+ else:
+ def do_chmod(fname):
+ try: st = os.stat(fname)
+ except OSError: pass
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
+
+ if os.path.isfile(top):
+ do_chmod(top)
+ else:
+ col = Collector(top)
+ os.path.walk(top, col, None)
+ map(lambda d, do=do_chmod: do(d), col.entries)
+
+ def executable(self, top, execute=1):
+ """Make the specified directory tree executable (execute == 1)
+ or not (execute == None).
+ """
+
+ if execute:
+ def do_chmod(fname):
+ try: st = os.stat(fname)
+ except OSError: pass
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0100))
+ else:
+ def do_chmod(fname):
+ try: st = os.stat(fname)
+ except OSError: pass
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0100))
+
+ if os.path.isfile(top):
+ # If it's a file, that's easy, just chmod it.
+ do_chmod(top)
+ elif execute:
+ # It's a directory and we're trying to turn on execute
+ # permission, so it's also pretty easy, just chmod the
+ # directory and then chmod every entry on our walk down the
+ # tree. Because os.path.walk() is top-down, we'll enable
+ # execute permission on any directories that have it disabled
+ # before os.path.walk() tries to list their contents.
+ do_chmod(top)
+
+ def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
+ pathnames = map(lambda n, d=dirname: os.path.join(d, n),
+ names)
+ map(lambda p, do=do_chmod: do(p), pathnames)
+
+ os.path.walk(top, chmod_entries, None)
+ else:
+ # It's a directory and we're trying to turn off execute
+ # permission, which means we have to chmod the directories
+ # in the tree bottom-up, lest disabling execute permission from
+ # the top down get in the way of being able to get at lower
+ # parts of the tree. But os.path.walk() visits things top
+ # down, so we just use an object to collect a list of all
+ # of the entries in the tree, reverse the list, and then
+ # chmod the reversed (bottom-up) list.
+ col = Collector(top)
+ os.path.walk(top, col, None)
+ col.entries.reverse()
+ map(lambda d, do=do_chmod: do(d), col.entries)
+
+ def write(self, file, content, mode = 'wb'):
+ """Writes the specified content text (second argument) to the
+ specified file name (first argument). The file name may be
+ a list, in which case the elements are concatenated with the
+ os.path.join() method. The file is created under the temporary
+ working directory. Any subdirectories in the path must already
+ exist. The I/O mode for the file may be specified; it must
+ begin with a 'w'. The default is 'wb' (binary write).
+ """
+ file = self.canonicalize(file)
+ if mode[0] != 'w':
+ raise ValueError, "mode must begin with 'w'"
+ open(file, mode).write(content)
diff --git a/QMTest/TestCommon.py b/QMTest/TestCommon.py
new file mode 100644
index 0000000..b30b75c
--- /dev/null
+++ b/QMTest/TestCommon.py
@@ -0,0 +1,429 @@
+"""
+TestCommon.py: a testing framework for commands and scripts
+ with commonly useful error handling
+
+The TestCommon module provides a simple, high-level interface for writing
+tests of executable commands and scripts, especially commands and scripts
+that interact with the file system. All methods throw exceptions and
+exit on failure, with useful error messages. This makes a number of
+explicit checks unnecessary, making the test scripts themselves simpler
+to write and easier to read.
+
+The TestCommon class is a subclass of the TestCmd class. In essence,
+TestCommon is a wrapper that handles common TestCmd error conditions in
+useful ways. You can use TestCommon directly, or subclass it for your
+program and add additional (or override) methods to tailor it to your
+program's specific needs. Alternatively, the TestCommon class serves
+as a useful example of how to define your own TestCmd subclass.
+
+As a subclass of TestCmd, TestCommon provides access to all of the
+variables and methods from the TestCmd module. Consequently, you can
+use any variable or method documented in the TestCmd module without
+having to explicitly import TestCmd.
+
+A TestCommon environment object is created via the usual invocation:
+
+ import TestCommon
+ test = TestCommon.TestCommon()
+
+You can use all of the TestCmd keyword arguments when instantiating a
+TestCommon object; see the TestCmd documentation for details.
+
+Here is an overview of the methods and keyword arguments that are
+provided by the TestCommon class:
+
+ test.must_be_writable('file1', ['file2', ...])
+
+ test.must_contain('file', 'required text\n')
+
+ test.must_exist('file1', ['file2', ...])
+
+ test.must_match('file', "expected contents\n")
+
+ test.must_not_be_writable('file1', ['file2', ...])
+
+ test.must_not_exist('file1', ['file2', ...])
+
+ test.run(options = "options to be prepended to arguments",
+ stdout = "expected standard output from the program",
+ stderr = "expected error output from the program",
+ status = expected_status,
+ match = match_function)
+
+The TestCommon module also provides the following variables
+
+ TestCommon.python_executable
+ TestCommon.exe_suffix
+ TestCommon.obj_suffix
+ TestCommon.shobj_suffix
+ TestCommon.lib_prefix
+ TestCommon.lib_suffix
+ TestCommon.dll_prefix
+ TestCommon.dll_suffix
+
+"""
+
+# Copyright 2000, 2001, 2002, 2003, 2004 Steven Knight
+# This module is free software, and you may redistribute it and/or modify
+# it under the same terms as Python itself, so long as this copyright message
+# and disclaimer are retained in their original form.
+#
+# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
+# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
+# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
+# DAMAGE.
+#
+# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
+# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
+# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+
+__author__ = "Steven Knight <knight at baldmt dot com>"
+__revision__ = "TestCommon.py 0.22.D001 2006/02/26 15:45:18 knight"
+__version__ = "0.22"
+
+import os
+import os.path
+import stat
+import string
+import sys
+import types
+import UserList
+
+from TestCmd import *
+from TestCmd import __all__
+
+__all__.extend([ 'TestCommon',
+ 'TestFailed',
+ 'TestNoResult',
+ 'exe_suffix',
+ 'obj_suffix',
+ 'shobj_suffix',
+ 'lib_prefix',
+ 'lib_suffix',
+ 'dll_prefix',
+ 'dll_suffix',
+ ])
+
+# Variables that describe the prefixes and suffixes on this system.
+if sys.platform == 'win32':
+ exe_suffix = '.exe'
+ obj_suffix = '.obj'
+ shobj_suffix = '.obj'
+ lib_prefix = ''
+ lib_suffix = '.lib'
+ dll_prefix = ''
+ dll_suffix = '.dll'
+elif sys.platform == 'cygwin':
+ exe_suffix = '.exe'
+ obj_suffix = '.o'
+ shobj_suffix = '.os'
+ lib_prefix = 'lib'
+ lib_suffix = '.a'
+ dll_prefix = ''
+ dll_suffix = '.dll'
+elif string.find(sys.platform, 'irix') != -1:
+ exe_suffix = ''
+ obj_suffix = '.o'
+ shobj_suffix = '.o'
+ lib_prefix = 'lib'
+ lib_suffix = '.a'
+ dll_prefix = 'lib'
+ dll_suffix = '.so'
+elif string.find(sys.platform, 'darwin') != -1:
+ exe_suffix = ''
+ obj_suffix = '.o'
+ shobj_suffix = '.os'
+ lib_prefix = 'lib'
+ lib_suffix = '.a'
+ dll_prefix = 'lib'
+ dll_suffix = '.dylib'
+else:
+ exe_suffix = ''
+ obj_suffix = '.o'
+ shobj_suffix = '.os'
+ lib_prefix = 'lib'
+ lib_suffix = '.a'
+ dll_prefix = 'lib'
+ dll_suffix = '.so'
+
+try:
+ import difflib
+except ImportError:
+ pass
+else:
+ def simple_diff(a, b, fromfile='', tofile='',
+ fromfiledate='', tofiledate='', n=3, lineterm='\n'):
+ """
+ A function with the same calling signature as difflib.context_diff
+ (diff -c) and difflib.unified_diff (diff -u) but which prints
+ output like the simple, unadorned 'diff" command.
+ """
+ sm = difflib.SequenceMatcher(None, a, b)
+ def comma(x1, x2):
+ return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2)
+ result = []
+ for op, a1, a2, b1, b2 in sm.get_opcodes():
+ if op == 'delete':
+ result.append("%sd%d" % (comma(a1, a2), b1))
+ result.extend(map(lambda l: '< ' + l, a[a1:a2]))
+ elif op == 'insert':
+ result.append("%da%s" % (a1, comma(b1, b2)))
+ result.extend(map(lambda l: '> ' + l, b[b1:b2]))
+ elif op == 'replace':
+ result.append("%sc%s" % (comma(a1, a2), comma(b1, b2)))
+ result.extend(map(lambda l: '< ' + l, a[a1:a2]))
+ result.append('---')
+ result.extend(map(lambda l: '> ' + l, b[b1:b2]))
+ return result
+
+def is_List(e):
+ return type(e) is types.ListType \
+ or isinstance(e, UserList.UserList)
+
+def is_writable(f):
+ mode = os.stat(f)[stat.ST_MODE]
+ return mode & stat.S_IWUSR
+
+def separate_files(flist):
+ existing = []
+ missing = []
+ for f in flist:
+ if os.path.exists(f):
+ existing.append(f)
+ else:
+ missing.append(f)
+ return existing, missing
+
+class TestFailed(Exception):
+ def __init__(self, args=None):
+ self.args = args
+
+class TestNoResult(Exception):
+ def __init__(self, args=None):
+ self.args = args
+
+if os.name == 'posix':
+ def _failed(self, status = 0):
+ if self.status is None or status is None:
+ return None
+ if os.WIFSIGNALED(self.status):
+ return None
+ return _status(self) != status
+ def _status(self):
+ if os.WIFEXITED(self.status):
+ return os.WEXITSTATUS(self.status)
+ else:
+ return None
+elif os.name == 'nt':
+ def _failed(self, status = 0):
+ return not (self.status is None or status is None) and \
+ self.status != status
+ def _status(self):
+ return self.status
+
+class TestCommon(TestCmd):
+
+ # Additional methods from the Perl Test::Cmd::Common module
+ # that we may wish to add in the future:
+ #
+ # $test->subdir('subdir', ...);
+ #
+ # $test->copy('src_file', 'dst_file');
+
+ def __init__(self, **kw):
+ """Initialize a new TestCommon instance. This involves just
+ calling the base class initialization, and then changing directory
+ to the workdir.
+ """
+ apply(TestCmd.__init__, [self], kw)
+ os.chdir(self.workdir)
+ try:
+ difflib
+ except NameError:
+ pass
+ else:
+ self.diff_function = simple_diff
+ #self.diff_function = difflib.context_diff
+ #self.diff_function = difflib.unified_diff
+
+ banner_char = '='
+ banner_width = 80
+
+ def banner(self, s, width=None):
+ if width is None:
+ width = self.banner_width
+ return s + self.banner_char * (width - len(s))
+
+ try:
+ difflib
+ except NameError:
+ def diff(self, a, b, name, *args, **kw):
+ print self.banner('Expected %s' % name)
+ print a
+ print self.banner('Actual %s' % name)
+ print b
+ else:
+ def diff(self, a, b, name, *args, **kw):
+ print self.banner(name)
+ args = (a.splitlines(), b.splitlines()) + args
+ lines = apply(self.diff_function, args, kw)
+ for l in lines:
+ print l
+
+ def must_be_writable(self, *files):
+ """Ensures that the specified file(s) exist and are writable.
+ An individual file can be specified as a list of directory names,
+ in which case the pathname will be constructed by concatenating
+ them. Exits FAILED if any of the files does not exist or is
+ not writable.
+ """
+ files = map(lambda x: is_List(x) and apply(os.path.join, x) or x, files)
+ existing, missing = separate_files(files)
+ unwritable = filter(lambda x, iw=is_writable: not iw(x), existing)
+ if missing:
+ print "Missing files: `%s'" % string.join(missing, "', `")
+ if unwritable:
+ print "Unwritable files: `%s'" % string.join(unwritable, "', `")
+ self.fail_test(missing + unwritable)
+
+ def must_contain(self, file, required, mode = 'rb'):
+ """Ensures that the specified file contains the required text.
+ """
+ file_contents = self.read(file, mode)
+ contains = (string.find(file_contents, required) != -1)
+ if not contains:
+ print "File `%s' does not contain required string." % file
+ print self.banner('Required string ')
+ print required
+ print self.banner('%s contents ' % file)
+ print file_contents
+ self.fail_test(not contains)
+
+ def must_exist(self, *files):
+ """Ensures that the specified file(s) must exist. An individual
+ file be specified as a list of directory names, in which case the
+ pathname will be constructed by concatenating them. Exits FAILED
+ if any of the files does not exist.
+ """
+ files = map(lambda x: is_List(x) and apply(os.path.join, x) or x, files)
+ missing = filter(lambda x: not os.path.exists(x), files)
+ if missing:
+ print "Missing files: `%s'" % string.join(missing, "', `")
+ self.fail_test(missing)
+
+ def must_match(self, file, expect, mode = 'rb'):
+ """Matches the contents of the specified file (first argument)
+ against the expected contents (second argument). The expected
+ contents are a list of lines or a string which will be split
+ on newlines.
+ """
+ file_contents = self.read(file, mode)
+ try:
+ self.fail_test(not self.match(file_contents, expect))
+ except KeyboardInterrupt:
+ raise
+ except:
+ print "Unexpected contents of `%s'" % file
+ self.diff(expect, file_contents, 'contents ')
+ raise
+
+ def must_not_exist(self, *files):
+ """Ensures that the specified file(s) must not exist.
+ An individual file be specified as a list of directory names, in
+ which case the pathname will be constructed by concatenating them.
+ Exits FAILED if any of the files exists.
+ """
+ files = map(lambda x: is_List(x) and apply(os.path.join, x) or x, files)
+ existing = filter(os.path.exists, files)
+ if existing:
+ print "Unexpected files exist: `%s'" % string.join(existing, "', `")
+ self.fail_test(existing)
+
+
+ def must_not_be_writable(self, *files):
+ """Ensures that the specified file(s) exist and are not writable.
+ An individual file can be specified as a list of directory names,
+ in which case the pathname will be constructed by concatenating
+ them. Exits FAILED if any of the files does not exist or is
+ writable.
+ """
+ files = map(lambda x: is_List(x) and apply(os.path.join, x) or x, files)
+ existing, missing = separate_files(files)
+ writable = filter(is_writable, existing)
+ if missing:
+ print "Missing files: `%s'" % string.join(missing, "', `")
+ if writable:
+ print "Writable files: `%s'" % string.join(writable, "', `")
+ self.fail_test(missing + writable)
+
+ def run(self, options = None, arguments = None,
+ stdout = None, stderr = '', status = 0, **kw):
+ """Runs the program under test, checking that the test succeeded.
+
+ The arguments are the same as the base TestCmd.run() method,
+ with the addition of:
+
+ options Extra options that get appended to the beginning
+ of the arguments.
+
+ stdout The expected standard output from
+ the command. A value of None means
+ don't test standard output.
+
+ stderr The expected error output from
+ the command. A value of None means
+ don't test error output.
+
+ status The expected exit status from the
+ command. A value of None means don't
+ test exit status.
+
+ By default, this expects a successful exit (status = 0), does
+ not test standard output (stdout = None), and expects that error
+ output is empty (stderr = "").
+ """
+ if options:
+ if arguments is None:
+ arguments = options
+ else:
+ arguments = options + " " + arguments
+ kw['arguments'] = arguments
+ try:
+ match = kw['match']
+ del kw['match']
+ except KeyError:
+ match = self.match
+ try:
+ apply(TestCmd.run, [self], kw)
+ except KeyboardInterrupt:
+ raise
+ except:
+ print self.banner('STDOUT ')
+ print self.stdout()
+ print self.banner('STDERR ')
+ print self.stderr()
+ raise
+ if _failed(self, status):
+ expect = ''
+ if status != 0:
+ expect = " (expected %s)" % str(status)
+ print "%s returned %s%s" % (self.program, str(_status(self)), expect)
+ print self.banner('STDOUT ')
+ print self.stdout()
+ print self.banner('STDERR ')
+ print self.stderr()
+ raise TestFailed
+ if not stdout is None and not match(self.stdout(), stdout):
+ self.diff(stdout, self.stdout(), 'STDOUT ')
+ stderr = self.stderr()
+ if stderr:
+ print self.banner('STDERR ')
+ print stderr
+ raise TestFailed
+ if not stderr is None and not match(self.stderr(), stderr):
+ print self.banner('STDOUT ')
+ print self.stdout()
+ self.diff(stderr, self.stderr(), 'STDERR ')
+ raise TestFailed
diff --git a/QMTest/TestRuntest.py b/QMTest/TestRuntest.py
new file mode 100644
index 0000000..6fd423a
--- /dev/null
+++ b/QMTest/TestRuntest.py
@@ -0,0 +1,137 @@
+"""
+TestRuntest.py: a testing framework for the runtest.py command used to
+invoke SCons tests.
+
+A TestRuntest environment object is created via the usual invocation:
+
+ test = TestRuntest()
+
+TestRuntest is a subclass of TestCommon, which is in turn is a subclass
+of TestCmd), and hence has available all of the methods and attributes
+from those classes, as well as any overridden or additional methods or
+attributes defined in this subclass.
+"""
+
+# __COPYRIGHT__
+
+__revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
+
+import os
+import os.path
+import string
+import shutil
+import sys
+
+from TestCommon import *
+from TestCommon import __all__
+
+__all__.extend([ 'TestRuntest',
+ 'python',
+ ])
+
+python = python_executable
+
+
+failing_test_template = """\
+import sys
+sys.stdout.write('FAILING TEST STDOUT\\n')
+sys.stderr.write('FAILING TEST STDERR\\n')
+sys.exit(1)
+"""
+
+no_result_test_template = """\
+import sys
+sys.stdout.write('NO RESULT TEST STDOUT\\n')
+sys.stderr.write('NO RESULT TEST STDERR\\n')
+sys.exit(2)
+"""
+
+passing_test_template = """\
+import sys
+sys.stdout.write('PASSING TEST STDOUT\\n')
+sys.stderr.write('PASSING TEST STDERR\\n')
+sys.exit(0)
+"""
+
+class TestRuntest(TestCommon):
+ """Class for testing the runtest.py script.
+
+ This provides a common place for initializing Runtest tests,
+ eliminating the need to begin every test with the same repeated
+ initializations.
+ """
+
+ def __init__(self, **kw):
+ """Initialize a Runtest testing object.
+
+ If they're not overridden by keyword arguments, this
+ initializes the object with the following default values:
+
+ program = 'runtest.py'
+ interpreter = ['python', '-tt']
+ match = match_exact
+ workdir = ''
+
+ The workdir value means that, by default, a temporary
+ workspace directory is created for a TestRuntest environment.
+ The superclass TestCommon.__init__() will change directory (chdir)
+ to the workspace directory, so an explicit "chdir = '.'" on all
+ of the run() method calls is not necessary. This initialization
+ also copies the runtest.py and QMTest/ subdirectory tree to the
+ temporary directory, duplicating how this test infrastructure
+ appears in a normal workspace.
+ """
+ set_workpath_runtest = None
+ if not kw.has_key('program'):
+ kw['program'] = 'runtest.py'
+ set_workpath_runtest = 1
+ if not kw.has_key('interpreter'):
+ kw['interpreter'] = [python, '-tt']
+ if not kw.has_key('match'):
+ kw['match'] = match_exact
+ if not kw.has_key('workdir'):
+ kw['workdir'] = ''
+ orig_cwd = os.getcwd()
+ apply(TestCommon.__init__, [self], kw)
+
+ things_to_copy = [
+ 'runtest.py',
+ 'QMTest',
+ ]
+
+ dirs = []
+
+ spe = os.environ.get('SCONS_SOURCE_PATH_EXECUTABLE', orig_cwd)
+ for d in string.split(spe, os.pathsep):
+ dirs.append(os.path.join(d, 'build'))
+ dirs.append(d)
+
+ for thing in things_to_copy:
+ for dir in dirs:
+ t = os.path.join(dir, thing)
+ if os.path.exists(t):
+ if os.path.isdir(t):
+ copy_func = shutil.copytree
+ else:
+ copy_func = shutil.copyfile
+ copy_func(t, self.workpath(thing))
+ break
+
+ if set_workpath_runtest:
+ self.program_set(self.workpath('runtest.py'))
+
+ for key in os.environ.keys():
+ if key[:5] == 'AEGIS':
+ os.environ[key] = ''
+
+ os.environ['PYTHONPATH'] = ''
+ os.environ['SCONS_SOURCE_PATH_EXECUTABLE'] = ''
+
+ def write_failing_test(self, name):
+ self.write(name, failing_test_template)
+
+ def write_no_result_test(self, name):
+ self.write(name, no_result_test_template)
+
+ def write_passing_test(self, name):
+ self.write(name, passing_test_template)
diff --git a/QMTest/TestSCons.py b/QMTest/TestSCons.py
new file mode 100644
index 0000000..0904d31
--- /dev/null
+++ b/QMTest/TestSCons.py
@@ -0,0 +1,538 @@
+"""
+TestSCons.py: a testing framework for the SCons software construction
+tool.
+
+A TestSCons environment object is created via the usual invocation:
+
+ test = TestSCons()
+
+TestScons is a subclass of TestCommon, which is in turn is a subclass
+of TestCmd), and hence has available all of the methods and attributes
+from those classes, as well as any overridden or additional methods or
+attributes defined in this subclass.
+"""
+
+# __COPYRIGHT__
+
+__revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
+
+import os
+import os.path
+import string
+import sys
+
+from TestCommon import *
+from TestCommon import __all__
+
+__all__.extend([ 'TestSCons',
+ 'python',
+ '_exe',
+ '_obj',
+ '_shobj',
+ 'lib_',
+ '_lib',
+ 'dll_',
+ '_dll'
+ ])
+
+python = python_executable
+_exe = exe_suffix
+_obj = obj_suffix
+_shobj = shobj_suffix
+_lib = lib_suffix
+lib_ = lib_prefix
+_dll = dll_suffix
+dll_ = dll_prefix
+
+def gccFortranLibs():
+ """Test whether -lfrtbegin is required. This can probably be done in
+ a more reliable way, but using popen3 is relatively efficient."""
+
+ libs = ['g2c']
+
+ try:
+ import popen2
+ stderr = popen2.popen3('gcc -v')[2]
+ except OSError:
+ return libs
+
+ for l in stderr.readlines():
+ list = string.split(l)
+ if len(list) > 3 and list[:2] == ['gcc', 'version']:
+ if list[2][:2] == '3.':
+ libs = ['frtbegin'] + libs
+ break
+ return libs
+
+
+if sys.platform == 'cygwin':
+ # On Cygwin, os.path.normcase() lies, so just report back the
+ # fact that the underlying Win32 OS is case-insensitive.
+ def case_sensitive_suffixes(s1, s2):
+ return 0
+else:
+ def case_sensitive_suffixes(s1, s2):
+ return (os.path.normcase(s1) != os.path.normcase(s2))
+
+
+if sys.platform == 'win32':
+ fortran_lib = gccFortranLibs()
+elif sys.platform == 'cygwin':
+ fortran_lib = gccFortranLibs()
+elif string.find(sys.platform, 'irix') != -1:
+ fortran_lib = ['ftn']
+else:
+ fortran_lib = gccFortranLibs()
+
+
+
+file_expr = r"""File "[^"]*", line \d+, in .+
+"""
+
+# re.escape escapes too much.
+def re_escape(str):
+ for c in ['.', '[', ']', '(', ')', '*', '+', '?']: # Not an exhaustive list.
+ str = string.replace(str, c, '\\' + c)
+ return str
+
+
+
+class TestSCons(TestCommon):
+ """Class for testing SCons.
+
+ This provides a common place for initializing SCons tests,
+ eliminating the need to begin every test with the same repeated
+ initializations.
+ """
+
+ def __init__(self, **kw):
+ """Initialize an SCons testing object.
+
+ If they're not overridden by keyword arguments, this
+ initializes the object with the following default values:
+
+ program = 'scons' if it exists,
+ else 'scons.py'
+ interpreter = 'python'
+ match = match_exact
+ workdir = ''
+
+ The workdir value means that, by default, a temporary workspace
+ directory is created for a TestSCons environment. In addition,
+ this method changes directory (chdir) to the workspace directory,
+ so an explicit "chdir = '.'" on all of the run() method calls
+ is not necessary.
+ """
+ self.orig_cwd = os.getcwd()
+ try:
+ script_dir = os.environ['SCONS_SCRIPT_DIR']
+ except KeyError:
+ pass
+ else:
+ os.chdir(script_dir)
+ if not kw.has_key('program'):
+ kw['program'] = os.environ.get('SCONS')
+ if not kw['program']:
+ if os.path.exists('scons'):
+ kw['program'] = 'scons'
+ else:
+ kw['program'] = 'scons.py'
+ if not kw.has_key('interpreter') and not os.environ.get('SCONS_EXEC'):
+ kw['interpreter'] = [python, '-tt']
+ if not kw.has_key('match'):
+ kw['match'] = match_exact
+ if not kw.has_key('workdir'):
+ kw['workdir'] = ''
+ apply(TestCommon.__init__, [self], kw)
+
+ def Environment(self, ENV=None, *args, **kw):
+ """
+ Return a construction Environment that optionally overrides
+ the default external environment with the specified ENV.
+ """
+ import SCons.Environment
+ import SCons.Errors
+ if not ENV is None:
+ kw['ENV'] = ENV
+ try:
+ return apply(SCons.Environment.Environment, args, kw)
+ except (SCons.Errors.UserError, SCons.Errors.InternalError):
+ return None
+
+ def detect(self, var, prog=None, ENV=None):
+ """
+ Detect a program named 'prog' by first checking the construction
+ variable named 'var' and finally searching the path used by
+ SCons. If either method fails to detect the program, then false
+ is returned, otherwise the full path to prog is returned. If
+ prog is None, then the value of the environment variable will be
+ used as prog.
+ """
+ env = self.Environment(ENV)
+ v = env.subst('$'+var)
+ if not v:
+ return None
+ if prog is None:
+ prog = v
+ if v != prog:
+ return None
+ return env.WhereIs(prog)
+
+ def detect_tool(self, tool, prog=None, ENV=None):
+ """
+ Given a tool (i.e., tool specification that would be passed
+ to the "tools=" parameter of Environment()) and a program that
+ corresponds to that tool, return true if and only if we can find
+ that tool using Environment.Detect().
+
+ By default, prog is set to the value passed into the tools parameter.
+ """
+
+ if not prog:
+ prog = tool
+ env = self.Environment(ENV, tools=[tool])
+ if env is None:
+ return None
+ return env.Detect([prog])
+
+ def where_is(self, prog, path=None):
+ """
+ Given a program, search for it in the specified external PATH,
+ or in the actual external PATH is none is specified.
+ """
+ import SCons.Environment
+ env = SCons.Environment.Environment()
+ if path is None:
+ path = os.environ['PATH']
+ return env.WhereIs(prog, path)
+
+ def wrap_stdout(self, build_str = "", read_str = "", error = 0, cleaning = 0):
+ """Wraps standard output string(s) in the normal
+ "Reading ... done" and "Building ... done" strings
+ """
+ cap,lc = [ ('Build','build'),
+ ('Clean','clean') ][cleaning]
+ if error:
+ term = "scons: %sing terminated because of errors.\n" % lc
+ else:
+ term = "scons: done %sing targets.\n" % lc
+ return "scons: Reading SConscript files ...\n" + \
+ read_str + \
+ "scons: done reading SConscript files.\n" + \
+ "scons: %sing targets ...\n" % cap + \
+ build_str + \
+ term
+
+ def up_to_date(self, options = None, arguments = None, read_str = "", **kw):
+ s = ""
+ for arg in string.split(arguments):
+ s = s + "scons: `%s' is up to date.\n" % arg
+ if options:
+ arguments = options + " " + arguments
+ kw['arguments'] = arguments
+ kw['stdout'] = self.wrap_stdout(read_str = read_str, build_str = s)
+ kw['match'] = self.match_exact
+ apply(self.run, [], kw)
+
+ def not_up_to_date(self, options = None, arguments = None, **kw):
+ """Asserts that none of the targets listed in arguments is
+ up to date, but does not make any assumptions on other targets.
+ This function is most useful in conjunction with the -n option.
+ """
+ s = ""
+ for arg in string.split(arguments):
+ s = s + "(?!scons: `%s' is up to date.)" % arg
+ if options:
+ arguments = options + " " + arguments
+ kw['arguments'] = arguments
+ kw['stdout'] = self.wrap_stdout(build_str="("+s+"[^\n]*\n)*")
+ kw['stdout'] = string.replace(kw['stdout'],'\n','\\n')
+ kw['stdout'] = string.replace(kw['stdout'],'.','\\.')
+ kw['match'] = self.match_re_dotall
+ apply(self.run, [], kw)
+
+ def skip_test(self, message="Skipping test.\n"):
+ """Skips a test.
+
+ Proper test-skipping behavior is dependent on whether we're being
+ executed as part of development of a change under Aegis.
+
+ Technically, skipping a test is a NO RESULT, but Aegis will
+ treat that as a test failure and prevent the change from going
+ to the next step. We don't want to force anyone using Aegis
+ to have to install absolutely every tool used by the tests,
+ so we actually report to Aegis that a skipped test has PASSED
+ so that the workflow isn't held up.
+ """
+ if message:
+ sys.stdout.write(message)
+ sys.stdout.flush()
+ devdir = os.popen("aesub '$dd' 2>/dev/null", "r").read()[:-1]
+ intdir = os.popen("aesub '$intd' 2>/dev/null", "r").read()[:-1]
+ if devdir and self._cwd[:len(devdir)] == devdir or \
+ intdir and self._cwd[:len(intdir)] == intdir:
+ # We're under the development directory for this change,
+ # so this is an Aegis invocation; pass the test (exit 0).
+ self.pass_test()
+ else:
+ # skip=1 means skip this function when showing where this
+ # result came from. They only care about the line where the
+ # script called test.skip_test(), not the line number where
+ # we call test.no_result().
+ self.no_result(skip=1)
+
+ def diff_substr(self, expect, actual):
+ i = 0
+ for x, y in zip(expect, actual):
+ if x != y:
+ return "Actual did not match expect at char %d:\n" \
+ " Expect: %s\n" \
+ " Actual: %s\n" \
+ % (i, repr(expect[i-20:i+40]), repr(actual[i-20:i+40]))
+ i = i + 1
+ return "Actual matched the expected output???"
+
+ def java_ENV(self):
+ """
+ Return a default external environment that uses a local Java SDK
+ in preference to whatever's found in the default PATH.
+ """
+ import SCons.Environment
+ env = SCons.Environment.Environment()
+ java_path = [
+ '/usr/local/j2sdk1.4.2/bin',
+ '/usr/local/j2sdk1.4.1/bin',
+ '/usr/local/j2sdk1.3.1/bin',
+ '/usr/local/j2sdk1.3.0/bin',
+ '/usr/local/j2sdk1.2.2/bin',
+ '/usr/local/j2sdk1.2/bin',
+ '/usr/local/j2sdk1.1.8/bin',
+ '/usr/local/j2sdk1.1.7/bin',
+ '/usr/local/j2sdk1.1.6/bin',
+ '/usr/local/j2sdk1.1.5/bin',
+ '/usr/local/j2sdk1.1.4/bin',
+ '/usr/local/j2sdk1.1.3/bin',
+ '/usr/local/j2sdk1.1.2/bin',
+ '/usr/local/j2sdk1.1.1/bin',
+ env['ENV']['PATH'],
+ ]
+ env['ENV']['PATH'] = string.join(java_path, os.pathsep)
+ return env['ENV']
+
+ def Qt_dummy_installation(self, dir='qt'):
+ # create a dummy qt installation
+
+ self.subdir( dir, [dir, 'bin'], [dir, 'include'], [dir, 'lib'] )
+
+ self.write([dir, 'bin', 'mymoc.py'], """\
+import getopt
+import sys
+import string
+import re
+cmd_opts, args = getopt.getopt(sys.argv[1:], 'io:', [])
+output = None
+impl = 0
+opt_string = ''
+for opt, arg in cmd_opts:
+ if opt == '-o': output = open(arg, 'wb')
+ elif opt == '-i': impl = 1
+ else: opt_string = opt_string + ' ' + opt
+for a in args:
+ contents = open(a, 'rb').read()
+ subst = r'{ my_qt_symbol( "' + a + '\\\\n" ); }'
+ if impl:
+ contents = re.sub( r'#include.*', '', contents )
+ output.write(string.replace(contents, 'Q_OBJECT', subst))
+output.close()
+sys.exit(0)
+""")
+
+ self.write([dir, 'bin', 'myuic.py'], """\
+import os.path
+import re
+import sys
+import string
+output_arg = 0
+impl_arg = 0
+impl = None
+source = None
+for arg in sys.argv[1:]:
+ if output_arg:
+ output = open(arg, 'wb')
+ output_arg = 0
+ elif impl_arg:
+ impl = arg
+ impl_arg = 0
+ elif arg == "-o":
+ output_arg = 1
+ elif arg == "-impl":
+ impl_arg = 1
+ else:
+ if source:
+ sys.exit(1)
+ source = open(arg, 'rb')
+ sourceFile = arg
+if impl:
+ output.write( '#include "' + impl + '"\\n' )
+ includes = re.findall('<include.*?>(.*?)</include>', source.read())
+ for incFile in includes:
+ # this is valid for ui.h files, at least
+ if os.path.exists(incFile):
+ output.write('#include "' + incFile + '"\\n')
+else:
+ output.write( '#include "my_qobject.h"\\n' + source.read() + " Q_OBJECT \\n" )
+output.close()
+sys.exit(0)
+""" )
+
+ self.write([dir, 'include', 'my_qobject.h'], r"""
+#define Q_OBJECT ;
+void my_qt_symbol(const char *arg);
+""")
+
+ self.write([dir, 'lib', 'my_qobject.cpp'], r"""
+#include "../include/my_qobject.h"
+#include <stdio.h>
+void my_qt_symbol(const char *arg) {
+ printf( arg );
+}
+""")
+
+ self.write(['qt', 'lib', 'SConstruct'], r"""
+env = Environment()
+env.StaticLibrary( 'myqt', 'my_qobject.cpp' )
+""")
+
+ self.run(chdir = self.workpath('qt', 'lib'),
+ arguments = '.',
+ stderr = noisy_ar,
+ match = self.match_re_dotall)
+
+ self.QT = self.workpath(dir)
+ self.QT_LIB = 'myqt'
+ self.QT_MOC = '%s %s' % (python, self.workpath(dir, 'bin', 'mymoc.py'))
+ self.QT_UIC = '%s %s' % (python, self.workpath(dir, 'bin', 'myuic.py'))
+
+ def Qt_create_SConstruct(self, place):
+ if type(place) is type([]):
+ place = apply(test.workpath, place)
+ self.write(place, """\
+if ARGUMENTS.get('noqtdir', 0): QTDIR=None
+else: QTDIR=r'%s'
+env = Environment(QTDIR = QTDIR,
+ QT_LIB = r'%s',
+ QT_MOC = r'%s',
+ QT_UIC = r'%s',
+ tools=['default','qt'])
+dup = 1
+if ARGUMENTS.get('build_dir', 0):
+ if ARGUMENTS.get('chdir', 0):
+ SConscriptChdir(1)
+ else:
+ SConscriptChdir(0)
+ dup=int(ARGUMENTS.get('dup', 1))
+ if dup == 0:
+ builddir = 'build_dup0'
+ env['QT_DEBUG'] = 1
+ else:
+ builddir = 'build'
+ BuildDir(builddir, '.', duplicate=dup)
+ print builddir, dup
+ sconscript = Dir(builddir).File('SConscript')
+else:
+ sconscript = File('SConscript')
+Export("env dup")
+SConscript( sconscript )
+""" % (self.QT, self.QT_LIB, self.QT_MOC, self.QT_UIC))
+
+ def msvs_versions(self):
+ if not hasattr(self, '_msvs_versions'):
+
+ # Determine the SCons version and the versions of the MSVS
+ # environments installed on the test machine.
+ #
+ # We do this by executing SCons with an SConstruct file
+ # (piped on stdin) that spits out Python assignments that
+ # we can just exec(). We construct the SCons.__"version"__
+ # string in the input here so that the SCons build itself
+ # doesn't fill it in when packaging SCons.
+ input = """\
+import SCons
+print "self._scons_version =", repr(SCons.__%s__)
+env = Environment();
+print "self._msvs_versions =", str(env['MSVS']['VERSIONS'])
+""" % 'version'
+
+ self.run(arguments = '-n -q -Q -f -', stdin = input)
+ exec(self.stdout())
+
+ return self._msvs_versions
+
+ def vcproj_sys_path(self, fname):
+ """
+ """
+ orig = 'sys.path = [ join(sys'
+
+ enginepath = repr(os.path.join(self._cwd, '..', 'engine'))
+ replace = 'sys.path = [ %s, join(sys' % enginepath
+
+ contents = self.read(fname)
+ contents = string.replace(contents, orig, replace)
+ self.write(fname, contents)
+
+ def msvs_substitute(self, input, msvs_ver,
+ subdir=None, sconscript=None,
+ python=sys.executable,
+ project_guid=None):
+ if not hasattr(self, '_msvs_versions'):
+ self.msvs_versions()
+
+ if subdir:
+ workpath = self.workpath(subdir)
+ else:
+ workpath = self.workpath()
+
+ if sconscript is None:
+ sconscript = self.workpath('SConstruct')
+
+ if project_guid is None:
+ project_guid = "{E5466E26-0003-F18B-8F8A-BCD76C86388D}"
+
+ if os.environ.has_key('SCONS_LIB_DIR'):
+ exec_script_main = "from os.path import join; import sys; sys.path = [ r'%s' ] + sys.path; import SCons.Script; SCons.Script.main()" % os.environ['SCONS_LIB_DIR']
+ else:
+ exec_script_main = "from os.path import join; import sys; sys.path = [ join(sys.prefix, 'Lib', 'site-packages', 'scons-%s'), join(sys.prefix, 'scons-%s'), join(sys.prefix, 'Lib', 'site-packages', 'scons'), join(sys.prefix, 'scons') ] + sys.path; import SCons.Script; SCons.Script.main()" % (self._scons_version, self._scons_version)
+ exec_script_main_xml = string.replace(exec_script_main, "'", "&apos;")
+
+ result = string.replace(input, r'<WORKPATH>', workpath)
+ result = string.replace(result, r'<PYTHON>', python)
+ result = string.replace(result, r'<SCONSCRIPT>', sconscript)
+ result = string.replace(result, r'<SCONS_SCRIPT_MAIN>', exec_script_main)
+ result = string.replace(result, r'<SCONS_SCRIPT_MAIN_XML>', exec_script_main_xml)
+ result = string.replace(result, r'<PROJECT_GUID>', project_guid)
+ return result
+
+ def get_msvs_executable(self, version):
+ """Returns a full path to the executable (MSDEV or devenv)
+ for the specified version of Visual Studio.
+ """
+ sub_path = {
+ '6.0' : ['Common', 'MSDev98', 'Bin', 'MSDEV.COM'],
+ '7.0' : ['Common7', 'IDE', 'devenv.com'],
+ '7.1' : ['Common7', 'IDE', 'devenv.com'],
+ '8.0' : ['Common7', 'IDE', 'devenv.com'],
+ }
+ from SCons.Tool.msvs import get_msvs_install_dirs
+ vs_path = get_msvs_install_dirs(version)['VSINSTALLDIR']
+ return apply(os.path.join, [vs_path] + sub_path[version])
+
+# In some environments, $AR will generate a warning message to stderr
+# if the library doesn't previously exist and is being created. One
+# way to fix this is to tell AR to be quiet (sometimes the 'c' flag),
+# but this is difficult to do in a platform-/implementation-specific
+# method. Instead, we will use the following as a stderr match for
+# tests that use AR so that we will view zero or more "ar: creating
+# <file>" messages to be successful executions of the test (see
+# test/AR.py for sample usage).
+
+noisy_ar=r'(ar: creating( archive)? \S+\n?)*'
diff --git a/QMTest/classes.qmc b/QMTest/classes.qmc
new file mode 100644
index 0000000..73e3df3
--- /dev/null
+++ b/QMTest/classes.qmc
@@ -0,0 +1,11 @@
+<?xml version="1.0" ?>
+<!DOCTYPE class-directory
+ PUBLIC '-//QM/2.3/Class-Directory//EN'
+ 'http://www.codesourcery.com/qm/dtds/2.3/-//qm/2.3/class-directory//en.dtd'>
+<class-directory>
+ <class kind="database" name="scons_tdb.Database"/>
+ <class kind="test" name="scons_tdb.Test"/>
+ <class kind="result_stream" name="scons_tdb.AegisChangeStream"/>
+ <class kind="result_stream" name="scons_tdb.AegisBaselineStream"/>
+ <class kind="result_stream" name="scons_tdb.AegisBatchStream"/>
+</class-directory>
diff --git a/QMTest/configuration b/QMTest/configuration
new file mode 100644
index 0000000..db648ae
--- /dev/null
+++ b/QMTest/configuration
@@ -0,0 +1,6 @@
+<?xml version='1.0' encoding='ISO-8859-1'?>
+<extension class="scons_tdb.Database" kind="database">
+ <argument name="srcdir">
+ <text>.</text>
+ </argument>
+</extension>
diff --git a/QMTest/scons_tdb.py b/QMTest/scons_tdb.py
new file mode 100644
index 0000000..145c2a7
--- /dev/null
+++ b/QMTest/scons_tdb.py
@@ -0,0 +1,520 @@
+#!/usr/bin/env python
+#
+# __COPYRIGHT__
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
+# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
+# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+#
+
+__revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
+
+"""
+QMTest classes to support SCons' testing and Aegis-inspired workflow.
+
+Thanks to Stefan Seefeld for the initial code.
+"""
+
+########################################################################
+# Imports
+########################################################################
+
+import qm
+import qm.common
+import qm.test.base
+from qm.fields import *
+from qm.executable import *
+from qm.test import database
+from qm.test import test
+from qm.test import resource
+from qm.test import suite
+from qm.test.result import Result
+from qm.test.file_result_stream import FileResultStream
+from qm.test.classes.text_result_stream import TextResultStream
+from qm.test.directory_suite import DirectorySuite
+from qm.extension import get_extension_class_name, get_class_arguments_as_dictionary
+import os, dircache
+
+if sys.platform == 'win32':
+ console = 'con'
+else:
+ console = '/dev/tty'
+
+def Trace(msg):
+ open(console, 'w').write(msg)
+
+# QMTest 2.3 hard-codes how it captures the beginning and end time by
+# calling the qm.common.format_time_iso() function, which canonicalizes
+# the time stamp in one-second granularity ISO format. In order to get
+# sub-second granularity, as well as to use the more precise time.clock()
+# function on Windows, we must replace that function with our own.
+
+orig_format_time_iso = qm.common.format_time_iso
+
+if sys.platform == 'win32':
+ time_func = time.clock
+else:
+ time_func = time.time
+
+def my_format_time(time_secs=None):
+ return str(time_func())
+
+qm.common.format_time_iso = my_format_time
+
+########################################################################
+# Classes
+########################################################################
+
+def get_explicit_arguments(e):
+ """This function can be removed once QMTest 2.4 is out."""
+
+ # Get all of the arguments.
+ arguments = get_class_arguments_as_dictionary(e.__class__)
+ # Determine which subset of the 'arguments' have been set
+ # explicitly.
+ explicit_arguments = {}
+ for name, field in arguments.items():
+ # Do not record computed fields.
+ if field.IsComputed():
+ continue
+ if e.__dict__.has_key(name):
+ explicit_arguments[name] = e.__dict__[name]
+
+ return explicit_arguments
+
+
+def check_exit_status(result, prefix, desc, status):
+ """This function can be removed once QMTest 2.4 is out."""
+
+ if sys.platform == "win32" or os.WIFEXITED(status):
+ # Obtain the exit code.
+ if sys.platform == "win32":
+ exit_code = status
+ else:
+ exit_code = os.WEXITSTATUS(status)
+ # If the exit code is non-zero, the test fails.
+ if exit_code != 0:
+ result.Fail("%s failed with exit code %d." % (desc, exit_code))
+ # Record the exit code in the result.
+ result[prefix + "exit_code"] = str(exit_code)
+ return False
+
+ elif os.WIFSIGNALED(status):
+ # Obtain the signal number.
+ signal = os.WTERMSIG(status)
+ # If the program gets a fatal signal, the test fails .
+ result.Fail("%s received fatal signal %d." % (desc, signal))
+ result[prefix + "signal"] = str(signal)
+ return False
+ else:
+ # A process should only be able to stop by exiting, or
+ # by being terminated with a signal.
+ assert None
+
+ return True
+
+# XXX I'd like to annotate the overall test run with the following
+# information about the Python version, SCons version, and environment.
+# Not sure how to do that yet; ask Stefan.
+#
+# sys_keys = ['byteorder', 'exec_prefix', 'executable', 'maxint', 'maxunicode', 'platform', 'prefix', 'version', 'version_info']
+
+# " <%s>" % tag
+# " <version>%s</version>" % module.__version__
+# " <build>%s</build>" % module.__build__
+# " <buildsys>%s</buildsys>" % module.__buildsys__
+# " <date>%s</date>" % module.__date__
+# " <developer>%s</developer>" % module.__developer__
+# " </%s>" % tag
+
+# " <scons>"
+# print_version_info("script", scons)
+# print_version_info("engine", SCons)
+# " </scons>"
+
+# environ_keys = [
+# 'PATH',
+# 'SCONSFLAGS',
+# 'SCONS_LIB_DIR',
+# 'PYTHON_ROOT',
+# 'QTDIR',
+#
+# 'COMSPEC',
+# 'INTEL_LICENSE_FILE',
+# 'INCLUDE',
+# 'LIB',
+# 'MSDEVDIR',
+# 'OS',
+# 'PATHEXT',
+# 'SYSTEMROOT',
+# 'TEMP',
+# 'TMP',
+# 'USERNAME',
+# 'VXDOMNTOOLS',
+# 'WINDIR',
+# 'XYZZY'
+#
+# 'ENV',
+# 'HOME',
+# 'LANG',
+# 'LANGUAGE',
+# 'LOGNAME',
+# 'MACHINE',
+# 'OLDPWD',
+# 'PWD',
+# 'OPSYS',
+# 'SHELL',
+# 'TMPDIR',
+# 'USER',
+# ]
+
+class AegisStream(TextResultStream):
+ def __init__(self, *args, **kw):
+ super(AegisStream, self).__init__(*args, **kw)
+ self._num_tests = 0
+ self._outcomes = {}
+ self._outcome_counts = {}
+ for outcome in AegisTest.aegis_outcomes:
+ self._outcome_counts[outcome] = 0
+ self.format = "full"
+ def _percent(self, outcome):
+ return 100. * self._outcome_counts[outcome] / self._num_tests
+ def _aegis_no_result(self, result):
+ outcome = result.GetOutcome()
+ return (outcome == Result.FAIL and result.get('Test.exit_code') == '2')
+ def _DisplayText(self, text):
+ # qm.common.html_to_text() uses htmllib, which sticks an extra
+ # '\n' on the front of the text. Strip it and only display
+ # the text if there's anything to display.
+ text = qm.common.html_to_text(text)
+ if text[0] == '\n':
+ text = text[1:]
+ if text:
+ lines = text.splitlines()
+ if lines[-1] == '':
+ lines = lines[:-1]
+ self.file.write(' ' + '\n '.join(lines) + '\n\n')
+ def _DisplayResult(self, result, format):
+ test_id = result.GetId()
+ kind = result.GetKind()
+ if self._aegis_no_result(result):
+ outcome = "NO_RESULT"
+ else:
+ outcome = result.GetOutcome()
+ self._WriteOutcome(test_id, kind, outcome)
+ self.file.write('\n')
+ def _DisplayAnnotations(self, result):
+ try:
+ self._DisplayText(result["Test.stdout"])
+ except KeyError:
+ pass
+ try:
+ self._DisplayText(result["Test.stderr"])
+ except KeyError:
+ pass
+ if result["Test.print_time"] != "0":
+ start = float(result['qmtest.start_time'])
+ end = float(result['qmtest.end_time'])
+ fmt = " Total execution time: %.1f seconds\n\n"
+ self.file.write(fmt % (end - start))
+
+class AegisChangeStream(AegisStream):
+ def WriteResult(self, result):
+ test_id = result.GetId()
+ if self._aegis_no_result(result):
+ outcome = AegisTest.NO_RESULT
+ else:
+ outcome = result.GetOutcome()
+ self._num_tests += 1
+ self._outcome_counts[outcome] += 1
+ super(AegisStream, self).WriteResult(result)
+ def _SummarizeTestStats(self):
+ self.file.write("\n")
+ self._DisplayHeading("STATISTICS")
+ if self._num_tests != 0:
+ # We'd like to use the _FormatStatistics() method to do
+ # this, but it's wrapped around the list in Result.outcomes,
+ # so it's simpler to just do it ourselves.
+ print " %6d tests total\n" % self._num_tests
+ for outcome in AegisTest.aegis_outcomes:
+ if self._outcome_counts[outcome] != 0:
+ print " %6d (%3.0f%%) tests %s" % (
+ self._outcome_counts[outcome],
+ self._percent(outcome),
+ outcome
+ )
+
+class AegisBaselineStream(AegisStream):
+ def WriteResult(self, result):
+ test_id = result.GetId()
+ if self._aegis_no_result(result):
+ outcome = AegisTest.NO_RESULT
+ self.expected_outcomes[test_id] = Result.PASS
+ self._outcome_counts[outcome] += 1
+ else:
+ self.expected_outcomes[test_id] = Result.FAIL
+ outcome = result.GetOutcome()
+ if outcome != Result.Fail:
+ self._outcome_counts[outcome] += 1
+ self._num_tests += 1
+ super(AegisStream, self).WriteResult(result)
+ def _SummarizeRelativeTestStats(self):
+ self.file.write("\n")
+ self._DisplayHeading("STATISTICS")
+ if self._num_tests != 0:
+ # We'd like to use the _FormatStatistics() method to do
+ # this, but it's wrapped around the list in Result.outcomes,
+ # so it's simpler to just do it ourselves.
+ if self._outcome_counts[AegisTest.FAIL]:
+ print " %6d (%3.0f%%) tests as expected" % (
+ self._outcome_counts[AegisTest.FAIL],
+ self._percent(AegisTest.FAIL),
+ )
+ non_fail_outcomes = list(AegisTest.aegis_outcomes[:])
+ non_fail_outcomes.remove(AegisTest.FAIL)
+ for outcome in non_fail_outcomes:
+ if self._outcome_counts[outcome] != 0:
+ print " %6d (%3.0f%%) tests unexpected %s" % (
+ self._outcome_counts[outcome],
+ self._percent(outcome),
+ outcome,
+ )
+
+class AegisBatchStream(FileResultStream):
+ arguments = [
+ qm.fields.TextField(
+ name = "results_file",
+ title = "Aegis Results File",
+ description = """
+ """,
+ verbatim = "true",
+ default_value = "aegis-results.txt",
+ ),
+ ]
+ def __init__(self, arguments):
+ self.filename = arguments['results_file']
+ super(AegisBatchStream, self).__init__(arguments)
+ self._outcomes = {}
+ def WriteResult(self, result):
+ test_id = result.GetId()
+ kind = result.GetKind()
+ outcome = result.GetOutcome()
+ exit_status = '0'
+ if outcome == Result.FAIL:
+ exit_status = result.get('Test.exit_code')
+ self._outcomes[test_id] = exit_status
+ def Summarize(self):
+ self.file.write('test_result = [\n')
+ for file_name, exit_status in self._outcomes.items():
+ self.file.write(' { file_name = "%s";\n' % file_name)
+ self.file.write(' exit_status = %s; },\n' % exit_status)
+ self.file.write('];\n')
+
+class AegisTest(test.Test):
+ PASS = "PASS"
+ FAIL = "FAIL"
+ NO_RESULT = "NO_RESULT"
+ ERROR = "ERROR"
+ UNTESTED = "UNTESTED"
+
+ aegis_outcomes = (
+ PASS, FAIL, NO_RESULT, ERROR, UNTESTED,
+ )
+ """Aegis test outcomes."""
+
+class Test(AegisTest):
+ """Simple test that runs a python script and checks the status
+ to determine whether the test passes."""
+
+ script = TextField(title="Script to test")
+ topdir = TextField(title="Top source directory")
+
+ def Run(self, context, result):
+ """Run the test. The test passes if the command exits with status=0,
+ and fails otherwise. The program output is logged, but not validated."""
+
+ command = RedirectedExecutable()
+ args = [context.get('python', 'python'), self.script]
+ status = command.Run(args, os.environ)
+ result["Test.print_time"] = context.get('print_time', '0')
+ if not check_exit_status(result, 'Test.', self.script, status):
+ # In case of failure record exit code, stdout, and stderr.
+ result.Fail("Non-zero exit_code.")
+ result["Test.stdout"] = result.Quote(command.stdout)
+ result["Test.stderr"] = result.Quote(command.stderr)
+
+
+class Database(database.Database):
+ """Scons test database.
+ * The 'src' and 'test' directories are explicit suites.
+ * Their subdirectories are implicit suites.
+ * All files under 'src/' ending with 'Tests.py' contain tests.
+ * All files under 'test/' with extension '.py' contain tests.
+ * Right now there is only a single test class, which simply runs
+ the specified python interpreter on the given script. To be refined..."""
+
+ srcdir = TextField(title = "Source Directory",
+ description = "The root of the test suite's source tree.")
+ _is_generic_database = True
+
+ def is_a_test_under_test(path, t):
+ return os.path.splitext(t)[1] == '.py' \
+ and os.path.isfile(os.path.join(path, t))
+
+ def is_a_test_under_src(path, t):
+ return t[-8:] == 'Tests.py' \
+ and os.path.isfile(os.path.join(path, t))
+
+ is_a_test = {
+ 'src' : is_a_test_under_src,
+ 'test' : is_a_test_under_test,
+ }
+
+ exclude_subdirs = {
+ '.svn' : 1,
+ 'CVS' : 1,
+ }
+
+ def is_a_test_subdir(path, subdir):
+ if exclude_subdirs.get(subdir):
+ return None
+ return os.path.isdir(os.path.join(path, subdir))
+
+ def __init__(self, path, arguments):
+
+ self.label_class = "file_label.FileLabel"
+ self.modifiable = "false"
+ # Initialize the base class.
+ super(Database, self).__init__(path, arguments)
+
+
+ def GetRoot(self):
+
+ return self.srcdir
+
+
+ def GetSubdirectories(self, directory):
+
+ components = self.GetLabelComponents(directory)
+ path = os.path.join(self.GetRoot(), *components)
+ if directory:
+ dirs = [d for d in dircache.listdir(path)
+ if os.path.isdir(os.path.join(path, d))]
+ else:
+ dirs = self.is_a_test.keys()
+
+ dirs.sort()
+ return dirs
+
+
+ def GetIds(self, kind, directory = "", scan_subdirs = 1):
+
+ components = self.GetLabelComponents(directory)
+ path = os.path.join(self.GetRoot(), *components)
+
+ if kind == database.Database.TEST:
+
+ if not components:
+ return []
+
+ ids = [self.JoinLabels(directory, t)
+ for t in dircache.listdir(path)
+ if self.is_a_test[components[0]](path, t)]
+
+ elif kind == Database.RESOURCE:
+ return [] # no resources yet
+
+ else: # SUITE
+
+ if directory:
+ ids = [self.JoinLabels(directory, d)
+ for d in dircache.listdir(path)
+ if os.path.isdir(os.path.join(path, d))]
+ else:
+ ids = self.is_a_test.keys()
+
+ if scan_subdirs:
+ for d in dircache.listdir(path):
+ if (os.path.isdir(d)):
+ ids.extend(self.GetIds(kind,
+ self.JoinLabels(directory, d),
+ True))
+
+ return ids
+
+
+ def GetExtension(self, id):
+
+ if not id:
+ return DirectorySuite(self, id)
+
+ components = self.GetLabelComponents(id)
+ path = os.path.join(self.GetRoot(), *components)
+
+ if os.path.isdir(path): # a directory
+ return DirectorySuite(self, id)
+
+ elif os.path.isfile(path): # a test
+
+ arguments = {}
+ arguments['script'] = path
+ arguments['topdir'] = self.GetRoot()
+
+ return Test(arguments, qmtest_id = id, qmtest_database = self)
+
+ else: # nothing else to offer
+
+ return None
+
+
+ def GetTest(self, test_id):
+ """This method can be removed once QMTest 2.4 is out."""
+
+ t = self.GetExtension(test_id)
+ if isinstance(t, test.Test):
+ return database.TestDescriptor(self,
+ test_id,
+ get_extension_class_name(t.__class__),
+ get_explicit_arguments(t))
+
+ raise database.NoSuchTestError(test_id)
+
+ def GetSuite(self, suite_id):
+ """This method can be removed once QMTest 2.4 is out."""
+
+ if suite_id == "":
+ return DirectorySuite(self, "")
+
+ s = self.GetExtension(suite_id)
+ if isinstance(s, suite.Suite):
+ return s
+
+ raise database.NoSuchSuiteError(suite_id)
+
+
+ def GetResource(self, resource_id):
+ """This method can be removed once QMTest 2.4 is out."""
+
+ r = self.GetExtension(resource_id)
+ if isinstance(r, resource.Resource):
+ return ResourceDescriptor(self,
+ resource_id,
+ get_extension_class_name(r.__class__),
+ get_explicit_arguments(r))
+
+ raise database.NoSuchResourceError(resource_id)
diff --git a/QMTest/unittest.py b/QMTest/unittest.py
new file mode 100644
index 0000000..f0540a5
--- /dev/null
+++ b/QMTest/unittest.py
@@ -0,0 +1,693 @@
+#!/usr/bin/env python
+"""
+Python unit testing framework, based on Erich Gamma's JUnit and Kent Beck's
+Smalltalk testing framework.
+
+Further information is available in the bundled documentation, and from
+
+ http://pyunit.sourceforge.net/
+
+This module contains the core framework classes that form the basis of
+specific test cases and suites (TestCase, TestSuite etc.), and also a
+text-based utility class for running the tests and reporting the results
+(TextTestRunner).
+
+Copyright (c) 1999, 2000, 2001 Steve Purcell
+This module is free software, and you may redistribute it and/or modify
+it under the same terms as Python itself, so long as this copyright message
+and disclaimer are retained in their original form.
+
+IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
+SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
+THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
+DAMAGE.
+
+THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
+AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
+SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+"""
+
+__author__ = "Steve Purcell (stephen_purcell@yahoo.com)"
+__version__ = "$ Revision: 1.23 $"[11:-2]
+
+import time
+import sys
+import traceback
+import string
+import os
+
+##############################################################################
+# A platform-specific concession to help the code work for JPython users
+##############################################################################
+
+plat = string.lower(sys.platform)
+_isJPython = string.find(plat, 'java') >= 0 or string.find(plat, 'jdk') >= 0
+del plat
+
+
+##############################################################################
+# Test framework core
+##############################################################################
+
+class TestResult:
+ """Holder for test result information.
+
+ Test results are automatically managed by the TestCase and TestSuite
+ classes, and do not need to be explicitly manipulated by writers of tests.
+
+ Each instance holds the total number of tests run, and collections of
+ failures and errors that occurred among those test runs. The collections
+ contain tuples of (testcase, exceptioninfo), where exceptioninfo is a
+ tuple of values as returned by sys.exc_info().
+ """
+ def __init__(self):
+ self.failures = []
+ self.errors = []
+ self.testsRun = 0
+ self.shouldStop = 0
+
+ def startTest(self, test):
+ "Called when the given test is about to be run"
+ self.testsRun = self.testsRun + 1
+
+ def stopTest(self, test):
+ "Called when the given test has been run"
+ pass
+
+ def addError(self, test, err):
+ "Called when an error has occurred"
+ self.errors.append((test, err))
+
+ def addFailure(self, test, err):
+ "Called when a failure has occurred"
+ self.failures.append((test, err))
+
+ def wasSuccessful(self):
+ "Tells whether or not this result was a success"
+ return len(self.failures) == len(self.errors) == 0
+
+ def stop(self):
+ "Indicates that the tests should be aborted"
+ self.shouldStop = 1
+
+ def __repr__(self):
+ return "<%s run=%i errors=%i failures=%i>" % \
+ (self.__class__, self.testsRun, len(self.errors),
+ len(self.failures))
+
+
+class TestCase:
+ """A class whose instances are single test cases.
+
+ Test authors should subclass TestCase for their own tests. Construction
+ and deconstruction of the test's environment ('fixture') can be
+ implemented by overriding the 'setUp' and 'tearDown' methods respectively.
+
+ By default, the test code itself should be placed in a method named
+ 'runTest'.
+
+ If the fixture may be used for many test cases, create as
+ many test methods as are needed. When instantiating such a TestCase
+ subclass, specify in the constructor arguments the name of the test method
+ that the instance is to execute.
+
+ If it is necessary to override the __init__ method, the base class
+ __init__ method must always be called.
+ """
+ def __init__(self, methodName='runTest'):
+ """Create an instance of the class that will use the named test
+ method when executed. Raises a ValueError if the instance does
+ not have a method with the specified name.
+ """
+ try:
+ self.__testMethod = getattr(self,methodName)
+ except AttributeError:
+ raise ValueError, "no such test method in %s: %s" % \
+ (self.__class__, methodName)
+
+ def setUp(self):
+ "Hook method for setting up the test fixture before exercising it."
+ pass
+
+ def tearDown(self):
+ "Hook method for deconstructing the test fixture after testing it."
+ pass
+
+ def countTestCases(self):
+ return 1
+
+ def defaultTestResult(self):
+ return TestResult()
+
+ def shortDescription(self):
+ """Returns a one-line description of the test, or None if no
+ description has been provided.
+
+ The default implementation of this method returns the first line of
+ the specified test method's docstring.
+ """
+ doc = self.__testMethod.__doc__
+ return doc and string.strip(string.split(doc, "\n")[0]) or None
+
+ def id(self):
+ return "%s.%s" % (self.__class__, self.__testMethod.__name__)
+
+ def __str__(self):
+ return "%s (%s)" % (self.__testMethod.__name__, self.__class__)
+
+ def __repr__(self):
+ return "<%s testMethod=%s>" % \
+ (self.__class__, self.__testMethod.__name__)
+
+ def run(self, result=None):
+ return self(result)
+
+ def __call__(self, result=None):
+ if result is None: result = self.defaultTestResult()
+ result.startTest(self)
+ try:
+ try:
+ self.setUp()
+ except:
+ result.addError(self,self.__exc_info())
+ return
+
+ try:
+ self.__testMethod()
+ except AssertionError, e:
+ result.addFailure(self,self.__exc_info())
+ except:
+ result.addError(self,self.__exc_info())
+
+ try:
+ self.tearDown()
+ except:
+ result.addError(self,self.__exc_info())
+ finally:
+ result.stopTest(self)
+
+ def debug(self):
+ """Run the test without collecting errors in a TestResult"""
+ self.setUp()
+ self.__testMethod()
+ self.tearDown()
+
+ def assert_(self, expr, msg=None):
+ """Equivalent of built-in 'assert', but is not optimised out when
+ __debug__ is false.
+ """
+ if not expr:
+ raise AssertionError, msg
+
+ failUnless = assert_
+
+ def failIf(self, expr, msg=None):
+ "Fail the test if the expression is true."
+ apply(self.assert_,(not expr,msg))
+
+ def assertRaises(self, excClass, callableObj, *args, **kwargs):
+ """Assert that an exception of class excClass is thrown
+ by callableObj when invoked with arguments args and keyword
+ arguments kwargs. If a different type of exception is
+ thrown, it will not be caught, and the test case will be
+ deemed to have suffered an error, exactly as for an
+ unexpected exception.
+ """
+ try:
+ apply(callableObj, args, kwargs)
+ except excClass:
+ return
+ else:
+ if hasattr(excClass,'__name__'): excName = excClass.__name__
+ else: excName = str(excClass)
+ raise AssertionError, excName
+
+ def fail(self, msg=None):
+ """Fail immediately, with the given message."""
+ raise AssertionError, msg
+
+ def __exc_info(self):
+ """Return a version of sys.exc_info() with the traceback frame
+ minimised; usually the top level of the traceback frame is not
+ needed.
+ """
+ exctype, excvalue, tb = sys.exc_info()
+ newtb = tb.tb_next
+ if newtb is None:
+ return (exctype, excvalue, tb)
+ return (exctype, excvalue, newtb)
+
+
+class TestSuite:
+ """A test suite is a composite test consisting of a number of TestCases.
+
+ For use, create an instance of TestSuite, then add test case instances.
+ When all tests have been added, the suite can be passed to a test
+ runner, such as TextTestRunner. It will run the individual test cases
+ in the order in which they were added, aggregating the results. When
+ subclassing, do not forget to call the base class constructor.
+ """
+ def __init__(self, tests=()):
+ self._tests = []
+ self.addTests(tests)
+
+ def __repr__(self):
+ return "<%s tests=%s>" % (self.__class__, self._tests)
+
+ __str__ = __repr__
+
+ def countTestCases(self):
+ cases = 0
+ for test in self._tests:
+ cases = cases + test.countTestCases()
+ return cases
+
+ def addTest(self, test):
+ self._tests.append(test)
+
+ def addTests(self, tests):
+ for test in tests:
+ self.addTest(test)
+
+ def run(self, result):
+ return self(result)
+
+ def __call__(self, result):
+ for test in self._tests:
+ if result.shouldStop:
+ break
+ test(result)
+ return result
+
+ def debug(self):
+ """Run the tests without collecting errors in a TestResult"""
+ for test in self._tests: test.debug()
+
+
+class FunctionTestCase(TestCase):
+ """A test case that wraps a test function.
+
+ This is useful for slipping pre-existing test functions into the
+ PyUnit framework. Optionally, set-up and tidy-up functions can be
+ supplied. As with TestCase, the tidy-up ('tearDown') function will
+ always be called if the set-up ('setUp') function ran successfully.
+ """
+
+ def __init__(self, testFunc, setUp=None, tearDown=None,
+ description=None):
+ TestCase.__init__(self)
+ self.__setUpFunc = setUp
+ self.__tearDownFunc = tearDown
+ self.__testFunc = testFunc
+ self.__description = description
+
+ def setUp(self):
+ if self.__setUpFunc is not None:
+ self.__setUpFunc()
+
+ def tearDown(self):
+ if self.__tearDownFunc is not None:
+ self.__tearDownFunc()
+
+ def runTest(self):
+ self.__testFunc()
+
+ def id(self):
+ return self.__testFunc.__name__
+
+ def __str__(self):
+ return "%s (%s)" % (self.__class__, self.__testFunc.__name__)
+
+ def __repr__(self):
+ return "<%s testFunc=%s>" % (self.__class__, self.__testFunc)
+
+ def shortDescription(self):
+ if self.__description is not None: return self.__description
+ doc = self.__testFunc.__doc__
+ return doc and string.strip(string.split(doc, "\n")[0]) or None
+
+
+
+##############################################################################
+# Convenience functions
+##############################################################################
+
+def getTestCaseNames(testCaseClass, prefix, sortUsing=cmp):
+ """Extracts all the names of functions in the given test case class
+ and its base classes that start with the given prefix. This is used
+ by makeSuite().
+ """
+ testFnNames = filter(lambda n,p=prefix: n[:len(p)] == p,
+ dir(testCaseClass))
+ for baseclass in testCaseClass.__bases__:
+ testFnNames = testFnNames + \
+ getTestCaseNames(baseclass, prefix, sortUsing=None)
+ if sortUsing:
+ testFnNames.sort(sortUsing)
+ return testFnNames
+
+
+def makeSuite(testCaseClass, prefix='test', sortUsing=cmp):
+ """Returns a TestSuite instance built from all of the test functions
+ in the given test case class whose names begin with the given
+ prefix. The cases are sorted by their function names
+ using the supplied comparison function, which defaults to 'cmp'.
+ """
+ cases = map(testCaseClass,
+ getTestCaseNames(testCaseClass, prefix, sortUsing))
+ return TestSuite(cases)
+
+
+def createTestInstance(name, module=None):
+ """Finds tests by their name, optionally only within the given module.
+
+ Return the newly-constructed test, ready to run. If the name contains a ':'
+ then the portion of the name after the colon is used to find a specific
+ test case within the test case class named before the colon.
+
+ Examples:
+ findTest('examples.listtests.suite')
+ -- returns result of calling 'suite'
+ findTest('examples.listtests.ListTestCase:checkAppend')
+ -- returns result of calling ListTestCase('checkAppend')
+ findTest('examples.listtests.ListTestCase:check-')
+ -- returns result of calling makeSuite(ListTestCase, prefix="check")
+ """
+
+ spec = string.split(name, ':')
+ if len(spec) > 2: raise ValueError, "illegal test name: %s" % name
+ if len(spec) == 1:
+ testName = spec[0]
+ caseName = None
+ else:
+ testName, caseName = spec
+ parts = string.split(testName, '.')
+ if module is None:
+ if len(parts) < 2:
+ raise ValueError, "incomplete test name: %s" % name
+ constructor = __import__(string.join(parts[:-1],'.'))
+ parts = parts[1:]
+ else:
+ constructor = module
+ for part in parts:
+ constructor = getattr(constructor, part)
+ if not callable(constructor):
+ raise ValueError, "%s is not a callable object" % constructor
+ if caseName:
+ if caseName[-1] == '-':
+ prefix = caseName[:-1]
+ if not prefix:
+ raise ValueError, "prefix too short: %s" % name
+ test = makeSuite(constructor, prefix=prefix)
+ else:
+ test = constructor(caseName)
+ else:
+ test = constructor()
+ if not hasattr(test,"countTestCases"):
+ raise TypeError, \
+ "object %s found with spec %s is not a test" % (test, name)
+ return test
+
+
+##############################################################################
+# Text UI
+##############################################################################
+
+class _WritelnDecorator:
+ """Used to decorate file-like objects with a handy 'writeln' method"""
+ def __init__(self,stream):
+ self.stream = stream
+ if _isJPython:
+ import java.lang.System
+ self.linesep = java.lang.System.getProperty("line.separator")
+ else:
+ self.linesep = os.linesep
+
+ def __getattr__(self, attr):
+ return getattr(self.stream,attr)
+
+ def writeln(self, *args):
+ if args: apply(self.write, args)
+ self.write(self.linesep)
+
+
+class _JUnitTextTestResult(TestResult):
+ """A test result class that can print formatted text results to a stream.
+
+ Used by JUnitTextTestRunner.
+ """
+ def __init__(self, stream):
+ self.stream = stream
+ TestResult.__init__(self)
+
+ def addError(self, test, error):
+ TestResult.addError(self,test,error)
+ self.stream.write('E')
+ self.stream.flush()
+ if error[0] is KeyboardInterrupt:
+ self.shouldStop = 1
+
+ def addFailure(self, test, error):
+ TestResult.addFailure(self,test,error)
+ self.stream.write('F')
+ self.stream.flush()
+
+ def startTest(self, test):
+ TestResult.startTest(self,test)
+ self.stream.write('.')
+ self.stream.flush()
+
+ def printNumberedErrors(self,errFlavour,errors):
+ if not errors: return
+ if len(errors) == 1:
+ self.stream.writeln("There was 1 %s:" % errFlavour)
+ else:
+ self.stream.writeln("There were %i %ss:" %
+ (len(errors), errFlavour))
+ i = 1
+ for test,error in errors:
+ errString = string.join(apply(traceback.format_exception,error),"")
+ self.stream.writeln("%i) %s" % (i, test))
+ self.stream.writeln(errString)
+ i = i + 1
+
+ def printErrors(self):
+ self.printNumberedErrors("error",self.errors)
+
+ def printFailures(self):
+ self.printNumberedErrors("failure",self.failures)
+
+ def printHeader(self):
+ self.stream.writeln()
+ if self.wasSuccessful():
+ self.stream.writeln("OK (%i tests)" % self.testsRun)
+ else:
+ self.stream.writeln("!!!FAILURES!!!")
+ self.stream.writeln("Test Results")
+ self.stream.writeln()
+ self.stream.writeln("Run: %i ; Failures: %i ; Errors: %i" %
+ (self.testsRun, len(self.failures),
+ len(self.errors)))
+
+ def printResult(self):
+ self.printHeader()
+ self.printErrors()
+ self.printFailures()
+
+
+class JUnitTextTestRunner:
+ """A test runner class that displays results in textual form.
+
+ The display format approximates that of JUnit's 'textui' test runner.
+ This test runner may be removed in a future version of PyUnit.
+ """
+ def __init__(self, stream=sys.stderr):
+ self.stream = _WritelnDecorator(stream)
+
+ def run(self, test):
+ "Run the given test case or test suite."
+ result = _JUnitTextTestResult(self.stream)
+ startTime = time.time()
+ test(result)
+ stopTime = time.time()
+ self.stream.writeln()
+ self.stream.writeln("Time: %.3fs" % float(stopTime - startTime))
+ result.printResult()
+ return result
+
+
+##############################################################################
+# Verbose text UI
+##############################################################################
+
+class _VerboseTextTestResult(TestResult):
+ """A test result class that can print formatted text results to a stream.
+
+ Used by VerboseTextTestRunner.
+ """
+ def __init__(self, stream, descriptions):
+ TestResult.__init__(self)
+ self.stream = stream
+ self.lastFailure = None
+ self.descriptions = descriptions
+
+ def startTest(self, test):
+ TestResult.startTest(self, test)
+ if self.descriptions:
+ self.stream.write(test.shortDescription() or str(test))
+ else:
+ self.stream.write(str(test))
+ self.stream.write(" ... ")
+
+ def stopTest(self, test):
+ TestResult.stopTest(self, test)
+ if self.lastFailure is not test:
+ self.stream.writeln("ok")
+
+ def addError(self, test, err):
+ TestResult.addError(self, test, err)
+ self._printError("ERROR", test, err)
+ self.lastFailure = test
+ if err[0] is KeyboardInterrupt:
+ self.shouldStop = 1
+
+ def addFailure(self, test, err):
+ TestResult.addFailure(self, test, err)
+ self._printError("FAIL", test, err)
+ self.lastFailure = test
+
+ def _printError(self, flavour, test, err):
+ errLines = []
+ separator1 = "\t" + '=' * 70
+ separator2 = "\t" + '-' * 70
+ if not self.lastFailure is test:
+ self.stream.writeln()
+ self.stream.writeln(separator1)
+ self.stream.writeln("\t%s" % flavour)
+ self.stream.writeln(separator2)
+ for line in apply(traceback.format_exception, err):
+ for l in string.split(line,"\n")[:-1]:
+ self.stream.writeln("\t%s" % l)
+ self.stream.writeln(separator1)
+
+
+class VerboseTextTestRunner:
+ """A test runner class that displays results in textual form.
+
+ It prints out the names of tests as they are run, errors as they
+ occur, and a summary of the results at the end of the test run.
+ """
+ def __init__(self, stream=sys.stderr, descriptions=1):
+ self.stream = _WritelnDecorator(stream)
+ self.descriptions = descriptions
+
+ def run(self, test):
+ "Run the given test case or test suite."
+ result = _VerboseTextTestResult(self.stream, self.descriptions)
+ startTime = time.time()
+ test(result)
+ stopTime = time.time()
+ timeTaken = float(stopTime - startTime)
+ self.stream.writeln("-" * 78)
+ run = result.testsRun
+ self.stream.writeln("Ran %d test%s in %.3fs" %
+ (run, run > 1 and "s" or "", timeTaken))
+ self.stream.writeln()
+ if not result.wasSuccessful():
+ self.stream.write("FAILED (")
+ failed, errored = map(len, (result.failures, result.errors))
+ if failed:
+ self.stream.write("failures=%d" % failed)
+ if errored:
+ if failed: self.stream.write(", ")
+ self.stream.write("errors=%d" % errored)
+ self.stream.writeln(")")
+ else:
+ self.stream.writeln("OK")
+ return result
+
+
+# Which flavour of TextTestRunner is the default?
+TextTestRunner = VerboseTextTestRunner
+
+
+##############################################################################
+# Facilities for running tests from the command line
+##############################################################################
+
+class TestProgram:
+ """A command-line program that runs a set of tests; this is primarily
+ for making test modules conveniently executable.
+ """
+ USAGE = """\
+Usage: %(progName)s [-h|--help] [test[:(casename|prefix-)]] [...]
+
+Examples:
+ %(progName)s - run default set of tests
+ %(progName)s MyTestSuite - run suite 'MyTestSuite'
+ %(progName)s MyTestCase:checkSomething - run MyTestCase.checkSomething
+ %(progName)s MyTestCase:check- - run all 'check*' test methods
+ in MyTestCase
+"""
+ def __init__(self, module='__main__', defaultTest=None,
+ argv=None, testRunner=None):
+ if type(module) == type(''):
+ self.module = __import__(module)
+ for part in string.split(module,'.')[1:]:
+ self.module = getattr(self.module, part)
+ else:
+ self.module = module
+ if argv is None:
+ argv = sys.argv
+ self.defaultTest = defaultTest
+ self.testRunner = testRunner
+ self.progName = os.path.basename(argv[0])
+ self.parseArgs(argv)
+ self.createTests()
+ self.runTests()
+
+ def usageExit(self, msg=None):
+ if msg: print msg
+ print self.USAGE % self.__dict__
+ sys.exit(2)
+
+ def parseArgs(self, argv):
+ import getopt
+ try:
+ options, args = getopt.getopt(argv[1:], 'hH', ['help'])
+ opts = {}
+ for opt, value in options:
+ if opt in ('-h','-H','--help'):
+ self.usageExit()
+ if len(args) == 0 and self.defaultTest is None:
+ raise getopt.error, "No default test is defined."
+ if len(args) > 0:
+ self.testNames = args
+ else:
+ self.testNames = (self.defaultTest,)
+ except getopt.error, msg:
+ self.usageExit(msg)
+
+ def createTests(self):
+ tests = []
+ for testName in self.testNames:
+ tests.append(createTestInstance(testName, self.module))
+ self.test = TestSuite(tests)
+
+ def runTests(self):
+ if self.testRunner is None:
+ self.testRunner = TextTestRunner()
+ result = self.testRunner.run(self.test)
+ sys.exit(not result.wasSuccessful())
+
+main = TestProgram
+
+
+##############################################################################
+# Executing this module from the command line
+##############################################################################
+
+if __name__ == "__main__":
+ main(module=None)