diff --git a/api/libs/handlers/src/collections/add_assets_to_collection_handler.rs b/api/libs/handlers/src/collections/add_assets_to_collection_handler.rs index 392e8aff3..21675c7e1 100644 --- a/api/libs/handlers/src/collections/add_assets_to_collection_handler.rs +++ b/api/libs/handlers/src/collections/add_assets_to_collection_handler.rs @@ -4,9 +4,10 @@ use database::{ enums::{AssetPermissionRole, AssetType}, helpers::collections::fetch_collection_with_permission, metric_files::fetch_metric_file_with_permissions, + chats::fetch_chat_with_permission, models::CollectionToAsset, pool::get_pg_pool, - schema::collections_to_assets, + schema::{collections_to_assets, chats}, }; use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; @@ -113,11 +114,13 @@ pub async fn add_assets_to_collection_handler( // 3. Group assets by type for efficient processing let mut dashboard_ids = Vec::new(); let mut metric_ids = Vec::new(); + let mut chat_ids = Vec::new(); for asset in &assets { match asset.asset_type { AssetType::DashboardFile => dashboard_ids.push(asset.id), AssetType::MetricFile => metric_ids.push(asset.id), + AssetType::Chat => chat_ids.push(asset.id), _ => { error!( asset_id = %asset.id, @@ -447,6 +450,119 @@ pub async fn add_assets_to_collection_handler( } } + // Process chats + if !chat_ids.is_empty() { + for chat_id in &chat_ids { + // Check if chat exists and user has permission + let chat_permission = fetch_chat_with_permission(&chat_id, &user.id).await?; + + let chat_permission = if let Some(cp) = chat_permission { + cp + } else { + error!(chat_id = %chat_id, user_id = %user.id, "Chat not found"); + result.failed_count += 1; + result.failed_assets.push(( *chat_id, AssetType::Chat, "Chat not found".to_string())); + continue; + }; + + // Check if user has at least CanView access to the chat + let has_chat_permission = check_permission_access( + chat_permission.permission, + &[ + AssetPermissionRole::CanView, + AssetPermissionRole::CanEdit, + AssetPermissionRole::FullAccess, + AssetPermissionRole::Owner, + ], + chat_permission.chat.organization_id, + &user.organizations, + ); + + if !has_chat_permission { + error!(chat_id = %chat_id, user_id = %user.id, "User does not have permission to access this chat"); + result.failed_count += 1; + result.failed_assets.push(( *chat_id, AssetType::Chat, "Insufficient permissions".to_string())); + continue; + } + + // Check if the chat is already in the collection + let existing = match collections_to_assets::table + .filter(collections_to_assets::collection_id.eq(collection_id)) + .filter(collections_to_assets::asset_id.eq(chat_id)) + .filter(collections_to_assets::asset_type.eq(AssetType::Chat)) + .first::(&mut conn) + .await + { + Ok(record) => Some(record), + Err(diesel::NotFound) => None, + Err(e) => { + error!("Error checking if chat is already in collection: {}", e); + result.failed_count += 1; + result.failed_assets.push(( *chat_id, AssetType::Chat, format!("Database error: {}", e))); + continue; + } + }; + + if let Some(existing_record) = existing { + if existing_record.deleted_at.is_some() { + // If it was previously deleted, update it + match diesel::update(collections_to_assets::table) + .filter(collections_to_assets::collection_id.eq(collection_id)) + .filter(collections_to_assets::asset_id.eq(chat_id)) + .filter(collections_to_assets::asset_type.eq(AssetType::Chat)) + .set(( + collections_to_assets::deleted_at.eq::>>(None), + collections_to_assets::updated_at.eq(chrono::Utc::now()), + collections_to_assets::updated_by.eq(user.id), + )) + .execute(&mut conn) + .await + { + Ok(_) => { + result.added_count += 1; + } + Err(e) => { + error!(collection_id = %collection_id, chat_id = %chat_id, "Error updating chat in collection: {}", e); + result.failed_count += 1; + result.failed_assets.push(( *chat_id, AssetType::Chat, format!("Database error: {}", e))); + } + } + } else { + // Already in the collection + info!(collection_id = %collection_id, chat_id = %chat_id, "Chat already in collection"); + result.added_count += 1; + } + } else { + // Add to collection + let new_record = CollectionToAsset { + collection_id: *collection_id, + asset_id: *chat_id, + asset_type: AssetType::Chat, + created_at: chrono::Utc::now(), + created_by: user.id, + updated_at: chrono::Utc::now(), + updated_by: user.id, + deleted_at: None, + }; + + match diesel::insert_into(collections_to_assets::table) + .values(&new_record) + .execute(&mut conn) + .await + { + Ok(_) => { + result.added_count += 1; + } + Err(e) => { + error!(collection_id = %collection_id, chat_id = %chat_id, "Error adding chat to collection: {}", e); + result.failed_count += 1; + result.failed_assets.push(( *chat_id, AssetType::Chat, format!("Database error: {}", e))); + } + } + } + } + } + Ok(result) } diff --git a/api/libs/handlers/src/collections/get_collection_handler.rs b/api/libs/handlers/src/collections/get_collection_handler.rs index def95f001..01ed9f3aa 100644 --- a/api/libs/handlers/src/collections/get_collection_handler.rs +++ b/api/libs/handlers/src/collections/get_collection_handler.rs @@ -6,6 +6,7 @@ use database::{ pool::get_pg_pool, schema::{ asset_permissions, collections_to_assets, dashboard_files, metric_files, users, + chats, }, }; use diesel::{ExpressionMethods, JoinOnDsl, NullableExpressionMethods, QueryDsl, Queryable}; @@ -165,70 +166,141 @@ pub async fn get_collection_handler( Err(_) => (false, None, None), }; - // Query for metric assets in the collection - let metric_assets_result = collections_to_assets::table - .inner_join(metric_files::table.on(metric_files::id.eq(collections_to_assets::asset_id))) - .left_join(users::table.on(users::id.eq(metric_files::created_by))) - .filter(collections_to_assets::collection_id.eq(req.id)) - .filter(collections_to_assets::asset_type.eq(AssetType::MetricFile)) - .filter(collections_to_assets::deleted_at.is_null()) - .filter(metric_files::deleted_at.is_null()) - .select(( - metric_files::id, - metric_files::name, - users::name.nullable(), - users::email.nullable(), - metric_files::created_at, - metric_files::updated_at, - collections_to_assets::asset_type, - )) - .load::(&mut conn) - .await; + // Get the pool once + let pool = get_pg_pool(); - // Query for dashboard assets in the collection - let dashboard_assets_result = collections_to_assets::table - .inner_join( - dashboard_files::table.on(dashboard_files::id.eq(collections_to_assets::asset_id)), - ) - .left_join(users::table.on(users::id.eq(dashboard_files::created_by))) - .filter(collections_to_assets::collection_id.eq(req.id)) - .filter(collections_to_assets::asset_type.eq(AssetType::DashboardFile)) - .filter(collections_to_assets::deleted_at.is_null()) - .filter(dashboard_files::deleted_at.is_null()) - .select(( - dashboard_files::id, - dashboard_files::name, - users::name.nullable(), - users::email.nullable(), - dashboard_files::created_at, - dashboard_files::updated_at, - collections_to_assets::asset_type, - )) - .load::(&mut conn) - .await; + // Spawn tasks for fetching assets concurrently + let metric_assets_handle = tokio::spawn({ + let pool = pool.clone(); + let req_id = req.id; + async move { + let mut conn = pool.get().await.map_err(anyhow::Error::from)?; + collections_to_assets::table + .inner_join(metric_files::table.on(metric_files::id.eq(collections_to_assets::asset_id))) + .left_join(users::table.on(users::id.eq(metric_files::created_by))) + .filter(collections_to_assets::collection_id.eq(req_id)) + .filter(collections_to_assets::asset_type.eq(AssetType::MetricFile)) + .filter(collections_to_assets::deleted_at.is_null()) + .filter(metric_files::deleted_at.is_null()) + .select(( + metric_files::id, + metric_files::name, + users::name.nullable(), + users::email.nullable(), + metric_files::created_at, + metric_files::updated_at, + collections_to_assets::asset_type, + )) + .load::(&mut conn) + .await + .map_err(anyhow::Error::from) + } + }); + + let dashboard_assets_handle = tokio::spawn({ + let pool = pool.clone(); + let req_id = req.id; + async move { + let mut conn = pool.get().await.map_err(anyhow::Error::from)?; + collections_to_assets::table + .inner_join(dashboard_files::table.on(dashboard_files::id.eq(collections_to_assets::asset_id))) + .left_join(users::table.on(users::id.eq(dashboard_files::created_by))) + .filter(collections_to_assets::collection_id.eq(req_id)) + .filter(collections_to_assets::asset_type.eq(AssetType::DashboardFile)) + .filter(collections_to_assets::deleted_at.is_null()) + .filter(dashboard_files::deleted_at.is_null()) + .select(( + dashboard_files::id, + dashboard_files::name, + users::name.nullable(), + users::email.nullable(), + dashboard_files::created_at, + dashboard_files::updated_at, + collections_to_assets::asset_type, + )) + .load::(&mut conn) + .await + .map_err(anyhow::Error::from) + } + }); + + let chat_assets_handle = tokio::spawn({ + let pool = pool.clone(); + let req_id = req.id; + async move { + let mut conn = pool.get().await.map_err(anyhow::Error::from)?; + collections_to_assets::table + .inner_join(chats::table.on(chats::id.eq(collections_to_assets::asset_id))) + .left_join(users::table.on(users::id.eq(chats::created_by))) + .filter(collections_to_assets::collection_id.eq(req_id)) + .filter(collections_to_assets::asset_type.eq(AssetType::Chat)) + .filter(collections_to_assets::deleted_at.is_null()) + .filter(chats::deleted_at.is_null()) + .select(( + chats::id, + chats::title, // Use title as name for chats + users::name.nullable(), + users::email.nullable(), + chats::created_at, + chats::updated_at, + collections_to_assets::asset_type, + )) + .load::(&mut conn) + .await + .map_err(anyhow::Error::from) + } + }); + + // Await all tasks and handle results + let (metric_assets_result, dashboard_assets_result, chat_assets_result) = tokio::join!( + metric_assets_handle, + dashboard_assets_handle, + chat_assets_handle + ); // Process metric assets let metric_assets = match metric_assets_result { - Ok(assets) => assets, - Err(e) => { + Ok(Ok(assets)) => assets, + Ok(Err(e)) => { tracing::error!("Failed to fetch metric assets: {}", e); vec![] } + Err(e) => { + tracing::error!("Metric asset task failed: {}", e); + vec![] + } }; // Process dashboard assets let dashboard_assets = match dashboard_assets_result { - Ok(assets) => assets, - Err(e) => { + Ok(Ok(assets)) => assets, + Ok(Err(e)) => { tracing::error!("Failed to fetch dashboard assets: {}", e); vec![] } + Err(e) => { + tracing::error!("Dashboard asset task failed: {}", e); + vec![] + } }; - println!("dashboard_assets: {:?}", dashboard_assets); + // Process chat assets + let chat_assets = match chat_assets_result { + Ok(Ok(assets)) => assets, + Ok(Err(e)) => { + tracing::error!("Failed to fetch chat assets: {}", e); + vec![] + } + Err(e) => { + tracing::error!("Chat asset task failed: {}", e); + vec![] + } + }; + + // println!("dashboard_assets: {:?}", dashboard_assets); // Keep or remove debug print? // Combine all assets - let all_assets = [metric_assets, dashboard_assets].concat(); + let all_assets = [metric_assets, dashboard_assets, chat_assets].concat(); // Add chat_assets let formatted_assets = format_assets(all_assets); // Create collection state diff --git a/api/libs/handlers/src/dashboards/get_dashboard_handler.rs b/api/libs/handlers/src/dashboards/get_dashboard_handler.rs index 4d62a141d..c038f9c27 100644 --- a/api/libs/handlers/src/dashboards/get_dashboard_handler.rs +++ b/api/libs/handlers/src/dashboards/get_dashboard_handler.rs @@ -12,7 +12,8 @@ use tokio::task::JoinHandle; use uuid::Uuid; use crate::dashboards::types::{BusterShareIndividual, DashboardCollection}; -use crate::metrics::{get_metric_handler, BusterMetric, Dataset, Version}; +use crate::metrics::get_metric_handler; +use crate::metrics::{BusterMetric, Dataset, Version}; use database::enums::{AssetPermissionRole, AssetType, IdentityType, Verification}; use database::helpers::dashboard_files::fetch_dashboard_file_with_permission; use database::pool::get_pg_pool; @@ -86,112 +87,6 @@ async fn fetch_associated_collections_for_dashboard( Ok(associated_collections) } -/// Fetches minimal metric data for display on a dashboard, enforcing a specific permission level. -async fn get_metrics_for_dashboard( - metric_ids: &[Uuid], - dashboard_permission: AssetPermissionRole, - // user: &AuthenticatedUser, // We might not need the user if we aren't doing fine-grained checks here -) -> Result> { - if metric_ids.is_empty() { - return Ok(HashMap::new()); - } - - let mut conn = get_pg_pool().get().await?; - - // Fetch all required MetricFile records in one query - let metric_files_data = metric_files::table - .filter(metric_files::id.eq_any(metric_ids)) - .filter(metric_files::deleted_at.is_null()) // Ensure metrics aren't deleted - .load::(&mut conn) - .await?; - - let mut metrics_map = HashMap::new(); - - // Process each fetched metric file - for metric_file in metric_files_data { - // Use latest content directly from the MetricFile record - let metric_content: MetricYml = metric_file.content; - let version_num = metric_file.version_history.get_version_number(); - - // Convert content to YAML string for the 'file' field - let file_yaml = match serde_yaml::to_string(&metric_content) { - Ok(yaml) => yaml, - Err(e) => { - tracing::error!(metric_id = %metric_file.id, error = %e, "Failed to serialize metric content to YAML for dashboard view"); - // Skip this metric or handle error appropriately - continue; - } - }; - - // Simplified dataset fetching (if needed, adapt from get_metric_handler if complex logic required) - let mut datasets = Vec::new(); - let mut first_data_source_id = None; - // Placeholder: Fetching actual dataset names might require another query or joining. - // For now, create dummy datasets based on IDs if necessary for the type. - if !metric_content.dataset_ids.is_empty() { - first_data_source_id = Some(Uuid::nil()); // Placeholder - needs actual lookup if required - datasets = metric_content - .dataset_ids - .iter() - .map(|id| Dataset { - id: id.to_string(), - name: format!("Dataset {}", id), // Placeholder name - }) - .collect(); - } - - // Construct the BusterMetric, forcing the dashboard's permission level - let buster_metric = BusterMetric { - id: metric_file.id, - metric_type: "metric".to_string(), // Assuming standard type - name: metric_file.name, - version_number: version_num, - description: metric_content.description, - file_name: metric_file.file_name, - time_frame: metric_content.time_frame, - datasets, // Using simplified dataset info - data_source_id: first_data_source_id.map_or("".to_string(), |id| id.to_string()), - error: None, // Assuming no error fetching this basic view - chart_config: Some(metric_content.chart_config), - data_metadata: metric_file.data_metadata, - status: metric_file.verification, - evaluation_score: metric_file.evaluation_score.map(|score| { - // Simple mapping - if score >= 0.8 { - "High".to_string() - } else if score >= 0.5 { - "Moderate".to_string() - } else { - "Low".to_string() - } - }), - evaluation_summary: metric_file.evaluation_summary.unwrap_or_default(), - file: file_yaml, - created_at: metric_file.created_at, - updated_at: metric_file.updated_at, - sent_by_id: metric_file.created_by, - sent_by_name: "".to_string(), // Placeholder - requires user lookup - sent_by_avatar_url: None, // Placeholder - requires user lookup - code: None, // Assuming not needed for dashboard view - dashboards: vec![], // Omit associated dashboards/collections for simplicity - collections: vec![], // Omit associated dashboards/collections for simplicity - versions: vec![], // Omit detailed versions for simplicity - permission: dashboard_permission, // <<< Force the permission level here - sql: metric_content.sql, - // Omit detailed sharing info for simplicity in this view - individual_permissions: None, - publicly_accessible: metric_file.publicly_accessible, - public_expiry_date: metric_file.public_expiry_date, - public_enabled_by: None, // Placeholder - requires user lookup - public_password: metric_file.public_password, - }; - - metrics_map.insert(buster_metric.id, buster_metric); - } - - Ok(metrics_map) -} - pub async fn get_dashboard_handler( dashboard_id: &Uuid, user: &AuthenticatedUser, @@ -351,8 +246,55 @@ pub async fn get_dashboard_handler( }) .collect(); - // Fetch metrics using the specialized function, enforcing dashboard permission - let metrics = get_metrics_for_dashboard(&metric_ids, permission).await?; + // Fetch metrics concurrently using get_metric_handler + let mut metric_fetch_handles = Vec::new(); + for metric_id in metric_ids { + let user_clone = user.clone(); // Clone user for the spawned task + // Spawn a task for each metric fetch. + // Pass None for version_number and password as the dashboard view uses the latest metric + // and access is primarily determined by dashboard permissions. + let handle = tokio::spawn(async move { + get_metric_handler(&metric_id, &user_clone, None, None).await + }); + metric_fetch_handles.push((metric_id, handle)); + } + + // Await all metric fetch tasks and collect results + let metric_results = join_all( + metric_fetch_handles + .into_iter() + .map(|(_, handle)| handle), + ) + .await; + + // Process results and build the metrics map + let mut metrics = HashMap::new(); + for result in metric_results { + match result { + Ok(Ok(metric)) => { + // Successfully fetched metric + metrics.insert(metric.id, metric); + } + Ok(Err(e)) => { + // get_metric_handler returned an error + // Log the error, but don't fail the entire dashboard load + tracing::error!( + "Failed to fetch metric for dashboard {}: {}", + dashboard_id, + e + ); + // Optionally, insert a placeholder or error metric into the map + } + Err(e) => { + // Task join error (panic) + tracing::error!( + "Task join error fetching metric for dashboard {}: {}", + dashboard_id, + e + ); + } + } + } // Query individual permissions for this dashboard let individual_permissions_query = asset_permissions::table