diff --git a/api/server/src/routes/rest/routes/datasets/deploy_datasets.rs b/api/server/src/routes/rest/routes/datasets/deploy_datasets.rs
index 68f048d64..ee6c07f3d 100644
--- a/api/server/src/routes/rest/routes/datasets/deploy_datasets.rs
+++ b/api/server/src/routes/rest/routes/datasets/deploy_datasets.rs
@@ -5,26 +5,24 @@ use axum::{extract::Json, Extension};
 use chrono::{DateTime, Utc};
 use diesel::{upsert::excluded, ExpressionMethods, QueryDsl};
 use diesel_async::RunQueryDsl;
-use query_engine::credentials::get_data_source_credentials;
+use middleware::AuthenticatedUser;
 use reqwest::StatusCode;
 use serde::{Deserialize, Serialize};
 use std::collections::{HashMap, HashSet};
 use uuid::Uuid;
-use middleware::AuthenticatedUser;
+
+// Import from handlers library
+use handlers::utils::user::user_info::get_user_organization_id;
 
 use crate::{
     database::{
         enums::DatasetType,
-        pool::get_pg_pool,
         models::{DataSource, Dataset, DatasetColumn},
+        pool::get_pg_pool,
         schema::{data_sources, dataset_columns, datasets},
     },
     routes::rest::ApiResponse,
-    utils::{
-        security::checks::is_user_workspace_admin_or_data_admin,
-        user::user_info::get_user_organization_id,
-        validation::{ValidationError, ValidationResult},
-    },
+    utils::security::checks::is_user_workspace_admin_or_data_admin,
 };
 
 #[derive(Debug, Deserialize)]
