bigquery support

This commit is contained in:
dal 2025-02-26 07:45:22 -07:00
parent ca0d8704c2
commit a70389b4e8
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
6 changed files with 180 additions and 21 deletions

View File

@ -107,7 +107,7 @@ jobs:
- name: Get version
id: get_version
run: |
VERSION=0.0.6
VERSION=0.0.7
echo "version=$VERSION" >> $GITHUB_OUTPUT
- name: Create Release

View File

@ -31,7 +31,6 @@ pub struct AthenaCredentials {
pub struct BigqueryCredentials {
pub credentials_json: Value,
pub project_id: String,
pub dataset_ids: Option<Vec<String>>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]

View File

@ -233,18 +233,7 @@ async fn get_bigquery_tables_and_views(
Err(e) => return Err(e),
};
let schema_string = if let Some(datasets) = &credentials.dataset_ids {
format!(
"IN ({})",
datasets
.iter()
.map(|s| format!("'{}'", s))
.collect::<Vec<String>>()
.join(", ")
)
} else {
"NOT IN ('INFORMATION_SCHEMA')".to_string()
};
let schema_string = "NOT IN ('INFORMATION_SCHEMA')".to_string();
let tables_and_views_query = format!(
"

View File

@ -1,6 +1,6 @@
[package]
name = "buster-cli"
version = "0.0.6"
version = "0.0.7"
edition = "2021"
build = "build.rs"

View File

@ -12,7 +12,7 @@ use std::time::Duration;
use crate::utils::{
buster_credentials::get_and_validate_buster_credentials,
profiles::{Credential, PostgresCredentials},
profiles::{BigqueryCredentials, Credential, PostgresCredentials},
BusterClient, BusterConfig, PostDataSourcesRequest,
};
@ -120,12 +120,15 @@ pub async fn init(destination_path: Option<&str>) -> Result<()> {
DatabaseType::Postgres => {
setup_postgres(buster_creds.url, buster_creds.api_key, &config_path).await
}
DatabaseType::BigQuery => {
setup_bigquery(buster_creds.url, buster_creds.api_key, &config_path).await
}
_ => {
println!(
"{}",
format!("{} support is coming soon!", db_type).yellow()
);
println!("Currently, only Redshift and Postgres are supported.");
println!("Currently, only Redshift, Postgres, and BigQuery are supported.");
Err(anyhow::anyhow!("Database type not yet implemented"))
}
}
@ -374,7 +377,7 @@ async fn setup_postgres(
// Collect port (with validation)
let port_str = Text::new("Enter the PostgreSQL port:")
.with_default("5432") // Default Postgres port is 5432
.with_default("5432") // Default Postgres port is 5432
.with_help_message("Default PostgreSQL port is 5432")
.with_validator(|input: &str| match input.parse::<u16>() {
Ok(_) => Ok(Validation::Valid),
@ -419,7 +422,7 @@ async fn setup_postgres(
// Collect schema (optional)
let schema = Text::new("Enter the PostgreSQL schema (optional):")
.with_help_message("Leave blank to access all available schemas")
.with_default("public") // Default Postgres schema is usually 'public'
.with_default("public") // Default Postgres schema is usually 'public'
.prompt()?;
let schema = if schema.trim().is_empty() {
None
@ -524,6 +527,172 @@ async fn setup_postgres(
}
}
/// Interactively collects BigQuery connection details, registers the data source with the
/// Buster API, and on success writes the local Buster config file.
///
/// The user is prompted for a data source name, a Google Cloud project ID, an optional
/// dataset ID, and the path to a service account credentials file. A summary is shown and
/// must be confirmed before anything is sent.
///
/// # Arguments
/// * `buster_url` - Base URL handed to `BusterClient::new`.
/// * `buster_api_key` - API key handed to `BusterClient::new`.
/// * `config_path` - Location passed to `create_buster_config_file` after a successful upload.
///
/// # Errors
/// Returns an error when a prompt fails, the credentials file cannot be read or parsed (or is
/// not a JSON object), the client cannot be constructed, the API call fails, or the config
/// file cannot be written. Declining the confirmation prompt returns `Ok(())` without
/// contacting the API.
async fn setup_bigquery(
    buster_url: String,
    buster_api_key: String,
    config_path: &Path,
) -> Result<()> {
    println!("{}", "Setting up BigQuery connection...".bold().green());

    // Collect name (with validation). The regex admits no whitespace, so the accepted value
    // never needs trimming.
    let name_regex = Regex::new(r"^[a-zA-Z0-9_-]+$")?;
    let name = Text::new("Enter a unique name for this data source:")
        .with_help_message("Only alphanumeric characters, dash (-) and underscore (_) allowed")
        .with_validator(move |input: &str| {
            if input.trim().is_empty() {
                return Ok(Validation::Invalid("Name cannot be empty".into()));
            }
            if name_regex.is_match(input) {
                Ok(Validation::Valid)
            } else {
                Ok(Validation::Invalid(
                    "Name must contain only alphanumeric characters, dash (-) or underscore (_)"
                        .into(),
                ))
            }
        })
        .prompt()?;

    // Collect project ID. The validator only rejects blank input, so surrounding whitespace
    // can still slip through; strip it before the value is sent or persisted.
    let project_id = Text::new("Enter the Google Cloud project ID:")
        .with_help_message("Example: my-project-123456")
        .with_validator(|input: &str| {
            if input.trim().is_empty() {
                return Ok(Validation::Invalid("Project ID cannot be empty".into()));
            }
            Ok(Validation::Valid)
        })
        .prompt()?;
    let project_id = project_id.trim().to_string();

    // Collect dataset ID (optional). Blank means "all datasets"; otherwise keep the trimmed
    // identifier so stray whitespace never reaches the API or the config file.
    let dataset_id = Text::new("Enter the BigQuery dataset ID (optional):")
        .with_help_message("Leave blank to access all available datasets")
        .prompt()?;
    let dataset_id = match dataset_id.trim() {
        "" => None,
        trimmed => Some(trimmed.to_string()),
    };

    // Collect credentials JSON
    println!(
        "\n{}",
        "BigQuery requires a service account credentials JSON file.".bold()
    );
    println!(
        "You can create one in the Google Cloud Console under IAM & Admin > Service Accounts."
    );
    let credentials_path = Text::new("Enter the path to your credentials JSON file:")
        .with_help_message("Example: /path/to/credentials.json")
        .with_validator(|input: &str| {
            let path = Path::new(input);
            if !path.exists() {
                return Ok(Validation::Invalid("File does not exist".into()));
            }
            if !path.is_file() {
                return Ok(Validation::Invalid("Path is not a file".into()));
            }
            Ok(Validation::Valid)
        })
        .prompt()?;

    // Read credentials file
    let credentials_content = fs::read_to_string(&credentials_path)
        .map_err(|e| anyhow::anyhow!("Failed to read credentials file: {}", e))?;

    // Parse the file to ensure it is well-formed. The YAML parser also accepts JSON, but it
    // additionally accepts an empty file or a bare scalar, so require an object explicitly
    // rather than forwarding something that cannot be a credentials document.
    let credentials_json: serde_yaml::Value = serde_yaml::from_str(&credentials_content)
        .map_err(|e| anyhow::anyhow!("Invalid JSON in credentials file: {}", e))?;
    if !credentials_json.is_mapping() {
        return Err(anyhow::anyhow!(
            "Invalid JSON in credentials file: expected a JSON object"
        ));
    }

    // Show summary and confirm
    println!("\n{}", "Connection Summary:".bold());
    println!("Name: {}", name.cyan());
    println!("Project ID: {}", project_id.cyan());
    // Display dataset ID with clear indication if it's empty
    match &dataset_id {
        Some(ds) => println!("Dataset ID: {}", ds.cyan()),
        None => println!("Dataset ID: {}", "All datasets (null)".cyan()),
    }
    println!("Credentials: {}", credentials_path.cyan());

    let confirm = Confirm::new("Do you want to create this data source?")
        .with_default(true)
        .prompt()?;
    if !confirm {
        println!("{}", "Data source creation cancelled.".yellow());
        return Ok(());
    }

    // Create API request
    let request = PostDataSourcesRequest {
        name: name.clone(),
        env: "dev".to_string(), // Default to dev environment
        credential: Credential::Bigquery(BigqueryCredentials {
            credentials_json,
            project_id: project_id.clone(),
            dataset_ids: dataset_id.clone().map(|id| vec![id]),
        }),
    };

    // Send to API with progress indicator
    let spinner = ProgressBar::new_spinner();
    spinner.set_style(
        ProgressStyle::default_spinner()
            .tick_chars("⠁⠂⠄⡀⢀⠠⠐⠈ ")
            .template("{spinner:.green} {msg}")
            .unwrap(),
    );
    spinner.set_message("Sending credentials to Buster API...");
    spinner.enable_steady_tick(Duration::from_millis(100));

    let client = BusterClient::new(buster_url, buster_api_key)?;
    match client.post_data_sources(vec![request]).await {
        Ok(_) => {
            spinner.finish_with_message(
                "✓ Data source created successfully!"
                    .green()
                    .bold()
                    .to_string(),
            );
            println!(
                "\nData source '{}' is now available for use with Buster.",
                name.cyan()
            );
            // Create buster.yml file
            create_buster_config_file(
                config_path,
                &name,
                Some(&project_id),     // Project ID maps to database
                dataset_id.as_deref(), // Dataset ID maps to schema
            )?;
            println!("You can now use this data source with other Buster commands.");
            Ok(())
        }
        Err(e) => {
            spinner.finish_with_message("✗ Failed to create data source".red().bold().to_string());
            println!("\nError: {}", e);
            println!("Please check your credentials and try again.");
            Err(anyhow::anyhow!("Failed to create data source: {}", e))
        }
    }
}
// Helper function to create buster.yml file
fn create_buster_config_file(
path: &Path,

View File

@ -10,8 +10,10 @@ use walkdir::WalkDir;
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct BusterConfig {
pub data_source_name: Option<String>,
pub schema: Option<String>,
pub database: Option<String>,
#[serde(alias = "dataset_id")] // BigQuery alias for schema
pub schema: Option<String>, // For SQL DBs: schema, For BigQuery: dataset ID
#[serde(alias = "project_id")] // BigQuery alias for database
pub database: Option<String>, // For SQL DBs: database, For BigQuery: project ID
pub exclude_files: Option<Vec<String>>,
pub exclude_tags: Option<Vec<String>>,
}