get metric handler in the dashboard handler

This commit is contained in:
dal 2025-04-16 17:01:20 -06:00
parent 8a203c74c2
commit 764bbfa344
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
3 changed files with 286 additions and 156 deletions

View File

@ -4,9 +4,10 @@ use database::{
enums::{AssetPermissionRole, AssetType}, enums::{AssetPermissionRole, AssetType},
helpers::collections::fetch_collection_with_permission, helpers::collections::fetch_collection_with_permission,
metric_files::fetch_metric_file_with_permissions, metric_files::fetch_metric_file_with_permissions,
chats::fetch_chat_with_permission,
models::CollectionToAsset, models::CollectionToAsset,
pool::get_pg_pool, pool::get_pg_pool,
schema::collections_to_assets, schema::{collections_to_assets, chats},
}; };
use diesel::{ExpressionMethods, QueryDsl}; use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl; use diesel_async::RunQueryDsl;
@ -113,11 +114,13 @@ pub async fn add_assets_to_collection_handler(
// 3. Group assets by type for efficient processing // 3. Group assets by type for efficient processing
let mut dashboard_ids = Vec::new(); let mut dashboard_ids = Vec::new();
let mut metric_ids = Vec::new(); let mut metric_ids = Vec::new();
let mut chat_ids = Vec::new();
for asset in &assets { for asset in &assets {
match asset.asset_type { match asset.asset_type {
AssetType::DashboardFile => dashboard_ids.push(asset.id), AssetType::DashboardFile => dashboard_ids.push(asset.id),
AssetType::MetricFile => metric_ids.push(asset.id), AssetType::MetricFile => metric_ids.push(asset.id),
AssetType::Chat => chat_ids.push(asset.id),
_ => { _ => {
error!( error!(
asset_id = %asset.id, asset_id = %asset.id,
@ -447,6 +450,119 @@ pub async fn add_assets_to_collection_handler(
} }
} }
// Process chats
if !chat_ids.is_empty() {
for chat_id in &chat_ids {
// Check if chat exists and user has permission
let chat_permission = fetch_chat_with_permission(&chat_id, &user.id).await?;
let chat_permission = if let Some(cp) = chat_permission {
cp
} else {
error!(chat_id = %chat_id, user_id = %user.id, "Chat not found");
result.failed_count += 1;
result.failed_assets.push(( *chat_id, AssetType::Chat, "Chat not found".to_string()));
continue;
};
// Check if user has at least CanView access to the chat
let has_chat_permission = check_permission_access(
chat_permission.permission,
&[
AssetPermissionRole::CanView,
AssetPermissionRole::CanEdit,
AssetPermissionRole::FullAccess,
AssetPermissionRole::Owner,
],
chat_permission.chat.organization_id,
&user.organizations,
);
if !has_chat_permission {
error!(chat_id = %chat_id, user_id = %user.id, "User does not have permission to access this chat");
result.failed_count += 1;
result.failed_assets.push(( *chat_id, AssetType::Chat, "Insufficient permissions".to_string()));
continue;
}
// Check if the chat is already in the collection
let existing = match collections_to_assets::table
.filter(collections_to_assets::collection_id.eq(collection_id))
.filter(collections_to_assets::asset_id.eq(chat_id))
.filter(collections_to_assets::asset_type.eq(AssetType::Chat))
.first::<CollectionToAsset>(&mut conn)
.await
{
Ok(record) => Some(record),
Err(diesel::NotFound) => None,
Err(e) => {
error!("Error checking if chat is already in collection: {}", e);
result.failed_count += 1;
result.failed_assets.push(( *chat_id, AssetType::Chat, format!("Database error: {}", e)));
continue;
}
};
if let Some(existing_record) = existing {
if existing_record.deleted_at.is_some() {
// If it was previously deleted, update it
match diesel::update(collections_to_assets::table)
.filter(collections_to_assets::collection_id.eq(collection_id))
.filter(collections_to_assets::asset_id.eq(chat_id))
.filter(collections_to_assets::asset_type.eq(AssetType::Chat))
.set((
collections_to_assets::deleted_at.eq::<Option<chrono::DateTime<chrono::Utc>>>(None),
collections_to_assets::updated_at.eq(chrono::Utc::now()),
collections_to_assets::updated_by.eq(user.id),
))
.execute(&mut conn)
.await
{
Ok(_) => {
result.added_count += 1;
}
Err(e) => {
error!(collection_id = %collection_id, chat_id = %chat_id, "Error updating chat in collection: {}", e);
result.failed_count += 1;
result.failed_assets.push(( *chat_id, AssetType::Chat, format!("Database error: {}", e)));
}
}
} else {
// Already in the collection
info!(collection_id = %collection_id, chat_id = %chat_id, "Chat already in collection");
result.added_count += 1;
}
} else {
// Add to collection
let new_record = CollectionToAsset {
collection_id: *collection_id,
asset_id: *chat_id,
asset_type: AssetType::Chat,
created_at: chrono::Utc::now(),
created_by: user.id,
updated_at: chrono::Utc::now(),
updated_by: user.id,
deleted_at: None,
};
match diesel::insert_into(collections_to_assets::table)
.values(&new_record)
.execute(&mut conn)
.await
{
Ok(_) => {
result.added_count += 1;
}
Err(e) => {
error!(collection_id = %collection_id, chat_id = %chat_id, "Error adding chat to collection: {}", e);
result.failed_count += 1;
result.failed_assets.push(( *chat_id, AssetType::Chat, format!("Database error: {}", e)));
}
}
}
}
}
Ok(result) Ok(result)
} }

View File

@ -6,6 +6,7 @@ use database::{
pool::get_pg_pool, pool::get_pg_pool,
schema::{ schema::{
asset_permissions, collections_to_assets, dashboard_files, metric_files, users, asset_permissions, collections_to_assets, dashboard_files, metric_files, users,
chats,
}, },
}; };
use diesel::{ExpressionMethods, JoinOnDsl, NullableExpressionMethods, QueryDsl, Queryable}; use diesel::{ExpressionMethods, JoinOnDsl, NullableExpressionMethods, QueryDsl, Queryable};
@ -165,70 +166,141 @@ pub async fn get_collection_handler(
Err(_) => (false, None, None), Err(_) => (false, None, None),
}; };
// Query for metric assets in the collection // Get the pool once
let metric_assets_result = collections_to_assets::table let pool = get_pg_pool();
.inner_join(metric_files::table.on(metric_files::id.eq(collections_to_assets::asset_id)))
.left_join(users::table.on(users::id.eq(metric_files::created_by)))
.filter(collections_to_assets::collection_id.eq(req.id))
.filter(collections_to_assets::asset_type.eq(AssetType::MetricFile))
.filter(collections_to_assets::deleted_at.is_null())
.filter(metric_files::deleted_at.is_null())
.select((
metric_files::id,
metric_files::name,
users::name.nullable(),
users::email.nullable(),
metric_files::created_at,
metric_files::updated_at,
collections_to_assets::asset_type,
))
.load::<AssetQueryResult>(&mut conn)
.await;
// Query for dashboard assets in the collection // Spawn tasks for fetching assets concurrently
let dashboard_assets_result = collections_to_assets::table let metric_assets_handle = tokio::spawn({
.inner_join( let pool = pool.clone();
dashboard_files::table.on(dashboard_files::id.eq(collections_to_assets::asset_id)), let req_id = req.id;
) async move {
.left_join(users::table.on(users::id.eq(dashboard_files::created_by))) let mut conn = pool.get().await.map_err(anyhow::Error::from)?;
.filter(collections_to_assets::collection_id.eq(req.id)) collections_to_assets::table
.filter(collections_to_assets::asset_type.eq(AssetType::DashboardFile)) .inner_join(metric_files::table.on(metric_files::id.eq(collections_to_assets::asset_id)))
.filter(collections_to_assets::deleted_at.is_null()) .left_join(users::table.on(users::id.eq(metric_files::created_by)))
.filter(dashboard_files::deleted_at.is_null()) .filter(collections_to_assets::collection_id.eq(req_id))
.select(( .filter(collections_to_assets::asset_type.eq(AssetType::MetricFile))
dashboard_files::id, .filter(collections_to_assets::deleted_at.is_null())
dashboard_files::name, .filter(metric_files::deleted_at.is_null())
users::name.nullable(), .select((
users::email.nullable(), metric_files::id,
dashboard_files::created_at, metric_files::name,
dashboard_files::updated_at, users::name.nullable(),
collections_to_assets::asset_type, users::email.nullable(),
)) metric_files::created_at,
.load::<AssetQueryResult>(&mut conn) metric_files::updated_at,
.await; collections_to_assets::asset_type,
))
.load::<AssetQueryResult>(&mut conn)
.await
.map_err(anyhow::Error::from)
}
});
let dashboard_assets_handle = tokio::spawn({
let pool = pool.clone();
let req_id = req.id;
async move {
let mut conn = pool.get().await.map_err(anyhow::Error::from)?;
collections_to_assets::table
.inner_join(dashboard_files::table.on(dashboard_files::id.eq(collections_to_assets::asset_id)))
.left_join(users::table.on(users::id.eq(dashboard_files::created_by)))
.filter(collections_to_assets::collection_id.eq(req_id))
.filter(collections_to_assets::asset_type.eq(AssetType::DashboardFile))
.filter(collections_to_assets::deleted_at.is_null())
.filter(dashboard_files::deleted_at.is_null())
.select((
dashboard_files::id,
dashboard_files::name,
users::name.nullable(),
users::email.nullable(),
dashboard_files::created_at,
dashboard_files::updated_at,
collections_to_assets::asset_type,
))
.load::<AssetQueryResult>(&mut conn)
.await
.map_err(anyhow::Error::from)
}
});
let chat_assets_handle = tokio::spawn({
let pool = pool.clone();
let req_id = req.id;
async move {
let mut conn = pool.get().await.map_err(anyhow::Error::from)?;
collections_to_assets::table
.inner_join(chats::table.on(chats::id.eq(collections_to_assets::asset_id)))
.left_join(users::table.on(users::id.eq(chats::created_by)))
.filter(collections_to_assets::collection_id.eq(req_id))
.filter(collections_to_assets::asset_type.eq(AssetType::Chat))
.filter(collections_to_assets::deleted_at.is_null())
.filter(chats::deleted_at.is_null())
.select((
chats::id,
chats::title, // Use title as name for chats
users::name.nullable(),
users::email.nullable(),
chats::created_at,
chats::updated_at,
collections_to_assets::asset_type,
))
.load::<AssetQueryResult>(&mut conn)
.await
.map_err(anyhow::Error::from)
}
});
// Await all tasks and handle results
let (metric_assets_result, dashboard_assets_result, chat_assets_result) = tokio::join!(
metric_assets_handle,
dashboard_assets_handle,
chat_assets_handle
);
// Process metric assets // Process metric assets
let metric_assets = match metric_assets_result { let metric_assets = match metric_assets_result {
Ok(assets) => assets, Ok(Ok(assets)) => assets,
Err(e) => { Ok(Err(e)) => {
tracing::error!("Failed to fetch metric assets: {}", e); tracing::error!("Failed to fetch metric assets: {}", e);
vec![] vec![]
} }
Err(e) => {
tracing::error!("Metric asset task failed: {}", e);
vec![]
}
}; };
// Process dashboard assets // Process dashboard assets
let dashboard_assets = match dashboard_assets_result { let dashboard_assets = match dashboard_assets_result {
Ok(assets) => assets, Ok(Ok(assets)) => assets,
Err(e) => { Ok(Err(e)) => {
tracing::error!("Failed to fetch dashboard assets: {}", e); tracing::error!("Failed to fetch dashboard assets: {}", e);
vec![] vec![]
} }
Err(e) => {
tracing::error!("Dashboard asset task failed: {}", e);
vec![]
}
}; };
println!("dashboard_assets: {:?}", dashboard_assets); // Process chat assets
let chat_assets = match chat_assets_result {
Ok(Ok(assets)) => assets,
Ok(Err(e)) => {
tracing::error!("Failed to fetch chat assets: {}", e);
vec![]
}
Err(e) => {
tracing::error!("Chat asset task failed: {}", e);
vec![]
}
};
// println!("dashboard_assets: {:?}", dashboard_assets); // Keep or remove debug print?
// Combine all assets // Combine all assets
let all_assets = [metric_assets, dashboard_assets].concat(); let all_assets = [metric_assets, dashboard_assets, chat_assets].concat(); // Add chat_assets
let formatted_assets = format_assets(all_assets); let formatted_assets = format_assets(all_assets);
// Create collection state // Create collection state

View File

@ -12,7 +12,8 @@ use tokio::task::JoinHandle;
use uuid::Uuid; use uuid::Uuid;
use crate::dashboards::types::{BusterShareIndividual, DashboardCollection}; use crate::dashboards::types::{BusterShareIndividual, DashboardCollection};
use crate::metrics::{get_metric_handler, BusterMetric, Dataset, Version}; use crate::metrics::get_metric_handler;
use crate::metrics::{BusterMetric, Dataset, Version};
use database::enums::{AssetPermissionRole, AssetType, IdentityType, Verification}; use database::enums::{AssetPermissionRole, AssetType, IdentityType, Verification};
use database::helpers::dashboard_files::fetch_dashboard_file_with_permission; use database::helpers::dashboard_files::fetch_dashboard_file_with_permission;
use database::pool::get_pg_pool; use database::pool::get_pg_pool;
@ -86,112 +87,6 @@ async fn fetch_associated_collections_for_dashboard(
Ok(associated_collections) Ok(associated_collections)
} }
/// Fetches minimal metric data for display on a dashboard, enforcing a specific permission level.
async fn get_metrics_for_dashboard(
metric_ids: &[Uuid],
dashboard_permission: AssetPermissionRole,
// user: &AuthenticatedUser, // We might not need the user if we aren't doing fine-grained checks here
) -> Result<HashMap<Uuid, BusterMetric>> {
if metric_ids.is_empty() {
return Ok(HashMap::new());
}
let mut conn = get_pg_pool().get().await?;
// Fetch all required MetricFile records in one query
let metric_files_data = metric_files::table
.filter(metric_files::id.eq_any(metric_ids))
.filter(metric_files::deleted_at.is_null()) // Ensure metrics aren't deleted
.load::<database::models::MetricFile>(&mut conn)
.await?;
let mut metrics_map = HashMap::new();
// Process each fetched metric file
for metric_file in metric_files_data {
// Use latest content directly from the MetricFile record
let metric_content: MetricYml = metric_file.content;
let version_num = metric_file.version_history.get_version_number();
// Convert content to YAML string for the 'file' field
let file_yaml = match serde_yaml::to_string(&metric_content) {
Ok(yaml) => yaml,
Err(e) => {
tracing::error!(metric_id = %metric_file.id, error = %e, "Failed to serialize metric content to YAML for dashboard view");
// Skip this metric or handle error appropriately
continue;
}
};
// Simplified dataset fetching (if needed, adapt from get_metric_handler if complex logic required)
let mut datasets = Vec::new();
let mut first_data_source_id = None;
// Placeholder: Fetching actual dataset names might require another query or joining.
// For now, create dummy datasets based on IDs if necessary for the type.
if !metric_content.dataset_ids.is_empty() {
first_data_source_id = Some(Uuid::nil()); // Placeholder - needs actual lookup if required
datasets = metric_content
.dataset_ids
.iter()
.map(|id| Dataset {
id: id.to_string(),
name: format!("Dataset {}", id), // Placeholder name
})
.collect();
}
// Construct the BusterMetric, forcing the dashboard's permission level
let buster_metric = BusterMetric {
id: metric_file.id,
metric_type: "metric".to_string(), // Assuming standard type
name: metric_file.name,
version_number: version_num,
description: metric_content.description,
file_name: metric_file.file_name,
time_frame: metric_content.time_frame,
datasets, // Using simplified dataset info
data_source_id: first_data_source_id.map_or("".to_string(), |id| id.to_string()),
error: None, // Assuming no error fetching this basic view
chart_config: Some(metric_content.chart_config),
data_metadata: metric_file.data_metadata,
status: metric_file.verification,
evaluation_score: metric_file.evaluation_score.map(|score| {
// Simple mapping
if score >= 0.8 {
"High".to_string()
} else if score >= 0.5 {
"Moderate".to_string()
} else {
"Low".to_string()
}
}),
evaluation_summary: metric_file.evaluation_summary.unwrap_or_default(),
file: file_yaml,
created_at: metric_file.created_at,
updated_at: metric_file.updated_at,
sent_by_id: metric_file.created_by,
sent_by_name: "".to_string(), // Placeholder - requires user lookup
sent_by_avatar_url: None, // Placeholder - requires user lookup
code: None, // Assuming not needed for dashboard view
dashboards: vec![], // Omit associated dashboards/collections for simplicity
collections: vec![], // Omit associated dashboards/collections for simplicity
versions: vec![], // Omit detailed versions for simplicity
permission: dashboard_permission, // <<< Force the permission level here
sql: metric_content.sql,
// Omit detailed sharing info for simplicity in this view
individual_permissions: None,
publicly_accessible: metric_file.publicly_accessible,
public_expiry_date: metric_file.public_expiry_date,
public_enabled_by: None, // Placeholder - requires user lookup
public_password: metric_file.public_password,
};
metrics_map.insert(buster_metric.id, buster_metric);
}
Ok(metrics_map)
}
pub async fn get_dashboard_handler( pub async fn get_dashboard_handler(
dashboard_id: &Uuid, dashboard_id: &Uuid,
user: &AuthenticatedUser, user: &AuthenticatedUser,
@ -351,8 +246,55 @@ pub async fn get_dashboard_handler(
}) })
.collect(); .collect();
// Fetch metrics using the specialized function, enforcing dashboard permission // Fetch metrics concurrently using get_metric_handler
let metrics = get_metrics_for_dashboard(&metric_ids, permission).await?; let mut metric_fetch_handles = Vec::new();
for metric_id in metric_ids {
let user_clone = user.clone(); // Clone user for the spawned task
// Spawn a task for each metric fetch.
// Pass None for version_number and password as the dashboard view uses the latest metric
// and access is primarily determined by dashboard permissions.
let handle = tokio::spawn(async move {
get_metric_handler(&metric_id, &user_clone, None, None).await
});
metric_fetch_handles.push((metric_id, handle));
}
// Await all metric fetch tasks and collect results
let metric_results = join_all(
metric_fetch_handles
.into_iter()
.map(|(_, handle)| handle),
)
.await;
// Process results and build the metrics map
let mut metrics = HashMap::new();
for result in metric_results {
match result {
Ok(Ok(metric)) => {
// Successfully fetched metric
metrics.insert(metric.id, metric);
}
Ok(Err(e)) => {
// get_metric_handler returned an error
// Log the error, but don't fail the entire dashboard load
tracing::error!(
"Failed to fetch metric for dashboard {}: {}",
dashboard_id,
e
);
// Optionally, insert a placeholder or error metric into the map
}
Err(e) => {
// Task join error (panic)
tracing::error!(
"Task join error fetching metric for dashboard {}: {}",
dashboard_id,
e
);
}
}
}
// Query individual permissions for this dashboard // Query individual permissions for this dashboard
let individual_permissions_query = asset_permissions::table let individual_permissions_query = asset_permissions::table