diff options
author | Brad King <brad.king@kitware.com> | 2023-12-04 14:47:05 (GMT) |
---|---|---|
committer | Kitware Robot <kwrobot@kitware.com> | 2023-12-04 14:47:25 (GMT) |
commit | ef89ad6c5562cca1911146834b6241e7c4c54653 (patch) | |
tree | de32c80c2b477f4d54e4dd84b32d9532073115a9 | |
parent | 9cb1471ff60fe2bbd1fac4f09281aebe572b4d2f (diff) | |
parent | 80fe56c481a5e69cd5ee4fbbbd77266579f96d98 (diff) | |
download | CMake-ef89ad6c5562cca1911146834b6241e7c4c54653.zip CMake-ef89ad6c5562cca1911146834b6241e7c4c54653.tar.gz CMake-ef89ad6c5562cca1911146834b6241e7c4c54653.tar.bz2 |
Merge topic 'ctest-jobserver-client'
80fe56c481 ctest: Add support for running under a make job server on POSIX systems
5396f4a9a3 cmUVJobServerClient: Add libuv-based job server integration client
Acked-by: Kitware Robot <kwrobot@kitware.com>
Tested-by: buildbot <buildbot@kitware.com>
Merge-request: !9021
-rw-r--r-- | Help/manual/CTEST_EXAMPLE_MAKEFILE_JOB_SERVER.make | 2 | ||||
-rw-r--r-- | Help/manual/ctest.1.rst | 25 | ||||
-rw-r--r-- | Help/release/dev/ctest-jobserver-client.rst | 5 | ||||
-rw-r--r-- | Source/CMakeLists.txt | 3 | ||||
-rw-r--r-- | Source/CTest/cmCTestMultiProcessHandler.cxx | 33 | ||||
-rw-r--r-- | Source/CTest/cmCTestMultiProcessHandler.h | 10 | ||||
-rw-r--r-- | Source/CTest/cmUVJobServerClient.cxx | 518 | ||||
-rw-r--r-- | Source/CTest/cmUVJobServerClient.h | 96 | ||||
-rw-r--r-- | Tests/CMakeLib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | Tests/CMakeLib/testUVJobServerClient.cxx | 179 | ||||
-rw-r--r-- | Tests/RunCMake/Make/CTestJobServer-NoPipe-j2-stdout.txt | 9 | ||||
-rw-r--r-- | Tests/RunCMake/Make/CTestJobServer-NoTests-j2-stderr.txt | 1 | ||||
-rw-r--r-- | Tests/RunCMake/Make/CTestJobServer-NoTests-j2-stdout.txt | 3 | ||||
-rw-r--r-- | Tests/RunCMake/Make/CTestJobServer-Tests-j2-stdout.txt | 6 | ||||
-rw-r--r-- | Tests/RunCMake/Make/CTestJobServer-Tests-j3-stdout.txt | 7 | ||||
-rw-r--r-- | Tests/RunCMake/Make/CTestJobServer.cmake | 4 | ||||
-rw-r--r-- | Tests/RunCMake/Make/CTestJobServer.make | 11 | ||||
-rw-r--r-- | Tests/RunCMake/Make/RunCMakeTest.cmake | 20 |
18 files changed, 933 insertions, 0 deletions
diff --git a/Help/manual/CTEST_EXAMPLE_MAKEFILE_JOB_SERVER.make b/Help/manual/CTEST_EXAMPLE_MAKEFILE_JOB_SERVER.make new file mode 100644 index 0000000..a17673a --- /dev/null +++ b/Help/manual/CTEST_EXAMPLE_MAKEFILE_JOB_SERVER.make @@ -0,0 +1,2 @@ +test: + +ctest -j 8 diff --git a/Help/manual/ctest.1.rst b/Help/manual/ctest.1.rst index 9f7c72e..b519ccf 100644 --- a/Help/manual/ctest.1.rst +++ b/Help/manual/ctest.1.rst @@ -1841,6 +1841,31 @@ fixture in their :prop_test:`FIXTURES_REQUIRED`, and a resource spec file may not be specified with the ``--resource-spec-file`` argument or the :variable:`CTEST_RESOURCE_SPEC_FILE` variable. +.. _`ctest-job-server-integration`: + +Job Server Integration +====================== + +.. versionadded:: 3.29 + +On POSIX systems, when running under the context of a `Job Server`_, +CTest shares its job slots. This is independent of the :prop_test:`PROCESSORS` +test property, which still counts against CTest's :option:`-j <ctest -j>` +parallel level. CTest acquires exactly one token from the job server before +running each test, and returns it when the test finishes. + +For example, consider the ``Makefile``: + +.. literalinclude:: CTEST_EXAMPLE_MAKEFILE_JOB_SERVER.make + :language: make + +When invoked via ``make -j 2 test``, ``ctest`` connects to the job server, +acquires a token for each test, and runs at most 2 tests concurrently. + +On Windows systems, job server integration is not yet implemented. + +.. _`Job Server`: https://www.gnu.org/software/make/manual/html_node/Job-Slots.html + See Also ======== diff --git a/Help/release/dev/ctest-jobserver-client.rst b/Help/release/dev/ctest-jobserver-client.rst new file mode 100644 index 0000000..37e22c0 --- /dev/null +++ b/Help/release/dev/ctest-jobserver-client.rst @@ -0,0 +1,5 @@ +ctest-jobserver-client +---------------------- + +* :manual:`ctest(1)` now supports :ref:`job server integration + <ctest-job-server-integration>` on POSIX systems. diff --git a/Source/CMakeLists.txt b/Source/CMakeLists.txt index eafb3b8..8c57762 100644 --- a/Source/CMakeLists.txt +++ b/Source/CMakeLists.txt @@ -1091,6 +1091,9 @@ add_library( CTest/cmCTestP4.cxx CTest/cmCTestP4.h + CTest/cmUVJobServerClient.cxx + CTest/cmUVJobServerClient.h + LexerParser/cmCTestResourceGroupsLexer.cxx LexerParser/cmCTestResourceGroupsLexer.h LexerParser/cmCTestResourceGroupsLexer.in.l diff --git a/Source/CTest/cmCTestMultiProcessHandler.cxx b/Source/CTest/cmCTestMultiProcessHandler.cxx index be210f4..7b72f30 100644 --- a/Source/CTest/cmCTestMultiProcessHandler.cxx +++ b/Source/CTest/cmCTestMultiProcessHandler.cxx @@ -40,6 +40,7 @@ #include "cmRange.h" #include "cmStringAlgorithms.h" #include "cmSystemTools.h" +#include "cmUVJobServerClient.h" #include "cmWorkingDirectory.h" namespace cmsys { @@ -130,10 +131,19 @@ void cmCTestMultiProcessHandler::InitializeLoop() this->Loop.init(); this->StartNextTestsOnIdle_.init(*this->Loop, this); this->StartNextTestsOnTimer_.init(*this->Loop, this); + + this->JobServerClient = cmUVJobServerClient::Connect( + *this->Loop, /*onToken=*/[this]() { this->JobServerReceivedToken(); }, + /*onDisconnect=*/nullptr); + if (this->JobServerClient) { + cmCTestLog(this->CTest, OUTPUT, + "Connected to MAKE jobserver" << std::endl); + } } void cmCTestMultiProcessHandler::FinalizeLoop() { + this->JobServerClient.reset(); this->StartNextTestsOnTimer_.reset(); this->StartNextTestsOnIdle_.reset(); this->Loop.reset(); @@ -461,6 +471,26 @@ std::string cmCTestMultiProcessHandler::GetName(int test) void cmCTestMultiProcessHandler::StartTest(int test) { + if (this->JobServerClient) { + // There is a job server. Request a token and queue the test to run + // when a token is received. Note that if we do not get a token right + // away it's possible that the system load will be higher when the + // token is received and we may violate the test-load limit. However, + // this is unlikely because if we do not get a token right away, some + // other job that's currently running must finish before we get one. + this->JobServerClient->RequestToken(); + this->JobServerQueuedTests.emplace_back(test); + } else { + // There is no job server. Start the test now. + this->StartTestProcess(test); + } +} + +void cmCTestMultiProcessHandler::JobServerReceivedToken() +{ + assert(!this->JobServerQueuedTests.empty()); + int test = this->JobServerQueuedTests.front(); + this->JobServerQueuedTests.pop_front(); this->StartTestProcess(test); } @@ -692,6 +722,9 @@ void cmCTestMultiProcessHandler::FinishTestProcess( runner.reset(); + if (this->JobServerClient) { + this->JobServerClient->ReleaseToken(); + } this->StartNextTestsOnIdle(); } diff --git a/Source/CTest/cmCTestMultiProcessHandler.h b/Source/CTest/cmCTestMultiProcessHandler.h index 1be04aa..02589ca 100644 --- a/Source/CTest/cmCTestMultiProcessHandler.h +++ b/Source/CTest/cmCTestMultiProcessHandler.h @@ -19,6 +19,7 @@ #include "cmCTestResourceSpec.h" #include "cmCTestTestHandler.h" #include "cmUVHandlePtr.h" +#include "cmUVJobServerClient.h" struct cmCTestBinPackerAllocation; class cmCTestRunTest; @@ -204,6 +205,15 @@ protected: cmCTestResourceAllocator ResourceAllocator; std::vector<cmCTestTestHandler::cmCTestTestResult>* TestResults; size_t ParallelLevel; // max number of process that can be run at once + + // 'make' jobserver client. If connected, we acquire a token + // for each test before running its process. + cm::optional<cmUVJobServerClient> JobServerClient; + // List of tests that are queued to run when a token is available. + std::list<int> JobServerQueuedTests; + // Callback invoked when a token is received. + void JobServerReceivedToken(); + unsigned long TestLoad; unsigned long FakeLoadForTesting; cm::uv_loop_ptr Loop; diff --git a/Source/CTest/cmUVJobServerClient.cxx b/Source/CTest/cmUVJobServerClient.cxx new file mode 100644 index 0000000..d7d76c9 --- /dev/null +++ b/Source/CTest/cmUVJobServerClient.cxx @@ -0,0 +1,518 @@ +/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying + file Copyright.txt or https://cmake.org/licensing for details. */ +#include "cmUVJobServerClient.h" + +#include <cassert> +#include <utility> + +#ifndef _WIN32 +# include <cstdio> +# include <string> +# include <vector> + +# include <fcntl.h> +# include <unistd.h> +#endif + +#include <cm/memory> +#include <cm/optional> +#include <cm/string_view> + +#include "cmRange.h" +#include "cmStringAlgorithms.h" +#include "cmSystemTools.h" +#include "cmUVHandlePtr.h" + +class cmUVJobServerClient::Impl +{ +public: + uv_loop_t& Loop; + + cm::uv_idle_ptr ImplicitToken; + std::function<void()> OnToken; + std::function<void(int)> OnDisconnect; + + // The number of tokens held by this client. + unsigned int HeldTokens = 0; + + // The number of tokens we need to receive from the job server. + unsigned int NeedTokens = 0; + + Impl(uv_loop_t& loop); + virtual ~Impl(); + + virtual void SendToken() = 0; + virtual void StartReceivingTokens() = 0; + virtual void StopReceivingTokens() = 0; + + void RequestToken(); + void ReleaseToken(); + void RequestExplicitToken(); + void DecrementNeedTokens(); + void HoldToken(); + void RequestImplicitToken(); + void ReleaseImplicitToken(); + void ReceivedToken(); + void Disconnected(int status); +}; + +cmUVJobServerClient::Impl::Impl(uv_loop_t& loop) + : Loop(loop) +{ + this->ImplicitToken.init(this->Loop, this); +} + +cmUVJobServerClient::Impl::~Impl() = default; + +void cmUVJobServerClient::Impl::RequestToken() +{ + if (this->HeldTokens == 0 && !uv_is_active(this->ImplicitToken)) { + this->RequestImplicitToken(); + } else { + this->RequestExplicitToken(); + } +} + +void cmUVJobServerClient::Impl::ReleaseToken() +{ + assert(this->HeldTokens > 0); + --this->HeldTokens; + if (this->HeldTokens == 0) { + // This was the token implicitly owned by our process. + this->ReleaseImplicitToken(); + } else { + // This was a token we received from the job server. Send it back. + this->SendToken(); + } +} + +void cmUVJobServerClient::Impl::RequestExplicitToken() +{ + ++this->NeedTokens; + this->StartReceivingTokens(); +} + +void cmUVJobServerClient::Impl::DecrementNeedTokens() +{ + assert(this->NeedTokens > 0); + --this->NeedTokens; + if (this->NeedTokens == 0) { + this->StopReceivingTokens(); + } +} + +void cmUVJobServerClient::Impl::HoldToken() +{ + ++this->HeldTokens; + if (this->OnToken) { + this->OnToken(); + } else { + this->ReleaseToken(); + } +} + +void cmUVJobServerClient::Impl::RequestImplicitToken() +{ + assert(this->HeldTokens == 0); + this->ImplicitToken.start([](uv_idle_t* handle) { + uv_idle_stop(handle); + auto* self = static_cast<Impl*>(handle->data); + self->HoldToken(); + }); +} + +void cmUVJobServerClient::Impl::ReleaseImplicitToken() +{ + assert(this->HeldTokens == 0); + // Use the implicit token in place of receiving one from the job server. + if (this->NeedTokens > 0) { + this->DecrementNeedTokens(); + this->RequestImplicitToken(); + } +} + +void cmUVJobServerClient::Impl::ReceivedToken() +{ + this->DecrementNeedTokens(); + this->HoldToken(); +} + +void cmUVJobServerClient::Impl::Disconnected(int status) +{ + if (this->OnDisconnect) { + this->OnDisconnect(status); + } +} + +//--------------------------------------------------------------------------- +// Implementation on POSIX platforms. +// https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html + +#ifndef _WIN32 +namespace { +class ImplPosix : public cmUVJobServerClient::Impl +{ +public: + enum class Connection + { + None, + FDs, + FIFO, + }; + Connection Conn = Connection::None; + + cm::uv_pipe_ptr ConnRead; + cm::uv_pipe_ptr ConnWrite; + cm::uv_pipe_ptr ConnFIFO; + + std::shared_ptr<std::function<void(int)>> OnWrite; + + void Connect(); + void ConnectFDs(int rfd, int wfd); + void ConnectFIFO(const char* path); + void Disconnect(int status); + + cm::uv_pipe_ptr OpenFD(int fd); + + uv_stream_t* GetWriter() const; + uv_stream_t* GetReader() const; + + static void OnAllocateCB(uv_handle_t* handle, size_t suggested_size, + uv_buf_t* buf); + static void OnReadCB(uv_stream_t* stream, ssize_t nread, + const uv_buf_t* buf); + + void OnAllocate(size_t suggested_size, uv_buf_t* buf); + void OnRead(ssize_t nread, const uv_buf_t* buf); + + char ReadBuf = '.'; + + bool ReceivingTokens = false; + + bool IsConnected() const; + + void SendToken() override; + void StartReceivingTokens() override; + void StopReceivingTokens() override; + + ImplPosix(uv_loop_t& loop); + ~ImplPosix() override; +}; + +ImplPosix::ImplPosix(uv_loop_t& loop) + : Impl(loop) + , OnWrite(std::make_shared<std::function<void(int)>>([this](int status) { + if (status != 0) { + this->Disconnect(status); + } + })) +{ + this->Connect(); +} + +ImplPosix::~ImplPosix() +{ + this->Disconnect(0); +} + +void ImplPosix::Connect() +{ + // --jobserver-auth= for gnu make versions >= 4.2 + // --jobserver-fds= for gnu make versions < 4.2 + // -J for bsd make + static const std::vector<cm::string_view> prefixes = { + "--jobserver-auth=", "--jobserver-fds=", "-J" + }; + + cm::optional<std::string> makeflags = cmSystemTools::GetEnvVar("MAKEFLAGS"); + if (!makeflags) { + return; + } + + // Look for the *last* occurrence of jobserver flags. + cm::optional<std::string> auth; + std::vector<std::string> args; + cmSystemTools::ParseUnixCommandLine(makeflags->c_str(), args); + for (cm::string_view arg : cmReverseRange(args)) { + for (cm::string_view prefix : prefixes) { + if (cmHasPrefix(arg, prefix)) { + auth = cmTrimWhitespace(arg.substr(prefix.length())); + break; + } + } + if (auth) { + break; + } + } + + if (!auth) { + return; + } + + // fifo:PATH + if (cmHasLiteralPrefix(*auth, "fifo:")) { + ConnectFIFO(auth->substr(cmStrLen("fifo:")).c_str()); + return; + } + + // reader,writer + int reader; + int writer; + if (std::sscanf(auth->c_str(), "%d,%d", &reader, &writer) == 2) { + ConnectFDs(reader, writer); + } +} + +cm::uv_pipe_ptr ImplPosix::OpenFD(int fd) +{ + // Create a CLOEXEC duplicate so `uv_pipe_ptr` can close it + // without closing the original file descriptor, which our + // child processes might want to use too. + cm::uv_pipe_ptr p; + int fd_dup = dup(fd); + if (fd_dup < 0) { + return p; + } + if (fcntl(fd_dup, F_SETFD, FD_CLOEXEC) == -1) { + close(fd_dup); + return p; + } + p.init(this->Loop, 0, this); + if (uv_pipe_open(p, fd_dup) < 0) { + close(fd_dup); + } + return p; +} + +void ImplPosix::ConnectFDs(int rfd, int wfd) +{ + cm::uv_pipe_ptr connRead = this->OpenFD(rfd); + cm::uv_pipe_ptr connWrite = this->OpenFD(wfd); + + // Verify that the read end is readable and the write end is writable. + if (!connRead || !uv_is_readable(connRead) || // + !connWrite || !uv_is_writable(connWrite)) { + return; + } + + this->ConnRead = std::move(connRead); + this->ConnWrite = std::move(connWrite); + this->Conn = Connection::FDs; +} + +void ImplPosix::ConnectFIFO(const char* path) +{ + int fd = open(path, O_RDWR); + if (fd < 0) { + return; + } + + cm::uv_pipe_ptr connFIFO; + connFIFO.init(this->Loop, 0, this); + if (uv_pipe_open(connFIFO, fd) != 0) { + close(fd); + return; + } + + // Verify that the fifo is both readable and writable. + if (!connFIFO || !uv_is_readable(connFIFO) || !uv_is_writable(connFIFO)) { + return; + } + + this->ConnFIFO = std::move(connFIFO); + this->Conn = Connection::FIFO; +} + +void ImplPosix::Disconnect(int status) +{ + if (this->Conn == Connection::None) { + return; + } + + this->StopReceivingTokens(); + + switch (this->Conn) { + case Connection::FDs: + this->ConnRead.reset(); + this->ConnWrite.reset(); + break; + case Connection::FIFO: + this->ConnFIFO.reset(); + break; + default: + break; + } + + this->Conn = Connection::None; + if (status != 0) { + this->Disconnected(status); + } +} + +uv_stream_t* ImplPosix::GetWriter() const +{ + switch (this->Conn) { + case Connection::FDs: + return this->ConnWrite; + case Connection::FIFO: + return this->ConnFIFO; + default: + return nullptr; + } +} + +uv_stream_t* ImplPosix::GetReader() const +{ + switch (this->Conn) { + case Connection::FDs: + return this->ConnRead; + case Connection::FIFO: + return this->ConnFIFO; + default: + return nullptr; + } +} + +void ImplPosix::OnAllocateCB(uv_handle_t* handle, size_t suggested_size, + uv_buf_t* buf) +{ + auto* self = static_cast<ImplPosix*>(handle->data); + self->OnAllocate(suggested_size, buf); +} + +void ImplPosix::OnReadCB(uv_stream_t* stream, ssize_t nread, + const uv_buf_t* buf) +{ + auto* self = static_cast<ImplPosix*>(stream->data); + self->OnRead(nread, buf); +} + +void ImplPosix::OnAllocate(size_t /*suggested_size*/, uv_buf_t* buf) +{ + *buf = uv_buf_init(&this->ReadBuf, 1); +} + +void ImplPosix::OnRead(ssize_t nread, const uv_buf_t* /*buf*/) +{ + if (nread == 0) { + return; + } + + if (nread < 0) { + auto status = static_cast<int>(nread); + this->Disconnect(status); + return; + } + + assert(nread == 1); + this->ReceivedToken(); +} + +bool ImplPosix::IsConnected() const +{ + return this->Conn != Connection::None; +} + +void ImplPosix::SendToken() +{ + if (this->Conn == Connection::None) { + return; + } + + static char token = '.'; + + uv_buf_t const buf = uv_buf_init(&token, sizeof(token)); + int status = cm::uv_write(this->GetWriter(), &buf, 1, this->OnWrite); + if (status != 0) { + this->Disconnect(status); + } +} + +void ImplPosix::StartReceivingTokens() +{ + if (this->Conn == Connection::None) { + return; + } + if (this->ReceivingTokens) { + return; + } + + int status = uv_read_start(this->GetReader(), &ImplPosix::OnAllocateCB, + &ImplPosix::OnReadCB); + if (status != 0) { + this->Disconnect(status); + return; + } + + this->ReceivingTokens = true; +} + +void ImplPosix::StopReceivingTokens() +{ + if (this->Conn == Connection::None) { + return; + } + if (!this->ReceivingTokens) { + return; + } + + this->ReceivingTokens = false; + uv_read_stop(this->GetReader()); +} +} +#endif + +//--------------------------------------------------------------------------- +// Implementation of public interface. + +cmUVJobServerClient::cmUVJobServerClient(std::unique_ptr<Impl> impl) + : Impl_(std::move(impl)) +{ +} + +cmUVJobServerClient::~cmUVJobServerClient() = default; + +cmUVJobServerClient::cmUVJobServerClient(cmUVJobServerClient&&) noexcept = + default; +cmUVJobServerClient& cmUVJobServerClient::operator=( + cmUVJobServerClient&&) noexcept = default; + +void cmUVJobServerClient::RequestToken() +{ + this->Impl_->RequestToken(); +} + +void cmUVJobServerClient::ReleaseToken() +{ + this->Impl_->ReleaseToken(); +} + +int cmUVJobServerClient::GetHeldTokens() const +{ + return this->Impl_->HeldTokens; +} + +int cmUVJobServerClient::GetNeedTokens() const +{ + return this->Impl_->NeedTokens; +} + +cm::optional<cmUVJobServerClient> cmUVJobServerClient::Connect( + uv_loop_t& loop, std::function<void()> onToken, + std::function<void(int)> onDisconnect) +{ +#if defined(_WIN32) + // FIXME: Windows job server client not yet implemented. + static_cast<void>(loop); + static_cast<void>(onToken); + static_cast<void>(onDisconnect); +#else + auto impl = cm::make_unique<ImplPosix>(loop); + if (impl && impl->IsConnected()) { + impl->OnToken = std::move(onToken); + impl->OnDisconnect = std::move(onDisconnect); + return cmUVJobServerClient(std::move(impl)); + } +#endif + return cm::nullopt; +} diff --git a/Source/CTest/cmUVJobServerClient.h b/Source/CTest/cmUVJobServerClient.h new file mode 100644 index 0000000..bbb5f08 --- /dev/null +++ b/Source/CTest/cmUVJobServerClient.h @@ -0,0 +1,96 @@ +/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying + file Copyright.txt or https://cmake.org/licensing for details. */ +#pragma once + +#include "cmConfigure.h" // IWYU pragma: keep + +#include <functional> +#include <memory> + +#include <cm/optional> + +#include <cm3p/uv.h> + +/** \class cmUVJobServerClient + * \brief Job server client that can integrate with a libuv event loop. + * + * Use the \a Connect method to connect to an ambient job server as + * described by the MAKEFLAGS environment variable, if any. Request + * a token using the \a RequestToken method. The \a onToken callback + * will be invoked asynchronously when the token is received. Act + * on the token, and then use \a ReleaseToken to release it. + * + * The job server protocol states that a client process implicitly + * has one free token available, corresponding to the token its + * parent used to start it. \a cmUVJobServerClient will use the + * implicit token whenever it is available instead of requesting + * an explicit token from the job server. However, clients of + * this class must still request and receive the token before + * acting on it, and cannot assume that it is always held. + * + * If the job server connection breaks, \a onDisconnect will be + * called with the libuv error. No further tokens can be received + * from the job server, but progress can still be made serially + * using the implicit token. + */ +class cmUVJobServerClient +{ +public: + class Impl; + +private: + std::unique_ptr<Impl> Impl_; + + cmUVJobServerClient(std::unique_ptr<Impl> impl); + +public: + /** + * Disconnect from the job server. + */ + ~cmUVJobServerClient(); + + cmUVJobServerClient(cmUVJobServerClient&&) noexcept; + cmUVJobServerClient(cmUVJobServerClient const&) = delete; + cmUVJobServerClient& operator=(cmUVJobServerClient&&) noexcept; + cmUVJobServerClient& operator=(cmUVJobServerClient const&) = delete; + + /** + * Request a token from the job server. + * When the token is held, the \a onToken callback will be invoked. + */ + void RequestToken(); + + /** + * Release a token to the job server. + * This may be called only after a corresponding \a onToken callback. + */ + void ReleaseToken(); + + /** + * Get the number of implicit and explicit tokens currently held. + * This is the number of times \a onToken has been called but not + * yet followed by a call to \a ReleaseToken. + * This is meant for testing and debugging. + */ + int GetHeldTokens() const; + + /** + * Get the number of explicit tokens currently requested from the + * job server but not yet received. If the implicit token becomes + * available, it is used in place of a requested token, and this + * is decremented without receiving an explicit token. + * This is meant for testing and debugging. + */ + int GetNeedTokens() const; + + /** + * Connect to an ambient job server, if any. + * \param loop The libuv event loop on which to schedule events. + * \param onToken Function to call when a new token is held. + * \param onDisconnect Function to call on disconnect, with libuv error. + * \returns Connected instance, or cm::nullopt. + */ + static cm::optional<cmUVJobServerClient> Connect( + uv_loop_t& loop, std::function<void()> onToken, + std::function<void(int)> onDisconnect); +}; diff --git a/Tests/CMakeLib/CMakeLists.txt b/Tests/CMakeLib/CMakeLists.txt index 979d449..2c1d22b 100644 --- a/Tests/CMakeLib/CMakeLists.txt +++ b/Tests/CMakeLib/CMakeLists.txt @@ -26,6 +26,7 @@ set(CMakeLib_TESTS testXMLSafe.cxx testFindPackageCommand.cxx testUVHandlePtr.cxx + testUVJobServerClient.cxx testUVProcessChain.cxx testUVRAII.cxx testUVStreambuf.cxx diff --git a/Tests/CMakeLib/testUVJobServerClient.cxx b/Tests/CMakeLib/testUVJobServerClient.cxx new file mode 100644 index 0000000..13f0f40 --- /dev/null +++ b/Tests/CMakeLib/testUVJobServerClient.cxx @@ -0,0 +1,179 @@ +#include <cassert> +#include <cstddef> +#include <deque> +#include <iostream> +#include <vector> + +#include <cm/optional> + +#include <cm3p/uv.h> + +#ifndef _WIN32 +# include <unistd.h> +#endif + +#include "cmGetPipes.h" +#include "cmStringAlgorithms.h" +#include "cmSystemTools.h" +#include "cmUVHandlePtr.h" +#include "cmUVJobServerClient.h" + +namespace { + +const std::size_t kTOTAL_JOBS = 10; +const std::size_t kTOTAL_TOKENS = 3; + +struct Job +{ + cm::uv_timer_ptr Timer; +}; + +struct JobRunner +{ + cm::uv_loop_ptr Loop; + cm::optional<cmUVJobServerClient> JSC; + std::vector<Job> Jobs; + std::size_t NextJobIndex = 0; + + std::size_t ActiveJobs = 0; + + std::deque<std::size_t> Queue; + + bool Okay = true; + + JobRunner() + : Jobs(kTOTAL_JOBS) + { + this->Loop.init(nullptr); + this->JSC = cmUVJobServerClient::Connect( + *this->Loop, [this]() { this->StartQueuedJob(); }, nullptr); + if (!this->JSC) { + std::cerr << "Failed to connect to job server.\n"; + this->Okay = false; + } + } + + ~JobRunner() {} + + bool Run() + { + if (this->Okay) { + this->QueueNextJobs(); + uv_run(this->Loop, UV_RUN_DEFAULT); + std::cerr << "HeldTokens: " << this->JSC->GetHeldTokens() << '\n'; + std::cerr << "NeedTokens: " << this->JSC->GetNeedTokens() << '\n'; + } +#ifdef _WIN32 + // FIXME: Windows job server client not yet implemented. + return true; +#else + return this->Okay; +#endif + } + + void QueueNextJobs() + { + std::cerr << "QueueNextJobs()\n"; + std::size_t queued = 0; + while (queued < 2 && this->NextJobIndex < this->Jobs.size()) { + this->QueueJob(this->NextJobIndex); + ++this->NextJobIndex; + ++queued; + } + std::cerr << "QueueNextJobs done\n"; + } + + void StartQueuedJob() + { + std::cerr << "StartQueuedJob()\n"; + assert(!this->Queue.empty()); + + std::size_t index = this->Queue.front(); + this->Queue.pop_front(); + this->StartJob(index); + + std::cerr << "StartQueuedJob done\n"; + } + + void StartJob(std::size_t index) + { + cm::uv_timer_ptr& job = this->Jobs[index].Timer; + job.init(*this->Loop, this); + uv_timer_start( + job, + [](uv_timer_t* handle) { + uv_timer_stop(handle); + auto self = static_cast<JobRunner*>(handle->data); + self->FinishJob(); + }, + /*timeout_ms=*/10 * (1 + (index % 3)), /*repeat_ms=*/0); + ++this->ActiveJobs; + std::cerr << " StartJob(" << index + << "): Active jobs: " << this->ActiveJobs << '\n'; + + if (this->ActiveJobs > kTOTAL_TOKENS) { + std::cerr << "Started more than " << kTOTAL_TOKENS << " jobs at once!\n"; + this->Okay = false; + return; + } + } + + void QueueJob(std::size_t index) + { + this->JSC->RequestToken(); + this->Queue.push_back(index); + std::cerr << " QueueJob(" << index + << "): Queue length: " << this->Queue.size() << '\n'; + } + + void FinishJob() + { + --this->ActiveJobs; + std::cerr << "FinishJob: Active jobs: " << this->ActiveJobs << '\n'; + + this->JSC->ReleaseToken(); + this->QueueNextJobs(); + } +}; + +bool testJobServer() +{ +#ifdef _WIN32 + // FIXME: Windows job server client not yet implemented. +#else + // Create a job server pipe. + int jobServerPipe[2]; + if (cmGetPipes(jobServerPipe) < 0) { + std::cerr << "Failed to create job server pipe\n"; + return false; + } + + // Write N-1 tokens to the pipe. + std::vector<char> jobServerInit(kTOTAL_TOKENS - 1, '.'); + if (write(jobServerPipe[1], jobServerInit.data(), jobServerInit.size()) != + kTOTAL_TOKENS - 1) { + std::cerr << "Failed to initialize job server pipe\n"; + return false; + } + + // Establish the job server client context. + // Add a bogus server spec to verify we use the last spec. + cmSystemTools::PutEnv(cmStrCat("MAKEFLAGS=--flags-before" + " --jobserver-auth=bogus" + " --flags-between" + " --jobserver-fds=", + jobServerPipe[0], ',', jobServerPipe[1], + " --flags-after")); +#endif + + JobRunner jobRunner; + return jobRunner.Run(); +} +} + +int testUVJobServerClient(int, char** const) +{ + bool passed = true; + passed = testJobServer() && passed; + return passed ? 0 : -1; +} diff --git a/Tests/RunCMake/Make/CTestJobServer-NoPipe-j2-stdout.txt b/Tests/RunCMake/Make/CTestJobServer-NoPipe-j2-stdout.txt new file mode 100644 index 0000000..579c722 --- /dev/null +++ b/Tests/RunCMake/Make/CTestJobServer-NoPipe-j2-stdout.txt @@ -0,0 +1,9 @@ +Test project [^ +]*/Tests/RunCMake/Make/CTestJobServer-build + Start [0-9]+: test[0-9]+ + Start [0-9]+: test[0-9]+ + Start [0-9]+: test[0-9]+ + Start [0-9]+: test[0-9]+ + Start [0-9]+: test[0-9]+ + Start [0-9]+: test[0-9]+ +1/6 Test #[0-9]+: test[0-9]+ ............................ Passed +[0-9.]+ sec diff --git a/Tests/RunCMake/Make/CTestJobServer-NoTests-j2-stderr.txt b/Tests/RunCMake/Make/CTestJobServer-NoTests-j2-stderr.txt new file mode 100644 index 0000000..eafba1c --- /dev/null +++ b/Tests/RunCMake/Make/CTestJobServer-NoTests-j2-stderr.txt @@ -0,0 +1 @@ +No tests were found!!! diff --git a/Tests/RunCMake/Make/CTestJobServer-NoTests-j2-stdout.txt b/Tests/RunCMake/Make/CTestJobServer-NoTests-j2-stdout.txt new file mode 100644 index 0000000..0547dc7 --- /dev/null +++ b/Tests/RunCMake/Make/CTestJobServer-NoTests-j2-stdout.txt @@ -0,0 +1,3 @@ +Test project [^ +]*/Tests/RunCMake/Make/CTestJobServer-build +Connected to MAKE jobserver diff --git a/Tests/RunCMake/Make/CTestJobServer-Tests-j2-stdout.txt b/Tests/RunCMake/Make/CTestJobServer-Tests-j2-stdout.txt new file mode 100644 index 0000000..a700999 --- /dev/null +++ b/Tests/RunCMake/Make/CTestJobServer-Tests-j2-stdout.txt @@ -0,0 +1,6 @@ +Test project [^ +]*/Tests/RunCMake/Make/CTestJobServer-build +Connected to MAKE jobserver + Start [0-9]+: test[0-9]+ + Start [0-9]+: test[0-9]+ +1/6 Test #[0-9]+: test[0-9]+ ............................ Passed +[0-9.]+ sec diff --git a/Tests/RunCMake/Make/CTestJobServer-Tests-j3-stdout.txt b/Tests/RunCMake/Make/CTestJobServer-Tests-j3-stdout.txt new file mode 100644 index 0000000..5a76bdc --- /dev/null +++ b/Tests/RunCMake/Make/CTestJobServer-Tests-j3-stdout.txt @@ -0,0 +1,7 @@ +Test project [^ +]*/Tests/RunCMake/Make/CTestJobServer-build +Connected to MAKE jobserver + Start [0-9]+: test[0-9]+ + Start [0-9]+: test[0-9]+ + Start [0-9]+: test[0-9]+ +1/6 Test #[0-9]+: test[0-9]+ ............................ Passed +[0-9.]+ sec diff --git a/Tests/RunCMake/Make/CTestJobServer.cmake b/Tests/RunCMake/Make/CTestJobServer.cmake new file mode 100644 index 0000000..2ca3d54 --- /dev/null +++ b/Tests/RunCMake/Make/CTestJobServer.cmake @@ -0,0 +1,4 @@ +enable_testing() +foreach(i RANGE 1 6) + add_test(NAME test${i} COMMAND ${CMAKE_COMMAND} -E true) +endforeach() diff --git a/Tests/RunCMake/Make/CTestJobServer.make b/Tests/RunCMake/Make/CTestJobServer.make new file mode 100644 index 0000000..7fc5e28 --- /dev/null +++ b/Tests/RunCMake/Make/CTestJobServer.make @@ -0,0 +1,11 @@ +NoPipe: + env MAKEFLAGS= $(CMAKE_CTEST_COMMAND) -j6 +.PHONY: NoPipe + +NoTests: + +$(CMAKE_CTEST_COMMAND) -j6 -R NoTests +.PHONY: NoTests + +Tests: + +$(CMAKE_CTEST_COMMAND) -j6 +.PHONY: Tests diff --git a/Tests/RunCMake/Make/RunCMakeTest.cmake b/Tests/RunCMake/Make/RunCMakeTest.cmake index 5d1ba48..cfaf759 100644 --- a/Tests/RunCMake/Make/RunCMakeTest.cmake +++ b/Tests/RunCMake/Make/RunCMakeTest.cmake @@ -79,9 +79,29 @@ function(detect_jobserver_present) run_cmake_command(DetectJobServer-present-parallel-build ${CMAKE_COMMAND} --build . -j4) endfunction() +function(run_make_rule case rule job_count) + run_cmake_command(${case}-${rule}-j${job_count} + ${RunCMake_MAKE_PROGRAM} -f "${RunCMake_SOURCE_DIR}/${case}.make" ${rule} -j${job_count} + CMAKE_COMMAND="${CMAKE_COMMAND}" CMAKE_CTEST_COMMAND="${CMAKE_CTEST_COMMAND}" + ) +endfunction() + +function(run_CTestJobServer) + set(RunCMake_TEST_BINARY_DIR ${RunCMake_BINARY_DIR}/CTestJobServer-build) + run_cmake(CTestJobServer) + set(RunCMake_TEST_NO_CLEAN 1) + run_make_rule(CTestJobServer NoPipe 2) + run_make_rule(CTestJobServer NoTests 2) + run_make_rule(CTestJobServer Tests 2) + run_make_rule(CTestJobServer Tests 3) +endfunction() + # Jobservers are currently only supported by GNU makes, except MSYS2 make if(MAKE_IS_GNU AND NOT RunCMake_GENERATOR MATCHES "MSYS Makefiles") detect_jobserver_present() + if(UNIX) + run_CTestJobServer() + endif() endif() if(MAKE_IS_GNU) |