diff --git a/api/libs/agents/src/agent.rs b/api/libs/agents/src/agent.rs index d0ae2a703..609ffcfa4 100644 --- a/api/libs/agents/src/agent.rs +++ b/api/libs/agents/src/agent.rs @@ -353,6 +353,8 @@ impl Agent { let mut content_buffer = String::new(); let mut message_id: Option = None; let mut is_complete = false; + // Flag to track if we've sent the first message + let mut first_message_sent = false; while let Some(chunk_result) = stream_rx.recv().await { match chunk_result { @@ -367,17 +369,19 @@ impl Agent { if let Some(content) = &delta.content { content_buffer.push_str(content); - // Stream the content update + // Stream the content update with initial=true for the first message only let partial_message = AgentMessage::assistant( message_id.clone(), Some(content_buffer.clone()), None, Some(MessageProgress::InProgress), - Some(false), + Some(!first_message_sent), // Set initial=true only for the first message Some(self.name.clone()), ); self.get_stream_sender().await.send(Ok(partial_message))?; + // Mark that we've sent the first message + first_message_sent = true; } // Process tool calls if present @@ -423,11 +427,13 @@ impl Agent { }, Some(tool_calls_vec), Some(MessageProgress::InProgress), - Some(false), + Some(!first_message_sent), // Set initial=true only for the first message Some(self.name.clone()), ); self.get_stream_sender().await.send(Ok(partial_message))?; + // Mark that we've sent the first message + first_message_sent = true; } } diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index 268b9b4d5..7cae568d4 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -27,6 +27,7 @@ use super::types::ChatWithMessages; use tokio::sync::mpsc; // Define ThreadEvent +#[derive(Clone, Copy, Debug)] pub enum ThreadEvent { GeneratingResponseMessage, GeneratingReasoningMessage, @@ -42,7 +43,7 @@ pub struct ChatCreateNewChat { pub async fn post_chat_handler( request: ChatCreateNewChat, user: User, - tx: Option>>, + tx: Option>>, ) -> Result { let chat_id = request.chat_id.unwrap_or_else(Uuid::new_v4); let message_id = request.message_id.unwrap_or_else(Uuid::new_v4); @@ -118,31 +119,17 @@ pub async fn post_chat_handler( // Collect all messages for final processing let mut all_messages: Vec = Vec::new(); let mut all_transformed_containers: Vec = Vec::new(); - let mut final_message: Option = None; // Process all messages from the agent while let Ok(message_result) = rx.recv().await { match message_result { Ok(msg) => { - // Check for final message from manager_agent - if let AgentMessage::Assistant { - name: Some(name), - content: Some(content), - tool_calls: None, - .. - } = &msg - { - if name == "manager_agent" { - final_message = Some(content.clone()); - } - } - // Store the original message all_messages.push(msg.clone()); // Always transform the message match transform_message(&chat_id, &message_id, msg) { - Ok((containers, _event)) => { + Ok((containers, event)) => { // Store all transformed containers for container in containers.clone() { all_transformed_containers.push(container.clone()); @@ -151,8 +138,8 @@ pub async fn post_chat_handler( // If we have a tx channel, send the transformed messages if let Some(tx) = &tx { for container in containers { - if tx.send(Ok(container)).await.is_err() { - // Client disconnected, but continue processing + if tx.send(Ok((container, event.clone()))).await.is_err() { + // Client disconnected, but continue processing messages tracing::warn!( "Client disconnected, but continuing to process messages" ); @@ -242,13 +229,6 @@ pub async fn post_chat_handler( let mut final_response_messages: Vec = response_messages.into_iter().flatten().collect(); - // Add the final message from manager_agent if available - if let Some(final_msg) = final_message { - if let Ok(value) = serde_json::to_value(final_msg) { - final_response_messages.push(value); - } - } - // Update the chat message with processed content chat_message.response_messages = final_response_messages; chat_message.reasoning = reasoning_messages.into_iter().flatten().collect(); @@ -526,8 +506,9 @@ pub fn transform_message( filtered_messages } Err(e) => { + tracing::warn!("Error transforming text message: {:?}", e); println!("MESSAGE_STREAM: Error transforming text message: {:?}", e); - vec![] // Silently ignore errors by returning empty vec + vec![] // Return empty vec but warn about the error } }; @@ -543,35 +524,17 @@ pub fn transform_message( chat_id.clone(), message_id.clone(), ) { - Ok(messages) => { - let filtered_messages: Vec = messages - .into_iter() - .filter(|msg| match &msg.reasoning { - ReasoningMessage::Thought(thought) => thought.status == "completed", - ReasoningMessage::File(file) => file.status == "completed", - }) - .map(BusterContainer::ReasoningMessage) - .collect(); - - println!( - "MESSAGE_STREAM: Transformed assistant tool message into {} containers", - filtered_messages.len() - ); - if !filtered_messages.is_empty() { - println!( - "MESSAGE_STREAM: First container: {:?}", - filtered_messages[0] - ); - } - - filtered_messages - } + Ok(messages) => messages + .into_iter() + .map(BusterContainer::ReasoningMessage) + .collect(), Err(e) => { + tracing::warn!("Error transforming assistant tool message: {:?}", e); println!( "MESSAGE_STREAM: Error transforming assistant tool message: {:?}", e ); - vec![] // Silently ignore errors by returning empty vec + vec![] // Return empty vec but warn about the error } }; @@ -589,48 +552,30 @@ pub fn transform_message( } => { if let Some(name) = name { let name_str = name.clone(); // Clone here to use in println later + + // Use tool_call_id directly as it's already a String let messages = match transform_tool_message( - id, + tool_call_id, name, content.clone(), - progress, chat_id.clone(), message_id.clone(), ) { - Ok(messages) => { - let filtered_messages: Vec = messages - .into_iter() - .filter(|msg| match &msg.reasoning { - ReasoningMessage::Thought(thought) => thought.status == "completed", - ReasoningMessage::File(file) => file.status == "completed", - }) - .map(BusterContainer::ReasoningMessage) - .collect(); - - println!( - "MESSAGE_STREAM: Transformed tool message '{}' into {} containers", - name_str, - filtered_messages.len() - ); - if !filtered_messages.is_empty() { - println!( - "MESSAGE_STREAM: First container: {:?}", - filtered_messages[0] - ); - } - - filtered_messages - } + Ok(messages) => messages + .into_iter() + .map(BusterContainer::ReasoningMessage) + .collect(), Err(e) => { + tracing::warn!("Error transforming tool message '{}': {:?}", name_str, e); println!("MESSAGE_STREAM: Error transforming tool message: {:?}", e); - vec![] // Silently ignore errors by returning empty vec + vec![] // Return empty vec but warn about the error } }; return Ok((messages, ThreadEvent::GeneratingReasoningMessage)); } - Ok((vec![], ThreadEvent::GeneratingReasoningMessage)) // Return empty vec instead of error + Ok((vec![], ThreadEvent::GeneratingResponseMessage)) // Return empty vec instead of error } _ => Ok((vec![], ThreadEvent::GeneratingResponseMessage)), // Return empty vec instead of error } @@ -708,205 +653,202 @@ fn transform_text_message( } } +// Update transform_tool_message to require ID and not include progress parameter fn transform_tool_message( - id: Option, + id: String, name: String, content: String, - progress: Option, chat_id: Uuid, message_id: Uuid, ) -> Result> { - let name_str = name.clone(); // Clone here to use in println later - println!( - "MESSAGE_STREAM: transform_tool_message called with name: {}", - name_str - ); + // Use required ID (tool call ID) for all function calls + let containers = match name.as_str() { + "data_catalog_search" => tool_data_catalog_search(id.clone(), content)?, + "create_file" => tool_create_file(id.clone(), content)?, + "modify_file" => tool_modify_file(id.clone(), content)?, + "create_metrics" => tool_create_metrics(id.clone(), content)?, + "modify_metrics" => tool_modify_metrics(id.clone(), content)?, + "create_dashboards" => tool_create_dashboards(id.clone(), content)?, + "modify_dashboards" => tool_modify_dashboards(id.clone(), content)?, + "create_plan" => tool_create_plan(id.clone(), content)?, + _ => return Err(anyhow::anyhow!("Unknown tool name: {}", name)), + }; - let messages = match name.as_str() { - "search_data_catalog" => tool_data_catalog_search(id, content.clone(), progress), - "create_files" => tool_create_file(id.clone(), content.clone(), progress), - "modify_files" => tool_modify_file(id.clone(), content.clone(), progress), - "create_metrics" => tool_create_metrics(id.clone(), content.clone(), progress), - "update_metrics" => tool_modify_metrics(id.clone(), content.clone(), progress), - "create_dashboards" => tool_create_dashboards(id.clone(), content.clone(), progress), - "update_dashboards" => tool_modify_dashboards(id.clone(), content.clone(), progress), - "create_plan" => tool_create_plan(id, content.clone(), progress), - _ => { - println!("MESSAGE_STREAM: Unsupported tool name: {}", name_str); - Err(anyhow::anyhow!("Unsupported tool name: {}", name)) - } - }?; - - println!( - "MESSAGE_STREAM: transform_tool_message for '{}' returning {} containers", - name_str, - messages.len() - ); - if !messages.is_empty() { - println!("MESSAGE_STREAM: First container: {:?}", messages[0]); - } - - Ok(messages - .into_iter() - .map(|message| BusterReasoningMessageContainer { - reasoning: match message { - BusterChatContainer::Thought(thought) => ReasoningMessage::Thought(thought), - BusterChatContainer::File(file) => ReasoningMessage::File(file), - _ => unreachable!("Tool messages should only return Thought or File"), - }, - chat_id, - message_id, - }) - .collect()) + // Transform to reasoning containers + let reasoning_containers = transform_to_reasoning_container(containers, chat_id, message_id); + Ok(reasoning_containers) } -fn transform_assistant_tool_message( - id: Option, - tool_calls: Vec, - progress: Option, - initial: bool, - chat_id: Uuid, - message_id: Uuid, -) -> Result> { - println!( - "MESSAGE_STREAM: transform_assistant_tool_message called with tool_calls: {:?}", - tool_calls - ); - - if tool_calls.is_empty() { - println!("MESSAGE_STREAM: No tool calls found"); - return Ok(vec![]); - } - - let tool_call = &tool_calls[0]; - let messages = match tool_call.function.name.as_str() { - "search_data_catalog" => assistant_data_catalog_search(id, progress, initial), - "create_metrics" => assistant_create_metrics(id, tool_calls.clone(), progress), - "update_metrics" => assistant_modify_metrics(id, tool_calls.clone(), progress), - "create_dashboards" => assistant_create_dashboards(id, tool_calls.clone(), progress), - "update_dashboards" => assistant_modify_dashboards(id, tool_calls.clone(), progress), - "create_plan" => assistant_create_plan(id, tool_calls.clone(), progress), - _ => Err(anyhow::anyhow!( - "Unsupported tool name: {}", - tool_call.function.name - )), - }?; - - println!( - "MESSAGE_STREAM: transform_assistant_tool_message returning {} containers", - messages.len() - ); - Ok(messages - .into_iter() - .map(|message| BusterReasoningMessageContainer { - reasoning: match message { - BusterChatContainer::Thought(thought) => ReasoningMessage::Thought(thought), - BusterChatContainer::File(file) => ReasoningMessage::File(file), - _ => unreachable!("Assistant tool messages should only return Thought or File"), - }, - chat_id, - message_id, - }) - .collect()) -} - -fn assistant_data_catalog_search( - id: Option, - progress: Option, - initial: bool, -) -> Result> { - if let Some(progress) = progress { - if initial { - match progress { - MessageProgress::InProgress => { - let id = id.unwrap_or_else(|| Uuid::new_v4().to_string()); - - Ok(vec![BusterChatContainer::Thought(BusterThought { - id, - thought_type: "thought".to_string(), - thought_title: "Searching your data catalog...".to_string(), - thought_secondary_title: "".to_string(), - thoughts: None, - status: "loading".to_string(), - })]) - } - _ => Err(anyhow::anyhow!( - "Assistant data catalog search only supports in progress." - )), - } - } else { - Err(anyhow::anyhow!( - "Assistant data catalog search only supports initial." - )) - } - } else { - Err(anyhow::anyhow!( - "Assistant data catalog search requires progress." - )) - } -} - -fn tool_data_catalog_search( - id: Option, +// Update tool_create_metrics to require ID +fn tool_create_metrics( + id: String, content: String, - progress: Option, ) -> Result> { - if let Some(progress) = progress { - let data_catalog_result = match serde_json::from_str::(&content) { - Ok(result) => result, - Err(_) => return Ok(vec![]), // Silently ignore parsing errors - }; - - let duration = (data_catalog_result.duration.clone() as f64 / 1000.0 * 10.0).round() / 10.0; - let result_count = data_catalog_result.results.len(); - let query_params = data_catalog_result.search_requirements.clone(); - - let thought_pill_containters = - match proccess_data_catalog_search_results(data_catalog_result) { - Ok(object) => object, - Err(_) => return Ok(vec![]), // Silently ignore processing errors - }; - - let buster_thought = if result_count > 0 { - BusterChatContainer::Thought(BusterThought { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), - thought_type: "thought".to_string(), - thought_title: format!("Found {} results", result_count), - thought_secondary_title: format!("{} seconds", duration), - thoughts: Some(thought_pill_containters), - status: "completed".to_string(), - }) - } else { - BusterChatContainer::Thought(BusterThought { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), - thought_type: "thought".to_string(), - thought_title: "No data catalog items found".to_string(), - thought_secondary_title: format!("{} seconds", duration), - thoughts: Some(vec![BusterThoughtPillContainer { - title: "No results found".to_string(), - thought_pills: vec![BusterThoughtPill { - id: "".to_string(), - text: query_params, - thought_file_type: "empty".to_string(), - }], - }]), - status: "completed".to_string(), - }) - }; - - match progress { - MessageProgress::Complete => Ok(vec![buster_thought]), - _ => Err(anyhow::anyhow!( - "Tool data catalog search only supports complete." - )), + println!("MESSAGE_STREAM: Processing tool create metrics message"); + + let mut parser = StreamingParser::new(); + + match parser.process_chunk(id, &content)? { + Some(message) => { + println!( + "MESSAGE_STREAM: StreamingParser produced create metrics message: {:?}", + message + ); + Ok(vec![message]) + } + None => { + println!("MESSAGE_STREAM: No valid metrics data found in content"); + Err(anyhow::anyhow!("Failed to parse metrics data from content")) } - } else { - Err(anyhow::anyhow!( - "Tool data catalog search requires progress." - )) } } +// Update tool_modify_metrics to require ID +fn tool_modify_metrics( + id: String, + content: String, +) -> Result> { + println!("MESSAGE_STREAM: Processing tool modify metrics message"); + + let mut parser = StreamingParser::new(); + + match parser.process_chunk(id, &content)? { + Some(message) => { + println!( + "MESSAGE_STREAM: StreamingParser produced modify metrics message: {:?}", + message + ); + Ok(vec![message]) + } + None => { + println!("MESSAGE_STREAM: No valid metrics data found in content"); + Err(anyhow::anyhow!("Failed to parse metrics data from content")) + } + } +} + +// Update tool_create_dashboards to require ID +fn tool_create_dashboards( + id: String, + content: String, +) -> Result> { + println!("MESSAGE_STREAM: Processing tool create dashboards message"); + + let mut parser = StreamingParser::new(); + + match parser.process_chunk(id, &content)? { + Some(message) => { + println!( + "MESSAGE_STREAM: StreamingParser produced create dashboards message: {:?}", + message + ); + Ok(vec![message]) + } + None => { + println!("MESSAGE_STREAM: No valid dashboard data found in content"); + Err(anyhow::anyhow!("Failed to parse dashboard data from content")) + } + } +} + +// Update tool_modify_dashboards to require ID +fn tool_modify_dashboards( + id: String, + content: String, +) -> Result> { + println!("MESSAGE_STREAM: Processing tool modify dashboards message"); + + let mut parser = StreamingParser::new(); + + match parser.process_chunk(id, &content)? { + Some(message) => { + println!( + "MESSAGE_STREAM: StreamingParser produced modify dashboard message: {:?}", + message + ); + Ok(vec![message]) + } + None => { + println!("MESSAGE_STREAM: No valid dashboard data found in content"); + Err(anyhow::anyhow!("Failed to parse dashboard data from content")) + } + } +} + +// Update tool_create_plan to require ID +fn tool_create_plan( + id: String, + content: String, +) -> Result> { + println!("MESSAGE_STREAM: Processing tool create plan message"); + + let mut parser = StreamingParser::new(); + + match parser.process_chunk(id, &content)? { + Some(message) => { + println!( + "MESSAGE_STREAM: StreamingParser produced create plan message: {:?}", + message + ); + Ok(vec![message]) + } + None => { + println!("MESSAGE_STREAM: No valid plan data found in content"); + Err(anyhow::anyhow!("Failed to parse plan data from content")) + } + } +} + +// Fix tool_data_catalog_search to use required ID directly +fn tool_data_catalog_search( + id: String, + content: String, +) -> Result> { + let data_catalog_result = match serde_json::from_str::(&content) { + Ok(result) => result, + Err(_) => return Ok(vec![]), // Silently ignore parsing errors + }; + + let duration = (data_catalog_result.duration.clone() as f64 / 1000.0 * 10.0).round() / 10.0; + let result_count = data_catalog_result.results.len(); + let query_params = data_catalog_result.search_requirements.clone(); + + let thought_pill_containters = + match proccess_data_catalog_search_results(data_catalog_result) { + Ok(object) => object, + Err(_) => return Ok(vec![]), // Silently ignore processing errors + }; + + let buster_thought = if result_count > 0 { + BusterChatContainer::Thought(BusterThought { + id: id, // Use required ID directly + thought_type: "thought".to_string(), + thought_title: format!("Found {} results", result_count), + thought_secondary_title: format!("{} seconds", duration), + thoughts: Some(thought_pill_containters), + status: "completed".to_string(), + }) + } else { + BusterChatContainer::Thought(BusterThought { + id: id, // Use required ID directly + thought_type: "thought".to_string(), + thought_title: "No data catalog items found".to_string(), + thought_secondary_title: format!("{} seconds", duration), + thoughts: Some(vec![BusterThoughtPillContainer { + title: "No results found".to_string(), + thought_pills: vec![BusterThoughtPill { + id: "".to_string(), + text: query_params, + thought_file_type: "empty".to_string(), + }], + }]), + status: "completed".to_string(), + }) + }; + + Ok(vec![buster_thought]) +} + fn proccess_data_catalog_search_results( results: SearchDataCatalogOutput, ) -> Result> { @@ -947,10 +889,171 @@ fn proccess_data_catalog_search_results( Ok(buster_thought_pill_containers) } +// Implement tool_create_file with required ID +fn tool_create_file( + id: String, + content: String, +) -> Result> { + println!("MESSAGE_STREAM: Processing tool create file message"); + + let mut parser = StreamingParser::new(); + + match parser.process_chunk(id, &content)? { + Some(message) => { + println!( + "MESSAGE_STREAM: StreamingParser produced create file message: {:?}", + message + ); + Ok(vec![message]) + } + None => { + println!("MESSAGE_STREAM: No valid file data found in content"); + Err(anyhow::anyhow!("Failed to parse file data from content")) + } + } +} + +// Implement tool_modify_file with required ID +fn tool_modify_file( + id: String, + content: String, +) -> Result> { + println!("MESSAGE_STREAM: Processing tool modify file message"); + + let mut parser = StreamingParser::new(); + + match parser.process_chunk(id, &content)? { + Some(message) => { + println!( + "MESSAGE_STREAM: StreamingParser produced modify file message: {:?}", + message + ); + Ok(vec![message]) + } + None => { + println!("MESSAGE_STREAM: No valid file data found in content"); + Err(anyhow::anyhow!("Failed to parse file data from content")) + } + } +} + +// Helper function to transform BusterChatContainer to BusterReasoningMessageContainer +fn transform_to_reasoning_container( + containers: Vec, + chat_id: Uuid, + message_id: Uuid, +) -> Vec { + containers + .into_iter() + .map(|container| match container { + BusterChatContainer::Thought(thought) => BusterReasoningMessageContainer { + reasoning: ReasoningMessage::Thought(thought), + chat_id, + message_id, + }, + BusterChatContainer::File(file) => BusterReasoningMessageContainer { + reasoning: ReasoningMessage::File(file), + chat_id, + message_id, + }, + _ => unreachable!("Tool messages should only return Thought or File"), + }) + .collect() +} + +fn transform_assistant_tool_message( + id: Option, + tool_calls: Vec, + progress: Option, + initial: bool, + chat_id: Uuid, + message_id: Uuid, +) -> Result> { + println!( + "MESSAGE_STREAM: transform_assistant_tool_message called with tool_calls: {:?}", + tool_calls + ); + + if tool_calls.is_empty() { + println!("MESSAGE_STREAM: No tool calls found"); + return Ok(vec![]); + } + + let tool_call = &tool_calls[0]; + let messages = match tool_call.function.name.as_str() { + "search_data_catalog" => assistant_data_catalog_search(id, progress, initial), + "create_metrics" => assistant_create_metrics(id, tool_calls.clone(), progress, initial), + "update_metrics" => assistant_modify_metrics(id, tool_calls.clone(), progress, initial), + "create_dashboards" => { + assistant_create_dashboards(id, tool_calls.clone(), progress, initial) + } + "update_dashboards" => { + assistant_modify_dashboards(id, tool_calls.clone(), progress, initial) + } + "create_plan" => assistant_create_plan(id, tool_calls.clone(), progress, initial), + _ => Err(anyhow::anyhow!( + "Unsupported tool name: {}", + tool_call.function.name + )), + }?; + + println!( + "MESSAGE_STREAM: transform_assistant_tool_message returning {} containers", + messages.len() + ); + Ok(messages + .into_iter() + .map(|message| BusterReasoningMessageContainer { + reasoning: match message { + BusterChatContainer::Thought(thought) => ReasoningMessage::Thought(thought), + BusterChatContainer::File(file) => ReasoningMessage::File(file), + _ => unreachable!("Assistant tool messages should only return Thought or File"), + }, + chat_id, + message_id, + }) + .collect()) +} + +fn assistant_data_catalog_search( + id: Option, + progress: Option, + initial: bool, +) -> Result> { + if let Some(progress) = progress { + match progress { + MessageProgress::InProgress => { + if initial { + let id = id.unwrap_or_else(|| Uuid::new_v4().to_string()); + + Ok(vec![BusterChatContainer::Thought(BusterThought { + id, + thought_type: "thought".to_string(), + thought_title: "Searching your data catalog...".to_string(), + thought_secondary_title: "".to_string(), + thoughts: None, + status: "loading".to_string(), + })]) + } else { + Ok(vec![]) + } + } + _ => Err(anyhow::anyhow!( + "Assistant data catalog search only supports in progress." + )), + } + } else { + Err(anyhow::anyhow!( + "Assistant data catalog search requires progress." + )) + } +} + fn assistant_create_metrics( id: Option, tool_calls: Vec, progress: Option, + initial: bool, ) -> Result> { if let Some(progress) = progress { match progress { @@ -993,7 +1096,7 @@ fn process_assistant_create_metrics(tool_call: &ToolCall) -> Result { println!( "MESSAGE_STREAM: StreamingParser produced metric message: {:?}", @@ -1014,6 +1117,7 @@ fn assistant_modify_metrics( id: Option, tool_calls: Vec, progress: Option, + initial: bool, ) -> Result> { if let Some(progress) = progress { match progress { @@ -1056,7 +1160,7 @@ fn process_assistant_modify_metrics(tool_call: &ToolCall) -> Result { println!( "MESSAGE_STREAM: StreamingParser produced modify metric message: {:?}", @@ -1075,6 +1179,7 @@ fn assistant_create_dashboards( id: Option, tool_calls: Vec, progress: Option, + initial: bool, ) -> Result> { if let Some(progress) = progress { match progress { @@ -1117,7 +1222,7 @@ fn process_assistant_create_dashboards(tool_call: &ToolCall) -> Result { println!( "MESSAGE_STREAM: StreamingParser produced dashboard message: {:?}", @@ -1136,6 +1241,7 @@ fn assistant_modify_dashboards( id: Option, tool_calls: Vec, progress: Option, + initial: bool, ) -> Result> { if let Some(progress) = progress { match progress { @@ -1178,7 +1284,7 @@ fn process_assistant_modify_dashboards(tool_call: &ToolCall) -> Result { println!( "MESSAGE_STREAM: StreamingParser produced modify dashboard message: {:?}", @@ -1193,279 +1299,11 @@ fn process_assistant_modify_dashboards(tool_call: &ToolCall) -> Result, - content: String, - progress: Option, -) -> Result> { - println!( - "MESSAGE_STREAM: tool_create_file called with content: {}", - content - ); - - if let Some(progress) = progress { - match progress { - MessageProgress::InProgress => { - // Return a loading thought for in-progress file creation - Ok(vec![BusterChatContainer::Thought(BusterThought { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), - thought_type: "thought".to_string(), - thought_title: "Creating file...".to_string(), - thought_secondary_title: "".to_string(), - thoughts: None, - status: "loading".to_string(), - })]) - } - MessageProgress::Complete => { - // Process the completed file creation - let mut parser = StreamingParser::new(); - match parser.process_chunk(&content)? { - Some(message) => { - println!( - "MESSAGE_STREAM: StreamingParser produced file message from tool: {:?}", - message - ); - Ok(vec![message]) - } - None => { - println!( - "MESSAGE_STREAM: StreamingParser returned None for create file tool" - ); - Ok(vec![]) // Return empty vec instead of error - } - } - } - } - } else { - Err(anyhow::anyhow!("Tool create file requires progress.")) - } -} - -fn tool_modify_file( - id: Option, - content: String, - progress: Option, -) -> Result> { - println!( - "MESSAGE_STREAM: tool_modify_file called with content: {}", - content - ); - - if let Some(progress) = progress { - match progress { - MessageProgress::InProgress => { - // Return a loading thought for in-progress file modification - Ok(vec![BusterChatContainer::Thought(BusterThought { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), - thought_type: "thought".to_string(), - thought_title: "Modifying file...".to_string(), - thought_secondary_title: "".to_string(), - thoughts: None, - status: "loading".to_string(), - })]) - } - MessageProgress::Complete => { - // Process the completed file modification - let mut parser = StreamingParser::new(); - match parser.process_chunk(&content)? { - Some(message) => { - println!("MESSAGE_STREAM: StreamingParser produced modify file message from tool: {:?}", message); - Ok(vec![message]) - } - None => { - println!( - "MESSAGE_STREAM: StreamingParser returned None for modify file tool" - ); - Ok(vec![]) // Return empty vec instead of error - } - } - } - } - } else { - Err(anyhow::anyhow!("Tool modify file requires progress.")) - } -} - -fn tool_create_metrics( - id: Option, - content: String, - progress: Option, -) -> Result> { - println!( - "MESSAGE_STREAM: tool_create_metrics called with content: {}", - content - ); - - if let Some(progress) = progress { - match progress { - MessageProgress::InProgress => { - // Return a loading thought for in-progress metrics - Ok(vec![BusterChatContainer::Thought(BusterThought { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), - thought_type: "thought".to_string(), - thought_title: "Creating metrics...".to_string(), - thought_secondary_title: "".to_string(), - thoughts: None, - status: "loading".to_string(), - })]) - } - MessageProgress::Complete => { - // Process the completed metrics - let mut parser = StreamingParser::new(); - match parser.process_chunk(&content)? { - Some(message) => { - println!("MESSAGE_STREAM: StreamingParser produced metric message from tool: {:?}", message); - Ok(vec![message]) - } - None => { - println!("MESSAGE_STREAM: StreamingParser returned None for metrics tool"); - Ok(vec![]) // Return empty vec instead of error - } - } - } - } - } else { - Err(anyhow::anyhow!("Tool create metrics requires progress.")) - } -} - -fn tool_modify_metrics( - id: Option, - content: String, - progress: Option, -) -> Result> { - println!( - "MESSAGE_STREAM: tool_modify_metrics called with content: {}", - content - ); - - if let Some(progress) = progress { - match progress { - MessageProgress::InProgress => { - // Return a loading thought for in-progress metric modifications - Ok(vec![BusterChatContainer::Thought(BusterThought { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), - thought_type: "thought".to_string(), - thought_title: "Updating metrics...".to_string(), - thought_secondary_title: "".to_string(), - thoughts: None, - status: "loading".to_string(), - })]) - } - MessageProgress::Complete => { - // Process the completed metric modifications - let mut parser = StreamingParser::new(); - match parser.process_chunk(&content)? { - Some(message) => { - println!("MESSAGE_STREAM: StreamingParser produced modify metric message from tool: {:?}", message); - Ok(vec![message]) - } - None => { - println!( - "MESSAGE_STREAM: StreamingParser returned None for modify metrics tool" - ); - Ok(vec![]) // Return empty vec instead of error - } - } - } - } - } else { - Err(anyhow::anyhow!("Tool modify metrics requires progress.")) - } -} - -fn tool_create_dashboards( - id: Option, - content: String, - progress: Option, -) -> Result> { - println!( - "MESSAGE_STREAM: tool_create_dashboards called with content: {}", - content - ); - - if let Some(progress) = progress { - match progress { - MessageProgress::InProgress => { - // Return a loading thought for in-progress dashboards - Ok(vec![BusterChatContainer::Thought(BusterThought { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), - thought_type: "thought".to_string(), - thought_title: "Creating dashboards...".to_string(), - thought_secondary_title: "".to_string(), - thoughts: None, - status: "loading".to_string(), - })]) - } - MessageProgress::Complete => { - // Process the completed dashboards - let mut parser = StreamingParser::new(); - match parser.process_chunk(&content)? { - Some(message) => { - println!("MESSAGE_STREAM: StreamingParser produced dashboard message from tool: {:?}", message); - Ok(vec![message]) - } - None => { - println!( - "MESSAGE_STREAM: StreamingParser returned None for dashboards tool" - ); - Ok(vec![]) // Return empty vec instead of error - } - } - } - } - } else { - Err(anyhow::anyhow!("Tool create dashboards requires progress.")) - } -} - -fn tool_modify_dashboards( - id: Option, - content: String, - progress: Option, -) -> Result> { - println!( - "MESSAGE_STREAM: tool_modify_dashboards called with content: {}", - content - ); - - if let Some(progress) = progress { - match progress { - MessageProgress::InProgress => { - // Return a loading thought for in-progress dashboard modifications - Ok(vec![BusterChatContainer::Thought(BusterThought { - id: id.unwrap_or_else(|| Uuid::new_v4().to_string()), - thought_type: "thought".to_string(), - thought_title: "Updating dashboards...".to_string(), - thought_secondary_title: "".to_string(), - thoughts: None, - status: "loading".to_string(), - })]) - } - MessageProgress::Complete => { - // Process the completed dashboard modifications - let mut parser = StreamingParser::new(); - match parser.process_chunk(&content)? { - Some(message) => { - println!("MESSAGE_STREAM: StreamingParser produced modify dashboard message from tool: {:?}", message); - Ok(vec![message]) - } - None => { - println!("MESSAGE_STREAM: StreamingParser returned None for modify dashboards tool"); - Ok(vec![]) // Return empty vec instead of error - } - } - } - } - } else { - Err(anyhow::anyhow!("Tool modify dashboards requires progress.")) - } -} - fn assistant_create_plan( id: Option, tool_calls: Vec, progress: Option, + initial: bool, ) -> Result> { if let Some(progress) = progress { match progress { @@ -1498,16 +1336,3 @@ fn assistant_create_plan( )) } } - -fn tool_create_plan( - id: Option, - content: String, - progress: Option, -) -> Result> { - println!( - "MESSAGE_STREAM: tool_create_plan called with content: {}", - content - ); - - Ok(vec![]) -} diff --git a/api/libs/handlers/src/chats/streaming_parser.rs b/api/libs/handlers/src/chats/streaming_parser.rs index b2b6767f2..6f27ddbfc 100644 --- a/api/libs/handlers/src/chats/streaming_parser.rs +++ b/api/libs/handlers/src/chats/streaming_parser.rs @@ -20,7 +20,7 @@ impl StreamingParser { } } - pub fn process_chunk(&mut self, chunk: &str) -> Result> { + pub fn process_chunk(&mut self, id: String, chunk: &str) -> Result> { // Add new chunk to buffer self.buffer.push_str(chunk); @@ -87,7 +87,7 @@ impl StreamingParser { let has_yml_content = last_file.get("yml_content").is_some(); if has_name && has_file_type && has_yml_content { - return self.convert_to_message(value); + return self.convert_to_message(id, value); } } } @@ -148,7 +148,7 @@ impl StreamingParser { processed } - fn convert_to_message(&self, value: Value) -> Result> { + fn convert_to_message(&self, id: String, value: Value) -> Result> { if let Some(files) = value.get("files").and_then(Value::as_array) { if let Some(last_file) = files.last().and_then(Value::as_object) { let name = last_file.get("name").and_then(Value::as_str).unwrap_or(""); @@ -171,7 +171,7 @@ impl StreamingParser { } return Ok(Some(BusterChatContainer::File(BusterFileMessage { - id: name.to_string(), + id, message_type: "file".to_string(), file_type: file_type.to_string(), file_name: name.to_string(), diff --git a/api/src/routes/ws/threads_and_messages/post_thread.rs b/api/src/routes/ws/threads_and_messages/post_thread.rs index d2d0e7b55..8646ef7a3 100644 --- a/api/src/routes/ws/threads_and_messages/post_thread.rs +++ b/api/src/routes/ws/threads_and_messages/post_thread.rs @@ -1,15 +1,15 @@ use std::sync::Arc; use anyhow::Result; -use handlers::chats::post_chat_handler; use handlers::chats::post_chat_handler::ChatCreateNewChat; +use handlers::chats::post_chat_handler::{self, ThreadEvent}; use handlers::chats::types::ChatWithMessages; use tokio::sync::mpsc; use crate::{ database::models::User, routes::ws::{ - threads_and_messages::threads_router::{ThreadEvent, ThreadRoute}, + threads_and_messages::threads_router::{ThreadEvent as WSThreadEvent, ThreadRoute}, ws::{SubscriptionRwLock, WsEvent, WsResponseMessage, WsSendMethod}, ws_router::WsRoutes, ws_utils::send_ws_message, @@ -25,32 +25,47 @@ pub async fn post_thread( ) -> Result<()> { let (tx, mut rx) = mpsc::channel(1000); - // Call the shared handler - post_chat_handler::post_chat_handler(request, user.clone(), Some(tx)).await?; + let user_id = user.id.to_string(); - while let Some(result) = rx.recv().await { - match result { - Ok(chat_with_messages) => { - println!("MESSAGE SHOULD BE SENT: {:?}", chat_with_messages); + tokio::spawn(async move { + while let Some(result) = rx.recv().await { + match result { + Ok((message, event)) => { + println!("MESSAGE SHOULD BE SENT: {:?}", message); - let response = WsResponseMessage::new_no_user( - WsRoutes::Threads(ThreadRoute::Post), - WsEvent::Threads(ThreadEvent::InitializeChat), - &chat_with_messages, - None, - WsSendMethod::All, - ); + let event = match event { + ThreadEvent::GeneratingResponseMessage => { + WsEvent::Threads(WSThreadEvent::GeneratingResponseMessage) + } + ThreadEvent::GeneratingReasoningMessage => { + WsEvent::Threads(WSThreadEvent::GeneratingReasoningMessage) + } + }; - if let Err(e) = send_ws_message(&user.id.to_string(), &response).await { - tracing::error!("Failed to send websocket message: {}", e); + let response = WsResponseMessage::new_no_user( + WsRoutes::Threads(ThreadRoute::Post), + event, + &message, + None, + WsSendMethod::All, + ); + + if let Err(e) = send_ws_message(&user_id, &response).await { + tracing::error!("Failed to send websocket message: {}", e); + } + } + Err(err) => { + tracing::error!("Error in message stream: {:?}", err); + return Err(err); } } - Err(err) => { - tracing::error!("Error in message stream: {:?}", err); - return Err(err); - } } - } + + Ok(()) + }); + + // Call the shared handler + post_chat_handler::post_chat_handler(request, user.clone(), Some(tx)).await?; Ok(()) } @@ -59,7 +74,7 @@ pub async fn post_thread( async fn send_ws_response(subscription: &str, chat_with_messages: &ChatWithMessages) -> Result<()> { let response = WsResponseMessage::new_no_user( WsRoutes::Threads(ThreadRoute::Post), - WsEvent::Threads(ThreadEvent::InitializeChat), + WsEvent::Threads(WSThreadEvent::InitializeChat), chat_with_messages, None, WsSendMethod::All,