mirror of https://github.com/buster-so/buster.git
Merge pull request #1116 from buster-so/dallin-bus-1886-another-overloaded-error
Dallin-bus-1886-another-overloaded-error
This commit is contained in:
commit
a29e6463cf
|
@ -1,3 +1,4 @@
|
|||
import { randomUUID } from 'node:crypto';
|
||||
import { logger, schemaTask, tasks } from '@trigger.dev/sdk';
|
||||
import { currentSpan, initLogger, wrapTraced } from 'braintrust';
|
||||
import { analystQueue } from '../../queues/analyst-queue';
|
||||
|
@ -12,6 +13,8 @@ import {
|
|||
getOrganizationDataSource,
|
||||
getOrganizationDocs,
|
||||
getUserPersonalization,
|
||||
updateMessage,
|
||||
updateMessageEntries,
|
||||
} from '@buster/database/queries';
|
||||
|
||||
// Access control imports
|
||||
|
@ -134,11 +137,11 @@ class ResourceTracker {
|
|||
},
|
||||
cpuUsage: finalCpuUsage
|
||||
? {
|
||||
userTimeMs: Math.round(finalCpuUsage.user / 1000),
|
||||
systemTimeMs: Math.round(finalCpuUsage.system / 1000),
|
||||
totalTimeMs: Math.round(totalCpuTime / 1000),
|
||||
estimatedUsagePercent: Math.round(cpuPercentage * 100) / 100,
|
||||
}
|
||||
userTimeMs: Math.round(finalCpuUsage.user / 1000),
|
||||
systemTimeMs: Math.round(finalCpuUsage.system / 1000),
|
||||
totalTimeMs: Math.round(totalCpuTime / 1000),
|
||||
estimatedUsagePercent: Math.round(cpuPercentage * 100) / 100,
|
||||
}
|
||||
: { error: 'CPU usage not available' },
|
||||
stageBreakdown: this.snapshots.map((snapshot, index) => {
|
||||
const prevSnapshot = index > 0 ? this.snapshots[index - 1] : null;
|
||||
|
@ -249,6 +252,51 @@ export const analystAgentTask: ReturnType<
|
|||
schema: AnalystAgentTaskInputSchema,
|
||||
queue: analystQueue,
|
||||
maxDuration: 1200, // 20 minutes for complex analysis
|
||||
retry: {
|
||||
maxAttempts: 0
|
||||
},
|
||||
onFailure: async ({ error, payload }) => {
|
||||
// Log the failure for debugging
|
||||
logger.error('Analyst agent task failed - executing onFailure handler', {
|
||||
messageId: payload.message_id,
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
errorType: error instanceof Error ? error.name : typeof error,
|
||||
});
|
||||
|
||||
try {
|
||||
// Add error message to user and mark as complete
|
||||
const errorResponseId = randomUUID();
|
||||
|
||||
// Add the error response message
|
||||
await updateMessageEntries({
|
||||
messageId: payload.message_id,
|
||||
responseMessages: [
|
||||
{
|
||||
id: errorResponseId,
|
||||
type: 'text',
|
||||
message: "I'm sorry, I ran into a technical error. Please try your request again.",
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
// Mark the message as complete with final reasoning
|
||||
await updateMessage(payload.message_id, {
|
||||
isCompleted: true,
|
||||
finalReasoningMessage: 'Finished reasoning.',
|
||||
});
|
||||
|
||||
logger.log('Error response added and message marked complete', {
|
||||
messageId: payload.message_id,
|
||||
errorResponseId,
|
||||
});
|
||||
} catch (handlerError) {
|
||||
// Log but don't throw - onFailure should never throw
|
||||
logger.error('Failed to update message in onFailure handler', {
|
||||
messageId: payload.message_id,
|
||||
error: handlerError instanceof Error ? handlerError.message : 'Unknown error',
|
||||
});
|
||||
}
|
||||
},
|
||||
run: async (payload): Promise<AnalystAgentTaskOutput> => {
|
||||
const taskStartTime = Date.now();
|
||||
const resourceTracker = new ResourceTracker();
|
||||
|
@ -376,12 +424,12 @@ export const analystAgentTask: ReturnType<
|
|||
conversationHistory.length > 0
|
||||
? conversationHistory
|
||||
: [
|
||||
{
|
||||
role: 'user',
|
||||
// v5 supports string content directly for user messages
|
||||
content: messageContext.requestMessage,
|
||||
},
|
||||
];
|
||||
{
|
||||
role: 'user',
|
||||
// v5 supports string content directly for user messages
|
||||
content: messageContext.requestMessage,
|
||||
},
|
||||
];
|
||||
|
||||
const workflowInput: AnalystWorkflowInput = {
|
||||
messages: modelMessages,
|
||||
|
@ -435,7 +483,28 @@ export const analystAgentTask: ReturnType<
|
|||
}
|
||||
);
|
||||
|
||||
await tracedWorkflow();
|
||||
// Wrap workflow execution to capture and log errors before re-throwing
|
||||
try {
|
||||
await tracedWorkflow();
|
||||
} catch (workflowError) {
|
||||
// Log workflow-specific error details
|
||||
logger.error('Analyst workflow execution failed', {
|
||||
messageId: payload.message_id,
|
||||
error: workflowError instanceof Error ? workflowError.message : 'Unknown error',
|
||||
errorType: workflowError instanceof Error ? workflowError.name : typeof workflowError,
|
||||
stack: workflowError instanceof Error ? workflowError.stack : undefined,
|
||||
workflowInput: {
|
||||
messageCount: workflowInput.messages.length,
|
||||
datasetsCount: datasets.length,
|
||||
hasAnalystInstructions: !!analystInstructions,
|
||||
organizationDocsCount: organizationDocs.length,
|
||||
},
|
||||
});
|
||||
|
||||
// Re-throw to let Trigger handle the retry
|
||||
throw workflowError;
|
||||
}
|
||||
|
||||
const totalWorkflowTime = Date.now() - workflowExecutionStart;
|
||||
|
||||
logger.log('Analyst workflow completed successfully', {
|
||||
|
@ -509,27 +578,20 @@ export const analystAgentTask: ReturnType<
|
|||
logPerformanceMetrics('task-error', payload.message_id, taskStartTime, resourceTracker);
|
||||
resourceTracker.generateReport(payload.message_id);
|
||||
|
||||
logger.error('Task execution failed', {
|
||||
logger.error('Task execution failed - will retry', {
|
||||
messageId: payload.message_id,
|
||||
error: errorMessage,
|
||||
errorType: error instanceof Error ? error.name : typeof error,
|
||||
stack: error instanceof Error ? error.stack : undefined,
|
||||
executionTimeMs: totalExecutionTime,
|
||||
errorCode: getErrorCode(error),
|
||||
});
|
||||
|
||||
// Need to flush the Braintrust logger to ensure all traces are sent
|
||||
await braintrustLogger.flush();
|
||||
|
||||
return {
|
||||
success: false,
|
||||
messageId: payload.message_id,
|
||||
error: {
|
||||
code: getErrorCode(error),
|
||||
message: errorMessage,
|
||||
details: {
|
||||
operation: 'analyst_agent_task_execution',
|
||||
messageId: payload.message_id,
|
||||
},
|
||||
},
|
||||
};
|
||||
// Re-throw the error so Trigger knows the task failed and should retry
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
});
|
||||
|
|
|
@ -15,7 +15,6 @@ export {
|
|||
recoverMessages,
|
||||
executeStreamAttempt,
|
||||
handleFailedAttempt,
|
||||
analyzeError,
|
||||
createRetryExecutor,
|
||||
composeMiddleware,
|
||||
retryMiddleware,
|
||||
|
|
|
@ -2,7 +2,6 @@ import type { ModelMessage } from 'ai';
|
|||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
import {
|
||||
type StreamExecutor,
|
||||
analyzeError,
|
||||
calculateBackoffDelay,
|
||||
composeMiddleware,
|
||||
createMockAgent,
|
||||
|
@ -84,22 +83,6 @@ describe('with-agent-retry', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('analyzeError', () => {
|
||||
it('should correctly identify retryable errors', () => {
|
||||
const overloadedError = createOverloadedError();
|
||||
const result = analyzeError(overloadedError);
|
||||
expect(result.isRetryable).toBe(true);
|
||||
expect(result.error).toEqual(overloadedError);
|
||||
});
|
||||
|
||||
it('should treat all errors as retryable', () => {
|
||||
const regularError = new Error('Regular error');
|
||||
const result = analyzeError(regularError);
|
||||
expect(result.isRetryable).toBe(true);
|
||||
expect(result.error).toEqual(regularError);
|
||||
});
|
||||
});
|
||||
|
||||
describe('sleep', () => {
|
||||
it('should resolve after specified duration', async () => {
|
||||
const startTime = Date.now();
|
||||
|
@ -238,19 +221,23 @@ describe('with-agent-retry', () => {
|
|||
expect(mockFetchMessageEntries).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should not retry when recovery fails', async () => {
|
||||
it('should continue with original messages when recovery fails', async () => {
|
||||
mockFetchMessageEntries.mockRejectedValue(new Error('DB error'));
|
||||
const originalMessages: ModelMessage[] = [{ role: 'user', content: 'original' }];
|
||||
|
||||
const result = await handleFailedAttempt(
|
||||
createOverloadedError(),
|
||||
1,
|
||||
3,
|
||||
'test-id',
|
||||
[],
|
||||
originalMessages,
|
||||
1000
|
||||
);
|
||||
|
||||
expect(result.shouldRetry).toBe(false);
|
||||
// We now continue with original messages when recovery fails
|
||||
expect(result.shouldRetry).toBe(true);
|
||||
expect(result.nextMessages).toEqual(originalMessages);
|
||||
expect(result.delayMs).toBe(1000);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -38,14 +38,6 @@ interface RetryOptions {
|
|||
onRetry?: (attempt: number, recoveredMessageCount: number) => void;
|
||||
}
|
||||
|
||||
/**
|
||||
* Result of checking if an error is retryable
|
||||
*/
|
||||
interface RetryableCheck {
|
||||
isRetryable: boolean;
|
||||
error: unknown;
|
||||
}
|
||||
|
||||
// ===== Pure Functions =====
|
||||
|
||||
/**
|
||||
|
@ -96,16 +88,6 @@ export const calculateBackoffDelay = (attempt: number, baseDelayMs: number): num
|
|||
export const sleep = (ms: number): Promise<void> =>
|
||||
new Promise((resolve) => setTimeout(resolve, ms));
|
||||
|
||||
/**
|
||||
* Check if an error is retryable and return structured result
|
||||
* Now treats ALL errors as retryable to handle various provider errors
|
||||
* Pure function for error analysis
|
||||
*/
|
||||
export const analyzeError = (error: unknown): RetryableCheck => ({
|
||||
isRetryable: true, // All errors are now retryable
|
||||
error,
|
||||
});
|
||||
|
||||
/**
|
||||
* Recover messages from database
|
||||
* Returns either recovered messages or original messages
|
||||
|
@ -137,6 +119,8 @@ export const recoverMessages = async (
|
|||
console.error('[Agent Retry] Failed to recover from database', {
|
||||
messageId,
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
stack: error instanceof Error ? error.stack : undefined,
|
||||
errorType: error instanceof Error ? error.name : typeof error,
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
|
@ -177,12 +161,12 @@ export const handleFailedAttempt = async (
|
|||
messageId,
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
errorType: error instanceof Error ? error.name : typeof error,
|
||||
stack: error instanceof Error ? error.stack : undefined,
|
||||
});
|
||||
|
||||
const { isRetryable } = analyzeError(error);
|
||||
|
||||
if (!isRetryable || attempt === maxAttempts) {
|
||||
console.error('[Agent Retry] Non-retryable error or max attempts reached', {
|
||||
// Check if we've reached max attempts
|
||||
if (attempt === maxAttempts) {
|
||||
console.error('[Agent Retry] Max attempts reached', {
|
||||
messageId,
|
||||
attempt,
|
||||
maxAttempts,
|
||||
|
@ -190,7 +174,7 @@ export const handleFailedAttempt = async (
|
|||
return { shouldRetry: false, nextMessages: currentMessages, delayMs: 0 };
|
||||
}
|
||||
|
||||
console.warn('[Agent Retry] Error detected, preparing retry', {
|
||||
console.warn('[Agent Retry] Preparing retry', {
|
||||
messageId,
|
||||
attempt,
|
||||
remainingAttempts: maxAttempts - attempt,
|
||||
|
@ -215,9 +199,30 @@ export const handleFailedAttempt = async (
|
|||
nextMessages: recoveredMessages,
|
||||
delayMs,
|
||||
};
|
||||
} catch (_recoveryError) {
|
||||
// If recovery fails, don't retry
|
||||
return { shouldRetry: false, nextMessages: currentMessages, delayMs: 0 };
|
||||
} catch (recoveryError) {
|
||||
// Log the recovery failure with full context
|
||||
console.error('[Agent Retry] Failed to recover messages from database', {
|
||||
messageId,
|
||||
attempt,
|
||||
recoveryError: recoveryError instanceof Error ? recoveryError.message : 'Unknown error',
|
||||
recoveryErrorType: recoveryError instanceof Error ? recoveryError.name : typeof recoveryError,
|
||||
recoveryStack: recoveryError instanceof Error ? recoveryError.stack : undefined,
|
||||
originalError: error instanceof Error ? error.message : 'Unknown error',
|
||||
});
|
||||
|
||||
// Continue with original messages if recovery fails
|
||||
console.warn('[Agent Retry] Continuing with original messages after recovery failure', {
|
||||
messageId,
|
||||
messageCount: currentMessages.length,
|
||||
});
|
||||
|
||||
const delayMs = calculateBackoffDelay(attempt, baseDelayMs);
|
||||
|
||||
return {
|
||||
shouldRetry: true,
|
||||
nextMessages: currentMessages,
|
||||
delayMs,
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -271,19 +276,13 @@ export function withAgentRetry<
|
|||
TStreamResult = unknown,
|
||||
TAgent extends Agent<TStreamResult> = Agent<TStreamResult>,
|
||||
>(agent: TAgent, options: RetryOptions): TAgent {
|
||||
// Create a new object with the same prototype
|
||||
const wrappedAgent = Object.create(Object.getPrototypeOf(agent)) as TAgent;
|
||||
|
||||
// Copy all properties except stream
|
||||
for (const key in agent) {
|
||||
if (key !== 'stream' && Object.prototype.hasOwnProperty.call(agent, key)) {
|
||||
wrappedAgent[key] = agent[key];
|
||||
}
|
||||
}
|
||||
|
||||
// Wrap the stream method with retry logic
|
||||
wrappedAgent.stream = (streamOptions: StreamOptions) =>
|
||||
retryStream(agent, streamOptions.messages, options);
|
||||
// Create a new agent with all properties spread from the original
|
||||
// This ensures type safety and copies all properties correctly
|
||||
const wrappedAgent = {
|
||||
...agent,
|
||||
// Override the stream method with retry logic
|
||||
stream: (streamOptions: StreamOptions) => retryStream(agent, streamOptions.messages, options),
|
||||
};
|
||||
|
||||
return wrappedAgent;
|
||||
}
|
||||
|
@ -305,7 +304,7 @@ export const createRetryExecutor = <TStreamResult>(
|
|||
): StreamExecutor<TStreamResult> => {
|
||||
return async (messages: ModelMessage[]) => {
|
||||
const agent: Agent<TStreamResult> = {
|
||||
stream: async ({ messages }) => executor(messages),
|
||||
stream: async (streamOptions: StreamOptions) => executor(streamOptions.messages),
|
||||
};
|
||||
return retryStream(agent, messages, options);
|
||||
};
|
||||
|
|
|
@ -46,6 +46,7 @@ export async function withStepRetry<T>(
|
|||
console.error(`[${stepName}] Error on attempt ${attempt}:`, {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
errorType: error instanceof Error ? error.name : typeof error,
|
||||
stack: error instanceof Error ? error.stack : undefined,
|
||||
});
|
||||
|
||||
// If this was the last attempt, throw the error
|
||||
|
|
Loading…
Reference in New Issue