diff --git a/api/libs/handlers/Cargo.toml b/api/libs/handlers/Cargo.toml
index 4aed53257..edbf748ab 100644
--- a/api/libs/handlers/Cargo.toml
+++ b/api/libs/handlers/Cargo.toml
@@ -32,6 +32,7 @@ search = { path = "../search" }
 email = { path = "../email" }
 sql_analyzer = { path = "../sql_analyzer" }
 dataset_security = { path = "../dataset_security" }
+semantic_layer = { path = "../semantic_layer" }
 
 # Add any handler-specific dependencies here
 dashmap = "5.5.3"
diff --git a/api/libs/handlers/src/datasets/deploy.rs b/api/libs/handlers/src/datasets/deploy.rs
index b94fe6529..df109ca64 100644
--- a/api/libs/handlers/src/datasets/deploy.rs
+++ b/api/libs/handlers/src/datasets/deploy.rs
@@ -1,24 +1,30 @@
-// This file will contain the new dataset deployment handler logic. 
+// This file will contain the new dataset deployment handler logic.
 use anyhow::Result;
 use chrono::{DateTime, Utc};
 use diesel::{upsert::excluded, ExpressionMethods, QueryDsl};
 use diesel_async::RunQueryDsl;
 use serde::Deserialize; // Added for DeployDatasetsRequest if it's moved or redefined here
+use serde_yaml; // Added for model deserialization
 use std::collections::{HashMap, HashSet};
-use uuid::Uuid;
+use tokio::spawn; // Added for concurrent job execution
 use tracing::{error, info, warn};
+use uuid::Uuid;
 
 // Types from this crate's parent (handlers) -> Corrected to super
-use super::types::{ValidationError, ValidationResult, DeployDatasetsRequest}; // Added DeployDatasetsRequest
+use super::types::{DeployDatasetsRequest, ValidationError, ValidationResult}; // Added DeployDatasetsRequest
 
 // Corrected to use the `database` crate directly as per Cargo.toml
-use database::{{
+use database::{
     enums::DatasetType,
-    models::{{DataSource, Dataset}},
+    models::{DataSource, Dataset},
     pool::get_pg_pool,
-    schema::{{data_sources, datasets}},
-}};
+    schema::{data_sources, datasets},
+};
+
+// Add imports for semantic_layer and stored_values
+use semantic_layer::models::Model as SemanticModel; // Using alias
+use stored_values::jobs as stored_values_jobs; // Using module alias for clarity
 
 // TODO: Define or import necessary structs like DeployDatasetsRequest, DataSource, Dataset, etc.
 // For now, let's assume DeployDatasetsRequest will be passed in.
 
@@ -32,7 +38,7 @@ pub async fn deploy_datasets_handler_core(
     user_id: &Uuid,
     organization_id: Uuid, // Pass organization_id directly
     requests: Vec<DeployDatasetsRequest>, // This now uses the imported DeployDatasetsRequest
-    // conn: &mut AsyncPgConnection, // Or get a connection from a pool passed in/accessible globally 
+    // conn: &mut AsyncPgConnection, // Or get a connection from a pool passed in/accessible globally
 ) -> Result<Vec<ValidationResult>> {
     // Temporary: Get a connection. This needs to be replaced with proper DB connection management.
     let mut conn = database::pool::get_pg_pool().get().await?; // This path is incorrect from here
@@ -121,9 +127,11 @@ pub async fn deploy_datasets_handler_core(
             }
         };
 
-        let mut datasets_to_upsert_map: HashMap<(String, Uuid), database::models::Dataset> = HashMap::new(); // Incorrect path
+        let mut datasets_to_upsert_map: HashMap<(String, Uuid), database::models::Dataset> =
+            HashMap::new(); // Incorrect path
 
