diff --git a/apps/trigger/src/tasks/analyst-agent-task/analyst-agent-task.ts b/apps/trigger/src/tasks/analyst-agent-task/analyst-agent-task.ts index f74bdc6aa..54ef0f077 100644 --- a/apps/trigger/src/tasks/analyst-agent-task/analyst-agent-task.ts +++ b/apps/trigger/src/tasks/analyst-agent-task/analyst-agent-task.ts @@ -1,3 +1,4 @@ +import { randomUUID } from 'node:crypto'; import { logger, schemaTask, tasks } from '@trigger.dev/sdk'; import { currentSpan, initLogger, wrapTraced } from 'braintrust'; import { analystQueue } from '../../queues/analyst-queue'; @@ -12,6 +13,8 @@ import { getOrganizationDataSource, getOrganizationDocs, getUserPersonalization, + updateMessage, + updateMessageEntries, } from '@buster/database/queries'; // Access control imports @@ -134,11 +137,11 @@ class ResourceTracker { }, cpuUsage: finalCpuUsage ? { - userTimeMs: Math.round(finalCpuUsage.user / 1000), - systemTimeMs: Math.round(finalCpuUsage.system / 1000), - totalTimeMs: Math.round(totalCpuTime / 1000), - estimatedUsagePercent: Math.round(cpuPercentage * 100) / 100, - } + userTimeMs: Math.round(finalCpuUsage.user / 1000), + systemTimeMs: Math.round(finalCpuUsage.system / 1000), + totalTimeMs: Math.round(totalCpuTime / 1000), + estimatedUsagePercent: Math.round(cpuPercentage * 100) / 100, + } : { error: 'CPU usage not available' }, stageBreakdown: this.snapshots.map((snapshot, index) => { const prevSnapshot = index > 0 ? this.snapshots[index - 1] : null; @@ -249,6 +252,51 @@ export const analystAgentTask: ReturnType< schema: AnalystAgentTaskInputSchema, queue: analystQueue, maxDuration: 1200, // 20 minutes for complex analysis + retry: { + maxAttempts: 0 + }, + onFailure: async ({ error, payload }) => { + // Log the failure for debugging + logger.error('Analyst agent task failed - executing onFailure handler', { + messageId: payload.message_id, + error: error instanceof Error ? error.message : 'Unknown error', + errorType: error instanceof Error ? error.name : typeof error, + }); + + try { + // Add error message to user and mark as complete + const errorResponseId = randomUUID(); + + // Add the error response message + await updateMessageEntries({ + messageId: payload.message_id, + responseMessages: [ + { + id: errorResponseId, + type: 'text', + message: "I'm sorry, I ran into a technical error. Please try your request again.", + }, + ], + }); + + // Mark the message as complete with final reasoning + await updateMessage(payload.message_id, { + isCompleted: true, + finalReasoningMessage: 'Finished reasoning.', + }); + + logger.log('Error response added and message marked complete', { + messageId: payload.message_id, + errorResponseId, + }); + } catch (handlerError) { + // Log but don't throw - onFailure should never throw + logger.error('Failed to update message in onFailure handler', { + messageId: payload.message_id, + error: handlerError instanceof Error ? handlerError.message : 'Unknown error', + }); + } + }, run: async (payload): Promise => { const taskStartTime = Date.now(); const resourceTracker = new ResourceTracker(); @@ -376,12 +424,12 @@ export const analystAgentTask: ReturnType< conversationHistory.length > 0 ? conversationHistory : [ - { - role: 'user', - // v5 supports string content directly for user messages - content: messageContext.requestMessage, - }, - ]; + { + role: 'user', + // v5 supports string content directly for user messages + content: messageContext.requestMessage, + }, + ]; const workflowInput: AnalystWorkflowInput = { messages: modelMessages, @@ -435,7 +483,28 @@ export const analystAgentTask: ReturnType< } ); - await tracedWorkflow(); + // Wrap workflow execution to capture and log errors before re-throwing + try { + await tracedWorkflow(); + } catch (workflowError) { + // Log workflow-specific error details + logger.error('Analyst workflow execution failed', { + messageId: payload.message_id, + error: workflowError instanceof Error ? workflowError.message : 'Unknown error', + errorType: workflowError instanceof Error ? workflowError.name : typeof workflowError, + stack: workflowError instanceof Error ? workflowError.stack : undefined, + workflowInput: { + messageCount: workflowInput.messages.length, + datasetsCount: datasets.length, + hasAnalystInstructions: !!analystInstructions, + organizationDocsCount: organizationDocs.length, + }, + }); + + // Re-throw to let Trigger handle the retry + throw workflowError; + } + const totalWorkflowTime = Date.now() - workflowExecutionStart; logger.log('Analyst workflow completed successfully', { @@ -509,27 +578,20 @@ export const analystAgentTask: ReturnType< logPerformanceMetrics('task-error', payload.message_id, taskStartTime, resourceTracker); resourceTracker.generateReport(payload.message_id); - logger.error('Task execution failed', { + logger.error('Task execution failed - will retry', { messageId: payload.message_id, error: errorMessage, + errorType: error instanceof Error ? error.name : typeof error, + stack: error instanceof Error ? error.stack : undefined, executionTimeMs: totalExecutionTime, + errorCode: getErrorCode(error), }); // Need to flush the Braintrust logger to ensure all traces are sent await braintrustLogger.flush(); - return { - success: false, - messageId: payload.message_id, - error: { - code: getErrorCode(error), - message: errorMessage, - details: { - operation: 'analyst_agent_task_execution', - messageId: payload.message_id, - }, - }, - }; + // Re-throw the error so Trigger knows the task failed and should retry + throw error; } }, });