summaryrefslogtreecommitdiffstats
path: root/QMTest/TestCmd.py
diff options
context:
space:
mode:
Diffstat (limited to 'QMTest/TestCmd.py')
-rw-r--r--QMTest/TestCmd.py254
1 files changed, 85 insertions, 169 deletions
diff --git a/QMTest/TestCmd.py b/QMTest/TestCmd.py
index ef41916..3fca4ec 100644
--- a/QMTest/TestCmd.py
+++ b/QMTest/TestCmd.py
@@ -216,9 +216,10 @@ version.
from __future__ import division
__author__ = "Steven Knight <knight at baldmt dot com>"
-__revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight"
-__version__ = "0.37"
+__revision__ = "TestCmd.py 1.1.D002 2010/05/27 14:47:22 knight"
+__version__ = "1.1"
+import atexit
import errno
import os
import re
@@ -228,6 +229,7 @@ import sys
import tempfile
import time
import traceback
+
try:
from collections import UserList, UserString
except ImportError:
@@ -268,15 +270,16 @@ except ImportError:
__all__.append('simple_diff')
def is_List(e):
- return isinstance(e, (list,UserList))
+ return isinstance(e, (list, UserList))
-try: eval('unicode')
+try:
+ eval('unicode')
except NameError:
def is_String(e):
- return isinstance(e, (str,UserString))
+ return isinstance(e, (str, UserString))
else:
def is_String(e):
- return isinstance(e, (str,unicode,UserString))
+ return isinstance(e, (str, unicode, UserString))
tempfile.template = 'testcmd.'
if os.name in ('posix', 'nt'):
@@ -290,12 +293,12 @@ _Cleanup = []
def _clean():
global _Cleanup
- cleanlist = [_f for _f in _Cleanup if _f]
+ cleanlist = [ c for c in _Cleanup if c ]
del _Cleanup[:]
cleanlist.reverse()
for test in cleanlist:
test.cleanup()
-import atexit
+
atexit.register(_clean)
def _caller(tblist, skip):
@@ -412,7 +415,7 @@ def match_re(lines = None, res = None):
expr = re.compile(s)
except re.error, e:
msg = "Regular expression error in %s: %s"
- raise re.error(msg % (repr(s), e[0]))
+ raise re.error(msg % (repr(s), e.args[0]))
if not expr.search(lines[i]):
return
return 1
@@ -426,12 +429,10 @@ def match_re_dotall(lines = None, res = None):
res = "\n".join(res)
s = "^" + res + "$"
try:
- #print 'DEBUG: lines', lines
- #print 'DEBUG: dotall', s
expr = re.compile(s, re.DOTALL)
except re.error, e:
msg = "Regular expression error in %s: %s"
- raise re.error(msg % (repr(s), e[0]))
+ raise re.error(msg % (repr(s), e.args[0]))
return expr.match(lines)
try:
@@ -453,15 +454,15 @@ else:
for op, a1, a2, b1, b2 in sm.get_opcodes():
if op == 'delete':
result.append("%sd%d" % (comma(a1, a2), b1))
- result.extend(['< ' + l for l in a[a1:a2]])
+ result.extend([ '< ' + l for l in a[a1:a2] ])
elif op == 'insert':
result.append("%da%s" % (a1, comma(b1, b2)))
- result.extend(['> ' + l for l in b[b1:b2]])
+ result.extend([ '> ' + l for l in b[b1:b2] ])
elif op == 'replace':
result.append("%sc%s" % (comma(a1, a2), comma(b1, b2)))
- result.extend(['< ' + l for l in a[a1:a2]])
+ result.extend([ '< ' + l for l in a[a1:a2] ])
result.append('---')
- result.extend(['> ' + l for l in b[b1:b2]])
+ result.extend([ '> ' + l for l in b[b1:b2] ])
return result
def diff_re(a, b, fromfile='', tofile='',
@@ -486,7 +487,7 @@ def diff_re(a, b, fromfile='', tofile='',
expr = re.compile(s)
except re.error, e:
msg = "Regular expression error in %s: %s"
- raise re.error(msg % (repr(s), e[0]))
+ raise re.error(msg % (repr(s), e.args[0]))
if not expr.search(bline):
result.append("%sc%s" % (i+1, i+1))
result.append('< ' + repr(a[i]))
@@ -518,7 +519,7 @@ else:
if os.name == 'java':
python = os.path.join(sys.prefix, 'jython')
else:
- python = os.environ.get("python_executable", sys.executable)
+ python = os.environ.get('python_executable', sys.executable)
_python_ = escape(python)
if sys.platform == 'win32':
@@ -574,95 +575,48 @@ except ImportError:
# The subprocess module doesn't exist in this version of Python,
# so we're going to cobble up something that looks just enough
# like its API for our purposes below.
- from types import ModuleType
- subprocess = ModuleType('subprocess')
+ import popen2
+ import types
+ subprocess = types.ModuleType('subprocess')
subprocess.PIPE = 'PIPE'
subprocess.STDOUT = 'STDOUT'
subprocess.mswindows = (sys.platform == 'win32')
- try:
- import popen2
- popen2.Popen3
- except AttributeError:
- class Popen3:
- universal_newlines = 1
- def __init__(self, command, **kw):
- if sys.platform == 'win32' and command[0] == '"':
- command = '"' + command + '"'
- (stdin, stdout, stderr) = os.popen3(' ' + command)
- self.stdin = stdin
- self.stdout = stdout
- self.stderr = stderr
- def close_output(self):
- self.stdout.close()
- self.resultcode = self.stderr.close()
- def wait(self):
- resultcode = self.resultcode
- if os.WIFEXITED(resultcode):
- return os.WEXITSTATUS(resultcode)
- elif os.WIFSIGNALED(resultcode):
- return os.WTERMSIG(resultcode)
- else:
- return None
+ class Popen(popen2.Popen3, popen2.Popen4):
+ universal_newlines = 1
+ def __init__(self, command, **kw):
+ if kw.get('stderr') == 'STDOUT':
+ popen2.Popen4.__init__(self, command, 1)
+ else:
+ popen2.Popen3.__init__(self, command, 1)
+ self.stdin = self.tochild
+ self.stdout = self.fromchild
+ self.stderr = self.childerr
+ def communicate(self, input=None):
+ if input:
+ self.stdin.write(input)
+ self.stdin.close()
+ out = self.stdout.read()
+ if self.stderr is None:
+ err = None
+ else:
+ err = self.stderr.read()
+ self.stdout.close()
+ if self.stderr is not None:
+ self.stderr.close()
+ self.returncode = self.wait()
+ return (out, err)
+ def wait(self, *args, **kw):
+ resultcode = popen2.Popen3.wait(self, *args, **kw)
+ if os.WIFEXITED(resultcode):
+ return os.WEXITSTATUS(resultcode)
+ elif os.WIFSIGNALED(resultcode):
+ return os.WTERMSIG(resultcode)
+ else:
+ return None
- else:
- try:
- popen2.Popen4
- except AttributeError:
- # A cribbed Popen4 class, with some retrofitted code from
- # the Python 1.5 Popen3 class methods to do certain things
- # by hand.
- class Popen4(popen2.Popen3):
- childerr = None
-
- def __init__(self, cmd, bufsize=-1):
- p2cread, p2cwrite = os.pipe()
- c2pread, c2pwrite = os.pipe()
- self.pid = os.fork()
- if self.pid == 0:
- # Child
- os.dup2(p2cread, 0)
- os.dup2(c2pwrite, 1)
- os.dup2(c2pwrite, 2)
- for i in range(3, popen2.MAXFD):
- try:
- os.close(i)
- except: pass
- try:
- os.execvp(cmd[0], cmd)
- finally:
- os._exit(1)
- # Shouldn't come here, I guess
- os._exit(1)
- os.close(p2cread)
- self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
- os.close(c2pwrite)
- self.fromchild = os.fdopen(c2pread, 'r', bufsize)
- popen2._active.append(self)
-
- popen2.Popen4 = Popen4
-
- class Popen3(popen2.Popen3, popen2.Popen4):
- universal_newlines = 1
- def __init__(self, command, **kw):
- if kw.get('stderr') == 'STDOUT':
- popen2.Popen4.__init__(self, command, 1)
- else:
- popen2.Popen3.__init__(self, command, 1)
- self.stdin = self.tochild
- self.stdout = self.fromchild
- self.stderr = self.childerr
- def wait(self, *args, **kw):
- resultcode = popen2.Popen3.wait(self, *args, **kw)
- if os.WIFEXITED(resultcode):
- return os.WEXITSTATUS(resultcode)
- elif os.WIFSIGNALED(resultcode):
- return os.WTERMSIG(resultcode)
- else:
- return None
-
- subprocess.Popen = Popen3
+ subprocess.Popen = Popen
@@ -718,7 +672,7 @@ class Popen(subprocess.Popen):
except ValueError:
return self._close('stdin')
except (subprocess.pywintypes.error, Exception), why:
- if why[0] in (109, errno.ESHUTDOWN):
+ if why.args[0] in (109, errno.ESHUTDOWN):
return self._close('stdin')
raise
@@ -739,7 +693,7 @@ class Popen(subprocess.Popen):
except ValueError:
return self._close(which)
except (subprocess.pywintypes.error, Exception), why:
- if why[0] in (109, errno.ESHUTDOWN):
+ if why.args[0] in (109, errno.ESHUTDOWN):
return self._close(which)
raise
@@ -825,14 +779,6 @@ def send_all(p, data):
-try:
- object
-except NameError:
- class object:
- pass
-
-
-
class TestCmd(object):
"""Class TestCmd
"""
@@ -977,11 +923,11 @@ class TestCmd(object):
program = self.program
if not interpreter:
interpreter = self.interpreter
- if not type(program) in [list, tuple]:
+ if not isinstance(program, (list, tuple)):
program = [program]
cmd = list(program)
if interpreter:
- if not type(interpreter) in [list, tuple]:
+ if not isinstance(interpreter, (list, tuple)):
interpreter = [interpreter]
cmd = list(interpreter) + cmd
if arguments:
@@ -1123,10 +1069,8 @@ class TestCmd(object):
prepended unless it is enclosed in a [list].
"""
cmd = self.command_args(program, interpreter, arguments)
- #print 'DEBUG: SCONSFLAGS:', os.environ.get('SCONSFLAGS')
- #print 'DEBUG: command line:', ' '.join(map(self.escape, cmd))
if self.verbose:
- cmd_string = ' '.join(map(self.escape, cmd))
+ cmd_string = ' '.join([ self.escape(c) for c in cmd ])
sys.stderr.write(cmd_string + "\n")
if universal_newlines is None:
universal_newlines = self.universal_newlines
@@ -1157,16 +1101,10 @@ class TestCmd(object):
the specified popen argument, recording the exit status,
standard output and error output.
"""
- popen.stdin.close()
- self.status = popen.wait()
- if not self.status:
- self.status = 0
- self._stdout.append(popen.stdout.read())
- if popen.stderr:
- stderr = popen.stderr.read()
- else:
- stderr = ''
- self._stderr.append(stderr)
+ stdout, stderr = popen.communicate()
+ self.status = popen.returncode
+ self._stdout.append(stdout or '')
+ self._stderr.append(stderr or '')
def run(self, program = None,
interpreter = None,
@@ -1193,34 +1131,12 @@ class TestCmd(object):
arguments,
universal_newlines,
stdin=stdin)
- if stdin:
- if is_List(stdin):
- for line in stdin:
- p.stdin.write(line)
- else:
- p.stdin.write(stdin)
- p.stdin.close()
-
- out = p.stdout.read()
- if p.stderr is None:
- err = ''
- else:
- err = p.stderr.read()
- try:
- close_output = p.close_output
- except AttributeError:
- p.stdout.close()
- if not p.stderr is None:
- p.stderr.close()
- else:
- close_output()
-
- self._stdout.append(out)
- self._stderr.append(err)
-
- self.status = p.wait()
- if not self.status:
- self.status = 0
+ if is_List(stdin):
+ stdin = ''.join(stdin)
+ stdout, stderr = p.communicate(input=stdin)
+ self.status = p.returncode
+ self._stdout.append(stdout or '')
+ self._stderr.append(stderr or '')
if chdir:
os.chdir(oldcwd)
@@ -1434,10 +1350,10 @@ class TestCmd(object):
# permission, so it's also pretty easy, just chmod the
# directory and then chmod every entry on our walk down the
# tree.
+ do_chmod(top)
for dirpath, dirnames, filenames in os.walk(top):
- do_chmod(dirpath)
- for fn in filenames:
- do_chmod(os.path.join(dirpath, fn))
+ for name in dirnames + filenames:
+ do_chmod(os.path.join(dirpath, name))
else:
# It's a directory and we're trying to turn off read
# permission, which means we have to chmod the directories
@@ -1445,9 +1361,9 @@ class TestCmd(object):
# the top down get in the way of being able to get at lower
# parts of the tree.
for dirpath, dirnames, filenames in os.walk(top, topdown=0):
- for fn in filenames:
- do_chmod(os.path.join(dirpath, fn))
- do_chmod(dirpath)
+ for name in dirnames + filenames:
+ do_chmod(os.path.join(dirpath, name))
+ do_chmod(top)
def writable(self, top, write=1):
"""Make the specified directory tree writable (write == 1)
@@ -1481,10 +1397,10 @@ class TestCmd(object):
if os.path.isfile(top):
do_chmod(top)
else:
+ do_chmod(top)
for dirpath, dirnames, filenames in os.walk(top, topdown=0):
- for fn in filenames:
- do_chmod(os.path.join(dirpath, fn))
- do_chmod(dirpath)
+ for name in dirnames + filenames:
+ do_chmod(os.path.join(dirpath, name))
def executable(self, top, execute=1):
"""Make the specified directory tree executable (execute == 1)
@@ -1516,10 +1432,10 @@ class TestCmd(object):
# permission, so it's also pretty easy, just chmod the
# directory and then chmod every entry on our walk down the
# tree.
+ do_chmod(top)
for dirpath, dirnames, filenames in os.walk(top):
- do_chmod(dirpath)
- for fn in filenames:
- do_chmod(os.path.join(dirpath, fn))
+ for name in dirnames + filenames:
+ do_chmod(os.path.join(dirpath, name))
else:
# It's a directory and we're trying to turn off execute
# permission, which means we have to chmod the directories
@@ -1527,9 +1443,9 @@ class TestCmd(object):
# the top down get in the way of being able to get at lower
# parts of the tree.
for dirpath, dirnames, filenames in os.walk(top, topdown=0):
- for fn in filenames:
- do_chmod(os.path.join(dirpath, fn))
- do_chmod(dirpath)
+ for name in dirnames + filenames:
+ do_chmod(os.path.join(dirpath, name))
+ do_chmod(top)
def write(self, file, content, mode = 'wb'):
"""Writes the specified content text (second argument) to the