Compare commits

...

2 Commits

Author SHA1 Message Date
LE Quoc Dat 290fa6a8d3 fix vercel 2025-04-24 22:08:14 +01:00
LE Quoc Dat 98c494807b fix stream issue 2025-04-24 15:53:10 +01:00
3 changed files with 228 additions and 232 deletions

View File

@ -217,6 +217,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
const [currentToolIndex, setCurrentToolIndex] = useState<number>(0);
const [autoOpenedPanel, setAutoOpenedPanel] = useState(false);
const [initialPanelOpenAttempted, setInitialPanelOpenAttempted] = useState(false);
const [streamingMessageId, setStreamingMessageId] = useState<string | null>(null);
// Billing alert state
const [showBillingAlert, setShowBillingAlert] = useState(false);
@ -244,7 +245,6 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
const messagesLoadedRef = useRef(false);
const agentRunsCheckedRef = useRef(false);
const previousAgentStatus = useRef<typeof agentStatus>('idle');
const pollingIntervalRef = useRef<NodeJS.Timeout | null>(null); // POLLING FOR MESSAGES
const handleProjectRenamed = useCallback((newName: string) => {
setProjectName(newName);
@ -337,27 +337,84 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
return () => window.removeEventListener('keydown', handleKeyDown);
}, [toggleSidePanel, isSidePanelOpen, leftSidebarState, setLeftSidebarOpen]);
const handleNewMessageFromStream = useCallback((message: UnifiedMessage) => {
// Log the ID of the message received from the stream
console.log(`[STREAM HANDLER] Received message: ID=${message.message_id}, Type=${message.type}`);
if (!message.message_id) {
console.warn(`[STREAM HANDLER] Received message is missing ID: Type=${message.type}, Content=${message.content?.substring(0, 50)}...`);
// Callback for when assistant stream starts
const handleAssistantStreamStart = useCallback((initialMessage: UnifiedMessage) => {
console.log('[PAGE] Assistant stream started. Adding placeholder message:', initialMessage.message_id);
setMessages(prev => {
// Avoid adding duplicates if reconnect happens quickly
if (prev.some(m => m.message_id === initialMessage.message_id)) {
return prev;
}
return [...prev, initialMessage];
});
setStreamingMessageId(initialMessage.message_id);
if (!userHasScrolled) scrollToBottom('smooth'); // Scroll down when assistant starts
}, [userHasScrolled]);
// Callback for receiving assistant message chunks
const handleAssistantStreamChunk = useCallback(({ content: chunkContent, message_id }: { content: string; message_id: string | null }) => {
if (!message_id) {
console.warn("[PAGE] Received assistant chunk without a message_id. Cannot update state.");
return;
}
setMessages(prevMessages => {
// Find the index of the message being streamed
const msgIndex = prevMessages.findIndex(m => m.message_id === message_id);
if (msgIndex === -1) {
console.warn(`[PAGE] Could not find message with ID ${message_id} to append chunk.`);
// Optionally: Could try adding a new message if not found, but might indicate a logic error
return prevMessages;
}
// Create a new array with the updated message
return prevMessages.map((msg, index) => {
if (index === msgIndex) {
// Parse the existing content
const existingParsedContent = safeJsonParse<ParsedContent>(msg.content, { role: 'assistant', content: '' });
// Append the new chunk to the inner content string
const newInnerContent = (existingParsedContent.content || '') + chunkContent;
// Create the updated content object and stringify it
const updatedContentObject: ParsedContent = { ...existingParsedContent, content: newInnerContent };
return {
...msg,
content: JSON.stringify(updatedContentObject),
updated_at: new Date().toISOString() // Update timestamp
};
}
return msg;
});
});
// Continuously scroll if user hasn't manually scrolled up
if (!userHasScrolled) scrollToBottom('auto'); // Use 'auto' for potentially smoother streaming scroll
}, [userHasScrolled]);
// Callback for receiving complete messages (Handles the final update)
const handleNewMessageFromStream = useCallback((message: UnifiedMessage) => {
console.log(`[PAGE - onMessage] Received complete message: ID=${message.message_id}, Type=${message.type}`);
setMessages(prev => {
const messageExists = prev.some(m => m.message_id === message.message_id);
// Update existing message (especially the final assistant one) or add new (e.g., tool result)
if (messageExists) {
console.log(`[PAGE - onMessage] Updating existing message ID: ${message.message_id}`);
return prev.map(m => m.message_id === message.message_id ? message : m);
} else {
console.log(`[PAGE - onMessage] Adding new message ID: ${message.message_id}`);
return [...prev, message];
}
});
// If we received a tool message, refresh the tool panel
// If we received the final assistant message, clear the streaming ID
if (message.message_id === streamingMessageId && message.type === 'assistant') {
console.log(`[PAGE - onMessage] Finalizing streaming for message ID: ${streamingMessageId}`);
setStreamingMessageId(null);
}
// Reset auto-opened panel state if needed (e.g., after tool message)
if (message.type === 'tool') {
setAutoOpenedPanel(false);
}
}, []);
// Scroll to bottom for final messages
if (!userHasScrolled) scrollToBottom('smooth');
}, [streamingMessageId, userHasScrolled]);
const handleStreamStatusChange = useCallback((hookStatus: string) => {
console.log(`[PAGE] Hook status changed: ${hookStatus}`);
@ -372,11 +429,16 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
setAgentRunId(null);
// Reset auto-opened state when agent completes to trigger tool detection
setAutoOpenedPanel(false);
// Ensure streaming ID is cleared if hook terminates unexpectedly
if (streamingMessageId) {
console.log('[PAGE] Clearing streamingMessageId due to hook termination.');
setStreamingMessageId(null);
}
// After terminal states, we should scroll to bottom to show latest messages
// The hook will already have refetched messages by this point
if (['completed', 'stopped', 'agent_not_running', 'error', 'failed'].includes(hookStatus)) {
scrollToBottom('smooth');
if (!userHasScrolled) scrollToBottom('smooth');
}
break;
case 'connecting':
@ -386,7 +448,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
setAgentStatus('running');
break;
}
}, []);
}, [streamingMessageId, userHasScrolled]);
const handleStreamError = useCallback((errorMessage: string) => {
console.error(`[PAGE] Stream hook error: ${errorMessage}`);
@ -394,15 +456,24 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
!errorMessage.toLowerCase().includes('agent run is not running')) {
toast.error(`Stream Error: ${errorMessage}`);
}
}, []);
// Ensure streaming ID is cleared on error
if (streamingMessageId) {
console.log('[PAGE] Clearing streamingMessageId due to stream error.');
setStreamingMessageId(null);
}
}, [streamingMessageId]);
const handleStreamClose = useCallback(() => {
console.log(`[PAGE] Stream hook closed with final status: ${agentStatus}`);
}, [agentStatus]);
// Ensure streaming ID is cleared when stream closes
if (streamingMessageId) {
console.log('[PAGE] Clearing streamingMessageId due to stream close.');
setStreamingMessageId(null);
}
}, [agentStatus, streamingMessageId]);
const {
status: streamHookStatus,
textContent: streamingTextContent,
toolCall: streamingToolCall,
error: streamError,
agentRunId: currentHookRunId,
@ -410,6 +481,8 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
stopStreaming,
} = useAgentStream({
onMessage: handleNewMessageFromStream,
onAssistantStart: handleAssistantStreamStart,
onAssistantChunk: handleAssistantStreamChunk,
onStatusChange: handleStreamStatusChange,
onError: handleStreamError,
onClose: handleStreamClose,
@ -466,7 +539,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
const messagesData = await getMessages(threadId);
if (isMounted) {
// Log raw messages fetched from API
console.log('[PAGE] Raw messages fetched:', messagesData);
console.log('[PAGE] Raw messages fetched:', messagesData?.length);
// Map API message type to UnifiedMessage type
const unifiedMessages = (messagesData || [])
@ -479,19 +552,12 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
console.warn(`[MAP ${index}] Non-status message fetched from API is missing ID: Type=${msg.type}`);
}
const threadIdMapped = msg.thread_id || threadId;
console.log(`[MAP ${index}] Accessed msg.thread_id (using fallback):`, threadIdMapped);
const typeMapped = (msg.type || 'system') as UnifiedMessage['type'];
console.log(`[MAP ${index}] Accessed msg.type (using fallback):`, typeMapped);
const isLlmMessageMapped = Boolean(msg.is_llm_message);
console.log(`[MAP ${index}] Accessed msg.is_llm_message:`, isLlmMessageMapped);
const contentMapped = msg.content || '';
console.log(`[MAP ${index}] Accessed msg.content (using fallback):`, contentMapped.substring(0, 50) + '...');
const metadataMapped = msg.metadata || '{}';
console.log(`[MAP ${index}] Accessed msg.metadata (using fallback):`, metadataMapped);
const createdAtMapped = msg.created_at || new Date().toISOString();
console.log(`[MAP ${index}] Accessed msg.created_at (using fallback):`, createdAtMapped);
const updatedAtMapped = msg.updated_at || new Date().toISOString();
console.log(`[MAP ${index}] Accessed msg.updated_at (using fallback):`, updatedAtMapped);
return {
message_id: messageId || null,
@ -506,29 +572,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
});
setMessages(unifiedMessages); // Set the filtered and mapped messages
console.log('[PAGE] Loaded Messages (excluding status, keeping browser_state):', unifiedMessages.length)
// Debug loaded messages
const assistantMessages = unifiedMessages.filter(m => m.type === 'assistant');
const toolMessages = unifiedMessages.filter(m => m.type === 'tool');
console.log('[PAGE] Assistant messages:', assistantMessages.length);
console.log('[PAGE] Tool messages:', toolMessages.length);
// Check if tool messages have associated assistant messages
toolMessages.forEach(toolMsg => {
try {
const metadata = JSON.parse(toolMsg.metadata);
if (metadata.assistant_message_id) {
const hasAssociated = assistantMessages.some(
assMsg => assMsg.message_id === metadata.assistant_message_id
);
console.log(`[PAGE] Tool message ${toolMsg.message_id} references assistant ${metadata.assistant_message_id} - found: ${hasAssociated}`);
}
} catch (e) {
console.error("Error parsing tool message metadata:", e);
}
});
console.log('[PAGE] Loaded Messages (excluding status):', unifiedMessages.length)
messagesLoadedRef.current = true;
if (!hasInitiallyScrolled.current) {
@ -646,9 +690,9 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
setUserHasScrolled(isScrolledUp);
};
const scrollToBottom = (behavior: ScrollBehavior = 'smooth') => {
const scrollToBottom = useCallback((behavior: ScrollBehavior = 'smooth') => {
messagesEndRef.current?.scrollIntoView({ behavior });
};
}, []);
useEffect(() => {
const lastMsg = messages[messages.length - 1];
@ -666,7 +710,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
);
observer.observe(latestMessageRef.current);
return () => observer.disconnect();
}, [messages, streamingTextContent, streamingToolCall, setShowScrollButton]);
}, [messages, streamingToolCall, setShowScrollButton]);
const handleScrollButtonClick = () => {
scrollToBottom('smooth');
@ -674,7 +718,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
};
useEffect(() => {
console.log(`[PAGE] 🔄 Page AgentStatus: ${agentStatus}, Hook Status: ${streamHookStatus}, Target RunID: ${agentRunId || 'none'}, Hook RunID: ${currentHookRunId || 'none'}`);
console.log(`[PAGE] 🔄 Page AgentStatus: ${agentStatus}, Hook Status: ${streamHookStatus}, Target RunID: ${agentRunId || 'none'}, Hook RunID: ${currentHookRunId || 'none'}, StreamingMsgID: ${streamingMessageId || 'none'}`);
// If the stream hook reports completion/stopping but our UI hasn't updated
if ((streamHookStatus === 'completed' || streamHookStatus === 'stopped' ||
@ -685,7 +729,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
setAgentRunId(null);
setAutoOpenedPanel(false);
}
}, [agentStatus, streamHookStatus, agentRunId, currentHookRunId]);
}, [agentStatus, streamHookStatus, agentRunId, currentHookRunId, streamingMessageId]);
const handleOpenFileViewer = useCallback((filePath?: string) => {
if (filePath) {
@ -943,7 +987,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
setCurrentToolIndex(0);
setIsSidePanelOpen(true);
}, []);
}, [isSidePanelOpen]);
// SEO title update
useEffect(() => {
@ -970,63 +1014,15 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
}
}, [projectName]);
// POLLING FOR MESSAGES
// Set up polling for messages
useEffect(() => {
// Function to fetch messages
const fetchMessages = async () => {
if (!threadId) return;
try {
console.log('[POLLING] Refetching messages...');
const messagesData = await getMessages(threadId);
if (messagesData) {
console.log(`[POLLING] Refetch completed with ${messagesData.length} messages`);
// Map API message type to UnifiedMessage type
const unifiedMessages = (messagesData || [])
.filter(msg => msg.type !== 'status')
.map((msg: ApiMessageType) => ({
message_id: msg.message_id || null,
thread_id: msg.thread_id || threadId,
type: (msg.type || 'system') as UnifiedMessage['type'],
is_llm_message: Boolean(msg.is_llm_message),
content: msg.content || '',
metadata: msg.metadata || '{}',
created_at: msg.created_at || new Date().toISOString(),
updated_at: msg.updated_at || new Date().toISOString()
}));
setMessages(unifiedMessages);
// Only auto-scroll if not manually scrolled up
if (!userHasScrolled) {
scrollToBottom('smooth');
}
}
} catch (error) {
console.error('[POLLING] Error fetching messages:', error);
}
};
// Start polling once initial load is complete
if (initialLoadCompleted.current && !pollingIntervalRef.current) {
// Initial fetch
fetchMessages();
// Set up interval (every 2 seconds)
pollingIntervalRef.current = setInterval(fetchMessages, 2000);
}
// Clean up interval when component unmounts
return () => {
if (pollingIntervalRef.current) {
clearInterval(pollingIntervalRef.current);
pollingIntervalRef.current = null;
}
};
}, [threadId, userHasScrolled, initialLoadCompleted]);
// POLLING FOR MESSAGES
// POLLING FOR MESSAGES - Keep disabled for now, let the stream hook handle final refetch
// useEffect(() => {
// const fetchMessages = async () => { /* ... */ };
// if (initialLoadCompleted.current && !pollingIntervalRef.current) {
// fetchMessages();
// pollingIntervalRef.current = setInterval(fetchMessages, 2000);
// }
// return () => { /* cleanup interval */ };
// }, [threadId, userHasScrolled, initialLoadCompleted]);
// Add another useEffect to ensure messages are refreshed when agent status changes to idle
useEffect(() => {
@ -1061,7 +1057,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
}).catch(err => {
console.error('Error in backup message refetch:', err);
});
}, 1000);
}, 500); // Shortened delay
return () => clearTimeout(timer);
}
@ -1349,7 +1345,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
onScroll={handleScroll}
>
<div className="mx-auto max-w-3xl">
{messages.length === 0 && !streamingTextContent && !streamingToolCall && agentStatus === 'idle' ? (
{messages.length === 0 && !streamingToolCall && agentStatus === 'idle' ? (
<div className="flex h-full items-center justify-center">
<div className="text-center text-muted-foreground">Send a message to start.</div>
</div>
@ -1429,7 +1425,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
)}
{/* Use the helper function to render user attachments */}
{renderAttachments(attachments as string[], handleOpenFileViewer)}
{renderAttachments(attachments, handleOpenFileViewer)}
</div>
</div>
</div>
@ -1444,96 +1440,54 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
<div className="flex-1">
<div className="inline-flex max-w-[90%] rounded-lg bg-muted/5 px-4 py-3 text-sm">
<div className="space-y-2">
{(() => {
const toolResultsMap = new Map<string | null, UnifiedMessage[]>();
group.messages.forEach(msg => {
if (msg.type === 'tool') {
const meta = safeJsonParse<ParsedMetadata>(msg.metadata, {});
const assistantId = meta.assistant_message_id || null;
if (!toolResultsMap.has(assistantId)) {
toolResultsMap.set(assistantId, []);
}
toolResultsMap.get(assistantId)?.push(msg);
}
});
const renderedToolResultIds = new Set<string>();
const elements: React.ReactNode[] = [];
group.messages.forEach((message, msgIndex) => {
{/* --- START: Updated Assistant Message Rendering --- */}
{group.messages.map((message, msgIndex) => {
if (message.type === 'assistant') {
const parsedContent = safeJsonParse<ParsedContent>(message.content, {});
const parsedContent = safeJsonParse<ParsedContent>(message.content, { content: '' });
const msgKey = message.message_id || `submsg-assistant-${msgIndex}`;
if (!parsedContent.content) return;
// Don't render empty non-streaming messages
if (!parsedContent.content && message.message_id !== streamingMessageId) return null;
// Render the content using Markdown, which will now update incrementally
const renderedContent = renderMarkdownContent(
parsedContent.content,
parsedContent.content || '', // Pass the accumulated content
handleToolClick,
message.message_id,
handleOpenFileViewer
);
elements.push(
<div key={msgKey} className={msgIndex > 0 ? "mt-2" : ""}>
// Check if this is the message currently being streamed
const isStreamingThisMessage = message.message_id === streamingMessageId;
return (
<div key={msgKey} className={msgIndex > 0 && group.messages[msgIndex-1]?.type === 'assistant' ? "mt-2" : ""}>
<div className="prose prose-sm dark:prose-invert chat-markdown max-w-none [&>:first-child]:mt-0 prose-headings:mt-3">
{renderedContent}
{/* Add the blinking cursor ONLY to the currently streaming message */}
{isStreamingThisMessage && (streamHookStatus === 'streaming' || streamHookStatus === 'connecting') && (
<span className="inline-block h-4 w-0.5 bg-primary ml-0.5 -mb-1 animate-pulse" />
)}
</div>
</div>
);
}
});
// Skip rendering tool messages directly here, they are handled via tool buttons inside assistant messages
return null;
})}
{/* --- END: Updated Assistant Message Rendering --- */}
return elements;
})()}
{groupIndex === groupedMessages.length - 1 && (streamHookStatus === 'streaming' || streamHookStatus === 'connecting') && (
<div className="mt-2">
{(() => {
let detectedTag: string | null = null;
let tagStartIndex = -1;
if (streamingTextContent) {
for (const tag of HIDE_STREAMING_XML_TAGS) {
const openingTagPattern = `<${tag}`;
const index = streamingTextContent.indexOf(openingTagPattern);
if (index !== -1) {
detectedTag = tag;
tagStartIndex = index;
break;
}
}
}
const textToRender = streamingTextContent || '';
const textBeforeTag = detectedTag ? textToRender.substring(0, tagStartIndex) : textToRender;
const showCursor = (streamHookStatus === 'streaming' || streamHookStatus === 'connecting') && !detectedTag;
return (
<>
{textBeforeTag && (
<Markdown className="text-sm prose prose-sm dark:prose-invert chat-markdown max-w-none [&>:first-child]:mt-0 prose-headings:mt-3">{textBeforeTag}</Markdown>
)}
{showCursor && (
<span className="inline-block h-4 w-0.5 bg-primary ml-0.5 -mb-1 animate-pulse" />
)}
{detectedTag && (
<div className="mt-2 mb-1">
<button
className="inline-flex items-center gap-1.5 py-1 px-2.5 text-xs font-medium text-primary bg-primary/10 hover:bg-primary/20 rounded-md transition-colors cursor-pointer border border-primary/20"
>
<CircleDashed className="h-3.5 w-3.5 text-primary flex-shrink-0 animate-spin animation-duration-2000" />
<span className="font-mono text-xs text-primary">{detectedTag}</span>
</button>
</div>
)}
{streamingToolCall && !detectedTag && (
{/* Render streaming tool call indicator if needed (e.g., <execute-command>...) */}
{groupIndex === groupedMessages.length - 1 && streamingToolCall && (streamHookStatus === 'streaming' || streamHookStatus === 'connecting') && (
<div className="mt-2 mb-1">
{(() => {
const toolName = streamingToolCall.name || streamingToolCall.xml_tag_name || 'Tool';
const IconComponent = getToolIcon(toolName);
const paramDisplay = extractPrimaryParam(toolName, streamingToolCall.arguments || '');
// Don't render the streaming tool indicator if the text content is already showing it
const hideToolIndicator = streamingMessageId && HIDE_STREAMING_XML_TAGS.has(toolName);
if (hideToolIndicator) return null;
return (
<button
className="inline-flex items-center gap-1.5 py-1 px-2.5 text-xs font-medium text-primary bg-primary/10 hover:bg-primary/20 rounded-md transition-colors cursor-pointer border border-primary/20"
@ -1546,11 +1500,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
})()}
</div>
)}
</>
);
})()}
</div>
)}
</div>
</div>
</div>
@ -1562,6 +1512,7 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
});
})()}
{(agentStatus === 'running' || agentStatus === 'connecting') &&
!streamingMessageId && // Only show thinking indicator if not actively streaming text
(messages.length === 0 || messages[messages.length - 1].type === 'user') && (
<div ref={latestMessageRef}>
<div className="flex items-start gap-3">
@ -1597,6 +1548,14 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
"mx-auto",
isMobile ? "w-full px-4" : "max-w-3xl"
)}>
{showScrollButton && (
<button
onClick={handleScrollButtonClick}
className="absolute bottom-24 right-6 z-20 flex h-8 w-8 items-center justify-center rounded-full bg-primary text-primary-foreground shadow-md transition-opacity hover:bg-primary/90 focus:outline-none focus:ring-2 focus:ring-primary focus:ring-offset-2"
>
<ArrowDown className="h-4 w-4" />
</button>
)}
<ChatInput
value={newMessage}
onChange={setNewMessage}
@ -1618,7 +1577,6 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
onClose={() => {
setIsSidePanelOpen(false);
userClosedPanelRef.current = true;
setAutoOpenedPanel(true);
}}
toolCalls={toolCalls}
messages={messages as ApiMessageType[]}

View File

@ -221,6 +221,8 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
const messagesLoadedRef = useRef(false);
const agentRunsCheckedRef = useRef(false);
const [streamingTextContent, setStreamingTextContent] = useState("");
const handleProjectRenamed = useCallback((newName: string) => {
setProjectName(newName);
}, []);
@ -569,7 +571,6 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
const {
status: streamHookStatus,
textContent: streamingTextContent,
toolCall: streamingToolCall,
error: streamError,
agentRunId: currentHookRunId,
@ -577,6 +578,10 @@ export default function ThreadPage({ params }: { params: Promise<ThreadParams> }
stopStreaming,
} = useAgentStream({
onMessage: handleNewMessageFromStream,
onAssistantStart: () => {},
onAssistantChunk: ({ content }) => {
setStreamingTextContent(prev => prev + content);
},
onStatusChange: handleStreamStatusChange,
onError: handleStreamError,
onClose: handleStreamClose,

View File

@ -24,7 +24,6 @@ interface ApiMessageType {
// Define the structure returned by the hook
export interface UseAgentStreamResult {
status: string;
textContent: string;
toolCall: ParsedContent | null;
error: string | null;
agentRunId: string | null; // Expose the currently managed agentRunId
@ -34,7 +33,9 @@ export interface UseAgentStreamResult {
// Define the callbacks the hook consumer can provide
export interface AgentStreamCallbacks {
onMessage: (message: UnifiedMessage) => void; // Callback for complete messages
onMessage: (message: UnifiedMessage) => void; // For complete messages/non-assistant
onAssistantChunk: (chunk: { content: string; message_id: string | null }) => void; // NEW: Callback for assistant chunks
onAssistantStart: (initialMessage: UnifiedMessage) => void; // NEW: Callback when assistant stream starts
onStatusChange?: (status: string) => void; // Optional: Notify on internal status changes
onError?: (error: string) => void; // Optional: Notify on errors
onClose?: (finalStatus: string) => void; // Optional: Notify when streaming definitively ends
@ -59,7 +60,6 @@ const mapApiMessagesToUnified = (messagesData: ApiMessageType[] | null | undefin
export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string, setMessages: (messages: UnifiedMessage[]) => void): UseAgentStreamResult {
const [agentRunId, setAgentRunId] = useState<string | null>(null);
const [status, setStatus] = useState<string>('idle');
const [textContent, setTextContent] = useState<string>('');
const [toolCall, setToolCall] = useState<ParsedContent | null>(null);
const [error, setError] = useState<string | null>(null);
@ -68,6 +68,7 @@ export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string
const currentRunIdRef = useRef<string | null>(null); // Ref to track the run ID being processed
const threadIdRef = useRef(threadId); // Ref to hold the current threadId
const setMessagesRef = useRef(setMessages); // Ref to hold the setMessages function
const assistantMessageIdRef = useRef<string | null>(null); // Track the ID of the message being streamed
// Update refs if threadId or setMessages changes
useEffect(() => {
@ -117,8 +118,8 @@ export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string
}
// Reset streaming-specific state
setTextContent('');
setToolCall(null);
assistantMessageIdRef.current = null; // Ensure reset on finalize
// Update status and clear run ID
updateStatus(finalStatus);
@ -195,23 +196,59 @@ export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string
switch (message.type) {
case 'assistant':
if (parsedMetadata.stream_status === 'chunk' && parsedContent.content) {
setTextContent(prev => prev + parsedContent.content);
if (assistantMessageIdRef.current === null) {
const tempId = message.message_id || `streaming-${Date.now()}`;
assistantMessageIdRef.current = tempId;
const initialMessage: UnifiedMessage = {
...message,
message_id: tempId,
content: JSON.stringify({ role: 'assistant', content: '' }),
metadata: JSON.stringify({ ...parsedMetadata, stream_status: 'streaming' })
};
console.log('[useAgentStream] Assistant stream started, calling onAssistantStart with ID:', tempId);
callbacks.onAssistantStart(initialMessage);
}
callbacks.onAssistantChunk({
content: parsedContent.content,
message_id: assistantMessageIdRef.current
});
} else if (parsedMetadata.stream_status === 'complete') {
setTextContent('');
console.log('[useAgentStream] Received complete assistant message ID:', message.message_id);
setToolCall(null);
if (message.message_id) callbacks.onMessage(message);
if (message.message_id) {
callbacks.onMessage(message);
} else if (assistantMessageIdRef.current) {
console.warn('[useAgentStream] Complete message missing ID, using tracked ID:', assistantMessageIdRef.current);
callbacks.onMessage({ ...message, message_id: assistantMessageIdRef.current });
} else {
console.error('[useAgentStream] Received complete assistant message without an ID and no ID was tracked.');
}
assistantMessageIdRef.current = null;
} else if (!parsedMetadata.stream_status) {
// Handle non-chunked assistant messages if needed
assistantMessageIdRef.current = null;
console.log('[useAgentStream] Received non-chunked assistant message ID:', message.message_id);
if (message.message_id) callbacks.onMessage(message);
}
break;
case 'tool':
setToolCall(null); // Clear any streaming tool call
assistantMessageIdRef.current = null;
setToolCall(null);
console.log('[useAgentStream] Received tool message ID:', message.message_id);
if (message.message_id) callbacks.onMessage(message);
break;
case 'status':
if (parsedContent.status_type === 'thread_run_end' || parsedContent.status_type === 'finish' || parsedContent.status_type === 'error') {
if(assistantMessageIdRef.current) {
console.log(`[useAgentStream] Resetting assistantMessageIdRef due to status: ${parsedContent.status_type}`);
assistantMessageIdRef.current = null;
}
}
switch (parsedContent.status_type) {
case 'tool_started':
if(assistantMessageIdRef.current) {
console.log('[useAgentStream] Resetting assistantMessageIdRef due to tool_started');
assistantMessageIdRef.current = null;
}
setToolCall({
role: 'assistant',
status_type: 'tool_started',
@ -232,30 +269,27 @@ export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string
console.log('[useAgentStream] Received thread run end status, finalizing.');
break;
case 'finish':
// Optional: Handle finish reasons like 'xml_tool_limit_reached'
console.log('[useAgentStream] Received finish status:', parsedContent.finish_reason);
// Don't finalize here, wait for thread_run_end or completion message
break;
case 'error':
console.error('[useAgentStream] Received error status message:', parsedContent.message);
setError(parsedContent.message || 'Agent run failed');
finalizeStream('error', currentRunIdRef.current);
break;
// Ignore thread_run_start, assistant_response_start etc. for now
default:
// console.debug('[useAgentStream] Received unhandled status type:', parsedContent.status_type);
break;
}
break;
case 'user':
case 'system':
// Handle other message types if necessary, e.g., if backend sends historical context
assistantMessageIdRef.current = null;
if (message.message_id) callbacks.onMessage(message);
break;
default:
assistantMessageIdRef.current = null;
console.warn('[useAgentStream] Unhandled message type:', message.type);
}
}, [threadId, setMessages, status, toolCall, callbacks, finalizeStream, updateStatus]);
}, [status, toolCall, callbacks, finalizeStream, updateStatus]);
const handleStreamError = useCallback((err: Error | string | Event) => {
if (!isMountedRef.current) return;
@ -392,11 +426,11 @@ export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string
}
// Reset state on unmount if needed, though finalizeStream should handle most cases
setStatus('idle');
setTextContent('');
setToolCall(null);
setError(null);
setAgentRunId(null);
currentRunIdRef.current = null;
assistantMessageIdRef.current = null; // Reset assistant tracking on unmount
};
}, []); // Empty dependency array for mount/unmount effect
@ -414,9 +448,9 @@ export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string
}
// Reset state before starting
setTextContent('');
setToolCall(null);
setError(null);
assistantMessageIdRef.current = null; // Reset assistant tracking
updateStatus('connecting');
setAgentRunId(runId);
currentRunIdRef.current = runId; // Set the ref immediately
@ -481,7 +515,6 @@ export function useAgentStream(callbacks: AgentStreamCallbacks, threadId: string
return {
status,
textContent,
toolCall,
error,
agentRunId,