From 74381f694f3155c0b051a3e368423f57d337eb9c Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Wed, 31 Jan 2024 22:36:37 -0500 Subject: Only add worker threads as necesary --- SCons/Taskmaster/Job.py | 35 ++++++---- .../fixture/taskmaster_expected_new_parallel.txt | 3 +- test/option/stack-size.py | 76 ++++++++++++---------- 3 files changed, 65 insertions(+), 49 deletions(-) diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index 3bcc803..3b5b854 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -474,7 +474,7 @@ class NewParallel: def __init__(self, taskmaster, num, stack_size) -> None: self.taskmaster = taskmaster - self.num_workers = num + self.max_workers = num self.stack_size = stack_size self.interrupted = InterruptState() self.workers = [] @@ -484,7 +484,7 @@ class NewParallel: # also protects access to our state that gets updated # concurrently. The `can_search_cv` is associated with # this mutex. - self.tm_lock = (threading.Lock if self.num_workers > 1 else NewParallel.FakeLock)() + self.tm_lock = (threading.Lock if self.max_workers > 1 else NewParallel.FakeLock)() # Guarded under `tm_lock`. self.jobs = 0 @@ -493,11 +493,11 @@ class NewParallel: # The `can_search_cv` is used to manage a leader / # follower pattern for access to the taskmaster, and to # awaken from stalls. - self.can_search_cv = (threading.Condition if self.num_workers > 1 else NewParallel.FakeCondition)(self.tm_lock) + self.can_search_cv = (threading.Condition if self.max_workers > 1 else NewParallel.FakeCondition)(self.tm_lock) # The queue of tasks that have completed execution. The # next thread to obtain `tm_lock`` will retire them. - self.results_queue_lock = (threading.Lock if self.num_workers > 1 else NewParallel.FakeLock)() + self.results_queue_lock = (threading.Lock if self.max_workers > 1 else NewParallel.FakeLock)() self.results_queue = [] if self.taskmaster.trace: @@ -516,22 +516,26 @@ class NewParallel: method_name = sys._getframe(1).f_code.co_name + "():" thread_id=threading.get_ident() self.trace.debug('%s.%s [Thread:%s] %s' % (type(self).__name__, method_name, thread_id, message)) - # print('%-15s %s' % (method_name, message)) def start(self) -> None: - if self.num_workers == 1: + if self.max_workers == 1: self._work() else: - self._start_workers() - for worker in self.workers: - worker.join() - self.workers = [] + self._start_worker() + while len(self.workers) > 0: + self.workers[0].join() + self.workers.pop(0) self.taskmaster.cleanup() - def _start_workers(self) -> None: + def _maybe_start_worker(self) -> None: + if self.max_workers > 1 and len(self.workers) < self.max_workers: + self._start_worker() + + def _start_worker(self) -> None: prev_size = self._adjust_stack_size() - for _ in range(self.num_workers): - self.workers.append(NewParallel.Worker(self)) + if self.trace: + self.trace_message("Starting new worker thread") + self.workers.append(NewParallel.Worker(self)) self._restore_stack_size(prev_size) def _adjust_stack_size(self): @@ -680,6 +684,11 @@ class NewParallel: self.trace_message("Found task requiring execution") self.state = NewParallel.State.READY self.can_search_cv.notify() + # This thread will be busy taking care of + # `execute`ing this task. If we haven't + # reached the limit, spawn a new thread to + # turn the crank and find the next task. + self._maybe_start_worker() else: # We failed to find a task, so this thread diff --git a/test/option/fixture/taskmaster_expected_new_parallel.txt b/test/option/fixture/taskmaster_expected_new_parallel.txt index 071c8ce..23f491f 100644 --- a/test/option/fixture/taskmaster_expected_new_parallel.txt +++ b/test/option/fixture/taskmaster_expected_new_parallel.txt @@ -1,3 +1,4 @@ +Job.NewParallel._start_worker(): [Thread:XXXXX] Starting new worker thread Job.NewParallel._work(): [Thread:XXXXX] Gained exclusive access Job.NewParallel._work(): [Thread:XXXXX] Starting search Job.NewParallel._work(): [Thread:XXXXX] Found 0 completed tasks to process @@ -86,5 +87,3 @@ Taskmaster: No candidate anymore. Job.NewParallel._work(): [Thread:XXXXX] Found no task requiring execution, and have no jobs: marking complete Job.NewParallel._work(): [Thread:XXXXX] Gained exclusive access Job.NewParallel._work(): [Thread:XXXXX] Completion detected, breaking from main loop -Job.NewParallel._work(): [Thread:XXXXX] Gained exclusive access -Job.NewParallel._work(): [Thread:XXXXX] Completion detected, breaking from main loop diff --git a/test/option/stack-size.py b/test/option/stack-size.py index d64c73b..e9cb38e 100644 --- a/test/option/stack-size.py +++ b/test/option/stack-size.py @@ -89,14 +89,14 @@ File .* # # Test without any options # -test.run(chdir='work1', +test.run(chdir='work1', arguments = '.', stdout=expected_stdout, stderr='') test.must_exist(['work1', 'f1.out']) test.must_exist(['work1', 'f2.out']) -test.run(chdir='work1', +test.run(chdir='work1', arguments = '-c .') test.must_not_exist(['work1', 'f1.out']) test.must_not_exist(['work1', 'f2.out']) @@ -104,14 +104,14 @@ test.must_not_exist(['work1', 'f2.out']) # # Test with -j2 # -test.run(chdir='work1', +test.run(chdir='work1', arguments = '-j2 .', stdout=expected_stdout, stderr='') test.must_exist(['work1', 'f1.out']) test.must_exist(['work1', 'f2.out']) -test.run(chdir='work1', +test.run(chdir='work1', arguments = '-j2 -c .') test.must_not_exist(['work1', 'f1.out']) test.must_not_exist(['work1', 'f2.out']) @@ -120,14 +120,14 @@ test.must_not_exist(['work1', 'f2.out']) # # Test with --stack-size # -test.run(chdir='work1', +test.run(chdir='work1', arguments = '--stack-size=128 .', stdout=expected_stdout, stderr='') test.must_exist(['work1', 'f1.out']) test.must_exist(['work1', 'f2.out']) -test.run(chdir='work1', +test.run(chdir='work1', arguments = '--stack-size=128 -c .') test.must_not_exist(['work1', 'f1.out']) test.must_not_exist(['work1', 'f2.out']) @@ -135,14 +135,14 @@ test.must_not_exist(['work1', 'f2.out']) # # Test with SetOption('stack_size', 128) # -test.run(chdir='work2', +test.run(chdir='work2', arguments = '.', stdout=expected_stdout, stderr='') test.must_exist(['work2', 'f1.out']) test.must_exist(['work2', 'f2.out']) -test.run(chdir='work2', +test.run(chdir='work2', arguments = '--stack-size=128 -c .') test.must_not_exist(['work2', 'f1.out']) test.must_not_exist(['work2', 'f2.out']) @@ -151,14 +151,14 @@ if isStackSizeAvailable: # # Test with -j2 --stack-size=128 # - test.run(chdir='work1', + test.run(chdir='work1', arguments = '-j2 --stack-size=128 .', stdout=expected_stdout, stderr='') test.must_exist(['work1', 'f1.out']) test.must_exist(['work1', 'f2.out']) - test.run(chdir='work1', + test.run(chdir='work1', arguments = '-j2 --stack-size=128 -c .') test.must_not_exist(['work1', 'f1.out']) test.must_not_exist(['work1', 'f2.out']) @@ -166,7 +166,7 @@ if isStackSizeAvailable: # # Test with -j2 --stack-size=16 # - test.run(chdir='work1', + test.run(chdir='work1', arguments = '-j2 --stack-size=16 .', match=TestSCons.match_re, stdout=re_expected_stdout, @@ -174,17 +174,25 @@ if isStackSizeAvailable: scons: warning: Setting stack size failed: size not valid: 16384 bytes File .* + +scons: warning: Setting stack size failed: + size not valid: 16384 bytes +File .* """) test.must_exist(['work1', 'f1.out']) test.must_exist(['work1', 'f2.out']) - test.run(chdir='work1', + test.run(chdir='work1', arguments = '-j2 --stack-size=16 -c .', match=TestSCons.match_re, stderr=""" scons: warning: Setting stack size failed: size not valid: 16384 bytes File .* + +scons: warning: Setting stack size failed: + size not valid: 16384 bytes +File .* """) test.must_not_exist(['work1', 'f1.out']) test.must_not_exist(['work1', 'f2.out']) @@ -192,14 +200,14 @@ File .* # # Test with -j2 SetOption('stack_size', 128) # - test.run(chdir='work2', + test.run(chdir='work2', arguments = '-j2 .', stdout=expected_stdout, stderr='') test.must_exist(['work2', 'f1.out']) test.must_exist(['work2', 'f2.out']) - test.run(chdir='work2', + test.run(chdir='work2', arguments = '-j2 -c .') test.must_not_exist(['work2', 'f1.out']) test.must_not_exist(['work2', 'f2.out']) @@ -207,14 +215,14 @@ File .* # # Test with -j2 --stack-size=128 --warn=no-stack-size # - test.run(chdir='work1', + test.run(chdir='work1', arguments = '-j2 --stack-size=128 --warn=no-stack-size .', stdout=expected_stdout, stderr='') test.must_exist(['work1', 'f1.out']) test.must_exist(['work1', 'f2.out']) - test.run(chdir='work1', + test.run(chdir='work1', arguments = '-j2 --stack-size=128 --warn=no-stack-size -c .') test.must_not_exist(['work1', 'f1.out']) test.must_not_exist(['work1', 'f2.out']) @@ -222,29 +230,29 @@ File .* # # Test with -j2 --stack-size=16 --warn=no-stack-size # - test.run(chdir='work1', + test.run(chdir='work1', arguments = '-j2 --stack-size=16 --warn=no-stack-size .', stdout=expected_stdout, stderr='') test.must_exist(['work1', 'f1.out']) test.must_exist(['work1', 'f2.out']) - test.run(chdir='work1', + test.run(chdir='work1', arguments = '-j2 --stack-size=16 --warn=no-stack-size -c .') test.must_not_exist(['work1', 'f1.out']) test.must_not_exist(['work1', 'f2.out']) # - # Test with -j2 --warn=no-stack-size SetOption('stack_size', 128) + # Test with -j2 --warn=no-stack-size SetOption('stack_size', 128) # - test.run(chdir='work2', + test.run(chdir='work2', arguments = '-j2 --warn=no-stack-size .', stdout=expected_stdout, stderr='') test.must_exist(['work2', 'f1.out']) test.must_exist(['work2', 'f2.out']) - test.run(chdir='work2', + test.run(chdir='work2', arguments = '-j2 --warn=no-stack-size -c .') test.must_not_exist(['work2', 'f1.out']) test.must_not_exist(['work2', 'f2.out']) @@ -254,7 +262,7 @@ else: # # Test with -j2 --stack-size=128 # - test.run(chdir='work1', + test.run(chdir='work1', arguments = '-j2 --stack-size=128 .', match=TestSCons.match_re, stdout=re_expected_stdout, @@ -262,7 +270,7 @@ else: test.must_exist(['work1', 'f1.out']) test.must_exist(['work1', 'f2.out']) - test.run(chdir='work1', + test.run(chdir='work1', arguments = '-j2 --stack-size=128 -c .', match=TestSCons.match_re, stderr=expect_unsupported) @@ -272,7 +280,7 @@ else: # # Test with -j2 --stack-size=16 # - test.run(chdir='work1', + test.run(chdir='work1', arguments = '-j2 --stack-size=16 .', match=TestSCons.match_re, stdout=re_expected_stdout, @@ -280,7 +288,7 @@ else: test.must_exist(['work1', 'f1.out']) test.must_exist(['work1', 'f2.out']) - test.run(chdir='work1', + test.run(chdir='work1', arguments = '-j2 --stack-size=16 -c .', match=TestSCons.match_re, stderr=expect_unsupported) @@ -290,7 +298,7 @@ else: # # Test with -j2 SetOption('stack_size', 128) # - test.run(chdir='work2', + test.run(chdir='work2', arguments = '-j2 .', match=TestSCons.match_re, stdout=re_expected_stdout, @@ -298,7 +306,7 @@ else: test.must_exist(['work2', 'f1.out']) test.must_exist(['work2', 'f2.out']) - test.run(chdir='work2', + test.run(chdir='work2', arguments = '-j2 -c .', match=TestSCons.match_re, stderr=expect_unsupported) @@ -308,14 +316,14 @@ else: # # Test with -j2 --stack-size=128 --warn=no-stack-size # - test.run(chdir='work1', + test.run(chdir='work1', arguments = '-j2 --stack-size=128 --warn=no-stack-size .', stdout=expected_stdout, stderr='') test.must_exist(['work1', 'f1.out']) test.must_exist(['work1', 'f2.out']) - test.run(chdir='work1', + test.run(chdir='work1', arguments = '-j2 --stack-size=128 --warn=no-stack-size -c .') test.must_not_exist(['work1', 'f1.out']) test.must_not_exist(['work1', 'f2.out']) @@ -323,29 +331,29 @@ else: # # Test with -j2 --stack-size=16 --warn=no-stack-size # - test.run(chdir='work1', + test.run(chdir='work1', arguments = '-j2 --stack-size=16 --warn=no-stack-size .', stdout=expected_stdout, stderr='') test.must_exist(['work1', 'f1.out']) test.must_exist(['work1', 'f2.out']) - test.run(chdir='work1', + test.run(chdir='work1', arguments = '-j2 --stack-size=16 --warn=no-stack-size -c .') test.must_not_exist(['work1', 'f1.out']) test.must_not_exist(['work1', 'f2.out']) # - # Test with -j2 --warn=no-stack-size SetOption('stack_size', 128) + # Test with -j2 --warn=no-stack-size SetOption('stack_size', 128) # - test.run(chdir='work2', + test.run(chdir='work2', arguments = '-j2 --warn=no-stack-size .', stdout=expected_stdout, stderr='') test.must_exist(['work2', 'f1.out']) test.must_exist(['work2', 'f2.out']) - test.run(chdir='work2', + test.run(chdir='work2', arguments = '-j2 --warn=no-stack-size -c .') test.must_not_exist(['work2', 'f1.out']) test.must_not_exist(['work2', 'f2.out']) -- cgit v0.12 From 311c860bafe3f48ec60d22c19df6be5126b01173 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Thu, 1 Feb 2024 10:18:15 -0500 Subject: update chagnes and release --- CHANGES.txt | 3 +++ RELEASE.txt | 3 +++ 2 files changed, 6 insertions(+) diff --git a/CHANGES.txt b/CHANGES.txt index 357cb45..cb45be7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -68,6 +68,9 @@ RELEASE VERSION/DATE TO BE FILLED IN LATER - CacheDir writes no longer happen within the taskmaster critical section, and therefore can run in parallel with both other CacheDir writes and the taskmaster DAG walk. + - The NewParallel scheduler now only adds threads as new work requiring execution + is discovered, up to the limit set by -j. This should reduce resource utilization + when the achievable parallelism in the DAG is less than the -j limit. From Mats Wichmann: - Add support for Python 3.13 (as of alpha 2). So far only affects diff --git a/RELEASE.txt b/RELEASE.txt index ae43db1..02f63e6 100644 --- a/RELEASE.txt +++ b/RELEASE.txt @@ -67,6 +67,9 @@ IMPROVEMENTS (Larger -j values) - CacheDir writes no longer happen within the taskmaster critical section, and therefore can run in parallel with both other CacheDir writes and the taskmaster DAG walk. +- The NewParallel scheduler now only adds threads as new work requiring execution + is discovered, up to the limit set by -j. This should reduce resource utilization + when the achievable parallelism in the DAG is less than the -j limit. PACKAGING -- cgit v0.12 From dca250fe2185ff6688e38d11f495bff138da28c6 Mon Sep 17 00:00:00 2001 From: Andrew Morrow Date: Tue, 13 Feb 2024 09:34:07 -0500 Subject: Only create new workers when current workers are saturated with jobs --- SCons/Taskmaster/Job.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/SCons/Taskmaster/Job.py b/SCons/Taskmaster/Job.py index 3b5b854..73ec0df 100644 --- a/SCons/Taskmaster/Job.py +++ b/SCons/Taskmaster/Job.py @@ -529,7 +529,8 @@ class NewParallel: def _maybe_start_worker(self) -> None: if self.max_workers > 1 and len(self.workers) < self.max_workers: - self._start_worker() + if self.jobs >= len(self.workers): + self._start_worker() def _start_worker(self) -> None: prev_size = self._adjust_stack_size() -- cgit v0.12