feat: Implement agentic orchestration within ChatService
This commit is contained in:
parent 5f5d5ab45f
commit 60475dca3c
@@ -1,10 +1,15 @@
 import { getAuthHeaders, getJsonHeaders } from '$lib/utils';
 import { AttachmentType } from '$lib/enums';
 import { config } from '$lib/stores/settings.svelte';
-import { ensureMcpClient } from '$lib/services/mcp-singleton';
 import { getAgenticConfig } from '$lib/config/agentic';
-import { AgenticOrchestrator } from '$lib/agentic/orchestrator';
-import { OpenAISseClient } from '$lib/agentic/openai-sse-client';
+import { OpenAISseClient, type OpenAISseTurnResult } from '$lib/agentic/openai-sse-client';
+import type {
+	AgenticChatCompletionRequest,
+	AgenticMessage,
+	AgenticToolCallList
+} from '$lib/agentic/types';
+import { toAgenticMessages } from '$lib/agentic/types';
+import type { MCPToolCall, IMCPClient } from '$lib/mcp';
 
 /**
  * ChatService - Low-level API communication layer for Chat Completions
@@ -57,7 +62,8 @@ export class ChatService {
 		messages: ApiChatMessageData[] | (DatabaseMessage & { extra?: DatabaseMessageExtra[] })[],
 		options: SettingsChatServiceOptions = {},
 		conversationId?: string,
-		signal?: AbortSignal
+		signal?: AbortSignal,
+		mcpClient?: IMCPClient
 	): Promise<string | void> {
 		const {
 			stream,
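A minimal caller-side sketch of the new signature, for review context. The method name is outside this hunk, so `sendMessage` below is a placeholder; the point is that the store layer now resolves the MCP client (for example via ensureMcpClient, whose import leaves ChatService in this commit) and hands it in, keeping the service stateless:

// Hypothetical store-layer call site; `sendMessage` stands in for the real method name.
const mcpClient = (await ensureMcpClient()) ?? undefined; // may resolve to null
await ChatService.sendMessage(
	messages,
	{ stream: true }, // SettingsChatServiceOptions, abbreviated
	conversationId,
	abortController.signal,
	mcpClient // new optional parameter introduced by this commit
);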
@@ -178,68 +184,35 @@ export class ChatService {
 			}
 		}
 
-		// MCP agentic orchestration (low-coupling mode)
-		// Check if MCP client is available and agentic mode is enabled
-		if (stream) {
-			const mcpClient = await ensureMcpClient();
-			const agenticConfig = mcpClient ? getAgenticConfig(config()) : undefined;
+		// MCP agentic orchestration
+		// Run agentic loop if MCP client is provided and agentic mode is enabled
+		const agenticConfig = getAgenticConfig(config());
+		if (stream && agenticConfig.enabled && mcpClient) {
+			console.log('[ChatService] Running agentic loop with MCP client');
+			try {
+				const agenticResult = await ChatService.runAgenticLoop({
+					mcpClient,
+					messages: normalizedMessages,
+					requestBody,
+					agenticConfig,
+					callbacks: {
+						onChunk,
+						onReasoningChunk,
+						onToolCallChunk,
+						onModel,
+						onComplete,
+						onError,
+						onTimings
+					},
+					signal
+				});
 
-			// Debug: verify MCP tools are available
-			if (mcpClient) {
-				const availableTools = mcpClient.listTools();
-				console.log(
-					`[MCP] Client initialized with ${availableTools.length} tools:`,
-					availableTools
-				);
-			} else {
-				console.log('[MCP] No MCP client available');
-			}
-
-			if (mcpClient && agenticConfig?.enabled) {
-				try {
-					const llmClient = new OpenAISseClient({
-						url: './v1/chat/completions',
-						buildHeaders: () => getAuthHeaders()
-					});
-
-					const orchestrator = new AgenticOrchestrator({
-						mcpClient,
-						llmClient,
-						maxTurns: agenticConfig.maxTurns,
-						maxToolPreviewLines: agenticConfig.maxToolPreviewLines
-					});
-
-					let capturedTimings: ChatMessageTimings | undefined;
-
-					await orchestrator.run({
-						initialMessages: normalizedMessages,
-						requestTemplate: requestBody,
-						callbacks: {
-							onChunk,
-							onReasoningChunk,
-							onToolCallChunk,
-							onModel,
-							onComplete: onComplete
-								? () => onComplete('', undefined, capturedTimings, undefined)
-								: undefined,
-							onError
-						},
-						abortSignal: signal,
-						onProcessingUpdate: (timings, progress) => {
-							ChatService.notifyTimings(timings, progress, onTimings);
-							if (timings) {
-								capturedTimings = timings;
-							}
-						},
-						maxTurns: agenticConfig.maxTurns,
-						filterReasoningAfterFirstTurn: agenticConfig.filterReasoningAfterFirstTurn
-					});
-
-					return;
-				} catch (error) {
-					// If MCP orchestration fails, log and fall through to standard flow
-					console.warn('MCP orchestration failed, falling back to standard flow:', error);
-				}
-			}
-		}
+				if (agenticResult) {
+					return; // Agentic loop handled the request
+				}
+				// Fall through to standard flow if agentic loop returned false
+			} catch (error) {
+				console.warn('[ChatService] Agentic loop failed, falling back to standard flow:', error);
+			}
+		}
 
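The orchestration path relies on a narrow slice of the MCP client. As a sketch, this is the shape the diff assumes, inferred from the call sites in runAgenticLoop below (the real IMCPClient in '$lib/mcp' may declare more than this):

// Inferred from usage only; not the actual declaration in '$lib/mcp'.
interface IMCPClientAsUsed {
	listTools(): unknown[]; // only .length is read, for the tool-count log
	getToolsDefinition(): Promise<unknown[]>; // tool schemas forwarded in the LLM request
	execute(call: MCPToolCall, signal?: AbortSignal): Promise<string>; // runs one tool call
}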
@@ -851,4 +824,251 @@ export class ChatService {
 
 		onTimingsCallback(timings, promptProgress);
 	}
+
+	// ─────────────────────────────────────────────────────────────────────────────
+	// Agentic Orchestration
+	// ─────────────────────────────────────────────────────────────────────────────
+
+	/**
+	 * Run the agentic orchestration loop with MCP tools.
+	 * The MCP client is passed from the store layer - ChatService remains stateless.
+	 *
+	 * @param params - Parameters for the agentic loop including the MCP client
+	 * @returns true if agentic loop handled the request
+	 */
+	private static async runAgenticLoop(params: {
+		mcpClient: IMCPClient;
+		messages: ApiChatMessageData[];
+		requestBody: ApiChatCompletionRequest;
+		agenticConfig: ReturnType<typeof getAgenticConfig>;
+		callbacks: {
+			onChunk?: (chunk: string) => void;
+			onReasoningChunk?: (chunk: string) => void;
+			onToolCallChunk?: (serializedToolCalls: string) => void;
+			onModel?: (model: string) => void;
+			onComplete?: (
+				content: string,
+				reasoningContent?: string,
+				timings?: ChatMessageTimings,
+				toolCalls?: string
+			) => void;
+			onError?: (error: Error) => void;
+			onTimings?: (timings: ChatMessageTimings, promptProgress?: ChatMessagePromptProgress) => void;
+		};
+		signal?: AbortSignal;
+	}): Promise<boolean> {
+		const { mcpClient, messages, requestBody, agenticConfig, callbacks, signal } = params;
+		const { onChunk, onReasoningChunk, onToolCallChunk, onModel, onComplete, onError, onTimings } =
+			callbacks;
+
+		console.log(`[ChatService] Running agentic loop with ${mcpClient.listTools().length} tools`);
+
+		// Set up LLM client
+		const llmClient = new OpenAISseClient({
+			url: './v1/chat/completions',
+			buildHeaders: () => getAuthHeaders()
+		});
+
+		// Prepare session state
+		const sessionMessages: AgenticMessage[] = toAgenticMessages(messages);
+		const tools = await mcpClient.getToolsDefinition();
+		const allToolCalls: ApiChatCompletionToolCall[] = [];
+		let capturedTimings: ChatMessageTimings | undefined;
+
+		const requestWithoutMessages = { ...requestBody };
+		delete (requestWithoutMessages as Partial<ApiChatCompletionRequest>).messages;
+		const requestBase: AgenticChatCompletionRequest = {
+			...(requestWithoutMessages as Omit<ApiChatCompletionRequest, 'messages'>),
+			stream: true,
+			messages: []
+		};
+
+		const maxTurns = agenticConfig.maxTurns;
+		const maxToolPreviewLines = agenticConfig.maxToolPreviewLines;
+
+		// Run agentic loop
+		for (let turn = 0; turn < maxTurns; turn++) {
+			if (signal?.aborted) {
+				onComplete?.('', undefined, capturedTimings, undefined);
+				return true;
+			}
+
+			const llmRequest: AgenticChatCompletionRequest = {
+				...requestBase,
+				messages: sessionMessages,
+				tools: tools.length > 0 ? tools : undefined
+			};
+
+			const shouldFilterReasoning = agenticConfig.filterReasoningAfterFirstTurn && turn > 0;
+
+			let turnResult: OpenAISseTurnResult;
+			try {
+				turnResult = await llmClient.stream(
+					llmRequest,
+					{
+						onChunk,
+						onReasoningChunk: shouldFilterReasoning ? undefined : onReasoningChunk,
+						onModel,
+						onFirstValidChunk: undefined,
+						onProcessingUpdate: (timings, progress) => {
+							ChatService.notifyTimings(timings, progress, onTimings);
+							if (timings) capturedTimings = timings;
+						}
+					},
+					signal
+				);
+			} catch (error) {
+				if (signal?.aborted) {
+					onComplete?.('', undefined, capturedTimings, undefined);
+					return true;
+				}
+				const normalizedError = error instanceof Error ? error : new Error('LLM stream error');
+				onError?.(normalizedError);
+				onChunk?.(`\n\n\`\`\`\nUpstream LLM error:\n${normalizedError.message}\n\`\`\`\n`);
+				onComplete?.('', undefined, capturedTimings, undefined);
+				return true;
+			}
+
+			// Check if we should stop (no tool calls or finish reason isn't tool_calls)
+			if (
+				turnResult.toolCalls.length === 0 ||
+				(turnResult.finishReason && turnResult.finishReason !== 'tool_calls')
+			) {
+				onComplete?.('', undefined, capturedTimings, undefined);
+				return true;
+			}
+
+			// Normalize and validate tool calls
+			const normalizedCalls = ChatService.normalizeToolCalls(turnResult.toolCalls);
+			if (normalizedCalls.length === 0) {
+				onComplete?.('', undefined, capturedTimings, undefined);
+				return true;
+			}
+
+			// Accumulate tool calls
+			for (const call of normalizedCalls) {
+				allToolCalls.push({
+					id: call.id,
+					type: call.type,
+					function: call.function ? { ...call.function } : undefined
+				});
+			}
+			onToolCallChunk?.(JSON.stringify(allToolCalls));
+
+			// Add assistant message with tool calls to session
+			sessionMessages.push({
+				role: 'assistant',
+				content: turnResult.content || undefined,
+				tool_calls: normalizedCalls
+			});
+
+			// Execute each tool call
+			for (const toolCall of normalizedCalls) {
+				if (signal?.aborted) {
+					onComplete?.('', undefined, capturedTimings, undefined);
+					return true;
+				}
+
+				const mcpCall: MCPToolCall = {
+					id: toolCall.id,
+					function: {
+						name: toolCall.function.name,
+						arguments: toolCall.function.arguments
+					}
+				};
+
+				let result: string;
+				try {
+					result = await mcpClient.execute(mcpCall, signal);
+				} catch (error) {
+					if (error instanceof Error && error.name !== 'AbortError') {
+						onError?.(error);
+					}
+					result = `Error: ${error instanceof Error ? error.message : String(error)}`;
+				}
+
+				if (signal?.aborted) {
+					onComplete?.('', undefined, capturedTimings, undefined);
+					return true;
+				}
+
+				// Emit tool preview
+				ChatService.emitToolPreview(toolCall, result, maxToolPreviewLines, onChunk);
+
+				// Add tool result to session (sanitize base64 images for context)
+				const contextValue = ChatService.isBase64Image(result)
+					? '[Image displayed to user]'
+					: result;
+				sessionMessages.push({
+					role: 'tool',
+					tool_call_id: toolCall.id,
+					content: contextValue
+				});
+			}
+		}
+
+		// Turn limit reached
+		onChunk?.('\n\n```\nTurn limit reached\n```\n');
+		onComplete?.('', undefined, capturedTimings, undefined);
+		return true;
+	}
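To illustrate the loop above: each turn appends one assistant message carrying the tool calls, then one tool message per executed call, so the next turn's request contains the tool results. A sketch with illustrative values (the tool name and contents are made up):

// sessionMessages after a single tool turn, roughly:
const example: AgenticMessage[] = [
	{ role: 'user', content: 'What files are here?' },
	{
		role: 'assistant',
		tool_calls: [
			{ id: 'tool_0', type: 'function', function: { name: 'list_files', arguments: '{"path":"."}' } }
		]
	},
	{ role: 'tool', tool_call_id: 'tool_0', content: 'a.txt\nb.txt' }
];
// Turn 1 then sends this transcript, letting the model answer from the tool output.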
+
+	/**
+	 * Normalize tool calls from LLM response
+	 */
+	private static normalizeToolCalls(toolCalls: ApiChatCompletionToolCall[]): AgenticToolCallList {
+		if (!toolCalls) return [];
+		return toolCalls.map((call, index) => ({
+			id: call?.id ?? `tool_${index}`,
+			type: (call?.type as 'function') ?? 'function',
+			function: {
+				name: call?.function?.name ?? '',
+				arguments: call?.function?.arguments ?? ''
+			}
+		}));
+	}
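A worked example of the defaulting above: a streamed call that arrives without an id or arguments still becomes a well-formed entry, with missing fields collapsed to synthetic defaults instead of undefined:

// Hypothetical input; note the second element is partial:
const calls = [
	{ id: 'call_abc', type: 'function', function: { name: 'list_files', arguments: '{}' } },
	{ type: 'function', function: { name: 'read_file' } }
] as ApiChatCompletionToolCall[];
// normalizeToolCalls(calls) yields:
// [
//   { id: 'call_abc', type: 'function', function: { name: 'list_files', arguments: '{}' } },
//   { id: 'tool_1', type: 'function', function: { name: 'read_file', arguments: '' } }
// ]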
+
+	/**
+	 * Emit tool call preview to the chunk callback
+	 */
+	private static emitToolPreview(
+		toolCall: AgenticToolCallList[number],
+		result: string,
+		maxLines: number,
+		emit?: (chunk: string) => void
+	): void {
+		if (!emit) return;
+
+		const toolName = toolCall.function.name;
+		const toolArgs = toolCall.function.arguments;
+
+		let output = `\n\n<!-- AGENTIC_TOOL_CALL_START -->`;
+		output += `\n<!-- TOOL_NAME: ${toolName} -->`;
+		output += `\n<!-- TOOL_ARGS: ${toolArgs.replace(/\n/g, '\\n')} -->`;
+
+		if (ChatService.isBase64Image(result)) {
+			output += `\n![](${result})`;
+		} else {
+			const lines = result.split('\n');
+			const trimmedLines = lines.length > maxLines ? lines.slice(-maxLines) : lines;
+			output += `\n\`\`\`\n${trimmedLines.join('\n')}\n\`\`\``;
+		}
+
+		output += `\n<!-- AGENTIC_TOOL_CALL_END -->\n`;
+		emit(output);
+	}
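For reference, the string handed to emit() for a plain-text result with maxLines = 2 looks like the block below; the HTML comment markers are presumably what the chat UI parses to render a tool-call block, and only the last maxLines lines of the result survive the trim:

// emit() receives (illustrative values, for a result of 'a.txt\nb.txt\nc.txt'):
//
// <!-- AGENTIC_TOOL_CALL_START -->
// <!-- TOOL_NAME: list_files -->
// <!-- TOOL_ARGS: {"path":"."} -->
// ```
// b.txt
// c.txt
// ```
// <!-- AGENTIC_TOOL_CALL_END -->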
+
+	/**
+	 * Check if content is a base64 image
+	 */
+	private static isBase64Image(content: string): boolean {
+		const trimmed = content.trim();
+		if (!trimmed.startsWith('data:image/')) return false;
+
+		const match = trimmed.match(/^data:image\/(png|jpe?g|gif|webp);base64,([A-Za-z0-9+/]+=*)$/);
+		if (!match) return false;
+
+		const base64Payload = match[2];
+		return base64Payload.length > 0 && base64Payload.length % 4 === 0;
+	}
 }
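Quick examples of the validator above. The payload must be non-empty and a multiple of 4 characters long, since base64 encodes in 4-character groups, and only the listed raster formats pass:

// isBase64Image('data:image/png;base64,iVBORw0KGgo=')  → true  (12-char payload)
// isBase64Image('data:image/png;base64,abc')           → false (3 is not a multiple of 4)
// isBase64Image('data:image/svg+xml;base64,PHN2Zz4=')  → false (svg+xml not in the allowlist)
// isBase64Image('plain text')                          → false (not a data: URI)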