diff options
Diffstat (limited to 'Source/cmWorkerPool.h')
-rw-r--r-- | Source/cmWorkerPool.h | 219 |
1 files changed, 219 insertions, 0 deletions
diff --git a/Source/cmWorkerPool.h b/Source/cmWorkerPool.h new file mode 100644 index 0000000..71c7d84 --- /dev/null +++ b/Source/cmWorkerPool.h @@ -0,0 +1,219 @@ +/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying + file Copyright.txt or https://cmake.org/licensing for details. */ +#ifndef cmWorkerPool_h +#define cmWorkerPool_h + +#include "cmConfigure.h" // IWYU pragma: keep + +#include "cmAlgorithms.h" // IWYU pragma: keep + +#include <memory> // IWYU pragma: keep +#include <stdint.h> +#include <string> +#include <utility> +#include <vector> + +// -- Types +class cmWorkerPoolInternal; + +/** @class cmWorkerPool + * @brief Thread pool with job queue + */ +class cmWorkerPool +{ +public: + /** + * Return value and output of an external process. + */ + struct ProcessResultT + { + void reset(); + bool error() const + { + return (ExitStatus != 0) || (TermSignal != 0) || !ErrorMessage.empty(); + } + + std::int64_t ExitStatus = 0; + int TermSignal = 0; + std::string StdOut; + std::string StdErr; + std::string ErrorMessage; + }; + + /** + * Abstract job class for concurrent job processing. + */ + class JobT + { + public: + JobT(JobT const&) = delete; + JobT& operator=(JobT const&) = delete; + + /** + * @brief Virtual destructor. + */ + virtual ~JobT(); + + /** + * @brief Fence job flag + * + * Fence jobs require that: + * - all jobs before in the queue have been processed + * - no jobs later in the queue will be processed before this job was + * processed + */ + bool IsFence() const { return Fence_; } + + protected: + /** + * @brief Protected default constructor + */ + JobT(bool fence = false) + : Fence_(fence) + { + } + + /** + * Abstract processing interface that must be implement in derived classes. + */ + virtual void Process() = 0; + + /** + * Get the worker pool. + * Only valid during the JobT::Process() call! + */ + cmWorkerPool* Pool() const { return Pool_; } + + /** + * Get the user data. + * Only valid during the JobT::Process() call! + */ + void* UserData() const { return Pool_->UserData(); }; + + /** + * Get the worker index. + * This is the index of the thread processing this job and is in the range + * [0..ThreadCount). + * Concurrently processing jobs will never have the same WorkerIndex(). + * Only valid during the JobT::Process() call! + */ + unsigned int WorkerIndex() const { return WorkerIndex_; } + + /** + * Run an external read only process. + * Use only during JobT::Process() call! + */ + bool RunProcess(ProcessResultT& result, + std::vector<std::string> const& command, + std::string const& workingDirectory); + + private: + //! Needs access to Work() + friend class cmWorkerPoolInternal; + //! Worker thread entry method. + void Work(cmWorkerPool* pool, unsigned int workerIndex) + { + Pool_ = pool; + WorkerIndex_ = workerIndex; + this->Process(); + } + + private: + cmWorkerPool* Pool_ = nullptr; + unsigned int WorkerIndex_ = 0; + bool Fence_ = false; + }; + + /** + * @brief Job handle type + */ + typedef std::unique_ptr<JobT> JobHandleT; + + /** + * @brief Fence job base class + */ + class JobFenceT : public JobT + { + public: + JobFenceT() + : JobT(true) + { + } + //! Does nothing + void Process() override{}; + }; + + /** + * @brief Fence job that aborts the worker pool. + * This class is useful as the last job in the job queue. + */ + class JobEndT : JobFenceT + { + public: + //! Does nothing + void Process() override { Pool()->Abort(); } + }; + +public: + // -- Methods + cmWorkerPool(); + ~cmWorkerPool(); + + /** + * @brief Blocking function that starts threads to process all Jobs in + * the queue. + * + * This method blocks until a job calls the Abort() method. + * @arg threadCount Number of threads to process jobs. + * @arg userData Common user data pointer available in all Jobs. + */ + bool Process(unsigned int threadCount, void* userData = nullptr); + + /** + * Number of worker threads passed to Process(). + * Only valid during Process(). + */ + unsigned int ThreadCount() const { return ThreadCount_; } + + /** + * User data reference passed to Process(). + * Only valid during Process(). + */ + void* UserData() const { return UserData_; } + + // -- Job processing interface + + /** + * @brief Clears the job queue and aborts all worker threads. + * + * This method is thread safe and can be called from inside a job. + */ + void Abort(); + + /** + * @brief Push job to the queue. + * + * This method is thread safe and can be called from inside a job or before + * Process(). + */ + bool PushJob(JobHandleT&& jobHandle); + + /** + * @brief Push job to the queue + * + * This method is thread safe and can be called from inside a job or before + * Process(). + */ + template <class T, typename... Args> + bool EmplaceJob(Args&&... args) + { + return PushJob(cm::make_unique<T>(std::forward<Args>(args)...)); + } + +private: + void* UserData_ = nullptr; + unsigned int ThreadCount_ = 0; + std::unique_ptr<cmWorkerPoolInternal> Int_; +}; + +#endif |