From 59049b5604efb6271f75a13520fce3f58a2a01f1 Mon Sep 17 00:00:00 2001 From: dal Date: Tue, 4 Feb 2025 11:30:45 -0800 Subject: [PATCH] refactor(stored_values): improve background processing and error handling for stored column values (#85) - Refactor stored values processing in dataset deployment to use background task - Add `StoredValueColumn` struct to encapsulate column processing details - Implement `process_stored_values_background` for parallel and resilient value storage - Add logging for successful and failed stored value processing - Update CLI to handle optional SQL definitions and improve file processing --- .../rest/routes/datasets/deploy_datasets.rs | 62 +++++---- api/src/utils/stored_values/mod.rs | 40 ++++++ cli/.gitignore | 4 +- cli/src/commands/deploy.rs | 2 +- cli/src/utils/file/model_files.rs | 128 ++++++++++++------ 5 files changed, 161 insertions(+), 75 deletions(-) diff --git a/api/src/routes/rest/routes/datasets/deploy_datasets.rs b/api/src/routes/rest/routes/datasets/deploy_datasets.rs index 4b834c422..d5d3ab64a 100644 --- a/api/src/routes/rest/routes/datasets/deploy_datasets.rs +++ b/api/src/routes/rest/routes/datasets/deploy_datasets.rs @@ -23,6 +23,7 @@ use crate::{ write_query_engine::write_query_engine, }, security::checks::is_user_workspace_admin_or_data_admin, user::user_info::get_user_organization_id }, + utils::stored_values::{store_column_values, process_stored_values_background, StoredValueColumn}, }; #[derive(Debug, Deserialize)] @@ -558,17 +559,23 @@ async fn deploy_datasets_handler( } // Find the foreign dataset by the relationship name - let foreign_dataset = inserted_datasets + if let Some(foreign_dataset) = inserted_datasets .iter() .find(|d| d.name == rel.name) - .ok_or(anyhow!("Foreign dataset not found for relationship"))?; - - entity_relationships_to_upsert.push(EntityRelationship { - primary_dataset_id: current_dataset.id, - foreign_dataset_id: foreign_dataset.id, - relationship_type: rel.type_.clone(), - created_at: 
Utc::now(), - }); + { + entity_relationships_to_upsert.push(EntityRelationship { + primary_dataset_id: current_dataset.id, + foreign_dataset_id: foreign_dataset.id, + relationship_type: rel.type_.clone(), + created_at: Utc::now(), + }); + } else { + tracing::warn!( + "Skipping relationship: foreign dataset '{}' not found for dataset '{}'", + rel.name, + current_dataset.name + ); + } } } } @@ -616,6 +623,7 @@ async fn deploy_datasets_handler( .ok_or(anyhow!("Dataset not found"))?; // Process columns that have stored_values enabled + let mut stored_value_columns = Vec::new(); for col in &req.columns { if col.stored_values { // Find the column ID from inserted_columns @@ -624,28 +632,24 @@ async fn deploy_datasets_handler( .find(|c| c.dataset_id == dataset.id && c.name == col.name) .ok_or(anyhow!("Column not found"))?; - match crate::utils::stored_values::store_column_values( - &organization_id, - &dataset.id, - &col.name, - &column.id, // Pass the column ID - &data_source.id, - &dataset.schema, - &dataset.database_name, - ).await { - Ok(_) => (), - Err(e) => { - tracing::error!( - "Error storing values for column {}: {:?}", - col.name, - e - ); - // Continue with other columns even if one fails - continue; - } - } + stored_value_columns.push(StoredValueColumn { + organization_id: organization_id.clone(), + dataset_id: dataset.id, + column_name: col.name.clone(), + column_id: column.id, + data_source_id: data_source.id, + schema: dataset.schema.clone(), + table_name: dataset.database_name.clone(), + }); } } + + // Spawn background task to process stored values + if !stored_value_columns.is_empty() { + tokio::spawn(async move { + process_stored_values_background(stored_value_columns).await; + }); + } } if is_simple { diff --git a/api/src/utils/stored_values/mod.rs b/api/src/utils/stored_values/mod.rs index b4e53014b..19c776ac5 100644 --- a/api/src/utils/stored_values/mod.rs +++ b/api/src/utils/stored_values/mod.rs @@ -194,4 +194,44 @@ pub async fn search_stored_values( 
.await?;

    Ok(results.into_iter().map(|r| (r.value, r.column_name, r.column_id)).collect())
+}
+
+pub struct StoredValueColumn {
+    pub organization_id: Uuid,
+    pub dataset_id: Uuid,
+    pub column_name: String,
+    pub column_id: Uuid,
+    pub data_source_id: Uuid,
+    pub schema: String,
+    pub table_name: String,
+}
+
+pub async fn process_stored_values_background(columns: Vec<StoredValueColumn>) {
+    for column in columns {
+        match store_column_values(
+            &column.organization_id,
+            &column.dataset_id,
+            &column.column_name,
+            &column.column_id,
+            &column.data_source_id,
+            &column.schema,
+            &column.table_name,
+        ).await {
+            Ok(_) => {
+                tracing::info!(
+                    "Successfully processed stored values for column '{}' in dataset '{}'",
+                    column.column_name,
+                    column.table_name
+                );
+            }
+            Err(e) => {
+                tracing::error!(
+                    "Failed to process stored values for column '{}' in dataset '{}': {:?}",
+                    column.column_name,
+                    column.table_name,
+                    e
+                );
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/cli/.gitignore b/cli/.gitignore
index a9073a566..3be7153a3 100644
--- a/cli/.gitignore
+++ b/cli/.gitignore
@@ -14,4 +14,6 @@ Cargo.lock
 *.pdb
 
 Makefile
-.vscode/
\ No newline at end of file
+.vscode/
+
+/prd
\ No newline at end of file
diff --git a/cli/src/commands/deploy.rs b/cli/src/commands/deploy.rs
index 08e137f51..065a1ccc2 100644
--- a/cli/src/commands/deploy.rs
+++ b/cli/src/commands/deploy.rs
@@ -93,7 +93,7 @@ pub async fn deploy(
             processed_files.insert(
                 model.name.clone(),
                 format!("SQL: {} bytes, YML: {} bytes",
-                    obj.sql_definition.len(),
+                    obj.sql_definition.as_ref().map_or(0, |s| s.len()),
                     obj.yml_content.len()
                 ),
             );
diff --git a/cli/src/utils/file/model_files.rs b/cli/src/utils/file/model_files.rs
index 74ff4def0..00be3672c 100644
--- a/cli/src/utils/file/model_files.rs
+++ b/cli/src/utils/file/model_files.rs
@@ -15,7 +15,7 @@ use super::{
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct BusterModelObject {
-    pub sql_definition: String,
+    pub sql_definition: Option<String>,
     pub model_file: BusterModel,
    pub yml_content: String,
 }
 
@@ -32,8 +32,11 @@ pub struct Model {
     pub description: String,
     pub model: Option<String>,
     pub schema: Option<String>,
+    #[serde(default)]
     pub entities: Vec<Entity>,
+    #[serde(default)]
     pub dimensions: Vec<Dimension>,
+    #[serde(default)]
     pub measures: Vec<Measure>,
 }
 
@@ -52,6 +55,8 @@ pub struct Dimension {
     #[serde(rename = "type")]
     pub dimension_type: String,
     pub description: String,
+    #[serde(default = "bool::default")]
+    pub stored_values: bool,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -65,21 +70,93 @@ pub struct Measure {
 
 pub async fn get_model_files(dir_path: Option<&str>) -> Result<Vec<BusterModelObject>> {
     let mut model_objects = Vec::new();
     let path = match dir_path {
-        Some(p) => std::path::Path::new("models").join(p),
+        Some(p) => std::path::PathBuf::from(p),
         None => std::path::Path::new("models").to_path_buf(),
     };
 
     if !path.exists() {
-        return Err(anyhow::anyhow!("Directory not found: {}", path.display()));
+        return Err(anyhow::anyhow!("Path not found: {}", path.display()));
+    }
+
+    if path.is_file() {
+        if let Some(ext) = path.extension() {
+            if ext == "yml" {
+                process_yml_file(&path, &mut model_objects, dir_path.is_some()).await?;
+            } else {
+                return Err(anyhow::anyhow!("File must be a YML file: {}", path.display()));
+            }
+        } else {
+            return Err(anyhow::anyhow!("File must be a YML file: {}", path.display()));
+        }
+    } else {
+        process_directory(&path, &mut model_objects, dir_path.is_some()).await?;
     }
 
-    process_directory(&path, &mut model_objects).await?;
     Ok(model_objects)
 }
 
+async fn process_yml_file(
+    path: &std::path::Path,
+    model_objects: &mut Vec<BusterModelObject>,
+    is_custom_path: bool,
+) -> Result<()> {
+    println!("📄 Processing YAML file: {}", path.display());
+
+    let yaml_content = match fs::read_to_string(path).await {
+        Ok(content) => content,
+        Err(e) => {
+            println!("❌ Failed to read YAML file {}: {}", path.display(), e);
+            return Ok(());
+        }
+    };
+
+    let model: BusterModel = match serde_yaml::from_str(&yaml_content) {
+        Ok(model) => model,
+        Err(e) => {
+            println!("⚠️ Skipping invalid YAML file {}: {}", path.display(), e);
+            return Ok(());
+        }
+    };
+
+    if is_custom_path {
+        // In custom path mode, we don't need SQL files
+        model_objects.push(BusterModelObject {
+            sql_definition: None,
+            model_file: model,
+            yml_content: yaml_content,
+        });
+        println!("✅ Successfully processed YML file: {}", path.display());
+    } else {
+        // In default mode, we require SQL files
+        let sql_path = path.with_extension("sql");
+        if !sql_path.exists() {
+            println!("⚠️ Skipping {} - No matching SQL file found at {}", path.display(), sql_path.display());
+            return Ok(());
+        }
+
+        let sql_definition = match tokio::fs::read_to_string(&sql_path).await {
+            Ok(content) => content,
+            Err(e) => {
+                println!("❌ Failed to read SQL file {}: {}", sql_path.display(), e);
+                return Ok(());
+            }
+        };
+
+        model_objects.push(BusterModelObject {
+            sql_definition: Some(sql_definition),
+            model_file: model,
+            yml_content: yaml_content,
+        });
+        println!("✅ Successfully processed model file: {}", path.display());
+    }
+
+    Ok(())
+}
+
 async fn process_directory(
     dir_path: &std::path::Path,
     model_objects: &mut Vec<BusterModelObject>,
+    is_custom_path: bool,
 ) -> Result<()> {
     println!("📂 Processing directory: {}", dir_path.display());
     let mut dir = tokio::fs::read_dir(dir_path).await?;
@@ -88,50 +165,13 @@
         let path = entry.path();
 
         if path.is_dir() {
-            Box::pin(process_directory(&path, model_objects)).await?;
+            Box::pin(process_directory(&path, model_objects, is_custom_path)).await?;
             continue;
         }
 
         if let Some(ext) = path.extension() {
             if ext == "yml" {
-                println!("📄 Found YAML file: {}", path.display());
-                let sql_path = path.with_extension("sql");
-
-                if !sql_path.exists() {
-                    println!("⚠️ Skipping {} - No matching SQL file found at {}", path.display(), sql_path.display());
-                    continue;
-                }
-
-                let sql_definition = match tokio::fs::read_to_string(&sql_path).await {
-                    Ok(content) => content,
-                    Err(e) => {
-                        println!("❌ Failed to read SQL file {}: {}", sql_path.display(), e);
-                        continue;
-                    }
-                };
-
- let yaml_content = match fs::read_to_string(&path).await { - Ok(content) => content, - Err(e) => { - println!("❌ Failed to read YAML file {}: {}", path.display(), e); - continue; - } - }; - - let model: BusterModel = match serde_yaml::from_str(&yaml_content) { - Ok(model) => model, - Err(e) => { - println!("❌ Failed to parse YAML file {}: {}", path.display(), e); - continue; - } - }; - - println!("✅ Successfully processed model file: {}", path.display()); - model_objects.push(BusterModelObject { - sql_definition, - model_file: model, - yml_content: yaml_content, - }); + process_yml_file(&path, model_objects, is_custom_path).await?; } } } @@ -206,7 +246,7 @@ pub async fn upload_model_files( model: semantic_model.model, schema: schema_name.clone(), description: semantic_model.description, - sql_definition: Some(model.sql_definition.clone()), + sql_definition: model.sql_definition.clone(), entity_relationships: Some(entity_relationships), columns, yml_file: Some(model.yml_content.clone()),