@@ -271,103 +269,47 @@ async fn handle_deploy_datasets(
 async fn deploy_datasets_handler(
     user_id: &Uuid,
     requests: Vec<DeployDatasetsRequest>,
-    is_simple: bool,
+    _is_simple: bool,
 ) -> Result<Vec<ValidationResult>> {
     let organization_id = get_user_organization_id(user_id).await?;
     let mut conn = get_pg_pool().get().await?;
 
     let mut results = Vec::new();
 
-    // Group requests by data source and database for efficient validation
-    let mut data_source_groups: HashMap<(String, Option<String>), Vec<&DeployDatasetsRequest>> = HashMap::new();
+    // Group requests by data source and database for efficient processing
+    let mut data_source_groups: HashMap<
+        (String, String, Option<String>),
+        Vec<&DeployDatasetsRequest>,
+    > = HashMap::new();
 
     for req in &requests {
+        // Group by data_source_name, env, and optional database
        data_source_groups
-            .entry((req.data_source_name.clone(), req.database.clone()))
+            .entry((
+                req.data_source_name.clone(),
+                req.env.clone(),
+                req.database.clone(),
+            ))
            .or_default()
            .push(req);
     }
 
     // Process each data source group
-    for ((data_source_name, database), group) in data_source_groups {
-
-        // Get data source
+    for ((data_source_name, env, _database), group) in data_source_groups {
+        // Get data source ID - still needed to link datasets
        let data_source = match data_sources::table
            .filter(data_sources::name.eq(&data_source_name))
-            .filter(data_sources::env.eq(&group[0].env))
+            .filter(data_sources::env.eq(&env)) // Use env from group key
            .filter(data_sources::organization_id.eq(&organization_id))
            .filter(data_sources::deleted_at.is_null())
-            .select(data_sources::all_columns)
+            .select(data_sources::all_columns) // Select needed columns, maybe just ID
            .first::<DataSource>(&mut conn)
            .await
        {
            Ok(ds) => ds,
-            Err(_) => {
-                for req in group {
-                    let mut validation = ValidationResult::new(
-                        req.name.clone(),
-                        req.data_source_name.clone(),
-                        req.schema.clone(),
-                    );
-                    validation.add_error(ValidationError::data_source_error(format!(
-                        "Data source '{}' not found",
-                        data_source_name
-                    )));
-                    results.push(validation);
-                }
-                continue;
-            }
-        };
-
-        // Get credentials for the data source
-        let credentials = match get_data_source_credentials(&data_source.id, &data_source.type_, false).await {
-            Ok(creds) => creds,
-            Err(e) => {
-                for req in group {
-                    let mut validation = ValidationResult::new(
-                        req.name.clone(),
-                        req.data_source_name.clone(),
-                        req.schema.clone(),
-                    );
-                    validation.add_error(ValidationError::data_source_error(format!(
-                        "Failed to get data source credentials: {}",
-                        e
-                    )));
-                    results.push(validation);
-                }
-                continue;
-            }
-        };
-
-        // Prepare tables for batch validation
-        let tables_to_validate: Vec<(String, String)> = group
-            .iter()
-            .map(|req| (req.name.clone(), req.schema.clone()))
-            .collect();
-
-        tracing::info!(
-            "Validating tables for data source '{:?}.{:?}': {:?}",
-            data_source_name,
-            database,
-            tables_to_validate
-        );
-
-        // Get all columns in one batch - this acts as our validation
-        let ds_columns = match retrieve_dataset_columns_batch(&tables_to_validate, &credentials, database).await {
-            Ok(cols) => {
-                // Add debug logging
-                tracing::info!(
-                    "Retrieved {} columns for data source '{}'. Tables found: {:?}",
-                    cols.len(),
-                    data_source_name,
-                    cols.iter()
-                        .map(|c| format!("{}.{}", c.schema_name, c.dataset_name))
-                        .collect::<Vec<_>>()
-                );
-                cols
-            },
             Err(e) => {
                 tracing::error!(
-                    "Error retrieving columns for data source '{}': {:?}",
+                    "Data source '{}' not found for env '{}': {}",
                     data_source_name,
+                    env,
                     e
                 );
                 for req in group {
@@ -377,613 +319,402 @@ async fn deploy_datasets_handler(
                         req.name.clone(),
                         req.schema.clone(),
                     );
                     validation.add_error(ValidationError::data_source_error(format!(
-                        "Failed to get columns from data source: {}",
-                        e
+                        "Data source '{}' not found for environment '{}'",
+                        data_source_name, env
                     )));
                     results.push(validation);
                 }
-                continue;
+                continue; // Skip this group if data source not found
             }
         };
 
-        // Create a map of valid datasets and their columns
-        let mut valid_datasets = Vec::new();
-        let mut dataset_columns_map: HashMap<String, Vec<_>> = HashMap::new();
-
-        for req in group {
+        // Process all requests in the group directly
+        let mut datasets_to_upsert_map: HashMap<(String, Uuid), Dataset> = HashMap::new();
+        let mut columns_to_upsert_map: HashMap<String, Vec<DatasetColumn>> = HashMap::new();
+        let mut dataset_req_map: HashMap<String, &DeployDatasetsRequest> = HashMap::new(); // To map back after upsert
+
+        for req in group.clone() {
+            // Assume success unless DB operations fail
             let mut validation = ValidationResult::new(
                 req.name.clone(),
                 req.data_source_name.clone(),
                 req.schema.clone(),
             );
+            validation.success = true; // Start as successful
+            results.push(validation); // Add to results now
 
-            // Get columns for this dataset
-            let columns: Vec<_> = ds_columns
-                .iter()
-                .filter(|col| {
-                    let name_match = col.dataset_name.to_lowercase() == req.name.to_lowercase();
-                    let schema_match = col.schema_name.to_lowercase() == req.schema.to_lowercase();
-
-                    // Add detailed debug logging for column matching
-                    tracing::info!(
-                        "Matching table '{}.{}': name_match={}, schema_match={} (comparing against {}.{})",
-                        col.schema_name,
-                        col.dataset_name,
-                        name_match,
-                        schema_match,
-                        req.schema,
+            let now = Utc::now();
+            let dataset_id = req.id.unwrap_or_else(Uuid::new_v4);
+
+            // Prepare Dataset for upsert
+            let dataset = Dataset {
+                id: dataset_id,
+                name: req.name.clone(),
+                data_source_id: data_source.id, // Use fetched data source ID
+                created_at: now,
+                updated_at: now,
+                database_name: req.name.clone(), // Use model name as database_name
+                when_to_use: Some(req.description.clone()),
+                when_not_to_use: None,
+                type_: DatasetType::View, // Assuming View, adjust if needed
+                definition: req.sql_definition.clone().unwrap_or_default(),
+                schema: req.schema.clone(),
+                enabled: true,
+                created_by: user_id.clone(),
+                updated_by: user_id.clone(),
+                deleted_at: None,
+                imported:
false, // Mark as not imported from source + organization_id: organization_id.clone(), + model: req.model.clone(), + yml_file: req.yml_file.clone(), + database_identifier: req.database.clone(), + }; + // Use HashMap for deduplication before bulk insert + datasets_to_upsert_map.insert((req.name.clone(), data_source.id), dataset); + dataset_req_map.insert(req.name.clone(), req); // Store request for column processing later + + // Prepare Columns for this dataset + let mut current_dataset_columns = Vec::new(); + let mut column_name_set = HashSet::new(); // For deduplication within request + + for col_req in &req.columns { + // Deduplicate columns within the same request + if column_name_set.insert(col_req.name.clone()) { + let dataset_column = DatasetColumn { + id: Uuid::new_v4(), + dataset_id: dataset_id, // Temporarily use generated/provided ID + name: col_req.name.clone(), + type_: col_req.type_.clone().unwrap_or_else(|| "text".to_string()), + description: Some(col_req.description.clone()), + nullable: true, // Assume nullable, source info not available + created_at: now, + updated_at: now, + deleted_at: None, + stored_values: None, // Fields related to stored values nullified + stored_values_status: None, + stored_values_error: None, + stored_values_count: None, + stored_values_last_synced: None, + semantic_type: col_req.semantic_type.clone(), + dim_type: col_req.type_.clone(), // Use type_ for dim_type? Adjust if needed + expr: col_req.expr.clone(), + }; + current_dataset_columns.push(dataset_column); + } else { + tracing::warn!( + "Duplicate column '{}' found in request for dataset '{}'. Skipping.", + col_req.name, req.name ); - - name_match && schema_match - }) - .collect(); - - if columns.is_empty() { - tracing::warn!( - "No columns found for dataset '{}' in schema '{}'. Available tables:\n{}", - req.name, - req.schema, - ds_columns - .iter() - .map(|c| format!(" - {}.{}", c.schema_name, c.dataset_name)) - .collect::>() - .join("\n") - ); - validation.add_error(ValidationError::table_not_found(&format!( - "{}.{}", - req.schema, - req.name - ))); - validation.success = false; - } else { - tracing::info!( - "✅ Found {} columns for dataset '{}.{}'", - columns.len(), - req.schema, - req.name - ); - validation.success = true; - valid_datasets.push(req); - dataset_columns_map.insert(req.name.clone(), columns); + } } - - results.push(validation); + columns_to_upsert_map.insert(req.name.clone(), current_dataset_columns); } - // Bulk upsert valid datasets - if !valid_datasets.is_empty() { - let now = Utc::now(); - - // Get existing dataset IDs for this data source - let existing_datasets: HashSet = datasets::table - .filter(datasets::data_source_id.eq(&data_source.id)) - .filter(datasets::deleted_at.is_null()) - .select(datasets::name) - .load::(&mut conn) - .await? 
- .into_iter() - .collect(); + // ---- BULK UPSERT DATA ---- + let datasets_to_upsert: Vec = datasets_to_upsert_map.into_values().collect(); + + if !datasets_to_upsert.is_empty() { + let now = Utc::now(); // Re-capture now for upsert timestamps - // Prepare datasets for upsert with deduplication - // Use HashMap for deduplication, keyed by (database_name, data_source_id) - // The composite key ensures we don't have duplicates in the ON CONFLICT clause - let mut dataset_map: HashMap<(String, Uuid), Dataset> = HashMap::new(); - - for req in valid_datasets.iter() { - // Create a composite key using database_name and data_source_id - let key = (req.name.clone(), data_source.id); - - // Create dataset - let dataset = Dataset { - id: req.id.unwrap_or_else(Uuid::new_v4), - name: req.name.clone(), - data_source_id: data_source.id, - created_at: now, - updated_at: now, - database_name: req.name.clone(), - when_to_use: Some(req.description.clone()), - when_not_to_use: None, - type_: DatasetType::View, - definition: req.sql_definition.clone().unwrap_or_default(), - schema: req.schema.clone(), - enabled: true, - created_by: user_id.clone(), - updated_by: user_id.clone(), - deleted_at: None, - imported: false, - organization_id: organization_id.clone(), - model: req.model.clone(), - yml_file: req.yml_file.clone(), - database_identifier: req.database.clone(), - }; - - // Only insert if it doesn't exist or replace if it does - dataset_map.insert(key, dataset); - } - - // Convert map values to Vec for insert - let datasets_to_upsert: Vec = dataset_map.into_values().collect(); - // Log deduplication results - if datasets_to_upsert.len() < valid_datasets.len() { + if datasets_to_upsert.len() < group.len() { tracing::info!( - "Deduplicated {} datasets down to {} unique datasets", - valid_datasets.len(), - datasets_to_upsert.len() + "Deduplicated {} dataset requests down to {} unique datasets for data source '{}'", + group.len(), + datasets_to_upsert.len(), + data_source_name ); - - // Detailed logging of found duplicates - let mut dataset_names = HashSet::new(); - let mut duplicates = Vec::new(); - - for req in valid_datasets.iter() { - if !dataset_names.insert(req.name.clone()) { - duplicates.push(req.name.clone()); - } - } - - if !duplicates.is_empty() { - tracing::info!("Found duplicate dataset names: {:?}", duplicates); - } } // Bulk upsert datasets - diesel::insert_into(datasets::table) + // Using ON CONFLICT (database_name, data_source_id) + match diesel::insert_into(datasets::table) .values(&datasets_to_upsert) .on_conflict((datasets::database_name, datasets::data_source_id)) .do_update() .set(( - datasets::updated_at.eq(excluded(datasets::updated_at)), + datasets::id.eq(excluded(datasets::id)), // Ensure ID is updated on conflict if new one generated + datasets::name.eq(excluded(datasets::name)), + datasets::updated_at.eq(now), // Use current time for update datasets::updated_by.eq(excluded(datasets::updated_by)), datasets::definition.eq(excluded(datasets::definition)), datasets::when_to_use.eq(excluded(datasets::when_to_use)), datasets::model.eq(excluded(datasets::model)), datasets::yml_file.eq(excluded(datasets::yml_file)), datasets::schema.eq(excluded(datasets::schema)), - datasets::name.eq(excluded(datasets::name)), - datasets::deleted_at.eq(None::>), + datasets::database_identifier.eq(excluded(datasets::database_identifier)), + datasets::enabled.eq(excluded(datasets::enabled)), // Ensure enabled status is updated + datasets::deleted_at.eq(None::>), // Important: Undelete if re-deploying )) 
                .execute(&mut conn)
-                .await?;
+                .await
+            {
+                Ok(_) => (),
+                Err(e) => {
+                    tracing::error!(
+                        "Failed to bulk upsert datasets for data source '{}': {}",
+                        data_source_name,
+                        e
+                    );
+                    // How to handle partial failures? Mark corresponding results as failed?
+                    // For now, return a general error for the batch.
+                    return Err(anyhow::anyhow!("Failed to upsert datasets: {}", e));
+                }
+            };
 
-            // Get the dataset IDs after upsert for column operations
-            let dataset_ids: HashMap<String, Uuid> = datasets::table
+            // Get the final dataset IDs after upsert (using database_name and data_source_id)
+            let upserted_dataset_names: Vec<String> = datasets_to_upsert
+                .iter()
+                .map(|d| d.database_name.clone())
+                .collect();
+            let final_dataset_ids: HashMap<String, Uuid> = match datasets::table
                .filter(datasets::data_source_id.eq(&data_source.id))
-                .filter(datasets::database_name.eq_any(valid_datasets.iter().map(|req| &req.name)))
+                .filter(datasets::database_name.eq_any(upserted_dataset_names))
                .filter(datasets::deleted_at.is_null())
                .select((datasets::database_name, datasets::id))
                .load::<(String, Uuid)>(&mut conn)
-                .await?
-                .into_iter()
-                .collect();
+                .await
+            {
+                Ok(ids) => ids.into_iter().collect(),
+                Err(e) => {
+                    tracing::error!(
+                        "Failed to retrieve dataset IDs after upsert for data source '{}': {}",
+                        data_source_name,
+                        e
+                    );
+                    return Err(anyhow::anyhow!(
+                        "Failed to retrieve dataset IDs after upsert: {}",
+                        e
+                    ));
+                }
+            };
 
-            // Bulk upsert columns for each dataset
-            for req in valid_datasets {
-                let dataset_id = match dataset_ids.get(&req.name) {
+            // Bulk upsert columns for each dataset that was successfully upserted
+            for (dataset_name, columns_for_dataset) in columns_to_upsert_map {
+                let dataset_id = match final_dataset_ids.get(&dataset_name) {
                    Some(id) => *id,
                    None => {
                        tracing::error!(
-                            "Dataset ID not found after upsert for {}.{}",
-                            req.schema,
-                            req.name
+                            "Dataset ID not found after upsert for dataset named '{}' in data source '{}'. Skipping column update.",
+                            dataset_name, data_source_name
                        );
-                        continue;
+                        // Mark corresponding result as failed?
+ if let Some(validation) = results.iter_mut().find(|r| { + r.model_name == dataset_name && r.data_source_name == data_source_name + }) { + validation.success = false; + validation.add_error(ValidationError::internal_error( + "Dataset ID missing after upsert.".to_string(), + )); + } + continue; // Skip to next dataset's columns } }; - // Prepare columns with deduplication - // Use HashMap for deduplication, keyed by (dataset_id, column_name) - // This matches the ON CONFLICT clause - let mut column_map: HashMap<(Uuid, String), DatasetColumn> = HashMap::new(); - - for col in &req.columns { - // Create a composite key using dataset_id and column name - let key = (dataset_id, col.name.clone()); - - // Create column - let dataset_column = DatasetColumn { - id: Uuid::new_v4(), - dataset_id, - name: col.name.clone(), - type_: col.type_.clone().unwrap_or_else(|| "text".to_string()), - description: Some(col.description.clone()), - nullable: true, - created_at: now, - updated_at: now, - deleted_at: None, - stored_values: None, - stored_values_status: None, - stored_values_error: None, - stored_values_count: None, - stored_values_last_synced: None, - semantic_type: col.semantic_type.clone(), - dim_type: col.type_.clone(), - expr: col.expr.clone(), - }; - - // Only insert if it doesn't exist or replace if it does - column_map.insert(key, dataset_column); - } - - // Convert map values to Vec for insert - let columns: Vec = column_map.into_values().collect(); - - // Log deduplication results - if columns.len() < req.columns.len() { - tracing::info!( - "Deduplicated {} columns down to {} unique columns for dataset {}", - req.columns.len(), - columns.len(), - req.name - ); - - // Detailed logging of found duplicates - let mut column_names = HashSet::new(); - let mut duplicates = Vec::new(); - - for col in &req.columns { - if !column_names.insert(col.name.clone()) { - duplicates.push(col.name.clone()); - } - } - - if !duplicates.is_empty() { - tracing::info!("Found duplicate column names in dataset {}: {:?}", req.name, duplicates); - } - } + // Update dataset_id in the prepared columns + let final_columns: Vec = columns_for_dataset + .into_iter() + .map(|mut col| { + col.dataset_id = dataset_id; + col + }) + .collect(); - // Get current column names - let current_column_names: HashSet = dataset_columns::table + // Get current column names from DB for soft deletion logic + let current_column_names: HashSet = match dataset_columns::table .filter(dataset_columns::dataset_id.eq(dataset_id)) .filter(dataset_columns::deleted_at.is_null()) .select(dataset_columns::name) .load::(&mut conn) - .await? - .into_iter() - .collect(); + .await + { + Ok(names) => names.into_iter().collect(), + Err(e) => { + tracing::error!( + "Failed to get current columns for dataset ID '{}': {}", + dataset_id, + e + ); + // Mark corresponding result as failed? 
+ if let Some(validation) = results.iter_mut().find(|r| { + r.model_name == dataset_name && r.data_source_name == data_source_name + }) { + validation.success = false; + validation.add_error(ValidationError::internal_error(format!( + "Failed to fetch current columns: {}", + e + ))); + } + continue; // Skip column update for this dataset + } + }; - // Get new column names - let new_column_names: HashSet = columns - .iter() - .map(|c| c.name.clone()) - .collect(); + // Get new column names from the request + let new_column_names: HashSet = + final_columns.iter().map(|c| c.name.clone()).collect(); - // Soft delete removed columns + // Soft delete columns removed in the request let columns_to_delete: Vec = current_column_names .difference(&new_column_names) .cloned() .collect(); if !columns_to_delete.is_empty() { - diesel::update(dataset_columns::table) + match diesel::update(dataset_columns::table) .filter(dataset_columns::dataset_id.eq(dataset_id)) .filter(dataset_columns::name.eq_any(&columns_to_delete)) .filter(dataset_columns::deleted_at.is_null()) .set(dataset_columns::deleted_at.eq(now)) .execute(&mut conn) - .await?; + .await + { + Ok(_) => tracing::info!( + "Soft deleted {} columns for dataset ID '{}'", + columns_to_delete.len(), + dataset_id + ), + Err(e) => { + tracing::error!( + "Failed to soft delete columns for dataset ID '{}': {}", + dataset_id, + e + ); + // Mark corresponding result as failed? + if let Some(validation) = results.iter_mut().find(|r| { + r.model_name == dataset_name + && r.data_source_name == data_source_name + }) { + validation.success = false; + validation.add_error(ValidationError::internal_error(format!( + "Failed to delete old columns: {}", + e + ))); + } + // Consider if we should continue or stop the whole process + } + }; } // Bulk upsert columns - diesel::insert_into(dataset_columns::table) - .values(&columns) - .on_conflict((dataset_columns::dataset_id, dataset_columns::name)) - .do_update() - .set(( - dataset_columns::type_.eq(excluded(dataset_columns::type_)), - dataset_columns::description.eq(excluded(dataset_columns::description)), - dataset_columns::semantic_type.eq(excluded(dataset_columns::semantic_type)), - dataset_columns::dim_type.eq(excluded(dataset_columns::dim_type)), - dataset_columns::expr.eq(excluded(dataset_columns::expr)), - dataset_columns::updated_at.eq(now), - dataset_columns::deleted_at.eq(None::>), - )) - .execute(&mut conn) - .await?; + if !final_columns.is_empty() { + match diesel::insert_into(dataset_columns::table) + .values(&final_columns) + .on_conflict((dataset_columns::dataset_id, dataset_columns::name)) + .do_update() + .set(( + dataset_columns::type_.eq(excluded(dataset_columns::type_)), + dataset_columns::description.eq(excluded(dataset_columns::description)), + dataset_columns::semantic_type + .eq(excluded(dataset_columns::semantic_type)), + dataset_columns::dim_type.eq(excluded(dataset_columns::dim_type)), + dataset_columns::expr.eq(excluded(dataset_columns::expr)), + dataset_columns::nullable.eq(excluded(dataset_columns::nullable)), // Update nullable status + dataset_columns::updated_at.eq(now), // Use current time for update + dataset_columns::deleted_at.eq(None::>), // Undelete if needed + )) + .execute(&mut conn) + .await + { + Ok(_) => (), + Err(e) => { + tracing::error!( + "Failed to bulk upsert columns for dataset ID '{}': {}", + dataset_id, + e + ); + // Mark corresponding result as failed? 
+ if let Some(validation) = results.iter_mut().find(|r| { + r.model_name == dataset_name + && r.data_source_name == data_source_name + }) { + validation.success = false; + validation.add_error(ValidationError::internal_error(format!( + "Failed to upsert columns: {}", + e + ))); + } + // Consider if we should continue or stop the whole process + } + }; + } } + } else { + tracing::info!( + "No datasets to upsert for data source '{}'", + data_source_name + ); } - } + } // End of loop through data_source_groups Ok(results) } -async fn batch_validate_datasets( - user_id: &Uuid, - requests: Vec, -) -> Result { - let mut successes = Vec::new(); - let mut failures = Vec::new(); - let organization_id = get_user_organization_id(user_id).await?; - - // Group requests by data source and database for efficient validation - let mut data_source_groups: HashMap< - (String, Option), - Vec<(&DatasetValidationRequest, Vec<(&str, &str)>)>, - > = HashMap::new(); - - for request in &requests { - let columns: Vec<(&str, &str)> = request - .columns - .iter() - .map(|c| (c.name.as_str(), c.type_.as_deref().unwrap_or("text"))) - .collect(); - - data_source_groups - .entry((request.data_source_name.clone(), None)) // Using None for database since it's not in the validation request - .or_default() - .push((request, columns)); - } - - // Process each data source group - for ((data_source_name, database), group) in data_source_groups { - let mut conn = get_pg_pool().get().await?; - - // Get data source - let data_source = match data_sources::table - .filter(data_sources::name.eq(&data_source_name)) - .filter(data_sources::organization_id.eq(organization_id)) - .select(data_sources::all_columns) - .first::(&mut conn) - .await - { - Ok(ds) => ds, - Err(e) => { - for (request, _) in group { - failures.push(DatasetValidationFailure { - dataset_id: request.dataset_id, - name: request.name.clone(), - schema: request.schema.clone(), - data_source_name: request.data_source_name.clone(), - errors: vec![ValidationError::data_source_error(format!( - "Data source not found: {}", - e - ))], - }); - } - continue; - } - }; - - // Prepare tables for batch validation - let tables_to_validate: Vec<(String, String)> = group - .iter() - .map(|(req, _)| (req.name.clone(), req.schema.clone())) - .collect(); - - // Get credentials - let credentials = - match get_data_source_credentials(&data_source.id, &data_source.type_, false) - .await - { - Ok(creds) => creds, - Err(e) => { - for (request, _) in group { - failures.push(DatasetValidationFailure { - dataset_id: request.dataset_id, - name: request.name.clone(), - schema: request.schema.clone(), - data_source_name: request.data_source_name.clone(), - errors: vec![ValidationError::data_source_error(format!( - "Failed to get data source credentials: {}", - e - ))], - }); - } - continue; - } - }; - - // Get all columns in one batch - let ds_columns = - match retrieve_dataset_columns_batch(&tables_to_validate, &credentials, database).await { - Ok(cols) => cols, - Err(e) => { - for (request, _) in group { - failures.push(DatasetValidationFailure { - dataset_id: request.dataset_id, - name: request.name.clone(), - schema: request.schema.clone(), - data_source_name: request.data_source_name.clone(), - errors: vec![ValidationError::data_source_error(format!( - "Failed to get columns from data source: {}", - e - ))], - }); - } - continue; - } - }; - - // Validate each dataset in the group - for (request, columns) in group { - let mut validation_errors = Vec::new(); - - // Filter columns for this 
dataset - let dataset_columns: Vec<_> = ds_columns - .iter() - .filter(|col| col.dataset_name == request.name && col.schema_name == request.schema) - .collect(); - - if dataset_columns.is_empty() { - validation_errors.push(ValidationError::table_not_found(&request.name)); - } else { - // Validate each column exists - for (col_name, _) in &columns { - if !dataset_columns.iter().any(|c| c.name == *col_name) { - validation_errors.push(ValidationError::column_not_found(col_name)); - } - } - } - - if validation_errors.is_empty() { - // Create or update dataset - match create_or_update_dataset(request, &organization_id, user_id).await { - Ok(dataset_id) => { - successes.push(DatasetValidationSuccess { - dataset_id, - name: request.name.clone(), - schema: request.schema.clone(), - data_source_name: request.data_source_name.clone(), - }); - } - Err(e) => { - failures.push(DatasetValidationFailure { - dataset_id: request.dataset_id, - name: request.name.clone(), - schema: request.schema.clone(), - data_source_name: request.data_source_name.clone(), - errors: vec![ValidationError::data_source_error(format!( - "Failed to create/update dataset: {}", - e - ))], - }); - } - } - } else { - failures.push(DatasetValidationFailure { - dataset_id: request.dataset_id, - name: request.name.clone(), - schema: request.schema.clone(), - data_source_name: request.data_source_name.clone(), - errors: validation_errors, - }); - } - } - } - - Ok(BatchValidationResult { - successes, - failures, - }) +// --- Local Struct Definitions --- (No import needed for these within this file) +#[derive(Debug, Serialize, Clone)] // Make Cloneable if needed by results.push(validation) +pub struct ValidationResult { + pub model_name: String, + pub data_source_name: String, + pub schema: String, + pub success: bool, + pub errors: Vec, } -async fn create_or_update_dataset( - request: &DatasetValidationRequest, - organization_id: &Uuid, - user_id: &Uuid, -) -> Result { - let mut conn = get_pg_pool().get().await?; - let now = Utc::now(); - - let dataset_id = match request.dataset_id { - Some(id) => { - // Update existing dataset - diesel::update(datasets::table) - .filter(datasets::id.eq(id)) - .set(( - datasets::name.eq(&request.name), - datasets::updated_at.eq(now), - datasets::updated_by.eq(user_id), - )) - .execute(&mut conn) - .await?; - id +impl ValidationResult { + fn new(model_name: String, data_source_name: String, schema: String) -> Self { + Self { + model_name, + data_source_name, + schema, + success: true, + errors: Vec::new(), } - None => { - // Create new dataset - let dataset = Dataset { - id: Uuid::new_v4(), - name: request.name.clone(), - data_source_id: Uuid::new_v4(), // This needs to be set correctly - created_at: now, - updated_at: now, - database_name: request.name.clone(), - when_to_use: None, - when_not_to_use: None, - type_: DatasetType::View, - definition: String::new(), - schema: request.schema.clone(), - enabled: false, - created_by: user_id.clone(), - updated_by: user_id.clone(), - deleted_at: None, - imported: false, - organization_id: organization_id.clone(), - yml_file: None, - model: None, - database_identifier: None, - }; - - diesel::insert_into(datasets::table) - .values(&dataset) - .execute(&mut conn) - .await?; - - dataset.id - } - }; - - // Create new columns with deduplication - // Use HashMap for deduplication, keyed by (dataset_id, column_name) - // This matches the ON CONFLICT clause used for columns - let mut column_map: HashMap<(Uuid, String), DatasetColumn> = HashMap::new(); - - for col in 
&request.columns { - // Create a composite key using dataset_id and column name - let key = (dataset_id, col.name.clone()); - - // Create column - let dataset_column = DatasetColumn { - id: Uuid::new_v4(), - dataset_id, - name: col.name.clone(), - type_: col.type_.clone().unwrap_or_else(|| "text".to_string()), - description: Some(col.description.clone()), - nullable: true, // This should be determined from the source - created_at: now, - updated_at: now, - deleted_at: None, - stored_values: None, - stored_values_status: None, - stored_values_error: None, - stored_values_count: None, - stored_values_last_synced: None, - semantic_type: col.semantic_type.clone(), - dim_type: None, - expr: col.expr.clone(), - }; - - // Only insert if it doesn't exist or replace if it does - column_map.insert(key, dataset_column); - } - - // Convert map values to Vec for insert - let new_columns: Vec = column_map.into_values().collect(); - - // Log deduplication results - if new_columns.len() < request.columns.len() { - tracing::info!( - "Deduplicated {} columns down to {} unique columns for validation dataset {}", - request.columns.len(), - new_columns.len(), - request.name - ); } - // Get current column names for this dataset - let current_column_names: Vec = dataset_columns::table - .filter(dataset_columns::dataset_id.eq(dataset_id)) - .filter(dataset_columns::deleted_at.is_null()) - .select(dataset_columns::name) - .load::(&mut conn) - .await?; - - // Soft delete columns that are no longer present - let new_column_names: Vec = new_columns.iter().map(|c| c.name.clone()).collect(); - diesel::update(dataset_columns::table) - .filter(dataset_columns::dataset_id.eq(dataset_id)) - .filter(dataset_columns::deleted_at.is_null()) - .filter(dataset_columns::name.ne_all(&new_column_names)) - .set(dataset_columns::deleted_at.eq(now)) - .execute(&mut conn) - .await?; - - // Insert new columns - diesel::insert_into(dataset_columns::table) - .values(&new_columns) - .execute(&mut conn) - .await?; - - Ok(dataset_id) + fn add_error(&mut self, error: ValidationError) { + self.success = false; + self.errors.push(error); + } } + +#[derive(Debug, Serialize, Clone)] // Make Cloneable if needed by errors.clone() +pub struct ValidationError { + pub code: String, + pub message: String, + pub location: Option, // e.g., "column: column_name" +} + +impl ValidationError { + fn data_source_error(message: String) -> Self { + Self { + code: "DATA_SOURCE_ERROR".to_string(), + message, + location: None, + } + } + + fn internal_error(message: String) -> Self { + Self { + code: "INTERNAL_ERROR".to_string(), + message, + location: None, + } + } + + // Add other factory methods if they were used (e.g., table_not_found, column_not_found) + // fn table_not_found(table_name: &str) -> Self { + // Self { + // code: "TABLE_NOT_FOUND".to_string(), + // message: format!("Table '{}' not found in data source.", table_name), + // location: None, + // } + // } + // fn column_not_found(column_name: &str) -> Self { + // Self { + // code: "COLUMN_NOT_FOUND".to_string(), + // message: format!("Column '{}' not found in table.", column_name), + // location: Some(format!("column: {}", column_name)), + // } + // } +} +// --- End Local Struct Definitions --- diff --git a/api/server/src/routes/rest/routes/datasets/mod.rs b/api/server/src/routes/rest/routes/datasets/mod.rs index 44af3ee6f..4849095d6 100644 --- a/api/server/src/routes/rest/routes/datasets/mod.rs +++ b/api/server/src/routes/rest/routes/datasets/mod.rs @@ -1,6 +1,6 @@ mod assets; mod delete_dataset; -// 
mod deploy_datasets;
+mod deploy_datasets;
 // mod generate_datasets;
 mod get_dataset;
 mod get_dataset_data_sample;
@@ -16,7 +16,7 @@ pub fn router() -> Router {
     Router::new()
         .route("/", get(list_datasets::list_datasets))
         .route("/", post(post_dataset::post_dataset))
-        // .route("/deploy", post(deploy_datasets::deploy_datasets))
+        .route("/deploy", post(deploy_datasets::deploy_datasets))
         // .route("/generate", post(generate_datasets::generate_datasets))
         .route("/:dataset_id", get(get_dataset::get_dataset))
         .route("/:dataset_id", delete(delete_dataset::delete_dataset))