diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 5a169c83..0ad86eb1 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -9,6 +9,52 @@ const API_URL = process.env.NEXT_PUBLIC_BACKEND_URL || ''; const nonRunningAgentRuns = new Set(); // Map to keep track of active EventSource streams const activeStreams = new Map(); +// Map to keep track of safety timeouts +const safetyTimeouts = new Map(); + +/** + * Helper function to safely cleanup EventSource connections + * This ensures consistent cleanup and prevents memory leaks + */ +const cleanupEventSource = (agentRunId: string, reason?: string): void => { + const stream = activeStreams.get(agentRunId); + if (stream) { + if (reason) { + console.log(`[STREAM] Cleaning up EventSource for ${agentRunId}: ${reason}`); + } + + // Close the connection + if (stream.readyState !== EventSource.CLOSED) { + stream.close(); + } + + // Remove from active streams + activeStreams.delete(agentRunId); + } + + // Clear any associated safety timeout + const timeout = safetyTimeouts.get(agentRunId); + if (timeout) { + clearTimeout(timeout); + safetyTimeouts.delete(agentRunId); + } +}; + +/** + * Failsafe cleanup function to prevent memory leaks + * Should be called periodically or during app teardown + */ +const cleanupAllEventSources = (reason = 'batch cleanup'): void => { + console.log(`[STREAM] Running batch cleanup: ${activeStreams.size} active streams`); + + const streamIds = Array.from(activeStreams.keys()); + streamIds.forEach(agentRunId => { + cleanupEventSource(agentRunId, reason); + }); +}; + +// Export cleanup function for external use +export { cleanupAllEventSources }; // Custom error for billing issues export class BillingError extends Error { @@ -931,8 +977,7 @@ export const streamAgent = ( const existingStream = activeStreams.get(agentRunId); if (existingStream) { - existingStream.close(); - activeStreams.delete(agentRunId); + cleanupEventSource(agentRunId, 'replacing existing stream'); } try { @@ -982,7 +1027,19 @@ export const streamAgent = ( activeStreams.set(agentRunId, eventSource); + // Safety timeout to prevent indefinite connections (30 minutes) + const safetyTimeout = setTimeout(() => { + console.warn(`[STREAM] Safety timeout reached for ${agentRunId}, cleaning up`); + cleanupEventSource(agentRunId, 'safety timeout'); + callbacks.onError('Connection timeout - stream has been running too long'); + callbacks.onClose(); + }, 30 * 60 * 1000); // 30 minutes + + // Store the timeout ID for cleanup + safetyTimeouts.set(agentRunId, safetyTimeout as unknown as number); + eventSource.onopen = () => { + console.log(`[STREAM] EventSource opened for ${agentRunId}`); }; eventSource.onmessage = (event) => { @@ -1023,8 +1080,7 @@ export const streamAgent = ( callbacks.onError('Agent run not found in active runs'); // Clean up - eventSource.close(); - activeStreams.delete(agentRunId); + cleanupEventSource(agentRunId, 'agent run not found'); callbacks.onClose(); return; @@ -1045,8 +1101,7 @@ export const streamAgent = ( callbacks.onMessage(rawData); // Clean up - eventSource.close(); - activeStreams.delete(agentRunId); + cleanupEventSource(agentRunId, 'agent run completed'); callbacks.onClose(); return; @@ -1071,13 +1126,14 @@ export const streamAgent = ( }; eventSource.onerror = (event) => { + console.error(`[STREAM] EventSource error for ${agentRunId}:`, event); + // Check if the agent is still running getAgentStatus(agentRunId) .then((status) => { if (status.status !== 'running') { nonRunningAgentRuns.add(agentRunId); - eventSource.close(); - activeStreams.delete(agentRunId); + cleanupEventSource(agentRunId, 'agent not running'); callbacks.onClose(); } else { // Let the browser handle reconnection for non-fatal errors @@ -1098,13 +1154,16 @@ export const streamAgent = ( if (isNotFoundErr) { nonRunningAgentRuns.add(agentRunId); - eventSource.close(); - activeStreams.delete(agentRunId); + cleanupEventSource(agentRunId, 'agent not found'); + callbacks.onClose(); + } else { + // For other errors, still clean up the stream to prevent memory leaks + // but don't add to nonRunningAgentRuns as it might be a temporary network issue + console.warn(`[STREAM] Cleaning up stream for ${agentRunId} due to persistent error`); + cleanupEventSource(agentRunId, 'persistent error'); + callbacks.onError(errMsg); callbacks.onClose(); } - - // For other errors, notify but don't close the stream - callbacks.onError(errMsg); }); }; }; @@ -1114,11 +1173,7 @@ export const streamAgent = ( // Return a cleanup function return () => { - const stream = activeStreams.get(agentRunId); - if (stream) { - stream.close(); - activeStreams.delete(agentRunId); - } + cleanupEventSource(agentRunId, 'manual cleanup'); }; } catch (error) { console.error(`[STREAM] Error setting up stream for ${agentRunId}:`, error);