diff options
Diffstat (limited to 'QMTest/TestCmd.py')
-rw-r--r-- | QMTest/TestCmd.py | 181 |
1 files changed, 113 insertions, 68 deletions
diff --git a/QMTest/TestCmd.py b/QMTest/TestCmd.py index 7a668e8..08642cb 100644 --- a/QMTest/TestCmd.py +++ b/QMTest/TestCmd.py @@ -103,6 +103,9 @@ things. Here is an overview of them: test.match_re_dotall("actual 1\nactual 2\n", regex_string) test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes) + test.tempdir() + test.tempdir('temporary-directory') + test.sleep() test.sleep(seconds) @@ -176,8 +179,8 @@ version. # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. __author__ = "Steven Knight <knight at baldmt dot com>" -__revision__ = "TestCmd.py 0.23.D001 2006/11/30 13:57:29 knight" -__version__ = "0.23" +__revision__ = "TestCmd.py 0.26.D001 2007/08/20 21:58:58 knight" +__version__ = "0.26" import os import os.path @@ -225,6 +228,10 @@ else: return type(e) is types.StringType or isinstance(e, UserString) tempfile.template = 'testcmd.' +if os.name in ('posix', 'nt'): + tempfile.template = 'testcmd.' + str(os.getpid()) + '.' +else: + tempfile.template = 'testcmd.' re_space = re.compile('\s') @@ -459,7 +466,8 @@ class TestCmd: subdir = None, verbose = None, match = None, - combine = 0): + combine = 0, + universal_newlines = 1): self._cwd = os.getcwd() self.description_set(description) self.program_set(program) @@ -471,6 +479,7 @@ class TestCmd: verbose = 0 self.verbose_set(verbose) self.combine = combine + self.universal_newlines = universal_newlines if not match is None: self.match_func = match else: @@ -685,7 +694,8 @@ class TestCmd: interpreter = None, arguments = None, chdir = None, - stdin = None): + stdin = None, + universal_newlines = None): """Runs a test of the program or script for the test environment. Standard output and error output are saved for future retrieval via the stdout() and stderr() methods. @@ -721,48 +731,67 @@ class TestCmd: cmd_string = string.join(map(self.escape, cmd), ' ') if self.verbose: sys.stderr.write(cmd_string + "\n") + if universal_newlines is None: + universal_newlines = self.universal_newlines + try: - p = popen2.Popen3(cmd, 1) - except AttributeError: - if sys.platform == 'win32' and cmd_string[0] == '"': - cmd_string = '"' + cmd_string + '"' - (tochild, fromchild, childerr) = os.popen3(' ' + cmd_string) - if stdin: - if is_List(stdin): - for line in stdin: - tochild.write(line) - else: - tochild.write(stdin) - tochild.close() - out = fromchild.read() - err = childerr.read() - if self.combine: - self._stdout.append(out + err) + import subprocess + except ImportError: + try: + Popen3 = popen2.Popen3 + except AttributeError: + class Popen3: + def __init__(self, command): + (stdin, stdout, stderr) = os.popen3(' ' + command) + self.stdin = stdin + self.stdout = stdout + self.stderr = stderr + def close_output(self): + self.stdout.close() + self.resultcode = self.stderr.close() + def wait(self): + return self.resultcode + if sys.platform == 'win32' and cmd_string[0] == '"': + cmd_string = '"' + cmd_string + '"' + p = Popen3(cmd_string) else: - self._stdout.append(out) - self._stderr.append(err) - fromchild.close() - self.status = childerr.close() - if not self.status: - self.status = 0 - except: - raise + p = Popen3(cmd, 1) + p.stdin = p.tochild + p.stdout = p.fromchild + p.stderr = p.childerr else: - if stdin: - if is_List(stdin): - for line in stdin: - p.tochild.write(line) - else: - p.tochild.write(stdin) - p.tochild.close() - out = p.fromchild.read() - err = p.childerr.read() - if self.combine: - self._stdout.append(out + err) + p = subprocess.Popen(cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=universal_newlines) + + if stdin: + if is_List(stdin): + for line in stdin: + p.stdin.write(line) else: - self._stdout.append(out) - self._stderr.append(err) - self.status = p.wait() + p.stdin.write(stdin) + p.stdin.close() + + out = p.stdout.read() + err = p.stderr.read() + try: + p.close_output() + except AttributeError: + p.stdout.close() + p.stderr.close() + + self.status = p.wait() + if not self.status: + self.status = 0 + + if self.combine: + self._stdout.append(out + err) + else: + self._stdout.append(out) + self._stderr.append(err) + if chdir: os.chdir(oldcwd) if self.verbose >= 2: @@ -852,6 +881,45 @@ class TestCmd: link = self.canonicalize(link) os.symlink(target, link) + def tempdir(self, path=None): + """Creates a temporary directory. + A unique directory name is generated if no path name is specified. + The directory is created, and will be removed when the TestCmd + object is destroyed. + """ + if path is None: + try: + path = tempfile.mktemp(prefix=tempfile.template) + except TypeError: + path = tempfile.mktemp() + os.mkdir(path) + + # Symlinks in the path will report things + # differently from os.getcwd(), so chdir there + # and back to fetch the canonical path. + cwd = os.getcwd() + try: + os.chdir(path) + path = os.getcwd() + finally: + os.chdir(cwd) + + # Uppercase the drive letter since the case of drive + # letters is pretty much random on win32: + drive,rest = os.path.splitdrive(path) + if drive: + path = string.upper(drive) + rest + + # + self._dirlist.append(path) + global _Cleanup + try: + _Cleanup.index(self) + except ValueError: + _Cleanup.append(self) + + return path + def touch(self, path, mtime=None): """Updates the modification time on the specified file or directory path name. The default is to update to the @@ -894,32 +962,9 @@ class TestCmd: """ if (path != None): if path == '': - path = tempfile.mktemp() - if path != None: - os.mkdir(path) - # We'd like to set self.workdir like this: - # self.workdir = path - # But symlinks in the path will report things - # differently from os.getcwd(), so chdir there - # and back to fetch the canonical path. - cwd = os.getcwd() - os.chdir(path) - self.workdir = os.getcwd() - os.chdir(cwd) - # Uppercase the drive letter since the case of drive - # letters is pretty much random on win32: - drive,rest = os.path.splitdrive(self.workdir) - if drive: - self.workdir = string.upper(drive) + rest - # - self._dirlist.append(self.workdir) - global _Cleanup - try: - _Cleanup.index(self) - except ValueError: - _Cleanup.append(self) - else: - self.workdir = None + path = None + path = self.tempdir(path) + self.workdir = path def workpath(self, *args): """Returns the absolute path name to a subdirectory or file |