From 4d440cef23ea297ecb73457a807db385ff779f44 Mon Sep 17 00:00:00 2001 From: William Deegan Date: Wed, 26 Oct 2022 20:04:26 -0700 Subject: Initial logic to add logging to ExperimentalParallel job class. Not quite working --- SCons/Taskmaster/Job.py | 61 ++++++++++++++++++++++++++++++++++---------- SCons/Taskmaster/__init__.py | 3 +++ 2 files changed, 51 insertions(+), 13 deletions(-) diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index d9c98e8..667d46b 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -29,8 +29,10 @@ stop, and wait on jobs. import SCons.compat +import logging import os import signal +import sys import threading from enum import Enum @@ -484,6 +486,23 @@ else: self.results_queue_lock = threading.Lock() self.results_queue = [] + if self.taskmaster.trace: + self.trace = self._setup_logging() + else: + self.trace = False + + def _setup_logging(self): + jl = logging.getLogger("Job") + jl.setLevel(level=logging.DEBUG) + jl.addHandler(self.taskmaster.trace.log_handler) + return jl + + def trace_message(self, message): + # This grabs the name of the function which calls trace_message() + method_name = sys._getframe(1).f_code.co_name + "():" + self.trace.debug('%-15s %s' % (method_name, message)) + print('%-15s %s' % (method_name, message)) + def start(self): self._start_workers() for worker in self.workers: @@ -527,7 +546,8 @@ else: # Obtain `tm_lock`, granting exclusive access to the taskmaster. with self.can_search_cv: - # print(f"XXX {threading.get_ident()} Gained exclusive access") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Gained exclusive access") # Capture whether we got here with `task` set, # then drop our reference to the task as we are no @@ -547,24 +567,28 @@ else: # in the condvar. Some other thread will come back # here with a completed task. if self.state == ExperimentalParallel.State.STALLED and completed_task: - # print(f"XXX {threading.get_ident()} Detected stall with completed task, bypassing wait") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Detected stall with completed task, bypassing wait") self.state = ExperimentalParallel.State.READY # Wait until we are neither searching nor stalled. while self.state == ExperimentalParallel.State.SEARCHING or self.state == ExperimentalParallel.State.STALLED: - # print(f"XXX {threading.get_ident()} Search already in progress, waiting") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Search already in progress, waiting") self.can_search_cv.wait() # If someone set the completed flag, bail. if self.state == ExperimentalParallel.State.COMPLETED: - # print(f"XXX {threading.get_ident()} Completion detected, breaking from main loop") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Completion detected, breaking from main loop") break # Set the searching flag to indicate that a thread # is currently in the critical section for # taskmaster work. # - # print(f"XXX {threading.get_ident()} Starting search") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Starting search") self.state = ExperimentalParallel.State.SEARCHING # Bulk acquire the tasks in the results queue @@ -577,7 +601,8 @@ else: with self.results_queue_lock: results_queue, self.results_queue = self.results_queue, results_queue - # print(f"XXX {threading.get_ident()} Found {len(results_queue)} completed tasks to process") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Found {len(results_queue)} completed tasks to process") for (rtask, rresult) in results_queue: if rresult: rtask.executed() @@ -606,7 +631,8 @@ else: # until results arrive if jobs are pending, or # mark the walk as complete if not. while self.state == ExperimentalParallel.State.SEARCHING: - # print(f"XXX {threading.get_ident()} Searching for new tasks") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Searching for new tasks") task = self.taskmaster.next_task() if task: @@ -626,12 +652,14 @@ else: task.postprocess() else: if not task.needs_execute(): - # print(f"XXX {threading.get_ident()} Found internal task") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Found internal task") task.executed() task.postprocess() else: self.jobs += 1 - # print(f"XXX {threading.get_ident()} Found task requiring execution") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Found task requiring execution") self.state = ExperimentalParallel.State.READY self.can_search_cv.notify() @@ -649,7 +677,8 @@ else: # outstanding that will re-enter the # loop. # - # print(f"XXX {threading.get_ident()} Found no task requiring execution, but have jobs: marking stalled") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Found no task requiring execution, but have jobs: marking stalled") self.state = ExperimentalParallel.State.STALLED else: # We didn't find a task and there are @@ -661,7 +690,8 @@ else: # note completion and awaken anyone # sleeping on the condvar. # - # print(f"XXX {threading.get_ident()} Found no task requiring execution, and have no jobs: marking complete") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Found no task requiring execution, and have no jobs: marking complete") self.state = ExperimentalParallel.State.COMPLETED self.can_search_cv.notify_all() @@ -670,7 +700,8 @@ else: # to search, one of them can now begin turning the # taskmaster crank in parallel. if task: - # print(f"XXX {threading.get_ident()} Executing task") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Executing task") ok = True try: if self.interrupted(): @@ -686,7 +717,8 @@ else: # the searching loop will complete the # postprocessing work under the taskmaster lock. # - # print(f"XXX {threading.get_ident()} Enqueueing executed task results") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Enqueueing executed task results") with self.results_queue_lock: self.results_queue.append((task, ok)) @@ -696,6 +728,9 @@ else: # the value of the `task` variable if you add new code # after this comment. +# TODO: Remove this is just for testing. +Parallel = ExperimentalParallel + # Local Variables: # tab-width:4 # indent-tabs-mode:nil diff --git a/SCons/Taskmaster/__init__.py b/SCons/Taskmaster/__init__.py index 3f0e700..2e08a8b 100644 --- a/SCons/Taskmaster/__init__.py +++ b/SCons/Taskmaster/__init__.py @@ -665,10 +665,13 @@ class Taskmaster: task_formatter = logging.Formatter('%(name)s.%(message)s') Task.LOGGER = tl + self.trace.log_handler = log_handler + log_handler.setFormatter(DispatchingFormatter( formatters={ 'Taskmaster': tm_formatter, 'Task': task_formatter, + 'Job': task_formatter, }, default_formatter=logging.Formatter('%(message)s') )) -- cgit v0.12 From cd092f47ddddef487b3a5f747b93dfd292d24628 Mon Sep 17 00:00:00 2001 From: William Deegan Date: Tue, 15 Nov 2022 20:39:29 -0500 Subject: logging working in NewParallel, but changed to be default. Need to figure out how to switch from one to the other --- SCons/Taskmaster/Job.py | 37 +++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index 667d46b..c2a825c 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -91,7 +91,7 @@ class Jobs: stack_size = default_stack_size try: - self.job = Parallel(taskmaster, num, stack_size) + self.job = NewParallel(taskmaster, num, stack_size) self.num_jobs = num except NameError: pass @@ -348,7 +348,7 @@ else: worker.join(1.0) self.workers = [] - class Parallel: + class LegacyParallel: """This class is used to execute tasks in parallel, and is somewhat less efficient than Serial, but is appropriate for parallel builds. @@ -440,7 +440,7 @@ else: self.taskmaster.cleanup() # An experimental new parallel scheduler that uses a leaders/followers pattern. - class ExperimentalParallel: + class NewParallel: class State(Enum): READY = 0 @@ -474,7 +474,7 @@ else: # Guarded under `tm_lock`. self.jobs = 0 - self.state = ExperimentalParallel.State.READY + self.state = NewParallel.State.READY # The `can_search_cv` is used to manage a leader / # follower pattern for access to the taskmaster, and to @@ -500,8 +500,8 @@ else: def trace_message(self, message): # This grabs the name of the function which calls trace_message() method_name = sys._getframe(1).f_code.co_name + "():" - self.trace.debug('%-15s %s' % (method_name, message)) - print('%-15s %s' % (method_name, message)) + self.trace.debug('%s.%s %s' % (type(self).__name__, method_name, message)) + # print('%-15s %s' % (method_name, message)) def start(self): self._start_workers() @@ -513,7 +513,7 @@ else: def _start_workers(self): prev_size = self._adjust_stack_size() for _ in range(self.num_workers): - self.workers.append(ExperimentalParallel.Worker(self)) + self.workers.append(NewParallel.Worker(self)) self._restore_stack_size(prev_size) def _adjust_stack_size(self): @@ -566,19 +566,19 @@ else: # work. Otherwise, stay stalled, and we will wait # in the condvar. Some other thread will come back # here with a completed task. - if self.state == ExperimentalParallel.State.STALLED and completed_task: + if self.state == NewParallel.State.STALLED and completed_task: if self.trace: self.trace_message(f"XXX {threading.get_ident()} Detected stall with completed task, bypassing wait") - self.state = ExperimentalParallel.State.READY + self.state = NewParallel.State.READY # Wait until we are neither searching nor stalled. - while self.state == ExperimentalParallel.State.SEARCHING or self.state == ExperimentalParallel.State.STALLED: + while self.state == NewParallel.State.SEARCHING or self.state == NewParallel.State.STALLED: if self.trace: self.trace_message(f"XXX {threading.get_ident()} Search already in progress, waiting") self.can_search_cv.wait() # If someone set the completed flag, bail. - if self.state == ExperimentalParallel.State.COMPLETED: + if self.state == NewParallel.State.COMPLETED: if self.trace: self.trace_message(f"XXX {threading.get_ident()} Completion detected, breaking from main loop") break @@ -589,7 +589,7 @@ else: # if self.trace: self.trace_message(f"XXX {threading.get_ident()} Starting search") - self.state = ExperimentalParallel.State.SEARCHING + self.state = NewParallel.State.SEARCHING # Bulk acquire the tasks in the results queue # under the result queue lock, then process them @@ -630,7 +630,7 @@ else: # needs execution. If we run out of tasks, go idle # until results arrive if jobs are pending, or # mark the walk as complete if not. - while self.state == ExperimentalParallel.State.SEARCHING: + while self.state == NewParallel.State.SEARCHING: if self.trace: self.trace_message(f"XXX {threading.get_ident()} Searching for new tasks") task = self.taskmaster.next_task() @@ -660,7 +660,7 @@ else: self.jobs += 1 if self.trace: self.trace_message(f"XXX {threading.get_ident()} Found task requiring execution") - self.state = ExperimentalParallel.State.READY + self.state = NewParallel.State.READY self.can_search_cv.notify() else: @@ -679,7 +679,7 @@ else: # if self.trace: self.trace_message(f"XXX {threading.get_ident()} Found no task requiring execution, but have jobs: marking stalled") - self.state = ExperimentalParallel.State.STALLED + self.state = NewParallel.State.STALLED else: # We didn't find a task and there are # no jobs outstanding, so there is @@ -692,13 +692,13 @@ else: # if self.trace: self.trace_message(f"XXX {threading.get_ident()} Found no task requiring execution, and have no jobs: marking complete") - self.state = ExperimentalParallel.State.COMPLETED + self.state = NewParallel.State.COMPLETED self.can_search_cv.notify_all() # We no longer hold `tm_lock` here. If we have a task, # we can now execute it. If there are threads waiting # to search, one of them can now begin turning the - # taskmaster crank in parallel. + # taskmaster crank in NewParallel. if task: if self.trace: self.trace_message(f"XXX {threading.get_ident()} Executing task") @@ -728,9 +728,6 @@ else: # the value of the `task` variable if you add new code # after this comment. -# TODO: Remove this is just for testing. -Parallel = ExperimentalParallel - # Local Variables: # tab-width:4 # indent-tabs-mode:nil -- cgit v0.12 From be2be99f6934c6702138826a32cdb680a75ce5b6 Mon Sep 17 00:00:00 2001 From: William Deegan Date: Thu, 17 Nov 2022 07:36:33 -0800 Subject: Remove extraneous XXX from NewParallel logging. Added SCONS_NEW_PARALLEL shell environment variable to turn on NewParallel. This is a temporary mechanism to enable NewParallel --- SCons/Taskmaster/Job.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index c2a825c..58f3f01 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -53,6 +53,7 @@ default_stack_size = 256 interrupt_msg = 'Build interrupted.' +USE_NEW_PARALLEL = os.environ.get('SCONS_NEW_PARALLEL', False) class InterruptState: def __init__(self): @@ -91,7 +92,10 @@ class Jobs: stack_size = default_stack_size try: - self.job = NewParallel(taskmaster, num, stack_size) + if USE_NEW_PARALLEL: + self.job = NewParallel(taskmaster, num, stack_size) + else: + self.job = LegacyParallel(taskmaster, num, stack_size) self.num_jobs = num except NameError: pass @@ -547,7 +551,7 @@ else: with self.can_search_cv: if self.trace: - self.trace_message(f"XXX {threading.get_ident()} Gained exclusive access") + self.trace_message(f"{threading.get_ident()} Gained exclusive access") # Capture whether we got here with `task` set, # then drop our reference to the task as we are no @@ -568,19 +572,19 @@ else: # here with a completed task. if self.state == NewParallel.State.STALLED and completed_task: if self.trace: - self.trace_message(f"XXX {threading.get_ident()} Detected stall with completed task, bypassing wait") + self.trace_message(f"{threading.get_ident()} Detected stall with completed task, bypassing wait") self.state = NewParallel.State.READY # Wait until we are neither searching nor stalled. while self.state == NewParallel.State.SEARCHING or self.state == NewParallel.State.STALLED: if self.trace: - self.trace_message(f"XXX {threading.get_ident()} Search already in progress, waiting") + self.trace_message(f"{threading.get_ident()} Search already in progress, waiting") self.can_search_cv.wait() # If someone set the completed flag, bail. if self.state == NewParallel.State.COMPLETED: if self.trace: - self.trace_message(f"XXX {threading.get_ident()} Completion detected, breaking from main loop") + self.trace_message(f"{threading.get_ident()} Completion detected, breaking from main loop") break # Set the searching flag to indicate that a thread @@ -588,7 +592,7 @@ else: # taskmaster work. # if self.trace: - self.trace_message(f"XXX {threading.get_ident()} Starting search") + self.trace_message(f"{threading.get_ident()} Starting search") self.state = NewParallel.State.SEARCHING # Bulk acquire the tasks in the results queue @@ -602,7 +606,7 @@ else: results_queue, self.results_queue = self.results_queue, results_queue if self.trace: - self.trace_message(f"XXX {threading.get_ident()} Found {len(results_queue)} completed tasks to process") + self.trace_message(f"{threading.get_ident()} Found {len(results_queue)} completed tasks to process") for (rtask, rresult) in results_queue: if rresult: rtask.executed() @@ -632,7 +636,7 @@ else: # mark the walk as complete if not. while self.state == NewParallel.State.SEARCHING: if self.trace: - self.trace_message(f"XXX {threading.get_ident()} Searching for new tasks") + self.trace_message(f"{threading.get_ident()} Searching for new tasks") task = self.taskmaster.next_task() if task: @@ -653,13 +657,13 @@ else: else: if not task.needs_execute(): if self.trace: - self.trace_message(f"XXX {threading.get_ident()} Found internal task") + self.trace_message(f"{threading.get_ident()} Found internal task") task.executed() task.postprocess() else: self.jobs += 1 if self.trace: - self.trace_message(f"XXX {threading.get_ident()} Found task requiring execution") + self.trace_message(f"{threading.get_ident()} Found task requiring execution") self.state = NewParallel.State.READY self.can_search_cv.notify() @@ -678,7 +682,7 @@ else: # loop. # if self.trace: - self.trace_message(f"XXX {threading.get_ident()} Found no task requiring execution, but have jobs: marking stalled") + self.trace_message(f"{threading.get_ident()} Found no task requiring execution, but have jobs: marking stalled") self.state = NewParallel.State.STALLED else: # We didn't find a task and there are @@ -691,7 +695,7 @@ else: # sleeping on the condvar. # if self.trace: - self.trace_message(f"XXX {threading.get_ident()} Found no task requiring execution, and have no jobs: marking complete") + self.trace_message(f"{threading.get_ident()} Found no task requiring execution, and have no jobs: marking complete") self.state = NewParallel.State.COMPLETED self.can_search_cv.notify_all() @@ -701,7 +705,7 @@ else: # taskmaster crank in NewParallel. if task: if self.trace: - self.trace_message(f"XXX {threading.get_ident()} Executing task") + self.trace_message(f"{threading.get_ident()} Executing task") ok = True try: if self.interrupted(): @@ -718,7 +722,7 @@ else: # postprocessing work under the taskmaster lock. # if self.trace: - self.trace_message(f"XXX {threading.get_ident()} Enqueueing executed task results") + self.trace_message(f"{threading.get_ident()} Enqueueing executed task results") with self.results_queue_lock: self.results_queue.append((task, ok)) -- cgit v0.12 From 1decdfa5ea74d53663cbae0044f97b3b9a7859a8 Mon Sep 17 00:00:00 2001 From: William Deegan Date: Thu, 17 Nov 2022 10:59:49 -0800 Subject: Fix SCons/Taskmaster/JobTests.py test --- SCons/Taskmaster/JobTests.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/SCons/Taskmaster/JobTests.py b/SCons/Taskmaster/JobTests.py index 374d3f3..57b548c 100644 --- a/SCons/Taskmaster/JobTests.py +++ b/SCons/Taskmaster/JobTests.py @@ -339,8 +339,8 @@ class NoParallelTestCase(unittest.TestCase): """test handling lack of parallel support""" def NoParallel(tm, num, stack_size): raise NameError - save_Parallel = SCons.Taskmaster.Job.Parallel - SCons.Taskmaster.Job.Parallel = NoParallel + save_Parallel = SCons.Taskmaster.Job.LegacyParallel + SCons.Taskmaster.Job.LegacyParallel = NoParallel try: taskmaster = Taskmaster(num_tasks, self, RandomTask) jobs = SCons.Taskmaster.Job.Jobs(2, taskmaster) @@ -358,7 +358,7 @@ class NoParallelTestCase(unittest.TestCase): self.assertFalse(taskmaster.num_failed, "some task(s) failed to execute") finally: - SCons.Taskmaster.Job.Parallel = save_Parallel + SCons.Taskmaster.Job.LegacyParallel = save_Parallel class SerialExceptionTestCase(unittest.TestCase): -- cgit v0.12 From 1d4e8b68953ca500ac3b92db66ab7e6249703ee9 Mon Sep 17 00:00:00 2001 From: William Deegan Date: Mon, 21 Nov 2022 08:09:53 -0800 Subject: [ci skip] Added info to CHANGES/RELEASE files --- CHANGES.txt | 2 ++ RELEASE.txt | 3 +++ 2 files changed, 5 insertions(+) diff --git a/CHANGES.txt b/CHANGES.txt index 0a9f697..8361204 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -22,6 +22,8 @@ RELEASE VERSION/DATE TO BE FILLED IN LATER NOTE: If you hook into SCons.Jobs, you'll have to change that to use SCons.Taskmaster.Jobs - Changed the Taskmaster trace logic to use python's logging module. The output formatting should remain (mostly) the same. Minor update to unittest for this to adjust for 1 less newline. + - Migrated logging logic for --taskmastertrace to use Python's logging module. Added logging + to NewParallel Job class (Andrew Morrow's new parallel job implementation) diff --git a/RELEASE.txt b/RELEASE.txt index 94ba72e..94bc483 100644 --- a/RELEASE.txt +++ b/RELEASE.txt @@ -33,6 +33,9 @@ CHANGED/ENHANCED EXISTING FUNCTIONALITY DefaultEnvironment anymore. - The console message from the Chmod() action function now displays octal modes using the modern Python syntax (0o755 rather than 0755). +- Migrated logging logic for --taskmastertrace to use Python's logging module. Added logging + to NewParallel Job class (Andrew Morrow's new parallel job implementation) + FIXES ----- -- cgit v0.12