From 897ac4ce837c2f3f508518438d183485e2e922ef Mon Sep 17 00:00:00 2001 From: dal Date: Wed, 9 Jul 2025 07:44:10 -0600 Subject: [PATCH] added in the slack message tracking --- .../helpers/slack-notifier.ts | 83 +++++++++++++++- .../message-post-processing.ts | 15 +++ packages/ai/src/steps/analyst-step.ts | 29 +++--- packages/ai/src/steps/think-and-prep-step.ts | 29 +++--- packages/ai/src/utils/retry/retry-helpers.ts | 31 +++--- .../utils/retry/ai-sdk-error-mocks.test.ts | 24 +++-- .../utils/retry/healing-behavior.test.ts | 98 ++++++++++--------- .../utils/retry/healing-strategies.test.ts | 36 ++++--- .../tests/utils/retry/retry-helpers.test.ts | 54 +++++----- .../retry/streaming-error-scenarios.test.ts | 43 ++++---- 10 files changed, 267 insertions(+), 175 deletions(-) diff --git a/apps/trigger/src/tasks/message-post-processing/helpers/slack-notifier.ts b/apps/trigger/src/tasks/message-post-processing/helpers/slack-notifier.ts index 245a4b0d0..6e748437c 100644 --- a/apps/trigger/src/tasks/message-post-processing/helpers/slack-notifier.ts +++ b/apps/trigger/src/tasks/message-post-processing/helpers/slack-notifier.ts @@ -1,4 +1,13 @@ -import { and, eq, getDb, getSecretByName, isNull, slackIntegrations } from '@buster/database'; +import { + and, + eq, + getDb, + getSecretByName, + isNull, + messagesToSlackMessages, + slackIntegrations, + slackMessageTracking, +} from '@buster/database'; import { logger } from '@trigger.dev/sdk/v3'; export interface SlackNotificationParams { @@ -15,6 +24,9 @@ export interface SlackNotificationParams { export interface SlackNotificationResult { sent: boolean; error?: string; + messageTs?: string; + integrationId?: string; + channelId?: string; } interface SlackBlock { @@ -106,7 +118,12 @@ export async function sendSlackNotification( channelId: integration.defaultChannel.id, messageTs: result.messageTs, }); - return { sent: true }; + return { + sent: true, + ...(result.messageTs && { messageTs: result.messageTs }), + integrationId: integration.id, + channelId: integration.defaultChannel.id, + }; } logger.error('Failed to send Slack notification', { @@ -274,3 +291,65 @@ async function sendSlackMessage( }; } } + +/** + * Track a sent Slack notification in the database + */ +export async function trackSlackNotification(params: { + messageId: string; + integrationId: string; + channelId: string; + messageTs: string; + userName: string | null; + chatId: string; + summaryTitle?: string; + summaryMessage?: string; +}): Promise { + const db = getDb(); + + try { + await db.transaction(async (tx) => { + // Insert into slack_message_tracking + const [slackMessage] = await tx + .insert(slackMessageTracking) + .values({ + integrationId: params.integrationId, + internalMessageId: params.messageId, + slackChannelId: params.channelId, + slackMessageTs: params.messageTs, + slackThreadTs: null, + messageType: 'message', + content: + params.summaryTitle && params.summaryMessage + ? `${params.summaryTitle}\n\n${params.summaryMessage}` + : 'Notification sent', + senderInfo: { + sentBy: 'buster-post-processing', + userName: params.userName, + chatId: params.chatId, + }, + sentAt: new Date().toISOString(), + }) + .returning(); + + // Create association in messages_to_slack_messages + if (slackMessage) { + await tx.insert(messagesToSlackMessages).values({ + messageId: params.messageId, + slackMessageId: slackMessage.id, + }); + } + }); + + logger.log('Successfully tracked Slack notification', { + messageId: params.messageId, + integrationId: params.integrationId, + }); + } catch (error) { + // Log but don't throw - tracking failure shouldn't break the flow + logger.error('Failed to track Slack notification', { + messageId: params.messageId, + error: error instanceof Error ? error.message : 'Unknown error', + }); + } +} diff --git a/apps/trigger/src/tasks/message-post-processing/message-post-processing.ts b/apps/trigger/src/tasks/message-post-processing/message-post-processing.ts index 49aa2674b..fb9e20220 100644 --- a/apps/trigger/src/tasks/message-post-processing/message-post-processing.ts +++ b/apps/trigger/src/tasks/message-post-processing/message-post-processing.ts @@ -18,6 +18,7 @@ import { fetchPreviousPostProcessingMessages, fetchUserDatasets, sendSlackNotification, + trackSlackNotification, } from './helpers'; import { DataFetchError, MessageNotFoundError, TaskInputSchema } from './types'; import type { TaskInput, TaskOutput } from './types'; @@ -302,6 +303,20 @@ export const messagePostProcessingTask: ReturnType< messageId: payload.messageId, organizationId: messageContext.organizationId, }); + + // Track the sent notification + if (slackResult.messageTs && slackResult.integrationId && slackResult.channelId) { + await trackSlackNotification({ + messageId: payload.messageId, + integrationId: slackResult.integrationId, + channelId: slackResult.channelId, + messageTs: slackResult.messageTs, + userName: messageContext.userName, + chatId: messageContext.chatId, + summaryTitle: dbData.summary_title, + summaryMessage: dbData.summary_message, + }); + } } else { logger.log('Slack notification not sent', { messageId: payload.messageId, diff --git a/packages/ai/src/steps/analyst-step.ts b/packages/ai/src/steps/analyst-step.ts index e829ef7e0..4b4cd74de 100644 --- a/packages/ai/src/steps/analyst-step.ts +++ b/packages/ai/src/steps/analyst-step.ts @@ -18,10 +18,10 @@ import { ThinkAndPrepOutputSchema, } from '../utils/memory/types'; import { - isRetryWithHealingError, createRetryOnErrorHandler, createUserFriendlyErrorMessage, handleRetryWithHealing, + isRetryWithHealingError, } from '../utils/retry'; import type { RetryableError, WorkflowContext } from '../utils/retry/types'; import { createOnChunkHandler, handleStreamingError } from '../utils/streaming'; @@ -374,9 +374,9 @@ const analystExecution = async ({ onError: createRetryOnErrorHandler({ retryCount, maxRetries, - workflowContext: { + workflowContext: { currentStep: 'analyst', - availableTools + availableTools, }, }), }); @@ -425,22 +425,17 @@ const analystExecution = async ({ // Get the current messages from chunk processor const currentMessages = chunkProcessor.getAccumulatedMessages(); - + // Handle the retry with healing - const { healedMessages, shouldContinueWithoutHealing, backoffDelay } = - await handleRetryWithHealing( - retryableError, - currentMessages, - retryCount, - { - currentStep: 'analyst', - availableTools - } - ); - + const { healedMessages, shouldContinueWithoutHealing, backoffDelay } = + await handleRetryWithHealing(retryableError, currentMessages, retryCount, { + currentStep: 'analyst', + availableTools, + }); + // Wait before retrying await new Promise((resolve) => setTimeout(resolve, backoffDelay)); - + // If it's a network error, just increment and continue if (shouldContinueWithoutHealing) { retryCount++; @@ -460,7 +455,7 @@ const analystExecution = async ({ console.error('Analyst: Failed to save healing message to database', { error: dbError, retryCount, - willContinueAnyway: true + willContinueAnyway: true, }); // Continue with retry even if save fails } diff --git a/packages/ai/src/steps/think-and-prep-step.ts b/packages/ai/src/steps/think-and-prep-step.ts index fa1703e9f..17fc7d487 100644 --- a/packages/ai/src/steps/think-and-prep-step.ts +++ b/packages/ai/src/steps/think-and-prep-step.ts @@ -8,10 +8,10 @@ import { thinkAndPrepAgent } from '../agents/think-and-prep-agent/think-and-prep import type { thinkAndPrepWorkflowInputSchema } from '../schemas/workflow-schemas'; import { ChunkProcessor } from '../utils/database/chunk-processor'; import { - isRetryWithHealingError, createRetryOnErrorHandler, createUserFriendlyErrorMessage, handleRetryWithHealing, + isRetryWithHealingError, } from '../utils/retry'; import type { RetryableError, WorkflowContext } from '../utils/retry/types'; import { appendToConversation, standardizeMessages } from '../utils/standardizeMessages'; @@ -253,9 +253,9 @@ const thinkAndPrepExecution = async ({ onError: createRetryOnErrorHandler({ retryCount, maxRetries, - workflowContext: { + workflowContext: { currentStep: 'think-and-prep', - availableTools + availableTools, }, }), }); @@ -299,22 +299,17 @@ const thinkAndPrepExecution = async ({ // Get the current messages from chunk processor const currentMessages = chunkProcessor.getAccumulatedMessages(); - + // Handle the retry with healing - const { healedMessages, shouldContinueWithoutHealing, backoffDelay } = - await handleRetryWithHealing( - retryableError, - currentMessages, - retryCount, - { - currentStep: 'think-and-prep', - availableTools - } - ); - + const { healedMessages, shouldContinueWithoutHealing, backoffDelay } = + await handleRetryWithHealing(retryableError, currentMessages, retryCount, { + currentStep: 'think-and-prep', + availableTools, + }); + // Wait before retrying await new Promise((resolve) => setTimeout(resolve, backoffDelay)); - + // If it's a network error, just increment and continue if (shouldContinueWithoutHealing) { retryCount++; @@ -334,7 +329,7 @@ const thinkAndPrepExecution = async ({ console.error('Think and Prep: Failed to save healing message to database', { error: dbError, retryCount, - willContinueAnyway: true + willContinueAnyway: true, }); // Continue with retry even if save fails } diff --git a/packages/ai/src/utils/retry/retry-helpers.ts b/packages/ai/src/utils/retry/retry-helpers.ts index 84e94e7cb..6fb8dcb28 100644 --- a/packages/ai/src/utils/retry/retry-helpers.ts +++ b/packages/ai/src/utils/retry/retry-helpers.ts @@ -1,12 +1,12 @@ import type { CoreMessage } from 'ai'; +import { + applyHealingStrategy, + determineHealingStrategy, + shouldRetryWithoutHealing, +} from './healing-strategies'; import { detectRetryableError } from './retry-agent-stream'; import { RetryWithHealingError } from './retry-error'; import type { RetryableError, WorkflowContext } from './types'; -import { - determineHealingStrategy, - applyHealingStrategy, - shouldRetryWithoutHealing -} from './healing-strategies'; /** * Creates an onError handler for agent streaming with retry logic @@ -327,36 +327,37 @@ export async function handleRetryWithHealing( }> { // Determine the healing strategy const healingStrategy = determineHealingStrategy(retryableError, context); - + // For network/server errors, just retry without healing if (shouldRetryWithoutHealing(retryableError.type)) { - const backoffDelay = calculateBackoffDelay(retryCount, 10000) * (healingStrategy.backoffMultiplier || 1); + const backoffDelay = + calculateBackoffDelay(retryCount, 10000) * (healingStrategy.backoffMultiplier || 1); console.info(`${context.currentStep}: Retrying after network/server error`, { retryCount, errorType: retryableError.type, backoffDelay, }); - + return { healedMessages: currentMessages, shouldContinueWithoutHealing: true, backoffDelay, }; } - + // Apply healing strategy to get updated messages let healedMessages = applyHealingStrategy(currentMessages, healingStrategy); - + // For tool errors, we still need to find the correct insertion point if (retryableError.type === 'no-such-tool' && healingStrategy.healingMessage) { const { insertionIndex, updatedHealingMessage } = findHealingMessageInsertionIndex( retryableError, currentMessages ); - + // Remove the healing message that was added by applyHealingStrategy const messagesWithoutHealing = healedMessages.slice(0, -1); - + // Insert at the correct position healedMessages = [ ...messagesWithoutHealing.slice(0, insertionIndex), @@ -364,10 +365,10 @@ export async function handleRetryWithHealing( ...messagesWithoutHealing.slice(insertionIndex), ]; } - + // Calculate backoff delay const backoffDelay = calculateBackoffDelay(retryCount) * (healingStrategy.backoffMultiplier || 1); - + console.info(`${context.currentStep}: Applying healing strategy`, { retryCount, errorType: retryableError.type, @@ -375,7 +376,7 @@ export async function handleRetryWithHealing( hasHealingMessage: !!healingStrategy.healingMessage, backoffDelay, }); - + return { healedMessages, shouldContinueWithoutHealing: false, diff --git a/packages/ai/tests/utils/retry/ai-sdk-error-mocks.test.ts b/packages/ai/tests/utils/retry/ai-sdk-error-mocks.test.ts index f5658c530..5f3dcd83f 100644 --- a/packages/ai/tests/utils/retry/ai-sdk-error-mocks.test.ts +++ b/packages/ai/tests/utils/retry/ai-sdk-error-mocks.test.ts @@ -19,13 +19,13 @@ import { UnsupportedFunctionalityError, } from 'ai'; import { describe, expect, it, vi } from 'vitest'; +import { detectRetryableError } from '../../../src/utils/retry/retry-agent-stream'; +import { RetryWithHealingError } from '../../../src/utils/retry/retry-error'; import { createRetryOnErrorHandler, createUserFriendlyErrorMessage, extractDetailedErrorMessage, } from '../../../src/utils/retry/retry-helpers'; -import { detectRetryableError } from '../../../src/utils/retry/retry-agent-stream'; -import { RetryWithHealingError } from '../../../src/utils/retry/retry-error'; import type { WorkflowContext } from '../../../src/utils/retry/types'; describe('AI SDK Error Mocks - Comprehensive Error Handling', () => { @@ -150,15 +150,13 @@ describe('AI SDK Error Mocks - Comprehensive Error Handling', () => { toolCallId: 'call_123', args: { query: 123 }, // Wrong type cause: { - errors: [ - { path: ['query'], message: 'Expected string, received number' }, - ], + errors: [{ path: ['query'], message: 'Expected string, received number' }], }, }); // Cast to Error with name property for detection (error as any).name = 'AI_InvalidToolArgumentsError'; - (error as any).toolCallId = 'call_123'; // Ensure toolCallId is accessible + (error as any).toolCallId = 'call_123'; // Ensure toolCallId is accessible const retryableError = detectRetryableError(error); @@ -439,19 +437,25 @@ describe('AI SDK Error Mocks - Comprehensive Error Handling', () => { it('should create user-friendly message for API errors', () => { const error = new Error('API request failed'); const message = createUserFriendlyErrorMessage(error); - expect(message).toBe('The analysis service is temporarily unavailable. Please try again in a few moments.'); + expect(message).toBe( + 'The analysis service is temporarily unavailable. Please try again in a few moments.' + ); }); it('should create user-friendly message for model errors', () => { const error = new Error('model not found'); const message = createUserFriendlyErrorMessage(error); - expect(message).toBe('The analysis service is temporarily unavailable. Please try again in a few moments.'); + expect(message).toBe( + 'The analysis service is temporarily unavailable. Please try again in a few moments.' + ); }); it('should create generic message for unknown errors', () => { const error = new Error('Something went wrong'); const message = createUserFriendlyErrorMessage(error); - expect(message).toBe('Something went wrong during the analysis. Please try again or contact support if the issue persists.'); + expect(message).toBe( + 'Something went wrong during the analysis. Please try again or contact support if the issue persists.' + ); }); }); @@ -488,4 +492,4 @@ describe('AI SDK Error Mocks - Comprehensive Error Handling', () => { consoleInfoSpy.mockRestore(); }); }); -}); \ No newline at end of file +}); diff --git a/packages/ai/tests/utils/retry/healing-behavior.test.ts b/packages/ai/tests/utils/retry/healing-behavior.test.ts index 3bbd8b2eb..d324c2c7e 100644 --- a/packages/ai/tests/utils/retry/healing-behavior.test.ts +++ b/packages/ai/tests/utils/retry/healing-behavior.test.ts @@ -1,20 +1,20 @@ import type { CoreMessage } from 'ai'; -import { - APICallError, - EmptyResponseBodyError, - InvalidToolArgumentsError, - JSONParseError, - NoSuchToolError +import { + APICallError, + EmptyResponseBodyError, + InvalidToolArgumentsError, + JSONParseError, + NoSuchToolError, } from 'ai'; import { describe, expect, it, vi } from 'vitest'; -import { createRetryOnErrorHandler } from '../../../src/utils/retry/retry-helpers'; -import { detectRetryableError } from '../../../src/utils/retry/retry-agent-stream'; -import { RetryWithHealingError } from '../../../src/utils/retry/retry-error'; import { applyHealingStrategy, determineHealingStrategy, shouldRetryWithoutHealing, } from '../../../src/utils/retry/healing-strategies'; +import { detectRetryableError } from '../../../src/utils/retry/retry-agent-stream'; +import { RetryWithHealingError } from '../../../src/utils/retry/retry-error'; +import { createRetryOnErrorHandler } from '../../../src/utils/retry/retry-helpers'; import type { WorkflowContext } from '../../../src/utils/retry/types'; describe('Healing Behavior - Different Error Types', () => { @@ -41,9 +41,11 @@ describe('Healing Behavior - Different Error Types', () => { expect(healedMessages).toHaveLength(2); expect(healedMessages[0]?.content).toBe('Analyze my revenue data'); expect(healedMessages[1]?.content).toBe('Please continue with your analysis.'); - + // The empty assistant message should be gone - expect(healedMessages.find(m => m.role === 'assistant' && m.content === '')).toBeUndefined(); + expect( + healedMessages.find((m) => m.role === 'assistant' && m.content === '') + ).toBeUndefined(); }); }); @@ -51,17 +53,17 @@ describe('Healing Behavior - Different Error Types', () => { it('should remove malformed JSON response and retry', () => { const messages: CoreMessage[] = [ { role: 'user', content: 'Create a metric' }, - { - role: 'assistant', + { + role: 'assistant', content: [ { type: 'text', text: 'Creating metric...' }, - { - type: 'tool-call', - toolCallId: '123', + { + type: 'tool-call', + toolCallId: '123', toolName: 'createMetrics', - args: '{"name": "revenue", "expression": ' // Incomplete JSON - } - ] + args: '{"name": "revenue", "expression": ', // Incomplete JSON + }, + ], }, ]; @@ -80,9 +82,9 @@ describe('Healing Behavior - Different Error Types', () => { const healedMessages = applyHealingStrategy(messages, strategy); expect(healedMessages).toHaveLength(2); expect(healedMessages[1]?.content).toBe('Please continue with your analysis.'); - + // The malformed assistant message should be removed - expect(healedMessages.find(m => m.role === 'assistant')).toBeUndefined(); + expect(healedMessages.find((m) => m.role === 'assistant')).toBeUndefined(); }); }); @@ -90,16 +92,16 @@ describe('Healing Behavior - Different Error Types', () => { it('should keep the tool attempt and add healing message', () => { const messages: CoreMessage[] = [ { role: 'user', content: 'Create a dashboard' }, - { - role: 'assistant', + { + role: 'assistant', content: [ - { - type: 'tool-call', - toolCallId: 'call_123', + { + type: 'tool-call', + toolCallId: 'call_123', toolName: 'createDashboards', - args: { name: 'Revenue Dashboard' } - } - ] + args: { name: 'Revenue Dashboard' }, + }, + ], }, ]; @@ -118,15 +120,17 @@ describe('Healing Behavior - Different Error Types', () => { const healedMessages = applyHealingStrategy(messages, strategy); expect(healedMessages).toHaveLength(3); - + // Original messages should still be there expect(healedMessages[0]?.content).toBe('Create a dashboard'); expect(healedMessages[1]?.role).toBe('assistant'); - + // Healing message should be added expect(healedMessages[2]?.role).toBe('tool'); expect(healedMessages[2]?.content[0].type).toBe('tool-result'); - expect(healedMessages[2]?.content[0].result.error).toContain('Tool "createDashboards" is not available'); + expect(healedMessages[2]?.content[0].result.error).toContain( + 'Tool "createDashboards" is not available' + ); }); }); @@ -134,16 +138,16 @@ describe('Healing Behavior - Different Error Types', () => { it('should keep the tool call and add error result', () => { const messages: CoreMessage[] = [ { role: 'user', content: 'Query the database' }, - { - role: 'assistant', + { + role: 'assistant', content: [ - { - type: 'tool-call', - toolCallId: 'call_456', + { + type: 'tool-call', + toolCallId: 'call_456', toolName: 'executeSql', - args: { query: 123 } // Wrong type - } - ] + args: { query: 123 }, // Wrong type + }, + ], }, ]; @@ -152,9 +156,7 @@ describe('Healing Behavior - Different Error Types', () => { toolCallId: 'call_456', args: { query: 123 }, cause: { - errors: [ - { path: ['query'], message: 'Expected string, received number' } - ], + errors: [{ path: ['query'], message: 'Expected string, received number' }], }, }); (error as any).name = 'AI_InvalidToolArgumentsError'; @@ -168,11 +170,13 @@ describe('Healing Behavior - Different Error Types', () => { const healedMessages = applyHealingStrategy(messages, strategy); expect(healedMessages).toHaveLength(3); - + // Tool error result should be added const toolResult = healedMessages[2]; expect(toolResult?.role).toBe('tool'); - expect(toolResult?.content[0].result.error).toContain('query: Expected string, received number'); + expect(toolResult?.content[0].result.error).toContain( + 'query: Expected string, received number' + ); }); }); @@ -254,7 +258,7 @@ describe('Healing Behavior - Different Error Types', () => { expect(thrownError).toBeInstanceOf(RetryWithHealingError); expect(thrownError.retryableError.type).toBe('empty-response'); - + // The healing message should be a simple "continue" message expect(thrownError.retryableError.healingMessage.role).toBe('user'); expect(thrownError.retryableError.healingMessage.content).toBe('Please continue.'); @@ -293,4 +297,4 @@ describe('Healing Behavior - Different Error Types', () => { consoleInfoSpy.mockRestore(); }); }); -}); \ No newline at end of file +}); diff --git a/packages/ai/tests/utils/retry/healing-strategies.test.ts b/packages/ai/tests/utils/retry/healing-strategies.test.ts index c788b551a..df280cf1f 100644 --- a/packages/ai/tests/utils/retry/healing-strategies.test.ts +++ b/packages/ai/tests/utils/retry/healing-strategies.test.ts @@ -108,13 +108,17 @@ describe('healing-strategies', () => { it('should remove assistant message and subsequent tool results', () => { const messages: CoreMessage[] = [ { role: 'user', content: 'Hello' }, - { role: 'assistant', content: [ - { type: 'text', text: 'Let me help' }, - { type: 'tool-call', toolCallId: '123', toolName: 'test', args: {} }, - ]}, - { role: 'tool', content: [ - { type: 'tool-result', toolCallId: '123', toolName: 'test', result: {} }, - ]}, + { + role: 'assistant', + content: [ + { type: 'text', text: 'Let me help' }, + { type: 'tool-call', toolCallId: '123', toolName: 'test', args: {} }, + ], + }, + { + role: 'tool', + content: [{ type: 'tool-result', toolCallId: '123', toolName: 'test', result: {} }], + }, ]; const result = removeLastAssistantMessage(messages); @@ -158,7 +162,10 @@ describe('healing-strategies', () => { const strategy = { shouldRemoveLastAssistantMessage: true, - healingMessage: { role: 'user', content: 'Please continue with your analysis.' } as CoreMessage, + healingMessage: { + role: 'user', + content: 'Please continue with your analysis.', + } as CoreMessage, }; const result = applyHealingStrategy(messages, strategy); @@ -171,9 +178,10 @@ describe('healing-strategies', () => { it('should add healing without removing for tool errors', () => { const messages: CoreMessage[] = [ { role: 'user', content: 'Analyze data' }, - { role: 'assistant', content: [ - { type: 'tool-call', toolCallId: '123', toolName: 'wrongTool', args: {} }, - ]}, + { + role: 'assistant', + content: [{ type: 'tool-call', toolCallId: '123', toolName: 'wrongTool', args: {} }], + }, ]; const strategy = { @@ -237,7 +245,7 @@ describe('healing-strategies', () => { healingMessage: { role: 'user', content: '' }, }; expect(getErrorExplanationForUser(emptyError)).toBe( - 'The assistant\'s response was incomplete. Retrying...' + "The assistant's response was incomplete. Retrying..." ); const jsonError: RetryableError = { @@ -255,7 +263,7 @@ describe('healing-strategies', () => { healingMessage: { role: 'user', content: '' }, }; expect(getErrorExplanationForUser(toolError)).toBe( - 'The assistant tried to use a tool that\'s not available in the current mode.' + "The assistant tried to use a tool that's not available in the current mode." ); }); @@ -268,4 +276,4 @@ describe('healing-strategies', () => { expect(getErrorExplanationForUser(networkError)).toBeNull(); }); }); -}); \ No newline at end of file +}); diff --git a/packages/ai/tests/utils/retry/retry-helpers.test.ts b/packages/ai/tests/utils/retry/retry-helpers.test.ts index 007d11611..7cbfbef4b 100644 --- a/packages/ai/tests/utils/retry/retry-helpers.test.ts +++ b/packages/ai/tests/utils/retry/retry-helpers.test.ts @@ -1,16 +1,16 @@ import { NoSuchToolError } from 'ai'; import { describe, expect, it, vi } from 'vitest'; +import { RetryWithHealingError } from '../../../src/utils/retry/retry-error'; import { calculateBackoffDelay, createRetryOnErrorHandler, createUserFriendlyErrorMessage, extractDetailedErrorMessage, findHealingMessageInsertionIndex, + handleRetryWithHealing, logMessagesAfterHealing, logRetryInfo, - handleRetryWithHealing, } from '../../../src/utils/retry/retry-helpers'; -import { RetryWithHealingError } from '../../../src/utils/retry/retry-error'; import type { CoreMessage, RetryableError } from '../../../src/utils/retry/types'; // Mock the detectRetryableError function @@ -133,7 +133,9 @@ describe('retry-helpers', () => { }; const result = extractDetailedErrorMessage(error); - expect(result).toBe('Validation failed - Validation errors: field.nested: Required; other: Invalid'); + expect(result).toBe( + 'Validation failed - Validation errors: field.nested: Required; other: Invalid' + ); }); it('should include status code for API errors', () => { @@ -183,7 +185,9 @@ describe('retry-helpers', () => { (error as any).responseBody = 'Server error details'; const result = extractDetailedErrorMessage(error); - expect(result).toBe('Complex error (Status: 500) - Response: Server error details (Tool: complexTool)'); + expect(result).toBe( + 'Complex error (Status: 500) - Response: Server error details (Tool: complexTool)' + ); }); }); @@ -461,12 +465,9 @@ describe('retry-helpers', () => { { role: 'assistant', content: 'Processing...' }, ]; - const result = await handleRetryWithHealing( - retryableError, - messages, - 2, - { currentStep: 'analyst' } - ); + const result = await handleRetryWithHealing(retryableError, messages, 2, { + currentStep: 'analyst', + }); expect(result.shouldContinueWithoutHealing).toBe(true); expect(result.healedMessages).toEqual(messages); // Messages unchanged @@ -489,12 +490,9 @@ describe('retry-helpers', () => { { role: 'assistant', content: '' }, // Empty response ]; - const result = await handleRetryWithHealing( - retryableError, - messages, - 1, - { currentStep: 'think-and-prep' } - ); + const result = await handleRetryWithHealing(retryableError, messages, 1, { + currentStep: 'think-and-prep', + }); expect(result.shouldContinueWithoutHealing).toBe(false); expect(result.healedMessages).toHaveLength(2); @@ -542,12 +540,9 @@ describe('retry-helpers', () => { }, ]; - const result = await handleRetryWithHealing( - retryableError, - messages, - 0, - { currentStep: 'analyst' } - ); + const result = await handleRetryWithHealing(retryableError, messages, 0, { + currentStep: 'analyst', + }); expect(result.shouldContinueWithoutHealing).toBe(false); expect(result.healedMessages).toHaveLength(3); @@ -567,16 +562,11 @@ describe('retry-helpers', () => { originalError: new Error('429 Too Many Requests'), }; - const messages: CoreMessage[] = [ - { role: 'user', content: 'Query' }, - ]; + const messages: CoreMessage[] = [{ role: 'user', content: 'Query' }]; - const result = await handleRetryWithHealing( - retryableError, - messages, - 3, - { currentStep: 'analyst' } - ); + const result = await handleRetryWithHealing(retryableError, messages, 3, { + currentStep: 'analyst', + }); expect(result.shouldContinueWithoutHealing).toBe(true); expect(result.backoffDelay).toBe(24000); // 2^3 * 1000 * 3 (rate limit multiplier) @@ -584,4 +574,4 @@ describe('retry-helpers', () => { consoleInfoSpy.mockRestore(); }); }); -}); \ No newline at end of file +}); diff --git a/packages/ai/tests/utils/retry/streaming-error-scenarios.test.ts b/packages/ai/tests/utils/retry/streaming-error-scenarios.test.ts index 7b0495cf5..6de45db04 100644 --- a/packages/ai/tests/utils/retry/streaming-error-scenarios.test.ts +++ b/packages/ai/tests/utils/retry/streaming-error-scenarios.test.ts @@ -3,8 +3,8 @@ import { RuntimeContext } from '@mastra/core/runtime-context'; import type { CoreMessage, StreamTextResult, TextStreamPart } from 'ai'; import { APICallError, NoSuchToolError } from 'ai'; import { describe, expect, it, vi } from 'vitest'; -import { createRetryOnErrorHandler } from '../../../src/utils/retry/retry-helpers'; import { RetryWithHealingError } from '../../../src/utils/retry/retry-error'; +import { createRetryOnErrorHandler } from '../../../src/utils/retry/retry-helpers'; import type { WorkflowContext } from '../../../src/utils/retry/types'; describe('Streaming Error Scenarios - Real-World Tests', () => { @@ -35,9 +35,7 @@ describe('Streaming Error Scenarios - Real-World Tests', () => { describe('Stream Error During Tool Call', () => { it('should handle error thrown during tool call streaming', async () => { - const messages: CoreMessage[] = [ - { role: 'user', content: 'Analyze my data' }, - ]; + const messages: CoreMessage[] = [{ role: 'user', content: 'Analyze my data' }]; let onChunkCalled = 0; let onErrorHandler: ((event: { error: unknown }) => Promise) | undefined; @@ -77,12 +75,12 @@ describe('Streaming Error Scenarios - Real-World Tests', () => { // Capture the onError handler when stream is called mockAgent.stream.mockImplementation(async (_messages, options) => { onErrorHandler = options?.onError; - + const streamBehavior = () => ({ async *[Symbol.asyncIterator]() { yield { type: 'text-delta', textDelta: 'Let me analyze' } as TextStreamPart; onChunkCalled++; - + // Simulate the error and let onError handle it const error = new APICallError({ message: 'Connection reset', @@ -94,11 +92,11 @@ describe('Streaming Error Scenarios - Real-World Tests', () => { cause: new Error('ECONNRESET'), isRetryable: true, }); - + if (onErrorHandler) { await onErrorHandler({ error }); } - + // Continue streaming after healing yield { type: 'text-delta', textDelta: ' your data...' } as TextStreamPart; }, @@ -253,14 +251,15 @@ describe('Streaming Error Scenarios - Real-World Tests', () => { describe('Complex Streaming Scenarios', () => { it('should handle partial tool call followed by error', async () => { - const messages: CoreMessage[] = [ - { role: 'user', content: 'Create a metric for revenue' }, - ]; + const messages: CoreMessage[] = [{ role: 'user', content: 'Create a metric for revenue' }]; async function* streamGenerator() { // Start with text - yield { type: 'text-delta', textDelta: 'I\'ll create a revenue metric' } as TextStreamPart; - + yield { + type: 'text-delta', + textDelta: "I'll create a revenue metric", + } as TextStreamPart; + // Start tool call yield { type: 'tool-call-delta', @@ -269,13 +268,13 @@ describe('Streaming Error Scenarios - Real-World Tests', () => { toolName: 'createMetrics', argsTextDelta: '{"name": "revenue", "expression": ', } as TextStreamPart; - + // Simulate JSON parse error in the middle of tool args throw new Error('Unexpected end of JSON input'); } const mockAgent = createMockAgent(streamGenerator); - + const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); const consoleInfoSpy = vi.spyOn(console, 'info').mockImplementation(() => {}); @@ -288,7 +287,7 @@ describe('Streaming Error Scenarios - Real-World Tests', () => { // Test the onError handler directly with the expected error const streamError = new Error('Unexpected end of JSON input'); - + let errorThrown: any; try { await onErrorHandler({ error: streamError }); @@ -298,7 +297,9 @@ describe('Streaming Error Scenarios - Real-World Tests', () => { expect(errorThrown).toBeInstanceOf(RetryWithHealingError); expect(errorThrown.retryableError.type).toBe('unknown-error'); - expect(errorThrown.retryableError.healingMessage.content).toContain('Unexpected end of JSON input'); + expect(errorThrown.retryableError.healingMessage.content).toContain( + 'Unexpected end of JSON input' + ); consoleErrorSpy.mockRestore(); consoleInfoSpy.mockRestore(); @@ -311,7 +312,7 @@ describe('Streaming Error Scenarios - Real-World Tests', () => { // Simulate that chunks were emitted before error const chunksEmitted = 4; // Simulating 4 chunks were emitted - + const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); const consoleInfoSpy = vi.spyOn(console, 'info').mockImplementation(() => {}); @@ -374,11 +375,11 @@ describe('Streaming Error Scenarios - Real-World Tests', () => { } expect(thrownError).toBeInstanceOf(RetryWithHealingError); - + const healingMessage = thrownError.retryableError.healingMessage; expect(healingMessage.role).toBe('tool'); expect(Array.isArray(healingMessage.content)).toBe(true); - + const toolResult = healingMessage.content[0]; expect(toolResult.type).toBe('tool-result'); expect(toolResult.toolCallId).toBeDefined(); @@ -390,4 +391,4 @@ describe('Streaming Error Scenarios - Real-World Tests', () => { consoleInfoSpy.mockRestore(); }); }); -}); \ No newline at end of file +});