summaryrefslogtreecommitdiffstats
path: root/QMTest
diff options
context:
space:
mode:
Diffstat (limited to 'QMTest')
-rw-r--r--QMTest/TestCmd.py427
-rw-r--r--QMTest/TestCommon.py137
-rw-r--r--QMTest/TestSCons.py143
-rw-r--r--QMTest/TestSCons_time.py24
4 files changed, 596 insertions, 135 deletions
diff --git a/QMTest/TestCmd.py b/QMTest/TestCmd.py
index f5e1c71..8bf054b 100644
--- a/QMTest/TestCmd.py
+++ b/QMTest/TestCmd.py
@@ -181,12 +181,12 @@ version.
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
__author__ = "Steven Knight <knight at baldmt dot com>"
-__revision__ = "TestCmd.py 0.30.D001 2007/10/01 16:53:55 knight"
-__version__ = "0.30"
+__revision__ = "TestCmd.py 0.31.D001 2008/01/01 09:05:59 knight"
+__version__ = "0.31"
+import errno
import os
import os.path
-import popen2
import re
import shutil
import stat
@@ -457,6 +457,252 @@ else:
default_sleep_seconds = 1
+
+
+try:
+ import subprocess
+except ImportError:
+ # The subprocess module doesn't exist in this version of Python,
+ # so we're going to cobble up something that looks just enough
+ # like its API for our purposes below.
+ import new
+
+ subprocess = new.module('subprocess')
+
+ subprocess.PIPE = 'PIPE'
+ subprocess.STDOUT = 'STDOUT'
+ subprocess.mswindows = (sys.platform == 'win32')
+
+ try:
+ import popen2
+ popen2.Popen3
+ except AttributeError:
+ class Popen3:
+ universal_newlines = 1
+ def __init__(self, command, **kw):
+ if sys.platform == 'win32' and command[0] == '"':
+ command = '"' + command + '"'
+ (stdin, stdout, stderr) = os.popen3(' ' + command)
+ self.stdin = stdin
+ self.stdout = stdout
+ self.stderr = stderr
+ def close_output(self):
+ self.stdout.close()
+ self.resultcode = self.stderr.close()
+ def wait(self):
+ return self.resultcode
+
+ else:
+ try:
+ popen2.Popen4
+ except AttributeError:
+ # A cribbed Popen4 class, with some retrofitted code from
+ # the Python 1.5 Popen3 class methods to do certain things
+ # by hand.
+ class Popen4(popen2.Popen3):
+ childerr = None
+
+ def __init__(self, cmd, bufsize=-1):
+ p2cread, p2cwrite = os.pipe()
+ c2pread, c2pwrite = os.pipe()
+ self.pid = os.fork()
+ if self.pid == 0:
+ # Child
+ os.dup2(p2cread, 0)
+ os.dup2(c2pwrite, 1)
+ os.dup2(c2pwrite, 2)
+ for i in range(3, popen2.MAXFD):
+ try:
+ os.close(i)
+ except: pass
+ try:
+ os.execvp(cmd[0], cmd)
+ finally:
+ os._exit(1)
+ # Shouldn't come here, I guess
+ os._exit(1)
+ os.close(p2cread)
+ self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
+ os.close(c2pwrite)
+ self.fromchild = os.fdopen(c2pread, 'r', bufsize)
+ popen2._active.append(self)
+
+ popen2.Popen4 = Popen4
+
+ class Popen3(popen2.Popen3, popen2.Popen4):
+ universal_newlines = 1
+ def __init__(self, command, **kw):
+ if kw.get('stderr') == 'STDOUT':
+ apply(popen2.Popen4.__init__, (self, command, 1))
+ else:
+ apply(popen2.Popen3.__init__, (self, command, 1))
+ self.stdin = self.tochild
+ self.stdout = self.fromchild
+ self.stderr = self.childerr
+
+ subprocess.Popen = Popen3
+
+
+
+# From Josiah Carlson,
+# ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
+# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
+
+PIPE = subprocess.PIPE
+
+if subprocess.mswindows:
+ from win32file import ReadFile, WriteFile
+ from win32pipe import PeekNamedPipe
+ import msvcrt
+else:
+ import select
+ import fcntl
+
+ try: fcntl.F_GETFL
+ except AttributeError: fcntl.F_GETFL = 3
+
+ try: fcntl.F_SETFL
+ except AttributeError: fcntl.F_SETFL = 4
+
+class Popen(subprocess.Popen):
+ def recv(self, maxsize=None):
+ return self._recv('stdout', maxsize)
+
+ def recv_err(self, maxsize=None):
+ return self._recv('stderr', maxsize)
+
+ def send_recv(self, input='', maxsize=None):
+ return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
+
+ def get_conn_maxsize(self, which, maxsize):
+ if maxsize is None:
+ maxsize = 1024
+ elif maxsize < 1:
+ maxsize = 1
+ return getattr(self, which), maxsize
+
+ def _close(self, which):
+ getattr(self, which).close()
+ setattr(self, which, None)
+
+ if subprocess.mswindows:
+ def send(self, input):
+ if not self.stdin:
+ return None
+
+ try:
+ x = msvcrt.get_osfhandle(self.stdin.fileno())
+ (errCode, written) = WriteFile(x, input)
+ except ValueError:
+ return self._close('stdin')
+ except (subprocess.pywintypes.error, Exception), why:
+ if why[0] in (109, errno.ESHUTDOWN):
+ return self._close('stdin')
+ raise
+
+ return written
+
+ def _recv(self, which, maxsize):
+ conn, maxsize = self.get_conn_maxsize(which, maxsize)
+ if conn is None:
+ return None
+
+ try:
+ x = msvcrt.get_osfhandle(conn.fileno())
+ (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
+ if maxsize < nAvail:
+ nAvail = maxsize
+ if nAvail > 0:
+ (errCode, read) = ReadFile(x, nAvail, None)
+ except ValueError:
+ return self._close(which)
+ except (subprocess.pywintypes.error, Exception), why:
+ if why[0] in (109, errno.ESHUTDOWN):
+ return self._close(which)
+ raise
+
+ #if self.universal_newlines:
+ # read = self._translate_newlines(read)
+ return read
+
+ else:
+ def send(self, input):
+ if not self.stdin:
+ return None
+
+ if not select.select([], [self.stdin], [], 0)[1]:
+ return 0
+
+ try:
+ written = os.write(self.stdin.fileno(), input)
+ except OSError, why:
+ if why[0] == errno.EPIPE: #broken pipe
+ return self._close('stdin')
+ raise
+
+ return written
+
+ def _recv(self, which, maxsize):
+ conn, maxsize = self.get_conn_maxsize(which, maxsize)
+ if conn is None:
+ return None
+
+ try:
+ flags = fcntl.fcntl(conn, fcntl.F_GETFL)
+ except TypeError:
+ flags = None
+ else:
+ if not conn.closed:
+ fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)
+
+ try:
+ if not select.select([conn], [], [], 0)[0]:
+ return ''
+
+ r = conn.read(maxsize)
+ if not r:
+ return self._close(which)
+
+ #if self.universal_newlines:
+ # r = self._translate_newlines(r)
+ return r
+ finally:
+ if not conn.closed and not flags is None:
+ fcntl.fcntl(conn, fcntl.F_SETFL, flags)
+
+disconnect_message = "Other end disconnected!"
+
+def recv_some(p, t=.1, e=1, tr=5, stderr=0):
+ if tr < 1:
+ tr = 1
+ x = time.time()+t
+ y = []
+ r = ''
+ pr = p.recv
+ if stderr:
+ pr = p.recv_err
+ while time.time() < x or r:
+ r = pr()
+ if r is None:
+ if e:
+ raise Exception(disconnect_message)
+ else:
+ break
+ elif r:
+ y.append(r)
+ else:
+ time.sleep(max((x-time.time())/tr, 0))
+ return ''.join(y)
+
+def send_all(p, data):
+ while len(data):
+ sent = p.send(data)
+ if sent is None:
+ raise Exception(disconnect_message)
+ data = buffer(data, sent)
+
+
+
class TestCmd:
"""Class TestCmd
"""
@@ -703,26 +949,17 @@ class TestCmd:
dir = self.canonicalize(dir)
os.rmdir(dir)
- def run(self, program = None,
- interpreter = None,
- arguments = None,
- chdir = None,
- stdin = None,
- universal_newlines = None):
- """Runs a test of the program or script for the test
- environment. Standard output and error output are saved for
- future retrieval via the stdout() and stderr() methods.
+ def start(self, program = None,
+ interpreter = None,
+ arguments = None,
+ universal_newlines = None,
+ **kw):
+ """
+ Starts a program or script for the test environment.
The specified program will have the original directory
- prepending unless it is enclosed in a [list].
+ prepended unless it is enclosed in a [list].
"""
- if chdir:
- oldcwd = os.getcwd()
- if not os.path.isabs(chdir):
- chdir = os.path.join(self.workpath(chdir))
- if self.verbose:
- sys.stderr.write("chdir(" + chdir + ")\n")
- os.chdir(chdir)
if program:
if type(program) == type('') and not os.path.isabs(program):
program = os.path.join(self._cwd, program)
@@ -747,38 +984,56 @@ class TestCmd:
if universal_newlines is None:
universal_newlines = self.universal_newlines
- try:
- import subprocess
- except ImportError:
- try:
- Popen3 = popen2.Popen3
- except AttributeError:
- class Popen3:
- def __init__(self, command):
- (stdin, stdout, stderr) = os.popen3(' ' + command)
- self.stdin = stdin
- self.stdout = stdout
- self.stderr = stderr
- def close_output(self):
- self.stdout.close()
- self.resultcode = self.stderr.close()
- def wait(self):
- return self.resultcode
- if sys.platform == 'win32' and cmd_string[0] == '"':
- cmd_string = '"' + cmd_string + '"'
- p = Popen3(cmd_string)
- else:
- p = Popen3(cmd, 1)
- p.stdin = p.tochild
- p.stdout = p.fromchild
- p.stderr = p.childerr
+ combine = kw.get('combine', self.combine)
+ if combine:
+ stderr_value = subprocess.STDOUT
else:
- p = subprocess.Popen(cmd,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- universal_newlines=universal_newlines)
+ stderr_value = subprocess.PIPE
+
+ return Popen(cmd,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=stderr_value,
+ universal_newlines=universal_newlines)
+ def finish(self, popen, **kw):
+ """
+ Finishes and waits for the process being run under control of
+ the specified popen argument, recording the exit status,
+ standard output and error output.
+ """
+ popen.stdin.close()
+ self.status = popen.wait()
+ if not self.status:
+ self.status = 0
+ self._stdout.append(popen.stdout.read())
+ if popen.stderr:
+ stderr = popen.stderr.read()
+ else:
+ stderr = ''
+ self._stderr.append(stderr)
+
+ def run(self, program = None,
+ interpreter = None,
+ arguments = None,
+ chdir = None,
+ stdin = None,
+ universal_newlines = None):
+ """Runs a test of the program or script for the test
+ environment. Standard output and error output are saved for
+ future retrieval via the stdout() and stderr() methods.
+
+ The specified program will have the original directory
+ prepended unless it is enclosed in a [list].
+ """
+ if chdir:
+ oldcwd = os.getcwd()
+ if not os.path.isabs(chdir):
+ chdir = os.path.join(self.workpath(chdir))
+ if self.verbose:
+ sys.stderr.write("chdir(" + chdir + ")\n")
+ os.chdir(chdir)
+ p = self.start(program, interpreter, arguments, universal_newlines)
if stdin:
if is_List(stdin):
for line in stdin:
@@ -788,23 +1043,26 @@ class TestCmd:
p.stdin.close()
out = p.stdout.read()
- err = p.stderr.read()
+ if p.stderr is None:
+ err = ''
+ else:
+ err = p.stderr.read()
try:
- p.close_output()
+ close_output = p.close_output
except AttributeError:
p.stdout.close()
- p.stderr.close()
+ if not p.stderr is None:
+ p.stderr.close()
+ else:
+ close_output()
+
+ self._stdout.append(out)
+ self._stderr.append(err)
self.status = p.wait()
if not self.status:
self.status = 0
- if self.combine:
- self._stdout.append(out + err)
- else:
- self._stdout.append(out)
- self._stderr.append(err)
-
if chdir:
os.chdir(oldcwd)
if self.verbose >= 2:
@@ -990,18 +1248,24 @@ class TestCmd:
def readable(self, top, read=1):
"""Make the specified directory tree readable (read == 1)
or not (read == None).
+
+ This method has no effect on Windows systems, which use a
+ completely different mechanism to control file readability.
"""
+ if sys.platform == 'win32':
+ return
+
if read:
def do_chmod(fname):
try: st = os.stat(fname)
except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0400))
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD))
else:
def do_chmod(fname):
try: st = os.stat(fname)
except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0400))
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD))
if os.path.isfile(top):
# If it's a file, that's easy, just chmod it.
@@ -1040,16 +1304,29 @@ class TestCmd:
or not (write == None).
"""
- if write:
- def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
+ if sys.platform == 'win32':
+
+ if write:
+ def do_chmod(fname):
+ try: os.chmod(fname, stat.S_IWRITE)
+ except OSError: pass
+ else:
+ def do_chmod(fname):
+ try: os.chmod(fname, stat.S_IREAD)
+ except OSError: pass
+
else:
- def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
+
+ if write:
+ def do_chmod(fname):
+ try: st = os.stat(fname)
+ except OSError: pass
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
+ else:
+ def do_chmod(fname):
+ try: st = os.stat(fname)
+ except OSError: pass
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
if os.path.isfile(top):
do_chmod(top)
@@ -1061,18 +1338,24 @@ class TestCmd:
def executable(self, top, execute=1):
"""Make the specified directory tree executable (execute == 1)
or not (execute == None).
+
+ This method has no effect on Windows systems, which use a
+ completely different mechanism to control file executability.
"""
+ if sys.platform == 'win32':
+ return
+
if execute:
def do_chmod(fname):
try: st = os.stat(fname)
except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0100))
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC))
else:
def do_chmod(fname):
try: st = os.stat(fname)
except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0100))
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC))
if os.path.isfile(top):
# If it's a file, that's easy, just chmod it.
diff --git a/QMTest/TestCommon.py b/QMTest/TestCommon.py
index d6b21ad..acc63d4 100644
--- a/QMTest/TestCommon.py
+++ b/QMTest/TestCommon.py
@@ -84,9 +84,10 @@ The TestCommon module also provides the following variables
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
__author__ = "Steven Knight <knight at baldmt dot com>"
-__revision__ = "TestCommon.py 0.30.D001 2007/10/01 16:53:55 knight"
-__version__ = "0.30"
+__revision__ = "TestCommon.py 0.31.D001 2008/01/01 09:05:59 knight"
+__version__ = "0.31"
+import copy
import os
import os.path
import stat
@@ -378,6 +379,97 @@ class TestCommon(TestCmd):
print "Writable files: `%s'" % string.join(writable, "', `")
self.fail_test(missing + writable)
+ def _complete(self, actual_stdout, expected_stdout,
+ actual_stderr, expected_stderr, status, match):
+ """
+ Post-processes running a subcommand, checking for failure
+ status and displaying output appropriately.
+ """
+ if _failed(self, status):
+ expect = ''
+ if status != 0:
+ expect = " (expected %s)" % str(status)
+ print "%s returned %s%s" % (self.program, str(_status(self)), expect)
+ print self.banner('STDOUT ')
+ print actual_stdout
+ print self.banner('STDERR ')
+ print actual_stderr
+ self.fail_test()
+ if not expected_stdout is None and not match(actual_stdout, expected_stdout):
+ self.diff(expected_stdout, actual_stdout, 'STDOUT ')
+ if actual_stderr:
+ print self.banner('STDERR ')
+ print actual_stderr
+ self.fail_test()
+ if not expected_stderr is None and not match(actual_stderr, expected_stderr):
+ print self.banner('STDOUT ')
+ print actual_stdout
+ self.diff(expected_stderr, actual_stderr, 'STDERR ')
+ self.fail_test()
+
+ def start(self, program = None,
+ interpreter = None,
+ arguments = None,
+ universal_newlines = None,
+ **kw):
+ """
+ Starts a program or script for the test environment.
+
+ This handles the "options" keyword argument and exceptions.
+ """
+ try:
+ options = kw['options']
+ del kw['options']
+ except KeyError:
+ pass
+ else:
+ if options:
+ if arguments is None:
+ arguments = options
+ else:
+ arguments = options + " " + arguments
+ try:
+ return apply(TestCmd.start,
+ (self, program, interpreter, arguments, universal_newlines),
+ kw)
+ except KeyboardInterrupt:
+ raise
+ except Exception, e:
+ print self.banner('STDOUT ')
+ try:
+ print self.stdout()
+ except IndexError:
+ pass
+ print self.banner('STDERR ')
+ try:
+ print self.stderr()
+ except IndexError:
+ pass
+ raise e
+
+ def finish(self, popen, stdout = None, stderr = '', status = 0, **kw):
+ """
+ Finishes and waits for the process being run under control of
+ the specified popen argument. Additional arguments are similar
+ to those of the run() method:
+
+ stdout The expected standard output from
+ the command. A value of None means
+ don't test standard output.
+
+ stderr The expected error output from
+ the command. A value of None means
+ don't test error output.
+
+ status The expected exit status from the
+ command. A value of None means don't
+ test exit status.
+ """
+ apply(TestCmd.finish, (self, popen,), kw)
+ match = kw.get('match', self.match)
+ self._complete(self.stdout(), stdout,
+ self.stderr(), stderr, status, match)
+
def run(self, options = None, arguments = None,
stdout = None, stderr = '', status = 0, **kw):
"""Runs the program under test, checking that the test succeeded.
@@ -415,44 +507,9 @@ class TestCommon(TestCmd):
del kw['match']
except KeyError:
match = self.match
- try:
- apply(TestCmd.run, [self], kw)
- except KeyboardInterrupt:
- raise
- except Exception, e:
- print self.banner('STDOUT ')
- try:
- print self.stdout()
- except IndexError:
- pass
- print self.banner('STDERR ')
- try:
- print self.stderr()
- except IndexError:
- pass
- raise e
- if _failed(self, status):
- expect = ''
- if status != 0:
- expect = " (expected %s)" % str(status)
- print "%s returned %s%s" % (self.program, str(_status(self)), expect)
- print self.banner('STDOUT ')
- print self.stdout()
- print self.banner('STDERR ')
- print self.stderr()
- self.fail_test()
- if not stdout is None and not match(self.stdout(), stdout):
- self.diff(stdout, self.stdout(), 'STDOUT ')
- stderr = self.stderr()
- if stderr:
- print self.banner('STDERR ')
- print stderr
- self.fail_test()
- if not stderr is None and not match(self.stderr(), stderr):
- print self.banner('STDOUT ')
- print self.stdout()
- self.diff(stderr, self.stderr(), 'STDERR ')
- self.fail_test()
+ apply(TestCmd.run, [self], kw)
+ self._complete(self.stdout(), stdout,
+ self.stderr(), stderr, status, match)
def skip_test(self, message="Skipping test.\n"):
"""Skips a test.
diff --git a/QMTest/TestSCons.py b/QMTest/TestSCons.py
index b1fdbc1..6b6f5ed 100644
--- a/QMTest/TestSCons.py
+++ b/QMTest/TestSCons.py
@@ -21,6 +21,7 @@ import os.path
import re
import string
import sys
+import time
import __builtin__
try:
@@ -138,7 +139,6 @@ def re_escape(str):
return str
-
class TestSCons(TestCommon):
"""Class for testing SCons.
@@ -337,8 +337,8 @@ class TestSCons(TestCommon):
return x
def normalize_pdf(self, s):
- s = re.sub(r'/CreationDate \(D:[^)]*\)',
- r'/CreationDate (D:XXXX)', s)
+ s = re.sub(r'/(Creation|Mod)Date \(D:[^)]*\)',
+ r'/\1Date (D:XXXX)', s)
s = re.sub(r'/ID \[<[0-9a-fA-F]*> <[0-9a-fA-F]*>\]',
r'/ID [<XXXX> <XXXX>]', s)
s = re.sub(r'/(BaseFont|FontName) /[A-Z]{6}',
@@ -381,33 +381,114 @@ class TestSCons(TestCommon):
return s
- def java_ENV(self):
+ def java_ENV(self, version=None):
"""
- Return a default external environment that uses a local Java SDK
- in preference to whatever's found in the default PATH.
+ Initialize with a default external environment that uses a local
+ Java SDK in preference to whatever's found in the default PATH.
"""
+ try:
+ return self._java_env[version]['ENV']
+ except AttributeError:
+ self._java_env = {}
+ except KeyError:
+ pass
+
import SCons.Environment
env = SCons.Environment.Environment()
- java_path = [
- '/usr/local/j2sdk1.4.2/bin',
- '/usr/local/j2sdk1.4.1/bin',
- '/usr/local/j2sdk1.3.1/bin',
- '/usr/local/j2sdk1.3.0/bin',
- '/usr/local/j2sdk1.2.2/bin',
- '/usr/local/j2sdk1.2/bin',
- '/usr/local/j2sdk1.1.8/bin',
- '/usr/local/j2sdk1.1.7/bin',
- '/usr/local/j2sdk1.1.6/bin',
- '/usr/local/j2sdk1.1.5/bin',
- '/usr/local/j2sdk1.1.4/bin',
- '/usr/local/j2sdk1.1.3/bin',
- '/usr/local/j2sdk1.1.2/bin',
- '/usr/local/j2sdk1.1.1/bin',
- env['ENV']['PATH'],
- ]
+ self._java_env[version] = env
+
+ def paths(patterns):
+ import glob
+ result = []
+ for p in patterns:
+ paths = glob.glob(p)
+ paths.sort()
+ result.extend(paths)
+ return result
+
+ if version:
+ patterns = [
+ '/usr/lib/jvm/*-%s*/bin' % version,
+ '/usr/local/j2sdk%s*/bin' % version,
+ ]
+ java_path = paths(patterns) + [env['ENV']['PATH']]
+ else:
+ patterns = [
+ '/usr/lib/jvm/*/bin',
+ '/usr/local/j2sdk*/bin',
+ ]
+ java_path = paths(patterns) + [env['ENV']['PATH']]
+
env['ENV']['PATH'] = string.join(java_path, os.pathsep)
return env['ENV']
+ def java_where_jar(self, version=None):
+ ENV = self.java_ENV(version)
+ if self.detect_tool('jar', ENV=ENV):
+ where_jar = self.detect('JAR', 'jar', ENV=ENV)
+ else:
+ where_jar = self.where_is('jar', ENV['PATH'])
+ if not where_jar:
+ self.skip_test("Could not find Java jar, skipping test(s).\n")
+ return where_jar
+
+ def java_where_java(self, version=None):
+ """
+ Return a path to the java executable.
+ """
+ ENV = self.java_ENV(version)
+ where_java = self.where_is('java', ENV['PATH'])
+ if not where_java:
+ self.skip_test("Could not find Java java, skipping test(s).\n")
+ return where_java
+
+ def java_where_javac(self, version=None):
+ """
+ Return a path to the javac compiler.
+ """
+ ENV = self.java_ENV(version)
+ if self.detect_tool('javac'):
+ where_javac = self.detect('JAVAC', 'javac', ENV=ENV)
+ else:
+ where_javac = self.where_is('javac', ENV['PATH'])
+ if not where_javac:
+ self.skip_test("Could not find Java javac, skipping test(s).\n")
+ self.run(program = where_javac,
+ arguments = '-version',
+ stderr=None,
+ status=None)
+ if version:
+ if string.find(self.stderr(), 'javac %s' % version) == -1:
+ fmt = "Could not find javac for Java version %s, skipping test(s).\n"
+ self.skip_test(fmt % version)
+ else:
+ m = re.search(r'javac (\d\.\d)', self.stderr())
+ if m:
+ version = m.group(1)
+ else:
+ version = None
+ return where_javac, version
+
+ def java_where_javah(self, version=None):
+ ENV = self.java_ENV(version)
+ if self.detect_tool('javah'):
+ where_javah = self.detect('JAVAH', 'javah', ENV=ENV)
+ else:
+ where_javah = self.where_is('javah', ENV['PATH'])
+ if not where_javah:
+ self.skip_test("Could not find Java javah, skipping test(s).\n")
+ return where_javah
+
+ def java_where_rmic(self, version=None):
+ ENV = self.java_ENV(version)
+ if self.detect_tool('rmic'):
+ where_rmic = self.detect('RMIC', 'rmic', ENV=ENV)
+ else:
+ where_rmic = self.where_is('rmic', ENV['PATH'])
+ if not where_rmic:
+ self.skip_test("Could not find Java rmic, skipping non-simulated test(s).\n")
+ return where_rmic
+
def Qt_dummy_installation(self, dir='qt'):
# create a dummy qt installation
@@ -840,6 +921,22 @@ print "self._msvs_versions =", str(env['MSVS']['VERSIONS'])
else:
return distutils.sysconfig.get_python_inc()
+ def wait_for(self, fname, timeout=10.0, popen=None):
+ """
+ Waits for the specified file name to exist.
+ """
+ waited = 0.0
+ while not os.path.exists(fname):
+ if timeout and waited >= timeout:
+ sys.stderr.write('timed out waiting for %s to exist\n' % fname)
+ if popen:
+ popen.stdin.close()
+ self.status = 1
+ self.finish(popen)
+ self.fail_test()
+ time.sleep(1.0)
+ waited = waited + 1.0
+
# In some environments, $AR will generate a warning message to stderr
# if the library doesn't previously exist and is being created. One
# way to fix this is to tell AR to be quiet (sometimes the 'c' flag),
diff --git a/QMTest/TestSCons_time.py b/QMTest/TestSCons_time.py
index 102181e..f3ea49a 100644
--- a/QMTest/TestSCons_time.py
+++ b/QMTest/TestSCons_time.py
@@ -246,6 +246,30 @@ class TestSCons_time(TestCommon):
self.write(python_name, profile_py % d)
self.run(program = python_name, interpreter = sys.executable)
+ def tempdir_re(self, *args):
+ """
+ Returns a regular expression to match a scons-time
+ temporary directory.
+ """
+ import re
+ import tempfile
+
+ sep = re.escape(os.sep)
+ tempdir = tempfile.gettempdir()
+
+ try:
+ realpath = os.path.realpath
+ except AttributeError:
+ pass
+ else:
+ tempdir = realpath(tempdir)
+
+ args = (tempdir, 'scons-time-',) + args
+ x = apply(os.path.join, args)
+ x = re.escape(x)
+ x = string.replace(x, 'time\\-', 'time\\-[^%s]*' % sep)
+ return x
+
def write_fake_aegis_py(self, name):
name = self.workpath(name)
self.write(name, aegis_py)