diff --git a/api/libs/stored_values/src/schema.rs b/api/libs/stored_values/src/schema.rs index 8bb590180..921aecdc4 100644 --- a/api/libs/stored_values/src/schema.rs +++ b/api/libs/stored_values/src/schema.rs @@ -90,65 +90,65 @@ pub async fn create_search_schema(data_source_id: Uuid) -> Result<()> { info!(%schema_name, "Unique index on (value, db, schema, table, column) created successfully"); // 6. Create schema-specific embedding input function - let input_fn_name = format!("embedding_input_{}", schema_name); - let create_input_fn_query = format!(r#" - CREATE OR REPLACE FUNCTION "{}"."{}"(rec "{}"."searchable_column_values") - RETURNS text - LANGUAGE plpgsql - IMMUTABLE - AS $body$ - BEGIN - RETURN rec.value; - END; - $body$; - "#, - schema_name, // Create function within the target schema - input_fn_name, - schema_name // Reference table type within the schema - ); - conn.execute(create_input_fn_query.as_str()) - .await - .with_context(|| format!("Failed to create embedding input function in schema {}", schema_name))?; - info!(%schema_name, function_name=%input_fn_name, "Embedding input function created"); + // let input_fn_name = format!("embedding_input_{}", schema_name); + // let create_input_fn_query = format!(r#" + // CREATE OR REPLACE FUNCTION "{}"."{}"(rec "{}"."searchable_column_values") + // RETURNS text + // LANGUAGE plpgsql + // IMMUTABLE + // AS $body$ + // BEGIN + // RETURN rec.value; + // END; + // $body$; + // "#, + // schema_name, // Create function within the target schema + // input_fn_name, + // schema_name // Reference table type within the schema + // ); + // conn.execute(create_input_fn_query.as_str()) + // .await + // .with_context(|| format!("Failed to create embedding input function in schema {}", schema_name))?; + // info!(%schema_name, function_name=%input_fn_name, "Embedding input function created"); // 7. Create INSERT trigger for embeddings - let insert_trigger_name = format!("embed_values_on_insert_{}", schema_name); - let create_insert_trigger_query = format!(r#" - CREATE OR REPLACE TRIGGER "{}" - AFTER INSERT - ON "{}"."searchable_column_values" - FOR EACH ROW - EXECUTE FUNCTION util.queue_embeddings('{}.{}', 'embedding'); - "#, - insert_trigger_name, - schema_name, // Table schema - schema_name, // Function schema - input_fn_name // Function name - ); - conn.execute(create_insert_trigger_query.as_str()) - .await - .with_context(|| format!("Failed to create insert trigger for embeddings in schema {}", schema_name))?; - info!(%schema_name, trigger_name=%insert_trigger_name, "Insert trigger for embeddings created"); + // let insert_trigger_name = format!("embed_values_on_insert_{}", schema_name); + // let create_insert_trigger_query = format!(r#" + // CREATE OR REPLACE TRIGGER "{}" + // AFTER INSERT + // ON "{}"."searchable_column_values" + // FOR EACH ROW + // EXECUTE FUNCTION util.queue_embeddings('{}.{}', 'embedding'); + // "#, + // insert_trigger_name, + // schema_name, // Table schema + // schema_name, // Function schema + // input_fn_name // Function name + // ); + // conn.execute(create_insert_trigger_query.as_str()) + // .await + // .with_context(|| format!("Failed to create insert trigger for embeddings in schema {}", schema_name))?; + // info!(%schema_name, trigger_name=%insert_trigger_name, "Insert trigger for embeddings created"); // 8. Create UPDATE trigger for embeddings - let update_trigger_name = format!("embed_values_on_update_{}", schema_name); - let create_update_trigger_query = format!(r#" - CREATE OR REPLACE TRIGGER "{}" - AFTER UPDATE OF value -- Only trigger if 'value' changes - ON "{}"."searchable_column_values" - FOR EACH ROW - WHEN (OLD.value IS DISTINCT FROM NEW.value) -- Ensure value actually changed - EXECUTE FUNCTION util.queue_embeddings('{}.{}', 'embedding'); - "#, - update_trigger_name, - schema_name, // Table schema - schema_name, // Function schema - input_fn_name // Function name - ); - conn.execute(create_update_trigger_query.as_str()) - .await - .with_context(|| format!("Failed to create update trigger for embeddings in schema {}", schema_name))?; - info!(%schema_name, trigger_name=%update_trigger_name, "Update trigger for embeddings created"); + // let update_trigger_name = format!("embed_values_on_update_{}", schema_name); + // let create_update_trigger_query = format!(r#" + // CREATE OR REPLACE TRIGGER "{}" + // AFTER UPDATE OF value -- Only trigger if 'value' changes + // ON "{}"."searchable_column_values" + // FOR EACH ROW + // WHEN (OLD.value IS DISTINCT FROM NEW.value) -- Ensure value actually changed + // EXECUTE FUNCTION util.queue_embeddings('{}.{}', 'embedding'); + // "#, + // update_trigger_name, + // schema_name, // Table schema + // schema_name, // Function schema + // input_fn_name // Function name + // ); + // conn.execute(create_update_trigger_query.as_str()) + // .await + // .with_context(|| format!("Failed to create update trigger for embeddings in schema {}", schema_name))?; + // info!(%schema_name, trigger_name=%update_trigger_name, "Update trigger for embeddings created"); Ok(()) } diff --git a/cli/cli/src/commands/init.rs b/cli/cli/src/commands/init.rs index 254032889..5c0aff02c 100644 --- a/cli/cli/src/commands/init.rs +++ b/cli/cli/src/commands/init.rs @@ -47,6 +47,13 @@ impl std::fmt::Display for DatabaseType { // Using shared RedshiftCredentials from query_engine now, no need for local definition +// Helper struct to parse dbt_project.yml +#[derive(Debug, Deserialize)] +struct DbtProject { + #[serde(rename = "model-paths")] + model_paths: Option>, +} + pub async fn init(destination_path: Option<&str>) -> Result<()> { println!("{}", "Initializing Buster...".bold().green()); @@ -1152,12 +1159,55 @@ fn create_buster_config_file( database: &str, schema: Option<&str>, ) -> Result<()> { - // Prompt for model paths (optional) + // --- BEGIN DBT PROJECT DETECTION --- + let mut suggested_model_paths = "".to_string(); // Default to empty string + + // Construct path to dbt_project.yml relative to the buster.yml path + if let Some(parent_dir) = path.parent() { + let dbt_project_path = parent_dir.join("dbt_project.yml"); + if dbt_project_path.exists() && dbt_project_path.is_file() { + match fs::read_to_string(&dbt_project_path) { + Ok(content) => { + match serde_yaml::from_str::(&content) { + Ok(dbt_config) => { + // Use specified model-paths or default ["models"] if present in file + let paths_to_suggest = dbt_config.model_paths.unwrap_or_else(|| vec!["models".to_string()]); + if !paths_to_suggest.is_empty() { + suggested_model_paths = paths_to_suggest.join(","); + println!( + "{}", + format!("Found dbt_project.yml, suggesting model paths: {}", suggested_model_paths.cyan()).dimmed() + ); + } + }, + Err(e) => { + // Log error but don't fail the init process + eprintln!( + "{}", + format!("Warning: Failed to parse {}: {}. Proceeding without suggested model paths.", dbt_project_path.display(), e).yellow() + ); + } + } + }, + Err(e) => { + eprintln!( + "{}", + format!("Warning: Failed to read {}: {}. Proceeding without suggested model paths.", dbt_project_path.display(), e).yellow() + ); + } + } + } + } + // --- END DBT PROJECT DETECTION --- + + + // Prompt for model paths (optional), now with potential initial input let model_paths_input = Text::new( "Enter paths to your SQL models (optional, comma-separated):", ) + .with_default(&suggested_model_paths) // Use with_default instead .with_help_message( - "Leave blank to use current directory, or specify paths like './models,./analytics/models'", + "Leave blank if none, or specify paths like './models,./analytics/models'", ) .prompt()?;