mirror of https://github.com/buster-so/buster.git
Add Redshift support for credentials and column retrieval
parent 3d77316fe6
commit c1f7c0d95c
@@ -82,7 +82,6 @@ pub struct PostgresCredentials {
     pub ssh_private_key: Option<String>,
 }
-
 // Deprecated: REDSHIFT just uses postgres credentials
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct RedshiftCredentials {
     pub host: String,
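
The struct is cut off after `pub host` in this hunk. The remaining fields can be inferred from the conversion code later in this commit; the reconstruction below is a sketch, and the `port` type plus anything beyond these six fields are assumptions:

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RedshiftCredentials {
    pub host: String,
    pub port: u16, // type assumed to mirror PostgresCredentials::port
    pub username: String,
    pub password: String,
    pub database: Option<String>,     // None is treated as "all databases"
    pub schemas: Option<Vec<String>>, // plural: populated via schema.map(|s| vec![s])
}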
@@ -222,7 +221,16 @@ pub async fn get_data_source_credentials(
             if redact_secret {
                 credential.password = "[REDACTED]".to_string();
             }
-            Credential::Postgres(credential)
+            // Convert PostgresCredentials to RedshiftCredentials
+            let redshift_creds = RedshiftCredentials {
+                host: credential.host,
+                port: credential.port,
+                username: credential.username,
+                password: credential.password,
+                database: Some(credential.database),
+                schemas: credential.schema.map(|s| vec![s]),
+            };
+            Credential::Redshift(redshift_creds)
         }
         Err(e) => {
             tracing::error!("Error deserializing Redshift secret: {:?}, raw secret: {}", e, secret_string);
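
The same conversion, written as a standalone From impl for illustration; this impl is not part of the commit:

impl From<PostgresCredentials> for RedshiftCredentials {
    fn from(c: PostgresCredentials) -> Self {
        RedshiftCredentials {
            host: c.host,
            port: c.port,
            username: c.username,
            password: c.password,
            // Postgres credentials carry exactly one database and at most one
            // schema; the Redshift shape makes both optional and schemas plural.
            database: Some(c.database),
            schemas: c.schema.map(|s| vec![s]),
        }
    }
}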
@@ -3,12 +3,13 @@ use crate::database::{lib::get_pg_pool, models::DatasetColumn, schema::dataset_c
 use super::{
     credentials::{
         BigqueryCredentials, Credential, MySqlCredentials, PostgresCredentials,
-        SnowflakeCredentials,
+        SnowflakeCredentials, RedshiftCredentials,
     },
     data_source_connections::{
         get_bigquery_client::get_bigquery_client, get_mysql_connection::get_mysql_connection,
         get_postgres_connection::get_postgres_connection,
         get_snowflake_client::get_snowflake_client,
+        get_redshift_connection::get_redshift_connection,
     },
 };
 use anyhow::{anyhow, Result};
@@ -191,6 +192,17 @@ pub async fn retrieve_dataset_columns(
                 Err(e) => return Err(e),
             }
         }
+        Credential::Redshift(credentials) => {
+            match get_redshift_columns_batch(
+                &[(dataset_name.clone(), schema_name.clone())],
+                credentials,
+            )
+            .await
+            {
+                Ok(cols) => cols,
+                Err(e) => return Err(e),
+            }
+        }
         _ => return Err(anyhow!("Unsupported data source type")),
     };
 
@@ -213,6 +225,9 @@ pub async fn retrieve_dataset_columns_batch(
         Credential::Snowflake(credentials) => {
             get_snowflake_columns_batch(datasets, credentials, database).await
         }
+        Credential::Redshift(credentials) => {
+            get_redshift_columns_batch(datasets, credentials).await
+        }
         _ => Err(anyhow!("Unsupported data source type")),
     }
 }
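
With this arm in place, Redshift flows through the same batch entry point as the other warehouses. A usage sketch follows; the parameter list of retrieve_dataset_columns_batch is inferred from the names visible in the hunk (datasets, credentials, database) and may not match the real signature:

// Hypothetical call site: fetch column metadata for two tables in one round trip.
// `credential` is assumed to already be a Credential::Redshift, as produced by
// get_data_source_credentials above.
let datasets = vec![
    ("orders".to_string(), "public".to_string()),
    ("users".to_string(), "public".to_string()),
];
let columns = retrieve_dataset_columns_batch(&datasets, &credential, None).await?;
for col in &columns {
    // Field names follow the column aliases in the Redshift query below.
    println!("{}.{}.{} -> {}", col.schema_name, col.dataset_name, col.name, col.type_);
}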
@@ -458,6 +473,69 @@ async fn get_postgres_columns_batch(
     Ok(columns)
 }
 
+async fn get_redshift_columns_batch(
+    datasets: &[(String, String)],
+    credentials: &RedshiftCredentials,
+) -> Result<Vec<DatasetColumnRecord>> {
+    // Convert RedshiftCredentials to PostgresCredentials for the connection
+    let pg_credentials = PostgresCredentials {
+        host: credentials.host.clone(),
+        port: credentials.port,
+        username: credentials.username.clone(),
+        password: credentials.password.clone(),
+        database: credentials.database.clone().unwrap_or_default(),
+        schema: None,
+        jump_host: None,
+        ssh_username: None,
+        ssh_private_key: None,
+    };
+
+    let redshift_pool = match get_redshift_connection(&pg_credentials).await {
+        Ok(conn) => conn,
+        Err(e) => return Err(e),
+    };
+
+    // Build the IN clause for (schema, table) pairs
+    let table_pairs: Vec<String> = datasets
+        .iter()
+        .map(|(table, schema)| format!("('{schema}', '{table}')"))
+        .collect();
+    let table_pairs_str = table_pairs.join(", ");
+
+    // Query for tables and views in Redshift
+    // Note: Redshift doesn't support pg_catalog.pg_statio_all_tables and pg_catalog.pg_description
+    // the same way PostgreSQL does, so we simplify the query
+    let sql = format!(
+        "SELECT
+            c.table_name as dataset_name,
+            c.table_schema as schema_name,
+            c.column_name as name,
+            c.data_type as type_,
+            CASE WHEN c.is_nullable = 'YES' THEN true ELSE false END as nullable,
+            NULL AS comment,
+            t.table_type as source_type
+        FROM
+            information_schema.columns c
+        JOIN
+            information_schema.tables t ON c.table_name = t.table_name AND c.table_schema = t.table_schema
+        WHERE
+            (c.table_schema, c.table_name) IN ({})
+            AND t.table_type IN ('BASE TABLE', 'VIEW', 'MATERIALIZED VIEW')
+        ORDER BY
+            c.table_schema,
+            c.table_name,
+            c.ordinal_position;",
+        table_pairs_str
+    );
+
+    let columns = sqlx::query_as::<_, DatasetColumnRecord>(&sql)
+        .fetch_all(&redshift_pool)
+        .await
+        .map_err(|e| anyhow!("Error fetching columns from Redshift: {:?}", e))?;
+
+    Ok(columns)
+}
+
 async fn get_mysql_columns_batch(
     datasets: &[(String, String)],
     credentials: &MySqlCredentials,
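
One thing worth noting about the function above: the (schema, table) pairs are interpolated directly into the SQL string, so a quote character in an identifier would break the query. Below is a sketch of the same IN clause built with sqlx bind parameters instead, assuming Redshift accepts Postgres-style $n placeholders here, which holds for simple queries:

// Build ($1, $2), ($3, $4), ... placeholders, one pair per dataset, then bind
// each schema/table value so nothing is spliced into the SQL text. The "..."
// abbreviates the unchanged SELECT list and filters from the query above.
let placeholders: Vec<String> = (0..datasets.len())
    .map(|i| format!("(${}, ${})", i * 2 + 1, i * 2 + 2))
    .collect();
let sql = format!(
    "SELECT ... WHERE (c.table_schema, c.table_name) IN ({}) ...",
    placeholders.join(", ")
);
let mut query = sqlx::query_as::<_, DatasetColumnRecord>(&sql);
for (table, schema) in datasets {
    query = query.bind(schema.as_str()).bind(table.as_str());
}
let columns = query.fetch_all(&redshift_pool).await?;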
@@ -648,6 +726,7 @@ async fn get_bigquery_columns_batch(
 
     Ok(columns)
 }
+
 async fn get_snowflake_columns(
     dataset_name: &String,
     schema_name: &String,
@@ -1,19 +0,0 @@
-name: hello_blake
-version: 1.0.0
-profile: hello_blake
-model-paths:
-  - models
-analysis-paths:
-  - analyses
-test-paths:
-  - tests
-seed-paths:
-  - seeds
-macro-paths:
-  - macros
-snapshot-paths:
-  - snapshots
-clean-targets:
-  - target
-  - dbt_packages
-models: {}
@@ -185,8 +185,19 @@ async fn setup_redshift(buster_url: String, buster_api_key: String) -> Result<()
     println!("Port: {}", port.to_string().cyan());
     println!("Username: {}", username.cyan());
     println!("Password: {}", "********".cyan());
-    println!("Database: {}", database.clone().unwrap_or_else(|| "All databases".to_string()).cyan());
-    println!("Schema: {}", schema.clone().unwrap_or_else(|| "All schemas".to_string()).cyan());
+
+    // Display database and schema with clear indication if they're empty
+    if let Some(db) = &database {
+        println!("Database: {}", db.cyan());
+    } else {
+        println!("Database: {}", "All databases (null)".cyan());
+    }
+
+    if let Some(sch) = &schema {
+        println!("Schema: {}", sch.cyan());
+    } else {
+        println!("Schema: {}", "All schemas (null)".cyan());
+    }
 
     let confirm = Confirm::new("Do you want to create this data source?")
         .with_default(true)
@@ -208,6 +219,8 @@ async fn setup_redshift(buster_url: String, buster_api_key: String) -> Result<()
     };
 
     // Create API request
+    // Note: PostgresCredentials requires String for database and schema, not Option<String>
+    // We use empty strings to represent null/all databases or schemas
     let request = PostDataSourcesRequest {
         name: name.clone(),
         env: "dev".to_string(), // Default to dev environment
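
The two added comments describe a lossy mapping: the optional database/schema collected from the prompts become plain Strings in the request. A minimal sketch of that mapping under the commit's stated convention (empty string stands for "all"); the field names on the request are not shown in this hunk, so only the conversion itself is grounded:

// Empty string represents "all databases" / "all schemas", per the
// comments in the hunk above.
let database_field: String = database.clone().unwrap_or_default();
let schema_field: String = schema.clone().unwrap_or_default();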