bad fe wip

This commit is contained in:
marko-kraemer 2025-04-18 12:27:21 +01:00
parent 749565c3d1
commit 638fa069db
3 changed files with 777 additions and 967 deletions

View File

@ -7,7 +7,7 @@ import { Button } from '@/components/ui/button';
import {
ArrowDown, CheckCircle, CircleDashed, AlertTriangle, Info
} from 'lucide-react';
import { addUserMessage, getMessages, startAgent, stopAgent, getAgentStatus, streamAgent, getAgentRuns, getProject, getThread, updateProject, Project, Message as BaseApiMessageType } from '@/lib/api';
import { addUserMessage, getMessages, startAgent, stopAgent, getAgentRuns, getProject, getThread, updateProject, Project, Message as BaseApiMessageType } from '@/lib/api';
import { toast } from 'sonner';
import { Skeleton } from "@/components/ui/skeleton";
import { ChatInput } from '@/components/thread/chat-input';
@ -15,6 +15,7 @@ import { FileViewerModal } from '@/components/thread/file-viewer-modal';
import { SiteHeader } from "@/components/thread/thread-site-header"
import { ToolCallSidePanel, SidePanelContent, ToolCallData } from "@/components/thread/tool-call-side-panel";
import { useSidebar } from "@/components/ui/sidebar";
import { useAgentStream, AgentStreamStatus } from '@/hooks/useAgentStream';
import { UnifiedMessage, ParsedContent, ParsedMetadata, ThreadParams } from '@/components/thread/types';
import { getToolIcon, extractPrimaryParam, safeJsonParse } from '@/components/thread/utils';
@ -40,14 +41,10 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
const [isSending, setIsSending] = useState(false);
const [error, setError] = useState<string | null>(null);
const [agentRunId, setAgentRunId] = useState<string | null>(null);
const [agentStatus, setAgentStatus] = useState<'idle' | 'running'>('idle');
const [isStreaming, setIsStreaming] = useState(false);
const [streamingTextContent, setStreamingTextContent] = useState('');
const [streamingToolCall, setStreamingToolCall] = useState<ParsedContent | null>(null);
const [agentStatus, setAgentStatus] = useState<'idle' | 'running' | 'connecting' | 'error'>('idle');
const [isSidePanelOpen, setIsSidePanelOpen] = useState(false);
const [sidePanelContent, setSidePanelContent] = useState<SidePanelContent | null>(null);
const streamCleanupRef = useRef<(() => void) | null>(null);
const messagesEndRef = useRef<HTMLDivElement>(null);
const messagesContainerRef = useRef<HTMLDivElement>(null);
const latestMessageRef = useRef<HTMLDivElement>(null);
@ -63,7 +60,6 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
const initialLoadCompleted = useRef<boolean>(false);
const messagesLoadedRef = useRef(false);
const agentRunsCheckedRef = useRef(false);
const streamInitializedRef = useRef(false);
const { state: leftSidebarState, setOpen: setLeftSidebarOpen } = useSidebar();
const initialLayoutAppliedRef = useRef(false);
@ -106,347 +102,72 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
return () => window.removeEventListener('keydown', handleKeyDown);
}, [toggleSidePanel]);
const handleStreamAgent = useCallback(async (runId: string) => {
if (!runId) return;
// Set the agent run ID and status first to ensure UI shows running state
setAgentRunId(runId);
setAgentStatus('running');
// Check if we already have an active stream for this run
if (streamCleanupRef.current && streamInitializedRef.current && agentRunId === runId) {
console.log(`[PAGE] Already streaming agent run ${runId}, not reconnecting`);
return;
}
// Clean up any existing stream
if (streamCleanupRef.current) {
console.log(`[PAGE] Cleaning up existing stream before creating new one for ${runId}`);
streamCleanupRef.current();
streamCleanupRef.current = null;
}
// Mark stream as initializing and show streaming state in UI
console.log(`[PAGE] Setting up stream for agent run ${runId}`);
streamInitializedRef.current = true;
setIsStreaming(true);
setStreamingTextContent('');
setStreamingToolCall(null);
// Create the stream and get cleanup function
const cleanup = streamAgent(runId, {
onMessage: async (rawData: string) => {
try {
(window as any).lastStreamMessage = Date.now();
let processedData = rawData;
if (processedData.startsWith('data: ')) {
processedData = processedData.substring(6).trim();
}
if (!processedData) return;
// Handle special non-JSON completion message
if (processedData.includes('"type":"status"') && processedData.includes('"status":"completed"')) {
console.log('[PAGE] Received completion status message');
setIsStreaming(false);
setAgentStatus('idle');
setAgentRunId(null);
setStreamingTextContent('');
setStreamingToolCall(null);
streamInitializedRef.current = false;
return;
}
const message: UnifiedMessage = safeJsonParse(processedData, null);
if (!message) {
console.warn('[PAGE] Failed to parse streamed message:', processedData);
return;
}
console.log(`[PAGE] Received Streamed Message (Type: ${message.type}, ID: ${message.message_id}):`, message);
const parsedContent = safeJsonParse<ParsedContent>(message.content, {});
const parsedMetadata = safeJsonParse<ParsedMetadata>(message.metadata, {});
if (message.type === 'assistant' && parsedMetadata.stream_status === 'chunk' && parsedContent.content) {
setStreamingTextContent(prev => prev + parsedContent.content);
}
else if (message.type === 'assistant' && parsedMetadata.stream_status === 'complete') {
setStreamingTextContent('');
setStreamingToolCall(null);
// Only add/replace message if it has a message_id
if (message.message_id) {
setMessages(prev => {
// Check if this message already exists in the array
const messageExists = prev.some(m => m.message_id === message.message_id);
if (messageExists) {
// Replace the existing message
return prev.map(m => m.message_id === message.message_id ? message : m);
} else {
// Add as a new message
return [...prev, message];
}
});
}
}
else if (message.type === 'tool') {
setStreamingToolCall(null);
// Only add/replace if it has a message_id
if (message.message_id) {
setMessages(prev => {
// Check if this message already exists
const messageExists = prev.some(m => m.message_id === message.message_id);
if (messageExists) {
// Replace the existing message
return prev.map(m => m.message_id === message.message_id ? message : m);
} else {
// Add as a new message
return [...prev, message];
}
});
}
}
else if (message.type === 'status') {
switch(parsedContent.status_type) {
case 'tool_started':
setStreamingToolCall({
role: 'assistant',
status_type: 'tool_started',
name: parsedContent.function_name,
arguments: parsedContent.arguments,
xml_tag_name: parsedContent.xml_tag_name,
tool_index: parsedContent.tool_index
});
if (isSidePanelOpen) {
// Convert to ToolCallData format
const toolCallData: ToolCallData = {
id: message.message_id || undefined,
name: parsedContent.function_name || parsedContent.xml_tag_name || 'Unknown Tool',
arguments: parsedContent.arguments || '{}',
index: parsedContent.tool_index
};
setSidePanelContent(toolCallData);
}
break;
case 'tool_completed':
case 'tool_failed':
case 'tool_error':
if (streamingToolCall?.tool_index === parsedContent.tool_index) {
setStreamingToolCall(null);
}
break;
case 'finish':
console.log('[PAGE] Received finish status:', parsedContent.finish_reason);
if (parsedContent.finish_reason === 'xml_tool_limit_reached') {
// toast.info('Agent stopped due to tool limit.');
}
break;
case 'thread_run_start':
// Ensure we're showing running state for new runs
setAgentStatus('running');
break;
case 'assistant_response_start':
break;
case 'thread_run_end':
console.log('[PAGE] Received thread run end status.');
setIsStreaming(false);
setAgentStatus('idle');
setAgentRunId(null);
setStreamingTextContent('');
setStreamingToolCall(null);
streamInitializedRef.current = false;
break;
case 'error':
console.error('[PAGE] Received error status:', parsedContent.message);
toast.error(`Agent Error: ${parsedContent.message}`);
setIsStreaming(false);
setAgentStatus('idle');
setAgentRunId(null);
setStreamingTextContent('');
setStreamingToolCall(null);
streamInitializedRef.current = false;
break;
default:
console.warn('[PAGE] Unhandled status type:', parsedContent.status_type);
}
}
else if (message.type === 'user' || message.type === 'assistant' || message.type === 'system') {
// Handle other message types (user, assistant if not streaming, system)
if (message.message_id) {
setMessages(prev => {
// Check if message already exists
const messageExists = prev.some(m => m.message_id === message.message_id);
if (messageExists) {
// Replace existing message
return prev.map(m => m.message_id === message.message_id ? message : m);
} else {
// Add new message
return [...prev, message];
}
});
} else if (!parsedMetadata.stream_status) {
// Only add messages without IDs if they're not temporary streaming chunks
setMessages(prev => [...prev, message]);
}
}
else {
console.warn('[PAGE] Unhandled message type:', message.type);
}
} catch (error) {
console.error('[PAGE] Error processing streamed message:', error, rawData);
}
},
onError: (error: Error | string) => {
console.error('[PAGE] Streaming error:', error);
const errorMessage = typeof error === 'string' ? error : error.message;
// Check if this is a "not found" error
const isNotFoundError = errorMessage.includes('not found') ||
errorMessage.includes('does not exist');
// Only show toast for errors that aren't "not found" errors
if (!isNotFoundError) {
toast.error(errorMessage);
}
// Reset stream state
streamCleanupRef.current = null;
// If this is a "not found" error, reset agent state immediately
if (isNotFoundError) {
console.log(`[PAGE] Agent run ${runId} not found, resetting state immediately`);
setIsStreaming(false);
setAgentStatus('idle');
setAgentRunId(null);
setStreamingTextContent('');
setStreamingToolCall(null);
streamInitializedRef.current = false;
return;
}
// For other errors, check if agent is still running
if (agentRunId === runId) {
console.log(`[PAGE] Stream error for active agent ${runId}, checking if still running...`);
getAgentStatus(runId).then(status => {
if (status.status === 'running') {
console.log(`[PAGE] Agent ${runId} still running after stream error, will reconnect shortly`);
// Give a short delay before reconnecting
setTimeout(() => {
if (agentRunId === runId) {
streamInitializedRef.current = false;
handleStreamAgent(runId);
}
}, 2000);
} else {
// Agent is no longer running, reset state
console.log(`[PAGE] Agent ${runId} not running after stream error, resetting state`);
setIsStreaming(false);
setAgentStatus('idle');
setAgentRunId(null);
setStreamingTextContent('');
setStreamingToolCall(null);
streamInitializedRef.current = false;
}
}).catch(err => {
console.error(`[PAGE] Error checking agent status after stream error:`, err);
// Reset on error checking status
setIsStreaming(false);
setAgentStatus('idle');
setAgentRunId(null);
setStreamingTextContent('');
setStreamingToolCall(null);
streamInitializedRef.current = false;
});
} else {
// This was not for the current agent run, just clean up
setIsStreaming(false);
streamInitializedRef.current = false;
}
},
onClose: async () => {
console.log('[PAGE] Stream connection closed by server.');
// Check if agent is still running after stream close
if (agentStatus === 'running' && agentRunId === runId) {
console.log(`[PAGE] Stream closed for active agent ${runId}, checking if still running...`);
try {
const status = await getAgentStatus(runId);
if (status.status === 'running') {
console.log(`[PAGE] Agent ${runId} still running after stream closed, will reconnect`);
// Stream closed but agent is still running - reconnect
setTimeout(() => {
if (agentRunId === runId) {
console.log(`[PAGE] Reconnecting to running agent ${runId}`);
streamInitializedRef.current = false;
handleStreamAgent(runId);
}
}, 1000);
return;
} else {
console.log(`[PAGE] Agent ${runId} not running after stream closed (${status.status}), resetting state`);
// Agent is done, reset state
setIsStreaming(false);
setAgentStatus('idle');
setAgentRunId(null);
setStreamingTextContent('');
setStreamingToolCall(null);
streamCleanupRef.current = null;
streamInitializedRef.current = false;
}
} catch (err) {
console.error(`[PAGE] Error checking agent status after stream close:`, err);
// Check if this is a "not found" error
const errorMessage = err instanceof Error ? err.message : String(err);
const isNotFoundError = errorMessage.includes('not found') ||
errorMessage.includes('404') ||
errorMessage.includes('does not exist');
if (isNotFoundError) {
console.log(`[PAGE] Agent run ${runId} not found after stream close, resetting state`);
// Reset all state since the agent run doesn't exist
setIsStreaming(false);
setAgentStatus('idle');
setAgentRunId(null);
setStreamingTextContent('');
setStreamingToolCall(null);
streamCleanupRef.current = null;
streamInitializedRef.current = false;
return;
}
// For other errors, only reset streaming state but maintain agent status as running
setIsStreaming(false);
// Don't set agent status to idle here
setStreamingTextContent('');
setStreamingToolCall(null);
streamCleanupRef.current = null;
// Try to reconnect after a short delay if agent is still marked as running
setTimeout(() => {
if (agentRunId === runId && agentStatus === 'running') {
console.log(`[PAGE] Attempting to reconnect after error checking status`);
streamInitializedRef.current = false;
handleStreamAgent(runId);
}
}, 2000);
}
} else {
// Normal cleanup for non-running agent
streamCleanupRef.current = null;
streamInitializedRef.current = false;
}
const handleNewMessageFromStream = useCallback((message: UnifiedMessage) => {
setMessages(prev => {
const messageExists = prev.some(m => m.message_id === message.message_id);
if (messageExists) {
return prev.map(m => m.message_id === message.message_id ? message : m);
} else {
return [...prev, message];
}
});
// Store cleanup function
streamCleanupRef.current = cleanup;
}, []);
}, [threadId, agentRunId, agentStatus, isSidePanelOpen]);
const handleStreamStatusChange = useCallback((hookStatus: AgentStreamStatus) => {
console.log(`[PAGE] Hook status changed: ${hookStatus}`);
switch(hookStatus) {
case 'idle':
case 'completed':
case 'stopped':
case 'agent_not_running':
setAgentStatus('idle');
setAgentRunId(null);
break;
case 'connecting':
setAgentStatus('connecting');
break;
case 'streaming':
setAgentStatus('running');
break;
case 'error':
setAgentStatus('error');
break;
}
}, []);
const handleStreamError = useCallback((errorMessage: string) => {
console.error(`[PAGE] Stream hook error: ${errorMessage}`);
if (!errorMessage.toLowerCase().includes('not found') &&
!errorMessage.toLowerCase().includes('agent run is not running')) {
toast.error(`Stream Error: ${errorMessage}`);
}
}, []);
const handleStreamClose = useCallback(() => {
console.log(`[PAGE] Stream hook closed with final status: ${agentStatus}`);
}, [agentStatus]);
const {
status: streamHookStatus,
textContent: streamingTextContent,
toolCall: streamingToolCall,
error: streamError,
agentRunId: currentHookRunId,
startStreaming,
stopStreaming,
} = useAgentStream({
onMessage: handleNewMessageFromStream,
onStatusChange: handleStreamStatusChange,
onError: handleStreamError,
onClose: handleStreamClose,
});
useEffect(() => {
if (agentRunId && agentRunId !== currentHookRunId) {
console.log(`[PAGE] Target agentRunId set to ${agentRunId}, initiating stream...`);
startStreaming(agentRunId);
}
}, [agentRunId, startStreaming]);
useEffect(() => {
let isMounted = true;
@ -500,29 +221,24 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
}
}
// Check once for active agent run on initial load
if (!agentRunsCheckedRef.current && isMounted) {
try {
console.log('[PAGE] Checking for active agent runs...');
const agentRuns = await getAgentRuns(threadId);
// Mark as checked immediately to prevent duplicate checks
agentRunsCheckedRef.current = true;
// Look for running agent
const activeRun = agentRuns.find(run => run.status === 'running');
if (activeRun) {
if (activeRun && isMounted) {
console.log('[PAGE] Found active run on load:', activeRun.id);
// Don't set agentRunId here, let handleStreamAgent do it
handleStreamAgent(activeRun.id);
setAgentRunId(activeRun.id);
} else {
console.log('[PAGE] No active agent runs found');
setAgentStatus('idle');
if (isMounted) setAgentStatus('idle');
}
} catch (err) {
console.error('[PAGE] Error checking for active runs:', err);
agentRunsCheckedRef.current = true;
setAgentStatus('idle');
if (isMounted) setAgentStatus('idle');
}
}
@ -542,68 +258,14 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
loadData();
// Handle reconnection when tab visibility changes
const handleVisibilityChange = () => {
if (document.visibilityState === 'visible' && agentRunId && agentStatus === 'running') {
const lastMessage = (window as any).lastStreamMessage || 0;
const now = Date.now();
const messageTimeout = 10000; // Reduced timeout for faster reconnection
// If no recent messages and not already attempting to reconnect
if (now - lastMessage > messageTimeout && !streamInitializedRef.current) {
console.log('[PAGE] Tab became visible, stream appears stale. Verifying agent status...');
getAgentStatus(agentRunId).then(status => {
if (status.status === 'running') {
console.log('[PAGE] Agent still running after visibility change, reconnecting');
handleStreamAgent(agentRunId);
} else {
console.log('[PAGE] Agent no longer running after visibility change');
setAgentStatus('idle');
setAgentRunId(null);
}
}).catch(err => {
console.error('[PAGE] Error checking agent status on visibility change:', err);
// Check if this is a "not found" error
const errorMessage = err instanceof Error ? err.message : String(err);
const isNotFoundError = errorMessage.includes('not found') ||
errorMessage.includes('404') ||
errorMessage.includes('does not exist');
if (isNotFoundError) {
console.log(`[PAGE] Agent run ${agentRunId} not found after visibility change, resetting state`);
// Reset all state since the agent run doesn't exist
setAgentStatus('idle');
setAgentRunId(null);
return;
}
// For other errors, don't reset agent status - maintain as running
});
}
}
};
document.addEventListener('visibilitychange', handleVisibilityChange);
return () => {
isMounted = false;
document.removeEventListener('visibilitychange', handleVisibilityChange);
if (streamCleanupRef.current) {
console.log('[PAGE] Cleaning up stream on unmount');
streamCleanupRef.current();
streamCleanupRef.current = null;
streamInitializedRef.current = false;
}
};
}, [threadId, handleStreamAgent]);
}, [threadId]);
const handleSubmitMessage = useCallback(async (message: string) => {
if (!message.trim()) return;
setIsSending(true);
setStreamingTextContent('');
setStreamingToolCall(null);
const optimisticUserMessage: UnifiedMessage = {
message_id: `temp-${Date.now()}`,
@ -638,12 +300,8 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
const agentResult = results[1].value;
setMessages(prev => prev.filter(m => m.message_id !== optimisticUserMessage.message_id));
// Reset stream for new agent run
streamInitializedRef.current = false;
// Connect to the new agent run
handleStreamAgent(agentResult.agent_run_id);
setAgentRunId(agentResult.agent_run_id);
} catch (err) {
console.error('Error sending message or starting agent:', err);
@ -652,39 +310,13 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
} finally {
setIsSending(false);
}
}, [threadId, handleStreamAgent]);
}, [threadId]);
const handleStopAgent = useCallback(async () => {
if (!agentRunId) return;
console.log(`[PAGE] Stopping agent run: ${agentRunId}`);
// Set UI state immediately
setIsStreaming(false);
console.log(`[PAGE] Requesting agent stop via hook.`);
setAgentStatus('idle');
// Store agent run ID for the API call
const runIdToStop = agentRunId;
// Reset all state
setAgentRunId(null);
setStreamingTextContent('');
setStreamingToolCall(null);
// Clean up stream
if (streamCleanupRef.current) {
streamCleanupRef.current();
streamCleanupRef.current = null;
streamInitializedRef.current = false;
}
try {
await stopAgent(runIdToStop);
toast.success('Agent stop request sent.');
} catch (err) {
console.error('[PAGE] Error stopping agent:', err);
toast.error(err instanceof Error ? err.message : 'Failed to stop agent');
}
}, [agentRunId]);
await stopStreaming();
}, [stopStreaming]);
const handleScroll = () => {
if (!messagesContainerRef.current) return;
@ -721,85 +353,9 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
setUserHasScrolled(false);
};
// Periodically check agent run status if not streaming
useEffect(() => {
let checkInterval: NodeJS.Timeout | null = null;
// If agent should be running but we're not streaming
if (agentRunId && agentStatus === 'running' && !isStreaming) {
console.warn('[PAGE] Agent running but not streaming. Setting up status check interval.');
// Track consecutive completed responses
let consecutiveCompleted = 0;
const MAX_CONSECUTIVE_COMPLETED = 2;
checkInterval = setInterval(async () => {
try {
console.log(`[PAGE] Checking status of agent run ${agentRunId}`);
const status = await getAgentStatus(agentRunId);
// Check if the agent is still running
if (status.status === 'running') {
console.log('[PAGE] Agent confirmed running, reconnecting to stream');
consecutiveCompleted = 0; // Reset counter
// Force stream reconnection since agent is still running
if (!streamInitializedRef.current) {
handleStreamAgent(agentRunId);
}
} else {
console.log(`[PAGE] Agent no longer running (status: ${status.status}), incrementing completed counter`);
// Agent is not running anymore, increment counter
consecutiveCompleted++;
if (consecutiveCompleted >= MAX_CONSECUTIVE_COMPLETED) {
console.log(`[PAGE] Agent confirmed ${status.status} after ${consecutiveCompleted} checks, resetting state`);
// Reset all state after confirming non-running status multiple times
setAgentRunId(null);
setAgentStatus('idle');
if (checkInterval) {
clearInterval(checkInterval);
checkInterval = null;
}
}
}
} catch (err) {
console.error('[PAGE] Error checking agent status:', err);
// Check if this is a "not found" error
const errorMessage = err instanceof Error ? err.message : String(err);
const isNotFoundError = errorMessage.includes('not found') ||
errorMessage.includes('404') ||
errorMessage.includes('does not exist');
if (isNotFoundError) {
console.log(`[PAGE] Agent run ${agentRunId} not found during status check, resetting state`);
// Reset all state since the agent run doesn't exist
setAgentRunId(null);
setAgentStatus('idle');
if (checkInterval) {
clearInterval(checkInterval);
checkInterval = null;
}
return;
}
// For other errors, don't reset agent state to avoid flickering
// Just let next check try again
}
}, 8000); // Check every 8 seconds
}
return () => {
if (checkInterval) {
clearInterval(checkInterval);
}
};
}, [agentRunId, agentStatus, isStreaming, handleStreamAgent]);
// Debug logging
useEffect(() => {
console.log(`[PAGE] 🔄 AgentStatus: ${agentStatus}, isStreaming: ${isStreaming}, agentRunId: ${agentRunId || 'none'}`);
}, [agentStatus, isStreaming, agentRunId]);
console.log(`[PAGE] 🔄 Page AgentStatus: ${agentStatus}, Hook Status: ${streamHookStatus}, Target RunID: ${agentRunId || 'none'}, Hook RunID: ${currentHookRunId || 'none'}`);
}, [agentStatus, streamHookStatus, agentRunId, currentHookRunId]);
const handleOpenFileViewer = useCallback(() => setFileViewerOpen(true), []);
@ -966,7 +522,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
onScroll={handleScroll}
>
<div className="mx-auto max-w-3xl">
{messages.length === 0 && !streamingTextContent && !streamingToolCall ? (
{messages.length === 0 && !streamingTextContent && !streamingToolCall && agentStatus === 'idle' ? (
<div className="flex h-full items-center justify-center">
<div className="text-center text-muted-foreground">Send a message to start.</div>
</div>
@ -1164,7 +720,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
);
}
})}
{(streamingTextContent || streamingToolCall) && (
{(streamHookStatus === 'streaming' || streamHookStatus === 'connecting') && (
<div ref={latestMessageRef}>
<div className="flex items-start gap-3">
<div className="flex-shrink-0 w-5 h-5 rounded-full flex items-center justify-center overflow-hidden bg-gray-200">
@ -1177,7 +733,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
{streamingTextContent}
</span>
)}
{isStreaming && <span className="inline-block h-4 w-0.5 bg-gray-400 ml-0.5 -mb-1 animate-pulse" />}
{(streamHookStatus === 'streaming' || streamHookStatus === 'connecting') && <span className="inline-block h-4 w-0.5 bg-gray-400 ml-0.5 -mb-1 animate-pulse" />}
{streamingToolCall && (
<div className="mt-2">
@ -1203,7 +759,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
</div>
)}
{agentStatus === 'running' && !streamingTextContent && !streamingToolCall && messages.length > 0 && messages[messages.length-1].type === 'user' && (
<div>
<div ref={latestMessageRef}>
<div className="flex items-start gap-3">
<div className="flex-shrink-0 w-5 h-5 rounded-full flex items-center justify-center overflow-hidden bg-gray-200">
<Image src="/kortix-symbol.svg" alt="Suna" width={14} height={14} className="object-contain"/>
@ -1239,8 +795,8 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
onSubmit={handleSubmitMessage}
placeholder="Ask Suna anything..."
loading={isSending}
disabled={isSending || agentStatus === 'running'}
isAgentRunning={agentStatus === 'running'}
disabled={isSending || agentStatus === 'running' || agentStatus === 'connecting'}
isAgentRunning={agentStatus === 'running' || agentStatus === 'connecting'}
onStopAgent={handleStopAgent}
autoFocus={!isLoading}
onFileBrowse={handleOpenFileViewer}

View File

@ -0,0 +1,427 @@
import { useState, useEffect, useRef, useCallback } from 'react';
import {
streamAgent,
getAgentStatus,
stopAgent,
AgentRun,
Message as ApiMessageType
} from '@/lib/api';
import { toast } from 'sonner';
import { UnifiedMessage, ParsedContent, ParsedMetadata } from '@/components/thread/types';
import { safeJsonParse } from '@/components/thread/utils';
// Define the possible statuses for the stream hook
export type AgentStreamStatus =
| 'idle' // No active stream or agent run
| 'connecting' // Verifying agent status and initiating stream
| 'streaming' // Actively receiving messages
| 'completed' // Stream finished successfully, agent run completed
| 'stopped' // Stream stopped by user action
| 'error' // An error occurred during streaming or connection
| 'agent_not_running'; // Agent run provided was not in a running state
// Define the structure returned by the hook
export interface UseAgentStreamResult {
status: AgentStreamStatus;
textContent: string;
toolCall: ParsedContent | null;
error: string | null;
agentRunId: string | null; // Expose the currently managed agentRunId
startStreaming: (runId: string) => void;
stopStreaming: () => Promise<void>;
}
// Define the callbacks the hook consumer can provide
export interface AgentStreamCallbacks {
onMessage: (message: UnifiedMessage) => void; // Callback for complete messages
onStatusChange?: (status: AgentStreamStatus) => void; // Optional: Notify on internal status changes
onError?: (error: string) => void; // Optional: Notify on errors
onClose?: (finalStatus: AgentStreamStatus) => void; // Optional: Notify when streaming definitively ends
}
export function useAgentStream(callbacks: AgentStreamCallbacks): UseAgentStreamResult {
const [agentRunId, setAgentRunId] = useState<string | null>(null);
const [status, setStatus] = useState<AgentStreamStatus>('idle');
const [textContent, setTextContent] = useState<string>('');
const [toolCall, setToolCall] = useState<ParsedContent | null>(null);
const [error, setError] = useState<string | null>(null);
const streamCleanupRef = useRef<(() => void) | null>(null);
const isMountedRef = useRef<boolean>(true);
const currentRunIdRef = useRef<string | null>(null); // Ref to track the run ID being processed
// Internal function to update status and notify consumer
const updateStatus = useCallback((newStatus: AgentStreamStatus) => {
if (isMountedRef.current) {
setStatus(newStatus);
callbacks.onStatusChange?.(newStatus);
if (newStatus === 'error' && error) {
callbacks.onError?.(error);
}
if (['completed', 'stopped', 'error', 'agent_not_running'].includes(newStatus)) {
callbacks.onClose?.(newStatus);
}
}
}, [callbacks, error]); // Include error dependency
// Function to handle finalization of a stream (completion, stop, error)
const finalizeStream = useCallback((finalStatus: AgentStreamStatus, runId: string | null = agentRunId) => {
if (!isMountedRef.current) return;
console.log(`[useAgentStream] Finalizing stream for ${runId} with status: ${finalStatus}`);
if (streamCleanupRef.current) {
streamCleanupRef.current();
streamCleanupRef.current = null;
}
// Reset streaming-specific state
setTextContent('');
setToolCall(null);
// Update status and clear run ID
updateStatus(finalStatus);
setAgentRunId(null);
currentRunIdRef.current = null;
// If the run was stopped or completed, try to get final status to update nonRunning set
if (runId && (finalStatus === 'completed' || finalStatus === 'stopped' || finalStatus === 'agent_not_running')) {
getAgentStatus(runId).catch(err => {
console.log(`[useAgentStream] Post-finalization status check for ${runId} failed (this might be expected if not found): ${err.message}`);
});
}
}, [agentRunId, updateStatus]);
// --- Stream Callback Handlers ---
const handleStreamMessage = useCallback((rawData: string) => {
if (!isMountedRef.current) return;
(window as any).lastStreamMessage = Date.now(); // Keep track of last message time
let processedData = rawData;
if (processedData.startsWith('data: ')) {
processedData = processedData.substring(6).trim();
}
if (!processedData) return;
// --- Early exit for non-JSON completion messages ---
if (processedData.includes('"type":"status"') && processedData.includes('"status":"completed"')) {
console.log('[useAgentStream] Received final completion status message');
finalizeStream('completed', currentRunIdRef.current);
return;
}
if (processedData.includes('Run data not available for streaming') || processedData.includes('Stream ended with status: completed')) {
console.log(`[useAgentStream] Detected final completion message: "${processedData}", finalizing.`);
finalizeStream('completed', currentRunIdRef.current);
return;
}
// --- Process JSON messages ---
const message: UnifiedMessage = safeJsonParse(processedData, null);
if (!message) {
console.warn('[useAgentStream] Failed to parse streamed message:', processedData);
return;
}
const parsedContent = safeJsonParse<ParsedContent>(message.content, {});
const parsedMetadata = safeJsonParse<ParsedMetadata>(message.metadata, {});
// Update status to streaming if we receive a valid message
if (status !== 'streaming') updateStatus('streaming');
switch (message.type) {
case 'assistant':
if (parsedMetadata.stream_status === 'chunk' && parsedContent.content) {
setTextContent(prev => prev + parsedContent.content);
} else if (parsedMetadata.stream_status === 'complete') {
setTextContent('');
setToolCall(null);
if (message.message_id) callbacks.onMessage(message);
} else if (!parsedMetadata.stream_status) {
// Handle non-chunked assistant messages if needed
if (message.message_id) callbacks.onMessage(message);
}
break;
case 'tool':
setToolCall(null); // Clear any streaming tool call
if (message.message_id) callbacks.onMessage(message);
break;
case 'status':
switch (parsedContent.status_type) {
case 'tool_started':
setToolCall({
role: 'assistant',
status_type: 'tool_started',
name: parsedContent.function_name,
arguments: parsedContent.arguments,
xml_tag_name: parsedContent.xml_tag_name,
tool_index: parsedContent.tool_index
});
break;
case 'tool_completed':
case 'tool_failed':
case 'tool_error':
if (toolCall?.tool_index === parsedContent.tool_index) {
setToolCall(null);
}
break;
case 'thread_run_end':
console.log('[useAgentStream] Received thread run end status, finalizing.');
break;
case 'finish':
// Optional: Handle finish reasons like 'xml_tool_limit_reached'
console.log('[useAgentStream] Received finish status:', parsedContent.finish_reason);
// Don't finalize here, wait for thread_run_end or completion message
break;
case 'error':
console.error('[useAgentStream] Received error status message:', parsedContent.message);
setError(parsedContent.message || 'Agent run failed');
finalizeStream('error', currentRunIdRef.current);
break;
// Ignore thread_run_start, assistant_response_start etc. for now
default:
// console.debug('[useAgentStream] Received unhandled status type:', parsedContent.status_type);
break;
}
break;
case 'user':
case 'system':
// Handle other message types if necessary, e.g., if backend sends historical context
if (message.message_id) callbacks.onMessage(message);
break;
default:
console.warn('[useAgentStream] Unhandled message type:', message.type);
}
}, [status, toolCall, callbacks, finalizeStream, updateStatus]);
const handleStreamError = useCallback((err: Error | string | Event) => {
if (!isMountedRef.current) return;
// Extract error message
let errorMessage = 'Unknown streaming error';
if (typeof err === 'string') {
errorMessage = err;
} else if (err instanceof Error) {
errorMessage = err.message;
} else if (err instanceof Event && err.type === 'error') {
// Standard EventSource errors don't have much detail, might need status check
errorMessage = 'Stream connection error';
}
console.error('[useAgentStream] Streaming error:', errorMessage, err);
setError(errorMessage);
const runId = currentRunIdRef.current;
if (!runId) {
console.warn('[useAgentStream] Stream error occurred but no agentRunId is active.');
finalizeStream('error'); // Finalize with generic error if no runId
return;
}
// Check agent status immediately after an error
getAgentStatus(runId)
.then(agentStatus => {
if (!isMountedRef.current) return; // Check mount status again after async call
if (agentStatus.status === 'running') {
console.warn(`[useAgentStream] Stream error for ${runId}, but agent is still running. Attempting reconnect.`);
// Consider adding a delay or backoff here
// For now, just finalize with error and let the user retry or handle reconnection logic outside
finalizeStream('error', runId);
toast.warning("Stream interrupted. Agent might still be running.");
} else {
console.log(`[useAgentStream] Stream error for ${runId}, agent status is ${agentStatus.status}. Finalizing stream.`);
finalizeStream(agentStatus.status === 'completed' ? 'completed' : 'error', runId);
}
})
.catch(statusError => {
if (!isMountedRef.current) return;
const statusErrorMessage = statusError instanceof Error ? statusError.message : String(statusError);
console.error(`[useAgentStream] Error checking agent status for ${runId} after stream error: ${statusErrorMessage}`);
const isNotFoundError = statusErrorMessage.includes('not found') ||
statusErrorMessage.includes('404') ||
statusErrorMessage.includes('does not exist');
if (isNotFoundError) {
console.log(`[useAgentStream] Agent run ${runId} not found after stream error. Finalizing.`);
finalizeStream('agent_not_running', runId); // Use a specific status
} else {
// For other status check errors, finalize with the original stream error
finalizeStream('error', runId);
}
});
}, [finalizeStream]);
const handleStreamClose = useCallback(() => {
if (!isMountedRef.current) return;
console.log('[useAgentStream] Stream connection closed by server.');
const runId = currentRunIdRef.current;
if (!runId) {
console.warn('[useAgentStream] Stream closed but no active agentRunId.');
// If status was streaming, something went wrong, finalize as error
if (status === 'streaming' || status === 'connecting') {
finalizeStream('error');
} else if (status !== 'idle' && status !== 'completed' && status !== 'stopped' && status !== 'agent_not_running') {
// If in some other state, just go back to idle if no runId
finalizeStream('idle');
}
return;
}
// Immediately check the agent status when the stream closes unexpectedly
// This covers cases where the agent finished but the final message wasn't received,
// or if the agent errored out on the backend.
getAgentStatus(runId)
.then(agentStatus => {
if (!isMountedRef.current) return; // Check mount status again
console.log(`[useAgentStream] Agent status after stream close for ${runId}: ${agentStatus.status}`);
if (agentStatus.status === 'running') {
// This case is tricky. The stream closed, but the agent is running.
// Could be a temporary network issue, or the backend stream terminated prematurely.
console.warn(`[useAgentStream] Stream closed for ${runId}, but agent is still running. Reconnection logic needed or signal error.`);
setError('Stream closed unexpectedly while agent was running.');
finalizeStream('error', runId); // Finalize as error for now
// Optionally: Implement automatic reconnection attempts here or notify parent component.
toast.warning("Stream disconnected. Agent might still be running.");
} else {
// Agent is not running (completed, stopped, error). Finalize accordingly.
finalizeStream(agentStatus.status === 'completed' ? 'completed' : 'error', runId);
}
})
.catch(err => {
if (!isMountedRef.current) return;
const errorMessage = err instanceof Error ? err.message : String(err);
console.error(`[useAgentStream] Error checking agent status for ${runId} after stream close: ${errorMessage}`);
const isNotFoundError = errorMessage.includes('not found') ||
errorMessage.includes('404') ||
errorMessage.includes('does not exist');
if (isNotFoundError) {
console.log(`[useAgentStream] Agent run ${runId} not found after stream close. Finalizing.`);
finalizeStream('agent_not_running', runId); // Use specific status
} else {
// For other errors checking status, finalize with generic error
finalizeStream('error', runId);
}
});
}, [status, finalizeStream]); // Include status
// --- Effect to manage the stream lifecycle ---
useEffect(() => {
isMountedRef.current = true;
// Cleanup function for when the component unmounts or agentRunId changes
return () => {
isMountedRef.current = false;
console.log('[useAgentStream] Unmounting or agentRunId changing. Cleaning up stream.');
if (streamCleanupRef.current) {
streamCleanupRef.current();
streamCleanupRef.current = null;
}
// Reset state on unmount if needed, though finalizeStream should handle most cases
setStatus('idle');
setTextContent('');
setToolCall(null);
setError(null);
setAgentRunId(null);
currentRunIdRef.current = null;
};
}, []); // Empty dependency array for mount/unmount effect
// --- Public Functions ---
const startStreaming = useCallback(async (runId: string) => {
if (!isMountedRef.current) return;
console.log(`[useAgentStream] Received request to start streaming for ${runId}`);
// Clean up any previous stream
if (streamCleanupRef.current) {
console.log('[useAgentStream] Cleaning up existing stream before starting new one.');
streamCleanupRef.current();
streamCleanupRef.current = null;
}
// Reset state before starting
setTextContent('');
setToolCall(null);
setError(null);
updateStatus('connecting');
setAgentRunId(runId);
currentRunIdRef.current = runId; // Set the ref immediately
try {
// *** Crucial check: Verify agent is running BEFORE connecting ***
const agentStatus = await getAgentStatus(runId);
if (!isMountedRef.current) return; // Check mount status after async call
if (agentStatus.status !== 'running') {
console.warn(`[useAgentStream] Agent run ${runId} is not in running state (status: ${agentStatus.status}). Cannot start stream.`);
setError(`Agent run is not running (status: ${agentStatus.status})`);
finalizeStream('agent_not_running', runId);
return;
}
// Agent is running, proceed to create the stream
console.log(`[useAgentStream] Agent run ${runId} confirmed running. Setting up EventSource.`);
const cleanup = streamAgent(runId, {
onMessage: handleStreamMessage,
onError: handleStreamError,
onClose: handleStreamClose,
});
streamCleanupRef.current = cleanup;
// Status will be updated to 'streaming' by the first message received in handleStreamMessage
} catch (err) {
if (!isMountedRef.current) return; // Check mount status after async call
const errorMessage = err instanceof Error ? err.message : String(err);
console.error(`[useAgentStream] Error initiating stream for ${runId}: ${errorMessage}`);
setError(errorMessage);
const isNotFoundError = errorMessage.includes('not found') ||
errorMessage.includes('404') ||
errorMessage.includes('does not exist');
finalizeStream(isNotFoundError ? 'agent_not_running' : 'error', runId);
}
}, [updateStatus, finalizeStream, handleStreamMessage, handleStreamError, handleStreamClose]); // Add dependencies
const stopStreaming = useCallback(async () => {
if (!isMountedRef.current || !agentRunId) return;
const runIdToStop = agentRunId;
console.log(`[useAgentStream] Stopping stream for agent run ${runIdToStop}`);
// Immediately update status and clean up stream
finalizeStream('stopped', runIdToStop);
try {
await stopAgent(runIdToStop);
toast.success('Agent stop request sent.');
// finalizeStream already called getAgentStatus implicitly if needed
} catch (err) {
// Don't revert status here, as the user intended to stop. Just log error.
const errorMessage = err instanceof Error ? err.message : String(err);
console.error(`[useAgentStream] Error sending stop request for ${runIdToStop}: ${errorMessage}`);
toast.error(`Failed to stop agent: ${errorMessage}`);
}
}, [agentRunId, finalizeStream]); // Add dependencies
return {
status,
textContent,
toolCall,
error,
agentRunId,
startStreaming,
stopStreaming,
};
}

View File

@ -2,12 +2,11 @@ import { createClient } from '@/lib/supabase/client';
const API_URL = process.env.NEXT_PUBLIC_BACKEND_URL || '';
// Simple cache implementation
// Simple cache implementation for non-agent data
const apiCache = {
projects: new Map(),
threads: new Map(),
threadMessages: new Map(),
agentRuns: new Map(),
getProject: (projectId: string) => apiCache.projects.get(projectId),
setProject: (projectId: string, data: any) => apiCache.projects.set(projectId, data),
@ -21,75 +20,15 @@ const apiCache = {
getThreadMessages: (threadId: string) => apiCache.threadMessages.get(threadId),
setThreadMessages: (threadId: string, data: any) => apiCache.threadMessages.set(threadId, data),
getAgentRuns: (threadId: string) => apiCache.agentRuns.get(threadId),
setAgentRuns: (threadId: string, data: any) => apiCache.agentRuns.set(threadId, data),
// Helper to clear parts of the cache when data changes
invalidateThreadMessages: (threadId: string) => apiCache.threadMessages.delete(threadId),
invalidateAgentRuns: (threadId: string) => apiCache.agentRuns.delete(threadId),
};
// Add a fetch queue system to prevent multiple simultaneous requests
const fetchQueue = {
agentRuns: new Map<string, Promise<any>>(),
threads: new Map<string, Promise<any>>(),
messages: new Map<string, Promise<any>>(),
projects: new Map<string, Promise<any>>(),
getQueuedAgentRuns: (threadId: string) => fetchQueue.agentRuns.get(threadId),
setQueuedAgentRuns: (threadId: string, promise: Promise<any>) => {
fetchQueue.agentRuns.set(threadId, promise);
// Auto-clean the queue after the promise resolves
promise.finally(() => {
fetchQueue.agentRuns.delete(threadId);
});
return promise;
},
getQueuedThreads: (projectId: string) => fetchQueue.threads.get(projectId || 'all'),
setQueuedThreads: (projectId: string, promise: Promise<any>) => {
fetchQueue.threads.set(projectId || 'all', promise);
promise.finally(() => {
fetchQueue.threads.delete(projectId || 'all');
});
return promise;
},
getQueuedMessages: (threadId: string) => fetchQueue.messages.get(threadId),
setQueuedMessages: (threadId: string, promise: Promise<any>) => {
fetchQueue.messages.set(threadId, promise);
promise.finally(() => {
fetchQueue.messages.delete(threadId);
});
return promise;
},
getQueuedProjects: () => fetchQueue.projects.get('all'),
setQueuedProjects: (promise: Promise<any>) => {
fetchQueue.projects.set('all', promise);
promise.finally(() => {
fetchQueue.projects.delete('all');
});
return promise;
}
};
// Track active streams by agent run ID
const activeStreams = new Map<string, {
eventSource: EventSource;
lastMessageTime: number;
subscribers: Set<{
onMessage: (content: string) => void;
onError: (error: Error | string) => void;
onClose: () => void;
}>;
}>();
const activeStreams = new Map<string, EventSource>();
// Track recent agent status requests to prevent duplicates
const recentAgentStatusRequests = new Map<string, {
timestamp: number;
promise: Promise<AgentRun>;
}>();
// Track agent runs that have been confirmed as completed or not found
const nonRunningAgentRuns = new Set<string>();
export type Project = {
id: string;
@ -135,47 +74,35 @@ export type ToolCall = {
// Project APIs
export const getProjects = async (): Promise<Project[]> => {
// Check if we already have a pending request
const pendingRequest = fetchQueue.getQueuedProjects();
if (pendingRequest) {
return pendingRequest;
}
// Check cache first
const cached = apiCache.getProjects();
if (cached) {
return cached;
}
// Create and queue the promise
const fetchPromise = (async () => {
try {
const supabase = createClient();
const { data, error } = await supabase
.from('projects')
.select('*');
if (error) {
// Handle permission errors specifically
if (error.code === '42501' && error.message.includes('has_role_on_account')) {
console.error('Permission error: User does not have proper account access');
return []; // Return empty array instead of throwing
}
throw error;
try {
const supabase = createClient();
const { data, error } = await supabase
.from('projects')
.select('*');
if (error) {
// Handle permission errors specifically
if (error.code === '42501' && error.message.includes('has_role_on_account')) {
console.error('Permission error: User does not have proper account access');
return []; // Return empty array instead of throwing
}
// Cache the result
apiCache.setProjects(data || []);
return data || [];
} catch (err) {
console.error('Error fetching projects:', err);
// Return empty array for permission errors to avoid crashing the UI
return [];
throw error;
}
})();
// Add to queue and return
return fetchQueue.setQueuedProjects(fetchPromise);
// Cache the result
apiCache.setProjects(data || []);
return data || [];
} catch (err) {
console.error('Error fetching projects:', err);
// Return empty array for permission errors to avoid crashing the UI
return [];
}
};
export const getProject = async (projectId: string): Promise<Project> => {
@ -290,38 +217,26 @@ export const deleteProject = async (projectId: string): Promise<void> => {
// Thread APIs
export const getThreads = async (projectId?: string): Promise<Thread[]> => {
// Check if we already have a pending request
const pendingRequest = fetchQueue.getQueuedThreads(projectId || 'all');
if (pendingRequest) {
return pendingRequest;
}
// Check cache first
const cached = apiCache.getThreads(projectId || 'all');
if (cached) {
return cached;
}
// Create and queue the promise
const fetchPromise = (async () => {
const supabase = createClient();
let query = supabase.from('threads').select('*');
if (projectId) {
query = query.eq('project_id', projectId);
}
const { data, error } = await query;
if (error) throw error;
// Cache the result
apiCache.setThreads(projectId || 'all', data || []);
return data || [];
})();
const supabase = createClient();
let query = supabase.from('threads').select('*');
// Add to queue and return
return fetchQueue.setQueuedThreads(projectId || 'all', fetchPromise);
if (projectId) {
query = query.eq('project_id', projectId);
}
const { data, error } = await query;
if (error) throw error;
// Cache the result
apiCache.setThreads(projectId || 'all', data || []);
return data || [];
};
export const getThread = async (threadId: string): Promise<Thread> => {
@ -389,43 +304,31 @@ export const addUserMessage = async (threadId: string, content: string): Promise
};
export const getMessages = async (threadId: string): Promise<Message[]> => {
// Check if we already have a pending request
const pendingRequest = fetchQueue.getQueuedMessages(threadId);
if (pendingRequest) {
return pendingRequest;
}
// Check cache first
const cached = apiCache.getThreadMessages(threadId);
if (cached) {
return cached;
}
// Create and queue the promise
const fetchPromise = (async () => {
const supabase = createClient();
const { data, error } = await supabase
.from('messages')
.select('*')
.eq('thread_id', threadId)
.neq('type', 'cost')
.neq('type', 'summary')
.order('created_at', { ascending: true });
if (error) {
console.error('Error fetching messages:', error);
throw new Error(`Error getting messages: ${error.message}`);
}
// Cache the result
apiCache.setThreadMessages(threadId, data || []);
return data || [];
})();
const supabase = createClient();
// Add to queue and return
return fetchQueue.setQueuedMessages(threadId, fetchPromise);
const { data, error } = await supabase
.from('messages')
.select('*')
.eq('thread_id', threadId)
.neq('type', 'cost')
.neq('type', 'summary')
.order('created_at', { ascending: true });
if (error) {
console.error('Error fetching messages:', error);
throw new Error(`Error getting messages: ${error.message}`);
}
// Cache the result
apiCache.setThreadMessages(threadId, data || []);
return data || [];
};
// Agent APIs
@ -451,6 +354,8 @@ export const startAgent = async (threadId: string): Promise<{ agent_run_id: stri
'Content-Type': 'application/json',
'Authorization': `Bearer ${session.access_token}`,
},
// Add cache: 'no-store' to prevent caching
cache: 'no-store',
});
if (!response.ok) {
@ -459,10 +364,6 @@ export const startAgent = async (threadId: string): Promise<{ agent_run_id: stri
throw new Error(`Error starting agent: ${response.statusText} (${response.status})`);
}
// Invalidate relevant caches
apiCache.invalidateAgentRuns(threadId);
apiCache.invalidateThreadMessages(threadId);
return response.json();
} catch (error) {
console.error('[API] Failed to start agent:', error);
@ -477,6 +378,17 @@ export const startAgent = async (threadId: string): Promise<{ agent_run_id: stri
};
export const stopAgent = async (agentRunId: string): Promise<void> => {
// Add to non-running set immediately to prevent reconnection attempts
nonRunningAgentRuns.add(agentRunId);
// Close any existing stream
const existingStream = activeStreams.get(agentRunId);
if (existingStream) {
console.log(`[API] Closing existing stream for ${agentRunId} before stopping agent`);
existingStream.close();
activeStreams.delete(agentRunId);
}
const supabase = createClient();
const { data: { session } } = await supabase.auth.getSession();
@ -490,6 +402,8 @@ export const stopAgent = async (agentRunId: string): Promise<void> => {
'Content-Type': 'application/json',
'Authorization': `Bearer ${session.access_token}`,
},
// Add cache: 'no-store' to prevent caching
cache: 'no-store',
});
if (!response.ok) {
@ -498,16 +412,12 @@ export const stopAgent = async (agentRunId: string): Promise<void> => {
};
export const getAgentStatus = async (agentRunId: string): Promise<AgentRun> => {
console.log(`[API] ⚠️ Requesting agent status for ${agentRunId}`);
console.log(`[API] Requesting agent status for ${agentRunId}`);
// Check if we have a recent request for this agent run
const now = Date.now();
const recentRequest = recentAgentStatusRequests.get(agentRunId);
// If we have a request from the last 2 seconds, reuse its promise
if (recentRequest && now - recentRequest.timestamp < 2000) {
console.log(`[API] 🔄 Reusing recent status request for ${agentRunId} from ${now - recentRequest.timestamp}ms ago`);
return recentRequest.promise;
// If we already know this agent is not running, throw an error
if (nonRunningAgentRuns.has(agentRunId)) {
console.log(`[API] Agent run ${agentRunId} is known to be non-running, returning error`);
throw new Error(`Agent run ${agentRunId} is not running`);
}
try {
@ -515,69 +425,50 @@ export const getAgentStatus = async (agentRunId: string): Promise<AgentRun> => {
const { data: { session } } = await supabase.auth.getSession();
if (!session?.access_token) {
console.error('[API] No access token available for getAgentStatus');
console.error('[API] No access token available for getAgentStatus');
throw new Error('No access token available');
}
const url = `${API_URL}/agent-run/${agentRunId}`;
console.log(`[API] 🔍 Fetching from: ${url}`);
console.log(`[API] Fetching from: ${url}`);
// Create the promise for this request
const requestPromise = (async () => {
const response = await fetch(url, {
headers: {
'Authorization': `Bearer ${session.access_token}`,
},
});
if (!response.ok) {
const errorText = await response.text().catch(() => 'No error details available');
console.error(`[API] ❌ Error getting agent status: ${response.status} ${response.statusText}`, errorText);
throw new Error(`Error getting agent status: ${response.statusText} (${response.status})`);
}
const data = await response.json();
console.log(`[API] ✅ Successfully got agent status:`, data);
// Clean up old entries after 5 seconds
setTimeout(() => {
const entry = recentAgentStatusRequests.get(agentRunId);
if (entry && entry.timestamp === now) {
recentAgentStatusRequests.delete(agentRunId);
}
}, 5000);
return data;
})();
// Store this request in our cache
recentAgentStatusRequests.set(agentRunId, {
timestamp: now,
promise: requestPromise
const response = await fetch(url, {
headers: {
'Authorization': `Bearer ${session.access_token}`,
},
// Add cache: 'no-store' to prevent caching
cache: 'no-store',
});
return requestPromise;
if (!response.ok) {
const errorText = await response.text().catch(() => 'No error details available');
console.error(`[API] Error getting agent status: ${response.status} ${response.statusText}`, errorText);
// If we get a 404, add to non-running set
if (response.status === 404) {
nonRunningAgentRuns.add(agentRunId);
}
throw new Error(`Error getting agent status: ${response.statusText} (${response.status})`);
}
const data = await response.json();
console.log(`[API] Successfully got agent status:`, data);
// If agent is not running, add to non-running set
if (data.status !== 'running') {
nonRunningAgentRuns.add(agentRunId);
}
return data;
} catch (error) {
console.error('[API] ❌ Failed to get agent status:', error);
console.error('[API] Failed to get agent status:', error);
throw error;
}
};
export const getAgentRuns = async (threadId: string): Promise<AgentRun[]> => {
// Check if we already have a pending request for this thread ID
const pendingRequest = fetchQueue.getQueuedAgentRuns(threadId);
if (pendingRequest) {
return pendingRequest;
}
// Check cache first
const cached = apiCache.getAgentRuns(threadId);
if (cached) {
return cached;
}
// Create and queue the promise to prevent duplicate requests
const fetchPromise = (async () => {
try {
const supabase = createClient();
const { data: { session } } = await supabase.auth.getSession();
@ -589,6 +480,8 @@ export const getAgentRuns = async (threadId: string): Promise<AgentRun[]> => {
headers: {
'Authorization': `Bearer ${session.access_token}`,
},
// Add cache: 'no-store' to prevent caching
cache: 'no-store',
});
if (!response.ok) {
@ -596,15 +489,11 @@ export const getAgentRuns = async (threadId: string): Promise<AgentRun[]> => {
}
const data = await response.json();
const agentRuns = data.agent_runs || [];
// Cache the result
apiCache.setAgentRuns(threadId, agentRuns);
return agentRuns;
})();
// Add to queue and return
return fetchQueue.setQueuedAgentRuns(threadId, fetchPromise);
return data.agent_runs || [];
} catch (error) {
console.error('Failed to get agent runs:', error);
throw error;
}
};
export const streamAgent = (agentRunId: string, callbacks: {
@ -612,41 +501,58 @@ export const streamAgent = (agentRunId: string, callbacks: {
onError: (error: Error | string) => void;
onClose: () => void;
}): () => void => {
console.log(`[STREAM] streamAgent called for ${agentRunId}, active streams: ${Array.from(activeStreams.keys()).join(', ')}`);
console.log(`[STREAM] streamAgent called for ${agentRunId}`);
// Check if there's already an active stream for this agent run
let activeStream = activeStreams.get(agentRunId);
// If we already have a stream, just add this subscriber
if (activeStream) {
console.log(`[STREAM] Reusing existing stream for ${agentRunId}, adding subscriber`);
activeStream.subscribers.add(callbacks);
// Check if this agent run is known to be non-running
if (nonRunningAgentRuns.has(agentRunId)) {
console.log(`[STREAM] Agent run ${agentRunId} is known to be non-running, not creating stream`);
// Notify the caller immediately
setTimeout(() => {
callbacks.onError(`Agent run ${agentRunId} is not running`);
callbacks.onClose();
}, 0);
// Return a cleanup function for this specific subscriber
return () => {
console.log(`[STREAM] Removing subscriber from ${agentRunId}`);
const stream = activeStreams.get(agentRunId);
if (stream) {
stream.subscribers.delete(callbacks);
// If no subscribers remain, clean up the stream
if (stream.subscribers.size === 0) {
console.log(`[STREAM] No subscribers left for ${agentRunId}, closing stream`);
stream.eventSource.close();
activeStreams.delete(agentRunId);
}
}
};
// Return a no-op cleanup function
return () => {};
}
// If no active stream exists, create a new one
console.log(`[STREAM] Creating new stream for ${agentRunId}`);
let isClosing = false;
// Check if there's already an active stream for this agent run
const existingStream = activeStreams.get(agentRunId);
if (existingStream) {
console.log(`[STREAM] Stream already exists for ${agentRunId}, closing it first`);
existingStream.close();
activeStreams.delete(agentRunId);
}
const setupStream = async () => {
try {
if (isClosing) {
console.log(`[STREAM] Already closing, not setting up stream for ${agentRunId}`);
// Set up a new stream
try {
const setupStream = async () => {
// First verify the agent is actually running
try {
const status = await getAgentStatus(agentRunId);
if (status.status !== 'running') {
console.log(`[STREAM] Agent run ${agentRunId} is not running (status: ${status.status}), not creating stream`);
nonRunningAgentRuns.add(agentRunId);
callbacks.onError(`Agent run ${agentRunId} is not running (status: ${status.status})`);
callbacks.onClose();
return;
}
} catch (err) {
console.error(`[STREAM] Error verifying agent run ${agentRunId}:`, err);
// Check if this is a "not found" error
const errorMessage = err instanceof Error ? err.message : String(err);
const isNotFoundError = errorMessage.includes('not found') ||
errorMessage.includes('404') ||
errorMessage.includes('does not exist');
if (isNotFoundError) {
console.log(`[STREAM] Agent run ${agentRunId} not found, not creating stream`);
nonRunningAgentRuns.add(agentRunId);
}
callbacks.onError(errorMessage);
callbacks.onClose();
return;
}
@ -666,14 +572,8 @@ export const streamAgent = (agentRunId: string, callbacks: {
console.log(`[STREAM] Creating EventSource for ${agentRunId}`);
const eventSource = new EventSource(url.toString());
// Create and add to active streams map immediately
activeStream = {
eventSource,
lastMessageTime: Date.now(),
subscribers: new Set([callbacks])
};
activeStreams.set(agentRunId, activeStream);
// Store the EventSource in the active streams map
activeStreams.set(agentRunId, eventSource);
eventSource.onopen = () => {
console.log(`[STREAM] Connection opened for ${agentRunId}`);
@ -684,11 +584,6 @@ export const streamAgent = (agentRunId: string, callbacks: {
const rawData = event.data;
if (rawData.includes('"type":"ping"')) return;
// Update last message time
if (activeStream) {
activeStream.lastMessageTime = Date.now();
}
// Log raw data for debugging (truncated for readability)
console.log(`[STREAM] Received data for ${agentRunId}: ${rawData.substring(0, 100)}${rawData.length > 100 ? '...' : ''}`);
@ -700,199 +595,131 @@ export const streamAgent = (agentRunId: string, callbacks: {
// Check for "Agent run not found" error
if (rawData.includes('Agent run') && rawData.includes('not found in active runs')) {
console.log(`[STREAM] ⚠️ Agent run ${agentRunId} not found in active runs, closing stream`);
console.log(`[STREAM] Agent run ${agentRunId} not found in active runs, closing stream`);
// Notify subscribers about the error
const currentStream = activeStreams.get(agentRunId);
if (currentStream) {
currentStream.subscribers.forEach(subscriber => {
try {
subscriber.onError("Agent run not found in active runs");
subscriber.onClose();
} catch (subError) {
console.error(`[STREAM] Error in subscriber notification for not found:`, subError);
}
});
}
// Add to non-running set to prevent future reconnection attempts
nonRunningAgentRuns.add(agentRunId);
// Notify about the error
callbacks.onError("Agent run not found in active runs");
// Clean up
eventSource.close();
activeStreams.delete(agentRunId);
callbacks.onClose();
// Clean up stream since agent is not found
if (!isClosing) {
cleanupStream();
}
return;
}
// Check for simple completion status message
// Check for completion messages
if (rawData.includes('"type":"status"') && rawData.includes('"status":"completed"')) {
console.log(`[STREAM] ⚠️ Detected simple completion status message for ${agentRunId}, closing stream`);
console.log(`[STREAM] Detected completion status message for ${agentRunId}`);
// Notify all subscribers this is the final message
const currentStream = activeStreams.get(agentRunId);
if (currentStream) {
currentStream.subscribers.forEach(subscriber => {
try {
subscriber.onMessage(rawData);
} catch (subError) {
console.error(`[STREAM] Error in subscriber onMessage for completion:`, subError);
}
});
// Check for specific completion messages that indicate we should stop checking
if (rawData.includes('Run data not available for streaming') ||
rawData.includes('Stream ended with status: completed')) {
console.log(`[STREAM] Detected final completion message for ${agentRunId}, adding to non-running set`);
// Add to non-running set to prevent future reconnection attempts
nonRunningAgentRuns.add(agentRunId);
}
// Clean up stream since agent is complete
if (!isClosing) {
cleanupStream();
}
// Notify about the message
callbacks.onMessage(rawData);
// Clean up
eventSource.close();
activeStreams.delete(agentRunId);
callbacks.onClose();
return;
}
// Check for completion status message
const isCompletionMessage = rawData.includes('"type":"status"') &&
(rawData.includes('"status":"completed"') ||
rawData.includes('"status_type":"thread_run_end"'));
if (isCompletionMessage) {
console.log(`[STREAM] ⚠️ Detected completion status message for ${agentRunId}`);
// Check for thread run end message
if (rawData.includes('"type":"status"') && rawData.includes('"status_type":"thread_run_end"')) {
console.log(`[STREAM] Detected thread run end message for ${agentRunId}`);
// Add to non-running set
nonRunningAgentRuns.add(agentRunId);
// Notify about the message
callbacks.onMessage(rawData);
// Clean up
eventSource.close();
activeStreams.delete(agentRunId);
callbacks.onClose();
return;
}
// Notify all subscribers about this message
const currentStream = activeStreams.get(agentRunId);
if (currentStream) {
currentStream.subscribers.forEach(subscriber => {
try {
subscriber.onMessage(rawData);
} catch (subError) {
console.error(`[STREAM] Error in subscriber onMessage:`, subError);
// Don't let subscriber errors affect other subscribers
}
});
}
// For all other messages, just pass them through
callbacks.onMessage(rawData);
// Handle completion message cleanup after delivering to subscribers
if (isCompletionMessage && !isClosing) {
console.log(`[STREAM] ⚠️ Closing stream due to completion message for ${agentRunId}`);
cleanupStream();
}
} catch (error) {
console.error(`[STREAM] Error handling message:`, error);
// Notify error without closing the stream to allow retries
const currentStream = activeStreams.get(agentRunId);
if (currentStream) {
currentStream.subscribers.forEach(subscriber => {
try {
subscriber.onError(error instanceof Error ? error : String(error));
} catch (subError) {
console.error(`[STREAM] Error in subscriber onError:`, subError);
}
});
}
callbacks.onError(error instanceof Error ? error : String(error));
}
};
eventSource.onerror = (event) => {
console.log(`[STREAM] 🔍 EventSource error for ${agentRunId}:`, event);
console.log(`[STREAM] EventSource error for ${agentRunId}:`, event);
if (isClosing) {
console.log(`[STREAM] Error ignored because stream is closing for ${agentRunId}`);
return;
}
// Check if we need to verify agent run status
const shouldVerifyAgentStatus = (event as any).target?.readyState === 2; // CLOSED
if (shouldVerifyAgentStatus) {
console.log(`[STREAM] Connection closed, verifying if agent run ${agentRunId} still exists`);
// Verify if the agent run still exists before reconnecting
getAgentStatus(agentRunId)
.then(status => {
if (status.status === 'running') {
console.log(`[STREAM] Agent run ${agentRunId} is still running, will attempt reconnection`);
// Don't clean up, let the page component handle reconnection
} else {
console.log(`[STREAM] Agent run ${agentRunId} is no longer running (${status.status}), cleaning up stream`);
cleanupStream();
}
})
.catch(err => {
console.error(`[STREAM] Error checking agent status after connection error:`, err);
// If we get a 404 or similar error, the agent run doesn't exist
if (err.message && (
err.message.includes('not found') ||
err.message.includes('404') ||
err.message.includes('does not exist')
)) {
console.log(`[STREAM] Agent run ${agentRunId} appears to not exist, cleaning up stream`);
cleanupStream();
} else {
// For other errors, we'll let the page component handle reconnection
console.log(`[STREAM] Network or other error checking agent status, will let page handle reconnection`);
}
});
} else {
// For other types of errors, we'll attempt to keep the stream alive
console.log(`[STREAM] Non-fatal error for ${agentRunId}, keeping stream alive`);
}
// Check if the agent is still running
getAgentStatus(agentRunId)
.then(status => {
if (status.status !== 'running') {
console.log(`[STREAM] Agent run ${agentRunId} is not running after error, closing stream`);
nonRunningAgentRuns.add(agentRunId);
eventSource.close();
activeStreams.delete(agentRunId);
callbacks.onClose();
} else {
console.log(`[STREAM] Agent run ${agentRunId} is still running after error, keeping stream open`);
// Let the browser handle reconnection for non-fatal errors
}
})
.catch(err => {
console.error(`[STREAM] Error checking agent status after stream error:`, err);
// Check if this is a "not found" error
const errMsg = err instanceof Error ? err.message : String(err);
const isNotFoundErr = errMsg.includes('not found') ||
errMsg.includes('404') ||
errMsg.includes('does not exist');
if (isNotFoundErr) {
console.log(`[STREAM] Agent run ${agentRunId} not found after error, closing stream`);
nonRunningAgentRuns.add(agentRunId);
eventSource.close();
activeStreams.delete(agentRunId);
callbacks.onClose();
}
// For other errors, notify but don't close the stream
callbacks.onError(errMsg);
});
};
} catch (error) {
console.error(`[STREAM] Error setting up stream for ${agentRunId}:`, error);
if (!isClosing) {
callbacks.onError(error instanceof Error ? error : String(error));
cleanupStream();
};
// Start the stream setup
setupStream();
// Return a cleanup function
return () => {
console.log(`[STREAM] Cleanup called for ${agentRunId}`);
const stream = activeStreams.get(agentRunId);
if (stream) {
console.log(`[STREAM] Closing stream for ${agentRunId}`);
stream.close();
activeStreams.delete(agentRunId);
}
}
};
const cleanupStream = () => {
if (isClosing) return;
isClosing = true;
console.log(`[STREAM] Cleaning up stream for ${agentRunId}`);
const stream = activeStreams.get(agentRunId);
if (stream) {
// Close the EventSource
stream.eventSource.close();
// Notify all subscribers
stream.subscribers.forEach(subscriber => {
try {
subscriber.onClose();
} catch (error) {
console.error(`[STREAM] Error in subscriber onClose:`, error);
}
});
// Remove from active streams
activeStreams.delete(agentRunId);
}
};
// Setup the stream
setupStream();
// Return cleanup function for this subscriber
return () => {
console.log(`[STREAM] Cleanup called for ${agentRunId}`);
const stream = activeStreams.get(agentRunId);
if (stream) {
// Remove this subscriber
stream.subscribers.delete(callbacks);
// If this was the last subscriber, clean up the entire stream
if (stream.subscribers.size === 0) {
console.log(`[STREAM] Last subscriber removed, closing stream for ${agentRunId}`);
if (!isClosing) {
cleanupStream();
}
} else {
console.log(`[STREAM] Subscriber removed, but ${stream.subscribers.size} still active for ${agentRunId}`);
}
}
};
};
} catch (error) {
console.error(`[STREAM] Error setting up stream for ${agentRunId}:`, error);
callbacks.onError(error instanceof Error ? error : String(error));
callbacks.onClose();
return () => {};
}
};
// Sandbox API Functions