mirror of https://github.com/buster-so/buster.git
recursive directory structure works great
This commit is contained in:
parent
5e947b7c55
commit
71a500cacf
|
@ -470,6 +470,36 @@ async fn deploy_datasets_handler(
|
|||
.into_iter()
|
||||
.collect();
|
||||
|
||||
// Get new dataset names from the request
|
||||
let new_dataset_names: HashSet<String> = valid_datasets
|
||||
.iter()
|
||||
.map(|req| req.name.clone())
|
||||
.collect();
|
||||
|
||||
// Find datasets that exist but aren't in the request
|
||||
let datasets_to_delete: Vec<String> = existing_datasets
|
||||
.difference(&new_dataset_names)
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
// Mark datasets as deleted if they're not in the request
|
||||
if !datasets_to_delete.is_empty() {
|
||||
tracing::info!(
|
||||
"Marking {} datasets as deleted for data source '{}': {:?}",
|
||||
datasets_to_delete.len(),
|
||||
data_source_name,
|
||||
datasets_to_delete
|
||||
);
|
||||
|
||||
diesel::update(datasets::table)
|
||||
.filter(datasets::data_source_id.eq(&data_source.id))
|
||||
.filter(datasets::name.eq_any(&datasets_to_delete))
|
||||
.filter(datasets::deleted_at.is_null())
|
||||
.set(datasets::deleted_at.eq(now))
|
||||
.execute(&mut conn)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Prepare datasets for upsert
|
||||
let datasets_to_upsert: Vec<Dataset> = valid_datasets
|
||||
.iter()
|
||||
|
|
|
@ -37,6 +37,7 @@ rustls = { version = "0.23", features = ["tls12"] }
|
|||
url = "2.5.0"
|
||||
zip = "2.2.2"
|
||||
glob = "0.3.1"
|
||||
walkdir = "2.5.0"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3.16.0"
|
||||
|
|
|
@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
|
|||
use std::collections::HashSet;
|
||||
use std::path::{Path, PathBuf};
|
||||
use tokio::task;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::utils::{
|
||||
buster_credentials::get_and_validate_buster_credentials, BusterClient,
|
||||
|
@ -808,7 +809,7 @@ impl ModelFile {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn deploy_v2(path: Option<&str>, dry_run: bool) -> Result<()> {
|
||||
pub async fn deploy_v2(path: Option<&str>, dry_run: bool, recursive: bool) -> Result<()> {
|
||||
let target_path = PathBuf::from(path.unwrap_or("."));
|
||||
let mut progress = DeployProgress::new(0);
|
||||
let mut result = DeployResult::default();
|
||||
|
@ -856,7 +857,10 @@ pub async fn deploy_v2(path: Option<&str>, dry_run: bool) -> Result<()> {
|
|||
|
||||
let yml_files: Vec<PathBuf> = if target_path.is_file() {
|
||||
vec![target_path.clone()]
|
||||
} else if recursive {
|
||||
find_yml_files_recursively(&target_path)?
|
||||
} else {
|
||||
// Non-recursive mode - only search in the specified directory
|
||||
std::fs::read_dir(&target_path)?
|
||||
.filter_map(|entry| entry.ok())
|
||||
.filter(|entry| {
|
||||
|
@ -889,9 +893,9 @@ pub async fn deploy_v2(path: Option<&str>, dry_run: bool) -> Result<()> {
|
|||
for yml_path in yml_files {
|
||||
progress.processed += 1;
|
||||
progress.current_file = yml_path
|
||||
.file_name()
|
||||
.and_then(|n| n.to_str())
|
||||
.unwrap_or("unknown")
|
||||
.strip_prefix(&target_path)
|
||||
.unwrap_or(&yml_path)
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
|
||||
progress.status = "Loading model file...".to_string();
|
||||
|
@ -1138,6 +1142,35 @@ pub async fn deploy_v2(path: Option<&str>, dry_run: bool) -> Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
// New helper function to find YML files recursively
|
||||
fn find_yml_files_recursively(dir: &Path) -> Result<Vec<PathBuf>> {
|
||||
let mut result = Vec::new();
|
||||
|
||||
if !dir.is_dir() {
|
||||
return Err(anyhow::anyhow!("Path is not a directory: {}", dir.display()));
|
||||
}
|
||||
|
||||
for entry in WalkDir::new(dir)
|
||||
.follow_links(true)
|
||||
.into_iter()
|
||||
.filter_map(|e| e.ok())
|
||||
{
|
||||
let path = entry.path();
|
||||
|
||||
// Skip buster.yml files
|
||||
if path.file_name().and_then(|n| n.to_str()) == Some("buster.yml") {
|
||||
continue;
|
||||
}
|
||||
|
||||
if path.is_file() &&
|
||||
path.extension().and_then(|ext| ext.to_str()) == Some("yml") {
|
||||
result.push(path.to_path_buf());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
@ -1197,7 +1230,7 @@ mod tests {
|
|||
create_test_yaml(temp_dir.path(), "test_model.yml", model_yml).await?;
|
||||
|
||||
// Test dry run
|
||||
let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true).await;
|
||||
let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true, false).await;
|
||||
assert!(result.is_ok());
|
||||
|
||||
Ok(())
|
||||
|
@ -1241,7 +1274,7 @@ mod tests {
|
|||
create_test_yaml(temp_dir.path(), "test_model.yml", model_yml).await?;
|
||||
|
||||
// Test dry run
|
||||
let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true).await;
|
||||
let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true, false).await;
|
||||
assert!(result.is_ok());
|
||||
|
||||
Ok(())
|
||||
|
@ -1285,7 +1318,7 @@ mod tests {
|
|||
create_test_yaml(temp_dir.path(), "test_model.yml", model_yml).await?;
|
||||
|
||||
// Test dry run - should fail due to data source mismatch
|
||||
let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true).await;
|
||||
let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true, false).await;
|
||||
assert!(result.is_err());
|
||||
|
||||
Ok(())
|
||||
|
@ -1320,7 +1353,7 @@ mod tests {
|
|||
create_test_yaml(temp_dir.path(), "test_model.yml", model_yml).await?;
|
||||
|
||||
// Test dry run - should fail due to missing project
|
||||
let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true).await;
|
||||
let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true, false).await;
|
||||
assert!(result.is_err());
|
||||
|
||||
Ok(())
|
||||
|
@ -1368,7 +1401,7 @@ mod tests {
|
|||
}
|
||||
|
||||
// Test dry run
|
||||
let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true).await;
|
||||
let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true, false).await;
|
||||
assert!(result.is_ok());
|
||||
|
||||
Ok(())
|
||||
|
@ -1390,7 +1423,7 @@ mod tests {
|
|||
create_test_yaml(temp_dir.path(), "invalid_model.yml", invalid_yml).await?;
|
||||
|
||||
// Test dry run - should fail due to invalid YAML
|
||||
let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true).await;
|
||||
let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true, false).await;
|
||||
assert!(result.is_err());
|
||||
|
||||
Ok(())
|
||||
|
@ -1442,7 +1475,7 @@ mod tests {
|
|||
create_test_yaml(temp_dir.path(), "test_model.yml", model_yml).await?;
|
||||
|
||||
// Test dry run - should succeed because actual_model exists
|
||||
let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true).await;
|
||||
let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true, false).await;
|
||||
assert!(result.is_ok());
|
||||
|
||||
Ok(())
|
||||
|
@ -1477,7 +1510,7 @@ mod tests {
|
|||
create_test_yaml(temp_dir.path(), "test_model.yml", model_yml).await?;
|
||||
|
||||
// Test dry run - should fail because referenced model doesn't exist
|
||||
let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true).await;
|
||||
let result = deploy_v2(Some(temp_dir.path().to_str().unwrap()), true, false).await;
|
||||
assert!(result.is_err());
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use anyhow::Result;
|
||||
use std::path::PathBuf;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::collections::HashMap;
|
||||
use regex::Regex;
|
||||
use lazy_static::lazy_static;
|
||||
|
@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
|
|||
use std::fs;
|
||||
use std::fmt;
|
||||
use inquire::{Text, required};
|
||||
use walkdir::WalkDir;
|
||||
use crate::utils::{
|
||||
buster_credentials::get_and_validate_buster_credentials,
|
||||
BusterClient, GenerateApiRequest, GenerateApiResponse,
|
||||
|
@ -23,6 +24,7 @@ pub struct GenerateCommand {
|
|||
schema: Option<String>,
|
||||
database: Option<String>,
|
||||
config: BusterConfig,
|
||||
maintain_directory_structure: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -164,6 +166,7 @@ impl GenerateCommand {
|
|||
schema,
|
||||
database,
|
||||
config,
|
||||
maintain_directory_structure: true, // Default to maintaining directory structure
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -187,6 +190,7 @@ impl GenerateCommand {
|
|||
schema: self.schema.clone(),
|
||||
database: self.database.clone(),
|
||||
config, // Use the loaded config
|
||||
maintain_directory_structure: self.maintain_directory_structure,
|
||||
};
|
||||
|
||||
let model_names = cmd.process_sql_files(&mut progress).await?;
|
||||
|
@ -222,7 +226,19 @@ impl GenerateCommand {
|
|||
Ok(response) => {
|
||||
// Process each model's YAML
|
||||
for (model_name, yml_content) in response.yml_contents {
|
||||
let file_path = self.destination_path.join(format!("{}.yml", model_name));
|
||||
// Find the source file for this model
|
||||
let source_file = model_names.iter()
|
||||
.find(|m| m.name == model_name)
|
||||
.map(|m| m.source_file.clone())
|
||||
.unwrap_or_else(|| self.destination_path.join(format!("{}.sql", model_name)));
|
||||
|
||||
// Determine output path based on source file
|
||||
let file_path = self.get_output_path(&model_name, &source_file);
|
||||
|
||||
// Create parent directories if they don't exist
|
||||
if let Some(parent) = file_path.parent() {
|
||||
fs::create_dir_all(parent)?;
|
||||
}
|
||||
|
||||
if file_path.exists() {
|
||||
// Use YAML diff merger for existing files
|
||||
|
@ -238,15 +254,15 @@ impl GenerateCommand {
|
|||
match merger.apply_changes(&diff_result) {
|
||||
Ok(_) => {
|
||||
progress.log_success();
|
||||
println!("✅ Updated {}.yml", model_name);
|
||||
println!("✅ Updated {}", file_path.display());
|
||||
}
|
||||
Err(e) => {
|
||||
progress.log_error(&format!("Failed to update {}.yml: {}", model_name, e));
|
||||
progress.log_error(&format!("Failed to update {}: {}", file_path.display(), e));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
progress.log_error(&format!("Failed to compute diff for {}.yml: {}", model_name, e));
|
||||
progress.log_error(&format!("Failed to compute diff for {}: {}", file_path.display(), e));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -254,10 +270,10 @@ impl GenerateCommand {
|
|||
match fs::write(&file_path, yml_content) {
|
||||
Ok(_) => {
|
||||
progress.log_success();
|
||||
println!("✅ Created new file {}.yml", model_name);
|
||||
println!("✅ Created new file {}", file_path.display());
|
||||
}
|
||||
Err(e) => {
|
||||
progress.log_error(&format!("Failed to write {}.yml: {}", model_name, e));
|
||||
progress.log_error(&format!("Failed to write {}: {}", file_path.display(), e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -384,24 +400,15 @@ impl GenerateCommand {
|
|||
Vec::new()
|
||||
};
|
||||
|
||||
// Get list of SQL files first to set total
|
||||
let sql_files: Vec<_> = fs::read_dir(&self.source_path)?
|
||||
.filter_map(|entry| entry.ok())
|
||||
.filter(|entry| {
|
||||
entry.path().extension()
|
||||
.and_then(|ext| ext.to_str())
|
||||
.map(|ext| ext.to_lowercase() == "sql")
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.collect();
|
||||
// Get list of SQL files recursively
|
||||
let sql_files = find_sql_files_recursively(&self.source_path)?;
|
||||
|
||||
progress.total_files = sql_files.len();
|
||||
progress.status = format!("Found {} SQL files to process", sql_files.len());
|
||||
progress.log_progress();
|
||||
|
||||
for entry in sql_files {
|
||||
for file_path in sql_files {
|
||||
progress.processed += 1;
|
||||
let file_path = entry.path();
|
||||
|
||||
// Get the relative path from the source directory
|
||||
let relative_path = file_path.strip_prefix(&self.source_path)
|
||||
|
@ -436,7 +443,7 @@ impl GenerateCommand {
|
|||
errors.push(GenerateError::DuplicateModelName {
|
||||
name: model_name.name,
|
||||
first_occurrence: existing.clone(),
|
||||
duplicate_occurrence: entry.path(),
|
||||
duplicate_occurrence: file_path.clone(),
|
||||
});
|
||||
} else {
|
||||
progress.log_info(&format!(
|
||||
|
@ -444,7 +451,7 @@ impl GenerateCommand {
|
|||
model_name.name,
|
||||
if model_name.is_from_alias { "from alias" } else { "from filename" }
|
||||
));
|
||||
seen_names.insert(model_name.name.clone(), entry.path());
|
||||
seen_names.insert(model_name.name.clone(), file_path.clone());
|
||||
names.push(model_name);
|
||||
}
|
||||
}
|
||||
|
@ -537,8 +544,73 @@ impl GenerateCommand {
|
|||
ALIAS_RE.captures(content)
|
||||
.map(|cap| cap[1].to_string())
|
||||
}
|
||||
|
||||
// Add a method to determine the output path for a model
|
||||
fn get_output_path(&self, model_name: &str, source_file: &Path) -> PathBuf {
|
||||
// If destination_path is specified, use it
|
||||
if self.destination_path != self.source_path {
|
||||
// Use destination path with flat or mirrored structure
|
||||
if self.maintain_directory_structure {
|
||||
let relative = source_file.strip_prefix(&self.source_path).unwrap_or(Path::new(""));
|
||||
let parent = relative.parent().unwrap_or(Path::new(""));
|
||||
self.destination_path.join(parent).join(format!("{}.yml", model_name))
|
||||
} else {
|
||||
// Flat structure
|
||||
self.destination_path.join(format!("{}.yml", model_name))
|
||||
}
|
||||
} else {
|
||||
// Write alongside the SQL file
|
||||
let parent = source_file.parent().unwrap_or(Path::new("."));
|
||||
parent.join(format!("{}.yml", model_name))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn generate() -> Result<()> {
|
||||
Ok(())
|
||||
// New helper function to find SQL files recursively
|
||||
fn find_sql_files_recursively(dir: &Path) -> Result<Vec<PathBuf>> {
|
||||
let mut result = Vec::new();
|
||||
|
||||
if !dir.is_dir() {
|
||||
return Err(anyhow::anyhow!("Path is not a directory: {}", dir.display()));
|
||||
}
|
||||
|
||||
for entry in WalkDir::new(dir)
|
||||
.follow_links(true)
|
||||
.into_iter()
|
||||
.filter_map(|e| e.ok())
|
||||
{
|
||||
let path = entry.path();
|
||||
|
||||
if path.is_file() &&
|
||||
path.extension().and_then(|ext| ext.to_str()) == Some("sql") {
|
||||
result.push(path.to_path_buf());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn generate(
|
||||
source_path: Option<&str>,
|
||||
destination_path: Option<&str>,
|
||||
data_source_name: Option<String>,
|
||||
schema: Option<String>,
|
||||
database: Option<String>,
|
||||
flat_structure: bool,
|
||||
) -> Result<()> {
|
||||
let source = PathBuf::from(source_path.unwrap_or("."));
|
||||
let destination = PathBuf::from(destination_path.unwrap_or("."));
|
||||
|
||||
let mut cmd = GenerateCommand::new(
|
||||
source,
|
||||
destination,
|
||||
data_source_name,
|
||||
schema,
|
||||
database,
|
||||
);
|
||||
|
||||
// Set directory structure preference
|
||||
cmd.maintain_directory_structure = !flat_structure;
|
||||
|
||||
cmd.execute().await
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@ pub mod update;
|
|||
pub use auth::{auth, auth_with_args, AuthArgs};
|
||||
pub use deploy::deploy;
|
||||
pub use deploy_v2::deploy_v2;
|
||||
pub use generate::GenerateCommand;
|
||||
pub use generate::{GenerateCommand, generate};
|
||||
pub use import::import;
|
||||
pub use init::init;
|
||||
pub use update::UpdateCommand;
|
||||
|
|
|
@ -5,8 +5,7 @@ mod utils;
|
|||
|
||||
use clap::{Parser, Subcommand};
|
||||
use colored::*;
|
||||
use commands::{auth::AuthArgs, deploy, deploy_v2, import, init, GenerateCommand};
|
||||
use std::path::PathBuf;
|
||||
use commands::{auth::AuthArgs, deploy_v2, import, init};
|
||||
|
||||
pub const APP_NAME: &str = "buster";
|
||||
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
|
@ -56,6 +55,9 @@ pub enum Commands {
|
|||
schema: Option<String>,
|
||||
#[arg(long)]
|
||||
database: Option<String>,
|
||||
/// Output YML files in a flat structure instead of maintaining directory hierarchy
|
||||
#[arg(long, default_value_t = false)]
|
||||
flat_structure: bool,
|
||||
},
|
||||
Import,
|
||||
Deploy {
|
||||
|
@ -63,6 +65,9 @@ pub enum Commands {
|
|||
path: Option<String>,
|
||||
#[arg(long, default_value_t = false)]
|
||||
dry_run: bool,
|
||||
/// Recursively search for model files in subdirectories
|
||||
#[arg(long, default_value_t = true)]
|
||||
recursive: bool,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -126,18 +131,24 @@ async fn main() {
|
|||
data_source_name,
|
||||
schema,
|
||||
database,
|
||||
flat_structure,
|
||||
} => {
|
||||
let source = source_path
|
||||
.map(PathBuf::from)
|
||||
.unwrap_or_else(|| PathBuf::from("."));
|
||||
let dest = destination_path
|
||||
.map(PathBuf::from)
|
||||
.unwrap_or_else(|| PathBuf::from("."));
|
||||
let cmd = GenerateCommand::new(source, dest, data_source_name, schema, database);
|
||||
cmd.execute().await
|
||||
commands::generate(
|
||||
source_path.as_deref(),
|
||||
destination_path.as_deref(),
|
||||
data_source_name,
|
||||
schema,
|
||||
database,
|
||||
flat_structure,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Commands::Import => import().await,
|
||||
Commands::Deploy { path, dry_run } => deploy_v2(path.as_deref(), dry_run).await,
|
||||
Commands::Deploy {
|
||||
path,
|
||||
dry_run,
|
||||
recursive,
|
||||
} => deploy_v2(path.as_deref(), dry_run, recursive).await,
|
||||
};
|
||||
|
||||
if let Err(e) = result {
|
||||
|
|
Loading…
Reference in New Issue