summaryrefslogtreecommitdiffstats
path: root/QMTest/TestCmd.py
diff options
context:
space:
mode:
Diffstat (limited to 'QMTest/TestCmd.py')
-rw-r--r--QMTest/TestCmd.py181
1 files changed, 113 insertions, 68 deletions
diff --git a/QMTest/TestCmd.py b/QMTest/TestCmd.py
index 7a668e8..08642cb 100644
--- a/QMTest/TestCmd.py
+++ b/QMTest/TestCmd.py
@@ -103,6 +103,9 @@ things. Here is an overview of them:
test.match_re_dotall("actual 1\nactual 2\n", regex_string)
test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
+ test.tempdir()
+ test.tempdir('temporary-directory')
+
test.sleep()
test.sleep(seconds)
@@ -176,8 +179,8 @@ version.
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
__author__ = "Steven Knight <knight at baldmt dot com>"
-__revision__ = "TestCmd.py 0.23.D001 2006/11/30 13:57:29 knight"
-__version__ = "0.23"
+__revision__ = "TestCmd.py 0.26.D001 2007/08/20 21:58:58 knight"
+__version__ = "0.26"
import os
import os.path
@@ -225,6 +228,10 @@ else:
return type(e) is types.StringType or isinstance(e, UserString)
tempfile.template = 'testcmd.'
+if os.name in ('posix', 'nt'):
+ tempfile.template = 'testcmd.' + str(os.getpid()) + '.'
+else:
+ tempfile.template = 'testcmd.'
re_space = re.compile('\s')
@@ -459,7 +466,8 @@ class TestCmd:
subdir = None,
verbose = None,
match = None,
- combine = 0):
+ combine = 0,
+ universal_newlines = 1):
self._cwd = os.getcwd()
self.description_set(description)
self.program_set(program)
@@ -471,6 +479,7 @@ class TestCmd:
verbose = 0
self.verbose_set(verbose)
self.combine = combine
+ self.universal_newlines = universal_newlines
if not match is None:
self.match_func = match
else:
@@ -685,7 +694,8 @@ class TestCmd:
interpreter = None,
arguments = None,
chdir = None,
- stdin = None):
+ stdin = None,
+ universal_newlines = None):
"""Runs a test of the program or script for the test
environment. Standard output and error output are saved for
future retrieval via the stdout() and stderr() methods.
@@ -721,48 +731,67 @@ class TestCmd:
cmd_string = string.join(map(self.escape, cmd), ' ')
if self.verbose:
sys.stderr.write(cmd_string + "\n")
+ if universal_newlines is None:
+ universal_newlines = self.universal_newlines
+
try:
- p = popen2.Popen3(cmd, 1)
- except AttributeError:
- if sys.platform == 'win32' and cmd_string[0] == '"':
- cmd_string = '"' + cmd_string + '"'
- (tochild, fromchild, childerr) = os.popen3(' ' + cmd_string)
- if stdin:
- if is_List(stdin):
- for line in stdin:
- tochild.write(line)
- else:
- tochild.write(stdin)
- tochild.close()
- out = fromchild.read()
- err = childerr.read()
- if self.combine:
- self._stdout.append(out + err)
+ import subprocess
+ except ImportError:
+ try:
+ Popen3 = popen2.Popen3
+ except AttributeError:
+ class Popen3:
+ def __init__(self, command):
+ (stdin, stdout, stderr) = os.popen3(' ' + command)
+ self.stdin = stdin
+ self.stdout = stdout
+ self.stderr = stderr
+ def close_output(self):
+ self.stdout.close()
+ self.resultcode = self.stderr.close()
+ def wait(self):
+ return self.resultcode
+ if sys.platform == 'win32' and cmd_string[0] == '"':
+ cmd_string = '"' + cmd_string + '"'
+ p = Popen3(cmd_string)
else:
- self._stdout.append(out)
- self._stderr.append(err)
- fromchild.close()
- self.status = childerr.close()
- if not self.status:
- self.status = 0
- except:
- raise
+ p = Popen3(cmd, 1)
+ p.stdin = p.tochild
+ p.stdout = p.fromchild
+ p.stderr = p.childerr
else:
- if stdin:
- if is_List(stdin):
- for line in stdin:
- p.tochild.write(line)
- else:
- p.tochild.write(stdin)
- p.tochild.close()
- out = p.fromchild.read()
- err = p.childerr.read()
- if self.combine:
- self._stdout.append(out + err)
+ p = subprocess.Popen(cmd,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ universal_newlines=universal_newlines)
+
+ if stdin:
+ if is_List(stdin):
+ for line in stdin:
+ p.stdin.write(line)
else:
- self._stdout.append(out)
- self._stderr.append(err)
- self.status = p.wait()
+ p.stdin.write(stdin)
+ p.stdin.close()
+
+ out = p.stdout.read()
+ err = p.stderr.read()
+ try:
+ p.close_output()
+ except AttributeError:
+ p.stdout.close()
+ p.stderr.close()
+
+ self.status = p.wait()
+ if not self.status:
+ self.status = 0
+
+ if self.combine:
+ self._stdout.append(out + err)
+ else:
+ self._stdout.append(out)
+ self._stderr.append(err)
+
if chdir:
os.chdir(oldcwd)
if self.verbose >= 2:
@@ -852,6 +881,45 @@ class TestCmd:
link = self.canonicalize(link)
os.symlink(target, link)
+ def tempdir(self, path=None):
+ """Creates a temporary directory.
+ A unique directory name is generated if no path name is specified.
+ The directory is created, and will be removed when the TestCmd
+ object is destroyed.
+ """
+ if path is None:
+ try:
+ path = tempfile.mktemp(prefix=tempfile.template)
+ except TypeError:
+ path = tempfile.mktemp()
+ os.mkdir(path)
+
+ # Symlinks in the path will report things
+ # differently from os.getcwd(), so chdir there
+ # and back to fetch the canonical path.
+ cwd = os.getcwd()
+ try:
+ os.chdir(path)
+ path = os.getcwd()
+ finally:
+ os.chdir(cwd)
+
+ # Uppercase the drive letter since the case of drive
+ # letters is pretty much random on win32:
+ drive,rest = os.path.splitdrive(path)
+ if drive:
+ path = string.upper(drive) + rest
+
+ #
+ self._dirlist.append(path)
+ global _Cleanup
+ try:
+ _Cleanup.index(self)
+ except ValueError:
+ _Cleanup.append(self)
+
+ return path
+
def touch(self, path, mtime=None):
"""Updates the modification time on the specified file or
directory path name. The default is to update to the
@@ -894,32 +962,9 @@ class TestCmd:
"""
if (path != None):
if path == '':
- path = tempfile.mktemp()
- if path != None:
- os.mkdir(path)
- # We'd like to set self.workdir like this:
- # self.workdir = path
- # But symlinks in the path will report things
- # differently from os.getcwd(), so chdir there
- # and back to fetch the canonical path.
- cwd = os.getcwd()
- os.chdir(path)
- self.workdir = os.getcwd()
- os.chdir(cwd)
- # Uppercase the drive letter since the case of drive
- # letters is pretty much random on win32:
- drive,rest = os.path.splitdrive(self.workdir)
- if drive:
- self.workdir = string.upper(drive) + rest
- #
- self._dirlist.append(self.workdir)
- global _Cleanup
- try:
- _Cleanup.index(self)
- except ValueError:
- _Cleanup.append(self)
- else:
- self.workdir = None
+ path = None
+ path = self.tempdir(path)
+ self.workdir = path
def workpath(self, *args):
"""Returns the absolute path name to a subdirectory or file