improve streaming

This commit is contained in:
marko-kraemer 2025-08-27 15:48:44 -07:00
parent 5c8a82ffc4
commit 6dbc63baee
3 changed files with 92 additions and 26 deletions

View File

@ -0,0 +1,22 @@
import React from 'react';
import { ComposioUrlDetector } from './composio-url-detector';
interface StreamingTextProps {
content: string;
className?: string;
}
export const StreamingText: React.FC<StreamingTextProps> = ({
content,
className = "text-sm prose prose-sm dark:prose-invert chat-markdown max-w-none [&>:first-child]:mt-0 prose-headings:mt-3 break-words overflow-wrap-anywhere"
}) => {
if (!content) {
return null;
}
return (
<div className="prose prose-sm dark:prose-invert chat-markdown max-w-none [&>:first-child]:mt-0 prose-headings:mt-3 break-words overflow-hidden">
<ComposioUrlDetector content={content} className={className} />
</div>
);
};

View File

@ -17,6 +17,7 @@ import { AgentAvatar, AgentName } from './agent-avatar';
import { parseXmlToolCalls, isNewXmlFormat } from '@/components/thread/tool-views/xml-parser';
import { ShowToolStream } from './ShowToolStream';
import { ComposioUrlDetector } from './composio-url-detector';
import { StreamingText } from './StreamingText';
import { HIDE_STREAMING_XML_TAGS } from '@/components/thread/utils';
@ -910,12 +911,10 @@ export const ThreadContent: React.FC<ThreadContentProps> = ({
return (
<>
{textBeforeTag && (
<ComposioUrlDetector content={textBeforeTag} className="text-sm prose prose-sm dark:prose-invert chat-markdown max-w-none [&>:first-child]:mt-0 prose-headings:mt-3 break-words overflow-wrap-anywhere" />
)}
{showCursor && (
<span className="inline-block h-4 w-0.5 bg-primary ml-0.5 -mb-1 animate-pulse" />
)}
<StreamingText
content={textBeforeTag}
className="text-sm prose prose-sm dark:prose-invert chat-markdown max-w-none [&>:first-child]:mt-0 prose-headings:mt-3 break-words overflow-wrap-anywhere"
/>
{detectedTag && (
<ShowToolStream
@ -926,8 +925,6 @@ export const ThreadContent: React.FC<ThreadContentProps> = ({
startTime={Date.now()}
/>
)}
</>
);
})()}
@ -970,15 +967,16 @@ export const ThreadContent: React.FC<ThreadContentProps> = ({
{debugMode && streamingText ? (
<pre className="text-xs font-mono whitespace-pre-wrap overflow-x-auto p-2 border border-border rounded-md bg-muted/30">
{streamingText}
{showCursor && (
<span className="inline-block h-4 w-0.5 bg-primary ml-0.5 -mb-1 animate-pulse duration-1000" />
)}
</pre>
) : (
<>
{textBeforeTag && (
<ComposioUrlDetector content={textBeforeTag} className="text-sm prose prose-sm dark:prose-invert chat-markdown max-w-none [&>:first-child]:mt-0 prose-headings:mt-3 break-words overflow-wrap-anywhere" />
)}
{showCursor && (
<span className="inline-block h-4 w-0.5 bg-primary ml-0.5 -mb-1 animate-pulse" />
)}
<StreamingText
content={textBeforeTag}
className="text-sm prose prose-sm dark:prose-invert chat-markdown max-w-none [&>:first-child]:mt-0 prose-headings:mt-3 break-words overflow-wrap-anywhere"
/>
{detectedTag && (
<ShowToolStream
@ -1070,9 +1068,9 @@ export const ThreadContent: React.FC<ThreadContentProps> = ({
{/* Streaming indicator content */}
<div className="max-w-[90%] px-4 py-3 text-sm">
<div className="flex items-center gap-1.5 py-1">
<div className="h-1.5 w-1.5 rounded-full bg-primary/50 animate-pulse" />
<div className="h-1.5 w-1.5 rounded-full bg-primary/50 animate-pulse delay-150" />
<div className="h-1.5 w-1.5 rounded-full bg-primary/50 animate-pulse delay-300" />
<div className="h-1.5 w-1.5 rounded-full bg-primary/50 animate-pulse duration-1000" />
<div className="h-1.5 w-1.5 rounded-full bg-primary/50 animate-pulse duration-1000 delay-150" />
<div className="h-1.5 w-1.5 rounded-full bg-primary/50 animate-pulse duration-1000 delay-300" />
</div>
</div>
</div>

View File

@ -1,4 +1,4 @@
import { useState, useEffect, useRef, useCallback, useMemo } from 'react';
import React, { useState, useEffect, useRef, useCallback, useMemo } from 'react';
import { useQueryClient } from '@tanstack/react-query';
import {
streamAgent,
@ -90,6 +90,34 @@ export function useAgentStream(
const [textContent, setTextContent] = useState<
{ content: string; sequence?: number }[]
>([]);
// Add throttled state updates for smoother streaming
const throttleRef = useRef<NodeJS.Timeout | null>(null);
const pendingContentRef = useRef<{ content: string; sequence?: number }[]>([]);
// Throttled content update function for smoother streaming
const flushPendingContent = useCallback(() => {
if (pendingContentRef.current.length > 0) {
const newContent = [...pendingContentRef.current];
pendingContentRef.current = [];
React.startTransition(() => {
setTextContent((prev) => [...prev, ...newContent]);
});
}
}, []);
const addContentThrottled = useCallback((content: { content: string; sequence?: number }) => {
pendingContentRef.current.push(content);
// Clear existing throttle
if (throttleRef.current) {
clearTimeout(throttleRef.current);
}
// Set new throttle for smooth updates (16ms ≈ 60fps)
throttleRef.current = setTimeout(flushPendingContent, 16);
}, [flushPendingContent]);
const [toolCall, setToolCall] = useState<ParsedContent | null>(null);
const [error, setError] = useState<string | null>(null);
const [agentRunId, setAgentRunId] = useState<string | null>(null);
@ -101,9 +129,16 @@ export function useAgentStream(
const setMessagesRef = useRef(setMessages); // Ref to hold the setMessages function
const orderedTextContent = useMemo(() => {
return textContent
.sort((a, b) => (a.sequence ?? 0) - (b.sequence ?? 0))
.reduce((acc, curr) => acc + curr.content, '');
// Use a more efficient approach for streaming performance
if (textContent.length === 0) return '';
// Sort once and concatenate efficiently
const sorted = textContent.slice().sort((a, b) => (a.sequence ?? 0) - (b.sequence ?? 0));
let result = '';
for (let i = 0; i < sorted.length; i++) {
result += sorted[i].content;
}
return result;
}, [textContent]);
// Refs to capture current state for persistence
@ -341,14 +376,16 @@ export function useAgentStream(
parsedMetadata.stream_status === 'chunk' &&
parsedContent.content
) {
setTextContent((prev) => {
return prev.concat({
sequence: message.sequence,
content: parsedContent.content,
});
// Use throttled approach for smoother streaming
addContentThrottled({
sequence: message.sequence,
content: parsedContent.content,
});
callbacks.onAssistantChunk?.({ content: parsedContent.content });
} else if (parsedMetadata.stream_status === 'complete') {
// Flush any pending content before completing
flushPendingContent();
setTextContent([]);
setToolCall(null);
if (message.message_id) callbacks.onMessage(message);
@ -554,6 +591,15 @@ export function useAgentStream(
return () => {
isMountedRef.current = false;
// Clean up throttle timeout
if (throttleRef.current) {
clearTimeout(throttleRef.current);
throttleRef.current = null;
}
// Flush any remaining pending content
flushPendingContent();
// Don't automatically cleanup streams on navigation
// Only set mounted flag to false to prevent new operations
// Streams will be cleaned up when they naturally complete or on explicit stop