diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index 84b20afd1..ec27270fb 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -84,7 +84,7 @@ impl ChunkTracker { complete_text: String::new(), last_seen_content: String::new(), }); - + // Calculate the delta by finding what's new since last_seen_content let delta = if state.last_seen_content.is_empty() { // First chunk, use it as is @@ -102,13 +102,13 @@ impl ChunkTracker { } } }; - + // Update tracking state only if we found new content if !delta.is_empty() { state.complete_text.push_str(&delta); state.last_seen_content = new_chunk; } - + delta } else { new_chunk @@ -116,10 +116,11 @@ impl ChunkTracker { } pub fn get_complete_text(&self, chunk_id: String) -> Option { - self.chunks - .lock() - .ok() - .and_then(|chunks| chunks.get(&chunk_id).map(|state| state.complete_text.clone())) + self.chunks.lock().ok().and_then(|chunks| { + chunks + .get(&chunk_id) + .map(|state| state.complete_text.clone()) + }) } pub fn clear_chunk(&self, chunk_id: String) { @@ -248,16 +249,16 @@ pub async fn post_chat_handler( // Always transform the message match transform_message(&chat_id, &message_id, msg, tx.as_ref()).await { - Ok((containers, event)) => { + Ok(containers) => { // Store all transformed containers - for container in containers.clone() { + for (container, _) in containers.clone() { all_transformed_containers.push(container.clone()); } // If we have a tx channel, send the transformed messages if let Some(tx) = &tx { - for container in containers { - if tx.send(Ok((container, event.clone()))).await.is_err() { + for (container, thread_event) in containers { + if tx.send(Ok((container, thread_event))).await.is_err() { // Client disconnected, but continue processing messages tracing::warn!( "Client disconnected, but continuing to process messages" @@ -313,7 +314,7 @@ pub async fn post_chat_handler( reasoning_messages.clone(), Some(format!("Reasoned for {} seconds", reasoning_duration).to_string()), ); - + chat_with_messages.update_message(message); // Create and store message in the database with final state @@ -339,8 +340,14 @@ pub async fn post_chat_handler( .await?; // First process completed files (database updates only) - let _ = - process_completed_files(&mut conn, &db_message, &all_messages, &user_org_id, &user.id).await?; + let _ = process_completed_files( + &mut conn, + &db_message, + &all_messages, + &user_org_id, + &user.id, + ) + .await?; // Then send text response messages if let Some(tx) = &tx { @@ -454,7 +461,7 @@ async fn process_completed_files( ) -> Result<()> { let mut transformed_messages = Vec::new(); for msg in messages { - if let Ok((containers, _)) = + if let Ok(containers) = transform_message(&message.chat_id, &message.id, msg.clone(), None).await { transformed_messages.extend(containers); @@ -462,7 +469,7 @@ async fn process_completed_files( } // Process files for database updates only - for container in transformed_messages { + for (container, _) in transformed_messages { match container { BusterContainer::ReasoningMessage(msg) => match &msg.reasoning { BusterReasoningMessage::File(file) if file.message_type == "files" => { @@ -650,7 +657,7 @@ pub async fn transform_message( message_id: &Uuid, message: AgentMessage, tx: Option<&mpsc::Sender>>, -) -> Result<(Vec, ThreadEvent)> { +) -> Result> { println!("MESSAGE_STREAM: Transforming message: {:?}", message); match message { @@ -688,7 +695,7 @@ pub async fn transform_message( vec![] } }; - containers.extend(chat_messages); + containers.extend(chat_messages.into_iter().map(|container| (container, ThreadEvent::GeneratingResponseMessage))); // Add the "Finished reasoning" message if we're just starting if initial { @@ -709,24 +716,14 @@ pub async fn transform_message( message_id: *message_id, }); - // Send the finished reasoning message separately - if let Some(tx) = tx { - if let Err(e) = tx - .send(Ok(( - reasoning_container, - ThreadEvent::GeneratingReasoningMessage, - ))) - .await - { - tracing::warn!("Failed to send finished reasoning message: {:?}", e); - } - } + containers.push(( + reasoning_container, + ThreadEvent::GeneratingResponseMessage, + )); } - return Ok((containers, ThreadEvent::GeneratingResponseMessage)); - } - - if let Some(tool_calls) = tool_calls { + Ok(containers) + } else if let Some(tool_calls) = tool_calls { let mut containers = Vec::new(); // Transform tool messages @@ -740,50 +737,54 @@ pub async fn transform_message( Ok(messages) => { for reasoning_container in messages { // If this is a completed file reasoning message, send the file response separately - if let BusterReasoningMessage::File(ref file) = - reasoning_container.reasoning - { - if file.status == "completed" && file.message_type == "files" { - // For each completed file, create and send a file response message - for (file_id, file_content) in &file.files { - let response_message = BusterChatMessage::File { - id: file_content.id.clone(), - file_type: file_content.file_type.clone(), - file_name: file_content.file_name.clone(), - version_number: file_content.version_number, - version_id: file_content.version_id.clone(), - filter_version_id: None, - metadata: Some(vec![BusterChatResponseFileMetadata { - status: "completed".to_string(), - message: format!( - "File {} completed", - file_content.file_name - ), - timestamp: Some(Utc::now().timestamp()), - }]), - }; + match &reasoning_container { + BusterReasoningMessage::File(file) => { + if file.status == "completed" && file.message_type == "files" { + // For each completed file, create and send a file response message + for (file_id, file_content) in &file.files { + let response_message = BusterChatMessage::File { + id: file_content.id.clone(), + file_type: file_content.file_type.clone(), + file_name: file_content.file_name.clone(), + version_number: file_content.version_number, + version_id: file_content.version_id.clone(), + filter_version_id: None, + metadata: Some(vec![BusterChatResponseFileMetadata { + status: "completed".to_string(), + message: format!( + "File {} completed", + file_content.file_name + ), + timestamp: Some(Utc::now().timestamp()), + }]), + }; - let file_container = BusterContainer::ChatMessage( - BusterChatMessageContainer { - response_message, - chat_id: *chat_id, - message_id: *message_id, - }, - ); + let file_container = BusterContainer::ChatMessage( + BusterChatMessageContainer { + response_message, + chat_id: *chat_id, + message_id: *message_id, + }, + ); + + containers.push(( + file_container.clone(), + ThreadEvent::GeneratingResponseMessage, + )); - // Send file response message separately with GeneratingResponseMessage event - if let Some(tx) = tx { - let _ = tx - .send(Ok(( - file_container.clone(), - ThreadEvent::GeneratingResponseMessage, - ))) - .await; } } } + _ => {} } - containers.push(BusterContainer::ReasoningMessage(reasoning_container)); + containers.push(( + BusterContainer::ReasoningMessage(BusterReasoningMessageContainer { + reasoning: reasoning_container, + chat_id: *chat_id, + message_id: *message_id, + }), + ThreadEvent::GeneratingReasoningMessage, + )); } } Err(e) => { @@ -795,10 +796,10 @@ pub async fn transform_message( } }; - return Ok((containers, ThreadEvent::GeneratingReasoningMessage)); + Ok(containers) + } else { + Ok(vec![]) } - - Ok((vec![], ThreadEvent::GeneratingResponseMessage)) } AgentMessage::Tool { id, @@ -808,9 +809,8 @@ pub async fn transform_message( progress, } => { if let Some(name) = name { - let name_str = name.clone(); // Clone here to use in println later + let name_str = name.clone(); - // Use tool_call_id directly as it's already a String let messages = match transform_tool_message( tool_call_id, name, @@ -820,21 +820,28 @@ pub async fn transform_message( ) { Ok(messages) => messages .into_iter() - .map(BusterContainer::ReasoningMessage) + .map(|container| ( + BusterContainer::ReasoningMessage(BusterReasoningMessageContainer { + reasoning: container, + chat_id: *chat_id, + message_id: *message_id, + }), + ThreadEvent::GeneratingReasoningMessage, + )) .collect(), Err(e) => { tracing::warn!("Error transforming tool message '{}': {:?}", name_str, e); println!("MESSAGE_STREAM: Error transforming tool message: {:?}", e); - vec![] // Return empty vec but warn about the error + vec![] } }; - return Ok((messages, ThreadEvent::GeneratingReasoningMessage)); + Ok(messages) + } else { + Ok(vec![]) } - - Ok((vec![], ThreadEvent::GeneratingResponseMessage)) // Return empty vec instead of error } - _ => Ok((vec![], ThreadEvent::GeneratingResponseMessage)), // Return empty vec instead of error + _ => Ok(vec![]), } } @@ -858,7 +865,9 @@ fn transform_text_message( }]) } MessageProgress::Complete => { - let complete_text = tracker.get_complete_text(id.clone()).unwrap_or(content.clone()); + let complete_text = tracker + .get_complete_text(id.clone()) + .unwrap_or(content.clone()); tracker.clear_chunk(id.clone()); Ok(vec![BusterChatMessage::Text { id: id.clone(), @@ -877,7 +886,7 @@ fn transform_tool_message( content: String, chat_id: Uuid, message_id: Uuid, -) -> Result> { +) -> Result> { // Use required ID (tool call ID) for all function calls let messages = match name.as_str() { "search_data_catalog" => tool_data_catalog_search(id.clone(), content)?, @@ -889,34 +898,7 @@ fn transform_tool_message( _ => return Err(anyhow::anyhow!("Unknown tool name: {}", name)), }; - // Convert BusterReasoningMessage to BusterReasoningMessageContainer - let tracker = get_chunk_tracker(); - let reasoning_containers = messages - .into_iter() - .map(|reasoning| { - let updated_reasoning = if let BusterReasoningMessage::Text(mut text) = reasoning { - if let Some(chunk) = text.message_chunk.clone() { - let filtered_content = tracker.add_chunk(text.id.clone(), chunk.clone()); - println!("MESSAGE_STREAM: Filtered content: {:?}", filtered_content); - text.message_chunk = Some(filtered_content); - } - if text.status == Some("completed".to_string()) { - tracker.clear_chunk(text.id.clone()); - } - BusterReasoningMessage::Text(text) - } else { - reasoning - }; - - BusterReasoningMessageContainer { - reasoning: updated_reasoning, - chat_id, - message_id, - } - }) - .collect(); - - Ok(reasoning_containers) + Ok(messages) } // Update tool_create_metrics to require ID @@ -1095,7 +1077,7 @@ fn transform_assistant_tool_message( initial: bool, chat_id: Uuid, message_id: Uuid, -) -> Result> { +) -> Result> { let mut all_messages = Vec::new(); let tracker = get_chunk_tracker(); @@ -1142,7 +1124,7 @@ fn transform_assistant_tool_message( _ => vec![], }; - let containers: Vec = messages + let containers: Vec = messages .into_iter() .map(|reasoning| { let updated_reasoning = match reasoning { @@ -1151,13 +1133,13 @@ fn transform_assistant_tool_message( println!("CHUNK DEBUG [{}] Before filtering:", text.id); println!(" Incoming chunk length: {}", chunk.len()); println!(" Incoming chunk: {}", chunk); - + let delta = tracker.add_chunk(text.id.clone(), chunk); - + println!("CHUNK DEBUG [{}] After filtering:", text.id); println!(" Delta content length: {}", delta.len()); println!(" Delta content: {}", delta); - + if !delta.is_empty() { text.message_chunk = Some(delta); text.message = None; // Clear message field while streaming @@ -1166,52 +1148,54 @@ fn transform_assistant_tool_message( return None; } } - + if text.status == Some("completed".to_string()) { println!("CHUNK DEBUG [{}] Completing message", text.id); // For completed messages, either use accumulated text or the final message - text.message = tracker.get_complete_text(text.id.clone()) + text.message = tracker + .get_complete_text(text.id.clone()) .or(text.message) .or(text.message_chunk.clone()); text.message_chunk = None; tracker.clear_chunk(text.id.clone()); } - + Some(BusterReasoningMessage::Text(text)) } BusterReasoningMessage::File(mut file) => { let mut has_updates = false; let mut updated_files = std::collections::HashMap::new(); - + // Process each file's chunks for (file_id, file_content) in file.files.iter() { // Generate a consistent temporary ID for files during creation // This ensures the same file gets the same ID throughout the creation process - let temp_file_id = if file.message_type == "files" && file.status != "completed" { - // For files being created, use a hash of the file name as a temporary ID - use std::collections::hash_map::DefaultHasher; - use std::hash::{Hash, Hasher}; - let mut hasher = DefaultHasher::new(); - file_content.file_name.hash(&mut hasher); - format!("temp_{}", hasher.finish()) - } else { - file_id.clone() - }; - + let temp_file_id = + if file.message_type == "files" && file.status != "completed" { + // For files being created, use a hash of the file name as a temporary ID + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + let mut hasher = DefaultHasher::new(); + file_content.file_name.hash(&mut hasher); + format!("temp_{}", hasher.finish()) + } else { + file_id.clone() + }; + // Use consistent ID for chunk tracking let chunk_id = format!("{}_{}", file.id, file_content.file_name); - + if let Some(chunk) = &file_content.file.text_chunk { println!("FILE CHUNK DEBUG [{}] Before filtering:", chunk_id); println!(" Incoming chunk length: {}", chunk.len()); println!(" Incoming chunk: {}", chunk); - + let delta = tracker.add_chunk(chunk_id.clone(), chunk.clone()); - + println!("FILE CHUNK DEBUG [{}] After filtering:", chunk_id); println!(" Delta content length: {}", delta.len()); println!(" Delta content: {}", delta); - + if !delta.is_empty() { // Only include files that have new content let mut updated_content = file_content.clone(); @@ -1223,24 +1207,26 @@ fn transform_assistant_tool_message( } } } - + if file.status == "completed" { // When completed, send all files with their complete text for (file_id, file_content) in file.files.iter() { let chunk_id = format!("{}_{}", file.id, file_content.file_name); - let complete_text = tracker.get_complete_text(chunk_id.clone()) - .unwrap_or_else(|| file_content.file.text_chunk.clone().unwrap_or_default()); - + let complete_text = + tracker.get_complete_text(chunk_id.clone()).unwrap_or_else( + || file_content.file.text_chunk.clone().unwrap_or_default(), + ); + let mut completed_content = file_content.clone(); completed_content.file.text = Some(complete_text); completed_content.file.text_chunk = None; updated_files.insert(file_id.clone(), completed_content); - + tracker.clear_chunk(chunk_id); } has_updates = true; } - + if has_updates { let mut updated_file = file.clone(); updated_file.files = updated_files; @@ -1252,11 +1238,7 @@ fn transform_assistant_tool_message( other => Some(other), }; - updated_reasoning.map(|reasoning| BusterReasoningMessageContainer { - reasoning, - chat_id, - message_id, - }) + updated_reasoning.map(|reasoning| reasoning) }) .filter_map(|container| container) .collect();