From 8ff67d62f2ae4f952958e9d692716ab08ef48f20 Mon Sep 17 00:00:00 2001
From: dal
Date: Wed, 24 Sep 2025 09:53:55 -0600
Subject: [PATCH 1/2] better error handling and logging on streams

---
 packages/ai/src/utils/index.ts             |  1 -
 .../ai/src/utils/with-agent-retry.test.ts  | 27 ++-----
 packages/ai/src/utils/with-agent-retry.ts  | 79 +++++++++----------
 packages/ai/src/utils/with-step-retry.ts   |  1 +
 4 files changed, 47 insertions(+), 61 deletions(-)

diff --git a/packages/ai/src/utils/index.ts b/packages/ai/src/utils/index.ts
index 17f4c2b85..4e1fb8adf 100644
--- a/packages/ai/src/utils/index.ts
+++ b/packages/ai/src/utils/index.ts
@@ -15,7 +15,6 @@ export {
   recoverMessages,
   executeStreamAttempt,
   handleFailedAttempt,
-  analyzeError,
   createRetryExecutor,
   composeMiddleware,
   retryMiddleware,
diff --git a/packages/ai/src/utils/with-agent-retry.test.ts b/packages/ai/src/utils/with-agent-retry.test.ts
index 25da26bf2..ed2623b9c 100644
--- a/packages/ai/src/utils/with-agent-retry.test.ts
+++ b/packages/ai/src/utils/with-agent-retry.test.ts
@@ -2,7 +2,6 @@ import type { ModelMessage } from 'ai';
 import { beforeEach, describe, expect, it, vi } from 'vitest';
 import {
   type StreamExecutor,
-  analyzeError,
   calculateBackoffDelay,
   composeMiddleware,
   createMockAgent,
@@ -84,22 +83,6 @@ describe('with-agent-retry', () => {
     });
   });

-  describe('analyzeError', () => {
-    it('should correctly identify retryable errors', () => {
-      const overloadedError = createOverloadedError();
-      const result = analyzeError(overloadedError);
-      expect(result.isRetryable).toBe(true);
-      expect(result.error).toEqual(overloadedError);
-    });
-
-    it('should treat all errors as retryable', () => {
-      const regularError = new Error('Regular error');
-      const result = analyzeError(regularError);
-      expect(result.isRetryable).toBe(true);
-      expect(result.error).toEqual(regularError);
-    });
-  });
-
   describe('sleep', () => {
     it('should resolve after specified duration', async () => {
       const startTime = Date.now();
@@ -238,19 +221,23 @@ describe('with-agent-retry', () => {
       expect(mockFetchMessageEntries).not.toHaveBeenCalled();
     });

-    it('should not retry when recovery fails', async () => {
+    it('should continue with original messages when recovery fails', async () => {
       mockFetchMessageEntries.mockRejectedValue(new Error('DB error'));
+      const originalMessages: ModelMessage[] = [{ role: 'user', content: 'original' }];

       const result = await handleFailedAttempt(
         createOverloadedError(),
         1,
         3,
         'test-id',
-        [],
+        originalMessages,
         1000
       );

-      expect(result.shouldRetry).toBe(false);
+      // We now continue with original messages when recovery fails
+      expect(result.shouldRetry).toBe(true);
+      expect(result.nextMessages).toEqual(originalMessages);
+      expect(result.delayMs).toBe(1000);
     });
   });
 });
diff --git a/packages/ai/src/utils/with-agent-retry.ts b/packages/ai/src/utils/with-agent-retry.ts
index e27b88946..1c9f0b99a 100644
--- a/packages/ai/src/utils/with-agent-retry.ts
+++ b/packages/ai/src/utils/with-agent-retry.ts
@@ -38,14 +38,6 @@ interface RetryOptions {
   onRetry?: (attempt: number, recoveredMessageCount: number) => void;
 }

-/**
- * Result of checking if an error is retryable
- */
-interface RetryableCheck {
-  isRetryable: boolean;
-  error: unknown;
-}
-
 // ===== Pure Functions =====

 /**
@@ -96,16 +88,6 @@ export const calculateBackoffDelay = (attempt: number, baseDelayMs: number): num
 export const sleep = (ms: number): Promise<void> =>
   new Promise((resolve) => setTimeout(resolve, ms));

-/**
- * Check if an error is retryable and return structured result
- * Now treats ALL errors as retryable to handle various provider errors
- * Pure function for error analysis
- */
-export const analyzeError = (error: unknown): RetryableCheck => ({
-  isRetryable: true, // All errors are now retryable
-  error,
-});
-
 /**
  * Recover messages from database
  * Returns either recovered messages or original messages
@@ -137,6 +119,8 @@ export const recoverMessages = async (
     console.error('[Agent Retry] Failed to recover from database', {
       messageId,
       error: error instanceof Error ? error.message : 'Unknown error',
+      stack: error instanceof Error ? error.stack : undefined,
+      errorType: error instanceof Error ? error.name : typeof error,
     });
     throw error;
   }
@@ -177,12 +161,12 @@ export const handleFailedAttempt = async (
     messageId,
     error: error instanceof Error ? error.message : 'Unknown error',
     errorType: error instanceof Error ? error.name : typeof error,
+    stack: error instanceof Error ? error.stack : undefined,
   });

-  const { isRetryable } = analyzeError(error);
-
-  if (!isRetryable || attempt === maxAttempts) {
-    console.error('[Agent Retry] Non-retryable error or max attempts reached', {
+  // Check if we've reached max attempts
+  if (attempt === maxAttempts) {
+    console.error('[Agent Retry] Max attempts reached', {
       messageId,
       attempt,
       maxAttempts,
@@ -190,7 +174,7 @@ export const handleFailedAttempt = async (
     return { shouldRetry: false, nextMessages: currentMessages, delayMs: 0 };
   }

-  console.warn('[Agent Retry] Error detected, preparing retry', {
+  console.warn('[Agent Retry] Preparing retry', {
     messageId,
     attempt,
     remainingAttempts: maxAttempts - attempt,
@@ -215,9 +199,30 @@ export const handleFailedAttempt = async (
       nextMessages: recoveredMessages,
       delayMs,
     };
-  } catch (_recoveryError) {
-    // If recovery fails, don't retry
-    return { shouldRetry: false, nextMessages: currentMessages, delayMs: 0 };
+  } catch (recoveryError) {
+    // Log the recovery failure with full context
+    console.error('[Agent Retry] Failed to recover messages from database', {
+      messageId,
+      attempt,
+      recoveryError: recoveryError instanceof Error ? recoveryError.message : 'Unknown error',
+      recoveryErrorType: recoveryError instanceof Error ? recoveryError.name : typeof recoveryError,
+      recoveryStack: recoveryError instanceof Error ? recoveryError.stack : undefined,
+      originalError: error instanceof Error ? error.message : 'Unknown error',
+    });
+
+    // Continue with original messages if recovery fails
+    console.warn('[Agent Retry] Continuing with original messages after recovery failure', {
+      messageId,
+      messageCount: currentMessages.length,
+    });
+
+    const delayMs = calculateBackoffDelay(attempt, baseDelayMs);
+
+    return {
+      shouldRetry: true,
+      nextMessages: currentMessages,
+      delayMs,
+    };
   }
 };

@@ -271,19 +276,13 @@ export function withAgentRetry<
   TStreamResult = unknown,
   TAgent extends Agent = Agent,
 >(agent: TAgent, options: RetryOptions): TAgent {
-  // Create a new object with the same prototype
-  const wrappedAgent = Object.create(Object.getPrototypeOf(agent)) as TAgent;
-
-  // Copy all properties except stream
-  for (const key in agent) {
-    if (key !== 'stream' && Object.prototype.hasOwnProperty.call(agent, key)) {
-      wrappedAgent[key] = agent[key];
-    }
-  }
-
-  // Wrap the stream method with retry logic
-  wrappedAgent.stream = (streamOptions: StreamOptions) =>
-    retryStream(agent, streamOptions.messages, options);
+  // Create a new agent with all properties spread from the original
+  // This ensures type safety and copies all properties correctly
+  const wrappedAgent = {
+    ...agent,
+    // Override the stream method with retry logic
+    stream: (streamOptions: StreamOptions) => retryStream(agent, streamOptions.messages, options),
+  };

   return wrappedAgent;
 }
@@ -305,7 +304,7 @@ export const createRetryExecutor = (
 ): StreamExecutor => {
   return async (messages: ModelMessage[]) => {
     const agent: Agent = {
-      stream: async ({ messages }) => executor(messages),
+      stream: async (streamOptions: StreamOptions) => executor(streamOptions.messages),
     };
     return retryStream(agent, messages, options);
   };
diff --git a/packages/ai/src/utils/with-step-retry.ts b/packages/ai/src/utils/with-step-retry.ts
index bfc27c552..21dd4abbf 100644
--- a/packages/ai/src/utils/with-step-retry.ts
+++ b/packages/ai/src/utils/with-step-retry.ts
@@ -46,6 +46,7 @@ export async function withStepRetry(
     console.error(`[${stepName}] Error on attempt ${attempt}:`, {
       error: error instanceof Error ? error.message : 'Unknown error',
       errorType: error instanceof Error ? error.name : typeof error,
+      stack: error instanceof Error ? error.stack : undefined,
     });

     // If this was the last attempt, throw the error
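
Note: the sketch below is illustrative and not part of the commit. It exercises the retry contract that the reworked handleFailedAttempt now guarantees, using the same call shape as the test above; the relative import path is an assumption.

    import type { ModelMessage } from 'ai';
    import { handleFailedAttempt } from './with-agent-retry'; // path assumed

    async function demo(): Promise<void> {
      const currentMessages: ModelMessage[] = [{ role: 'user', content: 'original' }];

      // Every error is treated as retryable; only exhausting maxAttempts stops the loop.
      const result = await handleFailedAttempt(
        new Error('Overloaded'), // error thrown by the failed stream attempt
        1,                       // attempt number
        3,                       // maxAttempts
        'msg-123',               // messageId used to recover history from the database
        currentMessages,         // messages used by the failed attempt
        1000                     // baseDelayMs for exponential backoff
      );

      // shouldRetry is false only when attempt === maxAttempts.
      // nextMessages holds the recovered history, or falls back to
      // currentMessages when database recovery itself throws.
      console.info(result.shouldRetry, result.delayMs, result.nextMessages.length);
    }

    void demo();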
From 2ac1ddb99fb49998579bd4cd12d267ee77d8c2cd Mon Sep 17 00:00:00 2001
From: dal
Date: Wed, 24 Sep 2025 13:15:55 -0600
Subject: [PATCH 2/2] Trigger error handling and message cleanup on failure

---
 .../analyst-agent-task/analyst-agent-task.ts | 112 ++++++++++++++----
 1 file changed, 87 insertions(+), 25 deletions(-)
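
Note: illustrative sketch, not part of the commit. With task-level retries disabled (maxAttempts: 0), the first uncaught error ends the run, so onFailure becomes the single place that leaves the message in a terminal, user-visible state. A trivial task showing that flow; the zod schema here is a stand-in for AnalystAgentTaskInputSchema.

    import { logger, schemaTask } from '@trigger.dev/sdk';
    import { z } from 'zod';

    // Toy task mirroring the failure flow configured below.
    export const exampleFailureTask = schemaTask({
      id: 'example-failure-task',
      schema: z.object({ message_id: z.string() }),
      retry: { maxAttempts: 0 }, // no task-level retries; transient errors are retried inside the stream
      run: async (payload) => {
        // Any uncaught error immediately ends the run as failed...
        throw new Error(`workflow failed for ${payload.message_id}`);
      },
      onFailure: async ({ error, payload }) => {
        // ...and lands here, the one place that must finalize the message
        // for the user (mark complete, attach an apology response).
        logger.error('task failed', {
          messageId: payload.message_id,
          error: error instanceof Error ? error.message : 'Unknown error',
        });
      },
    });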
diff --git a/apps/trigger/src/tasks/analyst-agent-task/analyst-agent-task.ts b/apps/trigger/src/tasks/analyst-agent-task/analyst-agent-task.ts
index f74bdc6aa..54ef0f077 100644
--- a/apps/trigger/src/tasks/analyst-agent-task/analyst-agent-task.ts
+++ b/apps/trigger/src/tasks/analyst-agent-task/analyst-agent-task.ts
@@ -1,3 +1,4 @@
+import { randomUUID } from 'node:crypto';
 import { logger, schemaTask, tasks } from '@trigger.dev/sdk';
 import { currentSpan, initLogger, wrapTraced } from 'braintrust';
 import { analystQueue } from '../../queues/analyst-queue';
@@ -12,6 +13,8 @@ import {
   getOrganizationDataSource,
   getOrganizationDocs,
   getUserPersonalization,
+  updateMessage,
+  updateMessageEntries,
 } from '@buster/database/queries';

 // Access control imports
@@ -134,11 +137,11 @@ class ResourceTracker {
       },
       cpuUsage: finalCpuUsage
         ? {
-          userTimeMs: Math.round(finalCpuUsage.user / 1000),
-          systemTimeMs: Math.round(finalCpuUsage.system / 1000),
-          totalTimeMs: Math.round(totalCpuTime / 1000),
-          estimatedUsagePercent: Math.round(cpuPercentage * 100) / 100,
-        }
+            userTimeMs: Math.round(finalCpuUsage.user / 1000),
+            systemTimeMs: Math.round(finalCpuUsage.system / 1000),
+            totalTimeMs: Math.round(totalCpuTime / 1000),
+            estimatedUsagePercent: Math.round(cpuPercentage * 100) / 100,
+          }
         : { error: 'CPU usage not available' },
       stageBreakdown: this.snapshots.map((snapshot, index) => {
         const prevSnapshot = index > 0 ? this.snapshots[index - 1] : null;
@@ -249,6 +252,51 @@ export const analystAgentTask: ReturnType<
   schema: AnalystAgentTaskInputSchema,
   queue: analystQueue,
   maxDuration: 1200, // 20 minutes for complex analysis
+  retry: {
+    maxAttempts: 0, // no task-level retries: the stream wrapper retries transient errors, onFailure finalizes the rest
+  },
+  onFailure: async ({ error, payload }) => {
+    // Log the failure for debugging
+    logger.error('Analyst agent task failed - executing onFailure handler', {
+      messageId: payload.message_id,
+      error: error instanceof Error ? error.message : 'Unknown error',
+      errorType: error instanceof Error ? error.name : typeof error,
+    });
+
+    try {
+      // Add error message to user and mark as complete
+      const errorResponseId = randomUUID();
+
+      // Add the error response message
+      await updateMessageEntries({
+        messageId: payload.message_id,
+        responseMessages: [
+          {
+            id: errorResponseId,
+            type: 'text',
+            message: "I'm sorry, I ran into a technical error. Please try your request again.",
+          },
+        ],
+      });
+
+      // Mark the message as complete with final reasoning
+      await updateMessage(payload.message_id, {
+        isCompleted: true,
+        finalReasoningMessage: 'Finished reasoning.',
+      });
+
+      logger.log('Error response added and message marked complete', {
+        messageId: payload.message_id,
+        errorResponseId,
+      });
+    } catch (handlerError) {
+      // Log but don't throw - onFailure should never throw
+      logger.error('Failed to update message in onFailure handler', {
+        messageId: payload.message_id,
+        error: handlerError instanceof Error ? handlerError.message : 'Unknown error',
+      });
+    }
+  },
   run: async (payload): Promise => {
     const taskStartTime = Date.now();
     const resourceTracker = new ResourceTracker();
@@ -376,12 +424,12 @@ export const analystAgentTask: ReturnType<
         conversationHistory.length > 0
           ? conversationHistory
           : [
-            {
-              role: 'user',
-              // v5 supports string content directly for user messages
-              content: messageContext.requestMessage,
-            },
-          ];
+              {
+                role: 'user',
+                // v5 supports string content directly for user messages
+                content: messageContext.requestMessage,
+              },
+            ];

       const workflowInput: AnalystWorkflowInput = {
         messages: modelMessages,
@@ -435,7 +483,28 @@ export const analystAgentTask: ReturnType<
         }
       );

-      await tracedWorkflow();
+      // Wrap workflow execution to capture and log errors before re-throwing
+      try {
+        await tracedWorkflow();
+      } catch (workflowError) {
+        // Log workflow-specific error details
+        logger.error('Analyst workflow execution failed', {
+          messageId: payload.message_id,
+          error: workflowError instanceof Error ? workflowError.message : 'Unknown error',
+          errorType: workflowError instanceof Error ? workflowError.name : typeof workflowError,
+          stack: workflowError instanceof Error ? workflowError.stack : undefined,
+          workflowInput: {
+            messageCount: workflowInput.messages.length,
+            datasetsCount: datasets.length,
+            hasAnalystInstructions: !!analystInstructions,
+            organizationDocsCount: organizationDocs.length,
+          },
+        });
+
+        // Re-throw so the run fails and Trigger invokes the onFailure handler
+        throw workflowError;
+      }
+
       const totalWorkflowTime = Date.now() - workflowExecutionStart;

       logger.log('Analyst workflow completed successfully', {
@@ -509,27 +578,20 @@ export const analystAgentTask: ReturnType<
       logPerformanceMetrics('task-error', payload.message_id, taskStartTime, resourceTracker);
       resourceTracker.generateReport(payload.message_id);

-      logger.error('Task execution failed', {
+      logger.error('Task execution failed - handing off to onFailure', {
         messageId: payload.message_id,
         error: errorMessage,
+        errorType: error instanceof Error ? error.name : typeof error,
+        stack: error instanceof Error ? error.stack : undefined,
         executionTimeMs: totalExecutionTime,
+        errorCode: getErrorCode(error),
       });

       // Need to flush the Braintrust logger to ensure all traces are sent
       await braintrustLogger.flush();

-      return {
-        success: false,
-        messageId: payload.message_id,
-        error: {
-          code: getErrorCode(error),
-          message: errorMessage,
-          details: {
-            operation: 'analyst_agent_task_execution',
-            messageId: payload.message_id,
-          },
-        },
-      };
+      // Re-throw the error so Trigger marks the run as failed and invokes onFailure
+      throw error;
     }
   },
 });
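
Aside: an illustrative sketch (not part of either commit) of the wrapper change in patch 1, since it is the subtlest edit in the series. The Agent shape here is a simplified stand-in, not the package's exported type.

    // Simplified Agent shape for illustration (assumed, not the real interface).
    interface Agent {
      name: string;
      stream: (opts: { messages: string[] }) => Promise<string>;
    }

    const base: Agent = {
      name: 'analyst',
      stream: async ({ messages }) => `streamed ${messages.length} messages`,
    };

    // The spread copies own enumerable properties and overrides stream in one
    // expression, and the object literal keeps structural typing intact, so no
    // `as TAgent` cast or hasOwnProperty loop is needed.
    const wrapped: Agent = {
      ...base,
      stream: async (opts) => {
        // the real implementation wraps this delegation in the retry loop
        return base.stream(opts);
      },
    };

    // Caveat: unlike Object.create(Object.getPrototypeOf(agent)), spreading drops
    // the prototype chain, so a class-based agent with methods on its prototype
    // would lose them. Plain-object agents, as used in these utilities, are fine.
    void wrapped;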