mirror of https://github.com/buster-so/buster.git
Implement AbortError handling in streaming models
- Added tests for handling AbortError scenarios in both memory-safe and standard streaming models. - Updated the FallbackModel to gracefully handle intentional aborts without retrying or causing errors. - Ensured that the controller closes properly on AbortError and that no fallback to subsequent models occurs. - Enhanced mock models to simulate AbortError conditions for comprehensive testing.
This commit is contained in:
parent
481bbed328
commit
f10627b005
|
@ -64,6 +64,47 @@ function createMemorySafeMockModel(
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Memory-safe mock that simulates AbortError during streaming
|
||||||
|
function createAbortingMemorySafeMockModel(id: string, abortAfterChunks = 1): LanguageModelV1 {
|
||||||
|
return {
|
||||||
|
specificationVersion: 'v1' as const,
|
||||||
|
modelId: id,
|
||||||
|
provider: `provider-${id}`,
|
||||||
|
defaultObjectGenerationMode: undefined,
|
||||||
|
|
||||||
|
doGenerate: vi.fn(),
|
||||||
|
|
||||||
|
doStream: vi.fn().mockImplementation(async () => {
|
||||||
|
let readCount = 0;
|
||||||
|
|
||||||
|
return {
|
||||||
|
stream: {
|
||||||
|
getReader: () => ({
|
||||||
|
read: vi.fn().mockImplementation(async () => {
|
||||||
|
if (readCount < abortAfterChunks) {
|
||||||
|
readCount++;
|
||||||
|
return {
|
||||||
|
done: false,
|
||||||
|
value: {
|
||||||
|
type: 'text-delta',
|
||||||
|
textDelta: `Stream chunk ${readCount} from ${id}`,
|
||||||
|
} as LanguageModelV1StreamPart,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
// Throw AbortError after specified chunks
|
||||||
|
const abortError = new Error('The operation was aborted');
|
||||||
|
abortError.name = 'AbortError';
|
||||||
|
throw abortError;
|
||||||
|
}),
|
||||||
|
releaseLock: vi.fn(),
|
||||||
|
}),
|
||||||
|
} as any,
|
||||||
|
rawCall: { rawPrompt: 'test', rawSettings: {} },
|
||||||
|
};
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
describe('FallbackModel - Memory Safe Streaming Tests', () => {
|
describe('FallbackModel - Memory Safe Streaming Tests', () => {
|
||||||
it('should successfully stream from the first model', async () => {
|
it('should successfully stream from the first model', async () => {
|
||||||
const model1 = createMemorySafeMockModel('model1');
|
const model1 = createMemorySafeMockModel('model1');
|
||||||
|
@ -138,4 +179,149 @@ describe('FallbackModel - Memory Safe Streaming Tests', () => {
|
||||||
await expect(fallback.doStream(options)).rejects.toThrow('Invalid API key');
|
await expect(fallback.doStream(options)).rejects.toThrow('Invalid API key');
|
||||||
expect(model2.doStream).not.toHaveBeenCalled();
|
expect(model2.doStream).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('AbortError handling', () => {
|
||||||
|
it('should handle AbortError without retrying to next model', async () => {
|
||||||
|
const model1 = createAbortingMemorySafeMockModel('model1', 1); // Aborts after first chunk
|
||||||
|
const model2 = createMemorySafeMockModel('model2');
|
||||||
|
const onError = vi.fn();
|
||||||
|
|
||||||
|
const fallback = createFallback({
|
||||||
|
models: [model1, model2],
|
||||||
|
onError,
|
||||||
|
});
|
||||||
|
|
||||||
|
const options: LanguageModelV1CallOptions = {
|
||||||
|
inputFormat: 'prompt',
|
||||||
|
prompt: [{ role: 'user', content: [{ type: 'text', text: 'Test prompt' }] }],
|
||||||
|
mode: { type: 'regular' },
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = await fallback.doStream(options);
|
||||||
|
const reader = result.stream.getReader();
|
||||||
|
|
||||||
|
// Read first chunk successfully
|
||||||
|
const firstChunk = await reader.read();
|
||||||
|
expect(firstChunk.done).toBe(false);
|
||||||
|
expect(firstChunk.value).toEqual({
|
||||||
|
type: 'text-delta',
|
||||||
|
textDelta: 'Stream chunk 1 from model1',
|
||||||
|
});
|
||||||
|
|
||||||
|
// Next read should complete without error (AbortError is handled gracefully)
|
||||||
|
const secondRead = await reader.read();
|
||||||
|
expect(secondRead.done).toBe(true);
|
||||||
|
|
||||||
|
// Should not have called onError since AbortError is intentional
|
||||||
|
expect(onError).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
// Should not have tried the second model
|
||||||
|
expect(model2.doStream).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should handle AbortError before any output', async () => {
|
||||||
|
const model1 = createAbortingMemorySafeMockModel('model1', 0); // Aborts immediately
|
||||||
|
const model2 = createMemorySafeMockModel('model2');
|
||||||
|
|
||||||
|
const fallback = createFallback({ models: [model1, model2] });
|
||||||
|
|
||||||
|
const options: LanguageModelV1CallOptions = {
|
||||||
|
inputFormat: 'prompt',
|
||||||
|
prompt: [{ role: 'user', content: [{ type: 'text', text: 'Test prompt' }] }],
|
||||||
|
mode: { type: 'regular' },
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = await fallback.doStream(options);
|
||||||
|
const reader = result.stream.getReader();
|
||||||
|
|
||||||
|
// First read should complete without error
|
||||||
|
const firstRead = await reader.read();
|
||||||
|
expect(firstRead.done).toBe(true);
|
||||||
|
|
||||||
|
// Should not have tried the second model (abort is intentional)
|
||||||
|
expect(model2.doStream).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not retry on AbortError even with retryAfterOutput enabled', async () => {
|
||||||
|
const model1 = createAbortingMemorySafeMockModel('model1', 2); // Aborts after 2 chunks
|
||||||
|
const model2 = createMemorySafeMockModel('model2');
|
||||||
|
|
||||||
|
const fallback = createFallback({
|
||||||
|
models: [model1, model2],
|
||||||
|
retryAfterOutput: true, // Even with this enabled, AbortError should not retry
|
||||||
|
});
|
||||||
|
|
||||||
|
const options: LanguageModelV1CallOptions = {
|
||||||
|
inputFormat: 'prompt',
|
||||||
|
prompt: [{ role: 'user', content: [{ type: 'text', text: 'Test prompt' }] }],
|
||||||
|
mode: { type: 'regular' },
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = await fallback.doStream(options);
|
||||||
|
const reader = result.stream.getReader();
|
||||||
|
|
||||||
|
// Read chunks successfully
|
||||||
|
const chunk1 = await reader.read();
|
||||||
|
expect(chunk1.value).toEqual({
|
||||||
|
type: 'text-delta',
|
||||||
|
textDelta: 'Stream chunk 1 from model1',
|
||||||
|
});
|
||||||
|
|
||||||
|
const chunk2 = await reader.read();
|
||||||
|
expect(chunk2.value).toEqual({
|
||||||
|
type: 'text-delta',
|
||||||
|
textDelta: 'Stream chunk 2 from model1',
|
||||||
|
});
|
||||||
|
|
||||||
|
// Next read should complete without error
|
||||||
|
const finalRead = await reader.read();
|
||||||
|
expect(finalRead.done).toBe(true);
|
||||||
|
|
||||||
|
// Should not have tried model2 even with retryAfterOutput
|
||||||
|
expect(model2.doStream).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not cause "Controller is already closed" error on AbortError', async () => {
|
||||||
|
// This test specifically validates the fix for the controller closed issue
|
||||||
|
const model1 = createAbortingMemorySafeMockModel('model1', 1);
|
||||||
|
const model2 = createMemorySafeMockModel('model2');
|
||||||
|
|
||||||
|
const fallback = createFallback({
|
||||||
|
models: [model1, model2],
|
||||||
|
retryAfterOutput: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
const options: LanguageModelV1CallOptions = {
|
||||||
|
inputFormat: 'prompt',
|
||||||
|
prompt: [{ role: 'user', content: [{ type: 'text', text: 'Test prompt' }] }],
|
||||||
|
mode: { type: 'regular' },
|
||||||
|
};
|
||||||
|
|
||||||
|
// This should not throw "Controller is already closed" error
|
||||||
|
const result = await fallback.doStream(options);
|
||||||
|
const reader = result.stream.getReader();
|
||||||
|
|
||||||
|
let errorThrown: Error | null = null;
|
||||||
|
const chunks: any[] = [];
|
||||||
|
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
chunks.push(value);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
errorThrown = error as Error;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should have successfully read one chunk before abort
|
||||||
|
expect(chunks).toHaveLength(1);
|
||||||
|
|
||||||
|
// Should not have thrown any error
|
||||||
|
expect(errorThrown).toBeNull();
|
||||||
|
|
||||||
|
// Should not have tried to fallback to model2
|
||||||
|
expect(model2.doStream).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
|
@ -99,6 +99,52 @@ function createFailingStreamModel(id: string, errorAfterChunks = 1): LanguageMod
|
||||||
return mockModel;
|
return mockModel;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Helper to create a model that throws AbortError during streaming
|
||||||
|
function createAbortingStreamModel(id: string, abortAfterChunks = 1): LanguageModelV1 {
|
||||||
|
const mockModel: LanguageModelV1 = {
|
||||||
|
specificationVersion: 'v1' as const,
|
||||||
|
modelId: id,
|
||||||
|
provider: `provider-${id}`,
|
||||||
|
defaultObjectGenerationMode: undefined,
|
||||||
|
|
||||||
|
doGenerate: vi.fn(),
|
||||||
|
|
||||||
|
doStream: vi.fn().mockImplementation(async () => {
|
||||||
|
const chunks: LanguageModelV1StreamPart[] = [
|
||||||
|
{ type: 'text-delta', textDelta: `Stream from ${id} before abort` },
|
||||||
|
{ type: 'text-delta', textDelta: ' more text' },
|
||||||
|
{ type: 'finish', finishReason: 'stop', usage: { promptTokens: 10, completionTokens: 20 } },
|
||||||
|
];
|
||||||
|
|
||||||
|
const stream = new ReadableStream<LanguageModelV1StreamPart>({
|
||||||
|
start(controller) {
|
||||||
|
let chunkCount = 0;
|
||||||
|
// Enqueue chunks up to the abort point
|
||||||
|
for (const chunk of chunks) {
|
||||||
|
if (chunkCount >= abortAfterChunks) {
|
||||||
|
// Simulate an AbortError
|
||||||
|
const abortError = new Error('The operation was aborted');
|
||||||
|
abortError.name = 'AbortError';
|
||||||
|
setTimeout(() => controller.error(abortError), 0);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
controller.enqueue(chunk);
|
||||||
|
chunkCount++;
|
||||||
|
}
|
||||||
|
controller.close();
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
stream,
|
||||||
|
rawCall: { rawPrompt: 'test', rawSettings: {} },
|
||||||
|
};
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
|
||||||
|
return mockModel;
|
||||||
|
}
|
||||||
|
|
||||||
// NOTE: These streaming tests are temporarily disabled due to memory issues
|
// NOTE: These streaming tests are temporarily disabled due to memory issues
|
||||||
// with ReadableStream in the test environment. See ai-fallback-memory-safe.test.ts
|
// with ReadableStream in the test environment. See ai-fallback-memory-safe.test.ts
|
||||||
// for alternative streaming tests that avoid memory issues.
|
// for alternative streaming tests that avoid memory issues.
|
||||||
|
@ -276,6 +322,126 @@ describe.skip('FallbackModel - Streaming', () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('abort error handling', () => {
|
||||||
|
it('should handle AbortError without retrying or causing controller closed error', async () => {
|
||||||
|
const model1 = createAbortingStreamModel('model1', 1); // Aborts after first chunk
|
||||||
|
const model2 = createMockModel('model2');
|
||||||
|
const onError = vi.fn();
|
||||||
|
const fallback = createFallback({
|
||||||
|
models: [model1, model2],
|
||||||
|
onError,
|
||||||
|
});
|
||||||
|
|
||||||
|
const options: LanguageModelV1CallOptions = {
|
||||||
|
inputFormat: 'prompt',
|
||||||
|
prompt: [{ role: 'user', content: [{ type: 'text', text: 'Test prompt' }] }],
|
||||||
|
mode: { type: 'regular' },
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = await fallback.doStream(options);
|
||||||
|
const reader = result.stream.getReader();
|
||||||
|
const chunks: LanguageModelV1StreamPart[] = [];
|
||||||
|
let errorThrown: Error | null = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
chunks.push(value);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
errorThrown = error as Error;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should have received the chunk before the abort
|
||||||
|
expect(chunks).toHaveLength(1);
|
||||||
|
expect(chunks[0]).toEqual({
|
||||||
|
type: 'text-delta',
|
||||||
|
textDelta: 'Stream from model1 before abort',
|
||||||
|
});
|
||||||
|
|
||||||
|
// Should not have called onError since AbortError is intentional
|
||||||
|
expect(onError).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
// Should not have tried the second model
|
||||||
|
expect(model2.doStream).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
// Stream should have ended cleanly without throwing
|
||||||
|
expect(errorThrown).toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should handle AbortError before any output', async () => {
|
||||||
|
const model1 = createAbortingStreamModel('model1', 0); // Aborts immediately
|
||||||
|
const model2 = createMockModel('model2');
|
||||||
|
const fallback = createFallback({ models: [model1, model2] });
|
||||||
|
|
||||||
|
const options: LanguageModelV1CallOptions = {
|
||||||
|
inputFormat: 'prompt',
|
||||||
|
prompt: [{ role: 'user', content: [{ type: 'text', text: 'Test prompt' }] }],
|
||||||
|
mode: { type: 'regular' },
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = await fallback.doStream(options);
|
||||||
|
const reader = result.stream.getReader();
|
||||||
|
const chunks: LanguageModelV1StreamPart[] = [];
|
||||||
|
let errorThrown: Error | null = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
chunks.push(value);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
errorThrown = error as Error;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should not have received any chunks
|
||||||
|
expect(chunks).toHaveLength(0);
|
||||||
|
|
||||||
|
// Should not have tried the second model (abort is intentional)
|
||||||
|
expect(model2.doStream).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
// Stream should have ended cleanly without throwing
|
||||||
|
expect(errorThrown).toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should handle AbortError with retryAfterOutput enabled', async () => {
|
||||||
|
const model1 = createAbortingStreamModel('model1', 1); // Aborts after first chunk
|
||||||
|
const model2 = createMockModel('model2');
|
||||||
|
const fallback = createFallback({
|
||||||
|
models: [model1, model2],
|
||||||
|
retryAfterOutput: true, // Even with this enabled, AbortError should not retry
|
||||||
|
});
|
||||||
|
|
||||||
|
const options: LanguageModelV1CallOptions = {
|
||||||
|
inputFormat: 'prompt',
|
||||||
|
prompt: [{ role: 'user', content: [{ type: 'text', text: 'Test prompt' }] }],
|
||||||
|
mode: { type: 'regular' },
|
||||||
|
};
|
||||||
|
|
||||||
|
const result = await fallback.doStream(options);
|
||||||
|
const reader = result.stream.getReader();
|
||||||
|
const chunks: LanguageModelV1StreamPart[] = [];
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
chunks.push(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should only have chunks from model1 before abort
|
||||||
|
expect(chunks).toHaveLength(1);
|
||||||
|
expect(chunks[0]).toEqual({
|
||||||
|
type: 'text-delta',
|
||||||
|
textDelta: 'Stream from model1 before abort',
|
||||||
|
});
|
||||||
|
|
||||||
|
// Should not have tried model2 even with retryAfterOutput
|
||||||
|
expect(model2.doStream).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('stream retry with status codes', () => {
|
describe('stream retry with status codes', () => {
|
||||||
it('should retry streaming on retryable status code', async () => {
|
it('should retry streaming on retryable status code', async () => {
|
||||||
const error = Object.assign(new Error('Rate limited'), { statusCode: 429 });
|
const error = Object.assign(new Error('Rate limited'), { statusCode: 429 });
|
||||||
|
|
|
@ -191,9 +191,24 @@ export class FallbackModel implements LanguageModelV1 {
|
||||||
}
|
}
|
||||||
controller.close();
|
controller.close();
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
// Check if this is an intentional abort (not a retry scenario)
|
||||||
|
if (error instanceof Error && error.name === 'AbortError') {
|
||||||
|
// Don't retry on intentional aborts, just close the controller
|
||||||
|
controller.close();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (self.settings.onError) {
|
if (self.settings.onError) {
|
||||||
await self.settings.onError(error as Error, self.modelId);
|
await self.settings.onError(error as Error, self.modelId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if we should retry this error
|
||||||
|
const shouldRetry = self.settings.shouldRetryThisError || defaultShouldRetryThisError;
|
||||||
|
if (!shouldRetry(error as Error)) {
|
||||||
|
controller.error(error);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (
|
if (
|
||||||
(!hasStreamedAny || self.retryAfterOutput) &&
|
(!hasStreamedAny || self.retryAfterOutput) &&
|
||||||
streamRetryAttempts < maxStreamRetries
|
streamRetryAttempts < maxStreamRetries
|
||||||
|
|
|
@ -55,13 +55,14 @@ function initializeHaiku35() {
|
||||||
|
|
||||||
// Export a proxy that initializes on first use
|
// Export a proxy that initializes on first use
|
||||||
export const Haiku35 = new Proxy({} as ReturnType<typeof createFallback>, {
|
export const Haiku35 = new Proxy({} as ReturnType<typeof createFallback>, {
|
||||||
get(_target, prop, receiver) {
|
get(_target, prop) {
|
||||||
const instance = initializeHaiku35();
|
const instance = initializeHaiku35();
|
||||||
return Reflect.get(instance, prop, receiver);
|
// Direct property access without receiver to avoid proxy conflicts
|
||||||
|
return instance[prop as keyof typeof instance];
|
||||||
},
|
},
|
||||||
has(_target, prop) {
|
has(_target, prop) {
|
||||||
const instance = initializeHaiku35();
|
const instance = initializeHaiku35();
|
||||||
return Reflect.has(instance, prop);
|
return prop in instance;
|
||||||
},
|
},
|
||||||
ownKeys(_target) {
|
ownKeys(_target) {
|
||||||
const instance = initializeHaiku35();
|
const instance = initializeHaiku35();
|
||||||
|
|
|
@ -55,13 +55,14 @@ function initializeSonnet4() {
|
||||||
|
|
||||||
// Export a proxy that initializes on first use
|
// Export a proxy that initializes on first use
|
||||||
export const Sonnet4 = new Proxy({} as ReturnType<typeof createFallback>, {
|
export const Sonnet4 = new Proxy({} as ReturnType<typeof createFallback>, {
|
||||||
get(_target, prop, receiver) {
|
get(_target, prop) {
|
||||||
const instance = initializeSonnet4();
|
const instance = initializeSonnet4();
|
||||||
return Reflect.get(instance, prop, receiver);
|
// Direct property access without receiver to avoid proxy conflicts
|
||||||
|
return instance[prop as keyof typeof instance];
|
||||||
},
|
},
|
||||||
has(_target, prop) {
|
has(_target, prop) {
|
||||||
const instance = initializeSonnet4();
|
const instance = initializeSonnet4();
|
||||||
return Reflect.has(instance, prop);
|
return prop in instance;
|
||||||
},
|
},
|
||||||
ownKeys(_target) {
|
ownKeys(_target) {
|
||||||
const instance = initializeSonnet4();
|
const instance = initializeSonnet4();
|
||||||
|
|
Loading…
Reference in New Issue