diff --git a/api/libs/agents/src/agent.rs b/api/libs/agents/src/agent.rs index 609ffcfa4..6fa7af58b 100644 --- a/api/libs/agents/src/agent.rs +++ b/api/libs/agents/src/agent.rs @@ -286,7 +286,7 @@ impl Agent { Some("shutdown_message".to_string()), Some("Processing interrupted due to shutdown signal".to_string()), None, - None, + MessageProgress::Complete, None, Some(agent_clone.name.clone()), )) @@ -314,7 +314,7 @@ impl Agent { Some("max_recursion_depth_message".to_string()), Some("I apologize, but I've reached the maximum number of actions (30). Please try breaking your request into smaller parts.".to_string()), None, - None, + MessageProgress::Complete, None, Some(self.name.clone()), ); @@ -374,7 +374,7 @@ impl Agent { message_id.clone(), Some(content_buffer.clone()), None, - Some(MessageProgress::InProgress), + MessageProgress::InProgress, Some(!first_message_sent), // Set initial=true only for the first message Some(self.name.clone()), ); @@ -426,7 +426,7 @@ impl Agent { Some(content_buffer.clone()) }, Some(tool_calls_vec), - Some(MessageProgress::InProgress), + MessageProgress::InProgress, Some(!first_message_sent), // Set initial=true only for the first message Some(self.name.clone()), ); @@ -466,7 +466,7 @@ impl Agent { Some(content_buffer) }, final_tool_calls.clone(), - Some(MessageProgress::Complete), + MessageProgress::Complete, Some(false), Some(self.name.clone()), ); @@ -499,7 +499,7 @@ impl Agent { result_str, tool_call.id.clone(), Some(tool_call.function.name.clone()), - None, + MessageProgress::Complete, ); // Broadcast the tool message as soon as we receive it @@ -656,7 +656,7 @@ mod tests { content, tool_id, Some(self.get_name()), - Some(progress), + progress, ); self.agent.get_stream_sender().await.send(Ok(message))?; Ok(()) diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index b96b9ff3c..71b042d22 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -212,9 +212,9 @@ pub async fn post_chat_handler( } BusterContainer::ReasoningMessage(reasoning) => { let reasoning_value = match &reasoning.reasoning { - ReasoningMessage::Pill(thought) => serde_json::to_value(thought).ok(), - ReasoningMessage::File(file) => serde_json::to_value(file).ok(), - ReasoningMessage::Text(text) => serde_json::to_value(text).ok(), + BusterReasoningMessage::Pill(thought) => serde_json::to_value(thought).ok(), + BusterReasoningMessage::File(file) => serde_json::to_value(file).ok(), + BusterReasoningMessage::Text(text) => serde_json::to_value(text).ok(), }; if let Some(value) = reasoning_value { @@ -271,7 +271,7 @@ async fn store_final_message_state( for container in transformed_messages { match container { BusterContainer::ReasoningMessage(msg) => match &msg.reasoning { - ReasoningMessage::File(file) if file.file_type == "metric" => { + BusterReasoningMessage::File(file) if file.file_type == "metric" => { if let Some(file_content) = &file.file { let metric_file = MetricFile { id: Uuid::new_v4(), @@ -312,7 +312,7 @@ async fn store_final_message_state( .await?; } } - ReasoningMessage::File(file) if file.file_type == "dashboard" => { + BusterReasoningMessage::File(file) if file.file_type == "dashboard" => { if let Some(file_content) = &file.file { let dashboard_file = DashboardFile { id: Uuid::new_v4(), @@ -376,7 +376,7 @@ pub struct BusterChatMessageContainer { #[derive(Debug, Serialize, Clone)] #[serde(untagged)] -pub enum ReasoningMessage { +pub enum BusterReasoningMessage { Pill(BusterReasoningPill), File(BusterReasoningFile), Text(BusterReasoningText), @@ -384,7 +384,7 @@ pub enum ReasoningMessage { #[derive(Debug, Serialize, Clone)] pub struct BusterReasoningMessageContainer { - pub reasoning: ReasoningMessage, + pub reasoning: BusterReasoningMessage, pub chat_id: Uuid, pub message_id: Uuid, } @@ -422,7 +422,6 @@ pub struct BusterReasoningText { pub status: Option, } - #[derive(Debug, Serialize, Clone)] pub struct BusterThoughtPillContainer { pub title: String, @@ -563,7 +562,7 @@ pub fn transform_message( } => { if let Some(name) = name { let name_str = name.clone(); // Clone here to use in println later - + // Use tool_call_id directly as it's already a String let messages = match transform_tool_message( tool_call_id, @@ -595,7 +594,7 @@ pub fn transform_message( fn transform_text_message( id: Option, content: String, - progress: Option, + progress: MessageProgress, chat_id: Uuid, message_id: Uuid, ) -> Result> { @@ -604,63 +603,43 @@ fn transform_text_message( progress ); - if let Some(progress) = progress { - match progress { - MessageProgress::InProgress => { - let container = BusterChatMessageContainer { - response_message: BusterChatMessage { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), - message_type: "text".to_string(), - message: None, - message_chunk: Some(content), - is_final_message: Some(false), - }, - chat_id, - message_id, - }; - println!( - "MESSAGE_STREAM: Created in-progress text message: {:?}", - container - ); - Ok(vec![container]) - } - MessageProgress::Complete => { - let container = BusterChatMessageContainer { - response_message: BusterChatMessage { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), - message_type: "text".to_string(), - message: Some(content), - message_chunk: None, - is_final_message: Some(true), - }, - chat_id, - message_id, - }; - println!( - "MESSAGE_STREAM: Created complete text message: {:?}", - container - ); - Ok(vec![container]) - } + match progress { + MessageProgress::InProgress => { + let container = BusterChatMessageContainer { + response_message: BusterChatMessage { + id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), + message_type: "text".to_string(), + message: None, + message_chunk: Some(content), + is_final_message: Some(false), + }, + chat_id, + message_id, + }; + println!( + "MESSAGE_STREAM: Created in-progress text message: {:?}", + container + ); + Ok(vec![container]) + } + MessageProgress::Complete => { + let container = BusterChatMessageContainer { + response_message: BusterChatMessage { + id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), + message_type: "text".to_string(), + message: Some(content), + message_chunk: None, + is_final_message: Some(true), + }, + chat_id, + message_id, + }; + println!( + "MESSAGE_STREAM: Created complete text message: {:?}", + container + ); + Ok(vec![container]) } - } else { - // Default case - let container = BusterChatMessageContainer { - response_message: BusterChatMessage { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), - message_type: "text".to_string(), - message: None, - message_chunk: None, - is_final_message: Some(false), - }, - chat_id, - message_id, - }; - println!( - "MESSAGE_STREAM: Created default text message: {:?}", - container - ); - Ok(vec![container]) } } @@ -673,7 +652,7 @@ fn transform_tool_message( message_id: Uuid, ) -> Result> { // Use required ID (tool call ID) for all function calls - let containers = match name.as_str() { + let messages = match name.as_str() { "search_data_catalog" => tool_data_catalog_search(id.clone(), content)?, "create_file" => tool_create_file(id.clone(), content)?, "modify_file" => tool_modify_file(id.clone(), content)?, @@ -685,20 +664,24 @@ fn transform_tool_message( _ => return Err(anyhow::anyhow!("Unknown tool name: {}", name)), }; - // Transform to reasoning containers - let reasoning_containers = transform_to_reasoning_container(containers, chat_id, message_id); + // Convert BusterReasoningMessage to BusterReasoningMessageContainer + let reasoning_containers = messages.into_iter() + .map(|reasoning| BusterReasoningMessageContainer { + reasoning, + chat_id, + message_id, + }) + .collect(); + Ok(reasoning_containers) } // Update tool_create_metrics to require ID -fn tool_create_metrics( - id: String, - content: String, -) -> Result> { +fn tool_create_metrics(id: String, content: String) -> Result> { println!("MESSAGE_STREAM: Processing tool create metrics message"); - + let mut parser = StreamingParser::new(); - + match parser.process_chunk(id, &content)? { Some(message) => { println!( @@ -715,14 +698,11 @@ fn tool_create_metrics( } // Update tool_modify_metrics to require ID -fn tool_modify_metrics( - id: String, - content: String, -) -> Result> { +fn tool_modify_metrics(id: String, content: String) -> Result> { println!("MESSAGE_STREAM: Processing tool modify metrics message"); - + let mut parser = StreamingParser::new(); - + match parser.process_chunk(id, &content)? { Some(message) => { println!( @@ -739,14 +719,11 @@ fn tool_modify_metrics( } // Update tool_create_dashboards to require ID -fn tool_create_dashboards( - id: String, - content: String, -) -> Result> { +fn tool_create_dashboards(id: String, content: String) -> Result> { println!("MESSAGE_STREAM: Processing tool create dashboards message"); - + let mut parser = StreamingParser::new(); - + match parser.process_chunk(id, &content)? { Some(message) => { println!( @@ -757,20 +734,19 @@ fn tool_create_dashboards( } None => { println!("MESSAGE_STREAM: No valid dashboard data found in content"); - Err(anyhow::anyhow!("Failed to parse dashboard data from content")) + Err(anyhow::anyhow!( + "Failed to parse dashboard data from content" + )) } } } // Update tool_modify_dashboards to require ID -fn tool_modify_dashboards( - id: String, - content: String, -) -> Result> { +fn tool_modify_dashboards(id: String, content: String) -> Result> { println!("MESSAGE_STREAM: Processing tool modify dashboards message"); - + let mut parser = StreamingParser::new(); - + match parser.process_chunk(id, &content)? { Some(message) => { println!( @@ -781,67 +757,120 @@ fn tool_modify_dashboards( } None => { println!("MESSAGE_STREAM: No valid dashboard data found in content"); - Err(anyhow::anyhow!("Failed to parse dashboard data from content")) + Err(anyhow::anyhow!( + "Failed to parse dashboard data from content" + )) } } } -// Update tool_create_plan to require ID -fn tool_create_plan( - id: String, - content: String, -) -> Result> { - println!("MESSAGE_STREAM: Processing tool create plan message"); - +// Update tool_create_plan to work with BusterReasoningMessage +fn tool_create_plan(id: String, content: String) -> Result> { let mut parser = StreamingParser::new(); - match parser.process_chunk(id, &content)? { - Some(message) => { - println!( - "MESSAGE_STREAM: StreamingParser produced create plan message: {:?}", - message - ); - Ok(vec![message]) - } - None => { - println!("MESSAGE_STREAM: No valid plan data found in content"); - Err(anyhow::anyhow!("Failed to parse plan data from content")) + // First try to parse as plan data using StreamingParser + if let Ok(Some(message)) = parser.process_chunk(id.clone(), &content) { + println!( + "MESSAGE_STREAM: StreamingParser produced create plan message: {:?}", + message + ); + // Convert BusterReasoningMessage to BusterChatContainer + match message { + BusterReasoningMessage::Text(text) => { + // Create a thought pill container for the plan text + let plan_container = BusterThoughtPillContainer { + title: text.title.clone(), + pills: vec![BusterThoughtPill { + id: Uuid::new_v4().to_string(), + text: text.message.unwrap_or_default(), + thought_file_type: "text".to_string(), + }], + }; + + Ok(vec![BusterReasoningMessage::Pill(BusterReasoningPill { + id: text.id, + thought_type: "thought".to_string(), + title: text.title, + secondary_title: text.secondary_title.unwrap_or_default(), + pill_containers: Some(vec![plan_container]), + status: text.status.unwrap_or_else(|| "loading".to_string()), + })]) + } + _ => unreachable!("Plan creation should only return Text type"), } + } else { + println!("MESSAGE_STREAM: No valid plan data found in content"); + Err(anyhow::anyhow!("Failed to parse plan data from content")) } } -// Fix tool_data_catalog_search to use required ID directly -fn tool_data_catalog_search( - id: String, - content: String, -) -> Result> { +// Restore the original tool_data_catalog_search function +fn tool_data_catalog_search(id: String, content: String) -> Result> { + // First try to parse as search requirements using StreamingParser + let mut parser = StreamingParser::new(); + if let Ok(Some(message)) = parser.process_chunk(id.clone(), &content) { + match message { + BusterReasoningMessage::Text(text) => { + // If we successfully parsed search requirements, convert to appropriate format + return Ok(vec![BusterReasoningMessage::Pill(BusterReasoningPill { + id: text.id, + thought_type: "thought".to_string(), + title: "Search Query".to_string(), + secondary_title: "Processing search...".to_string(), + pill_containers: Some(vec![BusterThoughtPillContainer { + title: "Search Requirements".to_string(), + pills: vec![BusterThoughtPill { + id: Uuid::new_v4().to_string(), + text: text.message.unwrap_or_default(), + thought_file_type: "text".to_string(), + }], + }]), + status: text.status.unwrap_or_else(|| "loading".to_string()), + })]); + } + // For other message types from parser, convert accordingly + BusterReasoningMessage::File(file) => { + return Ok(vec![BusterReasoningMessage::File(file)]); + } + BusterReasoningMessage::Pill(pill) => { + return Ok(vec![BusterReasoningMessage::Pill(pill)]); + } + } + } + + // Fall back to existing logic for parsing search results let data_catalog_result = match serde_json::from_str::(&content) { Ok(result) => result, - Err(_) => return Ok(vec![]), // Silently ignore parsing errors + Err(e) => { + println!("Failed to parse SearchDataCatalogOutput: {:?}", e); + return Ok(vec![]); + } }; - let duration = (data_catalog_result.duration.clone() as f64 / 1000.0 * 10.0).round() / 10.0; + let duration = (data_catalog_result.duration as f64 / 1000.0 * 10.0).round() / 10.0; let result_count = data_catalog_result.results.len(); let query_params = data_catalog_result.search_requirements.clone(); - let thought_pill_containters = - match proccess_data_catalog_search_results(data_catalog_result) { - Ok(object) => object, - Err(_) => return Ok(vec![]), // Silently ignore processing errors - }; + let thought_pill_containers = match proccess_data_catalog_search_results(data_catalog_result) { + Ok(containers) => containers, + Err(e) => { + println!("Failed to process data catalog search results: {:?}", e); + return Ok(vec![]); + } + }; let buster_thought = if result_count > 0 { - BusterChatContainer::Thought(BusterReasoningPill { - id: id, // Use required ID directly + BusterReasoningMessage::Pill(BusterReasoningPill { + id: id.clone(), thought_type: "thought".to_string(), title: format!("Found {} results", result_count), secondary_title: format!("{} seconds", duration), - pill_containers: Some(thought_pill_containters), + pill_containers: Some(thought_pill_containers), status: "completed".to_string(), }) } else { - BusterChatContainer::Thought(BusterReasoningPill { - id: id, // Use required ID directly + BusterReasoningMessage::Pill(BusterReasoningPill { + id: id.clone(), thought_type: "thought".to_string(), title: "No data catalog items found".to_string(), secondary_title: format!("{} seconds", duration), @@ -901,14 +930,11 @@ fn proccess_data_catalog_search_results( } // Implement tool_create_file with required ID -fn tool_create_file( - id: String, - content: String, -) -> Result> { +fn tool_create_file(id: String, content: String) -> Result> { println!("MESSAGE_STREAM: Processing tool create file message"); - + let mut parser = StreamingParser::new(); - + match parser.process_chunk(id, &content)? { Some(message) => { println!( @@ -925,14 +951,11 @@ fn tool_create_file( } // Implement tool_modify_file with required ID -fn tool_modify_file( - id: String, - content: String, -) -> Result> { +fn tool_modify_file(id: String, content: String) -> Result> { println!("MESSAGE_STREAM: Processing tool modify file message"); - + let mut parser = StreamingParser::new(); - + match parser.process_chunk(id, &content)? { Some(message) => { println!( @@ -948,33 +971,9 @@ fn tool_modify_file( } } -// Helper function to transform BusterChatContainer to BusterReasoningMessageContainer -fn transform_to_reasoning_container( - containers: Vec, - chat_id: Uuid, - message_id: Uuid, -) -> Vec { - containers - .into_iter() - .map(|container| match container { - BusterChatContainer::Thought(thought) => BusterReasoningMessageContainer { - reasoning: ReasoningMessage::Pill(thought), - chat_id, - message_id, - }, - BusterChatContainer::File(file) => BusterReasoningMessageContainer { - reasoning: ReasoningMessage::File(file), - chat_id, - message_id, - }, - _ => unreachable!("Tool messages should only return Thought or File"), - }) - .collect() -} - fn transform_assistant_tool_message( tool_calls: Vec, - progress: Option, + progress: MessageProgress, initial: bool, chat_id: Uuid, message_id: Uuid, @@ -994,399 +993,328 @@ fn transform_assistant_tool_message( // Process each tool call individually for tool_call in &tool_calls { let tool_id = tool_call.id.clone(); - - let messages = match &progress { - Some(MessageProgress::InProgress) => { - // For InProgress, we show loading states - match tool_call.function.name.as_str() { - "search_data_catalog" => assistant_data_catalog_search( - tool_id.clone(), - progress.clone(), - initial - ), - "create_metrics" => assistant_create_metrics( - tool_id.clone(), - progress.clone(), - initial - ), - "update_metrics" => assistant_modify_metrics( - tool_id.clone(), - progress.clone(), - initial - ), - "create_dashboards" => assistant_create_dashboards( - tool_id.clone(), - progress.clone(), - initial - ), - "update_dashboards" => assistant_modify_dashboards( - tool_id.clone(), - progress.clone(), - initial - ), - "create_plan" => assistant_create_plan( - tool_id.clone(), - progress.clone(), - initial - ), - _ => Err(anyhow::anyhow!( - "Unsupported tool name: {}", - tool_call.function.name - )), - } - }, - Some(MessageProgress::Complete) => { - // For Complete, we process the actual tool calls - match tool_call.function.name.as_str() { - "search_data_catalog" => Ok(vec![]), // Search doesn't have a complete state - "create_metrics" => process_assistant_create_metrics(tool_call), - "update_metrics" => process_assistant_modify_metrics(tool_call), - "create_dashboards" => process_assistant_create_dashboards(tool_call), - "update_dashboards" => process_assistant_modify_dashboards(tool_call), - "create_plan" => process_assistant_create_plan(tool_call), - _ => Err(anyhow::anyhow!( - "Unsupported tool name: {}", - tool_call.function.name - )), - } - }, - None => Err(anyhow::anyhow!("Progress state is required")), + + // Always use the assistant_* functions, passing both arguments and progress + // Clone progress for each iteration to avoid moved value errors + let messages = match tool_call.function.name.as_str() { + "search_data_catalog" => assistant_data_catalog_search( + tool_id, + tool_call.function.arguments.clone(), + progress.clone(), + initial, + )?, + "create_metrics" => assistant_create_metrics( + tool_id, + tool_call.function.arguments.clone(), + progress.clone(), + initial, + )?, + "modify_metrics" => assistant_modify_metrics( + tool_id, + tool_call.function.arguments.clone(), + progress.clone(), + initial, + )?, + "create_dashboards" => assistant_create_dashboards( + tool_id, + tool_call.function.arguments.clone(), + progress.clone(), + initial, + )?, + "modify_dashboards" => assistant_modify_dashboards( + tool_id, + tool_call.function.arguments.clone(), + progress.clone(), + initial, + )?, + "create_plan" => assistant_create_plan( + tool_id, + tool_call.function.arguments.clone(), + progress.clone(), + initial, + )?, + _ => { + println!( + "MESSAGE_STREAM: Unknown tool name: {}", + tool_call.function.name + ); + vec![] + } }; - // Add messages from this tool call to our collection if successful - if let Ok(tool_messages) = messages { - all_messages.extend(tool_messages); - } + // Convert BusterReasoningMessage to BusterReasoningMessageContainer + let containers: Vec = messages.into_iter() + .map(|reasoning| BusterReasoningMessageContainer { + reasoning, + chat_id, + message_id, + }) + .collect(); + + all_messages.extend(containers); } - println!( - "MESSAGE_STREAM: transform_assistant_tool_message returning {} containers", - all_messages.len() - ); - - // Transform all collected messages into BusterReasoningMessageContainer objects - Ok(all_messages - .into_iter() - .map(|message| BusterReasoningMessageContainer { - reasoning: match message { - BusterChatContainer::Thought(thought) => ReasoningMessage::Pill(thought), - BusterChatContainer::File(file) => ReasoningMessage::File(file), - _ => unreachable!("Assistant tool messages should only return Thought or File"), - }, - chat_id: chat_id.clone(), - message_id: message_id.clone(), - }) - .collect()) + Ok(all_messages) } +// Fix the assistant_data_catalog_search function to return BusterReasoningMessage instead of BusterChatContainer fn assistant_data_catalog_search( id: String, - progress: Option, + content: String, + progress: MessageProgress, initial: bool, -) -> Result> { - if let Some(progress) = progress { - match progress { - MessageProgress::InProgress => { - if initial { - Ok(vec![BusterChatContainer::Thought(BusterReasoningPill { - id, +) -> Result> { + // Only process complete messages for data catalog search + if matches!(progress, MessageProgress::Complete) { + // First try to parse as search requirements using StreamingParser + let mut parser = StreamingParser::new(); + if let Ok(Some(message)) = parser.process_chunk(id.clone(), &content) { + match message { + BusterReasoningMessage::Text(text) => { + // If we successfully parsed search requirements, convert to appropriate format + return Ok(vec![BusterReasoningMessage::Pill(BusterReasoningPill { + id: text.id, thought_type: "thought".to_string(), - title: "Searching your data catalog...".to_string(), - secondary_title: "".to_string(), - pill_containers: None, - status: "loading".to_string(), - })]) - } else { - Ok(vec![]) + title: "Search Query".to_string(), + secondary_title: "Processing search...".to_string(), + pill_containers: Some(vec![BusterThoughtPillContainer { + title: "Search Requirements".to_string(), + pills: vec![BusterThoughtPill { + id: Uuid::new_v4().to_string(), + text: text.message.unwrap_or_default(), + thought_file_type: "text".to_string(), + }], + }]), + status: "completed".to_string(), + })]); } + _ => unreachable!("Data catalog search should only return Text type"), } - _ => Err(anyhow::anyhow!( - "Assistant data catalog search only supports in progress." - )), } + + // Fall back to existing logic for full search results + let data_catalog_result = match serde_json::from_str::(&content) { + Ok(result) => result, + Err(e) => { + println!("Failed to parse SearchDataCatalogOutput: {:?}", e); + return Ok(vec![]); + } + }; + + let duration = (data_catalog_result.duration as f64 / 1000.0 * 10.0).round() / 10.0; + let result_count = data_catalog_result.results.len(); + let query_params = data_catalog_result.search_requirements.clone(); + + let thought_pill_containers = + match proccess_data_catalog_search_results(data_catalog_result) { + Ok(containers) => containers, + Err(e) => { + println!("Failed to process data catalog search results: {:?}", e); + return Ok(vec![]); + } + }; + + let thought = if result_count > 0 { + BusterReasoningMessage::Pill(BusterReasoningPill { + id: id.clone(), + thought_type: "thought".to_string(), + title: format!("Found {} results", result_count), + secondary_title: format!("{} seconds", duration), + pill_containers: Some(thought_pill_containers), + status: "completed".to_string(), + }) + } else { + BusterReasoningMessage::Pill(BusterReasoningPill { + id: id.clone(), + thought_type: "thought".to_string(), + title: "No data catalog items found".to_string(), + secondary_title: format!("{} seconds", duration), + pill_containers: Some(vec![BusterThoughtPillContainer { + title: "No results found".to_string(), + pills: vec![BusterThoughtPill { + id: "".to_string(), + text: query_params, + thought_file_type: "empty".to_string(), + }], + }]), + status: "completed".to_string(), + }) + }; + + Ok(vec![thought]) } else { - Err(anyhow::anyhow!( - "Assistant data catalog search requires progress." - )) + Ok(vec![]) // Skip in-progress messages } } fn assistant_create_metrics( id: String, - progress: Option, + content: String, + progress: MessageProgress, initial: bool, -) -> Result> { - if let Some(progress) = progress { - match progress { - MessageProgress::InProgress => { - Ok(vec![BusterChatContainer::Thought(BusterReasoningPill { - id, - thought_type: "thought".to_string(), - title: "Creating metrics...".to_string(), - secondary_title: "".to_string(), - pill_containers: None, - status: "loading".to_string(), - })]) +) -> Result> { + let mut parser = StreamingParser::new(); + // Process both in-progress and complete messages for metrics + match parser.process_chunk(id.clone(), &content) { + Ok(Some(message)) => { + let status = match progress { + MessageProgress::InProgress => "in_progress", + MessageProgress::Complete => "completed", + _ => "loading", + }; + + // Update status in the message if it's a File type + match message { + BusterReasoningMessage::File(mut file) => { + file.status = status.to_string(); + Ok(vec![BusterReasoningMessage::File(file)]) + } + _ => Ok(vec![message]), } - _ => Err(anyhow::anyhow!( - "Assistant create metrics only supports in progress." - )), - } - } else { - Err(anyhow::anyhow!( - "Assistant create metrics requires progress." - )) - } -} - -fn process_assistant_create_metrics( - tool_call: &ToolCall, -) -> Result> { - println!( - "MESSAGE_STREAM: process_assistant_create_metrics called with arguments: {}", - tool_call.function.arguments - ); - - let mut parser = StreamingParser::new(); - - // Process the arguments from the tool call - match parser.process_chunk(tool_call.id.clone(), &tool_call.function.arguments)? { - Some(message) => { - println!( - "MESSAGE_STREAM: StreamingParser produced metric message: {:?}", - message - ); - Ok(vec![message]) - } - None => { - println!( - "MESSAGE_STREAM: StreamingParser returned None for metrics, waiting for more data" - ); - Ok(vec![]) // Return empty vec instead of error when waiting for file data - } - } -} - -fn process_assistant_modify_metrics( - tool_call: &ToolCall, -) -> Result> { - println!( - "MESSAGE_STREAM: process_assistant_modify_metrics called with arguments: {}", - tool_call.function.arguments - ); - - let mut parser = StreamingParser::new(); - - // Process the arguments from the tool call - match parser.process_chunk(tool_call.id.clone(), &tool_call.function.arguments)? { - Some(message) => { - println!( - "MESSAGE_STREAM: StreamingParser produced modify metric message: {:?}", - message - ); - Ok(vec![message]) - } - None => { - println!( - "MESSAGE_STREAM: StreamingParser returned None for modify metrics, waiting for more data" - ); - Ok(vec![]) // Return empty vec instead of error when waiting for file data - } - } -} - -fn assistant_create_dashboards( - id: String, - progress: Option, - initial: bool, -) -> Result> { - if let Some(progress) = progress { - match progress { - MessageProgress::InProgress => { - Ok(vec![BusterChatContainer::Thought(BusterReasoningPill { - id, - thought_type: "thought".to_string(), - title: "Creating dashboards...".to_string(), - secondary_title: "".to_string(), - pill_containers: None, - status: "loading".to_string(), - })]) - } - _ => Err(anyhow::anyhow!( - "Assistant create dashboards only supports in progress." - )), - } - } else { - Err(anyhow::anyhow!( - "Assistant create dashboards requires progress." - )) - } -} - -fn process_assistant_create_dashboards( - tool_call: &ToolCall, -) -> Result> { - println!( - "MESSAGE_STREAM: process_assistant_create_dashboards called with arguments: {}", - tool_call.function.arguments - ); - - let mut parser = StreamingParser::new(); - - // Process the arguments from the tool call - match parser.process_chunk(tool_call.id.clone(), &tool_call.function.arguments)? { - Some(message) => { - println!( - "MESSAGE_STREAM: StreamingParser produced dashboard message: {:?}", - message - ); - Ok(vec![message]) - } - None => { - println!( - "MESSAGE_STREAM: StreamingParser returned None for dashboards, waiting for more data" - ); - Ok(vec![]) // Return empty vec instead of error when waiting for file data - } - } -} - -fn assistant_modify_dashboards( - id: String, - progress: Option, - initial: bool, -) -> Result> { - if let Some(progress) = progress { - match progress { - MessageProgress::InProgress => { - Ok(vec![BusterChatContainer::Thought(BusterReasoningPill { - id, - thought_type: "thought".to_string(), - title: "Updating dashboards...".to_string(), - secondary_title: "".to_string(), - pill_containers: None, - status: "loading".to_string(), - })]) - } - _ => Err(anyhow::anyhow!( - "Assistant modify dashboards only supports in progress." - )), - } - } else { - Err(anyhow::anyhow!( - "Assistant modify dashboards requires progress." - )) - } -} - -fn process_assistant_modify_dashboards( - tool_call: &ToolCall, -) -> Result> { - println!( - "MESSAGE_STREAM: process_assistant_modify_dashboards called with arguments: {}", - tool_call.function.arguments - ); - - let mut parser = StreamingParser::new(); - - // Process the arguments from the tool call - match parser.process_chunk(tool_call.id.clone(), &tool_call.function.arguments)? { - Some(message) => { - println!( - "MESSAGE_STREAM: StreamingParser produced modify dashboard message: {:?}", - message - ); - Ok(vec![message]) - } - None => { - println!( - "MESSAGE_STREAM: StreamingParser returned None for modify dashboards, waiting for more data" - ); - Ok(vec![]) // Return empty vec instead of error when waiting for file data - } - } -} - -fn assistant_create_plan( - id: String, - progress: Option, - initial: bool, -) -> Result> { - if let Some(progress) = progress { - match progress { - MessageProgress::InProgress => { - Ok(vec![BusterChatContainer::Thought(BusterReasoningPill { - id, - thought_type: "thought".to_string(), - title: "Creating plan...".to_string(), - secondary_title: "".to_string(), - pill_containers: None, - status: "loading".to_string(), - })]) - } - _ => Err(anyhow::anyhow!( - "Assistant create plan only supports in progress." - )), - } - } else { - Err(anyhow::anyhow!( - "Assistant create plan requires progress." - )) - } -} - -fn process_assistant_create_plan( - tool_call: &ToolCall, -) -> Result> { - println!( - "MESSAGE_STREAM: process_assistant_create_plan called with arguments: {}", - tool_call.function.arguments - ); - - let mut parser = StreamingParser::new(); - - // Process the arguments from the tool call - match parser.process_chunk(tool_call.id.clone(), &tool_call.function.arguments)? { - Some(message) => { - println!( - "MESSAGE_STREAM: StreamingParser produced plan message: {:?}", - message - ); - Ok(vec![message]) - } - None => { - println!( - "MESSAGE_STREAM: StreamingParser returned None for plan, waiting for more data" - ); - Ok(vec![]) // Return empty vec instead of error when waiting for file data } + Ok(None) => Ok(vec![]), + Err(e) => Err(e), } } fn assistant_modify_metrics( id: String, - progress: Option, + content: String, + progress: MessageProgress, initial: bool, -) -> Result> { - if let Some(progress) = progress { - match progress { - MessageProgress::InProgress => { - Ok(vec![BusterChatContainer::Thought(BusterReasoningPill { - id, - thought_type: "thought".to_string(), - title: "Updating metrics...".to_string(), - secondary_title: "".to_string(), - pill_containers: None, - status: "loading".to_string(), - })]) +) -> Result> { + let mut parser = StreamingParser::new(); + // Process both in-progress and complete messages for metrics + match parser.process_chunk(id.clone(), &content) { + Ok(Some(message)) => { + let status = match progress { + MessageProgress::InProgress => "in_progress", + MessageProgress::Complete => "completed", + }; + + // Update status in the message if it's a File type + match message { + BusterReasoningMessage::File(mut file) => { + file.status = status.to_string(); + Ok(vec![BusterReasoningMessage::File(file)]) + } + _ => Ok(vec![message]), } - _ => Err(anyhow::anyhow!( - "Assistant modify metrics only supports in progress." - )), } - } else { - Err(anyhow::anyhow!( - "Assistant modify metrics requires progress." - )) + Ok(None) => Ok(vec![]), + Err(e) => Err(e), + } +} + +fn assistant_create_dashboards( + id: String, + content: String, + progress: MessageProgress, + initial: bool, +) -> Result> { + let mut parser = StreamingParser::new(); + // Process both in-progress and complete messages for dashboards + match parser.process_chunk(id.clone(), &content) { + Ok(Some(message)) => { + let status = match progress { + MessageProgress::InProgress => "in_progress", + MessageProgress::Complete => "completed", + }; + + // Update status in the message if it's a File type + match message { + BusterReasoningMessage::File(mut file) => { + file.status = status.to_string(); + Ok(vec![BusterReasoningMessage::File(file)]) + } + _ => Ok(vec![message]), + } + } + Ok(None) => Ok(vec![]), + Err(e) => Err(e), + } +} + +// Fix for the modify_dashboards function to return BusterReasoningMessage instead of BusterChatContainer +fn assistant_modify_dashboards( + id: String, + content: String, + progress: MessageProgress, + initial: bool, +) -> Result> { + let mut parser = StreamingParser::new(); + // Process both in-progress and complete messages for dashboards + match parser.process_chunk(id.clone(), &content) { + Ok(Some(message)) => { + let status = match progress { + MessageProgress::InProgress => "in_progress", + MessageProgress::Complete => "completed", + _ => "loading", + }; + + // Update status in the message if it's a File type + match message { + BusterReasoningMessage::File(mut file) => { + file.status = status.to_string(); + Ok(vec![BusterReasoningMessage::File(file)]) + } + _ => Ok(vec![message]), + } + } + Ok(None) => Ok(vec![]), + Err(e) => Err(e), + } +} + +fn assistant_create_plan( + id: String, + content: String, + progress: MessageProgress, + initial: bool, +) -> Result> { + let mut parser = StreamingParser::new(); + // Process both in-progress and complete messages for plan creation + match parser.process_chunk(id.clone(), &content) { + Ok(Some(message)) => { + let status = match progress { + MessageProgress::InProgress => "in_progress", + MessageProgress::Complete => "completed", + _ => "loading", + }; + + // Convert BusterReasoningMessage to BusterChatContainer and update status + match message { + BusterReasoningMessage::Text(text) => { + // Create a thought pill container for the plan text + let plan_container = BusterThoughtPillContainer { + title: text.title.clone(), + pills: vec![BusterThoughtPill { + id: Uuid::new_v4().to_string(), + text: text.message.unwrap_or_default(), + thought_file_type: "markdown".to_string(), + }], + }; + + Ok(vec![BusterReasoningMessage::Pill(BusterReasoningPill { + id: text.id, + thought_type: "thought".to_string(), + title: text.title, + secondary_title: text.secondary_title.unwrap_or_default(), + pill_containers: Some(vec![plan_container]), + status: status.to_string(), + })]) + } + BusterReasoningMessage::File(mut file) => { + file.status = status.to_string(); + Ok(vec![BusterReasoningMessage::File(file)]) + } + BusterReasoningMessage::Pill(mut thought) => { + thought.status = status.to_string(); + Ok(vec![BusterReasoningMessage::Pill(thought)]) + } + } + } + Ok(None) => Ok(vec![]), + Err(e) => Err(e), } } diff --git a/api/libs/handlers/src/chats/streaming_parser.rs b/api/libs/handlers/src/chats/streaming_parser.rs index 6620738bb..fb86bb95f 100644 --- a/api/libs/handlers/src/chats/streaming_parser.rs +++ b/api/libs/handlers/src/chats/streaming_parser.rs @@ -2,7 +2,7 @@ use anyhow::Result; use serde_json::Value; use uuid::Uuid; -use super::post_chat_handler::{BusterChatContainer, BusterFileLine, BusterReasoningFile}; +use super::post_chat_handler::{BusterReasoningMessage, BusterFileLine, BusterReasoningFile, BusterReasoningPill, BusterThoughtPill, BusterThoughtPillContainer, BusterReasoningText}; pub struct StreamingParser { buffer: String, @@ -20,10 +20,77 @@ impl StreamingParser { } } - pub fn process_chunk(&mut self, id: String, chunk: &str) -> Result> { + // Main entry point to process chunks based on type + pub fn process_chunk(&mut self, id: String, chunk: &str) -> Result> { // Add new chunk to buffer self.buffer.push_str(chunk); + // Try to process as a plan first + if let Some(plan) = self.process_plan_chunk(id.clone())? { + return Ok(Some(plan)); + } + + // Then try to process as search data catalog + if let Some(search) = self.process_search_catalog_chunk(id.clone())? { + return Ok(Some(search)); + } + + // Finally try to process as a file + self.process_file_chunk(id) + } + + // Process chunks meant for plan creation + pub fn process_plan_chunk(&mut self, id: String) -> Result> { + // Complete any incomplete JSON structure + let processed_json = self.complete_json_structure(self.buffer.clone()); + + // Try to parse the JSON + if let Ok(value) = serde_json::from_str::(&processed_json) { + // Check if it's a plan structure (has plan_markdown key) + if let Some(plan_markdown) = value.get("plan_markdown").and_then(Value::as_str) { + // Return the plan as a BusterReasoningText + return Ok(Some(BusterReasoningMessage::Text(BusterReasoningText { + id, + reasoning_type: "text".to_string(), + title: "Generated Plan".to_string(), + secondary_title: Some("Plan details".to_string()), + message: Some(plan_markdown.to_string()), + message_chunk: None, + status: Some("loading".to_string()), + }))); + } + } + + Ok(None) + } + + // Process chunks meant for search data catalog + pub fn process_search_catalog_chunk(&mut self, id: String) -> Result> { + // Complete any incomplete JSON structure + let processed_json = self.complete_json_structure(self.buffer.clone()); + + // Try to parse the JSON + if let Ok(value) = serde_json::from_str::(&processed_json) { + // Check if it's a search requirements structure + if let Some(search_requirements) = value.get("search_requirements").and_then(Value::as_str) { + // Return the search requirements as a BusterReasoningText + return Ok(Some(BusterReasoningMessage::Text(BusterReasoningText { + id, + reasoning_type: "text".to_string(), + title: "Search Requirements".to_string(), + secondary_title: Some("Processing search...".to_string()), + message: Some(search_requirements.to_string()), + message_chunk: None, + status: Some("loading".to_string()), + }))); + } + } + + Ok(None) + } + + // Process chunks meant for files (original implementation) + pub fn process_file_chunk(&mut self, id: String) -> Result> { // Extract and replace yml_content with placeholders let mut yml_contents = Vec::new(); let mut positions = Vec::new(); @@ -78,25 +145,13 @@ impl StreamingParser { } // Now check the structure after modifications - if let Some(obj) = value.as_object() { - if let Some(files) = obj.get("files").and_then(Value::as_array) { - if let Some(last_file) = files.last().and_then(Value::as_object) { - let has_name = last_file.get("name").and_then(Value::as_str).is_some(); - let has_file_type = - last_file.get("file_type").and_then(Value::as_str).is_some(); - let has_yml_content = last_file.get("yml_content").is_some(); - - if has_name && has_file_type && has_yml_content { - return self.convert_to_message(id, value); - } - } - } - } + return self.convert_file_to_message(id, value); } Ok(None) } + // Helper method to complete JSON structure (shared functionality) fn complete_json_structure(&self, json: String) -> String { let mut processed = String::with_capacity(json.len()); let mut nesting_stack = Vec::new(); @@ -144,44 +199,44 @@ impl StreamingParser { } } - println!("complete_json_structure: {:?}", processed); processed } - fn convert_to_message(&self, id: String, value: Value) -> Result> { + // Helper method to convert file JSON to message + fn convert_file_to_message(&self, id: String, value: Value) -> Result> { if let Some(files) = value.get("files").and_then(Value::as_array) { if let Some(last_file) = files.last().and_then(Value::as_object) { - let name = last_file.get("name").and_then(Value::as_str).unwrap_or(""); - let file_type = last_file - .get("file_type") - .and_then(Value::as_str) - .unwrap_or(""); - let yml_content = last_file - .get("yml_content") - .and_then(Value::as_str) - .unwrap_or(""); + let has_name = last_file.get("name").and_then(Value::as_str).is_some(); + let has_file_type = last_file.get("file_type").and_then(Value::as_str).is_some(); + let has_yml_content = last_file.get("yml_content").is_some(); - let mut current_lines = Vec::new(); - for (i, line) in yml_content.lines().enumerate() { - current_lines.push(BusterFileLine { - line_number: i + 1, - text: line.to_string(), - modified: Some(false), - }); + if has_name && has_file_type && has_yml_content { + let name = last_file.get("name").and_then(Value::as_str).unwrap_or(""); + let file_type = last_file.get("file_type").and_then(Value::as_str).unwrap_or(""); + let yml_content = last_file.get("yml_content").and_then(Value::as_str).unwrap_or(""); + + let mut current_lines = Vec::new(); + for (i, line) in yml_content.lines().enumerate() { + current_lines.push(BusterFileLine { + line_number: i + 1, + text: line.to_string(), + modified: Some(false), + }); + } + + return Ok(Some(BusterReasoningMessage::File(BusterReasoningFile { + id, + message_type: "file".to_string(), + file_type: file_type.to_string(), + file_name: name.to_string(), + version_number: 1, + version_id: Uuid::new_v4().to_string(), + status: "loading".to_string(), + file: Some(current_lines), + filter_version_id: None, + metadata: None, + }))); } - - return Ok(Some(BusterChatContainer::File(BusterReasoningFile { - id, - message_type: "file".to_string(), - file_type: file_type.to_string(), - file_name: name.to_string(), - version_number: 1, - version_id: Uuid::new_v4().to_string(), - status: "loading".to_string(), - file: Some(current_lines), - filter_version_id: None, - metadata: None, - }))); } } Ok(None) diff --git a/api/libs/litellm/src/types.rs b/api/libs/litellm/src/types.rs index 30bc0a950..221737177 100644 --- a/api/libs/litellm/src/types.rs +++ b/api/libs/litellm/src/types.rs @@ -100,6 +100,12 @@ pub enum MessageProgress { Complete, } +impl Default for MessageProgress { + fn default() -> Self { + Self::Complete + } +} + #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(tag = "role")] #[serde(rename_all = "lowercase")] @@ -129,7 +135,7 @@ pub enum AgentMessage { #[serde(skip_serializing_if = "Option::is_none")] tool_calls: Option>, #[serde(skip)] - progress: Option, + progress: MessageProgress, #[serde(skip)] initial: bool, }, @@ -141,7 +147,7 @@ pub enum AgentMessage { #[serde(skip_serializing_if = "Option::is_none")] name: Option, #[serde(skip)] - progress: Option, + progress: MessageProgress, }, } @@ -168,7 +174,7 @@ impl AgentMessage { id: Option, content: Option, tool_calls: Option>, - progress: Option, + progress: MessageProgress, initial: Option, name: Option, ) -> Self { @@ -189,7 +195,7 @@ impl AgentMessage { content: impl Into, tool_call_id: impl Into, name: Option, - progress: Option, + progress: MessageProgress, ) -> Self { Self::Tool { id, @@ -501,7 +507,7 @@ mod tests { Some("\n\nHello there, how may I assist you today?".to_string()), None, None, - None, + MessageProgress::Complete, None, None, ), @@ -645,7 +651,14 @@ mod tests { choices: vec![Choice { finish_reason: Some("length".to_string()), index: 0, - message: AgentMessage::assistant(Some("".to_string()), None, None, None, None, None), + message: AgentMessage::assistant( + Some("".to_string()), + None, + None, + MessageProgress::Complete, + None, + None, + ), delta: None, logprobs: None, }], @@ -934,7 +947,7 @@ mod tests { code_interpreter: None, retrieval: None, }]), - None, + MessageProgress::Complete, None, None, ),