Compare commits

...

2 Commits

Author SHA1 Message Date
LE Quoc Dat 290fa6a8d3 fix vercel 2025-04-24 22:08:14 +01:00
LE Quoc Dat 98c494807b fix stream issue 2025-04-24 15:53:10 +01:00
3 changed files with 228 additions and 232 deletions

View File

@ -217,6 +217,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
const [currentToolIndex, setCurrentToolIndex] = useState<number>(0); const [currentToolIndex, setCurrentToolIndex] = useState<number>(0);
const [autoOpenedPanel, setAutoOpenedPanel] = useState(false); const [autoOpenedPanel, setAutoOpenedPanel] = useState(false);
const [initialPanelOpenAttempted, setInitialPanelOpenAttempted] = useState(false); const [initialPanelOpenAttempted, setInitialPanelOpenAttempted] = useState(false);
const [streamingMessageId, setStreamingMessageId] = useState<string | null>(null);
// Billing alert state // Billing alert state
const [showBillingAlert, setShowBillingAlert] = useState(false); const [showBillingAlert, setShowBillingAlert] = useState(false);
@ -244,7 +245,6 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
const messagesLoadedRef = useRef(false); const messagesLoadedRef = useRef(false);
const agentRunsCheckedRef = useRef(false); const agentRunsCheckedRef = useRef(false);
const previousAgentStatus = useRef<typeof agentStatus>('idle'); const previousAgentStatus = useRef<typeof agentStatus>('idle');
const pollingIntervalRef = useRef<NodeJS.Timeout | null>(null); // POLLING FOR MESSAGES
const handleProjectRenamed = useCallback((newName: string) => { const handleProjectRenamed = useCallback((newName: string) => {
setProjectName(newName); setProjectName(newName);
@ -337,27 +337,84 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
return () => window.removeEventListener('keydown', handleKeyDown); return () => window.removeEventListener('keydown', handleKeyDown);
}, [toggleSidePanel, isSidePanelOpen, leftSidebarState, setLeftSidebarOpen]); }, [toggleSidePanel, isSidePanelOpen, leftSidebarState, setLeftSidebarOpen]);
const handleNewMessageFromStream = useCallback((message: UnifiedMessage) => { // Callback for when assistant stream starts
// Log the ID of the message received from the stream const handleAssistantStreamStart = useCallback((initialMessage: UnifiedMessage) => {
console.log(`[STREAM HANDLER] Received message: ID=${message.message_id}, Type=${message.type}`); console.log('[PAGE] Assistant stream started. Adding placeholder message:', initialMessage.message_id);
if (!message.message_id) { setMessages(prev => {
console.warn(`[STREAM HANDLER] Received message is missing ID: Type=${message.type}, Content=${message.content?.substring(0, 50)}...`); // Avoid adding duplicates if reconnect happens quickly
if (prev.some(m => m.message_id === initialMessage.message_id)) {
return prev;
}
return [...prev, initialMessage];
});
setStreamingMessageId(initialMessage.message_id);
if (!userHasScrolled) scrollToBottom('smooth'); // Scroll down when assistant starts
}, [userHasScrolled]);
// Callback for receiving assistant message chunks
const handleAssistantStreamChunk = useCallback(({ content: chunkContent, message_id }: { content: string; message_id: string | null }) => {
if (!message_id) {
console.warn("[PAGE] Received assistant chunk without a message_id. Cannot update state.");
return;
} }
setMessages(prevMessages => {
// Find the index of the message being streamed
const msgIndex = prevMessages.findIndex(m => m.message_id === message_id);
if (msgIndex === -1) {
console.warn(`[PAGE] Could not find message with ID ${message_id} to append chunk.`);
// Optionally: Could try adding a new message if not found, but might indicate a logic error
return prevMessages;
}
// Create a new array with the updated message
return prevMessages.map((msg, index) => {
if (index === msgIndex) {
// Parse the existing content
const existingParsedContent = safeJsonParse<ParsedContent>(msg.content, { role: 'assistant', content: '' });
// Append the new chunk to the inner content string
const newInnerContent = (existingParsedContent.content || '') + chunkContent;
// Create the updated content object and stringify it
const updatedContentObject: ParsedContent = { ...existingParsedContent, content: newInnerContent };
return {
...msg,
content: JSON.stringify(updatedContentObject),
updated_at: new Date().toISOString() // Update timestamp
};
}
return msg;
});
});
// Continuously scroll if user hasn't manually scrolled up
if (!userHasScrolled) scrollToBottom('auto'); // Use 'auto' for potentially smoother streaming scroll
}, [userHasScrolled]);
// Callback for receiving complete messages (Handles the final update)
const handleNewMessageFromStream = useCallback((message: UnifiedMessage) => {
console.log(`[PAGE - onMessage] Received complete message: ID=${message.message_id}, Type=${message.type}`);
setMessages(prev => { setMessages(prev => {
const messageExists = prev.some(m => m.message_id === message.message_id); const messageExists = prev.some(m => m.message_id === message.message_id);
// Update existing message (especially the final assistant one) or add new (e.g., tool result)
if (messageExists) { if (messageExists) {
console.log(`[PAGE - onMessage] Updating existing message ID: ${message.message_id}`);
return prev.map(m => m.message_id === message.message_id ? message : m); return prev.map(m => m.message_id === message.message_id ? message : m);
} else { } else {
console.log(`[PAGE - onMessage] Adding new message ID: ${message.message_id}`);
return [...prev, message]; return [...prev, message];
} }
}); });
// If we received the final assistant message, clear the streaming ID
// If we received a tool message, refresh the tool panel if (message.message_id === streamingMessageId && message.type === 'assistant') {
console.log(`[PAGE - onMessage] Finalizing streaming for message ID: ${streamingMessageId}`);
setStreamingMessageId(null);
}
// Reset auto-opened panel state if needed (e.g., after tool message)
if (message.type === 'tool') { if (message.type === 'tool') {
setAutoOpenedPanel(false); setAutoOpenedPanel(false);
} }
}, []); // Scroll to bottom for final messages
if (!userHasScrolled) scrollToBottom('smooth');
}, [streamingMessageId, userHasScrolled]);
const handleStreamStatusChange = useCallback((hookStatus: string) => { const handleStreamStatusChange = useCallback((hookStatus: string) => {
console.log(`[PAGE] Hook status changed: ${hookStatus}`); console.log(`[PAGE] Hook status changed: ${hookStatus}`);
@ -372,11 +429,16 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
setAgentRunId(null); setAgentRunId(null);
// Reset auto-opened state when agent completes to trigger tool detection // Reset auto-opened state when agent completes to trigger tool detection
setAutoOpenedPanel(false); setAutoOpenedPanel(false);
// Ensure streaming ID is cleared if hook terminates unexpectedly
if (streamingMessageId) {
console.log('[PAGE] Clearing streamingMessageId due to hook termination.');
setStreamingMessageId(null);
}
// After terminal states, we should scroll to bottom to show latest messages // After terminal states, we should scroll to bottom to show latest messages
// The hook will already have refetched messages by this point // The hook will already have refetched messages by this point
if (['completed', 'stopped', 'agent_not_running', 'error', 'failed'].includes(hookStatus)) { if (['completed', 'stopped', 'agent_not_running', 'error', 'failed'].includes(hookStatus)) {
scrollToBottom('smooth'); if (!userHasScrolled) scrollToBottom('smooth');
} }
break; break;
case 'connecting': case 'connecting':
@ -386,7 +448,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
setAgentStatus('running'); setAgentStatus('running');
break; break;
} }
}, []); }, [streamingMessageId, userHasScrolled]);
const handleStreamError = useCallback((errorMessage: string) => { const handleStreamError = useCallback((errorMessage: string) => {
console.error(`[PAGE] Stream hook error: ${errorMessage}`); console.error(`[PAGE] Stream hook error: ${errorMessage}`);
@ -394,15 +456,24 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
!errorMessage.toLowerCase().includes('agent run is not running')) { !errorMessage.toLowerCase().includes('agent run is not running')) {
toast.error(`Stream Error: ${errorMessage}`); toast.error(`Stream Error: ${errorMessage}`);
} }
}, []); // Ensure streaming ID is cleared on error
if (streamingMessageId) {
console.log('[PAGE] Clearing streamingMessageId due to stream error.');
setStreamingMessageId(null);
}
}, [streamingMessageId]);
const handleStreamClose = useCallback(() => { const handleStreamClose = useCallback(() => {
console.log(`[PAGE] Stream hook closed with final status: ${agentStatus}`); console.log(`[PAGE] Stream hook closed with final status: ${agentStatus}`);
}, [agentStatus]); // Ensure streaming ID is cleared when stream closes
if (streamingMessageId) {
console.log('[PAGE] Clearing streamingMessageId due to stream close.');
setStreamingMessageId(null);
}
}, [agentStatus, streamingMessageId]);
const { const {
status: streamHookStatus, status: streamHookStatus,
textContent: streamingTextContent,
toolCall: streamingToolCall, toolCall: streamingToolCall,
error: streamError, error: streamError,
agentRunId: currentHookRunId, agentRunId: currentHookRunId,
@ -410,6 +481,8 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
stopStreaming, stopStreaming,
} = useAgentStream({ } = useAgentStream({
onMessage: handleNewMessageFromStream, onMessage: handleNewMessageFromStream,
onAssistantStart: handleAssistantStreamStart,
onAssistantChunk: handleAssistantStreamChunk,
onStatusChange: handleStreamStatusChange, onStatusChange: handleStreamStatusChange,
onError: handleStreamError, onError: handleStreamError,
onClose: handleStreamClose, onClose: handleStreamClose,
@ -466,7 +539,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
const messagesData = await getMessages(threadId); const messagesData = await getMessages(threadId);
if (isMounted) { if (isMounted) {
// Log raw messages fetched from API // Log raw messages fetched from API
console.log('[PAGE] Raw messages fetched:', messagesData); console.log('[PAGE] Raw messages fetched:', messagesData?.length);
// Map API message type to UnifiedMessage type // Map API message type to UnifiedMessage type
const unifiedMessages = (messagesData || []) const unifiedMessages = (messagesData || [])
@ -479,19 +552,12 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
console.warn(`[MAP ${index}] Non-status message fetched from API is missing ID: Type=${msg.type}`); console.warn(`[MAP ${index}] Non-status message fetched from API is missing ID: Type=${msg.type}`);
} }
const threadIdMapped = msg.thread_id || threadId; const threadIdMapped = msg.thread_id || threadId;
console.log(`[MAP ${index}] Accessed msg.thread_id (using fallback):`, threadIdMapped);
const typeMapped = (msg.type || 'system') as UnifiedMessage['type']; const typeMapped = (msg.type || 'system') as UnifiedMessage['type'];
console.log(`[MAP ${index}] Accessed msg.type (using fallback):`, typeMapped);
const isLlmMessageMapped = Boolean(msg.is_llm_message); const isLlmMessageMapped = Boolean(msg.is_llm_message);
console.log(`[MAP ${index}] Accessed msg.is_llm_message:`, isLlmMessageMapped);
const contentMapped = msg.content || ''; const contentMapped = msg.content || '';
console.log(`[MAP ${index}] Accessed msg.content (using fallback):`, contentMapped.substring(0, 50) + '...');
const metadataMapped = msg.metadata || '{}'; const metadataMapped = msg.metadata || '{}';
console.log(`[MAP ${index}] Accessed msg.metadata (using fallback):`, metadataMapped);
const createdAtMapped = msg.created_at || new Date().toISOString(); const createdAtMapped = msg.created_at || new Date().toISOString();
console.log(`[MAP ${index}] Accessed msg.created_at (using fallback):`, createdAtMapped);
const updatedAtMapped = msg.updated_at || new Date().toISOString(); const updatedAtMapped = msg.updated_at || new Date().toISOString();
console.log(`[MAP ${index}] Accessed msg.updated_at (using fallback):`, updatedAtMapped);
return { return {
message_id: messageId || null, message_id: messageId || null,
@ -506,29 +572,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
}); });
setMessages(unifiedMessages); // Set the filtered and mapped messages setMessages(unifiedMessages); // Set the filtered and mapped messages
console.log('[PAGE] Loaded Messages (excluding status, keeping browser_state):', unifiedMessages.length) console.log('[PAGE] Loaded Messages (excluding status):', unifiedMessages.length)
// Debug loaded messages
const assistantMessages = unifiedMessages.filter(m => m.type === 'assistant');
const toolMessages = unifiedMessages.filter(m => m.type === 'tool');
console.log('[PAGE] Assistant messages:', assistantMessages.length);
console.log('[PAGE] Tool messages:', toolMessages.length);
// Check if tool messages have associated assistant messages
toolMessages.forEach(toolMsg => {
try {
const metadata = JSON.parse(toolMsg.metadata);
if (metadata.assistant_message_id) {
const hasAssociated = assistantMessages.some(
assMsg => assMsg.message_id === metadata.assistant_message_id
);
console.log(`[PAGE] Tool message ${toolMsg.message_id} references assistant ${metadata.assistant_message_id} - found: ${hasAssociated}`);
}
} catch (e) {
console.error("Error parsing tool message metadata:", e);
}
});
messagesLoadedRef.current = true; messagesLoadedRef.current = true;
if (!hasInitiallyScrolled.current) { if (!hasInitiallyScrolled.current) {
@ -646,9 +690,9 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
setUserHasScrolled(isScrolledUp); setUserHasScrolled(isScrolledUp);
}; };
const scrollToBottom = (behavior: ScrollBehavior = 'smooth') => { const scrollToBottom = useCallback((behavior: ScrollBehavior = 'smooth') => {
messagesEndRef.current?.scrollIntoView({ behavior }); messagesEndRef.current?.scrollIntoView({ behavior });
}; }, []);
useEffect(() => { useEffect(() => {
const lastMsg = messages[messages.length - 1]; const lastMsg = messages[messages.length - 1];
@ -666,7 +710,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
); );
observer.observe(latestMessageRef.current); observer.observe(latestMessageRef.current);
return () => observer.disconnect(); return () => observer.disconnect();
}, [messages, streamingTextContent, streamingToolCall, setShowScrollButton]); }, [messages, streamingToolCall, setShowScrollButton]);
const handleScrollButtonClick = () => { const handleScrollButtonClick = () => {
scrollToBottom('smooth'); scrollToBottom('smooth');
@ -674,7 +718,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
}; };
useEffect(() => { useEffect(() => {
console.log(`[PAGE] 🔄 Page AgentStatus: ${agentStatus}, Hook Status: ${streamHookStatus}, Target RunID: ${agentRunId || 'none'}, Hook RunID: ${currentHookRunId || 'none'}`); console.log(`[PAGE] 🔄 Page AgentStatus: ${agentStatus}, Hook Status: ${streamHookStatus}, Target RunID: ${agentRunId || 'none'}, Hook RunID: ${currentHookRunId || 'none'}, StreamingMsgID: ${streamingMessageId || 'none'}`);
// If the stream hook reports completion/stopping but our UI hasn't updated // If the stream hook reports completion/stopping but our UI hasn't updated
if ((streamHookStatus === 'completed' || streamHookStatus === 'stopped' || if ((streamHookStatus === 'completed' || streamHookStatus === 'stopped' ||
@ -685,7 +729,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
setAgentRunId(null); setAgentRunId(null);
setAutoOpenedPanel(false); setAutoOpenedPanel(false);
} }
}, [agentStatus, streamHookStatus, agentRunId, currentHookRunId]); }, [agentStatus, streamHookStatus, agentRunId, currentHookRunId, streamingMessageId]);
const handleOpenFileViewer = useCallback((filePath?: string) => { const handleOpenFileViewer = useCallback((filePath?: string) => {
if (filePath) { if (filePath) {
@ -943,7 +987,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
setCurrentToolIndex(0); setCurrentToolIndex(0);
setIsSidePanelOpen(true); setIsSidePanelOpen(true);
}, []); }, [isSidePanelOpen]);
// SEO title update // SEO title update
useEffect(() => { useEffect(() => {
@ -970,63 +1014,15 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
} }
}, [projectName]); }, [projectName]);
// POLLING FOR MESSAGES // POLLING FOR MESSAGES - Keep disabled for now, let the stream hook handle final refetch
// Set up polling for messages // useEffect(() => {
useEffect(() => { // const fetchMessages = async () => { /* ... */ };
// Function to fetch messages // if (initialLoadCompleted.current && !pollingIntervalRef.current) {
const fetchMessages = async () => { // fetchMessages();
if (!threadId) return; // pollingIntervalRef.current = setInterval(fetchMessages, 2000);
// }
try { // return () => { /* cleanup interval */ };
console.log('[POLLING] Refetching messages...'); // }, [threadId, userHasScrolled, initialLoadCompleted]);
const messagesData = await getMessages(threadId);
if (messagesData) {
console.log(`[POLLING] Refetch completed with ${messagesData.length} messages`);
// Map API message type to UnifiedMessage type
const unifiedMessages = (messagesData || [])
.filter(msg => msg.type !== 'status')
.map((msg: ApiMessageType) => ({
message_id: msg.message_id || null,
thread_id: msg.thread_id || threadId,
type: (msg.type || 'system') as UnifiedMessage['type'],
is_llm_message: Boolean(msg.is_llm_message),
content: msg.content || '',
metadata: msg.metadata || '{}',
created_at: msg.created_at || new Date().toISOString(),
updated_at: msg.updated_at || new Date().toISOString()
}));
setMessages(unifiedMessages);
// Only auto-scroll if not manually scrolled up
if (!userHasScrolled) {
scrollToBottom('smooth');
}
}
} catch (error) {
console.error('[POLLING] Error fetching messages:', error);
}
};
// Start polling once initial load is complete
if (initialLoadCompleted.current && !pollingIntervalRef.current) {
// Initial fetch
fetchMessages();
// Set up interval (every 2 seconds)
pollingIntervalRef.current = setInterval(fetchMessages, 2000);
}
// Clean up interval when component unmounts
return () => {
if (pollingIntervalRef.current) {
clearInterval(pollingIntervalRef.current);
pollingIntervalRef.current = null;
}
};
}, [threadId, userHasScrolled, initialLoadCompleted]);
// POLLING FOR MESSAGES
// Add another useEffect to ensure messages are refreshed when agent status changes to idle // Add another useEffect to ensure messages are refreshed when agent status changes to idle
useEffect(() => { useEffect(() => {
@ -1061,7 +1057,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
}).catch(err => { }).catch(err => {
console.error('Error in backup message refetch:', err); console.error('Error in backup message refetch:', err);
}); });
}, 1000); }, 500); // Shortened delay
return () => clearTimeout(timer); return () => clearTimeout(timer);
} }
@ -1349,7 +1345,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
onScroll={handleScroll} onScroll={handleScroll}
> >
<div className="mx-auto max-w-3xl"> <div className="mx-auto max-w-3xl">
{messages.length === 0 && !streamingTextContent && !streamingToolCall && agentStatus === 'idle' ? ( {messages.length === 0 && !streamingToolCall && agentStatus === 'idle' ? (
<div className="flex h-full items-center justify-center"> <div className="flex h-full items-center justify-center">
<div className="text-center text-muted-foreground">Send a message to start.</div> <div className="text-center text-muted-foreground">Send a message to start.</div>
</div> </div>
@ -1429,7 +1425,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
)} )}
{/* Use the helper function to render user attachments */} {/* Use the helper function to render user attachments */}
{renderAttachments(attachments as string[], handleOpenFileViewer)} {renderAttachments(attachments, handleOpenFileViewer)}
</div> </div>
</div> </div>
</div> </div>
@ -1444,113 +1440,67 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
<div className="flex-1"> <div className="flex-1">
<div className="inline-flex max-w-[90%] rounded-lg bg-muted/5 px-4 py-3 text-sm"> <div className="inline-flex max-w-[90%] rounded-lg bg-muted/5 px-4 py-3 text-sm">
<div className="space-y-2"> <div className="space-y-2">
{(() => { {/* --- START: Updated Assistant Message Rendering --- */}
const toolResultsMap = new Map<string | null, UnifiedMessage[]>(); {group.messages.map((message, msgIndex) => {
group.messages.forEach(msg => { if (message.type === 'assistant') {
if (msg.type === 'tool') { const parsedContent = safeJsonParse<ParsedContent>(message.content, { content: '' });
const meta = safeJsonParse<ParsedMetadata>(msg.metadata, {}); const msgKey = message.message_id || `submsg-assistant-${msgIndex}`;
const assistantId = meta.assistant_message_id || null;
if (!toolResultsMap.has(assistantId)) {
toolResultsMap.set(assistantId, []);
}
toolResultsMap.get(assistantId)?.push(msg);
}
});
const renderedToolResultIds = new Set<string>();
const elements: React.ReactNode[] = [];
group.messages.forEach((message, msgIndex) => { // Don't render empty non-streaming messages
if (message.type === 'assistant') { if (!parsedContent.content && message.message_id !== streamingMessageId) return null;
const parsedContent = safeJsonParse<ParsedContent>(message.content, {});
const msgKey = message.message_id || `submsg-assistant-${msgIndex}`;
if (!parsedContent.content) return; // Render the content using Markdown, which will now update incrementally
const renderedContent = renderMarkdownContent(
parsedContent.content || '', // Pass the accumulated content
handleToolClick,
message.message_id,
handleOpenFileViewer
);
const renderedContent = renderMarkdownContent( // Check if this is the message currently being streamed
parsedContent.content, const isStreamingThisMessage = message.message_id === streamingMessageId;
handleToolClick,
message.message_id,
handleOpenFileViewer
);
elements.push( return (
<div key={msgKey} className={msgIndex > 0 ? "mt-2" : ""}> <div key={msgKey} className={msgIndex > 0 && group.messages[msgIndex-1]?.type === 'assistant' ? "mt-2" : ""}>
<div className="prose prose-sm dark:prose-invert chat-markdown max-w-none [&>:first-child]:mt-0 prose-headings:mt-3"> <div className="prose prose-sm dark:prose-invert chat-markdown max-w-none [&>:first-child]:mt-0 prose-headings:mt-3">
{renderedContent} {renderedContent}
</div> {/* Add the blinking cursor ONLY to the currently streaming message */}
{isStreamingThisMessage && (streamHookStatus === 'streaming' || streamHookStatus === 'connecting') && (
<span className="inline-block h-4 w-0.5 bg-primary ml-0.5 -mb-1 animate-pulse" />
)}
</div> </div>
); </div>
} );
}); }
// Skip rendering tool messages directly here, they are handled via tool buttons inside assistant messages
return null;
})}
{/* --- END: Updated Assistant Message Rendering --- */}
return elements; {/* Render streaming tool call indicator if needed (e.g., <execute-command>...) */}
})()} {groupIndex === groupedMessages.length - 1 && streamingToolCall && (streamHookStatus === 'streaming' || streamHookStatus === 'connecting') && (
<div className="mt-2 mb-1">
{(() => {
const toolName = streamingToolCall.name || streamingToolCall.xml_tag_name || 'Tool';
const IconComponent = getToolIcon(toolName);
const paramDisplay = extractPrimaryParam(toolName, streamingToolCall.arguments || '');
// Don't render the streaming tool indicator if the text content is already showing it
const hideToolIndicator = streamingMessageId && HIDE_STREAMING_XML_TAGS.has(toolName);
if (hideToolIndicator) return null;
{groupIndex === groupedMessages.length - 1 && (streamHookStatus === 'streaming' || streamHookStatus === 'connecting') && ( return (
<div className="mt-2"> <button
{(() => { className="inline-flex items-center gap-1.5 py-1 px-2.5 text-xs font-medium text-primary bg-primary/10 hover:bg-primary/20 rounded-md transition-colors cursor-pointer border border-primary/20"
let detectedTag: string | null = null; >
let tagStartIndex = -1; <CircleDashed className="h-3.5 w-3.5 text-primary flex-shrink-0 animate-spin animation-duration-2000" />
if (streamingTextContent) { <span className="font-mono text-xs text-primary">{toolName}</span>
for (const tag of HIDE_STREAMING_XML_TAGS) { {paramDisplay && <span className="ml-1 text-primary/70 truncate max-w-[200px]" title={paramDisplay}>{paramDisplay}</span>}
const openingTagPattern = `<${tag}`; </button>
const index = streamingTextContent.indexOf(openingTagPattern); );
if (index !== -1) { })()}
detectedTag = tag; </div>
tagStartIndex = index; )}
break;
}
}
}
const textToRender = streamingTextContent || '';
const textBeforeTag = detectedTag ? textToRender.substring(0, tagStartIndex) : textToRender;
const showCursor = (streamHookStatus === 'streaming' || streamHookStatus === 'connecting') && !detectedTag;
return (
<>
{textBeforeTag && (
<Markdown className="text-sm prose prose-sm dark:prose-invert chat-markdown max-w-none [&>:first-child]:mt-0 prose-headings:mt-3">{textBeforeTag}</Markdown>
)}
{showCursor && (
<span className="inline-block h-4 w-0.5 bg-primary ml-0.5 -mb-1 animate-pulse" />
)}
{detectedTag && (
<div className="mt-2 mb-1">
<button
className="inline-flex items-center gap-1.5 py-1 px-2.5 text-xs font-medium text-primary bg-primary/10 hover:bg-primary/20 rounded-md transition-colors cursor-pointer border border-primary/20"
>
<CircleDashed className="h-3.5 w-3.5 text-primary flex-shrink-0 animate-spin animation-duration-2000" />
<span className="font-mono text-xs text-primary">{detectedTag}</span>
</button>
</div>
)}
{streamingToolCall && !detectedTag && (
<div className="mt-2 mb-1">
{(() => {
const toolName = streamingToolCall.name || streamingToolCall.xml_tag_name || 'Tool';
const IconComponent = getToolIcon(toolName);
const paramDisplay = extractPrimaryParam(toolName, streamingToolCall.arguments || '');
return (
<button
className="inline-flex items-center gap-1.5 py-1 px-2.5 text-xs font-medium text-primary bg-primary/10 hover:bg-primary/20 rounded-md transition-colors cursor-pointer border border-primary/20"
>
<CircleDashed className="h-3.5 w-3.5 text-primary flex-shrink-0 animate-spin animation-duration-2000" />
<span className="font-mono text-xs text-primary">{toolName}</span>
{paramDisplay && <span className="ml-1 text-primary/70 truncate max-w-[200px]" title={paramDisplay}>{paramDisplay}</span>}
</button>
);
})()}
</div>
)}
</>
);
})()}
</div>
)}
</div> </div>
</div> </div>
</div> </div>
@ -1561,7 +1511,8 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
return null; return null;
}); });
})()} })()}
{(agentStatus === 'running' || agentStatus === 'connecting') && {(agentStatus === 'running' || agentStatus === 'connecting') &&
!streamingMessageId && // Only show thinking indicator if not actively streaming text
(messages.length === 0 || messages[messages.length - 1].type === 'user') && ( (messages.length === 0 || messages[messages.length - 1].type === 'user') && (
<div ref={latestMessageRef}> <div ref={latestMessageRef}>
<div className="flex items-start gap-3"> <div className="flex items-start gap-3">
@ -1597,6 +1548,14 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
"mx-auto", "mx-auto",
isMobile ? "w-full px-4" : "max-w-3xl" isMobile ? "w-full px-4" : "max-w-3xl"
)}> )}>
{showScrollButton && (
<button
onClick={handleScrollButtonClick}
className="absolute bottom-24 right-6 z-20 flex h-8 w-8 items-center justify-center rounded-full bg-primary text-primary-foreground shadow-md transition-opacity hover:bg-primary/90 focus:outline-none focus:ring-2 focus:ring-primary focus:ring-offset-2"
>
<ArrowDown className="h-4 w-4" />
</button>
)}
<ChatInput <ChatInput
value={newMessage} value={newMessage}
onChange={setNewMessage} onChange={setNewMessage}
@ -1618,7 +1577,6 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
onClose={() => { onClose={() => {
setIsSidePanelOpen(false); setIsSidePanelOpen(false);
userClosedPanelRef.current = true; userClosedPanelRef.current = true;
setAutoOpenedPanel(true);
}} }}
toolCalls={toolCalls} toolCalls={toolCalls}
messages={messages as ApiMessageType[]} messages={messages as ApiMessageType[]}

