From b7eaccbe318fc37b5d9818c4a79d6e26e861c078 Mon Sep 17 00:00:00 2001 From: Krishav Raj Singh Date: Fri, 5 Sep 2025 10:55:21 +0530 Subject: [PATCH] fix stream --- .../src/components/thread/ThreadComponent.tsx | 17 ++++ frontend/src/hooks/useAgentStream.ts | 82 +++++++++++-------- 2 files changed, 67 insertions(+), 32 deletions(-) diff --git a/frontend/src/components/thread/ThreadComponent.tsx b/frontend/src/components/thread/ThreadComponent.tsx index ad2d2255..97f42d6d 100644 --- a/frontend/src/components/thread/ThreadComponent.tsx +++ b/frontend/src/components/thread/ThreadComponent.tsx @@ -98,6 +98,7 @@ export function ThreadComponent({ projectId, threadId, compact = false, configur const latestMessageRef = useRef(null); const initialLayoutAppliedRef = useRef(false); const scrollContainerRef = useRef(null); + const lastStreamStartedRef = useRef(null); // Track last runId we started streaming for // Sidebar const { state: leftSidebarState, setOpen: setLeftSidebarOpen } = useSidebar(); @@ -637,9 +638,16 @@ export function ThreadComponent({ projectId, threadId, compact = false, configur ]); useEffect(() => { + // Prevent duplicate streaming calls for the same runId + if (agentRunId && lastStreamStartedRef.current === agentRunId) { + return; + } + // Start streaming if user initiated a run (don't wait for initialLoadCompleted for first-time users) if (agentRunId && agentRunId !== currentHookRunId && userInitiatedRun) { + console.log(`[ThreadComponent] Starting user-initiated stream for runId: ${agentRunId}`); startStreaming(agentRunId); + lastStreamStartedRef.current = agentRunId; // Track that we started this runId setUserInitiatedRun(false); // Reset flag after starting return; } @@ -652,7 +660,9 @@ export function ThreadComponent({ projectId, threadId, compact = false, configur !userInitiatedRun && agentStatus === 'running' ) { + console.log(`[ThreadComponent] Starting auto stream for runId: ${agentRunId}`); startStreaming(agentRunId); + lastStreamStartedRef.current = agentRunId; // Track that we started this runId } }, [ agentRunId, @@ -673,9 +683,16 @@ export function ThreadComponent({ projectId, threadId, compact = false, configur ) { setAgentStatus('idle'); setAgentRunId(null); + // Reset the stream tracking ref when stream completes + lastStreamStartedRef.current = null; } }, [streamHookStatus, agentStatus, setAgentStatus, setAgentRunId]); + // Reset stream tracking ref when threadId changes + useEffect(() => { + lastStreamStartedRef.current = null; + }, [threadId]); + // SEO title update useEffect(() => { if (projectName) { diff --git a/frontend/src/hooks/useAgentStream.ts b/frontend/src/hooks/useAgentStream.ts index b194b266..fe40c1b5 100644 --- a/frontend/src/hooks/useAgentStream.ts +++ b/frontend/src/hooks/useAgentStream.ts @@ -626,51 +626,54 @@ export function useAgentStream( console.log(`[useAgentStream] Starting stream for run ID: ${runId}`); - // Clean up any previous stream - if (streamCleanupRef.current) { - console.log(`[useAgentStream] Cleaning up previous stream`); - streamCleanupRef.current(); - streamCleanupRef.current = null; - } - - // Reset state before starting - setTextContent([]); - setToolCall(null); - setError(null); - updateStatus('connecting'); - setAgentRunId(runId); - currentRunIdRef.current = runId; // Set the ref immediately + // Store previous stream cleanup for potential restoration + const previousCleanup = streamCleanupRef.current; + const previousRunId = currentRunIdRef.current; try { - // *** Crucial check: Verify agent is running BEFORE connecting *** + // *** Crucial check: Verify agent is running BEFORE cleaning up previous stream *** console.log(`[useAgentStream] Checking status for run ID: ${runId}`); const agentStatus = await getAgentStatus(runId); if (!isMountedRef.current) return; // Check mount status after async call - // Check if this is still the current run ID we're trying to start - if (currentRunIdRef.current !== runId) { - console.log( - `[useAgentStream] Run ID changed during status check, aborting stream for ${runId}`, - ); - return; - } - if (agentStatus.status !== 'running') { // Expected when opening an old conversation; don't surface as error/toast console.info( - `[useAgentStream] Skip streaming for inactive run ${runId} (status: ${agentStatus.status}).`, + `[useAgentStream] Stream not started for ${runId}: Agent run ${runId} is not running (status: ${agentStatus.status})`, ); - if (currentRunIdRef.current === runId) { - const final = - agentStatus.status === 'completed' || - agentStatus.status === 'stopped' - ? mapAgentStatus(agentStatus.status) - : 'agent_not_running'; + + // DON'T clean up the previous stream if this new one can't start + // Just finalize with the appropriate status but keep previous stream if it was working + const final = + agentStatus.status === 'completed' || + agentStatus.status === 'stopped' + ? mapAgentStatus(agentStatus.status) + : 'agent_not_running'; + + // Only finalize if we don't have a working previous stream + if (!previousRunId || previousRunId === runId) { finalizeStream(final, runId); + } else { + console.log(`[useAgentStream] Keeping previous stream ${previousRunId} active since new stream ${runId} can't start`); } return; } + // New agent is running, now it's safe to clean up previous stream + if (previousCleanup && previousRunId !== runId) { + console.log(`[useAgentStream] Cleaning up previous stream ${previousRunId} to start new stream ${runId}`); + previousCleanup(); + streamCleanupRef.current = null; + } + + // Reset state for the new stream + setTextContent([]); + setToolCall(null); + setError(null); + updateStatus('connecting'); + setAgentRunId(runId); + currentRunIdRef.current = runId; // Set the ref immediately + console.log( `[useAgentStream] Agent run ${runId} is running, creating stream`, ); @@ -743,13 +746,28 @@ export function useAgentStream( console.info( `[useAgentStream] Stream not started for ${runId}: ${errorMessage}`, ); - finalizeStream('agent_not_running', runId); + + // Similar logic - don't finalize if we have a working previous stream + if (!previousRunId || previousRunId === runId) { + finalizeStream('agent_not_running', runId); + } else { + console.log(`[useAgentStream] Keeping previous stream ${previousRunId} active since new stream ${runId} failed to start`); + } } else { console.error( `[useAgentStream] Error initiating stream for ${runId}: ${errorMessage}`, ); setError(errorMessage); - finalizeStream('error', runId); + + // For unexpected errors, still preserve previous stream if possible + if (!previousRunId || previousRunId === runId) { + finalizeStream('error', runId); + } else { + console.log(`[useAgentStream] Keeping previous stream ${previousRunId} active despite error starting new stream ${runId}`); + // Reset current run ID back to previous to maintain stream continuity + currentRunIdRef.current = previousRunId; + setAgentRunId(previousRunId); + } } } },