summaryrefslogtreecommitdiffstats
path: root/QMTest/TestCmd.py
diff options
context:
space:
mode:
Diffstat (limited to 'QMTest/TestCmd.py')
-rw-r--r--QMTest/TestCmd.py427
1 files changed, 355 insertions, 72 deletions
diff --git a/QMTest/TestCmd.py b/QMTest/TestCmd.py
index f5e1c71..8bf054b 100644
--- a/QMTest/TestCmd.py
+++ b/QMTest/TestCmd.py
@@ -181,12 +181,12 @@ version.
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
__author__ = "Steven Knight <knight at baldmt dot com>"
-__revision__ = "TestCmd.py 0.30.D001 2007/10/01 16:53:55 knight"
-__version__ = "0.30"
+__revision__ = "TestCmd.py 0.31.D001 2008/01/01 09:05:59 knight"
+__version__ = "0.31"
+import errno
import os
import os.path
-import popen2
import re
import shutil
import stat
@@ -457,6 +457,252 @@ else:
default_sleep_seconds = 1
+
+
+try:
+ import subprocess
+except ImportError:
+ # The subprocess module doesn't exist in this version of Python,
+ # so we're going to cobble up something that looks just enough
+ # like its API for our purposes below.
+ import new
+
+ subprocess = new.module('subprocess')
+
+ subprocess.PIPE = 'PIPE'
+ subprocess.STDOUT = 'STDOUT'
+ subprocess.mswindows = (sys.platform == 'win32')
+
+ try:
+ import popen2
+ popen2.Popen3
+ except AttributeError:
+ class Popen3:
+ universal_newlines = 1
+ def __init__(self, command, **kw):
+ if sys.platform == 'win32' and command[0] == '"':
+ command = '"' + command + '"'
+ (stdin, stdout, stderr) = os.popen3(' ' + command)
+ self.stdin = stdin
+ self.stdout = stdout
+ self.stderr = stderr
+ def close_output(self):
+ self.stdout.close()
+ self.resultcode = self.stderr.close()
+ def wait(self):
+ return self.resultcode
+
+ else:
+ try:
+ popen2.Popen4
+ except AttributeError:
+ # A cribbed Popen4 class, with some retrofitted code from
+ # the Python 1.5 Popen3 class methods to do certain things
+ # by hand.
+ class Popen4(popen2.Popen3):
+ childerr = None
+
+ def __init__(self, cmd, bufsize=-1):
+ p2cread, p2cwrite = os.pipe()
+ c2pread, c2pwrite = os.pipe()
+ self.pid = os.fork()
+ if self.pid == 0:
+ # Child
+ os.dup2(p2cread, 0)
+ os.dup2(c2pwrite, 1)
+ os.dup2(c2pwrite, 2)
+ for i in range(3, popen2.MAXFD):
+ try:
+ os.close(i)
+ except: pass
+ try:
+ os.execvp(cmd[0], cmd)
+ finally:
+ os._exit(1)
+ # Shouldn't come here, I guess
+ os._exit(1)
+ os.close(p2cread)
+ self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
+ os.close(c2pwrite)
+ self.fromchild = os.fdopen(c2pread, 'r', bufsize)
+ popen2._active.append(self)
+
+ popen2.Popen4 = Popen4
+
+ class Popen3(popen2.Popen3, popen2.Popen4):
+ universal_newlines = 1
+ def __init__(self, command, **kw):
+ if kw.get('stderr') == 'STDOUT':
+ apply(popen2.Popen4.__init__, (self, command, 1))
+ else:
+ apply(popen2.Popen3.__init__, (self, command, 1))
+ self.stdin = self.tochild
+ self.stdout = self.fromchild
+ self.stderr = self.childerr
+
+ subprocess.Popen = Popen3
+
+
+
+# From Josiah Carlson,
+# ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
+# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
+
+PIPE = subprocess.PIPE
+
+if subprocess.mswindows:
+ from win32file import ReadFile, WriteFile
+ from win32pipe import PeekNamedPipe
+ import msvcrt
+else:
+ import select
+ import fcntl
+
+ try: fcntl.F_GETFL
+ except AttributeError: fcntl.F_GETFL = 3
+
+ try: fcntl.F_SETFL
+ except AttributeError: fcntl.F_SETFL = 4
+
+class Popen(subprocess.Popen):
+ def recv(self, maxsize=None):
+ return self._recv('stdout', maxsize)
+
+ def recv_err(self, maxsize=None):
+ return self._recv('stderr', maxsize)
+
+ def send_recv(self, input='', maxsize=None):
+ return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
+
+ def get_conn_maxsize(self, which, maxsize):
+ if maxsize is None:
+ maxsize = 1024
+ elif maxsize < 1:
+ maxsize = 1
+ return getattr(self, which), maxsize
+
+ def _close(self, which):
+ getattr(self, which).close()
+ setattr(self, which, None)
+
+ if subprocess.mswindows:
+ def send(self, input):
+ if not self.stdin:
+ return None
+
+ try:
+ x = msvcrt.get_osfhandle(self.stdin.fileno())
+ (errCode, written) = WriteFile(x, input)
+ except ValueError:
+ return self._close('stdin')
+ except (subprocess.pywintypes.error, Exception), why:
+ if why[0] in (109, errno.ESHUTDOWN):
+ return self._close('stdin')
+ raise
+
+ return written
+
+ def _recv(self, which, maxsize):
+ conn, maxsize = self.get_conn_maxsize(which, maxsize)
+ if conn is None:
+ return None
+
+ try:
+ x = msvcrt.get_osfhandle(conn.fileno())
+ (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
+ if maxsize < nAvail:
+ nAvail = maxsize
+ if nAvail > 0:
+ (errCode, read) = ReadFile(x, nAvail, None)
+ except ValueError:
+ return self._close(which)
+ except (subprocess.pywintypes.error, Exception), why:
+ if why[0] in (109, errno.ESHUTDOWN):
+ return self._close(which)
+ raise
+
+ #if self.universal_newlines:
+ # read = self._translate_newlines(read)
+ return read
+
+ else:
+ def send(self, input):
+ if not self.stdin:
+ return None
+
+ if not select.select([], [self.stdin], [], 0)[1]:
+ return 0
+
+ try:
+ written = os.write(self.stdin.fileno(), input)
+ except OSError, why:
+ if why[0] == errno.EPIPE: #broken pipe
+ return self._close('stdin')
+ raise
+
+ return written
+
+ def _recv(self, which, maxsize):
+ conn, maxsize = self.get_conn_maxsize(which, maxsize)
+ if conn is None:
+ return None
+
+ try:
+ flags = fcntl.fcntl(conn, fcntl.F_GETFL)
+ except TypeError:
+ flags = None
+ else:
+ if not conn.closed:
+ fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)
+
+ try:
+ if not select.select([conn], [], [], 0)[0]:
+ return ''
+
+ r = conn.read(maxsize)
+ if not r:
+ return self._close(which)
+
+ #if self.universal_newlines:
+ # r = self._translate_newlines(r)
+ return r
+ finally:
+ if not conn.closed and not flags is None:
+ fcntl.fcntl(conn, fcntl.F_SETFL, flags)
+
+disconnect_message = "Other end disconnected!"
+
+def recv_some(p, t=.1, e=1, tr=5, stderr=0):
+ if tr < 1:
+ tr = 1
+ x = time.time()+t
+ y = []
+ r = ''
+ pr = p.recv
+ if stderr:
+ pr = p.recv_err
+ while time.time() < x or r:
+ r = pr()
+ if r is None:
+ if e:
+ raise Exception(disconnect_message)
+ else:
+ break
+ elif r:
+ y.append(r)
+ else:
+ time.sleep(max((x-time.time())/tr, 0))
+ return ''.join(y)
+
+def send_all(p, data):
+ while len(data):
+ sent = p.send(data)
+ if sent is None:
+ raise Exception(disconnect_message)
+ data = buffer(data, sent)
+
+
+
class TestCmd:
"""Class TestCmd
"""
@@ -703,26 +949,17 @@ class TestCmd:
dir = self.canonicalize(dir)
os.rmdir(dir)
- def run(self, program = None,
- interpreter = None,
- arguments = None,
- chdir = None,
- stdin = None,
- universal_newlines = None):
- """Runs a test of the program or script for the test
- environment. Standard output and error output are saved for
- future retrieval via the stdout() and stderr() methods.
+ def start(self, program = None,
+ interpreter = None,
+ arguments = None,
+ universal_newlines = None,
+ **kw):
+ """
+ Starts a program or script for the test environment.
The specified program will have the original directory
- prepending unless it is enclosed in a [list].
+ prepended unless it is enclosed in a [list].
"""
- if chdir:
- oldcwd = os.getcwd()
- if not os.path.isabs(chdir):
- chdir = os.path.join(self.workpath(chdir))
- if self.verbose:
- sys.stderr.write("chdir(" + chdir + ")\n")
- os.chdir(chdir)
if program:
if type(program) == type('') and not os.path.isabs(program):
program = os.path.join(self._cwd, program)
@@ -747,38 +984,56 @@ class TestCmd:
if universal_newlines is None:
universal_newlines = self.universal_newlines
- try:
- import subprocess
- except ImportError:
- try:
- Popen3 = popen2.Popen3
- except AttributeError:
- class Popen3:
- def __init__(self, command):
- (stdin, stdout, stderr) = os.popen3(' ' + command)
- self.stdin = stdin
- self.stdout = stdout
- self.stderr = stderr
- def close_output(self):
- self.stdout.close()
- self.resultcode = self.stderr.close()
- def wait(self):
- return self.resultcode
- if sys.platform == 'win32' and cmd_string[0] == '"':
- cmd_string = '"' + cmd_string + '"'
- p = Popen3(cmd_string)
- else:
- p = Popen3(cmd, 1)
- p.stdin = p.tochild
- p.stdout = p.fromchild
- p.stderr = p.childerr
+ combine = kw.get('combine', self.combine)
+ if combine:
+ stderr_value = subprocess.STDOUT
else:
- p = subprocess.Popen(cmd,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- universal_newlines=universal_newlines)
+ stderr_value = subprocess.PIPE
+
+ return Popen(cmd,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=stderr_value,
+ universal_newlines=universal_newlines)
+ def finish(self, popen, **kw):
+ """
+ Finishes and waits for the process being run under control of
+ the specified popen argument, recording the exit status,
+ standard output and error output.
+ """
+ popen.stdin.close()
+ self.status = popen.wait()
+ if not self.status:
+ self.status = 0
+ self._stdout.append(popen.stdout.read())
+ if popen.stderr:
+ stderr = popen.stderr.read()
+ else:
+ stderr = ''
+ self._stderr.append(stderr)
+
+ def run(self, program = None,
+ interpreter = None,
+ arguments = None,
+ chdir = None,
+ stdin = None,
+ universal_newlines = None):
+ """Runs a test of the program or script for the test
+ environment. Standard output and error output are saved for
+ future retrieval via the stdout() and stderr() methods.
+
+ The specified program will have the original directory
+ prepended unless it is enclosed in a [list].
+ """
+ if chdir:
+ oldcwd = os.getcwd()
+ if not os.path.isabs(chdir):
+ chdir = os.path.join(self.workpath(chdir))
+ if self.verbose:
+ sys.stderr.write("chdir(" + chdir + ")\n")
+ os.chdir(chdir)
+ p = self.start(program, interpreter, arguments, universal_newlines)
if stdin:
if is_List(stdin):
for line in stdin:
@@ -788,23 +1043,26 @@ class TestCmd:
p.stdin.close()
out = p.stdout.read()
- err = p.stderr.read()
+ if p.stderr is None:
+ err = ''
+ else:
+ err = p.stderr.read()
try:
- p.close_output()
+ close_output = p.close_output
except AttributeError:
p.stdout.close()
- p.stderr.close()
+ if not p.stderr is None:
+ p.stderr.close()
+ else:
+ close_output()
+
+ self._stdout.append(out)
+ self._stderr.append(err)
self.status = p.wait()
if not self.status:
self.status = 0
- if self.combine:
- self._stdout.append(out + err)
- else:
- self._stdout.append(out)
- self._stderr.append(err)
-
if chdir:
os.chdir(oldcwd)
if self.verbose >= 2:
@@ -990,18 +1248,24 @@ class TestCmd:
def readable(self, top, read=1):
"""Make the specified directory tree readable (read == 1)
or not (read == None).
+
+ This method has no effect on Windows systems, which use a
+ completely different mechanism to control file readability.
"""
+ if sys.platform == 'win32':
+ return
+
if read:
def do_chmod(fname):
try: st = os.stat(fname)
except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0400))
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD))
else:
def do_chmod(fname):
try: st = os.stat(fname)
except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0400))
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD))
if os.path.isfile(top):
# If it's a file, that's easy, just chmod it.
@@ -1040,16 +1304,29 @@ class TestCmd:
or not (write == None).
"""
- if write:
- def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
+ if sys.platform == 'win32':
+
+ if write:
+ def do_chmod(fname):
+ try: os.chmod(fname, stat.S_IWRITE)
+ except OSError: pass
+ else:
+ def do_chmod(fname):
+ try: os.chmod(fname, stat.S_IREAD)
+ except OSError: pass
+
else:
- def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
+
+ if write:
+ def do_chmod(fname):
+ try: st = os.stat(fname)
+ except OSError: pass
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
+ else:
+ def do_chmod(fname):
+ try: st = os.stat(fname)
+ except OSError: pass
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
if os.path.isfile(top):
do_chmod(top)
@@ -1061,18 +1338,24 @@ class TestCmd:
def executable(self, top, execute=1):
"""Make the specified directory tree executable (execute == 1)
or not (execute == None).
+
+ This method has no effect on Windows systems, which use a
+ completely different mechanism to control file executability.
"""
+ if sys.platform == 'win32':
+ return
+
if execute:
def do_chmod(fname):
try: st = os.stat(fname)
except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0100))
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC))
else:
def do_chmod(fname):
try: st = os.stat(fname)
except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0100))
+ else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC))
if os.path.isfile(top):
# If it's a file, that's easy, just chmod it.