summaryrefslogtreecommitdiffstats
path: root/SCons/Taskmaster
diff options
context:
space:
mode:
authorAndrew Morrow <andrew.morrow@viam.com>2023-12-14 15:49:22 (GMT)
committerAndrew Morrow <andrew.morrow@viam.com>2024-01-19 18:46:34 (GMT)
commit8c8052c95d4645a584b4332e11aabe0843446678 (patch)
tree0e43797d83d55c9088ddab18d77eb16320e17fa4 /SCons/Taskmaster
parentc452f92126499dd213ac1593791212cc73ecb50a (diff)
downloadSCons-8c8052c95d4645a584b4332e11aabe0843446678.zip
SCons-8c8052c95d4645a584b4332e11aabe0843446678.tar.gz
SCons-8c8052c95d4645a584b4332e11aabe0843446678.tar.bz2
threading is always available
Diffstat (limited to 'SCons/Taskmaster')
-rw-r--r--SCons/Taskmaster/Job.py909
1 files changed, 449 insertions, 460 deletions
diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py
index 572464b..b73c941 100644
--- a/SCons/Taskmaster/Job.py
+++ b/SCons/Taskmaster/Job.py
@@ -31,6 +31,7 @@ import SCons.compat
import logging
import os
+import queue
import signal
import sys
import threading
@@ -95,16 +96,13 @@ class Jobs:
if stack_size is None:
stack_size = default_stack_size
- try:
- experimental_option = GetOption('experimental')
- if 'tm_v2' in experimental_option:
- self.job = NewParallel(taskmaster, num, stack_size)
- else:
- self.job = LegacyParallel(taskmaster, num, stack_size)
+ experimental_option = GetOption('experimental')
+ if 'tm_v2' in experimental_option:
+ self.job = NewParallel(taskmaster, num, stack_size)
+ else:
+ self.job = LegacyParallel(taskmaster, num, stack_size)
- self.num_jobs = num
- except NameError:
- pass
+ self.num_jobs = num
if self.job is None:
self.job = Serial(taskmaster)
self.num_jobs = 1
@@ -239,505 +237,496 @@ class Serial:
self.taskmaster.cleanup()
-# Trap import failure so that everything in the Job module but the
-# Parallel class (and its dependent classes) will work if the interpreter
-# doesn't support threads.
-try:
- import queue
- import threading
-except ImportError:
- pass
-else:
- class Worker(threading.Thread):
- """A worker thread waits on a task to be posted to its request queue,
- dequeues the task, executes it, and posts a tuple including the task
- and a boolean indicating whether the task executed successfully. """
+class Worker(threading.Thread):
+ """A worker thread waits on a task to be posted to its request queue,
+ dequeues the task, executes it, and posts a tuple including the task
+ and a boolean indicating whether the task executed successfully. """
- def __init__(self, requestQueue, resultsQueue, interrupted) -> None:
- super().__init__()
- self.daemon = True
- self.requestQueue = requestQueue
- self.resultsQueue = resultsQueue
- self.interrupted = interrupted
- self.start()
+ def __init__(self, requestQueue, resultsQueue, interrupted) -> None:
+ super().__init__()
+ self.daemon = True
+ self.requestQueue = requestQueue
+ self.resultsQueue = resultsQueue
+ self.interrupted = interrupted
+ self.start()
- def run(self):
- while True:
- task = self.requestQueue.get()
+ def run(self):
+ while True:
+ task = self.requestQueue.get()
- if task is None:
- # The "None" value is used as a sentinel by
- # ThreadPool.cleanup(). This indicates that there
- # are no more tasks, so we should quit.
- break
+ if task is None:
+ # The "None" value is used as a sentinel by
+ # ThreadPool.cleanup(). This indicates that there
+ # are no more tasks, so we should quit.
+ break
- try:
- if self.interrupted():
- raise SCons.Errors.BuildError(
- task.targets[0], errstr=interrupt_msg)
- task.execute()
- except Exception:
- task.exception_set()
- ok = False
- else:
- ok = True
+ try:
+ if self.interrupted():
+ raise SCons.Errors.BuildError(
+ task.targets[0], errstr=interrupt_msg)
+ task.execute()
+ except Exception:
+ task.exception_set()
+ ok = False
+ else:
+ ok = True
- self.resultsQueue.put((task, ok))
+ self.resultsQueue.put((task, ok))
- class ThreadPool:
- """This class is responsible for spawning and managing worker threads."""
+class ThreadPool:
+ """This class is responsible for spawning and managing worker threads."""
- def __init__(self, num, stack_size, interrupted) -> None:
- """Create the request and reply queues, and 'num' worker threads.
+ def __init__(self, num, stack_size, interrupted) -> None:
+ """Create the request and reply queues, and 'num' worker threads.
- One must specify the stack size of the worker threads. The
- stack size is specified in kilobytes.
- """
- self.requestQueue = queue.Queue(0)
- self.resultsQueue = queue.Queue(0)
+ One must specify the stack size of the worker threads. The
+ stack size is specified in kilobytes.
+ """
+ self.requestQueue = queue.Queue(0)
+ self.resultsQueue = queue.Queue(0)
- try:
- prev_size = threading.stack_size(stack_size * 1024)
- except AttributeError as e:
- # Only print a warning if the stack size has been
- # explicitly set.
- if explicit_stack_size is not None:
- msg = "Setting stack size is unsupported by this version of Python:\n " + \
- e.args[0]
- SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
- except ValueError as e:
- msg = "Setting stack size failed:\n " + str(e)
+ try:
+ prev_size = threading.stack_size(stack_size * 1024)
+ except AttributeError as e:
+ # Only print a warning if the stack size has been
+ # explicitly set.
+ if explicit_stack_size is not None:
+ msg = "Setting stack size is unsupported by this version of Python:\n " + \
+ e.args[0]
SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
+ except ValueError as e:
+ msg = "Setting stack size failed:\n " + str(e)
+ SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
+
+ # Create worker threads
+ self.workers = []
+ for _ in range(num):
+ worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
+ self.workers.append(worker)
+
+ if 'prev_size' in locals():
+ threading.stack_size(prev_size)
+
+ def put(self, task) -> None:
+ """Put task into request queue."""
+ self.requestQueue.put(task)
+
+ def get(self):
+ """Remove and return a result tuple from the results queue."""
+ return self.resultsQueue.get()
- # Create worker threads
- self.workers = []
- for _ in range(num):
- worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
- self.workers.append(worker)
-
- if 'prev_size' in locals():
- threading.stack_size(prev_size)
-
- def put(self, task) -> None:
- """Put task into request queue."""
- self.requestQueue.put(task)
-
- def get(self):
- """Remove and return a result tuple from the results queue."""
- return self.resultsQueue.get()
-
- def preparation_failed(self, task) -> None:
- self.resultsQueue.put((task, False))
-
- def cleanup(self) -> None:
- """
- Shuts down the thread pool, giving each worker thread a
- chance to shut down gracefully.
- """
- # For each worker thread, put a sentinel "None" value
- # on the requestQueue (indicating that there's no work
- # to be done) so that each worker thread will get one and
- # terminate gracefully.
- for _ in self.workers:
- self.requestQueue.put(None)
-
- # Wait for all of the workers to terminate.
- #
- # If we don't do this, later Python versions (2.4, 2.5) often
- # seem to raise exceptions during shutdown. This happens
- # in requestQueue.get(), as an assertion failure that
- # requestQueue.not_full is notified while not acquired,
- # seemingly because the main thread has shut down (or is
- # in the process of doing so) while the workers are still
- # trying to pull sentinels off the requestQueue.
- #
- # Normally these terminations should happen fairly quickly,
- # but we'll stick a one-second timeout on here just in case
- # someone gets hung.
- for worker in self.workers:
- worker.join(1.0)
- self.workers = []
-
- class LegacyParallel:
- """This class is used to execute tasks in parallel, and is somewhat
- less efficient than Serial, but is appropriate for parallel builds.
-
- This class is thread safe.
+ def preparation_failed(self, task) -> None:
+ self.resultsQueue.put((task, False))
+
+ def cleanup(self) -> None:
+ """
+ Shuts down the thread pool, giving each worker thread a
+ chance to shut down gracefully.
"""
+ # For each worker thread, put a sentinel "None" value
+ # on the requestQueue (indicating that there's no work
+ # to be done) so that each worker thread will get one and
+ # terminate gracefully.
+ for _ in self.workers:
+ self.requestQueue.put(None)
+
+ # Wait for all of the workers to terminate.
+ #
+ # If we don't do this, later Python versions (2.4, 2.5) often
+ # seem to raise exceptions during shutdown. This happens
+ # in requestQueue.get(), as an assertion failure that
+ # requestQueue.not_full is notified while not acquired,
+ # seemingly because the main thread has shut down (or is
+ # in the process of doing so) while the workers are still
+ # trying to pull sentinels off the requestQueue.
+ #
+ # Normally these terminations should happen fairly quickly,
+ # but we'll stick a one-second timeout on here just in case
+ # someone gets hung.
+ for worker in self.workers:
+ worker.join(1.0)
+ self.workers = []
+
+class LegacyParallel:
+ """This class is used to execute tasks in parallel, and is somewhat
+ less efficient than Serial, but is appropriate for parallel builds.
+
+ This class is thread safe.
+ """
- def __init__(self, taskmaster, num, stack_size) -> None:
- """Create a new parallel job given a taskmaster.
+ def __init__(self, taskmaster, num, stack_size) -> None:
+ """Create a new parallel job given a taskmaster.
- The taskmaster's next_task() method should return the next
- task that needs to be executed, or None if there are no more
- tasks. The taskmaster's executed() method will be called
- for each task when it is successfully executed, or failed()
- will be called if the task failed to execute (i.e. execute()
- raised an exception).
+ The taskmaster's next_task() method should return the next
+ task that needs to be executed, or None if there are no more
+ tasks. The taskmaster's executed() method will be called
+ for each task when it is successfully executed, or failed()
+ will be called if the task failed to execute (i.e. execute()
+ raised an exception).
- Note: calls to taskmaster are serialized, but calls to
- execute() on distinct tasks are not serialized, because
- that is the whole point of parallel jobs: they can execute
- multiple tasks simultaneously. """
+ Note: calls to taskmaster are serialized, but calls to
+ execute() on distinct tasks are not serialized, because
+ that is the whole point of parallel jobs: they can execute
+ multiple tasks simultaneously. """
- self.taskmaster = taskmaster
- self.interrupted = InterruptState()
- self.tp = ThreadPool(num, stack_size, self.interrupted)
+ self.taskmaster = taskmaster
+ self.interrupted = InterruptState()
+ self.tp = ThreadPool(num, stack_size, self.interrupted)
- self.maxjobs = num
+ self.maxjobs = num
- def start(self):
- """Start the job. This will begin pulling tasks from the
- taskmaster and executing them, and return when there are no
- more tasks. If a task fails to execute (i.e. execute() raises
- an exception), then the job will stop."""
+ def start(self):
+ """Start the job. This will begin pulling tasks from the
+ taskmaster and executing them, and return when there are no
+ more tasks. If a task fails to execute (i.e. execute() raises
+ an exception), then the job will stop."""
- jobs = 0
+ jobs = 0
- while True:
- # Start up as many available tasks as we're
- # allowed to.
- while jobs < self.maxjobs:
- task = self.taskmaster.next_task()
- if task is None:
- break
+ while True:
+ # Start up as many available tasks as we're
+ # allowed to.
+ while jobs < self.maxjobs:
+ task = self.taskmaster.next_task()
+ if task is None:
+ break
- try:
- # prepare task for execution
- task.prepare()
- except Exception:
- task.exception_set()
- task.failed()
- task.postprocess()
+ try:
+ # prepare task for execution
+ task.prepare()
+ except Exception:
+ task.exception_set()
+ task.failed()
+ task.postprocess()
+ else:
+ if task.needs_execute():
+ # dispatch task
+ self.tp.put(task)
+ jobs += 1
else:
- if task.needs_execute():
- # dispatch task
- self.tp.put(task)
- jobs += 1
- else:
- task.executed()
- task.postprocess()
+ task.executed()
+ task.postprocess()
+
+ if not task and not jobs:
+ break
+
+ # Let any/all completed tasks finish up before we go
+ # back and put the next batch of tasks on the queue.
+ while True:
+ task, ok = self.tp.get()
+ jobs -= 1
+
+ if ok:
+ task.executed()
+ else:
+ if self.interrupted():
+ try:
+ raise SCons.Errors.BuildError(
+ task.targets[0], errstr=interrupt_msg)
+ except Exception:
+ task.exception_set()
- if not task and not jobs:
+ # Let the failed() callback function arrange
+ # for the build to stop if that's appropriate.
+ task.failed()
+
+ task.postprocess()
+
+ if self.tp.resultsQueue.empty():
break
- # Let any/all completed tasks finish up before we go
- # back and put the next batch of tasks on the queue.
- while True:
- task, ok = self.tp.get()
- jobs -= 1
+ self.tp.cleanup()
+ self.taskmaster.cleanup()
- if ok:
- task.executed()
- else:
- if self.interrupted():
- try:
- raise SCons.Errors.BuildError(
- task.targets[0], errstr=interrupt_msg)
- except Exception:
- task.exception_set()
+# An experimental new parallel scheduler that uses a leaders/followers pattern.
+class NewParallel:
- # Let the failed() callback function arrange
- # for the build to stop if that's appropriate.
- task.failed()
+ class State(Enum):
+ READY = 0
+ SEARCHING = 1
+ STALLED = 2
+ COMPLETED = 3
- task.postprocess()
+ class Worker(threading.Thread):
+ def __init__(self, owner) -> None:
+ super().__init__()
+ self.daemon = True
+ self.owner = owner
+ self.start()
- if self.tp.resultsQueue.empty():
- break
-
- self.tp.cleanup()
- self.taskmaster.cleanup()
-
- # An experimental new parallel scheduler that uses a leaders/followers pattern.
- class NewParallel:
-
- class State(Enum):
- READY = 0
- SEARCHING = 1
- STALLED = 2
- COMPLETED = 3
-
- class Worker(threading.Thread):
- def __init__(self, owner) -> None:
- super().__init__()
- self.daemon = True
- self.owner = owner
- self.start()
-
- def run(self) -> None:
- self.owner._work()
-
- def __init__(self, taskmaster, num, stack_size) -> None:
- self.taskmaster = taskmaster
- self.num_workers = num
- self.stack_size = stack_size
- self.interrupted = InterruptState()
- self.workers = []
-
- # The `tm_lock` is what ensures that we only have one
- # thread interacting with the taskmaster at a time. It
- # also protects access to our state that gets updated
- # concurrently. The `can_search_cv` is associated with
- # this mutex.
- self.tm_lock = threading.Lock()
-
- # Guarded under `tm_lock`.
- self.jobs = 0
- self.state = NewParallel.State.READY
-
- # The `can_search_cv` is used to manage a leader /
- # follower pattern for access to the taskmaster, and to
- # awaken from stalls.
- self.can_search_cv = threading.Condition(self.tm_lock)
-
- # The queue of tasks that have completed execution. The
- # next thread to obtain `tm_lock`` will retire them.
- self.results_queue_lock = threading.Lock()
- self.results_queue = []
-
- if self.taskmaster.trace:
- self.trace = self._setup_logging()
- else:
- self.trace = False
-
- def _setup_logging(self):
- jl = logging.getLogger("Job")
- jl.setLevel(level=logging.DEBUG)
- jl.addHandler(self.taskmaster.trace.log_handler)
- return jl
-
- def trace_message(self, message) -> None:
- # This grabs the name of the function which calls trace_message()
- method_name = sys._getframe(1).f_code.co_name + "():"
- thread_id=threading.get_ident()
- self.trace.debug('%s.%s [Thread:%s] %s' % (type(self).__name__, method_name, thread_id, message))
- # print('%-15s %s' % (method_name, message))
-
- def start(self) -> None:
- self._start_workers()
- for worker in self.workers:
- worker.join()
- self.workers = []
- self.taskmaster.cleanup()
-
- def _start_workers(self) -> None:
- prev_size = self._adjust_stack_size()
- for _ in range(self.num_workers):
- self.workers.append(NewParallel.Worker(self))
- self._restore_stack_size(prev_size)
-
- def _adjust_stack_size(self):
- try:
- prev_size = threading.stack_size(self.stack_size * 1024)
- return prev_size
- except AttributeError as e:
- # Only print a warning if the stack size has been
- # explicitly set.
- if explicit_stack_size is not None:
- msg = "Setting stack size is unsupported by this version of Python:\n " + \
- e.args[0]
- SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
- except ValueError as e:
- msg = "Setting stack size failed:\n " + str(e)
- SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
+ def run(self) -> None:
+ self.owner._work()
- return None
+ def __init__(self, taskmaster, num, stack_size) -> None:
+ self.taskmaster = taskmaster
+ self.num_workers = num
+ self.stack_size = stack_size
+ self.interrupted = InterruptState()
+ self.workers = []
+
+ # The `tm_lock` is what ensures that we only have one
+ # thread interacting with the taskmaster at a time. It
+ # also protects access to our state that gets updated
+ # concurrently. The `can_search_cv` is associated with
+ # this mutex.
+ self.tm_lock = threading.Lock()
+
+ # Guarded under `tm_lock`.
+ self.jobs = 0
+ self.state = NewParallel.State.READY
+
+ # The `can_search_cv` is used to manage a leader /
+ # follower pattern for access to the taskmaster, and to
+ # awaken from stalls.
+ self.can_search_cv = threading.Condition(self.tm_lock)
+
+ # The queue of tasks that have completed execution. The
+ # next thread to obtain `tm_lock`` will retire them.
+ self.results_queue_lock = threading.Lock()
+ self.results_queue = []
+
+ if self.taskmaster.trace:
+ self.trace = self._setup_logging()
+ else:
+ self.trace = False
+
+ def _setup_logging(self):
+ jl = logging.getLogger("Job")
+ jl.setLevel(level=logging.DEBUG)
+ jl.addHandler(self.taskmaster.trace.log_handler)
+ return jl
+
+ def trace_message(self, message) -> None:
+ # This grabs the name of the function which calls trace_message()
+ method_name = sys._getframe(1).f_code.co_name + "():"
+ thread_id=threading.get_ident()
+ self.trace.debug('%s.%s [Thread:%s] %s' % (type(self).__name__, method_name, thread_id, message))
+ # print('%-15s %s' % (method_name, message))
+
+ def start(self) -> None:
+ self._start_workers()
+ for worker in self.workers:
+ worker.join()
+ self.workers = []
+ self.taskmaster.cleanup()
- def _restore_stack_size(self, prev_size) -> None:
- if prev_size is not None:
- threading.stack_size(prev_size)
+ def _start_workers(self) -> None:
+ prev_size = self._adjust_stack_size()
+ for _ in range(self.num_workers):
+ self.workers.append(NewParallel.Worker(self))
+ self._restore_stack_size(prev_size)
- def _work(self):
+ def _adjust_stack_size(self):
+ try:
+ prev_size = threading.stack_size(self.stack_size * 1024)
+ return prev_size
+ except AttributeError as e:
+ # Only print a warning if the stack size has been
+ # explicitly set.
+ if explicit_stack_size is not None:
+ msg = "Setting stack size is unsupported by this version of Python:\n " + \
+ e.args[0]
+ SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
+ except ValueError as e:
+ msg = "Setting stack size failed:\n " + str(e)
+ SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
- task = None
+ return None
- while True:
+ def _restore_stack_size(self, prev_size) -> None:
+ if prev_size is not None:
+ threading.stack_size(prev_size)
- # Obtain `tm_lock`, granting exclusive access to the taskmaster.
- with self.can_search_cv:
+ def _work(self):
+ task = None
+
+ while True:
+
+ # Obtain `tm_lock`, granting exclusive access to the taskmaster.
+ with self.can_search_cv:
+
+ if self.trace:
+ self.trace_message("Gained exclusive access")
+
+ # Capture whether we got here with `task` set,
+ # then drop our reference to the task as we are no
+ # longer interested in the actual object.
+ completed_task = (task is not None)
+ task = None
+
+ # We will only have `completed_task` set here if
+ # we have looped back after executing a task. If
+ # we have completed a task and find that we are
+ # stalled, we should speculatively indicate that
+ # we are no longer stalled by transitioning to the
+ # 'ready' state which will bypass the condition
+ # wait so that we immediately process the results
+ # queue and hopefully light up new
+ # work. Otherwise, stay stalled, and we will wait
+ # in the condvar. Some other thread will come back
+ # here with a completed task.
+ if self.state == NewParallel.State.STALLED and completed_task:
if self.trace:
- self.trace_message("Gained exclusive access")
-
- # Capture whether we got here with `task` set,
- # then drop our reference to the task as we are no
- # longer interested in the actual object.
- completed_task = (task is not None)
- task = None
-
- # We will only have `completed_task` set here if
- # we have looped back after executing a task. If
- # we have completed a task and find that we are
- # stalled, we should speculatively indicate that
- # we are no longer stalled by transitioning to the
- # 'ready' state which will bypass the condition
- # wait so that we immediately process the results
- # queue and hopefully light up new
- # work. Otherwise, stay stalled, and we will wait
- # in the condvar. Some other thread will come back
- # here with a completed task.
- if self.state == NewParallel.State.STALLED and completed_task:
- if self.trace:
- self.trace_message("Detected stall with completed task, bypassing wait")
- self.state = NewParallel.State.READY
-
- # Wait until we are neither searching nor stalled.
- while self.state == NewParallel.State.SEARCHING or self.state == NewParallel.State.STALLED:
- if self.trace:
- self.trace_message("Search already in progress, waiting")
- self.can_search_cv.wait()
-
- # If someone set the completed flag, bail.
- if self.state == NewParallel.State.COMPLETED:
- if self.trace:
- self.trace_message("Completion detected, breaking from main loop")
- break
-
- # Set the searching flag to indicate that a thread
- # is currently in the critical section for
- # taskmaster work.
- #
+ self.trace_message("Detected stall with completed task, bypassing wait")
+ self.state = NewParallel.State.READY
+
+ # Wait until we are neither searching nor stalled.
+ while self.state == NewParallel.State.SEARCHING or self.state == NewParallel.State.STALLED:
if self.trace:
- self.trace_message("Starting search")
- self.state = NewParallel.State.SEARCHING
-
- # Bulk acquire the tasks in the results queue
- # under the result queue lock, then process them
- # all outside that lock. We need to process the
- # tasks in the results queue before looking for
- # new work because we might be unable to find new
- # work if we don't.
- results_queue = []
- with self.results_queue_lock:
- results_queue, self.results_queue = self.results_queue, results_queue
+ self.trace_message("Search already in progress, waiting")
+ self.can_search_cv.wait()
+ # If someone set the completed flag, bail.
+ if self.state == NewParallel.State.COMPLETED:
if self.trace:
- self.trace_message("Found {len(results_queue)} completed tasks to process")
- for (rtask, rresult) in results_queue:
- if rresult:
- rtask.executed()
- else:
- if self.interrupted():
- try:
- raise SCons.Errors.BuildError(
- rtask.targets[0], errstr=interrupt_msg)
- except Exception:
- rtask.exception_set()
-
- # Let the failed() callback function arrange
- # for the build to stop if that's appropriate.
- rtask.failed()
-
- rtask.postprocess()
- self.jobs -= 1
-
- # We are done with any task objects that were in
- # the results queue.
- results_queue.clear()
-
- # Now, turn the crank on the taskmaster until we
- # either run out of tasks, or find a task that
- # needs execution. If we run out of tasks, go idle
- # until results arrive if jobs are pending, or
- # mark the walk as complete if not.
- while self.state == NewParallel.State.SEARCHING:
- if self.trace:
- self.trace_message("Searching for new tasks")
- task = self.taskmaster.next_task()
-
- if task:
- # We found a task. Walk it through the
- # task lifecycle. If it does not need
- # execution, just complete the task and
- # look for the next one. Otherwise,
- # indicate that we are no longer searching
- # so we can drop out of this loop, execute
- # the task outside the lock, and allow
- # another thread in to search.
+ self.trace_message("Completion detected, breaking from main loop")
+ break
+
+ # Set the searching flag to indicate that a thread
+ # is currently in the critical section for
+ # taskmaster work.
+ #
+ if self.trace:
+ self.trace_message("Starting search")
+ self.state = NewParallel.State.SEARCHING
+
+ # Bulk acquire the tasks in the results queue
+ # under the result queue lock, then process them
+ # all outside that lock. We need to process the
+ # tasks in the results queue before looking for
+ # new work because we might be unable to find new
+ # work if we don't.
+ results_queue = []
+ with self.results_queue_lock:
+ results_queue, self.results_queue = self.results_queue, results_queue
+
+ if self.trace:
+ self.trace_message("Found {len(results_queue)} completed tasks to process")
+ for (rtask, rresult) in results_queue:
+ if rresult:
+ rtask.executed()
+ else:
+ if self.interrupted():
try:
- task.prepare()
+ raise SCons.Errors.BuildError(
+ rtask.targets[0], errstr=interrupt_msg)
except Exception:
- task.exception_set()
- task.failed()
- task.postprocess()
- else:
- if not task.needs_execute():
- if self.trace:
- self.trace_message("Found internal task")
- task.executed()
- task.postprocess()
- else:
- self.jobs += 1
- if self.trace:
- self.trace_message("Found task requiring execution")
- self.state = NewParallel.State.READY
- self.can_search_cv.notify()
+ rtask.exception_set()
+
+ # Let the failed() callback function arrange
+ # for the build to stop if that's appropriate.
+ rtask.failed()
+
+ rtask.postprocess()
+ self.jobs -= 1
+
+ # We are done with any task objects that were in
+ # the results queue.
+ results_queue.clear()
+ # Now, turn the crank on the taskmaster until we
+ # either run out of tasks, or find a task that
+ # needs execution. If we run out of tasks, go idle
+ # until results arrive if jobs are pending, or
+ # mark the walk as complete if not.
+ while self.state == NewParallel.State.SEARCHING:
+ if self.trace:
+ self.trace_message("Searching for new tasks")
+ task = self.taskmaster.next_task()
+
+ if task:
+ # We found a task. Walk it through the
+ # task lifecycle. If it does not need
+ # execution, just complete the task and
+ # look for the next one. Otherwise,
+ # indicate that we are no longer searching
+ # so we can drop out of this loop, execute
+ # the task outside the lock, and allow
+ # another thread in to search.
+ try:
+ task.prepare()
+ except Exception:
+ task.exception_set()
+ task.failed()
+ task.postprocess()
else:
- # We failed to find a task, so this thread
- # cannot continue turning the taskmaster
- # crank. We must exit the loop.
- if self.jobs:
- # No task was found, but there are
- # outstanding jobs executing that
- # might unblock new tasks when they
- # complete. Transition to the stalled
- # state. We do not need a notify,
- # because we know there are threads
- # outstanding that will re-enter the
- # loop.
- #
+ if not task.needs_execute():
if self.trace:
- self.trace_message("Found no task requiring execution, but have jobs: marking stalled")
- self.state = NewParallel.State.STALLED
+ self.trace_message("Found internal task")
+ task.executed()
+ task.postprocess()
else:
- # We didn't find a task and there are
- # no jobs outstanding, so there is
- # nothing that will ever return
- # results which might unblock new
- # tasks. We can conclude that the walk
- # is complete. Update our state to
- # note completion and awaken anyone
- # sleeping on the condvar.
- #
+ self.jobs += 1
if self.trace:
- self.trace_message("Found no task requiring execution, and have no jobs: marking complete")
- self.state = NewParallel.State.COMPLETED
- self.can_search_cv.notify_all()
-
- # We no longer hold `tm_lock` here. If we have a task,
- # we can now execute it. If there are threads waiting
- # to search, one of them can now begin turning the
- # taskmaster crank in NewParallel.
- if task:
- if self.trace:
- self.trace_message("Executing task")
- ok = True
- try:
- if self.interrupted():
- raise SCons.Errors.BuildError(
- task.targets[0], errstr=interrupt_msg)
- task.execute()
- except Exception:
- ok = False
- task.exception_set()
+ self.trace_message("Found task requiring execution")
+ self.state = NewParallel.State.READY
+ self.can_search_cv.notify()
- # Grab the results queue lock and enqueue the
- # executed task and state. The next thread into
- # the searching loop will complete the
- # postprocessing work under the taskmaster lock.
- #
- if self.trace:
- self.trace_message("Enqueueing executed task results")
- with self.results_queue_lock:
- self.results_queue.append((task, ok))
-
- # Tricky state "fallthrough" here. We are going back
- # to the top of the loop, which behaves differently
- # depending on whether `task` is set. Do not perturb
- # the value of the `task` variable if you add new code
- # after this comment.
+ else:
+ # We failed to find a task, so this thread
+ # cannot continue turning the taskmaster
+ # crank. We must exit the loop.
+ if self.jobs:
+ # No task was found, but there are
+ # outstanding jobs executing that
+ # might unblock new tasks when they
+ # complete. Transition to the stalled
+ # state. We do not need a notify,
+ # because we know there are threads
+ # outstanding that will re-enter the
+ # loop.
+ #
+ if self.trace:
+ self.trace_message("Found no task requiring execution, but have jobs: marking stalled")
+ self.state = NewParallel.State.STALLED
+ else:
+ # We didn't find a task and there are
+ # no jobs outstanding, so there is
+ # nothing that will ever return
+ # results which might unblock new
+ # tasks. We can conclude that the walk
+ # is complete. Update our state to
+ # note completion and awaken anyone
+ # sleeping on the condvar.
+ #
+ if self.trace:
+ self.trace_message("Found no task requiring execution, and have no jobs: marking complete")
+ self.state = NewParallel.State.COMPLETED
+ self.can_search_cv.notify_all()
+
+ # We no longer hold `tm_lock` here. If we have a task,
+ # we can now execute it. If there are threads waiting
+ # to search, one of them can now begin turning the
+ # taskmaster crank in NewParallel.
+ if task:
+ if self.trace:
+ self.trace_message("Executing task")
+ ok = True
+ try:
+ if self.interrupted():
+ raise SCons.Errors.BuildError(
+ task.targets[0], errstr=interrupt_msg)
+ task.execute()
+ except Exception:
+ ok = False
+ task.exception_set()
+
+ # Grab the results queue lock and enqueue the
+ # executed task and state. The next thread into
+ # the searching loop will complete the
+ # postprocessing work under the taskmaster lock.
+ #
+ if self.trace:
+ self.trace_message("Enqueueing executed task results")
+ with self.results_queue_lock:
+ self.results_queue.append((task, ok))
+
+ # Tricky state "fallthrough" here. We are going back
+ # to the top of the loop, which behaves differently
+ # depending on whether `task` is set. Do not perturb
+ # the value of the `task` variable if you add new code
+ # after this comment.
# Local Variables:
# tab-width:4