feat: Per-conversation agentic loop state

This commit is contained in:
Aleksander Grygier 2026-01-22 17:38:51 +01:00
parent 1565cda1ff
commit da252e3425
3 changed files with 187 additions and 57 deletions

View File

@ -66,6 +66,8 @@ export interface AgenticFlowOptions {
} }
export interface AgenticFlowParams { export interface AgenticFlowParams {
/** Conversation ID for per-conversation state tracking */
conversationId: string;
messages: (ApiChatMessageData | (DatabaseMessage & { extra?: DatabaseMessageExtra[] }))[]; messages: (ApiChatMessageData | (DatabaseMessage & { extra?: DatabaseMessageExtra[] }))[];
options?: AgenticFlowOptions; options?: AgenticFlowOptions;
callbacks: AgenticFlowCallbacks; callbacks: AgenticFlowCallbacks;
@ -80,12 +82,15 @@ export interface AgenticFlowResult {
} }
interface AgenticStoreStateCallbacks { interface AgenticStoreStateCallbacks {
setRunning: (running: boolean) => void; setRunning: (conversationId: string, running: boolean) => void;
setCurrentTurn: (turn: number) => void; setCurrentTurn: (conversationId: string, turn: number) => void;
setTotalToolCalls: (count: number) => void; setTotalToolCalls: (conversationId: string, count: number) => void;
setLastError: (error: Error | null) => void; setLastError: (conversationId: string, error: Error | null) => void;
setStreamingToolCall: (tc: { name: string; arguments: string } | null) => void; setStreamingToolCall: (
clearStreamingToolCall: () => void; conversationId: string,
tc: { name: string; arguments: string } | null
) => void;
clearStreamingToolCall: (conversationId: string) => void;
} }
export class AgenticClient { export class AgenticClient {
@ -132,7 +137,7 @@ export class AgenticClient {
* @returns Result indicating if the flow handled the request * @returns Result indicating if the flow handled the request
*/ */
async runAgenticFlow(params: AgenticFlowParams): Promise<AgenticFlowResult> { async runAgenticFlow(params: AgenticFlowParams): Promise<AgenticFlowResult> {
const { messages, options = {}, callbacks, signal, perChatOverrides } = params; const { conversationId, messages, options = {}, callbacks, signal, perChatOverrides } = params;
const { const {
onChunk, onChunk,
onReasoningChunk, onReasoningChunk,
@ -183,13 +188,14 @@ export class AgenticClient {
return true; return true;
}); });
this.store.setRunning(true); this.store.setRunning(conversationId, true);
this.store.setCurrentTurn(0); this.store.setCurrentTurn(conversationId, 0);
this.store.setTotalToolCalls(0); this.store.setTotalToolCalls(conversationId, 0);
this.store.setLastError(null); this.store.setLastError(conversationId, null);
try { try {
await this.executeAgenticLoop({ await this.executeAgenticLoop({
conversationId,
messages: normalizedMessages, messages: normalizedMessages,
options, options,
tools, tools,
@ -209,11 +215,11 @@ export class AgenticClient {
return { handled: true }; return { handled: true };
} catch (error) { } catch (error) {
const normalizedError = error instanceof Error ? error : new Error(String(error)); const normalizedError = error instanceof Error ? error : new Error(String(error));
this.store.setLastError(normalizedError); this.store.setLastError(conversationId, normalizedError);
onError?.(normalizedError); onError?.(normalizedError);
return { handled: true, error: normalizedError }; return { handled: true, error: normalizedError };
} finally { } finally {
this.store.setRunning(false); this.store.setRunning(conversationId, false);
// Lazy Disconnect: Close MCP connections after agentic flow completes // Lazy Disconnect: Close MCP connections after agentic flow completes
// This prevents continuous keepalive/heartbeat polling when tools are not in use // This prevents continuous keepalive/heartbeat polling when tools are not in use
await mcpClient.shutdown().catch((err) => { await mcpClient.shutdown().catch((err) => {
@ -225,6 +231,7 @@ export class AgenticClient {
} }
private async executeAgenticLoop(params: { private async executeAgenticLoop(params: {
conversationId: string;
messages: ApiChatMessageData[]; messages: ApiChatMessageData[];
options: AgenticFlowOptions; options: AgenticFlowOptions;
tools: ReturnType<typeof mcpClient.getToolDefinitionsForLLM>; tools: ReturnType<typeof mcpClient.getToolDefinitionsForLLM>;
@ -232,7 +239,7 @@ export class AgenticClient {
callbacks: AgenticFlowCallbacks; callbacks: AgenticFlowCallbacks;
signal?: AbortSignal; signal?: AbortSignal;
}): Promise<void> { }): Promise<void> {
const { messages, options, tools, agenticConfig, callbacks, signal } = params; const { conversationId, messages, options, tools, agenticConfig, callbacks, signal } = params;
const { const {
onChunk, onChunk,
onReasoningChunk, onReasoningChunk,
@ -265,7 +272,7 @@ export class AgenticClient {
const maxToolPreviewLines = agenticConfig.maxToolPreviewLines; const maxToolPreviewLines = agenticConfig.maxToolPreviewLines;
for (let turn = 0; turn < maxTurns; turn++) { for (let turn = 0; turn < maxTurns; turn++) {
this.store.setCurrentTurn(turn + 1); this.store.setCurrentTurn(conversationId, turn + 1);
agenticTimings.turns = turn + 1; agenticTimings.turns = turn + 1;
if (signal?.aborted) { if (signal?.aborted) {
@ -359,7 +366,7 @@ export class AgenticClient {
) { ) {
lastStreamingToolCallName = name; lastStreamingToolCallName = name;
lastStreamingToolCallArgsLength = argsLengthBucket; lastStreamingToolCallArgsLength = argsLengthBucket;
this.store.setStreamingToolCall({ name, arguments: args }); this.store.setStreamingToolCall(conversationId, { name, arguments: args });
} }
} }
} catch { } catch {
@ -385,7 +392,7 @@ export class AgenticClient {
signal signal
); );
this.store.clearStreamingToolCall(); this.store.clearStreamingToolCall(conversationId);
if (turnTimings) { if (turnTimings) {
agenticTimings.llm.predicted_n += turnTimings.predicted_n || 0; agenticTimings.llm.predicted_n += turnTimings.predicted_n || 0;
@ -447,7 +454,7 @@ export class AgenticClient {
function: call.function ? { ...call.function } : undefined function: call.function ? { ...call.function } : undefined
}); });
} }
this.store.setTotalToolCalls(allToolCalls.length); this.store.setTotalToolCalls(conversationId, allToolCalls.length);
onToolCallChunk?.(JSON.stringify(allToolCalls)); onToolCallChunk?.(JSON.stringify(allToolCalls));
sessionMessages.push({ sessionMessages.push({
@ -689,8 +696,8 @@ export class AgenticClient {
return `mcp-attachment-${timestamp}-${index}.${extension}`; return `mcp-attachment-${timestamp}-${index}.${extension}`;
} }
clearError(): void { clearError(conversationId: string): void {
this.store.setLastError(null); this.store.setLastError(conversationId, null);
} }
} }

View File

@ -10,10 +10,16 @@
* - **agenticStore** (this): Reactive state for UI components * - **agenticStore** (this): Reactive state for UI components
* *
* **Responsibilities:** * **Responsibilities:**
* - Hold reactive state for UI binding (isRunning, currentTurn, etc.) * - Hold per-conversation reactive state for UI binding
* - Provide getters for computed values * - Provide getters for computed values (scoped by conversationId)
* - Expose setters for AgenticClient to update state * - Expose setters for AgenticClient to update state
* - Forward method calls to AgenticClient * - Forward method calls to AgenticClient
* - Track sampling requests for debugging
*
* **Per-Conversation Architecture:**
* - Each conversation has its own AgenticSession
* - Parallel agentic flows in different chats don't interfere
* - Sessions are created on-demand and cleaned up when done
* *
* @see AgenticClient in clients/agentic/ for business logic * @see AgenticClient in clients/agentic/ for business logic
* @see MCPClient in clients/mcp/ for tool execution * @see MCPClient in clients/mcp/ for tool execution
@ -21,6 +27,7 @@
import { browser } from '$app/environment'; import { browser } from '$app/environment';
import type { AgenticFlowParams, AgenticFlowResult } from '$lib/clients'; import type { AgenticFlowParams, AgenticFlowResult } from '$lib/clients';
import type { AgenticSession } from '$lib/types/agentic';
export type { export type {
AgenticFlowCallbacks, AgenticFlowCallbacks,
@ -29,12 +36,25 @@ export type {
AgenticFlowResult AgenticFlowResult
} from '$lib/clients'; } from '$lib/clients';
/**
* Creates a fresh agentic session with default values.
*/
function createDefaultSession(): AgenticSession {
return {
isRunning: false,
currentTurn: 0,
totalToolCalls: 0,
lastError: null,
streamingToolCall: null
};
}
class AgenticStore { class AgenticStore {
private _isRunning = $state(false); /**
private _currentTurn = $state(0); * Per-conversation agentic sessions.
private _totalToolCalls = $state(0); * Key is conversationId, value is the session state.
private _lastError = $state<Error | null>(null); */
private _streamingToolCall = $state<{ name: string; arguments: string } | null>(null); private _sessions = $state<Map<string, AgenticSession>>(new Map());
/** Reference to the client (lazy loaded to avoid circular dependency) */ /** Reference to the client (lazy loaded to avoid circular dependency) */
private _client: typeof import('$lib/clients/agentic.client').agenticClient | null = null; private _client: typeof import('$lib/clients/agentic.client').agenticClient | null = null;
@ -60,35 +80,119 @@ class AgenticStore {
this._client = agenticClient; this._client = agenticClient;
agenticClient.setStoreCallbacks({ agenticClient.setStoreCallbacks({
setRunning: (running) => (this._isRunning = running), setRunning: (convId, running) => this.updateSession(convId, { isRunning: running }),
setCurrentTurn: (turn) => (this._currentTurn = turn), setCurrentTurn: (convId, turn) => this.updateSession(convId, { currentTurn: turn }),
setTotalToolCalls: (count) => (this._totalToolCalls = count), setTotalToolCalls: (convId, count) => this.updateSession(convId, { totalToolCalls: count }),
setLastError: (error) => (this._lastError = error), setLastError: (convId, error) => this.updateSession(convId, { lastError: error }),
setStreamingToolCall: (tc) => (this._streamingToolCall = tc), setStreamingToolCall: (convId, tc) => this.updateSession(convId, { streamingToolCall: tc }),
clearStreamingToolCall: () => (this._streamingToolCall = null) clearStreamingToolCall: (convId) => this.updateSession(convId, { streamingToolCall: null })
}); });
} }
get isRunning(): boolean { /**
return this._isRunning; *
* Session Management
*
*/
/**
* Get session for a conversation, creating if needed.
*/
getSession(conversationId: string): AgenticSession {
let session = this._sessions.get(conversationId);
if (!session) {
session = createDefaultSession();
this._sessions.set(conversationId, session);
}
return session;
} }
get currentTurn(): number { /**
return this._currentTurn; * Update session state for a conversation.
*/
private updateSession(conversationId: string, update: Partial<AgenticSession>): void {
const session = this.getSession(conversationId);
const updated = { ...session, ...update };
this._sessions.set(conversationId, updated);
} }
get totalToolCalls(): number { /**
return this._totalToolCalls; * Clear session for a conversation.
*/
clearSession(conversationId: string): void {
this._sessions.delete(conversationId);
} }
get lastError(): Error | null { /**
return this._lastError; * Get all active sessions (conversations with running agentic flows).
*/
getActiveSessions(): Array<{ conversationId: string; session: AgenticSession }> {
const active: Array<{ conversationId: string; session: AgenticSession }> = [];
for (const [conversationId, session] of this._sessions.entries()) {
if (session.isRunning) {
active.push({ conversationId, session });
}
}
return active;
} }
get streamingToolCall(): { name: string; arguments: string } | null { /**
return this._streamingToolCall; *
* Convenience Getters (for current/active conversation)
*
*/
/**
* Check if any agentic flow is running (global).
*/
get isAnyRunning(): boolean {
for (const session of this._sessions.values()) {
if (session.isRunning) return true;
}
return false;
} }
/**
* Get running state for a specific conversation.
*/
isRunning(conversationId: string): boolean {
return this.getSession(conversationId).isRunning;
}
/**
* Get current turn for a specific conversation.
*/
currentTurn(conversationId: string): number {
return this.getSession(conversationId).currentTurn;
}
/**
* Get total tool calls for a specific conversation.
*/
totalToolCalls(conversationId: string): number {
return this.getSession(conversationId).totalToolCalls;
}
/**
* Get last error for a specific conversation.
*/
lastError(conversationId: string): Error | null {
return this.getSession(conversationId).lastError;
}
/**
* Get streaming tool call for a specific conversation.
*/
streamingToolCall(conversationId: string): { name: string; arguments: string } | null {
return this.getSession(conversationId).streamingToolCall;
}
/**
*
* Agentic Flow Execution
*
*/
/** /**
* Run the agentic orchestration loop with MCP tools. * Run the agentic orchestration loop with MCP tools.
* Delegates to AgenticClient. * Delegates to AgenticClient.
@ -368,11 +472,10 @@ class AgenticStore {
// ───────────────────────────────────────────────────────────────────────────── // ─────────────────────────────────────────────────────────────────────────────
/** /**
* Clear error state * Clear error state for a conversation.
*/ */
clearError(): void { clearError(conversationId: string): void {
if (!this.client) return; this.updateSession(conversationId, { lastError: null });
this.client.clearError();
} }
} }
@ -383,22 +486,30 @@ if (browser) {
agenticStore.init(); agenticStore.init();
} }
export function agenticIsRunning() { /**
return agenticStore.isRunning; * Helper functions for reactive access in components.
* These require conversationId parameter for per-conversation state.
*/
export function agenticIsRunning(conversationId: string) {
return agenticStore.isRunning(conversationId);
} }
export function agenticCurrentTurn() { export function agenticCurrentTurn(conversationId: string) {
return agenticStore.currentTurn; return agenticStore.currentTurn(conversationId);
} }
export function agenticTotalToolCalls() { export function agenticTotalToolCalls(conversationId: string) {
return agenticStore.totalToolCalls; return agenticStore.totalToolCalls(conversationId);
} }
export function agenticLastError() { export function agenticLastError(conversationId: string) {
return agenticStore.lastError; return agenticStore.lastError(conversationId);
} }
export function agenticStreamingToolCall() { export function agenticStreamingToolCall(conversationId: string) {
return agenticStore.streamingToolCall; return agenticStore.streamingToolCall(conversationId);
}
export function agenticIsAnyRunning() {
return agenticStore.isAnyRunning;
} }

View File

@ -48,3 +48,15 @@ export type AgenticChatCompletionRequest = Omit<ApiChatCompletionRequest, 'messa
stream: true; stream: true;
tools?: ApiChatCompletionRequest['tools']; tools?: ApiChatCompletionRequest['tools'];
}; };
/**
* Per-conversation agentic session state.
* Enables parallel agentic flows across multiple chats.
*/
export interface AgenticSession {
isRunning: boolean;
currentTurn: number;
totalToolCalls: number;
lastError: Error | null;
streamingToolCall: { name: string; arguments: string } | null;
}