summaryrefslogtreecommitdiffstats
path: root/src/engine/SCons
diff options
context:
space:
mode:
authorSteven Knight <knight@baldmt.com>2004-09-17 12:52:09 (GMT)
committerSteven Knight <knight@baldmt.com>2004-09-17 12:52:09 (GMT)
commit4b31e93c1ad6509ef1998955673696b121faad40 (patch)
tree806253836114289fdf9b129dc945b68e493e2663 /src/engine/SCons
parent8f74c71eb3b45e68bc9d90e5049353c73c0d228a (diff)
downloadSCons-4b31e93c1ad6509ef1998955673696b121faad40.zip
SCons-4b31e93c1ad6509ef1998955673696b121faad40.tar.gz
SCons-4b31e93c1ad6509ef1998955673696b121faad40.tar.bz2
Fix problems with Parallel Tasks and Exception handling. (Kevin Quick)
Diffstat (limited to 'src/engine/SCons')
-rw-r--r--src/engine/SCons/Job.py65
-rw-r--r--src/engine/SCons/JobTests.py141
-rw-r--r--src/engine/SCons/Taskmaster.py57
-rw-r--r--src/engine/SCons/TaskmasterTests.py43
4 files changed, 239 insertions, 67 deletions
diff --git a/src/engine/SCons/Job.py b/src/engine/SCons/Job.py
index bbe0a2d..2d4dbf8 100644
--- a/src/engine/SCons/Job.py
+++ b/src/engine/SCons/Job.py
@@ -179,11 +179,9 @@ else:
def get(self, block = 1):
"""Remove and return a result tuple from the results queue."""
return self.resultsQueue.get(block)
-
- def get_nowait(self):
- """Remove and result a result tuple from the results queue
- without blocking."""
- return self.get(0)
+
+ def preparation_failed(self, obj):
+ self.resultsQueue.put((obj, 0))
class Parallel:
"""This class is used to execute tasks in parallel, and is somewhat
@@ -213,7 +211,6 @@ else:
self.taskmaster = taskmaster
self.tp = ThreadPool(num)
- self.jobs = 0
self.maxjobs = num
def start(self):
@@ -222,8 +219,25 @@ else:
more tasks. If a task fails to execute (i.e. execute() raises
an exception), then the job will stop."""
+ jobs = 0
+
while 1:
- if self.jobs < self.maxjobs:
+
+ # There's a concern here that the while-loop test below
+ # might delay reporting status back about failed build
+ # tasks until the entire build is done if tasks execute
+ # fast enough, or self.maxjobs is big enough. It looks
+ # like that's enough of a corner case that we'll wait to
+ # see if it's an issue in practice. If so, one possible
+ # fix might be:
+ #
+ # while jobs < self.maxjobs and \
+ # self.tp.resultsQueue.empty():
+ #
+ # but that's somewhat unattractive because the
+ # resultsQueue.empty() check might introduce some
+ # significant overhead involving mutex locking.
+ while jobs < self.maxjobs:
task = self.taskmaster.next_task()
if task is None:
break
@@ -234,26 +248,25 @@ else:
except KeyboardInterrupt:
raise
except:
- # Let the failed() callback function arrange for the
- # build to stop if that's appropriate.
- task.failed()
+ # Let the failed() callback function arrange
+ # for the build to stop if that's appropriate.
+ task.exception_set()
+ self.tp.preparation_failed(task)
+ jobs = jobs + 1
+ continue
# dispatch task
self.tp.put(task)
- self.jobs = self.jobs + 1
+ jobs = jobs + 1
- while 1:
- try:
- task, ok = self.tp.get_nowait()
- except Queue.Empty:
- if not (self.jobs is self.maxjobs or self.taskmaster.is_blocked()):
- break
- task, ok = self.tp.get()
-
- self.jobs = self.jobs - 1
- if ok:
- task.executed()
- else:
- task.failed()
-
- task.postprocess()
+ if not task and not jobs: break
+
+ task, ok = self.tp.get()
+
+ jobs = jobs - 1
+ if ok:
+ task.executed()
+ else:
+ task.failed()
+
+ task.postprocess()
diff --git a/src/engine/SCons/JobTests.py b/src/engine/SCons/JobTests.py
index d216464..7dc8658 100644
--- a/src/engine/SCons/JobTests.py
+++ b/src/engine/SCons/JobTests.py
@@ -307,6 +307,145 @@ class ParallelExceptionTestCase(unittest.TestCase):
self.failUnless(taskmaster.num_postprocessed >= 1,
"one or more tasks should have been postprocessed")
+#---------------------------------------------------------------------
+# Above tested Job object with contrived Task and Taskmaster objects.
+# Now test Job object with actual Task and Taskmaster objects.
+
+import SCons.Taskmaster
+import SCons.Node
+import time
+
+
+class testnode (SCons.Node.Node):
+ def __init__(self):
+ SCons.Node.Node.__init__(self)
+ self.expect_to_be = SCons.Node.executed
+
+class goodnode (testnode):
+ pass
+
+class slowgoodnode (goodnode):
+ def prepare(self):
+ # Delay to allow scheduled Jobs to run while the dispatcher
+ # sleeps. Keep this short because it affects the time taken
+ # by this test.
+ time.sleep(0.15)
+ goodnode.prepare(self)
+
+class badnode (goodnode):
+ def __init__(self):
+ goodnode.__init__(self)
+ self.expect_to_be = SCons.Node.failed
+ def build(self, **kw):
+ raise 'badnode exception'
+
+class slowbadnode (badnode):
+ def build(self, **kw):
+ # Appears to take a while to build, allowing faster builds to
+ # overlap. Time duration is not especially important, but if
+ # it is faster than slowgoodnode then these could complete
+ # while the scheduler is sleeping.
+ time.sleep(0.05)
+ raise 'slowbadnode exception'
+
+class badpreparenode (badnode):
+ def prepare(self):
+ raise 'badpreparenode exception'
+
+class _SConsTaskTest(unittest.TestCase):
+
+ def _test_seq(self, num_jobs):
+ for node_seq in [
+ [goodnode],
+ [badnode],
+ [slowbadnode],
+ [slowgoodnode],
+ [badpreparenode],
+ [goodnode, badnode],
+ [slowgoodnode, badnode],
+ [goodnode, slowbadnode],
+ [goodnode, goodnode, goodnode, slowbadnode],
+ [goodnode, slowbadnode, badpreparenode, slowgoodnode],
+ [goodnode, slowbadnode, slowgoodnode, badnode]
+ ]:
+
+ self._do_test(num_jobs, node_seq)
+
+ def _do_test(self, num_jobs, node_seq):
+
+ testnodes = []
+ for tnum in range(num_tasks):
+ testnodes.append(node_seq[tnum % len(node_seq)]())
+
+ taskmaster = SCons.Taskmaster.Taskmaster(testnodes)
+ jobs = SCons.Job.Jobs(num_jobs, taskmaster)
+
+ # Exceptions thrown by tasks are not actually propagated to
+ # this level, but are instead stored in the Taskmaster.
+
+ jobs.run()
+
+ # Now figure out if tests proceeded correctly. The first test
+ # that fails will shutdown the initiation of subsequent tests,
+ # but any tests currently queued for execution will still be
+ # processed, and any tests that completed before the failure
+ # would have resulted in new tests being queued for execution.
+
+ # Apply the following operational heuristics of Job.py:
+ # 0) An initial jobset of tasks will be queued before any
+ # good/bad results are obtained (from "execute" of task in
+ # thread).
+ # 1) A goodnode will complete immediately on its thread and
+ # allow another node to be queued for execution.
+ # 2) A badnode will complete immediately and suppress any
+ # subsequent execution queuing, but all currently queued
+ # tasks will still be processed.
+ # 3) A slowbadnode will fail later. It will block slots in
+ # the job queue. Nodes that complete immediately will
+ # allow other nodes to be queued in their place, and this
+ # will continue until either (#2) above or until all job
+ # slots are filled with slowbadnode entries.
+
+ # One approach to validating this test would be to try to
+ # determine exactly how many nodes executed, how many didn't,
+ # and the results of each, and then to assert failure on any
+ # mismatch (including the total number of built nodes).
+ # However, while this is possible to do for a single-processor
+ # system, it is nearly impossible to predict correctly for a
+ # multi-processor system and still test the characteristics of
+ # delayed execution nodes. Stated another way, multithreading
+ # is inherently non-deterministic unless you can completely
+ # characterize the entire system, and since that's not
+ # possible here, we shouldn't try.
+
+ # Therefore, this test will simply scan the set of nodes to
+ # see if the node was executed or not and if it was executed
+ # that it obtained the expected value for that node
+ # (i.e. verifying we don't get failure crossovers or
+ # mislabelling of results).
+
+ for N in testnodes:
+ self.failUnless(N.get_state() in [None, N.expect_to_be],
+ "node ran but got unexpected result")
+
+ self.failUnless(filter(lambda N: N.get_state(), testnodes),
+ "no nodes ran at all.")
+
+
+class SerialTaskTest(_SConsTaskTest):
+ def runTest(self):
+ "test serial jobs with actual Taskmaster and Task"
+ self._test_seq(1)
+
+
+class ParallelTaskTest(_SConsTaskTest):
+ def runTest(self):
+ "test parallel jobs with actual Taskmaster and Task"
+ self._test_seq(num_jobs)
+
+
+
+#---------------------------------------------------------------------
def suite():
suite = unittest.TestSuite()
@@ -315,6 +454,8 @@ def suite():
suite.addTest(NoParallelTestCase())
suite.addTest(SerialExceptionTestCase())
suite.addTest(ParallelExceptionTestCase())
+ suite.addTest(SerialTaskTest())
+ suite.addTest(ParallelTaskTest())
return suite
if __name__ == "__main__":
diff --git a/src/engine/SCons/Taskmaster.py b/src/engine/SCons/Taskmaster.py
index 8f9839e..dab5026 100644
--- a/src/engine/SCons/Taskmaster.py
+++ b/src/engine/SCons/Taskmaster.py
@@ -58,6 +58,7 @@ class Task:
self.targets = targets
self.top = top
self.node = node
+ self.exc_clear()
def display(self, message):
"""Allow the calling interface to display a message
@@ -73,7 +74,7 @@ class Task:
# Now that it's the appropriate time, give the TaskMaster a
# chance to raise any exceptions it encountered while preparing
# this task.
- self.tm.exception_raise()
+ self.exception_raise()
if self.tm.message:
self.display(self.tm.message)
@@ -209,14 +210,25 @@ class Task:
t.postprocess()
def exc_info(self):
- return self.tm.exception
+ return self.exception
def exc_clear(self):
- self.tm.exception_clear()
+ self.exception = (None, None, None)
+ self.exception_raise = self._no_exception_to_raise
- def exception_set(self):
- self.tm.exception_set()
+ def exception_set(self, exception=None):
+ if not exception:
+ exception = sys.exc_info()
+ self.exception = exception
+ self.exception_raise = self._exception_raise
+ def _no_exception_to_raise(self):
+ pass
+
+ def _exception_raise(self):
+ """Raise a pending exception that was recorded while
+ getting a Task ready for execution."""
+ self.tm.exception_raise(self.exc_info())
def order(dependencies):
@@ -240,7 +252,6 @@ class Taskmaster:
self.tasker = tasker
self.ready = None # the next task that is ready to be executed
self.order = order
- self.exception_clear()
self.message = None
def _find_next_ready_node(self):
@@ -249,6 +260,8 @@ class Taskmaster:
if self.ready:
return
+ self.ready_exc = None
+
while self.candidates:
node = self.candidates[-1]
state = node.get_state()
@@ -266,7 +279,7 @@ class Taskmaster:
except SystemExit:
exc_value = sys.exc_info()[1]
e = SCons.Errors.ExplicitExit(node, exc_value.code)
- self.exception_set((SCons.Errors.ExplicitExit, e))
+ self.ready_exc = (SCons.Errors.ExplicitExit, e)
self.candidates.pop()
self.ready = node
break
@@ -277,7 +290,7 @@ class Taskmaster:
# children (like a child couldn't be linked in to a
# BuildDir, or a Scanner threw something). Arrange to
# raise the exception when the Task is "executed."
- self.exception_set()
+ self.ready_exc = sys.exc_info()
self.candidates.pop()
self.ready = node
break
@@ -313,7 +326,7 @@ class Taskmaster:
# the kids are derived (like a child couldn't be linked
# from a repository). Arrange to raise the exception
# when the Task is "executed."
- self.exception_set()
+ self.ready_exc = sys.exc_info()
self.candidates.pop()
self.ready = node
break
@@ -397,8 +410,13 @@ class Taskmaster:
# a child couldn't be linked in to a BuildDir when deciding
# whether this node is current). Arrange to raise the
# exception when the Task is "executed."
- self.exception_set()
+ self.ready_exc = sys.exc_info()
+
+ if self.ready_exc:
+ task.exception_set(self.ready_exc)
+
self.ready = None
+ self.ready_exc = None
return task
@@ -442,21 +460,6 @@ class Taskmaster:
self.candidates.extend(self.pending)
self.pending = []
- def exception_set(self, exception=None):
- if exception is None:
- exception = sys.exc_info()
- self.exception = exception
- self.exception_raise = self._exception_raise
-
- def exception_clear(self):
- self.exception = (None, None, None)
- self.exception_raise = self._no_exception_to_raise
-
- def _no_exception_to_raise(self):
- pass
-
- def _exception_raise(self):
- """Raise a pending exception that was recorded while
- getting a Task ready for execution."""
- exc_type, exc_value = self.exception[:2]
+ def exception_raise(self, exception):
+ exc_type, exc_value = exception[:2]
raise exc_type, exc_value
diff --git a/src/engine/SCons/TaskmasterTests.py b/src/engine/SCons/TaskmasterTests.py
index ef7f51a..8865e09 100644
--- a/src/engine/SCons/TaskmasterTests.py
+++ b/src/engine/SCons/TaskmasterTests.py
@@ -478,7 +478,7 @@ class TaskmasterTestCase(unittest.TestCase):
n1 = Node("n1")
tm = SCons.Taskmaster.Taskmaster(targets = [n1], tasker = MyTask)
t = tm.next_task()
- exc_type, exc_value, exc_tb = tm.exception
+ exc_type, exc_value, exc_tb = t.exception
assert exc_type == MyException, repr(exc_type)
assert str(exc_value) == "from make_ready()", exc_value
@@ -557,14 +557,14 @@ class TaskmasterTestCase(unittest.TestCase):
n1 = StopNode("n1")
tm = SCons.Taskmaster.Taskmaster([n1])
t = tm.next_task()
- exc_type, exc_value, exc_tb = tm.exception
+ exc_type, exc_value, exc_tb = t.exception
assert exc_type == SCons.Errors.StopError, repr(exc_type)
assert str(exc_value) == "stop!", exc_value
n2 = ExitNode("n2")
tm = SCons.Taskmaster.Taskmaster([n2])
t = tm.next_task()
- exc_type, exc_value = tm.exception
+ exc_type, exc_value = t.exception
assert exc_type == SCons.Errors.ExplicitExit, repr(exc_type)
assert exc_value.node == n2, exc_value.node
assert exc_value.status == 77, exc_value.status
@@ -741,8 +741,8 @@ class TaskmasterTestCase(unittest.TestCase):
built_text = None
n5 = Node("n5")
tm = SCons.Taskmaster.Taskmaster([n5])
- tm.exception_set((MyException, "exception value"))
t = tm.next_task()
+ t.exception_set((MyException, "exception value"))
exc_caught = None
try:
t.prepare()
@@ -880,19 +880,20 @@ class TaskmasterTestCase(unittest.TestCase):
"""
n1 = Node("n1")
tm = SCons.Taskmaster.Taskmaster([n1])
+ t = tm.next_task()
- tm.exception_set((1, 2))
- exc_type, exc_value = tm.exception
+ t.exception_set((1, 2))
+ exc_type, exc_value = t.exception
assert exc_type == 1, exc_type
assert exc_value == 2, exc_value
- tm.exception_set(3)
- assert tm.exception == 3
+ t.exception_set(3)
+ assert t.exception == 3
try: 1/0
except: pass
- tm.exception_set(None)
- exc_type, exc_value, exc_tb = tm.exception
+ t.exception_set(None)
+ exc_type, exc_value, exc_tb = t.exception
assert exc_type is ZeroDivisionError, exc_type
exception_values = [
"integer division or modulo",
@@ -900,9 +901,9 @@ class TaskmasterTestCase(unittest.TestCase):
]
assert str(exc_value) in exception_values, exc_value
- tm.exception_set(("exception 1", None))
+ t.exception_set(("exception 1", None))
try:
- tm.exception_raise()
+ t.exception_raise()
except:
exc_type, exc_value = sys.exc_info()[:2]
assert exc_type == "exception 1", exc_type
@@ -910,9 +911,9 @@ class TaskmasterTestCase(unittest.TestCase):
else:
assert 0, "did not catch expected exception"
- tm.exception_set(("exception 2", "xyzzy"))
+ t.exception_set(("exception 2", "xyzzy"))
try:
- tm.exception_raise()
+ t.exception_raise()
except:
exc_type, exc_value = sys.exc_info()[:2]
assert exc_type == "exception 2", exc_type
@@ -920,6 +921,20 @@ class TaskmasterTestCase(unittest.TestCase):
else:
assert 0, "did not catch expected exception"
+ t.exception_set(("exception 3", "XYZZY"))
+ def fw_exc(exc):
+ raise 'exception_forwarded', exc
+ tm.exception_raise = fw_exc
+ try:
+ t.exception_raise()
+ except:
+ exc_type, exc_value = sys.exc_info()[:2]
+ assert exc_type == 'exception_forwarded', exc_type
+ assert exc_value[0] == "exception 3", exc_value[0]
+ assert exc_value[1] == "XYZZY", exc_value[1]
+ else:
+ assert 0, "did not catch expected exception"
+
def test_postprocess(self):
"""Test postprocessing targets to give them a chance to clean up