fix(stream): mitigate Redis latency issue

This commit is contained in:
sharath 2025-05-15 23:28:28 +00:00
parent 90a284a14f
commit 35de7f1b40
No known key found for this signature in database
4 changed files with 28 additions and 10 deletions

View File

@@ -744,8 +744,8 @@ async def run_agent_background(
     # Store response in Redis list and publish notification
     response_json = json.dumps(response)
-    await redis.rpush(response_list_key, response_json)
-    await redis.publish(response_channel, "new")
+    asyncio.create_task(redis.rpush(response_list_key, response_json))
+    asyncio.create_task(redis.publish(response_channel, "new"))
     total_responses += 1

     # Check for agent-signaled completion or error

View File

@@ -147,6 +147,8 @@ class ResponseProcessor:
     if assist_start_msg_obj: yield assist_start_msg_obj
     # --- End Start Events ---

+    __sequence = 0
+
     async for chunk in llm_response:
         if hasattr(chunk, 'choices') and chunk.choices and hasattr(chunk.choices[0], 'finish_reason') and chunk.choices[0].finish_reason:
             finish_reason = chunk.choices[0].finish_reason
@@ -175,12 +177,14 @@ class ResponseProcessor:
     # Yield ONLY content chunk (don't save)
     now_chunk = datetime.now(timezone.utc).isoformat()
     yield {
+        "sequence": __sequence,
         "message_id": None, "thread_id": thread_id, "type": "assistant",
         "is_llm_message": True,
         "content": json.dumps({"role": "assistant", "content": chunk_content}),
         "metadata": json.dumps({"stream_status": "chunk", "thread_run_id": thread_run_id}),
         "created_at": now_chunk, "updated_at": now_chunk
     }
+    __sequence += 1
 else:
     logger.info("XML tool call limit reached - not yielding more content chunks")

View File

@@ -9,6 +9,7 @@ export type ThreadParams = {
 // Unified Message Interface matching the backend/database schema
 export interface UnifiedMessage {
+  sequence?: number;
   message_id: string | null; // Can be null for transient stream events (chunks, unsaved statuses)
   thread_id: string;
   type: 'user' | 'assistant' | 'tool' | 'system' | 'status' | 'browser_state'; // Add 'system' if used

View File

@@ -1,4 +1,4 @@
-import { useState, useEffect, useRef, useCallback } from 'react';
+import { useState, useEffect, useRef, useCallback, useMemo } from 'react';
 import {
   streamAgent,
   getAgentStatus,
@@ -72,7 +72,9 @@ export function useAgentStream(
 ): UseAgentStreamResult {
   const [agentRunId, setAgentRunId] = useState<string | null>(null);
   const [status, setStatus] = useState<string>('idle');
-  const [textContent, setTextContent] = useState<string>('');
+  const [textContent, setTextContent] = useState<
+    { content: string; sequence?: number }[]
+  >([]);
   const [toolCall, setToolCall] = useState<ParsedContent | null>(null);
   const [error, setError] = useState<string | null>(null);
@@ -82,6 +84,12 @@ export function useAgentStream(
   const threadIdRef = useRef(threadId); // Ref to hold the current threadId
   const setMessagesRef = useRef(setMessages); // Ref to hold the setMessages function

+  const orderedTextContent = useMemo(() => {
+    return textContent
+      .sort((a, b) => a.sequence - b.sequence)
+      .reduce((acc, curr) => acc + curr.content, '');
+  }, [textContent]);
+
   // Update refs if threadId or setMessages changes
   useEffect(() => {
     threadIdRef.current = threadId;
@@ -148,7 +156,7 @@ export function useAgentStream(
     }

     // Reset streaming-specific state
-    setTextContent('');
+    setTextContent([]);
     setToolCall(null);

     // Update status and clear run ID
@@ -292,10 +300,15 @@ export function useAgentStream(
           parsedMetadata.stream_status === 'chunk' &&
           parsedContent.content
         ) {
-          setTextContent((prev) => prev + parsedContent.content);
+          setTextContent((prev) => {
+            return prev.concat({
+              sequence: message.sequence,
+              content: parsedContent.content,
+            });
+          });
           callbacks.onAssistantChunk?.({ content: parsedContent.content });
         } else if (parsedMetadata.stream_status === 'complete') {
-          setTextContent('');
+          setTextContent([]);
           setToolCall(null);
           if (message.message_id) callbacks.onMessage(message);
         } else if (!parsedMetadata.stream_status) {
@@ -501,7 +514,7 @@ export function useAgentStream(
     }
     // Reset state on unmount if needed, though finalizeStream should handle most cases
     setStatus('idle');
-    setTextContent('');
+    setTextContent([]);
     setToolCall(null);
     setError(null);
     setAgentRunId(null);
@@ -528,7 +541,7 @@ export function useAgentStream(
     }
     // Reset state before starting
-    setTextContent('');
+    setTextContent([]);
     setToolCall(null);
     setError(null);
     updateStatus('connecting');
@@ -616,7 +629,7 @@ export function useAgentStream(
   return {
     status,
-    textContent,
+    textContent: orderedTextContent,
     toolCall,
     error,
     agentRunId,