summaryrefslogtreecommitdiffstats
path: root/QMTest/TestCmd.py
diff options
context:
space:
mode:
Diffstat (limited to 'QMTest/TestCmd.py')
-rw-r--r--QMTest/TestCmd.py367
1 files changed, 268 insertions, 99 deletions
diff --git a/QMTest/TestCmd.py b/QMTest/TestCmd.py
index 3fca4ec..2c90302 100644
--- a/QMTest/TestCmd.py
+++ b/QMTest/TestCmd.py
@@ -25,7 +25,11 @@ There are a bunch of keyword arguments available at instantiation:
subdir = 'subdir',
verbose = Boolean,
match = default_match_function,
- diff = default_diff_function,
+ match_stdout = default_match_stdout_function,
+ match_stderr = default_match_stderr_function,
+ diff = default_diff_stderr_function,
+ diff_stdout = default_diff_stdout_function,
+ diff_stderr = default_diff_stderr_function,
combine = Boolean)
There are a bunch of methods that let you do different things:
@@ -109,8 +113,18 @@ There are a bunch of methods that let you do different things:
test.diff(actual, expected)
+ test.diff_stderr(actual, expected)
+
+ test.diff_stdout(actual, expected)
+
test.match(actual, expected)
+ test.match_stderr(actual, expected)
+
+ test.match_stdout(actual, expected)
+
+ test.set_match_function(match, stdout, stderr)
+
test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
test.match_exact(["actual 1\n", "actual 2\n"],
["expected 1\n", "expected 2\n"])
@@ -159,8 +173,8 @@ or incorrect permissions).
TestCmd.no_result(condition, function)
TestCmd.no_result(condition, function, skip)
-The TestCmd module also provides unbound functions that handle matching
-in the same way as the match_*() methods described above.
+The TestCmd module also provides unbound global functions that handle
+matching in the same way as the match_*() methods described above.
import TestCmd
@@ -170,8 +184,28 @@ in the same way as the match_*() methods described above.
test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)
-The TestCmd module provides unbound functions that can be used for the
-"diff" argument to TestCmd.TestCmd instantiation:
+These functions are also available as static methods:
+
+ import TestCmd
+
+ test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_exact)
+
+ test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_re)
+
+ test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_re_dotall)
+
+These static methods can be accessed by a string naming the method:
+
+ import TestCmd
+
+ test = TestCmd.TestCmd(match = 'match_exact')
+
+ test = TestCmd.TestCmd(match = 'match_re')
+
+ test = TestCmd.TestCmd(match = 'match_re_dotall')
+
+The TestCmd module provides unbound global functions that can be used
+for the "diff" argument to TestCmd.TestCmd instantiation:
import TestCmd
@@ -180,6 +214,35 @@ The TestCmd module provides unbound functions that can be used for the
test = TestCmd.TestCmd(diff = TestCmd.simple_diff)
+ test = TestCmd.TestCmd(diff = TestCmd.context_diff)
+
+ test = TestCmd.TestCmd(diff = TestCmd.unified_diff)
+
+These functions are also available as static methods:
+
+ import TestCmd
+
+ test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_re,
+ diff = TestCmd.TestCmd.diff_re)
+
+ test = TestCmd.TestCmd(diff = TestCmd.TestCmd.simple_diff)
+
+ test = TestCmd.TestCmd(diff = TestCmd.TestCmd.context_diff)
+
+ test = TestCmd.TestCmd(diff = TestCmd.TestCmd.unified_diff)
+
+These static methods can be accessed by a string naming the method:
+
+ import TestCmd
+
+ test = TestCmd.TestCmd(match = 'match_re', diff = 'diff_re')
+
+ test = TestCmd.TestCmd(diff = 'simple_diff')
+
+ test = TestCmd.TestCmd(diff = 'context_diff')
+
+ test = TestCmd.TestCmd(diff = 'unified_diff')
+
The "diff" argument can also be used with standard difflib functions:
import difflib
@@ -216,19 +279,27 @@ version.
from __future__ import division
__author__ = "Steven Knight <knight at baldmt dot com>"
-__revision__ = "TestCmd.py 1.1.D002 2010/05/27 14:47:22 knight"
-__version__ = "1.1"
+__revision__ = "TestCmd.py 1.3.D001 2010/06/03 12:58:27 knight"
+__version__ = "1.3"
import atexit
+import difflib
import errno
import os
import re
import shutil
+import signal
import stat
import sys
import tempfile
+import threading
import time
import traceback
+import types
+
+class null(object):
+ pass
+_Null = null()
try:
from collections import UserList, UserString
@@ -264,11 +335,6 @@ __all__ = [
'TestCmd'
]
-try:
- import difflib
-except ImportError:
- __all__.append('simple_diff')
-
def is_List(e):
return isinstance(e, (list, UserList))
@@ -435,35 +501,30 @@ def match_re_dotall(lines = None, res = None):
raise re.error(msg % (repr(s), e.args[0]))
return expr.match(lines)
-try:
- import difflib
-except ImportError:
- pass
-else:
- def simple_diff(a, b, fromfile='', tofile='',
- fromfiledate='', tofiledate='', n=3, lineterm='\n'):
- """
- A function with the same calling signature as difflib.context_diff
- (diff -c) and difflib.unified_diff (diff -u) but which prints
- output like the simple, unadorned 'diff" command.
- """
- sm = difflib.SequenceMatcher(None, a, b)
- def comma(x1, x2):
- return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2)
- result = []
- for op, a1, a2, b1, b2 in sm.get_opcodes():
- if op == 'delete':
- result.append("%sd%d" % (comma(a1, a2), b1))
- result.extend([ '< ' + l for l in a[a1:a2] ])
- elif op == 'insert':
- result.append("%da%s" % (a1, comma(b1, b2)))
- result.extend([ '> ' + l for l in b[b1:b2] ])
- elif op == 'replace':
- result.append("%sc%s" % (comma(a1, a2), comma(b1, b2)))
- result.extend([ '< ' + l for l in a[a1:a2] ])
- result.append('---')
- result.extend([ '> ' + l for l in b[b1:b2] ])
- return result
+def simple_diff(a, b, fromfile='', tofile='',
+ fromfiledate='', tofiledate='', n=3, lineterm='\n'):
+ """
+ A function with the same calling signature as difflib.context_diff
+ (diff -c) and difflib.unified_diff (diff -u) but which prints
+ output like the simple, unadorned 'diff" command.
+ """
+ sm = difflib.SequenceMatcher(None, a, b)
+ def comma(x1, x2):
+ return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2)
+ result = []
+ for op, a1, a2, b1, b2 in sm.get_opcodes():
+ if op == 'delete':
+ result.append("%sd%d" % (comma(a1, a2), b1))
+ result.extend([ '< ' + l for l in a[a1:a2] ])
+ elif op == 'insert':
+ result.append("%da%s" % (a1, comma(b1, b2)))
+ result.extend([ '> ' + l for l in b[b1:b2] ])
+ elif op == 'replace':
+ result.append("%sc%s" % (comma(a1, a2), comma(b1, b2)))
+ result.extend([ '< ' + l for l in a[a1:a2] ])
+ result.append('---')
+ result.extend([ '> ' + l for l in b[b1:b2] ])
+ return result
def diff_re(a, b, fromfile='', tofile='',
fromfiledate='', tofiledate='', n=3, lineterm='\n'):
@@ -576,7 +637,6 @@ except ImportError:
# so we're going to cobble up something that looks just enough
# like its API for our purposes below.
import popen2
- import types
subprocess = types.ModuleType('subprocess')
subprocess.PIPE = 'PIPE'
@@ -607,16 +667,31 @@ except ImportError:
self.stderr.close()
self.returncode = self.wait()
return (out, err)
+ def terminate(self):
+ os.kill(self.pid, signal.SIGTERM)
def wait(self, *args, **kw):
resultcode = popen2.Popen3.wait(self, *args, **kw)
- if os.WIFEXITED(resultcode):
+ if os.WIFSIGNALED(resultcode):
+ return (- os.WTERMSIG(resultcode))
+ elif os.WIFEXITED(resultcode):
return os.WEXITSTATUS(resultcode)
- elif os.WIFSIGNALED(resultcode):
- return os.WTERMSIG(resultcode)
else:
return None
subprocess.Popen = Popen
+else:
+ try:
+ subprocess.Popen.terminate
+ except AttributeError:
+ if sys.platform == 'win32':
+ import win32process
+ def terminate(self):
+ win32process.TerminateProcess(self._handle, 1)
+ else:
+ def terminate(self):
+ os.kill(self.pid, signal.SIGTERM)
+ method = types.MethodType(terminate, None, subprocess.Popen)
+ setattr(subprocess.Popen, 'terminate', method)
@@ -790,9 +865,14 @@ class TestCmd(object):
subdir = None,
verbose = None,
match = None,
+ match_stdout = None,
+ match_stderr = None,
diff = None,
+ diff_stdout = None,
+ diff_stderr = None,
combine = 0,
- universal_newlines = 1):
+ universal_newlines = 1,
+ timeout = None):
self._cwd = os.getcwd()
self.description_set(description)
self.program_set(program)
@@ -805,21 +885,10 @@ class TestCmd(object):
self.verbose_set(verbose)
self.combine = combine
self.universal_newlines = universal_newlines
- if not match is None:
- self.match_function = match
- else:
- self.match_function = match_re
- if not diff is None:
- self.diff_function = diff
- else:
- try:
- difflib
- except NameError:
- pass
- else:
- self.diff_function = simple_diff
- #self.diff_function = difflib.context_diff
- #self.diff_function = difflib.unified_diff
+ self.process = None
+ self.set_timeout(timeout)
+ self.set_match_function(match, match_stdout, match_stderr)
+ self.set_diff_function(diff, diff_stdout, diff_stderr)
self._dirlist = []
self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
if 'PRESERVE' in os.environ and not os.environ['PRESERVE'] is '':
@@ -941,21 +1010,55 @@ class TestCmd(object):
"""
self.description = description
- try:
- difflib
- except NameError:
- def diff(self, a, b, name, *args, **kw):
- print self.banner('Expected %s' % name)
- print a
- print self.banner('Actual %s' % name)
- print b
- else:
- def diff(self, a, b, name, *args, **kw):
+ def set_diff_function(self, diff=_Null, stdout=_Null, stderr=_Null):
+ """Sets the specified diff functions.
+ """
+ if diff is not _Null:
+ self._diff_function = diff
+ if stdout is not _Null:
+ self._diff_stdout_function = stdout
+ if stderr is not _Null:
+ self._diff_stderr_function = stderr
+
+ def diff(self, a, b, name=None, diff_function=None, *args, **kw):
+ if diff_function is None:
+ try:
+ diff_function = getattr(self, self._diff_function)
+ except TypeError:
+ diff_function = self._diff_function
+ if diff_function is None:
+ diff_function = self.simple_diff
+ if name is not None:
print self.banner(name)
- args = (a.splitlines(), b.splitlines()) + args
- lines = self.diff_function(*args, **kw)
- for l in lines:
- print l
+ args = (a.splitlines(), b.splitlines()) + args
+ for line in diff_function(*args, **kw):
+ print line
+
+ def diff_stderr(self, a, b, *args, **kw):
+ """Compare actual and expected file contents.
+ """
+ try:
+ diff_stderr_function = getattr(self, self._diff_stderr_function)
+ except TypeError:
+ diff_stderr_function = self._diff_stderr_function
+ return self.diff(a, b, diff_function=diff_stderr_function, *args, **kw)
+
+ def diff_stdout(self, a, b, *args, **kw):
+ """Compare actual and expected file contents.
+ """
+ try:
+ diff_stdout_function = getattr(self, self._diff_stdout_function)
+ except TypeError:
+ diff_stdout_function = self._diff_stdout_function
+ return self.diff(a, b, diff_function=diff_stdout_function, *args, **kw)
+
+ simple_diff = staticmethod(simple_diff)
+
+ diff_re = staticmethod(diff_re)
+
+ context_diff = staticmethod(difflib.context_diff)
+
+ unified_diff = staticmethod(difflib.unified_diff)
def fail_test(self, condition = 1, function = None, skip = 0):
"""Cause the test to fail.
@@ -974,25 +1077,57 @@ class TestCmd(object):
"""
self.interpreter = interpreter
- def match(self, lines, matches):
- """Compare actual and expected file contents.
+ def set_match_function(self, match=_Null, stdout=_Null, stderr=_Null):
+ """Sets the specified match functions.
"""
- return self.match_function(lines, matches)
+ if match is not _Null:
+ self._match_function = match
+ if stdout is not _Null:
+ self._match_stdout_function = stdout
+ if stderr is not _Null:
+ self._match_stderr_function = stderr
- def match_exact(self, lines, matches):
+ def match(self, lines, matches):
"""Compare actual and expected file contents.
"""
- return match_exact(lines, matches)
-
- def match_re(self, lines, res):
+ try:
+ match_function = getattr(self, self._match_function)
+ except TypeError:
+ match_function = self._match_function
+ if match_function is None:
+ # Default is regular expression matches.
+ match_function = self.match_re
+ return match_function(lines, matches)
+
+ def match_stderr(self, lines, matches):
"""Compare actual and expected file contents.
"""
- return match_re(lines, res)
-
- def match_re_dotall(self, lines, res):
+ try:
+ match_stderr_function = getattr(self, self._match_stderr_function)
+ except TypeError:
+ match_stderr_function = self._match_stderr_function
+ if match_stderr_function is None:
+ # Default is to use whatever match= is set to.
+ match_stderr_function = self.match
+ return match_stderr_function(lines, matches)
+
+ def match_stdout(self, lines, matches):
"""Compare actual and expected file contents.
"""
- return match_re_dotall(lines, res)
+ try:
+ match_stdout_function = getattr(self, self._match_stdout_function)
+ except TypeError:
+ match_stdout_function = self._match_stdout_function
+ if match_stdout_function is None:
+ # Default is to use whatever match= is set to.
+ match_stdout_function = self.match
+ return match_stdout_function(lines, matches)
+
+ match_exact = staticmethod(match_exact)
+
+ match_re = staticmethod(match_re)
+
+ match_re_dotall = staticmethod(match_re_dotall)
def no_result(self, condition = 1, function = None, skip = 0):
"""Report that the test could not be run.
@@ -1057,10 +1192,20 @@ class TestCmd(object):
dir = self.canonicalize(dir)
os.rmdir(dir)
+ def _timeout(self):
+ self.process.terminate()
+ self.timer.cancel()
+ self.timer = None
+
+ def set_timeout(self, timeout):
+ self.timeout = timeout
+ self.timer = None
+
def start(self, program = None,
interpreter = None,
arguments = None,
universal_newlines = None,
+ timeout = _Null,
**kw):
"""
Starts a program or script for the test environment.
@@ -1089,20 +1234,33 @@ class TestCmd(object):
else:
stderr_value = subprocess.PIPE
- return Popen(cmd,
- stdin=stdin,
- stdout=subprocess.PIPE,
- stderr=stderr_value,
- universal_newlines=universal_newlines)
-
- def finish(self, popen, **kw):
+ if timeout is _Null:
+ timeout = self.timeout
+ if timeout:
+ self.timer = threading.Timer(float(timeout), self._timeout)
+ self.timer.start()
+ p = Popen(cmd,
+ stdin=stdin,
+ stdout=subprocess.PIPE,
+ stderr=stderr_value,
+ universal_newlines=universal_newlines)
+ self.process = p
+ return p
+
+ def finish(self, popen=None, **kw):
"""
Finishes and waits for the process being run under control of
the specified popen argument, recording the exit status,
standard output and error output.
"""
+ if popen is None:
+ popen = self.process
stdout, stderr = popen.communicate()
+ if self.timer:
+ self.timer.cancel()
+ self.timer = None
self.status = popen.returncode
+ self.process = None
self._stdout.append(stdout or '')
self._stderr.append(stderr or '')
@@ -1111,7 +1269,8 @@ class TestCmd(object):
arguments = None,
chdir = None,
stdin = None,
- universal_newlines = None):
+ universal_newlines = None,
+ timeout = _Null):
"""Runs a test of the program or script for the test
environment. Standard output and error output are saved for
future retrieval via the stdout() and stderr() methods.
@@ -1126,15 +1285,25 @@ class TestCmd(object):
if self.verbose:
sys.stderr.write("chdir(" + chdir + ")\n")
os.chdir(chdir)
- p = self.start(program,
- interpreter,
- arguments,
- universal_newlines,
- stdin=stdin)
+ p = self.start(program = program,
+ interpreter = interpreter,
+ arguments = arguments,
+ universal_newlines = universal_newlines,
+ timeout = timeout,
+ stdin = stdin)
if is_List(stdin):
stdin = ''.join(stdin)
+ # TODO(sgk): figure out how to re-use the logic in the .finish()
+ # method above. Just calling it from here causes problems with
+ # subclasses that redefine .finish(). We could abstract this
+ # into Yet Another common method called both here and by .finish(),
+ # but that seems ill-thought-out.
stdout, stderr = p.communicate(input=stdin)
+ if self.timer:
+ self.timer.cancel()
+ self.timer = None
self.status = p.returncode
+ self.process = None
self._stdout.append(stdout or '')
self._stderr.append(stderr or '')