From 59049b5604efb6271f75a13520fce3f58a2a01f1 Mon Sep 17 00:00:00 2001 From: dal Date: Tue, 4 Feb 2025 11:30:45 -0800 Subject: [PATCH 1/3] refactor(stored_values): improve background processing and error handling for stored column values (#85) - Refactor stored values processing in dataset deployment to use background task - Add `StoredValueColumn` struct to encapsulate column processing details - Implement `process_stored_values_background` for parallel and resilient value storage - Add logging for successful and failed stored value processing - Update CLI to handle optional SQL definitions and improve file processing --- .../rest/routes/datasets/deploy_datasets.rs | 62 +++++---- api/src/utils/stored_values/mod.rs | 40 ++++++ cli/.gitignore | 4 +- cli/src/commands/deploy.rs | 2 +- cli/src/utils/file/model_files.rs | 128 ++++++++++++------ 5 files changed, 161 insertions(+), 75 deletions(-) diff --git a/api/src/routes/rest/routes/datasets/deploy_datasets.rs b/api/src/routes/rest/routes/datasets/deploy_datasets.rs index 4b834c422..d5d3ab64a 100644 --- a/api/src/routes/rest/routes/datasets/deploy_datasets.rs +++ b/api/src/routes/rest/routes/datasets/deploy_datasets.rs @@ -23,6 +23,7 @@ use crate::{ write_query_engine::write_query_engine, }, security::checks::is_user_workspace_admin_or_data_admin, user::user_info::get_user_organization_id }, + utils::stored_values::{store_column_values, process_stored_values_background, StoredValueColumn}, }; #[derive(Debug, Deserialize)] @@ -558,17 +559,23 @@ async fn deploy_datasets_handler( } // Find the foreign dataset by the relationship name - let foreign_dataset = inserted_datasets + if let Some(foreign_dataset) = inserted_datasets .iter() .find(|d| d.name == rel.name) - .ok_or(anyhow!("Foreign dataset not found for relationship"))?; - - entity_relationships_to_upsert.push(EntityRelationship { - primary_dataset_id: current_dataset.id, - foreign_dataset_id: foreign_dataset.id, - relationship_type: rel.type_.clone(), - created_at: Utc::now(), - }); + { + entity_relationships_to_upsert.push(EntityRelationship { + primary_dataset_id: current_dataset.id, + foreign_dataset_id: foreign_dataset.id, + relationship_type: rel.type_.clone(), + created_at: Utc::now(), + }); + } else { + tracing::warn!( + "Skipping relationship: foreign dataset '{}' not found for dataset '{}'", + rel.name, + current_dataset.name + ); + } } } } @@ -616,6 +623,7 @@ async fn deploy_datasets_handler( .ok_or(anyhow!("Dataset not found"))?; // Process columns that have stored_values enabled + let mut stored_value_columns = Vec::new(); for col in &req.columns { if col.stored_values { // Find the column ID from inserted_columns @@ -624,28 +632,24 @@ async fn deploy_datasets_handler( .find(|c| c.dataset_id == dataset.id && c.name == col.name) .ok_or(anyhow!("Column not found"))?; - match crate::utils::stored_values::store_column_values( - &organization_id, - &dataset.id, - &col.name, - &column.id, // Pass the column ID - &data_source.id, - &dataset.schema, - &dataset.database_name, - ).await { - Ok(_) => (), - Err(e) => { - tracing::error!( - "Error storing values for column {}: {:?}", - col.name, - e - ); - // Continue with other columns even if one fails - continue; - } - } + stored_value_columns.push(StoredValueColumn { + organization_id: organization_id.clone(), + dataset_id: dataset.id, + column_name: col.name.clone(), + column_id: column.id, + data_source_id: data_source.id, + schema: dataset.schema.clone(), + table_name: dataset.database_name.clone(), + }); } } + + // Spawn background task to process stored values + if !stored_value_columns.is_empty() { + tokio::spawn(async move { + process_stored_values_background(stored_value_columns).await; + }); + } } if is_simple { diff --git a/api/src/utils/stored_values/mod.rs b/api/src/utils/stored_values/mod.rs index b4e53014b..19c776ac5 100644 --- a/api/src/utils/stored_values/mod.rs +++ b/api/src/utils/stored_values/mod.rs @@ -194,4 +194,44 @@ pub async fn search_stored_values( .await?; Ok(results.into_iter().map(|r| (r.value, r.column_name, r.column_id)).collect()) +} + +pub struct StoredValueColumn { + pub organization_id: Uuid, + pub dataset_id: Uuid, + pub column_name: String, + pub column_id: Uuid, + pub data_source_id: Uuid, + pub schema: String, + pub table_name: String, +} + +pub async fn process_stored_values_background(columns: Vec) { + for column in columns { + match store_column_values( + &column.organization_id, + &column.dataset_id, + &column.column_name, + &column.column_id, + &column.data_source_id, + &column.schema, + &column.table_name, + ).await { + Ok(_) => { + tracing::info!( + "Successfully processed stored values for column '{}' in dataset '{}'", + column.column_name, + column.table_name + ); + } + Err(e) => { + tracing::error!( + "Failed to process stored values for column '{}' in dataset '{}': {:?}", + column.column_name, + column.table_name, + e + ); + } + } + } } \ No newline at end of file diff --git a/cli/.gitignore b/cli/.gitignore index a9073a566..3be7153a3 100644 --- a/cli/.gitignore +++ b/cli/.gitignore @@ -14,4 +14,6 @@ Cargo.lock *.pdb Makefile -.vscode/ \ No newline at end of file +.vscode/ + +/prd \ No newline at end of file diff --git a/cli/src/commands/deploy.rs b/cli/src/commands/deploy.rs index 08e137f51..065a1ccc2 100644 --- a/cli/src/commands/deploy.rs +++ b/cli/src/commands/deploy.rs @@ -93,7 +93,7 @@ pub async fn deploy( processed_files.insert( model.name.clone(), format!("SQL: {} bytes, YML: {} bytes", - obj.sql_definition.len(), + obj.sql_definition.as_ref().map_or(0, |s| s.len()), obj.yml_content.len() ), ); diff --git a/cli/src/utils/file/model_files.rs b/cli/src/utils/file/model_files.rs index 74ff4def0..00be3672c 100644 --- a/cli/src/utils/file/model_files.rs +++ b/cli/src/utils/file/model_files.rs @@ -15,7 +15,7 @@ use super::{ #[derive(Debug, Serialize, Deserialize)] pub struct BusterModelObject { - pub sql_definition: String, + pub sql_definition: Option, pub model_file: BusterModel, pub yml_content: String, } @@ -32,8 +32,11 @@ pub struct Model { pub description: String, pub model: Option, pub schema: Option, + #[serde(default)] pub entities: Vec, + #[serde(default)] pub dimensions: Vec, + #[serde(default)] pub measures: Vec, } @@ -52,6 +55,8 @@ pub struct Dimension { #[serde(rename = "type")] pub dimension_type: String, pub description: String, + #[serde(default = "bool::default")] + pub stored_values: bool, } #[derive(Debug, Serialize, Deserialize)] @@ -65,21 +70,93 @@ pub struct Measure { pub async fn get_model_files(dir_path: Option<&str>) -> Result> { let mut model_objects = Vec::new(); let path = match dir_path { - Some(p) => std::path::Path::new("models").join(p), + Some(p) => std::path::PathBuf::from(p), None => std::path::Path::new("models").to_path_buf(), }; if !path.exists() { - return Err(anyhow::anyhow!("Directory not found: {}", path.display())); + return Err(anyhow::anyhow!("Path not found: {}", path.display())); + } + + if path.is_file() { + if let Some(ext) = path.extension() { + if ext == "yml" { + process_yml_file(&path, &mut model_objects, dir_path.is_some()).await?; + } else { + return Err(anyhow::anyhow!("File must be a YML file: {}", path.display())); + } + } else { + return Err(anyhow::anyhow!("File must be a YML file: {}", path.display())); + } + } else { + process_directory(&path, &mut model_objects, dir_path.is_some()).await?; } - process_directory(&path, &mut model_objects).await?; Ok(model_objects) } +async fn process_yml_file( + path: &std::path::Path, + model_objects: &mut Vec, + is_custom_path: bool, +) -> Result<()> { + println!("πŸ“„ Processing YAML file: {}", path.display()); + + let yaml_content = match fs::read_to_string(path).await { + Ok(content) => content, + Err(e) => { + println!("❌ Failed to read YAML file {}: {}", path.display(), e); + return Ok(()); + } + }; + + let model: BusterModel = match serde_yaml::from_str(&yaml_content) { + Ok(model) => model, + Err(e) => { + println!("⚠️ Skipping invalid YAML file {}: {}", path.display(), e); + return Ok(()); + } + }; + + if is_custom_path { + // In custom path mode, we don't need SQL files + model_objects.push(BusterModelObject { + sql_definition: None, + model_file: model, + yml_content: yaml_content, + }); + println!("βœ… Successfully processed YML file: {}", path.display()); + } else { + // In default mode, we require SQL files + let sql_path = path.with_extension("sql"); + if !sql_path.exists() { + println!("⚠️ Skipping {} - No matching SQL file found at {}", path.display(), sql_path.display()); + return Ok(()); + } + + let sql_definition = match tokio::fs::read_to_string(&sql_path).await { + Ok(content) => content, + Err(e) => { + println!("❌ Failed to read SQL file {}: {}", sql_path.display(), e); + return Ok(()); + } + }; + + model_objects.push(BusterModelObject { + sql_definition: Some(sql_definition), + model_file: model, + yml_content: yaml_content, + }); + println!("βœ… Successfully processed model file: {}", path.display()); + } + + Ok(()) +} + async fn process_directory( dir_path: &std::path::Path, model_objects: &mut Vec, + is_custom_path: bool, ) -> Result<()> { println!("πŸ“‚ Processing directory: {}", dir_path.display()); let mut dir = tokio::fs::read_dir(dir_path).await?; @@ -88,50 +165,13 @@ async fn process_directory( let path = entry.path(); if path.is_dir() { - Box::pin(process_directory(&path, model_objects)).await?; + Box::pin(process_directory(&path, model_objects, is_custom_path)).await?; continue; } if let Some(ext) = path.extension() { if ext == "yml" { - println!("πŸ“„ Found YAML file: {}", path.display()); - let sql_path = path.with_extension("sql"); - - if !sql_path.exists() { - println!("⚠️ Skipping {} - No matching SQL file found at {}", path.display(), sql_path.display()); - continue; - } - - let sql_definition = match tokio::fs::read_to_string(&sql_path).await { - Ok(content) => content, - Err(e) => { - println!("❌ Failed to read SQL file {}: {}", sql_path.display(), e); - continue; - } - }; - - let yaml_content = match fs::read_to_string(&path).await { - Ok(content) => content, - Err(e) => { - println!("❌ Failed to read YAML file {}: {}", path.display(), e); - continue; - } - }; - - let model: BusterModel = match serde_yaml::from_str(&yaml_content) { - Ok(model) => model, - Err(e) => { - println!("❌ Failed to parse YAML file {}: {}", path.display(), e); - continue; - } - }; - - println!("βœ… Successfully processed model file: {}", path.display()); - model_objects.push(BusterModelObject { - sql_definition, - model_file: model, - yml_content: yaml_content, - }); + process_yml_file(&path, model_objects, is_custom_path).await?; } } } @@ -206,7 +246,7 @@ pub async fn upload_model_files( model: semantic_model.model, schema: schema_name.clone(), description: semantic_model.description, - sql_definition: Some(model.sql_definition.clone()), + sql_definition: model.sql_definition.clone(), entity_relationships: Some(entity_relationships), columns, yml_file: Some(model.yml_content.clone()), From d4825c0ffeb51597b35e137271838549c07967da Mon Sep 17 00:00:00 2001 From: dal Date: Tue, 4 Feb 2025 14:32:23 -0800 Subject: [PATCH 2/3] bugfix(snowflake_query): add data processing helpers for query results (#88) - Introduce helper functions for processing string and JSON values - Implement case-insensitive string and JSON value transformations - Add robust timestamp parsing with error handling - Enhance Snowflake query result processing with consistent data normalization --- .../snowflake_query.rs | 55 +++++++++++++++---- 1 file changed, 43 insertions(+), 12 deletions(-) diff --git a/api/src/utils/query_engine/data_source_query_routes/snowflake_query.rs b/api/src/utils/query_engine/data_source_query_routes/snowflake_query.rs index 51ce89925..fc31cd58f 100644 --- a/api/src/utils/query_engine/data_source_query_routes/snowflake_query.rs +++ b/api/src/utils/query_engine/data_source_query_routes/snowflake_query.rs @@ -11,13 +11,39 @@ use arrow::array::{ use arrow::datatypes::TimeUnit; use anyhow::{anyhow, Error}; -use chrono::{LocalResult, TimeZone, Utc, NaiveTime}; +use chrono::{DateTime, LocalResult, NaiveTime, TimeZone, Utc}; use snowflake_api::SnowflakeApi; use serde_json::Value; use crate::utils::query_engine::data_types::DataType; +// Add helper functions at the top level +fn process_string_value(value: String) -> String { + value.to_lowercase() +} + +fn process_json_value(value: Value) -> Value { + match value { + Value::String(s) => Value::String(s.to_lowercase()), + Value::Array(arr) => Value::Array(arr.into_iter().map(process_json_value).collect()), + Value::Object(map) => { + let new_map = map.into_iter() + .map(|(k, v)| (k.to_lowercase(), process_json_value(v))) + .collect(); + Value::Object(new_map) + } + _ => value, + } +} + +fn parse_snowflake_timestamp(epoch_data: i64, subsec_nanos: u32) -> Result, Error> { + match Utc.timestamp_opt(epoch_data, subsec_nanos) { + LocalResult::Single(dt) => Ok(dt), + _ => Err(anyhow!("Invalid timestamp value")) + } +} + pub async fn snowflake_query( mut snowflake_client: SnowflakeApi, query: String, @@ -100,12 +126,12 @@ pub async fn snowflake_query( arrow::datatypes::DataType::Utf8 => { let array = column.as_any().downcast_ref::().unwrap(); if array.is_null(row_idx) { DataType::Null } - else { DataType::Text(Some(array.value(row_idx).to_string())) } + else { DataType::Text(Some(process_string_value(array.value(row_idx).to_string()))) } } arrow::datatypes::DataType::LargeUtf8 => { let array = column.as_any().downcast_ref::().unwrap(); if array.is_null(row_idx) { DataType::Null } - else { DataType::Text(Some(array.value(row_idx).to_string())) } + else { DataType::Text(Some(process_string_value(array.value(row_idx).to_string()))) } } arrow::datatypes::DataType::Binary => { let array = column.as_any().downcast_ref::().unwrap(); @@ -144,8 +170,9 @@ pub async fn snowflake_query( } arrow::datatypes::DataType::Timestamp(unit, tz) => { let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { + if array.is_null(row_idx) { + DataType::Null + } else { let nanos = array.value(row_idx); let (secs, subsec_nanos) = match unit { TimeUnit::Second => (nanos, 0), @@ -153,12 +180,16 @@ pub async fn snowflake_query( TimeUnit::Microsecond => (nanos / 1_000_000, (nanos % 1_000_000) * 1000), TimeUnit::Nanosecond => (nanos / 1_000_000_000, nanos % 1_000_000_000), }; - match Utc.timestamp_opt(secs as i64, subsec_nanos as u32) { - LocalResult::Single(dt) => match tz { + + match parse_snowflake_timestamp(secs as i64, subsec_nanos as u32) { + Ok(dt) => match tz { Some(_) => DataType::Timestamptz(Some(dt)), None => DataType::Timestamp(Some(dt.naive_utc())), }, - _ => DataType::Null, + Err(e) => { + tracing::error!("Failed to parse timestamp: {}", e); + DataType::Null + } } } } @@ -281,14 +312,14 @@ pub async fn snowflake_query( } else if let Some(num) = values.as_any().downcast_ref::() { Some(Value::Number(num.value(i).into())) } else if let Some(str) = values.as_any().downcast_ref::() { - Some(Value::String(str.value(i).to_string())) + Some(Value::String(process_string_value(str.value(i).to_string()))) } else { None } }) .collect() ); - DataType::Json(Some(json_array)) + DataType::Json(Some(process_json_value(json_array))) } } arrow::datatypes::DataType::Struct(fields) => { @@ -309,7 +340,7 @@ pub async fn snowflake_query( }; map.insert(field_name.to_string(), value); } - DataType::Json(Some(Value::Object(map))) + DataType::Json(Some(process_json_value(Value::Object(map)))) } } arrow::datatypes::DataType::Union(_, _) => { @@ -347,7 +378,7 @@ pub async fn snowflake_query( json_map.insert(key.to_string(), Value::Number(value.into())); } } - DataType::Json(Some(Value::Object(json_map))) + DataType::Json(Some(process_json_value(Value::Object(json_map)))) } } arrow::datatypes::DataType::RunEndEncoded(_, _) => { From d0ff21e10d9e5f1fc8e0bcfb0e7d8b0f7b72322d Mon Sep 17 00:00:00 2001 From: dal Date: Tue, 4 Feb 2025 15:10:54 -0800 Subject: [PATCH 3/3] Merge pull request #90 from buster-so/dal/stored_values_enum_push_to_description MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit feat(stored_values): enhance column value processing with enum detect… --- api/src/utils/stored_values/mod.rs | 43 ++++++++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/api/src/utils/stored_values/mod.rs b/api/src/utils/stored_values/mod.rs index 19c776ac5..da2044e76 100644 --- a/api/src/utils/stored_values/mod.rs +++ b/api/src/utils/stored_values/mod.rs @@ -7,7 +7,8 @@ use chrono::Utc; use diesel::prelude::*; use diesel_async::RunQueryDsl; use uuid::Uuid; -use crate::database::lib::get_pg_pool; +use crate::database::enums::StoredValuesStatus; +use crate::database::{lib::get_pg_pool, schema::dataset_columns}; use crate::utils::clients::ai::embedding_router::embedding_router; use diesel::sql_types::{Text, Uuid as SqlUuid, Array, Float4, Timestamptz, Integer}; @@ -88,6 +89,9 @@ pub async fn store_column_values( // Query distinct values in batches let mut offset = 0; + let mut first_batch = true; + let schema_name = organization_id.to_string().replace("-", "_"); + loop { let query = format!( "SELECT DISTINCT \"{}\" as value @@ -104,6 +108,9 @@ pub async fn store_column_values( Ok(results) => results, Err(e) => { tracing::error!("Error querying stored values: {:?}", e); + if first_batch { + return Err(e); + } vec![] } }; @@ -128,10 +135,41 @@ pub async fn store_column_values( break; } + // If this is the first batch and we have 15 or fewer values, handle as enum + if first_batch && values.len() <= 15 { + // Get current description + let current_description = diesel::sql_query("SELECT description FROM dataset_columns WHERE id = $1") + .bind::(column_id) + .get_result::(&mut conn) + .await + .ok() + .and_then(|row| Some(row.value)); + + // Format new description + let enum_list = format!("Values for this column are: {}", values.join(", ")); + let new_description = match current_description { + Some(desc) if !desc.is_empty() => format!("{}. {}", desc, enum_list), + _ => enum_list, + }; + + // Update column description + diesel::update(dataset_columns::table) + .filter(dataset_columns::id.eq(column_id)) + .set(( + dataset_columns::description.eq(new_description), + dataset_columns::stored_values_status.eq(StoredValuesStatus::Success), + dataset_columns::stored_values_count.eq(values.len() as i64), + dataset_columns::stored_values_last_synced.eq(Utc::now()), + )) + .execute(&mut conn) + .await?; + + return Ok(()); + } + // Create embeddings for the batch let embeddings = create_embeddings_batch(&values).await?; - let schema_name = organization_id.to_string().replace("-", "_"); // Insert values and embeddings for (value, embedding) in values.iter().zip(embeddings.iter()) { let insert_sql = format!( @@ -154,6 +192,7 @@ pub async fn store_column_values( .await?; } + first_batch = false; offset += BATCH_SIZE; }