From cd092f47ddddef487b3a5f747b93dfd292d24628 Mon Sep 17 00:00:00 2001 From: William Deegan Date: Tue, 15 Nov 2022 20:39:29 -0500 Subject: logging working in NewParallel, but changed to be default. Need to figure out how to switch from one to the other --- SCons/Taskmaster/Job.py | 37 +++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index 667d46b..c2a825c 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -91,7 +91,7 @@ class Jobs: stack_size = default_stack_size try: - self.job = Parallel(taskmaster, num, stack_size) + self.job = NewParallel(taskmaster, num, stack_size) self.num_jobs = num except NameError: pass @@ -348,7 +348,7 @@ else: worker.join(1.0) self.workers = [] - class Parallel: + class LegacyParallel: """This class is used to execute tasks in parallel, and is somewhat less efficient than Serial, but is appropriate for parallel builds. @@ -440,7 +440,7 @@ else: self.taskmaster.cleanup() # An experimental new parallel scheduler that uses a leaders/followers pattern. - class ExperimentalParallel: + class NewParallel: class State(Enum): READY = 0 @@ -474,7 +474,7 @@ else: # Guarded under `tm_lock`. self.jobs = 0 - self.state = ExperimentalParallel.State.READY + self.state = NewParallel.State.READY # The `can_search_cv` is used to manage a leader / # follower pattern for access to the taskmaster, and to @@ -500,8 +500,8 @@ else: def trace_message(self, message): # This grabs the name of the function which calls trace_message() method_name = sys._getframe(1).f_code.co_name + "():" - self.trace.debug('%-15s %s' % (method_name, message)) - print('%-15s %s' % (method_name, message)) + self.trace.debug('%s.%s %s' % (type(self).__name__, method_name, message)) + # print('%-15s %s' % (method_name, message)) def start(self): self._start_workers() @@ -513,7 +513,7 @@ else: def _start_workers(self): prev_size = self._adjust_stack_size() for _ in range(self.num_workers): - self.workers.append(ExperimentalParallel.Worker(self)) + self.workers.append(NewParallel.Worker(self)) self._restore_stack_size(prev_size) def _adjust_stack_size(self): @@ -566,19 +566,19 @@ else: # work. Otherwise, stay stalled, and we will wait # in the condvar. Some other thread will come back # here with a completed task. - if self.state == ExperimentalParallel.State.STALLED and completed_task: + if self.state == NewParallel.State.STALLED and completed_task: if self.trace: self.trace_message(f"XXX {threading.get_ident()} Detected stall with completed task, bypassing wait") - self.state = ExperimentalParallel.State.READY + self.state = NewParallel.State.READY # Wait until we are neither searching nor stalled. - while self.state == ExperimentalParallel.State.SEARCHING or self.state == ExperimentalParallel.State.STALLED: + while self.state == NewParallel.State.SEARCHING or self.state == NewParallel.State.STALLED: if self.trace: self.trace_message(f"XXX {threading.get_ident()} Search already in progress, waiting") self.can_search_cv.wait() # If someone set the completed flag, bail. - if self.state == ExperimentalParallel.State.COMPLETED: + if self.state == NewParallel.State.COMPLETED: if self.trace: self.trace_message(f"XXX {threading.get_ident()} Completion detected, breaking from main loop") break @@ -589,7 +589,7 @@ else: # if self.trace: self.trace_message(f"XXX {threading.get_ident()} Starting search") - self.state = ExperimentalParallel.State.SEARCHING + self.state = NewParallel.State.SEARCHING # Bulk acquire the tasks in the results queue # under the result queue lock, then process them @@ -630,7 +630,7 @@ else: # needs execution. If we run out of tasks, go idle # until results arrive if jobs are pending, or # mark the walk as complete if not. - while self.state == ExperimentalParallel.State.SEARCHING: + while self.state == NewParallel.State.SEARCHING: if self.trace: self.trace_message(f"XXX {threading.get_ident()} Searching for new tasks") task = self.taskmaster.next_task() @@ -660,7 +660,7 @@ else: self.jobs += 1 if self.trace: self.trace_message(f"XXX {threading.get_ident()} Found task requiring execution") - self.state = ExperimentalParallel.State.READY + self.state = NewParallel.State.READY self.can_search_cv.notify() else: @@ -679,7 +679,7 @@ else: # if self.trace: self.trace_message(f"XXX {threading.get_ident()} Found no task requiring execution, but have jobs: marking stalled") - self.state = ExperimentalParallel.State.STALLED + self.state = NewParallel.State.STALLED else: # We didn't find a task and there are # no jobs outstanding, so there is @@ -692,13 +692,13 @@ else: # if self.trace: self.trace_message(f"XXX {threading.get_ident()} Found no task requiring execution, and have no jobs: marking complete") - self.state = ExperimentalParallel.State.COMPLETED + self.state = NewParallel.State.COMPLETED self.can_search_cv.notify_all() # We no longer hold `tm_lock` here. If we have a task, # we can now execute it. If there are threads waiting # to search, one of them can now begin turning the - # taskmaster crank in parallel. + # taskmaster crank in NewParallel. if task: if self.trace: self.trace_message(f"XXX {threading.get_ident()} Executing task") @@ -728,9 +728,6 @@ else: # the value of the `task` variable if you add new code # after this comment. -# TODO: Remove this is just for testing. -Parallel = ExperimentalParallel - # Local Variables: # tab-width:4 # indent-tabs-mode:nil -- cgit v0.12