From 9e60c0c83ff3dd8a618c6448508d8a38bc463284 Mon Sep 17 00:00:00 2001 From: Piotr Wilkin Date: Thu, 5 Feb 2026 21:28:24 +0100 Subject: [PATCH] Separation of concerns (separate transport and comms layer) --- tools/cli/mcp.hpp | 356 ++++++++++++++++++++++++++-------------------- 1 file changed, 203 insertions(+), 153 deletions(-) diff --git a/tools/cli/mcp.hpp b/tools/cli/mcp.hpp index 8d8ce4de98..e5d0d9d6c8 100644 --- a/tools/cli/mcp.hpp +++ b/tools/cli/mcp.hpp @@ -1,12 +1,11 @@ #pragma once #include "../../vendor/sheredom/subprocess.h" -#include "common.h" #include "log.h" #include -#include #include +#include #include #include #include @@ -26,7 +25,9 @@ struct mcp_tool { std::string server_name; }; -class mcp_server { +// Low-level transport layer for subprocess-based MCP communication. +// Handles subprocess lifecycle, raw I/O, and line-based message framing. +class mcp_transport { std::string name; std::string command; std::vector args; @@ -38,27 +39,24 @@ class mcp_server { std::thread err_thread; std::atomic stop_read{ false }; - std::mutex mutex; - int next_id = 1; - std::map> pending_requests; - - // Buffer for reading std::string read_buffer; + std::function line_handler; + public: - mcp_server(const std::string & name, - const std::string & cmd, - const std::vector & args, - const std::map & env) : + mcp_transport(const std::string & name, + const std::string & cmd, + const std::vector & args, + const std::map & env) : name(name), command(cmd), args(args), env(env) {} - mcp_server(const mcp_server &) = delete; - mcp_server & operator=(const mcp_server &) = delete; + mcp_transport(const mcp_transport &) = delete; + mcp_transport & operator=(const mcp_transport &) = delete; - mcp_server(mcp_server && other) noexcept : + mcp_transport(mcp_transport && other) noexcept : name(std::move(other.name)), command(std::move(other.command)), args(std::move(other.args)), @@ -68,39 +66,41 @@ class mcp_server { read_thread(std::move(other.read_thread)), err_thread(std::move(other.err_thread)), stop_read(other.stop_read.load()), - next_id(other.next_id), - pending_requests(std::move(other.pending_requests)), - read_buffer(std::move(other.read_buffer)) { - // Zero out the source process to prevent double-free + read_buffer(std::move(other.read_buffer)), + line_handler(std::move(other.line_handler)) { other.process = {}; other.running = false; } - mcp_server & operator=(mcp_server && other) noexcept { + mcp_transport & operator=(mcp_transport && other) noexcept { if (this != &other) { - stop(); // Clean up current resources + stop(); - name = std::move(other.name); - command = std::move(other.command); - args = std::move(other.args); - env = std::move(other.env); - process = other.process; - running = other.running.load(); - read_thread = std::move(other.read_thread); - err_thread = std::move(other.err_thread); - stop_read = other.stop_read.load(); - next_id = other.next_id; - pending_requests = std::move(other.pending_requests); - read_buffer = std::move(other.read_buffer); + name = std::move(other.name); + command = std::move(other.command); + args = std::move(other.args); + env = std::move(other.env); + process = other.process; + running = other.running.load(); + read_thread = std::move(other.read_thread); + err_thread = std::move(other.err_thread); + stop_read = other.stop_read.load(); + read_buffer = std::move(other.read_buffer); + line_handler = std::move(other.line_handler); - // Zero out source other.process = {}; other.running = false; } return *this; } - ~mcp_server() { stop(); } + ~mcp_transport() { stop(); } + + void set_line_handler(std::function handler) { + line_handler = std::move(handler); + } + + bool is_running() const { return running; } bool start() { std::vector cmd_args; @@ -122,7 +122,6 @@ class mcp_server { env_vars.push_back(nullptr); } - // Blocking I/O is simpler with threads int options = subprocess_option_search_user_path; int result; if (env.empty()) { @@ -138,14 +137,16 @@ class mcp_server { } running = true; - read_thread = std::thread(&mcp_server::read_loop, this); - err_thread = std::thread(&mcp_server::err_loop, this); + read_thread = std::thread(&mcp_transport::read_loop, this); + err_thread = std::thread(&mcp_transport::err_loop, this); return true; } void stop() { - if (!running) return; + if (!running) { + return; + } LOG_INF("Stopping MCP server %s...\n", name.c_str()); stop_read = true; @@ -157,12 +158,12 @@ class mcp_server { // 2. Wait for 10 seconds for normal termination bool terminated = false; - for (int i = 0; i < 100; ++i) { // 100 * 100ms = 10s - if (subprocess_alive(&process) == 0) { - terminated = true; - break; - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + for (int i = 0; i < 100; ++i) { // 100 * 100ms = 10s + if (subprocess_alive(&process) == 0) { + terminated = true; + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } // 3. Terminate if still running @@ -181,14 +182,131 @@ class mcp_server { // 5. Cleanup if (running) { - subprocess_destroy(&process); - running = false; + subprocess_destroy(&process); + running = false; } LOG_INF("MCP server %s stopped.\n", name.c_str()); } + bool send(const std::string & data) { + FILE * f = subprocess_stdin(&process); + if (!f) { + return false; + } + fwrite(data.c_str(), 1, data.size(), f); + fflush(f); + return true; + } + + private: + void read_loop() { + LOG_DBG("MCP read_loop started for %s\n", name.c_str()); + char buffer[4096]; + while (!stop_read && running) { + unsigned bytes_read = subprocess_read_stdout(&process, buffer, sizeof(buffer)); + + if (bytes_read == 0) { + // If blocking read returns 0, it means EOF (process exited or pipe closed). + // We should NOT call subprocess_alive() here because it calls subprocess_join() + // which modifies the process struct (closes stdin) and causes race conditions/double-free + // when stop() is called concurrently. + // Just break the loop. The process is likely dead or dying. + if (!stop_read) { + LOG_ERR("MCP server %s: read_loop exiting (stdout closed/EOF)\n", name.c_str()); + } + running = false; + break; + } + + read_buffer.append(buffer, bytes_read); + size_t pos; + while ((pos = read_buffer.find('\n')) != std::string::npos) { + std::string line = read_buffer.substr(0, pos); + read_buffer.erase(0, pos + 1); + + if (line.empty()) { + continue; + } + + if (line_handler) { + line_handler(line); + } + } + } + LOG_DBG("MCP read_loop exiting for %s (stop_read=%d, running=%d)\n", name.c_str(), stop_read.load(), + running.load()); + } + + void err_loop() { + char buffer[1024]; + while (!stop_read && running) { + unsigned bytes_read = subprocess_read_stderr(&process, buffer, sizeof(buffer)); + if (bytes_read > 0) { + if (stop_read) { + break; // Don't log stderr during shutdown + } + std::string err_str(buffer, bytes_read); + LOG_WRN("[%s stderr] %s", name.c_str(), err_str.c_str()); + } else { + // EOF + break; + } + } + } +}; + +// JSON-RPC layer for MCP server communication. +// Handles request/response correlation, message formatting, and MCP protocol methods. +class mcp_server { + std::string name; + mcp_transport transport; + + std::mutex mutex; + int next_id = 1; + std::map> pending_requests; + + public: + mcp_server(const std::string & name, + const std::string & cmd, + const std::vector & args, + const std::map & env) : + name(name), + transport(name, cmd, args, env) {} + + mcp_server(const mcp_server &) = delete; + mcp_server & operator=(const mcp_server &) = delete; + + mcp_server(mcp_server && other) noexcept : + name(std::move(other.name)), + transport(std::move(other.transport)), + next_id(other.next_id), + pending_requests(std::move(other.pending_requests)) {} + + mcp_server & operator=(mcp_server && other) noexcept { + if (this != &other) { + stop(); + + name = std::move(other.name); + transport = std::move(other.transport); + next_id = other.next_id; + pending_requests = std::move(other.pending_requests); + } + return *this; + } + + ~mcp_server() { stop(); } + + bool start() { + transport.set_line_handler([this](const std::string & line) { + handle_line(line); + }); + return transport.start(); + } + + void stop() { transport.stop(); } + json send_request(const std::string & method, const json & params = json::object()) { - if (!running) { + if (!transport.is_running()) { LOG_ERR("Cannot send request to %s: server not running\n", name.c_str()); return nullptr; } @@ -208,13 +326,9 @@ class mcp_server { { "params", params } }; - std::string req_str = req.dump() + "\n"; - FILE * stdin_file = subprocess_stdin(&process); - if (stdin_file) { - LOG_DBG("MCP request to %s [id=%d]: %s\n", name.c_str(), id, method.c_str()); - fwrite(req_str.c_str(), 1, req_str.size(), stdin_file); - fflush(stdin_file); - } else { + std::string req_str = req.dump() + "\n"; + LOG_DBG("MCP request to %s [id=%d]: %s\n", name.c_str(), id, method.c_str()); + if (!transport.send(req_str)) { LOG_ERR("Cannot send request to %s: stdin is null\n", name.c_str()); std::lock_guard lock(mutex); pending_requests.erase(id); @@ -239,29 +353,19 @@ class mcp_server { { "params", params } }; - std::string req_str = req.dump() + "\n"; - FILE * stdin_file = subprocess_stdin(&process); - if (stdin_file) { - fwrite(req_str.c_str(), 1, req_str.size(), stdin_file); - fflush(stdin_file); - } + transport.send(req.dump() + "\n"); } // Initialize handshake bool initialize() { // Send initialize json init_params = { - { "protocolVersion", "2024-11-05" }, - { "capabilities", { - { "roots", { - { "listChanged", false } - } }, - { "sampling", json::object() } - } }, - { "clientInfo", { - { "name", "llama.cpp-cli" }, - { "version", "0.1.0" } // TODO: use real version - } } + { "protocolVersion", "2024-11-05" }, + { "capabilities", { { "roots", { { "listChanged", false } } }, { "sampling", json::object() } } }, + { "clientInfo", + { + { "name", "llama.cpp-cli" }, { "version", "0.1.0" } // TODO: use real version + } } }; json res = send_request("initialize", init_params); @@ -320,83 +424,34 @@ class mcp_server { } private: - void read_loop() { - LOG_DBG("MCP read_loop started for %s\n", name.c_str()); - char buffer[4096]; - while (!stop_read && running) { - unsigned bytes_read = subprocess_read_stdout(&process, buffer, sizeof(buffer)); - - if (bytes_read == 0) { - // If blocking read returns 0, it means EOF (process exited or pipe closed). - // We should NOT call subprocess_alive() here because it calls subprocess_join() - // which modifies the process struct (closes stdin) and causes race conditions/double-free - // when stop() is called concurrently. - // Just break the loop. The process is likely dead or dying. - if (!stop_read) { - LOG_ERR("MCP server %s: read_loop exiting (stdout closed/EOF)\n", name.c_str()); + void handle_line(const std::string & line) { + try { + json msg = json::parse(line); + if (msg.contains("id") && !msg["id"].is_null()) { + // Response - handle both int and string IDs (JSON-RPC allows both) + int id; + if (msg["id"].is_string()) { + id = std::stoi(msg["id"].get()); + } else { + id = msg["id"].get(); } - running = false; - break; - } - - read_buffer.append(buffer, bytes_read); - size_t pos; - while ((pos = read_buffer.find('\n')) != std::string::npos) { - std::string line = read_buffer.substr(0, pos); - read_buffer.erase(0, pos + 1); - - if (line.empty()) { - continue; + std::lock_guard lock(mutex); + if (pending_requests.count(id)) { + LOG_DBG("MCP response received from %s [id=%d]\n", name.c_str(), id); + pending_requests[id].set_value(msg); + pending_requests.erase(id); + } else { + LOG_WRN("MCP response for unknown id %d from %s: %s\n", id, name.c_str(), line.c_str()); } - - try { - json msg = json::parse(line); - if (msg.contains("id") && !msg["id"].is_null()) { - // Response - handle both int and string IDs (JSON-RPC allows both) - int id; - if (msg["id"].is_string()) { - id = std::stoi(msg["id"].get()); - } else { - id = msg["id"].get(); - } - std::lock_guard lock(mutex); - if (pending_requests.count(id)) { - LOG_DBG("MCP response received from %s [id=%d]\n", name.c_str(), id); - pending_requests[id].set_value(msg); - pending_requests.erase(id); - } else { - LOG_WRN("MCP response for unknown id %d from %s: %s\n", id, name.c_str(), line.c_str()); - } - } else { - // Notification or request from server -> ignore for now or log - // MCP servers might send notifications (e.g. logging) - LOG_INF("MCP Notification from %s: %s\n", name.c_str(), line.c_str()); - } - } catch (const std::exception & e) { - // Not a full JSON yet? Or invalid? - // If it was a line, it should be valid JSON-RPC - LOG_WRN("Failed to parse JSON from %s: %s (line: %s)\n", name.c_str(), e.what(), line.c_str()); - } - } - } - LOG_DBG("MCP read_loop exiting for %s (stop_read=%d, running=%d)\n", - name.c_str(), stop_read.load(), running.load()); - } - - void err_loop() { - char buffer[1024]; - while (!stop_read && running) { - unsigned bytes_read = subprocess_read_stderr(&process, buffer, sizeof(buffer)); - if (bytes_read > 0) { - if (stop_read) break; // Don't log stderr during shutdown - std::string err_str(buffer, bytes_read); - // Filter out empty/whitespace-only stderr if desired, or just keep it. - // User said "extra logging that passes the stderr here is unnecessary" referring to shutdown. - LOG_WRN("[%s stderr] %s", name.c_str(), err_str.c_str()); } else { - // EOF - break; + // Notification or request from server -> ignore for now or log + // MCP servers might send notifications (e.g. logging) + LOG_INF("MCP Notification from %s: %s\n", name.c_str(), line.c_str()); } + } catch (const std::exception & e) { + // Not a full JSON yet? Or invalid? + // If it was a line, it should be valid JSON-RPC + LOG_WRN("Failed to parse JSON from %s: %s (line: %s)\n", name.c_str(), e.what(), line.c_str()); } } }; @@ -434,19 +489,14 @@ class mcp_context { if (config.contains("mcpServers")) { std::string server_list; for (auto & [key, val] : config["mcpServers"].items()) { - if (!server_list.empty()) server_list += ", "; + if (!server_list.empty()) { + server_list += ", "; + } server_list += key; } LOG_INF("MCP configuration found with servers: %s\n", server_list.c_str()); for (auto & [key, val] : config["mcpServers"].items()) { - // If enabled_servers_str is empty, enable all? User said "possibility to pick which MCP servers to enable". - // If the user specifies explicit list, we filter. If not, maybe we shouldn't enable any or enable all? - // The prompt says "possibility to pick". - // Let's assume if list provided, use it. If not, enable all? Or none? - // User provided example implies explicit enabling might be desired. - // Let's assume if `enabled_servers_str` is not empty, we filter. - bool enabled = true; if (!enabled_list.empty()) { bool found = false;