From 4d440cef23ea297ecb73457a807db385ff779f44 Mon Sep 17 00:00:00 2001 From: William Deegan Date: Wed, 26 Oct 2022 20:04:26 -0700 Subject: Initial logic to add logging to ExperimentalParallel job class. Not quite working --- SCons/Taskmaster/Job.py | 61 ++++++++++++++++++++++++++++++++++---------- SCons/Taskmaster/__init__.py | 3 +++ 2 files changed, 51 insertions(+), 13 deletions(-) diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index d9c98e8..667d46b 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -29,8 +29,10 @@ stop, and wait on jobs. import SCons.compat +import logging import os import signal +import sys import threading from enum import Enum @@ -484,6 +486,23 @@ else: self.results_queue_lock = threading.Lock() self.results_queue = [] + if self.taskmaster.trace: + self.trace = self._setup_logging() + else: + self.trace = False + + def _setup_logging(self): + jl = logging.getLogger("Job") + jl.setLevel(level=logging.DEBUG) + jl.addHandler(self.taskmaster.trace.log_handler) + return jl + + def trace_message(self, message): + # This grabs the name of the function which calls trace_message() + method_name = sys._getframe(1).f_code.co_name + "():" + self.trace.debug('%-15s %s' % (method_name, message)) + print('%-15s %s' % (method_name, message)) + def start(self): self._start_workers() for worker in self.workers: @@ -527,7 +546,8 @@ else: # Obtain `tm_lock`, granting exclusive access to the taskmaster. with self.can_search_cv: - # print(f"XXX {threading.get_ident()} Gained exclusive access") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Gained exclusive access") # Capture whether we got here with `task` set, # then drop our reference to the task as we are no @@ -547,24 +567,28 @@ else: # in the condvar. Some other thread will come back # here with a completed task. if self.state == ExperimentalParallel.State.STALLED and completed_task: - # print(f"XXX {threading.get_ident()} Detected stall with completed task, bypassing wait") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Detected stall with completed task, bypassing wait") self.state = ExperimentalParallel.State.READY # Wait until we are neither searching nor stalled. while self.state == ExperimentalParallel.State.SEARCHING or self.state == ExperimentalParallel.State.STALLED: - # print(f"XXX {threading.get_ident()} Search already in progress, waiting") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Search already in progress, waiting") self.can_search_cv.wait() # If someone set the completed flag, bail. if self.state == ExperimentalParallel.State.COMPLETED: - # print(f"XXX {threading.get_ident()} Completion detected, breaking from main loop") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Completion detected, breaking from main loop") break # Set the searching flag to indicate that a thread # is currently in the critical section for # taskmaster work. # - # print(f"XXX {threading.get_ident()} Starting search") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Starting search") self.state = ExperimentalParallel.State.SEARCHING # Bulk acquire the tasks in the results queue @@ -577,7 +601,8 @@ else: with self.results_queue_lock: results_queue, self.results_queue = self.results_queue, results_queue - # print(f"XXX {threading.get_ident()} Found {len(results_queue)} completed tasks to process") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Found {len(results_queue)} completed tasks to process") for (rtask, rresult) in results_queue: if rresult: rtask.executed() @@ -606,7 +631,8 @@ else: # until results arrive if jobs are pending, or # mark the walk as complete if not. while self.state == ExperimentalParallel.State.SEARCHING: - # print(f"XXX {threading.get_ident()} Searching for new tasks") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Searching for new tasks") task = self.taskmaster.next_task() if task: @@ -626,12 +652,14 @@ else: task.postprocess() else: if not task.needs_execute(): - # print(f"XXX {threading.get_ident()} Found internal task") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Found internal task") task.executed() task.postprocess() else: self.jobs += 1 - # print(f"XXX {threading.get_ident()} Found task requiring execution") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Found task requiring execution") self.state = ExperimentalParallel.State.READY self.can_search_cv.notify() @@ -649,7 +677,8 @@ else: # outstanding that will re-enter the # loop. # - # print(f"XXX {threading.get_ident()} Found no task requiring execution, but have jobs: marking stalled") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Found no task requiring execution, but have jobs: marking stalled") self.state = ExperimentalParallel.State.STALLED else: # We didn't find a task and there are @@ -661,7 +690,8 @@ else: # note completion and awaken anyone # sleeping on the condvar. # - # print(f"XXX {threading.get_ident()} Found no task requiring execution, and have no jobs: marking complete") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Found no task requiring execution, and have no jobs: marking complete") self.state = ExperimentalParallel.State.COMPLETED self.can_search_cv.notify_all() @@ -670,7 +700,8 @@ else: # to search, one of them can now begin turning the # taskmaster crank in parallel. if task: - # print(f"XXX {threading.get_ident()} Executing task") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Executing task") ok = True try: if self.interrupted(): @@ -686,7 +717,8 @@ else: # the searching loop will complete the # postprocessing work under the taskmaster lock. # - # print(f"XXX {threading.get_ident()} Enqueueing executed task results") + if self.trace: + self.trace_message(f"XXX {threading.get_ident()} Enqueueing executed task results") with self.results_queue_lock: self.results_queue.append((task, ok)) @@ -696,6 +728,9 @@ else: # the value of the `task` variable if you add new code # after this comment. +# TODO: Remove this is just for testing. +Parallel = ExperimentalParallel + # Local Variables: # tab-width:4 # indent-tabs-mode:nil diff --git a/SCons/Taskmaster/__init__.py b/SCons/Taskmaster/__init__.py index 3f0e700..2e08a8b 100644 --- a/SCons/Taskmaster/__init__.py +++ b/SCons/Taskmaster/__init__.py @@ -665,10 +665,13 @@ class Taskmaster: task_formatter = logging.Formatter('%(name)s.%(message)s') Task.LOGGER = tl + self.trace.log_handler = log_handler + log_handler.setFormatter(DispatchingFormatter( formatters={ 'Taskmaster': tm_formatter, 'Task': task_formatter, + 'Job': task_formatter, }, default_formatter=logging.Formatter('%(message)s') )) -- cgit v0.12