-        for req in group.clone() { // group is Vec<&DeployDatasetsRequest>
+        for req in group.clone() {
+            // group is Vec<&DeployDatasetsRequest>
             // Collect names of datasets intended for deployment in this group for this data_source.id
             deployed_datasets_by_data_source
                 .entry(data_source.id)
@@ -136,22 +144,31 @@ pub async fn deploy_datasets_handler_core(
                 req.schema.clone(),
             );
             // Basic validation, e.g., if model is required
-            if req.model.is_none() { // Example validation: model is required
-                validation.add_error(ValidationError::internal_error("Field 'model' is required.".to_string()));
+            if req.model.is_none() {
+                // Example validation: model is required
+                validation.add_error(ValidationError::internal_error(
+                    "Field 'model' is required.".to_string(),
+                ));
                 results.push(validation);
                 continue; // Skip this request
             }
 
             validation.success = true; // Assume success initially, will be overridden by upsert errors
             results.push(validation);
 
-            let now = Utc::now();
-            let dataset_id = existing_dataset_ids.get(&req.name).copied().unwrap_or_else(|| req.id.unwrap_or_else(Uuid::new_v4));
+            let dataset_id = existing_dataset_ids
+                .get(&req.name)
+                .copied()
+                .unwrap_or_else(|| req.id.unwrap_or_else(Uuid::new_v4));
 
             // Use req.database as a fallback for database_identifier
-            let final_database_identifier = req.database_identifier.clone().or_else(|| req.database.clone());
+            let final_database_identifier = req
+                .database_identifier
+                .clone()
+                .or_else(|| req.database.clone());
 
-            let dataset = database::models::Dataset { // Incorrect path
+            let dataset = database::models::Dataset {
+                // Incorrect path
                 id: dataset_id,
                 name: req.name.clone(),
                 data_source_id: data_source.id,
@@ -163,7 +180,7 @@ pub async fn deploy_datasets_handler_core(
                 type_: database::enums::DatasetType::View, // Incorrect path
                 definition: req.sql_definition.clone().unwrap_or_default(), // Still keeping SQL definition
                 schema: req.schema.clone(),
-                enabled: true, // By default, a deployed dataset is enabled 
+                enabled: true, // By default, a deployed dataset is enabled
                 created_by: *user_id, // This will be handled by DB default or on_conflict for new records
                 updated_by: *user_id,
                 deleted_at: None, // Explicitly mark as not deleted
@@ -176,24 +193,35 @@ pub async fn deploy_datasets_handler_core(
             datasets_to_upsert_map.insert((req.name.clone(), data_source.id), dataset);
         }
 
-        let datasets_to_upsert: Vec<database::models::Dataset> = datasets_to_upsert_map.into_values().collect(); // Incorrect path
+        let datasets_to_upsert: Vec<database::models::Dataset> =
+            datasets_to_upsert_map.into_values().collect(); // Incorrect path
 
         if !datasets_to_upsert.is_empty() {
            let now = Utc::now();
            match diesel::insert_into(database::schema::datasets::table) // Incorrect path
                .values(&datasets_to_upsert)
-                .on_conflict((database::schema::datasets::database_name, database::schema::datasets::data_source_id)) // Incorrect path
+                .on_conflict((
+                    database::schema::datasets::database_name,
+                    database::schema::datasets::data_source_id,
+                )) // Incorrect path
                .do_update()
                .set((
                    database::schema::datasets::name.eq(excluded(database::schema::datasets::name)),
                    database::schema::datasets::updated_at.eq(now),
-                    database::schema::datasets::updated_by.eq(excluded(database::schema::datasets::updated_by)),
-                    database::schema::datasets::definition.eq(excluded(database::schema::datasets::definition)),
-                    database::schema::datasets::when_to_use.eq(excluded(database::schema::datasets::when_to_use)),
-                    database::schema::datasets::model.eq(excluded(database::schema::datasets::model)),
-                    database::schema::datasets::yml_file.eq(excluded(database::schema::datasets::yml_file)),
-                    database::schema::datasets::schema.eq(excluded(database::schema::datasets::schema)),
-                    database::schema::datasets::database_identifier.eq(excluded(database::schema::datasets::database_identifier)),
+                    database::schema::datasets::updated_by
+                        .eq(excluded(database::schema::datasets::updated_by)),
+                    database::schema::datasets::definition
+                        .eq(excluded(database::schema::datasets::definition)),
+                    database::schema::datasets::when_to_use
+                        .eq(excluded(database::schema::datasets::when_to_use)),
+                    database::schema::datasets::model
+                        .eq(excluded(database::schema::datasets::model)),
+                    database::schema::datasets::yml_file
+                        .eq(excluded(database::schema::datasets::yml_file)),
+                    database::schema::datasets::schema
+                        .eq(excluded(database::schema::datasets::schema)),
+                    database::schema::datasets::database_identifier
+                        .eq(excluded(database::schema::datasets::database_identifier)),
                    database::schema::datasets::enabled.eq(true), // Directly set to true on upsert
                    database::schema::datasets::deleted_at.eq(None as Option<DateTime<Utc>>), // Explicitly ensure it's not deleted
                ))
@@ -201,24 +229,136 @@ pub async fn deploy_datasets_handler_core(
                .await
            {
                Ok(num_upserted) => {
-                    info!("Successfully upserted {} datasets for data source '{}'", num_upserted, data_source_name);
+                    info!(
+                        "Successfully upserted {} datasets for data source '{}'",
+                        num_upserted, data_source_name
+                    );
                    // Success is already marked for validation results, no change needed here unless specific counts matter.
                }
                Err(e) => {
-                    error!("Failed to bulk upsert datasets for data_source_id '{}': {}", data_source.id, e);
+                    error!(
+                        "Failed to bulk upsert datasets for data_source_id '{}': {}",
+                        data_source.id, e
+                    );
                    // Mark all results for this group's successfully mapped datasets as failed
-                    for dataset_to_upsert in datasets_to_upsert {
-                        if let Some(validation_result) = results.iter_mut().find(|r| r.model_name == dataset_to_upsert.name && r.data_source_name == data_source_name && r.schema == dataset_to_upsert.schema) {
+                    for dataset_to_upsert in &datasets_to_upsert {
+                        if let Some(validation_result) = results.iter_mut().find(|r| {
+                            r.model_name == dataset_to_upsert.name
+                                && r.data_source_name == data_source_name
+                                && r.schema == dataset_to_upsert.schema
+                        }) {
                            validation_result.success = false; // Mark as false explicitly
-                            validation_result.add_error(ValidationError::internal_error(format!("Failed to upsert dataset: {}", e)));
+                            validation_result.add_error(ValidationError::internal_error(format!(
+                                "Failed to upsert dataset: {}",
+                                e
+                            )));
                        }
                    }
                }
            };
            // Column processing is skipped as per requirements.
-            // Stored values sync job logic is also skipped as it depends on columns.
+
+            // ---- START: New logic for stored values jobs ----
+            info!(
+                data_source_id = %data_source.id,
+                "Processing datasets for potential stored value sync jobs."
+            );
+            for dataset_for_jobs in &datasets_to_upsert {
+                if let Some(yml_file_content) = &dataset_for_jobs.yml_file {
+                    match serde_yaml::from_str::<SemanticModel>(yml_file_content) {
+                        Ok(model_data) => {
+                            let job_database_name = match dataset_for_jobs
+                                .database_identifier
+                                .as_ref()
+                            {
+                                Some(db_id) => db_id.clone(),
+                                None => {
+                                    warn!(
+                                        dataset_name = %dataset_for_jobs.name,
+                                        data_source_id = %dataset_for_jobs.data_source_id,
+                                        model_name = %model_data.name,
+                                        "Skipping stored values job creation for model dimensions: dataset.database_identifier is None."
+                                    );
+                                    continue; // Skip this model's dimensions processing
+                                }
+                            };
+
+                            let job_schema_name = dataset_for_jobs.schema.clone();
+
+                            for dimension in model_data.dimensions {
+                                if dimension.searchable {
+                                    info!(
+                                        "Found searchable dimension '{}' in model '{}' for dataset '{}' (data_source_id: {})",
+                                        dimension.name, model_data.name, dataset_for_jobs.name, dataset_for_jobs.data_source_id
+                                    );
+
+                                    let job_data_source_id = dataset_for_jobs.data_source_id;
+                                    let current_job_database_name = job_database_name.clone();
+                                    let current_job_schema_name = job_schema_name.clone();
+                                    let job_table_name = dataset_for_jobs.name.clone();
+                                    let job_column_name = dimension.name.clone();
+
+                                    spawn(async move {
+                                        info!(
+                                            data_source_id = %job_data_source_id,
+                                            database_name = %current_job_database_name,
+                                            schema_name = %current_job_schema_name,
+                                            table_name = %job_table_name,
+                                            column_name = %job_column_name,
+                                            "Setting up and running stored values sync job for searchable dimension."
+                                        );
+
+                                        if let Err(e) = stored_values_jobs::setup_sync_job(
+                                            job_data_source_id,
+                                            current_job_database_name.clone(),
+                                            current_job_schema_name.clone(),
+                                            job_table_name.clone(),
+                                            job_column_name.clone(),
+                                        )
+                                        .await
+                                        {
+                                            error!(
+                                                "Failed to setup stored values sync job for {}.{}.{}.{} on data_source {}: {}",
+                                                current_job_database_name, current_job_schema_name, job_table_name, job_column_name, job_data_source_id, e
+                                            );
+                                            return;
+                                        }
+
+                                        match stored_values_jobs::sync_distinct_values_chunk(
+                                            job_data_source_id,
+                                            current_job_database_name,
+                                            current_job_schema_name,
+                                            job_table_name,
+                                            job_column_name,
+                                        ).await {
+                                            Ok(count) => info!(
+                                                "Successfully synced {} distinct values for searchable dimension '{}' (data_source_id: {}).",
+                                                count, dimension.name, job_data_source_id
+                                            ),
+                                            Err(e) => error!(
+                                                "Failed to sync distinct values for searchable dimension '{}' (data_source_id: {}): {}",
+                                                dimension.name, job_data_source_id, e
+                                            ),
+                                        }
+                                    });
+                                }
+                            }
+                        }
+                        Err(e) => {
+                            error!(
+                                "Failed to deserialize YML content for dataset '{}' (data_source_id: {}): {}. Skipping sync job creation for its dimensions.",
+                                dataset_for_jobs.name, dataset_for_jobs.data_source_id, e
+                            );
+                        }
+                    }
+                }
+            }
+            // ---- END: New logic for stored values jobs ----
        } else {
-            info!("No datasets to upsert for data source '{}'", data_source_name);
+            info!(
+                "No datasets to upsert for data source '{}'",
+                data_source_name
+            );
        }
    }
 
@@ -234,7 +374,10 @@ pub async fn deploy_datasets_handler_core(
        let active_datasets_in_db: Vec<(Uuid, String)> = match database::schema::datasets::table
            .filter(database::schema::datasets::data_source_id.eq(data_source_id_to_clean))
            .filter(database::schema::datasets::deleted_at.is_null())
-            .select((database::schema::datasets::id, database::schema::datasets::database_name))
+            .select((
+                database::schema::datasets::id,
+                database::schema::datasets::database_name,
+            ))
            .load::<(Uuid, String)>(&mut conn)
            .await
        {
@@ -245,7 +388,7 @@ pub async fn deploy_datasets_handler_core(
                    data_source_id_to_clean, e
                );
                // Optionally, add a non-model-specific error to `results` or a general operational warning.
-                continue; 
+                continue;
            }
        };
 
@@ -263,15 +406,18 @@ pub async fn deploy_datasets_handler_core(
                data_source_id_to_clean
            );
            let now = Utc::now();
-            match diesel::update(database::schema::datasets::table.filter(database::schema::datasets::id.eq_any(&dataset_ids_to_soft_delete)))
-                .set((
-                    database::schema::datasets::deleted_at.eq(now),
-                    database::schema::datasets::updated_at.eq(now),
-                    database::schema::datasets::updated_by.eq(*user_id),
-                    database::schema::datasets::enabled.eq(false),
-                ))
-                .execute(&mut conn)
-                .await
+            match diesel::update(
+                database::schema::datasets::table
+                    .filter(database::schema::datasets::id.eq_any(&dataset_ids_to_soft_delete)),
+            )
+            .set((
+                database::schema::datasets::deleted_at.eq(now),
+                database::schema::datasets::updated_at.eq(now),
+                database::schema::datasets::updated_by.eq(*user_id),
+                database::schema::datasets::enabled.eq(false),
+            ))
+            .execute(&mut conn)
+            .await
            {
                Ok(num_deleted) => {
                    info!(
@@ -310,4 +456,4 @@ pub async fn deploy_datasets_handler_core(
 // or these types need to be passed in or accessed via a shared context/crate.
 // For example, `database::pool::get_pg_pool()` would need to be changed to something like `shared_db::pool::get()`.
 // Similarly for models like `database::models::Dataset` -> `shared_db::models::Dataset`.
-// And schema `database::schema::datasets::table` -> `shared_db::schema::datasets::table`.
\ No newline at end of file
+// And schema `database::schema::datasets::table` -> `shared_db::schema::datasets::table`.