summaryrefslogtreecommitdiffstats
path: root/Source/CTest/cmUVJobServerClient.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'Source/CTest/cmUVJobServerClient.cxx')
-rw-r--r--Source/CTest/cmUVJobServerClient.cxx518
1 files changed, 518 insertions, 0 deletions
diff --git a/Source/CTest/cmUVJobServerClient.cxx b/Source/CTest/cmUVJobServerClient.cxx
new file mode 100644
index 0000000..d7d76c9
--- /dev/null
+++ b/Source/CTest/cmUVJobServerClient.cxx
@@ -0,0 +1,518 @@
+/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
+ file Copyright.txt or https://cmake.org/licensing for details. */
+#include "cmUVJobServerClient.h"
+
+#include <cassert>
+#include <utility>
+
+#ifndef _WIN32
+# include <cstdio>
+# include <string>
+# include <vector>
+
+# include <fcntl.h>
+# include <unistd.h>
+#endif
+
+#include <cm/memory>
+#include <cm/optional>
+#include <cm/string_view>
+
+#include "cmRange.h"
+#include "cmStringAlgorithms.h"
+#include "cmSystemTools.h"
+#include "cmUVHandlePtr.h"
+
+class cmUVJobServerClient::Impl
+{
+public:
+ uv_loop_t& Loop;
+
+ cm::uv_idle_ptr ImplicitToken;
+ std::function<void()> OnToken;
+ std::function<void(int)> OnDisconnect;
+
+ // The number of tokens held by this client.
+ unsigned int HeldTokens = 0;
+
+ // The number of tokens we need to receive from the job server.
+ unsigned int NeedTokens = 0;
+
+ Impl(uv_loop_t& loop);
+ virtual ~Impl();
+
+ virtual void SendToken() = 0;
+ virtual void StartReceivingTokens() = 0;
+ virtual void StopReceivingTokens() = 0;
+
+ void RequestToken();
+ void ReleaseToken();
+ void RequestExplicitToken();
+ void DecrementNeedTokens();
+ void HoldToken();
+ void RequestImplicitToken();
+ void ReleaseImplicitToken();
+ void ReceivedToken();
+ void Disconnected(int status);
+};
+
+cmUVJobServerClient::Impl::Impl(uv_loop_t& loop)
+ : Loop(loop)
+{
+ this->ImplicitToken.init(this->Loop, this);
+}
+
+cmUVJobServerClient::Impl::~Impl() = default;
+
+void cmUVJobServerClient::Impl::RequestToken()
+{
+ if (this->HeldTokens == 0 && !uv_is_active(this->ImplicitToken)) {
+ this->RequestImplicitToken();
+ } else {
+ this->RequestExplicitToken();
+ }
+}
+
+void cmUVJobServerClient::Impl::ReleaseToken()
+{
+ assert(this->HeldTokens > 0);
+ --this->HeldTokens;
+ if (this->HeldTokens == 0) {
+ // This was the token implicitly owned by our process.
+ this->ReleaseImplicitToken();
+ } else {
+ // This was a token we received from the job server. Send it back.
+ this->SendToken();
+ }
+}
+
+void cmUVJobServerClient::Impl::RequestExplicitToken()
+{
+ ++this->NeedTokens;
+ this->StartReceivingTokens();
+}
+
+void cmUVJobServerClient::Impl::DecrementNeedTokens()
+{
+ assert(this->NeedTokens > 0);
+ --this->NeedTokens;
+ if (this->NeedTokens == 0) {
+ this->StopReceivingTokens();
+ }
+}
+
+void cmUVJobServerClient::Impl::HoldToken()
+{
+ ++this->HeldTokens;
+ if (this->OnToken) {
+ this->OnToken();
+ } else {
+ this->ReleaseToken();
+ }
+}
+
+void cmUVJobServerClient::Impl::RequestImplicitToken()
+{
+ assert(this->HeldTokens == 0);
+ this->ImplicitToken.start([](uv_idle_t* handle) {
+ uv_idle_stop(handle);
+ auto* self = static_cast<Impl*>(handle->data);
+ self->HoldToken();
+ });
+}
+
+void cmUVJobServerClient::Impl::ReleaseImplicitToken()
+{
+ assert(this->HeldTokens == 0);
+ // Use the implicit token in place of receiving one from the job server.
+ if (this->NeedTokens > 0) {
+ this->DecrementNeedTokens();
+ this->RequestImplicitToken();
+ }
+}
+
+void cmUVJobServerClient::Impl::ReceivedToken()
+{
+ this->DecrementNeedTokens();
+ this->HoldToken();
+}
+
+void cmUVJobServerClient::Impl::Disconnected(int status)
+{
+ if (this->OnDisconnect) {
+ this->OnDisconnect(status);
+ }
+}
+
+//---------------------------------------------------------------------------
+// Implementation on POSIX platforms.
+// https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
+
+#ifndef _WIN32
+namespace {
+class ImplPosix : public cmUVJobServerClient::Impl
+{
+public:
+ enum class Connection
+ {
+ None,
+ FDs,
+ FIFO,
+ };
+ Connection Conn = Connection::None;
+
+ cm::uv_pipe_ptr ConnRead;
+ cm::uv_pipe_ptr ConnWrite;
+ cm::uv_pipe_ptr ConnFIFO;
+
+ std::shared_ptr<std::function<void(int)>> OnWrite;
+
+ void Connect();
+ void ConnectFDs(int rfd, int wfd);
+ void ConnectFIFO(const char* path);
+ void Disconnect(int status);
+
+ cm::uv_pipe_ptr OpenFD(int fd);
+
+ uv_stream_t* GetWriter() const;
+ uv_stream_t* GetReader() const;
+
+ static void OnAllocateCB(uv_handle_t* handle, size_t suggested_size,
+ uv_buf_t* buf);
+ static void OnReadCB(uv_stream_t* stream, ssize_t nread,
+ const uv_buf_t* buf);
+
+ void OnAllocate(size_t suggested_size, uv_buf_t* buf);
+ void OnRead(ssize_t nread, const uv_buf_t* buf);
+
+ char ReadBuf = '.';
+
+ bool ReceivingTokens = false;
+
+ bool IsConnected() const;
+
+ void SendToken() override;
+ void StartReceivingTokens() override;
+ void StopReceivingTokens() override;
+
+ ImplPosix(uv_loop_t& loop);
+ ~ImplPosix() override;
+};
+
+ImplPosix::ImplPosix(uv_loop_t& loop)
+ : Impl(loop)
+ , OnWrite(std::make_shared<std::function<void(int)>>([this](int status) {
+ if (status != 0) {
+ this->Disconnect(status);
+ }
+ }))
+{
+ this->Connect();
+}
+
+ImplPosix::~ImplPosix()
+{
+ this->Disconnect(0);
+}
+
+void ImplPosix::Connect()
+{
+ // --jobserver-auth= for gnu make versions >= 4.2
+ // --jobserver-fds= for gnu make versions < 4.2
+ // -J for bsd make
+ static const std::vector<cm::string_view> prefixes = {
+ "--jobserver-auth=", "--jobserver-fds=", "-J"
+ };
+
+ cm::optional<std::string> makeflags = cmSystemTools::GetEnvVar("MAKEFLAGS");
+ if (!makeflags) {
+ return;
+ }
+
+ // Look for the *last* occurrence of jobserver flags.
+ cm::optional<std::string> auth;
+ std::vector<std::string> args;
+ cmSystemTools::ParseUnixCommandLine(makeflags->c_str(), args);
+ for (cm::string_view arg : cmReverseRange(args)) {
+ for (cm::string_view prefix : prefixes) {
+ if (cmHasPrefix(arg, prefix)) {
+ auth = cmTrimWhitespace(arg.substr(prefix.length()));
+ break;
+ }
+ }
+ if (auth) {
+ break;
+ }
+ }
+
+ if (!auth) {
+ return;
+ }
+
+ // fifo:PATH
+ if (cmHasLiteralPrefix(*auth, "fifo:")) {
+ ConnectFIFO(auth->substr(cmStrLen("fifo:")).c_str());
+ return;
+ }
+
+ // reader,writer
+ int reader;
+ int writer;
+ if (std::sscanf(auth->c_str(), "%d,%d", &reader, &writer) == 2) {
+ ConnectFDs(reader, writer);
+ }
+}
+
+cm::uv_pipe_ptr ImplPosix::OpenFD(int fd)
+{
+ // Create a CLOEXEC duplicate so `uv_pipe_ptr` can close it
+ // without closing the original file descriptor, which our
+ // child processes might want to use too.
+ cm::uv_pipe_ptr p;
+ int fd_dup = dup(fd);
+ if (fd_dup < 0) {
+ return p;
+ }
+ if (fcntl(fd_dup, F_SETFD, FD_CLOEXEC) == -1) {
+ close(fd_dup);
+ return p;
+ }
+ p.init(this->Loop, 0, this);
+ if (uv_pipe_open(p, fd_dup) < 0) {
+ close(fd_dup);
+ }
+ return p;
+}
+
+void ImplPosix::ConnectFDs(int rfd, int wfd)
+{
+ cm::uv_pipe_ptr connRead = this->OpenFD(rfd);
+ cm::uv_pipe_ptr connWrite = this->OpenFD(wfd);
+
+ // Verify that the read end is readable and the write end is writable.
+ if (!connRead || !uv_is_readable(connRead) || //
+ !connWrite || !uv_is_writable(connWrite)) {
+ return;
+ }
+
+ this->ConnRead = std::move(connRead);
+ this->ConnWrite = std::move(connWrite);
+ this->Conn = Connection::FDs;
+}
+
+void ImplPosix::ConnectFIFO(const char* path)
+{
+ int fd = open(path, O_RDWR);
+ if (fd < 0) {
+ return;
+ }
+
+ cm::uv_pipe_ptr connFIFO;
+ connFIFO.init(this->Loop, 0, this);
+ if (uv_pipe_open(connFIFO, fd) != 0) {
+ close(fd);
+ return;
+ }
+
+ // Verify that the fifo is both readable and writable.
+ if (!connFIFO || !uv_is_readable(connFIFO) || !uv_is_writable(connFIFO)) {
+ return;
+ }
+
+ this->ConnFIFO = std::move(connFIFO);
+ this->Conn = Connection::FIFO;
+}
+
+void ImplPosix::Disconnect(int status)
+{
+ if (this->Conn == Connection::None) {
+ return;
+ }
+
+ this->StopReceivingTokens();
+
+ switch (this->Conn) {
+ case Connection::FDs:
+ this->ConnRead.reset();
+ this->ConnWrite.reset();
+ break;
+ case Connection::FIFO:
+ this->ConnFIFO.reset();
+ break;
+ default:
+ break;
+ }
+
+ this->Conn = Connection::None;
+ if (status != 0) {
+ this->Disconnected(status);
+ }
+}
+
+uv_stream_t* ImplPosix::GetWriter() const
+{
+ switch (this->Conn) {
+ case Connection::FDs:
+ return this->ConnWrite;
+ case Connection::FIFO:
+ return this->ConnFIFO;
+ default:
+ return nullptr;
+ }
+}
+
+uv_stream_t* ImplPosix::GetReader() const
+{
+ switch (this->Conn) {
+ case Connection::FDs:
+ return this->ConnRead;
+ case Connection::FIFO:
+ return this->ConnFIFO;
+ default:
+ return nullptr;
+ }
+}
+
+void ImplPosix::OnAllocateCB(uv_handle_t* handle, size_t suggested_size,
+ uv_buf_t* buf)
+{
+ auto* self = static_cast<ImplPosix*>(handle->data);
+ self->OnAllocate(suggested_size, buf);
+}
+
+void ImplPosix::OnReadCB(uv_stream_t* stream, ssize_t nread,
+ const uv_buf_t* buf)
+{
+ auto* self = static_cast<ImplPosix*>(stream->data);
+ self->OnRead(nread, buf);
+}
+
+void ImplPosix::OnAllocate(size_t /*suggested_size*/, uv_buf_t* buf)
+{
+ *buf = uv_buf_init(&this->ReadBuf, 1);
+}
+
+void ImplPosix::OnRead(ssize_t nread, const uv_buf_t* /*buf*/)
+{
+ if (nread == 0) {
+ return;
+ }
+
+ if (nread < 0) {
+ auto status = static_cast<int>(nread);
+ this->Disconnect(status);
+ return;
+ }
+
+ assert(nread == 1);
+ this->ReceivedToken();
+}
+
+bool ImplPosix::IsConnected() const
+{
+ return this->Conn != Connection::None;
+}
+
+void ImplPosix::SendToken()
+{
+ if (this->Conn == Connection::None) {
+ return;
+ }
+
+ static char token = '.';
+
+ uv_buf_t const buf = uv_buf_init(&token, sizeof(token));
+ int status = cm::uv_write(this->GetWriter(), &buf, 1, this->OnWrite);
+ if (status != 0) {
+ this->Disconnect(status);
+ }
+}
+
+void ImplPosix::StartReceivingTokens()
+{
+ if (this->Conn == Connection::None) {
+ return;
+ }
+ if (this->ReceivingTokens) {
+ return;
+ }
+
+ int status = uv_read_start(this->GetReader(), &ImplPosix::OnAllocateCB,
+ &ImplPosix::OnReadCB);
+ if (status != 0) {
+ this->Disconnect(status);
+ return;
+ }
+
+ this->ReceivingTokens = true;
+}
+
+void ImplPosix::StopReceivingTokens()
+{
+ if (this->Conn == Connection::None) {
+ return;
+ }
+ if (!this->ReceivingTokens) {
+ return;
+ }
+
+ this->ReceivingTokens = false;
+ uv_read_stop(this->GetReader());
+}
+}
+#endif
+
+//---------------------------------------------------------------------------
+// Implementation of public interface.
+
+cmUVJobServerClient::cmUVJobServerClient(std::unique_ptr<Impl> impl)
+ : Impl_(std::move(impl))
+{
+}
+
+cmUVJobServerClient::~cmUVJobServerClient() = default;
+
+cmUVJobServerClient::cmUVJobServerClient(cmUVJobServerClient&&) noexcept =
+ default;
+cmUVJobServerClient& cmUVJobServerClient::operator=(
+ cmUVJobServerClient&&) noexcept = default;
+
+void cmUVJobServerClient::RequestToken()
+{
+ this->Impl_->RequestToken();
+}
+
+void cmUVJobServerClient::ReleaseToken()
+{
+ this->Impl_->ReleaseToken();
+}
+
+int cmUVJobServerClient::GetHeldTokens() const
+{
+ return this->Impl_->HeldTokens;
+}
+
+int cmUVJobServerClient::GetNeedTokens() const
+{
+ return this->Impl_->NeedTokens;
+}
+
+cm::optional<cmUVJobServerClient> cmUVJobServerClient::Connect(
+ uv_loop_t& loop, std::function<void()> onToken,
+ std::function<void(int)> onDisconnect)
+{
+#if defined(_WIN32)
+ // FIXME: Windows job server client not yet implemented.
+ static_cast<void>(loop);
+ static_cast<void>(onToken);
+ static_cast<void>(onDisconnect);
+#else
+ auto impl = cm::make_unique<ImplPosix>(loop);
+ if (impl && impl->IsConnected()) {
+ impl->OnToken = std::move(onToken);
+ impl->OnDisconnect = std::move(onDisconnect);
+ return cmUVJobServerClient(std::move(impl));
+ }
+#endif
+ return cm::nullopt;
+}