From 60475dca3c1b7162b0e24f90941a3b7c0cbf6955 Mon Sep 17 00:00:00 2001
From: Aleksander Grygier
Date: Mon, 29 Dec 2025 10:35:46 +0100
Subject: [PATCH] feat: Implement agentic orchestration within ChatService

---
 tools/server/webui/src/lib/services/chat.ts | 348 ++++++++++++++++----
 1 file changed, 284 insertions(+), 64 deletions(-)

diff --git a/tools/server/webui/src/lib/services/chat.ts b/tools/server/webui/src/lib/services/chat.ts
index 12b7deb221..636d3c5463 100644
--- a/tools/server/webui/src/lib/services/chat.ts
+++ b/tools/server/webui/src/lib/services/chat.ts
@@ -1,10 +1,15 @@
 import { getAuthHeaders, getJsonHeaders } from '$lib/utils';
 import { AttachmentType } from '$lib/enums';
 import { config } from '$lib/stores/settings.svelte';
-import { ensureMcpClient } from '$lib/services/mcp-singleton';
 import { getAgenticConfig } from '$lib/config/agentic';
-import { AgenticOrchestrator } from '$lib/agentic/orchestrator';
-import { OpenAISseClient } from '$lib/agentic/openai-sse-client';
+import { OpenAISseClient, type OpenAISseTurnResult } from '$lib/agentic/openai-sse-client';
+import type {
+  AgenticChatCompletionRequest,
+  AgenticMessage,
+  AgenticToolCallList
+} from '$lib/agentic/types';
+import { toAgenticMessages } from '$lib/agentic/types';
+import type { MCPToolCall, IMCPClient } from '$lib/mcp';
 
 /**
  * ChatService - Low-level API communication layer for Chat Completions
@@ -57,7 +62,8 @@
     messages: ApiChatMessageData[] | (DatabaseMessage & { extra?: DatabaseMessageExtra[] })[],
     options: SettingsChatServiceOptions = {},
     conversationId?: string,
-    signal?: AbortSignal
+    signal?: AbortSignal,
+    mcpClient?: IMCPClient
   ): Promise<void> {
     const {
       stream,
@@ -178,68 +184,35 @@
       }
     }
 
-    // MCP agentic orchestration (low-coupling mode)
-    // Check if MCP client is available and agentic mode is enabled
-    if (stream) {
-      const mcpClient = await ensureMcpClient();
-      const agenticConfig = mcpClient ? getAgenticConfig(config()) : undefined;
+    // MCP agentic orchestration
+    // Run agentic loop if MCP client is provided and agentic mode is enabled
+    const agenticConfig = getAgenticConfig(config());
+    if (stream && agenticConfig.enabled && mcpClient) {
+      console.log('[ChatService] Running agentic loop with MCP client');
+      try {
+        const agenticResult = await ChatService.runAgenticLoop({
+          mcpClient,
+          messages: normalizedMessages,
+          requestBody,
+          agenticConfig,
+          callbacks: {
+            onChunk,
+            onReasoningChunk,
+            onToolCallChunk,
+            onModel,
+            onComplete,
+            onError,
+            onTimings
+          },
+          signal
+        });
 
-      // Debug: verify MCP tools are available
-      if (mcpClient) {
-        const availableTools = mcpClient.listTools();
-        console.log(
-          `[MCP] Client initialized with ${availableTools.length} tools:`,
-          availableTools
-        );
-      } else {
-        console.log('[MCP] No MCP client available');
-      }
-
-      if (mcpClient && agenticConfig?.enabled) {
-        try {
-          const llmClient = new OpenAISseClient({
-            url: './v1/chat/completions',
-            buildHeaders: () => getAuthHeaders()
-          });
-
-          const orchestrator = new AgenticOrchestrator({
-            mcpClient,
-            llmClient,
-            maxTurns: agenticConfig.maxTurns,
-            maxToolPreviewLines: agenticConfig.maxToolPreviewLines
-          });
-
-          let capturedTimings: ChatMessageTimings | undefined;
-
-          await orchestrator.run({
-            initialMessages: normalizedMessages,
-            requestTemplate: requestBody,
-            callbacks: {
-              onChunk,
-              onReasoningChunk,
-              onToolCallChunk,
-              onModel,
-              onComplete: onComplete
-                ? () => onComplete('', undefined, capturedTimings, undefined)
-                : undefined,
-              onError
-            },
-            abortSignal: signal,
-            onProcessingUpdate: (timings, progress) => {
-              ChatService.notifyTimings(timings, progress, onTimings);
-              if (timings) {
-                capturedTimings = timings;
-              }
-            },
-            maxTurns: agenticConfig.maxTurns,
-            filterReasoningAfterFirstTurn: agenticConfig.filterReasoningAfterFirstTurn
-          });
-
-          return;
-        } catch (error) {
-          // If MCP orchestration fails, log and fall through to standard flow
-          console.warn('MCP orchestration failed, falling back to standard flow:', error);
+        if (agenticResult) {
+          return; // Agentic loop handled the request
         }
+        // Fall through to standard flow if agentic loop returned false
+      } catch (error) {
+        console.warn('[ChatService] Agentic loop failed, falling back to standard flow:', error);
       }
     }
 
@@ -851,4 +824,251 @@ export class ChatService {
 
     onTimingsCallback(timings, promptProgress);
   }
+
+  // ─────────────────────────────────────────────────────────────────────────────
+  // Agentic Orchestration
+  // ─────────────────────────────────────────────────────────────────────────────
+
+  /**
+   * Run the agentic orchestration loop with MCP tools.
+   * The MCP client is passed from the store layer - ChatService remains stateless.
+   *
+   * @param params - Parameters for the agentic loop including the MCP client
+   * @returns true if agentic loop handled the request
+   */
+  private static async runAgenticLoop(params: {
+    mcpClient: IMCPClient;
+    messages: ApiChatMessageData[];
+    requestBody: ApiChatCompletionRequest;
+    agenticConfig: ReturnType<typeof getAgenticConfig>;
+    callbacks: {
+      onChunk?: (chunk: string) => void;
+      onReasoningChunk?: (chunk: string) => void;
+      onToolCallChunk?: (serializedToolCalls: string) => void;
+      onModel?: (model: string) => void;
+      onComplete?: (
+        content: string,
+        reasoningContent?: string,
+        timings?: ChatMessageTimings,
+        toolCalls?: string
+      ) => void;
+      onError?: (error: Error) => void;
+      onTimings?: (timings: ChatMessageTimings, promptProgress?: ChatMessagePromptProgress) => void;
+    };
+    signal?: AbortSignal;
+  }): Promise<boolean> {
+    const { mcpClient, messages, requestBody, agenticConfig, callbacks, signal } = params;
+    const { onChunk, onReasoningChunk, onToolCallChunk, onModel, onComplete, onError, onTimings } =
+      callbacks;
+
+    console.log(`[ChatService] Running agentic loop with ${mcpClient.listTools().length} tools`);
+
+    // Set up LLM client
+    const llmClient = new OpenAISseClient({
+      url: './v1/chat/completions',
+      buildHeaders: () => getAuthHeaders()
+    });
+
+    // Prepare session state
+    const sessionMessages: AgenticMessage[] = toAgenticMessages(messages);
+    const tools = await mcpClient.getToolsDefinition();
+    const allToolCalls: ApiChatCompletionToolCall[] = [];
+    let capturedTimings: ChatMessageTimings | undefined;
+
+    const requestWithoutMessages = { ...requestBody };
+    delete (requestWithoutMessages as Partial<ApiChatCompletionRequest>).messages;
+    const requestBase: AgenticChatCompletionRequest = {
+      ...(requestWithoutMessages as Omit<ApiChatCompletionRequest, 'messages'>),
+      stream: true,
+      messages: []
+    };
+
+    const maxTurns = agenticConfig.maxTurns;
+    const maxToolPreviewLines = agenticConfig.maxToolPreviewLines;
+
+    // Run agentic loop
+    for (let turn = 0; turn < maxTurns; turn++) {
+      if (signal?.aborted) {
+        onComplete?.('', undefined, capturedTimings, undefined);
+        return true;
+      }
+
+      const llmRequest: AgenticChatCompletionRequest = {
+        ...requestBase,
+        messages: sessionMessages,
+        tools: tools.length > 0 ? tools : undefined
+      };
+
+      const shouldFilterReasoning = agenticConfig.filterReasoningAfterFirstTurn && turn > 0;
+
+      let turnResult: OpenAISseTurnResult;
+      try {
+        turnResult = await llmClient.stream(
+          llmRequest,
+          {
+            onChunk,
+            onReasoningChunk: shouldFilterReasoning ? undefined : onReasoningChunk,
+            onModel,
+            onFirstValidChunk: undefined,
+            onProcessingUpdate: (timings, progress) => {
+              ChatService.notifyTimings(timings, progress, onTimings);
+              if (timings) capturedTimings = timings;
+            }
+          },
+          signal
+        );
+      } catch (error) {
+        if (signal?.aborted) {
+          onComplete?.('', undefined, capturedTimings, undefined);
+          return true;
+        }
+        const normalizedError = error instanceof Error ? error : new Error('LLM stream error');
+        onError?.(normalizedError);
+        onChunk?.(`\n\n\`\`\`\nUpstream LLM error:\n${normalizedError.message}\n\`\`\`\n`);
+        onComplete?.('', undefined, capturedTimings, undefined);
+        return true;
+      }
+
+      // Check if we should stop (no tool calls or finish reason isn't tool_calls)
+      if (
+        turnResult.toolCalls.length === 0 ||
+        (turnResult.finishReason && turnResult.finishReason !== 'tool_calls')
+      ) {
+        onComplete?.('', undefined, capturedTimings, undefined);
+        return true;
+      }
+
+      // Normalize and validate tool calls
+      const normalizedCalls = ChatService.normalizeToolCalls(turnResult.toolCalls);
+      if (normalizedCalls.length === 0) {
+        onComplete?.('', undefined, capturedTimings, undefined);
+        return true;
+      }
+
+      // Accumulate tool calls
+      for (const call of normalizedCalls) {
+        allToolCalls.push({
+          id: call.id,
+          type: call.type,
+          function: call.function ? { ...call.function } : undefined
+        });
+      }
+      onToolCallChunk?.(JSON.stringify(allToolCalls));
+
+      // Add assistant message with tool calls to session
+      sessionMessages.push({
+        role: 'assistant',
+        content: turnResult.content || undefined,
+        tool_calls: normalizedCalls
+      });
+
+      // Execute each tool call
+      for (const toolCall of normalizedCalls) {
+        if (signal?.aborted) {
+          onComplete?.('', undefined, capturedTimings, undefined);
+          return true;
+        }
+
+        const mcpCall: MCPToolCall = {
+          id: toolCall.id,
+          function: {
+            name: toolCall.function.name,
+            arguments: toolCall.function.arguments
+          }
+        };
+
+        let result: string;
+        try {
+          result = await mcpClient.execute(mcpCall, signal);
+        } catch (error) {
+          if (error instanceof Error && error.name !== 'AbortError') {
+            onError?.(error);
+          }
+          result = `Error: ${error instanceof Error ? error.message : String(error)}`;
+        }
+
+        if (signal?.aborted) {
+          onComplete?.('', undefined, capturedTimings, undefined);
+          return true;
+        }
+
+        // Emit tool preview
+        ChatService.emitToolPreview(toolCall, result, maxToolPreviewLines, onChunk);
+
+        // Add tool result to session (sanitize base64 images for context)
+        const contextValue = ChatService.isBase64Image(result)
+          ? '[Image displayed to user]'
+          : result;
+        sessionMessages.push({
+          role: 'tool',
+          tool_call_id: toolCall.id,
+          content: contextValue
+        });
+      }
+    }
+
+    // Turn limit reached
+    onChunk?.('\n\n```\nTurn limit reached\n```\n');
+    onComplete?.('', undefined, capturedTimings, undefined);
+    return true;
+  }
+
+  /**
+   * Normalize tool calls from LLM response
+   */
+  private static normalizeToolCalls(toolCalls: ApiChatCompletionToolCall[]): AgenticToolCallList {
+    if (!toolCalls) return [];
+    return toolCalls.map((call, index) => ({
+      id: call?.id ?? `tool_${index}`,
+      type: (call?.type as 'function') ?? 'function',
+      function: {
+        name: call?.function?.name ?? '',
+        arguments: call?.function?.arguments ?? ''
+      }
+    }));
+  }
+
+  /**
+   * Emit tool call preview to the chunk callback
+   */
+  private static emitToolPreview(
+    toolCall: AgenticToolCallList[number],
+    result: string,
+    maxLines: number,
+    emit?: (chunk: string) => void
+  ): void {
+    if (!emit) return;
+
+    const toolName = toolCall.function.name;
+    const toolArgs = toolCall.function.arguments;
+
+    // Wrap the preview in a collapsible block with the tool name and arguments as its label
+    let output = `\n\n<details>`;
+    output += `\n<summary><code>${toolName}(${toolArgs})</code></summary>`;
+    output += `\n`;
+
+    if (ChatService.isBase64Image(result)) {
+      output += `\n![tool-result](${result.trim()})`;
+    } else {
+      const lines = result.split('\n');
+      const trimmedLines = lines.length > maxLines ? lines.slice(-maxLines) : lines;
+      output += `\n\`\`\`\n${trimmedLines.join('\n')}\n\`\`\``;
+    }
+
+    output += `\n\n</details>`;
+    emit(output);
+  }
+
+  /**
+   * Check if content is a base64 image
+   */
+  private static isBase64Image(content: string): boolean {
+    const trimmed = content.trim();
+    if (!trimmed.startsWith('data:image/')) return false;
+
+    const match = trimmed.match(/^data:image\/(png|jpe?g|gif|webp);base64,([A-Za-z0-9+/]+=*)$/);
+    if (!match) return false;
+
+    const base64Payload = match[2];
+    return base64Payload.length > 0 && base64Payload.length % 4 === 0;
+  }
 }
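
Usage note: with this patch, ChatService no longer resolves the MCP client itself; the caller hands it in through the new fifth parameter of sendMessage. Below is a minimal call-site sketch, assuming the store layer keeps using the ensureMcpClient() helper from '$lib/services/mcp-singleton' (whose import this patch removes from chat.ts), that sendMessage is invoked statically like the private helpers in the patch, and that the streaming callbacks live on the options object, as the destructuring at the top of sendMessage suggests. The surrounding function and its callback bodies are hypothetical.

```ts
// Hypothetical store-layer call site; only the sendMessage parameter order
// (messages, options, conversationId, signal, mcpClient) comes from the patch.
// ApiChatMessageData is assumed to be an ambient type, as in chat.ts.
import { ChatService } from '$lib/services/chat';
import { ensureMcpClient } from '$lib/services/mcp-singleton';

export async function streamAssistantReply(
  messages: ApiChatMessageData[],
  conversationId: string,
  signal: AbortSignal
): Promise<void> {
  // Resolve the MCP client in the store layer so ChatService stays stateless.
  const mcpClient = (await ensureMcpClient()) ?? undefined;

  await ChatService.sendMessage(
    messages,
    {
      stream: true,
      onChunk: (chunk) => console.log('content delta:', chunk),
      onToolCallChunk: (calls) => console.log('tool calls so far:', calls),
      onError: (error) => console.error('stream failed:', error)
    },
    conversationId,
    signal,
    // Omitting this argument (or disabling agentic mode in settings) keeps the
    // standard, non-agentic completion flow.
    mcpClient
  );
}
```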