From 71a500cacfb8a5be4a95980363e1d58cc3826ec9 Mon Sep 17 00:00:00 2001
From: dal
Date: Tue, 25 Feb 2025 12:13:23 -0700
Subject: [PATCH] recursive directory structure works great

---
 .../rest/routes/datasets/deploy_datasets.rs |  30 +++++
 cli/Cargo.toml                              |   1 +
 cli/src/commands/deploy_v2.rs               |  57 +++++++--
 cli/src/commands/generate.rs                | 118 ++++++++++++++----
 cli/src/commands/mod.rs                     |   2 +-
 cli/src/main.rs                             |  33 +++--
 6 files changed, 194 insertions(+), 47 deletions(-)

diff --git a/api/src/routes/rest/routes/datasets/deploy_datasets.rs b/api/src/routes/rest/routes/datasets/deploy_datasets.rs
index b2b5277a3..5cf6be7b9 100644
--- a/api/src/routes/rest/routes/datasets/deploy_datasets.rs
+++ b/api/src/routes/rest/routes/datasets/deploy_datasets.rs
@@ -470,6 +470,36 @@ async fn deploy_datasets_handler(
         .into_iter()
         .collect();
 
+    // Get new dataset names from the request
+    let new_dataset_names: HashSet<String> = valid_datasets
+        .iter()
+        .map(|req| req.name.clone())
+        .collect();
+
+    // Find datasets that exist but aren't in the request
+    let datasets_to_delete: Vec<String> = existing_datasets
+        .difference(&new_dataset_names)
+        .cloned()
+        .collect();
+
+    // Mark datasets as deleted if they're not in the request
+    if !datasets_to_delete.is_empty() {
+        tracing::info!(
+            "Marking {} datasets as deleted for data source '{}': {:?}",
+            datasets_to_delete.len(),
+            data_source_name,
+            datasets_to_delete
+        );
+
+        diesel::update(datasets::table)
+            .filter(datasets::data_source_id.eq(&data_source.id))
+            .filter(datasets::name.eq_any(&datasets_to_delete))
+            .filter(datasets::deleted_at.is_null())
+            .set(datasets::deleted_at.eq(now))
+            .execute(&mut conn)
+            .await?;
+    }
+
     // Prepare datasets for upsert
     let datasets_to_upsert: Vec = valid_datasets
         .iter()
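The deletion pass above is a plain set difference between what the database holds and what the request carries; only the diesel UPDATE makes it a soft delete. A minimal sketch of that logic with diesel stripped out (all names illustrative):

    use std::collections::HashSet;

    fn main() {
        // Dataset names currently live in the database for this data source.
        let existing: HashSet<String> =
            HashSet::from(["orders".into(), "users".into(), "legacy".into()]);
        // Dataset names present in the deploy request.
        let requested: HashSet<String> =
            HashSet::from(["orders".into(), "users".into()]);

        // Anything existing but not requested gets its deleted_at stamped.
        let to_delete: Vec<String> = existing.difference(&requested).cloned().collect();
        assert_eq!(to_delete, vec!["legacy".to_string()]);
    }
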
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index 41d2719de..93cd33664 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -37,6 +37,7 @@ rustls = { version = "0.23", features = ["tls12"] }
 url = "2.5.0"
 zip = "2.2.2"
 glob = "0.3.1"
+walkdir = "2.5.0"
 
 [dev-dependencies]
 tempfile = "3.16.0"
diff --git a/cli/src/commands/deploy_v2.rs b/cli/src/commands/deploy_v2.rs
index a043ee897..f96e9b3a3 100644
--- a/cli/src/commands/deploy_v2.rs
+++ b/cli/src/commands/deploy_v2.rs
@@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
 use std::collections::HashSet;
 use std::path::{Path, PathBuf};
 use tokio::task;
+use walkdir::WalkDir;
 
 use crate::utils::{
     buster_credentials::get_and_validate_buster_credentials, BusterClient,
@@ -808,7 +809,7 @@ impl ModelFile {
     }
 }
 
-pub async fn deploy_v2(path: Option<&str>, dry_run: bool) -> Result<()> {
+pub async fn deploy_v2(path: Option<&str>, dry_run: bool, recursive: bool) -> Result<()> {
     let target_path = PathBuf::from(path.unwrap_or("."));
     let mut progress = DeployProgress::new(0);
     let mut result = DeployResult::default();
@@ -856,7 +857,10 @@ pub async fn deploy_v2(path: Option<&str>, dry_run: bool) -> Result<()> {
 
     let yml_files: Vec<PathBuf> = if target_path.is_file() {
         vec![target_path.clone()]
+    } else if recursive {
+        find_yml_files_recursively(&target_path)?
     } else {
+        // Non-recursive mode - only search in the specified directory
         std::fs::read_dir(&target_path)?
             .filter_map(|entry| entry.ok())
             .filter(|entry| {
@@ -889,9 +893,9 @@ pub async fn deploy_v2(path: Option<&str>, dry_run: bool) -> Result<()> {
     for yml_path in yml_files {
         progress.processed += 1;
         progress.current_file = yml_path
-            .file_name()
-            .and_then(|n| n.to_str())
-            .unwrap_or("unknown")
+            .strip_prefix(&target_path)
+            .unwrap_or(&yml_path)
+            .to_string_lossy()
             .to_string();
 
         progress.status = "Loading model file...".to_string();
@@ -1138,6 +1142,35 @@ pub async fn deploy_v2(path: Option<&str>, dry_run: bool) -> Result<()> {
     Ok(())
 }
 
+// New helper function to find YML files recursively
+fn find_yml_files_recursively(dir: &Path) -> Result<Vec<PathBuf>> {
+    let mut result = Vec::new();
+
+    if !dir.is_dir() {
+        return Err(anyhow::anyhow!("Path is not a directory: {}", dir.display()));
+    }
+
+    for entry in WalkDir::new(dir)
+        .follow_links(true)
+        .into_iter()
+        .filter_map(|e| e.ok())
+    {
+        let path = entry.path();
+
+        // Skip buster.yml files
+        if path.file_name().and_then(|n| n.to_str()) == Some("buster.yml") {
+            continue;
+        }
+
+        if path.is_file() &&
+            path.extension().and_then(|ext| ext.to_str()) == Some("yml") {
+            result.push(path.to_path_buf());
+        }
+    }
+
+    Ok(result)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -1197,7 +1230,7 @@ mod tests {
         create_test_yaml(temp_dir.path(), "test_model.yml", model_yml).await?;
 
         // Test dry run
-        let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true).await;
+        let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true, false).await;
         assert!(result.is_ok());
 
         Ok(())
@@ -1241,7 +1274,7 @@ mod tests {
         create_test_yaml(temp_dir.path(), "test_model.yml", model_yml).await?;
 
         // Test dry run
-        let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true).await;
+        let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true, false).await;
         assert!(result.is_ok());
 
         Ok(())
@@ -1285,7 +1318,7 @@ mod tests {
         create_test_yaml(temp_dir.path(), "test_model.yml", model_yml).await?;
 
         // Test dry run - should fail due to data source mismatch
-        let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true).await;
+        let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true, false).await;
         assert!(result.is_err());
 
         Ok(())
@@ -1320,7 +1353,7 @@ mod tests {
         create_test_yaml(temp_dir.path(), "test_model.yml", model_yml).await?;
 
         // Test dry run - should fail due to missing project
-        let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true).await;
+        let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true, false).await;
         assert!(result.is_err());
 
         Ok(())
@@ -1368,7 +1401,7 @@ mod tests {
         }
 
         // Test dry run
-        let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true).await;
+        let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true, false).await;
         assert!(result.is_ok());
 
         Ok(())
@@ -1390,7 +1423,7 @@ mod tests {
         create_test_yaml(temp_dir.path(), "invalid_model.yml", invalid_yml).await?;
 
         // Test dry run - should fail due to invalid YAML
-        let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true).await;
+        let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true, false).await;
        assert!(result.is_err());
 
         Ok(())
@@ -1442,7 +1475,7 @@ mod tests {
         create_test_yaml(temp_dir.path(), "test_model.yml", model_yml).await?;
 
         // Test dry run - should succeed because actual_model exists
-        let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true).await;
+        let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true, false).await;
         assert!(result.is_ok());
 
         Ok(())
@@ -1477,7 +1510,7 @@ mod tests {
         create_test_yaml(temp_dir.path(), "test_model.yml", model_yml).await?;
 
         // Test dry run - should fail because referenced model doesn't exist
-        let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true).await;
+        let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true, false).await;
         assert!(result.is_err());
 
         Ok(())
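find_yml_files_recursively walks the whole tree (following symlinks) and keeps every .yml except buster.yml; note that filter_map(|e| e.ok()) silently drops unreadable entries. A usage sketch, assuming a hypothetical layout:

    use std::path::Path;

    // models/
    // ├── buster.yml          (skipped)
    // ├── customers.yml       (collected)
    // └── marts/
    //     └── revenue.yml     (collected)
    fn example() -> anyhow::Result<()> {
        let files = find_yml_files_recursively(Path::new("models"))?;
        assert_eq!(files.len(), 2);
        Ok(())
    }
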
diff --git a/cli/src/commands/generate.rs b/cli/src/commands/generate.rs
index f250c2ac5..d2a07610c 100644
--- a/cli/src/commands/generate.rs
+++ b/cli/src/commands/generate.rs
@@ -1,5 +1,5 @@
 use anyhow::Result;
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
 use std::collections::HashMap;
 use regex::Regex;
 use lazy_static::lazy_static;
@@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
 use std::fs;
 use std::fmt;
 use inquire::{Text, required};
+use walkdir::WalkDir;
 use crate::utils::{
     buster_credentials::get_and_validate_buster_credentials, BusterClient,
     GenerateApiRequest, GenerateApiResponse,
@@ -23,6 +24,7 @@ pub struct GenerateCommand {
     schema: Option<String>,
     database: Option<String>,
     config: BusterConfig,
+    maintain_directory_structure: bool,
 }
 
 #[derive(Debug)]
@@ -164,6 +166,7 @@ impl GenerateCommand {
             schema,
             database,
             config,
+            maintain_directory_structure: true, // Default to maintaining directory structure
         }
     }
 
@@ -187,6 +190,7 @@ impl GenerateCommand {
             schema: self.schema.clone(),
             database: self.database.clone(),
             config, // Use the loaded config
+            maintain_directory_structure: self.maintain_directory_structure,
         };
 
         let model_names = cmd.process_sql_files(&mut progress).await?;
@@ -222,7 +226,19 @@ impl GenerateCommand {
             Ok(response) => {
                 // Process each model's YAML
                 for (model_name, yml_content) in response.yml_contents {
-                    let file_path = self.destination_path.join(format!("{}.yml", model_name));
+                    // Find the source file for this model
+                    let source_file = model_names.iter()
+                        .find(|m| m.name == model_name)
+                        .map(|m| m.source_file.clone())
+                        .unwrap_or_else(|| self.destination_path.join(format!("{}.sql", model_name)));
+
+                    // Determine output path based on source file
+                    let file_path = self.get_output_path(&model_name, &source_file);
+
+                    // Create parent directories if they don't exist
+                    if let Some(parent) = file_path.parent() {
+                        fs::create_dir_all(parent)?;
+                    }
 
                     if file_path.exists() {
                         // Use YAML diff merger for existing files
@@ -238,15 +254,15 @@ impl GenerateCommand {
                             match merger.apply_changes(&diff_result) {
                                 Ok(_) => {
                                     progress.log_success();
-                                    println!("✅ Updated {}.yml", model_name);
+                                    println!("✅ Updated {}", file_path.display());
                                 }
                                 Err(e) => {
-                                    progress.log_error(&format!("Failed to update {}.yml: {}", model_name, e));
+                                    progress.log_error(&format!("Failed to update {}: {}", file_path.display(), e));
                                 }
                             }
                         }
                         Err(e) => {
-                            progress.log_error(&format!("Failed to compute diff for {}.yml: {}", model_name, e));
+                            progress.log_error(&format!("Failed to compute diff for {}: {}", file_path.display(), e));
                         }
                     }
                 } else {
@@ -254,10 +270,10 @@ impl GenerateCommand {
                     match fs::write(&file_path, yml_content) {
                         Ok(_) => {
                             progress.log_success();
-                            println!("✅ Created new file {}.yml", model_name);
+                            println!("✅ Created new file {}", file_path.display());
                         }
                         Err(e) => {
-                            progress.log_error(&format!("Failed to write {}.yml: {}", model_name, e));
+                            progress.log_error(&format!("Failed to write {}: {}", file_path.display(), e));
                         }
                     }
                 }
@@ -384,24 +400,15 @@ impl GenerateCommand {
             Vec::new()
         };
 
-        // Get list of SQL files first to set total
-        let sql_files: Vec<_> = fs::read_dir(&self.source_path)?
-            .filter_map(|entry| entry.ok())
-            .filter(|entry| {
-                entry.path().extension()
-                    .and_then(|ext| ext.to_str())
-                    .map(|ext| ext.to_lowercase() == "sql")
-                    .unwrap_or(false)
-            })
-            .collect();
+        // Get list of SQL files recursively
+        let sql_files = find_sql_files_recursively(&self.source_path)?;
 
         progress.total_files = sql_files.len();
         progress.status = format!("Found {} SQL files to process", sql_files.len());
         progress.log_progress();
 
-        for entry in sql_files {
+        for file_path in sql_files {
             progress.processed += 1;
-            let file_path = entry.path();
 
             // Get the relative path from the source directory
             let relative_path = file_path.strip_prefix(&self.source_path)
@@ -436,7 +443,7 @@ impl GenerateCommand {
                     errors.push(GenerateError::DuplicateModelName {
                         name: model_name.name,
                         first_occurrence: existing.clone(),
-                        duplicate_occurrence: entry.path(),
+                        duplicate_occurrence: file_path.clone(),
                     });
                 } else {
                     progress.log_info(&format!(
@@ -444,7 +451,7 @@ impl GenerateCommand {
                         model_name.name,
                         if model_name.is_from_alias { "from alias" } else { "from filename" }
                     ));
-                    seen_names.insert(model_name.name.clone(), entry.path());
+                    seen_names.insert(model_name.name.clone(), file_path.clone());
                     names.push(model_name);
                 }
             }
@@ -537,8 +544,73 @@ impl GenerateCommand {
         ALIAS_RE.captures(content)
             .map(|cap| cap[1].to_string())
     }
+
+    // Add a method to determine the output path for a model
+    fn get_output_path(&self, model_name: &str, source_file: &Path) -> PathBuf {
+        // If destination_path is specified, use it
+        if self.destination_path != self.source_path {
+            // Use destination path with flat or mirrored structure
+            if self.maintain_directory_structure {
+                let relative = source_file.strip_prefix(&self.source_path).unwrap_or(Path::new(""));
+                let parent = relative.parent().unwrap_or(Path::new(""));
+                self.destination_path.join(parent).join(format!("{}.yml", model_name))
+            } else {
+                // Flat structure
+                self.destination_path.join(format!("{}.yml", model_name))
+            }
+        } else {
+            // Write alongside the SQL file
+            let parent = source_file.parent().unwrap_or(Path::new("."));
+            parent.join(format!("{}.yml", model_name))
+        }
+    }
 }
 
-pub async fn generate() -> Result<()> {
-    Ok(())
+// New helper function to find SQL files recursively
+fn find_sql_files_recursively(dir: &Path) -> Result<Vec<PathBuf>> {
+    let mut result = Vec::new();
+
+    if !dir.is_dir() {
+        return Err(anyhow::anyhow!("Path is not a directory: {}", dir.display()));
+    }
+
+    for entry in WalkDir::new(dir)
+        .follow_links(true)
+        .into_iter()
+        .filter_map(|e| e.ok())
+    {
+        let path = entry.path();
+
+        if path.is_file() &&
+            path.extension().and_then(|ext| ext.to_str()) == Some("sql") {
+            result.push(path.to_path_buf());
+        }
+    }
+
+    Ok(result)
+}
+
+pub async fn generate(
+    source_path: Option<&str>,
+    destination_path: Option<&str>,
+    data_source_name: Option<String>,
+    schema: Option<String>,
+    database: Option<String>,
+    flat_structure: bool,
+) -> Result<()> {
+    let source = PathBuf::from(source_path.unwrap_or("."));
+    let destination = PathBuf::from(destination_path.unwrap_or("."));
+
+    let mut cmd = GenerateCommand::new(
+        source,
+        destination,
+        data_source_name,
+        schema,
+        database,
+    );
+
+    // Set directory structure preference
+    cmd.maintain_directory_structure = !flat_structure;
+
+    cmd.execute().await
 }
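get_output_path picks one of three destinations per model. A test-style sketch, assuming cmd was built with source_path "models" and destination_path "out" (cmd and all paths are illustrative):

    use std::path::{Path, PathBuf};

    // Distinct destination, mirrored layout (the default): subdirectories carry over.
    assert_eq!(
        cmd.get_output_path("orders", Path::new("models/marts/orders.sql")),
        PathBuf::from("out/marts/orders.yml")
    );
    // With --flat-structure the same call yields "out/orders.yml".
    // With destination == source it yields "models/marts/orders.yml",
    // i.e. the .yml sits next to its .sql.
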
diff --git a/cli/src/commands/mod.rs b/cli/src/commands/mod.rs
index 06a8dcd48..5035eca04 100644
--- a/cli/src/commands/mod.rs
+++ b/cli/src/commands/mod.rs
@@ -10,7 +10,7 @@ pub mod update;
 pub use auth::{auth, auth_with_args, AuthArgs};
 pub use deploy::deploy;
 pub use deploy_v2::deploy_v2;
-pub use generate::GenerateCommand;
+pub use generate::{GenerateCommand, generate};
 pub use import::import;
 pub use init::init;
 pub use update::UpdateCommand;
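With the free function re-exported, callers no longer assemble GenerateCommand by hand. A hypothetical call site (argument values illustrative):

    use commands::generate;

    // source "models", destination "out", no data-source/schema/database
    // overrides, mirrored output (flat_structure = false).
    generate(Some("models"), Some("out"), None, None, None, false).await?;
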
diff --git a/cli/src/main.rs b/cli/src/main.rs
index 92cdce6a1..3cbfbaf05 100644
--- a/cli/src/main.rs
+++ b/cli/src/main.rs
@@ -5,8 +5,7 @@ mod utils;
 
 use clap::{Parser, Subcommand};
 use colored::*;
-use commands::{auth::AuthArgs, deploy, deploy_v2, import, init, GenerateCommand};
-use std::path::PathBuf;
+use commands::{auth::AuthArgs, deploy_v2, import, init};
 
 pub const APP_NAME: &str = "buster";
 pub const VERSION: &str = env!("CARGO_PKG_VERSION");
@@ -56,6 +55,9 @@ pub enum Commands {
         schema: Option<String>,
         #[arg(long)]
         database: Option<String>,
+        /// Output YML files in a flat structure instead of maintaining directory hierarchy
+        #[arg(long, default_value_t = false)]
+        flat_structure: bool,
     },
     Import,
     Deploy {
@@ -63,6 +65,9 @@ pub enum Commands {
         path: Option<String>,
         #[arg(long, default_value_t = false)]
         dry_run: bool,
+        /// Recursively search for model files in subdirectories
+        #[arg(long, default_value_t = true)]
+        recursive: bool,
     },
 }
@@ -126,18 +131,24 @@ async fn main() {
             data_source_name,
             schema,
             database,
+            flat_structure,
         } => {
-            let source = source_path
-                .map(PathBuf::from)
-                .unwrap_or_else(|| PathBuf::from("."));
-            let dest = destination_path
-                .map(PathBuf::from)
-                .unwrap_or_else(|| PathBuf::from("."));
-            let cmd = GenerateCommand::new(source, dest, data_source_name, schema, database);
-            cmd.execute().await
+            commands::generate(
+                source_path.as_deref(),
+                destination_path.as_deref(),
+                data_source_name,
+                schema,
+                database,
+                flat_structure,
+            )
+            .await
         }
         Commands::Import => import().await,
-        Commands::Deploy { path, dry_run } => deploy_v2(path.as_deref(), dry_run).await,
+        Commands::Deploy {
+            path,
+            dry_run,
+            recursive,
+        } => deploy_v2(path.as_deref(), dry_run, recursive).await,
     };
 
     if let Err(e) = result {
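One caveat on the new Deploy flag: clap's derive gives bare bool fields an implicit SetTrue action, so with default_value_t = true, passing --recursive only re-asserts the default and there appears to be no way to switch recursion off from the command line. If opt-out is intended, a sketch (an assumption about intent, not part of this patch) is to take an explicit value:

    /// Recursively search for model files in subdirectories
    #[arg(long, default_value_t = true, action = clap::ArgAction::Set)]
    recursive: bool,

    // usage: buster deploy --recursive=false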