diff --git a/api/libs/rerank/Cargo.toml b/api/libs/rerank/Cargo.toml
new file mode 100644
index 000000000..7476f5f3c
--- /dev/null
+++ b/api/libs/rerank/Cargo.toml
@@ -0,0 +1,9 @@
+[package]
+name = "rerank"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+reqwest = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
diff --git a/api/libs/rerank/src/lib.rs b/api/libs/rerank/src/lib.rs
new file mode 100644
index 000000000..4df2ba84f
--- /dev/null
+++ b/api/libs/rerank/src/lib.rs
@@ -0,0 +1,87 @@
+use reqwest::Client;
+use serde::{Deserialize, Serialize};
+use std::error::Error;
+
+#[derive(Debug)]
+pub enum RerankerType {
+    Cohere,
+    Mxbai,
+    Jina,
+}
+
+pub struct Reranker {
+    reranker_type: RerankerType,
+    api_key: String,
+    base_url: String,
+    model: String,
+    client: Client,
+}
+
+impl Reranker {
+    pub fn new() -> Result<Self, Box<dyn Error>> {
+        let provider = std::env::var("RERANK_PROVIDER")?;
+        let reranker_type = match provider.to_lowercase().as_str() {
+            "cohere" => RerankerType::Cohere,
+            "mxbai" => RerankerType::Mxbai,
+            "jina" => RerankerType::Jina,
+            _ => return Err("Invalid provider specified".into()),
+        };
+        let api_key = std::env::var("RERANK_API_KEY")?;
+        let model = std::env::var("RERANK_MODEL")?;
+        let base_url = match reranker_type {
+            RerankerType::Cohere => "https://api.cohere.com/v2/rerank",
+            RerankerType::Mxbai => "https://api.mixedbread.ai/v1/rerank",
+            RerankerType::Jina => "https://api.jina.ai/v1/rerank",
+        }.to_string();
+        let client = Client::new();
+        Ok(Self {
+            reranker_type,
+            api_key,
+            base_url,
+            model,
+            client,
+        })
+    }
+
+    pub async fn rerank(
+        &self,
+        query: &str,
+        documents: &[&str],
+        top_n: usize,
+    ) -> Result<Vec<RerankResult>, Box<dyn Error>> {
+        let request_body = RerankRequest {
+            query: query.to_string(),
+            documents: documents.iter().map(|s| s.to_string()).collect(),
+            top_n,
+            model: self.model.clone(),
+        };
+        let response = self
+            .client
+            .post(&self.base_url)
+            .header("Authorization", format!("Bearer {}", self.api_key))
+            .json(&request_body)
+            .send()
+            .await?;
+        let response_body: RerankResponse = response.json().await?;
+        Ok(response_body.results)
+    }
+}
+
+#[derive(Serialize)]
+struct RerankRequest {
+    query: String,
+    documents: Vec<String>,
+    top_n: usize,
+    model: String,
+}
+
+#[derive(Deserialize)]
+struct RerankResponse {
+    results: Vec<RerankResult>,
+}
+
+#[derive(Deserialize)]
+pub struct RerankResult {
+    pub index: usize,
+    pub relevance_score: f32,
+}
\ No newline at end of file
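A minimal usage sketch for the new crate — not part of the patch — assuming a Tokio runtime and the three RERANK_* environment variables read by Reranker::new() (the values below are illustrative only):

use rerank::Reranker;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical values; in practice these come from the deployment environment.
    std::env::set_var("RERANK_PROVIDER", "cohere");
    std::env::set_var("RERANK_API_KEY", "my-api-key");
    std::env::set_var("RERANK_MODEL", "rerank-v3.5");

    let reranker = Reranker::new()?;
    let docs = ["revenue by month", "customer churn", "inventory levels"];
    // Keep the two most relevant documents for the query.
    let results = reranker.rerank("monthly revenue", &docs, 2).await?;
    for r in results {
        println!("doc #{} scored {}", r.index, r.relevance_score);
    }
    Ok(())
}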
diff --git a/cli/cli/src/commands/deploy/deploy.rs b/cli/cli/src/commands/deploy/deploy.rs
index 287323413..b6f9a7cc3 100644
--- a/cli/cli/src/commands/deploy/deploy.rs
+++ b/cli/cli/src/commands/deploy/deploy.rs
@@ -128,7 +128,7 @@ impl ProgressTracker for DeployProgress {
 }
 
 /// Parse a YAML model file into semantic_layer::Model structs
-fn parse_model_file(file_path: &Path) -> Result<Vec<Model>> {
+pub fn parse_model_file(file_path: &Path) -> Result<Vec<Model>> {
     let yml_content = std::fs::read_to_string(file_path)?;
 
     // First try parsing as a SemanticLayerSpec (with top-level 'models' key)
@@ -150,7 +150,7 @@ fn parse_model_file(file_path: &Path) -> Result<Vec<Model>> {
 /// 1. Values in the model itself
 /// 2. Values from the project context
 /// 3. Values from the global config
-fn resolve_model_configurations(
+pub fn resolve_model_configurations(
     models_with_context: Vec<(Model, Option<&ProjectContext>)>,
     global_buster_config: &BusterConfig,
 ) -> Result<Vec<Model>> {
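These two helpers become pub so other commands (such as the new parse command added below) can reuse deploy's parsing pipeline. A hypothetical caller, assuming anyhow::Result and only the signatures shown in this hunk:

use crate::commands::deploy::deploy::{parse_model_file, resolve_model_configurations};
use crate::utils::config::BusterConfig;
use std::path::Path;

// Parse a semantic-layer YAML file, then layer in project/global defaults.
fn load_resolved_models(path: &Path, cfg: &BusterConfig) -> anyhow::Result<Vec<semantic_layer::Model>> {
    let models = parse_model_file(path)?;
    // No per-model project context in this sketch, hence `None` for each entry.
    let with_context = models.into_iter().map(|m| (m, None)).collect();
    resolve_model_configurations(with_context, cfg)
}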
diff --git a/cli/cli/src/commands/generate.rs b/cli/cli/src/commands/generate.rs
index 1bdcae6e8..b19dda9e0 100644
--- a/cli/cli/src/commands/generate.rs
+++ b/cli/cli/src/commands/generate.rs
@@ -3,378 +3,282 @@
 use colored::*;
 use std::collections::{HashMap, HashSet};
 use std::fs;
 use std::path::{Path, PathBuf};
-use std::time::Duration;
+// use std::time::Duration; // Duration seems unused here now
 
 use crate::utils::config::BusterConfig;
-use crate::commands::init::{YamlModel, YamlSemanticLayerSpec, YamlDimension, YamlMeasure, is_measure_type};
+use crate::commands::init::{YamlModel, YamlDimension, YamlMeasure}; // is_measure_type is also in init
 
-use dbt_utils::models::{DbtCatalog, DbtNode, DbtColumn, DbtNodeMetadata, DbtCatalogMetadata};
+// Use new struct names from dbt_utils
+use dbt_utils::models::{DbtCatalog, CatalogNode, ColumnMetadata, TableMetadata}; // CatalogMetadata might not be directly used here
 use dbt_utils::{run_dbt_docs_generate, load_and_parse_catalog};
-use indicatif::{ProgressBar, ProgressStyle};
+use indicatif::{ProgressBar, ProgressStyle}; // Keep for progress spinners if any remain or are added
 use inquire::Confirm;
-use glob::{Pattern};
+use glob::{glob, Pattern};
 
 pub async fn generate_semantic_models_command(
     path_arg: Option<String>,
-    target_output_dir_arg: Option<String>,
+    target_output_dir_arg: Option<String>, // This argument determines the *base* output dir if specified
 ) -> Result<()> {
-    println!(
-        "{}",
-        "Starting semantic model generation/update...".bold().blue()
-    );
+    println!("{}", "Starting semantic model generation/update (file-first approach)...".bold().blue());
 
-    // 1. Determine Buster configuration directory (where buster.yml is or should be)
-    // For now, assume current directory. This might need to be more sophisticated if target_output_dir_arg implies a different project.
     let buster_config_dir = std::env::current_dir().context("Failed to get current directory")?;
+    let buster_config = BusterConfig::load_from_dir(&buster_config_dir)?.ok_or_else(|| {
+        anyhow!("buster.yml not found in {}. Please run 'buster init' first.", buster_config_dir.display())
+    })?;
 
-    // 2. Load BusterConfig
-    let buster_config = match BusterConfig::load_from_dir(&buster_config_dir) {
-        Ok(Some(cfg)) => cfg,
-        Ok(None) => {
-            return Err(anyhow!(
-                "buster.yml not found in {}. Please run 'buster init' first or ensure buster.yml exists.",
-                buster_config_dir.display()
-            ));
-        }
-        Err(e) => {
-            return Err(anyhow!("Failed to load buster.yml: {}. Please ensure it is correctly formatted.", e));
-        }
-    };
-
-    // 3. Determine target semantic YAML base directory and generation mode
-    let mut is_side_by_side_generation = false;
-    let effective_semantic_models_base_dir: PathBuf; // Base for path construction
-
-    if let Some(path_str) = target_output_dir_arg {
-        // User specified an output directory via CLI arg. Not side-by-side.
-        effective_semantic_models_base_dir = if Path::new(&path_str).is_absolute() {
-            PathBuf::from(path_str)
-        } else {
-            buster_config_dir.join(path_str)
-        };
-        println!("Target semantic models base directory (from CLI arg): {}", effective_semantic_models_base_dir.display().to_string().cyan());
-        fs::create_dir_all(&effective_semantic_models_base_dir).with_context(|| format!("Failed to create semantic models base directory: {}", effective_semantic_models_base_dir.display()))?;
-    } else {
-        // No CLI arg, check buster.yml config
-        let configured_semantic_paths = buster_config.projects.as_ref()
-            .and_then(|projs| projs.first())
-            .and_then(|proj| proj.semantic_model_paths.as_ref());
-
-        if configured_semantic_paths.map_or(true, |paths| paths.is_empty()) { // Default to side-by-side if None or empty list
-            is_side_by_side_generation = true;
-            effective_semantic_models_base_dir = buster_config_dir.clone(); // Project root is the base for side-by-side
-            println!("Semantic models will be generated side-by-side with SQL models (base: {}).", effective_semantic_models_base_dir.display().to_string().cyan());
-            // No specific single base directory to create for all YAMLs in this mode.
-        } else {
-            // Configured path(s) exist, use the first one. Not side-by-side.
-            let first_path_str = configured_semantic_paths.unwrap().first().unwrap(); // Safe due to map_or and is_empty checks
-            effective_semantic_models_base_dir = if Path::new(first_path_str).is_absolute() {
-                PathBuf::from(first_path_str)
-            } else {
-                buster_config_dir.join(first_path_str)
-            };
-            println!("Target semantic models base directory (from buster.yml): {}", effective_semantic_models_base_dir.display().to_string().cyan());
-            fs::create_dir_all(&effective_semantic_models_base_dir).with_context(|| format!("Failed to create semantic models base directory: {}", effective_semantic_models_base_dir.display()))?;
-        }
-    }
-
-    // 4. Load existing semantic models - THIS LOGIC WILL CHANGE SIGNIFICANTLY.
-    // For now, we clear it as we load 1-to-1.
-    let mut existing_yaml_models_map: HashMap<String, YamlModel> = HashMap::new();
-
-    let initial_model_count = 0; // This will be re-evaluated based on files found
-
-    // 5. Run dbt docs generate (similar to init.rs)
-    let dbt_project_path = &buster_config_dir; // Assuming buster.yml is at the root of dbt project
-    let catalog_json_path = dbt_project_path.join("target").join("catalog.json");
-
+    // --- 1. Load Catalog & Build Lookup Map ---
+    let catalog_json_path = buster_config_dir.join("target").join("catalog.json");
     if Confirm::new("Run 'dbt docs generate' to refresh dbt catalog (catalog.json)?")
         .with_default(true)
         .prompt()?
    {
-        // Use the dbt_utils helper
-        match run_dbt_docs_generate(dbt_project_path).await {
-            Ok(_) => { /* Success is logged by the helper */ }
-            Err(e) => {
-                eprintln!("{}", format!("Failed to run 'dbt docs generate' via dbt_utils: {}. Proceeding with existing catalog.json if available.", e).yellow());
-            }
+        match run_dbt_docs_generate(&buster_config_dir).await {
+            Ok(_) => {},
+            Err(e) => eprintln!("{}", format!("'dbt docs generate' error. Proceeding with existing catalog. Error: {}", e).yellow()),
        }
    } else {
-        println!("{}", "Skipping 'dbt docs generate'. Will look for existing catalog.json.".dimmed());
+        println!("{}", "Skipping 'dbt docs generate'. Using existing catalog.json.".dimmed());
    }
 
-    // Load and parse catalog.json using dbt_utils helper
+    if !catalog_json_path.exists() {
+        eprintln!("{}", format!("✗ catalog.json not found at {}. Cannot generate/update models.", catalog_json_path.display()).red());
+        return Ok(());
+    }
     let dbt_catalog = match load_and_parse_catalog(&catalog_json_path) {
         Ok(catalog) => {
-            println!("{}", "✓ Successfully parsed catalog.json via dbt_utils.".green());
+            println!("{}", "✓ Successfully parsed catalog.json.".green());
             catalog
         }
         Err(e) => {
-            eprintln!("{}", format!("✗ Error loading/parsing catalog.json via dbt_utils: {}. Ensure catalog.json exists and is valid.", e).red());
-            return Err(e.into()); // Propagate error if catalog loading fails
+            eprintln!("{}", format!("✗ Error loading/parsing catalog.json: {}. Cannot generate/update.", e).red());
+            return Ok(());
         }
     };
 
-    // 7. Determine scope & reconcile
-    // This is a placeholder for the detailed reconciliation logic
-    // described in the plan (new models, updated models/columns, deleted models/columns)
-    println!("{}", "Reconciling semantic models with dbt catalog...".yellow());
+    let catalog_nodes_by_name: HashMap<String, &CatalogNode> = dbt_catalog.nodes.values()
+        .filter_map(|node| {
+            if node.derived_resource_type.as_deref() == Some("model") {
+                node.derived_model_name_from_file.as_ref().map(|name| (name.clone(), node))
+            } else { None }
+        })
+        .collect();
 
-    // Collect model_paths from buster_config for scoping (similar to init.rs)
-    let mut configured_model_path_patterns: Vec<Pattern> = Vec::new();
-    if let Some(projects) = &buster_config.projects {
-        for pc in projects {
-            if let Some(model_paths) = &pc.model_paths {
-                for path_str in model_paths {
-                    let pattern_str = buster_config_dir.join(path_str).join("**").join("*.sql").to_string_lossy().into_owned();
-                    match Pattern::new(&pattern_str) {
-                        Ok(p) => configured_model_path_patterns.push(p),
-                        Err(e) => eprintln!("{}", format!("Warning: Invalid glob pattern '{}': {}", pattern_str, e).yellow()),
+    if catalog_nodes_by_name.is_empty() {
+        println!("{}", "No models found in dbt catalog. Nothing to generate/update.".yellow());
+        return Ok(());
+    }
+
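For reference, the lookup built above can be read as the following standalone sketch; the CatalogNode stub reproduces only the two fields the filter relies on (the real struct lives in dbt_utils::models):

use std::collections::HashMap;

// Stand-in for dbt_utils::models::CatalogNode, reduced to the relevant fields.
struct CatalogNode {
    derived_resource_type: Option<String>,
    derived_model_name_from_file: Option<String>,
}

// Index model nodes by their derived name, skipping non-model resources.
fn index_models(nodes: &HashMap<String, CatalogNode>) -> HashMap<String, &CatalogNode> {
    nodes
        .values()
        .filter_map(|node| {
            if node.derived_resource_type.as_deref() == Some("model") {
                node.derived_model_name_from_file
                    .as_ref()
                    .map(|name| (name.clone(), node))
            } else {
                None
            }
        })
        .collect()
}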
+    // --- 2. Determine SQL Files to Process (based on path_arg or buster.yml model_paths) ---
+    let mut sql_files_to_process: HashSet<PathBuf> = HashSet::new();
+    let dbt_project_model_roots_for_stripping = crate::commands::init::parse_dbt_project_file_content(&buster_config_dir)?.as_ref()
+        .map(|c| c.model_paths.iter().map(PathBuf::from).collect::<Vec<PathBuf>>())
+        .unwrap_or_else(|| vec![PathBuf::from("models")]);
+
+    if let Some(pa_str) = &path_arg {
+        let target_path = buster_config_dir.join(pa_str);
+        if target_path.is_file() && target_path.extension().map_or(false, |ext| ext == "sql") {
+            sql_files_to_process.insert(target_path);
+        } else if target_path.is_dir() {
+            let glob_pattern = target_path.join("**/*.sql");
+            match glob(&glob_pattern.to_string_lossy()) {
+                Ok(paths) => paths.for_each(|entry| if let Ok(path) = entry { if path.is_file() { sql_files_to_process.insert(path); } }),
+                Err(e) => eprintln!("{}", format!("Error globbing '{}': {}", glob_pattern.display(), e).yellow()),
+            }
+        } else {
+            eprintln!("{}", format!("Warning: path_arg '{}' is not a valid SQL file or directory. Processing all configured models.", pa_str).yellow());
+            // Fall through to buster.yml model_paths if path_arg is invalid
+        }
+    }
+
+    if sql_files_to_process.is_empty() { // If path_arg didn't yield files, or wasn't provided, use buster.yml config
+        let mut patterns_from_config: Vec<Pattern> = Vec::new();
+        if let Some(projects) = &buster_config.projects {
+            if let Some(first_project) = projects.first() {
+                if let Some(mp_globs) = &first_project.model_paths {
+                    for path_str_glob in mp_globs {
+                        match Pattern::new(&buster_config_dir.join(path_str_glob).to_string_lossy()) {
+                            Ok(p) => patterns_from_config.push(p),
+                            Err(e) => eprintln!("{}", format!("Warning: Invalid glob pattern from buster.yml '{}': {}", path_str_glob, e).yellow()),
+                        }
+                    }
+                }
+            }
+        }
-    }
-    // Also consider top-level model_paths if no projects or for global scope
-    if let Some(model_paths) = &buster_config.model_paths {
-        for path_str in model_paths {
-            let pattern_str = buster_config_dir.join(path_str).join("**").join("*.sql").to_string_lossy().into_owned();
-            match Pattern::new(&pattern_str) {
-                Ok(p) => configured_model_path_patterns.push(p),
-                Err(e) => eprintln!("{}", format!("Warning: Invalid glob pattern '{}': {}", pattern_str, e).yellow()),
+        if patterns_from_config.is_empty() { // Still no patterns, use dbt_project.yml defaults
+            println!("{}", "No specific model paths from path_arg or buster.yml. Using dbt_project.yml model-paths.".dimmed());
+            for dbt_root_rel in &dbt_project_model_roots_for_stripping {
+                let glob_pattern = buster_config_dir.join(dbt_root_rel).join("**/*.sql");
+                match glob(&glob_pattern.to_string_lossy()) {
+                    Ok(paths) => paths.for_each(|entry| if let Ok(p) = entry { if p.is_file() {sql_files_to_process.insert(p);}}),
+                    Err(e) => eprintln!("{}", format!("Error globbing default dbt path '{}': {}",glob_pattern.display(),e).yellow()),
+                }
+            }
+        } else {
+            println!("{}", format!("Scanning for SQL files based on buster.yml model_paths: {:?}", patterns_from_config.iter().map(|p|p.as_str()).collect::<Vec<_>>()).dimmed());
+            for pattern in patterns_from_config {
+                match glob(pattern.as_str()) {
+                    Ok(paths) => paths.for_each(|entry| if let Ok(p) = entry { if p.is_file() && p.extension().map_or(false, |e|e=="sql"){sql_files_to_process.insert(p);}}),
+                    Err(e) => eprintln!("{}",format!("Glob pattern error for '{}': {}",pattern.as_str(),e).yellow()),
+                }
+            }
+        }
+    }
 
-    let mut dbt_models_processed_count = 0;
-    let mut new_models_added_count = 0;
+    if sql_files_to_process.is_empty() {
+        println!("{}", "No SQL model files found to process/update based on configuration.".yellow());
+        return Ok(());
+    }
+    println!("{}", format!("Found {} SQL model file(s) for potential generation/update.", sql_files_to_process.len()).dimmed());
+
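The SQL-file discovery above reduces to a recursive glob per model root; a condensed sketch, assuming the glob crate and a root such as "models":

use glob::glob;
use std::collections::HashSet;
use std::path::PathBuf;

// Expand a model root like "models" into every .sql file beneath it.
fn discover_sql_files(root: &str) -> HashSet<PathBuf> {
    let mut files = HashSet::new();
    let pattern = format!("{}/**/*.sql", root);
    if let Ok(paths) = glob(&pattern) {
        // Skip unreadable entries and anything that isn't a regular file.
        for entry in paths.flatten() {
            if entry.is_file() {
                files.insert(entry);
            }
        }
    }
    files
}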
+    // --- 3. Determine Output Base Directory for Semantic Models ---
+    let semantic_models_base_dir_path_str = target_output_dir_arg.or_else(||
+        buster_config.projects.as_ref()
+            .and_then(|p| p.first())
+            .and_then(|proj| proj.semantic_model_paths.as_ref())
+            .and_then(|paths| paths.first().cloned())
+    ).unwrap_or_else(||
+        // Default to side-by-side (empty string means relative to SQL file later)
+        String::new()
+    );
+
+    let (is_side_by_side_generation, semantic_output_base_abs_dir) = if semantic_models_base_dir_path_str.is_empty() {
+        println!("{}", "Semantic models output set to side-by-side with SQL files.".dimmed());
+        (true, buster_config_dir.clone()) // Base for side-by-side is project root
+    } else {
+        let abs_path = if Path::new(&semantic_models_base_dir_path_str).is_absolute() {
+            PathBuf::from(&semantic_models_base_dir_path_str)
+        } else {
+            buster_config_dir.join(&semantic_models_base_dir_path_str)
+        };
+        println!("{}", format!("Semantic models output base directory: {}", abs_path.display()).cyan());
+        fs::create_dir_all(&abs_path).context(format!("Failed to create semantic models output dir: {}", abs_path.display()))?;
+        (false, abs_path)
+    };
+
+    // --- 4. Iterate SQL Files, Match to Catalog, Generate/Update YamlModels ---
+    let mut models_generated_count = 0;
     let mut models_updated_count = 0;
     let mut columns_added_count = 0;
     let mut columns_updated_count = 0;
     let mut columns_removed_count = 0;
+    let mut sql_models_successfully_processed_from_catalog_count = 0; // New counter
 
-    let mut processed_dbt_model_unique_ids: HashSet<String> = HashSet::new(); // Using unique_id for tracking
-
-    // Get dbt model source roots for path stripping (similar to init.rs)
-    let dbt_project_file_content_for_paths = crate::commands::init::parse_dbt_project_file_content(&buster_config_dir)?;
-    let dbt_model_source_roots: Vec<PathBuf> = dbt_project_file_content_for_paths.as_ref()
-        .map(|content| content.model_paths.iter().map(PathBuf::from).collect())
-        .unwrap_or_else(|| vec![PathBuf::from("models")]);
-
-    for (dbt_node_id, dbt_node) in dbt_catalog.nodes.iter().filter(|(_,n)| {
-        match &n.resource_type {
-            Some(rt) => rt == "model",
-            None => {
-                eprintln!(
-                    "{}",
-                    format!(
-                        "Warning: Skipping dbt node with unique_id: {} because it is missing 'resource_type' in catalog.json.",
-                        n.unique_id
-                    ).yellow()
-                );
-                false
-            }
+    for sql_file_abs_path in sql_files_to_process {
+        let model_name_from_filename = sql_file_abs_path.file_stem().map_or_else(String::new, |s| s.to_string_lossy().into_owned());
+        if model_name_from_filename.is_empty() {
+            eprintln!("{}", format!("Warning: Could not get model name from file {}. Skipping.", sql_file_abs_path.display()).yellow());
+            continue;
        }
-    }) {
-        // Path construction for individual YAML
-        let Some(ref dbt_original_file_path_str) = dbt_node.original_file_path else {
-            eprintln!("{}", format!("Warning: Skipping dbt model {} due to missing 'original_file_path'.", dbt_node.unique_id).yellow());
+
+        let Some(catalog_node) = catalog_nodes_by_name.get(&model_name_from_filename) else {
+            eprintln!("{}", format!("Info: SQL model file '{}' found, but no corresponding entry ('{}') in dbt catalog. Skipping.", sql_file_abs_path.display(), model_name_from_filename).dimmed());
             continue;
         };
-        let dbt_model_path_obj = Path::new(dbt_original_file_path_str);
-        let mut relative_to_dbt_model_root = PathBuf::new();
-        let mut found_base_for_stripping = false;
-        for dbt_source_root in &dbt_model_source_roots { // dbt_source_root is e.g. "models"
"models" - if let Ok(stripped_path) = dbt_model_path_obj.strip_prefix(dbt_source_root) { - relative_to_dbt_model_root = stripped_path.to_path_buf(); // e.g. "marts/sales/revenue.sql" - found_base_for_stripping = true; - break; - } - } - if !found_base_for_stripping { - // Fallback: if original_file_path_str didn't start with any known dbt_model_source_roots, - // then use original_file_path_str as is for the suffix part for dedicated dir mode. - // For side-by-side, the full original path is used anyway. - relative_to_dbt_model_root = dbt_model_path_obj.to_path_buf(); - eprintln!("{}", format!( - "Warning: Could not strip a known dbt model source root ('{:?}') from dbt model path '{}'. Using full path for suffix calculation: '{}'", - dbt_model_source_roots, dbt_original_file_path_str, relative_to_dbt_model_root.display() - ).yellow() - ); - } + let Some(ref table_meta) = catalog_node.metadata else { + eprintln!("{}", format!("Warning: Catalog entry for '{}' (file: {}) is missing metadata. Skipping.", model_name_from_filename, sql_file_abs_path.display()).yellow()); + continue; + }; + // actual_model_name_in_yaml is from catalog metadata.name + let actual_model_name_in_yaml = table_meta.name.clone(); + + println!("Processing: SQL '{}' -> Catalog Model '{}' (UniqueID: {})", + sql_file_abs_path.display().to_string().cyan(), + actual_model_name_in_yaml.purple(), + catalog_node.unique_id.as_deref().unwrap_or("N/A").dimmed() + ); + sql_models_successfully_processed_from_catalog_count += 1; // Increment here - let individual_semantic_yaml_path: PathBuf; - if is_side_by_side_generation { - // Side-by-side: YAML is next to SQL. dbt_original_file_path_str is relative to buster_config_dir. - individual_semantic_yaml_path = buster_config_dir.join(dbt_original_file_path_str).with_extension("yml"); + let relative_sql_path_str = pathdiff::diff_paths(&sql_file_abs_path, &buster_config_dir) + .map(|p| p.to_string_lossy().into_owned()) + .unwrap_or_else(|| sql_file_abs_path.to_string_lossy().into_owned()); + + let individual_semantic_yaml_path: PathBuf = if is_side_by_side_generation { + sql_file_abs_path.with_extension("yml") } else { - // Dedicated output directory (effective_semantic_models_base_dir) - // relative_to_dbt_model_root is the path part after the dbt model source root (e.g. "marts/sales/revenue.sql") - let yaml_filename_with_subdir = relative_to_dbt_model_root.with_extension("yml"); // e.g. 
"marts/sales/revenue.yml" - individual_semantic_yaml_path = effective_semantic_models_base_dir.join(yaml_filename_with_subdir); - } - - processed_dbt_model_unique_ids.insert(dbt_node.unique_id.clone()); // Store unique_id - - // --- Scoping logic (remains similar, but applied before file load) --- - let dbt_original_file_path_abs = buster_config_dir.join(dbt_original_file_path_str); - let is_in_configured_model_paths = configured_model_path_patterns.is_empty() || - configured_model_path_patterns.iter().any(|p| p.matches_path(&dbt_original_file_path_abs)); - - let is_in_path_arg_scope = match &path_arg { - Some(pa_str) => { - let target_path_abs = buster_config_dir.join(pa_str); - if target_path_abs.is_file() { - dbt_original_file_path_abs == target_path_abs - } else { // Assume directory - dbt_original_file_path_abs.starts_with(&target_path_abs) + let mut stripped_suffix_for_yaml: Option = None; + for dbt_root in &dbt_project_model_roots_for_stripping { + let abs_dbt_root = buster_config_dir.join(dbt_root); + if let Ok(stripped) = sql_file_abs_path.strip_prefix(&abs_dbt_root) { + stripped_suffix_for_yaml = Some(stripped.with_extension("yml")); + break; } } - None => true, + let final_suffix = stripped_suffix_for_yaml.unwrap_or_else(|| PathBuf::from(&model_name_from_filename).with_extension("yml")); + semantic_output_base_abs_dir.join(final_suffix) }; + if let Some(p) = individual_semantic_yaml_path.parent() { fs::create_dir_all(p)?; } - if !is_in_configured_model_paths || !is_in_path_arg_scope { - continue; - } + // --- Reconciliation Logic (Create or Update) --- + let existing_yaml_model_opt: Option = if individual_semantic_yaml_path.exists() { + fs::read_to_string(&individual_semantic_yaml_path) + .ok() + .and_then(|content| serde_yaml::from_str::(&content).ok()) + } else { None }; - // Ensure metadata and metadata.name exist - let Some(ref dbt_node_metadata) = dbt_node.metadata else { - eprintln!( - "{}", - format!( - "Warning: Skipping dbt node with unique_id: {} in generate because its 'metadata' block is missing.", - dbt_node.unique_id - ).yellow() - ); - continue; - }; - let Some(ref dbt_model_name_from_metadata) = dbt_node_metadata.name else { - eprintln!( - "{}", - format!( - "Warning: Skipping dbt model with unique_id: {} in generate because its 'metadata.name' is missing.", - dbt_node.unique_id - ).yellow() - ); - continue; - }; - let dbt_model_name_for_yaml = dbt_model_name_from_metadata.clone(); - - dbt_models_processed_count += 1; - // --- End Scoping Logic --- - - let existing_semantic_model_opt: Option = if individual_semantic_yaml_path.exists() { - match fs::read_to_string(&individual_semantic_yaml_path) { - Ok(content) => { - match serde_yaml::from_str::(&content) { - Ok(model) => Some(model), - Err(e) => { - eprintln!("{}", format!("Warning: Failed to parse existing semantic YAML '{}': {}. Will attempt to overwrite.", individual_semantic_yaml_path.display(), e).yellow()); - None - } - } - } - Err(e) => { - eprintln!("{}", format!("Warning: Failed to read existing semantic YAML '{}': {}. 
-                Err(e) => {
-                    eprintln!("{}", format!("Warning: Failed to read existing semantic YAML '{}': {}. Will attempt to create anew.", individual_semantic_yaml_path.display(), e).yellow());
-                    None
-                }
-            }
-        } else {
-            None
-        };
-
-        match existing_semantic_model_opt {
+        match existing_yaml_model_opt {
             Some(mut existing_model) => {
-                // Existing model: Update it
                 let mut model_was_updated = false;
-                println!("Updating existing semantic model: {} at {}", dbt_model_name_for_yaml.cyan(), individual_semantic_yaml_path.display());
-
-                if existing_model.name != dbt_model_name_for_yaml {
-                    // This might happen if filename and inner model name differ. We prioritize dbt_model_name_for_yaml.
-                    // Or if user manually changed name in YML. For now, dbt catalog is source of truth for name.
-                    println!("  Aligning name in YAML from '{}' to '{}'", existing_model.name, dbt_model_name_for_yaml);
-                    existing_model.name = dbt_model_name_for_yaml.clone();
-                    model_was_updated = true;
+                // Update name if it differs (dbt catalog is source of truth for relation name)
+                if existing_model.name != actual_model_name_in_yaml {
+                    existing_model.name = actual_model_name_in_yaml.clone(); model_was_updated = true;
                 }
-
-                if let Some(dbt_comment) = &dbt_node_metadata.comment {
-                    if existing_model.description.as_deref() != Some(dbt_comment.as_str()) {
-                        existing_model.description = Some(dbt_comment.clone());
-                        model_was_updated = true;
-                    }
-                } // Consider if dbt_comment=None should clear existing_model.description
+                // Update description from catalog if catalog has one
+                if table_meta.comment.is_some() && existing_model.description != table_meta.comment {
+                    existing_model.description = table_meta.comment.clone(); model_was_updated = true;
+                }
-                if existing_model.original_file_path.as_deref() != Some(dbt_original_file_path_str.as_str()) {
-                    existing_model.original_file_path = Some(dbt_original_file_path_str.clone());
-                    model_was_updated = true;
+                // Update original_file_path
+                if existing_model.original_file_path.as_deref() != Some(&relative_sql_path_str) {
+                    existing_model.original_file_path = Some(relative_sql_path_str.clone()); model_was_updated = true;
                 }
-                // Update DB/Schema if different - dbt catalog is source of truth
-                if existing_model.database != dbt_node.database {
-                    existing_model.database = dbt_node.database.clone();
-                    model_was_updated = true;
+                // Update db/schema from catalog node (which should be authoritative)
+                if existing_model.database.as_deref() != catalog_node.metadata.as_ref().and_then(|m| m.database.as_deref()) {
+                    existing_model.database = catalog_node.metadata.as_ref().and_then(|m| m.database.clone()); model_was_updated = true;
+                }
-                if existing_model.schema != dbt_node.schema {
-                    existing_model.schema = dbt_node.schema.clone();
-                    model_was_updated = true;
+                if existing_model.schema.as_deref() != Some(table_meta.schema.as_str()) { // table_meta.schema is String, compare as &str
+                    existing_model.schema = Some(table_meta.schema.clone()); model_was_updated = true;
                 }
 
                 // Reconcile columns
                 let mut current_dims: Vec<YamlDimension> = Vec::new();
                 let mut current_measures: Vec<YamlMeasure> = Vec::new();
-                let mut dbt_columns_map: HashMap<String, &DbtColumn> = dbt_node.columns.values().map(|c| (c.name.clone(), c)).collect();
+                let mut dbt_columns_map: HashMap<String, &ColumnMetadata> = catalog_node.columns.values().map(|c| (c.name.clone(), c)).collect();
 
                 for existing_dim_col in std::mem::take(&mut existing_model.dimensions) {
                     if let Some(dbt_col) = dbt_columns_map.remove(&existing_dim_col.name) {
                         let mut updated_dim = existing_dim_col.clone();
                         let mut dim_col_updated = false;
-                        if updated_dim.type_.as_deref() != Some(dbt_col.column_type.as_str()) {
-                            updated_dim.type_ = Some(dbt_col.column_type.clone());
-                            dim_col_updated = true; columns_updated_count +=1;
+                        if updated_dim.type_.as_deref() != Some(&dbt_col.type_) { // type_ is String
+                            updated_dim.type_ = Some(dbt_col.type_.clone()); dim_col_updated = true; columns_updated_count +=1;
+                        }
+                        if dbt_col.comment.is_some() && updated_dim.description != dbt_col.comment {
+                            updated_dim.description = dbt_col.comment.clone(); dim_col_updated = true; columns_updated_count +=1;
                         }
-                        if let Some(dbt_col_comment) = &dbt_col.comment {
-                            if updated_dim.description.as_deref() != Some(dbt_col_comment.as_str()) {
-                                updated_dim.description = Some(dbt_col_comment.clone());
-                                dim_col_updated = true; columns_updated_count +=1;
-                            }
-                        } // else keep user's existing_dim.description
                         current_dims.push(updated_dim);
                         if dim_col_updated { model_was_updated = true; }
-                    } else {
-                        println!("  Removing dimension '{}' from semantic model '{}' (no longer in dbt model)", existing_dim_col.name.yellow(), dbt_model_name_for_yaml);
-                        columns_removed_count += 1; model_was_updated = true;
-                    }
+                    } else { columns_removed_count += 1; model_was_updated = true; }
                 }
 
                 for existing_measure_col in std::mem::take(&mut existing_model.measures) {
                     if let Some(dbt_col) = dbt_columns_map.remove(&existing_measure_col.name) {
                         let mut updated_measure = existing_measure_col.clone();
                         let mut measure_col_updated = false;
-                        if updated_measure.type_.as_deref() != Some(dbt_col.column_type.as_str()) {
-                            updated_measure.type_ = Some(dbt_col.column_type.clone());
-                            measure_col_updated = true; columns_updated_count +=1;
+                        if updated_measure.type_.as_deref() != Some(&dbt_col.type_) { // type_ is String
+                            updated_measure.type_ = Some(dbt_col.type_.clone()); measure_col_updated = true; columns_updated_count +=1;
+                        }
+                        if dbt_col.comment.is_some() && updated_measure.description != dbt_col.comment {
+                            updated_measure.description = dbt_col.comment.clone(); measure_col_updated = true; columns_updated_count +=1;
                         }
-                        if let Some(dbt_col_comment) = &dbt_col.comment {
-                            if updated_measure.description.as_deref() != Some(dbt_col_comment.as_str()) {
-                                updated_measure.description = Some(dbt_col_comment.clone());
-                                measure_col_updated = true; columns_updated_count +=1;
-                            }
-                        } // else keep user's description
                         current_measures.push(updated_measure);
                         if measure_col_updated { model_was_updated = true; }
-                    } else {
-                        println!("  Removing measure '{}' from semantic model '{}' (no longer in dbt model)", existing_measure_col.name.yellow(), dbt_model_name_for_yaml);
-                        columns_removed_count += 1; model_was_updated = true;
-                    }
+                    } else { columns_removed_count += 1; model_was_updated = true; }
                 }
-
-                for (col_name, dbt_col) in dbt_columns_map {
-                    println!("  Adding new column '{}' to semantic model '{}'", col_name.green(), dbt_model_name_for_yaml);
-                    if crate::commands::init::is_measure_type(Some(dbt_col.column_type.as_str())) { // Assuming dbt_col.column_type is String
-                        current_measures.push(YamlMeasure { name: dbt_col.name.clone(), description: dbt_col.comment.clone(), type_: Some(dbt_col.column_type.clone()) });
+                for (_col_name, dbt_col) in dbt_columns_map { // Remaining are new columns
+                    if crate::commands::init::is_measure_type(&dbt_col.type_) { // type_ is String
+                        current_measures.push(YamlMeasure { name: dbt_col.name.clone(), description: dbt_col.comment.clone(), type_: Some(dbt_col.type_.clone()) });
                     } else {
+                        current_dims.push(YamlDimension { name: dbt_col.name.clone(), description: dbt_col.comment.clone(), type_: Some(dbt_col.type_.clone()), searchable: false, options: None });
                     }
                     columns_added_count += 1; model_was_updated = true;
                 }
 
@@ -383,90 +287,46 @@ pub async fn generate_semantic_models_command(
                 if model_was_updated {
                     models_updated_count += 1;
-                    let yaml_string = serde_yaml::to_string(&existing_model).context(format!("Failed to serialize updated semantic model {} to YAML", existing_model.name))?;
-                    if let Some(parent_dir) = individual_semantic_yaml_path.parent() { fs::create_dir_all(parent_dir)?; }
-                    fs::write(&individual_semantic_yaml_path, yaml_string).context(format!("Failed to write updated semantic model to {}", individual_semantic_yaml_path.display()))?;
-                } else {
-                    println!("  No changes detected for semantic model: {}", dbt_model_name_for_yaml);
-                }
+                    let yaml_string = serde_yaml::to_string(&existing_model)?;
+                    fs::write(&individual_semantic_yaml_path, yaml_string)?;
+                }
             }
-            None => {
-                // New semantic model: Generate from scratch
-                println!("Generating new semantic model: {} at {}", dbt_model_name_for_yaml.green(), individual_semantic_yaml_path.display());
+            None => { // New semantic model
                 let mut dimensions = Vec::new();
                 let mut measures = Vec::new();
-                for (_col_name, col) in &dbt_node.columns { // dbt_node.columns defaults to empty if missing
-                    if crate::commands::init::is_measure_type(Some(col.column_type.as_str())) { // Assuming col.column_type is String
-                        measures.push(YamlMeasure {
-                            name: col.name.clone(),
-                            description: col.comment.clone(),
-                            type_: Some(col.column_type.clone())
-                        });
+                for (_col_name, col_meta) in &catalog_node.columns {
+                    if crate::commands::init::is_measure_type(&col_meta.type_) { // type_ is String
+                        measures.push(YamlMeasure { name: col_meta.name.clone(), description: col_meta.comment.clone(), type_: Some(col_meta.type_.clone()) });
                     } else {
-                        dimensions.push(YamlDimension {
-                            name: col.name.clone(),
-                            description: col.comment.clone(),
-                            type_: Some(col.column_type.clone()),
-                            searchable: false,
-                            options: None
-                        });
+                        dimensions.push(YamlDimension { name: col_meta.name.clone(), description: col_meta.comment.clone(), type_: Some(col_meta.type_.clone()), searchable: false, options: None });
                     }
                 }
 
                 let new_model = YamlModel {
-                    name: dbt_model_name_for_yaml.clone(),
-                    description: dbt_node_metadata.comment.clone(), // Use dbt_node_metadata
+                    name: actual_model_name_in_yaml,
+                    description: table_meta.comment.clone(),
                     data_source_name: buster_config.projects.as_ref().and_then(|p|p.first()).and_then(|pc|pc.data_source_name.clone()),
-                    database: dbt_node.database.clone(),
-                    schema: dbt_node.schema.clone(),
+                    database: table_meta.database.clone(), // From TableMetadata
+                    schema: Some(table_meta.schema.clone()), // From TableMetadata (String -> Option<String>)
                     dimensions,
                     measures,
-                    original_file_path: Some(dbt_original_file_path_str.clone()),
+                    original_file_path: Some(relative_sql_path_str),
                 };
-                let yaml_string = serde_yaml::to_string(&new_model).context(format!("Failed to serialize new semantic model {} to YAML", new_model.name))?;
-                if let Some(parent_dir) = individual_semantic_yaml_path.parent() { fs::create_dir_all(parent_dir)?; }
-                fs::write(&individual_semantic_yaml_path, yaml_string).context(format!("Failed to write new semantic model to {}", individual_semantic_yaml_path.display()))?;
-                new_models_added_count += 1;
+                let yaml_string = serde_yaml::to_string(&new_model)?;
+                fs::write(&individual_semantic_yaml_path, yaml_string)?;
+                models_generated_count += 1;
            }
        }
    }
 
-    // Remove or comment out the old logic for handling removed models from a single spec file
-    /*
-    let mut removed_models_count = 0;
-    existing_yaml_models_map.retain(|model_name: &String, _model: &mut YamlModel| {
-        if processed_dbt_model_names.contains(model_name) {
-            true
-        } else {
-            println!("Removing semantic model '{}' (no longer found in scoped dbt models or catalog.json)", model_name.red());
-            removed_models_count += 1;
-            false
-        }
-    });
-    */
-
-    // Remove the final save logic for the aggregated spec file
-    // let final_models_vec: Vec<YamlModel> = existing_yaml_models_map.values().cloned().collect();
-    // let updated_spec = YamlSemanticLayerSpec { models: final_models_vec };
-    // let yaml_string = serde_yaml::to_string(&updated_spec).context("Failed to serialize updated semantic models to YAML")?;
-    // fs::write(&semantic_models_base_dir_path, yaml_string).context(format!("Failed to write updated semantic models to {}", semantic_models_base_dir_path.display()))?;
-    // Note: The above fs::write was to semantic_models_base_dir_path which is a directory, this was an error in previous diff. It should have been semantic_models_file_path.
-    // Since we save per file, this block is removed.
-
-    println!("\n{}", "Semantic Model Generation Summary:".bold().green());
-    println!("  Processed dbt models (in scope): {}", dbt_models_processed_count);
-    println!("  Semantic models initially loaded: {}", initial_model_count);
-    println!("  New semantic models added: {}", new_models_added_count.to_string().green());
+    println!("\n{}", "Semantic Model Generation/Update Summary:".bold().green());
+    println!("  SQL models processed that had a matching catalog entry: {}", sql_models_successfully_processed_from_catalog_count);
+    println!("  New semantic models generated: {}", models_generated_count.to_string().green());
     println!("  Existing semantic models updated: {}", models_updated_count.to_string().cyan());
-    println!("  Semantic models removed (dbt model deleted/out of scope): {}", columns_removed_count.to_string().red());
-    println!("  Columns added: {}", columns_added_count.to_string().green());
-    println!("  Columns updated (type/dbt_comment): {}", columns_updated_count.to_string().cyan());
-    println!("  Columns removed: {}", columns_removed_count.to_string().red());
-
-    if is_side_by_side_generation {
-        println!("✓ Semantic models successfully updated (side-by-side with SQL models, base directory: {}).", effective_semantic_models_base_dir.display().to_string().green());
-    } else {
-        println!("✓ Semantic models successfully updated in {}.", effective_semantic_models_base_dir.display().to_string().green());
-    }
+    println!("  Columns added to existing models: {}", columns_added_count.to_string().green());
+    println!("  Columns updated in existing models: {}", columns_updated_count.to_string().cyan());
+    println!("  Columns removed from existing models: {}", columns_removed_count.to_string().red());
+    // Consider adding a count for models found on disk but not in catalog / models in catalog but no matching SQL file
+    println!("✓ Semantic model generation/update complete.");
 
     Ok(())
 }
\ No newline at end of file
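The update branch above leans on serde_yaml round-tripping; a trimmed sketch of the pattern, using a hypothetical two-field stand-in for YamlModel (the real struct is defined in init.rs):

// Hypothetical stand-in; only illustrates the read-modify-write cycle.
#[derive(serde::Serialize, serde::Deserialize)]
struct YamlModelSketch {
    name: String,
    description: Option<String>,
}

fn roundtrip(path: &std::path::Path) -> anyhow::Result<()> {
    // Read the existing YAML, mutate it in memory, write it back.
    let mut model: YamlModelSketch = serde_yaml::from_str(&std::fs::read_to_string(path)?)?;
    model.description.get_or_insert_with(|| "TODO: describe".to_string());
    std::fs::write(path, serde_yaml::to_string(&model)?)?;
    Ok(())
}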
diff --git a/cli/cli/src/commands/init.rs b/cli/cli/src/commands/init.rs
index d1da9b6ee..7f41875ed 100644
--- a/cli/cli/src/commands/init.rs
+++ b/cli/cli/src/commands/init.rs
@@ -11,13 +11,13 @@ use query_engine::credentials::{
 use regex::Regex;
 use serde::{Deserialize, Serialize};
 use serde_yaml;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::fs;
 use std::path::{Path, PathBuf};
 use std::time::Duration;
 
 // Import from dbt_utils for dbt catalog parsing
-use dbt_utils::models::{DbtCatalog, DbtNode, DbtColumn, DbtNodeMetadata, DbtCatalogMetadata};
+use dbt_utils::models::{DbtCatalog, CatalogNode, ColumnMetadata, TableMetadata, CatalogMetadata};
 use dbt_utils::{run_dbt_docs_generate, load_and_parse_catalog};
 
 // Imports for Buster specific utilities and config
@@ -84,21 +84,16 @@ pub fn is_false(val: &bool) -> bool {
 }
 
 // Helper function to determine if a SQL type should be a measure
-pub fn is_measure_type(sql_type_opt: Option<&str>) -> bool {
-    match sql_type_opt {
-        Some(sql_type) => {
-            let lower_sql_type = sql_type.to_lowercase();
-            lower_sql_type.contains("int") ||
-            lower_sql_type.contains("numeric") ||
-            lower_sql_type.contains("decimal") ||
-            lower_sql_type.contains("real") ||
-            lower_sql_type.contains("double") ||
-            lower_sql_type.contains("float") ||
-            lower_sql_type.contains("money") ||
-            lower_sql_type.contains("number")
-        }
-        None => false, // If type is missing, default to not a measure (dimension)
-    }
+pub fn is_measure_type(sql_type: &str) -> bool {
+    let lower_sql_type = sql_type.to_lowercase();
+    lower_sql_type.contains("int") ||
+    lower_sql_type.contains("numeric") ||
+    lower_sql_type.contains("decimal") ||
+    lower_sql_type.contains("real") ||
+    lower_sql_type.contains("double") ||
+    lower_sql_type.contains("float") ||
+    lower_sql_type.contains("money") ||
+    lower_sql_type.contains("number")
 }
 
 // Enum for Database Type selection (ensure only one definition, placed before use)
@@ -638,303 +633,259 @@ async fn generate_semantic_models_from_dbt_catalog(
     _config_path: &Path, // Path to buster.yml
     buster_config_dir: &Path, // Directory containing buster.yml, assumed dbt project root
 ) -> Result<()> {
-    println!("{}", "Starting semantic model generation from dbt catalog...".dimmed());
+    println!("{}", "Starting semantic model generation (file-first approach)...".dimmed());
 
-    // Get the semantic model output configuration from the first project context
-    let project_semantic_model_paths_config = buster_config.projects.as_ref()
-        .and_then(|projs| projs.first())
-        .and_then(|proj| proj.semantic_model_paths.as_ref());
-
-    let is_side_by_side_generation = project_semantic_model_paths_config.map_or(true, |paths| paths.is_empty());
-
-    let path_construction_base_dir: PathBuf; // Base directory for constructing output paths
-
-    if is_side_by_side_generation {
-        path_construction_base_dir = buster_config_dir.to_path_buf(); // Project root is the base for side-by-side
-        println!("{}", format!("Semantic models will be generated side-by-side with SQL models (within '{}').", path_construction_base_dir.display()).dimmed());
-    } else {
-        // A specific directory (or directories) was configured for semantic models. Use the first one.
-        let primary_path_str = project_semantic_model_paths_config.unwrap().first().unwrap(); // Safe due to map_or check
-        path_construction_base_dir = buster_config_dir.join(primary_path_str);
-        println!("{}", format!("Semantic models will be generated in/under: {}", path_construction_base_dir.display()).dimmed());
-        // Ensure this specific output directory exists
-        fs::create_dir_all(&path_construction_base_dir).map_err(|e| {
-            anyhow!("Failed to create semantic models output directory '{}': {}", path_construction_base_dir.display(), e)
-        })?;
-    }
-
-    // Get dbt model source roots (e.g., ["models", "my_other_models"])
-    // These are paths relative to the dbt_project_path (buster_config_dir)
-    let dbt_project_content = parse_dbt_project_file_content(buster_config_dir)?;
-    let dbt_model_source_roots: Vec<PathBuf> = dbt_project_content.as_ref()
-        .map(|content| content.model_paths.iter().map(PathBuf::from).collect())
-        .unwrap_or_else(|| vec![PathBuf::from("models")]); // Default if not found
-
-    // Get defaults from the primary project context for model properties
-    let primary_project_context = buster_config.projects.as_ref().and_then(|p| p.first());
-    let default_data_source_name = primary_project_context
-        .and_then(|pc| pc.data_source_name.as_ref());
-    let default_database = primary_project_context
-        .and_then(|pc| pc.database.as_ref());
-    let default_schema = primary_project_context
-        .and_then(|pc| pc.schema.as_ref());
-
-    let dbt_project_path = buster_config_dir;
-    let catalog_json_path = dbt_project_path.join("target").join("catalog.json");
-
-    if Confirm::new("Can Buster run 'dbt docs generate' to get the latest schema (catalog.json)?")
+    // --- 1. Load Catalog & Build Lookup Map ---
+    let catalog_json_path = buster_config_dir.join("target").join("catalog.json");
+    if Confirm::new("Run 'dbt docs generate' to refresh dbt catalog (catalog.json)?")
         .with_default(true)
         .prompt()?
    {
-        match run_dbt_docs_generate(dbt_project_path).await {
-            Ok(_) => { /* Success is logged by the helper function itself */ }
-            Err(e) => {
-                eprintln!("{}", format!("'dbt docs generate' (via dbt_utils) reported an error. Proceeding with existing catalog.json if available. Error: {}", e).yellow());
-            }
+        match run_dbt_docs_generate(buster_config_dir).await {
+            Ok(_) => {}
+            Err(e) => eprintln!("{}", format!("'dbt docs generate' error. Proceeding with existing catalog if available. Error: {}", e).yellow()),
        }
    } else {
-        println!("{}", "Skipping 'dbt docs generate'. Will look for existing catalog.json.".dimmed());
+        println!("{}", "Skipping 'dbt docs generate'. Using existing catalog.json if available.".dimmed());
    }
 
     if !catalog_json_path.exists() {
-        eprintln!(
-            "{}",
-            format!("✗ catalog.json not found at {}.", catalog_json_path.display()).red()
-        );
-        println!("Please ensure 'dbt docs generate' has been run successfully in your dbt project.");
-        println!("Skipping semantic model generation.");
+        eprintln!("{}", format!("✗ catalog.json not found at {}. Cannot generate models.", catalog_json_path.display()).red());
         return Ok(());
     }
-
-    println!(
-        "{}",
-        format!(
-            "Attempting to load catalog.json from {}",
-            catalog_json_path.display()
-        )
-        .dimmed()
-    );
+
     let dbt_catalog = match load_and_parse_catalog(&catalog_json_path) {
         Ok(catalog) => {
-            println!("{}", "✓ Successfully parsed catalog.json via dbt_utils.".green());
+            println!("{}", "✓ Successfully parsed catalog.json.".green());
             catalog
         }
         Err(e) => {
-            eprintln!("{}", format!("✗ Error loading/parsing catalog.json via dbt_utils: {}. Ensure catalog.json exists and is valid.", e).red());
+            eprintln!("{}", format!("✗ Error loading/parsing catalog.json: {}. Cannot generate models.", e).red());
             return Ok(());
         }
     };
 
-    // --- Model Scoping Logic ---
-    let mut configured_model_path_patterns: Vec<Pattern> = Vec::new();
-    if let Some(projects) = &buster_config.projects {
-        for project_context in projects {
-            if let Some(model_paths) = &project_context.model_paths {
-                for path_str in model_paths {
-                    // Construct absolute path for glob pattern relative to buster_config_dir
-                    let path_to_glob = buster_config_dir.join(path_str);
-                    // dbt model paths are often directories, so add a glob to match files within them.
-                    // e.g., if path_str is "models", pattern becomes "<project>/models/**/*.sql"
-                    // If path_str already contains wildcards, use it as is more directly.
-                    let pattern_str = if path_str.contains('*') || path_str.contains('?') || path_str.contains('[') {
-                        path_to_glob.to_string_lossy().into_owned()
-                    } else {
-                        path_to_glob.join("**").join("*.sql").to_string_lossy().into_owned()
-                    };
+    // Build lookup map: Keyed by derived_model_name_from_file (which should be metadata.name)
+    // We filter for nodes that have a derived_model_name_from_file and are models.
+    let catalog_nodes_by_name: HashMap<String, &CatalogNode> = dbt_catalog.nodes.values()
+        .filter_map(|node| {
+            if node.derived_resource_type.as_deref() == Some("model") {
+                node.derived_model_name_from_file.as_ref().map(|name| (name.clone(), node))
+            } else {
+                None
+            }
+        })
+        .collect();
+
+    if catalog_nodes_by_name.is_empty() {
+        println!("{}", "No models found in the dbt catalog after parsing. Nothing to generate.".yellow());
+        return Ok(());
+    }
 
-                    match Pattern::new(&pattern_str) {
-                        Ok(p) => configured_model_path_patterns.push(p),
-                        Err(e) => eprintln!(
-                            "{}",
-                            format!(
-                                "Warning: Invalid glob pattern '{}' from buster.yml model_paths: {}",
-                                pattern_str,
-                                e
-                            )
-                            .yellow()
-                        ),
+    // --- 2. Determine SQL Files to Process ---
+    let mut sql_files_to_process: HashSet<PathBuf> = HashSet::new();
+    let mut model_path_patterns_from_buster_yml: Vec<Pattern> = Vec::new();
+
+    if let Some(projects) = &buster_config.projects {
+        if let Some(first_project) = projects.first() {
+            if let Some(mp_globs) = &first_project.model_paths {
+                for path_str_glob in mp_globs {
+                    match Pattern::new(&buster_config_dir.join(path_str_glob).to_string_lossy()) {
+                        Ok(p) => model_path_patterns_from_buster_yml.push(p),
+                        Err(e) => eprintln!("{}", format!("Warning: Invalid glob pattern '{}' from buster.yml: {}", path_str_glob, e).yellow()),
                     }
                 }
            }
        }
    }
-    if configured_model_path_patterns.is_empty() {
-        println!("{}", "No model_paths configured in buster.yml or patterns are invalid. Will process all models from catalog.json.".yellow());
+
+    if !model_path_patterns_from_buster_yml.is_empty() {
+        println!("{}", format!("Scanning for SQL files based on model_paths patterns in buster.yml: {:?}",
+            model_path_patterns_from_buster_yml.iter().map(|p| p.as_str()).collect::<Vec<_>>() ).dimmed());
+        for pattern in model_path_patterns_from_buster_yml {
+            // Ripgrep or glob to find files matching the pattern string itself
+            // This simple glob might need enhancement for more complex patterns handled by buster_config.model_paths
+            // For now, assuming model_paths are like "models/marts/**/*.sql"
+            match glob::glob(pattern.as_str()) {
+                Ok(paths) => {
+                    for entry in paths {
+                        match entry {
+                            Ok(path) => if path.is_file() && path.extension().map_or(false, |ext| ext == "sql") {
+                                sql_files_to_process.insert(path);
+                            }
+                            Err(e) => eprintln!("{}", format!("Error processing glob path: {}", e).yellow()),
+                        }
+                    }
+                }
+                Err(e) => eprintln!("{}", format!("Glob pattern error for '{}': {}", pattern.as_str(), e).yellow()),
+            }
+        }
+    } else {
+        println!("{}", "No model_paths in buster.yml. Using dbt_project.yml model-paths to find SQL files.".dimmed());
+        let dbt_project_content = parse_dbt_project_file_content(buster_config_dir)?;
+        let dbt_model_source_roots = dbt_project_content.as_ref()
+            .map(|content| content.model_paths.iter().map(PathBuf::from).collect::<Vec<PathBuf>>())
+            .unwrap_or_else(|| vec![PathBuf::from("models")]);
+
+        for dbt_source_root_rel in dbt_model_source_roots {
+            let dbt_source_root_abs = buster_config_dir.join(dbt_source_root_rel);
+            let glob_pattern = dbt_source_root_abs.join("**/*.sql");
+            match glob::glob(&glob_pattern.to_string_lossy()) {
+                Ok(paths) => {
+                    for entry in paths {
+                        match entry {
+                            Ok(path) => if path.is_file() {
+                                sql_files_to_process.insert(path);
+                            }
+                            Err(e) => eprintln!("{}", format!("Error processing glob path from dbt_project.yml: {}", e).yellow()),
+                        }
+                    }
+                }
+                Err(e) => eprintln!("{}", format!("Glob pattern error for dbt_project.yml path '{}': {}", glob_pattern.display(), e).yellow()),
+            }
+        }
    }
 
+    if sql_files_to_process.is_empty() {
+        println!("{}", "No SQL model files found based on configuration. Nothing to generate.".yellow());
+        return Ok(());
+    }
+    println!("{}", format!("Found {} SQL model file(s) to process.", sql_files_to_process.len()).dimmed());
+
+    // --- 3. Determine Output Configuration (Side-by-side or Dedicated Dir) ---
+    let project_semantic_model_paths_config = buster_config.projects.as_ref()
+        .and_then(|projs| projs.first())
+        .and_then(|proj| proj.semantic_model_paths.as_ref());
+    let is_side_by_side_generation = project_semantic_model_paths_config.map_or(true, |paths| paths.is_empty());
+    let primary_dedicated_output_dir: Option<PathBuf> = if is_side_by_side_generation { None }
+    else {
+        project_semantic_model_paths_config.and_then(|paths| paths.first()).map(|p_str| buster_config_dir.join(p_str))
+    };
+
+    if is_side_by_side_generation {
+        println!("{}", "Semantic models will be generated side-by-side with their SQL counterparts.".dimmed());
+    } else if let Some(ref out_dir) = primary_dedicated_output_dir {
+        println!("{}", format!("Semantic models will be generated in/under: {}", out_dir.display()).dimmed());
+        fs::create_dir_all(out_dir).map_err(|e| anyhow!("Failed to create semantic models output dir '{}': {}", out_dir.display(), e))?;
+    } else {
+        // This case (not side-by-side but no primary_dedicated_output_dir) should ideally not happen if config is valid.
+        // Defaulting to side-by-side for safety, though this indicates a potential config issue handled earlier in init.
+        println!("{}", "Warning: Semantic model output directory not clearly configured, defaulting to side-by-side generation logic.".yellow());
+    }
 
+    // --- 4. Iterate Through SQL Files & Generate YamlModels ---
     let mut yaml_models_generated_count = 0;
+    let default_data_source_name = buster_config.projects.as_ref().and_then(|p| p.first()).and_then(|pc| pc.data_source_name.as_ref());
+    let default_database = buster_config.projects.as_ref().and_then(|p| p.first()).and_then(|pc| pc.database.as_ref());
+    let default_schema = buster_config.projects.as_ref().and_then(|p| p.first()).and_then(|pc| pc.schema.as_ref());
 
-    for (_node_id, node) in dbt_catalog.nodes.iter().filter(|(_id, n)| {
-        match &n.resource_type {
-            Some(rt) => rt == "model",
-            None => {
-                eprintln!(
-                    "{}",
-                    format!(
-                        "Warning: Skipping dbt node with unique_id: {} because it is missing 'resource_type' in catalog.json.",
-                        n.unique_id
-                    ).yellow()
-                );
-                false
-            }
-        }
-    }) {
-        let Some(ref original_file_path_str) = node.original_file_path else {
-            eprintln!(
-                "{}",
-                format!(
-                    "Warning: Skipping dbt model unique_id: {} because it is missing 'original_file_path' in catalog.json.",
-                    node.unique_id
-                ).yellow()
-            );
-            continue;
-        };
-
-        // Ensure metadata and metadata.name exist, as it's crucial for the semantic model name
-        let Some(ref node_metadata) = node.metadata else {
-            eprintln!(
-                "{}",
-                format!(
-                    "Warning: Skipping dbt model with unique_id: {} because its 'metadata' block is missing in catalog.json.",
-                    node.unique_id
-                ).yellow()
-            );
-            continue;
-        };
-        let Some(ref actual_model_name_from_metadata) = node_metadata.name else {
-            eprintln!(
-                "{}",
-                format!(
-                    "Warning: Skipping dbt model with unique_id: {} because its 'metadata.name' is missing in catalog.json.",
-                    node.unique_id
-                ).yellow()
-            );
-            continue;
-        };
-        let actual_model_name = actual_model_name_from_metadata.clone();
-
-        let original_file_path_abs = buster_config_dir.join(original_file_path_str);
-
-        let in_scope = if configured_model_path_patterns.is_empty() {
-            true // If no patterns, assume all models are in scope
-        } else {
-            configured_model_path_patterns
-                .iter()
-                .any(|pattern| pattern.matches_path(&original_file_path_abs))
-        };
-
-        if !in_scope {
-            // Only log if verbose or similar, this can be noisy
-            // println!("Skipping dbt model (not in configured model_paths): {}", node.unique_id.dimmed());
-            continue;
-        }
-
-        println!("Processing dbt model for semantic layer: {}: {}", node.unique_id.cyan(), actual_model_name.cyan());
-
-        let mut dimensions: Vec<YamlDimension> = Vec::new();
-        let mut measures: Vec<YamlMeasure> = Vec::new();
-
-        for (_col_name, col) in &node.columns { // node.columns is HashMap, defaults to empty if missing
-            if is_measure_type(Some(col.column_type.as_str())) { // Assuming col.column_type is String here based on linter
-                measures.push(YamlMeasure {
-                    name: col.name.clone(),
-                    description: col.comment.clone(),
-                    type_: Some(col.column_type.clone()), // Wrap in Some()
-                });
-            } else {
-                dimensions.push(YamlDimension {
-                    name: col.name.clone(),
-                    description: col.comment.clone(),
-                    type_: Some(col.column_type.clone()), // Wrap in Some()
-                    searchable: false,
-                    options: None,
-                });
-            }
-        }
-
-        let yaml_model = YamlModel {
-            name: actual_model_name.clone(),
-            description: node_metadata.comment.clone(), // Access comment via node_metadata ref
-            data_source_name: default_data_source_name.cloned(),
-            database: node.database.clone().or_else(|| default_database.cloned()), // node.database is Option<String>
-            schema: node.schema.clone().or_else(|| default_schema.cloned()), // node.schema is Option<String>
-            dimensions,
-            measures,
-            original_file_path: Some(original_file_path_str.clone()),
-        };
-
-        // Determine the output path for this individual YAML model
-        let dbt_model_path = Path::new(original_file_path_str);
-        let mut stripped_model_path_suffix = PathBuf::new(); // e.g. "marts/sales/revenue.sql" if original is "models/marts/sales/revenue.sql"
-        let mut found_base_for_stripping = false;
-
-        for dbt_source_root in &dbt_model_source_roots { // dbt_source_root is like "models"
-            if let Ok(stripped_path) = dbt_model_path.strip_prefix(dbt_source_root) {
-                stripped_model_path_suffix = stripped_path.to_path_buf();
-                found_base_for_stripping = true;
-                break;
-            }
-        }
-
-        if !found_base_for_stripping {
-            // Fallback: if original_file_path_str didn't start with any known dbt_model_source_roots,
-            // (e.g. original_file_path_str is "marts/revenue.sql" and source_root is "models")
-            // then use original_file_path_str as is for the suffix part.
-            // This can happen if dbt_model_source_roots are not exhaustive or path is weird.
-            // The resulting YAML structure will still be relative to path_construction_base_dir.
-            stripped_model_path_suffix = dbt_model_path.to_path_buf();
-            eprintln!("{}", format!(
-                "Warning: Could not strip a known dbt model source root ('{:?}') from dbt model path '{}'. Using full path for suffix: '{}'",
-                dbt_model_source_roots, original_file_path_str, stripped_model_path_suffix.display()
-                ).yellow()
-            );
-        }
-
-        let output_yaml_path: PathBuf;
-        if is_side_by_side_generation {
-            // For side-by-side, output is next to the SQL file.
-            // original_file_path_str is relative to buster_config_dir (e.g., "models/marts/sales/revenue.sql")
-            // buster_config_dir is the dbt project root.
-            output_yaml_path = buster_config_dir.join(original_file_path_str).with_extension("yml");
-        } else {
-            // For dedicated output directory:
-            // path_construction_base_dir is the dedicated dir (e.g., "/path/to/project/buster_yamls")
-            // stripped_model_path_suffix is the path part after dbt source root (e.g., "marts/sales/revenue.sql")
-            let yaml_filename_with_subdir = stripped_model_path_suffix.with_extension("yml"); // e.g., "marts/sales/revenue.yml"
-            output_yaml_path = path_construction_base_dir.join(yaml_filename_with_subdir);
-        }
-
-        if let Some(parent_dir) = output_yaml_path.parent() {
-            fs::create_dir_all(parent_dir).map_err(|e| {
-                anyhow!("Failed to create directory for semantic model YAML '{}': {}", parent_dir.display(), e)
-            })?;
-        }
-
-        let yaml_string = serde_yaml::to_string(&yaml_model)
-            .map_err(|e| anyhow!("Failed to serialize semantic model '{}' to YAML: {}", yaml_model.name, e))?;
-        fs::write(&output_yaml_path, yaml_string)
-            .map_err(|e| anyhow!("Failed to write semantic model YAML for '{}' to '{}': {}", yaml_model.name, output_yaml_path.display(), e))?;
-
-        println!(
-            "{} Generated semantic model: {}",
-            "✓".green(),
-            output_yaml_path.display().to_string().cyan()
+    for sql_file_abs_path in sql_files_to_process {
+        let model_name_from_filename = sql_file_abs_path.file_stem().map_or_else(
+            || "".to_string(),
+            |stem| stem.to_string_lossy().into_owned()
        );
-        yaml_models_generated_count += 1;
+
diff --git a/cli/cli/src/commands/mod.rs b/cli/cli/src/commands/mod.rs
index a0d499b99..99c423026 100644
--- a/cli/cli/src/commands/mod.rs
+++ b/cli/cli/src/commands/mod.rs
@@ -1,9 +1,10 @@
 pub mod auth;
 pub mod deploy;
-pub mod init;
-pub mod update;
 pub mod generate;
+pub mod init;
+pub mod parse;
 pub mod run;
+pub mod update;

 pub use auth::auth_with_args;
 pub use init::init;
diff --git a/cli/cli/src/commands/parse.rs b/cli/cli/src/commands/parse.rs
new file mode 100644
index 000000000..d176b6a5e
--- /dev/null
+++ b/cli/cli/src/commands/parse.rs
@@ -0,0 +1,289 @@
+use anyhow::{Result, anyhow};
+use std::path::{Path, PathBuf};
+use colored::*;
+
+use crate::utils::{
+    config::{BusterConfig, ProjectContext},
+    find_yml_files, ExclusionManager,
+    // ProgressTracker is implemented below for exclusion logging, mirroring deploy.
+    ProgressTracker,
+};
+use crate::commands::deploy::deploy::{parse_model_file, resolve_model_configurations};
+use semantic_layer::models::Model;
+
+// A simple progress tracker for the parse command
+#[derive(Debug, Default)]
+struct ParseProgress {
+    total_files: usize,
+    processed_files: usize,
+    excluded_files: usize,
+    current_file: String,
+    errors: Vec<(String, String, Vec<String>)>, // file, model_name, errors
+    successes: Vec<(String, String)>,           // file, model_name
+}
+
+impl ParseProgress {
+    fn new() -> Self {
+        Default::default()
+    }
+
+    fn log_status(&self) {
+        if !self.current_file.is_empty() {
+            println!("Processing [{} / {}]: {}", self.processed_files, self.total_files, self.current_file);
+        }
+    }
+
+    fn log_summary(&self) {
+        println!("\n--- Parse Summary ---");
+        println!("Total files processed: {}", self.processed_files);
+        println!("Total files excluded: {}", self.excluded_files);
+        println!("Models successfully parsed & validated: {}", self.successes.len());
+        println!("Models with errors: {}", self.errors.len());
+
+        if !self.successes.is_empty() {
+            println!("\nSuccessfully parsed models:");
+            for (file, model_name) in &self.successes {
+                println!("  - {} (in file: {})", model_name.green(), file.dimmed());
+            }
+        }
+
+        if !self.errors.is_empty() {
+            println!("\nModels with errors:");
+            for (file, model_name, errors) in &self.errors {
+                println!("  - {} (in file: {}):", model_name.red(), file.dimmed());
+                for error in errors {
+                    println!("    - {}", error);
+                }
+            }
+        }
+        println!("---------------------");
+    }
+}
+
+impl ProgressTracker for ParseProgress {
+    fn log_excluded_file(&mut self, path: &str, pattern: &str) {
+        self.excluded_files += 1;
+        println!("Excluding file: {} (matched pattern: {})", path, pattern);
+    }
+
+    fn log_excluded_tag(&mut self, path: &str, tag: &str) {
+        self.excluded_files += 1;
+        println!("Excluding file: {} (matched excluded tag: {})", path, tag);
+    }
+}
+
+pub async fn parse_models_command(path_arg: Option<String>) -> Result<()> {
+    let current_dir = std::env::current_dir()?;
+    let buster_config_load_dir = path_arg.as_ref().map(PathBuf::from).unwrap_or_else(|| current_dir.clone());
+
+    let mut progress = ParseProgress::new();
+
+    println!("Looking for buster.yml configuration...");
+    let buster_config = match BusterConfig::load_from_dir(&buster_config_load_dir) {
+        Ok(Some(cfg)) => {
+            println!("Found buster.yml configuration at {}", buster_config_load_dir.join("buster.yml").display());
+            Some(cfg)
+        }
+        Ok(None) => {
+            println!("No buster.yml found in {}, will parse files directly or use defaults.", buster_config_load_dir.display());
+            None
+        }
+        Err(e) => {
+            println!("Warning: Error reading buster.yml: {}. Proceeding without it.", e);
+            None
+        }
+    };
+
+    let effective_buster_config_dir = BusterConfig::base_dir(&buster_config_load_dir.join("buster.yml")).unwrap_or(buster_config_load_dir.clone());
+
+    let exclusion_manager = if let Some(cfg) = &buster_config {
+        ExclusionManager::new(cfg)?
+    } else {
+        ExclusionManager::empty()
+    };
+
+    // Determine search paths
+    let mut files_to_parse_with_context: Vec<(PathBuf, Option<&ProjectContext>)> = Vec::new();
+
+    if let Some(p_str) = &path_arg {
+        // If a specific path is given, use it directly; it may be a file or a directory.
+        let specific_path = effective_buster_config_dir.join(p_str);
+        println!("Processing specified path: {}", specific_path.display());
+        if specific_path.is_dir() {
+            match find_yml_files(&specific_path, true, &exclusion_manager, Some(&mut progress)) { // true = recursive
+                Ok(files_in_dir) => {
+                    for f in files_in_dir {
+                        // For a direct path there is no specific project context from buster.yml.
+                        files_to_parse_with_context.push((f, None));
+                    }
+                },
+                Err(e) => eprintln!("Error finding YML files in {}: {}", specific_path.display(), format!("{}", e).red()),
+            }
+        } else if specific_path.is_file() && specific_path.extension().and_then(|ext| ext.to_str()) == Some("yml") {
+            if specific_path.file_name().and_then(|name| name.to_str()) != Some("buster.yml") {
+                files_to_parse_with_context.push((specific_path, None));
+            }
+        } else if !specific_path.exists() {
+            return Err(anyhow!("Specified path does not exist: {}", specific_path.display()));
+        } else {
+            return Err(anyhow!("Specified path is not a valid .yml file or directory: {}", specific_path.display()));
+        }
+    } else {
+        // No specific path: use buster_config (if available) or the current directory.
+        if let Some(cfg) = &buster_config {
+            let effective_paths_with_contexts = cfg.resolve_effective_model_paths(&effective_buster_config_dir);
+            if !effective_paths_with_contexts.is_empty() {
+                println!("Using effective model paths from buster.yml:");
+                for (path, project_ctx_opt) in effective_paths_with_contexts {
+                    let context_identifier = project_ctx_opt.map_or_else(|| "Global/Default".to_string(), |ctx| ctx.identifier());
+                    println!("  - Path: {}, Context: {}", path.display(), context_identifier.dimmed());
+                    if path.is_dir() {
+                        match find_yml_files(&path, true, &exclusion_manager, Some(&mut progress)) { // true = recursive
+                            Ok(files_in_dir) => {
+                                for f in files_in_dir {
+                                    files_to_parse_with_context.push((f, project_ctx_opt));
+                                }
+                            },
+                            Err(e) => eprintln!("Error finding YML files in {}: {}", path.display(), format!("{}", e).red()),
+                        }
+                    } else if path.is_file() && path.extension().and_then(|ext| ext.to_str()) == Some("yml") {
+                        if path.file_name().and_then(|name| name.to_str()) != Some("buster.yml") {
+                            files_to_parse_with_context.push((path.clone(), project_ctx_opt));
+                        }
+                    }
+                }
+            } else {
+                println!("No model_paths specified in buster.yml, scanning current directory: {}", effective_buster_config_dir.display());
+                match find_yml_files(&effective_buster_config_dir, true, &exclusion_manager, Some(&mut progress)) {
+                    Ok(files_in_dir) => {
+                        for f in files_in_dir {
+                            files_to_parse_with_context.push((f, None)); // No specific project context for a CWD scan.
+                        }
+                    },
+                    Err(e) => eprintln!("Error finding YML files in {}: {}", effective_buster_config_dir.display(), format!("{}", e).red()),
+                }
+            }
+        } else {
+            // No buster.yml and no path_arg: scan the current directory.
+            println!("No buster.yml found and no specific path provided. Scanning current directory: {}", effective_buster_config_dir.display());
+            match find_yml_files(&effective_buster_config_dir, true, &exclusion_manager, Some(&mut progress)) {
+                Ok(files_in_dir) => {
+                    for f in files_in_dir {
+                        files_to_parse_with_context.push((f, None));
+                    }
+                },
+                Err(e) => eprintln!("Error finding YML files in {}: {}", effective_buster_config_dir.display(), format!("{}", e).red()),
+            }
+        }
+    }
+
+    progress.total_files = files_to_parse_with_context.len();
+    println!("Found {} model .yml file(s) to parse.", progress.total_files);
+
+    if files_to_parse_with_context.is_empty() {
+        println!("No model files found to parse.");
+        progress.log_summary();
+        return Ok(());
+    }
+
+    let default_cfg_storage;
+    let global_config_for_resolution = match buster_config.as_ref() {
+        Some(cfg) => cfg,
+        None => {
+            default_cfg_storage = BusterConfig::default();
+            &default_cfg_storage
+        }
+    };
+
+    for (yml_path, project_ctx_opt) in files_to_parse_with_context {
+        progress.processed_files += 1;
+        progress.current_file = yml_path.strip_prefix(&effective_buster_config_dir).unwrap_or(&yml_path).to_string_lossy().into_owned();
+        progress.log_status();
+
+        let parsed_models_result = parse_model_file(&yml_path);
+
+        match parsed_models_result {
+            Ok(parsed_models) => {
+                let models_with_context: Vec<(Model, Option<&ProjectContext>)> = parsed_models.into_iter()
+                    .map(|m| (m, project_ctx_opt))
+                    .collect();
+
+                match resolve_model_configurations(models_with_context, global_config_for_resolution) {
+                    Ok(resolved_models) => {
+                        if resolved_models.is_empty() && !yml_path.to_string_lossy().contains("empty_test") { // Guard against empty files unless used by specific tests.
+                            println!("Warning: No models found in file: {}", yml_path.display());
+                            // Potentially add to errors if this is unexpected.
+                        }
+                        for model in resolved_models {
+                            // Basic validation: the model name must not be empty.
+                            if model.name.is_empty() {
+                                progress.errors.push((
+                                    progress.current_file.clone(),
+                                    "".to_string(),
+                                    vec!["Model name is empty.".to_string()],
+                                ));
+                                continue;
+                            }
+                            // Check that data_source_name and schema resolved successfully.
+                            if model.data_source_name.is_none() {
+                                progress.errors.push((
+                                    progress.current_file.clone(),
+                                    model.name.clone(),
+                                    vec!["data_source_name could not be resolved.".to_string()],
+                                ));
+                            }
+                            if model.schema.is_none() {
+                                progress.errors.push((
+                                    progress.current_file.clone(),
+                                    model.name.clone(),
+                                    vec!["schema could not be resolved.".to_string()],
+                                ));
+                            }
+
+                            // If the checks above recorded errors for this model, don't mark it as a success.
+                            let is_error = progress.errors.iter().any(|(f, m, _)| f == &progress.current_file && m == &model.name);
+                            if !is_error {
+                                println!("  Successfully parsed and resolved model: {}", model.name.green());
+                                progress.successes.push((progress.current_file.clone(), model.name.clone()));
+                            }
+                        }
+                    }
+                    Err(e) => {
+                        println!("  Error resolving configurations for models in {}: {}", yml_path.display(), e.to_string().red());
+                        // Parsing may have succeeded while resolution failed for every model,
+                        // so the error is associated with the file rather than a model name.
+                        progress.errors.push((
+                            progress.current_file.clone(),
+                            "File-level resolution error".to_string(),
+                            vec![e.to_string()],
+                        ));
+                    }
+                }
+            }
+            Err(e) => {
+                println!("  Error parsing model file {}: {}", yml_path.display(), e.to_string().red());
+                progress.errors.push((
+                    progress.current_file.clone(),
+                    "".to_string(),
+                    vec![e.to_string()],
+                ));
+            }
+        }
+    }
+
+    progress.log_summary();
+
+    if !progress.errors.is_empty() {
+        return Err(anyhow!("Found errors during parsing and validation. Please check the output above."));
+    }
+
+    Ok(())
+}
\ No newline at end of file
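The per-model validation in parse.rs reduces to three checks: non-empty name, a resolved data_source_name, and a resolved schema. A condensed sketch of just that logic (the helper itself is illustrative; the field set is taken from the patch):

```rust
// Condensed form of the validation applied to each resolved model above.
fn validate_resolved_model(
    name: &str,
    data_source_name: Option<&str>,
    schema: Option<&str>,
) -> Vec<String> {
    let mut errors = Vec::new();
    if name.is_empty() {
        errors.push("Model name is empty.".to_string());
    }
    if data_source_name.is_none() {
        errors.push("data_source_name could not be resolved.".to_string());
    }
    if schema.is_none() {
        errors.push("schema could not be resolved.".to_string());
    }
    errors
}
```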
diff --git a/cli/cli/src/main.rs b/cli/cli/src/main.rs
index 3446e0c1b..9a0bdaa1f 100644
--- a/cli/cli/src/main.rs
+++ b/cli/cli/src/main.rs
@@ -73,6 +73,13 @@ pub enum Commands {
         // output-file as a more descriptive name for the arg
         target_semantic_file: Option<String>,
     },
+    /// Parse and validate semantic model YAML definitions
+    Parse {
+        /// Optional path to a specific model .yml file or a directory of models to process.
+        /// If not provided, processes models based on 'model_paths' in buster.yml or the CWD.
+        #[arg(long)]
+        path: Option<String>,
+    },
     Start,
     Stop,
 }
@@ -130,6 +137,7 @@ async fn main() {
             path,
             target_semantic_file,
         } => commands::generate::generate_semantic_models_command(path, target_semantic_file).await,
+        Commands::Parse { path } => commands::parse::parse_models_command(path).await,
         Commands::Start => run::start().await.map_err(anyhow::Error::from),
         Commands::Stop => run::stop().await.map_err(anyhow::Error::from),
     };
diff --git a/cli/libs/dbt_utils/src/lib.rs b/cli/libs/dbt_utils/src/lib.rs
index dc63cfcf5..9202418a9 100644
--- a/cli/libs/dbt_utils/src/lib.rs
+++ b/cli/libs/dbt_utils/src/lib.rs
@@ -7,7 +7,7 @@ use std::process::Command as StdCommand;
 use std::time::Duration;

 pub mod models;
-use models::DbtCatalog;
+use models::{CatalogNode, DbtCatalog};

 /// Runs the `dbt docs generate` command for the specified dbt project path.
 pub async fn run_dbt_docs_generate(dbt_project_path: &Path) -> Result<()> {
@@ -63,6 +63,7 @@ pub async fn run_dbt_docs_generate(dbt_project_path: &Path) -> Result<()> {
 }

 /// Loads and parses the dbt `catalog.json` file from the given path.
+/// Also performs post-processing to derive convenience fields on CatalogNode.
 pub fn load_and_parse_catalog(catalog_json_path: &Path) -> Result<DbtCatalog> {
     println!(
         "{}",
@@ -82,30 +83,50 @@ pub fn load_and_parse_catalog(catalog_json_path: &Path) -> Result<DbtCatalog> {
             // Log the detailed serde error
             eprintln!("Detailed parsing error for {}: {:#?}", catalog_json_path.display(), e);
             anyhow!(
-                "Failed to parse catalog.json from {}. Error: {}. Ensure the file content is valid and matches the expected dbt catalog structure.",
+                "Failed to parse catalog.json from {}. Error: {}. Ensure it matches the expected dbt catalog structure (v1 tested).",
                 catalog_json_path.display(),
                 e // e.to_string() will give a concise error message from serde
             )
         })?;

-    // Post-process nodes to derive resource_type if missing
-    for node in catalog.nodes.values_mut() {
-        if node.resource_type.is_none() {
-            let parts: Vec<&str> = node.unique_id.splitn(2, '.').collect();
+    // Post-process nodes to derive resource_type and a display name
+    for (_node_key, node) in catalog.nodes.iter_mut() { // node_key is the unique_id string used as the JSON map key
+        if let Some(ref unique_id_val) = node.unique_id { // unique_id is Option<String>
+            let parts: Vec<&str> = unique_id_val.splitn(2, '.').collect();
             if !parts.is_empty() {
                 let potential_type = parts[0];
                 if ["model", "source", "seed", "snapshot", "test"].contains(&potential_type) {
-                    node.resource_type = Some(potential_type.to_string());
+                    node.derived_resource_type = Some(potential_type.to_string());
                 }
             }
-        }
-
-        if node.name.is_none() {
-            // Try to derive node.name from the last part of unique_id
-            // e.g., model.my_package.my_model_name -> my_model_name
-            if let Some(last_part) = node.unique_id.split('.').last() {
+            // Derive a node name: prefer metadata.name, else the last part of unique_id.
+            if let Some(metadata) = &node.metadata {
+                // metadata.name is a String, so it exists whenever the metadata block exists.
+                node.derived_model_name_from_file = Some(metadata.name.clone());
+            } else if let Some(last_part) = unique_id_val.split('.').last() {
                 if !last_part.is_empty() {
-                    node.name = Some(last_part.to_string());
+                    // Fall back to the last part of unique_id when metadata.name is unavailable.
+                    node.derived_model_name_from_file = Some(last_part.to_string());
+                }
+            }
+        } else {
+            // If unique_id itself is None, there is little to derive. The map key (_node_key)
+            // *is* the unique_id in the JSON structure, but a node whose own unique_id is None
+            // would be strange for a valid catalog.
+        }
+    }
+    // Similar post-processing for sources
+    for (_source_key, source_node) in catalog.sources.iter_mut() {
+        if let Some(ref unique_id_val) = source_node.unique_id {
+            let parts: Vec<&str> = unique_id_val.splitn(2, '.').collect();
+            if !parts.is_empty() && parts[0] == "source" {
+                source_node.derived_resource_type = Some("source".to_string());
+            }
+            if let Some(metadata) = &source_node.metadata {
+                source_node.derived_model_name_from_file = Some(metadata.name.clone());
+            } else if let Some(last_part) = unique_id_val.split('.').last() {
+                if !last_part.is_empty() {
+                    source_node.derived_model_name_from_file = Some(last_part.to_string());
+                }
+            }
+        }
+    }
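The post-processing above derives both convenience fields from the dbt unique_id when the metadata block is absent. Isolated for clarity, assuming the usual `<type>.<package>.<name>` shape (the real code prefers metadata.name for the display name when available):

```rust
// Isolated version of the unique_id fallback derivation above.
// "model.my_package.my_model" -> (Some("model"), Some("my_model"))
fn derive_from_unique_id(unique_id: &str) -> (Option<String>, Option<String>) {
    const KNOWN_TYPES: [&str; 5] = ["model", "source", "seed", "snapshot", "test"];
    let resource_type = unique_id
        .splitn(2, '.')
        .next()
        .filter(|prefix| KNOWN_TYPES.contains(prefix))
        .map(str::to_string);
    let display_name = unique_id
        .split('.')
        .last()
        .filter(|part| !part.is_empty())
        .map(str::to_string);
    (resource_type, display_name)
}

#[test]
fn derives_type_and_name() {
    assert_eq!(
        derive_from_unique_id("model.jaffle_shop.orders"),
        (Some("model".into()), Some("orders".into()))
    );
}
```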
diff --git a/cli/libs/dbt_utils/src/models.rs b/cli/libs/dbt_utils/src/models.rs
index bce7d6db7..9e5010ea6 100644
--- a/cli/libs/dbt_utils/src/models.rs
+++ b/cli/libs/dbt_utils/src/models.rs
@@ -1,16 +1,21 @@
 use serde::Deserialize;
 use std::collections::HashMap;

-// Struct definitions for parsing dbt's catalog.json.
+// Struct definitions for parsing dbt's catalog.json (v1 schema)

-#[derive(Debug, Deserialize, Clone)]
+#[derive(Debug, Deserialize, Clone, Default)]
 pub struct DbtCatalog {
+    // metadata is required by the schema, but use Option + default for robustness if the block is missing
     #[serde(default)]
-    pub metadata: Option<DbtCatalogMetadata>,
+    pub metadata: Option<CatalogMetadata>,
+    #[serde(default)] // nodes map is required; default yields an empty map if the key is missing
+    pub nodes: HashMap<String, CatalogNode>,
+    #[serde(default)] // sources map is required
+    pub sources: HashMap<String, CatalogSource>,
     #[serde(default)]
-    pub nodes: HashMap<String, DbtNode>,
-    #[serde(default)]
-    pub sources: Option<HashMap<String, DbtSource>>,
+    pub errors: Option<Vec<String>>, // errors: string[] | null
+
+    // --- Fields kept for potential compatibility or future use; not strictly v1 catalog properties like nodes/sources/metadata ---
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub macros: Option<HashMap<String, serde_json::Value>>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
@@ -20,109 +25,87 @@ pub struct DbtCatalog {
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub selectors: Option<HashMap<String, serde_json::Value>>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub disabled: Option<HashMap<String, Vec<serde_json::Value>>>, // dbt-core uses Vec<serde_json::Value> here
+    pub disabled: Option<HashMap<String, Vec<serde_json::Value>>>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub parent_map: Option<HashMap<String, Vec<String>>>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub child_map: Option<HashMap<String, Vec<String>>>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub errors: Option<serde_json::Value>, // Can be null or an object with error details
 }

-#[derive(Debug, Deserialize, Clone)]
-pub struct DbtCatalogMetadata {
-    #[serde(rename = "dbt_schema_version", default)]
-    pub dbt_schema_version: Option<String>,
-    #[allow(dead_code)] // If not used directly by Buster, but good for complete parsing
-    pub dbt_version: Option<String>,
-    #[allow(dead_code)]
-    pub generated_at: Option<String>,
-    #[allow(dead_code)]
-    pub invocation_id: Option<String>,
+#[derive(Debug, Deserialize, Clone, Default)]
+pub struct CatalogMetadata { // Was DbtCatalogMetadata; matches schema.metadata
+    #[serde(default)] // Though the schema implies required, default if the block is imperfect
+    pub dbt_schema_version: String,
+    #[serde(default)]
+    pub dbt_version: Option<String>, // Defaulted in schema ("1.10.0a1") but can be another string
+    #[serde(default)]
+    pub generated_at: Option<String>, // A string in the schema, but Option for safety
+    #[serde(default)]
+    pub invocation_id: Option<String>, // string | null
+    #[serde(default)]
+    pub invocation_started_at: Option<String>, // string | null (from schema)
+    #[serde(default)]
+    pub env: HashMap<String, String>, // From schema
 }

-#[derive(Debug, Deserialize, Clone)]
-pub struct DbtNode {
-    // Ensure metadata is present, matches example which has it implicitly via direct fields
-    // For the example catalog's node structure, we might need to flatten some metadata fields
-    // or expect them directly if `metadata` as a block is not always there.
-    // However, standard dbt catalog.json *does* have a metadata block within each node.
-    // The example provided might be a slight simplification or custom representation.
-    // Assuming standard catalog structure for now, where DbtNodeMetadata is a separate struct.
+// Represents a "CatalogTable" from the dbt schema (for nodes and sources)
+#[derive(Debug, Deserialize, Clone, Default)]
+pub struct CatalogNode { // Was DbtNode; represents schema.nodes.<unique_id>
+    // metadata, columns, and stats are required per schema for a CatalogTable.
+    // Using Option + default for robustness if a catalog is malformed,
+    // but downstream code will need to handle None for these.
     #[serde(default)]
-    pub metadata: Option<DbtNodeMetadata>,
+    pub metadata: Option<TableMetadata>,
     #[serde(default)]
-    pub columns: HashMap<String, DbtColumn>,
-    #[serde(rename = "resource_type")] // if resource_type is not directly in JSON, this helps map if some other key exists
-    // if type is the key in JSON for resource_type, then it should be:
-    // #[serde(alias = "type")] // or handle it in DbtNodeMetadata if type is part of metadata
-    #[serde(default)] // Make it optional and handle missing field
-    pub resource_type: Option<String>, // This refers to model, seed, snapshot, test etc.
-    pub unique_id: String,
-    #[serde(default)] // original_file_path might not be present for all node types
-    pub original_file_path: Option<String>,
-    pub database: Option<String>,
-    pub schema: Option<String>,
-    #[serde(default)] // Make name optional
-    pub name: Option<String>, // This is often the filename or alias. metadata.name is relation name.
-    pub comment: Option<String>, // Comment can be directly on the node for some versions/types
-    pub stats: Option<serde_json::Value>, // To capture general stats blocks
+    pub columns: HashMap<String, ColumnMetadata>,
+    #[serde(default)]
+    pub stats: HashMap<String, StatsItem>,
+    #[serde(default)]
+    pub unique_id: Option<String>, // string | null
+
+    // --- Fields populated by post-processing in lib.rs ---
+    // These are not part of the catalog.json node structure itself.
+    #[serde(skip_deserializing, skip_serializing_if = "Option::is_none")]
+    pub derived_resource_type: Option<String>,
+    #[serde(skip_deserializing, skip_serializing_if = "Option::is_none")]
+    pub derived_model_name_from_file: Option<String>, // Display name derived from metadata.name or unique_id
 }

-#[derive(Debug, Deserialize, Clone)]
-pub struct DbtNodeMetadata {
-    // Standard dbt catalog.json has `name` here as the relation name.
-    #[serde(default)] // Make name optional
-    pub name: Option<String>,
-    #[serde(rename = "type")] // This 'type' inside metadata usually refers to the materialization (table, view, etc.) for models
-    pub relation_type: Option<String>,
-    pub schema: Option<String>, // schema can also be here
-    pub database: Option<String>, // database can also be here
-    pub comment: Option<String>, // comment for the model/node itself
-    #[allow(dead_code)]
-    pub owner: Option<String>,
-    // Add other potential metadata fields if necessary, e.g., tags, config, etc.
-    #[serde(default)]
-    pub tags: Vec<String>,
-}
-
-#[derive(Debug, Deserialize, Clone)]
-pub struct DbtSource {
-    #[serde(default)]
-    pub name: Option<String>, // This is the source's table name
-    pub unique_id: String,
-    #[serde(default)]
-    pub database: Option<String>,
-    #[serde(default)]
-    pub schema: Option<String>,
-    #[serde(default, alias = "resource_type")] // Sources have "source" as resource_type, or a specific table type.
-    pub table_type: Option<String>, // e.g. "table", often not explicitly a 'type' field in catalog for sources, but implied.
-    #[serde(default)]
-    pub columns: HashMap<String, DbtColumn>,
-    #[serde(default)]
-    pub comment: Option<String>,
-    pub stats: Option<serde_json::Value>,
-    // Sources can also have a 'meta' field, 'tags', 'description', 'loader', 'freshness' etc.
-    #[serde(default)]
-    pub description: Option<String>, // description is preferred over comment for sources usually
-    #[serde(default)]
-    pub meta: Option<HashMap<String, serde_json::Value>>,
-    #[serde(default)]
-    pub tags: Vec<String>,
-}
+// Using CatalogNode for sources as well, since their structure is also a CatalogTable
+pub type CatalogSource = CatalogNode;

-#[derive(Debug, Deserialize, Clone)]
-pub struct DbtColumn {
+#[derive(Debug, Deserialize, Clone, Default)]
+pub struct TableMetadata { // Was DbtNodeMetadata; matches schema.nodes.<unique_id>.metadata
     #[serde(rename = "type")]
-    pub column_type: String,
-    pub index: Option<u32>, // Index might not always be present
-    pub name: String,
-    pub comment: Option<String>,
+    pub type_: String, // Required: database object type (e.g. "TABLE", "VIEW")
+    pub schema: String, // Required
+    pub name: String, // Required: relation name in the database
     #[serde(default)]
-    pub description: Option<String>, // Columns can also have descriptions
+    pub database: Option<String>, // string | null
     #[serde(default)]
-    pub meta: Option<HashMap<String, serde_json::Value>>,
+    pub comment: Option<String>, // string | null
     #[serde(default)]
-    pub tags: Vec<String>,
+    pub owner: Option<String>, // string | null
+}
+
+#[derive(Debug, Deserialize, Clone, Default)]
+pub struct ColumnMetadata { // Was DbtColumn; matches schema.nodes.<unique_id>.columns.<column_name>
+    #[serde(rename = "type")]
+    pub type_: String, // Required: database column type
+    pub index: u32, // Required
+    pub name: String, // Required
+    #[serde(default)]
+    pub comment: Option<String>, // string | null
+}
+
+#[derive(Debug, Deserialize, Clone, Default)]
+pub struct StatsItem { // matches schema.nodes.<unique_id>.stats.<stat_id>
+    pub id: String, // Required
+    pub label: String, // Required
+    #[serde(default)]
+    pub value: serde_json::Value, // anyOf: boolean, string, number, null. Required.
+    pub include: bool, // Required
+    #[serde(default)]
+    pub description: Option<String>, // string | null
 }
\ No newline at end of file
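As a sanity check on the new v1-oriented structs, a minimal catalog fragment like the following should deserialize (the JSON values are invented; only fields required by the structs above are populated, everything else falls back to `#[serde(default)]`):

```rust
use dbt_utils::models::DbtCatalog; // assumes the cli/libs/dbt_utils crate layout

fn main() -> Result<(), serde_json::Error> {
    let json = r#"{
        "metadata": { "dbt_schema_version": "https://schemas.getdbt.com/dbt/catalog/v1.json" },
        "nodes": {
            "model.jaffle_shop.orders": {
                "metadata": { "type": "BASE TABLE", "schema": "analytics", "name": "orders" },
                "columns": {
                    "order_id": { "type": "integer", "index": 1, "name": "order_id" }
                },
                "stats": {},
                "unique_id": "model.jaffle_shop.orders"
            }
        },
        "sources": {}
    }"#;

    let catalog: DbtCatalog = serde_json::from_str(json)?;
    let node = &catalog.nodes["model.jaffle_shop.orders"];
    assert_eq!(node.metadata.as_ref().unwrap().name, "orders");
    assert_eq!(node.columns["order_id"].index, 1);
    Ok(())
}
```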