mirror of https://github.com/buster-so/buster.git
Refactor buster-cli: Remove init and credentials modules, introduce buster_credentials module, and enhance API client. Added ratatui dependency for improved UI. Updated credential handling and API key validation logic. Implemented new data source posting functionality.
This commit is contained in:
parent
785fe6f237
commit
6194f61356
|
@ -12,6 +12,7 @@ confy = "0.6.0"
|
|||
dirs = "5.0.1"
|
||||
indicatif = "0.17.8"
|
||||
inquire = "0.7.5"
|
||||
ratatui = "0.29.0"
|
||||
reqwest = { version = "0.12.9", features = ["json"] }
|
||||
rpassword = "7.3.1"
|
||||
serde = { version = "1.0.196", features = ["derive"] }
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
name: hello_blake
|
||||
version: 1.0.0
|
||||
profile: hello_blake
|
||||
model-paths:
|
||||
- models
|
||||
analysis-paths:
|
||||
- analyses
|
||||
test-paths:
|
||||
- tests
|
||||
seed-paths:
|
||||
- seeds
|
||||
macro-paths:
|
||||
- macros
|
||||
snapshot-paths:
|
||||
- snapshots
|
||||
clean-targets:
|
||||
- target
|
||||
- dbt_packages
|
||||
models: {}
|
|
@ -2,7 +2,7 @@ use anyhow::Result;
|
|||
use inquire::{Password, Text};
|
||||
|
||||
use crate::utils::{
|
||||
credentials::{get_buster_credentials, set_buster_credentials, BusterCredentials},
|
||||
buster_credentials::{get_buster_credentials, set_buster_credentials, BusterCredentials},
|
||||
BusterClient,
|
||||
};
|
||||
|
||||
|
|
|
@ -1,80 +0,0 @@
|
|||
use anyhow::Result;
|
||||
use inquire::{MultiSelect, Select};
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
use crate::utils::{
|
||||
command::{check_dbt_installation, dbt_command},
|
||||
credentials::get_and_validate_buster_credentials,
|
||||
project_files::{create_buster_from_dbt_project_yml, find_dbt_projects},
|
||||
};
|
||||
|
||||
use super::auth;
|
||||
|
||||
/// Check to make sure that the appropriate credentials are in.
|
||||
/// Check to see if an existing dbt project exists.
|
||||
/// - If it does, ask if they want to use it for Buster
|
||||
/// - If yes:
|
||||
/// -
|
||||
/// - If no, as if no dbt exists
|
||||
/// - If not, create a new example project
|
||||
|
||||
pub async fn init() -> Result<()> {
|
||||
check_dbt_installation().await?;
|
||||
|
||||
// Get buster credentials
|
||||
let buster_creds = match get_and_validate_buster_credentials().await {
|
||||
Ok(buster_creds) => Some(buster_creds),
|
||||
Err(_) => {
|
||||
println!("No Buster credentials found. Beginning authentication flow...");
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// If no buster credentials, go through auth flow.
|
||||
let buster_creds = if let Some(buster_creds) = buster_creds {
|
||||
buster_creds
|
||||
} else {
|
||||
match auth().await {
|
||||
Ok(_) => match get_and_validate_buster_credentials().await {
|
||||
Ok(buster_creds) => buster_creds,
|
||||
Err(e) => anyhow::bail!("Failed to authenticate: {}", e),
|
||||
},
|
||||
Err(e) => anyhow::bail!("Failed to authenticate: {}", e),
|
||||
}
|
||||
};
|
||||
|
||||
// Check if dbt projects exist.
|
||||
let dbt_projects = find_dbt_projects().await?;
|
||||
|
||||
if !dbt_projects.is_empty() {
|
||||
// If dbt projects exist, ask user which ones to use for Buster.
|
||||
println!("Found already existing dbt projects...");
|
||||
let selected_dbt_projects = MultiSelect::new(
|
||||
"Please select the dbt projects you want to use for Buster (leave empty for all):",
|
||||
dbt_projects,
|
||||
)
|
||||
.with_vim_mode(true)
|
||||
.prompt()?;
|
||||
|
||||
let mut dbt_project_set = JoinSet::new();
|
||||
|
||||
for project in selected_dbt_projects {
|
||||
dbt_project_set
|
||||
.spawn(async move { create_buster_from_dbt_project_yml(&project).await });
|
||||
}
|
||||
|
||||
while let Some(result) = dbt_project_set.join_next().await {
|
||||
result??;
|
||||
}
|
||||
} else {
|
||||
// If no dbt projects exist, create a new one.
|
||||
println!("No dbt projects found. Creating a new dbt project...");
|
||||
dbt_command("init").await?;
|
||||
|
||||
let dbt_projects = find_dbt_projects().await?;
|
||||
|
||||
create_buster_from_dbt_project_yml(&dbt_projects[0]).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -5,7 +5,10 @@ use reqwest::{
|
|||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::error::BusterError;
|
||||
use crate::{
|
||||
error::BusterError,
|
||||
utils::profiles::{Credential, Profile},
|
||||
};
|
||||
|
||||
pub struct BusterClient {
|
||||
client: Client,
|
||||
|
@ -23,6 +26,14 @@ pub struct ValidateApiKeyRequest {
|
|||
pub api_key: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct PostDataSourcesRequest {
|
||||
pub name: String,
|
||||
pub env: String,
|
||||
#[serde(flatten)]
|
||||
pub credential: Credential,
|
||||
}
|
||||
|
||||
// Add this near other error-related code
|
||||
impl From<anyhow::Error> for BusterError {
|
||||
fn from(error: anyhow::Error) -> Self {
|
||||
|
@ -62,7 +73,7 @@ impl BusterClient {
|
|||
.send()
|
||||
.await?;
|
||||
|
||||
if response.status().is_client_error() {
|
||||
if !response.status().is_success() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"Failed to validate API key. This could be due to an invalid URL"
|
||||
));
|
||||
|
@ -76,4 +87,28 @@ impl BusterClient {
|
|||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn post_data_sources(&self, req_body: Vec<PostDataSourcesRequest>) -> Result<()> {
|
||||
let headers = self.build_headers()?;
|
||||
|
||||
match self
|
||||
.client
|
||||
.post(format!("{}/api/v1/data_sources", self.base_url))
|
||||
.headers(headers)
|
||||
.json(&req_body)
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(res) => {
|
||||
if !res.status().is_success() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"POST /api/v1/data_sources failed: {}",
|
||||
res.text().await?
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => Err(anyhow::anyhow!("POST /api/v1/data_sources failed: {}", e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
pub mod credentials;
|
||||
pub mod buster_credentials;
|
||||
pub mod model_files;
|
||||
pub mod profiles;
|
||||
pub mod project_files;
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
use anyhow::Result;
|
||||
use dirs::home_dir;
|
||||
use inquire::{Select, Text, Password};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_yaml::Value;
|
||||
use std::collections::HashMap;
|
||||
use tokio::fs;
|
||||
|
||||
use crate::utils::{BusterClient, PostDataSourcesRequest};
|
||||
|
||||
use super::buster_credentials::BusterCredentials;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct DbtProfiles {
|
||||
#[serde(flatten)]
|
||||
|
@ -20,24 +23,129 @@ pub struct Profile {
|
|||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Output {
|
||||
#[serde(rename = "type")]
|
||||
pub db_type: DbType,
|
||||
pub schema: String,
|
||||
pub threads: u32,
|
||||
pub threads: Option<u32>,
|
||||
// TODO: Make this a struct for each of the different db types
|
||||
#[serde(flatten)]
|
||||
pub connection_config: Value,
|
||||
pub credential: Credential,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
#[serde(tag = "type")]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum DbType {
|
||||
Bigquery,
|
||||
Postgres,
|
||||
Redshift,
|
||||
Snowflake,
|
||||
#[serde(other)]
|
||||
Other,
|
||||
pub enum Credential {
|
||||
Postgres(PostgresCredentials),
|
||||
MySQL(MySqlCredentials),
|
||||
Bigquery(BigqueryCredentials),
|
||||
SqlServer(SqlServerCredentials),
|
||||
Redshift(PostgresCredentials),
|
||||
Databricks(DatabricksCredentials),
|
||||
Snowflake(SnowflakeCredentials),
|
||||
Starrocks(MySqlCredentials),
|
||||
}
|
||||
|
||||
impl Credential {
|
||||
pub fn get_db_type(&self) -> String {
|
||||
match self {
|
||||
Credential::Postgres(_) => "postgres".to_string(),
|
||||
Credential::MySQL(_) => "mysql".to_string(),
|
||||
Credential::Bigquery(_) => "bigquery".to_string(),
|
||||
Credential::SqlServer(_) => "sqlserver".to_string(),
|
||||
Credential::Redshift(_) => "redshift".to_string(),
|
||||
Credential::Databricks(_) => "databricks".to_string(),
|
||||
Credential::Snowflake(_) => "snowflake".to_string(),
|
||||
Credential::Starrocks(_) => "starrocks".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct AthenaCredentials {
|
||||
pub data_source: String,
|
||||
pub db_database: String,
|
||||
pub aws_access_key: String,
|
||||
pub aws_secret_access: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct BigqueryCredentials {
|
||||
pub credentials_json: Value,
|
||||
pub project_id: String,
|
||||
pub dataset_ids: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct DatabricksCredentials {
|
||||
pub host: String,
|
||||
pub api_key: String,
|
||||
pub warehouse_id: String,
|
||||
pub catalog_name: String,
|
||||
pub schemas: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct MariadbCredentials {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
pub username: String,
|
||||
pub password: String,
|
||||
pub jump_host: Option<String>,
|
||||
pub ssh_username: Option<String>,
|
||||
pub ssh_private_key: Option<String>,
|
||||
#[serde(rename = "schemas")]
|
||||
pub databases: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct MySqlCredentials {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
pub username: String,
|
||||
pub password: String,
|
||||
pub jump_host: Option<String>,
|
||||
pub ssh_username: Option<String>,
|
||||
pub ssh_private_key: Option<String>,
|
||||
#[serde(rename = "schemas")]
|
||||
pub databases: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct PostgresCredentials {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
#[serde(alias = "user")]
|
||||
pub username: String,
|
||||
#[serde(alias = "pass")]
|
||||
pub password: String,
|
||||
#[serde(rename = "dbname")]
|
||||
pub database: String,
|
||||
pub schema: String,
|
||||
pub jump_host: Option<String>,
|
||||
pub ssh_username: Option<String>,
|
||||
pub ssh_private_key: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct SnowflakeCredentials {
|
||||
pub account_id: String,
|
||||
pub warehouse_id: String,
|
||||
pub database_id: String,
|
||||
pub username: String,
|
||||
pub password: String,
|
||||
pub role: Option<String>,
|
||||
pub schemas: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct SqlServerCredentials {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
pub username: String,
|
||||
pub password: String,
|
||||
pub database: String,
|
||||
pub jump_host: Option<String>,
|
||||
pub ssh_username: Option<String>,
|
||||
pub ssh_private_key: Option<String>,
|
||||
pub schemas: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
pub async fn get_dbt_profiles_yml() -> Result<DbtProfiles> {
|
||||
|
@ -53,115 +161,50 @@ pub async fn get_dbt_profiles_yml() -> Result<DbtProfiles> {
|
|||
Ok(serde_yaml::from_str(&contents)?)
|
||||
}
|
||||
|
||||
pub async fn create_dbt_profiles_yml(profiles: Option<Vec<(String, Profile)>>) -> Result<()> {
|
||||
let mut path = home_dir().unwrap_or_default();
|
||||
path.push(".dbt");
|
||||
fs::create_dir_all(&path).await?;
|
||||
path.push("profiles.yml");
|
||||
|
||||
let profiles = DbtProfiles {
|
||||
profiles: profiles.unwrap_or_default().into_iter().collect(),
|
||||
};
|
||||
|
||||
fs::write(path, serde_yaml::to_string(&profiles)?).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn upsert_dbt_profiles(new_profiles: Vec<(String, Profile)>) -> Result<()> {
|
||||
let mut profiles = get_dbt_profiles_yml().await.unwrap_or(DbtProfiles {
|
||||
profiles: HashMap::new(),
|
||||
});
|
||||
|
||||
for (key, profile) in new_profiles {
|
||||
profiles.profiles.insert(key, profile);
|
||||
}
|
||||
|
||||
fs::write(
|
||||
home_dir().unwrap_or_default().join(".dbt/profiles.yml"),
|
||||
serde_yaml::to_string(&profiles)?,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete_dbt_profiles(profile_keys: Vec<String>) -> Result<()> {
|
||||
let mut profiles = get_dbt_profiles_yml().await?;
|
||||
for key in profile_keys {
|
||||
profiles.profiles.remove(&key);
|
||||
}
|
||||
fs::write(
|
||||
home_dir().unwrap_or_default().join(".dbt/profiles.yml"),
|
||||
serde_yaml::to_string(&profiles)?,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_dbt_profile_names() -> Result<Vec<String>> {
|
||||
let profiles = get_dbt_profiles_yml().await?;
|
||||
Ok(profiles.profiles.keys().cloned().collect())
|
||||
}
|
||||
|
||||
pub async fn create_new_profile() -> Result<()> {
|
||||
let profile_name = match Text::new("Enter the name of the profile").prompt() {
|
||||
Ok(name) => name,
|
||||
Err(e) => anyhow::bail!("Failed to get user input: {}", e),
|
||||
// Returns a list of tuples containing the profile name, environment, and credentials.
|
||||
pub async fn get_dbt_profile_credentials(
|
||||
profile_names: &Vec<String>,
|
||||
) -> Result<Vec<(String, String, Credential)>> {
|
||||
let profiles = get_dbt_profiles_yml().await?;
|
||||
|
||||
let mut credentials = Vec::new();
|
||||
for name in profile_names {
|
||||
if let Some(profile) = profiles.profiles.get(name) {
|
||||
for (env, output) in &profile.outputs {
|
||||
credentials.push((name.clone(), env.clone(), output.credential.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(credentials)
|
||||
}
|
||||
|
||||
pub async fn upload_dbt_profiles_to_buster(
|
||||
credentials: Vec<(String, String, Credential)>,
|
||||
buster_creds: BusterCredentials,
|
||||
) -> Result<()> {
|
||||
let buster = BusterClient::new(buster_creds.url, buster_creds.api_key)?;
|
||||
|
||||
let mut req_body = Vec::new();
|
||||
for (name, env, cred) in credentials {
|
||||
req_body.push(PostDataSourcesRequest {
|
||||
name,
|
||||
env,
|
||||
credential: cred,
|
||||
});
|
||||
}
|
||||
|
||||
if let Err(e) = buster.post_data_sources(req_body).await {
|
||||
return Err(anyhow::anyhow!(
|
||||
"Failed to upload dbt profiles to Buster: {}",
|
||||
e
|
||||
));
|
||||
};
|
||||
|
||||
let profile_type = match Select::new(
|
||||
"Select the type of profile",
|
||||
vec!["Bigquery", "Postgres", "Redshift", "Snowflake"],
|
||||
)
|
||||
.prompt()
|
||||
{
|
||||
Ok(name) => name,
|
||||
Err(e) => anyhow::bail!("Failed to get user input: {}", e),
|
||||
};
|
||||
|
||||
let profile = match profile_type {
|
||||
"Postgres" => create_new_postgres_profile().await?,
|
||||
_ => anyhow::bail!("Profile type not yet implemented"),
|
||||
};
|
||||
|
||||
upsert_dbt_profiles(vec![(profile_name, profile)]).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_new_postgres_profile() -> Result<Profile> {
|
||||
let host = Text::new("Enter host").prompt()?;
|
||||
let user = Text::new("Enter username").prompt()?;
|
||||
let password = Password::new("Enter password").prompt()?;
|
||||
let port = Text::new("Enter port (default: 5432)")
|
||||
.with_default("5432")
|
||||
.prompt()?
|
||||
.parse::<u32>()?;
|
||||
let dbname = Text::new("Enter database name").prompt()?;
|
||||
let schema = Text::new("Enter schema").prompt()?;
|
||||
let threads = Text::new("Enter number of threads (default: 1)")
|
||||
.with_default("1")
|
||||
.prompt()?
|
||||
.parse::<u32>()?;
|
||||
|
||||
let connection_config = serde_json::json!({
|
||||
"host": host,
|
||||
"user": user,
|
||||
"password": password,
|
||||
"port": port,
|
||||
"dbname": dbname,
|
||||
});
|
||||
|
||||
Ok(Profile {
|
||||
target: "dev".to_string(),
|
||||
outputs: [(
|
||||
"dev".to_string(),
|
||||
Output {
|
||||
db_type: DbType::Postgres,
|
||||
schema,
|
||||
threads,
|
||||
connection_config: serde_yaml::to_value(connection_config)?,
|
||||
},
|
||||
)]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue