migration changes

This commit is contained in:
dal 2025-08-12 12:44:15 -06:00
parent 79103376f1
commit c6a28e39d5
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
4 changed files with 48 additions and 139 deletions

View File

@ -15,44 +15,10 @@ import {
} from '@buster/database'; } from '@buster/database';
// AI package imports // AI package imports
import { type AnalystRuntimeContext, analystWorkflow } from '@buster/ai'; import { type AnalystWorkflowInput, runAnalystWorkflow } from '@buster/ai';
// Mastra workflow integration
import { RuntimeContext } from '@mastra/core/runtime-context';
import type { messagePostProcessingTask } from '../message-post-processing/message-post-processing'; import type { messagePostProcessingTask } from '../message-post-processing/message-post-processing';
/**
* Task 3: Setup runtime context from Task 2 database helper outputs
* Uses individual helper results to populate Mastra RuntimeContext
*/
function setupRuntimeContextFromMessage(
messageContext: MessageContextOutput,
dataSource: OrganizationDataSourceOutput,
messageId: string
): RuntimeContext<AnalystRuntimeContext> {
try {
const runtimeContext = new RuntimeContext<AnalystRuntimeContext>();
// Populate from Task 2 helper outputs
runtimeContext.set('userId', messageContext.userId);
runtimeContext.set('chatId', messageContext.chatId);
runtimeContext.set('organizationId', messageContext.organizationId);
runtimeContext.set('dataSourceId', dataSource.dataSourceId);
runtimeContext.set('dataSourceSyntax', dataSource.dataSourceSyntax);
runtimeContext.set('workflowStartTime', Date.now());
// Add messageId for database persistence (following AI package pattern)
runtimeContext.set('messageId', messageId);
return runtimeContext;
} catch (error) {
throw error instanceof Error
? error
: new Error(`Failed to setup runtime context: ${String(error)}`);
}
}
/** /**
* Resource usage tracker for the entire task execution * Resource usage tracker for the entire task execution
*/ */
@ -358,30 +324,33 @@ export const analystAgentTask: ReturnType<
// Log performance after data loading // Log performance after data loading
logPerformanceMetrics('post-data-load', payload.message_id, taskStartTime, resourceTracker); logPerformanceMetrics('post-data-load', payload.message_id, taskStartTime, resourceTracker);
// Task 3: Setup runtime context for workflow execution
const contextSetupStart = Date.now();
const runtimeContext = setupRuntimeContextFromMessage(
messageContext,
dataSource,
payload.message_id
);
const contextSetupTime = Date.now() - contextSetupStart;
// Task 4: Prepare workflow input with conversation history and dashboard files // Task 4: Prepare workflow input with conversation history and dashboard files
const workflowInput = { // Convert conversation history to messages format expected by the workflow
prompt: messageContext.requestMessage, const messages =
conversationHistory: conversationHistory.length > 0 ? conversationHistory : undefined, conversationHistory.length > 0
dashboardFiles: dashboardFiles.length > 0 ? dashboardFiles : undefined, ? conversationHistory
: [
{
role: 'user' as const,
content: messageContext.requestMessage,
},
];
const workflowInput: AnalystWorkflowInput = {
messages,
messageId: payload.message_id,
chatId: messageContext.chatId,
userId: messageContext.userId,
organizationId: messageContext.organizationId,
dataSourceId: dataSource.dataSourceId,
dataSourceSyntax: dataSource.dataSourceSyntax,
}; };
logger.log('Workflow input prepared', { logger.log('Workflow input prepared', {
messageId: payload.message_id, messageId: payload.message_id,
hasPrompt: !!workflowInput.prompt, messagesCount: workflowInput.messages.length,
hasConversationHistory: !!workflowInput.conversationHistory, hasDashboardFiles: dashboardFiles.length > 0,
conversationHistoryLength: workflowInput.conversationHistory?.length || 0, dashboardFilesCount: dashboardFiles.length,
hasDashboardFiles: !!workflowInput.dashboardFiles,
dashboardFilesCount: workflowInput.dashboardFiles?.length || 0,
contextSetupTimeMs: contextSetupTime,
totalPrepTimeMs: Date.now() - dataLoadStart, totalPrepTimeMs: Date.now() - dataLoadStart,
}); });
@ -395,21 +364,7 @@ export const analystAgentTask: ReturnType<
totalPrepTimeMs: Date.now() - dataLoadStart, totalPrepTimeMs: Date.now() - dataLoadStart,
}); });
// Pre-create the workflow run to measure initialization time separately
const createRunStart = Date.now();
const run = analystWorkflow.createRun();
const createRunTime = Date.now() - createRunStart;
logger.log('Workflow run created', {
messageId: payload.message_id,
createRunTimeMs: createRunTime,
});
// Log performance after workflow run creation
logPerformanceMetrics('post-createrun', payload.message_id, taskStartTime, resourceTracker);
// Execute workflow with tracing // Execute workflow with tracing
const workflowStartMethodStart = Date.now();
const tracedWorkflow = wrapTraced( const tracedWorkflow = wrapTraced(
async () => { async () => {
currentSpan().log({ currentSpan().log({
@ -423,26 +378,19 @@ export const analystAgentTask: ReturnType<
}, },
}); });
return await run.start({ return await runAnalystWorkflow(workflowInput);
inputData: workflowInput,
runtimeContext,
});
}, },
{ {
name: 'Analyst Agent Task Workflow', name: 'Analyst Agent Task Workflow',
} }
); );
const workflowResult = await tracedWorkflow(); await tracedWorkflow();
const workflowStartMethodTime = Date.now() - workflowStartMethodStart;
const totalWorkflowTime = Date.now() - workflowExecutionStart; const totalWorkflowTime = Date.now() - workflowExecutionStart;
logger.log('Analyst workflow completed successfully', { logger.log('Analyst workflow completed successfully', {
messageId: payload.message_id, messageId: payload.message_id,
workflowResult: !!workflowResult,
workflowStartMethodTimeMs: workflowStartMethodTime,
totalWorkflowTimeMs: totalWorkflowTime, totalWorkflowTimeMs: totalWorkflowTime,
createRunTimeMs: createRunTime,
}); });
// Log final performance metrics // Log final performance metrics
@ -459,9 +407,6 @@ export const analystAgentTask: ReturnType<
executionTimeMs: totalExecutionTime, executionTimeMs: totalExecutionTime,
breakdown: { breakdown: {
dataLoadTimeMs: dataLoadTime, dataLoadTimeMs: dataLoadTime,
contextSetupTimeMs: contextSetupTime,
createRunTimeMs: createRunTime,
workflowStartMethodTimeMs: workflowStartMethodTime,
totalWorkflowTimeMs: totalWorkflowTime, totalWorkflowTimeMs: totalWorkflowTime,
}, },
}); });
@ -558,9 +503,6 @@ function getErrorCode(error: unknown): string {
if (error.message.includes('Database query failed')) return 'DATABASE_ERROR'; if (error.message.includes('Database query failed')) return 'DATABASE_ERROR';
if (error.message.includes('Output validation failed')) return 'DATA_VALIDATION_ERROR'; if (error.message.includes('Output validation failed')) return 'DATA_VALIDATION_ERROR';
// Task 3: Runtime context errors
if (error.message.includes('Failed to setup runtime context')) return 'RUNTIME_CONTEXT_ERROR';
// Task 5: Workflow execution errors // Task 5: Workflow execution errors
if (error.message.includes('workflow') || error.message.includes('Workflow')) if (error.message.includes('workflow') || error.message.includes('Workflow'))
return 'WORKFLOW_EXECUTION_ERROR'; return 'WORKFLOW_EXECUTION_ERROR';

View File

@ -1,6 +1,6 @@
import postProcessingWorkflow, { import postProcessingWorkflow, {
type PostProcessingWorkflowOutput, type PostProcessingWorkflowOutput,
} from '@buster/ai/workflows/post-processing-workflow'; } from '@buster/ai/workflows/message-post-processing-workflow/message-post-processing-workflow';
import { eq, getBraintrustMetadata, getDb, messages, slackIntegrations } from '@buster/database'; import { eq, getBraintrustMetadata, getDb, messages, slackIntegrations } from '@buster/database';
import type { import type {
AssumptionClassification, AssumptionClassification,
@ -37,14 +37,16 @@ function extractDbFields(
// Check if there are any major assumptions // Check if there are any major assumptions
const hasMajorAssumptions = const hasMajorAssumptions =
workflowOutput.assumptions?.some((assumption) => assumption.label === 'major') ?? false; workflowOutput.assumptionsResult?.assumptions?.some(
(assumption) => assumption.label === 'major'
) ?? false;
// Determine confidence score based on rules: // Determine confidence score based on rules:
// - Low if toolCalled is 'flagChat' // - Low if toolCalled is 'flagChat'
// - Low if there are any major assumptions // - Low if there are any major assumptions
// - High otherwise // - High otherwise
let confidence_score: ConfidenceScore = 'high'; let confidence_score: ConfidenceScore = 'high';
if (workflowOutput.toolCalled === 'flagChat' || hasMajorAssumptions) { if (workflowOutput.assumptionsResult?.toolCalled === 'flagChat' || hasMajorAssumptions) {
confidence_score = 'low'; confidence_score = 'low';
} }
@ -52,27 +54,30 @@ function extractDbFields(
let summaryMessage: string; let summaryMessage: string;
let summaryTitle: string; let summaryTitle: string;
if (!hasMajorAssumptions && workflowOutput.flagChatMessage) { if (!hasMajorAssumptions && workflowOutput.flagChatResult?.message) {
// If no major assumptions, use flagChatMessage as summaryMessage // If no major assumptions, use flagChatMessage as summaryMessage
summaryMessage = workflowOutput.flagChatMessage; summaryMessage = workflowOutput.flagChatResult.message;
summaryTitle = 'No Major Assumptions Identified'; summaryTitle = 'No Major Assumptions Identified';
} else { } else {
// Otherwise use the provided summary fields or defaults // Otherwise use the provided summary fields or defaults
summaryMessage = workflowOutput.summaryMessage || 'No summary available'; summaryMessage =
summaryTitle = workflowOutput.summaryTitle || 'Summary'; workflowOutput.flagChatResult?.summaryMessage ||
workflowOutput.formattedMessage ||
'No summary available';
summaryTitle = workflowOutput.flagChatResult?.summaryTitle || 'Summary';
} }
const extracted: PostProcessingMessage = { const extracted: PostProcessingMessage = {
summary_message: summaryMessage, summary_message: summaryMessage,
summary_title: summaryTitle, summary_title: summaryTitle,
confidence_score, confidence_score,
assumptions: workflowOutput.assumptions?.map((assumption) => ({ assumptions: workflowOutput.assumptionsResult?.assumptions?.map((assumption) => ({
descriptive_title: assumption.descriptiveTitle, descriptive_title: assumption.descriptiveTitle,
classification: assumption.classification as AssumptionClassification, classification: assumption.classification as AssumptionClassification,
explanation: assumption.explanation, explanation: assumption.explanation,
label: assumption.label as AssumptionLabel, label: assumption.label as AssumptionLabel,
})), })),
tool_called: workflowOutput.toolCalled || 'unknown', // Provide default if missing tool_called: workflowOutput.assumptionsResult?.toolCalled || 'unknown', // Provide default if missing
user_name: userName, user_name: userName,
}; };
@ -171,63 +176,24 @@ export const messagePostProcessingTask: ReturnType<
}, },
}); });
const run = postProcessingWorkflow.createRun(); return await postProcessingWorkflow(workflowInput);
return await run.start({
inputData: workflowInput,
});
}, },
{ {
name: 'Message Post-Processing Workflow', name: 'Message Post-Processing Workflow',
} }
); );
const workflowResult = await tracedWorkflow(); const validatedOutput = await tracedWorkflow();
if (!workflowResult || workflowResult.status !== 'success' || !workflowResult.result) { if (!validatedOutput) {
throw new Error('Post-processing workflow returned no output'); throw new Error('Post-processing workflow returned no output');
} }
// Handle branch results - the result will have one of the branch step IDs as a key
let validatedOutput: PostProcessingWorkflowOutput;
// Define the expected shape of branch results
type BranchResult = {
'format-follow-up-message'?: PostProcessingWorkflowOutput;
'format-initial-message'?: PostProcessingWorkflowOutput;
};
const branchResult = workflowResult.result as BranchResult;
if ('format-follow-up-message' in branchResult && branchResult['format-follow-up-message']) {
validatedOutput = branchResult['format-follow-up-message'] as PostProcessingWorkflowOutput;
} else if (
'format-initial-message' in branchResult &&
branchResult['format-initial-message']
) {
validatedOutput = branchResult['format-initial-message'] as PostProcessingWorkflowOutput;
} else {
logger.error('Unexpected workflow result structure', {
messageId: payload.messageId,
resultKeys: Object.keys(branchResult),
result: branchResult,
});
console.error('Unexpected workflow result structure:', {
messageId: payload.messageId,
resultKeys: Object.keys(branchResult),
result: branchResult,
});
throw new Error('Post-processing workflow returned unexpected result structure');
}
logger.log('Validated output', { logger.log('Validated output', {
messageId: payload.messageId, messageId: payload.messageId,
summaryTitle: validatedOutput.summaryTitle, flagChatResult: validatedOutput.flagChatResult,
summaryMessage: validatedOutput.summaryMessage, assumptionsResult: validatedOutput.assumptionsResult,
flagChatMessage: validatedOutput.flagChatMessage, formattedMessage: validatedOutput.formattedMessage,
flagChatTitle: validatedOutput.flagChatTitle,
toolCalled: validatedOutput.toolCalled,
assumptions: validatedOutput.assumptions,
message: validatedOutput.message,
}); });
// Step 6: Store result in database // Step 6: Store result in database

View File

@ -55,7 +55,8 @@ describe('format-follow-up-message-step integration', () => {
majorAssumptions: [ majorAssumptions: [
{ {
descriptiveTitle: 'Date range assumption for monthly trends', descriptiveTitle: 'Date range assumption for monthly trends',
explanation: 'User requested monthly trends but did not specify date range, so assuming all available historical data', explanation:
'User requested monthly trends but did not specify date range, so assuming all available historical data',
label: 'major', label: 'major',
}, },
], ],