From 0d4543917310ea58994c8dc679a65093256256a7 Mon Sep 17 00:00:00 2001 From: dal Date: Tue, 25 Feb 2025 11:17:28 -0700 Subject: [PATCH] successfully init on redshift --- .../routes/data_sources/post_data_sources.rs | 35 +- api/src/utils/clients/supabase_vault.rs | 7 + api/src/utils/query_engine/credentials.rs | 12 +- .../data_source_query_routes/query_router.rs | 13 +- cli/Cargo.toml | 2 +- cli/src/commands/init.rs | 305 ++++++++++-------- cli/src/utils/buster/api.rs | 22 -- 7 files changed, 238 insertions(+), 158 deletions(-) diff --git a/api/src/routes/rest/routes/data_sources/post_data_sources.rs b/api/src/routes/rest/routes/data_sources/post_data_sources.rs index 8cc4c2753..091430adf 100644 --- a/api/src/routes/rest/routes/data_sources/post_data_sources.rs +++ b/api/src/routes/rest/routes/data_sources/post_data_sources.rs @@ -24,6 +24,7 @@ use crate::database::schema::users_to_organizations; use crate::routes::rest::ApiResponse; use crate::utils::clients::supabase_vault::create_secrets; use crate::utils::query_engine::credentials::Credential; +use crate::utils::query_engine::credentials::PostgresCredentials; #[derive(Debug, Deserialize)] pub struct CreateDataSourceRequest { @@ -91,9 +92,41 @@ async fn post_data_sources_handler( let secret_values = requests .iter() .map(|request| { + // Special handling for Redshift credentials + let credential = if let Credential::Redshift(redshift_creds) = &request.credential { + tracing::info!("Redshift credentials before conversion - database: {:?}, host: {}, port: {}", + redshift_creds.database, redshift_creds.host, redshift_creds.port); + + // Convert RedshiftCredentials to PostgresCredentials + // This is necessary because Redshift uses PostgresCredentials internally + // but the database field is required in PostgresCredentials while optional in RedshiftCredentials + let postgres_creds = PostgresCredentials { + host: redshift_creds.host.clone(), + port: redshift_creds.port, + username: redshift_creds.username.clone(), + password: redshift_creds.password.clone(), + // Use the database value if provided, otherwise use "dev" as default + database: redshift_creds.database.clone().unwrap_or_else(|| "dev".to_string()), + schema: None, + jump_host: None, + ssh_username: None, + ssh_private_key: None, + }; + + Credential::Postgres(postgres_creds) + } else { + request.credential.clone() + }; + + let serialized = serde_json::to_string(&credential).unwrap(); + + if let Credential::Redshift(_) = &request.credential { + tracing::info!("Serialized Redshift credentials (converted to Postgres): {}", serialized); + } + ( request.name.clone(), - serde_json::to_string(&request.credential).unwrap(), + serialized, ) }) .collect::>(); diff --git a/api/src/utils/clients/supabase_vault.rs b/api/src/utils/clients/supabase_vault.rs index a4bd89a47..cb2758e1d 100644 --- a/api/src/utils/clients/supabase_vault.rs +++ b/api/src/utils/clients/supabase_vault.rs @@ -28,6 +28,13 @@ pub async fn create_secret(secret_value: &String) -> Result { pub async fn create_secrets( secret_values: &HashMap, ) -> Result> { + // Log the secret values for Redshift + for (name, value) in secret_values { + if value.contains("\"type\":\"redshift\"") { + tracing::info!("Creating secret for Redshift data source '{}': {}", name, value); + } + } + let secrets: Vec<(Uuid, &String)> = secret_values .iter() .map(|(_, value)| (Uuid::new_v4(), value)) diff --git a/api/src/utils/query_engine/credentials.rs b/api/src/utils/query_engine/credentials.rs index 7c739d774..e665fcdc3 100644 --- a/api/src/utils/query_engine/credentials.rs +++ b/api/src/utils/query_engine/credentials.rs @@ -2,6 +2,7 @@ use anyhow::{anyhow, Result}; use serde::{Deserialize, Serialize}; use serde_json::Value; use uuid::Uuid; +use tracing; use crate::{database::enums::DataSourceType, utils::clients::supabase_vault::read_secret}; @@ -74,7 +75,6 @@ pub struct PostgresCredentials { pub port: u16, pub username: String, pub password: String, - #[serde(alias = "dbname")] pub database: String, pub schema: Option, pub jump_host: Option, @@ -89,7 +89,7 @@ pub struct RedshiftCredentials { pub port: u16, pub username: String, pub password: String, - pub database: String, + pub database: Option, pub schemas: Option>, } @@ -216,12 +216,18 @@ pub async fn get_data_source_credentials( DataSourceType::Redshift => { match serde_json::from_str::(&secret_string) { Ok(mut credential) => { + tracing::info!("Retrieved Redshift credentials from vault: database={:?}, host={}, port={}", + credential.database, credential.host, credential.port); + if redact_secret { credential.password = "[REDACTED]".to_string(); } Credential::Postgres(credential) } - Err(e) => return Err(anyhow!("Error deserializing Redshift secret: {:?}", e)), + Err(e) => { + tracing::error!("Error deserializing Redshift secret: {:?}, raw secret: {}", e, secret_string); + return Err(anyhow!("Error deserializing Redshift secret: {:?}", e)); + } } } DataSourceType::Snowflake => { diff --git a/api/src/utils/query_engine/data_source_query_routes/query_router.rs b/api/src/utils/query_engine/data_source_query_routes/query_router.rs index 4aa1377c7..b55c796f3 100644 --- a/api/src/utils/query_engine/data_source_query_routes/query_router.rs +++ b/api/src/utils/query_engine/data_source_query_routes/query_router.rs @@ -114,7 +114,18 @@ async fn route_to_query( results } DataSourceType::Redshift => { - let credentials: PostgresCredentials = serde_json::from_str(&credentials_string)?; + tracing::info!("Raw Redshift credentials string before deserialization: {}", credentials_string); + + let credentials: PostgresCredentials = match serde_json::from_str(&credentials_string) { + Ok(creds) => creds, + Err(e) => { + tracing::error!("Error deserializing Redshift credentials: {:?}", e); + return Err(anyhow!("Error deserializing Redshift credentials: {:?}", e)); + } + }; + + tracing::info!("Redshift query using credentials - database: {}, host: {}, port: {}", + credentials.database, credentials.host, credentials.port); let redshift_client = get_redshift_connection(&credentials).await?; diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 4f603c593..41d2719de 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "buster-cli" -version = "0.0.4" +version = "0.0.5" edition = "2021" build = "build.rs" diff --git a/cli/src/commands/init.rs b/cli/src/commands/init.rs index c9b7ff2f4..1975cd402 100644 --- a/cli/src/commands/init.rs +++ b/cli/src/commands/init.rs @@ -1,146 +1,191 @@ use anyhow::Result; -use inquire::MultiSelect; -use tokio::task::JoinSet; +use colored::*; +use inquire::{Select, Text, Password, validator::Validation}; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use std::error::Error; use crate::utils::{ buster_credentials::get_and_validate_buster_credentials, - command::{check_dbt_installation, dbt_command}, - profiles::{get_dbt_profile_credentials, upload_dbt_profiles_to_buster}, - project_files::{create_buster_from_dbt_project_yml, find_dbt_projects}, - text::print_error, + profiles::{Credential, PostgresCredentials}, + BusterClient, PostDataSourcesRequest, }; -use super::auth; +#[derive(Debug, Clone)] +enum DatabaseType { + Redshift, + Postgres, + BigQuery, + Snowflake, +} + +impl std::fmt::Display for DatabaseType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DatabaseType::Redshift => write!(f, "Redshift"), + DatabaseType::Postgres => write!(f, "Postgres"), + DatabaseType::BigQuery => write!(f, "BigQuery"), + DatabaseType::Snowflake => write!(f, "Snowflake"), + } + } +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +struct RedshiftCredentials { + pub host: String, + pub port: u16, + pub username: String, + pub password: String, + pub database: Option, + pub schemas: Option>, +} pub async fn init() -> Result<()> { - if let Err(e) = check_dbt_installation().await { - print_error("Error: Failed to check dbt installation"); - return Err(anyhow::anyhow!("Failed to check dbt installation: {}", e)); - } + println!("{}", "Initializing Buster...".bold().green()); - // Get buster credentials + // Check for Buster credentials + println!("Checking for Buster credentials..."); let buster_creds = match get_and_validate_buster_credentials().await { - Ok(buster_creds) => Some(buster_creds), - Err(_) => { - print_error("No Buster credentials found. Beginning authentication flow..."); - None - } - }; - - // If no buster credentials, go through auth flow. - let buster_creds = if let Some(buster_creds) = buster_creds { - buster_creds - } else { - match auth().await { - Ok(_) => match get_and_validate_buster_credentials().await { - Ok(buster_creds) => buster_creds, - Err(e) => { - print_error("Error: Authentication failed during credential validation"); - return Err(anyhow::anyhow!("Failed to authenticate: {}", e)); - } - }, - Err(e) => { - print_error("Error: Authentication process failed"); - return Err(anyhow::anyhow!("Failed to authenticate: {}", e)); - } - } - }; - - // Check if dbt projects exist. - let mut dbt_projects = match find_dbt_projects().await { - Ok(projects) => projects, - Err(e) => { - print_error("Error: Failed to find dbt projects"); - return Err(anyhow::anyhow!("Failed to find dbt projects: {}", e)); - } - }; - - if !dbt_projects.is_empty() { - // If dbt projects exist, ask user which ones to use for Buster. - print_error("Found already existing dbt projects..."); - let selected_dbt_projects = match MultiSelect::new( - "Please select the dbt projects you want to use for Buster (leave empty for all):", - dbt_projects.clone(), - ) - .with_vim_mode(true) - .prompt() - { - Ok(projects) => projects, - Err(e) => { - print_error("Error: Failed to get user selection"); - return Err(anyhow::anyhow!("Failed to get user selection: {}", e)); - } - }; - - let mut dbt_project_set = JoinSet::new(); - - for project in selected_dbt_projects { - dbt_project_set.spawn(async move { - create_buster_from_dbt_project_yml(&format!("{}/dbt_project.yml", project)).await - }); - } - - while let Some(result) = dbt_project_set.join_next().await { - if let Err(e) = result { - print_error("Error: Failed to process dbt project"); - return Err(anyhow::anyhow!("Failed to process dbt project: {}", e)); - } - } - } else { - // If no dbt projects exist, create a new one. - print_error("No dbt projects found. Creating a new dbt project..."); - if let Err(e) = dbt_command("init").await { - print_error("Error: Failed to initialize dbt project"); - return Err(anyhow::anyhow!("Failed to initialize dbt project: {}", e)); - } - - dbt_projects = match find_dbt_projects().await { - Ok(projects) => projects, - Err(e) => { - print_error("Error: Failed to find newly created dbt project"); - return Err(anyhow::anyhow!( - "Failed to find newly created dbt project: {}", - e - )); - } - }; - - if let Err(e) = - create_buster_from_dbt_project_yml(&format!("{}/dbt_project.yml", dbt_projects[0])) - .await - { - print_error("Error: Failed to create Buster project from dbt project"); - return Err(anyhow::anyhow!("Failed to create Buster project: {}", e)); - } - } - - println!( - "Uploading {} dbt profile(s) to Buster...", - dbt_projects.len() - ); - - // Get dbt profile credentials to upload to Buster - let dbt_profile_credentials = match get_dbt_profile_credentials(&dbt_projects).await { Ok(creds) => creds, - Err(e) => { - print_error("Error: Failed to get dbt profile credentials"); - return Err(anyhow::anyhow!( - "Failed to get dbt profile credentials: {}", - e - )); + Err(_) => { + println!("{}", "No valid Buster credentials found.".red()); + println!("Please run {} first.", "buster auth".cyan()); + return Err(anyhow::anyhow!("No valid Buster credentials found")); } }; + println!("{}", "✓ Buster credentials found".green()); - // Upload the profiles to Buster - if let Err(e) = upload_dbt_profiles_to_buster(dbt_profile_credentials, buster_creds).await { - print_error("Error: Failed to upload profiles to Buster"); - return Err(anyhow::anyhow!( - "Failed to upload profiles to Buster: {}", - e - )); + // Select database type + let db_types = vec![ + DatabaseType::Redshift, + DatabaseType::Postgres, + DatabaseType::BigQuery, + DatabaseType::Snowflake, + ]; + + let db_type = Select::new( + "Select your database type:", + db_types, + ) + .prompt()?; + + println!("You selected: {}", db_type.to_string().cyan()); + + match db_type { + DatabaseType::Redshift => setup_redshift(buster_creds.url, buster_creds.api_key).await, + _ => { + println!("{}", format!("{} support is coming soon!", db_type).yellow()); + Err(anyhow::anyhow!("Database type not yet implemented")) + } + } +} + +async fn setup_redshift(buster_url: String, buster_api_key: String) -> Result<()> { + println!("{}", "Setting up Redshift connection...".bold()); + + // Collect name (with validation) + let name_regex = Regex::new(r"^[a-zA-Z0-9_-]+$")?; + let name = Text::new("Enter a unique name for this data source:") + .with_help_message("Only alphanumeric characters, dash (-) and underscore (_) allowed") + .with_validator(move |input: &str| { + if name_regex.is_match(input) { + Ok(Validation::Valid) + } else { + Ok(Validation::Invalid("Name must contain only alphanumeric characters, dash (-) or underscore (_)".into())) + } + }) + .prompt()?; + + // Collect host + let host = Text::new("Enter the Redshift host:") + .with_help_message("Example: my-cluster.abc123xyz789.us-west-2.redshift.amazonaws.com") + .prompt()?; + + // Collect port (with validation) + let port_str = Text::new("Enter the Redshift port:") + .with_default("5439") + .with_help_message("Default Redshift port is 5439") + .with_validator(|input: &str| { + match input.parse::() { + Ok(_) => Ok(Validation::Valid), + Err(_) => Ok(Validation::Invalid("Port must be a valid number between 1 and 65535".into())), + } + }) + .prompt()?; + let port = port_str.parse::()?; + + // Collect username + let username = Text::new("Enter the Redshift username:") + .prompt()?; + + // Collect password (masked) + let password = Password::new("Enter the Redshift password:") + .without_confirmation() + .prompt()?; + + // Collect database (optional) + let database = Text::new("Enter the Redshift database (optional):") + .with_help_message("Leave blank to access all available databases") + .prompt()?; + let database = if database.trim().is_empty() { + None + } else { + Some(database) + }; + + // Collect schema (optional) + let schema = Text::new("Enter the Redshift schema (optional):") + .with_help_message("Leave blank to access all available schemas") + .prompt()?; + let schema = if schema.trim().is_empty() { + None + } else { + Some(schema) + }; + + // Create credentials + let redshift_creds = RedshiftCredentials { + host, + port, + username, + password, + database, + schemas: schema.map(|s| vec![s]), + }; + + // Create API request + let request = PostDataSourcesRequest { + name: name.clone(), + env: "dev".to_string(), // Default to dev environment + credential: Credential::Redshift( + PostgresCredentials { + host: redshift_creds.host, + port: redshift_creds.port, + username: redshift_creds.username, + password: redshift_creds.password, + database: redshift_creds.database.clone().unwrap_or_default(), + schema: redshift_creds.schemas.clone().and_then(|s| s.first().cloned()).unwrap_or_default(), + jump_host: None, + ssh_username: None, + ssh_private_key: None, + } + ), + }; + + // Send to API + println!("Sending credentials to Buster API..."); + let client = BusterClient::new(buster_url, buster_api_key)?; + + match client.post_data_sources(vec![request]).await { + Ok(_) => { + println!("{}", "✓ Data source created successfully!".green().bold()); + println!("Data source '{}' is now available for use with Buster.", name.cyan()); + Ok(()) + }, + Err(e) => { + println!("{}", "✗ Failed to create data source".red().bold()); + println!("Error: {}", e); + Err(anyhow::anyhow!("Failed to create data source: {}", e)) + } } - - // TODO: Get back the ids and store in artifacts. - - Ok(()) } diff --git a/cli/src/utils/buster/api.rs b/cli/src/utils/buster/api.rs index e4941aaa2..850e18ead 100644 --- a/cli/src/utils/buster/api.rs +++ b/cli/src/utils/buster/api.rs @@ -40,11 +40,9 @@ impl BusterClient { } pub async fn validate_api_key(&self) -> Result { - println!("Debug: Starting API key validation"); let request = ValidateApiKeyRequest { api_key: self.api_key.clone(), }; - println!("Debug: Created request object"); let mut headers = HeaderMap::new(); headers.insert( @@ -59,43 +57,25 @@ impl BusterClient { reqwest::header::USER_AGENT, HeaderValue::from_static("buster-cli"), ); - println!("Debug: Set up headers: {:?}", headers); let url = format!("{}/api/v1/api_keys/validate", self.base_url); - println!("Debug: Making request to URL: {}", url); let request = self .client .post(&url) .headers(headers) .json(&request); - println!("Debug: Built request: {:?}", request); let response = match request.send().await { Ok(resp) => resp, Err(e) => { - println!("Debug: Request failed with error: {:?}", e); - if let Some(source) = e.source() { - println!("Debug: Error source: {:?}", source); - } - if e.is_timeout() { - println!("Debug: Error was a timeout"); - } - if e.is_connect() { - println!("Debug: Error was a connection error"); - } - if e.is_request() { - println!("Debug: Error was a request error"); - } return Err(anyhow::anyhow!("Request failed: {}", e)); } }; - println!("Debug: Got response: {:?}", response); if !response.status().is_success() { let status = response.status(); let text = response.text().await?; - println!("Debug: Error response - Status: {}, Body: {}", status, text); return Err(anyhow::anyhow!( "Failed to validate API key. Status: {}, Response: {}", status, @@ -105,11 +85,9 @@ impl BusterClient { match response.json::().await { Ok(validate_response) => { - println!("Debug: Successfully parsed response: {:?}", validate_response); Ok(validate_response.valid) } Err(e) => { - println!("Debug: Failed to parse response: {:?}", e); Err(anyhow::anyhow!( "Failed to parse validate API key response: {}", e