mirror of https://github.com/buster-so/buster.git
ok most everything is there
This commit is contained in:
parent
79a2d7cb04
commit
70b92895eb
|
@ -399,99 +399,122 @@ pub async fn deploy(path: Option<&str>, dry_run: bool, recursive: bool) -> Resul
|
|||
if let Some(ref cfg) = buster_config {
|
||||
if let Some(ref projects) = cfg.projects {
|
||||
for project_ctx in projects {
|
||||
if let Some(ref semantic_models_file_str) = project_ctx.semantic_models_file {
|
||||
println!(
|
||||
"ℹ️ Using semantic_models_file for project '{}': {}",
|
||||
project_ctx.identifier().cyan(),
|
||||
semantic_models_file_str.cyan()
|
||||
);
|
||||
let semantic_spec_path = effective_buster_config_dir.join(semantic_models_file_str);
|
||||
if let Some(ref semantic_model_dirs) = project_ctx.semantic_model_paths {
|
||||
for semantic_models_dir_str in semantic_model_dirs {
|
||||
println!(
|
||||
"ℹ️ Using semantic model directory for project '{}': {}",
|
||||
project_ctx.identifier().cyan(),
|
||||
semantic_models_dir_str.cyan()
|
||||
);
|
||||
let semantic_models_dir_path = effective_buster_config_dir.join(semantic_models_dir_str);
|
||||
|
||||
if !semantic_spec_path.exists() {
|
||||
// Log error for this specific project and continue to next or fallback
|
||||
let error_msg = format!("Specified semantic_models_file not found for project '{}': {}", project_ctx.identifier(), semantic_spec_path.display());
|
||||
eprintln!("❌ {}", error_msg.red());
|
||||
result.failures.push((
|
||||
semantic_spec_path.to_string_lossy().into_owned(),
|
||||
format!("project_{}", project_ctx.identifier()),
|
||||
vec![format!("File not found: {}", semantic_spec_path.display())]
|
||||
));
|
||||
continue; // Continue to the next project or fallback if this was the last one
|
||||
}
|
||||
|
||||
progress.current_file = semantic_spec_path.to_string_lossy().into_owned();
|
||||
progress.status = format!("Loading semantic layer specification for project '{}'...", project_ctx.identifier());
|
||||
progress.log_progress();
|
||||
|
||||
let spec = match parse_semantic_layer_spec(&semantic_spec_path) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
progress.log_error(&format!("Failed to parse semantic layer spec for project '{}': {}", project_ctx.identifier(), e));
|
||||
if !semantic_models_dir_path.is_dir() {
|
||||
let error_msg = format!("Specified semantic model path is not a directory or does not exist for project '{}': {}", project_ctx.identifier(), semantic_models_dir_path.display());
|
||||
eprintln!("❌ {}", error_msg.red());
|
||||
result.failures.push((
|
||||
progress.current_file.clone(),
|
||||
format!("project_{}_spec_level", project_ctx.identifier()),
|
||||
vec![e.to_string()]
|
||||
semantic_models_dir_path.to_string_lossy().into_owned(),
|
||||
format!("project_{}_dir_not_found", project_ctx.identifier()),
|
||||
vec![error_msg]
|
||||
));
|
||||
continue; // Continue to the next project or fallback
|
||||
continue; // Continue to the next directory or project
|
||||
}
|
||||
};
|
||||
progress.total_files += spec.models.len(); // Accumulate total files
|
||||
processed_models_from_spec = true;
|
||||
|
||||
// Resolve configurations for all models in the spec using the current project_ctx
|
||||
let models_with_context: Vec<(Model, Option<&ProjectContext>)> = spec.models.into_iter()
|
||||
.map(|m| (m, Some(project_ctx)))
|
||||
.collect();
|
||||
|
||||
let resolved_models = match resolve_model_configurations(models_with_context, cfg) { // cfg is the global BusterConfig
|
||||
Ok(models) => models,
|
||||
Err(e) => {
|
||||
progress.log_error(&format!("Configuration resolution failed for spec in project '{}': {}", project_ctx.identifier(), e));
|
||||
result.failures.push((
|
||||
progress.current_file.clone(),
|
||||
format!("project_{}_config_resolution", project_ctx.identifier()),
|
||||
vec![e.to_string()]
|
||||
));
|
||||
continue; // Continue to the next project or fallback
|
||||
}
|
||||
};
|
||||
|
||||
for model in resolved_models {
|
||||
progress.processed += 1;
|
||||
progress.current_file = format!("{} (from {} in project '{}')", model.name, semantic_spec_path.file_name().unwrap_or_default().to_string_lossy(), project_ctx.identifier());
|
||||
progress.status = format!("Processing model '{}'", model.name);
|
||||
progress.log_progress();
|
||||
|
||||
let sql_content = match get_sql_content_for_model(&model, &effective_buster_config_dir, &semantic_spec_path) {
|
||||
Ok(content) => content,
|
||||
// Scan this directory for .yml files
|
||||
// Using a temporary ExclusionManager as deploy_individual_yml_files does, or simplify if not needed here.
|
||||
let exclusion_manager = ExclusionManager::new(cfg).unwrap_or_else(|_| ExclusionManager::empty());
|
||||
let yml_files_in_dir = match find_yml_files(&semantic_models_dir_path, true, &exclusion_manager, Some(&mut progress)) { // Assuming recursive scan for now
|
||||
Ok(files) => files,
|
||||
Err(e) => {
|
||||
progress.log_error(&format!("Failed to get SQL for model {}: {}", model.name, e));
|
||||
result.failures.push((progress.current_file.clone(),model.name.clone(),vec![e.to_string()]));
|
||||
continue;
|
||||
progress.log_error(&format!("Failed to scan for YML files in directory '{}' for project '{}': {}", semantic_models_dir_path.display(), project_ctx.identifier(), e));
|
||||
result.failures.push((
|
||||
semantic_models_dir_path.to_string_lossy().into_owned(),
|
||||
format!("project_{}_scan_failed", project_ctx.identifier()),
|
||||
vec![e.to_string()]
|
||||
));
|
||||
continue; // Next directory or project
|
||||
}
|
||||
};
|
||||
|
||||
if yml_files_in_dir.is_empty() {
|
||||
println!("ℹ️ No .yml files found in directory: {}", semantic_models_dir_path.display());
|
||||
continue;
|
||||
}
|
||||
|
||||
model_mappings_final.push(ModelMapping {
|
||||
file: semantic_spec_path.file_name().unwrap_or_default().to_string_lossy().into_owned(),
|
||||
model_name: model.name.clone()
|
||||
});
|
||||
deploy_requests_final.push(to_deploy_request(&model, sql_content));
|
||||
progress.log_success();
|
||||
processed_models_from_spec = true; // Mark that we are processing based on config
|
||||
progress.total_files += yml_files_in_dir.len();
|
||||
|
||||
for yml_file_path in yml_files_in_dir {
|
||||
progress.current_file = yml_file_path.strip_prefix(&effective_buster_config_dir).unwrap_or(&yml_file_path).to_string_lossy().into_owned();
|
||||
progress.status = format!("Loading models from '{}' in project '{}'...", yml_file_path.file_name().unwrap_or_default().to_string_lossy(), project_ctx.identifier());
|
||||
progress.log_progress();
|
||||
|
||||
let parsed_models = match parse_model_file(&yml_file_path) {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
progress.log_error(&format!("Failed to parse model file '{}': {}", yml_file_path.display(), e));
|
||||
result.failures.push((progress.current_file.clone(), "parse_failed".to_string(), vec![e.to_string()]));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let models_with_context: Vec<(Model, Option<&ProjectContext>)> = parsed_models.into_iter()
|
||||
.map(|m| (m, Some(project_ctx)))
|
||||
.collect();
|
||||
|
||||
let resolved_models = match resolve_model_configurations(models_with_context, cfg) {
|
||||
Ok(models) => models,
|
||||
Err(e) => {
|
||||
progress.log_error(&format!("Config resolution for '{}': {}", yml_file_path.display(), e));
|
||||
result.failures.push((progress.current_file.clone(), "config_resolution_failed".to_string(), vec![e.to_string()]));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
for model in resolved_models {
|
||||
progress.processed += 1;
|
||||
progress.current_file = format!("{} (from {} in project '{}')", model.name, yml_file_path.file_name().unwrap_or_default().to_string_lossy(), project_ctx.identifier());
|
||||
progress.status = format!("Processing model '{}'", model.name);
|
||||
progress.log_progress();
|
||||
|
||||
let sql_content = match get_sql_content_for_model(&model, &effective_buster_config_dir, &yml_file_path) {
|
||||
Ok(content) => content,
|
||||
Err(e) => {
|
||||
progress.log_error(&format!("Failed to get SQL for model {}: {}", model.name, e));
|
||||
result.failures.push((progress.current_file.clone(),model.name.clone(),vec![e.to_string()]));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
model_mappings_final.push(ModelMapping {
|
||||
file: yml_file_path.file_name().unwrap_or_default().to_string_lossy().into_owned(),
|
||||
model_name: model.name.clone()
|
||||
});
|
||||
deploy_requests_final.push(to_deploy_request(&model, sql_content));
|
||||
progress.log_success();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// This project_ctx has no semantic_model_paths defined.
|
||||
// It will be handled by the fallback mechanism if no other projects define paths.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- FALLBACK or ADDITIONAL: Scan for individual .yml files ---
|
||||
// This runs if no semantic_models_file was processed from any project,
|
||||
// or to supplement if specific logic allows (currently, it runs if processed_models_from_spec is false).
|
||||
if !processed_models_from_spec {
|
||||
if buster_config.as_ref().map_or(false, |cfg| cfg.projects.as_ref().map_or(false, |p| p.iter().any(|pc| pc.semantic_models_file.is_some()))) {
|
||||
// This case means semantic_models_file was specified in some project but all failed to load/process.
|
||||
println!("⚠️ A semantic_models_file was specified in buster.yml project(s) but failed to process. Now attempting to scan for individual .yml files.");
|
||||
// Check if any project *attempted* to specify paths, to adjust message
|
||||
let any_project_had_paths_configured = buster_config.as_ref().map_or(false, |cfg|
|
||||
cfg.projects.as_ref().map_or(false, |p_vec|
|
||||
p_vec.iter().any(|pc| pc.semantic_model_paths.as_ref().map_or(false, |paths| !paths.is_empty()))
|
||||
)
|
||||
);
|
||||
|
||||
if any_project_had_paths_configured {
|
||||
println!("⚠️ Semantic model paths were specified in buster.yml project(s) but may have failed to yield models or directories were empty/inaccessible. Now attempting to scan for individual .yml files based on broader model_paths configuration.");
|
||||
} else if buster_config.is_some() {
|
||||
println!("ℹ️ No semantic_models_file specified in any project in buster.yml. Falling back to scanning for individual .yml files.");
|
||||
println!("ℹ️ No specific semantic_model_paths found or processed from projects in buster.yml. Falling back to scanning for individual .yml files based on model_paths.");
|
||||
} else {
|
||||
println!("ℹ️ No buster.yml loaded. Scanning current/target directory for individual .yml files.");
|
||||
}
|
||||
|
@ -879,7 +902,7 @@ models:
|
|||
exclude_tags: None,
|
||||
model_paths: None,
|
||||
name: Some("Test Project".to_string()),
|
||||
semantic_models_file: None,
|
||||
semantic_model_paths: None,
|
||||
};
|
||||
|
||||
let global_config = BusterConfig {
|
||||
|
|
|
@ -18,7 +18,7 @@ use glob::{Pattern};
|
|||
|
||||
pub async fn generate_semantic_models_command(
|
||||
path_arg: Option<String>,
|
||||
target_semantic_file_arg: Option<String>,
|
||||
target_output_dir_arg: Option<String>,
|
||||
) -> Result<()> {
|
||||
println!(
|
||||
"{}",
|
||||
|
@ -26,7 +26,7 @@ pub async fn generate_semantic_models_command(
|
|||
);
|
||||
|
||||
// 1. Determine Buster configuration directory (where buster.yml is or should be)
|
||||
// For now, assume current directory. This might need to be more sophisticated if target_semantic_file_arg implies a different project.
|
||||
// For now, assume current directory. This might need to be more sophisticated if target_output_dir_arg implies a different project.
|
||||
let buster_config_dir = std::env::current_dir().context("Failed to get current directory")?;
|
||||
|
||||
// 2. Load BusterConfig
|
||||
|
@ -43,46 +43,48 @@ pub async fn generate_semantic_models_command(
|
|||
}
|
||||
};
|
||||
|
||||
// 3. Determine target semantic YAML file path
|
||||
let semantic_models_file_path_str = match target_semantic_file_arg {
|
||||
Some(path_str) => path_str,
|
||||
None => match buster_config.projects.as_ref().and_then(|projects| projects.first()) {
|
||||
Some(project) => project.semantic_models_file.clone().unwrap_or_else(|| "models.yml".to_string()),
|
||||
None => {
|
||||
return Err(anyhow!(
|
||||
"No target semantic model file specified and 'semantic_models_file' not set in buster.yml. \nPlease use the --output-file option or configure buster.yml via 'buster init'."
|
||||
));
|
||||
}
|
||||
}
|
||||
};
|
||||
// Resolve the path: if it's absolute, use it. If relative, resolve from buster_config_dir.
|
||||
let semantic_models_file_path = if Path::new(&semantic_models_file_path_str).is_absolute() {
|
||||
PathBuf::from(&semantic_models_file_path_str)
|
||||
// 3. Determine target semantic YAML base directory and generation mode
|
||||
let mut is_side_by_side_generation = false;
|
||||
let effective_semantic_models_base_dir: PathBuf; // Base for path construction
|
||||
|
||||
if let Some(path_str) = target_output_dir_arg {
|
||||
// User specified an output directory via CLI arg. Not side-by-side.
|
||||
effective_semantic_models_base_dir = if Path::new(&path_str).is_absolute() {
|
||||
PathBuf::from(path_str)
|
||||
} else {
|
||||
buster_config_dir.join(path_str)
|
||||
};
|
||||
println!("Target semantic models base directory (from CLI arg): {}", effective_semantic_models_base_dir.display().to_string().cyan());
|
||||
fs::create_dir_all(&effective_semantic_models_base_dir).with_context(|| format!("Failed to create semantic models base directory: {}", effective_semantic_models_base_dir.display()))?;
|
||||
} else {
|
||||
buster_config_dir.join(&semantic_models_file_path_str)
|
||||
};
|
||||
// No CLI arg, check buster.yml config
|
||||
let configured_semantic_paths = buster_config.projects.as_ref()
|
||||
.and_then(|projs| projs.first())
|
||||
.and_then(|proj| proj.semantic_model_paths.as_ref());
|
||||
|
||||
println!("Target semantic model file: {}", semantic_models_file_path.display().to_string().cyan());
|
||||
|
||||
// 4. Load existing semantic models from the target file (if it exists)
|
||||
let mut existing_yaml_models_map: HashMap<String, YamlModel> = if semantic_models_file_path.exists() {
|
||||
println!("Loading existing semantic models from {}", semantic_models_file_path.display());
|
||||
let content = fs::read_to_string(&semantic_models_file_path)
|
||||
.with_context(|| format!("Failed to read existing semantic model file: {}", semantic_models_file_path.display()))?;
|
||||
|
||||
if content.trim().is_empty() {
|
||||
println!("{}", "Existing semantic model file is empty.".yellow());
|
||||
HashMap::new()
|
||||
if configured_semantic_paths.map_or(true, |paths| paths.is_empty()) { // Default to side-by-side if None or empty list
|
||||
is_side_by_side_generation = true;
|
||||
effective_semantic_models_base_dir = buster_config_dir.clone(); // Project root is the base for side-by-side
|
||||
println!("Semantic models will be generated side-by-side with SQL models (base: {}).", effective_semantic_models_base_dir.display().to_string().cyan());
|
||||
// No specific single base directory to create for all YAMLs in this mode.
|
||||
} else {
|
||||
let spec: YamlSemanticLayerSpec = serde_yaml::from_str(&content)
|
||||
.with_context(|| format!("Failed to parse existing semantic model file: {}. Ensure it is a valid YAML with a top-level 'models:' key.", semantic_models_file_path.display()))?;
|
||||
spec.models.into_iter().map(|m| (m.name.clone(), m)).collect()
|
||||
// Configured path(s) exist, use the first one. Not side-by-side.
|
||||
let first_path_str = configured_semantic_paths.unwrap().first().unwrap(); // Safe due to map_or and is_empty checks
|
||||
effective_semantic_models_base_dir = if Path::new(first_path_str).is_absolute() {
|
||||
PathBuf::from(first_path_str)
|
||||
} else {
|
||||
buster_config_dir.join(first_path_str)
|
||||
};
|
||||
println!("Target semantic models base directory (from buster.yml): {}", effective_semantic_models_base_dir.display().to_string().cyan());
|
||||
fs::create_dir_all(&effective_semantic_models_base_dir).with_context(|| format!("Failed to create semantic models base directory: {}", effective_semantic_models_base_dir.display()))?;
|
||||
}
|
||||
} else {
|
||||
println!("{}", "No existing semantic model file found. Will generate a new one.".yellow());
|
||||
HashMap::new()
|
||||
};
|
||||
let initial_model_count = existing_yaml_models_map.len();
|
||||
}
|
||||
|
||||
// 4. Load existing semantic models - THIS LOGIC WILL CHANGE SIGNIFICANTLY.
|
||||
// For now, we clear it as we load 1-to-1.
|
||||
let mut existing_yaml_models_map: HashMap<String, YamlModel> = HashMap::new();
|
||||
|
||||
let initial_model_count = 0; // This will be re-evaluated based on files found
|
||||
|
||||
// 5. Run dbt docs generate (similar to init.rs)
|
||||
let dbt_project_path = &buster_config_dir; // Assuming buster.yml is at the root of dbt project
|
||||
|
@ -153,14 +155,72 @@ pub async fn generate_semantic_models_command(
|
|||
let mut columns_updated_count = 0;
|
||||
let mut columns_removed_count = 0;
|
||||
|
||||
let mut processed_dbt_model_names: HashSet<String> = HashSet::new();
|
||||
let mut processed_dbt_model_unique_ids: HashSet<String> = HashSet::new(); // Using unique_id for tracking
|
||||
|
||||
for (dbt_node_id, dbt_node) in dbt_catalog.nodes.iter().filter(|(_,n)| n.resource_type == "model") {
|
||||
let dbt_model_name = dbt_node.metadata.name.clone();
|
||||
processed_dbt_model_names.insert(dbt_model_name.clone());
|
||||
// Get dbt model source roots for path stripping (similar to init.rs)
|
||||
let dbt_project_file_content_for_paths = crate::commands::init::parse_dbt_project_file_content(&buster_config_dir)?;
|
||||
let dbt_model_source_roots: Vec<PathBuf> = dbt_project_file_content_for_paths.as_ref()
|
||||
.map(|content| content.model_paths.iter().map(PathBuf::from).collect())
|
||||
.unwrap_or_else(|| vec![PathBuf::from("models")]);
|
||||
|
||||
// --- Scoping logic --- Apply path_arg and configured_model_path_patterns ---
|
||||
let dbt_original_file_path_abs = buster_config_dir.join(&dbt_node.original_file_path);
|
||||
for (dbt_node_id, dbt_node) in dbt_catalog.nodes.iter().filter(|(_,n)| {
|
||||
match &n.resource_type {
|
||||
Some(rt) => rt == "model",
|
||||
None => {
|
||||
eprintln!(
|
||||
"{}",
|
||||
format!(
|
||||
"Warning: Skipping dbt node with unique_id: {} because it is missing 'resource_type' in catalog.json.",
|
||||
n.unique_id
|
||||
).yellow()
|
||||
);
|
||||
false
|
||||
}
|
||||
}
|
||||
}) {
|
||||
// Path construction for individual YAML
|
||||
let Some(ref dbt_original_file_path_str) = dbt_node.original_file_path else {
|
||||
eprintln!("{}", format!("Warning: Skipping dbt model {} due to missing 'original_file_path'.", dbt_node.unique_id).yellow());
|
||||
continue;
|
||||
};
|
||||
|
||||
let dbt_model_path_obj = Path::new(dbt_original_file_path_str);
|
||||
let mut relative_to_dbt_model_root = PathBuf::new();
|
||||
let mut found_base_for_stripping = false;
|
||||
for dbt_source_root in &dbt_model_source_roots { // dbt_source_root is e.g. "models"
|
||||
if let Ok(stripped_path) = dbt_model_path_obj.strip_prefix(dbt_source_root) {
|
||||
relative_to_dbt_model_root = stripped_path.to_path_buf(); // e.g. "marts/sales/revenue.sql"
|
||||
found_base_for_stripping = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !found_base_for_stripping {
|
||||
// Fallback: if original_file_path_str didn't start with any known dbt_model_source_roots,
|
||||
// then use original_file_path_str as is for the suffix part for dedicated dir mode.
|
||||
// For side-by-side, the full original path is used anyway.
|
||||
relative_to_dbt_model_root = dbt_model_path_obj.to_path_buf();
|
||||
eprintln!("{}", format!(
|
||||
"Warning: Could not strip a known dbt model source root ('{:?}') from dbt model path '{}'. Using full path for suffix calculation: '{}'",
|
||||
dbt_model_source_roots, dbt_original_file_path_str, relative_to_dbt_model_root.display()
|
||||
).yellow()
|
||||
);
|
||||
}
|
||||
|
||||
let individual_semantic_yaml_path: PathBuf;
|
||||
if is_side_by_side_generation {
|
||||
// Side-by-side: YAML is next to SQL. dbt_original_file_path_str is relative to buster_config_dir.
|
||||
individual_semantic_yaml_path = buster_config_dir.join(dbt_original_file_path_str).with_extension("yml");
|
||||
} else {
|
||||
// Dedicated output directory (effective_semantic_models_base_dir)
|
||||
// relative_to_dbt_model_root is the path part after the dbt model source root (e.g. "marts/sales/revenue.sql")
|
||||
let yaml_filename_with_subdir = relative_to_dbt_model_root.with_extension("yml"); // e.g. "marts/sales/revenue.yml"
|
||||
individual_semantic_yaml_path = effective_semantic_models_base_dir.join(yaml_filename_with_subdir);
|
||||
}
|
||||
|
||||
processed_dbt_model_unique_ids.insert(dbt_node.unique_id.clone()); // Store unique_id
|
||||
|
||||
// --- Scoping logic (remains similar, but applied before file load) ---
|
||||
let dbt_original_file_path_abs = buster_config_dir.join(dbt_original_file_path_str);
|
||||
let is_in_configured_model_paths = configured_model_path_patterns.is_empty() ||
|
||||
configured_model_path_patterns.iter().any(|p| p.matches_path(&dbt_original_file_path_abs));
|
||||
|
||||
|
@ -173,117 +233,156 @@ pub async fn generate_semantic_models_command(
|
|||
dbt_original_file_path_abs.starts_with(&target_path_abs)
|
||||
}
|
||||
}
|
||||
None => true, // No path_arg, so all models (that match buster.yml model_paths) are in scope
|
||||
None => true,
|
||||
};
|
||||
|
||||
if !is_in_configured_model_paths || !is_in_path_arg_scope {
|
||||
// println!("Skipping dbt model {} (not in scope of generate command or buster.yml model_paths)", dbt_model_name.dimmed());
|
||||
continue;
|
||||
}
|
||||
|
||||
// Ensure metadata.name exists, as it's crucial for the semantic model name
|
||||
let Some(ref dbt_model_name_for_yaml_from_metadata) = dbt_node.metadata.name else {
|
||||
eprintln!(
|
||||
"{}",
|
||||
format!(
|
||||
"Warning: Skipping dbt model with unique_id: {} because its 'metadata.name' is missing in catalog.json.",
|
||||
dbt_node.unique_id
|
||||
).yellow()
|
||||
);
|
||||
continue;
|
||||
};
|
||||
let dbt_model_name_for_yaml = dbt_model_name_for_yaml_from_metadata.clone(); // Now safe to clone
|
||||
|
||||
dbt_models_processed_count += 1;
|
||||
// --- End Scoping Logic ---
|
||||
|
||||
match existing_yaml_models_map.get_mut(&dbt_model_name) {
|
||||
Some(mut existing_semantic_model) => {
|
||||
let existing_semantic_model_opt: Option<YamlModel> = if individual_semantic_yaml_path.exists() {
|
||||
match fs::read_to_string(&individual_semantic_yaml_path) {
|
||||
Ok(content) => {
|
||||
match serde_yaml::from_str::<YamlModel>(&content) {
|
||||
Ok(model) => Some(model),
|
||||
Err(e) => {
|
||||
eprintln!("{}", format!("Warning: Failed to parse existing semantic YAML '{}': {}. Will attempt to overwrite.", individual_semantic_yaml_path.display(), e).yellow());
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("{}", format!("Warning: Failed to read existing semantic YAML '{}': {}. Will attempt to create anew.", individual_semantic_yaml_path.display(), e).yellow());
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
match existing_semantic_model_opt {
|
||||
Some(mut existing_model) => {
|
||||
// Existing model: Update it
|
||||
let mut model_was_updated = false;
|
||||
println!("Updating existing semantic model: {}", dbt_model_name.cyan());
|
||||
println!("Updating existing semantic model: {} at {}", dbt_model_name_for_yaml.cyan(), individual_semantic_yaml_path.display());
|
||||
|
||||
// Update description if dbt comment exists and is different
|
||||
if let Some(dbt_comment) = &dbt_node.metadata.comment {
|
||||
if existing_semantic_model.description.as_deref() != Some(dbt_comment.as_str()) {
|
||||
println!(" Updating description for model {}", dbt_model_name);
|
||||
existing_semantic_model.description = Some(dbt_comment.clone());
|
||||
model_was_updated = true;
|
||||
}
|
||||
} // If dbt_comment is None, we keep user's existing description
|
||||
|
||||
// Update original_file_path
|
||||
if existing_semantic_model.original_file_path.as_deref() != Some(dbt_node.original_file_path.as_str()) {
|
||||
existing_semantic_model.original_file_path = Some(dbt_node.original_file_path.clone());
|
||||
if existing_model.name != dbt_model_name_for_yaml {
|
||||
// This might happen if filename and inner model name differ. We prioritize dbt_model_name_for_yaml.
|
||||
// Or if user manually changed name in YML. For now, dbt catalog is source of truth for name.
|
||||
println!(" Aligning name in YAML from '{}' to '{}'", existing_model.name, dbt_model_name_for_yaml);
|
||||
existing_model.name = dbt_model_name_for_yaml.clone();
|
||||
model_was_updated = true;
|
||||
}
|
||||
|
||||
// Update DB/Schema from dbt catalog if present
|
||||
// ... (add logic for database/schema update based on dbt_node.database/schema) ...
|
||||
if let Some(dbt_comment) = &dbt_node.metadata.comment {
|
||||
if existing_model.description.as_deref() != Some(dbt_comment.as_str()) {
|
||||
existing_model.description = Some(dbt_comment.clone());
|
||||
model_was_updated = true;
|
||||
}
|
||||
} // Consider if dbt_comment=None should clear existing_model.description
|
||||
|
||||
if existing_model.original_file_path.as_deref() != Some(dbt_original_file_path_str.as_str()) {
|
||||
existing_model.original_file_path = Some(dbt_original_file_path_str.clone());
|
||||
model_was_updated = true;
|
||||
}
|
||||
// Update DB/Schema if different - dbt catalog is source of truth
|
||||
if existing_model.database != dbt_node.database {
|
||||
existing_model.database = dbt_node.database.clone();
|
||||
model_was_updated = true;
|
||||
}
|
||||
if existing_model.schema != dbt_node.schema {
|
||||
existing_model.schema = dbt_node.schema.clone();
|
||||
model_was_updated = true;
|
||||
}
|
||||
|
||||
// Reconcile columns
|
||||
let mut current_dims: Vec<YamlDimension> = Vec::new();
|
||||
let mut current_measures: Vec<YamlMeasure> = Vec::new();
|
||||
let mut dbt_columns_map: HashMap<String, &DbtColumn> = dbt_node.columns.values().map(|c| (c.name.clone(), c)).collect();
|
||||
|
||||
// Process existing dimensions
|
||||
for existing_dim in std::mem::take(&mut existing_semantic_model.dimensions) {
|
||||
if let Some(dbt_col) = dbt_columns_map.remove(&existing_dim.name) {
|
||||
let mut updated_dim = existing_dim.clone();
|
||||
let mut dim_updated = false;
|
||||
for existing_dim_col in std::mem::take(&mut existing_model.dimensions) {
|
||||
if let Some(dbt_col) = dbt_columns_map.remove(&existing_dim_col.name) {
|
||||
let mut updated_dim = existing_dim_col.clone();
|
||||
let mut dim_col_updated = false;
|
||||
if updated_dim.type_.as_deref() != Some(dbt_col.column_type.as_str()) {
|
||||
updated_dim.type_ = Some(dbt_col.column_type.clone());
|
||||
dim_updated = true; columns_updated_count +=1;
|
||||
dim_col_updated = true; columns_updated_count +=1;
|
||||
}
|
||||
if let Some(dbt_col_comment) = &dbt_col.comment {
|
||||
if updated_dim.description.as_deref() != Some(dbt_col_comment.as_str()) {
|
||||
updated_dim.description = Some(dbt_col_comment.clone());
|
||||
dim_updated = true; columns_updated_count +=1;
|
||||
dim_col_updated = true; columns_updated_count +=1;
|
||||
}
|
||||
} // else keep user's existing_dim.description
|
||||
current_dims.push(updated_dim);
|
||||
if dim_updated { model_was_updated = true; }
|
||||
if dim_col_updated { model_was_updated = true; }
|
||||
} else {
|
||||
println!(" Removing dimension '{}' from model '{}' (no longer in dbt model)", existing_dim.name.yellow(), dbt_model_name);
|
||||
println!(" Removing dimension '{}' from semantic model '{}' (no longer in dbt model)", existing_dim_col.name.yellow(), dbt_model_name_for_yaml);
|
||||
columns_removed_count += 1; model_was_updated = true;
|
||||
}
|
||||
}
|
||||
// Process existing measures (similar logic)
|
||||
for existing_measure in std::mem::take(&mut existing_semantic_model.measures) {
|
||||
if let Some(dbt_col) = dbt_columns_map.remove(&existing_measure.name) {
|
||||
let mut updated_measure = existing_measure.clone();
|
||||
let mut measure_updated = false;
|
||||
for existing_measure_col in std::mem::take(&mut existing_model.measures) {
|
||||
if let Some(dbt_col) = dbt_columns_map.remove(&existing_measure_col.name) {
|
||||
let mut updated_measure = existing_measure_col.clone();
|
||||
let mut measure_col_updated = false;
|
||||
if updated_measure.type_.as_deref() != Some(dbt_col.column_type.as_str()) {
|
||||
updated_measure.type_ = Some(dbt_col.column_type.clone());
|
||||
measure_updated = true; columns_updated_count +=1;
|
||||
measure_col_updated = true; columns_updated_count +=1;
|
||||
}
|
||||
if let Some(dbt_col_comment) = &dbt_col.comment {
|
||||
if updated_measure.description.as_deref() != Some(dbt_col_comment.as_str()) {
|
||||
updated_measure.description = Some(dbt_col_comment.clone());
|
||||
measure_updated = true; columns_updated_count +=1;
|
||||
measure_col_updated = true; columns_updated_count +=1;
|
||||
}
|
||||
} // else keep user's description
|
||||
current_measures.push(updated_measure);
|
||||
if measure_updated { model_was_updated = true; }
|
||||
if measure_col_updated { model_was_updated = true; }
|
||||
} else {
|
||||
println!(" Removing measure '{}' from model '{}' (no longer in dbt model)", existing_measure.name.yellow(), dbt_model_name);
|
||||
println!(" Removing measure '{}' from semantic model '{}' (no longer in dbt model)", existing_measure_col.name.yellow(), dbt_model_name_for_yaml);
|
||||
columns_removed_count += 1; model_was_updated = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Add new columns from dbt_node not yet processed
|
||||
for (col_name, dbt_col) in dbt_columns_map {
|
||||
println!(" Adding new column '{}' to model '{}'", col_name.green(), dbt_model_name);
|
||||
println!(" Adding new column '{}' to semantic model '{}'", col_name.green(), dbt_model_name_for_yaml);
|
||||
if is_measure_type(&dbt_col.column_type) {
|
||||
current_measures.push(YamlMeasure {
|
||||
name: dbt_col.name.clone(),
|
||||
description: dbt_col.comment.clone(),
|
||||
type_: Some(dbt_col.column_type.clone()),
|
||||
});
|
||||
current_measures.push(YamlMeasure { name: dbt_col.name.clone(), description: dbt_col.comment.clone(), type_: Some(dbt_col.column_type.clone()) });
|
||||
} else {
|
||||
current_dims.push(YamlDimension {
|
||||
name: dbt_col.name.clone(),
|
||||
description: dbt_col.comment.clone(),
|
||||
type_: Some(dbt_col.column_type.clone()),
|
||||
searchable: false, // Default for new dimensions
|
||||
options: None,
|
||||
});
|
||||
current_dims.push(YamlDimension { name: dbt_col.name.clone(), description: dbt_col.comment.clone(), type_: Some(dbt_col.column_type.clone()), searchable: false, options: None });
|
||||
}
|
||||
columns_added_count += 1; model_was_updated = true;
|
||||
}
|
||||
existing_semantic_model.dimensions = current_dims;
|
||||
existing_semantic_model.measures = current_measures;
|
||||
if model_was_updated { models_updated_count += 1; }
|
||||
existing_model.dimensions = current_dims;
|
||||
existing_model.measures = current_measures;
|
||||
|
||||
if model_was_updated {
|
||||
models_updated_count += 1;
|
||||
let yaml_string = serde_yaml::to_string(&existing_model).context(format!("Failed to serialize updated semantic model {} to YAML", existing_model.name))?;
|
||||
if let Some(parent_dir) = individual_semantic_yaml_path.parent() { fs::create_dir_all(parent_dir)?; }
|
||||
fs::write(&individual_semantic_yaml_path, yaml_string).context(format!("Failed to write updated semantic model to {}", individual_semantic_yaml_path.display()))?;
|
||||
} else {
|
||||
println!(" No changes detected for semantic model: {}", dbt_model_name_for_yaml);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// New model: Generate from scratch
|
||||
println!("Found new dbt model: {}. Generating semantic model definition.", dbt_model_name.green());
|
||||
// New semantic model: Generate from scratch
|
||||
println!("Generating new semantic model: {} at {}", dbt_model_name_for_yaml.green(), individual_semantic_yaml_path.display());
|
||||
let mut dimensions = Vec::new();
|
||||
let mut measures = Vec::new();
|
||||
for (_col_name, col) in &dbt_node.columns {
|
||||
|
@ -294,22 +393,25 @@ pub async fn generate_semantic_models_command(
|
|||
}
|
||||
}
|
||||
let new_model = YamlModel {
|
||||
name: dbt_model_name.clone(),
|
||||
name: dbt_model_name_for_yaml.clone(),
|
||||
description: dbt_node.metadata.comment.clone(),
|
||||
data_source_name: None, // Will be resolved by deploy or could use buster_config defaults
|
||||
data_source_name: buster_config.projects.as_ref().and_then(|p|p.first()).and_then(|pc|pc.data_source_name.clone()), // Default from first project context
|
||||
database: dbt_node.database.clone(),
|
||||
schema: dbt_node.schema.clone(),
|
||||
dimensions,
|
||||
measures,
|
||||
original_file_path: Some(dbt_node.original_file_path.clone()),
|
||||
original_file_path: Some(dbt_original_file_path_str.clone()),
|
||||
};
|
||||
existing_yaml_models_map.insert(dbt_model_name, new_model);
|
||||
let yaml_string = serde_yaml::to_string(&new_model).context(format!("Failed to serialize new semantic model {} to YAML", new_model.name))?;
|
||||
if let Some(parent_dir) = individual_semantic_yaml_path.parent() { fs::create_dir_all(parent_dir)?; }
|
||||
fs::write(&individual_semantic_yaml_path, yaml_string).context(format!("Failed to write new semantic model to {}", individual_semantic_yaml_path.display()))?;
|
||||
new_models_added_count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Identify and remove models that are in semantic_models_file but no longer in dbt catalog (or not in scope)
|
||||
// Remove or comment out the old logic for handling removed models from a single spec file
|
||||
/*
|
||||
let mut removed_models_count = 0;
|
||||
existing_yaml_models_map.retain(|model_name: &String, _model: &mut YamlModel| {
|
||||
if processed_dbt_model_names.contains(model_name) {
|
||||
|
@ -320,27 +422,31 @@ pub async fn generate_semantic_models_command(
|
|||
false
|
||||
}
|
||||
});
|
||||
*/
|
||||
|
||||
// 8. Save updated semantic models
|
||||
let final_models_vec: Vec<YamlModel> = existing_yaml_models_map.values().cloned().collect();
|
||||
let updated_spec = YamlSemanticLayerSpec { models: final_models_vec };
|
||||
|
||||
let yaml_string = serde_yaml::to_string(&updated_spec).context("Failed to serialize updated semantic models to YAML")?;
|
||||
if let Some(parent_dir) = semantic_models_file_path.parent() {
|
||||
fs::create_dir_all(parent_dir).with_context(|| format!("Failed to create directory for semantic models file: {}", parent_dir.display()))?;
|
||||
}
|
||||
fs::write(&semantic_models_file_path, yaml_string).with_context(|| format!("Failed to write updated semantic models to {}", semantic_models_file_path.display()))?;
|
||||
// Remove the final save logic for the aggregated spec file
|
||||
// let final_models_vec: Vec<YamlModel> = existing_yaml_models_map.values().cloned().collect();
|
||||
// let updated_spec = YamlSemanticLayerSpec { models: final_models_vec };
|
||||
// let yaml_string = serde_yaml::to_string(&updated_spec).context("Failed to serialize updated semantic models to YAML")?;
|
||||
// fs::write(&semantic_models_base_dir_path, yaml_string).context(format!("Failed to write updated semantic models to {}", semantic_models_base_dir_path.display()))?;
|
||||
// Note: The above fs::write was to semantic_models_base_dir_path which is a directory, this was an error in previous diff. It should have been semantic_models_file_path.
|
||||
// Since we save per file, this block is removed.
|
||||
|
||||
println!("\n{}", "Semantic Model Generation Summary:".bold().green());
|
||||
println!(" Processed dbt models (in scope): {}", dbt_models_processed_count);
|
||||
println!(" Semantic models initially loaded: {}", initial_model_count);
|
||||
println!(" New semantic models added: {}", new_models_added_count.to_string().green());
|
||||
println!(" Existing semantic models updated: {}", models_updated_count.to_string().cyan());
|
||||
println!(" Semantic models removed (dbt model deleted/out of scope): {}", removed_models_count.to_string().red());
|
||||
println!(" Semantic models removed (dbt model deleted/out of scope): {}", columns_removed_count.to_string().red());
|
||||
println!(" Columns added: {}", columns_added_count.to_string().green());
|
||||
println!(" Columns updated (type/dbt_comment): {}", columns_updated_count.to_string().cyan());
|
||||
println!(" Columns removed: {}", columns_removed_count.to_string().red());
|
||||
println!("✓ Semantic models successfully updated at {}", semantic_models_file_path.display().to_string().green());
|
||||
|
||||
if is_side_by_side_generation {
|
||||
println!("✓ Semantic models successfully updated (side-by-side with SQL models, base directory: {}).", effective_semantic_models_base_dir.display().to_string().green());
|
||||
} else {
|
||||
println!("✓ Semantic models successfully updated in {}.", effective_semantic_models_base_dir.display().to_string().green());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -127,26 +127,27 @@ struct DbtModelGroupConfig {
|
|||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone, Default)]
|
||||
struct DbtProjectModelsBlock {
|
||||
pub struct DbtProjectModelsBlock {
|
||||
#[serde(flatten)]
|
||||
project_configs: HashMap<String, DbtModelGroupConfig>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone, Default)]
|
||||
pub struct DbtProjectFileContent {
|
||||
name: Option<String>,
|
||||
#[serde(rename = "model-paths", default = "default_model_paths")]
|
||||
pub model_paths: Vec<String>,
|
||||
#[serde(default)]
|
||||
models: Option<DbtProjectModelsBlock>,
|
||||
}
|
||||
|
||||
fn default_model_paths() -> Vec<String> {
|
||||
vec!["models".to_string()]
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone, Default)]
|
||||
struct DbtProjectFileContent {
|
||||
name: Option<String>,
|
||||
#[serde(rename = "model-paths", default = "default_model_paths")]
|
||||
model_paths: Vec<String>,
|
||||
#[serde(default)]
|
||||
models: Option<DbtProjectModelsBlock>,
|
||||
}
|
||||
|
||||
// Helper function to parse dbt_project.yml if it exists
|
||||
fn parse_dbt_project_file_content(base_dir: &Path) -> Result<Option<DbtProjectFileContent>> {
|
||||
// Make this function public so it can be called from generate.rs
|
||||
pub fn parse_dbt_project_file_content(base_dir: &Path) -> Result<Option<DbtProjectFileContent>> {
|
||||
let dbt_project_path = base_dir.join("dbt_project.yml");
|
||||
if dbt_project_path.exists() && dbt_project_path.is_file() {
|
||||
println!(
|
||||
|
@ -490,58 +491,66 @@ pub async fn init(destination_path: Option<&str>) -> Result<()> {
|
|||
.with_default(true)
|
||||
.prompt()?
|
||||
{
|
||||
// Default directory for semantic models:
|
||||
// Try to use the first model_path from the first project context, if available.
|
||||
let default_semantic_models_dir = current_buster_config.projects.as_ref()
|
||||
// Default directory for semantic models: "" for side-by-side
|
||||
let default_semantic_models_dirs_str = current_buster_config.projects.as_ref()
|
||||
.and_then(|projs| projs.first())
|
||||
.and_then(|proj| proj.model_paths.as_ref())
|
||||
.and_then(|paths| paths.first())
|
||||
.map(|p| Path::new(p).parent().unwrap_or_else(|| Path::new(p)).to_string_lossy().into_owned()) // Use parent of first model path, or the path itself
|
||||
.unwrap_or_else(|| "./buster_semantic_models".to_string());
|
||||
.and_then(|proj| proj.semantic_model_paths.as_ref())
|
||||
.filter(|paths| !paths.is_empty()) // Only join if paths exist and are not empty
|
||||
.map(|paths| paths.join(","))
|
||||
.unwrap_or_else(String::new); // Default to empty string for side-by-side
|
||||
|
||||
let semantic_models_dirs_input_str = Text::new("Enter directory/directories for generated semantic model YAML files (comma-separated, leave empty for side-by-side with SQL files):")
|
||||
.with_default(&default_semantic_models_dirs_str)
|
||||
.with_help_message("Example: ./semantic_layer (for dedicated dir) or empty (for side-by-side)")
|
||||
.prompt()?;
|
||||
|
||||
let semantic_models_dir_str = Text::new("Enter directory for generated semantic model YAML files:")
|
||||
.with_default(&default_semantic_models_dir)
|
||||
.with_help_message("Example: ./semantic_layer or ./models")
|
||||
.prompt()?;
|
||||
let semantic_models_filename_str = Text::new("Enter filename for the main semantic models YAML file:")
|
||||
.with_default("models.yml") // Keep models.yml as a common default name
|
||||
.with_help_message("Example: main_spec.yml or buster_models.yml")
|
||||
.prompt()?;
|
||||
|
||||
let semantic_output_path = PathBuf::from(&semantic_models_dir_str).join(&semantic_models_filename_str);
|
||||
|
||||
// Ensure the output directory exists
|
||||
if let Some(parent_dir) = semantic_output_path.parent() {
|
||||
fs::create_dir_all(parent_dir).map_err(|e| {
|
||||
anyhow!("Failed to create directory for semantic models YAML '{}': {}", parent_dir.display(), e)
|
||||
let semantic_model_paths_vec = semantic_models_dirs_input_str
|
||||
.split(',')
|
||||
.map(|s| s.trim().to_string())
|
||||
.filter(|s| !s.is_empty())
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
// If semantic_model_paths_vec is empty, it implies side-by-side generation.
|
||||
// No error here, this is a valid configuration.
|
||||
|
||||
if !semantic_model_paths_vec.is_empty() {
|
||||
// Only create primary output directory if a specific path is given (not side-by-side)
|
||||
let primary_semantic_models_dir_str = semantic_model_paths_vec.first().unwrap().clone(); // Must exist due to !is_empty()
|
||||
let primary_semantic_output_dir_abs = dest_path.join(&primary_semantic_models_dir_str);
|
||||
fs::create_dir_all(&primary_semantic_output_dir_abs).map_err(|e| {
|
||||
anyhow!("Failed to create primary directory for semantic models YAML '{}': {}", primary_semantic_output_dir_abs.display(), e)
|
||||
})?;
|
||||
println!("{} {}", "✓".green(), format!("Ensured directory exists: {}", parent_dir.display()).dimmed());
|
||||
println!("{} {}", "✓".green(), format!("Ensured primary semantic model directory exists: {}", primary_semantic_output_dir_abs.display()).dimmed());
|
||||
} else {
|
||||
println!("{}", "Semantic models will be generated side-by-side with their SQL counterparts.".dimmed());
|
||||
}
|
||||
|
||||
let relative_semantic_path = match pathdiff::diff_paths(&semantic_output_path, &dest_path) {
|
||||
Some(p) => p.to_string_lossy().into_owned(),
|
||||
None => {
|
||||
eprintln!("{}", "Could not determine relative path for semantic models file. Using absolute path.".yellow());
|
||||
semantic_output_path.to_string_lossy().into_owned()
|
||||
|
||||
// Store relative paths in the config
|
||||
let relative_semantic_model_paths = semantic_model_paths_vec.iter().map(|p_str| {
|
||||
let p_path = PathBuf::from(p_str);
|
||||
match pathdiff::diff_paths(&p_path, &dest_path) {
|
||||
Some(p) => p.to_string_lossy().into_owned(),
|
||||
None => {
|
||||
eprintln!("{}", format!("Could not determine relative path for semantic model directory '{}'. Using path as is.", p_str).yellow());
|
||||
p_str.clone()
|
||||
}
|
||||
}
|
||||
};
|
||||
}).collect::<Vec<String>>();
|
||||
|
||||
// Store in the first project context
|
||||
if let Some(projects) = current_buster_config.projects.as_mut() {
|
||||
if let Some(first_project) = projects.first_mut() {
|
||||
first_project.semantic_models_file = Some(relative_semantic_path.clone());
|
||||
first_project.semantic_model_paths = Some(relative_semantic_model_paths.clone());
|
||||
} else {
|
||||
// This case should ideally not happen if create_buster_config_file always creates a project
|
||||
eprintln!("{}", "Warning: No project contexts found in buster.yml to store semantic_models_file path.".yellow());
|
||||
// Optionally, create a default project here if necessary, or rely on create_buster_config_file to have done its job
|
||||
eprintln!("{}", "Warning: No project contexts found in buster.yml to store semantic_model_paths.".yellow());
|
||||
}
|
||||
} else {
|
||||
eprintln!("{}", "Warning: 'projects' array is None in buster.yml. Cannot store semantic_models_file path.".yellow());
|
||||
eprintln!("{}", "Warning: 'projects' array is None in buster.yml. Cannot store semantic_model_paths.".yellow());
|
||||
}
|
||||
|
||||
current_buster_config.save(&config_path).map_err(|e| anyhow!("Failed to save buster.yml with semantic model path: {}", e))?;
|
||||
println!("{} {} {}", "✓".green(), "Updated buster.yml with semantic_models_file path in the first project:".green(), relative_semantic_path.cyan());
|
||||
current_buster_config.save(&config_path).map_err(|e| anyhow!("Failed to save buster.yml with semantic model paths: {}", e))?;
|
||||
println!("{} {} {}: {}", "✓".green(), "Updated buster.yml with".green(), "semantic_model_paths".cyan(), relative_semantic_model_paths.join(", ").cyan());
|
||||
|
||||
generate_semantic_models_from_dbt_catalog(¤t_buster_config, &config_path, &dest_path).await?;
|
||||
}
|
||||
|
@ -552,59 +561,67 @@ pub async fn init(destination_path: Option<&str>) -> Result<()> {
|
|||
|
||||
// Helper function to manage the flow of semantic model generation
|
||||
async fn generate_semantic_models_flow(buster_config: &mut BusterConfig, config_path: &Path, buster_config_dir: &Path) -> Result<()> {
|
||||
let default_dir = "./buster_semantic_models";
|
||||
let default_file = "models.yml";
|
||||
let default_dirs_str = String::new(); // Default to empty string for side-by-side
|
||||
|
||||
// Try to get defaults from the first project context's semantic_models_file
|
||||
let (initial_dir, initial_file) = buster_config.projects.as_ref()
|
||||
// Try to get defaults from the first project context's semantic_model_paths
|
||||
let initial_dirs_str = buster_config.projects.as_ref()
|
||||
.and_then(|projs| projs.first())
|
||||
.and_then(|proj| proj.semantic_models_file.as_ref())
|
||||
.map(|p_str| {
|
||||
let pth = Path::new(p_str);
|
||||
let dir = pth.parent().and_then(|pp| pp.to_str()).unwrap_or(default_dir);
|
||||
let file = pth.file_name().and_then(|f| f.to_str()).unwrap_or(default_file);
|
||||
(dir.to_string(), file.to_string())
|
||||
})
|
||||
.unwrap_or((default_dir.to_string(), default_file.to_string()));
|
||||
.and_then(|proj| proj.semantic_model_paths.as_ref())
|
||||
.filter(|paths| !paths.is_empty()) // Only join if paths exist and are not empty
|
||||
.map(|paths| paths.join(","))
|
||||
.unwrap_or(default_dirs_str);
|
||||
|
||||
let semantic_models_dir_str = Text::new("Enter directory for generated semantic model YAML files:")
|
||||
.with_default(&initial_dir)
|
||||
.prompt()?;
|
||||
let semantic_models_filename_str = Text::new("Enter filename for the main semantic models YAML file:")
|
||||
.with_default(&initial_file)
|
||||
let semantic_models_dirs_input_str = Text::new("Enter directory/directories for generated semantic model YAML files (comma-separated, leave empty for side-by-side):")
|
||||
.with_default(&initial_dirs_str)
|
||||
.prompt()?;
|
||||
|
||||
let semantic_output_path = PathBuf::from(&semantic_models_dir_str).join(&semantic_models_filename_str);
|
||||
let semantic_model_paths_vec = semantic_models_dirs_input_str
|
||||
.split(',')
|
||||
.map(|s| s.trim().to_string())
|
||||
.filter(|s| !s.is_empty())
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
// Ensure the output directory exists
|
||||
if let Some(parent_dir) = semantic_output_path.parent() {
|
||||
fs::create_dir_all(parent_dir).map_err(|e| {
|
||||
anyhow!("Failed to create directory for semantic models YAML '{}': {}", parent_dir.display(), e)
|
||||
// If semantic_model_paths_vec is empty, it implies side-by-side generation.
|
||||
// No error here.
|
||||
|
||||
if !semantic_model_paths_vec.is_empty() {
|
||||
let primary_semantic_models_dir_str = semantic_model_paths_vec.first().unwrap().clone();
|
||||
let primary_semantic_output_dir_abs = buster_config_dir.join(&primary_semantic_models_dir_str);
|
||||
|
||||
// Ensure the primary output directory exists
|
||||
fs::create_dir_all(&primary_semantic_output_dir_abs).map_err(|e| {
|
||||
anyhow!("Failed to create primary directory for semantic models YAML '{}': {}", primary_semantic_output_dir_abs.display(), e)
|
||||
})?;
|
||||
println!("{} {}", "✓".green(), format!("Ensured directory exists: {}", parent_dir.display()).dimmed());
|
||||
println!("{} {}", "✓".green(), format!("Ensured primary semantic model directory exists: {}", primary_semantic_output_dir_abs.display()).dimmed());
|
||||
} else {
|
||||
println!("{}", "Semantic models will be generated side-by-side with their SQL counterparts.".dimmed());
|
||||
}
|
||||
|
||||
let relative_semantic_path = match pathdiff::diff_paths(&semantic_output_path, buster_config_dir) {
|
||||
Some(p) => p.to_string_lossy().into_owned(),
|
||||
None => {
|
||||
eprintln!("{}", "Could not determine relative path for semantic models file. Using absolute path.".yellow());
|
||||
semantic_output_path.to_string_lossy().into_owned()
|
||||
// Store relative paths in the config
|
||||
let relative_semantic_model_paths = semantic_model_paths_vec.iter().map(|p_str| {
|
||||
let p_path = PathBuf::from(p_str);
|
||||
match pathdiff::diff_paths(&p_path, buster_config_dir) {
|
||||
Some(p) => p.to_string_lossy().into_owned(),
|
||||
None => {
|
||||
eprintln!("{}", format!("Could not determine relative path for semantic model directory '{}' relative to '{}'. Using path as is.", p_path.display(), buster_config_dir.display()).yellow());
|
||||
p_str.clone()
|
||||
}
|
||||
}
|
||||
};
|
||||
}).collect::<Vec<String>>();
|
||||
|
||||
// Store in the first project context
|
||||
if let Some(projects) = buster_config.projects.as_mut() {
|
||||
if let Some(first_project) = projects.first_mut() {
|
||||
first_project.semantic_models_file = Some(relative_semantic_path.clone());
|
||||
first_project.semantic_model_paths = Some(relative_semantic_model_paths.clone());
|
||||
} else {
|
||||
eprintln!("{}", "Warning: No project contexts found in buster.yml to update semantic_models_file path.".yellow());
|
||||
eprintln!("{}", "Warning: No project contexts found in buster.yml to update semantic_model_paths.".yellow());
|
||||
}
|
||||
} else {
|
||||
eprintln!("{}", "Warning: 'projects' array is None in buster.yml. Cannot update semantic_models_file path.".yellow());
|
||||
eprintln!("{}", "Warning: 'projects' array is None in buster.yml. Cannot update semantic_model_paths.".yellow());
|
||||
}
|
||||
|
||||
buster_config.save(config_path).map_err(|e| anyhow!("Failed to save buster.yml with semantic model path: {}", e))?;
|
||||
println!("{} {} {}", "✓".green(), "Updated buster.yml with semantic_models_file path in the first project:".green(), relative_semantic_path.cyan());
|
||||
buster_config.save(config_path).map_err(|e| anyhow!("Failed to save buster.yml with semantic model paths: {}", e))?;
|
||||
println!("{} {} {}: {}", "✓".green(), "Updated buster.yml with".green(), "semantic_model_paths".cyan(), relative_semantic_model_paths.join(", ").cyan());
|
||||
|
||||
generate_semantic_models_from_dbt_catalog(buster_config, config_path, buster_config_dir).await
|
||||
}
|
||||
|
@ -613,18 +630,49 @@ async fn generate_semantic_models_flow(buster_config: &mut BusterConfig, config_
|
|||
// Placeholder for the main logic function
|
||||
async fn generate_semantic_models_from_dbt_catalog(
|
||||
buster_config: &BusterConfig,
|
||||
_config_path: &Path, // Path to buster.yml (config_path is not directly used for choosing semantic_models_file anymore)
|
||||
_config_path: &Path, // Path to buster.yml
|
||||
buster_config_dir: &Path, // Directory containing buster.yml, assumed dbt project root
|
||||
) -> Result<()> {
|
||||
println!("{}", "Starting semantic model generation from dbt catalog...".dimmed());
|
||||
|
||||
// Get semantic_models_file from the first project context
|
||||
let semantic_output_path_str = buster_config.projects.as_ref()
|
||||
// Get the semantic model output configuration from the first project context
|
||||
let project_semantic_model_paths_config = buster_config.projects.as_ref()
|
||||
.and_then(|projs| projs.first())
|
||||
.and_then(|proj| proj.semantic_models_file.as_ref())
|
||||
.ok_or_else(|| anyhow!("Semantic models file path not set in any project context within BusterConfig. This should have been prompted."))?;
|
||||
.and_then(|proj| proj.semantic_model_paths.as_ref());
|
||||
|
||||
let is_side_by_side_generation = project_semantic_model_paths_config.map_or(true, |paths| paths.is_empty());
|
||||
|
||||
let path_construction_base_dir: PathBuf; // Base directory for constructing output paths
|
||||
|
||||
if is_side_by_side_generation {
|
||||
path_construction_base_dir = buster_config_dir.to_path_buf(); // Project root is the base for side-by-side
|
||||
println!("{}", format!("Semantic models will be generated side-by-side with SQL models (within '{}').", path_construction_base_dir.display()).dimmed());
|
||||
} else {
|
||||
// A specific directory (or directories) was configured for semantic models. Use the first one.
|
||||
let primary_path_str = project_semantic_model_paths_config.unwrap().first().unwrap(); // Safe due to map_or check
|
||||
path_construction_base_dir = buster_config_dir.join(primary_path_str);
|
||||
println!("{}", format!("Semantic models will be generated in/under: {}", path_construction_base_dir.display()).dimmed());
|
||||
// Ensure this specific output directory exists
|
||||
fs::create_dir_all(&path_construction_base_dir).map_err(|e| {
|
||||
anyhow!("Failed to create semantic models output directory '{}': {}", path_construction_base_dir.display(), e)
|
||||
})?;
|
||||
}
|
||||
|
||||
let semantic_output_path = buster_config_dir.join(semantic_output_path_str);
|
||||
// Get dbt model source roots (e.g., ["models", "my_other_models"])
|
||||
// These are paths relative to the dbt_project_path (buster_config_dir)
|
||||
let dbt_project_content = parse_dbt_project_file_content(buster_config_dir)?;
|
||||
let dbt_model_source_roots: Vec<PathBuf> = dbt_project_content.as_ref()
|
||||
.map(|content| content.model_paths.iter().map(PathBuf::from).collect())
|
||||
.unwrap_or_else(|| vec![PathBuf::from("models")]); // Default if not found
|
||||
|
||||
// Get defaults from the primary project context for model properties
|
||||
let primary_project_context = buster_config.projects.as_ref().and_then(|p| p.first());
|
||||
let default_data_source_name = primary_project_context
|
||||
.and_then(|pc| pc.data_source_name.as_ref());
|
||||
let default_database = primary_project_context
|
||||
.and_then(|pc| pc.database.as_ref());
|
||||
let default_schema = primary_project_context
|
||||
.and_then(|pc| pc.schema.as_ref());
|
||||
|
||||
let dbt_project_path = buster_config_dir;
|
||||
let catalog_json_path = dbt_project_path.join("target").join("catalog.json");
|
||||
|
@ -710,23 +758,52 @@ async fn generate_semantic_models_from_dbt_catalog(
|
|||
}
|
||||
// --- End Model Scoping Logic ---
|
||||
|
||||
let mut yaml_models: Vec<YamlModel> = Vec::new();
|
||||
let primary_project_context = buster_config.projects.as_ref().and_then(|p| p.first());
|
||||
|
||||
// These defaults are now primarily for the model properties themselves if not set in dbt,
|
||||
// data_source_name should come from the project context more directly.
|
||||
let default_data_source_name = primary_project_context
|
||||
.and_then(|pc| pc.data_source_name.as_ref());
|
||||
let default_database = primary_project_context
|
||||
.and_then(|pc| pc.database.as_ref());
|
||||
let default_schema = primary_project_context
|
||||
.and_then(|pc| pc.schema.as_ref());
|
||||
let mut yaml_models_generated_count = 0;
|
||||
|
||||
for (_node_id, node) in dbt_catalog.nodes.iter().filter(|(_id, n)| n.resource_type == "model") {
|
||||
let original_file_path_abs = buster_config_dir.join(&node.original_file_path);
|
||||
for (_node_id, node) in dbt_catalog.nodes.iter().filter(|(_id, n)| {
|
||||
match &n.resource_type {
|
||||
Some(rt) => rt == "model",
|
||||
None => {
|
||||
eprintln!(
|
||||
"{}",
|
||||
format!(
|
||||
"Warning: Skipping dbt node with unique_id: {} because it is missing 'resource_type' in catalog.json.",
|
||||
n.unique_id
|
||||
).yellow()
|
||||
);
|
||||
false
|
||||
}
|
||||
}
|
||||
}) {
|
||||
let Some(ref original_file_path_str) = node.original_file_path else {
|
||||
eprintln!(
|
||||
"{}",
|
||||
format!(
|
||||
"Warning: Skipping dbt model {} (unique_id: {}) because it is missing 'original_file_path' in catalog.json.",
|
||||
node.name.as_deref().unwrap_or("[unknown name]"), // Use derived node.name if available
|
||||
node.unique_id
|
||||
).yellow()
|
||||
);
|
||||
continue;
|
||||
};
|
||||
|
||||
// Ensure metadata.name exists, as it's crucial for the semantic model name
|
||||
let Some(ref actual_model_name_from_metadata) = node.metadata.name else {
|
||||
eprintln!(
|
||||
"{}",
|
||||
format!(
|
||||
"Warning: Skipping dbt model with unique_id: {} because its 'metadata.name' is missing in catalog.json.",
|
||||
node.unique_id
|
||||
).yellow()
|
||||
);
|
||||
continue;
|
||||
};
|
||||
let actual_model_name = actual_model_name_from_metadata.clone(); // Now safe to clone
|
||||
|
||||
let original_file_path_abs = buster_config_dir.join(original_file_path_str);
|
||||
|
||||
let in_scope = if configured_model_path_patterns.is_empty() {
|
||||
true // If no patterns, assume all models are in scope (or handle as error/warning)
|
||||
true // If no patterns, assume all models are in scope
|
||||
} else {
|
||||
configured_model_path_patterns
|
||||
.iter()
|
||||
|
@ -734,13 +811,13 @@ async fn generate_semantic_models_from_dbt_catalog(
|
|||
};
|
||||
|
||||
if !in_scope {
|
||||
println!("Skipping dbt model (not in configured model_paths): {}", node.unique_id.dimmed());
|
||||
// Only log if verbose or similar, this can be noisy
|
||||
// println!("Skipping dbt model (not in configured model_paths): {}", node.unique_id.dimmed());
|
||||
continue;
|
||||
}
|
||||
|
||||
println!("Processing dbt model: {}", node.unique_id.cyan());
|
||||
println!("Processing dbt model for semantic layer: {}: {}", node.unique_id.cyan(), actual_model_name.cyan());
|
||||
|
||||
let actual_model_name = node.metadata.name.clone();
|
||||
let mut dimensions: Vec<YamlDimension> = Vec::new();
|
||||
let mut measures: Vec<YamlMeasure> = Vec::new();
|
||||
|
||||
|
@ -756,60 +833,96 @@ async fn generate_semantic_models_from_dbt_catalog(
|
|||
name: col.name.clone(),
|
||||
description: col.comment.clone(),
|
||||
type_: Some(col.column_type.clone()),
|
||||
searchable: false,
|
||||
searchable: false, // Default to false, user can change
|
||||
options: None,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let yaml_model = YamlModel {
|
||||
name: actual_model_name,
|
||||
description: node.metadata.comment.clone(),
|
||||
name: actual_model_name, // This should be the model's identifier name
|
||||
description: node.metadata.comment.clone(), // Use metadata.comment as the source for description
|
||||
data_source_name: default_data_source_name.cloned(),
|
||||
database: node.database.clone().or_else(|| default_database.cloned()),
|
||||
schema: node.schema.clone().or_else(|| default_schema.cloned()),
|
||||
dimensions,
|
||||
measures,
|
||||
original_file_path: Some(node.original_file_path.clone()),
|
||||
original_file_path: Some(original_file_path_str.clone()), // Keep original dbt model path for reference
|
||||
};
|
||||
yaml_models.push(yaml_model);
|
||||
|
||||
// Determine the output path for this individual YAML model
|
||||
let dbt_model_path = Path::new(original_file_path_str);
|
||||
let mut stripped_model_path_suffix = PathBuf::new(); // e.g. "marts/sales/revenue.sql" if original is "models/marts/sales/revenue.sql"
|
||||
let mut found_base_for_stripping = false;
|
||||
|
||||
for dbt_source_root in &dbt_model_source_roots { // dbt_source_root is like "models"
|
||||
if let Ok(stripped_path) = dbt_model_path.strip_prefix(dbt_source_root) {
|
||||
stripped_model_path_suffix = stripped_path.to_path_buf();
|
||||
found_base_for_stripping = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if !found_base_for_stripping {
|
||||
// Fallback: if original_file_path_str didn't start with any known dbt_model_source_roots,
|
||||
// (e.g. original_file_path_str is "marts/revenue.sql" and source_root is "models")
|
||||
// then use original_file_path_str as is for the suffix part.
|
||||
// This can happen if dbt_model_source_roots are not exhaustive or path is weird.
|
||||
// The resulting YAML structure will still be relative to path_construction_base_dir.
|
||||
stripped_model_path_suffix = dbt_model_path.to_path_buf();
|
||||
eprintln!("{}", format!(
|
||||
"Warning: Could not strip a known dbt model source root ('{:?}') from dbt model path '{}'. Using full path for suffix: '{}'",
|
||||
dbt_model_source_roots, original_file_path_str, stripped_model_path_suffix.display()
|
||||
).yellow()
|
||||
);
|
||||
}
|
||||
|
||||
let output_yaml_path: PathBuf;
|
||||
if is_side_by_side_generation {
|
||||
// For side-by-side, output is next to the SQL file.
|
||||
// original_file_path_str is relative to buster_config_dir (e.g., "models/marts/sales/revenue.sql")
|
||||
// buster_config_dir is the dbt project root.
|
||||
output_yaml_path = buster_config_dir.join(original_file_path_str).with_extension("yml");
|
||||
} else {
|
||||
// For dedicated output directory:
|
||||
// path_construction_base_dir is the dedicated dir (e.g., "/path/to/project/buster_yamls")
|
||||
// stripped_model_path_suffix is the path part after dbt source root (e.g., "marts/sales/revenue.sql")
|
||||
let yaml_filename_with_subdir = stripped_model_path_suffix.with_extension("yml"); // e.g., "marts/sales/revenue.yml"
|
||||
output_yaml_path = path_construction_base_dir.join(yaml_filename_with_subdir);
|
||||
}
|
||||
|
||||
if let Some(parent_dir) = output_yaml_path.parent() {
|
||||
fs::create_dir_all(parent_dir).map_err(|e| {
|
||||
anyhow!("Failed to create directory for semantic model YAML '{}': {}", parent_dir.display(), e)
|
||||
})?;
|
||||
}
|
||||
|
||||
let yaml_string = serde_yaml::to_string(&yaml_model)
|
||||
.map_err(|e| anyhow!("Failed to serialize semantic model '{}' to YAML: {}", yaml_model.name, e))?;
|
||||
fs::write(&output_yaml_path, yaml_string)
|
||||
.map_err(|e| anyhow!("Failed to write semantic model YAML for '{}' to '{}': {}", yaml_model.name, output_yaml_path.display(), e))?;
|
||||
|
||||
println!(
|
||||
"{} Generated semantic model: {}",
|
||||
"✓".green(),
|
||||
output_yaml_path.display().to_string().cyan()
|
||||
);
|
||||
yaml_models_generated_count += 1;
|
||||
}
|
||||
|
||||
if yaml_models.is_empty() {
|
||||
if yaml_models_generated_count == 0 {
|
||||
println!(
|
||||
"{}",
|
||||
"No dbt models found matching configured paths in catalog.json. Skipping YAML file creation."
|
||||
"No dbt models found matching configured paths in catalog.json, or no models in catalog. No semantic model YAML files generated."
|
||||
.yellow()
|
||||
);
|
||||
return Ok(());
|
||||
} else {
|
||||
println!(
|
||||
"{}",
|
||||
format!("Successfully generated {} semantic model YAML file(s).", yaml_models_generated_count).bold().green()
|
||||
);
|
||||
}
|
||||
|
||||
let semantic_spec = YamlSemanticLayerSpec { models: yaml_models };
|
||||
// The semantic_output_path is already determined above using project context's semantic_models_file
|
||||
// let yaml_output_path_str = buster_config
|
||||
// .semantic_models_file // This top-level field is removed
|
||||
// .as_ref()
|
||||
// .ok_or_else(|| anyhow!("Semantic models file path not set in BusterConfig"))?;
|
||||
// let semantic_output_path = buster_config_dir.join(yaml_output_path_str);
|
||||
|
||||
|
||||
if let Some(parent_dir) = semantic_output_path.parent() {
|
||||
fs::create_dir_all(parent_dir).map_err(|e| {
|
||||
anyhow!("Failed to create directory for semantic models YAML '{}': {}", parent_dir.display(), e)
|
||||
})?;
|
||||
}
|
||||
|
||||
let yaml_string = serde_yaml::to_string(&semantic_spec)
|
||||
.map_err(|e| anyhow!("Failed to serialize semantic models to YAML: {}", e))?;
|
||||
fs::write(&semantic_output_path, yaml_string)
|
||||
.map_err(|e| anyhow!("Failed to write semantic models YAML file: {}", e))?;
|
||||
|
||||
println!(
|
||||
"{} {}",
|
||||
"✓ Successfully generated semantic layer YAML at:".green(),
|
||||
semantic_output_path.display().to_string().cyan()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -937,7 +1050,7 @@ fn create_buster_config_file(
|
|||
model_paths: model_paths_vec,
|
||||
exclude_files: None,
|
||||
exclude_tags: None,
|
||||
semantic_models_file: None, // Initialized as None, will be set later if user opts in
|
||||
semantic_model_paths: None, // Initialized as None, will be set later if user opts in
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -949,7 +1062,6 @@ fn create_buster_config_file(
|
|||
exclude_tags: None,
|
||||
model_paths: None, // This top-level field is superseded by 'projects'
|
||||
projects: Some(project_contexts),
|
||||
// semantic_models_file: None, // Removed from top-level
|
||||
};
|
||||
|
||||
config.save(path)?;
|
||||
|
@ -1027,7 +1139,7 @@ fn build_contexts_recursive(
|
|||
model_paths: if model_globs_for_context.is_empty() { None } else { Some(model_globs_for_context) },
|
||||
exclude_files: None,
|
||||
exclude_tags: None,
|
||||
semantic_models_file: None, // Initialized as None for contexts derived from dbt_project.yml
|
||||
semantic_model_paths: None, // Initialized as None, will be set later if user opts in
|
||||
});
|
||||
println!("Generated project context: {} (Schema: {}, DB: {})",
|
||||
context_name.cyan(),
|
||||
|
|
|
@ -23,7 +23,7 @@ pub struct ProjectContext {
|
|||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub name: Option<String>, // Optional name for the project
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub semantic_models_file: Option<String>, // Path to the semantic layer YAML for this project
|
||||
pub semantic_model_paths: Option<Vec<String>>, // Paths to directories where semantic model YAML files (1:1 with SQL models) are stored
|
||||
}
|
||||
|
||||
impl ProjectContext {
|
||||
|
|
|
@ -77,8 +77,41 @@ pub fn load_and_parse_catalog(catalog_json_path: &Path) -> Result<DbtCatalog> {
|
|||
let catalog_content = fs::read_to_string(catalog_json_path)
|
||||
.with_context(|| format!("Failed to read catalog.json from {}", catalog_json_path.display()))?;
|
||||
|
||||
serde_json::from_str(&catalog_content)
|
||||
.with_context(|| format!("Failed to parse catalog.json from {}. Ensure it is valid JSON.", catalog_json_path.display()))
|
||||
let mut catalog: DbtCatalog = serde_json::from_str(&catalog_content)
|
||||
.map_err(|e| {
|
||||
// Log the detailed serde error
|
||||
eprintln!("Detailed parsing error for {}: {:#?}", catalog_json_path.display(), e);
|
||||
anyhow!(
|
||||
"Failed to parse catalog.json from {}. Error: {}. Ensure the file content is valid and matches the expected dbt catalog structure.",
|
||||
catalog_json_path.display(),
|
||||
e // e.to_string() will give a concise error message from serde
|
||||
)
|
||||
})?;
|
||||
|
||||
// Post-process nodes to derive resource_type if missing
|
||||
for node in catalog.nodes.values_mut() {
|
||||
if node.resource_type.is_none() {
|
||||
let parts: Vec<&str> = node.unique_id.splitn(2, '.').collect();
|
||||
if !parts.is_empty() {
|
||||
let potential_type = parts[0];
|
||||
if ["model", "source", "seed", "snapshot", "test"].contains(&potential_type) {
|
||||
node.resource_type = Some(potential_type.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if node.name.is_none() {
|
||||
// Try to derive node.name from the last part of unique_id
|
||||
// e.g., model.my_package.my_model_name -> my_model_name
|
||||
if let Some(last_part) = node.unique_id.split('.').last() {
|
||||
if !last_part.is_empty() {
|
||||
node.name = Some(last_part.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(catalog)
|
||||
}
|
||||
|
||||
pub fn add(left: usize, right: usize) -> usize {
|
||||
|
|
|
@ -1,53 +1,119 @@
|
|||
use serde::Deserialize;
|
||||
use std::collections::HashMap;
|
||||
|
||||
// Struct definitions copied from commands/init.rs and made pub.
|
||||
// These are for parsing dbt's catalog.json.
|
||||
// Struct definitions for parsing dbt's catalog.json.
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct DbtCatalog {
|
||||
#[allow(dead_code)]
|
||||
pub metadata: DbtCatalogMetadata,
|
||||
pub nodes: HashMap<String, DbtNode>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub sources: Option<HashMap<String, DbtSource>>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub macros: Option<HashMap<String, serde_json::Value>>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub exposures: Option<HashMap<String, serde_json::Value>>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub metrics: Option<HashMap<String, serde_json::Value>>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub selectors: Option<HashMap<String, serde_json::Value>>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub disabled: Option<HashMap<String, Vec<serde_json::Value>>>, // dbt-core uses Vec here
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub parent_map: Option<HashMap<String, Vec<String>>>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub child_map: Option<HashMap<String, Vec<String>>>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub errors: Option<serde_json::Value>, // Can be null or an object with error details
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct DbtCatalogMetadata {
|
||||
#[serde(rename = "dbt_schema_version")]
|
||||
#[allow(dead_code)]
|
||||
pub dbt_schema_version: String,
|
||||
#[allow(dead_code)] // If not used directly by Buster, but good for complete parsing
|
||||
pub dbt_version: Option<String>,
|
||||
#[allow(dead_code)]
|
||||
pub generated_at: Option<String>,
|
||||
#[allow(dead_code)]
|
||||
pub invocation_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct DbtNode {
|
||||
pub metadata: DbtNodeMetadata,
|
||||
// Ensure metadata is present, matches example which has it implicitly via direct fields
|
||||
// For the example catalog's node structure, we might need to flatten some metadata fields
|
||||
// or expect them directly if `metadata` as a block is not always there.
|
||||
// However, standard dbt catalog.json *does* have a metadata block within each node.
|
||||
// The example provided might be a slight simplification or custom representation.
|
||||
// Assuming standard catalog structure for now, where DbtNodeMetadata is a separate struct.
|
||||
pub metadata: DbtNodeMetadata,
|
||||
pub columns: HashMap<String, DbtColumn>,
|
||||
pub resource_type: String,
|
||||
#[serde(rename = "resource_type")] // if resource_type is not directly in JSON, this helps map if some other key exists
|
||||
// if type is the key in JSON for resource_type, then it should be:
|
||||
// #[serde(alias = "type")] // or handle it in DbtNodeMetadata if type is part of metadata
|
||||
#[serde(default)] // Make it optional and handle missing field
|
||||
pub resource_type: Option<String>, // This refers to model, seed, snapshot, test etc.
|
||||
pub unique_id: String,
|
||||
#[serde(default)]
|
||||
pub original_file_path: String,
|
||||
#[serde(default)] // original_file_path might not be present for all node types
|
||||
pub original_file_path: Option<String>,
|
||||
pub database: Option<String>,
|
||||
pub schema: Option<String>,
|
||||
pub name: String, // This is the alias in dbt, metadata.name is the relation name
|
||||
#[serde(default)] // Make name optional
|
||||
pub name: Option<String>, // This is often the filename or alias. metadata.name is relation name.
|
||||
pub comment: Option<String>, // Comment can be directly on the node for some versions/types
|
||||
pub stats: Option<serde_json::Value>, // To capture general stats blocks
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct DbtNodeMetadata {
|
||||
#[serde(rename = "type")]
|
||||
// Standard dbt catalog.json has `name` here as the relation name.
|
||||
#[serde(default)] // Make name optional
|
||||
pub name: Option<String>,
|
||||
#[serde(rename = "type")] // This 'type' inside metadata usually refers to the materialization (table, view, etc.) for models
|
||||
pub relation_type: Option<String>,
|
||||
pub schema: Option<String>,
|
||||
pub name: String,
|
||||
pub database: Option<String>,
|
||||
pub comment: Option<String>,
|
||||
pub schema: Option<String>, // schema can also be here
|
||||
pub database: Option<String>, // database can also be here
|
||||
pub comment: Option<String>, // comment for the model/node itself
|
||||
#[allow(dead_code)]
|
||||
pub owner: Option<String>,
|
||||
// Add other potential metadata fields if necessary, e.g., tags, config, etc.
|
||||
#[serde(default)]
|
||||
pub tags: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct DbtSource {
|
||||
pub name: String, // This is the source's table name
|
||||
pub unique_id: String,
|
||||
pub database: Option<String>,
|
||||
pub schema: Option<String>,
|
||||
#[serde(default, alias = "resource_type")] // Sources have "source" as resource_type, or a specific table type.
|
||||
pub table_type: Option<String>, // e.g. "table", often not explicitly a 'type' field in catalog for sources, but implied.
|
||||
pub columns: HashMap<String, DbtColumn>,
|
||||
pub comment: Option<String>,
|
||||
pub stats: Option<serde_json::Value>,
|
||||
// Sources can also have a 'meta' field, 'tags', 'description', 'loader', 'freshness' etc.
|
||||
#[serde(default)]
|
||||
pub description: Option<String>, // description is preferred over comment for sources usually
|
||||
#[serde(default)]
|
||||
pub meta: Option<HashMap<String, serde_json::Value>>,
|
||||
#[serde(default)]
|
||||
pub tags: Vec<String>,
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct DbtColumn {
|
||||
#[serde(rename = "type")]
|
||||
pub column_type: String,
|
||||
pub index: u32,
|
||||
pub index: Option<u32>, // Index might not always be present
|
||||
pub name: String,
|
||||
pub comment: Option<String>,
|
||||
#[serde(default)]
|
||||
pub description: Option<String>, // Columns can also have descriptions
|
||||
#[serde(default)]
|
||||
pub meta: Option<HashMap<String, serde_json::Value>>,
|
||||
#[serde(default)]
|
||||
pub tags: Vec<String>,
|
||||
}
|
Loading…
Reference in New Issue