summaryrefslogtreecommitdiffstats
path: root/src/session.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/session.cpp')
-rw-r--r--src/session.cpp516
1 files changed, 516 insertions, 0 deletions
diff --git a/src/session.cpp b/src/session.cpp
new file mode 100644
index 0000000..d88a697
--- /dev/null
+++ b/src/session.cpp
@@ -0,0 +1,516 @@
+// Copyright 2019 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "content_stream.h"
+
+#include "dap/any.h"
+#include "dap/session.h"
+
+#include "chan.h"
+#include "json_serializer.h"
+#include "socket.h"
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <atomic>
+#include <deque>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+namespace {
+
+class Impl : public dap::Session {
+ public:
+ void onError(const ErrorHandler& handler) override { handlers.put(handler); }
+
+ void registerHandler(const dap::TypeInfo* typeinfo,
+ const GenericRequestHandler& handler) override {
+ handlers.put(typeinfo, handler);
+ }
+
+ void registerHandler(const dap::TypeInfo* typeinfo,
+ const GenericEventHandler& handler) override {
+ handlers.put(typeinfo, handler);
+ }
+
+ void registerHandler(const dap::TypeInfo* typeinfo,
+ const GenericResponseSentHandler& handler) override {
+ handlers.put(typeinfo, handler);
+ }
+
+ std::function<void()> getPayload() override {
+ auto request = reader.read();
+ if (request.size() > 0) {
+ if (auto payload = processMessage(request)) {
+ return payload;
+ }
+ }
+ return {};
+ }
+
+ void connect(const std::shared_ptr<dap::Reader>& r,
+ const std::shared_ptr<dap::Writer>& w) override {
+ if (isBound.exchange(true)) {
+ handlers.error("Session::connect called twice");
+ return;
+ }
+
+ reader = dap::ContentReader(r);
+ writer = dap::ContentWriter(w);
+ }
+
+ void startProcessingMessages(
+ const ClosedHandler& onClose /* = {} */) override {
+ if (isProcessingMessages.exchange(true)) {
+ handlers.error("Session::startProcessingMessages() called twice");
+ return;
+ }
+ recvThread = std::thread([this, onClose] {
+ while (reader.isOpen()) {
+ if (auto payload = getPayload()) {
+ inbox.put(std::move(payload));
+ }
+ }
+ if (onClose) {
+ onClose();
+ }
+ });
+
+ dispatchThread = std::thread([this] {
+ while (auto payload = inbox.take()) {
+ payload.value()();
+ }
+ });
+ }
+
+ bool send(const dap::TypeInfo* requestTypeInfo,
+ const dap::TypeInfo* responseTypeInfo,
+ const void* request,
+ const GenericResponseHandler& responseHandler) override {
+ int seq = nextSeq++;
+
+ handlers.put(seq, responseTypeInfo, responseHandler);
+
+ dap::json::Serializer s;
+ if (!s.object([&](dap::FieldSerializer* fs) {
+ return fs->field("seq", dap::integer(seq)) &&
+ fs->field("type", "request") &&
+ fs->field("command", requestTypeInfo->name()) &&
+ fs->field("arguments", [&](dap::Serializer* s) {
+ return requestTypeInfo->serialize(s, request);
+ });
+ })) {
+ return false;
+ }
+ return send(s.dump());
+ }
+
+ bool send(const dap::TypeInfo* typeinfo, const void* event) override {
+ dap::json::Serializer s;
+ if (!s.object([&](dap::FieldSerializer* fs) {
+ return fs->field("seq", dap::integer(nextSeq++)) &&
+ fs->field("type", "event") &&
+ fs->field("event", typeinfo->name()) &&
+ fs->field("body", [&](dap::Serializer* s) {
+ return typeinfo->serialize(s, event);
+ });
+ })) {
+ return false;
+ }
+ return send(s.dump());
+ }
+
+ ~Impl() {
+ inbox.close();
+ reader.close();
+ writer.close();
+ if (recvThread.joinable()) {
+ recvThread.join();
+ }
+ if (dispatchThread.joinable()) {
+ dispatchThread.join();
+ }
+ }
+
+ private:
+ using Payload = std::function<void()>;
+
+ class EventHandlers {
+ public:
+ void put(const ErrorHandler& handler) {
+ std::unique_lock<std::mutex> lock(errorMutex);
+ errorHandler = handler;
+ }
+
+ void error(const char* format, ...) {
+ va_list vararg;
+ va_start(vararg, format);
+ std::unique_lock<std::mutex> lock(errorMutex);
+ errorLocked(format, vararg);
+ va_end(vararg);
+ }
+
+ std::pair<const dap::TypeInfo*, GenericRequestHandler> request(
+ const std::string& name) {
+ std::unique_lock<std::mutex> lock(requestMutex);
+ auto it = requestMap.find(name);
+ return (it != requestMap.end()) ? it->second : decltype(it->second){};
+ }
+
+ void put(const dap::TypeInfo* typeinfo,
+ const GenericRequestHandler& handler) {
+ std::unique_lock<std::mutex> lock(requestMutex);
+ auto added =
+ requestMap
+ .emplace(typeinfo->name(), std::make_pair(typeinfo, handler))
+ .second;
+ if (!added) {
+ errorfLocked("Request handler for '%s' already registered",
+ typeinfo->name().c_str());
+ }
+ }
+
+ std::pair<const dap::TypeInfo*, GenericResponseHandler> response(
+ int64_t seq) {
+ std::unique_lock<std::mutex> lock(responseMutex);
+ auto responseIt = responseMap.find(seq);
+ if (responseIt == responseMap.end()) {
+ errorfLocked("Unknown response with sequence %d", seq);
+ return {};
+ }
+ auto out = std::move(responseIt->second);
+ responseMap.erase(seq);
+ return out;
+ }
+
+ void put(int seq,
+ const dap::TypeInfo* typeinfo,
+ const GenericResponseHandler& handler) {
+ std::unique_lock<std::mutex> lock(responseMutex);
+ auto added =
+ responseMap.emplace(seq, std::make_pair(typeinfo, handler)).second;
+ if (!added) {
+ errorfLocked("Response handler for sequence %d already registered",
+ seq);
+ }
+ }
+
+ std::pair<const dap::TypeInfo*, GenericEventHandler> event(
+ const std::string& name) {
+ std::unique_lock<std::mutex> lock(eventMutex);
+ auto it = eventMap.find(name);
+ return (it != eventMap.end()) ? it->second : decltype(it->second){};
+ }
+
+ void put(const dap::TypeInfo* typeinfo,
+ const GenericEventHandler& handler) {
+ std::unique_lock<std::mutex> lock(eventMutex);
+ auto added =
+ eventMap.emplace(typeinfo->name(), std::make_pair(typeinfo, handler))
+ .second;
+ if (!added) {
+ errorfLocked("Event handler for '%s' already registered",
+ typeinfo->name().c_str());
+ }
+ }
+
+ GenericResponseSentHandler responseSent(const dap::TypeInfo* typeinfo) {
+ std::unique_lock<std::mutex> lock(responseSentMutex);
+ auto it = responseSentMap.find(typeinfo);
+ return (it != responseSentMap.end()) ? it->second
+ : decltype(it->second){};
+ }
+
+ void put(const dap::TypeInfo* typeinfo,
+ const GenericResponseSentHandler& handler) {
+ std::unique_lock<std::mutex> lock(responseSentMutex);
+ auto added = responseSentMap.emplace(typeinfo, handler).second;
+ if (!added) {
+ errorfLocked("Response sent handler for '%s' already registered",
+ typeinfo->name().c_str());
+ }
+ }
+
+ private:
+ void errorfLocked(const char* format, ...) {
+ va_list vararg;
+ va_start(vararg, format);
+ errorLocked(format, vararg);
+ va_end(vararg);
+ }
+
+ void errorLocked(const char* format, va_list args) {
+ char buf[2048];
+ vsnprintf(buf, sizeof(buf), format, args);
+ if (errorHandler) {
+ errorHandler(buf);
+ }
+ }
+
+ std::mutex errorMutex;
+ ErrorHandler errorHandler;
+
+ std::mutex requestMutex;
+ std::unordered_map<std::string,
+ std::pair<const dap::TypeInfo*, GenericRequestHandler>>
+ requestMap;
+
+ std::mutex responseMutex;
+ std::unordered_map<int64_t,
+ std::pair<const dap::TypeInfo*, GenericResponseHandler>>
+ responseMap;
+
+ std::mutex eventMutex;
+ std::unordered_map<std::string,
+ std::pair<const dap::TypeInfo*, GenericEventHandler>>
+ eventMap;
+
+ std::mutex responseSentMutex;
+ std::unordered_map<const dap::TypeInfo*, GenericResponseSentHandler>
+ responseSentMap;
+ }; // EventHandlers
+
+ Payload processMessage(const std::string& str) {
+ auto d = dap::json::Deserializer(str);
+ dap::string type;
+ if (!d.field("type", &type)) {
+ handlers.error("Message missing string 'type' field");
+ return {};
+ }
+
+ dap::integer sequence = 0;
+ if (!d.field("seq", &sequence)) {
+ handlers.error("Message missing number 'seq' field");
+ return {};
+ }
+
+ if (type == "request") {
+ return processRequest(&d, sequence);
+ } else if (type == "event") {
+ return processEvent(&d);
+ } else if (type == "response") {
+ processResponse(&d);
+ return {};
+ } else {
+ handlers.error("Unknown message type '%s'", type.c_str());
+ }
+
+ return {};
+ }
+
+ Payload processRequest(dap::json::Deserializer* d, dap::integer sequence) {
+ dap::string command;
+ if (!d->field("command", &command)) {
+ handlers.error("Request missing string 'command' field");
+ return {};
+ }
+
+ const dap::TypeInfo* typeinfo;
+ GenericRequestHandler handler;
+ std::tie(typeinfo, handler) = handlers.request(command);
+ if (!typeinfo) {
+ handlers.error("No request handler registered for command '%s'",
+ command.c_str());
+ return {};
+ }
+
+ auto data = new uint8_t[typeinfo->size()];
+ typeinfo->construct(data);
+
+ if (!d->field("arguments", [&](dap::Deserializer* d) {
+ return typeinfo->deserialize(d, data);
+ })) {
+ handlers.error("Failed to deserialize request");
+ typeinfo->destruct(data);
+ delete[] data;
+ return {};
+ }
+
+ return [=] {
+ handler(
+ data,
+ [=](const dap::TypeInfo* typeinfo, const void* data) {
+ // onSuccess
+ dap::json::Serializer s;
+ s.object([&](dap::FieldSerializer* fs) {
+ return fs->field("seq", dap::integer(nextSeq++)) &&
+ fs->field("type", "response") &&
+ fs->field("request_seq", sequence) &&
+ fs->field("success", dap::boolean(true)) &&
+ fs->field("command", command) &&
+ fs->field("body", [&](dap::Serializer* s) {
+ return typeinfo->serialize(s, data);
+ });
+ });
+ send(s.dump());
+
+ if (auto handler = handlers.responseSent(typeinfo)) {
+ handler(data, nullptr);
+ }
+ },
+ [=](const dap::TypeInfo* typeinfo, const dap::Error& error) {
+ // onError
+ dap::json::Serializer s;
+ s.object([&](dap::FieldSerializer* fs) {
+ return fs->field("seq", dap::integer(nextSeq++)) &&
+ fs->field("type", "response") &&
+ fs->field("request_seq", sequence) &&
+ fs->field("success", dap::boolean(false)) &&
+ fs->field("command", command) &&
+ fs->field("message", error.message);
+ });
+ send(s.dump());
+
+ if (auto handler = handlers.responseSent(typeinfo)) {
+ handler(nullptr, &error);
+ }
+ });
+ typeinfo->destruct(data);
+ delete[] data;
+ };
+ }
+
+ Payload processEvent(dap::json::Deserializer* d) {
+ dap::string event;
+ if (!d->field("event", &event)) {
+ handlers.error("Event missing string 'event' field");
+ return {};
+ }
+
+ const dap::TypeInfo* typeinfo;
+ GenericEventHandler handler;
+ std::tie(typeinfo, handler) = handlers.event(event);
+ if (!typeinfo) {
+ handlers.error("No event handler registered for event '%s'",
+ event.c_str());
+ return {};
+ }
+
+ auto data = new uint8_t[typeinfo->size()];
+ typeinfo->construct(data);
+
+ // "body" is an optional field for some events, such as "Terminated Event".
+ bool body_ok = true;
+ d->field("body", [&](dap::Deserializer* d) {
+ if (!typeinfo->deserialize(d, data)) {
+ body_ok = false;
+ }
+ return true;
+ });
+
+ if (!body_ok) {
+ handlers.error("Failed to deserialize event '%s' body", event.c_str());
+ typeinfo->destruct(data);
+ delete[] data;
+ return {};
+ }
+
+ return [=] {
+ handler(data);
+ typeinfo->destruct(data);
+ delete[] data;
+ };
+ }
+
+ void processResponse(const dap::Deserializer* d) {
+ dap::integer requestSeq = 0;
+ if (!d->field("request_seq", &requestSeq)) {
+ handlers.error("Response missing int 'request_seq' field");
+ return;
+ }
+
+ const dap::TypeInfo* typeinfo;
+ GenericResponseHandler handler;
+ std::tie(typeinfo, handler) = handlers.response(requestSeq);
+ if (!typeinfo) {
+ handlers.error("Unknown response with sequence %d", requestSeq);
+ return;
+ }
+
+ dap::boolean success = false;
+ if (!d->field("success", &success)) {
+ handlers.error("Response missing int 'success' field");
+ return;
+ }
+
+ if (success) {
+ auto data = std::unique_ptr<uint8_t[]>(new uint8_t[typeinfo->size()]);
+ typeinfo->construct(data.get());
+
+ // "body" field in Response is an optional field.
+ d->field("body", [&](const dap::Deserializer* d) {
+ return typeinfo->deserialize(d, data.get());
+ });
+
+ handler(data.get(), nullptr);
+ typeinfo->destruct(data.get());
+ } else {
+ std::string message;
+ if (!d->field("message", &message)) {
+ handlers.error("Failed to deserialize message");
+ return;
+ }
+ auto error = dap::Error("%s", message.c_str());
+ handler(nullptr, &error);
+ }
+ }
+
+ bool send(const std::string& s) {
+ std::unique_lock<std::mutex> lock(sendMutex);
+ if (!writer.isOpen()) {
+ handlers.error("Send failed as the writer is closed");
+ return false;
+ }
+ return writer.write(s);
+ }
+
+ std::atomic<bool> isBound = {false};
+ std::atomic<bool> isProcessingMessages = {false};
+ dap::ContentReader reader;
+ dap::ContentWriter writer;
+
+ std::atomic<bool> shutdown = {false};
+ EventHandlers handlers;
+ std::thread recvThread;
+ std::thread dispatchThread;
+ dap::Chan<Payload> inbox;
+ std::atomic<uint32_t> nextSeq = {1};
+ std::mutex sendMutex;
+};
+
+} // anonymous namespace
+
+namespace dap {
+
+Error::Error(const std::string& message) : message(message) {}
+
+Error::Error(const char* msg, ...) {
+ char buf[2048];
+ va_list vararg;
+ va_start(vararg, msg);
+ vsnprintf(buf, sizeof(buf), msg, vararg);
+ va_end(vararg);
+ message = buf;
+}
+
+Session::~Session() = default;
+
+std::unique_ptr<Session> Session::create() {
+ return std::unique_ptr<Session>(new Impl());
+}
+
+} // namespace dap