mirror of https://github.com/buster-so/buster.git
parse, deploy, config changes
This commit is contained in:
parent 19cf2d1f2c
commit b0af860c07
@@ -54,6 +54,9 @@ pub async fn deploy_datasets_handler_core(
             .push(req);
     }
 
+    let mut successfully_processed_data_source_ids: HashSet<Uuid> = HashSet::new();
+    let mut deployed_datasets_by_data_source: HashMap<Uuid, HashSet<String>> = HashMap::new();
+
     for ((data_source_name, env, _database), group) in data_source_groups {
         // Get data source
         let data_source = match database::schema::data_sources::table // Incorrect path
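For context, `data_source_groups` pairs a `(data_source_name, env, database)` key with the batch of requests targeting that data source, which is what the loop above iterates. Below is a minimal sketch of how such a grouping might be built; the request fields (`env`, `database`) and type names are assumptions inferred from the tuple key, not the repository's actual definitions:

```rust
use std::collections::HashMap;

// Hypothetical request shape, inferred from the fields the handler reads.
struct DeployDatasetsRequest {
    name: String,
    data_source_name: String,
    env: String,              // assumed field
    database: Option<String>, // assumed field
    schema: String,
}

type GroupKey = (String, String, Option<String>);

// Group requests by (data_source_name, env, database) so each data source is
// resolved once and its datasets are upserted as a single batch.
fn group_requests(
    requests: &[DeployDatasetsRequest],
) -> HashMap<GroupKey, Vec<&DeployDatasetsRequest>> {
    let mut groups: HashMap<GroupKey, Vec<&DeployDatasetsRequest>> = HashMap::new();
    for req in requests {
        groups
            .entry((req.data_source_name.clone(), req.env.clone(), req.database.clone()))
            .or_default()
            .push(req);
    }
    groups
}
```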
@@ -87,6 +90,8 @@ pub async fn deploy_datasets_handler_core(
             }
         };
 
+        successfully_processed_data_source_ids.insert(data_source.id);
+
         let request_db_names: Vec<String> = group.iter().map(|req| req.name.clone()).collect();
         let existing_dataset_ids: HashMap<String, Uuid> = match database::schema::datasets::table // Incorrect path
             .filter(database::schema::datasets::data_source_id.eq(data_source.id))
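The two collections introduced above, `successfully_processed_data_source_ids` and `deployed_datasets_by_data_source`, exist to feed the soft-deletion pass at the end of the handler: only data sources that were actually resolved get cleaned, and only names missing from the current batch get deleted. A stripped-down sketch of that bookkeeping, assuming only the standard library and the `uuid` crate the handler already uses:

```rust
use std::collections::{HashMap, HashSet};
use uuid::Uuid;

/// Sketch of the handler's deployment bookkeeping.
#[derive(Default)]
struct DeploymentTracker {
    /// Data sources that were successfully resolved and deployed into.
    processed_data_sources: HashSet<Uuid>,
    /// For each data source, the dataset names in the current batch.
    deployed_names: HashMap<Uuid, HashSet<String>>,
}

impl DeploymentTracker {
    fn record(&mut self, data_source_id: Uuid, dataset_name: &str) {
        self.processed_data_sources.insert(data_source_id);
        self.deployed_names
            .entry(data_source_id)
            .or_default()
            .insert(dataset_name.to_string());
    }

    /// A dataset still active in the DB is stale if it was not redeployed.
    fn is_stale(&self, data_source_id: Uuid, db_name: &str) -> bool {
        self.deployed_names
            .get(&data_source_id)
            .map_or(true, |names| !names.contains(db_name))
    }
}
```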
@@ -98,20 +103,47 @@ pub async fn deploy_datasets_handler_core(
             Ok(ids) => ids.into_iter().collect(),
             Err(e) => {
                 error!("Failed to retrieve existing dataset IDs for data source '{}': {}", data_source_name, e);
-                return Err(anyhow::anyhow!("Failed to retrieve existing dataset IDs: {}", e));
+                // Propagate error for this group or mark results as failed
+                for req_in_group in group {
+                    if let Some(validation_result) = results.iter_mut().find(|r| r.model_name == req_in_group.name && r.data_source_name == req_in_group.data_source_name && r.schema == req_in_group.schema) {
+                        validation_result.add_error(ValidationError::internal_error(format!("Failed to retrieve existing dataset IDs: {}", e)));
+                    } else {
+                        let mut validation = ValidationResult::new(
+                            req_in_group.name.clone(),
+                            req_in_group.data_source_name.clone(),
+                            req_in_group.schema.clone(),
+                        );
+                        validation.add_error(ValidationError::internal_error(format!("Failed to retrieve existing dataset IDs: {}", e)));
+                        results.push(validation);
+                    }
+                }
+                continue; // Continue to the next group
             }
         };
 
         let mut datasets_to_upsert_map: HashMap<(String, Uuid), database::models::Dataset> = HashMap::new(); // Incorrect path
 
-        for req in group.clone() {
+        for req in group.clone() { // group is Vec<&DeployDatasetsRequest>
+            // Collect names of datasets intended for deployment in this group for this data_source.id
+            deployed_datasets_by_data_source
+                .entry(data_source.id)
+                .or_default()
+                .insert(req.name.clone());
+
             let mut validation = ValidationResult::new(
                 req.name.clone(),
                 req.data_source_name.clone(),
                 req.schema.clone(),
             );
-            validation.success = true;
-            results.push(validation); // Add to results early
+            // Basic validation, e.g., if model is required
+            if req.model.is_none() { // Example validation: model is required
+                validation.add_error(ValidationError::internal_error("Field 'model' is required.".to_string()));
+                results.push(validation);
+                continue; // Skip this request
+            }
+            validation.success = true; // Assume success initially, will be overridden by upsert errors
+            results.push(validation);
+
 
             let now = Utc::now();
             let dataset_id = existing_dataset_ids.get(&req.name).copied().unwrap_or_else(|| req.id.unwrap_or_else(Uuid::new_v4));
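The `Err` branch above repeats a find-or-create pattern twice: locate the `ValidationResult` matching a request by model name, data source, and schema, or create one, then attach the error. A hypothetical helper that factors this out, with minimal stand-ins for the validation types whose exact shapes are assumed from how the diff uses them:

```rust
// Minimal stand-ins, just enough to compile the sketch; the real types live
// elsewhere in the crate and are assumed from their usage in this handler.
#[derive(Debug)]
struct ValidationError { message: String }

impl ValidationError {
    fn internal_error(message: String) -> Self { Self { message } }
}

#[derive(Debug)]
struct ValidationResult {
    model_name: String,
    data_source_name: String,
    schema: String,
    success: bool,
    errors: Vec<ValidationError>,
}

impl ValidationResult {
    fn new(model_name: String, data_source_name: String, schema: String) -> Self {
        Self { model_name, data_source_name, schema, success: false, errors: Vec::new() }
    }
    fn add_error(&mut self, e: ValidationError) {
        self.success = false;
        self.errors.push(e);
    }
}

/// Hypothetical helper: attach an error to the matching result, or create a
/// failed result if none exists yet.
fn fail_request(
    results: &mut Vec<ValidationResult>,
    name: &str,
    data_source_name: &str,
    schema: &str,
    error: ValidationError,
) {
    if let Some(existing) = results.iter_mut().find(|r| {
        r.model_name == name && r.data_source_name == data_source_name && r.schema == schema
    }) {
        existing.add_error(error);
    } else {
        let mut validation = ValidationResult::new(
            name.to_string(),
            data_source_name.to_string(),
            schema.to_string(),
        );
        validation.add_error(error);
        results.push(validation);
    }
}
```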
@@ -120,7 +152,7 @@ pub async fn deploy_datasets_handler_core(
                 id: dataset_id,
                 name: req.name.clone(),
                 data_source_id: data_source.id,
-                created_at: now,
+                created_at: now, // This will be handled by DB default or on_conflict for new records
                 updated_at: now,
                 database_name: req.name.clone(),
                 when_to_use: Some(req.description.clone()),
@@ -128,10 +160,10 @@ pub async fn deploy_datasets_handler_core(
                 type_: database::enums::DatasetType::View, // Incorrect path
                 definition: req.sql_definition.clone().unwrap_or_default(), // Still keeping SQL definition
                 schema: req.schema.clone(),
-                enabled: true,
-                created_by: *user_id,
+                enabled: true, // By default, a deployed dataset is enabled
+                created_by: *user_id, // This will be handled by DB default or on_conflict for new records
                 updated_by: *user_id,
-                deleted_at: None,
+                deleted_at: None, // Explicitly mark as not deleted
                 imported: false,
                 organization_id: organization_id,
                 model: req.model.clone(),
@@ -156,26 +188,28 @@ pub async fn deploy_datasets_handler_core(
                     database::schema::datasets::definition.eq(excluded(database::schema::datasets::definition)),
                     database::schema::datasets::when_to_use.eq(excluded(database::schema::datasets::when_to_use)),
                     database::schema::datasets::model.eq(excluded(database::schema::datasets::model)),
-                    database::schema::datasets::yml_file.eq(excluded(database::schema::datasets::yml_file)), // Upsert yml_file
+                    database::schema::datasets::yml_file.eq(excluded(database::schema::datasets::yml_file)),
                     database::schema::datasets::schema.eq(excluded(database::schema::datasets::schema)),
                     database::schema::datasets::database_identifier.eq(excluded(database::schema::datasets::database_identifier)),
-                    database::schema::datasets::enabled.eq(excluded(database::schema::datasets::enabled)),
-                    database::schema::datasets::deleted_at.eq(None::<DateTime<Utc>>),
+                    database::schema::datasets::enabled.eq(true), // Directly set to true on upsert
+                    database::schema::datasets::deleted_at.eq(None as Option<DateTime<Utc>>), // Explicitly ensure it's not deleted
                 ))
                 .execute(&mut conn)
                 .await
             {
-                Ok(_) => info!("Successfully upserted {} datasets for data source '{}'", datasets_to_upsert.len(), data_source_name),
+                Ok(num_upserted) => {
+                    info!("Successfully upserted {} datasets for data source '{}'", num_upserted, data_source_name);
+                    // Success is already marked for validation results, no change needed here unless specific counts matter.
+                }
                 Err(e) => {
-                    error!("Failed to bulk upsert datasets for data source '{}': {}", data_source_name, e);
-                    // Mark all results for this group as failed
-                    for req_in_group in group {
-                        if let Some(validation_result) = results.iter_mut().find(|r| r.model_name == req_in_group.name && r.data_source_name == req_in_group.data_source_name) {
+                    error!("Failed to bulk upsert datasets for data_source_id '{}': {}", data_source.id, e);
+                    // Mark all results for this group's successfully mapped datasets as failed
+                    for dataset_to_upsert in datasets_to_upsert {
+                        if let Some(validation_result) = results.iter_mut().find(|r| r.model_name == dataset_to_upsert.name && r.data_source_name == data_source_name && r.schema == dataset_to_upsert.schema) {
+                            validation_result.success = false; // Mark as false explicitly
                             validation_result.add_error(ValidationError::internal_error(format!("Failed to upsert dataset: {}", e)));
                         }
                     }
-                    // Optionally, you might want to return early here or collect errors and continue
-                    // For now, let's just log and the results will reflect the failure.
                 }
             };
             // Column processing is skipped as per requirements.
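The block above is PostgreSQL's `INSERT ... ON CONFLICT ... DO UPDATE` expressed through Diesel, where `excluded(col)` refers to the value the incoming row carries for `col`; `enabled` and `deleted_at` are instead pinned to fixed values so that redeploying a dataset always re-activates it. A minimal synchronous Diesel 2.x sketch of the same pattern over a deliberately simplified, assumed schema (the handler itself runs against an async connection with many more columns):

```rust
use chrono::{DateTime, Utc};
use diesel::prelude::*;
use diesel::upsert::excluded;
use uuid::Uuid;

diesel::table! {
    datasets (id) {
        id -> Uuid,
        name -> Text,
        enabled -> Bool,
        deleted_at -> Nullable<Timestamptz>,
    }
}

#[derive(Insertable)]
#[diesel(table_name = datasets)]
struct NewDataset {
    id: Uuid,
    name: String,
    enabled: bool,
    deleted_at: Option<DateTime<Utc>>,
}

fn upsert_datasets(conn: &mut PgConnection, rows: &[NewDataset]) -> QueryResult<usize> {
    diesel::insert_into(datasets::table)
        .values(rows)
        .on_conflict(datasets::id)
        .do_update()
        .set((
            // Take the incoming row's value for ordinary columns.
            datasets::name.eq(excluded(datasets::name)),
            // Pin lifecycle columns: a redeployed dataset is always live.
            datasets::enabled.eq(true),
            datasets::deleted_at.eq(None::<DateTime<Utc>>),
        ))
        .execute(conn)
}
```

The conflict target here is the primary key, which lines up with how the handler reuses existing dataset ids for names it has already seen.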
@@ -185,6 +219,80 @@ pub async fn deploy_datasets_handler_core(
         }
     }
 
+    // --- SOFT DELETION LOGIC ---
+    info!("Starting soft-deletion phase for datasets not in the current deployment batch...");
+
+    for data_source_id_to_clean in successfully_processed_data_source_ids {
+        let names_in_current_deployment = deployed_datasets_by_data_source
+            .get(&data_source_id_to_clean)
+            .cloned()
+            .unwrap_or_default();
+
+        let active_datasets_in_db: Vec<(Uuid, String)> = match database::schema::datasets::table
+            .filter(database::schema::datasets::data_source_id.eq(data_source_id_to_clean))
+            .filter(database::schema::datasets::deleted_at.is_null())
+            .select((database::schema::datasets::id, database::schema::datasets::database_name))
+            .load::<(Uuid, String)>(&mut conn)
+            .await
+        {
+            Ok(datasets) => datasets,
+            Err(e) => {
+                error!(
+                    "SOFT_DELETE_FAIL: Failed to retrieve active datasets for data_source_id '{}': {}. Skipping soft-deletion for this data source.",
+                    data_source_id_to_clean, e
+                );
+                // Optionally, add a non-model-specific error to `results` or a general operational warning.
+                continue;
+            }
+        };
+
+        let mut dataset_ids_to_soft_delete: Vec<Uuid> = Vec::new();
+        for (dataset_id, dataset_name_in_db) in active_datasets_in_db {
+            if !names_in_current_deployment.contains(&dataset_name_in_db) {
+                dataset_ids_to_soft_delete.push(dataset_id);
+            }
+        }
+
+        if !dataset_ids_to_soft_delete.is_empty() {
+            info!(
+                "SOFT_DELETE: Identified {} datasets to soft-delete for data_source_id '{}'.",
+                dataset_ids_to_soft_delete.len(),
+                data_source_id_to_clean
+            );
+            let now = Utc::now();
+            match diesel::update(database::schema::datasets::table.filter(database::schema::datasets::id.eq_any(&dataset_ids_to_soft_delete)))
+                .set((
+                    database::schema::datasets::deleted_at.eq(now),
+                    database::schema::datasets::updated_at.eq(now),
+                    database::schema::datasets::updated_by.eq(*user_id),
+                    database::schema::datasets::enabled.eq(false),
+                ))
+                .execute(&mut conn)
+                .await
+            {
+                Ok(num_deleted) => {
+                    info!(
+                        "SOFT_DELETE_SUCCESS: Successfully soft-deleted {} datasets for data_source_id '{}'.",
+                        num_deleted, data_source_id_to_clean
+                    );
+                }
+                Err(e) => {
+                    error!(
+                        "SOFT_DELETE_FAIL: Failed to soft-delete {} datasets for data_source_id '{}': {}. These datasets may remain active erroneously.",
+                        dataset_ids_to_soft_delete.len(), data_source_id_to_clean, e
+                    );
+                    // Optionally, add a non-model-specific error to `results` or a general operational warning.
+                }
+            }
+        } else {
+            info!(
+                "SOFT_DELETE: No datasets to soft-delete for data_source_id '{}'. All active datasets were part of the deployment or no relevant active datasets found.",
+                data_source_id_to_clean
+            );
+        }
+    }
+    // --- END SOFT DELETION LOGIC ---
+
     Ok(results)
 }
 
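The selection step inside the soft-deletion pass is pure set logic: any dataset still active in the database whose `database_name` is absent from the batch's deployed names is scheduled for soft-deletion. That decision can be isolated and unit-tested without a database; a sketch under the same naming assumptions:

```rust
use std::collections::HashSet;
use uuid::Uuid;

// Pure core of the soft-deletion pass: pick the ids of active datasets whose
// names were not part of the current deployment batch.
fn stale_dataset_ids(
    active_in_db: &[(Uuid, String)],
    deployed_names: &HashSet<String>,
) -> Vec<Uuid> {
    active_in_db
        .iter()
        .filter(|(_, name)| !deployed_names.contains(name))
        .map(|(id, _)| *id)
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn keeps_redeployed_datasets_and_flags_the_rest() {
        let kept = Uuid::new_v4();
        let stale = Uuid::new_v4();
        let active = vec![(kept, "orders".to_string()), (stale, "legacy".to_string())];
        let deployed: HashSet<String> = ["orders".to_string()].into();
        assert_eq!(stale_dataset_ids(&active, &deployed), vec![stale]);
    }
}
```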
@@ -741,7 +741,7 @@ fn handle_deploy_response(
     let mut has_validation_errors = false;
 
     // Process validation results
-    for validation in &response.results {
+    for validation in response.results.iter() {
         // Find corresponding file from model mapping
         let file = model_mappings
             .iter()