mirror of https://github.com/buster-so/buster.git
feat: enhance dataset validation and deployment error handling
- Add detailed validation error logging in CLI
- Improve type compatibility checks in dataset validation
- Modify deployment process to handle and report validation errors more comprehensively
- Add Hash derive for Verification enum
- Update API and CLI to support more informative validation results
This commit is contained in:
parent 3c82ac0774
commit f081f3e16e
@@ -441,6 +441,7 @@ impl FromSql<sql_types::VerificationEnum, Pg> for Verification {
     Copy,
     PartialEq,
     Eq,
+    Hash,
     diesel::AsExpression,
     diesel::FromSqlRow,
 )]
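The new Hash derive makes Verification usable as a key in hashed collections. A minimal standalone sketch of what the derive enables, with invented variant names (the real enum lives in the database models and is not fully shown in this diff):

use std::collections::HashMap;

// Invented variants for illustration; only the derive list mirrors the diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Verification {
    Verified,
    NotRequested,
}

fn main() {
    // Eq + Hash together are what HashMap keys require.
    let mut counts: HashMap<Verification, u32> = HashMap::new();
    *counts.entry(Verification::Verified).or_insert(0) += 1;
    *counts.entry(Verification::NotRequested).or_insert(0) += 1;
    println!("{counts:?}");
}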
@@ -219,23 +219,31 @@ pub async fn deploy_datasets(
             }
         };
 
-        if !validation.success {
-            results.push(validation);
-            continue;
-        }
+        results.push(validation.clone()); // Clone validation result before checking success
 
-        // Deploy model
-        match deploy_single_model(&req, &organization_id, &user.id).await {
-            Ok(_) => results.push(validation),
-            Err(e) => {
-                let mut validation = validation;
-                validation.success = false;
-                validation.add_error(ValidationError::data_source_error(e.to_string()));
-                results.push(validation);
+        // Only deploy if validation passed
+        if validation.success {
+            // Deploy model
+            match deploy_single_model(&req, &organization_id, &user.id).await {
+                Ok(_) => (),
+                Err(e) => {
+                    let mut failed_validation = validation;
+                    failed_validation.success = false;
+                    failed_validation.add_error(ValidationError::data_source_error(e.to_string()));
+                    results.pop(); // Remove the successful validation
+                    results.push(failed_validation); // Add the failed one
+                }
             }
         }
     }
 
+    // Check if any validations failed
+    let has_failures = results.iter().any(|r| !r.success);
+    if has_failures {
+        tracing::warn!("Some models failed validation");
+        return Ok(ApiResponse::JsonData(DeployDatasetsResponse { results }));
+    }
+
     Ok(ApiResponse::JsonData(DeployDatasetsResponse { results }))
 }
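The handler above calls ValidationResult::add_error and ValidationError::data_source_error, which live in the API's validation::types module and are not part of this diff. A minimal sketch of the shape those helpers plausibly have, inferred from the call sites and from the CLI-side structs added later in this commit (everything here is an assumption, not the shipped code):

// Sketch only: inferred from the call sites in deploy_datasets above.
use serde::Serialize;

#[derive(Debug, Clone, Serialize, PartialEq)]
pub enum ValidationErrorType {
    TableNotFound,
    ColumnNotFound,
    TypeMismatch,
    DataSourceError,
}

#[derive(Debug, Clone, Serialize)]
pub struct ValidationError {
    pub error_type: ValidationErrorType,
    pub column_name: Option<String>,
    pub message: String,
    pub suggestion: Option<String>,
}

impl ValidationError {
    // Constructor used when deploy_single_model fails.
    pub fn data_source_error(message: String) -> Self {
        ValidationError {
            error_type: ValidationErrorType::DataSourceError,
            column_name: None,
            message,
            suggestion: None,
        }
    }
}

#[derive(Debug, Clone, Serialize)]
pub struct ValidationResult {
    pub success: bool,
    pub model_name: String,
    pub data_source_name: String,
    pub schema: String,
    pub errors: Vec<ValidationError>,
}

impl ValidationResult {
    // Appends an error; the handler above flips `success` explicitly.
    pub fn add_error(&mut self, error: ValidationError) {
        self.errors.push(error);
    }
}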
@@ -8,7 +8,10 @@ use crate::{
             credentials::get_data_source_credentials,
             import_dataset_columns::retrieve_dataset_columns,
         },
-        validation::types::{ValidationError, ValidationResult},
+        validation::{
+            types::{ValidationError, ValidationResult},
+            type_mapping::{normalize_type, types_compatible},
+        },
     },
 };
 
@@ -72,11 +75,13 @@ pub async fn validate_model(
     // Validate each column
     for (col_name, col_type) in columns {
         if let Some(ds_col) = ds_columns.iter().find(|c| c.name == *col_name) {
-            if !types_compatible(&ds_col.type_, col_type) {
+            if !types_compatible(data_source.type_, &ds_col.type_, col_type) {
+                let ds_type = normalize_type(data_source.type_, &ds_col.type_);
+                let model_type = normalize_type(data_source.type_, col_type);
                 result.add_error(ValidationError::type_mismatch(
                     col_name,
-                    col_type,
-                    &ds_col.type_,
+                    &model_type.to_string(),
+                    &ds_type.to_string(),
                 ));
             }
         } else {

@@ -87,12 +92,6 @@ pub async fn validate_model(
     Ok(result)
 }
 
-// Basic type compatibility check - will be enhanced in Phase 2
-fn types_compatible(ds_type: &str, model_type: &str) -> bool {
-    // For now, just check exact match
-    ds_type.to_lowercase() == model_type.to_lowercase()
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
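The new type_mapping module itself is not shown in this commit; only its exports and call sites are. From the signatures used above, types_compatible(data_source.type_, &ds_col.type_, col_type) and normalize_type(data_source.type_, raw), a rough sketch of how it could be implemented, with an invented DataSourceType stand-in and illustrative alias mappings:

// Sketch of validation/type_mapping.rs, inferred from its call sites.
// `DataSourceType` stands in for the project's real data-source enum.
#[derive(Clone, Copy)]
pub enum DataSourceType {
    Postgres,
    Snowflake,
}

pub fn normalize_type(data_source_type: DataSourceType, raw: &str) -> String {
    // Collapse vendor-specific aliases onto one canonical name; the
    // mappings here are illustrative, not the shipped table.
    let lowered = raw.to_lowercase();
    let canonical = match (data_source_type, lowered.as_str()) {
        (DataSourceType::Postgres, "int4" | "integer") => "int",
        (DataSourceType::Postgres, "character varying" | "varchar") => "text",
        (DataSourceType::Snowflake, "number") => "numeric",
        _ => lowered.as_str(),
    };
    canonical.to_string()
}

pub fn types_compatible(data_source_type: DataSourceType, ds_type: &str, model_type: &str) -> bool {
    // Compatible when both sides normalize to the same canonical name.
    normalize_type(data_source_type, ds_type) == normalize_type(data_source_type, model_type)
}

Comparing normalized names rather than raw strings is what would let, say, a warehouse-reported int4 validate against a model-declared integer, which the old exact-match check rejected.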
@@ -1,5 +1,7 @@
 pub mod dataset_validation;
 pub mod types;
+pub mod type_mapping;
 
 pub use dataset_validation::*;
 pub use types::*;
+pub use type_mapping::*;
@@ -4,7 +4,7 @@ use std::path::{Path, PathBuf};
 
 use crate::utils::{
     BusterClient, DeployDatasetsRequest, DeployDatasetsColumnsRequest, DeployDatasetsEntityRelationshipsRequest,
-    buster_credentials::get_and_validate_buster_credentials,
+    buster_credentials::get_and_validate_buster_credentials, ValidationResult, ValidationError, ValidationErrorType,
 };
 
 #[derive(Debug, Deserialize, Serialize, Clone)]
@@ -123,6 +123,81 @@ impl DeployProgress {
             None => println!("⚠️ No SQL file found for '{}', using default SELECT", model_name),
         }
     }
+
+    pub fn log_validation_error(&self, validation: &ValidationResult) {
+        if !validation.success {
+            println!("\n❌ Validation failed for {}", validation.model_name);
+            println!("   Data Source: {}", validation.data_source_name);
+            println!("   Schema: {}", validation.schema);
+
+            // Group errors by type
+            let mut table_errors = Vec::new();
+            let mut column_errors = Vec::new();
+            let mut type_errors = Vec::new();
+            let mut other_errors = Vec::new();
+
+            for error in &validation.errors {
+                match error.error_type {
+                    ValidationErrorType::TableNotFound => table_errors.push(error),
+                    ValidationErrorType::ColumnNotFound => column_errors.push(error),
+                    ValidationErrorType::TypeMismatch => type_errors.push(error),
+                    ValidationErrorType::DataSourceError => other_errors.push(error),
+                }
+            }
+
+            // Print grouped errors
+            if !table_errors.is_empty() {
+                println!("\n   Table/View Errors:");
+                for error in table_errors {
+                    println!("   - {}", error.message);
+                }
+            }
+
+            if !column_errors.is_empty() {
+                println!("\n   Column Errors:");
+                for error in column_errors {
+                    if let Some(col) = &error.column_name {
+                        println!("   - Column '{}': {}", col, error.message);
+                    }
+                }
+            }
+
+            if !type_errors.is_empty() {
+                println!("\n   Type Mismatch Errors:");
+                for error in type_errors {
+                    if let Some(col) = &error.column_name {
+                        println!("   - Column '{}': {}", col, error.message);
+                    }
+                }
+            }
+
+            if !other_errors.is_empty() {
+                println!("\n   Other Errors:");
+                for error in other_errors {
+                    println!("   - {}", error.message);
+                }
+            }
+
+            // Print suggestions if any
+            let suggestions: Vec<_> = validation.errors
+                .iter()
+                .filter_map(|e| e.suggestion.as_ref())
+                .collect();
+
+            if !suggestions.is_empty() {
+                println!("\n💡 Suggestions:");
+                for suggestion in suggestions {
+                    println!("   - {}", suggestion);
+                }
+            }
+        }
+    }
+
+    pub fn log_validation_success(&self, validation: &ValidationResult) {
+        println!("\n✅ Validation passed for {}", validation.model_name);
+        println!("   Data Source: {}", validation.data_source_name);
+        println!("   Schema: {}", validation.schema);
+    }
 }
 
 impl ModelFile {
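For a model failing with a single type mismatch that carries a suggestion, log_validation_error above would print roughly the following. Model, column, and message text are invented; exact spacing depends on the literal strings, whose leading whitespace this rendering does not preserve:

❌ Validation failed for orders
   Data Source: postgres_dev
   Schema: public

   Type Mismatch Errors:
   - Column 'amount': expected numeric, found text

💡 Suggestions:
   - update the model column type to numeric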
@@ -513,21 +588,54 @@ pub async fn deploy_v2(path: Option<&str>) -> Result<()> {
     }
 
     let data_source_name = deploy_requests[0].data_source_name.clone();
-    if let Err(e) = client.deploy_datasets(deploy_requests).await {
-        println!("\n❌ Deployment failed!");
-        println!("Error: {}", e);
-        println!("\n💡 Troubleshooting:");
-        println!("1. Check data source:");
-        println!("   - Verify '{}' exists in Buster", data_source_name);
-        println!("   - Confirm it has env='dev'");
-        println!("   - Check your access permissions");
-        println!("2. Check model definitions:");
-        println!("   - Validate SQL syntax");
-        println!("   - Verify column names match");
-        println!("3. Check relationships:");
-        println!("   - Ensure referenced models exist");
-        println!("   - Verify relationship types");
-        return Err(anyhow::anyhow!("Failed to deploy models to Buster: {}", e));
+    match client.deploy_datasets(deploy_requests).await {
+        Ok(response) => {
+            let mut has_validation_errors = false;
+
+            // Process validation results
+            for validation in &response.results {
+                if validation.success {
+                    progress.log_validation_success(validation);
+                } else {
+                    has_validation_errors = true;
+                    progress.log_validation_error(validation);
+                }
+            }
+
+            if has_validation_errors {
+                println!("\n❌ Deployment failed due to validation errors!");
+                println!("\n💡 Troubleshooting:");
+                println!("1. Check data source:");
+                println!("   - Verify '{}' exists in Buster", data_source_name);
+                println!("   - Confirm it has env='dev'");
+                println!("   - Check your access permissions");
+                println!("2. Check model definitions:");
+                println!("   - Validate SQL syntax");
+                println!("   - Verify column names match");
+                println!("3. Check relationships:");
+                println!("   - Ensure referenced models exist");
+                println!("   - Verify relationship types");
+                return Err(anyhow::anyhow!("Deployment failed due to validation errors"));
+            }
+
+            println!("\n✅ All models deployed successfully!");
+        }
+        Err(e) => {
+            println!("\n❌ Deployment failed!");
+            println!("Error: {}", e);
+            println!("\n💡 Troubleshooting:");
+            println!("1. Check data source:");
+            println!("   - Verify '{}' exists in Buster", data_source_name);
+            println!("   - Confirm it has env='dev'");
+            println!("   - Check your access permissions");
+            println!("2. Check model definitions:");
+            println!("   - Validate SQL syntax");
+            println!("   - Verify column names match");
+            println!("3. Check relationships:");
+            println!("   - Ensure referenced models exist");
+            println!("   - Verify relationship types");
+            return Err(anyhow::anyhow!("Failed to deploy models to Buster: {}", e));
+        }
     }
 }
@@ -6,6 +6,7 @@ use reqwest::{
 
 use super::{
     PostDataSourcesRequest, DeployDatasetsRequest, ValidateApiKeyRequest, ValidateApiKeyResponse,
+    DeployDatasetsResponse,
 };
 
 pub struct BusterClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn deploy_datasets(&self, req_body: Vec<DeployDatasetsRequest>) -> Result<()> {
|
pub async fn deploy_datasets(&self, req_body: Vec<DeployDatasetsRequest>) -> Result<DeployDatasetsResponse> {
|
||||||
let headers = self.build_headers()?;
|
let headers = self.build_headers()?;
|
||||||
|
|
||||||
match self
|
match self
|
||||||
|
@ -103,7 +104,7 @@ impl BusterClient {
|
||||||
res.text().await?
|
res.text().await?
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(res.json().await?)
|
||||||
}
|
}
|
||||||
Err(e) => Err(anyhow::anyhow!("POST /api/v1/datasets/deploy failed: {}", e)),
|
Err(e) => Err(anyhow::anyhow!("POST /api/v1/datasets/deploy failed: {}", e)),
|
||||||
}
|
}
|
||||||
|
|
|
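Because deploy_datasets now returns the parsed DeployDatasetsResponse instead of (), callers can act on per-model validation results; deploy_v2 above is the real consumer. A hedged sketch of the calling pattern, assuming a BusterClient and request list constructed elsewhere:

use anyhow::Result;

// Sketch only: `BusterClient` and `DeployDatasetsRequest` come from this module.
async fn report_failures(client: &BusterClient, requests: Vec<DeployDatasetsRequest>) -> Result<()> {
    let response = client.deploy_datasets(requests).await?;
    for validation in &response.results {
        if !validation.success {
            eprintln!("validation failed for model '{}'", validation.model_name);
        }
    }
    Ok(())
}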
@@ -56,3 +56,33 @@ pub struct DeployDatasetsEntityRelationshipsRequest {
     #[serde(rename = "type")]
     pub type_: String,
 }
+
+#[derive(Debug, Deserialize)]
+pub struct ValidationResult {
+    pub success: bool,
+    pub model_name: String,
+    pub data_source_name: String,
+    pub schema: String,
+    pub errors: Vec<ValidationError>,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct ValidationError {
+    pub error_type: ValidationErrorType,
+    pub column_name: Option<String>,
+    pub message: String,
+    pub suggestion: Option<String>,
+}
+
+#[derive(Debug, Deserialize, PartialEq)]
+pub enum ValidationErrorType {
+    TableNotFound,
+    ColumnNotFound,
+    TypeMismatch,
+    DataSourceError,
+}
+
+#[derive(Debug, Deserialize)]
+pub struct DeployDatasetsResponse {
+    pub results: Vec<ValidationResult>,
+}
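These CLI-side types only derive Deserialize, so serde's defaults dictate the accepted wire format: fields matched by name, unit enum variants as bare strings. A hedged round-trip check with an invented payload (assumes the structs above are in scope and serde_json is available):

fn main() -> Result<(), serde_json::Error> {
    // Invented example body; mirrors what the deploy endpoint would return.
    let body = r#"{
        "results": [{
            "success": false,
            "model_name": "orders",
            "data_source_name": "postgres_dev",
            "schema": "public",
            "errors": [{
                "error_type": "TypeMismatch",
                "column_name": "amount",
                "message": "type mismatch between model and warehouse",
                "suggestion": "update the model column type"
            }]
        }]
    }"#;

    let response: DeployDatasetsResponse = serde_json::from_str(body)?;
    assert_eq!(response.results[0].errors[0].error_type, ValidationErrorType::TypeMismatch);
    Ok(())
}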
@@ -0,0 +1,30 @@
+16:43:12.295485 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'start', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x118927770>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x118946f00>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x118947b90>]}
+16:43:12.299354 [debug] [MainThread]: An error was encountered while trying to send an event
+
+
+============================== 16:43:12.299620 | 70b3c2c6-8fd9-4fa3-81da-c626eae8c910 ==============================
+16:43:12.299620 [info ] [MainThread]: Running with dbt=1.9.1
+16:43:12.299904 [debug] [MainThread]: running dbt with arguments {'printer_width': '80', 'indirect_selection': 'eager', 'write_json': 'True', 'log_cache_events': 'False', 'partial_parse': 'True', 'cache_selected_only': 'False', 'profiles_dir': '/Users/dallin/.dbt', 'debug': 'False', 'warn_error': 'None', 'log_path': 'logs', 'version_check': 'True', 'fail_fast': 'False', 'use_colors': 'True', 'use_experimental_parser': 'False', 'empty': 'None', 'quiet': 'False', 'no_print': 'None', 'log_format': 'default', 'invocation_command': 'dbt ', 'introspect': 'True', 'static_parser': 'True', 'target_path': 'None', 'warn_error_options': 'WarnErrorOptions(include=[], exclude=[])', 'send_anonymous_usage_stats': 'True'}
+16:43:12.300518 [error] [MainThread]: Encountered an error:
+Runtime Error
+  dbt_project.yml does not parse to a dictionary
+16:43:12.307797 [debug] [MainThread]: Resource report: {"command_name": "deps", "command_success": false, "command_wall_clock_time": 0.059449833, "process_in_blocks": "0", "process_kernel_time": 0.115982, "process_mem_max_rss": "109068288", "process_out_blocks": "0", "process_user_time": 0.735308}
+16:43:12.308245 [debug] [MainThread]: Command `cli deps` failed at 16:43:12.308182 after 0.06 seconds
+16:43:12.308493 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'end', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x100796ff0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x1187fb320>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x1189471d0>]}
+16:43:12.308659 [debug] [MainThread]: An error was encountered while trying to send an event
+16:43:12.308794 [debug] [MainThread]: Flushing usage events
+16:51:16.058543 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'start', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x112e66a80>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x1131278c0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x1131471d0>]}
+16:51:16.062068 [debug] [MainThread]: An error was encountered while trying to send an event
+
+
+============================== 16:51:16.062308 | 8665ca62-25ab-439c-b703-12898d07be6d ==============================
+16:51:16.062308 [info ] [MainThread]: Running with dbt=1.9.1
+16:51:16.062585 [debug] [MainThread]: running dbt with arguments {'printer_width': '80', 'indirect_selection': 'eager', 'log_cache_events': 'False', 'write_json': 'True', 'partial_parse': 'True', 'cache_selected_only': 'False', 'profiles_dir': '/Users/dallin/.dbt', 'fail_fast': 'False', 'version_check': 'True', 'log_path': 'logs', 'debug': 'False', 'warn_error': 'None', 'use_colors': 'True', 'use_experimental_parser': 'False', 'no_print': 'None', 'quiet': 'False', 'empty': 'None', 'log_format': 'default', 'introspect': 'True', 'warn_error_options': 'WarnErrorOptions(include=[], exclude=[])', 'static_parser': 'True', 'target_path': 'None', 'invocation_command': 'dbt ', 'send_anonymous_usage_stats': 'True'}
+16:51:16.063228 [error] [MainThread]: Encountered an error:
+Runtime Error
+  dbt_project.yml does not parse to a dictionary
+16:51:16.064688 [debug] [MainThread]: Resource report: {"command_name": "deps", "command_success": false, "command_wall_clock_time": 0.050356373, "process_in_blocks": "0", "process_kernel_time": 0.096193, "process_mem_max_rss": "108462080", "process_out_blocks": "0", "process_user_time": 0.712887}
+16:51:16.065111 [debug] [MainThread]: Command `cli deps` failed at 16:51:16.065048 after 0.05 seconds
+16:51:16.065346 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'end', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x112241640>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x112e07f20>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x11315bcb0>]}
+16:51:16.065563 [debug] [MainThread]: An error was encountered while trying to send an event
+16:51:16.065718 [debug] [MainThread]: Flushing usage events