mirror of https://github.com/buster-so/buster.git
better error handling and logging on streams
This commit is contained in:
parent
0a45fbd1d8
commit
8ff67d62f2
|
@ -15,7 +15,6 @@ export {
|
||||||
recoverMessages,
|
recoverMessages,
|
||||||
executeStreamAttempt,
|
executeStreamAttempt,
|
||||||
handleFailedAttempt,
|
handleFailedAttempt,
|
||||||
analyzeError,
|
|
||||||
createRetryExecutor,
|
createRetryExecutor,
|
||||||
composeMiddleware,
|
composeMiddleware,
|
||||||
retryMiddleware,
|
retryMiddleware,
|
||||||
|
|
|
@ -2,7 +2,6 @@ import type { ModelMessage } from 'ai';
|
||||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||||
import {
|
import {
|
||||||
type StreamExecutor,
|
type StreamExecutor,
|
||||||
analyzeError,
|
|
||||||
calculateBackoffDelay,
|
calculateBackoffDelay,
|
||||||
composeMiddleware,
|
composeMiddleware,
|
||||||
createMockAgent,
|
createMockAgent,
|
||||||
|
@ -84,22 +83,6 @@ describe('with-agent-retry', () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('analyzeError', () => {
|
|
||||||
it('should correctly identify retryable errors', () => {
|
|
||||||
const overloadedError = createOverloadedError();
|
|
||||||
const result = analyzeError(overloadedError);
|
|
||||||
expect(result.isRetryable).toBe(true);
|
|
||||||
expect(result.error).toEqual(overloadedError);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should treat all errors as retryable', () => {
|
|
||||||
const regularError = new Error('Regular error');
|
|
||||||
const result = analyzeError(regularError);
|
|
||||||
expect(result.isRetryable).toBe(true);
|
|
||||||
expect(result.error).toEqual(regularError);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe('sleep', () => {
|
describe('sleep', () => {
|
||||||
it('should resolve after specified duration', async () => {
|
it('should resolve after specified duration', async () => {
|
||||||
const startTime = Date.now();
|
const startTime = Date.now();
|
||||||
|
@ -238,19 +221,23 @@ describe('with-agent-retry', () => {
|
||||||
expect(mockFetchMessageEntries).not.toHaveBeenCalled();
|
expect(mockFetchMessageEntries).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should not retry when recovery fails', async () => {
|
it('should continue with original messages when recovery fails', async () => {
|
||||||
mockFetchMessageEntries.mockRejectedValue(new Error('DB error'));
|
mockFetchMessageEntries.mockRejectedValue(new Error('DB error'));
|
||||||
|
const originalMessages: ModelMessage[] = [{ role: 'user', content: 'original' }];
|
||||||
|
|
||||||
const result = await handleFailedAttempt(
|
const result = await handleFailedAttempt(
|
||||||
createOverloadedError(),
|
createOverloadedError(),
|
||||||
1,
|
1,
|
||||||
3,
|
3,
|
||||||
'test-id',
|
'test-id',
|
||||||
[],
|
originalMessages,
|
||||||
1000
|
1000
|
||||||
);
|
);
|
||||||
|
|
||||||
expect(result.shouldRetry).toBe(false);
|
// We now continue with original messages when recovery fails
|
||||||
|
expect(result.shouldRetry).toBe(true);
|
||||||
|
expect(result.nextMessages).toEqual(originalMessages);
|
||||||
|
expect(result.delayMs).toBe(1000);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
|
@ -38,14 +38,6 @@ interface RetryOptions {
|
||||||
onRetry?: (attempt: number, recoveredMessageCount: number) => void;
|
onRetry?: (attempt: number, recoveredMessageCount: number) => void;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Result of checking if an error is retryable
|
|
||||||
*/
|
|
||||||
interface RetryableCheck {
|
|
||||||
isRetryable: boolean;
|
|
||||||
error: unknown;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ===== Pure Functions =====
|
// ===== Pure Functions =====
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -96,16 +88,6 @@ export const calculateBackoffDelay = (attempt: number, baseDelayMs: number): num
|
||||||
export const sleep = (ms: number): Promise<void> =>
|
export const sleep = (ms: number): Promise<void> =>
|
||||||
new Promise((resolve) => setTimeout(resolve, ms));
|
new Promise((resolve) => setTimeout(resolve, ms));
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if an error is retryable and return structured result
|
|
||||||
* Now treats ALL errors as retryable to handle various provider errors
|
|
||||||
* Pure function for error analysis
|
|
||||||
*/
|
|
||||||
export const analyzeError = (error: unknown): RetryableCheck => ({
|
|
||||||
isRetryable: true, // All errors are now retryable
|
|
||||||
error,
|
|
||||||
});
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Recover messages from database
|
* Recover messages from database
|
||||||
* Returns either recovered messages or original messages
|
* Returns either recovered messages or original messages
|
||||||
|
@ -137,6 +119,8 @@ export const recoverMessages = async (
|
||||||
console.error('[Agent Retry] Failed to recover from database', {
|
console.error('[Agent Retry] Failed to recover from database', {
|
||||||
messageId,
|
messageId,
|
||||||
error: error instanceof Error ? error.message : 'Unknown error',
|
error: error instanceof Error ? error.message : 'Unknown error',
|
||||||
|
stack: error instanceof Error ? error.stack : undefined,
|
||||||
|
errorType: error instanceof Error ? error.name : typeof error,
|
||||||
});
|
});
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
|
@ -177,12 +161,12 @@ export const handleFailedAttempt = async (
|
||||||
messageId,
|
messageId,
|
||||||
error: error instanceof Error ? error.message : 'Unknown error',
|
error: error instanceof Error ? error.message : 'Unknown error',
|
||||||
errorType: error instanceof Error ? error.name : typeof error,
|
errorType: error instanceof Error ? error.name : typeof error,
|
||||||
|
stack: error instanceof Error ? error.stack : undefined,
|
||||||
});
|
});
|
||||||
|
|
||||||
const { isRetryable } = analyzeError(error);
|
// Check if we've reached max attempts
|
||||||
|
if (attempt === maxAttempts) {
|
||||||
if (!isRetryable || attempt === maxAttempts) {
|
console.error('[Agent Retry] Max attempts reached', {
|
||||||
console.error('[Agent Retry] Non-retryable error or max attempts reached', {
|
|
||||||
messageId,
|
messageId,
|
||||||
attempt,
|
attempt,
|
||||||
maxAttempts,
|
maxAttempts,
|
||||||
|
@ -190,7 +174,7 @@ export const handleFailedAttempt = async (
|
||||||
return { shouldRetry: false, nextMessages: currentMessages, delayMs: 0 };
|
return { shouldRetry: false, nextMessages: currentMessages, delayMs: 0 };
|
||||||
}
|
}
|
||||||
|
|
||||||
console.warn('[Agent Retry] Error detected, preparing retry', {
|
console.warn('[Agent Retry] Preparing retry', {
|
||||||
messageId,
|
messageId,
|
||||||
attempt,
|
attempt,
|
||||||
remainingAttempts: maxAttempts - attempt,
|
remainingAttempts: maxAttempts - attempt,
|
||||||
|
@ -215,9 +199,30 @@ export const handleFailedAttempt = async (
|
||||||
nextMessages: recoveredMessages,
|
nextMessages: recoveredMessages,
|
||||||
delayMs,
|
delayMs,
|
||||||
};
|
};
|
||||||
} catch (_recoveryError) {
|
} catch (recoveryError) {
|
||||||
// If recovery fails, don't retry
|
// Log the recovery failure with full context
|
||||||
return { shouldRetry: false, nextMessages: currentMessages, delayMs: 0 };
|
console.error('[Agent Retry] Failed to recover messages from database', {
|
||||||
|
messageId,
|
||||||
|
attempt,
|
||||||
|
recoveryError: recoveryError instanceof Error ? recoveryError.message : 'Unknown error',
|
||||||
|
recoveryErrorType: recoveryError instanceof Error ? recoveryError.name : typeof recoveryError,
|
||||||
|
recoveryStack: recoveryError instanceof Error ? recoveryError.stack : undefined,
|
||||||
|
originalError: error instanceof Error ? error.message : 'Unknown error',
|
||||||
|
});
|
||||||
|
|
||||||
|
// Continue with original messages if recovery fails
|
||||||
|
console.warn('[Agent Retry] Continuing with original messages after recovery failure', {
|
||||||
|
messageId,
|
||||||
|
messageCount: currentMessages.length,
|
||||||
|
});
|
||||||
|
|
||||||
|
const delayMs = calculateBackoffDelay(attempt, baseDelayMs);
|
||||||
|
|
||||||
|
return {
|
||||||
|
shouldRetry: true,
|
||||||
|
nextMessages: currentMessages,
|
||||||
|
delayMs,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -271,19 +276,13 @@ export function withAgentRetry<
|
||||||
TStreamResult = unknown,
|
TStreamResult = unknown,
|
||||||
TAgent extends Agent<TStreamResult> = Agent<TStreamResult>,
|
TAgent extends Agent<TStreamResult> = Agent<TStreamResult>,
|
||||||
>(agent: TAgent, options: RetryOptions): TAgent {
|
>(agent: TAgent, options: RetryOptions): TAgent {
|
||||||
// Create a new object with the same prototype
|
// Create a new agent with all properties spread from the original
|
||||||
const wrappedAgent = Object.create(Object.getPrototypeOf(agent)) as TAgent;
|
// This ensures type safety and copies all properties correctly
|
||||||
|
const wrappedAgent = {
|
||||||
// Copy all properties except stream
|
...agent,
|
||||||
for (const key in agent) {
|
// Override the stream method with retry logic
|
||||||
if (key !== 'stream' && Object.prototype.hasOwnProperty.call(agent, key)) {
|
stream: (streamOptions: StreamOptions) => retryStream(agent, streamOptions.messages, options),
|
||||||
wrappedAgent[key] = agent[key];
|
};
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wrap the stream method with retry logic
|
|
||||||
wrappedAgent.stream = (streamOptions: StreamOptions) =>
|
|
||||||
retryStream(agent, streamOptions.messages, options);
|
|
||||||
|
|
||||||
return wrappedAgent;
|
return wrappedAgent;
|
||||||
}
|
}
|
||||||
|
@ -305,7 +304,7 @@ export const createRetryExecutor = <TStreamResult>(
|
||||||
): StreamExecutor<TStreamResult> => {
|
): StreamExecutor<TStreamResult> => {
|
||||||
return async (messages: ModelMessage[]) => {
|
return async (messages: ModelMessage[]) => {
|
||||||
const agent: Agent<TStreamResult> = {
|
const agent: Agent<TStreamResult> = {
|
||||||
stream: async ({ messages }) => executor(messages),
|
stream: async (streamOptions: StreamOptions) => executor(streamOptions.messages),
|
||||||
};
|
};
|
||||||
return retryStream(agent, messages, options);
|
return retryStream(agent, messages, options);
|
||||||
};
|
};
|
||||||
|
|
|
@ -46,6 +46,7 @@ export async function withStepRetry<T>(
|
||||||
console.error(`[${stepName}] Error on attempt ${attempt}:`, {
|
console.error(`[${stepName}] Error on attempt ${attempt}:`, {
|
||||||
error: error instanceof Error ? error.message : 'Unknown error',
|
error: error instanceof Error ? error.message : 'Unknown error',
|
||||||
errorType: error instanceof Error ? error.name : typeof error,
|
errorType: error instanceof Error ? error.name : typeof error,
|
||||||
|
stack: error instanceof Error ? error.stack : undefined,
|
||||||
});
|
});
|
||||||
|
|
||||||
// If this was the last attempt, throw the error
|
// If this was the last attempt, throw the error
|
||||||
|
|
Loading…
Reference in New Issue