diff --git a/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs b/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs index 6be6225f6..6e292b7f9 100644 --- a/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs +++ b/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs @@ -179,6 +179,7 @@ impl SearchDataCatalogTool { trace_id: session_id.to_string(), }), max_completion_tokens: Some(8092), + temperature: Some(0.0), ..Default::default() }; diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index 1c1e9fdfb..c7aba9869 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -48,6 +48,15 @@ use crate::messages::types::{ChatMessage, ChatUserMessage}; use super::types::ChatWithMessages; use tokio::sync::mpsc; +// Define the helper struct at the module level +#[derive(Debug, Clone)] +struct CompletedFileInfo { + id: String, + file_type: String, // "metric" or "dashboard" + file_name: String, + version_number: i32, +} + // Define ThreadEvent #[derive(Clone, Copy, Debug)] pub enum ThreadEvent { @@ -236,25 +245,25 @@ pub async fn post_chat_handler( .and_then(|id_str| Uuid::parse_str(id_str).ok()) .unwrap_or(asset_id_value); - // Verify the response ID matches the asset ID - if response_id == asset_id_value { - // Create association in database - now the message exists in DB - if let Err(e) = create_message_file_association( - message.id, - asset_id_value, - asset_version_number, - asset_type_value, - ) - .await { - tracing::warn!("Failed to create message file association: {}", e); + // Verify the response ID matches the asset ID + if response_id == asset_id_value { + // Create association in database - now the message exists in DB + if let Err(e) = create_message_file_association( + message.id, + asset_id_value, + asset_version_number, + asset_type_value, + ) + .await { + tracing::warn!("Failed to create message file association: {}", e); + } } + + // We only need to process one file association + break; } - - // We only need to process one file association - break; } } - } // Add to updated messages for the response updated_messages.push(message); @@ -515,11 +524,80 @@ pub async fn post_chat_handler( } }; + // --- START: New File Filtering Logic --- + + // 1. Collect all completed files from reasoning messages + let mut completed_files: Vec = Vec::new(); + for container in &all_transformed_containers { + if let BusterContainer::ReasoningMessage(reasoning_msg) = container { + if let BusterReasoningMessage::File(file_reasoning) = &reasoning_msg.reasoning { + // Consider files from both create and modify operations ("files" type) + // Check if the overall reasoning status is completed + if file_reasoning.message_type == "files" && file_reasoning.status == "completed" { + for (_file_id_key, file_detail) in &file_reasoning.files { + // Also ensure the individual file status is completed + if file_detail.status == "completed" { + completed_files.push(CompletedFileInfo { + id: file_detail.id.clone(), + file_type: file_detail.file_type.clone(), + file_name: file_detail.file_name.clone(), + version_number: file_detail.version_number, + }); + } + } + } + } + } + } + + // 2. Apply filtering logic + let contains_metrics = completed_files.iter().any(|f| f.file_type == "metric"); + let contains_dashboards = completed_files.iter().any(|f| f.file_type == "dashboard"); + + let filtered_files_for_response: Vec = if contains_dashboards { + // If any dashboards exist (created or modified), only show dashboards + completed_files.into_iter().filter(|f| f.file_type == "dashboard").collect() + } else if contains_metrics { + // If only metrics exist, show all metrics + completed_files.into_iter().filter(|f| f.file_type == "metric").collect() + } else { + // If neither (or empty), show nothing + vec![] + }; + + // 3. Generate BusterChatMessage::File for the filtered files + // Populate the vector declared outside the scope + let mut filtered_file_response_messages: Vec = Vec::new(); + for file_info in filtered_files_for_response { + let response_message = BusterChatMessage::File { + id: file_info.id.clone(), + file_type: file_info.file_type.clone(), + file_name: file_info.file_name.clone(), + version_number: file_info.version_number, + filter_version_id: None, // Assuming no filter ID needed here + metadata: Some(vec![BusterChatResponseFileMetadata { + status: "completed".to_string(), + message: "Generated by Buster".to_string(), // Or indicate modification if possible? Needs more state tracking. + timestamp: Some(Utc::now().timestamp()), + }]), + }; + if let Ok(value) = serde_json::to_value(&response_message) { + filtered_file_response_messages.push(value); + } + } + + // --- END: New File Filtering Logic --- + // Transform all messages for final storage - let (response_messages, reasoning_messages) = + // Get initial response messages (text, etc.) and reasoning messages separately + let (mut text_and_other_response_messages, reasoning_messages) = prepare_final_message_state(&all_transformed_containers)?; - // Update chat_with_messages with final state + // Create the final response message list: Start with filtered files, then add text/other messages + let mut final_response_messages = filtered_file_response_messages; + final_response_messages.append(&mut text_and_other_response_messages); + + // Update chat_with_messages with final state (now including filtered files first) let message = ChatMessage::new_with_messages( message_id, Some(ChatUserMessage { @@ -528,7 +606,7 @@ pub async fn post_chat_handler( sender_name: user.name.clone().unwrap_or_default(), sender_avatar: None, }), - response_messages.clone(), + final_response_messages.clone(), // Use the reordered list reasoning_messages.clone(), Some(formatted_reasoning_duration.clone()), // Use formatted reasoning duration for regular messages Utc::now(), @@ -545,7 +623,7 @@ pub async fn post_chat_handler( created_at: Utc::now(), updated_at: Utc::now(), deleted_at: None, - response_messages: serde_json::to_value(&response_messages)?, + response_messages: serde_json::to_value(&final_response_messages)?, // Use the reordered list reasoning: serde_json::to_value(&reasoning_messages)?, final_reasoning_message: Some(formatted_reasoning_duration), // Use formatted reasoning duration for regular messages title: title.title.clone().unwrap_or_default(), @@ -1036,42 +1114,7 @@ pub async fn transform_message( ) { Ok(messages) => { for reasoning_container in messages { - // Only process file response messages when they're completed - match &reasoning_container { - BusterReasoningMessage::File(file) - if matches!(progress, MessageProgress::Complete) - && file.status == "completed" - && file.message_type == "files" => - { - // For each completed file, create and send a file response message - for (_file_id, file_content) in &file.files { - let response_message = BusterChatMessage::File { - id: file_content.id.clone(), - file_type: file_content.file_type.clone(), - file_name: file_content.file_name.clone(), - version_number: file_content.version_number, - filter_version_id: None, - metadata: Some(vec![BusterChatResponseFileMetadata { - status: "completed".to_string(), - message: "Created by Buster".to_string(), - timestamp: Some(Utc::now().timestamp()), - }]), - }; - - containers.push(( - BusterContainer::ChatMessage( - BusterChatMessageContainer { - response_message, - chat_id: *chat_id, - message_id: *message_id, - }, - ), - ThreadEvent::GeneratingResponseMessage, - )); - } - } - _ => {} - } + // No longer generate response messages here, only reasoning containers.push(( BusterContainer::ReasoningMessage( @@ -1110,7 +1153,7 @@ pub async fn transform_message( let name_str = name.clone(); let mut containers = Vec::new(); - let messages = match transform_tool_message( + match transform_tool_message( tool_call_id, name, content.clone(), @@ -1119,42 +1162,7 @@ pub async fn transform_message( ) { Ok(messages) => { for reasoning_container in messages { - // Only process file response messages when they're completed - match &reasoning_container { - BusterReasoningMessage::File(file) - if matches!(progress, MessageProgress::Complete) - && file.status == "completed" - && file.message_type == "files" => - { - // For each completed file, create and send a file response message - for (_file_id, file_content) in &file.files { - let response_message = BusterChatMessage::File { - id: file_content.id.clone(), - file_type: file_content.file_type.clone(), - file_name: file_content.file_name.clone(), - version_number: file_content.version_number, - filter_version_id: None, - metadata: Some(vec![BusterChatResponseFileMetadata { - status: "completed".to_string(), - message: "Created by Buster".to_string(), - timestamp: Some(Utc::now().timestamp()), - }]), - }; - - containers.push(( - BusterContainer::ChatMessage( - BusterChatMessageContainer { - response_message, - chat_id: *chat_id, - message_id: *message_id, - }, - ), - ThreadEvent::GeneratingResponseMessage, - )); - } - } - _ => {} - } + // No longer generate response messages here, only reasoning containers.push(( BusterContainer::ReasoningMessage( @@ -1167,16 +1175,14 @@ pub async fn transform_message( ThreadEvent::GeneratingReasoningMessage, )); } - containers } Err(e) => { tracing::warn!("Error transforming tool message '{}': {:?}", name_str, e); println!("MESSAGE_STREAM: Error transforming tool message: {:?}", e); - vec![] } }; - Ok(messages) + Ok(containers) } else { Ok(vec![]) }