diff --git a/api/src/routes/ws/threads_and_messages/post_thread/agent_message_transformer.rs b/api/src/routes/ws/threads_and_messages/post_thread/agent_message_transformer.rs index e7ad3c54d..bce3091f3 100644 --- a/api/src/routes/ws/threads_and_messages/post_thread/agent_message_transformer.rs +++ b/api/src/routes/ws/threads_and_messages/post_thread/agent_message_transformer.rs @@ -8,12 +8,12 @@ use uuid::Uuid; use crate::utils::clients::ai::litellm::{Message, MessageProgress, ToolCall}; +use crate::utils::tools::file_tools::create_files::CreateFilesOutput; use crate::utils::tools::file_tools::file_types::file::FileEnum; use crate::utils::tools::file_tools::modify_files::ModifyFilesParams; use crate::utils::tools::file_tools::open_files::OpenFilesOutput; use crate::utils::tools::file_tools::search_data_catalog::SearchDataCatalogOutput; use crate::utils::tools::file_tools::search_files::SearchFilesOutput; -use crate::utils::tools::file_tools::create_files::CreateFilesOutput; struct StreamingParser { buffer: String, @@ -310,12 +310,14 @@ fn transform_text_message( message_chunk: Some(content), })]) } - MessageProgress::Complete => Ok(vec![BusterThreadMessage::ChatMessage(BusterChatMessage { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), - message_type: "text".to_string(), - message: Some(content), - message_chunk: None, - })]), + MessageProgress::Complete => { + Ok(vec![BusterThreadMessage::ChatMessage(BusterChatMessage { + id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), + message_type: "text".to_string(), + message: Some(content), + message_chunk: None, + })]) + } _ => Err(anyhow::anyhow!("Unsupported message progress")), } } else { @@ -352,12 +354,12 @@ fn transform_assistant_tool_message( ) -> Result> { if let Some(tool_call) = tool_calls.first() { match tool_call.function.name.as_str() { - "search_data_catalog" => assistant_data_catalog_search(id, progress), - "stored_values_search" => assistant_stored_values_search(id, progress), - "search_files" => assistant_file_search(id, progress), + "search_data_catalog" => assistant_data_catalog_search(id, tool_calls, progress), + "stored_values_search" => assistant_stored_values_search(id, tool_calls, progress), + "search_files" => assistant_file_search(id, tool_calls, progress), "create_files" => assistant_create_file(id, tool_calls, progress), "modify_files" => assistant_modify_file(id, tool_calls, progress), - "open_files" => assistant_open_files(id, progress), + "open_files" => assistant_open_files(id, tool_calls, progress), _ => Err(anyhow::anyhow!("Unsupported tool name")), } } else { @@ -367,6 +369,7 @@ fn transform_assistant_tool_message( fn assistant_data_catalog_search( id: Option, + tool_calls: Vec, progress: Option, ) -> Result> { if let Some(progress) = progress { @@ -510,6 +513,7 @@ fn proccess_data_catalog_search_results( fn assistant_stored_values_search( id: Option, + tool_calls: Vec, progress: Option, ) -> Result> { if let Some(progress) = progress { @@ -562,6 +566,7 @@ fn tool_stored_values_search( fn assistant_file_search( id: Option, + tool_calls: Vec, progress: Option, ) -> Result> { if let Some(progress) = progress { @@ -685,6 +690,7 @@ fn process_file_search_results( fn assistant_open_files( id: Option, + tool_calls: Vec, progress: Option, ) -> Result> { if let Some(progress) = progress { @@ -862,7 +868,7 @@ fn tool_create_file( // Parse the content to get file information using CreateFilesOutput let create_files_result = serde_json::from_str::(&content)?; let mut messages = Vec::new(); - + for file in create_files_result.files { let (name, file_type, content) = match &file { FileEnum::Dashboard(dashboard) => ( @@ -876,7 +882,7 @@ fn tool_create_file( serde_json::to_string_pretty(&metric)?, ), }; - + let mut current_lines = Vec::new(); for (i, line) in content.lines().enumerate() { current_lines.push(BusterFileLine { @@ -896,11 +902,11 @@ fn tool_create_file( file: Some(current_lines), })); } - + if messages.is_empty() { return Err(anyhow::anyhow!("No valid files found in response")); } - + Ok(messages) } _ => Err(anyhow::anyhow!("Tool create file only supports complete.")), diff --git a/api/src/utils/agent/agent.rs b/api/src/utils/agent/agent.rs index 4320a598e..45491e2a2 100644 --- a/api/src/utils/agent/agent.rs +++ b/api/src/utils/agent/agent.rs @@ -253,95 +253,91 @@ impl Agent { }) .collect(); - // First, make request with tool_choice set to none - let initial_request = ChatCompletionRequest { - model: model.to_string(), - messages: thread.messages.clone(), - tools: if tools.is_empty() { - None - } else { - Some(tools.clone()) - }, - tool_choice: Some(ToolChoice::None("none".to_string())), - stream: Some(true), - ..Default::default() - }; + let mut tool_thread = thread.clone(); - // Get streaming response for initial thoughts - let mut initial_stream = llm_client.stream_chat_completion(initial_request).await?; - let mut initial_message = Message::assistant( - None, - Some(String::new()), - None, - None, - ); + // Only do initial message phase if this is the first call (recursion_depth = 0) + if recursion_depth == 0 { + // First, make request with tool_choice set to none + let initial_request = ChatCompletionRequest { + model: model.to_string(), + messages: thread.messages.clone(), + tools: if tools.is_empty() { + None + } else { + Some(tools.clone()) + }, + tool_choice: Some(ToolChoice::None("none".to_string())), + stream: Some(true), + ..Default::default() + }; - // Process initial stream chunks - while let Some(chunk_result) = initial_stream.recv().await { - match chunk_result { - Ok(chunk) => { - initial_message.set_id(chunk.id.clone()); + // Get streaming response for initial thoughts + let mut initial_stream = llm_client.stream_chat_completion(initial_request).await?; + let mut initial_message = Message::assistant( + None, + Some(String::new()), + None, + None, + ); - let delta = &chunk.choices[0].delta; + // Process initial stream chunks + while let Some(chunk_result) = initial_stream.recv().await { + match chunk_result { + Ok(chunk) => { + initial_message.set_id(chunk.id.clone()); - // Handle content updates - send delta directly - if let Some(content) = &delta.content { - // Send the delta chunk immediately with InProgress - let _ = tx - .send(Ok(Message::assistant( - Some("initial_message".to_string()), - Some(content.clone()), - None, - Some(MessageProgress::InProgress), - ))) - .await; + let delta = &chunk.choices[0].delta; - // Also accumulate for our thread history - if let Message::Assistant { - content: msg_content, - .. - } = &mut initial_message - { - if let Some(existing) = msg_content { - existing.push_str(content); + // Handle content updates - send delta directly + if let Some(content) = &delta.content { + // Send the delta chunk immediately with InProgress + let _ = tx + .send(Ok(Message::assistant( + Some("initial_message".to_string()), + Some(content.clone()), + None, + Some(MessageProgress::InProgress), + ))) + .await; + + // Also accumulate for our thread history + if let Message::Assistant { + content: msg_content, + .. + } = &mut initial_message + { + if let Some(existing) = msg_content { + existing.push_str(content); + } } } } - } - Err(e) => { - let _ = tx.send(Err(anyhow::Error::from(e))).await; - return Ok(()); + Err(e) => { + let _ = tx.send(Err(anyhow::Error::from(e))).await; + return Ok(()); + } } } + + // Ensure we have content in the initial message + let initial_content = match &initial_message { + Message::Assistant { content, .. } => content.clone().unwrap_or_default(), + _ => String::new(), + }; + + // Send the complete message with accumulated content + if !initial_content.trim().is_empty() { + let _ = tx + .send(Ok(Message::assistant( + Some("initial_message".to_string()), + Some(initial_content.clone()), + None, + Some(MessageProgress::Complete), + ))) + .await; + } } - // Ensure we have content in the initial message - let initial_content = match &initial_message { - Message::Assistant { content, .. } => content.clone().unwrap_or_default(), - _ => String::new(), - }; - - // Send the complete message with accumulated content - if !initial_content.trim().is_empty() { - let _ = tx - .send(Ok(Message::assistant( - Some("initial_message".to_string()), - Some(initial_content.clone()), - None, - Some(MessageProgress::Complete), - ))) - .await; - } - - // Create a new thread with the initial response - let mut tool_thread = thread.clone(); - tool_thread.messages.push(Message::assistant( - Some("initial_message".to_string()), - Some(initial_content.clone()), - None, - None, - )); - // Create the tool-enabled request let request = ChatCompletionRequest { model: model.to_string(),