diff --git a/api/src/utils/query_engine/credentials.rs b/api/src/utils/query_engine/credentials.rs index e665fcdc3..9262f0051 100644 --- a/api/src/utils/query_engine/credentials.rs +++ b/api/src/utils/query_engine/credentials.rs @@ -82,7 +82,6 @@ pub struct PostgresCredentials { pub ssh_private_key: Option<String>, } -// Deprecated: REDSHIFT just uses postgres credentials #[derive(Serialize, Deserialize, Debug, Clone)] pub struct RedshiftCredentials { pub host: String, @@ -222,7 +221,16 @@ pub async fn get_data_source_credentials( if redact_secret { credential.password = "[REDACTED]".to_string(); } - Credential::Postgres(credential) + // Convert PostgresCredentials to RedshiftCredentials + let redshift_creds = RedshiftCredentials { + host: credential.host, + port: credential.port, + username: credential.username, + password: credential.password, + database: Some(credential.database), + schemas: credential.schema.map(|s| vec![s]), + }; + Credential::Redshift(redshift_creds) } Err(e) => { tracing::error!("Error deserializing Redshift secret: {:?}, raw secret: {}", e, secret_string); diff --git a/api/src/utils/query_engine/import_dataset_columns.rs b/api/src/utils/query_engine/import_dataset_columns.rs index 3722ec3f8..928a294c6 100644 --- a/api/src/utils/query_engine/import_dataset_columns.rs +++ b/api/src/utils/query_engine/import_dataset_columns.rs @@ -3,12 +3,13 @@ use crate::database::{lib::get_pg_pool, models::DatasetColumn, schema::dataset_c use super::{ credentials::{ BigqueryCredentials, Credential, MySqlCredentials, PostgresCredentials, - SnowflakeCredentials, + SnowflakeCredentials, RedshiftCredentials, }, data_source_connections::{ get_bigquery_client::get_bigquery_client, get_mysql_connection::get_mysql_connection, get_postgres_connection::get_postgres_connection, get_snowflake_client::get_snowflake_client, + get_redshift_connection::get_redshift_connection, }, }; use anyhow::{anyhow, Result}; @@ -191,6 +192,17 @@ pub async fn retrieve_dataset_columns( Err(e) =>
return Err(e), } } + Credential::Redshift(credentials) => { + match get_redshift_columns_batch( + &[(dataset_name.clone(), schema_name.clone())], + credentials, + ) + .await + { + Ok(cols) => cols, + Err(e) => return Err(e), + } + } _ => return Err(anyhow!("Unsupported data source type")), }; @@ -213,6 +225,9 @@ pub async fn retrieve_dataset_columns_batch( Credential::Snowflake(credentials) => { get_snowflake_columns_batch(datasets, credentials, database).await } + Credential::Redshift(credentials) => { + get_redshift_columns_batch(datasets, credentials).await + } _ => Err(anyhow!("Unsupported data source type")), } } @@ -458,6 +473,69 @@ async fn get_postgres_columns_batch( Ok(columns) } +async fn get_redshift_columns_batch( + datasets: &[(String, String)], + credentials: &RedshiftCredentials, +) -> Result<Vec<DatasetColumnRecord>> { + // Convert RedshiftCredentials to PostgresCredentials for the connection + let pg_credentials = PostgresCredentials { + host: credentials.host.clone(), + port: credentials.port, + username: credentials.username.clone(), + password: credentials.password.clone(), + database: credentials.database.clone().unwrap_or_default(), + schema: None, + jump_host: None, + ssh_username: None, + ssh_private_key: None, + }; + + let redshift_pool = match get_redshift_connection(&pg_credentials).await { + Ok(conn) => conn, + Err(e) => return Err(e), + }; + + // Build the IN clause for (schema, table) pairs + let table_pairs: Vec<String> = datasets + .iter() + .map(|(table, schema)| format!("('{schema}', '{table}')")) + .collect(); + let table_pairs_str = table_pairs.join(", "); + + // Query for tables and views in Redshift + // Note: Redshift doesn't support pg_catalog.pg_statio_all_tables and pg_catalog.pg_description + // the same way PostgreSQL does, so we simplify the query + let sql = format!( + "SELECT + c.table_name as dataset_name, + c.table_schema as schema_name, + c.column_name as name, + c.data_type as type_, + CASE WHEN c.is_nullable = 'YES' THEN true ELSE false END as
nullable, + NULL AS comment, + t.table_type as source_type + FROM + information_schema.columns c + JOIN + information_schema.tables t ON c.table_name = t.table_name AND c.table_schema = t.table_schema + WHERE + (c.table_schema, c.table_name) IN ({}) + AND t.table_type IN ('BASE TABLE', 'VIEW', 'MATERIALIZED VIEW') + ORDER BY + c.table_schema, + c.table_name, + c.ordinal_position;", + table_pairs_str + ); + + let columns = sqlx::query_as::<_, DatasetColumnRecord>(&sql) + .fetch_all(&redshift_pool) + .await + .map_err(|e| anyhow!("Error fetching columns from Redshift: {:?}", e))?; + + Ok(columns) +} + async fn get_mysql_columns_batch( datasets: &[(String, String)], credentials: &MySqlCredentials, @@ -648,6 +726,7 @@ async fn get_bigquery_columns_batch( Ok(columns) } + async fn get_snowflake_columns( dataset_name: &String, schema_name: &String, diff --git a/cli/buster_project.yml b/cli/buster_project.yml deleted file mode 100644 index a3d64b52f..000000000 --- a/cli/buster_project.yml +++ /dev/null @@ -1,19 +0,0 @@ -name: hello_blake -version: 1.0.0 -profile: hello_blake -model-paths: -- models -analysis-paths: -- analyses -test-paths: -- tests -seed-paths: -- seeds -macro-paths: -- macros -snapshot-paths: -- snapshots -clean-targets: -- target -- dbt_packages -models: {} diff --git a/cli/src/commands/init.rs b/cli/src/commands/init.rs index f759b13f9..7059cf193 100644 --- a/cli/src/commands/init.rs +++ b/cli/src/commands/init.rs @@ -185,8 +185,19 @@ async fn setup_redshift(buster_url: String, buster_api_key: String) -> Result<()> println!("Port: {}", port.to_string().cyan()); println!("Username: {}", username.cyan()); println!("Password: {}", "********".cyan()); - println!("Database: {}", database.clone().unwrap_or_else(|| "All databases".to_string()).cyan()); - println!("Schema: {}", schema.clone().unwrap_or_else(|| "All schemas".to_string()).cyan()); + + // Display database and schema with clear indication if they're empty + if let Some(db) = &database { 
println!("Database: {}", db.cyan()); + } else { + println!("Database: {}", "All databases (null)".cyan()); + } + + if let Some(sch) = &schema { + println!("Schema: {}", sch.cyan()); + } else { + println!("Schema: {}", "All schemas (null)".cyan()); + } let confirm = Confirm::new("Do you want to create this data source?") .with_default(true) @@ -208,6 +219,8 @@ async fn setup_redshift(buster_url: String, buster_api_key: String) -> Result<()> }; // Create API request + // Note: PostgresCredentials requires String for database and schema, not Option + // We use empty strings to represent null/all databases or schemas let request = PostDataSourcesRequest { name: name.clone(), env: "dev".to_string(), // Default to dev environment