fix streaming

This commit is contained in:
Adam Cohen Hillel 2025-04-17 13:24:42 +01:00
parent b2c307ef7e
commit 1aa87c3ff1
3 changed files with 238 additions and 162 deletions

View File

@ -63,34 +63,49 @@ const getToolIcon = (toolName: string): ElementType => {
// Convert to lowercase for case-insensitive matching
const normalizedName = toolName.toLowerCase();
// Check for browser-related tools with a prefix check
if (normalizedName.startsWith('browser-')) {
return Globe;
}
switch (normalizedName) {
// File operations
case 'create-file':
case 'str-replace':
case 'write-file':
case 'full-file-rewrite':
case 'read-file':
return FileEdit;
case 'run_terminal_cmd':
case 'run_command':
// Shell commands
case 'execute-command':
return Terminal;
case 'web_search':
// Web operations
case 'web-search':
return Search;
case 'browse_url':
return Globe;
case 'call_api':
return Code;
case 'send_message':
return MessageSquare;
case 'list_dir':
return Folder;
case 'read_file':
return FileText;
case 'delete_file':
// API and data operations
case 'call-data-provider':
case 'get-data-provider-endpoints':
return ExternalLink; // Using ExternalLink instead of Database which isn't imported
// Code operations
case 'delete-file':
return FileX;
case 'deploy':
// Deployment
case 'deploy-site':
return CloudUpload;
// Tools and utilities
case 'execute-code':
return Code;
// Default case
default:
// Add logging for debugging unhandled tool types
console.log(`[PAGE] Using default icon for unknown tool type: ${toolName}`);
return Cog; // Default icon
return Wrench; // Default icon for tools
}
};
@ -99,31 +114,65 @@ const extractPrimaryParam = (toolName: string, content: string | undefined): str
if (!content) return null;
try {
// Handle browser tools with a prefix check
if (toolName?.toLowerCase().startsWith('browser-')) {
// Try to extract URL for navigation
const urlMatch = content.match(/url=(?:"|')([^"|']+)(?:"|')/);
if (urlMatch) return urlMatch[1];
// For other browser operations, extract the goal or action
const goalMatch = content.match(/goal=(?:"|')([^"|']+)(?:"|')/);
if (goalMatch) {
const goal = goalMatch[1];
return goal.length > 30 ? goal.substring(0, 27) + '...' : goal;
}
return null;
}
// Simple regex for common parameters - adjust as needed
let match: RegExpMatchArray | null = null;
switch (toolName?.toLowerCase()) {
case 'edit_file':
case 'read_file':
case 'delete_file':
case 'write_file':
match = content.match(/target_file=(?:"|')([^"|']+)(?:"|')/);
// File operations
case 'create-file':
case 'full-file-rewrite':
case 'read-file':
case 'delete-file':
case 'str-replace':
// Try to match file_path attribute
match = content.match(/file_path=(?:"|')([^"|']+)(?:"|')/);
// Return just the filename part
return match ? match[1].split('/').pop() || match[1] : null;
case 'run_terminal_cmd':
case 'run_command':
// Shell commands
case 'execute-command':
// Extract command content
match = content.match(/command=(?:"|')([^"|']+)(?:"|')/);
// Truncate long commands
return match ? (match[1].length > 30 ? match[1].substring(0, 27) + '...' : match[1]) : null;
case 'web_search':
if (match) {
const cmd = match[1];
return cmd.length > 30 ? cmd.substring(0, 27) + '...' : cmd;
}
return null;
// Web search
case 'web-search':
match = content.match(/query=(?:"|')([^"|']+)(?:"|')/);
return match ? (match[1].length > 30 ? match[1].substring(0, 27) + '...' : match[1]) : null;
case 'browse_url':
match = content.match(/url=(?:"|')([^"|']+)(?:"|')/);
// Data provider operations
case 'call-data-provider':
match = content.match(/service_name=(?:"|')([^"|']+)(?:"|')/);
const route = content.match(/route=(?:"|')([^"|']+)(?:"|')/);
return match && route ? `${match[1]}/${route[1]}` : (match ? match[1] : null);
// Deployment
case 'deploy-site':
match = content.match(/site_name=(?:"|')([^"|']+)(?:"|')/);
return match ? match[1] : null;
// Add more cases as needed for other tools
default:
return null;
}
return null;
} catch (e) {
console.warn("Error parsing tool parameters:", e);
return null;
@ -146,14 +195,14 @@ function groupMessages(messages: ApiMessage[]): RenderItem[] {
// Check if current message is the start of a potential sequence
if (currentMsg.role === 'assistant') {
// Regex to find the first XML-like tag: <tagname ...> or <tagname>
const toolTagMatch = currentMsg.content?.match(/<([a-zA-Z\-_]+)(?:\s+[^>]*)?>/);
// Regex to find the first XML-like tag: <tagname ...> or <tagname> or self-closing tags
const toolTagMatch = currentMsg.content?.match(/<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?(?:\/)?>/);
if (toolTagMatch && nextMsg && nextMsg.role === 'user') {
const expectedTag = toolTagMatch[1];
// Regex to check for <tool_result><tagname>...</tagname></tool_result>
// Using 's' flag for dotall to handle multiline content within tags -> Replaced with [\s\S] to avoid ES target issues
const toolResultRegex = new RegExp(`^<tool_result>\\s*<(${expectedTag})(?:\\s+[^>]*)?>[\\s\\S]*?</\\1>\\s*</tool_result>`);
// Also handle self-closing tags in the response
const toolResultRegex = new RegExp(`^<tool_result>\\s*<(${expectedTag})(?:\\s+[^>]*)?(?:/>|>[\\s\\S]*?</\\1>)\\s*</tool_result>`);
if (nextMsg.content?.match(toolResultRegex)) {
// Found a pair, start a sequence
@ -167,12 +216,12 @@ function groupMessages(messages: ApiMessage[]): RenderItem[] {
const potentialUser = i + 1 < messages.length ? messages[i + 1] : null;
if (potentialAssistant.role === 'assistant') {
const nextToolTagMatch = potentialAssistant.content?.match(/<([a-zA-Z\-_]+)(?:\s+[^>]*)?>/);
const nextToolTagMatch = potentialAssistant.content?.match(/<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?(?:\/)?>/);
if (nextToolTagMatch && potentialUser && potentialUser.role === 'user') {
const nextExpectedTag = nextToolTagMatch[1];
// Replaced dotall 's' flag with [\s\S]
const nextToolResultRegex = new RegExp(`^<tool_result>\\s*<(${nextExpectedTag})(?:\\s+[^>]*)?>[\\s\\S]*?</\\1>\\s*</tool_result>`);
// Also handle self-closing tags in the response
const nextToolResultRegex = new RegExp(`^<tool_result>\\s*<(${nextExpectedTag})(?:\\s+[^>]*)?(?:/>|>[\\s\\S]*?</\\1>)\\s*</tool_result>`);
if (potentialUser.content?.match(nextToolResultRegex)) {
// Sequence continues
@ -429,7 +478,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
// If there's a specific finish reason, handle it
if (jsonData.finish_reason === 'xml_tool_limit_reached') {
// Potentially show a toast notification
toast.info('Tool execution limit reached. The agent will continue with available information.');
// toast.info('Tool execution limit reached. The agent will continue with available information.');
}
return;
}
@ -456,17 +505,9 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
};
setToolCallData(toolInfo);
// Optionally add to stream content to show tool execution
setStreamContent(prev =>
prev + `\n\nExecuting tool: ${jsonData.function_name}\n`
);
} else if (jsonData.status === 'completed') {
// Update UI to show tool completion
setToolCallData(null);
// Optionally add to stream content
setStreamContent(prev =>
prev + `\nTool execution completed: ${jsonData.function_name}\n`
);
}
return;
}
@ -479,10 +520,6 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
setSidePanelContent(null);
setToolCallData(null);
// Add tool result to the stream content for visibility
setStreamContent(prev =>
prev + `\nReceived result from ${jsonData.function_name}\n`
);
return;
}
@ -499,11 +536,6 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
setCurrentPairIndex(null); // Live data means not viewing a historical pair
setSidePanelContent(currentLiveToolCall); // Update side panel
// Add to stream content so it's visible
setStreamContent(prev =>
prev + `\nCalling tool: ${currentLiveToolCall.name}\n`
);
if (!isSidePanelOpen) {
// Optionally auto-open side panel? Maybe only if user hasn't closed it recently.
// setIsSidePanelOpen(true);
@ -1008,11 +1040,12 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
// Extract only the XML part and the tool name from the assistant message
const assistantContent = pair.assistantCall.content || '';
// Find the first opening tag and the corresponding closing tag
const xmlRegex = /<([a-zA-Z\-_]+)(?:\s+[^>]*)?>[\s\S]*?<\/\1>/;
// First try to match complete tags, then try self-closing tags
const xmlRegex = /<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?>[\s\S]*?<\/\1>|<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?(?:\/)?>/;
const xmlMatch = assistantContent.match(xmlRegex);
const toolCallXml = xmlMatch ? xmlMatch[0] : '[Could not extract XML tag]';
const assistantToolName = xmlMatch ? xmlMatch[1] : 'Tool'; // Extract name from the matched tag
// Get tag name from either the first capturing group (full tag) or second capturing group (self-closing)
const assistantToolName = xmlMatch ? (xmlMatch[1] || xmlMatch[2]) : 'Tool';
const userResultContent = pair.userResult.content?.match(/<tool_result>([\s\S]*)<\/tool_result>/)?.[1].trim() || '[Could not parse result]';
@ -1040,10 +1073,10 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
// Re-extract data for the side panel (similar to handleHistoricalToolClick)
const assistantContent = pair.assistantCall.content || '';
const xmlRegex = /<([a-zA-Z\-_]+)(?:\s+[^>]*)?>[\s\S]*?<\/\1>/;
const xmlRegex = /<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?>[\s\S]*?<\/\1>|<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?(?:\/)?>/;
const xmlMatch = assistantContent.match(xmlRegex);
const toolCallXml = xmlMatch ? xmlMatch[0] : '[Could not extract XML tag]';
const assistantToolName = xmlMatch ? xmlMatch[1] : 'Tool';
const assistantToolName = xmlMatch ? (xmlMatch[1] || xmlMatch[2]) : 'Tool';
const userToolName = pair.userResult.content?.match(/<tool_result>\s*<([a-zA-Z\-_]+)/)?.[1] || 'Tool';
const userResultContent = pair.userResult.content?.match(/<tool_result>([\s\S]*)<\/tool_result>/)?.[1].trim() || '[Could not parse result]';
@ -1173,6 +1206,12 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
<div className="space-y-6">
{/* Map over processed messages */}
{processedMessages.map((item, index) => {
// Check if this message is an assistant message that follows a user message
const prevMessage = index > 0 ? processedMessages[index - 1] : null;
const isAssistantAfterUser =
(isToolSequence(item) || ((item as ApiMessage).role === 'assistant')) &&
(prevMessage && !isToolSequence(prevMessage) && (prevMessage as ApiMessage).role === 'user');
// ---- Rendering Logic for Tool Sequences ----
if (isToolSequence(item)) {
// Group sequence items into pairs of [assistant, user]
@ -1187,24 +1226,27 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
<div
key={`seq-${index}`}
ref={index === processedMessages.length - 1 ? latestMessageRef : null}
className="relative group pt-4 pb-2 border-t border-gray-100"
className="relative group pt-4 pb-2"
>
{/* Simplified header with logo and name */}
<div className="flex items-center mb-2 text-sm gap-2">
<div className="flex-shrink-0 w-5 h-5 rounded-full flex items-center justify-center overflow-hidden">
<Image src="/kortix-symbol.svg" alt="Suna" width={16} height={16} className="object-contain" />
{/* Show header only if this is an assistant message after a user message */}
{isAssistantAfterUser && (
<div className="flex items-center mb-2 text-sm gap-2">
<div className="flex-shrink-0 w-5 h-5 rounded-full flex items-center justify-center overflow-hidden">
<Image src="/kortix-symbol.svg" alt="Suna" width={16} height={16} className="object-contain" />
</div>
<span className="text-gray-700 font-medium">Suna</span>
</div>
<span className="text-gray-700 font-medium">Suna</span>
</div>
)}
{/* Container for the pairs within the sequence */}
<div className="space-y-4">
{pairs.map((pair, pairIndex) => {
// Parse assistant message content
const assistantContent = pair.assistantCall.content || '';
const xmlRegex = /<([a-zA-Z\-_]+)(?:\s+[^>]*)?>[\s\S]*?<\/\1>/;
const xmlRegex = /<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?>[\s\S]*?<\/\1>|<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?(?:\/)?>/;
const xmlMatch = assistantContent.match(xmlRegex);
const toolName = xmlMatch ? xmlMatch[1] : 'Tool';
// Get tag name from either the first or second capturing group
const toolName = xmlMatch ? (xmlMatch[1] || xmlMatch[2]) : 'Tool';
const preContent = xmlMatch ? assistantContent.substring(0, xmlMatch.index).trim() : assistantContent.trim();
const postContent = xmlMatch ? assistantContent.substring(xmlMatch.index + xmlMatch[0].length).trim() : '';
const userResultName = pair.userResult.content?.match(/<tool_result>\s*<([a-zA-Z\-_]+)/)?.[1] || 'Result';
@ -1275,24 +1317,26 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
<div
key={index} // Use the index from processedMessages
ref={index === processedMessages.length - 1 && message.role !== 'user' ? latestMessageRef : null} // Ref on the regular message div if it's last (and not user)
className={`${message.role === 'user' ? 'text-right py-1' : 'py-2'} ${index > 0 ? 'border-t border-gray-100' : ''}`} // Add top border between messages
className={`${message.role === 'user' ? 'text-right py-1' : 'py-2'}`} // Removed border-t
>
{/* Avatar (User = Right, Assistant/Tool = Left) */}
{message.role === 'user' ? (
// User bubble comes first in flex-end
<div className="max-w-[85%] ml-auto text-sm text-gray-800 whitespace-pre-wrap break-words">
// User bubble with rounded background that fits to text
<div className="inline-block ml-auto text-sm text-gray-800 whitespace-pre-wrap break-words bg-gray-100 rounded-lg py-2 px-3">
{message.content}
</div>
) : (
// Assistant / Tool bubble on the left
<div>
{/* Simplified header with logo and name */}
<div className="flex items-center mb-2 text-sm gap-2">
<div className="flex-shrink-0 w-5 h-5 rounded-full flex items-center justify-center overflow-hidden">
<Image src="/kortix-symbol.svg" alt="Suna" width={16} height={16} className="object-contain" />
{/* Show header only if this is an assistant message after a user message */}
{isAssistantAfterUser && (
<div className="flex items-center mb-2 text-sm gap-2">
<div className="flex-shrink-0 w-5 h-5 rounded-full flex items-center justify-center overflow-hidden">
<Image src="/kortix-symbol.svg" alt="Suna" width={16} height={16} className="object-contain" />
</div>
<span className="text-gray-700 font-medium">Suna</span>
</div>
<span className="text-gray-700 font-medium">Suna</span>
</div>
)}
{/* Message content */}
{message.type === 'tool_call' && message.tool_call ? (
@ -1317,7 +1361,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
}
}}
>
<IconComponent className="h-3.5 w-3.5 text-gray-500 flex-shrink-0 animate-spin animation-duration-2000" />
<CircleDashed className="h-3.5 w-3.5 text-gray-500 flex-shrink-0 animate-spin animation-duration-2000" />
<span className="font-mono text-xs text-gray-700">
{toolName}
</span>
@ -1351,7 +1395,56 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
) : (
// Plain text message
<div className="max-w-[85%] text-sm text-gray-800 whitespace-pre-wrap break-words">
{message.content}
{(() => {
// Parse XML from assistant messages
if (message.role === 'assistant') {
const assistantContent = message.content || '';
const xmlRegex = /<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?>[\s\S]*?<\/\1>|<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?(?:\/)?>/;
const xmlMatch = assistantContent.match(xmlRegex);
if (xmlMatch) {
// Get tag name from either the first capturing group (full tag) or second capturing group (self-closing)
const toolName = xmlMatch[1] || xmlMatch[2];
const preContent = assistantContent.substring(0, xmlMatch.index).trim();
const postContent = assistantContent.substring(xmlMatch.index + xmlMatch[0].length).trim();
const IconComponent = getToolIcon(toolName);
const paramDisplay = extractPrimaryParam(toolName, assistantContent);
return (
<>
{preContent && <p className="mb-2">{preContent}</p>}
<button
onClick={() => {
// Create a synthetic pair to match the history format
const syntheticPair = {
assistantCall: message,
userResult: { content: "", role: "user" } // Empty result
};
handleHistoricalToolClick(syntheticPair);
}}
className="inline-flex items-center gap-1.5 py-0.5 px-2 text-xs text-gray-600 bg-gray-100 hover:bg-gray-200 rounded-md transition-colors cursor-pointer border border-gray-200 mb-2"
>
<IconComponent className="h-3.5 w-3.5 text-gray-500 flex-shrink-0" />
<span className="font-mono text-xs text-gray-700">
{toolName}
</span>
{paramDisplay && (
<span className="ml-1 text-gray-500 truncate" title={paramDisplay}>
{paramDisplay}
</span>
)}
</button>
{postContent && <p>{postContent}</p>}
</>
);
}
}
// Default rendering for non-XML or non-assistant messages
return message.content;
})()}
</div>
)}
</div>

View File

@ -174,6 +174,8 @@ function StrReplaceToolView({ assistantContent, userContent }: { assistantConten
oldParts: { text: string; highlighted: boolean }[];
newParts: { text: string; highlighted: boolean }[];
};
// Add a unique key for React rendering
key: string;
};
const diff: DiffLine[] = [];
@ -194,6 +196,7 @@ function StrReplaceToolView({ assistantContent, userContent }: { assistantConten
oldIndex,
newIndex,
oldContent: oldLine,
key: `unchanged-${oldIndex}-${newIndex}`
});
oldIndex++;
newIndex++;
@ -256,7 +259,8 @@ function StrReplaceToolView({ assistantContent, userContent }: { assistantConten
highlights: {
oldParts,
newParts
}
},
key: `modified-${oldIndex}-${newIndex}`
});
oldIndex++;
@ -270,7 +274,8 @@ function StrReplaceToolView({ assistantContent, userContent }: { assistantConten
diff.push({
type: 'added',
newIndex,
newContent: newLine
newContent: newLine,
key: `added-${newIndex}`
});
newIndex++;
} else if (newLine === null) {
@ -278,7 +283,8 @@ function StrReplaceToolView({ assistantContent, userContent }: { assistantConten
diff.push({
type: 'removed',
oldIndex,
oldContent: oldLine
oldContent: oldLine,
key: `removed-${oldIndex}`
});
oldIndex++;
} else {
@ -297,7 +303,8 @@ function StrReplaceToolView({ assistantContent, userContent }: { assistantConten
diff.push({
type: 'added',
newIndex: newIndex + i,
newContent: newLines[newIndex + i]
newContent: newLines[newIndex + i],
key: `added-lookahead-${newIndex + i}`
});
}
foundMatch = true;
@ -314,7 +321,8 @@ function StrReplaceToolView({ assistantContent, userContent }: { assistantConten
diff.push({
type: 'removed',
oldIndex: oldIndex + i,
oldContent: oldLines[oldIndex + i]
oldContent: oldLines[oldIndex + i],
key: `removed-lookahead-${oldIndex + i}`
});
}
foundMatch = true;
@ -329,12 +337,14 @@ function StrReplaceToolView({ assistantContent, userContent }: { assistantConten
diff.push({
type: 'removed',
oldIndex,
oldContent: oldLine
oldContent: oldLine,
key: `removed-nomatch-${oldIndex}`
});
diff.push({
type: 'added',
newIndex,
newContent: newLine
newContent: newLine,
key: `added-nomatch-${newIndex}`
});
oldIndex++;
newIndex++;
@ -1185,7 +1195,7 @@ export function ToolCallSidePanel({
const showNavigation = isHistoricalPair(content) && totalPairs > 1 && currentIndex !== null;
// Get VNC preview URL from project if available
const vncPreviewUrl = project?.sandbox?.vnc_preview;
const vncPreviewUrl = project?.sandbox?.vnc_preview ? `${project.sandbox.vnc_preview}/vnc_lite.html?password=${project?.sandbox?.pass}` : undefined;
// Get the sandbox ID from project for todo.md fetching
const sandboxId = project?.sandbox?.id || null;

View File

@ -602,7 +602,7 @@ export const streamAgent = (agentRunId: string, callbacks: {
if (!session?.access_token) {
console.error('[STREAM] No auth token available');
callbacks.onError('Authentication required');
callbacks.onError(new Error('Authentication required'));
callbacks.onClose();
return;
}
@ -622,77 +622,50 @@ export const streamAgent = (agentRunId: string, callbacks: {
const rawData = event.data;
if (rawData.includes('"type":"ping"')) return;
// Skip empty messages
if (!rawData || rawData.trim() === '') return;
// Log raw data for debugging
console.log(`[STREAM] Received data: ${rawData.substring(0, 100)}${rawData.length > 100 ? '...' : ''}`);
let jsonData;
try {
jsonData = JSON.parse(rawData);
} catch (parseError) {
console.error('[STREAM] Failed to parse message:', parseError);
return;
}
// Handle stream errors and failures first
if (jsonData.status === 'error' || (jsonData.type === 'status' && jsonData.status === 'failed')) {
// Get a clean string version of any error message
const errorMessage = typeof jsonData.message === 'object'
? JSON.stringify(jsonData.message)
: String(jsonData.message || 'Stream failed');
// Only log to console if it's an unexpected error (not a known API error response)
if (jsonData.status !== 'error') {
console.error(`[STREAM] Stream error for ${agentRunId}:`, errorMessage);
}
// Ensure we close the stream and prevent reconnection
if (!isClosing) {
isClosing = true;
if (eventSourceInstance) {
eventSourceInstance.close();
eventSourceInstance = null;
}
callbacks.onError(errorMessage);
callbacks.onClose();
}
return;
}
// Handle completion status
if (jsonData.type === 'status' && jsonData.status === 'completed') {
console.log(`[STREAM] Completion message received for ${agentRunId}`);
if (!isClosing) {
isClosing = true;
callbacks.onMessage(rawData);
if (eventSourceInstance) {
eventSourceInstance.close();
eventSourceInstance = null;
}
callbacks.onClose();
}
return;
}
// Pass other messages normally
if (!isClosing) {
callbacks.onMessage(rawData);
}
} catch (error) {
console.error(`[STREAM] Error in message handler:`, error);
if (!isClosing) {
isClosing = true;
if (eventSourceInstance) {
eventSourceInstance.close();
eventSourceInstance = null;
}
callbacks.onError(error instanceof Error ? error.message : 'Stream processing error');
callbacks.onClose();
// Skip empty messages
if (!rawData || rawData.trim() === '') {
console.debug('[STREAM] Received empty message, skipping');
return;
}
// Check if this is a status completion message
if (rawData.includes('"type":"status"') && rawData.includes('"status":"completed"')) {
console.log(`[STREAM] ⚠️ Detected completion status message: ${rawData}`);
try {
// Explicitly call onMessage before closing the stream to ensure the message is processed
callbacks.onMessage(rawData);
// Explicitly close the EventSource connection when we receive a completion message
if (eventSourceInstance && !isClosing) {
console.log(`[STREAM] ⚠️ Closing EventSource due to completion message for ${agentRunId}`);
isClosing = true;
eventSourceInstance.close();
eventSourceInstance = null;
// Explicitly call onClose here to ensure the client knows the stream is closed
setTimeout(() => {
console.log(`[STREAM] 🚨 Explicitly calling onClose after completion for ${agentRunId}`);
callbacks.onClose();
}, 0);
}
// Exit early to prevent duplicate message processing
return;
} catch (closeError) {
console.error(`[STREAM] ❌ Error while closing stream on completion: ${closeError}`);
// Continue with normal processing if there's an error during closure
}
}
// Pass the raw data directly to onMessage for handling in the component
callbacks.onMessage(rawData);
} catch (error) {
console.error(`[STREAM] Error handling message:`, error);
callbacks.onError(error instanceof Error ? error : String(error));
}
};
@ -700,6 +673,7 @@ export const streamAgent = (agentRunId: string, callbacks: {
// Add detailed event logging
console.log(`[STREAM] 🔍 EventSource onerror triggered for ${agentRunId}`, event);
// EventSource errors are often just connection closures
// For clean closures (manual or completed), we don't need to log an error
if (isClosing) {
console.log(`[STREAM] EventSource closed as expected for ${agentRunId}`);
@ -707,10 +681,10 @@ export const streamAgent = (agentRunId: string, callbacks: {
}
// Only log as error for unexpected closures
console.error(`[STREAM] EventSource connection error/closed unexpectedly for ${agentRunId}`);
console.log(`[STREAM] EventSource connection closed for ${agentRunId}`);
if (!isClosing) {
console.log(`[STREAM] Handling unexpected connection close for ${agentRunId}`);
console.log(`[STREAM] Handling connection close for ${agentRunId}`);
// Close the connection
if (eventSourceInstance) {
@ -718,9 +692,8 @@ export const streamAgent = (agentRunId: string, callbacks: {
eventSourceInstance = null;
}
// Then notify error and close (once)
// Then notify error (once)
isClosing = true;
callbacks.onError(new Error('Stream connection closed unexpectedly.')); // Add error callback
callbacks.onClose();
}
};