llama.cpp/tools/server/webui/src/lib/server/mcp-bridge.ts

335 lines
10 KiB
TypeScript

// [AI] MCP Bridge - Manages MCP server processes via stdio JSON-RPC
import { spawn, type ChildProcess } from 'child_process';
import { MCPServerStatus } from '../types';
import type {
MCPServer,
MCPTool,
MCPToolExecutionRequest,
MCPToolExecutionResult
} from '../types';
/**
* MCPBridge - Manages MCP server processes and communication
*
* This class handles:
* - Spawning/stopping MCP server processes
* - stdio-based JSON-RPC communication with MCP servers
* - Tool schema fetching and caching
* - Tool execution routing
*
* Architecture:
* - MCP servers run as child processes
* - Communication via stdio using JSON-RPC
* - Each server provides tools via list_tools method
* - Tool execution routed to appropriate server
*/
export class MCPBridge {
private processes = new Map<string, ChildProcess>();
private servers = new Map<string, MCPServer>();
private toolCache = new Map<string, MCPTool[]>();
private messageId = 1;
private pendingRequests = new Map<number, {
resolve: (value: unknown) => void;
reject: (error: Error) => void;
}>();
constructor() {
// Initialize with default servers (can be configured later)
this.loadDefaultServers();
}
// ─────────────────────────────────────────────────────────────────────────────
// Server Management
// ─────────────────────────────────────────────────────────────────────────────
private loadDefaultServers(): void {
// Load MCP servers from config (for now, empty - will be configured via UI)
const defaultServers: MCPServer[] = [];
for (const server of defaultServers) {
this.servers.set(server.id, server);
}
}
async listServers(): Promise<MCPServer[]> {
return Array.from(this.servers.values());
}
async addServer(server: Omit<MCPServer, 'status'>): Promise<MCPServer> {
console.log("[MCPBridge] addServer called:", server.id, "enabled:", server.enabled);
const newServer: MCPServer = {
...server,
status: MCPServerStatus.STOPPED
};
this.servers.set(server.id, newServer);
if (server.enabled) {
console.log("[MCPBridge] Starting server:", server.id);
await this.startServer(server.id);
}
return newServer;
}
async updateServer(serverId: string, updates: Partial<MCPServer>): Promise<MCPServer> {
const server = this.servers.get(serverId);
if (!server) {
throw new Error(`Server ${serverId} not found`);
}
const wasEnabled = server.enabled;
const updatedServer = { ...server, ...updates };
this.servers.set(serverId, updatedServer);
// Handle enable/disable state changes
if (wasEnabled && !updatedServer.enabled) {
await this.stopServer(serverId);
} else if (!wasEnabled && updatedServer.enabled) {
await this.startServer(serverId);
}
return updatedServer;
}
async removeServer(serverId: string): Promise<void> {
await this.stopServer(serverId);
this.servers.delete(serverId);
this.toolCache.delete(serverId);
}
// ─────────────────────────────────────────────────────────────────────────────
// Process Lifecycle
// ─────────────────────────────────────────────────────────────────────────────
private async startServer(serverId: string): Promise<void> {
const server = this.servers.get(serverId);
if (!server) {
throw new Error(`Server ${serverId} not found`);
}
if (this.processes.has(serverId)) {
console.warn(`Server ${serverId} already running`);
return;
}
server.status = MCPServerStatus.STARTING;
try {
const childProcess = spawn(server.command, server.args, {
env: { ...process.env, ...server.env },
stdio: ['pipe', 'pipe', 'pipe']
});
this.processes.set(serverId, childProcess);
// Handle stdout (JSON-RPC responses)
let buffer = '';
childProcess.stdout?.on('data', (data) => {
buffer += data.toString();
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.trim()) {
try {
const message = JSON.parse(line);
this.handleMessage(serverId, message);
} catch (error) {
console.error(`Failed to parse message from ${serverId}:`, error);
}
}
}
});
// Handle stderr (logging)
childProcess.stderr?.on('data', (data) => {
console.error(`[${serverId}] ${data.toString()}`);
});
// Handle process exit
childProcess.on('exit', (code) => {
console.log(`Server ${serverId} exited with code ${code}`);
this.processes.delete(serverId);
server.status = code === 0 ? MCPServerStatus.STOPPED : MCPServerStatus.ERROR;
});
childProcess.on('error', (error) => {
console.error(`Server ${serverId} error:`, error);
server.status = MCPServerStatus.ERROR;
this.processes.delete(serverId);
});
// Initialize connection and fetch tools
await this.initializeServer(serverId);
server.status = MCPServerStatus.RUNNING;
} catch (error) {
server.status = MCPServerStatus.ERROR;
throw error;
}
}
private async stopServer(serverId: string): Promise<void> {
const childProcess = this.processes.get(serverId);
if (!childProcess) return;
return new Promise((resolve) => {
childProcess.on('exit', () => {
this.processes.delete(serverId);
const server = this.servers.get(serverId);
if (server) {
server.status = MCPServerStatus.STOPPED;
}
resolve();
});
childProcess.kill('SIGTERM');
// Force kill after 5 seconds
setTimeout(() => {
if (this.processes.has(serverId)) {
childProcess.kill('SIGKILL');
}
}, 5000);
});
}
// ─────────────────────────────────────────────────────────────────────────────
// JSON-RPC Communication
// ─────────────────────────────────────────────────────────────────────────────
private async sendRequest(serverId: string, method: string, params?: unknown): Promise<unknown> {
const childProcess = this.processes.get(serverId);
if (!childProcess) {
throw new Error(`Server ${serverId} not running`);
}
const id = this.messageId++;
const request = {
jsonrpc: '2.0',
id,
method,
...(params && { params })
};
return new Promise((resolve, reject) => {
this.pendingRequests.set(id, { resolve, reject });
childProcess.stdin?.write(JSON.stringify(request) + '\n');
// Timeout after 30 seconds
setTimeout(() => {
if (this.pendingRequests.has(id)) {
this.pendingRequests.delete(id);
reject(new Error(`Request timeout for ${method}`));
}
}, 30000);
});
}
private handleMessage(serverId: string, message: {
jsonrpc: string;
id?: number;
result?: unknown;
error?: { code: number; message: string; data?: unknown };
}): void {
if (message.id === undefined) return;
const pending = this.pendingRequests.get(message.id);
if (!pending) return;
this.pendingRequests.delete(message.id);
if (message.error) {
pending.reject(new Error(message.error.message));
} else {
pending.resolve(message.result);
}
}
// ─────────────────────────────────────────────────────────────────────────────
// MCP Protocol Methods
// ─────────────────────────────────────────────────────────────────────────────
private async initializeServer(serverId: string): Promise<void> {
// Send initialize request
await this.sendRequest(serverId, 'initialize', {
protocolVersion: '2024-11-05',
capabilities: {
tools: {}
},
clientInfo: {
name: 'llama.cpp-webui',
version: '1.0.0'
}
});
// Fetch available tools
await this.fetchTools(serverId);
}
private async fetchTools(serverId: string): Promise<void> {
const result = await this.sendRequest(serverId, 'tools/list') as { tools: Array<{
name: string;
description?: string;
inputSchema: Record<string, unknown>;
}>};
const tools: MCPTool[] = result.tools.map(tool => ({
type: 'function' as const,
function: {
name: tool.name,
description: tool.description,
parameters: tool.inputSchema
},
serverId
}));
this.toolCache.set(serverId, tools);
}
async listTools(): Promise<MCPTool[]> {
const allTools: MCPTool[] = [];
for (const server of this.servers.values()) {
if (server.enabled && server.status === MCPServerStatus.RUNNING) {
const tools = this.toolCache.get(server.id) || [];
allTools.push(...tools);
}
}
return allTools;
}
async executeTool(request: MCPToolExecutionRequest): Promise<MCPToolExecutionResult> {
const { serverId, toolName, arguments: args } = request;
try {
const result = await this.sendRequest(serverId, 'tools/call', {
name: toolName,
arguments: args
}) as { content: Array<{ type: string; text: string }> };
return {
success: true,
result: result.content[0]?.text || JSON.stringify(result)
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error'
};
}
}
// ─────────────────────────────────────────────────────────────────────────────
// Cleanup
// ─────────────────────────────────────────────────────────────────────────────
async shutdown(): Promise<void> {
const stopPromises = Array.from(this.servers.keys()).map(id => this.stopServer(id));
await Promise.all(stopPromises);
}
}