Trigger Error Handling and clean up on message

This commit is contained in:
dal 2025-09-24 13:15:55 -06:00
parent 8ff67d62f2
commit 2ac1ddb99f
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
1 changed files with 87 additions and 25 deletions

View File

@ -1,3 +1,4 @@
import { randomUUID } from 'node:crypto';
import { logger, schemaTask, tasks } from '@trigger.dev/sdk';
import { currentSpan, initLogger, wrapTraced } from 'braintrust';
import { analystQueue } from '../../queues/analyst-queue';
@ -12,6 +13,8 @@ import {
getOrganizationDataSource,
getOrganizationDocs,
getUserPersonalization,
updateMessage,
updateMessageEntries,
} from '@buster/database/queries';
// Access control imports
@ -134,11 +137,11 @@ class ResourceTracker {
},
cpuUsage: finalCpuUsage
? {
userTimeMs: Math.round(finalCpuUsage.user / 1000),
systemTimeMs: Math.round(finalCpuUsage.system / 1000),
totalTimeMs: Math.round(totalCpuTime / 1000),
estimatedUsagePercent: Math.round(cpuPercentage * 100) / 100,
}
userTimeMs: Math.round(finalCpuUsage.user / 1000),
systemTimeMs: Math.round(finalCpuUsage.system / 1000),
totalTimeMs: Math.round(totalCpuTime / 1000),
estimatedUsagePercent: Math.round(cpuPercentage * 100) / 100,
}
: { error: 'CPU usage not available' },
stageBreakdown: this.snapshots.map((snapshot, index) => {
const prevSnapshot = index > 0 ? this.snapshots[index - 1] : null;
@ -249,6 +252,51 @@ export const analystAgentTask: ReturnType<
schema: AnalystAgentTaskInputSchema,
queue: analystQueue,
maxDuration: 1200, // 20 minutes for complex analysis
retry: {
maxAttempts: 0
},
onFailure: async ({ error, payload }) => {
// Log the failure for debugging
logger.error('Analyst agent task failed - executing onFailure handler', {
messageId: payload.message_id,
error: error instanceof Error ? error.message : 'Unknown error',
errorType: error instanceof Error ? error.name : typeof error,
});
try {
// Add error message to user and mark as complete
const errorResponseId = randomUUID();
// Add the error response message
await updateMessageEntries({
messageId: payload.message_id,
responseMessages: [
{
id: errorResponseId,
type: 'text',
message: "I'm sorry, I ran into a technical error. Please try your request again.",
},
],
});
// Mark the message as complete with final reasoning
await updateMessage(payload.message_id, {
isCompleted: true,
finalReasoningMessage: 'Finished reasoning.',
});
logger.log('Error response added and message marked complete', {
messageId: payload.message_id,
errorResponseId,
});
} catch (handlerError) {
// Log but don't throw - onFailure should never throw
logger.error('Failed to update message in onFailure handler', {
messageId: payload.message_id,
error: handlerError instanceof Error ? handlerError.message : 'Unknown error',
});
}
},
run: async (payload): Promise<AnalystAgentTaskOutput> => {
const taskStartTime = Date.now();
const resourceTracker = new ResourceTracker();
@ -376,12 +424,12 @@ export const analystAgentTask: ReturnType<
conversationHistory.length > 0
? conversationHistory
: [
{
role: 'user',
// v5 supports string content directly for user messages
content: messageContext.requestMessage,
},
];
{
role: 'user',
// v5 supports string content directly for user messages
content: messageContext.requestMessage,
},
];
const workflowInput: AnalystWorkflowInput = {
messages: modelMessages,
@ -435,7 +483,28 @@ export const analystAgentTask: ReturnType<
}
);
await tracedWorkflow();
// Wrap workflow execution to capture and log errors before re-throwing
try {
await tracedWorkflow();
} catch (workflowError) {
// Log workflow-specific error details
logger.error('Analyst workflow execution failed', {
messageId: payload.message_id,
error: workflowError instanceof Error ? workflowError.message : 'Unknown error',
errorType: workflowError instanceof Error ? workflowError.name : typeof workflowError,
stack: workflowError instanceof Error ? workflowError.stack : undefined,
workflowInput: {
messageCount: workflowInput.messages.length,
datasetsCount: datasets.length,
hasAnalystInstructions: !!analystInstructions,
organizationDocsCount: organizationDocs.length,
},
});
// Re-throw to let Trigger handle the retry
throw workflowError;
}
const totalWorkflowTime = Date.now() - workflowExecutionStart;
logger.log('Analyst workflow completed successfully', {
@ -509,27 +578,20 @@ export const analystAgentTask: ReturnType<
logPerformanceMetrics('task-error', payload.message_id, taskStartTime, resourceTracker);
resourceTracker.generateReport(payload.message_id);
logger.error('Task execution failed', {
logger.error('Task execution failed - will retry', {
messageId: payload.message_id,
error: errorMessage,
errorType: error instanceof Error ? error.name : typeof error,
stack: error instanceof Error ? error.stack : undefined,
executionTimeMs: totalExecutionTime,
errorCode: getErrorCode(error),
});
// Need to flush the Braintrust logger to ensure all traces are sent
await braintrustLogger.flush();
return {
success: false,
messageId: payload.message_id,
error: {
code: getErrorCode(error),
message: errorMessage,
details: {
operation: 'analyst_agent_task_execution',
messageId: payload.message_id,
},
},
};
// Re-throw the error so Trigger knows the task failed and should retry
throw error;
}
},
});