From c6a28e39d5a7ef9e0209ddb767ae20986feb3b25 Mon Sep 17 00:00:00 2001 From: dal Date: Tue, 12 Aug 2025 12:44:15 -0600 Subject: [PATCH] migration changes --- .../analyst-agent-task/analyst-agent-task.ts | 108 ++++-------------- .../message-post-processing.ts | 74 ++++-------- .../format-follow-up-message-step.int.test.ts | 3 +- .../format-follow-up-message-step.ts | 2 +- 4 files changed, 48 insertions(+), 139 deletions(-) diff --git a/apps/trigger/src/tasks/analyst-agent-task/analyst-agent-task.ts b/apps/trigger/src/tasks/analyst-agent-task/analyst-agent-task.ts index 6476c4187..d68b08787 100644 --- a/apps/trigger/src/tasks/analyst-agent-task/analyst-agent-task.ts +++ b/apps/trigger/src/tasks/analyst-agent-task/analyst-agent-task.ts @@ -15,44 +15,10 @@ import { } from '@buster/database'; // AI package imports -import { type AnalystRuntimeContext, analystWorkflow } from '@buster/ai'; - -// Mastra workflow integration -import { RuntimeContext } from '@mastra/core/runtime-context'; +import { type AnalystWorkflowInput, runAnalystWorkflow } from '@buster/ai'; import type { messagePostProcessingTask } from '../message-post-processing/message-post-processing'; -/** - * Task 3: Setup runtime context from Task 2 database helper outputs - * Uses individual helper results to populate Mastra RuntimeContext - */ -function setupRuntimeContextFromMessage( - messageContext: MessageContextOutput, - dataSource: OrganizationDataSourceOutput, - messageId: string -): RuntimeContext { - try { - const runtimeContext = new RuntimeContext(); - - // Populate from Task 2 helper outputs - runtimeContext.set('userId', messageContext.userId); - runtimeContext.set('chatId', messageContext.chatId); - runtimeContext.set('organizationId', messageContext.organizationId); - runtimeContext.set('dataSourceId', dataSource.dataSourceId); - runtimeContext.set('dataSourceSyntax', dataSource.dataSourceSyntax); - runtimeContext.set('workflowStartTime', Date.now()); - - // Add messageId for database persistence (following AI package pattern) - runtimeContext.set('messageId', messageId); - - return runtimeContext; - } catch (error) { - throw error instanceof Error - ? error - : new Error(`Failed to setup runtime context: ${String(error)}`); - } -} - /** * Resource usage tracker for the entire task execution */ @@ -358,30 +324,33 @@ export const analystAgentTask: ReturnType< // Log performance after data loading logPerformanceMetrics('post-data-load', payload.message_id, taskStartTime, resourceTracker); - // Task 3: Setup runtime context for workflow execution - const contextSetupStart = Date.now(); - const runtimeContext = setupRuntimeContextFromMessage( - messageContext, - dataSource, - payload.message_id - ); - const contextSetupTime = Date.now() - contextSetupStart; - // Task 4: Prepare workflow input with conversation history and dashboard files - const workflowInput = { - prompt: messageContext.requestMessage, - conversationHistory: conversationHistory.length > 0 ? conversationHistory : undefined, - dashboardFiles: dashboardFiles.length > 0 ? dashboardFiles : undefined, + // Convert conversation history to messages format expected by the workflow + const messages = + conversationHistory.length > 0 + ? conversationHistory + : [ + { + role: 'user' as const, + content: messageContext.requestMessage, + }, + ]; + + const workflowInput: AnalystWorkflowInput = { + messages, + messageId: payload.message_id, + chatId: messageContext.chatId, + userId: messageContext.userId, + organizationId: messageContext.organizationId, + dataSourceId: dataSource.dataSourceId, + dataSourceSyntax: dataSource.dataSourceSyntax, }; logger.log('Workflow input prepared', { messageId: payload.message_id, - hasPrompt: !!workflowInput.prompt, - hasConversationHistory: !!workflowInput.conversationHistory, - conversationHistoryLength: workflowInput.conversationHistory?.length || 0, - hasDashboardFiles: !!workflowInput.dashboardFiles, - dashboardFilesCount: workflowInput.dashboardFiles?.length || 0, - contextSetupTimeMs: contextSetupTime, + messagesCount: workflowInput.messages.length, + hasDashboardFiles: dashboardFiles.length > 0, + dashboardFilesCount: dashboardFiles.length, totalPrepTimeMs: Date.now() - dataLoadStart, }); @@ -395,21 +364,7 @@ export const analystAgentTask: ReturnType< totalPrepTimeMs: Date.now() - dataLoadStart, }); - // Pre-create the workflow run to measure initialization time separately - const createRunStart = Date.now(); - const run = analystWorkflow.createRun(); - const createRunTime = Date.now() - createRunStart; - - logger.log('Workflow run created', { - messageId: payload.message_id, - createRunTimeMs: createRunTime, - }); - - // Log performance after workflow run creation - logPerformanceMetrics('post-createrun', payload.message_id, taskStartTime, resourceTracker); - // Execute workflow with tracing - const workflowStartMethodStart = Date.now(); const tracedWorkflow = wrapTraced( async () => { currentSpan().log({ @@ -423,26 +378,19 @@ export const analystAgentTask: ReturnType< }, }); - return await run.start({ - inputData: workflowInput, - runtimeContext, - }); + return await runAnalystWorkflow(workflowInput); }, { name: 'Analyst Agent Task Workflow', } ); - const workflowResult = await tracedWorkflow(); - const workflowStartMethodTime = Date.now() - workflowStartMethodStart; + await tracedWorkflow(); const totalWorkflowTime = Date.now() - workflowExecutionStart; logger.log('Analyst workflow completed successfully', { messageId: payload.message_id, - workflowResult: !!workflowResult, - workflowStartMethodTimeMs: workflowStartMethodTime, totalWorkflowTimeMs: totalWorkflowTime, - createRunTimeMs: createRunTime, }); // Log final performance metrics @@ -459,9 +407,6 @@ export const analystAgentTask: ReturnType< executionTimeMs: totalExecutionTime, breakdown: { dataLoadTimeMs: dataLoadTime, - contextSetupTimeMs: contextSetupTime, - createRunTimeMs: createRunTime, - workflowStartMethodTimeMs: workflowStartMethodTime, totalWorkflowTimeMs: totalWorkflowTime, }, }); @@ -558,9 +503,6 @@ function getErrorCode(error: unknown): string { if (error.message.includes('Database query failed')) return 'DATABASE_ERROR'; if (error.message.includes('Output validation failed')) return 'DATA_VALIDATION_ERROR'; - // Task 3: Runtime context errors - if (error.message.includes('Failed to setup runtime context')) return 'RUNTIME_CONTEXT_ERROR'; - // Task 5: Workflow execution errors if (error.message.includes('workflow') || error.message.includes('Workflow')) return 'WORKFLOW_EXECUTION_ERROR'; diff --git a/apps/trigger/src/tasks/message-post-processing/message-post-processing.ts b/apps/trigger/src/tasks/message-post-processing/message-post-processing.ts index a55ae8b61..f92ed5653 100644 --- a/apps/trigger/src/tasks/message-post-processing/message-post-processing.ts +++ b/apps/trigger/src/tasks/message-post-processing/message-post-processing.ts @@ -1,6 +1,6 @@ import postProcessingWorkflow, { type PostProcessingWorkflowOutput, -} from '@buster/ai/workflows/post-processing-workflow'; +} from '@buster/ai/workflows/message-post-processing-workflow/message-post-processing-workflow'; import { eq, getBraintrustMetadata, getDb, messages, slackIntegrations } from '@buster/database'; import type { AssumptionClassification, @@ -37,14 +37,16 @@ function extractDbFields( // Check if there are any major assumptions const hasMajorAssumptions = - workflowOutput.assumptions?.some((assumption) => assumption.label === 'major') ?? false; + workflowOutput.assumptionsResult?.assumptions?.some( + (assumption) => assumption.label === 'major' + ) ?? false; // Determine confidence score based on rules: // - Low if toolCalled is 'flagChat' // - Low if there are any major assumptions // - High otherwise let confidence_score: ConfidenceScore = 'high'; - if (workflowOutput.toolCalled === 'flagChat' || hasMajorAssumptions) { + if (workflowOutput.assumptionsResult?.toolCalled === 'flagChat' || hasMajorAssumptions) { confidence_score = 'low'; } @@ -52,27 +54,30 @@ function extractDbFields( let summaryMessage: string; let summaryTitle: string; - if (!hasMajorAssumptions && workflowOutput.flagChatMessage) { + if (!hasMajorAssumptions && workflowOutput.flagChatResult?.message) { // If no major assumptions, use flagChatMessage as summaryMessage - summaryMessage = workflowOutput.flagChatMessage; + summaryMessage = workflowOutput.flagChatResult.message; summaryTitle = 'No Major Assumptions Identified'; } else { // Otherwise use the provided summary fields or defaults - summaryMessage = workflowOutput.summaryMessage || 'No summary available'; - summaryTitle = workflowOutput.summaryTitle || 'Summary'; + summaryMessage = + workflowOutput.flagChatResult?.summaryMessage || + workflowOutput.formattedMessage || + 'No summary available'; + summaryTitle = workflowOutput.flagChatResult?.summaryTitle || 'Summary'; } const extracted: PostProcessingMessage = { summary_message: summaryMessage, summary_title: summaryTitle, confidence_score, - assumptions: workflowOutput.assumptions?.map((assumption) => ({ + assumptions: workflowOutput.assumptionsResult?.assumptions?.map((assumption) => ({ descriptive_title: assumption.descriptiveTitle, classification: assumption.classification as AssumptionClassification, explanation: assumption.explanation, label: assumption.label as AssumptionLabel, })), - tool_called: workflowOutput.toolCalled || 'unknown', // Provide default if missing + tool_called: workflowOutput.assumptionsResult?.toolCalled || 'unknown', // Provide default if missing user_name: userName, }; @@ -171,63 +176,24 @@ export const messagePostProcessingTask: ReturnType< }, }); - const run = postProcessingWorkflow.createRun(); - return await run.start({ - inputData: workflowInput, - }); + return await postProcessingWorkflow(workflowInput); }, { name: 'Message Post-Processing Workflow', } ); - const workflowResult = await tracedWorkflow(); + const validatedOutput = await tracedWorkflow(); - if (!workflowResult || workflowResult.status !== 'success' || !workflowResult.result) { + if (!validatedOutput) { throw new Error('Post-processing workflow returned no output'); } - // Handle branch results - the result will have one of the branch step IDs as a key - let validatedOutput: PostProcessingWorkflowOutput; - - // Define the expected shape of branch results - type BranchResult = { - 'format-follow-up-message'?: PostProcessingWorkflowOutput; - 'format-initial-message'?: PostProcessingWorkflowOutput; - }; - - const branchResult = workflowResult.result as BranchResult; - - if ('format-follow-up-message' in branchResult && branchResult['format-follow-up-message']) { - validatedOutput = branchResult['format-follow-up-message'] as PostProcessingWorkflowOutput; - } else if ( - 'format-initial-message' in branchResult && - branchResult['format-initial-message'] - ) { - validatedOutput = branchResult['format-initial-message'] as PostProcessingWorkflowOutput; - } else { - logger.error('Unexpected workflow result structure', { - messageId: payload.messageId, - resultKeys: Object.keys(branchResult), - result: branchResult, - }); - console.error('Unexpected workflow result structure:', { - messageId: payload.messageId, - resultKeys: Object.keys(branchResult), - result: branchResult, - }); - throw new Error('Post-processing workflow returned unexpected result structure'); - } - logger.log('Validated output', { messageId: payload.messageId, - summaryTitle: validatedOutput.summaryTitle, - summaryMessage: validatedOutput.summaryMessage, - flagChatMessage: validatedOutput.flagChatMessage, - flagChatTitle: validatedOutput.flagChatTitle, - toolCalled: validatedOutput.toolCalled, - assumptions: validatedOutput.assumptions, - message: validatedOutput.message, + flagChatResult: validatedOutput.flagChatResult, + assumptionsResult: validatedOutput.assumptionsResult, + formattedMessage: validatedOutput.formattedMessage, }); // Step 6: Store result in database diff --git a/packages/ai/src/steps/message-post-processing-steps/format-follow-up-message-step/format-follow-up-message-step.int.test.ts b/packages/ai/src/steps/message-post-processing-steps/format-follow-up-message-step/format-follow-up-message-step.int.test.ts index c3d445a39..e4812fc76 100644 --- a/packages/ai/src/steps/message-post-processing-steps/format-follow-up-message-step/format-follow-up-message-step.int.test.ts +++ b/packages/ai/src/steps/message-post-processing-steps/format-follow-up-message-step/format-follow-up-message-step.int.test.ts @@ -55,7 +55,8 @@ describe('format-follow-up-message-step integration', () => { majorAssumptions: [ { descriptiveTitle: 'Date range assumption for monthly trends', - explanation: 'User requested monthly trends but did not specify date range, so assuming all available historical data', + explanation: + 'User requested monthly trends but did not specify date range, so assuming all available historical data', label: 'major', }, ], diff --git a/packages/ai/src/steps/message-post-processing-steps/format-follow-up-message-step/format-follow-up-message-step.ts b/packages/ai/src/steps/message-post-processing-steps/format-follow-up-message-step/format-follow-up-message-step.ts index 98d684f1f..7dd7212f0 100644 --- a/packages/ai/src/steps/message-post-processing-steps/format-follow-up-message-step/format-follow-up-message-step.ts +++ b/packages/ai/src/steps/message-post-processing-steps/format-follow-up-message-step/format-follow-up-message-step.ts @@ -204,4 +204,4 @@ export async function runFormatFollowUpMessageStep( // For other errors, throw a user-friendly message throw new Error('Unable to format the follow-up message. Please try again later.'); } -} \ No newline at end of file +}