From 1aa87c3ff1854665a4af20d23b0cc3c202a644f9 Mon Sep 17 00:00:00 2001 From: Adam Cohen Hillel Date: Thu, 17 Apr 2025 13:24:42 +0100 Subject: [PATCH] fix streaming --- .../app/dashboard/agents/[threadId]/page.tsx | 257 ++++++++++++------ .../thread/tool-call-side-panel.tsx | 26 +- frontend/src/lib/api.ts | 117 +++----- 3 files changed, 238 insertions(+), 162 deletions(-) diff --git a/frontend/src/app/dashboard/agents/[threadId]/page.tsx b/frontend/src/app/dashboard/agents/[threadId]/page.tsx index cca135c1..5971a0ed 100644 --- a/frontend/src/app/dashboard/agents/[threadId]/page.tsx +++ b/frontend/src/app/dashboard/agents/[threadId]/page.tsx @@ -63,34 +63,49 @@ const getToolIcon = (toolName: string): ElementType => { // Convert to lowercase for case-insensitive matching const normalizedName = toolName.toLowerCase(); + // Check for browser-related tools with a prefix check + if (normalizedName.startsWith('browser-')) { + return Globe; + } + switch (normalizedName) { + // File operations case 'create-file': case 'str-replace': - case 'write-file': + case 'full-file-rewrite': + case 'read-file': return FileEdit; - case 'run_terminal_cmd': - case 'run_command': + + // Shell commands + case 'execute-command': return Terminal; - case 'web_search': + + // Web operations + case 'web-search': return Search; - case 'browse_url': - return Globe; - case 'call_api': - return Code; - case 'send_message': - return MessageSquare; - case 'list_dir': - return Folder; - case 'read_file': - return FileText; - case 'delete_file': + + // API and data operations + case 'call-data-provider': + case 'get-data-provider-endpoints': + return ExternalLink; // Using ExternalLink instead of Database which isn't imported + + // Code operations + case 'delete-file': return FileX; - case 'deploy': + + // Deployment + case 'deploy-site': return CloudUpload; + + // Tools and utilities + case 'execute-code': + return Code; + + // Default case default: // Add logging for debugging unhandled tool types console.log(`[PAGE] Using default icon for unknown tool type: ${toolName}`); - return Cog; // Default icon + return Wrench; // Default icon for tools } }; @@ -99,31 +114,65 @@ const extractPrimaryParam = (toolName: string, content: string | undefined): str if (!content) return null; try { + // Handle browser tools with a prefix check + if (toolName?.toLowerCase().startsWith('browser-')) { + // Try to extract URL for navigation + const urlMatch = content.match(/url=(?:"|')([^"|']+)(?:"|')/); + if (urlMatch) return urlMatch[1]; + + // For other browser operations, extract the goal or action + const goalMatch = content.match(/goal=(?:"|')([^"|']+)(?:"|')/); + if (goalMatch) { + const goal = goalMatch[1]; + return goal.length > 30 ? goal.substring(0, 27) + '...' : goal; + } + + return null; + } + // Simple regex for common parameters - adjust as needed let match: RegExpMatchArray | null = null; + switch (toolName?.toLowerCase()) { - case 'edit_file': - case 'read_file': - case 'delete_file': - case 'write_file': - match = content.match(/target_file=(?:"|')([^"|']+)(?:"|')/); + // File operations + case 'create-file': + case 'full-file-rewrite': + case 'read-file': + case 'delete-file': + case 'str-replace': + // Try to match file_path attribute + match = content.match(/file_path=(?:"|')([^"|']+)(?:"|')/); // Return just the filename part return match ? match[1].split('/').pop() || match[1] : null; - case 'run_terminal_cmd': - case 'run_command': + + // Shell commands + case 'execute-command': + // Extract command content match = content.match(/command=(?:"|')([^"|']+)(?:"|')/); - // Truncate long commands - return match ? (match[1].length > 30 ? match[1].substring(0, 27) + '...' : match[1]) : null; - case 'web_search': + if (match) { + const cmd = match[1]; + return cmd.length > 30 ? cmd.substring(0, 27) + '...' : cmd; + } + return null; + + // Web search + case 'web-search': match = content.match(/query=(?:"|')([^"|']+)(?:"|')/); return match ? (match[1].length > 30 ? match[1].substring(0, 27) + '...' : match[1]) : null; - case 'browse_url': - match = content.match(/url=(?:"|')([^"|']+)(?:"|')/); + + // Data provider operations + case 'call-data-provider': + match = content.match(/service_name=(?:"|')([^"|']+)(?:"|')/); + const route = content.match(/route=(?:"|')([^"|']+)(?:"|')/); + return match && route ? `${match[1]}/${route[1]}` : (match ? match[1] : null); + + // Deployment + case 'deploy-site': + match = content.match(/site_name=(?:"|')([^"|']+)(?:"|')/); return match ? match[1] : null; - // Add more cases as needed for other tools - default: - return null; } + + return null; } catch (e) { console.warn("Error parsing tool parameters:", e); return null; @@ -146,14 +195,14 @@ function groupMessages(messages: ApiMessage[]): RenderItem[] { // Check if current message is the start of a potential sequence if (currentMsg.role === 'assistant') { - // Regex to find the first XML-like tag: or - const toolTagMatch = currentMsg.content?.match(/<([a-zA-Z\-_]+)(?:\s+[^>]*)?>/); + // Regex to find the first XML-like tag: or or self-closing tags + const toolTagMatch = currentMsg.content?.match(/<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?(?:\/)?>/); if (toolTagMatch && nextMsg && nextMsg.role === 'user') { const expectedTag = toolTagMatch[1]; // Regex to check for ... - // Using 's' flag for dotall to handle multiline content within tags -> Replaced with [\s\S] to avoid ES target issues - const toolResultRegex = new RegExp(`^\\s*<(${expectedTag})(?:\\s+[^>]*)?>[\\s\\S]*?\\s*`); + // Also handle self-closing tags in the response + const toolResultRegex = new RegExp(`^\\s*<(${expectedTag})(?:\\s+[^>]*)?(?:/>|>[\\s\\S]*?)\\s*`); if (nextMsg.content?.match(toolResultRegex)) { // Found a pair, start a sequence @@ -167,12 +216,12 @@ function groupMessages(messages: ApiMessage[]): RenderItem[] { const potentialUser = i + 1 < messages.length ? messages[i + 1] : null; if (potentialAssistant.role === 'assistant') { - const nextToolTagMatch = potentialAssistant.content?.match(/<([a-zA-Z\-_]+)(?:\s+[^>]*)?>/); + const nextToolTagMatch = potentialAssistant.content?.match(/<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?(?:\/)?>/); if (nextToolTagMatch && potentialUser && potentialUser.role === 'user') { const nextExpectedTag = nextToolTagMatch[1]; - // Replaced dotall 's' flag with [\s\S] - const nextToolResultRegex = new RegExp(`^\\s*<(${nextExpectedTag})(?:\\s+[^>]*)?>[\\s\\S]*?\\s*`); + // Also handle self-closing tags in the response + const nextToolResultRegex = new RegExp(`^\\s*<(${nextExpectedTag})(?:\\s+[^>]*)?(?:/>|>[\\s\\S]*?)\\s*`); if (potentialUser.content?.match(nextToolResultRegex)) { // Sequence continues @@ -429,7 +478,7 @@ export default function ThreadPage({ params }: { params: Promise } // If there's a specific finish reason, handle it if (jsonData.finish_reason === 'xml_tool_limit_reached') { // Potentially show a toast notification - toast.info('Tool execution limit reached. The agent will continue with available information.'); + // toast.info('Tool execution limit reached. The agent will continue with available information.'); } return; } @@ -456,17 +505,9 @@ export default function ThreadPage({ params }: { params: Promise } }; setToolCallData(toolInfo); - // Optionally add to stream content to show tool execution - setStreamContent(prev => - prev + `\n\nExecuting tool: ${jsonData.function_name}\n` - ); } else if (jsonData.status === 'completed') { // Update UI to show tool completion setToolCallData(null); - // Optionally add to stream content - setStreamContent(prev => - prev + `\nTool execution completed: ${jsonData.function_name}\n` - ); } return; } @@ -479,10 +520,6 @@ export default function ThreadPage({ params }: { params: Promise } setSidePanelContent(null); setToolCallData(null); - // Add tool result to the stream content for visibility - setStreamContent(prev => - prev + `\nReceived result from ${jsonData.function_name}\n` - ); return; } @@ -499,11 +536,6 @@ export default function ThreadPage({ params }: { params: Promise } setCurrentPairIndex(null); // Live data means not viewing a historical pair setSidePanelContent(currentLiveToolCall); // Update side panel - // Add to stream content so it's visible - setStreamContent(prev => - prev + `\nCalling tool: ${currentLiveToolCall.name}\n` - ); - if (!isSidePanelOpen) { // Optionally auto-open side panel? Maybe only if user hasn't closed it recently. // setIsSidePanelOpen(true); @@ -1008,11 +1040,12 @@ export default function ThreadPage({ params }: { params: Promise } // Extract only the XML part and the tool name from the assistant message const assistantContent = pair.assistantCall.content || ''; - // Find the first opening tag and the corresponding closing tag - const xmlRegex = /<([a-zA-Z\-_]+)(?:\s+[^>]*)?>[\s\S]*?<\/\1>/; + // First try to match complete tags, then try self-closing tags + const xmlRegex = /<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?>[\s\S]*?<\/\1>|<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?(?:\/)?>/; const xmlMatch = assistantContent.match(xmlRegex); const toolCallXml = xmlMatch ? xmlMatch[0] : '[Could not extract XML tag]'; - const assistantToolName = xmlMatch ? xmlMatch[1] : 'Tool'; // Extract name from the matched tag + // Get tag name from either the first capturing group (full tag) or second capturing group (self-closing) + const assistantToolName = xmlMatch ? (xmlMatch[1] || xmlMatch[2]) : 'Tool'; const userResultContent = pair.userResult.content?.match(/([\s\S]*)<\/tool_result>/)?.[1].trim() || '[Could not parse result]'; @@ -1040,10 +1073,10 @@ export default function ThreadPage({ params }: { params: Promise } // Re-extract data for the side panel (similar to handleHistoricalToolClick) const assistantContent = pair.assistantCall.content || ''; - const xmlRegex = /<([a-zA-Z\-_]+)(?:\s+[^>]*)?>[\s\S]*?<\/\1>/; + const xmlRegex = /<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?>[\s\S]*?<\/\1>|<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?(?:\/)?>/; const xmlMatch = assistantContent.match(xmlRegex); const toolCallXml = xmlMatch ? xmlMatch[0] : '[Could not extract XML tag]'; - const assistantToolName = xmlMatch ? xmlMatch[1] : 'Tool'; + const assistantToolName = xmlMatch ? (xmlMatch[1] || xmlMatch[2]) : 'Tool'; const userToolName = pair.userResult.content?.match(/\s*<([a-zA-Z\-_]+)/)?.[1] || 'Tool'; const userResultContent = pair.userResult.content?.match(/([\s\S]*)<\/tool_result>/)?.[1].trim() || '[Could not parse result]'; @@ -1173,6 +1206,12 @@ export default function ThreadPage({ params }: { params: Promise }
{/* Map over processed messages */} {processedMessages.map((item, index) => { + // Check if this message is an assistant message that follows a user message + const prevMessage = index > 0 ? processedMessages[index - 1] : null; + const isAssistantAfterUser = + (isToolSequence(item) || ((item as ApiMessage).role === 'assistant')) && + (prevMessage && !isToolSequence(prevMessage) && (prevMessage as ApiMessage).role === 'user'); + // ---- Rendering Logic for Tool Sequences ---- if (isToolSequence(item)) { // Group sequence items into pairs of [assistant, user] @@ -1187,24 +1226,27 @@ export default function ThreadPage({ params }: { params: Promise }
- {/* Simplified header with logo and name */} -
-
- Suna + {/* Show header only if this is an assistant message after a user message */} + {isAssistantAfterUser && ( +
+
+ Suna +
+ Suna
- Suna -
+ )} {/* Container for the pairs within the sequence */}
{pairs.map((pair, pairIndex) => { // Parse assistant message content const assistantContent = pair.assistantCall.content || ''; - const xmlRegex = /<([a-zA-Z\-_]+)(?:\s+[^>]*)?>[\s\S]*?<\/\1>/; + const xmlRegex = /<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?>[\s\S]*?<\/\1>|<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?(?:\/)?>/; const xmlMatch = assistantContent.match(xmlRegex); - const toolName = xmlMatch ? xmlMatch[1] : 'Tool'; + // Get tag name from either the first or second capturing group + const toolName = xmlMatch ? (xmlMatch[1] || xmlMatch[2]) : 'Tool'; const preContent = xmlMatch ? assistantContent.substring(0, xmlMatch.index).trim() : assistantContent.trim(); const postContent = xmlMatch ? assistantContent.substring(xmlMatch.index + xmlMatch[0].length).trim() : ''; const userResultName = pair.userResult.content?.match(/\s*<([a-zA-Z\-_]+)/)?.[1] || 'Result'; @@ -1275,24 +1317,26 @@ export default function ThreadPage({ params }: { params: Promise }
0 ? 'border-t border-gray-100' : ''}`} // Add top border between messages + className={`${message.role === 'user' ? 'text-right py-1' : 'py-2'}`} // Removed border-t > {/* Avatar (User = Right, Assistant/Tool = Left) */} {message.role === 'user' ? ( - // User bubble comes first in flex-end -
+ // User bubble with rounded background that fits to text +
{message.content}
) : ( // Assistant / Tool bubble on the left
- {/* Simplified header with logo and name */} -
-
- Suna + {/* Show header only if this is an assistant message after a user message */} + {isAssistantAfterUser && ( +
+
+ Suna +
+ Suna
- Suna -
+ )} {/* Message content */} {message.type === 'tool_call' && message.tool_call ? ( @@ -1317,7 +1361,7 @@ export default function ThreadPage({ params }: { params: Promise } } }} > - + {toolName} @@ -1351,7 +1395,56 @@ export default function ThreadPage({ params }: { params: Promise } ) : ( // Plain text message
- {message.content} + {(() => { + // Parse XML from assistant messages + if (message.role === 'assistant') { + const assistantContent = message.content || ''; + const xmlRegex = /<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?>[\s\S]*?<\/\1>|<(?!inform\b)([a-zA-Z\-_]+)(?:\s+[^>]*)?(?:\/)?>/; + const xmlMatch = assistantContent.match(xmlRegex); + + if (xmlMatch) { + // Get tag name from either the first capturing group (full tag) or second capturing group (self-closing) + const toolName = xmlMatch[1] || xmlMatch[2]; + const preContent = assistantContent.substring(0, xmlMatch.index).trim(); + const postContent = assistantContent.substring(xmlMatch.index + xmlMatch[0].length).trim(); + const IconComponent = getToolIcon(toolName); + const paramDisplay = extractPrimaryParam(toolName, assistantContent); + + return ( + <> + {preContent &&

{preContent}

} + + + + {postContent &&

{postContent}

} + + ); + } + } + + // Default rendering for non-XML or non-assistant messages + return message.content; + })()}
)}
diff --git a/frontend/src/components/thread/tool-call-side-panel.tsx b/frontend/src/components/thread/tool-call-side-panel.tsx index 4b8c151b..e9479230 100644 --- a/frontend/src/components/thread/tool-call-side-panel.tsx +++ b/frontend/src/components/thread/tool-call-side-panel.tsx @@ -174,6 +174,8 @@ function StrReplaceToolView({ assistantContent, userContent }: { assistantConten oldParts: { text: string; highlighted: boolean }[]; newParts: { text: string; highlighted: boolean }[]; }; + // Add a unique key for React rendering + key: string; }; const diff: DiffLine[] = []; @@ -194,6 +196,7 @@ function StrReplaceToolView({ assistantContent, userContent }: { assistantConten oldIndex, newIndex, oldContent: oldLine, + key: `unchanged-${oldIndex}-${newIndex}` }); oldIndex++; newIndex++; @@ -256,7 +259,8 @@ function StrReplaceToolView({ assistantContent, userContent }: { assistantConten highlights: { oldParts, newParts - } + }, + key: `modified-${oldIndex}-${newIndex}` }); oldIndex++; @@ -270,7 +274,8 @@ function StrReplaceToolView({ assistantContent, userContent }: { assistantConten diff.push({ type: 'added', newIndex, - newContent: newLine + newContent: newLine, + key: `added-${newIndex}` }); newIndex++; } else if (newLine === null) { @@ -278,7 +283,8 @@ function StrReplaceToolView({ assistantContent, userContent }: { assistantConten diff.push({ type: 'removed', oldIndex, - oldContent: oldLine + oldContent: oldLine, + key: `removed-${oldIndex}` }); oldIndex++; } else { @@ -297,7 +303,8 @@ function StrReplaceToolView({ assistantContent, userContent }: { assistantConten diff.push({ type: 'added', newIndex: newIndex + i, - newContent: newLines[newIndex + i] + newContent: newLines[newIndex + i], + key: `added-lookahead-${newIndex + i}` }); } foundMatch = true; @@ -314,7 +321,8 @@ function StrReplaceToolView({ assistantContent, userContent }: { assistantConten diff.push({ type: 'removed', oldIndex: oldIndex + i, - oldContent: oldLines[oldIndex + i] + oldContent: oldLines[oldIndex + i], + key: `removed-lookahead-${oldIndex + i}` }); } foundMatch = true; @@ -329,12 +337,14 @@ function StrReplaceToolView({ assistantContent, userContent }: { assistantConten diff.push({ type: 'removed', oldIndex, - oldContent: oldLine + oldContent: oldLine, + key: `removed-nomatch-${oldIndex}` }); diff.push({ type: 'added', newIndex, - newContent: newLine + newContent: newLine, + key: `added-nomatch-${newIndex}` }); oldIndex++; newIndex++; @@ -1185,7 +1195,7 @@ export function ToolCallSidePanel({ const showNavigation = isHistoricalPair(content) && totalPairs > 1 && currentIndex !== null; // Get VNC preview URL from project if available - const vncPreviewUrl = project?.sandbox?.vnc_preview; + const vncPreviewUrl = project?.sandbox?.vnc_preview ? `${project.sandbox.vnc_preview}/vnc_lite.html?password=${project?.sandbox?.pass}` : undefined; // Get the sandbox ID from project for todo.md fetching const sandboxId = project?.sandbox?.id || null; diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index c88a8960..fe9267ef 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -602,7 +602,7 @@ export const streamAgent = (agentRunId: string, callbacks: { if (!session?.access_token) { console.error('[STREAM] No auth token available'); - callbacks.onError('Authentication required'); + callbacks.onError(new Error('Authentication required')); callbacks.onClose(); return; } @@ -622,77 +622,50 @@ export const streamAgent = (agentRunId: string, callbacks: { const rawData = event.data; if (rawData.includes('"type":"ping"')) return; - // Skip empty messages - if (!rawData || rawData.trim() === '') return; - // Log raw data for debugging console.log(`[STREAM] Received data: ${rawData.substring(0, 100)}${rawData.length > 100 ? '...' : ''}`); - - let jsonData; - try { - jsonData = JSON.parse(rawData); - } catch (parseError) { - console.error('[STREAM] Failed to parse message:', parseError); - return; - } - - // Handle stream errors and failures first - if (jsonData.status === 'error' || (jsonData.type === 'status' && jsonData.status === 'failed')) { - // Get a clean string version of any error message - const errorMessage = typeof jsonData.message === 'object' - ? JSON.stringify(jsonData.message) - : String(jsonData.message || 'Stream failed'); - - // Only log to console if it's an unexpected error (not a known API error response) - if (jsonData.status !== 'error') { - console.error(`[STREAM] Stream error for ${agentRunId}:`, errorMessage); - } - - // Ensure we close the stream and prevent reconnection - if (!isClosing) { - isClosing = true; - if (eventSourceInstance) { - eventSourceInstance.close(); - eventSourceInstance = null; - } - callbacks.onError(errorMessage); - callbacks.onClose(); - } - return; - } - - // Handle completion status - if (jsonData.type === 'status' && jsonData.status === 'completed') { - console.log(`[STREAM] Completion message received for ${agentRunId}`); - - if (!isClosing) { - isClosing = true; - callbacks.onMessage(rawData); - if (eventSourceInstance) { - eventSourceInstance.close(); - eventSourceInstance = null; - } - callbacks.onClose(); - } - return; - } - - // Pass other messages normally - if (!isClosing) { - callbacks.onMessage(rawData); - } - } catch (error) { - console.error(`[STREAM] Error in message handler:`, error); - if (!isClosing) { - isClosing = true; - if (eventSourceInstance) { - eventSourceInstance.close(); - eventSourceInstance = null; - } - callbacks.onError(error instanceof Error ? error.message : 'Stream processing error'); - callbacks.onClose(); + // Skip empty messages + if (!rawData || rawData.trim() === '') { + console.debug('[STREAM] Received empty message, skipping'); + return; } + + // Check if this is a status completion message + if (rawData.includes('"type":"status"') && rawData.includes('"status":"completed"')) { + console.log(`[STREAM] ⚠️ Detected completion status message: ${rawData}`); + + try { + // Explicitly call onMessage before closing the stream to ensure the message is processed + callbacks.onMessage(rawData); + + // Explicitly close the EventSource connection when we receive a completion message + if (eventSourceInstance && !isClosing) { + console.log(`[STREAM] ⚠️ Closing EventSource due to completion message for ${agentRunId}`); + isClosing = true; + eventSourceInstance.close(); + eventSourceInstance = null; + + // Explicitly call onClose here to ensure the client knows the stream is closed + setTimeout(() => { + console.log(`[STREAM] 🚨 Explicitly calling onClose after completion for ${agentRunId}`); + callbacks.onClose(); + }, 0); + } + + // Exit early to prevent duplicate message processing + return; + } catch (closeError) { + console.error(`[STREAM] ❌ Error while closing stream on completion: ${closeError}`); + // Continue with normal processing if there's an error during closure + } + } + + // Pass the raw data directly to onMessage for handling in the component + callbacks.onMessage(rawData); + } catch (error) { + console.error(`[STREAM] Error handling message:`, error); + callbacks.onError(error instanceof Error ? error : String(error)); } }; @@ -700,6 +673,7 @@ export const streamAgent = (agentRunId: string, callbacks: { // Add detailed event logging console.log(`[STREAM] 🔍 EventSource onerror triggered for ${agentRunId}`, event); + // EventSource errors are often just connection closures // For clean closures (manual or completed), we don't need to log an error if (isClosing) { console.log(`[STREAM] EventSource closed as expected for ${agentRunId}`); @@ -707,10 +681,10 @@ export const streamAgent = (agentRunId: string, callbacks: { } // Only log as error for unexpected closures - console.error(`[STREAM] EventSource connection error/closed unexpectedly for ${agentRunId}`); + console.log(`[STREAM] EventSource connection closed for ${agentRunId}`); if (!isClosing) { - console.log(`[STREAM] Handling unexpected connection close for ${agentRunId}`); + console.log(`[STREAM] Handling connection close for ${agentRunId}`); // Close the connection if (eventSourceInstance) { @@ -718,9 +692,8 @@ export const streamAgent = (agentRunId: string, callbacks: { eventSourceInstance = null; } - // Then notify error and close (once) + // Then notify error (once) isClosing = true; - callbacks.onError(new Error('Stream connection closed unexpectedly.')); // Add error callback callbacks.onClose(); } };