From 3bd3df5a4979bc8fa2fbfb17dd6cd4b284892cc0 Mon Sep 17 00:00:00 2001
From: Andrew Morrow
Date: Mon, 16 May 2022 23:19:59 -0400
Subject: Implement new parallel scheduler

---
 SCons/Taskmaster/Job.py | 266 +++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 265 insertions(+), 1 deletion(-)

diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py
index b398790..a0bc1d0 100644
--- a/SCons/Taskmaster/Job.py
+++ b/SCons/Taskmaster/Job.py
@@ -31,6 +31,9 @@ import SCons.compat
 
 import os
 import signal
+import threading
+
+from enum import Enum
 
 import SCons.Errors
 import SCons.Warnings
@@ -342,7 +345,7 @@ else:
                 worker.join(1.0)
             self.workers = []
 
-    class Parallel:
+    class LegacyParallel:
         """This class is used to execute tasks in parallel, and is somewhat
         less efficient than Serial, but is appropriate for parallel builds.
 
@@ -432,6 +435,267 @@ else:
             self.tp.cleanup()
             self.taskmaster.cleanup()
 
+
+    class NewParallel:
+        """Execute taskmaster tasks on a pool of worker threads."""
+        class State(Enum):
+            READY = 0
+            SEARCHING = 1
+            STALLED = 2
+            COMPLETED = 3
+
+        class Worker(threading.Thread):
+            def __init__(self, owner):
+                super().__init__()
+                self.daemon = True
+                self.owner = owner
+                self.start()
+
+            def run(self):
+                self.owner._work()
+
+        def __init__(self, taskmaster, num, stack_size):
+            self.taskmaster = taskmaster
+            self.num_workers = num
+            self.stack_size = stack_size
+            self.interrupted = InterruptState()
+            self.workers = []
+
+            # The `tm_lock` is what ensures that we only have one
+            # thread interacting with the taskmaster at a time. It
+            # also protects access to our state that gets updated
+            # concurrently. The `can_search_cv` is associated with
+            # this mutex.
+            self.tm_lock = threading.Lock()
+
+            # Guarded under `tm_lock`.
+            self.jobs = 0
+            self.state = NewParallel.State.READY
+
+            # The `can_search_cv` is used to manage a leader /
+            # follower pattern for access to the taskmaster, and to
+            # awaken from stalls.
+            self.can_search_cv = threading.Condition(self.tm_lock)
+
+            # The queue of tasks that have completed execution. The
+            # next thread to obtain `tm_lock` will retire them.
+            self.results_queue_lock = threading.Lock()
+            self.results_queue = []
+
+        def start(self):
+            self._start_workers()
+            for worker in self.workers:
+                worker.join()
+            self.workers = []
+            self.taskmaster.cleanup()
+
+        def _start_workers(self):
+            prev_size = self._adjust_stack_size()
+            for _ in range(self.num_workers):
+                self.workers.append(NewParallel.Worker(self))
+            self._restore_stack_size(prev_size)
+
+        def _adjust_stack_size(self):
+            try:
+                prev_size = threading.stack_size(self.stack_size*1024)
+                return prev_size
+            except AttributeError as e:
+                # Only print a warning if the stack size has been
+                # explicitly set.
+                if explicit_stack_size is not None:
+                    msg = "Setting stack size is unsupported by this version of Python:\n " + \
+                        e.args[0]
+                    SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
+            except ValueError as e:
+                msg = "Setting stack size failed:\n " + str(e)
+                SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
+
+            return None
+
+        def _restore_stack_size(self, prev_size):
+            if prev_size is not None:
+                threading.stack_size(prev_size)
+
+        def _work(self):
+            """Worker loop: retire queued results, find a task, then execute it."""
+            task = None
+
+            while True:
+
+                # Obtain `tm_lock`, granting exclusive access to the taskmaster.
+                with self.can_search_cv:
+
+                    # print(f"XXX {threading.get_ident()} Gained exclusive access")
+
+                    # Capture whether we got here with `task` set,
+                    # then drop our reference to the task as we are no
+                    # longer interested in the actual object.
+                    completed_task = (task is not None)
+                    task = None
+
+                    # We will only have `completed_task` set here if
+                    # we have looped back after executing a task. If
+                    # we have completed a task and find that we are
+                    # stalled, we should speculatively indicate that
+                    # we are no longer stalled by transitioning to the
+                    # 'ready' state which will bypass the condition
+                    # wait so that we immediately process the results
+                    # queue and hopefully light up new
+                    # work. Otherwise, stay stalled, and we will wait
+                    # in the condvar. Some other thread will come back
+                    # here with a completed task.
+                    if self.state == NewParallel.State.STALLED and completed_task:
+                        # print(f"XXX {threading.get_ident()} Detected stall with completed task, bypassing wait")
+                        self.state = NewParallel.State.READY
+
+                    # Wait until we are neither searching nor stalled.
+                    while self.state == NewParallel.State.SEARCHING or self.state == NewParallel.State.STALLED:
+                        # print(f"XXX {threading.get_ident()} Search already in progress, waiting")
+                        self.can_search_cv.wait()
+
+                    # If someone set the completed flag, bail.
+                    if self.state == NewParallel.State.COMPLETED:
+                        # print(f"XXX {threading.get_ident()} Completion detected, breaking from main loop")
+                        break
+
+                    # Set the searching flag to indicate that a thread
+                    # is currently in the critical section for
+                    # taskmaster work.
+                    #
+                    # print(f"XXX {threading.get_ident()} Starting search")
+                    self.state = NewParallel.State.SEARCHING
+
+                    # Bulk acquire the tasks in the results queue
+                    # under the results queue lock, then process them
+                    # all outside that lock. We need to process the
+                    # tasks in the results queue before looking for
+                    # new work because we might be unable to find new
+                    # work if we don't.
+                    results_queue = []
+                    with self.results_queue_lock:
+                        results_queue, self.results_queue = self.results_queue, results_queue
+
+                    # print(f"XXX {threading.get_ident()} Found {len(results_queue)} completed tasks to process")
+                    for (rtask, rresult) in results_queue:
+                        if rresult:
+                            rtask.executed()
+                        else:
+                            if self.interrupted():
+                                try:
+                                    raise SCons.Errors.BuildError(
+                                        rtask.targets[0], errstr=interrupt_msg)
+                                except:
+                                    rtask.exception_set()
+
+                            # Let the failed() callback function arrange
+                            # for the build to stop if that's appropriate.
+                            rtask.failed()
+
+                        rtask.postprocess()
+                        self.jobs -= 1
+
+                    # We are done with any task objects that were in
+                    # the results queue.
+                    results_queue.clear()
+
+                    # Now, turn the crank on the taskmaster until we
+                    # either run out of tasks, or find a task that
+                    # needs execution. If we run out of tasks, go idle
+                    # until results arrive if jobs are pending, or
+                    # mark the walk as complete if not.
+                    while self.state == NewParallel.State.SEARCHING:
+                        # print(f"XXX {threading.get_ident()} Searching for new tasks")
+                        task = self.taskmaster.next_task()
+
+                        if task:
+                            # We found a task. Walk it through the
+                            # task lifecycle. If it does not need
+                            # execution, just complete the task and
+                            # look for the next one. Otherwise,
+                            # indicate that we are no longer searching
+                            # so we can drop out of this loop, execute
+                            # the task outside the lock, and allow
+                            # another thread in to search.
+                            try:
+                                task.prepare()
+                            except:
+                                task.exception_set()
+                                task.failed()
+                                task.postprocess()
+                            else:
+                                if not task.needs_execute():
+                                    # print(f"XXX {threading.get_ident()} Found internal task")
+                                    task.executed()
+                                    task.postprocess()
+                                else:
+                                    self.jobs += 1
+                                    # print(f"XXX {threading.get_ident()} Found task requiring execution")
+                                    self.state = NewParallel.State.READY
+                                    self.can_search_cv.notify()
+
+                        else:
+                            # We failed to find a task, so this thread
+                            # cannot continue turning the taskmaster
+                            # crank. We must exit the loop.
+                            if self.jobs:
+                                # No task was found, but there are
+                                # outstanding jobs executing that
+                                # might unblock new tasks when they
+                                # complete. Transition to the stalled
+                                # state. We do not need a notify,
+                                # because we know there are threads
+                                # outstanding that will re-enter the
+                                # loop.
+                                #
+                                # print(f"XXX {threading.get_ident()} Found no task requiring execution, but have jobs: marking stalled")
+                                self.state = NewParallel.State.STALLED
+                            else:
+                                # We didn't find a task and there are
+                                # no jobs outstanding, so there is
+                                # nothing that will ever return
+                                # results which might unblock new
+                                # tasks. We can conclude that the walk
+                                # is complete. Update our state to
+                                # note completion and awaken anyone
+                                # sleeping on the condvar.
+                                #
+                                # print(f"XXX {threading.get_ident()} Found no task requiring execution, and have no jobs: marking complete")
+                                self.state = NewParallel.State.COMPLETED
+                                self.can_search_cv.notify_all()
+
+                # We no longer hold `tm_lock` here. If we have a task,
+                # we can now execute it. If there are threads waiting
+                # to search, one of them can now begin turning the
+                # taskmaster crank in parallel.
+                if task:
+                    # print(f"XXX {threading.get_ident()} Executing task")
+                    ok = True
+                    try:
+                        if self.interrupted():
+                            raise SCons.Errors.BuildError(
+                                task.targets[0], errstr=interrupt_msg)
+                        task.execute()
+                    except:
+                        ok = False
+                        task.exception_set()
+
+                    # Grab the results queue lock and enqueue the
+                    # executed task and state. The next thread into
+                    # the searching loop will complete the
+                    # postprocessing work under the taskmaster lock.
+                    #
+                    # print(f"XXX {threading.get_ident()} Enqueueing executed task results")
+                    with self.results_queue_lock:
+                        self.results_queue.append((task, ok))
+
+                # Tricky state "fallthrough" here. We are going back
+                # to the top of the loop, which behaves differently
+                # depending on whether `task` is set. Do not perturb
+                # the value of the `task` variable if you add new code
+                # after this comment.
+
+    Parallel = NewParallel
+
 # Local Variables:
 # tab-width:4
 # indent-tabs-mode:nil
-- 
cgit v0.12