mirror of https://github.com/buster-so/buster.git
Merge branch 'staging' into evals
commit 91ebca260c
@@ -330,6 +330,32 @@ async fn deploy_datasets_handler(
         }
     };
 
+    // ---- Fetch Existing Dataset IDs for this data source ----
+    let request_db_names: Vec<String> = group.iter().map(|req| req.name.clone()).collect();
+    let existing_dataset_ids: HashMap<String, Uuid> = match datasets::table
+        .filter(datasets::data_source_id.eq(data_source.id))
+        .filter(datasets::database_name.eq_any(&request_db_names))
+        // .filter(datasets::deleted_at.is_null()) // Keep this commented if we want to reuse IDs of soft-deleted datasets
+        .select((datasets::database_name, datasets::id))
+        .load::<(String, Uuid)>(&mut conn)
+        .await {
+        Ok(ids) => ids.into_iter().collect(),
+        Err(e) => {
+            tracing::error!(
+                "Failed to retrieve existing dataset IDs for data source '{}': {}",
+                data_source_name,
+                e
+            );
+            // If we can't fetch existing IDs, we might proceed with new IDs,
+            // or fail the whole batch for this data source. Failing is safer.
+            return Err(anyhow::anyhow!(
+                "Failed to retrieve existing dataset IDs: {}",
+                e
+            ));
+        }
+    };
+    // ---- End Fetch Existing IDs ----
+
     // Process all requests in the group directly
     let mut datasets_to_upsert_map: HashMap<(String, Uuid), Dataset> = HashMap::new();
     let mut columns_to_upsert_map: HashMap<String, Vec<DatasetColumn>> = HashMap::new();
@@ -346,14 +372,27 @@ async fn deploy_datasets_handler(
         results.push(validation); // Add to results now
 
         let now = Utc::now();
-        let dataset_id = req.id.unwrap_or_else(Uuid::new_v4);
 
+        // ---- Determine Dataset ID: Prioritize existing ID ----
+        let dataset_id = match existing_dataset_ids.get(&req.name) {
+            Some(existing_id) => {
+                tracing::debug!("Using existing dataset ID {} for '{}'", existing_id, req.name);
+                *existing_id
+            }
+            None => {
+                let new_id = req.id.unwrap_or_else(Uuid::new_v4);
+                tracing::debug!("Using new/request dataset ID {} for '{}'", new_id, req.name);
+                new_id
+            }
+        };
+        // ---- End Determine Dataset ID ----
 
         // Prepare Dataset for upsert
         let dataset = Dataset {
-            id: dataset_id,
+            id: dataset_id, // Use the determined ID (existing or new/request)
             name: req.name.clone(),
             data_source_id: data_source.id, // Use fetched data source ID
-            created_at: now,
+            created_at: now, // Will be ignored by upsert if row exists
             updated_at: now,
             database_name: req.name.clone(), // Use model name as database_name
             when_to_use: Some(req.description.clone()),
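
The ID-resolution step in the hunk above reduces to a small pure function: an existing ID for the database name wins, otherwise fall back to the request-supplied ID, and finally to a fresh UUID. A sketch under that reading (the function name and test values are hypothetical):

use std::collections::HashMap;
use uuid::Uuid;

// Same precedence as the match in the hunk: existing > request > new.
fn resolve_dataset_id(
    existing: &HashMap<String, Uuid>,
    name: &str,
    request_id: Option<Uuid>,
) -> Uuid {
    match existing.get(name) {
        Some(id) => *id,
        None => request_id.unwrap_or_else(Uuid::new_v4),
    }
}

fn main() {
    let mut existing = HashMap::new();
    let kept = Uuid::new_v4();
    existing.insert("orders".to_string(), kept);

    // An existing ID is reused even when the request carries its own.
    assert_eq!(resolve_dataset_id(&existing, "orders", Some(Uuid::new_v4())), kept);

    // Unknown names fall back to the request-supplied ID.
    let req_id = Uuid::new_v4();
    assert_eq!(resolve_dataset_id(&existing, "customers", Some(req_id)), req_id);
}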
@@ -436,7 +475,6 @@ async fn deploy_datasets_handler(
             .on_conflict((datasets::database_name, datasets::data_source_id))
             .do_update()
             .set((
-                datasets::id.eq(excluded(datasets::id)), // Ensure ID is updated on conflict if new one generated
                 datasets::name.eq(excluded(datasets::name)),
                 datasets::updated_at.eq(now), // Use current time for update
                 datasets::updated_by.eq(excluded(datasets::updated_by)),
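
For context, with datasets::id gone from the set list, a conflicting row keeps its original id; only name, updated_at, and updated_by change. A sketch of the full upsert shape, assuming diesel_async and a trimmed-down datasets schema (the real table has more columns, and the function name is ours):

use chrono::Utc;
use diesel::prelude::*;
use diesel::upsert::excluded;
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use uuid::Uuid;

// Assumed minimal slice of the `datasets` schema, just enough for the
// upsert shown in the hunk above.
diesel::table! {
    datasets (id) {
        id -> Uuid,
        name -> Text,
        database_name -> Text,
        data_source_id -> Uuid,
        created_at -> Timestamptz,
        updated_at -> Timestamptz,
        updated_by -> Uuid,
    }
}

// Insert-or-update keyed on (database_name, data_source_id). Because
// `datasets::id` is absent from the `set` list, a conflicting row
// keeps its stored id even if `id` here is a freshly generated UUID.
async fn upsert_dataset(
    conn: &mut AsyncPgConnection,
    id: Uuid,
    name: &str,
    data_source_id: Uuid,
    updated_by: Uuid,
) -> QueryResult<usize> {
    let now = Utc::now();
    diesel::insert_into(datasets::table)
        .values((
            datasets::id.eq(id),
            datasets::name.eq(name),
            datasets::database_name.eq(name),
            datasets::data_source_id.eq(data_source_id),
            datasets::created_at.eq(now),
            datasets::updated_at.eq(now),
            datasets::updated_by.eq(updated_by),
        ))
        .on_conflict((datasets::database_name, datasets::data_source_id))
        .do_update()
        .set((
            datasets::name.eq(excluded(datasets::name)),
            datasets::updated_at.eq(now),
            datasets::updated_by.eq(excluded(datasets::updated_by)),
        ))
        .execute(conn)
        .await
}

Leaving id out of the conflict update is what makes the "prioritize existing ID" step stick: even if the fetch missed a row and a new UUID was generated, the stored row's id survives the upsert.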