diff options
Diffstat (limited to 'Source/cmWorkerPool.cxx')
-rw-r--r-- | Source/cmWorkerPool.cxx | 71 |
1 files changed, 42 insertions, 29 deletions
diff --git a/Source/cmWorkerPool.cxx b/Source/cmWorkerPool.cxx index aa0d6b3..12aba4f 100644 --- a/Source/cmWorkerPool.cxx +++ b/Source/cmWorkerPool.cxx @@ -13,7 +13,7 @@ #include <cm/memory> -#include "cm_uv.h" +#include <cm3p/uv.h> #include "cmRange.h" #include "cmStringAlgorithms.h" @@ -469,11 +469,9 @@ void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle) void cmWorkerPoolWorker::UVProcessFinished() { - { - std::lock_guard<std::mutex> lock(Proc_.Mutex); - if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) { - Proc_.ROP.reset(); - } + std::lock_guard<std::mutex> lock(Proc_.Mutex); + if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) { + Proc_.ROP.reset(); } // Notify idling thread Proc_.Condition.notify_one(); @@ -532,6 +530,7 @@ public: unsigned int JobsProcessing = 0; std::deque<cmWorkerPool::JobHandleT> Queue; std::condition_variable Condition; + std::condition_variable ConditionFence; std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers; // -- References @@ -593,19 +592,12 @@ bool cmWorkerPoolInternal::Process() void cmWorkerPoolInternal::Abort() { - bool notifyThreads = false; // Clear all jobs and set abort flag - { - std::lock_guard<std::mutex> guard(Mutex); - if (Processing && !Aborting) { - // Register abort and clear queue - Aborting = true; - Queue.clear(); - notifyThreads = true; - } - } - if (notifyThreads) { - // Wake threads + std::lock_guard<std::mutex> guard(Mutex); + if (!Aborting) { + // Register abort and clear queue + Aborting = true; + Queue.clear(); Condition.notify_all(); } } @@ -669,7 +661,7 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex) if (Aborting) { break; } - // Wait for new jobs + // Wait for new jobs on the main CV if (Queue.empty()) { ++WorkersIdle; Condition.wait(uLock); @@ -677,20 +669,34 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex) continue; } - // Check for fence jobs - if (FenceProcessing || Queue.front()->IsFence()) { - if (JobsProcessing != 0) { - Condition.wait(uLock); - continue; - } - // No jobs get processed. Set the fence job processing flag. - FenceProcessing = true; + // If there is a fence currently active or waiting, + // sleep on the main CV and try again. + if (FenceProcessing) { + Condition.wait(uLock); + continue; } // Pop next job from queue jobHandle = std::move(Queue.front()); Queue.pop_front(); + // Check for fence jobs + bool raisedFence = false; + if (jobHandle->IsFence()) { + FenceProcessing = true; + raisedFence = true; + // Wait on the Fence CV until all pending jobs are done. + while (JobsProcessing != 0 && !Aborting) { + ConditionFence.wait(uLock); + } + // When aborting, explicitly kick all threads alive once more. + if (Aborting) { + FenceProcessing = false; + Condition.notify_all(); + break; + } + } + // Unlocked scope for job processing ++JobsProcessing; { @@ -701,11 +707,18 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex) } --JobsProcessing; - // Was this a fence job? - if (FenceProcessing) { + // If this was the thread that entered fence processing + // originally, notify all idling workers that the fence + // is done. + if (raisedFence) { FenceProcessing = false; Condition.notify_all(); } + // If fence processing is still not done, notify the + // the fencing worker when all active jobs are done. + if (FenceProcessing && JobsProcessing == 0) { + ConditionFence.notify_all(); + } } // Decrement running workers count |