diff options
Diffstat (limited to 'QMTest/TestCmd.py')
-rw-r--r-- | QMTest/TestCmd.py | 367 |
1 files changed, 268 insertions, 99 deletions
diff --git a/QMTest/TestCmd.py b/QMTest/TestCmd.py index 3fca4ec..2c90302 100644 --- a/QMTest/TestCmd.py +++ b/QMTest/TestCmd.py @@ -25,7 +25,11 @@ There are a bunch of keyword arguments available at instantiation: subdir = 'subdir', verbose = Boolean, match = default_match_function, - diff = default_diff_function, + match_stdout = default_match_stdout_function, + match_stderr = default_match_stderr_function, + diff = default_diff_stderr_function, + diff_stdout = default_diff_stdout_function, + diff_stderr = default_diff_stderr_function, combine = Boolean) There are a bunch of methods that let you do different things: @@ -109,8 +113,18 @@ There are a bunch of methods that let you do different things: test.diff(actual, expected) + test.diff_stderr(actual, expected) + + test.diff_stdout(actual, expected) + test.match(actual, expected) + test.match_stderr(actual, expected) + + test.match_stdout(actual, expected) + + test.set_match_function(match, stdout, stderr) + test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n") test.match_exact(["actual 1\n", "actual 2\n"], ["expected 1\n", "expected 2\n"]) @@ -159,8 +173,8 @@ or incorrect permissions). TestCmd.no_result(condition, function) TestCmd.no_result(condition, function, skip) -The TestCmd module also provides unbound functions that handle matching -in the same way as the match_*() methods described above. +The TestCmd module also provides unbound global functions that handle +matching in the same way as the match_*() methods described above. import TestCmd @@ -170,8 +184,28 @@ in the same way as the match_*() methods described above. test = TestCmd.TestCmd(match = TestCmd.match_re_dotall) -The TestCmd module provides unbound functions that can be used for the -"diff" argument to TestCmd.TestCmd instantiation: +These functions are also available as static methods: + + import TestCmd + + test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_exact) + + test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_re) + + test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_re_dotall) + +These static methods can be accessed by a string naming the method: + + import TestCmd + + test = TestCmd.TestCmd(match = 'match_exact') + + test = TestCmd.TestCmd(match = 'match_re') + + test = TestCmd.TestCmd(match = 'match_re_dotall') + +The TestCmd module provides unbound global functions that can be used +for the "diff" argument to TestCmd.TestCmd instantiation: import TestCmd @@ -180,6 +214,35 @@ The TestCmd module provides unbound functions that can be used for the test = TestCmd.TestCmd(diff = TestCmd.simple_diff) + test = TestCmd.TestCmd(diff = TestCmd.context_diff) + + test = TestCmd.TestCmd(diff = TestCmd.unified_diff) + +These functions are also available as static methods: + + import TestCmd + + test = TestCmd.TestCmd(match = TestCmd.TestCmd.match_re, + diff = TestCmd.TestCmd.diff_re) + + test = TestCmd.TestCmd(diff = TestCmd.TestCmd.simple_diff) + + test = TestCmd.TestCmd(diff = TestCmd.TestCmd.context_diff) + + test = TestCmd.TestCmd(diff = TestCmd.TestCmd.unified_diff) + +These static methods can be accessed by a string naming the method: + + import TestCmd + + test = TestCmd.TestCmd(match = 'match_re', diff = 'diff_re') + + test = TestCmd.TestCmd(diff = 'simple_diff') + + test = TestCmd.TestCmd(diff = 'context_diff') + + test = TestCmd.TestCmd(diff = 'unified_diff') + The "diff" argument can also be used with standard difflib functions: import difflib @@ -216,19 +279,27 @@ version. from __future__ import division __author__ = "Steven Knight <knight at baldmt dot com>" -__revision__ = "TestCmd.py 1.1.D002 2010/05/27 14:47:22 knight" -__version__ = "1.1" +__revision__ = "TestCmd.py 1.3.D001 2010/06/03 12:58:27 knight" +__version__ = "1.3" import atexit +import difflib import errno import os import re import shutil +import signal import stat import sys import tempfile +import threading import time import traceback +import types + +class null(object): + pass +_Null = null() try: from collections import UserList, UserString @@ -264,11 +335,6 @@ __all__ = [ 'TestCmd' ] -try: - import difflib -except ImportError: - __all__.append('simple_diff') - def is_List(e): return isinstance(e, (list, UserList)) @@ -435,35 +501,30 @@ def match_re_dotall(lines = None, res = None): raise re.error(msg % (repr(s), e.args[0])) return expr.match(lines) -try: - import difflib -except ImportError: - pass -else: - def simple_diff(a, b, fromfile='', tofile='', - fromfiledate='', tofiledate='', n=3, lineterm='\n'): - """ - A function with the same calling signature as difflib.context_diff - (diff -c) and difflib.unified_diff (diff -u) but which prints - output like the simple, unadorned 'diff" command. - """ - sm = difflib.SequenceMatcher(None, a, b) - def comma(x1, x2): - return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2) - result = [] - for op, a1, a2, b1, b2 in sm.get_opcodes(): - if op == 'delete': - result.append("%sd%d" % (comma(a1, a2), b1)) - result.extend([ '< ' + l for l in a[a1:a2] ]) - elif op == 'insert': - result.append("%da%s" % (a1, comma(b1, b2))) - result.extend([ '> ' + l for l in b[b1:b2] ]) - elif op == 'replace': - result.append("%sc%s" % (comma(a1, a2), comma(b1, b2))) - result.extend([ '< ' + l for l in a[a1:a2] ]) - result.append('---') - result.extend([ '> ' + l for l in b[b1:b2] ]) - return result +def simple_diff(a, b, fromfile='', tofile='', + fromfiledate='', tofiledate='', n=3, lineterm='\n'): + """ + A function with the same calling signature as difflib.context_diff + (diff -c) and difflib.unified_diff (diff -u) but which prints + output like the simple, unadorned 'diff" command. + """ + sm = difflib.SequenceMatcher(None, a, b) + def comma(x1, x2): + return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2) + result = [] + for op, a1, a2, b1, b2 in sm.get_opcodes(): + if op == 'delete': + result.append("%sd%d" % (comma(a1, a2), b1)) + result.extend([ '< ' + l for l in a[a1:a2] ]) + elif op == 'insert': + result.append("%da%s" % (a1, comma(b1, b2))) + result.extend([ '> ' + l for l in b[b1:b2] ]) + elif op == 'replace': + result.append("%sc%s" % (comma(a1, a2), comma(b1, b2))) + result.extend([ '< ' + l for l in a[a1:a2] ]) + result.append('---') + result.extend([ '> ' + l for l in b[b1:b2] ]) + return result def diff_re(a, b, fromfile='', tofile='', fromfiledate='', tofiledate='', n=3, lineterm='\n'): @@ -576,7 +637,6 @@ except ImportError: # so we're going to cobble up something that looks just enough # like its API for our purposes below. import popen2 - import types subprocess = types.ModuleType('subprocess') subprocess.PIPE = 'PIPE' @@ -607,16 +667,31 @@ except ImportError: self.stderr.close() self.returncode = self.wait() return (out, err) + def terminate(self): + os.kill(self.pid, signal.SIGTERM) def wait(self, *args, **kw): resultcode = popen2.Popen3.wait(self, *args, **kw) - if os.WIFEXITED(resultcode): + if os.WIFSIGNALED(resultcode): + return (- os.WTERMSIG(resultcode)) + elif os.WIFEXITED(resultcode): return os.WEXITSTATUS(resultcode) - elif os.WIFSIGNALED(resultcode): - return os.WTERMSIG(resultcode) else: return None subprocess.Popen = Popen +else: + try: + subprocess.Popen.terminate + except AttributeError: + if sys.platform == 'win32': + import win32process + def terminate(self): + win32process.TerminateProcess(self._handle, 1) + else: + def terminate(self): + os.kill(self.pid, signal.SIGTERM) + method = types.MethodType(terminate, None, subprocess.Popen) + setattr(subprocess.Popen, 'terminate', method) @@ -790,9 +865,14 @@ class TestCmd(object): subdir = None, verbose = None, match = None, + match_stdout = None, + match_stderr = None, diff = None, + diff_stdout = None, + diff_stderr = None, combine = 0, - universal_newlines = 1): + universal_newlines = 1, + timeout = None): self._cwd = os.getcwd() self.description_set(description) self.program_set(program) @@ -805,21 +885,10 @@ class TestCmd(object): self.verbose_set(verbose) self.combine = combine self.universal_newlines = universal_newlines - if not match is None: - self.match_function = match - else: - self.match_function = match_re - if not diff is None: - self.diff_function = diff - else: - try: - difflib - except NameError: - pass - else: - self.diff_function = simple_diff - #self.diff_function = difflib.context_diff - #self.diff_function = difflib.unified_diff + self.process = None + self.set_timeout(timeout) + self.set_match_function(match, match_stdout, match_stderr) + self.set_diff_function(diff, diff_stdout, diff_stderr) self._dirlist = [] self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0} if 'PRESERVE' in os.environ and not os.environ['PRESERVE'] is '': @@ -941,21 +1010,55 @@ class TestCmd(object): """ self.description = description - try: - difflib - except NameError: - def diff(self, a, b, name, *args, **kw): - print self.banner('Expected %s' % name) - print a - print self.banner('Actual %s' % name) - print b - else: - def diff(self, a, b, name, *args, **kw): + def set_diff_function(self, diff=_Null, stdout=_Null, stderr=_Null): + """Sets the specified diff functions. + """ + if diff is not _Null: + self._diff_function = diff + if stdout is not _Null: + self._diff_stdout_function = stdout + if stderr is not _Null: + self._diff_stderr_function = stderr + + def diff(self, a, b, name=None, diff_function=None, *args, **kw): + if diff_function is None: + try: + diff_function = getattr(self, self._diff_function) + except TypeError: + diff_function = self._diff_function + if diff_function is None: + diff_function = self.simple_diff + if name is not None: print self.banner(name) - args = (a.splitlines(), b.splitlines()) + args - lines = self.diff_function(*args, **kw) - for l in lines: - print l + args = (a.splitlines(), b.splitlines()) + args + for line in diff_function(*args, **kw): + print line + + def diff_stderr(self, a, b, *args, **kw): + """Compare actual and expected file contents. + """ + try: + diff_stderr_function = getattr(self, self._diff_stderr_function) + except TypeError: + diff_stderr_function = self._diff_stderr_function + return self.diff(a, b, diff_function=diff_stderr_function, *args, **kw) + + def diff_stdout(self, a, b, *args, **kw): + """Compare actual and expected file contents. + """ + try: + diff_stdout_function = getattr(self, self._diff_stdout_function) + except TypeError: + diff_stdout_function = self._diff_stdout_function + return self.diff(a, b, diff_function=diff_stdout_function, *args, **kw) + + simple_diff = staticmethod(simple_diff) + + diff_re = staticmethod(diff_re) + + context_diff = staticmethod(difflib.context_diff) + + unified_diff = staticmethod(difflib.unified_diff) def fail_test(self, condition = 1, function = None, skip = 0): """Cause the test to fail. @@ -974,25 +1077,57 @@ class TestCmd(object): """ self.interpreter = interpreter - def match(self, lines, matches): - """Compare actual and expected file contents. + def set_match_function(self, match=_Null, stdout=_Null, stderr=_Null): + """Sets the specified match functions. """ - return self.match_function(lines, matches) + if match is not _Null: + self._match_function = match + if stdout is not _Null: + self._match_stdout_function = stdout + if stderr is not _Null: + self._match_stderr_function = stderr - def match_exact(self, lines, matches): + def match(self, lines, matches): """Compare actual and expected file contents. """ - return match_exact(lines, matches) - - def match_re(self, lines, res): + try: + match_function = getattr(self, self._match_function) + except TypeError: + match_function = self._match_function + if match_function is None: + # Default is regular expression matches. + match_function = self.match_re + return match_function(lines, matches) + + def match_stderr(self, lines, matches): """Compare actual and expected file contents. """ - return match_re(lines, res) - - def match_re_dotall(self, lines, res): + try: + match_stderr_function = getattr(self, self._match_stderr_function) + except TypeError: + match_stderr_function = self._match_stderr_function + if match_stderr_function is None: + # Default is to use whatever match= is set to. + match_stderr_function = self.match + return match_stderr_function(lines, matches) + + def match_stdout(self, lines, matches): """Compare actual and expected file contents. """ - return match_re_dotall(lines, res) + try: + match_stdout_function = getattr(self, self._match_stdout_function) + except TypeError: + match_stdout_function = self._match_stdout_function + if match_stdout_function is None: + # Default is to use whatever match= is set to. + match_stdout_function = self.match + return match_stdout_function(lines, matches) + + match_exact = staticmethod(match_exact) + + match_re = staticmethod(match_re) + + match_re_dotall = staticmethod(match_re_dotall) def no_result(self, condition = 1, function = None, skip = 0): """Report that the test could not be run. @@ -1057,10 +1192,20 @@ class TestCmd(object): dir = self.canonicalize(dir) os.rmdir(dir) + def _timeout(self): + self.process.terminate() + self.timer.cancel() + self.timer = None + + def set_timeout(self, timeout): + self.timeout = timeout + self.timer = None + def start(self, program = None, interpreter = None, arguments = None, universal_newlines = None, + timeout = _Null, **kw): """ Starts a program or script for the test environment. @@ -1089,20 +1234,33 @@ class TestCmd(object): else: stderr_value = subprocess.PIPE - return Popen(cmd, - stdin=stdin, - stdout=subprocess.PIPE, - stderr=stderr_value, - universal_newlines=universal_newlines) - - def finish(self, popen, **kw): + if timeout is _Null: + timeout = self.timeout + if timeout: + self.timer = threading.Timer(float(timeout), self._timeout) + self.timer.start() + p = Popen(cmd, + stdin=stdin, + stdout=subprocess.PIPE, + stderr=stderr_value, + universal_newlines=universal_newlines) + self.process = p + return p + + def finish(self, popen=None, **kw): """ Finishes and waits for the process being run under control of the specified popen argument, recording the exit status, standard output and error output. """ + if popen is None: + popen = self.process stdout, stderr = popen.communicate() + if self.timer: + self.timer.cancel() + self.timer = None self.status = popen.returncode + self.process = None self._stdout.append(stdout or '') self._stderr.append(stderr or '') @@ -1111,7 +1269,8 @@ class TestCmd(object): arguments = None, chdir = None, stdin = None, - universal_newlines = None): + universal_newlines = None, + timeout = _Null): """Runs a test of the program or script for the test environment. Standard output and error output are saved for future retrieval via the stdout() and stderr() methods. @@ -1126,15 +1285,25 @@ class TestCmd(object): if self.verbose: sys.stderr.write("chdir(" + chdir + ")\n") os.chdir(chdir) - p = self.start(program, - interpreter, - arguments, - universal_newlines, - stdin=stdin) + p = self.start(program = program, + interpreter = interpreter, + arguments = arguments, + universal_newlines = universal_newlines, + timeout = timeout, + stdin = stdin) if is_List(stdin): stdin = ''.join(stdin) + # TODO(sgk): figure out how to re-use the logic in the .finish() + # method above. Just calling it from here causes problems with + # subclasses that redefine .finish(). We could abstract this + # into Yet Another common method called both here and by .finish(), + # but that seems ill-thought-out. stdout, stderr = p.communicate(input=stdin) + if self.timer: + self.timer.cancel() + self.timer = None self.status = p.returncode + self.process = None self._stdout.append(stdout or '') self._stderr.append(stderr or '') |