feat: Per-conversation agentic loop state

This commit is contained in:
Aleksander Grygier 2026-01-22 17:38:51 +01:00
parent 39d0ff485d
commit c02e83c32a
3 changed files with 187 additions and 57 deletions

View File

@ -66,6 +66,8 @@ export interface AgenticFlowOptions {
}
export interface AgenticFlowParams {
/** Conversation ID for per-conversation state tracking */
conversationId: string;
messages: (ApiChatMessageData | (DatabaseMessage & { extra?: DatabaseMessageExtra[] }))[];
options?: AgenticFlowOptions;
callbacks: AgenticFlowCallbacks;
@ -80,12 +82,15 @@ export interface AgenticFlowResult {
}
interface AgenticStoreStateCallbacks {
setRunning: (running: boolean) => void;
setCurrentTurn: (turn: number) => void;
setTotalToolCalls: (count: number) => void;
setLastError: (error: Error | null) => void;
setStreamingToolCall: (tc: { name: string; arguments: string } | null) => void;
clearStreamingToolCall: () => void;
setRunning: (conversationId: string, running: boolean) => void;
setCurrentTurn: (conversationId: string, turn: number) => void;
setTotalToolCalls: (conversationId: string, count: number) => void;
setLastError: (conversationId: string, error: Error | null) => void;
setStreamingToolCall: (
conversationId: string,
tc: { name: string; arguments: string } | null
) => void;
clearStreamingToolCall: (conversationId: string) => void;
}
export class AgenticClient {
@ -132,7 +137,7 @@ export class AgenticClient {
* @returns Result indicating if the flow handled the request
*/
async runAgenticFlow(params: AgenticFlowParams): Promise<AgenticFlowResult> {
const { messages, options = {}, callbacks, signal, perChatOverrides } = params;
const { conversationId, messages, options = {}, callbacks, signal, perChatOverrides } = params;
const {
onChunk,
onReasoningChunk,
@ -183,13 +188,14 @@ export class AgenticClient {
return true;
});
this.store.setRunning(true);
this.store.setCurrentTurn(0);
this.store.setTotalToolCalls(0);
this.store.setLastError(null);
this.store.setRunning(conversationId, true);
this.store.setCurrentTurn(conversationId, 0);
this.store.setTotalToolCalls(conversationId, 0);
this.store.setLastError(conversationId, null);
try {
await this.executeAgenticLoop({
conversationId,
messages: normalizedMessages,
options,
tools,
@ -209,11 +215,11 @@ export class AgenticClient {
return { handled: true };
} catch (error) {
const normalizedError = error instanceof Error ? error : new Error(String(error));
this.store.setLastError(normalizedError);
this.store.setLastError(conversationId, normalizedError);
onError?.(normalizedError);
return { handled: true, error: normalizedError };
} finally {
this.store.setRunning(false);
this.store.setRunning(conversationId, false);
// Lazy Disconnect: Close MCP connections after agentic flow completes
// This prevents continuous keepalive/heartbeat polling when tools are not in use
await mcpClient.shutdown().catch((err) => {
@ -225,6 +231,7 @@ export class AgenticClient {
}
private async executeAgenticLoop(params: {
conversationId: string;
messages: ApiChatMessageData[];
options: AgenticFlowOptions;
tools: ReturnType<typeof mcpClient.getToolDefinitionsForLLM>;
@ -232,7 +239,7 @@ export class AgenticClient {
callbacks: AgenticFlowCallbacks;
signal?: AbortSignal;
}): Promise<void> {
const { messages, options, tools, agenticConfig, callbacks, signal } = params;
const { conversationId, messages, options, tools, agenticConfig, callbacks, signal } = params;
const {
onChunk,
onReasoningChunk,
@ -265,7 +272,7 @@ export class AgenticClient {
const maxToolPreviewLines = agenticConfig.maxToolPreviewLines;
for (let turn = 0; turn < maxTurns; turn++) {
this.store.setCurrentTurn(turn + 1);
this.store.setCurrentTurn(conversationId, turn + 1);
agenticTimings.turns = turn + 1;
if (signal?.aborted) {
@ -359,7 +366,7 @@ export class AgenticClient {
) {
lastStreamingToolCallName = name;
lastStreamingToolCallArgsLength = argsLengthBucket;
this.store.setStreamingToolCall({ name, arguments: args });
this.store.setStreamingToolCall(conversationId, { name, arguments: args });
}
}
} catch {
@ -385,7 +392,7 @@ export class AgenticClient {
signal
);
this.store.clearStreamingToolCall();
this.store.clearStreamingToolCall(conversationId);
if (turnTimings) {
agenticTimings.llm.predicted_n += turnTimings.predicted_n || 0;
@ -447,7 +454,7 @@ export class AgenticClient {
function: call.function ? { ...call.function } : undefined
});
}
this.store.setTotalToolCalls(allToolCalls.length);
this.store.setTotalToolCalls(conversationId, allToolCalls.length);
onToolCallChunk?.(JSON.stringify(allToolCalls));
sessionMessages.push({
@ -689,8 +696,8 @@ export class AgenticClient {
return `mcp-attachment-${timestamp}-${index}.${extension}`;
}
clearError(): void {
this.store.setLastError(null);
clearError(conversationId: string): void {
this.store.setLastError(conversationId, null);
}
}

View File

@ -10,10 +10,16 @@
* - **agenticStore** (this): Reactive state for UI components
*
* **Responsibilities:**
* - Hold reactive state for UI binding (isRunning, currentTurn, etc.)
* - Provide getters for computed values
* - Hold per-conversation reactive state for UI binding
* - Provide getters for computed values (scoped by conversationId)
* - Expose setters for AgenticClient to update state
* - Forward method calls to AgenticClient
* - Track sampling requests for debugging
*
* **Per-Conversation Architecture:**
* - Each conversation has its own AgenticSession
* - Parallel agentic flows in different chats don't interfere
* - Sessions are created on-demand and cleaned up when done
*
* @see AgenticClient in clients/agentic/ for business logic
* @see MCPClient in clients/mcp/ for tool execution
@ -21,6 +27,7 @@
import { browser } from '$app/environment';
import type { AgenticFlowParams, AgenticFlowResult } from '$lib/clients';
import type { AgenticSession } from '$lib/types/agentic';
export type {
AgenticFlowCallbacks,
@ -29,12 +36,25 @@ export type {
AgenticFlowResult
} from '$lib/clients';
/**
* Creates a fresh agentic session with default values.
*/
function createDefaultSession(): AgenticSession {
return {
isRunning: false,
currentTurn: 0,
totalToolCalls: 0,
lastError: null,
streamingToolCall: null
};
}
class AgenticStore {
private _isRunning = $state(false);
private _currentTurn = $state(0);
private _totalToolCalls = $state(0);
private _lastError = $state<Error | null>(null);
private _streamingToolCall = $state<{ name: string; arguments: string } | null>(null);
/**
* Per-conversation agentic sessions.
* Key is conversationId, value is the session state.
*/
private _sessions = $state<Map<string, AgenticSession>>(new Map());
/** Reference to the client (lazy loaded to avoid circular dependency) */
private _client: typeof import('$lib/clients/agentic.client').agenticClient | null = null;
@ -60,35 +80,119 @@ class AgenticStore {
this._client = agenticClient;
agenticClient.setStoreCallbacks({
setRunning: (running) => (this._isRunning = running),
setCurrentTurn: (turn) => (this._currentTurn = turn),
setTotalToolCalls: (count) => (this._totalToolCalls = count),
setLastError: (error) => (this._lastError = error),
setStreamingToolCall: (tc) => (this._streamingToolCall = tc),
clearStreamingToolCall: () => (this._streamingToolCall = null)
setRunning: (convId, running) => this.updateSession(convId, { isRunning: running }),
setCurrentTurn: (convId, turn) => this.updateSession(convId, { currentTurn: turn }),
setTotalToolCalls: (convId, count) => this.updateSession(convId, { totalToolCalls: count }),
setLastError: (convId, error) => this.updateSession(convId, { lastError: error }),
setStreamingToolCall: (convId, tc) => this.updateSession(convId, { streamingToolCall: tc }),
clearStreamingToolCall: (convId) => this.updateSession(convId, { streamingToolCall: null })
});
}
get isRunning(): boolean {
return this._isRunning;
/**
*
* Session Management
*
*/
/**
* Get session for a conversation, creating if needed.
*/
getSession(conversationId: string): AgenticSession {
let session = this._sessions.get(conversationId);
if (!session) {
session = createDefaultSession();
this._sessions.set(conversationId, session);
}
return session;
}
get currentTurn(): number {
return this._currentTurn;
/**
* Update session state for a conversation.
*/
private updateSession(conversationId: string, update: Partial<AgenticSession>): void {
const session = this.getSession(conversationId);
const updated = { ...session, ...update };
this._sessions.set(conversationId, updated);
}
get totalToolCalls(): number {
return this._totalToolCalls;
/**
* Clear session for a conversation.
*/
clearSession(conversationId: string): void {
this._sessions.delete(conversationId);
}
get lastError(): Error | null {
return this._lastError;
/**
* Get all active sessions (conversations with running agentic flows).
*/
getActiveSessions(): Array<{ conversationId: string; session: AgenticSession }> {
const active: Array<{ conversationId: string; session: AgenticSession }> = [];
for (const [conversationId, session] of this._sessions.entries()) {
if (session.isRunning) {
active.push({ conversationId, session });
}
}
return active;
}
get streamingToolCall(): { name: string; arguments: string } | null {
return this._streamingToolCall;
/**
*
* Convenience Getters (for current/active conversation)
*
*/
/**
* Check if any agentic flow is running (global).
*/
get isAnyRunning(): boolean {
for (const session of this._sessions.values()) {
if (session.isRunning) return true;
}
return false;
}
/**
* Get running state for a specific conversation.
*/
isRunning(conversationId: string): boolean {
return this.getSession(conversationId).isRunning;
}
/**
* Get current turn for a specific conversation.
*/
currentTurn(conversationId: string): number {
return this.getSession(conversationId).currentTurn;
}
/**
* Get total tool calls for a specific conversation.
*/
totalToolCalls(conversationId: string): number {
return this.getSession(conversationId).totalToolCalls;
}
/**
* Get last error for a specific conversation.
*/
lastError(conversationId: string): Error | null {
return this.getSession(conversationId).lastError;
}
/**
* Get streaming tool call for a specific conversation.
*/
streamingToolCall(conversationId: string): { name: string; arguments: string } | null {
return this.getSession(conversationId).streamingToolCall;
}
/**
*
* Agentic Flow Execution
*
*/
/**
* Run the agentic orchestration loop with MCP tools.
* Delegates to AgenticClient.
@ -101,11 +205,10 @@ class AgenticStore {
}
/**
* Clear error state
* Clear error state for a conversation.
*/
clearError(): void {
if (!this.client) return;
this.client.clearError();
clearError(conversationId: string): void {
this.updateSession(conversationId, { lastError: null });
}
}
@ -116,22 +219,30 @@ if (browser) {
agenticStore.init();
}
export function agenticIsRunning() {
return agenticStore.isRunning;
/**
* Helper functions for reactive access in components.
* These require conversationId parameter for per-conversation state.
*/
export function agenticIsRunning(conversationId: string) {
return agenticStore.isRunning(conversationId);
}
export function agenticCurrentTurn() {
return agenticStore.currentTurn;
export function agenticCurrentTurn(conversationId: string) {
return agenticStore.currentTurn(conversationId);
}
export function agenticTotalToolCalls() {
return agenticStore.totalToolCalls;
export function agenticTotalToolCalls(conversationId: string) {
return agenticStore.totalToolCalls(conversationId);
}
export function agenticLastError() {
return agenticStore.lastError;
export function agenticLastError(conversationId: string) {
return agenticStore.lastError(conversationId);
}
export function agenticStreamingToolCall() {
return agenticStore.streamingToolCall;
export function agenticStreamingToolCall(conversationId: string) {
return agenticStore.streamingToolCall(conversationId);
}
export function agenticIsAnyRunning() {
return agenticStore.isAnyRunning;
}

View File

@ -48,3 +48,15 @@ export type AgenticChatCompletionRequest = Omit<ApiChatCompletionRequest, 'messa
stream: true;
tools?: ApiChatCompletionRequest['tools'];
};
/**
* Per-conversation agentic session state.
* Enables parallel agentic flows across multiple chats.
*/
export interface AgenticSession {
isRunning: boolean;
currentTurn: number;
totalToolCalls: number;
lastError: Error | null;
streamingToolCall: { name: string; arguments: string } | null;
}