mirror of https://github.com/buster-so/buster.git
commit 904665883e
@@ -176,136 +176,23 @@ for (unique_id, node) in &dbt_catalog.nodes {
.unwrap_or_default()
}

// --- 2. Determine SQL Files to Process (based on path_arg or buster.yml model_paths) ---
println!("\n{}", "⚙️ Determining SQL files to process...".dimmed());
let mut sql_files_to_process: HashSet<PathBuf> = HashSet::new();
let dbt_project_model_roots_for_stripping = crate::commands::init::parse_dbt_project_file_content(&buster_config_dir)?.as_ref()
.map(|c| c.model_paths.iter().map(PathBuf::from).collect::<Vec<PathBuf>>())
.unwrap_or_else(|| vec![PathBuf::from("models")]);

if let Some(pa_str) = &path_arg {
let target_path = buster_config_dir.join(pa_str);
if target_path.is_file() && target_path.extension().map_or(false, |ext| ext == "sql") {
sql_files_to_process.insert(target_path);
} else if target_path.is_dir() {
let glob_pattern = target_path.join("**/*.sql");
match glob(&glob_pattern.to_string_lossy()) {
Ok(paths) => paths.for_each(|entry| if let Ok(path) = entry { if path.is_file() { sql_files_to_process.insert(path); } }),
Err(e) => eprintln!("{}", format!("Error globbing '{}': {}", glob_pattern.display(), e).yellow()),
}
} else {
eprintln!("{}", format!("⚠️ Warning: path_arg '{}' is not a valid SQL file or directory. Processing all configured models.", pa_str).yellow());
// Fall through to buster.yml model_paths if path_arg is invalid
}
}
// --- 2. Process Each Project Separately ---
println!("\n{}", "⚙️ Processing projects...".dimmed());

if sql_files_to_process.is_empty() { // If path_arg didn't yield files, or wasn't provided, use buster.yml config
let mut processed_via_buster_yml_paths = false;
if let Some(projects) = &buster_config.projects {
if let Some(first_project) = projects.first() {
if let Some(config_model_paths) = &first_project.model_paths { // Vec<String>
if !config_model_paths.is_empty() { // Check if there are paths to process
println!("{}", format!("ℹ️ No SQL files from path_arg. Scanning based on buster.yml model_paths: {:?}", config_model_paths).dimmed());
for path_entry_from_config in config_model_paths {
if path_entry_from_config.trim().is_empty() {
continue; // Skip empty path strings
}
let final_glob_pattern_str: String;
let path_is_absolute = Path::new(path_entry_from_config).is_absolute();
let mut total_models_generated_count = 0;
let mut total_models_updated_count = 0;
let mut total_columns_added_count = 0;
let mut total_columns_updated_count = 0;
let mut total_columns_removed_count = 0;
let mut total_sql_models_successfully_processed_from_catalog_count = 0;

let base_path_for_glob = if path_is_absolute {
PathBuf::from(path_entry_from_config)
} else {
buster_config_dir.join(path_entry_from_config)
};

if path_entry_from_config.contains('*') || path_entry_from_config.contains('?') || path_entry_from_config.contains('[') {
final_glob_pattern_str = base_path_for_glob.to_string_lossy().into_owned();
} else {
final_glob_pattern_str = base_path_for_glob.join("**/*.sql").to_string_lossy().into_owned();
}

match glob(&final_glob_pattern_str) {
Ok(paths) => paths.for_each(|entry| {
if let Ok(p) = entry {
if p.is_file() && p.extension().map_or(false, |e| e == "sql") {
sql_files_to_process.insert(p);
}
}
}),
Err(e) => eprintln!("{}", format!("Glob pattern error for buster.yml path '{}': {}", final_glob_pattern_str, e).yellow()),
}
}
// If config_model_paths had at least one non-empty string, consider this path taken
if config_model_paths.iter().any(|s| !s.trim().is_empty()) {
processed_via_buster_yml_paths = true;
}
}
}
}
}

if !processed_via_buster_yml_paths {
// Fallback to dbt_project.yml defaults
println!("{}", "ℹ️ No SQL files from path_arg, and no model_paths in buster.yml (or they were empty). Using dbt_project.yml model-paths as fallback.".dimmed());
for dbt_root_rel in &dbt_project_model_roots_for_stripping {
let glob_pattern = buster_config_dir.join(dbt_root_rel).join("**/*.sql");
match glob(&glob_pattern.to_string_lossy()) {
Ok(paths) => paths.for_each(|entry| if let Ok(p) = entry { if p.is_file() { sql_files_to_process.insert(p); } }),
Err(e) => eprintln!("{}", format!("Error globbing default dbt path '{}': {}", glob_pattern.display(), e).yellow()),
}
}
}
}

if sql_files_to_process.is_empty() {
println!("{}", "ℹ️ No SQL model files found to process/update based on configuration.".yellow());
return Ok(());
}
println!("{}", format!("✅ Found {} SQL model file(s) for potential generation/update.", sql_files_to_process.len()).dimmed());

// --- 3. Determine Output Base Directory for Semantic Models ---
println!("\n{}", "⚙️ Determining output directory for semantic models...".dimmed());
let semantic_models_base_dir_path_str = target_output_dir_arg.or_else(||
buster_config.projects.as_ref()
.and_then(|p| p.first())
.and_then(|proj| proj.semantic_model_paths.as_ref())
.and_then(|paths| paths.first().cloned())
).unwrap_or_else(||
// Default to side-by-side (empty string means relative to SQL file later)
String::new()
);

let (is_side_by_side_generation, semantic_output_base_abs_dir) = if semantic_models_base_dir_path_str.is_empty() {
println!("{}", "ℹ️ Semantic models output set to side-by-side with SQL files.".dimmed());
(true, buster_config_dir.clone()) // Base for side-by-side is project root
// Get projects to process
let projects_to_process = if let Some(projects) = &buster_config.projects {
projects.clone()
} else {
let abs_path = if Path::new(&semantic_models_base_dir_path_str).is_absolute() {
PathBuf::from(&semantic_models_base_dir_path_str)
} else {
buster_config_dir.join(&semantic_models_base_dir_path_str)
};
println!("{}", format!("➡️ Semantic models output base directory: {}", abs_path.display()).cyan());
fs::create_dir_all(&abs_path).context(format!("Failed to create semantic models output dir: {}", abs_path.display()))?;
(false, abs_path)
println!("{}", "ℹ️ No projects found in buster.yml. Nothing to process.".yellow());
return Ok(());
};

// --- 4. Iterate SQL Files, Match to Catalog, Generate/Update YamlModels ---
println!("\n{}", "✨ Processing SQL files and generating/updating YAML models...".dimmed());
let mut models_generated_count = 0;
let mut models_updated_count = 0;
let mut columns_added_count = 0;
let mut columns_updated_count = 0;
let mut columns_removed_count = 0;
let mut sql_models_successfully_processed_from_catalog_count = 0; // New counter

// Get project defaults for comparison
let proj_default_ds_name = buster_config.projects.as_ref()
.and_then(|p| p.first()).and_then(|pc| pc.data_source_name.as_deref());
let proj_default_database = buster_config.projects.as_ref()
.and_then(|p| p.first()).and_then(|pc| pc.database.as_deref());
let proj_default_schema = buster_config.projects.as_ref()
.and_then(|p| p.first()).and_then(|pc| pc.schema.as_deref());

// Helper function to find the best matching catalog node
fn find_matching_catalog_node<'a>(
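Aside: the pattern-vs-directory branch above (contains '*', '?', or '[') decides whether a configured model path is used as a glob verbatim or treated as a directory root that gets a recursive **/*.sql suffix. A minimal self-contained sketch of that decision, with hypothetical paths:

use std::path::{Path, PathBuf};

// Entries that already contain glob metacharacters pass through unchanged;
// plain directories get "**/*.sql" appended, exactly as in the code above.
fn glob_pattern_for(entry: &str, config_dir: &Path) -> String {
    let base = if Path::new(entry).is_absolute() {
        PathBuf::from(entry)
    } else {
        config_dir.join(entry)
    };
    if entry.contains('*') || entry.contains('?') || entry.contains('[') {
        base.to_string_lossy().into_owned()
    } else {
        base.join("**/*.sql").to_string_lossy().into_owned()
    }
}

fn main() {
    let root = Path::new("/repo"); // hypothetical project root
    // A plain directory is expanded recursively:
    assert_eq!(glob_pattern_for("models", root), "/repo/models/**/*.sql");
    // An entry that is already a pattern is passed through unchanged:
    assert_eq!(glob_pattern_for("models/mart/*.sql", root), "/repo/models/mart/*.sql");
}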
@@ -363,217 +250,333 @@ for (unique_id, node) in &dbt_catalog.nodes {
)
}

for sql_file_abs_path in sql_files_to_process {
let model_name_from_filename = sql_file_abs_path.file_stem().map_or_else(String::new, |s| s.to_string_lossy().into_owned());
if model_name_from_filename.is_empty() {
eprintln!("{}", format!("⚠️ Warning: Could not get model name from file {}. Skipping.", sql_file_abs_path.display()).yellow());
continue;
}
// Process each project
for (project_index, project_config) in projects_to_process.iter().enumerate() {
let project_name = project_config.name.as_deref().unwrap_or("unnamed_project");
println!("\n{}", format!("🔄 Processing project: {}", project_name).bold().cyan());

// Get dbt project model roots for path stripping
let dbt_project_model_roots_for_stripping = crate::commands::init::parse_dbt_project_file_content(&buster_config_dir)?.as_ref()
.map(|c| c.model_paths.iter().map(PathBuf::from).collect::<Vec<PathBuf>>())
.unwrap_or_else(|| vec![PathBuf::from("models")]);

// --- 2a. Determine SQL Files for this Project ---
let mut sql_files_to_process: HashSet<PathBuf> = HashSet::new();

if let Some(pa_str) = &path_arg {
// If path_arg is specified, only process files from that path (for any project that contains it)
let target_path = buster_config_dir.join(pa_str);
if target_path.is_file() && target_path.extension().map_or(false, |ext| ext == "sql") {
// Check if this file is within any of this project's model_paths
let should_include = if let Some(model_paths) = &project_config.model_paths {
model_paths.iter().any(|model_path| {
let abs_model_path = if Path::new(model_path).is_absolute() {
PathBuf::from(model_path)
} else {
buster_config_dir.join(model_path)
};
target_path.starts_with(&abs_model_path)
})
} else {
false
};

if should_include {
sql_files_to_process.insert(target_path);
}
} else if target_path.is_dir() {
let glob_pattern = target_path.join("**/*.sql");
match glob(&glob_pattern.to_string_lossy()) {
Ok(paths) => {
for entry in paths {
if let Ok(path) = entry {
if path.is_file() {
// Check if this file is within any of this project's model_paths
let should_include = if let Some(model_paths) = &project_config.model_paths {
model_paths.iter().any(|model_path| {
let abs_model_path = if Path::new(model_path).is_absolute() {
PathBuf::from(model_path)
} else {
buster_config_dir.join(model_path)
};
path.starts_with(&abs_model_path)
})
} else {
false
};

if should_include {
sql_files_to_process.insert(path);
}
}
}
}
},
Err(e) => eprintln!("{}", format!("Error globbing '{}': {}", glob_pattern.display(), e).yellow()),
}
}
} else {
// Use project's model_paths
if let Some(config_model_paths) = &project_config.model_paths {
if !config_model_paths.is_empty() {
println!("{}", format!(" ℹ️ Scanning model_paths: {:?}", config_model_paths).dimmed());
for path_entry_from_config in config_model_paths {
if path_entry_from_config.trim().is_empty() {
continue;
}

let base_path_for_glob = if Path::new(path_entry_from_config).is_absolute() {
PathBuf::from(path_entry_from_config)
} else {
buster_config_dir.join(path_entry_from_config)
};

// Extract path components from SQL file path
let path_components = extract_sql_file_path_components(
&sql_file_abs_path,
&buster_config_dir,
&dbt_project_model_roots_for_stripping
);
let final_glob_pattern_str = if path_entry_from_config.contains('*') ||
path_entry_from_config.contains('?') ||
path_entry_from_config.contains('[') {
base_path_for_glob.to_string_lossy().into_owned()
} else {
base_path_for_glob.join("**/*.sql").to_string_lossy().into_owned()
};

// Get the first project name from configs, defaulting to "default" if not found
let project_name = buster_config.projects.as_ref()
.and_then(|p| p.first())
.and_then(|p| p.name.as_ref())
.map_or(String::from("default"), |v| v.clone());

// Try to find the matching catalog node using a prioritized approach
let catalog_node = match find_matching_catalog_node(
&catalog_nodes_lookup,
&project_name,
&path_components,
&model_name_from_filename
) {
Some((node, match_type, key)) => {
node
},
None => {
eprintln!("{}", format!("ℹ️ Info: SQL model file '{}' found, but no corresponding entry in dbt catalog. Skipping.\nTried looking up with components: {:?}",
sql_file_abs_path.display(), path_components).dimmed());
match glob(&final_glob_pattern_str) {
Ok(paths) => {
for entry in paths {
if let Ok(p) = entry {
if p.is_file() && p.extension().map_or(false, |e| e == "sql") {
sql_files_to_process.insert(p);
}
}
}
},
Err(e) => eprintln!("{}", format!(" Glob pattern error for path '{}': {}", final_glob_pattern_str, e).yellow()),
}
}
} else {
println!("{}", format!(" ℹ️ Project '{}' has empty model_paths, skipping", project_name).dimmed());
continue;
}
} else {
println!("{}", format!(" ℹ️ Project '{}' has no model_paths configured, skipping", project_name).dimmed());
continue;
}
};
}

let Some(ref table_meta) = catalog_node.metadata else {
eprintln!("{}", format!("⚠️ Warning: Catalog entry for '{}' (file: {}) is missing metadata. Skipping.", model_name_from_filename, sql_file_abs_path.display()).yellow());
if sql_files_to_process.is_empty() {
println!("{}", format!(" ℹ️ No SQL files found for project '{}'", project_name).dimmed());
continue;
};
// actual_model_name_in_yaml is from catalog metadata.name
let actual_model_name_in_yaml = table_meta.name.clone();

sql_models_successfully_processed_from_catalog_count += 1; // Increment here
}

let relative_sql_path_str = pathdiff::diff_paths(&sql_file_abs_path, &buster_config_dir)
.map(|p| p.to_string_lossy().into_owned())
.unwrap_or_else(|| sql_file_abs_path.to_string_lossy().into_owned());
println!("{}", format!(" ✅ Found {} SQL file(s) for project '{}'", sql_files_to_process.len(), project_name).dimmed());

let individual_semantic_yaml_path: PathBuf = if is_side_by_side_generation {
sql_file_abs_path.with_extension("yml")
// --- 2b. Determine Output Directory for this Project ---
let semantic_models_base_dir_path_str = target_output_dir_arg.as_ref().cloned().or_else(||
project_config.semantic_model_paths.as_ref()
.and_then(|paths| paths.first().cloned())
).unwrap_or_else(|| String::new());

let (is_side_by_side_generation, semantic_output_base_abs_dir) = if semantic_models_base_dir_path_str.is_empty() {
(true, buster_config_dir.clone())
} else {
let mut stripped_suffix_for_yaml: Option<PathBuf> = None;
for dbt_root in &dbt_project_model_roots_for_stripping {
let abs_dbt_root = buster_config_dir.join(dbt_root);
if let Ok(stripped) = sql_file_abs_path.strip_prefix(&abs_dbt_root) {
stripped_suffix_for_yaml = Some(stripped.with_extension("yml"));
break;
}
}
let final_suffix_from_stripping = stripped_suffix_for_yaml.unwrap_or_else(|| PathBuf::from(&model_name_from_filename).with_extension("yml"));

let mut actual_suffix_to_join = final_suffix_from_stripping.clone();
// Check if the semantic_output_base_abs_dir might already imply the first part of the stripped suffix.
// e.g., base_dir = ".../models/mart", suffix_from_stripping = "mart/model.yml" -> actual_suffix_to_join = "model.yml"
// e.g., base_dir = ".../output", suffix_from_stripping = "mart/model.yml" -> actual_suffix_to_join = "mart/model.yml"
if let Some(first_component_in_suffix) = final_suffix_from_stripping.components().next() {
if semantic_output_base_abs_dir.ends_with(first_component_in_suffix.as_os_str()) {
// If the base output directory ends with the first path component of our stripped suffix
// (e.g., base is ".../mart", suffix starts with "mart/"),
// we should attempt to use the remainder of the suffix.
if final_suffix_from_stripping.components().count() > 1 {
// Only strip if there's more than one component in final_suffix_from_stripping.
// e.g., if suffix is "mart/model.yml", first_component_in_suffix is "mart".
// candidate_shorter_suffix becomes "model.yml". This is what we want.
// If suffix was "model.yml", first_component_in_suffix is "model.yml".
// semantic_output_base_abs_dir might end with "model.yml" (unlikely for a dir, but for robustness).
// components().count() would be 1. We would not strip, correctly joining "model.yml".
if let Ok(candidate_shorter_suffix) = final_suffix_from_stripping.strip_prefix(first_component_in_suffix.as_os_str()) {
actual_suffix_to_join = candidate_shorter_suffix.to_path_buf();
}
}
}
}
semantic_output_base_abs_dir.join(actual_suffix_to_join)
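The comments above describe a suffix-deduplication rule for the structured output path (logic this commit appears to retire): when the output base directory already ends with the first component of the stripped suffix, that component is dropped before joining, but only if more components follow. A self-contained sketch of that rule, using the two hypothetical cases from the comments:

use std::path::{Path, PathBuf};

// Join `suffix` onto `base`, dropping the suffix's leading component when
// `base` already ends with it and more components follow.
fn join_without_duplicate_component(base: &Path, suffix: &Path) -> PathBuf {
    if let Some(first) = suffix.components().next() {
        if base.ends_with(first.as_os_str()) && suffix.components().count() > 1 {
            if let Ok(shorter) = suffix.strip_prefix(first.as_os_str()) {
                return base.join(shorter);
            }
        }
    }
    base.join(suffix)
}

fn main() {
    // base_dir = ".../models/mart", suffix = "mart/model.yml" -> only "model.yml" is joined
    assert_eq!(
        join_without_duplicate_component(Path::new("/proj/models/mart"), Path::new("mart/model.yml")),
        PathBuf::from("/proj/models/mart/model.yml")
    );
    // base_dir = ".../output", suffix = "mart/model.yml" -> the full suffix is joined
    assert_eq!(
        join_without_duplicate_component(Path::new("/proj/output"), Path::new("mart/model.yml")),
        PathBuf::from("/proj/output/mart/model.yml")
    );
}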
let abs_path = if Path::new(&semantic_models_base_dir_path_str).is_absolute() {
PathBuf::from(&semantic_models_base_dir_path_str)
} else {
buster_config_dir.join(&semantic_models_base_dir_path_str)
};
fs::create_dir_all(&abs_path).context(format!("Failed to create semantic models output dir: {}", abs_path.display()))?;
(false, abs_path)
};
if let Some(p) = individual_semantic_yaml_path.parent() { fs::create_dir_all(p)?; }

// --- Reconciliation Logic (Create or Update) ---
let existing_yaml_model_opt: Option<YamlModel> = if individual_semantic_yaml_path.exists() {
fs::read_to_string(&individual_semantic_yaml_path)
.ok()
.and_then(|content| serde_yaml::from_str::<YamlModel>(&content).ok())
} else { None };
// --- 2c. Process SQL Files for this Project ---
println!("{}", format!(" ✨ Processing SQL files for project '{}'...", project_name).dimmed());
let mut models_generated_count = 0;
let mut models_updated_count = 0;
let mut columns_added_count = 0;
let mut columns_updated_count = 0;
let mut columns_removed_count = 0;
let mut sql_models_successfully_processed_from_catalog_count = 0;

match existing_yaml_model_opt {
Some(mut existing_model) => {
// --- Reconciliation Logic for Existing Model ---
let mut model_updated = false;
let original_dim_count = existing_model.dimensions.len();
let original_measure_count = existing_model.measures.len();
// Get project-specific defaults
let proj_default_ds_name = project_config.data_source_name.as_deref();
let proj_default_database = project_config.database.as_deref();
let proj_default_schema = project_config.schema.as_deref();

// Get the set of column names from the dbt catalog for this model
let catalog_column_names: HashSet<String> = catalog_node.columns
.keys()
.cloned()
.collect();

// Remove dimensions that are no longer in the catalog
existing_model.dimensions.retain(|dim| {
let keep = catalog_column_names.contains(&dim.name);
if !keep {
columns_removed_count += 1;
model_updated = true;
println!(" - Removing dimension '{}' (not in catalog)", dim.name.yellow());
}
keep
});

// Remove measures that are no longer in the catalog
existing_model.measures.retain(|measure| {
let keep = catalog_column_names.contains(&measure.name);
if !keep {
columns_removed_count += 1;
model_updated = true;
println!(" - Removing measure '{}' (not in catalog)", measure.name.yellow());
}
keep
});

// Note: We do NOT remove metrics, filters, or relationships automatically
// as they might represent derived logic or explicitly defined connections
// not directly tied 1:1 with current physical columns.

// TODO: Add logic here to ADD new columns from the catalog as dimensions/measures
// if they don't already exist in the existing_model.

if model_updated {
let yaml_string = serde_yaml::to_string(&existing_model)?;
fs::write(&individual_semantic_yaml_path, yaml_string)?;
models_updated_count += 1;
println!(" {} Updated existing semantic model: {}", "🔄".cyan(), individual_semantic_yaml_path.display().to_string().cyan());
} else {
// If no columns were removed, maybe check if columns need *adding* later?
// For now, just indicate no changes needed based on removal.
// println!(" {} No column removals needed for: {}", "✅".dimmed(), individual_semantic_yaml_path.display().to_string().dimmed());
}
for sql_file_abs_path in sql_files_to_process {
let model_name_from_filename = sql_file_abs_path.file_stem().map_or_else(String::new, |s| s.to_string_lossy().into_owned());
if model_name_from_filename.is_empty() {
eprintln!("{}", format!(" ⚠️ Warning: Could not get model name from file {}. Skipping.", sql_file_abs_path.display()).yellow());
continue;
}
None => { // New semantic model
let mut dimensions = Vec::new();
let mut measures = Vec::new();
for (_col_name, col_meta) in &catalog_node.columns {
if crate::commands::init::is_measure_type(&col_meta.type_) { // type_ is String
measures.push(YamlMeasure {
name: col_meta.name.clone(),
description: col_meta.comment.as_ref().filter(|s| !s.is_empty()).cloned().or_else(|| Some("{DESCRIPTION_NEEDED}.".to_string())),
type_: Some(col_meta.type_.clone())
});
} else {
dimensions.push(YamlDimension {
name: col_meta.name.clone(),
description: col_meta.comment.as_ref().filter(|s| !s.is_empty()).cloned().or_else(|| Some("{DESCRIPTION_NEEDED}.".to_string())),
type_: Some(col_meta.type_.clone()),
searchable: false, // Ensure searchable is false
options: None
});

// Extract path components from SQL file path
let path_components = extract_sql_file_path_components(
&sql_file_abs_path,
&buster_config_dir,
&dbt_project_model_roots_for_stripping
);

// Try to find the matching catalog node using a prioritized approach
let catalog_node = match find_matching_catalog_node(
&catalog_nodes_lookup,
project_name,
&path_components,
&model_name_from_filename
) {
Some((node, match_type, key)) => {
node
},
None => {
eprintln!("{}", format!(" ℹ️ Info: SQL model file '{}' found, but no corresponding entry in dbt catalog. Skipping.\nTried looking up with components: {:?}",
sql_file_abs_path.display(), path_components).dimmed());
continue;
}
};

let Some(ref table_meta) = catalog_node.metadata else {
eprintln!("{}", format!(" ⚠️ Warning: Catalog entry for '{}' (file: {}) is missing metadata. Skipping.", model_name_from_filename, sql_file_abs_path.display()).yellow());
continue;
};

let actual_model_name_in_yaml = table_meta.name.clone();
sql_models_successfully_processed_from_catalog_count += 1;

let individual_semantic_yaml_path: PathBuf = if is_side_by_side_generation {
sql_file_abs_path.with_extension("yml")
} else {
// Write directly to semantic_model_paths without preserving directory structure
semantic_output_base_abs_dir.join(format!("{}.yml", model_name_from_filename))
};

if let Some(p) = individual_semantic_yaml_path.parent() {
fs::create_dir_all(p)?;
}

// --- Reconciliation Logic (Create or Update) ---
let existing_yaml_model_opt: Option<YamlModel> = if individual_semantic_yaml_path.exists() {
fs::read_to_string(&individual_semantic_yaml_path)
.ok()
.and_then(|content| serde_yaml::from_str::<YamlModel>(&content).ok())
} else { None };
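Reading the existing YAML back only succeeds if it deserializes into YamlModel. The real definitions live elsewhere in the crate; the following is only a sketch of the shape implied by the fields this diff touches (derives, renames, and optionality are inferred, not confirmed):

use serde::{Deserialize, Serialize};

// Inferred from usage above: name/description/data_source_name/database/schema
// plus dimension and measure lists. The actual structs are defined in the
// CLI's model types and may differ in attributes and field order.
#[derive(Debug, Serialize, Deserialize)]
struct YamlModel {
    name: String,
    description: Option<String>,
    data_source_name: Option<String>,
    database: Option<String>,
    schema: Option<String>,
    #[serde(default)]
    dimensions: Vec<YamlDimension>,
    #[serde(default)]
    measures: Vec<YamlMeasure>,
}

#[derive(Debug, Serialize, Deserialize)]
struct YamlDimension {
    name: String,
    description: Option<String>,
    #[serde(rename = "type")]
    type_: Option<String>,
    searchable: bool,
    options: Option<Vec<String>>,
}

#[derive(Debug, Serialize, Deserialize)]
struct YamlMeasure {
    name: String,
    description: Option<String>,
    #[serde(rename = "type")]
    type_: Option<String>,
}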

match existing_yaml_model_opt {
Some(mut existing_model) => {
// --- Reconciliation Logic for Existing Model ---
let mut model_updated = false;

// Get the set of column names from the dbt catalog for this model
let catalog_column_names: HashSet<String> = catalog_node.columns
.keys()
.cloned()
.collect();

// Remove dimensions that are no longer in the catalog
existing_model.dimensions.retain(|dim| {
let keep = catalog_column_names.contains(&dim.name);
if !keep {
columns_removed_count += 1;
model_updated = true;
println!(" - Removing dimension '{}' (not in catalog)", dim.name.yellow());
}
keep
});

// Remove measures that are no longer in the catalog
existing_model.measures.retain(|measure| {
let keep = catalog_column_names.contains(&measure.name);
if !keep {
columns_removed_count += 1;
model_updated = true;
println!(" - Removing measure '{}' (not in catalog)", measure.name.yellow());
}
keep
});

if model_updated {
let yaml_string = serde_yaml::to_string(&existing_model)?;
fs::write(&individual_semantic_yaml_path, yaml_string)?;
models_updated_count += 1;
println!(" {} Updated existing semantic model: {}", "🔄".cyan(), individual_semantic_yaml_path.display().to_string().cyan());
}
}
let new_model = YamlModel {
name: actual_model_name_in_yaml,
description: table_meta.comment.clone(),
data_source_name: None, // Per user request, dbt catalog doesn't provide this, so imply project default for new models
database: {
let model_db_from_catalog = &table_meta.database; // Option<String>
model_db_from_catalog.as_ref()
.filter(|catalog_db_str_ref| proj_default_database != Some(catalog_db_str_ref.as_str()))
.cloned()
},
schema: {
let model_schema_from_catalog = &table_meta.schema; // String
if proj_default_schema.as_deref() == Some(model_schema_from_catalog.as_str()) {
None
None => {
// New semantic model
let mut dimensions = Vec::new();
let mut measures = Vec::new();
for (_col_name, col_meta) in &catalog_node.columns {
if crate::commands::init::is_measure_type(&col_meta.type_) {
measures.push(YamlMeasure {
name: col_meta.name.clone(),
description: col_meta.comment.as_ref().filter(|s| !s.is_empty()).cloned().or_else(|| Some("{DESCRIPTION_NEEDED}.".to_string())),
type_: Some(col_meta.type_.clone())
});
} else {
Some(model_schema_from_catalog.clone())
dimensions.push(YamlDimension {
name: col_meta.name.clone(),
description: col_meta.comment.as_ref().filter(|s| !s.is_empty()).cloned().or_else(|| Some("{DESCRIPTION_NEEDED}.".to_string())),
type_: Some(col_meta.type_.clone()),
searchable: false,
options: None
});
}
},
dimensions,
measures,
};
let yaml_string = serde_yaml::to_string(&new_model)?;
fs::write(&individual_semantic_yaml_path, yaml_string)?;
models_generated_count += 1;
println!(" {} Generated new semantic model: {}", "✨".green(), individual_semantic_yaml_path.display().to_string().green());
}
let new_model = YamlModel {
name: actual_model_name_in_yaml,
description: table_meta.comment.clone(),
data_source_name: None,
database: {
let model_db_from_catalog = &table_meta.database;
model_db_from_catalog.as_ref()
.filter(|catalog_db_str_ref| proj_default_database != Some(catalog_db_str_ref.as_str()))
.cloned()
},
schema: {
let model_schema_from_catalog = &table_meta.schema;
if proj_default_schema.as_deref() == Some(model_schema_from_catalog.as_str()) {
None
} else {
Some(model_schema_from_catalog.clone())
}
},
dimensions,
measures,
};
let yaml_string = serde_yaml::to_string(&new_model)?;
fs::write(&individual_semantic_yaml_path, yaml_string)?;
models_generated_count += 1;
println!(" {} Generated new semantic model: {}", "✨".green(), individual_semantic_yaml_path.display().to_string().green());
}
}
}

// Project summary
println!("\n{}", format!(" 📊 Project '{}' Summary:", project_name).bold().blue());
println!(" SQL models processed with catalog entry: {}", sql_models_successfully_processed_from_catalog_count.to_string().cyan());
println!(" New semantic models generated : {}", models_generated_count.to_string().green());
println!(" Existing semantic models updated : {}", models_updated_count.to_string().cyan());
println!(" Columns removed from existing models : {}", columns_removed_count.to_string().red());

// Add to totals
total_models_generated_count += models_generated_count;
total_models_updated_count += models_updated_count;
total_columns_added_count += columns_added_count;
total_columns_updated_count += columns_updated_count;
total_columns_removed_count += columns_removed_count;
total_sql_models_successfully_processed_from_catalog_count += sql_models_successfully_processed_from_catalog_count;
}

println!("\n{}", "📊 Semantic Model Generation/Update Summary:".bold().green());
println!("\n{}", "📊 Overall Summary:".bold().green());
println!(" --------------------------------------------------");
println!(" SQL models processed with catalog entry: {}", sql_models_successfully_processed_from_catalog_count.to_string().cyan());
println!(" New semantic models generated : {}", models_generated_count.to_string().green());
println!(" Existing semantic models updated : {}", models_updated_count.to_string().cyan());
println!(" Columns added to existing models : {}", columns_added_count.to_string().green());
println!(" Columns updated in existing models : {}", columns_updated_count.to_string().cyan());
println!(" Columns removed from existing models : {}", columns_removed_count.to_string().red());
println!(" Total SQL models processed with catalog entry: {}", total_sql_models_successfully_processed_from_catalog_count.to_string().cyan());
println!(" Total new semantic models generated : {}", total_models_generated_count.to_string().green());
println!(" Total existing semantic models updated : {}", total_models_updated_count.to_string().cyan());
println!(" Total columns added to existing models : {}", total_columns_added_count.to_string().green());
println!(" Total columns updated in existing models : {}", total_columns_updated_count.to_string().cyan());
println!(" Total columns removed from existing models : {}", total_columns_removed_count.to_string().red());
println!(" --------------------------------------------------");
if sql_models_successfully_processed_from_catalog_count == 0 && models_generated_count == 0 && models_updated_count == 0 {
if total_sql_models_successfully_processed_from_catalog_count == 0 && total_models_generated_count == 0 && total_models_updated_count == 0 {
println!("{}", "ℹ️ No models were generated or updated.".yellow());
} else {
println!("🎉 Semantic model generation/update complete.");
println!("🎉 Semantic model generation/update complete for all projects.");
}

Ok(())
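For new models, columns are routed to measures or dimensions by crate::commands::init::is_measure_type, applied to the catalog column's type string. The real predicate lives in the init module; a plausible sketch under the assumption that numeric types count as measures (the type list here is an assumption, not the actual implementation):

// Assumed shape of a measure-type predicate: numeric column types become
// measures, everything else becomes a dimension. The real list may differ.
fn is_measure_type(col_type: &str) -> bool {
    let t = col_type.to_lowercase();
    ["int", "bigint", "smallint", "decimal", "numeric", "float", "double", "real", "number"]
        .iter()
        .any(|m| t.contains(m))
}

fn main() {
    assert!(is_measure_type("NUMERIC(38, 2)")); // -> YamlMeasure
    assert!(!is_measure_type("VARCHAR"));       // -> YamlDimension
}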
@@ -13,7 +13,7 @@ use models::{CatalogNode, DbtCatalog};
pub async fn run_dbt_docs_generate(dbt_project_path: &Path) -> Result<()> {
println!(
"{}",
format!("Running 'dbt docs generate' for project at: {}", dbt_project_path.display()).dimmed()
format!("Running 'dbt clean && dbt docs generate' for project at: {}", dbt_project_path.display()).dimmed()
);
let spinner = ProgressBar::new_spinner();
spinner.set_style(
@@ -21,13 +21,40 @@ pub async fn run_dbt_docs_generate(dbt_project_path: &Path) -> Result<()> {
.template("{spinner:.green} {msg}")
.context("Failed to create progress style for dbt docs generate spinner")? // Added context
);
spinner.set_message("Executing dbt docs generate...");

// First run dbt clean
spinner.set_message("Executing dbt clean...");
spinner.enable_steady_tick(Duration::from_millis(100));

let clean_output = tokio::process::Command::new("dbt")
.arg("clean")
.arg("--project-dir")
.arg(dbt_project_path.as_os_str())
.output()
.await
.with_context(|| format!("Failed to execute 'dbt clean' command for project: {}", dbt_project_path.display()))?;

if !clean_output.status.success() {
eprintln!(
"{}",
format!(
"⚠️ 'dbt clean' failed but continuing. Status: {}.\nStdout: {}\nStderr: {}",
clean_output.status,
String::from_utf8_lossy(&clean_output.stdout),
String::from_utf8_lossy(&clean_output.stderr)
)
.yellow()
);
}

// Then run dbt docs generate
spinner.set_message("Executing dbt docs generate...");

let output = tokio::process::Command::new("dbt") // Switched to tokio::process::Command for async
.arg("docs")
.arg("generate")
.arg("--project-dir")
.arg(dbt_project_path.as_os_str())
.output()
.await
@@ -38,7 +65,7 @@ pub async fn run_dbt_docs_generate(dbt_project_path: &Path) -> Result<()> {
if output.status.success() {
println!(
"{}",
"✓ 'dbt docs generate' completed successfully.".green()
"✓ 'dbt clean && dbt docs generate' completed successfully.".green()
);
// It might be useful to check if catalog.json was actually created/updated here,
// but for now, we assume success means it's likely fine.
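The closing comment above suggests verifying that catalog.json was actually produced after a successful run. A minimal sketch of such a check, assuming dbt's default target/ output directory (a project can override target-path in dbt_project.yml; this check is not part of the commit):

use std::path::Path;

// Sketch: confirm `dbt docs generate` produced target/catalog.json.
fn catalog_json_exists(dbt_project_path: &Path) -> bool {
    dbt_project_path.join("target").join("catalog.json").is_file()
}

fn main() {
    let project = Path::new("/path/to/dbt_project"); // hypothetical
    if !catalog_json_exists(project) {
        eprintln!("⚠️ dbt reported success but target/catalog.json was not found.");
    }
}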
@@ -171,13 +171,13 @@ const CollapseToggleIcon = React.memo(
className
)}
onClick={onClick}>
<AnimatePresence mode="sync">
<AnimatePresence mode="sync" initial={false}>
{showChevron && (
<motion.div
key="chevron"
initial={{ opacity: 0, scale: 0.9 }}
animate={{ opacity: 1, scale: 1 }}
exit={{ opacity: 0, scale: 0.9 }}
initial={{ opacity: 0 }}
animate={{ opacity: 1 }}
exit={{ opacity: 0 }}
transition={{ duration: 0.12, ease: 'easeInOut' }}
className={cn(
'text-icon-color absolute inset-0 flex h-5 w-5 items-center justify-center transition-transform duration-200',
@@ -189,9 +189,9 @@ const CollapseToggleIcon = React.memo(
{showDefaultIcon && (
<motion.div
key="default"
initial={{ opacity: 0, scale: 0.9 }}
animate={{ opacity: 1, scale: 1 }}
exit={{ opacity: 0, scale: 0.9 }}
initial={{ opacity: 0 }}
animate={{ opacity: 1 }}
exit={{ opacity: 0 }}
transition={{ duration: 0.12, ease: 'easeInOut' }}
className="text-icon-color absolute inset-0 flex h-5 w-5 items-center justify-center">
{collapseDefaultIcon}