diff options
Diffstat (limited to 'src/engine/SCons/Job.py')
-rw-r--r-- | src/engine/SCons/Job.py | 248 |
1 files changed, 105 insertions, 143 deletions
diff --git a/src/engine/SCons/Job.py b/src/engine/SCons/Job.py index 87a1bc2..202f86f 100644 --- a/src/engine/SCons/Job.py +++ b/src/engine/SCons/Job.py @@ -31,43 +31,29 @@ stop, and wait on jobs. __revision__ = "__FILE__ __REVISION__ __DATE__ __DEVELOPER__" -import time - class Jobs: """An instance of this class initializes N jobs, and provides methods for starting, stopping, and waiting on all N jobs. """ - + def __init__(self, num, taskmaster): """ create 'num' jobs using the given taskmaster. If 'num' is 1 or less, then a serial job will be used, - otherwise 'num' parallel jobs will be used. + otherwise a parallel job with 'num' worker threads will + be used. """ - # Keeps track of keyboard interrupts: - self.keyboard_interrupt = 0 - if num > 1: - self.jobs = [] - for i in range(num): - self.jobs.append(Parallel(taskmaster, self)) + self.job = Parallel(taskmaster, num) else: - self.jobs = [Serial(taskmaster, self)] - - self.running = [] + self.job = Serial(taskmaster) def run(self): - """run the jobs, and wait for them to finish""" - + """run the job""" try: - for job in self.jobs: - job.start() - self.running.append(job) - while self.running: - self.running[-1].wait() - self.running.pop() + self.job.start() except KeyboardInterrupt: # mask any further keyboard interrupts so that scons # can shutdown cleanly: @@ -75,19 +61,8 @@ class Jobs: # child processes can still get the keyboard interrupt) import signal signal.signal(signal.SIGINT, signal.SIG_IGN) + raise - for job in self.running: - job.keyboard_interrupt() - else: - self.keyboard_interrupt = 1 - - # wait on any remaining jobs to finish: - for job in self.running: - job.wait() - - if self.keyboard_interrupt: - raise KeyboardInterrupt - class Serial: """This class is used to execute tasks in series, and is more efficient than Parallel, but is only appropriate for non-parallel builds. Only @@ -96,7 +71,7 @@ class Serial: This class is not thread safe. """ - def __init__(self, taskmaster, jobs): + def __init__(self, taskmaster): """Create a new serial job given a taskmaster. The taskmaster's next_task() method should return the next task @@ -107,16 +82,14 @@ class Serial: is_blocked() method will not be called. """ self.taskmaster = taskmaster - self.jobs = jobs def start(self): - """Start the job. This will begin pulling tasks from the taskmaster and executing them, and return when there are no more tasks. If a task fails to execute (i.e. execute() raises an exception), then the job will stop.""" - while not self.jobs.keyboard_interrupt: + while 1: task = self.taskmaster.next_task() if task is None: @@ -134,33 +107,80 @@ class Serial: else: task.executed() - def wait(self): - """Serial jobs are always finished when start() returns, so there - is nothing to do here""" - pass - - def keyboard_interrupt(self): - self.jobs.keyboard_interrupt = 1 +# Trap import failure so that everything in the Job module but the +# Parallel class (and its dependent classes) will work if the interpreter +# doesn't support threads. +try: + import Queue + import threading +except ImportError: + pass + +class Worker(threading.Thread): + """A worker thread waits on a task to be posted to its request queue, + dequeues the task, executes it, and posts a tuple including the task + and a boolean indicating whether the task executed successfully. """ -# The will hold a condition variable once the first parallel task -# is created. -cv = None + def __init__(self, requestQueue, resultsQueue): + threading.Thread.__init__(self) + self.setDaemon(1) + self.requestQueue = requestQueue + self.resultsQueue = resultsQueue + self.start() + + def run(self): + while 1: + task = self.requestQueue.get() + + try: + task.execute() + except: + ok = False + else: + ok = True + + self.resultsQueue.put((task, ok)) + +class ThreadPool: + """This class is responsible for spawning and managing worker threads.""" + + def __init__(self, num): + """Create the request and reply queues, and 'num' worker threads.""" + # Ideally we wouldn't have to artificially limit the number of + # tasks that can be posted to the request queue. But this can + # result in a large number of pending tasks, which at the time + # of this writing causes the taskmaster's next_task method to + # take a very long time. + self.requestQueue = Queue.Queue(num) + self.resultsQueue = Queue.Queue() + + # Create worker threads + for i in range(num): + worker = Worker(self.requestQueue, self.resultsQueue) + + def put(self, obj): + """Put task into request queue.""" + self.requestQueue.put(obj) + + def get(self, block = 1): + """Remove and return a result tuple from the results queue.""" + return self.resultsQueue.get(block) + + def get_nowait(self): + """Remove and result a result tuple from the results queue + without blocking.""" + return self.get(False) class Parallel: - """This class is used to execute tasks in parallel, and is less - efficient than Serial, but is appropriate for parallel builds. Create - an instance of this class for each job or thread you want. + """This class is used to execute tasks in parallel, and is somewhat + less efficient than Serial, but is appropriate for parallel builds. This class is thread safe. """ - - def __init__(self, taskmaster, jobs): - """Create a new parallel job given a taskmaster, and a Jobs instance. - Multiple jobs will be using the taskmaster in parallel, but all - method calls to taskmaster methods are serialized by the jobs - themselves. + def __init__(self, taskmaster, num): + """Create a new parallel job given a taskmaster. The taskmaster's next_task() method should return the next task that needs to be executed, or None if there are no more tasks. The @@ -177,100 +197,42 @@ class Parallel: of parallel jobs: they can execute multiple tasks simultaneously. """ - global cv - - # import threading here so that everything in the Job module - # but the Parallel class will work if the interpreter doesn't - # support threads - import threading - self.taskmaster = taskmaster - self.jobs = jobs - self.thread = threading.Thread(None, self.__run) - - if cv is None: - cv = threading.Condition() + self.tp = ThreadPool(num) def start(self): - """Start the job. This will spawn a thread that will begin pulling - tasks from the task master and executing them. This method returns - immediately and doesn't wait for the jobs to be executed. - - To wait for the job to finish, call wait(). - """ - self.thread.start() - - def wait(self): - """Wait for the job to finish. A job is finished when there - are no more tasks. - - This method should only be called after start() has been called. - """ - - # Sleeping in a loop like this is lame. Calling - # self.thread.join() would be much nicer, but - # on Linux self.thread.join() doesn't always - # return when a KeyboardInterrupt happens, and when - # it does return, it causes Python to hang on shutdown. - # In other words this is just - # a work-around for some bugs/limitations in the - # self.thread.join() method. - while self.thread.isAlive(): - time.sleep(0.5) + """Start the job. This will begin pulling tasks from the + taskmaster and executing them, and return when there are no + more tasks. If a task fails to execute (i.e. execute() raises + an exception), then the job will stop.""" - def keyboard_interrupt(self): - cv.acquire() - self.jobs.keyboard_interrupt = 1 - cv.notifyAll() - cv.release() - - def __run(self): - """private method that actually executes the tasks""" + while 1: + task = self.taskmaster.next_task() + if task is None: + break - cv.acquire() + # prepare task for execution + try: + task.prepare() + except KeyboardInterrupt: + raise + except: + # Let the failed() callback function arrange for the + # build to stop if that's appropriate. + task.failed() - try: + # dispatch task + self.tp.put(task) while 1: - while (self.taskmaster.is_blocked() and - not self.jobs.keyboard_interrupt): - cv.wait(None) - - if self.jobs.keyboard_interrupt: - break - - task = self.taskmaster.next_task() - - if task == None: - break - try: - task.prepare() - cv.release() - try: - task.execute() - finally: - cv.acquire() - except KeyboardInterrupt: - self.jobs.keyboard_interrupt = 1 - except: - # Let the failed() callback function arrange for - # calling self.jobs.stop() to to stop the build - # if that's appropriate. - task.failed() - else: - task.executed() - - # signal the cv whether the task failed or not, - # or otherwise the other Jobs might - # remain blocked: - if (not self.taskmaster.is_blocked() or - self.jobs.keyboard_interrupt): - cv.notifyAll() - - finally: - cv.release() - - - + task, ok = self.tp.get_nowait() + except Queue.Empty: + if not self.taskmaster.is_blocked(): + break + task, ok = self.tp.get() + if ok: + task.executed() + else: + task.failed() |