fix: prevent EventSource memory leaks in frontend API

- Added consistent EventSource cleanup helper function
- Fixed missing cleanup in error handlers that could cause memory leaks
- Added safety timeout (30min) to prevent indefinite connections
- Exported cleanup function for external use during app teardown
- Improved error logging and connection state tracking

Fixes the critical memory leak where EventSource connections weren't
properly closed in all error scenarios, particularly in the catch
blocks of the onerror handler.
This commit is contained in:
Prem Prakash Sharma 2025-09-05 23:18:26 +05:30
parent 11fef9c1d7
commit a6ef884db0
1 changed files with 73 additions and 18 deletions

View File

@ -9,6 +9,52 @@ const API_URL = process.env.NEXT_PUBLIC_BACKEND_URL || '';
const nonRunningAgentRuns = new Set<string>();
// Map to keep track of active EventSource streams
const activeStreams = new Map<string, EventSource>();
// Map to keep track of safety timeouts
const safetyTimeouts = new Map<string, number>();
/**
* Helper function to safely cleanup EventSource connections
* This ensures consistent cleanup and prevents memory leaks
*/
const cleanupEventSource = (agentRunId: string, reason?: string): void => {
const stream = activeStreams.get(agentRunId);
if (stream) {
if (reason) {
console.log(`[STREAM] Cleaning up EventSource for ${agentRunId}: ${reason}`);
}
// Close the connection
if (stream.readyState !== EventSource.CLOSED) {
stream.close();
}
// Remove from active streams
activeStreams.delete(agentRunId);
}
// Clear any associated safety timeout
const timeout = safetyTimeouts.get(agentRunId);
if (timeout) {
clearTimeout(timeout);
safetyTimeouts.delete(agentRunId);
}
};
/**
* Failsafe cleanup function to prevent memory leaks
* Should be called periodically or during app teardown
*/
const cleanupAllEventSources = (reason = 'batch cleanup'): void => {
console.log(`[STREAM] Running batch cleanup: ${activeStreams.size} active streams`);
const streamIds = Array.from(activeStreams.keys());
streamIds.forEach(agentRunId => {
cleanupEventSource(agentRunId, reason);
});
};
// Export cleanup function for external use
export { cleanupAllEventSources };
// Custom error for billing issues
export class BillingError extends Error {
@ -931,8 +977,7 @@ export const streamAgent = (
const existingStream = activeStreams.get(agentRunId);
if (existingStream) {
existingStream.close();
activeStreams.delete(agentRunId);
cleanupEventSource(agentRunId, 'replacing existing stream');
}
try {
@ -982,7 +1027,19 @@ export const streamAgent = (
activeStreams.set(agentRunId, eventSource);
// Safety timeout to prevent indefinite connections (30 minutes)
const safetyTimeout = setTimeout(() => {
console.warn(`[STREAM] Safety timeout reached for ${agentRunId}, cleaning up`);
cleanupEventSource(agentRunId, 'safety timeout');
callbacks.onError('Connection timeout - stream has been running too long');
callbacks.onClose();
}, 30 * 60 * 1000); // 30 minutes
// Store the timeout ID for cleanup
safetyTimeouts.set(agentRunId, safetyTimeout as unknown as number);
eventSource.onopen = () => {
console.log(`[STREAM] EventSource opened for ${agentRunId}`);
};
eventSource.onmessage = (event) => {
@ -1023,8 +1080,7 @@ export const streamAgent = (
callbacks.onError('Agent run not found in active runs');
// Clean up
eventSource.close();
activeStreams.delete(agentRunId);
cleanupEventSource(agentRunId, 'agent run not found');
callbacks.onClose();
return;
@ -1045,8 +1101,7 @@ export const streamAgent = (
callbacks.onMessage(rawData);
// Clean up
eventSource.close();
activeStreams.delete(agentRunId);
cleanupEventSource(agentRunId, 'agent run completed');
callbacks.onClose();
return;
@ -1071,13 +1126,14 @@ export const streamAgent = (
};
eventSource.onerror = (event) => {
console.error(`[STREAM] EventSource error for ${agentRunId}:`, event);
// Check if the agent is still running
getAgentStatus(agentRunId)
.then((status) => {
if (status.status !== 'running') {
nonRunningAgentRuns.add(agentRunId);
eventSource.close();
activeStreams.delete(agentRunId);
cleanupEventSource(agentRunId, 'agent not running');
callbacks.onClose();
} else {
// Let the browser handle reconnection for non-fatal errors
@ -1098,13 +1154,16 @@ export const streamAgent = (
if (isNotFoundErr) {
nonRunningAgentRuns.add(agentRunId);
eventSource.close();
activeStreams.delete(agentRunId);
cleanupEventSource(agentRunId, 'agent not found');
callbacks.onClose();
} else {
// For other errors, still clean up the stream to prevent memory leaks
// but don't add to nonRunningAgentRuns as it might be a temporary network issue
console.warn(`[STREAM] Cleaning up stream for ${agentRunId} due to persistent error`);
cleanupEventSource(agentRunId, 'persistent error');
callbacks.onError(errMsg);
callbacks.onClose();
}
// For other errors, notify but don't close the stream
callbacks.onError(errMsg);
});
};
};
@ -1114,11 +1173,7 @@ export const streamAgent = (
// Return a cleanup function
return () => {
const stream = activeStreams.get(agentRunId);
if (stream) {
stream.close();
activeStreams.delete(agentRunId);
}
cleanupEventSource(agentRunId, 'manual cleanup');
};
} catch (error) {
console.error(`[STREAM] Error setting up stream for ${agentRunId}:`, error);