summaryrefslogtreecommitdiffstats
path: root/src/engine
diff options
context:
space:
mode:
authorSteven Knight <knight@baldmt.com>2003-10-06 21:41:15 (GMT)
committerSteven Knight <knight@baldmt.com>2003-10-06 21:41:15 (GMT)
commitb462ddcce80024755db7fbd667c9fb122dad98c9 (patch)
treec056f407f3f50a2e8e851ea8232921ff3aea1124 /src/engine
parent9afb4a60206be177cb061406f8c427c094027f11 (diff)
downloadSCons-b462ddcce80024755db7fbd667c9fb122dad98c9.zip
SCons-b462ddcce80024755db7fbd667c9fb122dad98c9.tar.gz
SCons-b462ddcce80024755db7fbd667c9fb122dad98c9.tar.bz2
New parallel job execution. (J.T. Conklin)
Diffstat (limited to 'src/engine')
-rw-r--r--src/engine/SCons/Job.py248
-rw-r--r--src/engine/SCons/JobTests.py6
-rw-r--r--src/engine/SCons/Taskmaster.py2
-rw-r--r--src/engine/SCons/TaskmasterTests.py33
4 files changed, 141 insertions, 148 deletions
diff --git a/src/engine/SCons/Job.py b/src/engine/SCons/Job.py
index 87a1bc2..202f86f 100644
--- a/src/engine/SCons/Job.py
+++ b/src/engine/SCons/Job.py
@@ -31,43 +31,29 @@ stop, and wait on jobs.
__revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__"
-import time
-
class Jobs:
"""An instance of this class initializes N jobs, and provides
methods for starting, stopping, and waiting on all N jobs.
"""
-
+
def __init__(self, num, taskmaster):
"""
create 'num' jobs using the given taskmaster.
If 'num' is 1 or less, then a serial job will be used,
- otherwise 'num' parallel jobs will be used.
+ otherwise a parallel job with 'num' worker threads will
+ be used.
"""
- # Keeps track of keyboard interrupts:
- self.keyboard_interrupt = 0
-
if num > 1:
- self.jobs = []
- for i in range(num):
- self.jobs.append(Parallel(taskmaster, self))
+ self.job = Parallel(taskmaster, num)
else:
- self.jobs = [Serial(taskmaster, self)]
-
- self.running = []
+ self.job = Serial(taskmaster)
def run(self):
- """run the jobs, and wait for them to finish"""
-
+ """run the job"""
try:
- for job in self.jobs:
- job.start()
- self.running.append(job)
- while self.running:
- self.running[-1].wait()
- self.running.pop()
+ self.job.start()
except KeyboardInterrupt:
# mask any further keyboard interrupts so that scons
# can shutdown cleanly:
@@ -75,19 +61,8 @@ class Jobs:
# child processes can still get the keyboard interrupt)
import signal
signal.signal(signal.SIGINT, signal.SIG_IGN)
+ raise
- for job in self.running:
- job.keyboard_interrupt()
- else:
- self.keyboard_interrupt = 1
-
- # wait on any remaining jobs to finish:
- for job in self.running:
- job.wait()
-
- if self.keyboard_interrupt:
- raise KeyboardInterrupt
-
class Serial:
"""This class is used to execute tasks in series, and is more efficient
than Parallel, but is only appropriate for non-parallel builds. Only
@@ -96,7 +71,7 @@ class Serial:
This class is not thread safe.
"""
- def __init__(self, taskmaster, jobs):
+ def __init__(self, taskmaster):
"""Create a new serial job given a taskmaster.
The taskmaster's next_task() method should return the next task
@@ -107,16 +82,14 @@ class Serial:
is_blocked() method will not be called. """
self.taskmaster = taskmaster
- self.jobs = jobs
def start(self):
-
"""Start the job. This will begin pulling tasks from the taskmaster
and executing them, and return when there are no more tasks. If a task
fails to execute (i.e. execute() raises an exception), then the job will
stop."""
- while not self.jobs.keyboard_interrupt:
+ while 1:
task = self.taskmaster.next_task()
if task is None:
@@ -134,33 +107,80 @@ class Serial:
else:
task.executed()
- def wait(self):
- """Serial jobs are always finished when start() returns, so there
- is nothing to do here"""
- pass
-
- def keyboard_interrupt(self):
- self.jobs.keyboard_interrupt = 1
+# Trap import failure so that everything in the Job module but the
+# Parallel class (and its dependent classes) will work if the interpreter
+# doesn't support threads.
+try:
+ import Queue
+ import threading
+except ImportError:
+ pass
+
+class Worker(threading.Thread):
+ """A worker thread waits on a task to be posted to its request queue,
+ dequeues the task, executes it, and posts a tuple including the task
+ and a boolean indicating whether the task executed successfully. """
-# The will hold a condition variable once the first parallel task
-# is created.
-cv = None
+ def __init__(self, requestQueue, resultsQueue):
+ threading.Thread.__init__(self)
+ self.setDaemon(1)
+ self.requestQueue = requestQueue
+ self.resultsQueue = resultsQueue
+ self.start()
+
+ def run(self):
+ while 1:
+ task = self.requestQueue.get()
+
+ try:
+ task.execute()
+ except:
+ ok = False
+ else:
+ ok = True
+
+ self.resultsQueue.put((task, ok))
+
+class ThreadPool:
+ """This class is responsible for spawning and managing worker threads."""
+
+ def __init__(self, num):
+ """Create the request and reply queues, and 'num' worker threads."""
+ # Ideally we wouldn't have to artificially limit the number of
+ # tasks that can be posted to the request queue. But this can
+ # result in a large number of pending tasks, which at the time
+ # of this writing causes the taskmaster's next_task method to
+ # take a very long time.
+ self.requestQueue = Queue.Queue(num)
+ self.resultsQueue = Queue.Queue()
+
+ # Create worker threads
+ for i in range(num):
+ worker = Worker(self.requestQueue, self.resultsQueue)
+
+ def put(self, obj):
+ """Put task into request queue."""
+ self.requestQueue.put(obj)
+
+ def get(self, block = 1):
+ """Remove and return a result tuple from the results queue."""
+ return self.resultsQueue.get(block)
+
+ def get_nowait(self):
+ """Remove and result a result tuple from the results queue
+ without blocking."""
+ return self.get(False)
class Parallel:
- """This class is used to execute tasks in parallel, and is less
- efficient than Serial, but is appropriate for parallel builds. Create
- an instance of this class for each job or thread you want.
+ """This class is used to execute tasks in parallel, and is somewhat
+ less efficient than Serial, but is appropriate for parallel builds.
This class is thread safe.
"""
-
- def __init__(self, taskmaster, jobs):
- """Create a new parallel job given a taskmaster, and a Jobs instance.
- Multiple jobs will be using the taskmaster in parallel, but all
- method calls to taskmaster methods are serialized by the jobs
- themselves.
+ def __init__(self, taskmaster, num):
+ """Create a new parallel job given a taskmaster.
The taskmaster's next_task() method should return the next task
that needs to be executed, or None if there are no more tasks. The
@@ -177,100 +197,42 @@ class Parallel:
of parallel jobs: they can execute multiple tasks
simultaneously. """
- global cv
-
- # import threading here so that everything in the Job module
- # but the Parallel class will work if the interpreter doesn't
- # support threads
- import threading
-
self.taskmaster = taskmaster
- self.jobs = jobs
- self.thread = threading.Thread(None, self.__run)
-
- if cv is None:
- cv = threading.Condition()
+ self.tp = ThreadPool(num)
def start(self):
- """Start the job. This will spawn a thread that will begin pulling
- tasks from the task master and executing them. This method returns
- immediately and doesn't wait for the jobs to be executed.
-
- To wait for the job to finish, call wait().
- """
- self.thread.start()
-
- def wait(self):
- """Wait for the job to finish. A job is finished when there
- are no more tasks.
-
- This method should only be called after start() has been called.
- """
-
- # Sleeping in a loop like this is lame. Calling
- # self.thread.join() would be much nicer, but
- # on Linux self.thread.join() doesn't always
- # return when a KeyboardInterrupt happens, and when
- # it does return, it causes Python to hang on shutdown.
- # In other words this is just
- # a work-around for some bugs/limitations in the
- # self.thread.join() method.
- while self.thread.isAlive():
- time.sleep(0.5)
+ """Start the job. This will begin pulling tasks from the
+ taskmaster and executing them, and return when there are no
+ more tasks. If a task fails to execute (i.e. execute() raises
+ an exception), then the job will stop."""
- def keyboard_interrupt(self):
- cv.acquire()
- self.jobs.keyboard_interrupt = 1
- cv.notifyAll()
- cv.release()
-
- def __run(self):
- """private method that actually executes the tasks"""
+ while 1:
+ task = self.taskmaster.next_task()
+ if task is None:
+ break
- cv.acquire()
+ # prepare task for execution
+ try:
+ task.prepare()
+ except KeyboardInterrupt:
+ raise
+ except:
+ # Let the failed() callback function arrange for the
+ # build to stop if that's appropriate.
+ task.failed()
- try:
+ # dispatch task
+ self.tp.put(task)
while 1:
- while (self.taskmaster.is_blocked() and
- not self.jobs.keyboard_interrupt):
- cv.wait(None)
-
- if self.jobs.keyboard_interrupt:
- break
-
- task = self.taskmaster.next_task()
-
- if task == None:
- break
-
try:
- task.prepare()
- cv.release()
- try:
- task.execute()
- finally:
- cv.acquire()
- except KeyboardInterrupt:
- self.jobs.keyboard_interrupt = 1
- except:
- # Let the failed() callback function arrange for
- # calling self.jobs.stop() to to stop the build
- # if that's appropriate.
- task.failed()
- else:
- task.executed()
-
- # signal the cv whether the task failed or not,
- # or otherwise the other Jobs might
- # remain blocked:
- if (not self.taskmaster.is_blocked() or
- self.jobs.keyboard_interrupt):
- cv.notifyAll()
-
- finally:
- cv.release()
-
-
-
+ task, ok = self.tp.get_nowait()
+ except Queue.Empty:
+ if not self.taskmaster.is_blocked():
+ break
+ task, ok = self.tp.get()
+ if ok:
+ task.executed()
+ else:
+ task.failed()
diff --git a/src/engine/SCons/JobTests.py b/src/engine/SCons/JobTests.py
index 58b98cf..2c84028 100644
--- a/src/engine/SCons/JobTests.py
+++ b/src/engine/SCons/JobTests.py
@@ -28,6 +28,7 @@ import random
import math
import SCons.Job
import sys
+import time
# a large number
num_sines = 10000
@@ -75,6 +76,7 @@ class Task:
# do something that will take some random amount of time:
for i in range(random.randrange(0, num_sines, 1)):
x = math.sin(i)
+ time.sleep(0.01)
self.was_executed = 1
@@ -169,6 +171,10 @@ class Taskmaster:
return self.num_iterated == self.num_tasks
def is_blocked(self):
+ if self.stop or self.all_tasks_are_executed():
+ return False
+ if self.all_tasks_are_iterated():
+ return True
# simulate blocking tasks
return self.num_iterated - self.num_executed >= max(num_jobs/2, 2)
diff --git a/src/engine/SCons/Taskmaster.py b/src/engine/SCons/Taskmaster.py
index 9b13b60..7760cfe 100644
--- a/src/engine/SCons/Taskmaster.py
+++ b/src/engine/SCons/Taskmaster.py
@@ -358,7 +358,7 @@ class Taskmaster:
def is_blocked(self):
self._find_next_ready_node()
- return not self.ready and self.pending
+ return not self.ready and (self.pending or self.executing)
def stop(self):
"""Stop the current build completely."""
diff --git a/src/engine/SCons/TaskmasterTests.py b/src/engine/SCons/TaskmasterTests.py
index 4dbf8b3..a394151 100644
--- a/src/engine/SCons/TaskmasterTests.py
+++ b/src/engine/SCons/TaskmasterTests.py
@@ -258,6 +258,8 @@ class TaskmasterTestCase(unittest.TestCase):
assert not tm.is_blocked()
t5 = tm.next_task()
assert t5.get_target() == n5, t5.get_target()
+ assert tm.is_blocked() # still executing t5
+ t5.executed()
assert not tm.is_blocked()
assert tm.next_task() == None
@@ -355,9 +357,10 @@ class TaskmasterTestCase(unittest.TestCase):
t.executed()
t = tm.next_task()
assert t.get_target() == n5
- assert not tm.is_blocked()
+ assert tm.is_blocked() # still executing n5
assert not tm.next_task()
t.executed()
+ assert not tm.is_blocked()
n1 = Node("n1")
n2 = Node("n2")
@@ -464,10 +467,32 @@ class TaskmasterTestCase(unittest.TestCase):
assert not tm.is_blocked()
class MyTM(SCons.Taskmaster.Taskmaster):
- def is_blocked(self):
- return 1
+ def _find_next_ready_node(self):
+ self.ready = 1
+ tm = MyTM()
+ assert not tm.is_blocked()
+
+ class MyTM(SCons.Taskmaster.Taskmaster):
+ def _find_next_ready_node(self):
+ self.ready = None
+ self.pending = []
+ self.executing = []
+ tm = MyTM()
+ assert not tm.is_blocked()
+
+ class MyTM(SCons.Taskmaster.Taskmaster):
+ def _find_next_ready_node(self):
+ self.ready = None
+ self.pending = [1]
+ tm = MyTM()
+ assert tm.is_blocked()
+
+ class MyTM(SCons.Taskmaster.Taskmaster):
+ def _find_next_ready_node(self):
+ self.ready = None
+ self.executing = [1]
tm = MyTM()
- assert tm.is_blocked() == 1
+ assert tm.is_blocked()
def test_stop(self):
"""Test the stop() method