diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index e89fb73e7..cd4ff69d2 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -2281,29 +2281,116 @@ fn transform_assistant_tool_message( // --- Handle Search Tool (Simple Text) --- "search_data_catalog" => { - // This tool doesn't stream complex arguments, just signals searching. - // Only send the message once when arguments start appearing. - if let Some(_args) = parser.process_search_data_catalog_chunk(tool_id.clone(), &tool_call.function.arguments) { - if tracker.get_complete_text(tool_id.clone()).is_none() { // Send only once - let search_msg = BusterReasoningMessage::Text(BusterReasoningText { - id: tool_id.clone(), - reasoning_type: "text".to_string(), - title: "Searching your data catalog...".to_string(), - secondary_title: "".to_string(), - message: None, - message_chunk: None, - status: Some("loading".to_string()), - finished_reasoning: false, - }); - all_results.push(ToolTransformResult::Reasoning(search_msg)); - // Use tracker to mark that we've sent the initial message for this ID - tracker.add_chunk(tool_id.clone(), "searching_sent".to_string()); - } + // --- Handle InProgress: Send initial message once --- + if progress == MessageProgress::InProgress { + // Use tracker to ensure this initial message is sent only once per tool call ID + let tracker_key = format!("{}_initial_search", tool_id); + if tracker.get_complete_text(tracker_key.clone()).is_none() { + let initial_search_msg = BusterReasoningMessage::Text(BusterReasoningText { + id: tool_id.clone(), // Use the tool call ID for the message + reasoning_type: "text".to_string(), + title: "Searching data catalog".to_string(), + secondary_title: "".to_string(), + message: None, + message_chunk: None, + status: Some("loading".to_string()), + finished_reasoning: false, + }); + all_results.push(ToolTransformResult::Reasoning(initial_search_msg)); + // Mark that we've sent the initial message for this ID + tracker.add_chunk(tracker_key, "initial_sent".to_string()); + } } - if progress == MessageProgress::Complete { - // The result from transform_tool_message will show final status and duration. - // Clear the tracker entry used for the initial "searching..." message. - tracker.clear_chunk(tool_id.clone()); + // --- Handle Complete: Update message with queries --- + else if progress == MessageProgress::Complete { + // Define struct to parse final arguments + #[derive(Deserialize)] + struct SearchArgs { + specific_queries: Option>, + exploratory_topics: Option>, + } + + // Parse the complete arguments string + match serde_json::from_str::(&tool_call.function.arguments) { + Ok(args) => { + let queries = args.specific_queries.unwrap_or_default(); + let topics = args.exploratory_topics.unwrap_or_default(); + + println!("queries: {:?}", queries); + println!("topics: {:?}", topics); + + if !queries.is_empty() || !topics.is_empty() { + let mut message_parts = Vec::new(); + if !queries.is_empty() { + message_parts.push("Specific Queries:".to_string()); + message_parts.extend(queries.iter().map(|q| format!("- {}", q))); + } + if !topics.is_empty() { + if !message_parts.is_empty() { message_parts.push("".to_string()); } // Add separator + message_parts.push("Exploratory Topics:".to_string()); + message_parts.extend(topics.iter().map(|t| format!("- {}", t))); + } + + // Create the UPDATED reasoning message with the full query list + // Use the SAME tool_id as the initial message + let mut pill_containers = Vec::new(); + + if !queries.is_empty() { + pill_containers.push(BusterThoughtPillContainer { + title: "Specific Queries".to_string(), + pills: queries.iter().enumerate().map(|(i, q)| BusterThoughtPill { + // Use a combination of tool_id and index for a unique pill id + id: format!("{}_query_{}", tool_id, i), + text: q.clone(), + thought_file_type: "query".to_string(), + }).collect(), + }); + } + + if !topics.is_empty() { + pill_containers.push(BusterThoughtPillContainer { + title: "Exploratory Topics".to_string(), + pills: topics.iter().enumerate().map(|(i, t)| BusterThoughtPill { + // Use a combination of tool_id and index for a unique pill id + id: format!("{}_topic_{}", tool_id, i), + text: t.clone(), + thought_file_type: "topic".to_string(), + }).collect(), + }); + } + + // Create the Pill reasoning message + let updated_search_msg = BusterReasoningMessage::Pill(BusterReasoningPill { + id: tool_id.clone(), + thought_type: "pills".to_string(), // Correct type + title: "Searching data catalog".to_string(), // Keep title simple + secondary_title: "".to_string(), // Duration added by tool result later + pill_containers: Some(pill_containers), // Use the generated containers + status: "loading".to_string(), // Still loading until tool result comes back + }); + all_results.push(ToolTransformResult::Reasoning(updated_search_msg)); + } else { + // If parsing succeeds but queries/topics are empty, + // we might want to keep the initial simple "Searching..." message. + // Currently, this branch does nothing, leaving the initial message as is. + tracing::debug!("Search arguments parsed but no queries or topics found for tool_id: {}", tool_id); + } + } + Err(e) => { + tracing::warn!( + "Failed to parse final search_data_catalog arguments for {}: {}. Args: {}", + tool_id, + e, + tool_call.function.arguments + ); + // If parsing fails, the initial "Searching..." message remains. + // Optionally create an error reasoning message here. + } + } + + // Clear the tracker flag used for the initial message + let tracker_key = format!("{}_initial_search", tool_id); + tracker.clear_chunk(tracker_key); } } @@ -3085,7 +3172,7 @@ async fn apply_file_filtering_rules( // return the context dashboard (unmodified info) followed by the other processed assets. tracing::debug!("Context dashboard {} was NOT modified this turn (only metrics). Prepending context info.", ctx_id); Ok(vec![context_dashboard_info] // Use the fetched (unmodified) context info - .into_iter() + .into_iter() .chain(new_filtered_assets.into_iter()) .collect()) } @@ -3166,7 +3253,7 @@ fn process_current_turn_files( .map_or(false, |uuid| unreferenced_metric_uuids.contains(&uuid)) }) .cloned() - .collect(); + .collect(); // Return unreferenced metrics first, then dashboards let mut combined = unreferenced_metrics; diff --git a/api/libs/handlers/src/chats/streaming_parser.rs b/api/libs/handlers/src/chats/streaming_parser.rs index 7966638bf..94afcad4f 100644 --- a/api/libs/handlers/src/chats/streaming_parser.rs +++ b/api/libs/handlers/src/chats/streaming_parser.rs @@ -1,6 +1,7 @@ use agents::tools::categories::file_tools::common::generate_deterministic_uuid; use anyhow::Result; use serde_json::Value; +use serde::Deserialize; use super::post_chat_handler::{ BusterFile, BusterFileContent, BusterReasoningFile, BusterReasoningMessage, BusterReasoningText, @@ -62,22 +63,12 @@ impl StreamingParser { pub fn process_search_data_catalog_chunk( &mut self, _id: String, - chunk: &str, - ) -> Option { - self.clear_buffer(); - self.buffer.push_str(chunk); - - let processed_json = self.complete_json_structure(self.buffer.clone()); - - // Try to parse arguments, return Some(Value) if successful and looks like search args - if let Ok(value) = serde_json::from_str::(&processed_json) { - if value.get("queries").is_some() { - return Some(value); - } - } - - // If the start of queries is not detected, return None - None + _chunk: &str, + ) -> Result> { + // We no longer process streaming search args here. + // Logic moved to post_chat_handler on message completion. + self.clear_buffer(); // Clear buffer in case it was used + Ok(None) // Always return None during streaming for this tool now } // Process chunks meant for metric files