diff options
author | Andrew Morrow <andrew.morrow@viam.com> | 2023-12-14 15:49:22 (GMT) |
---|---|---|
committer | Andrew Morrow <andrew.morrow@viam.com> | 2024-01-19 18:46:34 (GMT) |
commit | 8c8052c95d4645a584b4332e11aabe0843446678 (patch) | |
tree | 0e43797d83d55c9088ddab18d77eb16320e17fa4 /SCons/Taskmaster | |
parent | c452f92126499dd213ac1593791212cc73ecb50a (diff) | |
download | SCons-8c8052c95d4645a584b4332e11aabe0843446678.zip SCons-8c8052c95d4645a584b4332e11aabe0843446678.tar.gz SCons-8c8052c95d4645a584b4332e11aabe0843446678.tar.bz2 |
threading is always available
Diffstat (limited to 'SCons/Taskmaster')
-rw-r--r-- | SCons/Taskmaster/Job.py | 909 |
1 files changed, 449 insertions, 460 deletions
diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index 572464b..b73c941 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -31,6 +31,7 @@ import SCons.compat import logging import os +import queue import signal import sys import threading @@ -95,16 +96,13 @@ class Jobs: if stack_size is None: stack_size = default_stack_size - try: - experimental_option = GetOption('experimental') - if 'tm_v2' in experimental_option: - self.job = NewParallel(taskmaster, num, stack_size) - else: - self.job = LegacyParallel(taskmaster, num, stack_size) + experimental_option = GetOption('experimental') + if 'tm_v2' in experimental_option: + self.job = NewParallel(taskmaster, num, stack_size) + else: + self.job = LegacyParallel(taskmaster, num, stack_size) - self.num_jobs = num - except NameError: - pass + self.num_jobs = num if self.job is None: self.job = Serial(taskmaster) self.num_jobs = 1 @@ -239,505 +237,496 @@ class Serial: self.taskmaster.cleanup() -# Trap import failure so that everything in the Job module but the -# Parallel class (and its dependent classes) will work if the interpreter -# doesn't support threads. -try: - import queue - import threading -except ImportError: - pass -else: - class Worker(threading.Thread): - """A worker thread waits on a task to be posted to its request queue, - dequeues the task, executes it, and posts a tuple including the task - and a boolean indicating whether the task executed successfully. """ +class Worker(threading.Thread): + """A worker thread waits on a task to be posted to its request queue, + dequeues the task, executes it, and posts a tuple including the task + and a boolean indicating whether the task executed successfully. """ - def __init__(self, requestQueue, resultsQueue, interrupted) -> None: - super().__init__() - self.daemon = True - self.requestQueue = requestQueue - self.resultsQueue = resultsQueue - self.interrupted = interrupted - self.start() + def __init__(self, requestQueue, resultsQueue, interrupted) -> None: + super().__init__() + self.daemon = True + self.requestQueue = requestQueue + self.resultsQueue = resultsQueue + self.interrupted = interrupted + self.start() - def run(self): - while True: - task = self.requestQueue.get() + def run(self): + while True: + task = self.requestQueue.get() - if task is None: - # The "None" value is used as a sentinel by - # ThreadPool.cleanup(). This indicates that there - # are no more tasks, so we should quit. - break + if task is None: + # The "None" value is used as a sentinel by + # ThreadPool.cleanup(). This indicates that there + # are no more tasks, so we should quit. + break - try: - if self.interrupted(): - raise SCons.Errors.BuildError( - task.targets[0], errstr=interrupt_msg) - task.execute() - except Exception: - task.exception_set() - ok = False - else: - ok = True + try: + if self.interrupted(): + raise SCons.Errors.BuildError( + task.targets[0], errstr=interrupt_msg) + task.execute() + except Exception: + task.exception_set() + ok = False + else: + ok = True - self.resultsQueue.put((task, ok)) + self.resultsQueue.put((task, ok)) - class ThreadPool: - """This class is responsible for spawning and managing worker threads.""" +class ThreadPool: + """This class is responsible for spawning and managing worker threads.""" - def __init__(self, num, stack_size, interrupted) -> None: - """Create the request and reply queues, and 'num' worker threads. + def __init__(self, num, stack_size, interrupted) -> None: + """Create the request and reply queues, and 'num' worker threads. - One must specify the stack size of the worker threads. The - stack size is specified in kilobytes. - """ - self.requestQueue = queue.Queue(0) - self.resultsQueue = queue.Queue(0) + One must specify the stack size of the worker threads. The + stack size is specified in kilobytes. + """ + self.requestQueue = queue.Queue(0) + self.resultsQueue = queue.Queue(0) - try: - prev_size = threading.stack_size(stack_size * 1024) - except AttributeError as e: - # Only print a warning if the stack size has been - # explicitly set. - if explicit_stack_size is not None: - msg = "Setting stack size is unsupported by this version of Python:\n " + \ - e.args[0] - SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) - except ValueError as e: - msg = "Setting stack size failed:\n " + str(e) + try: + prev_size = threading.stack_size(stack_size * 1024) + except AttributeError as e: + # Only print a warning if the stack size has been + # explicitly set. + if explicit_stack_size is not None: + msg = "Setting stack size is unsupported by this version of Python:\n " + \ + e.args[0] SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) + except ValueError as e: + msg = "Setting stack size failed:\n " + str(e) + SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) + + # Create worker threads + self.workers = [] + for _ in range(num): + worker = Worker(self.requestQueue, self.resultsQueue, interrupted) + self.workers.append(worker) + + if 'prev_size' in locals(): + threading.stack_size(prev_size) + + def put(self, task) -> None: + """Put task into request queue.""" + self.requestQueue.put(task) + + def get(self): + """Remove and return a result tuple from the results queue.""" + return self.resultsQueue.get() - # Create worker threads - self.workers = [] - for _ in range(num): - worker = Worker(self.requestQueue, self.resultsQueue, interrupted) - self.workers.append(worker) - - if 'prev_size' in locals(): - threading.stack_size(prev_size) - - def put(self, task) -> None: - """Put task into request queue.""" - self.requestQueue.put(task) - - def get(self): - """Remove and return a result tuple from the results queue.""" - return self.resultsQueue.get() - - def preparation_failed(self, task) -> None: - self.resultsQueue.put((task, False)) - - def cleanup(self) -> None: - """ - Shuts down the thread pool, giving each worker thread a - chance to shut down gracefully. - """ - # For each worker thread, put a sentinel "None" value - # on the requestQueue (indicating that there's no work - # to be done) so that each worker thread will get one and - # terminate gracefully. - for _ in self.workers: - self.requestQueue.put(None) - - # Wait for all of the workers to terminate. - # - # If we don't do this, later Python versions (2.4, 2.5) often - # seem to raise exceptions during shutdown. This happens - # in requestQueue.get(), as an assertion failure that - # requestQueue.not_full is notified while not acquired, - # seemingly because the main thread has shut down (or is - # in the process of doing so) while the workers are still - # trying to pull sentinels off the requestQueue. - # - # Normally these terminations should happen fairly quickly, - # but we'll stick a one-second timeout on here just in case - # someone gets hung. - for worker in self.workers: - worker.join(1.0) - self.workers = [] - - class LegacyParallel: - """This class is used to execute tasks in parallel, and is somewhat - less efficient than Serial, but is appropriate for parallel builds. - - This class is thread safe. + def preparation_failed(self, task) -> None: + self.resultsQueue.put((task, False)) + + def cleanup(self) -> None: + """ + Shuts down the thread pool, giving each worker thread a + chance to shut down gracefully. """ + # For each worker thread, put a sentinel "None" value + # on the requestQueue (indicating that there's no work + # to be done) so that each worker thread will get one and + # terminate gracefully. + for _ in self.workers: + self.requestQueue.put(None) + + # Wait for all of the workers to terminate. + # + # If we don't do this, later Python versions (2.4, 2.5) often + # seem to raise exceptions during shutdown. This happens + # in requestQueue.get(), as an assertion failure that + # requestQueue.not_full is notified while not acquired, + # seemingly because the main thread has shut down (or is + # in the process of doing so) while the workers are still + # trying to pull sentinels off the requestQueue. + # + # Normally these terminations should happen fairly quickly, + # but we'll stick a one-second timeout on here just in case + # someone gets hung. + for worker in self.workers: + worker.join(1.0) + self.workers = [] + +class LegacyParallel: + """This class is used to execute tasks in parallel, and is somewhat + less efficient than Serial, but is appropriate for parallel builds. + + This class is thread safe. + """ - def __init__(self, taskmaster, num, stack_size) -> None: - """Create a new parallel job given a taskmaster. + def __init__(self, taskmaster, num, stack_size) -> None: + """Create a new parallel job given a taskmaster. - The taskmaster's next_task() method should return the next - task that needs to be executed, or None if there are no more - tasks. The taskmaster's executed() method will be called - for each task when it is successfully executed, or failed() - will be called if the task failed to execute (i.e. execute() - raised an exception). + The taskmaster's next_task() method should return the next + task that needs to be executed, or None if there are no more + tasks. The taskmaster's executed() method will be called + for each task when it is successfully executed, or failed() + will be called if the task failed to execute (i.e. execute() + raised an exception). - Note: calls to taskmaster are serialized, but calls to - execute() on distinct tasks are not serialized, because - that is the whole point of parallel jobs: they can execute - multiple tasks simultaneously. """ + Note: calls to taskmaster are serialized, but calls to + execute() on distinct tasks are not serialized, because + that is the whole point of parallel jobs: they can execute + multiple tasks simultaneously. """ - self.taskmaster = taskmaster - self.interrupted = InterruptState() - self.tp = ThreadPool(num, stack_size, self.interrupted) + self.taskmaster = taskmaster + self.interrupted = InterruptState() + self.tp = ThreadPool(num, stack_size, self.interrupted) - self.maxjobs = num + self.maxjobs = num - def start(self): - """Start the job. This will begin pulling tasks from the - taskmaster and executing them, and return when there are no - more tasks. If a task fails to execute (i.e. execute() raises - an exception), then the job will stop.""" + def start(self): + """Start the job. This will begin pulling tasks from the + taskmaster and executing them, and return when there are no + more tasks. If a task fails to execute (i.e. execute() raises + an exception), then the job will stop.""" - jobs = 0 + jobs = 0 - while True: - # Start up as many available tasks as we're - # allowed to. - while jobs < self.maxjobs: - task = self.taskmaster.next_task() - if task is None: - break + while True: + # Start up as many available tasks as we're + # allowed to. + while jobs < self.maxjobs: + task = self.taskmaster.next_task() + if task is None: + break - try: - # prepare task for execution - task.prepare() - except Exception: - task.exception_set() - task.failed() - task.postprocess() + try: + # prepare task for execution + task.prepare() + except Exception: + task.exception_set() + task.failed() + task.postprocess() + else: + if task.needs_execute(): + # dispatch task + self.tp.put(task) + jobs += 1 else: - if task.needs_execute(): - # dispatch task - self.tp.put(task) - jobs += 1 - else: - task.executed() - task.postprocess() + task.executed() + task.postprocess() + + if not task and not jobs: + break + + # Let any/all completed tasks finish up before we go + # back and put the next batch of tasks on the queue. + while True: + task, ok = self.tp.get() + jobs -= 1 + + if ok: + task.executed() + else: + if self.interrupted(): + try: + raise SCons.Errors.BuildError( + task.targets[0], errstr=interrupt_msg) + except Exception: + task.exception_set() - if not task and not jobs: + # Let the failed() callback function arrange + # for the build to stop if that's appropriate. + task.failed() + + task.postprocess() + + if self.tp.resultsQueue.empty(): break - # Let any/all completed tasks finish up before we go - # back and put the next batch of tasks on the queue. - while True: - task, ok = self.tp.get() - jobs -= 1 + self.tp.cleanup() + self.taskmaster.cleanup() - if ok: - task.executed() - else: - if self.interrupted(): - try: - raise SCons.Errors.BuildError( - task.targets[0], errstr=interrupt_msg) - except Exception: - task.exception_set() +# An experimental new parallel scheduler that uses a leaders/followers pattern. +class NewParallel: - # Let the failed() callback function arrange - # for the build to stop if that's appropriate. - task.failed() + class State(Enum): + READY = 0 + SEARCHING = 1 + STALLED = 2 + COMPLETED = 3 - task.postprocess() + class Worker(threading.Thread): + def __init__(self, owner) -> None: + super().__init__() + self.daemon = True + self.owner = owner + self.start() - if self.tp.resultsQueue.empty(): - break - - self.tp.cleanup() - self.taskmaster.cleanup() - - # An experimental new parallel scheduler that uses a leaders/followers pattern. - class NewParallel: - - class State(Enum): - READY = 0 - SEARCHING = 1 - STALLED = 2 - COMPLETED = 3 - - class Worker(threading.Thread): - def __init__(self, owner) -> None: - super().__init__() - self.daemon = True - self.owner = owner - self.start() - - def run(self) -> None: - self.owner._work() - - def __init__(self, taskmaster, num, stack_size) -> None: - self.taskmaster = taskmaster - self.num_workers = num - self.stack_size = stack_size - self.interrupted = InterruptState() - self.workers = [] - - # The `tm_lock` is what ensures that we only have one - # thread interacting with the taskmaster at a time. It - # also protects access to our state that gets updated - # concurrently. The `can_search_cv` is associated with - # this mutex. - self.tm_lock = threading.Lock() - - # Guarded under `tm_lock`. - self.jobs = 0 - self.state = NewParallel.State.READY - - # The `can_search_cv` is used to manage a leader / - # follower pattern for access to the taskmaster, and to - # awaken from stalls. - self.can_search_cv = threading.Condition(self.tm_lock) - - # The queue of tasks that have completed execution. The - # next thread to obtain `tm_lock`` will retire them. - self.results_queue_lock = threading.Lock() - self.results_queue = [] - - if self.taskmaster.trace: - self.trace = self._setup_logging() - else: - self.trace = False - - def _setup_logging(self): - jl = logging.getLogger("Job") - jl.setLevel(level=logging.DEBUG) - jl.addHandler(self.taskmaster.trace.log_handler) - return jl - - def trace_message(self, message) -> None: - # This grabs the name of the function which calls trace_message() - method_name = sys._getframe(1).f_code.co_name + "():" - thread_id=threading.get_ident() - self.trace.debug('%s.%s [Thread:%s] %s' % (type(self).__name__, method_name, thread_id, message)) - # print('%-15s %s' % (method_name, message)) - - def start(self) -> None: - self._start_workers() - for worker in self.workers: - worker.join() - self.workers = [] - self.taskmaster.cleanup() - - def _start_workers(self) -> None: - prev_size = self._adjust_stack_size() - for _ in range(self.num_workers): - self.workers.append(NewParallel.Worker(self)) - self._restore_stack_size(prev_size) - - def _adjust_stack_size(self): - try: - prev_size = threading.stack_size(self.stack_size * 1024) - return prev_size - except AttributeError as e: - # Only print a warning if the stack size has been - # explicitly set. - if explicit_stack_size is not None: - msg = "Setting stack size is unsupported by this version of Python:\n " + \ - e.args[0] - SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) - except ValueError as e: - msg = "Setting stack size failed:\n " + str(e) - SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) + def run(self) -> None: + self.owner._work() - return None + def __init__(self, taskmaster, num, stack_size) -> None: + self.taskmaster = taskmaster + self.num_workers = num + self.stack_size = stack_size + self.interrupted = InterruptState() + self.workers = [] + + # The `tm_lock` is what ensures that we only have one + # thread interacting with the taskmaster at a time. It + # also protects access to our state that gets updated + # concurrently. The `can_search_cv` is associated with + # this mutex. + self.tm_lock = threading.Lock() + + # Guarded under `tm_lock`. + self.jobs = 0 + self.state = NewParallel.State.READY + + # The `can_search_cv` is used to manage a leader / + # follower pattern for access to the taskmaster, and to + # awaken from stalls. + self.can_search_cv = threading.Condition(self.tm_lock) + + # The queue of tasks that have completed execution. The + # next thread to obtain `tm_lock`` will retire them. + self.results_queue_lock = threading.Lock() + self.results_queue = [] + + if self.taskmaster.trace: + self.trace = self._setup_logging() + else: + self.trace = False + + def _setup_logging(self): + jl = logging.getLogger("Job") + jl.setLevel(level=logging.DEBUG) + jl.addHandler(self.taskmaster.trace.log_handler) + return jl + + def trace_message(self, message) -> None: + # This grabs the name of the function which calls trace_message() + method_name = sys._getframe(1).f_code.co_name + "():" + thread_id=threading.get_ident() + self.trace.debug('%s.%s [Thread:%s] %s' % (type(self).__name__, method_name, thread_id, message)) + # print('%-15s %s' % (method_name, message)) + + def start(self) -> None: + self._start_workers() + for worker in self.workers: + worker.join() + self.workers = [] + self.taskmaster.cleanup() - def _restore_stack_size(self, prev_size) -> None: - if prev_size is not None: - threading.stack_size(prev_size) + def _start_workers(self) -> None: + prev_size = self._adjust_stack_size() + for _ in range(self.num_workers): + self.workers.append(NewParallel.Worker(self)) + self._restore_stack_size(prev_size) - def _work(self): + def _adjust_stack_size(self): + try: + prev_size = threading.stack_size(self.stack_size * 1024) + return prev_size + except AttributeError as e: + # Only print a warning if the stack size has been + # explicitly set. + if explicit_stack_size is not None: + msg = "Setting stack size is unsupported by this version of Python:\n " + \ + e.args[0] + SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) + except ValueError as e: + msg = "Setting stack size failed:\n " + str(e) + SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) - task = None + return None - while True: + def _restore_stack_size(self, prev_size) -> None: + if prev_size is not None: + threading.stack_size(prev_size) - # Obtain `tm_lock`, granting exclusive access to the taskmaster. - with self.can_search_cv: + def _work(self): + task = None + + while True: + + # Obtain `tm_lock`, granting exclusive access to the taskmaster. + with self.can_search_cv: + + if self.trace: + self.trace_message("Gained exclusive access") + + # Capture whether we got here with `task` set, + # then drop our reference to the task as we are no + # longer interested in the actual object. + completed_task = (task is not None) + task = None + + # We will only have `completed_task` set here if + # we have looped back after executing a task. If + # we have completed a task and find that we are + # stalled, we should speculatively indicate that + # we are no longer stalled by transitioning to the + # 'ready' state which will bypass the condition + # wait so that we immediately process the results + # queue and hopefully light up new + # work. Otherwise, stay stalled, and we will wait + # in the condvar. Some other thread will come back + # here with a completed task. + if self.state == NewParallel.State.STALLED and completed_task: if self.trace: - self.trace_message("Gained exclusive access") - - # Capture whether we got here with `task` set, - # then drop our reference to the task as we are no - # longer interested in the actual object. - completed_task = (task is not None) - task = None - - # We will only have `completed_task` set here if - # we have looped back after executing a task. If - # we have completed a task and find that we are - # stalled, we should speculatively indicate that - # we are no longer stalled by transitioning to the - # 'ready' state which will bypass the condition - # wait so that we immediately process the results - # queue and hopefully light up new - # work. Otherwise, stay stalled, and we will wait - # in the condvar. Some other thread will come back - # here with a completed task. - if self.state == NewParallel.State.STALLED and completed_task: - if self.trace: - self.trace_message("Detected stall with completed task, bypassing wait") - self.state = NewParallel.State.READY - - # Wait until we are neither searching nor stalled. - while self.state == NewParallel.State.SEARCHING or self.state == NewParallel.State.STALLED: - if self.trace: - self.trace_message("Search already in progress, waiting") - self.can_search_cv.wait() - - # If someone set the completed flag, bail. - if self.state == NewParallel.State.COMPLETED: - if self.trace: - self.trace_message("Completion detected, breaking from main loop") - break - - # Set the searching flag to indicate that a thread - # is currently in the critical section for - # taskmaster work. - # + self.trace_message("Detected stall with completed task, bypassing wait") + self.state = NewParallel.State.READY + + # Wait until we are neither searching nor stalled. + while self.state == NewParallel.State.SEARCHING or self.state == NewParallel.State.STALLED: if self.trace: - self.trace_message("Starting search") - self.state = NewParallel.State.SEARCHING - - # Bulk acquire the tasks in the results queue - # under the result queue lock, then process them - # all outside that lock. We need to process the - # tasks in the results queue before looking for - # new work because we might be unable to find new - # work if we don't. - results_queue = [] - with self.results_queue_lock: - results_queue, self.results_queue = self.results_queue, results_queue + self.trace_message("Search already in progress, waiting") + self.can_search_cv.wait() + # If someone set the completed flag, bail. + if self.state == NewParallel.State.COMPLETED: if self.trace: - self.trace_message("Found {len(results_queue)} completed tasks to process") - for (rtask, rresult) in results_queue: - if rresult: - rtask.executed() - else: - if self.interrupted(): - try: - raise SCons.Errors.BuildError( - rtask.targets[0], errstr=interrupt_msg) - except Exception: - rtask.exception_set() - - # Let the failed() callback function arrange - # for the build to stop if that's appropriate. - rtask.failed() - - rtask.postprocess() - self.jobs -= 1 - - # We are done with any task objects that were in - # the results queue. - results_queue.clear() - - # Now, turn the crank on the taskmaster until we - # either run out of tasks, or find a task that - # needs execution. If we run out of tasks, go idle - # until results arrive if jobs are pending, or - # mark the walk as complete if not. - while self.state == NewParallel.State.SEARCHING: - if self.trace: - self.trace_message("Searching for new tasks") - task = self.taskmaster.next_task() - - if task: - # We found a task. Walk it through the - # task lifecycle. If it does not need - # execution, just complete the task and - # look for the next one. Otherwise, - # indicate that we are no longer searching - # so we can drop out of this loop, execute - # the task outside the lock, and allow - # another thread in to search. + self.trace_message("Completion detected, breaking from main loop") + break + + # Set the searching flag to indicate that a thread + # is currently in the critical section for + # taskmaster work. + # + if self.trace: + self.trace_message("Starting search") + self.state = NewParallel.State.SEARCHING + + # Bulk acquire the tasks in the results queue + # under the result queue lock, then process them + # all outside that lock. We need to process the + # tasks in the results queue before looking for + # new work because we might be unable to find new + # work if we don't. + results_queue = [] + with self.results_queue_lock: + results_queue, self.results_queue = self.results_queue, results_queue + + if self.trace: + self.trace_message("Found {len(results_queue)} completed tasks to process") + for (rtask, rresult) in results_queue: + if rresult: + rtask.executed() + else: + if self.interrupted(): try: - task.prepare() + raise SCons.Errors.BuildError( + rtask.targets[0], errstr=interrupt_msg) except Exception: - task.exception_set() - task.failed() - task.postprocess() - else: - if not task.needs_execute(): - if self.trace: - self.trace_message("Found internal task") - task.executed() - task.postprocess() - else: - self.jobs += 1 - if self.trace: - self.trace_message("Found task requiring execution") - self.state = NewParallel.State.READY - self.can_search_cv.notify() + rtask.exception_set() + + # Let the failed() callback function arrange + # for the build to stop if that's appropriate. + rtask.failed() + + rtask.postprocess() + self.jobs -= 1 + + # We are done with any task objects that were in + # the results queue. + results_queue.clear() + # Now, turn the crank on the taskmaster until we + # either run out of tasks, or find a task that + # needs execution. If we run out of tasks, go idle + # until results arrive if jobs are pending, or + # mark the walk as complete if not. + while self.state == NewParallel.State.SEARCHING: + if self.trace: + self.trace_message("Searching for new tasks") + task = self.taskmaster.next_task() + + if task: + # We found a task. Walk it through the + # task lifecycle. If it does not need + # execution, just complete the task and + # look for the next one. Otherwise, + # indicate that we are no longer searching + # so we can drop out of this loop, execute + # the task outside the lock, and allow + # another thread in to search. + try: + task.prepare() + except Exception: + task.exception_set() + task.failed() + task.postprocess() else: - # We failed to find a task, so this thread - # cannot continue turning the taskmaster - # crank. We must exit the loop. - if self.jobs: - # No task was found, but there are - # outstanding jobs executing that - # might unblock new tasks when they - # complete. Transition to the stalled - # state. We do not need a notify, - # because we know there are threads - # outstanding that will re-enter the - # loop. - # + if not task.needs_execute(): if self.trace: - self.trace_message("Found no task requiring execution, but have jobs: marking stalled") - self.state = NewParallel.State.STALLED + self.trace_message("Found internal task") + task.executed() + task.postprocess() else: - # We didn't find a task and there are - # no jobs outstanding, so there is - # nothing that will ever return - # results which might unblock new - # tasks. We can conclude that the walk - # is complete. Update our state to - # note completion and awaken anyone - # sleeping on the condvar. - # + self.jobs += 1 if self.trace: - self.trace_message("Found no task requiring execution, and have no jobs: marking complete") - self.state = NewParallel.State.COMPLETED - self.can_search_cv.notify_all() - - # We no longer hold `tm_lock` here. If we have a task, - # we can now execute it. If there are threads waiting - # to search, one of them can now begin turning the - # taskmaster crank in NewParallel. - if task: - if self.trace: - self.trace_message("Executing task") - ok = True - try: - if self.interrupted(): - raise SCons.Errors.BuildError( - task.targets[0], errstr=interrupt_msg) - task.execute() - except Exception: - ok = False - task.exception_set() + self.trace_message("Found task requiring execution") + self.state = NewParallel.State.READY + self.can_search_cv.notify() - # Grab the results queue lock and enqueue the - # executed task and state. The next thread into - # the searching loop will complete the - # postprocessing work under the taskmaster lock. - # - if self.trace: - self.trace_message("Enqueueing executed task results") - with self.results_queue_lock: - self.results_queue.append((task, ok)) - - # Tricky state "fallthrough" here. We are going back - # to the top of the loop, which behaves differently - # depending on whether `task` is set. Do not perturb - # the value of the `task` variable if you add new code - # after this comment. + else: + # We failed to find a task, so this thread + # cannot continue turning the taskmaster + # crank. We must exit the loop. + if self.jobs: + # No task was found, but there are + # outstanding jobs executing that + # might unblock new tasks when they + # complete. Transition to the stalled + # state. We do not need a notify, + # because we know there are threads + # outstanding that will re-enter the + # loop. + # + if self.trace: + self.trace_message("Found no task requiring execution, but have jobs: marking stalled") + self.state = NewParallel.State.STALLED + else: + # We didn't find a task and there are + # no jobs outstanding, so there is + # nothing that will ever return + # results which might unblock new + # tasks. We can conclude that the walk + # is complete. Update our state to + # note completion and awaken anyone + # sleeping on the condvar. + # + if self.trace: + self.trace_message("Found no task requiring execution, and have no jobs: marking complete") + self.state = NewParallel.State.COMPLETED + self.can_search_cv.notify_all() + + # We no longer hold `tm_lock` here. If we have a task, + # we can now execute it. If there are threads waiting + # to search, one of them can now begin turning the + # taskmaster crank in NewParallel. + if task: + if self.trace: + self.trace_message("Executing task") + ok = True + try: + if self.interrupted(): + raise SCons.Errors.BuildError( + task.targets[0], errstr=interrupt_msg) + task.execute() + except Exception: + ok = False + task.exception_set() + + # Grab the results queue lock and enqueue the + # executed task and state. The next thread into + # the searching loop will complete the + # postprocessing work under the taskmaster lock. + # + if self.trace: + self.trace_message("Enqueueing executed task results") + with self.results_queue_lock: + self.results_queue.append((task, ok)) + + # Tricky state "fallthrough" here. We are going back + # to the top of the loop, which behaves differently + # depending on whether `task` is set. Do not perturb + # the value of the `task` variable if you add new code + # after this comment. # Local Variables: # tab-width:4 |