//! Library for handling dataset security and permissions. use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use database::{ enums::{IdentityType, UserOrganizationRole}, pool::{get_pg_pool, PgPool}, schema::{ dataset_permissions, datasets, datasets_to_permission_groups, permission_groups, permission_groups_to_identities, teams, teams_to_users, users_to_organizations, }, }; use diesel::prelude::*; use diesel::{ BoolExpressionMethods, ExpressionMethods, JoinOnDsl, NullableExpressionMethods, Selectable, SelectableHelper, }; use diesel_async::{ pooled_connection::AsyncDieselConnectionManager, AsyncPgConnection, RunQueryDsl, }; use std::collections::HashSet; use tokio::{task::JoinHandle, try_join}; use uuid::Uuid; // Define the new struct mirroring the one in search_data_catalog.rs #[derive(Queryable, Selectable, Clone, Debug)] #[diesel(table_name = datasets)] #[diesel(check_for_backend(diesel::pg::Pg))] pub struct PermissionedDataset { pub id: Uuid, pub name: String, #[diesel(column_name = "yml_file")] pub yml_content: Option, // Matches the local struct field name #[allow(dead_code)] pub created_at: DateTime, #[allow(dead_code)] pub updated_at: DateTime, #[allow(dead_code)] pub deleted_at: Option>, } // --- Corrected Fetcher functions for different access paths --- // Path 1: Direct User -> Dataset async fn fetch_direct_user_dataset_ids( user_id: &Uuid ) -> Result> { let mut conn = get_pg_pool().get().await.context("DB Error")?; // Get connection inside function dataset_permissions::table .filter(dataset_permissions::permission_id.eq(user_id)) .filter(dataset_permissions::permission_type.eq("user")) .filter(dataset_permissions::deleted_at.is_null()) .select(dataset_permissions::dataset_id) .load::(&mut conn) .await .context("Failed to fetch direct user dataset IDs") } // Path 3: User -> Team -> Dataset (Direct team assignment) async fn fetch_team_direct_dataset_ids( user_id: &Uuid ) -> Result> { let mut conn = get_pg_pool().get().await.context("DB Error")?; dataset_permissions::table .inner_join( teams_to_users::table.on(dataset_permissions::permission_id .eq(teams_to_users::team_id) .and(dataset_permissions::permission_type.eq("team")) .and(teams_to_users::user_id.eq(user_id)) .and(teams_to_users::deleted_at.is_null())), ) .filter(dataset_permissions::deleted_at.is_null()) .select(dataset_permissions::dataset_id) .distinct() .load::(&mut conn) .await .context("Failed to fetch team direct dataset IDs") } // Path 2: User -> Group -> Dataset async fn fetch_user_group_dataset_ids( user_id: &Uuid ) -> Result> { let mut conn = get_pg_pool().get().await.context("DB Error")?; datasets_to_permission_groups::table .inner_join( permission_groups::table.on(datasets_to_permission_groups::permission_group_id .eq(permission_groups::id) .and(permission_groups::deleted_at.is_null())), ) .inner_join( permission_groups_to_identities::table.on(permission_groups::id .eq(permission_groups_to_identities::permission_group_id) .and(permission_groups_to_identities::identity_id.eq(user_id)) .and(permission_groups_to_identities::identity_type.eq(IdentityType::User)) .and(permission_groups_to_identities::deleted_at.is_null())), ) .filter(datasets_to_permission_groups::deleted_at.is_null()) .select(datasets_to_permission_groups::dataset_id) .distinct() .load::(&mut conn) .await .context("Failed to fetch user group dataset IDs") } // Path 4: User -> Team -> Group -> Dataset async fn fetch_team_group_dataset_ids( user_id: &Uuid ) -> Result> { let mut conn = get_pg_pool().get().await.context("DB Error")?; datasets_to_permission_groups::table .inner_join( permission_groups::table.on(datasets_to_permission_groups::permission_group_id .eq(permission_groups::id) .and(permission_groups::deleted_at.is_null())), ) .inner_join( permission_groups_to_identities::table.on(permission_groups::id .eq(permission_groups_to_identities::permission_group_id) .and(permission_groups_to_identities::identity_type.eq(IdentityType::Team)) .and(permission_groups_to_identities::deleted_at.is_null())), ) .inner_join( teams_to_users::table.on(permission_groups_to_identities::identity_id .eq(teams_to_users::team_id) .and(teams_to_users::user_id.eq(user_id)) .and(teams_to_users::deleted_at.is_null())), ) .filter(datasets_to_permission_groups::deleted_at.is_null()) .select(datasets_to_permission_groups::dataset_id) .distinct() .load::(&mut conn) .await .context("Failed to fetch team group dataset IDs") } // --- Main Function --- pub async fn get_permissioned_datasets( user_id: &Uuid, page: i64, page_size: i64, ) -> Result> { let mut conn = get_pg_pool().get().await.context("DB Error")?; // Get initial connection // Fetch user's organization and role let user_org_info = users_to_organizations::table .filter(users_to_organizations::user_id.eq(user_id)) .filter(users_to_organizations::deleted_at.is_null()) .select(( users_to_organizations::organization_id, users_to_organizations::role, )) .first::<(Uuid, UserOrganizationRole)>(&mut conn) .await; match user_org_info { // --- Admin/Querier Path --- Ok((organization_id, role)) if matches!( role, UserOrganizationRole::WorkspaceAdmin | UserOrganizationRole::DataAdmin | UserOrganizationRole::Querier ) => { // Use the same connection for the admin query datasets::table .filter(datasets::organization_id.eq(organization_id)) .filter(datasets::deleted_at.is_null()) .select(PermissionedDataset::as_select()) .order(datasets::name.asc()) .limit(page_size) .offset(page * page_size) .load::(&mut conn) .await .context("Failed to load datasets for admin/querier") } // --- Non-Admin Path --- Ok(_) => { // Drop the initial connection before concurrent fetches drop(conn); // Fetch all potential dataset IDs concurrently let ( direct_user_ids, team_direct_ids, user_group_ids, team_group_ids, ) = try_join!( // Call helpers directly, they get their own connections fetch_direct_user_dataset_ids(user_id), fetch_team_direct_dataset_ids(user_id), fetch_user_group_dataset_ids(user_id), fetch_team_group_dataset_ids(user_id) )?; // Combine and deduplicate IDs let mut all_accessible_ids = HashSet::new(); all_accessible_ids.extend(direct_user_ids); all_accessible_ids.extend(team_direct_ids); all_accessible_ids.extend(user_group_ids); all_accessible_ids.extend(team_group_ids); if all_accessible_ids.is_empty() { return Ok(Vec::new()); // No datasets accessible } // Fetch the actual dataset info for the combined IDs with pagination let mut conn = get_pg_pool().get().await.context("DB Error")?; // Get final connection datasets::table .filter(datasets::id.eq_any(all_accessible_ids)) .filter(datasets::deleted_at.is_null()) .select(PermissionedDataset::as_select()) .order(datasets::name.asc()) .limit(page_size) .offset(page * page_size) .load::(&mut conn) .await .context("Failed to load datasets for non-admin user") } // --- User Not In Organization --- Err(diesel::NotFound) => Ok(Vec::new()), // --- Other Error --- Err(e) => Err(e).context("Error fetching user organization role"), } } // Simplified check function /* async fn check_permission_exists

