From 01ad90258871d3d86d6b9a06878c7ddad95b292a Mon Sep 17 00:00:00 2001 From: Joerg Sonnenberger Date: Thu, 21 May 2020 21:07:50 +0200 Subject: Autogen: Redo locking and state machine for fence handling and the worker pool (1) All CV use must hold the corresponding mutex, otherwise race conditions happen. This is mandated by the C++ standard. (2) Introduce a separate CV for the thread waiting for other jobs to finish before running a fence. This avoids waking up all other workers blindly. Correctly wake that thread up when the processing of outstanding jobs is done. (3) Split the waiting for a fence to become runnable from a fence is pending. This avoids problems if more than one fence can end up on the queue. The thread that took a fence off the queue is responsible for clearing the fence processing flag. --- Source/cmWorkerPool.cxx | 69 +++++++++++++++++++++++++++++-------------------- 1 file changed, 41 insertions(+), 28 deletions(-) diff --git a/Source/cmWorkerPool.cxx b/Source/cmWorkerPool.cxx index 9fec936..12aba4f 100644 --- a/Source/cmWorkerPool.cxx +++ b/Source/cmWorkerPool.cxx @@ -469,11 +469,9 @@ void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle) void cmWorkerPoolWorker::UVProcessFinished() { - { - std::lock_guard lock(Proc_.Mutex); - if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) { - Proc_.ROP.reset(); - } + std::lock_guard lock(Proc_.Mutex); + if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) { + Proc_.ROP.reset(); } // Notify idling thread Proc_.Condition.notify_one(); @@ -532,6 +530,7 @@ public: unsigned int JobsProcessing = 0; std::deque Queue; std::condition_variable Condition; + std::condition_variable ConditionFence; std::vector> Workers; // -- References @@ -593,19 +592,12 @@ bool cmWorkerPoolInternal::Process() void cmWorkerPoolInternal::Abort() { - bool notifyThreads = false; // Clear all jobs and set abort flag - { - std::lock_guard guard(Mutex); - if (Processing && !Aborting) { - // Register abort and clear queue - Aborting = true; - Queue.clear(); - notifyThreads = true; - } - } - if (notifyThreads) { - // Wake threads + std::lock_guard guard(Mutex); + if (!Aborting) { + // Register abort and clear queue + Aborting = true; + Queue.clear(); Condition.notify_all(); } } @@ -669,7 +661,7 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex) if (Aborting) { break; } - // Wait for new jobs + // Wait for new jobs on the main CV if (Queue.empty()) { ++WorkersIdle; Condition.wait(uLock); @@ -677,20 +669,34 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex) continue; } - // Check for fence jobs - if (FenceProcessing || Queue.front()->IsFence()) { - if (JobsProcessing != 0) { - Condition.wait(uLock); - continue; - } - // No jobs get processed. Set the fence job processing flag. - FenceProcessing = true; + // If there is a fence currently active or waiting, + // sleep on the main CV and try again. + if (FenceProcessing) { + Condition.wait(uLock); + continue; } // Pop next job from queue jobHandle = std::move(Queue.front()); Queue.pop_front(); + // Check for fence jobs + bool raisedFence = false; + if (jobHandle->IsFence()) { + FenceProcessing = true; + raisedFence = true; + // Wait on the Fence CV until all pending jobs are done. + while (JobsProcessing != 0 && !Aborting) { + ConditionFence.wait(uLock); + } + // When aborting, explicitly kick all threads alive once more. + if (Aborting) { + FenceProcessing = false; + Condition.notify_all(); + break; + } + } + // Unlocked scope for job processing ++JobsProcessing; { @@ -701,11 +707,18 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex) } --JobsProcessing; - // Was this a fence job? - if (FenceProcessing) { + // If this was the thread that entered fence processing + // originally, notify all idling workers that the fence + // is done. + if (raisedFence) { FenceProcessing = false; Condition.notify_all(); } + // If fence processing is still not done, notify the + // the fencing worker when all active jobs are done. + if (FenceProcessing && JobsProcessing == 0) { + ConditionFence.notify_all(); + } } // Decrement running workers count -- cgit v0.12