diff --git a/.github/workflows/cli-release.yml b/.github/workflows/cli-release.yml index 1b15d3982..27083fd6b 100644 --- a/.github/workflows/cli-release.yml +++ b/.github/workflows/cli-release.yml @@ -107,7 +107,7 @@ jobs: - name: Get version id: get_version run: | - VERSION=0.0.3 + VERSION=0.0.4 echo "version=$VERSION" >> $GITHUB_OUTPUT - name: Create Release diff --git a/api/src/routes/rest/routes/datasets/deploy_datasets.rs b/api/src/routes/rest/routes/datasets/deploy_datasets.rs index 7902a7d96..217d1dbe8 100644 --- a/api/src/routes/rest/routes/datasets/deploy_datasets.rs +++ b/api/src/routes/rest/routes/datasets/deploy_datasets.rs @@ -618,282 +618,4 @@ async fn deploy_datasets_handler( } Ok(results) -} - -async fn batch_validate_datasets( - user_id: &Uuid, - requests: Vec, -) -> Result { - let mut successes = Vec::new(); - let mut failures = Vec::new(); - let organization_id = get_user_organization_id(user_id).await?; - - // Group requests by data source and database for efficient validation - let mut data_source_groups: HashMap< - (String, Option), - Vec<(&DatasetValidationRequest, Vec<(&str, &str)>)>, - > = HashMap::new(); - - for request in &requests { - let columns: Vec<(&str, &str)> = request - .columns - .iter() - .map(|c| (c.name.as_str(), c.type_.as_deref().unwrap_or("text"))) - .collect(); - - data_source_groups - .entry((request.data_source_name.clone(), None)) // Using None for database since it's not in the validation request - .or_default() - .push((request, columns)); - } - - // Process each data source group - for ((data_source_name, database), group) in data_source_groups { - let mut conn = get_pg_pool().get().await?; - - // Get data source - let data_source = match data_sources::table - .filter(data_sources::name.eq(&data_source_name)) - .filter(data_sources::organization_id.eq(organization_id)) - .select(data_sources::all_columns) - .first::(&mut conn) - .await - { - Ok(ds) => ds, - Err(e) => { - for (request, _) in group { - failures.push(DatasetValidationFailure { - dataset_id: request.dataset_id, - name: request.name.clone(), - schema: request.schema.clone(), - data_source_name: request.data_source_name.clone(), - errors: vec![ValidationError::data_source_error(format!( - "Data source not found: {}", - e - ))], - }); - } - continue; - } - }; - - // Prepare tables for batch validation - let tables_to_validate: Vec<(String, String)> = group - .iter() - .map(|(req, _)| (req.name.clone(), req.schema.clone())) - .collect(); - - // Get credentials - let credentials = - match get_data_source_credentials(&data_source.secret_id, &data_source.type_, false) - .await - { - Ok(creds) => creds, - Err(e) => { - for (request, _) in group { - failures.push(DatasetValidationFailure { - dataset_id: request.dataset_id, - name: request.name.clone(), - schema: request.schema.clone(), - data_source_name: request.data_source_name.clone(), - errors: vec![ValidationError::data_source_error(format!( - "Failed to get data source credentials: {}", - e - ))], - }); - } - continue; - } - }; - - // Get all columns in one batch - let ds_columns = - match retrieve_dataset_columns_batch(&tables_to_validate, &credentials, database).await { - Ok(cols) => cols, - Err(e) => { - for (request, _) in group { - failures.push(DatasetValidationFailure { - dataset_id: request.dataset_id, - name: request.name.clone(), - schema: request.schema.clone(), - data_source_name: request.data_source_name.clone(), - errors: vec![ValidationError::data_source_error(format!( - "Failed to get columns from data source: {}", - e - ))], - }); - } - continue; - } - }; - - // Validate each dataset in the group - for (request, columns) in group { - let mut validation_errors = Vec::new(); - - // Filter columns for this dataset - let dataset_columns: Vec<_> = ds_columns - .iter() - .filter(|col| col.dataset_name == request.name && col.schema_name == request.schema) - .collect(); - - if dataset_columns.is_empty() { - validation_errors.push(ValidationError::table_not_found(&request.name)); - } else { - // Validate each column exists - for (col_name, _) in &columns { - if !dataset_columns.iter().any(|c| c.name == *col_name) { - validation_errors.push(ValidationError::column_not_found(col_name)); - } - } - } - - if validation_errors.is_empty() { - // Create or update dataset - match create_or_update_dataset(request, &organization_id, user_id).await { - Ok(dataset_id) => { - successes.push(DatasetValidationSuccess { - dataset_id, - name: request.name.clone(), - schema: request.schema.clone(), - data_source_name: request.data_source_name.clone(), - }); - } - Err(e) => { - failures.push(DatasetValidationFailure { - dataset_id: request.dataset_id, - name: request.name.clone(), - schema: request.schema.clone(), - data_source_name: request.data_source_name.clone(), - errors: vec![ValidationError::data_source_error(format!( - "Failed to create/update dataset: {}", - e - ))], - }); - } - } - } else { - failures.push(DatasetValidationFailure { - dataset_id: request.dataset_id, - name: request.name.clone(), - schema: request.schema.clone(), - data_source_name: request.data_source_name.clone(), - errors: validation_errors, - }); - } - } - } - - Ok(BatchValidationResult { - successes, - failures, - }) -} - -async fn create_or_update_dataset( - request: &DatasetValidationRequest, - organization_id: &Uuid, - user_id: &Uuid, -) -> Result { - let mut conn = get_pg_pool().get().await?; - let now = Utc::now(); - - let dataset_id = match request.dataset_id { - Some(id) => { - // Update existing dataset - diesel::update(datasets::table) - .filter(datasets::id.eq(id)) - .set(( - datasets::name.eq(&request.name), - datasets::updated_at.eq(now), - datasets::updated_by.eq(user_id), - )) - .execute(&mut conn) - .await?; - id - } - None => { - // Create new dataset - let dataset = Dataset { - id: Uuid::new_v4(), - name: request.name.clone(), - data_source_id: Uuid::new_v4(), // This needs to be set correctly - created_at: now, - updated_at: now, - database_name: request.name.clone(), - when_to_use: None, - when_not_to_use: None, - type_: DatasetType::View, - definition: String::new(), - schema: request.schema.clone(), - enabled: false, - created_by: user_id.clone(), - updated_by: user_id.clone(), - deleted_at: None, - imported: false, - organization_id: organization_id.clone(), - yml_file: None, - model: None, - database_identifier: None, - }; - - diesel::insert_into(datasets::table) - .values(&dataset) - .execute(&mut conn) - .await?; - - dataset.id - } - }; - - // Create new columns - let new_columns: Vec = request - .columns - .iter() - .map(|col| DatasetColumn { - id: Uuid::new_v4(), - dataset_id, - name: col.name.clone(), - type_: col.type_.clone().unwrap_or_else(|| "text".to_string()), - description: Some(col.description.clone()), - nullable: true, // This should be determined from the source - created_at: now, - updated_at: now, - deleted_at: None, - stored_values: None, - stored_values_status: None, - stored_values_error: None, - stored_values_count: None, - stored_values_last_synced: None, - semantic_type: col.semantic_type.clone(), - dim_type: None, - expr: col.expr.clone(), - }) - .collect(); - - // Get current column names for this dataset - let current_column_names: Vec = dataset_columns::table - .filter(dataset_columns::dataset_id.eq(dataset_id)) - .filter(dataset_columns::deleted_at.is_null()) - .select(dataset_columns::name) - .load::(&mut conn) - .await?; - - // Soft delete columns that are no longer present - let new_column_names: Vec = new_columns.iter().map(|c| c.name.clone()).collect(); - diesel::update(dataset_columns::table) - .filter(dataset_columns::dataset_id.eq(dataset_id)) - .filter(dataset_columns::deleted_at.is_null()) - .filter(dataset_columns::name.ne_all(&new_column_names)) - .set(dataset_columns::deleted_at.eq(now)) - .execute(&mut conn) - .await?; - - // Insert new columns - diesel::insert_into(dataset_columns::table) - .values(&new_columns) - .execute(&mut conn) - .await?; - - Ok(dataset_id) -} +} \ No newline at end of file diff --git a/api/src/routes/rest/routes/datasets/generate_datasets.rs b/api/src/routes/rest/routes/datasets/generate_datasets.rs index a0507b521..efe05ed9d 100644 --- a/api/src/routes/rest/routes/datasets/generate_datasets.rs +++ b/api/src/routes/rest/routes/datasets/generate_datasets.rs @@ -94,25 +94,25 @@ fn map_snowflake_type(type_str: &str) -> ColumnMappingType { match type_upper.as_str() { // Numeric types that should be measures "NUMBER" | "DECIMAL" | "NUMERIC" | "FLOAT" | "REAL" | "DOUBLE" | "INT" | "INTEGER" | - "BIGINT" | "SMALLINT" | "TINYINT" | "BYTEINT" => ColumnMappingType::Measure("number".to_string()), + "BIGINT" | "SMALLINT" | "TINYINT" | "BYTEINT" => ColumnMappingType::Measure(type_str.to_string()), // Date/Time types "DATE" | "DATETIME" | "TIME" | "TIMESTAMP" | "TIMESTAMP_LTZ" | - "TIMESTAMP_NTZ" | "TIMESTAMP_TZ" => ColumnMappingType::Dimension("timestamp".to_string()), + "TIMESTAMP_NTZ" | "TIMESTAMP_TZ" => ColumnMappingType::Dimension(type_str.to_string()), // String types - "TEXT" | "STRING" | "VARCHAR" | "CHAR" | "CHARACTER" => ColumnMappingType::Dimension("string".to_string()), + "TEXT" | "STRING" | "VARCHAR" | "CHAR" | "CHARACTER" => ColumnMappingType::Dimension(type_str.to_string()), // Boolean type - "BOOLEAN" | "BOOL" => ColumnMappingType::Dimension("boolean".to_string()), + "BOOLEAN" | "BOOL" => ColumnMappingType::Dimension(type_str.to_string()), // Unsupported types "ARRAY" | "OBJECT" | "VARIANT" => ColumnMappingType::Unsupported, // Default to dimension for unknown types _ => { - tracing::warn!("Unknown Snowflake type: {}, defaulting to string dimension", type_str); - ColumnMappingType::Dimension("string".to_string()) + tracing::warn!("Unknown Snowflake type: {}, defaulting to dimension", type_str); + ColumnMappingType::Dimension(type_str.to_string()) } } } diff --git a/cli/Cargo.toml b/cli/Cargo.toml index f0f28d7ed..4f603c593 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "buster-cli" -version = "0.0.3" +version = "0.0.4" edition = "2021" build = "build.rs" diff --git a/cli/src/commands/deploy_v2.rs b/cli/src/commands/deploy_v2.rs index 93dca4356..fc4af3c40 100644 --- a/cli/src/commands/deploy_v2.rs +++ b/cli/src/commands/deploy_v2.rs @@ -70,6 +70,8 @@ pub struct Measure { expr: String, agg: String, description: String, + #[serde(rename = "type")] + measure_type: Option, } #[derive(Debug)] @@ -477,9 +479,9 @@ impl ModelFile { description: measure.description.clone(), semantic_type: Some("measure".to_string()), expr: Some(measure.expr.clone()), - type_: None, + type_: measure.measure_type.clone(), agg: Some(measure.agg.clone()), - searchable: false, // Measures don't have stored values + searchable: false, }); }