diff options
author | Justin Berger <j.david.berger@gmail.com> | 2017-03-25 03:38:52 (GMT) |
---|---|---|
committer | Justin Berger <j.david.berger@gmail.com> | 2017-07-11 00:11:27 (GMT) |
commit | d4f5d35ca491ede92003b26a7d0eb06aea3a2bbb (patch) | |
tree | 90f665b7e1611c47ab8eaa3f069e9572d3df58fb /Source | |
parent | 5acbf08bff643ec6779464c840bfe2f02a4e65ae (diff) | |
download | CMake-d4f5d35ca491ede92003b26a7d0eb06aea3a2bbb.zip CMake-d4f5d35ca491ede92003b26a7d0eb06aea3a2bbb.tar.gz CMake-d4f5d35ca491ede92003b26a7d0eb06aea3a2bbb.tar.bz2 |
server: Refactor to make the event loop owned by server object
Diffstat (limited to 'Source')
-rw-r--r-- | Source/CMakeLists.txt | 2 | ||||
-rw-r--r-- | Source/cmConnection.cxx | 150 | ||||
-rw-r--r-- | Source/cmConnection.h | 104 | ||||
-rw-r--r-- | Source/cmFileMonitor.cxx | 4 | ||||
-rw-r--r-- | Source/cmPipeConnection.cxx | 81 | ||||
-rw-r--r-- | Source/cmPipeConnection.h | 28 | ||||
-rw-r--r-- | Source/cmServer.cxx | 236 | ||||
-rw-r--r-- | Source/cmServer.h | 98 | ||||
-rw-r--r-- | Source/cmServerConnection.cxx | 391 | ||||
-rw-r--r-- | Source/cmServerConnection.h | 94 | ||||
-rw-r--r-- | Source/cmcmd.cxx | 4 |
11 files changed, 762 insertions, 430 deletions
diff --git a/Source/CMakeLists.txt b/Source/CMakeLists.txt index 40403ca..d41d514 100644 --- a/Source/CMakeLists.txt +++ b/Source/CMakeLists.txt @@ -976,7 +976,9 @@ target_link_libraries(cmake CMakeLib) if(CMake_ENABLE_SERVER_MODE) add_library(CMakeServerLib + cmConnection.h cmConnection.cxx cmFileMonitor.cxx cmFileMonitor.h + cmPipeConnection.cxx cmPipeConnection.h cmServer.cxx cmServer.h cmServerConnection.cxx cmServerConnection.h cmServerProtocol.cxx cmServerProtocol.h diff --git a/Source/cmConnection.cxx b/Source/cmConnection.cxx new file mode 100644 index 0000000..b25843b --- /dev/null +++ b/Source/cmConnection.cxx @@ -0,0 +1,150 @@ +/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying + file Copyright.txt or https://cmake.org/licensing for details. */ +#include "cmConnection.h" + +#include "cmServer.h" +#include "cm_uv.h" + +#include <cassert> +#include <cstring> + +struct write_req_t +{ + uv_write_t req; + uv_buf_t buf; +}; + +void cmConnection::on_alloc_buffer(uv_handle_t* handle, size_t suggested_size, + uv_buf_t* buf) +{ + (void)(handle); + char* rawBuffer = new char[suggested_size]; + *buf = uv_buf_init(rawBuffer, static_cast<unsigned int>(suggested_size)); +} + +void cmConnection::on_read(uv_stream_t* stream, ssize_t nread, + const uv_buf_t* buf) +{ + auto conn = reinterpret_cast<cmConnection*>(stream->data); + if (conn) { + if (nread >= 0) { + conn->ReadData(std::string(buf->base, buf->base + nread)); + } else { + conn->OnDisconnect((int)nread); + } + } + + delete[](buf->base); +} + +void cmConnection::on_close_delete(uv_handle_t* handle) +{ + delete handle; +} + +void cmConnection::on_close(uv_handle_t*) +{ +} + +void cmConnection::on_write(uv_write_t* req, int status) +{ + (void)(status); + + // Free req and buffer + write_req_t* wr = reinterpret_cast<write_req_t*>(req); + delete[](wr->buf.base); + delete wr; +} + +void cmConnection::on_new_connection(uv_stream_t* stream, int status) +{ + (void)(status); + auto conn = reinterpret_cast<cmConnection*>(stream->data); + + if (conn) { + conn->Connect(stream); + } +} + +bool cmConnection::IsOpen() const +{ + return this->WriteStream != CM_NULLPTR; +} + +void cmConnection::WriteData(const std::string& data) +{ + assert(this->WriteStream); + + auto ds = data.size(); + + write_req_t* req = new write_req_t; + req->req.data = this; + req->buf = uv_buf_init(new char[ds], static_cast<unsigned int>(ds)); + memcpy(req->buf.base, data.c_str(), ds); + uv_write(reinterpret_cast<uv_write_t*>(req), + static_cast<uv_stream_t*>(this->WriteStream), &req->buf, 1, + on_write); +} + +cmConnection::~cmConnection() +{ + OnServerShuttingDown(); +} + +void cmConnection::ReadData(const std::string& data) +{ + this->RawReadBuffer += data; + if (BufferStrategy) { + std::string packet = BufferStrategy->BufferMessage(this->RawReadBuffer); + do { + ProcessRequest(packet); + packet = BufferStrategy->BufferMessage(this->RawReadBuffer); + } while (!packet.empty()); + + } else { + ProcessRequest(this->RawReadBuffer); + this->RawReadBuffer.clear(); + } +} + +void cmConnection::SetServer(cmServerBase* s) +{ + Server = s; +} + +cmConnection::cmConnection(cmConnectionBufferStrategy* bufferStrategy) + : BufferStrategy(bufferStrategy) +{ +} + +void cmConnection::Connect(uv_stream_t*) +{ + Server->OnConnected(nullptr); +} + +void cmConnection::ProcessRequest(const std::string& request) +{ + Server->ProcessRequest(this, request); +} + +bool cmConnection::OnServeStart(std::string* errString) +{ + (void)errString; + return true; +} + +void cmConnection::OnDisconnect(int errorCode) +{ + (void)errorCode; + this->Server->OnDisconnect(this); +} + +bool cmConnection::OnServerShuttingDown() +{ + this->WriteStream->data = nullptr; + this->ReadStream->data = nullptr; + + this->ReadStream = nullptr; + this->WriteStream = nullptr; + return true; +} diff --git a/Source/cmConnection.h b/Source/cmConnection.h new file mode 100644 index 0000000..e994716 --- /dev/null +++ b/Source/cmConnection.h @@ -0,0 +1,104 @@ +/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying + file Copyright.txt or https://cmake.org/licensing for details. */ + +#pragma once + +#include "cmConfigure.h" + +#include "cm_uv.h" + +#include <cstddef> +#include <memory> +#include <string> + +class cmServerBase; + +/*** + * Given a sequence of bytes with any kind of buffering, instances of this + * class arrange logical chunks according to whatever the use case is for + * the connection. + */ +class cmConnectionBufferStrategy +{ +public: + virtual ~cmConnectionBufferStrategy(); + + /*** + * Called whenever with an active raw buffer. If a logical chunk + * becomes available, that chunk is returned and that portion is + * removed from the rawBuffer + * + * @param rawBuffer in/out parameter. Receive buffer; the buffer strategy is + * free to manipulate this buffer anyway it needs to. + * + * @return Next chunk from the stream. Returns the empty string if a chunk + * isn't ready yet. Users of this interface should repeatedly call this + * function until an empty string is returned since its entirely possible + * multiple chunks come in a single raw buffer. + */ + virtual std::string BufferMessage(std::string& rawBuffer) = 0; + + /*** + * Resets the internal state of the buffering + */ + virtual void clear(); + + // TODO: There should be a callback / flag set for errors +}; + +/*** + * Abstraction of a connection; ties in event callbacks from libuv and notifies + * the server when appropriate + */ +class cmConnection +{ + CM_DISABLE_COPY(cmConnection) + +public: + virtual ~cmConnection(); + + /*** + * @param bufferStrategy If no strategy is given, it will process the raw + * chunks as they come in. The connection + * owns the pointer given. + */ + cmConnection(cmConnectionBufferStrategy* bufferStrategy = nullptr); + + virtual void Connect(uv_stream_t* server); + + virtual void ReadData(const std::string& data); + + virtual bool OnServeStart(std::string* errString); + + virtual bool OnServerShuttingDown(); + + virtual bool IsOpen() const; + + virtual void WriteData(const std::string& data); + + virtual void ProcessRequest(const std::string& request); + + virtual void SetServer(cmServerBase* s); + + virtual void OnDisconnect(int errorCode); + uv_stream_t* ReadStream = nullptr; + cmServerBase* Server = nullptr; + uv_stream_t* WriteStream = nullptr; + + static void on_close(uv_handle_t* handle); + static void on_close_delete(uv_handle_t* handle); + +protected: + std::string RawReadBuffer; + + std::unique_ptr<cmConnectionBufferStrategy> BufferStrategy; + + static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf); + + static void on_write(uv_write_t* req, int status); + + static void on_new_connection(uv_stream_t* stream, int status); + + static void on_alloc_buffer(uv_handle_t* handle, size_t suggested_size, + uv_buf_t* buf); +}; diff --git a/Source/cmFileMonitor.cxx b/Source/cmFileMonitor.cxx index 8027535..c0401d7 100644 --- a/Source/cmFileMonitor.cxx +++ b/Source/cmFileMonitor.cxx @@ -171,7 +171,9 @@ public: { if (this->Handle) { uv_fs_event_stop(this->Handle); - uv_close(reinterpret_cast<uv_handle_t*>(this->Handle), &on_fs_close); + if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(this->Handle))) { + uv_close(reinterpret_cast<uv_handle_t*>(this->Handle), &on_fs_close); + } this->Handle = nullptr; } cmVirtualDirectoryWatcher::StopWatching(); diff --git a/Source/cmPipeConnection.cxx b/Source/cmPipeConnection.cxx new file mode 100644 index 0000000..438719f --- /dev/null +++ b/Source/cmPipeConnection.cxx @@ -0,0 +1,81 @@ +/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying + file Copyright.txt or https://cmake.org/licensing for details. */ +#include "cmPipeConnection.h" + +#include "cmConfigure.h" +#include "cmServer.h" + +cmPipeConnection::cmPipeConnection(const std::string& name, + cmConnectionBufferStrategy* bufferStrategy) + : cmConnection(bufferStrategy) + , PipeName(name) +{ +} + +void cmPipeConnection::Connect(uv_stream_t* server) +{ + if (this->ClientPipe) { + // Accept and close all pipes but the first: + uv_pipe_t* rejectPipe = new uv_pipe_t(); + + uv_pipe_init(this->Server->GetLoop(), rejectPipe, 0); + uv_accept(server, reinterpret_cast<uv_stream_t*>(rejectPipe)); + uv_close(reinterpret_cast<uv_handle_t*>(rejectPipe), &on_close_delete); + return; + } + + this->ClientPipe = new uv_pipe_t(); + uv_pipe_init(this->Server->GetLoop(), this->ClientPipe, 0); + this->ClientPipe->data = static_cast<cmConnection*>(this); + + auto client = reinterpret_cast<uv_stream_t*>(this->ClientPipe); + if (uv_accept(server, client) != 0) { + uv_close(reinterpret_cast<uv_handle_t*>(client), &on_close_delete); + this->ClientPipe = CM_NULLPTR; + return; + } + this->ReadStream = client; + this->WriteStream = client; + + uv_read_start(this->ReadStream, on_alloc_buffer, on_read); + Server->OnConnected(this); +} + +bool cmPipeConnection::OnServeStart(std::string* errorMessage) +{ + this->ServerPipe = new uv_pipe_t(); + uv_pipe_init(this->Server->GetLoop(), this->ServerPipe, 0); + this->ServerPipe->data = static_cast<cmConnection*>(this); + + int r; + if ((r = uv_pipe_bind(this->ServerPipe, this->PipeName.c_str())) != 0) { + *errorMessage = std::string("Internal Error with ") + this->PipeName + + ": " + uv_err_name(r); + return false; + } + auto serverStream = reinterpret_cast<uv_stream_t*>(this->ServerPipe); + if ((r = uv_listen(serverStream, 1, on_new_connection)) != 0) { + *errorMessage = std::string("Internal Error listening on ") + + this->PipeName + ": " + uv_err_name(r); + return false; + } + + return cmConnection::OnServeStart(errorMessage); +} + +bool cmPipeConnection::OnServerShuttingDown() +{ + if (this->ClientPipe) { + uv_close(reinterpret_cast<uv_handle_t*>(this->ClientPipe), + &on_close_delete); + this->WriteStream->data = nullptr; + } + uv_close(reinterpret_cast<uv_handle_t*>(this->ServerPipe), &on_close_delete); + + this->ClientPipe = nullptr; + this->ServerPipe = nullptr; + this->WriteStream = nullptr; + this->ReadStream = nullptr; + + return cmConnection::OnServerShuttingDown(); +} diff --git a/Source/cmPipeConnection.h b/Source/cmPipeConnection.h new file mode 100644 index 0000000..3d3d52c --- /dev/null +++ b/Source/cmPipeConnection.h @@ -0,0 +1,28 @@ +/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying + file Copyright.txt or https://cmake.org/licensing for details. */ + +#pragma once + +#include "cmConnection.h" + +#include "cm_uv.h" + +#include <string> + +class cmPipeConnection : public cmConnection +{ +public: + cmPipeConnection(const std::string& name, + cmConnectionBufferStrategy* bufferStrategy = nullptr); + + bool OnServeStart(std::string* pString) override; + + bool OnServerShuttingDown() override; + + void Connect(uv_stream_t* server) override; + +private: + const std::string PipeName; + uv_pipe_t* ServerPipe = nullptr; + uv_pipe_t* ClientPipe = nullptr; +}; diff --git a/Source/cmServer.cxx b/Source/cmServer.cxx index 7fc6ed7..03e0115 100644 --- a/Source/cmServer.cxx +++ b/Source/cmServer.cxx @@ -2,7 +2,8 @@ file Copyright.txt or https://cmake.org/licensing for details. */ #include "cmServer.h" -#include "cmServerConnection.h" +#include "cmConnection.h" +#include "cmFileMonitor.h" #include "cmServerDictionary.h" #include "cmServerProtocol.h" #include "cmSystemTools.h" @@ -14,8 +15,23 @@ #include <algorithm> #include <cassert> #include <cstdint> +#include <memory> #include <utility> +void on_signal(uv_signal_t* signal, int signum) +{ + auto conn = reinterpret_cast<cmServerBase*>(signal->data); + conn->OnSignal(signum); +} + +static void on_walk_to_shutdown(uv_handle_t* handle, void* arg) +{ + (void)arg; + if (!uv_is_closing(handle)) { + uv_close(handle, &cmConnection::on_close); + } +} + class cmServer::DebugInfo { public: @@ -30,11 +46,10 @@ public: uint64_t StartTime; }; -cmServer::cmServer(cmServerConnection* conn, bool supportExperimental) - : Connection(conn) +cmServer::cmServer(cmConnection* conn, bool supportExperimental) + : cmServerBase(conn) , SupportExperimental(supportExperimental) { - this->Connection->SetServer(this); // Register supported protocols: this->RegisterProtocol(new cmServerProtocol1_0); } @@ -48,23 +63,15 @@ cmServer::~cmServer() for (cmServerProtocol* p : this->SupportedProtocols) { delete p; } - - delete this->Connection; } -void cmServer::PopOne() +void cmServer::ProcessRequest(cmConnection* connection, + const std::string& input) { - if (this->Queue.empty()) { - return; - } - Json::Reader reader; Json::Value value; - const std::string input = this->Queue.front(); - this->Queue.erase(this->Queue.begin()); - if (!reader.parse(input, value)) { - this->WriteParseError("Failed to parse JSON input."); + this->WriteParseError(connection, "Failed to parse JSON input."); return; } @@ -82,7 +89,7 @@ void cmServer::PopOne() if (request.Type == "") { cmServerResponse response(request); response.SetError("No type given in request."); - this->WriteResponse(response, nullptr); + this->WriteResponse(connection, response, nullptr); return; } @@ -91,9 +98,11 @@ void cmServer::PopOne() if (this->Protocol) { this->Protocol->CMakeInstance()->SetProgressCallback( reportProgress, const_cast<cmServerRequest*>(&request)); - this->WriteResponse(this->Protocol->Process(request), debug.get()); + this->WriteResponse(connection, this->Protocol->Process(request), + debug.get()); } else { - this->WriteResponse(this->SetProtocolVersion(request), debug.get()); + this->WriteResponse(connection, this->SetProtocolVersion(request), + debug.get()); } } @@ -115,7 +124,7 @@ void cmServer::RegisterProtocol(cmServerProtocol* protocol) } } -void cmServer::PrintHello() const +void cmServer::PrintHello(cmConnection* connection) const { Json::Value hello = Json::objectValue; hello[kTYPE_KEY] = "hello"; @@ -134,13 +143,7 @@ void cmServer::PrintHello() const protocolVersions.append(tmp); } - this->WriteJsonObject(hello, nullptr); -} - -void cmServer::QueueRequest(const std::string& request) -{ - this->Queue.push_back(request); - this->PopOne(); + this->WriteJsonObject(connection, hello, nullptr); } void cmServer::reportProgress(const char* msg, float progress, void* data) @@ -232,17 +235,26 @@ bool cmServer::Serve(std::string* errorMessage) } assert(!this->Protocol); - return Connection->ProcessEvents(errorMessage); + return cmServerBase::Serve(errorMessage); } cmFileMonitor* cmServer::FileMonitor() const { - return Connection->FileMonitor(); + return fileMonitor.get(); } void cmServer::WriteJsonObject(const Json::Value& jsonValue, const DebugInfo* debug) const { + for (auto& connection : this->Connections) { + WriteJsonObject(connection.get(), jsonValue, debug); + } +} + +void cmServer::WriteJsonObject(cmConnection* connection, + const Json::Value& jsonValue, + const DebugInfo* debug) const +{ Json::FastWriter writer; auto beforeJson = uv_hrtime(); @@ -272,7 +284,7 @@ void cmServer::WriteJsonObject(const Json::Value& jsonValue, } } - Connection->WriteData(std::string("\n") + kSTART_MAGIC + std::string("\n") + + connection->WriteData(std::string("\n") + kSTART_MAGIC + std::string("\n") + result + kEND_MAGIC + std::string("\n")); } @@ -334,7 +346,8 @@ void cmServer::WriteMessage(const cmServerRequest& request, WriteJsonObject(obj, nullptr); } -void cmServer::WriteParseError(const std::string& message) const +void cmServer::WriteParseError(cmConnection* connection, + const std::string& message) const { Json::Value obj = Json::objectValue; obj[kTYPE_KEY] = kERROR_TYPE; @@ -342,7 +355,7 @@ void cmServer::WriteParseError(const std::string& message) const obj[kREPLY_TO_KEY] = ""; obj[kCOOKIE_KEY] = ""; - this->WriteJsonObject(obj, nullptr); + this->WriteJsonObject(connection, obj, nullptr); } void cmServer::WriteSignal(const std::string& name, @@ -358,7 +371,8 @@ void cmServer::WriteSignal(const std::string& name, WriteJsonObject(obj, nullptr); } -void cmServer::WriteResponse(const cmServerResponse& response, +void cmServer::WriteResponse(cmConnection* connection, + const cmServerResponse& response, const DebugInfo* debug) const { assert(response.IsComplete()); @@ -371,5 +385,161 @@ void cmServer::WriteResponse(const cmServerResponse& response, obj[kERROR_MESSAGE_KEY] = response.ErrorMessage(); } - this->WriteJsonObject(obj, debug); + this->WriteJsonObject(connection, obj, debug); +} + +void cmServer::OnConnected(cmConnection* connection) +{ + PrintHello(connection); +} + +void cmServer::OnServeStart() +{ + cmServerBase::OnServeStart(); + fileMonitor = std::make_shared<cmFileMonitor>(GetLoop()); +} + +void cmServer::StartShutDown() +{ + if (fileMonitor) { + fileMonitor->StopMonitoring(); + fileMonitor.reset(); + } + cmServerBase::StartShutDown(); +} + +static void __start_thread(void* arg) +{ + auto server = reinterpret_cast<cmServerBase*>(arg); + std::string error; + server->Serve(&error); +} + +bool cmServerBase::StartServeThread() +{ + ServeThreadRunning = true; + uv_thread_create(&ServeThread, __start_thread, this); + return true; +} + +bool cmServerBase::Serve(std::string* errorMessage) +{ + errorMessage->clear(); + + uv_signal_init(&Loop, &this->SIGINTHandler); + uv_signal_init(&Loop, &this->SIGHUPHandler); + + this->SIGINTHandler.data = this; + this->SIGHUPHandler.data = this; + + uv_signal_start(&this->SIGINTHandler, &on_signal, SIGINT); + uv_signal_start(&this->SIGHUPHandler, &on_signal, SIGHUP); + + OnServeStart(); + + for (auto& connection : Connections) { + if (!connection->OnServeStart(errorMessage)) { + return false; + } + } + + if (uv_run(&Loop, UV_RUN_DEFAULT) != 0) { + *errorMessage = "Internal Error: Event loop stopped in unclean state."; + StartShutDown(); + return false; + } + + ServeThreadRunning = false; + return true; +} + +void cmServerBase::OnConnected(cmConnection*) +{ +} + +void cmServerBase::OnDisconnect() +{ +} + +void cmServerBase::OnServeStart() +{ + uv_signal_start(&this->SIGINTHandler, &on_signal, SIGINT); + uv_signal_start(&this->SIGHUPHandler, &on_signal, SIGHUP); +} + +void cmServerBase::StartShutDown() +{ + if (!uv_is_closing((const uv_handle_t*)&this->SIGINTHandler)) { + uv_signal_stop(&this->SIGINTHandler); + } + + if (!uv_is_closing((const uv_handle_t*)&this->SIGHUPHandler)) { + uv_signal_stop(&this->SIGHUPHandler); + } + + for (auto& connection : Connections) { + connection->OnServerShuttingDown(); + } + Connections.clear(); + + uv_stop(&Loop); + + uv_walk(&Loop, on_walk_to_shutdown, CM_NULLPTR); + + uv_run(&Loop, UV_RUN_DEFAULT); +} + +bool cmServerBase::OnSignal(int signum) +{ + (void)signum; + StartShutDown(); + return true; +} + +cmServerBase::cmServerBase(cmConnection* connection) +{ + uv_loop_init(&Loop); + + uv_signal_init(&Loop, &this->SIGINTHandler); + uv_signal_init(&Loop, &this->SIGHUPHandler); + + this->SIGINTHandler.data = this; + this->SIGHUPHandler.data = this; + + AddNewConnection(connection); +} + +cmServerBase::~cmServerBase() +{ + + if (ServeThreadRunning) { + StartShutDown(); + uv_thread_join(&ServeThread); + } + + uv_loop_close(&Loop); +} + +void cmServerBase::AddNewConnection(cmConnection* ownedConnection) +{ + Connections.emplace_back(ownedConnection); + ownedConnection->SetServer(this); +} + +uv_loop_t* cmServerBase::GetLoop() +{ + return &Loop; +} + +void cmServerBase::OnDisconnect(cmConnection* pConnection) +{ + auto pred = [pConnection](const std::unique_ptr<cmConnection>& m) { + return m.get() == pConnection; + }; + Connections.erase( + std::remove_if(Connections.begin(), Connections.end(), pred), + Connections.end()); + if (Connections.empty()) { + StartShutDown(); + } } diff --git a/Source/cmServer.h b/Source/cmServer.h index b814050..0000704 100644 --- a/Source/cmServer.h +++ b/Source/cmServer.h @@ -7,26 +7,83 @@ #include "cm_jsoncpp_value.h" #include "cm_uv.h" +#include <memory> // IWYU pragma: keep #include <string> #include <vector> +class cmConnection; class cmFileMonitor; -class cmServerConnection; class cmServerProtocol; class cmServerRequest; class cmServerResponse; -class cmServer +/*** + * This essentially hold and manages a libuv event queue and responds to + * messages + * on any of its connections. + */ +class cmServerBase +{ +public: + cmServerBase(cmConnection* connection); + virtual ~cmServerBase(); + + virtual void AddNewConnection(cmConnection* ownedConnection); + + /*** + * The main override responsible for tailoring behavior towards + * whatever the given server is supposed to do + * + * This should almost always be called by the given connections + * directly. + * + * @param connection The connectiont the request was received on + * @param request The actual request + */ + virtual void ProcessRequest(cmConnection* connection, + const std::string& request) = 0; + virtual void OnConnected(cmConnection* connection); + virtual void OnDisconnect(); + + /*** + * Start a dedicated thread. If this is used to start the server, it will + * join on the + * servers dtor. + */ + virtual bool StartServeThread(); + virtual bool Serve(std::string* errorMessage); + + virtual void OnServeStart(); + virtual void StartShutDown(); + + virtual bool OnSignal(int signum); + uv_loop_t* GetLoop(); + + void OnDisconnect(cmConnection* pConnection); + +protected: + std::vector<std::unique_ptr<cmConnection> > Connections; + + bool ServeThreadRunning = false; + uv_thread_t ServeThread; + + uv_loop_t Loop; + + uv_signal_t SIGINTHandler; + uv_signal_t SIGHUPHandler; +}; + +class cmServer : public cmServerBase { CM_DISABLE_COPY(cmServer) public: class DebugInfo; - cmServer(cmServerConnection* conn, bool supportExperimental); - ~cmServer(); + cmServer(cmConnection* conn, bool supportExperimental); + ~cmServer() override; - bool Serve(std::string* errorMessage); + bool Serve(std::string* errorMessage) override; cmFileMonitor* FileMonitor() const; @@ -34,9 +91,20 @@ private: void RegisterProtocol(cmServerProtocol* protocol); // Callbacks from cmServerConnection: - void PopOne(); - void QueueRequest(const std::string& request); + void ProcessRequest(cmConnection* connection, + const std::string& request) override; + std::shared_ptr<cmFileMonitor> fileMonitor; + +public: + void OnServeStart() override; + + void StartShutDown() override; + +public: + void OnConnected(cmConnection* connection) override; + +private: static void reportProgress(const char* msg, float progress, void* data); static void reportMessage(const char* msg, const char* title, bool& cancel, void* data); @@ -44,36 +112,37 @@ private: // Handle requests: cmServerResponse SetProtocolVersion(const cmServerRequest& request); - void PrintHello() const; + void PrintHello(cmConnection* connection) const; // Write responses: void WriteProgress(const cmServerRequest& request, int min, int current, int max, const std::string& message) const; void WriteMessage(const cmServerRequest& request, const std::string& message, const std::string& title) const; - void WriteResponse(const cmServerResponse& response, + void WriteResponse(cmConnection* connection, + const cmServerResponse& response, const DebugInfo* debug) const; - void WriteParseError(const std::string& message) const; + void WriteParseError(cmConnection* connection, + const std::string& message) const; void WriteSignal(const std::string& name, const Json::Value& obj) const; void WriteJsonObject(Json::Value const& jsonValue, const DebugInfo* debug) const; + void WriteJsonObject(cmConnection* connection, Json::Value const& jsonValue, + const DebugInfo* debug) const; + static cmServerProtocol* FindMatchingProtocol( const std::vector<cmServerProtocol*>& protocols, int major, int minor); - cmServerConnection* Connection = nullptr; const bool SupportExperimental; cmServerProtocol* Protocol = nullptr; std::vector<cmServerProtocol*> SupportedProtocols; - std::vector<std::string> Queue; std::string DataBuffer; std::string JsonData; - uv_loop_t* Loop = nullptr; - typedef union { uv_tty_t tty; @@ -87,7 +156,6 @@ private: mutable bool Writing = false; - friend class cmServerConnection; friend class cmServerProtocol; friend class cmServerRequest; }; diff --git a/Source/cmServerConnection.cxx b/Source/cmServerConnection.cxx index 36312ed..3cc7722 100644 --- a/Source/cmServerConnection.cxx +++ b/Source/cmServerConnection.cxx @@ -2,376 +2,123 @@ file Copyright.txt or https://cmake.org/licensing for details. */ #include "cmServerConnection.h" -#include "cmFileMonitor.h" #include "cmServer.h" #include "cmServerDictionary.h" -#include <assert.h> -#include <string.h> - -namespace { - -struct write_req_t +cmStdIoConnection::cmStdIoConnection( + cmConnectionBufferStrategy* bufferStrategy) + : cmConnection(bufferStrategy) + , Input() + , Output() { - uv_write_t req; - uv_buf_t buf; -}; - -void on_alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) -{ - (void)(handle); - char* rawBuffer = new char[suggested_size]; - *buf = uv_buf_init(rawBuffer, static_cast<unsigned int>(suggested_size)); -} - -void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) -{ - auto conn = reinterpret_cast<cmServerConnection*>(stream->data); - if (nread >= 0) { - conn->ReadData(std::string(buf->base, buf->base + nread)); - } else { - conn->TriggerShutdown(); - } - - delete[](buf->base); } -void on_write(uv_write_t* req, int status) +void cmStdIoConnection::SetServer(cmServerBase* s) { - (void)(status); - auto conn = reinterpret_cast<cmServerConnection*>(req->data); - - // Free req and buffer - write_req_t* wr = reinterpret_cast<write_req_t*>(req); - delete[](wr->buf.base); - delete wr; + cmConnection::SetServer(s); - conn->ProcessNextRequest(); -} + if (uv_guess_handle(1) == UV_TTY) { + usesTty = true; -void on_new_connection(uv_stream_t* stream, int status) -{ - (void)(status); - auto conn = reinterpret_cast<cmServerConnection*>(stream->data); - conn->Connect(stream); -} + this->Input.tty = new uv_tty_t(); + uv_tty_init(this->Server->GetLoop(), this->Input.tty, 0, 1); + uv_tty_set_mode(this->Input.tty, UV_TTY_MODE_NORMAL); + this->Input.tty->data = static_cast<cmConnection*>(this); + this->ReadStream = reinterpret_cast<uv_stream_t*>(this->Input.tty); -void on_signal(uv_signal_t* signal, int signum) -{ - auto conn = reinterpret_cast<cmServerConnection*>(signal->data); - (void)(signum); - conn->TriggerShutdown(); -} + this->Output.tty = new uv_tty_t(); + uv_tty_init(this->Server->GetLoop(), this->Output.tty, 1, 0); + uv_tty_set_mode(this->Output.tty, UV_TTY_MODE_NORMAL); + this->Output.tty->data = static_cast<cmConnection*>(this); + this->WriteStream = reinterpret_cast<uv_stream_t*>(this->Output.tty); + } else { + usesTty = false; -void on_signal_close(uv_handle_t* handle) -{ - delete reinterpret_cast<uv_signal_t*>(handle); -} + this->Input.pipe = new uv_pipe_t(); + uv_pipe_init(this->Server->GetLoop(), this->Input.pipe, 0); + uv_pipe_open(this->Input.pipe, 0); + this->Input.pipe->data = static_cast<cmConnection*>(this); + this->ReadStream = reinterpret_cast<uv_stream_t*>(this->Input.pipe); -void on_pipe_close(uv_handle_t* handle) -{ - delete reinterpret_cast<uv_pipe_t*>(handle); + this->Output.pipe = new uv_pipe_t(); + uv_pipe_init(this->Server->GetLoop(), this->Output.pipe, 0); + uv_pipe_open(this->Output.pipe, 1); + this->Output.pipe->data = static_cast<cmConnection*>(this); + this->WriteStream = reinterpret_cast<uv_stream_t*>(this->Output.pipe); + } } -void on_tty_close(uv_handle_t* handle) +bool cmStdIoConnection::OnServeStart(std::string* pString) { - delete reinterpret_cast<uv_tty_t*>(handle); + uv_read_start(this->ReadStream, on_alloc_buffer, on_read); + Server->OnConnected(this); + return cmConnection::OnServeStart(pString); } -} // namespace - -class LoopGuard +bool cmStdIoConnection::OnServerShuttingDown() { -public: - LoopGuard(cmServerConnection* connection) - : Connection(connection) - { - this->Connection->mLoop = uv_default_loop(); - if (!this->Connection->mLoop) { - return; - } - this->Connection->mFileMonitor = - new cmFileMonitor(this->Connection->mLoop); - } - - ~LoopGuard() - { - if (!this->Connection->mLoop) { - return; - } + cmConnection::OnServerShuttingDown(); - if (this->Connection->mFileMonitor) { - delete this->Connection->mFileMonitor; - } - uv_loop_close(this->Connection->mLoop); - this->Connection->mLoop = nullptr; + if (usesTty) { + uv_read_stop(reinterpret_cast<uv_stream_t*>(this->Input.tty)); + uv_close(reinterpret_cast<uv_handle_t*>(this->Input.tty), + &on_close_delete); + uv_close(reinterpret_cast<uv_handle_t*>(this->Output.tty), + &on_close_delete); + } else { + uv_close(reinterpret_cast<uv_handle_t*>(this->Input.pipe), + &on_close_delete); + uv_close(reinterpret_cast<uv_handle_t*>(this->Output.pipe), + &on_close_delete); } -private: - cmServerConnection* Connection; -}; + return true; +} -cmServerConnection::cmServerConnection() +cmServerPipeConnection::cmServerPipeConnection(const std::string& name) + : cmPipeConnection(name, new cmServerBufferStrategy) { } -cmServerConnection::~cmServerConnection() +cmServerStdIoConnection::cmServerStdIoConnection() + : cmStdIoConnection(new cmServerBufferStrategy) { } -void cmServerConnection::SetServer(cmServer* s) +cmConnectionBufferStrategy::~cmConnectionBufferStrategy() { - this->Server = s; } -bool cmServerConnection::ProcessEvents(std::string* errorMessage) +void cmConnectionBufferStrategy::clear() { - assert(this->Server); - errorMessage->clear(); - - this->RawReadBuffer.clear(); - this->RequestBuffer.clear(); - - LoopGuard guard(this); - (void)(guard); - if (!this->mLoop) { - *errorMessage = "Internal Error: Failed to create event loop."; - return false; - } - - this->SIGINTHandler = new uv_signal_t; - uv_signal_init(this->mLoop, this->SIGINTHandler); - this->SIGINTHandler->data = static_cast<void*>(this); - uv_signal_start(this->SIGINTHandler, &on_signal, SIGINT); - - this->SIGHUPHandler = new uv_signal_t; - uv_signal_init(this->mLoop, this->SIGHUPHandler); - this->SIGHUPHandler->data = static_cast<void*>(this); - uv_signal_start(this->SIGHUPHandler, &on_signal, SIGHUP); - - if (!DoSetup(errorMessage)) { - return false; - } - - if (uv_run(this->mLoop, UV_RUN_DEFAULT) != 0) { - *errorMessage = "Internal Error: Event loop stopped in unclean state."; - return false; - } - - // These need to be cleaned up by now: - assert(!this->ReadStream); - assert(!this->WriteStream); - - this->RawReadBuffer.clear(); - this->RequestBuffer.clear(); - - return true; } -void cmServerConnection::ReadData(const std::string& data) +std::string cmServerBufferStrategy::BufferMessage(std::string& RawReadBuffer) { - this->RawReadBuffer += data; - for (;;) { - auto needle = this->RawReadBuffer.find('\n'); + auto needle = RawReadBuffer.find('\n'); if (needle == std::string::npos) { - return; + return ""; } - std::string line = this->RawReadBuffer.substr(0, needle); + std::string line = RawReadBuffer.substr(0, needle); const auto ls = line.size(); if (ls > 1 && line.at(ls - 1) == '\r') { line.erase(ls - 1, 1); } - this->RawReadBuffer.erase(this->RawReadBuffer.begin(), - this->RawReadBuffer.begin() + - static_cast<long>(needle) + 1); + RawReadBuffer.erase(RawReadBuffer.begin(), + RawReadBuffer.begin() + static_cast<long>(needle) + 1); if (line == kSTART_MAGIC) { - this->RequestBuffer.clear(); + RequestBuffer.clear(); continue; } if (line == kEND_MAGIC) { - this->Server->QueueRequest(this->RequestBuffer); - this->RequestBuffer.clear(); - } else { - this->RequestBuffer += line; - this->RequestBuffer += "\n"; + std::string rtn; + rtn.swap(this->RequestBuffer); + return rtn; } - } -} -void cmServerConnection::TriggerShutdown() -{ - this->FileMonitor()->StopMonitoring(); - - uv_signal_stop(this->SIGINTHandler); - uv_signal_stop(this->SIGHUPHandler); - - uv_close(reinterpret_cast<uv_handle_t*>(this->SIGINTHandler), - &on_signal_close); // delete handle - uv_close(reinterpret_cast<uv_handle_t*>(this->SIGHUPHandler), - &on_signal_close); // delete handle - - this->SIGINTHandler = nullptr; - this->SIGHUPHandler = nullptr; - - this->TearDown(); -} - -void cmServerConnection::WriteData(const std::string& data) -{ - assert(this->WriteStream); - - auto ds = data.size(); - - write_req_t* req = new write_req_t; - req->req.data = this; - req->buf = uv_buf_init(new char[ds], static_cast<unsigned int>(ds)); - memcpy(req->buf.base, data.c_str(), ds); - - uv_write(reinterpret_cast<uv_write_t*>(req), - static_cast<uv_stream_t*>(this->WriteStream), &req->buf, 1, - on_write); -} - -void cmServerConnection::ProcessNextRequest() -{ - Server->PopOne(); -} - -void cmServerConnection::SendGreetings() -{ - Server->PrintHello(); -} - -cmServerStdIoConnection::cmServerStdIoConnection() -{ - this->Input.tty = nullptr; - this->Output.tty = nullptr; -} - -bool cmServerStdIoConnection::DoSetup(std::string* errorMessage) -{ - (void)(errorMessage); - - if (uv_guess_handle(1) == UV_TTY) { - usesTty = true; - this->Input.tty = new uv_tty_t; - uv_tty_init(this->Loop(), this->Input.tty, 0, 1); - uv_tty_set_mode(this->Input.tty, UV_TTY_MODE_NORMAL); - Input.tty->data = this; - this->ReadStream = reinterpret_cast<uv_stream_t*>(this->Input.tty); - - this->Output.tty = new uv_tty_t; - uv_tty_init(this->Loop(), this->Output.tty, 1, 0); - uv_tty_set_mode(this->Output.tty, UV_TTY_MODE_NORMAL); - Output.tty->data = this; - this->WriteStream = reinterpret_cast<uv_stream_t*>(this->Output.tty); - } else { - usesTty = false; - this->Input.pipe = new uv_pipe_t; - uv_pipe_init(this->Loop(), this->Input.pipe, 0); - uv_pipe_open(this->Input.pipe, 0); - Input.pipe->data = this; - this->ReadStream = reinterpret_cast<uv_stream_t*>(this->Input.pipe); - - this->Output.pipe = new uv_pipe_t; - uv_pipe_init(this->Loop(), this->Output.pipe, 0); - uv_pipe_open(this->Output.pipe, 1); - Output.pipe->data = this; - this->WriteStream = reinterpret_cast<uv_stream_t*>(this->Output.pipe); + this->RequestBuffer += line; + this->RequestBuffer += "\n"; } - - SendGreetings(); - uv_read_start(this->ReadStream, on_alloc_buffer, on_read); - - return true; -} - -void cmServerStdIoConnection::TearDown() -{ - if (usesTty) { - uv_close(reinterpret_cast<uv_handle_t*>(this->Input.tty), &on_tty_close); - uv_close(reinterpret_cast<uv_handle_t*>(this->Output.tty), &on_tty_close); - this->Input.tty = nullptr; - this->Output.tty = nullptr; - } else { - uv_close(reinterpret_cast<uv_handle_t*>(this->Input.pipe), &on_pipe_close); - uv_close(reinterpret_cast<uv_handle_t*>(this->Output.pipe), - &on_pipe_close); - this->Input.pipe = nullptr; - this->Input.pipe = nullptr; - } - this->ReadStream = nullptr; - this->WriteStream = nullptr; -} - -cmServerPipeConnection::cmServerPipeConnection(const std::string& name) - : PipeName(name) -{ -} - -bool cmServerPipeConnection::DoSetup(std::string* errorMessage) -{ - this->ServerPipe = new uv_pipe_t; - uv_pipe_init(this->Loop(), this->ServerPipe, 0); - this->ServerPipe->data = this; - - int r; - if ((r = uv_pipe_bind(this->ServerPipe, this->PipeName.c_str())) != 0) { - *errorMessage = std::string("Internal Error with ") + this->PipeName + - ": " + uv_err_name(r); - return false; - } - auto serverStream = reinterpret_cast<uv_stream_t*>(this->ServerPipe); - if ((r = uv_listen(serverStream, 1, on_new_connection)) != 0) { - *errorMessage = std::string("Internal Error listening on ") + - this->PipeName + ": " + uv_err_name(r); - return false; - } - - return true; -} - -void cmServerPipeConnection::TearDown() -{ - if (this->ClientPipe) { - uv_close(reinterpret_cast<uv_handle_t*>(this->ClientPipe), &on_pipe_close); - this->WriteStream->data = nullptr; - } - uv_close(reinterpret_cast<uv_handle_t*>(this->ServerPipe), &on_pipe_close); - - this->ClientPipe = nullptr; - this->ServerPipe = nullptr; - this->WriteStream = nullptr; - this->ReadStream = nullptr; -} - -void cmServerPipeConnection::Connect(uv_stream_t* server) -{ - if (this->ClientPipe) { - // Accept and close all pipes but the first: - uv_pipe_t* rejectPipe = new uv_pipe_t; - - uv_pipe_init(this->Loop(), rejectPipe, 0); - auto rejecter = reinterpret_cast<uv_stream_t*>(rejectPipe); - uv_accept(server, rejecter); - uv_close(reinterpret_cast<uv_handle_t*>(rejecter), &on_pipe_close); - return; - } - - this->ClientPipe = new uv_pipe_t; - uv_pipe_init(this->Loop(), this->ClientPipe, 0); - this->ClientPipe->data = this; - auto client = reinterpret_cast<uv_stream_t*>(this->ClientPipe); - if (uv_accept(server, client) != 0) { - uv_close(reinterpret_cast<uv_handle_t*>(client), nullptr); - return; - } - this->ReadStream = client; - this->WriteStream = client; - - uv_read_start(this->ReadStream, on_alloc_buffer, on_read); - - this->SendGreetings(); } diff --git a/Source/cmServerConnection.h b/Source/cmServerConnection.h index b96bf3c..8865480 100644 --- a/Source/cmServerConnection.h +++ b/Source/cmServerConnection.h @@ -2,68 +2,46 @@ file Copyright.txt or https://cmake.org/licensing for details. */ #pragma once -#include "cmConfigure.h" +#include "cmConnection.h" +#include "cmPipeConnection.h" #include "cm_uv.h" #include <string> -class cmFileMonitor; -class cmServer; +class cmServerBase; -class cmServerConnection +/*** + * This connection buffer strategy accepts messages in the form of + * [== "CMake Server" ==[ +{ + ... some JSON message ... +} +]== "CMake Server" ==] + * and only passes on the core json; it discards the envelope. + */ +class cmServerBufferStrategy : public cmConnectionBufferStrategy { - CM_DISABLE_COPY(cmServerConnection) - public: - cmServerConnection(); - virtual ~cmServerConnection(); - - void SetServer(cmServer* s); - - bool ProcessEvents(std::string* errorMessage); - - void ReadData(const std::string& data); - void TriggerShutdown(); - void WriteData(const std::string& data); - void ProcessNextRequest(); - - virtual void Connect(uv_stream_t* server) { (void)(server); } - - cmFileMonitor* FileMonitor() const { return this->mFileMonitor; } - -protected: - virtual bool DoSetup(std::string* errorMessage) = 0; - virtual void TearDown() = 0; - - void SendGreetings(); - - uv_loop_t* Loop() const { return mLoop; } - -protected: - std::string RawReadBuffer; - std::string RequestBuffer; - - uv_stream_t* ReadStream = nullptr; - uv_stream_t* WriteStream = nullptr; + std::string BufferMessage(std::string& rawBuffer) override; private: - uv_loop_t* mLoop = nullptr; - cmFileMonitor* mFileMonitor = nullptr; - cmServer* Server = nullptr; - uv_signal_t* SIGINTHandler = nullptr; - uv_signal_t* SIGHUPHandler = nullptr; - - friend class LoopGuard; + std::string RequestBuffer; }; -class cmServerStdIoConnection : public cmServerConnection +/*** + * Generic connection over std io interfaces -- tty + */ +class cmStdIoConnection : public cmConnection { public: - cmServerStdIoConnection(); - bool DoSetup(std::string* errorMessage) override; + cmStdIoConnection(cmConnectionBufferStrategy* bufferStrategy); + + void SetServer(cmServerBase* s) override; + + bool OnServerShuttingDown() override; - void TearDown() override; + bool OnServeStart(std::string* pString) override; private: typedef union @@ -78,18 +56,18 @@ private: InOutUnion Output; }; -class cmServerPipeConnection : public cmServerConnection +/*** + * These specific connections use the cmake server + * buffering strategy. + */ +class cmServerStdIoConnection : public cmStdIoConnection { public: - cmServerPipeConnection(const std::string& name); - bool DoSetup(std::string* errorMessage) override; - - void TearDown() override; - - void Connect(uv_stream_t* server) override; + cmServerStdIoConnection(); +}; -private: - const std::string PipeName; - uv_pipe_t* ServerPipe = nullptr; - uv_pipe_t* ClientPipe = nullptr; +class cmServerPipeConnection : public cmPipeConnection +{ +public: + cmServerPipeConnection(const std::string& name); }; diff --git a/Source/cmcmd.cxx b/Source/cmcmd.cxx index d5b0861..8fadbb1 100644 --- a/Source/cmcmd.cxx +++ b/Source/cmcmd.cxx @@ -44,6 +44,8 @@ #include <stdlib.h> #include <time.h> +class cmConnection; + int cmcmd_cmake_ninja_depends(std::vector<std::string>::const_iterator argBeg, std::vector<std::string>::const_iterator argEnd); int cmcmd_cmake_ninja_dyndep(std::vector<std::string>::const_iterator argBeg, @@ -1013,7 +1015,7 @@ int cmcmd::ExecuteCMakeCommand(std::vector<std::string>& args) } } #if defined(HAVE_SERVER_MODE) && HAVE_SERVER_MODE - cmServerConnection* conn; + cmConnection* conn; if (isDebug) { conn = new cmServerStdIoConnection; } else { |