summary | refs | log | tree | commit | diff | stats
path: root/src/engine/SCons/Job.py
diff options
context:
space:
mode:
authorSteven Knight <knight@baldmt.com>2003-10-22 03:15:44 (GMT)
committerSteven Knight <knight@baldmt.com>2003-10-22 03:15:44 (GMT)
commit8ea748f67747b589b407db59acbe15d62962ba33 (patch)
tree0177b25e280c6371d87d57211d667c9952a7440d /src/engine/SCons/Job.py
parent5711795d6f0f4dffbcfabc0d823024ca44313b27 (diff)
downloadSCons-8ea748f67747b589b407db59acbe15d62962ba33.zip
SCons-8ea748f67747b589b407db59acbe15d62962ba33.tar.gz
SCons-8ea748f67747b589b407db59acbe15d62962ba33.tar.bz2
Really handle lack of the threading.py module when run by non-threaded Pythons.
Diffstat (limited to 'src/engine/SCons/Job.py')
-rw-r--r--src/engine/SCons/Job.py225
1 files changed, 118 insertions, 107 deletions
diff --git a/src/engine/SCons/Job.py b/src/engine/SCons/Job.py
index d26b73c..ee84e34 100644
--- a/src/engine/SCons/Job.py
+++ b/src/engine/SCons/Job.py
@@ -43,12 +43,23 @@ class Jobs:
If 'num' is 1 or less, then a serial job will be used,
otherwise a parallel job with 'num' worker threads will
be used.
+
+ The 'num_jobs' attribute will be set to the actual number of jobs
+ allocated. If more than one job is requested but the Parallel
+ class can't do it, it gets reset to 1. Wrapping interfaces that
+ care should check the value of 'num_jobs' after initialization.
"""
+ self.job = None
if num > 1:
- self.job = Parallel(taskmaster, num)
- else:
+ try:
+ self.job = Parallel(taskmaster, num)
+ self.num_jobs = num
+ except NameError:
+ pass
+ if self.job is None:
self.job = Serial(taskmaster)
+ self.num_jobs = 1
def run(self):
"""run the job"""
@@ -116,127 +127,127 @@ try:
import threading
except ImportError:
pass
+else:
+ class Worker(threading.Thread):
+ """A worker thread waits on a task to be posted to its request queue,
+ dequeues the task, executes it, and posts a tuple including the task
+ and a boolean indicating whether the task executed successfully. """
+
+ def __init__(self, requestQueue, resultsQueue):
+ threading.Thread.__init__(self)
+ self.setDaemon(1)
+ self.requestQueue = requestQueue
+ self.resultsQueue = resultsQueue
+ self.start()
+
+ def run(self):
+ while 1:
+ task = self.requestQueue.get()
-class Worker(threading.Thread):
- """A worker thread waits on a task to be posted to its request queue,
- dequeues the task, executes it, and posts a tuple including the task
- and a boolean indicating whether the task executed successfully. """
-
- def __init__(self, requestQueue, resultsQueue):
- threading.Thread.__init__(self)
- self.setDaemon(1)
- self.requestQueue = requestQueue
- self.resultsQueue = resultsQueue
- self.start()
-
- def run(self):
- while 1:
- task = self.requestQueue.get()
-
- try:
- task.execute()
- except KeyboardInterrupt:
- # be explicit here for test/interrupts.py
- ok = False
- except:
- ok = 0
- else:
- ok = 1
-
- self.resultsQueue.put((task, ok))
-
-class ThreadPool:
- """This class is responsible for spawning and managing worker threads."""
+ try:
+ task.execute()
+ except KeyboardInterrupt:
+ # be explicit here for test/interrupts.py
+ ok = False
+ except:
+ ok = 0
+ else:
+ ok = 1
- def __init__(self, num):
- """Create the request and reply queues, and 'num' worker threads."""
- self.requestQueue = Queue.Queue(0)
- self.resultsQueue = Queue.Queue(0)
+ self.resultsQueue.put((task, ok))
- # Create worker threads
- for i in range(num):
- worker = Worker(self.requestQueue, self.resultsQueue)
+ class ThreadPool:
+ """This class is responsible for spawning and managing worker threads."""
- def put(self, obj):
- """Put task into request queue."""
- self.requestQueue.put(obj)
+ def __init__(self, num):
+ """Create the request and reply queues, and 'num' worker threads."""
+ self.requestQueue = Queue.Queue(0)
+ self.resultsQueue = Queue.Queue(0)
- def get(self, block = 1):
- """Remove and return a result tuple from the results queue."""
- return self.resultsQueue.get(block)
-
- def get_nowait(self):
- """Remove and result a result tuple from the results queue
- without blocking."""
- return self.get(0)
+ # Create worker threads
+ for i in range(num):
+ worker = Worker(self.requestQueue, self.resultsQueue)
-class Parallel:
- """This class is used to execute tasks in parallel, and is somewhat
- less efficient than Serial, but is appropriate for parallel builds.
+ def put(self, obj):
+ """Put task into request queue."""
+ self.requestQueue.put(obj)
- This class is thread safe.
- """
+ def get(self, block = 1):
+ """Remove and return a result tuple from the results queue."""
+ return self.resultsQueue.get(block)
+
+ def get_nowait(self):
+ """Remove and result a result tuple from the results queue
+ without blocking."""
+ return self.get(0)
- def __init__(self, taskmaster, num):
- """Create a new parallel job given a taskmaster.
+ class Parallel:
+ """This class is used to execute tasks in parallel, and is somewhat
+ less efficient than Serial, but is appropriate for parallel builds.
- The taskmaster's next_task() method should return the next task
- that needs to be executed, or None if there are no more tasks. The
- taskmaster's executed() method will be called for each task when it
- is successfully executed or failed() will be called if the task
- failed to execute (i.e. execute() raised an exception). The
- taskmaster's is_blocked() method should return true iff there are
- more tasks, but they can't be executed until one or more other
- tasks have been executed. next_task() will be called iff
- is_blocked() returned false.
-
- Note: calls to taskmaster are serialized, but calls to execute() on
- distinct tasks are not serialized, because that is the whole point
- of parallel jobs: they can execute multiple tasks
- simultaneously. """
+ This class is thread safe.
+ """
- self.taskmaster = taskmaster
- self.tp = ThreadPool(num)
+ def __init__(self, taskmaster, num):
+ """Create a new parallel job given a taskmaster.
- self.jobs = 0
- self.maxjobs = num
+ The taskmaster's next_task() method should return the next task
+ that needs to be executed, or None if there are no more tasks. The
+ taskmaster's executed() method will be called for each task when it
+ is successfully executed or failed() will be called if the task
+ failed to execute (i.e. execute() raised an exception). The
+ taskmaster's is_blocked() method should return true iff there are
+ more tasks, but they can't be executed until one or more other
+ tasks have been executed. next_task() will be called iff
+ is_blocked() returned false.
- def start(self):
- """Start the job. This will begin pulling tasks from the
- taskmaster and executing them, and return when there are no
- more tasks. If a task fails to execute (i.e. execute() raises
- an exception), then the job will stop."""
+ Note: calls to taskmaster are serialized, but calls to execute() on
+ distinct tasks are not serialized, because that is the whole point
+ of parallel jobs: they can execute multiple tasks
+ simultaneously. """
- while 1:
- if self.jobs < self.maxjobs:
- task = self.taskmaster.next_task()
- if task is None:
- break
+ self.taskmaster = taskmaster
+ self.tp = ThreadPool(num)
- # prepare task for execution
- try:
- task.prepare()
- except KeyboardInterrupt:
- raise
- except:
- # Let the failed() callback function arrange for the
- # build to stop if that's appropriate.
- task.failed()
+ self.jobs = 0
+ self.maxjobs = num
- # dispatch task
- self.tp.put(task)
- self.jobs = self.jobs + 1
+ def start(self):
+ """Start the job. This will begin pulling tasks from the
+ taskmaster and executing them, and return when there are no
+ more tasks. If a task fails to execute (i.e. execute() raises
+ an exception), then the job will stop."""
while 1:
- try:
- task, ok = self.tp.get_nowait()
- except Queue.Empty:
- if not (self.jobs is self.maxjobs or self.taskmaster.is_blocked()):
+ if self.jobs < self.maxjobs:
+ task = self.taskmaster.next_task()
+ if task is None:
break
- task, ok = self.tp.get()
- self.jobs = self.jobs - 1
- if ok:
- task.executed()
- else:
- task.failed()
+ # prepare task for execution
+ try:
+ task.prepare()
+ except KeyboardInterrupt:
+ raise
+ except:
+ # Let the failed() callback function arrange for the
+ # build to stop if that's appropriate.
+ task.failed()
+
+ # dispatch task
+ self.tp.put(task)
+ self.jobs = self.jobs + 1
+
+ while 1:
+ try:
+ task, ok = self.tp.get_nowait()
+ except Queue.Empty:
+ if not (self.jobs is self.maxjobs or self.taskmaster.is_blocked()):
+ break
+ task, ok = self.tp.get()
+
+ self.jobs = self.jobs - 1
+ if ok:
+ task.executed()
+ else:
+ task.failed()