From 38244a1bfa928572157d5f26285eed98151151b2 Mon Sep 17 00:00:00 2001 From: Pascal Date: Sat, 10 Jan 2026 18:48:46 +0100 Subject: [PATCH] webui: enable streaming of tool call arguments --- .../chat/ChatMessages/AgenticContent.svelte | 11 +- .../server/webui/src/lib/constants/agentic.ts | 9 +- .../webui/src/lib/stores/agentic.svelte.ts | 266 ++++++++++++++++++ 3 files changed, 275 insertions(+), 11 deletions(-) diff --git a/tools/server/webui/src/lib/components/app/chat/ChatMessages/AgenticContent.svelte b/tools/server/webui/src/lib/components/app/chat/ChatMessages/AgenticContent.svelte index 293fef854b..7baf7e2825 100644 --- a/tools/server/webui/src/lib/components/app/chat/ChatMessages/AgenticContent.svelte +++ b/tools/server/webui/src/lib/components/app/chat/ChatMessages/AgenticContent.svelte @@ -86,8 +86,7 @@ } const toolName = match[1]; - const toolArgsBase64 = match[2]; - const toolArgs = decodeBase64(toolArgsBase64); + const toolArgs = match[2]; // Direct JSON const toolResult = match[3].replace(/^\n+|\n+$/g, ''); sections.push({ @@ -119,9 +118,8 @@ } const toolName = pendingMatch[1]; - const toolArgsBase64 = pendingMatch[2]; - const toolArgs = decodeBase64(toolArgsBase64); - // Capture streaming result content (everything after args marker) + const toolArgs = pendingMatch[2]; // Direct JSON + // Capture streaming result content (everything after TOOL_ARGS_END marker) const streamingResult = (pendingMatch[3] || '').replace(/^\n+|\n+$/g, ''); sections.push({ @@ -140,8 +138,7 @@ } } - const partialArgsBase64 = partialWithNameMatch[2] || ''; - const partialArgs = decodeBase64(partialArgsBase64); + const partialArgs = partialWithNameMatch[2] || ''; // Direct JSON streaming sections.push({ type: AgenticSectionType.TOOL_CALL_STREAMING, diff --git a/tools/server/webui/src/lib/constants/agentic.ts b/tools/server/webui/src/lib/constants/agentic.ts index ea06bab48b..6c6cc665ca 100644 --- a/tools/server/webui/src/lib/constants/agentic.ts +++ b/tools/server/webui/src/lib/constants/agentic.ts @@ -12,7 +12,8 @@ export const AGENTIC_TAGS = { TOOL_CALL_START: '<<>>', TOOL_CALL_END: '<<>>', TOOL_NAME_PREFIX: '<<>>', + TOOL_ARGS_END: '<<>>', TAG_SUFFIX: '>>>' } as const; @@ -20,13 +21,13 @@ export const AGENTIC_TAGS = { export const AGENTIC_REGEX = { // Matches completed tool calls (with END marker) COMPLETED_TOOL_CALL: - /<<>>\n<<>>\n<<>>([\s\S]*?)<<>>/g, + /<<>>\n<<>>\n<<>>([\s\S]*?)<<>>([\s\S]*?)<<>>/g, // Matches pending tool call (has NAME and ARGS but no END) PENDING_TOOL_CALL: - /<<>>\n<<>>\n<<>>([\s\S]*)$/, + /<<>>\n<<>>\n<<>>([\s\S]*?)<<>>([\s\S]*)$/, // Matches partial tool call (has START and NAME, ARGS still streaming) PARTIAL_WITH_NAME: - /<<>>\n<<>>\n<<>>\n<<>>\n<<>>([\s\S]*)$/, // Matches early tool call (just START marker) EARLY_MATCH: /<<>>([\s\S]*)$/, // Matches partial marker at end of content diff --git a/tools/server/webui/src/lib/stores/agentic.svelte.ts b/tools/server/webui/src/lib/stores/agentic.svelte.ts index b34d6be11e..c7d529aa53 100644 --- a/tools/server/webui/src/lib/stores/agentic.svelte.ts +++ b/tools/server/webui/src/lib/stores/agentic.svelte.ts @@ -100,6 +100,272 @@ class AgenticStore { return this.client.runAgenticFlow(params); } + // ───────────────────────────────────────────────────────────────────────────── + // Private: Agentic Loop Implementation + // ───────────────────────────────────────────────────────────────────────────── + + private async executeAgenticLoop(params: { + messages: ApiChatMessageData[]; + options: AgenticFlowOptions; + tools: ReturnType; + agenticConfig: ReturnType; + callbacks: AgenticFlowCallbacks; + signal?: AbortSignal; + }): Promise { + const { messages, options, tools, agenticConfig, callbacks, signal } = params; + const { onChunk, onReasoningChunk, onToolCallChunk, onModel, onComplete, onTimings } = + callbacks; + + // Set up LLM client + const llmClient = new OpenAISseClient({ + url: './v1/chat/completions', + buildHeaders: () => getAuthHeaders() + }); + + // Prepare session state + const sessionMessages: AgenticMessage[] = toAgenticMessages(messages); + const allToolCalls: ApiChatCompletionToolCall[] = []; + + // Wrapper to emit agentic tags progressively during streaming + const emittedToolCallStates = $state( + new Map() + ); + const wrappedOnToolCallChunk = (serializedToolCalls: string) => { + const toolCalls: ApiChatCompletionToolCall[] = JSON.parse(serializedToolCalls); + + for (let i = 0; i < toolCalls.length; i++) { + const toolCall = toolCalls[i]; + const toolName = toolCall.function?.name ?? ''; + const toolArgs = toolCall.function?.arguments ?? ''; + + const state = emittedToolCallStates.get(i) || { emittedOnce: false, lastArgs: '' }; + + if (!state.emittedOnce) { + // First emission: send full header + args + let output = `\n\n<<>>`; + output += `\n<<>>`; + output += `\n<<>>\n`; + output += toolArgs; + onChunk?.(output); + state.emittedOnce = true; + state.lastArgs = toolArgs; + } else if (toolArgs !== state.lastArgs) { + // Subsequent emissions: send only delta + const delta = toolArgs.slice(state.lastArgs.length); + onChunk?.(delta); + state.lastArgs = toolArgs; + } + + emittedToolCallStates.set(i, state); + } + + onToolCallChunk?.(serializedToolCalls); + }; + let capturedTimings: ChatMessageTimings | undefined; + + // Build base request from options (messages change per turn) + const requestBase: AgenticChatCompletionRequest = { + ...options, + stream: true, + messages: [] + }; + + const maxTurns = agenticConfig.maxTurns; + const maxToolPreviewLines = agenticConfig.maxToolPreviewLines; + + // Run agentic loop + for (let turn = 0; turn < maxTurns; turn++) { + this._currentTurn = turn + 1; + + if (signal?.aborted) { + onComplete?.('', undefined, capturedTimings, undefined); + return; + } + + // Build LLM request for this turn + const llmRequest: AgenticChatCompletionRequest = { + ...requestBase, + messages: sessionMessages, + tools: tools.length > 0 ? tools : undefined + }; + + // Filter reasoning content after first turn if configured + const shouldFilterReasoning = agenticConfig.filterReasoningAfterFirstTurn && turn > 0; + + // Stream from LLM + let turnResult: OpenAISseTurnResult; + try { + turnResult = await llmClient.stream( + llmRequest, + { + onChunk, + onReasoningChunk: shouldFilterReasoning ? undefined : onReasoningChunk, + onToolCallChunk: wrappedOnToolCallChunk, + onModel, + onFirstValidChunk: undefined, + onProcessingUpdate: (timings, progress) => { + onTimings?.(timings, progress); + if (timings) capturedTimings = timings; + } + }, + signal + ); + } catch (error) { + if (signal?.aborted) { + onComplete?.('', undefined, capturedTimings, undefined); + return; + } + const normalizedError = error instanceof Error ? error : new Error('LLM stream error'); + onChunk?.(`\n\n\`\`\`\nUpstream LLM error:\n${normalizedError.message}\n\`\`\`\n`); + onComplete?.('', undefined, capturedTimings, undefined); + throw normalizedError; + } + + // Check if we should stop (no tool calls or finish reason isn't tool_calls) + if ( + turnResult.toolCalls.length === 0 || + (turnResult.finishReason && turnResult.finishReason !== 'tool_calls') + ) { + onComplete?.('', undefined, capturedTimings, undefined); + return; + } + + // Normalize and validate tool calls + const normalizedCalls = this.normalizeToolCalls(turnResult.toolCalls); + if (normalizedCalls.length === 0) { + onComplete?.('', undefined, capturedTimings, undefined); + return; + } + + // Accumulate tool calls + for (const call of normalizedCalls) { + allToolCalls.push({ + id: call.id, + type: call.type, + function: call.function ? { ...call.function } : undefined + }); + } + this._totalToolCalls = allToolCalls.length; + + // Add assistant message with tool calls to session + sessionMessages.push({ + role: 'assistant', + content: turnResult.content || undefined, + tool_calls: normalizedCalls + }); + + // Execute each tool call via MCP + for (const toolCall of normalizedCalls) { + if (signal?.aborted) { + onComplete?.('', undefined, capturedTimings, undefined); + return; + } + + const mcpCall: MCPToolCall = { + id: toolCall.id, + function: { + name: toolCall.function.name, + arguments: toolCall.function.arguments + } + }; + + let result: string; + try { + const executionResult = await mcpStore.executeTool(mcpCall, signal); + result = executionResult.content; + } catch (error) { + if (error instanceof Error && error.name === 'AbortError') { + onComplete?.('', undefined, capturedTimings, undefined); + return; + } + result = `Error: ${error instanceof Error ? error.message : String(error)}`; + } + + if (signal?.aborted) { + onComplete?.('', undefined, capturedTimings, undefined); + return; + } + + // Emit tool result and end marker + this.emitToolCallResult(result, maxToolPreviewLines, onChunk); + + // Add tool result to session (sanitize base64 images for context) + const contextValue = this.isBase64Image(result) ? '[Image displayed to user]' : result; + sessionMessages.push({ + role: 'tool', + tool_call_id: toolCall.id, + content: contextValue + }); + } + } + + // Turn limit reached + onChunk?.('\n\n```\nTurn limit reached\n```\n'); + onComplete?.('', undefined, capturedTimings, undefined); + } + + // ───────────────────────────────────────────────────────────────────────────── + // Private: Helper Methods + // ───────────────────────────────────────────────────────────────────────────── + + /** + * Normalize tool calls from LLM response + */ + private normalizeToolCalls(toolCalls: ApiChatCompletionToolCall[]): AgenticToolCallList { + if (!toolCalls) return []; + return toolCalls.map((call, index) => ({ + id: call?.id ?? `tool_${index}`, + type: (call?.type as 'function') ?? 'function', + function: { + name: call?.function?.name ?? '', + arguments: call?.function?.arguments ?? '' + } + })); + } + + /** + * Emit tool call result and end marker. + */ + private emitToolCallResult( + result: string, + maxLines: number, + emit?: (chunk: string) => void + ): void { + if (!emit) return; + + let output = ''; + output += `\n<<>>`; + if (this.isBase64Image(result)) { + output += `\n![tool-result](${result.trim()})`; + } else { + // Don't wrap in code fences - result may already be markdown with its own code blocks + const lines = result.split('\n'); + const trimmedLines = lines.length > maxLines ? lines.slice(-maxLines) : lines; + output += `\n${trimmedLines.join('\n')}`; + } + + output += `\n<<>>\n`; + emit(output); + } + + /** + * Check if content is a base64 image + */ + private isBase64Image(content: string): boolean { + const trimmed = content.trim(); + if (!trimmed.startsWith('data:image/')) return false; + + const match = trimmed.match(/^data:image\/(png|jpe?g|gif|webp);base64,([A-Za-z0-9+/]+=*)$/); + if (!match) return false; + + const base64Payload = match[2]; + return base64Payload.length > 0 && base64Payload.length % 4 === 0; + } + + // ───────────────────────────────────────────────────────────────────────────── + // Utilities + // ───────────────────────────────────────────────────────────────────────────── + /** * Clear error state */