View File

@ -221,6 +221,8 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
const messagesLoadedRef = useRef(false); const messagesLoadedRef = useRef(false);
const agentRunsCheckedRef = useRef(false); const agentRunsCheckedRef = useRef(false);
const [streamingTextContent, setStreamingTextContent] = useState("");
const handleProjectRenamed = useCallback((newName: string) => { const handleProjectRenamed = useCallback((newName: string) => {
setProjectName(newName); setProjectName(newName);
}, []); }, []);
@ -569,7 +571,6 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
const { const {
status: streamHookStatus, status: streamHookStatus,
textContent: streamingTextContent,
toolCall: streamingToolCall, toolCall: streamingToolCall,
error: streamError, error: streamError,
agentRunId: currentHookRunId, agentRunId: currentHookRunId,
@ -577,6 +578,10 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
stopStreaming, stopStreaming,
} = useAgentStream({ } = useAgentStream({
onMessage: handleNewMessageFromStream, onMessage: handleNewMessageFromStream,
onAssistantStart: () => {},
onAssistantChunk: ({ content }) => {
setStreamingTextContent(prev => prev + content);
},
onStatusChange: handleStreamStatusChange, onStatusChange: handleStreamStatusChange,
onError: handleStreamError, onError: handleStreamError,
onClose: handleStreamClose, onClose: handleStreamClose,

View File

@ -24,7 +24,6 @@ interface ApiMessageType {
// Define the structure returned by the hook // Define the structure returned by the hook
export interface UseAgentStreamResult { export interface UseAgentStreamResult {
status: string; status: string;
textContent: string;
toolCall: ParsedContent | null; toolCall: ParsedContent | null;
error: string | null; error: string | null;
agentRunId: string | null; // Expose the currently managed agentRunId agentRunId: string | null; // Expose the currently managed agentRunId
@ -34,7 +33,9 @@ export interface UseAgentStreamResult {
// Define the callbacks the hook consumer can provide // Define the callbacks the hook consumer can provide
export interface AgentStreamCallbacks { export interface AgentStreamCallbacks {
onMessage: (message: UnifiedMessage) => void; // Callback for complete messages onMessage: (message: UnifiedMessage) => void; // For complete messages/non-assistant
onAssistantChunk: (chunk: { content: string; message_id: string | null }) => void; // NEW: Callback for assistant chunks
onAssistantStart: (initialMessage: UnifiedMessage) => void; // NEW: Callback when assistant stream starts
onStatusChange?: (status: string) => void; // Optional: Notify on internal status changes onStatusChange?: (status: string) => void; // Optional: Notify on internal status changes
onError?: (error: string) => void; // Optional: Notify on errors onError?: (error: string) => void; // Optional: Notify on errors
onClose?: (finalStatus: string) => void; // Optional: Notify when streaming definitively ends onClose?: (finalStatus: string) => void; // Optional: Notify when streaming definitively ends
@ -59,7 +60,6 @@ const mapApiMessagesToUnified = (messagesData: ApiMessageType[] | null | undefin
export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string, setMessages: (messages: UnifiedMessage[]) => void): UseAgentStreamResult { export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string, setMessages: (messages: UnifiedMessage[]) => void): UseAgentStreamResult {
const [agentRunId, setAgentRunId] = useState<string | null>(null); const [agentRunId, setAgentRunId] = useState<string | null>(null);
const [status, setStatus] = useState<string>('idle'); const [status, setStatus] = useState<string>('idle');
const [textContent, setTextContent] = useState<string>('');
const [toolCall, setToolCall] = useState<ParsedContent | null>(null); const [toolCall, setToolCall] = useState<ParsedContent | null>(null);
const [error, setError] = useState<string | null>(null); const [error, setError] = useState<string | null>(null);
@ -68,6 +68,7 @@ export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string
const currentRunIdRef = useRef<string | null>(null); // Ref to track the run ID being processed const currentRunIdRef = useRef<string | null>(null); // Ref to track the run ID being processed
const threadIdRef = useRef(threadId); // Ref to hold the current threadId const threadIdRef = useRef(threadId); // Ref to hold the current threadId
const setMessagesRef = useRef(setMessages); // Ref to hold the setMessages function const setMessagesRef = useRef(setMessages); // Ref to hold the setMessages function
const assistantMessageIdRef = useRef<string | null>(null); // Track the ID of the message being streamed
// Update refs if threadId or setMessages changes // Update refs if threadId or setMessages changes
useEffect(() => { useEffect(() => {
@ -117,8 +118,8 @@ export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string
} }
// Reset streaming-specific state // Reset streaming-specific state
setTextContent('');
setToolCall(null); setToolCall(null);
assistantMessageIdRef.current = null; // Ensure reset on finalize
// Update status and clear run ID // Update status and clear run ID
updateStatus(finalStatus); updateStatus(finalStatus);
@ -195,23 +196,59 @@ export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string
switch (message.type) { switch (message.type) {
case 'assistant': case 'assistant':
if (parsedMetadata.stream_status === 'chunk' && parsedContent.content) { if (parsedMetadata.stream_status === 'chunk' && parsedContent.content) {
setTextContent(prev => prev + parsedContent.content); if (assistantMessageIdRef.current === null) {
const tempId = message.message_id || `streaming-${Date.now()}`;
assistantMessageIdRef.current = tempId;
const initialMessage: UnifiedMessage = {
...message,
message_id: tempId,
content: JSON.stringify({ role: 'assistant', content: '' }),
metadata: JSON.stringify({ ...parsedMetadata, stream_status: 'streaming' })
};
console.log('[useAgentStream] Assistant stream started, calling onAssistantStart with ID:', tempId);
callbacks.onAssistantStart(initialMessage);
}
callbacks.onAssistantChunk({
content: parsedContent.content,
message_id: assistantMessageIdRef.current
});
} else if (parsedMetadata.stream_status === 'complete') { } else if (parsedMetadata.stream_status === 'complete') {
setTextContent(''); console.log('[useAgentStream] Received complete assistant message ID:', message.message_id);
setToolCall(null); setToolCall(null);
if (message.message_id) callbacks.onMessage(message); if (message.message_id) {
callbacks.onMessage(message);
} else if (assistantMessageIdRef.current) {
console.warn('[useAgentStream] Complete message missing ID, using tracked ID:', assistantMessageIdRef.current);
callbacks.onMessage({ ...message, message_id: assistantMessageIdRef.current });
} else {
console.error('[useAgentStream] Received complete assistant message without an ID and no ID was tracked.');
}
assistantMessageIdRef.current = null;
} else if (!parsedMetadata.stream_status) { } else if (!parsedMetadata.stream_status) {
// Handle non-chunked assistant messages if needed assistantMessageIdRef.current = null;
console.log('[useAgentStream] Received non-chunked assistant message ID:', message.message_id);
if (message.message_id) callbacks.onMessage(message); if (message.message_id) callbacks.onMessage(message);
} }
break; break;
case 'tool': case 'tool':
setToolCall(null); // Clear any streaming tool call assistantMessageIdRef.current = null;
setToolCall(null);
console.log('[useAgentStream] Received tool message ID:', message.message_id);
if (message.message_id) callbacks.onMessage(message); if (message.message_id) callbacks.onMessage(message);
break; break;
case 'status': case 'status':
if (parsedContent.status_type === 'thread_run_end' || parsedContent.status_type === 'finish' || parsedContent.status_type === 'error') {
if(assistantMessageIdRef.current) {
console.log(`[useAgentStream] Resetting assistantMessageIdRef due to status: ${parsedContent.status_type}`);
assistantMessageIdRef.current = null;
}
}
switch (parsedContent.status_type) { switch (parsedContent.status_type) {
case 'tool_started': case 'tool_started':
if(assistantMessageIdRef.current) {
console.log('[useAgentStream] Resetting assistantMessageIdRef due to tool_started');
assistantMessageIdRef.current = null;
}
setToolCall({ setToolCall({
role: 'assistant', role: 'assistant',
status_type: 'tool_started', status_type: 'tool_started',
@ -232,30 +269,27 @@ export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string
console.log('[useAgentStream] Received thread run end status, finalizing.'); console.log('[useAgentStream] Received thread run end status, finalizing.');
break; break;
case 'finish': case 'finish':
// Optional: Handle finish reasons like 'xml_tool_limit_reached'
console.log('[useAgentStream] Received finish status:', parsedContent.finish_reason); console.log('[useAgentStream] Received finish status:', parsedContent.finish_reason);
// Don't finalize here, wait for thread_run_end or completion message
break; break;
case 'error': case 'error':
console.error('[useAgentStream] Received error status message:', parsedContent.message); console.error('[useAgentStream] Received error status message:', parsedContent.message);
setError(parsedContent.message || 'Agent run failed'); setError(parsedContent.message || 'Agent run failed');
finalizeStream('error', currentRunIdRef.current); finalizeStream('error', currentRunIdRef.current);
break; break;
// Ignore thread_run_start, assistant_response_start etc. for now
default: default:
// console.debug('[useAgentStream] Received unhandled status type:', parsedContent.status_type);
break; break;
} }
break; break;
case 'user': case 'user':
case 'system': case 'system':
// Handle other message types if necessary, e.g., if backend sends historical context assistantMessageIdRef.current = null;
if (message.message_id) callbacks.onMessage(message); if (message.message_id) callbacks.onMessage(message);
break; break;
default: default:
assistantMessageIdRef.current = null;
console.warn('[useAgentStream] Unhandled message type:', message.type); console.warn('[useAgentStream] Unhandled message type:', message.type);
} }
}, [threadId, setMessages, status, toolCall, callbacks, finalizeStream, updateStatus]); }, [status, toolCall, callbacks, finalizeStream, updateStatus]);
const handleStreamError = useCallback((err: Error | string | Event) => { const handleStreamError = useCallback((err: Error | string | Event) => {
if (!isMountedRef.current) return; if (!isMountedRef.current) return;
@ -392,11 +426,11 @@ export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string
} }
// Reset state on unmount if needed, though finalizeStream should handle most cases // Reset state on unmount if needed, though finalizeStream should handle most cases
setStatus('idle'); setStatus('idle');
setTextContent('');
setToolCall(null); setToolCall(null);
setError(null); setError(null);
setAgentRunId(null); setAgentRunId(null);
currentRunIdRef.current = null; currentRunIdRef.current = null;
assistantMessageIdRef.current = null; // Reset assistant tracking on unmount
}; };
}, []); // Empty dependency array for mount/unmount effect }, []); // Empty dependency array for mount/unmount effect
@ -414,9 +448,9 @@ export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string
} }
// Reset state before starting // Reset state before starting
setTextContent('');
setToolCall(null); setToolCall(null);
setError(null); setError(null);
assistantMessageIdRef.current = null; // Reset assistant tracking
updateStatus('connecting'); updateStatus('connecting');
setAgentRunId(runId); setAgentRunId(runId);
currentRunIdRef.current = runId; // Set the ref immediately currentRunIdRef.current = runId; // Set the ref immediately
@ -481,7 +515,6 @@ export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string
return { return {
status, status,
textContent,
toolCall, toolCall,
error, error,
agentRunId, agentRunId,