diff --git a/frontend/src/app/dashboard/agents/[threadId]/page.tsx b/frontend/src/app/dashboard/agents/[threadId]/page.tsx index cdd76c4b..4d5ed3c8 100644 --- a/frontend/src/app/dashboard/agents/[threadId]/page.tsx +++ b/frontend/src/app/dashboard/agents/[threadId]/page.tsx @@ -7,7 +7,7 @@ import { Button } from '@/components/ui/button'; import { ArrowDown, CheckCircle, CircleDashed, AlertTriangle, Info } from 'lucide-react'; -import { addUserMessage, getMessages, startAgent, stopAgent, getAgentStatus, streamAgent, getAgentRuns, getProject, getThread, updateProject, Project, Message as BaseApiMessageType } from '@/lib/api'; +import { addUserMessage, getMessages, startAgent, stopAgent, getAgentRuns, getProject, getThread, updateProject, Project, Message as BaseApiMessageType } from '@/lib/api'; import { toast } from 'sonner'; import { Skeleton } from "@/components/ui/skeleton"; import { ChatInput } from '@/components/thread/chat-input'; @@ -15,6 +15,7 @@ import { FileViewerModal } from '@/components/thread/file-viewer-modal'; import { SiteHeader } from "@/components/thread/thread-site-header" import { ToolCallSidePanel, SidePanelContent, ToolCallData } from "@/components/thread/tool-call-side-panel"; import { useSidebar } from "@/components/ui/sidebar"; +import { useAgentStream, AgentStreamStatus } from '@/hooks/useAgentStream'; import { UnifiedMessage, ParsedContent, ParsedMetadata, ThreadParams } from '@/components/thread/types'; import { getToolIcon, extractPrimaryParam, safeJsonParse } from '@/components/thread/utils'; @@ -40,14 +41,10 @@ export default function ThreadPage({ params }: { params: Promise } const [isSending, setIsSending] = useState(false); const [error, setError] = useState(null); const [agentRunId, setAgentRunId] = useState(null); - const [agentStatus, setAgentStatus] = useState<'idle' | 'running'>('idle'); - const [isStreaming, setIsStreaming] = useState(false); - const [streamingTextContent, setStreamingTextContent] = useState(''); - const [streamingToolCall, setStreamingToolCall] = useState(null); + const [agentStatus, setAgentStatus] = useState<'idle' | 'running' | 'connecting' | 'error'>('idle'); const [isSidePanelOpen, setIsSidePanelOpen] = useState(false); const [sidePanelContent, setSidePanelContent] = useState(null); - const streamCleanupRef = useRef<(() => void) | null>(null); const messagesEndRef = useRef(null); const messagesContainerRef = useRef(null); const latestMessageRef = useRef(null); @@ -63,7 +60,6 @@ export default function ThreadPage({ params }: { params: Promise } const initialLoadCompleted = useRef(false); const messagesLoadedRef = useRef(false); const agentRunsCheckedRef = useRef(false); - const streamInitializedRef = useRef(false); const { state: leftSidebarState, setOpen: setLeftSidebarOpen } = useSidebar(); const initialLayoutAppliedRef = useRef(false); @@ -106,347 +102,72 @@ export default function ThreadPage({ params }: { params: Promise } return () => window.removeEventListener('keydown', handleKeyDown); }, [toggleSidePanel]); - const handleStreamAgent = useCallback(async (runId: string) => { - if (!runId) return; - - // Set the agent run ID and status first to ensure UI shows running state - setAgentRunId(runId); - setAgentStatus('running'); - - // Check if we already have an active stream for this run - if (streamCleanupRef.current && streamInitializedRef.current && agentRunId === runId) { - console.log(`[PAGE] Already streaming agent run ${runId}, not reconnecting`); - return; - } - - // Clean up any existing stream - if (streamCleanupRef.current) { - console.log(`[PAGE] Cleaning up existing stream before creating new one for ${runId}`); - streamCleanupRef.current(); - streamCleanupRef.current = null; - } - - // Mark stream as initializing and show streaming state in UI - console.log(`[PAGE] Setting up stream for agent run ${runId}`); - streamInitializedRef.current = true; - setIsStreaming(true); - setStreamingTextContent(''); - setStreamingToolCall(null); - - // Create the stream and get cleanup function - const cleanup = streamAgent(runId, { - onMessage: async (rawData: string) => { - try { - (window as any).lastStreamMessage = Date.now(); - let processedData = rawData; - if (processedData.startsWith('data: ')) { - processedData = processedData.substring(6).trim(); - } - if (!processedData) return; - - // Handle special non-JSON completion message - if (processedData.includes('"type":"status"') && processedData.includes('"status":"completed"')) { - console.log('[PAGE] Received completion status message'); - setIsStreaming(false); - setAgentStatus('idle'); - setAgentRunId(null); - setStreamingTextContent(''); - setStreamingToolCall(null); - streamInitializedRef.current = false; - return; - } - - const message: UnifiedMessage = safeJsonParse(processedData, null); - if (!message) { - console.warn('[PAGE] Failed to parse streamed message:', processedData); - return; - } - - console.log(`[PAGE] Received Streamed Message (Type: ${message.type}, ID: ${message.message_id}):`, message); - - const parsedContent = safeJsonParse(message.content, {}); - const parsedMetadata = safeJsonParse(message.metadata, {}); - - if (message.type === 'assistant' && parsedMetadata.stream_status === 'chunk' && parsedContent.content) { - setStreamingTextContent(prev => prev + parsedContent.content); - } - else if (message.type === 'assistant' && parsedMetadata.stream_status === 'complete') { - setStreamingTextContent(''); - setStreamingToolCall(null); - - // Only add/replace message if it has a message_id - if (message.message_id) { - setMessages(prev => { - // Check if this message already exists in the array - const messageExists = prev.some(m => m.message_id === message.message_id); - if (messageExists) { - // Replace the existing message - return prev.map(m => m.message_id === message.message_id ? message : m); - } else { - // Add as a new message - return [...prev, message]; - } - }); - } - } - else if (message.type === 'tool') { - setStreamingToolCall(null); - - // Only add/replace if it has a message_id - if (message.message_id) { - setMessages(prev => { - // Check if this message already exists - const messageExists = prev.some(m => m.message_id === message.message_id); - if (messageExists) { - // Replace the existing message - return prev.map(m => m.message_id === message.message_id ? message : m); - } else { - // Add as a new message - return [...prev, message]; - } - }); - } - } - else if (message.type === 'status') { - switch(parsedContent.status_type) { - case 'tool_started': - setStreamingToolCall({ - role: 'assistant', - status_type: 'tool_started', - name: parsedContent.function_name, - arguments: parsedContent.arguments, - xml_tag_name: parsedContent.xml_tag_name, - tool_index: parsedContent.tool_index - }); - if (isSidePanelOpen) { - // Convert to ToolCallData format - const toolCallData: ToolCallData = { - id: message.message_id || undefined, - name: parsedContent.function_name || parsedContent.xml_tag_name || 'Unknown Tool', - arguments: parsedContent.arguments || '{}', - index: parsedContent.tool_index - }; - setSidePanelContent(toolCallData); - } - break; - case 'tool_completed': - case 'tool_failed': - case 'tool_error': - if (streamingToolCall?.tool_index === parsedContent.tool_index) { - setStreamingToolCall(null); - } - break; - case 'finish': - console.log('[PAGE] Received finish status:', parsedContent.finish_reason); - if (parsedContent.finish_reason === 'xml_tool_limit_reached') { - // toast.info('Agent stopped due to tool limit.'); - } - break; - case 'thread_run_start': - // Ensure we're showing running state for new runs - setAgentStatus('running'); - break; - case 'assistant_response_start': - break; - case 'thread_run_end': - console.log('[PAGE] Received thread run end status.'); - setIsStreaming(false); - setAgentStatus('idle'); - setAgentRunId(null); - setStreamingTextContent(''); - setStreamingToolCall(null); - streamInitializedRef.current = false; - break; - case 'error': - console.error('[PAGE] Received error status:', parsedContent.message); - toast.error(`Agent Error: ${parsedContent.message}`); - setIsStreaming(false); - setAgentStatus('idle'); - setAgentRunId(null); - setStreamingTextContent(''); - setStreamingToolCall(null); - streamInitializedRef.current = false; - break; - default: - console.warn('[PAGE] Unhandled status type:', parsedContent.status_type); - } - } - else if (message.type === 'user' || message.type === 'assistant' || message.type === 'system') { - // Handle other message types (user, assistant if not streaming, system) - if (message.message_id) { - setMessages(prev => { - // Check if message already exists - const messageExists = prev.some(m => m.message_id === message.message_id); - if (messageExists) { - // Replace existing message - return prev.map(m => m.message_id === message.message_id ? message : m); - } else { - // Add new message - return [...prev, message]; - } - }); - } else if (!parsedMetadata.stream_status) { - // Only add messages without IDs if they're not temporary streaming chunks - setMessages(prev => [...prev, message]); - } - } - else { - console.warn('[PAGE] Unhandled message type:', message.type); - } - - } catch (error) { - console.error('[PAGE] Error processing streamed message:', error, rawData); - } - }, - onError: (error: Error | string) => { - console.error('[PAGE] Streaming error:', error); - const errorMessage = typeof error === 'string' ? error : error.message; - - // Check if this is a "not found" error - const isNotFoundError = errorMessage.includes('not found') || - errorMessage.includes('does not exist'); - - // Only show toast for errors that aren't "not found" errors - if (!isNotFoundError) { - toast.error(errorMessage); - } - - // Reset stream state - streamCleanupRef.current = null; - - // If this is a "not found" error, reset agent state immediately - if (isNotFoundError) { - console.log(`[PAGE] Agent run ${runId} not found, resetting state immediately`); - setIsStreaming(false); - setAgentStatus('idle'); - setAgentRunId(null); - setStreamingTextContent(''); - setStreamingToolCall(null); - streamInitializedRef.current = false; - return; - } - - // For other errors, check if agent is still running - if (agentRunId === runId) { - console.log(`[PAGE] Stream error for active agent ${runId}, checking if still running...`); - - getAgentStatus(runId).then(status => { - if (status.status === 'running') { - console.log(`[PAGE] Agent ${runId} still running after stream error, will reconnect shortly`); - // Give a short delay before reconnecting - setTimeout(() => { - if (agentRunId === runId) { - streamInitializedRef.current = false; - handleStreamAgent(runId); - } - }, 2000); - } else { - // Agent is no longer running, reset state - console.log(`[PAGE] Agent ${runId} not running after stream error, resetting state`); - setIsStreaming(false); - setAgentStatus('idle'); - setAgentRunId(null); - setStreamingTextContent(''); - setStreamingToolCall(null); - streamInitializedRef.current = false; - } - }).catch(err => { - console.error(`[PAGE] Error checking agent status after stream error:`, err); - // Reset on error checking status - setIsStreaming(false); - setAgentStatus('idle'); - setAgentRunId(null); - setStreamingTextContent(''); - setStreamingToolCall(null); - streamInitializedRef.current = false; - }); - } else { - // This was not for the current agent run, just clean up - setIsStreaming(false); - streamInitializedRef.current = false; - } - }, - onClose: async () => { - console.log('[PAGE] Stream connection closed by server.'); - - // Check if agent is still running after stream close - if (agentStatus === 'running' && agentRunId === runId) { - console.log(`[PAGE] Stream closed for active agent ${runId}, checking if still running...`); - - try { - const status = await getAgentStatus(runId); - if (status.status === 'running') { - console.log(`[PAGE] Agent ${runId} still running after stream closed, will reconnect`); - // Stream closed but agent is still running - reconnect - setTimeout(() => { - if (agentRunId === runId) { - console.log(`[PAGE] Reconnecting to running agent ${runId}`); - streamInitializedRef.current = false; - handleStreamAgent(runId); - } - }, 1000); - return; - } else { - console.log(`[PAGE] Agent ${runId} not running after stream closed (${status.status}), resetting state`); - // Agent is done, reset state - setIsStreaming(false); - setAgentStatus('idle'); - setAgentRunId(null); - setStreamingTextContent(''); - setStreamingToolCall(null); - streamCleanupRef.current = null; - streamInitializedRef.current = false; - } - } catch (err) { - console.error(`[PAGE] Error checking agent status after stream close:`, err); - - // Check if this is a "not found" error - const errorMessage = err instanceof Error ? err.message : String(err); - const isNotFoundError = errorMessage.includes('not found') || - errorMessage.includes('404') || - errorMessage.includes('does not exist'); - - if (isNotFoundError) { - console.log(`[PAGE] Agent run ${runId} not found after stream close, resetting state`); - // Reset all state since the agent run doesn't exist - setIsStreaming(false); - setAgentStatus('idle'); - setAgentRunId(null); - setStreamingTextContent(''); - setStreamingToolCall(null); - streamCleanupRef.current = null; - streamInitializedRef.current = false; - return; - } - - // For other errors, only reset streaming state but maintain agent status as running - setIsStreaming(false); - // Don't set agent status to idle here - setStreamingTextContent(''); - setStreamingToolCall(null); - streamCleanupRef.current = null; - - // Try to reconnect after a short delay if agent is still marked as running - setTimeout(() => { - if (agentRunId === runId && agentStatus === 'running') { - console.log(`[PAGE] Attempting to reconnect after error checking status`); - streamInitializedRef.current = false; - handleStreamAgent(runId); - } - }, 2000); - } - } else { - // Normal cleanup for non-running agent - streamCleanupRef.current = null; - streamInitializedRef.current = false; - } + const handleNewMessageFromStream = useCallback((message: UnifiedMessage) => { + setMessages(prev => { + const messageExists = prev.some(m => m.message_id === message.message_id); + if (messageExists) { + return prev.map(m => m.message_id === message.message_id ? message : m); + } else { + return [...prev, message]; } }); - - // Store cleanup function - streamCleanupRef.current = cleanup; + }, []); - }, [threadId, agentRunId, agentStatus, isSidePanelOpen]); + const handleStreamStatusChange = useCallback((hookStatus: AgentStreamStatus) => { + console.log(`[PAGE] Hook status changed: ${hookStatus}`); + switch(hookStatus) { + case 'idle': + case 'completed': + case 'stopped': + case 'agent_not_running': + setAgentStatus('idle'); + setAgentRunId(null); + break; + case 'connecting': + setAgentStatus('connecting'); + break; + case 'streaming': + setAgentStatus('running'); + break; + case 'error': + setAgentStatus('error'); + break; + } + }, []); + + const handleStreamError = useCallback((errorMessage: string) => { + console.error(`[PAGE] Stream hook error: ${errorMessage}`); + if (!errorMessage.toLowerCase().includes('not found') && + !errorMessage.toLowerCase().includes('agent run is not running')) { + toast.error(`Stream Error: ${errorMessage}`); + } + }, []); + + const handleStreamClose = useCallback(() => { + console.log(`[PAGE] Stream hook closed with final status: ${agentStatus}`); + }, [agentStatus]); + + const { + status: streamHookStatus, + textContent: streamingTextContent, + toolCall: streamingToolCall, + error: streamError, + agentRunId: currentHookRunId, + startStreaming, + stopStreaming, + } = useAgentStream({ + onMessage: handleNewMessageFromStream, + onStatusChange: handleStreamStatusChange, + onError: handleStreamError, + onClose: handleStreamClose, + }); + + useEffect(() => { + if (agentRunId && agentRunId !== currentHookRunId) { + console.log(`[PAGE] Target agentRunId set to ${agentRunId}, initiating stream...`); + startStreaming(agentRunId); + } + }, [agentRunId, startStreaming]); useEffect(() => { let isMounted = true; @@ -500,29 +221,24 @@ export default function ThreadPage({ params }: { params: Promise } } } - // Check once for active agent run on initial load if (!agentRunsCheckedRef.current && isMounted) { try { console.log('[PAGE] Checking for active agent runs...'); const agentRuns = await getAgentRuns(threadId); - - // Mark as checked immediately to prevent duplicate checks agentRunsCheckedRef.current = true; - - // Look for running agent + const activeRun = agentRuns.find(run => run.status === 'running'); - if (activeRun) { + if (activeRun && isMounted) { console.log('[PAGE] Found active run on load:', activeRun.id); - // Don't set agentRunId here, let handleStreamAgent do it - handleStreamAgent(activeRun.id); + setAgentRunId(activeRun.id); } else { console.log('[PAGE] No active agent runs found'); - setAgentStatus('idle'); + if (isMounted) setAgentStatus('idle'); } } catch (err) { console.error('[PAGE] Error checking for active runs:', err); agentRunsCheckedRef.current = true; - setAgentStatus('idle'); + if (isMounted) setAgentStatus('idle'); } } @@ -542,68 +258,14 @@ export default function ThreadPage({ params }: { params: Promise } loadData(); - // Handle reconnection when tab visibility changes - const handleVisibilityChange = () => { - if (document.visibilityState === 'visible' && agentRunId && agentStatus === 'running') { - const lastMessage = (window as any).lastStreamMessage || 0; - const now = Date.now(); - const messageTimeout = 10000; // Reduced timeout for faster reconnection - - // If no recent messages and not already attempting to reconnect - if (now - lastMessage > messageTimeout && !streamInitializedRef.current) { - console.log('[PAGE] Tab became visible, stream appears stale. Verifying agent status...'); - - getAgentStatus(agentRunId).then(status => { - if (status.status === 'running') { - console.log('[PAGE] Agent still running after visibility change, reconnecting'); - handleStreamAgent(agentRunId); - } else { - console.log('[PAGE] Agent no longer running after visibility change'); - setAgentStatus('idle'); - setAgentRunId(null); - } - }).catch(err => { - console.error('[PAGE] Error checking agent status on visibility change:', err); - - // Check if this is a "not found" error - const errorMessage = err instanceof Error ? err.message : String(err); - const isNotFoundError = errorMessage.includes('not found') || - errorMessage.includes('404') || - errorMessage.includes('does not exist'); - - if (isNotFoundError) { - console.log(`[PAGE] Agent run ${agentRunId} not found after visibility change, resetting state`); - // Reset all state since the agent run doesn't exist - setAgentStatus('idle'); - setAgentRunId(null); - return; - } - - // For other errors, don't reset agent status - maintain as running - }); - } - } - }; - - document.addEventListener('visibilitychange', handleVisibilityChange); - return () => { isMounted = false; - document.removeEventListener('visibilitychange', handleVisibilityChange); - if (streamCleanupRef.current) { - console.log('[PAGE] Cleaning up stream on unmount'); - streamCleanupRef.current(); - streamCleanupRef.current = null; - streamInitializedRef.current = false; - } }; - }, [threadId, handleStreamAgent]); + }, [threadId]); const handleSubmitMessage = useCallback(async (message: string) => { if (!message.trim()) return; setIsSending(true); - setStreamingTextContent(''); - setStreamingToolCall(null); const optimisticUserMessage: UnifiedMessage = { message_id: `temp-${Date.now()}`, @@ -638,12 +300,8 @@ export default function ThreadPage({ params }: { params: Promise } const agentResult = results[1].value; setMessages(prev => prev.filter(m => m.message_id !== optimisticUserMessage.message_id)); - - // Reset stream for new agent run - streamInitializedRef.current = false; - - // Connect to the new agent run - handleStreamAgent(agentResult.agent_run_id); + + setAgentRunId(agentResult.agent_run_id); } catch (err) { console.error('Error sending message or starting agent:', err); @@ -652,39 +310,13 @@ export default function ThreadPage({ params }: { params: Promise } } finally { setIsSending(false); } - }, [threadId, handleStreamAgent]); + }, [threadId]); const handleStopAgent = useCallback(async () => { - if (!agentRunId) return; - console.log(`[PAGE] Stopping agent run: ${agentRunId}`); - - // Set UI state immediately - setIsStreaming(false); + console.log(`[PAGE] Requesting agent stop via hook.`); setAgentStatus('idle'); - - // Store agent run ID for the API call - const runIdToStop = agentRunId; - - // Reset all state - setAgentRunId(null); - setStreamingTextContent(''); - setStreamingToolCall(null); - - // Clean up stream - if (streamCleanupRef.current) { - streamCleanupRef.current(); - streamCleanupRef.current = null; - streamInitializedRef.current = false; - } - - try { - await stopAgent(runIdToStop); - toast.success('Agent stop request sent.'); - } catch (err) { - console.error('[PAGE] Error stopping agent:', err); - toast.error(err instanceof Error ? err.message : 'Failed to stop agent'); - } - }, [agentRunId]); + await stopStreaming(); + }, [stopStreaming]); const handleScroll = () => { if (!messagesContainerRef.current) return; @@ -721,85 +353,9 @@ export default function ThreadPage({ params }: { params: Promise } setUserHasScrolled(false); }; - // Periodically check agent run status if not streaming useEffect(() => { - let checkInterval: NodeJS.Timeout | null = null; - - // If agent should be running but we're not streaming - if (agentRunId && agentStatus === 'running' && !isStreaming) { - console.warn('[PAGE] Agent running but not streaming. Setting up status check interval.'); - - // Track consecutive completed responses - let consecutiveCompleted = 0; - const MAX_CONSECUTIVE_COMPLETED = 2; - - checkInterval = setInterval(async () => { - try { - console.log(`[PAGE] Checking status of agent run ${agentRunId}`); - const status = await getAgentStatus(agentRunId); - - // Check if the agent is still running - if (status.status === 'running') { - console.log('[PAGE] Agent confirmed running, reconnecting to stream'); - consecutiveCompleted = 0; // Reset counter - // Force stream reconnection since agent is still running - if (!streamInitializedRef.current) { - handleStreamAgent(agentRunId); - } - } else { - console.log(`[PAGE] Agent no longer running (status: ${status.status}), incrementing completed counter`); - // Agent is not running anymore, increment counter - consecutiveCompleted++; - - if (consecutiveCompleted >= MAX_CONSECUTIVE_COMPLETED) { - console.log(`[PAGE] Agent confirmed ${status.status} after ${consecutiveCompleted} checks, resetting state`); - // Reset all state after confirming non-running status multiple times - setAgentRunId(null); - setAgentStatus('idle'); - if (checkInterval) { - clearInterval(checkInterval); - checkInterval = null; - } - } - } - } catch (err) { - console.error('[PAGE] Error checking agent status:', err); - - // Check if this is a "not found" error - const errorMessage = err instanceof Error ? err.message : String(err); - const isNotFoundError = errorMessage.includes('not found') || - errorMessage.includes('404') || - errorMessage.includes('does not exist'); - - if (isNotFoundError) { - console.log(`[PAGE] Agent run ${agentRunId} not found during status check, resetting state`); - // Reset all state since the agent run doesn't exist - setAgentRunId(null); - setAgentStatus('idle'); - if (checkInterval) { - clearInterval(checkInterval); - checkInterval = null; - } - return; - } - - // For other errors, don't reset agent state to avoid flickering - // Just let next check try again - } - }, 8000); // Check every 8 seconds - } - - return () => { - if (checkInterval) { - clearInterval(checkInterval); - } - }; - }, [agentRunId, agentStatus, isStreaming, handleStreamAgent]); - - // Debug logging - useEffect(() => { - console.log(`[PAGE] 🔄 AgentStatus: ${agentStatus}, isStreaming: ${isStreaming}, agentRunId: ${agentRunId || 'none'}`); - }, [agentStatus, isStreaming, agentRunId]); + console.log(`[PAGE] 🔄 Page AgentStatus: ${agentStatus}, Hook Status: ${streamHookStatus}, Target RunID: ${agentRunId || 'none'}, Hook RunID: ${currentHookRunId || 'none'}`); + }, [agentStatus, streamHookStatus, agentRunId, currentHookRunId]); const handleOpenFileViewer = useCallback(() => setFileViewerOpen(true), []); @@ -966,7 +522,7 @@ export default function ThreadPage({ params }: { params: Promise } onScroll={handleScroll} >
- {messages.length === 0 && !streamingTextContent && !streamingToolCall ? ( + {messages.length === 0 && !streamingTextContent && !streamingToolCall && agentStatus === 'idle' ? (
Send a message to start.
@@ -1164,7 +720,7 @@ export default function ThreadPage({ params }: { params: Promise } ); } })} - {(streamingTextContent || streamingToolCall) && ( + {(streamHookStatus === 'streaming' || streamHookStatus === 'connecting') && (
@@ -1177,7 +733,7 @@ export default function ThreadPage({ params }: { params: Promise } {streamingTextContent} )} - {isStreaming && } + {(streamHookStatus === 'streaming' || streamHookStatus === 'connecting') && } {streamingToolCall && (
@@ -1203,7 +759,7 @@ export default function ThreadPage({ params }: { params: Promise }
)} {agentStatus === 'running' && !streamingTextContent && !streamingToolCall && messages.length > 0 && messages[messages.length-1].type === 'user' && ( -
+
Suna @@ -1239,8 +795,8 @@ export default function ThreadPage({ params }: { params: Promise } onSubmit={handleSubmitMessage} placeholder="Ask Suna anything..." loading={isSending} - disabled={isSending || agentStatus === 'running'} - isAgentRunning={agentStatus === 'running'} + disabled={isSending || agentStatus === 'running' || agentStatus === 'connecting'} + isAgentRunning={agentStatus === 'running' || agentStatus === 'connecting'} onStopAgent={handleStopAgent} autoFocus={!isLoading} onFileBrowse={handleOpenFileViewer} diff --git a/frontend/src/hooks/useAgentStream.ts b/frontend/src/hooks/useAgentStream.ts new file mode 100644 index 00000000..b7299b6e --- /dev/null +++ b/frontend/src/hooks/useAgentStream.ts @@ -0,0 +1,427 @@ +import { useState, useEffect, useRef, useCallback } from 'react'; +import { + streamAgent, + getAgentStatus, + stopAgent, + AgentRun, + Message as ApiMessageType +} from '@/lib/api'; +import { toast } from 'sonner'; +import { UnifiedMessage, ParsedContent, ParsedMetadata } from '@/components/thread/types'; +import { safeJsonParse } from '@/components/thread/utils'; + +// Define the possible statuses for the stream hook +export type AgentStreamStatus = + | 'idle' // No active stream or agent run + | 'connecting' // Verifying agent status and initiating stream + | 'streaming' // Actively receiving messages + | 'completed' // Stream finished successfully, agent run completed + | 'stopped' // Stream stopped by user action + | 'error' // An error occurred during streaming or connection + | 'agent_not_running'; // Agent run provided was not in a running state + +// Define the structure returned by the hook +export interface UseAgentStreamResult { + status: AgentStreamStatus; + textContent: string; + toolCall: ParsedContent | null; + error: string | null; + agentRunId: string | null; // Expose the currently managed agentRunId + startStreaming: (runId: string) => void; + stopStreaming: () => Promise; +} + +// Define the callbacks the hook consumer can provide +export interface AgentStreamCallbacks { + onMessage: (message: UnifiedMessage) => void; // Callback for complete messages + onStatusChange?: (status: AgentStreamStatus) => void; // Optional: Notify on internal status changes + onError?: (error: string) => void; // Optional: Notify on errors + onClose?: (finalStatus: AgentStreamStatus) => void; // Optional: Notify when streaming definitively ends +} + +export function useAgentStream(callbacks: AgentStreamCallbacks): UseAgentStreamResult { + const [agentRunId, setAgentRunId] = useState(null); + const [status, setStatus] = useState('idle'); + const [textContent, setTextContent] = useState(''); + const [toolCall, setToolCall] = useState(null); + const [error, setError] = useState(null); + + const streamCleanupRef = useRef<(() => void) | null>(null); + const isMountedRef = useRef(true); + const currentRunIdRef = useRef(null); // Ref to track the run ID being processed + + // Internal function to update status and notify consumer + const updateStatus = useCallback((newStatus: AgentStreamStatus) => { + if (isMountedRef.current) { + setStatus(newStatus); + callbacks.onStatusChange?.(newStatus); + if (newStatus === 'error' && error) { + callbacks.onError?.(error); + } + if (['completed', 'stopped', 'error', 'agent_not_running'].includes(newStatus)) { + callbacks.onClose?.(newStatus); + } + } + }, [callbacks, error]); // Include error dependency + + // Function to handle finalization of a stream (completion, stop, error) + const finalizeStream = useCallback((finalStatus: AgentStreamStatus, runId: string | null = agentRunId) => { + if (!isMountedRef.current) return; + + console.log(`[useAgentStream] Finalizing stream for ${runId} with status: ${finalStatus}`); + + if (streamCleanupRef.current) { + streamCleanupRef.current(); + streamCleanupRef.current = null; + } + + // Reset streaming-specific state + setTextContent(''); + setToolCall(null); + + // Update status and clear run ID + updateStatus(finalStatus); + setAgentRunId(null); + currentRunIdRef.current = null; + + // If the run was stopped or completed, try to get final status to update nonRunning set + if (runId && (finalStatus === 'completed' || finalStatus === 'stopped' || finalStatus === 'agent_not_running')) { + getAgentStatus(runId).catch(err => { + console.log(`[useAgentStream] Post-finalization status check for ${runId} failed (this might be expected if not found): ${err.message}`); + }); + } + }, [agentRunId, updateStatus]); + + // --- Stream Callback Handlers --- + + const handleStreamMessage = useCallback((rawData: string) => { + if (!isMountedRef.current) return; + (window as any).lastStreamMessage = Date.now(); // Keep track of last message time + + let processedData = rawData; + if (processedData.startsWith('data: ')) { + processedData = processedData.substring(6).trim(); + } + if (!processedData) return; + + // --- Early exit for non-JSON completion messages --- + if (processedData.includes('"type":"status"') && processedData.includes('"status":"completed"')) { + console.log('[useAgentStream] Received final completion status message'); + finalizeStream('completed', currentRunIdRef.current); + return; + } + if (processedData.includes('Run data not available for streaming') || processedData.includes('Stream ended with status: completed')) { + console.log(`[useAgentStream] Detected final completion message: "${processedData}", finalizing.`); + finalizeStream('completed', currentRunIdRef.current); + return; + } + + // --- Process JSON messages --- + const message: UnifiedMessage = safeJsonParse(processedData, null); + if (!message) { + console.warn('[useAgentStream] Failed to parse streamed message:', processedData); + return; + } + + const parsedContent = safeJsonParse(message.content, {}); + const parsedMetadata = safeJsonParse(message.metadata, {}); + + // Update status to streaming if we receive a valid message + if (status !== 'streaming') updateStatus('streaming'); + + switch (message.type) { + case 'assistant': + if (parsedMetadata.stream_status === 'chunk' && parsedContent.content) { + setTextContent(prev => prev + parsedContent.content); + } else if (parsedMetadata.stream_status === 'complete') { + setTextContent(''); + setToolCall(null); + if (message.message_id) callbacks.onMessage(message); + } else if (!parsedMetadata.stream_status) { + // Handle non-chunked assistant messages if needed + if (message.message_id) callbacks.onMessage(message); + } + break; + case 'tool': + setToolCall(null); // Clear any streaming tool call + if (message.message_id) callbacks.onMessage(message); + break; + case 'status': + switch (parsedContent.status_type) { + case 'tool_started': + setToolCall({ + role: 'assistant', + status_type: 'tool_started', + name: parsedContent.function_name, + arguments: parsedContent.arguments, + xml_tag_name: parsedContent.xml_tag_name, + tool_index: parsedContent.tool_index + }); + break; + case 'tool_completed': + case 'tool_failed': + case 'tool_error': + if (toolCall?.tool_index === parsedContent.tool_index) { + setToolCall(null); + } + break; + case 'thread_run_end': + console.log('[useAgentStream] Received thread run end status, finalizing.'); + break; + case 'finish': + // Optional: Handle finish reasons like 'xml_tool_limit_reached' + console.log('[useAgentStream] Received finish status:', parsedContent.finish_reason); + // Don't finalize here, wait for thread_run_end or completion message + break; + case 'error': + console.error('[useAgentStream] Received error status message:', parsedContent.message); + setError(parsedContent.message || 'Agent run failed'); + finalizeStream('error', currentRunIdRef.current); + break; + // Ignore thread_run_start, assistant_response_start etc. for now + default: + // console.debug('[useAgentStream] Received unhandled status type:', parsedContent.status_type); + break; + } + break; + case 'user': + case 'system': + // Handle other message types if necessary, e.g., if backend sends historical context + if (message.message_id) callbacks.onMessage(message); + break; + default: + console.warn('[useAgentStream] Unhandled message type:', message.type); + } + }, [status, toolCall, callbacks, finalizeStream, updateStatus]); + + const handleStreamError = useCallback((err: Error | string | Event) => { + if (!isMountedRef.current) return; + + // Extract error message + let errorMessage = 'Unknown streaming error'; + if (typeof err === 'string') { + errorMessage = err; + } else if (err instanceof Error) { + errorMessage = err.message; + } else if (err instanceof Event && err.type === 'error') { + // Standard EventSource errors don't have much detail, might need status check + errorMessage = 'Stream connection error'; + } + + console.error('[useAgentStream] Streaming error:', errorMessage, err); + setError(errorMessage); + + const runId = currentRunIdRef.current; + if (!runId) { + console.warn('[useAgentStream] Stream error occurred but no agentRunId is active.'); + finalizeStream('error'); // Finalize with generic error if no runId + return; + } + + // Check agent status immediately after an error + getAgentStatus(runId) + .then(agentStatus => { + if (!isMountedRef.current) return; // Check mount status again after async call + + if (agentStatus.status === 'running') { + console.warn(`[useAgentStream] Stream error for ${runId}, but agent is still running. Attempting reconnect.`); + // Consider adding a delay or backoff here + // For now, just finalize with error and let the user retry or handle reconnection logic outside + finalizeStream('error', runId); + toast.warning("Stream interrupted. Agent might still be running."); + } else { + console.log(`[useAgentStream] Stream error for ${runId}, agent status is ${agentStatus.status}. Finalizing stream.`); + finalizeStream(agentStatus.status === 'completed' ? 'completed' : 'error', runId); + } + }) + .catch(statusError => { + if (!isMountedRef.current) return; + + const statusErrorMessage = statusError instanceof Error ? statusError.message : String(statusError); + console.error(`[useAgentStream] Error checking agent status for ${runId} after stream error: ${statusErrorMessage}`); + + const isNotFoundError = statusErrorMessage.includes('not found') || + statusErrorMessage.includes('404') || + statusErrorMessage.includes('does not exist'); + + if (isNotFoundError) { + console.log(`[useAgentStream] Agent run ${runId} not found after stream error. Finalizing.`); + finalizeStream('agent_not_running', runId); // Use a specific status + } else { + // For other status check errors, finalize with the original stream error + finalizeStream('error', runId); + } + }); + + }, [finalizeStream]); + + const handleStreamClose = useCallback(() => { + if (!isMountedRef.current) return; + console.log('[useAgentStream] Stream connection closed by server.'); + + const runId = currentRunIdRef.current; + if (!runId) { + console.warn('[useAgentStream] Stream closed but no active agentRunId.'); + // If status was streaming, something went wrong, finalize as error + if (status === 'streaming' || status === 'connecting') { + finalizeStream('error'); + } else if (status !== 'idle' && status !== 'completed' && status !== 'stopped' && status !== 'agent_not_running') { + // If in some other state, just go back to idle if no runId + finalizeStream('idle'); + } + return; + } + + // Immediately check the agent status when the stream closes unexpectedly + // This covers cases where the agent finished but the final message wasn't received, + // or if the agent errored out on the backend. + getAgentStatus(runId) + .then(agentStatus => { + if (!isMountedRef.current) return; // Check mount status again + + console.log(`[useAgentStream] Agent status after stream close for ${runId}: ${agentStatus.status}`); + if (agentStatus.status === 'running') { + // This case is tricky. The stream closed, but the agent is running. + // Could be a temporary network issue, or the backend stream terminated prematurely. + console.warn(`[useAgentStream] Stream closed for ${runId}, but agent is still running. Reconnection logic needed or signal error.`); + setError('Stream closed unexpectedly while agent was running.'); + finalizeStream('error', runId); // Finalize as error for now + // Optionally: Implement automatic reconnection attempts here or notify parent component. + toast.warning("Stream disconnected. Agent might still be running."); + } else { + // Agent is not running (completed, stopped, error). Finalize accordingly. + finalizeStream(agentStatus.status === 'completed' ? 'completed' : 'error', runId); + } + }) + .catch(err => { + if (!isMountedRef.current) return; + + const errorMessage = err instanceof Error ? err.message : String(err); + console.error(`[useAgentStream] Error checking agent status for ${runId} after stream close: ${errorMessage}`); + + const isNotFoundError = errorMessage.includes('not found') || + errorMessage.includes('404') || + errorMessage.includes('does not exist'); + + if (isNotFoundError) { + console.log(`[useAgentStream] Agent run ${runId} not found after stream close. Finalizing.`); + finalizeStream('agent_not_running', runId); // Use specific status + } else { + // For other errors checking status, finalize with generic error + finalizeStream('error', runId); + } + }); + + }, [status, finalizeStream]); // Include status + + // --- Effect to manage the stream lifecycle --- + useEffect(() => { + isMountedRef.current = true; + + // Cleanup function for when the component unmounts or agentRunId changes + return () => { + isMountedRef.current = false; + console.log('[useAgentStream] Unmounting or agentRunId changing. Cleaning up stream.'); + if (streamCleanupRef.current) { + streamCleanupRef.current(); + streamCleanupRef.current = null; + } + // Reset state on unmount if needed, though finalizeStream should handle most cases + setStatus('idle'); + setTextContent(''); + setToolCall(null); + setError(null); + setAgentRunId(null); + currentRunIdRef.current = null; + }; + }, []); // Empty dependency array for mount/unmount effect + + // --- Public Functions --- + + const startStreaming = useCallback(async (runId: string) => { + if (!isMountedRef.current) return; + console.log(`[useAgentStream] Received request to start streaming for ${runId}`); + + // Clean up any previous stream + if (streamCleanupRef.current) { + console.log('[useAgentStream] Cleaning up existing stream before starting new one.'); + streamCleanupRef.current(); + streamCleanupRef.current = null; + } + + // Reset state before starting + setTextContent(''); + setToolCall(null); + setError(null); + updateStatus('connecting'); + setAgentRunId(runId); + currentRunIdRef.current = runId; // Set the ref immediately + + try { + // *** Crucial check: Verify agent is running BEFORE connecting *** + const agentStatus = await getAgentStatus(runId); + if (!isMountedRef.current) return; // Check mount status after async call + + if (agentStatus.status !== 'running') { + console.warn(`[useAgentStream] Agent run ${runId} is not in running state (status: ${agentStatus.status}). Cannot start stream.`); + setError(`Agent run is not running (status: ${agentStatus.status})`); + finalizeStream('agent_not_running', runId); + return; + } + + // Agent is running, proceed to create the stream + console.log(`[useAgentStream] Agent run ${runId} confirmed running. Setting up EventSource.`); + const cleanup = streamAgent(runId, { + onMessage: handleStreamMessage, + onError: handleStreamError, + onClose: handleStreamClose, + }); + streamCleanupRef.current = cleanup; + // Status will be updated to 'streaming' by the first message received in handleStreamMessage + + } catch (err) { + if (!isMountedRef.current) return; // Check mount status after async call + + const errorMessage = err instanceof Error ? err.message : String(err); + console.error(`[useAgentStream] Error initiating stream for ${runId}: ${errorMessage}`); + setError(errorMessage); + + const isNotFoundError = errorMessage.includes('not found') || + errorMessage.includes('404') || + errorMessage.includes('does not exist'); + + finalizeStream(isNotFoundError ? 'agent_not_running' : 'error', runId); + } + }, [updateStatus, finalizeStream, handleStreamMessage, handleStreamError, handleStreamClose]); // Add dependencies + + const stopStreaming = useCallback(async () => { + if (!isMountedRef.current || !agentRunId) return; + + const runIdToStop = agentRunId; + console.log(`[useAgentStream] Stopping stream for agent run ${runIdToStop}`); + + // Immediately update status and clean up stream + finalizeStream('stopped', runIdToStop); + + try { + await stopAgent(runIdToStop); + toast.success('Agent stop request sent.'); + // finalizeStream already called getAgentStatus implicitly if needed + } catch (err) { + // Don't revert status here, as the user intended to stop. Just log error. + const errorMessage = err instanceof Error ? err.message : String(err); + console.error(`[useAgentStream] Error sending stop request for ${runIdToStop}: ${errorMessage}`); + toast.error(`Failed to stop agent: ${errorMessage}`); + } + }, [agentRunId, finalizeStream]); // Add dependencies + + return { + status, + textContent, + toolCall, + error, + agentRunId, + startStreaming, + stopStreaming, + }; +} \ No newline at end of file diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index a7ff37d2..8109fb23 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -2,12 +2,11 @@ import { createClient } from '@/lib/supabase/client'; const API_URL = process.env.NEXT_PUBLIC_BACKEND_URL || ''; -// Simple cache implementation +// Simple cache implementation for non-agent data const apiCache = { projects: new Map(), threads: new Map(), threadMessages: new Map(), - agentRuns: new Map(), getProject: (projectId: string) => apiCache.projects.get(projectId), setProject: (projectId: string, data: any) => apiCache.projects.set(projectId, data), @@ -21,75 +20,15 @@ const apiCache = { getThreadMessages: (threadId: string) => apiCache.threadMessages.get(threadId), setThreadMessages: (threadId: string, data: any) => apiCache.threadMessages.set(threadId, data), - getAgentRuns: (threadId: string) => apiCache.agentRuns.get(threadId), - setAgentRuns: (threadId: string, data: any) => apiCache.agentRuns.set(threadId, data), - // Helper to clear parts of the cache when data changes invalidateThreadMessages: (threadId: string) => apiCache.threadMessages.delete(threadId), - invalidateAgentRuns: (threadId: string) => apiCache.agentRuns.delete(threadId), -}; - -// Add a fetch queue system to prevent multiple simultaneous requests -const fetchQueue = { - agentRuns: new Map>(), - threads: new Map>(), - messages: new Map>(), - projects: new Map>(), - - getQueuedAgentRuns: (threadId: string) => fetchQueue.agentRuns.get(threadId), - setQueuedAgentRuns: (threadId: string, promise: Promise) => { - fetchQueue.agentRuns.set(threadId, promise); - // Auto-clean the queue after the promise resolves - promise.finally(() => { - fetchQueue.agentRuns.delete(threadId); - }); - return promise; - }, - - getQueuedThreads: (projectId: string) => fetchQueue.threads.get(projectId || 'all'), - setQueuedThreads: (projectId: string, promise: Promise) => { - fetchQueue.threads.set(projectId || 'all', promise); - promise.finally(() => { - fetchQueue.threads.delete(projectId || 'all'); - }); - return promise; - }, - - getQueuedMessages: (threadId: string) => fetchQueue.messages.get(threadId), - setQueuedMessages: (threadId: string, promise: Promise) => { - fetchQueue.messages.set(threadId, promise); - promise.finally(() => { - fetchQueue.messages.delete(threadId); - }); - return promise; - }, - - getQueuedProjects: () => fetchQueue.projects.get('all'), - setQueuedProjects: (promise: Promise) => { - fetchQueue.projects.set('all', promise); - promise.finally(() => { - fetchQueue.projects.delete('all'); - }); - return promise; - } }; // Track active streams by agent run ID -const activeStreams = new Map void; - onError: (error: Error | string) => void; - onClose: () => void; - }>; -}>(); +const activeStreams = new Map(); -// Track recent agent status requests to prevent duplicates -const recentAgentStatusRequests = new Map; -}>(); +// Track agent runs that have been confirmed as completed or not found +const nonRunningAgentRuns = new Set(); export type Project = { id: string; @@ -135,47 +74,35 @@ export type ToolCall = { // Project APIs export const getProjects = async (): Promise => { - // Check if we already have a pending request - const pendingRequest = fetchQueue.getQueuedProjects(); - if (pendingRequest) { - return pendingRequest; - } - // Check cache first const cached = apiCache.getProjects(); if (cached) { return cached; } - // Create and queue the promise - const fetchPromise = (async () => { - try { - const supabase = createClient(); - const { data, error } = await supabase - .from('projects') - .select('*'); - - if (error) { - // Handle permission errors specifically - if (error.code === '42501' && error.message.includes('has_role_on_account')) { - console.error('Permission error: User does not have proper account access'); - return []; // Return empty array instead of throwing - } - throw error; + try { + const supabase = createClient(); + const { data, error } = await supabase + .from('projects') + .select('*'); + + if (error) { + // Handle permission errors specifically + if (error.code === '42501' && error.message.includes('has_role_on_account')) { + console.error('Permission error: User does not have proper account access'); + return []; // Return empty array instead of throwing } - - // Cache the result - apiCache.setProjects(data || []); - return data || []; - } catch (err) { - console.error('Error fetching projects:', err); - // Return empty array for permission errors to avoid crashing the UI - return []; + throw error; } - })(); - - // Add to queue and return - return fetchQueue.setQueuedProjects(fetchPromise); + + // Cache the result + apiCache.setProjects(data || []); + return data || []; + } catch (err) { + console.error('Error fetching projects:', err); + // Return empty array for permission errors to avoid crashing the UI + return []; + } }; export const getProject = async (projectId: string): Promise => { @@ -290,38 +217,26 @@ export const deleteProject = async (projectId: string): Promise => { // Thread APIs export const getThreads = async (projectId?: string): Promise => { - // Check if we already have a pending request - const pendingRequest = fetchQueue.getQueuedThreads(projectId || 'all'); - if (pendingRequest) { - return pendingRequest; - } - // Check cache first const cached = apiCache.getThreads(projectId || 'all'); if (cached) { return cached; } - // Create and queue the promise - const fetchPromise = (async () => { - const supabase = createClient(); - let query = supabase.from('threads').select('*'); - - if (projectId) { - query = query.eq('project_id', projectId); - } - - const { data, error } = await query; - - if (error) throw error; - - // Cache the result - apiCache.setThreads(projectId || 'all', data || []); - return data || []; - })(); + const supabase = createClient(); + let query = supabase.from('threads').select('*'); - // Add to queue and return - return fetchQueue.setQueuedThreads(projectId || 'all', fetchPromise); + if (projectId) { + query = query.eq('project_id', projectId); + } + + const { data, error } = await query; + + if (error) throw error; + + // Cache the result + apiCache.setThreads(projectId || 'all', data || []); + return data || []; }; export const getThread = async (threadId: string): Promise => { @@ -389,43 +304,31 @@ export const addUserMessage = async (threadId: string, content: string): Promise }; export const getMessages = async (threadId: string): Promise => { - // Check if we already have a pending request - const pendingRequest = fetchQueue.getQueuedMessages(threadId); - if (pendingRequest) { - return pendingRequest; - } - // Check cache first const cached = apiCache.getThreadMessages(threadId); if (cached) { return cached; } - // Create and queue the promise - const fetchPromise = (async () => { - const supabase = createClient(); - - const { data, error } = await supabase - .from('messages') - .select('*') - .eq('thread_id', threadId) - .neq('type', 'cost') - .neq('type', 'summary') - .order('created_at', { ascending: true }); - - if (error) { - console.error('Error fetching messages:', error); - throw new Error(`Error getting messages: ${error.message}`); - } - - // Cache the result - apiCache.setThreadMessages(threadId, data || []); - - return data || []; - })(); + const supabase = createClient(); - // Add to queue and return - return fetchQueue.setQueuedMessages(threadId, fetchPromise); + const { data, error } = await supabase + .from('messages') + .select('*') + .eq('thread_id', threadId) + .neq('type', 'cost') + .neq('type', 'summary') + .order('created_at', { ascending: true }); + + if (error) { + console.error('Error fetching messages:', error); + throw new Error(`Error getting messages: ${error.message}`); + } + + // Cache the result + apiCache.setThreadMessages(threadId, data || []); + + return data || []; }; // Agent APIs @@ -451,6 +354,8 @@ export const startAgent = async (threadId: string): Promise<{ agent_run_id: stri 'Content-Type': 'application/json', 'Authorization': `Bearer ${session.access_token}`, }, + // Add cache: 'no-store' to prevent caching + cache: 'no-store', }); if (!response.ok) { @@ -459,10 +364,6 @@ export const startAgent = async (threadId: string): Promise<{ agent_run_id: stri throw new Error(`Error starting agent: ${response.statusText} (${response.status})`); } - // Invalidate relevant caches - apiCache.invalidateAgentRuns(threadId); - apiCache.invalidateThreadMessages(threadId); - return response.json(); } catch (error) { console.error('[API] Failed to start agent:', error); @@ -477,6 +378,17 @@ export const startAgent = async (threadId: string): Promise<{ agent_run_id: stri }; export const stopAgent = async (agentRunId: string): Promise => { + // Add to non-running set immediately to prevent reconnection attempts + nonRunningAgentRuns.add(agentRunId); + + // Close any existing stream + const existingStream = activeStreams.get(agentRunId); + if (existingStream) { + console.log(`[API] Closing existing stream for ${agentRunId} before stopping agent`); + existingStream.close(); + activeStreams.delete(agentRunId); + } + const supabase = createClient(); const { data: { session } } = await supabase.auth.getSession(); @@ -490,6 +402,8 @@ export const stopAgent = async (agentRunId: string): Promise => { 'Content-Type': 'application/json', 'Authorization': `Bearer ${session.access_token}`, }, + // Add cache: 'no-store' to prevent caching + cache: 'no-store', }); if (!response.ok) { @@ -498,16 +412,12 @@ export const stopAgent = async (agentRunId: string): Promise => { }; export const getAgentStatus = async (agentRunId: string): Promise => { - console.log(`[API] ⚠️ Requesting agent status for ${agentRunId}`); + console.log(`[API] Requesting agent status for ${agentRunId}`); - // Check if we have a recent request for this agent run - const now = Date.now(); - const recentRequest = recentAgentStatusRequests.get(agentRunId); - - // If we have a request from the last 2 seconds, reuse its promise - if (recentRequest && now - recentRequest.timestamp < 2000) { - console.log(`[API] 🔄 Reusing recent status request for ${agentRunId} from ${now - recentRequest.timestamp}ms ago`); - return recentRequest.promise; + // If we already know this agent is not running, throw an error + if (nonRunningAgentRuns.has(agentRunId)) { + console.log(`[API] Agent run ${agentRunId} is known to be non-running, returning error`); + throw new Error(`Agent run ${agentRunId} is not running`); } try { @@ -515,69 +425,50 @@ export const getAgentStatus = async (agentRunId: string): Promise => { const { data: { session } } = await supabase.auth.getSession(); if (!session?.access_token) { - console.error('[API] ❌ No access token available for getAgentStatus'); + console.error('[API] No access token available for getAgentStatus'); throw new Error('No access token available'); } const url = `${API_URL}/agent-run/${agentRunId}`; - console.log(`[API] 🔍 Fetching from: ${url}`); + console.log(`[API] Fetching from: ${url}`); - // Create the promise for this request - const requestPromise = (async () => { - const response = await fetch(url, { - headers: { - 'Authorization': `Bearer ${session.access_token}`, - }, - }); - - if (!response.ok) { - const errorText = await response.text().catch(() => 'No error details available'); - console.error(`[API] ❌ Error getting agent status: ${response.status} ${response.statusText}`, errorText); - throw new Error(`Error getting agent status: ${response.statusText} (${response.status})`); - } - - const data = await response.json(); - console.log(`[API] ✅ Successfully got agent status:`, data); - - // Clean up old entries after 5 seconds - setTimeout(() => { - const entry = recentAgentStatusRequests.get(agentRunId); - if (entry && entry.timestamp === now) { - recentAgentStatusRequests.delete(agentRunId); - } - }, 5000); - - return data; - })(); - - // Store this request in our cache - recentAgentStatusRequests.set(agentRunId, { - timestamp: now, - promise: requestPromise + const response = await fetch(url, { + headers: { + 'Authorization': `Bearer ${session.access_token}`, + }, + // Add cache: 'no-store' to prevent caching + cache: 'no-store', }); - return requestPromise; + if (!response.ok) { + const errorText = await response.text().catch(() => 'No error details available'); + console.error(`[API] Error getting agent status: ${response.status} ${response.statusText}`, errorText); + + // If we get a 404, add to non-running set + if (response.status === 404) { + nonRunningAgentRuns.add(agentRunId); + } + + throw new Error(`Error getting agent status: ${response.statusText} (${response.status})`); + } + + const data = await response.json(); + console.log(`[API] Successfully got agent status:`, data); + + // If agent is not running, add to non-running set + if (data.status !== 'running') { + nonRunningAgentRuns.add(agentRunId); + } + + return data; } catch (error) { - console.error('[API] ❌ Failed to get agent status:', error); + console.error('[API] Failed to get agent status:', error); throw error; } }; export const getAgentRuns = async (threadId: string): Promise => { - // Check if we already have a pending request for this thread ID - const pendingRequest = fetchQueue.getQueuedAgentRuns(threadId); - if (pendingRequest) { - return pendingRequest; - } - - // Check cache first - const cached = apiCache.getAgentRuns(threadId); - if (cached) { - return cached; - } - - // Create and queue the promise to prevent duplicate requests - const fetchPromise = (async () => { + try { const supabase = createClient(); const { data: { session } } = await supabase.auth.getSession(); @@ -589,6 +480,8 @@ export const getAgentRuns = async (threadId: string): Promise => { headers: { 'Authorization': `Bearer ${session.access_token}`, }, + // Add cache: 'no-store' to prevent caching + cache: 'no-store', }); if (!response.ok) { @@ -596,15 +489,11 @@ export const getAgentRuns = async (threadId: string): Promise => { } const data = await response.json(); - const agentRuns = data.agent_runs || []; - - // Cache the result - apiCache.setAgentRuns(threadId, agentRuns); - return agentRuns; - })(); - - // Add to queue and return - return fetchQueue.setQueuedAgentRuns(threadId, fetchPromise); + return data.agent_runs || []; + } catch (error) { + console.error('Failed to get agent runs:', error); + throw error; + } }; export const streamAgent = (agentRunId: string, callbacks: { @@ -612,41 +501,58 @@ export const streamAgent = (agentRunId: string, callbacks: { onError: (error: Error | string) => void; onClose: () => void; }): () => void => { - console.log(`[STREAM] streamAgent called for ${agentRunId}, active streams: ${Array.from(activeStreams.keys()).join(', ')}`); + console.log(`[STREAM] streamAgent called for ${agentRunId}`); - // Check if there's already an active stream for this agent run - let activeStream = activeStreams.get(agentRunId); - - // If we already have a stream, just add this subscriber - if (activeStream) { - console.log(`[STREAM] Reusing existing stream for ${agentRunId}, adding subscriber`); - activeStream.subscribers.add(callbacks); + // Check if this agent run is known to be non-running + if (nonRunningAgentRuns.has(agentRunId)) { + console.log(`[STREAM] Agent run ${agentRunId} is known to be non-running, not creating stream`); + // Notify the caller immediately + setTimeout(() => { + callbacks.onError(`Agent run ${agentRunId} is not running`); + callbacks.onClose(); + }, 0); - // Return a cleanup function for this specific subscriber - return () => { - console.log(`[STREAM] Removing subscriber from ${agentRunId}`); - const stream = activeStreams.get(agentRunId); - if (stream) { - stream.subscribers.delete(callbacks); - - // If no subscribers remain, clean up the stream - if (stream.subscribers.size === 0) { - console.log(`[STREAM] No subscribers left for ${agentRunId}, closing stream`); - stream.eventSource.close(); - activeStreams.delete(agentRunId); - } - } - }; + // Return a no-op cleanup function + return () => {}; } - // If no active stream exists, create a new one - console.log(`[STREAM] Creating new stream for ${agentRunId}`); - let isClosing = false; + // Check if there's already an active stream for this agent run + const existingStream = activeStreams.get(agentRunId); + if (existingStream) { + console.log(`[STREAM] Stream already exists for ${agentRunId}, closing it first`); + existingStream.close(); + activeStreams.delete(agentRunId); + } - const setupStream = async () => { - try { - if (isClosing) { - console.log(`[STREAM] Already closing, not setting up stream for ${agentRunId}`); + // Set up a new stream + try { + const setupStream = async () => { + // First verify the agent is actually running + try { + const status = await getAgentStatus(agentRunId); + if (status.status !== 'running') { + console.log(`[STREAM] Agent run ${agentRunId} is not running (status: ${status.status}), not creating stream`); + nonRunningAgentRuns.add(agentRunId); + callbacks.onError(`Agent run ${agentRunId} is not running (status: ${status.status})`); + callbacks.onClose(); + return; + } + } catch (err) { + console.error(`[STREAM] Error verifying agent run ${agentRunId}:`, err); + + // Check if this is a "not found" error + const errorMessage = err instanceof Error ? err.message : String(err); + const isNotFoundError = errorMessage.includes('not found') || + errorMessage.includes('404') || + errorMessage.includes('does not exist'); + + if (isNotFoundError) { + console.log(`[STREAM] Agent run ${agentRunId} not found, not creating stream`); + nonRunningAgentRuns.add(agentRunId); + } + + callbacks.onError(errorMessage); + callbacks.onClose(); return; } @@ -666,14 +572,8 @@ export const streamAgent = (agentRunId: string, callbacks: { console.log(`[STREAM] Creating EventSource for ${agentRunId}`); const eventSource = new EventSource(url.toString()); - // Create and add to active streams map immediately - activeStream = { - eventSource, - lastMessageTime: Date.now(), - subscribers: new Set([callbacks]) - }; - - activeStreams.set(agentRunId, activeStream); + // Store the EventSource in the active streams map + activeStreams.set(agentRunId, eventSource); eventSource.onopen = () => { console.log(`[STREAM] Connection opened for ${agentRunId}`); @@ -684,11 +584,6 @@ export const streamAgent = (agentRunId: string, callbacks: { const rawData = event.data; if (rawData.includes('"type":"ping"')) return; - // Update last message time - if (activeStream) { - activeStream.lastMessageTime = Date.now(); - } - // Log raw data for debugging (truncated for readability) console.log(`[STREAM] Received data for ${agentRunId}: ${rawData.substring(0, 100)}${rawData.length > 100 ? '...' : ''}`); @@ -700,199 +595,131 @@ export const streamAgent = (agentRunId: string, callbacks: { // Check for "Agent run not found" error if (rawData.includes('Agent run') && rawData.includes('not found in active runs')) { - console.log(`[STREAM] ⚠️ Agent run ${agentRunId} not found in active runs, closing stream`); + console.log(`[STREAM] Agent run ${agentRunId} not found in active runs, closing stream`); - // Notify subscribers about the error - const currentStream = activeStreams.get(agentRunId); - if (currentStream) { - currentStream.subscribers.forEach(subscriber => { - try { - subscriber.onError("Agent run not found in active runs"); - subscriber.onClose(); - } catch (subError) { - console.error(`[STREAM] Error in subscriber notification for not found:`, subError); - } - }); - } + // Add to non-running set to prevent future reconnection attempts + nonRunningAgentRuns.add(agentRunId); + + // Notify about the error + callbacks.onError("Agent run not found in active runs"); + + // Clean up + eventSource.close(); + activeStreams.delete(agentRunId); + callbacks.onClose(); - // Clean up stream since agent is not found - if (!isClosing) { - cleanupStream(); - } return; } - // Check for simple completion status message + // Check for completion messages if (rawData.includes('"type":"status"') && rawData.includes('"status":"completed"')) { - console.log(`[STREAM] ⚠️ Detected simple completion status message for ${agentRunId}, closing stream`); + console.log(`[STREAM] Detected completion status message for ${agentRunId}`); - // Notify all subscribers this is the final message - const currentStream = activeStreams.get(agentRunId); - if (currentStream) { - currentStream.subscribers.forEach(subscriber => { - try { - subscriber.onMessage(rawData); - } catch (subError) { - console.error(`[STREAM] Error in subscriber onMessage for completion:`, subError); - } - }); + // Check for specific completion messages that indicate we should stop checking + if (rawData.includes('Run data not available for streaming') || + rawData.includes('Stream ended with status: completed')) { + console.log(`[STREAM] Detected final completion message for ${agentRunId}, adding to non-running set`); + // Add to non-running set to prevent future reconnection attempts + nonRunningAgentRuns.add(agentRunId); } - // Clean up stream since agent is complete - if (!isClosing) { - cleanupStream(); - } + // Notify about the message + callbacks.onMessage(rawData); + + // Clean up + eventSource.close(); + activeStreams.delete(agentRunId); + callbacks.onClose(); + return; } - // Check for completion status message - const isCompletionMessage = rawData.includes('"type":"status"') && - (rawData.includes('"status":"completed"') || - rawData.includes('"status_type":"thread_run_end"')); - - if (isCompletionMessage) { - console.log(`[STREAM] ⚠️ Detected completion status message for ${agentRunId}`); + // Check for thread run end message + if (rawData.includes('"type":"status"') && rawData.includes('"status_type":"thread_run_end"')) { + console.log(`[STREAM] Detected thread run end message for ${agentRunId}`); + + // Add to non-running set + nonRunningAgentRuns.add(agentRunId); + + // Notify about the message + callbacks.onMessage(rawData); + + // Clean up + eventSource.close(); + activeStreams.delete(agentRunId); + callbacks.onClose(); + + return; } - // Notify all subscribers about this message - const currentStream = activeStreams.get(agentRunId); - if (currentStream) { - currentStream.subscribers.forEach(subscriber => { - try { - subscriber.onMessage(rawData); - } catch (subError) { - console.error(`[STREAM] Error in subscriber onMessage:`, subError); - // Don't let subscriber errors affect other subscribers - } - }); - } + // For all other messages, just pass them through + callbacks.onMessage(rawData); - // Handle completion message cleanup after delivering to subscribers - if (isCompletionMessage && !isClosing) { - console.log(`[STREAM] ⚠️ Closing stream due to completion message for ${agentRunId}`); - cleanupStream(); - } } catch (error) { console.error(`[STREAM] Error handling message:`, error); - // Notify error without closing the stream to allow retries - const currentStream = activeStreams.get(agentRunId); - if (currentStream) { - currentStream.subscribers.forEach(subscriber => { - try { - subscriber.onError(error instanceof Error ? error : String(error)); - } catch (subError) { - console.error(`[STREAM] Error in subscriber onError:`, subError); - } - }); - } + callbacks.onError(error instanceof Error ? error : String(error)); } }; eventSource.onerror = (event) => { - console.log(`[STREAM] 🔍 EventSource error for ${agentRunId}:`, event); + console.log(`[STREAM] EventSource error for ${agentRunId}:`, event); - if (isClosing) { - console.log(`[STREAM] Error ignored because stream is closing for ${agentRunId}`); - return; - } - - // Check if we need to verify agent run status - const shouldVerifyAgentStatus = (event as any).target?.readyState === 2; // CLOSED - - if (shouldVerifyAgentStatus) { - console.log(`[STREAM] Connection closed, verifying if agent run ${agentRunId} still exists`); - - // Verify if the agent run still exists before reconnecting - getAgentStatus(agentRunId) - .then(status => { - if (status.status === 'running') { - console.log(`[STREAM] Agent run ${agentRunId} is still running, will attempt reconnection`); - // Don't clean up, let the page component handle reconnection - } else { - console.log(`[STREAM] Agent run ${agentRunId} is no longer running (${status.status}), cleaning up stream`); - cleanupStream(); - } - }) - .catch(err => { - console.error(`[STREAM] Error checking agent status after connection error:`, err); - - // If we get a 404 or similar error, the agent run doesn't exist - if (err.message && ( - err.message.includes('not found') || - err.message.includes('404') || - err.message.includes('does not exist') - )) { - console.log(`[STREAM] Agent run ${agentRunId} appears to not exist, cleaning up stream`); - cleanupStream(); - } else { - // For other errors, we'll let the page component handle reconnection - console.log(`[STREAM] Network or other error checking agent status, will let page handle reconnection`); - } - }); - } else { - // For other types of errors, we'll attempt to keep the stream alive - console.log(`[STREAM] Non-fatal error for ${agentRunId}, keeping stream alive`); - } + // Check if the agent is still running + getAgentStatus(agentRunId) + .then(status => { + if (status.status !== 'running') { + console.log(`[STREAM] Agent run ${agentRunId} is not running after error, closing stream`); + nonRunningAgentRuns.add(agentRunId); + eventSource.close(); + activeStreams.delete(agentRunId); + callbacks.onClose(); + } else { + console.log(`[STREAM] Agent run ${agentRunId} is still running after error, keeping stream open`); + // Let the browser handle reconnection for non-fatal errors + } + }) + .catch(err => { + console.error(`[STREAM] Error checking agent status after stream error:`, err); + + // Check if this is a "not found" error + const errMsg = err instanceof Error ? err.message : String(err); + const isNotFoundErr = errMsg.includes('not found') || + errMsg.includes('404') || + errMsg.includes('does not exist'); + + if (isNotFoundErr) { + console.log(`[STREAM] Agent run ${agentRunId} not found after error, closing stream`); + nonRunningAgentRuns.add(agentRunId); + eventSource.close(); + activeStreams.delete(agentRunId); + callbacks.onClose(); + } + + // For other errors, notify but don't close the stream + callbacks.onError(errMsg); + }); }; - - } catch (error) { - console.error(`[STREAM] Error setting up stream for ${agentRunId}:`, error); - - if (!isClosing) { - callbacks.onError(error instanceof Error ? error : String(error)); - cleanupStream(); + }; + + // Start the stream setup + setupStream(); + + // Return a cleanup function + return () => { + console.log(`[STREAM] Cleanup called for ${agentRunId}`); + const stream = activeStreams.get(agentRunId); + if (stream) { + console.log(`[STREAM] Closing stream for ${agentRunId}`); + stream.close(); + activeStreams.delete(agentRunId); } - } - }; - - const cleanupStream = () => { - if (isClosing) return; - isClosing = true; - - console.log(`[STREAM] Cleaning up stream for ${agentRunId}`); - - const stream = activeStreams.get(agentRunId); - if (stream) { - // Close the EventSource - stream.eventSource.close(); - - // Notify all subscribers - stream.subscribers.forEach(subscriber => { - try { - subscriber.onClose(); - } catch (error) { - console.error(`[STREAM] Error in subscriber onClose:`, error); - } - }); - - // Remove from active streams - activeStreams.delete(agentRunId); - } - }; - - // Setup the stream - setupStream(); - - // Return cleanup function for this subscriber - return () => { - console.log(`[STREAM] Cleanup called for ${agentRunId}`); - - const stream = activeStreams.get(agentRunId); - if (stream) { - // Remove this subscriber - stream.subscribers.delete(callbacks); - - // If this was the last subscriber, clean up the entire stream - if (stream.subscribers.size === 0) { - console.log(`[STREAM] Last subscriber removed, closing stream for ${agentRunId}`); - if (!isClosing) { - cleanupStream(); - } - } else { - console.log(`[STREAM] Subscriber removed, but ${stream.subscribers.size} still active for ${agentRunId}`); - } - } - }; + }; + } catch (error) { + console.error(`[STREAM] Error setting up stream for ${agentRunId}:`, error); + callbacks.onError(error instanceof Error ? error : String(error)); + callbacks.onClose(); + return () => {}; + } }; // Sandbox API Functions