fixes so far

This commit is contained in:
dal 2025-04-03 12:24:06 -06:00
parent f363caa7a7
commit 071574fd0e
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
14 changed files with 126 additions and 203 deletions

View File

@ -627,10 +627,11 @@ pub async fn process_metric_file(
Err(e) => return Err(format!("Database connection error: {}", e)),
};
let organization_id = get_user_organization_id(user_id)
.await
.map_err(|e| format!("Error getting organization: {}", e))?;
let organization_id = match get_user_organization_id(user_id).await {
Ok(Some(org_id)) => org_id,
Ok(None) => return Err("User does not belong to any organization".to_string()),
Err(e) => return Err(format!("Error getting organization: {}", e)),
};
// Generate deterministic UUID
let id = match generate_deterministic_uuid(&tool_call_id, &file_name, "metric") {

View File

@ -13,6 +13,7 @@ use crate::schema::users_to_organizations;
///
/// # Returns
/// * `Result<Option<Uuid>>` - The organization ID if found
/// Right now we are assuming each user belongs to only one organization; however, that can change in the future.
pub async fn get_user_organization_id(user_id: &Uuid) -> Result<Option<Uuid>> {
let mut conn = get_pg_pool().get().await?;

View File

@ -1,22 +1,20 @@
use anyhow::{anyhow, Result};
use diesel::{ExpressionMethods, JoinOnDsl, QueryDsl, Queryable};
use diesel_async::RunQueryDsl;
use futures::future::{join};
use middleware::AuthenticatedUser;
use serde_yaml;
use uuid::Uuid;
use futures::future::{join, try_join};
use crate::metrics::types::{
BusterMetric, ColumnMetaData, ColumnType, DataMetadata, Dataset, MinMaxValue, SimpleType,
AssociatedDashboard, AssociatedCollection,
AssociatedCollection, AssociatedDashboard, BusterMetric, Dataset,
};
use database::enums::{AssetPermissionRole, AssetType, IdentityType};
use database::helpers::metric_files::fetch_metric_file_with_permissions;
use database::pool::get_pg_pool;
use database::schema::{
asset_permissions, datasets, users,
collections, collections_to_assets,
dashboard_files, metric_files_to_dashboard_files
asset_permissions, collections, collections_to_assets, dashboard_files, datasets,
metric_files_to_dashboard_files, users,
};
use sharing::check_permission_access;
@ -37,10 +35,15 @@ struct AssetPermissionInfo {
}
/// Fetch the dashboards associated with the given metric id
async fn fetch_associated_dashboards_for_metric(metric_id: Uuid) -> Result<Vec<AssociatedDashboard>> {
async fn fetch_associated_dashboards_for_metric(
metric_id: Uuid,
) -> Result<Vec<AssociatedDashboard>> {
let mut conn = get_pg_pool().get().await?;
let associated_dashboards = metric_files_to_dashboard_files::table
.inner_join(dashboard_files::table.on(dashboard_files::id.eq(metric_files_to_dashboard_files::dashboard_file_id)))
.inner_join(
dashboard_files::table
.on(dashboard_files::id.eq(metric_files_to_dashboard_files::dashboard_file_id)),
)
.filter(metric_files_to_dashboard_files::metric_file_id.eq(metric_id))
.filter(dashboard_files::deleted_at.is_null())
.filter(metric_files_to_dashboard_files::deleted_at.is_null())
@ -48,16 +51,15 @@ async fn fetch_associated_dashboards_for_metric(metric_id: Uuid) -> Result<Vec<A
.load::<(Uuid, String)>(&mut conn)
.await?
.into_iter()
.map(|(id, name)| AssociatedDashboard {
id,
name,
})
.map(|(id, name)| AssociatedDashboard { id, name })
.collect();
Ok(associated_dashboards)
}
/// Fetch the collections associated with the given metric id
async fn fetch_associated_collections_for_metric(metric_id: Uuid) -> Result<Vec<AssociatedCollection>> {
async fn fetch_associated_collections_for_metric(
metric_id: Uuid,
) -> Result<Vec<AssociatedCollection>> {
let mut conn = get_pg_pool().get().await?;
let associated_collections = collections_to_assets::table
.inner_join(collections::table.on(collections::id.eq(collections_to_assets::collection_id)))
@ -69,10 +71,7 @@ async fn fetch_associated_collections_for_metric(metric_id: Uuid) -> Result<Vec<
.load::<(Uuid, String)>(&mut conn)
.await?
.into_iter()
.map(|(id, name)| AssociatedCollection {
id,
name,
})
.map(|(id, name)| AssociatedCollection { id, name })
.collect();
Ok(associated_collections)
}
@ -100,7 +99,12 @@ pub async fn get_metric_handler(
// 2. Check if user has at least FullAccess permission
if !check_permission_access(
metric_file.permission,
&[AssetPermissionRole::FullAccess, AssetPermissionRole::Owner],
&[
AssetPermissionRole::FullAccess,
AssetPermissionRole::Owner,
AssetPermissionRole::Editor,
AssetPermissionRole::Viewer,
],
metric_file.metric_file.organization_id,
&user.organizations,
) {
@ -160,38 +164,10 @@ pub async fn get_metric_handler(
Err(e) => return Err(anyhow!("Failed to convert content to YAML: {}", e)),
};
let mut conn = get_pg_pool().get().await?;
// Data metadata is fetched directly from the metric_file database record
let data_metadata = metric_file.data_metadata;
// Parse data metadata from the selected version's MetricYml
let data_metadata = metric_content.data_metadata.map(|metadata| {
DataMetadata {
column_count: metadata.len() as i32,
column_metadata: metadata
.iter()
.map(|col| ColumnMetaData {
name: col.name.clone(),
min_value: MinMaxValue::Number(0.0), // Default value
max_value: MinMaxValue::Number(0.0), // Default value
unique_values: 0, // Default value
simple_type: match col.data_type.as_str() {
"string" => SimpleType::Text,
"number" => SimpleType::Number,
"boolean" => SimpleType::Boolean,
"date" => SimpleType::Date,
_ => SimpleType::Text,
},
column_type: match col.data_type.as_str() {
"string" => ColumnType::Text,
"number" => ColumnType::Number,
"boolean" => ColumnType::Boolean,
"date" => ColumnType::Date,
_ => ColumnType::Text,
},
})
.collect(),
row_count: 1, // Default value since it's not in the MetricYml structure
}
});
let mut conn = get_pg_pool().get().await?;
// Get dataset information for all dataset IDs
let mut datasets = Vec::new();
@ -260,23 +236,31 @@ pub async fn get_metric_handler(
let metrics_id_clone = *metric_id;
let dashboards_future = fetch_associated_dashboards_for_metric(metrics_id_clone);
let collections_future = fetch_associated_collections_for_metric(metrics_id_clone);
// Await both futures concurrently
let (dashboards_result, collections_result) = join(dashboards_future, collections_future).await;
// Handle results, logging errors but returning empty Vecs for failed tasks
let dashboards = match dashboards_result {
Ok(dashboards) => dashboards,
Err(e) => {
tracing::error!("Failed to fetch associated dashboards for metric {}: {}", metric_id, e);
tracing::error!(
"Failed to fetch associated dashboards for metric {}: {}",
metric_id,
e
);
vec![]
}
};
let collections = match collections_result {
Ok(collections) => collections,
Err(e) => {
tracing::error!("Failed to fetch associated collections for metric {}: {}", metric_id, e);
tracing::error!(
"Failed to fetch associated collections for metric {}: {}",
metric_id,
e
);
vec![]
}
};
@ -329,10 +313,8 @@ pub async fn get_metric_handler(
dashboards,
collections,
versions,
// Use the actual permission from the fetch operation
permission,
sql: metric_content.sql,
// New sharing fields
individual_permissions,
publicly_accessible: metric_file.publicly_accessible,
public_expiry_date: metric_file.public_expiry_date,

View File

@ -1,5 +1,5 @@
use chrono::{DateTime, Utc};
use database::{enums::{AssetPermissionRole, Verification}, types::ChartConfig};
use database::{enums::{AssetPermissionRole, Verification}, types::{ChartConfig, DataMetadata}};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use std::collections::HashMap;
@ -85,80 +85,6 @@ pub struct Collection {
pub name: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DataMetadata {
pub column_count: i32,
pub column_metadata: Vec<ColumnMetaData>,
pub row_count: i32,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ColumnMetaData {
pub name: String,
#[serde(rename = "min_value")]
pub min_value: MinMaxValue,
#[serde(rename = "max_value")]
pub max_value: MinMaxValue,
pub unique_values: i32,
pub simple_type: SimpleType,
#[serde(rename = "type")]
pub column_type: ColumnType,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(untagged)]
pub enum MinMaxValue {
String(String),
Number(f64),
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum SimpleType {
#[serde(rename = "text")]
Text,
#[serde(rename = "number")]
Number,
#[serde(rename = "date")]
Date,
#[serde(rename = "boolean")]
Boolean,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "lowercase")]
pub enum ColumnType {
Text,
Float,
Integer,
Date,
Float8,
Timestamp,
Timestamptz,
Bool,
Time,
Boolean,
Json,
Jsonb,
Int8,
Int4,
Int2,
Decimal,
Char,
#[serde(rename = "character varying")]
CharacterVarying,
Character,
Varchar,
Number,
Numeric,
Tinytext,
Mediumtext,
Longtext,
Nchar,
Nvarchat,
Ntext,
Float4,
}
// IDataResult equivalent
pub type DataResult = Option<Vec<HashMap<String, Option<DataValue>>>>;

View File

@ -5,15 +5,15 @@ use database::{
helpers::metric_files::fetch_metric_file_with_permissions,
pool::get_pg_pool,
schema::{datasets, metric_files},
types::{MetricYml, VersionContent, VersionHistory, data_metadata::DataMetadata},
types::{MetricYml, VersionContent, VersionHistory},
};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use middleware::AuthenticatedUser;
use query_engine::data_source_query_routes::query_engine::query_engine;
use serde_json::Value;
use sharing::check_permission_access;
use uuid::Uuid;
use query_engine::data_source_query_routes::query_engine;
/// Recursively merges two JSON objects.
/// The second object (update) takes precedence over the first (base) where there are conflicts.
@ -98,8 +98,31 @@ pub async fn update_metric_handler(
.await
.map_err(|e| anyhow!("Failed to get database connection: {}", e))?;
// Check if metric exists and user has access - use the latest version
// First, check if the user has access to the metric with the right permission level
let metric_file_with_permissions = fetch_metric_file_with_permissions(metric_id, &user.id)
.await
.map_err(|e| anyhow!("Failed to fetch metric file with permissions: {}", e))?;
let (permission, organization_id) = if let Some(file_with_permission) = metric_file_with_permissions {
(
file_with_permission.permission,
file_with_permission.metric_file.organization_id,
)
} else {
return Err(anyhow!("Metric file not found"));
};
// Verify the user has at least Editor, FullAccess, or Owner permission
if !check_permission_access(
permission,
&[AssetPermissionRole::Editor, AssetPermissionRole::FullAccess, AssetPermissionRole::Owner],
organization_id,
&user.organizations,
) {
return Err(anyhow!("You don't have permission to update this metric. Editor or higher role required."));
}
// Now get the full metric with all its data needed for the update
let metric = get_metric_handler(metric_id, user, None).await?;
// Get version history
@ -130,7 +153,7 @@ pub async fn update_metric_handler(
_ => return Err(anyhow!("Invalid version content type")),
}
// If file is provided, it takes precedence over individual fields
} else if let Some(file_content) = request.file {
} else if let Some(ref file_content) = request.file {
serde_yaml::from_str::<MetricYml>(&file_content)
.map_err(|e| anyhow!("Failed to parse provided file content: {}", e))?
} else {
@ -167,8 +190,8 @@ pub async fn update_metric_handler(
if let Some(title) = request.name {
content.name = title;
}
if let Some(sql) = request.sql {
content.sql = sql;
if let Some(ref sql) = request.sql {
content.sql = sql.clone();
}
content
};
@ -188,21 +211,28 @@ pub async fn update_metric_handler(
}
// Calculate data_metadata if SQL changed
let data_metadata = if request.sql.is_some() || request.file.is_some() || request.restore_to_version.is_some() {
let data_metadata = if request.sql.is_some()
|| request.file.is_some()
|| request.restore_to_version.is_some()
{
// Get data source for dataset
let dataset_id = content.dataset_ids.get(0).ok_or_else(|| anyhow!("No dataset ID found"))?;
let dataset_id = content
.dataset_ids
.get(0)
.ok_or_else(|| anyhow!("No dataset ID found"))?;
let data_source_id = datasets::table
.filter(datasets::id.eq(dataset_id))
.select(datasets::data_source_id)
.first::<Uuid>(&mut conn)
.await
.map_err(|e| anyhow!("Failed to get data source ID: {}", e))?;
// Execute query and get results with metadata
let query_result = query_engine(&data_source_id, &content.sql, Some(100)).await
let query_result = query_engine(&data_source_id, &content.sql, Some(100))
.await
.map_err(|e| anyhow!("Failed to execute SQL for metadata calculation: {}", e))?;
// Return metadata
Some(query_result.metadata)
} else {

View File

@ -102,10 +102,10 @@ async fn get_dataset_data_sample_handler(
Ok(data) => data,
Err(e) => {
tracing::error!("Error getting dataset data: {:?}", e);
Vec::new()
return Err(anyhow!("Error getting dataset data: {:?}", e));
}
}
};
Ok(data)
Ok(data.data)
}

View File

@ -13,7 +13,7 @@ use database::{
enums::UserOrganizationRole,
models::{ColumnMetadata, DataMetadataJsonBody, MinMaxValue},
pool::get_pg_pool,
schema::{data_sources, datasets, users_to_organizations},
schema::{data_sources, datasets, users_to_organizations}, types::DataMetadata,
};
use crate::{

View File

@ -226,10 +226,10 @@ async fn fetch_data_handler(subscription: &String, metric: &Metric, user: &Authe
let fetching_data_body = FetchingData {
progress: StepProgress::Completed,
data: if data.is_empty() {
data: if data.data.is_empty() {
Some(vec![])
} else {
Some(data)
Some(data.data)
},
metric_id: metric.id,
};

View File

@ -5,7 +5,6 @@ mod datasets;
mod organizations;
mod permissions;
mod search;
mod sql;
mod teams;
mod terms;
mod metrics;

View File

@ -1,2 +0,0 @@
pub mod sql_router;
mod run_sql;

View File

@ -1 +0,0 @@

View File

@ -1,39 +0,0 @@
use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use middleware::AuthenticatedUser;
use super::run_sql::run_sql;
/// WebSocket route paths for SQL operations, matched against the incoming request path.
#[derive(Deserialize, Serialize, Debug, Clone)]
pub enum SqlRoute {
    // Serialized/deserialized as the literal path string "/sql/run".
    #[serde(rename = "/sql/run")]
    Run,
}
/// Event names emitted for SQL operations over the WebSocket.
// rename_all makes the wire form camelCase, i.e. `RunSql` -> "runSql".
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub enum SqlEvent {
    RunSql,
}
/// Dispatch an incoming SQL WebSocket message to its handler.
///
/// # Arguments
/// * `route` - The parsed SQL route (currently only `/sql/run`).
/// * `data` - The raw JSON payload; deserialized into the handler's request type.
/// * `user` - The authenticated user on whose behalf the SQL runs.
///
/// # Errors
/// Returns an error if the payload fails to deserialize or if `run_sql` fails.
pub async fn sql_router(route: SqlRoute, data: Value, user: &AuthenticatedUser) -> Result<()> {
    match route {
        SqlRoute::Run => {
            // Deserialize into the request type inferred from run_sql's signature.
            let req = serde_json::from_value(data)?;
            run_sql(user, req).await?;
        }
    };

    Ok(())
}
impl SqlRoute {
    /// Parse a raw request path into a `SqlRoute`.
    ///
    /// # Errors
    /// Returns an error for any path other than the known `/sql/run` route.
    pub fn from_str(path: &str) -> Result<Self> {
        if path == "/sql/run" {
            Ok(Self::Run)
        } else {
            Err(anyhow!("Invalid path"))
        }
    }
}

View File

@ -5,7 +5,6 @@ use std::{
time::{Duration, Instant},
};
use database::pool::get_redis_pool;
use async_compression::tokio::bufread::GzipDecoder;
use axum::{
extract::{
@ -15,6 +14,7 @@ use axum::{
response::IntoResponse,
Extension,
};
use database::pool::get_redis_pool;
use futures::{
sink::SinkExt,
stream::{SplitSink, StreamExt},
@ -33,7 +33,20 @@ use uuid::Uuid;
use middleware::AuthenticatedUser;
use super::{
collections::collections_router::CollectionEvent, dashboards::dashboards_router::DashboardEvent, data_sources::data_sources_router::DataSourceEvent, datasets::datasets_router::DatasetEvent, metrics::MetricEvent, organizations::organization_router::OrganizationEvent, permissions::permissions_router::PermissionEvent, search::search_router::SearchEvent, sql::sql_router::SqlEvent, teams::teams_routes::TeamEvent, terms::terms_router::TermEvent, threads_and_messages::threads_router::ThreadEvent, users::users_router::UserEvent, ws_router::{ws_router, WsRoutes}, ws_utils::{subscribe_to_stream, unsubscribe_from_stream}
collections::collections_router::CollectionEvent,
dashboards::dashboards_router::DashboardEvent,
data_sources::data_sources_router::DataSourceEvent,
datasets::datasets_router::DatasetEvent,
metrics::MetricEvent,
organizations::organization_router::OrganizationEvent,
permissions::permissions_router::PermissionEvent,
search::search_router::SearchEvent,
teams::teams_routes::TeamEvent,
terms::terms_router::TermEvent,
threads_and_messages::threads_router::ThreadEvent,
users::users_router::UserEvent,
ws_router::{ws_router, WsRoutes},
ws_utils::{subscribe_to_stream, unsubscribe_from_stream},
};
const CLIENT_TIMEOUT: Duration = Duration::from_secs(900);
@ -52,7 +65,6 @@ pub enum WsEvent {
Threads(ThreadEvent),
Dashboards(DashboardEvent),
Datasets(DatasetEvent),
Sql(SqlEvent),
Users(UserEvent),
Collections(CollectionEvent),
Teams(TeamEvent),
@ -220,7 +232,11 @@ pub async fn ws(
})
}
async fn ws_handler(stream: WebSocket, user: AuthenticatedUser, shutdown_tx: Arc<broadcast::Sender<()>>) {
async fn ws_handler(
stream: WebSocket,
user: AuthenticatedUser,
shutdown_tx: Arc<broadcast::Sender<()>>,
) {
let mut shutdown_rx = shutdown_tx.subscribe();
let (sender, mut receiver) = stream.split();

View File

@ -11,7 +11,19 @@ use crate::routes::ws::{
};
use super::{
collections::collections_router::{collections_router, CollectionRoute}, dashboards::dashboards_router::DashboardRoute, data_sources::data_sources_router::{data_sources_router, DataSourceRoute}, datasets::datasets_router::DatasetRoute, metrics::{metrics_router, MetricRoute}, organizations::organization_router::{organizations_router, OrganizationRoute}, permissions::permissions_router::{permissions_router, PermissionRoute}, search::search_router::{search_router, SearchRoute}, sql::sql_router::{sql_router, SqlRoute}, teams::teams_routes::{teams_router, TeamRoute}, terms::terms_router::{terms_router, TermRoute}, threads_and_messages::threads_router::{threads_router, ThreadRoute}, users::users_router::{users_router, UserRoute}, ws::SubscriptionRwLock
collections::collections_router::{collections_router, CollectionRoute},
dashboards::dashboards_router::DashboardRoute,
data_sources::data_sources_router::{data_sources_router, DataSourceRoute},
datasets::datasets_router::DatasetRoute,
metrics::{metrics_router, MetricRoute},
organizations::organization_router::{organizations_router, OrganizationRoute},
permissions::permissions_router::{permissions_router, PermissionRoute},
search::search_router::{search_router, SearchRoute},
teams::teams_routes::{teams_router, TeamRoute},
terms::terms_router::{terms_router, TermRoute},
threads_and_messages::threads_router::{threads_router, ThreadRoute},
users::users_router::{users_router, UserRoute},
ws::SubscriptionRwLock,
};
#[derive(Deserialize, Serialize, Debug, Clone)]
@ -22,7 +34,6 @@ pub enum WsRoutes {
Datasets(DatasetRoute),
Users(UserRoute),
Collections(CollectionRoute),
Sql(SqlRoute),
Teams(TeamRoute),
DataSources(DataSourceRoute),
Permissions(PermissionRoute),
@ -45,7 +56,6 @@ impl WsRoutes {
"datasets" => Ok(Self::Datasets(DatasetRoute::from_str(path)?)),
"users" => Ok(Self::Users(UserRoute::from_str(path)?)),
"collections" => Ok(Self::Collections(CollectionRoute::from_str(path)?)),
"sql" => Ok(Self::Sql(SqlRoute::from_str(path)?)),
"teams" => Ok(Self::Teams(TeamRoute::from_str(path)?)),
"data_sources" => Ok(Self::DataSources(DataSourceRoute::from_str(path)?)),
"permissions" => Ok(Self::Permissions(PermissionRoute::from_str(path)?)),