/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying file Copyright.txt or https://cmake.org/licensing for details. */ #include "cmWorkerPool.h" #include <algorithm> #include <array> #include <condition_variable> #include <cstddef> #include <deque> #include <functional> #include <mutex> #include <thread> #include <cm/memory> #include <cm3p/uv.h> #include "cmRange.h" #include "cmStringAlgorithms.h" #include "cmUVHandlePtr.h" #include "cmUVSignalHackRAII.h" // IWYU pragma: keep /** * @brief libuv pipe buffer class */ class cmUVPipeBuffer { public: using DataRange = cmRange<const char*>; using DataFunction = std::function<void(DataRange)>; /// On error the ssize_t argument is a non zero libuv error code using EndFunction = std::function<void(ssize_t)>; /** * Reset to construction state */ void reset(); /** * Initializes uv_pipe(), uv_stream() and uv_handle() * @return true on success */ bool init(uv_loop_t* uv_loop); /** * Start reading * @return true on success */ bool startRead(DataFunction dataFunction, EndFunction endFunction); //! libuv pipe uv_pipe_t* uv_pipe() const { return this->UVPipe_.get(); } //! uv_pipe() casted to libuv stream uv_stream_t* uv_stream() const { return static_cast<uv_stream_t*>(this->UVPipe_); } //! uv_pipe() casted to libuv handle uv_handle_t* uv_handle() { return static_cast<uv_handle_t*>(this->UVPipe_); } private: // -- Libuv callbacks static void UVAlloc(uv_handle_t* handle, size_t suggestedSize, uv_buf_t* buf); static void UVData(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf); cm::uv_pipe_ptr UVPipe_; std::vector<char> Buffer_; DataFunction DataFunction_; EndFunction EndFunction_; }; void cmUVPipeBuffer::reset() { if (this->UVPipe_.get() != nullptr) { this->EndFunction_ = nullptr; this->DataFunction_ = nullptr; this->Buffer_.clear(); this->Buffer_.shrink_to_fit(); this->UVPipe_.reset(); } } bool cmUVPipeBuffer::init(uv_loop_t* uv_loop) { this->reset(); if (uv_loop == nullptr) { return false; } int ret = this->UVPipe_.init(*uv_loop, 0, this); return (ret == 0); } bool cmUVPipeBuffer::startRead(DataFunction dataFunction, EndFunction endFunction) { if (this->UVPipe_.get() == nullptr) { return false; } if (!dataFunction || !endFunction) { return false; } this->DataFunction_ = std::move(dataFunction); this->EndFunction_ = std::move(endFunction); int ret = uv_read_start(this->uv_stream(), &cmUVPipeBuffer::UVAlloc, &cmUVPipeBuffer::UVData); return (ret == 0); } void cmUVPipeBuffer::UVAlloc(uv_handle_t* handle, size_t suggestedSize, uv_buf_t* buf) { auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(handle->data); pipe.Buffer_.resize(suggestedSize); buf->base = pipe.Buffer_.data(); buf->len = static_cast<unsigned long>(pipe.Buffer_.size()); } void cmUVPipeBuffer::UVData(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(stream->data); if (nread > 0) { if (buf->base != nullptr) { // Call data function pipe.DataFunction_(DataRange(buf->base, buf->base + nread)); } } else if (nread < 0) { // Save the end function on the stack before resetting the pipe EndFunction efunc; efunc.swap(pipe.EndFunction_); // Reset pipe before calling the end function pipe.reset(); // Call end function efunc((nread == UV_EOF) ? 0 : nread); } } /** * @brief External process management class */ class cmUVReadOnlyProcess { public: // -- Types //! @brief Process settings struct SetupT { std::string WorkingDirectory; std::vector<std::string> Command; cmWorkerPool::ProcessResultT* Result = nullptr; bool MergedOutput = false; }; // -- Const accessors SetupT const& Setup() const { return this->Setup_; } cmWorkerPool::ProcessResultT* Result() const { return this->Setup_.Result; } bool IsStarted() const { return this->IsStarted_; } bool IsFinished() const { return this->IsFinished_; } // -- Runtime void setup(cmWorkerPool::ProcessResultT* result, bool mergedOutput, std::vector<std::string> const& command, std::string const& workingDirectory = std::string()); bool start(uv_loop_t* uv_loop, std::function<void()> finishedCallback); private: // -- Libuv callbacks static void UVExit(uv_process_t* handle, int64_t exitStatus, int termSignal); void UVPipeOutData(cmUVPipeBuffer::DataRange data) const; void UVPipeOutEnd(ssize_t error); void UVPipeErrData(cmUVPipeBuffer::DataRange data) const; void UVPipeErrEnd(ssize_t error); void UVTryFinish(); // -- Setup SetupT Setup_; // -- Runtime bool IsStarted_ = false; bool IsFinished_ = false; std::function<void()> FinishedCallback_; std::vector<const char*> CommandPtr_; std::array<uv_stdio_container_t, 3> UVOptionsStdIO_; uv_process_options_t UVOptions_; cm::uv_process_ptr UVProcess_; cmUVPipeBuffer UVPipeOut_; cmUVPipeBuffer UVPipeErr_; }; void cmUVReadOnlyProcess::setup(cmWorkerPool::ProcessResultT* result, bool mergedOutput, std::vector<std::string> const& command, std::string const& workingDirectory) { this->Setup_.WorkingDirectory = workingDirectory; this->Setup_.Command = command; this->Setup_.Result = result; this->Setup_.MergedOutput = mergedOutput; } bool cmUVReadOnlyProcess::start(uv_loop_t* uv_loop, std::function<void()> finishedCallback) { if (this->IsStarted() || (this->Result() == nullptr)) { return false; } // Reset result before the start this->Result()->reset(); // Fill command string pointers if (!this->Setup().Command.empty()) { this->CommandPtr_.reserve(this->Setup().Command.size() + 1); for (std::string const& arg : this->Setup().Command) { this->CommandPtr_.push_back(arg.c_str()); } this->CommandPtr_.push_back(nullptr); } else { this->Result()->ErrorMessage = "Empty command"; } if (!this->Result()->error()) { if (!this->UVPipeOut_.init(uv_loop)) { this->Result()->ErrorMessage = "libuv stdout pipe initialization failed"; } } if (!this->Result()->error()) { if (!this->UVPipeErr_.init(uv_loop)) { this->Result()->ErrorMessage = "libuv stderr pipe initialization failed"; } } if (!this->Result()->error()) { // -- Setup process stdio options // stdin this->UVOptionsStdIO_[0].flags = UV_IGNORE; this->UVOptionsStdIO_[0].data.stream = nullptr; // stdout this->UVOptionsStdIO_[1].flags = static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE); this->UVOptionsStdIO_[1].data.stream = this->UVPipeOut_.uv_stream(); // stderr this->UVOptionsStdIO_[2].flags = static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE); this->UVOptionsStdIO_[2].data.stream = this->UVPipeErr_.uv_stream(); // -- Setup process options std::fill_n(reinterpret_cast<char*>(&this->UVOptions_), sizeof(this->UVOptions_), 0); this->UVOptions_.exit_cb = &cmUVReadOnlyProcess::UVExit; this->UVOptions_.file = this->CommandPtr_[0]; this->UVOptions_.args = const_cast<char**>(this->CommandPtr_.data()); this->UVOptions_.cwd = this->Setup_.WorkingDirectory.c_str(); this->UVOptions_.flags = UV_PROCESS_WINDOWS_HIDE; this->UVOptions_.stdio_count = static_cast<int>(this->UVOptionsStdIO_.size()); this->UVOptions_.stdio = this->UVOptionsStdIO_.data(); // -- Spawn process int uvErrorCode = this->UVProcess_.spawn(*uv_loop, this->UVOptions_, this); if (uvErrorCode != 0) { this->Result()->ErrorMessage = "libuv process spawn failed"; if (const char* uvErr = uv_strerror(uvErrorCode)) { this->Result()->ErrorMessage += ": "; this->Result()->ErrorMessage += uvErr; } } } // -- Start reading from stdio streams if (!this->Result()->error()) { if (!this->UVPipeOut_.startRead( [this](cmUVPipeBuffer::DataRange range) { this->UVPipeOutData(range); }, [this](ssize_t error) { this->UVPipeOutEnd(error); })) { this->Result()->ErrorMessage = "libuv start reading from stdout pipe failed"; } } if (!this->Result()->error()) { if (!this->UVPipeErr_.startRead( [this](cmUVPipeBuffer::DataRange range) { this->UVPipeErrData(range); }, [this](ssize_t error) { this->UVPipeErrEnd(error); })) { this->Result()->ErrorMessage = "libuv start reading from stderr pipe failed"; } } if (!this->Result()->error()) { this->IsStarted_ = true; this->FinishedCallback_ = std::move(finishedCallback); } else { // Clear libuv handles and finish this->UVProcess_.reset(); this->UVPipeOut_.reset(); this->UVPipeErr_.reset(); this->CommandPtr_.clear(); } return this->IsStarted(); } void cmUVReadOnlyProcess::UVExit(uv_process_t* handle, int64_t exitStatus, int termSignal) { auto& proc = *reinterpret_cast<cmUVReadOnlyProcess*>(handle->data); if (proc.IsStarted() && !proc.IsFinished()) { // Set error message on demand proc.Result()->ExitStatus = exitStatus; proc.Result()->TermSignal = termSignal; if (!proc.Result()->error()) { if (termSignal != 0) { proc.Result()->ErrorMessage = cmStrCat( "Process was terminated by signal ", proc.Result()->TermSignal); } else if (exitStatus != 0) { proc.Result()->ErrorMessage = cmStrCat( "Process failed with return value ", proc.Result()->ExitStatus); } } // Reset process handle proc.UVProcess_.reset(); // Try finish proc.UVTryFinish(); } } void cmUVReadOnlyProcess::UVPipeOutData(cmUVPipeBuffer::DataRange data) const { this->Result()->StdOut.append(data.begin(), data.end()); } void cmUVReadOnlyProcess::UVPipeOutEnd(ssize_t error) { // Process pipe error if ((error != 0) && !this->Result()->error()) { this->Result()->ErrorMessage = cmStrCat( "Reading from stdout pipe failed with libuv error code ", error); } // Try finish this->UVTryFinish(); } void cmUVReadOnlyProcess::UVPipeErrData(cmUVPipeBuffer::DataRange data) const { std::string* str = this->Setup_.MergedOutput ? &this->Result()->StdOut : &this->Result()->StdErr; str->append(data.begin(), data.end()); } void cmUVReadOnlyProcess::UVPipeErrEnd(ssize_t error) { // Process pipe error if ((error != 0) && !this->Result()->error()) { this->Result()->ErrorMessage = cmStrCat( "Reading from stderr pipe failed with libuv error code ", error); } // Try finish this->UVTryFinish(); } void cmUVReadOnlyProcess::UVTryFinish() { // There still might be data in the pipes after the process has finished. // Therefore check if the process is finished AND all pipes are closed // before signaling the worker thread to continue. if ((this->UVProcess_.get() != nullptr) || (this->UVPipeOut_.uv_pipe() != nullptr) || (this->UVPipeErr_.uv_pipe() != nullptr)) { return; } this->IsFinished_ = true; this->FinishedCallback_(); } /** * @brief Worker pool worker thread */ class cmWorkerPoolWorker { public: cmWorkerPoolWorker(uv_loop_t& uvLoop); ~cmWorkerPoolWorker(); cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete; cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete; /** * Set the internal thread */ void SetThread(std::thread&& aThread) { this->Thread_ = std::move(aThread); } /** * Run an external process */ bool RunProcess(cmWorkerPool::ProcessResultT& result, std::vector<std::string> const& command, std::string const& workingDirectory); private: // -- Libuv callbacks static void UVProcessStart(uv_async_t* handle); void UVProcessFinished(); // -- Process management struct { std::mutex Mutex; cm::uv_async_ptr Request; std::condition_variable Condition; std::unique_ptr<cmUVReadOnlyProcess> ROP; } Proc_; // -- System thread std::thread Thread_; }; cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop) { this->Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this); } cmWorkerPoolWorker::~cmWorkerPoolWorker() { if (this->Thread_.joinable()) { this->Thread_.join(); } } bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result, std::vector<std::string> const& command, std::string const& workingDirectory) { if (command.empty()) { return false; } // Create process instance { std::lock_guard<std::mutex> lock(this->Proc_.Mutex); this->Proc_.ROP = cm::make_unique<cmUVReadOnlyProcess>(); this->Proc_.ROP->setup(&result, true, command, workingDirectory); } // Send asynchronous process start request to libuv loop this->Proc_.Request.send(); // Wait until the process has been finished and destroyed { std::unique_lock<std::mutex> ulock(this->Proc_.Mutex); while (this->Proc_.ROP) { this->Proc_.Condition.wait(ulock); } } return !result.error(); } void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle) { auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data); bool startFailed = false; { auto& Proc = wrk->Proc_; std::lock_guard<std::mutex> lock(Proc.Mutex); if (Proc.ROP && !Proc.ROP->IsStarted()) { startFailed = !Proc.ROP->start(handle->loop, [wrk] { wrk->UVProcessFinished(); }); } } // Clean up if starting of the process failed if (startFailed) { wrk->UVProcessFinished(); } } void cmWorkerPoolWorker::UVProcessFinished() { std::lock_guard<std::mutex> lock(this->Proc_.Mutex); if (this->Proc_.ROP && (this->Proc_.ROP->IsFinished() || !this->Proc_.ROP->IsStarted())) { this->Proc_.ROP.reset(); } // Notify idling thread this->Proc_.Condition.notify_one(); } /** * @brief Private worker pool internals */ class cmWorkerPoolInternal { public: // -- Constructors cmWorkerPoolInternal(cmWorkerPool* pool); ~cmWorkerPoolInternal(); /** * Runs the libuv loop. */ bool Process(); /** * Clear queue and abort threads. */ void Abort(); /** * Push a job to the queue and notify a worker. */ bool PushJob(cmWorkerPool::JobHandleT&& jobHandle); /** * Worker thread main loop method. */ void Work(unsigned int workerIndex); // -- Request slots static void UVSlotBegin(uv_async_t* handle); static void UVSlotEnd(uv_async_t* handle); // -- UV loop #ifdef CMAKE_UV_SIGNAL_HACK std::unique_ptr<cmUVSignalHackRAII> UVHackRAII; #endif std::unique_ptr<uv_loop_t> UVLoop; cm::uv_async_ptr UVRequestBegin; cm::uv_async_ptr UVRequestEnd; // -- Thread pool and job queue std::mutex Mutex; bool Processing = false; bool Aborting = false; bool FenceProcessing = false; unsigned int WorkersRunning = 0; unsigned int WorkersIdle = 0; unsigned int JobsProcessing = 0; std::deque<cmWorkerPool::JobHandleT> Queue; std::condition_variable Condition; std::condition_variable ConditionFence; std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers; // -- References cmWorkerPool* Pool = nullptr; }; void cmWorkerPool::ProcessResultT::reset() { this->ExitStatus = 0; this->TermSignal = 0; if (!this->StdOut.empty()) { this->StdOut.clear(); this->StdOut.shrink_to_fit(); } if (!this->StdErr.empty()) { this->StdErr.clear(); this->StdErr.shrink_to_fit(); } if (!this->ErrorMessage.empty()) { this->ErrorMessage.clear(); this->ErrorMessage.shrink_to_fit(); } } cmWorkerPoolInternal::cmWorkerPoolInternal(cmWorkerPool* pool) : Pool(pool) { // Initialize libuv loop uv_disable_stdio_inheritance(); #ifdef CMAKE_UV_SIGNAL_HACK UVHackRAII = cm::make_unique<cmUVSignalHackRAII>(); #endif this->UVLoop = cm::make_unique<uv_loop_t>(); uv_loop_init(this->UVLoop.get()); } cmWorkerPoolInternal::~cmWorkerPoolInternal() { uv_loop_close(this->UVLoop.get()); } bool cmWorkerPoolInternal::Process() { // Reset state flags this->Processing = true; this->Aborting = false; // Initialize libuv asynchronous request this->UVRequestBegin.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotBegin, this); this->UVRequestEnd.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotEnd, this); // Send begin request this->UVRequestBegin.send(); // Run libuv loop bool success = (uv_run(this->UVLoop.get(), UV_RUN_DEFAULT) == 0); // Update state flags this->Processing = false; this->Aborting = false; return success; } void cmWorkerPoolInternal::Abort() { // Clear all jobs and set abort flag std::lock_guard<std::mutex> guard(this->Mutex); if (!this->Aborting) { // Register abort and clear queue this->Aborting = true; this->Queue.clear(); this->Condition.notify_all(); } } inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle) { std::lock_guard<std::mutex> guard(this->Mutex); if (this->Aborting) { return false; } // Append the job to the queue this->Queue.emplace_back(std::move(jobHandle)); // Notify an idle worker if there's one if (this->WorkersIdle != 0) { this->Condition.notify_one(); } // Return success return true; } void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle) { auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data); // Create worker threads { unsigned int const num = gint.Pool->ThreadCount(); // Create workers gint.Workers.reserve(num); for (unsigned int ii = 0; ii != num; ++ii) { gint.Workers.emplace_back( cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop)); } // Start worker threads for (unsigned int ii = 0; ii != num; ++ii) { gint.Workers[ii]->SetThread( std::thread(&cmWorkerPoolInternal::Work, &gint, ii)); } } // Destroy begin request gint.UVRequestBegin.reset(); } void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle) { auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data); // Join and destroy worker threads gint.Workers.clear(); // Destroy end request gint.UVRequestEnd.reset(); } void cmWorkerPoolInternal::Work(unsigned int workerIndex) { cmWorkerPool::JobHandleT jobHandle; std::unique_lock<std::mutex> uLock(this->Mutex); // Increment running workers count ++this->WorkersRunning; // Enter worker main loop while (true) { // Abort on request if (this->Aborting) { break; } // Wait for new jobs on the main CV if (this->Queue.empty()) { ++this->WorkersIdle; this->Condition.wait(uLock); --this->WorkersIdle; continue; } // If there is a fence currently active or waiting, // sleep on the main CV and try again. if (this->FenceProcessing) { this->Condition.wait(uLock); continue; } // Pop next job from queue jobHandle = std::move(this->Queue.front()); this->Queue.pop_front(); // Check for fence jobs bool raisedFence = false; if (jobHandle->IsFence()) { this->FenceProcessing = true; raisedFence = true; // Wait on the Fence CV until all pending jobs are done. while (this->JobsProcessing != 0 && !this->Aborting) { this->ConditionFence.wait(uLock); } // When aborting, explicitly kick all threads alive once more. if (this->Aborting) { this->FenceProcessing = false; this->Condition.notify_all(); break; } } // Unlocked scope for job processing ++this->JobsProcessing; { uLock.unlock(); jobHandle->Work(this->Pool, workerIndex); // Process job jobHandle.reset(); // Destroy job uLock.lock(); } --this->JobsProcessing; // If this was the thread that entered fence processing // originally, notify all idling workers that the fence // is done. if (raisedFence) { this->FenceProcessing = false; this->Condition.notify_all(); } // If fence processing is still not done, notify the // the fencing worker when all active jobs are done. if (this->FenceProcessing && this->JobsProcessing == 0) { this->ConditionFence.notify_all(); } } // Decrement running workers count if (--this->WorkersRunning == 0) { // Last worker thread about to finish. Send libuv event. this->UVRequestEnd.send(); } } cmWorkerPool::JobT::~JobT() = default; bool cmWorkerPool::JobT::RunProcess(ProcessResultT& result, std::vector<std::string> const& command, std::string const& workingDirectory) { // Get worker by index auto* wrk = this->Pool_->Int_->Workers.at(this->WorkerIndex_).get(); return wrk->RunProcess(result, command, workingDirectory); } cmWorkerPool::cmWorkerPool() : Int_(cm::make_unique<cmWorkerPoolInternal>(this)) { } cmWorkerPool::~cmWorkerPool() = default; void cmWorkerPool::SetThreadCount(unsigned int threadCount) { if (!this->Int_->Processing) { this->ThreadCount_ = (threadCount > 0) ? threadCount : 1u; } } bool cmWorkerPool::Process(void* userData) { // Setup user data this->UserData_ = userData; // Run libuv loop bool success = this->Int_->Process(); // Clear user data this->UserData_ = nullptr; // Return return success; } bool cmWorkerPool::PushJob(JobHandleT&& jobHandle) { return this->Int_->PushJob(std::move(jobHandle)); } void cmWorkerPool::Abort() { this->Int_->Abort(); }