/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying file Copyright.txt or https://cmake.org/licensing for details. */ #include "cmUVJobServerClient.h" #include #include #ifndef _WIN32 # include # include # include # include # include #endif #include #include #include #include "cmRange.h" #include "cmStringAlgorithms.h" #include "cmSystemTools.h" #include "cmUVHandlePtr.h" class cmUVJobServerClient::Impl { public: uv_loop_t& Loop; cm::uv_idle_ptr ImplicitToken; std::function OnToken; std::function OnDisconnect; // The number of tokens held by this client. unsigned int HeldTokens = 0; // The number of tokens we need to receive from the job server. unsigned int NeedTokens = 0; Impl(uv_loop_t& loop); virtual ~Impl(); virtual void SendToken() = 0; virtual void StartReceivingTokens() = 0; virtual void StopReceivingTokens() = 0; void RequestToken(); void ReleaseToken(); void RequestExplicitToken(); void DecrementNeedTokens(); void HoldToken(); void RequestImplicitToken(); void ReleaseImplicitToken(); void ReceivedToken(); void Disconnected(int status); }; cmUVJobServerClient::Impl::Impl(uv_loop_t& loop) : Loop(loop) { this->ImplicitToken.init(this->Loop, this); } cmUVJobServerClient::Impl::~Impl() = default; void cmUVJobServerClient::Impl::RequestToken() { if (this->HeldTokens == 0 && !uv_is_active(this->ImplicitToken)) { this->RequestImplicitToken(); } else { this->RequestExplicitToken(); } } void cmUVJobServerClient::Impl::ReleaseToken() { assert(this->HeldTokens > 0); --this->HeldTokens; if (this->HeldTokens == 0) { // This was the token implicitly owned by our process. this->ReleaseImplicitToken(); } else { // This was a token we received from the job server. Send it back. this->SendToken(); } } void cmUVJobServerClient::Impl::RequestExplicitToken() { ++this->NeedTokens; this->StartReceivingTokens(); } void cmUVJobServerClient::Impl::DecrementNeedTokens() { assert(this->NeedTokens > 0); --this->NeedTokens; if (this->NeedTokens == 0) { this->StopReceivingTokens(); } } void cmUVJobServerClient::Impl::HoldToken() { ++this->HeldTokens; if (this->OnToken) { this->OnToken(); } else { this->ReleaseToken(); } } void cmUVJobServerClient::Impl::RequestImplicitToken() { assert(this->HeldTokens == 0); this->ImplicitToken.start([](uv_idle_t* handle) { uv_idle_stop(handle); auto* self = static_cast(handle->data); self->HoldToken(); }); } void cmUVJobServerClient::Impl::ReleaseImplicitToken() { assert(this->HeldTokens == 0); // Use the implicit token in place of receiving one from the job server. if (this->NeedTokens > 0) { this->DecrementNeedTokens(); this->RequestImplicitToken(); } } void cmUVJobServerClient::Impl::ReceivedToken() { this->DecrementNeedTokens(); this->HoldToken(); } void cmUVJobServerClient::Impl::Disconnected(int status) { if (this->OnDisconnect) { this->OnDisconnect(status); } } //--------------------------------------------------------------------------- // Implementation on POSIX platforms. // https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html #ifndef _WIN32 namespace { class ImplPosix : public cmUVJobServerClient::Impl { public: enum class Connection { None, FDs, FIFO, }; Connection Conn = Connection::None; cm::uv_pipe_ptr ConnRead; cm::uv_pipe_ptr ConnWrite; cm::uv_pipe_ptr ConnFIFO; std::shared_ptr> OnWrite; void Connect(); void ConnectFDs(int rfd, int wfd); void ConnectFIFO(const char* path); void Disconnect(int status); cm::uv_pipe_ptr OpenFD(int fd); uv_stream_t* GetWriter() const; uv_stream_t* GetReader() const; static void OnAllocateCB(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void OnReadCB(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf); void OnAllocate(size_t suggested_size, uv_buf_t* buf); void OnRead(ssize_t nread, const uv_buf_t* buf); char ReadBuf = '.'; bool ReceivingTokens = false; bool IsConnected() const; void SendToken() override; void StartReceivingTokens() override; void StopReceivingTokens() override; ImplPosix(uv_loop_t& loop); ~ImplPosix() override; }; ImplPosix::ImplPosix(uv_loop_t& loop) : Impl(loop) , OnWrite(std::make_shared>([this](int status) { if (status != 0) { this->Disconnect(status); } })) { this->Connect(); } ImplPosix::~ImplPosix() { this->Disconnect(0); } void ImplPosix::Connect() { // --jobserver-auth= for gnu make versions >= 4.2 // --jobserver-fds= for gnu make versions < 4.2 // -J for bsd make static const std::vector prefixes = { "--jobserver-auth=", "--jobserver-fds=", "-J" }; cm::optional makeflags = cmSystemTools::GetEnvVar("MAKEFLAGS"); if (!makeflags) { return; } // Look for the *last* occurrence of jobserver flags. cm::optional auth; std::vector args; cmSystemTools::ParseUnixCommandLine(makeflags->c_str(), args); for (cm::string_view arg : cmReverseRange(args)) { for (cm::string_view prefix : prefixes) { if (cmHasPrefix(arg, prefix)) { auth = cmTrimWhitespace(arg.substr(prefix.length())); break; } } if (auth) { break; } } if (!auth) { return; } // fifo:PATH if (cmHasLiteralPrefix(*auth, "fifo:")) { ConnectFIFO(auth->substr(cmStrLen("fifo:")).c_str()); return; } // reader,writer int reader; int writer; if (std::sscanf(auth->c_str(), "%d,%d", &reader, &writer) == 2) { ConnectFDs(reader, writer); } } cm::uv_pipe_ptr ImplPosix::OpenFD(int fd) { // Create a CLOEXEC duplicate so `uv_pipe_ptr` can close it // without closing the original file descriptor, which our // child processes might want to use too. cm::uv_pipe_ptr p; int fd_dup = dup(fd); if (fd_dup < 0) { return p; } if (fcntl(fd_dup, F_SETFD, FD_CLOEXEC) == -1) { close(fd_dup); return p; } p.init(this->Loop, 0, this); if (uv_pipe_open(p, fd_dup) < 0) { close(fd_dup); } return p; } void ImplPosix::ConnectFDs(int rfd, int wfd) { cm::uv_pipe_ptr connRead = this->OpenFD(rfd); cm::uv_pipe_ptr connWrite = this->OpenFD(wfd); // Verify that the read end is readable and the write end is writable. if (!connRead || !uv_is_readable(connRead) || // !connWrite || !uv_is_writable(connWrite)) { return; } this->ConnRead = std::move(connRead); this->ConnWrite = std::move(connWrite); this->Conn = Connection::FDs; } void ImplPosix::ConnectFIFO(const char* path) { int fd = open(path, O_RDWR); if (fd < 0) { return; } if (fcntl(fd, F_SETFD, FD_CLOEXEC) == -1) { close(fd); return; } cm::uv_pipe_ptr connFIFO; connFIFO.init(this->Loop, 0, this); if (uv_pipe_open(connFIFO, fd) != 0) { close(fd); return; } // Verify that the fifo is both readable and writable. if (!connFIFO || !uv_is_readable(connFIFO) || !uv_is_writable(connFIFO)) { return; } this->ConnFIFO = std::move(connFIFO); this->Conn = Connection::FIFO; } void ImplPosix::Disconnect(int status) { if (this->Conn == Connection::None) { return; } this->StopReceivingTokens(); switch (this->Conn) { case Connection::FDs: this->ConnRead.reset(); this->ConnWrite.reset(); break; case Connection::FIFO: this->ConnFIFO.reset(); break; default: break; } this->Conn = Connection::None; if (status != 0) { this->Disconnected(status); } } uv_stream_t* ImplPosix::GetWriter() const { switch (this->Conn) { case Connection::FDs: return this->ConnWrite; case Connection::FIFO: return this->ConnFIFO; default: return nullptr; } } uv_stream_t* ImplPosix::GetReader() const { switch (this->Conn) { case Connection::FDs: return this->ConnRead; case Connection::FIFO: return this->ConnFIFO; default: return nullptr; } } void ImplPosix::OnAllocateCB(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { auto* self = static_cast(handle->data); self->OnAllocate(suggested_size, buf); } void ImplPosix::OnReadCB(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { auto* self = static_cast(stream->data); self->OnRead(nread, buf); } void ImplPosix::OnAllocate(size_t /*suggested_size*/, uv_buf_t* buf) { *buf = uv_buf_init(&this->ReadBuf, 1); } void ImplPosix::OnRead(ssize_t nread, const uv_buf_t* /*buf*/) { if (nread == 0) { return; } if (nread < 0) { auto status = static_cast(nread); this->Disconnect(status); return; } assert(nread == 1); this->ReceivedToken(); } bool ImplPosix::IsConnected() const { return this->Conn != Connection::None; } void ImplPosix::SendToken() { if (this->Conn == Connection::None) { return; } static char token = '.'; uv_buf_t const buf = uv_buf_init(&token, sizeof(token)); int status = cm::uv_write(this->GetWriter(), &buf, 1, this->OnWrite); if (status != 0) { this->Disconnect(status); } } void ImplPosix::StartReceivingTokens() { if (this->Conn == Connection::None) { return; } if (this->ReceivingTokens) { return; } int status = uv_read_start(this->GetReader(), &ImplPosix::OnAllocateCB, &ImplPosix::OnReadCB); if (status != 0) { this->Disconnect(status); return; } this->ReceivingTokens = true; } void ImplPosix::StopReceivingTokens() { if (this->Conn == Connection::None) { return; } if (!this->ReceivingTokens) { return; } this->ReceivingTokens = false; uv_read_stop(this->GetReader()); } } #endif //--------------------------------------------------------------------------- // Implementation of public interface. cmUVJobServerClient::cmUVJobServerClient(std::unique_ptr impl) : Impl_(std::move(impl)) { } cmUVJobServerClient::~cmUVJobServerClient() = default; cmUVJobServerClient::cmUVJobServerClient(cmUVJobServerClient&&) noexcept = default; cmUVJobServerClient& cmUVJobServerClient::operator=( cmUVJobServerClient&&) noexcept = default; void cmUVJobServerClient::RequestToken() { this->Impl_->RequestToken(); } void cmUVJobServerClient::ReleaseToken() { this->Impl_->ReleaseToken(); } int cmUVJobServerClient::GetHeldTokens() const { return this->Impl_->HeldTokens; } int cmUVJobServerClient::GetNeedTokens() const { return this->Impl_->NeedTokens; } cm::optional cmUVJobServerClient::Connect( uv_loop_t& loop, std::function onToken, std::function onDisconnect) { #if defined(_WIN32) // FIXME: Windows job server client not yet implemented. static_cast(loop); static_cast(onToken); static_cast(onDisconnect); #else auto impl = cm::make_unique(loop); if (impl && impl->IsConnected()) { impl->OnToken = std::move(onToken); impl->OnDisconnect = std::move(onDisconnect); return cmUVJobServerClient(std::move(impl)); } #endif return cm::nullopt; }