From 35de7f1b40397497ddea86f962551aeafb804733 Mon Sep 17 00:00:00 2001 From: sharath <29162020+tnfssc@users.noreply.github.com> Date: Thu, 15 May 2025 23:28:28 +0000 Subject: [PATCH] fix(stream): redis latency issue mitigated --- backend/agent/api.py | 4 ++-- backend/agentpress/response_processor.py | 4 ++++ frontend/src/components/thread/types.ts | 1 + frontend/src/hooks/useAgentStream.ts | 29 +++++++++++++++++------- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/backend/agent/api.py b/backend/agent/api.py index 618b2b0d..525c394d 100644 --- a/backend/agent/api.py +++ b/backend/agent/api.py @@ -744,8 +744,8 @@ async def run_agent_background( # Store response in Redis list and publish notification response_json = json.dumps(response) - await redis.rpush(response_list_key, response_json) - await redis.publish(response_channel, "new") + asyncio.create_task(redis.rpush(response_list_key, response_json)) + asyncio.create_task(redis.publish(response_channel, "new")) total_responses += 1 # Check for agent-signaled completion or error diff --git a/backend/agentpress/response_processor.py b/backend/agentpress/response_processor.py index 78c95af5..ea6e028a 100644 --- a/backend/agentpress/response_processor.py +++ b/backend/agentpress/response_processor.py @@ -147,6 +147,8 @@ class ResponseProcessor: if assist_start_msg_obj: yield assist_start_msg_obj # --- End Start Events --- + __sequence = 0 + async for chunk in llm_response: if hasattr(chunk, 'choices') and chunk.choices and hasattr(chunk.choices[0], 'finish_reason') and chunk.choices[0].finish_reason: finish_reason = chunk.choices[0].finish_reason @@ -175,12 +177,14 @@ class ResponseProcessor: # Yield ONLY content chunk (don't save) now_chunk = datetime.now(timezone.utc).isoformat() yield { + "sequence": __sequence, "message_id": None, "thread_id": thread_id, "type": "assistant", "is_llm_message": True, "content": json.dumps({"role": "assistant", "content": chunk_content}), "metadata": json.dumps({"stream_status": "chunk", "thread_run_id": thread_run_id}), "created_at": now_chunk, "updated_at": now_chunk } + __sequence += 1 else: logger.info("XML tool call limit reached - not yielding more content chunks") diff --git a/frontend/src/components/thread/types.ts b/frontend/src/components/thread/types.ts index 8aee81f9..eeb3053a 100644 --- a/frontend/src/components/thread/types.ts +++ b/frontend/src/components/thread/types.ts @@ -9,6 +9,7 @@ export type ThreadParams = { // Unified Message Interface matching the backend/database schema export interface UnifiedMessage { + sequence?: number; message_id: string | null; // Can be null for transient stream events (chunks, unsaved statuses) thread_id: string; type: 'user' | 'assistant' | 'tool' | 'system' | 'status' | 'browser_state'; // Add 'system' if used diff --git a/frontend/src/hooks/useAgentStream.ts b/frontend/src/hooks/useAgentStream.ts index 74406cb0..19c94645 100644 --- a/frontend/src/hooks/useAgentStream.ts +++ b/frontend/src/hooks/useAgentStream.ts @@ -1,4 +1,4 @@ -import { useState, useEffect, useRef, useCallback } from 'react'; +import { useState, useEffect, useRef, useCallback, useMemo } from 'react'; import { streamAgent, getAgentStatus, @@ -72,7 +72,9 @@ export function useAgentStream( ): UseAgentStreamResult { const [agentRunId, setAgentRunId] = useState(null); const [status, setStatus] = useState('idle'); - const [textContent, setTextContent] = useState(''); + const [textContent, setTextContent] = useState< + { content: string; sequence?: number }[] + >([]); const [toolCall, setToolCall] = useState(null); const [error, setError] = useState(null); @@ -82,6 +84,12 @@ export function useAgentStream( const threadIdRef = useRef(threadId); // Ref to hold the current threadId const setMessagesRef = useRef(setMessages); // Ref to hold the setMessages function + const orderedTextContent = useMemo(() => { + return textContent + .sort((a, b) => a.sequence - b.sequence) + .reduce((acc, curr) => acc + curr.content, ''); + }, [textContent]); + // Update refs if threadId or setMessages changes useEffect(() => { threadIdRef.current = threadId; @@ -148,7 +156,7 @@ export function useAgentStream( } // Reset streaming-specific state - setTextContent(''); + setTextContent([]); setToolCall(null); // Update status and clear run ID @@ -292,10 +300,15 @@ export function useAgentStream( parsedMetadata.stream_status === 'chunk' && parsedContent.content ) { - setTextContent((prev) => prev + parsedContent.content); + setTextContent((prev) => { + return prev.concat({ + sequence: message.sequence, + content: parsedContent.content, + }); + }); callbacks.onAssistantChunk?.({ content: parsedContent.content }); } else if (parsedMetadata.stream_status === 'complete') { - setTextContent(''); + setTextContent([]); setToolCall(null); if (message.message_id) callbacks.onMessage(message); } else if (!parsedMetadata.stream_status) { @@ -501,7 +514,7 @@ export function useAgentStream( } // Reset state on unmount if needed, though finalizeStream should handle most cases setStatus('idle'); - setTextContent(''); + setTextContent([]); setToolCall(null); setError(null); setAgentRunId(null); @@ -528,7 +541,7 @@ export function useAgentStream( } // Reset state before starting - setTextContent(''); + setTextContent([]); setToolCall(null); setError(null); updateStatus('connecting'); @@ -616,7 +629,7 @@ export function useAgentStream( return { status, - textContent, + textContent: orderedTextContent, toolCall, error, agentRunId,