successfully init on redshift

This commit is contained in:
dal 2025-02-25 11:17:28 -07:00
parent 00f668852b
commit 0d45439173
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
7 changed files with 238 additions and 158 deletions

View File

@ -24,6 +24,7 @@ use crate::database::schema::users_to_organizations;
use crate::routes::rest::ApiResponse;
use crate::utils::clients::supabase_vault::create_secrets;
use crate::utils::query_engine::credentials::Credential;
use crate::utils::query_engine::credentials::PostgresCredentials;
#[derive(Debug, Deserialize)]
pub struct CreateDataSourceRequest {
@ -91,9 +92,41 @@ async fn post_data_sources_handler(
let secret_values = requests
.iter()
.map(|request| {
// Special handling for Redshift credentials
let credential = if let Credential::Redshift(redshift_creds) = &request.credential {
tracing::info!("Redshift credentials before conversion - database: {:?}, host: {}, port: {}",
redshift_creds.database, redshift_creds.host, redshift_creds.port);
// Convert RedshiftCredentials to PostgresCredentials
// This is necessary because Redshift uses PostgresCredentials internally
// but the database field is required in PostgresCredentials while optional in RedshiftCredentials
let postgres_creds = PostgresCredentials {
host: redshift_creds.host.clone(),
port: redshift_creds.port,
username: redshift_creds.username.clone(),
password: redshift_creds.password.clone(),
// Use the database value if provided, otherwise use "dev" as default
database: redshift_creds.database.clone().unwrap_or_else(|| "dev".to_string()),
schema: None,
jump_host: None,
ssh_username: None,
ssh_private_key: None,
};
Credential::Postgres(postgres_creds)
} else {
request.credential.clone()
};
let serialized = serde_json::to_string(&credential).unwrap();
if let Credential::Redshift(_) = &request.credential {
tracing::info!("Serialized Redshift credentials (converted to Postgres): {}", serialized);
}
(
request.name.clone(),
serde_json::to_string(&request.credential).unwrap(),
serialized,
)
})
.collect::<HashMap<String, String>>();

View File

@ -28,6 +28,13 @@ pub async fn create_secret(secret_value: &String) -> Result<Uuid> {
pub async fn create_secrets(
secret_values: &HashMap<String, String>,
) -> Result<HashMap<String, Uuid>> {
// Log the secret values for Redshift
for (name, value) in secret_values {
if value.contains("\"type\":\"redshift\"") {
tracing::info!("Creating secret for Redshift data source '{}': {}", name, value);
}
}
let secrets: Vec<(Uuid, &String)> = secret_values
.iter()
.map(|(_, value)| (Uuid::new_v4(), value))

View File

@ -2,6 +2,7 @@ use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
use tracing;
use crate::{database::enums::DataSourceType, utils::clients::supabase_vault::read_secret};
@ -74,7 +75,6 @@ pub struct PostgresCredentials {
pub port: u16,
pub username: String,
pub password: String,
#[serde(alias = "dbname")]
pub database: String,
pub schema: Option<String>,
pub jump_host: Option<String>,
@ -89,7 +89,7 @@ pub struct RedshiftCredentials {
pub port: u16,
pub username: String,
pub password: String,
pub database: String,
pub database: Option<String>,
pub schemas: Option<Vec<String>>,
}
@ -216,12 +216,18 @@ pub async fn get_data_source_credentials(
DataSourceType::Redshift => {
match serde_json::from_str::<PostgresCredentials>(&secret_string) {
Ok(mut credential) => {
tracing::info!("Retrieved Redshift credentials from vault: database={:?}, host={}, port={}",
credential.database, credential.host, credential.port);
if redact_secret {
credential.password = "[REDACTED]".to_string();
}
Credential::Postgres(credential)
}
Err(e) => return Err(anyhow!("Error deserializing Redshift secret: {:?}", e)),
Err(e) => {
tracing::error!("Error deserializing Redshift secret: {:?}, raw secret: {}", e, secret_string);
return Err(anyhow!("Error deserializing Redshift secret: {:?}", e));
}
}
}
DataSourceType::Snowflake => {

View File

@ -114,7 +114,18 @@ async fn route_to_query(
results
}
DataSourceType::Redshift => {
let credentials: PostgresCredentials = serde_json::from_str(&credentials_string)?;
tracing::info!("Raw Redshift credentials string before deserialization: {}", credentials_string);
let credentials: PostgresCredentials = match serde_json::from_str(&credentials_string) {
Ok(creds) => creds,
Err(e) => {
tracing::error!("Error deserializing Redshift credentials: {:?}", e);
return Err(anyhow!("Error deserializing Redshift credentials: {:?}", e));
}
};
tracing::info!("Redshift query using credentials - database: {}, host: {}, port: {}",
credentials.database, credentials.host, credentials.port);
let redshift_client = get_redshift_connection(&credentials).await?;

View File

@ -1,6 +1,6 @@
[package]
name = "buster-cli"
version = "0.0.4"
version = "0.0.5"
edition = "2021"
build = "build.rs"

View File

@ -1,146 +1,191 @@
use anyhow::Result;
use inquire::MultiSelect;
use tokio::task::JoinSet;
use colored::*;
use inquire::{Select, Text, Password, validator::Validation};
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::error::Error;
use crate::utils::{
buster_credentials::get_and_validate_buster_credentials,
command::{check_dbt_installation, dbt_command},
profiles::{get_dbt_profile_credentials, upload_dbt_profiles_to_buster},
project_files::{create_buster_from_dbt_project_yml, find_dbt_projects},
text::print_error,
profiles::{Credential, PostgresCredentials},
BusterClient, PostDataSourcesRequest,
};
use super::auth;
#[derive(Debug, Clone)]
enum DatabaseType {
Redshift,
Postgres,
BigQuery,
Snowflake,
}
impl std::fmt::Display for DatabaseType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DatabaseType::Redshift => write!(f, "Redshift"),
DatabaseType::Postgres => write!(f, "Postgres"),
DatabaseType::BigQuery => write!(f, "BigQuery"),
DatabaseType::Snowflake => write!(f, "Snowflake"),
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct RedshiftCredentials {
pub host: String,
pub port: u16,
pub username: String,
pub password: String,
pub database: Option<String>,
pub schemas: Option<Vec<String>>,
}
pub async fn init() -> Result<()> {
if let Err(e) = check_dbt_installation().await {
print_error("Error: Failed to check dbt installation");
return Err(anyhow::anyhow!("Failed to check dbt installation: {}", e));
}
println!("{}", "Initializing Buster...".bold().green());
// Get buster credentials
// Check for Buster credentials
println!("Checking for Buster credentials...");
let buster_creds = match get_and_validate_buster_credentials().await {
Ok(buster_creds) => Some(buster_creds),
Err(_) => {
print_error("No Buster credentials found. Beginning authentication flow...");
None
}
};
// If no buster credentials, go through auth flow.
let buster_creds = if let Some(buster_creds) = buster_creds {
buster_creds
} else {
match auth().await {
Ok(_) => match get_and_validate_buster_credentials().await {
Ok(buster_creds) => buster_creds,
Err(e) => {
print_error("Error: Authentication failed during credential validation");
return Err(anyhow::anyhow!("Failed to authenticate: {}", e));
}
},
Err(e) => {
print_error("Error: Authentication process failed");
return Err(anyhow::anyhow!("Failed to authenticate: {}", e));
}
}
};
// Check if dbt projects exist.
let mut dbt_projects = match find_dbt_projects().await {
Ok(projects) => projects,
Err(e) => {
print_error("Error: Failed to find dbt projects");
return Err(anyhow::anyhow!("Failed to find dbt projects: {}", e));
}
};
if !dbt_projects.is_empty() {
// If dbt projects exist, ask user which ones to use for Buster.
print_error("Found already existing dbt projects...");
let selected_dbt_projects = match MultiSelect::new(
"Please select the dbt projects you want to use for Buster (leave empty for all):",
dbt_projects.clone(),
)
.with_vim_mode(true)
.prompt()
{
Ok(projects) => projects,
Err(e) => {
print_error("Error: Failed to get user selection");
return Err(anyhow::anyhow!("Failed to get user selection: {}", e));
}
};
let mut dbt_project_set = JoinSet::new();
for project in selected_dbt_projects {
dbt_project_set.spawn(async move {
create_buster_from_dbt_project_yml(&format!("{}/dbt_project.yml", project)).await
});
}
while let Some(result) = dbt_project_set.join_next().await {
if let Err(e) = result {
print_error("Error: Failed to process dbt project");
return Err(anyhow::anyhow!("Failed to process dbt project: {}", e));
}
}
} else {
// If no dbt projects exist, create a new one.
print_error("No dbt projects found. Creating a new dbt project...");
if let Err(e) = dbt_command("init").await {
print_error("Error: Failed to initialize dbt project");
return Err(anyhow::anyhow!("Failed to initialize dbt project: {}", e));
}
dbt_projects = match find_dbt_projects().await {
Ok(projects) => projects,
Err(e) => {
print_error("Error: Failed to find newly created dbt project");
return Err(anyhow::anyhow!(
"Failed to find newly created dbt project: {}",
e
));
}
};
if let Err(e) =
create_buster_from_dbt_project_yml(&format!("{}/dbt_project.yml", dbt_projects[0]))
.await
{
print_error("Error: Failed to create Buster project from dbt project");
return Err(anyhow::anyhow!("Failed to create Buster project: {}", e));
}
}
println!(
"Uploading {} dbt profile(s) to Buster...",
dbt_projects.len()
);
// Get dbt profile credentials to upload to Buster
let dbt_profile_credentials = match get_dbt_profile_credentials(&dbt_projects).await {
Ok(creds) => creds,
Err(e) => {
print_error("Error: Failed to get dbt profile credentials");
return Err(anyhow::anyhow!(
"Failed to get dbt profile credentials: {}",
e
));
Err(_) => {
println!("{}", "No valid Buster credentials found.".red());
println!("Please run {} first.", "buster auth".cyan());
return Err(anyhow::anyhow!("No valid Buster credentials found"));
}
};
println!("{}", "✓ Buster credentials found".green());
// Upload the profiles to Buster
if let Err(e) = upload_dbt_profiles_to_buster(dbt_profile_credentials, buster_creds).await {
print_error("Error: Failed to upload profiles to Buster");
return Err(anyhow::anyhow!(
"Failed to upload profiles to Buster: {}",
e
));
// Select database type
let db_types = vec![
DatabaseType::Redshift,
DatabaseType::Postgres,
DatabaseType::BigQuery,
DatabaseType::Snowflake,
];
let db_type = Select::new(
"Select your database type:",
db_types,
)
.prompt()?;
println!("You selected: {}", db_type.to_string().cyan());
match db_type {
DatabaseType::Redshift => setup_redshift(buster_creds.url, buster_creds.api_key).await,
_ => {
println!("{}", format!("{} support is coming soon!", db_type).yellow());
Err(anyhow::anyhow!("Database type not yet implemented"))
}
}
}
async fn setup_redshift(buster_url: String, buster_api_key: String) -> Result<()> {
println!("{}", "Setting up Redshift connection...".bold());
// Collect name (with validation)
let name_regex = Regex::new(r"^[a-zA-Z0-9_-]+$")?;
let name = Text::new("Enter a unique name for this data source:")
.with_help_message("Only alphanumeric characters, dash (-) and underscore (_) allowed")
.with_validator(move |input: &str| {
if name_regex.is_match(input) {
Ok(Validation::Valid)
} else {
Ok(Validation::Invalid("Name must contain only alphanumeric characters, dash (-) or underscore (_)".into()))
}
})
.prompt()?;
// Collect host
let host = Text::new("Enter the Redshift host:")
.with_help_message("Example: my-cluster.abc123xyz789.us-west-2.redshift.amazonaws.com")
.prompt()?;
// Collect port (with validation)
let port_str = Text::new("Enter the Redshift port:")
.with_default("5439")
.with_help_message("Default Redshift port is 5439")
.with_validator(|input: &str| {
match input.parse::<u16>() {
Ok(_) => Ok(Validation::Valid),
Err(_) => Ok(Validation::Invalid("Port must be a valid number between 1 and 65535".into())),
}
})
.prompt()?;
let port = port_str.parse::<u16>()?;
// Collect username
let username = Text::new("Enter the Redshift username:")
.prompt()?;
// Collect password (masked)
let password = Password::new("Enter the Redshift password:")
.without_confirmation()
.prompt()?;
// Collect database (optional)
let database = Text::new("Enter the Redshift database (optional):")
.with_help_message("Leave blank to access all available databases")
.prompt()?;
let database = if database.trim().is_empty() {
None
} else {
Some(database)
};
// Collect schema (optional)
let schema = Text::new("Enter the Redshift schema (optional):")
.with_help_message("Leave blank to access all available schemas")
.prompt()?;
let schema = if schema.trim().is_empty() {
None
} else {
Some(schema)
};
// Create credentials
let redshift_creds = RedshiftCredentials {
host,
port,
username,
password,
database,
schemas: schema.map(|s| vec![s]),
};
// Create API request
let request = PostDataSourcesRequest {
name: name.clone(),
env: "dev".to_string(), // Default to dev environment
credential: Credential::Redshift(
PostgresCredentials {
host: redshift_creds.host,
port: redshift_creds.port,
username: redshift_creds.username,
password: redshift_creds.password,
database: redshift_creds.database.clone().unwrap_or_default(),
schema: redshift_creds.schemas.clone().and_then(|s| s.first().cloned()).unwrap_or_default(),
jump_host: None,
ssh_username: None,
ssh_private_key: None,
}
),
};
// Send to API
println!("Sending credentials to Buster API...");
let client = BusterClient::new(buster_url, buster_api_key)?;
match client.post_data_sources(vec![request]).await {
Ok(_) => {
println!("{}", "✓ Data source created successfully!".green().bold());
println!("Data source '{}' is now available for use with Buster.", name.cyan());
Ok(())
},
Err(e) => {
println!("{}", "✗ Failed to create data source".red().bold());
println!("Error: {}", e);
Err(anyhow::anyhow!("Failed to create data source: {}", e))
}
}
// TODO: Get back the ids and store in artifacts.
Ok(())
}

View File

@ -40,11 +40,9 @@ impl BusterClient {
}
pub async fn validate_api_key(&self) -> Result<bool> {
println!("Debug: Starting API key validation");
let request = ValidateApiKeyRequest {
api_key: self.api_key.clone(),
};
println!("Debug: Created request object");
let mut headers = HeaderMap::new();
headers.insert(
@ -59,43 +57,25 @@ impl BusterClient {
reqwest::header::USER_AGENT,
HeaderValue::from_static("buster-cli"),
);
println!("Debug: Set up headers: {:?}", headers);
let url = format!("{}/api/v1/api_keys/validate", self.base_url);
println!("Debug: Making request to URL: {}", url);
let request = self
.client
.post(&url)
.headers(headers)
.json(&request);
println!("Debug: Built request: {:?}", request);
let response = match request.send().await {
Ok(resp) => resp,
Err(e) => {
println!("Debug: Request failed with error: {:?}", e);
if let Some(source) = e.source() {
println!("Debug: Error source: {:?}", source);
}
if e.is_timeout() {
println!("Debug: Error was a timeout");
}
if e.is_connect() {
println!("Debug: Error was a connection error");
}
if e.is_request() {
println!("Debug: Error was a request error");
}
return Err(anyhow::anyhow!("Request failed: {}", e));
}
};
println!("Debug: Got response: {:?}", response);
if !response.status().is_success() {
let status = response.status();
let text = response.text().await?;
println!("Debug: Error response - Status: {}, Body: {}", status, text);
return Err(anyhow::anyhow!(
"Failed to validate API key. Status: {}, Response: {}",
status,
@ -105,11 +85,9 @@ impl BusterClient {
match response.json::<ValidateApiKeyResponse>().await {
Ok(validate_response) => {
println!("Debug: Successfully parsed response: {:?}", validate_response);
Ok(validate_response.valid)
}
Err(e) => {
println!("Debug: Failed to parse response: {:?}", e);
Err(anyhow::anyhow!(
"Failed to parse validate API key response: {}",
e