Merge branch 'staging' into big-nate/bus-1273-add-assumptions-and-confidence-score-to-web-app

This commit is contained in:
Nate Kelley 2025-07-08 12:57:07 -06:00
commit 86aa9234b8
No known key found for this signature in database
GPG Key ID: FD90372AB8D98B4F
4 changed files with 81 additions and 65 deletions

View File

@ -1,5 +1,5 @@
import { logger, schemaTask, tasks } from '@trigger.dev/sdk';
import { initLogger, wrapTraced } from 'braintrust';
import { currentSpan, initLogger, wrapTraced } from 'braintrust';
import { AnalystAgentTaskInputSchema, type AnalystAgentTaskOutput } from './types';
// Task 2 & 4: Database helpers (IMPLEMENTED)
@ -290,26 +290,10 @@ export const analystAgentTask: ReturnType<
throw new Error('BRAINTRUST_KEY is not set');
}
// Start Braintrust initialization immediately but don't block the critical path
const braintrustInitStart = Date.now();
const braintrustInitPromise = Promise.resolve().then(async () => {
try {
initLogger({
apiKey: process.env.BRAINTRUST_KEY,
projectName: process.env.ENVIRONMENT || 'development',
});
logger.log('Braintrust initialization completed', {
messageId: payload.message_id,
braintrustInitTimeMs: Date.now() - braintrustInitStart,
});
} catch (error) {
logger.error('Braintrust initialization failed', {
messageId: payload.message_id,
error: error instanceof Error ? error.message : 'Unknown error',
braintrustInitTimeMs: Date.now() - braintrustInitStart,
});
// Don't throw - allow workflow to continue without Braintrust
}
// Initialize Braintrust logger
const braintrustLogger = initLogger({
apiKey: process.env.BRAINTRUST_KEY,
projectName: process.env.ENVIRONMENT || 'development',
});
try {
@ -424,23 +408,21 @@ export const analystAgentTask: ReturnType<
// Log performance after workflow run creation
logPerformanceMetrics('post-createrun', payload.message_id, taskStartTime, resourceTracker);
// Wait for Braintrust initialization if it's not ready yet
const braintrustWaitStart = Date.now();
await braintrustInitPromise;
const braintrustWaitTime = Date.now() - braintrustWaitStart;
if (braintrustWaitTime > 10) {
// Only log if we actually had to wait
logger.log('Waited for Braintrust initialization', {
messageId: payload.message_id,
braintrustWaitTimeMs: braintrustWaitTime,
});
}
// Execute workflow with tracing
const workflowStartMethodStart = Date.now();
const tracedWorkflow = wrapTraced(
async () => {
currentSpan().log({
metadata: {
userName: braintrustMetadata.userName || 'Unknown',
userId: braintrustMetadata.userId,
organizationName: braintrustMetadata.organizationName || 'Unknown',
organizationId: braintrustMetadata.organizationId,
messageId: braintrustMetadata.messageId,
chatId: braintrustMetadata.chatId,
},
});
return await run.start({
inputData: workflowInput,
runtimeContext,
@ -461,7 +443,6 @@ export const analystAgentTask: ReturnType<
workflowStartMethodTimeMs: workflowStartMethodTime,
totalWorkflowTimeMs: totalWorkflowTime,
createRunTimeMs: createRunTime,
braintrustWaitTimeMs: braintrustWaitTime,
});
// Log final performance metrics
@ -480,7 +461,6 @@ export const analystAgentTask: ReturnType<
dataLoadTimeMs: dataLoadTime,
contextSetupTimeMs: contextSetupTime,
createRunTimeMs: createRunTime,
braintrustWaitTimeMs: braintrustWaitTime,
workflowStartMethodTimeMs: workflowStartMethodTime,
totalWorkflowTimeMs: totalWorkflowTime,
},
@ -514,6 +494,8 @@ export const analystAgentTask: ReturnType<
logPerformanceMetrics('task-complete', payload.message_id, taskStartTime, resourceTracker);
resourceTracker.generateReport(payload.message_id);
await braintrustLogger.flush();
return {
success: true,
messageId: payload.message_id,
@ -538,6 +520,9 @@ export const analystAgentTask: ReturnType<
executionTimeMs: totalExecutionTime,
});
// Need to flush the Braintrust logger to ensure all traces are sent
await braintrustLogger.flush();
return {
success: false,
messageId: payload.message_id,

View File

@ -1,16 +1,15 @@
import postProcessingWorkflow, {
type PostProcessingWorkflowOutput,
} from '@buster/ai/workflows/post-processing-workflow';
import { eq, getDb, messages } from '@buster/database';
import { eq, getBraintrustMetadata, getDb, messages } from '@buster/database';
import type {
Assumption,
AssumptionClassification,
AssumptionLabel,
ConfidenceScore,
PostProcessingMessage,
} from '@buster/server-shared/message';
import { logger, schemaTask } from '@trigger.dev/sdk/v3';
import { initLogger, wrapTraced } from 'braintrust';
import { currentSpan, initLogger, wrapTraced } from 'braintrust';
import { z } from 'zod/v4';
import {
buildWorkflowInput,
@ -98,8 +97,8 @@ export const messagePostProcessingTask: ReturnType<
throw new Error('BRAINTRUST_KEY is not set');
}
// Initialize Braintrust logging for observability
initLogger({
// Initialize Braintrust logger
const braintrustLogger = initLogger({
apiKey: process.env.BRAINTRUST_KEY,
projectName: process.env.ENVIRONMENT || 'development',
});
@ -118,17 +117,20 @@ export const messagePostProcessingTask: ReturnType<
});
// Step 2: Fetch all required data concurrently
const [conversationMessages, previousPostProcessingResults, datasets] = await Promise.all([
fetchConversationHistory(messageContext.chatId),
fetchPreviousPostProcessingMessages(messageContext.chatId, messageContext.createdAt),
fetchUserDatasets(messageContext.createdBy),
]);
const [conversationMessages, previousPostProcessingResults, datasets, braintrustMetadata] =
await Promise.all([
fetchConversationHistory(messageContext.chatId),
fetchPreviousPostProcessingMessages(messageContext.chatId, messageContext.createdAt),
fetchUserDatasets(messageContext.createdBy),
getBraintrustMetadata({ messageId: payload.messageId }),
]);
logger.log('Fetched required data', {
messageId: payload.messageId,
conversationMessagesCount: conversationMessages.length,
previousPostProcessingCount: previousPostProcessingResults.length,
datasetsCount: datasets.length,
braintrustMetadata, // Log the metadata to verify it's working
});
// Step 3: Build workflow input
@ -154,6 +156,17 @@ export const messagePostProcessingTask: ReturnType<
const tracedWorkflow = wrapTraced(
async () => {
currentSpan().log({
metadata: {
userName: braintrustMetadata.userName || 'Unknown',
userId: braintrustMetadata.userId,
organizationName: braintrustMetadata.organizationName || 'Unknown',
organizationId: braintrustMetadata.organizationId,
messageId: braintrustMetadata.messageId,
chatId: braintrustMetadata.chatId,
},
});
const run = postProcessingWorkflow.createRun();
return await run.start({
inputData: workflowInput,
@ -251,6 +264,12 @@ export const messagePostProcessingTask: ReturnType<
// Step 6: Send Slack notification if conditions are met
let slackNotificationSent = false;
// Skip Slack notification if tool_called is "noIssuesFound" and there are no major assumptions
const hasMajorAssumptions =
dbData.assumptions?.some((assumption) => assumption.label === 'major') ?? false;
const shouldSkipSlackNotification =
dbData.tool_called === 'noIssuesFound' && !hasMajorAssumptions;
try {
logger.log('Checking Slack notification conditions', {
messageId: payload.messageId,
@ -258,29 +277,38 @@ export const messagePostProcessingTask: ReturnType<
summaryTitle: dbData.summary_title,
summaryMessage: dbData.summary_message,
toolCalled: dbData.tool_called,
hasMajorAssumptions,
shouldSkipSlackNotification,
});
const slackResult = await sendSlackNotification({
organizationId: messageContext.organizationId,
userName: messageContext.userName,
chatId: messageContext.chatId,
summaryTitle: dbData.summary_title,
summaryMessage: dbData.summary_message,
toolCalled: dbData.tool_called,
});
if (slackResult.sent) {
slackNotificationSent = true;
logger.log('Slack notification sent successfully', {
if (shouldSkipSlackNotification) {
logger.log('Skipping Slack notification: noIssuesFound with no major assumptions', {
messageId: payload.messageId,
organizationId: messageContext.organizationId,
});
} else {
logger.log('Slack notification not sent', {
messageId: payload.messageId,
const slackResult = await sendSlackNotification({
organizationId: messageContext.organizationId,
reason: slackResult.error,
userName: messageContext.userName,
chatId: messageContext.chatId,
summaryTitle: dbData.summary_title,
summaryMessage: dbData.summary_message,
toolCalled: dbData.tool_called,
});
if (slackResult.sent) {
slackNotificationSent = true;
logger.log('Slack notification sent successfully', {
messageId: payload.messageId,
organizationId: messageContext.organizationId,
});
} else {
logger.log('Slack notification not sent', {
messageId: payload.messageId,
organizationId: messageContext.organizationId,
reason: slackResult.error,
});
}
}
} catch (slackError) {
const errorMessage =
@ -305,8 +333,8 @@ export const messagePostProcessingTask: ReturnType<
slackNotificationSent,
});
// Wait 500ms to allow Braintrust to clean up its trace before completing
await new Promise((resolve) => setTimeout(resolve, 500));
// Need to flush the Braintrust logger to ensure all traces are sent
await braintrustLogger.flush();
return {
success: true,
@ -334,6 +362,9 @@ export const messagePostProcessingTask: ReturnType<
executionTimeMs: Date.now() - startTime,
});
// Need to flush the Braintrust logger to ensure all traces are sent
await braintrustLogger.flush();
return {
success: false,
messageId: payload.messageId,

View File

@ -153,7 +153,7 @@ function createDataMetadata(results: Record<string, unknown>[]): DataMetadata {
const uniqueValues = new Set(values).size;
columnMetadata.push({
name: columnName,
name: columnName.toLowerCase(),
min_value: minValue,
max_value: maxValue,
unique_values: uniqueValues,

View File

@ -145,7 +145,7 @@ function createDataMetadata(results: Record<string, unknown>[]): DataMetadata {
const uniqueValues = new Set(values).size;
columnMetadata.push({
name: columnName,
name: columnName.toLowerCase(),
min_value: minValue,
max_value: maxValue,
unique_values: uniqueValues,