ok things are pretty good, but a few more tweaks

dal 2025-05-06 13:23:18 -06:00
parent b8128dc75c
commit f0530f1ef9
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
10 changed files with 956 additions and 747 deletions

View File

@ -0,0 +1,9 @@
[package]
name = "rerank"
version = "0.1.0"
edition = "2021"

[dependencies]
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }

View File

@ -0,0 +1,87 @@
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::error::Error;

#[derive(Debug)]
pub enum RerankerType {
    Cohere,
    Mxbai,
    Jina,
}

pub struct Reranker {
    reranker_type: RerankerType,
    api_key: String,
    base_url: String,
    model: String,
    client: Client,
}

impl Reranker {
    pub fn new() -> Result<Self, Box<dyn Error>> {
        let provider = std::env::var("RERANK_PROVIDER")?;
        let reranker_type = match provider.to_lowercase().as_str() {
            "cohere" => RerankerType::Cohere,
            "mxbai" => RerankerType::Mxbai,
            "jina" => RerankerType::Jina,
            _ => return Err("Invalid provider specified".into()),
        };
        let api_key = std::env::var("RERANK_API_KEY")?;
        let model = std::env::var("RERANK_MODEL")?;
        let base_url = match reranker_type {
            RerankerType::Cohere => "https://api.cohere.com/v2/rerank",
            RerankerType::Mxbai => "https://api.mixedbread.ai/v1/rerank",
            RerankerType::Jina => "https://api.jina.ai/v1/rerank",
        }.to_string();
        let client = Client::new();
        Ok(Self {
            reranker_type,
            api_key,
            base_url,
            model,
            client,
        })
    }

    pub async fn rerank(
        &self,
        query: &str,
        documents: &[&str],
        top_n: usize,
    ) -> Result<Vec<RerankResult>, Box<dyn Error>> {
        let request_body = RerankRequest {
            query: query.to_string(),
            documents: documents.iter().map(|s| s.to_string()).collect(),
            top_n,
            model: self.model.clone(),
        };
        let response = self
            .client
            .post(&self.base_url)
            .header("Authorization", format!("Bearer {}", self.api_key))
            .json(&request_body)
            .send()
            .await?;
        let response_body: RerankResponse = response.json().await?;
        Ok(response_body.results)
    }
}

#[derive(Serialize)]
struct RerankRequest {
    query: String,
    documents: Vec<String>,
    top_n: usize,
    model: String,
}

#[derive(Deserialize)]
struct RerankResponse {
    results: Vec<RerankResult>,
}

#[derive(Deserialize)]
pub struct RerankResult {
    pub index: usize,
    pub relevance_score: f32,
}
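
The client above is configured entirely through environment variables (RERANK_PROVIDER, RERANK_API_KEY, RERANK_MODEL). A minimal usage sketch follows; it assumes a Tokio async runtime (not listed in the Cargo.toml above), that this file is the crate's lib.rs so the type is importable as rerank::Reranker, and hypothetical document/model values:

use rerank::Reranker;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumes RERANK_PROVIDER, RERANK_API_KEY, and RERANK_MODEL are already set,
    // e.g. RERANK_PROVIDER=cohere and RERANK_MODEL=rerank-v3.5 (hypothetical values).
    let reranker = Reranker::new()?;

    let documents = [
        "Carson City is the capital city of the American state of Nevada.",
        "Washington, D.C. is the capital of the United States.",
        "Capital punishment has existed in the United States since before it was a country.",
    ];

    // Ask the provider for the two most relevant documents for the query.
    let results = reranker
        .rerank("What is the capital of the United States?", &documents, 2)
        .await?;

    for result in results {
        println!("{:.3}  {}", result.relevance_score, documents[result.index]);
    }
    Ok(())
}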

View File

@ -128,7 +128,7 @@ impl ProgressTracker for DeployProgress {
}
/// Parse a YAML model file into semantic_layer::Model structs
fn parse_model_file(file_path: &Path) -> Result<Vec<Model>> {
pub fn parse_model_file(file_path: &Path) -> Result<Vec<Model>> {
let yml_content = std::fs::read_to_string(file_path)?;
// First try parsing as a SemanticLayerSpec (with top-level 'models' key)
@ -150,7 +150,7 @@ fn parse_model_file(file_path: &Path) -> Result<Vec<Model>> {
/// 1. Values in the model itself
/// 2. Values from the project context
/// 3. Values from the global config
fn resolve_model_configurations(
pub fn resolve_model_configurations(
models_with_context: Vec<(Model, Option<&ProjectContext>)>,
global_buster_config: &BusterConfig,
) -> Result<Vec<Model>> {

View File

@ -3,378 +3,282 @@ use colored::*;
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::Duration;
// use std::time::Duration; // Duration seems unused here now
use crate::utils::config::BusterConfig;
use crate::commands::init::{YamlModel, YamlSemanticLayerSpec, YamlDimension, YamlMeasure, is_measure_type};
use crate::commands::init::{YamlModel, YamlDimension, YamlMeasure}; // is_measure_type is also in init
use dbt_utils::models::{DbtCatalog, DbtNode, DbtColumn, DbtNodeMetadata, DbtCatalogMetadata};
// Use new struct names from dbt_utils
use dbt_utils::models::{DbtCatalog, CatalogNode, ColumnMetadata, TableMetadata}; // CatalogMetadata might not be directly used here
use dbt_utils::{run_dbt_docs_generate, load_and_parse_catalog};
use indicatif::{ProgressBar, ProgressStyle};
use indicatif::{ProgressBar, ProgressStyle}; // Keep for progress spinners if any remain or are added
use inquire::Confirm;
use glob::{Pattern};
use glob::{glob, Pattern};
pub async fn generate_semantic_models_command(
path_arg: Option<String>,
target_output_dir_arg: Option<String>,
target_output_dir_arg: Option<String>, // This argument determines the *base* output dir if specified
) -> Result<()> {
println!(
"{}",
"Starting semantic model generation/update...".bold().blue()
);
println!("{}", "Starting semantic model generation/update (file-first approach)...".bold().blue());
// 1. Determine Buster configuration directory (where buster.yml is or should be)
// For now, assume current directory. This might need to be more sophisticated if target_output_dir_arg implies a different project.
let buster_config_dir = std::env::current_dir().context("Failed to get current directory")?;
let buster_config = BusterConfig::load_from_dir(&buster_config_dir)?.ok_or_else(|| {
anyhow!("buster.yml not found in {}. Please run 'buster init' first.", buster_config_dir.display())
})?;
// 2. Load BusterConfig
let buster_config = match BusterConfig::load_from_dir(&buster_config_dir) {
Ok(Some(cfg)) => cfg,
Ok(None) => {
return Err(anyhow!(
"buster.yml not found in {}. Please run 'buster init' first or ensure buster.yml exists.",
buster_config_dir.display()
));
}
Err(e) => {
return Err(anyhow!("Failed to load buster.yml: {}. Please ensure it is correctly formatted.", e));
}
};
// 3. Determine target semantic YAML base directory and generation mode
let mut is_side_by_side_generation = false;
let effective_semantic_models_base_dir: PathBuf; // Base for path construction
if let Some(path_str) = target_output_dir_arg {
// User specified an output directory via CLI arg. Not side-by-side.
effective_semantic_models_base_dir = if Path::new(&path_str).is_absolute() {
PathBuf::from(path_str)
} else {
buster_config_dir.join(path_str)
};
println!("Target semantic models base directory (from CLI arg): {}", effective_semantic_models_base_dir.display().to_string().cyan());
fs::create_dir_all(&effective_semantic_models_base_dir).with_context(|| format!("Failed to create semantic models base directory: {}", effective_semantic_models_base_dir.display()))?;
} else {
// No CLI arg, check buster.yml config
let configured_semantic_paths = buster_config.projects.as_ref()
.and_then(|projs| projs.first())
.and_then(|proj| proj.semantic_model_paths.as_ref());
if configured_semantic_paths.map_or(true, |paths| paths.is_empty()) { // Default to side-by-side if None or empty list
is_side_by_side_generation = true;
effective_semantic_models_base_dir = buster_config_dir.clone(); // Project root is the base for side-by-side
println!("Semantic models will be generated side-by-side with SQL models (base: {}).", effective_semantic_models_base_dir.display().to_string().cyan());
// No specific single base directory to create for all YAMLs in this mode.
} else {
// Configured path(s) exist, use the first one. Not side-by-side.
let first_path_str = configured_semantic_paths.unwrap().first().unwrap(); // Safe due to map_or and is_empty checks
effective_semantic_models_base_dir = if Path::new(first_path_str).is_absolute() {
PathBuf::from(first_path_str)
} else {
buster_config_dir.join(first_path_str)
};
println!("Target semantic models base directory (from buster.yml): {}", effective_semantic_models_base_dir.display().to_string().cyan());
fs::create_dir_all(&effective_semantic_models_base_dir).with_context(|| format!("Failed to create semantic models base directory: {}", effective_semantic_models_base_dir.display()))?;
}
}
// 4. Load existing semantic models - THIS LOGIC WILL CHANGE SIGNIFICANTLY.
// For now, we clear it as we load 1-to-1.
let mut existing_yaml_models_map: HashMap<String, YamlModel> = HashMap::new();
let initial_model_count = 0; // This will be re-evaluated based on files found
// 5. Run dbt docs generate (similar to init.rs)
let dbt_project_path = &buster_config_dir; // Assuming buster.yml is at the root of dbt project
let catalog_json_path = dbt_project_path.join("target").join("catalog.json");
// --- 1. Load Catalog & Build Lookup Map ---
let catalog_json_path = buster_config_dir.join("target").join("catalog.json");
if Confirm::new("Run 'dbt docs generate' to refresh dbt catalog (catalog.json)?")
.with_default(true)
.prompt()?
{
// Use the dbt_utils helper
match run_dbt_docs_generate(dbt_project_path).await {
Ok(_) => { /* Success is logged by the helper */ }
Err(e) => {
eprintln!("{}", format!("Failed to run 'dbt docs generate' via dbt_utils: {}. Proceeding with existing catalog.json if available.", e).yellow());
}
match run_dbt_docs_generate(&buster_config_dir).await {
Ok(_) => {},
Err(e) => eprintln!("{}", format!("'dbt docs generate' error. Proceeding with existing catalog. Error: {}", e).yellow()),
}
} else {
println!("{}", "Skipping 'dbt docs generate'. Will look for existing catalog.json.".dimmed());
println!("{}", "Skipping 'dbt docs generate'. Using existing catalog.json.".dimmed());
}
// Load and parse catalog.json using dbt_utils helper
if !catalog_json_path.exists() {
eprintln!("{}", format!("✗ catalog.json not found at {}. Cannot generate/update models.", catalog_json_path.display()).red());
return Ok(());
}
let dbt_catalog = match load_and_parse_catalog(&catalog_json_path) {
Ok(catalog) => {
println!("{}", "✓ Successfully parsed catalog.json via dbt_utils.".green());
println!("{}", "✓ Successfully parsed catalog.json.".green());
catalog
}
Err(e) => {
eprintln!("{}", format!("✗ Error loading/parsing catalog.json via dbt_utils: {}. Ensure catalog.json exists and is valid.", e).red());
return Err(e.into()); // Propagate error if catalog loading fails
eprintln!("{}", format!("✗ Error loading/parsing catalog.json: {}. Cannot generate/update.", e).red());
return Ok(());
}
};
// 7. Determine scope & reconcile
// This is a placeholder for the detailed reconciliation logic
// described in the plan (new models, updated models/columns, deleted models/columns)
println!("{}", "Reconciling semantic models with dbt catalog...".yellow());
let catalog_nodes_by_name: HashMap<String, &CatalogNode> = dbt_catalog.nodes.values()
.filter_map(|node| {
if node.derived_resource_type.as_deref() == Some("model") {
node.derived_model_name_from_file.as_ref().map(|name| (name.clone(), node))
} else { None }
})
.collect();
// Collect model_paths from buster_config for scoping (similar to init.rs)
let mut configured_model_path_patterns: Vec<Pattern> = Vec::new();
if let Some(projects) = &buster_config.projects {
for pc in projects {
if let Some(model_paths) = &pc.model_paths {
for path_str in model_paths {
let pattern_str = buster_config_dir.join(path_str).join("**").join("*.sql").to_string_lossy().into_owned();
match Pattern::new(&pattern_str) {
Ok(p) => configured_model_path_patterns.push(p),
Err(e) => eprintln!("{}", format!("Warning: Invalid glob pattern '{}': {}", pattern_str, e).yellow()),
if catalog_nodes_by_name.is_empty() {
println!("{}", "No models found in dbt catalog. Nothing to generate/update.".yellow());
return Ok(());
}
// --- 2. Determine SQL Files to Process (based on path_arg or buster.yml model_paths) ---
let mut sql_files_to_process: HashSet<PathBuf> = HashSet::new();
let dbt_project_model_roots_for_stripping = crate::commands::init::parse_dbt_project_file_content(&buster_config_dir)?.as_ref()
.map(|c| c.model_paths.iter().map(PathBuf::from).collect::<Vec<PathBuf>>())
.unwrap_or_else(|| vec![PathBuf::from("models")]);
if let Some(pa_str) = &path_arg {
let target_path = buster_config_dir.join(pa_str);
if target_path.is_file() && target_path.extension().map_or(false, |ext| ext == "sql") {
sql_files_to_process.insert(target_path);
} else if target_path.is_dir() {
let glob_pattern = target_path.join("**/*.sql");
match glob(&glob_pattern.to_string_lossy()) {
Ok(paths) => paths.for_each(|entry| if let Ok(path) = entry { if path.is_file() { sql_files_to_process.insert(path); } }),
Err(e) => eprintln!("{}", format!("Error globbing '{}': {}", glob_pattern.display(), e).yellow()),
}
} else {
eprintln!("{}", format!("Warning: path_arg '{}' is not a valid SQL file or directory. Processing all configured models.", pa_str).yellow());
// Fall through to buster.yml model_paths if path_arg is invalid
}
}
if sql_files_to_process.is_empty() { // If path_arg didn't yield files, or wasn't provided, use buster.yml config
let mut patterns_from_config: Vec<Pattern> = Vec::new();
if let Some(projects) = &buster_config.projects {
if let Some(first_project) = projects.first() {
if let Some(mp_globs) = &first_project.model_paths {
for path_str_glob in mp_globs {
match Pattern::new(&buster_config_dir.join(path_str_glob).to_string_lossy()) {
Ok(p) => patterns_from_config.push(p),
Err(e) => eprintln!("{}", format!("Warning: Invalid glob pattern from buster.yml '{}': {}", path_str_glob, e).yellow()),
}
}
}
}
}
}
// Also consider top-level model_paths if no projects or for global scope
if let Some(model_paths) = &buster_config.model_paths {
for path_str in model_paths {
let pattern_str = buster_config_dir.join(path_str).join("**").join("*.sql").to_string_lossy().into_owned();
match Pattern::new(&pattern_str) {
Ok(p) => configured_model_path_patterns.push(p),
Err(e) => eprintln!("{}", format!("Warning: Invalid glob pattern '{}': {}", pattern_str, e).yellow()),
if patterns_from_config.is_empty() { // Still no patterns, use dbt_project.yml defaults
println!("{}", "No specific model paths from path_arg or buster.yml. Using dbt_project.yml model-paths.".dimmed());
for dbt_root_rel in &dbt_project_model_roots_for_stripping {
let glob_pattern = buster_config_dir.join(dbt_root_rel).join("**/*.sql");
match glob(&glob_pattern.to_string_lossy()) {
Ok(paths) => paths.for_each(|entry| if let Ok(p) = entry { if p.is_file() {sql_files_to_process.insert(p);}}),
Err(e) => eprintln!("{}", format!("Error globbing default dbt path '{}': {}",glob_pattern.display(),e).yellow()),
}
}
} else {
println!("{}", format!("Scanning for SQL files based on buster.yml model_paths: {:?}", patterns_from_config.iter().map(|p|p.as_str()).collect::<Vec<_>>()).dimmed());
for pattern in patterns_from_config {
match glob(pattern.as_str()) {
Ok(paths) => paths.for_each(|entry| if let Ok(p) = entry { if p.is_file() && p.extension().map_or(false, |e|e=="sql"){sql_files_to_process.insert(p);}}),
Err(e) => eprintln!("{}",format!("Glob pattern error for '{}': {}",pattern.as_str(),e).yellow()),
}
}
}
}
let mut dbt_models_processed_count = 0;
let mut new_models_added_count = 0;
if sql_files_to_process.is_empty() {
println!("{}", "No SQL model files found to process/update based on configuration.".yellow());
return Ok(());
}
println!("{}", format!("Found {} SQL model file(s) for potential generation/update.", sql_files_to_process.len()).dimmed());
// --- 3. Determine Output Base Directory for Semantic Models ---
let semantic_models_base_dir_path_str = target_output_dir_arg.or_else(||
buster_config.projects.as_ref()
.and_then(|p| p.first())
.and_then(|proj| proj.semantic_model_paths.as_ref())
.and_then(|paths| paths.first().cloned())
).unwrap_or_else(||
// Default to side-by-side (empty string means relative to SQL file later)
String::new()
);
let (is_side_by_side_generation, semantic_output_base_abs_dir) = if semantic_models_base_dir_path_str.is_empty() {
println!("{}", "Semantic models output set to side-by-side with SQL files.".dimmed());
(true, buster_config_dir.clone()) // Base for side-by-side is project root
} else {
let abs_path = if Path::new(&semantic_models_base_dir_path_str).is_absolute() {
PathBuf::from(&semantic_models_base_dir_path_str)
} else {
buster_config_dir.join(&semantic_models_base_dir_path_str)
};
println!("{}", format!("Semantic models output base directory: {}", abs_path.display()).cyan());
fs::create_dir_all(&abs_path).context(format!("Failed to create semantic models output dir: {}", abs_path.display()))?;
(false, abs_path)
};
// --- 4. Iterate SQL Files, Match to Catalog, Generate/Update YamlModels ---
let mut models_generated_count = 0;
let mut models_updated_count = 0;
let mut columns_added_count = 0;
let mut columns_updated_count = 0;
let mut columns_removed_count = 0;
let mut sql_models_successfully_processed_from_catalog_count = 0; // New counter
let mut processed_dbt_model_unique_ids: HashSet<String> = HashSet::new(); // Using unique_id for tracking
// Get dbt model source roots for path stripping (similar to init.rs)
let dbt_project_file_content_for_paths = crate::commands::init::parse_dbt_project_file_content(&buster_config_dir)?;
let dbt_model_source_roots: Vec<PathBuf> = dbt_project_file_content_for_paths.as_ref()
.map(|content| content.model_paths.iter().map(PathBuf::from).collect())
.unwrap_or_else(|| vec![PathBuf::from("models")]);
for (dbt_node_id, dbt_node) in dbt_catalog.nodes.iter().filter(|(_,n)| {
match &n.resource_type {
Some(rt) => rt == "model",
None => {
eprintln!(
"{}",
format!(
"Warning: Skipping dbt node with unique_id: {} because it is missing 'resource_type' in catalog.json.",
n.unique_id
).yellow()
);
false
}
for sql_file_abs_path in sql_files_to_process {
let model_name_from_filename = sql_file_abs_path.file_stem().map_or_else(String::new, |s| s.to_string_lossy().into_owned());
if model_name_from_filename.is_empty() {
eprintln!("{}", format!("Warning: Could not get model name from file {}. Skipping.", sql_file_abs_path.display()).yellow());
continue;
}
}) {
// Path construction for individual YAML
let Some(ref dbt_original_file_path_str) = dbt_node.original_file_path else {
eprintln!("{}", format!("Warning: Skipping dbt model {} due to missing 'original_file_path'.", dbt_node.unique_id).yellow());
let Some(catalog_node) = catalog_nodes_by_name.get(&model_name_from_filename) else {
eprintln!("{}", format!("Info: SQL model file '{}' found, but no corresponding entry ('{}') in dbt catalog. Skipping.", sql_file_abs_path.display(), model_name_from_filename).dimmed());
continue;
};
let dbt_model_path_obj = Path::new(dbt_original_file_path_str);
let mut relative_to_dbt_model_root = PathBuf::new();
let mut found_base_for_stripping = false;
for dbt_source_root in &dbt_model_source_roots { // dbt_source_root is e.g. "models"
if let Ok(stripped_path) = dbt_model_path_obj.strip_prefix(dbt_source_root) {
relative_to_dbt_model_root = stripped_path.to_path_buf(); // e.g. "marts/sales/revenue.sql"
found_base_for_stripping = true;
break;
}
}
if !found_base_for_stripping {
// Fallback: if original_file_path_str didn't start with any known dbt_model_source_roots,
// then use original_file_path_str as is for the suffix part for dedicated dir mode.
// For side-by-side, the full original path is used anyway.
relative_to_dbt_model_root = dbt_model_path_obj.to_path_buf();
eprintln!("{}", format!(
"Warning: Could not strip a known dbt model source root ('{:?}') from dbt model path '{}'. Using full path for suffix calculation: '{}'",
dbt_model_source_roots, dbt_original_file_path_str, relative_to_dbt_model_root.display()
).yellow()
);
}
let Some(ref table_meta) = catalog_node.metadata else {
eprintln!("{}", format!("Warning: Catalog entry for '{}' (file: {}) is missing metadata. Skipping.", model_name_from_filename, sql_file_abs_path.display()).yellow());
continue;
};
// actual_model_name_in_yaml is from catalog metadata.name
let actual_model_name_in_yaml = table_meta.name.clone();
println!("Processing: SQL '{}' -> Catalog Model '{}' (UniqueID: {})",
sql_file_abs_path.display().to_string().cyan(),
actual_model_name_in_yaml.purple(),
catalog_node.unique_id.as_deref().unwrap_or("N/A").dimmed()
);
sql_models_successfully_processed_from_catalog_count += 1; // Increment here
let individual_semantic_yaml_path: PathBuf;
if is_side_by_side_generation {
// Side-by-side: YAML is next to SQL. dbt_original_file_path_str is relative to buster_config_dir.
individual_semantic_yaml_path = buster_config_dir.join(dbt_original_file_path_str).with_extension("yml");
let relative_sql_path_str = pathdiff::diff_paths(&sql_file_abs_path, &buster_config_dir)
.map(|p| p.to_string_lossy().into_owned())
.unwrap_or_else(|| sql_file_abs_path.to_string_lossy().into_owned());
let individual_semantic_yaml_path: PathBuf = if is_side_by_side_generation {
sql_file_abs_path.with_extension("yml")
} else {
// Dedicated output directory (effective_semantic_models_base_dir)
// relative_to_dbt_model_root is the path part after the dbt model source root (e.g. "marts/sales/revenue.sql")
let yaml_filename_with_subdir = relative_to_dbt_model_root.with_extension("yml"); // e.g. "marts/sales/revenue.yml"
individual_semantic_yaml_path = effective_semantic_models_base_dir.join(yaml_filename_with_subdir);
}
processed_dbt_model_unique_ids.insert(dbt_node.unique_id.clone()); // Store unique_id
// --- Scoping logic (remains similar, but applied before file load) ---
let dbt_original_file_path_abs = buster_config_dir.join(dbt_original_file_path_str);
let is_in_configured_model_paths = configured_model_path_patterns.is_empty() ||
configured_model_path_patterns.iter().any(|p| p.matches_path(&dbt_original_file_path_abs));
let is_in_path_arg_scope = match &path_arg {
Some(pa_str) => {
let target_path_abs = buster_config_dir.join(pa_str);
if target_path_abs.is_file() {
dbt_original_file_path_abs == target_path_abs
} else { // Assume directory
dbt_original_file_path_abs.starts_with(&target_path_abs)
let mut stripped_suffix_for_yaml: Option<PathBuf> = None;
for dbt_root in &dbt_project_model_roots_for_stripping {
let abs_dbt_root = buster_config_dir.join(dbt_root);
if let Ok(stripped) = sql_file_abs_path.strip_prefix(&abs_dbt_root) {
stripped_suffix_for_yaml = Some(stripped.with_extension("yml"));
break;
}
}
None => true,
let final_suffix = stripped_suffix_for_yaml.unwrap_or_else(|| PathBuf::from(&model_name_from_filename).with_extension("yml"));
semantic_output_base_abs_dir.join(final_suffix)
};
if let Some(p) = individual_semantic_yaml_path.parent() { fs::create_dir_all(p)?; }
if !is_in_configured_model_paths || !is_in_path_arg_scope {
continue;
}
// --- Reconciliation Logic (Create or Update) ---
let existing_yaml_model_opt: Option<YamlModel> = if individual_semantic_yaml_path.exists() {
fs::read_to_string(&individual_semantic_yaml_path)
.ok()
.and_then(|content| serde_yaml::from_str::<YamlModel>(&content).ok())
} else { None };
// Ensure metadata and metadata.name exist
let Some(ref dbt_node_metadata) = dbt_node.metadata else {
eprintln!(
"{}",
format!(
"Warning: Skipping dbt node with unique_id: {} in generate because its 'metadata' block is missing.",
dbt_node.unique_id
).yellow()
);
continue;
};
let Some(ref dbt_model_name_from_metadata) = dbt_node_metadata.name else {
eprintln!(
"{}",
format!(
"Warning: Skipping dbt model with unique_id: {} in generate because its 'metadata.name' is missing.",
dbt_node.unique_id
).yellow()
);
continue;
};
let dbt_model_name_for_yaml = dbt_model_name_from_metadata.clone();
dbt_models_processed_count += 1;
// --- End Scoping Logic ---
let existing_semantic_model_opt: Option<YamlModel> = if individual_semantic_yaml_path.exists() {
match fs::read_to_string(&individual_semantic_yaml_path) {
Ok(content) => {
match serde_yaml::from_str::<YamlModel>(&content) {
Ok(model) => Some(model),
Err(e) => {
eprintln!("{}", format!("Warning: Failed to parse existing semantic YAML '{}': {}. Will attempt to overwrite.", individual_semantic_yaml_path.display(), e).yellow());
None
}
}
}
Err(e) => {
eprintln!("{}", format!("Warning: Failed to read existing semantic YAML '{}': {}. Will attempt to create anew.", individual_semantic_yaml_path.display(), e).yellow());
None
}
}
} else {
None
};
match existing_semantic_model_opt {
match existing_yaml_model_opt {
Some(mut existing_model) => {
// Existing model: Update it
let mut model_was_updated = false;
println!("Updating existing semantic model: {} at {}", dbt_model_name_for_yaml.cyan(), individual_semantic_yaml_path.display());
if existing_model.name != dbt_model_name_for_yaml {
// This might happen if filename and inner model name differ. We prioritize dbt_model_name_for_yaml.
// Or if user manually changed name in YML. For now, dbt catalog is source of truth for name.
println!(" Aligning name in YAML from '{}' to '{}'", existing_model.name, dbt_model_name_for_yaml);
existing_model.name = dbt_model_name_for_yaml.clone();
model_was_updated = true;
// Update name if it differs (dbt catalog is source of truth for relation name)
if existing_model.name != actual_model_name_in_yaml {
existing_model.name = actual_model_name_in_yaml.clone(); model_was_updated = true;
}
if let Some(dbt_comment) = &dbt_node_metadata.comment {
if existing_model.description.as_deref() != Some(dbt_comment.as_str()) {
existing_model.description = Some(dbt_comment.clone());
model_was_updated = true;
}
} // Consider if dbt_comment=None should clear existing_model.description
if existing_model.original_file_path.as_deref() != Some(dbt_original_file_path_str.as_str()) {
existing_model.original_file_path = Some(dbt_original_file_path_str.clone());
model_was_updated = true;
// Update description from catalog if catalog has one
if table_meta.comment.is_some() && existing_model.description != table_meta.comment {
existing_model.description = table_meta.comment.clone(); model_was_updated = true;
}
// Update DB/Schema if different - dbt catalog is source of truth
if existing_model.database != dbt_node.database {
existing_model.database = dbt_node.database.clone();
model_was_updated = true;
// Update original_file_path
if existing_model.original_file_path.as_deref() != Some(&relative_sql_path_str) {
existing_model.original_file_path = Some(relative_sql_path_str.clone()); model_was_updated = true;
}
if existing_model.schema != dbt_node.schema {
existing_model.schema = dbt_node.schema.clone();
model_was_updated = true;
// Update db/schema from catalog node (which should be authoritative)
if existing_model.database.as_deref() != catalog_node.metadata.as_ref().and_then(|m| m.database.as_deref()) {
existing_model.database = catalog_node.metadata.as_ref().and_then(|m| m.database.clone()); model_was_updated = true;
}
if existing_model.schema.as_deref() != Some(table_meta.schema.as_str()) { // table_meta.schema is String, compare as &str
existing_model.schema = Some(table_meta.schema.clone()); model_was_updated = true;
}
// Reconcile columns
let mut current_dims: Vec<YamlDimension> = Vec::new();
let mut current_measures: Vec<YamlMeasure> = Vec::new();
let mut dbt_columns_map: HashMap<String, &DbtColumn> = dbt_node.columns.values().map(|c| (c.name.clone(), c)).collect();
let mut dbt_columns_map: HashMap<String, &ColumnMetadata> = catalog_node.columns.values().map(|c| (c.name.clone(), c)).collect();
for existing_dim_col in std::mem::take(&mut existing_model.dimensions) {
if let Some(dbt_col) = dbt_columns_map.remove(&existing_dim_col.name) {
let mut updated_dim = existing_dim_col.clone();
let mut dim_col_updated = false;
if updated_dim.type_.as_deref() != Some(dbt_col.column_type.as_str()) {
updated_dim.type_ = Some(dbt_col.column_type.clone());
dim_col_updated = true; columns_updated_count +=1;
if updated_dim.type_.as_deref() != Some(&dbt_col.type_) { // type_ is String
updated_dim.type_ = Some(dbt_col.type_.clone()); dim_col_updated = true; columns_updated_count +=1;
}
if dbt_col.comment.is_some() && updated_dim.description != dbt_col.comment {
updated_dim.description = dbt_col.comment.clone(); dim_col_updated = true; columns_updated_count +=1;
}
if let Some(dbt_col_comment) = &dbt_col.comment {
if updated_dim.description.as_deref() != Some(dbt_col_comment.as_str()) {
updated_dim.description = Some(dbt_col_comment.clone());
dim_col_updated = true; columns_updated_count +=1;
}
} // else keep user's existing_dim.description
current_dims.push(updated_dim);
if dim_col_updated { model_was_updated = true; }
} else {
println!(" Removing dimension '{}' from semantic model '{}' (no longer in dbt model)", existing_dim_col.name.yellow(), dbt_model_name_for_yaml);
columns_removed_count += 1; model_was_updated = true;
}
} else { columns_removed_count += 1; model_was_updated = true; }
}
for existing_measure_col in std::mem::take(&mut existing_model.measures) {
if let Some(dbt_col) = dbt_columns_map.remove(&existing_measure_col.name) {
let mut updated_measure = existing_measure_col.clone();
let mut measure_col_updated = false;
if updated_measure.type_.as_deref() != Some(dbt_col.column_type.as_str()) {
updated_measure.type_ = Some(dbt_col.column_type.clone());
measure_col_updated = true; columns_updated_count +=1;
if updated_measure.type_.as_deref() != Some(&dbt_col.type_) { // type_ is String
updated_measure.type_ = Some(dbt_col.type_.clone()); measure_col_updated = true; columns_updated_count +=1;
}
if dbt_col.comment.is_some() && updated_measure.description != dbt_col.comment {
updated_measure.description = dbt_col.comment.clone(); measure_col_updated = true; columns_updated_count +=1;
}
if let Some(dbt_col_comment) = &dbt_col.comment {
if updated_measure.description.as_deref() != Some(dbt_col_comment.as_str()) {
updated_measure.description = Some(dbt_col_comment.clone());
measure_col_updated = true; columns_updated_count +=1;
}
} // else keep user's description
current_measures.push(updated_measure);
if measure_col_updated { model_was_updated = true; }
} else {
println!(" Removing measure '{}' from semantic model '{}' (no longer in dbt model)", existing_measure_col.name.yellow(), dbt_model_name_for_yaml);
columns_removed_count += 1; model_was_updated = true;
}
} else { columns_removed_count += 1; model_was_updated = true; }
}
for (col_name, dbt_col) in dbt_columns_map {
println!(" Adding new column '{}' to semantic model '{}'", col_name.green(), dbt_model_name_for_yaml);
if crate::commands::init::is_measure_type(Some(dbt_col.column_type.as_str())) { // Assuming dbt_col.column_type is String
current_measures.push(YamlMeasure { name: dbt_col.name.clone(), description: dbt_col.comment.clone(), type_: Some(dbt_col.column_type.clone()) });
for (_col_name, dbt_col) in dbt_columns_map { // Remaining are new columns
if crate::commands::init::is_measure_type(&dbt_col.type_) { // type_ is String
current_measures.push(YamlMeasure { name: dbt_col.name.clone(), description: dbt_col.comment.clone(), type_: Some(dbt_col.type_.clone()) });
} else {
current_dims.push(YamlDimension { name: dbt_col.name.clone(), description: dbt_col.comment.clone(), type_: Some(dbt_col.column_type.clone()), searchable: false, options: None });
current_dims.push(YamlDimension { name: dbt_col.name.clone(), description: dbt_col.comment.clone(), type_: Some(dbt_col.type_.clone()), searchable: false, options: None });
}
columns_added_count += 1; model_was_updated = true;
}
@ -383,90 +287,46 @@ pub async fn generate_semantic_models_command(
if model_was_updated {
models_updated_count += 1;
let yaml_string = serde_yaml::to_string(&existing_model).context(format!("Failed to serialize updated semantic model {} to YAML", existing_model.name))?;
if let Some(parent_dir) = individual_semantic_yaml_path.parent() { fs::create_dir_all(parent_dir)?; }
fs::write(&individual_semantic_yaml_path, yaml_string).context(format!("Failed to write updated semantic model to {}", individual_semantic_yaml_path.display()))?;
} else {
println!(" No changes detected for semantic model: {}", dbt_model_name_for_yaml);
}
let yaml_string = serde_yaml::to_string(&existing_model)?;
fs::write(&individual_semantic_yaml_path, yaml_string)?;
}
}
None => {
// New semantic model: Generate from scratch
println!("Generating new semantic model: {} at {}", dbt_model_name_for_yaml.green(), individual_semantic_yaml_path.display());
None => { // New semantic model
let mut dimensions = Vec::new();
let mut measures = Vec::new();
for (_col_name, col) in &dbt_node.columns { // dbt_node.columns defaults to empty if missing
if crate::commands::init::is_measure_type(Some(col.column_type.as_str())) { // Assuming col.column_type is String
measures.push(YamlMeasure {
name: col.name.clone(),
description: col.comment.clone(),
type_: Some(col.column_type.clone())
});
for (_col_name, col_meta) in &catalog_node.columns {
if crate::commands::init::is_measure_type(&col_meta.type_) { // type_ is String
measures.push(YamlMeasure { name: col_meta.name.clone(), description: col_meta.comment.clone(), type_: Some(col_meta.type_.clone()) });
} else {
dimensions.push(YamlDimension {
name: col.name.clone(),
description: col.comment.clone(),
type_: Some(col.column_type.clone()),
searchable: false,
options: None
});
dimensions.push(YamlDimension { name: col_meta.name.clone(), description: col_meta.comment.clone(), type_: Some(col_meta.type_.clone()), searchable: false, options: None });
}
}
let new_model = YamlModel {
name: dbt_model_name_for_yaml.clone(),
description: dbt_node_metadata.comment.clone(), // Use dbt_node_metadata
name: actual_model_name_in_yaml,
description: table_meta.comment.clone(),
data_source_name: buster_config.projects.as_ref().and_then(|p|p.first()).and_then(|pc|pc.data_source_name.clone()),
database: dbt_node.database.clone(),
schema: dbt_node.schema.clone(),
database: table_meta.database.clone(), // From TableMetadata
schema: Some(table_meta.schema.clone()), // From TableMetadata (String -> Option<String>)
dimensions,
measures,
original_file_path: Some(dbt_original_file_path_str.clone()),
original_file_path: Some(relative_sql_path_str),
};
let yaml_string = serde_yaml::to_string(&new_model).context(format!("Failed to serialize new semantic model {} to YAML", new_model.name))?;
if let Some(parent_dir) = individual_semantic_yaml_path.parent() { fs::create_dir_all(parent_dir)?; }
fs::write(&individual_semantic_yaml_path, yaml_string).context(format!("Failed to write new semantic model to {}", individual_semantic_yaml_path.display()))?;
new_models_added_count += 1;
let yaml_string = serde_yaml::to_string(&new_model)?;
fs::write(&individual_semantic_yaml_path, yaml_string)?;
models_generated_count += 1;
}
}
}
// Remove or comment out the old logic for handling removed models from a single spec file
/*
let mut removed_models_count = 0;
existing_yaml_models_map.retain(|model_name: &String, _model: &mut YamlModel| {
if processed_dbt_model_names.contains(model_name) {
true
} else {
println!("Removing semantic model '{}' (no longer found in scoped dbt models or catalog.json)", model_name.red());
removed_models_count += 1;
false
}
});
*/
// Remove the final save logic for the aggregated spec file
// let final_models_vec: Vec<YamlModel> = existing_yaml_models_map.values().cloned().collect();
// let updated_spec = YamlSemanticLayerSpec { models: final_models_vec };
// let yaml_string = serde_yaml::to_string(&updated_spec).context("Failed to serialize updated semantic models to YAML")?;
// fs::write(&semantic_models_base_dir_path, yaml_string).context(format!("Failed to write updated semantic models to {}", semantic_models_base_dir_path.display()))?;
// Note: The above fs::write was to semantic_models_base_dir_path which is a directory, this was an error in previous diff. It should have been semantic_models_file_path.
// Since we save per file, this block is removed.
println!("\n{}", "Semantic Model Generation Summary:".bold().green());
println!(" Processed dbt models (in scope): {}", dbt_models_processed_count);
println!(" Semantic models initially loaded: {}", initial_model_count);
println!(" New semantic models added: {}", new_models_added_count.to_string().green());
println!("\n{}", "Semantic Model Generation/Update Summary:".bold().green());
println!(" SQL models processed that had a matching catalog entry: {}", sql_models_successfully_processed_from_catalog_count);
println!(" New semantic models generated: {}", models_generated_count.to_string().green());
println!(" Existing semantic models updated: {}", models_updated_count.to_string().cyan());
println!(" Semantic models removed (dbt model deleted/out of scope): {}", columns_removed_count.to_string().red());
println!(" Columns added: {}", columns_added_count.to_string().green());
println!(" Columns updated (type/dbt_comment): {}", columns_updated_count.to_string().cyan());
println!(" Columns removed: {}", columns_removed_count.to_string().red());
if is_side_by_side_generation {
println!("✓ Semantic models successfully updated (side-by-side with SQL models, base directory: {}).", effective_semantic_models_base_dir.display().to_string().green());
} else {
println!("✓ Semantic models successfully updated in {}.", effective_semantic_models_base_dir.display().to_string().green());
}
println!(" Columns added to existing models: {}", columns_added_count.to_string().green());
println!(" Columns updated in existing models: {}", columns_updated_count.to_string().cyan());
println!(" Columns removed from existing models: {}", columns_removed_count.to_string().red());
// Consider adding a count for models found on disk but not in catalog / models in catalog but no matching SQL file
println!("✓ Semantic model generation/update complete.");
Ok(())
}
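
The output-path rule this command implements can be summarized as: in side-by-side mode the YAML sits next to the SQL file; with a dedicated semantic-models directory, the SQL file's path under its dbt model root is re-rooted beneath that directory, falling back to just the file name if no root matches. A condensed, std-only sketch of that derivation (hypothetical helper name, not part of the diff):

use std::path::{Path, PathBuf};

// Hypothetical condensation of the path derivation used above.
fn semantic_yaml_path(
    sql_file_abs: &Path,
    dbt_model_roots_abs: &[PathBuf],
    dedicated_output_dir: Option<&Path>,
) -> PathBuf {
    match dedicated_output_dir {
        // Side-by-side: models/marts/revenue.sql -> models/marts/revenue.yml
        None => sql_file_abs.with_extension("yml"),
        // Dedicated dir: strip the dbt model root and re-root the remainder,
        // e.g. models/marts/revenue.sql -> <out_dir>/marts/revenue.yml
        Some(out_dir) => {
            let fallback = sql_file_abs
                .file_name()
                .map(PathBuf::from)
                .unwrap_or_default()
                .with_extension("yml");
            let suffix = dbt_model_roots_abs
                .iter()
                .find_map(|root| sql_file_abs.strip_prefix(root).ok())
                .map(|stripped| stripped.with_extension("yml"))
                .unwrap_or(fallback);
            out_dir.join(suffix)
        }
    }
}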

View File

@ -11,13 +11,13 @@ use query_engine::credentials::{
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_yaml;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::Duration;
// Import from dbt_utils for dbt catalog parsing
use dbt_utils::models::{DbtCatalog, DbtNode, DbtColumn, DbtNodeMetadata, DbtCatalogMetadata};
use dbt_utils::models::{DbtCatalog, CatalogNode, ColumnMetadata, TableMetadata, CatalogMetadata};
use dbt_utils::{run_dbt_docs_generate, load_and_parse_catalog};
// Imports for Buster specific utilities and config
@ -84,21 +84,16 @@ pub fn is_false(val: &bool) -> bool {
}
// Helper function to determine if a SQL type should be a measure
pub fn is_measure_type(sql_type_opt: Option<&str>) -> bool {
match sql_type_opt {
Some(sql_type) => {
let lower_sql_type = sql_type.to_lowercase();
lower_sql_type.contains("int") ||
lower_sql_type.contains("numeric") ||
lower_sql_type.contains("decimal") ||
lower_sql_type.contains("real") ||
lower_sql_type.contains("double") ||
lower_sql_type.contains("float") ||
lower_sql_type.contains("money") ||
lower_sql_type.contains("number")
}
None => false, // If type is missing, default to not a measure (dimension)
}
pub fn is_measure_type(sql_type: &str) -> bool {
let lower_sql_type = sql_type.to_lowercase();
lower_sql_type.contains("int") ||
lower_sql_type.contains("numeric") ||
lower_sql_type.contains("decimal") ||
lower_sql_type.contains("real") ||
lower_sql_type.contains("double") ||
lower_sql_type.contains("float") ||
lower_sql_type.contains("money") ||
lower_sql_type.contains("number")
}
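
As a quick illustration of the simplified heuristic above (hypothetical type strings; the real values come from catalog.json and vary by warehouse), any type string without one of the numeric substrings falls through to being a dimension. A sketch that could live as a unit test in the same module:

#[cfg(test)]
mod is_measure_type_tests {
    use super::is_measure_type;

    #[test]
    fn classifies_numeric_types_as_measures() {
        // Numeric-looking types become measures.
        assert!(is_measure_type("NUMERIC(38,0)"));
        assert!(is_measure_type("double precision"));
        assert!(is_measure_type("INT64"));
        // Everything else stays a dimension.
        assert!(!is_measure_type("character varying"));
        assert!(!is_measure_type("TIMESTAMP_NTZ"));
    }
}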
// Enum for Database Type selection (ensure only one definition, placed before use)
@ -638,303 +633,259 @@ async fn generate_semantic_models_from_dbt_catalog(
_config_path: &Path, // Path to buster.yml
buster_config_dir: &Path, // Directory containing buster.yml, assumed dbt project root
) -> Result<()> {
println!("{}", "Starting semantic model generation from dbt catalog...".dimmed());
println!("{}", "Starting semantic model generation (file-first approach)...".dimmed());
// Get the semantic model output configuration from the first project context
let project_semantic_model_paths_config = buster_config.projects.as_ref()
.and_then(|projs| projs.first())
.and_then(|proj| proj.semantic_model_paths.as_ref());
let is_side_by_side_generation = project_semantic_model_paths_config.map_or(true, |paths| paths.is_empty());
let path_construction_base_dir: PathBuf; // Base directory for constructing output paths
if is_side_by_side_generation {
path_construction_base_dir = buster_config_dir.to_path_buf(); // Project root is the base for side-by-side
println!("{}", format!("Semantic models will be generated side-by-side with SQL models (within '{}').", path_construction_base_dir.display()).dimmed());
} else {
// A specific directory (or directories) was configured for semantic models. Use the first one.
let primary_path_str = project_semantic_model_paths_config.unwrap().first().unwrap(); // Safe due to map_or check
path_construction_base_dir = buster_config_dir.join(primary_path_str);
println!("{}", format!("Semantic models will be generated in/under: {}", path_construction_base_dir.display()).dimmed());
// Ensure this specific output directory exists
fs::create_dir_all(&path_construction_base_dir).map_err(|e| {
anyhow!("Failed to create semantic models output directory '{}': {}", path_construction_base_dir.display(), e)
})?;
}
// Get dbt model source roots (e.g., ["models", "my_other_models"])
// These are paths relative to the dbt_project_path (buster_config_dir)
let dbt_project_content = parse_dbt_project_file_content(buster_config_dir)?;
let dbt_model_source_roots: Vec<PathBuf> = dbt_project_content.as_ref()
.map(|content| content.model_paths.iter().map(PathBuf::from).collect())
.unwrap_or_else(|| vec![PathBuf::from("models")]); // Default if not found
// Get defaults from the primary project context for model properties
let primary_project_context = buster_config.projects.as_ref().and_then(|p| p.first());
let default_data_source_name = primary_project_context
.and_then(|pc| pc.data_source_name.as_ref());
let default_database = primary_project_context
.and_then(|pc| pc.database.as_ref());
let default_schema = primary_project_context
.and_then(|pc| pc.schema.as_ref());
let dbt_project_path = buster_config_dir;
let catalog_json_path = dbt_project_path.join("target").join("catalog.json");
if Confirm::new("Can Buster run 'dbt docs generate' to get the latest schema (catalog.json)?")
// --- 1. Load Catalog & Build Lookup Map ---
let catalog_json_path = buster_config_dir.join("target").join("catalog.json");
if Confirm::new("Run 'dbt docs generate' to refresh dbt catalog (catalog.json)?")
.with_default(true)
.prompt()?
{
match run_dbt_docs_generate(dbt_project_path).await {
Ok(_) => { /* Success is logged by the helper function itself */ }
Err(e) => {
eprintln!("{}", format!("'dbt docs generate' (via dbt_utils) reported an error. Proceeding with existing catalog.json if available. Error: {}", e).yellow());
}
match run_dbt_docs_generate(buster_config_dir).await {
Ok(_) => {}
Err(e) => eprintln!("{}", format!("'dbt docs generate' error. Proceeding with existing catalog if available. Error: {}", e).yellow()),
}
} else {
println!("{}", "Skipping 'dbt docs generate'. Will look for existing catalog.json.".dimmed());
println!("{}", "Skipping 'dbt docs generate'. Using existing catalog.json if available.".dimmed());
}
if !catalog_json_path.exists() {
eprintln!(
"{}",
format!("✗ catalog.json not found at {}.", catalog_json_path.display()).red()
);
println!("Please ensure 'dbt docs generate' has been run successfully in your dbt project.");
println!("Skipping semantic model generation.");
eprintln!("{}", format!("✗ catalog.json not found at {}. Cannot generate models.", catalog_json_path.display()).red());
return Ok(());
}
println!(
"{}",
format!(
"Attempting to load catalog.json from {}",
catalog_json_path.display()
)
.dimmed()
);
let dbt_catalog = match load_and_parse_catalog(&catalog_json_path) {
Ok(catalog) => {
println!("{}", "✓ Successfully parsed catalog.json via dbt_utils.".green());
println!("{}", "✓ Successfully parsed catalog.json.".green());
catalog
}
Err(e) => {
eprintln!("{}", format!("✗ Error loading/parsing catalog.json via dbt_utils: {}. Ensure catalog.json exists and is valid.", e).red());
eprintln!("{}", format!("✗ Error loading/parsing catalog.json: {}. Cannot generate models.", e).red());
return Ok(());
}
};
// --- Model Scoping Logic ---
let mut configured_model_path_patterns: Vec<Pattern> = Vec::new();
if let Some(projects) = &buster_config.projects {
for project_context in projects {
if let Some(model_paths) = &project_context.model_paths {
for path_str in model_paths {
// Construct absolute path for glob pattern relative to buster_config_dir
let path_to_glob = buster_config_dir.join(path_str);
// dbt model paths are often directories, so add a glob to match files within them.
// e.g., if path_str is "models", pattern becomes "<abs_path_to_models>/**/*.sql"
// If path_str already contains wildcards, use it as is more directly.
let pattern_str = if path_str.contains('*') || path_str.contains('?') || path_str.contains('[') {
path_to_glob.to_string_lossy().into_owned()
} else {
path_to_glob.join("**").join("*.sql").to_string_lossy().into_owned()
};
// Build lookup map: Keyed by derived_model_name_from_file (which should be metadata.name)
// We filter for nodes that have a derived_model_name_from_file and are models.
let catalog_nodes_by_name: HashMap<String, &CatalogNode> = dbt_catalog.nodes.values()
.filter_map(|node| {
if node.derived_resource_type.as_deref() == Some("model") {
node.derived_model_name_from_file.as_ref().map(|name| (name.clone(), node))
} else {
None
}
})
.collect();
if catalog_nodes_by_name.is_empty() {
println!("{}", "No models found in the dbt catalog after parsing. Nothing to generate.".yellow());
return Ok(());
}
match Pattern::new(&pattern_str) {
Ok(p) => configured_model_path_patterns.push(p),
Err(e) => eprintln!(
"{}",
format!(
"Warning: Invalid glob pattern '{}' from buster.yml model_paths: {}",
pattern_str,
e
)
.yellow()
),
// --- 2. Determine SQL Files to Process ---
let mut sql_files_to_process: HashSet<PathBuf> = HashSet::new();
let mut model_path_patterns_from_buster_yml: Vec<Pattern> = Vec::new();
if let Some(projects) = &buster_config.projects {
if let Some(first_project) = projects.first() {
if let Some(mp_globs) = &first_project.model_paths {
for path_str_glob in mp_globs {
match Pattern::new(&buster_config_dir.join(path_str_glob).to_string_lossy()) {
Ok(p) => model_path_patterns_from_buster_yml.push(p),
Err(e) => eprintln!("{}", format!("Warning: Invalid glob pattern '{}' from buster.yml: {}", path_str_glob, e).yellow()),
}
}
}
}
}
if configured_model_path_patterns.is_empty() {
println!("{}", "No model_paths configured in buster.yml or patterns are invalid. Will process all models from catalog.json.".yellow());
if !model_path_patterns_from_buster_yml.is_empty() {
println!("{}", format!("Scanning for SQL files based on model_paths patterns in buster.yml: {:?}",
model_path_patterns_from_buster_yml.iter().map(|p| p.as_str()).collect::<Vec<_>>() ).dimmed());
for pattern in model_path_patterns_from_buster_yml {
// Ripgrep or glob to find files matching the pattern string itself
// This simple glob might need enhancement for more complex patterns handled by buster_config.model_paths
// For now, assuming model_paths are like "models/marts/**/*.sql"
match glob::glob(pattern.as_str()) {
Ok(paths) => {
for entry in paths {
match entry {
Ok(path) => if path.is_file() && path.extension().map_or(false, |ext| ext == "sql") {
sql_files_to_process.insert(path);
}
Err(e) => eprintln!("{}", format!("Error processing glob path: {}", e).yellow()),
}
}
}
Err(e) => eprintln!("{}", format!("Glob pattern error for '{}': {}", pattern.as_str(), e).yellow()),
}
}
} else {
println!("{}", "No model_paths in buster.yml. Using dbt_project.yml model-paths to find SQL files.".dimmed());
let dbt_project_content = parse_dbt_project_file_content(buster_config_dir)?;
let dbt_model_source_roots = dbt_project_content.as_ref()
.map(|content| content.model_paths.iter().map(PathBuf::from).collect::<Vec<PathBuf>>())
.unwrap_or_else(|| vec![PathBuf::from("models")]);
for dbt_source_root_rel in dbt_model_source_roots {
let dbt_source_root_abs = buster_config_dir.join(dbt_source_root_rel);
let glob_pattern = dbt_source_root_abs.join("**/*.sql");
match glob::glob(&glob_pattern.to_string_lossy()) {
Ok(paths) => {
for entry in paths {
match entry {
Ok(path) => if path.is_file() {
sql_files_to_process.insert(path);
}
Err(e) => eprintln!("{}", format!("Error processing glob path from dbt_project.yml: {}", e).yellow()),
}
}
}
Err(e) => eprintln!("{}", format!("Glob pattern error for dbt_project.yml path '{}': {}", glob_pattern.display(), e).yellow()),
}
}
}
// --- End Model Scoping Logic ---
if sql_files_to_process.is_empty() {
println!("{}", "No SQL model files found based on configuration. Nothing to generate.".yellow());
return Ok(());
}
println!("{}", format!("Found {} SQL model file(s) to process.", sql_files_to_process.len()).dimmed());
// --- 3. Determine Output Configuration (Side-by-side or Dedicated Dir) ---
let project_semantic_model_paths_config = buster_config.projects.as_ref()
.and_then(|projs| projs.first())
.and_then(|proj| proj.semantic_model_paths.as_ref());
let is_side_by_side_generation = project_semantic_model_paths_config.map_or(true, |paths| paths.is_empty());
let primary_dedicated_output_dir: Option<PathBuf> = if is_side_by_side_generation { None }
else {
project_semantic_model_paths_config.and_then(|paths| paths.first()).map(|p_str| buster_config_dir.join(p_str))
};
if is_side_by_side_generation {
println!("{}", "Semantic models will be generated side-by-side with their SQL counterparts.".dimmed());
} else if let Some(ref out_dir) = primary_dedicated_output_dir {
println!("{}", format!("Semantic models will be generated in/under: {}", out_dir.display()).dimmed());
fs::create_dir_all(out_dir).map_err(|e| anyhow!("Failed to create semantic models output dir '{}': {}", out_dir.display(), e))?;
} else {
// This case (not side-by-side but no primary_dedicated_output_dir) should ideally not happen if config is valid.
// Defaulting to side-by-side for safety, though this indicates a potential config issue handled earlier in init.
println!("{}", "Warning: Semantic model output directory not clearly configured, defaulting to side-by-side generation logic.".yellow());
}
// --- 4. Iterate Through SQL Files & Generate YamlModels ---
let mut yaml_models_generated_count = 0;
let default_data_source_name = buster_config.projects.as_ref().and_then(|p| p.first()).and_then(|pc| pc.data_source_name.as_ref());
let default_database = buster_config.projects.as_ref().and_then(|p| p.first()).and_then(|pc| pc.database.as_ref());
let default_schema = buster_config.projects.as_ref().and_then(|p| p.first()).and_then(|pc| pc.schema.as_ref());
for (_node_id, node) in dbt_catalog.nodes.iter().filter(|(_id, n)| {
match &n.resource_type {
Some(rt) => rt == "model",
None => {
eprintln!(
"{}",
format!(
"Warning: Skipping dbt node with unique_id: {} because it is missing 'resource_type' in catalog.json.",
n.unique_id
).yellow()
);
false
}
}
}) {
let Some(ref original_file_path_str) = node.original_file_path else {
eprintln!(
"{}",
format!(
"Warning: Skipping dbt model unique_id: {} because it is missing 'original_file_path' in catalog.json.",
node.unique_id
).yellow()
);
continue;
};
// Ensure metadata and metadata.name exist, as it's crucial for the semantic model name
let Some(ref node_metadata) = node.metadata else {
eprintln!(
"{}",
format!(
"Warning: Skipping dbt model with unique_id: {} because its 'metadata' block is missing in catalog.json.",
node.unique_id
).yellow()
);
continue;
};
let Some(ref actual_model_name_from_metadata) = node_metadata.name else {
eprintln!(
"{}",
format!(
"Warning: Skipping dbt model with unique_id: {} because its 'metadata.name' is missing in catalog.json.",
node.unique_id
).yellow()
);
continue;
};
let actual_model_name = actual_model_name_from_metadata.clone();
let original_file_path_abs = buster_config_dir.join(original_file_path_str);
let in_scope = if configured_model_path_patterns.is_empty() {
true // If no patterns, assume all models are in scope
} else {
configured_model_path_patterns
.iter()
.any(|pattern| pattern.matches_path(&original_file_path_abs))
};
if !in_scope {
// Only log if verbose or similar, this can be noisy
// println!("Skipping dbt model (not in configured model_paths): {}", node.unique_id.dimmed());
continue;
}
println!("Processing dbt model for semantic layer: {}: {}", node.unique_id.cyan(), actual_model_name.cyan());
let mut dimensions: Vec<YamlDimension> = Vec::new();
let mut measures: Vec<YamlMeasure> = Vec::new();
for (_col_name, col) in &node.columns { // node.columns is HashMap, defaults to empty if missing
if is_measure_type(Some(col.column_type.as_str())) { // Assuming col.column_type is String here based on linter
measures.push(YamlMeasure {
name: col.name.clone(),
description: col.comment.clone(),
type_: Some(col.column_type.clone()), // Wrap in Some()
});
} else {
dimensions.push(YamlDimension {
name: col.name.clone(),
description: col.comment.clone(),
type_: Some(col.column_type.clone()), // Wrap in Some()
searchable: false,
options: None,
});
}
}
let yaml_model = YamlModel {
name: actual_model_name.clone(),
description: node_metadata.comment.clone(), // Access comment via node_metadata ref
data_source_name: default_data_source_name.cloned(),
database: node.database.clone().or_else(|| default_database.cloned()), // node.database is Option<String>
schema: node.schema.clone().or_else(|| default_schema.cloned()), // node.schema is Option<String>
dimensions,
measures,
original_file_path: Some(original_file_path_str.clone()),
};
// Determine the output path for this individual YAML model
let dbt_model_path = Path::new(original_file_path_str);
let mut stripped_model_path_suffix = PathBuf::new(); // e.g. "marts/sales/revenue.sql" if original is "models/marts/sales/revenue.sql"
let mut found_base_for_stripping = false;
for dbt_source_root in &dbt_model_source_roots { // dbt_source_root is like "models"
if let Ok(stripped_path) = dbt_model_path.strip_prefix(dbt_source_root) {
stripped_model_path_suffix = stripped_path.to_path_buf();
found_base_for_stripping = true;
break;
}
}
if !found_base_for_stripping {
// Fallback: if original_file_path_str didn't start with any known dbt_model_source_roots,
// (e.g. original_file_path_str is "marts/revenue.sql" and source_root is "models")
// then use original_file_path_str as is for the suffix part.
// This can happen if dbt_model_source_roots are not exhaustive or path is weird.
// The resulting YAML structure will still be relative to path_construction_base_dir.
stripped_model_path_suffix = dbt_model_path.to_path_buf();
eprintln!("{}", format!(
"Warning: Could not strip a known dbt model source root ('{:?}') from dbt model path '{}'. Using full path for suffix: '{}'",
dbt_model_source_roots, original_file_path_str, stripped_model_path_suffix.display()
).yellow()
);
}
let output_yaml_path: PathBuf;
if is_side_by_side_generation {
// For side-by-side, output is next to the SQL file.
// original_file_path_str is relative to buster_config_dir (e.g., "models/marts/sales/revenue.sql")
// buster_config_dir is the dbt project root.
output_yaml_path = buster_config_dir.join(original_file_path_str).with_extension("yml");
} else {
// For dedicated output directory:
// path_construction_base_dir is the dedicated dir (e.g., "/path/to/project/buster_yamls")
// stripped_model_path_suffix is the path part after dbt source root (e.g., "marts/sales/revenue.sql")
let yaml_filename_with_subdir = stripped_model_path_suffix.with_extension("yml"); // e.g., "marts/sales/revenue.yml"
output_yaml_path = path_construction_base_dir.join(yaml_filename_with_subdir);
}
if let Some(parent_dir) = output_yaml_path.parent() {
fs::create_dir_all(parent_dir).map_err(|e| {
anyhow!("Failed to create directory for semantic model YAML '{}': {}", parent_dir.display(), e)
})?;
}
let yaml_string = serde_yaml::to_string(&yaml_model)
.map_err(|e| anyhow!("Failed to serialize semantic model '{}' to YAML: {}", yaml_model.name, e))?;
fs::write(&output_yaml_path, yaml_string)
.map_err(|e| anyhow!("Failed to write semantic model YAML for '{}' to '{}': {}", yaml_model.name, output_yaml_path.display(), e))?;
println!(
"{} Generated semantic model: {}",
"".green(),
output_yaml_path.display().to_string().cyan()
for sql_file_abs_path in sql_files_to_process {
let model_name_from_filename = sql_file_abs_path.file_stem().map_or_else(
|| "".to_string(),
|stem| stem.to_string_lossy().into_owned()
);
yaml_models_generated_count += 1;
if model_name_from_filename.is_empty() {
eprintln!("{}", format!("Warning: Could not determine model name from file: {}. Skipping.", sql_file_abs_path.display()).yellow());
continue;
}
match catalog_nodes_by_name.get(&model_name_from_filename) {
Some(catalog_node) => {
let Some(ref node_metadata_opt) = catalog_node.metadata else {
eprintln!("{}", format!("Warning: Skipping model '{}' (from file {}): Missing metadata in catalog entry.", model_name_from_filename, sql_file_abs_path.display()).yellow());
continue;
};
let node_metadata = node_metadata_opt; // Shadow to non-Option for easier access, already checked Some
// actual_model_name for YamlModel comes from catalog metadata.name
let actual_semantic_model_name = node_metadata.name.clone();
println!("Processing SQL model: {} (Catalog Name: {}, UniqueID: {})",
sql_file_abs_path.display().to_string().cyan(),
actual_semantic_model_name.purple(),
catalog_node.unique_id.as_deref().unwrap_or("N/A").dimmed()
);
let mut dimensions: Vec<YamlDimension> = Vec::new();
let mut measures: Vec<YamlMeasure> = Vec::new();
for (_col_name, col_meta) in &catalog_node.columns { // col_meta is &ColumnMetadata
if is_measure_type(&col_meta.type_) { // Pass &String, is_measure_type takes &str
measures.push(YamlMeasure {
name: col_meta.name.clone(),
description: col_meta.comment.clone(),
type_: Some(col_meta.type_.clone()),
});
} else {
dimensions.push(YamlDimension {
name: col_meta.name.clone(),
description: col_meta.comment.clone(),
type_: Some(col_meta.type_.clone()),
searchable: false,
options: None,
});
}
}
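// Illustrative classification (assuming is_measure_type treats numeric column types as measures):
//   "NUMERIC(38,2)" or "BIGINT" -> YamlMeasure; "TEXT" or "TIMESTAMP" -> YamlDimension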
let relative_sql_file_path_str = pathdiff::diff_paths(&sql_file_abs_path, buster_config_dir)
.map(|p| p.to_string_lossy().into_owned())
.unwrap_or_else(|| sql_file_abs_path.to_string_lossy().into_owned());
let yaml_model = YamlModel {
name: actual_semantic_model_name, // Use name from catalog metadata
description: node_metadata.comment.clone(),
data_source_name: default_data_source_name.cloned(),
database: node_metadata.database.clone().or_else(|| default_database.cloned()),
schema: Some(node_metadata.schema.clone()), // schema from TableMetadata is String, wrap in Some()
dimensions,
measures,
original_file_path: Some(relative_sql_file_path_str.clone()),
};
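// Roughly the YAML this serializes to via serde_yaml (abridged; values here are hypothetical):
//   name: revenue
//   description: Monthly recognized revenue
//   data_source_name: warehouse
//   schema: analytics
//   dimensions:
//     - name: region
//       type: TEXT
//   measures:
//     - name: amount
//       type: NUMERIC(38,2)
//   original_file_path: models/marts/sales/revenue.sql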
// Determine output path
let output_yaml_path: PathBuf;
if is_side_by_side_generation {
output_yaml_path = sql_file_abs_path.with_extension("yml");
} else if let Some(ref dedicated_dir) = primary_dedicated_output_dir {
// Need to reconstruct subpath relative to a dbt model root (e.g. "models/")
let dbt_model_source_roots_for_stripping = parse_dbt_project_file_content(buster_config_dir)?.as_ref()
.map(|c| c.model_paths.iter().map(PathBuf::from).collect::<Vec<PathBuf>>())
.unwrap_or_else(|| vec![PathBuf::from("models")]);
let mut stripped_suffix_for_yaml: Option<PathBuf> = None;
for dbt_root in &dbt_model_source_roots_for_stripping {
let abs_dbt_root = buster_config_dir.join(dbt_root);
if let Ok(stripped) = sql_file_abs_path.strip_prefix(&abs_dbt_root) {
stripped_suffix_for_yaml = Some(stripped.with_extension("yml"));
break;
}
}
let final_suffix = stripped_suffix_for_yaml.unwrap_or_else(||
PathBuf::from(&model_name_from_filename).with_extension("yml")
);
output_yaml_path = dedicated_dir.join(final_suffix);
} else { // Should not be reached due to earlier checks, but for safety:
output_yaml_path = sql_file_abs_path.with_extension("yml");
}
if let Some(parent_dir) = output_yaml_path.parent() {
fs::create_dir_all(parent_dir).map_err(|e| anyhow!("Failed to create dir '{}': {}", parent_dir.display(), e))?;
}
let yaml_string = serde_yaml::to_string(&yaml_model)?;
fs::write(&output_yaml_path, yaml_string)?;
println!("{} Generated semantic model: {}", "".green(), output_yaml_path.display().to_string().cyan());
yaml_models_generated_count += 1;
}
None => {
eprintln!("{}", format!("Warning: SQL model file '{}' (model name: '{}') found, but no corresponding entry in dbt catalog. Skipping.", sql_file_abs_path.display(), model_name_from_filename).yellow());
}
}
}
    if yaml_models_generated_count == 0 {
        println!(
            "{}",
            "No dbt models found matching configured paths in catalog.json, or no models in catalog. No semantic model YAML files were generated."
                .yellow()
        );
    } else {
        println!(
            "{}",
            format!("Successfully generated {} semantic model YAML file(s).", yaml_models_generated_count).bold().green()
        );
    }
Ok(())

View File

@ -1,9 +1,10 @@
pub mod auth;
pub mod deploy;
pub mod init;
pub mod update;
pub mod generate;
pub mod init;
pub mod parse;
pub mod run;
pub mod update;
pub use auth::auth_with_args;
pub use init::init;

View File

@ -0,0 +1,289 @@
use anyhow::{Result, anyhow};
use std::path::{Path, PathBuf};
use colored::*;
use crate::utils::{
config::{BusterConfig, ProjectContext},
find_yml_files, ExclusionManager,
// Assuming ProgressTracker might be useful for logging, similar to deploy
// If not, it can be removed.
ProgressTracker,
};
use crate::commands::deploy::deploy::{parse_model_file, resolve_model_configurations};
use semantic_layer::models::Model;
// A simple progress tracker for the parse command
#[derive(Debug, Default)]
struct ParseProgress {
total_files: usize,
processed_files: usize,
excluded_files: usize,
current_file: String,
errors: Vec<(String, String, Vec<String>)>, // file, model_name, errors
successes: Vec<(String, String)>, // file, model_name
}
impl ParseProgress {
fn new() -> Self {
Default::default()
}
fn log_status(&self) {
if !self.current_file.is_empty() {
println!("Processing [{} / {}]: {}", self.processed_files, self.total_files, self.current_file);
}
}
fn log_summary(&self) {
println!("\n--- Parse Summary ---");
println!("Total files processed: {}", self.processed_files);
println!("Total files excluded: {}", self.excluded_files);
println!("Models successfully parsed & validated: {}", self.successes.len());
println!("Models with errors: {}", self.errors.len());
if !self.successes.is_empty() {
println!("\nSuccessfully parsed models:");
for (file, model_name) in &self.successes {
println!(" - {} (in file: {})", model_name.green(), file.dimmed());
}
}
if !self.errors.is_empty() {
println!("\nModels with errors:");
for (file, model_name, errors) in &self.errors {
println!(" - {} (in file: {}):", model_name.red(), file.dimmed());
for error in errors {
println!(" - {}", error);
}
}
}
println!("---------------------");
}
}
impl ProgressTracker for ParseProgress {
fn log_excluded_file(&mut self, path: &str, pattern: &str) {
self.excluded_files += 1;
println!("Excluding file: {} (matched pattern: {})", path, pattern);
}
fn log_excluded_tag(&mut self, path: &str, tag: &str) {
self.excluded_files += 1;
println!(
"Excluding file: {} (matched excluded tag: {})",
path,
tag
);
}
}
pub async fn parse_models_command(path_arg: Option<String>) -> Result<()> {
let current_dir = std::env::current_dir()?;
let buster_config_load_dir = path_arg.as_ref().map(PathBuf::from).unwrap_or_else(|| current_dir.clone());
let mut progress = ParseProgress::new();
println!("Looking for buster.yml configuration...");
let buster_config = match BusterConfig::load_from_dir(&buster_config_load_dir) {
Ok(Some(cfg)) => {
println!("Found buster.yml configuration at {}", buster_config_load_dir.join("buster.yml").display());
Some(cfg)
}
Ok(None) => {
println!("No buster.yml found in {}, will parse files directly or use defaults.", buster_config_load_dir.display());
None
}
Err(e) => {
println!("Warning: Error reading buster.yml: {}. Proceeding without it.", e);
None
}
};
let effective_buster_config_dir = BusterConfig::base_dir(&buster_config_load_dir.join("buster.yml")).unwrap_or(buster_config_load_dir.clone());
let exclusion_manager = if let Some(cfg) = &buster_config {
ExclusionManager::new(cfg)?
} else {
ExclusionManager::empty()
};
// Determine search paths
let mut files_to_parse_with_context: Vec<(PathBuf, Option<&ProjectContext>)> = Vec::new();
if let Some(p_str) = &path_arg {
// If a specific path is given, use it directly.
// It could be a file or a directory.
let specific_path = effective_buster_config_dir.join(p_str);
println!("Processing specified path: {}", specific_path.display());
if specific_path.is_dir() {
match find_yml_files(&specific_path, true, &exclusion_manager, Some(&mut progress)) { // Assuming recursive
Ok(files_in_dir) => {
for f in files_in_dir {
// For direct path, we don't have a specific project context from buster.yml easily
files_to_parse_with_context.push((f, None));
}
},
Err(e) => eprintln!("Error finding YML files in {}: {}", specific_path.display(), format!("{}", e).red()),
}
} else if specific_path.is_file() && specific_path.extension().and_then(|ext| ext.to_str()) == Some("yml") {
if specific_path.file_name().and_then(|name| name.to_str()) != Some("buster.yml") {
files_to_parse_with_context.push((specific_path, None));
}
} else if !specific_path.exists() {
return Err(anyhow!("Specified path does not exist: {}", specific_path.display()));
} else {
return Err(anyhow!("Specified path is not a valid .yml file or directory: {}", specific_path.display()));
}
} else {
// No specific path, use buster_config (if available) or current directory
if let Some(cfg) = &buster_config {
let effective_paths_with_contexts = cfg.resolve_effective_model_paths(&effective_buster_config_dir);
if !effective_paths_with_contexts.is_empty() {
println!("Using effective model paths from buster.yml:");
for (path, project_ctx_opt) in effective_paths_with_contexts {
let context_identifier = project_ctx_opt.map_or_else(|| "Global/Default".to_string(), |ctx| ctx.identifier());
println!(" - Path: {}, Context: {}", path.display(), context_identifier.dimmed());
if path.is_dir() {
match find_yml_files(&path, true, &exclusion_manager, Some(&mut progress)) { // Assuming recursive
Ok(files_in_dir) => {
for f in files_in_dir {
files_to_parse_with_context.push((f, project_ctx_opt));
}
},
Err(e) => eprintln!("Error finding YML files in {}: {}", path.display(), format!("{}", e).red()),
}
} else if path.is_file() && path.extension().and_then(|ext| ext.to_str()) == Some("yml") {
if path.file_name().and_then(|name| name.to_str()) != Some("buster.yml") {
files_to_parse_with_context.push((path.clone(), project_ctx_opt));
}
}
}
} else {
println!("No model_paths specified in buster.yml, scanning current directory: {}", effective_buster_config_dir.display());
match find_yml_files(&effective_buster_config_dir, true, &exclusion_manager, Some(&mut progress)) {
Ok(files_in_dir) => {
for f in files_in_dir {
files_to_parse_with_context.push((f, None)); // No specific project context for CWD scan unless we enhance this
}
},
Err(e) => eprintln!("Error finding YML files in {}: {}", effective_buster_config_dir.display(), format!("{}", e).red()),
}
}
} else {
// No buster.yml and no path_arg, scan current directory.
println!("No buster.yml found and no specific path provided. Scanning current directory: {}", effective_buster_config_dir.display());
match find_yml_files(&effective_buster_config_dir, true, &exclusion_manager, Some(&mut progress)) {
Ok(files_in_dir) => {
for f in files_in_dir {
files_to_parse_with_context.push((f, None));
}
},
Err(e) => eprintln!("Error finding YML files in {}: {}", effective_buster_config_dir.display(), format!("{}", e).red()),
}
}
}
progress.total_files = files_to_parse_with_context.len();
println!("Found {} model .yml file(s) to parse.", progress.total_files);
if files_to_parse_with_context.is_empty() {
println!("No model files found to parse.");
progress.log_summary();
return Ok(());
}
let default_cfg_storage;
let global_config_for_resolution = match buster_config.as_ref() {
Some(cfg) => cfg,
None => {
default_cfg_storage = BusterConfig::default();
&default_cfg_storage
}
};
for (yml_path, project_ctx_opt) in files_to_parse_with_context {
progress.processed_files += 1;
progress.current_file = yml_path.strip_prefix(&effective_buster_config_dir).unwrap_or(&yml_path).to_string_lossy().into_owned();
progress.log_status();
let parsed_models_result = parse_model_file(&yml_path);
match parsed_models_result {
Ok(parsed_models) => {
let models_with_context: Vec<(Model, Option<&ProjectContext>)> = parsed_models.into_iter()
.map(|m| (m, project_ctx_opt))
.collect();
match resolve_model_configurations(models_with_context, global_config_for_resolution) {
Ok(resolved_models) => {
if resolved_models.is_empty() && !yml_path.to_string_lossy().contains("empty_test") { // Guard against empty files unless for specific tests
println!("Warning: No models found in file: {}", yml_path.display());
// Potentially add to errors if this is unexpected
}
for model in resolved_models {
// Basic validation: model name should not be empty
if model.name.is_empty() {
progress.errors.push((
progress.current_file.clone(),
"<Unnamed Model>".to_string(),
vec!["Model name is empty.".to_string()],
));
continue;
}
// Further validation could be added here, e.g., checking for data_source_name and schema after resolution
if model.data_source_name.is_none() {
progress.errors.push((
progress.current_file.clone(),
model.name.clone(),
vec!["data_source_name could not be resolved.".to_string()],
));
}
if model.schema.is_none() {
progress.errors.push((
progress.current_file.clone(),
model.name.clone(),
vec!["schema could not be resolved.".to_string()],
));
}
// If previous checks created errors for this model, don't mark as success.
// Check if current_file and model.name combination is already in errors.
let is_error = progress.errors.iter().any(|(f, m, _)| f == &progress.current_file && m == &model.name);
if !is_error {
println!(" Successfully parsed and resolved model: {}", model.name.green());
progress.successes.push((progress.current_file.clone(), model.name.clone()));
}
}
}
Err(e) => {
println!(" Error resolving configurations for models in {}: {}", yml_path.display(), e.to_string().red());
// Attempt to identify model names if possible, otherwise use file name
// This part is tricky as parsing might have succeeded but resolution failed for all.
// For now, associating error with the file.
progress.errors.push((
progress.current_file.clone(),
format!("File-level resolution error"),
vec![e.to_string()]
));
}
}
}
Err(e) => {
println!(" Error parsing model file {}: {}", yml_path.display(), e.to_string().red());
progress.errors.push((
progress.current_file.clone(),
"<Parse Error>".to_string(),
vec![e.to_string()],
));
}
}
}
progress.log_summary();
if !progress.errors.is_empty() {
return Err(anyhow!("Found errors during parsing and validation. Please check the output above."));
}
Ok(())
}
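// Example invocations (illustrative; the `buster parse` subcommand in main.rs calls this):
//   parse_models_command(Some("models/marts".to_string())).await?;  // validate one directory of model YAMLs
//   parse_models_command(None).await?;                              // fall back to buster.yml model_paths or the CWD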

View File

@ -73,6 +73,13 @@ pub enum Commands {
// output-file as a more descriptive name for the arg
target_semantic_file: Option<String>,
},
/// Parse and validate semantic model YAML definitions
Parse {
/// Optional path to a specific model .yml file or a directory of models to process.
/// If not provided, processes models based on 'model_paths' in buster.yml or CWD.
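/// Example (illustrative): `buster parse --path models/marts/revenue.yml`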
#[arg(long)]
path: Option<String>,
},
Start,
Stop,
}
@ -130,6 +137,7 @@ async fn main() {
path,
target_semantic_file,
} => commands::generate::generate_semantic_models_command(path, target_semantic_file).await,
Commands::Parse { path } => commands::parse::parse_models_command(path).await,
Commands::Start => run::start().await.map_err(anyhow::Error::from),
Commands::Stop => run::stop().await.map_err(anyhow::Error::from),
};

View File

@ -7,7 +7,7 @@ use std::process::Command as StdCommand;
use std::time::Duration;
pub mod models;
use models::DbtCatalog;
use models::{CatalogNode, DbtCatalog};
/// Runs the `dbt docs generate` command for the specified dbt project path.
pub async fn run_dbt_docs_generate(dbt_project_path: &Path) -> Result<()> {
@ -63,6 +63,7 @@ pub async fn run_dbt_docs_generate(dbt_project_path: &Path) -> Result<()> {
}
/// Loads and parses the dbt `catalog.json` file from the given path.
/// Also performs post-processing to derive convenience fields on CatalogNode.
pub fn load_and_parse_catalog(catalog_json_path: &Path) -> Result<DbtCatalog> {
println!(
"{}",
@ -82,30 +83,50 @@ pub fn load_and_parse_catalog(catalog_json_path: &Path) -> Result<DbtCatalog> {
// Log the detailed serde error
eprintln!("Detailed parsing error for {}: {:#?}", catalog_json_path.display(), e);
anyhow!(
"Failed to parse catalog.json from {}. Error: {}. Ensure the file content is valid and matches the expected dbt catalog structure.",
"Failed to parse catalog.json from {}. Error: {}. Ensure it matches expected dbt catalog structure (v1 tested).",
catalog_json_path.display(),
e // e.to_string() will give a concise error message from serde
)
})?;
    // Post-process nodes to derive resource_type and a display name
    for (_node_key, node) in catalog.nodes.iter_mut() { // node_key is the unique_id string from the JSON map key
        if let Some(ref unique_id_val) = node.unique_id { // unique_id is Option<String>
            let parts: Vec<&str> = unique_id_val.splitn(2, '.').collect();
            if !parts.is_empty() {
                let potential_type = parts[0];
                if ["model", "source", "seed", "snapshot", "test"].contains(&potential_type) {
                    node.derived_resource_type = Some(potential_type.to_string());
                }
            }
            // Derive a node name, preferring metadata.name and falling back to the last
            // part of unique_id (e.g. model.my_package.my_model_name -> my_model_name).
            if let Some(metadata) = &node.metadata {
                // metadata.name is String, so it exists whenever the metadata block exists.
                node.derived_model_name_from_file = Some(metadata.name.clone());
            } else if let Some(last_part) = unique_id_val.split('.').last() {
                if !last_part.is_empty() {
                    node.derived_model_name_from_file = Some(last_part.to_string());
                }
            }
        } else {
            // If unique_id itself is None, there is little to derive from.
            // We could fall back to node_key (which *is* the unique_id from the JSON map),
            // but a None unique_id inside the struct is unusual for a valid catalog.
        }
    }
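    // Illustrative derivation for a typical catalog node (hypothetical values):
    //   unique_id "model.jaffle_shop.orders" -> derived_resource_type = Some("model")
    //   metadata.name "orders"               -> derived_model_name_from_file = Some("orders")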
// Similar post-processing for sources if needed
for (_source_key, source_node) in catalog.sources.iter_mut() {
if let Some(ref unique_id_val) = source_node.unique_id {
let parts: Vec<&str> = unique_id_val.splitn(2, '.').collect();
if !parts.is_empty() && parts[0] == "source" {
source_node.derived_resource_type = Some("source".to_string());
}
if let Some(metadata) = &source_node.metadata {
source_node.derived_model_name_from_file = Some(metadata.name.clone());
} else if let Some(last_part) = unique_id_val.split('.').last() {
if !last_part.is_empty() {
source_node.derived_model_name_from_file = Some(last_part.to_string());
}
}
}

View File

@ -1,16 +1,21 @@
use serde::Deserialize;
use std::collections::HashMap;
// Struct definitions for parsing dbt's catalog.json.
// Struct definitions for parsing dbt's catalog.json (v1 schema)
#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize, Clone, Default)]
pub struct DbtCatalog {
// metadata is required by schema, but use Option + default for robustness if block is missing
#[serde(default)]
pub metadata: Option<DbtCatalogMetadata>,
pub metadata: Option<CatalogMetadata>,
#[serde(default)] // nodes map is required, default handles if key is missing (empty map)
pub nodes: HashMap<String, CatalogNode>,
#[serde(default)] // sources map is required
pub sources: HashMap<String, CatalogSource>,
#[serde(default)]
pub nodes: HashMap<String, DbtNode>,
#[serde(default)]
pub sources: Option<HashMap<String, DbtSource>>,
pub errors: Option<Vec<String>>, // errors: string[] | null
// --- Fields kept for potential compatibility or future use, not strictly in v1 catalog properties like nodes/sources/metadata ---
#[serde(default, skip_serializing_if = "Option::is_none")]
pub macros: Option<HashMap<String, serde_json::Value>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
@ -20,109 +25,87 @@ pub struct DbtCatalog {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub selectors: Option<HashMap<String, serde_json::Value>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub disabled: Option<HashMap<String, Vec<serde_json::Value>>>, // dbt-core uses Vec here
pub disabled: Option<HashMap<String, Vec<serde_json::Value>>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub parent_map: Option<HashMap<String, Vec<String>>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub child_map: Option<HashMap<String, Vec<String>>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub errors: Option<serde_json::Value>, // Can be null or an object with error details
}
#[derive(Debug, Deserialize, Clone)]
pub struct DbtCatalogMetadata {
#[serde(rename = "dbt_schema_version", default)]
pub dbt_schema_version: Option<String>,
#[allow(dead_code)] // If not used directly by Buster, but good for complete parsing
pub dbt_version: Option<String>,
#[allow(dead_code)]
pub generated_at: Option<String>,
#[allow(dead_code)]
pub invocation_id: Option<String>,
#[derive(Debug, Deserialize, Clone, Default)]
pub struct CatalogMetadata { // Was DbtCatalogMetadata; matches schema.metadata
#[serde(default)] // Though schema implies required, default if block is imperfect
pub dbt_schema_version: String,
#[serde(default)]
pub dbt_version: Option<String>, // Defaulted in schema "1.10.0a1" but can be other string
#[serde(default)]
pub generated_at: Option<String>, // Is string in schema, but make Option for safety
#[serde(default)]
pub invocation_id: Option<String>, // string | null
#[serde(default)]
pub invocation_started_at: Option<String>, // string | null (from schema)
#[serde(default)]
pub env: HashMap<String, String>, // From schema
}
#[derive(Debug, Deserialize, Clone)]
pub struct DbtNode {
// Ensure metadata is present, matches example which has it implicitly via direct fields
// For the example catalog's node structure, we might need to flatten some metadata fields
// or expect them directly if `metadata` as a block is not always there.
// However, standard dbt catalog.json *does* have a metadata block within each node.
// The example provided might be a slight simplification or custom representation.
// Assuming standard catalog structure for now, where DbtNodeMetadata is a separate struct.
// Represents a "CatalogTable" from the dbt schema (for nodes and sources)
#[derive(Debug, Deserialize, Clone, Default)]
pub struct CatalogNode { // Was DbtNode; represents schema.nodes.<node_name>
// metadata, columns, stats are required per schema for a CatalogTable
// Using Option + default for robustness if a catalog is malformed,
// but downstream code will need to handle None for these.
#[serde(default)]
pub metadata: Option<DbtNodeMetadata>,
pub metadata: Option<TableMetadata>,
#[serde(default)]
pub columns: HashMap<String, DbtColumn>,
#[serde(rename = "resource_type")] // if resource_type is not directly in JSON, this helps map if some other key exists
// if type is the key in JSON for resource_type, then it should be:
// #[serde(alias = "type")] // or handle it in DbtNodeMetadata if type is part of metadata
#[serde(default)] // Make it optional and handle missing field
pub resource_type: Option<String>, // This refers to model, seed, snapshot, test etc.
pub unique_id: String,
#[serde(default)] // original_file_path might not be present for all node types
pub original_file_path: Option<String>,
pub database: Option<String>,
pub schema: Option<String>,
#[serde(default)] // Make name optional
pub name: Option<String>, // This is often the filename or alias. metadata.name is relation name.
pub comment: Option<String>, // Comment can be directly on the node for some versions/types
pub stats: Option<serde_json::Value>, // To capture general stats blocks
pub columns: HashMap<String, ColumnMetadata>,
#[serde(default)]
pub stats: HashMap<String, StatsItem>,
#[serde(default)]
pub unique_id: Option<String>, // string | null
// --- Fields to be populated by post-processing in lib.rs ---
// These are not directly from catalog.json node structure
#[serde(skip_deserializing, skip_serializing_if = "Option::is_none")]
pub derived_resource_type: Option<String>,
#[serde(skip_deserializing, skip_serializing_if = "Option::is_none")]
pub derived_model_name_from_file: Option<String>, // Name derived from SQL filename
}
#[derive(Debug, Deserialize, Clone)]
pub struct DbtNodeMetadata {
// Standard dbt catalog.json has `name` here as the relation name.
#[serde(default)] // Make name optional
pub name: Option<String>,
#[serde(rename = "type")] // This 'type' inside metadata usually refers to the materialization (table, view, etc.) for models
pub relation_type: Option<String>,
pub schema: Option<String>, // schema can also be here
pub database: Option<String>, // database can also be here
pub comment: Option<String>, // comment for the model/node itself
#[allow(dead_code)]
pub owner: Option<String>,
// Add other potential metadata fields if necessary, e.g., tags, config, etc.
#[serde(default)]
pub tags: Vec<String>,
}
#[derive(Debug, Deserialize, Clone)]
pub struct DbtSource {
#[serde(default)]
pub name: Option<String>, // This is the source's table name
pub unique_id: String,
#[serde(default)]
pub database: Option<String>,
#[serde(default)]
pub schema: Option<String>,
#[serde(default, alias = "resource_type")] // Sources have "source" as resource_type, or a specific table type.
pub table_type: Option<String>, // e.g. "table", often not explicitly a 'type' field in catalog for sources, but implied.
#[serde(default)]
pub columns: HashMap<String, DbtColumn>,
#[serde(default)]
pub comment: Option<String>,
pub stats: Option<serde_json::Value>,
// Sources can also have a 'meta' field, 'tags', 'description', 'loader', 'freshness' etc.
#[serde(default)]
pub description: Option<String>, // description is preferred over comment for sources usually
#[serde(default)]
pub meta: Option<HashMap<String, serde_json::Value>>,
#[serde(default)]
pub tags: Vec<String>,
}
// Using CatalogNode for sources as well, as their structure is CatalogTable
pub type CatalogSource = CatalogNode;
#[derive(Debug, Deserialize, Clone)]
pub struct DbtColumn {
#[derive(Debug, Deserialize, Clone, Default)]
pub struct TableMetadata { // Was DbtNodeMetadata; matches schema.nodes.<node_name>.metadata
#[serde(rename = "type")]
pub column_type: String,
pub index: Option<u32>, // Index might not always be present
pub name: String,
pub comment: Option<String>,
pub type_: String, // Required: database object type (e.g. "TABLE", "VIEW")
pub schema: String, // Required
pub name: String, // Required: relation name in the database
#[serde(default)]
pub description: Option<String>, // Columns can also have descriptions
pub database: Option<String>, // string | null
#[serde(default)]
pub meta: Option<HashMap<String, serde_json::Value>>,
pub comment: Option<String>, // string | null
#[serde(default)]
pub tags: Vec<String>,
pub owner: Option<String>, // string | null
}
#[derive(Debug, Deserialize, Clone, Default)]
pub struct ColumnMetadata { // Was DbtColumn; matches schema.nodes.<node_name>.columns.<col_name>
#[serde(rename = "type")]
pub type_: String, // Required: database column type
pub index: u32, // Required
pub name: String, // Required
#[serde(default)]
pub comment: Option<String>, // string | null
}
#[derive(Debug, Deserialize, Clone, Default)]
pub struct StatsItem { // matches schema.nodes.<node_name>.stats.<stat_name>
pub id: String, // Required
pub label: String, // Required
#[serde(default)]
pub value: serde_json::Value, // anyOf: boolean, string, number, null. Required.
pub include: bool, // Required
#[serde(default)]
pub description: Option<String>, // string | null
}