diff options
-rw-r--r-- | Source/cmQtAutoMocUic.cxx | 6 | ||||
-rw-r--r-- | Source/cmWorkerPool.cxx | 37 | ||||
-rw-r--r-- | Source/cmWorkerPool.h | 47 |
3 files changed, 51 insertions, 39 deletions
diff --git a/Source/cmQtAutoMocUic.cxx b/Source/cmQtAutoMocUic.cxx index 75c5d8a..005c27d 100644 --- a/Source/cmQtAutoMocUic.cxx +++ b/Source/cmQtAutoMocUic.cxx @@ -1186,6 +1186,7 @@ bool cmQtAutoMocUic::Init(cmMakefile* makefile) num = std::min<unsigned long>(num, ParallelMax); Base_.NumThreads = static_cast<unsigned int>(num); } + WorkerPool_.SetThreadCount(Base_.NumThreads); } // - Files and directories @@ -1482,15 +1483,12 @@ bool cmQtAutoMocUic::Process() if (!CreateDirectories()) { return false; } - - if (!WorkerPool_.Process(Base().NumThreads, this)) { + if (!WorkerPool_.Process(this)) { return false; } - if (JobError_) { return false; } - return SettingsFileWrite(); } diff --git a/Source/cmWorkerPool.cxx b/Source/cmWorkerPool.cxx index 464182c..75ca47a 100644 --- a/Source/cmWorkerPool.cxx +++ b/Source/cmWorkerPool.cxx @@ -468,6 +468,7 @@ public: // -- Thread pool and job queue std::mutex Mutex; + bool Processing = false; bool Aborting = false; bool FenceProcessing = false; unsigned int WorkersRunning = 0; @@ -591,7 +592,8 @@ cmWorkerPoolInternal::~cmWorkerPoolInternal() bool cmWorkerPoolInternal::Process() { - // Reset state + // Reset state flags + Processing = true; Aborting = false; // Initialize libuv asynchronous request UVRequestBegin.init(*UVLoop, &cmWorkerPoolInternal::UVSlotBegin, this); @@ -599,23 +601,27 @@ bool cmWorkerPoolInternal::Process() // Send begin request UVRequestBegin.send(); // Run libuv loop - return (uv_run(UVLoop.get(), UV_RUN_DEFAULT) == 0); + bool success = (uv_run(UVLoop.get(), UV_RUN_DEFAULT) == 0); + // Update state flags + Processing = false; + Aborting = false; + return success; } void cmWorkerPoolInternal::Abort() { - bool firstCall = false; + bool notifyThreads = false; // Clear all jobs and set abort flag { std::lock_guard<std::mutex> guard(Mutex); - if (!Aborting) { + if (Processing && !Aborting) { // Register abort and clear queue Aborting = true; Queue.clear(); - firstCall = true; + notifyThreads = true; } } - if (firstCall) { + if (notifyThreads) { // Wake threads Condition.notify_all(); } @@ -627,15 +633,13 @@ inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle) if (Aborting) { return false; } - // Append the job to the queue Queue.emplace_back(std::move(jobHandle)); - // Notify an idle worker if there's one if (WorkersIdle != 0) { Condition.notify_one(); } - + // Return success return true; } @@ -743,19 +747,22 @@ cmWorkerPool::cmWorkerPool() cmWorkerPool::~cmWorkerPool() = default; -bool cmWorkerPool::Process(unsigned int threadCount, void* userData) +void cmWorkerPool::SetThreadCount(unsigned int threadCount) +{ + if (!Int_->Processing) { + ThreadCount_ = (threadCount > 0) ? threadCount : 1u; + } +} + +bool cmWorkerPool::Process(void* userData) { // Setup user data UserData_ = userData; - ThreadCount_ = (threadCount > 0) ? threadCount : 1u; - // Run libuv loop bool success = Int_->Process(); - // Clear user data UserData_ = nullptr; - ThreadCount_ = 0; - + // Return return success; } diff --git a/Source/cmWorkerPool.h b/Source/cmWorkerPool.h index 71c7d84..f08bb4f 100644 --- a/Source/cmWorkerPool.h +++ b/Source/cmWorkerPool.h @@ -50,12 +50,12 @@ public: JobT& operator=(JobT const&) = delete; /** - * @brief Virtual destructor. + * Virtual destructor. */ virtual ~JobT(); /** - * @brief Fence job flag + * Fence job flag * * Fence jobs require that: * - all jobs before in the queue have been processed @@ -66,7 +66,7 @@ public: protected: /** - * @brief Protected default constructor + * Protected default constructor */ JobT(bool fence = false) : Fence_(fence) @@ -125,12 +125,12 @@ public: }; /** - * @brief Job handle type + * Job handle type */ typedef std::unique_ptr<JobT> JobHandleT; /** - * @brief Fence job base class + * Fence job base class */ class JobFenceT : public JobT { @@ -144,8 +144,9 @@ public: }; /** - * @brief Fence job that aborts the worker pool. - * This class is useful as the last job in the job queue. + * Fence job that aborts the worker pool. + * + * Useful as the last job in the job queue. */ class JobEndT : JobFenceT { @@ -160,23 +161,29 @@ public: ~cmWorkerPool(); /** - * @brief Blocking function that starts threads to process all Jobs in - * the queue. + * Number of worker threads. + */ + unsigned int ThreadCount() const { return ThreadCount_; } + + /** + * Set the number of worker threads. * - * This method blocks until a job calls the Abort() method. - * @arg threadCount Number of threads to process jobs. - * @arg userData Common user data pointer available in all Jobs. + * Calling this method during Process() has no effect. */ - bool Process(unsigned int threadCount, void* userData = nullptr); + void SetThreadCount(unsigned int threadCount); /** - * Number of worker threads passed to Process(). - * Only valid during Process(). + * Blocking function that starts threads to process all Jobs in the queue. + * + * This method blocks until a job calls the Abort() method. + * @arg threadCount Number of threads to process jobs. + * @arg userData Common user data pointer available in all Jobs. */ - unsigned int ThreadCount() const { return ThreadCount_; } + bool Process(void* userData = nullptr); /** * User data reference passed to Process(). + * * Only valid during Process(). */ void* UserData() const { return UserData_; } @@ -184,14 +191,14 @@ public: // -- Job processing interface /** - * @brief Clears the job queue and aborts all worker threads. + * Clears the job queue and aborts all worker threads. * * This method is thread safe and can be called from inside a job. */ void Abort(); /** - * @brief Push job to the queue. + * Push job to the queue. * * This method is thread safe and can be called from inside a job or before * Process(). @@ -199,7 +206,7 @@ public: bool PushJob(JobHandleT&& jobHandle); /** - * @brief Push job to the queue + * Push job to the queue * * This method is thread safe and can be called from inside a job or before * Process(). @@ -212,7 +219,7 @@ public: private: void* UserData_ = nullptr; - unsigned int ThreadCount_ = 0; + unsigned int ThreadCount_ = 1; std::unique_ptr<cmWorkerPoolInternal> Int_; }; |