diff --git a/packages/ai/src/utils/models/ai-fallback-memory-safe.test.ts b/packages/ai/src/utils/models/ai-fallback-memory-safe.test.ts index 779bebe8b..ec2452834 100644 --- a/packages/ai/src/utils/models/ai-fallback-memory-safe.test.ts +++ b/packages/ai/src/utils/models/ai-fallback-memory-safe.test.ts @@ -64,6 +64,47 @@ function createMemorySafeMockModel( }; } +// Memory-safe mock that simulates AbortError during streaming +function createAbortingMemorySafeMockModel(id: string, abortAfterChunks = 1): LanguageModelV1 { + return { + specificationVersion: 'v1' as const, + modelId: id, + provider: `provider-${id}`, + defaultObjectGenerationMode: undefined, + + doGenerate: vi.fn(), + + doStream: vi.fn().mockImplementation(async () => { + let readCount = 0; + + return { + stream: { + getReader: () => ({ + read: vi.fn().mockImplementation(async () => { + if (readCount < abortAfterChunks) { + readCount++; + return { + done: false, + value: { + type: 'text-delta', + textDelta: `Stream chunk ${readCount} from ${id}`, + } as LanguageModelV1StreamPart, + }; + } + // Throw AbortError after specified chunks + const abortError = new Error('The operation was aborted'); + abortError.name = 'AbortError'; + throw abortError; + }), + releaseLock: vi.fn(), + }), + } as any, + rawCall: { rawPrompt: 'test', rawSettings: {} }, + }; + }), + }; +} + describe('FallbackModel - Memory Safe Streaming Tests', () => { it('should successfully stream from the first model', async () => { const model1 = createMemorySafeMockModel('model1'); @@ -138,4 +179,149 @@ describe('FallbackModel - Memory Safe Streaming Tests', () => { await expect(fallback.doStream(options)).rejects.toThrow('Invalid API key'); expect(model2.doStream).not.toHaveBeenCalled(); }); + + describe('AbortError handling', () => { + it('should handle AbortError without retrying to next model', async () => { + const model1 = createAbortingMemorySafeMockModel('model1', 1); // Aborts after first chunk + const model2 = createMemorySafeMockModel('model2'); + const onError = vi.fn(); + + const fallback = createFallback({ + models: [model1, model2], + onError, + }); + + const options: LanguageModelV1CallOptions = { + inputFormat: 'prompt', + prompt: [{ role: 'user', content: [{ type: 'text', text: 'Test prompt' }] }], + mode: { type: 'regular' }, + }; + + const result = await fallback.doStream(options); + const reader = result.stream.getReader(); + + // Read first chunk successfully + const firstChunk = await reader.read(); + expect(firstChunk.done).toBe(false); + expect(firstChunk.value).toEqual({ + type: 'text-delta', + textDelta: 'Stream chunk 1 from model1', + }); + + // Next read should complete without error (AbortError is handled gracefully) + const secondRead = await reader.read(); + expect(secondRead.done).toBe(true); + + // Should not have called onError since AbortError is intentional + expect(onError).not.toHaveBeenCalled(); + + // Should not have tried the second model + expect(model2.doStream).not.toHaveBeenCalled(); + }); + + it('should handle AbortError before any output', async () => { + const model1 = createAbortingMemorySafeMockModel('model1', 0); // Aborts immediately + const model2 = createMemorySafeMockModel('model2'); + + const fallback = createFallback({ models: [model1, model2] }); + + const options: LanguageModelV1CallOptions = { + inputFormat: 'prompt', + prompt: [{ role: 'user', content: [{ type: 'text', text: 'Test prompt' }] }], + mode: { type: 'regular' }, + }; + + const result = await fallback.doStream(options); + const reader = result.stream.getReader(); + + // First read should complete without error + const firstRead = await reader.read(); + expect(firstRead.done).toBe(true); + + // Should not have tried the second model (abort is intentional) + expect(model2.doStream).not.toHaveBeenCalled(); + }); + + it('should not retry on AbortError even with retryAfterOutput enabled', async () => { + const model1 = createAbortingMemorySafeMockModel('model1', 2); // Aborts after 2 chunks + const model2 = createMemorySafeMockModel('model2'); + + const fallback = createFallback({ + models: [model1, model2], + retryAfterOutput: true, // Even with this enabled, AbortError should not retry + }); + + const options: LanguageModelV1CallOptions = { + inputFormat: 'prompt', + prompt: [{ role: 'user', content: [{ type: 'text', text: 'Test prompt' }] }], + mode: { type: 'regular' }, + }; + + const result = await fallback.doStream(options); + const reader = result.stream.getReader(); + + // Read chunks successfully + const chunk1 = await reader.read(); + expect(chunk1.value).toEqual({ + type: 'text-delta', + textDelta: 'Stream chunk 1 from model1', + }); + + const chunk2 = await reader.read(); + expect(chunk2.value).toEqual({ + type: 'text-delta', + textDelta: 'Stream chunk 2 from model1', + }); + + // Next read should complete without error + const finalRead = await reader.read(); + expect(finalRead.done).toBe(true); + + // Should not have tried model2 even with retryAfterOutput + expect(model2.doStream).not.toHaveBeenCalled(); + }); + + it('should not cause "Controller is already closed" error on AbortError', async () => { + // This test specifically validates the fix for the controller closed issue + const model1 = createAbortingMemorySafeMockModel('model1', 1); + const model2 = createMemorySafeMockModel('model2'); + + const fallback = createFallback({ + models: [model1, model2], + retryAfterOutput: true, + }); + + const options: LanguageModelV1CallOptions = { + inputFormat: 'prompt', + prompt: [{ role: 'user', content: [{ type: 'text', text: 'Test prompt' }] }], + mode: { type: 'regular' }, + }; + + // This should not throw "Controller is already closed" error + const result = await fallback.doStream(options); + const reader = result.stream.getReader(); + + let errorThrown: Error | null = null; + const chunks: any[] = []; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + } + } catch (error) { + errorThrown = error as Error; + } + + // Should have successfully read one chunk before abort + expect(chunks).toHaveLength(1); + + // Should not have thrown any error + expect(errorThrown).toBeNull(); + + // Should not have tried to fallback to model2 + expect(model2.doStream).not.toHaveBeenCalled(); + }); + }); }); diff --git a/packages/ai/src/utils/models/ai-fallback-streaming.test.ts b/packages/ai/src/utils/models/ai-fallback-streaming.test.ts index faf1eaf22..df0b0e0c5 100644 --- a/packages/ai/src/utils/models/ai-fallback-streaming.test.ts +++ b/packages/ai/src/utils/models/ai-fallback-streaming.test.ts @@ -99,6 +99,52 @@ function createFailingStreamModel(id: string, errorAfterChunks = 1): LanguageMod return mockModel; } +// Helper to create a model that throws AbortError during streaming +function createAbortingStreamModel(id: string, abortAfterChunks = 1): LanguageModelV1 { + const mockModel: LanguageModelV1 = { + specificationVersion: 'v1' as const, + modelId: id, + provider: `provider-${id}`, + defaultObjectGenerationMode: undefined, + + doGenerate: vi.fn(), + + doStream: vi.fn().mockImplementation(async () => { + const chunks: LanguageModelV1StreamPart[] = [ + { type: 'text-delta', textDelta: `Stream from ${id} before abort` }, + { type: 'text-delta', textDelta: ' more text' }, + { type: 'finish', finishReason: 'stop', usage: { promptTokens: 10, completionTokens: 20 } }, + ]; + + const stream = new ReadableStream({ + start(controller) { + let chunkCount = 0; + // Enqueue chunks up to the abort point + for (const chunk of chunks) { + if (chunkCount >= abortAfterChunks) { + // Simulate an AbortError + const abortError = new Error('The operation was aborted'); + abortError.name = 'AbortError'; + setTimeout(() => controller.error(abortError), 0); + return; + } + controller.enqueue(chunk); + chunkCount++; + } + controller.close(); + }, + }); + + return { + stream, + rawCall: { rawPrompt: 'test', rawSettings: {} }, + }; + }), + }; + + return mockModel; +} + // NOTE: These streaming tests are temporarily disabled due to memory issues // with ReadableStream in the test environment. See ai-fallback-memory-safe.test.ts // for alternative streaming tests that avoid memory issues. @@ -276,6 +322,126 @@ describe.skip('FallbackModel - Streaming', () => { }); }); + describe('abort error handling', () => { + it('should handle AbortError without retrying or causing controller closed error', async () => { + const model1 = createAbortingStreamModel('model1', 1); // Aborts after first chunk + const model2 = createMockModel('model2'); + const onError = vi.fn(); + const fallback = createFallback({ + models: [model1, model2], + onError, + }); + + const options: LanguageModelV1CallOptions = { + inputFormat: 'prompt', + prompt: [{ role: 'user', content: [{ type: 'text', text: 'Test prompt' }] }], + mode: { type: 'regular' }, + }; + + const result = await fallback.doStream(options); + const reader = result.stream.getReader(); + const chunks: LanguageModelV1StreamPart[] = []; + let errorThrown: Error | null = null; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + } + } catch (error) { + errorThrown = error as Error; + } + + // Should have received the chunk before the abort + expect(chunks).toHaveLength(1); + expect(chunks[0]).toEqual({ + type: 'text-delta', + textDelta: 'Stream from model1 before abort', + }); + + // Should not have called onError since AbortError is intentional + expect(onError).not.toHaveBeenCalled(); + + // Should not have tried the second model + expect(model2.doStream).not.toHaveBeenCalled(); + + // Stream should have ended cleanly without throwing + expect(errorThrown).toBeNull(); + }); + + it('should handle AbortError before any output', async () => { + const model1 = createAbortingStreamModel('model1', 0); // Aborts immediately + const model2 = createMockModel('model2'); + const fallback = createFallback({ models: [model1, model2] }); + + const options: LanguageModelV1CallOptions = { + inputFormat: 'prompt', + prompt: [{ role: 'user', content: [{ type: 'text', text: 'Test prompt' }] }], + mode: { type: 'regular' }, + }; + + const result = await fallback.doStream(options); + const reader = result.stream.getReader(); + const chunks: LanguageModelV1StreamPart[] = []; + let errorThrown: Error | null = null; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + } + } catch (error) { + errorThrown = error as Error; + } + + // Should not have received any chunks + expect(chunks).toHaveLength(0); + + // Should not have tried the second model (abort is intentional) + expect(model2.doStream).not.toHaveBeenCalled(); + + // Stream should have ended cleanly without throwing + expect(errorThrown).toBeNull(); + }); + + it('should handle AbortError with retryAfterOutput enabled', async () => { + const model1 = createAbortingStreamModel('model1', 1); // Aborts after first chunk + const model2 = createMockModel('model2'); + const fallback = createFallback({ + models: [model1, model2], + retryAfterOutput: true, // Even with this enabled, AbortError should not retry + }); + + const options: LanguageModelV1CallOptions = { + inputFormat: 'prompt', + prompt: [{ role: 'user', content: [{ type: 'text', text: 'Test prompt' }] }], + mode: { type: 'regular' }, + }; + + const result = await fallback.doStream(options); + const reader = result.stream.getReader(); + const chunks: LanguageModelV1StreamPart[] = []; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + } + + // Should only have chunks from model1 before abort + expect(chunks).toHaveLength(1); + expect(chunks[0]).toEqual({ + type: 'text-delta', + textDelta: 'Stream from model1 before abort', + }); + + // Should not have tried model2 even with retryAfterOutput + expect(model2.doStream).not.toHaveBeenCalled(); + }); + }); + describe('stream retry with status codes', () => { it('should retry streaming on retryable status code', async () => { const error = Object.assign(new Error('Rate limited'), { statusCode: 429 }); diff --git a/packages/ai/src/utils/models/ai-fallback.ts b/packages/ai/src/utils/models/ai-fallback.ts index b619121bd..1bb4d0471 100644 --- a/packages/ai/src/utils/models/ai-fallback.ts +++ b/packages/ai/src/utils/models/ai-fallback.ts @@ -191,9 +191,24 @@ export class FallbackModel implements LanguageModelV1 { } controller.close(); } catch (error) { + // Check if this is an intentional abort (not a retry scenario) + if (error instanceof Error && error.name === 'AbortError') { + // Don't retry on intentional aborts, just close the controller + controller.close(); + return; + } + if (self.settings.onError) { await self.settings.onError(error as Error, self.modelId); } + + // Check if we should retry this error + const shouldRetry = self.settings.shouldRetryThisError || defaultShouldRetryThisError; + if (!shouldRetry(error as Error)) { + controller.error(error); + return; + } + if ( (!hasStreamedAny || self.retryAfterOutput) && streamRetryAttempts < maxStreamRetries diff --git a/packages/ai/src/utils/models/haiku-3-5.ts b/packages/ai/src/utils/models/haiku-3-5.ts index 14b215519..dbab6b9b4 100644 --- a/packages/ai/src/utils/models/haiku-3-5.ts +++ b/packages/ai/src/utils/models/haiku-3-5.ts @@ -55,13 +55,14 @@ function initializeHaiku35() { // Export a proxy that initializes on first use export const Haiku35 = new Proxy({} as ReturnType, { - get(_target, prop, receiver) { + get(_target, prop) { const instance = initializeHaiku35(); - return Reflect.get(instance, prop, receiver); + // Direct property access without receiver to avoid proxy conflicts + return instance[prop as keyof typeof instance]; }, has(_target, prop) { const instance = initializeHaiku35(); - return Reflect.has(instance, prop); + return prop in instance; }, ownKeys(_target) { const instance = initializeHaiku35(); diff --git a/packages/ai/src/utils/models/sonnet-4.ts b/packages/ai/src/utils/models/sonnet-4.ts index 536b3033e..03bb8ecd3 100644 --- a/packages/ai/src/utils/models/sonnet-4.ts +++ b/packages/ai/src/utils/models/sonnet-4.ts @@ -55,13 +55,14 @@ function initializeSonnet4() { // Export a proxy that initializes on first use export const Sonnet4 = new Proxy({} as ReturnType, { - get(_target, prop, receiver) { + get(_target, prop) { const instance = initializeSonnet4(); - return Reflect.get(instance, prop, receiver); + // Direct property access without receiver to avoid proxy conflicts + return instance[prop as keyof typeof instance]; }, has(_target, prop) { const instance = initializeSonnet4(); - return Reflect.has(instance, prop); + return prop in instance; }, ownKeys(_target) { const instance = initializeSonnet4();