From 8c8052c95d4645a584b4332e11aabe0843446678 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Thu, 14 Dec 2023 10:49:22 -0500 Subject: threading is always available --- SCons/Taskmaster/Job.py | 909 ++++++++++++++++++++++++------------------------ 1 file changed, 449 insertions(+), 460 deletions(-) diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index 572464b..b73c941 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -31,6 +31,7 @@ import SCons.compat import logging import os +import queue import signal import sys import threading @@ -95,16 +96,13 @@ class Jobs: if stack_size is None: stack_size = default_stack_size - try: - experimental_option = GetOption('experimental') - if 'tm_v2' in experimental_option: - self.job = NewParallel(taskmaster, num, stack_size) - else: - self.job = LegacyParallel(taskmaster, num, stack_size) + experimental_option = GetOption('experimental') + if 'tm_v2' in experimental_option: + self.job = NewParallel(taskmaster, num, stack_size) + else: + self.job = LegacyParallel(taskmaster, num, stack_size) - self.num_jobs = num - except NameError: - pass + self.num_jobs = num if self.job is None: self.job = Serial(taskmaster) self.num_jobs = 1 @@ -239,505 +237,496 @@ class Serial: self.taskmaster.cleanup() -# Trap import failure so that everything in the Job module but the -# Parallel class (and its dependent classes) will work if the interpreter -# doesn't support threads. -try: - import queue - import threading -except ImportError: - pass -else: - class Worker(threading.Thread): - """A worker thread waits on a task to be posted to its request queue, - dequeues the task, executes it, and posts a tuple including the task - and a boolean indicating whether the task executed successfully. """ +class Worker(threading.Thread): + """A worker thread waits on a task to be posted to its request queue, + dequeues the task, executes it, and posts a tuple including the task + and a boolean indicating whether the task executed successfully. """ - def __init__(self, requestQueue, resultsQueue, interrupted) -> None: - super().__init__() - self.daemon = True - self.requestQueue = requestQueue - self.resultsQueue = resultsQueue - self.interrupted = interrupted - self.start() + def __init__(self, requestQueue, resultsQueue, interrupted) -> None: + super().__init__() + self.daemon = True + self.requestQueue = requestQueue + self.resultsQueue = resultsQueue + self.interrupted = interrupted + self.start() - def run(self): - while True: - task = self.requestQueue.get() + def run(self): + while True: + task = self.requestQueue.get() - if task is None: - # The "None" value is used as a sentinel by - # ThreadPool.cleanup(). This indicates that there - # are no more tasks, so we should quit. - break + if task is None: + # The "None" value is used as a sentinel by + # ThreadPool.cleanup(). This indicates that there + # are no more tasks, so we should quit. 
+ break - try: - if self.interrupted(): - raise SCons.Errors.BuildError( - task.targets[0], errstr=interrupt_msg) - task.execute() - except Exception: - task.exception_set() - ok = False - else: - ok = True + try: + if self.interrupted(): + raise SCons.Errors.BuildError( + task.targets[0], errstr=interrupt_msg) + task.execute() + except Exception: + task.exception_set() + ok = False + else: + ok = True - self.resultsQueue.put((task, ok)) + self.resultsQueue.put((task, ok)) - class ThreadPool: - """This class is responsible for spawning and managing worker threads.""" +class ThreadPool: + """This class is responsible for spawning and managing worker threads.""" - def __init__(self, num, stack_size, interrupted) -> None: - """Create the request and reply queues, and 'num' worker threads. + def __init__(self, num, stack_size, interrupted) -> None: + """Create the request and reply queues, and 'num' worker threads. - One must specify the stack size of the worker threads. The - stack size is specified in kilobytes. - """ - self.requestQueue = queue.Queue(0) - self.resultsQueue = queue.Queue(0) + One must specify the stack size of the worker threads. The + stack size is specified in kilobytes. + """ + self.requestQueue = queue.Queue(0) + self.resultsQueue = queue.Queue(0) - try: - prev_size = threading.stack_size(stack_size * 1024) - except AttributeError as e: - # Only print a warning if the stack size has been - # explicitly set. - if explicit_stack_size is not None: - msg = "Setting stack size is unsupported by this version of Python:\n " + \ - e.args[0] - SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) - except ValueError as e: - msg = "Setting stack size failed:\n " + str(e) + try: + prev_size = threading.stack_size(stack_size * 1024) + except AttributeError as e: + # Only print a warning if the stack size has been + # explicitly set. + if explicit_stack_size is not None: + msg = "Setting stack size is unsupported by this version of Python:\n " + \ + e.args[0] SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) + except ValueError as e: + msg = "Setting stack size failed:\n " + str(e) + SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) + + # Create worker threads + self.workers = [] + for _ in range(num): + worker = Worker(self.requestQueue, self.resultsQueue, interrupted) + self.workers.append(worker) + + if 'prev_size' in locals(): + threading.stack_size(prev_size) + + def put(self, task) -> None: + """Put task into request queue.""" + self.requestQueue.put(task) + + def get(self): + """Remove and return a result tuple from the results queue.""" + return self.resultsQueue.get() - # Create worker threads - self.workers = [] - for _ in range(num): - worker = Worker(self.requestQueue, self.resultsQueue, interrupted) - self.workers.append(worker) - - if 'prev_size' in locals(): - threading.stack_size(prev_size) - - def put(self, task) -> None: - """Put task into request queue.""" - self.requestQueue.put(task) - - def get(self): - """Remove and return a result tuple from the results queue.""" - return self.resultsQueue.get() - - def preparation_failed(self, task) -> None: - self.resultsQueue.put((task, False)) - - def cleanup(self) -> None: - """ - Shuts down the thread pool, giving each worker thread a - chance to shut down gracefully. - """ - # For each worker thread, put a sentinel "None" value - # on the requestQueue (indicating that there's no work - # to be done) so that each worker thread will get one and - # terminate gracefully. 
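The cleanup logic described in the comment above uses the standard sentinel-shutdown idiom for queue-fed worker threads: push one None per worker, then join each with a timeout. A minimal, self-contained sketch of that idiom follows; the names are illustrative and it is not code from this patch.

    import queue
    import threading

    def worker(requests):
        while True:
            item = requests.get()
            if item is None:        # sentinel: no more work, let this thread exit
                break
            item()                  # "execute" the task

    requests = queue.Queue()
    workers = [threading.Thread(target=worker, args=(requests,), daemon=True)
               for _ in range(4)]
    for w in workers:
        w.start()

    requests.put(lambda: print("task ran"))

    # Shutdown: one sentinel per worker, then a bounded join in case one hangs.
    for _ in workers:
        requests.put(None)
    for w in workers:
        w.join(1.0)
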
- for _ in self.workers: - self.requestQueue.put(None) - - # Wait for all of the workers to terminate. - # - # If we don't do this, later Python versions (2.4, 2.5) often - # seem to raise exceptions during shutdown. This happens - # in requestQueue.get(), as an assertion failure that - # requestQueue.not_full is notified while not acquired, - # seemingly because the main thread has shut down (or is - # in the process of doing so) while the workers are still - # trying to pull sentinels off the requestQueue. - # - # Normally these terminations should happen fairly quickly, - # but we'll stick a one-second timeout on here just in case - # someone gets hung. - for worker in self.workers: - worker.join(1.0) - self.workers = [] - - class LegacyParallel: - """This class is used to execute tasks in parallel, and is somewhat - less efficient than Serial, but is appropriate for parallel builds. - - This class is thread safe. + def preparation_failed(self, task) -> None: + self.resultsQueue.put((task, False)) + + def cleanup(self) -> None: + """ + Shuts down the thread pool, giving each worker thread a + chance to shut down gracefully. """ + # For each worker thread, put a sentinel "None" value + # on the requestQueue (indicating that there's no work + # to be done) so that each worker thread will get one and + # terminate gracefully. + for _ in self.workers: + self.requestQueue.put(None) + + # Wait for all of the workers to terminate. + # + # If we don't do this, later Python versions (2.4, 2.5) often + # seem to raise exceptions during shutdown. This happens + # in requestQueue.get(), as an assertion failure that + # requestQueue.not_full is notified while not acquired, + # seemingly because the main thread has shut down (or is + # in the process of doing so) while the workers are still + # trying to pull sentinels off the requestQueue. + # + # Normally these terminations should happen fairly quickly, + # but we'll stick a one-second timeout on here just in case + # someone gets hung. + for worker in self.workers: + worker.join(1.0) + self.workers = [] + +class LegacyParallel: + """This class is used to execute tasks in parallel, and is somewhat + less efficient than Serial, but is appropriate for parallel builds. + + This class is thread safe. + """ - def __init__(self, taskmaster, num, stack_size) -> None: - """Create a new parallel job given a taskmaster. + def __init__(self, taskmaster, num, stack_size) -> None: + """Create a new parallel job given a taskmaster. - The taskmaster's next_task() method should return the next - task that needs to be executed, or None if there are no more - tasks. The taskmaster's executed() method will be called - for each task when it is successfully executed, or failed() - will be called if the task failed to execute (i.e. execute() - raised an exception). + The taskmaster's next_task() method should return the next + task that needs to be executed, or None if there are no more + tasks. The taskmaster's executed() method will be called + for each task when it is successfully executed, or failed() + will be called if the task failed to execute (i.e. execute() + raised an exception). - Note: calls to taskmaster are serialized, but calls to - execute() on distinct tasks are not serialized, because - that is the whole point of parallel jobs: they can execute - multiple tasks simultaneously. 
""" + Note: calls to taskmaster are serialized, but calls to + execute() on distinct tasks are not serialized, because + that is the whole point of parallel jobs: they can execute + multiple tasks simultaneously. """ - self.taskmaster = taskmaster - self.interrupted = InterruptState() - self.tp = ThreadPool(num, stack_size, self.interrupted) + self.taskmaster = taskmaster + self.interrupted = InterruptState() + self.tp = ThreadPool(num, stack_size, self.interrupted) - self.maxjobs = num + self.maxjobs = num - def start(self): - """Start the job. This will begin pulling tasks from the - taskmaster and executing them, and return when there are no - more tasks. If a task fails to execute (i.e. execute() raises - an exception), then the job will stop.""" + def start(self): + """Start the job. This will begin pulling tasks from the + taskmaster and executing them, and return when there are no + more tasks. If a task fails to execute (i.e. execute() raises + an exception), then the job will stop.""" - jobs = 0 + jobs = 0 - while True: - # Start up as many available tasks as we're - # allowed to. - while jobs < self.maxjobs: - task = self.taskmaster.next_task() - if task is None: - break + while True: + # Start up as many available tasks as we're + # allowed to. + while jobs < self.maxjobs: + task = self.taskmaster.next_task() + if task is None: + break - try: - # prepare task for execution - task.prepare() - except Exception: - task.exception_set() - task.failed() - task.postprocess() + try: + # prepare task for execution + task.prepare() + except Exception: + task.exception_set() + task.failed() + task.postprocess() + else: + if task.needs_execute(): + # dispatch task + self.tp.put(task) + jobs += 1 else: - if task.needs_execute(): - # dispatch task - self.tp.put(task) - jobs += 1 - else: - task.executed() - task.postprocess() + task.executed() + task.postprocess() + + if not task and not jobs: + break + + # Let any/all completed tasks finish up before we go + # back and put the next batch of tasks on the queue. + while True: + task, ok = self.tp.get() + jobs -= 1 + + if ok: + task.executed() + else: + if self.interrupted(): + try: + raise SCons.Errors.BuildError( + task.targets[0], errstr=interrupt_msg) + except Exception: + task.exception_set() - if not task and not jobs: + # Let the failed() callback function arrange + # for the build to stop if that's appropriate. + task.failed() + + task.postprocess() + + if self.tp.resultsQueue.empty(): break - # Let any/all completed tasks finish up before we go - # back and put the next batch of tasks on the queue. - while True: - task, ok = self.tp.get() - jobs -= 1 + self.tp.cleanup() + self.taskmaster.cleanup() - if ok: - task.executed() - else: - if self.interrupted(): - try: - raise SCons.Errors.BuildError( - task.targets[0], errstr=interrupt_msg) - except Exception: - task.exception_set() +# An experimental new parallel scheduler that uses a leaders/followers pattern. +class NewParallel: - # Let the failed() callback function arrange - # for the build to stop if that's appropriate. - task.failed() + class State(Enum): + READY = 0 + SEARCHING = 1 + STALLED = 2 + COMPLETED = 3 - task.postprocess() + class Worker(threading.Thread): + def __init__(self, owner) -> None: + super().__init__() + self.daemon = True + self.owner = owner + self.start() - if self.tp.resultsQueue.empty(): - break - - self.tp.cleanup() - self.taskmaster.cleanup() - - # An experimental new parallel scheduler that uses a leaders/followers pattern. 
- class NewParallel: - - class State(Enum): - READY = 0 - SEARCHING = 1 - STALLED = 2 - COMPLETED = 3 - - class Worker(threading.Thread): - def __init__(self, owner) -> None: - super().__init__() - self.daemon = True - self.owner = owner - self.start() - - def run(self) -> None: - self.owner._work() - - def __init__(self, taskmaster, num, stack_size) -> None: - self.taskmaster = taskmaster - self.num_workers = num - self.stack_size = stack_size - self.interrupted = InterruptState() - self.workers = [] - - # The `tm_lock` is what ensures that we only have one - # thread interacting with the taskmaster at a time. It - # also protects access to our state that gets updated - # concurrently. The `can_search_cv` is associated with - # this mutex. - self.tm_lock = threading.Lock() - - # Guarded under `tm_lock`. - self.jobs = 0 - self.state = NewParallel.State.READY - - # The `can_search_cv` is used to manage a leader / - # follower pattern for access to the taskmaster, and to - # awaken from stalls. - self.can_search_cv = threading.Condition(self.tm_lock) - - # The queue of tasks that have completed execution. The - # next thread to obtain `tm_lock`` will retire them. - self.results_queue_lock = threading.Lock() - self.results_queue = [] - - if self.taskmaster.trace: - self.trace = self._setup_logging() - else: - self.trace = False - - def _setup_logging(self): - jl = logging.getLogger("Job") - jl.setLevel(level=logging.DEBUG) - jl.addHandler(self.taskmaster.trace.log_handler) - return jl - - def trace_message(self, message) -> None: - # This grabs the name of the function which calls trace_message() - method_name = sys._getframe(1).f_code.co_name + "():" - thread_id=threading.get_ident() - self.trace.debug('%s.%s [Thread:%s] %s' % (type(self).__name__, method_name, thread_id, message)) - # print('%-15s %s' % (method_name, message)) - - def start(self) -> None: - self._start_workers() - for worker in self.workers: - worker.join() - self.workers = [] - self.taskmaster.cleanup() - - def _start_workers(self) -> None: - prev_size = self._adjust_stack_size() - for _ in range(self.num_workers): - self.workers.append(NewParallel.Worker(self)) - self._restore_stack_size(prev_size) - - def _adjust_stack_size(self): - try: - prev_size = threading.stack_size(self.stack_size * 1024) - return prev_size - except AttributeError as e: - # Only print a warning if the stack size has been - # explicitly set. - if explicit_stack_size is not None: - msg = "Setting stack size is unsupported by this version of Python:\n " + \ - e.args[0] - SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) - except ValueError as e: - msg = "Setting stack size failed:\n " + str(e) - SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) + def run(self) -> None: + self.owner._work() - return None + def __init__(self, taskmaster, num, stack_size) -> None: + self.taskmaster = taskmaster + self.num_workers = num + self.stack_size = stack_size + self.interrupted = InterruptState() + self.workers = [] + + # The `tm_lock` is what ensures that we only have one + # thread interacting with the taskmaster at a time. It + # also protects access to our state that gets updated + # concurrently. The `can_search_cv` is associated with + # this mutex. + self.tm_lock = threading.Lock() + + # Guarded under `tm_lock`. + self.jobs = 0 + self.state = NewParallel.State.READY + + # The `can_search_cv` is used to manage a leader / + # follower pattern for access to the taskmaster, and to + # awaken from stalls. 
+ self.can_search_cv = threading.Condition(self.tm_lock) + + # The queue of tasks that have completed execution. The + # next thread to obtain `tm_lock`` will retire them. + self.results_queue_lock = threading.Lock() + self.results_queue = [] + + if self.taskmaster.trace: + self.trace = self._setup_logging() + else: + self.trace = False + + def _setup_logging(self): + jl = logging.getLogger("Job") + jl.setLevel(level=logging.DEBUG) + jl.addHandler(self.taskmaster.trace.log_handler) + return jl + + def trace_message(self, message) -> None: + # This grabs the name of the function which calls trace_message() + method_name = sys._getframe(1).f_code.co_name + "():" + thread_id=threading.get_ident() + self.trace.debug('%s.%s [Thread:%s] %s' % (type(self).__name__, method_name, thread_id, message)) + # print('%-15s %s' % (method_name, message)) + + def start(self) -> None: + self._start_workers() + for worker in self.workers: + worker.join() + self.workers = [] + self.taskmaster.cleanup() - def _restore_stack_size(self, prev_size) -> None: - if prev_size is not None: - threading.stack_size(prev_size) + def _start_workers(self) -> None: + prev_size = self._adjust_stack_size() + for _ in range(self.num_workers): + self.workers.append(NewParallel.Worker(self)) + self._restore_stack_size(prev_size) - def _work(self): + def _adjust_stack_size(self): + try: + prev_size = threading.stack_size(self.stack_size * 1024) + return prev_size + except AttributeError as e: + # Only print a warning if the stack size has been + # explicitly set. + if explicit_stack_size is not None: + msg = "Setting stack size is unsupported by this version of Python:\n " + \ + e.args[0] + SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) + except ValueError as e: + msg = "Setting stack size failed:\n " + str(e) + SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg) - task = None + return None - while True: + def _restore_stack_size(self, prev_size) -> None: + if prev_size is not None: + threading.stack_size(prev_size) - # Obtain `tm_lock`, granting exclusive access to the taskmaster. - with self.can_search_cv: + def _work(self): + task = None + + while True: + + # Obtain `tm_lock`, granting exclusive access to the taskmaster. + with self.can_search_cv: + + if self.trace: + self.trace_message("Gained exclusive access") + + # Capture whether we got here with `task` set, + # then drop our reference to the task as we are no + # longer interested in the actual object. + completed_task = (task is not None) + task = None + + # We will only have `completed_task` set here if + # we have looped back after executing a task. If + # we have completed a task and find that we are + # stalled, we should speculatively indicate that + # we are no longer stalled by transitioning to the + # 'ready' state which will bypass the condition + # wait so that we immediately process the results + # queue and hopefully light up new + # work. Otherwise, stay stalled, and we will wait + # in the condvar. Some other thread will come back + # here with a completed task. + if self.state == NewParallel.State.STALLED and completed_task: if self.trace: - self.trace_message("Gained exclusive access") - - # Capture whether we got here with `task` set, - # then drop our reference to the task as we are no - # longer interested in the actual object. - completed_task = (task is not None) - task = None - - # We will only have `completed_task` set here if - # we have looped back after executing a task. 
If - # we have completed a task and find that we are - # stalled, we should speculatively indicate that - # we are no longer stalled by transitioning to the - # 'ready' state which will bypass the condition - # wait so that we immediately process the results - # queue and hopefully light up new - # work. Otherwise, stay stalled, and we will wait - # in the condvar. Some other thread will come back - # here with a completed task. - if self.state == NewParallel.State.STALLED and completed_task: - if self.trace: - self.trace_message("Detected stall with completed task, bypassing wait") - self.state = NewParallel.State.READY - - # Wait until we are neither searching nor stalled. - while self.state == NewParallel.State.SEARCHING or self.state == NewParallel.State.STALLED: - if self.trace: - self.trace_message("Search already in progress, waiting") - self.can_search_cv.wait() - - # If someone set the completed flag, bail. - if self.state == NewParallel.State.COMPLETED: - if self.trace: - self.trace_message("Completion detected, breaking from main loop") - break - - # Set the searching flag to indicate that a thread - # is currently in the critical section for - # taskmaster work. - # + self.trace_message("Detected stall with completed task, bypassing wait") + self.state = NewParallel.State.READY + + # Wait until we are neither searching nor stalled. + while self.state == NewParallel.State.SEARCHING or self.state == NewParallel.State.STALLED: if self.trace: - self.trace_message("Starting search") - self.state = NewParallel.State.SEARCHING - - # Bulk acquire the tasks in the results queue - # under the result queue lock, then process them - # all outside that lock. We need to process the - # tasks in the results queue before looking for - # new work because we might be unable to find new - # work if we don't. - results_queue = [] - with self.results_queue_lock: - results_queue, self.results_queue = self.results_queue, results_queue + self.trace_message("Search already in progress, waiting") + self.can_search_cv.wait() + # If someone set the completed flag, bail. + if self.state == NewParallel.State.COMPLETED: if self.trace: - self.trace_message("Found {len(results_queue)} completed tasks to process") - for (rtask, rresult) in results_queue: - if rresult: - rtask.executed() - else: - if self.interrupted(): - try: - raise SCons.Errors.BuildError( - rtask.targets[0], errstr=interrupt_msg) - except Exception: - rtask.exception_set() - - # Let the failed() callback function arrange - # for the build to stop if that's appropriate. - rtask.failed() - - rtask.postprocess() - self.jobs -= 1 - - # We are done with any task objects that were in - # the results queue. - results_queue.clear() - - # Now, turn the crank on the taskmaster until we - # either run out of tasks, or find a task that - # needs execution. If we run out of tasks, go idle - # until results arrive if jobs are pending, or - # mark the walk as complete if not. - while self.state == NewParallel.State.SEARCHING: - if self.trace: - self.trace_message("Searching for new tasks") - task = self.taskmaster.next_task() - - if task: - # We found a task. Walk it through the - # task lifecycle. If it does not need - # execution, just complete the task and - # look for the next one. Otherwise, - # indicate that we are no longer searching - # so we can drop out of this loop, execute - # the task outside the lock, and allow - # another thread in to search. 
+ self.trace_message("Completion detected, breaking from main loop") + break + + # Set the searching flag to indicate that a thread + # is currently in the critical section for + # taskmaster work. + # + if self.trace: + self.trace_message("Starting search") + self.state = NewParallel.State.SEARCHING + + # Bulk acquire the tasks in the results queue + # under the result queue lock, then process them + # all outside that lock. We need to process the + # tasks in the results queue before looking for + # new work because we might be unable to find new + # work if we don't. + results_queue = [] + with self.results_queue_lock: + results_queue, self.results_queue = self.results_queue, results_queue + + if self.trace: + self.trace_message("Found {len(results_queue)} completed tasks to process") + for (rtask, rresult) in results_queue: + if rresult: + rtask.executed() + else: + if self.interrupted(): try: - task.prepare() + raise SCons.Errors.BuildError( + rtask.targets[0], errstr=interrupt_msg) except Exception: - task.exception_set() - task.failed() - task.postprocess() - else: - if not task.needs_execute(): - if self.trace: - self.trace_message("Found internal task") - task.executed() - task.postprocess() - else: - self.jobs += 1 - if self.trace: - self.trace_message("Found task requiring execution") - self.state = NewParallel.State.READY - self.can_search_cv.notify() + rtask.exception_set() + + # Let the failed() callback function arrange + # for the build to stop if that's appropriate. + rtask.failed() + + rtask.postprocess() + self.jobs -= 1 + + # We are done with any task objects that were in + # the results queue. + results_queue.clear() + # Now, turn the crank on the taskmaster until we + # either run out of tasks, or find a task that + # needs execution. If we run out of tasks, go idle + # until results arrive if jobs are pending, or + # mark the walk as complete if not. + while self.state == NewParallel.State.SEARCHING: + if self.trace: + self.trace_message("Searching for new tasks") + task = self.taskmaster.next_task() + + if task: + # We found a task. Walk it through the + # task lifecycle. If it does not need + # execution, just complete the task and + # look for the next one. Otherwise, + # indicate that we are no longer searching + # so we can drop out of this loop, execute + # the task outside the lock, and allow + # another thread in to search. + try: + task.prepare() + except Exception: + task.exception_set() + task.failed() + task.postprocess() else: - # We failed to find a task, so this thread - # cannot continue turning the taskmaster - # crank. We must exit the loop. - if self.jobs: - # No task was found, but there are - # outstanding jobs executing that - # might unblock new tasks when they - # complete. Transition to the stalled - # state. We do not need a notify, - # because we know there are threads - # outstanding that will re-enter the - # loop. - # + if not task.needs_execute(): if self.trace: - self.trace_message("Found no task requiring execution, but have jobs: marking stalled") - self.state = NewParallel.State.STALLED + self.trace_message("Found internal task") + task.executed() + task.postprocess() else: - # We didn't find a task and there are - # no jobs outstanding, so there is - # nothing that will ever return - # results which might unblock new - # tasks. We can conclude that the walk - # is complete. Update our state to - # note completion and awaken anyone - # sleeping on the condvar. 
- # + self.jobs += 1 if self.trace: - self.trace_message("Found no task requiring execution, and have no jobs: marking complete") - self.state = NewParallel.State.COMPLETED - self.can_search_cv.notify_all() - - # We no longer hold `tm_lock` here. If we have a task, - # we can now execute it. If there are threads waiting - # to search, one of them can now begin turning the - # taskmaster crank in NewParallel. - if task: - if self.trace: - self.trace_message("Executing task") - ok = True - try: - if self.interrupted(): - raise SCons.Errors.BuildError( - task.targets[0], errstr=interrupt_msg) - task.execute() - except Exception: - ok = False - task.exception_set() + self.trace_message("Found task requiring execution") + self.state = NewParallel.State.READY + self.can_search_cv.notify() - # Grab the results queue lock and enqueue the - # executed task and state. The next thread into - # the searching loop will complete the - # postprocessing work under the taskmaster lock. - # - if self.trace: - self.trace_message("Enqueueing executed task results") - with self.results_queue_lock: - self.results_queue.append((task, ok)) - - # Tricky state "fallthrough" here. We are going back - # to the top of the loop, which behaves differently - # depending on whether `task` is set. Do not perturb - # the value of the `task` variable if you add new code - # after this comment. + else: + # We failed to find a task, so this thread + # cannot continue turning the taskmaster + # crank. We must exit the loop. + if self.jobs: + # No task was found, but there are + # outstanding jobs executing that + # might unblock new tasks when they + # complete. Transition to the stalled + # state. We do not need a notify, + # because we know there are threads + # outstanding that will re-enter the + # loop. + # + if self.trace: + self.trace_message("Found no task requiring execution, but have jobs: marking stalled") + self.state = NewParallel.State.STALLED + else: + # We didn't find a task and there are + # no jobs outstanding, so there is + # nothing that will ever return + # results which might unblock new + # tasks. We can conclude that the walk + # is complete. Update our state to + # note completion and awaken anyone + # sleeping on the condvar. + # + if self.trace: + self.trace_message("Found no task requiring execution, and have no jobs: marking complete") + self.state = NewParallel.State.COMPLETED + self.can_search_cv.notify_all() + + # We no longer hold `tm_lock` here. If we have a task, + # we can now execute it. If there are threads waiting + # to search, one of them can now begin turning the + # taskmaster crank in NewParallel. + if task: + if self.trace: + self.trace_message("Executing task") + ok = True + try: + if self.interrupted(): + raise SCons.Errors.BuildError( + task.targets[0], errstr=interrupt_msg) + task.execute() + except Exception: + ok = False + task.exception_set() + + # Grab the results queue lock and enqueue the + # executed task and state. The next thread into + # the searching loop will complete the + # postprocessing work under the taskmaster lock. + # + if self.trace: + self.trace_message("Enqueueing executed task results") + with self.results_queue_lock: + self.results_queue.append((task, ok)) + + # Tricky state "fallthrough" here. We are going back + # to the top of the loop, which behaves differently + # depending on whether `task` is set. Do not perturb + # the value of the `task` variable if you add new code + # after this comment. 
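The _work() loop that ends here is the heart of the leaders/followers scheme: at most one thread at a time is "searching" (holding the taskmaster), followers sleep on the condition variable, and task execution always happens outside the lock. The sketch below shows only that handoff, with a plain task list standing in for the taskmaster and none of the stalled-state or results-queue machinery; all names are illustrative, not SCons code.

    import threading

    class LeaderFollower:
        """Toy scheduler: one 'leader' thread at a time pulls work from a
        shared source; every thread executes its task outside the lock."""

        def __init__(self, tasks):
            self.tasks = list(tasks)          # shared state guarded by the lock
            self.lock = threading.Lock()
            self.cv = threading.Condition(self.lock)
            self.searching = False            # True while some thread is the leader

        def _next_task(self):
            return self.tasks.pop(0) if self.tasks else None

        def work(self):
            while True:
                with self.cv:
                    while self.searching:     # followers park here
                        self.cv.wait()
                    self.searching = True     # become the leader
                    task = self._next_task()  # serialized access to shared state
                    self.searching = False
                    self.cv.notify()          # hand leadership to a follower
                if task is None:
                    return
                task()                        # run the task without holding the lock

    if __name__ == "__main__":
        sched = LeaderFollower([lambda i=i: print("task", i) for i in range(5)])
        threads = [threading.Thread(target=sched.work) for _ in range(3)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
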
# Local Variables: # tab-width:4 -- cgit v0.12 From 629b425f836c89ed2d25d2709610b32960d4d12e Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Thu, 14 Dec 2023 10:56:14 -0500 Subject: fixup --- SCons/Taskmaster/JobTests.py | 28 ---------------------------- 1 file changed, 28 deletions(-) diff --git a/SCons/Taskmaster/JobTests.py b/SCons/Taskmaster/JobTests.py index 3faa97d..b114d05 100644 --- a/SCons/Taskmaster/JobTests.py +++ b/SCons/Taskmaster/JobTests.py @@ -348,34 +348,6 @@ class SerialTestCase(unittest.TestCase): "some task(s) failed to execute") -class NoParallelTestCase(JobTestCase): - - def runTest(self) -> None: - """test handling lack of parallel support""" - def NoParallel(tm, num, stack_size): - raise NameError - save_Parallel = SCons.Taskmaster.Job.LegacyParallel - SCons.Taskmaster.Job.LegacyParallel = NoParallel - try: - taskmaster = Taskmaster(num_tasks, self, RandomTask) - jobs = SCons.Taskmaster.Job.Jobs(2, taskmaster) - self.assertTrue(jobs.num_jobs == 1, - "unexpected number of jobs %d" % jobs.num_jobs) - jobs.run() - self.assertTrue(taskmaster.tasks_were_serial(), - "the tasks were not executed in series") - self.assertTrue(taskmaster.all_tasks_are_executed(), - "all the tests were not executed") - self.assertTrue(taskmaster.all_tasks_are_iterated(), - "all the tests were not iterated over") - self.assertTrue(taskmaster.all_tasks_are_postprocessed(), - "all the tests were not postprocessed") - self.assertFalse(taskmaster.num_failed, - "some task(s) failed to execute") - finally: - SCons.Taskmaster.Job.LegacyParallel = save_Parallel - - class SerialExceptionTestCase(unittest.TestCase): def runTest(self) -> None: """test a serial job with tasks that raise exceptions""" -- cgit v0.12 From bc0fddc7c5a33475759c4f63f840942b1b448211 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Thu, 14 Dec 2023 11:32:35 -0500 Subject: Always use NewParallel with tm_v2 flag --- SCons/Taskmaster/Job.py | 38 ++++++++++++++------------------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index b73c941..56a7c1e 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -73,16 +73,9 @@ class Jobs: def __init__(self, num, taskmaster) -> None: """ - Create 'num' jobs using the given taskmaster. - - If 'num' is 1 or less, then a serial job will be used, - otherwise a parallel job with 'num' worker threads will - be used. - - The 'num_jobs' attribute will be set to the actual number of jobs - allocated. If more than one job is requested but the Parallel - class can't do it, it gets reset to 1. Wrapping interfaces that - care should check the value of 'num_jobs' after initialization. + Create 'num' jobs using the given taskmaster. The exact implementation + used varies with the number of jobs requested and the state of the `tm_v2` flag + to `--experimental`. 
""" # Importing GetOption here instead of at top of file to avoid @@ -90,22 +83,19 @@ class Jobs: # pylint: disable=import-outside-toplevel from SCons.Script import GetOption - self.job = None - if num > 1: - stack_size = explicit_stack_size - if stack_size is None: - stack_size = default_stack_size + stack_size = explicit_stack_size + if stack_size is None: + stack_size = default_stack_size - experimental_option = GetOption('experimental') - if 'tm_v2' in experimental_option: - self.job = NewParallel(taskmaster, num, stack_size) - else: + experimental_option = GetOption('experimental') or [] + if 'tm_v2' in experimental_option: + self.job = NewParallel(taskmaster, num, stack_size) + else: + if num > 1: self.job = LegacyParallel(taskmaster, num, stack_size) - - self.num_jobs = num - if self.job is None: - self.job = Serial(taskmaster) - self.num_jobs = 1 + else: + self.job = Serial(taskmaster) + self.num_jobs = num def run(self, postfunc=lambda: None) -> None: """Run the jobs. -- cgit v0.12 From ac496f714e39097194b6bb6c2efef94d17c158a7 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Thu, 14 Dec 2023 11:33:53 -0500 Subject: Let the main thread participate in NewParallel --- SCons/Taskmaster/Job.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index 56a7c1e..e6f37b6 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -449,7 +449,7 @@ class NewParallel: def __init__(self, taskmaster, num, stack_size) -> None: self.taskmaster = taskmaster - self.num_workers = num + self.num_workers = num - 1 self.stack_size = stack_size self.interrupted = InterruptState() self.workers = [] @@ -505,6 +505,7 @@ class NewParallel: for _ in range(self.num_workers): self.workers.append(NewParallel.Worker(self)) self._restore_stack_size(prev_size) + _work() def _adjust_stack_size(self): try: -- cgit v0.12 From 2d0d367ed22e54ef1ae32ca464891e0b7ba426b1 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Thu, 14 Dec 2023 11:45:25 -0500 Subject: fixup --- SCons/Taskmaster/Job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index e6f37b6..d1d9501 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -505,7 +505,7 @@ class NewParallel: for _ in range(self.num_workers): self.workers.append(NewParallel.Worker(self)) self._restore_stack_size(prev_size) - _work() + self._work() def _adjust_stack_size(self): try: -- cgit v0.12 From a64b1bfd9614413fee4c6584bb2b5feddca73534 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Thu, 14 Dec 2023 14:40:03 -0500 Subject: No locks needed for -j1 with NewParallel --- SCons/Taskmaster/Job.py | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index d1d9501..0bd4060 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -447,6 +447,30 @@ class NewParallel: def run(self) -> None: self.owner._work() + class FakeLock(object): + def lock(self): + pass + def unlock(self): + pass + def __enter__(self): + pass + def __exit__(self, *args): + pass + + class FakeCondition(object): + def __init__(self, lock): + pass + def wait(self): + fatal(); + def notify(self): + pass + def notify_all(self): + pass + def __enter__(self): + pass + def __exit__(self, *args): + pass + def __init__(self, taskmaster, num, stack_size) -> None: self.taskmaster = taskmaster self.num_workers = num - 1 @@ -459,7 +483,7 @@ class 
NewParallel: # also protects access to our state that gets updated # concurrently. The `can_search_cv` is associated with # this mutex. - self.tm_lock = threading.Lock() + self.tm_lock = (threading.Lock if self.num_workers > 0 else NewParallel.FakeLock)() # Guarded under `tm_lock`. self.jobs = 0 @@ -468,11 +492,11 @@ class NewParallel: # The `can_search_cv` is used to manage a leader / # follower pattern for access to the taskmaster, and to # awaken from stalls. - self.can_search_cv = threading.Condition(self.tm_lock) + self.can_search_cv = (threading.Condition if self.num_workers > 0 else NewParallel.FakeCondition)(self.tm_lock) # The queue of tasks that have completed execution. The # next thread to obtain `tm_lock`` will retire them. - self.results_queue_lock = threading.Lock() + self.results_queue_lock = (threading.Lock if self.num_workers > 0 else NewParallel.FakeLock)() self.results_queue = [] if self.taskmaster.trace: -- cgit v0.12 From 8d7067d2a43a708e68ade68546f4ffc940a780bf Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Tue, 9 Jan 2024 13:06:37 -0500 Subject: wip --- CHANGES.txt | 6 +++++- SCons/Script/SConsOptions.py | 2 +- SCons/Taskmaster/Job.py | 11 ++++++----- SCons/Taskmaster/JobTests.py | 7 +++++-- doc/man/scons.xml | 2 +- test/option/option--experimental.py | 8 ++++---- test/option/taskmastertrace.py | 5 +++-- 7 files changed, 25 insertions(+), 16 deletions(-) mode change 100644 => 100755 test/option/option--experimental.py mode change 100644 => 100755 test/option/taskmastertrace.py diff --git a/CHANGES.txt b/CHANGES.txt index dfde4c5..cee9947 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -61,6 +61,11 @@ RELEASE VERSION/DATE TO BE FILLED IN LATER expected bytecodes in ActionTests.py. + From Andrew Morrow: + - The NewParallel scheduler is now the default, the `tm_v2` flag is removed, + and the old scheduler is opt-in under `--experimental=legacysched`. Additionally, + the new scheduler is now used for -j1 builds as well. + RELEASE 4.6.0 - Sun, 19 Nov 2023 17:22:20 -0700 From Max Bachmann: @@ -7950,4 +7955,3 @@ A brief overview of important functionality available in release 0.01: - Linux packages available in RPM and Debian format. - Windows installer available. - diff --git a/SCons/Script/SConsOptions.py b/SCons/Script/SConsOptions.py index b74353e..25675f3 100644 --- a/SCons/Script/SConsOptions.py +++ b/SCons/Script/SConsOptions.py @@ -40,7 +40,7 @@ SUPPRESS_HELP = optparse.SUPPRESS_HELP diskcheck_all = SCons.Node.FS.diskcheck_types() -experimental_features = {'warp_speed', 'transporter', 'ninja', 'tm_v2'} +experimental_features = {'warp_speed', 'transporter', 'ninja', 'legacysched'} def diskcheck_convert(value): diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index 0bd4060..66387d6 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -74,7 +74,7 @@ class Jobs: def __init__(self, num, taskmaster) -> None: """ Create 'num' jobs using the given taskmaster. The exact implementation - used varies with the number of jobs requested and the state of the `tm_v2` flag + used varies with the number of jobs requested and the state of the `legacysched` flag to `--experimental`. 
""" @@ -88,13 +88,14 @@ class Jobs: stack_size = default_stack_size experimental_option = GetOption('experimental') or [] - if 'tm_v2' in experimental_option: - self.job = NewParallel(taskmaster, num, stack_size) - else: + if 'legacysched' in experimental_option: if num > 1: self.job = LegacyParallel(taskmaster, num, stack_size) else: self.job = Serial(taskmaster) + else: + self.job = NewParallel(taskmaster, num, stack_size) + self.num_jobs = num def run(self, postfunc=lambda: None) -> None: @@ -617,7 +618,7 @@ class NewParallel: results_queue, self.results_queue = self.results_queue, results_queue if self.trace: - self.trace_message("Found {len(results_queue)} completed tasks to process") + self.trace_message(f"Found {len(results_queue)} completed tasks to process") for (rtask, rresult) in results_queue: if rresult: rtask.executed() diff --git a/SCons/Taskmaster/JobTests.py b/SCons/Taskmaster/JobTests.py index b114d05..c457e78 100644 --- a/SCons/Taskmaster/JobTests.py +++ b/SCons/Taskmaster/JobTests.py @@ -525,14 +525,17 @@ class SerialTaskTest(_SConsTaskTest): """test serial jobs with actual Taskmaster and Task""" self._test_seq(1) + # Now run test with LegacyParallel + OptionsParser.values.experimental=['legacysched'] + self._test_seq(1) class ParallelTaskTest(_SConsTaskTest): def runTest(self) -> None: """test parallel jobs with actual Taskmaster and Task""" self._test_seq(num_jobs) - # Now run test with NewParallel() instead of LegacyParallel - OptionsParser.values.experimental=['tm_v2'] + # Now run test with LegacyParallel + OptionsParser.values.experimental=['legacysched'] self._test_seq(num_jobs) diff --git a/doc/man/scons.xml b/doc/man/scons.xml index 22f93f5..9432dbe 100644 --- a/doc/man/scons.xml +++ b/doc/man/scons.xml @@ -1212,7 +1212,7 @@ the mechanisms in the specified order. The default setting is none. Current available features are: ninja (New in version 4.2), - tm_v2 (New in version 4.4.1). + legacysched (New in version 4.6.0). No Support offered for any features or tools enabled by this flag. 
diff --git a/test/option/option--experimental.py b/test/option/option--experimental.py old mode 100644 new mode 100755 index 324de99..2ad189e --- a/test/option/option--experimental.py +++ b/test/option/option--experimental.py @@ -36,13 +36,13 @@ test.file_fixture('fixture/SConstruct__experimental', 'SConstruct') tests = [ ('.', []), ('--experimental=ninja', ['ninja']), - ('--experimental=tm_v2', ['tm_v2']), - ('--experimental=all', ['ninja', 'tm_v2', 'transporter', 'warp_speed']), + ('--experimental=legacysched', ['legacysched']), + ('--experimental=all', ['ninja', 'legacysched', 'transporter', 'warp_speed']), ('--experimental=none', []), ] for args, exper in tests: - read_string = """All Features=ninja,tm_v2,transporter,warp_speed + read_string = """All Features=legacysched,ninja,transporter,warp_speed Experimental=%s """ % (exper) test.run(arguments=args, @@ -51,7 +51,7 @@ Experimental=%s test.run(arguments='--experimental=warp_drive', stderr="""usage: scons [OPTIONS] [VARIABLES] [TARGETS] -SCons Error: option --experimental: invalid choice: 'warp_drive' (choose from 'all','none','ninja','tm_v2','transporter','warp_speed') +SCons Error: option --experimental: invalid choice: 'warp_drive' (choose from 'all','none','legacysched',ninja','transporter','warp_speed') """, status=2) diff --git a/test/option/taskmastertrace.py b/test/option/taskmastertrace.py old mode 100644 new mode 100755 index 99de718..9ff3d37 --- a/test/option/taskmastertrace.py +++ b/test/option/taskmastertrace.py @@ -51,11 +51,12 @@ Copy("Tfile.mid", "Tfile.in") Copy("Tfile.out", "Tfile.mid") """) -test.run(arguments='--taskmastertrace=trace.out .', stdout=expect_stdout) +# Test LegacyParallel Job implementation +test.run(arguments='--experimental=legacysched --taskmastertrace=trace.out .', stdout=expect_stdout) test.must_match_file('trace.out', 'taskmaster_expected_file_1.txt', mode='r') # Test NewParallel Job implementation -test.run(arguments='-j 2 --experimental=tm_v2 --taskmastertrace=new_parallel_trace.out .') +test.run(arguments='-j 2 --taskmastertrace=new_parallel_trace.out .') new_trace = test.read('new_parallel_trace.out', mode='r') thread_id = re.compile(r'\[Thread:\d+\]') -- cgit v0.12 From 2d562d02596186a8561c986e895dafeb54dcf1f4 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Fri, 19 Jan 2024 15:34:46 -0500 Subject: Use main thread as worker if -j1 with NewParallel --- SCons/Taskmaster/Job.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index 66387d6..60992be 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -474,7 +474,7 @@ class NewParallel: def __init__(self, taskmaster, num, stack_size) -> None: self.taskmaster = taskmaster - self.num_workers = num - 1 + self.num_workers = num self.stack_size = stack_size self.interrupted = InterruptState() self.workers = [] @@ -484,7 +484,7 @@ class NewParallel: # also protects access to our state that gets updated # concurrently. The `can_search_cv` is associated with # this mutex. - self.tm_lock = (threading.Lock if self.num_workers > 0 else NewParallel.FakeLock)() + self.tm_lock = (threading.Lock if self.num_workers > 1 else NewParallel.FakeLock)() # Guarded under `tm_lock`. self.jobs = 0 @@ -493,11 +493,11 @@ class NewParallel: # The `can_search_cv` is used to manage a leader / # follower pattern for access to the taskmaster, and to # awaken from stalls. 
- self.can_search_cv = (threading.Condition if self.num_workers > 0 else NewParallel.FakeCondition)(self.tm_lock) + self.can_search_cv = (threading.Condition if self.num_workers > 1 else NewParallel.FakeCondition)(self.tm_lock) # The queue of tasks that have completed execution. The # next thread to obtain `tm_lock`` will retire them. - self.results_queue_lock = (threading.Lock if self.num_workers > 0 else NewParallel.FakeLock)() + self.results_queue_lock = (threading.Lock if self.num_workers > 1 else NewParallel.FakeLock)() self.results_queue = [] if self.taskmaster.trace: @@ -519,10 +519,13 @@ class NewParallel: # print('%-15s %s' % (method_name, message)) def start(self) -> None: - self._start_workers() - for worker in self.workers: - worker.join() - self.workers = [] + if self.num_workers == 1: + self._work() + else: + self._start_workers() + for worker in self.workers: + worker.join() + self.workers = [] self.taskmaster.cleanup() def _start_workers(self) -> None: @@ -530,7 +533,6 @@ class NewParallel: for _ in range(self.num_workers): self.workers.append(NewParallel.Worker(self)) self._restore_stack_size(prev_size) - self._work() def _adjust_stack_size(self): try: -- cgit v0.12 From 2b4dafee6be6834deb04484d4e6ff23355cee627 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Fri, 19 Jan 2024 15:35:11 -0500 Subject: fix experimental option test --- test/option/option--experimental.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/option/option--experimental.py b/test/option/option--experimental.py index 2ad189e..2929eeb 100755 --- a/test/option/option--experimental.py +++ b/test/option/option--experimental.py @@ -37,7 +37,7 @@ tests = [ ('.', []), ('--experimental=ninja', ['ninja']), ('--experimental=legacysched', ['legacysched']), - ('--experimental=all', ['ninja', 'legacysched', 'transporter', 'warp_speed']), + ('--experimental=all', ['legacysched', 'ninja', 'transporter', 'warp_speed']), ('--experimental=none', []), ] @@ -51,7 +51,7 @@ Experimental=%s test.run(arguments='--experimental=warp_drive', stderr="""usage: scons [OPTIONS] [VARIABLES] [TARGETS] -SCons Error: option --experimental: invalid choice: 'warp_drive' (choose from 'all','none','legacysched',ninja','transporter','warp_speed') +SCons Error: option --experimental: invalid choice: 'warp_drive' (choose from 'all','none','legacysched','ninja','transporter','warp_speed') """, status=2) -- cgit v0.12 From d932ba3700305f82ffafba6137da99b3145b82d2 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Fri, 19 Jan 2024 16:15:55 -0500 Subject: Fix JobTests.py --- SCons/Taskmaster/JobTests.py | 45 +------------------------------------------- 1 file changed, 1 insertion(+), 44 deletions(-) diff --git a/SCons/Taskmaster/JobTests.py b/SCons/Taskmaster/JobTests.py index c457e78..52118d9 100644 --- a/SCons/Taskmaster/JobTests.py +++ b/SCons/Taskmaster/JobTests.py @@ -202,6 +202,7 @@ class Taskmaster: self.parallel_list = [0] * (n+1) self.found_parallel = False self.Task = Task + self.trace = False # 'guard' guards 'task_begin_list' and 'task_end_list' try: @@ -284,50 +285,6 @@ class ParallelTestCase(JobTestCase): self.assertFalse(taskmaster.num_failed, "some task(s) failed to execute") - # Verify that parallel jobs will pull all of the completed tasks - # out of the queue at once, instead of one by one. 
We do this by - # replacing the default ThreadPool class with one that records the - # order in which tasks are put() and get() to/from the pool, and - # which sleeps a little bit before call get() to let the initial - # tasks complete and get their notifications on the resultsQueue. - - class SleepTask(Task): - def _do_something(self) -> None: - time.sleep(0.01) - - global SaveThreadPool - SaveThreadPool = SCons.Taskmaster.Job.ThreadPool - - class WaitThreadPool(SaveThreadPool): - def put(self, task): - ThreadPoolCallList.append('put(%s)' % task.i) - return SaveThreadPool.put(self, task) - def get(self): - time.sleep(0.05) - result = SaveThreadPool.get(self) - ThreadPoolCallList.append('get(%s)' % result[0].i) - return result - - SCons.Taskmaster.Job.ThreadPool = WaitThreadPool - - try: - taskmaster = Taskmaster(3, self, SleepTask) - jobs = SCons.Taskmaster.Job.Jobs(2, taskmaster) - jobs.run() - - # The key here is that we get(1) and get(2) from the - # resultsQueue before we put(3), but get(1) and get(2) can - # be in either order depending on how the first two parallel - # tasks get scheduled by the operating system. - expect = [ - ['put(1)', 'put(2)', 'get(1)', 'get(2)', 'put(3)', 'get(3)'], - ['put(1)', 'put(2)', 'get(2)', 'get(1)', 'put(3)', 'get(3)'], - ] - assert ThreadPoolCallList in expect, ThreadPoolCallList - - finally: - SCons.Taskmaster.Job.ThreadPool = SaveThreadPool - class SerialTestCase(unittest.TestCase): def runTest(self) -> None: """test a serial job""" -- cgit v0.12 From e4e128a70ea7659b95c5edcc713436683d9a2cca Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Fri, 19 Jan 2024 16:32:21 -0500 Subject: SConf failed should use cached exception info --- SCons/SConf.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/SCons/SConf.py b/SCons/SConf.py index 53666e6..2bc68f1 100644 --- a/SCons/SConf.py +++ b/SCons/SConf.py @@ -253,8 +253,7 @@ class SConfBuildTask(SCons.Taskmaster.AlwaysTask): # ConfigureCacheError and if yes, reraise the exception exc_type = self.exc_info()[0] if issubclass(exc_type, SConfError): - # TODO pylint E0704: bare raise not inside except - raise + raise self.exc_info()[1] elif issubclass(exc_type, SCons.Errors.BuildError): # we ignore Build Errors (occurs, when a test doesn't pass) # Clear the exception to prevent the contained traceback -- cgit v0.12 From 3ced491dc5ff8871ef2a72f575f6c44da42b7409 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Sat, 20 Jan 2024 17:18:51 -0500 Subject: review feedback: legacysched to legacy_sched --- CHANGES.txt | 2 +- SCons/Script/SConsOptions.py | 2 +- SCons/Taskmaster/Job.py | 4 ++-- SCons/Taskmaster/JobTests.py | 4 ++-- doc/man/scons.xml | 2 +- test/option/option--experimental.py | 8 ++++---- test/option/taskmastertrace.py | 2 +- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index cee9947..d0a8593 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -63,7 +63,7 @@ RELEASE VERSION/DATE TO BE FILLED IN LATER From Andrew Morrow: - The NewParallel scheduler is now the default, the `tm_v2` flag is removed, - and the old scheduler is opt-in under `--experimental=legacysched`. Additionally, + and the old scheduler is opt-in under `--experimental=legacy_sched`. Additionally, the new scheduler is now used for -j1 builds as well. 
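Condensing the behavior this CHANGES entry describes (and the Jobs.__init__ hunk earlier in the series), scheduler selection now reduces to roughly the following summary sketch; it restates the patch rather than quoting it:

    def pick_scheduler(num_jobs, experimental):
        """Summarize which Job class the patched Jobs.__init__ selects."""
        if 'legacy_sched' in experimental:
            return 'LegacyParallel' if num_jobs > 1 else 'Serial'
        return 'NewParallel'    # the default, including for -j1 builds
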
RELEASE 4.6.0 - Sun, 19 Nov 2023 17:22:20 -0700 diff --git a/SCons/Script/SConsOptions.py b/SCons/Script/SConsOptions.py index 25675f3..18fe0e4 100644 --- a/SCons/Script/SConsOptions.py +++ b/SCons/Script/SConsOptions.py @@ -40,7 +40,7 @@ SUPPRESS_HELP = optparse.SUPPRESS_HELP diskcheck_all = SCons.Node.FS.diskcheck_types() -experimental_features = {'warp_speed', 'transporter', 'ninja', 'legacysched'} +experimental_features = {'warp_speed', 'transporter', 'ninja', 'legacy_sched'} def diskcheck_convert(value): diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index 60992be..3bcc803 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -74,7 +74,7 @@ class Jobs: def __init__(self, num, taskmaster) -> None: """ Create 'num' jobs using the given taskmaster. The exact implementation - used varies with the number of jobs requested and the state of the `legacysched` flag + used varies with the number of jobs requested and the state of the `legacy_sched` flag to `--experimental`. """ @@ -88,7 +88,7 @@ class Jobs: stack_size = default_stack_size experimental_option = GetOption('experimental') or [] - if 'legacysched' in experimental_option: + if 'legacy_sched' in experimental_option: if num > 1: self.job = LegacyParallel(taskmaster, num, stack_size) else: diff --git a/SCons/Taskmaster/JobTests.py b/SCons/Taskmaster/JobTests.py index 52118d9..3743866 100644 --- a/SCons/Taskmaster/JobTests.py +++ b/SCons/Taskmaster/JobTests.py @@ -483,7 +483,7 @@ class SerialTaskTest(_SConsTaskTest): self._test_seq(1) # Now run test with LegacyParallel - OptionsParser.values.experimental=['legacysched'] + OptionsParser.values.experimental=['legacy_sched'] self._test_seq(1) class ParallelTaskTest(_SConsTaskTest): @@ -492,7 +492,7 @@ class ParallelTaskTest(_SConsTaskTest): self._test_seq(num_jobs) # Now run test with LegacyParallel - OptionsParser.values.experimental=['legacysched'] + OptionsParser.values.experimental=['legacy_sched'] self._test_seq(num_jobs) diff --git a/doc/man/scons.xml b/doc/man/scons.xml index 9432dbe..6808cd2 100644 --- a/doc/man/scons.xml +++ b/doc/man/scons.xml @@ -1212,7 +1212,7 @@ the mechanisms in the specified order. The default setting is none. Current available features are: ninja (New in version 4.2), - legacysched (New in version 4.6.0). + legacy_sched (New in version 4.6.0). No Support offered for any features or tools enabled by this flag. 
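One practical detail behind the taskmastertrace tests touched in this series: every trace line embeds a live thread id, so the test scrubs it with the re.compile(r'\[Thread:\d+\]') pattern before comparing against the fixture, which uses a fixed [Thread:XXXXX] marker. A small sketch of that normalization, assuming the substitution target is that marker:

    import re

    thread_id = re.compile(r'\[Thread:\d+\]')

    def normalize(trace_text):
        # Replace the varying thread id so traces from different runs compare equal.
        return thread_id.sub('[Thread:XXXXX]', trace_text)

    assert (normalize("Job.NewParallel._work(): [Thread:12345] Starting search")
            == "Job.NewParallel._work(): [Thread:XXXXX] Starting search")
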
diff --git a/test/option/option--experimental.py b/test/option/option--experimental.py index 2929eeb..da0f0a5 100755 --- a/test/option/option--experimental.py +++ b/test/option/option--experimental.py @@ -36,13 +36,13 @@ test.file_fixture('fixture/SConstruct__experimental', 'SConstruct') tests = [ ('.', []), ('--experimental=ninja', ['ninja']), - ('--experimental=legacysched', ['legacysched']), - ('--experimental=all', ['legacysched', 'ninja', 'transporter', 'warp_speed']), + ('--experimental=legacy_sched', ['legacy_sched']), + ('--experimental=all', ['legacy_sched', 'ninja', 'transporter', 'warp_speed']), ('--experimental=none', []), ] for args, exper in tests: - read_string = """All Features=legacysched,ninja,transporter,warp_speed + read_string = """All Features=legacy_sched,ninja,transporter,warp_speed Experimental=%s """ % (exper) test.run(arguments=args, @@ -51,7 +51,7 @@ Experimental=%s test.run(arguments='--experimental=warp_drive', stderr="""usage: scons [OPTIONS] [VARIABLES] [TARGETS] -SCons Error: option --experimental: invalid choice: 'warp_drive' (choose from 'all','none','legacysched','ninja','transporter','warp_speed') +SCons Error: option --experimental: invalid choice: 'warp_drive' (choose from 'all','none','legacy_sched','ninja','transporter','warp_speed') """, status=2) diff --git a/test/option/taskmastertrace.py b/test/option/taskmastertrace.py index 9ff3d37..25d4e2f 100755 --- a/test/option/taskmastertrace.py +++ b/test/option/taskmastertrace.py @@ -52,7 +52,7 @@ Copy("Tfile.out", "Tfile.mid") """) # Test LegacyParallel Job implementation -test.run(arguments='--experimental=legacysched --taskmastertrace=trace.out .', stdout=expect_stdout) +test.run(arguments='--experimental=legacy_sched --taskmastertrace=trace.out .', stdout=expect_stdout) test.must_match_file('trace.out', 'taskmaster_expected_file_1.txt', mode='r') # Test NewParallel Job implementation -- cgit v0.12 From dba62cc5bd5100161ca054c8232763ac4cab2d78 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Sat, 20 Jan 2024 17:20:24 -0500 Subject: review feedback: CHANGES.txt ordering --- CHANGES.txt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index d0a8593..19b9b0c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -56,16 +56,16 @@ RELEASE VERSION/DATE TO BE FILLED IN LATER From Prabhu S. Khalsa: - Fix typo in user documentation (issue #4458) - From Mats Wichmann: - - Add support for Python 3.13 (as of alpha 2). So far only affects - expected bytecodes in ActionTests.py. - - From Andrew Morrow: - The NewParallel scheduler is now the default, the `tm_v2` flag is removed, and the old scheduler is opt-in under `--experimental=legacy_sched`. Additionally, the new scheduler is now used for -j1 builds as well. + From Mats Wichmann: + - Add support for Python 3.13 (as of alpha 2). So far only affects + expected bytecodes in ActionTests.py. 
+ + RELEASE 4.6.0 - Sun, 19 Nov 2023 17:22:20 -0700 From Max Bachmann: -- cgit v0.12 From 82cddcfd392efa0f50809e6719ceaaa802aecb25 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Tue, 23 Jan 2024 14:03:10 -0500 Subject: taskmastertrace tests passing --- test/Interactive/taskmastertrace.py | 2 +- test/option/fixture/taskmaster_expected_new_parallel.txt | 2 +- test/option/taskmastertrace.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test/Interactive/taskmastertrace.py b/test/Interactive/taskmastertrace.py index 04e95fd..381005a 100644 --- a/test/Interactive/taskmastertrace.py +++ b/test/Interactive/taskmastertrace.py @@ -43,7 +43,7 @@ Command('2', [], Touch('$TARGET')) test.write('foo.in', "foo.in 1\n") -scons = test.start(arguments = '-Q --interactive') +scons = test.start(arguments = '-Q --interactive --experimental=legacy_sched') scons.send("build foo.out 1\n") diff --git a/test/option/fixture/taskmaster_expected_new_parallel.txt b/test/option/fixture/taskmaster_expected_new_parallel.txt index 77334d9..071c8ce 100644 --- a/test/option/fixture/taskmaster_expected_new_parallel.txt +++ b/test/option/fixture/taskmaster_expected_new_parallel.txt @@ -1,6 +1,6 @@ Job.NewParallel._work(): [Thread:XXXXX] Gained exclusive access Job.NewParallel._work(): [Thread:XXXXX] Starting search -Job.NewParallel._work(): [Thread:XXXXX] Found {len(results_queue)} completed tasks to process +Job.NewParallel._work(): [Thread:XXXXX] Found 0 completed tasks to process Job.NewParallel._work(): [Thread:XXXXX] Searching for new tasks Taskmaster: Looking for a node to evaluate diff --git a/test/option/taskmastertrace.py b/test/option/taskmastertrace.py index 25d4e2f..35ab6fc 100755 --- a/test/option/taskmastertrace.py +++ b/test/option/taskmastertrace.py @@ -42,7 +42,7 @@ test.write('Tfile.in', "Tfile.in\n") expect_stdout = test.wrap_stdout(test.read('taskmaster_expected_stdout_1.txt', mode='r')) -test.run(arguments='--taskmastertrace=- .', stdout=expect_stdout) +test.run(arguments='--experimental=legacy_sched --taskmastertrace=- .', stdout=expect_stdout) test.run(arguments='-c .') -- cgit v0.12 From 42ddab297309085cbfe104ddbcd32e474796b830 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Tue, 23 Jan 2024 16:31:07 -0500 Subject: destructure rather than repeatedly accessing --- SCons/SConf.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/SCons/SConf.py b/SCons/SConf.py index 2bc68f1..d2e09be 100644 --- a/SCons/SConf.py +++ b/SCons/SConf.py @@ -251,9 +251,9 @@ class SConfBuildTask(SCons.Taskmaster.AlwaysTask): def failed(self): # check, if the reason was a ConfigureDryRunError or a # ConfigureCacheError and if yes, reraise the exception - exc_type = self.exc_info()[0] + exc_type, exc, _ = self.exc_info() if issubclass(exc_type, SConfError): - raise self.exc_info()[1] + raise exc elif issubclass(exc_type, SCons.Errors.BuildError): # we ignore Build Errors (occurs, when a test doesn't pass) # Clear the exception to prevent the contained traceback -- cgit v0.12 From c467ec23947e75ab16fe100913d8625c42a99ab6 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Wed, 24 Jan 2024 11:03:10 -0500 Subject: Partial Revert "Fix JobTests.py" This reverts commit d932ba3700305f82ffafba6137da99b3145b82d2. 
--- SCons/Taskmaster/JobTests.py | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/SCons/Taskmaster/JobTests.py b/SCons/Taskmaster/JobTests.py index 3743866..9e76ae0 100644 --- a/SCons/Taskmaster/JobTests.py +++ b/SCons/Taskmaster/JobTests.py @@ -285,6 +285,50 @@ class ParallelTestCase(JobTestCase): self.assertFalse(taskmaster.num_failed, "some task(s) failed to execute") + + # Verify that parallel jobs will pull all of the completed tasks + # out of the queue at once, instead of one by one. We do this by + # replacing the default ThreadPool class with one that records the + # order in which tasks are put() and get() to/from the pool, and + # which sleeps a little bit before calling get() to let the initial + # tasks complete and get their notifications on the resultsQueue. + + class SleepTask(Task): + def _do_something(self) -> None: + time.sleep(0.01) + + global SaveThreadPool + SaveThreadPool = SCons.Taskmaster.Job.ThreadPool + + class WaitThreadPool(SaveThreadPool): + def put(self, task): + ThreadPoolCallList.append('put(%s)' % task.i) + return SaveThreadPool.put(self, task) + def get(self): + time.sleep(0.05) + result = SaveThreadPool.get(self) + ThreadPoolCallList.append('get(%s)' % result[0].i) + return result + + SCons.Taskmaster.Job.ThreadPool = WaitThreadPool + + try: + taskmaster = Taskmaster(3, self, SleepTask) + jobs = SCons.Taskmaster.Job.Jobs(2, taskmaster) + jobs.run() + + # The key here is that we get(1) and get(2) from the + # resultsQueue before we put(3), but get(1) and get(2) can + # be in either order depending on how the first two parallel + # tasks get scheduled by the operating system. + expect = [ + ['put(1)', 'put(2)', 'get(1)', 'get(2)', 'put(3)', 'get(3)'], + ['put(1)', 'put(2)', 'get(2)', 'get(1)', 'put(3)', 'get(3)'], + ] + assert ThreadPoolCallList in expect, ThreadPoolCallList + + finally: + SCons.Taskmaster.Job.ThreadPool = SaveThreadPool + + class SerialTestCase(unittest.TestCase): def runTest(self) -> None: """test a serial job""" -- cgit v0.12 From a3ac1ddd1452f309d8986a0574c3980c487bd904 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Wed, 24 Jan 2024 11:22:36 -0500 Subject: Run legacy parallel test case in legacy sched mode --- SCons/Taskmaster/JobTests.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/SCons/Taskmaster/JobTests.py b/SCons/Taskmaster/JobTests.py index 9e76ae0..ce2b3db 100644 --- a/SCons/Taskmaster/JobTests.py +++ b/SCons/Taskmaster/JobTests.py @@ -313,7 +313,9 @@ class ParallelTestCase(JobTestCase): try: taskmaster = Taskmaster(3, self, SleepTask) + OptionsParser.values.experimental.append('legacy_sched') jobs = SCons.Taskmaster.Job.Jobs(2, taskmaster) + OptionsParser.values.experimental.pop() jobs.run() -- cgit v0.12 From ca9b434ffe7c454771da958a67302f009a66a670 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Thu, 25 Jan 2024 20:51:45 -0500 Subject: Disallow python without threading support on startup --- SCons/Script/Main.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/SCons/Script/Main.py b/SCons/Script/Main.py index c29fb38..af7e3ff 100644 --- a/SCons/Script/Main.py +++ b/SCons/Script/Main.py @@ -1447,6 +1447,13 @@ def main() -> None: sys.stderr.write("scons: *** Minimum Python version is %d.%d.%d\n" %minimum_python_version) sys.exit(1) + try: + import threading + except ImportError: + msg = "scons: *** SCons version %s requires a Python interpreter with support for the `threading` package" + sys.stderr.write(msg % SConsVersion) + sys.exit(1) + parts = ["SCons by Steven Knight et al.:\n"] try: import SCons -- cgit v0.12
From 36b6ab73bd3597694d4edf9117d67f826280ed40 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Thu, 25 Jan 2024 21:17:39 -0500 Subject: note python threading changes --- CHANGES.txt | 5 +++++ RELEASE.txt | 1 + 2 files changed, 6 insertions(+) diff --git a/CHANGES.txt b/CHANGES.txt index 19b9b0c..dfa71af 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -60,6 +60,11 @@ RELEASE VERSION/DATE TO BE FILLED IN LATER - The NewParallel scheduler is now the default, the `tm_v2` flag is removed, and the old scheduler is opt-in under `--experimental=legacy_sched`. Additionally, the new scheduler is now used for -j1 builds as well. + - A Python interpreter with support for the `threading` package is now required, + and this is enforced on startup. SCons currently sets its minimum supported + Python to 3.6, and it was not until Python 3.7 that `threading` became + unconditionally available. In practice, we expect most real-world Python 3.6 deployments + will have `threading` support enabled, so this will not be an issue. From Mats Wichmann: - Add support for Python 3.13 (as of alpha 2). So far only affects expected bytecodes in ActionTests.py. diff --git a/RELEASE.txt b/RELEASE.txt index 22922d5..b31916a 100644 --- a/RELEASE.txt +++ b/RELEASE.txt @@ -33,6 +33,7 @@ CHANGED/ENHANCED EXISTING FUNCTIONALITY that the generated function argument list matches the function's prototype when including a header file. Fixes GH Issue #4320 - Now supports pre-release Python 3.13 +- Support for Python versions without support for the `threading` package has been removed FIXES ----- -- cgit v0.12
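The startup guard added to Main.py above amounts, roughly, to the following standalone sketch (the function name and exact message wording here are illustrative; in the patch the check lives inline in SCons.Script.Main.main()):

    # Sketch: refuse to start if the interpreter was built without
    # threading support, mirroring the check added to Main.py.
    import sys

    def require_threading(scons_version: str) -> None:
        try:
            import threading  # noqa: F401  (import check only)
        except ImportError:
            msg = ("scons: *** SCons version %s requires a Python interpreter "
                   "with support for the `threading` package\n")
            sys.stderr.write(msg % scons_version)
            sys.exit(1)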