mirror of https://github.com/buster-so/buster.git
init and generate
This commit is contained in:
parent
a6a8e31cdd
commit
5d59828c45
|
@ -262,94 +262,160 @@ pub async fn generate_semantic_models_command(
|
|||
if existing_model.name != actual_model_name_in_yaml {
|
||||
existing_model.name = actual_model_name_in_yaml.clone(); model_was_updated = true;
|
||||
}
|
||||
// Update description from catalog if catalog has one
|
||||
if table_meta.comment.is_some() && existing_model.description != table_meta.comment {
|
||||
existing_model.description = table_meta.comment.clone(); model_was_updated = true;
|
||||
}
|
||||
// Update db/schema from catalog node, clearing if they match project defaults
|
||||
let cat_db_from_meta = &table_meta.database; // Option<String>
|
||||
let new_yaml_db = cat_db_from_meta.as_ref()
|
||||
.filter(|cat_db_val_str_ref| proj_default_database != Some(cat_db_val_str_ref.as_str()))
|
||||
.cloned();
|
||||
if existing_model.database != new_yaml_db {
|
||||
existing_model.database = new_yaml_db;
|
||||
model_was_updated = true;
|
||||
}
|
||||
|
||||
let cat_schema_from_meta = &table_meta.schema; // String
|
||||
let new_yaml_schema = if proj_default_schema.as_deref() == Some(cat_schema_from_meta.as_str()) {
|
||||
None
|
||||
} else {
|
||||
Some(cat_schema_from_meta.clone())
|
||||
};
|
||||
if existing_model.schema != new_yaml_schema {
|
||||
existing_model.schema = new_yaml_schema;
|
||||
model_was_updated = true;
|
||||
}
|
||||
|
||||
// For data_source_name, if it was manually set and matches project default, clear it.
|
||||
// Otherwise, preserve manual overrides. Catalog doesn't provide this.
|
||||
if let Some(default_ds_val_str) = proj_default_ds_name {
|
||||
if existing_model.data_source_name.as_deref() == Some(default_ds_val_str) {
|
||||
if existing_model.data_source_name.is_some() { // Only update if it changes from Some to None
|
||||
existing_model.data_source_name = None;
|
||||
// Preserve manual description, otherwise update from catalog if catalog has one.
|
||||
let placeholder_desc = "Description missing - please update.".to_string();
|
||||
match &existing_model.description {
|
||||
Some(existing_desc) if existing_desc != &placeholder_desc => {
|
||||
// Manual description exists and is not the placeholder, do nothing to preserve it.
|
||||
}
|
||||
_ => { // Existing is None or is the placeholder
|
||||
if table_meta.comment.is_some() && existing_model.description != table_meta.comment {
|
||||
existing_model.description = table_meta.comment.clone();
|
||||
model_was_updated = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Preserve manual database override, otherwise update from catalog.
|
||||
if existing_model.database.is_none() {
|
||||
let cat_db_from_meta = &table_meta.database; // Option<String>
|
||||
let new_yaml_db = cat_db_from_meta.as_ref()
|
||||
.filter(|cat_db_val_str_ref| proj_default_database != Some(cat_db_val_str_ref.as_str()))
|
||||
.cloned();
|
||||
if existing_model.database != new_yaml_db { // Check if it actually changes
|
||||
existing_model.database = new_yaml_db;
|
||||
model_was_updated = true;
|
||||
}
|
||||
} // If Some, it's preserved.
|
||||
|
||||
// Preserve manual schema override, otherwise update from catalog.
|
||||
if existing_model.schema.is_none() {
|
||||
let cat_schema_from_meta = &table_meta.schema; // String
|
||||
let new_yaml_schema = if proj_default_schema.as_deref() == Some(cat_schema_from_meta.as_str()) {
|
||||
None
|
||||
} else {
|
||||
Some(cat_schema_from_meta.clone())
|
||||
};
|
||||
if existing_model.schema != new_yaml_schema { // Check if it actually changes
|
||||
existing_model.schema = new_yaml_schema;
|
||||
model_was_updated = true;
|
||||
}
|
||||
} // If Some, it's preserved.
|
||||
|
||||
// Reconcile columns
|
||||
let mut current_dims: Vec<YamlDimension> = Vec::new();
|
||||
let mut current_measures: Vec<YamlMeasure> = Vec::new();
|
||||
let mut dbt_columns_map: HashMap<String, &ColumnMetadata> = catalog_node.columns.values().map(|c| (c.name.clone(), c)).collect();
|
||||
|
||||
for existing_dim_col in std::mem::take(&mut existing_model.dimensions) {
|
||||
if let Some(dbt_col) = dbt_columns_map.remove(&existing_dim_col.name) {
|
||||
let mut updated_dim = existing_dim_col.clone();
|
||||
for existing_dim in std::mem::take(&mut existing_model.dimensions) {
|
||||
if let Some(dbt_col) = dbt_columns_map.get(&existing_dim.name) { // Use .get() to keep it in map for measure pass
|
||||
let mut updated_dim = existing_dim.clone();
|
||||
let mut dim_col_updated = false;
|
||||
|
||||
if !crate::commands::init::is_measure_type(&dbt_col.type_) { // Still a dimension
|
||||
if updated_dim.type_.as_deref() != Some(&dbt_col.type_) {
|
||||
updated_dim.type_ = Some(dbt_col.type_.clone());
|
||||
dim_col_updated = true;
|
||||
// Preserve manual type if Some, otherwise update from catalog.
|
||||
if updated_dim.type_.is_none() {
|
||||
if updated_dim.type_.as_deref() != Some(&dbt_col.type_) { // Check if it actually changes
|
||||
updated_dim.type_ = Some(dbt_col.type_.clone());
|
||||
dim_col_updated = true;
|
||||
}
|
||||
}
|
||||
if dbt_col.comment.is_some() && updated_dim.description != dbt_col.comment {
|
||||
updated_dim.description = dbt_col.comment.clone();
|
||||
dim_col_updated = true;
|
||||
|
||||
// Preserve manual description if Some and not placeholder, otherwise update from catalog.
|
||||
let placeholder_col_desc = "Description missing - please update.".to_string();
|
||||
match &updated_dim.description {
|
||||
Some(existing_col_desc) if existing_col_desc != &placeholder_col_desc => {
|
||||
// Manual description exists and is not placeholder, do nothing.
|
||||
}
|
||||
_ => { // Existing is None or is placeholder
|
||||
let new_description_from_catalog = dbt_col.comment.as_ref().filter(|s| !s.is_empty()).cloned();
|
||||
if updated_dim.description != new_description_from_catalog {
|
||||
updated_dim.description = new_description_from_catalog.or_else(|| Some(placeholder_col_desc));
|
||||
dim_col_updated = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Preserve existing_dim.searchable and existing_dim.options, so no changes needed here for them.
|
||||
// If updated_dim.searchable was true, it remains true.
|
||||
// If updated_dim.options was Some, it remains Some.
|
||||
|
||||
if dim_col_updated { columns_updated_count +=1; model_was_updated = true; }
|
||||
current_dims.push(updated_dim);
|
||||
} else { // Was a dimension, but is now a measure according to dbt_col
|
||||
columns_removed_count += 1; model_was_updated = true; // Will be added as a new measure later
|
||||
dbt_columns_map.remove(&existing_dim.name); // Consume it now that it's processed as a dim
|
||||
} else { // Was a dimension, but is now a measure according to dbt_col type
|
||||
println!("{}", format!(" ✏️ Column '{}' changed from Dimension to Measure. It will be re-added as Measure.", existing_dim.name).yellow());
|
||||
columns_removed_count += 1; // Count as removed dimension
|
||||
model_was_updated = true;
|
||||
// Do not remove from dbt_columns_map yet, it will be picked up as a new measure.
|
||||
}
|
||||
} else { columns_removed_count += 1; model_was_updated = true; }
|
||||
} else { // Dimension no longer in catalog
|
||||
println!("{}", format!(" ➖ Dimension '{}' removed (not in catalog).", existing_dim.name).yellow());
|
||||
columns_removed_count += 1; model_was_updated = true;
|
||||
// dbt_columns_map.remove(&existing_dim.name); // Not needed, it's not in the map
|
||||
}
|
||||
}
|
||||
for existing_measure_col in std::mem::take(&mut existing_model.measures) {
|
||||
if let Some(dbt_col) = dbt_columns_map.remove(&existing_measure_col.name) {
|
||||
let mut updated_measure = existing_measure_col.clone();
|
||||
|
||||
for existing_measure in std::mem::take(&mut existing_model.measures) {
|
||||
if let Some(dbt_col) = dbt_columns_map.get(&existing_measure.name) { // Use .get() initially
|
||||
let mut updated_measure = existing_measure.clone();
|
||||
let mut measure_col_updated = false;
|
||||
|
||||
if crate::commands::init::is_measure_type(&dbt_col.type_) { // Still a measure
|
||||
if updated_measure.type_.as_deref() != Some(&dbt_col.type_) {
|
||||
updated_measure.type_ = Some(dbt_col.type_.clone());
|
||||
measure_col_updated = true;
|
||||
// Preserve manual type if Some, otherwise update from catalog.
|
||||
if updated_measure.type_.is_none() {
|
||||
if updated_measure.type_.as_deref() != Some(&dbt_col.type_) { // Check if it actually changes
|
||||
updated_measure.type_ = Some(dbt_col.type_.clone());
|
||||
measure_col_updated = true;
|
||||
}
|
||||
}
|
||||
if dbt_col.comment.is_some() && updated_measure.description != dbt_col.comment {
|
||||
updated_measure.description = dbt_col.comment.clone();
|
||||
measure_col_updated = true;
|
||||
|
||||
// Preserve manual description if Some and not placeholder, otherwise update from catalog.
|
||||
let placeholder_col_desc = "Description missing - please update.".to_string();
|
||||
match &updated_measure.description {
|
||||
Some(existing_col_desc) if existing_col_desc != &placeholder_col_desc => {
|
||||
// Manual description exists and is not placeholder, do nothing.
|
||||
}
|
||||
_ => { // Existing is None or is placeholder
|
||||
let new_description_from_catalog = dbt_col.comment.as_ref().filter(|s| !s.is_empty()).cloned();
|
||||
if updated_measure.description != new_description_from_catalog {
|
||||
updated_measure.description = new_description_from_catalog.or_else(|| Some(placeholder_col_desc));
|
||||
measure_col_updated = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if measure_col_updated { columns_updated_count +=1; model_was_updated = true; }
|
||||
current_measures.push(updated_measure);
|
||||
dbt_columns_map.remove(&existing_measure.name); // Consume it
|
||||
} else { // Was a measure, but is now a dimension
|
||||
columns_removed_count += 1; model_was_updated = true; // Will be added as a new dimension later
|
||||
println!("{}", format!(" ✏️ Column '{}' changed from Measure to Dimension. It will be re-added as Dimension.", existing_measure.name).cyan());
|
||||
columns_removed_count += 1; // Count as removed measure
|
||||
model_was_updated = true;
|
||||
// Do not remove from dbt_columns_map yet, it will be picked up as a new dimension.
|
||||
}
|
||||
} else { columns_removed_count += 1; model_was_updated = true; }
|
||||
} else { // Measure no longer in catalog
|
||||
println!("{}", format!(" ➖ Measure '{}' removed (not in catalog).", existing_measure.name).yellow());
|
||||
columns_removed_count += 1; model_was_updated = true;
|
||||
// dbt_columns_map.remove(&existing_measure.name); // Not needed
|
||||
}
|
||||
}
|
||||
for (_col_name, dbt_col) in dbt_columns_map { // Remaining are new columns
|
||||
if crate::commands::init::is_measure_type(&dbt_col.type_) {
|
||||
current_measures.push(YamlMeasure { name: dbt_col.name.clone(), description: dbt_col.comment.clone(), type_: Some(dbt_col.type_.clone()) });
|
||||
current_measures.push(YamlMeasure {
|
||||
name: dbt_col.name.clone(),
|
||||
description: dbt_col.comment.as_ref().filter(|s| !s.is_empty()).cloned().or_else(|| Some("Description missing - please update.".to_string())),
|
||||
type_: Some(dbt_col.type_.clone())
|
||||
});
|
||||
} else {
|
||||
current_dims.push(YamlDimension { name: dbt_col.name.clone(), description: dbt_col.comment.clone(), type_: Some(dbt_col.type_.clone()), searchable: false, options: None });
|
||||
current_dims.push(YamlDimension {
|
||||
name: dbt_col.name.clone(),
|
||||
description: dbt_col.comment.as_ref().filter(|s| !s.is_empty()).cloned().or_else(|| Some("Description missing - please update.".to_string())),
|
||||
type_: Some(dbt_col.type_.clone()),
|
||||
searchable: false, // Ensure searchable is false
|
||||
options: None
|
||||
});
|
||||
}
|
||||
columns_added_count += 1; model_was_updated = true;
|
||||
}
|
||||
|
@ -370,9 +436,19 @@ pub async fn generate_semantic_models_command(
|
|||
let mut measures = Vec::new();
|
||||
for (_col_name, col_meta) in &catalog_node.columns {
|
||||
if crate::commands::init::is_measure_type(&col_meta.type_) { // type_ is String
|
||||
measures.push(YamlMeasure { name: col_meta.name.clone(), description: col_meta.comment.clone(), type_: Some(col_meta.type_.clone()) });
|
||||
measures.push(YamlMeasure {
|
||||
name: col_meta.name.clone(),
|
||||
description: col_meta.comment.as_ref().filter(|s| !s.is_empty()).cloned().or_else(|| Some("Description missing - please update.".to_string())),
|
||||
type_: Some(col_meta.type_.clone())
|
||||
});
|
||||
} else {
|
||||
dimensions.push(YamlDimension { name: col_meta.name.clone(), description: col_meta.comment.clone(), type_: Some(col_meta.type_.clone()), searchable: false, options: None });
|
||||
dimensions.push(YamlDimension {
|
||||
name: col_meta.name.clone(),
|
||||
description: col_meta.comment.as_ref().filter(|s| !s.is_empty()).cloned().or_else(|| Some("Description missing - please update.".to_string())),
|
||||
type_: Some(col_meta.type_.clone()),
|
||||
searchable: false, // Ensure searchable is false
|
||||
options: None
|
||||
});
|
||||
}
|
||||
}
|
||||
let new_model = YamlModel {
|
||||
|
|
|
@ -211,14 +211,6 @@ fn default_model_paths() -> Vec<String> {
|
|||
pub fn parse_dbt_project_file_content(base_dir: &Path) -> Result<Option<DbtProjectFileContent>> {
|
||||
let dbt_project_path = base_dir.join("dbt_project.yml");
|
||||
if dbt_project_path.exists() && dbt_project_path.is_file() {
|
||||
println!(
|
||||
"{}",
|
||||
format!(
|
||||
"Found {}, attempting to read config for model paths and schemas...",
|
||||
dbt_project_path.display()
|
||||
)
|
||||
.dimmed()
|
||||
);
|
||||
match fs::read_to_string(&dbt_project_path) {
|
||||
Ok(content) => {
|
||||
match serde_yaml::from_str::<DbtProjectFileContent>(&content) {
|
||||
|
@ -438,7 +430,7 @@ pub async fn init(destination_path: Option<&str>) -> Result<()> {
|
|||
|
||||
if let Some(name) = &dbt_project_main_name_suggestion {
|
||||
println!(
|
||||
"{}",
|
||||
"\n{}",
|
||||
format!(
|
||||
"ℹ️ dbt_project.yml found. Suggesting data source name: '{}'",
|
||||
name.cyan()
|
||||
|
@ -463,11 +455,6 @@ pub async fn init(destination_path: Option<&str>) -> Result<()> {
|
|||
|
||||
let db_type = Select::new("Select your database type:", db_types).prompt()?;
|
||||
|
||||
println!(
|
||||
"{}",
|
||||
format!("➡️ You selected: {}", db_type.to_string().cyan()).dimmed()
|
||||
);
|
||||
|
||||
// --- Database specific setup --- (This section largely remains the same)
|
||||
// It will eventually call create_buster_config_file internally or return data for it.
|
||||
// For brevity, assuming it populates necessary details for BusterConfig.
|
||||
|
@ -859,6 +846,7 @@ async fn generate_semantic_models_from_dbt_catalog(
|
|||
let default_data_source_name = buster_config.projects.as_ref().and_then(|p| p.first()).and_then(|pc| pc.data_source_name.as_deref());
|
||||
let default_database = buster_config.projects.as_ref().and_then(|p| p.first()).and_then(|pc| pc.database.as_deref());
|
||||
let default_schema = buster_config.projects.as_ref().and_then(|p| p.first()).and_then(|pc| pc.schema.as_deref());
|
||||
let mut failed_models: Vec<(String, String)> = Vec::new(); // To store (path_or_name, reason)
|
||||
|
||||
for sql_file_abs_path in sql_files_to_process {
|
||||
let model_name_from_filename = sql_file_abs_path.file_stem().map_or_else(
|
||||
|
@ -867,26 +855,23 @@ async fn generate_semantic_models_from_dbt_catalog(
|
|||
);
|
||||
|
||||
if model_name_from_filename.is_empty() {
|
||||
eprintln!("{}", format!("⚠️ Warning: Could not determine model name from file: {}. Skipping.", sql_file_abs_path.display()).yellow());
|
||||
let warning_msg = format!("Could not determine model name from file: {}. Skipping.", sql_file_abs_path.display());
|
||||
eprintln!("{}", format!("⚠️ Warning: {}", warning_msg).yellow());
|
||||
failed_models.push((sql_file_abs_path.display().to_string(), warning_msg));
|
||||
continue;
|
||||
}
|
||||
|
||||
match catalog_nodes_by_name.get(&model_name_from_filename) {
|
||||
Some(catalog_node) => {
|
||||
let Some(ref node_metadata_opt) = catalog_node.metadata else {
|
||||
eprintln!("{}", format!("⚠️ Warning: Skipping model '{}' (from file {}): Missing metadata in catalog entry.", model_name_from_filename, sql_file_abs_path.display()).yellow());
|
||||
let warning_msg = format!("Skipping model '{}' (from file {}): Missing metadata in catalog entry.", model_name_from_filename, sql_file_abs_path.display());
|
||||
eprintln!("{}", format!("⚠️ Warning: {}", warning_msg).yellow());
|
||||
failed_models.push((sql_file_abs_path.display().to_string(), warning_msg));
|
||||
continue;
|
||||
};
|
||||
let node_metadata = node_metadata_opt; // Shadow to non-Option for easier access, already checked Some
|
||||
// actual_model_name for YamlModel comes from catalog metadata.name
|
||||
let actual_semantic_model_name = node_metadata.name.clone();
|
||||
|
||||
println!("➡️ Processing: {} (Catalog: {}, UniqueID: {})",
|
||||
sql_file_abs_path.display().to_string().cyan(),
|
||||
actual_semantic_model_name.purple(),
|
||||
catalog_node.unique_id.as_deref().unwrap_or("N/A").dimmed()
|
||||
);
|
||||
|
||||
let mut dimensions: Vec<YamlDimension> = Vec::new();
|
||||
let mut measures: Vec<YamlMeasure> = Vec::new();
|
||||
|
||||
|
@ -894,13 +879,13 @@ async fn generate_semantic_models_from_dbt_catalog(
|
|||
if is_measure_type(&col_meta.type_) {
|
||||
measures.push(YamlMeasure {
|
||||
name: col_meta.name.clone(),
|
||||
description: col_meta.comment.clone(),
|
||||
description: col_meta.comment.as_ref().filter(|s| !s.is_empty()).cloned().or_else(|| Some("Description missing - please update.".to_string())),
|
||||
type_: Some(col_meta.type_.clone()),
|
||||
});
|
||||
} else {
|
||||
dimensions.push(YamlDimension {
|
||||
name: col_meta.name.clone(),
|
||||
description: col_meta.comment.clone(),
|
||||
description: col_meta.comment.as_ref().filter(|s| !s.is_empty()).cloned().or_else(|| Some("Description missing - please update.".to_string())),
|
||||
type_: Some(col_meta.type_.clone()),
|
||||
searchable: false,
|
||||
options: None,
|
||||
|
@ -912,12 +897,11 @@ async fn generate_semantic_models_from_dbt_catalog(
|
|||
.map(|p| p.to_string_lossy().into_owned())
|
||||
.unwrap_or_else(|| sql_file_abs_path.to_string_lossy().into_owned());
|
||||
|
||||
// Determine database and schema for YAML, comparing with project defaults
|
||||
let yaml_database = node_metadata.database.as_ref()
|
||||
.filter(|catalog_db_val_str_ref| default_database != Some(catalog_db_val_str_ref.as_str()))
|
||||
.cloned();
|
||||
|
||||
let model_schema_from_catalog = &node_metadata.schema; // This is String
|
||||
let model_schema_from_catalog = &node_metadata.schema;
|
||||
let yaml_schema = if default_schema.as_deref() == Some(model_schema_from_catalog.as_str()) {
|
||||
None
|
||||
} else {
|
||||
|
@ -925,59 +909,106 @@ async fn generate_semantic_models_from_dbt_catalog(
|
|||
};
|
||||
|
||||
let yaml_model = YamlModel {
|
||||
name: actual_semantic_model_name, // Use name from catalog metadata
|
||||
name: actual_semantic_model_name.clone(),
|
||||
description: node_metadata.comment.clone(),
|
||||
data_source_name: None, // Per user request, dbt catalog doesn't provide this, so imply project default
|
||||
data_source_name: None,
|
||||
database: yaml_database,
|
||||
schema: yaml_schema,
|
||||
dimensions,
|
||||
measures,
|
||||
};
|
||||
|
||||
// Determine output path
|
||||
let output_yaml_path: PathBuf;
|
||||
if is_side_by_side_generation {
|
||||
output_yaml_path = sql_file_abs_path.with_extension("yml");
|
||||
} else if let Some(ref dedicated_dir) = primary_dedicated_output_dir {
|
||||
// Need to reconstruct subpath relative to a dbt model root (e.g. "models/")
|
||||
let dbt_model_source_roots_for_stripping = parse_dbt_project_file_content(buster_config_dir)?.as_ref()
|
||||
.map(|c| c.model_paths.iter().map(PathBuf::from).collect::<Vec<PathBuf>>())
|
||||
.unwrap_or_else(|| vec![PathBuf::from("models")]);
|
||||
let final_sub_path_for_yaml =
|
||||
if sql_file_abs_path.starts_with(dedicated_dir) {
|
||||
// Case: SQL file is AT or UNDER the dedicated_dir.
|
||||
// Example: dedicated_dir = /proj/models/mart
|
||||
// sql_file_abs_path = /proj/models/mart/sub/file.sql
|
||||
// We want final_sub_path_for_yaml = sub/file.yml (path relative to dedicated_dir)
|
||||
sql_file_abs_path.strip_prefix(dedicated_dir)
|
||||
.unwrap() // Should not fail due to starts_with check
|
||||
.with_extension("yml")
|
||||
} else {
|
||||
// Case: SQL file is ELSEWHERE, and dedicated_dir is a separate output target.
|
||||
// Example: dedicated_dir = /proj/semantic_output
|
||||
// sql_file_abs_path = /proj/models/mart/file.sql
|
||||
// We want final_sub_path_for_yaml = mart/file.yml (relative to its dbt model root)
|
||||
|
||||
let dbt_model_source_roots_for_stripping = match parse_dbt_project_file_content(buster_config_dir) {
|
||||
Ok(Some(c)) => c.model_paths.iter().map(PathBuf::from).collect::<Vec<PathBuf>>(),
|
||||
_ => vec![PathBuf::from("models")],
|
||||
};
|
||||
|
||||
let mut stripped_relative_to_dbt_root: Option<PathBuf> = None;
|
||||
for dbt_root_rel in &dbt_model_source_roots_for_stripping {
|
||||
let abs_dbt_model_root = buster_config_dir.join(dbt_root_rel);
|
||||
if let Ok(stripped) = sql_file_abs_path.strip_prefix(&abs_dbt_model_root) {
|
||||
stripped_relative_to_dbt_root = Some(stripped.with_extension("yml"));
|
||||
break;
|
||||
}
|
||||
}
|
||||
stripped_relative_to_dbt_root.unwrap_or_else(||
|
||||
PathBuf::from(&model_name_from_filename).with_extension("yml") // Fallback to flat file name
|
||||
)
|
||||
};
|
||||
|
||||
let mut stripped_suffix_for_yaml: Option<PathBuf> = None;
|
||||
for dbt_root in &dbt_model_source_roots_for_stripping {
|
||||
let abs_dbt_root = buster_config_dir.join(dbt_root);
|
||||
if let Ok(stripped) = sql_file_abs_path.strip_prefix(&abs_dbt_root) {
|
||||
stripped_suffix_for_yaml = Some(stripped.with_extension("yml"));
|
||||
break;
|
||||
}
|
||||
}
|
||||
let final_suffix = stripped_suffix_for_yaml.unwrap_or_else(||
|
||||
PathBuf::from(&model_name_from_filename).with_extension("yml")
|
||||
);
|
||||
output_yaml_path = dedicated_dir.join(final_suffix);
|
||||
} else { // Should not be reached due to earlier checks, but for safety:
|
||||
output_yaml_path = dedicated_dir.join(final_sub_path_for_yaml);
|
||||
|
||||
} else {
|
||||
// This case (not side-by-side but no primary_dedicated_output_dir) should ideally not happen if config is valid.
|
||||
// Defaulting to side-by-side for safety, though this indicates a potential config issue handled earlier in init.
|
||||
output_yaml_path = sql_file_abs_path.with_extension("yml");
|
||||
}
|
||||
|
||||
if let Some(parent_dir) = output_yaml_path.parent() {
|
||||
fs::create_dir_all(parent_dir).map_err(|e| anyhow!("Failed to create dir '{}': {}", parent_dir.display(), e))?;
|
||||
if let Err(e) = fs::create_dir_all(parent_dir) {
|
||||
let error_msg = format!("Failed to create dir '{}': {}", parent_dir.display(), e);
|
||||
eprintln!("{}", format!("❌ Error: {}", error_msg).red());
|
||||
failed_models.push((actual_semantic_model_name, error_msg));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
match serde_yaml::to_string(&yaml_model) {
|
||||
Ok(yaml_string) => {
|
||||
if let Err(e) = fs::write(&output_yaml_path, yaml_string) {
|
||||
let error_msg = format!("Failed to write YAML file '{}': {}", output_yaml_path.display(), e);
|
||||
eprintln!("{}", format!("❌ Error: {}", error_msg).red());
|
||||
failed_models.push((actual_semantic_model_name, error_msg));
|
||||
} else {
|
||||
yaml_models_generated_count += 1;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let error_msg = format!("Failed to serialize model '{}' to YAML: {}", actual_semantic_model_name, e);
|
||||
eprintln!("{}", format!("❌ Error: {}", error_msg).red());
|
||||
failed_models.push((actual_semantic_model_name, error_msg));
|
||||
}
|
||||
}
|
||||
let yaml_string = serde_yaml::to_string(&yaml_model)?;
|
||||
fs::write(&output_yaml_path, yaml_string)?;
|
||||
println!(" {} Generated semantic model: {}", "✅".green(), output_yaml_path.display().to_string().cyan());
|
||||
yaml_models_generated_count += 1;
|
||||
}
|
||||
None => {
|
||||
eprintln!("{}", format!("⚠️ Warning: SQL model file '{}' (model name: '{}') found, but no corresponding entry in dbt catalog. Skipping.", sql_file_abs_path.display(), model_name_from_filename).yellow());
|
||||
let warning_msg = format!("SQL model file '{}' (model name: '{}') found, but no corresponding entry in dbt catalog. Skipping.", sql_file_abs_path.display(), model_name_from_filename);
|
||||
eprintln!("{}", format!("⚠️ Warning: {}", warning_msg).yellow());
|
||||
failed_models.push((sql_file_abs_path.display().to_string(), warning_msg));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if yaml_models_generated_count == 0 {
|
||||
println!("{}", "\nℹ️ No semantic model YAML files were generated.".yellow());
|
||||
println!(); // Add a blank line for spacing before the summary
|
||||
if yaml_models_generated_count > 0 {
|
||||
println!("{}", format!("🎉 Successfully generated {} semantic model YAML file(s).", yaml_models_generated_count).bold().green());
|
||||
} else {
|
||||
println!("{}", format!("\n🎉 Successfully generated {} semantic model YAML file(s).", yaml_models_generated_count).bold().green());
|
||||
println!("{}", "ℹ️ No semantic model YAML files were successfully generated.".yellow());
|
||||
}
|
||||
|
||||
if !failed_models.is_empty() {
|
||||
println!("{}", format!("❌ Encountered issues with {} model(s):", failed_models.len()).bold().red());
|
||||
for (name_or_path, reason) in failed_models {
|
||||
println!(" - {}: {}", name_or_path.cyan(), reason.yellow());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -1051,7 +1082,7 @@ fn create_buster_config_file(
|
|||
}
|
||||
|
||||
if !potential_contexts_info.is_empty() {
|
||||
println!("{}", "ℹ️ Found the following potential model configurations in your dbt_project.yml:".dimmed());
|
||||
println!("\n{}", "ℹ️ Found the following potential model configurations in your dbt_project.yml:".dimmed());
|
||||
// Sort for consistent display
|
||||
potential_contexts_info.sort_by(|a, b| a.display_name.cmp(&b.display_name));
|
||||
|
||||
|
@ -1068,11 +1099,6 @@ fn create_buster_config_file(
|
|||
println!("{}", "No dbt configurations selected. Will prompt for manual model path configuration.".yellow());
|
||||
}
|
||||
for selected_info in infos {
|
||||
println!(
|
||||
" {}{}",
|
||||
"Selected: ".dimmed(),
|
||||
selected_info.display_name.cyan()
|
||||
);
|
||||
project_contexts.push(ProjectContext {
|
||||
name: None, // User wants None
|
||||
data_source_name: Some(data_source_name_cli.to_string()),
|
||||
|
@ -1096,13 +1122,13 @@ fn create_buster_config_file(
|
|||
|
||||
|
||||
if project_contexts.is_empty() {
|
||||
println!("{}", "ℹ️ No dbt-derived project contexts created. Proceeding with manual model path configuration...".yellow());
|
||||
println!("\n{}", "ℹ️ No dbt-derived project contexts created. Proceeding with manual model path configuration...".yellow());
|
||||
|
||||
let mut suggested_model_paths_str = "models".to_string();
|
||||
if let Some(dbt_content) = parse_dbt_project_file_content(buster_config_dir).ok().flatten() {
|
||||
if !dbt_content.model_paths.is_empty() && dbt_content.model_paths != vec!["models"] {
|
||||
suggested_model_paths_str = dbt_content.model_paths.join(",");
|
||||
println!("{}", format!("ℹ️ Suggesting model paths from dbt_project.yml global model-paths: {}", suggested_model_paths_str.cyan()).dimmed());
|
||||
println!("\n{}", format!("ℹ️ Suggesting model paths from dbt_project.yml global model-paths: {}", suggested_model_paths_str.cyan()).dimmed());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1213,7 +1239,7 @@ fn collect_potential_dbt_contexts_recursive(
|
|||
.or(top_level_schema_cli_default);
|
||||
|
||||
// Construct model paths for this specific group
|
||||
// current_config_path_segments: ["mart"] or ["staging", "jaffle_shop"]
|
||||
// current_config_path_segments: ["mart"] or ["staging/core"]
|
||||
// base_dbt_model_paths: ["models", "analysis_models"]
|
||||
// derived_model_paths should be: ["models/mart", "analysis_models/mart"] etc.
|
||||
let mut derived_model_paths_for_this_group: Vec<String> = Vec::new();
|
||||
|
@ -1294,7 +1320,7 @@ async fn setup_redshift(
|
|||
buster_api_key: String,
|
||||
suggested_name: Option<&str>,
|
||||
) -> Result<(String, String, Option<String>)> {
|
||||
println!("{}", "Setting up Redshift connection...".bold().green());
|
||||
println!("\n{}", "Setting up Redshift connection...".bold().green());
|
||||
let name = prompt_validated_name("Enter a unique name for this data source:", suggested_name)?;
|
||||
let host = prompt_required_text(
|
||||
"Enter the Redshift host:",
|
||||
|
@ -1310,15 +1336,6 @@ async fn setup_redshift(
|
|||
let database = prompt_required_text("Enter the default Redshift database:", None)?;
|
||||
let schema = prompt_required_text("Enter the default Redshift schema:", None)?;
|
||||
|
||||
println!("\n{}", "📝 Connection Summary:".bold());
|
||||
println!(" Name: {}", name.cyan());
|
||||
println!(" Host: {}", host.cyan());
|
||||
println!(" Port: {}", port.to_string().cyan());
|
||||
println!(" Username: {}", username.cyan());
|
||||
println!(" Password: {}", "********".cyan());
|
||||
println!(" Default Database: {}", database.cyan());
|
||||
println!(" Default Schema: {}", schema.cyan());
|
||||
|
||||
if Confirm::new("Do you want to create this data source in Buster Cloud?")
|
||||
.with_default(true)
|
||||
.prompt()?
|
||||
|
@ -1353,15 +1370,6 @@ async fn setup_postgres(
|
|||
let database = prompt_required_text("Enter the default PostgreSQL database name:", None)?;
|
||||
let schema = prompt_required_text("Enter the default PostgreSQL schema:", None)?;
|
||||
|
||||
println!("\n{}", "📝 Connection Summary:".bold());
|
||||
println!(" Name: {}", name.cyan());
|
||||
println!(" Host: {}", host.cyan());
|
||||
println!(" Port: {}", port.to_string().cyan());
|
||||
println!(" Username: {}", username.cyan());
|
||||
println!(" Password: {}", "********".cyan());
|
||||
println!(" Default Database: {}", database.cyan());
|
||||
println!(" Default Schema: {}", schema.cyan());
|
||||
|
||||
if Confirm::new("Do you want to create this data source in Buster Cloud?")
|
||||
.with_default(true)
|
||||
.prompt()?
|
||||
|
@ -1399,12 +1407,6 @@ async fn setup_bigquery(
|
|||
let credentials_content = fs::read_to_string(&credentials_path_str).map_err(|e| anyhow!("Failed to read credentials file '{}': {}", credentials_path_str, e))?;
|
||||
let credentials_json: serde_json::Value = serde_json::from_str(&credentials_content).map_err(|e| anyhow!("Invalid JSON in credentials file '{}': {}", credentials_path_str, e))?;
|
||||
|
||||
println!("\n{}", "📝 Connection Summary:".bold());
|
||||
println!(" Name: {}", name.cyan());
|
||||
println!(" Default Project ID: {}", project_id.cyan());
|
||||
println!(" Default Dataset ID: {}", dataset_id.cyan());
|
||||
println!(" Credentials File: {}", credentials_path_str.cyan());
|
||||
|
||||
if Confirm::new("Do you want to create this data source in Buster Cloud?")
|
||||
.with_default(true)
|
||||
.prompt()?
|
||||
|
@ -1435,14 +1437,6 @@ async fn setup_mysql(
|
|||
let database = prompt_required_text("Enter the default MySQL/MariaDB database name:", None)?;
|
||||
// No schema for MySQL
|
||||
|
||||
println!("\n{}", "📝 Connection Summary:".bold());
|
||||
println!(" Name: {}", name.cyan());
|
||||
println!(" Host: {}", host.cyan());
|
||||
println!(" Port: {}", port.to_string().cyan());
|
||||
println!(" Username: {}", username.cyan());
|
||||
println!(" Password: {}", "********".cyan());
|
||||
println!(" Default Database: {}", database.cyan());
|
||||
|
||||
if Confirm::new("Do you want to create this data source in Buster Cloud?")
|
||||
.with_default(true)
|
||||
.prompt()?
|
||||
|
@ -1473,15 +1467,6 @@ async fn setup_sqlserver(
|
|||
let database = prompt_required_text("Enter the default SQL Server database name:", None)?;
|
||||
let schema = prompt_required_text("Enter the default SQL Server schema:", None)?;
|
||||
|
||||
println!("\n{}", "📝 Connection Summary:".bold());
|
||||
println!(" Name: {}", name.cyan());
|
||||
println!(" Host: {}", host.cyan());
|
||||
println!(" Port: {}", port.to_string().cyan());
|
||||
println!(" Username: {}", username.cyan());
|
||||
println!(" Password: {}", "********".cyan());
|
||||
println!(" Default Database: {}", database.cyan());
|
||||
println!(" Default Schema: {}", schema.cyan());
|
||||
|
||||
if Confirm::new("Do you want to create this data source in Buster Cloud?")
|
||||
.with_default(true)
|
||||
.prompt()?
|
||||
|
@ -1511,14 +1496,6 @@ async fn setup_databricks(
|
|||
let catalog = prompt_required_text("Enter the default Databricks catalog:", None)?;
|
||||
let schema = prompt_required_text("Enter the default Databricks schema:", None)?;
|
||||
|
||||
println!("\n{}", "📝 Connection Summary:".bold());
|
||||
println!(" Name: {}", name.cyan());
|
||||
println!(" Host: {}", host.cyan());
|
||||
println!(" API Key: {}", "********".cyan());
|
||||
println!(" Warehouse ID: {}", warehouse_id.cyan());
|
||||
println!(" Default Catalog: {}", catalog.cyan());
|
||||
println!(" Default Schema: {}", schema.cyan());
|
||||
|
||||
if Confirm::new("Do you want to create this data source in Buster Cloud?")
|
||||
.with_default(true)
|
||||
.prompt()?
|
||||
|
@ -1551,18 +1528,6 @@ async fn setup_snowflake(
|
|||
let database = prompt_required_text("Enter the default Snowflake database name:", None)?;
|
||||
let schema = prompt_required_text("Enter the default Snowflake schema:", None)?;
|
||||
|
||||
println!("\n{}", "📝 Connection Summary:".bold());
|
||||
println!(" Name: {}", name.cyan());
|
||||
println!(" Account Identifier: {}", account_id.cyan());
|
||||
println!(" Warehouse: {}", warehouse_id.cyan());
|
||||
println!(" Username: {}", username.cyan());
|
||||
println!(" Password: {}", "********".cyan());
|
||||
if let Some(r) = &role_opt {
|
||||
println!(" Role: {}", r.cyan());
|
||||
}
|
||||
println!(" Default Database: {}", database.cyan());
|
||||
println!(" Default Schema: {}", schema.cyan());
|
||||
|
||||
if Confirm::new("Do you want to create this data source in Buster Cloud?")
|
||||
.with_default(true)
|
||||
.prompt()?
|
||||
|
|
Loading…
Reference in New Issue