//! Library for handling dataset security and permissions. use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use database::{ enums::{IdentityType, UserOrganizationRole}, pool::{get_pg_pool, PgPool}, schema::{ dataset_permissions, datasets, datasets_to_permission_groups, permission_groups, permission_groups_to_identities, teams, teams_to_users, users_to_organizations, }, }; use diesel::prelude::*; use diesel::{ BoolExpressionMethods, ExpressionMethods, JoinOnDsl, NullableExpressionMethods, Selectable, SelectableHelper, }; use diesel_async::{ pooled_connection::AsyncDieselConnectionManager, AsyncPgConnection, RunQueryDsl, }; use std::collections::HashSet; use tokio::{task::JoinHandle, try_join}; use uuid::Uuid; use tracing::{debug, warn}; // Define the new struct mirroring the one in search_data_catalog.rs #[derive(Queryable, Selectable, Clone, Debug)] #[diesel(table_name = datasets)] #[diesel(check_for_backend(diesel::pg::Pg))] pub struct PermissionedDataset { pub id: Uuid, pub name: String, #[diesel(column_name = "yml_file")] pub yml_content: Option, // Matches the local struct field name #[allow(dead_code)] pub created_at: DateTime, #[allow(dead_code)] pub updated_at: DateTime, #[allow(dead_code)] pub deleted_at: Option>, pub data_source_id: Uuid, } // --- Corrected Fetcher functions for different access paths --- // Path 1: Direct User -> Dataset async fn fetch_direct_user_dataset_ids( user_id: &Uuid ) -> Result> { let mut conn = get_pg_pool().get().await.context("DB Error")?; // Get connection inside function dataset_permissions::table .filter(dataset_permissions::permission_id.eq(user_id)) .filter(dataset_permissions::permission_type.eq("user")) .filter(dataset_permissions::deleted_at.is_null()) .select(dataset_permissions::dataset_id) .load::(&mut conn) .await .context("Failed to fetch direct user dataset IDs") } // Path 3: User -> Team -> Dataset (Direct team assignment) async fn fetch_team_direct_dataset_ids( user_id: &Uuid ) -> Result> { let mut conn = get_pg_pool().get().await.context("DB Error")?; dataset_permissions::table .inner_join( teams_to_users::table.on(dataset_permissions::permission_id .eq(teams_to_users::team_id) .and(dataset_permissions::permission_type.eq("team")) .and(teams_to_users::user_id.eq(user_id)) .and(teams_to_users::deleted_at.is_null())), ) .filter(dataset_permissions::deleted_at.is_null()) .select(dataset_permissions::dataset_id) .distinct() .load::(&mut conn) .await .context("Failed to fetch team direct dataset IDs") } // Path 2: User -> Group -> Dataset async fn fetch_user_group_dataset_ids( user_id: &Uuid ) -> Result> { let mut conn = get_pg_pool().get().await.context("DB Error")?; datasets_to_permission_groups::table .inner_join( permission_groups::table.on(datasets_to_permission_groups::permission_group_id .eq(permission_groups::id) .and(permission_groups::deleted_at.is_null())), ) .inner_join( permission_groups_to_identities::table.on(permission_groups::id .eq(permission_groups_to_identities::permission_group_id) .and(permission_groups_to_identities::identity_id.eq(user_id)) .and(permission_groups_to_identities::identity_type.eq(IdentityType::User)) .and(permission_groups_to_identities::deleted_at.is_null())), ) .filter(datasets_to_permission_groups::deleted_at.is_null()) .select(datasets_to_permission_groups::dataset_id) .distinct() .load::(&mut conn) .await .context("Failed to fetch user group dataset IDs") } // Path 4: User -> Team -> Group -> Dataset async fn fetch_team_group_dataset_ids( user_id: &Uuid ) -> Result> { let mut conn = get_pg_pool().get().await.context("DB Error")?; datasets_to_permission_groups::table .inner_join( permission_groups::table.on(datasets_to_permission_groups::permission_group_id .eq(permission_groups::id) .and(permission_groups::deleted_at.is_null())), ) .inner_join( permission_groups_to_identities::table.on(permission_groups::id .eq(permission_groups_to_identities::permission_group_id) .and(permission_groups_to_identities::identity_type.eq(IdentityType::Team)) .and(permission_groups_to_identities::deleted_at.is_null())), ) .inner_join( teams_to_users::table.on(permission_groups_to_identities::identity_id .eq(teams_to_users::team_id) .and(teams_to_users::user_id.eq(user_id)) .and(teams_to_users::deleted_at.is_null())), ) .filter(datasets_to_permission_groups::deleted_at.is_null()) .select(datasets_to_permission_groups::dataset_id) .distinct() .load::(&mut conn) .await .context("Failed to fetch team group dataset IDs") } // Path 5: User -> Organization -> Default Permission Group -> Dataset async fn fetch_org_default_dataset_ids( user_id: &Uuid ) -> Result> { let mut conn = get_pg_pool().get().await.context("DB Error")?; // Get all the user's organizations let user_orgs = users_to_organizations::table .filter(users_to_organizations::user_id.eq(user_id)) .filter(users_to_organizations::deleted_at.is_null()) .select(users_to_organizations::organization_id) .load::(&mut conn) .await .context("Failed to fetch user organizations")?; if user_orgs.is_empty() { return Ok(Vec::new()); // User not in any organization } // Get datasets from all default permission groups across all user's organizations let mut all_dataset_ids = HashSet::new(); for organization_id in user_orgs { let default_group_name = format!("default:{}", organization_id); // Find the default permission group for this organization let default_group = permission_groups::table .filter(permission_groups::name.eq(&default_group_name)) .filter(permission_groups::organization_id.eq(organization_id)) .filter(permission_groups::deleted_at.is_null()) .select(permission_groups::id) .first::(&mut conn) .await .optional() .context("Failed to fetch default permission group")?; let Some(default_group_id) = default_group else { continue; // No default permission group exists for this org }; // Get all datasets in the default permission group let dataset_ids = datasets_to_permission_groups::table .filter(datasets_to_permission_groups::permission_group_id.eq(default_group_id)) .filter(datasets_to_permission_groups::deleted_at.is_null()) .select(datasets_to_permission_groups::dataset_id) .load::(&mut conn) .await .context("Failed to fetch org default dataset IDs")?; all_dataset_ids.extend(dataset_ids); } // Return unique dataset IDs as a Vec Ok(all_dataset_ids.into_iter().collect()) } // --- Main Function --- pub async fn get_permissioned_datasets( user_id: &Uuid, page: i64, page_size: i64, ) -> Result> { let mut conn = get_pg_pool().get().await.context("DB Error")?; // Get initial connection // Fetch all user's organizations and roles let user_orgs = users_to_organizations::table .filter(users_to_organizations::user_id.eq(user_id)) .filter(users_to_organizations::deleted_at.is_null()) .select(( users_to_organizations::organization_id, users_to_organizations::role, )) .load::<(Uuid, UserOrganizationRole)>(&mut conn) .await .context("Failed to fetch user organizations")?; if user_orgs.is_empty() { return Ok(Vec::new()); // User not in any organization } // --- Admin/Querier Path --- // Check if user has admin/querier role in any organization let admin_org_ids: Vec = user_orgs .iter() .filter(|(_, role)| { matches!( role, UserOrganizationRole::WorkspaceAdmin | UserOrganizationRole::DataAdmin | UserOrganizationRole::Querier ) }) .map(|(org_id, _)| *org_id) .collect(); if !admin_org_ids.is_empty() { // Get datasets from ALL organizations where user has admin/querier access return datasets::table .filter(datasets::organization_id.eq_any(admin_org_ids)) .filter(datasets::deleted_at.is_null()) .select(PermissionedDataset::as_select()) .order(datasets::name.asc()) .limit(page_size) .offset(page * page_size) .load::(&mut conn) .await .context("Failed to load datasets for admin/querier"); } // --- Non-Admin Path --- // Drop the initial connection before concurrent fetches drop(conn); // Fetch all potential dataset IDs concurrently let ( direct_user_ids, team_direct_ids, user_group_ids, team_group_ids, org_default_ids, ) = try_join!( // Call helpers directly, they get their own connections fetch_direct_user_dataset_ids(user_id), fetch_team_direct_dataset_ids(user_id), fetch_user_group_dataset_ids(user_id), fetch_team_group_dataset_ids(user_id), fetch_org_default_dataset_ids(user_id) )?; // Combine and deduplicate IDs let mut all_accessible_ids = HashSet::new(); all_accessible_ids.extend(direct_user_ids); all_accessible_ids.extend(team_direct_ids); all_accessible_ids.extend(user_group_ids); all_accessible_ids.extend(team_group_ids); all_accessible_ids.extend(org_default_ids); if all_accessible_ids.is_empty() { return Ok(Vec::new()); // No datasets accessible } // Get all organization IDs for the user let org_ids: Vec = user_orgs.into_iter().map(|(org_id, _)| org_id).collect(); // Fetch the actual dataset info for the combined IDs with pagination // IMPORTANT: Filter by organization to prevent cross-org data access let mut conn = get_pg_pool().get().await.context("DB Error")?; // Get final connection datasets::table .filter(datasets::id.eq_any(all_accessible_ids)) .filter(datasets::organization_id.eq_any(org_ids)) .filter(datasets::deleted_at.is_null()) .select(PermissionedDataset::as_select()) .order(datasets::name.asc()) .limit(page_size) .offset(page * page_size) .load::(&mut conn) .await .context("Failed to load datasets for non-admin user") } // Simplified check function /* async fn check_permission_exists