(predicate: P) -> Result where P: diesel_async::methods::LoadQuery<'static, AsyncPgConnection, (i64,)>, { let mut conn = get_conn().await?; let count = predicate .get_result::(&mut conn) // Use get_result for count .await?; Ok(count > 0) } */ pub async fn has_dataset_access(user_id: &Uuid, dataset_id: &Uuid) -> Result { let mut conn = get_pg_pool().get().await.context("DB Error")?; // Get initial connection // --- Check if Dataset exists and get Organization ID --- let dataset_org = datasets::table .filter(datasets::id.eq(dataset_id)) .filter(datasets::deleted_at.is_null()) .select(datasets::organization_id) .first::(&mut conn) .await; let organization_id = match dataset_org { Ok(org_id) => org_id, Err(diesel::NotFound) => return Ok(false), // Dataset doesn't exist or is deleted Err(e) => return Err(e).context("Failed to check dataset existence"), }; // --- Check Admin/Querier Access --- let admin_access = users_to_organizations::table .filter(users_to_organizations::user_id.eq(user_id)) .filter(users_to_organizations::organization_id.eq(organization_id)) .filter(users_to_organizations::deleted_at.is_null()) .select(users_to_organizations::role) .first::(&mut conn) .await; if let Ok(role) = admin_access { if matches!( role, UserOrganizationRole::WorkspaceAdmin | UserOrganizationRole::DataAdmin | UserOrganizationRole::Querier ) { return Ok(true); } } else if !matches!(admin_access, Err(diesel::NotFound)) { // Propagate unexpected errors // Explicitly convert diesel::Error to anyhow::Error return Err(anyhow::Error::from(admin_access.err().unwrap())) .context("Error checking admin access"); } // Drop initial connection before spawning tasks drop(conn); // Clone IDs needed for tasks let user_id = *user_id; let dataset_id = *dataset_id; // --- Check Non-Admin Access Paths Concurrently using Tokio tasks --- // Path 1: Direct User -> Dataset let task1: JoinHandle> = tokio::spawn(async move { let mut conn = get_pg_pool().get().await.context("DB Error")?; let count = dataset_permissions::table .filter(dataset_permissions::permission_id.eq(user_id)) .filter(dataset_permissions::permission_type.eq("user")) .filter(dataset_permissions::dataset_id.eq(dataset_id)) .filter(dataset_permissions::deleted_at.is_null()) .select(diesel::dsl::count_star()) .get_result::(&mut conn) .await?; Ok(count > 0) }); // Path 3: User -> Team -> Dataset let task2: JoinHandle> = tokio::spawn(async move { let mut conn = get_pg_pool().get().await.context("DB Error")?; let count = dataset_permissions::table .inner_join( teams_to_users::table.on(dataset_permissions::permission_id .eq(teams_to_users::team_id) .and(dataset_permissions::permission_type.eq("team")) .and(teams_to_users::user_id.eq(user_id)) .and(teams_to_users::deleted_at.is_null())), ) .filter(dataset_permissions::dataset_id.eq(dataset_id)) .filter(dataset_permissions::deleted_at.is_null()) .select(diesel::dsl::count_star()) .get_result::(&mut conn) .await?; Ok(count > 0) }); // Path 2: User -> Group -> Dataset let task3: JoinHandle> = tokio::spawn(async move { let mut conn = get_pg_pool().get().await.context("DB Error")?; let count = datasets_to_permission_groups::table .inner_join( permission_groups::table.on(datasets_to_permission_groups::permission_group_id .eq(permission_groups::id) .and(permission_groups::deleted_at.is_null())), ) .inner_join( permission_groups_to_identities::table.on(permission_groups::id .eq(permission_groups_to_identities::permission_group_id) .and(permission_groups_to_identities::identity_id.eq(user_id)) .and(permission_groups_to_identities::identity_type.eq(IdentityType::User)) .and(permission_groups_to_identities::deleted_at.is_null())), ) .filter(datasets_to_permission_groups::dataset_id.eq(dataset_id)) .filter(datasets_to_permission_groups::deleted_at.is_null()) .select(diesel::dsl::count_star()) .get_result::(&mut conn) .await?; Ok(count > 0) }); // Path 4: User -> Team -> Group -> Dataset let task4: JoinHandle> = tokio::spawn(async move { let mut conn = get_pg_pool().get().await.context("DB Error")?; let count = datasets_to_permission_groups::table .inner_join( permission_groups::table.on(datasets_to_permission_groups::permission_group_id .eq(permission_groups::id) .and(permission_groups::deleted_at.is_null())), ) .inner_join( permission_groups_to_identities::table.on(permission_groups::id .eq(permission_groups_to_identities::permission_group_id) .and(permission_groups_to_identities::identity_type.eq(IdentityType::Team)) .and(permission_groups_to_identities::deleted_at.is_null())), ) .inner_join( teams_to_users::table.on(permission_groups_to_identities::identity_id .eq(teams_to_users::team_id) .and(teams_to_users::user_id.eq(user_id)) .and(teams_to_users::deleted_at.is_null())), ) .filter(datasets_to_permission_groups::dataset_id.eq(dataset_id)) .filter(datasets_to_permission_groups::deleted_at.is_null()) .select(diesel::dsl::count_star()) .get_result::(&mut conn) .await?; Ok(count > 0) }); // Await tasks and check results let results = vec![task1, task2, task3, task4]; for handle in results { match handle.await { Ok(Ok(true)) => return Ok(true), // Access granted by this path Ok(Ok(false)) => continue, // This path didn't grant access, check next Ok(Err(e)) => return Err(e).context("Permission check task failed"), // DB error in check // Explicitly convert JoinError to anyhow::Error Err(e) => return Err(anyhow::Error::from(e)).context("Tokio task join error"), } } // If no task returned true Ok(false) }