Improve message streaming and tool call processing

- Enhance message streaming with more precise content and tool call handling
- Add logic to only send and store meaningful assistant messages and tool calls
- Prevent sending empty or redundant messages during stream processing
- Improve tool call and content update tracking in agent stream method
- Optimize message inclusion in recursive thread generation
This commit is contained in:
dal 2025-02-10 07:30:52 -07:00
parent 641527114c
commit 789b22fe1e
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
1 changed files with 48 additions and 22 deletions

View File

@ -351,6 +351,10 @@ impl Agent {
if let Some(pending) = current_pending_tool.take() {
let tool_call = pending.into_tool_call();
// Create and preserve the assistant message with the tool call
let assistant_tool_message = Message::assistant(None, Some(vec![tool_call.clone()]));
let _ = tx.send(Ok(assistant_tool_message.clone())).await;
// Execute the tool
if let Some(tool) = tools_ref.get(&tool_call.function.name) {
match tool.execute(&tool_call).await {
@ -358,12 +362,18 @@ impl Agent {
let result_str = serde_json::to_string(&result)?;
let tool_result = Message::tool(result_str, tool_call.id.clone());
let _ = tx.send(Ok(tool_result.clone())).await;
// Store both the assistant tool message and the tool result
tool_results.push(assistant_tool_message);
tool_results.push(tool_result);
}
Err(e) => {
let error_msg = format!("Tool execution failed: {:?}", e);
let tool_error = Message::tool(error_msg, tool_call.id.clone());
let _ = tx.send(Ok(tool_error.clone())).await;
// Store both the assistant tool message and the error
tool_results.push(assistant_tool_message);
tool_results.push(tool_error);
}
}
@ -373,28 +383,29 @@ impl Agent {
}
}
// Handle role changes
// Handle role changes - just update internal state, don't send message
if let Some(role) = &delta.role {
match role.as_str() {
"assistant" => {
current_message = Message::assistant(Some(String::new()), None);
let _ = tx.send(Ok(current_message.clone())).await;
}
_ => continue,
}
}
// Handle content updates
// Handle content updates - only send if we have actual content
if let Some(content) = &delta.content {
if let Message::Assistant { content: msg_content, .. } = &mut current_message {
if let Some(existing) = msg_content {
existing.push_str(content);
if !content.trim().is_empty() {
if let Message::Assistant { content: msg_content, .. } = &mut current_message {
if let Some(existing) = msg_content {
existing.push_str(content);
}
}
let _ = tx.send(Ok(Message::assistant(Some(content.clone()), None))).await;
}
let _ = tx.send(Ok(Message::assistant(Some(content.clone()), None))).await;
}
// Handle tool calls
// Handle tool calls - only send when we have meaningful tool call data
if let Some(tool_calls) = &delta.tool_calls {
has_tool_calls = true;
@ -406,20 +417,22 @@ impl Agent {
for tool_call in tool_calls {
pending.update_from_delta(tool_call);
// Send intermediate updates about the tool call
// Only send intermediate updates if we have a function name
if let Some(name) = &pending.function_name {
let temp_tool_call = ToolCall {
id: pending.id.clone().unwrap_or_default(),
function: FunctionCall {
name: name.clone(),
arguments: pending.arguments.clone(),
},
call_type: pending.call_type.clone().unwrap_or_default(),
code_interpreter: None,
retrieval: None,
};
let _ = tx.send(Ok(Message::assistant(None, Some(vec![temp_tool_call])))).await;
if !pending.arguments.trim().is_empty() {
let temp_tool_call = ToolCall {
id: pending.id.clone().unwrap_or_default(),
function: FunctionCall {
name: name.clone(),
arguments: pending.arguments.clone(),
},
call_type: pending.call_type.clone().unwrap_or_default(),
code_interpreter: None,
retrieval: None,
};
let _ = tx.send(Ok(Message::assistant(None, Some(vec![temp_tool_call])))).await;
}
}
}
}
@ -434,12 +447,25 @@ impl Agent {
// If we didn't get any tool calls in the auto response, we're done
if !has_tool_calls {
// Only include current_message in the thread if it has content
if let Message::Assistant { content: Some(content), .. } = &current_message {
if !content.trim().is_empty() {
let mut new_thread = thread.clone();
new_thread.messages.push(current_message);
return Ok(());
}
}
return Ok(());
}
// Create new thread with tool results and recurse
let mut new_thread = thread.clone();
new_thread.messages.push(current_message);
// Only include current_message if it has content
if let Message::Assistant { content: Some(content), .. } = &current_message {
if !content.trim().is_empty() {
new_thread.messages.push(current_message);
}
}
new_thread.messages.extend(tool_results);
// Recurse with new thread