mirror of https://github.com/buster-so/buster.git
bugfix - cli missing types and deploy
This commit is contained in:
parent
bcf242824c
commit
116677c0ab
|
@ -107,7 +107,7 @@ jobs:
|
|||
- name: Get version
|
||||
id: get_version
|
||||
run: |
|
||||
VERSION=0.0.3
|
||||
VERSION=0.0.4
|
||||
echo "version=$VERSION" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Create Release
|
||||
|
|
|
@ -618,282 +618,4 @@ async fn deploy_datasets_handler(
|
|||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
async fn batch_validate_datasets(
|
||||
user_id: &Uuid,
|
||||
requests: Vec<DatasetValidationRequest>,
|
||||
) -> Result<BatchValidationResult> {
|
||||
let mut successes = Vec::new();
|
||||
let mut failures = Vec::new();
|
||||
let organization_id = get_user_organization_id(user_id).await?;
|
||||
|
||||
// Group requests by data source and database for efficient validation
|
||||
let mut data_source_groups: HashMap<
|
||||
(String, Option<String>),
|
||||
Vec<(&DatasetValidationRequest, Vec<(&str, &str)>)>,
|
||||
> = HashMap::new();
|
||||
|
||||
for request in &requests {
|
||||
let columns: Vec<(&str, &str)> = request
|
||||
.columns
|
||||
.iter()
|
||||
.map(|c| (c.name.as_str(), c.type_.as_deref().unwrap_or("text")))
|
||||
.collect();
|
||||
|
||||
data_source_groups
|
||||
.entry((request.data_source_name.clone(), None)) // Using None for database since it's not in the validation request
|
||||
.or_default()
|
||||
.push((request, columns));
|
||||
}
|
||||
|
||||
// Process each data source group
|
||||
for ((data_source_name, database), group) in data_source_groups {
|
||||
let mut conn = get_pg_pool().get().await?;
|
||||
|
||||
// Get data source
|
||||
let data_source = match data_sources::table
|
||||
.filter(data_sources::name.eq(&data_source_name))
|
||||
.filter(data_sources::organization_id.eq(organization_id))
|
||||
.select(data_sources::all_columns)
|
||||
.first::<DataSource>(&mut conn)
|
||||
.await
|
||||
{
|
||||
Ok(ds) => ds,
|
||||
Err(e) => {
|
||||
for (request, _) in group {
|
||||
failures.push(DatasetValidationFailure {
|
||||
dataset_id: request.dataset_id,
|
||||
name: request.name.clone(),
|
||||
schema: request.schema.clone(),
|
||||
data_source_name: request.data_source_name.clone(),
|
||||
errors: vec![ValidationError::data_source_error(format!(
|
||||
"Data source not found: {}",
|
||||
e
|
||||
))],
|
||||
});
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Prepare tables for batch validation
|
||||
let tables_to_validate: Vec<(String, String)> = group
|
||||
.iter()
|
||||
.map(|(req, _)| (req.name.clone(), req.schema.clone()))
|
||||
.collect();
|
||||
|
||||
// Get credentials
|
||||
let credentials =
|
||||
match get_data_source_credentials(&data_source.secret_id, &data_source.type_, false)
|
||||
.await
|
||||
{
|
||||
Ok(creds) => creds,
|
||||
Err(e) => {
|
||||
for (request, _) in group {
|
||||
failures.push(DatasetValidationFailure {
|
||||
dataset_id: request.dataset_id,
|
||||
name: request.name.clone(),
|
||||
schema: request.schema.clone(),
|
||||
data_source_name: request.data_source_name.clone(),
|
||||
errors: vec![ValidationError::data_source_error(format!(
|
||||
"Failed to get data source credentials: {}",
|
||||
e
|
||||
))],
|
||||
});
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Get all columns in one batch
|
||||
let ds_columns =
|
||||
match retrieve_dataset_columns_batch(&tables_to_validate, &credentials, database).await {
|
||||
Ok(cols) => cols,
|
||||
Err(e) => {
|
||||
for (request, _) in group {
|
||||
failures.push(DatasetValidationFailure {
|
||||
dataset_id: request.dataset_id,
|
||||
name: request.name.clone(),
|
||||
schema: request.schema.clone(),
|
||||
data_source_name: request.data_source_name.clone(),
|
||||
errors: vec![ValidationError::data_source_error(format!(
|
||||
"Failed to get columns from data source: {}",
|
||||
e
|
||||
))],
|
||||
});
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Validate each dataset in the group
|
||||
for (request, columns) in group {
|
||||
let mut validation_errors = Vec::new();
|
||||
|
||||
// Filter columns for this dataset
|
||||
let dataset_columns: Vec<_> = ds_columns
|
||||
.iter()
|
||||
.filter(|col| col.dataset_name == request.name && col.schema_name == request.schema)
|
||||
.collect();
|
||||
|
||||
if dataset_columns.is_empty() {
|
||||
validation_errors.push(ValidationError::table_not_found(&request.name));
|
||||
} else {
|
||||
// Validate each column exists
|
||||
for (col_name, _) in &columns {
|
||||
if !dataset_columns.iter().any(|c| c.name == *col_name) {
|
||||
validation_errors.push(ValidationError::column_not_found(col_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if validation_errors.is_empty() {
|
||||
// Create or update dataset
|
||||
match create_or_update_dataset(request, &organization_id, user_id).await {
|
||||
Ok(dataset_id) => {
|
||||
successes.push(DatasetValidationSuccess {
|
||||
dataset_id,
|
||||
name: request.name.clone(),
|
||||
schema: request.schema.clone(),
|
||||
data_source_name: request.data_source_name.clone(),
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
failures.push(DatasetValidationFailure {
|
||||
dataset_id: request.dataset_id,
|
||||
name: request.name.clone(),
|
||||
schema: request.schema.clone(),
|
||||
data_source_name: request.data_source_name.clone(),
|
||||
errors: vec![ValidationError::data_source_error(format!(
|
||||
"Failed to create/update dataset: {}",
|
||||
e
|
||||
))],
|
||||
});
|
||||
}
|
||||
}
|
||||
} else {
|
||||
failures.push(DatasetValidationFailure {
|
||||
dataset_id: request.dataset_id,
|
||||
name: request.name.clone(),
|
||||
schema: request.schema.clone(),
|
||||
data_source_name: request.data_source_name.clone(),
|
||||
errors: validation_errors,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(BatchValidationResult {
|
||||
successes,
|
||||
failures,
|
||||
})
|
||||
}
|
||||
|
||||
async fn create_or_update_dataset(
|
||||
request: &DatasetValidationRequest,
|
||||
organization_id: &Uuid,
|
||||
user_id: &Uuid,
|
||||
) -> Result<Uuid> {
|
||||
let mut conn = get_pg_pool().get().await?;
|
||||
let now = Utc::now();
|
||||
|
||||
let dataset_id = match request.dataset_id {
|
||||
Some(id) => {
|
||||
// Update existing dataset
|
||||
diesel::update(datasets::table)
|
||||
.filter(datasets::id.eq(id))
|
||||
.set((
|
||||
datasets::name.eq(&request.name),
|
||||
datasets::updated_at.eq(now),
|
||||
datasets::updated_by.eq(user_id),
|
||||
))
|
||||
.execute(&mut conn)
|
||||
.await?;
|
||||
id
|
||||
}
|
||||
None => {
|
||||
// Create new dataset
|
||||
let dataset = Dataset {
|
||||
id: Uuid::new_v4(),
|
||||
name: request.name.clone(),
|
||||
data_source_id: Uuid::new_v4(), // This needs to be set correctly
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
database_name: request.name.clone(),
|
||||
when_to_use: None,
|
||||
when_not_to_use: None,
|
||||
type_: DatasetType::View,
|
||||
definition: String::new(),
|
||||
schema: request.schema.clone(),
|
||||
enabled: false,
|
||||
created_by: user_id.clone(),
|
||||
updated_by: user_id.clone(),
|
||||
deleted_at: None,
|
||||
imported: false,
|
||||
organization_id: organization_id.clone(),
|
||||
yml_file: None,
|
||||
model: None,
|
||||
database_identifier: None,
|
||||
};
|
||||
|
||||
diesel::insert_into(datasets::table)
|
||||
.values(&dataset)
|
||||
.execute(&mut conn)
|
||||
.await?;
|
||||
|
||||
dataset.id
|
||||
}
|
||||
};
|
||||
|
||||
// Create new columns
|
||||
let new_columns: Vec<DatasetColumn> = request
|
||||
.columns
|
||||
.iter()
|
||||
.map(|col| DatasetColumn {
|
||||
id: Uuid::new_v4(),
|
||||
dataset_id,
|
||||
name: col.name.clone(),
|
||||
type_: col.type_.clone().unwrap_or_else(|| "text".to_string()),
|
||||
description: Some(col.description.clone()),
|
||||
nullable: true, // This should be determined from the source
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
deleted_at: None,
|
||||
stored_values: None,
|
||||
stored_values_status: None,
|
||||
stored_values_error: None,
|
||||
stored_values_count: None,
|
||||
stored_values_last_synced: None,
|
||||
semantic_type: col.semantic_type.clone(),
|
||||
dim_type: None,
|
||||
expr: col.expr.clone(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Get current column names for this dataset
|
||||
let current_column_names: Vec<String> = dataset_columns::table
|
||||
.filter(dataset_columns::dataset_id.eq(dataset_id))
|
||||
.filter(dataset_columns::deleted_at.is_null())
|
||||
.select(dataset_columns::name)
|
||||
.load::<String>(&mut conn)
|
||||
.await?;
|
||||
|
||||
// Soft delete columns that are no longer present
|
||||
let new_column_names: Vec<String> = new_columns.iter().map(|c| c.name.clone()).collect();
|
||||
diesel::update(dataset_columns::table)
|
||||
.filter(dataset_columns::dataset_id.eq(dataset_id))
|
||||
.filter(dataset_columns::deleted_at.is_null())
|
||||
.filter(dataset_columns::name.ne_all(&new_column_names))
|
||||
.set(dataset_columns::deleted_at.eq(now))
|
||||
.execute(&mut conn)
|
||||
.await?;
|
||||
|
||||
// Insert new columns
|
||||
diesel::insert_into(dataset_columns::table)
|
||||
.values(&new_columns)
|
||||
.execute(&mut conn)
|
||||
.await?;
|
||||
|
||||
Ok(dataset_id)
|
||||
}
|
||||
}
|
|
@ -94,25 +94,25 @@ fn map_snowflake_type(type_str: &str) -> ColumnMappingType {
|
|||
match type_upper.as_str() {
|
||||
// Numeric types that should be measures
|
||||
"NUMBER" | "DECIMAL" | "NUMERIC" | "FLOAT" | "REAL" | "DOUBLE" | "INT" | "INTEGER" |
|
||||
"BIGINT" | "SMALLINT" | "TINYINT" | "BYTEINT" => ColumnMappingType::Measure("number".to_string()),
|
||||
"BIGINT" | "SMALLINT" | "TINYINT" | "BYTEINT" => ColumnMappingType::Measure(type_str.to_string()),
|
||||
|
||||
// Date/Time types
|
||||
"DATE" | "DATETIME" | "TIME" | "TIMESTAMP" | "TIMESTAMP_LTZ" |
|
||||
"TIMESTAMP_NTZ" | "TIMESTAMP_TZ" => ColumnMappingType::Dimension("timestamp".to_string()),
|
||||
"TIMESTAMP_NTZ" | "TIMESTAMP_TZ" => ColumnMappingType::Dimension(type_str.to_string()),
|
||||
|
||||
// String types
|
||||
"TEXT" | "STRING" | "VARCHAR" | "CHAR" | "CHARACTER" => ColumnMappingType::Dimension("string".to_string()),
|
||||
"TEXT" | "STRING" | "VARCHAR" | "CHAR" | "CHARACTER" => ColumnMappingType::Dimension(type_str.to_string()),
|
||||
|
||||
// Boolean type
|
||||
"BOOLEAN" | "BOOL" => ColumnMappingType::Dimension("boolean".to_string()),
|
||||
"BOOLEAN" | "BOOL" => ColumnMappingType::Dimension(type_str.to_string()),
|
||||
|
||||
// Unsupported types
|
||||
"ARRAY" | "OBJECT" | "VARIANT" => ColumnMappingType::Unsupported,
|
||||
|
||||
// Default to dimension for unknown types
|
||||
_ => {
|
||||
tracing::warn!("Unknown Snowflake type: {}, defaulting to string dimension", type_str);
|
||||
ColumnMappingType::Dimension("string".to_string())
|
||||
tracing::warn!("Unknown Snowflake type: {}, defaulting to dimension", type_str);
|
||||
ColumnMappingType::Dimension(type_str.to_string())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "buster-cli"
|
||||
version = "0.0.3"
|
||||
version = "0.0.4"
|
||||
edition = "2021"
|
||||
build = "build.rs"
|
||||
|
||||
|
|
|
@ -70,6 +70,8 @@ pub struct Measure {
|
|||
expr: String,
|
||||
agg: String,
|
||||
description: String,
|
||||
#[serde(rename = "type")]
|
||||
measure_type: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -477,9 +479,9 @@ impl ModelFile {
|
|||
description: measure.description.clone(),
|
||||
semantic_type: Some("measure".to_string()),
|
||||
expr: Some(measure.expr.clone()),
|
||||
type_: None,
|
||||
type_: measure.measure_type.clone(),
|
||||
agg: Some(measure.agg.clone()),
|
||||
searchable: false, // Measures don't have stored values
|
||||
searchable: false,
|
||||
});
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue