From 789b22fe1ef770a303ff7be164fdfc00d03f72fd Mon Sep 17 00:00:00 2001 From: dal Date: Mon, 10 Feb 2025 07:30:52 -0700 Subject: [PATCH] Improve message streaming and tool call processing - Enhance message streaming with more precise content and tool call handling - Add logic to only send and store meaningful assistant messages and tool calls - Prevent sending empty or redundant messages during stream processing - Improve tool call and content update tracking in agent stream method - Optimize message inclusion in recursive thread generation --- api/src/utils/agent/agent.rs | 70 ++++++++++++++++++++++++------------ 1 file changed, 48 insertions(+), 22 deletions(-) diff --git a/api/src/utils/agent/agent.rs b/api/src/utils/agent/agent.rs index 243b3bbbf..d81e946e7 100644 --- a/api/src/utils/agent/agent.rs +++ b/api/src/utils/agent/agent.rs @@ -351,6 +351,10 @@ impl Agent { if let Some(pending) = current_pending_tool.take() { let tool_call = pending.into_tool_call(); + // Create and preserve the assistant message with the tool call + let assistant_tool_message = Message::assistant(None, Some(vec![tool_call.clone()])); + let _ = tx.send(Ok(assistant_tool_message.clone())).await; + // Execute the tool if let Some(tool) = tools_ref.get(&tool_call.function.name) { match tool.execute(&tool_call).await { @@ -358,12 +362,18 @@ impl Agent { let result_str = serde_json::to_string(&result)?; let tool_result = Message::tool(result_str, tool_call.id.clone()); let _ = tx.send(Ok(tool_result.clone())).await; + + // Store both the assistant tool message and the tool result + tool_results.push(assistant_tool_message); tool_results.push(tool_result); } Err(e) => { let error_msg = format!("Tool execution failed: {:?}", e); let tool_error = Message::tool(error_msg, tool_call.id.clone()); let _ = tx.send(Ok(tool_error.clone())).await; + + // Store both the assistant tool message and the error + tool_results.push(assistant_tool_message); tool_results.push(tool_error); } } @@ -373,28 +383,29 
@@ impl Agent { } } - // Handle role changes + // Handle role changes - just update internal state, don't send message if let Some(role) = &delta.role { match role.as_str() { "assistant" => { current_message = Message::assistant(Some(String::new()), None); - let _ = tx.send(Ok(current_message.clone())).await; } _ => continue, } } - // Handle content updates + // Handle content updates - only send if we have actual content if let Some(content) = &delta.content { - if let Message::Assistant { content: msg_content, .. } = &mut current_message { - if let Some(existing) = msg_content { - existing.push_str(content); + if !content.trim().is_empty() { + if let Message::Assistant { content: msg_content, .. } = &mut current_message { + if let Some(existing) = msg_content { + existing.push_str(content); + } } + let _ = tx.send(Ok(Message::assistant(Some(content.clone()), None))).await; } - let _ = tx.send(Ok(Message::assistant(Some(content.clone()), None))).await; } - // Handle tool calls + // Handle tool calls - only send when we have meaningful tool call data if let Some(tool_calls) = &delta.tool_calls { has_tool_calls = true; @@ -406,20 +417,22 @@ impl Agent { for tool_call in tool_calls { pending.update_from_delta(tool_call); - // Send intermediate updates about the tool call + // Only send intermediate updates if we have a function name if let Some(name) = &pending.function_name { - let temp_tool_call = ToolCall { - id: pending.id.clone().unwrap_or_default(), - function: FunctionCall { - name: name.clone(), - arguments: pending.arguments.clone(), - }, - call_type: pending.call_type.clone().unwrap_or_default(), - code_interpreter: None, - retrieval: None, - }; - - let _ = tx.send(Ok(Message::assistant(None, Some(vec![temp_tool_call])))).await; + if !pending.arguments.trim().is_empty() { + let temp_tool_call = ToolCall { + id: pending.id.clone().unwrap_or_default(), + function: FunctionCall { + name: name.clone(), + arguments: pending.arguments.clone(), + }, + call_type: 
pending.call_type.clone().unwrap_or_default(), + code_interpreter: None, + retrieval: None, + }; + + let _ = tx.send(Ok(Message::assistant(None, Some(vec![temp_tool_call])))).await; + } } } } @@ -434,12 +447,25 @@ impl Agent { // If we didn't get any tool calls in the auto response, we're done if !has_tool_calls { + // Only include current_message in the thread if it has content + if let Message::Assistant { content: Some(content), .. } = &current_message { + if !content.trim().is_empty() { + let mut new_thread = thread.clone(); + new_thread.messages.push(current_message); + return Ok(()); + } + } return Ok(()); } // Create new thread with tool results and recurse let mut new_thread = thread.clone(); - new_thread.messages.push(current_message); + // Only include current_message if it has content + if let Message::Assistant { content: Some(content), .. } = &current_message { + if !content.trim().is_empty() { + new_thread.messages.push(current_message); + } + } new_thread.messages.extend(tool_results); // Recurse with new thread