diff --git a/tools/server/webui/src/lib/clients/agentic.client.ts b/tools/server/webui/src/lib/clients/agentic.client.ts index dc4e25d82b..0ef242e9a6 100644 --- a/tools/server/webui/src/lib/clients/agentic.client.ts +++ b/tools/server/webui/src/lib/clients/agentic.client.ts @@ -66,6 +66,8 @@ export interface AgenticFlowOptions { } export interface AgenticFlowParams { + /** Conversation ID for per-conversation state tracking */ + conversationId: string; messages: (ApiChatMessageData | (DatabaseMessage & { extra?: DatabaseMessageExtra[] }))[]; options?: AgenticFlowOptions; callbacks: AgenticFlowCallbacks; @@ -80,12 +82,15 @@ export interface AgenticFlowResult { } interface AgenticStoreStateCallbacks { - setRunning: (running: boolean) => void; - setCurrentTurn: (turn: number) => void; - setTotalToolCalls: (count: number) => void; - setLastError: (error: Error | null) => void; - setStreamingToolCall: (tc: { name: string; arguments: string } | null) => void; - clearStreamingToolCall: () => void; + setRunning: (conversationId: string, running: boolean) => void; + setCurrentTurn: (conversationId: string, turn: number) => void; + setTotalToolCalls: (conversationId: string, count: number) => void; + setLastError: (conversationId: string, error: Error | null) => void; + setStreamingToolCall: ( + conversationId: string, + tc: { name: string; arguments: string } | null + ) => void; + clearStreamingToolCall: (conversationId: string) => void; } export class AgenticClient { @@ -132,7 +137,7 @@ export class AgenticClient { * @returns Result indicating if the flow handled the request */ async runAgenticFlow(params: AgenticFlowParams): Promise { - const { messages, options = {}, callbacks, signal, perChatOverrides } = params; + const { conversationId, messages, options = {}, callbacks, signal, perChatOverrides } = params; const { onChunk, onReasoningChunk, @@ -183,13 +188,14 @@ export class AgenticClient { return true; }); - this.store.setRunning(true); - this.store.setCurrentTurn(0); - this.store.setTotalToolCalls(0); - this.store.setLastError(null); + this.store.setRunning(conversationId, true); + this.store.setCurrentTurn(conversationId, 0); + this.store.setTotalToolCalls(conversationId, 0); + this.store.setLastError(conversationId, null); try { await this.executeAgenticLoop({ + conversationId, messages: normalizedMessages, options, tools, @@ -209,11 +215,11 @@ export class AgenticClient { return { handled: true }; } catch (error) { const normalizedError = error instanceof Error ? error : new Error(String(error)); - this.store.setLastError(normalizedError); + this.store.setLastError(conversationId, normalizedError); onError?.(normalizedError); return { handled: true, error: normalizedError }; } finally { - this.store.setRunning(false); + this.store.setRunning(conversationId, false); // Lazy Disconnect: Close MCP connections after agentic flow completes // This prevents continuous keepalive/heartbeat polling when tools are not in use await mcpClient.shutdown().catch((err) => { @@ -225,6 +231,7 @@ export class AgenticClient { } private async executeAgenticLoop(params: { + conversationId: string; messages: ApiChatMessageData[]; options: AgenticFlowOptions; tools: ReturnType; @@ -232,7 +239,7 @@ export class AgenticClient { callbacks: AgenticFlowCallbacks; signal?: AbortSignal; }): Promise { - const { messages, options, tools, agenticConfig, callbacks, signal } = params; + const { conversationId, messages, options, tools, agenticConfig, callbacks, signal } = params; const { onChunk, onReasoningChunk, @@ -265,7 +272,7 @@ export class AgenticClient { const maxToolPreviewLines = agenticConfig.maxToolPreviewLines; for (let turn = 0; turn < maxTurns; turn++) { - this.store.setCurrentTurn(turn + 1); + this.store.setCurrentTurn(conversationId, turn + 1); agenticTimings.turns = turn + 1; if (signal?.aborted) { @@ -359,7 +366,7 @@ export class AgenticClient { ) { lastStreamingToolCallName = name; lastStreamingToolCallArgsLength = argsLengthBucket; - this.store.setStreamingToolCall({ name, arguments: args }); + this.store.setStreamingToolCall(conversationId, { name, arguments: args }); } } } catch { @@ -385,7 +392,7 @@ export class AgenticClient { signal ); - this.store.clearStreamingToolCall(); + this.store.clearStreamingToolCall(conversationId); if (turnTimings) { agenticTimings.llm.predicted_n += turnTimings.predicted_n || 0; @@ -447,7 +454,7 @@ export class AgenticClient { function: call.function ? { ...call.function } : undefined }); } - this.store.setTotalToolCalls(allToolCalls.length); + this.store.setTotalToolCalls(conversationId, allToolCalls.length); onToolCallChunk?.(JSON.stringify(allToolCalls)); sessionMessages.push({ @@ -689,8 +696,8 @@ export class AgenticClient { return `mcp-attachment-${timestamp}-${index}.${extension}`; } - clearError(): void { - this.store.setLastError(null); + clearError(conversationId: string): void { + this.store.setLastError(conversationId, null); } } diff --git a/tools/server/webui/src/lib/stores/agentic.svelte.ts b/tools/server/webui/src/lib/stores/agentic.svelte.ts index b34d6be11e..5d717867e6 100644 --- a/tools/server/webui/src/lib/stores/agentic.svelte.ts +++ b/tools/server/webui/src/lib/stores/agentic.svelte.ts @@ -10,10 +10,16 @@ * - **agenticStore** (this): Reactive state for UI components * * **Responsibilities:** - * - Hold reactive state for UI binding (isRunning, currentTurn, etc.) - * - Provide getters for computed values + * - Hold per-conversation reactive state for UI binding + * - Provide getters for computed values (scoped by conversationId) * - Expose setters for AgenticClient to update state * - Forward method calls to AgenticClient + * - Track sampling requests for debugging + * + * **Per-Conversation Architecture:** + * - Each conversation has its own AgenticSession + * - Parallel agentic flows in different chats don't interfere + * - Sessions are created on-demand and cleaned up when done * * @see AgenticClient in clients/agentic/ for business logic * @see MCPClient in clients/mcp/ for tool execution @@ -21,6 +27,7 @@ import { browser } from '$app/environment'; import type { AgenticFlowParams, AgenticFlowResult } from '$lib/clients'; +import type { AgenticSession } from '$lib/types/agentic'; export type { AgenticFlowCallbacks, @@ -29,12 +36,25 @@ export type { AgenticFlowResult } from '$lib/clients'; +/** + * Creates a fresh agentic session with default values. + */ +function createDefaultSession(): AgenticSession { + return { + isRunning: false, + currentTurn: 0, + totalToolCalls: 0, + lastError: null, + streamingToolCall: null + }; +} + class AgenticStore { - private _isRunning = $state(false); - private _currentTurn = $state(0); - private _totalToolCalls = $state(0); - private _lastError = $state(null); - private _streamingToolCall = $state<{ name: string; arguments: string } | null>(null); + /** + * Per-conversation agentic sessions. + * Key is conversationId, value is the session state. + */ + private _sessions = $state>(new Map()); /** Reference to the client (lazy loaded to avoid circular dependency) */ private _client: typeof import('$lib/clients/agentic.client').agenticClient | null = null; @@ -60,35 +80,119 @@ class AgenticStore { this._client = agenticClient; agenticClient.setStoreCallbacks({ - setRunning: (running) => (this._isRunning = running), - setCurrentTurn: (turn) => (this._currentTurn = turn), - setTotalToolCalls: (count) => (this._totalToolCalls = count), - setLastError: (error) => (this._lastError = error), - setStreamingToolCall: (tc) => (this._streamingToolCall = tc), - clearStreamingToolCall: () => (this._streamingToolCall = null) + setRunning: (convId, running) => this.updateSession(convId, { isRunning: running }), + setCurrentTurn: (convId, turn) => this.updateSession(convId, { currentTurn: turn }), + setTotalToolCalls: (convId, count) => this.updateSession(convId, { totalToolCalls: count }), + setLastError: (convId, error) => this.updateSession(convId, { lastError: error }), + setStreamingToolCall: (convId, tc) => this.updateSession(convId, { streamingToolCall: tc }), + clearStreamingToolCall: (convId) => this.updateSession(convId, { streamingToolCall: null }) }); } - get isRunning(): boolean { - return this._isRunning; + /** + * + * Session Management + * + */ + + /** + * Get session for a conversation, creating if needed. + */ + getSession(conversationId: string): AgenticSession { + let session = this._sessions.get(conversationId); + if (!session) { + session = createDefaultSession(); + this._sessions.set(conversationId, session); + } + return session; } - get currentTurn(): number { - return this._currentTurn; + /** + * Update session state for a conversation. + */ + private updateSession(conversationId: string, update: Partial): void { + const session = this.getSession(conversationId); + const updated = { ...session, ...update }; + this._sessions.set(conversationId, updated); } - get totalToolCalls(): number { - return this._totalToolCalls; + /** + * Clear session for a conversation. + */ + clearSession(conversationId: string): void { + this._sessions.delete(conversationId); } - get lastError(): Error | null { - return this._lastError; + /** + * Get all active sessions (conversations with running agentic flows). + */ + getActiveSessions(): Array<{ conversationId: string; session: AgenticSession }> { + const active: Array<{ conversationId: string; session: AgenticSession }> = []; + for (const [conversationId, session] of this._sessions.entries()) { + if (session.isRunning) { + active.push({ conversationId, session }); + } + } + return active; } - get streamingToolCall(): { name: string; arguments: string } | null { - return this._streamingToolCall; + /** + * + * Convenience Getters (for current/active conversation) + * + */ + + /** + * Check if any agentic flow is running (global). + */ + get isAnyRunning(): boolean { + for (const session of this._sessions.values()) { + if (session.isRunning) return true; + } + return false; } + /** + * Get running state for a specific conversation. + */ + isRunning(conversationId: string): boolean { + return this.getSession(conversationId).isRunning; + } + + /** + * Get current turn for a specific conversation. + */ + currentTurn(conversationId: string): number { + return this.getSession(conversationId).currentTurn; + } + + /** + * Get total tool calls for a specific conversation. + */ + totalToolCalls(conversationId: string): number { + return this.getSession(conversationId).totalToolCalls; + } + + /** + * Get last error for a specific conversation. + */ + lastError(conversationId: string): Error | null { + return this.getSession(conversationId).lastError; + } + + /** + * Get streaming tool call for a specific conversation. + */ + streamingToolCall(conversationId: string): { name: string; arguments: string } | null { + return this.getSession(conversationId).streamingToolCall; + } + + /** + * + * Agentic Flow Execution + * + */ + /** * Run the agentic orchestration loop with MCP tools. * Delegates to AgenticClient. @@ -101,11 +205,10 @@ class AgenticStore { } /** - * Clear error state + * Clear error state for a conversation. */ - clearError(): void { - if (!this.client) return; - this.client.clearError(); + clearError(conversationId: string): void { + this.updateSession(conversationId, { lastError: null }); } } @@ -116,22 +219,30 @@ if (browser) { agenticStore.init(); } -export function agenticIsRunning() { - return agenticStore.isRunning; +/** + * Helper functions for reactive access in components. + * These require conversationId parameter for per-conversation state. + */ +export function agenticIsRunning(conversationId: string) { + return agenticStore.isRunning(conversationId); } -export function agenticCurrentTurn() { - return agenticStore.currentTurn; +export function agenticCurrentTurn(conversationId: string) { + return agenticStore.currentTurn(conversationId); } -export function agenticTotalToolCalls() { - return agenticStore.totalToolCalls; +export function agenticTotalToolCalls(conversationId: string) { + return agenticStore.totalToolCalls(conversationId); } -export function agenticLastError() { - return agenticStore.lastError; +export function agenticLastError(conversationId: string) { + return agenticStore.lastError(conversationId); } -export function agenticStreamingToolCall() { - return agenticStore.streamingToolCall; +export function agenticStreamingToolCall(conversationId: string) { + return agenticStore.streamingToolCall(conversationId); +} + +export function agenticIsAnyRunning() { + return agenticStore.isAnyRunning; } diff --git a/tools/server/webui/src/lib/types/agentic.d.ts b/tools/server/webui/src/lib/types/agentic.d.ts index 4bf7c93768..88ba324da0 100644 --- a/tools/server/webui/src/lib/types/agentic.d.ts +++ b/tools/server/webui/src/lib/types/agentic.d.ts @@ -48,3 +48,15 @@ export type AgenticChatCompletionRequest = Omit