buster/apps/cli/src/services/analytics-engineer-handler.ts

import { randomUUID } from 'node:crypto';
import type { ModelMessage } from '@buster/ai';
import { createAnalyticsEngineerAgent } from '@buster/ai/agents/analytics-engineer-agent/analytics-engineer-agent';
import { createProxyModel } from '@buster/ai/llm/providers/proxy-model';
import type { AgentMessage } from '../types/agent-messages';
import { getProxyConfig } from '../utils/ai-proxy';
import { loadConversation, saveModelMessages } from '../utils/conversation-history';

/**
 * CLI wrapper for agent messages with a unique ID for React keys
 */
export interface CliAgentMessage {
  id: number;
  message: AgentMessage;
}
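
// A minimal usage sketch (the `MessageRow` component is an assumption, not
// defined in this module): the numeric `id` gives React list rendering a
// stable key, e.g.
//   cliMessages.map((m) => <MessageRow key={m.id} message={m.message} />)
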
export interface RunAnalyticsEngineerAgentParams {
  chatId: string;
  workingDirectory: string;
  onThinkingStateChange?: (thinking: boolean) => void;
  onMessageUpdate?: (messages: ModelMessage[]) => void;
  abortSignal?: AbortSignal;
}

/**
 * Runs the analytics engineer agent in the CLI without a sandbox.
 * The agent runs locally but uses the proxy model to route LLM calls through the server.
 * Messages are emitted via callback for immediate UI updates and saved to disk for persistence.
 * (A hypothetical usage sketch follows the function body.)
 */
export async function runAnalyticsEngineerAgent(params: RunAnalyticsEngineerAgentParams) {
  const { chatId, workingDirectory, onThinkingStateChange, onMessageUpdate, abortSignal } = params;

  // Load conversation history to maintain context across sessions
  const conversation = await loadConversation(chatId, workingDirectory);

  // Get the stored model messages (the full conversation, including tool calls/results)
  const previousMessages: ModelMessage[] = conversation
    ? (conversation.modelMessages as ModelMessage[])
    : [];

  // Get proxy configuration
  const proxyConfig = await getProxyConfig();

  // Create a proxy model that routes LLM calls through the server
  const proxyModel = createProxyModel({
    baseURL: proxyConfig.baseURL,
    apiKey: proxyConfig.apiKey,
    modelId: 'openai/gpt-5-codex',
  });

  // Create the analytics engineer agent with the proxy model.
  // Tools are handled locally; only model calls go through the proxy.
  const analyticsEngineerAgent = createAnalyticsEngineerAgent({
    folder_structure: process.cwd(), // Use the current working directory for CLI mode
    userId: 'cli-user',
    chatId,
    dataSourceId: '',
    organizationId: 'cli',
    messageId: randomUUID(),
    todosList: [],
    model: proxyModel,
    abortSignal,
    apiKey: proxyConfig.apiKey,
    apiUrl: proxyConfig.baseURL,
  });

  // Use conversation history - includes user messages, assistant messages, tool calls, and tool results
  const messages: ModelMessage[] = previousMessages;

  // Start the stream - this triggers the agent to run
  const stream = await analyticsEngineerAgent.stream({ messages });

  // Notify the UI that the agent is thinking
  onThinkingStateChange?.(true);

  // Track accumulated messages as we stream
  const currentMessages = [...messages];
  let accumulatedText = '';

  // Consume the stream
  for await (const part of stream.fullStream) {
    if (part.type === 'tool-call') {
      // Record the tool call as an assistant message so the UI and the saved
      // history both see it immediately
      const toolCallMessage: ModelMessage = {
        role: 'assistant',
        content: [
          {
            type: 'tool-call',
            toolCallId: part.toolCallId,
            toolName: part.toolName,
            input: part.input,
          },
        ],
      };
      currentMessages.push(toolCallMessage);
      onMessageUpdate?.(currentMessages);
      await saveModelMessages(chatId, workingDirectory, currentMessages);
    }

    if (part.type === 'tool-result') {
      // Mirror the tool result back into the history; non-string outputs are
      // stringified before being stored
      const toolResultMessage: ModelMessage = {
        role: 'tool',
        content: [
          {
            type: 'tool-result',
            toolCallId: part.toolCallId,
            toolName: part.toolName,
            output: {
              type: 'json',
              value: typeof part.output === 'string' ? part.output : JSON.stringify(part.output),
            },
          },
        ],
      };
      currentMessages.push(toolResultMessage);
      onMessageUpdate?.(currentMessages);
      await saveModelMessages(chatId, workingDirectory, currentMessages);
    }

    if (part.type === 'text-delta') {
      // Buffer streamed text; it becomes a single assistant message on finish
      accumulatedText += part.text;
    }

    if (part.type === 'finish') {
      // Add a final assistant message if any text accumulated
      if (accumulatedText.trim()) {
        const assistantMessage: ModelMessage = {
          role: 'assistant',
          content: accumulatedText,
        };
        currentMessages.push(assistantMessage);
      }

      // Update state with the final messages
      onMessageUpdate?.(currentMessages);

      // Save to disk
      await saveModelMessages(chatId, workingDirectory, currentMessages);

      onThinkingStateChange?.(false);
    }
  }
}
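
/*
 * Example: a hypothetical invocation from a CLI chat screen (the caller is
 * not part of this module; `setThinking` and `setMessages` stand in for
 * whatever state setters the consuming UI owns):
 *
 *   const controller = new AbortController();
 *   await runAnalyticsEngineerAgent({
 *     chatId: randomUUID(),
 *     workingDirectory: process.cwd(),
 *     onThinkingStateChange: (thinking) => setThinking(thinking),
 *     onMessageUpdate: (messages) => setMessages([...messages]),
 *     abortSignal: controller.signal,
 *   });
 */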