diff options
Diffstat (limited to 'src/session.cpp')
-rw-r--r-- | src/session.cpp | 516 |
1 files changed, 516 insertions, 0 deletions
diff --git a/src/session.cpp b/src/session.cpp new file mode 100644 index 0000000..d88a697 --- /dev/null +++ b/src/session.cpp @@ -0,0 +1,516 @@ +// Copyright 2019 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "content_stream.h" + +#include "dap/any.h" +#include "dap/session.h" + +#include "chan.h" +#include "json_serializer.h" +#include "socket.h" + +#include <stdarg.h> +#include <stdio.h> +#include <atomic> +#include <deque> +#include <memory> +#include <mutex> +#include <thread> +#include <unordered_map> +#include <vector> + +namespace { + +class Impl : public dap::Session { + public: + void onError(const ErrorHandler& handler) override { handlers.put(handler); } + + void registerHandler(const dap::TypeInfo* typeinfo, + const GenericRequestHandler& handler) override { + handlers.put(typeinfo, handler); + } + + void registerHandler(const dap::TypeInfo* typeinfo, + const GenericEventHandler& handler) override { + handlers.put(typeinfo, handler); + } + + void registerHandler(const dap::TypeInfo* typeinfo, + const GenericResponseSentHandler& handler) override { + handlers.put(typeinfo, handler); + } + + std::function<void()> getPayload() override { + auto request = reader.read(); + if (request.size() > 0) { + if (auto payload = processMessage(request)) { + return payload; + } + } + return {}; + } + + void connect(const std::shared_ptr<dap::Reader>& r, + const std::shared_ptr<dap::Writer>& w) override { + if (isBound.exchange(true)) { + handlers.error("Session::connect called twice"); + return; + } + + reader = dap::ContentReader(r); + writer = dap::ContentWriter(w); + } + + void startProcessingMessages( + const ClosedHandler& onClose /* = {} */) override { + if (isProcessingMessages.exchange(true)) { + handlers.error("Session::startProcessingMessages() called twice"); + return; + } + recvThread = std::thread([this, onClose] { + while (reader.isOpen()) { + if (auto payload = getPayload()) { + inbox.put(std::move(payload)); + } + } + if (onClose) { + onClose(); + } + }); + + dispatchThread = std::thread([this] { + while (auto payload = inbox.take()) { + payload.value()(); + } + }); + } + + bool send(const dap::TypeInfo* requestTypeInfo, + const dap::TypeInfo* responseTypeInfo, + const void* request, + const GenericResponseHandler& responseHandler) override { + int seq = nextSeq++; + + handlers.put(seq, responseTypeInfo, responseHandler); + + dap::json::Serializer s; + if (!s.object([&](dap::FieldSerializer* fs) { + return fs->field("seq", dap::integer(seq)) && + fs->field("type", "request") && + fs->field("command", requestTypeInfo->name()) && + fs->field("arguments", [&](dap::Serializer* s) { + return requestTypeInfo->serialize(s, request); + }); + })) { + return false; + } + return send(s.dump()); + } + + bool send(const dap::TypeInfo* typeinfo, const void* event) override { + dap::json::Serializer s; + if (!s.object([&](dap::FieldSerializer* fs) { + return fs->field("seq", dap::integer(nextSeq++)) && + fs->field("type", "event") && + fs->field("event", typeinfo->name()) && + fs->field("body", [&](dap::Serializer* s) { + return typeinfo->serialize(s, event); + }); + })) { + return false; + } + return send(s.dump()); + } + + ~Impl() { + inbox.close(); + reader.close(); + writer.close(); + if (recvThread.joinable()) { + recvThread.join(); + } + if (dispatchThread.joinable()) { + dispatchThread.join(); + } + } + + private: + using Payload = std::function<void()>; + + class EventHandlers { + public: + void put(const ErrorHandler& handler) { + std::unique_lock<std::mutex> lock(errorMutex); + errorHandler = handler; + } + + void error(const char* format, ...) { + va_list vararg; + va_start(vararg, format); + std::unique_lock<std::mutex> lock(errorMutex); + errorLocked(format, vararg); + va_end(vararg); + } + + std::pair<const dap::TypeInfo*, GenericRequestHandler> request( + const std::string& name) { + std::unique_lock<std::mutex> lock(requestMutex); + auto it = requestMap.find(name); + return (it != requestMap.end()) ? it->second : decltype(it->second){}; + } + + void put(const dap::TypeInfo* typeinfo, + const GenericRequestHandler& handler) { + std::unique_lock<std::mutex> lock(requestMutex); + auto added = + requestMap + .emplace(typeinfo->name(), std::make_pair(typeinfo, handler)) + .second; + if (!added) { + errorfLocked("Request handler for '%s' already registered", + typeinfo->name().c_str()); + } + } + + std::pair<const dap::TypeInfo*, GenericResponseHandler> response( + int64_t seq) { + std::unique_lock<std::mutex> lock(responseMutex); + auto responseIt = responseMap.find(seq); + if (responseIt == responseMap.end()) { + errorfLocked("Unknown response with sequence %d", seq); + return {}; + } + auto out = std::move(responseIt->second); + responseMap.erase(seq); + return out; + } + + void put(int seq, + const dap::TypeInfo* typeinfo, + const GenericResponseHandler& handler) { + std::unique_lock<std::mutex> lock(responseMutex); + auto added = + responseMap.emplace(seq, std::make_pair(typeinfo, handler)).second; + if (!added) { + errorfLocked("Response handler for sequence %d already registered", + seq); + } + } + + std::pair<const dap::TypeInfo*, GenericEventHandler> event( + const std::string& name) { + std::unique_lock<std::mutex> lock(eventMutex); + auto it = eventMap.find(name); + return (it != eventMap.end()) ? it->second : decltype(it->second){}; + } + + void put(const dap::TypeInfo* typeinfo, + const GenericEventHandler& handler) { + std::unique_lock<std::mutex> lock(eventMutex); + auto added = + eventMap.emplace(typeinfo->name(), std::make_pair(typeinfo, handler)) + .second; + if (!added) { + errorfLocked("Event handler for '%s' already registered", + typeinfo->name().c_str()); + } + } + + GenericResponseSentHandler responseSent(const dap::TypeInfo* typeinfo) { + std::unique_lock<std::mutex> lock(responseSentMutex); + auto it = responseSentMap.find(typeinfo); + return (it != responseSentMap.end()) ? it->second + : decltype(it->second){}; + } + + void put(const dap::TypeInfo* typeinfo, + const GenericResponseSentHandler& handler) { + std::unique_lock<std::mutex> lock(responseSentMutex); + auto added = responseSentMap.emplace(typeinfo, handler).second; + if (!added) { + errorfLocked("Response sent handler for '%s' already registered", + typeinfo->name().c_str()); + } + } + + private: + void errorfLocked(const char* format, ...) { + va_list vararg; + va_start(vararg, format); + errorLocked(format, vararg); + va_end(vararg); + } + + void errorLocked(const char* format, va_list args) { + char buf[2048]; + vsnprintf(buf, sizeof(buf), format, args); + if (errorHandler) { + errorHandler(buf); + } + } + + std::mutex errorMutex; + ErrorHandler errorHandler; + + std::mutex requestMutex; + std::unordered_map<std::string, + std::pair<const dap::TypeInfo*, GenericRequestHandler>> + requestMap; + + std::mutex responseMutex; + std::unordered_map<int64_t, + std::pair<const dap::TypeInfo*, GenericResponseHandler>> + responseMap; + + std::mutex eventMutex; + std::unordered_map<std::string, + std::pair<const dap::TypeInfo*, GenericEventHandler>> + eventMap; + + std::mutex responseSentMutex; + std::unordered_map<const dap::TypeInfo*, GenericResponseSentHandler> + responseSentMap; + }; // EventHandlers + + Payload processMessage(const std::string& str) { + auto d = dap::json::Deserializer(str); + dap::string type; + if (!d.field("type", &type)) { + handlers.error("Message missing string 'type' field"); + return {}; + } + + dap::integer sequence = 0; + if (!d.field("seq", &sequence)) { + handlers.error("Message missing number 'seq' field"); + return {}; + } + + if (type == "request") { + return processRequest(&d, sequence); + } else if (type == "event") { + return processEvent(&d); + } else if (type == "response") { + processResponse(&d); + return {}; + } else { + handlers.error("Unknown message type '%s'", type.c_str()); + } + + return {}; + } + + Payload processRequest(dap::json::Deserializer* d, dap::integer sequence) { + dap::string command; + if (!d->field("command", &command)) { + handlers.error("Request missing string 'command' field"); + return {}; + } + + const dap::TypeInfo* typeinfo; + GenericRequestHandler handler; + std::tie(typeinfo, handler) = handlers.request(command); + if (!typeinfo) { + handlers.error("No request handler registered for command '%s'", + command.c_str()); + return {}; + } + + auto data = new uint8_t[typeinfo->size()]; + typeinfo->construct(data); + + if (!d->field("arguments", [&](dap::Deserializer* d) { + return typeinfo->deserialize(d, data); + })) { + handlers.error("Failed to deserialize request"); + typeinfo->destruct(data); + delete[] data; + return {}; + } + + return [=] { + handler( + data, + [=](const dap::TypeInfo* typeinfo, const void* data) { + // onSuccess + dap::json::Serializer s; + s.object([&](dap::FieldSerializer* fs) { + return fs->field("seq", dap::integer(nextSeq++)) && + fs->field("type", "response") && + fs->field("request_seq", sequence) && + fs->field("success", dap::boolean(true)) && + fs->field("command", command) && + fs->field("body", [&](dap::Serializer* s) { + return typeinfo->serialize(s, data); + }); + }); + send(s.dump()); + + if (auto handler = handlers.responseSent(typeinfo)) { + handler(data, nullptr); + } + }, + [=](const dap::TypeInfo* typeinfo, const dap::Error& error) { + // onError + dap::json::Serializer s; + s.object([&](dap::FieldSerializer* fs) { + return fs->field("seq", dap::integer(nextSeq++)) && + fs->field("type", "response") && + fs->field("request_seq", sequence) && + fs->field("success", dap::boolean(false)) && + fs->field("command", command) && + fs->field("message", error.message); + }); + send(s.dump()); + + if (auto handler = handlers.responseSent(typeinfo)) { + handler(nullptr, &error); + } + }); + typeinfo->destruct(data); + delete[] data; + }; + } + + Payload processEvent(dap::json::Deserializer* d) { + dap::string event; + if (!d->field("event", &event)) { + handlers.error("Event missing string 'event' field"); + return {}; + } + + const dap::TypeInfo* typeinfo; + GenericEventHandler handler; + std::tie(typeinfo, handler) = handlers.event(event); + if (!typeinfo) { + handlers.error("No event handler registered for event '%s'", + event.c_str()); + return {}; + } + + auto data = new uint8_t[typeinfo->size()]; + typeinfo->construct(data); + + // "body" is an optional field for some events, such as "Terminated Event". + bool body_ok = true; + d->field("body", [&](dap::Deserializer* d) { + if (!typeinfo->deserialize(d, data)) { + body_ok = false; + } + return true; + }); + + if (!body_ok) { + handlers.error("Failed to deserialize event '%s' body", event.c_str()); + typeinfo->destruct(data); + delete[] data; + return {}; + } + + return [=] { + handler(data); + typeinfo->destruct(data); + delete[] data; + }; + } + + void processResponse(const dap::Deserializer* d) { + dap::integer requestSeq = 0; + if (!d->field("request_seq", &requestSeq)) { + handlers.error("Response missing int 'request_seq' field"); + return; + } + + const dap::TypeInfo* typeinfo; + GenericResponseHandler handler; + std::tie(typeinfo, handler) = handlers.response(requestSeq); + if (!typeinfo) { + handlers.error("Unknown response with sequence %d", requestSeq); + return; + } + + dap::boolean success = false; + if (!d->field("success", &success)) { + handlers.error("Response missing int 'success' field"); + return; + } + + if (success) { + auto data = std::unique_ptr<uint8_t[]>(new uint8_t[typeinfo->size()]); + typeinfo->construct(data.get()); + + // "body" field in Response is an optional field. + d->field("body", [&](const dap::Deserializer* d) { + return typeinfo->deserialize(d, data.get()); + }); + + handler(data.get(), nullptr); + typeinfo->destruct(data.get()); + } else { + std::string message; + if (!d->field("message", &message)) { + handlers.error("Failed to deserialize message"); + return; + } + auto error = dap::Error("%s", message.c_str()); + handler(nullptr, &error); + } + } + + bool send(const std::string& s) { + std::unique_lock<std::mutex> lock(sendMutex); + if (!writer.isOpen()) { + handlers.error("Send failed as the writer is closed"); + return false; + } + return writer.write(s); + } + + std::atomic<bool> isBound = {false}; + std::atomic<bool> isProcessingMessages = {false}; + dap::ContentReader reader; + dap::ContentWriter writer; + + std::atomic<bool> shutdown = {false}; + EventHandlers handlers; + std::thread recvThread; + std::thread dispatchThread; + dap::Chan<Payload> inbox; + std::atomic<uint32_t> nextSeq = {1}; + std::mutex sendMutex; +}; + +} // anonymous namespace + +namespace dap { + +Error::Error(const std::string& message) : message(message) {} + +Error::Error(const char* msg, ...) { + char buf[2048]; + va_list vararg; + va_start(vararg, msg); + vsnprintf(buf, sizeof(buf), msg, vararg); + va_end(vararg); + message = buf; +} + +Session::~Session() = default; + +std::unique_ptr<Session> Session::create() { + return std::unique_ptr<Session>(new Impl()); +} + +} // namespace dap |