From 3bd3df5a4979bc8fa2fbfb17dd6cd4b284892cc0 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Mon, 16 May 2022 23:19:59 -0400 Subject: Implement new parallel scheduler --- SCons/Taskmaster/Job.py | 266 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 265 insertions(+), 1 deletion(-) diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index b398790..a0bc1d0 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -31,6 +31,9 @@ import SCons.compat import os import signal +import threading + +from enum import Enum import SCons.Errors import SCons.Warnings @@ -342,7 +345,7 @@ else: worker.join(1.0) self.workers = [] - class Parallel: + class LegacyParallel: """This class is used to execute tasks in parallel, and is somewhat less efficient than Serial, but is appropriate for parallel builds. @@ -432,6 +435,267 @@ else: self.tp.cleanup() self.taskmaster.cleanup() + + class NewParallel: + + class State(Enum): + READY = 0 + SEARCHING = 1 + STALLED = 2 + COMPLETED = 3 + + class Worker(threading.Thread): + def __init__(self, owner): + super().__init__() + self.daemon = True + self.owner = owner + self.start() + + def run(self): + self.owner._work() + + def __init__(self, taskmaster, num, stack_size): + self.taskmaster = taskmaster + self.num_workers = num + self.stack_size = stack_size + self.interrupted = InterruptState() + self.workers = [] + + # The `tm_lock` is what ensures that we only have one + # thread interacting with the taskmaster at a time. It + # also protects access to our state that gets updated + # concurrently. The `can_search_cv` is associated with + # this mutex. + self.tm_lock = threading.Lock() + + # Guarded under `tm_lock`. + self.jobs = 0 + self.state = NewParallel.State.READY + + # The `can_search_cv` is used to manage a leader / + # follower pattern for access to the taskmaster, and to + # awaken from stalls. 
+ self.can_search_cv = threading.Condition(self.tm_lock) + + # The queue of tasks that have completed execution. The + # next thread to obtain `tm_lock` will retire them. + self.results_queue_lock = threading.Lock() + self.results_queue = [] + + def start(self): + self._start_workers() + for worker in self.workers: + worker.join() + self.workers = [] + self.taskmaster.cleanup() + + def _start_workers(self): + prev_size = self._adjust_stack_size() + for _ in range(self.num_workers): + self.workers.append(NewParallel.Worker(self)) + self._restore_stack_size(prev_size) + + def _adjust_stack_size(self): + try: + prev_size = threading.stack_size(self.stack_size*1024) + return prev_size + except AttributeError as e: + # Only print a warning if the stack size has been + # explicitly set. + if explicit_stack_size is not None: + msg = "Setting stack size is unsupported by this version of Python:\n " + \ + e.args[0] + SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) + except ValueError as e: + msg = "Setting stack size failed:\n " + str(e) + SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) + + return None + + def _restore_stack_size(self, prev_size): + if prev_size is not None: + threading.stack_size(prev_size) + + def _work(self): + + task = None + + while True: + + # Obtain `tm_lock`, granting exclusive access to the taskmaster. + with self.can_search_cv: + + # print(f"XXX {threading.get_ident()} Gained exclusive access") + + # Capture whether we got here with `task` set, + # then drop our reference to the task as we are no + # longer interested in the actual object. + completed_task = (task is not None) + task = None + + # We will only have `completed_task` set here if + # we have looped back after executing a task. 
If + # we have completed a task and find that we are + # stalled, we should speculatively indicate that + # we are no longer stalled by transitioning to the + # 'ready' state which will bypass the condition + # wait so that we immediately process the results + # queue and hopefully light up new + # work. Otherwise, stay stalled, and we will wait + # in the condvar. Some other thread will come back + # here with a completed task. + if self.state == NewParallel.State.STALLED and completed_task: + # print(f"XXX {threading.get_ident()} Detected stall with completed task, bypassing wait") + self.state = NewParallel.State.READY + + # Wait until we are neither searching nor stalled. + while self.state == NewParallel.State.SEARCHING or self.state == NewParallel.State.STALLED: + # print(f"XXX {threading.get_ident()} Search already in progress, waiting") + self.can_search_cv.wait() + + # If someone set the completed flag, bail. + if self.state == NewParallel.State.COMPLETED: + # print(f"XXX {threading.get_ident()} Completion detected, breaking from main loop") + break + + # Set the searching flag to indicate that a thread + # is currently in the critical section for + # taskmaster work. + # + # print(f"XXX {threading.get_ident()} Starting search") + self.state = NewParallel.State.SEARCHING + + # Bulk acquire the tasks in the results queue + # under the result queue lock, then process them + # all outside that lock. We need to process the + # tasks in the results queue before looking for + # new work because we might be unable to find new + # work if we don't. 
+ results_queue = [] + with self.results_queue_lock: + results_queue, self.results_queue = self.results_queue, results_queue + + # print(f"XXX {threading.get_ident()} Found {len(results_queue)} completed tasks to process") + for (rtask, rresult) in results_queue: + if rresult: + rtask.executed() + else: + if self.interrupted(): + try: + raise SCons.Errors.BuildError( + rtask.targets[0], errstr=interrupt_msg) + except: + rtask.exception_set() + + # Let the failed() callback function arrange + # for the build to stop if that's appropriate. + rtask.failed() + + rtask.postprocess() + self.jobs -= 1 + + # We are done with any task objects that were in + # the results queue. + results_queue.clear() + + # Now, turn the crank on the taskmaster until we + # either run out of tasks, or find a task that + # needs execution. If we run out of tasks, go idle + # until results arrive if jobs are pending, or + # mark the walk as complete if not. + while self.state == NewParallel.State.SEARCHING: + # print(f"XXX {threading.get_ident()} Searching for new tasks") + task = self.taskmaster.next_task() + + if task: + # We found a task. Walk it through the + # task lifecycle. If it does not need + # execution, just complete the task and + # look for the next one. Otherwise, + # indicate that we are no longer searching + # so we can drop out of this loop, execute + # the task outside the lock, and allow + # another thread in to search. + try: + task.prepare() + except: + task.exception_set() + task.failed() + task.postprocess() + else: + if not task.needs_execute(): + # print(f"XXX {threading.get_ident()} Found internal task") + task.executed() + task.postprocess() + else: + self.jobs += 1 + # print(f"XXX {threading.get_ident()} Found task requiring execution") + self.state = NewParallel.State.READY + self.can_search_cv.notify() + + else: + # We failed to find a task, so this thread + # cannot continue turning the taskmaster + # crank. We must exit the loop. 
+ if self.jobs: + # No task was found, but there are + # outstanding jobs executing that + # might unblock new tasks when they + # complete. Transition to the stalled + # state. We do not need a notify, + # because we know there are threads + # outstanding that will re-enter the + # loop. + # + # print(f"XXX {threading.get_ident()} Found no task requiring execution, but have jobs: marking stalled") + self.state = NewParallel.State.STALLED + else: + # We didn't find a task and there are + # no jobs outstanding, so there is + # nothing that will ever return + # results which might unblock new + # tasks. We can conclude that the walk + # is complete. Update our state to + # note completion and awaken anyone + # sleeping on the condvar. + # + # print(f"XXX {threading.get_ident()} Found no task requiring execution, and have no jobs: marking complete") + self.state = NewParallel.State.COMPLETED + self.can_search_cv.notify_all() + + # We no longer hold `tm_lock` here. If we have a task, + # we can now execute it. If there are threads waiting + # to search, one of them can now begin turning the + # taskmaster crank in parallel. + if task: + # print(f"XXX {threading.get_ident()} Executing task") + ok = True + try: + if self.interrupted(): + raise SCons.Errors.BuildError( + task.targets[0], errstr=interrupt_msg) + task.execute() + except: + ok = False + task.exception_set() + + # Grab the results queue lock and enqueue the + # executed task and state. The next thread into + # the searching loop will complete the + # postprocessing work under the taskmaster lock. + # + # print(f"XXX {threading.get_ident()} Enqueueing executed task results") + with self.results_queue_lock: + self.results_queue.append((task, ok)) + + # Tricky state "fallthrough" here. We are going back + # to the top of the loop, which behaves differently + # depending on whether `task` is set. Do not perturb + # the value of the `task` variable if you add new code + # after this comment. 
+ + Parallel = NewParallel + # Local Variables: # tab-width:4 # indent-tabs-mode:nil -- cgit v0.12 From 8b4c611eee0fd97f6b60ad7db5d380c1ec4c702e Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Thu, 13 Oct 2022 15:05:39 -0400 Subject: Restore original Parallel as the default and rename new parallel as experimental --- SCons/Taskmaster/Job.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index a0bc1d0..e9624cf 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -345,7 +345,7 @@ else: worker.join(1.0) self.workers = [] - class LegacyParallel: + class Parallel: """This class is used to execute tasks in parallel, and is somewhat less efficient than Serial, but is appropriate for parallel builds. @@ -436,7 +436,8 @@ else: self.taskmaster.cleanup() - class NewParallel: + # An experimental new parallel scheduler that uses a leaders/followers pattern. + class ExperimentalParallel: class State(Enum): READY = 0 @@ -470,7 +471,7 @@ else: # Guarded under `tm_lock`. self.jobs = 0 - self.state = NewParallel.State.READY + self.state = ExperimentalParallel.State.READY # The `can_search_cv` is used to manage a leader / # follower pattern for access to the taskmaster, and to @@ -492,7 +493,7 @@ else: def _start_workers(self): prev_size = self._adjust_stack_size() for _ in range(self.num_workers): - self.workers.append(NewParallel.Worker(self)) + self.workers.append(ExperimentalParallel.Worker(self)) self._restore_stack_size(prev_size) def _adjust_stack_size(self): @@ -544,17 +545,17 @@ else: # work. Otherwise, stay stalled, and we will wait # in the condvar. Some other thread will come back # here with a completed task. 
- if self.state == NewParallel.State.STALLED and completed_task: + if self.state == ExperimentalParallel.State.STALLED and completed_task: # print(f"XXX {threading.get_ident()} Detected stall with completed task, bypassing wait") - self.state = NewParallel.State.READY + self.state = ExperimentalParallel.State.READY # Wait until we are neither searching nor stalled. - while self.state == NewParallel.State.SEARCHING or self.state == NewParallel.State.STALLED: + while self.state == ExperimentalParallel.State.SEARCHING or self.state == ExperimentalParallel.State.STALLED: # print(f"XXX {threading.get_ident()} Search already in progress, waiting") self.can_search_cv.wait() # If someone set the completed flag, bail. - if self.state == NewParallel.State.COMPLETED: + if self.state == ExperimentalParallel.State.COMPLETED: # print(f"XXX {threading.get_ident()} Completion detected, breaking from main loop") break @@ -563,7 +564,7 @@ else: # taskmaster work. # # print(f"XXX {threading.get_ident()} Starting search") - self.state = NewParallel.State.SEARCHING + self.state = ExperimentalParallel.State.SEARCHING # Bulk acquire the tasks in the results queue # under the result queue lock, then process them @@ -603,7 +604,7 @@ else: # needs execution. If we run out of tasks, go idle # until results arrive if jobs are pending, or # mark the walk as complete if not. - while self.state == NewParallel.State.SEARCHING: + while self.state == ExperimentalParallel.State.SEARCHING: # print(f"XXX {threading.get_ident()} Searching for new tasks") task = self.taskmaster.next_task() @@ -630,7 +631,7 @@ else: else: self.jobs += 1 # print(f"XXX {threading.get_ident()} Found task requiring execution") - self.state = NewParallel.State.READY + self.state = ExperimentalParallel.State.READY self.can_search_cv.notify() else: @@ -648,7 +649,7 @@ else: # loop. 
# # print(f"XXX {threading.get_ident()} Found no task requiring execution, but have jobs: marking stalled") - self.state = NewParallel.State.STALLED + self.state = ExperimentalParallel.State.STALLED else: # We didn't find a task and there are # no jobs outstanding, so there is @@ -660,7 +661,7 @@ else: # sleeping on the condvar. # # print(f"XXX {threading.get_ident()} Found no task requiring execution, and have no jobs: marking complete") - self.state = NewParallel.State.COMPLETED + self.state = ExperimentalParallel.State.COMPLETED self.can_search_cv.notify_all() # We no longer hold `tm_lock` here. If we have a task, @@ -694,8 +695,6 @@ else: # the value of the `task` variable if you add new code # after this comment. - Parallel = NewParallel - # Local Variables: # tab-width:4 # indent-tabs-mode:nil -- cgit v0.12 From da4b75ded0a5ba97af5bb7124d4436df66464c99 Mon Sep 17 00:00:00 2001 From: William Deegan Date: Fri, 21 Oct 2022 13:35:36 -0700 Subject: [ci skip] Resolve outstanding pep8 errors (fix sider complaints and more) --- SCons/Taskmaster/Job.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index e9624cf..d9c98e8 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -145,7 +145,7 @@ class Jobs: else: os._exit(2) # pylint: disable=protected-access - self.old_sigint = signal.signal(signal.SIGINT, handler) + self.old_sigint = signal.signal(signal.SIGINT, handler) self.old_sigterm = signal.signal(signal.SIGTERM, handler) try: self.old_sighup = signal.signal(signal.SIGHUP, handler) @@ -170,6 +170,7 @@ class Jobs: except AttributeError: pass + class Serial: """This class is used to execute tasks in series, and is more efficient than Parallel, but is only appropriate for non-parallel builds. 
Only @@ -211,7 +212,7 @@ class Serial: try: raise SCons.Errors.BuildError( task.targets[0], errstr=interrupt_msg) - except: + except Exception: task.exception_set() else: task.exception_set() @@ -263,7 +264,7 @@ else: raise SCons.Errors.BuildError( task.targets[0], errstr=interrupt_msg) task.execute() - except: + except Exception: task.exception_set() ok = False else: @@ -284,7 +285,7 @@ else: self.resultsQueue = queue.Queue(0) try: - prev_size = threading.stack_size(stack_size*1024) + prev_size = threading.stack_size(stack_size * 1024) except AttributeError as e: # Only print a warning if the stack size has been # explicitly set. @@ -392,7 +393,7 @@ else: try: # prepare task for execution task.prepare() - except: + except Exception: task.exception_set() task.failed() task.postprocess() @@ -405,7 +406,8 @@ else: task.executed() task.postprocess() - if not task and not jobs: break + if not task and not jobs: + break # Let any/all completed tasks finish up before we go # back and put the next batch of tasks on the queue. @@ -420,7 +422,7 @@ else: try: raise SCons.Errors.BuildError( task.targets[0], errstr=interrupt_msg) - except: + except Exception: task.exception_set() # Let the failed() callback function arrange @@ -435,7 +437,6 @@ else: self.tp.cleanup() self.taskmaster.cleanup() - # An experimental new parallel scheduler that uses a leaders/followers pattern. class ExperimentalParallel: @@ -498,7 +499,7 @@ else: def _adjust_stack_size(self): try: - prev_size = threading.stack_size(self.stack_size*1024) + prev_size = threading.stack_size(self.stack_size * 1024) return prev_size except AttributeError as e: # Only print a warning if the stack size has been @@ -585,7 +586,7 @@ else: try: raise SCons.Errors.BuildError( rtask.targets[0], errstr=interrupt_msg) - except: + except Exception: rtask.exception_set() # Let the failed() callback function arrange @@ -619,7 +620,7 @@ else: # another thread in to search. 
try: task.prepare() - except: + except Exception: task.exception_set() task.failed() task.postprocess() @@ -676,7 +677,7 @@ else: raise SCons.Errors.BuildError( task.targets[0], errstr=interrupt_msg) task.execute() - except: + except Exception: ok = False task.exception_set() -- cgit v0.12