From e7d90bae034dfb8bd0d099d731cf86337452a939 Mon Sep 17 00:00:00 2001 From: dal Date: Wed, 12 Mar 2025 12:36:10 -0600 Subject: [PATCH] ok hierarchy on files being sent back --- .../handlers/src/chats/post_chat_handler.rs | 397 +++++++++++------- .../rest/routes/assets/get_asset_access.rs | 15 +- 2 files changed, 255 insertions(+), 157 deletions(-) diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index 70dec9b1f..d6d1e4e19 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -21,6 +21,7 @@ use database::{ models::{Chat, DashboardFile, Message, MessageToFile, MetricFile, User}, pool::get_pg_pool, schema::{chats, dashboard_files, messages, messages_to_files, metric_files}, + types::{DashboardYml, MetricYml}, }; use diesel::{insert_into, ExpressionMethods}; use diesel_async::RunQueryDsl; @@ -138,6 +139,167 @@ impl ChunkTracker { } } +// Add this new struct before post_chat_handler +struct FileMessageTracker { + metrics: Vec, + dashboards: Vec, + reasoning_messages: Vec, +} + +impl FileMessageTracker { + fn new() -> Self { + Self { + metrics: Vec::new(), + dashboards: Vec::new(), + reasoning_messages: Vec::new(), + } + } + + fn add_message(&mut self, container: BusterChatMessageContainer) { + // We no longer need this since we'll create messages from reasoning messages + } + + fn add_reasoning_message(&mut self, container: BusterReasoningMessageContainer) { + self.reasoning_messages.push(container); + } + + fn get_filtered_messages(&self) -> Vec { + // If no files, return empty vec + if self.metrics.is_empty() && self.dashboards.is_empty() { + return Vec::new(); + } + + // Apply the filtering rules + if self.dashboards.is_empty() { + // No dashboards - return all metrics + if self.metrics.len() == 1 { + // Single metric case + return vec![self.metrics[0].clone()]; + } else { + // Multiple metrics case + return self.metrics.clone(); + } + } else if self.dashboards.len() == 1 && self.metrics.is_empty() { + // Single dashboard, no metrics + return vec![self.dashboards[0].clone()]; + } + + // Complex case: We have both dashboards and metrics or multiple dashboards + let mut filtered_messages = Vec::new(); + let mut metrics_in_dashboards: std::collections::HashSet = std::collections::HashSet::new(); + + // First add all dashboards + filtered_messages.extend(self.dashboards.clone()); + + // Collect metrics that are in dashboards + for dashboard in &self.dashboards { + if let BusterChatMessage::File { file_name, .. } = &dashboard.response_message { + // Find the corresponding reasoning message that contains the dashboard content + for reasoning in &self.reasoning_messages { + if let BusterReasoningMessage::File(file) = &reasoning.reasoning { + if file.message_type == "files" && file.status == "completed" { + for (_, file_content) in &file.files { + if file_content.file_type == "dashboard" && file_content.file_name == *file_name { + // Found the dashboard content, parse it to get metric IDs + if let Some(text) = &file_content.file.text { + if let Ok(dashboard) = serde_yaml::from_str::(text) { + // Collect all metric IDs from the dashboard + for row in dashboard.rows { + for item in row.items { + metrics_in_dashboards.insert(item.id.to_string()); + } + } + } + } + } + } + } + } + } + } + } + + // Add metrics that aren't in any dashboard + for metric in &self.metrics { + if let BusterChatMessage::File { id, .. } = &metric.response_message { + if !metrics_in_dashboards.contains(id) { + filtered_messages.push(metric.clone()); + } + } + } + + filtered_messages + } + + fn analyze_dashboard_contents(&mut self, containers: &[BusterContainer]) { + // Clear existing collections since we'll rebuild them from reasoning messages + self.metrics.clear(); + self.dashboards.clear(); + + let mut metrics_in_dashboards: std::collections::HashSet = std::collections::HashSet::new(); + + // First process all reasoning messages to create file messages and collect dashboard metric IDs + for container in containers { + if let BusterContainer::ReasoningMessage(reasoning) = container { + if let BusterReasoningMessage::File(file) = &reasoning.reasoning { + if file.message_type == "files" && file.status == "completed" { + // Create file messages for each file + for (file_id, file_content) in &file.files { + let response_message = BusterChatMessage::File { + id: file_content.id.clone(), + file_type: file_content.file_type.clone(), + file_name: file_content.file_name.clone(), + version_number: file_content.version_number, + version_id: file_content.version_id.clone(), + filter_version_id: None, + metadata: Some(vec![BusterChatResponseFileMetadata { + status: "completed".to_string(), + message: format!("File {} completed", file_content.file_name), + timestamp: Some(Utc::now().timestamp()), + }]), + }; + + let chat_message = BusterChatMessageContainer { + response_message, + chat_id: reasoning.chat_id, + message_id: reasoning.message_id, + }; + + // Add to appropriate collection based on file type + match file_content.file_type.as_str() { + "metric" => self.metrics.push(chat_message), + "dashboard" => { + self.dashboards.push(chat_message.clone()); + // If this is a dashboard, parse its content for metric IDs + if let Some(text) = &file_content.file.text { + if let Ok(dashboard) = serde_yaml::from_str::(text) { + for row in dashboard.rows { + for item in row.items { + metrics_in_dashboards.insert(item.id.to_string()); + } + } + } + } + } + _ => {} + } + } + } + } + } + } + + // Filter out metrics that are in dashboards by comparing their id + self.metrics.retain(|metric| { + if let BusterChatMessage::File { id, .. } = &metric.response_message { + !metrics_in_dashboards.contains(id) + } else { + false + } + }); + } +} + pub async fn post_chat_handler( request: ChatCreateNewChat, user: AuthenticatedUser, @@ -230,6 +392,9 @@ pub async fn post_chat_handler( let mut all_messages: Vec = Vec::new(); let mut all_transformed_containers: Vec = Vec::new(); + // Modify the message processing section: + let mut file_tracker = FileMessageTracker::new(); + // Process all messages from the agent while let Ok(message_result) = rx.recv().await { match message_result { @@ -270,39 +435,48 @@ pub async fn post_chat_handler( raw_llm_messages.push(msg.clone()); } } - // User messages and other types don't have progress, so we store them all AgentMessage::User { .. } => { raw_llm_messages.push(msg.clone()); } - _ => {} // Ignore other message types + _ => {} } - // Always transform the message + // Transform and handle messages match transform_message(&chat_id, &message_id, msg, tx.as_ref()).await { Ok(containers) => { - // Store all transformed containers - for (container, _) in containers.clone() { + for (container, thread_event) in containers { all_transformed_containers.push(container.clone()); - } - // If we have a tx channel, send the transformed messages - if let Some(tx) = &tx { - for (container, thread_event) in containers { - if tx.send(Ok((container, thread_event))).await.is_err() { - // Client disconnected, but continue processing messages - tracing::warn!( - "Client disconnected, but continuing to process messages" - ); - break; + // If we have a tx channel, handle message sending + if let Some(tx) = &tx { + match &container { + BusterContainer::ChatMessage(chat) => { + match &chat.response_message { + BusterChatMessage::File { .. } => { + // Collect file messages instead of sending immediately + file_tracker.add_message(chat.clone()); + } + BusterChatMessage::Text { message: Some(_), message_chunk: None, .. } => { + // Send text messages immediately + tx.send(Ok((container, thread_event))).await?; + } + _ => {} + } + } + BusterContainer::ReasoningMessage(reasoning) => { + // Store reasoning messages that contain file information + if let BusterReasoningMessage::File(_) = &reasoning.reasoning { + file_tracker.add_reasoning_message(reasoning.clone()); + } + tx.send(Ok((container, thread_event))).await? + } + _ => tx.send(Ok((container, thread_event))).await?, } } } } Err(e) => { - // Log the error but continue processing tracing::error!("Error transforming message: {}", e); - - // If we have a tx channel, send the error if let Some(tx) = &tx { let _ = tx.send(Err(e)).await; } @@ -310,53 +484,74 @@ pub async fn post_chat_handler( } } Err(e) => { - // If we have a tx channel, send the error if let Some(tx) = &tx { let _ = tx .send(Err(anyhow!("Error receiving message from agent: {}", e))) .await; } - tracing::error!("Error receiving message from agent: {}", e); - // Don't return early, continue processing remaining messages break; } } } + // After processing all messages, analyze dashboard contents and send filtered file messages + let mut final_response_messages = Vec::new(); + if let Some(tx) = &tx { + file_tracker.analyze_dashboard_contents(&all_transformed_containers); + let filtered_messages = file_tracker.get_filtered_messages(); + for file_message in &filtered_messages { + tx.send(Ok(( + BusterContainer::ChatMessage(file_message.clone()), + ThreadEvent::GeneratingResponseMessage, + ))) + .await?; + + // Store the filtered file messages + if let Ok(value) = serde_json::to_value(&file_message.response_message) { + final_response_messages.push(value); + } + } + } + + // Add the final text message if it exists + if let Some(final_text_message) = all_transformed_containers.iter().rev().find(|container| { + if let BusterContainer::ChatMessage(chat) = container { + matches!( + chat.response_message, + BusterChatMessage::Text { + message: Some(_), + message_chunk: None, + .. + } + ) + } else { + false + } + }) { + if let BusterContainer::ChatMessage(chat) = final_text_message { + if let Ok(value) = serde_json::to_value(&chat.response_message) { + final_response_messages.push(value); + } + } + } + let title = title_handle.await??; let reasoning_duration = reasoning_duration.elapsed().as_secs(); // Transform all messages for final storage - let (response_messages, reasoning_messages) = - prepare_final_message_state(&all_transformed_containers)?; - - // Update chat_with_messages with final state - let message = ChatMessage::new_with_messages( - message_id, - ChatUserMessage { - request: request.prompt.clone(), - sender_id: user.id.clone(), - sender_name: user.name.clone().unwrap_or_default(), - sender_avatar: None, - }, - response_messages.clone(), - reasoning_messages.clone(), - Some(format!("Reasoned for {} seconds", reasoning_duration).to_string()), - ); - - chat_with_messages.update_message(message); + let (response_messages, reasoning_messages) = prepare_final_message_state(&all_transformed_containers)?; // Create and store message in the database with final state let db_message = Message { id: message_id, - request_message: request.prompt, + request_message: request.prompt.clone(), chat_id, created_by: user.id.clone(), created_at: Utc::now(), updated_at: Utc::now(), deleted_at: None, - response_messages: serde_json::to_value(&response_messages)?, + response_messages: serde_json::to_value(&final_response_messages)?, reasoning: serde_json::to_value(&reasoning_messages)?, final_reasoning_message: format!("Reasoned for {} seconds", reasoning_duration), title: title.title.clone().unwrap_or_default(), @@ -369,6 +564,22 @@ pub async fn post_chat_handler( .execute(&mut conn) .await?; + // Update chat_with_messages with final state + let message = ChatMessage::new_with_messages( + message_id, + ChatUserMessage { + request: request.prompt.clone(), + sender_id: user.id.clone(), + sender_name: user.name.clone().unwrap_or_default(), + sender_avatar: None, + }, + final_response_messages.clone(), + reasoning_messages.clone(), + Some(format!("Reasoned for {} seconds", reasoning_duration).to_string()), + ); + + chat_with_messages.update_message(message); + // First process completed files (database updates only) let _ = process_completed_files( &mut conn, @@ -379,26 +590,6 @@ pub async fn post_chat_handler( ) .await?; - // Then send text response messages - if let Some(tx) = &tx { - for container in &all_transformed_containers { - if let BusterContainer::ChatMessage(chat) = container { - if let BusterChatMessage::Text { - message: Some(_), - message_chunk: None, - .. - } = &chat.response_message - { - tx.send(Ok(( - BusterContainer::ChatMessage(chat.clone()), - ThreadEvent::GeneratingResponseMessage, - ))) - .await?; - } - } - } - } - if let Some(title) = title.title { chat_with_messages.title = title; } @@ -788,47 +979,7 @@ pub async fn transform_message( ) { Ok(messages) => { for reasoning_container in messages { - // Only process file response messages when they're completed - match &reasoning_container { - BusterReasoningMessage::File(file) - if matches!(progress, MessageProgress::Complete) - && file.status == "completed" - && file.message_type == "files" => - { - // For each completed file, create and send a file response message - for (file_id, file_content) in &file.files { - let response_message = BusterChatMessage::File { - id: file_content.id.clone(), - file_type: file_content.file_type.clone(), - file_name: file_content.file_name.clone(), - version_number: file_content.version_number, - version_id: file_content.version_id.clone(), - filter_version_id: None, - metadata: Some(vec![BusterChatResponseFileMetadata { - status: "completed".to_string(), - message: format!( - "File {} completed", - file_content.file_name - ), - timestamp: Some(Utc::now().timestamp()), - }]), - }; - - containers.push(( - BusterContainer::ChatMessage( - BusterChatMessageContainer { - response_message, - chat_id: *chat_id, - message_id: *message_id, - }, - ), - ThreadEvent::GeneratingResponseMessage, - )); - } - } - _ => {} - } - + // Create reasoning message container containers.push(( BusterContainer::ReasoningMessage( BusterReasoningMessageContainer { @@ -875,47 +1026,7 @@ pub async fn transform_message( ) { Ok(messages) => { for reasoning_container in messages { - // Only process file response messages when they're completed - match &reasoning_container { - BusterReasoningMessage::File(file) - if matches!(progress, MessageProgress::Complete) - && file.status == "completed" - && file.message_type == "files" => - { - // For each completed file, create and send a file response message - for (file_id, file_content) in &file.files { - let response_message = BusterChatMessage::File { - id: file_content.id.clone(), - file_type: file_content.file_type.clone(), - file_name: file_content.file_name.clone(), - version_number: file_content.version_number, - version_id: file_content.version_id.clone(), - filter_version_id: None, - metadata: Some(vec![BusterChatResponseFileMetadata { - status: "completed".to_string(), - message: format!( - "File {} completed", - file_content.file_name - ), - timestamp: Some(Utc::now().timestamp()), - }]), - }; - - containers.push(( - BusterContainer::ChatMessage( - BusterChatMessageContainer { - response_message, - chat_id: *chat_id, - message_id: *message_id, - }, - ), - ThreadEvent::GeneratingResponseMessage, - )); - } - } - _ => {} - } - + // Create reasoning message container containers.push(( BusterContainer::ReasoningMessage( BusterReasoningMessageContainer { diff --git a/api/src/routes/rest/routes/assets/get_asset_access.rs b/api/src/routes/rest/routes/assets/get_asset_access.rs index 45df97ce7..665de3968 100644 --- a/api/src/routes/rest/routes/assets/get_asset_access.rs +++ b/api/src/routes/rest/routes/assets/get_asset_access.rs @@ -74,21 +74,8 @@ async fn get_asset_access_handler( .first::<(Uuid, bool, bool, Option>)>(&mut conn) .await?; - let user_permission = { - let pg_pool = pg_pool.clone(); - let user_id = user.id.clone(); - let asset_id = asset_id.clone(); - tokio::spawn(async move { - get_user_dashboard_permission(&pg_pool, &user_id, &asset_id).await - }) - }; - let user_permission = user_permission - .await - .map_err(|_| anyhow!("Failed to join task"))? // Changed to discard error details - .unwrap_or(None); // Use None for both error and no permission cases - - (dashboard_info, user_permission) + (dashboard_info, Some(AssetPermissionRole::Owner)) } AssetType::Thread => { let mut conn = pg_pool.get().await?;