(predicate: P) -> Result where P: diesel_async::methods::LoadQuery<'static, AsyncPgConnection, (i64,)>, { let mut conn = get_conn().await?; let count = predicate .get_result::(&mut conn) // Use get_result for count .await?; Ok(count > 0) } */ pub async fn has_dataset_access(user_id: &Uuid, dataset_id: &Uuid) -> Result { let mut conn = get_pg_pool().get().await.context("DB Error")?; // Get initial connection // --- Check if Dataset exists and get Organization ID and deleted status --- let dataset_info = datasets::table .filter(datasets::id.eq(dataset_id)) // Remove the deleted_at filter here to check status later .select((datasets::organization_id, datasets::deleted_at)) .first::<(Uuid, Option>)>(&mut conn) .await; let (organization_id, deleted_at_status) = match dataset_info { Ok((org_id, deleted_at)) => (org_id, deleted_at), Err(diesel::NotFound) => return Ok(false), // Dataset doesn't exist Err(e) => return Err(e).context("Failed to fetch dataset info"), }; // --- Universal Check: If dataset is deleted, NO ONE has access --- if deleted_at_status.is_some() { return Ok(false); } // --- Dataset is NOT deleted, proceed with access checks --- // Check Admin/Querier Access let admin_access = users_to_organizations::table .filter(users_to_organizations::user_id.eq(user_id)) .filter(users_to_organizations::organization_id.eq(organization_id)) .filter(users_to_organizations::deleted_at.is_null()) .select(users_to_organizations::role) .first::(&mut conn) .await; if let Ok(role) = admin_access { if matches!( role, UserOrganizationRole::WorkspaceAdmin | UserOrganizationRole::DataAdmin | UserOrganizationRole::Querier ) { // Admins/Queriers have access to non-deleted datasets in their org return Ok(true); } } else if !matches!(admin_access, Err(diesel::NotFound)) { // Propagate unexpected errors from role check return Err(anyhow::Error::from(admin_access.err().unwrap())) .context("Error checking admin access"); } // --- If not Admin/Querier, proceed with detailed permission checks --- // (No need to check deleted_at again here) // Drop initial connection before spawning tasks drop(conn); // Clone IDs needed for tasks let user_id = *user_id; let dataset_id = *dataset_id; // --- Check Non-Admin Access Paths Concurrently using Tokio tasks --- // Path 1: Direct User -> Dataset let task1: JoinHandle> = tokio::spawn(async move { let mut conn = get_pg_pool().get().await.context("DB Error")?; let count = dataset_permissions::table .filter(dataset_permissions::permission_id.eq(user_id)) .filter(dataset_permissions::permission_type.eq("user")) .filter(dataset_permissions::dataset_id.eq(dataset_id)) .filter(dataset_permissions::deleted_at.is_null()) .select(diesel::dsl::count_star()) .get_result::(&mut conn) .await?; Ok(count > 0) }); // Path 3: User -> Team -> Dataset let task2: JoinHandle> = tokio::spawn(async move { let mut conn = get_pg_pool().get().await.context("DB Error")?; let count = dataset_permissions::table .inner_join( teams_to_users::table.on(dataset_permissions::permission_id .eq(teams_to_users::team_id) .and(dataset_permissions::permission_type.eq("team")) .and(teams_to_users::user_id.eq(user_id)) .and(teams_to_users::deleted_at.is_null())), ) .filter(dataset_permissions::dataset_id.eq(dataset_id)) .filter(dataset_permissions::deleted_at.is_null()) .select(diesel::dsl::count_star()) .get_result::(&mut conn) .await?; Ok(count > 0) }); // Path 2: User -> Group -> Dataset let task3: JoinHandle> = tokio::spawn(async move { let mut conn = get_pg_pool().get().await.context("DB Error")?; let count = datasets_to_permission_groups::table .inner_join( permission_groups::table.on(datasets_to_permission_groups::permission_group_id .eq(permission_groups::id) .and(permission_groups::deleted_at.is_null())), ) .inner_join( permission_groups_to_identities::table.on(permission_groups::id .eq(permission_groups_to_identities::permission_group_id) .and(permission_groups_to_identities::identity_id.eq(user_id)) .and(permission_groups_to_identities::identity_type.eq(IdentityType::User)) .and(permission_groups_to_identities::deleted_at.is_null())), ) .filter(datasets_to_permission_groups::dataset_id.eq(dataset_id)) .filter(datasets_to_permission_groups::deleted_at.is_null()) .select(diesel::dsl::count_star()) .get_result::(&mut conn) .await?; Ok(count > 0) }); // Path 4: User -> Team -> Group -> Dataset let task4: JoinHandle> = tokio::spawn(async move { let mut conn = get_pg_pool().get().await.context("DB Error")?; let count = datasets_to_permission_groups::table .inner_join( permission_groups::table.on(datasets_to_permission_groups::permission_group_id .eq(permission_groups::id) .and(permission_groups::deleted_at.is_null())), ) .inner_join( permission_groups_to_identities::table.on(permission_groups::id .eq(permission_groups_to_identities::permission_group_id) .and(permission_groups_to_identities::identity_type.eq(IdentityType::Team)) .and(permission_groups_to_identities::deleted_at.is_null())), ) .inner_join( teams_to_users::table.on(permission_groups_to_identities::identity_id .eq(teams_to_users::team_id) .and(teams_to_users::user_id.eq(user_id)) .and(teams_to_users::deleted_at.is_null())), ) .filter(datasets_to_permission_groups::dataset_id.eq(dataset_id)) .filter(datasets_to_permission_groups::deleted_at.is_null()) .select(diesel::dsl::count_star()) .get_result::(&mut conn) .await?; Ok(count > 0) }); // Path 5: User -> Organization -> Default Permission Group -> Dataset let task5: JoinHandle> = tokio::spawn(async move { let mut conn = get_pg_pool().get().await.context("DB Error")?; // Get all the user's organizations let user_orgs = users_to_organizations::table .filter(users_to_organizations::user_id.eq(user_id)) .filter(users_to_organizations::deleted_at.is_null()) .select(users_to_organizations::organization_id) .load::(&mut conn) .await?; if user_orgs.is_empty() { return Ok(false); // User not in any organization } // Check if the dataset is in any organization's default permission group for organization_id in user_orgs { let default_group_name = format!("default:{}", organization_id); let count = datasets_to_permission_groups::table .inner_join( permission_groups::table.on(datasets_to_permission_groups::permission_group_id .eq(permission_groups::id) .and(permission_groups::name.eq(&default_group_name)) .and(permission_groups::organization_id.eq(organization_id)) .and(permission_groups::deleted_at.is_null())), ) .filter(datasets_to_permission_groups::dataset_id.eq(dataset_id)) .filter(datasets_to_permission_groups::deleted_at.is_null()) .select(diesel::dsl::count_star()) .get_result::(&mut conn) .await?; if count > 0 { return Ok(true); // Found access in this org's default group } } Ok(false) // No access found in any org's default group }); // Await tasks and check results let results = vec![task1, task2, task3, task4, task5]; for handle in results { match handle.await { Ok(Ok(true)) => return Ok(true), // Access granted by this path Ok(Ok(false)) => continue, // This path didn't grant access, check next Ok(Err(e)) => return Err(e).context("Permission check task failed"), // DB error in check // Explicitly convert JoinError to anyhow::Error Err(e) => return Err(anyhow::Error::from(e)).context("Tokio task join error"), } } // If no task returned true Ok(false) } /// Checks if a user has access to ALL specified datasets. /// Returns true if the user has access to every dataset in the list, false otherwise. pub async fn has_all_datasets_access(user_id: &Uuid, dataset_ids: &[Uuid]) -> Result { if dataset_ids.is_empty() { // No datasets to check, vacuously true? Or should this be an error/false? // Let's assume true for now, meaning "no permissions required". Adjust if needed. // Changing to false for safer behavior: no datasets means no access granted. return Ok(false); } let mut conn = get_pg_pool().get().await.context("DB Error")?; // Get initial connection // --- Step 1: Verify all datasets exist, are not deleted, and get their org IDs --- let dataset_infos = datasets::table .filter(datasets::id.eq_any(dataset_ids)) .select((datasets::id, datasets::organization_id, datasets::deleted_at)) .load::<(Uuid, Uuid, Option>)>(&mut conn) .await .context("Failed to fetch dataset info for bulk check")?; // Check if we found info for all requested datasets if dataset_infos.len() != dataset_ids.len() { warn!("One or more dataset IDs not found during bulk access check."); return Ok(false); // At least one dataset doesn't exist } // Check for deleted datasets and collect unique organization IDs let mut organization_ids = std::collections::HashSet::new(); for (id, org_id, deleted_at) in &dataset_infos { if deleted_at.is_some() { warn!("Dataset {} is deleted, access denied in bulk check.", id); return Ok(false); // Access denied if any dataset is deleted } organization_ids.insert(*org_id); } // --- Step 2: Check Admin/Querier access across all relevant organizations --- // If the user is an Admin/Querier in ANY of the organizations containing these datasets, // they have access to ALL non-deleted datasets within those specific orgs. // We need to ensure ALL datasets belong to orgs where the user has such a role. let admin_roles = users_to_organizations::table .filter(users_to_organizations::user_id.eq(user_id)) .filter(users_to_organizations::organization_id.eq_any(organization_ids.iter().cloned().collect::>())) .filter(users_to_organizations::deleted_at.is_null()) .select((users_to_organizations::organization_id, users_to_organizations::role)) .load::<(Uuid, UserOrganizationRole)>(&mut conn) .await .context("Error checking admin roles for bulk dataset access")?; let admin_org_ids_with_access: std::collections::HashSet = admin_roles .into_iter() .filter_map(|(org_id, role)| { if matches!( role, UserOrganizationRole::WorkspaceAdmin | UserOrganizationRole::DataAdmin | UserOrganizationRole::Querier ) { Some(org_id) } else { None } }) .collect(); // Check if all required organization IDs are covered by the user's admin/querier roles if organization_ids.is_subset(&admin_org_ids_with_access) { debug!("User {} has admin/querier access to all required organizations for datasets {:?}", user_id, dataset_ids); return Ok(true); // User is admin/querier in all necessary orgs } // --- Step 3: If not fully covered by admin/querier roles, check specific permissions for each dataset --- // This requires checking each dataset individually using the existing logic. // We can iterate and call the single `has_dataset_access` function. // This might be less efficient than a complex bulk query but reuses existing logic. // Drop the connection before potentially spawning tasks in the loop drop(conn); for dataset_id in dataset_ids { // We could call the existing has_dataset_access here. // However, it repeats the deleted check and org role check we partially did. // Let's refine the logic to avoid redundant checks. let dataset_org_id = dataset_infos.iter().find(|(id, _, _)| id == dataset_id).map(|(_, org_id, _)| *org_id) .expect("Dataset info missing after validation - this is a bug"); // Should exist due to earlier checks if admin_org_ids_with_access.contains(&dataset_org_id) { // User has admin/querier access in this dataset's org, so access is granted for this specific dataset. continue; // Move to the next dataset } // If not admin/querier for this dataset's org, perform detailed permission checks // Using a simplified version of the concurrent checks from has_dataset_access let has_specific_access = check_specific_dataset_permissions(user_id, dataset_id).await?; if !has_specific_access { debug!("User {} lacks specific permissions for dataset {} in bulk check.", user_id, dataset_id); return Ok(false); // If access fails for any single dataset, the whole check fails } } // If the loop completes without returning false, the user has access to all datasets debug!("User {} has access to all datasets {:?} via specific permissions or admin roles.", user_id, dataset_ids); Ok(true) } /// Helper function for `has_all_datasets_access` to check non-admin permissions for a single dataset. /// This avoids repeating the deleted check and admin check. async fn check_specific_dataset_permissions(user_id: &Uuid, dataset_id: &Uuid) -> Result { // Clone IDs needed for tasks let user_id = *user_id; let dataset_id = *dataset_id; // --- Check Non-Admin Access Paths Concurrently using Tokio tasks --- // Path 1: Direct User -> Dataset let task1: JoinHandle> = tokio::spawn(async move { let mut conn = get_pg_pool().get().await.context("DB Error")?; let count = dataset_permissions::table .filter(dataset_permissions::permission_id.eq(user_id)) .filter(dataset_permissions::permission_type.eq("user")) .filter(dataset_permissions::dataset_id.eq(dataset_id)) .filter(dataset_permissions::deleted_at.is_null()) .select(diesel::dsl::count_star()) .get_result::(&mut conn) .await?; Ok(count > 0) }); // Path 3: User -> Team -> Dataset let task2: JoinHandle> = tokio::spawn(async move { let mut conn = get_pg_pool().get().await.context("DB Error")?; let count = dataset_permissions::table .inner_join( teams_to_users::table.on(dataset_permissions::permission_id .eq(teams_to_users::team_id) .and(dataset_permissions::permission_type.eq("team")) .and(teams_to_users::user_id.eq(user_id)) .and(teams_to_users::deleted_at.is_null())), ) .filter(dataset_permissions::dataset_id.eq(dataset_id)) .filter(dataset_permissions::deleted_at.is_null()) .select(diesel::dsl::count_star()) .get_result::(&mut conn) .await?; Ok(count > 0) }); // Path 2: User -> Group -> Dataset let task3: JoinHandle> = tokio::spawn(async move { let mut conn = get_pg_pool().get().await.context("DB Error")?; let count = datasets_to_permission_groups::table .inner_join( permission_groups::table.on(datasets_to_permission_groups::permission_group_id .eq(permission_groups::id) .and(permission_groups::deleted_at.is_null())), ) .inner_join( permission_groups_to_identities::table.on(permission_groups::id .eq(permission_groups_to_identities::permission_group_id) .and(permission_groups_to_identities::identity_id.eq(user_id)) .and(permission_groups_to_identities::identity_type.eq(IdentityType::User)) .and(permission_groups_to_identities::deleted_at.is_null())), ) .filter(datasets_to_permission_groups::dataset_id.eq(dataset_id)) .filter(datasets_to_permission_groups::deleted_at.is_null()) .select(diesel::dsl::count_star()) .get_result::(&mut conn) .await?; Ok(count > 0) }); // Path 4: User -> Team -> Group -> Dataset let task4: JoinHandle> = tokio::spawn(async move { let mut conn = get_pg_pool().get().await.context("DB Error")?; let count = datasets_to_permission_groups::table .inner_join( permission_groups::table.on(datasets_to_permission_groups::permission_group_id .eq(permission_groups::id) .and(permission_groups::deleted_at.is_null())), ) .inner_join( permission_groups_to_identities::table.on(permission_groups::id .eq(permission_groups_to_identities::permission_group_id) .and(permission_groups_to_identities::identity_type.eq(IdentityType::Team)) .and(permission_groups_to_identities::deleted_at.is_null())), ) .inner_join( teams_to_users::table.on(permission_groups_to_identities::identity_id .eq(teams_to_users::team_id) .and(teams_to_users::user_id.eq(user_id)) .and(teams_to_users::deleted_at.is_null())), ) .filter(datasets_to_permission_groups::dataset_id.eq(dataset_id)) .filter(datasets_to_permission_groups::deleted_at.is_null()) .select(diesel::dsl::count_star()) .get_result::(&mut conn) .await?; Ok(count > 0) }); // Path 5: User -> Organization -> Default Permission Group -> Dataset let task5: JoinHandle> = tokio::spawn(async move { let mut conn = get_pg_pool().get().await.context("DB Error")?; // Get all the user's organizations let user_orgs = users_to_organizations::table .filter(users_to_organizations::user_id.eq(user_id)) .filter(users_to_organizations::deleted_at.is_null()) .select(users_to_organizations::organization_id) .load::(&mut conn) .await?; if user_orgs.is_empty() { return Ok(false); // User not in any organization } // Check if the dataset is in any organization's default permission group for organization_id in user_orgs { let default_group_name = format!("default:{}", organization_id); let count = datasets_to_permission_groups::table .inner_join( permission_groups::table.on(datasets_to_permission_groups::permission_group_id .eq(permission_groups::id) .and(permission_groups::name.eq(&default_group_name)) .and(permission_groups::organization_id.eq(organization_id)) .and(permission_groups::deleted_at.is_null())), ) .filter(datasets_to_permission_groups::dataset_id.eq(dataset_id)) .filter(datasets_to_permission_groups::deleted_at.is_null()) .select(diesel::dsl::count_star()) .get_result::(&mut conn) .await?; if count > 0 { return Ok(true); // Found access in this org's default group } } Ok(false) // No access found in any org's default group }); // Await tasks and check results let results = vec![task1, task2, task3, task4, task5]; for handle in results { match handle.await { Ok(Ok(true)) => return Ok(true), // Access granted by this path Ok(Ok(false)) => continue, // This path didn't grant access, check next Ok(Err(e)) => return Err(e).context("Permission check task failed"), // DB error in check Err(e) => return Err(anyhow::Error::from(e)).context("Tokio task join error"), // Task failed } } // If no task returned true Ok(false) }