Refactor dataset deployment logic and enforce unique constraints

- Updated the SQL migration to enforce a unique constraint on (`database_name`, `data_source_id`) in the `datasets` table. The previous constraint also included `id`, which is already unique on its own, so it never actually blocked duplicate name/source pairs.
- Refactored `deploy_datasets_handler` to split incoming datasets into those with IDs and those without, since the two groups need different conflict targets: existing datasets upsert on `id`, new ones on (`database_name`, `data_source_id`).
- Ran the two upserts concurrently via `tokio::try_join!`, each on its own pooled connection, to speed up dataset deployment.

These changes improve the robustness and efficiency of the dataset deployment process within the API.
commit 43abb0321e
parent 761028c95c
Author: dal
Date:   2025-01-09 13:05:54 -07:00
2 changed files with 52 additions and 26 deletions


@@ -6,4 +6,4 @@
     AND a.data_source_id = b.data_source_id;
 
 ALTER TABLE datasets
-    ADD CONSTRAINT datasets_database_name_data_source_id_key UNIQUE (id, database_name, data_source_id);
+    ADD CONSTRAINT datasets_database_name_data_source_id_key UNIQUE (database_name, data_source_id);
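
To see what the tightened constraint buys, here is a minimal smoke test, not part of this commit, that recreates just the constrained columns in a temporary table: two rows that differ only in `id` now collide, whereas the old three-column `UNIQUE (id, database_name, data_source_id)` would have admitted both. The `datasets_demo` table, connection string, and seed values are hypothetical; it assumes `tokio-postgres` with the `with-uuid-1` feature.

use tokio_postgres::{error::SqlState, NoTls};
use uuid::Uuid;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // Placeholder connection parameters.
    let (client, conn) =
        tokio_postgres::connect("host=localhost user=postgres dbname=postgres", NoTls).await?;
    tokio::spawn(async move {
        if let Err(e) = conn.await {
            eprintln!("connection error: {e}");
        }
    });

    // Temp table mirroring only the columns the migration constrains.
    client
        .batch_execute(
            "CREATE TEMP TABLE datasets_demo (
                 id uuid PRIMARY KEY,
                 database_name text NOT NULL,
                 data_source_id uuid NOT NULL,
                 CONSTRAINT datasets_database_name_data_source_id_key
                     UNIQUE (database_name, data_source_id)
             );",
        )
        .await?;

    let insert =
        "INSERT INTO datasets_demo (id, database_name, data_source_id) VALUES ($1, $2, $3)";
    let source = Uuid::new_v4();

    // Distinct ids, same natural key: the first row lands, the second now
    // violates datasets_database_name_data_source_id_key.
    client
        .execute(insert, &[&Uuid::new_v4(), &"analytics", &source])
        .await?;
    let err = client
        .execute(insert, &[&Uuid::new_v4(), &"analytics", &source])
        .await
        .unwrap_err();
    assert_eq!(err.code(), Some(&SqlState::UNIQUE_VIOLATION));
    Ok(())
}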


@@ -252,7 +252,8 @@ async fn deploy_datasets_handler(
         .map_err(|_| anyhow!("Data sources not found"))?;
 
     // Need to create the datasets, multiple datasets can be created at once.
-    let mut datasets = Vec::new();
+    let mut datasets_with_ids = Vec::new();
+    let mut datasets_without_ids = Vec::new();
 
     for req in &requests {
         let data_source = data_sources
@@ -289,33 +290,58 @@ async fn deploy_datasets_handler(
             yml_file: req.yml_file.clone(),
         };
 
-        datasets.push(dataset);
+        match req.id {
+            Some(_) => datasets_with_ids.push(dataset),
+            None => datasets_without_ids.push(dataset),
+        }
     }
 
-    // Upsert the datasets into the database.
-    let inserted_datasets = diesel::insert_into(datasets::table)
-        .values(&datasets)
-        .on_conflict((
-            datasets::id,
-            datasets::database_name,
-            datasets::data_source_id,
-        ))
-        .do_update()
-        .set((
-            datasets::name.eq(excluded(datasets::name)),
-            datasets::data_source_id.eq(excluded(datasets::data_source_id)),
-            datasets::when_to_use.eq(excluded(datasets::when_to_use)),
-            datasets::definition.eq(excluded(datasets::definition)),
-            datasets::type_.eq(excluded(datasets::type_)),
-            datasets::schema.eq(excluded(datasets::schema)),
-            datasets::updated_at.eq(Utc::now()),
-            datasets::deleted_at.eq(None::<DateTime<Utc>>),
-        ))
-        .returning(Dataset::as_select())
-        .get_results::<Dataset>(&mut conn)
-        .await
-        .map_err(|e| anyhow!("Failed to create dataset: {}", e))?;
+    // Upsert the datasets into the database concurrently
+    let (inserted_id_datasets, inserted_no_id_datasets) = tokio::try_join!(
+        async {
+            let mut conn = pg_pool.get().await?;
+            diesel::insert_into(datasets::table)
+                .values(&datasets_with_ids)
+                .on_conflict(datasets::id)
+                .do_update()
+                .set((
+                    datasets::name.eq(excluded(datasets::name)),
+                    datasets::when_to_use.eq(excluded(datasets::when_to_use)),
+                    datasets::definition.eq(excluded(datasets::definition)),
+                    datasets::type_.eq(excluded(datasets::type_)),
+                    datasets::schema.eq(excluded(datasets::schema)),
+                    datasets::updated_at.eq(Utc::now()),
+                    datasets::deleted_at.eq(None::<DateTime<Utc>>),
+                ))
+                .returning(Dataset::as_select())
+                .get_results::<Dataset>(&mut conn)
+                .await
+                .map_err(|e| anyhow!("Failed to create dataset: {}", e))
+        },
+        async {
+            let mut conn = pg_pool.get().await?;
+            diesel::insert_into(datasets::table)
+                .values(&datasets_without_ids)
+                .on_conflict((datasets::database_name, datasets::data_source_id))
+                .do_update()
+                .set((
+                    datasets::name.eq(excluded(datasets::name)),
+                    datasets::when_to_use.eq(excluded(datasets::when_to_use)),
+                    datasets::definition.eq(excluded(datasets::definition)),
+                    datasets::type_.eq(excluded(datasets::type_)),
+                    datasets::schema.eq(excluded(datasets::schema)),
+                    datasets::updated_at.eq(Utc::now()),
+                    datasets::deleted_at.eq(None::<DateTime<Utc>>),
+                ))
+                .returning(Dataset::as_select())
+                .get_results::<Dataset>(&mut conn)
+                .await
+                .map_err(|e| anyhow!("Failed to create dataset: {}", e))
+        }
+    )?;
+
+    let mut inserted_datasets = inserted_id_datasets;
+    inserted_datasets.extend(inserted_no_id_datasets);
 
     // Upsert the dataset columns into the database. We will pull these from the data source for their types.
     let mut columns_to_upsert: Vec<DatasetColumn> = Vec::new();
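
The handler's new control flow boils down to: partition the requests by whether they carry an `id`, run one `INSERT ... ON CONFLICT` per group (diesel takes a single conflict target per statement, which is why one combined upsert no longer works once the targets differ), then merge the results. A stripped-down sketch of that shape, with a hypothetical `Req` type and an `upsert` stub standing in for the diesel call:

use anyhow::Result;
use uuid::Uuid;

#[derive(Debug)]
struct Req {
    id: Option<Uuid>,
    database_name: String,
}

// Stand-in for the diesel insert_into(..).on_conflict(..).get_results(..)
// chain; the real handler keys one batch on `id` and the other on
// (`database_name`, `data_source_id`).
async fn upsert(conflict_target: &str, batch: Vec<Req>) -> Result<Vec<Req>> {
    println!("upserting {} rows on conflict {}", batch.len(), conflict_target);
    Ok(batch)
}

#[tokio::main]
async fn main() -> Result<()> {
    let requests = vec![
        Req { id: Some(Uuid::new_v4()), database_name: "orders".into() },
        Req { id: None, database_name: "customers".into() },
    ];

    // Same split as the handler's `match req.id { .. }` loop.
    let (with_ids, without_ids): (Vec<_>, Vec<_>) =
        requests.into_iter().partition(|r| r.id.is_some());

    // Both statements run concurrently; the first error aborts the join.
    let (a, b) = tokio::try_join!(
        upsert("(id)", with_ids),
        upsert("(database_name, data_source_id)", without_ids)
    )?;

    // Merge the two result sets, as the handler does with extend().
    let mut inserted = a;
    inserted.extend(b);
    println!("{} datasets upserted", inserted.len());
    Ok(())
}

One nuance of the real code worth noting: the two branches use separate pooled connections and no shared transaction, so `tokio::try_join!` surfaces the first error but cannot roll back rows the other branch has already committed.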