From abf09eed6b896d3618cfd12666ce410a07e23745 Mon Sep 17 00:00:00 2001
From: dal
Date: Thu, 8 May 2025 03:12:44 -0600
Subject: [PATCH] ok some quick changes for bugs

---
 .github/workflows/cli-release.yml             |   2 +-
 .../agents/src/agents/buster_multi_agent.rs   |  34 +-
 .../file_tools/search_data_catalog.rs         |  51 +-
 api/libs/handlers/src/datasets/deploy.rs      |   5 +-
 api/libs/semantic_layer/src/models.rs         | 168 +---
 cli/cli/src/commands/deploy/deploy.rs         | 720 ++++++++++++------
 cli/cli/src/commands/run.rs                   | 167 ++--
 docker-compose.yml                            |   4 +-
 8 files changed, 645 insertions(+), 506 deletions(-)

diff --git a/.github/workflows/cli-release.yml b/.github/workflows/cli-release.yml
index 3f02a5b6e..b85f885df 100644
--- a/.github/workflows/cli-release.yml
+++ b/.github/workflows/cli-release.yml
@@ -130,7 +130,7 @@ jobs:
       - name: Extract version from Cargo.toml
         id: get_version
         run: |
-          VERSION=$(grep '^version =' cli/Cargo.toml | sed 's/version = "\(.*\)"/\1/')
+          VERSION=$(grep '^version =' cli/cli/Cargo.toml | sed 's/version = "\(.*\)"/\1/')
           echo "version=$VERSION" >> $GITHUB_OUTPUT
           echo "Extracted version: $VERSION"
       - name: Create Release
diff --git a/api/libs/agents/src/agents/buster_multi_agent.rs b/api/libs/agents/src/agents/buster_multi_agent.rs
index 16d380d16..54c766de0 100644
--- a/api/libs/agents/src/agents/buster_multi_agent.rs
+++ b/api/libs/agents/src/agents/buster_multi_agent.rs
@@ -25,7 +25,7 @@ use crate::{agent::ModeProvider, Agent, AgentError, AgentExt, AgentThread}; // A
 use litellm::AgentMessage;
 
 // Import the semantic layer models
-use semantic_layer::models::SemanticLayerSpec; // Assuming models.rs is accessible like this
+use semantic_layer::models::Model; // Assuming models.rs is accessible like this
 
 // Import AgentState and determine_agent_state (assuming they are pub in modes/mod.rs or similar)
 // If not, they might need to be moved or re-exported.
@@ -140,23 +140,18 @@ impl BusterMultiAgent {
         let dataset_descriptions: Vec<String> = permissioned_datasets
             .into_iter()
             .filter_map(|ds| ds.yml_content) // Get Some(String), filter out None
-            .map(|content| serde_yaml::from_str::<SemanticLayerSpec>(&content)) // Parse String -> Result
+            .map(|content| serde_yaml::from_str::<Model>(&content)) // Parse String -> Result
             .filter_map(|result| {
                 // Handle Result
                 match result {
-                    Ok(parsed_spec) => {
+                    Ok(model) => {
                         // Extract info from the first model if available
-                        if let Some(model) = parsed_spec.models.first() {
-                            // model.description is Option<String>, handle it
-                            let description = model
-                                .description
-                                .as_deref()
-                                .unwrap_or("No description available");
-                            Some(format!("{}: {}", model.name, description))
-                        } else {
-                            tracing::warn!("Parsed YAML has no models");
-                            None
-                        }
+                        // model.description is Option<String>, handle it
+                        let description = model
+                            .description
+                            .as_deref()
+                            .unwrap_or("No description available");
+                        Some(format!("{}: {}", model.name, description))
                     }
                     Err(e) => {
                         tracing::warn!("Failed to parse dataset YAML: {}", e);
@@ -175,11 +170,12 @@ impl BusterMultiAgent {
         // Create the mode provider
         let mode_provider = Arc::new(BusterModeProvider { agent_data });
 
-        let model = if env::var("ENVIRONMENT").unwrap_or_else(|_| "development".to_string()) == "local" {
-            "o4-mini".to_string()
-        } else {
-            "gemini-2.5-pro-exp-03-25".to_string()
-        };
+        let model =
+            if env::var("ENVIRONMENT").unwrap_or_else(|_| "development".to_string()) == "local" {
+                "o4-mini".to_string()
+            } else {
+                "gemini-2.5-pro-exp-03-25".to_string()
+            };
 
         // Create agent, passing the provider
         let agent = Arc::new(Agent::new(
diff --git a/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs b/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs
index ea6b8b5a8..926516f55 100644
--- a/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs
+++ b/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs
@@ -1,19 +1,13 @@
 use std::collections::{HashMap, HashSet};
 use std::{env, sync::Arc, time::Instant};
 
+use database::enums::DataSourceType;
 use tokio::sync::Mutex;
 
 use anyhow::{Context, Result};
 use async_trait::async_trait;
 use braintrust::{get_prompt_system_message, BraintrustClient};
-use chrono::{DateTime, Utc};
-use cohere_rust::{
-    api::rerank::{ReRankModel, ReRankRequest},
-    Cohere,
-};
 use database::{
-    enums::DataSourceType,
     pool::get_pg_pool,
-    schema::datasets,
     schema::data_sources,
 };
 use diesel::prelude::*;
@@ -25,12 +19,11 @@ use serde_json::Value;
 use tracing::{debug, error, info, warn};
 use uuid::Uuid;
 use dataset_security::{get_permissioned_datasets, PermissionedDataset};
-use sqlx::PgPool;
 use stored_values;
 use rerank::Reranker;
 
 // Import SemanticLayerSpec
-use semantic_layer::models::SemanticLayerSpec;
+use semantic_layer::models::Model;
 
 use crate::{agent::Agent, tools::ToolExecutor};
 
@@ -1179,13 +1172,12 @@ fn extract_searchable_dimensions(yml_content: &str) -> Result<Vec<SearchableDimension>> {
-    match serde_yaml::from_str::<SemanticLayerSpec>(yml_content) {
-        Ok(spec) => {
+    match serde_yaml::from_str::<Model>(yml_content) {
+        Ok(model) => {
             debug!("Successfully parsed yml_content with SemanticLayerSpec for extract_searchable_dimensions");
-            for model in spec.models {
-                for dimension in model.dimensions {
-                    if dimension.searchable {
-                        searchable_dimensions.push(SearchableDimension {
+            for dimension in model.dimensions {
+                if dimension.searchable {
+                    searchable_dimensions.push(SearchableDimension {
                         model_name: model.name.clone(),
                         dimension_name: dimension.name.clone(), // The dimension_path might need adjustment if its usage relies on the old dynamic structure.
@@ -1193,7 +1185,6 @@ fn extract_searchable_dimensions(yml_content: &str) -> Result<Vec<SearchableDimension>> {
@@ -1205,20 +1196,16 @@ fn extract_searchable_dimensions(yml_content: &str) -> Result<Vec<SearchableDimension>> {
 
 fn extract_database_info_from_yaml(yml_content: &str) -> Result<HashMap<String, HashMap<String, HashMap<String, Vec<String>>>>> {
     let mut database_info: HashMap<String, HashMap<String, HashMap<String, Vec<String>>>> = HashMap::new();
-    match serde_yaml::from_str::<SemanticLayerSpec>(yml_content) {
-        Ok(spec) => {
+    match serde_yaml::from_str::<Model>(yml_content) {
+        Ok(model) => {
             debug!("Successfully parsed yml_content with SemanticLayerSpec for extract_database_info_from_yaml");
-            for model in spec.models {
-                let db_name = model.database.as_deref().unwrap_or("unknown_db").to_string();
-                let sch_name = model.schema.as_deref().unwrap_or("unknown_schema").to_string();
-                let tbl_name = model.name.clone(); // model.name is table name
+            let db_name = model.database.as_deref().unwrap_or("unknown_db").to_string();
+            let sch_name = model.schema.as_deref().unwrap_or("unknown_schema").to_string();
+            let tbl_name = model.name.clone(); // model.name is table name
 
             let mut columns = Vec::new();
             for dim in model.dimensions {
@@ -1259,7 +1245,6 @@ fn extract_database_info_from_yaml(yml_content: &str) -> Result<HashMap<String, HashMap<String, HashMap<String, Vec<String>>>>> {
             warn!(
diff --git a/api/libs/handlers/src/datasets/deploy.rs b/api/libs/handlers/src/datasets/deploy.rs
index 208b48689..b94fe6529 100644
--- a/api/libs/handlers/src/datasets/deploy.rs
+++ b/api/libs/handlers/src/datasets/deploy.rs
@@ -148,6 +148,9 @@ pub async fn deploy_datasets_handler_core(
             let now = Utc::now();
             let dataset_id = existing_dataset_ids.get(&req.name).copied().unwrap_or_else(|| req.id.unwrap_or_else(Uuid::new_v4));
 
+            // Use req.database as a fallback for database_identifier
+            let final_database_identifier = req.database_identifier.clone().or_else(|| req.database.clone());
+
             let dataset = database::models::Dataset { // Incorrect path
                 id: dataset_id,
                 name: req.name.clone(),
@@ -168,7 +171,7 @@ pub async fn deploy_datasets_handler_core(
                 organization_id: organization_id,
                 model: req.model.clone(),
                 yml_file: req.yml_file.clone(), // Ensure yml_file is included
-                database_identifier: req.database_identifier.clone(), // This was req.database before, ensure it's correct
+                database_identifier: final_database_identifier, // This was req.database before, ensure it's correct
             };
             datasets_to_upsert_map.insert((req.name.clone(), data_source.id), dataset);
         }
diff --git a/api/libs/semantic_layer/src/models.rs b/api/libs/semantic_layer/src/models.rs
index 93b721502..a7f899712 100644
--- a/api/libs/semantic_layer/src/models.rs
+++ b/api/libs/semantic_layer/src/models.rs
@@ -1,10 +1,5 @@
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Deserialize, Serialize, PartialEq)]
-pub struct SemanticLayerSpec {
-    pub models: Vec<Model>,
-}
-
 #[derive(Debug, Deserialize, Serialize, PartialEq)]
 pub struct Model {
     pub name: String,
@@ -75,167 +70,10 @@ pub struct Argument {
 #[derive(Debug, Deserialize, Serialize, PartialEq)]
 pub struct Relationship {
     pub name: String,
-    pub primary_key: String,
-    pub foreign_key: String,
+    pub source_col: String,
+    pub ref_col: String,
     #[serde(rename = "type")]
     pub type_: Option<String>, // 'type' is optional according to spec
     pub cardinality: Option<String>, // 'cardinality' is optional
     pub description: Option<String>,
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use serde_yaml;
-
-    #[test]
-    fn test_deserialize_model_file() {
-        let yaml_content = r#"
-models:
-  - name: culture
-    description: Core model for cultural groups
-    original_file_path: "models/core/culture.sql"
-    dimensions:
-      - name: cultureid
-        description: Unique identifier for the culture
-      - name: name
-
description: Culture name - options: ["Western", "Eastern"] - measures: - - name: revenue - description: Revenue generated by the culture - filters: - - name: active_subscribed_customer - expr: logins.login_count > {threshold} AND subscriptions.subscription_status = 'active' - args: - - name: threshold - type: integer - description: Minimum number of logins - description: Customers with logins above threshold and active subscription - metrics: - - name: popular_product_revenue - expr: SUM(revenue) WHERE culture_products.product_count > 5 - description: Revenue from cultures with popular products - entities: - - name: logins - primary_key: cultureid - foreign_key: cultureid - type: LEFT - cardinality: one-to-many - description: Links to login activity - - name: subscriptions - primary_key: cultureid - foreign_key: cultureid - cardinality: one-to-one - description: Links to subscription data (no type, LLM decides) - - name: culture_products - primary_key: cultureid - foreign_key: cultureid - cardinality: many-to-many - description: Links to product associations (many-to-many via junction) - - name: logins - description: Tracks user logins by culture - dimensions: - - name: cultureid - description: Foreign key to culture - measures: - - name: login_count - description: Number of logins - entities: - - name: culture - primary_key: cultureid - foreign_key: cultureid - cardinality: many-to-one - - name: subscriptions - description: Subscription status for cultures - dimensions: - - name: cultureid - description: Foreign key to culture - - name: subscription_status - description: Current subscription status - options: ["active", "inactive"] - entities: - - name: culture - primary_key: cultureid - foreign_key: cultureid - cardinality: one-to-one - - name: culture_products - description: Junction table linking cultures to products - dimensions: - - name: cultureid - description: Foreign key to culture - - name: productid - description: Foreign key to products - measures: - - name: product_count - description: Number of products in this association - entities: - - name: culture - primary_key: cultureid - foreign_key: cultureid - cardinality: many-to-many - - name: products - primary_key: productid - foreign_key: productid - cardinality: many-to-many - "#; - - let spec: Result = serde_yaml::from_str(yaml_content); - assert!(spec.is_ok(), "Failed to deserialize YAML: {:?}", spec.err()); - let spec = spec.unwrap(); - - assert_eq!(spec.models.len(), 4); - - // Basic checks on the first model ('culture') - let culture_model = &spec.models[0]; - assert_eq!(culture_model.name, "culture"); - assert_eq!( - culture_model.description, - Some("Core model for cultural groups".to_string()) - ); - assert_eq!(culture_model.dimensions.len(), 2); - assert_eq!(culture_model.measures.len(), 1); - assert_eq!(culture_model.filters.len(), 1); - assert_eq!(culture_model.metrics.len(), 1); - assert_eq!(culture_model.relationships.len(), 3); - - // Check dimension 'name' options - let name_dim = &culture_model.dimensions[1]; - assert_eq!(name_dim.name, "name"); - assert_eq!( - name_dim.options, - Some(vec!["Western".to_string(), "Eastern".to_string()]) - ); - assert!(!name_dim.searchable); // Default false - - // Check filter 'active_subscribed_customer' args - let filter = &culture_model.filters[0]; - assert_eq!(filter.name, "active_subscribed_customer"); - assert!(!filter.args.is_empty()); - let filter_args = &filter.args; - assert_eq!(filter_args.len(), 1); - assert_eq!(filter_args[0].name, "threshold"); - 
assert_eq!(filter_args[0].type_, "integer"); - - // Check entity 'logins' type and cardinality - let logins_entity = &culture_model.relationships[0]; - assert_eq!(logins_entity.name, "logins"); - assert_eq!(logins_entity.type_, Some("LEFT".to_string())); - assert_eq!(logins_entity.cardinality, Some("one-to-many".to_string())); - - // Check entity 'subscriptions' type and cardinality (optional) - let subs_entity = &culture_model.relationships[1]; - assert_eq!(subs_entity.name, "subscriptions"); - assert_eq!(subs_entity.type_, None); - assert_eq!(subs_entity.cardinality, Some("one-to-one".to_string())); - - // Check second model ('logins') - let logins_model = &spec.models[1]; - assert_eq!(logins_model.name, "logins"); - assert_eq!(logins_model.dimensions.len(), 1); - assert_eq!(logins_model.measures.len(), 1); - assert_eq!(logins_model.filters.len(), 0); // Default empty vec - assert_eq!(logins_model.metrics.len(), 0); // Default empty vec - assert_eq!(logins_model.relationships.len(), 1); - } -} +} \ No newline at end of file diff --git a/cli/cli/src/commands/deploy/deploy.rs b/cli/cli/src/commands/deploy/deploy.rs index 77c52d215..bad0e5be2 100644 --- a/cli/cli/src/commands/deploy/deploy.rs +++ b/cli/cli/src/commands/deploy/deploy.rs @@ -1,19 +1,21 @@ use anyhow::{anyhow, Result}; -use std::path::{Path, PathBuf}; -use std::fs; use colored::*; +use std::fs; +use std::path::{Path, PathBuf}; -use crate::utils::{find_yml_files, ExclusionManager, ProgressTracker}; +use crate::commands::auth::check_authentication; use crate::utils::{ - buster::{BusterClient, DeployDatasetsResponse, DeployDatasetsRequest, DeployDatasetsColumnsRequest, DeployDatasetsEntityRelationshipsRequest}, + buster::{ + BusterClient, DeployDatasetsColumnsRequest, DeployDatasetsEntityRelationshipsRequest, + DeployDatasetsRequest, DeployDatasetsResponse, + }, config::{BusterConfig, ProjectContext}, file::buster_credentials::get_and_validate_buster_credentials, }; -use crate::commands::auth::check_authentication; +use crate::utils::{find_yml_files, ExclusionManager, ProgressTracker}; // Import the semantic layer models -use semantic_layer::models::{Model, SemanticLayerSpec, Dimension, Measure, Relationship}; - +use semantic_layer::models::{Dimension, Measure, Model, Relationship}; #[derive(Debug, Default)] pub struct DeployResult { @@ -53,7 +55,9 @@ impl DeployProgress { fn log_progress(&self) { println!( "\n[{}/{}] Processing: {}", - self.processed, self.total_files, self.current_file.cyan() + self.processed, + self.total_files, + self.current_file.cyan() ); if !self.status.is_empty() { println!("Status: {}", self.status.dimmed()); @@ -61,11 +65,18 @@ impl DeployProgress { } fn log_error(&self, error: &str) { - eprintln!("❌ Error processing {}: {}", self.current_file.cyan(), error.red()); + eprintln!( + "❌ Error processing {}: {}", + self.current_file.cyan(), + error.red() + ); } fn log_success(&self) { - println!("✅ Successfully processed and validated: {}", self.current_file.cyan()); + println!( + "✅ Successfully processed and validated: {}", + self.current_file.cyan() + ); } fn log_validating(&self, validation_data: (&str, &str, &str)) { @@ -76,7 +87,11 @@ impl DeployProgress { fn log_excluded(&mut self, reason: &str) { self.excluded += 1; - println!("⚠️ Skipping {} ({})", self.current_file.cyan(), reason.yellow()); + println!( + "⚠️ Skipping {} ({})", + self.current_file.cyan(), + reason.yellow() + ); } fn log_summary(&self, result: &DeployResult) { @@ -84,16 +99,27 @@ impl DeployProgress { 
println!("======================================"); println!( "Successfully deployed (new or updated): {} models", - (result.success.len() + result.updated.len()).to_string().green() + (result.success.len() + result.updated.len()) + .to_string() + .green() ); if !result.success.is_empty() { - println!(" ✨ New models deployed: {}", result.success.len().to_string().green()); + println!( + " ✨ New models deployed: {}", + result.success.len().to_string().green() + ); } if !result.updated.is_empty() { - println!(" 🔄 Models updated: {}", result.updated.len().to_string().cyan()); + println!( + " 🔄 Models updated: {}", + result.updated.len().to_string().cyan() + ); } if !result.no_change.is_empty() { - println!(" ➖ Models with no changes: {}", result.no_change.len().to_string().dimmed()); + println!( + " ➖ Models with no changes: {}", + result.no_change.len().to_string().dimmed() + ); } if self.excluded > 0 { @@ -104,24 +130,38 @@ impl DeployProgress { } if !result.failures.is_empty() { - println!("\n❌ Failed deployments: {} models", result.failures.len().to_string().red()); + println!( + "\n❌ Failed deployments: {} models", + result.failures.len().to_string().red() + ); println!("--------------------------------------"); for (file, model_name, errors) in &result.failures { println!( " - File: {} (Model: {})", - file.cyan(), model_name.purple() + file.cyan(), + model_name.purple() ); for error in errors { println!(" Error: {}", error.red()); } } println!("--------------------------------------"); - } + } println!("======================================"); if result.failures.is_empty() { - println!("{}", "🎉 All specified models processed successfully!".bold().green()); + println!( + "{}", + "🎉 All specified models processed successfully!" + .bold() + .green() + ); } else { - println!("{}", "⚠️ Some models failed to deploy. Please check the errors above.".bold().yellow()); + println!( + "{}", + "⚠️ Some models failed to deploy. Please check the errors above." 
+ .bold() + .yellow() + ); } } } @@ -145,17 +185,19 @@ impl ProgressTracker for DeployProgress { /// Parse a YAML model file into semantic_layer::Model structs pub fn parse_model_file(file_path: &Path) -> Result> { let yml_content = std::fs::read_to_string(file_path)?; - + // First try parsing as a SemanticLayerSpec (with top-level 'models' key) - match serde_yaml::from_str::(&yml_content) { - Ok(spec) => { - Ok(spec.models) - }, + match serde_yaml::from_str::(&yml_content) { + Ok(model) => Ok(vec![model]), Err(_) => { // If that fails, try parsing as a single Model match serde_yaml::from_str::(&yml_content) { Ok(model) => Ok(vec![model]), - Err(e) => Err(anyhow!("Failed to parse model file {}: {}", file_path.display(), e)) + Err(e) => Err(anyhow!( + "Failed to parse model file {}: {}", + file_path.display(), + e + )), } } } @@ -173,29 +215,37 @@ pub fn resolve_model_configurations( for (mut model, proj_config_opt) in models_with_context { // Resolve data_source_name - let resolved_ds_name = model.data_source_name.clone() + let resolved_ds_name = model + .data_source_name + .clone() .or_else(|| proj_config_opt.and_then(|pc| pc.data_source_name.clone())) .or_else(|| global_buster_config.data_source_name.clone()); // Resolve schema - let resolved_schema = model.schema.clone() + let resolved_schema = model + .schema + .clone() .or_else(|| proj_config_opt.and_then(|pc| pc.schema.clone())) .or_else(|| global_buster_config.schema.clone()); // Resolve database - let resolved_database = model.database.clone() + let resolved_database = model + .database + .clone() .or_else(|| proj_config_opt.and_then(|pc| pc.database.clone())) .or_else(|| global_buster_config.database.clone()); // Validation: schema and data_source_name are essential for API processing if resolved_ds_name.is_none() { return Err(anyhow!( - "Model '{}': data_source_name could not be resolved.", model.name + "Model '{}': data_source_name could not be resolved.", + model.name )); } if resolved_schema.is_none() { return Err(anyhow!( - "Model '{}': schema could not be resolved.", model.name + "Model '{}': schema could not be resolved.", + model.name )); } @@ -214,10 +264,7 @@ pub fn resolve_model_configurations( } /// Check if a file should be excluded based on tags in SQL content -fn check_excluded_tags( - sql_path: &Option, - exclude_tags: &[String], -) -> Result { +fn check_excluded_tags(sql_path: &Option, exclude_tags: &[String]) -> Result { if exclude_tags.is_empty() || sql_path.is_none() { return Ok(false); } @@ -282,8 +329,14 @@ fn find_sql_file_in_model_paths( if dir_path.is_dir() { let sql_file_name = format!("{}.sql", model_name); let sql_file_path = dir_path.join(&sql_file_name); - if sql_file_path.is_file() { // is_file() checks for existence and that it's a file - println!(" {} Found SQL file for model '{}' at: {}", "➡️".dimmed(), model_name.purple(), sql_file_path.display().to_string().dimmed()); + if sql_file_path.is_file() { + // is_file() checks for existence and that it's a file + println!( + " {} Found SQL file for model '{}' at: {}", + "➡️".dimmed(), + model_name.purple(), + sql_file_path.display().to_string().dimmed() + ); return Some(sql_file_path); } } else { @@ -301,8 +354,15 @@ fn find_sql_file_in_model_paths( fn generate_default_sql(model: &Model) -> String { format!( "SELECT * FROM {}{}.{}", - model.database.as_ref().map(|db| format!("{}.", db)).unwrap_or_default(), - model.schema.as_ref().expect("Schema should be resolved by resolve_model_configurations"), + model + .database + .as_ref() + .map(|db| 
format!("{}.", db)) + .unwrap_or_default(), + model + .schema + .as_ref() + .expect("Schema should be resolved by resolve_model_configurations"), model.name ) } @@ -374,20 +434,24 @@ fn to_deploy_request(model: &Model, sql_content: String) -> DeployDatasetsReques // Measures might have an implicit aggregation (like SUM, AVG) based on their type or usage, // but semantic_layer::Measure doesn't explicitly store an `agg` field like `DeployDatasetsColumnsRequest` expects. // This might need further refinement based on how `agg` should be derived for measures. - agg: None, // Placeholder for now + agg: None, // Placeholder for now searchable: false, // Measures are typically not directly searched upon like free-text dimensions }); } // Convert entity relationships - let entity_relationships: Option> = + let entity_relationships: Option> = if !model.relationships.is_empty() { Some( - model.relationships.iter().map(|rel| DeployDatasetsEntityRelationshipsRequest { - name: rel.name.clone(), - expr: rel.foreign_key.clone(), // Assuming foreign_key is the expression for the relationship for now - type_: rel.type_.clone().unwrap_or_else(|| "LEFT".to_string()), // Default to LEFT if not specified - }).collect() + model + .relationships + .iter() + .map(|rel| DeployDatasetsEntityRelationshipsRequest { + name: rel.name.clone(), + expr: rel.source_col.clone(), // Assuming foreign_key is the expression for the relationship for now + type_: rel.type_.clone().unwrap_or_else(|| "LEFT".to_string()), // Default to LEFT if not specified + }) + .collect(), ) } else { None @@ -395,13 +459,17 @@ fn to_deploy_request(model: &Model, sql_content: String) -> DeployDatasetsReques let data_source_name = model.data_source_name.clone() .expect("data_source_name missing after validation, should be resolved by resolve_model_configurations"); - let schema = model.schema.clone() - .expect("schema missing after validation, should be resolved by resolve_model_configurations"); + let schema = model.schema.clone().expect( + "schema missing after validation, should be resolved by resolve_model_configurations", + ); // Serialize the input Model to YAML to be stored in the yml_file field of the request. // This captures the full semantic definition as sent. let yml_content_for_request = serde_yaml::to_string(&model).unwrap_or_else(|e| { - eprintln!("Error serializing model {} to YAML for deploy request: {}. Using empty string.", model.name, e); + eprintln!( + "Error serializing model {} to YAML for deploy request: {}. 
Using empty string.", + model.name, e + ); String::new() }); @@ -426,10 +494,18 @@ pub async fn deploy(path: Option<&str>, dry_run: bool, recursive: bool) -> Resul check_authentication().await?; let current_dir = std::env::current_dir()?; - println!("\n{}", "🚀 Starting Buster Deployment Process...".bold().blue()); - println!("Working directory: {}", current_dir.display().to_string().dimmed()); + println!( + "\n{}", + "🚀 Starting Buster Deployment Process...".bold().blue() + ); + println!( + "Working directory: {}", + current_dir.display().to_string().dimmed() + ); - let buster_config_load_dir = path.map(PathBuf::from).unwrap_or_else(|| current_dir.clone()); + let buster_config_load_dir = path + .map(PathBuf::from) + .unwrap_or_else(|| current_dir.clone()); let mut progress = DeployProgress::new(0); let mut result = DeployResult::default(); @@ -446,7 +522,10 @@ pub async fn deploy(path: Option<&str>, dry_run: bool, recursive: bool) -> Resul let buster_config = match BusterConfig::load_from_dir(&buster_config_load_dir) { Ok(Some(cfg)) => { - println!("✅ Found buster.yml configuration at {}", buster_config_load_dir.join("buster.yml").display()); + println!( + "✅ Found buster.yml configuration at {}", + buster_config_load_dir.join("buster.yml").display() + ); Some(cfg) } Ok(None) => { @@ -454,18 +533,23 @@ pub async fn deploy(path: Option<&str>, dry_run: bool, recursive: bool) -> Resul None } Err(e) => { - println!("⚠️ Error reading buster.yml: {}. Proceeding without it.", e.to_string().yellow()); + println!( + "⚠️ Error reading buster.yml: {}. Proceeding without it.", + e.to_string().yellow() + ); None } }; - - let effective_buster_config_dir = BusterConfig::base_dir(&buster_config_load_dir.join("buster.yml")).unwrap_or(buster_config_load_dir.clone()); + + let effective_buster_config_dir = + BusterConfig::base_dir(&buster_config_load_dir.join("buster.yml")) + .unwrap_or(buster_config_load_dir.clone()); let mut deploy_requests_final: Vec = Vec::new(); let mut model_mappings_final: Vec = Vec::new(); let mut processed_models_from_spec = false; - // --- PRIMARY PATH: Iterate through projects and use semantic_models_file if available --- + // --- PRIMARY PATH: Iterate through projects and use semantic_models_file if available --- if let Some(ref cfg) = buster_config { if let Some(ref projects) = cfg.projects { for project_ctx in projects { @@ -473,96 +557,170 @@ pub async fn deploy(path: Option<&str>, dry_run: bool, recursive: bool) -> Resul for semantic_models_dir_str in semantic_model_dirs { println!( "\n{}", - format!("🔍 Scanning semantic model directory for project '{}': {}", + format!( + "🔍 Scanning semantic model directory for project '{}': {}", project_ctx.identifier().cyan(), semantic_models_dir_str.cyan() - ).dimmed() + ) + .dimmed() ); - let semantic_models_dir_path = effective_buster_config_dir.join(semantic_models_dir_str); + let semantic_models_dir_path = + effective_buster_config_dir.join(semantic_models_dir_str); if !semantic_models_dir_path.is_dir() { let error_msg = format!("Specified semantic model path is not a directory or does not exist for project '{}': {}", project_ctx.identifier(), semantic_models_dir_path.display()); eprintln!("❌ {}", error_msg.red()); result.failures.push(( - semantic_models_dir_path.to_string_lossy().into_owned(), - format!("project_{}_dir_not_found", project_ctx.identifier()), - vec![error_msg] + semantic_models_dir_path.to_string_lossy().into_owned(), + format!("project_{}_dir_not_found", project_ctx.identifier()), + vec![error_msg], )); continue; // 
Continue to the next directory or project } // Scan this directory for .yml files // Using a temporary ExclusionManager as deploy_individual_yml_files does, or simplify if not needed here. - let exclusion_manager = ExclusionManager::new(cfg).unwrap_or_else(|_| ExclusionManager::empty()); - let yml_files_in_dir = match find_yml_files(&semantic_models_dir_path, true, &exclusion_manager, Some(&mut progress)) { // Assuming recursive scan for now + let exclusion_manager = ExclusionManager::new(cfg) + .unwrap_or_else(|_| ExclusionManager::empty()); + let yml_files_in_dir = match find_yml_files( + &semantic_models_dir_path, + true, + &exclusion_manager, + Some(&mut progress), + ) { + // Assuming recursive scan for now Ok(files) => files, Err(e) => { progress.log_error(&format!("Failed to scan for YML files in directory '{}' for project '{}': {}", semantic_models_dir_path.display(), project_ctx.identifier(), e)); result.failures.push(( semantic_models_dir_path.to_string_lossy().into_owned(), format!("project_{}_scan_failed", project_ctx.identifier()), - vec![e.to_string()] + vec![e.to_string()], )); continue; // Next directory or project } }; if yml_files_in_dir.is_empty() { - println!("ℹ️ No .yml files found in directory: {}", semantic_models_dir_path.display().to_string().dimmed()); + println!( + "ℹ️ No .yml files found in directory: {}", + semantic_models_dir_path.display().to_string().dimmed() + ); continue; } - + processed_models_from_spec = true; // Mark that we are processing based on config progress.total_files += yml_files_in_dir.len(); for yml_file_path in yml_files_in_dir { - progress.current_file = yml_file_path.strip_prefix(&effective_buster_config_dir).unwrap_or(&yml_file_path).to_string_lossy().into_owned(); - progress.status = format!("Parsing models from '{}' in project '{}'...", yml_file_path.file_name().unwrap_or_default().to_string_lossy(), project_ctx.identifier().cyan()); + progress.current_file = yml_file_path + .strip_prefix(&effective_buster_config_dir) + .unwrap_or(&yml_file_path) + .to_string_lossy() + .into_owned(); + progress.status = format!( + "Parsing models from '{}' in project '{}'...", + yml_file_path + .file_name() + .unwrap_or_default() + .to_string_lossy(), + project_ctx.identifier().cyan() + ); progress.log_progress(); let parsed_models = match parse_model_file(&yml_file_path) { Ok(m) => m, Err(e) => { - progress.log_error(&format!("Failed to parse model file '{}': {}", yml_file_path.display(), e)); - result.failures.push((progress.current_file.clone(), "parse_failed".to_string(), vec![e.to_string()])); + progress.log_error(&format!( + "Failed to parse model file '{}': {}", + yml_file_path.display(), + e + )); + result.failures.push(( + progress.current_file.clone(), + "parse_failed".to_string(), + vec![e.to_string()], + )); continue; } }; - let models_with_context: Vec<(Model, Option<&ProjectContext>)> = parsed_models.into_iter() - .map(|m| (m, Some(project_ctx))) - .collect(); + let models_with_context: Vec<(Model, Option<&ProjectContext>)> = + parsed_models + .into_iter() + .map(|m| (m, Some(project_ctx))) + .collect(); - let resolved_models = match resolve_model_configurations(models_with_context, cfg) { - Ok(models) => models, - Err(e) => { - progress.log_error(&format!("Config resolution for '{}': {}", yml_file_path.display(), e)); - result.failures.push((progress.current_file.clone(), "config_resolution_failed".to_string(), vec![e.to_string()])); - continue; - } - }; - - for model in resolved_models { - progress.processed += 1; - progress.current_file 
= format!("{} (from {} in project '{}')", model.name.purple(), yml_file_path.file_name().unwrap_or_default().to_string_lossy(), project_ctx.identifier().cyan()); - progress.status = format!("Resolving SQL for model '{}'", model.name.purple()); - progress.log_progress(); - - let sql_content = match get_sql_content_for_model(&model, Some(cfg), Some(project_ctx), &effective_buster_config_dir, &yml_file_path) { - Ok(content) => content, + let resolved_models = + match resolve_model_configurations(models_with_context, cfg) { + Ok(models) => models, Err(e) => { - progress.log_error(&format!("Failed to get SQL for model {}: {}", model.name.purple(), e)); - result.failures.push((progress.current_file.clone(), model.name.clone(), vec![e.to_string()])); + progress.log_error(&format!( + "Config resolution for '{}': {}", + yml_file_path.display(), + e + )); + result.failures.push(( + progress.current_file.clone(), + "config_resolution_failed".to_string(), + vec![e.to_string()], + )); continue; } }; - - model_mappings_final.push(ModelMapping { - file: yml_file_path.file_name().unwrap_or_default().to_string_lossy().into_owned(), - model_name: model.name.clone() + + for model in resolved_models { + progress.processed += 1; + progress.current_file = format!( + "{} (from {} in project '{}')", + model.name.purple(), + yml_file_path + .file_name() + .unwrap_or_default() + .to_string_lossy(), + project_ctx.identifier().cyan() + ); + progress.status = + format!("Resolving SQL for model '{}'", model.name.purple()); + progress.log_progress(); + + let sql_content = match get_sql_content_for_model( + &model, + Some(cfg), + Some(project_ctx), + &effective_buster_config_dir, + &yml_file_path, + ) { + Ok(content) => content, + Err(e) => { + progress.log_error(&format!( + "Failed to get SQL for model {}: {}", + model.name.purple(), + e + )); + result.failures.push(( + progress.current_file.clone(), + model.name.clone(), + vec![e.to_string()], + )); + continue; + } + }; + + model_mappings_final.push(ModelMapping { + file: yml_file_path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + model_name: model.name.clone(), }); deploy_requests_final.push(to_deploy_request(&model, sql_content)); - println!(" {} Model '{}' prepared for deployment.", "👍".green(), model.name.purple()); + println!( + " {} Model '{}' prepared for deployment.", + "👍".green(), + model.name.purple() + ); } } } @@ -574,42 +732,54 @@ pub async fn deploy(path: Option<&str>, dry_run: bool, recursive: bool) -> Resul } } - // --- FALLBACK or ADDITIONAL: Scan for individual .yml files --- + // --- FALLBACK or ADDITIONAL: Scan for individual .yml files --- if !processed_models_from_spec { // Check if any project *attempted* to specify paths, to adjust message - let any_project_had_paths_configured = buster_config.as_ref().map_or(false, |cfg| - cfg.projects.as_ref().map_or(false, |p_vec| - p_vec.iter().any(|pc| pc.semantic_model_paths.as_ref().map_or(false, |paths| !paths.is_empty())) - ) - ); + let any_project_had_paths_configured = buster_config.as_ref().map_or(false, |cfg| { + cfg.projects.as_ref().map_or(false, |p_vec| { + p_vec.iter().any(|pc| { + pc.semantic_model_paths + .as_ref() + .map_or(false, |paths| !paths.is_empty()) + }) + }) + }); if any_project_had_paths_configured { println!("⚠️ Semantic model paths were specified in buster.yml project(s) but may have failed to yield models or directories were empty/inaccessible. 
Now attempting to scan for individual .yml files based on broader model_paths configuration."); } else if buster_config.is_some() { println!("ℹ️ No specific semantic_model_paths found or processed from projects in buster.yml. Falling back to scanning for individual .yml files based on model_paths."); } else { - println!("ℹ️ No buster.yml loaded. Scanning current/target directory for individual .yml files."); + println!("ℹ️ No buster.yml loaded. Scanning current/target directory for individual .yml files."); } - + deploy_individual_yml_files( - buster_config.as_ref(), + buster_config.as_ref(), &effective_buster_config_dir, // Use effective_buster_config_dir as the base for resolving model_paths - recursive, - &mut progress, - &mut result, - &mut deploy_requests_final, - &mut model_mappings_final - ).await?; + recursive, + &mut progress, + &mut result, + &mut deploy_requests_final, + &mut model_mappings_final, + ) + .await?; } else { println!("{}", "\nℹ️ Processed models from semantic_model_paths specified in buster.yml. Skipping scan for individual .yml files.".dimmed()); } - // --- DEPLOYMENT TO API (remains largely the same, uses deploy_requests_final and model_mappings_final) --- if !deploy_requests_final.is_empty() { if dry_run { - println!("\n{}", "🔍 Dry Run Mode Activated. No changes will be made.".bold().yellow()); - println!("📦 Would attempt to deploy {} models:", deploy_requests_final.len()); + println!( + "\n{}", + "🔍 Dry Run Mode Activated. No changes will be made." + .bold() + .yellow() + ); + println!( + "📦 Would attempt to deploy {} models:", + deploy_requests_final.len() + ); for request in &deploy_requests_final { println!(" -------------------------------------"); println!(" Model Name: {}", request.name.purple()); @@ -620,8 +790,15 @@ pub async fn deploy(path: Option<&str>, dry_run: bool, recursive: bool) -> Resul } println!(" Description: {}", request.description.dimmed()); println!(" Columns: {}", request.columns.len()); - if request.entity_relationships.as_ref().map_or(false, |er| !er.is_empty()) { - println!(" Relationships: {}", request.entity_relationships.as_ref().unwrap().len()); + if request + .entity_relationships + .as_ref() + .map_or(false, |er| !er.is_empty()) + { + println!( + " Relationships: {}", + request.entity_relationships.as_ref().unwrap().len() + ); } // Optionally print SQL or YML content if needed for dry run, but can be verbose // println!(" SQL Definition:\n{}", request.sql_definition.as_deref().unwrap_or("N/A")); @@ -632,20 +809,34 @@ pub async fn deploy(path: Option<&str>, dry_run: bool, recursive: bool) -> Resul return Ok(()); } - println!("\n{}", format!("🚀 Deploying {} models to Buster Cloud...", deploy_requests_final.len()).bold().blue()); + println!( + "\n{}", + format!( + "🚀 Deploying {} models to Buster Cloud...", + deploy_requests_final.len() + ) + .bold() + .blue() + ); let client = client.expect("BusterClient should be initialized for non-dry run"); - // ... (rest of deployment logic, calling client.deploy_datasets(deploy_requests_final).await ...) + // ... (rest of deployment logic, calling client.deploy_datasets(deploy_requests_final).await ...) // ... (handle_deploy_response(&response, &mut result, &model_mappings_final, &progress)) ... 
- match client.deploy_datasets(deploy_requests_final).await { - Ok(response) => handle_deploy_response(&response, &mut result, &model_mappings_final, &progress), + match client.deploy_datasets(deploy_requests_final).await { + Ok(response) => { + handle_deploy_response(&response, &mut result, &model_mappings_final, &progress) + } Err(e) => { - eprintln!("❌ Critical error during deployment API call: {}\nDetailed error: {:?}", e.to_string().red(), e); + eprintln!( + "❌ Critical error during deployment API call: {}\nDetailed error: {:?}", + e.to_string().red(), + e + ); // Populate failures for all models that were attempted if a general API error occurs for mapping in model_mappings_final { result.failures.push(( - mapping.file.clone(), - mapping.model_name.clone(), - vec![format!("API deployment failed: {}", e)] + mapping.file.clone(), + mapping.model_name.clone(), + vec![format!("API deployment failed: {}", e)], )); } } @@ -683,11 +874,19 @@ async fn deploy_individual_yml_files( if let Some(cfg) = buster_config { let effective_paths_with_contexts = cfg.resolve_effective_model_paths(base_search_dir); if !effective_paths_with_contexts.is_empty() { - println!("\n{}", "ℹ️ Using effective model paths for individual .yml scan:".dimmed()); + println!( + "\n{}", + "ℹ️ Using effective model paths for individual .yml scan:".dimmed() + ); for (path, project_ctx_opt) in effective_paths_with_contexts { // Log the path and its associated project context identifier if available - let context_identifier = project_ctx_opt.map_or_else(|| "Global/Default".to_string(), |ctx| ctx.identifier()); - println!(" - Path: {}, Context: {}", path.display(), context_identifier.dimmed()); + let context_identifier = project_ctx_opt + .map_or_else(|| "Global/Default".to_string(), |ctx| ctx.identifier()); + println!( + " - Path: {}, Context: {}", + path.display(), + context_identifier.dimmed() + ); if path.is_dir() { match find_yml_files(&path, recursive, &exclusion_manager, Some(progress)) { @@ -695,10 +894,16 @@ async fn deploy_individual_yml_files( for f in files_in_dir { files_to_process_with_context.push((f, project_ctx_opt)); } - }, - Err(e) => eprintln!("Error finding YML files in {}: {}", path.display(), format!("{}", e).red()), + } + Err(e) => eprintln!( + "Error finding YML files in {}: {}", + path.display(), + format!("{}", e).red() + ), } - } else if path.is_file() && path.extension().and_then(|ext| ext.to_str()) == Some("yml") { + } else if path.is_file() + && path.extension().and_then(|ext| ext.to_str()) == Some("yml") + { if path.file_name().and_then(|name| name.to_str()) != Some("buster.yml") { files_to_process_with_context.push((path.clone(), project_ctx_opt)); } @@ -706,55 +911,93 @@ async fn deploy_individual_yml_files( } } else { // No effective paths from config, scan base_search_dir with no specific project context. - match find_yml_files(base_search_dir, recursive, &exclusion_manager, Some(progress)) { + match find_yml_files( + base_search_dir, + recursive, + &exclusion_manager, + Some(progress), + ) { Ok(files_in_dir) => { for f in files_in_dir { files_to_process_with_context.push((f, None)); } - }, - Err(e) => eprintln!("Error finding YML files in {}: {}", base_search_dir.display(), format!("{}", e).red()), + } + Err(e) => eprintln!( + "Error finding YML files in {}: {}", + base_search_dir.display(), + format!("{}", e).red() + ), } } } else { // No buster_config at all, scan base_search_dir with no project context. 
- match find_yml_files(base_search_dir, recursive, &exclusion_manager, Some(progress)) { + match find_yml_files( + base_search_dir, + recursive, + &exclusion_manager, + Some(progress), + ) { Ok(files_in_dir) => { for f in files_in_dir { files_to_process_with_context.push((f, None)); } - }, - Err(e) => eprintln!("Error finding YML files in {}: {}", base_search_dir.display(), format!("{}", e).red()), + } + Err(e) => eprintln!( + "Error finding YML files in {}: {}", + base_search_dir.display(), + format!("{}", e).red() + ), } }; - println!("\nFound {} individual model .yml files to process.", files_to_process_with_context.len().to_string().cyan()); + println!( + "\nFound {} individual model .yml files to process.", + files_to_process_with_context.len().to_string().cyan() + ); progress.total_files = files_to_process_with_context.len(); // Reset total files for this phase progress.processed = 0; // Reset processed for this phase for (yml_path, project_ctx_opt) in files_to_process_with_context { progress.processed += 1; - progress.current_file = yml_path.strip_prefix(base_search_dir).unwrap_or(&yml_path).to_string_lossy().into_owned(); + progress.current_file = yml_path + .strip_prefix(base_search_dir) + .unwrap_or(&yml_path) + .to_string_lossy() + .into_owned(); progress.status = "Parsing individual model file...".to_string(); progress.log_progress(); - let parsed_models = match parse_model_file(&yml_path) { // parse_model_file handles single or multi-model in one yml + let parsed_models = match parse_model_file(&yml_path) { + // parse_model_file handles single or multi-model in one yml Ok(models) => models, Err(e) => { progress.log_error(&format!("Failed to parse model file: {}", e)); - result.failures.push((progress.current_file.clone(), "unknown".to_string(), vec![e.to_string()])); + result.failures.push(( + progress.current_file.clone(), + "unknown".to_string(), + vec![e.to_string()], + )); continue; } }; - let models_with_context: Vec<(Model, Option<&ProjectContext>)> = parsed_models.into_iter() + let models_with_context: Vec<(Model, Option<&ProjectContext>)> = parsed_models + .into_iter() .map(|m| (m, project_ctx_opt)) .collect(); - - let resolved_models = match resolve_model_configurations(models_with_context, buster_config.unwrap_or(&BusterConfig::default())) { + + let resolved_models = match resolve_model_configurations( + models_with_context, + buster_config.unwrap_or(&BusterConfig::default()), + ) { Ok(models) => models, Err(e) => { progress.log_error(&format!("Configuration resolution failed: {}", e)); - result.failures.push((progress.current_file.clone(), "multiple".to_string(), vec![e.to_string()])); + result.failures.push(( + progress.current_file.clone(), + "multiple".to_string(), + vec![e.to_string()], + )); continue; } }; @@ -762,17 +1005,38 @@ async fn deploy_individual_yml_files( for model in resolved_models { // Use effective_buster_config_dir for resolving SQL paths if original_file_path is used // For find_sql_file, yml_path is the context - let sql_content = match get_sql_content_for_model(&model, buster_config, project_ctx_opt, base_search_dir, &yml_path) { + let sql_content = match get_sql_content_for_model( + &model, + buster_config, + project_ctx_opt, + base_search_dir, + &yml_path, + ) { Ok(content) => content, Err(e) => { - progress.log_error(&format!("Failed to get SQL for model {}: {}", model.name.purple(), e)); - result.failures.push((progress.current_file.clone(), model.name.clone(), vec![e.to_string()])); + progress.log_error(&format!( + "Failed to get SQL for 
model {}: {}", + model.name.purple(), + e + )); + result.failures.push(( + progress.current_file.clone(), + model.name.clone(), + vec![e.to_string()], + )); continue; } }; - model_mappings_final.push(ModelMapping { file: progress.current_file.clone(), model_name: model.name.clone() }); + model_mappings_final.push(ModelMapping { + file: progress.current_file.clone(), + model_name: model.name.clone(), + }); deploy_requests_final.push(to_deploy_request(&model, sql_content)); - println!(" {} Model '{}' prepared from individual file.", "👍".green(), model.name.purple()); + println!( + " {} Model '{}' prepared from individual file.", + "👍".green(), + model.name.purple() + ); } } Ok(()) @@ -788,7 +1052,10 @@ fn handle_deploy_response( let mut has_validation_errors = false; // Process validation results - println!("\n{}", "🔬 Processing deployment results from Buster Cloud...".dimmed()); + println!( + "\n{}", + "🔬 Processing deployment results from Buster Cloud...".dimmed() + ); for validation in response.results.iter() { // Find corresponding file from model mapping let file = model_mappings @@ -797,13 +1064,17 @@ fn handle_deploy_response( .map(|m| m.file.clone()) .unwrap_or_else(|| "".to_string()); - progress.log_validating((&validation.model_name, &validation.data_source_name, &validation.schema)); - + progress.log_validating(( + &validation.model_name, + &validation.data_source_name, + &validation.schema, + )); + if validation.success { // Check if it's a new deployment or an update based on previous state (if tracked) // For now, we'll simplify. If API says success, it's either new or successfully updated (no-op or actual change). // We can differentiate further if the API provides more info or if we compare with a previous state. - + // Let's assume for now the API doesn't tell us if it was new/update/no-change directly in this part of response. // We will base it on whether an ID was present in the request (implying update) or not (implying create). // This is a simplification as the `id` field in `DeployDatasetsRequest` is `Option` and might be `None` even for updates if not managed by CLI state. @@ -811,7 +1082,7 @@ fn handle_deploy_response( // For simplicity here, we'll assume all successes from API are either new or updated. // The `DeployResult` struct could be enhanced to better differentiate if needed by tracking initial state or from richer API responses. - + // If we had an ID in the original request for this model, it implies it was an update attempt. // For now, just add to generic success. We can refine later. 
result.success.push(( @@ -819,10 +1090,12 @@ fn handle_deploy_response( validation.model_name.clone(), validation.data_source_name.clone(), )); - } else { has_validation_errors = true; - eprintln!("❌ Validation failed for model: {}", validation.model_name.purple()); + eprintln!( + "❌ Validation failed for model: {}", + validation.model_name.purple() + ); if !validation.errors.is_empty() { eprintln!(" Errors:"); @@ -874,11 +1147,21 @@ fn handle_deploy_response( } } -fn parse_semantic_layer_spec(file_path: &Path) -> Result { - let yml_content = fs::read_to_string(file_path) - .map_err(|e| anyhow!("Failed to read semantic layer spec file {}: {}", file_path.display(), e))?; - serde_yaml::from_str::(&yml_content) - .map_err(|e| anyhow!("Failed to parse semantic layer spec from {}: {}", file_path.display(), e)) +fn parse_semantic_layer_spec(file_path: &Path) -> Result { + let yml_content = fs::read_to_string(file_path).map_err(|e| { + anyhow!( + "Failed to read semantic layer spec file {}: {}", + file_path.display(), + e + ) + })?; + serde_yaml::from_str::(&yml_content).map_err(|e| { + anyhow!( + "Failed to parse semantic layer spec from {}: {}", + file_path.display(), + e + ) + }) } #[cfg(test)] @@ -903,7 +1186,7 @@ mod tests { #[test] fn test_parse_model_file() -> Result<()> { let temp_dir = TempDir::new()?; - + let single_model_yml = r#" name: test_model description: "Test model" @@ -917,14 +1200,14 @@ measures: description: "First measure" type: "number" "#; - + let single_model_path = temp_dir.path().join("single_model.yml"); fs::write(&single_model_path, single_model_yml)?; - + let models = parse_model_file(&single_model_path)?; assert_eq!(models.len(), 1); assert_eq!(models[0].name, "test_model"); - + let multi_model_yml = r#" models: - name: model1 @@ -940,15 +1223,15 @@ models: description: "First measure" type: "number" "#; - + let multi_model_path = temp_dir.path().join("multi_model.yml"); fs::write(&multi_model_path, multi_model_yml)?; - + let models = parse_model_file(&multi_model_path)?; assert_eq!(models.len(), 2); assert_eq!(models[0].name, "model1"); assert_eq!(models[1].name, "model2"); - + Ok(()) } @@ -966,7 +1249,7 @@ models: filters: vec![], relationships: vec![], }; - + let model2 = Model { name: "model2".to_string(), description: Some("Model 2".to_string()), @@ -979,7 +1262,7 @@ models: filters: vec![], relationships: vec![], }; - + let model3 = Model { name: "model3".to_string(), description: Some("Model 3".to_string()), @@ -992,7 +1275,7 @@ models: filters: vec![], relationships: vec![], }; - + let project_context = ProjectContext { data_source_name: Some("project_ds".to_string()), schema: Some("project_schema".to_string()), @@ -1003,7 +1286,7 @@ models: name: Some("Test Project".to_string()), semantic_model_paths: None, }; - + let global_config = BusterConfig { data_source_name: Some("global_ds".to_string()), schema: Some("global_schema".to_string()), @@ -1014,27 +1297,42 @@ models: semantic_model_paths: None, projects: None, }; - + let models_with_context = vec![ (model1, Some(&project_context)), (model2, Some(&project_context)), (model3, None), ]; - + let resolved_models = resolve_model_configurations(models_with_context, &global_config)?; - - assert_eq!(resolved_models[0].data_source_name, Some("model1_ds".to_string())); - assert_eq!(resolved_models[0].schema, Some("project_schema".to_string())); + + assert_eq!( + resolved_models[0].data_source_name, + Some("model1_ds".to_string()) + ); + assert_eq!( + resolved_models[0].schema, + Some("project_schema".to_string()) + 
); assert_eq!(resolved_models[0].database, Some("global_db".to_string())); - assert_eq!(resolved_models[1].data_source_name, Some("project_ds".to_string())); - assert_eq!(resolved_models[1].schema, Some("project_schema".to_string())); + assert_eq!( + resolved_models[1].data_source_name, + Some("project_ds".to_string()) + ); + assert_eq!( + resolved_models[1].schema, + Some("project_schema".to_string()) + ); assert_eq!(resolved_models[1].database, Some("model2_db".to_string())); - - assert_eq!(resolved_models[2].data_source_name, Some("global_ds".to_string())); + + assert_eq!( + resolved_models[2].data_source_name, + Some("global_ds".to_string()) + ); assert_eq!(resolved_models[2].schema, Some("global_schema".to_string())); assert_eq!(resolved_models[2].database, Some("global_db".to_string())); - + Ok(()) } @@ -1046,39 +1344,26 @@ models: data_source_name: Some("test_source".to_string()), database: Some("test_db".to_string()), schema: Some("test_schema".to_string()), - dimensions: vec![ - Dimension { - name: "dim1".to_string(), - description: Some("First dimension".to_string()), - type_: Some("string".to_string()), - searchable: true, // Example value - options: None, - } - ], - measures: vec![ - Measure { - name: "measure1".to_string(), - description: Some("First measure".to_string()), - type_: Some("number".to_string()), - } - ], + dimensions: vec![Dimension { + name: "dim1".to_string(), + description: Some("First dimension".to_string()), + type_: Some("string".to_string()), + searchable: true, // Example value + options: None, + }], + measures: vec![Measure { + name: "measure1".to_string(), + description: Some("First measure".to_string()), + type_: Some("number".to_string()), + }], metrics: vec![], filters: vec![], - relationships: vec![ - Relationship { - name: "related_model".to_string(), - primary_key: "id".to_string(), - foreign_key: "related_id".to_string(), - type_: Some("LEFT".to_string()), - cardinality: Some("one-to-many".to_string()), - description: Some("Relationship to another model".to_string()), - } - ], + relationships: vec![], }; - + let sql_content = "SELECT * FROM test_schema.test_model"; let request = to_deploy_request(&model, sql_content.to_string()); // Call the restored function - + assert_eq!(request.name, "test_model"); assert_eq!(request.columns.len(), 2); // 1 dim, 1 measure assert_eq!(request.columns[0].name, "dim1"); @@ -1086,10 +1371,13 @@ models: assert_eq!(request.columns[1].name, "measure1"); assert!(request.entity_relationships.is_some()); assert_eq!(request.entity_relationships.as_ref().unwrap().len(), 1); - assert_eq!(request.entity_relationships.as_ref().unwrap()[0].name, "related_model"); + assert_eq!( + request.entity_relationships.as_ref().unwrap()[0].name, + "related_model" + ); let expected_yml_content = serde_yaml::to_string(&model)?; assert_eq!(request.yml_file, Some(expected_yml_content)); - + Ok(()) } -} \ No newline at end of file +} diff --git a/cli/cli/src/commands/run.rs b/cli/cli/src/commands/run.rs index dfdfdbebe..daad8db8b 100644 --- a/cli/cli/src/commands/run.rs +++ b/cli/cli/src/commands/run.rs @@ -12,7 +12,6 @@ use std::time::Duration; #[derive(RustEmbed)] #[folder = "../../"] #[include = "docker-compose.yml"] -#[include = "litellm_vertex_config.yaml"] #[include = "supabase/.env.example"] #[include = "supabase/**/*"] #[exclude = "supabase/volumes/db/data/**/*"] @@ -70,6 +69,28 @@ async fn setup_persistent_app_environment() -> Result { )) })?; + // Initialize .env from supabase/.env.example, which should have been extracted by 
StaticAssets loop + let example_env_src_path = app_base_dir.join("supabase/.env.example"); + let main_dot_env_target_path = app_base_dir.join(".env"); + + if example_env_src_path.exists() { + fs::copy(&example_env_src_path, &main_dot_env_target_path).map_err(|e| { + BusterError::CommandError(format!( + "Failed to initialize {} from {}: {}", + main_dot_env_target_path.display(), + example_env_src_path.display(), + e + )) + })?; + } else { + // This case should ideally not be hit if supabase/.env.example is correctly embedded and extracted. + // If it's missing, it indicates an issue with asset handling. + return Err(BusterError::CommandError(format!( + "Critical setup error: {} not found after asset extraction. Cannot initialize main .env file.", + example_env_src_path.display() + ))); + } + let target_dotenv_path = app_base_dir.join(".env"); // --- BEGIN API Key and Reranker Setup using config_utils --- @@ -207,6 +228,7 @@ pub async fn reset() -> Result<(), BusterError> { println!( "This can lead to a complete wipe of the Buster database and any other local service data." ); + println!("The ~/.buster directory will be wiped, except for ~/.buster/credentials.yml if it exists."); println!("This action is irreversible."); print!("Are you sure you want to proceed with resetting? (y/n): "); io::stdout() @@ -223,7 +245,34 @@ pub async fn reset() -> Result<(), BusterError> { return Ok(()); } - let persistent_app_dir = setup_persistent_app_environment().await?; + let app_base_dir = config_utils::get_app_base_dir().map_err(|e| { + BusterError::CommandError(format!("Failed to get app base directory: {}", e)) + })?; + println!("Target application directory for reset: {}", app_base_dir.display()); + + // Backup credentials if they exist + let credentials_path = app_base_dir.join("credentials.yml"); + let credentials_backup = fs::read(&credentials_path).ok(); + if credentials_backup.is_some() { + println!("Found credentials.yml at {}, will attempt to preserve it.", credentials_path.display()); + } else { + println!("No credentials.yml found at {} to preserve.", credentials_path.display()); + } + + // Ensure app_base_dir exists and essential files for Docker commands are present + // These files will be wiped later with the rest of app_base_dir. 
+ fs::create_dir_all(&app_base_dir).map_err(|e| BusterError::CommandError(format!("Failed to create app base directory {}: {}", app_base_dir.display(), e)))?; + + let dc_filename = "docker-compose.yml"; + let dc_asset = StaticAssets::get(dc_filename) + .ok_or_else(|| BusterError::CommandError(format!("Failed to get embedded asset: {}", dc_filename)))?; + fs::write(app_base_dir.join(dc_filename), dc_asset.data.as_ref()).map_err(|e| BusterError::CommandError(format!("Failed to write temporary {}: {}", dc_filename, e)))?; + + // docker-compose.yml references supabase/.env, so ensure it exists (can be empty) + let supabase_dir = app_base_dir.join("supabase"); + fs::create_dir_all(&supabase_dir).map_err(|e| BusterError::CommandError(format!("Failed to create supabase directory in app base dir: {}", e)))?; + fs::write(supabase_dir.join(".env"), "").map_err(|e| BusterError::CommandError(format!("Failed to write temporary supabase/.env: {}",e)))?; + let pb = ProgressBar::new_spinner(); pb.enable_steady_tick(Duration::from_millis(120)); @@ -235,16 +284,16 @@ pub async fn reset() -> Result<(), BusterError> { ); // Step 1: Stop services - pb.set_message("Resetting Buster services (step 1/4): Stopping services..."); + pb.set_message("Resetting Buster services (1/3): Stopping services..."); let mut down_cmd = Command::new("docker"); down_cmd - .current_dir(&persistent_app_dir) + .current_dir(&app_base_dir) // Use the prepared app_base_dir .arg("compose") .arg("-p") .arg("buster") .arg("-f") - .arg("docker-compose.yml") + .arg("docker-compose.yml") // Relative to app_base_dir .arg("down"); let down_output = down_cmd.output().map_err(|e| { @@ -259,72 +308,29 @@ Stdout: Stderr: {}", down_output.status, - persistent_app_dir.display(), + app_base_dir.display(), String::from_utf8_lossy(&down_output.stdout), String::from_utf8_lossy(&down_output.stderr) ); pb.abandon_with_message("Error: docker compose down failed. 
See console for details."); - println!("\nDocker Compose Down Error Details:\n{}", err_msg); + println!(" +Docker Compose Down Error Details: +{}", err_msg); return Err(BusterError::CommandError(err_msg)); } + pb.println("Services stopped successfully."); - // Step 2: Clear persistent data volumes - pb.set_message("Resetting Buster services (step 2/4): Clearing persistent data volumes..."); - let db_volume_path = persistent_app_dir.join("supabase/volumes/db/data"); - let storage_volume_path = persistent_app_dir.join("supabase/volumes/storage"); - if db_volume_path.exists() { - fs::remove_dir_all(&db_volume_path).map_err(|e| { - BusterError::CommandError(format!( - "Failed to remove db volume at {}: {}", - db_volume_path.display(), - e - )) - })?; - } - fs::create_dir_all(&db_volume_path).map_err(|e| { - BusterError::CommandError(format!( - "Failed to recreate db volume at {}: {}", - db_volume_path.display(), - e - )) - })?; - pb.println(format!( - "Successfully cleared and recreated database volume: {}", - db_volume_path.display() - )); - - if storage_volume_path.exists() { - fs::remove_dir_all(&storage_volume_path).map_err(|e| { - BusterError::CommandError(format!( - "Failed to remove storage volume at {}: {}", - storage_volume_path.display(), - e - )) - })?; - } - fs::create_dir_all(&storage_volume_path).map_err(|e| { - BusterError::CommandError(format!( - "Failed to recreate storage volume at {}: {}", - storage_volume_path.display(), - e - )) - })?; - pb.println(format!( - "Successfully cleared and recreated storage volume: {}", - storage_volume_path.display() - )); - - // Step 3: Identify service images - pb.set_message("Resetting Buster services (step 3/4): Identifying service images..."); + // Step 2: Identify and Remove service images + pb.set_message("Resetting Buster services (2/3): Removing service images..."); let mut config_images_cmd = Command::new("docker"); config_images_cmd - .current_dir(&persistent_app_dir) + .current_dir(&app_base_dir) // Use the prepared app_base_dir .arg("compose") .arg("-p") .arg("buster") .arg("-f") - .arg("docker-compose.yml") + .arg("docker-compose.yml") // Relative to app_base_dir .arg("config") .arg("--images"); @@ -343,7 +349,7 @@ Stdout: Stderr: {}", config_images_output.status, - persistent_app_dir.display(), + app_base_dir.display(), String::from_utf8_lossy(&config_images_output.stdout), String::from_utf8_lossy(&config_images_output.stderr) ); @@ -351,7 +357,9 @@ Stderr: "Error: Failed to identify service images. See console for details.", ); println!( - "\nDocker Compose Config --images Error Details:\n{}", + " +Docker Compose Config --images Error Details: +{}", err_msg ); return Err(BusterError::CommandError(err_msg)); @@ -363,29 +371,25 @@ Stderr: .filter(|line| !line.trim().is_empty()) .collect(); - // Step 4: Remove service images if image_names.is_empty() { pb.println( "No images identified by docker-compose config --images. 
Skipping image removal.", ); } else { - pb.set_message(format!( - "Resetting Buster services (step 4/4): Removing {} service image(s)...", - image_names.len() - )); + pb.println(format!("Found {} image(s) to remove.", image_names.len())); for (index, image_name) in image_names.iter().enumerate() { let current_image_name = image_name.trim(); if current_image_name.is_empty() { continue; } pb.set_message(format!( - "Resetting Buster services (step 4/4): Removing image {}/{} ('{}')...", + "Resetting Buster services (2/3): Removing image {}/{} ('{}')...", index + 1, image_names.len(), current_image_name )); let mut rmi_cmd = Command::new("docker"); - rmi_cmd.arg("image").arg("rm").arg(current_image_name); + rmi_cmd.arg("image").arg("rm").arg(current_image_name); // Image names are global let rmi_output = rmi_cmd.output().map_err(|e| { BusterError::CommandError(format!( @@ -394,19 +398,44 @@ Stderr: )) })?; - // Log warning on failure but continue, as image might not exist or be in use by other non-project containers if !rmi_output.status.success() { let rmi_stderr = String::from_utf8_lossy(&rmi_output.stderr); if !rmi_stderr.trim().is_empty() && !rmi_stderr.contains("No such image") { - // Don't warn if image was already gone pb.println(format!("Warning: Could not remove image '{}'. It might be in use or already removed. Stderr: {}", current_image_name, rmi_stderr.trim())); } + } else { + pb.println(format!("Successfully removed image: {}", current_image_name)); } } } + pb.println("Service image removal process complete."); + + // Step 3: Wipe app_base_dir and restore credentials + pb.set_message(format!("Resetting Buster services (3/3): Wiping {} and restoring credentials...", app_base_dir.display())); + + if app_base_dir.exists() { + fs::remove_dir_all(&app_base_dir).map_err(|e| { + BusterError::CommandError(format!("Failed to remove app directory {}: {}", app_base_dir.display(), e)) + })?; + pb.println(format!("Successfully removed directory: {}", app_base_dir.display())); + } + + fs::create_dir_all(&app_base_dir).map_err(|e| { + BusterError::CommandError(format!("Failed to recreate app directory {}: {}", app_base_dir.display(), e)) + })?; + pb.println(format!("Successfully recreated directory: {}", app_base_dir.display())); + + if let Some(backup_data) = credentials_backup { + fs::write(&credentials_path, backup_data).map_err(|e| { + BusterError::CommandError(format!("Failed to restore credentials.yml to {}: {}", credentials_path.display(), e)) + })?; + pb.println(format!("Successfully restored: {}", credentials_path.display())); + } else { + pb.println(format!("No prior credentials.yml to restore for {}.", credentials_path.display())); + } pb.finish_with_message( - "Buster services stopped, volumes cleared, and images removed successfully.", + format!("Buster reset complete. Docker services stopped, images removed. Directory {} wiped (credentials.yml preserved if found). Run 'buster start' to rebuild.", app_base_dir.display()) ); Ok(()) } diff --git a/docker-compose.yml b/docker-compose.yml index 2954a83dc..59e766174 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,7 +16,7 @@ services: retries: 30 api: - image: ghcr.io/buster-so/buster/api:latest + image: ghcr.io/buster-so/buster/api:latest-arm64 container_name: buster-api env_file: - .env @@ -50,7 +50,7 @@ services: condition: service_healthy web: - image: ghcr.io/buster-so/buster/web:latest + image: ghcr.io/buster-so/buster/web:latest-arm64 container_name: buster-web env_file: - .env
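For context, the reset rework in run.rs above reduces to four steps: back up credentials.yml, run `docker compose down` for the project, remove the images reported by `docker compose config --images`, then wipe and recreate the app directory before restoring the backup. Below is a minimal, self-contained sketch of that sequence, illustrative only and not the code added in this patch: the hard-coded path, project name, and String-based error type are simplified stand-ins for the real config_utils / BusterError plumbing.

use std::fs;
use std::path::Path;
use std::process::Command;

/// Illustrative reset flow; assumes `app_dir` already contains docker-compose.yml.
fn reset_sketch(app_dir: &Path) -> Result<(), String> {
    // 1. Back up credentials.yml (if present) before anything is deleted.
    let credentials_path = app_dir.join("credentials.yml");
    let credentials_backup = fs::read(&credentials_path).ok();

    // 2. Stop the compose project defined by the local docker-compose.yml.
    let down = Command::new("docker")
        .current_dir(app_dir)
        .args(["compose", "-p", "buster", "-f", "docker-compose.yml", "down"])
        .output()
        .map_err(|e| format!("failed to run docker compose down: {e}"))?;
    if !down.status.success() {
        return Err(format!(
            "docker compose down failed: {}",
            String::from_utf8_lossy(&down.stderr)
        ));
    }

    // 3. Ask compose which images the project uses, then remove each one.
    let config = Command::new("docker")
        .current_dir(app_dir)
        .args(["compose", "-p", "buster", "-f", "docker-compose.yml", "config", "--images"])
        .output()
        .map_err(|e| format!("failed to list images: {e}"))?;
    for image in String::from_utf8_lossy(&config.stdout)
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty())
    {
        // Failures here are non-fatal: the image may already be gone or shared.
        let _ = Command::new("docker").args(["image", "rm", image]).status();
    }

    // 4. Wipe the app directory, recreate it, and restore the credentials backup.
    if app_dir.exists() {
        fs::remove_dir_all(app_dir)
            .map_err(|e| format!("failed to wipe {}: {e}", app_dir.display()))?;
    }
    fs::create_dir_all(app_dir)
        .map_err(|e| format!("failed to recreate {}: {e}", app_dir.display()))?;
    if let Some(bytes) = credentials_backup {
        fs::write(&credentials_path, bytes)
            .map_err(|e| format!("failed to restore credentials.yml: {e}"))?;
    }
    Ok(())
}

fn main() {
    // Hypothetical path; the real CLI derives this from config_utils::get_app_base_dir().
    if let Err(err) = reset_sketch(Path::new("/tmp/buster-app-dir")) {
        eprintln!("reset sketch failed: {err}");
    }
}

Image removal is deliberately best-effort, mirroring the warn-and-continue behaviour in the patch, since an image may already have been removed or may still be used by unrelated containers.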