Separation of concerns (separate transport and comms layer)

This commit is contained in:
Piotr Wilkin 2026-02-05 21:28:24 +01:00
parent 17919e700d
commit 9e60c0c83f
1 changed file with 203 additions and 153 deletions

View File

@ -1,12 +1,11 @@
#pragma once
#include "../../vendor/sheredom/subprocess.h"
#include "common.h"
#include "log.h"
#include <atomic>
#include <condition_variable>
#include <fstream>
#include <functional>
#include <future>
#include <iostream>
#include <map>
@ -26,7 +25,9 @@ struct mcp_tool {
std::string server_name;
};
class mcp_server {
// Low-level transport layer for subprocess-based MCP communication.
// Handles subprocess lifecycle, raw I/O, and line-based message framing.
class mcp_transport {
std::string name;
std::string command;
std::vector<std::string> args;
@ -38,27 +39,24 @@ class mcp_server {
std::thread err_thread;
std::atomic<bool> stop_read{ false };
std::mutex mutex;
int next_id = 1;
std::map<int, std::promise<json>> pending_requests;
// Buffer for reading
std::string read_buffer;
std::function<void(const std::string &)> line_handler;
public:
mcp_server(const std::string & name,
const std::string & cmd,
const std::vector<std::string> & args,
const std::map<std::string, std::string> & env) :
mcp_transport(const std::string & name,
const std::string & cmd,
const std::vector<std::string> & args,
const std::map<std::string, std::string> & env) :
name(name),
command(cmd),
args(args),
env(env) {}
mcp_server(const mcp_server &) = delete;
mcp_server & operator=(const mcp_server &) = delete;
mcp_transport(const mcp_transport &) = delete;
mcp_transport & operator=(const mcp_transport &) = delete;
mcp_server(mcp_server && other) noexcept :
mcp_transport(mcp_transport && other) noexcept :
name(std::move(other.name)),
command(std::move(other.command)),
args(std::move(other.args)),
@ -68,39 +66,41 @@ class mcp_server {
read_thread(std::move(other.read_thread)),
err_thread(std::move(other.err_thread)),
stop_read(other.stop_read.load()),
next_id(other.next_id),
pending_requests(std::move(other.pending_requests)),
read_buffer(std::move(other.read_buffer)) {
// Zero out the source process to prevent double-free
read_buffer(std::move(other.read_buffer)),
line_handler(std::move(other.line_handler)) {
other.process = {};
other.running = false;
}
mcp_server & operator=(mcp_server && other) noexcept {
mcp_transport & operator=(mcp_transport && other) noexcept {
if (this != &other) {
stop(); // Clean up current resources
stop();
name = std::move(other.name);
command = std::move(other.command);
args = std::move(other.args);
env = std::move(other.env);
process = other.process;
running = other.running.load();
read_thread = std::move(other.read_thread);
err_thread = std::move(other.err_thread);
stop_read = other.stop_read.load();
next_id = other.next_id;
pending_requests = std::move(other.pending_requests);
read_buffer = std::move(other.read_buffer);
name = std::move(other.name);
command = std::move(other.command);
args = std::move(other.args);
env = std::move(other.env);
process = other.process;
running = other.running.load();
read_thread = std::move(other.read_thread);
err_thread = std::move(other.err_thread);
stop_read = other.stop_read.load();
read_buffer = std::move(other.read_buffer);
line_handler = std::move(other.line_handler);
// Zero out source
other.process = {};
other.running = false;
}
return *this;
}
~mcp_server() { stop(); }
~mcp_transport() { stop(); }
// Installs the callback that receives every complete, non-empty line read
// from the subprocess's stdout. Passing an empty std::function removes it.
// The reader thread invokes the handler without taking a lock, so install it
// before calling start().
void set_line_handler(std::function<void(const std::string &)> handler) {
    // Exchange with the by-value parameter; the previous handler (if any) is
    // released when `handler` goes out of scope.
    line_handler.swap(handler);
}
// Reports the current value of the `running` flag. The flag is cleared by
// the reader thread when stdout reaches EOF and by stop().
bool is_running() const {
    return running.load();
}
bool start() {
std::vector<const char *> cmd_args;
@ -122,7 +122,6 @@ class mcp_server {
env_vars.push_back(nullptr);
}
// Blocking I/O is simpler with threads
int options = subprocess_option_search_user_path;
int result;
if (env.empty()) {
@ -138,14 +137,16 @@ class mcp_server {
}
running = true;
read_thread = std::thread(&mcp_server::read_loop, this);
err_thread = std::thread(&mcp_server::err_loop, this);
read_thread = std::thread(&mcp_transport::read_loop, this);
err_thread = std::thread(&mcp_transport::err_loop, this);
return true;
}
void stop() {
if (!running) return;
if (!running) {
return;
}
LOG_INF("Stopping MCP server %s...\n", name.c_str());
stop_read = true;
@ -157,12 +158,12 @@ class mcp_server {
// 2. Wait for 10 seconds for normal termination
bool terminated = false;
for (int i = 0; i < 100; ++i) { // 100 * 100ms = 10s
if (subprocess_alive(&process) == 0) {
terminated = true;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
for (int i = 0; i < 100; ++i) { // 100 * 100ms = 10s
if (subprocess_alive(&process) == 0) {
terminated = true;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// 3. Terminate if still running
@ -181,14 +182,131 @@ class mcp_server {
// 5. Cleanup
if (running) {
subprocess_destroy(&process);
running = false;
subprocess_destroy(&process);
running = false;
}
LOG_INF("MCP server %s stopped.\n", name.c_str());
}
// Writes `data` verbatim to the subprocess's stdin and flushes it.
//
// Returns true only when the stdin stream exists, every byte was accepted by
// fwrite() and the flush succeeded. A short write or a failed flush (for
// example because the child already closed its end of the pipe) is reported
// as false so that callers do not wait for a reply to a message that never
// reached the server.
bool send(const std::string & data) {
    FILE * f = subprocess_stdin(&process);
    if (!f) {
        return false;
    }

    const size_t written = fwrite(data.data(), 1, data.size(), f);
    if (written != data.size()) {
        return false;
    }

    return fflush(f) == 0;
}
private:
// Reader-thread body: pulls raw bytes from the subprocess's stdout, reassembles
// them into '\n'-terminated lines and hands each non-empty line to
// `line_handler` (when one is installed). A trailing '\r' is dropped so that
// CRLF-terminated output yields the same lines as LF-terminated output.
// Bytes after the last newline stay in `read_buffer` until the next read.
//
// The loop ends when `stop_read` is set, when `running` is cleared, or when
// the read returns 0 bytes.
//
// NOTE(review): on EOF this clears `running`, and stop() returns immediately
// when `running` is false - confirm that the threads are still joined and the
// subprocess destroyed after an EOF-triggered exit.
void read_loop() {
    LOG_DBG("MCP read_loop started for %s\n", name.c_str());
    char buffer[4096];
    while (!stop_read && running) {
        const unsigned bytes_read = subprocess_read_stdout(&process, buffer, sizeof(buffer));
        if (bytes_read == 0) {
            // If blocking read returns 0, it means EOF (process exited or pipe closed).
            // We should NOT call subprocess_alive() here because it calls subprocess_join()
            // which modifies the process struct (closes stdin) and causes race conditions/double-free
            // when stop() is called concurrently.
            // Just break the loop. The process is likely dead or dying.
            if (!stop_read) {
                LOG_ERR("MCP server %s: read_loop exiting (stdout closed/EOF)\n", name.c_str());
            }
            running = false;
            break;
        }
        read_buffer.append(buffer, bytes_read);

        // Walk all complete lines with a moving offset and trim the consumed
        // prefix once, instead of erasing from the front after every line.
        size_t line_start = 0;
        size_t newline_pos;
        while ((newline_pos = read_buffer.find('\n', line_start)) != std::string::npos) {
            size_t line_len = newline_pos - line_start;
            if (line_len > 0 && read_buffer[newline_pos - 1] == '\r') {
                --line_len;  // tolerate CRLF line endings
            }
            if (line_len > 0 && line_handler) {
                line_handler(read_buffer.substr(line_start, line_len));
            }
            line_start = newline_pos + 1;
        }
        read_buffer.erase(0, line_start);
    }
    LOG_DBG("MCP read_loop exiting for %s (stop_read=%d, running=%d)\n", name.c_str(), stop_read.load(),
            running.load());
}
// Stderr-thread body: forwards whatever the subprocess writes to stderr to the
// warning log, one read chunk at a time. Ends on EOF (a 0-byte read), when
// `stop_read` is set, or when `running` is cleared.
void err_loop() {
    char chunk[1024];
    while (!stop_read && running) {
        const unsigned chunk_len = subprocess_read_stderr(&process, chunk, sizeof(chunk));
        if (chunk_len == 0) {
            // EOF
            break;
        }
        if (stop_read) {
            // Don't log stderr during shutdown
            break;
        }
        const std::string text(chunk, chunk_len);
        LOG_WRN("[%s stderr] %s", name.c_str(), text.c_str());
    }
}
};
// JSON-RPC layer for MCP server communication.
// Handles request/response correlation, message formatting, and MCP protocol methods.
class mcp_server {
std::string name;
mcp_transport transport;
std::mutex mutex;
int next_id = 1;
std::map<int, std::promise<json>> pending_requests;
public:
// Builds a server wrapper around a not-yet-started transport.
//   name - label stored for log messages and also passed to the transport
//   cmd  - command forwarded to the transport
//   args - arguments forwarded to the transport
//   env  - environment map forwarded to the transport
// Nothing is launched here; call start() to spawn the subprocess.
mcp_server(const std::string & name,
           const std::string & cmd,
           const std::vector<std::string> & args,
           const std::map<std::string, std::string> & env) :
    name{ name },
    transport{ name, cmd, args, env } {}
mcp_server(const mcp_server &) = delete;
mcp_server & operator=(const mcp_server &) = delete;
// Move constructor: takes over the name, the transport, the request id
// counter and the table of pending requests from `other`. The mutex is not
// transferred; this object gets its own default-constructed one.
//
// NOTE(review): start() installs a line handler that captures `this`, and the
// transport move carries that handler along. A server moved after start()
// would therefore still deliver lines to `other` - confirm that servers are
// only moved before start() is called.
mcp_server(mcp_server && other) noexcept :
name(std::move(other.name)),
transport(std::move(other.transport)),
next_id(other.next_id),
pending_requests(std::move(other.pending_requests)) {}
// Move assignment: stops this server's own transport first, then takes over
// the name, transport, request id counter and pending-request table from
// `other`. Self-assignment is a no-op. The mutex is left untouched.
//
// NOTE(review): start() installs a line handler that captures `this`, and the
// transport move carries that handler along. Assigning from a server that was
// already started would leave the handler pointing at `other` - confirm that
// servers are only moved before start() is called.
mcp_server & operator=(mcp_server && other) noexcept {
if (this != &other) {
// Shut down whatever this object currently runs before overwriting it.
stop();
name = std::move(other.name);
transport = std::move(other.transport);
next_id = other.next_id;
pending_requests = std::move(other.pending_requests);
}
return *this;
}
~mcp_server() { stop(); }
// Wires the transport's line callback to handle_line() on this object and
// then starts the transport. Returns whatever transport.start() returns.
bool start() {
    auto on_line = [this](const std::string & line) {
        handle_line(line);
    };
    transport.set_line_handler(std::move(on_line));
    return transport.start();
}
// Stops the underlying transport. Entries in pending_requests are not
// touched here.
void stop() {
    transport.stop();
}
json send_request(const std::string & method, const json & params = json::object()) {
if (!running) {
if (!transport.is_running()) {
LOG_ERR("Cannot send request to %s: server not running\n", name.c_str());
return nullptr;
}
@ -208,13 +326,9 @@ class mcp_server {
{ "params", params }
};
std::string req_str = req.dump() + "\n";
FILE * stdin_file = subprocess_stdin(&process);
if (stdin_file) {
LOG_DBG("MCP request to %s [id=%d]: %s\n", name.c_str(), id, method.c_str());
fwrite(req_str.c_str(), 1, req_str.size(), stdin_file);
fflush(stdin_file);
} else {
std::string req_str = req.dump() + "\n";
LOG_DBG("MCP request to %s [id=%d]: %s\n", name.c_str(), id, method.c_str());
if (!transport.send(req_str)) {
LOG_ERR("Cannot send request to %s: stdin is null\n", name.c_str());
std::lock_guard<std::mutex> lock(mutex);
pending_requests.erase(id);
@ -239,29 +353,19 @@ class mcp_server {
{ "params", params }
};
std::string req_str = req.dump() + "\n";
FILE * stdin_file = subprocess_stdin(&process);
if (stdin_file) {
fwrite(req_str.c_str(), 1, req_str.size(), stdin_file);
fflush(stdin_file);
}
transport.send(req.dump() + "\n");
}
// Initialize handshake
bool initialize() {
// Send initialize
json init_params = {
{ "protocolVersion", "2024-11-05" },
{ "capabilities", {
{ "roots", {
{ "listChanged", false }
} },
{ "sampling", json::object() }
} },
{ "clientInfo", {
{ "name", "llama.cpp-cli" },
{ "version", "0.1.0" } // TODO: use real version
} }
{ "protocolVersion", "2024-11-05" },
{ "capabilities", { { "roots", { { "listChanged", false } } }, { "sampling", json::object() } } },
{ "clientInfo",
{
{ "name", "llama.cpp-cli" }, { "version", "0.1.0" } // TODO: use real version
} }
};
json res = send_request("initialize", init_params);
@ -320,83 +424,34 @@ class mcp_server {
}
private:
void read_loop() {
LOG_DBG("MCP read_loop started for %s\n", name.c_str());
char buffer[4096];
while (!stop_read && running) {
unsigned bytes_read = subprocess_read_stdout(&process, buffer, sizeof(buffer));
if (bytes_read == 0) {
// If blocking read returns 0, it means EOF (process exited or pipe closed).
// We should NOT call subprocess_alive() here because it calls subprocess_join()
// which modifies the process struct (closes stdin) and causes race conditions/double-free
// when stop() is called concurrently.
// Just break the loop. The process is likely dead or dying.
if (!stop_read) {
LOG_ERR("MCP server %s: read_loop exiting (stdout closed/EOF)\n", name.c_str());
void handle_line(const std::string & line) {
try {
json msg = json::parse(line);
if (msg.contains("id") && !msg["id"].is_null()) {
// Response - handle both int and string IDs (JSON-RPC allows both)
int id;
if (msg["id"].is_string()) {
id = std::stoi(msg["id"].get<std::string>());
} else {
id = msg["id"].get<int>();
}
running = false;
break;
}
read_buffer.append(buffer, bytes_read);
size_t pos;
while ((pos = read_buffer.find('\n')) != std::string::npos) {
std::string line = read_buffer.substr(0, pos);
read_buffer.erase(0, pos + 1);
if (line.empty()) {
continue;
std::lock_guard<std::mutex> lock(mutex);
if (pending_requests.count(id)) {
LOG_DBG("MCP response received from %s [id=%d]\n", name.c_str(), id);
pending_requests[id].set_value(msg);
pending_requests.erase(id);
} else {
LOG_WRN("MCP response for unknown id %d from %s: %s\n", id, name.c_str(), line.c_str());
}
try {
json msg = json::parse(line);
if (msg.contains("id") && !msg["id"].is_null()) {
// Response - handle both int and string IDs (JSON-RPC allows both)
int id;
if (msg["id"].is_string()) {
id = std::stoi(msg["id"].get<std::string>());
} else {
id = msg["id"].get<int>();
}
std::lock_guard<std::mutex> lock(mutex);
if (pending_requests.count(id)) {
LOG_DBG("MCP response received from %s [id=%d]\n", name.c_str(), id);
pending_requests[id].set_value(msg);
pending_requests.erase(id);
} else {
LOG_WRN("MCP response for unknown id %d from %s: %s\n", id, name.c_str(), line.c_str());
}
} else {
// Notification or request from server -> ignore for now or log
// MCP servers might send notifications (e.g. logging)
LOG_INF("MCP Notification from %s: %s\n", name.c_str(), line.c_str());
}
} catch (const std::exception & e) {
// Not a full JSON yet? Or invalid?
// If it was a line, it should be valid JSON-RPC
LOG_WRN("Failed to parse JSON from %s: %s (line: %s)\n", name.c_str(), e.what(), line.c_str());
}
}
}
LOG_DBG("MCP read_loop exiting for %s (stop_read=%d, running=%d)\n",
name.c_str(), stop_read.load(), running.load());
}
void err_loop() {
char buffer[1024];
while (!stop_read && running) {
unsigned bytes_read = subprocess_read_stderr(&process, buffer, sizeof(buffer));
if (bytes_read > 0) {
if (stop_read) break; // Don't log stderr during shutdown
std::string err_str(buffer, bytes_read);
// Filter out empty/whitespace-only stderr if desired, or just keep it.
// User said "extra logging that passes the stderr here is unnecessary" referring to shutdown.
LOG_WRN("[%s stderr] %s", name.c_str(), err_str.c_str());
} else {
// EOF
break;
// Notification or request from server -> ignore for now or log
// MCP servers might send notifications (e.g. logging)
LOG_INF("MCP Notification from %s: %s\n", name.c_str(), line.c_str());
}
} catch (const std::exception & e) {
// Not a full JSON yet? Or invalid?
// If it was a line, it should be valid JSON-RPC
LOG_WRN("Failed to parse JSON from %s: %s (line: %s)\n", name.c_str(), e.what(), line.c_str());
}
}
};
@ -434,19 +489,14 @@ class mcp_context {
if (config.contains("mcpServers")) {
std::string server_list;
for (auto & [key, val] : config["mcpServers"].items()) {
if (!server_list.empty()) server_list += ", ";
if (!server_list.empty()) {
server_list += ", ";
}
server_list += key;
}
LOG_INF("MCP configuration found with servers: %s\n", server_list.c_str());
for (auto & [key, val] : config["mcpServers"].items()) {
// If enabled_servers_str is empty, enable all? User said "possibility to pick which MCP servers to enable".
// If the user specifies explicit list, we filter. If not, maybe we shouldn't enable any or enable all?
// The prompt says "possibility to pick".
// Let's assume if list provided, use it. If not, enable all? Or none?
// User provided example implies explicit enabling might be desired.
// Let's assume if `enabled_servers_str` is not empty, we filter.
bool enabled = true;
if (!enabled_list.empty()) {
bool found = false;