diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index 7cae568d4..57ccc364e 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -517,7 +517,6 @@ pub fn transform_message( if let Some(tool_calls) = tool_calls { let messages = match transform_assistant_tool_message( - id, tool_calls.clone(), progress, initial, @@ -663,7 +662,7 @@ fn transform_tool_message( ) -> Result> { // Use required ID (tool call ID) for all function calls let containers = match name.as_str() { - "data_catalog_search" => tool_data_catalog_search(id.clone(), content)?, + "search_data_catalog" => tool_data_catalog_search(id.clone(), content)?, "create_file" => tool_create_file(id.clone(), content)?, "modify_file" => tool_modify_file(id.clone(), content)?, "create_metrics" => tool_create_metrics(id.clone(), content)?, @@ -962,7 +961,6 @@ fn transform_to_reasoning_container( } fn transform_assistant_tool_message( - id: Option, tool_calls: Vec, progress: Option, initial: bool, @@ -970,8 +968,8 @@ fn transform_assistant_tool_message( message_id: Uuid, ) -> Result> { println!( - "MESSAGE_STREAM: transform_assistant_tool_message called with tool_calls: {:?}", - tool_calls + "MESSAGE_STREAM: transform_assistant_tool_message called with {} tool_calls", + tool_calls.len() ); if tool_calls.is_empty() { @@ -979,29 +977,83 @@ fn transform_assistant_tool_message( return Ok(vec![]); } - let tool_call = &tool_calls[0]; - let messages = match tool_call.function.name.as_str() { - "search_data_catalog" => assistant_data_catalog_search(id, progress, initial), - "create_metrics" => assistant_create_metrics(id, tool_calls.clone(), progress, initial), - "update_metrics" => assistant_modify_metrics(id, tool_calls.clone(), progress, initial), - "create_dashboards" => { - assistant_create_dashboards(id, tool_calls.clone(), progress, initial) + let mut all_messages = Vec::new(); + + // Process each tool call individually + for tool_call in &tool_calls { + let tool_id = tool_call.id.clone(); + + let messages = match &progress { + Some(MessageProgress::InProgress) => { + // For InProgress, we show loading states + match tool_call.function.name.as_str() { + "search_data_catalog" => assistant_data_catalog_search( + tool_id.clone(), + progress.clone(), + initial + ), + "create_metrics" => assistant_create_metrics( + tool_id.clone(), + progress.clone(), + initial + ), + "update_metrics" => assistant_modify_metrics( + tool_id.clone(), + progress.clone(), + initial + ), + "create_dashboards" => assistant_create_dashboards( + tool_id.clone(), + progress.clone(), + initial + ), + "update_dashboards" => assistant_modify_dashboards( + tool_id.clone(), + progress.clone(), + initial + ), + "create_plan" => assistant_create_plan( + tool_id.clone(), + progress.clone(), + initial + ), + _ => Err(anyhow::anyhow!( + "Unsupported tool name: {}", + tool_call.function.name + )), + } + }, + Some(MessageProgress::Complete) => { + // For Complete, we process the actual tool calls + match tool_call.function.name.as_str() { + "search_data_catalog" => Ok(vec![]), // Search doesn't have a complete state + "create_metrics" => process_assistant_create_metrics(tool_call), + "update_metrics" => process_assistant_modify_metrics(tool_call), + "create_dashboards" => process_assistant_create_dashboards(tool_call), + "update_dashboards" => process_assistant_modify_dashboards(tool_call), + "create_plan" => process_assistant_create_plan(tool_call), + _ => Err(anyhow::anyhow!( + "Unsupported tool name: {}", + tool_call.function.name + )), + } + }, + None => Err(anyhow::anyhow!("Progress state is required")), + }; + + // Add messages from this tool call to our collection if successful + if let Ok(tool_messages) = messages { + all_messages.extend(tool_messages); } - "update_dashboards" => { - assistant_modify_dashboards(id, tool_calls.clone(), progress, initial) - } - "create_plan" => assistant_create_plan(id, tool_calls.clone(), progress, initial), - _ => Err(anyhow::anyhow!( - "Unsupported tool name: {}", - tool_call.function.name - )), - }?; + } println!( "MESSAGE_STREAM: transform_assistant_tool_message returning {} containers", - messages.len() + all_messages.len() ); - Ok(messages + + // Transform all collected messages into BusterReasoningMessageContainer objects + Ok(all_messages .into_iter() .map(|message| BusterReasoningMessageContainer { reasoning: match message { @@ -1009,14 +1061,14 @@ fn transform_assistant_tool_message( BusterChatContainer::File(file) => ReasoningMessage::File(file), _ => unreachable!("Assistant tool messages should only return Thought or File"), }, - chat_id, - message_id, + chat_id: chat_id.clone(), + message_id: message_id.clone(), }) .collect()) } fn assistant_data_catalog_search( - id: Option, + id: String, progress: Option, initial: bool, ) -> Result> { @@ -1024,8 +1076,6 @@ fn assistant_data_catalog_search( match progress { MessageProgress::InProgress => { if initial { - let id = id.unwrap_or_else(|| Uuid::new_v4().to_string()); - Ok(vec![BusterChatContainer::Thought(BusterThought { id, thought_type: "thought".to_string(), @@ -1050,17 +1100,15 @@ fn assistant_data_catalog_search( } fn assistant_create_metrics( - id: Option, - tool_calls: Vec, + id: String, progress: Option, initial: bool, ) -> Result> { if let Some(progress) = progress { match progress { MessageProgress::InProgress => { - // Simple loading message for metric creation Ok(vec![BusterChatContainer::Thought(BusterThought { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), + id, thought_type: "thought".to_string(), thought_title: "Creating metrics...".to_string(), thought_secondary_title: "".to_string(), @@ -1068,17 +1116,9 @@ fn assistant_create_metrics( status: "loading".to_string(), })]) } - MessageProgress::Complete => { - println!("MESSAGE_STREAM: Processing complete create metrics message"); - // Check if there are any tool calls - if tool_calls.is_empty() { - return Err(anyhow::anyhow!("No tool call found")); - } - - // Access the first tool call safely - let tool_call = &tool_calls[0]; - return process_assistant_create_metrics(tool_call); - } + _ => Err(anyhow::anyhow!( + "Assistant create metrics only supports in progress." + )), } } else { Err(anyhow::anyhow!( @@ -1087,7 +1127,9 @@ fn assistant_create_metrics( } } -fn process_assistant_create_metrics(tool_call: &ToolCall) -> Result> { +fn process_assistant_create_metrics( + tool_call: &ToolCall, +) -> Result> { println!( "MESSAGE_STREAM: process_assistant_create_metrics called with arguments: {}", tool_call.function.arguments @@ -1096,7 +1138,7 @@ fn process_assistant_create_metrics(tool_call: &ToolCall) -> Result { println!( "MESSAGE_STREAM: StreamingParser produced metric message: {:?}", @@ -1113,45 +1155,9 @@ fn process_assistant_create_metrics(tool_call: &ToolCall) -> Result, - tool_calls: Vec, - progress: Option, - initial: bool, +fn process_assistant_modify_metrics( + tool_call: &ToolCall, ) -> Result> { - if let Some(progress) = progress { - match progress { - MessageProgress::InProgress => { - // Simple loading message for metric modification - Ok(vec![BusterChatContainer::Thought(BusterThought { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), - thought_type: "thought".to_string(), - thought_title: "Updating metrics...".to_string(), - thought_secondary_title: "".to_string(), - thoughts: None, - status: "loading".to_string(), - })]) - } - MessageProgress::Complete => { - println!("MESSAGE_STREAM: Processing complete modify metrics message"); - // Check if there are any tool calls - if tool_calls.is_empty() { - return Err(anyhow::anyhow!("No tool call found")); - } - - // Access the first tool call safely - let tool_call = &tool_calls[0]; - return process_assistant_modify_metrics(tool_call); - } - } - } else { - Err(anyhow::anyhow!( - "Assistant modify metrics requires progress." - )) - } -} - -fn process_assistant_modify_metrics(tool_call: &ToolCall) -> Result> { println!( "MESSAGE_STREAM: process_assistant_modify_metrics called with arguments: {}", tool_call.function.arguments @@ -1160,7 +1166,7 @@ fn process_assistant_modify_metrics(tool_call: &ToolCall) -> Result { println!( "MESSAGE_STREAM: StreamingParser produced modify metric message: {:?}", @@ -1169,24 +1175,24 @@ fn process_assistant_modify_metrics(tool_call: &ToolCall) -> Result { - println!("MESSAGE_STREAM: StreamingParser returned None for modify metrics, waiting for more data"); + println!( + "MESSAGE_STREAM: StreamingParser returned None for modify metrics, waiting for more data" + ); Ok(vec![]) // Return empty vec instead of error when waiting for file data } } } fn assistant_create_dashboards( - id: Option, - tool_calls: Vec, + id: String, progress: Option, initial: bool, ) -> Result> { if let Some(progress) = progress { match progress { MessageProgress::InProgress => { - // Simple loading message for dashboard creation Ok(vec![BusterChatContainer::Thought(BusterThought { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), + id, thought_type: "thought".to_string(), thought_title: "Creating dashboards...".to_string(), thought_secondary_title: "".to_string(), @@ -1194,17 +1200,9 @@ fn assistant_create_dashboards( status: "loading".to_string(), })]) } - MessageProgress::Complete => { - println!("MESSAGE_STREAM: Processing complete create dashboards message"); - // Check if there are any tool calls - if tool_calls.is_empty() { - return Err(anyhow::anyhow!("No tool call found")); - } - - // Access the first tool call safely - let tool_call = &tool_calls[0]; - return process_assistant_create_dashboards(tool_call); - } + _ => Err(anyhow::anyhow!( + "Assistant create dashboards only supports in progress." + )), } } else { Err(anyhow::anyhow!( @@ -1213,7 +1211,9 @@ fn assistant_create_dashboards( } } -fn process_assistant_create_dashboards(tool_call: &ToolCall) -> Result> { +fn process_assistant_create_dashboards( + tool_call: &ToolCall, +) -> Result> { println!( "MESSAGE_STREAM: process_assistant_create_dashboards called with arguments: {}", tool_call.function.arguments @@ -1222,7 +1222,7 @@ fn process_assistant_create_dashboards(tool_call: &ToolCall) -> Result { println!( "MESSAGE_STREAM: StreamingParser produced dashboard message: {:?}", @@ -1231,24 +1231,24 @@ fn process_assistant_create_dashboards(tool_call: &ToolCall) -> Result { - println!("MESSAGE_STREAM: StreamingParser returned None for dashboards, waiting for more data"); + println!( + "MESSAGE_STREAM: StreamingParser returned None for dashboards, waiting for more data" + ); Ok(vec![]) // Return empty vec instead of error when waiting for file data } } } fn assistant_modify_dashboards( - id: Option, - tool_calls: Vec, + id: String, progress: Option, initial: bool, ) -> Result> { if let Some(progress) = progress { match progress { MessageProgress::InProgress => { - // Simple loading message for dashboard modification Ok(vec![BusterChatContainer::Thought(BusterThought { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), + id, thought_type: "thought".to_string(), thought_title: "Updating dashboards...".to_string(), thought_secondary_title: "".to_string(), @@ -1256,17 +1256,9 @@ fn assistant_modify_dashboards( status: "loading".to_string(), })]) } - MessageProgress::Complete => { - println!("MESSAGE_STREAM: Processing complete modify dashboards message"); - // Check if there are any tool calls - if tool_calls.is_empty() { - return Err(anyhow::anyhow!("No tool call found")); - } - - // Access the first tool call safely - let tool_call = &tool_calls[0]; - return process_assistant_modify_dashboards(tool_call); - } + _ => Err(anyhow::anyhow!( + "Assistant modify dashboards only supports in progress." + )), } } else { Err(anyhow::anyhow!( @@ -1275,7 +1267,9 @@ fn assistant_modify_dashboards( } } -fn process_assistant_modify_dashboards(tool_call: &ToolCall) -> Result> { +fn process_assistant_modify_dashboards( + tool_call: &ToolCall, +) -> Result> { println!( "MESSAGE_STREAM: process_assistant_modify_dashboards called with arguments: {}", tool_call.function.arguments @@ -1284,7 +1278,7 @@ fn process_assistant_modify_dashboards(tool_call: &ToolCall) -> Result { println!( "MESSAGE_STREAM: StreamingParser produced modify dashboard message: {:?}", @@ -1293,46 +1287,94 @@ fn process_assistant_modify_dashboards(tool_call: &ToolCall) -> Result { - println!("MESSAGE_STREAM: StreamingParser returned None for modify dashboards, waiting for more data"); + println!( + "MESSAGE_STREAM: StreamingParser returned None for modify dashboards, waiting for more data" + ); Ok(vec![]) // Return empty vec instead of error when waiting for file data } } } fn assistant_create_plan( - id: Option, - tool_calls: Vec, + id: String, progress: Option, initial: bool, ) -> Result> { if let Some(progress) = progress { match progress { MessageProgress::InProgress => { - // Simple loading message for metric creation Ok(vec![BusterChatContainer::Thought(BusterThought { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), + id, thought_type: "thought".to_string(), - thought_title: "Creating metrics...".to_string(), + thought_title: "Creating plan...".to_string(), thought_secondary_title: "".to_string(), thoughts: None, status: "loading".to_string(), })]) } - MessageProgress::Complete => { - println!("MESSAGE_STREAM: Processing complete create metrics message"); - // Check if there are any tool calls - if tool_calls.is_empty() { - return Err(anyhow::anyhow!("No tool call found")); - } - - // Access the first tool call safely - let tool_call = &tool_calls[0]; - return process_assistant_create_metrics(tool_call); - } + _ => Err(anyhow::anyhow!( + "Assistant create plan only supports in progress." + )), } } else { Err(anyhow::anyhow!( - "Assistant create metrics requires progress." + "Assistant create plan requires progress." + )) + } +} + +fn process_assistant_create_plan( + tool_call: &ToolCall, +) -> Result> { + println!( + "MESSAGE_STREAM: process_assistant_create_plan called with arguments: {}", + tool_call.function.arguments + ); + + let mut parser = StreamingParser::new(); + + // Process the arguments from the tool call + match parser.process_chunk(tool_call.id.clone(), &tool_call.function.arguments)? { + Some(message) => { + println!( + "MESSAGE_STREAM: StreamingParser produced plan message: {:?}", + message + ); + Ok(vec![message]) + } + None => { + println!( + "MESSAGE_STREAM: StreamingParser returned None for plan, waiting for more data" + ); + Ok(vec![]) // Return empty vec instead of error when waiting for file data + } + } +} + +fn assistant_modify_metrics( + id: String, + progress: Option, + initial: bool, +) -> Result> { + if let Some(progress) = progress { + match progress { + MessageProgress::InProgress => { + Ok(vec![BusterChatContainer::Thought(BusterThought { + id, + thought_type: "thought".to_string(), + thought_title: "Updating metrics...".to_string(), + thought_secondary_title: "".to_string(), + thoughts: None, + status: "loading".to_string(), + })]) + } + _ => Err(anyhow::anyhow!( + "Assistant modify metrics only supports in progress." + )), + } + } else { + Err(anyhow::anyhow!( + "Assistant modify metrics requires progress." )) } }