diff options
author | Steven Knight <knight@baldmt.com> | 2003-10-22 03:15:44 (GMT) |
---|---|---|
committer | Steven Knight <knight@baldmt.com> | 2003-10-22 03:15:44 (GMT) |
commit | 8ea748f67747b589b407db59acbe15d62962ba33 (patch) | |
tree | 0177b25e280c6371d87d57211d667c9952a7440d /src/engine/SCons/Job.py | |
parent | 5711795d6f0f4dffbcfabc0d823024ca44313b27 (diff) | |
download | SCons-8ea748f67747b589b407db59acbe15d62962ba33.zip SCons-8ea748f67747b589b407db59acbe15d62962ba33.tar.gz SCons-8ea748f67747b589b407db59acbe15d62962ba33.tar.bz2 |
Really handle lack of the threading.py module when run by non-threaded Pythons.
Diffstat (limited to 'src/engine/SCons/Job.py')
-rw-r--r-- | src/engine/SCons/Job.py | 225 |
1 files changed, 118 insertions, 107 deletions
diff --git a/src/engine/SCons/Job.py b/src/engine/SCons/Job.py index d26b73c..ee84e34 100644 --- a/src/engine/SCons/Job.py +++ b/src/engine/SCons/Job.py @@ -43,12 +43,23 @@ class Jobs: If 'num' is 1 or less, then a serial job will be used, otherwise a parallel job with 'num' worker threads will be used. + + The 'num_jobs' attribute will be set to the actual number of jobs + allocated. If more than one job is requested but the Parallel + class can't do it, it gets reset to 1. Wrapping interfaces that + care should check the value of 'num_jobs' after initialization. """ + self.job = None if num > 1: - self.job = Parallel(taskmaster, num) - else: + try: + self.job = Parallel(taskmaster, num) + self.num_jobs = num + except NameError: + pass + if self.job is None: self.job = Serial(taskmaster) + self.num_jobs = 1 def run(self): """run the job""" @@ -116,127 +127,127 @@ try: import threading except ImportError: pass +else: + class Worker(threading.Thread): + """A worker thread waits on a task to be posted to its request queue, + dequeues the task, executes it, and posts a tuple including the task + and a boolean indicating whether the task executed successfully. """ + + def __init__(self, requestQueue, resultsQueue): + threading.Thread.__init__(self) + self.setDaemon(1) + self.requestQueue = requestQueue + self.resultsQueue = resultsQueue + self.start() + + def run(self): + while 1: + task = self.requestQueue.get() -class Worker(threading.Thread): - """A worker thread waits on a task to be posted to its request queue, - dequeues the task, executes it, and posts a tuple including the task - and a boolean indicating whether the task executed successfully. """ - - def __init__(self, requestQueue, resultsQueue): - threading.Thread.__init__(self) - self.setDaemon(1) - self.requestQueue = requestQueue - self.resultsQueue = resultsQueue - self.start() - - def run(self): - while 1: - task = self.requestQueue.get() - - try: - task.execute() - except KeyboardInterrupt: - # be explicit here for test/interrupts.py - ok = False - except: - ok = 0 - else: - ok = 1 - - self.resultsQueue.put((task, ok)) - -class ThreadPool: - """This class is responsible for spawning and managing worker threads.""" + try: + task.execute() + except KeyboardInterrupt: + # be explicit here for test/interrupts.py + ok = False + except: + ok = 0 + else: + ok = 1 - def __init__(self, num): - """Create the request and reply queues, and 'num' worker threads.""" - self.requestQueue = Queue.Queue(0) - self.resultsQueue = Queue.Queue(0) + self.resultsQueue.put((task, ok)) - # Create worker threads - for i in range(num): - worker = Worker(self.requestQueue, self.resultsQueue) + class ThreadPool: + """This class is responsible for spawning and managing worker threads.""" - def put(self, obj): - """Put task into request queue.""" - self.requestQueue.put(obj) + def __init__(self, num): + """Create the request and reply queues, and 'num' worker threads.""" + self.requestQueue = Queue.Queue(0) + self.resultsQueue = Queue.Queue(0) - def get(self, block = 1): - """Remove and return a result tuple from the results queue.""" - return self.resultsQueue.get(block) - - def get_nowait(self): - """Remove and result a result tuple from the results queue - without blocking.""" - return self.get(0) + # Create worker threads + for i in range(num): + worker = Worker(self.requestQueue, self.resultsQueue) -class Parallel: - """This class is used to execute tasks in parallel, and is somewhat - less efficient than Serial, but is appropriate for parallel builds. + def put(self, obj): + """Put task into request queue.""" + self.requestQueue.put(obj) - This class is thread safe. - """ + def get(self, block = 1): + """Remove and return a result tuple from the results queue.""" + return self.resultsQueue.get(block) + + def get_nowait(self): + """Remove and result a result tuple from the results queue + without blocking.""" + return self.get(0) - def __init__(self, taskmaster, num): - """Create a new parallel job given a taskmaster. + class Parallel: + """This class is used to execute tasks in parallel, and is somewhat + less efficient than Serial, but is appropriate for parallel builds. - The taskmaster's next_task() method should return the next task - that needs to be executed, or None if there are no more tasks. The - taskmaster's executed() method will be called for each task when it - is successfully executed or failed() will be called if the task - failed to execute (i.e. execute() raised an exception). The - taskmaster's is_blocked() method should return true iff there are - more tasks, but they can't be executed until one or more other - tasks have been executed. next_task() will be called iff - is_blocked() returned false. - - Note: calls to taskmaster are serialized, but calls to execute() on - distinct tasks are not serialized, because that is the whole point - of parallel jobs: they can execute multiple tasks - simultaneously. """ + This class is thread safe. + """ - self.taskmaster = taskmaster - self.tp = ThreadPool(num) + def __init__(self, taskmaster, num): + """Create a new parallel job given a taskmaster. - self.jobs = 0 - self.maxjobs = num + The taskmaster's next_task() method should return the next task + that needs to be executed, or None if there are no more tasks. The + taskmaster's executed() method will be called for each task when it + is successfully executed or failed() will be called if the task + failed to execute (i.e. execute() raised an exception). The + taskmaster's is_blocked() method should return true iff there are + more tasks, but they can't be executed until one or more other + tasks have been executed. next_task() will be called iff + is_blocked() returned false. - def start(self): - """Start the job. This will begin pulling tasks from the - taskmaster and executing them, and return when there are no - more tasks. If a task fails to execute (i.e. execute() raises - an exception), then the job will stop.""" + Note: calls to taskmaster are serialized, but calls to execute() on + distinct tasks are not serialized, because that is the whole point + of parallel jobs: they can execute multiple tasks + simultaneously. """ - while 1: - if self.jobs < self.maxjobs: - task = self.taskmaster.next_task() - if task is None: - break + self.taskmaster = taskmaster + self.tp = ThreadPool(num) - # prepare task for execution - try: - task.prepare() - except KeyboardInterrupt: - raise - except: - # Let the failed() callback function arrange for the - # build to stop if that's appropriate. - task.failed() + self.jobs = 0 + self.maxjobs = num - # dispatch task - self.tp.put(task) - self.jobs = self.jobs + 1 + def start(self): + """Start the job. This will begin pulling tasks from the + taskmaster and executing them, and return when there are no + more tasks. If a task fails to execute (i.e. execute() raises + an exception), then the job will stop.""" while 1: - try: - task, ok = self.tp.get_nowait() - except Queue.Empty: - if not (self.jobs is self.maxjobs or self.taskmaster.is_blocked()): + if self.jobs < self.maxjobs: + task = self.taskmaster.next_task() + if task is None: break - task, ok = self.tp.get() - self.jobs = self.jobs - 1 - if ok: - task.executed() - else: - task.failed() + # prepare task for execution + try: + task.prepare() + except KeyboardInterrupt: + raise + except: + # Let the failed() callback function arrange for the + # build to stop if that's appropriate. + task.failed() + + # dispatch task + self.tp.put(task) + self.jobs = self.jobs + 1 + + while 1: + try: + task, ok = self.tp.get_nowait() + except Queue.Empty: + if not (self.jobs is self.maxjobs or self.taskmaster.is_blocked()): + break + task, ok = self.tp.get() + + self.jobs = self.jobs - 1 + if ok: + task.executed() + else: + task.failed() |