summaryrefslogtreecommitdiffstats
path: root/Source/cmWorkerPool.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'Source/cmWorkerPool.cxx')
-rw-r--r--Source/cmWorkerPool.cxx781
1 files changed, 781 insertions, 0 deletions
diff --git a/Source/cmWorkerPool.cxx b/Source/cmWorkerPool.cxx
new file mode 100644
index 0000000..1d15c27
--- /dev/null
+++ b/Source/cmWorkerPool.cxx
@@ -0,0 +1,781 @@
+/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
+ file Copyright.txt or https://cmake.org/licensing for details. */
+#include "cmWorkerPool.h"
+
+#include <algorithm>
+#include <array>
+#include <condition_variable>
+#include <cstddef>
+#include <deque>
+#include <functional>
+#include <mutex>
+#include <thread>
+
+#include <cm/memory>
+
+#include <cm3p/uv.h>
+
+#include "cmRange.h"
+#include "cmStringAlgorithms.h"
+#include "cmUVHandlePtr.h"
+#include "cmUVSignalHackRAII.h" // IWYU pragma: keep
+
+/**
+ * @brief libuv pipe buffer class
+ */
+class cmUVPipeBuffer
+{
+public:
+ using DataRange = cmRange<const char*>;
+ using DataFunction = std::function<void(DataRange)>;
+ /// On error the ssize_t argument is a non zero libuv error code
+ using EndFunction = std::function<void(ssize_t)>;
+
+ /**
+ * Reset to construction state
+ */
+ void reset();
+
+ /**
+ * Initializes uv_pipe(), uv_stream() and uv_handle()
+ * @return true on success
+ */
+ bool init(uv_loop_t* uv_loop);
+
+ /**
+ * Start reading
+ * @return true on success
+ */
+ bool startRead(DataFunction dataFunction, EndFunction endFunction);
+
+ //! libuv pipe
+ uv_pipe_t* uv_pipe() const { return this->UVPipe_.get(); }
+ //! uv_pipe() casted to libuv stream
+ uv_stream_t* uv_stream() const
+ {
+ return static_cast<uv_stream_t*>(this->UVPipe_);
+ }
+ //! uv_pipe() casted to libuv handle
+ uv_handle_t* uv_handle() { return static_cast<uv_handle_t*>(this->UVPipe_); }
+
+private:
+ // -- Libuv callbacks
+ static void UVAlloc(uv_handle_t* handle, size_t suggestedSize,
+ uv_buf_t* buf);
+ static void UVData(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf);
+
+ cm::uv_pipe_ptr UVPipe_;
+ std::vector<char> Buffer_;
+ DataFunction DataFunction_;
+ EndFunction EndFunction_;
+};
+
+void cmUVPipeBuffer::reset()
+{
+ if (this->UVPipe_.get() != nullptr) {
+ this->EndFunction_ = nullptr;
+ this->DataFunction_ = nullptr;
+ this->Buffer_.clear();
+ this->Buffer_.shrink_to_fit();
+ this->UVPipe_.reset();
+ }
+}
+
+bool cmUVPipeBuffer::init(uv_loop_t* uv_loop)
+{
+ this->reset();
+ if (uv_loop == nullptr) {
+ return false;
+ }
+ int ret = this->UVPipe_.init(*uv_loop, 0, this);
+ return (ret == 0);
+}
+
+bool cmUVPipeBuffer::startRead(DataFunction dataFunction,
+ EndFunction endFunction)
+{
+ if (this->UVPipe_.get() == nullptr) {
+ return false;
+ }
+ if (!dataFunction || !endFunction) {
+ return false;
+ }
+ this->DataFunction_ = std::move(dataFunction);
+ this->EndFunction_ = std::move(endFunction);
+ int ret = uv_read_start(this->uv_stream(), &cmUVPipeBuffer::UVAlloc,
+ &cmUVPipeBuffer::UVData);
+ return (ret == 0);
+}
+
+void cmUVPipeBuffer::UVAlloc(uv_handle_t* handle, size_t suggestedSize,
+ uv_buf_t* buf)
+{
+ auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(handle->data);
+ pipe.Buffer_.resize(suggestedSize);
+ buf->base = pipe.Buffer_.data();
+ buf->len = static_cast<unsigned long>(pipe.Buffer_.size());
+}
+
+void cmUVPipeBuffer::UVData(uv_stream_t* stream, ssize_t nread,
+ const uv_buf_t* buf)
+{
+ auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(stream->data);
+ if (nread > 0) {
+ if (buf->base != nullptr) {
+ // Call data function
+ pipe.DataFunction_(DataRange(buf->base, buf->base + nread));
+ }
+ } else if (nread < 0) {
+ // Save the end function on the stack before resetting the pipe
+ EndFunction efunc;
+ efunc.swap(pipe.EndFunction_);
+ // Reset pipe before calling the end function
+ pipe.reset();
+ // Call end function
+ efunc((nread == UV_EOF) ? 0 : nread);
+ }
+}
+
+/**
+ * @brief External process management class
+ */
+class cmUVReadOnlyProcess
+{
+public:
+ // -- Types
+ //! @brief Process settings
+ struct SetupT
+ {
+ std::string WorkingDirectory;
+ std::vector<std::string> Command;
+ cmWorkerPool::ProcessResultT* Result = nullptr;
+ bool MergedOutput = false;
+ };
+
+ // -- Const accessors
+ SetupT const& Setup() const { return this->Setup_; }
+ cmWorkerPool::ProcessResultT* Result() const { return this->Setup_.Result; }
+ bool IsStarted() const { return this->IsStarted_; }
+ bool IsFinished() const { return this->IsFinished_; }
+
+ // -- Runtime
+ void setup(cmWorkerPool::ProcessResultT* result, bool mergedOutput,
+ std::vector<std::string> const& command,
+ std::string const& workingDirectory = std::string());
+ bool start(uv_loop_t* uv_loop, std::function<void()> finishedCallback);
+
+private:
+ // -- Libuv callbacks
+ static void UVExit(uv_process_t* handle, int64_t exitStatus, int termSignal);
+ void UVPipeOutData(cmUVPipeBuffer::DataRange data) const;
+ void UVPipeOutEnd(ssize_t error);
+ void UVPipeErrData(cmUVPipeBuffer::DataRange data) const;
+ void UVPipeErrEnd(ssize_t error);
+ void UVTryFinish();
+
+ // -- Setup
+ SetupT Setup_;
+ // -- Runtime
+ bool IsStarted_ = false;
+ bool IsFinished_ = false;
+ std::function<void()> FinishedCallback_;
+ std::vector<const char*> CommandPtr_;
+ std::array<uv_stdio_container_t, 3> UVOptionsStdIO_;
+ uv_process_options_t UVOptions_;
+ cm::uv_process_ptr UVProcess_;
+ cmUVPipeBuffer UVPipeOut_;
+ cmUVPipeBuffer UVPipeErr_;
+};
+
+void cmUVReadOnlyProcess::setup(cmWorkerPool::ProcessResultT* result,
+ bool mergedOutput,
+ std::vector<std::string> const& command,
+ std::string const& workingDirectory)
+{
+ this->Setup_.WorkingDirectory = workingDirectory;
+ this->Setup_.Command = command;
+ this->Setup_.Result = result;
+ this->Setup_.MergedOutput = mergedOutput;
+}
+
+bool cmUVReadOnlyProcess::start(uv_loop_t* uv_loop,
+ std::function<void()> finishedCallback)
+{
+ if (this->IsStarted() || (this->Result() == nullptr)) {
+ return false;
+ }
+
+ // Reset result before the start
+ this->Result()->reset();
+
+ // Fill command string pointers
+ if (!this->Setup().Command.empty()) {
+ this->CommandPtr_.reserve(this->Setup().Command.size() + 1);
+ for (std::string const& arg : this->Setup().Command) {
+ this->CommandPtr_.push_back(arg.c_str());
+ }
+ this->CommandPtr_.push_back(nullptr);
+ } else {
+ this->Result()->ErrorMessage = "Empty command";
+ }
+
+ if (!this->Result()->error()) {
+ if (!this->UVPipeOut_.init(uv_loop)) {
+ this->Result()->ErrorMessage = "libuv stdout pipe initialization failed";
+ }
+ }
+ if (!this->Result()->error()) {
+ if (!this->UVPipeErr_.init(uv_loop)) {
+ this->Result()->ErrorMessage = "libuv stderr pipe initialization failed";
+ }
+ }
+ if (!this->Result()->error()) {
+ // -- Setup process stdio options
+ // stdin
+ this->UVOptionsStdIO_[0].flags = UV_IGNORE;
+ this->UVOptionsStdIO_[0].data.stream = nullptr;
+ // stdout
+ this->UVOptionsStdIO_[1].flags =
+ static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
+ this->UVOptionsStdIO_[1].data.stream = this->UVPipeOut_.uv_stream();
+ // stderr
+ this->UVOptionsStdIO_[2].flags =
+ static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
+ this->UVOptionsStdIO_[2].data.stream = this->UVPipeErr_.uv_stream();
+
+ // -- Setup process options
+ std::fill_n(reinterpret_cast<char*>(&this->UVOptions_),
+ sizeof(this->UVOptions_), 0);
+ this->UVOptions_.exit_cb = &cmUVReadOnlyProcess::UVExit;
+ this->UVOptions_.file = this->CommandPtr_[0];
+ this->UVOptions_.args = const_cast<char**>(this->CommandPtr_.data());
+ this->UVOptions_.cwd = this->Setup_.WorkingDirectory.c_str();
+ this->UVOptions_.flags = UV_PROCESS_WINDOWS_HIDE;
+ this->UVOptions_.stdio_count =
+ static_cast<int>(this->UVOptionsStdIO_.size());
+ this->UVOptions_.stdio = this->UVOptionsStdIO_.data();
+
+ // -- Spawn process
+ int uvErrorCode = this->UVProcess_.spawn(*uv_loop, this->UVOptions_, this);
+ if (uvErrorCode != 0) {
+ this->Result()->ErrorMessage = "libuv process spawn failed";
+ if (const char* uvErr = uv_strerror(uvErrorCode)) {
+ this->Result()->ErrorMessage += ": ";
+ this->Result()->ErrorMessage += uvErr;
+ }
+ }
+ }
+ // -- Start reading from stdio streams
+ if (!this->Result()->error()) {
+ if (!this->UVPipeOut_.startRead(
+ [this](cmUVPipeBuffer::DataRange range) {
+ this->UVPipeOutData(range);
+ },
+ [this](ssize_t error) { this->UVPipeOutEnd(error); })) {
+ this->Result()->ErrorMessage =
+ "libuv start reading from stdout pipe failed";
+ }
+ }
+ if (!this->Result()->error()) {
+ if (!this->UVPipeErr_.startRead(
+ [this](cmUVPipeBuffer::DataRange range) {
+ this->UVPipeErrData(range);
+ },
+ [this](ssize_t error) { this->UVPipeErrEnd(error); })) {
+ this->Result()->ErrorMessage =
+ "libuv start reading from stderr pipe failed";
+ }
+ }
+
+ if (!this->Result()->error()) {
+ this->IsStarted_ = true;
+ this->FinishedCallback_ = std::move(finishedCallback);
+ } else {
+ // Clear libuv handles and finish
+ this->UVProcess_.reset();
+ this->UVPipeOut_.reset();
+ this->UVPipeErr_.reset();
+ this->CommandPtr_.clear();
+ }
+
+ return this->IsStarted();
+}
+
+void cmUVReadOnlyProcess::UVExit(uv_process_t* handle, int64_t exitStatus,
+ int termSignal)
+{
+ auto& proc = *reinterpret_cast<cmUVReadOnlyProcess*>(handle->data);
+ if (proc.IsStarted() && !proc.IsFinished()) {
+ // Set error message on demand
+ proc.Result()->ExitStatus = exitStatus;
+ proc.Result()->TermSignal = termSignal;
+ if (!proc.Result()->error()) {
+ if (termSignal != 0) {
+ proc.Result()->ErrorMessage = cmStrCat(
+ "Process was terminated by signal ", proc.Result()->TermSignal);
+ } else if (exitStatus != 0) {
+ proc.Result()->ErrorMessage = cmStrCat(
+ "Process failed with return value ", proc.Result()->ExitStatus);
+ }
+ }
+
+ // Reset process handle
+ proc.UVProcess_.reset();
+ // Try finish
+ proc.UVTryFinish();
+ }
+}
+
+void cmUVReadOnlyProcess::UVPipeOutData(cmUVPipeBuffer::DataRange data) const
+{
+ this->Result()->StdOut.append(data.begin(), data.end());
+}
+
+void cmUVReadOnlyProcess::UVPipeOutEnd(ssize_t error)
+{
+ // Process pipe error
+ if ((error != 0) && !this->Result()->error()) {
+ this->Result()->ErrorMessage = cmStrCat(
+ "Reading from stdout pipe failed with libuv error code ", error);
+ }
+ // Try finish
+ this->UVTryFinish();
+}
+
+void cmUVReadOnlyProcess::UVPipeErrData(cmUVPipeBuffer::DataRange data) const
+{
+ std::string* str = this->Setup_.MergedOutput ? &this->Result()->StdOut
+ : &this->Result()->StdErr;
+ str->append(data.begin(), data.end());
+}
+
+void cmUVReadOnlyProcess::UVPipeErrEnd(ssize_t error)
+{
+ // Process pipe error
+ if ((error != 0) && !this->Result()->error()) {
+ this->Result()->ErrorMessage = cmStrCat(
+ "Reading from stderr pipe failed with libuv error code ", error);
+ }
+ // Try finish
+ this->UVTryFinish();
+}
+
+void cmUVReadOnlyProcess::UVTryFinish()
+{
+ // There still might be data in the pipes after the process has finished.
+ // Therefore check if the process is finished AND all pipes are closed
+ // before signaling the worker thread to continue.
+ if ((this->UVProcess_.get() != nullptr) ||
+ (this->UVPipeOut_.uv_pipe() != nullptr) ||
+ (this->UVPipeErr_.uv_pipe() != nullptr)) {
+ return;
+ }
+ this->IsFinished_ = true;
+ this->FinishedCallback_();
+}
+
+/**
+ * @brief Worker pool worker thread
+ */
+class cmWorkerPoolWorker
+{
+public:
+ cmWorkerPoolWorker(uv_loop_t& uvLoop);
+ ~cmWorkerPoolWorker();
+
+ cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete;
+ cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete;
+
+ /**
+ * Set the internal thread
+ */
+ void SetThread(std::thread&& aThread) { this->Thread_ = std::move(aThread); }
+
+ /**
+ * Run an external process
+ */
+ bool RunProcess(cmWorkerPool::ProcessResultT& result,
+ std::vector<std::string> const& command,
+ std::string const& workingDirectory);
+
+private:
+ // -- Libuv callbacks
+ static void UVProcessStart(uv_async_t* handle);
+ void UVProcessFinished();
+
+ // -- Process management
+ struct
+ {
+ std::mutex Mutex;
+ cm::uv_async_ptr Request;
+ std::condition_variable Condition;
+ std::unique_ptr<cmUVReadOnlyProcess> ROP;
+ } Proc_;
+ // -- System thread
+ std::thread Thread_;
+};
+
+cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop)
+{
+ this->Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this);
+}
+
+cmWorkerPoolWorker::~cmWorkerPoolWorker()
+{
+ if (this->Thread_.joinable()) {
+ this->Thread_.join();
+ }
+}
+
+bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result,
+ std::vector<std::string> const& command,
+ std::string const& workingDirectory)
+{
+ if (command.empty()) {
+ return false;
+ }
+ // Create process instance
+ {
+ std::lock_guard<std::mutex> lock(this->Proc_.Mutex);
+ this->Proc_.ROP = cm::make_unique<cmUVReadOnlyProcess>();
+ this->Proc_.ROP->setup(&result, true, command, workingDirectory);
+ }
+ // Send asynchronous process start request to libuv loop
+ this->Proc_.Request.send();
+ // Wait until the process has been finished and destroyed
+ {
+ std::unique_lock<std::mutex> ulock(this->Proc_.Mutex);
+ while (this->Proc_.ROP) {
+ this->Proc_.Condition.wait(ulock);
+ }
+ }
+ return !result.error();
+}
+
+void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
+{
+ auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data);
+ bool startFailed = false;
+ {
+ auto& Proc = wrk->Proc_;
+ std::lock_guard<std::mutex> lock(Proc.Mutex);
+ if (Proc.ROP && !Proc.ROP->IsStarted()) {
+ startFailed =
+ !Proc.ROP->start(handle->loop, [wrk] { wrk->UVProcessFinished(); });
+ }
+ }
+ // Clean up if starting of the process failed
+ if (startFailed) {
+ wrk->UVProcessFinished();
+ }
+}
+
+void cmWorkerPoolWorker::UVProcessFinished()
+{
+ std::lock_guard<std::mutex> lock(this->Proc_.Mutex);
+ if (this->Proc_.ROP &&
+ (this->Proc_.ROP->IsFinished() || !this->Proc_.ROP->IsStarted())) {
+ this->Proc_.ROP.reset();
+ }
+ // Notify idling thread
+ this->Proc_.Condition.notify_one();
+}
+
+/**
+ * @brief Private worker pool internals
+ */
+class cmWorkerPoolInternal
+{
+public:
+ // -- Constructors
+ cmWorkerPoolInternal(cmWorkerPool* pool);
+ ~cmWorkerPoolInternal();
+
+ /**
+ * Runs the libuv loop.
+ */
+ bool Process();
+
+ /**
+ * Clear queue and abort threads.
+ */
+ void Abort();
+
+ /**
+ * Push a job to the queue and notify a worker.
+ */
+ bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
+
+ /**
+ * Worker thread main loop method.
+ */
+ void Work(unsigned int workerIndex);
+
+ // -- Request slots
+ static void UVSlotBegin(uv_async_t* handle);
+ static void UVSlotEnd(uv_async_t* handle);
+
+ // -- UV loop
+#ifdef CMAKE_UV_SIGNAL_HACK
+ std::unique_ptr<cmUVSignalHackRAII> UVHackRAII;
+#endif
+ std::unique_ptr<uv_loop_t> UVLoop;
+ cm::uv_async_ptr UVRequestBegin;
+ cm::uv_async_ptr UVRequestEnd;
+
+ // -- Thread pool and job queue
+ std::mutex Mutex;
+ bool Processing = false;
+ bool Aborting = false;
+ bool FenceProcessing = false;
+ unsigned int WorkersRunning = 0;
+ unsigned int WorkersIdle = 0;
+ unsigned int JobsProcessing = 0;
+ std::deque<cmWorkerPool::JobHandleT> Queue;
+ std::condition_variable Condition;
+ std::condition_variable ConditionFence;
+ std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
+
+ // -- References
+ cmWorkerPool* Pool = nullptr;
+};
+
+void cmWorkerPool::ProcessResultT::reset()
+{
+ this->ExitStatus = 0;
+ this->TermSignal = 0;
+ if (!this->StdOut.empty()) {
+ this->StdOut.clear();
+ this->StdOut.shrink_to_fit();
+ }
+ if (!this->StdErr.empty()) {
+ this->StdErr.clear();
+ this->StdErr.shrink_to_fit();
+ }
+ if (!this->ErrorMessage.empty()) {
+ this->ErrorMessage.clear();
+ this->ErrorMessage.shrink_to_fit();
+ }
+}
+
+cmWorkerPoolInternal::cmWorkerPoolInternal(cmWorkerPool* pool)
+ : Pool(pool)
+{
+ // Initialize libuv loop
+ uv_disable_stdio_inheritance();
+#ifdef CMAKE_UV_SIGNAL_HACK
+ UVHackRAII = cm::make_unique<cmUVSignalHackRAII>();
+#endif
+ this->UVLoop = cm::make_unique<uv_loop_t>();
+ uv_loop_init(this->UVLoop.get());
+}
+
+cmWorkerPoolInternal::~cmWorkerPoolInternal()
+{
+ uv_loop_close(this->UVLoop.get());
+}
+
+bool cmWorkerPoolInternal::Process()
+{
+ // Reset state flags
+ this->Processing = true;
+ this->Aborting = false;
+ // Initialize libuv asynchronous request
+ this->UVRequestBegin.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotBegin,
+ this);
+ this->UVRequestEnd.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotEnd,
+ this);
+ // Send begin request
+ this->UVRequestBegin.send();
+ // Run libuv loop
+ bool success = (uv_run(this->UVLoop.get(), UV_RUN_DEFAULT) == 0);
+ // Update state flags
+ this->Processing = false;
+ this->Aborting = false;
+ return success;
+}
+
+void cmWorkerPoolInternal::Abort()
+{
+ // Clear all jobs and set abort flag
+ std::lock_guard<std::mutex> guard(this->Mutex);
+ if (!this->Aborting) {
+ // Register abort and clear queue
+ this->Aborting = true;
+ this->Queue.clear();
+ this->Condition.notify_all();
+ }
+}
+
+inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle)
+{
+ std::lock_guard<std::mutex> guard(this->Mutex);
+ if (this->Aborting) {
+ return false;
+ }
+ // Append the job to the queue
+ this->Queue.emplace_back(std::move(jobHandle));
+ // Notify an idle worker if there's one
+ if (this->WorkersIdle != 0) {
+ this->Condition.notify_one();
+ }
+ // Return success
+ return true;
+}
+
+void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle)
+{
+ auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
+ // Create worker threads
+ {
+ unsigned int const num = gint.Pool->ThreadCount();
+ // Create workers
+ gint.Workers.reserve(num);
+ for (unsigned int ii = 0; ii != num; ++ii) {
+ gint.Workers.emplace_back(
+ cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop));
+ }
+ // Start worker threads
+ for (unsigned int ii = 0; ii != num; ++ii) {
+ gint.Workers[ii]->SetThread(
+ std::thread(&cmWorkerPoolInternal::Work, &gint, ii));
+ }
+ }
+ // Destroy begin request
+ gint.UVRequestBegin.reset();
+}
+
+void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle)
+{
+ auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
+ // Join and destroy worker threads
+ gint.Workers.clear();
+ // Destroy end request
+ gint.UVRequestEnd.reset();
+}
+
+void cmWorkerPoolInternal::Work(unsigned int workerIndex)
+{
+ cmWorkerPool::JobHandleT jobHandle;
+ std::unique_lock<std::mutex> uLock(this->Mutex);
+ // Increment running workers count
+ ++this->WorkersRunning;
+ // Enter worker main loop
+ while (true) {
+ // Abort on request
+ if (this->Aborting) {
+ break;
+ }
+ // Wait for new jobs on the main CV
+ if (this->Queue.empty()) {
+ ++this->WorkersIdle;
+ this->Condition.wait(uLock);
+ --this->WorkersIdle;
+ continue;
+ }
+
+ // If there is a fence currently active or waiting,
+ // sleep on the main CV and try again.
+ if (this->FenceProcessing) {
+ this->Condition.wait(uLock);
+ continue;
+ }
+
+ // Pop next job from queue
+ jobHandle = std::move(this->Queue.front());
+ this->Queue.pop_front();
+
+ // Check for fence jobs
+ bool raisedFence = false;
+ if (jobHandle->IsFence()) {
+ this->FenceProcessing = true;
+ raisedFence = true;
+ // Wait on the Fence CV until all pending jobs are done.
+ while (this->JobsProcessing != 0 && !this->Aborting) {
+ this->ConditionFence.wait(uLock);
+ }
+ // When aborting, explicitly kick all threads alive once more.
+ if (this->Aborting) {
+ this->FenceProcessing = false;
+ this->Condition.notify_all();
+ break;
+ }
+ }
+
+ // Unlocked scope for job processing
+ ++this->JobsProcessing;
+ {
+ uLock.unlock();
+ jobHandle->Work(this->Pool, workerIndex); // Process job
+ jobHandle.reset(); // Destroy job
+ uLock.lock();
+ }
+ --this->JobsProcessing;
+
+ // If this was the thread that entered fence processing
+ // originally, notify all idling workers that the fence
+ // is done.
+ if (raisedFence) {
+ this->FenceProcessing = false;
+ this->Condition.notify_all();
+ }
+ // If fence processing is still not done, notify the
+ // the fencing worker when all active jobs are done.
+ if (this->FenceProcessing && this->JobsProcessing == 0) {
+ this->ConditionFence.notify_all();
+ }
+ }
+
+ // Decrement running workers count
+ if (--this->WorkersRunning == 0) {
+ // Last worker thread about to finish. Send libuv event.
+ this->UVRequestEnd.send();
+ }
+}
+
+cmWorkerPool::JobT::~JobT() = default;
+
+bool cmWorkerPool::JobT::RunProcess(ProcessResultT& result,
+ std::vector<std::string> const& command,
+ std::string const& workingDirectory)
+{
+ // Get worker by index
+ auto* wrk = this->Pool_->Int_->Workers.at(this->WorkerIndex_).get();
+ return wrk->RunProcess(result, command, workingDirectory);
+}
+
+cmWorkerPool::cmWorkerPool()
+ : Int_(cm::make_unique<cmWorkerPoolInternal>(this))
+{
+}
+
+cmWorkerPool::~cmWorkerPool() = default;
+
+void cmWorkerPool::SetThreadCount(unsigned int threadCount)
+{
+ if (!this->Int_->Processing) {
+ this->ThreadCount_ = (threadCount > 0) ? threadCount : 1u;
+ }
+}
+
+bool cmWorkerPool::Process(void* userData)
+{
+ // Setup user data
+ this->UserData_ = userData;
+ // Run libuv loop
+ bool success = this->Int_->Process();
+ // Clear user data
+ this->UserData_ = nullptr;
+ // Return
+ return success;
+}
+
+bool cmWorkerPool::PushJob(JobHandleT&& jobHandle)
+{
+ return this->Int_->PushJob(std::move(jobHandle));
+}
+
+void cmWorkerPool::Abort()
+{
+ this->Int_->Abort();
+}