diff --git a/api/libs/handlers/src/datasets/deploy.rs b/api/libs/handlers/src/datasets/deploy.rs
index 3249eea97..208b48689 100644
--- a/api/libs/handlers/src/datasets/deploy.rs
+++ b/api/libs/handlers/src/datasets/deploy.rs
@@ -54,6 +54,9 @@ pub async fn deploy_datasets_handler_core(
             .push(req);
     }
 
+    let mut successfully_processed_data_source_ids: HashSet<Uuid> = HashSet::new();
+    let mut deployed_datasets_by_data_source: HashMap<Uuid, HashSet<String>> = HashMap::new();
+
     for ((data_source_name, env, _database), group) in data_source_groups {
         // Get data source
         let data_source = match database::schema::data_sources::table // Incorrect path
@@ -87,6 +90,8 @@ pub async fn deploy_datasets_handler_core(
             }
         };
 
+        successfully_processed_data_source_ids.insert(data_source.id);
+
         let request_db_names: Vec<String> = group.iter().map(|req| req.name.clone()).collect();
         let existing_dataset_ids: HashMap<String, Uuid> = match database::schema::datasets::table // Incorrect path
             .filter(database::schema::datasets::data_source_id.eq(data_source.id))
@@ -98,20 +103,47 @@ pub async fn deploy_datasets_handler_core(
             Ok(ids) => ids.into_iter().collect(),
             Err(e) => {
                 error!("Failed to retrieve existing dataset IDs for data source '{}': {}", data_source_name, e);
-                return Err(anyhow::anyhow!("Failed to retrieve existing dataset IDs: {}", e));
+                // Instead of aborting the whole deployment, mark every result in this group as failed
+                for req_in_group in group {
+                    if let Some(validation_result) = results.iter_mut().find(|r| r.model_name == req_in_group.name && r.data_source_name == req_in_group.data_source_name && r.schema == req_in_group.schema) {
+                        validation_result.add_error(ValidationError::internal_error(format!("Failed to retrieve existing dataset IDs: {}", e)));
+                    } else {
+                        let mut validation = ValidationResult::new(
+                            req_in_group.name.clone(),
+                            req_in_group.data_source_name.clone(),
+                            req_in_group.schema.clone(),
+                        );
+                        validation.add_error(ValidationError::internal_error(format!("Failed to retrieve existing dataset IDs: {}", e)));
+                        results.push(validation);
+                    }
+                }
+                continue; // Continue to the next group
             }
         };
 
         let mut datasets_to_upsert_map: HashMap<(String, Uuid), database::models::Dataset> = HashMap::new(); // Incorrect path
 
-        for req in group.clone() {
+        for req in group.clone() { // group is Vec<&DeployDatasetsRequest>
+            // Record the names deployed for this data_source.id so the
+            // soft-deletion phase below knows which datasets to keep
+            deployed_datasets_by_data_source
+                .entry(data_source.id)
+                .or_default()
+                .insert(req.name.clone());
+
             let mut validation = ValidationResult::new(
                 req.name.clone(),
                 req.data_source_name.clone(),
                 req.schema.clone(),
             );
-            validation.success = true;
-            results.push(validation); // Add to results early
+            // Basic request validation: the 'model' field is required
+            if req.model.is_none() {
+                validation.add_error(ValidationError::internal_error("Field 'model' is required.".to_string()));
+                results.push(validation);
+                continue; // Skip this request
+            }
+            validation.success = true; // Assume success initially; overridden by upsert errors below
+            results.push(validation);
+
             let now = Utc::now();
             let dataset_id = existing_dataset_ids.get(&req.name).copied().unwrap_or_else(|| req.id.unwrap_or_else(Uuid::new_v4));
@@ -120,7 +152,7 @@ pub async fn deploy_datasets_handler_core(
                 id: dataset_id,
                 name: req.name.clone(),
                 data_source_id: data_source.id,
-                created_at: now,
+                created_at: now, // Used for new rows; existing rows keep their created_at (not in the on_conflict update set)
                 updated_at: now,
                 database_name: req.name.clone(),
                 when_to_use: Some(req.description.clone()),
@@ -128,10 +160,10 @@ pub async fn deploy_datasets_handler_core(
                 type_: database::enums::DatasetType::View, // Incorrect path
                 definition: req.sql_definition.clone().unwrap_or_default(), // Still keeping SQL definition
                 schema: req.schema.clone(),
-                enabled: true,
-                created_by: *user_id,
+                enabled: true, // By default, a deployed dataset is enabled
+                created_by: *user_id, // Used for new rows; existing rows keep their created_by
                 updated_by: *user_id,
-                deleted_at: None,
+                deleted_at: None, // Explicitly mark as not deleted
                 imported: false,
                 organization_id: organization_id,
                 model: req.model.clone(),
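For context on the id handling above: each row's id is resolved with a fixed precedence, so redeploying a model updates the existing row in place rather than creating a duplicate. Below is a minimal standalone sketch of that rule; the function name and `main` are illustrative, not part of the handler (assumes the `uuid` crate with the `v4` feature).

```rust
use std::collections::HashMap;
use uuid::Uuid;

// Same precedence as the handler's
// `existing_dataset_ids.get(&req.name).copied()
//      .unwrap_or_else(|| req.id.unwrap_or_else(Uuid::new_v4))`:
// existing row id, then caller-supplied id, then a fresh UUID.
fn resolve_dataset_id(
    existing_dataset_ids: &HashMap<String, Uuid>, // database_name -> id of live rows
    name: &str,
    requested_id: Option<Uuid>,
) -> Uuid {
    existing_dataset_ids
        .get(name)
        .copied()
        .unwrap_or_else(|| requested_id.unwrap_or_else(Uuid::new_v4))
}

fn main() {
    let known = Uuid::new_v4();
    let existing = HashMap::from([("orders".to_string(), known)]);

    // A name that already exists keeps its id, even if the request supplies one.
    assert_eq!(resolve_dataset_id(&existing, "orders", Some(Uuid::new_v4())), known);

    // An unknown name falls back to the request-supplied id, then to a new UUID.
    let supplied = Uuid::new_v4();
    assert_eq!(resolve_dataset_id(&existing, "users", Some(supplied)), supplied);
}
```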
@@ -156,26 +188,28 @@ pub async fn deploy_datasets_handler_core(
                     database::schema::datasets::definition.eq(excluded(database::schema::datasets::definition)),
                     database::schema::datasets::when_to_use.eq(excluded(database::schema::datasets::when_to_use)),
                     database::schema::datasets::model.eq(excluded(database::schema::datasets::model)),
-                    database::schema::datasets::yml_file.eq(excluded(database::schema::datasets::yml_file)), // Upsert yml_file
+                    database::schema::datasets::yml_file.eq(excluded(database::schema::datasets::yml_file)),
                     database::schema::datasets::schema.eq(excluded(database::schema::datasets::schema)),
                     database::schema::datasets::database_identifier.eq(excluded(database::schema::datasets::database_identifier)),
-                    database::schema::datasets::enabled.eq(excluded(database::schema::datasets::enabled)),
-                    database::schema::datasets::deleted_at.eq(None::<chrono::DateTime<Utc>>),
+                    database::schema::datasets::enabled.eq(true), // Directly set to true on upsert
+                    database::schema::datasets::deleted_at.eq(None as Option<chrono::DateTime<Utc>>), // Explicitly ensure it's not deleted
                 ))
                 .execute(&mut conn)
                 .await
             {
-                Ok(_) => info!("Successfully upserted {} datasets for data source '{}'", datasets_to_upsert.len(), data_source_name),
+                Ok(num_upserted) => {
+                    info!("Successfully upserted {} datasets for data source '{}'", num_upserted, data_source_name);
+                    // Validation results were already marked successful when the requests were mapped.
+                }
                 Err(e) => {
-                    error!("Failed to bulk upsert datasets for data source '{}': {}", data_source_name, e);
-                    // Mark all results for this group as failed
-                    for req_in_group in group {
-                        if let Some(validation_result) = results.iter_mut().find(|r| r.model_name == req_in_group.name && r.data_source_name == req_in_group.data_source_name) {
+                    error!("Failed to bulk upsert datasets for data_source_id '{}': {}", data_source.id, e);
+                    // Mark the results for every dataset mapped in this group as failed
+                    for dataset_to_upsert in datasets_to_upsert {
+                        if let Some(validation_result) = results.iter_mut().find(|r| r.model_name == dataset_to_upsert.name && r.data_source_name == data_source_name && r.schema == dataset_to_upsert.schema) {
+                            validation_result.success = false; // Mark as failed explicitly
                             validation_result.add_error(ValidationError::internal_error(format!("Failed to upsert dataset: {}", e)));
                         }
                     }
-                    // Optionally, you might want to return early here or collect errors and continue
-                    // For now, let's just log and the results will reflect the failure.
                 }
             };
             // Column processing is skipped as per requirements.
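The `.on_conflict(...).do_update()` block above follows Diesel's standard PostgreSQL upsert shape: insert rows, and on a primary-key conflict overwrite selected columns with the incoming (`excluded`) values while forcing `enabled`/`deleted_at` back to active. A minimal sketch of that shape with a hypothetical `items` table standing in for `datasets` (assumes diesel 2.x with the `postgres`, `uuid`, and `chrono` features, plus diesel-async):

```rust
use diesel::prelude::*;
use diesel::upsert::excluded;
use diesel_async::{AsyncPgConnection, RunQueryDsl};

diesel::table! {
    items (id) {
        id -> Uuid,
        name -> Text,
        deleted_at -> Nullable<Timestamptz>,
    }
}

#[derive(Insertable)]
#[diesel(table_name = items)]
struct NewItem {
    id: uuid::Uuid,
    name: String,
    deleted_at: Option<chrono::DateTime<chrono::Utc>>,
}

// Insert rows; on primary-key conflict, overwrite `name` with the incoming
// (`excluded`) value and force `deleted_at` back to NULL, mirroring how the
// handler re-activates a previously soft-deleted dataset on redeploy.
async fn upsert_items(conn: &mut AsyncPgConnection, rows: Vec<NewItem>) -> QueryResult<usize> {
    diesel::insert_into(items::table)
        .values(&rows)
        .on_conflict(items::id)
        .do_update()
        .set((
            items::name.eq(excluded(items::name)),
            items::deleted_at.eq(None::<chrono::DateTime<chrono::Utc>>),
        ))
        .execute(conn)
        .await
}
```

Columns left out of the `set(...)` tuple, such as `created_at` in the handler, keep their existing values on conflict; that is why the insert-side `created_at: now` only takes effect for brand-new rows.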
@@ -185,6 +219,80 @@ pub async fn deploy_datasets_handler_core(
         }
     }
 
+    // --- SOFT DELETION LOGIC ---
+    info!("Starting soft-deletion phase for datasets not in the current deployment batch...");
+
+    for data_source_id_to_clean in successfully_processed_data_source_ids {
+        let names_in_current_deployment = deployed_datasets_by_data_source
+            .get(&data_source_id_to_clean)
+            .cloned()
+            .unwrap_or_default();
+
+        let active_datasets_in_db: Vec<(Uuid, String)> = match database::schema::datasets::table
+            .filter(database::schema::datasets::data_source_id.eq(data_source_id_to_clean))
+            .filter(database::schema::datasets::deleted_at.is_null())
+            .select((database::schema::datasets::id, database::schema::datasets::database_name))
+            .load::<(Uuid, String)>(&mut conn)
+            .await
+        {
+            Ok(datasets) => datasets,
+            Err(e) => {
+                error!(
+                    "SOFT_DELETE_FAIL: Failed to retrieve active datasets for data_source_id '{}': {}. Skipping soft-deletion for this data source.",
+                    data_source_id_to_clean, e
+                );
+                // Optionally, add a non-model-specific error to `results` or a general operational warning.
+                continue;
+            }
+        };
+
+        let mut dataset_ids_to_soft_delete: Vec<Uuid> = Vec::new();
+        for (dataset_id, dataset_name_in_db) in active_datasets_in_db {
+            if !names_in_current_deployment.contains(&dataset_name_in_db) {
+                dataset_ids_to_soft_delete.push(dataset_id);
+            }
+        }
+
+        if !dataset_ids_to_soft_delete.is_empty() {
+            info!(
+                "SOFT_DELETE: Identified {} datasets to soft-delete for data_source_id '{}'.",
+                dataset_ids_to_soft_delete.len(),
+                data_source_id_to_clean
+            );
+            let now = Utc::now();
+            match diesel::update(database::schema::datasets::table.filter(database::schema::datasets::id.eq_any(&dataset_ids_to_soft_delete)))
+                .set((
+                    database::schema::datasets::deleted_at.eq(now),
+                    database::schema::datasets::updated_at.eq(now),
+                    database::schema::datasets::updated_by.eq(*user_id),
+                    database::schema::datasets::enabled.eq(false),
+                ))
+                .execute(&mut conn)
+                .await
+            {
+                Ok(num_deleted) => {
+                    info!(
+                        "SOFT_DELETE_SUCCESS: Successfully soft-deleted {} datasets for data_source_id '{}'.",
+                        num_deleted, data_source_id_to_clean
+                    );
+                }
+                Err(e) => {
+                    error!(
+                        "SOFT_DELETE_FAIL: Failed to soft-delete {} datasets for data_source_id '{}': {}. These datasets may remain active erroneously.",
+                        dataset_ids_to_soft_delete.len(), data_source_id_to_clean, e
+                    );
+                    // Optionally, add a non-model-specific error to `results` or a general operational warning.
+                }
+            }
+        } else {
+            info!(
+                "SOFT_DELETE: No datasets to soft-delete for data_source_id '{}'. All active datasets were part of the deployment or no relevant active datasets found.",
+                data_source_id_to_clean
+            );
+        }
+    }
+    // --- END SOFT DELETION LOGIC ---
+
     Ok(results)
 }
diff --git a/cli/cli/src/commands/deploy/deploy.rs b/cli/cli/src/commands/deploy/deploy.rs
index ed8087fd2..3b2aba7de 100644
--- a/cli/cli/src/commands/deploy/deploy.rs
+++ b/cli/cli/src/commands/deploy/deploy.rs
@@ -741,7 +741,7 @@ fn handle_deploy_response(
     let mut has_validation_errors = false;
 
     // Process validation results
-    for validation in &response.results {
+    for validation in response.results.iter() {
         // Find corresponding file from model mapping
         let file = model_mappings
             .iter()
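The soft-deletion phase reduces to a per-data-source set difference: any dataset still active in the DB whose `database_name` was not part of the current deployment batch has its id collected for the batch `UPDATE`. A standalone sketch of that selection rule (names and data are illustrative; assumes the `uuid` crate):

```rust
use std::collections::HashSet;
use uuid::Uuid;

// Mirrors the handler's loop: keep the id of every active dataset whose
// database name is absent from the current deployment batch.
fn select_for_soft_delete(
    active_in_db: &[(Uuid, String)],
    deployed_names: &HashSet<String>,
) -> Vec<Uuid> {
    active_in_db
        .iter()
        .filter(|(_, name)| !deployed_names.contains(name))
        .map(|(id, _)| *id)
        .collect()
}

fn main() {
    let keep = Uuid::new_v4();
    let stale = Uuid::new_v4();
    let active = vec![
        (keep, "orders".to_string()),
        (stale, "legacy_view".to_string()),
    ];
    let deployed: HashSet<String> = HashSet::from(["orders".to_string()]);

    // Only the dataset missing from the batch is scheduled for soft-deletion.
    assert_eq!(select_for_soft_delete(&active, &deployed), vec![stale]);
}
```

Because the phase only iterates over `successfully_processed_data_source_ids`, a data source whose lookup failed earlier is never swept, so a partial deployment cannot soft-delete datasets it never examined.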