Enhance dataset listing functionality with user organization roles

- Refactored dataset listing logic to incorporate user organization roles, allowing for more granular access control based on user permissions.
- Introduced new role checks for `WorkspaceAdmin`, `DataAdmin`, `Querier`, `RestrictedQuerier`, and `Viewer` to determine dataset visibility.
- Updated database queries to fetch datasets based on user roles and organization associations, improving data retrieval efficiency.
- Removed deprecated functions and streamlined the dataset fetching process, ensuring clarity and maintainability in the codebase.

These changes improve the API's security and usability by enforcing role-based access control for dataset operations.
This commit is contained in:
dal 2025-01-16 10:23:06 -07:00
parent acdb260cb4
commit d0b05608e5
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
3 changed files with 2444 additions and 1643 deletions

View File

@ -12,12 +12,13 @@ use uuid::Uuid;
use crate::{
database::{
enums::IdentityType,
enums::{IdentityType, UserOrganizationRole},
lib::{get_pg_pool, PgPool},
models::User,
models::{User, UserToOrganization},
schema::{
data_sources, datasets, datasets_to_permission_groups, messages,
permission_groups_to_identities, teams_to_users, users,
data_sources, dataset_permissions, datasets, datasets_to_permission_groups, messages,
permission_groups_to_identities, permission_groups_to_users, teams_to_users, users,
users_to_organizations,
},
},
routes::rest::ApiResponse,
@ -115,173 +116,46 @@ async fn list_datasets_handler(
let page_size = page_size.unwrap_or(25);
let admin_view = admin_view.unwrap_or(false);
let pg_pool = get_pg_pool();
let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => return Err(anyhow!("Unable to get connection from pool: {}", e)),
};
let organization_id = get_user_organization_id(&user_id).await?;
let list_of_datasets = if let Some(permission_group_id) = permission_group_id {
list_permission_group_datasets(
pg_pool,
organization_id,
page,
page_size,
permission_group_id,
)
.await?
} else {
match admin_view {
true => {
get_org_datasets(
pg_pool,
&organization_id,
page,
page_size,
enabled,
imported,
data_source_id,
)
.await?
}
false => get_user_permissioned_datasets(pg_pool, &user_id, page, page_size).await?,
// Right now we only allow users to have one organization this will change in the future
let user_organization_record = match users_to_organizations::table
.filter(users_to_organizations::user_id.eq(user_id))
.filter(users_to_organizations::deleted_at.is_null())
.select(users_to_organizations::all_columns)
.first::<UserToOrganization>(&mut conn)
.await
{
Ok(organization_id) => organization_id,
Err(e) => return Err(anyhow!("Unable to get organization from database: {}", e)),
};
let list_of_datasets = match &user_organization_record.role {
UserOrganizationRole::WorkspaceAdmin
| UserOrganizationRole::DataAdmin
| UserOrganizationRole::Querier => {
get_org_datasets(
&user_organization_record.organization_id,
page,
page_size,
enabled,
imported,
data_source_id,
)
.await?
}
UserOrganizationRole::RestrictedQuerier => {
get_restricted_user_datasets(user_id, page, page_size).await?
}
UserOrganizationRole::Viewer => Vec::new(),
};
Ok(list_of_datasets)
}
async fn get_user_permissioned_datasets(
pool: &PgPool,
user_id: &Uuid,
page: i64,
page_size: i64,
) -> Result<Vec<ListDatasetObject>> {
let mut conn = match pool.get().await {
Ok(conn) => conn,
Err(e) => return Err(anyhow!("Unable to get connection from pool: {}", e)),
};
let list_dataset_records = match datasets::table
.inner_join(data_sources::table.on(datasets::data_source_id.eq(data_sources::id)))
.inner_join(
datasets_to_permission_groups::table.on(datasets::id
.eq(datasets_to_permission_groups::dataset_id)
.and(datasets_to_permission_groups::deleted_at.is_null())),
)
.inner_join(
permission_groups_to_identities::table.on(
datasets_to_permission_groups::permission_group_id
.eq(permission_groups_to_identities::permission_group_id)
.and(permission_groups_to_identities::deleted_at.is_null()),
),
)
.left_join(
teams_to_users::table.on(permission_groups_to_identities::identity_id
.eq(teams_to_users::team_id)
.and(permission_groups_to_identities::identity_type.eq(IdentityType::Team))
.and(teams_to_users::deleted_at.is_null())),
)
.inner_join(users::table.on(datasets::created_by.eq(users::id)))
.left_join(messages::table.on(messages::dataset_id.eq(datasets::id.nullable())))
.select((
datasets::id,
datasets::name,
datasets::created_at,
datasets::updated_at,
datasets::enabled,
datasets::imported,
users::id,
users::name.nullable(),
users::email,
data_sources::id,
data_sources::name,
sql::<Nullable<Timestamptz>>("max(messages.created_at) as last_queried"),
))
.group_by((
datasets::id,
datasets::name,
datasets::created_at,
datasets::updated_at,
datasets::enabled,
datasets::imported,
users::id,
users::name,
users::email,
data_sources::id,
data_sources::name,
))
.filter(datasets::deleted_at.is_null())
.filter(
permission_groups_to_identities::identity_id
.eq(user_id)
.or(teams_to_users::user_id.eq(user_id)),
)
.limit(page_size)
.offset(page * page_size)
.load::<(
Uuid,
String,
DateTime<Utc>,
DateTime<Utc>,
bool,
bool,
Uuid,
Option<String>,
String,
Uuid,
String,
Option<DateTime<Utc>>,
)>(&mut conn)
.await
{
Ok(datasets) => datasets,
Err(e) => return Err(anyhow!("Unable to get datasets from database: {}", e)),
};
let list_dataset_objects: Vec<ListDatasetObject> = list_dataset_records
.into_iter()
.map(
|(
id,
name,
created_at,
updated_at,
enabled,
imported,
user_id,
user_name,
user_email,
data_source_id,
data_source_name,
last_queried,
)| {
ListDatasetObject {
id,
name,
created_at: Some(created_at),
updated_at: Some(updated_at),
enabled: Some(enabled),
imported: Some(imported),
data_source: ListDatasetDataSource {
id: data_source_id,
name: data_source_name,
},
last_queried,
owner: Some(ListDatasetOwner {
id: user_id,
name: user_name.unwrap_or(user_email),
avatar_url: None,
}),
belongs_to: None,
}
},
)
.collect();
Ok(list_dataset_objects)
}
async fn get_org_datasets(
pool: &PgPool,
organization_id: &Uuid,
page: i64,
page_size: i64,
@ -289,7 +163,7 @@ async fn get_org_datasets(
imported: Option<bool>,
data_source_id: Option<Uuid>,
) -> Result<Vec<ListDatasetObject>> {
let mut conn = match pool.get().await {
let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => return Err(anyhow!("Unable to get connection from pool: {}", e)),
};
@ -409,62 +283,245 @@ async fn get_org_datasets(
Ok(list_dataset_objects)
}
async fn list_permission_group_datasets(
pool: &PgPool,
organization_id: Uuid,
async fn get_restricted_user_datasets(
user_id: &Uuid,
page: i64,
page_size: i64,
permission_group_id: Uuid,
) -> Result<Vec<ListDatasetObject>> {
let mut conn = match pool.get().await {
Ok(conn) => conn,
Err(e) => return Err(anyhow!("Unable to get connection from pool: {}", e)),
let direct_user_permissioned_datasets_handle = {
let user_id = user_id.clone();
tokio::spawn(async move {
let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => return Err(anyhow!("Unable to get connection from pool: {}", e)),
};
let result = match datasets::table
.inner_join(data_sources::table.on(datasets::data_source_id.eq(data_sources::id)))
.inner_join(users::table.on(datasets::created_by.eq(users::id)))
.inner_join(
dataset_permissions::table.on(dataset_permissions::dataset_id.eq(datasets::id)),
)
.left_join(messages::table.on(messages::dataset_id.eq(datasets::id.nullable())))
.select((
datasets::id,
datasets::name,
datasets::created_at,
datasets::updated_at,
datasets::enabled,
datasets::imported,
users::id,
users::name.nullable(),
users::email,
data_sources::id,
data_sources::name,
sql::<Nullable<Timestamptz>>("max(messages.created_at) as last_queried"),
))
.group_by((
datasets::id,
datasets::name,
datasets::created_at,
datasets::updated_at,
datasets::enabled,
datasets::imported,
users::id,
users::name,
users::email,
data_sources::id,
data_sources::name,
))
.filter(dataset_permissions::permission_id.eq(user_id))
.filter(dataset_permissions::permission_type.eq("user"))
.filter(datasets::deleted_at.is_null())
.filter(datasets::enabled.eq(true))
.limit(page_size)
.offset(page * page_size)
.load::<(
Uuid,
String,
DateTime<Utc>,
DateTime<Utc>,
bool,
bool,
Uuid,
Option<String>,
String,
Uuid,
String,
Option<DateTime<Utc>>,
)>(&mut conn)
.await
{
Ok(datasets) => datasets,
Err(e) => return Err(anyhow!("Unable to get datasets from database: {}", e)),
};
Ok(result)
})
};
let list_dataset_records = match datasets::table
.inner_join(data_sources::table.on(datasets::data_source_id.eq(data_sources::id)))
.left_join(
datasets_to_permission_groups::table.on(datasets::id
.eq(datasets_to_permission_groups::dataset_id)
.and(datasets_to_permission_groups::permission_group_id.eq(permission_group_id))
.and(datasets_to_permission_groups::deleted_at.is_null())),
)
.select((
datasets::id,
datasets::name,
data_sources::id,
data_sources::name,
datasets_to_permission_groups::permission_group_id.nullable(),
))
.filter(datasets::organization_id.eq(organization_id))
.filter(datasets::deleted_at.is_null())
.filter(datasets::enabled.eq(true))
.limit(page_size)
.offset(page * page_size)
.load::<(Uuid, String, Uuid, String, Option<Uuid>)>(&mut conn)
.await
{
Ok(datasets) => datasets,
// fetch permissions for user through permission group
let permission_group_datasets_handle = {
let user_id = user_id.clone();
tokio::spawn(async move {
let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => return Err(anyhow!("Unable to get connection from pool: {}", e)),
};
let permission_group_datasets: Vec<(
Uuid,
String,
DateTime<Utc>,
DateTime<Utc>,
bool,
bool,
Uuid,
Option<String>,
String,
Uuid,
String,
Option<DateTime<Utc>>,
)> = match datasets::table
.inner_join(data_sources::table.on(datasets::data_source_id.eq(data_sources::id)))
.inner_join(users::table.on(datasets::created_by.eq(users::id)))
.inner_join(
dataset_permissions::table.on(dataset_permissions::dataset_id.eq(datasets::id)),
)
.inner_join(
permission_groups_to_users::table
.on(permission_groups_to_users::user_id.eq(user_id)),
)
.left_join(messages::table.on(messages::dataset_id.eq(datasets::id.nullable())))
.select((
datasets::id,
datasets::name,
datasets::created_at,
datasets::updated_at,
datasets::enabled,
datasets::imported,
users::id,
users::name.nullable(),
users::email,
data_sources::id,
data_sources::name,
sql::<Nullable<Timestamptz>>("max(messages.created_at) as last_queried"),
))
.group_by((
datasets::id,
datasets::name,
datasets::created_at,
datasets::updated_at,
datasets::enabled,
datasets::imported,
users::id,
users::name,
users::email,
data_sources::id,
data_sources::name,
))
.filter(
dataset_permissions::permission_id
.eq(permission_groups_to_users::permission_group_id),
)
.filter(dataset_permissions::permission_type.eq("group"))
.filter(datasets::deleted_at.is_null())
.filter(datasets::enabled.eq(true))
.limit(page_size)
.offset(page * page_size)
.load::<(
Uuid,
String,
DateTime<Utc>,
DateTime<Utc>,
bool,
bool,
Uuid,
Option<String>,
String,
Uuid,
String,
Option<DateTime<Utc>>,
)>(&mut conn)
.await
{
Ok(datasets) => datasets,
Err(e) => return Err(anyhow!("Unable to get datasets from database: {}", e)),
};
Ok(permission_group_datasets)
})
};
let mut all_datasets: Vec<(
Uuid,
String,
DateTime<Utc>,
DateTime<Utc>,
bool,
bool,
Uuid,
Option<String>,
String,
Uuid,
String,
Option<DateTime<Utc>>,
)> = Vec::new();
match direct_user_permissioned_datasets_handle.await {
Ok(Ok(direct_user_permissioned_datasets)) => {
all_datasets.extend(direct_user_permissioned_datasets)
}
Ok(Err(e)) => return Err(anyhow!("Unable to get datasets from database: {}", e)),
Err(e) => return Err(anyhow!("Unable to get datasets from database: {}", e)),
};
}
let list_dataset_objects: Vec<ListDatasetObject> = list_dataset_records
match permission_group_datasets_handle.await {
Ok(Ok(permission_group_datasets)) => all_datasets.extend(permission_group_datasets),
Ok(Err(e)) => return Err(anyhow!("Unable to get datasets from database: {}", e)),
Err(e) => return Err(anyhow!("Unable to get datasets from database: {}", e)),
}
// Deduplicate based on dataset id (first tuple element)
all_datasets.sort_by_key(|k| k.0);
all_datasets.dedup_by_key(|k| k.0);
let list_dataset_objects: Vec<ListDatasetObject> = all_datasets
.into_iter()
.map(
|(id, name, data_source_id, data_source_name, permission_group_id)| ListDatasetObject {
|(
id,
name,
created_at: None,
updated_at: None,
enabled: None,
imported: None,
data_source: ListDatasetDataSource {
id: data_source_id,
name: data_source_name,
},
last_queried: None,
owner: None,
belongs_to: Some(permission_group_id.is_some()),
created_at,
updated_at,
enabled,
imported,
user_id,
user_name,
user_email,
data_source_id,
data_source_name,
last_queried,
)| {
ListDatasetObject {
id,
name,
created_at: Some(created_at),
updated_at: Some(updated_at),
enabled: Some(enabled),
imported: Some(imported),
data_source: ListDatasetDataSource {
id: data_source_id,
name: data_source_name,
},
last_queried,
owner: Some(ListDatasetOwner {
id: user_id,
name: user_name.unwrap_or(user_email),
avatar_url: None,
}),
belongs_to: None,
}
},
)
.collect();

View File

@ -8,17 +8,19 @@ use diesel::{
};
use diesel_async::RunQueryDsl;
use uuid::Uuid;
use tokio;
use serde::{Deserialize, Serialize};
use crate::{
database::{
enums::IdentityType,
lib::get_pg_pool,
models::User,
enums::{IdentityType, UserOrganizationRole},
lib::{get_pg_pool, PgPool},
models::{User, UserToOrganization},
schema::{
data_sources, datasets, datasets_to_permission_groups, messages,
permission_groups_to_identities, teams_to_users, users,
data_sources, dataset_permissions, datasets, datasets_to_permission_groups, messages,
permission_groups_to_identities, permission_groups_to_users, teams_to_users, users,
users_to_organizations,
},
},
routes::ws::{
@ -92,7 +94,7 @@ allow_columns_to_appear_in_same_group_by_clause!(
pub async fn list_datasets(user: &User, req: ListDatasetsRequest) -> Result<()> {
let list_dashboards_res = match list_datasets_handler(
user.id,
&user.id,
req.page,
req.page_size,
req.admin_view,
@ -151,7 +153,7 @@ pub async fn list_datasets(user: &User, req: ListDatasetsRequest) -> Result<()>
}
async fn list_datasets_handler(
user_id: Uuid,
user_id: &Uuid,
page: Option<i64>,
page_size: Option<i64>,
admin_view: Option<bool>,
@ -165,31 +167,41 @@ async fn list_datasets_handler(
let page_size = page_size.unwrap_or(25);
let admin_view = admin_view.unwrap_or(false);
let organization_id = match get_user_organization_id(&user_id).await {
Ok(organization_id) => organization_id,
// Added this to handle the case where the user does not have an organization...
// Likely will need to be revisited in the future. TODO.
Err(_) => return Ok(vec![]),
let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => return Err(anyhow!("Unable to get connection from pool: {}", e)),
};
let list_of_datasets = if let Some(permission_group_id) = permission_group_id {
list_permission_group_datasets(organization_id, page, page_size, permission_group_id)
// Right now we only allow users to have one organization this will change in the future
let user_organization_record = match users_to_organizations::table
.filter(users_to_organizations::user_id.eq(user_id))
.filter(users_to_organizations::deleted_at.is_null())
.select(users_to_organizations::all_columns)
.first::<UserToOrganization>(&mut conn)
.await
{
Ok(organization_id) => organization_id,
Err(e) => return Err(anyhow!("Unable to get organization from database: {}", e)),
};
let list_of_datasets = match &user_organization_record.role {
UserOrganizationRole::WorkspaceAdmin
| UserOrganizationRole::DataAdmin
| UserOrganizationRole::Querier => {
get_org_datasets(
&user_organization_record.organization_id,
page,
page_size,
enabled,
imported,
data_source_id,
)
.await?
} else {
match admin_view {
true => {
get_org_datasets(
&organization_id,
page,
page_size,
enabled,
imported,
data_source_id,
)
.await?
}
false => get_user_permissioned_datasets(&user_id, page, page_size).await?,
}
UserOrganizationRole::RestrictedQuerier => {
get_restricted_user_datasets(user_id, page, page_size).await?
}
UserOrganizationRole::Viewer => Vec::new(),
};
Ok(list_of_datasets)
@ -512,3 +524,249 @@ async fn list_permission_group_datasets(
Ok(list_dataset_objects)
}
async fn get_restricted_user_datasets(
user_id: &Uuid,
page: i64,
page_size: i64,
) -> Result<Vec<ListDatasetObject>> {
let direct_user_permissioned_datasets_handle = {
let user_id = user_id.clone();
tokio::spawn(async move {
let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => return Err(anyhow!("Unable to get connection from pool: {}", e)),
};
let result = match datasets::table
.inner_join(data_sources::table.on(datasets::data_source_id.eq(data_sources::id)))
.inner_join(users::table.on(datasets::created_by.eq(users::id)))
.inner_join(
dataset_permissions::table.on(dataset_permissions::dataset_id.eq(datasets::id)),
)
.left_join(messages::table.on(messages::dataset_id.eq(datasets::id.nullable())))
.select((
datasets::id,
datasets::name,
datasets::created_at,
datasets::updated_at,
datasets::enabled,
datasets::imported,
users::id,
users::name.nullable(),
users::email,
data_sources::id,
data_sources::name,
sql::<Nullable<Timestamptz>>("max(messages.created_at) as last_queried"),
))
.group_by((
datasets::id,
datasets::name,
datasets::created_at,
datasets::updated_at,
datasets::enabled,
datasets::imported,
users::id,
users::name,
users::email,
data_sources::id,
data_sources::name,
))
.filter(dataset_permissions::permission_id.eq(user_id))
.filter(dataset_permissions::permission_type.eq("user"))
.filter(datasets::deleted_at.is_null())
.filter(datasets::enabled.eq(true))
.limit(page_size)
.offset(page * page_size)
.load::<(
Uuid,
String,
DateTime<Utc>,
DateTime<Utc>,
bool,
bool,
Uuid,
Option<String>,
String,
Uuid,
String,
Option<DateTime<Utc>>,
)>(&mut conn)
.await
{
Ok(datasets) => datasets,
Err(e) => return Err(anyhow!("Unable to get datasets from database: {}", e)),
};
Ok(result)
})
};
// fetch permissions for user through permission group
let permission_group_datasets_handle = {
let user_id = user_id.clone();
tokio::spawn(async move {
let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => return Err(anyhow!("Unable to get connection from pool: {}", e)),
};
let permission_group_datasets: Vec<(
Uuid,
String,
DateTime<Utc>,
DateTime<Utc>,
bool,
bool,
Uuid,
Option<String>,
String,
Uuid,
String,
Option<DateTime<Utc>>,
)> = match datasets::table
.inner_join(data_sources::table.on(datasets::data_source_id.eq(data_sources::id)))
.inner_join(users::table.on(datasets::created_by.eq(users::id)))
.inner_join(
dataset_permissions::table.on(dataset_permissions::dataset_id.eq(datasets::id)),
)
.inner_join(
permission_groups_to_users::table
.on(permission_groups_to_users::user_id.eq(user_id)),
)
.left_join(messages::table.on(messages::dataset_id.eq(datasets::id.nullable())))
.select((
datasets::id,
datasets::name,
datasets::created_at,
datasets::updated_at,
datasets::enabled,
datasets::imported,
users::id,
users::name.nullable(),
users::email,
data_sources::id,
data_sources::name,
sql::<Nullable<Timestamptz>>("max(messages.created_at) as last_queried"),
))
.group_by((
datasets::id,
datasets::name,
datasets::created_at,
datasets::updated_at,
datasets::enabled,
datasets::imported,
users::id,
users::name,
users::email,
data_sources::id,
data_sources::name,
))
.filter(
dataset_permissions::permission_id
.eq(permission_groups_to_users::permission_group_id),
)
.filter(dataset_permissions::permission_type.eq("group"))
.filter(datasets::deleted_at.is_null())
.filter(datasets::enabled.eq(true))
.limit(page_size)
.offset(page * page_size)
.load::<(
Uuid,
String,
DateTime<Utc>,
DateTime<Utc>,
bool,
bool,
Uuid,
Option<String>,
String,
Uuid,
String,
Option<DateTime<Utc>>,
)>(&mut conn)
.await
{
Ok(datasets) => datasets,
Err(e) => return Err(anyhow!("Unable to get datasets from database: {}", e)),
};
Ok(permission_group_datasets)
})
};
let mut all_datasets: Vec<(
Uuid,
String,
DateTime<Utc>,
DateTime<Utc>,
bool,
bool,
Uuid,
Option<String>,
String,
Uuid,
String,
Option<DateTime<Utc>>,
)> = Vec::new();
match direct_user_permissioned_datasets_handle.await {
Ok(Ok(direct_user_permissioned_datasets)) => {
all_datasets.extend(direct_user_permissioned_datasets)
}
Ok(Err(e)) => return Err(anyhow!("Unable to get datasets from database: {}", e)),
Err(e) => return Err(anyhow!("Unable to get datasets from database: {}", e)),
}
match permission_group_datasets_handle.await {
Ok(Ok(permission_group_datasets)) => all_datasets.extend(permission_group_datasets),
Ok(Err(e)) => return Err(anyhow!("Unable to get datasets from database: {}", e)),
Err(e) => return Err(anyhow!("Unable to get datasets from database: {}", e)),
}
// Deduplicate based on dataset id (first tuple element)
all_datasets.sort_by_key(|k| k.0);
all_datasets.dedup_by_key(|k| k.0);
let list_dataset_objects: Vec<ListDatasetObject> = all_datasets
.into_iter()
.map(
|(
id,
name,
created_at,
updated_at,
enabled,
imported,
user_id,
user_name,
user_email,
data_source_id,
data_source_name,
last_queried,
)| {
ListDatasetObject {
id,
name,
created_at: Some(created_at),
updated_at: Some(updated_at),
enabled: Some(enabled),
imported: Some(imported),
data_source: ListDatasetDataSource {
id: data_source_id,
name: data_source_name,
},
last_queried,
owner: Some(ListDatasetOwner {
id: user_id,
name: user_name.unwrap_or(user_email),
avatar_url: None,
}),
belongs_to: None,
}
},
)
.collect();
Ok(list_dataset_objects)
}

3298
package-lock.json generated

File diff suppressed because it is too large Load Diff