mirror of https://github.com/buster-so/buster.git
implemented database name for snowflake warehouse. temp fix
This commit is contained in:
parent
45739b73f2
commit
2805d7ed70
|
@ -116,6 +116,7 @@ pub struct BusterModel {
|
|||
pub struct Model {
|
||||
pub name: String,
|
||||
pub data_source_name: Option<String>,
|
||||
pub database: Option<String>,
|
||||
pub schema: Option<String>,
|
||||
pub env: String,
|
||||
pub description: String,
|
||||
|
@ -279,17 +280,17 @@ async fn deploy_datasets_handler(
|
|||
let mut conn = get_pg_pool().get().await?;
|
||||
let mut results = Vec::new();
|
||||
|
||||
// Group requests by data source for efficient validation
|
||||
let mut data_source_groups: HashMap<String, Vec<&DeployDatasetsRequest>> = HashMap::new();
|
||||
// Group requests by data source and database for efficient validation
|
||||
let mut data_source_groups: HashMap<(String, Option<String>), Vec<&DeployDatasetsRequest>> = HashMap::new();
|
||||
for req in &requests {
|
||||
data_source_groups
|
||||
.entry(req.data_source_name.clone())
|
||||
.entry((req.data_source_name.clone(), req.model.clone()))
|
||||
.or_default()
|
||||
.push(req);
|
||||
}
|
||||
|
||||
// Process each data source group
|
||||
for (data_source_name, group) in data_source_groups {
|
||||
for ((data_source_name, database), group) in data_source_groups {
|
||||
// Get data source
|
||||
let data_source = match data_sources::table
|
||||
.filter(data_sources::name.eq(&data_source_name))
|
||||
|
@ -351,7 +352,7 @@ async fn deploy_datasets_handler(
|
|||
);
|
||||
|
||||
// Get all columns in one batch - this acts as our validation
|
||||
let ds_columns = match retrieve_dataset_columns_batch(&tables_to_validate, &credentials).await {
|
||||
let ds_columns = match retrieve_dataset_columns_batch(&tables_to_validate, &credentials, database).await {
|
||||
Ok(cols) => {
|
||||
// Add debug logging
|
||||
tracing::info!(
|
||||
|
@ -622,9 +623,9 @@ async fn batch_validate_datasets(
|
|||
let mut failures = Vec::new();
|
||||
let organization_id = get_user_organization_id(user_id).await?;
|
||||
|
||||
// Group requests by data source for efficient validation
|
||||
// Group requests by data source and database for efficient validation
|
||||
let mut data_source_groups: HashMap<
|
||||
String,
|
||||
(String, Option<String>),
|
||||
Vec<(&DatasetValidationRequest, Vec<(&str, &str)>)>,
|
||||
> = HashMap::new();
|
||||
|
||||
|
@ -636,13 +637,13 @@ async fn batch_validate_datasets(
|
|||
.collect();
|
||||
|
||||
data_source_groups
|
||||
.entry(request.data_source_name.clone())
|
||||
.entry((request.data_source_name.clone(), None)) // Using None for database since it's not in the validation request
|
||||
.or_default()
|
||||
.push((request, columns));
|
||||
}
|
||||
|
||||
// Process each data source group
|
||||
for (data_source_name, group) in data_source_groups {
|
||||
for ((data_source_name, database), group) in data_source_groups {
|
||||
let mut conn = get_pg_pool().get().await?;
|
||||
|
||||
// Get data source
|
||||
|
@ -702,7 +703,7 @@ async fn batch_validate_datasets(
|
|||
|
||||
// Get all columns in one batch
|
||||
let ds_columns =
|
||||
match retrieve_dataset_columns_batch(&tables_to_validate, &credentials).await {
|
||||
match retrieve_dataset_columns_batch(&tables_to_validate, &credentials, database).await {
|
||||
Ok(cols) => cols,
|
||||
Err(e) => {
|
||||
for (request, _) in group {
|
||||
|
|
|
@ -141,6 +141,7 @@ async fn update_dataset(user_id: &Uuid, dataset_id: &Uuid, name: &String) -> Res
|
|||
&dataset.dataset.database_name,
|
||||
&dataset.dataset.schema,
|
||||
&credentials,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
|
|
@ -213,6 +213,7 @@ async fn update_dataset_handler(
|
|||
&dataset_def.database_name,
|
||||
&dataset_def.schema,
|
||||
&credentials,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
|
|
@ -31,6 +31,7 @@ pub struct ColumnUpdate {
|
|||
/// Retrieves column types from the data source
|
||||
pub async fn get_column_types(
|
||||
dataset: &Dataset,
|
||||
database: Option<String>,
|
||||
data_source: &DataSource,
|
||||
) -> Result<Vec<ColumnUpdate>> {
|
||||
let credentials =
|
||||
|
@ -38,9 +39,14 @@ pub async fn get_column_types(
|
|||
.await
|
||||
.map_err(|e| anyhow!("Error getting data source credentials: {}", e))?;
|
||||
|
||||
let cols = retrieve_dataset_columns(&dataset.database_name, &dataset.schema, &credentials)
|
||||
.await
|
||||
.map_err(|e| anyhow!("Error retrieving dataset columns: {}", e))?;
|
||||
let cols = retrieve_dataset_columns(
|
||||
&dataset.database_name,
|
||||
&dataset.schema,
|
||||
&credentials,
|
||||
database,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| anyhow!("Error retrieving dataset columns: {}", e))?;
|
||||
|
||||
Ok(cols
|
||||
.into_iter()
|
||||
|
@ -122,62 +128,3 @@ pub async fn update_dataset_columns(
|
|||
|
||||
Ok(inserted_columns)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::database::enums::DataSourceType;
|
||||
|
||||
fn create_test_dataset() -> Dataset {
|
||||
Dataset {
|
||||
id: Uuid::new_v4(),
|
||||
name: "test_dataset".to_string(),
|
||||
database_name: "test_db".to_string(),
|
||||
schema: "public".to_string(),
|
||||
data_source_id: Uuid::new_v4(),
|
||||
organization_id: Uuid::new_v4(),
|
||||
created_by: Uuid::new_v4(),
|
||||
updated_by: Uuid::new_v4(),
|
||||
created_at: Utc::now(),
|
||||
updated_at: Utc::now(),
|
||||
deleted_at: None,
|
||||
when_to_use: None,
|
||||
when_not_to_use: None,
|
||||
type_: crate::database::enums::DatasetType::View,
|
||||
definition: "".to_string(),
|
||||
enabled: true,
|
||||
imported: false,
|
||||
model: None,
|
||||
yml_file: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn create_test_data_source() -> DataSource {
|
||||
DataSource {
|
||||
id: Uuid::new_v4(),
|
||||
name: "test_source".to_string(),
|
||||
type_: DataSourceType::Postgres,
|
||||
secret_id: Uuid::new_v4(),
|
||||
organization_id: Uuid::new_v4(),
|
||||
created_at: Utc::now(),
|
||||
updated_at: Utc::now(),
|
||||
deleted_at: None,
|
||||
env: "dev".to_string(),
|
||||
onboarding_status: crate::database::enums::DataSourceOnboardingStatus::InProgress,
|
||||
onboarding_error: None,
|
||||
created_by: Uuid::new_v4(),
|
||||
updated_by: Uuid::new_v4(),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_column_types() {
|
||||
let dataset = create_test_dataset();
|
||||
let data_source = create_test_data_source();
|
||||
|
||||
let result = get_column_types(&dataset, &data_source).await;
|
||||
assert!(result.is_err()); // Will fail because test data source doesn't exist
|
||||
}
|
||||
|
||||
// TODO: Add more tests for update_dataset_columns once we have a test database setup
|
||||
}
|
||||
|
|
|
@ -5,11 +5,12 @@ use crate::utils::query_engine::credentials::SnowflakeCredentials;
|
|||
|
||||
pub async fn get_snowflake_client(
|
||||
credentials: &SnowflakeCredentials,
|
||||
database: Option<String>,
|
||||
) -> Result<SnowflakeApi, Error> {
|
||||
let snowflake_client = match SnowflakeApi::with_password_auth(
|
||||
&credentials.account_id,
|
||||
Some(credentials.warehouse_id.as_str()),
|
||||
Some(credentials.database_id.as_str()),
|
||||
database.as_deref(),
|
||||
None,
|
||||
&credentials.username,
|
||||
credentials.role.as_deref(),
|
||||
|
|
|
@ -229,7 +229,7 @@ async fn route_to_query(
|
|||
DataSourceType::Snowflake => {
|
||||
let credentials: SnowflakeCredentials = serde_json::from_str(&credentials_string)?;
|
||||
|
||||
let mut snowflake_client = match get_snowflake_client(&credentials).await {
|
||||
let mut snowflake_client = match get_snowflake_client(&credentials, None).await {
|
||||
Ok(snowflake_client) => snowflake_client,
|
||||
Err(e) => {
|
||||
tracing::error!("There was an issue while establishing a connection to the parent data source: {}", e);
|
||||
|
|
|
@ -64,9 +64,10 @@ pub async fn import_dataset_columns(
|
|||
dataset_database_name: &String,
|
||||
dataset_schema_name: &String,
|
||||
credentials: &Credential,
|
||||
database: Option<String>,
|
||||
) -> Result<()> {
|
||||
let cols =
|
||||
match retrieve_dataset_columns(&dataset_database_name, &dataset_schema_name, credentials)
|
||||
match retrieve_dataset_columns(&dataset_database_name, &dataset_schema_name, credentials, database)
|
||||
.await
|
||||
{
|
||||
Ok(cols) => cols,
|
||||
|
@ -142,6 +143,7 @@ pub async fn retrieve_dataset_columns(
|
|||
dataset_name: &String,
|
||||
schema_name: &String,
|
||||
credentials: &Credential,
|
||||
database: Option<String>,
|
||||
) -> Result<Vec<DatasetColumnRecord>> {
|
||||
let cols_result = match credentials {
|
||||
Credential::Postgres(credentials) => {
|
||||
|
@ -181,6 +183,7 @@ pub async fn retrieve_dataset_columns(
|
|||
match get_snowflake_columns_batch(
|
||||
&[(dataset_name.clone(), schema_name.clone())],
|
||||
credentials,
|
||||
database,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
@ -197,6 +200,7 @@ pub async fn retrieve_dataset_columns(
|
|||
pub async fn retrieve_dataset_columns_batch(
|
||||
datasets: &[(String, String)], // Vec of (dataset_name, schema_name)
|
||||
credentials: &Credential,
|
||||
database: Option<String>,
|
||||
) -> Result<Vec<DatasetColumnRecord>> {
|
||||
match credentials {
|
||||
Credential::Postgres(credentials) => {
|
||||
|
@ -207,7 +211,7 @@ pub async fn retrieve_dataset_columns_batch(
|
|||
get_bigquery_columns_batch(datasets, credentials).await
|
||||
}
|
||||
Credential::Snowflake(credentials) => {
|
||||
get_snowflake_columns_batch(datasets, credentials).await
|
||||
get_snowflake_columns_batch(datasets, credentials, database).await
|
||||
}
|
||||
_ => Err(anyhow!("Unsupported data source type")),
|
||||
}
|
||||
|
@ -216,8 +220,9 @@ pub async fn retrieve_dataset_columns_batch(
|
|||
async fn get_snowflake_columns_batch(
|
||||
datasets: &[(String, String)],
|
||||
credentials: &SnowflakeCredentials,
|
||||
database: Option<String>,
|
||||
) -> Result<Vec<DatasetColumnRecord>> {
|
||||
let snowflake_client = get_snowflake_client(credentials).await?;
|
||||
let snowflake_client = get_snowflake_client(credentials, database).await?;
|
||||
|
||||
// Build the IN clause for (schema, table) pairs
|
||||
let table_pairs: Vec<String> = datasets
|
||||
|
@ -642,7 +647,7 @@ async fn get_snowflake_columns(
|
|||
schema_name: &String,
|
||||
credentials: &SnowflakeCredentials,
|
||||
) -> Result<Vec<DatasetColumnRecord>> {
|
||||
let snowflake_client = get_snowflake_client(credentials).await?;
|
||||
let snowflake_client = get_snowflake_client(credentials, None).await?;
|
||||
|
||||
let uppercase_dataset_name = dataset_name.to_uppercase();
|
||||
let uppercase_schema_name = schema_name.to_uppercase();
|
||||
|
|
|
@ -301,21 +301,29 @@ async fn get_bigquery_tables_and_views(
|
|||
if let Some(rows) = table_and_views_records.rows {
|
||||
for row in rows {
|
||||
if let Some(cols) = row.columns {
|
||||
let name = cols[0].value.as_ref()
|
||||
let name = cols[0]
|
||||
.value
|
||||
.as_ref()
|
||||
.and_then(|v| v.as_str())
|
||||
.ok_or_else(|| anyhow!("Error fetching table name"))?
|
||||
.to_string();
|
||||
|
||||
let schema = cols[1].value.as_ref()
|
||||
let schema = cols[1]
|
||||
.value
|
||||
.as_ref()
|
||||
.and_then(|v| v.as_str())
|
||||
.ok_or_else(|| anyhow!("Error fetching table schema"))?
|
||||
.to_string();
|
||||
|
||||
let definition = cols[2].value.as_ref()
|
||||
let definition = cols[2]
|
||||
.value
|
||||
.as_ref()
|
||||
.and_then(|v| v.as_str())
|
||||
.map(String::from);
|
||||
|
||||
let type_ = cols[3].value.as_ref()
|
||||
let type_ = cols[3]
|
||||
.value
|
||||
.as_ref()
|
||||
.and_then(|v| v.as_str())
|
||||
.ok_or_else(|| anyhow!("Error fetching table type"))?
|
||||
.to_string();
|
||||
|
@ -336,7 +344,7 @@ async fn get_bigquery_tables_and_views(
|
|||
async fn get_snowflake_tables_and_views(
|
||||
credentials: &SnowflakeCredentials,
|
||||
) -> Result<Vec<DatasetRecord>> {
|
||||
let snowflake_client = get_snowflake_client(credentials).await?;
|
||||
let snowflake_client = get_snowflake_client(credentials, None).await?;
|
||||
|
||||
let schema_list = credentials.schemas.clone().unwrap_or_else(|| vec![]);
|
||||
let schema_string = if !schema_list.is_empty() {
|
||||
|
|
|
@ -95,7 +95,7 @@ pub async fn test_data_source_connection(
|
|||
_ => return Err(anyhow!("Invalid credential type")),
|
||||
};
|
||||
|
||||
match get_snowflake_client(&credential).await {
|
||||
match get_snowflake_client(&credential, None).await {
|
||||
Ok(client) => client,
|
||||
Err(e) => return Err(anyhow!("Error getting snowflake client: {:?}", e)),
|
||||
};
|
||||
|
|
|
@ -19,6 +19,7 @@ pub async fn validate_model(
|
|||
model_name: &str,
|
||||
model_database_name: &str,
|
||||
schema: &str,
|
||||
database: Option<String>,
|
||||
data_source: &DataSource,
|
||||
columns: &[(&str, &str)], // (name, type) - type is now ignored for validation
|
||||
expressions: Option<&[(&str, &str)]>, // (column_name, expr)
|
||||
|
@ -69,7 +70,7 @@ pub async fn validate_model(
|
|||
}
|
||||
|
||||
// Get data source columns using batched retrieval for all tables at once
|
||||
let ds_columns_result = match retrieve_dataset_columns_batch(&tables_to_validate, &credentials).await {
|
||||
let ds_columns_result = match retrieve_dataset_columns_batch(&tables_to_validate, &credentials, database).await {
|
||||
Ok(cols) => cols,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to get columns from data source: {}", e);
|
||||
|
@ -147,172 +148,4 @@ pub async fn validate_model(
|
|||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{database::enums::DataSourceType, utils::validation::ValidationErrorType};
|
||||
use uuid::Uuid;
|
||||
|
||||
fn create_test_data_source() -> DataSource {
|
||||
DataSource {
|
||||
id: Uuid::new_v4(),
|
||||
name: "test_source".to_string(),
|
||||
type_: DataSourceType::Postgres,
|
||||
secret_id: Uuid::new_v4(),
|
||||
organization_id: Uuid::new_v4(),
|
||||
created_at: chrono::Utc::now(),
|
||||
updated_at: chrono::Utc::now(),
|
||||
deleted_at: None,
|
||||
env: "dev".to_string(),
|
||||
onboarding_status: crate::database::enums::DataSourceOnboardingStatus::InProgress,
|
||||
onboarding_error: None,
|
||||
created_by: Uuid::new_v4(),
|
||||
updated_by: Uuid::new_v4(),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_validate_model_data_source_error() {
|
||||
let data_source = create_test_data_source();
|
||||
let result = validate_model(
|
||||
"test_model",
|
||||
"test_db",
|
||||
"test_schema",
|
||||
&data_source,
|
||||
&[("col1", "text")], // type is ignored now
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(!result.success);
|
||||
assert_eq!(result.errors.len(), 1);
|
||||
assert_eq!(
|
||||
result.errors[0].error_type,
|
||||
ValidationErrorType::DataSourceError
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_validate_model_column_existence() {
|
||||
let data_source = create_test_data_source();
|
||||
let result = validate_model(
|
||||
"test_model",
|
||||
"test_db",
|
||||
"test_schema",
|
||||
&data_source,
|
||||
&[
|
||||
("existing_col", "any_type"), // type is ignored
|
||||
("missing_col", "any_type"), // type is ignored
|
||||
],
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(!result.success);
|
||||
assert!(result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.error_type == ValidationErrorType::ColumnNotFound
|
||||
&& e.column_name.as_deref() == Some("missing_col")));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_validate_model_with_expressions() {
|
||||
let data_source = create_test_data_source();
|
||||
let result = validate_model(
|
||||
"test_model",
|
||||
"test_db",
|
||||
"test_schema",
|
||||
&data_source,
|
||||
&[("col1", "any_type")], // type is ignored
|
||||
Some(&[("col1", "invalid_col + 1")]),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(!result.success);
|
||||
assert!(result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.error_type == ValidationErrorType::ExpressionError));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_validate_model_with_valid_expressions() {
|
||||
let data_source = create_test_data_source();
|
||||
let result = validate_model(
|
||||
"test_model",
|
||||
"test_db",
|
||||
"test_schema",
|
||||
&data_source,
|
||||
&[("col1", "any_type"), ("col2", "any_type")], // types are ignored
|
||||
Some(&[("result", "col1 + col2")]),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Should only fail due to data source error in test environment
|
||||
assert!(!result.success);
|
||||
assert!(result
|
||||
.errors
|
||||
.iter()
|
||||
.all(|e| e.error_type == ValidationErrorType::DataSourceError));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_validate_model_with_relationships() {
|
||||
let data_source = create_test_data_source();
|
||||
let result = validate_model(
|
||||
"test_model",
|
||||
"test_db",
|
||||
"test_schema",
|
||||
&data_source,
|
||||
&[("col1", "any_type")], // type is ignored
|
||||
None,
|
||||
Some(&[("model1", "model2", "many_to_one")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(!result.success);
|
||||
assert!(result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.error_type == ValidationErrorType::InvalidRelationship));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_validate_model_multiple_errors() {
|
||||
let data_source = create_test_data_source();
|
||||
let result = validate_model(
|
||||
"test_model",
|
||||
"test_db",
|
||||
"test_schema",
|
||||
&data_source,
|
||||
&[("col1", "any_type"), ("col2", "any_type")], // types are ignored
|
||||
Some(&[("col1", "invalid_col + 1")]),
|
||||
Some(&[("model1", "model2", "many_to_one")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(!result.success);
|
||||
assert!(result.errors.len() > 1);
|
||||
assert!(result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.error_type == ValidationErrorType::ExpressionError));
|
||||
assert!(result
|
||||
.errors
|
||||
.iter()
|
||||
.any(|e| e.error_type == ValidationErrorType::InvalidRelationship));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue