summaryrefslogtreecommitdiffstats
path: root/Source
diff options
context:
space:
mode:
Diffstat (limited to 'Source')
-rw-r--r--Source/cmConnection.cxx31
-rw-r--r--Source/cmConnection.h18
-rw-r--r--Source/cmPipeConnection.cxx13
-rw-r--r--Source/cmServer.cxx86
-rw-r--r--Source/cmServer.h30
-rw-r--r--Source/cmServerConnection.cxx158
-rw-r--r--Source/cmServerConnection.h13
7 files changed, 230 insertions, 119 deletions
diff --git a/Source/cmConnection.cxx b/Source/cmConnection.cxx
index f3fc1ef..f482412 100644
--- a/Source/cmConnection.cxx
+++ b/Source/cmConnection.cxx
@@ -38,11 +38,6 @@ void cmEventBasedConnection::on_read(uv_stream_t* stream, ssize_t nread,
delete[](buf->base);
}
-void cmEventBasedConnection::on_close_delete(uv_handle_t* handle)
-{
- delete handle;
-}
-
void cmEventBasedConnection::on_close(uv_handle_t* /*handle*/)
{
}
@@ -72,9 +67,19 @@ bool cmEventBasedConnection::IsOpen() const
return this->WriteStream != nullptr;
}
-void cmEventBasedConnection::WriteData(const std::string& data)
+void cmEventBasedConnection::WriteData(const std::string& _data)
{
+#ifndef NDEBUG
+ auto curr_thread_id = uv_thread_self();
+ assert(this->Server);
+ assert(uv_thread_equal(&curr_thread_id, &this->Server->ServeThreadId));
+#endif
+
+ auto data = _data;
assert(this->WriteStream);
+ if (BufferStrategy) {
+ data = BufferStrategy->BufferOutMessage(data);
+ }
auto ds = data.size();
@@ -119,7 +124,9 @@ void cmEventBasedConnection::OnDisconnect(int onerror)
{
(void)onerror;
this->OnConnectionShuttingDown();
- this->Server->OnDisconnect(this);
+ if (this->Server) {
+ this->Server->OnDisconnect(this);
+ }
}
cmConnection::~cmConnection()
@@ -128,6 +135,7 @@ cmConnection::~cmConnection()
bool cmConnection::OnConnectionShuttingDown()
{
+ this->Server = nullptr;
return true;
}
@@ -149,9 +157,12 @@ bool cmConnection::OnServeStart(std::string* errString)
bool cmEventBasedConnection::OnConnectionShuttingDown()
{
- this->WriteStream->data = nullptr;
- this->ReadStream->data = nullptr;
-
+ if (this->WriteStream) {
+ this->WriteStream->data = nullptr;
+ }
+ if (this->ReadStream) {
+ this->ReadStream->data = nullptr;
+ }
this->ReadStream = nullptr;
this->WriteStream = nullptr;
return true;
diff --git a/Source/cmConnection.h b/Source/cmConnection.h
index f9d50de..ddb7744 100644
--- a/Source/cmConnection.h
+++ b/Source/cmConnection.h
@@ -39,6 +39,17 @@ public:
virtual std::string BufferMessage(std::string& rawBuffer) = 0;
/***
+ * Called to properly buffer an outgoing message.
+ *
+ * @param rawBuffer Message to format in the correct way
+ *
+ * @return Formatted message
+ */
+ virtual std::string BufferOutMessage(const std::string& rawBuffer) const
+ {
+ return rawBuffer;
+ };
+ /***
* Resets the internal state of the buffering
*/
virtual void clear();
@@ -100,7 +111,12 @@ public:
uv_stream_t* WriteStream = nullptr;
static void on_close(uv_handle_t* handle);
- static void on_close_delete(uv_handle_t* handle);
+
+ template <typename T>
+ static void on_close_delete(uv_handle_t* handle)
+ {
+ delete reinterpret_cast<T*>(handle);
+ }
protected:
std::string RawReadBuffer;
diff --git a/Source/cmPipeConnection.cxx b/Source/cmPipeConnection.cxx
index b18a1d6..9e565f6 100644
--- a/Source/cmPipeConnection.cxx
+++ b/Source/cmPipeConnection.cxx
@@ -19,7 +19,8 @@ void cmPipeConnection::Connect(uv_stream_t* server)
uv_pipe_init(this->Server->GetLoop(), rejectPipe, 0);
uv_accept(server, reinterpret_cast<uv_stream_t*>(rejectPipe));
- uv_close(reinterpret_cast<uv_handle_t*>(rejectPipe), &on_close_delete);
+ uv_close(reinterpret_cast<uv_handle_t*>(rejectPipe),
+ &on_close_delete<uv_pipe_t>);
return;
}
@@ -28,7 +29,8 @@ void cmPipeConnection::Connect(uv_stream_t* server)
this->ClientPipe->data = static_cast<cmEventBasedConnection*>(this);
auto client = reinterpret_cast<uv_stream_t*>(this->ClientPipe);
if (uv_accept(server, client) != 0) {
- uv_close(reinterpret_cast<uv_handle_t*>(client), &on_close_delete);
+ uv_close(reinterpret_cast<uv_handle_t*>(client),
+ &on_close_delete<uv_pipe_t>);
this->ClientPipe = nullptr;
return;
}
@@ -65,15 +67,16 @@ bool cmPipeConnection::OnConnectionShuttingDown()
{
if (this->ClientPipe) {
uv_close(reinterpret_cast<uv_handle_t*>(this->ClientPipe),
- &on_close_delete);
+ &on_close_delete<uv_pipe_t>);
this->WriteStream->data = nullptr;
}
- uv_close(reinterpret_cast<uv_handle_t*>(this->ServerPipe), &on_close_delete);
+ uv_close(reinterpret_cast<uv_handle_t*>(this->ServerPipe),
+ &on_close_delete<uv_pipe_t>);
this->ClientPipe = nullptr;
this->ServerPipe = nullptr;
this->WriteStream = nullptr;
this->ReadStream = nullptr;
- return cmConnection::OnConnectionShuttingDown();
+ return cmEventBasedConnection::OnConnectionShuttingDown();
}
diff --git a/Source/cmServer.cxx b/Source/cmServer.cxx
index c3e6811..30d0f51 100644
--- a/Source/cmServer.cxx
+++ b/Source/cmServer.cxx
@@ -16,6 +16,7 @@
#include <algorithm>
#include <cassert>
#include <cstdint>
+#include <iostream>
#include <memory>
#include <utility>
@@ -57,10 +58,6 @@ cmServer::cmServer(cmConnection* conn, bool supportExperimental)
cmServer::~cmServer()
{
- if (!this->Protocol) { // Server was never fully started!
- return;
- }
-
for (cmServerProtocol* p : this->SupportedProtocols) {
delete p;
}
@@ -110,6 +107,7 @@ void cmServer::ProcessRequest(cmConnection* connection,
void cmServer::RegisterProtocol(cmServerProtocol* protocol)
{
if (protocol->IsExperimental() && !this->SupportExperimental) {
+ delete protocol;
return;
}
auto version = protocol->ProtocolVersion();
@@ -247,9 +245,11 @@ cmFileMonitor* cmServer::FileMonitor() const
void cmServer::WriteJsonObject(const Json::Value& jsonValue,
const DebugInfo* debug) const
{
+ uv_rwlock_rdlock(&ConnectionsMutex);
for (auto& connection : this->Connections) {
WriteJsonObject(connection.get(), jsonValue, debug);
}
+ uv_rwlock_rdunlock(&ConnectionsMutex);
}
void cmServer::WriteJsonObject(cmConnection* connection,
@@ -285,8 +285,7 @@ void cmServer::WriteJsonObject(cmConnection* connection,
}
}
- connection->WriteData(std::string("\n") + kSTART_MAGIC + std::string("\n") +
- result + kEND_MAGIC + std::string("\n"));
+ connection->WriteData(result);
}
cmServerProtocol* cmServer::FindMatchingProtocol(
@@ -413,18 +412,36 @@ static void __start_thread(void* arg)
{
auto server = reinterpret_cast<cmServerBase*>(arg);
std::string error;
- server->Serve(&error);
+ bool success = server->Serve(&error);
+ if (!success || error.empty() == false) {
+ std::cerr << "Error during serve: " << error << std::endl;
+ }
+}
+
+static void __shutdownThread(uv_async_t* arg)
+{
+ auto server = reinterpret_cast<cmServerBase*>(arg->data);
+ on_walk_to_shutdown(reinterpret_cast<uv_handle_t*>(arg), nullptr);
+ server->StartShutDown();
}
bool cmServerBase::StartServeThread()
{
ServeThreadRunning = true;
+ uv_async_init(&Loop, &this->ShutdownSignal, __shutdownThread);
+ this->ShutdownSignal.data = this;
uv_thread_create(&ServeThread, __start_thread, this);
return true;
}
bool cmServerBase::Serve(std::string* errorMessage)
{
+#ifndef NDEBUG
+ uv_thread_t blank_thread_t = {};
+ assert(uv_thread_equal(&blank_thread_t, &ServeThreadId));
+ ServeThreadId = uv_thread_self();
+#endif
+
errorMessage->clear();
uv_signal_init(&Loop, &this->SIGINTHandler);
@@ -438,15 +455,24 @@ bool cmServerBase::Serve(std::string* errorMessage)
OnServeStart();
- for (auto& connection : Connections) {
- if (!connection->OnServeStart(errorMessage)) {
- return false;
+ {
+ uv_rwlock_rdlock(&ConnectionsMutex);
+ for (auto& connection : Connections) {
+ if (!connection->OnServeStart(errorMessage)) {
+ uv_rwlock_rdunlock(&ConnectionsMutex);
+ return false;
+ }
}
+ uv_rwlock_rdunlock(&ConnectionsMutex);
}
if (uv_run(&Loop, UV_RUN_DEFAULT) != 0) {
+ // It is important we don't ever let the event loop exit with open handles
+ // at best this is a memory leak, but it can also introduce race conditions
+ // which can hang the program.
+ assert(false && "Event loop stopped in unclean state.");
+
*errorMessage = "Internal Error: Event loop stopped in unclean state.";
- StartShutDown();
return false;
}
@@ -458,14 +484,8 @@ void cmServerBase::OnConnected(cmConnection*)
{
}
-void cmServerBase::OnDisconnect()
-{
-}
-
void cmServerBase::OnServeStart()
{
- uv_signal_start(&this->SIGINTHandler, &on_signal, SIGINT);
- uv_signal_start(&this->SIGHUPHandler, &on_signal, SIGHUP);
}
void cmServerBase::StartShutDown()
@@ -480,16 +500,16 @@ void cmServerBase::StartShutDown()
uv_signal_stop(&this->SIGHUPHandler);
}
- for (auto& connection : Connections) {
- connection->OnConnectionShuttingDown();
+ {
+ uv_rwlock_wrlock(&ConnectionsMutex);
+ for (auto& connection : Connections) {
+ connection->OnConnectionShuttingDown();
+ }
+ Connections.clear();
+ uv_rwlock_wrunlock(&ConnectionsMutex);
}
- Connections.clear();
-
- uv_stop(&Loop);
uv_walk(&Loop, on_walk_to_shutdown, nullptr);
-
- uv_run(&Loop, UV_RUN_DEFAULT);
}
bool cmServerBase::OnSignal(int signum)
@@ -501,13 +521,12 @@ bool cmServerBase::OnSignal(int signum)
cmServerBase::cmServerBase(cmConnection* connection)
{
- uv_loop_init(&Loop);
+ auto err = uv_loop_init(&Loop);
+ (void)err;
+ assert(err == 0);
- uv_signal_init(&Loop, &this->SIGINTHandler);
- uv_signal_init(&Loop, &this->SIGHUPHandler);
-
- this->SIGINTHandler.data = this;
- this->SIGHUPHandler.data = this;
+ err = uv_rwlock_init(&ConnectionsMutex);
+ assert(err == 0);
AddNewConnection(connection);
}
@@ -516,16 +535,19 @@ cmServerBase::~cmServerBase()
{
if (ServeThreadRunning) {
- StartShutDown();
+ uv_async_send(&this->ShutdownSignal);
uv_thread_join(&ServeThread);
}
uv_loop_close(&Loop);
+ uv_rwlock_destroy(&ConnectionsMutex);
}
void cmServerBase::AddNewConnection(cmConnection* ownedConnection)
{
+ uv_rwlock_wrlock(&ConnectionsMutex);
Connections.emplace_back(ownedConnection);
+ uv_rwlock_wrunlock(&ConnectionsMutex);
ownedConnection->SetServer(this);
}
@@ -539,9 +561,11 @@ void cmServerBase::OnDisconnect(cmConnection* pConnection)
auto pred = [pConnection](const std::unique_ptr<cmConnection>& m) {
return m.get() == pConnection;
};
+ uv_rwlock_wrlock(&ConnectionsMutex);
Connections.erase(
std::remove_if(Connections.begin(), Connections.end(), pred),
Connections.end());
+ uv_rwlock_wrunlock(&ConnectionsMutex);
if (Connections.empty()) {
StartShutDown();
}
diff --git a/Source/cmServer.h b/Source/cmServer.h
index 9d8473d..15fd2ba 100644
--- a/Source/cmServer.h
+++ b/Source/cmServer.h
@@ -37,13 +37,12 @@ public:
* This should almost always be called by the given connections
* directly.
*
- * @param connection The connectiont the request was received on
+ * @param connection The connection the request was received on
* @param request The actual request
*/
virtual void ProcessRequest(cmConnection* connection,
const std::string& request) = 0;
virtual void OnConnected(cmConnection* connection);
- virtual void OnDisconnect();
/***
* Start a dedicated thread. If this is used to start the server, it will
@@ -62,10 +61,21 @@ public:
void OnDisconnect(cmConnection* pConnection);
protected:
+ mutable uv_rwlock_t ConnectionsMutex;
std::vector<std::unique_ptr<cmConnection>> Connections;
bool ServeThreadRunning = false;
uv_thread_t ServeThread;
+ uv_async_t ShutdownSignal;
+#ifndef NDEBUG
+public:
+ // When the server starts it will mark down it's current thread ID,
+ // which is useful in other contexts to just assert that operations
+ // are performed on that same thread.
+ uv_thread_t ServeThreadId = {};
+
+protected:
+#endif
uv_loop_t Loop;
@@ -140,22 +150,6 @@ private:
cmServerProtocol* Protocol = nullptr;
std::vector<cmServerProtocol*> SupportedProtocols;
- std::string DataBuffer;
- std::string JsonData;
-
- typedef union
- {
- uv_tty_t tty;
- uv_pipe_t pipe;
- } InOutUnion;
-
- InOutUnion Input;
- InOutUnion Output;
- uv_stream_t* InputStream = nullptr;
- uv_stream_t* OutputStream = nullptr;
-
- mutable bool Writing = false;
-
friend class cmServerProtocol;
friend class cmServerRequest;
};
diff --git a/Source/cmServerConnection.cxx b/Source/cmServerConnection.cxx
index 4891131..44af75f 100644
--- a/Source/cmServerConnection.cxx
+++ b/Source/cmServerConnection.cxx
@@ -2,76 +2,139 @@
file Copyright.txt or https://cmake.org/licensing for details. */
#include "cmServerConnection.h"
+#include "cmConfigure.h"
#include "cmServer.h"
#include "cmServerDictionary.h"
+#ifdef _WIN32
+#include "io.h"
+#else
+#include <unistd.h>
+#endif
+#include <cassert>
cmStdIoConnection::cmStdIoConnection(
cmConnectionBufferStrategy* bufferStrategy)
: cmEventBasedConnection(bufferStrategy)
- , Input()
- , Output()
{
}
+void cmStdIoConnection::SetupStream(uv_stream_t*& stream, int file_id)
+{
+ assert(stream == nullptr);
+ switch (uv_guess_handle(file_id)) {
+ case UV_TTY: {
+ auto tty = new uv_tty_t();
+ uv_tty_init(this->Server->GetLoop(), tty, file_id, file_id == 0);
+ uv_tty_set_mode(tty, UV_TTY_MODE_NORMAL);
+ stream = reinterpret_cast<uv_stream_t*>(tty);
+ break;
+ }
+ case UV_FILE:
+ if (file_id == 0) {
+ return;
+ }
+ // Intentional fallthrough; stdin can _not_ be treated as a named
+ // pipe, however stdout can be.
+ CM_FALLTHROUGH;
+ case UV_NAMED_PIPE: {
+ auto pipe = new uv_pipe_t();
+ uv_pipe_init(this->Server->GetLoop(), pipe, 0);
+ uv_pipe_open(pipe, file_id);
+ stream = reinterpret_cast<uv_stream_t*>(pipe);
+ break;
+ }
+ default:
+ assert(false && "Unable to determine stream type");
+ return;
+ }
+ stream->data = static_cast<cmEventBasedConnection*>(this);
+}
+
void cmStdIoConnection::SetServer(cmServerBase* s)
{
cmConnection::SetServer(s);
+ if (!s) {
+ return;
+ }
- if (uv_guess_handle(1) == UV_TTY) {
- usesTty = true;
-
- this->Input.tty = new uv_tty_t();
- uv_tty_init(this->Server->GetLoop(), this->Input.tty, 0, 1);
- uv_tty_set_mode(this->Input.tty, UV_TTY_MODE_NORMAL);
- this->Input.tty->data = static_cast<cmEventBasedConnection*>(this);
- this->ReadStream = reinterpret_cast<uv_stream_t*>(this->Input.tty);
-
- this->Output.tty = new uv_tty_t();
- uv_tty_init(this->Server->GetLoop(), this->Output.tty, 1, 0);
- uv_tty_set_mode(this->Output.tty, UV_TTY_MODE_NORMAL);
- this->Output.tty->data = static_cast<cmEventBasedConnection*>(this);
- this->WriteStream = reinterpret_cast<uv_stream_t*>(this->Output.tty);
- } else {
- usesTty = false;
-
- this->Input.pipe = new uv_pipe_t();
- uv_pipe_init(this->Server->GetLoop(), this->Input.pipe, 0);
- uv_pipe_open(this->Input.pipe, 0);
- this->Input.pipe->data = static_cast<cmEventBasedConnection*>(this);
- this->ReadStream = reinterpret_cast<uv_stream_t*>(this->Input.pipe);
-
- this->Output.pipe = new uv_pipe_t();
- uv_pipe_init(this->Server->GetLoop(), this->Output.pipe, 0);
- uv_pipe_open(this->Output.pipe, 1);
- this->Output.pipe->data = static_cast<cmEventBasedConnection*>(this);
- this->WriteStream = reinterpret_cast<uv_stream_t*>(this->Output.pipe);
+ SetupStream(this->ReadStream, 0);
+ SetupStream(this->WriteStream, 1);
+}
+
+void shutdown_connection(uv_prepare_t* prepare)
+{
+ cmStdIoConnection* connection =
+ reinterpret_cast<cmStdIoConnection*>(prepare->data);
+
+ if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(prepare))) {
+ uv_close(reinterpret_cast<uv_handle_t*>(prepare),
+ &cmEventBasedConnection::on_close_delete<uv_prepare_t>);
}
+ connection->OnDisconnect(0);
}
bool cmStdIoConnection::OnServeStart(std::string* pString)
{
- uv_read_start(this->ReadStream, on_alloc_buffer, on_read);
Server->OnConnected(this);
+ if (this->ReadStream) {
+ uv_read_start(this->ReadStream, on_alloc_buffer, on_read);
+ } else if (uv_guess_handle(0) == UV_FILE) {
+ char buffer[1024];
+ while (auto len = read(0, buffer, sizeof(buffer))) {
+ ReadData(std::string(buffer, buffer + len));
+ }
+
+ // We can't start the disconnect from here, add a prepare hook to do that
+ // for us
+ auto prepare = new uv_prepare_t();
+ prepare->data = this;
+ uv_prepare_init(Server->GetLoop(), prepare);
+ uv_prepare_start(prepare, shutdown_connection);
+ }
return cmConnection::OnServeStart(pString);
}
-bool cmStdIoConnection::OnConnectionShuttingDown()
+void cmStdIoConnection::ShutdownStream(uv_stream_t*& stream)
{
- cmEventBasedConnection::OnConnectionShuttingDown();
+ if (!stream) {
+ return;
+ }
+ switch (stream->type) {
+ case UV_TTY: {
+ assert(!uv_is_closing(reinterpret_cast<uv_handle_t*>(stream)));
+ if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(stream))) {
+ uv_close(reinterpret_cast<uv_handle_t*>(stream),
+ &on_close_delete<uv_tty_t>);
+ }
+ break;
+ }
+ case UV_FILE:
+ case UV_NAMED_PIPE: {
+ assert(!uv_is_closing(reinterpret_cast<uv_handle_t*>(stream)));
+ if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(stream))) {
+ uv_close(reinterpret_cast<uv_handle_t*>(stream),
+ &on_close_delete<uv_pipe_t>);
+ }
+ break;
+ }
+ default:
+ assert(false && "Unable to determine stream type");
+ }
+
+ stream = nullptr;
+}
- if (usesTty) {
- uv_read_stop(reinterpret_cast<uv_stream_t*>(this->Input.tty));
- uv_close(reinterpret_cast<uv_handle_t*>(this->Input.tty),
- &on_close_delete);
- uv_close(reinterpret_cast<uv_handle_t*>(this->Output.tty),
- &on_close_delete);
- } else {
- uv_close(reinterpret_cast<uv_handle_t*>(this->Input.pipe),
- &on_close_delete);
- uv_close(reinterpret_cast<uv_handle_t*>(this->Output.pipe),
- &on_close_delete);
+bool cmStdIoConnection::OnConnectionShuttingDown()
+{
+ if (ReadStream) {
+ uv_read_stop(ReadStream);
}
+ ShutdownStream(ReadStream);
+ ShutdownStream(WriteStream);
+
+ cmEventBasedConnection::OnConnectionShuttingDown();
+
return true;
}
@@ -93,6 +156,13 @@ void cmConnectionBufferStrategy::clear()
{
}
+std::string cmServerBufferStrategy::BufferOutMessage(
+ const std::string& rawBuffer) const
+{
+ return std::string("\n") + kSTART_MAGIC + std::string("\n") + rawBuffer +
+ kEND_MAGIC + std::string("\n");
+}
+
std::string cmServerBufferStrategy::BufferMessage(std::string& RawReadBuffer)
{
for (;;) {
diff --git a/Source/cmServerConnection.h b/Source/cmServerConnection.h
index df404ce..4ca908d 100644
--- a/Source/cmServerConnection.h
+++ b/Source/cmServerConnection.h
@@ -25,6 +25,7 @@ class cmServerBufferStrategy : public cmConnectionBufferStrategy
{
public:
std::string BufferMessage(std::string& rawBuffer) override;
+ std::string BufferOutMessage(const std::string& rawBuffer) const override;
private:
std::string RequestBuffer;
@@ -45,16 +46,8 @@ public:
bool OnServeStart(std::string* pString) override;
private:
- typedef union
- {
- uv_tty_t* tty;
- uv_pipe_t* pipe;
- } InOutUnion;
-
- bool usesTty = false;
-
- InOutUnion Input;
- InOutUnion Output;
+ void SetupStream(uv_stream_t*& stream, int file_id);
+ void ShutdownStream(uv_stream_t*& stream);
};
/***