diff options
Diffstat (limited to 'Source')
-rw-r--r-- | Source/cmConnection.cxx | 31 | ||||
-rw-r--r-- | Source/cmConnection.h | 18 | ||||
-rw-r--r-- | Source/cmPipeConnection.cxx | 13 | ||||
-rw-r--r-- | Source/cmServer.cxx | 86 | ||||
-rw-r--r-- | Source/cmServer.h | 30 | ||||
-rw-r--r-- | Source/cmServerConnection.cxx | 158 | ||||
-rw-r--r-- | Source/cmServerConnection.h | 13 |
7 files changed, 230 insertions, 119 deletions
diff --git a/Source/cmConnection.cxx b/Source/cmConnection.cxx index f3fc1ef..f482412 100644 --- a/Source/cmConnection.cxx +++ b/Source/cmConnection.cxx @@ -38,11 +38,6 @@ void cmEventBasedConnection::on_read(uv_stream_t* stream, ssize_t nread, delete[](buf->base); } -void cmEventBasedConnection::on_close_delete(uv_handle_t* handle) -{ - delete handle; -} - void cmEventBasedConnection::on_close(uv_handle_t* /*handle*/) { } @@ -72,9 +67,19 @@ bool cmEventBasedConnection::IsOpen() const return this->WriteStream != nullptr; } -void cmEventBasedConnection::WriteData(const std::string& data) +void cmEventBasedConnection::WriteData(const std::string& _data) { +#ifndef NDEBUG + auto curr_thread_id = uv_thread_self(); + assert(this->Server); + assert(uv_thread_equal(&curr_thread_id, &this->Server->ServeThreadId)); +#endif + + auto data = _data; assert(this->WriteStream); + if (BufferStrategy) { + data = BufferStrategy->BufferOutMessage(data); + } auto ds = data.size(); @@ -119,7 +124,9 @@ void cmEventBasedConnection::OnDisconnect(int onerror) { (void)onerror; this->OnConnectionShuttingDown(); - this->Server->OnDisconnect(this); + if (this->Server) { + this->Server->OnDisconnect(this); + } } cmConnection::~cmConnection() @@ -128,6 +135,7 @@ cmConnection::~cmConnection() bool cmConnection::OnConnectionShuttingDown() { + this->Server = nullptr; return true; } @@ -149,9 +157,12 @@ bool cmConnection::OnServeStart(std::string* errString) bool cmEventBasedConnection::OnConnectionShuttingDown() { - this->WriteStream->data = nullptr; - this->ReadStream->data = nullptr; - + if (this->WriteStream) { + this->WriteStream->data = nullptr; + } + if (this->ReadStream) { + this->ReadStream->data = nullptr; + } this->ReadStream = nullptr; this->WriteStream = nullptr; return true; diff --git a/Source/cmConnection.h b/Source/cmConnection.h index f9d50de..ddb7744 100644 --- a/Source/cmConnection.h +++ b/Source/cmConnection.h @@ -39,6 +39,17 @@ public: virtual std::string BufferMessage(std::string& rawBuffer) = 0; /*** + * Called to properly buffer an outgoing message. + * + * @param rawBuffer Message to format in the correct way + * + * @return Formatted message + */ + virtual std::string BufferOutMessage(const std::string& rawBuffer) const + { + return rawBuffer; + }; + /*** * Resets the internal state of the buffering */ virtual void clear(); @@ -100,7 +111,12 @@ public: uv_stream_t* WriteStream = nullptr; static void on_close(uv_handle_t* handle); - static void on_close_delete(uv_handle_t* handle); + + template <typename T> + static void on_close_delete(uv_handle_t* handle) + { + delete reinterpret_cast<T*>(handle); + } protected: std::string RawReadBuffer; diff --git a/Source/cmPipeConnection.cxx b/Source/cmPipeConnection.cxx index b18a1d6..9e565f6 100644 --- a/Source/cmPipeConnection.cxx +++ b/Source/cmPipeConnection.cxx @@ -19,7 +19,8 @@ void cmPipeConnection::Connect(uv_stream_t* server) uv_pipe_init(this->Server->GetLoop(), rejectPipe, 0); uv_accept(server, reinterpret_cast<uv_stream_t*>(rejectPipe)); - uv_close(reinterpret_cast<uv_handle_t*>(rejectPipe), &on_close_delete); + uv_close(reinterpret_cast<uv_handle_t*>(rejectPipe), + &on_close_delete<uv_pipe_t>); return; } @@ -28,7 +29,8 @@ void cmPipeConnection::Connect(uv_stream_t* server) this->ClientPipe->data = static_cast<cmEventBasedConnection*>(this); auto client = reinterpret_cast<uv_stream_t*>(this->ClientPipe); if (uv_accept(server, client) != 0) { - uv_close(reinterpret_cast<uv_handle_t*>(client), &on_close_delete); + uv_close(reinterpret_cast<uv_handle_t*>(client), + &on_close_delete<uv_pipe_t>); this->ClientPipe = nullptr; return; } @@ -65,15 +67,16 @@ bool cmPipeConnection::OnConnectionShuttingDown() { if (this->ClientPipe) { uv_close(reinterpret_cast<uv_handle_t*>(this->ClientPipe), - &on_close_delete); + &on_close_delete<uv_pipe_t>); this->WriteStream->data = nullptr; } - uv_close(reinterpret_cast<uv_handle_t*>(this->ServerPipe), &on_close_delete); + uv_close(reinterpret_cast<uv_handle_t*>(this->ServerPipe), + &on_close_delete<uv_pipe_t>); this->ClientPipe = nullptr; this->ServerPipe = nullptr; this->WriteStream = nullptr; this->ReadStream = nullptr; - return cmConnection::OnConnectionShuttingDown(); + return cmEventBasedConnection::OnConnectionShuttingDown(); } diff --git a/Source/cmServer.cxx b/Source/cmServer.cxx index c3e6811..30d0f51 100644 --- a/Source/cmServer.cxx +++ b/Source/cmServer.cxx @@ -16,6 +16,7 @@ #include <algorithm> #include <cassert> #include <cstdint> +#include <iostream> #include <memory> #include <utility> @@ -57,10 +58,6 @@ cmServer::cmServer(cmConnection* conn, bool supportExperimental) cmServer::~cmServer() { - if (!this->Protocol) { // Server was never fully started! - return; - } - for (cmServerProtocol* p : this->SupportedProtocols) { delete p; } @@ -110,6 +107,7 @@ void cmServer::ProcessRequest(cmConnection* connection, void cmServer::RegisterProtocol(cmServerProtocol* protocol) { if (protocol->IsExperimental() && !this->SupportExperimental) { + delete protocol; return; } auto version = protocol->ProtocolVersion(); @@ -247,9 +245,11 @@ cmFileMonitor* cmServer::FileMonitor() const void cmServer::WriteJsonObject(const Json::Value& jsonValue, const DebugInfo* debug) const { + uv_rwlock_rdlock(&ConnectionsMutex); for (auto& connection : this->Connections) { WriteJsonObject(connection.get(), jsonValue, debug); } + uv_rwlock_rdunlock(&ConnectionsMutex); } void cmServer::WriteJsonObject(cmConnection* connection, @@ -285,8 +285,7 @@ void cmServer::WriteJsonObject(cmConnection* connection, } } - connection->WriteData(std::string("\n") + kSTART_MAGIC + std::string("\n") + - result + kEND_MAGIC + std::string("\n")); + connection->WriteData(result); } cmServerProtocol* cmServer::FindMatchingProtocol( @@ -413,18 +412,36 @@ static void __start_thread(void* arg) { auto server = reinterpret_cast<cmServerBase*>(arg); std::string error; - server->Serve(&error); + bool success = server->Serve(&error); + if (!success || error.empty() == false) { + std::cerr << "Error during serve: " << error << std::endl; + } +} + +static void __shutdownThread(uv_async_t* arg) +{ + auto server = reinterpret_cast<cmServerBase*>(arg->data); + on_walk_to_shutdown(reinterpret_cast<uv_handle_t*>(arg), nullptr); + server->StartShutDown(); } bool cmServerBase::StartServeThread() { ServeThreadRunning = true; + uv_async_init(&Loop, &this->ShutdownSignal, __shutdownThread); + this->ShutdownSignal.data = this; uv_thread_create(&ServeThread, __start_thread, this); return true; } bool cmServerBase::Serve(std::string* errorMessage) { +#ifndef NDEBUG + uv_thread_t blank_thread_t = {}; + assert(uv_thread_equal(&blank_thread_t, &ServeThreadId)); + ServeThreadId = uv_thread_self(); +#endif + errorMessage->clear(); uv_signal_init(&Loop, &this->SIGINTHandler); @@ -438,15 +455,24 @@ bool cmServerBase::Serve(std::string* errorMessage) OnServeStart(); - for (auto& connection : Connections) { - if (!connection->OnServeStart(errorMessage)) { - return false; + { + uv_rwlock_rdlock(&ConnectionsMutex); + for (auto& connection : Connections) { + if (!connection->OnServeStart(errorMessage)) { + uv_rwlock_rdunlock(&ConnectionsMutex); + return false; + } } + uv_rwlock_rdunlock(&ConnectionsMutex); } if (uv_run(&Loop, UV_RUN_DEFAULT) != 0) { + // It is important we don't ever let the event loop exit with open handles + // at best this is a memory leak, but it can also introduce race conditions + // which can hang the program. + assert(false && "Event loop stopped in unclean state."); + *errorMessage = "Internal Error: Event loop stopped in unclean state."; - StartShutDown(); return false; } @@ -458,14 +484,8 @@ void cmServerBase::OnConnected(cmConnection*) { } -void cmServerBase::OnDisconnect() -{ -} - void cmServerBase::OnServeStart() { - uv_signal_start(&this->SIGINTHandler, &on_signal, SIGINT); - uv_signal_start(&this->SIGHUPHandler, &on_signal, SIGHUP); } void cmServerBase::StartShutDown() @@ -480,16 +500,16 @@ void cmServerBase::StartShutDown() uv_signal_stop(&this->SIGHUPHandler); } - for (auto& connection : Connections) { - connection->OnConnectionShuttingDown(); + { + uv_rwlock_wrlock(&ConnectionsMutex); + for (auto& connection : Connections) { + connection->OnConnectionShuttingDown(); + } + Connections.clear(); + uv_rwlock_wrunlock(&ConnectionsMutex); } - Connections.clear(); - - uv_stop(&Loop); uv_walk(&Loop, on_walk_to_shutdown, nullptr); - - uv_run(&Loop, UV_RUN_DEFAULT); } bool cmServerBase::OnSignal(int signum) @@ -501,13 +521,12 @@ bool cmServerBase::OnSignal(int signum) cmServerBase::cmServerBase(cmConnection* connection) { - uv_loop_init(&Loop); + auto err = uv_loop_init(&Loop); + (void)err; + assert(err == 0); - uv_signal_init(&Loop, &this->SIGINTHandler); - uv_signal_init(&Loop, &this->SIGHUPHandler); - - this->SIGINTHandler.data = this; - this->SIGHUPHandler.data = this; + err = uv_rwlock_init(&ConnectionsMutex); + assert(err == 0); AddNewConnection(connection); } @@ -516,16 +535,19 @@ cmServerBase::~cmServerBase() { if (ServeThreadRunning) { - StartShutDown(); + uv_async_send(&this->ShutdownSignal); uv_thread_join(&ServeThread); } uv_loop_close(&Loop); + uv_rwlock_destroy(&ConnectionsMutex); } void cmServerBase::AddNewConnection(cmConnection* ownedConnection) { + uv_rwlock_wrlock(&ConnectionsMutex); Connections.emplace_back(ownedConnection); + uv_rwlock_wrunlock(&ConnectionsMutex); ownedConnection->SetServer(this); } @@ -539,9 +561,11 @@ void cmServerBase::OnDisconnect(cmConnection* pConnection) auto pred = [pConnection](const std::unique_ptr<cmConnection>& m) { return m.get() == pConnection; }; + uv_rwlock_wrlock(&ConnectionsMutex); Connections.erase( std::remove_if(Connections.begin(), Connections.end(), pred), Connections.end()); + uv_rwlock_wrunlock(&ConnectionsMutex); if (Connections.empty()) { StartShutDown(); } diff --git a/Source/cmServer.h b/Source/cmServer.h index 9d8473d..15fd2ba 100644 --- a/Source/cmServer.h +++ b/Source/cmServer.h @@ -37,13 +37,12 @@ public: * This should almost always be called by the given connections * directly. * - * @param connection The connectiont the request was received on + * @param connection The connection the request was received on * @param request The actual request */ virtual void ProcessRequest(cmConnection* connection, const std::string& request) = 0; virtual void OnConnected(cmConnection* connection); - virtual void OnDisconnect(); /*** * Start a dedicated thread. If this is used to start the server, it will @@ -62,10 +61,21 @@ public: void OnDisconnect(cmConnection* pConnection); protected: + mutable uv_rwlock_t ConnectionsMutex; std::vector<std::unique_ptr<cmConnection>> Connections; bool ServeThreadRunning = false; uv_thread_t ServeThread; + uv_async_t ShutdownSignal; +#ifndef NDEBUG +public: + // When the server starts it will mark down it's current thread ID, + // which is useful in other contexts to just assert that operations + // are performed on that same thread. + uv_thread_t ServeThreadId = {}; + +protected: +#endif uv_loop_t Loop; @@ -140,22 +150,6 @@ private: cmServerProtocol* Protocol = nullptr; std::vector<cmServerProtocol*> SupportedProtocols; - std::string DataBuffer; - std::string JsonData; - - typedef union - { - uv_tty_t tty; - uv_pipe_t pipe; - } InOutUnion; - - InOutUnion Input; - InOutUnion Output; - uv_stream_t* InputStream = nullptr; - uv_stream_t* OutputStream = nullptr; - - mutable bool Writing = false; - friend class cmServerProtocol; friend class cmServerRequest; }; diff --git a/Source/cmServerConnection.cxx b/Source/cmServerConnection.cxx index 4891131..44af75f 100644 --- a/Source/cmServerConnection.cxx +++ b/Source/cmServerConnection.cxx @@ -2,76 +2,139 @@ file Copyright.txt or https://cmake.org/licensing for details. */ #include "cmServerConnection.h" +#include "cmConfigure.h" #include "cmServer.h" #include "cmServerDictionary.h" +#ifdef _WIN32 +#include "io.h" +#else +#include <unistd.h> +#endif +#include <cassert> cmStdIoConnection::cmStdIoConnection( cmConnectionBufferStrategy* bufferStrategy) : cmEventBasedConnection(bufferStrategy) - , Input() - , Output() { } +void cmStdIoConnection::SetupStream(uv_stream_t*& stream, int file_id) +{ + assert(stream == nullptr); + switch (uv_guess_handle(file_id)) { + case UV_TTY: { + auto tty = new uv_tty_t(); + uv_tty_init(this->Server->GetLoop(), tty, file_id, file_id == 0); + uv_tty_set_mode(tty, UV_TTY_MODE_NORMAL); + stream = reinterpret_cast<uv_stream_t*>(tty); + break; + } + case UV_FILE: + if (file_id == 0) { + return; + } + // Intentional fallthrough; stdin can _not_ be treated as a named + // pipe, however stdout can be. + CM_FALLTHROUGH; + case UV_NAMED_PIPE: { + auto pipe = new uv_pipe_t(); + uv_pipe_init(this->Server->GetLoop(), pipe, 0); + uv_pipe_open(pipe, file_id); + stream = reinterpret_cast<uv_stream_t*>(pipe); + break; + } + default: + assert(false && "Unable to determine stream type"); + return; + } + stream->data = static_cast<cmEventBasedConnection*>(this); +} + void cmStdIoConnection::SetServer(cmServerBase* s) { cmConnection::SetServer(s); + if (!s) { + return; + } - if (uv_guess_handle(1) == UV_TTY) { - usesTty = true; - - this->Input.tty = new uv_tty_t(); - uv_tty_init(this->Server->GetLoop(), this->Input.tty, 0, 1); - uv_tty_set_mode(this->Input.tty, UV_TTY_MODE_NORMAL); - this->Input.tty->data = static_cast<cmEventBasedConnection*>(this); - this->ReadStream = reinterpret_cast<uv_stream_t*>(this->Input.tty); - - this->Output.tty = new uv_tty_t(); - uv_tty_init(this->Server->GetLoop(), this->Output.tty, 1, 0); - uv_tty_set_mode(this->Output.tty, UV_TTY_MODE_NORMAL); - this->Output.tty->data = static_cast<cmEventBasedConnection*>(this); - this->WriteStream = reinterpret_cast<uv_stream_t*>(this->Output.tty); - } else { - usesTty = false; - - this->Input.pipe = new uv_pipe_t(); - uv_pipe_init(this->Server->GetLoop(), this->Input.pipe, 0); - uv_pipe_open(this->Input.pipe, 0); - this->Input.pipe->data = static_cast<cmEventBasedConnection*>(this); - this->ReadStream = reinterpret_cast<uv_stream_t*>(this->Input.pipe); - - this->Output.pipe = new uv_pipe_t(); - uv_pipe_init(this->Server->GetLoop(), this->Output.pipe, 0); - uv_pipe_open(this->Output.pipe, 1); - this->Output.pipe->data = static_cast<cmEventBasedConnection*>(this); - this->WriteStream = reinterpret_cast<uv_stream_t*>(this->Output.pipe); + SetupStream(this->ReadStream, 0); + SetupStream(this->WriteStream, 1); +} + +void shutdown_connection(uv_prepare_t* prepare) +{ + cmStdIoConnection* connection = + reinterpret_cast<cmStdIoConnection*>(prepare->data); + + if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(prepare))) { + uv_close(reinterpret_cast<uv_handle_t*>(prepare), + &cmEventBasedConnection::on_close_delete<uv_prepare_t>); } + connection->OnDisconnect(0); } bool cmStdIoConnection::OnServeStart(std::string* pString) { - uv_read_start(this->ReadStream, on_alloc_buffer, on_read); Server->OnConnected(this); + if (this->ReadStream) { + uv_read_start(this->ReadStream, on_alloc_buffer, on_read); + } else if (uv_guess_handle(0) == UV_FILE) { + char buffer[1024]; + while (auto len = read(0, buffer, sizeof(buffer))) { + ReadData(std::string(buffer, buffer + len)); + } + + // We can't start the disconnect from here, add a prepare hook to do that + // for us + auto prepare = new uv_prepare_t(); + prepare->data = this; + uv_prepare_init(Server->GetLoop(), prepare); + uv_prepare_start(prepare, shutdown_connection); + } return cmConnection::OnServeStart(pString); } -bool cmStdIoConnection::OnConnectionShuttingDown() +void cmStdIoConnection::ShutdownStream(uv_stream_t*& stream) { - cmEventBasedConnection::OnConnectionShuttingDown(); + if (!stream) { + return; + } + switch (stream->type) { + case UV_TTY: { + assert(!uv_is_closing(reinterpret_cast<uv_handle_t*>(stream))); + if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(stream))) { + uv_close(reinterpret_cast<uv_handle_t*>(stream), + &on_close_delete<uv_tty_t>); + } + break; + } + case UV_FILE: + case UV_NAMED_PIPE: { + assert(!uv_is_closing(reinterpret_cast<uv_handle_t*>(stream))); + if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(stream))) { + uv_close(reinterpret_cast<uv_handle_t*>(stream), + &on_close_delete<uv_pipe_t>); + } + break; + } + default: + assert(false && "Unable to determine stream type"); + } + + stream = nullptr; +} - if (usesTty) { - uv_read_stop(reinterpret_cast<uv_stream_t*>(this->Input.tty)); - uv_close(reinterpret_cast<uv_handle_t*>(this->Input.tty), - &on_close_delete); - uv_close(reinterpret_cast<uv_handle_t*>(this->Output.tty), - &on_close_delete); - } else { - uv_close(reinterpret_cast<uv_handle_t*>(this->Input.pipe), - &on_close_delete); - uv_close(reinterpret_cast<uv_handle_t*>(this->Output.pipe), - &on_close_delete); +bool cmStdIoConnection::OnConnectionShuttingDown() +{ + if (ReadStream) { + uv_read_stop(ReadStream); } + ShutdownStream(ReadStream); + ShutdownStream(WriteStream); + + cmEventBasedConnection::OnConnectionShuttingDown(); + return true; } @@ -93,6 +156,13 @@ void cmConnectionBufferStrategy::clear() { } +std::string cmServerBufferStrategy::BufferOutMessage( + const std::string& rawBuffer) const +{ + return std::string("\n") + kSTART_MAGIC + std::string("\n") + rawBuffer + + kEND_MAGIC + std::string("\n"); +} + std::string cmServerBufferStrategy::BufferMessage(std::string& RawReadBuffer) { for (;;) { diff --git a/Source/cmServerConnection.h b/Source/cmServerConnection.h index df404ce..4ca908d 100644 --- a/Source/cmServerConnection.h +++ b/Source/cmServerConnection.h @@ -25,6 +25,7 @@ class cmServerBufferStrategy : public cmConnectionBufferStrategy { public: std::string BufferMessage(std::string& rawBuffer) override; + std::string BufferOutMessage(const std::string& rawBuffer) const override; private: std::string RequestBuffer; @@ -45,16 +46,8 @@ public: bool OnServeStart(std::string* pString) override; private: - typedef union - { - uv_tty_t* tty; - uv_pipe_t* pipe; - } InOutUnion; - - bool usesTty = false; - - InOutUnion Input; - InOutUnion Output; + void SetupStream(uv_stream_t*& stream, int file_id); + void ShutdownStream(uv_stream_t*& stream); }; /*** |